問題場景:
1.分布式環境下,支持一系列的任務,任務模式類似,在多機器多線程環境下能夠讓數據不重復也不遺漏的執行。
2.任務執行需要一定耗時,要不斷輪詢查看狀態。
比較適合使用動物園管理員zoo keeper來維護任務的狀態,提供一系列的原子操作,實現分布式環境下的線程調度。
筆者淺顯的認為:
1.分布式環境下代來的高並發好處非常大,但需要引入一些依賴如中心節點來進行系統的調度。比如消息中間件吞吐量可以大到爆而且很很好的拓展性,但同樣需要維護一個注冊中心式的東西來利用心跳,長鏈接等方式存儲活着的客戶端機器。才能對消息進行分發。
2.即便是單機環境下,多線程之間的數據共享,能達到多大的並發度鎖的粒度也是很關鍵的,最終都有一個需要串行畫的地方來保證線程安全,數據一致性。
有描述不准確的地方可勁拍磚,謝謝。
回歸正題:
如何對任務進行調度,讓任務各司其職有具備一定的個性,有木有感覺像策略模式。
如何讓任務和調度有效的分離。
抽象問題:
任務***1,每台機器1個線程,選取執行數據,執行任務代碼。
任務***2,每台機器2個線程,選取執行數據,執行任務代碼。
任務***3,每台機器4個線程,選取執行數據,執行任務代碼。
。。。點點滴滴。。。
任務設計:
抽象基本任務BaseJob繼承runnable。
run方法執行過程:
1)校驗當前機器是否能夠執行當前線程。查詢zk節點數據當前機器ip是否與分派機器相同。
2)select task,選擇數據分片。
3)執行數據分片。
4)釋放任務(刪除子節點),心跳線程重現分配任務。
5)睡一會。
調度設計:啟動時與zk server創建鏈接。啟動一個HeartBeatTimeTask extends TimerTask,每隔幾秒進行schedule server的刷新,任務的重新分配。
1.更新server節點,釋放當前機器還活着。
2.如果當前機器時leader,重新分配任務給server。
3.當前任務節點創建zk子節點,寫入分配的server ip(任務執行時校驗任務是否分配給當前機器)。
任務啟動:
1.將每種任務的信息(任務執行class,啟動線程數,任務名)寫入map。
2.便利map,遍歷map,根據線程數和class 創建 instance,寫入線程池。
e.g.
任務TaskXJob,每台機器啟動4個線程(線程TaskXJob_4_0,TaskXJob_4_1,TaskXJob_4_2,TaskXJob_4_3),共有三台機器執行這個任務S1,S2,S3。
S1,S2,S3中都啟動這四個線程Job,繼承了BaseJob。

import java.util.List; public abstract class BaseJob<T> implements Runnable { private static final Integer BASE_SLEEP_TIME = 30000; protected String name; // 任務名 protected Integer fetchNum; // 每次select取數據數量 protected Integer modNum; // 數據分配模 e.g. mod = 8 taskNum = id % modNum protected Integer taskNum; @Override public void run() { while (true) { execute(); sleep(); } } protected void execute() { Boolean isOwner = false; try { isOwner = isOwner(name); if (!isOwner) { return; } List<T> tasks = selectTasks(); if (tasks == null || tasks.isEmpty()) { return; } for (T task : tasks) { executeTask(task); } } catch (Exception e) { } finally { releaseTask(); } } /** * 選擇數據分配 * @return */ protected abstract List<T> selectTasks(); /** * 執行任務 * @param task */ protected abstract void executeTask(T task); /** * 通過zk client看任務節點下的分配機器ip與當前機器是否相同 * @param name * @return */ protected boolean isOwner(String name) { return true; } /** * 釋放之后任務可以由leader重新分配,釋放過程就是刪除zk節點下的任務執行信息節點 */ protected void releaseTask() { } /** * 睡多少s由具體任務決定 */ protected void sleep() { try { Thread.sleep(BASE_SLEEP_TIME); } catch (InterruptedException e) { return; } } }
TaskXJob和TaskXDO沒有具體給實現,就是業務代碼

import java.util.List; public class TaskXJob extends BaseJob<TaskXDO> { @Override protected List<TaskXDO> selectTasks() { // db中選擇任務分片,每個線程在zk節點上注冊,跟進modNum,taskItemNum,fetchNum取出數據 return null; } @Override protected void executeTask(TaskXDO task) { // TODO 執行業務操作 System.out.println((task != null) ? task.getValue() : "task is null"); } }

public class TaskXDO { private String value; public String getValue() { return value; } public void setValue(String value) { this.value = value; } }
每個線程在zk樹上都有一個注冊節點,並包含了任務個性化的信息,fetchNum,modNum,taskItemNum等,任務線程在執行過程中會跟進這些信息取出數據。
比如適應db的主鍵id % modNum == taskItemNum,TaskXJob_4_0 select 到的任務數據就是id為0,4,8,12....
任務調度,zk樹結構如下:
應用啟動時運行一個心跳線程,做兩件事。
1.將新的alive的server注冊到servers的子節點下,清除過期節點。
2.遍歷所有任務節點,分配這些alive的機器,將分配到的執行機器寫到任務的子節點上。(任務是隨機分配的)
執行任務分配只有一台leader機器執行,leader選取的時version最大的機器。
只有一台機器能夠去分配任務,分配的任務不同的線程執行的數據沒有交集且每個任務線程同一時刻只能在一台機器上執行,就能夠保證任務不被重復的執行。
zk client的代碼就不貼出來了,網上很多。
設計優缺點:
優點:能夠滿足需求,任務的調度分配和執行,結構比較簡單。
缺點:
1.拓展性差,想要增減線程需要修改任務啟動代碼,修改注冊節點。
2.任務分配采用的是隨機分配的方式,不能根據當前狀態做到負載。
3.任務多的時候,線程會爆炸,機器擴容也不能夠解決這個問題。
后續將會介紹tbschedule的思想,解決上面的問題。
看到篇大牛的分布式系統設計鏈接貼下:
http://www.cnblogs.com/ccdev/p/3338412.html
http://www.cnblogs.com/ccdev/p/3340484.html
http://www.cnblogs.com/ccdev/p/3341234.html