消息中間件RabbitMq的代碼使用案例


 正文前先來一波福利推薦:

福利一:

百萬年薪架構師視頻,該視頻可以學到很多東西,是本人花錢買的VIP課程,學習消化了一年,為了支持一下女朋友公眾號也方便大家學習,共享給大家。

福利二:

畢業答辯以及工作上各種答辯,平時積累了不少精品PPT,現在共享給大家,大大小小加起來有幾千套,總有適合你的一款,很多是網上是下載不到。

獲取方式:

微信關注 精品3分鍾 ,id為 jingpin3mins,關注后回復   百萬年薪架構師 ,精品收藏PPT  獲取雲盤鏈接,謝謝大家支持!

------------------------正文開始---------------------------

消費者:

---------------------- 構造初始化:

public RabbitMqReceiver(String host, int port, String username, String password) 
{
connFactory = new ConnectionFactory();
connFactory.setHost(host);
connFactory.setPort(port);
connFactory.setUsername(username);
connFactory.setPassword(password);
}
********************************************************************************


---------------------- 構造初始化:
    public Channel createChannel() throws IOException {
getConnection();
Channel channel = connection.createChannel();
if (channel != null) {
           int prefetchCount = 1;
           channel.basicQos(prefetchCount);//最多為當前接收方發送一條消息。如果接收方還未處理完畢消息,還沒有回發確認,就不要再給他分配消息了,應該把當前消息分配給其它空閑接收方。
        boolean durable = true; //Server端的Queue持久化
        channel.queueDeclare("task_queue", durable, false, false, null); 
logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver成功創建Channel");
} else {
logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver創建Channel失敗");
}

return channel;
}
********************************************************************************

---------------------- 取得connection實例:
private void getConnection() throws IOException
{
synchronized (this) {
if (connection == null || !connection.isOpen()) {
connection = connFactory.newConnection();
if (connection != null) {
logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver成功獲取連接");
} else {
logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver獲取連接失敗");
}
} else {
logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver連接已存在,復用此連接");
}
}
}
********************************************************************************

----------------------獲取Consumer實例:
public QueueingConsumer createConsumer(Channel channel, String queueName) throws IOException {
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer); //自動消息確認打開,默認開啟了消息確認(接收方接收到消息后,立即向服務器發回確認)。消息接收方處理完消息后,向服務器發送消息確認,服務器再刪除該消息。

return consumer;
}
********************************************************************************

----------------------從從rabbitMQ提取消息並轉換為對象:
private String getMessageFromMQ() {
String message = StringUtils.EMPTY;
String source = StringUtils.EMPTY;
try {
message = receiver.nextMessage(checkNotNull(consumer), 1000);
source = message;
} catch (ShutdownSignalException e) {
logger.error("", e);
} catch (ConsumerCancelledException e) {
logger.error("consumer exception", e);
} catch (InterruptedException e) {
logger.error("timeout exception", e);
}
try {
if (StringUtils.isNotBlank(message)) {
message = checkNotNull(StringUtils.substringAfter(message, "yyy:"), "xxx");
message = checkNotNull(StringEscapeUtils.unescapeJava(message), "unescape error");
int size = message.length();
if (size > 1) {
message = checkNotNull(message.substring(0, message.length() - 1), "get json-data error");// 去掉末尾的”
} else {
logger.warn(String.format("數據異常,message=%s", source));
}
}
} catch (Throwable e) {
logger.error(String.format("數據異常,message=%s", source), e);
}
return message;
}
********************************************************************************

----------------------每次讀取一條消息:

public String nextMessage(QueueingConsumer consumer, long timeOut) throws ShutdownSignalException, ConsumerCancelledException, InterruptedException {

QueueingConsumer.Delivery delivery;
if (timeOut > 0) {
delivery = consumer.nextDelivery(timeOut);
} else {
delivery = consumer.nextDelivery();
}
if (delivery == null) {
return StringUtils.EMPTY;
}

String message = new String(delivery.getBody());
return message;
}
********************************************************************************
 
        
---------------------- 在storm中創建mq實例:
SpoutOutputCollector collector;
RabbitMqReceiver receiver;
Channel channel;
QueueingConsumer consumer;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)  //初始化調用一次
{
this.collector = collector;
receiver = checkNotNull(new RabbitMqReceiver(conf.get("crash.mq.host").toString(),
Integer.valueOf(conf.get("crash.mq.port").toString()), conf.get("crash.mq.user").toString(),
conf.get("crash.mq.pwd").toString()), "receiver is null");
try {
channel = checkNotNull(receiver.createChannel(), "channel is null");
consumer = checkNotNull(receiver.createConsumer(channel, conf.get("crash.mq.channel").toString()),
"comsumer is null");
} catch (Exception e) {
logger.error("init mq-client error:", e);
}
}


---------------------- 在storm中循環執行獲得消息實例:
@Override
public void nextTuple()
{
String message = getMessageFromMQ();
}

生產者:
--------------------------------------------------:


private final static String QUEUE_NAME = "hello2";// 隊列名不能重復 之前已有就會失敗
public class Producer {  

    private final static String QUEUE_NAME = "hello2";// 隊列名不能重復 之前已有就會失敗  

    public static void main(String[] argv) throws java.io.IOException {  

        /* 使用工廠類建立Connection和Channel,並且設置參數 */  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("192.168.10.111");// MQ的IP  
        factory.setPort(5672);// MQ端口  
        factory.setUsername("asdf");// MQ用戶名  
        factory.setPassword("123456");// MQ密碼  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  

        /* 創建消息隊列,並且發送消息 */  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        String message = "消息2";  
        channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  //Message持久化
        System.out.println("生產了個'" + message + "'");  

        /* 關閉連接 */  
        channel.close();  
        connection.close();  
    }  

}

  




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM