rabbitmq簡單實例


 

JMS組件:activemq(慢)
AMQP組件(advance message queue protocol):rabbitmq和kafka

一.、消息隊列解決了什么問題?
異步處理
應用解耦
流量削鋒
日志處理

二、rabbitmq安裝與配置

三、java操作rabbitmq
1. simple 簡單隊列
2. work queues 工作隊列 公平分發 輪詢分發
3. publish/subscribe 發布於訂閱
4. routing 路由選擇 通配符模式
5. topics 主題
6. 手動和自動確認消息
7. 隊列的持久化和非持久化
8. rabbitmq的延遲隊列

四、spring AMQP spring-rabbitmq

五、場景demo mq實現搜索引擎DIH增量

六、場景demo 未支付訂單30分鍾,取消

七、大數據應用 類似百度統計 cnzz架構 消息隊列

 

一、簡單隊列

ConnectionUtils.java

 1 public class ConnectionUtils {
 2     public static Connection getConnection() throws IOException, TimeoutException {
 3         ConnectionFactory factory = new ConnectionFactory();
 4         factory.setHost("127.0.0.1");
 5         factory.setPort(5672);
 6         factory.setVirtualHost("/vhost_mmr");
 7         factory.setUsername("cxx");
 8         factory.setPassword("cxx");
 9         return factory.newConnection();
10     }
11 }
View Code

Send.java

 1 /**
 2  * 生產者發送消息
 3  */
 4 public class Send {
 5 
 6     private static final String QUEUE_NAME = "test_simple_queue";
 7 
 8     public static void main(String[] args) throws IOException, TimeoutException {
 9         Connection connection = ConnectionUtils.getConnection();
10 
11         Channel channel = connection.createChannel();
12         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
13 
14         String msg = "hello simple!!!!!!!!!!!";
15         channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
16         System.out.println("--send msg;" + msg);
17         channel.close();
18         connection.close();
19     }
20 }
View Code

Receive.java

 1 /**
 2  * 消費者獲取消息
 3  */
 4 public class Receive {
 5 
 6     private static final String QUEUE_NAME = "test_simple_queue";
 7 
 8     public static void main(String[] args) throws IOException, TimeoutException {
 9         //獲取鏈接
10         Connection connection = ConnectionUtils.getConnection();
11         //創建通道
12         Channel channel = connection.createChannel();
13         //隊列聲明
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15 
16         DefaultConsumer consumer = new DefaultConsumer(channel) {
17             //獲取到達的消息
18             @Override
19             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
20                 super.handleDelivery(consumerTag, envelope, properties, body);
21                 String msg = new String(body, "utf-8");
22                 System.out.println("new api recv:" + msg);
23             }
24         };
25 
26         //監聽隊列
27         channel.basicConsume(QUEUE_NAME, true, consumer);
28     }
29 }
View Code

 

二、工作隊列

2.1 輪詢分發

Send.java

 1 /**
 2  * \---c1
 3  * p---Queue----\
 4  * \---c2
 5  */
 6 public class Send {
 7     private static final String QUEUE_NAME = "test_work_queue";
 8 
 9     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
10         //獲取鏈接
11         Connection connection = ConnectionUtils.getConnection();
12         //獲取channel
13         Channel channel = connection.createChannel();
14         //聲明隊列
15         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
16         for (int i = 0; i < 50; i++) {
17             String msg = "hello" + i;
18 
19             System.out.println("[WQ ] send:" + msg);
20             channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
21             Thread.sleep(i * 20);
22         }
23         channel.close();
24         connection.close();
25     }
26 }
View Code

Receive1.java

 1 public class Receive1 {
 2     public static final String QUEUE_NAME = "test_work_queue";
 3 
 4     public static void main(String[] args) throws IOException, TimeoutException {
 5         //創建鏈接
 6         Connection connection = ConnectionUtils.getConnection();
 7         //創建頻道
 8         Channel channel = connection.createChannel();
 9         //聲明隊列
10         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
11 
12         //定義一個消費者
13         Consumer consumer = new DefaultConsumer(channel) {
14             @Override
15             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
16                 String msg = new String(body, "utf-8");
17                 System.out.println("[1] Recv msg :" + msg);
18                 try {
19                     Thread.sleep(2000);
20                 } catch (InterruptedException e) {
21                     e.printStackTrace();
22                 } finally {
23                     System.out.println("[1] done");
24                 }
25             }
26         };
27         boolean autoAck = false;
28         channel.basicConsume(QUEUE_NAME, autoAck, consumer);
29     }
30 }
View Code

Receive2.java

 1 public class Receive2 {
 2     public static final String QUEUE_NAME = "test_work_queue";
 3 
 4     public static void main(String[] args) throws IOException, TimeoutException {
 5         //創建鏈接
 6         Connection connection = ConnectionUtils.getConnection();
 7         //創建頻道
 8         Channel channel = connection.createChannel();
 9         //聲明隊列
10         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
11 
12         //定義一個消費者
13         Consumer consumer = new DefaultConsumer(channel) {
14             @Override
15             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
16                 String msg = new String(body, "utf-8");
17                 System.out.println("[2] Recv msg :" + msg);
18                 try {
19                     Thread.sleep(1000);
20                 } catch (InterruptedException e) {
21                     e.printStackTrace();
22                 } finally {
23                     System.out.println("[2] done");
24                 }
25             }
26         };
27         boolean autoAck = false;
28         channel.basicConsume(QUEUE_NAME, autoAck, consumer);
29     }
30 }
View Code

 

現象:消費者1和消費者2處理的消息是一樣的

           消費者1偶數

           消費者1奇數

           這種方式叫做輪詢分發(round-robin),結果就是不管誰忙活着誰清閑,都不會多給一個消息,任意消息總是你一個,我一個

 

2.2 公平分發(fair dipatch)

Send.java

 1 /**
 2  * \---c1
 3  * p---Queue----\
 4  * \---c2
 5  */
 6 public class Send {
 7     private static final String QUEUE_NAME = "test_work_queue";
 8 
 9     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
10         //獲取鏈接
11         Connection connection = ConnectionUtils.getConnection();
12         //獲取channel
13         Channel channel = connection.createChannel();
14         //聲明隊列
15         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
16         /**
17          * 每個消費者發送消費之前,消息隊列不發送下一個消息到消費者,一次只處理一個消息
18          * 限制發送給同一個消費者不得超過一個消息
19          */
20         int prefetchCount = 1;
21         channel.basicQos(prefetchCount);
22 
23         for (int i = 0; i < 50; i++) {
24             String msg = "hello" + i;
25 
26             System.out.println("[WQ ] send:" + msg);
27             channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
28             Thread.sleep(i * 5);
29         }
30         channel.close();
31         connection.close();
32     }
33 }
View Code

Receive1.java

 1 public class Receive1 {
 2     public static final String QUEUE_NAME = "test_work_queue";
 3 
 4     public static void main(String[] args) throws IOException, TimeoutException {
 5         //創建鏈接
 6         Connection connection = ConnectionUtils.getConnection();
 7         //創建頻道
 8         final Channel channel = connection.createChannel();
 9         //聲明隊列
10         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
11         //保證一次只發送一個
12         channel.basicQos(1);
13 
14         //定義一個消費者
15         Consumer consumer = new DefaultConsumer(channel) {
16             @Override
17             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
18                 String msg = new String(body, "utf-8");
19                 System.out.println("[1] Recv msg :" + msg);
20                 try {
21                     Thread.sleep(2000);
22                 } catch (InterruptedException e) {
23                     e.printStackTrace();
24                 } finally {
25                     System.out.println("[1] done");
26                     //手動回執
27                     channel.basicAck(envelope.getDeliveryTag(), false);
28                 }
29             }
30         };
31         boolean autoAck = false;
32         channel.basicConsume(QUEUE_NAME, autoAck, consumer);
33     }
34 }
View Code

Receive2.java

 1 public class Receive2 {
 2     public static final String QUEUE_NAME = "test_work_queue";
 3 
 4     public static void main(String[] args) throws IOException, TimeoutException {
 5         //創建鏈接
 6         Connection connection = ConnectionUtils.getConnection();
 7         //創建頻道
 8         final Channel channel = connection.createChannel();
 9         //聲明隊列
10         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
11         //保證一次只發送一個
12         channel.basicQos(1);
13         //定義一個消費者
14         Consumer consumer = new DefaultConsumer(channel) {
15             @Override
16             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
17                 String msg = new String(body, "utf-8");
18                 System.out.println("[2] Recv msg :" + msg);
19                 try {
20                     Thread.sleep(1000);
21                 } catch (InterruptedException e) {
22                     e.printStackTrace();
23                 } finally {
24                     System.out.println("[2] done");
25                     //手動回執
26                     channel.basicAck(envelope.getDeliveryTag(), false);
27                 }
28             }
29         };
30         boolean autoAck = false;
31         channel.basicConsume(QUEUE_NAME, autoAck, consumer);
32     }
33 }
View Code

 現象:消費者2處理的消息比消費者1多,能者多勞

三、消息與應答ack與消息持久化durable

 boolean autoAck = false;

 channel.basicConsume(QUEUE_NAME, autoAck, consumer);

 

 boolean autoAck = true;(自動確認模式)一旦rabbitmq將消息分發給消費者,就會從內存中刪除

 這種情況下,如果殺死正在執行的消費者,就會丟失正在處理的消息

 

 boolnea autoAck = false;(手動模式),如果有一個消費者掛掉,就會交付給其他消費者,

 rabbitmq支持消息應答,消費者發送一個應答,告訴rabbitmq這個消息我已經處理完成,你可以刪除了,然后rabbitmq就刪除

內存中的消息

 

消息應答模式是打開的,false

 

Message acknowkedgment

消息持久化

boolean durable = false
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM