关于C#分步式锁的资料很少,特别是对Zookeeper的,在网上搜索多数是java版的示例,在查阅了相关资料了解了分步式锁原理后,写了一个小程序,并测试通过,现在把研究成果记录一下,供大家研究学习,目前还未应用到项目中,可能会存在未知问题,就目前来讲,仅供参考。
第一步:布属zookeeper(步骤略,请百度)
第二步:创建项目并添加相关的包
右键创建好的项目,添加NuGet程序包,如下图

搜索并安装,记得选红框内的版本

安装后,项目中会多出两个引用

分步式锁工具类的设计

业务调用(这里在获得锁后又加了一层锁,主要考虑到分步式时,某台服务器宕机后,其它端的应用可能会同时获得锁)

实现原理
在获取锁时,是通过zookeeper在其服务端创建了目录
string result = _client.Create(lockName, "".GetBytes(), list, CreateMode.EphemeralSequential);
注意采用CreateMode.EphemeralSequential类型,创建后,获取目录下所有文件目录,并排序
List<string> childrens = (List<string>)_client.GetChildren("/", false);
IEnumerable<string> order = childrens.OrderBy(t => t);
string frist = order.First<string>();
取得最小的目录,判断是否与当前创建的目录相等,如果相等,则无需添加事件通知(在WaitLock时如果没有添加通知,则无需等待)。如果不相等,说明还有锁未释放,需要做的是对创建的目录对应的前一个目录做好监听(Watcher),举例:如果当前目录是locks_00000258,则监听locks_00000257,同时增加locks_00000257相应的目录通知事件(这里我是采用AutoResetEvent来实现的)。当locks_00000257被释放后,监听应用会将locks_00000257对应的事件进行set,locks_00000258将获得下一个锁的权力。
所以,在GetLock之后,需要通过WaitLock来监听回调。
在任务执行完后,调用Unlock来将锁释放。

下面贴相关代码
/// <summary>
	/// ZooKeeperLock 分步式锁
	/// </summary>
	public class ZooKeeperLock
	{
		private string _connectString = String.Empty;
		
		private AutoResetEvent _connectResetEvent = new AutoResetEvent(false);

		public AutoResetEvent ConnectSuccessEvent { get { return _connectResetEvent; } }
		
		private IWatcher _watcher;

		/// <summary>
		/// 锁释放后通过AutoResetEvent对象通知应用
		/// </summary>
		public static Hashtable _noticeResetsEvent = new Hashtable();

		public ZooKeeperNet.ZooKeeper Client {
			get { return _client; }
		}

		/// <summary>
		/// 通知释放锁
		/// </summary>
		/// <param name="lockName"></param>
		public void Release(string lockName)
		{
			if (_noticeResetsEvent.ContainsKey(lockName))
			{
				(_noticeResetsEvent[lockName] as AutoResetEvent).Set();
				lock (_noticeResetsEvent)
				{
					_noticeResetsEvent.Remove(lockName);
				}
			}
		}

		/// <summary>
		/// 同步等待锁
		/// </summary>
		/// <param name="lockName"></param>
		public void WaitLock(string lockName)
		{
			lockName = getPrev(lockName, "/locks_");
			AutoResetEvent are = null;
			lock (_noticeResetsEvent)
			{
				if (_noticeResetsEvent.ContainsKey(lockName))
				{
					are = (_noticeResetsEvent[lockName] as AutoResetEvent);
				}
			}
			if (null != are)
				WaitHandle.WaitAny(new WaitHandle[] { are });
			lock (_noticeResetsEvent)
			{
				_noticeResetsEvent.Remove(lockName);
			}
		}

		public ZooKeeperLock(string connectString)
		{
			_connectString = connectString;
			_watcher = new AbWatcher("", this);
			_client = new ZooKeeperNet.ZooKeeper(_connectString, new TimeSpan(0, 0, 30), _watcher);
			_connectResetEvent.WaitOne();
		}

		ZooKeeperNet.ZooKeeper _client = null;

		/// <summary>
		/// 重连
		/// </summary>
		/// <returns></returns>
		public bool ReConnect()
		{
			_client = new ZooKeeperNet.ZooKeeper(_connectString, new TimeSpan(0, 0, 30), _watcher);
			return true;
		}

		/// <summary>
		/// 获取前一个目录名
		/// </summary>
		/// <param name="seq"></param>
		/// <param name="lockName"></param>
		/// <returns></returns>
		private string getPrev(string seq, string lockName)
		{
			if (String.IsNullOrEmpty(seq)) return String.Empty;
			string sSeqIndex = seq.Replace(lockName, "");
			long seqIndex = 0;
			long.TryParse(sSeqIndex, out seqIndex);
			seqIndex = seqIndex - 1;
			string listen = lockName + seqIndex.ToString().PadLeft(seq.Length - lockName.Length, '0');
			return listen;
		}

		/// <summary>
		/// 获取锁
		/// </summary>
		/// <param name="lockName"></param>
		/// <returns></returns>
		public string Lock()
		{
			try
			{
				HashSet<ACL> list = new HashSet<ACL>();
				list.Add(new ACL((int)Perms.ALL, new ZKId("ip", "127.0.0.1")));

				string lockName = "locks_";
				string result = _client.Create(lockName, "".GetBytes(), list, CreateMode.EphemeralSequential);

				List<string> childrens = (List<string>)_client.GetChildren("/", false);
				IEnumerable<string> order = childrens.OrderBy(t => t);

				string listen = getPrev(result, lockName);

				string frist = order.First<string>();
				if (result.Replace("/", "").Equals(frist))
				{
					Debug.WriteLine("当前就是:" + frist);
					return result;
				}
				else
				{
					if (null != _client.Exists(listen, _watcher))
					{
						///加入通知事件中
						lock (_noticeResetsEvent)
						{
							_noticeResetsEvent[listen] = new AutoResetEvent(false);
						}
					}
					Debug.WriteLine("当前是:" + frist + "新加入的是:" + result);
					return result;
				}
			}
			catch (Exception ex)
			{
				throw ex;
			}
		}
		
		/// <summary>
		/// 释放锁
		/// </summary>
		/// <param name="lockName"></param>
		/// <returns></returns>
		public bool UnLock(string lockName)
		{
			try
			{
				_client.Delete(lockName, -1);
				Debug.WriteLine("释放成功。" + lockName);
				return true;
			}
			catch (ZooKeeperNet.KeeperException.SessionExpiredException se)
			{
				throw se;
			}
			catch (Exception ex)
			{
				throw ex;
			}
		}
	}
监听
public class AbWatcher : IWatcher
	{
		public AbWatcher()
		{ 
		}

		private string _tag = String.Empty;
		private ZooKeeperLock _zoolock = null;
		public ZooKeeperLock Zoolock {
			set { _zoolock = value; }
		}

		public AbWatcher(string tag, ZooKeeperLock zoolock)
		{
			_tag = tag;
			_zoolock = zoolock;
		}

		public void Process(WatchedEvent @event)
		{
			if (@event.Type == EventType.NodeDeleted)
			{
				///释放目录对应的锁
				_zoolock.Release(@event.Path);
			}
			if (@event.State == KeeperState.SyncConnected)
			{
				///连接成功
				_zoolock.ConnectSuccessEvent.Set();
			}
			if (@event.State == KeeperState.Disconnected)
			{
				Debug.WriteLine("服务器中断重连。");
				_zoolock.ReConnect();
			}
			if (@event.State == KeeperState.Expired)
			{
				Debug.WriteLine("连接已超时重连。");
				_zoolock.ReConnect();
			}
		}
	}

调用
				string lockName = z.Lock();
				if (!String.IsNullOrEmpty(lockName))
				{
					z.WaitLock(lockName);
					lock (this)
					{
						//do somethings...
						i++;
					}
					z.UnLock(lockName);
				}

参考资料

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐