一、生產者
-
創建ConnectionFactory工廠(地址、用戶名、密碼、vhost)
-
創建Connection
-
創建信道(Channel)
-
創建 exchange(指定 名稱、類型-DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");、是否持久化)
-
發送消息(指定:exchange、發送的routingKey , 發送到的消息 )
基礎的生產者:

public class TestProducer { public final static String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { /* 創建連接,連接到RabbitMQ*/ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.112.131"); connectionFactory.setVirtualHost("my_vhost"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = connectionFactory.newConnection(); /*創建信道*/ Channel channel = connection.createChannel(); /*創建交換器*/ channel.exchangeDeclare(EXCHANGE_NAME,"direct"); //channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); /*日志消息級別,作為路由鍵使用*/ String[] routekeys = {"king","queue","prince"}; for(int i=0;i<3;i++){ String routekey = routekeys[i%3]; String msg = "Hellol,RabbitMq"+(i+1); /*發布消息,需要參數:交換器,路由鍵,其中以日志消息級別為路由鍵*/ channel.basicPublish(EXCHANGE_NAME,routekey,null, msg.getBytes()); System.out.println("Sent "+routekey+":"+msg); } channel.close(); connection.close(); } }
二、消費者
-
創建ConnectionFactory工廠(地址、用戶名、密碼、vhost)
-
創建Connection
-
創建信道(Channel)
-
聲明一個 exchange(指定 名稱、類型、是否持久化)
-
創建一個隊列(指定:名稱,是否持久化,是否獨占,是否自動刪除,其他參數)
-
隊列、exchange通過routeKey進行綁定
-
消費者接收消息(隊列名稱,是否自動ACK)
基本的消費者:

public class TestConsumer { public static void main(String[] argv) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.112.131"); factory.setVirtualHost("my_vhost"); factory.setUsername("admin"); factory.setPassword("admin"); // 打開連接和創建頻道,與發送端一樣 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(TestProducer.EXCHANGE_NAME, "direct"); /*聲明一個隊列*/ String queueName = "focuserror"; channel.queueDeclare(queueName,false,false, false,null); /*綁定,將隊列和交換器通過路由鍵進行綁定*/ String routekey = "king";/*表示只關注error級別的日志消息*/ channel.queueBind(queueName,TestProducer.EXCHANGE_NAME,routekey); System.out.println("waiting for message........"); /*聲明了一個消費者*/ final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received["+envelope.getRoutingKey() +"]"+message); } }; /*消費者正式開始在指定隊列上消費消息*/ channel.basicConsume(queueName,true,consumer); } }
三、消息持久化
-
exchange 需要持久化
-
發送消息設置參數為 MessageProperties.PERSISTENT_TEXT_PLAIN
-
隊列需要設置參數為持久化
1、//TODO 創建持久化交換器 durable=true channel.exchangeDeclare(EXCHANGE_NAME,"direct",true); 2、//TODO 發布持久化的消息(delivery-mode=2) channel.basicPublish(EXCHANGE_NAME,routekey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); 3、//TODO 聲明一個持久化隊列(durable=true) // autoDelete=true 消費者停止了,則隊列會自動刪除 //exclusive=true獨占隊列,只能有一個消費者消費 String queueName = "msgdurable"; channel.queueDeclare(queueName,true,false, false,null);
四、如何支持事務(防止投遞消息的時候消息丟失-效率特別低,不建議使用,可以使用生產者ACK機制)
-
啟動事務
-
成功提交
-
失敗則回滾

//TODO //加入事務 channel.txSelect(); try { for(int i=0;i<3;i++){ String routekey = routekeys[i%3]; // 發送的消息 String message = "Hello World_"+(i+1) +("_"+System.currentTimeMillis()); channel.basicPublish(EXCHANGE_NAME, routekey, true, null, message.getBytes()); System.out.println("----------------------------------"); System.out.println(" Sent Message: [" + routekey +"]:'" + message + "'"); Thread.sleep(200); } //TODO //事務提交 channel.txCommit(); } catch (IOException e) { e.printStackTrace(); //TODO //事務回滾 channel.txRollback(); } catch (InterruptedException e) { e.printStackTrace(); }
五、消費消息手動ACK,如果異常則使用拒絕的方式,然后異常消息推送到-死信隊列
批量ack的時候如果其中有一個消息出現異常,則會導致消息丟失(日志處理的時候可以使用批量)

1 /*消費者正式開始在指定隊列上消費消息,第二個參數false為手動應答*/ channel.basicConsume(queueName,false,consumer); 2 收到消息以后,手動應答數據接收成功 channel.basicAck( envelope.getDeliveryTag(),false); 3 收到消息,如果處理失敗則拒絕消息:DeliveryTag是消息在隊列中的標識 channel.basicReject( envelope.getDeliveryTag(),false); 4 決絕的參數說明 //TODO Reject方式拒絕(這里第2個參數決定是否重新投遞),不要重復投遞,因為消息重復投遞后處理可能依然異常 //channel.basicReject(envelope.getDeliveryTag(),false); //TODO Nack方式的拒絕(第2個參數決定是否批量,第3個參數是否重新投遞) channel.basicNack(envelope.getDeliveryTag(), false, true);
六、創建隊列的參數解析:場景,延遲隊列,保存帶有時效性的訂單,一旦訂單過期,則信息會轉移到死信隊列
//TODO /*自動過期隊列--參數需要Map傳遞*/ String queueName = "setQueue"; Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-expires",10*1000);//消息在隊列中保存10秒后被刪除 //TODO 隊列的各種參數 /*加入隊列的各種參數*/ // autoDelete=true 消費者停止了,則隊列會自動刪除 //exclusive=true獨占隊列,只能有一個消費者消費 channel.queueDeclare(queueName,true,true, false,arguments);
七、發送消息以后帶有應答的隊列
-
聲明一個回應隊列
-
聲明一個回應消息的消費者
-
聲明一個屬性對象(指定隊列,會唯一的id)
-
生產者發送消息給消費者(帶着回應隊列)
-
消費者接收到消息以后根據對應的信息,給予回應
生產者端:

1、 //TODO 響應QueueName ,消費者將會把要返回的信息發送到該Queue String responseQueue = channel.queueDeclare().getQueue(); //TODO 消息的唯一id String msgId = UUID.randomUUID().toString(); 2、 /*聲明了一個消費者*/ final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received["+envelope.getRoutingKey() +"]"+message); } }; //TODO 消費者應答隊列上的消息 channel.basicConsume(responseQueue,true,consumer); 3、 //TODO 設置消息中的應答屬性 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .replyTo(responseQueue) .messageId(msgId) .build(); 4、 String msg = "Hello,RabbitMq"; //TODO 發送消息時,把響應相關屬性設置進去 channel.basicPublish(EXCHANGE_NAME,"error", properties, msg.getBytes());
消費者端:
String message = new String(body, "UTF-8"); System.out.println("Received["+envelope.getRoutingKey() +"]"+message); //TODO 從消息中拿到相關屬性(確定要應答的消息ID,) AMQP.BasicProperties respProp = new AMQP.BasicProperties.Builder() .replyTo(properties.getReplyTo()) .correlationId(properties.getMessageId()) .build(); //TODO 消息消費時,同時需要生作為生產者生產消息(以OK為標識) channel.basicPublish("", respProp.getReplyTo() , respProp , ("OK,"+message).getBytes("UTF-8"));
八、死信隊列 -
下列消息會放到死信隊列
-
消息被否定確認,使用 channel.basicNack 或 channel.basicReject ,並且此時requeue 屬性被設置為false。
-
消息在隊列的存活時間超過設置的TTL時間。
-
消息隊列的消息數量已經超過最大隊列長度

測試,消費者被拒絕的時候消息會進到死信隊列中: final Channel channel = connection.createChannel(); channel.exchangeDeclare(WillMakeDlxConsumer.BUS_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //TODO 綁定死信交換器 /*聲明一個隊列,並綁定死信交換器*/ Map<String,Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", WillMakeDlxConsumer.DLX_EXCHANGE_NAME); // //TODO 死信路由鍵,會替換消息原來的路由鍵 // args.put("x-dead-letter-routing-key", "deal"); channel.queueDeclare(WillMakeDlxConsumer.BUS_QUEUE_NAME,false,false, false, args); /*綁定,將隊列和交換器通過路由鍵進行綁定*/ channel.queueBind(WillMakeDlxConsumer.BUS_QUEUE_NAME, WillMakeDlxConsumer.BUS_EXCHANGE_NAME,"#"); System.out.println("waiting for message........"); //聲明死信隊列 channel.exchangeDeclare(WillMakeDlxConsumer.DLX_EXCHANGE_NAME, "topic",true); channel.queueDeclare(WillMakeDlxConsumer.DLX_QUEUE_NAME, true, false, false, null); //路由鍵為 # 代表可以路由到所有消息 channel.queueBind(WillMakeDlxConsumer.DLX_QUEUE_NAME ,WillMakeDlxConsumer.DLX_EXCHANGE_NAME, "#"); /*聲明了一個消費者*/ final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); //TODO //TODO 如果是king的消息確認 if(envelope.getRoutingKey().equals("king")){ System.out.println("Received[" +envelope.getRoutingKey() +"]"+message); channel.basicAck(envelope.getDeliveryTag(), false); }else{ //TODO 如果是其他的消息拒絕(rqueue=false),成為死信消息 System.out.println("Will reject[" +envelope.getRoutingKey() +"]"+message); channel.basicReject(envelope.getDeliveryTag(), false); } } }; /*消費者正式開始在指定隊列上消費消息*/ channel.basicConsume(WillMakeDlxConsumer.BUS_QUEUE_NAME ,false,consumer);
過期消息進入到死信隊列:

public static void main(String[] argsv) throws IOException, TimeoutException { Connection connection = getConnection(); // 創建一個信道 Channel channel = connection.createChannel(); // 指定轉發 channel.exchangeDeclare(WillMakeDlxConsumer.BUS_EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Map<String,Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", WillMakeDlxConsumer.DLX_EXCHANGE_NAME); //設置隊列中消息過期時間,過期則進入死信隊列 args.put("x-message-ttl", 10*1000); channel.queueDeclare(WillMakeDlxConsumer.BUS_QUEUE_NAME,true,false, false, args); /*綁定,將隊列和交換器通過路由鍵進行綁定*/ channel.queueBind(WillMakeDlxConsumer.BUS_QUEUE_NAME, WillMakeDlxConsumer.BUS_EXCHANGE_NAME,"#"); System.out.println("waiting for message........"); //聲明死信隊列 channel.exchangeDeclare(WillMakeDlxConsumer.DLX_EXCHANGE_NAME, "topic",true); channel.queueDeclare(WillMakeDlxConsumer.DLX_QUEUE_NAME, true, false, false, null); //路由鍵為 # 代表可以路由到所有消息 channel.queueBind(WillMakeDlxConsumer.DLX_QUEUE_NAME ,WillMakeDlxConsumer.DLX_EXCHANGE_NAME, "#"); /*日志消息級別,作為路由鍵使用*/ String[] routekeys = {"king","mark","james"}; for(int i=0;i<10;i++){ String routekey = routekeys[i%3]; String msg = "Hellol,RabbitMq"+(i+1); /*發布消息,需要參數:交換器,路由鍵,其中以日志消息級別為路由鍵*/ channel.basicPublish(WillMakeDlxConsumer.BUS_EXCHANGE_NAME,routekey,null, msg.getBytes()); System.out.println("Sent "+routekey+":"+msg); } // 關閉頻道和連接 channel.close(); connection.close(); }
九、消費者批量預取消費-每次服務器給消費者推送多少數據進行處理

//TODO 如果是兩個消費者(QOS ,批量)則輪詢獲取數據 //TODO 150條預取(150都取出來 150, 210-150 60 ) channel.basicQos(150,true); /*消費者正式開始在指定隊列上消費消息*/ channel.basicConsume(queueName,false,consumer); //TODO 自定義消費者批量確認 //BatchAckConsumer batchAckConsumer = new BatchAckConsumer(channel); //channel.basicConsume(queueName,false,batchAckConsumer);
十、生產者投遞消息確認模式,如果失敗了則可以重新投遞
同步確認:

// 啟用發送者確認模式 channel.confirmSelect(); //所有日志嚴重性級別 for(int i=0;i<2;i++){ // 發送的消息 String message = "Hello World_"+(i+1); //參數1:exchange name //參數2:routing key channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY, true,null, message.getBytes()); System.out.println(" Sent Message: [" + ROUTE_KEY +"]:'"+ message + "'"); //TODO //確認是否成功(true成功) if(channel.waitForConfirms()){ System.out.println("send success"); }else{ //如果失敗則,可以重新投遞減小消息丟失的幾率 System.out.println("send failure"); } }
異步確認-添加監聽器:

//TODO // 啟用發送者確認模式 channel.confirmSelect(); //TODO // 添加發送者確認監聽器 channel.addConfirmListener(new ConfirmListener() { //TODO 成功 public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("send_ACK:"+deliveryTag+",multiple:"+multiple); } //TODO 失敗 public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Erro----send_NACK:"+deliveryTag+",multiple:"+multiple); } }); //TODO // 添加失敗者通知 channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("RabbitMq路由失敗: "+routingKey+"."+message); } }); String[] routekeys={"king","mark"}; //TODO 6條 for(int i=0;i<20;i++){ String routekey = routekeys[i%2]; //String routekey = "king"; // 發送的消息 String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis()); channel.basicPublish(EXCHANGE_NAME, routekey, true, MessageProperties.PERSISTENT_BASIC, message.getBytes()); }
十一、基本信息-創建鏈接

public class BasicMq { public static final String MQ_IP = "192.168.112.131"; public static final String USER = "admin"; public static final String PWD = "admin"; public static final String VHOST = "my_vhost"; /** * * @return */ public static final Connection getConnection() { /* 創建連接,連接到RabbitMQ*/ ConnectionFactory connectionFactory = null; try { connectionFactory = new ConnectionFactory(); connectionFactory.setHost(BasicMq.MQ_IP); connectionFactory.setUsername( BasicMq.USER); connectionFactory.setPassword( BasicMq.PWD ); connectionFactory.setVirtualHost( BasicMq.VHOST ); return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null ; } public static final Channel getChannel() throws IOException { return getConnection().createChannel(); } }
十二、異常整理
-
如果生產者聲明exchange為 durable=true,那么消費者對應的exchange也必須為durable=true
-
消費者原來是durable=false,修改后變為durable=true,那么因為服務器上已經有這個隊列,但是參數不一致會異常,需要刪除服務器上的對應隊列