version 2.0 【更新於 2020.03.20】
- 本次更新重點,主從同步!!!(很多小伙伴出現主從不同步的問題)
- 主從同步關注點(詳見配置說明):
- 1.
brokerRole - 2.
brokerIP2 - 3. 重啟程序順序
- 1.
- 官方壓縮包由
tar.gz改為zip - bin目錄下多了一個文件夾,從而
grep命令參數多了--exclude-dir - 注意
nameserver和master以及slave的區別 - 注意重啟方式,不要使用
kill -9 nameserver集群、broker(單master/單slave;多master/多slave;單master/多slave)
下載&安裝
下載地址
http://rocketmq.apache.org/docs/quick-start/解壓
tar -zxvf rocketmq-all-4.4.0-bin-release.tar.gz解壓
unzip rocketmq-all-4.7.0-bin-release.zip【version 2.0】進入HOME目錄
cd rocketmq-all-4.4.0-bin-release開啟端口
9876/987610909/10912驗證netstat -nltp
修改配置
啟動腳本內存配置
cd bin
grep "Xmx" *
grep 'Xmx' * --exclude-dir="*"【version 2.0】
grep 'MaxDirectMemorySize' * --exclude-dir="*"【version 2.0】
vim runbroker.shrunserver.shvim tools.sh將
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"改為
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=1g"Broker配置 【master】
vim conf/broker.confbrokerClusterName = DefaultCluster brokerName = broker-a # 0 表示master,大於0 表示slave brokerId = 0 # 服務節點/注冊中心 namesrvAddr=注冊中心IP:9876 # 多服務節點/注冊中心,使用分號【;】 #namesrvAddr=注冊中心IP:9876;注冊中心IP:9876;注冊中心IP:9876 # Broker服務地址 brokerIP1=當前機器IP # BrokerHAIP地址,供slave同步消息的地址 brokerIP2=當前機器IP # 刪除時間【時】此處表示凌晨4點 deleteWhen = 04 # 數據存儲時間【時】 此處表示48小時 fileReservedTime = 48 # SYNC_MASTER(同步雙寫) 、ASYNC_MASTER(異步復制) 、SLAVE brokerRole = SYNC_MASTER # SYNC_FLUSH(同步刷盤) 和ASYNC_FLUSH(異步刷盤),寫磁盤 flushDiskType = SYNC_FLUSH # 是否自動創建topic 線上改為false 測試true autoCreateTopicEnable=true
Broker配置 【slave】
vim conf/broker.confbrokerClusterName = DefaultCluster brokerName = broker-a # 0 表示master,大於0 表示slave brokerId = 1 # 服務節點/注冊中心 namesrvAddr=注冊中心IP:9876 # 多服務節點/注冊中心,使用分號【;】 #namesrvAddr=注冊中心IP:9876;注冊中心IP:9876;注冊中心IP:9876 # Broker服務地址 brokerIP1=當前機器IP # BrokerHAIP地址,供slave同步消息的地址 brokerIP2=當前機器IP # 刪除時間【時】此處表示凌晨4點 deleteWhen = 04 # 數據存儲時間【時】 此處表示48小時 fileReservedTime = 48 # SYNC_MASTER(同步雙寫) 、ASYNC_MASTER(異步復制) 、SLAVE brokerRole = SLAVE # SYNC_FLUSH(同步刷盤) 和ASYNC_FLUSH(異步刷盤),寫磁盤 flushDiskType = SYNC_FLUSH # 是否自動創建topic 線上改為false 測試true autoCreateTopicEnable=true
啟動
【NamesrvStartup】啟動namesrv
nohup bin/mqnamesrv -n 注冊中心IP:9876 > mqnamesrv.log 2>&1 &檢查端口監聽是否為
0.0.0.0:9876/注冊中心IP:9876命令netstat -anpt | grep 9876【BrokerStartup】啟動master節點
nohup sh bin/mqbroker -n 注冊中心IP:9876 -c conf/broker.conf > broker.log 2>&1 &【BrokerStartup】啟動slave節點
nohup sh bin/mqbroker -n 注冊中心IP:9876 -c conf/broker.conf > broker.log 2>&1 &查看是否注冊成功(集群信息)
bin/mqadmin clusterList -n 注冊中心IP:9876
停止 不要使用kill -9!!!
停止 slave
bin/mqshutdown broker停止 master
bin/mqshutdown broker停止 namesrv
bin/mqshutdown namesrv
常用命令
cd rocketmq-all-4.4.0-bin-release/- 注冊中心機器上
---集群相關
查詢集群信息 bin/mqadmin clusterList -n localhost:9876 打印Broker配置 bin/mqbroker -m -n localhost:9876 更新Broker配置 bin/mqadmin updateBrokerConfig -c DefaultCluster -k autoCreateTopicEnable -v false -n localhost:9876 查看Broker統計信息 bin/mqadmin brokerstatus –n localhost:9876 –b locahost:10909
---訂閱組相關
創建訂閱組 bin/mqadmin updateSubGroup -n localhost:9876 -c ClusterName -g GroupName 列出消費組 bin/mqadmin consumerProgress -n localhost:9876 查看消費組IP bin/mqadmin consumerStatus -g GroupName -n localhost:9876 查看消費組數據堆積 bin/mqadmin consumerProgress -n localhost:9876 -g GroupName 刪除訂閱組 bin/mqadmin deleteSubGroup -n localhost:9876 -c ClusterName -g GroupName
---Topic相關
創建Topic bin/mqadmin updateTopic -c ClusterName -n localhost:9876 -t TopicName Topic列表 bin/mqadmin topicList -n localhost:9876 發送Topic消息測試 bin/mqadmin checkMsgSendRT -n localhost:9876 -t TopicName -s 1024 打印Topic消息 bin/mqadmin printMsg -n localhost:9876 -t TopicName Topic詳情統計 bin/mqadmin topicstatus -n localhost:9876 -t TopicName 獲取Topic的cluster bin/mqadmin topicClusterList -n localhost:9876 -t TopicName 刪除Topic bin/mqadmin deleteTopic -n localhost:9876 -t TopicName -c ClusterName 查看Topic路由 bin/mqadmin topicRoute -n localhost:9876 -t TopicName 查看Topic狀態 bin/mqadmin topicStatus -n localhost:9876 -t TopicName 根據ID查詢消息 bin/mqadmin queryMsgById -i msgId -n localhost:9876 根據偏移量查詢消息 bin/mqadmin queryMsgByOffset -b BrokerName -i 3 -n localhost:9876 -o 299 -t TopicName
broker配置說明
基礎配置
配置 描述 默認值 例子 namesrvAddr nameServer地址,如果nameserver是多台集群的話,用分號分割 無 namesrvAddr=10.1.219.75:9876;10.1.219.76:9876 brokerClusterName 所屬集群名字。Cluster 的地址,如果集群機器數比較多,可以分成多個Cluster ,每個Cluster 供一個業務群使用 無 brokerClusterName=rocketmq-cluster brokerName Broker 的名稱, Master 和Slave 通過使用相同的Broker 名稱來表明相互關系,以說明某個Slave 是哪個Master 的Slave brokerName=broker-a brokerId 一個Master Barker 可以有多個Slave, 0 表示Master ,大於0 表示不同的 Slave 的ID brokerId=0 fileReservedTime 在磁盤上保存消息的時長,單位是小時,自動刪除超時的消息 fileReservedTime=48 deleteWhen 與fileReservedTim巳參數呼應,表明在幾點做消息刪除動作,默認值04 表示凌晨4 點 deleteWhen=04 brokerRole brokerRole 有3 種: SYNCMASTER(同步雙寫) 、ASYNCMASTER(異步復制) 、SLAVE 。關鍵詞SYNC 和ASYNC 表示Master 和Slave 之間同步消息的機制, SYNC 的意思是當Slave 和Master 消息同步完成后,再返回發送成功的狀態 brokerRole=SYNC_MASTER flushDiskType flushDiskType 表示刷盤策略,分為SYNCFLUSH 和ASYNCFLUSH兩種,分別代表同步刷盤和異步刷盤。同步刷盤情況下,消息真正寫人磁盤后再返回成功狀態;異步刷盤情況下,消息寫人page_cache 后就返回成功狀態 flushDiskType=ASYNC_FLUSH listenPort Broker 監聽的端口號,如果一台機器上啟動了多個Broker , 則要設置不同的端口號,避免沖突 listenPort=10911 storePathRootDir 存儲消息以及一些配置信息的根目錄 storePathRootDir=/app/custom/data/rocketmq/store-a
進階配置
配置 描述 默認值 例子 autoCreateTopicEnable 是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true defaultTopicQueueNums 在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數。 defaultTopicQueueNums=4 autoCreateSubscriptionGroup 是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateTopicEnable=true mapedFileSizeCommitLog commitLog每個文件的大小,默認1G 1G mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue ConsumeQueue每個文件默認存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000
存儲配置
配置 描述 默認值 例子 storePathRootDir 存儲消息以及一些配置信息的根目錄 storePathRootDir=/app/custom/data/rocketmq/store-a storePathCommitLog commitLog 存儲路徑 storePathCommitLog=/data/rocketmq/store/commitlog storePathConsumeQueue 消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/data/rocketmq/store/consumequeue storePathIndex 消息索引存儲路徑 storePathIndex=/data/rocketmq/store/index storeCheckpoint checkpoint 文件存儲路徑 storeCheckpoint=/data/rocketmq/store/checkpoint abortFile abort 文件存儲路徑 abortFile=/data/rocketmq/store/abort
JAVA示例
1. pom.xml
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.5.0</version> <exclusions> <exclusion> <groupId>io.netty</groupId> <artifactId>netty-tcnative</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> </dependencies>
2. Producer 生產者
public class Producer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("GroupName"); // Specify name server addresses. producer.setNamesrvAddr("IP:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicName" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
3. Consumer 消費者
public class Consumer { public static void main(String[] args) { String topicName = "TopicName"; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupName"); consumer.setNamesrvAddr("IP:9876"); try { consumer.subscribe(topicName, "*"); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); /** * 如果是順序消息,這邊的監聽就要使用MessageListenerOrderly監聽 * 並且,返回結果也要使用ConsumeOrderlyStatus */ consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { //設置自動提交,如果不設置自動提交就算返回SUCCESS,消費者關閉重啟 還是會重復消費的 context.setAutoCommit(true); try { for (MessageExt msg : msgs) { String recString = null; try { recString = new String(msg.getBody(), "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } System.out.println(recString); } catch (Exception e) { e.printStackTrace(); //如果出現異常,消費失敗,掛起消費隊列一會會,稍后繼續消費 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } //消費成功 return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }
4. 打包成jar的插件[可選]
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <!--jdk 版本--> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <!--全限定名--> <mainClass>com.package.Consumer</mainClass> </transformer> </transformers> <artifactSet> </artifactSet> </configuration> </execution> </executions> </plugin> </plugins> </build>
個人微信,有什么建議、意見或補充,歡迎及時溝通!!!(添加時注明“博客園”,謝謝)
