This crawler collects JD.com building-materials data. While crawling JD I found that JD does not take anti-crawling measures against these pages, so the crawl went fairly smoothly.
Why WebMagic:
- WebMagic is a lightweight Java crawler framework that greatly reduces crawler development time.
Why a message queue (this project uses RabbitMQ, but any MQ would do):
- It decouples the modules, so the individual spiders are independent of one another.
- It makes the project robust: whether the project stops deliberately or by accident (a power failure, say), it can resume simply by re-reading the data left in the MQ (see the sketch after this list).
- It splits up the business logic, so each module is simpler and the code is easier to write.
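To make the resume-after-interruption point concrete, here is a minimal sketch of the underlying pattern using the plain RabbitMQ Java client (the queue name jc.demo, the host, and the sample URL are placeholders, not values from this project): messages sit in a durable queue and are only acked after processing, so a restarted process simply picks up the unacked work.

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.MessageProperties;

public class DurableQueueDemo {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // placeholder host
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // durable queue: survives a broker restart
        channel.queueDeclare("jc.demo", true, false, false, null);

        // persistent message: the broker writes it to disk
        channel.basicPublish("", "jc.demo",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                "http://example.com/page".getBytes("UTF-8"));

        // manual ack: the broker keeps the message until we confirm it,
        // so a crash before basicAck means the message is redelivered
        GetResponse response = channel.basicGet("jc.demo", false);
        if (response != null) {
            System.out.println("crawl " + new String(response.getBody(), "UTF-8"));
            channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
        }

        channel.close();
        connection.close();
    }
}
```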
Why Elasticsearch:
- Convenient for searching later on.
- A business requirement.
Overall project architecture:
There are several spiders here. Each spider in the earlier levels handles a different module of data and puts the processed results into the MQ for the next-level spider to pick up.
The final pages crawled are the product detail pages, so the last-level spider stores the detail data it crawls into ES. The CrawlerPage message the levels exchange is sketched below.
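The CrawlerPage class itself is not shown in this post; the sketch below contains only what the later code implies, namely getCurrentUrl() and translateRequest() (messages are serialized to JSON before being published). The level field is an illustrative assumption.

```java
package com.chinaredstar.jc.core.page;

import us.codecraft.webmagic.Request;

/**
 * Minimal sketch of the page message passed between spider levels.
 * Only currentUrl and translateRequest() are implied by the code in
 * this post; the level field is an assumption.
 */
public class CrawlerPage {

    /** URL the next-level spider should fetch. */
    private String currentUrl;

    /** Which spider level produced this page (assumed field). */
    private int level;

    public String getCurrentUrl() {
        return currentUrl;
    }

    public void setCurrentUrl(String currentUrl) {
        this.currentUrl = currentUrl;
    }

    public int getLevel() {
        return level;
    }

    public void setLevel(int level) {
        this.level = level;
    }

    /** Convert this message into a WebMagic Request for the spider. */
    public Request translateRequest() {
        return new Request(currentUrl);
    }
}
```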
spider1 handles the JD building-materials home page:
spider2 handles JD's pagination bar:
spider3 handles JD's list pages:
spider4 handles the product detail pages:
From the architecture diagram above, every spider needs a connection to the MQ. The first-level spider does not consume from the MQ, and the last-level spider does not produce into it; every spider in between both consumes from and produces into the MQ.
We therefore attach a consumer and a producer to every spider; the resulting structure is shown below:
After the pipeline extracts the data it wants, it records each crawled URL in Redis; URLs that have already been crawled are skipped. (A sketch of this Redis helper follows.)
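The RedisUtil used by the MqPipeline further down is not included in the post; here is a minimal sketch of the two calls the pipeline makes, sContain and sAdd, assuming a Jedis pool behind static helpers (host and port are placeholders).

```java
package com.chinaredstar.jc.core.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * Minimal sketch of the Redis set helper used for URL de-duplication.
 * The pool configuration is an assumption; only sContain/sAdd are
 * implied by the pipeline code in this post.
 */
public class RedisUtil {

    private static final JedisPool POOL = new JedisPool("localhost", 6379);

    /** Has this URL already been recorded under the task's key? */
    public static boolean sContain(String key, String url) {
        try (Jedis jedis = POOL.getResource()) {
            return jedis.sismember(key, url);
        }
    }

    /** Record a crawled URL in the task's set. */
    public static void sAdd(String key, String url) {
        try (Jedis jedis = POOL.getResource()) {
            jedis.sadd(key, url);
        }
    }
}
```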
WebMagic is a well-built crawler framework with good extensibility, so we only need to extend the original framework slightly.
First, the extended Spider code:
```java
package com.chinaredstar.jc.core.spider;

import com.chinaredstar.jc.core.page.CrawlerPage;
import com.chinaredstar.jc.crawler.consumer.Consumer;
import com.chinaredstar.jc.infras.utils.JSONUtil;
import com.rabbitmq.client.QueueingConsumer;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.SpiderListener;
import us.codecraft.webmagic.processor.PageProcessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author zhuangj
 * @date 2017/12/1
 */
public class JcSpider extends Spider {

    /**
     * Queue consumer that feeds requests to this spider.
     */
    private Consumer consumer;

    /**
     * Name of the queue this spider consumes from.
     */
    private String consumerQueueName;

    /**
     * Tracks in-flight deliveries so MQ messages can be acked once handled.
     * (ConcurrentHashMap instead of the original HashedMap: the map is
     * touched by both the consumer thread and the spider's worker threads.)
     */
    private Map<String, QueueingConsumer.Delivery> ackMap = new ConcurrentHashMap<>();

    /**
     * Number of messages left in the queue.
     */
    private Integer messageNum = 0;

    /**
     * Parent spider; this spider may only stop after its parent has stopped.
     */
    private JcSpider parentSpider;

    public JcSpider(PageProcessor pageProcessor) {
        super(pageProcessor);
        exitWhenComplete = false;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

    public JcSpider getParentSpider() {
        return parentSpider;
    }

    public void setParentSpider(JcSpider parentSpider) {
        this.parentSpider = parentSpider;
    }

    public Integer getMessageNum() {
        return messageNum;
    }

    public void setMessageNum(Integer messageNum) {
        this.messageNum = messageNum;
    }

    public String getConsumerQueueName() {
        return consumerQueueName;
    }

    public void setConsumerQueueName(String consumerQueueName) {
        this.consumerQueueName = consumerQueueName;
    }

    @Override
    protected void initComponent() {
        super.initComponent();
        this.setSpiderListeners(new ArrayList<>());
        this.requestMessageListen();
    }

    public void startConsumer(String queueName) {
        if (consumer == null) {
            // first-level spider: no queue to consume, exit once its requests run out
            this.exitWhenComplete = true;
            return;
        }
        logger.info("queueName:{},startConsumer", queueName);
        JcSpider jcSpider = this;
        Runnable myRunnable = () -> {
            try {
                messageNum = consumer.getQueueMsgNum(queueName);
                Status parentStatus = parentSpider != null ? parentSpider.getStatus() : Status.Stopped;
                // keep feeding the spider while the parent is alive or messages remain
                while (!parentStatus.equals(Status.Stopped) || messageNum > 0) {
                    if (!jcSpider.getStatus().equals(Status.Running)) {
                        Thread.sleep(500);
                    }
                    QueueingConsumer.Delivery delivery = consumer.getDeliveryMessage(queueName);
                    String message = new String(delivery.getBody());
                    CrawlerPage crawlerPage = JSONUtil.toObject(message, CrawlerPage.class);
                    Request request = crawlerPage.translateRequest();
                    // remember the delivery so the listener can ack it later
                    ackMap.put(request.getUrl(), delivery);
                    jcSpider.addRequest(request);
                    messageNum = consumer.getQueueMsgNum(queueName);
                    if (messageNum == 0) {
                        Thread.sleep(500);
                    }
                    // refresh the parent's status; the original read it only once,
                    // before the loop, so the loop could never terminate
                    if (parentSpider != null) {
                        parentStatus = parentSpider.getStatus();
                    }
                }
                logger.info("spider:{},consumer stop,messageNum:{}", getUUID(), messageNum);
                // parent is done and the queue is empty: let the spider exit
                Thread.sleep(2000);
                this.exitWhenComplete = true;
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
        new Thread(myRunnable).start();
    }

    /**
     * Register a listener that acks the MQ message once its request
     * has been processed, whether it succeeded or failed.
     */
    private void requestMessageListen() {
        this.getSpiderListeners().add(new SpiderListener() {
            @Override
            public void onSuccess(Request request) {
                ackMq(request);
            }

            @Override
            public void onError(Request request) {
                ackMq(request);
            }
        });
    }

    public void ackMq(Request request) {
        try {
            QueueingConsumer.Delivery delivery = ackMap.get(request.getUrl());
            if (delivery != null) {
                consumer.ackMessage(delivery);
                ackMap.remove(request.getUrl());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```
On top of the original WebMagic Spider we add an asynchronous consumer (the Consumer wraps RabbitMQ's consume operations; it is fairly simple, and a minimal sketch follows this list). Its responsibilities:
- Read pending messages from the MQ and add the URLs to be crawled to the spider's request list.
- Record every message it reads; once the spider has finished processing a message, return an ack to the MQ to mark the message as successfully consumed.
- Read the number of messages remaining in the queue, which serves as one of the conditions for shutting the spider down.
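Since the Consumer code was omitted, here is a minimal sketch of the three operations JcSpider relies on (getQueueMsgNum, getDeliveryMessage, ackMessage), assuming the same QueueingConsumer-based RabbitMQ client that JcSpider imports; the constructor and field layout are assumptions.

```java
package com.chinaredstar.jc.crawler.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;

/**
 * Minimal sketch of the MQ consumer wrapper; the real class was omitted
 * from the post, so everything beyond the three method signatures used
 * by JcSpider is an assumption.
 */
public class Consumer {

    private final Channel channel;
    private QueueingConsumer queueingConsumer;

    public Consumer(Channel channel) {
        this.channel = channel;
    }

    /** Number of messages still waiting in the queue. */
    public int getQueueMsgNum(String queueName) throws IOException {
        return channel.queueDeclarePassive(queueName).getMessageCount();
    }

    /** Block until the next message is delivered (manual-ack mode). */
    public QueueingConsumer.Delivery getDeliveryMessage(String queueName)
            throws IOException, InterruptedException {
        if (queueingConsumer == null) {
            queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, false, queueingConsumer);
        }
        return queueingConsumer.nextDelivery();
    }

    /** Confirm a processed message so the broker can discard it. */
    public void ackMessage(QueueingConsumer.Delivery delivery) throws IOException {
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}
```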
We also give each spider a parent spider. Its purpose:
- Together with the consumer's remaining-message count, it decides when to shut the spider down: if the parent spider does not exist or has already stopped, the current spider has finished its work, and the queue has no messages left, then the current spider can stop.
Depending on its level, each spider is given either an MqPipeline or an EsPipeline, which writes the processed data into the MQ or into ES.
The MqPipeline code:
```java
package com.chinaredstar.jc.core.pipeline;

import com.chinaredstar.jc.core.page.CrawlerPage;
import com.chinaredstar.jc.core.util.RedisUtil;
import com.chinaredstar.jc.crawler.producer.Producer;
import com.chinaredstar.jc.infras.utils.JSONUtil;
import org.apache.commons.collections4.CollectionUtils;
import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.pipeline.Pipeline;

import java.io.IOException;
import java.util.List;

/**
 * Message queue pipeline.
 *
 * @author lenovo
 * @date 2017/12/1
 */
public class MqPipeline implements Pipeline {

    private Producer producer;

    public MqPipeline(Producer producer) {
        this.producer = producer;
    }

    public Producer getProducer() {
        return producer;
    }

    public void setProducer(Producer producer) {
        this.producer = producer;
    }

    @Override
    public void process(ResultItems resultItems, Task task) {
        try {
            List<CrawlerPage> crawlerPageList = resultItems.get("nextPageList");
            if (CollectionUtils.isEmpty(crawlerPageList)) {
                return;
            }
            String key = task.getUUID();
            for (CrawlerPage page : crawlerPageList) {
                // skip URLs that have already been crawled
                String url = page.getCurrentUrl();
                if (RedisUtil.sContain(key, url)) {
                    continue;
                }
                producer.basicPublish(JSONUtil.toJSonString(page));
                RedisUtil.sAdd(key, url);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```
The EsPipeline code:
```java
package com.chinaredstar.jc.core.pipeline;

import com.chinaredstar.jc.core.es.EsConnectionPool;
import com.chinaredstar.jc.infras.utils.json.JsonFormatter;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;
import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.pipeline.Pipeline;

import java.io.IOException;
import java.net.UnknownHostException;

/**
 * Created by zhuangj on 2017/12/1.
 */
public class EsPipeline implements Pipeline {

    private String taskName;

    private EsConnectionPool pool = new EsConnectionPool(3);

    public EsPipeline(String taskName) {
        this.taskName = taskName;
    }

    @Override
    public void process(ResultItems resultItems, Task task) {
        try {
            TransportClient client = pool.get();
            this.createIndex(client, taskName);
            this.insertData(client, taskName, "crawler", null,
                    JsonFormatter.toJsonAsString(resultItems.getAll()));
            pool.returnToPool(client);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Create the index if it does not exist yet.
     */
    private boolean createIndex(TransportClient client, String index) {
        try {
            if (isIndexExist(client, index)) {
                return true;
            }
            client.admin().indices().create(new CreateIndexRequest(index)).actionGet();
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Check whether the index exists.
     */
    private boolean isIndexExist(TransportClient client, String index)
            throws UnknownHostException, InterruptedException {
        IndicesExistsRequest request = new IndicesExistsRequest(index);
        IndicesExistsResponse response = client.admin().indices().exists(request).actionGet();
        return response.isExists();
    }

    /**
     * Insert a document; BulkRequestBuilder can batch multiple JSON documents.
     */
    private BulkResponse insertData(TransportClient client, String index, String type,
                                    String id, String data) throws IOException, InterruptedException {
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        IndexRequestBuilder requestBuilder =
                client.prepareIndex(index, type, id).setSource(data, XContentType.JSON);
        bulkRequest.add(requestBuilder);
        // execute the bulk insert against ES
        return bulkRequest.execute().actionGet();
    }
}
```
The EsPipeline above creates an EsConnectionPool, reusing pooled ES connections to speed up writes into ES.
(I later found that the ES client handles requests asynchronously and is internally multithreaded, so pooling is unnecessary here; a singleton client is enough, as in the sketch below.)
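Along those lines, a singleton client might look like the following sketch, assuming the ES 5.x transport client that the EsPipeline imports suggest; the cluster name, host, and port are placeholders.

```java
package com.chinaredstar.jc.core.es;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Minimal sketch of a singleton TransportClient; the transport client is
 * thread-safe and handles requests asynchronously, so one shared instance
 * can replace the connection pool.
 */
public final class EsClientHolder {

    private static volatile TransportClient client;

    private EsClientHolder() {
    }

    public static TransportClient get() throws UnknownHostException {
        if (client == null) {
            synchronized (EsClientHolder.class) {
                if (client == null) {
                    Settings settings = Settings.builder()
                            .put("cluster.name", "elasticsearch") // placeholder
                            .build();
                    client = new PreBuiltTransportClient(settings)
                            .addTransportAddress(new InetSocketTransportAddress(
                                    InetAddress.getByName("localhost"), 9300));
                }
            }
        }
        return client;
    }
}
```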
In the business code we create the spiders, recursively creating each child spider:
```java
package com.chinaredstar.jc.crawler.biz.service.impl;

import com.chinaredstar.jc.core.downloader.JcHttpClientDownloader;
import com.chinaredstar.jc.core.page.CrawlerPage;
import com.chinaredstar.jc.core.pipeline.EsPipeline;
import com.chinaredstar.jc.core.pipeline.MqPipeline;
import com.chinaredstar.jc.core.processor.jd.JdProcessorLevelEnum;
import com.chinaredstar.jc.core.spider.JcSpider;
import com.chinaredstar.jc.crawler.biz.service.IJdService;
import com.chinaredstar.jc.crawler.channel.MqChannel;
import com.chinaredstar.jc.crawler.common.MqConnectionFactory;
import com.chinaredstar.jc.crawler.consumer.Consumer;
import com.chinaredstar.jc.crawler.exchange.DefaultExchange;
import com.chinaredstar.jc.crawler.exchange.Exchange;
import com.chinaredstar.jc.crawler.producer.Producer;
import org.springframework.stereotype.Service;
import us.codecraft.webmagic.processor.PageProcessor;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;

/**
 * JD data crawling service.
 *
 * @author zhuangj
 * @date 2017/11/29
 */
@Service
public class JdServiceImpl implements IJdService {

    @Override
    public List<CrawlerPage> startSpider(String url, String taskName, Integer maxLevel)
            throws IOException, TimeoutException, InterruptedException {
        for (int level = 1; level < maxLevel; level++) {
            createJcSpider(url, taskName, maxLevel, level, null);
        }
        return null;
    }

    @Override
    public List<CrawlerPage> startSpider(String url, String taskName, Integer level, Integer maxLevel)
            throws IOException, TimeoutException, InterruptedException {
        createJcSpider(url, taskName, maxLevel, level, null);
        return null;
    }

    private void createJcSpider(String url, String taskName, Integer maxLevel, Integer level,
                                JcSpider parentSpider) throws IOException, TimeoutException, InterruptedException {
        PageProcessor pageProcessor = JdProcessorLevelEnum.getProcessorByLevel(level);
        JcSpider jcSpider = new JcSpider(pageProcessor);
        jcSpider.setUUID(taskName + level);
        jcSpider.setDownloader(new JcHttpClientDownloader());
        jcSpider.setParentSpider(parentSpider);

        String producerQueueName = taskName + level;
        String consumerQueueName = taskName + (level - 1);
        MqChannel mqChannelProducer = createMqChannel(producerQueueName);
        Producer producer = createProducer(mqChannelProducer);
        MqChannel mqChannelConsumer = createMqChannel(consumerQueueName);
        Consumer consumer = createConsumer(mqChannelConsumer);
        jcSpider.setConsumer(consumer);
        jcSpider.setConsumerQueueName(consumerQueueName);

        // the last level writes straight to ES, so it needs no MQ producer
        if (level < maxLevel) {
            jcSpider.addPipeline(new MqPipeline(producer));
        } else {
            jcSpider.addPipeline(new EsPipeline(taskName));
        }

        // the first level does not read from the MQ, so it needs no consumer
        if (level == 1) {
            jcSpider.addUrl(url);
            jcSpider.setConsumer(null);
        }

        jcSpider.thread(10).start();
        jcSpider.startConsumer(consumerQueueName);

        // create the child spider
        if (level < maxLevel) {
            // give the parent spider and its consumer a moment to start
            Thread.sleep(2000);
            createJcSpider(url, taskName, maxLevel, level + 1, jcSpider);
        }
    }

    /**
     * Create a producer. (The original javadoc said "consumer" here; fixed.)
     */
    private Producer createProducer(MqChannel mqChannel) throws IOException, TimeoutException {
        return mqChannel.createProducer();
    }

    /**
     * Create a consumer.
     */
    private Consumer createConsumer(MqChannel mqChannel) throws IOException, TimeoutException {
        return mqChannel.createConsumer();
    }

    /**
     * Create a connection channel bound to the given queue.
     */
    private MqChannel createMqChannel(String queueName) throws IOException, TimeoutException {
        MqChannel mqChannel = MqConnectionFactory.getConnectionChannel();
        Exchange exchange = new DefaultExchange(mqChannel.getChannel(), queueName);
        mqChannel.setExchange(exchange);
        return mqChannel;
    }
}
```
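For reference, kicking off the whole chain might look like the hypothetical entry point below; the URL, task name, and the level range 1 to 4 (matching the four spiders above) are placeholders, not values from the original project.

```java
import com.chinaredstar.jc.crawler.biz.service.IJdService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Hypothetical entry point; the URL, task name, and level range
 * are placeholders.
 */
@Component
public class JdCrawlerRunner {

    @Autowired
    private IJdService jdService;

    public void run() throws Exception {
        // start at level 1; createJcSpider recursively spins up levels 2..4
        jdService.startSpider("https://www.jd.com/", "jd_jiancai", 1, 4);
    }
}
```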
Finally, start the project and run it end to end:
RabbitMQ created the corresponding queues and messages flowed through them; the unacked issue has not yet been resolved.
The data was stored into ES:
Since the analysis was relatively simple, only part of the data was extracted.
Any corrections are welcome.