ActiveMQ持久化到mysql
配置
1.找到apache-activemq-5.15.2/examples/conf下面的activemq-jdbc-performance.xml
2.打開activemq-jdbc-performance.xml,在persistenceAdapter節點后面添加dataSource="#mysql-ds"
並配置你的數據庫
其實可以直接更改apache-activemq-5.15.2/conf/activemq.xml的persistenceAdapter節點.配置下數據庫也是可以的
用activemq-jdbc-performance.xml 我的理解應該是高性能模式,連
3.把activemq-jdbc-performance.xml復制到apache-activemq-5.15.2/conf目錄下,重命名為activemq.xml,覆蓋原來的activemq.xml
4.在對應的數據庫創建activemq庫,然后重啟ActiveMQ
我們這里用debug模式啟動,提示沒有mysql的jar包
5.我們在apache-activemq-5.15.2/lib下面添加mysql的jar包,再次啟動,就不會報錯了
6.這時可以看到剛才創建的activemq庫多了三張表,說明配置成功了
點對點測試
生產者
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Point-to-point demo producer: sends a fixed batch of text messages to the
 * {@code test.queue} queue inside a single JMS transaction.
 *
 * <p>The messages only become visible to consumers once the transaction is
 * committed. The delivery mode is not set explicitly, so the producer's
 * default applies (JMS specifies PERSISTENT as the default delivery mode).
 */
public class Producer {

    /** Address of the ActiveMQ broker the demo connects to. */
    private static final String BROKER_URL = "tcp://192.168.1.109:61616";

    /** Name of the queue the messages are sent to. */
    private static final String QUEUE_NAME = "test.queue";

    /** Number of messages sent per run. */
    private static final int MESSAGE_COUNT = 20;

    /** Pause between the last send and the commit, in milliseconds. */
    private static final long PRE_COMMIT_DELAY_MS = 2000L;

    /**
     * Connects to the broker, sends {@link #MESSAGE_COUNT} text messages and
     * commits them as one transaction. Failures are reported on standard error
     * and the connection is always released.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();

            // Transacted session: the acknowledge-mode argument is ignored for
            // transacted sessions, so SESSION_TRANSACTED states the intent.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageProducer producer = session.createProducer(destination);

            for (int i = 1; i <= MESSAGE_COUNT; i++) {
                TextMessage textMessage = session.createTextMessage("這是要發送的第" + i + "條消息消息");
                producer.send(textMessage);
                System.out.println("第" + i + "條消息已發送");
            }

            // Nothing is delivered to consumers until commit() is called.
            Thread.sleep(PRE_COMMIT_DELAY_MS);
            session.commit();
        } catch (JMSException e) {
            System.err.println("Failed to send messages to " + BROKER_URL + ": " + e.getMessage());
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the uncommitted transaction is
            // discarded when the connection is closed below.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted before commit; messages were not committed");
        } finally {
            closeQuietly(connection);
        }
    }

    /**
     * Closes the connection (and with it its sessions and producers),
     * reporting but not propagating any failure.
     *
     * @param connection the connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            System.err.println("Failed to close JMS connection: " + e.getMessage());
        }
    }
}
消費者
import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Customer {
public static void main(String[] args) {
// String user = ActiveMQConnection.DEFAULT_USER;
//
// String password = ActiveMQConnection.DEFAULT_PASSWORD;
//
// String url = ActiveMQConnection.DEFAULT_BROKER_URL;
String subject = "test.queue";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.109:61616");
// ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
Connection connection;
try {
connection= connectionFactory.createConnection();
connection.start();
final Session session =connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(subject);
MessageConsumer message = session.createConsumer(destination);
message.setMessageListener(new MessageListener() {
public void onMessage(Message msg){
TextMessage message = (TextMessage) msg;
try {
System.out.println("--收到消息:" +new Date()+message.getText());
session.commit();
}catch(JMSException e) {
// e.printStackTrace();
}
}
});
// Thread.sleep(30000);
//
// session.close();
//
// Thread.sleep(30000);
//
// connection.close();
//
// Thread.sleep(30000);
}catch(Exception e) {
// e.printStackTrace();
}
}
}
這時生產者生產數據,消費者一直不在線,數據就會持久化到數據庫的activemq_msgs表,就算ActiveMQ的服務掛了,再次啟動后,等消費者在線了就可以再次獲取生產者生產的數據(消費之后數據庫的數據會自動刪除)