使用監聽器,來實現實時消費nsq的消息
一、目前spring boot中支持的事件類型如下
-
ApplicationFailedEvent:該事件為spring boot啟動失敗時的操作
-
ApplicationPreparedEvent:上下文context准備時觸發
-
ApplicationReadyEvent:上下文已經准備完畢的時候觸發
-
ApplicationStartedEvent:spring boot 啟動監聽類
-
SpringApplicationEvent:獲取SpringApplication
-
ApplicationEnvironmentPreparedEvent:環境事先准備
二、這里我使用的是監聽ApplicationReadyEvent事件,實現ApplicationListener<ApplicationReadyEvent>接口
package com.device.nsq.Receiver; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationListener; import org.springframework.context.ConfigurableApplicationContext;import org.springframework.stereotype.Component; import com.github.brainlag.nsq.NSQConsumer; import com.github.brainlag.nsq.lookup.DefaultNSQLookup; import com.github.brainlag.nsq.lookup.NSQLookup; /** * nsq監聽消息 * * @author joey * */ @Component public class NsqMessageReceiver implements ApplicationListener<ApplicationReadyEvent> { private Logger logger = LoggerFactory.getLogger(NsqMessageReceiver.class);/** * 監聽nsq消息 */ @Override public void onApplicationEvent(ApplicationReadyEvent event) { NSQLookup lookup = new DefaultNSQLookup(); Executor executor = Executors.newFixedThreadPool(20); lookup.addLookupAddress(127.0.0.1, 4150);// 監聽topicname的topic NSQConsumer registerConsumer = new NSQConsumer(lookup, "topicname", "channel", (message) -> { logger.info("收到消息:" + new String(message.getMessage())); message.finished(); }); registerConsumer.setExecutor(executor); registerConsumer.start(); } }
三、通過SpringApplication類中的addListeners方法將自定義的監聽器注冊進去
package com.device; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import com.device.nsq.Receiver.NsqMessageReceiver; @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication application = new SpringApplication(Application.class); application.addListeners(new NsqMessageReceiver()); application.run(args); } }
啟動,向nsq的topicname發送消息,程序會自動進行消費