原创论文:https://www.cnblogs.com/goujh/p/8510239.html
消息队列的应用场景:
消息队列应用场景 异步处理,应用解耦,流量削锋和消息通讯四个场景 异步处理: 场景说明:用户注册后,需要发注册邮件和注册短信 应用解耦: 场景说明:用户下单后,订单系统需要通知库存系统 流量削锋: 应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用容易挂掉. 1、用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面 2、秒杀业务根据消息队列中的请求信息,再做后续处理 日志处理: 应用场景:日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题
1、在安装ActiveMQ
http://activemq.apache.org/activemq-5158-release.html
2、解压启动服务
tar -zxvf apache-activemq-5.15.8-bin.tar.gz 进入目录,运行./bin/activemq start
3、网页查看
网址:http://139.199.64.189:8161/
点击:Manage ActiveMQ broker
输入默认用户和密码都为:admin
4、创建maven工程,在pom.xml文件中添加
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>
/****************************************************消息队列***************************************************************/
注意:消息队列模式,当生产者生产消息后,会将消息放入消息队列中,一旦消费者启动了,立马回读取到。
5、创建JMSConsumer.java文件,这是一个消费者
package com.activemq.demo.method1; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumer { //默认连接用户名 private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER; //默认连接密码 private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD; //默认连接地址,默认端口为61616 private static final String BROKERURL = "tcp://ip:61616";//ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { //连接工厂 ConnectionFactory connectionFactory; //连接 Connection connection = null; //会话,接收或者发送消息的线程 Session session; //消息目的地 Destination destination; //消息的消费者 MessageConsumer messageConsumer; //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL); try { //通过工厂获取连接 connection = connectionFactory.createConnection(); //启动连接 connection.start(); //创建会话 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建一个连接Hello World!的消息队列 destination = session.createQueue("Hello World"); //创建消息的消费者 messageConsumer = session.createConsumer(destination); while(true){ TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); if (textMessage != null) { System.err.println("收到的消息:" + textMessage.getText()); } else { break; } } } catch (Exception e) { e.printStackTrace(); } } }
6、创建JMSProducer.java,这是一个生产者
package com.activemq.demo.method1; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSProducer { //默认连接用户名 private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER; //默认连接密码 private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD; //默认连接地址,默认端口为61616 private static final String BROKERURL = "tcp://ip:61616";//ActiveMQConnection.DEFAULT_BROKER_URL; //发送的消息数量 private static final int SENDNUM = 10; public static void main(String[] args) { //连接工厂 ConnectionFactory connectionFactory; //连接 Connection connection = null; //会话,接收或者发送消息的线程 Session session; //消息的目的地 Destination destination; //消息生产者 MessageProducer messageProducer; //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL); try { //通过连接工厂获取连接 connection = connectionFactory.createConnection(); //启动连接 connection.start(); //创建session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //创建一个名称为Hello World!的消息队列 destination = session.createQueue("Hello World"); //创建消息生产者 messageProducer = session.createProducer(destination); //发送消息 sendMessage(session,messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally{ if(connection != null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发送消息 * @param session * @param messageProducer 消息生产者 * @throws Exception */ public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{ for (int i = 0; i < JMSProducer.SENDNUM; i++) { //创建一条文本消息 TextMessage message = session.createTextMessage("activemq 发送消息:" + i); System.err.println("发送消息:activemq 发送消息:" + i); //通过消息生产者发出消息 messageProducer.send(message); } } }
/**********************************************************发布订阅模式************************************************************/
注意:发布订阅者模式,生产者创建了消息,如果消费者没有启动是获取不到消息;只有等消费者启动后,生产者再生产消息,消费者才会有获取到消息
7、创建JMSConsumer.java文件

package com.activemq.demo.method2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumer { //默认连接用户名 private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER; //默认连接密码 private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD; //默认连接地址 private static final String BROKERURL = "tcp://139.199.64.189:61616";//ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer consumer; //创建消费者 // 实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL); try { connection=connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 /** * 这里的最好使用Boolean.FALSE,如果是用true则必须commit才能生效,且http://127.0.0.1:8161/admin管理页面才会更新消息队列的变化情况。 */ session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session // destination=session.createQueue("FirstQueue1"); // 创建消息队列 destination=session.createTopic("firstTopic"); consumer=session.createConsumer(destination); consumer.setMessageListener(new MyListener()); // 注册消息监听 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
8、创建JMSProducer.java文件

package com.activemq.demo.method2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息生产者 * @author Administrator * */ public class JMSProducer { //默认连接用户名 private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER; //默认连接密码 private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD; //默认连接地址 private static final String BROKERURL = "tcp://139.199.64.189:61616";//ActiveMQConnection.DEFAULT_BROKER_URL; private static final int SENDNUM=10; // 发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageProducer messageProducer; // 消息生产者 // 实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL); try { connection=connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session // destination=session.createQueue("FirstQueue1"); // 创建消息队列 destination=session.createTopic("firstTopic"); messageProducer=session.createProducer(destination); // 创建消息生产者 sendMessage(session, messageProducer); // 发送消息 session.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally{ if(connection!=null){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } /** * 发送消息 * @param session * @param messageProducer * @throws Exception */ public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{ for(int i=0;i<JMSProducer.SENDNUM;i++){ TextMessage message=session.createTextMessage("ActiveMQ 发布的消息"+i); System.out.println("发送消息:"+"ActiveMQ 发布的消息"+i); messageProducer.send(message); } } }
9、创建监听文件MyListener.java
package com.activemq.demo.method2; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class MyListener implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("订阅者一收到的消息:"+((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
7、运行两个实例,可在web端查看