1. Problem Description
The test setup is a single-node Kafka cluster (ZooKeeper and the Kafka broker both running on the same Ubuntu machine). From the command line, I ran:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topicForTest
to create a topic with 3 partitions (topic name: topicForTest).
The console producer and consumer can both send messages to and receive messages from topicForTest without problems:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicForTest
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicForTest --from-beginning
But from my development environment on a Windows machine, the KafkaConsumer from the Kafka client Java API (version 0.10) could not receive any messages: it blocked inside the poll() method.
More specifically, it blocked for a long time in the awaitMetadataUpdate method of the org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient class. A similar problem is described here.
Yet on the Windows machine, a telnet client could connect to the Kafka broker's default port 9092 just fine.
It later turned out that the Kafka server's config file, config/server.properties, had neither advertised.host.name nor the listeners parameter configured. The official documentation explains these parameters as follows:
advertised.host.name: Hostname to publish to ZooKeeper for clients to use. If this is not set, it will use the value for `host.name` if configured. Otherwise it will use the value returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners: Listeners to publish to ZooKeeper for clients to use, if different than the listeners above. If this is not set, the value for `listeners` will be used.
The cause here: the broker publishes its address to ZooKeeper as its hostname, ubuntu, and that is the address the Java KafkaConsumer gets back in the cluster metadata. The Windows machine cannot resolve the hostname ubuntu, so the consumer hangs while locating the GroupCoordinator.
The first time you call poll() with a new consumer, it is responsible for finding the GroupCoordinator, joining the consumer group and receiving a partition assignment.
I used bin/kafka-verifiable-producer.sh --topic topicForTest --max-messages 200 --broker-list localhost:9092 to write 200 messages into the topic, then started the following test program:
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.121.34:9092");
        props.put("group.id", "mygroup");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest"); // read the topic from the start on first run
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        kafkaConsumer.subscribe(Arrays.asList("topicForTest"));
        while (true) {
            System.out.println("nothing available...");
            // poll() blocks for up to 1000 ms waiting for records
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
        }
    }
}
The DEBUG-level log output of the program, ending in the exception, looks like this:
2017-08-17 18:14:48.210 [main] INFO o.a.kafka.common.utils.AppInfoParser SEQ - Kafka version : 0.10.1.0
2017-08-17 18:14:48.210 [main] INFO o.a.kafka.common.utils.AppInfoParser SEQ - Kafka commitId : 3402a74efb23d1d4
2017-08-17 18:14:48.211 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer SEQ - Kafka consumer created
2017-08-17 18:14:48.212 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer SEQ - Subscribed to topic(s): topicForTest
2017-08-17 18:14:48.212 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator SEQ - Sending coordinator request for group group_test109 to broker xxx:9092 (id: -1 rack: null)
.....
.....
2017-08-17 18:14:48.274 [main] DEBUG o.a.kafka.common.network.Selector SEQ - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2017-08-17 18:14:48.275 [main] DEBUG o.apache.kafka.clients.NetworkClient SEQ - Completed connection to node -1
2017-08-17 18:14:48.337 [main] DEBUG o.apache.kafka.clients.NetworkClient SEQ - Sending metadata request {topics=[topicForTest]} to node -1
2017-08-17 18:14:48.396 [main] DEBUG org.apache.kafka.clients.Metadata SEQ - Updated cluster metadata version 2 to Cluster(id = xgdvTIvHTn2dL3cnEm-dRQ, nodes = [ubuntu:9092 (id: 0 rack: null)], partitions = [Partition(topic = topicForTest, partition = 0, leader = 0, replicas = [0,], isr = [0,])])
2017-08-17 18:14:48.398 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator SEQ - Received group coordinator response ClientResponse(receivedTimeMs=1502964888398, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@144d0b84, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=group_test109}), createdTimeMs=1502964888230, sendTimeMs=1502964888338), responseBody={error_code=0,coordinator={node_id=0,host=ubuntu,port=9092}})
2017-08-17 18:14:48.399 [main] INFO o.a.k.c.c.i.AbstractCoordinator SEQ - Discovered coordinator ubuntu:9092 (id: 2147483647 rack: null) for group group_test109.
2017-08-17 18:14:48.399 [main] DEBUG o.apache.kafka.clients.NetworkClient SEQ - Initiating connection to node 2147483647 at ubuntu:9092.
2017-08-17 18:14:51.127 [main] DEBUG o.apache.kafka.clients.NetworkClient SEQ - Error connecting to node 2147483647 at ubuntu:9092:
java.io.IOException: Can't resolve address: ubuntu:9092
    at org.apache.kafka.common.network.Selector.connect(Selector.java:180) ~[kafka-clients-0.10.1.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498) [kafka-clients-0.10.1.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:159) [kafka-clients-0.10.1.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:454) [kafka-clients-0.10.1.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:556) [kafka-clients-0.10.1.0.jar:na]
....
[main] INFO o.a.k.c.c.i.AbstractCoordinator SEQ - Marking the coordinator ubuntu:9092 (id: 2147483647 rack: null) dead for group xxx
Now look at the /etc/hosts file on the Ubuntu machine:
127.0.0.1 ubuntu localhost
127.0.1.1 ubuntu localhost
These mappings exist only on the Ubuntu machine itself, which is why only it can resolve ubuntu. Therefore, it is enough to configure the listeners parameter in config/server.properties:
listeners=PLAINTEXT://your.host.name:9092
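A hedged, concrete version of that line for this setup (assuming the Ubuntu machine's LAN IP is 192.168.121.34, the address the test program above connects to):

listeners=PLAINTEXT://192.168.121.34:9092

Setting advertised.listeners to a name or IP the client can resolve works as well. And without touching the broker at all, adding a hosts entry mapping ubuntu to the broker's IP on the Windows client also unblocks the consumer, since the client only needs to be able to resolve the advertised hostname.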
2. Some Simple Notes on Kafka
① Directory layout
The topic topicForTest above has three partitions, so there are three directories for this topic under log.dirs; each of those directories holds the partition's log segments and index files.
Use the command ./bin/kafka-topics.sh --list --zookeeper localhost:2181 to list the current topics.
Use the command ./bin/kafka-consumer-groups.sh --list --bootstrap-server YOUR_IP_ADDRESS:9092 to list the consumer groups.
(If you get: Error: Executing consumer group command failed due to Request METADATA failed on brokers List(ubuntu:9092 (id: -1 rack: null)), try the IP address, the hostname, or localhost instead.)
Use the command ./bin/kafka-consumer-groups.sh --bootstrap-server YOUR_IP_ADDRESS:9092 --describe --group groupName to inspect a specific group.
② Relationships among Topic, Partition, ConsumerGroup, and Consumer
A topic is usually divided into multiple partitions. Producers can write messages to several partitions of the topic at the same time, while consumers subscribe to the topic in groups, with a given consumer in the group responsible for consuming a given partition. The topic itself feels like a purely logical concept.
Typically, a set of consumers subscribed to the same topic belong to one ConsumerGroup. Within a ConsumerGroup, once some consumer is responsible for a partition, the messages in that partition are not consumed by the group's other consumers. For example:
ⓐ Topic T1 has four partitions, i.e. T1's messages are stored across these four partitions. It is subscribed to by the consumers in ConsumerGroup1, where Consumer1 consumes partitions 0 and 2 and Consumer2 consumes partitions 1 and 3. Normally, each message in T1 is consumed exactly once within the group: if message A was consumed by Consumer1, it will not also be consumed by Consumer2.
To have the messages in Topic T1 consumed by several independent readers, just create another consumer group, ConsumerGroup2, and have its consumers subscribe to T1 as well. Every message in T1 is then consumed twice: once by a consumer in ConsumerGroup1, and once by a consumer in ConsumerGroup2, as sketched below.
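A minimal sketch of the two-group setup (the broker address 192.168.121.34:9092 and topic topicForTest are carried over from the test program above; the group names are illustrative):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TwoGroupsDemo {
    private static KafkaConsumer<String, String> newConsumer(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.121.34:9092"); // assumed broker address
        props.put("group.id", groupId); // the group id decides who shares the work
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    public static void main(String[] args) {
        // Two different group.ids: each group receives its own copy of the stream.
        KafkaConsumer<String, String> g1 = newConsumer("ConsumerGroup1");
        KafkaConsumer<String, String> g2 = newConsumer("ConsumerGroup2");
        g1.subscribe(Arrays.asList("topicForTest"));
        g2.subscribe(Arrays.asList("topicForTest"));
        while (true) {
            // KafkaConsumer is not thread-safe, so both are polled in turn from
            // this one thread; in real code each consumer runs in its own thread.
            for (ConsumerRecord<String, String> r : g1.poll(500))
                System.out.printf("group1: offset = %d, value = %s%n", r.offset(), r.value());
            for (ConsumerRecord<String, String> r : g2.poll(500))
                System.out.printf("group2: offset = %d, value = %s%n", r.offset(), r.value());
        }
    }
}

Every message written to topicForTest should show up once under group1 and once under group2.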
Each ConsumerGroup has a group leader, usually the first consumer to join the group. The group leader receives the group membership from the group coordinator and is responsible for assigning partitions to each consumer.
When a consumer wants to join a group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all consumers in the group from the group coordinator and it is responsible for assigning a subset of partitions to each consumer
ⓑ If a ConsumerGroup contains more consumers than the topic has partitions, some consumer gets no partition to consume from: a Consumer5 added to the group above would receive no messages at all.
Partition rebalance:
As the examples above show, consumption is organized per partition. If consumers are added to or removed from a ConsumerGroup, the Kafka broker reassigns partitions to consumers; this reassignment process is the rebalance. For instance, if ConsumerA is consuming PartitionA and dies for some reason, PartitionA's messages have no consumer anymore; once the broker notices ConsumerA is gone, it must hand PartitionA to another consumer that is still alive.
The event in which partition ownership is moved from one consumer to another is called a rebalance
The rebalance process brings several problems. For example: 1. While a rebalance is in progress, consumers cannot consume messages.
During a rebalance, consumers can’t consume messages, so a rebalance is in effect a short window of unavailability on the entire consumer group
2. Messages can be consumed repeatedly. Say ConsumerA fetched a few messages from PartitionA and did some processing, but died before it could confirm to the broker that it had consumed them (i.e. before the commit). After the rebalance hands PartitionA to ConsumerB, ConsumerB also receives the messages that ConsumerA processed but never committed, so those messages get consumed twice. A sketch of narrowing this window follows below.
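One common way to shrink (not eliminate) that duplication window is to turn off auto-commit and commit offsets explicitly, including from the rebalance callback, so the next owner of a partition starts close to where the previous one stopped. A minimal sketch, reusing the assumed broker address and topic from above:

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.121.34:9092"); // assumed broker address
        props.put("group.id", "mygroup");
        props.put("enable.auto.commit", "false"); // commit manually instead
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList("topicForTest"), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Called before ownership moves: flush offsets for what we processed.
                consumer.commitSync();
            }
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("assigned: " + partitions);
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            consumer.commitSync(); // commit only after the batch is processed
        }
    }
}

If the consumer crashes outright it never gets the callback, so a few replayed messages are still possible; true exactly-once delivery needs idempotent processing on top of this.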
3. But how does the broker discover that a consumer has died in the first place?
This is tied to the KafkaConsumer's poll(long) method: a consumer that keeps polling is considered alive.
③ The KafkaConsumer poll(long) method
What does the poll method actually do? Coordination, partition rebalancing, heartbeats that keep the consumer-broker session alive, fetching messages...
Once the consumer subscribes to topics, the poll loop handles all details of coordination, partition rebalances, heartbeats and data fetching
A consumer must keep calling poll, both to keep fetching messages from Kafka and to tell Kafka that it has not failed, i.e. to keep the session with the broker alive.
consumers must keep polling Kafka or they will be considered dead and the partitions they are consuming will be handed to another consumer in the group to continue consuming.
The poll(long) method takes a long parameter, a timeout. What value makes sense is influenced by the consumer's configuration parameters and by how the application processes messages.
This specifies how long it will take poll to return, with or without data. The value is typically driven by application needs for quick responses - how fast do you want to return control to the thread that does the polling?
When a consumer is finished consuming, remember to close it. A consumer leaving the group causes a rebalance either way, but consumer.close() makes the consumer actively notify the Group Coordinator so the rebalance happens immediately, rather than the GroupCoordinator waiting a while, noticing that the consumer has left (it no longer calls poll), and only then rebalancing. A shutdown sketch follows below.
consumer.close();
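A minimal shutdown sketch (same assumed broker address and topic as above). consumer.wakeup() is the one KafkaConsumer method that may safely be called from another thread; it makes a blocked poll() throw WakeupException, after which close() notifies the coordinator:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulShutdownConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.121.34:9092"); // assumed broker address
        props.put("group.id", "mygroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        final Thread mainThread = Thread.currentThread();

        // On Ctrl+C, wake the consumer out of poll() and wait for main to finish.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                consumer.wakeup();
                try { mainThread.join(); } catch (InterruptedException ignored) { }
            }
        });

        try {
            consumer.subscribe(Arrays.asList("topicForTest"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
        } catch (WakeupException e) {
            // expected during shutdown, nothing to do
        } finally {
            consumer.close(); // the coordinator can rebalance immediately
        }
    }
}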
④ Some Kafka configuration parameters
There are broker configuration parameters, producer configuration parameters, and consumer configuration parameters. A few consumer-side examples:
auto.commit.interval.ms: The frequency in ms that the consumer offsets are committed to zookeeper. (i.e. how often the consumer commits its offsets, the consumption position; note that the new Java consumer used here actually commits offsets to Kafka itself rather than to ZooKeeper)
group.id: A unique string that identifies the consumer group this consumer belongs to.
heartbeat.interval.ms: The expected time between heartbeats to the consumer coordinator.
session.timeout.ms: The timeout used to detect consumer failures.
.... How to set these depends on the specific application; they also affect when rebalances are triggered, and I am not too familiar with the details. A hedged illustration follows below.
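An illustration of how the timing parameters fit together (the values are examples, not recommendations; the documentation suggests keeping heartbeat.interval.ms at no more than one third of session.timeout.ms):

import java.util.Properties;

public class TimingConfig {
    public static Properties timingProps() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "30000");     // no heartbeat for 30 s: consumer declared dead, rebalance starts
        props.put("heartbeat.interval.ms", "10000");  // send a heartbeat roughly every 10 s
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000"); // commit offsets once a second
        return props;
    }
}

A shorter session.timeout.ms means dead consumers are detected (and rebalances triggered) sooner, at the cost of more spurious rebalances under GC pauses or slow processing.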
For the specific configuration parameters, see the Kafka configuration reference (Kafka配置參數解釋).