activemq是apache的一個JMS接口的實現產品,java中定義了JMS規范,雖然RocketMQ,kafka現在比較火,但是還是建議先學一下activeMQ再學其他兩個就容易很多
首先可以下載壓縮包,linux中,Windows中都可以,個人建議linux,目錄結構
直接在bin下面運行:
然后進入管控台,默認用戶密碼是admin,admin,可以在配置文件里進行配置:
,開啟之后新建maven項目,添加mq依賴,這時候就可以實現消息發送和接受了:
先說一下代碼中用到的對象:
activeMQ是嚴格實現了JMS規范的,所以代碼中也就是這些接口的實現
發送消息端:
//發送消息 public class Sender { public static void main(String[] args) throws JMSException { //1 創建ConnectionFactory工廠, 用戶名 密碼 連接url // 如果添加了安全機制,只有用有認證的用戶去發送或者接受 ConnectionFactory factory = new ActiveMQConnectionFactory( "houzheng",//默認的用戶 "houzheng", "tcp://localhost:61616" //默認端口: 61616 ); //2 創建連接,並開啟,默認是關閉的,並且完成后要關閉連接 Connection connection = factory.createConnection(); connection.start(); //3 創建session 參數: 是否啟用事務 選擇簽收模式(自動簽收,客戶端簽收,不確認是否成功簽收(消息可能崇重復)) Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //4 創建Destination對象, 參數: 指定消息隊列名稱 //Destination指的是消息的目的地或者消費來源,TPT模式中成為Queue,訂閱/發布中稱為主題 Destination destination=session.createQueue("myqueue"); //5 通過session對象創建消息的發送者或者接受者 MessageProducer producer = session.createProducer(destination);//傳入Destination //6 可以設置持久化或者非持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //7 發送消息並關閉連接,可發送多種定義格式的消息 for (int i = 0; i < 5; i++) { TextMessage textMessage = session.createTextMessage(); textMessage.setText("我是第"+i+"條消息"); producer.send(textMessage); } //如果開啟了事務,則必須要提交才能全部發送到服務器 //session.commit(); if(connection!=null)connection.close(); } }
這里我使用了自己配置的用戶,使用了安全機制,即只有我認證的用戶才能發送或者接受消息,當然也可以使用默認的用戶,不過不要加安全驗證,加了就必須使用驗證通過的
接受消息端:
//消費者 public class Receiver { public static void main(String[] args) throws JMSException { //1 創建ConnectionFactory工廠, 用戶名 密碼 連接url ConnectionFactory factory = new ActiveMQConnectionFactory( "houzheng",//默認的用戶 "houzheng", "tcp://localhost:61616" //默認端口: 61616 ); //2 創建連接,並開啟,默認是關閉的,並且完成后要關閉連接 Connection connection = factory.createConnection(); connection.start(); //3 創建session 參數: 是否啟用事務 選擇簽收模式(自動簽收,客戶端簽收,不確認是否成功簽收(消息可能崇重復)) Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //4 創建Destination對象, 參數: 指定消息隊列名稱 //Destination指的是消息的目的地或者消費來源,TPT模式中成為Queue,訂閱/發布中稱為主題 Destination destination=session.createQueue("myqueue");//必須和生產者指定的queue一樣 //5 通過session對象創建消息的發送者或者接受者 MessageConsumer consumer = session.createConsumer(destination);//傳入Destination //6 接受消息 while(true){ TextMessage message = (TextMessage) consumer.receive();//阻塞接受,還可以指定等待時間或者不阻塞接受,沒有就跳過 if(message==null)break; System.out.println(message.getText()); } if(connection!=null)connection.close();//關閉連接 } }
發送消息並消費之后就可以去管控台查看消息隊列,這里只是PTP模式的
或者也可以進行持久化到mysql中,直接去數據庫中查詢,不過還是使用默認的kahadb或者leveldb這種內存數據存儲性能比較好,mysql 可能比較直觀看到!