kafka消費組創建和刪除原理


0.10.0.0版本的kafka的消費者和消費組已經不在zk上注冊節點了,那么消費組是以什么形式存在的呢?

1 入口

看下kafka自帶的腳本kafka-consumer-groups.sh,可見腳本調用了kafka.admin.ConsumerGroupCommand

exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

看下ConsumerGroupCommand,從代碼中可以看出新版本的kafka不支持刪除消費組操作,實際上,當消費組內消費者為空的時候消費組就會被刪除。

def main(args: Array[String]) {
    // ...
    val consumerGroupService = {
      if (opts.options.has(opts.newConsumerOpt)) new KafkaConsumerGroupService(opts) // 對於新版本kafka來說調用的是KafkaConsumerGroupService
      else new ZkConsumerGroupService(opts)
    }

    try {
      if (opts.options.has(opts.listOpt))
        consumerGroupService.list() // 以此為例來看下消費組存在的形式
      else if (opts.options.has(opts.describeOpt))
        consumerGroupService.describe()
      else if (opts.options.has(opts.deleteOpt)) {
        consumerGroupService match {
          case service: ZkConsumerGroupService => service.delete()
          case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService")
        }
      }
    } 
    // ...
  }

我們以KafkaConsumerGroupService#list為例來看下消費組存在的形式。KafkaConsumerGroupService#list用於獲取所有的消費組。沿着代碼一直追溯可以看到其會調用AdminClient#listAllGroups。從代碼中可以看出要想獲取到所有消費組,就需要遍歷每個broker。而要獲取某個broker上的消費組則需要發送ApiKeys.LIST_GROUPS的請求。

def listAllGroups(): Map[Node, List[GroupOverview]] = {
    findAllBrokers.map {
      case broker =>
        broker -> { // 需要遍歷每個broker
          try {
            listGroups(broker)
          } catch {
            case e: Exception =>
              debug(s"Failed to find groups from broker ${broker}", e)
              List[GroupOverview]()
          }
        }
    }.toMap
}

def listGroups(node: Node): List[GroupOverview] = { // 向相應broker發送請求來獲取改broker上的消費組信息
    val responseBody = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest())
    val response = new ListGroupsResponse(responseBody)
    Errors.forCode(response.errorCode()).maybeThrow()
    response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList
  }

看下KafkaApis.scala對應的請求處理方法handleListGroupsRequest

def handleListGroupsRequest(request: RequestChannel.Request) {
    // ... 
    
      val (error, groups) = coordinator.handleListGroups() // 關鍵,獲取消費組列表
      val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) } 
      new ListGroupsResponse(error.code, allGroups.asJava)
    }
    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  }

順着coordinator.handleListGroups一直往下,可以看到最終是調用GroupMetadataManager#currentGroups來獲取到broker上的消費組的。到這里我們可以看出,消費組和GroupMetadataManager有關。

def currentGroups(): Iterable[GroupMetadata] = groupsCache.values

2 存在形式

GroupMetadata表示一個消費組,MemberMetadata表示一個消費者。先放下總結的圖
group

GroupMetadataManager有個groupsCache屬性保存了該broker所管轄的消費組

private val groupsCache = new Pool[String, GroupMetadata]

看下GroupMetadata的內部屬性

private[coordinator] class GroupMetadata(val groupId: String, val protocolType: String) {

  private val members = new mutable.HashMap[String, MemberMetadata] // 消費組的客戶端
  private var state: GroupState = Stable
  var generationId = 0 // generationId 用於reblance
  var leaderId: String = null
  var protocol: String = null
  // ... 
}

// MemberMetadata表示一個消費者
private[coordinator] class MemberMetadata(val memberId: String,
                                          val groupId: String,
                                          val clientId: String,
                                          val clientHost: String,
                                          val sessionTimeoutMs: Int,
                                          var supportedProtocols: List[(String, Array[Byte])]) {

  var assignment: Array[Byte] = Array.empty[Byte] // 消費者分配到的partiton
  var awaitingJoinCallback: JoinGroupResult => Unit = null
  var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
  var latestHeartbeat: Long = -1
  var isLeaving: Boolean = false
  // ...
}

以上就是消費組及其消費者的存在形式,即存在緩存變量中,而不是持久在其他什么地方

3 消費組的創建

消費組是不會單獨創建的,消費組的創建是在消費者第一次發送join_group請求的時候創建的。創建消費組過程也很簡單,就是在GroupMetadataManager#groupsCache加入代表該消費組的GroupMetadata

GroupCoordinator#handleJoinGroup

def handleJoinGroup(groupId: String,
                      memberId: String,
                      clientId: String,
                      clientHost: String,
                      sessionTimeoutMs: Int,
                      protocolType: String,
                      protocols: List[(String, Array[Byte])],
                      responseCallback: JoinCallback) {
    // ...
    } else {
      var group = groupManager.getGroup(groupId)
      if (group == null) {
        if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
          responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
        } else {
          group = groupManager.addGroup(new GroupMetadata(groupId, protocolType)) // 關鍵,如果group為空,則添加一個group
          doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
        }
      } else {
        doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
      }
    }
  }

GroupMetadataManager#addGroup

def addGroup(group: GroupMetadata): GroupMetadata = {
    val currentGroup = groupsCache.putIfNotExists(group.groupId, group) // 加入代表該消費組的GroupMetadata
    if (currentGroup != null) {
      currentGroup
    } else {
      group
    }
  }

4 消費組的刪除

在第一節ConsumerGroupCommand中我們可以知道消費組是不支持手動刪除的,那么消費組是怎么刪除的呢,實際上當消費組中的消費者為空的時候,消費組就會被刪除。

4.1 刪除動作

看下GroupMetadataManager#removeGroup,我先看下刪除消費組都有哪些動作

def removeGroup(group: GroupMetadata) {
    if (groupsCache.remove(group.groupId, group)) { // 從cache中移除group
        // 然后再__consumer_offsets主題中該group對應的partition寫一個tombstone消息,用於壓縮,這是因為__consumer_offsets不會刪除,只會壓縮

      val groupPartition = partitionFor(group.groupId) // 計算group相關聯分區,默認是abs(hashcode) % 50
      val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition)

      // 然后將tombstone寫入該partition,用於壓縮
      val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
        timestamp = timestamp, magicValue = magicValue)

      val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
      partitionOpt.foreach { partition =>
        val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)

        trace("Marking group %s as deleted.".format(group.groupId))

        try {
          partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
        } catch {
          case t: Throwable =>
            error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
          // ignore and continue
        }
      }
    }
  }

由以上可以看出,刪除消費組有兩個動作

  1. 將cache,即(Pool[String, GroupMetadata])中的消費組移除
  2. 在__consumer_offsets中要刪除消費組相關的partition中寫入tombstone,而不會刪除要刪除消費組的相關記錄

4.2 觸發刪除的動作

唯一調用GroupMetadataManager#removeGroup的地方是GroupCoordinator#onCompleteJoin,而調用GroupCoordinator#onCompleteJoin的唯一地方是DelayedJoin。

GroupCoordinator#onCompleteJoin

def onCompleteJoin(group: GroupMetadata) {
    // ...
        if (group.isEmpty) {
          group.transitionTo(Dead) // 先將消費組置位dead狀態,然后移除
          groupManager.removeGroup(group)
          info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
        }
      }
      // ...
}

GroupCoordinator#onCompleteJoin

private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
                                            group: GroupMetadata,
                                            sessionTimeout: Long)
  extends DelayedOperation(sessionTimeout) {

  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
  override def onExpiration() = coordinator.onExpireJoin()
  override def onComplete() = coordinator.onCompleteJoin(group)
}

難道是在joinGroup操作的時候刪除消費組嗎?其實並不是,而是在heartbeat超時的時候刪除的,即當最后一個消費者心跳超時或者說消費組內沒有了消費者的時候,該消費組就對被刪除。從DelayedHeartbeat開始看下

private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                            group: GroupMetadata,
                                            member: MemberMetadata,
                                            heartbeatDeadline: Long,
                                            sessionTimeout: Long)
  extends DelayedOperation(sessionTimeout) {
  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) // 關注這里
  override def onComplete() = coordinator.onCompleteHeartbeat()
}

def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
    group synchronized {
      if (!shouldKeepMemberAlive(member, heartbeatDeadline))
        onMemberFailure(group, member) // 關注這里
    }
  }
}

private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
    trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
    group.remove(member.memberId)
    group.currentState match {
      case Dead =>
      case Stable | AwaitingSync => maybePrepareRebalance(group) // 假設消費組有一個消費者處於Stable狀態,當該消費者超時后,就會調用maybePrepareRebalance
      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
    }
}

private def maybePrepareRebalance(group: GroupMetadata) {
    group synchronized {
      if (group.canRebalance)
        prepareRebalance(group) // 關注這里
    }
}

private def prepareRebalance(group: GroupMetadata) {
    
    if (group.is(AwaitingSync))
      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)

    group.transitionTo(PreparingRebalance)
    info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))

    val rebalanceTimeout = group.rebalanceTimeout
    val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout) // 最終DelayedJoin在這里被調用
    val groupKey = GroupKey(group.groupId)
    joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
  }

由以上我們可以總結出,就是在heartbeat超時后會進行reblance操作,最終調用GroupCoordinator#prepareRebalance,這個時候如果消費組中members為空則會刪除。

5 總結

  1. 消費組只存在一個Pool[String, GroupMetadata], 並沒有持久化
  2. 當第一個消費者join請求來的時候,才會創建消費組,創建消費組即在Pool[String, GroupMetadata]加入代表該消費組的GroupMetadata
  3. 不能手動刪除消費組,刪除消費組的時機是當最后一個消費者離開的時候,會觸發heartbeat超時從而reblance將消費組刪除
  4. 消費組刪除涉及兩個動作,一個是將消費組從Pool[String, GroupMetadata]中移除,另一個是在__consumer_offsets中寫入tombstone
  5. __consumer_offsets只會壓縮不會刪除


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM