RocketMQ簡介
RocketMQ基本概念
RocketMQ安裝運行
wusi@wusi-virtual-machine:~/桌面$ sudo wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
wusi@wusi-virtual-machine:~/桌面$ unzip rocketmq-all-4.7.1-bin-release.zip
root@wusi-virtual-machine:/home/wusi/桌面/rocketmq-all-4.7.1-bin-release# mkdir log
root@wusi-virtual-machine:/home/wusi/桌面/rocketmq-all-4.7.1-bin-release# nohup bin/mqnamesrv > log/mqname.log 2>&1 &
root@wusi-virtual-machine:/home/wusi/桌面/rocketmq-all-4.7.1-bin-release# nohup bin/mqbroker -n 172.16.20.246:9876 -c conf/broker.conf autoCreateTopicEnable=true > log/borker.log 2>&1 &
2>&1 的意思就是將標准錯誤重定向到標准輸出。
還需要修改runserver.sh,不要設置的太大
修改conf目錄下的broker.conf
修改runbroker.sh,不要設置的太大
RocketMQ架構方案及角色詳解
API的使用
生產者啟動后,發送消息時會報以下錯:
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest
原因:
使用RocketMQ進行發消息時,必須要指定topic,對於topic的設置有一個開關autoCreateTopicEnable,一般在開發測試環境中會使用默認設置autoCreateTopicEnable = true,
但是這樣就會導致topic的設置不容易規范管理,沒有統一的審核等等,所以在正式環境中會在Broker啟動時設置參數autoCreateTopicEnable = false。
但是,目前的版本中,autoCreateTopicEnable設置為true也不會生效
解決方法:
手動通過命令或管理界面創建主題
/usr/rocketmq/bin/mqadmin updateTopic -n '192.168.100.242:9876' -c DefaultCluster -t TopicTest
消費者代碼
package com.study.rocketmq.a151_simple;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* 普通消息消費者
*/
public class Consumer {
public static final String NAME_SERVER_ADDR = "192.168.100.242:9876";
public static void main(String[] args) throws MQClientException {
// 1. 創建消費者(Push)對象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST");
// 2. 設置NameServer的地址,如果設置了環境變量NAMESRV_ADDR,可以省略此步
consumer.setNamesrvAddr(NAME_SERVER_ADDR);
consumer.setMaxReconsumeTimes(-1);// 消費重試次數 -1代表16次
// 3. 訂閱對應的主題和Tag
consumer.subscribe("TopicTest", "*");
// 4. 注冊消息接收到Broker消息后的處理接口
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
MessageExt messageExt = list.get(0);
System.out.printf("線程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 5. 啟動消費者(必須在注冊完消息監聽器后啟動,否則會報錯)
consumer.start();
System.out.println("已啟動消費者");
}
}
同步生產者代碼
package com.study.rocketmq.a151_simple;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
/**
* 發送同步消息
* 可靠的同步傳輸用於廣泛的場景,如重要的通知消息,短信通知,短信營銷系統等。
*/
public class SyncProducer {
public static final String NAME_SERVER_ADDR = "192.168.100.242:9876";
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 1. 創建生產者對象
DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");
// 2. 設置NameServer的地址,如果設置了環境變量NAMESRV_ADDR,可以省略此步
producer.setNamesrvAddr(NAME_SERVER_ADDR);
// 3. 啟動生產者
producer.start();
// 4. 生產者發送消息
for (int i = 0; i < 10; i++) {
Message message = new Message("TopicTest", "TagA", ("Hello MQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(message);
System.out.printf("發送結果:%s%n", result);
}
// 5. 停止生產者
producer.shutdown();
}
}
異步生產者
package com.study.rocketmq.a151_simple;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;
/**
* 異步消息
* 一般用來對方法調用響應時間有較嚴格要求的情況下,異步調用,立即返回
* 不同於同步的唯一在於: send方法調用的時候多攜帶一個回調接口參數,用來異步處理消息發送結果
*/
public class AsyncProducer {
public static final String NAME_SERVER_ADDR = "192.168.100.242:9876";
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
// 1:創建生產者對象,並指定組名
DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");
// 2:指定NameServer地址
producer.setNamesrvAddr(NAME_SERVER_ADDR);
// 3:啟動生產者
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0); // 設置異步發送失敗重試次數,默認為2
int count = 10;
CountDownLatch cd = new CountDownLatch(count);
// 4:循環發送消息
for (int i = 0; i < count; i++) {
final int index = i;
// ID110:業務數據的ID,比如用戶ID、訂單編號等等
Message msg = new Message("TopicTest", "TagB", "ID110", ("Hello World " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 發送異步消息
producer.send(msg, new SendCallback() {
/**
* 發送成功的回調函數
* 但會結果有多種狀態,在SendStatus枚舉中定義
* @param sendResult
*/
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK MSG_ID:%s %n", index, sendResult.getMsgId());
cd.countDown();
}
/**
* 發送失敗的回調函數
* @param e
*/
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
cd.countDown();
}
});
}
// 確保消息都發送出去了
cd.await();
// 5:關閉生產者
producer.shutdown();
}
}
單向模式
package com.study.rocketmq.a151_simple;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
/**
* 單向模式
* 一般用來對可靠性有一定要求的消息發送,例如日志系統
* 不同於同步的唯一之處在於:調用的是sendOneway方法,且方法不返回任何值,即調用者不需要關心成功或失敗
*/
public class OnewayProducer {
public static final String NAME_SERVER_ADDR = "192.168.100.242:9876";
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
// 1:創建生產者對象
DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");
// 2:指定NameServer地址
producer.setNamesrvAddr(NAME_SERVER_ADDR);
// 3:啟動生產者
producer.start();
// 4:發送消息
for (int i = 0; i < 10; i++) {
Message msg = new Message("TopicTest", "TagC", ("Hello OneWay :" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
}
System.out.println("消息已發送");
producer.shutdown();
}
}
都是連接NAME_SERVER然后選擇相應的集群
消息的高可用
從內存將消息寫入磁盤,然后將消息同步到從服務器中,數據有同步寫入和異步寫入倆種方式,master之間沒有通信
可以理解為同步就是消息立刻寫入
異步就是消息累計一段時間后寫入
雙寫、同步復制
雙主雙從,四台機器屬於同一個集群