微服務之路(十一)spring cloud stream


前言

場景描述

當客戶端向服務端請求,服務端返回出現了異常,對於客戶端1返回為NULL,而對於客戶端2返回的是正常數據。而服務端並不知道返回給客戶端們的數據對不對,只能通過用戶反饋來證實返回的錯誤性,顯然是不正確的。

Stream簡介

Spring Cloud Stream 是一個用來為微服務應用構建消息驅動能力的框架。它可以基於 Spring Boot 來創建獨立的、可用於生產的 Spring 應用程序。Spring Cloud Stream 為一些供應商的消息中間件產品提供了個性化的自動化配置實現,並引入了發布-訂閱、消費組、分區這三個核心概念。通過使用 Spring Cloud Stream,可以有效簡化開發人員對消息中間件的使用復雜度,讓系統開發人員可以有更多的精力關注於核心業務邏輯的處理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自動化配置。

主要議題

  • Kafka
  • Spring Kafka
  • Spring Boot Kafka
  • Spring Cloud Stream
  • Spring Cloud Stream Kafka Binder
  • 問題總結

主體內容

一、Kafka

官方地址:http://kafka.apache.org

主要用途

  • 消息中間件
  • 流式計算處理
  • 日志

執行腳本目錄bin

E:\JavaEE\kafka-2.5.0-src\kafka-2.5.0-src\bin\windows

同類產品比較

  • ActiveMQ:IMS(Java Message Service)規范實現
  • RabbitMQ:AMQP(Advanved Message Queue Protocol)規范實現
  • Kafka:並非某種規范的實現,它靈活和性能相對是優勢的

快速上手步驟

1.下載並解壓kafka壓縮包。

2.下載並解壓zookeeper壓縮包,這里官方它的quickstart就是以zookeeper保證強一致性。zookeeper官方地址:https://zookeeper.apache.org/

3.以windows為例,我們先到zookeeper的conf目錄下,把zoo_sample.cfg文件復制一份重命名為zoo.cfg。現在目錄如下所示:

然后打開cmd,進入bin目錄,啟動服務。

zkServer.cmd

4.啟動kafka。進入到kafka的window文件夾,執行啟動命令。

kafka-server-start.bat ../../config/server.properties

5.創建kafka主題。再次打開一個cmd窗口,進入到windows文件夾

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gupao

6.生產者發送消息/生產消息

kafka-console-producer.bat --broker-list localhost:9092 --topic gupao

然后輸入要發送的消息:

7.消費者接收消息/消費消息

重新打開一個cmd,輸入接收命令。

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao

當我在生產者端輸入消息,消費者端馬上就接收到了消息。

如果消費命令后面加上--from beginning參數,那么他會接收到從開始就生產的消息。

那么被消費后的消息能否被其他消費者消費?我們再開一個cmd,利用新的消費者消費。答案是可以的。

使用Kafka標准API

1.從start.spring.io構建項目。

2.新建包raw.api,創建類KafkaProducerDemo,這里就是讓生產者通過java api形式進行發送消息。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;

/**
 * @ClassName
 * @Describe Kafka Producer Demo使用Kafka原始API
 * @Author 66477
 * @Date 2020/6/1417:15
 * @Version 1.0
 */
public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {

        //初始化配置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","localhost:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer",StringSerializer.class.getName());//注意引包
        //創建Kafka Producer
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);
        //創建 Kafka消息
        String topic =  "gupao";

        Integer partition=0;
        Long timestamp= System.currentTimeMillis();
        String key="message-key";
        String value = "gupao.com";
        ProducerRecord<String,String> record = new ProducerRecord<String, String>(topic,partition,timestamp,key,value);
        //發送Kafka消息
        Future<RecordMetadata> metadataFuture = kafkaProducer.send(record);
        //強制執行
        metadataFuture.get();
    }
}

3.運行以上代碼,然后你會發現,cmd窗口的消費者會接收消息。

二、Spring Kafka

那么接下來我們使用Spring整合的kafka。

官方文檔

設計模式

Spring社區對data數據操作,有一個基本的模式,Template模式:

  • JDBC:jdbcTemplate
  • Redis:RedisTemplate
  • Kafka:KafkaTemplate
  • JMS:JmsTemplate
  • Rest:RestTemplate

XXXTemplate一定實現XXXOpeations

Maven依賴

<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
</dependency>

三、Spring Boot Kafka

Maven依賴

自動裝配器

KafkaAutoConfiguration

其中KafkaTemplate會被自動裝配:

@Bean
@ConditionalOnMissingBean({KafkaTemplate.class})
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
    messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
    kafkaTemplate.setProducerListener(kafkaProducerListener);
    kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
    return kafkaTemplate;
}

關閉Spring Security

依賴

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-security</artifactId>
</dependency>
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.WebSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;

@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
     
        @Override
        public void configure(WebSecurity web) throws Exception {
            web.ignoring().antMatchers("/**");
        }
}

創建生產者

1.我們繼續在上面的項目動刀子,我們先在application.properties文件轉移之前demo類中配置。

#定義應用名稱
spring.application.name=spring-cloud-stream-kafka
#配置端口
server.port=8080
#Spring Kafka配置信息
spring.kafka.bootstrap-servers=localhost:9092
#配置需要的kafka主題
kafka.topic = gupao
#生產者配置
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

2.然后編寫一個controller類。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName
 * @Describe Kafka生產者Controller
 * @Author 66477
 * @Date 2020/6/1418:07
 * @Version 1.0
 */
@RestController
public class KafkaProducerController {
    private final KafkaTemplate<String,String> kafkaTemplate;

    private final String topic;

    @Autowired
    public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
                                   @Value("${kafka.topic}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @PostMapping("/message/send")
    public Boolean sendMessage(@RequestParam String message){
        kafkaTemplate.send(topic,message);
        return  true;
    }
}

3.通過postman訪問http://localhost:8080/message/send。

4.打開cmd消費端窗口,發現消息成功接收。

創建消費者

5.同樣地,我們開始配置消費者,先去application.properties文件增加消費者配置。

#消費者配置
spring.kafka.consumer.group-id=gupao-1
spring.kafka.consumer.key-Derializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-Derializer=org.apache.kafka.common.serialization.StringDeserializer

6.因為消費者它是以監聽的形式監聽消息的,所以我們創建一個KafkaConsumerListener監聽類。通過@KafkaListener來監聽改主題的消息。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName
 * @Describe Kafka消費者監聽器
 * @Author 66477
 * @Date 2020/6/1418:25
 * @Version 1.0
 */
@Component
public class KafkaConsumerListener {
    @KafkaListener(topics ="${kafka.topic}" )
    public void onMessage(String message){
        System.out.println("Kafka消費者監聽器接收到消息:"+message);
    }
}

7.隨后postman訪問http://localhost:8080/message/send。控制台則會打印出:

2020-06-14 18:32:30.970  INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-06-14 18:32:30.970  INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-06-14 18:32:30.971  INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1592130750970
2020-06-14 18:32:30.976  INFO 451856 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: i1-NXUmvQRyaT-E27LPozQ
Kafka消費者監聽器接收到消息:hello world

四、Spring Cloud Stream

加上本章中的stream,上一篇中的架構圖又豐富了些東西。

其中

  • RabbitMQ:AMQP、jms規范。
  • kafka:相對松散的消息隊列協議

基本概念

Source:來源,近義詞:Producer,Publisher

Sink:接收器,近義詞:Consumer,Subcriber

Processor:對於上流而言是Sink,對於下流而言是Source

Reactive Streams

  • Publisher
  • Subscriber
  • Processor

代碼示例

1.我們拷貝上面的spring cloud stream kafka項目,導入IDEA。

2.啟動zookeeper,參考以上。

3.啟動kafka,參考以上。

4.我們需要引入spring cloud stream依賴。

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream</artifactId>
</dependency>

5.創建一個stream包,包下再創建producer包,創建一個類MessageProducerBean

消息大致分為兩個部分,消息頭和消息體。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * @ClassName
 * @Describe TODO
 * @Author 66477
 * @Date 2020/6/1421:58
 * @Version 1.0
 */
@Component
@EnableBinding(Source.class)
public class MessageProducerBean {

    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel messageChannel;
    
	@Autowired
    private Source source;
    
    /**
     * 發送消息
     * @param message 消息內容
     */
    public void send(String message){
        //通過消息管道發送消息
        source.output().send(MessageBuilder.withPayload(message).build());
    }

}

改寫一下我們之前寫的controller,增加另一種方式的接口。

import com.example.stream.producer.MessageProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName
 * @Describe Kafka生產者Controller
 * @Author 66477
 * @Date 2020/6/1418:07
 * @Version 1.0
 */
@RestController
public class KafkaProducerController {
    private final KafkaTemplate<String,String> kafkaTemplate;

    private final String topic;

    private final MessageProducerBean messageProducerBean;

    @Autowired
    public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
                                   @Value("${kafka.topic}") String topic, MessageProducerBean messageProducerBean) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
        this.messageProducerBean = messageProducerBean;
    }

    /**
     * 通過KafkaTemplate發送{@link KafkaTemplate}
     * @param message
     * @return
     */
    @PostMapping("/message/send")
    public Boolean sendMessage(@RequestParam String message){
        kafkaTemplate.send(topic,message);
        return  true;
    }

    /**
     * 通過消息生產者Bean發送{@link com.example.stream.producer.MessageProducerBean}
     * @param message
     * @return
     */
    @GetMapping("/message/send")
    public Boolean send(@RequestParam String message){
        messageProducerBean.send(message);
        return  true;
    }
}

6.我們需要給引入依賴

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

之前沒有加上spring cloud 版本,現在要加上:

<properties>
   <java.version>1.8</java.version>
   <spring-cloud.version>Hoxton.BUILD-SNAPSHOT</spring-cloud.version>
</properties>
<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>${spring-cloud.version}</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

7.這時我們再啟動cmd中的consumer消費者。

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao

8.注釋掉之前配置的生產者序列化。

#生產者配置
#spring.kafka.producer.bootstrap-servers=localhost:9092
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

9.Postman分別以GET,POST方式訪問http://localhost:8080/send/message,發現消費者正常收到消息。

拓展:如果想要多主題怎么辦,那么我能不能仿造Source接口,搭建一個屬於自己的管道呢?我們也在stream包下創建一個message包,message包下創建一個接口(仿Source)MyMessageSource.

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * @ClassName
 * @Describe TODO
 * @Author 66477
 * @Date 2020/6/1423:27
 * @Version 1.0
 */
public interface MyMessagesSource {
    /**
     * 消息來源的管道名稱
     */
    String NAME="gupao";

    @Output(NAME)
    MessageChannel gupao();

}

然后我們仿造之前寫的MessageProducerBean,再整一套自己的,也就是自定義消息發送源。

import com.example.stream.message.MyMessagesSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * @ClassName
 * @Describe TODO
 * @Author 66477
 * @Date 2020/6/1421:58
 * @Version 1.0
 */
@Component
@EnableBinding({Source.class, MyMessagesSource.class})
public class MessageProducerBean {

    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel messageChannel;

    @Autowired
    private Source source;

    @Autowired
    @Qualifier(MyMessagesSource.NAME)//Bean名稱
    private MessageChannel gupaoMessageChannel;

    @Autowired
    private MyMessagesSource myMessagesSource;
    /**
     * 發送消息
     * @param message 消息內容
     */
    public void send(String message){
        //通過消息管道發送消息
        source.output().send(MessageBuilder.withPayload(message).build());
    }

    /**
     * 發送消息
     * @param message 消息內容
     */
    public void sendToGupao(String message){
        //通過消息管道發送消息
        myMessagesSource.gupao().send(MessageBuilder.withPayload(message).build());
    }

}

在application.properties文件增加一行屬於自己的主題配置

spring.cloud.stream.bindings.gupao.destination=mygupao

這時去消費者監聽類里面增加監聽主題。

@KafkaListener(topics ="mygupao" )
public void onGupaoMessage(String message){
    System.out.println("Kafka消費者監聽器接收到主題mygupao消息:"+message);
}

我們去cmd黑窗口,把剛剛主題為gupao的停掉,改成mygupao主題監聽。

E:\JavaEE\kafka\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mygupao

最后去controller類增加一個接口,用於發送消息至我們新創建的管道。

/**
 * 通過消息生產者Bean發送{@link com.example.stream.producer.MessageProducerBean}
 * @param message
 * @return
 */
@GetMapping("/message/sendToGupao")
public Boolean sendToGupao(@RequestParam String message){
    messageProducerBean.sendToGupao(message);
    return  true;
}

由於我之前殺死過8080端口,導致zookeeper進程被殺了(它也是8080端口),所以我將stream項目改成8081,重新啟動了zookeeper,postman測試一下我們http://localhost:8081/message/sendToGupao。控制台信息如下:

2020-06-15 00:00:24.170  INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-06-15 00:00:24.170  INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-06-15 00:00:24.170  INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1592150424170
2020-06-15 00:00:24.174  INFO 171780 --- [ad | producer-3] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-3] Cluster ID: i1-NXUmvQRyaT-E27LPozQ
Kafka消費者監聽器接收到主題mygupao消息:mygupaoaaa

cmd窗口如下:

同樣地,我們也可以創建一個消息消費Bean用於接收消息。

在stream包下繼續創建一個consumer包,包下創建名為MessageConsumerBean的Bean。用來實現標准Sink監聽,

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

/**
 * @ClassName
 * @Describe 消息消費Bean
 * @Author 66477
 * @Date 2020/6/1520:25
 * @Version 1.0
 */
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
    @Autowired
    @Qualifier(Sink.INPUT)//Bean名稱
    private SubscribableChannel subscribableChannel;

    @Autowired
    private Sink sink;
    
     //那么訂閱消息有多種方式
     //方式一:通過SubscribableChannel訂閱消息
     //當字段注入完成后的回調
    @PostConstruct
    public void init(){
        subscribableChannel.subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }
        });
    }
    
	//方式二:通過@ServiceActivator方式訂閱消息
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void onMessage(Object message){
        System.out.println("onMessage :"+message);
    }
    
    //方式三:通過@StreamListener實現
    @StreamListener(Sink.INPUT)
    public void onMessage(String message){
        System.out.println("StreamListener:"+message);
    }
}

application.properties下也要增加對應的input主題項了。

spring.cloud.stream.bindings.input.destination=${kafka.topic}

五、Spring Cloud Stream Kafka Binder(RabbitMQ)

我們復制一下上面的項目,准備為stream rabbitmq做准備。重命名為spring-cloud-stream-rabbitmq重新導入IDEA,里面pom文件的artifactId也要修改。清除掉有關kafka的代碼,application.properties清除關於kafka的生產者,消費者配置。

現在項目結構如下:

修改依賴為

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

其中MessageConsumerBean

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

/**
 * @ClassName
 * @Describe 消息消費Bean
 * @Author 66477
 * @Date 2020/6/1520:25
 * @Version 1.0
 */
@Component
@EnableBinding({Sink.class})
public class MessageConsumerBean {
    @Autowired
    @Qualifier(Sink.INPUT)//Bean名稱
    private SubscribableChannel subscribableChannel;

    @Autowired
    private Sink sink;

    //方式一:通過Subcribe訂閱消息
    //當字段注入完成后的回調
    @PostConstruct
    public void init(){
        //實現異步回調
        subscribableChannel.subscribe(new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("subscribe:"+message.getPayload());
            }
        });
    }

    //方式二:通過@ServiceActivator
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void onMessage(Object message){
        System.out.println("onMessage :"+message);
    }

    //方式三:通過@StreamListener實現
    @StreamListener(Sink.INPUT)
    public void onMessage(String message){
        System.out.println("StreamListener:"+message);
    }

}

MyMessagesSource

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * @ClassName
 * @Describe TODO
 * @Author 66477
 * @Date 2020/6/1423:27
 * @Version 1.0
 */
public interface MyMessagesSource {
    /**
     * 消息來源的管道名稱
     */
    String NAME="gupao";

    @Output(NAME)
    MessageChannel gupao();

}

MessageProducerBean

import com.example.rabbitmq.stream.message.MyMessagesSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * @ClassName
 * @Describe TODO
 * @Author 66477
 * @Date 2020/6/1421:58
 * @Version 1.0
 */
@Component
@EnableBinding({Source.class, MyMessagesSource.class})
public class MessageProducerBean {

    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel messageChannel;

    @Autowired
    private Source source;

    @Autowired
    @Qualifier(MyMessagesSource.NAME)//Bean名稱
    private MessageChannel gupaoMessageChannel;

    @Autowired
    private MyMessagesSource myMessagesSource;
    /**
     * 發送消息
     * @param message 消息內容
     */
    public void send(String message){
        //通過消息管道發送消息
        source.output().send(MessageBuilder.withPayload(message).build());
    }

    /**
     * 發送消息
     * @param message 消息內容
     */
    public void sendToGupao(String message){
        //通過消息管道發送消息
        myMessagesSource.gupao().send(MessageBuilder.withPayload(message).build());
    }

}

MessageProducerController

import com.example.rabbitmq.stream.producer.MessageProducerBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName
 * @Describe Rabbitmq生產者Controller
 * @Author 66477
 * @Date 2020/6/1418:07
 * @Version 1.0
 */
@RestController
class MessageProducerController {

    private final MessageProducerBean messageProducerBean;

    private final String topic;

    MessageProducerController(MessageProducerBean messageProducerBean, @Value("${kafka.topic}") String topic) {
        this.messageProducerBean = messageProducerBean;
        this.topic = topic;
    }


    /**
     * 通過消息生產者Bean發送{@link MessageProducerBean}
     * @param message
     * @return
     */
    @GetMapping("/messageProducer/send")
    public Boolean send(@RequestParam String message){
        messageProducerBean.send(message);
        return  true;
    }

    /**
     * 通過消息生產者Bean發送{@link MessageProducerBean}
     * @param message
     * @return
     */
    @GetMapping("/message/sendToGupao")
    public Boolean sendToGupao(@RequestParam String message){
        messageProducerBean.sendToGupao(message);
        return  true;
    }
}

application.properties

#定義應用名稱
spring.application.name=spring-cloud-stream-rabbitmq
#配置端口
server.port=8081
#Spring Kafka配置信息
spring.kafka.bootstrap-servers=localhost:9092
#配置需要的kafka主題
kafka.topic = gupao

#定義Spring Cloud Stream Source消息去向
#針對kafka而言,基本模式如下
#spring.cloud.stream.bindings.${channel-name}.destination=${kafka.topic}
spring.cloud.stream.bindings.output.destination=${kafka.topic}
spring.cloud.stream.bindings.gupao.destination=mygupao
spring.cloud.stream.bindings.input.destination=${kafka.topic}

六、問題總結

1.當時用Future時,異步調用都可以使用get()方式強制執行嗎?

解答:是的,get等待當前線程執行完畢,並且獲取返回接口。

2.@KafkaListener和kafka consumer有啥區別?

解答:沒有實質區別,主要是編程模式。

@KafkaListener采用注解驅動

kafka consumer API 采用接口編程。

3.消費者接收消息的地方在哪?

解答:訂閱並且處理后就消失了。

4.生產環境配置多個生產者和消費者只需要定義不同的group就可以了嗎?

解答:group是一種,要看是不是相同topic。

5.為了不丟失數據,消息隊列的容錯,和排錯后的處理,如何實現的?

解答這個依賴於zookeeper。

6.異步接收除了打印還有什么辦法處理消息嗎?

解答:可以處理其他邏輯,比如存儲數據庫。

7.kafka適合什么場景下使用?

解答:高性能的Stream處理。

8.Kafka消息一直都在,內存占用會很多吧,消息量不停產生消息咋辦?

解答:kafka還是會刪除的,並不是一直存在。

9.怎么沒看到broker配置?

解答:broker不需要設置,它是單獨啟動。

10.consumer為什么要分組?

解答:consumer需要定義不同邏輯分組,相同主題里面不同分組,便於管理。

11.@EnableBinding有什么用?

解答:@EnableBinding將Source、Sink以及Processor提升成相應的代理.

12.@Autowired Source source 這種寫法是默認用官方的實現?

解答:是官方的實現。

13.這么多消息框架,各自有點是什么,怎么選取?

解答:RabbitMQ:AMQP,JMS規范

kafka:相對松散的消息隊列協議

ActiveMQ:AMQP,JMS規范

14.如果中間件有問題怎么辦,我們只管用,不用維護嗎?現在遇到的很多問題不是使用,而是俄日胡,中間件一有問題,消息堵塞或者丟失,只有重啟?

解答:消息中間件無法保證不丟消息,多數高一致性的消息背會還是有持久化的。

15.@EnableBinder,@EnableZuulProxy,@EnableDiscoverClient這些注解都是通過特定BeanPostProcessor實現的嗎?

解答:不完全對,主要處理接口在@Import:

  • ImportSelector實現類
  • ImportBeanDefinitionsRegistrar實現類
  • @Configuration標注類
  • BeanPostProcessor實現類

16.我對流式處理還是懵懵的,到底啥事流式處理,怎樣才能稱為流式處理,一般應用在什么場景?

解答:Stream處理簡單的說,異步處理,消息是一種處理方式。

提交申請,機器生產,對於高密度提交任務,多數場景采用異步處理,Stream,Evnet-Driven。舉例說明:審核流程,鑒別黃圖。

17.如果是大量消息,怎么快速消費,用多線程嗎?

解答:確實是使用多線程,不過不一定奏效,依賴於處理的具體內容,比如:一個線程使用了25%的CPU,四個線程就將cpu耗盡,因此,並發100個處理,實際上還是4個線程在處理。I/O密集型,CPU密集型。大多數是多線程,其實也單線程,流式非阻塞。

18.購物車的價格計算可以使用流式計算來處理嗎?能說下思路嗎?有沒有什么高性能的方式推薦?

解答:當商品添加到購物車的時候,就可以開始計算了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM