.net core 使用zookeeper


第一步:創建.net core 控制台引用

第二步:引用nuget包Rabbit.Zookeeper

 

 第三步:看代碼

using org.apache.zookeeper;
using org.apache.zookeeper.data;
using Rabbit.Zookeeper;
using Rabbit.Zookeeper.Implementation;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.Watcher.Event;
using static org.apache.zookeeper.ZooDefs;

namespace zookeepercore
{
class Program
{
private static IZookeeperClient _client;

static async Task Main(string[] args)
{
_client = new ZookeeperClient(new ZookeeperClientOptions("ip:2181,ip:2182,ip:2183")
{
SessionTimeout = TimeSpan.FromSeconds(20),
OperatingTimeout = TimeSpan.FromSeconds(30)
});

//await GetChildrenAsyncTest();
//await ExistsAsyncTest();
//await GetDataAsyncTest();
//await CreateTest();
//await SubscribeDataChangeTest();
//await SubscribeChildrenChangeTest();
//await Test();
await MiaoShaTest();

Console.ReadLine();
}

static async Task Test()
{
var path = $"/C-20200415001";

Console.WriteLine("創建節點");
if (!await _client.ExistsAsync(path))
await _client.CreateEphemeralAsync(path, null);
Console.WriteLine("監聽節點");
await _client.SubscribeDataChange(path, (client, args) =>
{
Console.WriteLine($"監聽輸出:{args.Type}({DateTime.Now.ToShortTimeString()})");
return Task.CompletedTask;
});

await Task.Factory.StartNew(() =>
{
while (true)
{
_client.SetDataAsync(path, new byte[] { 1 });
Thread.Sleep(1000);
}
});
}

public static async Task MiaoShaTest()
{
var nodePath = "/goods";
if (await _client.ExistsAsync(nodePath))
{
Console.WriteLine("沒有找到節點");
return;
}
//創建5個線程搶三件商品
for (int i = 0; i < 5; i++)
{
Task.Factory.StartNew((index) =>
{
while (true)
{
//判斷節點是否存在
var goods = _client.GetChildrenAsync(nodePath).Result;
if (goods == null || goods.Count() <= 0)
{
Console.WriteLine($"{index}線程{Thread.CurrentThread.ManagedThreadId}沒搶到商品");
return;
}

try
{
var tempGoods = goods.First();
//刪除商品節點,刪除成功代表搶到商品
_client.DeleteAsync(nodePath + "/" + tempGoods).Wait();
Console.WriteLine($"{index}線程{Thread.CurrentThread.ManagedThreadId}搶到商品{tempGoods}");
return;
}
catch (Exception ex)
{
}
}

}, i);

}
}

/// <summary>
/// 獲取節點
/// </summary>
/// <returns></returns>
public static async Task GetChildrenAsyncTest()
{
var childrens = await _client.GetChildrenAsync("/");

childrens = await _client.GetChildrenAsync("/ApiRouteRoot");
}

/// <summary>
/// 判斷節點是否存在
/// </summary>
/// <returns></returns>
public static async Task ExistsAsyncTest()
{
var result = await _client.ExistsAsync("/ApiRouteRoot");
}

/// <summary>
/// 獲取節點值
/// </summary>
/// <returns></returns>
public static async Task GetDataAsyncTest()
{
var data = await _client.GetDataAsync("/");

data = await _client.GetDataAsync("/config/");
}

public static async Task ReconnectionTest()
{
await _client.ExistsAsync("/");
await Task.Delay(TimeSpan.FromSeconds(8));
await _client.ExistsAsync("/");
}

/// <summary>
/// 創建節點並賦值
/// </summary>
/// <returns></returns>
public static async Task CreateTest()
{
var path = $"/{Guid.NewGuid():N}";

if (await _client.ExistsAsync(path))
await _client.DeleteAsync(path);

await _client.CreateEphemeralAsync(path, Encoding.UTF8.GetBytes("abc"));

var data = (await _client.GetDataAsync(path)).ToArray();
if (data != null)
{
var value = Encoding.UTF8.GetString(data);
}

if (await _client.ExistsAsync(path))
await _client.DeleteAsync(path);
}

/// <summary>
/// 訂閱節點數據變化
/// </summary>
/// <returns></returns>
public static async Task SubscribeDataChangeTest()
{
var path = $"/{DateTime.Now:yyyy_MM_dd_HH_mm_ss_ff}";
try
{
if (await _client.ExistsAsync(path))
await _client.DeleteAsync(path);

var types = new List<EventType>();
var waitEvent = new AutoResetEvent(false);

await _client.SubscribeDataChange(path, (client, args) =>
{
types.Add(args.Type);
waitEvent.Set();
return Task.CompletedTask;
});

//created
await _client.CreateEphemeralAsync(path, null);
waitEvent.WaitOne(10000);

//modify
await _client.SetDataAsync(path, new byte[] { 1 });
waitEvent.WaitOne(10000);

//deleted
await _client.DeleteAsync(path);
waitEvent.WaitOne(10000);
}
finally
{
if (await _client.ExistsAsync(path))
await _client.DeleteAsync(path);
}
}

/// <summary>
/// 訂閱子節點變化
/// </summary>
/// <returns></returns>
public static async Task SubscribeChildrenChangeTest()
{
var path = $"/{DateTime.Now:yyyy_MM_dd_HH_mm_ss_ff}";
var path2 = $"{path}/123";
var types = new List<Watcher.Event.EventType>();

try
{
if (await _client.ExistsAsync(path))
await _client.DeleteRecursiveAsync(path);

var semaphore = new Semaphore(0, 2);

await _client.SubscribeDataChange(path, (client, args) =>
{
if (args.Type == Watcher.Event.EventType.NodeCreated)
semaphore.Release();
return Task.CompletedTask;
});
await _client.SubscribeChildrenChange(path, (client, args) =>
{
types.Add(args.Type);
semaphore.Release();
return Task.CompletedTask;
});

await _client.CreatePersistentAsync(path, null);
semaphore.WaitOne(10000);
await _client.CreatePersistentAsync(path2, null);
semaphore.WaitOne(10000);
}
finally
{
if (await _client.ExistsAsync(path))
await _client.DeleteRecursiveAsync(path);
}
}

}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM