1、本文分享RabbitMQ的工具類,經過實際項目長期測試,在此分享給大家,各位大神有什麼建議請指正!
2、下面是連接工廠(連接池)的主要代碼:
1 import java.util.HashMap; 2 import java.util.Map; 3 4 import org.apache.commons.lang3.StringUtils; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.ConnectionFactory; 10 11 /** 12 * 獲取RabbitMq連接 13 * @author skyfeng 14 */ 15 public class RabbitMqConnectFactory { 16 static Logger log = LoggerFactory.getLogger(RabbitMqConnectFactory.class); 17 /** 18 * 緩存連接工廠,將建立的鏈接放入map緩存 19 */ 20 private static Map<String, ConnectionFactory> connectionFactoryMap = new HashMap<String, ConnectionFactory>(); 21 /** 22 * 根據rabbitMqName獲取一個連接,使用完記得要自己關閉連接 conn.close() 23 */ 24 public static Connection getConnection(String rabbitMqName) { 25 if(StringUtils.isEmpty(rabbitMqName)){ 26 log.error("rabbitMqName不能為空!"); 27 throw new java.lang.NullPointerException("rabbitMqName為空"); 28 } 29 if(connectionFactoryMap.get(rabbitMqName)==null){ 30 initConnectionFactory(rabbitMqName); 31 } 32 ConnectionFactory connectionFactory = connectionFactoryMap.get(rabbitMqName); 33 if(connectionFactory==null){ 34 log.info("沒有找到對應的rabbitmq,name={}",rabbitMqName); 35 } 36 try { 37 return connectionFactory.newConnection(); 38 }catch (Exception e) { 39 log.error("創建rabbitmq連接異常!",e); 40 return null; 41 } 42 } 43 /** 44 * 初始化一個連接工廠 45 * @param rabbitMqName 46 */ 47 private static void initConnectionFactory(String rabbitMqName){ 48 49 try { 50 ConnectionFactory factory = new ConnectionFactory(); 51 //新增代碼,如果連接斷開會自動重連 52 //factory.setAutomaticRecoveryEnabled(true); 53 factory.setHost("127.0.0.1"); 54 factory.setPort(5672); 55 //factory.setVirtualHost(vhost); 56 factory.setUsername("test"); 57 factory.setPassword("test"); 58 connectionFactoryMap.put(rabbitMqName, factory); 59 } catch (Exception e) { 60 e.printStackTrace(); 61 }finally{ 62 } 63 } 64 65 }
3、消費端的代碼:
1 import org.slf4j.Logger; 2 import org.slf4j.LoggerFactory; 3 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.Consumer; 7 8 /** 9 * RabbitMQq客戶端代碼 10 * @author skyfeng 11 * 12 */ 13 public class CustomerMqClient { 14 15 final static Logger log = LoggerFactory.getLogger(CustomerMqClient.class); 16 private final static String RABBITMQ_NAME = "mq_name"; 17 private final static String EXCHANGE_NAME = "Exchange_name"; 18 private final static String QUEUE_NAME = "queue_name"; 19 private static Channel channel = null; 20 private static Connection connection = null; 21 22 /** 23 * 初始化客戶端代碼 24 */ 25 public static void initClient() { 26 //重新鏈接時判斷之前的channel是否關閉,沒有關閉先關閉 27 if(null != channel && channel.isOpen()){ 28 try { 29 channel.close(); 30 } catch (Exception e) { 31 log.error("mq name =[" +RABBITMQ_NAME+"] close old channel exception.e={}",e); 32 }finally { 33 channel = null; 34 } 35 } 36 //重新鏈接時判斷之前的connection是否關閉,沒有關閉先關閉 37 if (null != connection && connection.isOpen()) { 38 try { 39 connection.close(); 40 } catch (Exception e) { 41 log.error("mq name =[" +RABBITMQ_NAME+"] close old connection exception.e={}",e); 42 }finally{ 43 connection = null; 44 } 45 } 46 //從鏈接池中獲取鏈接 47 connection = RabbitMqConnectFactory.getConnection(RABBITMQ_NAME); 48 try { 49 channel = connection.createChannel(); 50 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true); 51 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 52 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#");//#號接收所有的數據 53 Consumer consumer = new CustomerMqConsumer(channel);//具體的業務邏輯在CustomerMqConsumer中 54 channel.basicConsume(QUEUE_NAME, false, consumer); 55 } catch (Exception e) { 56 log.error("CustomerMqClient mq client connection fail .....{}", e); 57 //發生異常時,重連 58 reConnect(); 59 } 60 } 61 62 // 異常時,重連的方法 63 public static void reConnect() { 64 log.error("等待5s后重連"); 65 try { 66 Thread.sleep(5000); 67 } catch (InterruptedException e) { 68 } 69 
initClient(); 70 } 71 72 }
4、生產端代碼:
1 import org.apache.commons.lang3.StringUtils; 2 import org.slf4j.Logger; 3 import org.slf4j.LoggerFactory; 4 5 import com.rabbitmq.client.AlreadyClosedException; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 9 /** 10 * 把數據發送到rabbitmq的exchange, 11 */ 12 public class SendToExchange { 13 static Logger log = LoggerFactory.getLogger(SendToExchange.class); 14 15 final static String TYPE = "topic"; 16 final static String CHARSET_UTF8 = "UTF-8"; 17 //MQ生產者exchange,把數據發給這個exchange 18 final static String rabbitExchangeName = "ExchangeName"; 19 static boolean mqConnected = false;//mq當前處於連接狀態 20 21 static Channel channel=null; 22 static{ 23 init(); 24 } 25 public static void init(){ 26 log.info(" rabbit mq init begin..."); 27 try { 28 //在mq連接中斷后,發送程序判斷已經斷開,啟動重連的時候會執行 29 if(channel!=null){ 30 try { 31 channel.close(); 32 } catch (Exception e) { 33 log.error("關閉老channel 異常",e); 34 }finally{ 35 channel = null; 36 } 37 } 38 Connection connection = RabbitMqConnectFactory.getConnection("connection"); 39 channel = connection.createChannel(); 40 /* 41 *這里只定義exchange,因為每個業務模塊都會從這里接入數據,所以不在這里定義隊列 42 *隊列的定義在各個業務模塊自己的消費端定義 43 */ 44 channel.exchangeDeclare(rabbitExchangeName, TYPE, true, false, null); 45 log.info(" rabbit mq init OK"); 46 mqConnected = true; 47 } catch (Exception e) { 48 log.error("rabbitmq初始化錯誤",e); 49 mqConnected = false; 50 } 51 } 52 /** 53 * 往rabbitmq發數據 54 * @param message 55 */ 56 public static void sendToRabbitMq(String message,String routingKey){ 57 try { 58 if(StringUtils.isEmpty(message)){ 59 log.debug("message is empty"); 60 return; 61 } 62 channel.basicPublish(rabbitExchangeName, routingKey, null, message.getBytes(CHARSET_UTF8)); 63 }catch(AlreadyClosedException ex){ 64 log.error("往rabbitmq發數據報錯,可能連接已關閉,嘗試重連,data:",message,ex); 65 init(); 66 }catch (Exception e) { 67 log.error("往rabbitmq發數據報錯,data:",message,e); 68 } 69 } 70 }
