ActiveMQ的消息的(含附件)發送和接收使用


首先介紹一下ActiveMQ的版本:apache-activemq-5.10.2

啟動MQ:activemq.bat

下面來編寫MQ的發送類:

里面的發送ip和模式名稱可以根據具體的實際情況填寫。

SendMessageByMq.java

  1 public class SendMessageByMq {
  2 
  3     public static void main(String[] args) {
  4         String url = "";
  5         // String url = "D:/mqfile/84.zip";
  6         File file = new File(url);// 發送的文件
  7         System.out.println("file=======" + file);
  8         String sendType = "2";// 發送的類型 1發布一對一 ;2訂閱一對多
  9         String isNotFile = "false";// 是否有附件true有 false沒有
 10         String ip = ContentUtils.MQ_SEND_IP;// 發送ip
 11         String modeName = ContentUtils.MQ_POINT_QUEUENAME;// 模式名稱
 12         String json = "[{\"name\":\"40013386.jpg\",\"url\":\"http://h.hiphotos.baidu.com/baik23e5.jpg\"}]";// 要發送的json數據
 13         // 發送方法
 14         String result = send(sendType, ip, modeName, json, file);
 15 
 16         if (result.equals("success")) {
 17             try {
 18                 System.out.println("開始接收1");
 19                 // 接收方法
 20                 ReceiveMessageByMq.receive(sendType, ip, isNotFile, modeName);
 21             } catch (JMSException e) {
 22                 e.printStackTrace();
 23             }
 24         }
 25     }
 26 
 27     /**
 28      * 
 29      * 
 30      * Title String Description
 31      * 
 32      * @author jacun
 33      * @date 2017-4-11上午11:44:17
 34      * @param sendType
 35      *            發送類型 1發布一對一 ;2訂閱一對多
 36      * @param ipport
 37      *            發送ip和端口
 38      * @param modeName
 39      *            模式名稱
 40      * @param jsonData
 41      *            要發送的json數據
 42      * @param file
 43      *            發送的文件
 44      * @return
 45      */
 46     public static String send(String sendType, String ip, String modeName,
 47             String jsonData, File file) {
 48         String str = null;
 49         System.out.println("開始發送1");
 50         try {
 51             // 獲取 ConnectionFactory,ConnectionFactory:連接工廠,JMS用它創建連接
 52             ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
 53                     "tcp://"
 54                             + ip
 55                             + ":61616?jms.blobTransferPolicy.defaultUploadUrl=http://"
 56                             + ip + ":8161/fileserver/");
 57             // 創建 Connection,Connection:JMS客戶端到JMS Provider的連接
 58             Connection connection = connectionFactory.createConnection();
 59             connection.start();
 60             // 創建 Session,Session:一個發送或接收消息的線程
 61             ActiveMQSession session = (ActiveMQSession) connection
 62                     .createSession(false, Session.AUTO_ACKNOWLEDGE);
 63             // 創建 Destination,Destination:消息的目的地;消息發送給誰.
 64             // Destination destination = null;
 65             // 判斷是點對點1還是發布訂閱2
 66             if ("2".equals(sendType)) {
 67                 System.out.println("一對多發布2");
 68                 createTopic(session, modeName, jsonData, file);
 69             } else {
 70                 System.out.println("一對一發布2");
 71                 // 點對點發布
 72                 createQueue(session, modeName, jsonData, file);
 73             }
 74 
 75             session.close();
 76             // 不關閉 Connection, 程序則不退出
 77             connection.close();
 78             // 發送完成刪除文件
 79             // if (file != null) {
 80             // if (file.exists()) {
 81             // file.delete();
 82             // }
 83             // }
 84             str = "success";
 85             return str;
 86         } catch (JMSException e) {
 87             e.printStackTrace();
 88             str = "fail";
 89             return str;
 90         }
 91     }
 92 
 93     private static void createQueue(ActiveMQSession session, String modeName,
 94             String jsonData, File file) {
 95         try {
 96             Destination destination = session.createQueue(modeName);
 97             // 創建 Producer,MessageProducer:消息發送者
 98             MessageProducer producer = session.createProducer(destination);
 99             // 設置持久性的話,文件也可以先緩存下來,接收端離線再連接也可以收到文件
100             producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 設置為持久性
101             if (file.length() > 0) {
102                 System.out.println("一對一上傳文件3");
103                 // 構造 blobMessage,用來傳輸文件
104                 isFileTransfer(producer, session, file, jsonData);
105             } else {
106                 System.out.println("一對一無文件3");
107                 notFileTransfer(producer, session, jsonData);
108             }
109 
110         } catch (JMSException e) {
111             e.printStackTrace();
112         }
113 
114     }
115 
116     // 點對點無文件發送
117     private static void notFileTransfer(MessageProducer producer,
118             ActiveMQSession session, String jsonData) {
119         try {
120             TextMessage message = session.createTextMessage();
121             message.setStringProperty("sendType", "1");
122             message.setStringProperty("jsonData", jsonData);
123             message.setStringProperty("isNotFile", "false");
124             // 設置該消息的超時時間(有效期)
125             producer.setTimeToLive(60000);
126             // 發送
127             producer.send(message);
128             producer.close();
129             System.out.println("發送成功無文件4");
130         } catch (JMSException e) {
131             e.printStackTrace();
132         }
133 
134     }
135 
136     // 點對點有文件發送
137     private static void isFileTransfer(MessageProducer producer,
138             ActiveMQSession session, File file, String jsonData) {
139         try {
140             BlobMessage blobMessage = session.createBlobMessage(file);
141             blobMessage.setStringProperty("sendType", "1");
142             blobMessage.setStringProperty("jsonData", jsonData);
143             blobMessage.setStringProperty("isNotFile", "true");
144             // 設置該消息的超時時間(有效期)
145             producer.setTimeToLive(60000);
146             // 發送
147             producer.send(blobMessage);
148             producer.close();
149             System.out.println("發送成功有文件4");
150         } catch (JMSException e) {
151             e.printStackTrace();
152         }
153 
154     }
155 
156     private static void createTopic(ActiveMQSession session, String modeName,
157             String jsonData, File file) {
158         try {
159             Topic topic = session.createTopic(modeName);
160             MessageProducer producer = session.createProducer(topic);
161             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
162             if (file.length() > 0) {
163                 System.out.println("一對多上傳文件3");
164                 // 構造 blobMessage,用來傳輸文件
165                 isFileTransfer(producer, session, file, jsonData);
166             } else {
167                 System.out.println("一對多無文件3");
168                 notFileTransfer(producer, session, jsonData);
169             }
170         } catch (JMSException e) {
171             e.printStackTrace();
172         }
173     }
174 }

 

ActiveMQ的接收類:

里面的發送ip和模式名稱可以根據具體的實際情況填寫。

ReceiveMessageByMq.java

  1 public class ReceiveMessageByMq {
  2 
  3     public static void main(String[] args) {
  4         
  7         String receiveType = "1";// 接收的類型 1發布一對一 ;2訂閱一對多
  8         String isNotFile = "true";// 是否有附件
  9         String ip = ContentUtils.MQ_RECEIVE_IP;// 接收ip
 10         String modeName = ContentUtils.MQ_POINT_QUEUENAME;// 模式名稱
 11         try {
 12             receive(receiveType, ip, isNotFile, modeName);
 13         } catch (JMSException e) {
 14             e.printStackTrace();
 15         }
 16     }
 17 
 18     /**
 19      * 
 20      * 
 21      * Title void Description
 22      * 
 23      * @author jacun
 24      * @param modeName
 25      * @param ip
 26      * @param receiveType
 27      * @date 2017-4-11上午10:43:10
 28      * @throws JMSException
 29      */
 30     public static void receive(String receiveType, String ip, String isNotFile,
 31             String modeName) throws JMSException {
 32         System.out.println("開始接收2");
 33         // 獲取 ConnectionFactory
 34         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
 35                 "tcp://" + ip + ":61616");
 36         // 創建 Connection
 37         Connection connection = connectionFactory.createConnection();
 38         connection.start();
 39         // 創建 Session
 40         Session session = connection.createSession(false,
 41                 Session.AUTO_ACKNOWLEDGE);
 42         // 創建 Destinatione
 43         // 判斷是一對一還是一對多
 44         if ("2".equals(receiveType)) {
 45             // 一對多
 46             System.out.println("一對多接收數據3");
 47             receiveTopic(session, isNotFile, modeName);
 48         } else {
 49             // 一對一
 50             System.out.println("一對一接收數據3");
 51             receiveQueue(session, isNotFile, modeName);
 52         }
 53 
 54     }
 55 
 56     private static void receiveTopic(Session session, String isNotFile,
 57             String modeName) {
 58         try {
 59             final String isFile = isNotFile;
 60             Destination destination = session.createTopic(modeName);
 61             // 創建 Consumer
 62             MessageConsumer consumer = session.createConsumer(destination);
 63             // 注冊消息監聽器,當消息到達時被觸發並處理消息
 64             consumer.setMessageListener(new MessageListener() {
 65                 // 監聽器中處理消息
 66                 public void onMessage(Message message) {
 67                     if ("true".equals(isFile)) {
 68                         System.out.println("有文件接收數據4");
 69                         ReceiveMessageByMq.receiveFile(message);
 70                     } else {
 71                         System.out.println("無文件接收數據4");
 72                         ReceiveMessageByMq.receiveData(message);
 73 
 74                     }
 75 
 76                 }
 77 
 78             });
 79         } catch (JMSException e) {
 80             e.printStackTrace();
 81         }
 82 
 83     }
 84 
 85     private static void receiveQueue(Session session, String isNotFile,
 86             String modeName) {
 87         try {
 88             final String isFile = isNotFile;
 89             Destination destination = session.createQueue(modeName);
 90             // 創建 Consumer
 91             MessageConsumer consumer = session.createConsumer(destination);
 92             // 注冊消息監聽器,當消息到達時被觸發並處理消息
 93             consumer.setMessageListener(new MessageListener() {
 94                 // 監聽器中處理消息
 95 
 96                 public void onMessage(Message message) {
 97                     if ("true".equals(isFile)) {
 98                         System.out.println("有文件接收數據4");
 99                         ReceiveMessageByMq.receiveFile(message);
100                     } else {
101                         System.out.println("無文件接收數據4");
102                         ReceiveMessageByMq.receiveData(message);
103 
104                     }
105 
106                 }
107 
108             });
109         } catch (JMSException e) {
110             e.printStackTrace();
111         }
112 
113     }
114 
115     protected static void receiveData(Message message) {
116         String sendType = null;
117         String jsonData = null;
118         try {
119             TextMessage msg = (TextMessage) message;
120             sendType = msg.getStringProperty("sendType");
121             jsonData = msg.getStringProperty("jsonData");
122         } catch (JMSException e) {
123             e.printStackTrace();
124         }
125         System.out.println("無文件接收成功5");
126         System.out.println(sendType);
127         System.out.println(jsonData);
128     }
129 
130     private static void receiveFile(Message message) {
131         String sendType = null;
132         String jsonData = null;
133         if (message instanceof BlobMessage) {
134             BlobMessage blobMessage = (BlobMessage) message;
135             try {
136                 String path = CreateZipFile.createZip("test");
137                 JFileChooser fileChooser = new JFileChooser();
138                 fileChooser.setDialogTitle("請指定文件保存位置");
139                 fileChooser.setSelectedFile(new File(path));
140                 File file = fileChooser.getSelectedFile();
141                 OutputStream os = new FileOutputStream(file);
142                 InputStream inputStream = blobMessage.getInputStream();
143                 // 寫文件,你也可以使用其他方式
144                 byte[] buff = new byte[256];
145                 int len = 0;
146                 while ((len = inputStream.read(buff)) > 0) {
147                     os.write(buff, 0, len);
148                 }
149                 os.close();
150                 System.out.println("有文件接收成功5");
151                 sendType = blobMessage.getStringProperty("sendType");
152                 jsonData = blobMessage.getStringProperty("jsonData");
153                 System.out.println(sendType);
154                 System.out.println(jsonData);
155 
156             } catch (Exception e) {
157                 e.printStackTrace();
158             }
159 
160         }
161 
162     }
163 }

 

親測好使,這兩個工具類包含了發送和接收的方法,而且可以點對點或者發布訂閱、有無附件均可,對了還有一點,ActiveMQ需要的jar包,網上信息很多!

 

補充:補充一個mq接收端自動連接到mq服務器的方法:

那就是將連接方式換一下就可以了:

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin","failover:("tcp://" + ip + ":61616")?initialReconnectDelay=1000&maxReconnectDelay=30000");

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM