添加依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.4.3</version> </dependency>
消息的發送者(生產者)
public static void main(String[] args) throws Exception { //創建連接工廠,並設置連接信息 ConnectionFactory f = new ConnectionFactory(); f.setHost("ip地址"); f.setPort(5672);//可選,5672是默認端口 f.setUsername("admin"); f.setPassword("admin"); /* * 與rabbitmq服務器建立連接, * rabbitmq服務器端使用的是nio,會復用tcp連接, * 並開辟多個信道與客戶端通信 * 以減輕服務器端建立連接的開銷 */ Connection c = f.newConnection(); //建立信道 Channel ch = c.createChannel(); /* * 聲明隊列,會在rabbitmq中創建一個隊列 * 如果已經創建過該隊列,就不能再使用其他參數來創建 * * 參數含義: * -queue: 隊列名稱 * -durable: 隊列持久化,true表示RabbitMQ重啟后隊列仍存在 * -exclusive: 排他,true表示限制僅當前連接可用 * -autoDelete: 當最后一個消費者斷開后,是否刪除隊列 * -arguments: 其他參數 */ ch.queueDeclare("task_queue", true,false,false,null); /* * 發布消息 * 這里把消息向默認交換機發送. * 默認交換機隱含與所有隊列綁定,routing key即為隊列名稱 * * 參數含義: * -exchange: 交換機名稱,空串表示默認交換機"(AMQP default)",不能用 null * -routingKey: 對於默認交換機,路由鍵就是目標隊列名稱 * -props: 其他參數,例如頭信息 * -body: 消息內容byte[]數組 */ // ch.basicPublish("", "helloworld", null, "Hello world!".getBytes()); // System.out.println("消息已發送"); while (true) { //控制台輸入的消息發送到rabbitmq System.out.print("輸入消息: "); String msg = new Scanner(System.in).nextLine(); //如果輸入的是"exit"則結束生產者進程 if ("exit".equals(msg)) { break; } //參數:exchage,routingKey,props,body; //MessageProperties.PERSISTENT_TEXT_PLAIN持久化的設置 ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); System.out.println("消息已發送: "+msg); } c.close(); }
消費者
public static void main(String[] args) throws Exception { //連接工廠 ConnectionFactory f = new ConnectionFactory(); f.setHost("ip地址"); f.setUsername("admin"); f.setPassword("admin"); //建立連接 Connection c = f.newConnection(); //建立信道 Channel ch = c.createChannel(); //聲明隊列,如果該隊列已經創建過,則不會重復創建 ch.queueDeclare("task_queue",true,false,false,null); System.out.println("等待接收數據"); //收到消息后用來處理消息的回調對象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); System.out.println("收到: "+msg); //遍歷字符串中的字符,每個點使進程暫停一秒 for (int i = 0; i < msg.length(); i++) { if (msg.charAt(i)=='.') { try { Thread.sleep(1000); } catch (InterruptedException e) { } } } System.out.println("處理結束"); //參數1:消息標簽,參數2:是否確認多條消息 ch.basicAck(message.getEnvelope().getDeliveryTag(),false); } }; //消費者取消時的回調對象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //一次只能接受一條數據 ch.basicQos(1); //第二個參數為消息回執,消息確認處理完成,為true為自動確認,只要消息發送到消費者即消息處理成功;為false為,手動發送確認回執,服務器才認為這個消息處理成功 ch.basicConsume("task_queue", false, callback, cancel); }
以上的ip地址為你MQ安裝的ip地址
以上內容摘自https://blog.csdn.net/weixin_38305440/article/details/102810522,此處做下記錄方便以后查詢使用