Zookeeper + Kafka


ZooKeeper 是一个分布式服务框架,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,
如:命名服务、状态同步、配置中心、集群管理等。借用阿里的Dubbo图详细解释。

 

 

1. 生产者启动
2. 生产者注册至zookeeper
3. 消费者启动并订阅频道
4. zookeeper 通知消费者事件
5. 消费者调用生产者
6. 监控中心负责统计和监控服务状态

========================================================

节点角色状态:
LOOKING:寻找 Leader状态,处于该状态需要进入选举流程
LEADING:领导者状态,处于该状态的节点说明是角色已经是Leader
FOLLOWING:跟随者状态,表示 Leader已经选举出来,当前节点角色是follower
OBSERVER:观察者状态,表明当前节点角色是 observer
选举 ID:
ZXID(zookeeper transaction id):每个改变 Zookeeper状态的操作都会形成一个对应的zxid。
ZXID最大的节点优先选为Leader
myid:服务器的唯一标识(SID),通过配置 myid 文件指定,集群中唯一,当ZXID一样时,myid大的节
点优先选为Leader
zookeeper 集群选举过程:
当集群中的 zookeeper 节点启动以后,会根据配置文件中指定的 zookeeper节点地址进行leader 选择
操作,过程如下:每个zookeeper 都会发出投票,由于是第一次选举leader,因此每个节点都会把自己当做leader
角色进行选举,每个zookeeper 的投票中都会包含自己的myid和zxid,此时zookeeper 1 的投票
为myid 为 1,初始zxid有一个初始值0x0,后期会随着数据更新而自动变化,zookeeper2 的投票
为myid 为2,初始zxid 为初始生成的值。
每个节点接受并检查对方的投票信息,比如投票时间、是否状态为LOOKING状态的投票。
对比投票,优先检查zxid,如果zxid 不一样则 zxid 大的为leader,如果zxid相同则继续对比
myid,myid 大的一方为 leader
成为 Leader 的必要条件: Leader 要具有最高的zxid;当集群的规模是 n 时,集群中大多数的机
器(至少n/2+1)得到响应并follower 选出的 Leader。
心跳机制:Leader 与 Follower 利用PING 来感知对方的是否存活,当 Leader无法响应PING 时,将重
新发起 Leader 选举。
当 Leader 服务器出现网络中断、崩溃退出与重启等异常情况时,ZAB(Zookeeper Atomic Broadcast)
协议就会进入恢复模式并选举产生新的Leader服务器。这个过程大致如下:
Leader election(选举阶段):节点在一开始都处于选举阶段,只要有一个节点得到超半数节点的
票数,它就可以当选准 leader。
Discovery(发现阶段):在这个阶段,followers 跟准 leader 进行通信,同步 followers 最近接
收的事务提议。
Synchronization(同步阶段):同步阶段主要是利用 leader 前一阶段获得的最新提议历史,同步集
群中所有的副本。同步完成之后 准 leader 才会成为真正的 leader。
Broadcast(广播阶段) :到了这个阶段,Zookeeper 集群才能正式对外提供事务服务,并且
leader 可以进行消息广播。同时如果有新的节点加入,还需要对新节点进行同步
[19:50:46 root@zookeeper-node1 ~]#cat /usr/local/zookeeper/conf/zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
maxClientCnxns=128
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
server.1=10.0.0.201:2888:3888
server.2=10.0.0.202:2888:3888
server.3=10.0.0.203:2888:3888

[19:50:46 root@zookeeper-node1 ~]#echo 1 > /usr/local/zookeeper/data/myid
[19:51:11 root@zookeeper-node1 ~]#echo 2 > /usr/local/zookeeper/data/myid
[19:52:31 root@zookeeper-node1 ~]#echo 3 > /usr/local/zookeeper/data/myid

如果想使用nc命令监控状态,用于zabbix等,需要添加一下选项,不然会报错
[19:53:36 root@zookeeper-node1 ~]#echo stat | nc 10.0.0.201 2181
stat is not executed because it is not in the whitelist.
[19:53:49 root@zookeeper-node1 ~]#cat /usr/local/zookeeper/conf/zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
maxClientCnxns=128
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
server.1=10.0.0.201:2888:3888
server.2=10.0.0.202:2888:3888
server.3=10.0.0.203:2888:3888

4lw.commands.whitelist=stat,ruok,conf,isro
[19:53:55 root@zookeeper-node1 ~]#echo stat | nc 10.0.0.201 2181
Zookeeper version: 3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT
Clients:
 /10.0.0.201:34424[0](queued=0,recved=1,sent=0)
 /10.0.0.202:50822[1](queued=0,recved=3,sent=3)
 /10.0.0.201:34420[1](queued=0,recved=3,sent=3)

Latency min/avg/max: 0/1.0/2
Received: 8
Sent: 7
Connections: 3
Outstanding: 0
Zxid: 0x10000006f
Mode: follower
Node count: 39
=== 查看集群状态

[19:55:52 root@zookeeper-node1 ~]#zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[19:58:50 root@zookeeper-node2 ~]#zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[19:57:37 root@zookeeper-node3 spring-boot-helloWorld]#zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

=== 停掉leader,查看其余follower情况,node2自动提升为leader
[19:57:55 root@zookeeper-node3 spring-boot-helloWorld]#systemctl stop zookeeper.service

[19:59:01 root@zookeeper-node2 ~]#zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

[19:55:55 root@zookeeper-node1 ~]#zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

=== 恢复node3,变为follower,其余node不变
[20:01:18 root@zookeeper-node3 spring-boot-helloWorld]#zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[20:00:26 root@zookeeper-node1 ~]#zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[20:00:22 root@zookeeper-node2 ~]#zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

  到此,zookeeper配置完毕,开始安装kafka,首先了解kafka是什么

Kafka用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速性,可在数千家组织
中同时投入生产协同工作。 

 

 

特点
分布式: 多机实现,不允许单机
分区: 一个消息.可以拆分出多个,分别存储在多个位置
多副本: 防止信息丢失,可以多来几个备份
多订阅者: 可以有很多应用连接kafka
zookeeper:前提环境zookeeper
优势
kafka 通过 O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB 级别以上的消息存
储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。支持通过Kafka 服务器分
区消息。
支持 Hadoop 并行数据加载。
通常用于大数据场合,传递单条消息比较大(Rabbitmq 消息主要是传输业务的指令数据,单条数据较
小) 

 

 

kafka最主要的是分区和副本概念,分片是负载均衡,解决单台服务器容量问题,副本技术是高可用,解决存储数据问题

 

 

 

Broker 依赖于 Zookeeper,每个Broker 的id 和 Topic、Partition这些元数据信息都会写入
Zookeeper 的 ZNode 节点中
consumer 依赖于Zookeeper,Consumer 在消费消息时,每消费完一条消息,会将产生的offset
保存到 Zookeeper 中,下次消费在当前offset往后继续消费.kafka0.9 之前Consumer 的offset 存
储在 Zookeeper 中,kafka0,9 以后offset存储在本地。
Partition 依赖于 Zookeeper,Partition 完成Replication 备份后,选举出一个Leader,这个是依
托于 Zookeeper 的选举机制实现的 
关键配置在于集群配置:唯一的broker.id, zookeeper.connect -》zookeeper集群
[20:01:48 root@zookeeper-node2 ~]#cat /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://10.0.0.202:9092
log.dirs=/usr/local/kafka/data
num.partitions=1
log.retention.hours=168
zookeeper.connect=10.0.0.201:2181,10.0.0.202:2181,10.0.0.203:2181
zookeeper.connection.timeout.ms=6000h

 

 

 

 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM