当前位置: 首页 > news >正文

并发编程——线程协作

并发编程——线程协作

​ 前面学习了线程,那么并发编程中,如何协调多个线程来开发呢?


Semaphore

​ 信号量跟前面将的同步互斥解决方案——信号量是一个东西,这是JDK的信号量实现。

源码分析

​ 先看下JDK的Semaphore类注释,看下Semaphore是做什么的:

A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire blocks if necessary until a permit is available, and then takes it. Each release adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.
Semaphores are often used to restrict the number of threads than can access some (physical or logical) resource

​ 简单的翻译:

​ 就是一组许可证,可以用来限制对共享资源的访问数量。

​ 也就是说,当count>1时,信号量是允许多个线程同时访问临界区的,当count=1,就是串行化的效果。

接下来看下它的构造函数:

  public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

 public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

​ 也就是说信号量是排队的时候公平/非公平的,那我们分别看下它的两个内部类:NonfairSync与FairSync,都只有一个方法:tryAcquireShared,尝试获取许可证,来看下它们的实现:

​ 公平FairSync:

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

非公平NonfairSync:

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

​ 可以看到,公平锁的做法是先去等待队列里看看,如果等待队列里还有人排队,就直接返回-1,其余的操作一样,都是根据当前可用许可证-申请许可证,查看是否足够返回。

接下来是它的重要的几个方法:

public void acquire(int permits) throws InterruptedException	//获取permits数量的许可证,如果获取不到,线程就一直阻塞,但是线程可以被中断
public void acquireUninterruptibly()					 //获取许可证,如果获取不到就一直阻塞
public void release() 									//归还许可证
public boolean tryAcquire()								//尝试获取许可证
public boolean tryAcquire(int permits)					 //尝试获取permits数量的许可证
public boolean tryAcquire(long timeout, TimeUnit unit)	   //尝试在timeout时间内获取许可证

​ 从源码上我们可以看出信号量几个特性:

  • 支持公平与非公平
  • 支持每次获取不同数量的许可证,也就是说我们每次获取的时候可以设置权重(也要注意归还相同数量的许可证)
  • 并不要求归还许可证的线程=获取许可证的线程

使用举例

​ 用Semaphore演示串行的场景


public class SemaphoreDemo1 {
    static Semaphore semaphore = new Semaphore(1);

    public static void main(String[] args) {
        Thread thread1 = new Thread(new SemaphoreRunnable());
        Thread thread2 = new Thread(new SemaphoreRunnable());
        thread1.start();
        thread2.start();
        
    }

    static class SemaphoreRunnable implements Runnable {

        @Override
        public void run() {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " ,我拿到了许可证~");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "释放了许可证");
            semaphore.release();
        }
    }
}

![在这里插入图片描述](https://img-blog.csdnimg.cn/f3f4ab571a4a4df09a51644d90fd2a3d.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBATWlyYW5hXzc3,size_18,color_FFFFFF,t_70,g_se,x_16


CountDownLatch

​ 这个词呢百度翻译出来就是倒计时门闩

源码分析

​ 照例看下源码注释对这个类的描述

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon – the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.

​ 是种同步辅助机制,允许一个或者多个线程等待直到其他线程的业务逻辑执行完成。CountDownLatch 在初始化的时候有个计数器,调用一次就会倒计时,直到计数器为0就会释放所有的线程,并且,CountDownLatch 无法被重置,如果需要重复使用,用CyclicBarrier。

​ 这个类的初始化方法只有一个,就是传入计数器数量:

public CountDownLatch(int count)

​ 重要方法:

public void await() throws InterruptedException			//在计数器没到0之前一直等待,除非线程被中断
public boolean await(long timeout, TimeUnit unit)  throws InterruptedException 	//在计数器没到0之前一直等待,除非线程被中断或者到了超时时间
public void countDown()								 //计数器-1
public long getCount() 								 //获取当前计数器数量

使用举例

​ 这里演示个跑步比赛的场景:

  • 一共4名运动员,大家都准备好了等待裁判开枪

  • 4名运动员都到达重点后,终点守候的裁判发枪表明比赛结束

    public class CountDownLatchDemo {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch begin = new CountDownLatch(1);
            CountDownLatch end = new CountDownLatch(4);
            ExecutorService executorService = Executors.newFixedThreadPool(4);
            for (int i=0;i<4;i++){
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName() + " 准备好了,等待裁判发枪");
                        try {
                            begin.await();
                            System.out.println(Thread.currentThread().getName() + " 开始跑步");
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println(Thread.currentThread().getName() + " 到达终点");
    
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally {
                            end.countDown();
                        }
                    }
                };
                executorService.execute(runnable);
            }
            Thread.sleep(500);
            System.out.println("发令枪响,比赛开始!");
            begin.countDown();
            end.await();
            System.out.println("4名运动员都到终点,比赛结束");
            executorService.shutdown();
        }
    }
    

在这里插入图片描述


CyclicBarrier

​ 循环屏障,从名字就可以看出它作用跟CountDownLatch差不多,但是可以循环计数。

源码分析

​ 注释描述:

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.
A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.

​ 允许一组线程等待对方到达同一个障碍点

​ 可以在释放等待线程后重新使用

​ 支持可选参数Runnable,在最后一个线程到达之后但是在所有线程释放之前,所有的线程都运行一遍

​ 先看下CyclicBarrier有哪些成员变量:

	//可重入锁
	private final ReentrantLock lock = new ReentrantLock();
    /** 需要等待的条件Condition */
    private final Condition trip = lock.newCondition();
    /** 计数器数量,因为是可循环的,所以这里需要保留一份最初的值 */
    private final int parties;
    /* 最后一个到达的线程要执行的任务 */
    private final Runnable barrierCommand;
    /** 当前的generation */
    private Generation generation = new Generation();
    /**
     * 本次循环中的计数器值,为0就唤醒全部线程
     */
    private int count;

​ 内部类generation,其实就是一个标识,标识本次循环状态,如果有线程被中断或者超时,又或者因为异常,broken就是true

    private static class Generation {
        boolean broken = false;
    }

两个构造函数:

public CyclicBarrier(int parties, Runnable barrierAction) 	//计数器与最后一个到达的线程要执行的任务
public CyclicBarrier(int parties)

几个重要方法:

 public int await() throws InterruptedException, BrokenBarrierException		
 public int await(long timeout, TimeUnit unit)					//有超时的等待
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException 
  public void reset() 										//重置屏障,会调用breakBarrier与nextGeneration方法
  private void breakBarrier()								//打破屏障,将broken标记为true,重置计数器,唤醒所有线程
  private void nextGeneration()								//重置计数器,唤醒所有线程,new一个新的Generation

如果await方法的线程不是最后一个线程,它将进入休眠状态,直到这四种情况之一被唤醒:

  • 最后一个线程到达

  • 其他线程中断当前线程

  • 其他线程中断了其余在等待的线程

  • 其他线程等待超时

  • 其他线程调用了reset方法

    CyclicBarrier的计数器递减其实是在隐藏在了await方法,await支持超时或者不超时,内部调用了doawait方法,我们具体看下实现逻辑:

在这里插入图片描述

​ 首先获取锁,然后计数器–;如果计数器–后为0,那么就会去执行我们传入的runnable,然后执行nextGeneration方法开启下一轮循环;如果在执行runnable方法中出错,就会去执行breakBarrier方法;如果当计数器不为0,就会继续等待(会根据是否允许超时判断处理逻辑);

使用举例

​ 一共8名小伙伴,都到了下班的时间了,大家都准备坐车回家。

​ 班车可以坐4个人,最后一个上车的人发现人满了可以发车后要跟司机喊一嗓子:车上人满啦,司机你快来开车送我们回家!

​ 等8名小伙伴都到家之后,再发出一个大家都平安到家的通知

public class CyclicBarrierDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch end = new CountDownLatch(8);
        CyclicBarrier mid = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " " +
                        " 统计了下人数,当前班车人满了出发喽");
            }
        });
        for (int i = 0; i < 8; i++) {
            new Thread(new Task(mid,end)).start();
        }
        end.await();
        System.out.println("8位小伙伴都到家啦!");
    }

    static class Task implements Runnable {
        private CyclicBarrier cyclicBarrier;
        CountDownLatch end;

        public Task(CyclicBarrier cyclicBarrier, CountDownLatch end) {
            this.cyclicBarrier = cyclicBarrier;
            this.end = end;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " 去做班车啦");
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("线程" + Thread.currentThread().getName() +
                        "到了班车乘车点,等待其他线程");
                try {
                    cyclicBarrier.await();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("线程" + Thread.currentThread().getName() +
                        "   班车出发回家啦");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                end.countDown();
            }
        }
    }
}

在这里插入图片描述
在这里插入图片描述


Condition

​ Lock+Condition在之前的章节实现过管程模型

源码分析

​ Condition是个接口,是由抽象类AQS实现的,主要就包括了三个方法:await、signal和signalAll。

​	[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RhXPxmCa-1637571875738)(image-20211122150509621.png)]

​ await:释放掉锁;同时线程处于wait状态直到被其他的线程唤醒或中断

​ signal:如果有多个线程都在这个条件这里等待,随机挑去一个唤醒

​ signalAll:唤醒等待这个条件的全部线程

使用举例

栗子一

​ 证明await的线程处于wait状态

示例代码
public class ConditionDemo1 {
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo1 conditionDemo1 = new ConditionDemo1();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    conditionDemo1.method2();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        conditionDemo1.method1();
    }

    void method1() throws InterruptedException {
        lock.lock();
        try {
            System.out.println("条件不满足,开始await");
            condition.await();
            System.out.println("条件满足了,开始执行后续的任务");
        } finally {
            lock.unlock();
        }
    }

    void method2() {
        lock.lock();
        try {
            System.out.println("准备工作完成,唤醒其他的线程");
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

​ 在 condition.signal();处打断点,用Thread模式debug可以看到这时候的主线程状态是wait:

在这里插入图片描述

栗子二

​ 使用condition实现阻塞队列,阻塞队列有两个要求:

​ 队列为空不允许出队

​ 队列满不允许入队

​ 下面用Condition+ReetrantLock实现下由数组实现的有界阻塞队列

/**
 * @Description 用Condition实现阻塞队列
 * @Author Mirana
 * @Date 2021/11/22 15:32
 * @Version 1.0
 **/
public class ConditionBlockQueue<T> {
    final Lock lock = new ReentrantLock();
    //队列不空
    final Condition notEmpty = lock.newCondition();
    //队列不满
    final Condition notFull = lock.newCondition();
    //队列容量
    int capacity;
    //用数组实现的队列
    Object[] arr;
    //数组现在的容量
    int size;

    public ConditionBlockQueue(int capacity) {
        this.capacity = capacity;
        this.arr = new Object[capacity];
        this.size = 0;
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionBlockQueue<Integer> conditionBlockQueue =
                new ConditionBlockQueue<>(1);

        new Thread(()->{
            System.out.println("你好,我是入队子线程:");
            try {
                conditionBlockQueue.offer(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一个元素成功入队");
            try {
                conditionBlockQueue.offer(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二个元素成功入队");
        }).start();
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(() -> {
            System.out.println("你好,我是出队子线程:");
            System.out.println(Thread.currentThread().getName() + "开始出队啦~");
            try {
                conditionBlockQueue.poll();
                System.out.println(Thread.currentThread().getName() + "出队完毕~");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

    }

    //入队
    void offer(T t) throws InterruptedException {
        lock.lock();
        try {
            while (size == capacity) {
                notFull.await();
            }
            //入队
            arr[size] = t;
            size++;
            //通知可以出队了
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    //出队
    void poll() throws InterruptedException {
        lock.lock();
        try {
            while (size == 0) {
                notEmpty.await();
            }
            //出队
            size--;
            //通知可以入队了
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }
}/**
 * @Description 用Condition实现阻塞队列
 * @Author Mirana
 * @Date 2021/11/22 15:32
 * @Version 1.0
 **/
public class ConditionBlockQueue<T> {
    final Lock lock = new ReentrantLock();
    //队列不空
    final Condition notEmpty = lock.newCondition();
    //队列不满
    final Condition notFull = lock.newCondition();
    //队列容量
    int capacity;
    //用数组实现的队列
    Object[] arr = new Object[capacity];
    //数组现在的容量
    int size;

    public ConditionBlockQueue(int capacity) {
        this.capacity = capacity;
    }

    //入队
    void offer(T t) throws InterruptedException {
        lock.lock();
        try {
            while (size == capacity) {
                notFull.await();
            }
            //入队
            size++;
            arr[size] = t;
            //通知可以出队了
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    //出队
    void poll() throws InterruptedException {
        lock.lock();
        try {
            while (size == 0) {
                notEmpty.await();
            }
            //出队
            size--;
            //通知可以入队了
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

在这里插入图片描述

栗子三

​ 用Condition+PriorityQueue实现的生产者消费者模型,为什么是PriorityQueue呢,因为要选一个没有阻塞功能的队列

public class ConditionProducerConsumer {
    private int queueSize = 2;
    
    private PriorityQueue<Integer> queue =
            new PriorityQueue<>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionProducerConsumer conditionDemo2 = new ConditionProducerConsumer();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }

    class Producer extends Thread {
        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("队列满了,等待消费数据");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(queue.size());
                    notEmpty.signalAll();
                    System.out.println("生产者生产了数据,队列剩余空间:" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Consumer extends Thread {
        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("队列为空,等待生产者生产数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("消费者消费掉一笔数据,队列剩余:" + queue.size());
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

在这里插入图片描述

​ 没有设置需要生产消费几轮,所以就会一直生产消费。只要队列空,生产者就生产数据,消费者就消费数据

栗子四

​ Dubbo的同步转异步

​ 在TCP协议层面,发送完RPC请求后,线程数不会等待RPC的响应结果的,但是我们平常使用的RPC调用大多是同步的。那一定有人帮你做了同步转异步的事情。比如RPC框架——Dubbo:

​ 例如下面的例子,我们执行个sayHello方法,线程会停下来等待获取结果:

DemoService service = 初始化部分省略
String message = 
  service.sayHello("dubbo");
System.out.println(message);

​ 如果此時dump綫程,会发现调用线程阻塞住了,状态是TIMED_WAITING,阻塞在了DefaultFuture.get方法

在这里插入图片描述

​ 那我们继续查看DefaultFuture.get方法做了什么事情:

// 创建锁与条件变量
private final Lock lock 
    = new ReentrantLock();
private final Condition done 
    = lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout){
  long start = System.nanoTime();
  lock.lock();
  try {
  while (!isDone()) {
    done.await(timeout);
      long cur=System.nanoTime();
    if (isDone() || 
          cur-start > timeout){
      break;
    }
  }
  } finally {
  lock.unlock();
  }
  if (!isDone()) {
  throw new TimeoutException();
  }
  return returnFromResponse();
}
// RPC结果是否已经返回
boolean isDone() {
  return response != null;
}
// RPC结果返回时调用该方法   
private void doReceived(Response res) {
  lock.lock();
  try {
    response = res;
    if (done != null) {
      done.signal();
    }
  } finally {
    lock.unlock();
  }
}

​ 当RPC结果返回时,会调用doReceived方法,在这个方法里:首先lock获取锁,然后在finally里释放锁。获取锁后,通过signal方法通知调用线程,结果已经返回,你可以继续执行了。


总结

​ Lock&Condition相比于synchronized+wait+notify,可以更灵活的实现管程;同时Dubbo中同步转异步也是通过Condition实现的;

​ Semaphore也是互斥方案之一,可以用信号量实现限流器——Hytrix的信号量隔离也是这么做的;

​ CountDownLatch与CyclicBarrier则是Java并发包提供的线程同步工具类,二者相似之处在于它们内部都维护了一个计数器,当计数器=0,就唤醒等待的线程;它们区别在于:

​ CountDownLatch适用于一个线程等待多个线程;CyclicBarrier适用于一组线程之间互相等待

​ CyclicBarrier的计数器是可以循环利用的,CountDownLatch不可以,当计数器为0了,下个线程会直接通过

​ CountDownLatch内部的计数器递减实际上是CAS实现的,需要我们手动调用CountDownLatch.countDown;CyclicBarrier则是通过可重入锁ReentrantLock,并且它在await方法中递减计数器:如果当前线程是最后一个线程,如果我们传入了回调函数,当前线程就会去执行回调函数,然后唤醒所有的线程

相关文章:

  • 并发编程——并发容器
  • 消息队列笔记
  • Kafka核心概念与源码阅读
  • JVM调优与线上问题监控工具安利
  • Kafka的事务实现
  • Kafka的高可靠性保证
  • Kafka集群
  • 线程池の优雅使用
  • 优雅的退出
  • 分布式架构演进
  • synchronized关键字
  • 分布式锁的几种实现方式
  • 延时队列的几种实现方式(只有原理,并没有源码)
  • DDD整理(概念篇)
  • DDD的分层架构设计
  • 【许晓笛】 EOS 智能合约案例解析(3)
  • exports和module.exports
  • FineReport中如何实现自动滚屏效果
  • js如何打印object对象
  • leetcode-27. Remove Element
  • mysql外键的使用
  • nodejs实现webservice问题总结
  • Redis字符串类型内部编码剖析
  • SQL 难点解决:记录的引用
  • ViewService——一种保证客户端与服务端同步的方法
  • 初识 webpack
  • 动态魔术使用DBMS_SQL
  • 使用前端开发工具包WijmoJS - 创建自定义DropDownTree控件(包含源代码)
  • 通过几道题目学习二叉搜索树
  • 由插件封装引出的一丢丢思考
  • 你对linux中grep命令知道多少?
  • 400多位云计算专家和开发者,加入了同一个组织 ...
  • 扩展资源服务器解决oauth2 性能瓶颈
  • ​MySQL主从复制一致性检测
  • ​Python 3 新特性:类型注解
  • # 手柄编程_北通阿修罗3动手评:一款兼具功能、操控性的电竞手柄
  • (附源码)ssm码农论坛 毕业设计 231126
  • (免费领源码)python+django+mysql线上兼职平台系统83320-计算机毕业设计项目选题推荐
  • (七)理解angular中的module和injector,即依赖注入
  • (一)Thymeleaf用法——Thymeleaf简介
  • (原创)攻击方式学习之(4) - 拒绝服务(DOS/DDOS/DRDOS)
  • (转载)OpenStack Hacker养成指南
  • .[backups@airmail.cc].faust勒索病毒的最新威胁:如何恢复您的数据?
  • .gitattributes 文件
  • .net core 源码_ASP.NET Core之Identity源码学习
  • .NET DataGridView数据绑定说明
  • .net 简单实现MD5
  • .net 托管代码与非托管代码
  • .Net面试题4
  • .net中的Queue和Stack
  • .NET中使用Protobuffer 实现序列化和反序列化
  • [acwing周赛复盘] 第 94 场周赛20230311
  • [CDOJ 838]母仪天下 【线段树手速练习 15分钟内敲完算合格】
  • [CTO札记]盛大文学公司名称对联
  • [Machine Learning][Part 7]神经网络的基本组成结构