1、JMS介紹和使用場景及基礎編程模型
簡介:講解什么是小寫隊列,JMS的基礎知識和使用場景
1、什么是JMS: Java消息服務(Java Message Service),Java平台中關於面向消息中間件的接口
2、JMS是一種與廠商無關的 API,用來訪問消息收發系統消息,它類似於JDBC(Java Database Connectivity)。這里,JDBC 是可以用來訪問許多不同關系數據庫的 API
3、使用場景:
1)跨平台
2)多語言
3)多項目
4)解耦
5)分布式事務
6)流量控制
7)最終一致性
8)RPC調用
上下游對接,數據源變動->通知下屬
4、概念
JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
JMS生產者(Message Producer)
JMS消費者(Message Consumer)
JMS消息
JMS隊列
JMS主題
JMS消息通常有兩種類型:點對點(Point-to-Point)、發布/訂閱(Publish/Subscribe)
5、編程模型
MQ中需要用的一些類
ConnectionFactory :連接工廠,JMS 用它創建連接
Connection :JMS 客戶端到JMS Provider 的連接
Session: 一個發送或接收消息的線程
Destination :消息的目的地;消息發送給誰.
MessageConsumer / MessageProducer: 消息接收者,消費者
2、ActiveMQ5.x消息隊列基礎介紹和安裝
簡介:介紹ActiveMQ5.x消息隊列基礎特性和本地快速安裝
特點:
1)支持來自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各種跨語言客戶端和協議
2)支持許多高級功能,如消息組,虛擬目標,通配符和復合目標
3) 完全支持JMS 1.1和J2EE 1.4,支持瞬態,持久,事務和XA消息
4) Spring支持,ActiveMQ可以輕松嵌入到Spring應用程序中,並使用Spring的XML配置機制進行配置
5) 支持在流行的J2EE服務器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中進行測試
6) 使用JDBC和高性能日志支持非常快速的持久化
...
1、下載地址:http://activemq.apache.org/activemq-5153-release.html
2、快速開始:http://activemq.apache.org/getting-started.html
3、如果我們是32位的機器,就雙擊win32目錄下的activemq.bat,如果是64位機器,則雙擊win64目錄下的activemq.bat
4、bin目錄里面啟動 選擇對應的系統版本和位數,activeMQ start 啟動
5、啟動后訪問路徑http://127.0.0.1:8161/
6、用戶名和密碼默認都是admin
7、官方案例集合
https://github.com/spring-projects/spring-boot/tree/master/spring-boot-samples
面板:
Name:隊列名稱。
Number Of Pending Messages:等待消費的消息個數。
Number Of Consumers:當前連接的消費者數目
Messages Enqueued:進入隊列的消息總個數,包括出隊列的和待消費的,這個數量只增不減。
Messages Dequeued:已經消費的消息數量。
3、SpringBoot2.x整合ActiveMQ實戰之點對點消息(p2p)
簡介:SpringBoot2.x整合ActiveMQ實戰之點對點消息
2、加入依賴
<!-- 整合消息隊列ActiveMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 如果配置線程池則加入 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
3、application.properties配置文件配置
#整合jms測試,安裝在別的機器,防火牆和端口號記得開放
spring.activemq.broker-url=tcp://127.0.0.1:61616
#集群配置
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
spring.activemq.user=admin
spring.activemq.password=admin
#下列配置要增加依賴
#開啟連接池,最大連接數為100(自定義)
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100
4、springboot啟動類 @EnableJms,開啟支持jms
5、模擬請求
localhost:8080/api/v1/order?msg=12312321321312
6、消費者:實時監聽對應的隊列
@JmsListener(destination = "order.queue")
4、SpringBoot整合ActiveMQ實戰之發布訂閱模式(pub/sub)
簡介:SpringBoot整合ActiveMQ實戰之發布訂閱模式(pub/sub),及同時支持點對點和發布訂閱模型
1、需要加入配置文件,支持發布訂閱模型,默認只支持點對點
#default point to point
spring.jms.pub-sub-domain=true
注意點:
1、默認消費者並不會消費訂閱發布類型的消息,這是由於springboot默認采用的是p2p模式進行消息的監聽
修改配置:spring.jms.pub-sub-domain=true
2、@JmsListener如果不指定獨立的containerFactory的話是只能消費queue消息
修改訂閱者container:containerFactory="jmsListenerContainerTopic"
//需要給topic定義獨立的JmsListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
在配置文件里面,注釋掉 #spring.jms.pub-sub-domain=true
5、RocketMQ4.x消息隊列介紹
簡介:阿里開源消息隊列 RocketMQ4.x介紹和新概念講解
1、Apache RocketMQ作為阿里開源的一款高性能、高吞吐量的分布式消息中間件
2、特點
1)在高壓下1毫秒內響應延遲超過99.6%。
2)適合金融類業務,高可用性跟蹤和審計功能。
3)支持發布訂閱模型,和點對點
4)支持拉pull和推push兩種消息模式
5)單一隊列百萬消息
6)支持單master節點,多master節點,多master多slave節點
...
3、概念
Producer:消息生產者
Producer Group:消息生產者組,發送同類消息的一個消息生產組
Consumer:消費者
Consumer Group:消費同個消息的多個實例
Tag:標簽,子主題(二級分類),用於區分同一個主題下的不同業務的消息
Topic:主題
Message:消息
Broker:MQ程序,接收生產的消息,提供給消費者消費的程序
Name Server:給生產和消費者提供路由信息,提供輕量級的服務發現和路由
3、官網地址:http://rocketmq.apache.org/
學習資源:
1)http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/
2)https://www.jianshu.com/p/453c6e7ff81c
6、RocketMQ4.x本地快速部署
簡介:RocketMQ4.x本地快速部署
1、安裝前提條件(推薦)
64bit OS, Linux/Unix/Mac
64bit JDK 1.8+;
2、快速開始 http://rocketmq.apache.org/docs/quick-start/
下載安裝包:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip
路徑:/Users/jack/Desktop/person/springboot/資料/第13章/第5課/rocketmq-all-4.2.0-bin-release/bin
3、解壓壓縮包
1)進入bin目錄,啟動namesrv
nohup sh mqnamesrv &
2) 查看日志 tail -f nohup.out
結尾:The Name Server boot success. serializeType=JSON 表示啟動成功
3、啟動broker
nohup sh mqbroker -n 127.0.0.1:9876 &
4)、關閉nameserver broker執行的命令
sh mqshutdown namesrv
sh mqshutdown broker
7、RoekerMQ4.x可視化控制台講解
簡介:RoekerMQ4.x可視化控制台講解
1、下載 https://github.com/apache/rocketmq-externals
2、編譯打包 mvn clean package -Dmaven.test.skip=true
3、target目錄 通過java -jar的方式運行
4、無法連接獲取broker信息
1)修改配置文件,名稱路由地址為 namesrvAddr,例如我本機為
2)src/main/resources/application.properties
rocketmq.config.namesrvAddr=192.168.0.101:9876
5、默認端口 localhost:8080
6、注意:
在阿里雲,騰訊雲或者虛擬機,記得檢查端口號和防火牆是否啟動
8、Springboot2.x整合RocketMQ4.x實戰上集
簡介:Springboot2.x整合RocketMQ4.x實戰,加入相關依賴,開發生產者代碼
啟動nameser和broker
1、加入相關依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>${rocketmq.version}</version>
</dependency>
2、application.properties加入配置文件
# 消費者的組名
apache.rocketmq.consumer.PushConsumer=orderConsumer
# 生產者的組名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=127.0.0.1:9876
3、開發MsgProducer
/**
* 生產者的組名
*/
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
private DefaultMQProducer producer ;
public DefaultMQProducer getProducer(){
return this.producer;
}
@PostConstruct
public void defaultMQProducer() {
//生產者的組名
producer = new DefaultMQProducer(producerGroup);
//指定NameServer地址,多個地址以 ; 隔開
//如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
try {
/**
* Producer對象在使用之前必須要調用start初始化,只能初始化一次
*/
producer.start();
} catch (Exception e) {
e.printStackTrace();
}
// producer.shutdown(); 一般在應用上下文,關閉的時候進行關閉,用上下文監聽器
}
9、Springboot2.x整合RocketMQ4.x實戰下集
簡介:Springboot2.x整合RocketMQ4.x實戰,開發消費者代碼,常見問題處理
1、創建消費者
問題:
1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed
2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]
3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local]
解決:多網卡問題處理
1、設置producer: producer.setVipChannelEnabled(false);
2、編輯ROCKETMQ 配置文件:broker.conf(下列ip為自己的ip)
namesrvAddr = 192.168.0.101:9876
brokerIP1 = 192.168.0.101
4、DESC: service not available now, maybe disk full, CL:
解決:修改啟動腳本runbroker.sh,在里面增加一句話即可:
JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
(磁盤保護的百分比設置成98%,只有磁盤空間使用率達到98%時才拒絕接收producer消息)
常見問題處理:
https://blog.csdn.net/sqzhao/article/details/54834761
https://blog.csdn.net/mayifan0/article/details/67633729
https://blog.csdn.net/a906423355/article/details/78192828
