前言
年前閑着無聊,研究了一陣子爬蟲技術,接觸到爬蟲框架 WebMagic,感覺很好用。
在之后的工作中,接手了新站與第三方接口對接的工作,主要的工作是去抓取對方接口的內容;初始的時候,之前負責該工作的同事,是手動使用多線程去抓取,在應用的過程當中暴露了不少問題。比如對於接口內容超級多的時候,雖然使用了多線程,但是抓取的效率很低,而且也沒有實現增量抓取,每次都需要去全量抓取,跑一次基本需要好幾天-.-;小說是連載的情況下,這種問題是亟需解決的。
趁着熟悉了新兵器 WebMagic, 果斷在項目中進行引入,解決以上問題。功能上線后,替換了原有的多線程抓取,目前已經十分穩定, 基本上配置好任務之后,就無需再人工干預了。
以下,正文是基於我學習 WebMagic 時練手項目,功能和在公司開發的差不多,只不過我本地開發的是去抓取盜版網站的內容。
項目預覽
-
菜單管理

-
爬蟲任務管理

-
實現了爬蟲的狀態監控,以及可視化啟停

初入手兵器-基本使用
-
爬蟲套路分析
先看官方文檔的總體架構圖

大部分模塊WebMagic已經提供了默認實現。
一般來說,對於編寫一個爬蟲,PageProcessor是需要編寫的部分,而Spider則是創建和控制爬蟲的入口。得益於 WebMagic 框架的良好封裝,對於框架的使用者來說,所需要編寫的代碼幾乎只有爬蟲的邏輯代碼,而對於怎么爬,維護任務隊列的事情,WebMagic 都可以替我們做好。開始我們的爬蟲之旅吧!
-
引入依賴
本文中所使用到的項目是基於 Maven 的 SSM 項目,在 pom.xml 中引入 WebMagic 的依賴。<dependency> <groupId>us.codecraft</groupId> <artifactId>webmagic-core</artifactId> <version>0.7.3</version> </dependency> <dependency> <groupId>us.codecraft</groupId> <artifactId>webmagic-extension</artifactId> <version>0.7.3</version> </dependency> -
基本類圖
先將對應的處理類進行抽象出來,方便統一處理。

-
每個爬蟲都有其對應的配置信息

Site 是 抓取網站的相關配置,包括編碼、抓取間隔、重試次數 -
對應的實現類重寫 process 方法,在方法中實現對應的爬蟲邏輯處理

-
啟動爬蟲

-
爬蟲的使用就簡單帶過,具體可以將本文與官方文檔結合使用,官方文檔的示例只是基於 main 方法。
爬蟲監控
-
擴展源碼
為了實現項目預覽的效果,實現爬蟲的狀態監控,需要對爬蟲進行擴展。因為官網提供的方式功能不足以達到在頁面展示的效果。添加監控非常簡單,獲取一個 SpiderMonitor 的單例 SpiderMonitor.instance(),並將你想要監控的 Spider 注冊進去即可。你可以注冊多個 Spider 到 SpiderMonitor 中。
查看 SpiderMonitor 源代碼后,如果調用的是 獲取一個 SpiderMonitor 的單例 SpiderMonitor 的 注冊方法,發現 WebMagic 將每只爬蟲的狀態對象 SpiderStatusMXBean 全部添加到一個 List 集合當中去,這樣就難以區分具體是哪一只爬蟲的狀態,所以我們需要對 SpiderMonitor 進行擴展。
將 SpiderMonitor 中的
private List<SpiderStatusMXBean> spiderStatuses = new ArrayList<SpiderStatusMXBean>();
修改為 Map 集合,key 選擇 Spider 的 UUID 作為唯一區分爬蟲的標記。@Experimental public class MySpiderMonitor { private static MySpiderMonitor INSTANCE = new MySpiderMonitor(); private AtomicBoolean started = new AtomicBoolean(false); private Logger logger = LoggerFactory.getLogger(getClass()); private MBeanServer mbeanServer; private String jmxServerName; private Map<String,MySpiderStatus> spiderStatuses = new HashMap<String,MySpiderStatus>(); protected MySpiderMonitor() { jmxServerName = "WebMagic"; mbeanServer = ManagementFactory.getPlatformMBeanServer(); } public Map<String,MySpiderStatus> getSpiderStatuses() { return spiderStatuses; } /** * Register spider for monitor. * * @param spiders spiders * @return this */ public synchronized MySpiderMonitor register(Spider... spiders) throws JMException { for (Spider spider : spiders) { MyMonitorSpiderListener monitorSpiderListener = new MyMonitorSpiderListener(); if (spider.getSpiderListeners() == null) { List<SpiderListener> spiderListeners = new ArrayList<SpiderListener>(); spiderListeners.add(monitorSpiderListener); spider.setSpiderListeners(spiderListeners); } else { spider.getSpiderListeners().add(monitorSpiderListener); } MySpiderStatus spiderStatusMBean = getSpiderStatusMBean(spider, monitorSpiderListener); registerMBean(spiderStatusMBean); spiderStatuses.put(spider.getUUID(),spiderStatusMBean); } return this; } protected MySpiderStatus getSpiderStatusMBean(Spider spider, MyMonitorSpiderListener monitorSpiderListener) { return new MySpiderStatus(spider, monitorSpiderListener); } public static MySpiderMonitor instance() { return INSTANCE; } public class MyMonitorSpiderListener implements SpiderListener { private final AtomicInteger successCount = new AtomicInteger(0); private final AtomicInteger errorCount = new AtomicInteger(0); private List<String> errorUrls = Collections.synchronizedList(new ArrayList<String>()); @Override public void onSuccess(Request request) { successCount.incrementAndGet(); } @Override public void onError(Request request) { errorUrls.add(request.getUrl()); errorCount.incrementAndGet(); } public AtomicInteger getSuccessCount() { return successCount; } public AtomicInteger getErrorCount() { return errorCount; } public List<String> getErrorUrls() { return errorUrls; } } protected void registerMBean(SpiderStatusMXBean spiderStatus) throws MalformedObjectNameException, InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { ObjectName objName = new ObjectName(jmxServerName + ":name=" + spiderStatus.getName()); if(mbeanServer.isRegistered(objName)==false) { mbeanServer.registerMBean(spiderStatus, objName); } } }需要注意的是,SpiderMonitor 中使用的 SpiderStatus 也需要進行一同擴展。
public class MySpiderStatus implements SpiderStatusMXBean { protected final Spider spider; protected Logger logger = LoggerFactory.getLogger(getClass()); protected final MySpiderMonitor.MyMonitorSpiderListener monitorSpiderListener; public MySpiderStatus(Spider spider, MySpiderMonitor.MyMonitorSpiderListener monitorSpiderListener) { this.spider = spider; this.monitorSpiderListener = monitorSpiderListener; } public Spider getSpider() { return this.spider; } public String getName() { return spider.getUUID(); } public int getLeftPageCount() { if (spider.getScheduler() instanceof MonitorableScheduler) { return ((MonitorableScheduler) spider.getScheduler()).getLeftRequestsCount(spider); } logger.warn("Get leftPageCount fail, try to use a Scheduler implement MonitorableScheduler for monitor count!"); return -1; } public int getTotalPageCount() { if (spider.getScheduler() instanceof MonitorableScheduler) { return ((MonitorableScheduler) spider.getScheduler()).getTotalRequestsCount(spider); } logger.warn("Get totalPageCount fail, try to use a Scheduler implement MonitorableScheduler for monitor count!"); return -1; } @Override public int getSuccessPageCount() { return monitorSpiderListener.getSuccessCount().get(); } @Override public int getErrorPageCount() { return monitorSpiderListener.getErrorCount().get(); } public List<String> getErrorPages() { return monitorSpiderListener.getErrorUrls(); } @Override public String getStatus() { return spider.getStatus().name(); } @Override public int getThread() { return spider.getThreadAlive(); } public void start() { spider.start(); } public void stop() { spider.stop(); } @Override public Date getStartTime() { return spider.getStartTime(); } @Override public int getPagePerSecond() { int runSeconds = (int) (System.currentTimeMillis() - getStartTime().getTime()) / 1000; return getSuccessPageCount() / runSeconds; } } -
重寫爬蟲啟動處代碼
@Service public class WebMagicService { @Resource private ApplicationContext context; @Resource private TaskService taskService; public void run(TaskDTO taskDTO, boolean runAsync) throws Exception { MySpiderMonitor spiderMonitor = MySpiderMonitor.instance(); String ruleJson = taskDTO.getTaskRuleJson(); WebMagicConfig config = JSONObject.parseObject(ruleJson, WebMagicConfig.class); SpiderConfig spiderConfig = config.getSpider(); AbstractPageProcess pageProcess = context.getBean(spiderConfig.getProcesser(), AbstractPageProcess.class); pageProcess.init(config); pageProcess.setUuid(taskDTO.getSpiderUUID()); Spider spider = Spider.create(pageProcess).thread(spiderConfig.getThread()); spider.setUUID(taskDTO.getSpiderUUID()); List<String> pipelines = spiderConfig.getPipeline(); for (String pipeline : pipelines) { Pipeline bean = context.getBean(pipeline, Pipeline.class); if (bean != null) { spider.addPipeline(bean); } } // 設置Downloader // 設置Scheduler // 注冊爬蟲 spiderMonitor.register(spider); spider.addUrl(spiderConfig.getStartUrl()); if (runAsync) { spider.runAsync(); } else { spider.run(); } } /** * 爬蟲狀態監控 * @return */ public List<TaskDTO> runTaskList() { MySpiderMonitor spiderMonitor = MySpiderMonitor.instance(); Map<String, MySpiderStatus> spiderStatuses = spiderMonitor.getSpiderStatuses(); List<TaskDTO> taskDTOList = taskService.findAll(); for (TaskDTO taskDTO : taskDTOList) { MySpiderStatus spiderStatus = spiderStatuses.get(taskDTO.getSpiderUUID()); if (spiderStatus == null) { taskDTO.setRunState(Spider.Status.Stopped.name()); } else { taskDTO.setRunState(spiderStatus.getStatus()); } } return taskDTOList; } public TaskDTO stop(TaskDTO taskDTO) { MySpiderMonitor spiderMonitor = MySpiderMonitor.instance(); Map<String, MySpiderStatus> spiderStatuses = spiderMonitor.getSpiderStatuses(); MySpiderStatus spiderStatus = spiderStatuses.get(taskDTO.getSpiderUUID()); if (spiderStatus != null) { spiderStatus.stop(); spiderStatus.getSpider().close(); } return taskDTO; } }創建爬蟲時,將爬蟲注冊到 MySpiderMonitor 中,之后通過 getSpiderStatuses 方法即可獲取所有爬蟲的狀態了。
