第一種:點對點

#生產者
/**
 * Point-to-point producer demo: connects to the ActiveMQ broker and sends ten
 * text messages ("消息1" .. "消息10") to the queue "mq-test-01".
 */
public class Producer {
    private static final String USER_NAME = ActiveMQXAConnectionFactory.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD;
    // NOTE(review): the "X" in the host looks like a placeholder; replace it with the real broker address.
    private static final String BROKER_URL = "tcp://192.168.178.X:61616";

    /**
     * Sends the ten demo messages and closes the connection.
     *
     * @param args unused
     * @throws JMSException if connecting to the broker or sending a message fails
     */
    public static void main(String[] args) throws JMSException {
        // 1. Create the connection factory.
        ConnectionFactory factory = new ActiveMQXAConnectionFactory(USER_NAME, PASSWORD, BROKER_URL);
        // 2. Open the connection.
        Connection connection = factory.createConnection();
        try {
            // 3. Start the connection.
            connection.start();
            // 4. Create the session. transacted = true, so the acknowledgeMode
            //    argument is not used.
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 5. Destination: the queue to send to.
            Queue queue = session.createQueue("mq-test-01");
            // 6. Create the sender for that queue.
            MessageProducer producer = session.createProducer(queue);
            for (int i = 1; i <= 10; i++) {
                // 7. Build the message.
                TextMessage message = session.createTextMessage();
                message.setText("消息" + i);
                // 8. Send it.
                producer.send(message);
            }
            // 9. NOTE(review): the session is created as transacted but commit() is
            //    never called (the call was commented out in the original). Confirm
            //    whether the XA connection factory is intended here; on an ordinary
            //    transacted session, uncommitted sends are rolled back on close.
        } finally {
            // 10. Always release the connection, even when a send fails.
            connection.close();
        }
    }
}
#消費者
public class Consumer1 {
private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER;
private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD;
private static final String brokerURL = "tcp://192.168.178.X:61616";
public static void main(String[] args) throws JMSException {
//1.創建連接工廠類
ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL);
//2.創建連接
Connection connection = factory.createConnection();
//3.啟動連接
connection.start();
//4.創建會話對象session(事務transacted為true,參數2不生效)
//acknowledgeMode:
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//5.目的地
Queue queue = session.createQueue("mq-test-01");
//6.接收消息對象
MessageConsumer consumer = session.createConsumer(queue);
//7.通過監聽器接收消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
//獲取消息
try {
String msg = textMessage.getText();
System.out.println(msg);
} catch (JMSException e) {
}
}
});
}
}
第二種: 發布者/訂閱者

啟動順序:先訂閱、再發布(非持久訂閱者只能收到訂閱之後才發布的消息)
#訂閱者
public class Subscriber1 {
private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER;
private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD;
private static final String brokerURL = "tcp://192.168.129.10:61616";
public static void main(String[] args) throws JMSException {
//1.創建連接工廠類
ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL);
//2.創建連接
Connection connection = factory.createConnection();
//3.啟動連接
connection.start();
//4.創建會話對象session(事務transacted為true,參數2不生效)
//acknowledgeMode:
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//5.目的地 Topic topic = session.createTopic("mq-test-02"); //6.接收消息對象
MessageConsumer consumer = session.createConsumer(topic);
//7.通過監聽器接收消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
//獲取消息
try {
String msg = textMessage.getText();
System.out.println(msg);
} catch (JMSException e) {
}
}
});
}
}
#發布者
/**
 * Publish/subscribe publisher demo: connects to the ActiveMQ broker and publishes
 * ten text messages ("消息1" .. "消息10") to the topic "mq-test-02".
 */
public class Publisher {
    private static final String USER_NAME = ActiveMQXAConnectionFactory.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD;
    private static final String BROKER_URL = "tcp://192.168.129.10:61616";

    /**
     * Publishes the ten demo messages and closes the connection.
     *
     * @param args unused
     * @throws JMSException if connecting to the broker or publishing a message fails
     */
    public static void main(String[] args) throws JMSException {
        // 1. Create the connection factory.
        ConnectionFactory factory = new ActiveMQXAConnectionFactory(USER_NAME, PASSWORD, BROKER_URL);
        // 2. Open the connection.
        Connection connection = factory.createConnection();
        try {
            // 3. Start the connection.
            connection.start();
            // 4. Create the session. transacted = true, so the acknowledgeMode
            //    argument is not used.
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 5-6. Destination and sender: publish to the topic. In the original the
            //      topic declaration had been swallowed by a line comment, which left
            //      the producer referring to an undeclared variable.
            MessageProducer producer = session.createProducer(session.createTopic("mq-test-02"));
            // The original notes mention setting the delivery mode to PERSISTENT.
            // That is already the JMS default for a MessageProducer, so no call is made.
            for (int i = 1; i <= 10; i++) {
                // 7. Build the message.
                TextMessage message = session.createTextMessage();
                message.setText("消息" + i);
                // 8. Publish it.
                producer.send(message);
            }
            // 9. NOTE(review): the session is created as transacted but commit() is
            //    never called (the call was commented out in the original). Confirm
            //    whether the XA connection factory is intended here; on an ordinary
            //    transacted session, uncommitted sends are rolled back on close.
        } finally {
            // 10. Always release the connection, even when a send fails.
            connection.close();
        }
    }
}