当前位置: 首页 > news >正文

并发编程应用——线程池中的ThreadLocal变量传递

本文讲述下在项目开发过程中遇到的实际问题。

问题

问题描述

​ 之前搞得项目是SpringCloud开发的微服务项目,用户登录之后把用户信息塞到header里给我们,然后后端统一在网关拦截,读取用户ID然后塞到用户上下文中。但是我们在改造项目的时候发现个问题,莫名其妙的用户信息丢失了

​ 经过排查,发现是因为SpringCloud的Hystrix隔离策略默认是线程池隔离,但是为啥是线程池隔离就会丢失用户信息呢?

原因分析

​ 在前面其实有讲过,信号量跟管程都是操作系统的同步互斥方案,那这里Hytrix我们都知道是SpringCloud防止微服务雪崩的熔断器组件,它的隔离策略也有一个信号量隔离,跟前面同步互斥的信号量是一个东西吗?没错,是的。首先看下Hytrix的隔离策略是做什么的,隔离策略,是限制对共享资源访问的并发量,当线程池/信号已满,就立刻拒绝请求,转入服务降级模式。没错,Hytrix不仅仅可以在服务出现故障的时候降级,也可以在访问并发量过高的时候降级。

​ 那它的隔离策略就很容易理解了:

  • 线程池隔离:不同服务的执行使用不同的线程池,同时将用户请求的线程与具体业务执行的线程分开

  • 信号量隔离:总共允许访问的并发量就是一共多少个通行证,每次访问先tryAcquire,如果发现通行证没了就拒绝请求,进入降级模式。用户请求线程与业务执行线程是同一个线程。

​ 用线程池与new出来的线程又有什么区别呢?我们使用线程池而不是new线程的目的,就是因为避免创建与销毁线程的开销,想要重复使用线程。问题就出在这里,我们在分析ThreadLocal的时候知道,子线程是可以获取到父线程init方法初始化的值的,但是父线程set的值,子线程的ThreadLocal是获取不到的,这是第一个问题;第二个问题,假设我们通过了某种手段让子线程获取到了父线程set的ThreadLocal值,那么使用线程池的时候,线程是重复使用的。这就意味着最开始创建的线程确实可以获取父线程的ThreadLocal,但是后面重复使用的线程如果不做处理,就会一直拿着创建最开始线程的父线程的ThreadLocal。

​ 可能有点绕,我们看下下面的例子:


demo测试

​ 我们来写几个demo,分别验证:

  • 直接new出来的子线程是否可以获取父线程设置的ThreadLocal变量的值

  • 使用线程池的子线程是否可以获取父线程设置的ThreadLocal变量的值

    再分别看下如何解决

测试一

测试目的

​ 直接new出来的子线程是否可以获取父线程设置的ThreadLocal变量的值

测试代码

public class UnSafeId {
    static ThreadLocal<String> userId = new ThreadLocal<>();

    public static void main(String[] args) {
        userId.set("77");
        System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
        }).start();

    }
}

测试结果

​ 直接new出来的子线程不能获取父线程设置的ThreadLocal变量的值

解决方案

解决方案

​ 使用InheritableThreadLocal

代码示例
public class UnSafeId {
    static ThreadLocal<String> userId = new ThreadLocal<>();
    static InheritableThreadLocal<String> address=
            new InheritableThreadLocal<>();
    public static void main(String[] args) {
        userId.set("77");
        address.set("GuangDongProvince");
        System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
        System.out.println(Thread.currentThread().getName() + " userId : " + address.get());

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
            System.out.println(Thread.currentThread().getName() + " userId : " + address.get());
        }).start();

    }
}
源码分析

​ 我们在子线程address.get()这里,用线程debug方式查看子线程的初始化过程,会发现在Thread类的init方法中有这样一段:

在这里插入图片描述

​ 也就是说,子线程在初始化的时候会查看父线程是否有InheritableThreadLocal,如果有,就用父线程InheritableThreadLocal的值创建当前线程的ThreadLocalMap,所以子线程就可以取到父线程的InheritableThreadLocal的值了。


测试二

测试目的

​ 使用线程池的子线程是否可以获取父线程设置的ThreadLocal变量的值(需要注意,不同的线程池创建的线程方式也不同)

​ 可能你会说,既然new线程你可以用InheritableThreadLocal解决,那么线程池用InheritableThreadLocal不是一样吗?

测试代码

public class ThreadPoolTest {
    static InheritableThreadLocal<String> userId = new InheritableThreadLocal<>();

    public static void main(String[] args) {
        userId.set("77");
        System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
        });
        userId.set("777");
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
        });
        userId.set("7777");
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
        });
        executorService.shutdown();

    }
}

测试结果

在这里插入图片描述

结果分析

​ 主线程将ThreadLocal设置为了77,然后创建了固定线程数,线程数为2的线程池

​ 跑第一个任务,成功获取到了主线程的ThreadLocal:77,没问题。然后将主线程的ThreadLocal改为777,跑第二个任务,也没问题,成功获取到了,然后主线程将ThreadLocal改为7777,接下来继续跑第三个任务,哦豁,怎么还是77呢?

​ 这是因为线程的init方法是在子线程初始化的时候,把主线程的InheritableThreadLocal拿来放在自己的ThreadLocalMap里。那线程池是什么?我们为了防止重复的创建销毁线程,搞了个线程池,可以重复利用线程。对,问题就出现在重复使用上(除非你用的是newCachedThreadPool,但这跟没使用线程池又有什么区别呢?),第一个第二个任务可以正常的获取ThreadLocal,是因为我们设定的固定线程池的核心线程数量是2,这就意味着第一个第二个任务都是主线程新拉起来的线程,那自然可以获取到,而第三个任务是重复使用之前的线程,那主线程此时的ThreadLocal自然就丢了。


解决方案

解决方案一

​ 我们可以对Runnable做一下包装:

代码示例
public class ThreadPoolTestSolution {

    public static void main(String[] args) {
        ThreadLocalHolder.userId.set("77");
        System.out.println(Thread.currentThread().getName() + " userId : " + ThreadLocalHolder.userId.get());

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(WrapRunnable.wrap(new RunnableTest()));

        ThreadLocalHolder.userId.set("777");
        executorService.execute(WrapRunnable.wrap(new RunnableTest()));

        ThreadLocalHolder.userId.set("7777");
        executorService.execute(WrapRunnable.wrap(new RunnableTest()));
        executorService.shutdown();

    }
}

class RunnableTest implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " userId : " + ThreadLocalHolder.userId.get());
    }
}

class ThreadLocalHolder {
    static ThreadLocal<String> userId = new ThreadLocal<>();
}

class WrapRunnable {

    //把runnable包装一下
    public static Runnable wrap(Runnable runnable) {
        //获取当前线程的ThreadLocalHolder
        String currentThreadLocal = ThreadLocalHolder.userId.get();
        return new Runnable() {
            @Override
            public void run() {
                try {
                    //塞到子线程的ThreadLocalHolder
                    ThreadLocalHolder.userId.set(currentThreadLocal);
                    //继续执行原来Runnable的run方法
                    runnable.run();
                } finally {
                    //最终清空下ThreadLocalHolder,防止线程池重复使用线程带来的ThreadLocal数据遗留问题
                    ThreadLocalHolder.userId.remove();
                }
            }
        };
    }
}

测试结果

在这里插入图片描述

结果分析

​ 这种解决方案就是将所有线程执行的Runnable方法进行了包装,在包装方法中,就是取出当前线程上下文,然后塞给子线程,然后执行run方法,最后清空子线程的上下文。但是这样所有的Runnable方法都需要调用包装函数,还是有点麻烦。


解决方案二(Ttl示例一)

​ 使用阿里巴巴开源的ransmittable-thread-local

代码示例
public class TransmittableThreadLocalTest {

    public static void main(String[] args) {
        UserIdHolder.userId.set("77");
        System.out.println(Thread.currentThread().getName() + " userId : " + UserIdHolder.userId.get());
		//ttl对线程池做了包装
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService = TtlExecutors.getTtlExecutorService(executorService);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " userId : " + UserIdHolder.userId.get());
            }
        };
        executorService.execute(runnable);

        UserIdHolder.userId.set("777");
        executorService.execute(runnable);

        UserIdHolder.userId.set("7777");
        executorService.execute(runnable);
        executorService.shutdown();
    }
}

class UserIdHolder {
    //使用ttl的ThreadLocal
    static TransmittableThreadLocal<String> userId = new TransmittableThreadLocal<>();
}
测试结果

在这里插入图片描述

Ttl示例2:

​ 使用Ttl修饰的Runnable:

代码示例:
public class TransmittableThreadLocalTest2 {

    public static void main(String[] args) {
        UserIdHolder.userId.set("77");
        System.out.println(Thread.currentThread().getName() + " userId : " + UserIdHolder.userId.get());

        ExecutorService executorService = Executors.newFixedThreadPool(1);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " userId : " + UserIdHolder.userId.get());
            }
        };
        
        //TtlRunnable ttlRunnable = TtlRunnable.get(runnable);
      //  executorService.execute(ttlRunnable);
        executorService.execute( TtlRunnable.get(runnable));

        UserIdHolder.userId.set("777");
      //  executorService.execute(ttlRunnable);
        executorService.execute( TtlRunnable.get(runnable));

        UserIdHolder.userId.set("7777");
      //  executorService.execute(ttlRunnable);
        executorService.execute( TtlRunnable.get(runnable));
        executorService.shutdown();
    }
}

测试结果

在这里插入图片描述

结果分析

​ ttl的使用用ttl修饰的线程池跟runnable都行,不过线程池的方式要好一点,因为TtlRunnable的原理跟上面wrapRunnable差不多,因为它也是在TtlRunnable.get方法里塞上下文的,就需要每次new Runnable都调用一下,麻烦。


官方做法

TaskDecorator

​ 这个装饰器其实就是我们自己实现的WrapRunnable,看下源码,是不是一毛一样:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SqhXnMdV-1637572707999)(image-20211121173740861.png)]

总结

​ 在开发中,我们往往使用ThreadLocal来存放一些上下文啊,用户信息之类的数据,但是如果使用到了线程池,如果直接用的ThreadLoal,父线程的数据就会丢失,如果使用的InheritableThreadLocal,new的Thread中数据不会丢失,但是线程池跑的线程,当线程重复使用时,数据会残留。

​ 解决思想:对线程要执行的runnable方法进行包装,把当前线程的上下文塞进去,然后再清除掉就可以啦。
解决方法:

​ 成熟的框架

  • 阿里巴巴开源的ransmittable-thread-local,TtlExecutors(推荐)或者TtlRunnable

  • 如果是为了微服务开发中链路追踪将TraceId传递下去,可以使用Sl4j实现的MDC

​ 自己封装(这两种方法本质上都一样):

  • 包装Runnable
  • 使用TaskDecorator

相关文章:

  • 并发编程——线程协作
  • 并发编程——并发容器
  • 消息队列笔记
  • Kafka核心概念与源码阅读
  • JVM调优与线上问题监控工具安利
  • Kafka的事务实现
  • Kafka的高可靠性保证
  • Kafka集群
  • 线程池の优雅使用
  • 优雅的退出
  • 分布式架构演进
  • synchronized关键字
  • 分布式锁的几种实现方式
  • 延时队列的几种实现方式(只有原理,并没有源码)
  • DDD整理(概念篇)
  • SegmentFault for Android 3.0 发布
  • 【Linux系统编程】快速查找errno错误码信息
  • 【附node操作实例】redis简明入门系列—字符串类型
  • Android组件 - 收藏集 - 掘金
  • bearychat的java client
  • Bytom交易说明(账户管理模式)
  • docker-consul
  • ES6, React, Redux, Webpack写的一个爬 GitHub 的网页
  • Eureka 2.0 开源流产,真的对你影响很大吗?
  • js面向对象
  • Laravel深入学习6 - 应用体系结构:解耦事件处理器
  • magento 货币换算
  • MySQL Access denied for user 'root'@'localhost' 解决方法
  • MySQL常见的两种存储引擎:MyISAM与InnoDB的爱恨情仇
  • Python 反序列化安全问题(二)
  • SAP云平台里Global Account和Sub Account的关系
  • Webpack入门之遇到的那些坑,系列示例Demo
  • 阿里云ubuntu14.04 Nginx反向代理Nodejs
  • 海量大数据大屏分析展示一步到位:DataWorks数据服务+MaxCompute Lightning对接DataV最佳实践...
  • 好的网址,关于.net 4.0 ,vs 2010
  • 离散点最小(凸)包围边界查找
  • 盘点那些不知名却常用的 Git 操作
  • 入职第二天:使用koa搭建node server是种怎样的体验
  • 微信小程序上拉加载:onReachBottom详解+设置触发距离
  • 我是如何设计 Upload 上传组件的
  • Java性能优化之JVM GC(垃圾回收机制)
  • LIGO、Virgo第三轮探测告捷,同时探测到一对黑洞合并产生的引力波事件 ...
  • raise 与 raise ... from 的区别
  • ​RecSys 2022 | 面向人岗匹配的双向选择偏好建模
  • ​渐进式Web应用PWA的未来
  • (c语言版)滑动窗口 给定一个字符串,只包含字母和数字,按要求找出字符串中的最长(连续)子串的长度
  • (Python) SOAP Web Service (HTTP POST)
  • (简单) HDU 2612 Find a way,BFS。
  • (六)什么是Vite——热更新时vite、webpack做了什么
  • (图)IntelliTrace Tools 跟踪云端程序
  • (循环依赖问题)学习spring的第九天
  • (原创)boost.property_tree解析xml的帮助类以及中文解析问题的解决
  • .gitignore文件_Git:.gitignore
  • .htaccess配置重写url引擎
  • .NET 的静态构造函数是否线程安全?答案是肯定的!