curator-上手

curator-上手

丁起男 235 2022-01-13

curator-上手

curator是Netflix公司开源的一个zookeeper客户端,目前由apache进行维护。与原生客户端相比,curator的抽象层更高,功能也更丰富,是目前zookeeper使用范围最广的java客户端

依赖

        <!-- 对zookeeper的底层api的一些封装 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.11</version>
        </dependency>

创建客户端

    public static CuratorFramework getCurator(){
        return CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")//服务地址
                .sessionTimeoutMs(10000)//会话超时时间
                .retryPolicy(new RetryNTimes(3,5000))//重试策略
                .namespace("dqn")//命名空间,指定命名空间后clinet的作用操作都会以其开头
                .build();
    }

使用

curator.start();//开启连接
curator.close();//断开连接

重试策略

重试策略主要分为两大类:

  • RetryForever:代表一直重试,直到连接成功
  • SleepingRetry:基于一定间隔时间的重试

判断服务状态

		CuratorFrameworkState state = curator.getState();
        System.out.println(state == CuratorFrameworkState.STARTED);

创建节点

		curator.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT) //节点类型
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath("/data","aaa".getBytes());

CreateMode节点类型

  • PERSISTENT:永久节点
  • PERSISTENT_SEQUENTIAL:永久有序节点
  • EPHEMERAL:临时节点
  • EPHEMERAL_SEQUENTIAL:临时有序节点

获取节点信息

		Stat stat = new Stat();
        byte[] bytes = curator.getData().storingStatIn(stat).forPath("/data");
        System.out.println("节点数据:"+new String(bytes));
        System.out.println("节点信息:"+stat);

Stat节点信息

  • czxid:数据节点创建时的事务id
  • ctime:数据节点创建时的时间
  • mzxid:数据节点最后一次更新时的事务id
  • mtime:数据节点最后一次更新时的时间
  • pzxid:数据节点的子节点最后一次被修改的事务id
  • cversion:子节点的更改次数
  • version:节点数据的更改次数
  • aversion:节点的acl的更改次数
  • ephemeralOwner:如果节点是临时节点,则表示创建该节点的会话的SessionID;如果节点是持久节点,则属性值为0
  • dataLength:数据内容的长度
  • numChildren:数据节点当前的子节点个数

获取子节点列表

		List<String> list = curator.getChildren().forPath("/");
        list.forEach(System.out::println);

更新节点

		curator.setData().withVersion(0)//传入版本号,如果版本号错误则拒绝更新
                .forPath("/data","bbb".getBytes());

删除节点

		curator.delete()
                .guaranteed()//如果删除失败,会重新执行,直到成功
                .deletingChildrenIfNeeded()//如果有子节点会递归删除
                .withVersion(1)//版本号,如果版本号有误则拒绝操作
                .forPath("/data");

判断节点是否存在

		Stat stat = curator.checkExists().forPath("/data");
        System.out.println("节点是否存在:"+(stat!=null));

一次性监听

		curator.getData().usingWatcher(new CuratorWatcher() {
            @Override
            public void process(WatchedEvent event) throws Exception {
                System.out.println("节点:"+event.getPath()+"发生了事件:"+event.getType());
            }
        }).forPath("/data");

永久性监听

		//使用nodeCache包装节点,对其注册的监听作用于节点,是永久性的
        NodeCache nodeCache = new NodeCache(curator,"/data");
        //设置为true,代表创建nodeCache时,就去获取对应节点的值并缓存
        nodeCache.start(true);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData currentData = nodeCache.getCurrentData();
                if (currentData != null){
                    System.out.println("节点路径:"+currentData.getPath()+
                            "数据:"+new String(currentData.getData()));
                }
            }
        });

监听子节点

		PathChildrenCache pathChildrenCache = new PathChildrenCache(curator,"/data",true);
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        List<ChildData> childDataList = pathChildrenCache.getCurrentData();
        System.out.println("子节点列表:");
        childDataList.forEach(childData -> System.out.println(childData.getPath()));

        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                System.out.println("节点:"+event.getData().getPath()+
                        "发生事件:"+event.getType());
            }
        });

PathChildrenCache.StartMode初始化方式

  • NORMAL:异步初始化
  • BUILD_INITIAL_CACHE:同步初始化
  • POST_INITIALIZED_EVENT:异步并通知,会调用INITIALIZED事件

PathChildrenCacheEvent.Type节点状态

  • CHILD_ADDED:添加子节点
  • CHILD_UPDATED:子节点被修改
  • CHILD_REMOVED:子节点被删除
  • CONNECTION_SUSPENDED:连接中断
  • CONNECTION_RECONNECTED:重新连接
  • CONNECTION_LOST:连接丢失
  • INITIALIZED:初始化完成

事务

		curator.inTransaction()
                .create().forPath("/node2","aaa".getBytes())
                .and().
                create().forPath("/node3","bbb".getBytes())
                .and()
                .delete().forPath("/node1")
                .and()
                .commit();

一系列操作要么全部成功,要么全部失败