MQ java 基礎編程


MQ java 基礎編程
編寫人:鄔文俊
編寫時間 : 2006-2-16
聯系郵件 : wenjunwu430@gmail.com
前言

通過 2 個多星期對 MQ 學習,在 partner 丁 & partner 武 的幫助下完成了該文檔。該文檔提供一個簡單的例子,通過對該例子的講解,你將知道:
1. 用 java 寫客戶端從 MQ Server 收發消息。
2. MQ 作為 Websphere Application Server 的 JMS 資源提供者。
3. JMS message 映射轉化為  MQ message
文檔中的知識全部從參考資料和 IBM 提供的文檔中獲得。 I recommend u to read the documents if u want to know more about the MQ.
參考資料

  1. 《 Using java 》( some place name it 《 base java 》) —–the very important document offered by IBM, every java programmer should read it!
  2. 《讓 WebSphere MQ 成為部署在 WebSphere Application Server 中的應用程序的 JMS 提供程序》
  3. Websphere MQ 入門教程 (a document written by IBM engineer)
  4. mqseries_class_for_java
  5. 《 IBM - JMS 應用和使用 WebSphere MQ MQI 接口的應用如何進行信息頭的交換(二)數據映射》 ——- 《 using java 》 mapping message 部分的翻譯。
  6. MQ–IBM MQSeries 使用指南
  7. WebSphere Application Server V5 and WebSphere MQ Family Integration. PDF
  8. WebSphere MQ Application Programming Guide. PDF
  9. IBM MQSeries 的觸發機制
  10. 讓 WebSphere MQ 成為部署在 WebSphere Application Server 中的應用程序的 JMS 提供程序
    例子說明

例子包括 3 個部分:發送客戶端、 MDB 、接收客戶端。
客戶端 A 把文件發送到服務器,服務器將該文件轉發給客戶端 B 。客戶端A通過向 MQ 客戶機直接發送消息。 MQ 隊列會把客戶端A發送的 MQ Message 轉換為 JMS Message 。 MDB 接收到轉換后的 JMS 消息后,保存文件在服務器,然后把文件轉發到發送隊列( JMS 隊列)。發送隊列由 MQ Server 提供,所以即為發送到了 MQ 隊列。 MQ 隊列把 JMS Message 轉換為 MQ Message 。客戶端 B 從 MQ 隊列中接收轉換后的消息,從消息中讀取文件,保存在硬盤。

        MQMESSAGE             JMS MESSAGE

Client A————————->mq queue ——————->MDB
Client B<———————— mq queue <——————-MDB

  1. 配置 MQ Server

這里使用的是 MQ Server 5.2 。 MQ Server 和 WebSphere Application Server 安裝在同一台機器上( 2 者可以使用綁定方式通信)。
要配置的項目:
1. 隊列管理器 QMGR
2. 偵聽端口 4001
3. 本地隊列 EXAMPLE.QUEUE
4. 本地隊列 EXAMPLE.SENDQUEUE
5. 通道 EXAMPLE.CHANNEL

打開 WebSphere MQ 資源管理器。展開隊列管理器節點,右鍵,新建隊列管理器。取名字為 QMGR ,設置偵聽端口 4001 。
在建好的隊列管理器 QMGR 下面新建 2 個本地隊列: EXAMPLE.QUEUE , EXAMPLE.SENDQUEUE 。
展開高級節點,新建服務器連接通道 EXAMPLE.CHANNEL 。
Note :不要搞錯隊列和通道的類型。
2. 驗證 MQ 配置

打開 WebSphere MQ 服務。可以查看服務是否啟動、服務監聽端口。
3. 配置 WAS JMS

具體操作參考《讓 WebSphere MQ 成為部署在 WebSphere Application Server 中的應用程序的 JMS 提供程序》該文章可以在 IBM WebSphere 開發者技術期刊 中找到。
要配置的項目:
1. WebSphere MQ 連接工廠
JNDI name : jms/JMSExampleConnectionFactory
2. WebSphere MQ 隊列目標
JNDI name : jms/JMSExampleQueue ;基本隊列名: EXAMPLE.QUEUE ;目標客戶機: JMS 。目標客戶機決定了 MQ 隊列接收方的消息格式。因為是用 MDB 接收消息,所以設置為 JMS 。另一個隊列是由 MQ 客戶端接收消息,所以另一個隊列的目標客戶機是 MQ 。如果配置錯誤, MQ 隊列轉換消息的格式將不是你所想要的。具體參考《 IBM - JMS 應用和使用 WebSphere MQ MQI 接口的應用如何進行信息頭的交換(二)數據映射》
3. WebSphere MQ 隊列目標
JNDI name : jms/JMSExampleSendQueue ;
基本隊列名: EXAMPLE.SENDQUEUE ;目標客戶機: MQ 。
4. 配置 MDB

在 WAS 上配置 偵聽器端口
名稱: JMSExampleQueuePort ;
連接工廠 JNDI 名 jms/JMSExampleConnectionFactory ;
目標 JNDI 名: jms/JMSExampleQueue 。
Message Driven Beans 用於偵聽消息的偵聽器端口。每個端口指定 MDB 將偵聽的(依據該端口部署的) JMS 連接工廠和 JMS 目標。

MDB 部署描述符中配置
連接工廠 JNDI 名 jms/JMSExampleConnectionFactory ;
目標 JNDI 名: jms/JMSExampleQueue ;
監聽端口名稱: JMSExampleQueuePort (監聽端口名稱也可以在管理控制台中修改)
5. 代碼

客戶端 A (發送方)

MqPut.java
package cn.edu.itec.mqclient;

import java.io.File;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueueManager;

public class MQPut {
private String HOST_URL = “192.168.1.116”;

   private String MQ_CHANNEL = "EXAMPLE.CHANNEL";

   private String MQ_MANAGER = "QMGR";

   private String MQ_QUEUE = "EXAMPLE.QUEUE";

   private int MQ_PORT = 4001;

   public static void main(String args[]) {
          new MQPut().SendFile("f:/JMSExampleEJB.jar");
   }

   public void SendFile(String sFilePath) {
          try {

                 /* 設置 MQEnvironment 屬性以便客戶機連接 */
                 MQEnvironment.hostname = HOST_URL;
                 MQEnvironment.channel = MQ_CHANNEL;
                 MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
                               MQC.TRANSPORT_MQSERIES);
                 MQEnvironment.CCSID = 1381;
                 MQEnvironment.port = MQ_PORT;

                 /* 連接到隊列管理器 */
                 MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER);
                 System.out.println("queue manager is connected!");

                 /* 設置打開選項以便打開用於輸出的隊列,如果隊列管理器正在停止,我們也已設置了選項去應對不成功情況。 */
                 int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;

                 /* 打開隊列 */
                 com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions);

                 /* 設置放置消息選項我們將使用默認設置 */
                 MQPutMessageOptions pmo = new MQPutMessageOptions();

                 /* 創建消息, MQMessage 類包含實際消息數據的數據緩沖區,和描述消息的所有 MQMD 參數 */

                 /* 創建消息緩沖區 */
                 MQMessage outMsg = new MQMessage();

                 /* set the properties of the message fot the selector */
                 outMsg.correlationId = "clinet_B_receive".getBytes();
                 outMsg.messageId = "1Aa".getBytes();

                 /* write msg */
                 MsgWriter.readFile(outMsg, new File(sFilePath));

                 /* put message with default options */
                 queue.put(outMsg, new MQPutMessageOptions());
                 System.out.println("send file is success!");
                 /* release resource */
                 queue.close();
                 qMgr.disconnect();

          } catch (MQException ex) {
                 //System.out.println("fft!");
                 System.out.println("An MQ Error Occurred: Completion Code is :\t"
                               + ex.completionCode + "\n\n The Reason Code is :\t"
                               + ex.reasonCode);
                 ex.printStackTrace();
          } catch (Exception e) {
                 e.printStackTrace();
          }
   }

   private void readFileToMessage(String FilePath) {

   }

}

JMS message 和 MQ message 有幾個字段是相同的,這些字段的值將會在轉換中保留。比較方便的是使用 CorrelationID 這個字段。通過設置這個字段,達到選擇性的接收特定消息的功能。其它字段沒有完全搞清楚,有的數據類型需要轉換,例如 MessageID (對應於 JMSMessageID )。 MQ 消息選擇和 JMS 不同,后者采用 selector ,前者通過設置接收消息的屬性完成。例如設置 CorrelationID 為特定值。
客戶端 B

MQGet.java
package cn.edu.itec.mqclient;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Hashtable;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueueManager;

/**
* @author Administrator
*
* TODO To change the template for this generated type comment go to Window -
* Preferences - Java - Code Style - Code Templates
*/
public class MQGet {
private static String HOST_URL = “192.168.1.116”;

   private static String MQ_CHANNEL = "EXAMPLE.CHANNEL";

   private static String MQ_MANAGER = "QMGR";

   private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE";

   private static int MQ_PORT = 4001;

   public static void main(String[] args) {
          MQGet.getMessage();
   }

   public static void getMessage() {
          try {
                 /* 設置 MQEnvironment 屬性以便客戶機連接 */
                 MQEnvironment.hostname = HOST_URL;
                 MQEnvironment.channel = MQ_CHANNEL;
                 MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
                               MQC.TRANSPORT_MQSERIES);
                 MQEnvironment.CCSID = 1381;
                 MQEnvironment.port = MQ_PORT;

                 /* 連接到隊列管理器 */
                 MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER);
                 System.out.println("queue manager is connected!");

                 /*
                  * 設置打開選項以便打開用於輸出的隊列,如果隊列管理器停止,我們也 已設置了選項去應對不成功情況
                  */
                 int openOptions = MQC.MQOO_INPUT_SHARED
                               | MQC.MQOO_FAIL_IF_QUIESCING;

                 /* 打開隊列 */
                 com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions);
                 System.out.println(" 隊列連接成功 ");
                 /* 設置放置消息選項 */
                 MQGetMessageOptions gmo = new MQGetMessageOptions();

                 /* 在同步點控制下獲取消息 */
                 gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;

                 /* 如果在隊列上沒有消息則等待 */
                 gmo.options = gmo.options + MQC.MQGMO_WAIT;

                 /* 如果隊列管理器停頓則失敗 */
                 gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;

                 /* 設置等待的時間限制 */
                 gmo.waitInterval = MQC.MQWI_UNLIMITED;
                 /* create the message buffer store */
                 MQMessage inMessage = new MQMessage();
                 /* set the selector */
                 inMessage.correlationId = "clinet_B_receive".getBytes();
                 /* get the message */
                 queue.get(inMessage, gmo);
                 System.out.println("get message success");

                 /* 讀出消息對象 */
                 Hashtable messageObject = (Hashtable) inMessage.readObject();
                 System.out.println(messageObject);
                 /* 讀出消息內容 */
                 byte[] content = (byte[]) messageObject.get("content");
                 /* save file */
                 FileOutputStream output = new FileOutputStream(
                               "f:/exampleReceive.jar");
                 output.write(content);
                 output.close();

                 System.out.println(messageObject.get("FileName"));
                 /* 提交事務 , 相當於確認消息已經接收,服務器會刪除該消息 */
                 qMgr.commit();

          } catch (MQException e) {
                 e.printStackTrace();
          } catch (IOException e) {
                 // TODO Auto-generated catch block
                 e.printStackTrace();
          } catch (ClassNotFoundException e) {
                 // TODO Auto-generated catch block
                 e.printStackTrace();
          }
   }

}

MDB

MQMDBBeanBean.java MDB 文件
package ejbs;

import javax.jms.ObjectMessage;
import javax.jms.BytesMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import ehub.ihub.exchangeManager.*;
import java.util.Hashtable;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.File;
import java.io.ObjectInputStream;

/**
* Bean implementation class for Enterprise Bean: MQMDBBean
*/
public class MQMDBBeanBean implements javax.ejb.MessageDrivenBean,
javax.jms.MessageListener {
private javax.ejb.MessageDrivenContext fMessageDrivenCtx;

   /**
    * getMessageDrivenContext
    */
   public javax.ejb.MessageDrivenContext getMessageDrivenContext() {
          return fMessageDrivenCtx;
   }

   /**
    * setMessageDrivenContext
    */
   public void setMessageDrivenContext(javax.ejb.MessageDrivenContext ctx) {
          fMessageDrivenCtx = ctx;
   }

   /**
    * ejbCreate
    */
   public void ejbCreate() {
   }

   /**
    * onMessage
    */
   public void onMessage(javax.jms.Message msg) {
          try {
                 System.out.println(msg.toString());
                 if (msg instanceof TextMessage) {
                        System.out.println("TextMessage");
                 } else if (msg instanceof ObjectMessage) {
                        System.out.println("ObjectMessage");
                 } else if (msg instanceof StreamMessage) {
                        System.out.println("StreamMessage");
                 } else if (msg instanceof BytesMessage) {
                        System.out.println("BytesMessage");
                        BytesMessage bytesMessage = (BytesMessage) msg;
                        String sCorrelationID = new String(bytesMessage
                                      .getJMSCorrelationIDAsBytes());
                        String sMessageID = bytesMessage.getJMSMessageID();
                        long size = bytesMessage.getBodyLength();
                        System.out.println("size=" + size + "/n CorrelationID="
                                      + sCorrelationID + "/n MessageID=" + sMessageID);
                        /*read the message and save the file*/
                        ReadMessage.readMessage(bytesMessage);
                        System.out.println("read message success");
                        /*send the message to the client */
                        SendMessage.sendFileToReceiveQueue(new File("c:/receivedExample.jar"));
                        System.out.println("send file success");

                 } else {

                        System.out.println("no message");
                 }

          } catch (Exception e) {
                 System.out.println("onmessage 執行錯誤,回滾! ");
                 e.printStackTrace(System.err);
                 fMessageDrivenCtx.setRollbackOnly();
          }
   }

   private void getProperties(byte[] p) {

   }

   /**
    * ejbRemove
    */
   public void ejbRemove() {
   }

}

ReadMessage.java
/*
* Created on 2006-2-15
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package ehub.ihub.exchangeManager;

import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Hashtable;

import javax.jms.BytesMessage;
import javax.jms.JMSException;

/**
* @author Administrator
*
*
*/
public class ReadMessage {
/**
* read message including property and body
*
* @param Message
* @throws JMSException
* @throws IOException
* @throws ClassNotFoundException
*/
public static void readMessage(BytesMessage Message) {
try {
long bodySize = Message.getBodyLength();

                 byte[] buf = new byte[Integer.parseInt(String.valueOf(bodySize))];
                 /* 消息內容讀到字節數組中 */
                 Message.readBytes(buf);
                 ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(
                               buf);
                 /* 從字節流讀出消息對象 */
                 ObjectInputStream objectInputStream = new ObjectInputStream(
                               byteArrayInputStream);
                 Hashtable messageObject = (Hashtable) objectInputStream
                               .readObject();
                 /* 解析消息 */
                 byte[] contentBuf = (byte[]) messageObject.get("content");
                 /* 把文件保存 */
                 FileOutputStream fileWriter = new FileOutputStream(
                               "c:/receivedExample.jar");
                 fileWriter.write(contentBuf);
                 fileWriter.close();
          } catch (JMSException e) {
                 e.printStackTrace();
          } catch (IOException e) {
                 // TODO Auto-generated catch block
                 e.printStackTrace();
          } catch (ClassNotFoundException e) {
                 // TODO Auto-generated catch block
                 e.printStackTrace();
          }
   }

}

SendMessage.java
/*
* Created on 2006-2-16
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package ehub.ihub.exchangeManager;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Hashtable;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
* @author Administrator
*
* TODO To change the template for this generated type comment go to Window -
* Preferences - Java - Code Style - Code Templates
*/
public class SendMessage {
private static String MQ_CHANNEL = “EXAMPLE.CHANNEL”;

   private static String MQ_MANAGER = "QMGR";

   private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE";

   private static int MQ_PORT = 4001;

   private static String JMS_CONNECTIONFACTORY = "jms/JMSExampleConnectionFactory";

   private static String QUEUE_NAME="jms/JMSExampleSendQueue";

   public static void sendFileToReceiveQueue(File file) {

          try {
                 Context initContext = new InitialContext();
                 ConnectionFactory qconFactory = (ConnectionFactory) initContext
                               .lookup(JMS_CONNECTIONFACTORY);
                 Connection qcon = qconFactory.createConnection();
                 Session session = qcon.createSession(false,
                               Session.AUTO_ACKNOWLEDGE);
                 Queue queue = (Queue) initContext.lookup(QUEUE_NAME);

                 MessageProducer producer = session.createProducer(queue);
                 ObjectMessage outMessage=session.createObjectMessage();

                 /* write the file information into the message */
                 Hashtable fileInfo = new Hashtable();
                 fileInfo.put("FileName", file.getName());
                 fileInfo.put("FileSize", Long.toString(file.length()));

                 /* write the file content into the message */
                 FileInputStream fos = new FileInputStream(file);
                 byte[] buf;
                 int size = (int) file.length();
                 buf = new byte[size];
                 int num = fos.read(buf);
                 fos.close();

                 /*add the file byte stream to the object*/
                 fileInfo.put("content", buf);

                 outMessage.setObject(fileInfo);
                 outMessage.getObject();
                 outMessage.setJMSCorrelationIDAsBytes((new String("clinet_B_receive")).getBytes());

// qcon.start();

                 producer.send(outMessage);
                 producer.close();
                 session.close();
                 qcon.close();
          } catch (NamingException e) {
                 System.out.println(" 獲得連接失敗 ,jndi 查找失敗 ");
                 e.printStackTrace();
          } catch (JMSException e) {
                 System.out.println(" 發送文件異常 ");
                 // TODO Auto-generated catch block
                 e.printStackTrace();
          } catch (IOException e) {
                 // TODO Auto-generated catch block
                 System.out.println(" 發送文件過程中 io 操作失敗 ");
                 e.printStackTrace();
          }
   }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM