流式計算(二)-Kafka Stream


前面說了Java8的流,這里還說流處理,既然是流,比如水流車流,肯定得有流的源頭,源可以有多種,可以自建,也可以從應用端獲取,

今天就拿非常經典的Kafka做源頭來說事,比如要來一套應用日志實時分析框架,或者是高並發實時流處理框架,正是Kafka的拿手好戲。

 

作者原創文章,謝絕一切轉載!

 本文只發表在"公眾號"和"博客園",其他均屬復制粘貼!如果覺得排版不清晰,請查看公眾號文章。 

 

環境:Idea2019.03/Gradle6.0.1/JDK11.0.4/Lambda/RHEL8.0/VMWare15.5/Springboot2.2.1.RELEASE/Zookeeper3.5.5/Kafka2.3.1

難度:新手--戰士--老兵--大師

目標:

  1. 理解kafka原理
  2. Linux下kafka集群安裝
  3. 使用kafka操作流式處理

說明:

為了遇見各種問題,同時保持時效性,我盡量使用最新的軟件版本。代碼地址:其中的day23,https://github.com/xiexiaobiao/dubbo-project.git

第一部分——原理


1.先看看Kafka,目前kafka的發展已超出消息中間件的范疇,趨於向流平台靠攏,先總結如下:

1.1 Scala語言編寫,若作為消息中間件,並發10W+級別,大於其他MQ;

1.2 必須有Zookeeper做協調,ZK保存消費者/生產者狀態信息,使得兩端非常輕量化;使用發布/訂閱模式,所有消息按主題(topic)分類,使用pull模式消費消息;

 

 1.3 每條消息由key + value + timestamp構成,其中key用於計算目的發送分區(partition),消息記錄由不可變(immutable)的順序式Append log文件持久化消息,Append寫方式是高吞吐率的重要支撐之一!偏移量(offset)標識消息在文件中的位置,下圖來自官網:

 

 1.4 每條消息不論是否已被消費都將保存一個設定的時間,這是和RabbitMQ的顯著差異;消費者僅需保存消息offset信息,可按順序消費(一個topic只有一個partition),也能進行非順序式回溯,但隨機讀寫性能差;多個consumer消費互不影響,這也是高並發的支撐之一!下圖來自官網:

 

 1.5 每個topic的所有消息,均衡(或指定)寫入多個分區(partition),分區分布在不同的broker上,每個分區使用主(Leader)+從(Follower)多節點,這樣的好處,一是分區文件大小和負載可控,增強單個topic的數據承載量,二是適應並行處理;Leader負責讀/寫,Followers僅復制備份,Leader不可用時,自動選舉Follower轉為主:

 

 1.6 每個Consumer實例都屬於一個消費者組(consumer group),多個Consumer實例可以存在於不同的進程或機器上(Consumer實例可類比於java類的實例對象),一個消息記錄只會發送給有對應主題訂閱的消費者組中的一個Consumer實例!一個消費者組中,每個分區至多只能發送到同一消費者的一個實例上,但一個消費者實例可以消費多個分區,因此,若一個group中的消費者數量大於分區數量的話,多余的消費者將不會收到任何消息,所以分區(partition)數必須大於等於消費者組中的實例數量。下圖中,具有2個server的kafka集群,擁有同一個topic的4個分區,並對接2個消費者組,如果A或B組中Consumer都是同一消費者的實例,則輪詢均衡消費,若同組都是不同的消費者實例,則相當於廣播消息,下圖來自官網:

 

1.7 缺少事務特性,沒有接收確認和消費確認ACK機制,也沒有RocketMQ的二階段提交。

1.8 使用場景,下圖來自官網,這也讓我想起了kafka的幾個圈圈的圖標:

 

  • 常規消息系統:消息系統一般有queuepublish-subscribe兩種模式,queue模式下,多個consumer可以並行地各自處理一部分消息,增加吞吐量和速度,但不能一個消息多分發,因為消息被消費掉就不存在了。publish-subscribe模式下,可以廣播一個消息給多個訂閱者,但無法擴大吞吐量,kafka的consumer group概念下既能並行也能分發!我認為事實上kafka並沒有使用隊列這個數據結構,因沒有先進先出的概念!
  • 實時流處理 :對接KstreamAPI,可以實現流式處理,狀態計算。
  • 分布式流式數據儲存:分區+副本的磁盤存儲方式可以實現高可用,低延時,大數據量下無性能衰減,kafka還具有僅當所有主從復制全部完成時才算寫入成功的確認機制,從而可作為commit log存儲系統。

第二部分——安裝


雖然window下也可使用kafka,但我想生產環境下都是使用linux,我使用RHEL8.0虛擬機,JDK11的安裝,略!

 

 2.1 先進行Zookeeper安裝,雖然kafka新版本已經自帶ZK,但我還是推薦單獨安裝ZK,配置和功能獨立,步驟比較清晰,且如果是ZK集群,更建議單獨配置,為節省篇幅,此部分非重點我就簡述了,下載apache-zookeeper-3.5.5-bin.tar.gz,創建/usr/zookeeper目錄,cp到該目錄,tar命令解壓,創建data和logs目錄,用於保存zk的數據和log日志,根據zoo_sample.cfg復制一個zoo.cfg文件,並vim編輯如下圖,順帶研究下zk的配置:

 

 然后配置linux環境變量,

[root@localhost ~]# vim /etc/profile

 

保存后使用source命令,使配置生效:

[root@localhost ~]# source /etc/profile
ZK啟動命令,會自動使用zoo.cfg配置文件:
[root@localhost apache-zookeeper-3.5.5-bin]# ./bin/zkServer.sh start
成功后狀態:

其他ZK管理命令:

  • /查看服務狀態: ./zkServer.sh status
  • /停止服務: ./zkServer.sh stop
  • /重啟服務: ./zkServer.sh restart
  • /使用ZKCli連接服務器: ./zkCli.sh -server 127.0.0.1:2181,

我本地zkCli實例如下:

 

 2.2 安裝kafka,下載kafka_2.12-2.3.1.tgz,創建/usr/kafka目錄,cp到此目錄,解壓,得到kafka_2.12-2.3.1目錄,進入此目錄,先看配置,這里有consumer、producer和server三個properties配置文件:

 

 使用命令啟動:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-server-start.sh config/ server.properties
如下為啟動kafka成功:

 

 再回到zkCli下ls命令查看下,發現創建了很多node,用於保存kafka運行上下文信息:

 

新開一個terminal,創建一個topic,指定replication副本因子為1,即復制0份,分區partitions數量指定為 1:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic biao
列出存在的topic:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181

 

 

創建另一個topic :

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
下圖中創建了一個topic:test,使用本機kafka做集群識別,前面使用zk做集群識別,--bootstrap-server和--zookeeper參數效果一樣。再模擬producer,該topic下發送兩行消息,默認條件下,每行為一個消息記錄:
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

再另開一個terminal,模擬consumer,此terminal輸出將會和producer輸入一致:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

 

 

 Ctrl + C 退出程序。

 

2.3 以上為單ZK單kafka搭建,下面搭建單ZK多kafka實例環境:復制出3份配置文件:

[root@localhost kafka_2.12-2.3.1]# cp config/server.properties  config/server-0.properties
[root@localhost kafka_2.12-2.3.1]# cp config/server.properties  config/server-1.properties
[root@localhost kafka_2.12-2.3.1]# cp config/server.properties  config/server-2.properties
以server-1.properties為例,其他數字依次修改即可:
broker.id=1  #集群內必須唯一
listeners=PLAINTEXT://:9093  #Socket監聽地址,沒寫hostname/IP即為listen所有IP
log.dirs=/tmp/kafka-logs-1  #log目錄,每個實例獨立,防止互相覆蓋
zookeeper.connect  #ZK注冊地址,因為是單ZK,三個實例一樣

 

單獨的terminal下創建topic:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
這里:指定replication副本因子為3,即復制2份,分區partitions數量指定為1,

查看topic的詳細信息:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic

另一個例子:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic replicated-xiao
[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic replicated-xiao
 
        

 

以上每行說明一個partition,

  • "Leader":leader節點,負責讀寫,一個partition下的leader是隨機選取的;
  • "replicas":列出所有同步保存append log文件的節點,不論主從角色和狀態是否有效;
  • "isr" :意為"in-sync",即當前有主從同步的有效節點列表;

模擬producer,並輸入幾行信息:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic replicated-xiao
>xie
>xiaobiao
>hell world

 

新terminal下,模擬consumer:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-consumer.sh -bootstrap-server localhost:9092 --from-beginning --topic replicated-xiao

Consumer窗口輸出內容會和producer窗口輸入內容保持一致:

 

容錯測試,關閉broker-1實例:

[root@localhost ~]# ps -aux | grep server-1.properties
 
        

 

[root@localhost ~]# kill 21753
或者直接到server-1界面CTRL+C關閉,效果一樣:

 

 對比上面的圖,可以看到Leader發生變化,Isr 里都沒有1了:

 

 再使用consumer讀取記錄,效果一樣,可見容錯機制啟用了主從替代:

 

 如果再啟動server-1,可見主從替換后,不會恢復:

 

第三部分——應用


創建一個Springboot+gradle項目,命名為kafka-stream02,

3.1 應用測試01:位於包com.biao.kafka下,實現kafka消息的發送和消費:

 

build.gradle中的核心依賴為:

compile group: 'org.springframework.boot', name: 'spring-boot-starter', version: '2.2.1.RELEASE'
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.3.3.RELEASE'
 

創建消息發送者com.biao.kafka.Producer:

@Component
//@Slf4j
public class Producer {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    private Logger log = LoggerFactory.getLogger(Producer.class);
    private String time = LocalDateTime.now().toString();
    private final String msg = "THIS IS MESSAGE CONTENT " + time;

    public void send() throws InterruptedException {
        log.info("send message is {}",this.msg);
        Thread.sleep(1000L);
        // kafkaTemplate.sendDefault() 為異步方法,返回 ListenerFuture<T>,
        kafkaTemplate.send("HelloWorld","test-key",this.msg);
    }
}

以上核心為kafkaTemplate的API, 可以使用kafkaTemplate.send(topic,key,value)同步方法發送消息,或者kafkaTemplate. sendDefault()異步方法發送,

再創建消費者com.biao.kafka.Consumer,使用@KafkaListener注解標識一個topic的監聽方法:

@Component
//@Slf4j
public class Consumer {

    private Logger log = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(id = "foo",groupId = "test-consumer-group",topics = "HelloWorld")
    public void listen(ConsumerRecord<?,?> records){
        Optional<?> msg = Optional.ofNullable(records.value());
        if (msg.isPresent()){
            Object data = msg.get();
            log.info("ConsumerRecord >>>>>> {}", records);
            log.info("Record Data >>>>>> {}", data);
        }
    }
}

創建入口類 com.biao.kafka.KafkaApplication:

@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("KafkaApplication started >>>>>>");
        ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class,args);
        Producer producer = context.getBean(Producer.class);
        producer.send();
    }
}

 

配置文件application.properties,請關注下Serializer和Deserializer:

#以下這些值也可以在運行時通過參數指定
#============== kafka ===================
# 指定kafka 代理地址,可以多個,用逗號隔開
spring.kafka.bootstrap-servers=192.168.1.204:9092
# 運行com.biao.wordcount.WordCountApplication時使用,我換了一個linux虛擬機
#spring.kafka.bootstrap-servers=192.168.1.221:9092

#=============== provider  =======================
spring.kafka.producer.retries=2
# 每次批量發送消息的數量,kafka是使用流模擬批量處理,每次提交都是批處理方式
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

運行程序即可看到結果,這里使用Springboot的DI機制啟動運行了consumer和producer,注意關閉linux的防火牆或打開9092端口:

 

 

再到kafka服務器上驗證一下是否真的發送成功:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-consumer.sh -bootstrap-server 192.168.1.204:9092 --from-beginning --topic HelloWorld

 

 

 3.2 應用測試02,包com.biao.pipe下,實現一個流處理邏輯,開啟一個流傳輸管道,將一個topic的內容傳輸到另一個topic中,代碼com.biao.pipe.PipeApplication:

public class PipeApplication {
    public static void main(String[] args) {
        System.out.println("PipeApplication starting .........");
        Properties props = new Properties();
        // StreamsConfig已經預定義了很多參數名稱,運行時console會輸出所有StreamsConfig values
        // 這里沒有使用springboot的application.properties來配置
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092");
        // kafka流都是byte[],必須有序列化,
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

        // kafka流計算是一個各broker連接的拓撲結構,以下使用builder來構造拓撲
        final StreamsBuilder builder = new StreamsBuilder();
        // 構建一個KStream流對象,元素是<String, String>類型的key-value對值,
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        // 將前面的topic:"streams-plaintext-input"寫入另一個topic:"streams-pipe-output"
        source.to("streams-pipe-output");
        // 以上兩行等同以下一行
        // builder.stream("streams-plaintext-input").to("streams-pipe-output");

        // 查看具體構建的拓撲結構
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology,props);
        // 控制運行次數,一次后就結束
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try{
            streams.start();
            latch.await();
        }catch (Throwable e){
            System.exit(1);
        }
        System.exit(0);
    }
}

注意:以上即使用kafka topic構建了一個KStream流源頭,運行輸出以下,即為成功,進一步可以在kafka中進行topic寫入,再到另一個topic驗證輸出,我就不演示了。注意配置/usr/kafka2.3/kafka_2.12-2.3.1/config/server.properties中的listeners地址(見后記1):

 

 解釋:以上構造了有2個處理節點的kafka流計算拓撲結構,源節點:KSTREAM-SOURCE-0000000000,匯聚(Sink)節點:KSTREAM-SINK-0000000001,源節點持續的讀取topic為streams-plaintext-input的有序記錄並輸送到下游Sink節點,Sink節點再將記錄寫入topic為streams-pipe-output的流,--> 和 <-- 指示左右端對象的上游和下游關系,圖中有換行,導致顯示不連貫拓撲展示如下:

 

 

 3.3 應用測試03,包com.biao.linesplit下,創建一個無狀態的流處理邏輯,讀取一個topic的記錄,並將文本行按空格分開,再傳輸到另一個topic,代碼 com.biao.linesplit.LineSplitApplication:

public class LineSplitApplication {
    public static void main(String[] args) {
        System.out.println("LineSplitApplication starting .........");
        Properties props = new Properties();
        // StreamsConfig已經預定義了很多參數名稱,運行時console會輸出所有StreamsConfig values
        // 這里沒有使用springboot的application.properties來配置
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-line-split");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092");
        // kafka流都是byte[],必須有序列化,不同的對象使用不同的序列化器
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

        // kafka流計算是一個各broker連接的拓撲結構,以下使用builder來構造拓撲
        final StreamsBuilder builder = new StreamsBuilder();
        // 構建一個KStream流對象,元素是<String, String>類型的key-value對值,
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        /*
        // 以source為輸入,產生一條新流words,這里使用了流的扁平化語法,我的前篇文章有講此基礎
        KStream<String, String > words = source.flatMapValues(value -> Arrays.asList("\\W+"));
        // 將前面的topic:"streams-plaintext-input"寫入另一個topic:"streams-pipe-output"
        words.to("streams-pipe-output");*/

        // 以上兩行使用stream鏈式語法+lambda等同以下一行,我的前篇文章有講此基礎
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                      .to("streams-linesplit-output");

        // 查看具體構建的拓撲結構
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology,props);
        // 控制運行次數,一次后就結束
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try{
            streams.start();
            latch.await();
        }catch (Throwable e){
            System.exit(1);
        }
        System.exit(0);
    }
}

 

運行輸出以下,即為成功,也可以進一步在kafka上直接進行topic寫入和另一個topic輸出驗證,演示,略!注意配置/usr/kafka2.3/kafka_2.12-2.3.1/config/server.properties中的listeners地址(見后記1):

 

 解釋:以上構造了有3個處理節點的kafka流計算拓撲結構,源節點:KSTREAM-SOURCE-0000000000,處理節點:KSTREAM-FLATMAPVALUES-0000000001,匯聚節點:KSTREAM-SINK-0000000002,處理節點從源節點取得流元素,進行處理,再將結果傳輸給匯聚節點,注意這個過程是無狀態的,拓撲展示如下:

 

 

 3.4 應用測試04,包com.biao.wordcount下,構建一個無限流處理邏輯,讀取一個topic,統計文本單詞數,最終輸出到另一個topic,代碼com.biao.wordcount.WordApplication:

public class WordCountApplication {
    public static void main(String[] args) {
        System.out.println("WordCountApplication starting .........");
        Properties props = new Properties();
        // StreamsConfig已經預定義了很多參數名稱,運行時console會輸出所有StreamsConfig values
        // 這里沒有使用springboot的application.properties來配置
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-word-count");
        // kafka虛擬機linux地址
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092");
        // kafka流都是byte[],必須有序列化,不同的對象使用不同的序列化器
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

        // kafka流計算是一個各broker連接的拓撲結構,以下使用builder來構造拓撲
        final StreamsBuilder builder = new StreamsBuilder();
        // 構建一個KStream流對象,元素是<String, String>類型的key-value對值,topic:streams-plaintext-input
        KStream<String, String> source = builder.stream("streams-plaintext-input");

        // 以下使用stream鏈式語法+lambda,具體分開的過程語句我就不寫了
        // flatMapValues將text line使用空格分隔成words
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy(((key, value) -> value))
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(),Serdes.Long()));

        // 查看具體構建的拓撲結構
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology,props);
        // 控制運行次數,一次后就結束
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try{
            streams.start();
            latch.await();
        }catch (Throwable e){
            System.exit(1);
        }
        System.exit(0);
    }
}

 

運行輸出以下內容,即為成功,注意配置/usr/kafka2.3/kafka_2.12-2.3.1/config/server.properties中的listeners地址(見后記1):

 

 

解釋:最重要一點即此WordCountApplication僅是一個邏輯處理單元,可以理解為一個流水線車間,里面有兩條流水線對來料加工再輸出加工品。以上可以看出,有兩個不連通的拓撲結構,第一個拓撲無狀態,其匯聚節點KSTREAM-SINK-0000000005寫入到topic: counts-store-repartition,這個topic又作為第二個拓撲的源,此中間topic的作用是因分組聚合運算”打亂”流元素的順序。插入的節點Processor: KSTREAM-FILTER-0000000005是過濾掉分組聚合key值為空的記錄。

第二個拓撲有狀態,即生成並保存了計算中間值,因為要做分組統計,分組聚合運算節點KSTREAM-AGGREGATE-0000000003保存狀態使用了counts-store,即程序中指定的值。對流中每個元素統計時,會先去保存的狀態數據中去查找匹配,如果有則累加一,然后再寫入counts-store。每個被更新的統計值都再傳輸到處理節點KTABLE-TOSTREAM-0000000007,此節點作用是將統計更新的值再解析成新流。最后傳輸給匯聚節點KSTREAM-SINK-0000000008。以上可見流處理的思想和邏輯,內部迭代確實很強大!拓撲圖如下:

 

 

應用04運行步驟:

第一步,啟動ZK,再啟動kafka,注意先修改config/server.properties 中listeners=PLAINTEXT:// 192.168.1.221:9092:

[root@localhost kafka_2.12-2.3.1]#  ./bin/kafka-server-start.sh config/server.properties
 

第二步,運行com.biao.wordcount.WordCountApplication,啟動kafka流處理車間。

topic數據寫入放在包com.biao.wordcount.producer,當然也可以直接在kafka server中使用命令行寫入,我這里是為了演示多種代碼操作模式。配置類com.biao.wordcount.producer.KafkaConfig,這里使用了kafka的API配置方式,分別配置了topic,producer和consumer的相應參數,並生成Bean對象,請對比application.properties方式:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public KafkaTemplate<Integer,String > kafkaTemplate(){
        return new KafkaTemplate<>(this.producerFactory());
    }

    // topic
    @Bean
    public KafkaAdmin admin(){
        Map<String,Object> configs = new HashMap<>(16);
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092");
        return new KafkaAdmin(configs);
    }

    @Bean
    // NewTopic(String name, int numPartitions, short replicationFactor)
    // kafka中每個topic只需創建一次,
    public NewTopic topic(){
        return new NewTopic("streams-plaintext-input",1, (short) 1);
    }

    @Bean
    // NewTopic(String name, int numPartitions, short replicationFactor)
    // kafka中每個topic只需創建一次,
    public NewTopic topic2(){
        return new NewTopic("streams-wordcount-output",1, (short) 1);
    }

    // producer
    @Bean
    public Map<String,Object> producerConfigs(){
        Map<String, Object> props = new HashMap<>(16);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.221:9092");
        props.put("acks","all");
        props.put("retries",2);
        props.put("batch.size",16384);
        props.put("linger.ms",1);
        props.put("buffer.memory",33554432);
        props.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        props.put("key.converter","org.apache.kafka.connect.storage.IntegerConverter");
//        props.put("value.converter","org.apache.kafka.connect.storage.StringConverter");
        return props;
    }

    @Bean
    public ProducerFactory<Integer,String> producerFactory(){
        return new DefaultKafkaProducerFactory<>(this.producerConfigs());
    }

    // consumer
    @Bean
    public Map<String,Object> consumerConfigs(){
        HashMap<String,Object> props =  new HashMap<>(16);
        props.put("bootstrap.servers","192.168.1.221:9092");
        props.put("group.id","foo");
        props.put("enable.auto.commit","true");
        // WordCountApplication 的consumer消費對象是統計的結果 key-value
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.LongDeserializer");
        props.put("formatter","kafka.tools.DefaultMessageFormatter");
        props.put("print.key","true");
        props.put("value.key","true");
//        props.put("key.converter","org.apache.kafka.connect.storage.IntegerConverter");
//        props.put("value.converter","org.apache.kafka.connect.storage.StringConverter");
        return props;
    }

    @Bean
    public ConsumerFactory<Integer,String> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(this.consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer,String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<Integer,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(this.consumerFactory());
        return factory;
    }

    @Bean
    public SimpleConsumer simpleConsumerLister(){
        return new SimpleConsumer();
    }
}

 

定義消費者,com.biao.wordcount.producer.SimpleConsumer:

@Component
public class SimpleConsumer {
    private Logger log = LoggerFactory.getLogger(SimpleConsumer.class);
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    @KafkaListener(id = "foo",topics = "streams-wordcount-output")
    public void listen(byte[] records){
        System.out.println("records is >>>> "+ records);
        this.countDownLatch.countDown();
        log.debug("consume successfully!");
    }
    //在WordCountApplication實例中,無法打印流結果,因為需要格式化
/*    public void listen(ConsumerRecord<?,?> records){
        Optional<?> msg = Optional.ofNullable(records.value());
        if (msg.isPresent()){
            Object data = msg.get();
            log.info("Consumer Record >>>>>> {}", records);
            log.info("Record Data >>>>>> {}", data);
        }
    }*/
}

 

定義生產者,並作為啟動類,com.biao.wordcount.producer.KafakaProducer:

@SpringBootApplication
public class KafakaProducer {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(KafkaConfig.class);
//        KafkaTemplate<Integer,String> kafkaTemplate = (KafkaTemplate<Integer, String>) context.getBean(KafkaTemplate.class);
        KafkaTemplate<Integer,String> kafkaTemplate = (KafkaTemplate<Integer, String>) context.getBean(KafkaTemplate.class);
        LocalDateTime time = LocalDateTime.now();
        String data = "MSG CONTENT -> " + time ;
        // send(String topic, K key, @Nullable V data)
        ListenableFuture<SendResult<Integer,String>> send = kafkaTemplate.send("streams-plaintext-input", 1, data);
        send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println(">>>>>>> kafka message send failure");
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                System.out.println(">>>>>>> kafka message send successfully");
            }
        });
    }
}

 

第三步,運行com.biao.wordcount.producer.KafakaProducer, 啟動topic數據寫入,kafka中驗證如下:

 

 

如果多次運行導致測試數據太多,影響結果查看,可以先刪除topic及其數據,若當前topic有使用過即有傳輸過信息:並沒有真正刪除topic只是把這個topic標記為刪除(marked for deletion),要徹底刪除需到ZK中刪除相應的目錄:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic HelloWorld
Topic HelloWorld is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
 

第四步,在kafka server上查看最終word統計結果,命令:

[root@localhost kafka_2.12-2.3.1]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.221:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

 

 

后記:

1.關於有狀態無狀態,復雜問題簡單化!無狀態對象本身只是個純粹的處理邏輯,不依賴上下文信息,也不改變上下文信息,比如FUNC(x+y),只要有輸入x和y,就輸出相加值,對程序“無害”;有狀態指會保留上下文,如統計單詞數,必須保留每次計算的中間結果,用於下次累加,有狀態對象會破壞程序運行現場,不利於並發和共享。

2.如遇到程序出錯:

[AdminClient clientId=adminclient-1] Error connecting to node dubbo204.domain:9092 (id: 0 rack: null)

這是因為linux的監聽hosts配置引起的,直接修改 
config/server.properties中listeners為linux的虛機IP地址即可,並注意關閉linux的防火牆或打開9092端口:

 

 

3.添加lombok依賴

providedCompile group: 'org.projectlombok', name: 'lombok', version: '1.18.10'
遇到編譯錯誤:
Could not find method providedCompile() for arguments [{group=org.projectlombok, name=lombok, version=1.18.10}]

因providedCompile必須配合 war插件,修改build.gradle:

 

 

4.運行WordCountApplication 報錯:

org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
這是序列化問題,必須使用正確的序列化器處理對應的數據,如IntegerDeserializer只能反序列化Integer對象,StringSerializer用於序列化String對象。

5.RHEL8.0版本可用性還是不錯的,比7要流暢很多,很多命令都變了,我開的共享:https://pan.baidu.com/s/19gkx07hQ6TuN9UyNWHmChQ 提取碼:bg69,絕對保證可用,之前我也下載了幾次都是損壞的,每次6.62G大小,快哭了。

 

 

總結:kafka API,分為Producer,Consumer,Stream,Connect和AdminClient。Producer/Consumer分別用於管理生產者和消費者,Stream則是自帶的KStream,可以類比JDK8的Stream來理解,即在輸出到最終sink前進行流式計算,且很多方法使用類似,Connect是用於kafka連接到輸入/輸出,支持很多類型,如DB,file,redis,ELK等。AdminClient則管理topic/broker等。KStream+kafka強強聯手,可以預計未來會干出一番大事!

 

推薦閱讀:

  1. 流式計算(一)-Java8Stream

  2. Dubbo學習系列之十六(ELK海量日志分析)
  3. Linux下Redis集群

  4. Dubbo學習系列之十五(Seata分布式事務方案TCC模式)

  5. Dubbo學習系列之十四(Seata分布式事務方案AT模式)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM