Java 多線程爬蟲及分布式爬蟲架構探索


這是 Java 爬蟲系列博文的第五篇,在上一篇 Java 爬蟲服務器被屏蔽,不要慌,咱們換一台服務器 中,我們簡單的聊反爬蟲策略和反反爬蟲方法,主要針對的是 IP 被封及其對應辦法。前面幾篇文章我們把爬蟲相關的基本知識都講的差不多啦。這一篇我們來聊一聊爬蟲架構相關的內容。

前面幾章內容我們的爬蟲程序都是單線程,在我們調試爬蟲程序的時候,單線程爬蟲沒什么問題,但是當我們在線上環境使用單線程爬蟲程序去采集網頁時,單線程就暴露出了兩個致命的問題:

  • 采集效率特別慢,單線程之間都是串行的,下一個執行動作需要等上一個執行完才能執行
  • 對服務器的CUP等利用率不高,想想我們的服務器都是 8核16G,32G 的只跑一個線程會不會太浪費啦

線上環境不可能像我們本地測試一樣,不在乎采集效率,只要能正確提取結果就行。在這個時間就是金錢的年代,不可能給你時間去慢慢的采集,所以單線程爬蟲程序是行不通的,我們需要將單線程改成多線程的模式,來提升采集效率和提高計算機利用率。

多線程的爬蟲程序設計比單線程就要復雜很多,但是與其他業務在高並發下要保證數據安全又不同,多線程爬蟲在數據安全上到要求不是那么的高,因為每個頁面都可以被看作是一個獨立體。要做好多線程爬蟲就必須做好兩點:第一點就是統一的待采集 URL 維護,第二點就是 URL 的去重, 下面我們簡單的來聊一聊這兩點。

維護待采集的 URL

多線程爬蟲程序就不能像單線程那樣,每個線程獨自維護這自己的待采集 URL,如果這樣的話,那么每個線程采集的網頁將是一樣的,你這就不是多線程采集啦,你這是將一個頁面采集的多次。基於這個原因我們就需要將待采集的 URL 統一維護,每個線程從統一 URL 維護處領取采集 URL ,完成采集任務,如果在頁面上發現新的 URL 鏈接則添加到 統一 URL 維護的容器中。下面是幾種適合用作統一 URL 維護的容器:

  • JDK 的安全隊列,例如 LinkedBlockingQueue
  • 高性能的 NoSQL,比如 Redis、Mongodb
  • MQ 消息中間件

URL 的去重

URL 的去重也是多線程采集的關鍵一步,因為如果不去重的話,那么我們將采集到大量重復的 URL,這樣並沒有提升我們的采集效率,比如一個分頁的新聞列表,我們在采集第一頁的時候可以得到 2、3、4、5 頁的鏈接,在采集第二頁的時候又會得到 1、3、4、5 頁的鏈接,待采集的 URL 隊列中將存在大量的列表頁鏈接,這樣就會重復采集甚至進入到一個死循環當中,所以就需要 URL 去重。URL 去重的方法就非常多啦,下面是幾種常用的 URL 去重方式:

  • 將 URL 保存到數據庫進行去重,比如 redis、MongoDB
  • 將 URL 放到哈希表中去重,例如 hashset
  • 將 URL 經過 MD5 之后保存到哈希表中去重,相比於上面一種,能夠節約空間
  • 使用 布隆過濾器(Bloom Filter)去重,這種方式能夠節約大量的空間,就是不那么准確。

關於多線程爬蟲的兩個核心知識點我們都知道啦,下面我畫了一個簡單的多線程爬蟲架構圖,如下圖所示:

多線程爬蟲架構圖

上面我們主要了解了多線程爬蟲的架構設計,接下來我們不妨來試試 Java 多線程爬蟲,我們以采集虎撲新聞為例來實戰一下 Java 多線程爬蟲,Java 多線程爬蟲中設計到了 待采集 URL 的維護和 URL 去重,由於我們這里只是演示,所以我們就使用 JDK 內置的容器來完成,我們使用 LinkedBlockingQueue 作為待采集 URL 維護容器,HashSet 作為 URL 去重容器。下面是 Java 多線程爬蟲核心代碼,詳細代碼以上傳 GitHub,地址在文末:

/**
 * 多線程爬蟲
 */
public class ThreadCrawler implements Runnable {
    // 采集的文章數
    private final AtomicLong pageCount = new AtomicLong(0);
    // 列表頁鏈接正則表達式
    public static final String URL_LIST = "https://voice.hupu.com/nba";
    protected Logger logger = LoggerFactory.getLogger(getClass());
    // 待采集的隊列
    LinkedBlockingQueue<String> taskQueue;
    // 采集過的鏈接列表
    HashSet<String> visited;
    // 線程池
    CountableThreadPool threadPool;
    /**
     *
     * @param url 起始頁
     * @param threadNum 線程數
     * @throws InterruptedException
     */
    public ThreadCrawler(String url, int threadNum) throws InterruptedException {
        this.taskQueue = new LinkedBlockingQueue<>();
        this.threadPool = new CountableThreadPool(threadNum);
        this.visited = new HashSet<>();
        // 將起始頁添加到待采集隊列中
        this.taskQueue.put(url);
    }

    @Override
    public void run() {
        logger.info("Spider started!");
        while (!Thread.currentThread().isInterrupted()) {
	        // 從隊列中獲取待采集 URL
            final String request = taskQueue.poll();
            // 如果獲取 request 為空,並且當前的線程采已經沒有線程在運行
            if (request == null) {
                if (threadPool.getThreadAlive() == 0) {
                    break;
                }
            } else {
                // 執行采集任務
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            processRequest(request);
                        } catch (Exception e) {
                            logger.error("process request " + request + " error", e);
                        } finally {
                            // 采集頁面 +1
                            pageCount.incrementAndGet();
                        }
                    }
                });
            }
        }
        threadPool.shutdown();
        logger.info("Spider closed! {} pages downloaded.", pageCount.get());
    }

    /**
     * 處理采集請求
     * @param url
     */
    protected void processRequest(String url) {
        // 判斷是否為列表頁
        if (url.matches(URL_LIST)) {
	        // 列表頁解析出詳情頁鏈接添加到待采集URL隊列中
            processTaskQueue(url);
        } else {
	        // 解析網頁
            processPage(url);
        }
    }
    /**
     * 處理鏈接采集
     * 處理列表頁,將 url 添加到隊列中
     *
     * @param url
     */
    protected void processTaskQueue(String url) {
        try {
            Document doc = Jsoup.connect(url).get();
            // 詳情頁鏈接
            Elements elements = doc.select(" div.news-list > ul > li > div.list-hd > h4 > a");
            elements.stream().forEach((element -> {
                String request = element.attr("href");
                // 判斷該鏈接是否存在隊列或者已采集的 set 中,不存在則添加到隊列中
                if (!visited.contains(request) && !taskQueue.contains(request)) {
                    try {
                        taskQueue.put(request);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }));
            // 列表頁鏈接
            Elements list_urls = doc.select("div.voice-paging > a");
            list_urls.stream().forEach((element -> {
                String request = element.absUrl("href");
                // 判斷是否符合要提取的列表鏈接要求
                if (request.matches(URL_LIST)) {
                    // 判斷該鏈接是否存在隊列或者已采集的 set 中,不存在則添加到隊列中
                    if (!visited.contains(request) && !taskQueue.contains(request)) {
                        try {
                            taskQueue.put(request);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }));

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 解析頁面
     *
     * @param url
     */
    protected void processPage(String url) {
        try {
            Document doc = Jsoup.connect(url).get();
            String title = doc.select("body > div.hp-wrap > div.voice-main > div.artical-title > h1").first().ownText();

            System.out.println(Thread.currentThread().getName() + " 在 " + new Date() + " 采集了虎撲新聞 " + title);
            // 將采集完的 url 存入到已經采集的 set 中
            visited.add(url);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        try {
            new ThreadCrawler("https://voice.hupu.com/nba", 5).run();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

我們用 5 個線程去采集虎撲新聞列表頁看看效果如果?運行該程序,得到如下結果:

多線程采集結果

結果中可以看出,我們啟動了 5 個線程采集了 61 頁頁面,一共耗時 2 秒鍾,可以說效果還是不錯的,我們來跟單線程對比一下,看看差距有多大?我們將線程數設置為 1 ,再次啟動程序,得到如下結果:

單線程運行結果

可以看出單線程采集虎撲 61 條新聞花費了 7 秒鍾,耗時差不多是多線程的 4 倍,你想想這可只是 61 個頁面,頁面更多的話,差距會越來越大,所以多線程爬蟲效率還是非常高的。

分布式爬蟲架構

分布式爬蟲架構是一個大型采集程序才需要使用的架構,一般情況下使用單機多線程就可以解決業務需求,反正我是沒有分布式爬蟲項目的經驗,所以這一塊我也沒什么可以講的,但是我們作為技術人員,我們需要對技術保存熱度,雖然不用,但是了解了解也無妨,我查閱了不少資料得出了如下結論:

分布式爬蟲架構跟我們多線程爬蟲架構在思路上來說是一樣的,我們只需要在多線程的基礎上稍加改進就可以變成一個簡單的分布式爬蟲架構。因為分布式爬蟲架構中爬蟲程序部署在不同的機器上,所以我們待采集的 URL 和 采集過的 URL 就不能存放在爬蟲程序機器的內存中啦,我們需要將它統一在某台機器上維護啦,比如存放在 Redis 或者 MongoDB 中,每台機器都從這上面獲取采集鏈接,而不是從 LinkedBlockingQueue 這樣的內存隊列中取鏈接啦,這樣一個簡單的分布式爬蟲架構就出現了,當然這里面還會有很多細節問題,因為我沒有分布式架構的經驗,我也無從說起,如果你有興趣的話,歡迎交流。

源代碼:源代碼

文章不足之處,望大家多多指點,共同學習,共同進步

最后

打個小廣告,歡迎掃碼關注微信公眾號:「平頭哥的技術博文」,一起進步吧。
平頭哥的技術博文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM