SpringBoot 對IBM MQ進行數據監聽接收以及數據發送


一、需求介紹

后端使用Spring Boot2.0框架,要實現IBM MQ的實時數據JMS監聽接收處理,並形成回執通過MQ隊列發送。

二、引入依賴jar包

<dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>4.3.18.RELEASE</version>
</dependency>
<dependency>
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
</dependency>
<dependency>
        <groupId>com.ibm.mq</groupId>
        <artifactId>com.ibm.mq.allclient</artifactId>
        <version>9.1.0.0</version>
</dependency>

三、監聽實現

代碼中分為三大塊:

1、MQ通道連接,我這邊是用的用戶名密碼連接,如果非密碼的可不入參

2、MQ的隊列連接並實現監聽

3、MQ發送

@Configuration
public class MqTestConfig {
    
    @Autowired
    private MqProperties mqProperties;
    
    /**=======================MQ 通道工廠============================**/
    @Bean(name="mqQueueConnectionFactory")
    public MQQueueConnectionFactory mqQueueConnectionFactory(){
        MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
        mqQueueConnectionFactory.setHostName(mqProperties.getHostName());
        try {
            mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            mqQueueConnectionFactory.setCCSID(mqProperties.getCcsid());
            mqQueueConnectionFactory.setChannel(mqProperties.getChannel());
            mqQueueConnectionFactory.setPort(mqProperties.getPort());
            mqQueueConnectionFactory.setQueueManager(mqProperties.getQueueManager());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return mqQueueConnectionFactory;
    }
    @Bean(name="userCredentialsConnectionFactoryAdapter")
    public UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(MQQueueConnectionFactory mqQueueConnectionFactory){
        UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
        userCredentialsConnectionFactoryAdapter.setUsername(mqProperties.getUserName());
        userCredentialsConnectionFactoryAdapter.setPassword(mqProperties.getPassword());
        userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory);
        return userCredentialsConnectionFactoryAdapter;
    }
    
    /**============================MQ 消息監聽接收=============================**/
    //隊列連接
    @Bean(name="mqueue")
    public MQQueue mqueue(){
        MQQueue mqQueue = new MQQueue();
        try {
            mqQueue.setBaseQueueName(mqProperties.getBaseQueueNameRecv());
            mqQueue.setBaseQueueManagerName(mqProperties.getBaseQueueManagerName());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return mqQueue;
    }
    //對隊列進行監聽
    @Bean(name="simpleMessageListenerContainer")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter,MQQueue mqueue){
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setConnectionFactory(userCredentialsConnectionFactoryAdapter);
        simpleMessageListenerContainer.setDestination(mqueue);
        simpleMessageListenerContainer.setMessageListener(decMqRiskRecvService());
        return simpleMessageListenerContainer;
    }
    //報文處理類
    @Bean(name="decMqRiskRecvService")
    public DecMqRiskRecvService decMqRiskRecvService(){
        return new DecMqRiskRecvService();
    }
    
    /**============================MQ 發送消息============================**/
    @Bean(name="cachingConnectionFactory")
    public CachingConnectionFactory cachingConnectionFactory(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter);
        cachingConnectionFactory.setSessionCacheSize(5);
        cachingConnectionFactory.setReconnectOnException(true);
        return cachingConnectionFactory;
    }
    @Bean(name="jmsTransactionManager")
    public PlatformTransactionManager jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory){
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
        jmsTransactionManager.setConnectionFactory(cachingConnectionFactory);
        return jmsTransactionManager;
    }
    @Bean(name="jmsOperations")
    public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory){
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        jmsTemplate.setReceiveTimeout(mqProperties.getReceiveTimeout());
        return jmsTemplate;
    }

}

mq配置文件

記得要添加get和set方法

@Configuration
@ConfigurationProperties(prefix=MqProperties.MQ_PREFIX)
public class MqProperties {
    public static final String MQ_PREFIX = "mq";
    private String hostName;
    private int port;
    private String channel;
    private int ccsid;
    private String userName;
    private String password;
    private String queueManager;
    private String baseQueueManagerName;
    private String baseQueueNameRecv;
    private String baseQueueNameSend;
    private long receiveTimeout;
    
}

報文處理類及回執發送

1、實現類要實現MessageListener,重寫onMessage方法,Message就是監聽到的消息。

2、讀取報文時為防止亂碼,我這邊按照格式分兩種方式讀取轉碼。

3、發送回執,之前發送發現報文多出了一些報文頭信息,所以在隊列信息加了

  "queue:///" + mqProperties.getBaseQueueNameSend() + "?targetClient=1"

  這樣發送的報文會去掉報文頭信息。

@Service
public class DecMqRiskRecvService implements MessageListener {

    @Autowired
    private JmsOperations jmsOperations;
    @Autowired
    private MqProperties mqProperties;

    @Override
    public void onMessage(Message message) {
        String str = null;
        // 1、讀取報文
        try {
            if (message instanceof BytesMessage) {
                BytesMessage bm = (BytesMessage) message;
                byte[] bys = null;
                bys = new byte[(int) bm.getBodyLength()];
                bm.readBytes(bys);
                str = new String(bys, "UTF-8");
            } else {
                str = ((TextMessage) message).getText();
                str = new String(str.getBytes("ISO-8859-1"), "UTF-8");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        // 2、處理報文

        // 3、組裝回執發送
        String receipt = "";
        try {
            jmsOperations.convertAndSend("queue:///" + mqProperties.getBaseQueueNameSend() + "?targetClient=1", receipt.getBytes("UTF-8"));
        } catch (JmsException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM