RabbitMQ的簡單模式快速入門與超時異常的處理方法


本文適合JAVA新人,想了解RabbitMQ又不想去看官網文檔的人(英語水看的頭疼(◎﹏◎),但建議有能力還是去看官網文檔)。

消息隊列MQ(一)

MQ全稱為Message Queue,消息隊列是應用程序和應用程序之間的通信方法。

先引入一下常見的通訊方案。

為什么使用MQ

在項目中,可將一些無需即時返回且耗時的操作提取出來,進行異步處理,而這種異步處理的方式大大的節省服務器的請求響應時間,從而提高了系統的吞吐量

 開發中消息隊列通常有如下應用場景:

應用解耦、異步處理(提高系統響應速度)、流量削峰(高峰堆積消息,峰后繼續處理消息)、日志處理(分布式日志,一般使用kafka)、純粹通訊。

 

AMQP 和 JMS

MQ是消息通信的模型;實現MQ的大致有兩種主流方式:AMQP、JMS。

AMQP

AMQP高級消息隊列協議,是一個進程間傳遞異步消息的網絡協議,更准確的說是一種binary wire-level protocol(鏈接協議)。這是其和JMS的本質差別,AMQP不從API層進行限定,而是直接定義網絡交換的數據格式。

JMS

JMS即Java消息服務(JavaMessage Service)應用程序接口,是一個Java平台中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。

 AMQP 與 JMS 區別

JMS是定義了統一的接口,來對消息操作進行統一;AMQP是通過規定協議來統一數據交互的格式

JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的。

JMS規定了兩種消息模式;而AMQP的消息模式更加豐富。

消息隊列產品:目前市面上成熟主流的MQ有Kafka 、RocketMQ、RabbitMQ,本文主要介紹RabbitMQ使用。

使用Erlang(語言)編寫的一個開源的消息隊列,本身支持很多的協議:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它變的非常重量級,更適合於企業級的開發。同時實現了Broker架構,核心思想是生產者不會將消息直接發送給隊列,消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)、數據持久化都有很好的支持。多用於進行企業級的ESB整合。

RabbitMQ介紹

RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高級消息隊列協議)協議實現的消息隊列,它是一種應用程序之間的通信方法,消息隊列在分布式系統開發中應用非常廣泛。

RabbitMQ官方地址:http://www.rabbitmq.com/

RabbitMQ提供了6種模式:簡單模式,work工作隊列(集群)模式,Publish/Subscribe發布與訂閱(交換機的廣播)模式,Routing(交換機的定向)路由模式,Topics主題(路由靈活)模式,RPC遠程調用模式(遠程調用,不太算MQ;不作介紹);//括號內的是自己的理解方式僅供參考。詳細可以去看官方介紹。

官網對應模式介紹:https://www.rabbitmq.com/getstarted.html

安裝RabbirMQ

兩種方式:windows環境與Linux環境(這里跳過)

我是LinuxCenOS6.7安裝的3.6.10版本

啟動成功參考如下兩張圖

先在WEB頁面管理用戶

角色說明: Tags

1、超級管理員(administrator)

可登陸管理控制台,可查看所有的信息,並且可以對用戶,策略(policy)進行操作。

2、監控者(monitoring)

可登陸管理控制台,同時可以查看rabbitmq節點的相關信息(進程數,內存使用情況,磁盤使用情況等)

3、策略制定者(policymaker) :可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息(上圖紅框標識的部分)。

4、普通管理者(management):僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。

5、其他 :  無法登陸管理控制台,通常就是普通的生產者和消費者。

Virtual Hosts配置

在RabbitMQ中可以虛擬消息服務器Virtual Host,每個Virtual Hosts相當於一個相對獨立的RabbitMQ服務器,每個VirtualHost之間是相互隔離的。exchange、queue、message不能互通。 相當於mysql的db。Virtual Name一般以/開頭。

添加隊列,這里需要將上下兩張圖結合起來看

需改用戶的密碼

查看默認的交換機

常見的端口

RabbitMQ入門

目標:入門案例將使用RabbitMQ的簡單模式實現通訊過程。

1.創建Maven工程,先在pom.xml添加依賴。

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6     <groupId>com.jxjdemo</groupId>
 7     <artifactId>rabbitmq1_demo</artifactId>
 8     <version>1.0-SNAPSHOT</version>
 9 
10     <dependencies>
11         <dependency> <!--rabbitmq的依賴-->
12             <groupId>com.rabbitmq</groupId>
13             <artifactId>amqp-client</artifactId>
14             <version>5.6.0</version>
15         </dependency>
16     </dependencies>
17 </project>

2.新建生產者類,生產發送消息

 1 package com.jxjdemo.mq.simple;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 public class SimpleProducer {
 8     public static void main(String args[]) throws Exception{
 9 //1、創建鏈接工廠對象-factory=newConnectionFactory()。創建鏈接用
10         ConnectionFactory factory = new ConnectionFactory();
11 
12 //2、設置RabbitMQ服務主機地址,默認localhost-factory.setHost("localhost")
13         factory.setHost("192.168.211.128");
14 //3、設置RabbitMQ服務端口,默認-1-factory.setPort(5672)
15         factory.setPort(5672);
16 //4、設置虛擬主機名字,默認/-factory.setVirtualHost("szitheima")
17          factory.setVirtualHost("shujuku1122");
18 //5、設置用戶連接名,默認guest-factory.setUsername("admin")
19         factory.setUsername("admin");
20 //6、設置鏈接密碼,默認guest-factory.setPassword("admin")
21         factory.setPassword("123456");
22  //      factory.setConnectionTimeout(5000);
23 //        factory.setWorkPoolTimeout(5000);
24 //        factory.setHandshakeTimeout(5000);
25 //7、創建鏈接-connection=factory.newConnection()
26         Connection connection = factory.newConnection(); //報錯,拋異常
27 //8、創建頻道-channel=connection.createChannel()
28         Channel channel = connection.createChannel();
29 //9、聲明隊列-channel.queueDeclare(名稱,是否持久化(true先存硬盤,讀完再刪),是否獨占本連接,是否自動刪除(false讀完再刪),附加參數)
30         channel.queueDeclare("simplequeue", true, false, false, null);
31 //10、創建消息-Stringm=xxx
32         String msg = "這是我們第一次發送 MQ消息";
33 //11、消息發送-channel.basicPublish(交換機[默認DefaultExchage],路由key[簡單模式可以傳遞隊列名稱],消息其它屬性,消息內容)
34         channel.basicPublish("", "simplequeue", null, msg.getBytes("utf-8"));
35 //12、關閉資源-channel.close();connection.close()
36         channel.close();
37        connection.close();
38     }
39 }

執行后發個消息,沒看到異常。

擴展:這里遇到的異常有,時間超時

解決方法一:

發送不成功報錯,就先重啟MQ,在重啟【管理員的方式啟動】IDE,一般都是MQ的問題。

發送消息為空,消息不能有空格。注意庫名字。

解決方法二:

我們安裝系統會給系統起個名字導致:修改后的主機名並沒有在linux系統的hosts文件中,因此解析的時候,無法直接從該文件中獲取,需要多重解析,才能解析該主機名。

 不同的linux版本,這個配置文件也可能不同vim /etc/hosts

 繼續說發送成功的事情。

3.創建消費者,接收消息。

 1 package com.jxjdemo.mq.simple;

 2 
 3 import com.rabbitmq.client.*;  4 
 5 import javax.security.auth.callback.Callback;  6 import java.io.IOException;  7 import java.util.concurrent.TimeoutException;  8 
//這里刪除了文檔注釋
16 public class SimpleConsumer { 17 public static void main(String args[]) throws IOException, TimeoutException { 18 //1、創建鏈接工廠對象-factory=newConnectionFactory() 19 ConnectionFactory factory = new ConnectionFactory(); 20 //2、設置RabbitMQ服務主機地址,默認localhost-factory.setHost("localhost") 21 factory.setHost("192.168.211.128"); 22 //3、設置RabbitMQ服務端口,默認-1-factory.setPort(5672) 23 factory.setPort(5672); 24 //4、設置虛擬主機名字,默認/-factory.setVirtualHost("szitheima") 25 factory.setVirtualHost("shujuku1122"); 26 //5、設置用戶連接名,默認guest-factory.setUsername("admin") 27 factory.setUsername("admin"); 28 //6、設置鏈接密碼,默認guest-factory.setPassword("admin") 29 factory.setPassword("123456"); 30 //7、創建鏈接-connection=factory.newConnection() 31 Connection connection = factory.newConnection(); 32 //8、創建頻道-channel=connection.createChannel() 33 Channel channel = connection.createChannel(); 34 //9、聲明隊列-channel.queueDeclare(名稱,是否持久化,是否獨占本連接,是否自動刪除,附加參數) 35 channel.queueDeclare("simplequeue",true ,false , false,null ); 36 //10接收消息 37 Consumer callback = new DefaultConsumer(channel){ 38 /** 39 * @param consumerTag 消費者標簽,在channel.basicConsume時候可以指定 40 * @param envelope 信封,消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送) 41 * @param properties 屬性信息(生產者的發送時指定) 42 * @param body 消息內容 43 * @throws IOException 44 */ 45 @Override 46 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 47 Long deliveryTag = envelope.getDeliveryTag(); //消息ID 48 String exchange = envelope.getExchange(); 49 String routingKey = envelope.getRoutingKey(); //路由KEY 50 //消息內容 51 String msg = new String(body,"utf-8"); 52 System.out.println( 53 "routingKey:" + routingKey + 54 "routingKey:" + routingKey + 55 ",exchange:" + exchange + 56 ",deliveryTag:" + deliveryTag + 57 ",message:" + msg); 58 } 59 }; 60 channel.basicConsume("simplequeue", callback); 61 //不關閉,繼續接受消息 62 } 63 }

執行后看到一下結果

當你的代碼運行到這里,那么恭喜你入門成功。

這次暫時先到這里結束。欲知其他4種模式且看下回慢慢分解。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM