<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
package rabbitmq;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* @author Administrator
* 消息生产者p将消息放入队列
* 消费者监听队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列删除
* (隐患,消息可能没有被消费者正确处理,已经消失了,无法恢复)
* 应用场景:聊天室
*/
public class SimpleTest {
@Test
//模拟生产者将消息放入队列
public void send1() throws Exception{
/* 1 创建连接工厂
* 2 配置共创config
* 3 获取连接
* 4获取信道
* 5 从信道声明queue
* 6 发送消息
* 7 释放资源
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/tb");
factory.setUsername("rabbitmquser");
factory.setPassword("123456");
//从工厂获取连接
Connection newConnection = factory.newConnection();
//获取信道
Channel createChannel = newConnection.createChannel();
//利用channel声明第一个队列
//queue String类型,表示声明的queue对列的名字
//durable Boolean类型,表示是否持久化
//exclusive Boolean类型:当前声明的queue是否专注;true当前连接创建的
//任何channle都可以连接这个queue,false,新的channel不可使用
//autoDelete Boolean类型:在最后连接使用完成后,是否删除队列,false
//arguments Map类型,其他声明参数
createChannel.queueDeclare("simple", false, false, false, null);
//发送消息
String msg="helloworld,nihaoa";
//exchange String类型,交换机名称,简单模式使用默认交换""
//routingkey String类型,当前的消息绑定的routingkey,简单模式下,与队列同名即可
//props BasicProperties类型,消息的属性字段对象,例如BasicProperties
//可以设置一个deliveryMode的值0 持久化,1 表示不持久化,durable配合使用
//body byte[] :消息字符串的byte数组
createChannel.basicPublish("","simple",null, msg.getBytes());
}
//模拟消费端
@Test
public void receive1() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.10.132");
factory.setPort(5672);
factory.setVirtualHost("/tb");
factory.setUsername("rabbitmquser");
factory.setPassword("123456");
//从工厂获取连接
Connection conn=factory.newConnection();
//从连接获取信道
Channel chan=conn.createChannel();
chan.queueDeclare("simple", false, false, false, null);
//创建一个消费者
QueueingConsumer consumer = new QueueingConsumer(chan);
chan.basicConsume("simple", consumer);
//监听队列
while (true) {
//获取下一个delivery,delivery从队列获取消息
Delivery delivery = consumer.nextDelivery();
String msg=new String(delivery.getBody());
System.out.println(msg);
}
}
}
package rabbitmq2;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConn(){
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setVirtualHost("/tb");
factory.setPort(5672);
factory.setUsername("rabbitmquser");
factory.setPassword("123456");
//从工厂获取连接
Connection conn = factory.newConnection();
return conn;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
package rabbitmq2;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* @author Administrator
* work模式
*生产者将消息放入队列
*多个消费者同时监听同一个队列,消息如何被消费?
*C1,C2共同争抢当前消息队列的内容,谁先拿到消息,谁来负责消费
*应用场景:红包;大型项目中的资源调度过程(直接由最空闲的系统争抢到资源处理任务)
*/
public class WorkTest {
@Test
public void send1() throws Exception{
Connection conn = ConnectionUtil.getConn();
Channel channel = conn.createChannel();
//声明队列
channel.queueDeclare("work", false, false, false, null);
for(int i=0;i<100;i++){
String msg = "1712,hello:"+i+"message";
channel.basicPublish("","work",null, msg.getBytes());
System.out.println("第"+i+"条信息已经发送");
}
channel.close();
conn.close();
}
@Test
public void receive1() throws Exception{
//获取连接,获取信道
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
channel.queueDeclare("work", false, false, false, null);
//同一时刻服务器只发送一条消息给同一消费者,消费者空闲,才发送一条
channel.basicQos(1);
//定义消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//绑定队列和消费者的关系 queue
//autoAck:消息被消费后,是否自动确认回执,如果false,不自动需要手动在
//callback完成消息消费后进行回执确认,channel.ack,channel.nack
//chan.basicConsume(queue, autoAck, callback)
channel.basicConsume("work",false, consumer);
//监听
while (true) {
Delivery delivery = consumer.nextDelivery();
byte[] result = delivery.getBody();
String msg = new String(result);
System.out.println("接收到:"+msg);
Thread.sleep(50);
//返回服务器,回执
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
@Test
public void receive2() throws Exception{
//获取连接,获取信道
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
channel.queueDeclare("work", false, false, false, null);
//同一时刻服务器只发送一条消息给同一消费者,消费者空闲,才发送一条
channel.basicQos(1);
//定义消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//绑定队列和消费者的关系 queue
//autoAck:消息被消费后,是否自动确认回执,如果false,不自动需要手动在
//callback完成消息消费后进行回执确认,channel.ack,channel.nack
//chan.basicConsume(queue, autoAck, callback)
channel.basicConsume("work",false, consumer);
//监听
while (true) {
Delivery delivery = consumer.nextDelivery();
byte[] result = delivery.getBody();
String msg = new String(result);
System.out.println("接收到:"+msg);
Thread.sleep(50);
//返回服务器,回执
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
package rabbitmq3;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* @author Administrator
*生产者将消息交给交换机
*有交换机根据发布订阅的模式设定将消息同步到所有的绑定队列中;
*后端的消费者都能拿到消息
*应用场景:邮件群发,群聊天,广告
*/
public class FanoutTest {
//交换机,有类型,发布订阅:fanout
//路由模式:direct
//主题模式:topic
@Test
public void send() throws Exception{
//获取连接
Connection conn = ConnectionUtil.getConn();
Channel channel = conn.createChannel();
//声明交换机
//参数意义,1交换机名称2类型:fanout,direct,topic
channel.exchangeDeclare("fanoutEx", "fanout");
//发送消息
for(int i=0;i<100;i++){
String msg="1712 hello:"+i+"msg";
channel.basicPublish("fanoutEx","",null, msg.getBytes());
System.out.println("第"+i+"条信息已经发送");
}
}
@Test
public void receive1() throws Exception{
//获取连接
Connection conn = ConnectionUtil.getConn();
Channel channel = conn.createChannel();
//声明队列
channel.queueDeclare("fanout01", false, false, false, null);
//声明交换机
channel.exchangeDeclare("fanoutEx", "fanout");
channel.basicQos(1);
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
channel.queueBind("fanout01","fanoutEx","");
//定义消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//消费者与队列绑定
channel.basicConsume("fanout01",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("一号消费者接收到"+new String(delivery.getBody()));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
@Test
public void receive2() throws Exception{
//获取连接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//声明队列
chan.queueDeclare("fanout02",false,false,false,null);
//声明交换机
chan.exchangeDeclare("fanoutEx","fanout");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queueBind("fanout02","fanoutEx","");
chan.basicQos(1);
//定义消费者
QueueingConsumer consumer=new QueueingConsumer(chan);
//消费者与队列绑定
chan.basicConsume("fanout02",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("二号消费者接收到"+new String(delivery.getBody()));
chan.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
package rabbitmq4;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* @author Administrator
*生产者发送消息到交换机,同时绑定一个路由Key,交换机根据路由key对下游绑定的队列进行路
*由key的判断,满足路由key的队列才会接收到消息,消费者消费消息
*应用场景: 项目中的error报错
*/
public class RoutingTopicTest {
@Test
public void routingSend() throws Exception{
//获取连接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//声明交换机
//参数意义,1 交换机名称,2 类型:fanout,direct,topic
chan.exchangeDeclare("directEx","direct");
//发送消息
String msg="路由模式的消息";
chan.basicPublish("directEx","jt1712",null, msg.getBytes());
}
@Test
public void routingRec01() throws Exception{
System.out.println("一号消费者等待接收消息");
//获取连接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//声明队列
chan.queueDeclare("direct01", false, false, false, null);
//声明交换机
chan.exchangeDeclare("directEx","direct");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queueBind("direct01", "directEx", "jt1712");
chan.basicQos(1);
//定义消费者
QueueingConsumer consumer=new QueueingConsumer(chan);
//消费者与队列绑定
chan.basicConsume("direct01",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("一号消费者接收到"+new String(delivery.getBody()));
chan.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
@Test
public void routingRec02() throws Exception{
System.out.println("二号消费者等待接收消息");
//获取连接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//声明队列
chan.queueDeclare("direct02", false, false, false, null);
//声明交换机
chan.exchangeDeclare("directEx","direct");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queueBind("direct02", "directEx", "jt1711");
chan.basicQos(1);
//定义消费者
QueueingConsumer consumer=new QueueingConsumer(chan);
//消费者与队列绑定
chan.basicConsume("direct02",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("二号消费者接收到"+new String(delivery.getBody()));
chan.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
package rabbitmq5;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* @author Administrator
* *号代表单个词语
* #代表多个词语
* 其他的内容与routing路由模式一致
*/
public class RoutingTopicTest {
@Test
public void topicSend() throws Exception {
// 获取连接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
// 声明交换机
// 参数意义,1 交换机名称,2 类型:fanout,direct,topic
chan.exchangeDeclare("topicEx", "topic");
// 发送消息
String msg = "主题模式的消息";
chan.basicPublish("topicEx", "jt1712.add.update", null, msg.getBytes());
}
@Test
public void routingRec02() throws Exception{
System.out.println("二号消费者等待接收消息");
//获取连接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//声明队列
chan.queueDeclare("direct02",false,false,false, null);
//声明交换机
chan.exchangeDeclare("directEx","direct");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queueBind("direct02","directEx","jt1711");
chan.basicQos(1);
//定义消费者
QueueingConsumer consumer=new QueueingConsumer(chan);
//消费者与队列绑定
chan.basicConsume("direct02",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("二号消费者接收到"+new String(delivery.getBody()));
chan.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
@Test
public void topicRec1() throws Exception{
System.out.println("一号消费者等待接收消息");
//获取连接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//声明队列
chan.queueDeclare("topic01",false,false,false, null);
//声明交换机
chan.exchangeDeclare("topicEx", "topic");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queueBind("topic01", "topicEx", "jt1712");
chan.basicQos(1);
//定义消费者
QueueingConsumer consumer=new QueueingConsumer(chan);
//消费者与队列绑定
chan.basicConsume("topic01",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("一号消费者接收到"+new String(delivery.getBody()));
chan.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
@Test
public void topicRec2() throws Exception{
System.out.println("二号消费者等待接收消息");
//获取连接
Connection conn = ConnectionUtil.getConn();
Channel chan = conn.createChannel();
//声明队列
chan.queueDeclare("topic02",false,false,false, null);
//声明交换机
chan.exchangeDeclare("topicEx", "topic");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queueBind("topic02", "topicEx", "jt1712.#");
chan.basicQos(1);
//定义消费者
QueueingConsumer consumer=new QueueingConsumer(chan);
//消费者与队列绑定
chan.basicConsume("topic02",false, consumer);
while(true){
Delivery delivery= consumer.nextDelivery();
System.out.println("二号消费者接收到"+new String(delivery.getBody()));
chan.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}