二.Consumer、Producer簡單例子


1、先導入jar包,我使用的是maven

<dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>3.4.9</version>
            <type>pom</type>
        </dependency>
        <!-- rocker-mq核心:start -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.2.6</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.1</version>
        </dependency>
2、消息訂閱
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * 消息訂閱
 * 
 * @author admin
 *
 */
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");
        /**
         * 1、負責查找NameServer,多個NameServer地址用分號隔開
         */
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("QuickStartConsumer");
        
         /** 
         * 訂閱指定topic下tags分別等於TagA或TagC或TagD 
         */  
        consumer.subscribe("TopicModel","TagA");  
        /** 
         * 訂閱指定topic下所有消息<br> 
         * 注意:一個consumer對象可以訂閱多個topic 
         */  
        consumer.subscribe("TopicMode2","*");  

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " 接收消息: " + msgs.size());
                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("TopicModel")) {
                    // 執行TopicModel的消費邏輯
                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                        // 執行TagA的消費
                        System.out.println("執行消費邏輯:"+new String(msg.getBody()));
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        /** 
         * Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br> 
         */  
        consumer.start();
        System.out.println("\nConsumer Started.========>運行...\n");
    }
}
3、消息產生者Producer
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**
 * 消息產生者Producer
 * 
 * @author admin
 *
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        /**
         * 一個應用創建一個Producer,由應用來維護此對象,可以設置為全局對象或者單例<br>
         * 注意:ProducerGroupName需要由應用來保證唯一<br>
         * ProducerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵,
         * 因為服務器會回查這個Group下的任意一個Producer
         */
        DefaultMQProducer producer = new DefaultMQProducer("QuickStartProducer");
        /**
         * 1、負責查找NameServer,多個NameServer地址用分號隔開
         */
        producer.setNamesrvAddr("127.0.0.1:9876");
        /**
         * 2、客戶端實例名稱(這個實例包含網絡連接、線程資源等)
         */
        producer.setInstanceName("QuickStartProducer");
        /**
         * 3、Producer對象在使用之前必須要調用start初始化,初始化一次即可<br>
         * 注意:切記不可以在每次發送消息時,都調用start方法
         */
        producer.start();
        /**
         * 這里模擬了 10 次消息發送,一個Producer對象可以發送多個topic,多個tag的消息。
         * //發生消息時必須聲明區分Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());:
         */
        for (int I = 1; I <=1; I++) {
            try {
                {
                    Message msg = new Message("TopicModel", // topic:必填,線下環境不需要申請,線上環境需要申請后才能使用
                            "TagA", // tag:設置的標簽,方便服務器過濾使用。目前只支持每個消息設置一個tag
                            "OrderID001",// key  :保證key唯一,訂單號,商品Id等,代表這條消息的業務關鍵詞,可以在Console系統根據Topic、Keys來查詢消息
                            ("產品:內容......" + I).getBytes()// body:序列化應用,Producer與Consumer要協商好序列化形式。
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
                {
                    Message msg = new Message("TopicModel",
                            "TagB", 
                            "OrderID002",
                            ("特定信息:......" + I).getBytes()
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
                {
                    Message msg = new Message("TopicMode2",
                            "TagA", 
                            "OrderID002",
                            ("訂單:......" + I).getBytes()
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        /**
         * 應用退出時,要調用shutdown來清理資源,關閉網絡連接,從MetaQ服務器上注銷自己
         * 注意:我們建議應用在JBOSS、Tomcat等容器的退出鈎子里調用shutdown方法
         */
        // producer.shutdown();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

            @Override
            public void run() {
                producer.shutdown();
                System.out.println("\nProducer========>運行...\n");
            }
        }));
        System.exit(0);
    }
}
⦁    消息產生者Producer
       /**
        * 一個應用創建一個Producer,由應用來維護此對象,可以設置為全局對象或者單例
         * 注意:ProducerGroupName需要由應用來保證唯一
         * ProducerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵,
         * 因為服務器會回查這個Group下的任意一個Producer
         */
        DefaultMQProducer producer = new DefaultMQProducer("QuickStartProducer");
        /**
         * 1、負責查找NameServer,多個NameServer地址用分號隔開
         */
        producer. setNamesrvAddr ("127.0.0.1:9876");
        /**
         * 2、客戶端實例名稱(這個實例包含網絡連接、線程資源等)
         */
        producer. setInstanceName("QuickStartProducer");
        /**
         * 3、Producer對象在使用之前必須要調用start初始化,初始化一次即可
         * 注意:切記不可以在每次發送消息時,都調用start方法
         */
        producer. start ();


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM