当前位置: 首页 > news >正文

Zookeeprt实战(待完善)

目录

原生java客户端实战

常用API

代码

Curator客户端实战

1. maven依赖

2. 初始化客户端

3. 重试策略

4. 增删改成API

5. 监听器API

分布式ID生成器

顺序节点生成分布式ID

实现雪花算法

zookeeper实现分布式队列


原生java客户端实战

常用API

  • create(path, data, acl,createMode): 创建一个给定路径的 znode,并在 znode 保存 data[]的 数据,createMode指定 znode 的类型。
  • delete(path, version):如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode。
  • exists(path, watch):判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。
  • getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。
  • setData(path, data, version):如果给定 path 上的 znode 的版本和给定的 version 匹配,设置 znode 数据。
  • getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。
  • sync(path):把客户端 session 连接节点和 leader 节点进行同步。

代码

1. 引入maven

        <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version></dependency>

2. 增删改查API 


@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZkJavaClient {private final static  String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";private static ZooKeeper zooKeeper;// 初始化zk客户端@BeforeClasspublic static void initZookeeper() throws IOException, InterruptedException {System.out.println("initZookeeper");CountDownLatch countDownLatch =new CountDownLatch(1);zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR, 3000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if(Event.KeeperState.SyncConnected==event.getState()&& event.getType()== Event.EventType.None){//如果收到了服务端的响应事件,连接成功countDownLatch.countDown();System.out.println("zookeeper连接建立");}}});System.out.println("zookeeper连接中...");countDownLatch.await();// 打印连接状态System.out.println(zooKeeper.getState());}public static String  getUniqueNode(String node) {return node + "-" + System.currentTimeMillis();}private static  String syncNode = getUniqueNode("/user");// 新增-同步@Testpublic void a_createSync() throws InterruptedException, KeeperException {String result = zooKeeper.create(syncNode, "kk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println("同步创建node成功" + result);}// 新增-异步@Testpublic void b_createASync() throws InterruptedException, KeeperException {zooKeeper.create(getUniqueNode("/user-sync"), "kk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,new AsyncCallback.StringCallback() {@Overridepublic void processResult(int rc, String path, Object ctx, String name) {System.out.println(String.format("异步创建node成功 rc  %s, path %s,ctx %s,name %s",rc,path,ctx,name));}}, "context");Thread.sleep(1000 * 2);}// 更新-同步@Testpublic void c_updateSync() throws InterruptedException, KeeperException {Stat stat = new Stat();byte[] data = zooKeeper.getData(syncNode, false, stat);System.out.println("node = " + syncNode + "修改前: " + new String(data));stat = zooKeeper.setData(syncNode, "kk2".getBytes(), stat.getVersion());System.out.println("同步修改node = " + syncNode + "成功   " + stat);data = zooKeeper.getData(syncNode, false, stat);System.out.println("node = " + syncNode + "修改后: " + new String(data));}// 删除@Testpublic void d_delSync() throws InterruptedException, KeeperException {Stat stat = new Stat();byte[] data = zooKeeper.getData(syncNode, false, stat);System.out.println("node = " + syncNode + "删除前: " + new String(data));zooKeeper.delete(syncNode, stat.getVersion());System.out.println("同步删除node = " + syncNode + "成功   ");stat = zooKeeper.exists(syncNode, null);System.out.println("node = " + syncNode + "删除后: " + stat);}
}

Curator客户端实战

1. maven依赖

Curator 包含了几个包:

  • curator-framework是对ZooKeeper的底层API的一些封装。
  • curator-client提供了一些客户端的操作,例如重试策略等。
  • curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
<!-- zookeeper client -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency><!--curator-->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>

2. 初始化客户端

private final static  String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";private static CuratorFramework client;@BeforeClasspublic static void initZookeeper() {System.out.println("initZookeeper");RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000)  // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator-test") // 包含隔离名称.build();client.start();}

3. 重试策略

// 定义重试策略        
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。

策略名称

描述

ExponentialBackoffRetry

重试一组次数,重试之间的睡眠时间增加

RetryNTimes

重试最大次数

RetryOneTime

只重试一次

RetryUntilElapsed

在给定的时间结束之前重试

4. 增删改成API


@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZkCuratorClient {private final static  String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";private static CuratorFramework client;@BeforeClasspublic static void initZookeeper() {System.out.println("initZookeeper");RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000)  // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator-test") // 包含隔离名称.build();client.start();}private static String TMP_PATH = getUniqueNode("/curator-kk");;// 创建单节点@Testpublic void a_Create() throws Exception {
//        String path = client.create().forPath("/curator-node");String pathResult = client.create().withMode(CreateMode.PERSISTENT).forPath(TMP_PATH, "kk".getBytes());System.out.println("同步创建node成功, path = " + TMP_PATH + " result = " + pathResult);}public static String  getUniqueNode(String node) {return node + "-" + System.currentTimeMillis();}// 创建父子节点@Testpublic void b_Create_Parent() throws Exception {String pathWithParent = getUniqueNode("/kk-parent/kk-sub-1");String pathResult = client.create().creatingParentsIfNeeded().forPath(pathWithParent, "kk_son".getBytes());System.out.println("同步创建node成功, path = " + pathWithParent + " result = " + pathResult);}// 更新节点@Testpublic void c_SetData() throws Exception {Stat stat = client.setData().forPath(TMP_PATH, "changed!".getBytes());System.out.println("更新node成功, path = " + TMP_PATH + " result = " + stat);}// 查询节点@Testpublic void d_GetData() throws Exception {byte[] bytes = client.getData().forPath(TMP_PATH);System.out.println("查询node成功, path = " + TMP_PATH + " result = " + new String(bytes));}// 删除节点@Testpublic void e_Delete() throws Exception {String pathWithParent="/kk-parent";client.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);}// 查询节点 - 异步@Testpublic void f_GetData_Async() throws Exception {client.getData().inBackground((item1, item2) -> {System.out.println(" background:  val  " + new String(item2.getData()) + " item2 = " + item2);}).forPath(TMP_PATH);Thread.sleep(1000 * 2);}// 查询节点 - 异步 - 指定线程池@Testpublic void g_GetData_Async_Excutor() throws Exception {ExecutorService executorService = Executors.newSingleThreadExecutor();client.getData().inBackground((item1, item2) -> {System.out.println(" background:  val  " + new String(item2.getData()) + " item2 = " + item2);},executorService).forPath(TMP_PATH);Thread.sleep(1000 * 2);}
}

5. 监听器API

  • NodeCache: 监听单节点
  • PathChildrenCache: 监听子节点
  • TreeCache: 监听所有层级子节点(树节点)

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZkCuratorWatchClient {private final static  String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";private static CuratorFramework client;@BeforeClasspublic static void initZookeeper() {System.out.println("initZookeeper");RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000)  // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator-test") // 包含隔离名称.build();client.start();}private static String TMP_PATH = "/curator-kk-w";;// 添加单节点监听器-永久@Testpublic void a_addWatch() throws Exception {createIfNeed(TMP_PATH);NodeCache nodeCache = new NodeCache(client, TMP_PATH);nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println(TMP_PATH + " path nodeChanged");print_GetData();}});nodeCache.start();Thread.sleep(1000 * 300);}// 添加子节点(Child)监听器-永久@Testpublic void b_addWatch_Child() throws Exception {createIfNeed(TMP_PATH);PathChildrenCache nodeCache = new PathChildrenCache(client, TMP_PATH, true);nodeCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework cli, PathChildrenCacheEvent event) throws Exception {ChildData data = event.getData();System.out.println(" path nodeChanged" + data.getPath() + " type = " + event.getType() + " val = " + new String(data.getData()));}});nodeCache.start();Thread.sleep(1000 * 300);}// 添加所有子节点(Tree)监听器-永久@Testpublic void testTreeCache() throws Exception {createIfNeed(TMP_PATH);TreeCache treeCache = new TreeCache(client, TMP_PATH);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {ChildData data = event.getData();System.out.println(" path nodeChanged" + data.getPath() + " type = " + event.getType() + " val = " + new String(data.getData()));}});treeCache.start();Thread.sleep(1000 * 300);}private void createIfNeed(String path) throws Exception {Stat stat = client.checkExists().forPath(path);System.out.println(stat);if  (stat == null) {String pathResult = client.create().withMode(CreateMode.PERSISTENT).forPath(path, "kk".getBytes());System.out.println("同步创建node成功, path = " + path + " result = " + pathResult);}}public void print_GetData() throws Exception {byte[] bytes = client.getData().forPath(TMP_PATH);System.out.println("查询node成功, path = " + TMP_PATH + " result = " + new String(bytes));}
}

分布式ID生成器

  • java的UUID
  • mongo的ObjectId
  • Redis的incr生成id
  • Twitter的SnowFlake算法
  • zookeeper的顺序节点

顺序节点生成分布式ID


public class IDMaker{private static CuratorFramework client;private final static  String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";// 初始化客户端static  {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000)  // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator-test") // 包含隔离名称.build();client.start();}// 创建临时顺序节点private String createSeqNode(String pathPefix) throws Exception {//创建一个临时顺序节点String destPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPefix);return destPath;}// 生成分布式idpublic String  makeId(String path) throws Exception {String str = createSeqNode(path);if(null != str){//获取末尾的序号int index = str.lastIndexOf(path);if(index>=0){index+=path.length();return index<=str.length() ? str.substring(index):"";}}return str;}// 测试-多线程批量生成idpublic static void main(String[] args) throws Exception {String path = "/idmarker/id-";IDMaker idMaker = new IDMaker();for (int i = 0; i < 5; i++) {new Thread(() -> {for (int j = 0; j < 10; j++) {try {String id = idMaker.makeId(path);System.out.println(Thread.currentThread().getName() + " 第" + j + "生产的id = " +id);} catch (Exception e) {System.err.println(e);}}}, "thread-" + i).start();}Thread.sleep(1000 * 300);}
}

实现雪花算法

==

zookeeper实现分布式队列

相关文章:

  • java虚拟机内存管理
  • 最新版本Vue3的学习笔记-第五章
  • java 内部错误2753_内部错误 2753.regutils.dll
  • LLM之RAG实战(九)| 高级RAG 03:多文档RAG体系结构
  • webrtc turn服务器搭建
  • leetcode 75. 颜色分类(medium)(优质解法)
  • 每日一练:LeeCode-347. 前 K 个高频元素(中) - 【优先级队列】
  • docker-compose Install TeamCity
  • git教程——日常工作git使用流程
  • Android Matrix画布Canvas旋转Rotate,Kotlin
  • Xcode 编译速度慢是什么原因?如何提高编译速度?
  • 太阳系三体模拟器
  • PHP序列化总结1--序列化和反序列化的基础知识
  • UEFI模拟环境搭建——windows+EDKII
  • TiDB 7.1 多租户在中泰证券中的应用
  • C++11: atomic 头文件
  • Create React App 使用
  • Date型的使用
  • ES6语法详解(一)
  • Java-详解HashMap
  • JS基础篇--通过JS生成由字母与数字组合的随机字符串
  • js作用域和this的理解
  • mysql_config not found
  • Python socket服务器端、客户端传送信息
  • vue从入门到进阶:计算属性computed与侦听器watch(三)
  • 分类模型——Logistics Regression
  • 后端_ThinkPHP5
  • 经典排序算法及其 Java 实现
  • 开源SQL-on-Hadoop系统一览
  • 如何使用 OAuth 2.0 将 LinkedIn 集成入 iOS 应用
  • 如何用vue打造一个移动端音乐播放器
  • 如何优雅地使用 Sublime Text
  • 如何在GitHub上创建个人博客
  • -- 数据结构 顺序表 --Java
  • 微服务框架lagom
  • !$boo在php中什么意思,php前戏
  • # Pytorch 中可以直接调用的Loss Functions总结:
  • # 移动硬盘误操作制作为启动盘数据恢复问题
  • #微信小程序:微信小程序常见的配置传旨
  • $Django python中使用redis, django中使用(封装了),redis开启事务(管道)
  • (C语言)共用体union的用法举例
  • (done) 两个矩阵 “相似” 是什么意思?
  • (SERIES10)DM逻辑备份还原
  • (SpringBoot)第二章:Spring创建和使用
  • (附源码)springboot青少年公共卫生教育平台 毕业设计 643214
  • (附源码)计算机毕业设计ssm高校《大学语文》课程作业在线管理系统
  • (五)c52学习之旅-静态数码管
  • (原創) 如何動態建立二維陣列(多維陣列)? (.NET) (C#)
  • (原創) 如何將struct塞進vector? (C/C++) (STL)
  • (转)Scala的“=”符号简介
  • .net 7和core版 SignalR
  • .net core 3.0 linux,.NET Core 3.0 的新增功能
  • .net core开源商城系统源码,支持可视化布局小程序
  • .NET国产化改造探索(三)、银河麒麟安装.NET 8环境
  • @ComponentScan比较