Kafka1 利用虛擬機搭建自己的Kafka集群


前言:

      上周末自己學習了一下Kafka,參考網上的文章,學習過程中還是比較順利的,遇到的一些問題最終也都解決了,現在將學習的過程記錄與此,供以后自己查閱,如果能幫助到其他人,自然是更好的。

===============================================================長長的分割線====================================================================

正文:

  關於Kafka的理論介紹,網上可以搜到到很多的資料,大家可以自行搜索,我這里就不在重復贅述。

      本文中主要涉及三塊內容: 第一,就是搭建Zookeeper環境;第二,搭建Kafka環境,並學習使用基本命令發送接收消息;第三,使用Java API完成操作,以便初步了解在實際項目中的使用方式。

  閑話少說,言歸正傳,本次的目的是利用VMware搭建一個屬於自己的ZooKeeper和Kafka集群。本次我們選擇的是VMware10,具體的安裝步驟大家可以到網上搜索,資源很多。

  第一步,確定目標:

      ZooKeeperOne       192.168.224.170  CentOS

      ZooKeeperTwo       192.168.224.171  CentOS

      ZooKeeperThree     192.168.224.172  CentOS

      KafkaOne                192.168.224.180  CentOS

      KafkaTwo                192.168.224.181  CentOS

      我們安裝的ZooKeeper是3.4.6版本,可以從這里下載zookeeper-3.4.6; Kafka安裝的是0.8.1版本,可以從這里下載kafka_2.10-0.8.1.tgz; JDK安裝的版本是1.7版本。

      另: 我在學習的時候,搭建了兩台Kafka服務器,正式環境中我們最好是搭建2n+1台,此處僅作為學些之用,暫不計較。

 

      第二步,搭建Zookeeper集群:

      此處大家可以參照我之前寫的一篇文章 ZooKeeper1  利用虛擬機搭建自己的ZooKeeper集群 ,我在搭建Kafka的環境的時候就是使用的之前搭建好的Zookeeper集群。

       

      第三步,搭建Kafka集群:

      (1). 將第一步中下載的 kafka_2.10-0.8.1.tgz 解壓縮后,進入config目錄,會看到如下圖所示的一些配置文件,我們准備編輯server.properties文件。

      

      (2). 打開 server.properties 文件,需要編輯的屬性如下所示:

1 broker.id=0
2 port=9092
3 host.name=192.168.224.180
4 
5 log.dirs=/opt/kafka0.8.1/kafka-logs
6 
7 zookeeper.connect=192.168.224.170:2181,192.168.224.171:2181,192.168.224.172:2181

      注意: 

      a. broker.id: 每個kafka對應一個唯一的id,自行分配即可

      b. port: 默認的端口號是9092,使用默認端口即可

      c. host.name: 配置的是當前機器的ip地址

      d. log.dirs: 日志目錄,此處自定義一個目錄路徑即可

      e. zookeeper.connect: 將我們在第二步搭建的Zookeeper集群的配置全部寫上

      (3). 上邊的配置完畢后,我們需要執行命令 vi /etc/hosts,將相關服務器的host配置如下圖,如果沒有執行此步,后邊我們在執行一些命令的時候,會報無法識別主機的錯誤

       

      (4).  經過上述操作,我們已經完成了對Kafka的配置,很簡單吧?!但是如果我們執行 bin/kafka-server-start.sh   config/server.properties  & 這個啟動命令,可能我們會遇到如下兩個問題:

       a. 我們在啟動的報 Unrecognized VM option '+UseCompressedOops'.Could not create the Java virtual machine. 這個錯誤。

       

       解決方式:

       查看 bin/kafka-run-class.sh 

       找到下面這段代碼,去掉-XX:+UseCompressedOops     

1 if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
2 KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
3 fi

        b. 解決了第一個問題,我們還有可能在啟動的時候遇到 java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder 這個錯誤。

        

        解決方式:

        從網上的下載 slf4j-nop-1.6.0.jar 這個jar包,然后放到kafka安裝目錄下的libs目錄中即可。注意,基於我目前的kafka版本,我最開始從網上下載的slf4j-nop-1.5.0.jar 這個jar包,但是啟動的時候依然會報錯,所以一定要注意版本號哦~

      (5). 現在我們執行 bin/kafka-server-start.sh   config/server.properties  & 這個啟動命令,應該就可以正常的啟動Kafka了。命令最后的 & 符號是為了讓啟動程序在后台執行。如果不加這個 & 符號,當執行完啟動后,我們通常會使用 ctrl + c 退出當前控制台,kafka此時會自動執行shutdown,所以此處最好加上 & 符號。

 

      第三步,使用基本命令創建消息主題,發送和接收主題消息:

      (1). 創建、查看消息主題

 1 #連接zookeeper, 創建一個名為myfirsttopic的topic
 2 bin/kafka-topics.sh --create --zookeeper 192.168.224.170:2181 --replication-factor 2 --partitions 1 --topic myfirsttopic
 3 
 4 # 查看此topic的屬性  
 5 bin/kafka-topics.sh --describe --zookeeper 192.168.224.170:2181 --topic myfirsttopic  
 6 
 7 # 查看已經創建的topic列表  
 8 bin/kafka-topics.sh --list --zookeeper 192.168.224.170:2181

      上述命令執行完畢后,截圖如下:

       

       

      (2). 創建一個消息的生產者:

1 #啟動生產者,發送消息
2 bin/kafka-console-producer.sh --broker-list 192.168.224.180:9092 --topic myfirsttopic
3 
4 #啟動消費者,接收消息
5 bin/kafka-console-consumer.sh --zookeeper 192.168.224.170:2181 --from-beginning --topic myfirsttopic

      上述命令執行完畢后,截圖如下:

       

      (3). 按照(1)、(2)這兩步,你應該可以利用Kafka感受到了分布式消息系統。這里需要着重的再說一下我在這個過程中發現的一個問題: 大家可以看下上圖中的consumer的命令,我選擇了zookeeper的其中一台192.168.224.170:2181接收消息是可以正常接收的!不要忘了,我是三台zookeeper的,所以我又嘗試了向192.168.224.171:2181和192.168.224.172:2181接收myfirsttopic這個主題的消息。正常情況下,三台訪問的結果應該都是可以正常的接收消息,但是當時我的情況在訪問了192.168.224.171:2181這台時會報 org.apache.zookeeper.clientcnxn 這個錯誤!!!

             我當時多試了兩遍,發現我的三台zookeeper中,誰是leader(zkServer.sh status命令),concumer連接的時候就會報上面的那個異常。后來定位到了zookeeper的zoo.cfg配置文件中的maxClientCnxns屬性,即客戶端最大連接數,我當時使用的是默認配置是2。后來我把這個屬性的值調大一些,consumer連接zookeeper leader時,就不會報這個錯誤了。如果你選擇將這個屬性注釋掉(從網上查詢到注釋掉該屬性默認值是10),也不會報這個錯誤了。其實網上的很多文章也只是說了此屬性可以盡量設置的大一些,沒有解釋其他的。

             但我后來還是仔細想了想,當我把maxClientCnxns這個屬性設置為2時,如果兩台kafka啟動時,每個kafka和zookeeper的節點之間建立了一個客戶端連接,那么此時zookeeper的每個節點的客戶端連接數就已經達到了最大連接數2,那么我創建consumer的時候,應該是三台zookeeper連接都有問題,而不是只有leader會有問題。所以,此處需要各位有見解的再幫忙解釋一下!!!       

 

      第四步,使用Java API 操作Kafka:

      其實Java API提供的功能基本也是基於上邊的客戶端命令來實現的,萬變不離其宗,我將我整理的網上的例子貼到下面,大家可以在本地Java工程中執行一下,即可了解調用方法。

      (1). 我的maven工程中pom.xml的配置

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 2   <modelVersion>4.0.0</modelVersion>
 3   <groupId>com.ismurf.study</groupId>
 4   <artifactId>com.ismurf.study.kafka</artifactId>
 5   <version>0.0.1-SNAPSHOT</version>
 6   <name>Kafka_Project_0001</name>
 7   <packaging>war</packaging>
 8   
 9   <dependencies>
10           <dependency>
11             <groupId>org.apache.kafka</groupId>
12             <artifactId>kafka_2.10</artifactId>
13             <version>0.8.1.1</version>
14         </dependency>
15   </dependencies>
16   
17   <build>
18       <plugins>
19           <plugin>
20             <groupId>org.apache.maven.plugins</groupId>
21             <artifactId>maven-war-plugin</artifactId>
22             <version>2.1.1</version>
23             <configuration>
24                 <outputFileNameMapping>@{artifactId}@.@{extension}@</outputFileNameMapping>
25             </configuration>
26         </plugin>
27           
28         <!-- Ensures we are compiling at 1.6 level -->
29         <plugin>
30             <groupId>org.apache.maven.plugins</groupId>
31             <artifactId>maven-compiler-plugin</artifactId>
32             <configuration>
33                 <source>1.6</source>
34                 <target>1.6</target>
35             </configuration>
36         </plugin>
37         
38         <plugin>
39             <groupId>org.apache.maven.plugins</groupId>
40             <artifactId>maven-surefire-plugin</artifactId>
41             <configuration>
42                 <skipTests>true</skipTests>
43             </configuration>
44         </plugin>
45       </plugins>
46   </build>
47   
48 </project>

      (2). 實例代碼: 大家可以參考這片文章的  http://blog.csdn.net/honglei915/article/details/37563647 中的代碼,粘貼到工程后即可使用,上述文章中的代碼整理后目錄截圖如下:

      

   

             

   

      

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM