C# 关于Zookeeper的分步式锁
关于C#分步式锁的资料很少,特别是对Zookeeper的,在网上搜索多数是java版的示例,在所以查阅了相关资料了解了分步式锁原理后,写了一个小程序,并测试通过,现在把研究成果记录一下,供大家研究学习,目前还未应用到项目中,可能会存在未知问题,就目前来讲,仅供参考。
·
关于C#分步式锁的资料很少,特别是对Zookeeper的,在网上搜索多数是java版的示例,在查阅了相关资料了解了分步式锁原理后,写了一个小程序,并测试通过,现在把研究成果记录一下,供大家研究学习,目前还未应用到项目中,可能会存在未知问题,就目前来讲,仅供参考。
第一步:布属zookeeper(步骤略,请百度)
第二步:创建项目并添加相关的包
右键创建好的项目,添加NuGet程序包,如下图
搜索并安装,记得选红框内的版本
安装后,项目中会多出两个引用
分步式锁工具类的设计
业务调用(这里在获得锁后又加了一层锁,主要考虑到分步式时,某台服务器宕机后,其它端的应用可能会同时获得锁)
实现原理
在获取锁时,是通过zookeeper在其服务端创建了目录
string result = _client.Create(lockName, "".GetBytes(), list, CreateMode.EphemeralSequential);
注意采用CreateMode.EphemeralSequential类型,创建后,获取目录下所有文件目录,并排序
List<string> childrens = (List<string>)_client.GetChildren("/", false);
IEnumerable<string> order = childrens.OrderBy(t => t);
string frist = order.First<string>();
取得最小的目录,判断是否与当前创建的目录相等,如果相等,则无需添加事件通知(在WaitLock时如果没有添加通知,则无需等待)。如果不相等,说明还有锁未释放,需要做的是对创建的目录对应的前一个目录做好监听(Watcher),举例:如果当前目录是locks_00000258,则监听locks_00000257,同时增加locks_00000257相应的目录通知事件(这里我是采用AutoResetEvent来实现的)。当locks_00000257被释放后,监听应用会将locks_00000257对应的事件进行set,locks_00000258将获得下一个锁的权力。
所以,在GetLock之后,需要通过WaitLock来监听回调。
在任务执行完后,调用Unlock来将锁释放。
下面贴相关代码
/// <summary>
/// ZooKeeperLock 分步式锁
/// </summary>
public class ZooKeeperLock
{
private string _connectString = String.Empty;
private AutoResetEvent _connectResetEvent = new AutoResetEvent(false);
public AutoResetEvent ConnectSuccessEvent { get { return _connectResetEvent; } }
private IWatcher _watcher;
/// <summary>
/// 锁释放后通过AutoResetEvent对象通知应用
/// </summary>
public static Hashtable _noticeResetsEvent = new Hashtable();
public ZooKeeperNet.ZooKeeper Client {
get { return _client; }
}
/// <summary>
/// 通知释放锁
/// </summary>
/// <param name="lockName"></param>
public void Release(string lockName)
{
if (_noticeResetsEvent.ContainsKey(lockName))
{
(_noticeResetsEvent[lockName] as AutoResetEvent).Set();
lock (_noticeResetsEvent)
{
_noticeResetsEvent.Remove(lockName);
}
}
}
/// <summary>
/// 同步等待锁
/// </summary>
/// <param name="lockName"></param>
public void WaitLock(string lockName)
{
lockName = getPrev(lockName, "/locks_");
AutoResetEvent are = null;
lock (_noticeResetsEvent)
{
if (_noticeResetsEvent.ContainsKey(lockName))
{
are = (_noticeResetsEvent[lockName] as AutoResetEvent);
}
}
if (null != are)
WaitHandle.WaitAny(new WaitHandle[] { are });
lock (_noticeResetsEvent)
{
_noticeResetsEvent.Remove(lockName);
}
}
public ZooKeeperLock(string connectString)
{
_connectString = connectString;
_watcher = new AbWatcher("", this);
_client = new ZooKeeperNet.ZooKeeper(_connectString, new TimeSpan(0, 0, 30), _watcher);
_connectResetEvent.WaitOne();
}
ZooKeeperNet.ZooKeeper _client = null;
/// <summary>
/// 重连
/// </summary>
/// <returns></returns>
public bool ReConnect()
{
_client = new ZooKeeperNet.ZooKeeper(_connectString, new TimeSpan(0, 0, 30), _watcher);
return true;
}
/// <summary>
/// 获取前一个目录名
/// </summary>
/// <param name="seq"></param>
/// <param name="lockName"></param>
/// <returns></returns>
private string getPrev(string seq, string lockName)
{
if (String.IsNullOrEmpty(seq)) return String.Empty;
string sSeqIndex = seq.Replace(lockName, "");
long seqIndex = 0;
long.TryParse(sSeqIndex, out seqIndex);
seqIndex = seqIndex - 1;
string listen = lockName + seqIndex.ToString().PadLeft(seq.Length - lockName.Length, '0');
return listen;
}
/// <summary>
/// 获取锁
/// </summary>
/// <param name="lockName"></param>
/// <returns></returns>
public string Lock()
{
try
{
HashSet<ACL> list = new HashSet<ACL>();
list.Add(new ACL((int)Perms.ALL, new ZKId("ip", "127.0.0.1")));
string lockName = "locks_";
string result = _client.Create(lockName, "".GetBytes(), list, CreateMode.EphemeralSequential);
List<string> childrens = (List<string>)_client.GetChildren("/", false);
IEnumerable<string> order = childrens.OrderBy(t => t);
string listen = getPrev(result, lockName);
string frist = order.First<string>();
if (result.Replace("/", "").Equals(frist))
{
Debug.WriteLine("当前就是:" + frist);
return result;
}
else
{
if (null != _client.Exists(listen, _watcher))
{
///加入通知事件中
lock (_noticeResetsEvent)
{
_noticeResetsEvent[listen] = new AutoResetEvent(false);
}
}
Debug.WriteLine("当前是:" + frist + "新加入的是:" + result);
return result;
}
}
catch (Exception ex)
{
throw ex;
}
}
/// <summary>
/// 释放锁
/// </summary>
/// <param name="lockName"></param>
/// <returns></returns>
public bool UnLock(string lockName)
{
try
{
_client.Delete(lockName, -1);
Debug.WriteLine("释放成功。" + lockName);
return true;
}
catch (ZooKeeperNet.KeeperException.SessionExpiredException se)
{
throw se;
}
catch (Exception ex)
{
throw ex;
}
}
}
监听
public class AbWatcher : IWatcher
{
public AbWatcher()
{
}
private string _tag = String.Empty;
private ZooKeeperLock _zoolock = null;
public ZooKeeperLock Zoolock {
set { _zoolock = value; }
}
public AbWatcher(string tag, ZooKeeperLock zoolock)
{
_tag = tag;
_zoolock = zoolock;
}
public void Process(WatchedEvent @event)
{
if (@event.Type == EventType.NodeDeleted)
{
///释放目录对应的锁
_zoolock.Release(@event.Path);
}
if (@event.State == KeeperState.SyncConnected)
{
///连接成功
_zoolock.ConnectSuccessEvent.Set();
}
if (@event.State == KeeperState.Disconnected)
{
Debug.WriteLine("服务器中断重连。");
_zoolock.ReConnect();
}
if (@event.State == KeeperState.Expired)
{
Debug.WriteLine("连接已超时重连。");
_zoolock.ReConnect();
}
}
}
调用
string lockName = z.Lock();
if (!String.IsNullOrEmpty(lockName))
{
z.WaitLock(lockName);
lock (this)
{
//do somethings...
i++;
}
z.UnLock(lockName);
}
参考资料
更多推荐
已为社区贡献1条内容
所有评论(0)