一個簡單的多線程爬蟲


 

   本文介紹一個簡單的多線程並發爬蟲,這里說的簡單是指爬取的數據規模不大,單機運行,並且不使用數據庫,但保證多線程下的數據的一致性,並且能讓爬得正起勁的爬蟲停下來,而且能保存爬取狀態以備下次繼續。

  爬蟲實現的步驟基本如下: 

  • 分析網頁結構,選取自己感興趣的部分;
  • 建立兩個Buffer,一個用於保存已經訪問的URL,一個用戶保存帶訪問的URL;
  • 從待訪問的Buffer中取出一個URL來爬取,保存這個URL中感興趣的信息;並將這個URL加入已經訪問的Buffer中,然后將這個URL中的所有外鏈URLs中沒有被訪問過的URL加到待訪問Buffer;
  • 只要待訪問的Buffer不為空,重復上一步。

  這次是為了給博客園的用戶進行一次pagerank排名,爬取了博客園各個用戶的粉絲與關注者。博客園的用戶用17萬多個。爬取的頁面是http://home.cnblogs.com/u/+userId,每個用戶的url只需要用用戶的id表示就可以了,用戶id按平均10B來計算,保存所有用戶也只需1.7Mb內存。因此我把兩個Buffer都放在內存中。

  這個項目的就四個java文件,結構如下:  

  下面對整個爬蟲的實現過程進行詳細的介紹。

 

一、登錄

  要獲取用戶的粉絲與關注,必須先登錄,博客園的模擬登陸算是比較簡單,找到登錄時要上傳的參數,然后Pos發送即登錄成功,可以使用Chrome的工具,打開登錄頁面,調好賬號和密碼后,按F12彈出工具,按登錄就能看到要傳的參數了,再POST一個這些參數就好了。  

  我之前是使用這樣的方式,后來用使用Jsoup解析的參數,代碼實現如下:

 1   /**
 2      * 使用Joup解析登錄參數,然后POST發送參數實現登錄
 3      * 
 4      * @throws UnsupportedEncodingException
 5      * @throws IOException
 6      */
 7     private static void login() throws UnsupportedEncodingException,
 8             IOException {
 9         CookieHandler.setDefault(new CookieManager());
10         // 獲取登錄頁面
11         String page = getPage(LOGIN_URL);
12         // 從登錄去取出參數,並填充賬號和密碼
13         Document doc = Jsoup.parse(page);
14         // 取登錄表格
15         Element loginform = doc.getElementById("frmLogin");
16         Elements inputElements = loginform.getElementsByTag("input");
17         List<String> paramList = new ArrayList<String>();
18         for (Element inputElement : inputElements) {
19             String key = inputElement.attr("name");
20             String value = inputElement.attr("value");
21             if (key.equals("tbUserName"))
22                 value = Test.Name;
23             else if (key.equals("tbPassword"))
24                 value = Test.passwd;
25             paramList.add(key + "=" + URLEncoder.encode(value, "UTF-8"));
26         }
27         // 封裝請求參數
28         StringBuilder para = new StringBuilder();
29         for (String param : paramList) {
30             if (para.length() == 0) {
31                 para.append(param);
32             } else {
33                 para.append("&" + param);
34             }
35         }
36         // POST發送登錄
37         String result = sendPost(LOGIN_URL, para.toString());
38         if (!result.contains("followees")) {
39             cookies = null;
40             System.out.println("登錄失敗");
41         } else
42             System.out.println("登錄成功");
43     }

 

二、獲取粉絲與關注

  登錄成功就可以爬取粉絲和關注了,關注在http://home.cnblogs.com/u/userid/followees/鏈接中,而粉絲在http://home.cnblogs.com/u/userid/followers/,兩個網頁結構基本相同,只需要把選擇一下followees(被關注者)和(followers)關注者,用Jsoup解析avatar_list中avatar_name就好了,代碼如下:

 1   /**
 2      * 獲取一頁中的關注or粉絲
 3      * 
 4      * @param pageHtml
 5      * @return
 6      */
 7 
 8     private List<String> getOnePageFriends(Document doc) {
 9         List<String> firends = new ArrayList<String>();
10         Elements inputElements = doc.getElementsByClass("avatar_name");
11         for (Element inputElement : inputElements) {
12             Elements links = inputElement.getElementsByTag("a");
13             for (Element link : links) {
14                 //從href中解析出用戶id
15                 String href = link.attr("href");
16                 firends.add(href.substring(3, href.length() - 1));
17             }
18         }
19         return firends;
20     }

  每一頁顯示50個粉絲or關注者,需要分頁爬取,獲取下一頁跟獲取用戶粉絲差不多,找到元素就好。

 

三、爬取單個用戶

  爬取的一個用戶的過程就是先分頁爬取粉絲,再爬取關注者,然后把爬過的用戶放入訪問Buffer中,再把爬到的用戶放到未訪問隊列中。

 1     @Override
 2     public void run() {        
 3         while (stop.get() == false) {
 4             // 取出一個待訪問
 5             String userId = mUserBuffer.pickOne();            
 6             try {
 7                 // 爬取粉絲
 8                 List<String> fans = crawUser(userId, "/followers");
 9                 // 爬取關注者
10                 List<String> heros = crawUser(userId, "/followees");
11                 // 只需要保持粉絲關系即可
12                 StringBuilder sb = new StringBuilder(userId).append("\t");
13                 for (String friend : fans) {
14                     sb.append(friend).append("\t");
15                 }
16                 sb.deleteCharAt(sb.length() - 1).append("\n");
17                 saver.save(sb.toString());
18                 // 被關注者應該放進隊列里面,以供下次爬取他的粉絲
19                 fans.addAll(heros);
20                 mUserBuffer.addUnCrawedUsers(fans);
21             } catch (Exception e) {
22                 saver.log(e.getMessage());
23                 // 訪問錯誤時,放入訪問出錯的隊列中,以備以后重新訪問。
24                 mUserBuffer.addErrorUser(userId);
25             }
26         }    

  一頁一頁爬取單個用戶如下:

 1       /**
 2      * 爬取用戶,根據tag來決定是爬該用戶關注的人,還是該用戶的粉絲
 3      * 
 4      * @param userId
 5      * @return
 6      * @throws IOException
 7      */
 8     private List<String> crawUser(String userId, String tag) throws IOException {
 9         //構造URL
10         StringBuilder urlBuilder = new StringBuilder(USER_HOME);
11         urlBuilder.append("/u/").append(userId).append(tag);
12         //請求頁面
13         String page = getPage(urlBuilder.toString());
14         Document doc = Jsoup.parse(page);
15         List<String> friends = new ArrayList<String>();
16         //爬取第一頁
17         friends.addAll(getOnePageFriends(doc));
18         String nextUrl = null;
19         //不斷地爬取下一頁
20         while ((nextUrl = getNextUrl(doc)) != null) {
21             page = getPage(nextUrl);
22             doc = Jsoup.parse(page);
23             friends.addAll(getOnePageFriends(doc));
24         }
25         return friends;
26     }

   整個爬蟲結構就是這樣了:

  1 public class UserCrawler implements Runnable {
  2     // 停止任務標志
  3     private static AtomicBoolean stop;
  4     // 當前爬蟲的id
  5     private int id;
  6     // 用戶緩存
  7     private UserBuffer mUserBuffer;
  8     // 日志與粉絲保存工具
  9     private Saver saver;
 10 
 11     static {
 12         stop = new AtomicBoolean(false);
 13         try {
 14             // 登錄一次即可
 15             login();
 16             // 保存數據線程先啟動
 17             Saver.getInstance().start();
 18         } catch (IOException e) {
 19             e.printStackTrace();
 20         }
 21         // new Thread(new CommandListener()).start();
 22     }
 23 
 24     public UserCrawler(UserBuffer userBuffer) {
 25         mUserBuffer = userBuffer;
 26         mUserBuffer.crawlerCountIncrease();
 27         id = c++;
 28         saver = Saver.getInstance();
 29     }
 30 
 31     @Override
 32     public void run() {
 33         if (id > 0) {
 34             // 等第一個線程啟動一段時候再開始新的線程
 35             try {
 36                 TimeUnit.SECONDS.sleep(20 + id);
 37             } catch (InterruptedException e) {
 38                 e.printStackTrace();
 39             }
 40         }
 41         System.out.println("UserCrawler " + id + " start");
 42         int retry = 3;// 重置嘗試次數
 43         while (stop.get() == false) {
 44             // 取出一個待訪問
 45             String userId = mUserBuffer.pickOne();
 46             if (userId == null) {// 隊列元素已經為空
 47                 retry--;// 重試3次
 48                 if (retry <= 0)
 49                     break;
 50                  continue;
 51             }
 52             ...//爬取用戶
 53         }
 54         System.out.println("UserCrawler " + id + " stop");
 55         // 當前線程停止了
 56         mUserBuffer.crawlerCountDecrease();
 57     }
 58     
 59     private List<String> crawUser(String userId, String tag) throws IOException {
 60         
 61     }
 62 
 63     /**
 64      * 獲取一頁中的關注or粉絲
 65      * 
 66      * @param pageHtml
 67      * @return
 68      */
 69 
 70     private List<String> getOnePageFriends(Document doc) {
 71         ...
 72     }
 73 
 74     /**
 75      * 獲取下一頁的地址
 76      * 
 77      * @param doc
 78      * @return
 79      */
 80     private String getNextUrl(Document doc) {
 81         
 82     }
 83 
 84     private static String getPage(String pageUrl) throws IOException {
 85         
 86     }
 87 
 88     /***
 89      * 終止所有爬蟲任務
 90      */
 91     public static void stop() {
 92         System.out.println("正在終止...");
 93         stop.compareAndSet(false, true);
 94         UserBuffer.getInstance().prepareForStop();
 95     }
 96 
 97     private static void login() throws UnsupportedEncodingException,
 98             IOException {
 99         ...
100     }
101 
102 
103     private static String sendPost(String url, String postParams)
104             throws IOException {
105         ...
106 
107 }

 

四、Buffer並發控制

  在用戶Buffer設置一個已經訪問的用戶集合、一個訪問出錯的用戶集合和一個待訪問的隊列。

1     private UserBuffer() {
2         crawedUsers = new HashSet<String>();// 已經訪問的用戶,包括訪問成功和訪問出錯的用戶
3         errorUsers = new HashSet<String>();// 訪問出錯的用戶
4         unCrawedUsers = new LinkedList<String>();// 未訪問的用戶
5     }

  UserBuffer全局唯一,因此采用單例模式,使用的集合和隊列都不是線程安全的數據結構,我認為沒有必要使用線程安全的ConcurrentSkipListSet與ConcurrentLinkedQueue,因為將一個用戶插入unCrawedUsers隊列時需要先判斷是否已經存在於crawedUsers用戶集合中了,這需要用鎖來同步訪問,如果不使用鎖來控制,單個線程安全的set和queque不能保證多個變量之間的test-try有效性。而使用了鎖后,再使用線程安全的數據結構只會增加加鎖和解鎖的次數,反而降低了性能。

 1     /***
 2      * 添加未訪問的用戶 
 3      * 
 4      * @param users
 5      * @return
 6      */
 7     public synchronized void addUnCrawedUsers(List<String> users) {
 8         // 添加未訪問的用戶
 9         for (String user : users) {
10             if (!crawedUsers.contains(user))
11                 unCrawedUsers.add(user);
12         }
13     }
14 
15     /**
16      * 從隊列中取一個元素,並把這個元素添加到已經訪問的集合中,以免重復訪問。
17      * 
18      * 
19      * @return
20      */
21     public synchronized String pickOne() {
22         String newId = unCrawedUsers.poll();
23         // 隊列中可能包含重復的id,因為插入隊列時只檢查是否在訪問集合里,
24         // 沒有檢查是否已經出現在隊列里
25         while (crawedUsers.contains(newId)) {
26             newId = unCrawedUsers.poll();
27         }
28         //訪問前先把添加到已經訪問的集合中
29         crawedUsers.add(newId);
30         return newId;
31     }
32 
33     /**
34      * 添加訪問出錯的用戶
35      * 
36      * @param userId
37      */
38     public synchronized void addErrorUser(String userId) {
39         errorUsers.add(userId);
40     }

 

五、安全地終止爬蟲

  大丈夫能伸能屈,能走能停,爬蟲也當如此。爬博客園的用戶時,我真是提心吊膽,擔心管理員封我的賬號,我盡量挑凌晨的時間爬數據,另外就是爬了一會后,我就停下來,過一段時間再爬。要讓一個多線程的程序平穩的停下來還真不簡單,最擔心的就是死鎖,發送停止命令后,線程不動卻也沒有終止,爬了好久的Buffer空間沒有保存,那個恨啊。

  我的思路:

  1、爬蟲啟動時,向Buffer注冊一下,buffer記錄啟動的爬蟲數量;

  2、對爬蟲設置一個全局的標志,爬蟲在每次爬取一個用戶前檢查終止標志是否被設置;

  3、當發生停止命令時,爬蟲檢查到停止標志被設置,於是通知Buffer自己將要停止,通知完后就結束運行;

  4、Buffer收到爬蟲停止通知后,將爬蟲計數器減1,當計數器為0時,保存工作空間,同時通知關閉日志;

  5、為了避免有些爬蟲在運行異常時推出而沒有通知Buffer,在發出停止命令時,同時通知Buffer准備停止,Buffer設置一個計時器,2分鍾后,強制保存爬蟲狀態和日志。

 

 

  完整的代碼還是放在Github上,有興趣的同學可以看看。 

  感謝閱讀,轉載請注明出處:http://www.cnblogs.com/fengfenggirl/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM