ActiveMQ實現消息的發送與接受


activemq是apache的一個JMS接口的實現產品,java中定義了JMS規范,雖然RocketMQ,kafka現在比較火,但是還是建議先學一下activeMQ再學其他兩個就容易很多

首先可以下載壓縮包,linux中,Windows中都可以,個人建議linux,目錄結構

直接在bin下面運行:

然后進入管控台,默認用戶密碼是admin,admin,可以在配置文件里進行配置:

,開啟之后新建maven項目,添加mq依賴,這時候就可以實現消息發送和接受了:

先說一下代碼中用到的對象:

 

 activeMQ是嚴格實現了JMS規范的,所以代碼中也就是這些接口的實現

發送消息端:

//發送消息
public class Sender {
    public static void main(String[] args) throws JMSException {
        //1 創建ConnectionFactory工廠,  用戶名  密碼  連接url
        // 如果添加了安全機制,只有用有認證的用戶去發送或者接受
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                "houzheng",//默認的用戶
                "houzheng",
                "tcp://localhost:61616"  //默認端口: 61616
        );
        //2 創建連接,並開啟,默認是關閉的,並且完成后要關閉連接
        Connection connection = factory.createConnection();
        connection.start();
        //3 創建session   參數: 是否啟用事務   選擇簽收模式(自動簽收,客戶端簽收,不確認是否成功簽收(消息可能崇重復))
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        //4 創建Destination對象, 參數: 指定消息隊列名稱
        //Destination指的是消息的目的地或者消費來源,TPT模式中成為Queue,訂閱/發布中稱為主題
        Destination destination=session.createQueue("myqueue");
        //5 通過session對象創建消息的發送者或者接受者
        MessageProducer producer = session.createProducer(destination);//傳入Destination
        //6 可以設置持久化或者非持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //7 發送消息並關閉連接,可發送多種定義格式的消息
        for (int i = 0; i < 5; i++) {
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText("我是第"+i+"條消息");
            producer.send(textMessage);
        }
        //如果開啟了事務,則必須要提交才能全部發送到服務器
        //session.commit();
        if(connection!=null)connection.close();
    }
}  

這里我使用了自己配置的用戶,使用了安全機制,即只有我認證的用戶才能發送或者接受消息,當然也可以使用默認的用戶,不過不要加安全驗證,加了就必須使用驗證通過的

接受消息端:

//消費者
public class Receiver {
    public static void main(String[] args) throws JMSException {
        //1 創建ConnectionFactory工廠,  用戶名  密碼  連接url
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                "houzheng",//默認的用戶
                "houzheng",
                "tcp://localhost:61616"  //默認端口: 61616
        );
        //2 創建連接,並開啟,默認是關閉的,並且完成后要關閉連接
        Connection connection = factory.createConnection();
        connection.start();
        //3 創建session   參數: 是否啟用事務   選擇簽收模式(自動簽收,客戶端簽收,不確認是否成功簽收(消息可能崇重復))
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        //4 創建Destination對象, 參數: 指定消息隊列名稱
        //Destination指的是消息的目的地或者消費來源,TPT模式中成為Queue,訂閱/發布中稱為主題
        Destination destination=session.createQueue("myqueue");//必須和生產者指定的queue一樣
        //5 通過session對象創建消息的發送者或者接受者
        MessageConsumer consumer = session.createConsumer(destination);//傳入Destination
        //6 接受消息
        while(true){
            TextMessage message = (TextMessage) consumer.receive();//阻塞接受,還可以指定等待時間或者不阻塞接受,沒有就跳過
            if(message==null)break;
            System.out.println(message.getText());
        }
        if(connection!=null)connection.close();//關閉連接
    }
}

  發送消息並消費之后就可以去管控台查看消息隊列,這里只是PTP模式的

 

 

 

或者也可以進行持久化到mysql中,直接去數據庫中查詢,不過還是使用默認的kahadb或者leveldb這種內存數據存儲性能比較好,mysql 可能比較直觀看到!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM