在网上碰到的问题,想了下使用现有的API还是可以实现的。 首先,需要引入Kafka服务器端代码,比如加入Kafka 1.0.0依赖: Maven <dependency> <groupId>org.apache.kafka</groupId> < ...
前段时间在Kafka QQ群中有人问及此事 关于Java consumer如何动态修改topic订阅的问题。仔细一想才发现这的确是个好问题,因为如果简单地在另一个线程中直接持有consumer实例然后调用subscribe进行修改,consumer端必然会抛出异常ConcurrentModificationException:KafkaConsumer is not safe for multi ...
2017-06-17 16:55 6 9663 推荐指数:
在网上碰到的问题,想了下使用现有的API还是可以实现的。 首先,需要引入Kafka服务器端代码,比如加入Kafka 1.0.0依赖: Maven <dependency> <groupId>org.apache.kafka</groupId> < ...
之前写过如何用服务器端的API代码来获取订阅某topic的所有consumer group,参见这里。使用服务器端的API需要用到kafka.admin.AdminClient类,但是这个类在0.11.0.0版本已经被标记为不推荐使用了,故目前最合适的方式还是通过客户端API ...
1,查看kafka topic列表,使用--list参数 2,查看kafka特定topic的详情,使用--topic与--describe参数 列出了test_topic的parition数量、replica因子以及每个partition的leader ...
最近工作中遇到需要使用kafka的场景,测试消费程序启动后,要莫名的过几十秒乃至几分钟才能成功获取到到topic的partition和offset,而后开始消费数据,于是学习了一下查看kafka broker里topic和consumer group状态的相关命令,这里记录一下。 命令参考 ...
最近工作中遇到需要使用kafka的场景,测试消费程序启动后,要莫名的过几十秒乃至几分钟才能成功获取到到topic的partition和offset,而后开始消费数据,于是学习了一下查看kafka broker里topic和consumer group状态的相关命令,这里记录一下。 命令参考 ...
1.获取所有topic package com.example.demo; import java.io.IOException; import java.util.List; import org.apache.zookeeper.KeeperException; import ...
Kafka提供了两种Consumer API,分别是:High Level Consumer API 和 Lower Level Consumer API(Simple Consumer API) High Level Consumer API:高度抽象的Kafka消费者API;将底层具体获取 ...
Kafka提供了两种Consumer API,分别是:High Level Consumer API 和 Lower Level Consumer API(Simple Consumer API) High Level Consumer API:高度抽象的Kafka消费者API;将底层具体获取 ...