pom
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springcloudparent</artifactId> <groupId>com.cxy</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>rabbitMqConsumer</artifactId> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.4.3</version> </dependency> </dependencies> </project>
消費者代碼:
package com.cxy.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /*** * @ClassName: Consumer1 * @Description: * @Auther: cxy * @Date: 2019/3/24:11:37 * @version : V1.0 */ public class Consumer1 { private static final String Queue="helloworld"; public static void main(String[] args) { ConnectionFactory connectionFactory= new ConnectionFactory(); //設置連接地址 connectionFactory.setHost("192.168.230.134"); //設置端口 connectionFactory.setPort(5672); //設置密碼用戶名 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設置虛擬機,每個虛擬機相當於一個小的mq connectionFactory.setVirtualHost("/"); Connection connection =null; try { //建立連接 connection = connectionFactory.newConnection(); //建立通道,生產着和消費者都是在通道中完成 Channel channel = connection.createChannel(); /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 參數1,聲明隊列 參數2 是否持久化 參數3 是否排他,是否獨戰連接,隊列只允許該鏈接中訪問,如果連接關閉,隊列也就刪除了 參數4:是否自動刪除,如果將此參數設置true,那么就變成零時隊列 參數5 :擴展參數,例如存活時間 */ channel.queueDeclare(Queue,true,false,false,null); /* String basicConsume(String queue, boolean autoAck, Consumer callback) 參數一:隊列名稱 參數二:自動回復 參數三 消費者方法 */ DefaultConsumer defaultConsumer=new DefaultConsumer(channel) { //當接受到消息時候,此方法被調用 /** * @Author cxy * @Description //TODO * @Date 2019/3/24 * @Param [consumerTag, envelope, properties, body] * @return void * * consumerTag 用來標識.可以再監聽隊列時候設置 * envelope 信封,通過envelope可以通過這個獲取到很多東西 * properties 額外的消息屬性 * body:消息體 **/ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //獲取交換機 String exchange = envelope.getExchange(); //消息id,用來表示那個消息消費了 long deliveryTag = envelope.getDeliveryTag(); String message=new String(body,"utf-8"); System.out.println("receive"); } }; channel.basicConsume(Queue,true ,defaultConsumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
由於注釋內容都寫得很詳細就沒有單獨寫文字了.運行之后可以發掘管控台中消息沒有了,
在正式開發中不會使用這種原生得代碼去使用,會采用springboot去整合相關內容,至於以上代碼為什么還要去監聽隊列,防止如果隊列不存在,程序會存在異常,所以這樣,在正式開發中
會采用手動會簽得方式,如果,沒有會簽,就會進行消息重試.保證消息不會丟失