我的是兩個項目 生產者 和消費者沒在一個項目中。
兩個項目都導入Rabbitmq的依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
生產者的代碼:import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; @Component @Configuration @Service public class AmqpConfig { ConnectionFactory factory = new ConnectionFactory(); private final String EXCHANGE_NAME = "lanwon.statistics.exchange"; //路由名稱 @Value("${spring.rabbitmq.hospitalcode}") String hospitalcode; @Value("${spring.rabbitmq.host}") String host; @Value("${spring.rabbitmq.port}") int port; @Value("${spring.rabbitmq.username}") String name; @Value("${spring.rabbitmq.password}") String password; @Value("${spring.rabbitmq.virtual-host}") String VirtualHost; public void MQCentil(String data) { String routingKey = "hospital."+hospitalcode+".study.realtime"; //消息的別名 String QUEUE_NAME ="hospital.study.realtime"; //消息隊列名稱 try { factory.setUsername(name); factory.setPassword(password); factory.setVirtualHost(VirtualHost); factory.setHost(host); factory.setPort(port); Connection conn = factory.newConnection(); //創建連接 Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic",true); //創建路有器 /* 創建消息隊列,並且發送消息 */
//queueDeclare第一個參數表示隊列名稱、第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)、第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //綁定器
byte[] messageBodyBytes = data.getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, messageBodyBytes); //basicPublish第一個參數為交換機名稱、第二個參數為隊列映射的路由key、第三個參數為消息的其他屬性、第四個參數為發送信息的主體
/* 關閉連接 */ channel.close(); conn.close(); } catch (Exception e) { } } }
生產者的配置文件:
1 spring.rabbitmq.hospitalcode=455769641 2 3 spring.rabbitmq.host=localhost 4 spring.rabbitmq.port=5672 5 spring.rabbitmq.username=guest 6 spring.rabbitmq.password=guest 7 spring.rabbitmq.publisher-confirms=true 8 spring.rabbitmq.virtual-host=/
生產者代碼調用我是用定時器做的,實現每分鍾調用一次生產者發送消息到消息隊列 MQCentil() 方法里的data參數就是發送的消息內容
消費者的代碼:
import java.io.IOException; import java.util.concurrent.TimeoutException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; import com.yarlung.service.statistics.TStatisticsStudyRealtimeService; @Service public class CustomerMQ{ @Autowired private TStatisticsStudyRealtimeService tStatisticsStudyRealtimeService; // 創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); @Value("${spring.rabbitmq.host}") String host; @Value("${spring.rabbitmq.port}") int port; @Value("${spring.rabbitmq.username}") String name; @Value("${spring.rabbitmq.password}") String password; @Value("${spring.rabbitmq.virtual-host}") String VirtualHost; private final String EXCHANGE_NAME ="lanwon.statistics.exchange"; private String QUEUE_NAME ="hospital.study.realtime"; // @RabbitHandler // @RabbitListener(queues="hospital.study.realtime") public void customer() throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { //設置RabbitMQ地址 factory.setUsername(name); factory.setPassword(password); factory.setVirtualHost(VirtualHost); factory.setHost(host); factory.setPort(port); //創建一個新的連接 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic",true); //創建路有器 /* 創建消息隊列,並且發送消息 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lanwon.hospital.#"); //綁定器 topic主題方法獲取消息 // 同一時刻服務器只會發一條消息給消費者 ,在消息確認前不接收其他消息 // channel.basicQos(1); //配置好獲取消息的方式 boolean autoAck=false; //消息消費完成確認 channel.basicConsume(QUEUE_NAME, autoAck,"", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { String message = new String(body, "UTF-8"); tStatisticsStudyRealtimeService.saveStatistics(message); //分鍾表存儲 } catch (Exception e) { channel.abort(); //此操作中的所有異常將被丟棄 } finally { channel.basicAck(envelope.getDeliveryTag(),false); } } }); } }
消費者的代碼使用過程中遇到一個問題,我最初是用
@RabbitListener(queues="lanwon.hospital.study.statistics.realtime")這個注解來讓springboot知道這是個消費者方法,讓他自動調用這個方法,但是會出現一個問題,消費者的連接會不斷的增加,到最后服務器的tcp連接就爆了
然后我就沒用這個注解了 ,我寫了一個在springboot啟動后就執行一次的方法來調用消費者來啟動,就完美解決了這個問題。(注意如果是啟動中調用消費者的話,不能讀取到配置文件里面的配置)
下面是springboot啟動后執行的方法實現
import java.io.IOException; import java.util.concurrent.TimeoutException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.ContextRefreshedEvent; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.ShutdownSignalException; import com.yarlung.service.rabbitmq.CustomerMQ; @Configuration public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent>{ @Autowired private CustomerMQ customerMQ; @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { //項目啟動后,執行啟動消費者方法 try { customerMQ.customer(); //消費者的實現方法 } catch (ShutdownSignalException | ConsumerCancelledException | IOException | TimeoutException | InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
消費者配置文件
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.publisher-confirms=true