当前位置: 首页 > news >正文

springcloud-hystrix详解(含java代码)

为什么使用Hystrix

  • 对来自依赖的延迟和故障进行防护和控制
    • 这些依赖通常都是通过网络访问的
  • 阻止故障的连锁反应
  • 快速失败并迅速恢复
  • 回退并优雅降级
  • 提供近实时的监控与告警

Hystrix遵循的设计原则

  • 防止任何单独的依赖耗尽资源(线程)
  • 过载立即切断并快速失败,防止排队
  • 尽可能提供回退以保护用户免受故障
  • 使用隔离技术(例如隔板,泳道和断路器模式)来限制任何一个依赖的影响
  • 通过近实时的指标,监控和告警,确保故障被及时发现
  • 通过动态修改配置属性,确保故障及时恢复
  • 防止整个依赖客户端执行失败,而不仅仅是网络通信

Hystrix处理流程

    1. 构造一个 HystrixCommand或HystrixObservableCommand对象,用于封装请求,并在构造方法配置请求被执行需要的参数;
    1. 执行命令
    1. 判断是否使用缓存响应请求,若启用了缓存,且缓存可用,直接使用缓存响应请求。Hystrix支持请求缓存,但需要用户自定义启动;
    1. 判断熔断器是否打开,如果打开,跳到第8步;
    1. 判断线程池/队列/信号量是否已满,已满则跳到第8步;
    1. 执行HystrixObservableCommand.construct()或HystrixCommand.run(),如果执行失败或者超时,跳到第8步;否则,跳到第9步;
    1. 统计熔断器监控指标;
    1. 走Fallback备用逻辑
    1. 返回请求响应
  • 第5步线程池/队列/信号量已满时,还会执行第7步逻辑,更新熔断器统计信息,而第6步无论成功与否,都会更新熔断器统计信息

Hystrix4种执行命令的方法

  • Hystrix提供了4种执行命令的方法

    • execute()和queue() 适用于HystrixCommand对象
      • execute() -> 实际是调用了queue().get()
        • 以同步堵塞方式执行run(),只支持接收一个值对象。
        • hystrix会从线程池中取一个线程来执行run(),并等待返回值。
      • queue() -> 实际调用了toObservable().toBlocking().toFuture()
        • 以异步非阻塞方式执行run(),只支持接收一个值对象。
        • 调用queue()就直接返回一个Future对象。
        • 可通过 Future.get()拿到run()的返回结果,但Future.get()是阻塞执行的。
        • 若执行成功,Future.get()返回单个返回值。
        • 当执行失败时,如果没有重写fallback,Future.get()抛出异常。
    • observe()和toObservable()适用于HystrixObservableCommand对象。
      • observe()

        • 事件注册前执行run()/construct(),支持接收多个值对象,取决于发射源。
        • 调用observe()会返回一个hot Observable,也就是说,调用observe()自动触发执行run()/construct(),无论是否存在订阅者。
          • 如果继承的是HystrixCommand,hystrix会从线程池中取一个线程以非阻塞方式执行run();
          • 如果继承的是HystrixObservableCommand,将以调用线程阻塞执行construct()。
      • observe()使用方法:

        • 调用observe()会返回一个Observable对象
        • 调用这个Observable对象的subscribe()方法完成事件注册,从而获取结果
      • toObservable()

        • 事件注册后执行run()/construct(),支持接收多个值对象,取决于发射源。
        • 调用toObservable()会返回一个cold Observable也就是说,调用toObservable()不会立即触发执行run()/construct(),必须有订阅者订阅Observable时才会执行。
          • 如果继承的是HystrixCommand,hystrix会从线程池中取一个线程以非阻塞方式执行run(),调用线程不必等待run();
          • 如果继承的是HystrixObservableCommand,将以调用线程堵塞执行construct(),调用线程需等待construct()执行完才能继续往下走。
      • toObservable()使用方法:

        • 调用observe()会返回一个Observable对象
        • 调用这个Observable对象的subscribe()方法完成事件注册,从而获取结果
      • observe()实际调用toObservable()获得一个cold Observable

        • 再创建一个ReplaySubject对象订阅Observable,将源Observable转化为hot Observable。
        • 因此调用observe()会自动触发执行run()/construct()
  • 需注意的是HystrixCommand也支持toObservable()和observe(),但是即使将HystrixCommand转换成Observable,它也只能发射一个值对象。只有HystrixObservableCommand才支持发射多个值对象。

Hystrix容错

  • Hystrix的容错主要是通过添加容许延迟和容错方法,帮助控制这些分布式服务之间的交互。
  • 还通过隔离服务之间的访问点,阻止它们之间的级联故障以及提供回退选项来实现这一点,从而提高系统的整体弹性。
  • Hystrix主要提供了以下几种容错方法
    • 资源隔离
    • 熔断
    • 降级

资源隔离

  • 资源隔离主要指对线程的隔离。Hystrix提供了两种线程隔离方式
    • 线程池
    • 信号量。
类型线程切换支持异步支持超时支持熔断限流开销
线程池
信号量
  • 线程池和信号量都支持熔断和限流。
  • 相比线程池,信号量不需要线程切换,因此避免了不必要的开销。
  • 但是信号量不支持异步,也不支持超时
    • 也就是说当所请求的服务不可用时,信号量会控制超过限制的请求立即返回
    • 但是已经持有信号量的线程只能等待服务响应或从超时中返回,即可能出现长时间等待。
    • 线程池模式下,当超过指定时间未响应的服务,Hystrix会通过响应中断的方式通知线程立即结束并返回。
线程池
  • Hystrix通过命令模式对发送请求的对象和执行请求的对象进行解耦,将不同类型的业务请求封装为对应的命令请求。
    • 如订单服务查询商品
      • 查询商品请求->商品Command;
    • 商品服务查询库存
      • 查询库存请求->库存Command。
  • 并且为每个类型的Command配置一个线程池,当第一次创建Command时,根据配置创建一个线程池,并放入ConcurrentHashMap
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
...
if (!threadPools.containsKey(key)) {
    threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
  • 后续查询商品的请求创建Command时,将会重用已创建的线程池
  • 通过将发送请求线程与执行请求的线程分离,可有效防止发生级联故障。
  • 当线程池或请求队列饱和时,Hystrix将拒绝服务,使得请求线程可以快速失败,从而避免依赖问题扩散。
  • 优点
    • 保护应用程序以免受来自依赖故障的影响,指定依赖线程池饱和不会影响应用程序的其余部分。
    • 当引入新客户端lib时,即使发生问题,也是在本lib中,并不会影响到其他内容。
    • 当依赖从故障恢复正常时,应用程序会立即恢复正常的性能。
    • 当应用程序一些配置参数错误时,线程池的运行状况会很快检测到这一点(通过增加错误,延迟,超时,拒绝等),同时可以通过动态属性进行实时纠正错误的参数配置。
    • 如果服务的性能有变化,需要实时调整,比如增加或者减少超时时间,更改重试次数,可以通过线程池指标动态属性修改,而且不会影响到其他调用请求。
    • 除了隔离优势外,hystrix拥有专门的线程池可提供内置的并发功能,使得可以在同步调用之上构建异步门面(外观模式),为异步编程提供了支持(Hystrix引入了Rxjava异步框架)。

注意:尽管线程池提供了线程隔离,我们的客户端底层代码也必须要有超时设置或响应线程中断,不能无限制的阻塞以致线程池一直饱和。

  • 缺点
    • 线程池的主要缺点是增加了计算开销。每个命令的执行都在单独的线程完成,增加了排队、调度和上下文切换的开销。因此,要使用Hystrix,就必须接受它带来的开销,以换取它所提供的好处。
    • 通常情况下,线程池引入的开销足够小,不会有重大的成本或性能影响。但对于一些访问延迟极低的服务,如只依赖内存缓存,线程池引入的开销就比较明显了,这时候使用线程池隔离技术就不适合了,我们需要考虑更轻量级的方式,如信号量隔离。
信号量
  • 上面提到了线程池隔离的缺点,当依赖延迟极低的服务时,线程池隔离技术引入的开销超过了它所带来的好处。

  • 这时候可以使用信号量隔离技术来代替,通过设置信号量来限制对任何给定依赖的并发调用量

    • 使用线程池时,发送请求的线程和执行依赖服务的线程不是同一个
    • 而使用信号量时,发送请求的线程和执行依赖服务的线程是同一个,都是发起请求的线程。
  • 由于Hystrix默认使用线程池做线程隔离

    • 使用信号量隔离需要显示地将属性execution.isolation.strategy设置为ExecutionIsolationStrategy.SEMAPHORE
    • 同时配置信号量个数,默认为10。
    • 客户端需向依赖服务发起请求时,首先要获取一个信号量才能真正发起调用
    • 由于信号量的数量有限,当并发请求量超过信号量个数时,后续的请求都会直接拒绝,进入fallback流程。

信号量隔离主要是通过控制并发请求量,防止请求线程大面积阻塞,从而达到限流和防止雪崩的目的。

熔断

  • Hystrix在运行过程中会向每个commandKey对应的熔断器报告成功、失败、超时和拒绝的状态
  • 熔断器维护并统计这些数据,并根据这些统计信息来决策熔断开关是否打开。
    • 如果打开,熔断后续请求,快速返回。
    • 隔一段时间(默认是5s)之后熔断器尝试半开,放入一部分流量请求进来,相当于对依赖服务进行一次健康检查
    • 如果请求成功,熔断器关闭。
  • 熔断器配置
    • Circuit Breaker主要包括如下6个参数:
      • circuitBreaker.enabled
        • 是否启用熔断器,默认是TRUE
      • circuitBreaker.forceOpen
        • 熔断器强制打开,始终保持打开状态,不关注熔断开关的实际状态。默认值FLASE。
      • circuitBreaker.forceClosed
        • 熔断器强制关闭,始终保持关闭状态,不关注熔断开关的实际状态。默认值FLASE。
      • circuitBreaker.errorThresholdPercentage
        • 错误率,默认值50%,例如一段时间(10s)内有100个请求,其中有54个超时或者异常
        • 那么这段时间内的错误率是54%,大于了默认值50%,这种情况下会触发熔断器打开。
      • circuitBreaker.requestVolumeThreshold
        • 默认值20。含义是一段时间内至少有20个请求才进行errorThresholdPercentage计算。
        • 比如一段时间了有19个请求,且这些请求全部失败了,错误率是100%,但熔断器不会打开,总请求数不满足20。
      • circuitBreaker.sleepWindowInMilliseconds
        • 半开状态试探睡眠时间,默认值5000ms。
        • 如:当熔断器开启5000ms之后,会尝试放过去一部分流量进行试探,确定依赖服务是否恢复。
  • 熔断器工作步骤
      1. 调用allowRequest()判断是否允许将请求提交到线程池
      • 如果熔断器强制打开,circuitBreaker.forceOpen为true,不允许放行,返回。
      • 如果熔断器强制关闭,circuitBreaker.forceClosed为true,允许放行。此外不必关注熔断器实际状态,也就是说熔断器仍然会维护统计数据和开关状态,只是不生效而已。
      1. 调用isOpen()判断熔断器开关是否打开
      • 如果熔断器开关打开,进入3,否则继续;
      • 如果一个周期内总的请求数小于circuitBreaker.requestVolumeThreshold的值,允许请求放行,否则继续;
      • 如果一个周期内错误率小于circuitBreaker.errorThresholdPercentage的值,允许请求放行。否则,打开熔断器开关,进入3。
      1. 调用allowSingleTest()判断是否允许单个请求通行,检查依赖服务是否恢复
      • 如果熔断器打开,且距离熔断器打开的时间或上一次试探请求放行的时间超过circuitBreaker.sleepWindowInMilliseconds的值时,熔断器器进入半开状态,允许放行一个试探请求
      • 否则,不允许放行。

降级

  • 降级,通常指务高峰期,为了保证核心服务正常运行,需要停掉一些不太重要的业务,或者某些服务不可用时,执行备用逻辑从故障服务中快速失败或快速返回,以保障主体业务不受影响。
  • Hystrix提供的降级主要是为了容错,保证当前服务不受依赖服务故障的影响,从而提高服务的健壮性。
    • 要支持回退或降级处理
      • 可以重写HystrixCommand的getFallBack方法
      • 或HystrixObservableCommand的resumeWithFallback方法。
  • Hystrix在以下几种情况下会走降级逻辑
    • 执行construct()或run()抛出异常
    • 熔断器打开导致命令短路
    • 命令的线程池和队列或信号量的容量超额,命令被拒绝
    • 命令执行超时
  • 降级回退方式
    • Fail Fast 快速失败
      • 快速失败是最普通的命令执行方法,命令没有重写降级逻辑。 如果命令执行发生任何类型的故障,它将直接抛出异常。
    • Fail Silent 无声失败
      • 指在降级方法中通过返回null,空Map,空List或其他类似的响应来完成。

@Override
protected Integer getFallback() {
   return null;
}
 
@Override
protected List<Integer> getFallback() {
   return Collections.emptyList();
}
 
@Override
protected Observable<Integer> resumeWithFallback() {
   return Observable.empty();
}
  • Fallback: Static
    • 指在降级方法中返回静态默认值。 这不会导致服务以“无声失败”的方式被删除,而是导致默认行为发生。
    • 如:应用根据命令执行返回true / false执行相应逻辑,但命令执行失败,则返回自定义的默认值true/false
@Override
protected Boolean getFallback() {
    return true;
}
@Override
protected Observable<Boolean> resumeWithFallback() {
    return Observable.just( true );
}
  • Fallback: Stubbed
    • 当命令返回一个包含多个字段的复合对象时,适合以Stubbed 的方式回退。
@Override
protected MissionInfo getFallback() {
   return new MissionInfo("missionName","error");
}
  • Fallback: Cache via Network
    • 有时,如果调用依赖服务失败,可以从缓存服务(如redis)中查询旧数据版本。
    • 由于又会发起远程调用,所以建议重新封装一个Command,使用不同的ThreadPoolKey,与主线程池进行隔离。
@Override
protected Integer getFallback() {
   return new RedisServiceCommand(redisService).execute();
}

结果cache

  • Hystrix支持将一个请求结果缓存起来,下一个具有相同key的请求将直接从缓存中取出结果,减少请求开销。
  • 要使用Hystrix cache功能
    • 第一个要求是重写getCacheKey(),用来构造cache key;
    • 第二个要求是构建context,如果请求B要用到请求A的结果缓存,A和B必须同处一个context。
      • 通过HystrixRequestContext.initializeContext()和context.shutdown()可以构建一个context
      • 这两条语句间的所有请求都处于同一个context。

请求合并器

  • HystrixCollapser

    • 微服务架构中通常需要依赖多个远程的微服务,而远程调用中最常见的问题就是通信消耗与连接数占用。在高并发的情况之下,因通信次数的增加,总的通信时间消耗将会变得越来越长。
    • 同时,因为依赖服务的线程池资源有限,将出现排队等待与响应延迟的清况。
    • 为了优化这两个问题,Hystrix 提供了HystrixCollapser来实现请求的合并,以减少通信消耗和线程数的占用。
    • HystrixCollapser实现了在 HystrixCommand之前放置一个合并处理器,将处于一个很短的时间窗(默认10毫秒)内对同一依赖服务的多个请求进行整合,并以批量方式发起请求的功能(前提是服务提供方提供相应的批量接口)。
    • HystrixCollapser的封装多个请求合并发送的具体细节,开发者只需关注将业务上将单次请求合并成多次请求即可。
  • 合并请求的开销

    • 需要注意请求合并的额外开销:用于请求合并的延迟时间窗会使得依赖服务的请求延迟增高。
    • 比如,某个请求不通过请求合并器访问的平均耗时为5ms,请求合并的延迟时间窗为10ms (默认值), 那么当该请求设置了请求合并器之后,最坏情况下(在延迟时间 窗结束时才发起请求)该请求需要15ms才能完成。
  • 什么时候使用合并请求的功能?

    • 合并请求存在额外开销,所以需要根据依赖服务调用的实际情况决定是否使用此功能,主要考虑下面两个方面:
      • 请求命令本身的延迟
        • 对于单次请求而言,如果[单次请求平均时间/时间窗口]越小,对于单次请求的性能影响越小。
        • 如果依赖服务的请求命令本身是一个高延迟的命令,那么可以使用请求合并器,因为延迟时间窗的时间消耗显得微不足道了。
      • 并发量
        • 时间窗口内并发量越大,合并求情的性能提升越明显。
        • 如果一个时间窗内只有少数几个请求,那么就不适合使用请求合并器。
        • 相反,如果一个时间窗内具有很高的并发量,那么使用请求合并器可以有效减少网络连接数量并极大提升系统吞吐量,此时延迟时间窗所增加的消耗就可以忽略不计了。

Java实现Hystrix

  • 创建一个hystrix-service模块
  • 添加pom
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
  • 在application.yml进行配置
server:
  port: 8401
spring:
  application:
    name: hystrix-service
eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://localhost:8001/eureka/
service-url:
  user-service: http://user-service
  • 在启动类上添加@EnableCircuitBreaker来开启Hystrix的断路器功能
@EnableCircuitBreaker
@EnableDiscoveryClient
@SpringBootApplication
public class HystrixServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(HystrixServiceApplication.class, args);
    }
  • 在UserService中添加调用方法与服务降级方法,方法上需要添加@HystrixCommand注解
@HystrixCommand(fallbackMethod = "getDefaultUser")
public CommonResult getUser(Long id) {
    return restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id);
}

public CommonResult getDefaultUser(@PathVariable Long id) {
    User defaultUser = new User(-1L, "defaultUser", "123456");
    return new CommonResult<>(defaultUser);
}
  • 创建UserHystrixController接口用于调用user-service服务
    • 在UserHystrixController中添加用于测试服务降级的接口:
@GetMapping("/testFallback/{id}")
public CommonResult testFallback(@PathVariable Long id) {
    return userService.getUser(id);
}

  • 启动eureka-server、user-service、hystrix-service服务
    在这里插入图片描述
  • 调用接口进行测试:http://localhost:8401/user/testFallback/1
  • 关闭user-service服务重新测试该接口,发现已经发生了服务降级
    在这里插入图片描述
  • @HystrixCommand中的常用参数
    • fallbackMethod:指定服务降级处理方法;
    • ignoreExceptions:忽略某些异常,不发生服务降级;
    • commandKey:命令名称,用于区分不同的命令;
    • groupKey:分组名称,Hystrix会根据不同的分组来统计命令的告警及仪表盘信息;
    • threadPoolKey:线程池名称,用于划分线程池。
设置命令、分组及线程池名称
  • 在UserHystrixController中添加测试接口:
@GetMapping("/testCommand/{id}")
public CommonResult testCommand(@PathVariable Long id) {
    return userService.getUserCommand(id);
}
  • 在UserService中添加方式实现功能:
 @HystrixCommand(fallbackMethod = "getDefaultUser",
    commandKey = "getUserCommand",
    groupKey = "getUserGroup",
    threadPoolKey = "getUserThreadPool")
public CommonResult getUserCommand(@PathVariable Long id) {
    return restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id);
 }
使用ignoreExceptions忽略某些异常降级
  • 在UserHystrixController中添加测试接口:
@GetMapping("/testException/{id}")
public CommonResult testException(@PathVariable Long id) {
    return userService.getUserException(id);
}
  • 在UserService中添加实现方法,这里忽略了NullPointerException,当id为1时抛出IndexOutOfBoundsException,id为2时抛出NullPointerException:
@HystrixCommand(fallbackMethod = "getDefaultUser2", ignoreExceptions = {NullPointerException.class})
public CommonResult getUserException(Long id) {
    if (id == 1) {
        throw new IndexOutOfBoundsException();
    } else if (id == 2) {
        throw new NullPointerException();
    }
    return restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id);
}

public CommonResult getDefaultUser2(@PathVariable Long id, Throwable e) {
    LOGGER.error("getDefaultUser2 id:{},throwable class:{}", id, e.getClass());
    User defaultUser = new User(-2L, "defaultUser2", "123456");
    return new CommonResult<>(defaultUser);
}
  • 调用接口进行测试:http://localhost:8401/user/tesException/1
    在这里插入图片描述
  • 调用接口进行测试:http://localhost:8401/user/tesException/1

在这里插入图片描述

Hystrix的请求缓存

  • 相关注解
    • @CacheResult
      • 开启缓存,默认所有参数作为缓存的key,cacheKeyMethod可以通过返回String类型的方法指定key
    • @CacheKey
      • 指定缓存的key,可以指定参数或指定参数中的属性值为缓存key,cacheKeyMethod还可以通过返回String类型的方法指定
    • @CacheRemove
      • 移除缓存,需要指定commandKey
测试使用缓存
  • 在UserHystrixController中添加使用缓存的测试接口,直接调用三次getUserCache方法:
@GetMapping("/testCache/{id}")
public CommonResult testCache(@PathVariable Long id) {
    userService.getUserCache(id);
    userService.getUserCache(id);
    userService.getUserCache(id);
    return new CommonResult("操作成功", 200);
}
  • 在UserService中添加具有缓存功能的getUserCache方法:
@CacheResult(cacheKeyMethod = "getCacheKey")
@HystrixCommand(fallbackMethod = "getDefaultUser", commandKey = "getUserCache")
    public CommonResult getUserCache(Long id) {
    LOGGER.info("getUserCache id:{}", id);
    return restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id);
}

/**
 * 为缓存生成key的方法
 */
public String getCacheKey(Long id) {
    return String.valueOf(id);
}
  • 调用接口测试http://localhost:8401/user/testCache/1
    • 这个接口中调用了三次getUserCache方法,但是只打印了一次日志,说明有两次走的是缓存
      在这里插入图片描述
测试移除缓存
  • 在UserHystrixController中添加移除缓存的测试接口,调用一次removeCache方法:
@GetMapping("/testRemoveCache/{id}")
public CommonResult testRemoveCache(@PathVariable Long id) {
    userService.getUserCache(id);
    userService.removeCache(id);
    userService.getUserCache(id);
    return new CommonResult("操作成功", 200);
}
  • 在UserService中添加具有移除缓存功能的removeCache方法:
@CacheRemove(commandKey = "getUserCache", cacheKeyMethod = "getCacheKey")
@HystrixCommand
public CommonResult removeCache(Long id) {
    LOGGER.info("removeCache id:{}", id);
    return restTemplate.postForObject(userServiceUrl + "/user/delete/{1}", null, CommonResult.class, id);
}
  • 调用接口测试http://localhost:8401/user/testRemoveCache/1,可以发现有两次查询都走的是接口:
    在这里插入图片描述
缓存使用过程中的问题
  • 在缓存使用过程中,我们需要在每次使用缓存的请求前后对HystrixRequestContext进行初始化和关闭,否则会出现如下异常:
java.lang.IllegalStateException: Request caching is not available. Maybe you need to initialize the HystrixRequestContext?
	at com.netflix.hystrix.HystrixRequestCache.get(HystrixRequestCache.java:104) ~[hystrix-core-1.5.18.jar:1.5.18]
	at com.netflix.hystrix.AbstractCommand$7.call(AbstractCommand.java:478) ~[hystrix-core-1.5.18.jar:1.5.18]
	at com.netflix.hystrix.AbstractCommand$7.call(AbstractCommand.java:454) ~[hystrix-core-1.5.18.jar:1.5.18]
  • 这里我们通过使用过滤器,在每个请求前后初始化和关闭HystrixRequestContext来解决该问题:
@Component
@WebFilter(urlPatterns = "/*",asyncSupported = true)
public class HystrixRequestContextFilter implements Filter {
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            filterChain.doFilter(servletRequest, servletResponse);
        } finally {
            context.close();
        }
    }
}

请求合并

  • Hystrix中提供了@HystrixCollapser用于合并请求

  • @HystrixCollapser的常用属性

    • batchMethod:用于设置请求合并的方法;
    • collapserProperties:请求合并属性,用于控制实例属性,有很多;
    • timerDelayInMilliseconds:collapserProperties中的属性,用于控制每隔多少时间合并一次请求;
  • 在UserHystrixController中添加testCollapser方法,这里我们先进行两次服务调用,再间隔200ms以后进行第三次服务调用:

@GetMapping("/testCollapser")
public CommonResult testCollapser() throws ExecutionException, InterruptedException {
    Future<User> future1 = userService.getUserFuture(1L);
    Future<User> future2 = userService.getUserFuture(2L);
    future1.get();
    future2.get();
    ThreadUtil.safeSleep(200);
    Future<User> future3 = userService.getUserFuture(3L);
    future3.get();
    return new CommonResult("操作成功", 200);
}
  • 使用@HystrixCollapser实现请求合并,所有对getUserFuture的的多次调用都会转化为对getUserByIds的单次调用:
@HystrixCollapser(batchMethod = "getUserByIds",collapserProperties = {
    @HystrixProperty(name = "timerDelayInMilliseconds", value = "100")
})
public Future<User> getUserFuture(Long id) {
    return new AsyncResult<User>(){
    @Override
    public User invoke() {
        CommonResult commonResult = restTemplate.getForObject(userServiceUrl + "/user/{1}", CommonResult.class, id);
        Map data = (Map) commonResult.getData();
        User user = BeanUtil.mapToBean(data,User.class,true);
        LOGGER.info("getUserById username:{}", user.getUsername());
        return user;
        }
    };
}

@HystrixCommand
public List<User> getUserByIds(List<Long> ids) {
    LOGGER.info("getUserByIds:{}", ids);
    CommonResult commonResult = restTemplate.getForObject(userServiceUrl + "/user/getUserByIds?ids={1}", CommonResult.class, CollUtil.join(ids,","));
    return (List<User>) commonResult.getData();
}
  • 访问接口测试http://localhost:8401/user/testCollapser,由于我们设置了100毫秒进行一次请求合并,前两次被合并,最后一次自己单独合并了。
    在这里插入图片描述

Hystrix的常用配置

全局配置

hystrix:
  #用于控制HystrixCommand的行为
  command: 
    default:
      execution:
        isolation:
          #控制HystrixCommand的隔离策略,THREAD->线程池隔离策略(默认),SEMAPHORE->信号量隔离策略
          strategy: THREAD 
          thread:
             #配置HystrixCommand执行的超时时间,执行超过该时间会进行服务降级处理
            timeoutInMilliseconds: 1000
            #配置HystrixCommand执行超时的时候是否要中断
            interruptOnTimeout: true 
            #配置HystrixCommand执行被取消的时候是否要中断
            interruptOnCancel: true 
          timeout:
            #配置HystrixCommand的执行是否启用超时时间
            enabled: true 
          semaphore:
             #当使用信号量隔离策略时,用来控制并发量的大小,超过该并发量的请求会被拒绝
            maxConcurrentRequests: 10
      fallback:
        #用于控制是否启用服务降级
        enabled: true 
      #用于控制HystrixCircuitBreaker的行为
      circuitBreaker: 
        #用于控制断路器是否跟踪健康状况以及熔断请求
        enabled: true 
        #超过该请求数的请求会被拒绝
        requestVolumeThreshold: 20 
        #强制打开断路器,拒绝所有请求
        forceOpen: false 
        #强制关闭断路器,接收所有请求
        forceClosed: false 
      requestCache:
        #用于控制是否开启请求缓存
        enabled: true 
  #用于控制HystrixCollapser的执行行为      
  collapser: 
    default:
      #控制一次合并请求合并的最大请求数
      maxRequestsInBatch: 100 
      #控制多少毫秒内的请求会被合并成一个
      timerDelayinMilliseconds: 10 
      requestCache:
        #控制合并请求是否开启缓存
        enabled: true 
  #用于控制HystrixCommand执行所在线程池的行为      
  threadpool: 
    default:
      #线程池的核心线程数
      coreSize: 10 
      #线程池的最大线程数,超过该线程数的请求会被拒绝
      maximumSize: 10 
      #用于设置线程池的最大队列大小,-1采用SynchronousQueue,其他正数采用LinkedBlockingQueue
      maxQueueSize: -1 
      #用于设置线程池队列的拒绝阀值,由于LinkedBlockingQueue不能动态改版大小,使用时需要用该参数来控制线程数
      queueSizeRejectionThreshold: 5 

实例配置

hystrix:
  command:
    #将default换成HystrixComrnandKey
    HystrixComandKey: 
      execution:
        isolation:
          strategy: THREAD
  collapser:
    #将default换成HystrixCollapserKey
    HystrixCollapserKey: 
      maxRequestsInBatch: 100
  threadpool:
    #将default换成HystrixThreadPoolKey
    HystrixThreadPoolKey: 
      coreSize: 10
  • HystrixComandKey对应@HystrixCommand中的commandKey属性
  • HystrixCollapserKey对应@HystrixCollapser注解中的collapserKey属性
  • HystrixThreadPoolKey对应@HystrixCommand中的threadPoolKey属性

Hystrix Dashboard

  • Hystrix提供了Hystrix Dashboard来实时监控HystrixCommand方法的执行情况。 Hystrix Dashboard可以有效地反映出每个Hystrix实例的运行情况,帮助我们快速发现系统中的问题,从而采取对应措施。

Hystrix 单个实例监控

  • 创建一个hystrix-dashboard模块
  • pom
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
  • 在application.yml进行配置:
server:
  port: 8501
spring:
  application:
    name: hystrix-dashboard
eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://localhost:8001/eureka/
  • 在启动类上添加@EnableHystrixDashboard来启用监控功能:
@EnableHystrixDashboard
@EnableDiscoveryClient
@SpringBootApplication
public class HystrixDashboardApplication {

    public static void main(String[] args) {
        SpringApplication.run(HystrixDashboardApplication.class, args);
    }

}
  • 启动如下服务:eureka-server、user-service、hystrix-service、hystrix-dashboard,启动后注册中心显示如下。
    在这里插入图片描述
  • 访问Hystrix Dashboard:http://localhost:8501/hystrix
    在这里插入图片描述
  • 填写好信息后点击监控按钮,这里我们需要注意的是,由于我们本地不支持https,所以我们的地址需要填入的是http,否则会无法获取监控信息;
    在这里插入图片描述
  • 被监控的hystrix-service服务需要开启Actuator的hystrix.stream端点,配置信息如下:
management:
  endpoints:
    web:
      exposure:
        include: 'hystrix.stream' #暴露hystrix监控端点
  • 调用几次hystrix-service接口:http://localhost:8401/user/testCommand/1
    在这里插入图片描述
  • 可以发现曾经我们在@HystrixCommand中添加的commandKey和threadPoolKey属性都显示在上面了,并且有7次调用都成功了

Hystrix Dashboard 图表解读

在这里插入图片描述

Hystrix 集群实例监控

  • 创建一个turbine-service模块,用来聚合hystrix-service的监控信息。
  • pom
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-turbine</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
  • 在application.yml进行配置,主要是添加了Turbine相关配置:
server:
  port: 8601
spring:
  application:
    name: turbine-service
eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://localhost:8001/eureka/
turbine:
  #指定需要收集信息的服务名称
  app-config: hystrix-service 
  #指定服务所属集群
  cluster-name-expression: new String('default') 
   #以主机名和端口号来区分服务
  combine-host-port: true
  • 在启动类上添加@EnableTurbine来启用Turbine相关功能:
@EnableTurbine
@EnableDiscoveryClient
@SpringBootApplication
public class TurbineServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(TurbineServiceApplication.class, args);
    }

}
  • hystrix-service服务添加application-replica1.yml
server:
  port: 8402
spring:
  application:
    name: hystrix-service
eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://localhost:8001/eureka/
service-url:
  user-service: http://user-service
management:
  endpoints:
    web:
      exposure:
        include: 'hystrix.stream' #暴露hystrix监控端点
  • 使用application-replica1.yml配置再启动一个hystrix-service服务,启动turbine-service服务,此时注册中心显示如下
    在这里插入图片描述

Hystrix集群监控演示

  • 访问Hystrix Dashboard:http://localhost:8501/hystrix
  • 添加集群监控地址,需要注意的是我们需要添加的是turbine-service的监控端点地址:
    在这里插入图片描述
  • 调用几次hystrix-service的接口
    • http://localhost:8401/user/testCommand/1
    • http://localhost:8402/user/testCommand/1
    • 可以看到我们的Hystrix实例数量变成了两个。
      在这里插入图片描述

相关文章:

  • WebSocket | Netty实现WebSocket服务端
  • TypeScript——TS简介(面试题)、运行环境、变量声明、基础类型、类型断言、初识接口
  • 使用PyCharm写脚本运行时报错“This version of ChromeDriver only supports Chrome version 98”
  • TypeScript——笔试题/面试题
  • Vue入门【一】-- 基本模板语法
  • Vue入门【二】-- watch侦听器之普通监听与深度监听
  • 机器人地面站-[QGroundControl源码解析]-[6]-[AnalysizeView2]
  • FFmpeg入门详解之34:FFmpeg应用之视频播放器
  • 2022牛客多校(九)
  • Java常用类
  • [C/C++]_[初级]_[关于编译时出现有符号-无符号不匹配的警告-sizeof使用注意事项]
  • aarch64服务器-部署mysql
  • PDF转为网页文件怎么转?这篇文章告诉你
  • Java 基本数据类型-包装类-String的相互转换(总结+代码实现)
  • JUC并发编程
  • @angular/forms 源码解析之双向绑定
  • Angular 响应式表单 基础例子
  • angular学习第一篇-----环境搭建
  • HTML中设置input等文本框为不可操作
  • IDEA 插件开发入门教程
  • javascript 哈希表
  • Linux链接文件
  • linux学习笔记
  • Python实现BT种子转化为磁力链接【实战】
  • React Native移动开发实战-3-实现页面间的数据传递
  • Spring Security中异常上抛机制及对于转型处理的一些感悟
  • 跨域
  • 七牛云假注销小指南
  • 手机app有了短信验证码还有没必要有图片验证码?
  • 我感觉这是史上最牛的防sql注入方法类
  • 线性表及其算法(java实现)
  • 原生JS动态加载JS、CSS文件及代码脚本
  • 终端用户监控:真实用户监控还是模拟监控?
  • ​软考-高级-系统架构设计师教程(清华第2版)【第1章-绪论-思维导图】​
  • #Spring-boot高级
  • #我与Java虚拟机的故事#连载16:打开Java世界大门的钥匙
  • (1)Nginx简介和安装教程
  • (NO.00004)iOS实现打砖块游戏(九):游戏中小球与反弹棒的碰撞
  • (附源码)ssm高校实验室 毕业设计 800008
  • (论文阅读23/100)Hierarchical Convolutional Features for Visual Tracking
  • (论文阅读笔记)Network planning with deep reinforcement learning
  • (没学懂,待填坑)【动态规划】数位动态规划
  • (心得)获取一个数二进制序列中所有的偶数位和奇数位, 分别输出二进制序列。
  • (转)Sql Server 保留几位小数的两种做法
  • ****** 二 ******、软设笔记【数据结构】-KMP算法、树、二叉树
  • ***汇编语言 实验16 编写包含多个功能子程序的中断例程
  • .mat 文件的加载与创建 矩阵变图像? ∈ Matlab 使用笔记
  • .NET/C# 异常处理:写一个空的 try 块代码,而把重要代码写到 finally 中(Constrained Execution Regions)
  • .NET面试题(二)
  • .net企业级架构实战之7——Spring.net整合Asp.net mvc
  • /bin/bash^M: bad interpreter: No such file or directory
  • @Transactional类内部访问失效原因详解
  • [Bada开发]初步入口函数介绍
  • [c]扫雷
  • [C++]模板与STL简介