ActiveMQ監聽消息並進行轉發,監聽不同的mq服務器和不同的隊列


工作中剛接觸mq消息業務,其實也就是監聽一下別的項目發送的消息然后進行對應的轉發,但是監聽的mq會有多個,而且轉發的地址也可能有多個,這里就使用spring集成的方式!記錄一下實現方式:

監聽多個mq配置,主要還是在xml或者配置類里進行配置多個,這里以兩個為例:

properties文件中配置好多個mq的tcp地址,

<!-- mq配置 -->
		<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
			<property name="brokerURL" value="${amq.tpl.server}" />
		</bean>
		<bean id="connectionFactory"
			class="org.springframework.jms.connection.SingleConnectionFactory">
			<property name="targetConnectionFactory" ref="targetConnectionFactory" />
		</bean>
		<!-- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
			<property name="connectionFactory" ref="connectionFactory" />
		</bean> -->
		<!-- 監聽的消息隊列 -->
		<bean id="wechatQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
			<constructor-arg>
				<value>templateQueue</value>
			</constructor-arg>
			<!--可繼續配置多個隊列 -->
		</bean>
		<!-- 消息監聽器配置,引用制定的mq服務器與監聽隊列->
		<bean id="templateMessageListener" class="com.zhuzher.amq.listener.TemplateMessageListener"/> 
		<bean id="templateMessageContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
		    <property name="connectionFactory" ref="connectionFactory" />  
		    <property name="destination" ref="wechatQueueDestination" />  
		    <property name="messageListener" ref="templateMessageListener" />  
		</bean>

  多個,mq就切換不同的地址,配置不同的連接工廠就可以了,然后再配置監聽器!

然后就是消息轉發了,這里采用httpclient調用接口方式實現,然后將地址配置在數據庫中,達到高可擴展的目的,為了提高性能還可以在項目啟動的時候把地址加載的內存中,取地址就從內存中獲取,並提供一個刷新的接口即可!

這里直接貼上內存類的代碼:

//存儲轉發地址
public class ForwardAddressHelper {
	private static Logger log=Logger.getLogger(ForwardAddressHelper.class);
	@Autowired PmsForwardService pmsForwardService;
	//存儲轉發地址
	private static List<PmsForwardAddress> address = new ArrayList<>();
	private static ForwardAddressHelper forwardAddressHelper=null;//單例
	//私有化構造函數
	private  ForwardAddressHelper(){}
	public static ForwardAddressHelper getInstance() {
        if (forwardAddressHelper == null) {
            synchronized (ForwardAddressHelper.class) {
                if (forwardAddressHelper == null) {
                	forwardAddressHelper = new ForwardAddressHelper();
                }
            }
        }
        return forwardAddressHelper;
    }
	/**
	 * 初始化轉發地址
	 */
	public void init(){
		if(ForwardAddressHelper.address.size()==0){
			System.out.println("----------------初始化成功----------------");
			initAddress();
		}
	}
	/**
	 * 重載轉發地址
	 */
	public void reLoad() {
		ForwardAddressHelper.address.clear();
		init();
	}
	/**
	 * 初始化轉發地址數據
	 */
	private void initAddress(){
		log.info("--------------轉發地址初始化-----------");
		ForwardAddressHelper.address.addAll(pmsForwardService.queryAddress());
	}
	/**
	 * 獲取所有轉發地址
	 */
	public static List<PmsForwardAddress> getAddress(){
		return ForwardAddressHelper.address;
	}
}

  ,然后只需要在spring容器啟動的時候調用這個init方法就可以了,這里有兩種,一種是監聽器方式,還有一種是xml配置,這里我就直接使用xml了:

lazy-init="false" :表示容器加載立即執行
<bean id="forwardAddressHelper" lazy-init="false" class="com.helper.ForwardAddressHelper" init-method="init"/>

,然后就是寫消息監聽類了,實現具體業務,由於已經配置了監聽器,所以直接寫就行,這里直接上代碼,具體業務就是用httpclient調一遍接口,消息內容是接口的參數:
public class PmsMessageListener implements MessageListener {
      
	static Logger log=Logger.getLogger(PmsMessageListener.class);
	static final Gson GSON = new Gson();
	@Autowired ForwardAddressHelper forwardAddressHelper;
	@Override
	public void onMessage(Message message) {
		log.debug("監聽器接收到消息:"+message);
		if(null == message || !(message instanceof TextMessage))return;
		TextMessage textMessage = (TextMessage) message;
		String text = null;
		try {
			text = textMessage.getText();log.debug("message:"+textMessage.getText());
		} catch (JMSException e) { e.printStackTrace(); }
		if(StringUtil.isBlank(text))return;
		Map<String, String> messageMap = GSON.fromJson(text, new TypeToken<Map<String, String>>(){}.getType());
		pmsForward(messageMap);
	}
	//消息轉發-獲取參數中對應參數調用對應接口
	public void pmsForward(Map<String, String> map){
		List<PmsForwardAddress> address = forwardAddressHelper.getAddress();//從內存獲取轉發地址
		//封裝參數
		List<NameValuePair> params = new ArrayList<NameValuePair>();
		Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
		while(iterator.hasNext()){
			params.add(new BasicNameValuePair(iterator.next().getKey(),iterator.next().getValue()));
		}
		address.forEach(x->{
			CloseableHttpResponse response = null;
			CloseableHttpClient httpClient = HttpClients.createDefault();
			try {
				URIBuilder builder = new URIBuilder(x.getAddress());
			    builder.setParameters(params);
			    HttpGet get = new HttpGet(builder.build());
			    response = httpClient.execute(get);
			    if(response != null && response.getStatusLine().getStatusCode() == 200)log.info("消息轉發成功");
			} catch (Exception e) {e.printStackTrace();log.error("消息轉發失敗");
			} finally {
	            try { httpClient.close();if(response != null)response.close();
	            } catch (IOException e) {e.printStackTrace();}
			}
		});
	}

}

  

PmsMessageListener 這個類配置在了xml文件中,會監聽我們指定的mq的消息隊列,只要有消息來就會取數據庫里配置的接口一一調用!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM