近期在做kafka metrics. 參考了幾個開源的項目,諸如kafka manager, Burrow, kafkaOffsetMonitor,東西都很不錯,可惜沒有一個是用java編寫的,最終自己去仿照kafka源碼寫了個java版的adminclient,拿到了自己需要的metrics數據。這個功能開發完,也對kafka有了些許的了解。遂記錄如下。
基本概念:
producer:數據發送方。producer可以把消息以K-V的格式發送到某個topic。K是任意的表示,可string,可int;V可string,可byte[]。
consumer:數據接收方,或使用方。一個consumer可訂閱一個或多個topic。每個consumer都屬於一個consumer group.
group:某一類consumer的集合,有一個groupId,一個group中可以有多個consumer,發送到topic中的消息,只會被一個group中的某一個consumer消費。
Topic:一類消息的總稱。Topic可以被分成多個partition存放在kafka集群的不同server上。發到topic中的數據以append的形式存儲在log文件中,每條數據有一個唯一標示(offset)。
Partition:實際存儲data的分區。一個topic的數據可以分布在多個分區,每個分區也可以定義備份的個數。每個分區有一個leader partition,在別的broker上有對應的多個follow partitions。Topic只從leader partition消費消息。當leader partition壞掉之后,kafka會自動從follow partitions中選出重新選出一個leader partition。
logSize:某個parition上log的總長度。
offset:數據在parition中的偏移量。這個offset不是該數據在partition文件中的實際偏移量,而是一個邏輯值用於確定一條message數據。比如有100條數據,offset為0~99,根據數據內容的大小,物理上可能分成5個segment文件,offset分別為0~15,16~20,21~55,55~80,81~99,每個segment的名字以最小offset命名,分別為0,16,21,55,81,這樣根據某個offset定位數據的時候,就比較容易了。為了進一步方便定位,kafka還為每個segment建立了index,index包含兩個部分:offset和position,position代表數據再segment文件中的絕對位置。
Lags: logSize - currentOffset.
kafka使用方式:
kafka可以有多種使用方法,比如作為常規的message bus, log日志集中通道,網站訪問信息收集通道等等。特殊一點的,可以作為一個分布式的多線程庫,消息分發到同一個group的不同的consumer上,進行並行處理。
kafka工作機制
kafka的消息傳送機制:
- at most once: 消費者fetch消息,然后保存offset,然后處理消息。當client保存offset之后,但是在消息處理過程中出現了異常,導致部分消息未能繼續處理.那么此后”未處理”的消息將不能被fetch到。
- at least once: 消費者fetch消息,然后處理消息,然后保存offset。如果消息處理成功之后,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息。
通常情況下,選用at least once。
Kafka復制備份機制:
kafka把每個parition的消息復制到多個broker上,任何一個parition都有一個leader和多個follow,備份個數可以在創建topic的時候指定。leader負責處理所有read/write請求,follower像consumer一樣從leader接收消息並把消息存儲在log文件中。leader還負責跟蹤所有的follower狀態,如果follower“落后”太多或失效,leader將會把它從replicas同步列表中刪除。當所有的follower都將一條消息保存成功,此消息才被認為是“committed”。
Kafka與Zookeeper的交互機制:
當一個kafka broker啟動后,會向zookeeper注冊自己的節點信息,當broker和zookeeper斷開鏈接時,zookeeper也會刪除該節點的信息。除了自身的信息,broker也會向zookeeper注冊自己持有的topic和partitions信息。
當一個consumer被創建時,會向zookeeper注冊自己的信息,此作用主要是為了“負載均衡”。一個group中的多個consumer可以交錯的消費一個topic的所有partitions。簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了性能的考慮,讓partition相對均衡的分撒到每個consumer上。每一個consumer都有一個唯一的ID(host:uuid,可以通過配置文件指定,也可以由系統生成),此ID用來標記消費者信息,主要是topic+partition信息。
Producer端使用zookeeper用來”發現”broker列表,以及和Topic下每個partition leader建立socket連接並發送消息。
zookeeper上還存放partition被哪個consumer所消費的信息,以及每個consumer目前所消費的partition中的最大offset。
在kafka 0.9版本之后,kafka為了減少與zookeeper的交互,減少network data transfer,也自己實現了在kafka server上存儲consumer,topic,partitions,offset信息。
kafka metrics:
對kafka的metrics主要是對lags的分析,lags是topic/partition的logSize與consumer消費到的offset之間的差值,即producer產生數據的量與consumer消費數據的量的差值,差值越來越大,說明消費數據的速度小於產生數據的速度。一般可以認定是consumer出了問題。當然也不能只看某一點的lags大小,更重要的是關注lags的變化的趨勢,當趨勢越來越大時,可推斷consumer的performance越來越差。
在kafka 0.8.1版本之后,可以通過配置選擇把topic/partition的logsize,offset等信息存儲在zookeeper上或存儲在kafka server上。在做metrics時,注意可能需要分別從兩邊獲取數據。
獲取zookeeper上的kafka數據比較簡單,可以通過SimpleConsumer配合zookeeper.getChildren方法獲取consumerGroup, topic, paritions信息,然后通過SimpleConsumer的getOffsetsBefore方法獲取logSize,fetchOffsets獲取topic parition的currentOffsets。
獲取kafka server上的數據比較麻煩,目前kafka 0.10提供的kafkaConsumer類主要還是關注topic消費,對consumerGroup及Group和topic關系的獲取,還沒有提供API。不過我們知道可以通過kafka-consumer-groups.sh得到group,topic等信息的,這個shell文件里面調用了kafka.admin.ConsumerGroupCommand類,這個類確實提供了一個listGroup方法,可惜這個方法的返回值是void,shell文件的輸出是打印到控制台的,並沒有返回值。再去研究ConsumerGroupCommand是怎么拿到group的,發現它通過AdminClient對象的listAllConsumerGroup獲取的group list,所以只要new出來一個AdminClient就能解決問題。
Java實現AdminClient(for kafka 0.9)
private static AdminClient getAdminClient(){ if(null != adminClient){ return adminClient; }else{ Time time = new SystemTime(); Metrics metrics = new Metrics(time); Metadata metadata = new Metadata(); ConfigDef configs = new ConfigDef(); configs.define( CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) .define( CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC) .withClientSslSupport() .withClientSaslSupport(); HashMap<String, String> originals = new HashMap<String, String>(); originals.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, KAFKA_METRICS_BOOTSTRAP_SERVERS); AbstractConfig abstractConfig = new AbstractConfig(configs, originals); ChannelBuilder channelBuilder = org.apache.kafka.clients.ClientUtils.createChannelBuilder(abstractConfig.values()); List<String> brokerUrls = abstractConfig.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); List<InetSocketAddress> brokerAddresses = org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(brokerUrls); Cluster bootstrapCluster = Cluster.bootstrap(brokerAddresses); metadata.update(bootstrapCluster, 0); Long DefaultConnectionMaxIdleMs = 9 * 60 * 1000L; int DefaultRequestTimeoutMs = 5000; int DefaultMaxInFlightRequestsPerConnection = 100; Long DefaultReconnectBackoffMs = 50L; int DefaultSendBufferBytes = 128 * 1024; int DefaultReceiveBufferBytes = 32 * 1024; Long DefaultRetryBackoffMs = 100L; String metricGrpPrefix = "admin"; Map<String, String> metricTags = new LinkedHashMap<String, String>(); //Selector selector = new Selector(DefaultConnectionMaxIdleMs, metrics, time, metricGrpPrefix, channelBuilder); Selector selector = new Selector(DefaultConnectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, channelBuilder); AtomicInteger AdminClientIdSequence = new AtomicInteger(1); NetworkClient client = new NetworkClient(selector, metadata, "admin-" + AdminClientIdSequence.getAndIncrement(), DefaultMaxInFlightRequestsPerConnection, DefaultReconnectBackoffMs, DefaultSendBufferBytes, DefaultReceiveBufferBytes, DefaultReceiveBufferBytes, time); ConsumerNetworkClient highLevelClient = new ConsumerNetworkClient(client, metadata, time, DefaultRetryBackoffMs); //ConsumerNetworkClient highLevelClient = new ConsumerNetworkClient(client, metadata, time, DefaultRetryBackoffMs, DefaultRequestTimeoutMs); scala.collection.immutable.List<Node> nList = scala.collection.JavaConverters.asScalaBufferConverter(bootstrapCluster.nodes()).asScala().toList(); adminClient = new AdminClient(time, DefaultRequestTimeoutMs, highLevelClient, nList); return adminClient; } }