從Kafka發消息到第一個kafka stream程序(基礎)


環境:

Windows 10,

Ubuntu 18.04 (虛擬機)

Java 8

Spring Boot 2.5.2,其下的 spring-kafka 版本 2.7.3,kafka-streams 版本 2.7.1

apache-zookeeper-3.7.0,單機 standalone

kafka_2.13-2.8.0,單機 standalone

Eclipse Version: 2021-03 (4.19.0):開發工具

Poatman:接口調用工具 來自#博客園

---

 

兩個Spring  Boot項目:

1、nothing - 非Web項目

執行kafka stream程序,從 1#主題(TOPIC) 接收信息,再傳到 2#主題。

2、web - Web-Servlet項目

向 1#主題 發送消息;

接收 1#主題 的消息;

接收 2#主題 的消息——kafka stream處理過的;

---

 

關於kafka主題的創建:

在本文中,主題都是 自動創建的——web項目啟動后,發送消息給主題時創建、監聽主題時給創建。

nothing項目 必須在 web項目 啟動,並執行相關操作 生成 主題 后 才可以使用。

自動創建 的主題 是有限制的——只有一個分區?需要看文檔。

 

注,本文寫成時,作者對 kafka還不是太熟,就是 水平有限 的意思,本君盡量避免錯誤描述。

 

Ubuntu上運行 ZooKeeper:

./zkServer.sh start

停止:

./zkServer.sh stop
注,conf目錄下的 zoo_sample.cfg 復制為 zoo.cfg,並根據需要 修改其中的 dataDir 等參數。 來自#博客園
---
ZooKeeper啟動后,啟動Kafka:
修改配置文件 config/server.properties ,主要配置項:
listeners=PLAINTEXT://192.168.151.81:9092 # 某網卡的IP,配置后,可以被其它主機訪問
advertised.listeners=PLAINTEXT://192.168.151.81:9092 # 某網卡的IP,配置后,可以被其它主機訪問
log.dirs=/home/ben/kafka/logs-1 # 數據目錄
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
注,根據需要配置,還有更多參數。
啟動命令:
bin/kafka-server-start.sh config/server.properties
停止命令:
1、執行 Ctrl + C (ZooKeeper正常時,可以立即停止,否則,可以通過 kill -9 PID 干掉) 來自#博客園
2、bin/kafka-server-stop.sh
---
 
目錄
 
 
一、Kafka發送消息
web項目 測試:給 1#主題 發送消息,再 使用 KafkaListener 監聽 1#主題 的消息。
 
引入 kafak依賴包:
        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

 

spring-kafka包 的內容展示:

 

配置 application.properties:來自#博客園

#
# ubuntu's ip
ubuntu.ip=192.168.151.81

#
# Kafka
spring.kafka.bootstrap-servers=${ubuntu.ip}:9092
spring.kafka.consumer.group-id=myGroup

生產者——調用接口:

 1 @RestController
 2 @RequestMapping(path="/kafka")
 3 public class KafkaProducer {
 4 
 5     @Autowired
 6     private KafkaTemplate<String, Object> kafkaTemplate;
 7 
 8     // 發送消息接口
 9     @GetMapping(path="/sendNormal")10     public void sendMessage1(@RequestParam String msg) {
11         kafkaTemplate.send("topic01", msg);
12     }
13 }

消費者——@KafkaListener:

 1 @Component
 2 public class MyKafkaConsumer {
 3     
 4     // 消費監聽
 5     @KafkaListener(topics= {"topic01"})
 6     public void onMessage1(ConsumerRecord<?, ?> record) {
 7         // 消費的哪個topic、partition的消息,打印出消息內容
 8         System.out.println("kafka簡單消費:topic=" + record.topic() + ", partition=" + record.partition() 
 9             + ", offset=" + record.offset() + "-" + record.value());
10     }
11 }

消費者輸出結果:

kafka簡單消費:topic=topic01, partition=0, offset=1-that is a test,在中國

 

二、Kafka事務
在一個事務進行中,如果出現異常導致事務執行失敗,則事務中所有消息發送都不會執行成功。 來自#博客園
比如,一個事務中發送了消息 1、2、3,在2和3之間發生了異常,此時,1、2、3 都不會發送成功。如果沒有事務的控制,這種情況下 1、2可以發送成功,只有3不會發送。
 
在上一節 的基礎上。
 
配置 application.properties:
# 事務
spring.kafka.producer.transaction-id-prefix=tx # tx可以是其它名稱

運行web項目,調用生產者發送接口報錯

java.lang.IllegalStateException: No transaction is in process; possible solutions...省略

在生產者放接口添加@Transactional,再次調用,仍然報錯

1     @GetMapping(path="/sendNormal")
2  @Transactional 3     public void sendMessage1(@RequestParam String msg) {
4         kafkaTemplate.send(tsrc, msg);
5         
6     }

錯誤信息:

org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type 'org.springframework.transaction.TransactionManager' 
available: expected single matching bean but found 2: transactionManager,kafkaTransactionManager

注意,看了一些其它文章,此時應該執行成功了。這里Spring容器中出現了兩個 事務manager——檢查Spring容器(ApplicationContext中 Arrays.toString(context.getBeanDefinitionNames()) ),怎么辦呢?

修改@Transactional,指定使用 kafkaTransactionManager:

1     @GetMapping(path="/sendNormal")
2     @Transactional(transactionManager = "kafkaTransactionManager") 3     public void sendMessage1(@RequestParam String msg) {
4         kafkaTemplate.send(tsrc, msg);
5         
6     }

再次調用接口發送,成功,監聽器輸出下面的信息:來自#博客園

kafka簡單消費:topic=topic01, partition=0, offset=7-事務transaction消息

 

說明,

將 transactionManager  配置為 另一個Bean transactionManager 也執行成功。

 

事務的基本用法有了,接下來 驗證事務的有效性

驗證代碼:消息中包含error時,拋出異常

 1     @GetMapping(path="/sendThreeInTrans")
 2     @Transactional(transactionManager = "kafkaTransactionManager")
 3     public void sendThreeInTrans(@RequestParam String msg) {
 4         kafkaTemplate.send(tsrc, msg + "-1");
 5         kafkaTemplate.send(tsrc, msg + "-2");
 6         
 7         if (msg.contains("error")) {
 8             throw new RuntimeException("事務執行失敗");
 9         }
10         
11         kafkaTemplate.send(tsrc, msg + "-3");
12     }

msg = 正確的事務transaction消息 時,輸出:3條消息都被監聽到了

kafka簡單消費:topic=topic01, partition=0, offset=9-正確的事務transaction消息-1
kafka簡單消費:topic=topic01, partition=0, offset=10-正確的事務transaction消息-2
kafka簡單消費:topic=topic01, partition=0, offset=11-正確的事務transaction消息-3

msg = 錯誤的事務transaction消息error 時,輸出:沒有消息被監聽到

 

測試將 transactionManager 配置為 transactionManager,得到了相同的效果。

 

疑問:

既然兩個都有效,是否不需要在 application.properties 中配置就可以使用事務呢

測試結果是 不可以。

去掉配置后,發送錯誤消息的結果:異常發生前的兩條消息 成功發送到了 kafka 並被監聽器接收到了

檢查spring容器中的Bean,去掉配置后,就沒有 kafkaTransactionManager 這個Bean了。

 

檢查 兩個Bean 的類型:transactionManager,kafkaTransactionManager

通過ApplicationContext檢查:

1         System.out.println(context.getBean("transactionManager"));
2         System.out.println(context.getBean("kafkaTransactionManager"));
3         System.out.println(context.getBean("transactionManager").getClass());
4         System.out.println(context.getBean("kafkaTransactionManager").getClass());

結果:

org.springframework.orm.jpa.JpaTransactionManager@4cddc3d9
org.springframework.kafka.transaction.KafkaTransactionManager@673fdc28
class org.springframework.orm.jpa.JpaTransactionManager
class org.springframework.kafka.transaction.KafkaTransactionManager

原來,一個時 JPA的,一個時 Kafka的啊。

不好意思,我的項目有依賴 spring-boot-starter-data-jpa ,這才導致了 上面的錯誤——出現兩個bean

 

org.springframework.transaction.TransactionManager接口及其子孫依賴結構:其中就有 JpaTransactionManager、KafkaTransactionManager

 

在其它博文中,還提到另外一種執行事務的方式:

kafkaTemplate.executeInTransaction(...)

 

事務的事情,還沒完呢!

上面是在 application.properties 中配置開啟了 kafka事務,下面介紹另一種方式。

 

注釋掉配置:

# 事務
#spring.kafka.producer.transaction-id-prefix=tx

添加kafka配置類:

 1 @Configuration
 2 public class WebKafkaConfig {
 3 
 4     @Value("${spring.kafka.bootstrap-servers}")
 5     private String servers;
 6     
 7     @Bean
 8     public ProducerFactory producerFactory() {
 9         DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(this.senderProps());
10         factory.transactionCapable();
11         factory.setTransactionIdPrefix("code-tx-");
12         return factory;
13     }
14     
15     @Bean
16     public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
17         KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory);
18         return manager;
19     }
20     
21     private Map<String, Object> senderProps() {
22         Map<String, Object> props = new HashMap<>();
23         
24         //連接地址
25         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
26         //重試,0為不啟用重試機制
27         props.put(ProducerConfig.RETRIES_CONFIG, 1);
28         //acks=all :只有當所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。
29         props.put(ProducerConfig.ACKS_CONFIG, "all");
30         // 設置冪等性
31         props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
32 
33         // 沒有時報錯:
34         // Missing required configuration "key.serializer" which has no default value.
35         //鍵的序列化方式
36         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
37         //值的序列化方式
38         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
39         
40         return props;
41     }
42 }

 

16行:

這個函數的名稱為 transactionManager,生產的Bean也是這個名字,此時,不會出現 兩個 transactionManager Bean的情況了。

監聽器 直接使用 @Transactional 即可——但卻覆蓋了JpaTransactionManager實例:

 

修改函數名為 myKafkaTransactionManager,再配置@Transactional(transactionManager = "myKafkaTransactionManager"),運行程序。

測試事務:測試成功。

 

疑問

但是,自定義了上面的 KafkaTransactionManager 后,屬於 JPA 的 transactionManager 消失了。

此時,使用JPA要用到 transactionManager 會不會報錯呢?需要進一步驗證,TODO

 

事務總結:

1、2種配置方式

2、兩種使用方式

 

更進一步

學習spring、spring boot的事務機制;

@Transactional 的函數中,既包括kafka,又包括 數據庫操作,是否也有效呢?一旦錯誤發生,大家都執行失敗?

 
三、Kafka Stream
 流式計算。
依賴 kafka-streams包 即可使用。
        <!-- Kafka stream -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>

包結構:

 
本節完成一個小試驗:
在nothing項目中,
使用 kafka stream 接收 主題  streams-plaintext-input 的消息,
給消息添加后綴"-suffix" 后,
發送到 主題 streams-pipe-output
再由 web項目的 主題 streams-pipe-output 監聽器把處理后的消息打印出來。
另外,發送消息也是調用 web項目的接口。
 
參考程序:
官網-TUTORIAL: WRITE A KAFKA STREAMS APPLICATION
 
說明,
本來應該寫一個 單詞統計程序(WordCount)的,可是,參考了好多篇博文 都沒搞定,不是這個錯,就是 另一個錯。
暫且先實現個小的 kafka stream程序,體驗下 流計算的樂趣先。
 
nothing項目,依賴 kafka-streams包,編寫 程序:
 1     public static void startKafkaStream3() {
 2         Properties props = new Properties(); // 基本配置
 3         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
 4         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.242.81:9092");
 5         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 6         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 7 
 8         final StreamsBuilder builder = new StreamsBuilder();
 9 
10         builder.stream("streams-plaintext-input") // 消息來源主題 11         .mapValues((v)->{ // 除了mapValues,還有很多 處理函數,還可以建立自定義的 org.apache.kafka.streams.processor.Processor 類
12             return v + "-suffix";
13         })
14         .to("streams-pipe-output"); // 處理后的消息目的地主題 15 
16         final Topology topology = builder.build(); // 很重要的概念,拓撲
17 
18         final KafkaStreams streams = new KafkaStreams(topology, props);
19         final CountDownLatch latch = new CountDownLatch(1);
20 
21         // attach shutdown handler to catch control-c
22         Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
23             @Override
24             public void run() {
25                 streams.close(); // 關閉kafka stream,,居然可以這么玩!
26                 latch.countDown();
27             }
28         });
29 
30         try {
31  streams.start(); // 啟動kafka stream計算
32             latch.await();
33         } catch (Throwable e) {
34             System.exit(1);
35         }
36         System.exit(0);
37     }

 

11~13行是 官網沒有的,用來給 接收的消息添加后綴。來自#博客園

nothing項目啟動后,執行上面的程序即可(第一個項目——關鍵)。

 

在 web項目 開發:

不需要依賴 kafka-streams包,繼續上面的 程序開發即可。

1、消息發送接口 到 主題 streams-plaintext-input 來自#博客園

1     @GetMapping(path="/sendNormal2")
2     @Transactional
3     public void sendNormal2(@RequestParam String msg) {
4         kafkaTemplate.send("streams-plaintext-input", msg);
5     }

 

2、監聽 主題 streams-pipe-output 的消息

    @KafkaListener(topics= {"streams-pipe-output"})
    public void onMessage1(ConsumerRecord<?, ?> record) {
        // 消費的哪個topic、partition的消息,打印出消息內容
        System.out.println("kafka簡單消費:topic=" + record.topic() + ", partition=" + record.partition() 
            + ", offset=" + record.offset() + "-" + record.value());
    }

 

測試結果:

測試發送了 4條消息,都成功添加了后綴,並從 主題 streams-pipe-output 獲取成功。

 

本篇博文就這么愉快地結束吧!Kafka原來這么強大啊,還要繼續挖掘才是。來自#博客園

 

參考資料:

0、kafka官網

1、Spring-Kafka(五)—— 使用Kafka事務的兩種方式

作者:海苔胖胖

2、【Kafka】- KafkaStream wordcount 案例

程序一直起不來,報下面一些錯誤:

2021-07-19 11:43:39.611  WARN 7242 --- [           main] o.a.k.s.p.internals.StateDirectory       : 
Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS # windows 10 上 下面兩條錯誤,,到Ubuntu運行就沒有了 來自#博客園 2021-07-19 11:08:35.534 ERROR 14520 --- [ main] o.a.k.s.p.internals.StateDirectory :
Failed to change permissions for the directory \tmp\kafka-streams 2021-07-19 11:08:35.535 ERROR 14520 --- [ main] o.a.k.s.p.internals.StateDirectory :
Failed to change permissions for the directory \tmp\kafka-streams\streams-wordcount 2021-07-19 11:19:53.102 ERROR 14056 --- [ms-close-thread] o.a.k.s.p.internals.StateDirectory :
Some task directories still locked while closing state, this indicates unclean shutdown: {} # TODO

3、Kafka Stream 微服務領域流處理

作者:久七年

很好的博文,可是,第一個程序運行就遇到故障——不知道怎么讀取 目標主題topic02 的數據。

報了下面的一些錯誤:序列化、反序列哈的問題吧,還需探究

這篇博文中介紹的比較全面。

Failed to convert from type [java.util.ArrayList<?>] to type [org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>] for value '[2, 2, 2, 2, 2,...

nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting 
from type [java.lang.Integer] to type [org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>], failedMessage=GenericMessage
[payload=[2, 2, 2, 2, 2, 2, 2, 2, 2,... Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.ArrayList]
to [org.apache.kafka.clients.consumer.ConsumerRecord] for GenericMessage [payload=[[]], headers={kafka_offset=[14],
kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@31b53850, kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0],
kafka_receivedMessageKey=[null], kafka_batchConvertedHeaders=[{}], kafka_receivedTopic=[topic1], kafka_receivedTimestamp=[1626514480655],
kafka_groupId=myGroup}]

4、Kafka Stream

作者:努力的小強

基礎概念、功能介紹。

 

學了好幾天,寫了三小時,還不錯。 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM