46.ActiveMQ開篇(Hello World、安全認證、Connection、Session、MessageProducer、MessageConsumer)


 要給有能力的人足夠的發揮空間,公司可以養一些能力平平甚至是混日子的人,但絕不能讓這些人妨礙有能力的人,否則這樣的環境不留也罷。

一、背景介紹

  CORBA\DCOM\RMI等RPC中間件技術已經廣泛應用於各個領域,但是面對規模和復雜度都越來越高的分布式系統,這些技術慢慢顯現出局限性:

  • 同步通信:客戶發出調用后,必須等待服務完成處理並返回結果后才能繼續執行;
  • 客戶端和服務端的生命周期密切耦合;客戶進程和服務對象進行必須都正常運行,如果由於服務對象崩潰或者網絡故障導致客戶的請求不可達,客戶會接收到異常。
  • 點對點通信:客戶的一次調用只發送給一個單獨的目標對象。

  面向消息的中間件較好地解決了上面的問題(Message Oriented Middleware,MOM)。發送者再發送消息給消息服務器消息服務器將消息存放到若干隊列中,在合適的時候再將消息轉發給接受者。這種模式下,發送和結束是異步的,發送者無需等待;二者的生命周期未必相同;發送消息的時候接受者不一定運行,接收消息的時候發送者也不一定運行;一對多通信:對一個消息可以有多個接受者。

  java消息服務(JMS)定義了java中訪問消息中間件的接口。JMS只是接口,並沒有實現。實現JMS的接口的消息中間件成文JMS Provider,已有的MOM系統包括Apache的ActiveMQ、以及阿里的RocketMQ、IBM的MQSeries、Microsoft的MSMQ和BEA的MessageQ、RabbitMQ等待,他們基本都遵循JMS規范。

二、JMS術語

JMS 實現JMS接口的消息中間件

Provider(MessageProvider):生產者

Consumer(MessageConsumer):消費者

PTP:Point to Point ,即點對點的消息模型;

pub/sub : publish/subscribe,發布/訂閱的消息模型;

Queue:隊列目標;

Topic:主題目標;

ConnectionFactory:連接工廠,JMS用它創建連接;

Connection:JMS客戶端到JMS Provider的連接;

Destination:消息的目的地;

Session:會話,一個法師或者接收消息的線程;

 

Message 接口(消息):

  是在消費者和生產者之間傳送的對象,也就是說從一個應用程序傳送到另一個應用程序。一個消息有三個主要部分:

  • 消息頭(必須要有):包含用於識別和為消息尋找路由的設置。
  • 一組消息屬性:包含額外的屬性,支持其他提供者和用戶的兼容。可以創建定制的字段和過路橋(消息選擇器)。
  • 一個消息體:允許用戶創建五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)

Session 接口(會話):

  表示一個單線程的上下文,用於發送和接收消息。由於會話是單線程的,所以消息是連續的,就是說消息是按照發送的順序一個一個地接收。會話的好處是它支持事務。如果用戶選擇了事務支持,會話上下文保存一組消息,知道事務被提交才發送這些消息。在提交事務之前,用戶可以使用回滾操作取消這些消息。一個會話運行用戶創建消息生產者來發送消息,創建消費者來接收消息。

 

三、ActiveMQ簡介

  ActiveMQ是Apache出品,最流行的,能力強勁的開源消息總線。

  ActiveMQ是一個完全支持JMS1.1和J2EE1.4規范的JMS Provider實現,盡管JMS規范出台已經是很久的事情了,但是JMS在當今的J2EE應用中仍然扮演着重要角色,可以說ActiveMQ在業界應用最廣泛,當然如果想要有更強大的性能和海量數據處理能力,ActiveMQ還需要不斷地提升版本,80%以上的業務我們用ActiveMq都能滿足,當然后續如天貓淘寶這種大型的電商網站,尤其是雙十一這種特殊事件,ActiveMQ需要進行很復雜的優化源碼以及架構設計才能完成,我們只會會學習一個更強大的分布式消息中間件,RocketMQ,可以說ActiveMQ是和諧,是基礎,所以也必須要掌握好。

四、ActiveMQ的使用

4.1 環境搭建

  去官網下載:http://activemq.apache.org,解壓放到一個地方,我放在D:\Programe_Files下,然后直接啟動bin下的wrapper.exe即可啟動監控台,在瀏覽器中訪問localhost:8186,用戶名admin/admin 即可訪問。

4.2 ActiveMQ Hello World

  首先寫一個簡單的Hello World實例,感受一下ActiveMQ,需要實現接受者和發送者兩部分代碼。

  1. 建立COnnectionFactory工廠對象,需要填入用戶名密碼以及要連接的地址,默認端口為“tcp://localhost:61616”
  2. 通過ConnectionFactory工廠對象創建一個Connection連接,並且調用Connection的Start方法開啟連接。
  3. 通過Connection對象創建Session會話,用於接收消息,參數配置1為是否啟動事務,參數配置2為簽收哦模式,一般我們設置自動簽收。
  4. 通過Session創建Destination對象,指的是一個客戶端用來指定生產消息目標和消費消息來源的對象,在PTP模式中,Destination被稱作Queue;在Pub/Sub模式中,Destination被稱作Topic(即主題)。可以使用多個Queue和Topic
  5. 需要通過Session對象創建消息的發送和接收對象(生產者和消費者)MessageProducer/MessageConsumer
  6. 使用MessageProducer的setDeliveryMode方法為其設置持久化特性和非持久化特性(DeliveryMode)
  7. 使用JMS規范的TextMessage形式創建數據(通過Session對象),並用MessageProducer的send方法發送數據。同理客戶端使用receive方法接收數據,最后莫忘關閉Connection

發送:

 1 package com;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.DeliveryMode;
 6 import javax.jms.Destination;
 7 import javax.jms.JMSException;
 8 import javax.jms.MessageProducer;
 9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11 
12 import org.apache.activemq.ActiveMQConnectionFactory;
13 
14 public class Sender {
15     
16     public static void main(String[]args) throws JMSException{
17         ConnectionFactory factory = new ActiveMQConnectionFactory(
18                 ActiveMQConnectionFactory.DEFAULT_USER,
19                 ActiveMQConnectionFactory.DEFAULT_PASSWORD,
20                 "tcp://localhost:61616"
21                 );
22         
23         Connection conn = factory.createConnection();
24         conn.start();
25         
26         Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
27         
28         Destination dest = session.createQueue("queue1");
29         
30         MessageProducer messProducer = session.createProducer(dest);
31         
32         messProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);;
33         
34         TextMessage mess = session.createTextMessage();
35         for(int i=0;i<5;i++){
36 
37             mess.setText("消息內容"+i);
38             
39             messProducer.send(mess);
40         }
41         
42         if(conn != null)conn.close();
43     }
44     
45 }

 

 

 

 

接收:

package com;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[]args) throws JMSException{
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616"
                );
        
        Connection conn = factory.createConnection();
        conn.start();
        
        Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        
        Destination dest = session.createQueue("queue1");
        
        MessageConsumer messConsumer = session.createConsumer(dest);
        
        while(true){
            TextMessage mess = (TextMessage) messConsumer.receive();
            System.out.println(mess.getText());
        }
        
//        if(conn != null)conn.close();
    }
}

 

 

4.3 啟動安全認證

在acitvemq.xml中的 broker 下加上如下代碼:

        <plugins>
            <simpleAuthenticationPlugin>
                <users>
                    <authenticationUser username="sgm" password="sgm" groups="users,admins" />
                </users>
            </simpleAuthenticationPlugin>
        </plugins>

 

 

重啟ActiveMQ,再運行4.2中的 sender 發生異常:

可見在第24行嘗試連接的時候就發生錯誤了。所以ActiveMQConnectionFactory.DEFAULT_USER要改成"sgm",密碼也同樣修改。receiver做同樣修改。

 

4.4 Connection的使用

  在成功創建正確的ConnectionFactory后,下一步將是創建一個連接,他是JMS定義的一個接口。ConnectionFactory負責返回可以與底層消息傳遞系統進行通信的Connection實現。通常客戶端只使用單一連接。根據JMS文檔,Connection的目的是“利用JMS提供者封裝開發的連接”。以及表示“客戶端與服務例程之間的開放TCP/IP套接字”。該文檔還指出Connection應該是進行客戶端身份校驗的地方。

  當一個Connection被創建時,它的傳輸默認是關閉的,必須使用start方法開啟。一個Connection可以建立一個或者多個Session。

  當一個程序執行完成后,必須關閉Connection,否則ActiveMQ不能釋放資源,關閉一個Connection同樣也關閉了Session、MessageProducer和MessageConsumer。

Connection createConnection();

Connection createConnection(String userName,String password);

 

大多數開源框架不需要自己優化了,能使用好API其實就是最好的優化了。

 

4.5 Session的使用

  一旦從ConnectionFactory中獲得一個Connection,必須從Connection中創建一個或者多個Session,才能進一步執行其他操作。Session是一個發送或者接收消息的線程,可以使用Session創建MessageProducer,MessageConsumer和Message。

  Session可以被事務化,也可以不用事務,通常,可以通過向Connection上的創建方法傳一個布爾值進行設置。

  Session createSession(boolean transacted,int acknowledgeMode);

  其中 transacted為是否使用事務的標識,acknowledgeMode為簽收方式。

  結束事務有兩種方式:提交或者回滾,當一個事務提交,消息被處理。事務中有一個步驟失敗,事務就回滾,這個事務中的已經執行的動作將被撤銷。在發送消息最后也必須要使用session.commit()方法提交事務。

  簽收模式有三種:

  1. session.AUTO_ACKNOWLEDGE當消費端從receive或者onMessage成功返回時,Session自動簽收客戶端的這條消息。
  2. Session.CLIENT_ACNOWLEDGE消費端通過調用消息(Message)的acknoledge方法簽收消息(mess.acknowledge())。在這種情況下,簽收發生在Session層面:簽收一個已經消費的消息會自動地簽收這個Session所有消費已消費消息的收條。
  3. Session.DUPS_OK_ACKNOWLEDGE此選項指示Session不必確保對傳送消息的簽收,它可能引起消息的重復,但是降低了Session的開銷,所以只有消費端能容忍重復消息才能使用。

   工作中一般使用手動簽收的方式

4.6 MessageProducer

 ---

 

 

 

4.7 MessageConsumer

 ---

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM