首先介紹一下ActiveMQ的版本:apache-activemq-5.10.2
啟動MQ:activemq.bat
下面來編寫MQ的發送類:
里面的發送ip和模式名稱可以根據具體的實際情況填寫。
SendMessageByMq.java
1 public class SendMessageByMq { 2 3 public static void main(String[] args) { 4 String url = ""; 5 // String url = "D:/mqfile/84.zip"; 6 File file = new File(url);// 發送的文件 7 System.out.println("file=======" + file); 8 String sendType = "2";// 發送的類型 1發布一對一 ;2訂閱一對多 9 String isNotFile = "false";// 是否有附件true有 false沒有 10 String ip = ContentUtils.MQ_SEND_IP;// 發送ip 11 String modeName = ContentUtils.MQ_POINT_QUEUENAME;// 模式名稱 12 String json = "[{\"name\":\"40013386.jpg\",\"url\":\"http://h.hiphotos.baidu.com/baik23e5.jpg\"}]";// 要發送的json數據 13 // 發送方法 14 String result = send(sendType, ip, modeName, json, file); 15 16 if (result.equals("success")) { 17 try { 18 System.out.println("開始接收1"); 19 // 接收方法 20 ReceiveMessageByMq.receive(sendType, ip, isNotFile, modeName); 21 } catch (JMSException e) { 22 e.printStackTrace(); 23 } 24 } 25 } 26 27 /** 28 * 29 * 30 * Title String Description 31 * 32 * @author jacun 33 * @date 2017-4-11上午11:44:17 34 * @param sendType 35 * 發送類型 1發布一對一 ;2訂閱一對多 36 * @param ipport 37 * 發送ip和端口 38 * @param modeName 39 * 模式名稱 40 * @param jsonData 41 * 要發送的json數據 42 * @param file 43 * 發送的文件 44 * @return 45 */ 46 public static String send(String sendType, String ip, String modeName, 47 String jsonData, File file) { 48 String str = null; 49 System.out.println("開始發送1"); 50 try { 51 // 獲取 ConnectionFactory,ConnectionFactory:連接工廠,JMS用它創建連接 52 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( 53 "tcp://" 54 + ip 55 + ":61616?jms.blobTransferPolicy.defaultUploadUrl=http://" 56 + ip + ":8161/fileserver/"); 57 // 創建 Connection,Connection:JMS客戶端到JMS Provider的連接 58 Connection connection = connectionFactory.createConnection(); 59 connection.start(); 60 // 創建 Session,Session:一個發送或接收消息的線程 61 ActiveMQSession session = (ActiveMQSession) connection 62 .createSession(false, Session.AUTO_ACKNOWLEDGE); 63 // 創建 Destination,Destination:消息的目的地;消息發送給誰. 64 // Destination destination = null; 65 // 判斷是點對點1還是發布訂閱2 66 if ("2".equals(sendType)) { 67 System.out.println("一對多發布2"); 68 createTopic(session, modeName, jsonData, file); 69 } else { 70 System.out.println("一對一發布2"); 71 // 點對點發布 72 createQueue(session, modeName, jsonData, file); 73 } 74 75 session.close(); 76 // 不關閉 Connection, 程序則不退出 77 connection.close(); 78 // 發送完成刪除文件 79 // if (file != null) { 80 // if (file.exists()) { 81 // file.delete(); 82 // } 83 // } 84 str = "success"; 85 return str; 86 } catch (JMSException e) { 87 e.printStackTrace(); 88 str = "fail"; 89 return str; 90 } 91 } 92 93 private static void createQueue(ActiveMQSession session, String modeName, 94 String jsonData, File file) { 95 try { 96 Destination destination = session.createQueue(modeName); 97 // 創建 Producer,MessageProducer:消息發送者 98 MessageProducer producer = session.createProducer(destination); 99 // 設置持久性的話,文件也可以先緩存下來,接收端離線再連接也可以收到文件 100 producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 設置為持久性 101 if (file.length() > 0) { 102 System.out.println("一對一上傳文件3"); 103 // 構造 blobMessage,用來傳輸文件 104 isFileTransfer(producer, session, file, jsonData); 105 } else { 106 System.out.println("一對一無文件3"); 107 notFileTransfer(producer, session, jsonData); 108 } 109 110 } catch (JMSException e) { 111 e.printStackTrace(); 112 } 113 114 } 115 116 // 點對點無文件發送 117 private static void notFileTransfer(MessageProducer producer, 118 ActiveMQSession session, String jsonData) { 119 try { 120 TextMessage message = session.createTextMessage(); 121 message.setStringProperty("sendType", "1"); 122 message.setStringProperty("jsonData", jsonData); 123 message.setStringProperty("isNotFile", "false"); 124 // 設置該消息的超時時間(有效期) 125 producer.setTimeToLive(60000); 126 // 發送 127 producer.send(message); 128 producer.close(); 129 System.out.println("發送成功無文件4"); 130 } catch (JMSException e) { 131 e.printStackTrace(); 132 } 133 134 } 135 136 // 點對點有文件發送 137 private static void isFileTransfer(MessageProducer producer, 138 ActiveMQSession session, File file, String jsonData) { 139 try { 140 BlobMessage blobMessage = session.createBlobMessage(file); 141 blobMessage.setStringProperty("sendType", "1"); 142 blobMessage.setStringProperty("jsonData", jsonData); 143 blobMessage.setStringProperty("isNotFile", "true"); 144 // 設置該消息的超時時間(有效期) 145 producer.setTimeToLive(60000); 146 // 發送 147 producer.send(blobMessage); 148 producer.close(); 149 System.out.println("發送成功有文件4"); 150 } catch (JMSException e) { 151 e.printStackTrace(); 152 } 153 154 } 155 156 private static void createTopic(ActiveMQSession session, String modeName, 157 String jsonData, File file) { 158 try { 159 Topic topic = session.createTopic(modeName); 160 MessageProducer producer = session.createProducer(topic); 161 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 162 if (file.length() > 0) { 163 System.out.println("一對多上傳文件3"); 164 // 構造 blobMessage,用來傳輸文件 165 isFileTransfer(producer, session, file, jsonData); 166 } else { 167 System.out.println("一對多無文件3"); 168 notFileTransfer(producer, session, jsonData); 169 } 170 } catch (JMSException e) { 171 e.printStackTrace(); 172 } 173 } 174 }
ActiveMQ的接收類:
里面的發送ip和模式名稱可以根據具體的實際情況填寫。
ReceiveMessageByMq.java
1 public class ReceiveMessageByMq { 2 3 public static void main(String[] args) { 4 7 String receiveType = "1";// 接收的類型 1發布一對一 ;2訂閱一對多 8 String isNotFile = "true";// 是否有附件 9 String ip = ContentUtils.MQ_RECEIVE_IP;// 接收ip 10 String modeName = ContentUtils.MQ_POINT_QUEUENAME;// 模式名稱 11 try { 12 receive(receiveType, ip, isNotFile, modeName); 13 } catch (JMSException e) { 14 e.printStackTrace(); 15 } 16 } 17 18 /** 19 * 20 * 21 * Title void Description 22 * 23 * @author jacun 24 * @param modeName 25 * @param ip 26 * @param receiveType 27 * @date 2017-4-11上午10:43:10 28 * @throws JMSException 29 */ 30 public static void receive(String receiveType, String ip, String isNotFile, 31 String modeName) throws JMSException { 32 System.out.println("開始接收2"); 33 // 獲取 ConnectionFactory 34 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( 35 "tcp://" + ip + ":61616"); 36 // 創建 Connection 37 Connection connection = connectionFactory.createConnection(); 38 connection.start(); 39 // 創建 Session 40 Session session = connection.createSession(false, 41 Session.AUTO_ACKNOWLEDGE); 42 // 創建 Destinatione 43 // 判斷是一對一還是一對多 44 if ("2".equals(receiveType)) { 45 // 一對多 46 System.out.println("一對多接收數據3"); 47 receiveTopic(session, isNotFile, modeName); 48 } else { 49 // 一對一 50 System.out.println("一對一接收數據3"); 51 receiveQueue(session, isNotFile, modeName); 52 } 53 54 } 55 56 private static void receiveTopic(Session session, String isNotFile, 57 String modeName) { 58 try { 59 final String isFile = isNotFile; 60 Destination destination = session.createTopic(modeName); 61 // 創建 Consumer 62 MessageConsumer consumer = session.createConsumer(destination); 63 // 注冊消息監聽器,當消息到達時被觸發並處理消息 64 consumer.setMessageListener(new MessageListener() { 65 // 監聽器中處理消息 66 public void onMessage(Message message) { 67 if ("true".equals(isFile)) { 68 System.out.println("有文件接收數據4"); 69 ReceiveMessageByMq.receiveFile(message); 70 } else { 71 System.out.println("無文件接收數據4"); 72 ReceiveMessageByMq.receiveData(message); 73 74 } 75 76 } 77 78 }); 79 } catch (JMSException e) { 80 e.printStackTrace(); 81 } 82 83 } 84 85 private static void receiveQueue(Session session, String isNotFile, 86 String modeName) { 87 try { 88 final String isFile = isNotFile; 89 Destination destination = session.createQueue(modeName); 90 // 創建 Consumer 91 MessageConsumer consumer = session.createConsumer(destination); 92 // 注冊消息監聽器,當消息到達時被觸發並處理消息 93 consumer.setMessageListener(new MessageListener() { 94 // 監聽器中處理消息 95 96 public void onMessage(Message message) { 97 if ("true".equals(isFile)) { 98 System.out.println("有文件接收數據4"); 99 ReceiveMessageByMq.receiveFile(message); 100 } else { 101 System.out.println("無文件接收數據4"); 102 ReceiveMessageByMq.receiveData(message); 103 104 } 105 106 } 107 108 }); 109 } catch (JMSException e) { 110 e.printStackTrace(); 111 } 112 113 } 114 115 protected static void receiveData(Message message) { 116 String sendType = null; 117 String jsonData = null; 118 try { 119 TextMessage msg = (TextMessage) message; 120 sendType = msg.getStringProperty("sendType"); 121 jsonData = msg.getStringProperty("jsonData"); 122 } catch (JMSException e) { 123 e.printStackTrace(); 124 } 125 System.out.println("無文件接收成功5"); 126 System.out.println(sendType); 127 System.out.println(jsonData); 128 } 129 130 private static void receiveFile(Message message) { 131 String sendType = null; 132 String jsonData = null; 133 if (message instanceof BlobMessage) { 134 BlobMessage blobMessage = (BlobMessage) message; 135 try { 136 String path = CreateZipFile.createZip("test"); 137 JFileChooser fileChooser = new JFileChooser(); 138 fileChooser.setDialogTitle("請指定文件保存位置"); 139 fileChooser.setSelectedFile(new File(path)); 140 File file = fileChooser.getSelectedFile(); 141 OutputStream os = new FileOutputStream(file); 142 InputStream inputStream = blobMessage.getInputStream(); 143 // 寫文件,你也可以使用其他方式 144 byte[] buff = new byte[256]; 145 int len = 0; 146 while ((len = inputStream.read(buff)) > 0) { 147 os.write(buff, 0, len); 148 } 149 os.close(); 150 System.out.println("有文件接收成功5"); 151 sendType = blobMessage.getStringProperty("sendType"); 152 jsonData = blobMessage.getStringProperty("jsonData"); 153 System.out.println(sendType); 154 System.out.println(jsonData); 155 156 } catch (Exception e) { 157 e.printStackTrace(); 158 } 159 160 } 161 162 } 163 }
親測好使,這兩個工具類包含了發送和接收的方法,而且可以點對點或者發布訂閱、有無附件均可,對了還有一點,ActiveMQ需要的jar包,網上信息很多!
補充:補充一個mq接收端自動連接到mq服務器的方法:
那就是將連接方式換一下就可以了:
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin","failover:("tcp://" + ip + ":61616")?initialReconnectDelay=1000&maxReconnectDelay=30000");