JMS組件:activemq(慢)
AMQP組件(advance message queue protocol):rabbitmq和kafka
一.、消息隊列解決了什么問題?
異步處理
應用解耦
流量削鋒
日志處理
二、rabbitmq安裝與配置
三、java操作rabbitmq
1. simple 簡單隊列
2. work queues 工作隊列 公平分發 輪詢分發
3. publish/subscribe 發布於訂閱
4. routing 路由選擇 通配符模式
5. topics 主題
6. 手動和自動確認消息
7. 隊列的持久化和非持久化
8. rabbitmq的延遲隊列
四、spring AMQP spring-rabbitmq
五、場景demo mq實現搜索引擎DIH增量
六、場景demo 未支付訂單30分鍾,取消
七、大數據應用 類似百度統計 cnzz架構 消息隊列
一、簡單隊列
ConnectionUtils.java

1 public class ConnectionUtils { 2 public static Connection getConnection() throws IOException, TimeoutException { 3 ConnectionFactory factory = new ConnectionFactory(); 4 factory.setHost("127.0.0.1"); 5 factory.setPort(5672); 6 factory.setVirtualHost("/vhost_mmr"); 7 factory.setUsername("cxx"); 8 factory.setPassword("cxx"); 9 return factory.newConnection(); 10 } 11 }
Send.java

1 /** 2 * 生產者發送消息 3 */ 4 public class Send { 5 6 private static final String QUEUE_NAME = "test_simple_queue"; 7 8 public static void main(String[] args) throws IOException, TimeoutException { 9 Connection connection = ConnectionUtils.getConnection(); 10 11 Channel channel = connection.createChannel(); 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 String msg = "hello simple!!!!!!!!!!!"; 15 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); 16 System.out.println("--send msg;" + msg); 17 channel.close(); 18 connection.close(); 19 } 20 }
Receive.java

1 /** 2 * 消費者獲取消息 3 */ 4 public class Receive { 5 6 private static final String QUEUE_NAME = "test_simple_queue"; 7 8 public static void main(String[] args) throws IOException, TimeoutException { 9 //獲取鏈接 10 Connection connection = ConnectionUtils.getConnection(); 11 //創建通道 12 Channel channel = connection.createChannel(); 13 //隊列聲明 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 DefaultConsumer consumer = new DefaultConsumer(channel) { 17 //獲取到達的消息 18 @Override 19 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 20 super.handleDelivery(consumerTag, envelope, properties, body); 21 String msg = new String(body, "utf-8"); 22 System.out.println("new api recv:" + msg); 23 } 24 }; 25 26 //監聽隊列 27 channel.basicConsume(QUEUE_NAME, true, consumer); 28 } 29 }
二、工作隊列
2.1 輪詢分發
Send.java

1 /** 2 * \---c1 3 * p---Queue----\ 4 * \---c2 5 */ 6 public class Send { 7 private static final String QUEUE_NAME = "test_work_queue"; 8 9 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { 10 //獲取鏈接 11 Connection connection = ConnectionUtils.getConnection(); 12 //獲取channel 13 Channel channel = connection.createChannel(); 14 //聲明隊列 15 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 16 for (int i = 0; i < 50; i++) { 17 String msg = "hello" + i; 18 19 System.out.println("[WQ ] send:" + msg); 20 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); 21 Thread.sleep(i * 20); 22 } 23 channel.close(); 24 connection.close(); 25 } 26 }
Receive1.java

1 public class Receive1 { 2 public static final String QUEUE_NAME = "test_work_queue"; 3 4 public static void main(String[] args) throws IOException, TimeoutException { 5 //創建鏈接 6 Connection connection = ConnectionUtils.getConnection(); 7 //創建頻道 8 Channel channel = connection.createChannel(); 9 //聲明隊列 10 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 11 12 //定義一個消費者 13 Consumer consumer = new DefaultConsumer(channel) { 14 @Override 15 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 16 String msg = new String(body, "utf-8"); 17 System.out.println("[1] Recv msg :" + msg); 18 try { 19 Thread.sleep(2000); 20 } catch (InterruptedException e) { 21 e.printStackTrace(); 22 } finally { 23 System.out.println("[1] done"); 24 } 25 } 26 }; 27 boolean autoAck = false; 28 channel.basicConsume(QUEUE_NAME, autoAck, consumer); 29 } 30 }
Receive2.java

1 public class Receive2 { 2 public static final String QUEUE_NAME = "test_work_queue"; 3 4 public static void main(String[] args) throws IOException, TimeoutException { 5 //創建鏈接 6 Connection connection = ConnectionUtils.getConnection(); 7 //創建頻道 8 Channel channel = connection.createChannel(); 9 //聲明隊列 10 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 11 12 //定義一個消費者 13 Consumer consumer = new DefaultConsumer(channel) { 14 @Override 15 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 16 String msg = new String(body, "utf-8"); 17 System.out.println("[2] Recv msg :" + msg); 18 try { 19 Thread.sleep(1000); 20 } catch (InterruptedException e) { 21 e.printStackTrace(); 22 } finally { 23 System.out.println("[2] done"); 24 } 25 } 26 }; 27 boolean autoAck = false; 28 channel.basicConsume(QUEUE_NAME, autoAck, consumer); 29 } 30 }
現象:消費者1和消費者2處理的消息是一樣的
消費者1偶數
消費者1奇數
這種方式叫做輪詢分發(round-robin),結果就是不管誰忙活着誰清閑,都不會多給一個消息,任意消息總是你一個,我一個
2.2 公平分發(fair dipatch)
Send.java

1 /** 2 * \---c1 3 * p---Queue----\ 4 * \---c2 5 */ 6 public class Send { 7 private static final String QUEUE_NAME = "test_work_queue"; 8 9 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { 10 //獲取鏈接 11 Connection connection = ConnectionUtils.getConnection(); 12 //獲取channel 13 Channel channel = connection.createChannel(); 14 //聲明隊列 15 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 16 /** 17 * 每個消費者發送消費之前,消息隊列不發送下一個消息到消費者,一次只處理一個消息 18 * 限制發送給同一個消費者不得超過一個消息 19 */ 20 int prefetchCount = 1; 21 channel.basicQos(prefetchCount); 22 23 for (int i = 0; i < 50; i++) { 24 String msg = "hello" + i; 25 26 System.out.println("[WQ ] send:" + msg); 27 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); 28 Thread.sleep(i * 5); 29 } 30 channel.close(); 31 connection.close(); 32 } 33 }
Receive1.java

1 public class Receive1 { 2 public static final String QUEUE_NAME = "test_work_queue"; 3 4 public static void main(String[] args) throws IOException, TimeoutException { 5 //創建鏈接 6 Connection connection = ConnectionUtils.getConnection(); 7 //創建頻道 8 final Channel channel = connection.createChannel(); 9 //聲明隊列 10 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 11 //保證一次只發送一個 12 channel.basicQos(1); 13 14 //定義一個消費者 15 Consumer consumer = new DefaultConsumer(channel) { 16 @Override 17 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 18 String msg = new String(body, "utf-8"); 19 System.out.println("[1] Recv msg :" + msg); 20 try { 21 Thread.sleep(2000); 22 } catch (InterruptedException e) { 23 e.printStackTrace(); 24 } finally { 25 System.out.println("[1] done"); 26 //手動回執 27 channel.basicAck(envelope.getDeliveryTag(), false); 28 } 29 } 30 }; 31 boolean autoAck = false; 32 channel.basicConsume(QUEUE_NAME, autoAck, consumer); 33 } 34 }
Receive2.java

1 public class Receive2 { 2 public static final String QUEUE_NAME = "test_work_queue"; 3 4 public static void main(String[] args) throws IOException, TimeoutException { 5 //創建鏈接 6 Connection connection = ConnectionUtils.getConnection(); 7 //創建頻道 8 final Channel channel = connection.createChannel(); 9 //聲明隊列 10 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 11 //保證一次只發送一個 12 channel.basicQos(1); 13 //定義一個消費者 14 Consumer consumer = new DefaultConsumer(channel) { 15 @Override 16 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 17 String msg = new String(body, "utf-8"); 18 System.out.println("[2] Recv msg :" + msg); 19 try { 20 Thread.sleep(1000); 21 } catch (InterruptedException e) { 22 e.printStackTrace(); 23 } finally { 24 System.out.println("[2] done"); 25 //手動回執 26 channel.basicAck(envelope.getDeliveryTag(), false); 27 } 28 } 29 }; 30 boolean autoAck = false; 31 channel.basicConsume(QUEUE_NAME, autoAck, consumer); 32 } 33 }
現象:消費者2處理的消息比消費者1多,能者多勞
三、消息與應答ack與消息持久化durable
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
boolean autoAck = true;(自動確認模式)一旦rabbitmq將消息分發給消費者,就會從內存中刪除
這種情況下,如果殺死正在執行的消費者,就會丟失正在處理的消息
boolnea autoAck = false;(手動模式),如果有一個消費者掛掉,就會交付給其他消費者,
rabbitmq支持消息應答,消費者發送一個應答,告訴rabbitmq這個消息我已經處理完成,你可以刪除了,然后rabbitmq就刪除
內存中的消息
消息應答模式是打開的,false
Message acknowkedgment
消息持久化
boolean durable = false
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);