消息隊列介紹和SpringBoot2.x整合RockketMQ、ActiveMQ 9節課


1、JMS介紹和使用場景及基礎編程模型
     簡介:講解什么是小寫隊列,JMS的基礎知識和使用場景
     1、什么是JMS: Java消息服務(Java Message Service),Java平台中關於面向消息中間件的接口

    2、JMS是一種與廠商無關的 API,用來訪問消息收發系統消息,它類似於JDBC(Java Database Connectivity)。這里,JDBC 是可以用來訪問許多不同關系數據庫的 API

    3、使用場景:
         1)跨平台
         2)多語言
         3)多項目
         4)解耦
         5)分布式事務

        6)流量控制
         7)最終一致性
         8)RPC調用
             上下游對接,數據源變動->通知下屬
     4、概念   
         JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
         JMS生產者(Message Producer)
         JMS消費者(Message Consumer)
         JMS消息
         JMS隊列
         JMS主題

        JMS消息通常有兩種類型:點對點(Point-to-Point)、發布/訂閱(Publish/Subscribe)
    
     5、編程模型
         MQ中需要用的一些類
         ConnectionFactory :連接工廠,JMS 用它創建連接
         Connection :JMS 客戶端到JMS Provider 的連接
         Session: 一個發送或接收消息的線程
         Destination :消息的目的地;消息發送給誰.
         MessageConsumer / MessageProducer: 消息接收者,消費者





2、ActiveMQ5.x消息隊列基礎介紹和安裝
    
     簡介:介紹ActiveMQ5.x消息隊列基礎特性和本地快速安裝
         特點:
             1)支持來自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各種跨語言客戶端和協議
             2)支持許多高級功能,如消息組,虛擬目標,通配符和復合目標
             3) 完全支持JMS 1.1和J2EE 1.4,支持瞬態,持久,事務和XA消息
             4) Spring支持,ActiveMQ可以輕松嵌入到Spring應用程序中,並使用Spring的XML配置機制進行配置
             5) 支持在流行的J2EE服務器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中進行測試
             6) 使用JDBC和高性能日志支持非常快速的持久化
             ...

        1、下載地址:http://activemq.apache.org/activemq-5153-release.html
         2、快速開始:http://activemq.apache.org/getting-started.html
         3、如果我們是32位的機器,就雙擊win32目錄下的activemq.bat,如果是64位機器,則雙擊win64目錄下的activemq.bat
         4、bin目錄里面啟動 選擇對應的系統版本和位數,activeMQ start 啟動
         5、啟動后訪問路徑http://127.0.0.1:8161/

        6、用戶名和密碼默認都是admin
         7、官方案例集合
             https://github.com/spring-projects/spring-boot/tree/master/spring-boot-samples
         面板:   
         Name:隊列名稱。
         Number Of Pending Messages:等待消費的消息個數。
         Number Of Consumers:當前連接的消費者數目
         Messages Enqueued:進入隊列的消息總個數,包括出隊列的和待消費的,這個數量只增不減。
         Messages Dequeued:已經消費的消息數量。



3、SpringBoot2.x整合ActiveMQ實戰之點對點消息(p2p)
    
     簡介:SpringBoot2.x整合ActiveMQ實戰之點對點消息

    1、官網地址:https://docs.spring.io/spring-boot/docs/2.1.0.BUILD-SNAPSHOT/reference/htmlsingle/#boot-features-activemq

    2、加入依賴
         <!-- 整合消息隊列ActiveMQ -->
         <dependency> 
             <groupId>org.springframework.boot</groupId> 
             <artifactId>spring-boot-starter-activemq</artifactId> 
         </dependency> 
        
         <!-- 如果配置線程池則加入 -->
         <dependency> 
             <groupId>org.apache.activemq</groupId> 
             <artifactId>activemq-pool</artifactId> 
         </dependency>

     3、application.properties配置文件配置
          #整合jms測試,安裝在別的機器,防火牆和端口號記得開放
         spring.activemq.broker-url=tcp://127.0.0.1:61616

        #集群配置
         #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)

        spring.activemq.user=admin
         spring.activemq.password=admin
         #下列配置要增加依賴

        #開啟連接池,最大連接數為100(自定義)
         spring.activemq.pool.enabled=true
         spring.activemq.pool.max-connections=100

      4、springboot啟動類 @EnableJms,開啟支持jms

      5、模擬請求
           localhost:8080/api/v1/order?msg=12312321321312

      6、消費者:實時監聽對應的隊列
           @JmsListener(destination = "order.queue")
          



   


4、SpringBoot整合ActiveMQ實戰之發布訂閱模式(pub/sub)
     簡介:SpringBoot整合ActiveMQ實戰之發布訂閱模式(pub/sub),及同時支持點對點和發布訂閱模型

        1、需要加入配置文件,支持發布訂閱模型,默認只支持點對點
             #default point to point
             spring.jms.pub-sub-domain=true

    注意點:
        
         1、默認消費者並不會消費訂閱發布類型的消息,這是由於springboot默認采用的是p2p模式進行消息的監聽
             修改配置:spring.jms.pub-sub-domain=true
        
         2、@JmsListener如果不指定獨立的containerFactory的話是只能消費queue消息
             修改訂閱者container:containerFactory="jmsListenerContainerTopic"

            //需要給topic定義獨立的JmsListenerContainer
             @Bean
             public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
                 DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
                 bean.setPubSubDomain(true);
                 bean.setConnectionFactory(activeMQConnectionFactory);
                 return bean;
             }

            在配置文件里面,注釋掉 #spring.jms.pub-sub-domain=true




5、RocketMQ4.x消息隊列介紹
     簡介:阿里開源消息隊列 RocketMQ4.x介紹和新概念講解

    1、Apache RocketMQ作為阿里開源的一款高性能、高吞吐量的分布式消息中間件
     2、特點
         1)在高壓下1毫秒內響應延遲超過99.6%。
         2)適合金融類業務,高可用性跟蹤和審計功能。
         3)支持發布訂閱模型,和點對點
         4)支持拉pull和推push兩種消息模式
         5)單一隊列百萬消息
         6)支持單master節點,多master節點,多master多slave節點
         ...
     3、概念
         Producer:消息生產者
         Producer Group:消息生產者組,發送同類消息的一個消息生產組

        Consumer:消費者
         Consumer Group:消費同個消息的多個實例

        Tag:標簽,子主題(二級分類),用於區分同一個主題下的不同業務的消息

        Topic:主題
         Message:消息
         Broker:MQ程序,接收生產的消息,提供給消費者消費的程序
         Name Server:給生產和消費者提供路由信息,提供輕量級的服務發現和路由       

    3、官網地址:http://rocketmq.apache.org/

    學習資源:
         1)http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/   
         2)https://www.jianshu.com/p/453c6e7ff81c






6、RocketMQ4.x本地快速部署
     簡介:RocketMQ4.x本地快速部署

    1、安裝前提條件(推薦)
         64bit OS, Linux/Unix/Mac
         64bit JDK 1.8+;

    2、快速開始 http://rocketmq.apache.org/docs/quick-start/
        下載安裝包:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip

       路徑:/Users/jack/Desktop/person/springboot/資料/第13章/第5課/rocketmq-all-4.2.0-bin-release/bin
    
     3、解壓壓縮包
         1)進入bin目錄,啟動namesrv
              nohup sh mqnamesrv &
        
         2) 查看日志 tail -f nohup.out
         結尾:The Name Server boot success. serializeType=JSON 表示啟動成功
        
         3、啟動broker  
             nohup sh mqbroker -n 127.0.0.1:9876 &

        4)、關閉nameserver broker執行的命令
             sh mqshutdown namesrv
             sh mqshutdown broker




7、RoekerMQ4.x可視化控制台講解
     簡介:RoekerMQ4.x可視化控制台講解

        1、下載 https://github.com/apache/rocketmq-externals
         2、編譯打包  mvn clean package -Dmaven.test.skip=true
         3、target目錄 通過java -jar的方式運行
        
         4、無法連接獲取broker信息
             1)修改配置文件,名稱路由地址為 namesrvAddr,例如我本機為
             2)src/main/resources/application.properties
                 rocketmq.config.namesrvAddr=192.168.0.101:9876
        
         5、默認端口 localhost:8080
        
         6、注意:
             在阿里雲,騰訊雲或者虛擬機,記得檢查端口號和防火牆是否啟動





8、Springboot2.x整合RocketMQ4.x實戰上集
     簡介:Springboot2.x整合RocketMQ4.x實戰,加入相關依賴,開發生產者代碼
    
     啟動nameser和broker

    1、加入相關依賴
         <dependency> 
             <groupId>org.apache.rocketmq</groupId> 
             <artifactId>rocketmq-client</artifactId> 
             <version>${rocketmq.version}</version> 
         </dependency> 
         <dependency> 
             <groupId>org.apache.rocketmq</groupId> 
             <artifactId>rocketmq-common</artifactId> 
             <version>${rocketmq.version}</version> 
         </dependency> 


     2、application.properties加入配置文件       
         # 消費者的組名
         apache.rocketmq.consumer.PushConsumer=orderConsumer
         # 生產者的組名
         apache.rocketmq.producer.producerGroup=Producer
         # NameServer地址
         apache.rocketmq.namesrvAddr=127.0.0.1:9876

    3、開發MsgProducer
          /**
          * 生產者的組名
          */
         @Value("${apache.rocketmq.producer.producerGroup}")
         private String producerGroup;

        /**
          * NameServer 地址
          */
         @Value("${apache.rocketmq.namesrvAddr}")
         private String namesrvAddr;

        private  DefaultMQProducer producer ;

           
         public DefaultMQProducer getProducer(){
             return this.producer;
         }
        
         @PostConstruct
         public void defaultMQProducer() {
             //生產者的組名
             producer = new DefaultMQProducer(producerGroup);
             //指定NameServer地址,多個地址以 ; 隔開
             //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
             producer.setNamesrvAddr(namesrvAddr);
             producer.setVipChannelEnabled(false);
            
             try {
                 /**
                  * Producer對象在使用之前必須要調用start初始化,只能初始化一次
                  */
                 producer.start();

            } catch (Exception e) {
                 e.printStackTrace();
             }
            
             // producer.shutdown();  一般在應用上下文,關閉的時候進行關閉,用上下文監聽器

        }



9、Springboot2.x整合RocketMQ4.x實戰下集
     簡介:Springboot2.x整合RocketMQ4.x實戰,開發消費者代碼,常見問題處理

    1、創建消費者


     問題:
         1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed

        2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]

        3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [chenyaowudeMacBook-Air.local,     chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local]
         解決:多網卡問題處理
         1、設置producer:  producer.setVipChannelEnabled(false);
         2、編輯ROCKETMQ 配置文件:broker.conf(下列ip為自己的ip)
             namesrvAddr = 192.168.0.101:9876
             brokerIP1 = 192.168.0.101


        4、DESC: service not available now, maybe disk full, CL:
         解決:修改啟動腳本runbroker.sh,在里面增加一句話即可:       
         JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
         (磁盤保護的百分比設置成98%,只有磁盤空間使用率達到98%時才拒絕接收producer消息)


         常見問題處理:
             https://blog.csdn.net/sqzhao/article/details/54834761
             https://blog.csdn.net/mayifan0/article/details/67633729
             https://blog.csdn.net/a906423355/article/details/78192828


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM