kafka搭建&&kafka基礎知識


一、kafa搭建:

1、下載kafka:wget http://archive.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
2、解壓 tar -xvzf kafka_2.11-0.10.1.1.tgz
3、在/usr/kafa/kafka_2.11-0.10.1.1/config目錄下配置“zookeeper.properties”,修改dataDir和clientPort、dataLogDir
新增:
dataDir=/usr/kafa/kafka_2.11-0.10.1.1/zk/data
dataLogDir=/usr/kafa/kafka_2.11-0.10.1.1/zk/logs
clientPort=2181
4、配置kafka_2.11-1.1.0/config下的“server.properties”,修改log.dirs和zookeeper.connect。前者是日志存放文件夾,后者是zookeeper連接地址(端口和clientPort保 持一致)。

修改:
port=9092 #端口號
host.name=192.168.0.11 #服務器IP地址,修改為自己的服務器IP
zookeeper.connect=192.168.80.4:2181
5、開啟kafka自帶zookeeper
kafka_2.11-0.10.1.1目錄下:nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-run.log 2>&1 &
或者bin/zookeeper-server-start.sh config/zookeeper.properties
6、開啟kafka:
kafka_2.11-0.10.1.1目錄下:nohup bin/kafka-server-start.sh config/server.properties > kafka-run.log 2>&1 &
或者bin/kafka-server-start.sh config/server.properties
7、驗證zookeeper和kafka啟動成功:jps
顯示:[root@localhost bin]# jps
7797 Kafka
7564 QuorumPeerMain //QuorumPeerMain是zookeeper的守護進程
7、創建topic:
kafka_2.11-0.10.1.1目錄下:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test     //注意這里2181是config目錄中zookeeper.properties配置clientPort端口,replication-factor是每個partition的副本個數  --partitions是對應topic下分區數量
返回結果:Created topic "test"
顯示topic:
kafka_2.11-0.10.1.1目錄下:bin/kafka-topics.sh -list -zookeeper localhost:2181

8、創建消費者

 kafka_2.11-0.10.1.1目錄目錄下:bin/kafka-console-consumer.sh --bootstrap-server 192.168.80.4:9092 --topic test --from-beginning

消費者創建完成之后,因為還沒有發送任何數據,因此這里在執行后沒有打印出任何數據

 


9、創建kafka生產者
bin/kafka-console-producer.sh --broker-list 192.168.80.4:9092 --topic test   //9092和192.168.80.4對應config目錄中server.properties中配置port和host.name=192.168.80.4

在執行完畢后會進入的編輯器頁面進行輸入,然后在消費者中顯示

 

二、kafka基礎知識

1、kafka結構

                       

  broker:消息由producer發往consumer的載體,是kafka集群中一台或多台服務器

  message:消息,通訊的基本單位,每個producer向一個topic發布消息,kafka中message是以topic為基本單位的,不同topic之間是相互獨立的,每個topic又可分為多個partition,每個partition存儲一部分,partition中每條Message包含以下三條屬性:

offset     long
MessageSize   int32
data         messages的具體內容

   

    kafka對外使用topic概念,生產者往topic寫消息,消費者從topic讀消息。為了做到水平擴展一個topic實際由多個partition組成,通過增加partition數量來進行橫向擴容。kafka會為partition選出一個leader,之后所有該partition請求實際操作都是leader,然后再同步到其他follower,當一個broker歇菜后,所有leader在該broker上的partition都會重新選舉,選出一個leader

    leader選取:partition%Num(broker)作為leader

  消費偏移量保存:一個消費組消費partition,需要保存offset記錄消費到哪,0.10之前offset保存在zk中,0.10之后offset保存在_consumeroffsets topic的topic中。寫進消息的key由groupid、topic、partition組成,value是偏移量offset。每個key的offset都是緩存在內存中,查詢時候不用遍歷partition,如果沒有緩存第一次會遍歷partition建立緩存,然后查詢返回。確定consumer group位移信息寫入_consumers_offsets的那個partition計算公式:

                  __consumers_offsets partition =
                             Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)                     //groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認是50個分區。

2、kafka多播消費

 

   kafka同一個topic消息只能被同一個consumer group內其中一個consumer消費,而不能被所有消費者消費,故稱為多播,但多個consumer group可同時消費這一消息。

   對比rocketmq,同一個topic消息可以被同一個comsumer group內所有consumer消費(采用MessageModel.BROADCASTING方式)

   對比rabbitmq,同一個queue內消息只能被一個消費實例消費;采用廣播(fanout)方式exchange可以將消息廣播到所有綁定的queue中。

  kafka如果實現廣播,只要每個consumer有一個獨立的group即可;如果實現單播消費,只要所有consumer在同一個group里

 

3、kafka順序消息

     針對部分消息有序(message.key相同的message要保證消費順序),可以在producer往kafka插入數據時控制,同一個key分發到同一個partition上,因為每個partition是固定分配給某個消費者線程進行消費的。所以對於同一個分區的消息來說是嚴格有序的;

    消息producer在發送消息時,對於一個有着先后順序的消息A、B,正常情況下應該是A先發送完成后再發送B,但是在異常情況下,在A發送失敗的情況下,B發送成功,而A由於重試機制在B發送完成之后重試發送成功了,這時對於本身順序為AB的消息順序變成了BA。 為解決此問題,嚴格的消費需要支持參數:max.in.flight.requests.per.connection,該參數含義:在發送阻塞前對於每個連接,正在發送但是發送狀態未知的最大消息數量。如果設置大於1,那么就有可能存在有發送失敗的情況下,因為重試發送導致的消息亂序問題。所以我們應該將其設置為1,保證在后一條消息發送前,前一條的消息狀態已經是可知的。

   

  rocketmq支持順序消費方式:message.key相同的message發往同一個queue上

  rabbitmq支持順序消費:exchange不采用廣播方式,消息只發送到一條queue上

 

 

4、Kafka 分區partition

  4.1、生產者發送消息到topic,消費者訂閱topic,topic下是partition,消息時存儲在partition中的,所以事實上生產者發送消息到partition,消費者從partition讀取消息。

  4.2、topic的partition數設置

    server.properties配置文件中可以指定一個全局的分區數設置,這是對每個主題下的分區數的默認設置,默認是1。

    當然每個主題也可以自己設置分區數量,如果創建主題的時候沒有指定分區數量,則會使用server.properties中的設置:  

          bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1         

     在創建主題時候可以使用--partition指定topic的分區數量    

     4.3、生產者與partition

       kafkaTemplate.send(topic, partition, key, data)

     默認分區策略:

      ①如果發送消息時指定partition,則消息投遞到指定分區

      ②如果沒有指定partition,但key不是空,則基於key的hash值來選擇一個分區

      ③如果沒有指定partition,key也是空,則用輪詢方式選擇一個分區

  4.4、partition與消費者

    消費者以consumergroup名義訂閱topic,consumergroup中有多個消費實例,同一個consumergroup下消費實例只能以單播方式消費topic下partition內容,每個消息分區 只能被同組的一個消費者進行消費

    消費實例和partition對應關系如下:

    4.4.1  range消費者分區分配策略

    是kafka默認分配策略,限於主題,range分配策略是針對主題,①首先將主題a下partition按數字排序,然后用partition數除以consumergroup1下消費實例總數,如果除盡則平均分配,如果除不盡則位於排序前面的消費者多負責一個分區;②首先將主題a下partition按數字排序,然后用partition數除以consumergroup2下消費實例總數,如果除盡則平均分配,如果除不盡則位於排序前面的消費者多負責一個分區;①②類似,這樣原因是kafka針對不同consumergroup是多播消費的;③將主題b進行步驟①、②相同操作

   

  consumergroup1 :

    c1:tAp0 tAp1   / tBp0

    c2:tAp2  / tBp1

  consumergroup2:

    c1:tAp0  /tBp0

    c2:tAp1 /tBp1  

    c3:tAp2

      

 4.4.2  roundrobi(輪詢)   

      與前面的range策略最大的不同就是它不再局限於某個主題

      如果所有的消費者實例的訂閱都是相同的,那么這樣最好了,可用統一分配,均衡分配

      例如,假設有兩個消費者C0和C1,兩個主題t0和t1,每個主題有3個分區,分別是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2 

        那么,最終分配的結果是這樣的:

        C0: [t0p0, t0p2, t1p1]

        C1: [t0p1, t1p0, t1p2]

 

 5、zookeeper在kafka作用

   5.1 Broker注冊

  Zookeeper上會有一個專門用來進行Broker服務器列表記錄的節點,每個Broker在啟動時,都會到Zookeeper上進行注冊,即到/brokers/ids下創建屬於自己的節點,如/brokers/ids/[0...N],Kafka使用了全局唯一的數字來指代每個Broker服務器,創建完節點后,每個Broker就會將自己的IP地址和端口信息記錄到該節點中去。

  Broker創建的節點類型是臨時節點,一旦Broker宕機,則對應的臨時節點也會被自動刪除。

  5.2 topci注冊

    kafka中,同一個topic信息會被分成多個分區,並將其分布在多個broker上,這些分區信息及broker對應關系是由zookeeper專門節點brokers/topics/[topic]維護的。broker啟動后回到對應topic節點注冊自己的broker ID並寫入針對該topic分區總數,

    如/brokers/topics/login/3->2:broker id為3的broker服務器,杜宇login這個topic消息提供2個分區。

    這個節點是臨時節點

    5.3 生產者負載均衡

     由於同一個Topic消息會被分區並將其分布在多個Broker上,因此,生產者需要將消息合理地發送到這些分布式的Broker上,使用Zookeeper進行負載均衡,由於每個Broker啟動時,都會完成Broker注冊過程,生產者會通過該節點的變化來動態地感知到Broker服務器列表的變更,這樣就可以實現動態的負載均衡機制。

 

 

參考https://www.cnblogs.com/zhangchao0515/p/9502843.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM