ZooKeeper under Docker and a .NET Core distributed lock


Single node

1. Pull the image: docker pull zookeeper

2. Run the container

a. I keep everything for my containers under /root/docker, so first create the corresponding directories and file:

mkdir zookeeper
cd zookeeper
mkdir data
mkdir datalog
mkdir conf
cd conf
touch zoo.cfg

The zoo.cfg file (this is the default main configuration file) looks like this:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data
dataLogDir=/datalog
clientPort=2181
maxClientCnxns=60

These values are also the defaults that the ZooKeeper image sets through its environment variables.

b. Run the instance. Switch to /root/docker/zookeeper and run the command below. (I am not sure why zoo.cfg had to be mounted with a relative path here; with an absolute path the container reported docker-entrypoint.sh: line 15: /conf/zoo.cfg: Is a directory. Most likely this is because the absolute path used below is spelled /root/docker/zookeep instead of /root/docker/zookeeper, and Docker creates a missing host path as a directory, so the bind mount produces a directory where the entrypoint expects a file.)

docker run --name zookeeper --restart always -d -v $(pwd)/data:/data -v $(pwd)/datalog:/datalog -v $(pwd)/conf/zoo.cfg:/conf/zoo.cfg -p 2181:2181 -p 2888:2888 -p 3888:3888 zookeeper
# 2181 is the ZooKeeper client port
# 2888 is the port ZooKeeper servers use to talk to each other (followers connect to the leader on it)
# 3888 is the port the servers use for leader election
# With an absolute path: docker run --name zookeeper --restart always -d -v /root/docker/zookeep/data:/data -v /root/docker/zookeep/datalog:/datalog -v /root/docker/zookeep/conf/zoo.cfg:/conf/zoo.cfg -p 2181:2181 -p 2888:2888 -p 3888:3888 zookeeper
# this reports /docker-entrypoint.sh: line 15: /conf/zoo.cfg: Is a directory
docker run --name zookeeper --restart always -d -v /root/docker/zookeep/data:/data -v /root/docker/zookeep/datalog:/datalog -v /root/docker/zookeep/conf/:/conf/ -p 2181:2181 -p 2888:2888 -p 3888:3888 zookeeper # the working approach is to mount the conf directory rather than the single file

c. Common ZooKeeper operations. First enter the ZooKeeper command-line client with the following command (a .NET equivalent of these operations is sketched after the command list):

docker exec -it zookeeper zkCli.sh -server 192.168.100.5:2181 # for a cluster, separate the servers with commas: -server 192.168.100.5:2181,192.168.100.6:2182
 
create /zk "zkval1" #創建zk節點
create /zk/test1 "testval1" #創建zk/test1節點
create /zk/test2 "testval2" #創建zk/test2節點
#create /test/node "node1" 失敗,不支持遞歸創建,多級時,必須一級一級創建
#create /zk/test2/ null 節點不能以 / 結尾,會直接報錯
ls -s /zk #查看zk節點信息
set /zk/test1 "{1111}" #修改節點數據
get /zk/test1 #查看節點數據
delete /zk #刪除時,須先清空節點下的內容,才能刪除節點
delete /zk/test2
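
For reference, the same basic operations can also be performed from .NET with the ZooKeeperNetEx client used later in this article. The following is only a minimal sketch under a few assumptions: the connection string and paths are examples, ZkCrudDemo and NoOpWatcher are hypothetical names, and error handling is omitted.

namespace ZookeeperDemo
{
    using System;
    using System.Text;
    using System.Threading.Tasks;
    using org.apache.zookeeper;

    class ZkCrudDemo
    {
        // No-op watcher, only needed to satisfy the ZooKeeper constructor.
        class NoOpWatcher : Watcher
        {
            public override Task process(WatchedEvent @event) => Task.CompletedTask;
        }

        static async Task Main()
        {
            var zk = new ZooKeeper("192.168.100.5:2181", 30000, new NoOpWatcher());

            // Wait for the session to connect (the ZooKeeprLock class below does the same with a timeout).
            while (zk.getState() != ZooKeeper.States.CONNECTED)
            {
                await Task.Delay(100);
            }

            // create /zk "zkval1"
            await zk.createAsync("/zk", Encoding.UTF8.GetBytes("zkval1"),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            // set /zk "{1111}"
            await zk.setDataAsync("/zk", Encoding.UTF8.GetBytes("{1111}"), -1);

            // get /zk
            var result = await zk.getDataAsync("/zk", false);
            Console.WriteLine(Encoding.UTF8.GetString(result.Data));

            // delete /zk (as in zkCli, the node must have no children)
            await zk.deleteAsync("/zk", -1);

            await zk.closeAsync();
        }
    }
}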

Cluster setup

I struggled with this for quite a while and in the end used the configuration from the official image documentation. Create a docker-compose.yml file as follows (ZOO_MY_ID is the id the image writes to each server's myid file, and ZOO_SERVERS lists the quorum members, with 0.0.0.0 standing in for the server itself):

version: '3.1'
 
services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
 
  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
 
  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181

Finally run docker-compose up, then verify:

docker exec -it zookeeper_zoo1_1 zkCli.sh -server 192.168.100.5:2181
create /zk "test"
quit # exit the client in container 1
 
docker exec -it zookeeper_zoo2_1 zkCli.sh -server 192.168.100.5:2182
get /zk # read the value from container 2
quit
 
docker exec -it zookeeper_zoo3_1 zkCli.sh -server 192.168.100.5:2183
get /zk # read the value from container 3
quit

Distributed lock

A ZooKeeper distributed lock is implemented with ephemeral sequential nodes. The lock can be thought of as a node in ZooKeeper: to acquire the lock, a client creates an ephemeral sequential child under that lock node. When several clients try to acquire the lock at the same time, each creates its own ephemeral sequential child, but only the child with the lowest sequence number acquires the lock; every other client watches the node immediately ahead of its own, so as soon as that predecessor releases the lock, the watching client can acquire it. Ephemeral sequential nodes bring one more benefit: if a client crashes after creating its node, ZooKeeper detects the lost session and deletes the corresponding node automatically, which effectively releases the lock.
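
To make this concrete, here is a minimal sketch of the acquisition step written directly against the ZooKeeperNetEx client API (the same library used below). It is only an illustration under a few assumptions: the parent node /locks is assumed to already exist, NaiveZkLock and OneShotWatcher are hypothetical names, and the WriteLock recipe used in the actual code later packages this pattern for you.

namespace ZookeeperDemo
{
    using System.Linq;
    using System.Threading.Tasks;
    using org.apache.zookeeper;

    // Illustration of the ephemeral-sequential-node locking pattern described above.
    class NaiveZkLock
    {
        private readonly ZooKeeper _zk;
        public NaiveZkLock(ZooKeeper zk) { _zk = zk; }

        public async Task AcquireAsync(string lockRoot = "/locks")
        {
            // 1. Create this client's ephemeral sequential node, e.g. /locks/lock-0000000007.
            string myPath = await _zk.createAsync($"{lockRoot}/lock-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            string myName = myPath.Substring(lockRoot.Length + 1);

            while (true)
            {
                // 2. List the children; the zero-padded sequence suffix makes a plain sort correct.
                var children = (await _zk.getChildrenAsync(lockRoot, false)).Children
                    .OrderBy(n => n).ToList();
                int myIndex = children.IndexOf(myName);

                // 3. The lowest sequence number owns the lock.
                if (myIndex == 0) return;

                // 4. Otherwise watch only the node immediately ahead of this one and
                //    re-check once it disappears (released normally or its owner crashed).
                var gone = new TaskCompletionSource<bool>();
                var stat = await _zk.existsAsync($"{lockRoot}/{children[myIndex - 1]}",
                    new OneShotWatcher(gone));
                if (stat != null)
                {
                    await gone.Task;
                }
            }
        }

        // Completes a task the first time any event fires for the watched node.
        class OneShotWatcher : Watcher
        {
            private readonly TaskCompletionSource<bool> _tcs;
            public OneShotWatcher(TaskCompletionSource<bool> tcs) { _tcs = tcs; }
            public override Task process(WatchedEvent @event)
            {
                _tcs.TrySetResult(true);
                return Task.CompletedTask;
            }
        }
    }
}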

For example, suppose ClientA and ClientB both want the lock, so each creates an ephemeral sequential node (1 and 2) under the locks node. Node 1 has the lowest sequence number, so ClientA acquires the lock while ClientB waits; node 2 watches node 1, and when node 1's lock is released (the node is deleted), node 2 becomes the lowest-numbered node under locks and ClientB acquires the lock. The C# code follows.

Create a .NET Core console application.

Install the ZooKeeperNetEx.Recipes NuGet package.

Create the ZooKeeper client; the ZooKeeprLock class is as follows:

namespace ZookeeperDemo
{
    using org.apache.zookeeper;
    using org.apache.zookeeper.recipes.@lock;
    using System;
    using System.Diagnostics;
    using System.Threading.Tasks;
    public class ZooKeeprLock
    {
        private const int CONNECTION_TIMEOUT = 50000;
        private const string CONNECTION_STRING = "192.168.100.5:2181,192.168.100.5:2182,192.168.100.5:2183";
 
        /// <summary>
        /// Acquire the lock
        /// </summary>
        /// <param name="key">Name of the lock node</param>
        /// <param name="lockAcquiredAction">Logic to run once the lock has been acquired</param>
        /// <param name="lockReleasedAction">Logic to run after the lock is released; may be null</param>
        /// <returns></returns>
        public async Task Lock(string key, Action lockAcquiredAction, Action lockReleasedAction = null)
        {
            // Get a ZooKeeper client
            ZooKeeper keeper = CreateClient();
            // Specify the lock node
            WriteLock writeLock = new WriteLock(keeper, $"/{key}", null);
 
            var lockCallback = new LockCallback(() =>
            {
                lockAcquiredAction.Invoke();
                writeLock.unlock();
            }, lockReleasedAction);
            // Attach the listener for lock acquisition and release
            writeLock.setLockListener(lockCallback);
            // Acquire the lock (if it is not available yet, the previous ephemeral node is watched)
            await writeLock.Lock();
        }
 
        private ZooKeeper CreateClient()
        {
            var zooKeeper = new ZooKeeper(CONNECTION_STRING, CONNECTION_TIMEOUT, NullWatcher.Instance);
            // Wait, up to CONNECTION_TIMEOUT, for the session to reach the CONNECTED state.
            Stopwatch sw = new Stopwatch();
            sw.Start();
            while (sw.ElapsedMilliseconds < CONNECTION_TIMEOUT)
            {
                if (zooKeeper.getState() == ZooKeeper.States.CONNECTED)
                {
                    break;
                }
                Task.Delay(50).Wait(); // avoid a tight spin while connecting
            }
            sw.Stop();
            return zooKeeper;
        }
 
        class NullWatcher : Watcher
        {
            public static readonly NullWatcher Instance = new NullWatcher();
            private NullWatcher() { }
            public override Task process(WatchedEvent @event)
            {
                return Task.CompletedTask;
            }
        }
 
        class LockCallback : LockListener
        {
            private readonly Action _lockAcquiredAction;
            private readonly Action _lockReleasedAction;
 
            public LockCallback(Action lockAcquiredAction, Action lockReleasedAction)
            {
                _lockAcquiredAction = lockAcquiredAction;
                _lockReleasedAction = lockReleasedAction;
            }
 
            /// <summary>
            /// Callback invoked when the lock is acquired
            /// </summary>
            /// <returns></returns>
            public Task lockAcquired()
            {
                _lockAcquiredAction?.Invoke();
                return Task.FromResult(0);
            }
 
            /// <summary>
            /// Callback invoked when the lock is released
            /// </summary>
            /// <returns></returns>
            public Task lockReleased()
            {
                _lockReleasedAction?.Invoke();
                return Task.FromResult(0);
            }
        }
 
    }
}

Test code:

namespace ZookeeperDemo
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    class Program
    {
        static void Main(string[] args)
        {       
            Parallel.For(1, 10, async (i) =>
            {
                await new ZooKeeprLock().Lock("locks", () =>
                {
                    Console.WriteLine($"第{i}個請求,獲取鎖成功:{DateTime.Now},線程Id:{Thread.CurrentThread.ManagedThreadId}");
                    Thread.Sleep(1000); // 業務邏輯...
                }, () =>
                {
                    Console.WriteLine($"第{i}個請求,釋放鎖成功:{DateTime.Now},線程Id:{Thread.CurrentThread.ManagedThreadId}");
                    Console.WriteLine("-------------------------------");
                });
            });
            Console.ReadKey();
        }
    }
}

Run results:

Distributed locks can also be implemented with a database or with Redis; each approach has its own pros and cons.

References:

ZooKeeper 實現分布式鎖 (Implementing distributed locks with ZooKeeper)

七張圖徹底講清楚ZooKeeper分布式鎖的實現原理 (Seven diagrams that fully explain how ZooKeeper distributed locks work)

面試請不要再問我Redis分布式鎖的實現原理 (Please stop asking me in interviews how Redis distributed locks work)

Docker下Zookeeper集群搭建 (Building a ZooKeeper cluster under Docker)

zookeeper Docker Official

zookeeper客戶端命令詳解 (ZooKeeper client commands explained in detail)

How To Install and Configure an Apache ZooKeeper Cluster on Ubuntu 18.04

docker zookeeper 集群搭建 (Docker ZooKeeper cluster setup)

CSharpKit 微服務工具包 (CSharpKit microservices toolkit)

Redis實現分布式鎖 (Implementing distributed locks with Redis)

分布式鎖的幾種使用方式(redis、zookeeper、數據庫) (Several ways to implement distributed locks: Redis, ZooKeeper, database)

