linux下kafka與zookeeper集群部署


*********************************配置主機名,通過主機名連接機器*********************************

比如說,已經有了三台主機

1,在linux上設置hostname,通過hostname來訪問linux虛擬機

1.1. 修改hosts文件

vim /etc/hosts

#/etc/hosts 的內容一般有如下類似內容:
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.202.156    node1
192.168.202.157    node2
192.168.202.158    node3

node1我當時沒有專門加這一行,而是直接在127.0.0.1后面,把localhost.localdomain修改為 node1

1.2. 修改network

修改配置文件/etc/sysconfig/network
修改HOSTNAME=yourname

NETWORKING=yes
HOSTNAME=node1

然后三台機器重啟,reboot

重啟后,ssh node2 ,發現能通過主機名字,連上

*********************************不同機器間,免密訪問*********************************

通過secureCRT,send commands to all sessions,可以達到一個輸入,在多個linux中響應

免密訪問可以看 http://blog.chinaunix.net/uid-26284395-id-2949145.html

1、ssh-keygen

2、ssh-copy-id -i  /root/.ssh/id_rsa.pub node1  (更換node2、3,然后一共重復三遍,將每台機器的publickey放到三台機器中)

最后,可以查看 cat /root/.ssh/authorized_keys 是否有node1、2、3,有的話就是可以

通過ssh node1、2、3,可以分別連上三台機器。

*********************************安裝clustershell*********************************

我的linux是CentOS6.5

去下載包 clustershell-1.6-1.el6.noarch.rpm — RPM RHEL6/CentOS6/SL6

https://github.com/cea-hpc/clustershell/downloads

執行命令,安裝:rpm -ivh clustershell-1.6-1.el6.noarch.rpm

安裝成功后,

vim /etc/clustershell/groups

在groups里面加一個組

kafka: node[1-3]

這樣就把node[1-3] 加入到kafka這個組里面。

這樣,clustershell 安裝成功

clush  -g kafka -c /opt/kafka

可以將/opt/kafka復制到集群中這個組中去

*********************************安裝zookeeper,並啟動*********************************

 

cd zookeeper-3.4.10

cd conf/

cp zoo_sample.cfg zoo.cfg

vim zoo.cfg 
加入:
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888

clush -g kafka -c zoo.cfg 

clush -g kafka mkdir /tmp/zookeeper

echo "1" > /tmp/zookeeper/myid

[root@node1 conf]# clush -g kafka cat /tmp/zookeeper/myid 
node3: 3
node2: 2
node1: 1

[root@node1 zookeeper-3.4.10]# clush -g kafka "/opt/kafka/zookeeper-3.4.10/bin/zkServer.sh start /opt/kafka/zookeeper-3.4.10/conf/zoo.cfg "
node1: ZooKeeper JMX enabled by default
node1: Using config: /opt/kafka/zookeeper-3.4.10/conf/zoo.cfg
node2: ZooKeeper JMX enabled by default
node3: ZooKeeper JMX enabled by default
node2: Using config: /opt/kafka/zookeeper-3.4.10/conf/zoo.cfg
node3: Using config: /opt/kafka/zookeeper-3.4.10/conf/zoo.cfg
node1: Starting zookeeper ... STARTED
node2: Starting zookeeper ... STARTED
node3: Starting zookeeper ... STARTED


[root@node1 zookeeper-3.4.10]# clush -g kafka "/opt/kafka/zookeeper-3.4.10/bin/zkServer.sh status /opt/kafka/zookeeper-3.4.10/conf/zoo.cfg "
通過看各個節點的狀態,驗證zookeeper集群是否啟動成功
也可以通過看 2181/2888/3888這幾個端口是否都被占用來驗證

如果沒有啟動成功,那就可能是防火牆的問題,吧防火牆關了即可

clush -g kafka service iptables stop

接下來,可以看看三台機器數據是不是同步的:
在 node1 上,用 zookeeper 的客戶端工具,連接服務器
bin/zkCli.sh -server node1:2181
#
#
#
#
ls /
會看到 / 下面的一些東西
也可以創建一個節點,並給他一個值hello:
create /test hello
ls / 可以看一下
然后在 node2 上,如果可以看到node1 創建的數據,說明數據是同步一致的:
bin/zkCli.sh -server node1:2181
get /test 可以看到剛才輸入的hello
通過quit可以退出

 

*********************************安裝kafka,並啟動*********************************

安裝:
修改server.properties 
broker.id=1
zookeeper.connect=node1:2181,node2:2181,node3:2181
修改完成后,分發到集群中
並單獨修改broker.id=2 、3 之類

在三台機器上啟動:
bin/kafka-server-start.sh -daemon config/server.properties
啟動后,查看9092端口是否被監聽
lsof -i:9092


 

在node1上創建消費者,接收消息

創建一個topic: [root@node1 kafka_2.
10-0.10.2.1]# bin/kafka-topics.sh --zookeeper node1:2181 --topic topic1 --create --partitions 3 --replication-factor 2 Created topic "topic1". 查看這個topic [root@node1 kafka_2.10-0.10.2.1]# bin/kafka-topics.sh --zookeeper node1:2181 --topic topic1 --describe Topic:topic1 PartitionCount:3 ReplicationFactor:2 Configs: Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3 Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: topic1 Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2 創建一個consumer,去接收生產者的消息 [root@node1 kafka_2.10-0.10.2.1]# bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic topic1 Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. 1 hello

 

在node2上創建生產者,生產消息
[root@node2 kafka_2.
10-0.10.2.1]# bin/kafka-console-producer.sh --broker-list node2:9092 --topic topic1 1 hello

 

查看已有的topic
bin/kafka-topics.sh --list --zookeeper node1:2181

 

Furthermore, ConsumerOffestChecker shows a row for each topic partition. Your topic topic5 does have some partitions.
  • Pid: partition ID
  • Offset: the latest committed offset for a partition for the corresponding consumer group
  • logSize: the number of messages stored in the partition
  • Lag: the number of not yet consumed message for a partition for the corresponding consumer group (ie, lag = logSize - offset)
  • Owner: unique ID of the running consumer thread

[orco@node1 kafka_2.10-0.10.1.1]$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper node1 --topic topic5 --group group1
[
2017-07-26 11:39:16,748] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) Group Topic Pid Offset logSize Lag Owner group1 topic5 0 0 0 0 none group1 topic5 1 10 10 0 none group1 topic5 2 0 0 0 none

 

有點記不清,eclipse中使用java api 調用kafka服務,好像額外需要在service.properties中修改下面這個

#listeners=PLAINTEXT://:9092

listeners=PLAINTEXT://192.168.202.156:9092

或者是

listeners=PLAINTEXT://node1:9092

不同機器,不同的node2 node3等等


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM