1.環境信息
整體架構
不同的機房的producer向中心節點發送信息,中心節點內部局域網消費信息進入hadoop集群
部署服務器基本信息
外網IP 操作系統 內網IP 安裝服務
1.1.1.1_yqmusic01 centos7.2_x86_64 10.19.97.25 kafka,zook
2.2.2.2_yqmusic02 centos7.2_x86_64 10.19.48.131 kafka,zook
3.3.3.3_ck04 centos6.5_x86_64 10.19.105.50 kafka,zook
依賴的jdk1.8.0_111
export JAVA_HOME=/usr/java/jdk1.8.0_111
export PATH=$JAVA_HOME/bin:$PATH
2.安裝配置zookeeper3.4.11
Linux服務器一台、三台、五台、(2*n+1),Zookeeper集群的工作是超過半數才能對外提供服務,3台中超過兩台超過半數,允許1台掛掉 ,是否可以用偶數,答案是不能
下載地址
https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
創建數據和日志目錄
mkdir -p /home/yunva/zookeeper-3.4.11/data
mkdir -p /home/yunva/zookeeper-3.4.11/logs
修改配置文件
#zoo_sample.cfg 這個文件是官方給我們的zookeeper的樣板文件,給他復制一份命名為zoo.cfg,zoo.cfg是官方指定的文件命名規則
cd /home/yunva/zookeeper-3.4.11
cp zoo_sample.cfg zoo.cfg
# egrep '^[a-Z]' /home/yunva/zookeeper-3.4.11/conf/zoo.cfg
tickTime=6000
initLimit=50
syncLimit=25
snapshot=5000
preAllocSize=1000
dataDir=/home/yunva/zookeeper-3.4.11/data
dataLogDir=/home/yunva/zookeeper-3.4.11/logs
clientPort=2181
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=10.19.97.25:2888:3888
server.2=10.19.48.131:2888:3888
server.3=10.19.105.50:2888:3888
配置文件解釋:
#tickTime:
這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。
#initLimit:
這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)初始化連接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗。總的時間長度就是 5*2000=10 秒
#syncLimit:
這個配置項標識 Leader 與Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是5*2000=10秒
#dataDir:
快照日志的存儲路徑
#dataLogDir:
事物日志的存儲路徑,如果不配置這個那么事物日志會默認存儲到dataDir制定的目錄,這樣會嚴重影響zk的性能,當zk吞吐量較大的時候,產生的事物日志、快照日志太多
#clientPort:
這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求
生成myid
主機(10.19.97.25)
echo "1" >/home/yunva/zookeeper-3.4.11/myid ##生成ID,這里需要注意, myid對應的zoo.cfg的server.ID,比如第二台zookeeper主機對應的myid應該是2
主機(10.19.48.131)
echo "2" >/home/yunva/zookeeper-3.4.11/myid
主機(10.19.105.50)
echo "3" >/home/yunva/zookeeper-3.4.11/myid
# 將配置好的zookeeper程序拷貝到其他2台服務器中
scp -P 28290 -r zookeeper-3.4.11/ 10.19.105.50:/home/yunva/ # 記得有反斜杠,否則就散落在/home/yunva目錄了
# 分別啟動三台zookeeper服務
[root@u04yq01 zookeeper-3.4.11]# pwd
/home/yunva/zookeeper-3.4.11
[root@u04yq01 zookeeper-3.4.11]# bin/zkServer.sh start
# 查看狀態,其中兩台為follower,一台為leader
[root@u04yq01 zookeeper-3.4.11]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/yunva/zookeeper-3.4.11/bin/../conf/zoo.cfg
Mode: follower
[root@u04ck04 zookeeper-3.4.11]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/yunva/zookeeper-3.4.11/bin/../conf/zoo.cfg
Mode: leader
查看zook節點相關信息
/home/yunva/zookeeper-3.4.11/bin/zkCli.sh -server 127.0.0.1:2181
3.Kafka集群搭建安裝kafka(三台主機上執行)
1、軟件環境
1、linux一台或多台,大於等於2
2、已經搭建好的zookeeper集群
3、軟件版本kafka_2.11-0.11.0.2.tgz
2、創建目錄並下載安裝軟件
一些基本的概念:
1、消費者:(Consumer):從消息隊列中請求消息的客戶端應用程序
2、生產者:(Producer) :向broker發布消息的應用程序
3、AMQP服務端(broker):用來接收生產者發送的消息並將這些消息路由給服務器中的隊列,便於fafka將生產者發送的消息,動態的添加到磁盤並給每一條消息一個偏移量,所以對於kafka一個broker就是一個應用程序的實例
kafka支持的客戶端語言:Kafka客戶端支持當前大部分主流語言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一種語言和kafka服務器進行通信(即辨析自己的consumer從kafka集群訂閱消息也可以自己寫producer程序)
4、主題(Topic):一個主題類似新聞中的體育、娛樂、教育等分類概念,在實際工程中通常一個業務一個主題。
5、分區(Partition):一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區可以看作是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列。
kafka分區是提高kafka性能的關鍵所在,當你發現你的集群性能不高時,常用手段就是增加Topic的分區,分區里面的消息是按照從新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。
創建kafka數據目錄並下載安裝軟件
mkdir -p /data/kafkadata/kafka-logs
下載二進制文件kafka_2.11-0.11.0.2.tgz
https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
#解壓軟件
tar zxf kafka_2.11-0.9.0.1.tgz
主要關注:server.properties 這個文件即可,我們可以發現在目錄下
這里可以發現有Zookeeper文件,我們可以根據Kafka內帶的zk集群來啟動,但是建議使用獨立的zk集群
修改配置文件:
broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣
port=9092 #當前kafka對外提供服務的端口默認是9092
listeners=PLAINTEXT://u04yq01.yaya.corp:9092 #這個參數默認是關閉的,配置本機的hostname
num.network.threads=3 #這個是borker進行網絡處理的線程數
num.io.threads=8 #這個是borker進行I/O處理的線程數
log.dirs=/data/kafkadata/kafka-logs #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個
socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小后在發送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
num.partitions=1 #默認的分區數,一個topic默認1個分區數
log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務
replica.fetch.max.bytes=5242880 #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除
log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
zookeeper.connect=10.19.97.25:2181,10.19.48.131:2181,10.19.105.50:2181 #設置zookeeper的連接端口
上面是參數的解釋,實際的修改項為:
broker.id=0 每台服務器的broker.id都不能相同
listeners=PLAINTEXT://u04yq01.yaya.corp:9092 # 每台服務器節點都修改為自己的主機名
#在log.retention.hours=168 下面新增下面三項
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#設置zookeeper集群的連接
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:12181
配置信息:
節點1:10.19.97.25
[root@u04yq01 config]# egrep '^[a-Z]' server.properties
broker.id=0 # 集群的編號,每個都不一樣
listeners=PLAINTEXT://u04yq01.yaya.corp:9092 # 修改為本機hostname,關鍵配置
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafkadata/kafka-logs # 日志保存路徑
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168 # 日志保留時間
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.19.97.25:2181,10.19.48.131:2181,10.19.105.50:2181 # zookeeper集群連接地址
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
節點2:10.19.48.131
[root@u04yq02 kafka_2.11-0.11.0.2]# egrep '^[a-Z]' config/server.properties
broker.id=1
listeners=PLAINTEXT://u04yq02.yaya.corp:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafkadata/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=72
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.19.97.25:2181,10.19.48.131:2181,10.19.105.50:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
節點3:10.19.105.50
[root@u04ck04 config]# egrep '^[a-Z]' server.properties
broker.id=2
listeners=PLAINTEXT://u04ck04.yaya.corp:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafkadata/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=72
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.19.97.25:2181,10.19.48.131:2181,10.19.105.50:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
修改每台主機的/etc/hosts,將集群節點的主機名解析到對應的內網IP中
10.19.97.25 u04yq01.yaya.corp
10.19.48.131 u04yq02.yaya.corp
10.19.105.50 u04ck04.yaya.corp
啟動Kafka集群並測試
①啟動kafka服務
#從后台啟動Kafka集群(3台都需要啟動)
cd /home/yunva/kafka_2.11-0.11.0.2 #進入到kafka的bin目錄
bin/kafka-server-start.sh -daemon config/server.properties
②檢查服務是否啟動
#執行命令jps
[root@u04yq01 yunva]# jps
17472 JarLauncher
17926 JarLauncher
17801 Kafka
# 列出topics
[root@u04yq01 ~]# /home/yunva/kafka_2.11-0.11.0.2/bin/kafka-topics.sh --list --zookeeper localhost:2181
SCRIPT-BI-TOPIC
__consumer_offsets
登錄zk來查看zk的目錄情況
[root@u04yq01 yunva]# zookeeper-3.4.11/bin/zkCli.sh -server
[zk: localhost:2181(CONNECTED) 0] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://u04yq01.yaya.corp:9092"],"jmx_port":-1,"host":"u04yq01.yaya.corp","timestamp":"1515745869088","port":9092,"version":4}
cZxid = 0x10000001e
ctime = Fri Jan 12 16:31:09 CST 2018
mZxid = 0x10000001e
mtime = Fri Jan 12 16:31:09 CST 2018
pZxid = 0x10000001e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x104df61b4bd0001
dataLength = 204
numChildren = 0
[zk: localhost:2181(CONNECTED) 3] get /brokers/topics/SCRIPT-BI-TOPIC/partitions/0/state
{"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2,0]}
cZxid = 0x100000033
ctime = Fri Jan 12 16:57:28 CST 2018
mZxid = 0x100000033
mtime = Fri Jan 12 16:57:28 CST 2018
pZxid = 0x100000033
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 74
numChildren = 0
③測試kafka集群是否ok
手動測試方法可以參考:http://www.cnblogs.com/reblue520/p/7853116.html
也可以通過測試腳本來測試:
腳本測試架構
①機房A的producer生產者
# cat kafka_test.py
# -*- coding: utf-8 -*-
'''''
使用kafka-Python 1.3.3模塊
'''
import sys
import time
import json
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
KAFAKA_HOST = "1.1.1.1,2.2.2.2,3.3.3.3"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "SCRIPT-BI-TOPIC"
class Kafka_producer():
'''''
生產模塊:根據不同的key,區分消息
'''
def __init__(self, kafkahost,kafkaport, kafkatopic, key):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.key = key
print("producer:h,p,t,k",kafkahost,kafkaport,kafkatopic,key)
bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort
)
print("boot svr:",bootstrap_servers)
self.producer = KafkaProducer(bootstrap_servers = bootstrap_servers
)
def sendjsondata(self, params):
try:
parmas_message = json.dumps(params,ensure_ascii=False)
producer = self.producer
print(parmas_message)
v = parmas_message.encode('utf-8')
k = key.encode('utf-8')
print("send msg:(k,v)",k,v)
producer.send(self.kafkatopic, key=k, value= v)
producer.flush()
except KafkaError as e:
print (e)
class Kafka_consumer():
'''''
消費模塊: 通過不同groupid消費topic里面的消息
'''
def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.groupid = groupid
self.key = key
self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort )
)
def consume_data(self):
try:
for message in self.consumer:
yield message
except KeyboardInterrupt as e:
print (e)
def main(xtype, group, key):
'''''
測試consumer和producer
'''
if xtype == "p":
# 生產模塊
producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
print ("===========> producer:", producer)
for _id in range(1000):
params = '{"msg" : "%s"}' % str(_id)
params=[{"msg0" :_id},{"msg1" :_id}]
producer.sendjsondata(params)
time.sleep(1)
if xtype == 'c':
# 消費模塊
consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
print ("===========> consumer:", consumer)
message = consumer.consume_data()
for msg in message:
print ('msg---------------->k,v', msg.key,msg.value)
print ('offset---------------->', msg.offset)
if __name__ == '__main__':
xtype = sys.argv[1]
group = sys.argv[2]
key = sys.argv[3]
main(xtype, group, key)
外網機房A消費者,需要配置/etc/hosts,把kafka集群的外網ip指向他們的hostname
[kafka_producer]# cat /etc/hosts
1.1.1.1 u04yq01.yaya.corp
2.2.2.2 u04yq02.yaya.corp
3.3.3.3 u04ck04.yaya.corp
②中心機房同一內網消費者配置(代碼相同部分就省略掉了)
# -*- coding: utf-8 -*-
'''''
使用kafka-Python 1.3.3模塊
'''
import sys
import time
import json
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
KAFAKA_HOST = "10.19.97.25,10.19.48.131,10.19.105.50"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "SCRIPT-BI-TOPIC"
.......
內網消費者/etc/hosts配置
10.19.97.25 u04yq01.yaya.corp
10.19.48.131 u04yq02.yaya.corp
10.19.105.50 u04ck04.yaya.corp
腳本使用方法:
producer生產者
python kafka_test p g v
consumer消費者
python kafka_test c g v