【運維技術】從零開始搭建開發使用的Kafka環境


【原創】從零開始搭建開發使用的Kafka環境

入門資料

  1. 百度百科:
    Kafka是一種高吞吐量的分布式發布訂閱消息系統,這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解決。 對於像Hadoop的一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消費。

  2. 歸屬公司
    Apache Kafka
    軟件語言:scala

  3. 相關術語介紹

  • Broker: Kafka集群包含一個或多個服務器,這種服務器被稱為broker[
  • Topic:每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)
  • Partition:Partition是物理上的概念,每個Topic包含一個或多個Partition.
  • Producer:賦值發布消息到負責發布消息到Kafka broker(生產者)
  • Consumer:消息消費者,向Kafka broker讀取消息的客戶端。
  • Consumer Group:每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)。

目標以及流程:

  1. 單機搭建kafka
  2. 集群搭建kafka(下一步)
  3. 搭建kafka控制台(下一步)

kafka控制台選型:

  1. Kafka Web Console
  2. Kafka Manager
  3. KafkaOffsetMonitor
    比較:
    若只需要監控功能,推薦使用KafkaOffsetMonito,若偏重Kafka集群管理,推薦使用Kafka Manager。
    因為都是開源程序,穩定性欠缺。故需先了解清楚目前已存在哪些Bug,多測試一下,避免出現類似於Kafka Web Console的問題。

開始單機搭建kafka:

  1. 官網:http://kafka.apache.org/intro
  2. 學習官方網站的快速啟動教程:http://kafka.apache.org/quickstart
  3. 官網的教程比較有服務器上的測試

開始前的備注

# 查看防火牆狀態
systemctl status firewalld
# 關閉防火牆
service firewalld stop  
# 啟動防火牆
service firewalld start  

首先要確認你已經安裝了java環境

# 檢查java的命令
java -version

1:下載kafka並解壓

# 獲取kafka最新安裝包,這邊使用的是鏡像地址,可以去官方網站獲得最新地址
wget http://mirrors.hust.edu.cn/apache/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
# 解壓程序
tar -xzf kafka_2.11-0.11.0.1.tgz
# 進入目錄
cd kafka_2.11-0.11.0.1

配置對應的配置文件,server.properties

# 配置服務器zk地址
zookeeper.connect=localhost:2181
# 配置內網綁定關系
listeners=PLAINTEXT://<your.ip>:9092
# 配置外網綁定關系
advertised.listeners=PLAINTEXT://your.host.name:9092
# 配置kafka使用內存kafka-server-start.sh
# 在start中加入jvm的啟動參數,默認是1G
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

2:啟動服務器

kafka需要使用zookeeper,所以你首先需要啟動一個zookeeper的服務,如果你沒有的話,就使用kafka內置的腳本來啟動一個單節點的zookeeper的實例
加入& 使進程常駐在內存中
默認端口:9092
默認為localhost,如果不配置對應的服務器ip的話

#執行快速啟動zookeeper,通過內置的zookeeper進行啟動,如果要zookeeper服務器的話嗎,需要再server.properties的配置文件里面加入zookeeper.connect = 你的服務器內網ip:2181
bin/zookeeper-server-start.sh config/zookeeper.properties &

[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

然后啟動kafka的服務器:通過配置文件啟動kafka

# 啟動kafka
# server.properties的配置文件中有一個項目:	host.name需要配置成為你的內網服務器ip地址,訪問的時候通過外網環境通過外網ip地址訪問,內網環境通過內網地址訪問
bin/kafka-server-start.sh config/server.properties &
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3:創建一個topic名字叫做test

# 通過腳本命令創建一個主題為test的,並且使用的zookeeper的地址為localhost的
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

確認一下這個topic是否含有完畢

# 通過zookeeper的地址來訪問對應的topics中的主題列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
test

4:啟動一個生產者發送一些消息過去

# 啟動客戶端推送對應的消息到服務器的kafka提供的端口
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message
This is another message

5:啟動一個消費者獲取對應主題的消息

# 啟動客戶端獲取對應服務端信息的地址來消費消息,使用pull的方式,每間隔0.1s進行一次服務器獲取
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

This is a message
This is another message

6:將kafka創建集群節點(暫時省略)

7:使用kafka連接進行導入導出數據(暫時省略)

8:使用kafka的流去處理數據(暫時省略)

使用場景切換:本地服務器,變成真實服務器,首先提供外部調用,應該使用的是服務器的地址

這個是在服務器本地測試場景:
切換成服務器場景的情況下,需要首先在將server.properties的配置文件中的
配置方式修正為服務器的內網ip地址,對外提供的外網ip地址會進行映射,映射到最終的內網地址中去
新版的只需要修改如下兩個配置:參考文章
http://blog.csdn.net/chenxun_2010/article/details/72626618
zookeeper.connect = localhost:9092
listeners = PLAINTEXT://ip:9092

java項目進行場景測試:

服務器kafka版本:2.11- 0.11.1
客戶端kafka版本:0.11.0.1

所以去maven中尋找對應的版本的jar包進行使用

org.apache.kafka kafka-clients 0.11.0.1

查看具體的api調用:

  1. 生產者:org.apache.kafka.clients.producer.KafkaProducer.class
  2. 消費者:org.apache.kafka.clients.consumer.KafkaConsumer.class

包結構:
clients
admin
consumer:KafkaProducer(實現類)
producer:KafkaConsumer(實現類)
otherclass
common
server.policy(服務公用)
在具體實現類里面源碼中有啟動的示例代碼再類的頭部注釋中

遇到了一個問題:
Failed to load class "org.slf4j.impl.StaticLoggerBinder".

kafka默認引用再java類中在程序中引入了org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar的jar包
引入對應的實現日志的框架使用logback框架

再pom的配置文件中加入對應的依賴
並且debug的日志等級很煩人,所以就加入了配置文件
參考文章:http://www.cnblogs.com/h--d/p/5668152.html

加入logback的庫依賴引用
至少需要引用三個模塊:
logback-classic
logback-core
logback-access
這三個模塊的內容
其中參考了這篇文章覺得很詳細,所以就提供出來:
http://www.cnblogs.com/warking/p/5710303.html

使用一個新的工程進行測試,一個啟動消費者,一個啟動生產者,編寫對應的代碼

  1. 編寫單元測試讀取配置建立連接,發送消息
  2. 生產者消費者所需要的依賴jar包
  3. 啟動線程消費者拉取消息成功消費(實現消息隊列的功能)

源碼示例大家可以去github上拉取:

https://github.com/fly-piglet/kafkastudy


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM