第一步:創建.net core 控制台引用
第二步:引用nuget包Rabbit.Zookeeper
第三步:看代碼
using org.apache.zookeeper;
using org.apache.zookeeper.data;
using Rabbit.Zookeeper;
using Rabbit.Zookeeper.Implementation;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.Watcher.Event;
using static org.apache.zookeeper.ZooDefs;
namespace zookeepercore
{
class Program
{
private static IZookeeperClient _client;
static async Task Main(string[] args)
{
_client = new ZookeeperClient(new ZookeeperClientOptions("ip:2181,ip:2182,ip:2183")
{
SessionTimeout = TimeSpan.FromSeconds(20),
OperatingTimeout = TimeSpan.FromSeconds(30)
});
//await GetChildrenAsyncTest();
//await ExistsAsyncTest();
//await GetDataAsyncTest();
//await CreateTest();
//await SubscribeDataChangeTest();
//await SubscribeChildrenChangeTest();
//await Test();
await MiaoShaTest();
Console.ReadLine();
}
static async Task Test()
{
var path = $"/C-20200415001";
Console.WriteLine("創建節點");
if (!await _client.ExistsAsync(path))
await _client.CreateEphemeralAsync(path, null);
Console.WriteLine("監聽節點");
await _client.SubscribeDataChange(path, (client, args) =>
{
Console.WriteLine($"監聽輸出:{args.Type}({DateTime.Now.ToShortTimeString()})");
return Task.CompletedTask;
});
await Task.Factory.StartNew(() =>
{
while (true)
{
_client.SetDataAsync(path, new byte[] { 1 });
Thread.Sleep(1000);
}
});
}
public static async Task MiaoShaTest()
{
var nodePath = "/goods";
if (await _client.ExistsAsync(nodePath))
{
Console.WriteLine("沒有找到節點");
return;
}
//創建5個線程搶三件商品
for (int i = 0; i < 5; i++)
{
Task.Factory.StartNew((index) =>
{
while (true)
{
//判斷節點是否存在
var goods = _client.GetChildrenAsync(nodePath).Result;
if (goods == null || goods.Count() <= 0)
{
Console.WriteLine($"{index}線程{Thread.CurrentThread.ManagedThreadId}沒搶到商品");
return;
}
try
{
var tempGoods = goods.First();
//刪除商品節點,刪除成功代表搶到商品
_client.DeleteAsync(nodePath + "/" + tempGoods).Wait();
Console.WriteLine($"{index}線程{Thread.CurrentThread.ManagedThreadId}搶到商品{tempGoods}");
return;
}
catch (Exception ex)
{
}
}
}, i);
}
}
/// <summary>
/// 獲取節點
/// </summary>
/// <returns></returns>
public static async Task GetChildrenAsyncTest()
{
var childrens = await _client.GetChildrenAsync("/");
childrens = await _client.GetChildrenAsync("/ApiRouteRoot");
}
/// <summary>
/// 判斷節點是否存在
/// </summary>
/// <returns></returns>
public static async Task ExistsAsyncTest()
{
var result = await _client.ExistsAsync("/ApiRouteRoot");
}
/// <summary>
/// 獲取節點值
/// </summary>
/// <returns></returns>
public static async Task GetDataAsyncTest()
{
var data = await _client.GetDataAsync("/");
data = await _client.GetDataAsync("/config/");
}
public static async Task ReconnectionTest()
{
await _client.ExistsAsync("/");
await Task.Delay(TimeSpan.FromSeconds(8));
await _client.ExistsAsync("/");
}
/// <summary>
/// 創建節點並賦值
/// </summary>
/// <returns></returns>
public static async Task CreateTest()
{
var path = $"/{Guid.NewGuid():N}";
if (await _client.ExistsAsync(path))
await _client.DeleteAsync(path);
await _client.CreateEphemeralAsync(path, Encoding.UTF8.GetBytes("abc"));
var data = (await _client.GetDataAsync(path)).ToArray();
if (data != null)
{
var value = Encoding.UTF8.GetString(data);
}
if (await _client.ExistsAsync(path))
await _client.DeleteAsync(path);
}
/// <summary>
/// 訂閱節點數據變化
/// </summary>
/// <returns></returns>
public static async Task SubscribeDataChangeTest()
{
var path = $"/{DateTime.Now:yyyy_MM_dd_HH_mm_ss_ff}";
try
{
if (await _client.ExistsAsync(path))
await _client.DeleteAsync(path);
var types = new List<EventType>();
var waitEvent = new AutoResetEvent(false);
await _client.SubscribeDataChange(path, (client, args) =>
{
types.Add(args.Type);
waitEvent.Set();
return Task.CompletedTask;
});
//created
await _client.CreateEphemeralAsync(path, null);
waitEvent.WaitOne(10000);
//modify
await _client.SetDataAsync(path, new byte[] { 1 });
waitEvent.WaitOne(10000);
//deleted
await _client.DeleteAsync(path);
waitEvent.WaitOne(10000);
}
finally
{
if (await _client.ExistsAsync(path))
await _client.DeleteAsync(path);
}
}
/// <summary>
/// 訂閱子節點變化
/// </summary>
/// <returns></returns>
public static async Task SubscribeChildrenChangeTest()
{
var path = $"/{DateTime.Now:yyyy_MM_dd_HH_mm_ss_ff}";
var path2 = $"{path}/123";
var types = new List<Watcher.Event.EventType>();
try
{
if (await _client.ExistsAsync(path))
await _client.DeleteRecursiveAsync(path);
var semaphore = new Semaphore(0, 2);
await _client.SubscribeDataChange(path, (client, args) =>
{
if (args.Type == Watcher.Event.EventType.NodeCreated)
semaphore.Release();
return Task.CompletedTask;
});
await _client.SubscribeChildrenChange(path, (client, args) =>
{
types.Add(args.Type);
semaphore.Release();
return Task.CompletedTask;
});
await _client.CreatePersistentAsync(path, null);
semaphore.WaitOne(10000);
await _client.CreatePersistentAsync(path2, null);
semaphore.WaitOne(10000);
}
finally
{
if (await _client.ExistsAsync(path))
await _client.DeleteRecursiveAsync(path);
}
}
}
}