MQ java 基礎編程
編寫人:鄔文俊
編寫時間 : 2006-2-16
聯系郵件 : wenjunwu430@gmail.com
前言
通過 2 個多星期對 MQ 學習,在 partner 丁 & partner 武 的幫助下完成了該文檔。該文檔提供一個簡單的例子,通過對該例子的講解,你將知道:
1. 用 java 寫客戶端從 MQ Server 收發消息。
2. MQ 作為 Websphere Application Server 的 JMS 資源提供者。
3. JMS message 映射轉化為 MQ message
文檔中的知識全部從參考資料和 IBM 提供的文檔中獲得。 I recommend u to read the documents if u want to know more about the MQ.
參考資料
- 《 Using java 》( some place name it 《 base java 》) —–the very important document offered by IBM, every java programmer should read it!
- 《讓 WebSphere MQ 成為部署在 WebSphere Application Server 中的應用程序的 JMS 提供程序》
- Websphere MQ 入門教程 (a document written by IBM engineer)
- mqseries_class_for_java
- 《 IBM - JMS 應用和使用 WebSphere MQ MQI 接口的應用如何進行信息頭的交換(二)數據映射》 ——- 《 using java 》 mapping message 部分的翻譯。
- MQ–IBM MQSeries 使用指南
- WebSphere Application Server V5 and WebSphere MQ Family Integration. PDF
- WebSphere MQ Application Programming Guide. PDF
- IBM MQSeries 的觸發機制
- 讓 WebSphere MQ 成為部署在 WebSphere Application Server 中的應用程序的 JMS 提供程序
例子說明
例子包括 3 個部分:發送客戶端、 MDB 、接收客戶端。
客戶端 A 把文件發送到服務器,服務器將該文件轉發給客戶端 B 。客戶端A通過向 MQ 客戶機直接發送消息。 MQ 隊列會把客戶端A發送的 MQ Message 轉換為 JMS Message 。 MDB 接收到轉換后的 JMS 消息后,保存文件在服務器,然后把文件轉發到發送隊列( JMS 隊列)。發送隊列由 MQ Server 提供,所以即為發送到了 MQ 隊列。 MQ 隊列把 JMS Message 轉換為 MQ Message 。客戶端 B 從 MQ 隊列中接收轉換后的消息,從消息中讀取文件,保存在硬盤。
MQMESSAGE JMS MESSAGE
Client A————————->mq queue ——————->MDB
Client B<———————— mq queue <——————-MDB
- 配置 MQ Server
這里使用的是 MQ Server 5.2 。 MQ Server 和 WebSphere Application Server 安裝在同一台機器上( 2 者可以使用綁定方式通信)。
要配置的項目:
1. 隊列管理器 QMGR
2. 偵聽端口 4001
3. 本地隊列 EXAMPLE.QUEUE
4. 本地隊列 EXAMPLE.SENDQUEUE
5. 通道 EXAMPLE.CHANNEL
打開 WebSphere MQ 資源管理器。展開隊列管理器節點,右鍵,新建隊列管理器。取名字為 QMGR ,設置偵聽端口 4001 。
在建好的隊列管理器 QMGR 下面新建 2 個本地隊列: EXAMPLE.QUEUE , EXAMPLE.SENDQUEUE 。
展開高級節點,新建服務器連接通道 EXAMPLE.CHANNEL 。
Note :不要搞錯隊列和通道的類型。
2. 驗證 MQ 配置
打開 WebSphere MQ 服務。可以查看服務是否啟動、服務監聽端口。
3. 配置 WAS JMS
具體操作參考《讓 WebSphere MQ 成為部署在 WebSphere Application Server 中的應用程序的 JMS 提供程序》該文章可以在 IBM WebSphere 開發者技術期刊 中找到。
要配置的項目:
1. WebSphere MQ 連接工廠
JNDI name : jms/JMSExampleConnectionFactory
2. WebSphere MQ 隊列目標
JNDI name : jms/JMSExampleQueue ;基本隊列名: EXAMPLE.QUEUE ;目標客戶機: JMS 。目標客戶機決定了 MQ 隊列接收方的消息格式。因為是用 MDB 接收消息,所以設置為 JMS 。另一個隊列是由 MQ 客戶端接收消息,所以另一個隊列的目標客戶機是 MQ 。如果配置錯誤, MQ 隊列轉換消息的格式將不是你所想要的。具體參考《 IBM - JMS 應用和使用 WebSphere MQ MQI 接口的應用如何進行信息頭的交換(二)數據映射》
3. WebSphere MQ 隊列目標
JNDI name : jms/JMSExampleSendQueue ;
基本隊列名: EXAMPLE.SENDQUEUE ;目標客戶機: MQ 。
4. 配置 MDB
在 WAS 上配置 偵聽器端口
名稱: JMSExampleQueuePort ;
連接工廠 JNDI 名 jms/JMSExampleConnectionFactory ;
目標 JNDI 名: jms/JMSExampleQueue 。
Message Driven Beans 用於偵聽消息的偵聽器端口。每個端口指定 MDB 將偵聽的(依據該端口部署的) JMS 連接工廠和 JMS 目標。
MDB 部署描述符中配置
連接工廠 JNDI 名 jms/JMSExampleConnectionFactory ;
目標 JNDI 名: jms/JMSExampleQueue ;
監聽端口名稱: JMSExampleQueuePort (監聽端口名稱也可以在管理控制台中修改)
5. 代碼
客戶端 A (發送方)
MqPut.java
package cn.edu.itec.mqclient;
import java.io.File;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueueManager;
public class MQPut {
private String HOST_URL = “192.168.1.116”;
private String MQ_CHANNEL = "EXAMPLE.CHANNEL";
private String MQ_MANAGER = "QMGR";
private String MQ_QUEUE = "EXAMPLE.QUEUE";
private int MQ_PORT = 4001;
public static void main(String args[]) {
new MQPut().SendFile("f:/JMSExampleEJB.jar");
}
public void SendFile(String sFilePath) {
try {
/* 設置 MQEnvironment 屬性以便客戶機連接 */
MQEnvironment.hostname = HOST_URL;
MQEnvironment.channel = MQ_CHANNEL;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES);
MQEnvironment.CCSID = 1381;
MQEnvironment.port = MQ_PORT;
/* 連接到隊列管理器 */
MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER);
System.out.println("queue manager is connected!");
/* 設置打開選項以便打開用於輸出的隊列,如果隊列管理器正在停止,我們也已設置了選項去應對不成功情況。 */
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
/* 打開隊列 */
com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions);
/* 設置放置消息選項我們將使用默認設置 */
MQPutMessageOptions pmo = new MQPutMessageOptions();
/* 創建消息, MQMessage 類包含實際消息數據的數據緩沖區,和描述消息的所有 MQMD 參數 */
/* 創建消息緩沖區 */
MQMessage outMsg = new MQMessage();
/* set the properties of the message fot the selector */
outMsg.correlationId = "clinet_B_receive".getBytes();
outMsg.messageId = "1Aa".getBytes();
/* write msg */
MsgWriter.readFile(outMsg, new File(sFilePath));
/* put message with default options */
queue.put(outMsg, new MQPutMessageOptions());
System.out.println("send file is success!");
/* release resource */
queue.close();
qMgr.disconnect();
} catch (MQException ex) {
//System.out.println("fft!");
System.out.println("An MQ Error Occurred: Completion Code is :\t"
+ ex.completionCode + "\n\n The Reason Code is :\t"
+ ex.reasonCode);
ex.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
private void readFileToMessage(String FilePath) {
}
}
JMS message 和 MQ message 有幾個字段是相同的,這些字段的值將會在轉換中保留。比較方便的是使用 CorrelationID 這個字段。通過設置這個字段,達到選擇性的接收特定消息的功能。其它字段沒有完全搞清楚,有的數據類型需要轉換,例如 MessageID (對應於 JMSMessageID )。 MQ 消息選擇和 JMS 不同,后者采用 selector ,前者通過設置接收消息的屬性完成。例如設置 CorrelationID 為特定值。
客戶端 B
MQGet.java
package cn.edu.itec.mqclient;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Hashtable;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueueManager;
/**
* @author Administrator
*
* TODO To change the template for this generated type comment go to Window -
* Preferences - Java - Code Style - Code Templates
*/
public class MQGet {
private static String HOST_URL = “192.168.1.116”;
private static String MQ_CHANNEL = "EXAMPLE.CHANNEL";
private static String MQ_MANAGER = "QMGR";
private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE";
private static int MQ_PORT = 4001;
public static void main(String[] args) {
MQGet.getMessage();
}
public static void getMessage() {
try {
/* 設置 MQEnvironment 屬性以便客戶機連接 */
MQEnvironment.hostname = HOST_URL;
MQEnvironment.channel = MQ_CHANNEL;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES);
MQEnvironment.CCSID = 1381;
MQEnvironment.port = MQ_PORT;
/* 連接到隊列管理器 */
MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER);
System.out.println("queue manager is connected!");
/*
* 設置打開選項以便打開用於輸出的隊列,如果隊列管理器停止,我們也 已設置了選項去應對不成功情況
*/
int openOptions = MQC.MQOO_INPUT_SHARED
| MQC.MQOO_FAIL_IF_QUIESCING;
/* 打開隊列 */
com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions);
System.out.println(" 隊列連接成功 ");
/* 設置放置消息選項 */
MQGetMessageOptions gmo = new MQGetMessageOptions();
/* 在同步點控制下獲取消息 */
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;
/* 如果在隊列上沒有消息則等待 */
gmo.options = gmo.options + MQC.MQGMO_WAIT;
/* 如果隊列管理器停頓則失敗 */
gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;
/* 設置等待的時間限制 */
gmo.waitInterval = MQC.MQWI_UNLIMITED;
/* create the message buffer store */
MQMessage inMessage = new MQMessage();
/* set the selector */
inMessage.correlationId = "clinet_B_receive".getBytes();
/* get the message */
queue.get(inMessage, gmo);
System.out.println("get message success");
/* 讀出消息對象 */
Hashtable messageObject = (Hashtable) inMessage.readObject();
System.out.println(messageObject);
/* 讀出消息內容 */
byte[] content = (byte[]) messageObject.get("content");
/* save file */
FileOutputStream output = new FileOutputStream(
"f:/exampleReceive.jar");
output.write(content);
output.close();
System.out.println(messageObject.get("FileName"));
/* 提交事務 , 相當於確認消息已經接收,服務器會刪除該消息 */
qMgr.commit();
} catch (MQException e) {
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
MDB
MQMDBBeanBean.java MDB 文件
package ejbs;
import javax.jms.ObjectMessage;
import javax.jms.BytesMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import ehub.ihub.exchangeManager.*;
import java.util.Hashtable;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.File;
import java.io.ObjectInputStream;
/**
* Bean implementation class for Enterprise Bean: MQMDBBean
*/
public class MQMDBBeanBean implements javax.ejb.MessageDrivenBean,
javax.jms.MessageListener {
private javax.ejb.MessageDrivenContext fMessageDrivenCtx;
/**
* getMessageDrivenContext
*/
public javax.ejb.MessageDrivenContext getMessageDrivenContext() {
return fMessageDrivenCtx;
}
/**
* setMessageDrivenContext
*/
public void setMessageDrivenContext(javax.ejb.MessageDrivenContext ctx) {
fMessageDrivenCtx = ctx;
}
/**
* ejbCreate
*/
public void ejbCreate() {
}
/**
* onMessage
*/
public void onMessage(javax.jms.Message msg) {
try {
System.out.println(msg.toString());
if (msg instanceof TextMessage) {
System.out.println("TextMessage");
} else if (msg instanceof ObjectMessage) {
System.out.println("ObjectMessage");
} else if (msg instanceof StreamMessage) {
System.out.println("StreamMessage");
} else if (msg instanceof BytesMessage) {
System.out.println("BytesMessage");
BytesMessage bytesMessage = (BytesMessage) msg;
String sCorrelationID = new String(bytesMessage
.getJMSCorrelationIDAsBytes());
String sMessageID = bytesMessage.getJMSMessageID();
long size = bytesMessage.getBodyLength();
System.out.println("size=" + size + "/n CorrelationID="
+ sCorrelationID + "/n MessageID=" + sMessageID);
/*read the message and save the file*/
ReadMessage.readMessage(bytesMessage);
System.out.println("read message success");
/*send the message to the client */
SendMessage.sendFileToReceiveQueue(new File("c:/receivedExample.jar"));
System.out.println("send file success");
} else {
System.out.println("no message");
}
} catch (Exception e) {
System.out.println("onmessage 執行錯誤,回滾! ");
e.printStackTrace(System.err);
fMessageDrivenCtx.setRollbackOnly();
}
}
private void getProperties(byte[] p) {
}
/**
* ejbRemove
*/
public void ejbRemove() {
}
}
ReadMessage.java
/*
* Created on 2006-2-15
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package ehub.ihub.exchangeManager;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Hashtable;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
/**
* @author Administrator
*
*
*/
public class ReadMessage {
/**
* read message including property and body
*
* @param Message
* @throws JMSException
* @throws IOException
* @throws ClassNotFoundException
*/
public static void readMessage(BytesMessage Message) {
try {
long bodySize = Message.getBodyLength();
byte[] buf = new byte[Integer.parseInt(String.valueOf(bodySize))];
/* 消息內容讀到字節數組中 */
Message.readBytes(buf);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(
buf);
/* 從字節流讀出消息對象 */
ObjectInputStream objectInputStream = new ObjectInputStream(
byteArrayInputStream);
Hashtable messageObject = (Hashtable) objectInputStream
.readObject();
/* 解析消息 */
byte[] contentBuf = (byte[]) messageObject.get("content");
/* 把文件保存 */
FileOutputStream fileWriter = new FileOutputStream(
"c:/receivedExample.jar");
fileWriter.write(contentBuf);
fileWriter.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
SendMessage.java
/*
* Created on 2006-2-16
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package ehub.ihub.exchangeManager;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Hashtable;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* @author Administrator
*
* TODO To change the template for this generated type comment go to Window -
* Preferences - Java - Code Style - Code Templates
*/
public class SendMessage {
private static String MQ_CHANNEL = “EXAMPLE.CHANNEL”;
private static String MQ_MANAGER = "QMGR";
private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE";
private static int MQ_PORT = 4001;
private static String JMS_CONNECTIONFACTORY = "jms/JMSExampleConnectionFactory";
private static String QUEUE_NAME="jms/JMSExampleSendQueue";
public static void sendFileToReceiveQueue(File file) {
try {
Context initContext = new InitialContext();
ConnectionFactory qconFactory = (ConnectionFactory) initContext
.lookup(JMS_CONNECTIONFACTORY);
Connection qcon = qconFactory.createConnection();
Session session = qcon.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) initContext.lookup(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
ObjectMessage outMessage=session.createObjectMessage();
/* write the file information into the message */
Hashtable fileInfo = new Hashtable();
fileInfo.put("FileName", file.getName());
fileInfo.put("FileSize", Long.toString(file.length()));
/* write the file content into the message */
FileInputStream fos = new FileInputStream(file);
byte[] buf;
int size = (int) file.length();
buf = new byte[size];
int num = fos.read(buf);
fos.close();
/*add the file byte stream to the object*/
fileInfo.put("content", buf);
outMessage.setObject(fileInfo);
outMessage.getObject();
outMessage.setJMSCorrelationIDAsBytes((new String("clinet_B_receive")).getBytes());
// qcon.start();
producer.send(outMessage);
producer.close();
session.close();
qcon.close();
} catch (NamingException e) {
System.out.println(" 獲得連接失敗 ,jndi 查找失敗 ");
e.printStackTrace();
} catch (JMSException e) {
System.out.println(" 發送文件異常 ");
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
System.out.println(" 發送文件過程中 io 操作失敗 ");
e.printStackTrace();
}
}
}