并发编程应用——线程池中的ThreadLocal变量传递
本文讲述下在项目开发过程中遇到的实际问题。
问题
问题描述
之前搞得项目是SpringCloud开发的微服务项目,用户登录之后把用户信息塞到header里给我们,然后后端统一在网关拦截,读取用户ID然后塞到用户上下文中。但是我们在改造项目的时候发现个问题,莫名其妙的用户信息丢失了
经过排查,发现是因为SpringCloud的Hystrix隔离策略默认是线程池隔离,但是为啥是线程池隔离就会丢失用户信息呢?
原因分析
在前面其实有讲过,信号量跟管程都是操作系统的同步互斥方案,那这里Hytrix我们都知道是SpringCloud防止微服务雪崩的熔断器组件,它的隔离策略也有一个信号量隔离,跟前面同步互斥的信号量是一个东西吗?没错,是的。首先看下Hytrix的隔离策略是做什么的,隔离策略,是限制对共享资源访问的并发量,当线程池/信号已满,就立刻拒绝请求,转入服务降级模式。没错,Hytrix不仅仅可以在服务出现故障的时候降级,也可以在访问并发量过高的时候降级。
那它的隔离策略就很容易理解了:
-
线程池隔离:不同服务的执行使用不同的线程池,同时将用户请求的线程与具体业务执行的线程分开
-
信号量隔离:总共允许访问的并发量就是一共多少个通行证,每次访问先tryAcquire,如果发现通行证没了就拒绝请求,进入降级模式。用户请求线程与业务执行线程是同一个线程。
用线程池与new出来的线程又有什么区别呢?我们使用线程池而不是new线程的目的,就是因为避免创建与销毁线程的开销,想要重复使用线程。问题就出在这里,我们在分析ThreadLocal的时候知道,子线程是可以获取到父线程init方法初始化的值的,但是父线程set的值,子线程的ThreadLocal是获取不到的,这是第一个问题;第二个问题,假设我们通过了某种手段让子线程获取到了父线程set的ThreadLocal值,那么使用线程池的时候,线程是重复使用的。这就意味着最开始创建的线程确实可以获取父线程的ThreadLocal,但是后面重复使用的线程如果不做处理,就会一直拿着创建最开始线程的父线程的ThreadLocal。
可能有点绕,我们看下下面的例子:
demo测试
我们来写几个demo,分别验证:
-
直接new出来的子线程是否可以获取父线程设置的ThreadLocal变量的值
-
使用线程池的子线程是否可以获取父线程设置的ThreadLocal变量的值
再分别看下如何解决
测试一
测试目的
直接new出来的子线程是否可以获取父线程设置的ThreadLocal变量的值
测试代码
public class UnSafeId {
static ThreadLocal<String> userId = new ThreadLocal<>();
public static void main(String[] args) {
userId.set("77");
System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
}).start();
}
}
测试结果
直接new出来的子线程不能获取父线程设置的ThreadLocal变量的值
解决方案
解决方案
使用InheritableThreadLocal
代码示例
public class UnSafeId {
static ThreadLocal<String> userId = new ThreadLocal<>();
static InheritableThreadLocal<String> address=
new InheritableThreadLocal<>();
public static void main(String[] args) {
userId.set("77");
address.set("GuangDongProvince");
System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
System.out.println(Thread.currentThread().getName() + " userId : " + address.get());
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
System.out.println(Thread.currentThread().getName() + " userId : " + address.get());
}).start();
}
}
源码分析
我们在子线程address.get()这里,用线程debug方式查看子线程的初始化过程,会发现在Thread类的init方法中有这样一段:
也就是说,子线程在初始化的时候会查看父线程是否有InheritableThreadLocal,如果有,就用父线程InheritableThreadLocal的值创建当前线程的ThreadLocalMap,所以子线程就可以取到父线程的InheritableThreadLocal的值了。
测试二
测试目的
使用线程池的子线程是否可以获取父线程设置的ThreadLocal变量的值(需要注意,不同的线程池创建的线程方式也不同)
可能你会说,既然new线程你可以用InheritableThreadLocal解决,那么线程池用InheritableThreadLocal不是一样吗?
测试代码
public class ThreadPoolTest {
static InheritableThreadLocal<String> userId = new InheritableThreadLocal<>();
public static void main(String[] args) {
userId.set("77");
System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
});
userId.set("777");
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
});
userId.set("7777");
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " userId : " + userId.get());
});
executorService.shutdown();
}
}
测试结果
结果分析
主线程将ThreadLocal设置为了77,然后创建了固定线程数,线程数为2的线程池
跑第一个任务,成功获取到了主线程的ThreadLocal:77,没问题。然后将主线程的ThreadLocal改为777,跑第二个任务,也没问题,成功获取到了,然后主线程将ThreadLocal改为7777,接下来继续跑第三个任务,哦豁,怎么还是77呢?
这是因为线程的init方法是在子线程初始化的时候,把主线程的InheritableThreadLocal拿来放在自己的ThreadLocalMap里。那线程池是什么?我们为了防止重复的创建销毁线程,搞了个线程池,可以重复利用线程。对,问题就出现在重复使用上(除非你用的是newCachedThreadPool,但这跟没使用线程池又有什么区别呢?),第一个第二个任务可以正常的获取ThreadLocal,是因为我们设定的固定线程池的核心线程数量是2,这就意味着第一个第二个任务都是主线程新拉起来的线程,那自然可以获取到,而第三个任务是重复使用之前的线程,那主线程此时的ThreadLocal自然就丢了。
解决方案
解决方案一
我们可以对Runnable做一下包装:
代码示例
public class ThreadPoolTestSolution {
public static void main(String[] args) {
ThreadLocalHolder.userId.set("77");
System.out.println(Thread.currentThread().getName() + " userId : " + ThreadLocalHolder.userId.get());
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(WrapRunnable.wrap(new RunnableTest()));
ThreadLocalHolder.userId.set("777");
executorService.execute(WrapRunnable.wrap(new RunnableTest()));
ThreadLocalHolder.userId.set("7777");
executorService.execute(WrapRunnable.wrap(new RunnableTest()));
executorService.shutdown();
}
}
class RunnableTest implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " userId : " + ThreadLocalHolder.userId.get());
}
}
class ThreadLocalHolder {
static ThreadLocal<String> userId = new ThreadLocal<>();
}
class WrapRunnable {
//把runnable包装一下
public static Runnable wrap(Runnable runnable) {
//获取当前线程的ThreadLocalHolder
String currentThreadLocal = ThreadLocalHolder.userId.get();
return new Runnable() {
@Override
public void run() {
try {
//塞到子线程的ThreadLocalHolder
ThreadLocalHolder.userId.set(currentThreadLocal);
//继续执行原来Runnable的run方法
runnable.run();
} finally {
//最终清空下ThreadLocalHolder,防止线程池重复使用线程带来的ThreadLocal数据遗留问题
ThreadLocalHolder.userId.remove();
}
}
};
}
}
测试结果
结果分析
这种解决方案就是将所有线程执行的Runnable方法进行了包装,在包装方法中,就是取出当前线程上下文,然后塞给子线程,然后执行run方法,最后清空子线程的上下文。但是这样所有的Runnable方法都需要调用包装函数,还是有点麻烦。
解决方案二(Ttl示例一)
使用阿里巴巴开源的ransmittable-thread-local
代码示例
public class TransmittableThreadLocalTest {
public static void main(String[] args) {
UserIdHolder.userId.set("77");
System.out.println(Thread.currentThread().getName() + " userId : " + UserIdHolder.userId.get());
//ttl对线程池做了包装
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService = TtlExecutors.getTtlExecutorService(executorService);
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " userId : " + UserIdHolder.userId.get());
}
};
executorService.execute(runnable);
UserIdHolder.userId.set("777");
executorService.execute(runnable);
UserIdHolder.userId.set("7777");
executorService.execute(runnable);
executorService.shutdown();
}
}
class UserIdHolder {
//使用ttl的ThreadLocal
static TransmittableThreadLocal<String> userId = new TransmittableThreadLocal<>();
}
测试结果
Ttl示例2:
使用Ttl修饰的Runnable:
代码示例:
public class TransmittableThreadLocalTest2 {
public static void main(String[] args) {
UserIdHolder.userId.set("77");
System.out.println(Thread.currentThread().getName() + " userId : " + UserIdHolder.userId.get());
ExecutorService executorService = Executors.newFixedThreadPool(1);
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " userId : " + UserIdHolder.userId.get());
}
};
//TtlRunnable ttlRunnable = TtlRunnable.get(runnable);
// executorService.execute(ttlRunnable);
executorService.execute( TtlRunnable.get(runnable));
UserIdHolder.userId.set("777");
// executorService.execute(ttlRunnable);
executorService.execute( TtlRunnable.get(runnable));
UserIdHolder.userId.set("7777");
// executorService.execute(ttlRunnable);
executorService.execute( TtlRunnable.get(runnable));
executorService.shutdown();
}
}
测试结果
结果分析
ttl的使用用ttl修饰的线程池跟runnable都行,不过线程池的方式要好一点,因为TtlRunnable的原理跟上面wrapRunnable差不多,因为它也是在TtlRunnable.get方法里塞上下文的,就需要每次new Runnable都调用一下,麻烦。
官方做法
TaskDecorator
这个装饰器其实就是我们自己实现的WrapRunnable,看下源码,是不是一毛一样:
总结
在开发中,我们往往使用ThreadLocal来存放一些上下文啊,用户信息之类的数据,但是如果使用到了线程池,如果直接用的ThreadLoal,父线程的数据就会丢失,如果使用的InheritableThreadLocal,new的Thread中数据不会丢失,但是线程池跑的线程,当线程重复使用时,数据会残留。
解决思想:对线程要执行的runnable方法进行包装,把当前线程的上下文塞进去,然后再清除掉就可以啦。
解决方法:
成熟的框架
-
阿里巴巴开源的ransmittable-thread-local,TtlExecutors(推荐)或者TtlRunnable
-
如果是为了微服务开发中链路追踪将TraceId传递下去,可以使用Sl4j实现的MDC
自己封装(这两种方法本质上都一样):
- 包装Runnable
- 使用TaskDecorator