JMS-AciveMQ


一:JMS基本概念

JMS即Java消息服務(Java Message Service)應用程序接口是一個Java平台中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分布式系統中發送消息,進行異步通信(用於解決兩個或者多個程序之間的耦合)。它便於消息系統中的 Java 應用程序進行消息交換,並且通過提供標准的產生、發送、接收消息的接口簡化企業應用的開發。

也就是說它定義看一系列規范,然后大家按照這種規范來開發自己消息服務,當然,現在有好多開源的來供大家使用了, 比如說Apache ActiveMQ、RabbitMQ、Redis、Jafka/Kafka 等等這些

1.  JMS的目標

        為企業級的應用提供一種智能的消息系統,JMS定義了一整套的企業級的消息概念與工具,盡可能最小化的Java語言概念去構建最大化企業消息應用。統一已經存在的企業級消息系統功能。

2.  JMS應用程序, 

一個完整的JMS應用應該實現以下功能:

l  JMS 客戶端 – Java語言開發的接受與發送消息的程序

l  非JMS客戶端 – 基於消息系統的本地API實現而不是JMS

l  消息 – 應用程序用來相互交流信息的載體

l  被管理對象–預先配置的JMS對象,JMS管理員創建,被客戶端運用。如鏈接工廠,主題等

l  JMS提供者–完成JMS功能與管理功能的消息系統

3.  JMS體系結構

描述如下:

l  JMS提供者(JMS的實現者,比如activemq jbossmq等)

l  JMS客戶(使用提供者發送消息的程序或對象,例如在12306中,負責發送一條購票消息到處理隊列中,用來解決購票高峰問題,那么,發送消息到隊列的程序和從隊列獲取消息的程序都叫做客戶)

l  JMS生產者,JMS消費者(生產者及負責創建並發送消息的客戶,消費者是負責接收並處理消息的客戶)

l  JMS消息(在JMS客戶之間傳遞數據的對象)

l  JMS隊列(一個容納那些被發送的等待閱讀的消息的區域)

l  JMS主題(一種支持發送消息給多個訂閱者的機制)

4.  JMS對象模型

l  連接工廠(connectionfactory)客戶端使用連接工廠創建一個JMS連接(connection)。

l  JMS連接 表示JMS客戶端和服務器端之間的一個活動的連接,是由客戶端通過調用連接工廠的方法建立的。

l  JMS會話 session 標識JMS客戶端和服務端的會話狀態。會話建立在JMS連接上,標識客戶與服務器之間的一個會話進程。

二:JMS的消息模式

1.  點對點的消息模式(Point to Point Messaging)

 

點對點消息模型:通過一個服務器消息隊列實現,消息的發送者向隊列寫入消息,消息的接收者從隊列取出消息。

下面的JMS對象在點對點消息模式中是必須的:

a.      隊列(Queue) – 一個提供者命名的隊列對象,客戶端將會使用這個命名的隊列對象

b.     隊列鏈接工廠(QueueConnectionFactory) – 客戶端使用隊列鏈接工廠創建鏈接隊列

        ConnectionQueue來取得與JMS點對點消息提供者的鏈接。

c.      鏈接隊列(ConnectionQueue) – 一個活動的鏈接隊列存在在客戶端與點對點消息提供者之間,客戶用它創建一個或者多個JMS隊列會話(QueueSession)

d.     隊列會話(QueueSession) – 用來創建隊列消息的發送者與接受者(QueueSenderand

         QueueReceiver)

e.     消息發送者(QueueSender 或者MessageProducer)– 發送消息到已經聲明的隊列

f.       消息接受者(QueueReceiver或者MessageConsumer) – 接受已經被發送到指定隊列的消息

2.  發布訂閱模式(publish – subscribe Mode)

 

發布-訂閱模式:把消息發送到給一個主題(Topic),消息服務器將消息發布給訂閱器該主題的每一個訂閱者。舉個通俗的例子,就好比如一家雜志社(相當於消息發送者)把一堆雜志(相當於消息)寄到了郵政(相當於主題),再由郵政將雜志發給每一個有訂閱這本雜志的讀者(相當於消息接收者)

必須的消息對象:

a.      主題Topic(Destination) – 一個提供者命名的主題對象,客戶端將會使用這個命名的主題對象

b.     主題鏈接工廠(TopciConnectionFactory) – 客戶端使用主題鏈接工廠創建鏈接主題

         ConnectionTopic來取得與JMS消息Pub/Sub提供者的鏈接。

c.      鏈接主題(ConnectionTopic) – 一個活動的鏈接主題存在發布者與訂閱者之間

d.     會話(TopicSession) – 用來創建主題消息的發布者與訂閱者 (TopicPublisher  and

         TopicSubscribers)

e.     消息發送者MessageProducer) – 發送消息到已經聲明的主題

f.       消息接受者(MessageConsumer) – 接受已經被發送到指定主題的消息

3.  區別:

點對點模型每一個消息只有一個接收者。

發布-訂閱消息模式的每一個消息可以有多個接收者。

三:介紹ActiveMQ

ActiveMQ 是 Apache 出品,最流行的、能力強勁的開源消息總線。ActiveMQ 是一個完全支持 JMS1.1 和 J2EE 1.4 規范的 JMS Provider 實現,可以很容易內嵌到使用Spring的系統里面去,所以我們選擇它。

ActiveMQ擁有以下優點

1.支持多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2.完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)

3.對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面

4.完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)

5.通過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上

6.支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

7.從設計上保證了高性能的集群,客戶端-服務器,點對點

8.支持Ajax

9.支持與Axis的整合

10.可以很容易得調用內嵌JMS provider,進行測試

安裝:

ActiveMQ(本文簡稱MQ)要求JDK1.5以上。

下載地址:

http://activemq.apache.org/download.html

解壓:

activemq-all-5.5.0.jar:所有MQ JAR包的集合,用於用戶系統調用
bin:其中包含MQ的啟動腳本
conf:包含MQ的所有配置文件
data:日志文件及持久性消息數據
example:MQ的示例
lib:MQ運行所需的所有Lib
webapps:MQ的Web控制台及一些相關的DEMO

啟動MQ:

Linux   ./active start | stop
windows: 雙擊bin目錄下的activemq.bat文件即可啟動MQ

 

登錄地址:

http://IP:8161  (http://172.16.0.15:8161)

四:基於ActiveMQ(Point to Point)模式Demo程序

消息對象

public class MqBean implements Serializable {

   private Integer age;
   private String name;


   public Integer getAge() {
      return age;
   }

   public void setAge(Integer age) {
      this.age = age;
   }

   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }
}

 

4.1 隊列消息的發送:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.TransportListener;

import javax.jms.*;
import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * Project Name : ActiveMQ
 * User: Jelynn
 * Date: 2017/4/10
 * Time: 10:07
 * Describe:
 * Version:1.0
 */
public class Sender {

   public static void main(String[] args) {
      send();
   }

   //隊列消息的發送
  
public static void send() {
      Connection connection;
      Session session;
      Destination destination;
      MessageProducer producer;
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "failover:tcp://172.16.0.15:61616");
      connectionFactory.setTrustAllPackages(true);
      try {
         connection = connectionFactory.createConnection();
         connection.start();
         //第一個參數是是否是事務型消息,設置為true,第二個參數無效
         //
第二個參數是
         //Session.AUTO_ACKNOWLEDGE
為自動確認,客戶端發送和接收消息不需要做額外的工作。異常也會確認消息,應該是在執行之前確認的
         //Session.CLIENT_ACKNOWLEDGE
為客戶端確認。客戶端接收到消息后,必須調用javax.jms.Messageacknowledge方法。jms服務器才會刪除消息。可以在失敗的
         //
時候不確認消息,不確認的話不會移出隊列,一直存在,下次啟動繼續接受。接收消息的連接不斷開,其他的消費者也不會接受(正常情況下隊列模式不存在其他消費者)
         //DUPS_OK_ACKNOWLEDGE
允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;而且允許重復確認。在需要考慮資源使用時,這種模式非常有效。
         //
待測試
        
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         destination = session.createQueue("jelynn-queue");  //點對點模型
        
producer = session.createProducer(destination);
         //NON_PERSISTENT
         //PERSISTENT
        
producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息持久化,默認就是持久的(未消費的消息會持久化)
         //ObjectMessage
        
MqBean mqBean = new MqBean();
         mqBean.setAge(20);
         int i = 0;
         String str;
         while (true){
            i++;
//          str = "小黃" + i;
//          producer.send(session.createTextMessage(str));
           
mqBean.setName("小黃" + i);
            producer.send(session.createObjectMessage(mqBean));
            Thread.sleep(1000);
         }
//       producer.close();

     
} catch (JMSException e) {
         e.printStackTrace();
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }
}

4.2 隊列消息的接收

package com.jelynn.activemq.p2p;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created with IntelliJ IDEA.
 * Project Name : ActiveMQ
 * User: Jelynn
 * Date: 2017/4/10
 * Time: 10:07
 * Describe:
 * Version:1.0
 */
public class Receiver {

   public static void main(String[] args) {
      receive();
   }

   //消息隊列接收
  
public static void receive(){
      Connection connection;
      Session session;
      Destination destination;
      MessageConsumer consumer;

      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","failover:tcp://172.16.0.15:61616");
      connectionFactory.setTrustAllPackages(true);
      try {
         // 構造從工廠得到連接對象
        
connection = connectionFactory.createConnection();
         // 啟動
        
connection.start();
         // 獲取操作連接
         //
這個最好還是有事務
        
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
         destination = session.createQueue("jelynn-queue");
         consumer = session.createConsumer(destination);
         consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
               try {
                  if(null != message){
                     MqBean mqBean = (MqBean) ((ObjectMessage)message).getObject();
                     System.out.println("接收到消息"+mqBean.getName());
                  }
//                if(null != message){
//                   String str = ((TextMessage)message).getText();
//                   System.out.println("
接收到消息  : "+str);
//                }
              
} catch (JMSException e) {
                  e.printStackTrace();
               }
            }
         });
      } catch (JMSException e) {
         e.printStackTrace();
      }
   }
}

如果針對一個queue,定義有多個Receiver,則一條message只能被一個Receiver消費,其他的無法接收到該消息。

注意:如果傳輸的消息為ObjectMessage,需要進行如下配置:

在${ACTIVEMQ_HOME}/bin/env 的ACTIVEMQ_OPTS參數中添加:

-Dorg.apache.activemq.SERIALIZABLE_PACKAGES=*    (*表示所有,也可以添加具體的包)

5.12.4和5.13.0以后的版本,可以在客戶端需要信任的包:

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

factory.setTrustedPackages(new ArrayList(Arrays.asList("org.apache.activemq.test,org.apache.camel.test".split(","))));

或者信任所有:

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

factory.setTrustAllPackages(true);

參考:(http://activemq.apache.org/objectmessage.html)

五:基於ActiveMQPublish/subscribe模式Demo程序

5.1訂閱消息的發送

package com.jelynn.activemq.publishsubscribe;

import com.jelynn.activemq.p2p.MqBean;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by Jelynn on 2017/4/10.
 *
訂閱消息的發送
 */
public class Publisher {

   public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:tcp://172.16.0.15:61616");
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("myTopic.messages");

        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        while(true) {
            TextMessage message = session.createTextMessage();
            message.setText("message_" + System.currentTimeMillis());
            producer.send(message);
            System.out.println("Sent message: " + message.getText());

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
//      session.close();
//      connection.stop();
//      connection.close();
   
}
}

5.2訂閱消息的接收

package com.jelynn.activemq.publishsubscribe;

import com.jelynn.activemq.p2p.MqBean;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by Jelynn on 2017/4/10.
 *
訂閱消息的接收
 * <p/>
 * Number Of Pending Messages
等待消費的消息這個是當前未出隊列的數量。可以理解為總接收數-總出隊列數
 * Messages Enqueued
進入隊列的消息進入隊列的總數量,包括出隊列的。這個數量只增不減
 * Messages Dequeued
出了隊列的消息可以理解為是消費這消費掉的數量
 */
public class Subscriber {

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:tcp://172.16.0.15:61616");
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("myTopic.messages");

        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage tm = (TextMessage) message;
                try {
                    System.out.println("Received message: " + tm.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
//      session.close();
//      connection.stop();
//      connection.close();
   
}

}

可以定義多個Subscriber,進行訂閱消息的接收,每個Subscriber都能接收到訂閱消息

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM