JMS-AciveMQ


一:JMS基本概念

JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信(用于解决两个或者多个程序之间的耦合)。它便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。

也就是说它定义看一系列规范,然后大家按照这种规范来开发自己消息服务,当然,现在有好多开源的来供大家使用了, 比如说Apache ActiveMQ、RabbitMQ、Redis、Jafka/Kafka 等等这些

1.  JMS的目标

        为企业级的应用提供一种智能的消息系统,JMS定义了一整套的企业级的消息概念与工具,尽可能最小化的Java语言概念去构建最大化企业消息应用。统一已经存在的企业级消息系统功能。

2.  JMS应用程序, 

一个完整的JMS应用应该实现以下功能:

l  JMS 客户端 – Java语言开发的接受与发送消息的程序

l  非JMS客户端 – 基于消息系统的本地API实现而不是JMS

l  消息 – 应用程序用来相互交流信息的载体

l  被管理对象–预先配置的JMS对象,JMS管理员创建,被客户端运用。如链接工厂,主题等

l  JMS提供者–完成JMS功能与管理功能的消息系统

3.  JMS体系结构

描述如下:

l  JMS提供者(JMS的实现者,比如activemq jbossmq等)

l  JMS客户(使用提供者发送消息的程序或对象,例如在12306中,负责发送一条购票消息到处理队列中,用来解决购票高峰问题,那么,发送消息到队列的程序和从队列获取消息的程序都叫做客户)

l  JMS生产者,JMS消费者(生产者及负责创建并发送消息的客户,消费者是负责接收并处理消息的客户)

l  JMS消息(在JMS客户之间传递数据的对象)

l  JMS队列(一个容纳那些被发送的等待阅读的消息的区域)

l  JMS主题(一种支持发送消息给多个订阅者的机制)

4.  JMS对象模型

l  连接工厂(connectionfactory)客户端使用连接工厂创建一个JMS连接(connection)。

l  JMS连接 表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。

l  JMS会话 session 标识JMS客户端和服务端的会话状态。会话建立在JMS连接上,标识客户与服务器之间的一个会话进程。

二:JMS的消息模式

1.  点对点的消息模式(Point to Point Messaging)

 

点对点消息模型:通过一个服务器消息队列实现,消息的发送者向队列写入消息,消息的接收者从队列取出消息。

下面的JMS对象在点对点消息模式中是必须的:

a.      队列(Queue) – 一个提供者命名的队列对象,客户端将会使用这个命名的队列对象

b.     队列链接工厂(QueueConnectionFactory) – 客户端使用队列链接工厂创建链接队列

        ConnectionQueue来取得与JMS点对点消息提供者的链接。

c.      链接队列(ConnectionQueue) – 一个活动的链接队列存在在客户端与点对点消息提供者之间,客户用它创建一个或者多个JMS队列会话(QueueSession)

d.     队列会话(QueueSession) – 用来创建队列消息的发送者与接受者(QueueSenderand

         QueueReceiver)

e.     消息发送者(QueueSender 或者MessageProducer)– 发送消息到已经声明的队列

f.       消息接受者(QueueReceiver或者MessageConsumer) – 接受已经被发送到指定队列的消息

2.  发布订阅模式(publish – subscribe Mode)

 

发布-订阅模式:把消息发送到给一个主题(Topic),消息服务器将消息发布给订阅器该主题的每一个订阅者。举个通俗的例子,就好比如一家杂志社(相当于消息发送者)把一堆杂志(相当于消息)寄到了邮政(相当于主题),再由邮政将杂志发给每一个有订阅这本杂志的读者(相当于消息接收者)

必须的消息对象:

a.      主题Topic(Destination) – 一个提供者命名的主题对象,客户端将会使用这个命名的主题对象

b.     主题链接工厂(TopciConnectionFactory) – 客户端使用主题链接工厂创建链接主题

         ConnectionTopic来取得与JMS消息Pub/Sub提供者的链接。

c.      链接主题(ConnectionTopic) – 一个活动的链接主题存在发布者与订阅者之间

d.     会话(TopicSession) – 用来创建主题消息的发布者与订阅者 (TopicPublisher  and

         TopicSubscribers)

e.     消息发送者MessageProducer) – 发送消息到已经声明的主题

f.       消息接受者(MessageConsumer) – 接受已经被发送到指定主题的消息

3.  区别:

点对点模型每一个消息只有一个接收者。

发布-订阅消息模式的每一个消息可以有多个接收者。

三:介绍ActiveMQ

ActiveMQ 是 Apache 出品,最流行的、能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,可以很容易内嵌到使用Spring的系统里面去,所以我们选择它。

ActiveMQ拥有以下优点

1.支持多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2.完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

3.对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面

4.完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

5.通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

6.支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

7.从设计上保证了高性能的集群,客户端-服务器,点对点

8.支持Ajax

9.支持与Axis的整合

10.可以很容易得调用内嵌JMS provider,进行测试

安装:

ActiveMQ(本文简称MQ)要求JDK1.5以上。

下载地址:

http://activemq.apache.org/download.html

解压:

activemq-all-5.5.0.jar:所有MQ JAR包的集合,用于用户系统调用
bin:其中包含MQ的启动脚本
conf:包含MQ的所有配置文件
data:日志文件及持久性消息数据
example:MQ的示例
lib:MQ运行所需的所有Lib
webapps:MQ的Web控制台及一些相关的DEMO

启动MQ:

Linux   ./active start | stop
windows: 双击bin目录下的activemq.bat文件即可启动MQ

 

登录地址:

http://IP:8161  (http://172.16.0.15:8161)

四:基于ActiveMQ(Point to Point)模式Demo程序

消息对象

public class MqBean implements Serializable {

   private Integer age;
   private String name;


   public Integer getAge() {
      return age;
   }

   public void setAge(Integer age) {
      this.age = age;
   }

   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }
}

 

4.1 队列消息的发送:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.TransportListener;

import javax.jms.*;
import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * Project Name : ActiveMQ
 * User: Jelynn
 * Date: 2017/4/10
 * Time: 10:07
 * Describe:
 * Version:1.0
 */
public class Sender {

   public static void main(String[] args) {
      send();
   }

   //队列消息的发送
  
public static void send() {
      Connection connection;
      Session session;
      Destination destination;
      MessageProducer producer;
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "failover:tcp://172.16.0.15:61616");
      connectionFactory.setTrustAllPackages(true);
      try {
         connection = connectionFactory.createConnection();
         connection.start();
         //第一个参数是是否是事务型消息,设置为true,第二个参数无效
         //
第二个参数是
         //Session.AUTO_ACKNOWLEDGE
为自动确认,客户端发送和接收消息不需要做额外的工作。异常也会确认消息,应该是在执行之前确认的
         //Session.CLIENT_ACKNOWLEDGE
为客户端确认。客户端接收到消息后,必须调用javax.jms.Messageacknowledge方法。jms服务器才会删除消息。可以在失败的
         //
时候不确认消息,不确认的话不会移出队列,一直存在,下次启动继续接受。接收消息的连接不断开,其他的消费者也不会接受(正常情况下队列模式不存在其他消费者)
         //DUPS_OK_ACKNOWLEDGE
允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。
         //
待测试
        
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         destination = session.createQueue("jelynn-queue");  //点对点模型
        
producer = session.createProducer(destination);
         //NON_PERSISTENT
         //PERSISTENT
        
producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息持久化,默认就是持久的(未消费的消息会持久化)
         //ObjectMessage
        
MqBean mqBean = new MqBean();
         mqBean.setAge(20);
         int i = 0;
         String str;
         while (true){
            i++;
//          str = "小黄" + i;
//          producer.send(session.createTextMessage(str));
           
mqBean.setName("小黄" + i);
            producer.send(session.createObjectMessage(mqBean));
            Thread.sleep(1000);
         }
//       producer.close();

     
} catch (JMSException e) {
         e.printStackTrace();
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }
}

4.2 队列消息的接收

package com.jelynn.activemq.p2p;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created with IntelliJ IDEA.
 * Project Name : ActiveMQ
 * User: Jelynn
 * Date: 2017/4/10
 * Time: 10:07
 * Describe:
 * Version:1.0
 */
public class Receiver {

   public static void main(String[] args) {
      receive();
   }

   //消息队列接收
  
public static void receive(){
      Connection connection;
      Session session;
      Destination destination;
      MessageConsumer consumer;

      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","failover:tcp://172.16.0.15:61616");
      connectionFactory.setTrustAllPackages(true);
      try {
         // 构造从工厂得到连接对象
        
connection = connectionFactory.createConnection();
         // 启动
        
connection.start();
         // 获取操作连接
         //
这个最好还是有事务
        
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         destination = session.createQueue("jelynn-queue");
         consumer = session.createConsumer(destination);
         consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
               try {
                  if(null != message){
                     MqBean mqBean = (MqBean) ((ObjectMessage)message).getObject();
                     System.out.println("接收到消息"+mqBean.getName());
                  }
//                if(null != message){
//                   String str = ((TextMessage)message).getText();
//                   System.out.println("
接收到消息  : "+str);
//                }
              
} catch (JMSException e) {
                  e.printStackTrace();
               }
            }
         });
      } catch (JMSException e) {
         e.printStackTrace();
      }
   }
}

如果针对一个queue,定义有多个Receiver,则一条message只能被一个Receiver消费,其他的无法接收到该消息。

注意:如果传输的消息为ObjectMessage,需要进行如下配置:

在${ACTIVEMQ_HOME}/bin/env 的ACTIVEMQ_OPTS参数中添加:

-Dorg.apache.activemq.SERIALIZABLE_PACKAGES=*    (*表示所有,也可以添加具体的包)

5.12.4和5.13.0以后的版本,可以在客户端需要信任的包:

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

factory.setTrustedPackages(new ArrayList(Arrays.asList("org.apache.activemq.test,org.apache.camel.test".split(","))));

或者信任所有:

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

factory.setTrustAllPackages(true);

参考:(http://activemq.apache.org/objectmessage.html)

五:基于ActiveMQPublish/subscribe模式Demo程序

5.1订阅消息的发送

package com.jelynn.activemq.publishsubscribe;

import com.jelynn.activemq.p2p.MqBean;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by Jelynn on 2017/4/10.
 *
订阅消息的发送
 */
public class Publisher {

   public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:tcp://172.16.0.15:61616");
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("myTopic.messages");

        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        while(true) {
            TextMessage message = session.createTextMessage();
            message.setText("message_" + System.currentTimeMillis());
            producer.send(message);
            System.out.println("Sent message: " + message.getText());

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
//      session.close();
//      connection.stop();
//      connection.close();
   
}
}

5.2订阅消息的接收

package com.jelynn.activemq.publishsubscribe;

import com.jelynn.activemq.p2p.MqBean;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by Jelynn on 2017/4/10.
 *
订阅消息的接收
 * <p/>
 * Number Of Pending Messages
等待消费的消息这个是当前未出队列的数量。可以理解为总接收数-总出队列数
 * Messages Enqueued
进入队列的消息进入队列的总数量,包括出队列的。这个数量只增不减
 * Messages Dequeued
出了队列的消息可以理解为是消费这消费掉的数量
 */
public class Subscriber {

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:tcp://172.16.0.15:61616");
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("myTopic.messages");

        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage tm = (TextMessage) message;
                try {
                    System.out.println("Received message: " + tm.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
//      session.close();
//      connection.stop();
//      connection.close();
   
}

}

可以定义多个Subscriber,进行订阅消息的接收,每个Subscriber都能接收到订阅消息

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM