例子1
Producer.java
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public final static String QUEUE_NAME="rabbitMQ_test2"; public static void main(String[] args) throws IOException, TimeoutException { //創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置RabbitMQ相關信息 factory.setHost("100.51.15.10"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672); //創建一個新的連接 Connection connection = factory.newConnection(); //創建一個通道 Channel channel = connection.createChannel(); // 聲明一個隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //發送消息到隊列中 String message = "Hello RabbitMQ"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("Producer Send +'" + message + "'"); //關閉通道和連接 channel.close(); connection.close(); } }
Consumer.java
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP; public class Customer { private final static String QUEUE_NAME = "rabbitMQ_test2"; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置RabbitMQ地址 factory.setHost("100.51.15.10"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672); //創建一個新的連接 Connection connection = factory.newConnection(); //創建一個通道 Channel channel = connection.createChannel(); //聲明要關注的隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Customer Waiting Received messages"); //DefaultConsumer類實現了Consumer接口,通過傳入一個頻道, // 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDelivery Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Customer Received '" + message + "'"); } }; //自動回復隊列應答 -- RabbitMQ中的消息確認機制 channel.basicConsume(QUEUE_NAME, true, consumer); } }
執行
Producer.java
Producer Send +'Hello RabbitMQ'
Producer Send +'Hello RabbitMQ'
Consumer.java
Customer Received 'Hello RabbitMQ' Customer Received 'Hello RabbitMQ'
例子2
首先寫一個類,將產生產者和消費者統一為 EndPoint類型的隊列。不管是生產者還是消費者,連接隊列的代碼都是一樣的,這樣可以通用一些。
EndPoint.java
//package co.syntx.examples.rabbitmq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * Represents a connection with a queue * @author syntx * */ public abstract class EndPoint{ protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws IOException{ this.endPointName = endpointName; //Create a connection factory ConnectionFactory factory = new ConnectionFactory(); //hostname of your rabbitmq server factory.setHost("100.51.15.10"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672); //getting a connection try{ connection = factory.newConnection(); }catch (TimeoutException ex) { System.out.println(ex); connection = null; } //creating a channel channel = connection.createChannel(); //declaring a queue for this channel. If queue does not exist, //it will be created on the server. channel.queueDeclare(endpointName, false, false, false, null); } /** * 關閉channel和connection。並非必須,因為隱含是自動調用的。 * @throws IOException */ public void close() throws IOException{ try{ this.channel.close(); } catch (TimeoutException ex){ System.out.println("ex" + ex); } this.connection.close(); } }
Producer2.java
import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang.SerializationUtils; public class Producer2 extends EndPoint{ public Producer2(String endPointName) throws IOException{ super(endPointName); } public void sendMessage(Serializable object) throws IOException { channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object)); } }
QueueConsumer.java
import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.SerializationUtils; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; public class QueueConsumer extends EndPoint implements Runnable, Consumer{ public QueueConsumer(String endPointName) throws IOException{ super(endPointName); } public void run() { try { //start consuming messages. Auto acknowledge messages. channel.basicConsume(endPointName, true,this); } catch (IOException e) { e.printStackTrace(); } } /** * Called when consumer is registered. */ public void handleConsumeOk(String consumerTag) { System.out.println("Consumer "+consumerTag +" registered"); } /** * Called when new message is available. */ public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { Map map = (HashMap)SerializationUtils.deserialize(body); System.out.println("Message Number "+ map.get("message number") + " received."); } public void handleCancel(String consumerTag) {} public void handleCancelOk(String consumerTag) {} public void handleRecoverOk(String consumerTag) {} public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {} }
Main.java
import java.io.IOException; import java.sql.SQLException; import java.util.HashMap; public class Main { public Main() throws Exception{ QueueConsumer consumer = new QueueConsumer("queue"); Thread consumerThread = new Thread(consumer); consumerThread.start(); Producer2 producer = new Producer2("queue"); for (int i = 0; i < 5; i++) { HashMap message = new HashMap(); message.put("message number", i); producer.sendMessage(message); System.out.println("Message Number "+ i +" sent."); } } public static void main(String[] args) throws Exception{ new Main(); System.out.println("##############end..."); } }