我的是两个项目 生产者 和消费者没在一个项目中。
两个项目都导入Rabbitmq的依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
生产者的代码:import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; @Component @Configuration @Service public class AmqpConfig { ConnectionFactory factory = new ConnectionFactory(); private final String EXCHANGE_NAME = "lanwon.statistics.exchange"; //路由名称 @Value("${spring.rabbitmq.hospitalcode}") String hospitalcode; @Value("${spring.rabbitmq.host}") String host; @Value("${spring.rabbitmq.port}") int port; @Value("${spring.rabbitmq.username}") String name; @Value("${spring.rabbitmq.password}") String password; @Value("${spring.rabbitmq.virtual-host}") String VirtualHost; public void MQCentil(String data) { String routingKey = "hospital."+hospitalcode+".study.realtime"; //消息的别名 String QUEUE_NAME ="hospital.study.realtime"; //消息队列名称 try { factory.setUsername(name); factory.setPassword(password); factory.setVirtualHost(VirtualHost); factory.setHost(host); factory.setPort(port); Connection conn = factory.newConnection(); //创建连接 Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic",true); //创建路有器 /* 创建消息队列,并且发送消息 */
//queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //绑定器
byte[] messageBodyBytes = data.getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, messageBodyBytes); //basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体
/* 关闭连接 */ channel.close(); conn.close(); } catch (Exception e) { } } }
生产者的配置文件:
1 spring.rabbitmq.hospitalcode=455769641 2 3 spring.rabbitmq.host=localhost 4 spring.rabbitmq.port=5672 5 spring.rabbitmq.username=guest 6 spring.rabbitmq.password=guest 7 spring.rabbitmq.publisher-confirms=true 8 spring.rabbitmq.virtual-host=/
生产者代码调用我是用定时器做的,实现每分钟调用一次生产者发送消息到消息队列 MQCentil() 方法里的data参数就是发送的消息内容
消费者的代码:
import java.io.IOException; import java.util.concurrent.TimeoutException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; import com.yarlung.service.statistics.TStatisticsStudyRealtimeService; @Service public class CustomerMQ{ @Autowired private TStatisticsStudyRealtimeService tStatisticsStudyRealtimeService; // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); @Value("${spring.rabbitmq.host}") String host; @Value("${spring.rabbitmq.port}") int port; @Value("${spring.rabbitmq.username}") String name; @Value("${spring.rabbitmq.password}") String password; @Value("${spring.rabbitmq.virtual-host}") String VirtualHost; private final String EXCHANGE_NAME ="lanwon.statistics.exchange"; private String QUEUE_NAME ="hospital.study.realtime"; // @RabbitHandler // @RabbitListener(queues="hospital.study.realtime") public void customer() throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { //设置RabbitMQ地址 factory.setUsername(name); factory.setPassword(password); factory.setVirtualHost(VirtualHost); factory.setHost(host); factory.setPort(port); //创建一个新的连接 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic",true); //创建路有器 /* 创建消息队列,并且发送消息 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lanwon.hospital.#"); //绑定器 topic主题方法获取消息 // 同一时刻服务器只会发一条消息给消费者 ,在消息确认前不接收其他消息 // channel.basicQos(1); //配置好获取消息的方式 boolean autoAck=false; //消息消费完成确认 channel.basicConsume(QUEUE_NAME, autoAck,"", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { String message = new String(body, "UTF-8"); tStatisticsStudyRealtimeService.saveStatistics(message); //分钟表存储 } catch (Exception e) { channel.abort(); //此操作中的所有异常将被丢弃 } finally { channel.basicAck(envelope.getDeliveryTag(),false); } } }); } }
消费者的代码使用过程中遇到一个问题,我最初是用
@RabbitListener(queues="lanwon.hospital.study.statistics.realtime")这个注解来让springboot知道这是个消费者方法,让他自动调用这个方法,但是会出现一个问题,消费者的连接会不断的增加,到最后服务器的tcp连接就爆了
然后我就没用这个注解了 ,我写了一个在springboot启动后就执行一次的方法来调用消费者来启动,就完美解决了这个问题。(注意如果是启动中调用消费者的话,不能读取到配置文件里面的配置)
下面是springboot启动后执行的方法实现
import java.io.IOException; import java.util.concurrent.TimeoutException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.ContextRefreshedEvent; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.ShutdownSignalException; import com.yarlung.service.rabbitmq.CustomerMQ; @Configuration public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent>{ @Autowired private CustomerMQ customerMQ; @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { //项目启动后,执行启动消费者方法 try { customerMQ.customer(); //消费者的实现方法 } catch (ShutdownSignalException | ConsumerCancelledException | IOException | TimeoutException | InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
消费者配置文件
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.publisher-confirms=true