步驟一:創建一個數據庫
步驟二:配置activemq.xml配置文件
1.在persistenceAdapter加入如下配置
<!--createTablesOnStartup 啟動是否創建表 第一次為true 后續為false--> <jdbcPersistenceAdapter dataSource="#activemq-db" createTablesOnStartup="true" />
第一次為true是為了創建表,之后的每次都不創建,使用第一次創建的表保存數據
2.配置數據源
在beans節點中
<bean id="activemq-db" class="org.apache.commons.dbcp.BasicDataSource"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
3.將數據庫連接Jar放到activemq解壓的lib文件夾下
步驟三:重新啟動activemq
重啟后刷新數據庫,會生成三張表
activemq_msgs用於存儲消息,Queue和Topic都存儲在這個表中:
ID:自增的數據庫主鍵
CONTAINER:消息的Destination
MSGID_PROD:消息發送者客戶端的主鍵
MSG_SEQ:是發送消息的順序,MSGID_PROD+MSG_SEQ可以組成JMS的MessageID
EXPIRATION:消息的過期時間,存儲的是從1970-01-01到現在的毫秒數
MSG:消息本體的Java序列化對象的二進制數據
PRIORITY:優先級,從0-9,數值越大優先級越高
activemq_acks用於存儲訂閱關系。如果是持久化Topic,訂閱者和服務器的訂閱關系在這個表保存:
主要的數據庫字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,這個字段會有集群其他系統的信息
CLIENT_ID:每個訂閱者都必須有一個唯一的客戶端ID用以區分
SUB_NAME:訂閱者名稱
SELECTOR:選擇器,可以選擇只消費滿足條件的消息。條件可以用自定義屬性實現,可支持多屬性AND和OR操作
LAST_ACKED_ID:記錄消費過的消息的ID。
表activemq_lock在集群環境中才有用,只有一個Broker可以獲得消息,稱為Master Broker,
其他的只能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。
這個表用於記錄哪個Broker是當前的Master Broker。
啟動生產者生產一條消息
public static void main(String[] args) throws JMSException { //創建MQ連接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); //創建連接 Connection connection = activeMQConnectionFactory.createConnection(); //啟動連接 connection.start(); //創建會話工廠 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //創建隊列 Queue queue = session.createQueue("wdksoft_queue"); //創建消息生產者 MessageProducer producer = session.createProducer(queue); //消息持久化 producer.setDeliveryMode(2); //模擬消息 TextMessage message = session.createTextMessage("hello activeMQ"); //發送消息 producer.send(message); System.out.println("生產者發送消息完畢~"); session.close(); connection.close(); }
刷新msgs數據表
數據表中就多了一條剛才的信息
當啟動消費者消費了這條信息時,數據庫便不再保存這條記錄
public static void main(String[] args) throws JMSException { // ConnectionFactory :連接工廠,JMS 用它創建連接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // JMS 客戶端到JMS Provider 的連接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一個發送或接收消息的線程 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息發送給誰. Destination destination = session.createQueue("wdksoft_queue"); // 消費者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); while (true) { //監聽消息 TextMessage message = (TextMessage) consumer.receive(); if (null != message) { System.out.println("收到消息:" + message.getText()); } else break; } session.close(); connection.close(); }