zookeeper
一直琢磨着分布式的東西怎么搞,公司也沒有相關的項目能夠參與,所以還是回歸自己的專長來吧——基於ZooKeeper的分布式隊列爬蟲,由於沒什么人能夠一起溝通分布式的相關知識,下面的小項目純屬“胡編亂造”。
簡單介紹下ZooKeeper:ZooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,是Google的Chubby一個開源的實現,它是集群的管理者,監視着集群中各個節點的狀態根據節點提交的反饋進行下一步合理操作。最終,將簡單易用的接口和性能高效、功能穩定的系統提供給用戶。
基本的知識就不過多介紹了,可以參考參考下面這些人的:
ZooKeeper官網
http://www.cnblogs.com/wuxl360/p/5817471.html
一、整體架構
這張圖來自skyme,我也是看了這張圖的啟發寫了這篇文章的。
最基本的分布式隊列即一個生產者不斷抓取鏈接,然后將鏈接存儲進ZooKeeper的隊列節點里,每個節點的value都只是鏈接,然后消費者從中獲取一條url進行抓取。本項目生產這主要是用來生產URL即可,這部分就不要要求太多。然后是消費者,消費者需要解決的問題有:
1.隊列如何保證自己的分發正確;
2.消費這如何進行高效的抓取。
二、ZooKeeper隊列原理
2.1 介紹
分布式隊列,目前此類產品大多類似於ActiveMQ、RabbitMQ等,本文主要介紹的是Zookeeper實現的分布式隊列,它的實現方式也有兩種,一種是FIFO(先進先出)的隊列,另一種是等待隊列元素聚集之后才統一安排的Barrier模型。同樣,本文主要講的是FIFO的隊列模型。其大體設計思路也很簡單,主要是在/SinaQueue下創建順序節點,如/SinaQueue/qn-000000000,創建完節點之后,根據下面的4個步驟來決定執行的順序。
1.通過調用getChildren()接口來獲取某一節點下的所有節點,即獲取隊列中的所有元素。
2.確定自己的節點序號在所有子節點中的順序。
3.如果自己不是序號最小的子節點,那么就需要進入等待,同時向比自己序號小的最后一個節點注冊Watcher監聽。
4.接收到Watcher通知后,重復步驟1。
2.2 Watcher介紹
znode以某種方式發生變化時,“觀察”(watch)機制可以讓客戶端得到通知.可以針對ZooKeeper服務的“操作”來設置觀察,該服務的其他 操作可以觸發觀察。
1.Watch是一次性的,每次都需要重新注冊,並且客戶端在會話異常結束時不會收到任何通知,而快速重連接時仍不影響接收通知。
2.Watch的回調執行都是順序執行的,並且客戶端在沒有收到關注數據的變化事件通知之前是不會看到最新的數據,另外需要注意不要在Watch回調邏輯中阻塞整個客戶端的Watch回調
3.Watch是輕量級的,WatchEvent是最小的通信單元,結構上只包含通知狀態、事件類型和節點路徑。ZooKeeper服務端只會通知客戶端發生了什么,並不會告訴具體內容。
2.3 源碼
在csdn上找到了某個人寫的這個過程,使用的是ZKClient,有興趣可以看看傑布斯的博客,但是沒有實現上面過程的第三步(Watcher相關的),這里,我們使用的是Zookeeper的另一個客戶端工具curator,其中,curator實現了各種Zookeeper的特性,如:Election(選舉),Lock(鎖),Barrier(關卡),Atonmic(原子量),Cache(緩存),Queue(隊列)等。我們來看看Curator實現的簡單的分布式隊列的源碼。
public class SimpleDistributedQueue {
...
private final CuratorFramework client;//連接Zookeeper的客戶端
private final String path;//路徑
private final EnsureContainers ensureContainers;//確保原子特性
private final String PREFIX = "qn-";//順序節點的同意前綴,使用qn-
...
其中PREFIX是用來生成順序節點的,默認不可更改,將生成的路徑賦予給path,然后向節點賦予數據。下面是賦予數據的代碼
public boolean offer(byte[] data) throws Exception {
String thisPath = ZKPaths.makePath(this.path, "qn-");//生成的路徑
((ACLBackgroundPathAndBytesable)this.client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)).forPath(thisPath, data);//如果沒有路徑將生成持久化的路徑然后存儲節點的數據。
return true;
}
最關鍵的來了,隊列如何保證自己的分發正確?SimpleDistributedQueue使用take()來取得隊列的頭部,然后將頭部刪掉,這一過程的一致性是通過CountDownLatch和Watcher來實現的。
public byte[] take() throws Exception {//直接調用interPoll,並將超時的設置為0;
return this.internalPoll(0L, (TimeUnit)null);
}
private byte[] internalPoll(long timeout, TimeUnit unit) throws Exception {
...//忽略超時的設置代碼
while(true) {
final CountDownLatch latch = new CountDownLatch(1);//定義一個latch,設置為1,先加鎖,然后執行完任務后再釋放鎖
Watcher watcher = new Watcher() {
public void process(WatchedEvent event) {
latch.countDown();
}
};
byte[] bytes;
try {
bytes = this.internalElement(true, watcher);//調用internalElement函數來獲取字節流
} catch (NoSuchElementException var17) {
}
...
if (hasTimeout) {
long elapsedMs = System.currentTimeMillis() - startMs;
long thisWaitMs = maxWaitMs - elapsedMs;
if (thisWaitMs <= 0L) { //如果等待超時了則返回為空
return null;
}
latch.await(thisWaitMs, TimeUnit.MILLISECONDS);
} else {
latch.await();
}
}
}
private byte[] internalElement(boolean removeIt, Watcher watcher) throws Exception {
this.ensurePath();
List nodes;
try {
nodes = watcher != null ? (List)((BackgroundPathable)this.client.getChildren().usingWatcher(watcher)).forPath(this.path) : (List)this.client.getChildren().forPath(this.path);//獲取節點下的所有子節點注冊監聽(watcher默認都不是為空的,每一個都注冊)
} catch (NoNodeException var8) {
throw new NoSuchElementException();
}
Collections.sort(nodes);//對節點進行排序
Iterator var4 = nodes.iterator();
while(true) {//遍歷
while(var4.hasNext()) {
String node = (String)var4.next();//取得當前頭結點
if (node.startsWith("qn-")) {
String thisPath = ZKPaths.makePath(this.path, node);
try {
byte[] bytes = (byte[])this.client.getData().forPath(thisPath);
if (removeIt) {
this.client.delete().forPath(thisPath);//刪除該節點
}
return bytes;//返回節點的字節流
...
}
三、多線程並發
對於分布式爬蟲來說,讓每一個消費者高效的進行抓取是具有重要意義的,為了加快爬蟲的速度,采用多線程爬蟲的方法。Java多線程實現方式主要有三種:繼承Thread類、實現Runnable接口、使用ExecutorService、Callable、Future實現有返回結果的多線程。其中前兩種方式線程執行完后都沒有返回值,只有最后一種是帶返回值的。其中使用Executors提供了四種聲明線程池的方法,分別是newCachedThreadPool、newFixedThreadPool、newSingleThreadExecutor和newScheduledThreadPool,為了監控實時監控隊列的長度,我們使用數組型的阻塞隊列ArrayBlockingQueue。聲明方式如下:
private static final BlockingQueue<Runnable> queuelength = new ArrayBlockingQueue<>(1000);
ExecutorService es = new ThreadPoolExecutor(CORE, CORE,
0L, TimeUnit.MILLISECONDS,
queuelength);
四、使用
本次實驗主要環境如下:
zookeeper.version=3.5
java.version=1.8.0_65
os.arch=amd64
i5 四核心CPU
網速為中國電信100M
這里主要是對博客園中的前兩千條博客進行爬取,本文主要是對分布式隊列的理解,就不再進行什么難度的處理(比如元素的選取、數據的存儲等),只輸出每篇博客的title即可。
生產者代碼:
public class Producer {
//logger
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
public static final CuratorFramework client = CuratorFrameworkFactory.builder().connectString("119.23.46.71:2181")
.sessionTimeoutMs(1000)
.connectionTimeoutMs(1000)
.canBeReadOnly(false)
.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
.defaultData(null)
.build();
private static SimpleDistributedQueue queue = new SimpleDistributedQueue(client, "/Queue");
private static Integer j = 0;
public static void begin(String url) {//對博客園的每一頁進行爬取
try {
String content = HttpHelper.getInstance().get(url);
resolveweb(content);
} catch (Exception e) {
logger.error("", e);
}
}
public static void resolveweb(String content) throws Exception {
Elements elements = Jsoup.parse(content).select("a.titlelink");//對每篇博客的標題進行獲取
for (Element element : elements) {
String url = element.attr("href");//
if (StringUtils.isNotEmpty(url) && !url.contains("javascript") && !url.contains("jump")) {//去除a中調用href過程
logger.info(url + " " + String.valueOf(j++));
queue.offer(url.getBytes());
}
}
}
public static void main(String[] args) {
client.start();
for (int i = 0; i < 100; i++) {
begin("https://www.cnblogs.com/#p" + String.valueOf(i));
}
}
}
消費者
public class Consumer {
//logger
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
private static final CuratorFramework client = CuratorFrameworkFactory.builder().connectString("119.23.46.71:2181")
.sessionTimeoutMs(1000)
.connectionTimeoutMs(1000)
.canBeReadOnly(false)
.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
.defaultData(null)
.build();
private static SimpleDistributedQueue queue = new SimpleDistributedQueue(client, "/SinaQueue");
private static Integer i = 0;
private static final Integer CORE = Runtime.getRuntime().availableProcessors();
//聲明為一個數組型的阻塞隊列,這里限制大小為
private static final BlockingQueue<Runnable> queuelength = new ArrayBlockingQueue<>(1000);
static class CBCrawler implements Runnable {
private String url;
public CBCrawler(String url) {
this.url = url;
}
@Override
public void run() {
String content = HttpHelper.getInstance().get(url);
logger.info(url + " " + Jsoup.parse(content).title());//打印網頁的標題
}
}
public static void begin() {
try {
ExecutorService es = new ThreadPoolExecutor(CORE, CORE,
0L, TimeUnit.MILLISECONDS,
queuelength);
while (client.getChildren().forPath("/SinaQueue").size() > 0) {
CBCrawler crawler = new CBCrawler(new String(queue.take()));
es.submit(crawler);//執行爬蟲
i = i + 1;
logger.info(String.valueOf(i) + " is finished\n" + " queue size is" + queuelength.size());//監控當前隊列的長度
}
if (!es.isShutdown()) {//如果線程池沒有關閉則關閉
es.shutdown();
}
} catch (Exception e) {
logger.error("", e);
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
client.start();
begin();
client.close();
logger.info("start time: " + start);
long end = System.currentTimeMillis();
logger.info("end time: " + end);
logger.info("take time: " + String.valueOf(end - start));//記錄開始時間和結束時間
}
}
由於在隊列的take中使用了CountDownLatch和Collections.sort(nodes)進行排序,耗時過程變長了不少,2000個節點,單台服務器和多台服務器的耗時是一樣的,都是9分鍾,具體實驗見下面。
實驗結果
生產者生產URL:
單機模式下的消費者,耗時:560825/(1000*60)=9分鍾
分布式模式下的抓取:
耗時:564374/(1000*60)=9分鍾:
由圖可見,當每個消費者處理能力大於隊列分配的能力時,耗時的過程反而是在隊列,畢竟分布式隊列在進行take動作的時候對節點進行了加鎖,還要對隊列進行排序,特別是在節點多達2000+的情況下,耗時是十分嚴重的。
實驗二
實驗二的主要解決的問題是將消費者處理的耗時延長,我們使用Thread.sleep(n)來模擬時長。由於博客園突然連不上,為了減少這種不可控的故障,抓取的網頁改為新浪,並將抓取后的URL以文本形式保存下來。
public static void sleepUtil(Integer time) {
try {
Thread.sleep(time * 1000);
} catch (Exception e) {
logger.error("線程sleep異常", e);
}
}
此時再看程序的輸出,可以看出,隊列的分發能力已經大於消費者的處理能力,總算是正常了。
分布式隊列分發的時間是:341998/(1000*60)=5.6分鍾
2017-10-30 08:55:48.458 [main] INFO com.crawler.Consumer - start time: 1509324606460
2017-10-30 08:55:48.458 [main] INFO com.crawler.Consumer - end time: 1509324948458
2017-10-30 08:55:48.458 [main] INFO com.crawler.Consumer - take time: 341998
兩台機子抓取完畢的耗時分別是:
A服務器:08:49:54.509——09:02:07
B服務器:08:49:54.509——09:05:05
單機的時候分發時間是:353198/(1000*60)=5.8分鍾
2017-10-30 09:30:25.812 [main] INFO com.crawler.Consumer - start time: 1509326672614
2017-10-30 09:30:25.812 [main] INFO com.crawler.Consumer - end time: 1509327025812
2017-10-30 09:30:25.812 [main] INFO com.crawler.Consumer - take time: 353198
耗時
09:24:33.391——09:51:44.733
分布式下平均耗時約為13分鍾,單機模式下耗時約為27分鍾,還是蠻符合估算的。
總結
源代碼都放在這里了,有興趣的可以star一下或者下載看一下,也歡迎大家提提意見,沒企業級的實戰環境,見笑了O(∩_∩)O~
歡迎訪問我的個人網站
個人網站網址:http://www.wenzhihuai.com
個人網站代碼地址:https://github.com/Zephery/newblog