簡介
首先簡單說下對kafka的理解:
1、kafka是一個分布式的消息緩存系統;
2、kafka集群中的服務器節點都被稱作broker
3、kafka的客戶端分為:一是producer(消息生產者)負責往消息隊列中放入消息;另一類是consumer(消息消費者)負責從消息隊列中取消息。客戶端和服務器之間的通信采用tcp協議
4、kafka中不同業務系統的消息可以通過topic(主題)進行區分,也就是說一個主題就是一個消息隊列,而且每一個消息topic都會被分區,以分擔消息讀寫的負載
5、parition(分區)是物理上的概念,每個topic包含一個或多個partition,創建topic時可指定parition數量。每個partition對應於一個文件夾,該文件夾下存儲該partition的數據和索引文件。每一個分區都可以有多個副本,以防止數據的丟失
6、某一個分區中的數據如果需要更新,都必須通過該分區所有副本中的leader來更新
7、消費者可以分組,每一個consumer屬於特定的組,同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。比如有兩個消費者組A和B,共同消費一個topic:topic-1,A和B所消費的消息不會重復.
比如 topic-1中有100個消息,每個消息有一個id,編號從0-99,那么,如果A組消費0-49號,B組就消費50-99號
8、消費者在具體消費某個topic中的消息時,可以指定起始偏移量
集群安裝、啟動
1、下載安裝包並解壓
tar xf kafka_2.10-0.8.1.1.tgz
cd kafka_2.10-0.8.1.1
2、修改config/server.properties配置文件
broker.id=1
zookeeper.connect=192.168.2.100:2181, 192.168.2.110:2181, 192.168.2.120:2181
注:kafka集群依賴zookeeper集群,所以此處需要配置zookeeper集群;zookeeper集群配置請參見:http://www.cnblogs.com/skyfeng/articles/6701458.html
3、將kafka解壓包使用scp命令拷貝至集群其他節點,命令:
scp -r kafka_2.10-0.8.1.1/ 192.168.2.110://home/hadoop/app
4、將zookeeper集群啟動,請參見:http://www.cnblogs.com/skyfeng/articles/6701458.html
5、在每一台節點上啟動broker
bin/kafka-server-start.sh config/server.properties
//運行在后台命令:
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
//使用jps命令查看是否啟動
[hadoop@hadoop1-1 kafka_2.10-0.8.1.1]$ jps 2400 Jps 2360 Kafka 2289 QuorumPeerMain
簡單測試
1、在kafka集群中創建一個topic
[hadoop@hadoop1-1 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --create --zookeeper 192.168.2.100:2181 --replication-factor 3 --partitions 1 --topic topictest
Created topic "topictest".
replication-factor:表示副本數量
partitions :分區數量
2、用一個producer向某一個topic中寫入消息
[hadoop@hadoop1-1 kafka_2.10-0.8.1.1]$ bin/kafka-console-producer.sh --broker-list 192.168.2.100:9092 --topic topictest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
3、用一個comsumer從某一個topic中讀取信息
[hadoop@hadoop1-2 kafka_2.10-0.8.1.1]$ bin/kafka-console-consumer.sh --zookeeper 192.168.2.100:2181 --from-beginning --topic topictest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
在生產者中輸入內容,消費者會及時從隊列中獲取消息,如下圖:
4、查看一個topic的分區及副本狀態信息
[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.2.110:2181 --topic topictest
Topic:topictest PartitionCount:1 ReplicationFactor:3 Configs: Topic: topictest Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 [hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.2.100:2181 --topic topictest Topic:topictest PartitionCount:1 ReplicationFactor:3 Configs: Topic: topictest Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 [hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.2.120:2181 --topic topictest Topic:topictest PartitionCount:1 ReplicationFactor:3 Configs: Topic: topictest Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 [hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$
5、查看topic
bin/kafka-topics.sh --list --zookeeper 192.168.2.100:2181