一、需求介紹
后端使用Spring Boot2.0框架,要實現IBM MQ的實時數據JMS監聽接收處理,並形成回執通過MQ隊列發送。
二、引入依賴jar包
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.18.RELEASE</version> </dependency> <dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> </dependency> <dependency> <groupId>com.ibm.mq</groupId> <artifactId>com.ibm.mq.allclient</artifactId> <version>9.1.0.0</version> </dependency>
三、監聽實現
代碼中分為三大塊:
1、MQ通道連接,我這邊是用的用戶名密碼連接,如果非密碼的可不入參
2、MQ的隊列連接並實現監聽
3、MQ發送
@Configuration public class MqTestConfig { @Autowired private MqProperties mqProperties; /**=======================MQ 通道工廠============================**/ @Bean(name="mqQueueConnectionFactory") public MQQueueConnectionFactory mqQueueConnectionFactory(){ MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory(); mqQueueConnectionFactory.setHostName(mqProperties.getHostName()); try { mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT); mqQueueConnectionFactory.setCCSID(mqProperties.getCcsid()); mqQueueConnectionFactory.setChannel(mqProperties.getChannel()); mqQueueConnectionFactory.setPort(mqProperties.getPort()); mqQueueConnectionFactory.setQueueManager(mqProperties.getQueueManager()); } catch (JMSException e) { e.printStackTrace(); } return mqQueueConnectionFactory; } @Bean(name="userCredentialsConnectionFactoryAdapter") public UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(MQQueueConnectionFactory mqQueueConnectionFactory){ UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter(); userCredentialsConnectionFactoryAdapter.setUsername(mqProperties.getUserName()); userCredentialsConnectionFactoryAdapter.setPassword(mqProperties.getPassword()); userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory); return userCredentialsConnectionFactoryAdapter; } /**============================MQ 消息監聽接收=============================**/ //隊列連接 @Bean(name="mqueue") public MQQueue mqueue(){ MQQueue mqQueue = new MQQueue(); try { mqQueue.setBaseQueueName(mqProperties.getBaseQueueNameRecv()); mqQueue.setBaseQueueManagerName(mqProperties.getBaseQueueManagerName()); } catch (JMSException e) { e.printStackTrace(); } return mqQueue; } //對隊列進行監聽 @Bean(name="simpleMessageListenerContainer") public SimpleMessageListenerContainer simpleMessageListenerContainer(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter,MQQueue mqueue){ SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(); simpleMessageListenerContainer.setConnectionFactory(userCredentialsConnectionFactoryAdapter); simpleMessageListenerContainer.setDestination(mqueue); simpleMessageListenerContainer.setMessageListener(decMqRiskRecvService()); return simpleMessageListenerContainer; } //報文處理類 @Bean(name="decMqRiskRecvService") public DecMqRiskRecvService decMqRiskRecvService(){ return new DecMqRiskRecvService(); } /**============================MQ 發送消息============================**/ @Bean(name="cachingConnectionFactory") public CachingConnectionFactory cachingConnectionFactory(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter){ CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter); cachingConnectionFactory.setSessionCacheSize(5); cachingConnectionFactory.setReconnectOnException(true); return cachingConnectionFactory; } @Bean(name="jmsTransactionManager") public PlatformTransactionManager jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory){ JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(); jmsTransactionManager.setConnectionFactory(cachingConnectionFactory); return jmsTransactionManager; } @Bean(name="jmsOperations") public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory){ JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory); jmsTemplate.setReceiveTimeout(mqProperties.getReceiveTimeout()); return jmsTemplate; } }
mq配置文件
記得要添加get和set方法
@Configuration @ConfigurationProperties(prefix=MqProperties.MQ_PREFIX) public class MqProperties { public static final String MQ_PREFIX = "mq"; private String hostName; private int port; private String channel; private int ccsid; private String userName; private String password; private String queueManager; private String baseQueueManagerName; private String baseQueueNameRecv; private String baseQueueNameSend; private long receiveTimeout; }
報文處理類及回執發送
1、實現類要實現MessageListener,重寫onMessage方法,Message就是監聽到的消息。
2、讀取報文時為防止亂碼,我這邊按照格式分兩種方式讀取轉碼。
3、發送回執,之前發送發現報文多出了一些報文頭信息,所以在隊列信息加了
"queue:///" + mqProperties.getBaseQueueNameSend() + "?targetClient=1"
這樣發送的報文會去掉報文頭信息。
@Service public class DecMqRiskRecvService implements MessageListener { @Autowired private JmsOperations jmsOperations; @Autowired private MqProperties mqProperties; @Override public void onMessage(Message message) { String str = null; // 1、讀取報文 try { if (message instanceof BytesMessage) { BytesMessage bm = (BytesMessage) message; byte[] bys = null; bys = new byte[(int) bm.getBodyLength()]; bm.readBytes(bys); str = new String(bys, "UTF-8"); } else { str = ((TextMessage) message).getText(); str = new String(str.getBytes("ISO-8859-1"), "UTF-8"); } } catch (JMSException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } // 2、處理報文 // 3、組裝回執發送 String receipt = ""; try { jmsOperations.convertAndSend("queue:///" + mqProperties.getBaseQueueNameSend() + "?targetClient=1", receipt.getBytes("UTF-8")); } catch (JmsException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }