ActiveMQ教程(消息發送和接受)


一 環境的搭建

  version為你的版本號

  如果你是普通的項目的話,創建一個lib文件夾導入相應的jar包到你的lib中,jar包為:activemq-all-{version}.jar、log4j-{version}.jar、slf4j-log4j12-{version}.jar,並且bulidpath,將jar加載到你的項目中就可以使用了。

  如果是web項目,需要將以上的jar包導入到你的web-inf的lib中,將自動加載到你的工程中。

  如果你是一個mave工程,需要修改你的pom.xml文件,添加相關的依賴:(如下)

  <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>{version}</version>
  </dependency>

  <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>{version}</version>
  </dependency>

  <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>{version}</version>
  </dependency>

  相應的版本可以去maven的資源庫中下載:http://www.mvnrepository.com。

  導入相應的jar包之后就可以進行相應的開發了,現在我們開發發送端。創建創建一個普通的java類,名稱為ActiveMqUtil

import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

public class ActiveMqUtil {
    
    public static void senderMessage(String message){
        //鏈接工廠
        ConnectionFactory connectionFactory;
        //創建鏈接
        Connection connection = null;
        //創建一個session
        Session session;
        //創建目的地
        Destination destination;
        //消息提供者
        MessageProducer messageProducer;
        //構造ConnectionFactory
        connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");

    // 設置用戶名和密碼,這個用戶名和密碼在conf目錄下的credentials.properties文件中,也可以在activemq.xml中配置,我這里是默認的,所以就注釋掉了
       //connectionFactory.setUserName("應戶名");
       //connectionFactory.setPassword("密碼");
        try {
            //得到連接對象
            connection = connectionFactory.createConnection();
            //啟動鏈接
            connection.start();

    // 創建Session,參數解釋:
            // 第一個參數是否使用事務:當消息發送者向消息提供者(即消息代理)發送消息時,消息發送者等待消息代理的確認,沒有回應則拋出異常,消息發送程序負責處理這個錯誤。
            // 第二個參數消息的確認模式:
            // AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息時自動發送確認。消息只向目標發送一次,但傳輸過程中可能因為錯誤而丟失消息。
            // CLIENT_ACKNOWLEDGE : 由消息接收者確認收到消息,通過調用消息的acknowledge()方法(會通知消息提供者收到了消息)
            // DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者沒有確認發送時重新發送消息(這種確認模式不在乎接收者收到重復的消息)。
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //獲取服務商的消息
            destination = session.createQueue("你的active里面創建的que的名稱");
            //得到消息的發送者
            messageProducer = session.createProducer(destination);
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, messageProducer,message);
            session.commit();
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally{
                try {
                    if(null != connection){
                        connection.close();
                    }
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
            }
        }
    
    }
    
    public static void sendMessage(Session session,MessageProducer messageProducer,String messages) throws JMSException{
        TextMessage message = session.createTextMessage(messages);
        //發送消息到服務器
        messageProducer.send(message);
    }
}

 

之后創建一個測試類,Test

  public class Test{

    public static void main(String [] args){

      ActiveMqUtil.senderMessage("發送者提供的消息!");

    }

  }

運行test的main方法,進行相應的發送。現在你可以訪問你安裝的activemq的頁面去查看相應的信息。這個que的Number Of Consumers會多一條記錄,代表發送成功。

現在發送者已經發送成功,現在我們開發接受端。創建一個類,名稱為Reciver

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Reciver {
    
    public static void main(String[] args) {
        //創建工廠
        ConnectionFactory connectionFactory;
        //創建connection
        Connection connection = null;
        //創建session
        Session session;
        //創建目的地
        Destination destination;
        //消費者
        MessageConsumer consumer;
        //得到工廠
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD , "tcp://localhost:61616");
        try {
            //創建鏈接
            connection = connectionFactory.createConnection();
            //啟動
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //獲取服務器上的消息
            destination = session.createQueue("yc-security-mq");
            consumer = session.createConsumer(destination);
            while(true){
                TextMessage message = (TextMessage) consumer.receive(100000);
                if(null != message){
                    System.out.println(message.getText());
                }else{
                    break;
                }
            }
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally{
            try{
                if(null != null){
                    connection.close();
                }
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }

}

運行這個測試類,將會顯示:發送者提供的消息!,這時代表接受端成功。在activemq安裝的頁面上,Messages Dequeued  將會增加一條。

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM