一臉懵逼學習KafKa集群的安裝搭建--(一種高吞吐量的分布式發布訂閱消息系統)


kafka的前言知識:
1:Kafka是什么?
    在流式計算中,Kafka一般用來緩存數據,Storm通過消費Kafka的數據進行計算。kafka是一個生產-消費模型。
   Producer:生產者,只負責數據生產,生產者的代碼可以集成到任務系統中。
              數據的分發策略由producer決定,默認是defaultPartition  Utils.abs(key.hashCode) % numPartitions
    Broker:當前服務器上的Kafka進程,俗稱拉皮條。只管數據存儲,不管是誰生產,不管是誰消費。
            在集群中每個broker都有一個唯一brokerid,不得重復。
    Topic:目標發送的目的地,這是一個邏輯上的概念,落到磁盤上是一個partition的目錄。partition的目錄中有多個segment組合(index,log)
            一個Topic對應多個partition[0,1,2,3],一個partition對應多個segment組合。一個segment有默認的大小是1G。
            每個partition可以設置多個副本(replication-factor 1),會從所有的副本中選取一個leader出來。所有讀寫操作都是通過leader來進行的。
            特別強調,和mysql中主從有區別,mysql做主從是為了讀寫分離,在kafka中讀寫操作都是leader。
    ConsumerGroup:數據消費者組,ConsumerGroup可以有多個,每個ConsumerGroup消費的數據都是一樣的。
                   可以把多個consumer線程划分為一個組,組里面所有成員共同消費一個topic的數據,組員之間不能重復消費。 2:Apache Kafka是一個開源消息系統,由Scala寫成。是由Apache軟件基金會開發的一個開源消息系統項目。 3:Kafka是一個分布式消息隊列:生產者、消費者的功能。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規范的實現。 4:Kafka對消息保存時根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。
5:無論是kafka集群,還是producer和consumer都依賴於zookeeper集群保存一些meta信息,來保證系統可用性。
6:Kafka核心組件:Topic :消息根據Topic進行歸類;Producer:發送消息者;Consumer:消息接受者;broker:每個kafka實例(server);Zookeeper:依賴集群保存meta信息。
7:消息系統的核心作用就是三點:解耦,異步和並。
8:kafka生產數據時的分組策略?
  默認是defaultPartition  Utils.abs(key.hashCode) % numPartitions。
   上文中的key是producer在發送數據時傳入的,produer.send(KeyedMessage(topic,myPartitionKey,messageContent))。
9:kafka如何保證數據的完全生產?
  ack機制:broker表示發來的數據已確認接收無誤,表示數據已經保存到磁盤。
    0:不等待broker返回確認消息。
    1:等待topic中某個partition leader保存成功的狀態反饋。
    -1:等待topic中某個partition 所有副本都保存成功的狀態反饋。
10:broker如何保存數據?
  在理論環境下,broker按照順序讀寫的機制,可以每秒保存600M的數據。主要通過pagecache機制,盡可能的利用當前物理機器上的空閑內存來做緩存。
    當前topic所屬的broker,必定有一個該topic的partition,partition是一個磁盤目錄。partition的目錄中有多個segment組合(index,log)。
11:如何保證kafka消費者消費數據是全局有序的?
   偽命題,
   如果要全局有序的,必須保證生產有序,存儲有序,消費有序。由於生產可以做集群,存儲可以分片,消費可以設置為一個consumerGroup,要保證全局有序,就需要保證每個環節都有序。只有一個可能,就是一個生產者,一個partition,一個消費者。這種場景和大數據應用場景相悖。

 1:KafKa的官方網址:http://kafka.apache.org/

開發流程圖,如:

2:KafKa的基礎知識:

2.1:kafka是一個分布式的消息緩存系統。
2.2:kafka集群中的服務器都叫做broker。
2.3:kafka有兩類客戶端,一類叫producer(消息生產者),一類叫做consumer(消息消費者),客戶端和broker服務器之間采用tcp協議連接。
2.4:kafka中不同業務系統的消息可以通過topic進行區分,而且每一個消息topic都會被分區,以分擔消息讀寫的負載。
2.5每一個分區都可以有多個副本,以防止數據的丟失。
2.6某一個分區中的數據如果需要更新,都必須通過該分區所有副本中的leader來更新。
2.7消費者可以分組,比如有兩個消費者組A和B,共同消費一個topic:order_info,A和B所消費的消息不會重復。
  比如 order_info 中有100個消息,每個消息有一個id,編號從0-99,那么,如果A組消費0-49號,B組就消費50-99號。
2.8消費者在具體消費某個topic中的消息時,可以指定起始偏移量。

 3:KafKa集群的安裝搭建,注意區分單節點KafKa集群的搭建。

Topic :消息根據Topic進行歸類;Producer:發送消息者;Consumer:消息接受者;broker:每個kafka實例(server);Zookeeper:依賴集群保存meta信息。

  3.1:kafka集群安裝,第一步上傳kafka_2.10-0.8.1.1.tgz到虛擬機上面,過程省略,然后進行解壓縮操作:

  3.2:修改kafka配置文件,修改server.properties

修改如下所示,具體情況可以根據手冊修改,詳細修改可以參考Kafka的文檔:

 1 #broker的全局唯一的編號,不可以重復
 2 broker.id=0 
 3 
 4 #用來監聽鏈接的端口,producer或者consumer將在此端口建立連接
 5 port=9092
 6  
 7 #處理網絡請求的線程數量 
 8 num.network.threads=3
 9 
10 #用來處理磁盤Io的線程數量 
11 num.io.threads=8  
12 
13 #發送套接字的緩沖區大小
14 socket.send.buffer.bytes=102400
15 
16 #接受套接字的緩沖區的大小
17 socket.receive.buffer.bytes=102400
18 
19 #請求套接字的緩沖區大小 
20 socket.request.max.bytes=104857600  
21 
22 #kafka運行日志存放的路徑
23 log.dirs=/tmp/kafka-logs  
24 
25 #topic在當前broker上的分片個數
26 num.partitions=2  
27 
28 #用來恢復和清理data下數據的線程數量
29 num.recovery.threads.per.data.dir=1
30 
31 #segment文件保留的最長時間,超時將被刪除
32 log.retention.hours=168  
33 
34 #滾動生成新的segment文件的最大時間
35 log.roll.hours=168
36 
37 #topic的分區是以一堆segment文件存儲的,這個控制每個segment的大小,會被topic創建時的指定參數覆蓋
38 log.segment.bytes=536870912  
39 
40 #文件大小檢查的周期時間,是否處罰 log.cleanup.policy中設置的策略
41 log.retention.check.interval.ms=60000 
42 
43 #是否開啟日志清理 
44 log.cleaner.enable=false  
45 
46 #zookeeper集群的地址,可以是多個,多個之間用逗號分割 hostname1:port1,hostname2:port2,hostname3:port3
47 zookeeper.connect=localhost:2181  
48 
49 #ZooKeeper的連接超時時間
50 zookeeper.connection.timeout.ms=1000000  

 具體操作修改如下所示:

 

 使用自己部署的Zookeeper集群,修改如下所示:

可以直接搜索:/zookeeper.connect找到所要修改的內容:

 將配置好的Kafka復制到另外兩個節點上面:

[root@master hadoop]# scp -r kafka_2.10-0.8.1.1/ slaver1:/home/hadoop/

[root@master hadoop]# scp -r kafka_2.10-0.8.1.1/ slaver2:/home/hadoop/

 

 然后修改一下另外兩台的broker.id=2和broker.id=3:

 

這里插一個slf4j的配置,將 [hadoop@slaver1 slf4j-1.7.6]$ unzip slf4j-1.7.6.zip進行解壓縮操作。

然后將[hadoop@slaver1 slf4j-1.7.6]$ cp slf4j-nop-1.7.6.jar  /home/hadoop/soft/kafka_2.9.2-0.8.1/libs/目錄下面。

  3.3:將zookeeper集群啟動:

[root@master hadoop]# cd /home/hadoop/zookeeper-3.4.5/bin/
[root@master bin]# ./zkServer.sh start
[root@slaver2 bin]#  ./zkServer.sh status

 

   3.4:在每一台節點上啟動broker:

    bin/kafka-server-start.sh config/server.properties

Unrecognized VM option 'UseCompressedOops'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

啟動的時候報錯了,問題的根本是UseCompressedOops是jdk8的,而我的jdk是7,所以解決一下問題:

原因是jdk的版本不匹配,需要修改一下配置文件
修改文件:
  去掉這個配置
  -XX:+UseCompressedOops

 

進去以后,搜索一下比較快:/UseCompressedOops,然后看到如下,刪除如此配置:

[root@master bin]# vim kafka-run-class.sh

其他兩個節點的都按照如此刪除掉即可

修改好以后開始跑:

在每一台節點上啟動broker:
bin/kafka-server-start.sh config/server.properties

1、首先安裝nohup:
[hadoop@slaver1 ~]$ yum install coreutils
2、后台啟動kafka服務:
[hadoop@slaver1 kafka_2.9.2-0.8.1]$ nohup bin/kafka-server-start.sh config/server.properties &

然后按照如此將其他兩個節點都啟動起來,然后復制xshell的連接看一下jps進程啟動情況:

 三個都啟動起來,可以看一下,broker 1,broker 2,broker 3都啟動起來了:

可以使用復制的xshell窗口查看jps進程啟動情況:

   3.5:在kafka集群中創建一個topic:

常用命令如下所示:

1:Kafka常用操作命令
1.1:查看當前服務器中的所有topic。--zookeeper master:2181指定zookeeper。
    bin/kafka-topics.sh --list --zookeeper  master:2181
1.2:創建topic。--partitions 3,指定三個分區。--replication-factor 1指定備份的副本數量。--topic topicTest,指定topic的名稱。
    ./kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 3 --topic topicTest
1.3:刪除topic
    bin/kafka-topics.sh --delete --zookeeper master:2181 --topic topicTest
    注意:需要server.properties中設置delete.topic.enable=true否則只是標記刪除或者直接重啟。
1.4:通過shell命令發送消息。生產者。--broker-list master:9092
    bin/kafka-console-producer.sh --broker-list master:9092 --topic topicTest
1.5:通過shell消費消息。消費者。--from-beginning從最開始消費。
    bin/kafka-console-consumer.sh --zookeeper master:2181 --from-beginning --topic topicTest
1.6:查看消費位置 bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper master:2181 --group testGroup 1.7:查看某個Topic的詳情
    bin/kafka-topics.sh --topic topicTest --describe --zookeeper master:2181

[root@master kafka_2.10-0.8.1.1]# bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 1 --topic order

 

可以查看一下自己創建的topic:

 [root@master kafka_2.10-0.8.1.1]# bin/kafka-topics.sh --list --zookeeper master:2181

可以創建多個多個topic,也可以查看一下自己創建的topic:

   3.6:用一個producer向某一個topic中寫入消息,生產者產生消息,消費者消費消息,如下生產者可以生產:

如下先啟動一下生產者,先不生產消息,然后一個消費者,看看是否有輸出,然后再生產消息,再去消費者看看消費消息:

#生產者
[root@master kafka_2.10-0.8.1.1]# bin/kafka-console-producer.sh --broker-list master:9092 --topic order
#消費者
[root@master kafka_2.10-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper master:2181 --from-beginning --topic order

上面是生產者:

下面是消費者:

  3.7:查看一個topic的分區及副本狀態信息:

自己可以找任意一個xshell復制連接進程查看:

[root@slaver1 kafka_2.10-0.8.1.1]# bin/kafka-topics.sh --describe --zookeeper master:2181 --topic order

 4:kafka運行在后台如何操作,如下所示:

  1>/dev/null:代表標准輸入到這個目錄;

  2>&1:代表標准輸出也到這個目錄下面;

  &:代表這個是后台運行;

[root@master kafka_2.10-0.8.1.1]# bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM