rabbitMq創建和獲取消息


import net.sf.json.JSONObject;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;/**
 * 啟動預加載信息類
 *@author Administrator
 */
public class ContextLoaderSpringListener implements ApplicationListener<ContextRefreshedEvent>{
    
    private static Log logger = LogFactory.getLog(ContextLoaderSpringListener.class);
    @Autowired
    private ShipmentCheckService shipmentCheckService;

    //當spring容器初始化完成后就會執行該方法。
    public void onApplicationEvent(ContextRefreshedEvent event) {
        logger.debug("ConfigLoadListener init......");
        try {
            //創建一個頻道
            Channel channel = QueueUtil.getConnection().createChannel();
            boolean durable = true;
            //聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。
            channel.queueDeclare(QueueUtil.getQueueName(), durable, false, false, null);

            //創建隊列消費者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //指定消費隊列
            //TODO:並發測試MQ,ack?
            channel.basicConsume(QueueUtil.getQueueName(), false/*打開應答機制*/, consumer);
            while (true) {
                //nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法)
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                byte[] body = delivery.getBody();
                try {                    
                    String str=new String(body,"UTF-8");
                    JSONObject j = JSONObject.fromObject(str);
                    String shipmentId = j.getString("shipmentId");
                    String vehicleId = j.getString("vehicleId");
                    int planLineType = j.getInt("planLineType");
                
                    shipmentCheckService.check(shipmentId,vehicleId,planLineType);
                } catch (RuntimeException e) {
                    logger.error("貨運單數據校驗出現異常:", e);
                    logger.error("Source package:"+ CommUtil.getEncodeData(body));
                }
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        } catch (Exception e) {
            logger.error("貨運單存儲器出現異常:", e);
        }
    }
    

}

 

    private void storeInQueue(byte[] dst) throws IOException, TimeoutException {
        Channel channel = QueueUtil.getConnection().createChannel();
        channel.queueDeclare(QueueUtil.getQueueName(), /*持久存儲*/false, false, false, null);
        channel.basicPublish("", QueueUtil.getQueueName(), null, dst);
        channel.close();
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM