Kafka消費者生產者實例


 
 

為了更為直觀展示Kafka的消息生產消費的過程,我會從基於Console和基於Application兩個方面介紹使用實例。Kafka是一個分布式流處理平台,具體來說有三層含義:

  1. 它允許發布和訂閱記錄流,類似於消息隊列或企業消息傳遞系統。
  2. 它可以容錯的方式存儲記錄流。
  3. 它可以處理記錄發生時的流。

由於主要介紹如何使用Kafka快速構建生產者消費者實例,所以不會涉及Kafka內部的原理。一個基於Kafka的生產者消費者過程通常是這樣的(來自官網):

Kafka生產者消費者

安裝Kafka

官網下載kafka_2.11-0.11.0.0.tgz,解壓后安裝到指定目錄:

cd kafka_2.11-0.11.0.0 tar -zxvf kafka_2.11-0.11.0.0.tgz -C pathToInstall
  • 1
  • 2

啟動Kafka:

bin/kafka-server-start.sh config/server.properties
  • 1

基於Console

創建Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • 1

Producer發送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  • 1

在控制台輸入要發送的消息:

This is a message This is another message
  • 1
  • 2

Consumer接收消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  • 1

輸入命令后可以看到控制台輸出了剛才的消息:

This is a message This is another message
  • 1
  • 2

基於Application

單個consumer

生產者:

public class SimpleKafkaProducer { public static void main(String[] args) { Properties props = new Properties(); //broker地址 props.put("bootstrap.servers", "localhost:9092"); //請求時候需要驗證 props.put("acks", "all"); //請求失敗時候需要重試 props.put("retries", 0); //內存緩存區大小 props.put("buffer.memory", 33554432); //指定消息key序列化方式 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //指定消息本身的序列化方式 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i))); System.out.println("Message sent successfully"); producer.close(); } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

消費者:


public class SimpleKafkaConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); //每個消費者分配獨立的組號 props.put("group.id", "test"); //如果value合法,則自動提交偏移量 props.put("enable.auto.commit", "true"); //設置多久一次更新被消費消息的偏移量 props.put("auto.commit.interval.ms", "1000"); //設置會話響應的時間,超過這個時間kafka可以選擇放棄消費或者消費下一條消息 props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); System.out.println("Subscribed to topic " + "test"); int i = 0; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

先啟動生產者,發送消息到broker,這里簡單發送了10條從0-9的消息,再啟動消費者,控制台輸出如下:

消費結果

集群消費

以上的程序只是單生產者單消費者的場景,所謂集群消費就是同一個topic的消費可能有多個消費者消費,也稱廣播消費。集群消費只一種多線程或者多機器的消費方式。

要實現集群消費只需要為每個消費者指定不同的group.id就可以。由於代碼比較簡單就不貼了。

測試發現,當為了兩個consumer(這里是兩個進程)指定不同的group.id后,producer發送的消息兩個consumer都能接受到,這很顯然,集群消費嘛。為設置兩個consumer的group.id為同一個的時候,只有一個消費者能消費者到。也就是說,kafka的消息只能由組中的單個用戶讀取。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM