kafka針對每個消費組的消費信息都在zookeeper中做了記錄,啟動一個zookeeper客戶端,以下可以看出kafka每個消費組在zookeeper中的消費每個kafka主題的信息
[zk: localhost:2181(CONNECTED) 0] ls /consumers/kafkawarning0 kafkawarning02 (消費組groupid) kafkawarning01 [zk: localhost:2181(CONNECTED) 0] ls /consumers/kafkawarning01/offsets [avro-log-bj-jingnei1-dev-proxy-websense] [zk: localhost:2181(CONNECTED) 1] ls /consumers/kafkawarning01/offsets/avro-log-bj-jingnei1-dev-proxy-websense(對應主題) [3, 2, 1, 0, 4] [zk: localhost:2181(CONNECTED) 2] get /consumers/kafkawarning01/offsets/avro-log-bj-jingnei1-dev-proxy-websense/0(對應分區) 80492922(偏移量) cZxid = 0x80037a3fa ctime = Tue Mar 29 10:51:35 CST 2016 mZxid = 0x80037a3fa mtime = Tue Mar 29 10:51:35 CST 2016 pZxid = 0x80037a3fa cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 8 numChildren = 0
針對這個我們可以通過修改zookeeper中的信息來重置一個topic的消費組的offset,以下是代碼
public static void main(String[] args) { String zkConnect = "11.11.184.162:2181,11.11.184.165:2181,11.11.184.178:2181"; int sessionTimeout = 5000; int connectionTimeout = 5000; ZkClient zkClient = new ZkClient(zkConnect, sessionTimeout, connectionTimeout,new ZkSerializerSuper()); //修改指定消費組在zookeeper中的偏移量
map.put("0","12345");
map.put(2,"23451");
map.put(1,"23541"); setPersistentPathInZk(map,zkClient);//map存放的是每個分區對應的偏移量 }
//調用ZKUtils中的updatePersistentPath方法進行修改
private static void setPersistentPathInZk(Map map, ZkClient zkClient) {
String group_id = "kafkawarning01";
String topic = "topic1";
Set set = map.keySet(); for (Object key : set) { ZkUtils.updatePersistentPath(zkClient, "/consumers/"+group_id+"/offsets/"+topic+"/"+key, map.get(key).toString()); } }