本文介紹一個簡單的多線程並發爬蟲,這里說的簡單是指爬取的數據規模不大,單機運行,並且不使用數據庫,但保證多線程下的數據的一致性,並且能讓爬得正起勁的爬蟲停下來,而且能保存爬取狀態以備下次繼續。
爬蟲實現的步驟基本如下:
- 分析網頁結構,選取自己感興趣的部分;
- 建立兩個Buffer,一個用於保存已經訪問的URL,一個用戶保存帶訪問的URL;
- 從待訪問的Buffer中取出一個URL來爬取,保存這個URL中感興趣的信息;並將這個URL加入已經訪問的Buffer中,然后將這個URL中的所有外鏈URLs中沒有被訪問過的URL加到待訪問Buffer;
- 只要待訪問的Buffer不為空,重復上一步。
這次是為了給博客園的用戶進行一次pagerank排名,爬取了博客園各個用戶的粉絲與關注者。博客園的用戶用17萬多個。爬取的頁面是http://home.cnblogs.com/u/+userId,每個用戶的url只需要用用戶的id表示就可以了,用戶id按平均10B來計算,保存所有用戶也只需1.7Mb內存。因此我把兩個Buffer都放在內存中。
這個項目的就四個java文件,結構如下:
下面對整個爬蟲的實現過程進行詳細的介紹。
一、登錄
要獲取用戶的粉絲與關注,必須先登錄,博客園的模擬登陸算是比較簡單,找到登錄時要上傳的參數,然后Pos發送即登錄成功,可以使用Chrome的工具,打開登錄頁面,調好賬號和密碼后,按F12彈出工具,按登錄就能看到要傳的參數了,再POST一個這些參數就好了。
我之前是使用這樣的方式,后來用使用Jsoup解析的參數,代碼實現如下:
1 /** 2 * 使用Joup解析登錄參數,然后POST發送參數實現登錄 3 * 4 * @throws UnsupportedEncodingException 5 * @throws IOException 6 */ 7 private static void login() throws UnsupportedEncodingException, 8 IOException { 9 CookieHandler.setDefault(new CookieManager()); 10 // 獲取登錄頁面 11 String page = getPage(LOGIN_URL); 12 // 從登錄去取出參數,並填充賬號和密碼 13 Document doc = Jsoup.parse(page); 14 // 取登錄表格 15 Element loginform = doc.getElementById("frmLogin"); 16 Elements inputElements = loginform.getElementsByTag("input"); 17 List<String> paramList = new ArrayList<String>(); 18 for (Element inputElement : inputElements) { 19 String key = inputElement.attr("name"); 20 String value = inputElement.attr("value"); 21 if (key.equals("tbUserName")) 22 value = Test.Name; 23 else if (key.equals("tbPassword")) 24 value = Test.passwd; 25 paramList.add(key + "=" + URLEncoder.encode(value, "UTF-8")); 26 } 27 // 封裝請求參數 28 StringBuilder para = new StringBuilder(); 29 for (String param : paramList) { 30 if (para.length() == 0) { 31 para.append(param); 32 } else { 33 para.append("&" + param); 34 } 35 } 36 // POST發送登錄 37 String result = sendPost(LOGIN_URL, para.toString()); 38 if (!result.contains("followees")) { 39 cookies = null; 40 System.out.println("登錄失敗"); 41 } else 42 System.out.println("登錄成功"); 43 }
二、獲取粉絲與關注
登錄成功就可以爬取粉絲和關注了,關注在http://home.cnblogs.com/u/userid/followees/鏈接中,而粉絲在http://home.cnblogs.com/u/userid/followers/,兩個網頁結構基本相同,只需要把選擇一下followees(被關注者)和(followers)關注者,用Jsoup解析avatar_list中avatar_name就好了,代碼如下:
1 /** 2 * 獲取一頁中的關注or粉絲 3 * 4 * @param pageHtml 5 * @return 6 */ 7 8 private List<String> getOnePageFriends(Document doc) { 9 List<String> firends = new ArrayList<String>(); 10 Elements inputElements = doc.getElementsByClass("avatar_name"); 11 for (Element inputElement : inputElements) { 12 Elements links = inputElement.getElementsByTag("a"); 13 for (Element link : links) { 14 //從href中解析出用戶id 15 String href = link.attr("href"); 16 firends.add(href.substring(3, href.length() - 1)); 17 } 18 } 19 return firends; 20 }
每一頁顯示50個粉絲or關注者,需要分頁爬取,獲取下一頁跟獲取用戶粉絲差不多,找到元素就好。
三、爬取單個用戶
爬取的一個用戶的過程就是先分頁爬取粉絲,再爬取關注者,然后把爬過的用戶放入訪問Buffer中,再把爬到的用戶放到未訪問隊列中。
1 @Override 2 public void run() { 3 while (stop.get() == false) { 4 // 取出一個待訪問 5 String userId = mUserBuffer.pickOne(); 6 try { 7 // 爬取粉絲 8 List<String> fans = crawUser(userId, "/followers"); 9 // 爬取關注者 10 List<String> heros = crawUser(userId, "/followees"); 11 // 只需要保持粉絲關系即可 12 StringBuilder sb = new StringBuilder(userId).append("\t"); 13 for (String friend : fans) { 14 sb.append(friend).append("\t"); 15 } 16 sb.deleteCharAt(sb.length() - 1).append("\n"); 17 saver.save(sb.toString()); 18 // 被關注者應該放進隊列里面,以供下次爬取他的粉絲 19 fans.addAll(heros); 20 mUserBuffer.addUnCrawedUsers(fans); 21 } catch (Exception e) { 22 saver.log(e.getMessage()); 23 // 訪問錯誤時,放入訪問出錯的隊列中,以備以后重新訪問。 24 mUserBuffer.addErrorUser(userId); 25 } 26 }
一頁一頁爬取單個用戶如下:
1 /** 2 * 爬取用戶,根據tag來決定是爬該用戶關注的人,還是該用戶的粉絲 3 * 4 * @param userId 5 * @return 6 * @throws IOException 7 */ 8 private List<String> crawUser(String userId, String tag) throws IOException { 9 //構造URL 10 StringBuilder urlBuilder = new StringBuilder(USER_HOME); 11 urlBuilder.append("/u/").append(userId).append(tag); 12 //請求頁面 13 String page = getPage(urlBuilder.toString()); 14 Document doc = Jsoup.parse(page); 15 List<String> friends = new ArrayList<String>(); 16 //爬取第一頁 17 friends.addAll(getOnePageFriends(doc)); 18 String nextUrl = null; 19 //不斷地爬取下一頁 20 while ((nextUrl = getNextUrl(doc)) != null) { 21 page = getPage(nextUrl); 22 doc = Jsoup.parse(page); 23 friends.addAll(getOnePageFriends(doc)); 24 } 25 return friends; 26 }
整個爬蟲結構就是這樣了:
1 public class UserCrawler implements Runnable { 2 // 停止任務標志 3 private static AtomicBoolean stop; 4 // 當前爬蟲的id 5 private int id; 6 // 用戶緩存 7 private UserBuffer mUserBuffer; 8 // 日志與粉絲保存工具 9 private Saver saver; 10 11 static { 12 stop = new AtomicBoolean(false); 13 try { 14 // 登錄一次即可 15 login(); 16 // 保存數據線程先啟動 17 Saver.getInstance().start(); 18 } catch (IOException e) { 19 e.printStackTrace(); 20 } 21 // new Thread(new CommandListener()).start(); 22 } 23 24 public UserCrawler(UserBuffer userBuffer) { 25 mUserBuffer = userBuffer; 26 mUserBuffer.crawlerCountIncrease(); 27 id = c++; 28 saver = Saver.getInstance(); 29 } 30 31 @Override 32 public void run() { 33 if (id > 0) { 34 // 等第一個線程啟動一段時候再開始新的線程 35 try { 36 TimeUnit.SECONDS.sleep(20 + id); 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 } 40 } 41 System.out.println("UserCrawler " + id + " start"); 42 int retry = 3;// 重置嘗試次數 43 while (stop.get() == false) { 44 // 取出一個待訪問 45 String userId = mUserBuffer.pickOne(); 46 if (userId == null) {// 隊列元素已經為空 47 retry--;// 重試3次 48 if (retry <= 0) 49 break; 50 continue; 51 } 52 ...//爬取用戶 53 } 54 System.out.println("UserCrawler " + id + " stop"); 55 // 當前線程停止了 56 mUserBuffer.crawlerCountDecrease(); 57 } 58 59 private List<String> crawUser(String userId, String tag) throws IOException { 60 61 } 62 63 /** 64 * 獲取一頁中的關注or粉絲 65 * 66 * @param pageHtml 67 * @return 68 */ 69 70 private List<String> getOnePageFriends(Document doc) { 71 ... 72 } 73 74 /** 75 * 獲取下一頁的地址 76 * 77 * @param doc 78 * @return 79 */ 80 private String getNextUrl(Document doc) { 81 82 } 83 84 private static String getPage(String pageUrl) throws IOException { 85 86 } 87 88 /*** 89 * 終止所有爬蟲任務 90 */ 91 public static void stop() { 92 System.out.println("正在終止..."); 93 stop.compareAndSet(false, true); 94 UserBuffer.getInstance().prepareForStop(); 95 } 96 97 private static void login() throws UnsupportedEncodingException, 98 IOException { 99 ... 100 } 101 102 103 private static String sendPost(String url, String postParams) 104 throws IOException { 105 ... 106 107 }
四、Buffer並發控制
在用戶Buffer設置一個已經訪問的用戶集合、一個訪問出錯的用戶集合和一個待訪問的隊列。
1 private UserBuffer() { 2 crawedUsers = new HashSet<String>();// 已經訪問的用戶,包括訪問成功和訪問出錯的用戶 3 errorUsers = new HashSet<String>();// 訪問出錯的用戶 4 unCrawedUsers = new LinkedList<String>();// 未訪問的用戶 5 }
UserBuffer全局唯一,因此采用單例模式,使用的集合和隊列都不是線程安全的數據結構,我認為沒有必要使用線程安全的ConcurrentSkipListSet與ConcurrentLinkedQueue,因為將一個用戶插入unCrawedUsers隊列時需要先判斷是否已經存在於crawedUsers用戶集合中了,這需要用鎖來同步訪問,如果不使用鎖來控制,單個線程安全的set和queque不能保證多個變量之間的test-try有效性。而使用了鎖后,再使用線程安全的數據結構只會增加加鎖和解鎖的次數,反而降低了性能。
1 /*** 2 * 添加未訪問的用戶 3 * 4 * @param users 5 * @return 6 */ 7 public synchronized void addUnCrawedUsers(List<String> users) { 8 // 添加未訪問的用戶 9 for (String user : users) { 10 if (!crawedUsers.contains(user)) 11 unCrawedUsers.add(user); 12 } 13 } 14 15 /** 16 * 從隊列中取一個元素,並把這個元素添加到已經訪問的集合中,以免重復訪問。 17 * 18 * 19 * @return 20 */ 21 public synchronized String pickOne() { 22 String newId = unCrawedUsers.poll(); 23 // 隊列中可能包含重復的id,因為插入隊列時只檢查是否在訪問集合里, 24 // 沒有檢查是否已經出現在隊列里 25 while (crawedUsers.contains(newId)) { 26 newId = unCrawedUsers.poll(); 27 } 28 //訪問前先把添加到已經訪問的集合中 29 crawedUsers.add(newId); 30 return newId; 31 } 32 33 /** 34 * 添加訪問出錯的用戶 35 * 36 * @param userId 37 */ 38 public synchronized void addErrorUser(String userId) { 39 errorUsers.add(userId); 40 }
五、安全地終止爬蟲
大丈夫能伸能屈,能走能停,爬蟲也當如此。爬博客園的用戶時,我真是提心吊膽,擔心管理員封我的賬號,我盡量挑凌晨的時間爬數據,另外就是爬了一會后,我就停下來,過一段時間再爬。要讓一個多線程的程序平穩的停下來還真不簡單,最擔心的就是死鎖,發送停止命令后,線程不動卻也沒有終止,爬了好久的Buffer空間沒有保存,那個恨啊。
我的思路:
1、爬蟲啟動時,向Buffer注冊一下,buffer記錄啟動的爬蟲數量;
2、對爬蟲設置一個全局的標志,爬蟲在每次爬取一個用戶前檢查終止標志是否被設置;
3、當發生停止命令時,爬蟲檢查到停止標志被設置,於是通知Buffer自己將要停止,通知完后就結束運行;
4、Buffer收到爬蟲停止通知后,將爬蟲計數器減1,當計數器為0時,保存工作空間,同時通知關閉日志;
5、為了避免有些爬蟲在運行異常時推出而沒有通知Buffer,在發出停止命令時,同時通知Buffer准備停止,Buffer設置一個計時器,2分鍾后,強制保存爬蟲狀態和日志。
完整的代碼還是放在Github上,有興趣的同學可以看看。
感謝閱讀,轉載請注明出處:http://www.cnblogs.com/fengfenggirl/