Curator



为了更好的实现Java操作zookeeper服务器,后来出现了Curator框架,非常的强大,目前已经是Apache的顶级项目,里面提供了更多丰富的操作,例如session超时重连、主从选举、分布式计数器、分布式锁等等适用于各种复杂的zookeeper场景的API封装




1 Curator框架使用(一)

Curator框架中使用链式编程风格,易读性更强,使用工厂方法创建连接对象。

1.使用CuratorFrameworkFactory的两个静态工厂方法(参数不同)来实现

1.1 connectString:连接串

1.2 retryPolicy:重试连接策略。有四种实现,分别是:ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed

1.3sessionTimeoutMs:会话超时时间,默认为60000ms

1.4connectionTimeoutMs连接超时时间,默认为15000ms

 

注意对于retryPolicy策略通过一个接口来让用户自定义实现




2 Curator框架使用(二)

2.1创建连接

/** 重试策略: 初始时间为1s, 重试10次 */

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);

 

/** 通过工厂创建连接 */

CuratorFramework cf = CuratorFrameworkFactory.builder()

.connectString(ZK_ADDR)

.sessionTimeoutMs(SESSION_TIMEOUT)

.retryPolicy(retryPolicy)

.build();

 

/** 开启连接 */

cf.start();



2.2 新增节点

/**

 * 新增节点:指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容

 * 1.creatingParentsIfNeeded() 递归创建父目录

 * 2.withMode() 节点类型(持久|临时)

 * 3.forPath() 指定路径

 */

cf.create()

.creatingParentsIfNeeded()

.withMode(CreateMode.PERSISTENT)

.forPath("/super/c1", "c1内容".getBytes());



2.3 删除节点

/**

 * 删除节点

 * 1.deletingChildrenIfNeeded() 递归删除

 * 2.guaranteed() 确保节点被删除

 * 3. withVersion(int version) //特定版本号  

 */

cf.delete().deletingChildrenIfNeeded().forPath("/super");



2.4 读取和修改数据

/**

 * 读取和修改数据 : getData()和setData()

 */

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1内容".getBytes());

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2", "c2内容".getBytes());

 

/** 读取节点内容 */

String c2_data = new String(cf.getData().forPath("/super/c2"));

System.out.println("c2_data-->"+c2_data);

 

/** 修改节点内容 */

cf.setData().forPath("/super/c2", "修改c2的内容".getBytes());

String update_c2_data = new String(cf.getData().forPath("/super/c2"));

System.out.println("update_c2_data-->"+update_c2_data);



2.5 绑定回调函数

ExecutorService pool = Executors.newCachedThreadPool();

 

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)

.inBackground(new BackgroundCallback() {


@Override

public void proce***esult(CuratorFramework cf, CuratorEvent event)

throws Exception {

System.out.println("code-->" + event.getResultCode());

System.out.println("type-->" + event.getType());

System.out.println("线程为-->" + Thread.currentThread().getName());

}

}, pool).forPath("/super/c3", "c2的内容".getBytes());

 

System.out.println("主线程-->" + Thread.currentThread().getName());

 

Thread.sleep(Integer.MAX_VALUE);



2.6 读取子节点和判断节点是否存在

/**

 * 读取子节点的方法: getChildren()

 * 判断节点是否存在: checkExists()

 */

List<String> list = cf.getChildren().forPath("/super");

for (String p: list) {

System.out.println(p);

}

 

//如果为null标识不存在

Stat stat = cf.checkExists().forPath("/super/c4");

System.out.println(stat);



3 Curator框架使用(三)

如果要使用类似Wather的监听功能Curator必须依赖一个jar包,Maven依赖

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-recipes</artifactId>

<version>2.4.2</version>

</dependency>

有了这个依赖包,使用NodeCache的方式去客户端实例中注册一个监听缓存,然后实现对应的监听方法即可,这里主要有两种监听方式

NodeCacheListener:监听节点的新增、修改操作

PathChildrenCacheListener:监听子节点的新增、修改、删除操作



4 Curator使用场景

4.1 分布式锁

在分布式场景中,为了保证数据的一致性,经常在程序运行的某一个点需要进行同步操作(java提供了synchronized或者Reentrantlock实现)比如看一个小示例,这个示例出现分布式不同步的问题

比如:之前是在高并发下访问一个程序,现在则是在高并发下访问多个服务器节点(分布式)

 

使用Curator基于zookeeper的特性提供的分布式锁来处理分布式场景的数据一致性,zookeeper本身的分布式是有写问题的,之前实现的时候遇到过,这里强烈推荐使用Curator分布式锁

public class Lock2 {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超时时间 */
private static final int SESSION_TIMEOUT = 5000; //MS
static int count = 10;
public static CuratorFramework createCuratorFramework(){
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
return cf;
}
public static void main(String[] args) throws Exception {
final CountDownLatch countDown = new CountDownLatch(1);
for (int i =0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
CuratorFramework cf = createCuratorFramework();
cf.start();
//锁对象 client 锁节点
final InterProcessMutex lock = new InterProcessMutex(cf, "/super");
try {
countDown.await();
lock.acquire(); //获得锁
number();
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();//释放锁
} catch (Exception e) {
e.printStackTrace();
}
}
}
},"t" + i).start();;
}
Thread.sleep(2000);
countDown.countDown();
}
 
public static void number() {
count--;
System.out.println(Thread.currentThread().getName() + "-->" + count);
}
}




4.2 分布式计数器功能

一说到分布式计数器,可能脑海里想到AtomicInteger(原子累加)这种经典方式,如果针对一个JVM的场景当然没问题,但是现在是在分布式场景下,就需要利用Curator框架的DistributedAtomicInteger了

public class CuratorAtomicInteger {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超时时间 */
private static final int SESSION_TIMEOUT = 5000; //MS
public static void main(String[] args) throws Exception {
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
cf.start();
//使用DistributedAtomicInteger
DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, "/superM", new RetryNTimes(3, 1000));
//atomicInteger.increment();
atomicInteger.add(1);
AtomicValue<Integer> atomicValue = atomicInteger.get();
System.out.println("atomicValue.succeeded()-->" + atomicValue.succeeded());
System.out.println("atomicValue.postValue()-->" + atomicValue.postValue());
System.out.println("atomicValue.preValue()-->" + atomicValue.preValue());
}
}





4.3 Barrier

4.3.1 DistributedDoubleBarrier

分布式Barrier 类DistributedDoubleBarrier: 它会阻塞所有节点上的等待进程,直到某一个被满足, 然后所有的节点同时开始,中间谁先运行完毕,谁后运行完毕不关心,但是最终一定是一块退出运行的

 

public class CuratorBarrier {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超时时间 */
private static final int SESSION_TIMEOUT = 5000; //MS
public static void main(String[] args) throws Exception{
for (int i =0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
/** 实例化5个客户端对象 */
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
cf.start();
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/superBarrier", 5);
Thread.sleep(1000 * (new Random()).nextInt(3));
System.out.println(Thread.currentThread().getName() + " 已准备好!");
barrier.enter();
System.out.println("同时开始运行...");
Thread.sleep(1000 * (new Random()).nextInt(3));
System.out.println("运行完毕...");
barrier.leave();
System.out.println("同时退出运行...");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();;
}
}
}





4.3.2 DistributedBarrier

分布式Barrier 类DistributedBarrier: 它会阻塞所有节点上的等待进程(所有节点进入待执行状态),直到“某一个人吹哨”说开始执行, 然后所有的节点同时开始

public class CuratorBarrier2 {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超时时间 */
private static final int SESSION_TIMEOUT = 5000; //MS
static DistributedBarrier barrier = null;
public static void main(String[] args) throws Exception{
for (int i =0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
/** 实例化5个客户端对象 */
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
cf.start();
barrier = new DistributedBarrier(cf, "/superBarrier");
System.out.println(Thread.currentThread().getName() + " 设置barrier");
barrier.setBarrier(); //设置
barrier.waitOnBarrier(); //等待
System.out.println("开始执行程序...");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();;
}
Thread.sleep(5000);
barrier.removeBarrier(); //释放
}
}





5 Curator重试策略

Curator内部实现的几种重试策略:

1.ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.

2.RetryNTimes:指定最大重试次数的重试策略   

3.RetryOneTime:仅重试一次

4.RetryUntilElapsed:一直重试直到达到规定的时间



5.1 ExponentialBackoffRetry

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

 

参数说明

   1.baseSleepTimeMs 初始sleep时间

   2.maxRetries 最大重试次数

   3.maxSleepMs 最大重试时间 




5.2 RetryNTimes

RetryNTimes(int n, int sleepMsBetweenRetries)


参数说明

   1.n 最大重试次数

   2.sleepMsBetweenRetries 每次重试的间隔时间

 



5.3 RetryOneTime

RetryOneTime(int sleepMsBetweenRetry)

 

参数说明

   1.sleepMsBetweenRetry为重试间隔的时间

 

5.4 RetryUntilElapsed

RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

 

参数说明

   1.maxElapsedTimeMs 最大重试时间

   2.sleepMsBetweenRetries 每次重试的间隔时间