RocketMq 廣播模式下 確保順序消費


生產者(指定隊列推送,或者默認創建主題時就創建一個隊列):
在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 默認值 4改成1即可,已有的主題只能指定了。

package com.apacherocketmq.test;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import com.apacherocketmq.connection.RocketProducer;
public class BroadcastProducer
{
    private static int i = 0;
    private static int index = 0;
    private static long count =1L;
    @SuppressWarnings("unused")
    public static void main(String[] args)
            throws UnsupportedEncodingException, MQClientException, RemotingException, MQBrokerException, InterruptedException
    {
        DefaultMQProducer producer = RocketProducer.newInstance();
        producer.setProducerGroup("GroupE");
        producer.start();
        while (true)
        {
            if (i == 0)
            {
                i = 1;
                index++;
            } else if (i == 1)
            {
                i = 0;
            }
            Message message = new Message("GroupETopicA", "TagE", "e",
                    (" " + time() + " " + i + " " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message, new MessageQueueSelector()
            {
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
                {
                    //0號隊列
                    return mqs.get(0);
                }
            }, "ss");
//            Thread.sleep(2000);
            System.out.println(count++);
        }
        // producer.shutdown();
    }
    private static String time()
    {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return df.format(new Date());
    }
}

消費者(從指定隊列取,其他隊列的拋棄,若主題只有一個隊列則無所謂):

package com.apacherocketmq.test;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import com.apacherocketmq.connection.RocketMQPushConsumer;
import com.jdbc.MyJDBCUtil;
public class BroadcastConsumerTEST
{
    public static void main(String[] args) throws MQClientException
    {
        DefaultMQPushConsumer consumer = RocketMQPushConsumer.newInstance();
        // 消息模型,支持以下兩種:集群消費(clustering),廣播消費(broadcasting)
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setConsumerGroup("GroupE");
        consumer.subscribe("GroupETopicA", "*");
        // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 從指定時間消費
        // consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()
        // - (1000 * 60 * 30)));
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeThreadMin(1);
        consumer.registerMessageListener(new MessageListenerConcurrently()
        {
            private boolean isFirst = true;
            private long i = 0L;
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
            {
                if (isFirst)
                {
                    i = query("JYLQ");
                    isFirst = false;
                }
                long offset = msgs.get(0).getQueueOffset();
                int s = msgs.get(0).getQueueId();
                if (s != 0)
                {
                    //拋棄其他隊列數據
                    System.err.println("QueueId NOT 0");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                if (offset > i)
                {
                    update(offset);
                    String ss = offset + new String(msgs.get(0).getBody()) + " N \r\n";
                    System.out.println(ss);
                } else
                {
                    String t = offset + new String(msgs.get(0).getBody()) + " O \r\n";
                    System.err.println(t);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            private long query(String param)
            {
                long i = 0;
                String SQL = "SELECT CONFIG_VALUE FROM common_sys_config WHERE CONFIG_ITEM=?";
                ResultSet result = MyJDBCUtil.query(SQL, param);
                try
                {
                    result.next();
                    i = result.getLong("CONFIG_VALUE");
                } catch (SQLException e)
                {
                    e.printStackTrace();
                }
                return i;
            }
            private void update(long param)
            {
                String SQL1 = "UPDATE common_sys_config SET CONFIG_VALUE=? WHERE CONFIG_ITEM='JYLQ'";
                MyJDBCUtil.execute(SQL1, String.valueOf(param));
            }
        });
        consumer.start();
        System.out.println("Broadcast ConsumerA Started.");
    }
    public static void waitFor(long i)
    {
        try
        {
            Thread.sleep(1000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM