我的是两个项目 生产者 和消费者没在一个项目中。
两个项目都导入Rabbitmq的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
生产者的代码:import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
@Component
@Configuration
@Service
public class AmqpConfig {
ConnectionFactory factory = new ConnectionFactory();
private final String EXCHANGE_NAME = "lanwon.statistics.exchange"; //路由名称
@Value("${spring.rabbitmq.hospitalcode}")
String hospitalcode;
@Value("${spring.rabbitmq.host}")
String host;
@Value("${spring.rabbitmq.port}")
int port;
@Value("${spring.rabbitmq.username}")
String name;
@Value("${spring.rabbitmq.password}")
String password;
@Value("${spring.rabbitmq.virtual-host}")
String VirtualHost;
public void MQCentil(String data) {
String routingKey = "hospital."+hospitalcode+".study.realtime"; //消息的别名
String QUEUE_NAME ="hospital.study.realtime"; //消息队列名称
try {
factory.setUsername(name);
factory.setPassword(password);
factory.setVirtualHost(VirtualHost);
factory.setHost(host);
factory.setPort(port);
Connection conn = factory.newConnection(); //创建连接
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic",true); //创建路有器
/* 创建消息队列,并且发送消息 */
//queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //绑定器
byte[] messageBodyBytes = data.getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, messageBodyBytes); //basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体
/* 关闭连接 */
channel.close();
conn.close();
} catch (Exception e) {
}
}
}
生产者的配置文件:
1 spring.rabbitmq.hospitalcode=455769641 2 3 spring.rabbitmq.host=localhost 4 spring.rabbitmq.port=5672 5 spring.rabbitmq.username=guest 6 spring.rabbitmq.password=guest 7 spring.rabbitmq.publisher-confirms=true 8 spring.rabbitmq.virtual-host=/
生产者代码调用我是用定时器做的,实现每分钟调用一次生产者发送消息到消息队列 MQCentil() 方法里的data参数就是发送的消息内容
消费者的代码:
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.yarlung.service.statistics.TStatisticsStudyRealtimeService;
@Service
public class CustomerMQ{
@Autowired
private TStatisticsStudyRealtimeService tStatisticsStudyRealtimeService;
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
@Value("${spring.rabbitmq.host}")
String host;
@Value("${spring.rabbitmq.port}")
int port;
@Value("${spring.rabbitmq.username}")
String name;
@Value("${spring.rabbitmq.password}")
String password;
@Value("${spring.rabbitmq.virtual-host}")
String VirtualHost;
private final String EXCHANGE_NAME ="lanwon.statistics.exchange";
private String QUEUE_NAME ="hospital.study.realtime";
// @RabbitHandler
// @RabbitListener(queues="hospital.study.realtime")
public void customer() throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
//设置RabbitMQ地址
factory.setUsername(name);
factory.setPassword(password);
factory.setVirtualHost(VirtualHost);
factory.setHost(host);
factory.setPort(port);
//创建一个新的连接
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic",true); //创建路有器
/* 创建消息队列,并且发送消息 */
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lanwon.hospital.#"); //绑定器 topic主题方法获取消息
// 同一时刻服务器只会发一条消息给消费者 ,在消息确认前不接收其他消息
// channel.basicQos(1);
//配置好获取消息的方式
boolean autoAck=false;
//消息消费完成确认
channel.basicConsume(QUEUE_NAME, autoAck,"", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
String message = new String(body, "UTF-8");
tStatisticsStudyRealtimeService.saveStatistics(message); //分钟表存储
}
catch (Exception e) {
channel.abort(); //此操作中的所有异常将被丢弃
}
finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
});
}
}
消费者的代码使用过程中遇到一个问题,我最初是用
@RabbitListener(queues="lanwon.hospital.study.statistics.realtime")这个注解来让springboot知道这是个消费者方法,让他自动调用这个方法,但是会出现一个问题,消费者的连接会不断的增加,到最后服务器的tcp连接就爆了

然后我就没用这个注解了 ,我写了一个在springboot启动后就执行一次的方法来调用消费者来启动,就完美解决了这个问题。(注意如果是启动中调用消费者的话,不能读取到配置文件里面的配置)
下面是springboot启动后执行的方法实现
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.ShutdownSignalException;
import com.yarlung.service.rabbitmq.CustomerMQ;
@Configuration
public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent>{
@Autowired
private CustomerMQ customerMQ;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { //项目启动后,执行启动消费者方法
try {
customerMQ.customer(); //消费者的实现方法
} catch (ShutdownSignalException | ConsumerCancelledException | IOException | TimeoutException
| InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
消费者配置文件
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.publisher-confirms=true
