前言
場景描述
當客戶端向服務端請求,服務端返回出現了異常,對於客戶端1返回為NULL,而對於客戶端2返回的是正常數據。而服務端並不知道返回給客戶端們的數據對不對,只能通過用戶反饋來證實返回的錯誤性,顯然是不正確的。
Stream簡介
Spring Cloud Stream 是一個用來為微服務應用構建消息驅動能力的框架。它可以基於 Spring Boot 來創建獨立的、可用於生產的 Spring 應用程序。Spring Cloud Stream 為一些供應商的消息中間件產品提供了個性化的自動化配置實現,並引入了發布-訂閱、消費組、分區這三個核心概念。通過使用 Spring Cloud Stream,可以有效簡化開發人員對消息中間件的使用復雜度,讓系統開發人員可以有更多的精力關注於核心業務邏輯的處理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自動化配置。
主要議題
- Kafka
- Spring Kafka
- Spring Boot Kafka
- Spring Cloud Stream
- Spring Cloud Stream Kafka Binder
- 問題總結
主體內容
一、Kafka
主要用途
- 消息中間件
- 流式計算處理
- 日志
執行腳本目錄bin
E:\JavaEE\kafka-2.5.0-src\kafka-2.5.0-src\bin\windows
同類產品比較
- ActiveMQ:IMS(Java Message Service)規范實現
- RabbitMQ:AMQP(Advanved Message Queue Protocol)規范實現
- Kafka:並非某種規范的實現,它靈活和性能相對是優勢的
快速上手步驟
1.下載並解壓kafka壓縮包。
2.下載並解壓zookeeper壓縮包,這里官方它的quickstart就是以zookeeper保證強一致性。zookeeper官方地址:https://zookeeper.apache.org/
3.以windows為例,我們先到zookeeper的conf目錄下,把zoo_sample.cfg文件復制一份重命名為zoo.cfg。現在目錄如下所示:
然后打開cmd,進入bin目錄,啟動服務。
zkServer.cmd
4.啟動kafka。進入到kafka的window文件夾,執行啟動命令。
kafka-server-start.bat ../../config/server.properties
5.創建kafka主題。再次打開一個cmd窗口,進入到windows文件夾
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gupao
6.生產者發送消息/生產消息
kafka-console-producer.bat --broker-list localhost:9092 --topic gupao
然后輸入要發送的消息:
7.消費者接收消息/消費消息
重新打開一個cmd,輸入接收命令。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao
當我在生產者端輸入消息,消費者端馬上就接收到了消息。
如果消費命令后面加上--from beginning參數,那么他會接收到從開始就生產的消息。
那么被消費后的消息能否被其他消費者消費?我們再開一個cmd,利用新的消費者消費。答案是可以的。
使用Kafka標准API
1.從start.spring.io構建項目。
2.新建包raw.api,創建類KafkaProducerDemo,這里就是讓生產者通過java api形式進行發送消息。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;
/**
* @ClassName
* @Describe Kafka Producer Demo使用Kafka原始API
* @Author 66477
* @Date 2020/6/1417:15
* @Version 1.0
*/
public class KafkaProducerDemo {
public static void main(String[] args) throws Exception {
//初始化配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","localhost:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer",StringSerializer.class.getName());//注意引包
//創建Kafka Producer
KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);
//創建 Kafka消息
String topic = "gupao";
Integer partition=0;
Long timestamp= System.currentTimeMillis();
String key="message-key";
String value = "gupao.com";
ProducerRecord<String,String> record = new ProducerRecord<String, String>(topic,partition,timestamp,key,value);
//發送Kafka消息
Future<RecordMetadata> metadataFuture = kafkaProducer.send(record);
//強制執行
metadataFuture.get();
}
}
3.運行以上代碼,然后你會發現,cmd窗口的消費者會接收消息。
二、Spring Kafka
那么接下來我們使用Spring整合的kafka。
官方文檔
設計模式
Spring社區對data數據操作,有一個基本的模式,Template模式:
- JDBC:jdbcTemplate
- Redis:RedisTemplate
- Kafka:KafkaTemplate
- JMS:JmsTemplate
- Rest:RestTemplate
XXXTemplate一定實現XXXOpeations
Maven依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
三、Spring Boot Kafka
Maven依賴
自動裝配器
KafkaAutoConfiguration
其中KafkaTemplate會被自動裝配:
@Bean
@ConditionalOnMissingBean({KafkaTemplate.class})
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
關閉Spring Security
依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.WebSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
public void configure(WebSecurity web) throws Exception {
web.ignoring().antMatchers("/**");
}
}
創建生產者
1.我們繼續在上面的項目動刀子,我們先在application.properties文件轉移之前demo類中配置。
#定義應用名稱
spring.application.name=spring-cloud-stream-kafka
#配置端口
server.port=8080
#Spring Kafka配置信息
spring.kafka.bootstrap-servers=localhost:9092
#配置需要的kafka主題
kafka.topic = gupao
#生產者配置
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
2.然后編寫一個controller類。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName
* @Describe Kafka生產者Controller
* @Author 66477
* @Date 2020/6/1418:07
* @Version 1.0
*/
@RestController
public class KafkaProducerController {
private final KafkaTemplate<String,String> kafkaTemplate;
private final String topic;
@Autowired
public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
@Value("${kafka.topic}") String topic) {
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
}
@PostMapping("/message/send")
public Boolean sendMessage(@RequestParam String message){
kafkaTemplate.send(topic,message);
return true;
}
}
3.通過postman訪問http://localhost:8080/message/send。
4.打開cmd消費端窗口,發現消息成功接收。
創建消費者
5.同樣地,我們開始配置消費者,先去application.properties文件增加消費者配置。
#消費者配置
spring.kafka.consumer.group-id=gupao-1
spring.kafka.consumer.key-Derializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-Derializer=org.apache.kafka.common.serialization.StringDeserializer
6.因為消費者它是以監聽的形式監聽消息的,所以我們創建一個KafkaConsumerListener監聽類。通過@KafkaListener來監聽改主題的消息。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @ClassName
* @Describe Kafka消費者監聽器
* @Author 66477
* @Date 2020/6/1418:25
* @Version 1.0
*/
@Component
public class KafkaConsumerListener {
@KafkaListener(topics ="${kafka.topic}" )
public void onMessage(String message){
System.out.println("Kafka消費者監聽器接收到消息:"+message);
}
}
7.隨后postman訪問http://localhost:8080/message/send。控制台則會打印出:
2020-06-14 18:32:30.970 INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-06-14 18:32:30.970 INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-06-14 18:32:30.971 INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1592130750970
2020-06-14 18:32:30.976 INFO 451856 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: i1-NXUmvQRyaT-E27LPozQ
Kafka消費者監聽器接收到消息:hello world
四、Spring Cloud Stream
加上本章中的stream,上一篇中的架構圖又豐富了些東西。
其中
- RabbitMQ:AMQP、jms規范。
- kafka:相對松散的消息隊列協議
基本概念
Source:來源,近義詞:Producer,Publisher
Sink:接收器,近義詞:Consumer,Subcriber
Processor:對於上流而言是Sink,對於下流而言是Source
Reactive Streams
- Publisher
- Subscriber
- Processor
代碼示例
1.我們拷貝上面的spring cloud stream kafka項目,導入IDEA。
2.啟動zookeeper,參考以上。
3.啟動kafka,參考以上。
4.我們需要引入spring cloud stream依賴。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
5.創建一個stream包,包下再創建producer包,創建一個類MessageProducerBean
消息大致分為兩個部分,消息頭和消息體。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1421:58
* @Version 1.0
*/
@Component
@EnableBinding(Source.class)
public class MessageProducerBean {
@Autowired
@Qualifier(Source.OUTPUT)
private MessageChannel messageChannel;
@Autowired
private Source source;
/**
* 發送消息
* @param message 消息內容
*/
public void send(String message){
//通過消息管道發送消息
source.output().send(MessageBuilder.withPayload(message).build());
}
}
改寫一下我們之前寫的controller,增加另一種方式的接口。
import com.example.stream.producer.MessageProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName
* @Describe Kafka生產者Controller
* @Author 66477
* @Date 2020/6/1418:07
* @Version 1.0
*/
@RestController
public class KafkaProducerController {
private final KafkaTemplate<String,String> kafkaTemplate;
private final String topic;
private final MessageProducerBean messageProducerBean;
@Autowired
public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
@Value("${kafka.topic}") String topic, MessageProducerBean messageProducerBean) {
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
this.messageProducerBean = messageProducerBean;
}
/**
* 通過KafkaTemplate發送{@link KafkaTemplate}
* @param message
* @return
*/
@PostMapping("/message/send")
public Boolean sendMessage(@RequestParam String message){
kafkaTemplate.send(topic,message);
return true;
}
/**
* 通過消息生產者Bean發送{@link com.example.stream.producer.MessageProducerBean}
* @param message
* @return
*/
@GetMapping("/message/send")
public Boolean send(@RequestParam String message){
messageProducerBean.send(message);
return true;
}
}
6.我們需要給引入依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
之前沒有加上spring cloud 版本,現在要加上:
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.BUILD-SNAPSHOT</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
7.這時我們再啟動cmd中的consumer消費者。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao
8.注釋掉之前配置的生產者序列化。
#生產者配置
#spring.kafka.producer.bootstrap-servers=localhost:9092
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
9.Postman分別以GET,POST方式訪問http://localhost:8080/send/message,發現消費者正常收到消息。
拓展:如果想要多主題怎么辦,那么我能不能仿造Source接口,搭建一個屬於自己的管道呢?我們也在stream包下創建一個message包,message包下創建一個接口(仿Source)MyMessageSource.
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1423:27
* @Version 1.0
*/
public interface MyMessagesSource {
/**
* 消息來源的管道名稱
*/
String NAME="gupao";
@Output(NAME)
MessageChannel gupao();
}
然后我們仿造之前寫的MessageProducerBean,再整一套自己的,也就是自定義消息發送源。
import com.example.stream.message.MyMessagesSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1421:58
* @Version 1.0
*/
@Component
@EnableBinding({Source.class, MyMessagesSource.class})
public class MessageProducerBean {
@Autowired
@Qualifier(Source.OUTPUT)
private MessageChannel messageChannel;
@Autowired
private Source source;
@Autowired
@Qualifier(MyMessagesSource.NAME)//Bean名稱
private MessageChannel gupaoMessageChannel;
@Autowired
private MyMessagesSource myMessagesSource;
/**
* 發送消息
* @param message 消息內容
*/
public void send(String message){
//通過消息管道發送消息
source.output().send(MessageBuilder.withPayload(message).build());
}
/**
* 發送消息
* @param message 消息內容
*/
public void sendToGupao(String message){
//通過消息管道發送消息
myMessagesSource.gupao().send(MessageBuilder.withPayload(message).build());
}
}
在application.properties文件增加一行屬於自己的主題配置
spring.cloud.stream.bindings.gupao.destination=mygupao
這時去消費者監聽類里面增加監聽主題。
@KafkaListener(topics ="mygupao" )
public void onGupaoMessage(String message){
System.out.println("Kafka消費者監聽器接收到主題mygupao消息:"+message);
}
我們去cmd黑窗口,把剛剛主題為gupao的停掉,改成mygupao主題監聽。
E:\JavaEE\kafka\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mygupao
最后去controller類增加一個接口,用於發送消息至我們新創建的管道。
/**
* 通過消息生產者Bean發送{@link com.example.stream.producer.MessageProducerBean}
* @param message
* @return
*/
@GetMapping("/message/sendToGupao")
public Boolean sendToGupao(@RequestParam String message){
messageProducerBean.sendToGupao(message);
return true;
}
由於我之前殺死過8080端口,導致zookeeper進程被殺了(它也是8080端口),所以我將stream項目改成8081,重新啟動了zookeeper,postman測試一下我們http://localhost:8081/message/sendToGupao。控制台信息如下:
2020-06-15 00:00:24.170 INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-06-15 00:00:24.170 INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-06-15 00:00:24.170 INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1592150424170
2020-06-15 00:00:24.174 INFO 171780 --- [ad | producer-3] org.apache.kafka.clients.Metadata : [Producer clientId=producer-3] Cluster ID: i1-NXUmvQRyaT-E27LPozQ
Kafka消費者監聽器接收到主題mygupao消息:mygupaoaaa
cmd窗口如下:
同樣地,我們也可以創建一個消息消費Bean用於接收消息。
在stream包下繼續創建一個consumer包,包下創建名為MessageConsumerBean的Bean。用來實現標准Sink監聽,
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @ClassName
* @Describe 消息消費Bean
* @Author 66477
* @Date 2020/6/1520:25
* @Version 1.0
*/
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
@Autowired
@Qualifier(Sink.INPUT)//Bean名稱
private SubscribableChannel subscribableChannel;
@Autowired
private Sink sink;
//那么訂閱消息有多種方式
//方式一:通過SubscribableChannel訂閱消息
//當字段注入完成后的回調
@PostConstruct
public void init(){
subscribableChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
});
}
//方式二:通過@ServiceActivator方式訂閱消息
@ServiceActivator(inputChannel = Sink.INPUT)
public void onMessage(Object message){
System.out.println("onMessage :"+message);
}
//方式三:通過@StreamListener實現
@StreamListener(Sink.INPUT)
public void onMessage(String message){
System.out.println("StreamListener:"+message);
}
}
application.properties下也要增加對應的input主題項了。
spring.cloud.stream.bindings.input.destination=${kafka.topic}
五、Spring Cloud Stream Kafka Binder(RabbitMQ)
我們復制一下上面的項目,准備為stream rabbitmq做准備。重命名為spring-cloud-stream-rabbitmq重新導入IDEA,里面pom文件的artifactId也要修改。清除掉有關kafka的代碼,application.properties清除關於kafka的生產者,消費者配置。
現在項目結構如下:
修改依賴為
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
其中MessageConsumerBean
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @ClassName
* @Describe 消息消費Bean
* @Author 66477
* @Date 2020/6/1520:25
* @Version 1.0
*/
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
@Autowired
@Qualifier(Sink.INPUT)//Bean名稱
private SubscribableChannel subscribableChannel;
@Autowired
private Sink sink;
//方式一:通過Subcribe訂閱消息
//當字段注入完成后的回調
@PostConstruct
public void init(){
//實現異步回調
subscribableChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("subscribe:"+message.getPayload());
}
});
}
//方式二:通過@ServiceActivator
@ServiceActivator(inputChannel = Sink.INPUT)
public void onMessage(Object message){
System.out.println("onMessage :"+message);
}
//方式三:通過@StreamListener實現
@StreamListener(Sink.INPUT)
public void onMessage(String message){
System.out.println("StreamListener:"+message);
}
}
MyMessagesSource
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1423:27
* @Version 1.0
*/
public interface MyMessagesSource {
/**
* 消息來源的管道名稱
*/
String NAME="gupao";
@Output(NAME)
MessageChannel gupao();
}
MessageProducerBean
import com.example.rabbitmq.stream.message.MyMessagesSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* @ClassName
* @Describe TODO
* @Author 66477
* @Date 2020/6/1421:58
* @Version 1.0
*/
@Component
@EnableBinding({Source.class, MyMessagesSource.class})
public class MessageProducerBean {
@Autowired
@Qualifier(Source.OUTPUT)
private MessageChannel messageChannel;
@Autowired
private Source source;
@Autowired
@Qualifier(MyMessagesSource.NAME)//Bean名稱
private MessageChannel gupaoMessageChannel;
@Autowired
private MyMessagesSource myMessagesSource;
/**
* 發送消息
* @param message 消息內容
*/
public void send(String message){
//通過消息管道發送消息
source.output().send(MessageBuilder.withPayload(message).build());
}
/**
* 發送消息
* @param message 消息內容
*/
public void sendToGupao(String message){
//通過消息管道發送消息
myMessagesSource.gupao().send(MessageBuilder.withPayload(message).build());
}
}
MessageProducerController
import com.example.rabbitmq.stream.producer.MessageProducerBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName
* @Describe Rabbitmq生產者Controller
* @Author 66477
* @Date 2020/6/1418:07
* @Version 1.0
*/
@RestController
class MessageProducerController {
private final MessageProducerBean messageProducerBean;
private final String topic;
MessageProducerController(MessageProducerBean messageProducerBean, @Value("${kafka.topic}") String topic) {
this.messageProducerBean = messageProducerBean;
this.topic = topic;
}
/**
* 通過消息生產者Bean發送{@link MessageProducerBean}
* @param message
* @return
*/
@GetMapping("/messageProducer/send")
public Boolean send(@RequestParam String message){
messageProducerBean.send(message);
return true;
}
/**
* 通過消息生產者Bean發送{@link MessageProducerBean}
* @param message
* @return
*/
@GetMapping("/message/sendToGupao")
public Boolean sendToGupao(@RequestParam String message){
messageProducerBean.sendToGupao(message);
return true;
}
}
application.properties
#定義應用名稱
spring.application.name=spring-cloud-stream-rabbitmq
#配置端口
server.port=8081
#Spring Kafka配置信息
spring.kafka.bootstrap-servers=localhost:9092
#配置需要的kafka主題
kafka.topic = gupao
#定義Spring Cloud Stream Source消息去向
#針對kafka而言,基本模式如下
#spring.cloud.stream.bindings.${channel-name}.destination=${kafka.topic}
spring.cloud.stream.bindings.output.destination=${kafka.topic}
spring.cloud.stream.bindings.gupao.destination=mygupao
spring.cloud.stream.bindings.input.destination=${kafka.topic}
六、問題總結
1.當時用Future時,異步調用都可以使用get()方式強制執行嗎?
解答:是的,get等待當前線程執行完畢,並且獲取返回接口。
2.@KafkaListener和kafka consumer有啥區別?
解答:沒有實質區別,主要是編程模式。
@KafkaListener采用注解驅動
kafka consumer API 采用接口編程。
3.消費者接收消息的地方在哪?
解答:訂閱並且處理后就消失了。
4.生產環境配置多個生產者和消費者只需要定義不同的group就可以了嗎?
解答:group是一種,要看是不是相同topic。
5.為了不丟失數據,消息隊列的容錯,和排錯后的處理,如何實現的?
解答這個依賴於zookeeper。
6.異步接收除了打印還有什么辦法處理消息嗎?
解答:可以處理其他邏輯,比如存儲數據庫。
7.kafka適合什么場景下使用?
解答:高性能的Stream處理。
8.Kafka消息一直都在,內存占用會很多吧,消息量不停產生消息咋辦?
解答:kafka還是會刪除的,並不是一直存在。
9.怎么沒看到broker配置?
解答:broker不需要設置,它是單獨啟動。
10.consumer為什么要分組?
解答:consumer需要定義不同邏輯分組,相同主題里面不同分組,便於管理。
11.@EnableBinding有什么用?
解答:@EnableBinding將Source、Sink以及Processor提升成相應的代理.
12.@Autowired Source source 這種寫法是默認用官方的實現?
解答:是官方的實現。
13.這么多消息框架,各自有點是什么,怎么選取?
解答:RabbitMQ:AMQP,JMS規范
kafka:相對松散的消息隊列協議
ActiveMQ:AMQP,JMS規范
14.如果中間件有問題怎么辦,我們只管用,不用維護嗎?現在遇到的很多問題不是使用,而是俄日胡,中間件一有問題,消息堵塞或者丟失,只有重啟?
解答:消息中間件無法保證不丟消息,多數高一致性的消息背會還是有持久化的。
15.@EnableBinder,@EnableZuulProxy,@EnableDiscoverClient這些注解都是通過特定BeanPostProcessor實現的嗎?
解答:不完全對,主要處理接口在@Import:
- ImportSelector實現類
- ImportBeanDefinitionsRegistrar實現類
- @Configuration標注類
- BeanPostProcessor實現類
16.我對流式處理還是懵懵的,到底啥事流式處理,怎樣才能稱為流式處理,一般應用在什么場景?
解答:Stream處理簡單的說,異步處理,消息是一種處理方式。
提交申請,機器生產,對於高密度提交任務,多數場景采用異步處理,Stream,Evnet-Driven。舉例說明:審核流程,鑒別黃圖。
17.如果是大量消息,怎么快速消費,用多線程嗎?
解答:確實是使用多線程,不過不一定奏效,依賴於處理的具體內容,比如:一個線程使用了25%的CPU,四個線程就將cpu耗盡,因此,並發100個處理,實際上還是4個線程在處理。I/O密集型,CPU密集型。大多數是多線程,其實也單線程,流式非阻塞。
18.購物車的價格計算可以使用流式計算來處理嗎?能說下思路嗎?有沒有什么高性能的方式推薦?
解答:當商品添加到購物車的時候,就可以開始計算了。