import net.sf.json.JSONObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer;/** * 啟動預加載信息類 *@author Administrator */ public class ContextLoaderSpringListener implements ApplicationListener<ContextRefreshedEvent>{ private static Log logger = LogFactory.getLog(ContextLoaderSpringListener.class); @Autowired private ShipmentCheckService shipmentCheckService; //當spring容器初始化完成后就會執行該方法。 public void onApplicationEvent(ContextRefreshedEvent event) { logger.debug("ConfigLoadListener init......"); try { //創建一個頻道 Channel channel = QueueUtil.getConnection().createChannel(); boolean durable = true; //聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。 channel.queueDeclare(QueueUtil.getQueueName(), durable, false, false, null); //創建隊列消費者 QueueingConsumer consumer = new QueueingConsumer(channel); //指定消費隊列 //TODO:並發測試MQ,ack? channel.basicConsume(QueueUtil.getQueueName(), false/*打開應答機制*/, consumer); while (true) { //nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); byte[] body = delivery.getBody(); try { String str=new String(body,"UTF-8"); JSONObject j = JSONObject.fromObject(str); String shipmentId = j.getString("shipmentId"); String vehicleId = j.getString("vehicleId"); int planLineType = j.getInt("planLineType"); shipmentCheckService.check(shipmentId,vehicleId,planLineType); } catch (RuntimeException e) { logger.error("貨運單數據校驗出現異常:", e); logger.error("Source package:"+ CommUtil.getEncodeData(body)); } channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } catch (Exception e) { logger.error("貨運單存儲器出現異常:", e); } } }
private void storeInQueue(byte[] dst) throws IOException, TimeoutException { Channel channel = QueueUtil.getConnection().createChannel(); channel.queueDeclare(QueueUtil.getQueueName(), /*持久存儲*/false, false, false, null); channel.basicPublish("", QueueUtil.getQueueName(), null, dst); channel.close(); }