上一篇記錄了rabbitmq的安裝,這一篇記錄一下rabbitmq的java客戶端的簡單使用,當然在項目中我們有更為復雜的應用場景,這里只有最簡單的點對點生產者與消費者模式。
1、建立工程
首先建立一個簡單的maven工程,我這邊使用了平時使用的demo工程
pom.xml配置,本次案例中只需要兩個包即可,是用commons包的序列化,amqp則是rabbitmq的java包。
2、新建點對點抽象類
因為這個例子只講述非常簡單的點對點生產者與消費者關系,在某種程度上兩者有很多共性,所以這里干脆抽象成一個類了。具體代碼如下:
package ucs_test.rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author 作者 ucs_fuqing * @date 創建時間:2017年8月11日 下午2:21:27 * @version 1.0 * @parameter * @since * @return */ public abstract class PointToPoint { protected Channel channel; protected Connection connection; protected String pointName; /** * 獲取一個隊列的連接 * @param pointName * @throws IOException */ public PointToPoint(String pointName) throws IOException{ this.pointName = pointName; //創建連接工廠 ConnectionFactory cf = new ConnectionFactory(); //設置rabbitmq服務器地址 cf.setHost("192.168.149.133"); //設置rabbitmq服務器用戶名 cf.setUsername("hxb"); //設置rabbitmq服務器密碼 cf.setPassword("hxb"); //獲取一個新的連接 connection = cf.newConnection(); //創建一個通道 channel = connection.createChannel(); //申明一個隊列,如果這個隊列不存在,將會被創建 channel.queueDeclare(pointName, false, false, false, null); } /** * * @Title: close * @Description: 其實在程序完成時一般會自動關閉連接,但是這里提供手動操作的入口, * @param @throws IOException 設定文件 * @return void 返回類型 * @throws */ public void close() throws IOException{ this.channel.close(); this.connection.close(); } }
在上面代碼中,實現的是創建一個隊列或者關閉它,在默認的情況下channel和connection會自動關閉,但是我覺得還是提供手動關閉的入口更好一些。
3、生產者
這個例子中的生產者其實非常簡單,我們創建了一個連接,並且獲取了通道,接下來就可以直接往我們指定的隊列(queue)中發送消息了,如果這個隊列不存在,則會被程序自動創建。
package ucs_test.rabbitmq; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang.SerializationUtils; import com.mchange.io.SerializableUtils; /** * @author 作者 ucs_fuqing * @date 創建時間:2017年8月11日 下午2:33:13 * @version 1.0 * @parameter * @since * @return */ public class Producer extends PointToPoint{ public Producer(String pointName) throws IOException { super(pointName); // TODO Auto-generated constructor stub } /** * * @Title: sendMessage * @Description: 生產消息 * @param @param Object * @param @throws IOException 設定文件 * @return void 返回類型 * @throws */ public void sendMessage(Serializable Object) throws IOException{ channel.basicPublish("", pointName, null, SerializationUtils.serialize(Object)); } }
上面代碼看到,我們只是簡單的向pointName的隊列發送了一個對象。
4、消費者
我們這里的消費者也非常簡單,僅僅只是拿到並打印出消息即可
package ucs_test.rabbitmq; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.SerializationUtils; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; /** * @author 作者 ucs_fuqing * @date 創建時間:2017年8月11日 下午2:39:51 * @version 1.0 * @parameter * @since * @return */ public class QueueConsumer extends PointToPoint implements Runnable,Consumer{ public QueueConsumer(String pointName) throws IOException { super(pointName); // TODO Auto-generated constructor stub } public void run(){ try { channel.basicConsume(pointName,true,this); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void handleConsumeOk(String consumerTag) { // TODO Auto-generated method stub System.out.println("Consumer "+consumerTag +" registered"); } @Override public void handleCancelOk(String consumerTag) { // TODO Auto-generated method stub } @Override public void handleCancel(String consumerTag) throws IOException { // TODO Auto-generated method stub } @Override public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { // TODO Auto-generated method stub Map map = (HashMap)SerializationUtils.deserialize(body); System.out.println("Message Number "+ map.get("tagId") + " received."); //channel.basicAck(env.getDeliveryTag(), false); } @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { // TODO Auto-generated method stub } @Override public void handleRecoverOk(String consumerTag) { // TODO Auto-generated method stub } }
以上代碼中,我們指定了消費的隊列,並從中拿到消息,打印出來。
5、測試類
至此我們的生產者與消費者都寫完了,接着寫個測試類來驗證一下
package ucs_test.rabbitmq; import java.io.IOException; import java.util.HashMap; /** * @author 作者 ucs_fuqing * @date 創建時間:2017年8月11日 下午2:44:59 * @version 1.0 * @parameter * @since * @return */ public class MainTest { public MainTest() throws IOException{ QueueConsumer consumer = new QueueConsumer("testqueue"); Thread cuThread = new Thread(consumer); cuThread.start(); Producer producer = new Producer("testqueue"); int i = 0; while (i<10000) { HashMap<String, Object> hm = new HashMap<>(); hm.put("tagId", i); producer.sendMessage(hm); System.out.println("發送第"+i+"消息"); i++; } } public static void main(String[] args) throws IOException { new MainTest(); } }
在這里我們的生產者生產10000條消息,消費者拿到並打印出來。看看運行結果:
可以看到雖然有點亂序,但是10000條消息全部被消費完畢。
6、消息應答
在上面的例子中,我們的生產者只管發送消息,消費者只管消費消息,而RabbitMQ在上面的例子中,將消息交付給消費者之后,會從內存中移除掉這個消息。在正式的項目中,消費消息可能需要那么幾秒鍾,那么問題來了:如果我們拿到消息后需要進行更為復雜的業務處理,而這個業務處理失敗或者中斷了,那么意味着這條消息代表的工作並未完成,但是消息已經不存在了,我們會丟失掉正在處理的信息,也會丟失掉發給消費者但是並未被消費的消息。
現在我們使用兩個消費者來接受同一個隊列的消息,測試類如下:
package ucs_test.rabbitmq; import java.io.IOException; import java.util.HashMap; /** * @author 作者 ucs_fuqing * @date 創建時間:2017年8月11日 下午2:44:59 * @version 1.0 * @parameter * @since * @return */ public class MainTest { public MainTest() throws IOException{ QueueConsumer consumer = new QueueConsumer("testqueue"); Thread cuThread = new Thread(consumer); QueueConsumer consumer2 = new QueueConsumer("testqueue"); Thread cuThread2 = new Thread(consumer2); cuThread.start(); cuThread2.start(); Producer producer = new Producer("testqueue"); int i = 0; while (i<10000) { HashMap<String, Object> hm = new HashMap<>(); hm.put("tagId", i); producer.sendMessage(hm); //System.out.println("發送第"+i+"消息"); i++; } } public static void main(String[] args) throws IOException { new MainTest(); } }
在這種情況下,MQ將會均勻的將消息發送給兩個消費者消費,但是如果consumer2半路終止或者異常,那么將會導致我們的測試結果顯示接受到的消息少於10000條,消失的消息被異常的消費者吃掉了,而我們沒有任何辦法。。。
為了保證消息不會丟失,或者說肯定被消費,RabbitMQ支持消息應答模式,簡單的只需要修改兩個位置:
消費者類QueueConsumer中
設置basicConsume方法參數為false,打開消息應答
消費完成之后,向mq返回應答消息。
這樣,當消費者異常時,MQ沒有收到消費者消息應答,將會把消息發送給其他消費者,保證這條消息被消費掉。
OK,簡單的RabbitMQ服務器Java端例子就這樣了。下一篇會在此基礎上增加一些高級的應用。