import com.ibm.mq.*; import com.ibm.msg.client.wmq.v6.base.internal.MQC; import java.io.IOException; /** * Created by EalenXie on 2017/2/17. */ public class MessageByMQ { //定義隊列管理器和隊列的名稱 private static String qmName ; private static String qName ; private static MQQueueManager qMgr ; static { /** * 設置環境: * MQEnvironment中包含控制MQQueueManager對象中的環境的構成的靜態變量 * MQEnvironment的值的設定會在MQQueueManager的構造函數加載的時候起作用 * 因此必須在建立MQQueueManager對象之前設定MQEnvironment中的值. */ MQEnvironment.hostname = "127.0.0.1"; //MQ服務器的IP地址 MQEnvironment.channel = "CHL_QM1_SERVER"; //服務器連接的通道 //服務器MQ服務使用的編碼1381代表GBK、1208代表UTF-8 MQEnvironment.CCSID = 1381; MQEnvironment.port = 1415; //MQ 端口 qmName = "QM1"; //MQ 的隊列管理器名稱 qName = "QM1_LOCAL"; //MQ 遠程隊列的名稱 try { //定義並初始化隊列管理器對象並連接 //MQQueueManager 可以被多線程共享,但是從MQ 獲取信息的時候是同步的,任何時候只有一個線程可以和MQ 通信。 qMgr = new MQQueueManager(qmName); } catch (MQException e) { // TODO Auto-generated catch block System.out.println("初使化MQ出錯"); e.printStackTrace(); } } /** * 往MQ發送消息 * * @param message * @return */ public static int sendMessage(String message) { int result = 0; try { //設置將要連接的隊列屬性 //目標為遠程隊列,所有這里不可以用MQOO_INPUT_AS_Q_DEF屬性 //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; //以下選項可適合遠程隊列與本地隊列 int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; //連接隊列 //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues. //The inquire and set capabilities are inherited from MQManagedObject. /*關閉了就重新打開*/ if (qMgr == null || !qMgr.isConnected()) { qMgr = new MQQueueManager(qmName); } MQQueue queue = qMgr.accessQueue(qName, openOptions); //定義一個簡單的消息 MQMessage putMessage = new MQMessage(); //將數據放入消息緩沖區 putMessage.writeUTF(message); //設置寫入消息的屬性(默認屬性) MQPutMessageOptions pmo = new MQPutMessageOptions(); //將消息寫入隊列 queue.put(putMessage, pmo); queue.close(); } catch (MQException ex) { System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode); ex.printStackTrace(); } catch (IOException ex) { System.out.println("An error occurred whilst writing to the message buffer: " + ex); } catch (Exception ex) { ex.printStackTrace(); } finally { try { qMgr.disconnect(); } catch (MQException e) { e.printStackTrace(); } } return result; } /** * 從隊列中去獲取消息,如果隊列中沒有消息,就會發生異常,不過沒有關系,有TRY...CATCH,如果是第三方程序調用方法,如果無返回則說明無消息 * 第三方可以將該方法放於一個無限循環的while(true){...}之中,不需要設置等待,因為在該方法內部在沒有消息的時候會自動等待。 * * @return */ public static String getMessage() { String message = null; try { //設置將要連接的隊列屬性 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; MQMessage retrieve = new MQMessage(); //設置取出消息的屬性(默認屬性) //設置放置消息選項 MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT; //在同步點控制下獲取消息 gmo.options = gmo.options + MQC.MQGMO_WAIT; //如果在隊列上沒有消息則等待 gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING; //如果隊列管理器停頓則失敗 gmo.waitInterval = 1000; //設置等待的毫秒時間限制 /*關閉了就重新打開*/ if (qMgr == null || !qMgr.isConnected()) { qMgr = new MQQueueManager(qmName); } MQQueue queue = qMgr.accessQueue(qName, openOptions); // 從隊列中取出消息 queue.get(retrieve, gmo); message = retrieve.readUTF(); System.out.println("The message is: " + message); queue.close(); } catch (MQException ex) { System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode); } catch (IOException ex) { System.out.println("An error occurred whilst writing to the message buffer: " + ex); } catch (Exception ex) { ex.printStackTrace(); } finally { try { qMgr.disconnect(); } catch (MQException e) { e.printStackTrace(); } } return message; } public static void main(String args[]) { /*下面兩個方法可同時使用,也可以單獨使用*/ sendMessage("Hi Java MQ!"); getMessage(); } }