Java消息服務


  現在很多人都喜歡上網購物,當然對我這種喜歡便宜的人來說,雙十一,節假日搶購無疑是我的最愛。但是對於全國這么多人來說

大家都爭先恐后地去搶一件上平,那服務器還不得炸了才怪;所以今天我就來說說用消息服務來處理這個問題。用戶發送請求后,服務器

接到請求后,向消息隊列發送一個消息,就立刻返回“訂單正在處理”的消息給客戶。而訂單服務則不停的從消息隊列中取出消息,按照自己

的節奏去處理訂單,從而有效的避免了高峰期!!!

 

1 什么是JMS

  java消息服務(Java Message Service, JMS)是一個允許應用創建、發送、接收和讀取消息的Java API

  好了,那么怎么去更好地理解他呢?

  例如,一個汽車企業中某個庫存量低於某個等級時,庫存組件可以向工廠發送一個消息,使工廠產生更多的汽車

  工廠組件可以向部件發送一個消息,使得工廠可以裝配它需要的的部件

  部件組件可以進一步向他們自己的庫存和訂單組件發送消息跟新庫存

  。。。。

  通過消息傳送來完成這些任務,不同組件之間可以高效地交互,而不需要占用網絡或其他資源。

 

2 JMS的架構
  JMS應用由以下部分組成
  JMS提供者:是一個實現JMS接口的消息傳送系統,提供管理和控制特性
  JMS客戶端:是用Java編程語言編寫的程序和組件,可以生成(生產)和使用(消費)消息
  消息:是在JMS客戶端之間傳遞信息的對象
  受管理對象:是為客戶端配置的JMS對象,有兩類JMS受管理
  對象:目的工廠(destination) 和 連接工廠(connection factory)


3 發送方式
  (1) 點對點(point to point)
  * 每個消息只有一個消費者(即使用方)
  * 接收者可以獲取消息,而不論客戶端發送消息時接收者是否在運行
  所以說如果你發送一個每一個消息都必須由一個消費者成功地處理,就可以使用PTP消息傳送

  (2) 發布/訂閱消息傳送方式
  * 每個消息可以有多個消費者
  * 對於訂閱一個主題的客戶端,只有這個客戶端創建一個訂閱之后才能使用鎖發送的消息,消費者必須保持活動狀態才能使用消息

  (對於這種情況相對與上一種要復雜一點。只有創建了共享訂閱才能被多個訂閱者訂閱)
  所以如果消息可以由任意多個消費者(或者沒有任何消費者)處理,就可以使用發布/訂閱消息傳送方式

4 消息的接收
  消息產品本質上市異步的:在消息的生成和使用之間並沒有時間依賴關系,不過JMS還是提供了兩種方案
  (1)同步:消息者通過調用receive方法,顯式地從目的地獲取消息。receive方法會阻塞,直到消息到達,或者如皋消息未在指定的時限內到達則會超時。
  (2)異步:應用客戶端或Java SE客戶端可以向消費者注冊一個消息監聽器(messagelistener).消息監聽器類似於事件監聽器。一旦有消息到達目的地,JMS
  提供者就通過調用這個監聽器onMessage方法傳送消息,這個方法會處理消息的內容。在一個Java EE應用中,消息驅動bean會作為消息監聽器(它也有
  一個onMessage方法),不過客戶端不需要消費者注冊這個監聽器。

通這張圖然后理解下面的例子就簡單了

5 參數分析:

  • 連接工廠(connection factory)是客戶端用來創建連接的對象,客戶端可以用它創建與一個提供者的連接。
  • 目的地(destination) 客戶端可以用目的地(destination)對象指定所生成消息的目標以及所使用的來源。在PTP消息傳送方式中,目的地稱為隊列。
  • 在發布/訂閱消息傳送方式中,目的地稱為主題。JMS應用可以使用多個隊列或主題*或者兩者都使用)。
  • 連接(connection) 封裝了與一個JMS提供者的虛擬連接
  • 會話(session) 是生成和使用消息的一個單線程上下文。有時會通過JMSContext或connection來創建一個會話
  • 消息生產者:由JMSContext或會話創建的一個對象,用來向目的地發送消息。
  • 消息消費者:由JMSContext或會話創建的一個對象,用來接收發送到一個目的地的消息

 

好了上酸菜:(點對點 + 同步接收)

package Test1;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
    public static void main(String[] args) throws JMSException{
        //從連接工廠里獲取連接
        ConnectionFactory factory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");
        Connection connection=factory.createConnection();
        connection.start();
        
        //創建會話
        Session session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //創建目的地,它是用來接收信息的:這里創建一個序列
        Destination destination=session.createQueue("MyQueue");
        //創建消費者
        MessageConsumer consumer=session.createConsumer(destination);
        Message message=consumer.receive();
        if(message!=null){
            TextMessage text=(TextMessage) message;
            System.out.println("接收到了:"+text.getText());
        }
    }

}
package Test1;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Produecer {
    public static void main(String[] args) throws JMSException{
        //從連接工廠里獲取連接
        ConnectionFactory factory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");
        Connection connection=factory.createConnection();
        connection.start();
        
        //創建會話
        Session session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //創建目的地,它是用來接收信息的:這里創建一個序列
        Destination destination=session.createQueue("MyQueue");
        //創建生產者
        MessageProducer producer=session.createProducer(destination);
        //創建一個發送消息
        Message message=session.createTextMessage("你好啊!!!|");
        //發送
        producer.send(message);
        
        //這里可別忘了,提交會話
        session.commit();
        System.out.println("信息發送成功");
    }
}

發布/訂閱 +異步處理

package Subcriber;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicPublisher {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("myTopic.messages");

        MessageProducer producer = session.createProducer(topic);
        
        
        //創建非持久的訂閱
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        while(true) {
            TextMessage message = session.createTextMessage();
            message.setText("message_" + System.currentTimeMillis());
            producer.send(message);
            System.out.println("Sent message: " + message.getText());

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

//        session.close();
//        connection.stop();
//        connection.close();
    }
}
package Subcriber;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicSubscriber {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        
        connection.start();
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
        Topic topic = session.createTopic("myTopic.messages");

        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage tm = (TextMessage) message;
                try {
                    System.out.println("Received message: " + tm.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
//        session.close();
//        connection.stop();
//        connection.close();
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM