RabbitMQ連接池、生產者、消費者實例


1、本文分享RabbitMQ的工具類,經過實際項目長期測試,在此分享給發家,各位大神有什么建議請指正 !!!

2、下面是鏈接池主要代碼:

 1 import java.util.HashMap;
 2 import java.util.Map;
 3 
 4 import org.apache.commons.lang3.StringUtils;
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.ConnectionFactory;
10 
11 /**
12  * 獲取RabbitMq連接
13  * @author skyfeng
14  */
15 public class RabbitMqConnectFactory {
16     static Logger log = LoggerFactory.getLogger(RabbitMqConnectFactory.class);
17     /**
18      * 緩存連接工廠,將建立的鏈接放入map緩存
19      */
20     private static Map<String, ConnectionFactory> connectionFactoryMap = new HashMap<String, ConnectionFactory>();
21     /**
22      * 根據rabbitMqName獲取一個連接,使用完記得要自己關閉連接 conn.close()
23      */
24     public static Connection getConnection(String rabbitMqName) {
25         if(StringUtils.isEmpty(rabbitMqName)){
26             log.error("rabbitMqName不能為空!");
27             throw new java.lang.NullPointerException("rabbitMqName為空");
28         }
29         if(connectionFactoryMap.get(rabbitMqName)==null){
30             initConnectionFactory(rabbitMqName);
31         }
32         ConnectionFactory connectionFactory = connectionFactoryMap.get(rabbitMqName);
33         if(connectionFactory==null){
34             log.info("沒有找到對應的rabbitmq,name={}",rabbitMqName);
35         }
36         try {
37             return connectionFactory.newConnection();
38         }catch (Exception e) {
39             log.error("創建rabbitmq連接異常!",e);
40             return null;
41         }
42     }
43     /**
44      * 初始化一個連接工廠
45      * @param rabbitMqName
46      */
47     private static void initConnectionFactory(String rabbitMqName){
48         
49         try {
50             ConnectionFactory factory = new ConnectionFactory();
51             //新增代碼,如果連接斷開會自動重連
52             //factory.setAutomaticRecoveryEnabled(true);
53             factory.setHost("127.0.0.1");
54             factory.setPort(5672);
55             //factory.setVirtualHost(vhost);
56             factory.setUsername("test");
57             factory.setPassword("test");
58             connectionFactoryMap.put(rabbitMqName, factory);
59         } catch (Exception e) {
60             e.printStackTrace();
61         }finally{
62         }
63     }
64     
65 }

 3、消費端的代碼:

 1 import org.slf4j.Logger;
 2 import org.slf4j.LoggerFactory;
 3 
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.Consumer;
 7 
 8 /**
 9  * RabbitMQq客戶端代碼
10  * @author skyfeng
11  *
12  */
13 public class CustomerMqClient {
14 
15     final static Logger log = LoggerFactory.getLogger(CustomerMqClient.class);
16     private final static String RABBITMQ_NAME = "mq_name";
17     private final static String EXCHANGE_NAME = "Exchange_name";
18     private final static String QUEUE_NAME = "queue_name";
19     private static Channel channel = null;
20     private static Connection connection = null;
21     
22     /**
23      * 初始化客戶端代碼
24      */
25     public static void initClient() {
26         //重新鏈接時判斷之前的channel是否關閉,沒有關閉先關閉
27         if(null != channel  && channel.isOpen()){
28             try {
29                 channel.close();
30             } catch (Exception e) {
31                 log.error("mq name =[" +RABBITMQ_NAME+"] close old channel exception.e={}",e);
32             }finally {
33                 channel = null;
34             }
35         }
36         //重新鏈接時判斷之前的connection是否關閉,沒有關閉先關閉
37         if (null != connection && connection.isOpen()) {
38             try {
39                 connection.close();
40             } catch (Exception e) {
41                 log.error("mq name =[" +RABBITMQ_NAME+"] close old connection exception.e={}",e);
42             }finally{
43                 connection = null;
44             }
45         }
46         //從鏈接池中獲取鏈接
47         connection = RabbitMqConnectFactory.getConnection(RABBITMQ_NAME);
48         try {
49             channel = connection.createChannel();
50             channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
51             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
52             channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#");//#號接收所有的數據
53             Consumer consumer = new CustomerMqConsumer(channel);//具體的業務邏輯在CustomerMqConsumer中
54             channel.basicConsume(QUEUE_NAME, false, consumer);
55         } catch (Exception e) {
56             log.error("CustomerMqClient mq client connection fail .....{}", e);
57             //發生異常時,重連
58             reConnect();
59         }
60     }
61 
62     // 異常時,重連的方法
63     public static void reConnect() {
64         log.error("等待5s后重連");
65         try {
66             Thread.sleep(5000);
67         } catch (InterruptedException e) {
68         }
69         initClient();
70     }
71 
72 }

4、生產端代碼:

 1 import org.apache.commons.lang3.StringUtils;
 2 import org.slf4j.Logger;
 3 import org.slf4j.LoggerFactory;
 4 
 5 import com.rabbitmq.client.AlreadyClosedException;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 
 9 /**
10  * 把數據發送到rabbitmq的exchange,
11  */
12 public class SendToExchange {
13     static Logger log = LoggerFactory.getLogger(SendToExchange.class);
14     
15     final static String TYPE = "topic";
16     final static String CHARSET_UTF8 = "UTF-8";
17     //MQ生產者exchange,把數據發給這個exchange
18     final static String rabbitExchangeName = "ExchangeName";
19     static boolean mqConnected = false;//mq當前處於連接狀態
20     
21     static Channel channel=null;
22     static{
23         init();
24     }
25     public static void init(){
26         log.info(" rabbit mq init begin...");
27         try {
28             //在mq連接中斷后,發送程序判斷已經斷開,啟動重連的時候會執行
29             if(channel!=null){
30                 try {
31                     channel.close();
32                 } catch (Exception e) {
33                     log.error("關閉老channel 異常",e);
34                 }finally{
35                     channel = null;
36                 }
37             }
38             Connection connection = RabbitMqConnectFactory.getConnection("connection");
39             channel = connection.createChannel();
40             /*
41              *這里只定義exchange,因為每個業務模塊都會從這里接入數據,所以不在這里定義隊列
42              *隊列的定義在各個業務模塊自己的消費端定義
43              */
44             channel.exchangeDeclare(rabbitExchangeName, TYPE, true, false, null);
45             log.info(" rabbit mq init OK");
46             mqConnected = true;
47         } catch (Exception e) {
48             log.error("rabbitmq初始化錯誤",e);
49             mqConnected = false;
50         }
51     }
52     /**
53      * 往rabbitmq發數據
54      * @param message
55      */
56     public static void sendToRabbitMq(String message,String routingKey){
57         try {
58             if(StringUtils.isEmpty(message)){
59                 log.debug("message is empty");
60                 return;
61             }
62             channel.basicPublish(rabbitExchangeName, routingKey, null, message.getBytes(CHARSET_UTF8));
63         }catch(AlreadyClosedException ex){
64             log.error("往rabbitmq發數據報錯,可能連接已關閉,嘗試重連,data:",message,ex);
65             init();
66         }catch (Exception e) {
67             log.error("往rabbitmq發數據報錯,data:",message,e);
68         }
69     }
70 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM