我的是兩個項目 生產者 和消費者沒在一個項目中。
兩個項目都導入Rabbitmq的依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
生產者的代碼:import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
@Component
@Configuration
@Service
public class AmqpConfig {
ConnectionFactory factory = new ConnectionFactory();
private final String EXCHANGE_NAME = "lanwon.statistics.exchange"; //路由名稱
@Value("${spring.rabbitmq.hospitalcode}")
String hospitalcode;
@Value("${spring.rabbitmq.host}")
String host;
@Value("${spring.rabbitmq.port}")
int port;
@Value("${spring.rabbitmq.username}")
String name;
@Value("${spring.rabbitmq.password}")
String password;
@Value("${spring.rabbitmq.virtual-host}")
String VirtualHost;
public void MQCentil(String data) {
String routingKey = "hospital."+hospitalcode+".study.realtime"; //消息的別名
String QUEUE_NAME ="hospital.study.realtime"; //消息隊列名稱
try {
factory.setUsername(name);
factory.setPassword(password);
factory.setVirtualHost(VirtualHost);
factory.setHost(host);
factory.setPort(port);
Connection conn = factory.newConnection(); //創建連接
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic",true); //創建路有器
/* 創建消息隊列,並且發送消息 */
//queueDeclare第一個參數表示隊列名稱、第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)、第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //綁定器
byte[] messageBodyBytes = data.getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, messageBodyBytes); //basicPublish第一個參數為交換機名稱、第二個參數為隊列映射的路由key、第三個參數為消息的其他屬性、第四個參數為發送信息的主體
/* 關閉連接 */
channel.close();
conn.close();
} catch (Exception e) {
}
}
}
生產者的配置文件:
1 spring.rabbitmq.hospitalcode=455769641 2 3 spring.rabbitmq.host=localhost 4 spring.rabbitmq.port=5672 5 spring.rabbitmq.username=guest 6 spring.rabbitmq.password=guest 7 spring.rabbitmq.publisher-confirms=true 8 spring.rabbitmq.virtual-host=/
生產者代碼調用我是用定時器做的,實現每分鍾調用一次生產者發送消息到消息隊列 MQCentil() 方法里的data參數就是發送的消息內容
消費者的代碼:
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.yarlung.service.statistics.TStatisticsStudyRealtimeService;
@Service
public class CustomerMQ{
@Autowired
private TStatisticsStudyRealtimeService tStatisticsStudyRealtimeService;
// 創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
@Value("${spring.rabbitmq.host}")
String host;
@Value("${spring.rabbitmq.port}")
int port;
@Value("${spring.rabbitmq.username}")
String name;
@Value("${spring.rabbitmq.password}")
String password;
@Value("${spring.rabbitmq.virtual-host}")
String VirtualHost;
private final String EXCHANGE_NAME ="lanwon.statistics.exchange";
private String QUEUE_NAME ="hospital.study.realtime";
// @RabbitHandler
// @RabbitListener(queues="hospital.study.realtime")
public void customer() throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
//設置RabbitMQ地址
factory.setUsername(name);
factory.setPassword(password);
factory.setVirtualHost(VirtualHost);
factory.setHost(host);
factory.setPort(port);
//創建一個新的連接
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic",true); //創建路有器
/* 創建消息隊列,並且發送消息 */
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lanwon.hospital.#"); //綁定器 topic主題方法獲取消息
// 同一時刻服務器只會發一條消息給消費者 ,在消息確認前不接收其他消息
// channel.basicQos(1);
//配置好獲取消息的方式
boolean autoAck=false;
//消息消費完成確認
channel.basicConsume(QUEUE_NAME, autoAck,"", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
String message = new String(body, "UTF-8");
tStatisticsStudyRealtimeService.saveStatistics(message); //分鍾表存儲
}
catch (Exception e) {
channel.abort(); //此操作中的所有異常將被丟棄
}
finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
});
}
}
消費者的代碼使用過程中遇到一個問題,我最初是用
@RabbitListener(queues="lanwon.hospital.study.statistics.realtime")這個注解來讓springboot知道這是個消費者方法,讓他自動調用這個方法,但是會出現一個問題,消費者的連接會不斷的增加,到最后服務器的tcp連接就爆了

然后我就沒用這個注解了 ,我寫了一個在springboot啟動后就執行一次的方法來調用消費者來啟動,就完美解決了這個問題。(注意如果是啟動中調用消費者的話,不能讀取到配置文件里面的配置)
下面是springboot啟動后執行的方法實現
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.ShutdownSignalException;
import com.yarlung.service.rabbitmq.CustomerMQ;
@Configuration
public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent>{
@Autowired
private CustomerMQ customerMQ;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { //項目啟動后,執行啟動消費者方法
try {
customerMQ.customer(); //消費者的實現方法
} catch (ShutdownSignalException | ConsumerCancelledException | IOException | TimeoutException
| InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
消費者配置文件
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.publisher-confirms=true
