ActiveMQ的消息的(含附件)发送和接收使用


首先介绍一下ActiveMQ的版本:apache-activemq-5.10.2

启动MQ:activemq.bat

下面来编写MQ的发送类:

里面的发送ip和模式名称可以根据具体的实际情况填写。

SendMessageByMq.java

  1 public class SendMessageByMq {
  2 
  3     public static void main(String[] args) {
  4         String url = "";
  5         // String url = "D:/mqfile/84.zip";
  6         File file = new File(url);// 发送的文件
  7         System.out.println("file=======" + file);
  8         String sendType = "2";// 发送的类型 1发布一对一 ;2订阅一对多
  9         String isNotFile = "false";// 是否有附件true有 false没有
 10         String ip = ContentUtils.MQ_SEND_IP;// 发送ip
 11         String modeName = ContentUtils.MQ_POINT_QUEUENAME;// 模式名称
 12         String json = "[{\"name\":\"40013386.jpg\",\"url\":\"http://h.hiphotos.baidu.com/baik23e5.jpg\"}]";// 要发送的json数据
 13         // 发送方法
 14         String result = send(sendType, ip, modeName, json, file);
 15 
 16         if (result.equals("success")) {
 17             try {
 18                 System.out.println("开始接收1");
 19                 // 接收方法
 20                 ReceiveMessageByMq.receive(sendType, ip, isNotFile, modeName);
 21             } catch (JMSException e) {
 22                 e.printStackTrace();
 23             }
 24         }
 25     }
 26 
 27     /**
 28      * 
 29      * 
 30      * Title String Description
 31      * 
 32      * @author jacun
 33      * @date 2017-4-11上午11:44:17
 34      * @param sendType
 35      *            发送类型 1发布一对一 ;2订阅一对多
 36      * @param ipport
 37      *            发送ip和端口
 38      * @param modeName
 39      *            模式名称
 40      * @param jsonData
 41      *            要发送的json数据
 42      * @param file
 43      *            发送的文件
 44      * @return
 45      */
 46     public static String send(String sendType, String ip, String modeName,
 47             String jsonData, File file) {
 48         String str = null;
 49         System.out.println("开始发送1");
 50         try {
 51             // 获取 ConnectionFactory,ConnectionFactory:连接工厂,JMS用它创建连接
 52             ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
 53                     "tcp://"
 54                             + ip
 55                             + ":61616?jms.blobTransferPolicy.defaultUploadUrl=http://"
 56                             + ip + ":8161/fileserver/");
 57             // 创建 Connection,Connection:JMS客户端到JMS Provider的连接
 58             Connection connection = connectionFactory.createConnection();
 59             connection.start();
 60             // 创建 Session,Session:一个发送或接收消息的线程
 61             ActiveMQSession session = (ActiveMQSession) connection
 62                     .createSession(false, Session.AUTO_ACKNOWLEDGE);
 63             // 创建 Destination,Destination:消息的目的地;消息发送给谁.
 64             // Destination destination = null;
 65             // 判断是点对点1还是发布订阅2
 66             if ("2".equals(sendType)) {
 67                 System.out.println("一对多发布2");
 68                 createTopic(session, modeName, jsonData, file);
 69             } else {
 70                 System.out.println("一对一发布2");
 71                 // 点对点发布
 72                 createQueue(session, modeName, jsonData, file);
 73             }
 74 
 75             session.close();
 76             // 不关闭 Connection, 程序则不退出
 77             connection.close();
 78             // 发送完成删除文件
 79             // if (file != null) {
 80             // if (file.exists()) {
 81             // file.delete();
 82             // }
 83             // }
 84             str = "success";
 85             return str;
 86         } catch (JMSException e) {
 87             e.printStackTrace();
 88             str = "fail";
 89             return str;
 90         }
 91     }
 92 
 93     private static void createQueue(ActiveMQSession session, String modeName,
 94             String jsonData, File file) {
 95         try {
 96             Destination destination = session.createQueue(modeName);
 97             // 创建 Producer,MessageProducer:消息发送者
 98             MessageProducer producer = session.createProducer(destination);
 99             // 设置持久性的话,文件也可以先缓存下来,接收端离线再连接也可以收到文件
100             producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 设置为持久性
101             if (file.length() > 0) {
102                 System.out.println("一对一上传文件3");
103                 // 构造 blobMessage,用来传输文件
104                 isFileTransfer(producer, session, file, jsonData);
105             } else {
106                 System.out.println("一对一无文件3");
107                 notFileTransfer(producer, session, jsonData);
108             }
109 
110         } catch (JMSException e) {
111             e.printStackTrace();
112         }
113 
114     }
115 
116     // 点对点无文件发送
117     private static void notFileTransfer(MessageProducer producer,
118             ActiveMQSession session, String jsonData) {
119         try {
120             TextMessage message = session.createTextMessage();
121             message.setStringProperty("sendType", "1");
122             message.setStringProperty("jsonData", jsonData);
123             message.setStringProperty("isNotFile", "false");
124             // 设置该消息的超时时间(有效期)
125             producer.setTimeToLive(60000);
126             // 发送
127             producer.send(message);
128             producer.close();
129             System.out.println("发送成功无文件4");
130         } catch (JMSException e) {
131             e.printStackTrace();
132         }
133 
134     }
135 
136     // 点对点有文件发送
137     private static void isFileTransfer(MessageProducer producer,
138             ActiveMQSession session, File file, String jsonData) {
139         try {
140             BlobMessage blobMessage = session.createBlobMessage(file);
141             blobMessage.setStringProperty("sendType", "1");
142             blobMessage.setStringProperty("jsonData", jsonData);
143             blobMessage.setStringProperty("isNotFile", "true");
144             // 设置该消息的超时时间(有效期)
145             producer.setTimeToLive(60000);
146             // 发送
147             producer.send(blobMessage);
148             producer.close();
149             System.out.println("发送成功有文件4");
150         } catch (JMSException e) {
151             e.printStackTrace();
152         }
153 
154     }
155 
156     private static void createTopic(ActiveMQSession session, String modeName,
157             String jsonData, File file) {
158         try {
159             Topic topic = session.createTopic(modeName);
160             MessageProducer producer = session.createProducer(topic);
161             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
162             if (file.length() > 0) {
163                 System.out.println("一对多上传文件3");
164                 // 构造 blobMessage,用来传输文件
165                 isFileTransfer(producer, session, file, jsonData);
166             } else {
167                 System.out.println("一对多无文件3");
168                 notFileTransfer(producer, session, jsonData);
169             }
170         } catch (JMSException e) {
171             e.printStackTrace();
172         }
173     }
174 }

 

ActiveMQ的接收类:

里面的发送ip和模式名称可以根据具体的实际情况填写。

ReceiveMessageByMq.java

  1 public class ReceiveMessageByMq {
  2 
  3     public static void main(String[] args) {
  4         
  7         String receiveType = "1";// 接收的类型 1发布一对一 ;2订阅一对多
  8         String isNotFile = "true";// 是否有附件
  9         String ip = ContentUtils.MQ_RECEIVE_IP;// 接收ip
 10         String modeName = ContentUtils.MQ_POINT_QUEUENAME;// 模式名称
 11         try {
 12             receive(receiveType, ip, isNotFile, modeName);
 13         } catch (JMSException e) {
 14             e.printStackTrace();
 15         }
 16     }
 17 
 18     /**
 19      * 
 20      * 
 21      * Title void Description
 22      * 
 23      * @author jacun
 24      * @param modeName
 25      * @param ip
 26      * @param receiveType
 27      * @date 2017-4-11上午10:43:10
 28      * @throws JMSException
 29      */
 30     public static void receive(String receiveType, String ip, String isNotFile,
 31             String modeName) throws JMSException {
 32         System.out.println("开始接收2");
 33         // 获取 ConnectionFactory
 34         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
 35                 "tcp://" + ip + ":61616");
 36         // 创建 Connection
 37         Connection connection = connectionFactory.createConnection();
 38         connection.start();
 39         // 创建 Session
 40         Session session = connection.createSession(false,
 41                 Session.AUTO_ACKNOWLEDGE);
 42         // 创建 Destinatione
 43         // 判断是一对一还是一对多
 44         if ("2".equals(receiveType)) {
 45             // 一对多
 46             System.out.println("一对多接收数据3");
 47             receiveTopic(session, isNotFile, modeName);
 48         } else {
 49             // 一对一
 50             System.out.println("一对一接收数据3");
 51             receiveQueue(session, isNotFile, modeName);
 52         }
 53 
 54     }
 55 
 56     private static void receiveTopic(Session session, String isNotFile,
 57             String modeName) {
 58         try {
 59             final String isFile = isNotFile;
 60             Destination destination = session.createTopic(modeName);
 61             // 创建 Consumer
 62             MessageConsumer consumer = session.createConsumer(destination);
 63             // 注册消息监听器,当消息到达时被触发并处理消息
 64             consumer.setMessageListener(new MessageListener() {
 65                 // 监听器中处理消息
 66                 public void onMessage(Message message) {
 67                     if ("true".equals(isFile)) {
 68                         System.out.println("有文件接收数据4");
 69                         ReceiveMessageByMq.receiveFile(message);
 70                     } else {
 71                         System.out.println("无文件接收数据4");
 72                         ReceiveMessageByMq.receiveData(message);
 73 
 74                     }
 75 
 76                 }
 77 
 78             });
 79         } catch (JMSException e) {
 80             e.printStackTrace();
 81         }
 82 
 83     }
 84 
 85     private static void receiveQueue(Session session, String isNotFile,
 86             String modeName) {
 87         try {
 88             final String isFile = isNotFile;
 89             Destination destination = session.createQueue(modeName);
 90             // 创建 Consumer
 91             MessageConsumer consumer = session.createConsumer(destination);
 92             // 注册消息监听器,当消息到达时被触发并处理消息
 93             consumer.setMessageListener(new MessageListener() {
 94                 // 监听器中处理消息
 95 
 96                 public void onMessage(Message message) {
 97                     if ("true".equals(isFile)) {
 98                         System.out.println("有文件接收数据4");
 99                         ReceiveMessageByMq.receiveFile(message);
100                     } else {
101                         System.out.println("无文件接收数据4");
102                         ReceiveMessageByMq.receiveData(message);
103 
104                     }
105 
106                 }
107 
108             });
109         } catch (JMSException e) {
110             e.printStackTrace();
111         }
112 
113     }
114 
115     protected static void receiveData(Message message) {
116         String sendType = null;
117         String jsonData = null;
118         try {
119             TextMessage msg = (TextMessage) message;
120             sendType = msg.getStringProperty("sendType");
121             jsonData = msg.getStringProperty("jsonData");
122         } catch (JMSException e) {
123             e.printStackTrace();
124         }
125         System.out.println("无文件接收成功5");
126         System.out.println(sendType);
127         System.out.println(jsonData);
128     }
129 
130     private static void receiveFile(Message message) {
131         String sendType = null;
132         String jsonData = null;
133         if (message instanceof BlobMessage) {
134             BlobMessage blobMessage = (BlobMessage) message;
135             try {
136                 String path = CreateZipFile.createZip("test");
137                 JFileChooser fileChooser = new JFileChooser();
138                 fileChooser.setDialogTitle("请指定文件保存位置");
139                 fileChooser.setSelectedFile(new File(path));
140                 File file = fileChooser.getSelectedFile();
141                 OutputStream os = new FileOutputStream(file);
142                 InputStream inputStream = blobMessage.getInputStream();
143                 // 写文件,你也可以使用其他方式
144                 byte[] buff = new byte[256];
145                 int len = 0;
146                 while ((len = inputStream.read(buff)) > 0) {
147                     os.write(buff, 0, len);
148                 }
149                 os.close();
150                 System.out.println("有文件接收成功5");
151                 sendType = blobMessage.getStringProperty("sendType");
152                 jsonData = blobMessage.getStringProperty("jsonData");
153                 System.out.println(sendType);
154                 System.out.println(jsonData);
155 
156             } catch (Exception e) {
157                 e.printStackTrace();
158             }
159 
160         }
161 
162     }
163 }

 

亲测好使,这两个工具类包含了发送和接收的方法,而且可以点对点或者发布订阅、有无附件均可,对了还有一点,ActiveMQ需要的jar包,网上信息很多!

 

补充:补充一个mq接收端自动连接到mq服务器的方法:

那就是将连接方式换一下就可以了:

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin","failover:("tcp://" + ip + ":61616")?initialReconnectDelay=1000&maxReconnectDelay=30000");

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM