WebMagic之爬蟲監控


訪問我的博客

前言

年前閑着無聊,研究了一陣子爬蟲技術,接觸到爬蟲框架 WebMagic,感覺很好用。
在之后的工作中,接手了新站與第三方接口對接的工作,主要的工作是去抓取對方接口的內容;初始的時候,之前負責該工作的同事,是手動使用多線程去抓取,在應用的過程當中暴露了不少問題。比如對於接口內容超級多的時候,雖然使用了多線程,但是抓取的效率很低,而且也沒有實現增量抓取,每次都需要去全量抓取,跑一次基本需要好幾天-.-;小說是連載的情況下,這種問題是亟需解決的。

趁着熟悉了新兵器 WebMagic, 果斷在項目中進行引入,解決以上問題。功能上線后,替換了原有的多線程抓取,目前已經十分穩定, 基本上配置好任務之后,就無需再人工干預了。

以下,正文是基於我學習 WebMagic 時練手項目,功能和在公司開發的差不多,只不過我本地開發的是去抓取盜版網站的內容。

項目預覽

  1. 菜單管理

  2. 爬蟲任務管理

  3. 實現了爬蟲的狀態監控,以及可視化啟停

初入手兵器-基本使用

  1. 爬蟲套路分析
    先看官方文檔的總體架構圖

    大部分模塊WebMagic已經提供了默認實現。
    一般來說,對於編寫一個爬蟲,PageProcessor是需要編寫的部分,而Spider則是創建和控制爬蟲的入口。

    得益於 WebMagic 框架的良好封裝,對於框架的使用者來說,所需要編寫的代碼幾乎只有爬蟲的邏輯代碼,而對於怎么爬,維護任務隊列的事情,WebMagic 都可以替我們做好。開始我們的爬蟲之旅吧!

  2. 引入依賴
    本文中所使用到的項目是基於 Maven 的 SSM 項目,在 pom.xml 中引入 WebMagic 的依賴。

    <dependency>
    	<groupId>us.codecraft</groupId>
    	<artifactId>webmagic-core</artifactId>
    	<version>0.7.3</version>
    </dependency>
    <dependency>
    	<groupId>us.codecraft</groupId>
    	<artifactId>webmagic-extension</artifactId>
    	<version>0.7.3</version>
    </dependency>
    
  3. 基本類圖
    先將對應的處理類進行抽象出來,方便統一處理。

  4. 每個爬蟲都有其對應的配置信息

    Site 是 抓取網站的相關配置,包括編碼、抓取間隔、重試次數

  5. 對應的實現類重寫 process 方法,在方法中實現對應的爬蟲邏輯處理

  6. 啟動爬蟲

  7. 爬蟲的使用就簡單帶過,具體可以將本文與官方文檔結合使用,官方文檔的示例只是基於 main 方法。

爬蟲監控

  1. 擴展源碼
    為了實現項目預覽的效果,實現爬蟲的狀態監控,需要對爬蟲進行擴展。因為官網提供的方式功能不足以達到在頁面展示的效果。

    添加監控非常簡單,獲取一個 SpiderMonitor 的單例 SpiderMonitor.instance(),並將你想要監控的 Spider 注冊進去即可。你可以注冊多個 Spider 到 SpiderMonitor 中。

    查看 SpiderMonitor 源代碼后,如果調用的是 獲取一個 SpiderMonitor 的單例 SpiderMonitor 的 注冊方法,發現 WebMagic 將每只爬蟲的狀態對象 SpiderStatusMXBean 全部添加到一個 List 集合當中去,這樣就難以區分具體是哪一只爬蟲的狀態,所以我們需要對 SpiderMonitor 進行擴展。

    將 SpiderMonitor 中的
    private List<SpiderStatusMXBean> spiderStatuses = new ArrayList<SpiderStatusMXBean>();
    修改為 Map 集合,key 選擇 Spider 的 UUID 作為唯一區分爬蟲的標記。

    @Experimental
    public class MySpiderMonitor {
    
    	private static MySpiderMonitor INSTANCE = new MySpiderMonitor();
    
    	private AtomicBoolean started = new AtomicBoolean(false);
    
    	private Logger logger = LoggerFactory.getLogger(getClass());
    
    	private MBeanServer mbeanServer;
    
    	private String jmxServerName;
    
    	private Map<String,MySpiderStatus> spiderStatuses = new HashMap<String,MySpiderStatus>();
    
    	protected MySpiderMonitor() {
    		jmxServerName = "WebMagic";
    		mbeanServer = ManagementFactory.getPlatformMBeanServer();
    	}
        public Map<String,MySpiderStatus> getSpiderStatuses()
        {
            return spiderStatuses;
        }
    
    	/**
    	 * Register spider for monitor.
    	 *
    	 * @param spiders spiders
    	 * @return this
    	 */
    	public synchronized MySpiderMonitor register(Spider... spiders) throws JMException {
    		for (Spider spider : spiders) {
    			MyMonitorSpiderListener monitorSpiderListener = new MyMonitorSpiderListener();
    			if (spider.getSpiderListeners() == null) {
    				List<SpiderListener> spiderListeners = new ArrayList<SpiderListener>();
    				spiderListeners.add(monitorSpiderListener);
    				spider.setSpiderListeners(spiderListeners);
    			} else {
    				spider.getSpiderListeners().add(monitorSpiderListener);
    			}
                MySpiderStatus spiderStatusMBean = getSpiderStatusMBean(spider, monitorSpiderListener);
    			registerMBean(spiderStatusMBean);
    			spiderStatuses.put(spider.getUUID(),spiderStatusMBean);
    		}
    		return this;
    	}
    
    	protected MySpiderStatus getSpiderStatusMBean(Spider spider, MyMonitorSpiderListener monitorSpiderListener) {
    		return new MySpiderStatus(spider, monitorSpiderListener);
    	}
    
    	public static MySpiderMonitor instance() {
    		return INSTANCE;
    	}
    
    	public class MyMonitorSpiderListener implements SpiderListener {
    
    		private final AtomicInteger successCount = new AtomicInteger(0);
    
    		private final AtomicInteger errorCount = new AtomicInteger(0);
    
    		private List<String> errorUrls = Collections.synchronizedList(new ArrayList<String>());
    
    		@Override
    		public void onSuccess(Request request) {
    			successCount.incrementAndGet();
    		}
    
    		@Override
    		public void onError(Request request) {
    			errorUrls.add(request.getUrl());
    			errorCount.incrementAndGet();
    		}
    
    		public AtomicInteger getSuccessCount() {
    			return successCount;
    		}
    
    		public AtomicInteger getErrorCount() {
    			return errorCount;
    		}
    
    		public List<String> getErrorUrls() {
    			return errorUrls;
    		}
    	}
    
    	protected void registerMBean(SpiderStatusMXBean spiderStatus) throws MalformedObjectNameException, InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
    		ObjectName objName = new ObjectName(jmxServerName + ":name=" + spiderStatus.getName());
            if(mbeanServer.isRegistered(objName)==false)
            {
                mbeanServer.registerMBean(spiderStatus, objName);
            }
    	}
    
    }
    

    需要注意的是,SpiderMonitor 中使用的 SpiderStatus 也需要進行一同擴展。

    public class MySpiderStatus implements SpiderStatusMXBean {
    
        protected final Spider spider;
    
        protected Logger logger = LoggerFactory.getLogger(getClass());
    
        protected final MySpiderMonitor.MyMonitorSpiderListener monitorSpiderListener;
    
        public MySpiderStatus(Spider spider, MySpiderMonitor.MyMonitorSpiderListener monitorSpiderListener) {
            this.spider = spider;
            this.monitorSpiderListener = monitorSpiderListener;
        }
    
        public Spider getSpider()
        {
            return this.spider;
        }
    
        public String getName() {
            return spider.getUUID();
        }
    
        public int getLeftPageCount() {
            if (spider.getScheduler() instanceof MonitorableScheduler) {
                return ((MonitorableScheduler) spider.getScheduler()).getLeftRequestsCount(spider);
            }
            logger.warn("Get leftPageCount fail, try to use a Scheduler implement MonitorableScheduler for monitor count!");
            return -1;
        }
    
        public int getTotalPageCount() {
            if (spider.getScheduler() instanceof MonitorableScheduler) {
                return ((MonitorableScheduler) spider.getScheduler()).getTotalRequestsCount(spider);
            }
            logger.warn("Get totalPageCount fail, try to use a Scheduler implement MonitorableScheduler for monitor count!");
            return -1;
        }
    
        @Override
        public int getSuccessPageCount() {
            return monitorSpiderListener.getSuccessCount().get();
        }
    
        @Override
        public int getErrorPageCount() {
            return monitorSpiderListener.getErrorCount().get();
        }
    
        public List<String> getErrorPages() {
            return monitorSpiderListener.getErrorUrls();
        }
    
        @Override
        public String getStatus() {
            return spider.getStatus().name();
        }
    
        @Override
        public int getThread() {
            return spider.getThreadAlive();
        }
    
        public void start() {
            spider.start();
        }
    
        public void stop() {
            spider.stop();
        }
    
        @Override
        public Date getStartTime() {
            return spider.getStartTime();
        }
    
        @Override
        public int getPagePerSecond() {
            int runSeconds = (int) (System.currentTimeMillis() - getStartTime().getTime()) / 1000;
            return getSuccessPageCount() / runSeconds;
        }
    
    }
    
  2. 重寫爬蟲啟動處代碼

    @Service
    public class WebMagicService {
        
        @Resource
        private ApplicationContext context;
        @Resource
        private TaskService taskService;
        
        public void run(TaskDTO taskDTO, boolean runAsync) throws Exception {
            MySpiderMonitor spiderMonitor = MySpiderMonitor.instance();
            String ruleJson = taskDTO.getTaskRuleJson();
            WebMagicConfig config = JSONObject.parseObject(ruleJson, WebMagicConfig.class);
            SpiderConfig spiderConfig = config.getSpider();
            AbstractPageProcess pageProcess = context.getBean(spiderConfig.getProcesser(), AbstractPageProcess.class);
        
            pageProcess.init(config);
            pageProcess.setUuid(taskDTO.getSpiderUUID());
        
            Spider spider = Spider.create(pageProcess).thread(spiderConfig.getThread());
            spider.setUUID(taskDTO.getSpiderUUID());
        
            List<String> pipelines = spiderConfig.getPipeline();
            for (String pipeline : pipelines) {
                Pipeline bean = context.getBean(pipeline, Pipeline.class);
                if (bean != null) {
                    spider.addPipeline(bean);
                }
            }
            // 設置Downloader
            // 設置Scheduler
        
            // 注冊爬蟲
            spiderMonitor.register(spider);
            spider.addUrl(spiderConfig.getStartUrl());
        
            if (runAsync) {
                spider.runAsync();
            } else {
                spider.run();
            }
        }
        
        /**
         * 爬蟲狀態監控
         * @return
         */
        public List<TaskDTO> runTaskList() {
        
            MySpiderMonitor spiderMonitor = MySpiderMonitor.instance();
            Map<String, MySpiderStatus> spiderStatuses = spiderMonitor.getSpiderStatuses();
        
            List<TaskDTO> taskDTOList = taskService.findAll();
            for (TaskDTO taskDTO : taskDTOList) {
                MySpiderStatus spiderStatus = spiderStatuses.get(taskDTO.getSpiderUUID());
                if (spiderStatus == null) {
                    taskDTO.setRunState(Spider.Status.Stopped.name());
                } else {
                    taskDTO.setRunState(spiderStatus.getStatus());
                }
            }
        
            return taskDTOList;
        }
        
        public TaskDTO stop(TaskDTO taskDTO) {
            MySpiderMonitor spiderMonitor = MySpiderMonitor.instance();
            Map<String, MySpiderStatus> spiderStatuses = spiderMonitor.getSpiderStatuses();
            MySpiderStatus spiderStatus = spiderStatuses.get(taskDTO.getSpiderUUID());
        
            if (spiderStatus != null) {
                spiderStatus.stop();
                spiderStatus.getSpider().close();
            }
        
            return taskDTO;
        }
    }
    

    創建爬蟲時,將爬蟲注冊到 MySpiderMonitor 中,之后通過 getSpiderStatuses 方法即可獲取所有爬蟲的狀態了。

資源下載


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM