当前位置: 首页 > news >正文

ZooKeeper实现分布式锁

1、基于ZooKeeper基本API实现

pom.xml
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.7</version>
</dependency>
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class DistributeLock {private final String connectString = "127.0.0.1:2181";private final int sessionTimeout = 2000;private final ZooKeeper zk;private CountDownLatch connectLatch = new CountDownLatch(1);private CountDownLatch waitLatch = new CountDownLatch(1);private String waitPath;private String currentNode;public DistributeLock() throws IOException, InterruptedException, KeeperException {//获取连接zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {//connectLatch 如果连接上zk,释放if(event.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}// waitLatch 前一个节点释放锁删除后,释放if(event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {waitLatch.countDown();}}});//等待zk正常连接后,再往下执行程序connectLatch.await();// 判断根节点(/locks)是否存在Stat stat = zk.exists("/locks", false);if (null == stat) {//创建根节点zk.create("/locks", "locking".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}//加锁public void zkLock() {//创建临时节点try {currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//判断创建的节点是否是最小的序号节点,如果是则获取到锁,如果不是则监听它序号前一个节点List<String> children = zk.getChildren("/locks", false);//如果children只有一个值,那就直接获取锁;如果有多个节点,需要判断,谁最小if(children.size() == 1) {return;} else {Collections.sort(children);//获得节点名称 seq-00000000String thisNode = currentNode.substring("/locks/".length());int index = children.indexOf(thisNode);if(index == -1) {System.out.println("数据异常");} else if (index == 0) {//就一个节点,直接获取锁return;} else {//需要监听它前一个节点变化waitPath = "/locks/" + children.get(index - 1);zk.getData(waitPath, true, null);//等待监听waitLatch.await();}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}//解锁public void unZkLock() {try {zk.delete(currentNode, -1);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}
}
测试:
import org.apache.zookeeper.KeeperException;import java.io.IOException;
import java.util.concurrent.TimeUnit;public class DistributeLockTest {public static void main(String[] args) throws InterruptedException, IOException, KeeperException {final DistributeLock lock1 = new DistributeLock();final DistributeLock lock2 = new DistributeLock();new Thread(new Runnable() {@Overridepublic void run() {try {lock1.zkLock();System.out.println(Thread.currentThread().getName()+"==>获得锁");TimeUnit.SECONDS.sleep(10);lock1.unZkLock();System.out.println(Thread.currentThread().getName()+"==>释放锁");} catch (Exception e) {e.printStackTrace();}}}, "线程1").start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.zkLock();System.out.println(Thread.currentThread().getName()+"==>获得锁");TimeUnit.SECONDS.sleep(10);lock2.unZkLock();System.out.println(Thread.currentThread().getName()+"==>释放锁");} catch (Exception e) {e.printStackTrace();}}}, "线程2").start();}
}

2、基于框架curator实现

pom.xml
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.3.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>4.3.0</version>
</dependency>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.concurrent.TimeUnit;public class CuratorLock {public static void main(String[] args) {//创建分布式锁1final InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");//创建分布式锁2final InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println(Thread.currentThread().getName()+"==>获得锁");lock1.acquire();System.out.println(Thread.currentThread().getName()+"==>再次获得锁");TimeUnit.SECONDS.sleep(10);lock1.release();System.out.println(Thread.currentThread().getName()+"==>释放锁");lock1.release();System.out.println(Thread.currentThread().getName()+"==>再次释放锁");} catch (Exception e) {e.printStackTrace();}}}, "线程1").start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println(Thread.currentThread().getName()+"==>获得锁");lock2.acquire();System.out.println(Thread.currentThread().getName()+"==>再次获得锁");TimeUnit.SECONDS.sleep(8);lock2.release();System.out.println(Thread.currentThread().getName()+"==>释放锁");lock2.release();System.out.println(Thread.currentThread().getName()+"==>再次释放锁");} catch (Exception e) {e.printStackTrace();}}}, "线程2").start();}private static CuratorFramework getCuratorFramework() {ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(retryPolicy).build();//启动客户端client.start();System.out.println("zookeeper启动成功");return client;}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 浅析 VO、DTO、DO、PO 的概念
  • Oracle透明数据加密:数据泵文件导出
  • 5.SpringBoot核心源码-启动类源码分析
  • Redis 7.x 系列【23】哨兵模式
  • 进程信号
  • VINS-Fusion源码逐行解析:除单目+imu模式外的位姿初始化函数initFramePoseByPnP()及其内部函数
  • 科普文:Redis一问一答
  • 特斯拉的选择:.NET技术栈的工业级魅力
  • 第三方配件也能适配苹果了,iOS 18与iPadOS 18将支持快速配对
  • Vue1-Vue核心
  • 斐讯N1盒子刷入Armbian并安装Docker拉取网络下行流量教程
  • docker安装nginx并配置https
  • html5——列表、表格
  • JDK14新特征最全详解
  • Linux rsync文件同步工具
  • 《Javascript高级程序设计 (第三版)》第五章 引用类型
  • Android单元测试 - 几个重要问题
  • CAP理论的例子讲解
  • JavaScript设计模式与开发实践系列之策略模式
  • JS 面试题总结
  • nodejs调试方法
  • Spring思维导图,让Spring不再难懂(mvc篇)
  • V4L2视频输入框架概述
  • 初识 beanstalkd
  • 从tcpdump抓包看TCP/IP协议
  • 从零到一:用Phaser.js写意地开发小游戏(Chapter 3 - 加载游戏资源)
  • 关键词挖掘技术哪家强(一)基于node.js技术开发一个关键字查询工具
  • 全栈开发——Linux
  • 如何用Ubuntu和Xen来设置Kubernetes?
  • 温故知新之javascript面向对象
  • 问:在指定的JSON数据中(最外层是数组)根据指定条件拿到匹配到的结果
  • 小程序button引导用户授权
  • 移动端解决方案学习记录
  • 终端用户监控:真实用户监控还是模拟监控?
  • 字符串匹配基础上
  • 3月7日云栖精选夜读 | RSA 2019安全大会:企业资产管理成行业新风向标,云上安全占绝对优势 ...
  • hi-nginx-1.3.4编译安装
  • 专访Pony.ai 楼天城:自动驾驶已经走过了“从0到1”,“规模”是行业的分水岭| 自动驾驶这十年 ...
  • # Java NIO(一)FileChannel
  • #大学#套接字
  • (8)STL算法之替换
  • (C#)Windows Shell 外壳编程系列4 - 上下文菜单(iContextMenu)(二)嵌入菜单和执行命令...
  • (C#)一个最简单的链表类
  • (C语言)二分查找 超详细
  • (poj1.2.1)1970(筛选法模拟)
  • (二十四)Flask之flask-session组件
  • (附源码)apringboot计算机专业大学生就业指南 毕业设计061355
  • (附源码)ssm智慧社区管理系统 毕业设计 101635
  • (回溯) LeetCode 40. 组合总和II
  • .h头文件 .lib动态链接库文件 .dll 动态链接库
  • .net core使用RPC方式进行高效的HTTP服务访问
  • .NET Framework杂记
  • .net Signalr 使用笔记
  • .NET WPF 抖动动画
  • .NET/C# 在代码中测量代码执行耗时的建议(比较系统性能计数器和系统时间)...