ActiveMQ服務器之間傳輸對象,項目A發送對象到項目B接收發送對象《一》


項目A發送消息端:

        // TODO Auto-generated method stub
        ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS
        // Provider 的連接
        Connection connection = null; // Session: 一個發送或接收消息的線程
        Session session; // Destination :消息的目的地;消息發送給誰.
        Destination destination; // MessageProducer:消息發送者
        MessageProducer producer; // TextMessage message;
        // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
 
        try { // 構造從工廠得到連接對象
            connection = connectionFactory.createConnection();
            // 啟動
            connection.start();
            // 獲取操作連接
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置
            destination = session.createQueue("FirstQueue");
            // 得到消息生成者【發送者】
            producer = session.createProducer(destination);
            // 設置不持久化,此處學習,實際根據項目決定
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 構造消息,此處寫死,項目就是參數,或者方法獲取
 
            ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session
                    .createObjectMessage();
            msg.setObject((Serializable) user);
            //將對象發送
            producer.send(msg);
            
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }

項目B 接受對象:

因為使用監聽一直觸發不了,沒有解決,所以使用項目運行時開啟線程持續接收消息。

開啟線程:

public class MyListener  implements ServletContextListener   {
 
    
    private MyThread myThread;  
    
    public void contextDestroyed(ServletContextEvent e) { 
        if (myThread != null && myThread.isInterrupted()) {  
            myThread.interrupt();  
        }  
    }  
  
    public void contextInitialized(ServletContextEvent e) {  
        String str = null;  
        if (str == null && myThread == null) {  
            myThread = new MyThread();  
            myThread.start(); // servlet 上下文初始化時啟動 socket  
        }  
    } 
 
}

線程開啟接收消息:

    public class MyThread extends Thread {
    
    public void run() {
 
 
        while (!this.isInterrupted()) {// 線程未中斷執行循環
            try {
                Thread.sleep(200); // 每隔2000ms執行一次
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
 
 
            // ------------------ 開始執行 ---------------------------
 
 
            // ConnectionFactory :連接工廠,JMS 用它創建連接
            ConnectionFactory connectionFactory;
            // Connection :JMS 客戶端到JMS Provider 的連接
            Connection connection = null;
            // Session: 一個發送或接收消息的線程
            Session session;
            // Destination :消息的目的地;消息發送給誰.
            Destination destination;
            // 消費者,消息接收者
            MessageConsumer consumer;
 
 
            connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://localhost:61616");
 
 
            try {
                // 構造從工廠得到連接對象
                connection = connectionFactory.createConnection();
                // 啟動
                connection.start();
                // 獲取操作連接
                session = connection.createSession(Boolean.FALSE,
                        Session.AUTO_ACKNOWLEDGE);
                // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置
                destination = session.createQueue("FirstQueue");
                consumer = session.createConsumer(destination);
 
 
                consumer.setMessageListener(new MessageListener() {
 
 
                    @Override
                    public void onMessage(Message message) {
                        // TODO Auto-generated method stub
                        User user;
                        try {
                            user = (User) ((ObjectMessage) message).getObject();
                            if (message != null) {
                                // User s = (User) message.getObject();
                                System.out.println("收到的消息對象:"
                                        + user.getLoginName());
                                user.setCreateBy(new User("1"));
                                user.setUpdateBy(new User("1"));
                                //使用getBean方式獲取Bean 因注解方式掃描不到.
                                SystemService systemService = (SystemService)ApplicationContextHandle.getBean("systemService");                                
                                systemService.saveUser(user);
                                
                            }
 
 
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                });
 
 
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != connection)
                        connection.close();
                } catch (Throwable ignore) {
                }
            }
        }
    }
}

配置Web.xml 配置線程監聽。項目開啟時啟動線程

     <listener>
            <listener-class>com.thinkgem.jeesite.modules.test.web.MyListener</listener-class>
    </listener> 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM