Kafka的消息格式


Commit Log

Kafka儲存消息的文件被它叫做log,按照Kafka文檔的說法是:

Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log

這反應出來的Kafka的行為是:消息被不斷地append到文件末尾,而且消息是不可變的。

這種行為源於Kafka想要實現的功能:高吞吐量,多副本,消息持久化。這種簡單的log形式的文件結構能夠更好地實現這些功能,不過也會在其它方面有所欠缺,比如檢索消息的能力。

而Kafka的行為也決定了它的消息格式。對於Kafka來說,消息的主體部分的格式在網絡傳輸中和磁盤上是一致的,也就是說消息的主體部分可以直接從網絡讀取的字節buffer中寫入到文件(部分情況下),也可以直接從文件中copy到網絡,而不需要在程序中再加工,這有利於降低服務器端的開銷,以及提高IO速度(比如使用zero-copy的傳輸)。

這也就決定了Kafka的消息格式必須是適於被直接append到文件中的。當然啥都可以append到文件后面,問題在於怎么從文件中拆分出來一條條記錄。

記錄的划分以及消息的格式

對於日志來說,一條記錄以"\n"結尾,或者通過其它特定的分隔符分隔,這樣就可以從文件中拆分出一條一條的記錄,不過這種格式更適用於文本,對於Kafka來說,需要的是二進制的格式。所以,Kafka使用了另一種經典的格式:在消息前面固定長度的幾個字節記錄下這條消息的大小(以byte記),所以Kafka的記錄格式變成了:

Offset MessageSize Message

消息被以這樣格式append到文件里,在讀的時候通過MessageSize可以確定一條消息的邊界。

需要注意的是,在Kafka的文檔以及源碼中,消息(Message)並不包括它的offset。Kafka的log是由一條一條的記錄構成的,Kafka並沒有給這種記錄起個專門的名字,但是需要記住的是這個“記錄”並不等於"Message"。Offset MessageSize Message加在一起,構成一條記錄。而在Kafka Protocol中,Message具體的格式為

Message => Crc MagicByte Attributes Key Value
   Crc => int32
   MagicByte => int8
   Attributes => int8
   Key => bytes
   Value => bytes

各個部分的含義是

Field

Description

Attributes

This byte holds metadata attributes about the message. The lowest 2 bits contain the compression codec used for the message. The other bits should be set to 0.

Crc

The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer.

Key

The key is an optional message key that was used for partition assignment. The key can be null.

MagicByte

This is a version id used to allow backwards compatible evolution of the message binary format. The current value is 0.

Offset

This is the offset used in kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.

Value

The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null.

 

MessageSet

之所以要強調記錄與Message的區別,是為了更好地理解MessageSet的概念。Kafka protocol里對於MessageSet的定義是這樣的

MessageSet => [Offset MessageSize Message]
   Offset => int64
   MessageSize => int32

也就是說MessageSet是由多條記錄組成的,而不是消息,這就決定了一個MessageSet實際上不需要借助其它信息就可以從它對應的字節流中切分出消息,而這決定了更重要的性質:Kafka的壓縮是以MessageSet為單位的。而以MessageSet為單位壓縮,決定了對於壓縮后的MessageSet,不需要在它的外部記錄這個MessageSet的結構,也就決定了Kafka的消息是可以遞歸包含的,也就是前邊"value"字段的說明“Kafka supports recursive messages in which case this may itself contain a message set"。

具體地說,對於Kafka來說,可以對一個MessageSet做為整體壓縮,把壓縮后得到的字節數組作為一條Message的value。於是,Message既可以表示未壓縮的單條消息,也可以表示壓縮后的MessageSet。

壓縮后的消息的讀取

就看Message頭部的Attributes里的壓縮格式標識。說到這個,得說下遞歸包含的事情,理論上,一個壓縮的的MessageSet里的一個Message可能會是另一個壓縮后的MessageSet,或者包含更深層的MessageSet。但是實際上,Kafka中的一個Message最多只含有一個MessageSet。從Message中讀取MessageSet的邏輯,可以在ByteBufferMessageSet的internalIterator方法中找到:

        if(isShallow) { //是否要進行深層迭代
          new MessageAndOffset(newMessage, offset)
        } else { //如果要深層迭代的話
          newMessage.compressionCodec match {
            case NoCompressionCodec =>
              innerIter = null
              new MessageAndOffset(newMessage, offset) //如果這個Message沒有壓縮,就直接把它作為一個Message返回
            case _ =>
              innerIter = ByteBufferMessageSet.deepIterator(newMessage) //如果這個Message采用了壓縮,就對它進行深層迭代
              if(!innerIter.hasNext)
                innerIter = null
              makeNext()
          }
        }

而ByteBufferMessageSet的deepIterator方法就是對這個Message的value進行解壓,然后從中按照Offset MessageSize Message的格式讀取一條條記錄,對於這次讀取的Message,就不再進行深層迭代了。下面是deepIterator的makeNext方法,它被不斷調用以生成迭代器的元素

      override def makeNext(): MessageAndOffset = {
        try {
          // read the offset
          val offset = compressed.readLong()
          // read record size
          val size = compressed.readInt()

          if (size < Message.MinHeaderSize)
            throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")

          // read the record into an intermediate record buffer
          // and hence has to do extra copy
          val bufferArray = new Array[Byte](size)
          compressed.readFully(bufferArray, 0, size)
          val buffer = ByteBuffer.wrap(bufferArray)

          val newMessage = new Message(buffer)

          // the decompressed message should not be a wrapper message since we do not allow nested compression
          new MessageAndOffset(newMessage, offset)
        } catch {
          case eofe: EOFException =>
            compressed.close()
            allDone()
          case ioe: IOException =>
            throw new KafkaException(ioe)
        }
      }

KAFKA-1718

至於一個MessageSet中不能包含多個壓縮后的Message(壓縮后的Message也就是以壓縮后的MessageSet作為value的Message),Kafka Protocol中是這么說的

The outer MessageSet should contain only one compressed "Message" (see KAFKA-1718 for details).

KAFKA-1718就是在Protocol里添加這么一個特殊說明的原因。事情是這樣的:

報各這個問題的人是Go語言client的作者,他發現自己發的Message明顯沒有過大,但是發生了MessageSizeTooLargeException。后來跟其它人討論,發現是因為broker端在調用Log.append時,會把傳送給這個方法的MessageSet解壓開,然后再組合成一個壓縮后的MessageSet(ByteBufferMessageSet)。而Go語言的客戶端發送的MessageSet中包含了多個壓縮后的Message,這樣即使發送時的Message不會超過message.max.bytes的限制,但是broker端再次生成的Message就超過了這個限制。所以,Kafka Protocol對這種情況做了特殊說明:The outer MessageSet should contain only one compressed "Message"。

Compressed Message的offset

即然可以把壓縮后的MessageSet作為Message的value,那么這個Message的offset該如何設置呢?

這個offset的值只有兩種可能:1, 被壓縮的MessageSet里Message的最大offset; 2, 被壓縮的MessageSet里Message的最小offset.

這兩種取值沒有功能的不同,只有效率的不同。

由於FetchRequest協議中的offset是要求broker提供大於等於這個offset的消息,因此broker會檢查log,找到符合條件的,然后傳輸出去。那么由於FetchRequest中的offset位置的消息可位於一個compressed message中,所以broker需要確定一個compressed Message是否需要被包含在respone中。

  • 如果compressed Message的offset是它包含的MessageSet的最小offset。那么,我們對於這個Message是否應包含在response中,無法給出"是”或"否“的回答。比如FetchRequest中指明的開始讀取的offset是14,而一個compressed Message的offset是13,那么這個Message中可能包含offset為14的消息,也可能不包含。
  • 如果compressed Message的offset是它包含的MessageSet的最大offset,那么,可以根據這個offset確定這個Message“不應該”包含在response中。比如FetchRequest中指明的開始讀取的offset是14,那么如果一個compressed Message的offset是13,那它就不該被包含在response中。而當我們順序排除這種不符合條件的Message,就可以找到第一個應該被包含在response中的Message(壓縮或者未壓縮), 從它開始讀取。

在第一種情況下(最小offset),我們盡管可以通過連續的兩個Message確定第一個Message的offset范圍,但是這樣在讀取時需要在讀取第二個Message的offset之后跳回到第一個Message,  這通常會使得最近一次讀(也就讀第二個offset)的文件系統的緩存失效。而且邏輯比第二種情況更復雜。在第二種情況下,broker只需要找到第一個其offset大於或等於目標offset的Message,從它可以讀取即可,而且也通常能利用到文件系統緩存,因為offset和消息內容有可能在同一個緩存塊中。

在處理FetchRequest時,broker的邏輯也正是如此。對FetchRequest的處理會調用到Log#read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None)方法,然后調用到LogSegment的read方法,它的之后的調用有很多,所有不貼代碼了,它的注釋說明了讀取的邏輯

* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified

即,返回的MessageSet的第一條Message的offset >= startOffset。

而在broker給compressed Message賦予offset時,其邏輯也是賦予其包含的messages中的最大offset。這段邏輯在ByteBufferMessageSet的create方法中:

      messageWriter.write(codec = compressionCodec) { outputStream =>
        val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) //創建壓縮流
        try {
          for (message <- messages) {
            offset = offsetCounter.getAndIncrement //offsetCounter是一個AtomicLong,使用它的當前值作為這條Message的offset,然后+1作為下一條消息的offset
            output.writeLong(offset)//寫入這條日志記錄的offset
            output.writeInt(message.size)//寫入這條日志記錄的大小
            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) //寫入這條記錄的Message
          }
        } finally {
          output.close()
        }
      }
      val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
      writeMessage(buffer, messageWriter, offset)//以最后一個Message的offset作為這個compressed Message的offset

Validate Message

什么需要驗證?

先看一下消息的哪些特征需要被驗證。

首先,網絡傳輸過程中,數據可能會產生錯誤,即使是寫在磁盤上的消息,也可能會由於磁盤的問題產生錯誤。因此,broker對接收到的消息需要驗證其完整性。這里的消息就是前邊協議里定義的Message。對於消息完整性的檢測,是使用CRC32校驗,但是並不是對消息的所有部分計算CRC,而是對Message的Crc部分以后的部分,不包括記錄的offset和MessageSize部分。把offset和MessageSize加到CRC計算中,可以對完整性有更強的估證,但是壞處在於這兩個部分在消息由producer到達broker以后,會被broker重寫,因此如果把它們計算在crc里邊,就需要在broker端重新計算crc32,這樣會帶來額外的開銷。

CRC32沒有檢測出錯誤的概率在0.0047%以下,加上TCP本身也有校驗機制,不能檢測出錯誤的概率就很小了(這個還需要再仔細算一下)。

除了消息的完整性,還需要對消息的合規性進行檢驗,主要是檢驗offset是否是單調增長的,以及MessageSize是超過了最大值。

這里檢驗時使用的MessageSize就不是Message本身的大小了,而是一個記錄的大小,包括offset和MessageSize,這個也挺奇怪的,有必要非拉上這倆嗎?

而且在broker端檢驗producer發來的MessageSet時,也沒必要檢驗它的offset是否是單調增長的呀,畢竟leader還要對Message的offset重新賦值。而follower是從leader處拉取的,如果網絡或者磁盤出錯,通過對offset的單調性檢查也可能會漏掉出錯了的記錄,對於consumer來說也是同理。所以這里有點奇怪。

何時需要驗證?

在broker把收到的producer request里的MessageSet append到Log之前,以及consumer和follower獲取消息之后,都需要進行校驗。

這種情況分成兩種:

1. broker和consumer把收到的消息append到log之前

2. consumser收到消息后

第一種情況都是在調用Log#append時進行檢驗的。

如何驗證?

先看下Log#append的方法聲明

def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo

在replica的fetcher線程調用append方法時,會把assignOffsets設成false,而leader處理produce request時,會把assignOffsets設成true。

下面append方法的一部分代碼

    val appendInfo = analyzeAndValidateMessageSet(messages) //驗證消息
    
    // if we have any valid messages, append them to the log
    if(appendInfo.shallowCount == 0)
      return appendInfo
      
    // trim any invalid bytes or partial messages before appending it to the on-disk log
    var validMessages = trimInvalidBytes(messages, appendInfo)//trim掉不可用的部分或者殘缺的消息

    try {
      // they are valid, insert them in the log
      lock synchronized {
        appendInfo.firstOffset = nextOffsetMetadata.messageOffset 

       if(assignOffsets) { //如果需要重新賦予offset
          // assign offsets to the message set
          val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
          try {
            validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact) //驗證消息並且賦予offset
          } catch {
            case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
          }
          appendInfo.lastOffset = offset.get - 1
        } else {
          // we are taking the offsets we are given
          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
            throw new IllegalArgumentException("Out of order offsets found in " + messages)
        }

        // re-validate message sizes since after re-compression some may exceed the limit 對壓縮后消息重新驗證MessageSize是否超過了允許的最大值
        for(messageAndOffset <- validMessages.shallowIterator) {
          if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
            // we record the original message set size instead of trimmed size
            // to be consistent with pre-compression bytesRejectedRate recording
            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
            BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
            throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
              .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
          }
        }

注意到對MessageSize驗證了兩次,第二次是對重新壓縮后的消息。KAFKA-1718里提到MessageSizeToLargeException,就是在這時候檢測出來的。

初步檢驗:analyzeAndValidateMessageSet

具體的檢驗消息完整性和offset單調增長的邏輯在analyzeAndValidateMessageSet方法里。這個方法的實現里,需要注意幾點:

  1. 它是使用ByteBufferMessageSize的shallowIterator來對這個MessageSet的消息進行迭代,這也意味着並不會對compressed message里邊的MessageSet解壓后再進行檢驗,而是把comprssed message作為單個Message進行檢驗。
  2. 它計算checksum時,是計算的MagicByte及其以后的內容。
     def computeChecksum(): Long = 
        CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset,  buffer.limit - MagicOffset)

     

  3. 它比較的是entrySize與MaxMessageSize的大小,來確定這個消息是否太大
      def entrySize(message: Message): Int = LogOverhead + message.size
    
    ---------------------------------
    
      val MessageSizeLength = 4
      val OffsetLength = 8
      val LogOverhead = MessageSizeLength + OffsetLength

     

  4. 它返回的LogAppendInfo中會包括一個targetCodec,指明這個MessageSet將要使用的壓縮方式。leader處理produce request時,將使用這個壓縮方式重新壓縮整個MessageSet。
        val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)

    config.compressionType就是broker配置里的compression.type的值,如果它是“producer", 就會使用producer request使用壓縮方式,否則就使用config.compressionType指明的壓縮方式。注意如果一個MessageSet里的Message采用了不同的壓縮方式,最后被當成sourceCodec的是最后一個壓縮了的消息的壓縮方式。

再次檢驗並且賦予offset :validateMessagesAndAssignOffsets

只有leader處理produce request時,會調用ByteBufferMessageSet的這個方法。 它不會檢測analyzeAndValidateMessageSet已經檢測的內容,但是會把這個MessageSet進行深度遍歷(即如果它里邊的消息是壓縮后,就把這個消息解壓開再遍歷),這樣它就能做analyzeAndValidateMessageSet不能進行的檢測:對於compacted topic檢測其key是否為空,如果為空就拋出InvalidMessageException。

另外,它會把深度遍歷后獲得的Message放在一起重新壓縮。

如果MessageSet的尾部不是完整的Message呢?

這是在獲取ByteBufferMessageSet的iternalIterator時候處理的。

      def makeNextOuter: MessageAndOffset = {
        // if there isn't at least an offset and size, we are done
        if (topIter.remaining < 12)
          return allDone()
        val offset = topIter.getLong()
        val size = topIter.getInt()
        if(size < Message.MinHeaderSize)
          throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")

        // we have an incomplete message
        if(topIter.remaining < size)
          return allDone()
    .    ...
    }    

注意返回allDone()和拋出InvalidMessageException的時機。

  • 如果這個MessageSet剩下部分不到12bytes,那剩下的部分就是下一個MessageSet頭部的一部分,是沒法處理的,也是沒辦法檢驗的,因此就返回allDone。
  • 如果夠12bytes,就可以讀出offset和MessageSize。MessageSize至少會大於Message頭里邊的那些crc、Attributes, MagicBytes等加起來的大小,因此如果MessageSize比這個還小,就肯定是個entry有問題,所以就拋出異常。這里的問題在於,即使MessageSet最后的那個Message是不完整的,只要MessageSize有問題,也會拋異常,而不是忽略這個不完整的Message。(這個可能是沒考慮到,也可能是有別的考慮,不過無論怎么處理最后的這個不完整的Message,都有一定的道理)。

 consumer端的驗證

consumer(0.9)會檢查checksum,不過是可以配置的,原因正如config里說的一樣。

    public static final String CHECK_CRCS_CONFIG = "check.crcs";
    private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";

config的文檔說,檢查checksum是為了"ensures no on-the-wire or on-disk corruption to the message occurred."即,為了保證沒有在網絡傳輸出或者磁盤存儲時出現了消息的損壞。但是checksum計算時會帶來開銷,所以追求最佳性能,可以關掉checksum的檢查。


 

下面來看一下幾個與消息格式相關的KIP。為什么需要這些改變呢?為什么之前沒有實現這些改變呢?都是因為各種折衷吧,需求與性能折衷,需求與實現所需的工作量的折衷……

下面的幾個KIP可能會一起加上去,畢竟都是對消息格式的修改,不能搞沖突了。

KIP-31 - Move to relative offsets in compressed message sets

前邊提到了,在leader收到ProduceRequet之后,它會解壓開compressed message(也就是是這個KIP里的compressed messageset,這兩說說法的確有些亂),然后給里邊包含的message set的每條消息重新賦予offset。這個做法也是應該的,乍一看也沒什么不好。但是問題在於,不僅是直接改個offset這么簡單,在改完之后,需要重新壓縮這些消息,還要計算。這么一搞,開銷就大了。KIP-31就是想把這部分的性能損失降下來。(這個KIP已經是accepted狀態)

做法是把在一個compressed message set里邊的每個message的offset里記下當前message相對於外層的wrapper message的偏移。用漢語說這個意思比較費勁,KIP里這么說

When the producer compresses a message, write the relative offset value in the raw message's offset field. Leave the wrapped message's offset blank.

When broker receives a compressed message, it only needs to 

    1. Decompress the message to verify the CRC and relative offset.
    2. Set outer message's base offset. The outer message's base offset will be the offset of the last inner message.  (Since the broker only needs to update the message-set header, there is no need to re-compress message sets.)

注意,這個wrapper message里記的base offset, 是它所含的message set里的最后一個message的offset。這個和當前的compressed message的offset是一致的。

然后當broker收到一個壓縮后的消息時,它只需要

  • 驗證CRC與realtive offset的正確性
  • 重新設定外層消息的offset,也就是base offset。

KIP-32 - Add timestamps to Kafka message

在消息里加時間戳。需要注意的是,這個KIP還在討論中(以下的內容是基於2016年1月7日的版本)。不像上一個已經確定了。

(俺是覺得這個事情早該做了……)

首先,來看一下動機,這個提有意思

Motivation

This KIP tries to address the following issues in Kafka.

  1. Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
  2. Log rolling might break for a newly created replica as well because of the same reason as (1).
  3. Some use cases such as streaming processing needs a timestamp in messages.

說的是這幾個原因

1. Log retention會不靠譜。當前log retention是在log segment層面做的,是按照log segment的最后修改時間確定是否要刪除一個log segment. 但是,當replica重分配發生時,新被分配的這個replica的log segment的修改時間會被設成當前時間。這么一來,它就不能被按照log retention想要做的那樣(實際上是想把一段時間之前的消息刪除)被刪除。

2. 由於和1同樣的原因,對於一個新創建的replica(意思應該是移動位置的replica, 並不是增加分區后新加的replica)log rolling有時候也會不靠譜。

3. 有些場景中需要消息含有時間戳,比如流處理。

感覺,貌似第三個原因才是決定性的,擁抱流處理。

接口的變化

准備在Message里加入timestamp字段

准備增加兩個配置

  • message.timestamp.type 可以選CreateTime或者LogAppendTime,CreateTime就是這條消息生成的時間,是在producer端指定的。LogAppendTime就是append到log的時間(實現細節沒有說明)。
  • max.message.time.difference.ms 如果選擇了CreateTime, 那么只有當createTime和broker的本地時間相差在這個配置指定的差距之內,broker才會接受這條消息。

糾結之處

之前關於這個KIP的討論主要是關於使用哪個時間, 是使用LogAppendTime(broker time),還是CreateTime(application time)。

兩種都有利有弊:

The good things about LogAppendTime are: 使用LogAppendTime的好處在於

  1. Broker is more robust. Broker比起用戶程序更健壯(更不容易出錯,比如用戶程序可能有bug,導致CreateTime設置的不正確,想一想KIP-33,如果錯得離譜,索引怎么建?)
  2. Monotonically increasing. LogAppendTime是單調增長的。(但是,follower收到的消息的timestamp該怎么設呢?如果不用leader帶來的,就不能確定是否monotonically increasing)
  3. Deterministic behavior for log rolling and retention.log rolling和retention的行為是確定性的。(如果按消息里的這個timestamp來決定這兩個操作的行為,那么讓用戶指定timestamp的確挺危險的)
  4. If CreateTime is required, it can always be put into the message payload.如果需要CreateTime,可以加到消息的內容里。(這個的確是……)

The good things about CreateTime are: 使用CreateTime的好處是

  1. More intuitive to users. 更符合用戶的思維(用戶當然是想使用自己填進去的時間)。
  2. User may want to have log retention based on when the message is created instead of when the message enters the pipeline.用戶可能更希望用消息被創建的時間來決定log retention的行為,而不是消息進行處理管道的時間。
  3. Immutable after entering the pipeline.這樣,消息的timestamp在進入管道后就不會再改變了。

在俺看來,這兩個選擇的確挺糾結的。用戶肯定是想用自己產生消息的時間,不然很難准確地找到一條消息。但是,如果使用用戶指定的時間,broker端的行為就變得復雜了,比如,如果用戶指定的時間不是單調遞增的,該怎么建時間索引。但是用戶產生畸形的時間,倒可以通過配置里max.message.time.difference.ms來控制。或許可以加另一個配置,允許broker在一定范圍內修改CreateTime,比如最多可以更改1000ms。這樣就能即使消息的timestamp單調增長,也能使用戶對消息的時間的估計比較准確。不過,這樣可能就需要讓broker time的含義變成broker收到消息時間,而不是append到log的時間。否則就難以確定何時該拒絕無法在指定范圍內修改timestamp的消息。

 

KIP-33 - Add a time based log index

動機:

當前按照時間戳查找offset得到的結果是非常粗粒度的,只能在log segment的級別。(對於reassigned replica就差得沒譜了。)所以這個KIP提議建一個基於時間的對日志的索引,來允許按timestamp搜索消息的結果更准確。

這個KIP和KIP-32是緊密相關的。這倆KIP都在討論過程中。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM