SpringBoot 消費NSQ消息


使用監聽器,來實現實時消費nsq的消息

一、目前spring boot中支持的事件類型如下

  1. ApplicationFailedEvent:該事件為spring boot啟動失敗時的操作

  2. ApplicationPreparedEvent:上下文context准備時觸發

  3. ApplicationReadyEvent:上下文已經准備完畢的時候觸發

  4. ApplicationStartedEvent:spring boot 啟動監聽類

  5. SpringApplicationEvent:獲取SpringApplication

  6. ApplicationEnvironmentPreparedEvent:環境事先准備

二、這里我使用的是監聽ApplicationReadyEvent事件,實現ApplicationListener<ApplicationReadyEvent>接口

package com.device.nsq.Receiver;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;import org.springframework.stereotype.Component;
import com.github.brainlag.nsq.NSQConsumer;
import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
import com.github.brainlag.nsq.lookup.NSQLookup;

/**
 * nsq監聽消息
 * 
 * @author joey
 *
 */
@Component
public class NsqMessageReceiver implements ApplicationListener<ApplicationReadyEvent> {
    private Logger logger = LoggerFactory.getLogger(NsqMessageReceiver.class);/**
     * 監聽nsq消息
     */
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        NSQLookup lookup = new DefaultNSQLookup();
        Executor executor = Executors.newFixedThreadPool(20);        lookup.addLookupAddress(127.0.0.1, 4150);// 監聽topicname的topic
        NSQConsumer registerConsumer = new NSQConsumer(lookup, "topicname", "channel",
                (message) -> {
                    logger.info("收到消息:" + new String(message.getMessage()));
                    message.finished();
                });
        registerConsumer.setExecutor(executor);
        registerConsumer.start();
  }
}

三、通過SpringApplication類中的addListeners方法將自定義的監聽器注冊進去

package com.device;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.device.nsq.Receiver.NsqMessageReceiver;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(Application.class);
        application.addListeners(new NsqMessageReceiver());
        application.run(args);
    }
}

啟動,向nsq的topicname發送消息,程序會自動進行消費


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM