当前位置: 首页 > news >正文

跟着实例学习ZooKeeper的用法: Barrier

分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。

比如赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,所有的赛马都飞奔而出。

栅栏Barrier

DistributedBarrier类实现了栅栏的功能。 它的构造函数如下:


public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier


首先你需要设置栅栏,它将阻塞在它上面等待的线程:


setBarrier();


然后需要阻塞的线程调用“方法等待放行条件:


public void waitOnBarrier()


当条件满足时,移除栅栏,所有等待的线程将继续执行:


removeBarrier();


异常处理 DistributedBarrier 会监控连接状态,当连接断掉时waitOnBarrier()方法会抛出异常。

看一个例子:


package com.colobu.zkrecipe.barrier;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

public class DistributedBarrierExample {
    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {

                        Thread.sleep((long) (3 * Math.random()));
                        System.out.println("Client #" + index + " waits on Barrier");
                        barrier.waitOnBarrier();
                        System.out.println("Client #" + index + " begins");
                        return null;
                    }
                };
                service.submit(task);
            }

            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");


            controlBarrier.removeBarrier();


            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

        }

    }

}


这个例子创建了controlBarrier来设置栅栏和移除栅栏。 我们创建了5个线程,在此Barrier上等待。 最后移除栅栏后所有的线程才继续执行。

如果你开始不设置栅栏,所有的线程就不会阻塞住。

双栅栏Double Barrier

双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。 双栅栏类是DistributedDoubleBarrier。 构造函数为:


public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier


memberQty是成员数量,当enter方法被调用时,成员被阻塞,直到所有的成员都调用了enter。 当leave方法被调用时,它也阻塞调用线程, 知道所有的成员都调用了leave。 就像百米赛跑比赛, 发令枪响, 所有的运动员开始跑,等所有的运动员跑过终点线,比赛才结束。

DistributedBarrier 会监控连接状态,当连接断掉时enter()leave方法会抛出异常。

例子代码:


package com.colobu.zkrecipe.barrier;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

public class DistributedBarrierExample {
    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {

                        Thread.sleep((long) (3 * Math.random()));
                        System.out.println("Client #" + index + " enters");
                        barrier.enter();
                        System.out.println("Client #" + index + " begins");
                        Thread.sleep((long) (3000 * Math.random()));
                        barrier.leave();
                        System.out.println("Client #" + index + " left");
                        return null;
                    }
                };
                service.submit(task);
            }


            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

        }

    }

}


相关文章:

  • Partition4:增加分区
  • Oracle创建表空间
  • succ
  • 新技能,利用Reflector来修改dll引用
  • 溢出隐藏
  • .Net转前端开发-启航篇,如何定制博客园主题
  • NTFS For Mac 的特点有哪些
  • 第十次课作业(风险管理、项目收尾、知识产权)
  • 5.jenkins使用Email Extension Plugin插件配置邮件通知
  • Fouandation(NSString ,NSArray,NSDictionary,NSSet) 中常见的理解错误区
  • 如何在Kettle4.2上面实现cassandra的输入与输出
  • hibernate延迟加载
  • EventBus (四) Sticky事件
  • Centos6.6搭建中文版本的Cacti监控
  • 模拟停车POJ(3505)
  • 【刷算法】求1+2+3+...+n
  • Bytom交易说明(账户管理模式)
  • CNN 在图像分割中的简史:从 R-CNN 到 Mask R-CNN
  • crontab执行失败的多种原因
  • ES6核心特性
  • Laravel Mix运行时关于es2015报错解决方案
  • LeetCode541. Reverse String II -- 按步长反转字符串
  • oschina
  • php的插入排序,通过双层for循环
  • spring security oauth2 password授权模式
  • 对JS继承的一点思考
  • 开年巨制!千人千面回放技术让你“看到”Flutter用户侧问题
  • 设计模式(12)迭代器模式(讲解+应用)
  • 深入浏览器事件循环的本质
  • 什么是Javascript函数节流?
  • 使用Maven插件构建SpringBoot项目,生成Docker镜像push到DockerHub上
  • 硬币翻转问题,区间操作
  • 优秀架构师必须掌握的架构思维
  • HanLP分词命名实体提取详解
  • python最赚钱的4个方向,你最心动的是哪个?
  • 新年再起“裁员潮”,“钢铁侠”马斯克要一举裁掉SpaceX 600余名员工 ...
  • ​2021半年盘点,不想你错过的重磅新书
  • ​RecSys 2022 | 面向人岗匹配的双向选择偏好建模
  • ​软考-高级-系统架构设计师教程(清华第2版)【第9章 软件可靠性基础知识(P320~344)-思维导图】​
  • ​虚拟化系列介绍(十)
  • (9)目标检测_SSD的原理
  • (C语言)编写程序将一个4×4的数组进行顺时针旋转90度后输出。
  • (办公)springboot配置aop处理请求.
  • (分布式缓存)Redis分片集群
  • (附源码)springboot教学评价 毕业设计 641310
  • (附源码)ssm失物招领系统 毕业设计 182317
  • (附源码)计算机毕业设计ssm高校《大学语文》课程作业在线管理系统
  • (官网安装) 基于CentOS 7安装MangoDB和MangoDB Shell
  • (机器学习-深度学习快速入门)第三章机器学习-第二节:机器学习模型之线性回归
  • (五)关系数据库标准语言SQL
  • (一)插入排序
  • (转)Android中使用ormlite实现持久化(一)--HelloOrmLite
  • (转)MVC3 类型“System.Web.Mvc.ModelClientValidationRule”同时存在
  • (轉)JSON.stringify 语法实例讲解
  • .NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版