Kafka是什么
Kafka是最初由Linkedin公司開發,是一個分布式、分區的、多副本的、多訂閱者,基於zookeeper協調的分布式日志系統(也可以當做MQ系統),常見可以用於web/nginx日志、訪問日志,消息服務等等,Linkedin於2010年貢獻給了Apache基金會並成為頂級開源項目。
1.前言
一個商業化消息隊列的性能好壞,其文件存儲機制設計是衡量一個消息隊列服務技術水平和最關鍵指標之一。
下面將從Kafka文件存儲機制和物理結構角度,分析Kafka是如何實現高效文件存儲,及實際應用效果。
2.Kafka文件存儲機制
Kafka部分名詞解釋如下:
- Broker:消息中間件處理結點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群。
- Topic:一類消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能夠同時負責多個topic的分發。
- Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。
- Segment:partition物理上由多個segment組成,下面2.2和2.3有詳細說明。
- offset:每個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中。partition中的每個消息都有一個連續的序列號叫做offset,用於partition唯一標識一條消息.
分析過程分為以下4個步驟:
- topic中partition存儲分布
- partiton中文件存儲方式
- partiton中segment文件存儲結構
- 在partition中如何通過offset查找message
通過上述4過程詳細分析,我們就可以清楚認識到kafka文件存儲機制的奧秘。
2.1 topic中partition存儲分布
假設實驗環境中Kafka集群只有一個broker,xxx/message-folder為數據文件存儲根目錄,在Kafka broker中server.properties文件配置(參數log.dirs=xxx/message-folder),例如創建2個topic名稱分別為report_push、launch_info, partitions數量都為partitions=4
存儲路徑和目錄規則為:
xxx/message-folder |--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3
在Kafka文件存儲中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。
如果是多broker分布情況,請參考kafka集群partition分布原理分析
2.2 partiton中文件存儲方式
下面示意圖形象說明了partition中文件存儲方式:
圖1
- 每個partion(目錄)相當於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每個段segment file消息數量不一定相等,這種特性方便old segment file快速被刪除。
- 每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數決定。
這樣做的好處就是能快速刪除無用文件,有效提高磁盤利用率。
2.3 partiton中segment文件存儲結構
讀者從2.2節了解到Kafka文件系統partition存儲方式,本節深入分析partion中segment file組成和物理結構。
- segment file組成:由2大部分組成,分別為index file和data file,此2個文件一一對應,成對出現,后綴".index"和“.log”分別表示為segment索引文件、數據文件.
- segment文件命名規則:partion全局的第一個segment從0開始,后續每個segment文件名為上一個segment文件最后一條消息的offset值。數值最大為64位long大小,19位數字字符長度,沒有數字用0填充。
下面文件列表是筆者在Kafka broker上做的一個實驗,創建一個topicXXX包含1 partition,設置每個segment大小為500MB,並啟動producer向Kafka broker寫入大量數據,如下圖2所示segment文件列表形象說明了上述2個規則:
圖2
以上述圖2中一對segment file文件為例,說明segment中index<—->data file對應關系物理結構如下:
圖3
上述圖3中索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址。
其中以索引文件中元數據3,497為例,依次在數據文件中表示第3個message(在全局partiton表示第368772個message)、以及該消息的物理偏移地址為497。
從上述圖3了解到segment data file由許多message組成,下面詳細說明message物理結構如下:
圖4
參數說明:
關鍵字 | 解釋說明 |
---|---|
8 byte offset | 在parition(分區)內的每條消息都有一個有序的id號,這個id號被稱為偏移(offset),它可以唯一確定每條消息在parition(分區)內的位置。即offset表示partiion的第多少message |
4 byte message size | message大小 |
4 byte CRC32 | 用crc32校驗message |
1 byte “magic" | 表示本次發布Kafka服務程序協議版本號 |
1 byte “attributes" | 表示為獨立版本、或標識壓縮類型、或編碼類型。 |
4 byte key length | 表示key的長度,當key為-1時,K byte key字段不填 |
K byte key | 可選 |
value bytes payload | 表示實際消息數據。 |
2.4 在partition中如何通過offset查找message
例如讀取offset=368776的message,需要通過下面2個步驟查找。
-
第一步查找segment file
上述圖2為例,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)為0.第二個文件00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1.同樣,第三個文件00000000000000737337.index的起始偏移量為737338=737337 + 1,其他后續文件依次類推,以起始偏移量命名並排序這些文件,只要根據offset **二分查找**文件列表,就可以快速定位到具體文件。
當offset=368776時定位到00000000000000368769.index|log -
第二步通過segment file查找message
通過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和00000000000000368769.log的物理偏移地址,然后再通過00000000000000368769.log順序查找直到offset=368776為止。
從上述圖3可知這樣做的優點,segment index file采取稀疏索引存儲方式,它減少索引文件大小,通過mmap可以直接內存操作,稀疏索引為數據文件的每個對應message設置一個元數據指針,它比稠密索引節省了更多的存儲空間,但查找起來需要消耗更多的時間。
3 Kafka文件存儲機制–實際運行效果
實驗環境:
- Kafka集群:由2台虛擬機組成
- cpu:4核
- 物理內存:8GB
- 網卡:千兆網卡
- jvm heap: 4GB
- 詳細Kafka服務端配置及其優化請參考:kafka server.properties配置詳解
圖5
從上述圖5可以看出,Kafka運行時很少有大量讀磁盤的操作,主要是定期批量寫磁盤操作,因此操作磁盤很高效。這跟Kafka文件存儲中讀寫message的設計是息息相關的。Kafka中讀寫message有如下特點:
寫message
- 消息從java堆轉入page cache(即物理內存)。
- 由異步線程刷盤,消息從page cache刷入磁盤。
讀message
- 消息直接從page cache轉入socket發送出去。
- 當從page cache沒有找到相應數據時,此時會產生磁盤IO,從磁
盤Load消息到page cache,然后直接從socket發出去
4.offset存儲方式
- 1、在kafka 0.9版本之后,kafka為了降低zookeeper的io讀寫,減少network data transfer,也自己實現了在kafka server上存儲consumer,topic,partitions,offset信息將消費的 offset 遷入到了 Kafka 一個名為 __consumer_offsets 的Topic中。
- 2、將消費的 offset 存放在 Zookeeper 集群中。
- 3、將offset存放至第三方存儲,如Redis, 為了嚴格實現不重復消費
下面分別說一下這三種存儲方式的實現
4.1 __consumer_offsets [kafka]
下面的代碼案例實現了test這一topic的數據連續消費
from kafka import KafkaConsumer class KafkaStreamTest: ''' This class consume all external Kafka topics''' def __init__(self): self.appName = "kafkatest" self.kafkaHosts = "192.168.4.201:6667,192.168.4.231:6667" self.kafkaAutoOffsetReset = "largest" self._kafka_topic = "test" def start(self): reload(sys) sys.setdefaultencoding('utf-8') elogging.debug(self.appName, elogging.normalCID(), "receiver starting") consumer = KafkaConsumer('test', bootstrap_servers=['192.168.4.201:6667','192.168.4.231:6667'], enable_auto_commit=True, auto_offset_reset='earliest') #consumer = KafkaConsumer('test', bootstrap_servers=['192.168.4.201:6667', '192.168.4.231:6667'], auto_offset_reset='earliest') while True: # The definition of KafkaMessage: # KafkaMessage = namedtuple("KafkaMessage", # ["topic", "partition", "offset", "key", "value"]) kafkaMsg = consumer.next() # for debug print kafkaMsg.topic, kafkaMsg.partition, kafkaMsg.offset, kafkaMsg.key, kafkaMsg.value if __name__ =="__main__": test = KafkaStreamTest() test.start()
enable_auto_commit (bool) – If True , the consumer’s offset will be periodically committed in the background. Default: True設置為true,表示offset自動托管到kafka內部的一個特定名稱為__consumer_offsets的topic
auto_offset_reset:What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
只有當offset不存在的時候,才用latest或者earliest
其他詳細內容請參看
https://github.com/dpkp/kafka-python
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
https://stackoverflow.com/questions/35432326/how-to-get-latest-offset-for-a-partition-for-a-kafka-topic
Kafka 如何讀取offset topic內容 (__consumer_offsets)
kafka 0.9.0.0 __consumer_offsets日志清理問題?
Kafka 0.10.2<auto.offset.reset和enable.auto.commit>
4.2 zookeeper
請參考
spark createDirectStream保存kafka offset
4.3 Redis[推薦]
import os import sys sys.path.append("..") sys.path.append(sys.argv[0][:sys.argv[0].rfind(os.path.join('com','ericsson'))]) import copy import traceback import redis from pyspark import SparkContext, SparkConf from pyspark.streaming import StreamingContext, DStream from pyspark.sql import SQLContext import simplejson as json from com.ericsson.analytics.fms.common.common import ELogForDistributedApp,getSqlContextInstance from pyspark.streaming.kafka import KafkaUtils,TopicAndPartition from com.ericsson.analytics.oamf.client.logging import elogging from com.ericsson.analytics.fms.common.common import HDFSOperation class KafkaStreamTest: ''' This class consume all external Kafka topics, store the data into Parquet and send the data to internal Kafka topics ''' def __init__(self): self.appName = "kafkatest" self.kafkaHosts = "192.168.4.201:6667,192.168.4.231:6667" self.kafkaAutoOffsetReset = "largest" self.kafka_offset_redis_db = 6 self._kafka_topic = "test" self.redisHost = "192.168.4.231" self.redisPort = 6379 self.spark_batch_duration = 20 def createStreamingContext(self, sc): ssc = StreamingContext(sc, self.spark_batch_duration) ds = self.getDStreamFromKafka(ssc) if ds is not None: elogging.info(self.appName, elogging.normalCID(), "Kafka succeeded to getting the data") return ssc, ds else: return None, None def getDStreamFromKafka(self, ssc): kafkaParams = {"metadata.broker.list": self.kafkaHosts} elogging.debug(self.appName, elogging.normalCID(), kafkaParams) sc = ssc.sparkContext dstream = None try: redisConn = self.getRedisConnection(self.kafka_offset_redis_db) if redisConn.exists(self.appName): elogging.debug(self.appName, elogging.normalCID(), "key " + self.appName + " exists in redis") fromOffset = {} offsetListStr = redisConn.get(self.appName) offsetList = eval(offsetListStr) for offset in offsetList: elogging.debug(self.appName, elogging.normalCID(), str(offset)) topicPartion = TopicAndPartition(offset["topic"], offset["partition"]) fromOffset[topicPartion] = offset["untilOffset"] dstream = KafkaUtils.createDirectStream(ssc, [self._kafka_topic], kafkaParams, fromOffset) else: kafkaParams = {"metadata.broker.list": self.kafkaHosts, "auto.offset.reset": self.kafkaAutoOffsetReset} elogging.debug(self.appName, elogging.normalCID(), "key " + self.appName + " doesn't exist in redis") dstream = KafkaUtils.createDirectStream(ssc, [self._kafka_topic], kafkaParams) except: traceInfo = traceback.format_exc() elogging.error(self.appName, elogging.faultCID(), "failed to create DStream : " + traceInfo) return dstream def getRedisConnection(self, redisDB): try: pool = redis.ConnectionPool(host=self.redisHost, port=self.redisPort, db=redisDB) redisConn = redis.Redis(connection_pool=pool) except: traceInfo = traceback.format_exc() elogging.error(self.appName, elogging.faultCID(), "failed to create DStream : " + traceInfo) return None return redisConn def getOffSetRangesFromRDD(self, rdd): try: offsetRanges = rdd.offsetRanges() except: traceInfo = traceback.format_exc() elogging.error(self.appName, elogging.faultCID(), "failed to call rdd.offsetRanges() function : " + traceInfo) return None offsetList = [] for offset in offsetRanges: offsetList.append({"topic": offset.topic, "partition": offset.partition, "fromOffset": offset.fromOffset, "untilOffset": offset.untilOffset}) elogging.info(self.appName, elogging.normalCID(), "getOffSetRangesFromRDD, offsetList: " + str(offsetList)) return offsetList def saveOffSetRangesToRedis(self, offsetList): redisConn = self.getRedisConnection(self.kafka_offset_redis_db) if redisConn is not None: redisConn.set(self.appName, offsetList) elogging.info(self.appName, elogging.normalCID(), "saveOffSetRangesToRedis, offsetList : " + str(offsetList)) def handleMessages(self, runTime, rdd): elogging.debug(self.appName, elogging.normalCID(), "========= %s =========" % str(runTime)) offsetList = self.getOffSetRangesFromRDD(rdd) if offsetList is not None: self.saveOffSetRangesToRedis(offsetList) rddFilter = rdd.map(lambda p: p[1]) counts = rddFilter.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a + b) sqlContext = getSqlContextInstance(rddFilter.context) if counts is not None: df = sqlContext.createDataFrame(counts) df.show() def start(self): reload(sys) sys.setdefaultencoding('utf-8') sc = SparkContext(appName=self.appName) eloggingConfig = None try: eloggingConfig = HDFSOperation.getConfigFromHDFS(ELogForDistributedApp.LOGHDFSPATH, sc) elogging.initLogFromDict(eloggingConfig) except StandardError, se: pass elogging.debug(self.appName, elogging.normalCID(), "receiver starting") configInfoStr = 'kafkaHosts:' + str(self.kafkaHosts) + ', kafkaAutoOffsetReset:' + str(self.kafkaAutoOffsetReset) + \ ', kafka_offset_redis_db:' + str(self.kafka_offset_redis_db) + ', spark_batch_duration:' + str(self.spark_batch_duration) + \ ', redisHost:' + str(self.redisHost) + ', redisPort:' + str(self.redisPort) elogging.info(self.appName, elogging.normalCID(), configInfoStr) ssc, newDS = self.createStreamingContext(sc) if newDS is not None: newDS.foreachRDD(self.handleMessages) ssc.start() elogging.debug(self.appName, elogging.normalCID(), "StreamingContext start") ssc.awaitTermination() elogging.debug(self.appName, elogging.normalCID(), "receiver end") else: traceInfo = traceback.format_exc() elogging.error(self.appName, elogging.faultCID(), "Failed to create DStream " + traceInfo) if __name__ =="__main__": test = KafkaStreamTest() test.start()
5.總結
Kafka高效文件存儲設計特點
- Kafka把topic中一個parition大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經消費完文件,減少磁盤占用。
- 通過索引信息可以快速定位message和確定response的最大大小。
- 通過index元數據全部映射到memory,可以避免segment file的IO磁盤操作。
- 通過索引文件稀疏存儲,可以大幅降低index文件元數據占用空間大小。