一、topic主題模式
特點:模糊的routingkey的匹配模式
注意:*代表是必須為一個;#代表0個或者多個
二、代碼
RabbitMQ界面配置
創建交換機
創建隊列
將隊列綁定在交換機上
生產者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生產者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//所有的中間件技術都是基於TCP/IP協議基礎構建的協議規范,rabbitmq遵循的是ampq協議
//1.創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.創建連接Connection
Connection connection = connectionFactory.newConnection();
//3.通過連接獲取通道channel
Channel channel = connection.createChannel();
//4.通過通道創建交換機、聲明隊列、綁定關系、路由key、發送消息、和接受消息
//5.准備消息內容
String msg="hello topic";
//6.准備交換機
String exchangeName="topic_exchange";
//定義路由key
String routingkey="a.add";
//定義指定交換機類型
String exchangeType="topic";
// //聲明隊列
// channel.queueDeclare("q1",false,false,false,null);
// channel.queueDeclare("q2",false,false,false,null);
// channel.queueDeclare("q3",false,false,false,null);
// //聲明交換機
// channel.exchangeDeclare("fanout_exchange","fanout");
// //將隊列綁定到交換機
// channel.queueBind("q1",exchangeName,null);
// channel.queueBind("q2",exchangeName,null);
// channel.queueBind("q3",exchangeName,null);
//6.發送消息給隊列queue
/*
參數一:交換機
參數二:隊列、路由key
參數三:消息的狀態控制
參數四:消息主體
*/
channel.basicPublish(exchangeName,routingkey,null,msg.getBytes());
//7.關閉通道
channel.close();
//8.關閉連接
connection.close();
System.out.println("生產成功");
}
}
消費者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
//1.創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//獲取隊列名稱
final String queueName=Thread.currentThread().getName();
Connection connection = null;
Channel channel=null;
try {
//2.創建連接Connection
connection = connectionFactory.newConnection();
//3.通過連接獲取通道channel
channel = connection.createChannel();
//定義接受消息回調
Channel finalChannel =channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(queueName+":開始接受消息");
System.in.read();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
System.out.println("發送消息異常。");
}finally {
//7.關閉通道釋放連接
if (channel !=null && channel.isOpen()){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
if (connection !=null && connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
};
public static void main(String[] args) {
//啟動三個線程
new Thread(runnable,"q7").start();
new Thread(runnable,"q8").start();
new Thread(runnable,"q9").start();
}
}
add的結果