又一个支持断线重连、永久watcher、递归操作 ZooKeeper 客户端
2017-10-14

项目介绍

ZooKeeper本质上是一个分布式的小文件存储系统。原本是Apache Hadoop的一个组件,现在被拆分为一个Hadoop的独立子项目。

Zookeeper 作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储,但是 Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理。

Zookeeper 在Windows安装和使用,可参考http://www.cnblogs.com/shanyou/p/3221990.html

ZookeeperClient是在https://github.com/shayhatsor/zookeeper基础上的再次封装,使开发者更方便使用ZooKeeper相关的功能。

ZookeeperClient实现了断线重连,会话过期重连,永久监听,子节点数据变化的监听。并且加入了常用功能,例如分布式锁,Leader选举,分布式队列

,项目地址https://github.com/milanyangbo/ZooKeeper.Net

使用说明

下面列一下常用的使用方法,不仅限于此哦!

一、创建ZKClient对象

创建ZKClient对象 有两种方式可以方便的创建ZKClient对象

  1. 使用构造函数创建

 string address = "localhost:2181"; 
   ZKClient zkClient1 = new ZKClient(address);     
   ZKClient zkClient2 = new ZKClient(address, TimeSpan.FromMilliseconds(10000));   
   ZKClient zkClient3 = new ZKClient(address, TimeSpan.FromMilliseconds(10000), TimeSpan.FromMilliseconds(10000));   
   ZKClient zkClient4 = new ZKClient(address, TimeSpan.FromMilliseconds(30000), TimeSpan.FromMilliseconds(10000), new SerializableSerializer());      
   ZKClient zkClient5 = new ZKClient(address, TimeSpan.FromMilliseconds(30000), TimeSpan.FromMilliseconds(10000), new SerializableSerializer(), TimeSpan.FromMilliseconds(60000)); 


  1. 使用辅助类创建

string address = "localhost:2181";  
   ZKClient zkClient = ZKClientBuilder.NewZKClient(address)  
                              .SessionTimeout(30000)//可选  
                              .Serializer(new SerializableSerializer())//可选  
                              .RetryTimeout(60000)//可选  
                              .ConnectionTimeout(10000)//可选  
                              .Build(); //创建实例


二、节点的新增、更新、删除和获取

新增节点

  1. 常规新增节点

    父节点不存在会抛出异常

    await zkClient.CreateAsync("/test1", "123", CreateMode.EPHEMERAL);  
    await zkClient.CreateAsync("/test1-1", 123, CreateMode.EPHEMERAL_SEQUENTIAL);  
    await zkClient.CreateAsync("/test1-2", 123, CreateMode.PERSISTENT);  
    await zkClient.CreateAsync("/test1-3", 123, CreateMode.PERSISTENT_SEQUENTIAL);
  1. 递归新增节点(新增节点及其父节点)

    如果父节点不存在会被一并创建。
    对于PERSISTENT类型的节点,递归创建,父节点和子节点都创建为PERSISTENT。
    对于EPHEMERAL类型的节点,递归创建,父节点都是PERSISTENT类型,而最后一级节点才是EPHEMERAL类型。(因为EPHEMERAL不能拥有子节点)
    注意:第二个参数为节点的值,指的的最后一级节点的值。

 string path = "/test8/1/2/3";     //递归创建节点及父节点
    await zkClient.CreateRecursiveAsync(path, "abc", CreateMode.PERSISTENT);     await zkClient.CreateRecursiveAsync(path, "123", ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);

  1. 特殊的EPHEMERAL类型节点

    特殊类型的EPHEMERAL节点,该节点在会话失效被删除后,重新连接会被自动创建。


更新节点数据

   string path = "/test";   await zkClient.SetDataAsync(path, "456");  
   //带期望版本号的更新,如果真实的版本号与期望版本号不一致会更新失败,抛出异常
   await zkClient.SetDataAsync(path, "123", 2);

删除节点

  1. 常规删除

    bool flag = await zkClient.DeleteAsync("/test");//删除任意版本
    bool flag = await zkClient.DeleteAsync("/test",1);//删除指定版本
  1. 递归删除(删除节点及子节点)

    string path = "/test";   
    await zkClient.DeleteRecursiveAsync(path);//如果/test下有多个子节点,会被一并删除

获取节点数据

    string path = "/test";   
    await zkClient.GetDataAsync<string>(path); //如果节点不存在抛出异常
    await zkClient.GetDataAsync<string>(path, true); //如果节点不存在返回null
    Stat stat = (await zkClient.GetZKDataAsync<string>(path)).stat; //获得数据以及stat信息

等待节点创建

    string path = "/test";    //等待直到超时或者节点创建成功。
    await zkClient.WaitUntilExistsAsync(path, TimeSpan.FromMilliseconds(5000));

三、权限管理

ZooKeeper的权限管理亦即ACL控制功能通过Server、Client两端协调完成:
Server端:
一个ZooKeeper的节点(znode)存储两部分内容:数据和状态,状态中包含ACL信息。创建一个znode会产生一个ACL列表,列表中每个ACL包括:

验证模式(scheme)
具体内容(Id)(当scheme=“digest”时,Id为用户名密码,例如“root:J0sTy9BCUKubtK1y8pkbL7qoxSw=”)
权限(perms)

ZooKeeper提供了如下几种验证模式(scheme):

digest:Client端由用户名和密码验证,譬如user:password,digest的密码生成方式是Sha1摘要的base64形式
auth:不使用任何id,代表任何已确认用户。
ip:Client端由IP地址验证,譬如172.2.0.0/24
world:固定用户为anyone,为所有Client端开放权限
super:在这种scheme情况下,对应的id拥有超级权限,可以做任何事情(cdrwa)

注意的是,exists操作和getAcl操作并不受ACL许可控制,因此任何客户端可以查询节点的状态和节点的ACL。
节点的权限(perms)主要有以下几种:

Create 允许对子节点Create操作
Read 允许对本节点GetChildren和GetData操作
Write 允许对本节点SetData操作
Delete 允许对子节点Delete操作
Admin 允许对本节点setAcl操作
Znode ACL权限用一个int型数字perms表示,perms的5个二进制位分别表示setacl、delete、create、write、read。比如0x1f=adcwr,0x1=----r,0x15=a-c-r。

四、监听相关

注意:对于断开连接时间过长造成的会话过期,由于服务器端在会话过期后会删除客户端设置的监听。

即便客户端在会话过期后自动连接成功,但是在会话过期到会话重建这段时间客户端监听的节点仍可能发生了改变,

而具体哪些变了或是没变,客户端是无法感知到的。

为了避免丢掉任何数据改变的事件,所有的监听器的都有一个回调方法(SessionExpiredHandler),用来处理会话过期这种特殊情况。

    SessionExpiredHandler = async (path) =>
                {            
                    await Task.Run(() =>
                    {                
                       Console.WriteLine(path);
                    });
                };

订阅节点的信息改变(创建节点,删除节点,添加子节点)

    IZKChildListener childListener = new ZKChildListener();  
      //子节点内容变化
    childListener.ChildChangeHandler = async (parentPath, currentChilds) =>
         {             
           await Task.Run(() =>
               {      
                     Console.WriteLine(parentPath);  
                    Console.WriteLine(string.Join(".", currentChilds));
               });
               
         };     //子节点数量变化
     childListener.ChildCountChangedHandler = async (parentPath, currentChilds) =>
         {              
          await Task.Run(() =>
               {                
                    Console.WriteLine(parentPath);    
                    Console.WriteLine(string.Join(".", currentChilds));
               });
         };     //"/testUserNode" 监听的节点,可以是现在存在的也可以是不存在的 
     zkClient.SubscribeChildChanges("/testUserNode3", childListener);

订阅节点的数据内容的变化

    IZKDataListener dataListener = new ZKDataListener();   
     // 节点创建和节点内容变化
    dataListener.DataCreatedOrChangeHandler = async (dataPath, data) =>
        {                await Task.Run(() =>
                {                     
                 Console.WriteLine(dataPath + ":" + Convert.ToString(data));
                });
        };     // 节点删除
     dataListener.DataDeletedHandler = async (dataPath) =>
         {                 await Task.Run(() =>
                 {                  
                     Console.WriteLine(dataPath);
                 });

         };      // 节点创建
      dataListener.DataCreatedHandler = async (dataPath, data) =>
          {                 
            await Task.Run(() =>
                   {                     
                     Console.WriteLine(dataPath + ":" + Convert.ToString(data));
                   });
           };       // 节点内容变化
       dataListener.DataChangeHandler = async (dataPath, data) =>
            {                  
              await Task.Run(() =>
                    {                    
                        Console.WriteLine(dataPath);
                     });

            };      zkClient.SubscribeDataChanges("/testUserNode", dataListener);

客户端状态监听

    IZKStateListener stateListener = new ZKStateListener();    //状态改变
    stateListener.StateChangedHandler = async (state) =>
         {               
          await Task.Run(() =>
                {               
                    Console.WriteLine(state.ToString());
                });
                
         };      //会话失效
      stateListener.SessionExpiredHandler = async (path) =>
           {                
             await Task.Run(() =>
                  {             
                          Console.WriteLine(path);
                  });
           };       //创建会话
        stateListener.NewSessionHandler = async () =>
             {                  
              await Task.Run(() =>{});
             };        //会话失败
        stateListener.SessionEstablishmentErrorHandler = async (ex) =>
             {                
               await Task.Run(() =>
                   {             
                     Console.WriteLine(ex.Message);
                  });

             };        
       zkClient.SubscribeStateChanges(stateListener);

五、扩展功能


分布式锁

 using (var zkClient = new ZKClient(TestUtil.zkServers))
 {    
         await zkClient.CreateRecursiveAsync("/zk/lock", null, CreateMode.Persistent);    
         //创建分布式锁, 非线程安全类,每个线程请创建单独实例。
         var _lock = new ZKDistributedLock(zkClient, "/zk/lock");    
         await _lock.LockAsync(); //获得锁
    
         //do someting
    
         await _lock.UnLockAsync();//释放锁}

Leader选举

  ( zkClient   (.))
 {          
          .(, , .);        
          listener   ();         
        .   (, ) 
                                 {                 
                                     .(   .());
                            .();
                                 };
       selector   (, , zkClient, , listener);
  
         .();        
         
          .();        
         
         .();
}

分布式队列

 ( zkClient   (.))
 {         
 .(, , .);    
          queue   long( (.), )       
          .();
    
          value   .();
   
          value    .(); 
 }