kafka的前言知識: 1:Kafka是什么? 在流式計算中,Kafka一般用來緩存數據,Storm通過消費Kafka的數據進行計算。kafka是一個生產-消費模型。
Producer:生產者,只負責數據生產,生產者的代碼可以集成到任務系統中。
數據的分發策略由producer決定,默認是defaultPartition Utils.abs(key.hashCode) % numPartitions
Broker:當前服務器上的Kafka進程,俗稱拉皮條。只管數據存儲,不管是誰生產,不管是誰消費。
在集群中每個broker都有一個唯一brokerid,不得重復。
Topic:目標發送的目的地,這是一個邏輯上的概念,落到磁盤上是一個partition的目錄。partition的目錄中有多個segment組合(index,log)
一個Topic對應多個partition[0,1,2,3],一個partition對應多個segment組合。一個segment有默認的大小是1G。
每個partition可以設置多個副本(replication-factor 1),會從所有的副本中選取一個leader出來。所有讀寫操作都是通過leader來進行的。
特別強調,和mysql中主從有區別,mysql做主從是為了讀寫分離,在kafka中讀寫操作都是leader。
ConsumerGroup:數據消費者組,ConsumerGroup可以有多個,每個ConsumerGroup消費的數據都是一樣的。
可以把多個consumer線程划分為一個組,組里面所有成員共同消費一個topic的數據,組員之間不能重復消費。 2:Apache Kafka是一個開源消息系統,由Scala寫成。是由Apache軟件基金會開發的一個開源消息系統項目。 3:Kafka是一個分布式消息隊列:生產者、消費者的功能。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規范的實現。 4:Kafka對消息保存時根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。
5:無論是kafka集群,還是producer和consumer都依賴於zookeeper集群保存一些meta信息,來保證系統可用性。
6:Kafka核心組件:Topic :消息根據Topic進行歸類;Producer:發送消息者;Consumer:消息接受者;broker:每個kafka實例(server);Zookeeper:依賴集群保存meta信息。
7:消息系統的核心作用就是三點:解耦,異步和並。
8:kafka生產數據時的分組策略?
默認是defaultPartition Utils.abs(key.hashCode) % numPartitions。
上文中的key是producer在發送數據時傳入的,produer.send(KeyedMessage(topic,myPartitionKey,messageContent))。
9:kafka如何保證數據的完全生產?
ack機制:broker表示發來的數據已確認接收無誤,表示數據已經保存到磁盤。
0:不等待broker返回確認消息。
1:等待topic中某個partition leader保存成功的狀態反饋。
-1:等待topic中某個partition 所有副本都保存成功的狀態反饋。
10:broker如何保存數據?
在理論環境下,broker按照順序讀寫的機制,可以每秒保存600M的數據。主要通過pagecache機制,盡可能的利用當前物理機器上的空閑內存來做緩存。
當前topic所屬的broker,必定有一個該topic的partition,partition是一個磁盤目錄。partition的目錄中有多個segment組合(index,log)。
11:如何保證kafka消費者消費數據是全局有序的?
偽命題,
如果要全局有序的,必須保證生產有序,存儲有序,消費有序。由於生產可以做集群,存儲可以分片,消費可以設置為一個consumerGroup,要保證全局有序,就需要保證每個環節都有序。只有一個可能,就是一個生產者,一個partition,一個消費者。這種場景和大數據應用場景相悖。
1:KafKa的官方網址:http://kafka.apache.org/
開發流程圖,如:
2:KafKa的基礎知識:
2.1:kafka是一個分布式的消息緩存系統。
2.2:kafka集群中的服務器都叫做broker。
2.3:kafka有兩類客戶端,一類叫producer(消息生產者),一類叫做consumer(消息消費者),客戶端和broker服務器之間采用tcp協議連接。
2.4:kafka中不同業務系統的消息可以通過topic進行區分,而且每一個消息topic都會被分區,以分擔消息讀寫的負載。
2.5:每一個分區都可以有多個副本,以防止數據的丟失。
2.6:某一個分區中的數據如果需要更新,都必須通過該分區所有副本中的leader來更新。
2.7:消費者可以分組,比如有兩個消費者組A和B,共同消費一個topic:order_info,A和B所消費的消息不會重復。
比如 order_info 中有100個消息,每個消息有一個id,編號從0-99,那么,如果A組消費0-49號,B組就消費50-99號。
2.8:消費者在具體消費某個topic中的消息時,可以指定起始偏移量。
3:KafKa集群的安裝搭建,注意區分單節點KafKa集群的搭建。
Topic :消息根據Topic進行歸類;Producer:發送消息者;Consumer:消息接受者;broker:每個kafka實例(server);Zookeeper:依賴集群保存meta信息。
3.1:kafka集群安裝,第一步上傳kafka_2.10-0.8.1.1.tgz到虛擬機上面,過程省略,然后進行解壓縮操作:
3.2:修改kafka配置文件,修改server.properties
修改如下所示,具體情況可以根據手冊修改,詳細修改可以參考Kafka的文檔:
1 #broker的全局唯一的編號,不可以重復 2 broker.id=0 3 4 #用來監聽鏈接的端口,producer或者consumer將在此端口建立連接 5 port=9092 6 7 #處理網絡請求的線程數量 8 num.network.threads=3 9 10 #用來處理磁盤Io的線程數量 11 num.io.threads=8 12 13 #發送套接字的緩沖區大小 14 socket.send.buffer.bytes=102400 15 16 #接受套接字的緩沖區的大小 17 socket.receive.buffer.bytes=102400 18 19 #請求套接字的緩沖區大小 20 socket.request.max.bytes=104857600 21 22 #kafka運行日志存放的路徑 23 log.dirs=/tmp/kafka-logs 24 25 #topic在當前broker上的分片個數 26 num.partitions=2 27 28 #用來恢復和清理data下數據的線程數量 29 num.recovery.threads.per.data.dir=1 30 31 #segment文件保留的最長時間,超時將被刪除 32 log.retention.hours=168 33 34 #滾動生成新的segment文件的最大時間 35 log.roll.hours=168 36 37 #topic的分區是以一堆segment文件存儲的,這個控制每個segment的大小,會被topic創建時的指定參數覆蓋 38 log.segment.bytes=536870912 39 40 #文件大小檢查的周期時間,是否處罰 log.cleanup.policy中設置的策略 41 log.retention.check.interval.ms=60000 42 43 #是否開啟日志清理 44 log.cleaner.enable=false 45 46 #zookeeper集群的地址,可以是多個,多個之間用逗號分割 hostname1:port1,hostname2:port2,hostname3:port3 47 zookeeper.connect=localhost:2181 48 49 #ZooKeeper的連接超時時間 50 zookeeper.connection.timeout.ms=1000000
具體操作修改如下所示:
使用自己部署的Zookeeper集群,修改如下所示:
可以直接搜索:/zookeeper.connect找到所要修改的內容:
將配置好的Kafka復制到另外兩個節點上面:
[root@master hadoop]# scp -r kafka_2.10-0.8.1.1/ slaver1:/home/hadoop/
[root@master hadoop]# scp -r kafka_2.10-0.8.1.1/ slaver2:/home/hadoop/
然后修改一下另外兩台的broker.id=2和broker.id=3:
這里插一個slf4j的配置,將 [hadoop@slaver1 slf4j-1.7.6]$ unzip slf4j-1.7.6.zip進行解壓縮操作。
然后將[hadoop@slaver1 slf4j-1.7.6]$ cp slf4j-nop-1.7.6.jar /home/hadoop/soft/kafka_2.9.2-0.8.1/libs/目錄下面。
3.3:將zookeeper集群啟動:
[root@master hadoop]# cd /home/hadoop/zookeeper-3.4.5/bin/
[root@master bin]# ./zkServer.sh start
[root@slaver2 bin]# ./zkServer.sh status
3.4:在每一台節點上啟動broker:
bin/kafka-server-start.sh config/server.properties
Unrecognized VM option 'UseCompressedOops' Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.
啟動的時候報錯了,問題的根本是UseCompressedOops是jdk8的,而我的jdk是7,所以解決一下問題:
原因是jdk的版本不匹配,需要修改一下配置文件
修改文件:
去掉這個配置
-XX:+UseCompressedOops
進去以后,搜索一下比較快:/UseCompressedOops,然后看到如下,刪除如此配置:
[root@master bin]# vim kafka-run-class.sh
其他兩個節點的都按照如此刪除掉即可:
修改好以后開始跑:
在每一台節點上啟動broker:
bin/kafka-server-start.sh config/server.properties
1、首先安裝nohup: [hadoop@slaver1 ~]$ yum install coreutils 2、后台啟動kafka服務:
[hadoop@slaver1 kafka_2.9.2-0.8.1]$ nohup bin/kafka-server-start.sh config/server.properties &
然后按照如此將其他兩個節點都啟動起來,然后復制xshell的連接看一下jps進程啟動情況:
三個都啟動起來,可以看一下,broker 1,broker 2,broker 3都啟動起來了:
可以使用復制的xshell窗口查看jps進程啟動情況:
3.5:在kafka集群中創建一個topic:
常用命令如下所示:
1:Kafka常用操作命令 1.1:查看當前服務器中的所有topic。--zookeeper master:2181指定zookeeper。 bin/kafka-topics.sh --list --zookeeper master:2181 1.2:創建topic。--partitions 3,指定三個分區。--replication-factor 1指定備份的副本數量。--topic topicTest,指定topic的名稱。 ./kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 3 --topic topicTest 1.3:刪除topic bin/kafka-topics.sh --delete --zookeeper master:2181 --topic topicTest 注意:需要server.properties中設置delete.topic.enable=true否則只是標記刪除或者直接重啟。 1.4:通過shell命令發送消息。生產者。--broker-list master:9092 bin/kafka-console-producer.sh --broker-list master:9092 --topic topicTest 1.5:通過shell消費消息。消費者。--from-beginning從最開始消費。 bin/kafka-console-consumer.sh --zookeeper master:2181 --from-beginning --topic topicTest 1.6:查看消費位置 bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper master:2181 --group testGroup 1.7:查看某個Topic的詳情 bin/kafka-topics.sh --topic topicTest --describe --zookeeper master:2181
[root@master kafka_2.10-0.8.1.1]# bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 1 --topic order
可以查看一下自己創建的topic:
[root@master kafka_2.10-0.8.1.1]# bin/kafka-topics.sh --list --zookeeper master:2181
可以創建多個多個topic,也可以查看一下自己創建的topic:
3.6:用一個producer向某一個topic中寫入消息,生產者產生消息,消費者消費消息,如下生產者可以生產:
如下先啟動一下生產者,先不生產消息,然后一個消費者,看看是否有輸出,然后再生產消息,再去消費者看看消費消息:
#生產者 [root@master kafka_2.10-0.8.1.1]# bin/kafka-console-producer.sh --broker-list master:9092 --topic order #消費者 [root@master kafka_2.10-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper master:2181 --from-beginning --topic order
上面是生產者:
下面是消費者:
3.7:查看一個topic的分區及副本狀態信息:
自己可以找任意一個xshell復制連接進程查看:
[root@slaver1 kafka_2.10-0.8.1.1]# bin/kafka-topics.sh --describe --zookeeper master:2181 --topic order
4:kafka運行在后台如何操作,如下所示:
1>/dev/null:代表標准輸入到這個目錄;
2>&1:代表標准輸出也到這個目錄下面;
&:代表這個是后台運行;
[root@master kafka_2.10-0.8.1.1]# bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &