brokers和消費者使用zk來獲取狀態信息和追蹤消息坐標。
每一個partition是一個有序的,不可變的消息序列。
只有當partition里面的file置換到磁盤文件以后,才開放給消費者來消費。
每一個partition是跨服務器地被復制到其他地方,為了容錯的目的。
這個partition可以理解為hadoop中block的單位。
但是只有被選擇為leader的服務器partition來服務消費者的讀和生產者的寫,
followers只是把數據同步過去。同步狀態較好的被列入ISR,這些ISR和leader
信息都保存在zk中,當leader狀態異常,ISR中的某一個Follower變成新的leader.
在整個kafka集群中,每一個服務器扮演一個雙重角色,它可能是某個top的leader partition,
也同時可以是另一個topic的follower partition.這確保了集群的負載均衡。
每一個消費者代表一個進程,多個消費者組成一個消費者組。
一個topic中的一條消息只能被一個消費者組中的某一個消費者消費,如果需要被多個消費者消費,則這些消費者需要在不同的消費者組中。
原因可能是以消費者組的單位在zk中保持partition的offset.
kafka的設計中,broker是無狀態的,這意味着它並不負責管理哪些消費者消費了哪些partition中的消息到什么位置,甚至誰消費的都不理會。
對於消息保持策略,kafka采用了基於時間的SLA,一個消息將會被自動刪除當它達到了這個SLA.
kafka的復制策略有兩種,同步和異步,同步會在lead replica和follower都完成消息的存儲后才給producer發確認信息。
異步同步,只要lead replica收到了信息,就給producer發確認信息,如果這個時候lead replica的broker出問題,就會有風險。
生產者
kafka的message api for producer
從前面分析得知,數據被封裝成消息,如何發送給kafka呢?首先需要獲取這個topic的 lead partition。
消息可以一條一條發送,也可以批量壓縮異步發送。即攢到一定的數量或一定的時間再發送。
Producer:Kafka provides the kafka.javaapi.producer.Producer class (classProducer<K,V>)。默認的分區策略是對key進行hash.
import java.util.Date; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class SimpleProducer { private static Producer<String, String> producer; public SimpleProducer() { Properties props = new Properties(); // Set the broker list for requesting metadata to find the lead broker props.put("metadata.broker.list", "192.168.146.132:9092, 192.168.146.132:9093, 192.168.146.132:9094"); //This specifies the serializer class for keys props.put("serializer.class", "kafka.serializer.StringEncoder"); // 1 means the producer receives an acknowledgment once the lead replica // has received the data. This option provides better durability as the // client waits until the server acknowledges the request as successful. props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); producer = new Producer<String, String>(config); } public static void main(String[] args) { int argsCount = args.length; if (argsCount == 0 || argsCount == 1) throw new IllegalArgumentException( "Please provide topic name and Message count as arguments"); String topic = (String) args[0]; String count = (String) args[1]; int messageCount = Integer.parseInt(count); System.out.println("Topic Name - " + topic); System.out.println("Message Count - " + messageCount); SimpleProducer simpleProducer = new SimpleProducer(); simpleProducer.publishMessage(topic, messageCount); } private void publishMessage(String topic, int messageCount) { for (int mCount = 0; mCount < messageCount; mCount++) { String runtime = new Date().toString(); String msg = "Message Publishing Time - " + runtime; System.out.println(msg); // Creates a KeyedMessage instance KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg); // Publish the message producer.send(data); } // Close producer connection with broker. producer.close(); } }