问题场景:
1.分布式环境下,支持一系列的任务,任务模式类似,在多机器多线程环境下能够让数据不重复也不遗漏的执行。
2.任务执行需要一定耗时,要不断轮询查看状态。
比较适合使用动物园管理员zoo keeper来维护任务的状态,提供一系列的原子操作,实现分布式环境下的线程调度。
笔者浅显的认为:
1.分布式环境下代来的高并发好处非常大,但需要引入一些依赖如中心节点来进行系统的调度。比如消息中间件吞吐量可以大到爆而且很很好的拓展性,但同样需要维护一个注册中心式的东西来利用心跳,长链接等方式存储活着的客户端机器。才能对消息进行分发。
2.即便是单机环境下,多线程之间的数据共享,能达到多大的并发度锁的粒度也是很关键的,最终都有一个需要串行画的地方来保证线程安全,数据一致性。
有描述不准确的地方可劲拍砖,谢谢。
回归正题:
如何对任务进行调度,让任务各司其职有具备一定的个性,有木有感觉像策略模式。
如何让任务和调度有效的分离。
抽象问题:
任务***1,每台机器1个线程,选取执行数据,执行任务代码。
任务***2,每台机器2个线程,选取执行数据,执行任务代码。
任务***3,每台机器4个线程,选取执行数据,执行任务代码。
。。。点点滴滴。。。
任务设计:
抽象基本任务BaseJob继承runnable。
run方法执行过程:
1)校验当前机器是否能够执行当前线程。查询zk节点数据当前机器ip是否与分派机器相同。
2)select task,选择数据分片。
3)执行数据分片。
4)释放任务(删除子节点),心跳线程重现分配任务。
5)睡一会。
调度设计:启动时与zk server创建链接。启动一个HeartBeatTimeTask extends TimerTask,每隔几秒进行schedule server的刷新,任务的重新分配。
1.更新server节点,释放当前机器还活着。
2.如果当前机器时leader,重新分配任务给server。
3.当前任务节点创建zk子节点,写入分配的server ip(任务执行时校验任务是否分配给当前机器)。
任务启动:
1.将每种任务的信息(任务执行class,启动线程数,任务名)写入map。
2.便利map,遍历map,根据线程数和class 创建 instance,写入线程池。
e.g.
任务TaskXJob,每台机器启动4个线程(线程TaskXJob_4_0,TaskXJob_4_1,TaskXJob_4_2,TaskXJob_4_3),共有三台机器执行这个任务S1,S2,S3。
S1,S2,S3中都启动这四个线程Job,继承了BaseJob。

import java.util.List; public abstract class BaseJob<T> implements Runnable { private static final Integer BASE_SLEEP_TIME = 30000; protected String name; // 任务名 protected Integer fetchNum; // 每次select取数据数量 protected Integer modNum; // 数据分配模 e.g. mod = 8 taskNum = id % modNum protected Integer taskNum; @Override public void run() { while (true) { execute(); sleep(); } } protected void execute() { Boolean isOwner = false; try { isOwner = isOwner(name); if (!isOwner) { return; } List<T> tasks = selectTasks(); if (tasks == null || tasks.isEmpty()) { return; } for (T task : tasks) { executeTask(task); } } catch (Exception e) { } finally { releaseTask(); } } /** * 选择数据分配 * @return */ protected abstract List<T> selectTasks(); /** * 执行任务 * @param task */ protected abstract void executeTask(T task); /** * 通过zk client看任务节点下的分配机器ip与当前机器是否相同 * @param name * @return */ protected boolean isOwner(String name) { return true; } /** * 释放之后任务可以由leader重新分配,释放过程就是删除zk节点下的任务执行信息节点 */ protected void releaseTask() { } /** * 睡多少s由具体任务决定 */ protected void sleep() { try { Thread.sleep(BASE_SLEEP_TIME); } catch (InterruptedException e) { return; } } }
TaskXJob和TaskXDO没有具体给实现,就是业务代码

import java.util.List; public class TaskXJob extends BaseJob<TaskXDO> { @Override protected List<TaskXDO> selectTasks() { // db中选择任务分片,每个线程在zk节点上注册,跟进modNum,taskItemNum,fetchNum取出数据 return null; } @Override protected void executeTask(TaskXDO task) { // TODO 执行业务操作 System.out.println((task != null) ? task.getValue() : "task is null"); } }

public class TaskXDO { private String value; public String getValue() { return value; } public void setValue(String value) { this.value = value; } }
每个线程在zk树上都有一个注册节点,并包含了任务个性化的信息,fetchNum,modNum,taskItemNum等,任务线程在执行过程中会跟进这些信息取出数据。
比如适应db的主键id % modNum == taskItemNum,TaskXJob_4_0 select 到的任务数据就是id为0,4,8,12....
任务调度,zk树结构如下:
应用启动时运行一个心跳线程,做两件事。
1.将新的alive的server注册到servers的子节点下,清除过期节点。
2.遍历所有任务节点,分配这些alive的机器,将分配到的执行机器写到任务的子节点上。(任务是随机分配的)
执行任务分配只有一台leader机器执行,leader选取的时version最大的机器。
只有一台机器能够去分配任务,分配的任务不同的线程执行的数据没有交集且每个任务线程同一时刻只能在一台机器上执行,就能够保证任务不被重复的执行。
zk client的代码就不贴出来了,网上很多。
设计优缺点:
优点:能够满足需求,任务的调度分配和执行,结构比较简单。
缺点:
1.拓展性差,想要增减线程需要修改任务启动代码,修改注册节点。
2.任务分配采用的是随机分配的方式,不能根据当前状态做到负载。
3.任务多的时候,线程会爆炸,机器扩容也不能够解决这个问题。
后续将会介绍tbschedule的思想,解决上面的问题。
看到篇大牛的分布式系统设计链接贴下:
http://www.cnblogs.com/ccdev/p/3338412.html
http://www.cnblogs.com/ccdev/p/3340484.html
http://www.cnblogs.com/ccdev/p/3341234.html