使用方式
创建一个 KafkaConsumer 对象订阅主题并开始接收消息:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("enable.auto.commit", "false"); // 禁止自动提交
properties.setProperty("group.id", "my-group-id"); // 设置消费者组群ID
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("topic")); // 订阅主题
try {
while (! Thread.currentThread().isInterrupted()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
records.forEach(System.out::println);
consumer.commitAsync(); // 异步提交偏移量
}
} catch (WakeupException ignore) {
// 忽略关闭异常
} finally {
try {
consumer.commitSync(); // 同步提交偏移量
} finally {
consumer.close();
}
}
Thread thread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.wakeup();
try {
thread.join();
} catch (InterruptedException ignore) {}
}));
也可以通过手工管理消费逻辑,实现更为复杂的功能:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("enable.auto.commit", "false"); // 禁止自动提交
// commit 机制依赖于 group.id
// 当使用 assign 人工分配主题与分片时,可以不指定 group.id
// 但是此时仍然可以执行 commit 操作,底层使用空字符串作为 group.id
properties.setProperty("group.id", "my-group-id"); // 仍然设置消费者组群ID,避免出现意外情况
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
List<PartitionInfo> partitions = consumer.partitionsFor("topic"); // 查询订阅主题对应的分片信息
List<TopicPartition> topicPartitions = partitions.stream().map(partition -> new TopicPartition(partition.topic(), partition.partition())).collect(Collectors.toList());
// consumer 会在启动时,查找最近一次 commit 记录,并从此处开始消费
// 无论 subscribe 与 assign 均有该效果
consumer.assign(topicPartitions); // 人工为 consumer 指定消费分片
// seek 可以指定消费的起始位置
// 需要注意,当 seek 超前于 beginningOffset 时,会导致 kafkaConsumer 一直无法读取到数据
// 可以使用 seekToBeginning 来规避这一问题
// consumer.seek(partition, 518811150L); // 从 518811150 这条消息开始消费(包含)
// 使用 assign 的情况下,commit 仍然是有效的,可以通过 committed 获取最近一次提交记录
// consumer.committed(partition)
try {
while (! Thread.currentThread().isInterrupted()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
records.forEach(System.out::println);
// 消费过程中,可以使用 commit 来保存消费位置,方便下一次进行恢复
// 更为常见的情况是将 offset 保存外部存储中,比如:MySQL
// consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
// 消费过程中,可以使用 seek 直接修改 poll 读取位置,达到回溯的效果
// consumer.seek(partition, offset);
// 通过 pause 与 resume 可以控制下一次 poll 是否返回指定的 partition 的数据
// 可以实现类似延迟队列的效果
// consumer.pause(partitions);
// consumer.resume(partitions);
}
} catch (WakeupException ignore) {
// 忽略关闭异常
} finally {
consumer.close();
}
消费者群组
为了实现横向扩展,应用程序需要创建一个消费者群组,然后往群组里添加消费者来提高处理效率,群组里的每个消费者只处理一部分消息:
一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息(左图)。
消费者无法跟上数据生成的速度,可以向组群内增加更多的消费者分担负载,是消费端横向伸缩的主要手段(中图)。
预先为主题预留的分区可以在负载增长时增加更多的消费者,不过当消费者的数量超过主题分区时,多余的消费者只会被闲置(右图)。
只要保证每个应用程序有独立的消费者群组,就可以让它们获取到主题所有的消息,而不只是其中的一部分:
提交和偏移量
为了保证调用KafkaConsumer.poll()方法时总能返回未被被消费者读取过的记录,消费者需要维护每个分区中已读消息对应的偏移量offset。
一旦消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡rebalance,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要知道每个分区的已读偏移量,然后从偏移量指定的地方继续处理。
为了保证这些信息不丢失,消费者需要定期向一个名为 _consumer_offset 的特殊主题发送消息,消息里包含每个分区的偏移量。Kafka 中将这一更新的操作称作提交commit。
边界情况
如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理:
如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失:
处理偏移量的方式对客户端会有很大的影响,下面分析几种常见的提交方式:
自动提交
最简单的提交方式是让消费者自动提交偏移量:设置 enable.auto.commit = true,那么每过 auto.commit.interval.ms,消费者会自动把从KafkaConsumer.poll()方法接收到的最大偏移量提交上去。
自动提交是在轮询里进行的:消费者每次在进行轮询时会检查是否该提交偏移量,并且自动提交最近一次轮询返回的偏移量。
不过,在使用这种简便的方式之前,需要知道它将会带来怎样的结果:
-
假设在两次提交间隔之间发生了再均衡,期间的消息会被重复处理。
可以通过修改提交时间间隔来减小可能出现重复消息的时间窗,不过无法完全避免这种情况。 -
每次调用轮询方法都会提交上一次调用返回的偏移量,而并不关心具体哪些消息已经被处理了。
在再次调用之前,最好确保所有当前已返回消息都已经处理完毕(调用KafkaConsumer.close()前也会进行自动提交)。
手动提交
自动提交虽然方便,不过并没有为开发者留有余地来避免重复处理消息。为了提高可控性,开发者可以设置 enable.auto.commit = false,让应用程序决定何时提交偏移量。
-
同步提交:使用
KafkaConsumer.commitSync()会提交最新偏移量并等待 broker 对提交请求作出回应。
在成功提交或碰到无法恢复的错误之前会不断重试,会导致应用程序一直阻塞,限制了应用程序的吞吐量。 -
异步提交:使用
KafkaConsumer.commitAsync()会提交最新偏移量但无需等待 broker 的响应并且不进行重试。
不进行重试,是因为可能有一个更大的偏移量已经提交成功,重试可能会覆盖到最新的值,导致再均衡后出现重复消息。
该方法在 broker 作出响应时会执行用户指定的回调,回调经常被用于记录提交错误或生成度量指标。
不过如果要在其中进行重试,一定要注意提交的顺序。
一般情况下,偶尔异步提交失败不会有太大问题,后续的提交总会有成功的。但在关闭消费者或再均衡前的最后一次提交,必须确保提交成功。为了保证可靠性与吞吐量,比较常见的方式是将两者组合使用(具体参考开头的代码示例)。
提交特定偏移量
上面讨论的提交方式中,提交偏移量的频率与处理消息批次的频率是一样的,但某些场景需要在更细的粒度上控制提交:如果KafkaConsumer.poll()方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,我们希望能在批次处理中间提交部分偏移量。
此时,可以在调用KafkaConsumer.commitSync()或KafkaConsumer.commitAsync()时,通过参数指定具体的分区及其对应的偏移量,人为地控制提交内容。
分区再均衡
群组里的消费者共同读取主题的分区,消费过程中可能出现以下情况:
- 新的消费者加入群组,它会被分配到一个原本由其他消费者读取的分区
- 当前消费者离开群组,原本由它读取的分区将分配给群组里的其他消费者
- 订阅主题发生变化时,比如管理员添加了新的分区,会发生分区重分配
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡rebalance。
再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(支持动态添加或移除消费者)。
但是,再均衡整个群组会一小段时间不可用,期间消费者无法读取消息,在恢复之前会拖慢应用程序。
消费者分代
消费者组群可以进行任意次再均衡,为了更好地隔离已失效的状态(比如:避免僵尸实例提交过期的偏移量),Kafka 消费者端引入了分代generation的概念。
消费者的分代信息是一个整数,每当组群进行一次 rebalance 操作,组内所有消费者的 generation 都会递增。当消费者提交偏移时会伴随着 generation 信息,当 broker 会据此判断消息是否来源于一个上一代的消费者,并拒绝过期的提交。
在均衡监听器
如果消费者想在再均衡前后做一些清理工作或准备工作,只需在调用 KafkaConsumer.subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例即可:
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅时注册再均衡监听器
consumer.subscribe(Collections.singletonList("topic"), new ConsumerRebalanceListener(){
/** 方法会在重新分配分区之后和消费者开始读取消息之前被调用 */
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 从特定的偏移量处开始读取消息
offsets.entrySet().stream().
filter(e -> partitions.contains(e.getKey())).
forEach(e -> consumer.seek(e.getKey(), e.getValue().offset()));
}
/** 方法会在再均衡开始之前和消费者停止读取消息之后被调用 */
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(offsets); // 提交已经数据对应的偏移量
}
});
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
// 消息处理过程中,实时更新偏移量变化
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
}
消费者管理
消费者管理由两部分组成:
- 成员管理:管理组内所有 consumer 实例生命周期并负责下发分区分配方案到每个 consumer。
该功能依赖于一个被选为组协调者coordinator的 broker。 - 分区分配:根据指定的分区分配策略制定分配方案并上报 coordinator。
该功能由一个 leader consumer 负责。
分区分配的操作是在 consumer 端执行的好处主要有以下两点:
- 便于维护与升级: 调整分配策略时,无需重启整 broker 集群。
- 便于实现自定义策略:代码实现更为灵活,可以依赖外部存储维护复杂策略,甚至实现机架感知。
变更流程
初始化
- 根据 $\tiny \texttt{_consumer_offset}$ 主题的分区数量 $\tiny \texttt{n}$ 计算 $\tiny \texttt{hashmod(group.id, n)}$
- 选择第 $\tiny \texttt{hashmod(group.id, n)}$ 个 $\tiny \texttt{_consumer_offset}$ 分区的 leader broker 作为 coordinator
加入组群
JoinGroup请求。
coordinator 会从中选择一个组群 leader consumer,并把所有成员信息以及它们的订阅信息发送给这个 leader consumer。
规划分配方案
SyncGroup请求将方案中发送给 coordinator。
其他 consumer 也会向 coordinator 发送不包含分区方案的
SyncGroup请求,然后 coordinator 会将分区方案返回给这些 consumer。
发送心跳
Heartbeat请求,维持它们和群组会话
session以及对分区的所有权。
当 coordinator 长时间未接收到 consumer 的心请求时,会认为 consumer 已失效并触发一次再均衡。
若 coordinator 在心跳响应中返回了 REBALANCE_IN_PROGRESS 信息,意味着当前组群已经开启了新一轮的再均衡。
离开组群
LeaveGroup告知 coordinator 它将要离开群组,并立即触发一次再均衡,尽可能降低处理停顿。
配置解析
client.id
该属性可以是任意字符串,broker 用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。
group.id
标识消费者所属的消费者组群的唯一字符串。
如果消费者需要基于组群的对订阅进行管理,或基于 Kafka 管理偏移量时,需要指定此属性。
bootstrap.servers
该属性指定 broker 的地址列表。
清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。
不过建议至少要提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。
key.deserializer & value.deserializer
这两个属性必须被设置为一个实现了org.apache.kafka.common.serialization.Deserializer接口的类。
生产者会使用这个类把键值字节数组反序列化成对象。
receive.buffer.bytes & send.buffer.bytes
设置 socket 读写数据时用到的 TCP 缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。
当生产者或消费者与 broker 处于不同的机房时,可以适当增大这些值。
fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数。
broker 在收到消费者的数据请求时,如果可用的数据量小于该配置,那么它会等到有足够的可用数据时一并返回给消费者。
如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。
或当消费者订阅的主题不活跃时,消费者的 CPU 使用率却很高,可以适当调大该值。
fetch.max.wait.ms
该属性指定了可用数据量不足时 broker 的等待时间。
如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致fetch.max.wait.ms的延迟。
如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。
max.poll.records
该属性用于控制单次调用KafkaConsumer.poll()方法能够返回的记录数量。
可以借助该配置控制在轮询里需要处理的数据量。
max.partition.fetch.bytes
该属性指定了KafkaConsumer.poll() 方法从每个分区里返回的最大字节数。
消费者需要保证 max.partition.fetch.bytes * 消费分区数量 可用内存来接收记录。
在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。
该值必须比 broker 能够接收的最大消息的字节数max.message.size大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。
在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。
消费者需要频繁调用 KafkaConsumer.poll() 方法来避免会话过期和发生分区再均衡,如果单次调用返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。
如果出现这种情况,可以把该值改小,或者延长会话过期时间。
session.timeout.ms
该属性指定了消费者在被认为下线之前可以与服务器断开连接的时间。
如果消费者没有在指定的时间内发送心跳给群组协调器,会被认为已经下线。
协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。
该值越小,可以更快地检测和恢复崩溃的节点,但可能导致非预期的再均衡。
该值越大,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。
该属性与heartbeat.interval.ms紧密相关:
heartbeat.interval.ms指定了 poll() 方法向协调器发送心跳的频率。session.timeout.ms则指定了消费者可以多久不发送心跳。
一般需要同时修改这两个属性,heartbeat.interval.ms 一般设置为 session.timeout.ms 的三分之一。
auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量失效时(包含偏移量的记录已过时并被删除)该作何处理。
latest在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。earliest在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
enable.auto.commit
该属性指定了消费者是否自动提交偏移量(可以通过配置auto.commit.interval.ms来控制提交的频率)。
为了尽量避免出现重复数据和数据丢失,可以把它设为 false ,由自己控制何时提交偏移量。
partition.assignment.strategy
这两个属性必须被设置为一个实现了org.apache.kafka.clients.consumer.internals.PartitionAssignor接口的类。
消费协调者ConsumerCoordinator会使用这个类,根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。
Kafka 默认的分配策略:
org.apache.kafka.clients.consumer.RangeAssignor把主题的若干个连续的分区分配给消费者org.apache.kafka.clients.consumer.RoundRobinAssignor把主题的所有分区逐个分配给消费者org.apache.kafka.clients.consumer.StickyAssignor尽可保证分配平衡的前提下减少再均衡造成的变更
