当前位置: 首页 > news >正文

22-08-30 西安JUC(03) Callable接口、阻塞队列4套方法、ThreadPool线程池

Callable接口

1、为什么使用Callable接口

Thread和Runnable

都有的缺点:启动子线程的线程 不能获取子线程的执行结果,也不能捕获子线程的异常

从java5开始,提供了Callable接口,是Runable接口的增强版。用Call()方法作为线程的执行体,增强了之前的run()方法。因为call方法可以有返回值,也可以声明抛出异常。

1.Runnable方式创建:在主线程里捕获子线程的异常? 不可以。

try {
    //Runnable方式:主线程无法捕获子线程执行的异常
    new Thread(()->{
            int i = 1/0;
            String result = "jieguo";
    }).start();
} catch (Exception e) {
    System.out.println("主线程获取到了子线程异常:"+ e.getMessage());
}

根据控制台打印,在main线程里并没有捕获到子线程出现的异常


2、使用Callable创建线程并运行

Callable接口中注意点: 可以使用泛型<V>指定返回值类型,并且call方法可以抛出异常

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

使用Callable创建的子线程需要借助FutureTask对象来执行它的call方法

无论哪种方式创建多线程 都必须借助Thread对象的start方法启动线程,Thread只能接受Runnable对象: thread.run()-> runnable.run()

public static void main(String[] args) {
    Callable<String> callable = ()->{
        System.out.println("callable的call方法执行了.....");
        return "hehe....";
    };
    //juc包中提供了FutureTask  间接实现了Runnable接口,并实现了run方法
    new Thread(new FutureTask<String>(callable)).start();
}

1.Thread 调用了start方法后,系统CPU调度执行线程的run方法,run方法中判断 传入的runnable对象如果不为空则调用它的run方法。


2.我们传入的runnable对象是FutureTask的对象,所以调用的是FutureTask的run方法


3.FutureTask的run方法执行时,调用了我们传入的Callable对象的call方法执行 并接受返回的结果

总的来说:就是移花接木,FutureTask间接实现了Runnable接口 并实现了run方法:run方法中调用了Callable的call方法

java.util.concurrent.FutureTask

1.juc包中提供了FutureTask  间接实现了Runnable接口,并实现了run方法

public class FutureTask<V> implements RunnableFuture<V> 

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

2.FutureTask的run方法执行时的结果 和异常 会通过FutureTask的成员属性接收,并通过一个布尔类型的标记记录执行是否有异常

//结果和异常使用同一个变量接收
private Object outcome;

3.FutureTask中会在run方法执行结束时  将线程执行的状态从0(线程还未执行完毕)改为1(线程执行结束)

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;

3、FutureTask的get()方法

在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。

callable可以返回方法的执行结果:通过 futureTask的get方法 阻塞获取返回结果

public static void main(String[] args) {
    //Callable:获取执行结果+捕获异常
    Callable<String> callable = ()->{
        System.out.println(Thread.currentThread().getName()+"执行了call方法..");
        return "haha....";
    };
    //FutureTask它的泛型跟Callable的泛型要一样,因为都是代表该子线程的执行结果类型
    FutureTask<String> futureTask = new FutureTask<String>(callable);
    new Thread(futureTask,"AA").start();
    try {
        //获取子线程执行的结果   调用get方法会阻塞主线程,所以一定要将获取子线程结果的操作写在方法的最后
        String result = futureTask.get();
        System.out.println("主线程获取到的子线程结果:"+futureTask.get());
    } catch (Exception e) {//捕获子线程执行的异常
        System.out.println(Thread.currentThread().getName()+"  获取到子线程的异常:"+e.getMessage());
    }
}

在使用futureTask的get方法时,要去捕获异常,可以获取子线程的异常

    public static void main(String[] args) {
        //Callable:获取执行结果+捕获异常
        Callable<String> callable = ()->{
            int i = 1/0;
            System.out.println(Thread.currentThread().getName()+"执行了call方法..");
            return "haha....";
        };

        //FutureTask它的泛型跟Callable的泛型要一样,因为都是代表该子线程的执行结果类型
        FutureTask<String> futureTask = new FutureTask<String>(callable);

        new Thread(futureTask,"AA").start();

        try {
            //获取子线程执行的结果   调用get方法会阻塞主线程,所以一定要将获取子线程结果的操作写在方法的最后
            String result = futureTask.get();
            System.out.println("主线程获取到的子线程结果:"+futureTask.get());

        } catch (Exception e) {//捕获子线程执行的异常
            System.out.println(Thread.currentThread().getName()+"  获取到子线程的异常:"+e.getMessage());
        }
    }

控制台打印:

为什么说get方法会阻塞主线程呢?

//以下为FutureTask的源码
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

开发时:一定要把获取子线程结果的位置放在方法的最后


 4、FutureTask的复用

只计算一次,FutureTask会复用之前计算过得结果

 public static void main(String[] args) {
     //FutureTask复用问题:  执行异步任务时 为了提高效率它会缓存执行结果
     FutureTask<String> futureTask = new FutureTask<>(() -> {
         System.out.println(Thread.currentThread().getName()+"...");
         return "haha..";
     });
     new Thread(futureTask,"AA").start();
     new Thread(futureTask,"BB").start();
 }
执行异步任务时 为了提高效率它会缓存执行结果。。所以BB线程是从缓存拿的,并没有去走call方法

不想复用之前的计算结果。怎么办?再创建一个FutureTask对象即可

public static void main(String[] args) {
    //FutureTask复用问题:  执行异步任务时 为了提高效率它会缓存执行结果
    FutureTask<String> futureTask = new FutureTask<>(() -> {
        System.out.println(Thread.currentThread().getName()+"...");
        return "haha..";
    });
    FutureTask<String> futureTask2 = new FutureTask<>(() -> {
        System.out.println(Thread.currentThread().getName()+"...");
        return "haha..";
    });
    
    new Thread(futureTask,"AA").start();
    new Thread(futureTask2,"BB").start();
}


5、Callable接口与Runnable接口的区别

老师版:

1、callable有返回值   可以抛出异常
2、runnable可以直接通过Thread启动
3、callable需要通过FutureTask来接收,再由Thread启动
4、callable的任务方法时call(),runnable的是run()

笔记版:

相同点:都是接口,都可以编写多线程程序,都采用Thread.start()启动线程

不同点:

  1. 具体方法不同:一个是run,一个是call

  2. Runnable没有返回值;Callable可以返回执行结果,是个泛型

  3. Callable接口的call()方法允许抛出异常;Runnable的run()方法异常只能在内部消化,不能往上继续抛

  4. 它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。


阻塞队列

1、队列 queue 和 栈 stack

queue: 先进先出,后进后出。怎么容易理解呢:左进右出

线程池使用、mq也使用了

---------------------

Stack: 特点 先进后出 后进先出

public class Stack<E> extends Vector<E>{// Stack就是一个集合类  是一个线程安全的集合类
	//1、入栈方法
    public E push(E item) {
        addElement(item);
        return item;
    }
    // 数组:连续的一块内存,按照添加的索引先后顺序有序
    // Vector中的方法:添加元素到 elementData元素数组中
    public synchronized void addElement(E obj) {
        modCount++;
        ensureCapacityHelper(elementCount + 1);
        elementData[elementCount++] = obj;//将元素添加到elementData数组的最后一个位置
    }
    //2、出栈方法
    public synchronized E pop() {
        E       obj;
        int     len = size();
        obj = peek();
        removeElementAt(len - 1);//将获取到的最后一个位置元素删除

        return obj;
    }
    // 获取数组最后一个索引的元素 返回
    public synchronized E peek() {
        int     len = size();//获取元素个数

        if (len == 0)
            throw new EmptyStackException();
        return elementAt(len - 1);//数组长度-1  也就是数组最后一个位置的元素
    }
}

扩展

方法栈:java代码执行时,每个线程jvm会为他创建一个栈来存储线程调用方法的执行过程数据

线程方法栈。每个方法都是一个栈帧


2、阻塞队列 BlockingQueue

线程池用来存不能及时处理的任务的数据结构

BlockingQueue:阻塞队列,看名字就知道可以解决并发问题。

在开发中我们可以不用关心向队列中添加元素的线程  如果队列满了 它如何阻塞等待  获取队列中元素的线程 如果队列空了 它如何阻塞等待  以及阻塞的线程如何被唤醒

BlockingQueue是一个接口,继承Queue接口,Queue接口继承 Collection接口

BlockingQueue接口主要有以下7个实现类

  1. ArrayBlockingQueue:由数组结构组成的有界阻塞队列。

  2. LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。

  3. PriorityBlockingQueue:支持优先级排序的无界阻塞队列。

  4. DelayQueue:使用优先级队列实现的延迟无界阻塞队列。

  5. SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。

  6. LinkedTransferQueue:由链表组成的无界阻塞队列。

  7. LinkedBlockingDeque:由链表组成的双向阻塞队列。

线程池使用阻塞队列

  1. ArrayBlockingQueue: 数组阻塞队列
  2. LinkedBlockingQueue:链表阻塞队列
  3. SynchronousQueue:同步阻塞队列(不存储元素)

ArrayBlockingQueue:创建时手动指定的长度就是该队列的最大的长度

LinkedBlockingQueue:
        默认长度:Integer.MAX_VALUE   最多存储21亿左右的元素

阻塞队列:添加元素  获取元素 移除元素的方法有4套,根据方法是否返回结果 是否抛出异常 是否可以阻塞 是否可以阻塞超时划分

抛出异常特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e, time, unit)
移除remove()poll()take()poll(time, unit)
获取element()peek()不可用不可用

要是看着懵,是因为没继续看下去,看完再回来看就一目了然了


3、抛出异常的方法

add正常执行返回true,element(不删除)和remove正常执行会返回阻塞队列中的第一个元素 ​

  • 当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:Queue full ​
  • 当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException ​
  • 当阻塞队列空时,再调用element检查元素会抛出NoSuchElementException

add():添加元素失败抛出异常

public static void main(String[] args) {
    //数组阻塞队列初始化时需要传入  初始化数组的长度
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
    System.out.println("1:"+queue.add("a"));
    System.out.println("2:"+queue.add("b"));
    System.out.println("3:"+queue.add("c"));
    System.out.println("4:"+queue.add("d"));
}

----------------------

remove():移除成功返回移除的数据 移除失败抛出异常

public static void main(String[] args) {
    //数组阻塞队列初始化时需要传入  初始化数组的长度
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
    System.out.println("1:"+queue.add("a"));
    System.out.println("2:"+queue.add("b"));
    System.out.println("3:"+queue.add("c"));
    //移除成功返回移除的数据
    System.out.println(queue.remove());
    System.out.println(queue.remove());
    System.out.println(queue.remove());
    //移除失败抛出异常
    System.out.println(queue.remove());
}


--------------------------

element():获取最先添加的元素 获取不到抛出异常

 public static void main(String[] args) {
     //数组阻塞队列初始化时需要传入  初始化数组的长度
     ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
     //获取最先添加的元素,即左进右出情况下,获取最右边的元素,获取不到抛出异常
     System.out.println(queue.element());
     System.out.println("1:"+queue.add("a"));
 }


 4、返回特殊值的方法

offer()/poll()/peek()

offer() 插入方法,成功ture失败false

 public static void main(String[] args) {
     //数组阻塞队列初始化时需要传入  初始化数组的长度
     ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
     //添加成功ture,添加失败false
     System.out.println(queue.offer("a"));
     System.out.println(queue.offer("b"));
     System.out.println(queue.offer("c"));
     System.out.println(queue.offer("d"));
 }

------------------------------- 

poll() 移除方法,成功返回出队列的元素,队列里没有就返回null

public static void main(String[] args) {
    //数组阻塞队列初始化时需要传入  初始化数组的长度
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
    //添加成功ture,添加失败false
    System.out.println(queue.offer("a"));
    System.out.println(queue.offer("b"));
    System.out.println(queue.offer("c"));
    //移除元素成功返回出队列的元素,移除失败返回null
    System.out.println(queue.poll());
    System.out.println(queue.poll());
    System.out.println(queue.poll());
    System.out.println(queue.poll());
}

-------------------------------

peek() 获取方法,成功返回队列中的元素,没有返回null

public static void main(String[] args) {
    //数组阻塞队列初始化时需要传入  初始化数组的长度
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
    //获取成功返回队列中的最右边的元素,没有返回null
    System.out.println(queue.peek());
    
    //添加成功ture,添加失败false
    System.out.println(queue.offer("a"));
    System.out.println(queue.offer("b"));
    System.out.println(queue.offer("c"));
    //获取成功返回队列中的最右边的元素,没有返回null
    System.out.println(queue.peek());
}


5、阻塞等待方法

put() / take()

如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。

当阻塞队列满时,再往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出 ​ 当阻塞队列空时,再从队列里take元素,队列会一直阻塞消费者线程直到队列可用

public static void main(String[] args) throws InterruptedException {
    //数组阻塞队列初始化时需要传入  初始化数组的长度
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
    //put给队列中从添加元素,左进右出
    queue.put("a");
    queue.put("b");
    queue.put("c");
    System.out.println("take前,队列大小:"+queue.size());
    
    //take取走队列中最右面的元素
    System.out.println(queue.take());
    System.out.println("take后,队列大小:"+queue.size());
}


6、超时等待方法

offer(  timeout) /poll(timeout)

  • offer( timeout):阻塞等待添加元素,成功返回true 超时失败返回false
  • poll(timeout): 超时不能移除元素返回null
public static void main(String[] args) throws InterruptedException {
    //数组阻塞队列初始化时需要传入  初始化数组的长度
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
    //poll(timeout): 超时不能移除元素返回null
    System.out.println(queue.poll(3, TimeUnit.SECONDS));
    //offer(  timeout):阻塞等待添加元素,成功返回true  超时失败返回false
    System.out.println(queue.offer("aa", 3, TimeUnit.SECONDS));
    System.out.println(queue.offer("cc", 3, TimeUnit.SECONDS));
    System.out.println(queue.offer("bb", 3, TimeUnit.SECONDS));
    System.out.println(queue.offer("dd", 3, TimeUnit.SECONDS));
}

如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。 ​


7、阻塞等待方法的源码

put() / take()

请欣赏我的画作,以我的理解应该是这么画的。。。

put:
添加元素时使用ReentrantLock加锁
如果队列的长度等于队列中存入元素的个数代表队列已满,当前线程使用notFull.await()进入阻塞等待状态 

//以下为ArrayBlockingQueue的源码
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

如果队列的长度不等于队列中存入元素的个数代表队列未满,当前线程将元素添加到队列数组中,notEmpty.signal()唤醒等待消费队列中元素的线程

//以下为ArrayBlockingQueue的源码
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

take:
移除元素时使用Lock加锁
如果队列中元素个数为0,代表队列是空的,当前线程使用notEmpty.await()让自己等待

//以下为ArrayBlockingQueue的源码
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

如果队列中元素个数>0,队列中有元素,当前线程获取元素返回 并调用notFull.signal()唤醒向队列添加元素阻塞的线程

//以下为ArrayBlockingQueue的源码
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

ThreadPool线程池

1、线程池架构

2、工具类Executors

3、自定义线程池

4、拒绝策略

5、线程池的线程数如何设置

相关文章:

  • React(8)-组件ref
  • 2022/8/30
  • picoCTF - Day 1 - Warm up
  • 前端面试题之组件
  • 自己动手写编译器:词法解析的系统化研究
  • 【程序员面试金典】01.02. 判定是否互为字符重排
  • go实现剑指offer
  • 【Go-Lua】Golang嵌入Lua代码——gopher-lua
  • yolov5+shufflenet轻量化目标检测
  • 【BurpSuite】插件开发学习之J2EEScan(上)-被动扫描
  • java计算机毕业设计企业公开招聘系统源码+数据库+系统+lw文档+mybatis+运行部署
  • 赛事开源Baseline参考目录格式
  • C++设计模式之Bridge桥模式
  • Kibana-8.4.0-Linux安装
  • @hook扩展分析
  • ----------
  • -------------------- 第二讲-------- 第一节------在此给出链表的基本操作
  • 08.Android之View事件问题
  • C语言笔记(第一章:C语言编程)
  • Docker入门(二) - Dockerfile
  • Making An Indicator With Pure CSS
  • Python 反序列化安全问题(二)
  • SegmentFault 技术周刊 Vol.27 - Git 学习宝典:程序员走江湖必备
  • Vim 折腾记
  • XForms - 更强大的Form
  • 阿里中间件开源组件:Sentinel 0.2.0正式发布
  • 闭包--闭包之tab栏切换(四)
  • 发布国内首个无服务器容器服务,运维效率从未如此高效
  • 互联网大裁员:Java程序员失工作,焉知不能进ali?
  • 今年的LC3大会没了?
  • 数组的操作
  • 运行时添加log4j2的appender
  • 树莓派用上kodexplorer也能玩成私有网盘
  • ​2020 年大前端技术趋势解读
  • #【QT 5 调试软件后,发布相关:软件生成exe文件 + 文件打包】
  • (03)光刻——半导体电路的绘制
  • (pojstep1.1.1)poj 1298(直叙式模拟)
  • (TipsTricks)用客户端模板精简JavaScript代码
  • (保姆级教程)Mysql中索引、触发器、存储过程、存储函数的概念、作用,以及如何使用索引、存储过程,代码操作演示
  • (补)B+树一些思想
  • (附源码)springboot码头作业管理系统 毕业设计 341654
  • (附源码)ssm高校实验室 毕业设计 800008
  • (十)DDRC架构组成、效率Efficiency及功能实现
  • (转)jdk与jre的区别
  • (转)Sql Server 保留几位小数的两种做法
  • (转)从零实现3D图像引擎:(8)参数化直线与3D平面函数库
  • .NET Compact Framework 多线程环境下的UI异步刷新
  • .net core控制台应用程序初识
  • .NET 将多个程序集合并成单一程序集的 4+3 种方法
  • .NET 使用配置文件
  • @EnableConfigurationProperties注解使用
  • [ARC066F]Contest with Drinks Hard
  • [ASP.NET MVC]Ajax与CustomErrors的尴尬
  • [CERC2017]Cumulative Code
  • [DevEpxress]GridControl 显示Gif动画