Rabbitmq的五種模式和案例


 

 

<dependency>  
     <groupId>org.springframework.boot</groupId>  
     <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

package rabbitmq;

import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
* @author Administrator
* 消息生產者p將消息放入隊列
* 消費者監聽隊列,如果隊列中有消息,就消費掉,消息被拿走后,自動從隊列刪除
* (隱患,消息可能沒有被消費者正確處理,已經消失了,無法恢復)
* 應用場景:聊天室
*/
public class SimpleTest {
@Test
//模擬生產者將消息放入隊列  
public void send1() throws Exception{
/* 1 創建連接工廠 
        * 2 配置共創config 
        * 3 獲取連接 
        * 4獲取信道 
        * 5 從信道聲明queue 
        * 6 發送消息 
        * 7 釋放資源 
         */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/tb");
factory.setUsername("rabbitmquser");
factory.setPassword("123456");
//從工廠獲取連接  
Connection newConnection = factory.newConnection();
//獲取信道
Channel createChannel = newConnection.createChannel();
//利用channel聲明第一個隊列  
//queue String類型,表示聲明的queue對列的名字  
//durable Boolean類型,表示是否持久化  
//exclusive Boolean類型:當前聲明的queue是否專注;true當前連接創建的  
//任何channle都可以連接這個queue,false,新的channel不可使用  
//autoDelete Boolean類型:在最后連接使用完成后,是否刪除隊列,false  
//arguments Map類型,其他聲明參數  
createChannel.queueDeclare("simple", false, false, false, null);
//發送消息
String msg="helloworld,nihaoa";
//exchange String類型,交換機名稱,簡單模式使用默認交換""  
//routingkey String類型,當前的消息綁定的routingkey,簡單模式下,與隊列同名即可  
//props BasicProperties類型,消息的屬性字段對象,例如BasicProperties  
//可以設置一個deliveryMode的值0 持久化,1 表示不持久化,durable配合使用  
//body byte[] :消息字符串的byte數組  
createChannel.basicPublish("","simple",null, msg.getBytes());
}


//模擬消費端
@Test
public void receive1() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.10.132");
factory.setPort(5672);
factory.setVirtualHost("/tb");
factory.setUsername("rabbitmquser");
factory.setPassword("123456");
//從工廠獲取連接
Connection conn=factory.newConnection();
//從連接獲取信道
Channel chan=conn.createChannel();
chan.queueDeclare("simple", false, false, false, null);
//創建一個消費者
QueueingConsumer consumer = new QueueingConsumer(chan);
chan.basicConsume("simple", consumer);
//監聽隊列
while (true) {
//獲取下一個delivery,delivery從隊列獲取消息
Delivery delivery = consumer.nextDelivery();
String msg=new String(delivery.getBody());
System.out.println(msg);

}
}
}

package rabbitmq2;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
public static Connection getConn(){
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setVirtualHost("/tb");
factory.setPort(5672);
factory.setUsername("rabbitmquser");
factory.setPassword("123456");
//從工廠獲取連接
Connection conn = factory.newConnection();
return conn;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

}

 

 

 

package rabbitmq2;

import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
* @author Administrator
* work模式
*生產者將消息放入隊列
*多個消費者同時監聽同一個隊列,消息如何被消費?
*C1,C2共同爭搶當前消息隊列的內容,誰先拿到消息,誰來負責消費
*應用場景:紅包;大型項目中的資源調度過程(直接由最空閑的系統爭搶到資源處理任務)
*/
public class WorkTest {
@Test
public void send1() throws Exception{
Connection conn = ConnectionUtil.getConn();
Channel channel = conn.createChannel();
//聲明隊列
channel.queueDeclare("work", false, false, false, null);
for(int i=0;i<100;i++){
String msg = "1712,hello:"+i+"message";
channel.basicPublish("","work",null, msg.getBytes());
System.out.println("第"+i+"條信息已經發送");
}
channel.close();
conn.close();
}
@Test
public void receive1() throws Exception{
//獲取連接,獲取信道
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
channel.queueDeclare("work", false, false, false, null);
//同一時刻服務器只發送一條消息給同一消費者,消費者空閑,才發送一條
channel.basicQos(1);
//定義消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//綁定隊列和消費者的關系 queue
//autoAck:消息被消費后,是否自動確認回執,如果false,不自動需要手動在
//callback完成消息消費后進行回執確認,channel.ack,channel.nack
//chan.basicConsume(queue, autoAck, callback)
channel.basicConsume("work",false, consumer);
//監聽
while (true) {
Delivery delivery = consumer.nextDelivery();
byte[] result = delivery.getBody();
String msg = new String(result);
System.out.println("接收到:"+msg);
Thread.sleep(50);
//返回服務器,回執
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}

@Test
public void receive2() throws Exception{
//獲取連接,獲取信道
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
channel.queueDeclare("work", false, false, false, null);
//同一時刻服務器只發送一條消息給同一消費者,消費者空閑,才發送一條
channel.basicQos(1);
//定義消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//綁定隊列和消費者的關系 queue
//autoAck:消息被消費后,是否自動確認回執,如果false,不自動需要手動在
//callback完成消息消費后進行回執確認,channel.ack,channel.nack
//chan.basicConsume(queue, autoAck, callback)
channel.basicConsume("work",false, consumer);
//監聽
while (true) {
Delivery delivery = consumer.nextDelivery();
byte[] result = delivery.getBody();
String msg = new String(result);
System.out.println("接收到:"+msg);
Thread.sleep(50);
//返回服務器,回執
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

 

 

 

 

package rabbitmq3;

import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
* @author Administrator
*生產者將消息交給交換機
*有交換機根據發布訂閱的模式設定將消息同步到所有的綁定隊列中;
*后端的消費者都能拿到消息
*應用場景:郵件群發,群聊天,廣告
*/
public class FanoutTest {

//交換機,有類型,發布訂閱:fanout
//路由模式:direct
//主題模式:topic
@Test
public void send() throws Exception{
//獲取連接
Connection conn = ConnectionUtil.getConn();
Channel channel = conn.createChannel();
//聲明交換機
//參數意義,1交換機名稱2類型:fanout,direct,topic
channel.exchangeDeclare("fanoutEx", "fanout");
//發送消息
for(int i=0;i<100;i++){
String msg="1712 hello:"+i+"msg";
channel.basicPublish("fanoutEx","",null, msg.getBytes());
System.out.println("第"+i+"條信息已經發送");
}
}

@Test
public void receive1() throws Exception{
//獲取連接
Connection conn = ConnectionUtil.getConn();
Channel channel = conn.createChannel();
//聲明隊列
channel.queueDeclare("fanout01", false, false, false, null);
//聲明交換機
channel.exchangeDeclare("fanoutEx", "fanout");
channel.basicQos(1);
//綁定隊列到交換機
//參數 1 隊列名稱,2 交換機名稱 3 路由key
channel.queueBind("fanout01","fanoutEx","");
//定義消費者
QueueingConsumer consumer=new QueueingConsumer(channel);
//消費者與隊列綁定
channel.basicConsume("fanout01",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("一號消費者接收到"+new String(delivery.getBody()));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}

}
@Test
public void receive2() throws Exception{
//獲取連接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//聲明隊列
chan.queueDeclare("fanout02",false,false,false,null);
//聲明交換機
chan.exchangeDeclare("fanoutEx","fanout");
//綁定隊列到交換機
//參數 1 隊列名稱,2 交換機名稱 3 路由key
chan.queueBind("fanout02","fanoutEx","");
chan.basicQos(1);
//定義消費者
QueueingConsumer consumer=new QueueingConsumer(chan);
//消費者與隊列綁定
chan.basicConsume("fanout02",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("二號消費者接收到"+new String(delivery.getBody()));
chan.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}

 

 

 

package rabbitmq4;

import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
* @author Administrator
*生產者發送消息到交換機,同時綁定一個路由Key,交換機根據路由key對下游綁定的隊列進行路
*由key的判斷,滿足路由key的隊列才會接收到消息,消費者消費消息
*應用場景: 項目中的error報錯
*/
public class RoutingTopicTest {
@Test
public void routingSend() throws Exception{
//獲取連接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//聲明交換機
//參數意義,1 交換機名稱,2 類型:fanout,direct,topic
chan.exchangeDeclare("directEx","direct");
//發送消息
String msg="路由模式的消息";
chan.basicPublish("directEx","jt1712",null, msg.getBytes());
}

@Test
public void routingRec01() throws Exception{
System.out.println("一號消費者等待接收消息");
//獲取連接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//聲明隊列
chan.queueDeclare("direct01", false, false, false, null);
//聲明交換機
chan.exchangeDeclare("directEx","direct");
//綁定隊列到交換機
//參數 1 隊列名稱,2 交換機名稱 3 路由key
chan.queueBind("direct01", "directEx", "jt1712");
chan.basicQos(1);
//定義消費者
QueueingConsumer consumer=new QueueingConsumer(chan);
//消費者與隊列綁定
chan.basicConsume("direct01",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("一號消費者接收到"+new String(delivery.getBody()));
chan.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
@Test
public void routingRec02() throws Exception{
System.out.println("二號消費者等待接收消息");
//獲取連接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//聲明隊列
chan.queueDeclare("direct02", false, false, false, null);
//聲明交換機
chan.exchangeDeclare("directEx","direct");
//綁定隊列到交換機
//參數 1 隊列名稱,2 交換機名稱 3 路由key
chan.queueBind("direct02", "directEx", "jt1711");
chan.basicQos(1);
//定義消費者
QueueingConsumer consumer=new QueueingConsumer(chan);
//消費者與隊列綁定
chan.basicConsume("direct02",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("二號消費者接收到"+new String(delivery.getBody()));
chan.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}

 

 

 

 

package rabbitmq5;

import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
* @author Administrator
* *號代表單個詞語
* #代表多個詞語
* 其他的內容與routing路由模式一致
*/
public class RoutingTopicTest {

@Test
public void topicSend() throws Exception {
// 獲取連接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
// 聲明交換機
// 參數意義,1 交換機名稱,2 類型:fanout,direct,topic
chan.exchangeDeclare("topicEx", "topic");
// 發送消息
String msg = "主題模式的消息";
chan.basicPublish("topicEx", "jt1712.add.update", null, msg.getBytes());
}

@Test
public void routingRec02() throws Exception{
System.out.println("二號消費者等待接收消息");
//獲取連接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//聲明隊列
chan.queueDeclare("direct02",false,false,false, null);
//聲明交換機
chan.exchangeDeclare("directEx","direct");
//綁定隊列到交換機
//參數 1 隊列名稱,2 交換機名稱 3 路由key
chan.queueBind("direct02","directEx","jt1711");
chan.basicQos(1);
//定義消費者
QueueingConsumer consumer=new QueueingConsumer(chan);
//消費者與隊列綁定
chan.basicConsume("direct02",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("二號消費者接收到"+new String(delivery.getBody()));
chan.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
@Test
public void topicRec1() throws Exception{
System.out.println("一號消費者等待接收消息");
//獲取連接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//聲明隊列
chan.queueDeclare("topic01",false,false,false, null);
//聲明交換機
chan.exchangeDeclare("topicEx", "topic");
//綁定隊列到交換機
//參數 1 隊列名稱,2 交換機名稱 3 路由key
chan.queueBind("topic01", "topicEx", "jt1712");
chan.basicQos(1);
//定義消費者
QueueingConsumer consumer=new QueueingConsumer(chan);
//消費者與隊列綁定
chan.basicConsume("topic01",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("一號消費者接收到"+new String(delivery.getBody()));
chan.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
@Test
public void topicRec2() throws Exception{
System.out.println("二號消費者等待接收消息");
//獲取連接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//聲明隊列
chan.queueDeclare("topic02",false,false,false, null);
//聲明交換機
chan.exchangeDeclare("topicEx", "topic");
//綁定隊列到交換機
//參數 1 隊列名稱,2 交換機名稱 3 路由key
chan.queueBind("topic02", "topicEx", "jt1712.#");
chan.basicQos(1);
//定義消費者
QueueingConsumer consumer=new QueueingConsumer(chan);
//消費者與隊列綁定
chan.basicConsume("topic02",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("二號消費者接收到"+new String(delivery.getBody()));
chan.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM