springboot集成rabbitmq的代碼實現


我的是兩個項目 生產者 和消費者沒在一個項目中。

兩個項目都導入Rabbitmq的依賴:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

 生產者的代碼:import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

@Component
@Configuration
@Service
public class AmqpConfig {
	
	ConnectionFactory factory = new ConnectionFactory();
	 private final  String EXCHANGE_NAME = "lanwon.statistics.exchange";  //路由名稱
	@Value("${spring.rabbitmq.hospitalcode}")
	String hospitalcode;
	@Value("${spring.rabbitmq.host}")
	String host;
	@Value("${spring.rabbitmq.port}")
	int port;
	@Value("${spring.rabbitmq.username}")
	String name;

	@Value("${spring.rabbitmq.password}")
	String password;
	@Value("${spring.rabbitmq.virtual-host}")
	String VirtualHost;

	public void MQCentil(String data) {
		String routingKey = "hospital."+hospitalcode+".study.realtime";   //消息的別名
		String  QUEUE_NAME ="hospital.study.realtime";             //消息隊列名稱
		try {
			factory.setUsername(name);
			factory.setPassword(password);
			factory.setVirtualHost(VirtualHost);
			factory.setHost(host);
			factory.setPort(port);
			Connection conn = factory.newConnection();         //創建連接
	        Channel channel = conn.createChannel();          
	        channel.exchangeDeclare(EXCHANGE_NAME,"topic",true); //創建路有器
	        /* 創建消息隊列,並且發送消息 */
   //queueDeclare第一個參數表示隊列名稱、第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)、第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數
         channel.queueDeclare(QUEUE_NAME, true, false, false, null);
   channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //綁定器
             byte[] messageBodyBytes = data.getBytes();                 
           channel.basicPublish(EXCHANGE_NAME, routingKey, null, messageBodyBytes); //basicPublish第一個參數為交換機名稱、第二個參數為隊列映射的路由key、第三個參數為消息的其他屬性、第四個參數為發送信息的主體
	        /* 關閉連接 */  
	        channel.close();
	        conn.close();  
		} catch (Exception e) {
		}
	}

}

  生產者的配置文件:

1 spring.rabbitmq.hospitalcode=455769641
2 
3 spring.rabbitmq.host=localhost
4 spring.rabbitmq.port=5672
5 spring.rabbitmq.username=guest
6 spring.rabbitmq.password=guest
7 spring.rabbitmq.publisher-confirms=true
8 spring.rabbitmq.virtual-host=/

生產者代碼調用我是用定時器做的,實現每分鍾調用一次生產者發送消息到消息隊列  MQCentil() 方法里的data參數就是發送的消息內容

消費者的代碼:

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.yarlung.service.statistics.TStatisticsStudyRealtimeService;

@Service
public class CustomerMQ{
	@Autowired
	private TStatisticsStudyRealtimeService tStatisticsStudyRealtimeService;
		// 創建連接工廠	
	     ConnectionFactory factory = new ConnectionFactory();

		@Value("${spring.rabbitmq.host}")
		 String host;
		@Value("${spring.rabbitmq.port}")
		 int port;
		@Value("${spring.rabbitmq.username}")
		 String name;
		@Value("${spring.rabbitmq.password}")
		 String password;
		@Value("${spring.rabbitmq.virtual-host}")
		 String VirtualHost;
    private  final  String EXCHANGE_NAME ="lanwon.statistics.exchange";    
    private  String  QUEUE_NAME ="hospital.study.realtime";
//    @RabbitHandler
//    @RabbitListener(queues="hospital.study.realtime")
    public  void customer() throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        //設置RabbitMQ地址
		factory.setUsername(name);
		factory.setPassword(password);
		factory.setVirtualHost(VirtualHost);
		factory.setHost(host);
		factory.setPort(port);
        //創建一個新的連接
        Connection connection = factory.newConnection();
        final Channel channel =  connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"topic",true); //創建路有器
        /* 創建消息隊列,並且發送消息 */       
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lanwon.hospital.#");  //綁定器 topic主題方法獲取消息
        // 同一時刻服務器只會發一條消息給消費者  ,在消息確認前不接收其他消息
//        channel.basicQos(1);
        //配置好獲取消息的方式   
        boolean autoAck=false;
        //消息消費完成確認
        channel.basicConsume(QUEUE_NAME, autoAck,"", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                try {
                    String message = new String(body, "UTF-8");
                tStatisticsStudyRealtimeService.saveStatistics(message);  //分鍾表存儲
                }
                catch (Exception e) {
                	channel.abort();  //此操作中的所有異常將被丟棄
                }
                finally {     	
                    channel.basicAck(envelope.getDeliveryTag(),false);        
                }
            }
        });
    }
}

 消費者的代碼使用過程中遇到一個問題,我最初是用

 @RabbitListener(queues="lanwon.hospital.study.statistics.realtime")這個注解來讓springboot知道這是個消費者方法,讓他自動調用這個方法,但是會出現一個問題,消費者的連接會不斷的增加,到最后服務器的tcp連接就爆了

然后我就沒用這個注解了 ,我寫了一個在springboot啟動后就執行一次的方法來調用消費者來啟動,就完美解決了這個問題。(注意如果是啟動中調用消費者的話,不能讀取到配置文件里面的配置)

下面是springboot啟動后執行的方法實現

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;

import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.ShutdownSignalException;
import com.yarlung.service.rabbitmq.CustomerMQ;

@Configuration
public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent>{ 
	 @Autowired
	 private CustomerMQ customerMQ;
	 
	 @Override
	 public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {  //項目啟動后,執行啟動消費者方法
		 try {
			customerMQ.customer();   //消費者的實現方法
		} catch (ShutdownSignalException | ConsumerCancelledException | IOException | TimeoutException
				| InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	 }
	 
}  

消費者配置文件

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirms=true

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM