将Kafka在zookeeper的默认目录,修改为自定义目录时,启动失败,抛出以下异常
INFO 2012-03-06 02:39:04,072 main kafka.server.KafkaZooKeeper Registering broker /brokers/ids/1 FATAL 2012-03-06 02:39:04,111 main kafka.server.KafkaServer Fatal error during startup. java.lang.IllegalArgumentException: Path length must be > 0 at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48) at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35) at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:620) at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87) at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308) at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304) at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213) at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223) at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223) at kafka.utils.ZkUtils$.createParentPath(ZkUtils.scala:48) at kafka.utils.ZkUtils$.createEphemeralPath(ZkUtils.scala:60) at kafka.utils.ZkUtils$.createEphemeralPathExpectConflict(ZkUtils.scala:72) at kafka.server.KafkaZooKeeper.registerBrokerInZk(KafkaZooKeeper.scala:57) at kafka.log.LogManager.startup(LogManager.scala:124) at kafka.server.KafkaServer.startup(KafkaServer.scala:80) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:47) at kafka.Kafka$.main(Kafka.scala:60) at kafka.Kafka.main(Kafka.scala)
经确定这是一个Bug,由于initZk()方法没有对路径进行处理导致
原代码:
private def initZk(): ZkClient = { info("Connecting to zookeeper on " + config.zkConnect) val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) ZkUtils.setupCommonPaths(zkClient) zkClient }
解决代码:
private def initZk(): ZkClient = { info("Connecting to zookeeper on " + config.zkConnect) val chroot = { if (config.zkConnect.indexOf("/") > 0) config.zkConnect.substring(config.zkConnect.indexOf("/")) else "" } if (chroot.length > 1) { val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/")) val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot) info("Created zookeeper path " + chroot) zkClientForChrootCreation.close() } val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) ZkUtils.setupCommonPaths(zkClient) zkClient }
BUG
https://issues.apache.org/jira/browse/KAFKA-294
使用的版本是:kafka_2.10-0.8.1.1.tgz