ZooKeeper 实现分布式锁的方法示例
ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布式协调/通知、集群管理、Master 选举、分布式锁等功能。
节点
在介绍 ZooKeeper 分布式锁前需要先了解一下 ZooKeeper 中节点(Znode),ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock)。每个 Znode 上都会保存自己的数据内容,同时还会保存一系列属性信息。
Znode 又分为以下四种类型:
类型 | 描述 |
---|---|
持久节点 | 节点创建后,会一直存在,不会因客户端会话失效而删除 |
持久顺序节点 | 基本特性与持久节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名 |
临时节点 | 客户端会话失效或连接关闭后,该节点会被自动删除 |
临时顺序节点 | 基本特性与临时节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名 |
锁原理
ZooKeeper 分布式锁是基于 临时顺序节点 来实现的,锁可理解为 ZooKeeper 上的一个节点,当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。
而且用临时顺序节点的另外一个用意是如果某个客户端创建临时顺序节点后,自己意外宕机了也没关系,ZooKeeper 感知到某个客户端宕机后会自动删除对应的临时顺序节点,相当于自动释放锁。
如上图:ClientA 和 ClientB 同时想获取锁,所以都在 locks 节点下创建了一个临时节点 1 和 2,而 1 是当前 locks 节点下排列序号第一的节点,所以 ClientA 获取锁成功,而 ClientB 处于等待状态,这时 ZooKeeper 中的 2 节点会监听 1 节点,当 1节点锁释放(节点被删除)时,2 就变成了 locks 节点下排列序号第一的节点,这样 ClientB 就获取锁成功了。
代码测试
请确保 ZooKeeper 服务已启动,ZooKeeper 的搭建可参考Kafka 集群 中的 ZooKeeper 集群部分
以下是基于 C# 的测试,Java 可使用 Curator 框架,实现原理和上面描述是一致的,有兴趣可以看看源码,应该也不难理解。
创建 .NET Core 控制台程序 Nuget
安装 ZooKeeperNetEx.Recipes
创建 ZooKeeper Client
private const int CONNECTION_TIMEOUT = 50000; private const string CONNECTION_STRING = "127.0.0.1:2181"; private ZooKeeper CreateClient() { var zooKeeper = new ZooKeeper(CONNECTION_STRING, CONNECTION_TIMEOUT, NullWatcher.Instance); Stopwatch sw = new Stopwatch(); sw.Start(); while (sw.ElapsedMilliseconds < CONNECTION_TIMEOUT) { var state = zooKeeper.getState(); if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTING) { break; } } sw.Stop(); return zooKeeper; } class NullWatcher : Watcher { public static readonly NullWatcher Instance = new NullWatcher(); private NullWatcher() { } public override Task process(WatchedEvent @event) { return Task.CompletedTask; } }
添加 Lock 方法
/// <summary> /// 加锁 /// </summary> /// <param name="key">加锁的节点名</param> /// <param name="lockAcquiredAction">加锁成功后需要执行的逻辑</param> /// <param name="lockReleasedAction">锁释放后需要执行的逻辑,可为空</param> /// <returns></returns> public async Task Lock(string key, Action lockAcquiredAction, Action lockReleasedAction = null) { // 获取 ZooKeeper Client ZooKeeper keeper = CreateClient(); // 指定锁节点 WriteLock writeLock = new WriteLock(keeper, $"/{key}", null); var lockCallback = new LockCallback(() => { lockAcquiredAction.Invoke(); writeLock.unlock(); }, lockReleasedAction); // 绑定锁获取和释放的监听对象 writeLock.setLockListener(lockCallback); // 获取锁(获取失败时会监听上一个临时节点) await writeLock.Lock(); } class LockCallback : LockListener { private readonly Action _lockAcquiredAction; private readonly Action _lockReleasedAction; public LockCallback(Action lockAcquiredAction, Action lockReleasedAction) { _lockAcquiredAction = lockAcquiredAction; _lockReleasedAction = lockReleasedAction; } /// <summary> /// 获取锁成功回调 /// </summary> /// <returns></returns> public Task lockAcquired() { _lockAcquiredAction?.Invoke(); return Task.FromResult(0); } /// <summary> /// 释放锁成功回调 /// </summary> /// <returns></returns> public Task lockReleased() { _lockReleasedAction?.Invoke(); return Task.FromResult(0); } }
多线程模拟测试
static async Task RunAsync() { Parallel.For(1, 10, async (i) => { await new ZooKeeprDistributedLock().Lock("locks", () => { Console.WriteLine($"第{i}个请求,获取锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}"); Thread.Sleep(1000); // 业务逻辑... }, () => { Console.WriteLine($"第{i}个请求,释放锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine("-------------------------------"); }); }); await Task.CompletedTask; }
虽然模拟的是多线程并行执行,但最终都会依赖锁的获取和释放而串行执行实际业务逻辑。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。
上一篇:利用Distinct()内置方法对List集合的去重问题详解
栏 目:C#教程
下一篇:聊一聊C# 8.0中的await foreach使用
本文地址:https://www.xiuzhanwang.com/a1/C_jiaocheng/4735.html
您可能感兴趣的文章
- 01-10C#实现txt定位指定行完整实例
- 01-10WinForm实现仿视频播放器左下角滚动新闻效果的方法
- 01-10C#实现清空回收站的方法
- 01-10C#实现读取注册表监控当前操作系统已安装软件变化的方法
- 01-10C#实现多线程下载文件的方法
- 01-10C#实现Winform中打开网页页面的方法
- 01-10C#实现远程关闭计算机或重启计算机的方法
- 01-10C#自定义签名章实现方法
- 01-10C#文件断点续传实现方法
- 01-10winform实现创建最前端窗体的方法
阅读排行
本栏相关
- 01-10C#通过反射获取当前工程中所有窗体并
- 01-10关于ASP网页无法打开的解决方案
- 01-10WinForm限制窗体不能移到屏幕外的方法
- 01-10WinForm绘制圆角的方法
- 01-10C#实现txt定位指定行完整实例
- 01-10WinForm实现仿视频播放器左下角滚动新
- 01-10C#停止线程的方法
- 01-10C#实现清空回收站的方法
- 01-10C#通过重写Panel改变边框颜色与宽度的
- 01-10C#实现读取注册表监控当前操作系统已
随机阅读
- 08-05织梦dedecms什么时候用栏目交叉功能?
- 01-10delphi制作wav文件的方法
- 01-11Mac OSX 打开原生自带读写NTFS功能(图文
- 08-05DEDE织梦data目录下的sessions文件夹有什
- 01-10使用C语言求解扑克牌的顺子及n个骰子
- 08-05dedecms(织梦)副栏目数量限制代码修改
- 01-10C#中split用法实例总结
- 01-11ajax实现页面的局部加载
- 04-02jquery与jsp,用jquery
- 01-10SublimeText编译C开发环境设置