Zookeeper基礎教程(五):C#實現Zookeeper分布式鎖


  分布式鎖 

  互聯網初期,我們系統一般都是單點部署,也就是在一台服務器完成系統的部署,后期隨着用戶量的增加,服務器的壓力也越來越大,響應速度越來越慢,甚至出現服務器崩潰的情況。

  為解決服務器壓力太大,響應慢的特點,分布式系統部署出現了。

  簡單的說,就是我們將系統資源部署到多台服務器中,然后使用一台服務器做入口代理,根據一些決策將接收到的請求轉發到資源服務器,這也就是我們常說的 反向代理(一般就是使用nginx)

  

   雖然分布式解決了服務器壓力的問題,但也帶來了新的問題。

  比如,我們有一個下單統計的功能,當完成下單后,需要執行統計功能,而在高訪問的情況下,可能有兩個下單請求(A和B)同時完成,然后一起執行了統計功能,這樣可能導致的結果就是A請求未將B請求數據統計在內,而B請求可能也未將A請求數據統計在內,這樣就造成了數據的統計錯,這個問題的產生的根本原因就是統計功能的並發導致的,如果是單點部署的系統,我們簡單的使用一個鎖操作就能完成了,但是在分布式環境下,A和B請求可能同時運行在兩個服務器中,普通的鎖就不能起到效果了,這個時候就要使用分布式鎖了。

 

  Zookeeper分布式鎖原理

  分布式鎖的實現發放有多種,簡單的,我們可以使用數據庫表去實現它,也可以使用redis去實現它,這里要使用的Zookeeper去實現分布式鎖

  Zookeeper分布式鎖的原理是巧妙的是使用了znode臨時節點的特點和監聽(watcher)機制,監聽機制很簡單,就是我們可以給znode添加一個監聽器,當znode節點狀態發生改變時(如:數據內容改變,節點被刪除),會通知到監聽器。

  前面幾節介紹過znode有三種類型  

  PERSISTENT:持久節點,即使在創建該特定znode的客戶端斷開連接后,持久節點仍然存在。默認情況下,除非另有說明,否則所有znode都是持久的。
  EPHEMERAL:臨時節點,客戶端是連接狀態時,臨時節點就是有效的。當客戶端與ZooKeeper集合斷開連接時,臨時節點會自動刪除。臨時節點不允許有子節點。臨時節點在leader選舉中起着重要作用。
  SEQUENTIAL:順序節點,可以是持久的或臨時的。當一個新的znode被創建為一個順序節點時,ZooKeeper通過將10位的序列號附加到原始名稱來設置znode的路徑,順序節點在鎖定和同步中起重要作用。

  其中,順序節點,可以是持久的或臨時的,而臨時節點有個特點,就是它屬於創建它的那個會話,當會話斷開,臨時節點就會自動刪除,如果在臨時節點上注冊了監聽器,那么監聽器就會收到通知,如果臨時節點有了時間順序,那我們為實現分布式鎖就又有一個想法:

  假如在Zookeeper中有一個znode節點/Locker

  1、當client1連接Zookeeper時,先判斷/Locker節點是否存在子節點,如果沒有子節點,那么會在/Locker節點下創建一個臨時順序的znode節點,假如是/client1,表示client1獲取了鎖狀態,client1可以繼續執行。

  2、當client2連接Zookeeper時,先判斷/Locker節點是否存在子節點,發現已經存在子節點了,然后獲取/Locker下的所有子節點,同時按時間順序排序,在最后一個節點,也就是/client1節點上注冊一個監聽器(watcher1),同時在/Locker節點下創建一個臨時順序的znode節點,假如是/client2。同時client2將被阻塞,而阻塞狀態的釋放是在監聽器(watcher1)中的。

  3、當client3連接Zookeeper時,先判斷/Locker節點是否存在子節點,發現已經存在子節點了,然后獲取/Locker下的所有子節點,同時按時間順序排序,在最后一個節點,也就是/client2節點上注冊一個監聽器(watcher2),同時在/Locker節點下創建一個臨時順序的znode節點,假如是/client3。同時client2將被阻塞,而阻塞狀態的釋放是在監聽器(watcher2)中的。

  以此類推。

  4、當client1執行完操作了,斷開Zookeeper的連接,因為/client1是臨時順序節點,於是將會自動刪除,而client2已經往/client1節點中注冊了一個監聽器(watcher1),於是watcher1將會受到通知,而watcher1又會釋放client2的阻塞狀態。於是client2獲取鎖狀態,繼續執行。

  5、當client2執行完操作了,斷開Zookeeper的連接,因為/client2是臨時順序節點,於是將會自動刪除,而client3已經往/client2節點中注冊了一個監聽器(watcher2),於是watcher2將會受到通知,而watcher2又會釋放client3的阻塞狀態。於是client3獲取鎖狀態,繼續執行。

  以此類推。

  這樣,不管分布式環境中有幾台服務器,都可以保證程序的排隊似的執行了。

 

  C#實現Zookeeper分布式鎖

  上一節有封裝過一個ZookeeperHelper的輔助類(Zookeeper基礎教程(四):C#連接使用Zookeeper),使用這個輔助類實現了一個ZookeeperLocker類:  

  
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AspNetCore.ZookeeperConsole
{
    /// <summary>
    /// 基於Zookeeper的分布式鎖
    /// </summary>
    public class ZookeeperLocker : IDisposable
    {
        /// <summary>
        /// 單點鎖
        /// </summary>
        static object locker = new object();
        /// <summary>
        /// Zookeeper集群地址
        /// </summary>
        string[] address;
        /// <summary>
        /// Zookeeper操作輔助類
        /// </summary>
        ZookeeperHelper zookeeperHelper;

        /// <summary>
        /// 構造函數
        /// </summary>
        /// <param name="lockerPath">分布式鎖的根路徑</param>
        /// <param name="address">集群地址</param>
        public ZookeeperLocker(string lockerPath, params string[] address) : this(lockerPath, 0, address)
        {
        }
        /// <summary>
        /// 構造函數
        /// </summary>
        /// <param name="lockerPath">分布式鎖的根路徑</param>
        /// <param name="sessionTimeout">回話過期時間</param>
        /// <param name="address">集群地址</param>
        public ZookeeperLocker(string lockerPath, int sessionTimeout, params string[] address)
        {
            this.address = address.ToArray();

            zookeeperHelper = new ZookeeperHelper(address, lockerPath);
            if (sessionTimeout > 0)
            {
                zookeeperHelper.SessionTimeout = sessionTimeout;
            }
            if (!zookeeperHelper.Connect())
            {
                throw new Exception("connect failed:" + string.Join(",", address));
            }
            lock (locker)
            {
                if (!zookeeperHelper.Exists())//根節點不存在則創建
                {
                    zookeeperHelper.SetData("", "", true);
                }
            }
        }
        /// <summary>
        /// 生成一個鎖
        /// </summary>
        /// <returns>返回鎖名</returns>
        public string CreateLock()
        {
            var path = Guid.NewGuid().ToString().Replace("-", "");
            while (zookeeperHelper.Exists(path))
            {
                path = Guid.NewGuid().ToString().Replace("-", "");
            }
            return CreateLock(path);
        }
        /// <summary>
        /// 使用指定的路徑名稱設置鎖
        /// </summary>
        /// <param name="path">鎖名,不能包含路徑分隔符(/)</param>
        /// <returns>返回鎖名</returns>
        public string CreateLock(string path)
        {
            if (path.Contains("/"))
            {
                throw new ArgumentException("invalid path");
            }
            return zookeeperHelper.SetData(path, "", false, true);
        }
        /// <summary>
        /// 獲取鎖
        /// </summary>
        /// <param name="path">鎖名</param>
        /// <returns>如果獲得鎖返回true,否則一直等待</returns>
        public bool Lock(string path)
        {
            return LockAsync(path).GetAwaiter().GetResult();
        }
        /// <summary>
        /// 獲取鎖
        /// </summary>
        /// <param name="path">鎖名</param>
        /// <param name="millisecondsTimeout">超時時間,單位:毫秒</param>
        /// <returns>如果獲得鎖返回true,否則等待指定時間后返回false</returns>
        public bool Lock(string path, int millisecondsTimeout)
        {
            return LockAsync(path, millisecondsTimeout).GetAwaiter().GetResult();
        }
        /// <summary>
        /// 異步獲取鎖等等
        /// </summary>
        /// <param name="path">鎖名</param>
        /// <returns>如果獲得鎖返回true,否則一直等待</returns>
        public async Task<bool> LockAsync(string path)
        {
            return await LockAsync(path, System.Threading.Timeout.Infinite);
        }
        /// <summary>
        /// 異步獲取鎖等等
        /// </summary>
        /// <param name="path">鎖名</param>
        /// <param name="millisecondsTimeout">超時時間,單位:毫秒</param>
        /// <returns>如果獲得鎖返回true,否則等待指定時間后返回false</returns>
        public async Task<bool> LockAsync(string path, int millisecondsTimeout)
        {
            var array = await zookeeperHelper.GetChildrenAsync("", true);
            if (array != null && array.Length > 0)
            {
                var first = array.FirstOrDefault();
                if (first == path)//正好是優先級最高的,則獲得鎖
                {
                    return true;
                }

                var index = array.ToList().IndexOf(path);
                if (index > 0)
                {
                    //否則添加監聽
                    var are = new AutoResetEvent(false);
                    var watcher = new NodeWatcher();
                    watcher.NodeDeleted += (ze) =>
                    {
                        are.Set();
                    };
                    if (await zookeeperHelper.WatchAsync(array[index - 1], watcher))//監聽順序節點中的前一個節點
                    {
                        if (!are.WaitOne(millisecondsTimeout))
                        {
                            return false;
                        }
                    }

                    are.Dispose();
                }
                else
                {
                    throw new InvalidOperationException($"no locker found in path:{zookeeperHelper.CurrentPath}");
                }
            }
            return true;
        }
        /// <summary>
        /// 釋放資源
        /// </summary>
        public void Dispose()
        {
            zookeeperHelper.Dispose();
        }
    }
}
View Code

  現在寫個程序可以模擬一下  

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace AspNetCore.ZookeeperConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            //Zookeeper連接字符串,采用host:port格式,多個地址之間使用逗號(,)隔開
            string[] address = new string[] { "192.168.209.133:2181", "192.168.209.133:2181", "192.168.209.133:2181" };
            //會話超時時間,單位毫秒
            int sessionTimeOut = 10000;
            //鎖節點根路徑
            string lockerPath = "/Locker";

            for (var i = 0; i < 10; i++)
            {
                string client = "client" + i;
                //多線程模擬並發
                new Thread(() =>
                {
                    using (ZookeeperLocker zookeeperLocker = new ZookeeperLocker(lockerPath, sessionTimeOut, address))
                    {
                        string path = zookeeperLocker.CreateLock();
                        if (zookeeperLocker.Lock(path))
                        {
                            //模擬處理過程
                            Console.WriteLine($"【{client}】獲得鎖:{DateTime.Now}");
                            Thread.Sleep(3000);
                            Console.WriteLine($"【{client}】處理完成:{DateTime.Now}");
                        }
                        else
                        {
                            Console.WriteLine($"【{client}】獲得鎖失敗:{DateTime.Now}");
                        }
                    }
                }).Start();
            }
                        
            Console.ReadKey();
        }
    }
}

  運行結果如下:

  

   可以發現,鎖功能是實現了的

  如果程序運行中打印日志:Client session timed out, have not heard from server in 8853ms for sessionid 0x1000000ec5500b2

  或者直接拋出異常:org.apache.zookeeper.KeeperException.ConnectionLossException:“Exception_WasThrown”

  只需要適當調整sessionTimeOut時間即可


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM