之前寫過:
本文說的是ZK另一個重要使用場景,消息隊列!
場景
一個典型的生產消費者模型,如下圖:
WEB點提交要處理的數據,注意是多結點的也就是多個生產者,數量可能比較大。在后台有個處理者也就是消費者,注意前后是分開的,生產者應用本身不做消費,而Curator提供的API好像默認是一起的,一個應用既是生產者又可以做消費。
配置
一切都基於前面的示例Zookeeper和Curator-Framework實踐系列之: 配置管理,下面只說不一樣的地方。
applicationContext.xml
同樣與Zookeeper和Curator-Framework實踐系列之: 配置管理相同,只需配置一下zookeeperFactoryBean的listeners增加或修改成DistributedQueueDemo
<bean id="zookeeperFactoryBean" class="cn.bg.zk.core.ZookeeperFactoryBean" lazy-init="false">
<property name="zkConnectionString" value="zookeepermaster:2181"/>
<property name="listeners">
<list>
<bean class="cn.bg.zk.queues.DistributedQueueDemo"></bean>
</list>
</property>
</bean>
添加一個bean,指定CuratorFramework,充當生產者時需用用它來添加數據到隊列
<bean id="mainController" class="cn.bg.controller.MainController">
<constructor-arg ref="zookeeperFactoryBean" />
</bean>
代碼
DistributedQueueDemo.java
分布式隊列處理類
package cn.bg.zk.queues;
public class DistributedQueueDemo implements IZKListener{
//申明兩個隊列實例
private DistributedQueue<String> queue1 = null;
private DistributedQueue<String> queue2 = null;
//數據系列化轉換工具類
private QueueSerializer<String> serializer = new QueueItemSerializer();
//消費者處理方法
private QueueConsumer<String> consumer = new QueueConsumer<String>() {
@Override
public void consumeMessage(String message) throws Exception {
//線程等待5秒,模擬數據處理,以達到數據搶奪的目的
Thread.sleep(5000);
//打印出是哪個線程處理了哪些數據
System.out.println(Thread.currentThread().getId() + " consume: " + message);
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("new state: " + newState);
}
};
//Spring啟動時調用此方法以啟動所有隊列實例
@Override
public void executor(CuratorFramework client) {
//實例化所有隊列,指定ZK隊列數據獲取地址,和其它參數
//由於它們的地址是相同的,都是*/zk_queue_test*,所以Curator會根據它們的空閑狀態來分配新的任務,上面通過線程暫停5秒來拉開它們的處理間隔。
queue1 = QueueBuilder.builder(client, consumer, serializer, "/zk_queue_test").buildQueue();
queue2 = QueueBuilder.builder(client, consumer, serializer, "/zk_queue_test").buildQueue();
try {
//啟動所有隊列實例,讓它們開始工作,注意所有指定的動作只有在調用了queue1.start()方法之后才會被執行,比如queue.put()等。
//Curator提供了queue.put()方法來往隊列里添加數據,但它同時也會處理,但我們不想這樣,所以添加的過程我們通過其它的方式來實現。
queue1.start();
queue2.start();
System.out.println("Queues started!");
} catch (Exception e) {
}
}
}
QueueSerializer.java
數據系列化處理工具類
package cn.bg.zk.queues;
public class QueueItemSerializer implements QueueSerializer<String>
{
@Override
public byte[] serialize(String item)
{
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes)
{
return new String(new String(bytes));
}
}
上面的是消息隊列處理的部分,下面開始消息添加,也就是生產者部分:
生產者是一個Controller,也就是通過用戶提交數據來做為生產者
MainController.java
package cn.bg.controller;
@Controller
public class MainController {
private final CuratorFramework zkClient;
//通過Spring注入CuratorFramework實例
public MainController(final CuratorFramework zkClient) {
Assert.notNull(zkClient, "zkClient cannot be null");
this.zkClient = zkClient;
}
//簡單的使用傳遞值來做數據處理的實體
@RequestMapping("/put/{val}")
@ResponseBody
public String put(@PathVariable String val) throws Exception {
//需要使用特定的格式來添加數據到隊列,使用ItemSerializer來做格式化生成byte。
byte[] bytes = ItemSerializer.serialize(val, new QueueItemSerializer());
String path = "" ;
//創建znode並添加數據
path = zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/zk_queue_test/queue-");
zkClient.setData().forPath(path, bytes);
return path;
}
}
ItemSerializer.java
這個類是格式化數據,也就是設置一些znode的屬性,並生成byte
此類來自Curator源碼的簡化版,主要目的是分離Curator Queue來添加隊列數據用到。
package cn.bg.zk.queues;
public class ItemSerializer {
private static final int VERSION = 0x00010001;
private static final byte ITEM_OPCODE = 0x01;
private static final byte EOF_OPCODE = 0x02;
private static final int INITIAL_BUFFER_SIZE = 0x1000;
public static <T> byte[] serialize(T item, QueueSerializer<T> serializer) throws Exception {
ByteArrayOutputStream bytes = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
DataOutputStream out = new DataOutputStream(bytes);
out.writeInt(VERSION);
byte[] itemBytes = serializer.serialize(item);
out.writeByte(ITEM_OPCODE);
out.writeInt(itemBytes.length);
if (itemBytes.length > 0) {
out.write(itemBytes);
}
out.writeByte(EOF_OPCODE);
out.close();
return bytes.toByteArray();
}
}
運行
啟動應用后所有隊列都已處理待命狀態,理論上只要在ZK目錄/zk_queue_test/添加數據就會被處理掉,只是它有固定的添加格式。
通過訪問/put/{?}等這樣路徑數據{?}就會被添加到隊列並處理,所以可以刷多條數據到隊列來觀察隊列的處理狀態,基本的輸出應該是這樣的:
17 consume: 111
18 consume: 222
17 consume: 111
18 consume: 222
17 consume: 111
17 consume: 111
整個過程基本完成,經測試運行狀態良好,Curator自己維護與ZK集群的連接,本人通過JMX將應用與ZK的連接強制斷開后Curator主動識別並重新連接,基本不用擔心一些基礎問題上處理,可以專心解決我們的業務需要。