前面說了Java8的流,這里還說流處理,既然是流,比如水流車流,肯定得有流的源頭,源可以有多種,可以自建,也可以從應用端獲取,
今天就拿非常經典的Kafka做源頭來說事,比如要來一套應用日志實時分析框架,或者是高並發實時流處理框架,正是Kafka的拿手好戲。
作者原創文章,謝絕一切轉載!
本文只發表在"公眾號"和"博客園",其他均屬復制粘貼!如果覺得排版不清晰,請查看公眾號文章。
環境:Idea2019.03/Gradle6.0.1/JDK11.0.4/Lambda/RHEL8.0/VMWare15.5/Springboot2.2.1.RELEASE/Zookeeper3.5.5/Kafka2.3.1
難度:新手--戰士--老兵--大師
目標:
- 理解kafka原理
- Linux下kafka集群安裝
- 使用kafka操作流式處理
說明:
為了遇見各種問題,同時保持時效性,我盡量使用最新的軟件版本。代碼地址:其中的day23,https://github.com/xiexiaobiao/dubbo-project.git
第一部分——原理
1.先看看Kafka,目前kafka的發展已超出消息中間件的范疇,趨於向流平台
靠攏,先總結如下:
1.1 Scala語言編寫,若作為消息中間件,並發10W+級別,大於其他MQ;
1.2 必須有Zookeeper做協調,ZK保存消費者/生產者狀態信息,使得兩端非常輕量化;使用發布/訂閱
模式,所有消息按主題(topic)分類,使用pull
模式消費消息;
1.3 每條消息由key + value + timestamp構成,其中key用於計算目的發送分區(partition),消息記錄由不可變(immutable)的順序式Append log文件持久化消息,Append寫
方式是高吞吐率的重要支撐之一!偏移量(offset)標識消息在文件中的位置,下圖來自官網:
1.4 每條消息不論是否已被消費都將保存一個設定的時間
,這是和RabbitMQ的顯著差異;消費者僅需保存消息offset信息,可按順序消費(一個topic只有一個partition),也能進行非順序式回溯,但隨機讀寫性能差;多個consumer消費互不影響,這也是高並發的支撐之一!下圖來自官網:
1.5 每個topic的所有消息,均衡(或指定)寫入多個分區(partition),分區分布在不同的broker上,每個分區使用主(Leader)+從(Follower)多節點,這樣的好處,一是分區文件大小和負載可控,增強單個topic的數據承載量,二是適應並行處理;Leader負責讀/寫,Followers僅復制備份,Leader不可用時,自動選舉Follower轉為主:
1.6 每個Consumer實例都屬於一個消費者組(consumer group),多個Consumer實例可以存在於不同的進程或機器上(Consumer實例可類比於java類的實例對象),一個消息記錄只會發送給有對應主題訂閱的消費者組中的一個Consumer實例!在一個消費者組中,每個分區至多只能發送到同一消費者的一個實例上,但一個消費者實例可以消費多個分區
,因此,若一個group中的消費者數量大於分區數量的話,多余的消費者將不會收到任何消息,所以分區(partition)數必須大於等於消費者組中的實例數量。下圖中,具有2個server的kafka集群,擁有同一個topic的4個分區,並對接2個消費者組,如果A或B組中Consumer都是同一消費者的實例,則輪詢均衡消費,若同組都是不同的消費者實例,則相當於廣播消息,下圖來自官網:
1.7 缺少事務特性,沒有接收確認和消費確認ACK機制,也沒有RocketMQ的二階段提交。
1.8 使用場景,下圖來自官網,這也讓我想起了kafka的幾個圈圈的圖標:
- 常規消息系統:消息系統一般有
queue
和publish-subscribe
兩種模式,queue模式下,多個consumer可以並行地各自處理一部分消息,增加吞吐量和速度,但不能一個消息多分發,因為消息被消費掉就不存在了。publish-subscribe模式下,可以廣播一個消息給多個訂閱者,但無法擴大吞吐量,kafka的consumer group概念下既能並行也能分發!我認為事實上kafka並沒有使用隊列這個數據結構,因沒有先進先出的概念! - 實時流處理 :對接KstreamAPI,可以實現流式處理,狀態計算。
- 分布式流式數據儲存:分區+副本的磁盤存儲方式可以實現高可用,低延時,大數據量下無性能衰減,kafka還具有僅當所有主從復制全部完成時才算寫入成功的確認機制,從而可作為commit log存儲系統。
第二部分——安裝
雖然window下也可使用kafka,但我想生產環境下都是使用linux,我使用RHEL8.0虛擬機,JDK11的安裝,略!
2.1 先進行Zookeeper安裝,雖然kafka新版本已經自帶ZK,但我還是推薦單獨安裝ZK,配置和功能獨立,步驟比較清晰,且如果是ZK集群,更建議單獨配置,為節省篇幅,此部分非重點我就簡述了,下載apache-zookeeper-3.5.5-bin.tar.gz,創建/usr/zookeeper目錄,cp到該目錄,tar命令解壓,創建data和logs目錄,用於保存zk的數據和log日志,根據zoo_sample.cfg復制一個zoo.cfg文件,並vim編輯如下圖,順帶研究下zk的配置:
然后配置linux環境變量,
[root@localhost ~]# vim /etc/profile
保存后使用source命令,使配置生效:
[root@localhost ~]# source /etc/profile
ZK啟動命令,會自動使用zoo.cfg配置文件:
[root@localhost apache-zookeeper-3.5.5-bin]# ./bin/zkServer.sh start
成功后狀態:

其他ZK管理命令:
- /查看服務狀態: ./zkServer.sh status
- /停止服務: ./zkServer.sh stop
- /重啟服務: ./zkServer.sh restart
- /使用ZKCli連接服務器: ./zkCli.sh -server 127.0.0.1:2181,
我本地zkCli實例如下:

2.2 安裝kafka,下載kafka_2.12-2.3.1.tgz
,創建/usr/kafka目錄,cp到此目錄,解壓,得到kafka_2.12-2.3.1目錄,進入此目錄,先看配置,這里有consumer、producer和server三個properties配置文件:
使用命令啟動:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-server-start.sh config/ server.properties
如下為啟動kafka成功:

再回到zkCli下ls命令查看下,發現創建了很多node,用於保存kafka運行上下文信息:

新開一個terminal,創建一個topic,指定replication副本因子為1,即復制0份,分區partitions數量指定為 1:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic biao
列出存在的topic:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
創建另一個topic :
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
下圖中創建了一個topic:test,使用本機kafka做集群識別,前面使用zk做集群識別,--bootstrap-server和--zookeeper參數效果一樣。再模擬producer,該topic下發送兩行消息,默認條件下,每行為一個消息記錄:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

再另開一個terminal,模擬consumer,此terminal輸出將會和producer輸入一致:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Ctrl + C 退出程序。
2.3 以上為單ZK單kafka搭建,下面搭建單ZK多kafka實例環境:復制出3份配置文件:
[root@localhost kafka_2.12-2.3.1]# cp config/server.properties config/server-0.properties [root@localhost kafka_2.12-2.3.1]# cp config/server.properties config/server-1.properties [root@localhost kafka_2.12-2.3.1]# cp config/server.properties config/server-2.properties
以server-1.properties為例,其他數字依次修改即可:
broker.id=1 #集群內必須唯一 listeners=PLAINTEXT://:9093 #Socket監聽地址,沒寫hostname/IP即為listen所有IP log.dirs=/tmp/kafka-logs-1 #log目錄,每個實例獨立,防止互相覆蓋 zookeeper.connect #ZK注冊地址,因為是單ZK,三個實例一樣

單獨的terminal下創建topic:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
這里:指定replication副本因子為3,即復制2份,分區partitions數量指定為1,
查看topic的詳細信息:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic

另一個例子:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic replicated-xiao [root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-xiao
以上每行說明一個partition,
- "Leader":leader節點,負責讀寫,一個partition下的leader是隨機選取的;
- "replicas":列出所有同步保存append log文件的節點,不論主從角色和狀態是否有效;
- "isr" :意為"in-sync",即當前有主從同步的有效節點列表;
模擬producer,並輸入幾行信息:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic replicated-xiao >xie >xiaobiao >hell world
新terminal下,模擬consumer:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-consumer.sh -bootstrap-server localhost:9092 --from-beginning --topic replicated-xiao
Consumer窗口輸出內容會和producer窗口輸入內容保持一致:
容錯測試,關閉broker-1實例:
[root@localhost ~]# ps -aux | grep server-1.properties
[root@localhost ~]# kill 21753
或者直接到server-1界面CTRL+C關閉,效果一樣:
對比上面的圖,可以看到Leader發生變化,Isr 里都沒有1了:
再使用consumer讀取記錄,效果一樣,可見容錯機制啟用了主從替代:
如果再啟動server-1,可見主從替換后,不會恢復:
第三部分——應用
創建一個Springboot+gradle項目,命名為kafka-stream02,
3.1 應用測試01:位於包com.biao.kafka下,實現kafka消息的發送和消費:
build.gradle中的核心依賴為:
compile group: 'org.springframework.boot', name: 'spring-boot-starter', version: '2.2.1.RELEASE' compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.3.3.RELEASE'
創建消息發送者com.biao.kafka.Producer:
@Component //@Slf4j public class Producer { @Autowired private KafkaTemplate<String,String> kafkaTemplate; private Logger log = LoggerFactory.getLogger(Producer.class); private String time = LocalDateTime.now().toString(); private final String msg = "THIS IS MESSAGE CONTENT " + time; public void send() throws InterruptedException { log.info("send message is {}",this.msg); Thread.sleep(1000L); // kafkaTemplate.sendDefault() 為異步方法,返回 ListenerFuture<T>, kafkaTemplate.send("HelloWorld","test-key",this.msg); } }
以上核心為kafkaTemplate的API, 可以使用kafkaTemplate.send(topic,key,value)同步方法發送消息,或者kafkaTemplate. sendDefault()異步方法發送,
再創建消費者com.biao.kafka.Consumer,使用@KafkaListener
注解標識一個topic的監聽方法:
@Component //@Slf4j public class Consumer { private Logger log = LoggerFactory.getLogger(Consumer.class); @KafkaListener(id = "foo",groupId = "test-consumer-group",topics = "HelloWorld") public void listen(ConsumerRecord<?,?> records){ Optional<?> msg = Optional.ofNullable(records.value()); if (msg.isPresent()){ Object data = msg.get(); log.info("ConsumerRecord >>>>>> {}", records); log.info("Record Data >>>>>> {}", data); } } }
創建入口類 com.biao.kafka.KafkaApplication:
@SpringBootApplication public class KafkaApplication { public static void main(String[] args) throws InterruptedException { System.out.println("KafkaApplication started >>>>>>"); ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class,args); Producer producer = context.getBean(Producer.class); producer.send(); } }
配置文件application.properties,請關注下Serializer和Deserializer:
#以下這些值也可以在運行時通過參數指定 #============== kafka =================== # 指定kafka 代理地址,可以多個,用逗號隔開 spring.kafka.bootstrap-servers=192.168.1.204:9092 # 運行com.biao.wordcount.WordCountApplication時使用,我換了一個linux虛擬機 #spring.kafka.bootstrap-servers=192.168.1.221:9092 #=============== provider ======================= spring.kafka.producer.retries=2 # 每次批量發送消息的數量,kafka是使用流模擬批量處理,每次提交都是批處理方式 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== consumer ======================= spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
運行程序即可看到結果,這里使用Springboot的DI機制啟動運行了consumer和producer,注意關閉linux的防火牆或打開9092端口:
再到kafka服務器上驗證一下是否真的發送成功:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-consumer.sh -bootstrap-server 192.168.1.204:9092 --from-beginning --topic HelloWorld
3.2 應用測試02,包com.biao.pipe下,實現一個流處理邏輯,開啟一個流傳輸管道,將一個topic的內容傳輸到另一個topic中,代碼com.biao.pipe.PipeApplication:
public class PipeApplication { public static void main(String[] args) { System.out.println("PipeApplication starting ........."); Properties props = new Properties(); // StreamsConfig已經預定義了很多參數名稱,運行時console會輸出所有StreamsConfig values // 這里沒有使用springboot的application.properties來配置 props.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092"); // kafka流都是byte[],必須有序列化, props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); // kafka流計算是一個各broker連接的拓撲結構,以下使用builder來構造拓撲 final StreamsBuilder builder = new StreamsBuilder(); // 構建一個KStream流對象,元素是<String, String>類型的key-value對值, KStream<String, String> source = builder.stream("streams-plaintext-input"); // 將前面的topic:"streams-plaintext-input"寫入另一個topic:"streams-pipe-output" source.to("streams-pipe-output"); // 以上兩行等同以下一行 // builder.stream("streams-plaintext-input").to("streams-pipe-output"); // 查看具體構建的拓撲結構 final Topology topology = builder.build(); System.out.println(topology.describe()); final KafkaStreams streams = new KafkaStreams(topology,props); // 控制運行次數,一次后就結束 final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){ @Override public void run() { streams.close(); latch.countDown(); } }); try{ streams.start(); latch.await(); }catch (Throwable e){ System.exit(1); } System.exit(0); } }
注意:以上即使用kafka topic構建了一個KStream流源頭,運行輸出以下,即為成功,進一步可以在kafka中進行topic寫入,再到另一個topic驗證輸出,我就不演示了。注意配置/usr/kafka2.3/kafka_2.12-2.3.1/config/server.properties中的listeners地址(見后記1):
解釋:以上構造了有2個處理節點的kafka流計算拓撲結構,源節點:KSTREAM-SOURCE-0000000000,匯聚(Sink)節點:KSTREAM-SINK-0000000001,源節點持續的讀取topic為streams-plaintext-input的有序記錄並輸送到下游Sink節點,Sink節點再將記錄寫入topic為streams-pipe-output的流,--> 和 <-- 指示左右端對象的上游和下游關系,圖中有換行,導致顯示不連貫拓撲展示如下:
3.3 應用測試03,包com.biao.linesplit下,創建一個無狀態
的流處理邏輯,讀取一個topic的記錄,並將文本行按空格分開,再傳輸到另一個topic,代碼 com.biao.linesplit.LineSplitApplication:
public class LineSplitApplication { public static void main(String[] args) { System.out.println("LineSplitApplication starting ........."); Properties props = new Properties(); // StreamsConfig已經預定義了很多參數名稱,運行時console會輸出所有StreamsConfig values // 這里沒有使用springboot的application.properties來配置 props.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-line-split"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092"); // kafka流都是byte[],必須有序列化,不同的對象使用不同的序列化器 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); // kafka流計算是一個各broker連接的拓撲結構,以下使用builder來構造拓撲 final StreamsBuilder builder = new StreamsBuilder(); // 構建一個KStream流對象,元素是<String, String>類型的key-value對值, KStream<String, String> source = builder.stream("streams-plaintext-input"); /* // 以source為輸入,產生一條新流words,這里使用了流的扁平化語法,我的前篇文章有講此基礎 KStream<String, String > words = source.flatMapValues(value -> Arrays.asList("\\W+")); // 將前面的topic:"streams-plaintext-input"寫入另一個topic:"streams-pipe-output" words.to("streams-pipe-output");*/ // 以上兩行使用stream鏈式語法+lambda等同以下一行,我的前篇文章有講此基礎 source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))) .to("streams-linesplit-output"); // 查看具體構建的拓撲結構 final Topology topology = builder.build(); System.out.println(topology.describe()); final KafkaStreams streams = new KafkaStreams(topology,props); // 控制運行次數,一次后就結束 final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){ @Override public void run() { streams.close(); latch.countDown(); } }); try{ streams.start(); latch.await(); }catch (Throwable e){ System.exit(1); } System.exit(0); } }
運行輸出以下,即為成功,也可以進一步在kafka上直接進行topic寫入和另一個topic輸出驗證,演示,略!注意配置/usr/kafka2.3/kafka_2.12-2.3.1/config/server.properties中的listeners地址(見后記1):
解釋:以上構造了有3個處理節點的kafka流計算拓撲結構,源節點:KSTREAM-SOURCE-0000000000,處理節點:KSTREAM-FLATMAPVALUES-0000000001,匯聚節點:KSTREAM-SINK-0000000002,處理節點從源節點取得流元素,進行處理,再將結果傳輸給匯聚節點,注意這個過程是無狀態的,拓撲展示如下:
3.4 應用測試04,包com.biao.wordcount下,構建一個無限流
處理邏輯,讀取一個topic,統計文本單詞數,最終輸出到另一個topic,代碼com.biao.wordcount.WordApplication:
public class WordCountApplication { public static void main(String[] args) { System.out.println("WordCountApplication starting ........."); Properties props = new Properties(); // StreamsConfig已經預定義了很多參數名稱,運行時console會輸出所有StreamsConfig values // 這里沒有使用springboot的application.properties來配置 props.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-word-count"); // kafka虛擬機linux地址 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092"); // kafka流都是byte[],必須有序列化,不同的對象使用不同的序列化器 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); // kafka流計算是一個各broker連接的拓撲結構,以下使用builder來構造拓撲 final StreamsBuilder builder = new StreamsBuilder(); // 構建一個KStream流對象,元素是<String, String>類型的key-value對值,topic:streams-plaintext-input KStream<String, String> source = builder.stream("streams-plaintext-input"); // 以下使用stream鏈式語法+lambda,具體分開的過程語句我就不寫了 // flatMapValues將text line使用空格分隔成words source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy(((key, value) -> value)) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) .toStream() .to("streams-wordcount-output", Produced.with(Serdes.String(),Serdes.Long())); // 查看具體構建的拓撲結構 final Topology topology = builder.build(); System.out.println(topology.describe()); final KafkaStreams streams = new KafkaStreams(topology,props); // 控制運行次數,一次后就結束 final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){ @Override public void run() { streams.close(); latch.countDown(); } }); try{ streams.start(); latch.await(); }catch (Throwable e){ System.exit(1); } System.exit(0); } }
運行輸出以下內容,即為成功,注意配置/usr/kafka2.3/kafka_2.12-2.3.1/config/server.properties中的listeners地址(見后記1):
解釋:最重要一點即此WordCountApplication僅是一個邏輯處理單元,可以理解為一個流水線車間,里面有兩條流水線對來料加工再輸出加工品。以上可以看出,有兩個不連通的拓撲結構,第一個拓撲無狀態,其匯聚節點KSTREAM-SINK-0000000005寫入到topic: counts-store-repartition,這個topic又作為第二個拓撲的源,此中間topic的作用是因分組聚合運算”打亂”流元素的順序。插入的節點Processor: KSTREAM-FILTER-0000000005是過濾掉分組聚合key值為空的記錄。
第二個拓撲有狀態,即生成並保存了計算中間值,因為要做分組統計,分組聚合運算節點KSTREAM-AGGREGATE-0000000003保存狀態使用了counts-store,即程序中指定的值。對流中每個元素統計時,會先去保存的狀態數據中去查找匹配,如果有則累加一,然后再寫入counts-store。每個被更新的統計值都再傳輸到處理節點KTABLE-TOSTREAM-0000000007,此節點作用是將統計更新的值再解析成新流。最后傳輸給匯聚節點KSTREAM-SINK-0000000008。以上可見流處理的思想和邏輯,內部迭代確實很強大!拓撲圖如下:
應用04運行步驟:
第一步,啟動ZK,再啟動kafka,注意先修改config/server.properties 中listeners=PLAINTEXT:// 192.168.1.221:9092:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-server-start.sh config/server.properties
第二步,運行com.biao.wordcount.WordCountApplication,啟動kafka流處理車間。
topic數據寫入放在包com.biao.wordcount.producer,當然也可以直接在kafka server中使用命令行寫入,我這里是為了演示多種代碼操作模式。配置類com.biao.wordcount.producer.KafkaConfig,這里使用了kafka的API配置方式,分別配置了topic,producer和consumer的相應參數,並生成Bean對象,請對比application.properties方式:
@Configuration @EnableKafka public class KafkaConfig { @Bean public KafkaTemplate<Integer,String > kafkaTemplate(){ return new KafkaTemplate<>(this.producerFactory()); } // topic @Bean public KafkaAdmin admin(){ Map<String,Object> configs = new HashMap<>(16); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092"); return new KafkaAdmin(configs); } @Bean // NewTopic(String name, int numPartitions, short replicationFactor) // kafka中每個topic只需創建一次, public NewTopic topic(){ return new NewTopic("streams-plaintext-input",1, (short) 1); } @Bean // NewTopic(String name, int numPartitions, short replicationFactor) // kafka中每個topic只需創建一次, public NewTopic topic2(){ return new NewTopic("streams-wordcount-output",1, (short) 1); } // producer @Bean public Map<String,Object> producerConfigs(){ Map<String, Object> props = new HashMap<>(16); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092"); props.put("acks","all"); props.put("retries",2); props.put("batch.size",16384); props.put("linger.ms",1); props.put("buffer.memory",33554432); props.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // props.put("key.converter","org.apache.kafka.connect.storage.IntegerConverter"); // props.put("value.converter","org.apache.kafka.connect.storage.StringConverter"); return props; } @Bean public ProducerFactory<Integer,String> producerFactory(){ return new DefaultKafkaProducerFactory<>(this.producerConfigs()); } // consumer @Bean public Map<String,Object> consumerConfigs(){ HashMap<String,Object> props = new HashMap<>(16); props.put("bootstrap.servers","192.168.1.221:9092"); props.put("group.id","foo"); props.put("enable.auto.commit","true"); // WordCountApplication 的consumer消費對象是統計的結果 key-value props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.LongDeserializer"); props.put("formatter","kafka.tools.DefaultMessageFormatter"); props.put("print.key","true"); props.put("value.key","true"); // props.put("key.converter","org.apache.kafka.connect.storage.IntegerConverter"); // props.put("value.converter","org.apache.kafka.connect.storage.StringConverter"); return props; } @Bean public ConsumerFactory<Integer,String> consumerFactory(){ return new DefaultKafkaConsumerFactory<>(this.consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory<Integer,String> kafkaListenerContainerFactory(){ ConcurrentKafkaListenerContainerFactory<Integer,String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(this.consumerFactory()); return factory; } @Bean public SimpleConsumer simpleConsumerLister(){ return new SimpleConsumer(); } }
定義消費者,com.biao.wordcount.producer.SimpleConsumer:
@Component public class SimpleConsumer { private Logger log = LoggerFactory.getLogger(SimpleConsumer.class); private final CountDownLatch countDownLatch = new CountDownLatch(1); @KafkaListener(id = "foo",topics = "streams-wordcount-output") public void listen(byte[] records){ System.out.println("records is >>>> "+ records); this.countDownLatch.countDown(); log.debug("consume successfully!"); } //在WordCountApplication實例中,無法打印流結果,因為需要格式化 /* public void listen(ConsumerRecord<?,?> records){ Optional<?> msg = Optional.ofNullable(records.value()); if (msg.isPresent()){ Object data = msg.get(); log.info("Consumer Record >>>>>> {}", records); log.info("Record Data >>>>>> {}", data); } }*/ }
定義生產者,並作為啟動類,com.biao.wordcount.producer.KafakaProducer:
@SpringBootApplication public class KafakaProducer { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(KafkaConfig.class); // KafkaTemplate<Integer,String> kafkaTemplate = (KafkaTemplate<Integer, String>) context.getBean(KafkaTemplate.class); KafkaTemplate<Integer,String> kafkaTemplate = (KafkaTemplate<Integer, String>) context.getBean(KafkaTemplate.class); LocalDateTime time = LocalDateTime.now(); String data = "MSG CONTENT -> " + time ; // send(String topic, K key, @Nullable V data) ListenableFuture<SendResult<Integer,String>> send = kafkaTemplate.send("streams-plaintext-input", 1, data); send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { @Override public void onFailure(Throwable ex) { System.out.println(">>>>>>> kafka message send failure"); } @Override public void onSuccess(SendResult<Integer, String> result) { System.out.println(">>>>>>> kafka message send successfully"); } }); } }
第三步,運行com.biao.wordcount.producer.KafakaProducer, 啟動topic數據寫入,kafka中驗證如下:
如果多次運行導致測試數據太多,影響結果查看,可以先刪除topic及其數據,若當前topic有使用過即有傳輸過信息:並沒有真正刪除topic只是把這個topic標記為刪除(marked for deletion),要徹底刪除需到ZK中刪除相應的目錄:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic HelloWorld Topic HelloWorld is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
第四步,在kafka server上查看最終word統計結果,命令:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.221:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
后記:
1.關於有狀態
和無狀態
,復雜問題簡單化!無狀態對象本身只是個純粹的處理邏輯,不依賴上下文信息,也不改變上下文信息,比如FUNC(x+y),只要有輸入x和y,就輸出相加值,對程序“無害”;有狀態指會保留上下文,如統計單詞數,必須保留每次計算的中間結果,用於下次累加,有狀態對象會破壞程序運行現場,不利於並發和共享。
2.如遇到程序出錯:
[AdminClient clientId=adminclient-1] Error connecting to node dubbo204.domain:9092 (id: 0 rack: null)
這是因為linux的監聽hosts配置引起的,直接修改config/server.properties
中listeners為linux的虛機IP地址即可,並注意關閉linux的防火牆或打開9092端口:
3.添加lombok依賴
providedCompile group: 'org.projectlombok', name: 'lombok', version: '1.18.10'
遇到編譯錯誤:
Could not find method providedCompile() for arguments [{group=org.projectlombok, name=lombok, version=1.18.10}]
因providedCompile必須配合 war插件,修改build.gradle:
4.運行WordCountApplication 報錯:
org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
這是序列化問題,必須使用正確的序列化器處理對應的數據,如IntegerDeserializer只能反序列化Integer對象,StringSerializer用於序列化String對象。
5.RHEL8.0版本可用性還是不錯的,比7要流暢很多,很多命令都變了,我開的共享:https://pan.baidu.com/s/19gkx07hQ6TuN9UyNWHmChQ
提取碼:bg69
,絕對保證可用,之前我也下載了幾次都是損壞的,每次6.62G大小,快哭了。
總結:kafka API,分為Producer,Consumer,Stream,Connect和AdminClient。Producer/Consumer分別用於管理生產者和消費者,Stream則是自帶的KStream,可以類比JDK8的Stream來理解,即在輸出到最終sink前進行流式計算,且很多方法使用類似,Connect是用於kafka連接到輸入/輸出,支持很多類型,如DB,file,redis,ELK等。AdminClient則管理topic/broker等。KStream+kafka強強聯手,可以預計未來會干出一番大事!
推薦閱讀: