前言:
kafka是linkedin開源的消息隊列, 淘寶的metaq就是基於kafka而研發. 而消息隊列作為一個分布式組件, 在服務解耦/異步化, 扮演非常重要的角色. 本系列主要研究kafka的思想和使用, 本文主要講解kafka的一些基本概念和api的使用.
*) 准備工作
1) 配置maven依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.1.1</version> </dependency>
2).配置hosts
vim /etc/hosts
把kafka集群相關的ip及其hostname, 配置到kafka客戶端的本地機器
*) Kafka的基礎知識
1). Broker, Zookeeper, Producer, Consumer
Broker具體承擔消息存儲轉發工作, Zookeeper則用與元信息的存儲(topic的定義/消費進度), Producer則是消息的生產者, Consumer則是消息的消費者.
2). Topic, Partition, Replication, Consumer Group
Topic對應一個具體的隊列, 在Kafka的概念中, 一個應用一個隊列. 應用數據往往呈現部分有序的特點, 因此對kafka的隊列, 引入partition的概念, 即可topic划分為多個partition. 單個Partition內保證有序, Partition間不保證. 這樣作的好處, 是充分利用了集群的能力, 均勻負載和提高性能.
Replication主要為了高可用性, 保證部分節點失效的惡劣情況下, 隊列數據能不丟.
Consumer Group的概念的引入, 很有創新性, 把以往傳統隊列(topic模式, queue模式)的屬性從隊列本身挪到了消費端. 若要使用queue模式, 則所有的消費端都采用統一個consumer group, 若采用topic模式, 則所有的客戶端都設置為不同的consumer group. 其partition的消費進度在zookeeper有所保存.
*) Kafka API的簡單樣列代碼
1). 生產者代碼
分區類代碼片段
public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(Object key, int numPartitions) {
return (key.hashCode() & 0x0FFFFFFF) % numPartitions;
}
}
評注: SimplePartitioner用於對消息進行分發到具體的partition中, 有消息的key來決定, 這個有點像map/reduce中的partition機制.
生產者代碼片段
Properties props = new Properties();
// 配置metadata.broker.list, 為了高可用, 最好配兩個broker實例
props.put("metadata.broker.list", "127.0.0.1:9092");
// serializer.class為消息的序列化類
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 設置Partition類, 對隊列進行合理的划分
props.put("partitioner.class", "mmxf.kafka.practise.SimplePartitioner");
// ACK機制, 消息發送需要kafka服務端確認
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
// KeyedMessage<K, V>
// K對應Partition Key的類型
// V對應消息本身的類型
// topic: "test", key: "key", message: "message"
KeyedMessage<String, String> message = new KeyedMessage<String, String>("test", "key", "message");
producer.send(message);
// 關閉producer實例
producer.close();
2). 消費者代碼
使用High Level Consumer的API 線程模型和Partition數最好能保持一致, 即One Thread For Partition
參考sample樣例: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
代碼片段如下:
public static void main(String[] args) {
// *) 創建ConsumerConfig
Properties props = new Properties();
// 設置zookeeper的鏈接地址
props.put("zookeeper.connect", "127.0.0.1:2181");
// 設置group id
props.put("group.id", "group_id");
// kafka的group 消費記錄是保存在zookeeper上的, 但這個信息在zookeeper上不是實時更新的, 需要有個間隔時間更新
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = (ConsumerConnector) Consumer.createJavaConsumerConnector(consumerConfig);
String topic = "test";
int threadNum = 1;
// *) 設置Topic=>Thread Num映射關系, 構建具體的流
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic,threadNum);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// *) 啟動線程池去消費對應的消息
ExecutorService executor = Executors.newCachedThreadPool();
for ( final KafkaStream<byte[], byte[]> stream : streams ) {
executor.submit(new Runnable() {
public void run() {
ConsumerIterator<byte[], byte[]> iter = stream.iterator();
while ( iter.hasNext() ) {
MessageAndMetadata<byte[] , byte[]> mam = iter.next();
System.out.println(
String.format("thread_id: %d, key: %s, value: %s",
Thread.currentThread().getId(),
new String(mam.key()),
new String(mam.message())
)
);
}
}
});
}
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// *) 優雅地退出
consumer.shutdown();
executor.shutdown();
while ( !executor.isTerminated() ) {
try {
executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}
}
結果輸出:
thread_id: 18, key: key, value: message
