kafka內外網連通並且外網內網分開處理集群的搭建和測試


1.環境信息
整體架構
不同的機房的producer向中心節點發送信息,中心節點內部局域網消費信息進入hadoop集群 部署服務器基本信息 外網IP    操作系統     內網IP 安裝服務
1.1.1.1_yqmusic01 centos7.2_x86_64 10.19.97.25 kafka,zook 2.2.2.2_yqmusic02 centos7.2_x86_64 10.19.48.131 kafka,zook 3.3.3.3_ck04 centos6.5_x86_64 10.19.105.50 kafka,zook 依賴的jdk1.8.0_111 export JAVA_HOME=/usr/java/jdk1.8.0_111 export PATH=$JAVA_HOME/bin:$PATH 2.安裝配置zookeeper3.4.11 Linux服務器一台、三台、五台、(2*n+1),Zookeeper集群的工作是超過半數才能對外提供服務,3台中超過兩台超過半數,允許1台掛掉 ,是否可以用偶數,答案是不能 下載地址 https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz 創建數據和日志目錄 mkdir -p /home/yunva/zookeeper-3.4.11/data mkdir -p /home/yunva/zookeeper-3.4.11/logs 修改配置文件 #zoo_sample.cfg 這個文件是官方給我們的zookeeper的樣板文件,給他復制一份命名為zoo.cfg,zoo.cfg是官方指定的文件命名規則 cd /home/yunva/zookeeper-3.4.11 cp zoo_sample.cfg zoo.cfg # egrep '^[a-Z]' /home/yunva/zookeeper-3.4.11/conf/zoo.cfg tickTime=6000 initLimit=50 syncLimit=25 snapshot=5000 preAllocSize=1000 dataDir=/home/yunva/zookeeper-3.4.11/data dataLogDir=/home/yunva/zookeeper-3.4.11/logs clientPort=2181 autopurge.snapRetainCount=3 autopurge.purgeInterval=1 server.1=10.19.97.25:2888:3888 server.2=10.19.48.131:2888:3888 server.3=10.19.105.50:2888:3888 配置文件解釋: #tickTime: 這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。 #initLimit: 這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)初始化連接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗。總的時間長度就是 5*2000=10 秒 #syncLimit: 這個配置項標識 Leader 與Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是5*2000=10秒 #dataDir: 快照日志的存儲路徑 #dataLogDir: 事物日志的存儲路徑,如果不配置這個那么事物日志會默認存儲到dataDir制定的目錄,這樣會嚴重影響zk的性能,當zk吞吐量較大的時候,產生的事物日志、快照日志太多 #clientPort: 這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求 生成myid 主機(10.19.97.25) echo "1" >/home/yunva/zookeeper-3.4.11/myid ##生成ID,這里需要注意, myid對應的zoo.cfg的server.ID,比如第二台zookeeper主機對應的myid應該是2 主機(10.19.48.131) echo "2" >/home/yunva/zookeeper-3.4.11/myid 主機(10.19.105.50) echo "3" >/home/yunva/zookeeper-3.4.11/myid # 將配置好的zookeeper程序拷貝到其他2台服務器中 scp -P 28290 -r zookeeper-3.4.11/ 10.19.105.50:/home/yunva/ # 記得有反斜杠,否則就散落在/home/yunva目錄了 # 分別啟動三台zookeeper服務 [root@u04yq01 zookeeper-3.4.11]# pwd /home/yunva/zookeeper-3.4.11 [root@u04yq01 zookeeper-3.4.11]# bin/zkServer.sh start # 查看狀態,其中兩台為follower,一台為leader [root@u04yq01 zookeeper-3.4.11]# bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /home/yunva/zookeeper-3.4.11/bin/../conf/zoo.cfg Mode: follower [root@u04ck04 zookeeper-3.4.11]# bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /home/yunva/zookeeper-3.4.11/bin/../conf/zoo.cfg Mode: leader 查看zook節點相關信息 /home/yunva/zookeeper-3.4.11/bin/zkCli.sh -server 127.0.0.1:2181 3.Kafka集群搭建安裝kafka(三台主機上執行) 1、軟件環境 1、linux一台或多台,大於等於2 2、已經搭建好的zookeeper集群 3、軟件版本kafka_2.11-0.11.0.2.tgz 2、創建目錄並下載安裝軟件 一些基本的概念: 1、消費者:(Consumer):從消息隊列中請求消息的客戶端應用程序 2、生產者:(Producer) :向broker發布消息的應用程序 3、AMQP服務端(broker):用來接收生產者發送的消息並將這些消息路由給服務器中的隊列,便於fafka將生產者發送的消息,動態的添加到磁盤並給每一條消息一個偏移量,所以對於kafka一個broker就是一個應用程序的實例 kafka支持的客戶端語言:Kafka客戶端支持當前大部分主流語言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript 可以使用以上任何一種語言和kafka服務器進行通信(即辨析自己的consumer從kafka集群訂閱消息也可以自己寫producer程序) 4、主題(Topic):一個主題類似新聞中的體育、娛樂、教育等分類概念,在實際工程中通常一個業務一個主題。 5、分區(Partition):一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區可以看作是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列。 kafka分區是提高kafka性能的關鍵所在,當你發現你的集群性能不高時,常用手段就是增加Topic的分區,分區里面的消息是按照從新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。 創建kafka數據目錄並下載安裝軟件 mkdir -p /data/kafkadata/kafka-logs 下載二進制文件kafka_2.11-0.11.0.2.tgz https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz #解壓軟件 tar zxf kafka_2.11-0.9.0.1.tgz 主要關注:server.properties 這個文件即可,我們可以發現在目錄下 這里可以發現有Zookeeper文件,我們可以根據Kafka內帶的zk集群來啟動,但是建議使用獨立的zk集群 修改配置文件: broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣 port=9092 #當前kafka對外提供服務的端口默認是9092 listeners=PLAINTEXT://u04yq01.yaya.corp:9092 #這個參數默認是關閉的,配置本機的hostname num.network.threads=3 #這個是borker進行網絡處理的線程數 num.io.threads=8 #這個是borker進行I/O處理的線程數 log.dirs=/data/kafkadata/kafka-logs #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個 socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小后在發送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤 socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小 num.partitions=1 #默認的分區數,一個topic默認1個分區數 log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務 replica.fetch.max.bytes=5242880 #取消息的最大直接數 log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除 log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能 zookeeper.connect=10.19.97.25:2181,10.19.48.131:2181,10.19.105.50:2181 #設置zookeeper的連接端口 上面是參數的解釋,實際的修改項為: broker.id=0 每台服務器的broker.id都不能相同 listeners=PLAINTEXT://u04yq01.yaya.corp:9092 # 每台服務器節點都修改為自己的主機名 #在log.retention.hours=168 下面新增下面三項 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #設置zookeeper集群的連接 zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:12181 配置信息: 節點1:10.19.97.25 [root@u04yq01 config]# egrep '^[a-Z]' server.properties broker.id=0 # 集群的編號,每個都不一樣 listeners=PLAINTEXT://u04yq01.yaya.corp:9092 # 修改為本機hostname,關鍵配置 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafkadata/kafka-logs # 日志保存路徑 num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 # 日志保留時間 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=10.19.97.25:2181,10.19.48.131:2181,10.19.105.50:2181 # zookeeper集群連接地址 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 節點2:10.19.48.131 [root@u04yq02 kafka_2.11-0.11.0.2]# egrep '^[a-Z]' config/server.properties broker.id=1 listeners=PLAINTEXT://u04yq02.yaya.corp:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafkadata/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=72 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=10.19.97.25:2181,10.19.48.131:2181,10.19.105.50:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 節點3:10.19.105.50 [root@u04ck04 config]# egrep '^[a-Z]' server.properties broker.id=2 listeners=PLAINTEXT://u04ck04.yaya.corp:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafkadata/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=72 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=10.19.97.25:2181,10.19.48.131:2181,10.19.105.50:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 修改每台主機的/etc/hosts,將集群節點的主機名解析到對應的內網IP中 10.19.97.25 u04yq01.yaya.corp 10.19.48.131 u04yq02.yaya.corp 10.19.105.50 u04ck04.yaya.corp 啟動Kafka集群並測試 ①啟動kafka服務 #從后台啟動Kafka集群(3台都需要啟動) cd /home/yunva/kafka_2.11-0.11.0.2 #進入到kafka的bin目錄 bin/kafka-server-start.sh -daemon config/server.properties ②檢查服務是否啟動 #執行命令jps [root@u04yq01 yunva]# jps 17472 JarLauncher 17926 JarLauncher 17801 Kafka # 列出topics [root@u04yq01 ~]# /home/yunva/kafka_2.11-0.11.0.2/bin/kafka-topics.sh --list --zookeeper localhost:2181 SCRIPT-BI-TOPIC __consumer_offsets 登錄zk來查看zk的目錄情況 [root@u04yq01 yunva]# zookeeper-3.4.11/bin/zkCli.sh -server [zk: localhost:2181(CONNECTED) 0] get /brokers/ids/0 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://u04yq01.yaya.corp:9092"],"jmx_port":-1,"host":"u04yq01.yaya.corp","timestamp":"1515745869088","port":9092,"version":4} cZxid = 0x10000001e ctime = Fri Jan 12 16:31:09 CST 2018 mZxid = 0x10000001e mtime = Fri Jan 12 16:31:09 CST 2018 pZxid = 0x10000001e cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x104df61b4bd0001 dataLength = 204 numChildren = 0 [zk: localhost:2181(CONNECTED) 3] get /brokers/topics/SCRIPT-BI-TOPIC/partitions/0/state {"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2,0]} cZxid = 0x100000033 ctime = Fri Jan 12 16:57:28 CST 2018 mZxid = 0x100000033 mtime = Fri Jan 12 16:57:28 CST 2018 pZxid = 0x100000033 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 74 numChildren = 0 ③測試kafka集群是否ok 手動測試方法可以參考:http://www.cnblogs.com/reblue520/p/7853116.html 也可以通過測試腳本來測試:
腳本測試架構
①機房A的producer生產者 #
cat kafka_test.py # -*- coding: utf-8 -*- ''''' 使用kafka-Python 1.3.3模塊 ''' import sys import time import json from kafka import KafkaProducer from kafka import KafkaConsumer from kafka.errors import KafkaError KAFAKA_HOST = "1.1.1.1,2.2.2.2,3.3.3.3" KAFAKA_PORT = 9092 KAFAKA_TOPIC = "SCRIPT-BI-TOPIC" class Kafka_producer(): ''''' 生產模塊:根據不同的key,區分消息 ''' def __init__(self, kafkahost,kafkaport, kafkatopic, key): self.kafkaHost = kafkahost self.kafkaPort = kafkaport self.kafkatopic = kafkatopic self.key = key print("producer:h,p,t,k",kafkahost,kafkaport,kafkatopic,key) bootstrap_servers = '{kafka_host}:{kafka_port}'.format( kafka_host=self.kafkaHost, kafka_port=self.kafkaPort ) print("boot svr:",bootstrap_servers) self.producer = KafkaProducer(bootstrap_servers = bootstrap_servers ) def sendjsondata(self, params): try: parmas_message = json.dumps(params,ensure_ascii=False) producer = self.producer print(parmas_message) v = parmas_message.encode('utf-8') k = key.encode('utf-8') print("send msg:(k,v)",k,v) producer.send(self.kafkatopic, key=k, value= v) producer.flush() except KafkaError as e: print (e) class Kafka_consumer(): ''''' 消費模塊: 通過不同groupid消費topic里面的消息 ''' def __init__(self, kafkahost, kafkaport, kafkatopic, groupid): self.kafkaHost = kafkahost self.kafkaPort = kafkaport self.kafkatopic = kafkatopic self.groupid = groupid self.key = key self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid, bootstrap_servers = '{kafka_host}:{kafka_port}'.format( kafka_host=self.kafkaHost, kafka_port=self.kafkaPort ) ) def consume_data(self): try: for message in self.consumer: yield message except KeyboardInterrupt as e: print (e) def main(xtype, group, key): ''''' 測試consumer和producer ''' if xtype == "p": # 生產模塊 producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key) print ("===========> producer:", producer) for _id in range(1000): params = '{"msg" : "%s"}' % str(_id) params=[{"msg0" :_id},{"msg1" :_id}] producer.sendjsondata(params) time.sleep(1) if xtype == 'c': # 消費模塊 consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group) print ("===========> consumer:", consumer) message = consumer.consume_data() for msg in message: print ('msg---------------->k,v', msg.key,msg.value) print ('offset---------------->', msg.offset) if __name__ == '__main__': xtype = sys.argv[1] group = sys.argv[2] key = sys.argv[3] main(xtype, group, key) 外網機房A消費者,需要配置/etc/hosts,把kafka集群的外網ip指向他們的hostname [kafka_producer]# cat /etc/hosts 1.1.1.1 u04yq01.yaya.corp 2.2.2.2 u04yq02.yaya.corp 3.3.3.3 u04ck04.yaya.corp ②中心機房同一內網消費者配置(代碼相同部分就省略掉了) # -*- coding: utf-8 -*- ''''' 使用kafka-Python 1.3.3模塊 ''' import sys import time import json from kafka import KafkaProducer from kafka import KafkaConsumer from kafka.errors import KafkaError KAFAKA_HOST = "10.19.97.25,10.19.48.131,10.19.105.50" KAFAKA_PORT = 9092 KAFAKA_TOPIC = "SCRIPT-BI-TOPIC" ....... 內網消費者/etc/hosts配置 10.19.97.25 u04yq01.yaya.corp 10.19.48.131 u04yq02.yaya.corp 10.19.105.50 u04ck04.yaya.corp 腳本使用方法: producer生產者 python kafka_test p g v consumer消費者 python kafka_test c g v

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM