SpringCloudStream集成kafka


原文鏈接:https://www.jianshu.com/p/a94c67f02c16

Spring Cloud Stream是構建消息驅動的微服務應用程序框架。提供統一的接收發送管道以連接到消息代理。通過@EnableBinding注解開啟SpringCloudStream的支持。通過@StreamListener注解,使其接收流處理的時間。


 
SpringCloudStream應用模型

一、引入依賴包

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> 

二、自定義信息通道

官方提供了Sink(輸入通道)、Source(輸出通道)、Processor(集成Sink和Source通道),我們也可以自定義我們自己的信息通道。
@Input注解標識一個輸入通道
@Output注解標識一個輸出通道
通道名稱作為參數,如果未提供參數,默認使用方法名稱作為通道名稱。
如下我們自定義信息通道EsChannel

/** * 自定義信息通道 * @author dbq * @date 2019/9/26 14:54 */ public interface EsChannel { /** * 缺省發送消息通道名稱 */ String ES_DEFAULT_OUTPUT = "es_default_output"; /** * 缺省接收消息通道名稱 */ String ES_DEFAULT_INPUT = "es_default_input"; /** * 告警發送消息通道名稱 */ String ES_ALARM_OUTPUT = "es_alarm_output"; /** * 告警接收消息通道名稱 */ String ES_ALARM_INPUT = "es_alarm_input"; /** * 缺省發送消息通道 * @return channel 返回缺省信息發送通道 */ 

三、@EnableBinding使應用程序連接到消息代理

@EnableDiscoveryClient @SpringBootApplication @EnableFeignClients @EnableHystrix @MapperScan(basePackages = "com.es.mapper") @EnableBinding(EsChannel.class) public class EsOnenetApplication { public static void main(String[] args) { SpringApplication.run(EsOnenetApplication.class, args); } } 

四、SpringCloudStream及kafka配置

#============================================================== #spring-cloud-stream-Kafka配置 開始 #============================================================== #是否開啟kafka(非spring-cloud-stream配置) spring.kafka.enabled=false #缺省的輸入、輸出通道 spring.cloud.stream.bindings.es_default_input.destination=es_default_topic spring.cloud.stream.bindings.es_default_input.binder=kafka spring.cloud.stream.bindings.es_default_input.group=es_default_group spring.cloud.stream.bindings.es_default_output.destination=es_default_topic spring.cloud.stream.bindings.es_default_output.binder=kafka #入站消費者的並發性 spring.cloud.stream.bindings.es_default_input.consumer.concurrency=2 #告警的輸入、輸出通道(多主題、分組測試用,實際開發中根據業務需求定義) spring.cloud.stream.bindings.es_alarm_input.destination=es_alarm_topic spring.cloud.stream.bindings.es_alarm_input.binder=kafka spring.cloud.stream.bindings.es_alarm_input.group=es_alarm_group spring.cloud.stream.bindings.es_alarm_output.destination=es_alarm_topic spring.cloud.stream.bindings.es_alarm_output.binder=kafka #kafka配置 spring.cloud.stream.kafka.binder.brokers=172.*.*.6:9092,172.*.*.7:9092,172.*.*.8:9092 spring.cloud.stream.kafka.binder.zkNodes=172.*.*.6:2181,172.*.*.7:2181,172.*.*.8:2181 spring.cloud.stream.kafka.binder.requiredAcks=1 #============================================================== #spring-cloud-stream-Kafka配置 結束 #============================================================== 

從上面配置可以看出
1、定義了通道名稱及分組,binder代表綁定實現的標識名稱(如kafka或者rabbit),與3中的定義名稱相對應。
2、定義了入站消費者的並發性,指在一個實例內的並發性,不同實例之間本身就是並發的,默認值為1
spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2
3、定義了kafka連接信息
如果未配置autoCommitOffset,默認自動提交偏移量
詳細參數配置可參考官網

五、發送消息到輸出通道

/** * kafka消息發送器 * @author dbq * @date 2019/9/26 17:50 */ @Component public class EsKafkaMessageSender { @Autowired private EsChannel channel; /** * 消息發送到默認通道:缺省通道對應缺省主題 * @param message */ public void sendToDefaultChannel(String message){ channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build()); } /** * 消息發送到告警通道:告警通道對應告警主題 * @param message */ public void sendToAlarmChannel(String message){ channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build()); } } 

注入先前定義的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分別為我們自定義的兩個發送方法,可將消息發送到不同的通道中,每個通道對應一個kafka的主題。

六、從輸入通道訂閱消息

@EnableBinding(value = EsChannel.class) public class EsStreamListener { /** * 從缺省通道接收消息 * @param message */ @StreamListener(EsChannel.ES_DEFAULT_INPUT) public void receive(Message<String> message){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); System.out.println(sdf.format(new Date())+"------start--------安全用電默認消息:" + message); try { Thread.sleep(1000*10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(sdf.format(new Date())+"------end--------安全用電默認消息"); } /** * 從告警通道接收消息 * @param message */ @StreamListener(EsChannel.ES_ALARM_INPUT) public void receiveAlarm(Message<String> message){ System.out.println("訂閱告警消息:" + message); } } 

從不同的通道實現消息的訂閱。

七、這樣完整的消息系統就搭建好了,定義Controller發送消息測試

@ApiOperation(value = "test1", httpMethod = "POST") @PostMapping(value = "/test1", produces = "application/json;charset=UTF-8") public void test1(String message, HttpServletRequest request, HttpServletResponse response) { sender.sendToDefaultChannel(message); sender.sendToDefaultChannel(message); sender.sendToDefaultChannel(message); sender.sendToDefaultChannel(message); } @ApiOperation(value = "test", httpMethod = "POST") @PostMapping(value = "/test2", produces = "application/json;charset=UTF-8") public void test2(String message, HttpServletRequest request, HttpServletResponse response) { sender.sendToAlarmChannel(message); } 

test1:發送消息的缺省消息通道
test2:發送消息到告警消息通道

八、並發性測試

如七中所示,一次發送4條消息到缺省消息通道中,並啟動兩個實例(即兩個微服務組成一個小型集群),在並發性配置為1的情況下,即spring.cloud.stream.bindings.es_default_input.consumer.concurrency=1

實例1

2019-09-30 11:13:14------start--------默認消息... 2019-09-30 11:13:24------end--------默認消息 

實例2

2019-09-30 11:13:14------start--------默認消息:... 2019-09-30 11:13:24------end--------默認消息 2019-09-30 11:13:24------start--------默認消息:... 2019-09-30 11:13:34------end--------默認消息 2019-09-30 11:13:34------start--------默認消息:... 2019-09-30 11:13:44------end--------默認消息 

通過打印日志(日志做了簡化處理)可以看出,兩個實例之間是做到了並發消費,但是在1個實例內部,並沒有並發消費。
如果將concurrency修改為2.
日志如下
實例1

2019-09-30 11:31:13------start--------:... 2019-09-30 11:31:13------start--------默認消息:... 2019-09-30 11:31:23------end--------默認消息 2019-09-30 11:31:23------end--------默認消息 2019-09-30 11:31:23------start--------默認消息:... 2019-09-30 11:31:33------end--------默認消息 

實例2

2019-09-30 11:31:13------start--------默認消息:... 2019-09-30 11:31:23------end-------- 

從日志可以看出,實例1中實現了兩個線程的並發消費。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM