1.前言
消息队列除了kafka 外,还有许多种,比如RabbitMQ 、ActiveMQ、ZeroMQ、JMQ等。
老牌的ActiveMQ ,底层使用Java写的,资源消耗大,速度也慢,但是适合 JMS 【java message service】的使用 ,事实上,性能差,现在用的人很少了。
现在流行使用kafka,那是因为支持很大的吞吐量,处理数据速度很快,但是,对数据的处理安全性不高,而且,需要处理那么大吞吐量的应用实际上不多,
kafka更多的是使用在大数据方面,底层是 使用 zookeeper开发 。
RabbitMQ的吞吐量比kafka的低一些,实际上这没有可比性,RabbitMQ的开发理念重点不做吞吐量,而是安全性,常使用在金融方面的应用,使用的人很多,技术成熟,
是 amqp协议的完美实现,底层使用erlang语言实现,每个节点的服务程序【broker】由交换机和 消息队列 组成 ,消息队列又分主队列和镜像队列,如果主队列挂掉了,
那么会选一个镜像队列成为主队列,也就是说镜像队列只要是用来备份的。那么,读取队列信息如果连接到非主队列,则需要交换机路由到指定主队列读取,因此这样的单节点,
导致了吞吐量受限。
综合上来说,RabbitMQ是最好的,如果单考虑吞吐量,那么肯定选择kafka。
这一篇随笔,讲解RabbitMQ 的 4大交换机中的 直连交换机的简单使用。
消息中间件不仅可以在多个服务器间使用,也可以在单个服务器使用,用于消息转发给订阅消息队列的监听器,
这里我以两个服务器作为演示,消息生产者工程端口为1004,消息消费者工程端口为1002.
注意,需要提前安装RabbitMQ软件,window10 详细安装 的 随笔地址 https://www.cnblogs.com/c2g5201314/p/12990634.html
消息生产者端总结: (1)使用直连交换机 ,需要给绑定的消息队列分配路由键 ,也就是一串用于识别的字符串。 (2)调用rabbit模板发送消息时,需要参数分别是 直连交换机名字、路由键、消息字, 数据类型都是字符串 ,如果是键值对象则需要转成json字符串,然后后接收的消费者端解析json即可。 (3)直连交换机发送消息的底层原理,其实是使用rabbit模板,根据指定的交换机名字查找交换机【因此不同类型的交换机名字是不允许相同的】,找到后将路由键和消息传给该直连交换机,
然后该交换机根据路由键查找消息队列,需要与消息队列的路由键完全一样才可以匹配成功,找到匹配的消息队列后,将消息放入消息队列中,然后消息队列会自动将消息推送给监听该消息队列的消费者端,
当消费者接收后并且确认后,消息队列会将该消息销毁。 (4)如果路由键匹配不到消息队列【即消息队列不存在】,消息将会抛弃。 (5)如果匹配到了消息队列,但是没有监听该消息队列的消费者端,那么消息将一直存在该队列中,直到有监听该队列的消费者端启动后,消费该消息,消息才会从消息队列中销毁。

消费完清空后,将恢复为0,【因此可证明 ,数据很安全,不会丢失】
消息消费者端总结: 总结: (1)消息消费者不需要配置什么东西,只需要在配置文件添加rabbit地址端口账号密码,即可连接, 然后在需要的监听类关联指定的队列名字即可接收到该队列的消息 (2)如果是使用 直连交换机发送消息,该队列的所有监听将会使用轮询策略做负载均衡来消费信息, 不论是将监听放于类上还是方法上,效果都是一样的 (3)方法上写监听,记得在类上加@Service或@Component注册bean,否则消息队列监听注册无效 (4)消息只能传输字符串,但是可以使用json字符串,获取后再对其解析即可,可用fastjson解析,
也可以使用objectMapper强制解析【不建议使用】
2.消息生产者端
(1)目录结构
红箭头标出来的两个文件是核心文件
(2)导入依赖包
<!-- 消息中间件--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.6.RELEASE</version> </dependency>
pom.xml源码【不可直接复制源码,我这里是maven多模块的子工程,需要改依赖管理的】

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>cen.cloud</groupId> <artifactId>cen-mycloud</artifactId> <version>0.0.1-SNAPSHOT</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>rabbitmq-producer-1004</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitmq-producer-1004</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!--eureka 注册中心依赖包 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> <!-- 消息中间件--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.6.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
(3)配置application.properties文件
完整源码

#工程名/项目名/应用名/服务名 spring.application.name=rabbitmq-producer-1004 #端口号 server.port=1004 #eureka注册 eureka.client.serviceUrl.defaultZone=http://localhost:7001/eureka/ #rabbitmq配置 #spring.rabbitmq.virtual-host=/ spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 #默认账户 spring.rabbitmq.username=guest #默认密码 spring.rabbitmq.password=guest # #spring.rabbitmq.listener.simple.concurrency=10 #spring.rabbitmq.listener.simple.max-concurrency=20 #spring.rabbitmq.listener.simple.prefetch=50 ## #mq.env=local # # # #日志配置 # 指定日志输入级别【根节点,表明整个项目基本的日志级别】 logging.level.root=info # ** 表示是指定的某个文件的路径或类的日志级别 #logging.level.**=info # 指定日志输出位置和日志文件名 , ./指工程根目录 logging.file=./rabbitmq-producer-1004/log/spring.log # 指定日志输出路径,若file和path同时配置,则file生效 # 此配置默认生成文件为spring.log #logging.file.path=./log # 控制台日志输出格式 # -5表示从左显示5个字符宽度 logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %boldYellow(%thread) | %boldGreen(%logger) | %msg%n # 文件中输出的格式 logging.pattern.file=%d{yyyy-MM-dd HH:mm:ss.SSS} = [%thread] = %-5level = %logger{50} - %msg%n
(4)创建rabbitmq配置类

package com.example.rabbitmqproducer1004.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbitmq配置类---消息生产者 */ @Configuration public class RabbitmqConfig { //日志记录器 Logger logger = LoggerFactory.getLogger(getClass()); /** * 定义 交换机、消息队列、路由关键字 的名字 */ //定义交换机名字 exchange public static final String EXCHANG_1 = "exchange_1"; //定义消息队列名字 queue public static final String QUEUE_1 = "queu_1"; //定义路由键 routingkey public static final String ROUTINGKEY_1 = "routing_1"; //=============================================================== /** * 下面的是 直连交换机 设置 绑定 消息队列 到 交换机 * * DirectExchange:直连交换机,按照routingkey分发到指定队列 */ //============================================== /** * 设置交换机类型 */ @Bean public DirectExchange directExchange() { logger.warn("设置交换机类型"); //实例交换机对象,然后注入该交换机的名字 return new DirectExchange(EXCHANG_1); } /** * 创建消息队列 */ @Bean public Queue queue1() { logger.warn("创建消息队列"); //实例消息队列对象,输入该队列名字,如果需要该队列持久化,则设为true,默认是false // return new Queue(QUEUE_1, true); return new Queue(QUEUE_1); } /** * 绑定 消息队列 到 交换机【一个 交换机 允许被多个 消息队列 绑定】 */ @Bean public Binding binding() { logger.warn("绑定 消息队列 到 交换机"); //使用绑定构造器将 指定的队列 绑定到 指定的交换机上 ,Direct交换机需要携带 路由键 return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUTINGKEY_1); } }
(5)创建消息生产类

package com.example.rabbitmqproducer1004.rabbitmqFactory; import com.example.rabbitmqproducer1004.config.RabbitmqConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.UUID; /** * 消息生产类 */ @Component //实现接口 public class SendMessage { Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 * * 参数是消息内容 */ public void send(String message){ logger.warn("发送消息,内容:"+message); //发送消息 ,参数分别是 : 指定的交换机名字 、指定的路由键、消息字符串 rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1,RabbitmqConfig.ROUTINGKEY_1,message); } }
(6)controller层,调用消息生产类

package com.example.rabbitmqproducer1004.controller; import com.example.rabbitmqproducer1004.rabbitmqFactory.SendMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class DController { @Autowired private SendMessage sendMessage; @RequestMapping("/mq") public String mq(String msg){ sendMessage.send(msg); return "发送成功"; } }
3.消息消费者端
(1)目录结构
红箭头标出来的文件是核心文件
(2)导入依赖包
<!-- 消息中间件--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.6.RELEASE</version> </dependency>
完整的pom.xml

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>cen.cloud</groupId> <artifactId>cen-mycloud</artifactId> <version>0.0.1-SNAPSHOT</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>rabbitmq-consumer-1002</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitmq-consumer-1002</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!--eureka 注册中心依赖包 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> <!-- <!–健康检测管理中心 ,可刷新配置文件–>--> <!-- <dependency>--> <!-- <groupId>org.springframework.boot</groupId>--> <!-- <artifactId>spring-boot-starter-actuator</artifactId>--> <!-- </dependency>--> <!-- 消息中间件--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.6.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
(3)配置application.properties文件
完整源码

#工程名/项目名/应用名/服务名 spring.application.name=rabbitmq-consumer-1002 #端口号 server.port=1002 #eureka注册 eureka.client.serviceUrl.defaultZone=http://localhost:7001/eureka/ #rabbitmq配置 #spring.rabbitmq.virtual-host=/ spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 #默认账户 spring.rabbitmq.username=guest #默认密码 spring.rabbitmq.password=guest # #spring.rabbitmq.listener.simple.concurrency=10 #spring.rabbitmq.listener.simple.max-concurrency=20 #spring.rabbitmq.listener.simple.prefetch=50 ## #mq.env=local # # # #日志配置 # 指定日志输入级别【根节点,表明整个项目基本的日志级别】 logging.level.root=info # ** 表示是指定的某个文件的路径或类的日志级别 #logging.level.**=info # 指定日志输出位置和日志文件名 , ./指工程根目录 logging.file=./rabbitmq-consumer-1002/log/spring.log # 指定日志输出路径,若file和path同时配置,则file生效 # 此配置默认生成文件为spring.log #logging.file.path=./log # 控制台日志输出格式 # -5表示从左显示5个字符宽度 logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %boldYellow(%thread) | %boldGreen(%logger) | %msg%n # 文件中输出的格式 logging.pattern.file=%d{yyyy-MM-dd HH:mm:ss.SSS} = [%thread] = %-5level = %logger{50} - %msg%n
(4)rabbitmq配置类

package com.example.rabbitmqconsumer1002.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; /** * rabbitmq的消费者配置类 */ @Configuration public class RabbitConfig { private final Logger logger = LoggerFactory.getLogger(this.getClass()); //定义需要关联的消息队列名字 queue public static final String QUEUE_1 = "queu_1"; }
是的,你没看错,就这么点东西
(5)消息队列监听
接听方式分两种方式,
一种是放在类上

package com.example.rabbitmqconsumer1002.rabbitmqListener; import com.example.rabbitmqconsumer1002.config.RabbitConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息监听类--发短信 */ //注册bean @Component //设置需要监听的消息队列 @RabbitListener(queues = RabbitConfig.QUEUE_1) public class SendMessageListener { Logger logger = LoggerFactory.getLogger(getClass()); //消息事件处理 @RabbitHandler public void sendMessage(String msg) { logger.warn("我是端口1002的消费者,收到信息:" + msg); } }
一种是放在方法上 【但记得给这个方法的类注册bean】

package com.example.rabbitmqconsumer1002.rabbitmqListener; import com.example.rabbitmqconsumer1002.config.RabbitConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; /** * 记得加@Service或@Component注册bean,否则消息队列监听注册无效 */ //@Service @Component public class SMService { Logger logger = LoggerFactory.getLogger(getClass()); @RabbitListener(queues = RabbitConfig.QUEUE_1) public void kk(String msg){ logger.warn("我是端口1002的消费者--方法监听--,收到信息:" + msg); } }
4.测试
必须先启动消息生产者端的工程,会自动在rabbitmq创建消息队列和交换机,然后再启动消息消费者端,
否则消费者端因为监听不到该指定消息队列而报错。
(1)启动消息生产端
打印的初始化循序
【请忽略日志级别,那是因为我故意设为警告级别,红色看起来明显】
浏览器输入网址 http://127.0.0.1:15672/
可进入rabbitmq监控页面
使用默认账号密码登录
选择 connection 选项,可以查看当前连接rabbitmq的主机信息
选择exchange选项,可以看到新建的交换机
选择queue选项,可看到新建的消息队列
(2)启动消息消费端
启动后,再次选择 connection 选项,可以看的消费者端也连接好了
(3)调用消息生产者的接口发消息,访问网址 http://localhost:1004/mq?msg=你大爷,帮我发短信85345
提示发送成功
查看生产者控制台
现在去看消费者的控制台
可见 ,消费者 成功从消息队列获取到了消息。