Zookeeper和Curator-Framework實踐之:分布式消息隊列


之前寫過:

  1. Curator-Framework開源Zookeeper快速開發框架介紹
  2. Zookeeper和Curator-Framework實踐系列之: 配置管理

本文說的是ZK另一個重要使用場景,消息隊列!

場景

一個典型的生產消費者模型,如下圖:

WEB點提交要處理的數據,注意是多結點的也就是多個生產者,數量可能比較大。在后台有個處理者也就是消費者,注意前后是分開的,生產者應用本身不做消費,而Curator提供的API好像默認是一起的,一個應用既是生產者又可以做消費。

配置

一切都基於前面的示例Zookeeper和Curator-Framework實踐系列之: 配置管理,下面只說不一樣的地方。

applicationContext.xml

同樣與Zookeeper和Curator-Framework實踐系列之: 配置管理相同,只需配置一下zookeeperFactoryBeanlisteners增加或修改成DistributedQueueDemo

<bean id="zookeeperFactoryBean" class="cn.bg.zk.core.ZookeeperFactoryBean" lazy-init="false">
    <property name="zkConnectionString" value="zookeepermaster:2181"/>
    <property name="listeners">
        <list>
            <bean class="cn.bg.zk.queues.DistributedQueueDemo"></bean>
        </list>
    </property>
</bean>

添加一個bean,指定CuratorFramework,充當生產者時需用用它來添加數據到隊列

<bean id="mainController" class="cn.bg.controller.MainController">
    <constructor-arg ref="zookeeperFactoryBean" />
</bean>

代碼

DistributedQueueDemo.java

分布式隊列處理類

package cn.bg.zk.queues;

public class DistributedQueueDemo implements IZKListener{

    //申明兩個隊列實例
    private DistributedQueue<String> queue1 = null;
    private DistributedQueue<String> queue2 = null;

    //數據系列化轉換工具類
    private QueueSerializer<String> serializer = new QueueItemSerializer();

    //消費者處理方法
    private QueueConsumer<String> consumer = new QueueConsumer<String>() {
        @Override
        public void consumeMessage(String message) throws Exception {
            //線程等待5秒,模擬數據處理,以達到數據搶奪的目的
            Thread.sleep(5000);
            //打印出是哪個線程處理了哪些數據
            System.out.println(Thread.currentThread().getId() +  " consume: " + message);
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            System.out.println("new state: " + newState);
        }
    };

    //Spring啟動時調用此方法以啟動所有隊列實例    
    @Override
    public void executor(CuratorFramework client) {
        //實例化所有隊列,指定ZK隊列數據獲取地址,和其它參數
        //由於它們的地址是相同的,都是*/zk_queue_test*,所以Curator會根據它們的空閑狀態來分配新的任務,上面通過線程暫停5秒來拉開它們的處理間隔。
        queue1 = QueueBuilder.builder(client, consumer, serializer, "/zk_queue_test").buildQueue();
        queue2 = QueueBuilder.builder(client, consumer, serializer, "/zk_queue_test").buildQueue();

        try {
            //啟動所有隊列實例,讓它們開始工作,注意所有指定的動作只有在調用了queue1.start()方法之后才會被執行,比如queue.put()等。
            //Curator提供了queue.put()方法來往隊列里添加數據,但它同時也會處理,但我們不想這樣,所以添加的過程我們通過其它的方式來實現。
            queue1.start();
            queue2.start();
            System.out.println("Queues started!");
        } catch (Exception e) {

        }
    }
}

QueueSerializer.java

數據系列化處理工具類

package cn.bg.zk.queues;

public class QueueItemSerializer implements QueueSerializer<String>
{
    @Override
    public byte[] serialize(String item)
    {
        return item.getBytes();
    }

    @Override
    public String deserialize(byte[] bytes)
    {
        return new String(new String(bytes));
    }
}

上面的是消息隊列處理的部分,下面開始消息添加,也就是生產者部分:

生產者是一個Controller,也就是通過用戶提交數據來做為生產者

MainController.java

package cn.bg.controller;

@Controller
public class MainController {

    private final CuratorFramework zkClient;
    //通過Spring注入CuratorFramework實例
    public MainController(final CuratorFramework zkClient) {
        Assert.notNull(zkClient, "zkClient cannot be null");
        this.zkClient = zkClient;
    }

    //簡單的使用傳遞值來做數據處理的實體
    @RequestMapping("/put/{val}")
    @ResponseBody
    public String put(@PathVariable String val) throws Exception {
        //需要使用特定的格式來添加數據到隊列,使用ItemSerializer來做格式化生成byte。
        byte[] bytes = ItemSerializer.serialize(val, new QueueItemSerializer());
        String path = "" ;    

        //創建znode並添加數據
        path = zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/zk_queue_test/queue-");
        zkClient.setData().forPath(path, bytes);
        return path;
    }

}

ItemSerializer.java

這個類是格式化數據,也就是設置一些znode的屬性,並生成byte
此類來自Curator源碼的簡化版,主要目的是分離Curator Queue來添加隊列數據用到。

package cn.bg.zk.queues;

public class ItemSerializer {
    private static final int VERSION = 0x00010001;

    private static final byte ITEM_OPCODE = 0x01;
    private static final byte EOF_OPCODE = 0x02;

    private static final int INITIAL_BUFFER_SIZE = 0x1000;

    public static <T> byte[] serialize(T item, QueueSerializer<T> serializer) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(VERSION);

        byte[] itemBytes = serializer.serialize(item);
        out.writeByte(ITEM_OPCODE);
        out.writeInt(itemBytes.length);
        if (itemBytes.length > 0) {
            out.write(itemBytes);
        }

        out.writeByte(EOF_OPCODE);
        out.close();

        return bytes.toByteArray();
    }
}

運行

啟動應用后所有隊列都已處理待命狀態,理論上只要在ZK目錄/zk_queue_test/添加數據就會被處理掉,只是它有固定的添加格式。

通過訪問/put/{?}等這樣路徑數據{?}就會被添加到隊列並處理,所以可以刷多條數據到隊列來觀察隊列的處理狀態,基本的輸出應該是這樣的:

17 consume: 111
18 consume: 222
17 consume: 111
18 consume: 222
17 consume: 111
17 consume: 111

整個過程基本完成,經測試運行狀態良好,Curator自己維護與ZK集群的連接,本人通過JMX將應用與ZK的連接強制斷開后Curator主動識別並重新連接,基本不用擔心一些基礎問題上處理,可以專心解決我們的業務需要。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM