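When reposting, please credit the original URL: http://www.cnblogs.com/dongxiao-yang/p/7200971.html
I recently needed to study how flink-connector-kafka consumes data, and noticed that Flink uses assign, a lower-level interface of the Kafka consumer, rather than the more commonly used subscribe. So I looked into the differences between the two.
First, the API documentation: http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html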
public void assign(Collection<TopicPartition> partitions)
If the given list of topic partitions is empty, it is treated the same as unsubscribe().
Manual topic assignment through this method does not use the consumer's group management functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic metadata change. Note that it is not possible to use both manual partition assignment with assign(Collection) and group assignment with subscribe(Collection, ConsumerRebalanceListener).
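Unlike subscribe, assign lets the user specify directly which partitions a consumer instance consumes. As the documentation above says, a consumer that uses assign does not take part in Kafka's group management, so no rebalance happens when the number of consumers in the group changes. assign also cannot be combined with subscribe on the same consumer.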
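To make the contrast concrete, here is a toy model in plain Java with no Kafka dependency. Everything in it (ToyGroup, ManualConsumer, the round-robin split) is hypothetical and only illustrates the idea: members that join through a subscribe-style path have their partitions recomputed whenever membership changes, while an assign-style consumer keeps exactly the partitions it was given. Kafka's real assignors (for example the default range assignor) distribute partitions differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: none of these classes exist in Kafka.
public class AssignVsSubscribeDemo {

    // Hypothetical group: every join triggers a full "rebalance" of all subscribed members.
    static class ToyGroup {
        final int partitionCount;
        final List<String> members = new ArrayList<>();
        final Map<String, List<Integer>> assignment = new TreeMap<>();

        ToyGroup(int partitionCount) {
            this.partitionCount = partitionCount;
        }

        void join(String memberId) {
            members.add(memberId);
            rebalance();
        }

        // Simplified round-robin split; Kafka's real assignors use other strategies.
        private void rebalance() {
            assignment.clear();
            for (String m : members) {
                assignment.put(m, new ArrayList<>());
            }
            for (int p = 0; p < partitionCount; p++) {
                assignment.get(members.get(p % members.size())).add(p);
            }
        }
    }

    // Hypothetical assign()-style consumer: the caller fixes its partitions and no group touches them.
    static class ManualConsumer {
        final List<Integer> partitions;

        ManualConsumer(List<Integer> partitions) {
            this.partitions = List.copyOf(partitions);
        }
    }

    public static void main(String[] args) {
        ToyGroup group = new ToyGroup(4);
        ManualConsumer manual = new ManualConsumer(List.of(0, 1));

        group.join("c1");
        System.out.println(group.assignment); // {c1=[0, 1, 2, 3]}
        group.join("c2");
        System.out.println(group.assignment); // {c1=[0, 2], c2=[1, 3]}
        System.out.println(manual.partitions); // [0, 1]
    }
}
```

The second join reshuffles c1's partitions, which is the kind of membership-driven change that a consumer using assign never experiences.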
Next, the relevant source code:
public class KafkaConsumer<K, V> implements Consumer<K, V> {
    // ...........
    private final SubscriptionState subscriptions;

    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        // ...........
        this.subscriptions.subscribe(new HashSet<>(topics), listener);
        // ...........
    }
    // ......
    public void assign(Collection<TopicPartition> partitions) {
        // ...........
        this.subscriptions.assignFromUser(new HashSet<>(partitions));
        // ...........
    }
    // ...
}
Every KafkaConsumer instance holds a SubscriptionState object: subscribe delegates to its subscribe method, and assign delegates to its assignFromUser method. Here are the implementations of those two methods:
private enum SubscriptionType {
    NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
}

// The first mode set wins; switching to a different mode later throws.
private void setSubscriptionType(SubscriptionType type) {
    if (this.subscriptionType == SubscriptionType.NONE)
        this.subscriptionType = type;
    else if (this.subscriptionType != type)
        throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
}

public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    if (listener == null)
        throw new IllegalArgumentException("RebalanceListener cannot be null");

    setSubscriptionType(SubscriptionType.AUTO_TOPICS);

    this.listener = listener;

    changeSubscription(topics);
}

public void assignFromUser(Set<TopicPartition> partitions) {
    setSubscriptionType(SubscriptionType.USER_ASSIGNED);

    // Only rebuild the assignment when the partition set actually changes.
    if (!this.assignment.partitionSet().equals(partitions)) {
        fireOnAssignment(partitions);

        Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
        for (TopicPartition partition : partitions) {
            // Reuse the existing state of partitions that stay assigned.
            TopicPartitionState state = assignment.stateValue(partition);
            if (state == null)
                state = new TopicPartitionState();
            partitionToState.put(partition, state);
        }
        this.assignment.set(partitionToState);
        this.needsFetchCommittedOffsets = true;
    }
}
As the code shows, SubscriptionState holds a field subscriptionType of the enum type SubscriptionType, which has four values: NONE, AUTO_TOPICS, AUTO_PATTERN and USER_ASSIGNED. subscribe sets subscriptionType to AUTO_TOPICS, and assignFromUser sets it to USER_ASSIGNED. The key part is inside setSubscriptionType: the check else if (this.subscriptionType != type) throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE); guarantees that once a consumer has chosen one subscription mode, any attempt to switch it to another mode throws an exception immediately.
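The following is a hypothetical, stripped-down re-implementation of the SubscriptionState logic above, handy for experimenting without a broker. Partitions are plain strings such as "t-0" instead of TopicPartition, the per-partition state is an empty placeholder, and listeners and fireOnAssignment are left out; the exception message is made up. It reproduces two behaviors of the quoted code: the mode guard in setSubscriptionType, and assignFromUser reusing existing per-partition state while setting needsFetchCommittedOffsets only when the partition set actually changes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of SubscriptionState; not the Kafka class itself.
public class MiniSubscriptionState {
    enum SubscriptionType { NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED }

    // Placeholder for Kafka's TopicPartitionState (fetch position, pause flag, ...).
    static class PartitionState { }

    SubscriptionType subscriptionType = SubscriptionType.NONE;
    Set<String> subscription = new HashSet<>();
    Map<String, PartitionState> assignment = new HashMap<>();
    boolean needsFetchCommittedOffsets = false;

    // Same rule as setSubscriptionType above: the first mode wins, any other mode is rejected.
    private void setSubscriptionType(SubscriptionType type) {
        if (subscriptionType == SubscriptionType.NONE)
            subscriptionType = type;
        else if (subscriptionType != type)
            throw new IllegalStateException("subscription modes are mutually exclusive");
    }

    void subscribe(Set<String> topics) {
        setSubscriptionType(SubscriptionType.AUTO_TOPICS);
        subscription = new HashSet<>(topics);
    }

    // Partitions are strings like "t-0" instead of TopicPartition objects.
    void assignFromUser(Set<String> partitions) {
        setSubscriptionType(SubscriptionType.USER_ASSIGNED);
        if (!assignment.keySet().equals(partitions)) {
            Map<String, PartitionState> next = new HashMap<>();
            for (String p : partitions) {
                // Keep the old state of partitions that remain assigned; new ones get fresh state.
                PartitionState state = assignment.get(p);
                if (state == null)
                    state = new PartitionState();
                next.put(p, state);
            }
            assignment = next;
            needsFetchCommittedOffsets = true;
        }
    }

    public static void main(String[] args) {
        MiniSubscriptionState s = new MiniSubscriptionState();
        s.assignFromUser(new HashSet<>(Arrays.asList("t-0", "t-1")));
        try {
            s.subscribe(new HashSet<>(Arrays.asList("t")));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Calling assignFromUser again with the same set leaves needsFetchCommittedOffsets untouched, mirroring the equality check in the original method.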
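How poll behaves differently in the two modes
Consumers initialized in either mode fetch data through the same poll method. Each call to poll goes through pollOnce, which calls
coordinator.poll(time.milliseconds());
The source of that method is as follows: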
public void poll(long now) {
    invokeCompletedOffsetCommitCallbacks();

    if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
        ensureCoordinatorReady();
        now = time.milliseconds();
    }

    if (needRejoin()) {
        // due to a race condition between the initial metadata fetch and the initial rebalance,
        // we need to ensure that the metadata is fresh before joining initially. This ensures
        // that we have matched the pattern against the cluster's topics at least once before joining.
        if (subscriptions.hasPatternSubscription())
            client.ensureFreshMetadata();

        ensureActiveGroup();
        now = time.milliseconds();
    }

    pollHeartbeat(now);
    maybeAutoCommitOffsetsAsync(now);
}

public boolean needRejoin() {
    if (!subscriptions.partitionsAutoAssigned())
        return false;

    // we need to rejoin if we performed the assignment and metadata has changed
    if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
        return true;

    // we need to join if our subscription has changed since the last join
    if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
        return true;

    return super.needRejoin();
}
public boolean partitionsAutoAssigned() {
    return this.subscriptionType == SubscriptionType.AUTO_TOPICS || this.subscriptionType == SubscriptionType.AUTO_PATTERN;
}
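ConsumerCoordinator.poll checks the consumer's SubscriptionType described above. Only when the type is AUTO_TOPICS or AUTO_PATTERN (the latter set by the subscribe overload that takes a Pattern) does the consumer interact with the group coordinator, locating it and deciding whether it needs to rejoin the group and rebalance.
So, exactly as the API documentation says, a consumer that uses assign does not take part in Kafka's group management, and no rebalance occurs when the number of consumers in the group changes.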
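As a hypothetical sketch (not Kafka code), the branch structure of ConsumerCoordinator.poll and needRejoin can be replayed by recording step names instead of making real network calls. The three rejoin conditions in needRejoin (metadata snapshot changed, subscription changed, super.needRejoin()) are collapsed into one groupStateChanged flag, which is an assumption of this model. pollHeartbeat and maybeAutoCommitOffsetsAsync appear on both paths only because the quoted poll calls them unconditionally; what they do internally for a manually assigned consumer is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of ConsumerCoordinator.poll's branches; records step names only.
public class PollDecisionSketch {
    enum SubscriptionType { NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED }

    // Same test as SubscriptionState.partitionsAutoAssigned().
    static boolean partitionsAutoAssigned(SubscriptionType t) {
        return t == SubscriptionType.AUTO_TOPICS || t == SubscriptionType.AUTO_PATTERN;
    }

    // Manual assignment never rejoins; otherwise the (collapsed) group-state flag decides.
    static boolean needRejoin(SubscriptionType t, boolean groupStateChanged) {
        if (!partitionsAutoAssigned(t))
            return false;
        return groupStateChanged;
    }

    static List<String> poll(SubscriptionType t, boolean coordinatorUnknown, boolean groupStateChanged) {
        List<String> steps = new ArrayList<>();
        steps.add("invokeCompletedOffsetCommitCallbacks");
        if (partitionsAutoAssigned(t) && coordinatorUnknown)
            steps.add("ensureCoordinatorReady");
        if (needRejoin(t, groupStateChanged)) {
            if (t == SubscriptionType.AUTO_PATTERN) // hasPatternSubscription()
                steps.add("ensureFreshMetadata");
            steps.add("ensureActiveGroup");
        }
        steps.add("pollHeartbeat");
        steps.add("maybeAutoCommitOffsetsAsync");
        return steps;
    }

    public static void main(String[] args) {
        // [invokeCompletedOffsetCommitCallbacks, ensureCoordinatorReady, ensureActiveGroup, pollHeartbeat, maybeAutoCommitOffsetsAsync]
        System.out.println(poll(SubscriptionType.AUTO_TOPICS, true, true));
        // [invokeCompletedOffsetCommitCallbacks, pollHeartbeat, maybeAutoCommitOffsetsAsync]
        System.out.println(poll(SubscriptionType.USER_ASSIGNED, true, true));
    }
}
```

Even with the coordinator unknown and the group state "changed", the USER_ASSIGNED path never reaches ensureCoordinatorReady or ensureActiveGroup.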