当前位置: 首页 > news >正文

zookeeper -- 第八章 zk开源客户端 Curator介绍 (下)

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

1、读取数据

构建操作包装类(Builder): GetDataBuilder getData() ---- CuratorFramework

GetDataBuilder

  • storingStatIn(org.apache.zookeeper.data.Stat stat) // 把服务器端获取的状态数据存储到stat对象

  • Byte[] forPath (String path) // 节点路劲

public void readNode(String path) throws Exception {
		Stat stat = new Stat();
		byte[] data = client.getData().storingStatIn(stat).forPath(path);
		System.out.println("读取节点" + path + "的数据:" + new String(data));
		System.out.println(stat.toString());
	}

2、更新数据

构建操作包装类 (Builder) SetDataBuilder setData() ---- CuratorFramework

SetDataBuilder

  • withVersion (int version) // 特定版本号

  • forPath (String path, byte[] data) // 节点路劲

  • forPath (String path) // 节点路劲

	public void updateNode(String path, byte[] data, int version)
			throws Exception {
		client.setData().withVersion(version).forPath(path, data);
	}

3、读取子节点

构建操作包装类(Builder):GetChildrenBuilder getChildren() --- CuratorFramework

GetChildrenBuilder

  • storingStatIn(org.apache.zookeeper.data.Stat stat) // 把服务器端获取的状态数据存储到stat对象

  • Byte[] forPath(String path) // 节点路径

  • usingWatcher(org.apache.zookeeper.Watcher watcher) // 设置watcher ,类似zk本身api ,也只能使用一次

  • usingWatcher(CuratorWatcher watcher) // 设置watcher ,类似于zk本身的api,也只能使用一次

	public void getChildren(String path) throws Exception {
		List<String> children = client.getChildren().usingWatcher(new WatcherTest()).forPath("/curator");
		for (String pth : children) {
			System.out.println("child=" + pth);
		}
	}

4、设置watcher

1、 NodeCache

  • 监听数据节点的内容变更

  • 监听节点的创建,即如果指定的节点不存在,则节点创建后,会触发这个监听

2、PathChildrenCache

  • 监听指定节点的子节点变化情况

  • 包括:新增子节点 子节点数据变更和子节点删除

3、 NodeCache介绍

构造函数

  • NodeCache(CuratorFramework client, String path)

  • NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

client : 客户端实例

path : 数据节点路径

dataIsCompressed : 是否进行数据压缩

  • 回调接口
public interface NodeCacheListener {
    // 没有参数,怎么获取事件信息以及节点数据
    void nodeChanged() throws Exception;
}

4、PathChildrenCache介绍

1、 构造函数

    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) {
        this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
    }

    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory) {
        this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
    }

    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory) {
        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
    }

    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ExecutorService executorService) {
        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));
    }

2、 回调接口

public interface PathChildrenCacheListener {
    void childEvent(CuratorFramework var1, PathChildrenCacheEvent var2) throws Exception;
}

3、 构造函数参数

client : 客户端实例

path : 数据节点路径

dataIsCompressed : 是否进行数据压缩

cacheData : 用于配置是否把节点内容缓存起来,如果配置true,那么客户端在接收到节点列表变更时,也

能够获取到节点的数据内容,如果为false则无法取到数据内容

threadFactory : 通过这两个参数构造专门的线程池来处理事件通知

ExecutorService

4、 监听接口

public static enum Type {
        CHILD_ADDED,                                // 新增子节点(CHILD_ADDED)
        CHILD_UPDATED,                           // 子节点数据变更(CHILD_UPDATED)
        CHILD_REMOVED,                          // 子节点删除(CHILD_REMOVED)
        CONNECTION_SUSPENDED,
        CONNECTION_RECONNECTED,
        CONNECTION_LOST,
        INITIALIZED;

        private Type() {
        }
    }

5、 PathChildrenCache.StartMode

public static enum StartMode {
         // 异步初始化cache
        NORMAL,  
                          
        // 同步初始化客户端的cache,及创建cache后,就从服务器端拉入对应的数据
        BUILD_INITIAL_CACHE,     

        // 异步初始化,初始化完成触发事件, PathChildrenCacheEvent.Type.INITIALIZED
        POST_INITIALIZED_EVENT;

        private StartMode() {
        }
    }

6、 代码演示

public void addChildWatcher(String path) throws Exception {
		final PathChildrenCache cache = new PathChildrenCache(this.client,
				path, true);
		cache.start(StartMode.POST_INITIALIZED_EVENT);
		System.out.println(cache.getCurrentData().size());
		//byte childone[] = cache.getCurrentData().get(0).getData();
//		System.out.println("childone:"
//				+ cache.getCurrentData().get(0).getPath() + ";data="
//				+ new String(childone));
		cache.getListenable().addListener(new PathChildrenCacheListener() {
			public void childEvent(CuratorFramework client,
					PathChildrenCacheEvent event) throws Exception {
				if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
					System.out.println("客户端子节点cache初始化数据完成");
					System.out.println("size="+cache.getCurrentData().size());
				}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
					System.out.println("添加子节点:"+event.getData().getPath());
					System.out.println("修改子节点数据:"+new String(event.getData().getData()));
				}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
					System.out.println("删除子节点:"+event.getData().getPath());
				}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
					System.out.println("修改子节点数据:"+event.getData().getPath());
					System.out.println("修改子节点数据:"+new String(event.getData().getData()));
				}
			}
		});
	}

转载于:https://my.oschina.net/u/3136594/blog/1630283

相关文章:

  • Flex中使用HTTPService与服务器端交换数据
  • Oculus Touch再度被坑,这次是电商Saturn流出上架信息
  • 「镁客·请讲」打造一台眼睛专属“跑步机”,鹰视菲诺是如何用AI拯救近视的?...
  • 可关闭与最小化的右下角浮动广告代码
  • 第185天:百度星座案例
  • ccpuid:CPUID信息模块 V1.01版,支持GCC(兼容32位或64位的Windows/Linux)
  • C#面向服务编程技术WCF从入门到实战演练
  • java、数据库中命名规则
  • 分类模型——Logistics Regression
  • AR图书,看着很美其实有点坑
  • 有关libpthread.so库的问题
  • 运用免费OA让你有意想不到的效果
  • MongoDB内存机制
  • LDAP注入与防御剖析
  • PCA算法学习_1(OpenCV中PCA实现人脸降维)
  • __proto__ 和 prototype的关系
  • 【每日笔记】【Go学习笔记】2019-01-10 codis proxy处理流程
  • centos安装java运行环境jdk+tomcat
  • gcc介绍及安装
  • happypack两次报错的问题
  • leetcode-27. Remove Element
  • MySQL-事务管理(基础)
  • php的插入排序,通过双层for循环
  • Spark RDD学习: aggregate函数
  • Spring Cloud Alibaba迁移指南(一):一行代码从 Hystrix 迁移到 Sentinel
  • Terraform入门 - 1. 安装Terraform
  • V4L2视频输入框架概述
  • 阿里云Kubernetes容器服务上体验Knative
  • 技术胖1-4季视频复习— (看视频笔记)
  • 前端技术周刊 2019-01-14:客户端存储
  • 实现简单的正则表达式引擎
  • 线性表及其算法(java实现)
  • 学习使用ExpressJS 4.0中的新Router
  • 主流的CSS水平和垂直居中技术大全
  • ​你们这样子,耽误我的工作进度怎么办?
  • ​如何使用ArcGIS Pro制作渐变河流效果
  • ###51单片机学习(1)-----单片机烧录软件的使用,以及如何建立一个工程项目
  • #include到底该写在哪
  • #NOIP 2014# day.1 T2 联合权值
  • #使用清华镜像源 安装/更新 指定版本tensorflow
  • #我与Java虚拟机的故事#连载19:等我技术变强了,我会去看你的 ​
  • (2)(2.4) TerraRanger Tower/Tower EVO(360度)
  • (20050108)又读《平凡的世界》
  • (done) ROC曲线 和 AUC值 分别是什么?
  • (vue)el-checkbox 实现展示区分 label 和 value(展示值与选中获取值需不同)
  • (二)丶RabbitMQ的六大核心
  • (附源码)springboot掌上博客系统 毕业设计063131
  • (论文阅读30/100)Convolutional Pose Machines
  • (三) diretfbrc详解
  • (算法)Travel Information Center
  • (一) storm的集群安装与配置
  • * 论文笔记 【Wide Deep Learning for Recommender Systems】
  • .[hudsonL@cock.li].mkp勒索病毒数据怎么处理|数据解密恢复
  • .NET 8.0 中有哪些新的变化?
  • .Net 高效开发之不可错过的实用工具