Kafka消费者生产者实例


 
 

为了更为直观展示Kafka的消息生产消费的过程,我会从基于Console和基于Application两个方面介绍使用实例。Kafka是一个分布式流处理平台,具体来说有三层含义:

  1. 它允许发布和订阅记录流,类似于消息队列或企业消息传递系统。
  2. 它可以容错的方式存储记录流。
  3. 它可以处理记录发生时的流。

由于主要介绍如何使用Kafka快速构建生产者消费者实例,所以不会涉及Kafka内部的原理。一个基于Kafka的生产者消费者过程通常是这样的(来自官网):

Kafka生产者消费者

安装Kafka

官网下载kafka_2.11-0.11.0.0.tgz,解压后安装到指定目录:

cd kafka_2.11-0.11.0.0 tar -zxvf kafka_2.11-0.11.0.0.tgz -C pathToInstall
  • 1
  • 2

启动Kafka:

bin/kafka-server-start.sh config/server.properties
  • 1

基于Console

创建Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • 1

Producer发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  • 1

在控制台输入要发送的消息:

This is a message This is another message
  • 1
  • 2

Consumer接收消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  • 1

输入命令后可以看到控制台输出了刚才的消息:

This is a message This is another message
  • 1
  • 2

基于Application

单个consumer

生产者:

public class SimpleKafkaProducer { public static void main(String[] args) { Properties props = new Properties(); //broker地址 props.put("bootstrap.servers", "localhost:9092"); //请求时候需要验证 props.put("acks", "all"); //请求失败时候需要重试 props.put("retries", 0); //内存缓存区大小 props.put("buffer.memory", 33554432); //指定消息key序列化方式 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //指定消息本身的序列化方式 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i))); System.out.println("Message sent successfully"); producer.close(); } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

消费者:


public class SimpleKafkaConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); //每个消费者分配独立的组号 props.put("group.id", "test"); //如果value合法,则自动提交偏移量 props.put("enable.auto.commit", "true"); //设置多久一次更新被消费消息的偏移量 props.put("auto.commit.interval.ms", "1000"); //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息 props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); System.out.println("Subscribed to topic " + "test"); int i = 0; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

先启动生产者,发送消息到broker,这里简单发送了10条从0-9的消息,再启动消费者,控制台输出如下:

消费结果

集群消费

以上的程序只是单生产者单消费者的场景,所谓集群消费就是同一个topic的消费可能有多个消费者消费,也称广播消费。集群消费只一种多线程或者多机器的消费方式。

要实现集群消费只需要为每个消费者指定不同的group.id就可以。由于代码比较简单就不贴了。

测试发现,当为了两个consumer(这里是两个进程)指定不同的group.id后,producer发送的消息两个consumer都能接受到,这很显然,集群消费嘛。为设置两个consumer的group.id为同一个的时候,只有一个消费者能消费者到。也就是说,kafka的消息只能由组中的单个用户读取。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM