環境:
Windows 10,
Ubuntu 18.04 (虛擬機)
Java 8
Spring Boot 2.5.2,其下的 spring-kafka 版本 2.7.3,kafka-streams 版本 2.7.1
apache-zookeeper-3.7.0,單機 standalone
kafka_2.13-2.8.0,單機 standalone
Eclipse Version: 2021-03 (4.19.0):開發工具
Poatman:接口調用工具 來自#博客園
---
兩個Spring Boot項目:
1、nothing - 非Web項目
執行kafka stream程序,從 1#主題(TOPIC) 接收信息,再傳到 2#主題。
2、web - Web-Servlet項目
向 1#主題 發送消息;
接收 1#主題 的消息;
接收 2#主題 的消息——kafka stream處理過的;
---
關於kafka主題的創建:
在本文中,主題都是 自動創建的——web項目啟動后,發送消息給主題時創建、監聽主題時給創建。
nothing項目 必須在 web項目 啟動,並執行相關操作 生成 主題 后 才可以使用。
自動創建 的主題 是有限制的——只有一個分區?需要看文檔。
注,本文寫成時,作者對 kafka還不是太熟,就是 水平有限 的意思,本君盡量避免錯誤描述。
Ubuntu上運行 ZooKeeper:
停止:
<!-- Kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
spring-kafka包 的內容展示:
配置 application.properties:來自#博客園
# # ubuntu's ip ubuntu.ip=192.168.151.81 # # Kafka spring.kafka.bootstrap-servers=${ubuntu.ip}:9092 spring.kafka.consumer.group-id=myGroup
生產者——調用接口:
1 @RestController 2 @RequestMapping(path="/kafka") 3 public class KafkaProducer { 4 5 @Autowired 6 private KafkaTemplate<String, Object> kafkaTemplate; 7 8 // 發送消息接口 9 @GetMapping(path="/sendNormal")10 public void sendMessage1(@RequestParam String msg) { 11 kafkaTemplate.send("topic01", msg); 12 } 13 }
消費者——@KafkaListener:
1 @Component 2 public class MyKafkaConsumer { 3 4 // 消費監聽 5 @KafkaListener(topics= {"topic01"}) 6 public void onMessage1(ConsumerRecord<?, ?> record) { 7 // 消費的哪個topic、partition的消息,打印出消息內容 8 System.out.println("kafka簡單消費:topic=" + record.topic() + ", partition=" + record.partition() 9 + ", offset=" + record.offset() + "-" + record.value()); 10 } 11 }
消費者輸出結果:
kafka簡單消費:topic=topic01, partition=0, offset=1-that is a test,在中國
# 事務
spring.kafka.producer.transaction-id-prefix=tx # tx可以是其它名稱
運行web項目,調用生產者發送接口報錯:
java.lang.IllegalStateException: No transaction is in process; possible solutions...省略
在生產者放接口添加@Transactional,再次調用,仍然報錯:
1 @GetMapping(path="/sendNormal") 2 @Transactional 3 public void sendMessage1(@RequestParam String msg) { 4 kafkaTemplate.send(tsrc, msg); 5 6 }
錯誤信息:
org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type 'org.springframework.transaction.TransactionManager'
available: expected single matching bean but found 2: transactionManager,kafkaTransactionManager
注意,看了一些其它文章,此時應該執行成功了。這里Spring容器中出現了兩個 事務manager——檢查Spring容器(ApplicationContext中 Arrays.toString(context.getBeanDefinitionNames()) ),怎么辦呢?
修改@Transactional,指定使用 kafkaTransactionManager:
1 @GetMapping(path="/sendNormal") 2 @Transactional(transactionManager = "kafkaTransactionManager") 3 public void sendMessage1(@RequestParam String msg) { 4 kafkaTemplate.send(tsrc, msg); 5 6 }
再次調用接口發送,成功,監聽器輸出下面的信息:來自#博客園
kafka簡單消費:topic=topic01, partition=0, offset=7-事務transaction消息
說明,
將 transactionManager 配置為 另一個Bean transactionManager 也執行成功。
事務的基本用法有了,接下來 驗證事務的有效性。
驗證代碼:消息中包含error時,拋出異常
1 @GetMapping(path="/sendThreeInTrans") 2 @Transactional(transactionManager = "kafkaTransactionManager") 3 public void sendThreeInTrans(@RequestParam String msg) { 4 kafkaTemplate.send(tsrc, msg + "-1"); 5 kafkaTemplate.send(tsrc, msg + "-2"); 6 7 if (msg.contains("error")) { 8 throw new RuntimeException("事務執行失敗"); 9 } 10 11 kafkaTemplate.send(tsrc, msg + "-3"); 12 }
msg = 正確的事務transaction消息 時,輸出:3條消息都被監聽到了
kafka簡單消費:topic=topic01, partition=0, offset=9-正確的事務transaction消息-1 kafka簡單消費:topic=topic01, partition=0, offset=10-正確的事務transaction消息-2 kafka簡單消費:topic=topic01, partition=0, offset=11-正確的事務transaction消息-3
msg = 錯誤的事務transaction消息error 時,輸出:沒有消息被監聽到
測試將 transactionManager 配置為 transactionManager,得到了相同的效果。
疑問:
既然兩個都有效,是否不需要在 application.properties 中配置就可以使用事務呢?
測試結果是 不可以。
去掉配置后,發送錯誤消息的結果:異常發生前的兩條消息 成功發送到了 kafka 並被監聽器接收到了
檢查spring容器中的Bean,去掉配置后,就沒有 kafkaTransactionManager 這個Bean了。
檢查 兩個Bean 的類型:transactionManager,kafkaTransactionManager
通過ApplicationContext檢查:
1 System.out.println(context.getBean("transactionManager")); 2 System.out.println(context.getBean("kafkaTransactionManager")); 3 System.out.println(context.getBean("transactionManager").getClass()); 4 System.out.println(context.getBean("kafkaTransactionManager").getClass());
結果:
org.springframework.orm.jpa.JpaTransactionManager@4cddc3d9 org.springframework.kafka.transaction.KafkaTransactionManager@673fdc28 class org.springframework.orm.jpa.JpaTransactionManager class org.springframework.kafka.transaction.KafkaTransactionManager
原來,一個時 JPA的,一個時 Kafka的啊。
不好意思,我的項目有依賴 spring-boot-starter-data-jpa ,這才導致了 上面的錯誤——出現兩個bean。
org.springframework.transaction.TransactionManager接口及其子孫依賴結構:其中就有 JpaTransactionManager、KafkaTransactionManager
在其它博文中,還提到另外一種執行事務的方式:
kafkaTemplate.executeInTransaction(...)
事務的事情,還沒完呢!
上面是在 application.properties 中配置開啟了 kafka事務,下面介紹另一種方式。
注釋掉配置:
# 事務
#spring.kafka.producer.transaction-id-prefix=tx
添加kafka配置類:
1 @Configuration 2 public class WebKafkaConfig { 3 4 @Value("${spring.kafka.bootstrap-servers}") 5 private String servers; 6 7 @Bean 8 public ProducerFactory producerFactory() { 9 DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(this.senderProps()); 10 factory.transactionCapable(); 11 factory.setTransactionIdPrefix("code-tx-"); 12 return factory; 13 } 14 15 @Bean 16 public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) { 17 KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory); 18 return manager; 19 } 20 21 private Map<String, Object> senderProps() { 22 Map<String, Object> props = new HashMap<>(); 23 24 //連接地址 25 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); 26 //重試,0為不啟用重試機制 27 props.put(ProducerConfig.RETRIES_CONFIG, 1); 28 //acks=all :只有當所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。 29 props.put(ProducerConfig.ACKS_CONFIG, "all"); 30 // 設置冪等性 31 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 32 33 // 沒有時報錯: 34 // Missing required configuration "key.serializer" which has no default value. 35 //鍵的序列化方式 36 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 37 //值的序列化方式 38 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 39 40 return props; 41 } 42 }
16行:
這個函數的名稱為 transactionManager,生產的Bean也是這個名字,此時,不會出現 兩個 transactionManager Bean的情況了。
監聽器 直接使用 @Transactional 即可——但卻覆蓋了JpaTransactionManager實例:
修改函數名為 myKafkaTransactionManager,再配置@Transactional(transactionManager = "myKafkaTransactionManager"),運行程序。
測試事務:測試成功。
疑問:
但是,自定義了上面的 KafkaTransactionManager 后,屬於 JPA 的 transactionManager 消失了。
此時,使用JPA要用到 transactionManager 會不會報錯呢?需要進一步驗證,TODO
事務總結:
1、2種配置方式
2、兩種使用方式
更進一步:
學習spring、spring boot的事務機制;
@Transactional 的函數中,既包括kafka,又包括 數據庫操作,是否也有效呢?一旦錯誤發生,大家都執行失敗?
<!-- Kafka stream --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> </dependency>
包結構:
1 public static void startKafkaStream3() { 2 Properties props = new Properties(); // 基本配置 3 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); 4 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.242.81:9092"); 5 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 6 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 7 8 final StreamsBuilder builder = new StreamsBuilder(); 9 10 builder.stream("streams-plaintext-input") // 消息來源主題 11 .mapValues((v)->{ // 除了mapValues,還有很多 處理函數,還可以建立自定義的 org.apache.kafka.streams.processor.Processor 類 12 return v + "-suffix"; 13 }) 14 .to("streams-pipe-output"); // 處理后的消息目的地主題 15 16 final Topology topology = builder.build(); // 很重要的概念,拓撲 17 18 final KafkaStreams streams = new KafkaStreams(topology, props); 19 final CountDownLatch latch = new CountDownLatch(1); 20 21 // attach shutdown handler to catch control-c 22 Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { 23 @Override 24 public void run() { 25 streams.close(); // 關閉kafka stream,,居然可以這么玩! 26 latch.countDown(); 27 } 28 }); 29 30 try { 31 streams.start(); // 啟動kafka stream計算 32 latch.await(); 33 } catch (Throwable e) { 34 System.exit(1); 35 } 36 System.exit(0); 37 }
11~13行是 官網沒有的,用來給 接收的消息添加后綴。來自#博客園
nothing項目啟動后,執行上面的程序即可(第一個項目——關鍵)。
在 web項目 開發:
不需要依賴 kafka-streams包,繼續上面的 程序開發即可。
1、消息發送接口 到 主題 streams-plaintext-input 來自#博客園
1 @GetMapping(path="/sendNormal2") 2 @Transactional 3 public void sendNormal2(@RequestParam String msg) { 4 kafkaTemplate.send("streams-plaintext-input", msg); 5 }
2、監聽 主題 streams-pipe-output 的消息
@KafkaListener(topics= {"streams-pipe-output"}) public void onMessage1(ConsumerRecord<?, ?> record) { // 消費的哪個topic、partition的消息,打印出消息內容 System.out.println("kafka簡單消費:topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset() + "-" + record.value()); }
測試結果:
測試發送了 4條消息,都成功添加了后綴,並從 主題 streams-pipe-output 獲取成功。
本篇博文就這么愉快地結束吧!Kafka原來這么強大啊,還要繼續挖掘才是。來自#博客園
0、kafka官網
1、Spring-Kafka(五)—— 使用Kafka事務的兩種方式
作者:海苔胖胖
2、【Kafka】- KafkaStream wordcount 案例
程序一直起不來,報下面一些錯誤:
2021-07-19 11:43:39.611 WARN 7242 --- [ main] o.a.k.s.p.internals.StateDirectory :
Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS # windows 10 上 下面兩條錯誤,,到Ubuntu運行就沒有了 來自#博客園 2021-07-19 11:08:35.534 ERROR 14520 --- [ main] o.a.k.s.p.internals.StateDirectory :
Failed to change permissions for the directory \tmp\kafka-streams 2021-07-19 11:08:35.535 ERROR 14520 --- [ main] o.a.k.s.p.internals.StateDirectory :
Failed to change permissions for the directory \tmp\kafka-streams\streams-wordcount 2021-07-19 11:19:53.102 ERROR 14056 --- [ms-close-thread] o.a.k.s.p.internals.StateDirectory :
Some task directories still locked while closing state, this indicates unclean shutdown: {} # TODO
作者:久七年
很好的博文,可是,第一個程序運行就遇到故障——不知道怎么讀取 目標主題topic02 的數據。
報了下面的一些錯誤:序列化、反序列哈的問題吧,還需探究
這篇博文中介紹的比較全面。
Failed to convert from type [java.util.ArrayList<?>] to type [org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>] for value '[2, 2, 2, 2, 2,... nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting
from type [java.lang.Integer] to type [org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>], failedMessage=GenericMessage
[payload=[2, 2, 2, 2, 2, 2, 2, 2, 2,... Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.ArrayList]
to [org.apache.kafka.clients.consumer.ConsumerRecord] for GenericMessage [payload=[[]], headers={kafka_offset=[14],
kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@31b53850, kafka_timestampType=[CREATE_TIME], kafka_receivedPartitionId=[0],
kafka_receivedMessageKey=[null], kafka_batchConvertedHeaders=[{}], kafka_receivedTopic=[topic1], kafka_receivedTimestamp=[1626514480655],
kafka_groupId=myGroup}]
作者:努力的小強
基礎概念、功能介紹。
學了好幾天,寫了三小時,還不錯。