一. 使用HystrixCommand编码方式


//构造setter  

HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(group); 

HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey(group); 

HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(service);

HystrixCommandProperties.Setter commandPropertiesDefaults = HystrixCommandProperties.Setter()

     .withExecutionTimeoutInMilliseconds(100)

     .withCircuitBreakerForceOpen(true);

HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults = HystrixThreadPoolProperties.Setter()

 .withCoreSize(10)

 .withQueueSizeRejectionThreshold(10);

 


HytrixCommand.Setter setter = HytrixCommand.Setter.withGroupKey(groupKey)  

  .andCommandKey(commandKey)  

  .andThreadPoolKey(threadPoolKey);

  .andCommandPropertiesDefaults(commandPropertiesDefaults)

  .andThreadPoolPropertiesDefaults(threadPoolPropertiesDefaults);

  

//构造command  

HystrixCommand<String> command = new HystrixCommand<String>(setter) {  

  protected String run() throws Exception {  

  logger.info("#####################  in hystrix thread");  

  Thread.sleep(time);  

  if(isException)  

  throw  new RuntimeException("exception in run");  

  return service+ ":return";  

  }  

  @Override  

  protected String getFallback() {  

  logger.info("#####################  in request thread");  

  return service+":fallback";  

  }  

};  

  

  

1 HystrixCommandKey

Hystrix使用单例模式存储HystrixCommand,熔断机制就是根据单实例上的调用情况统计实现的,所以每个HystrixCommand要有自己的名字,用于区分,同时用于依赖调用的隔离。HystrixCommandKey就是用于定义这个名字,如果没有定义这个名字,Hystrix会使用其类名作为其名字,可以使用HystrixCommandKey.Factory.asKey(String name)方法定义一个名称。


2 HystrixThreadPoolKey

HystrixThreadPoolKey是HystrixCommand所在的线程池,如果该参数不设置则使用HystrixCommandGroupKey作为HystrixThreadPoolKey,这种情况下同一个HystrixCommandGroupKey下的依赖调用共用同一个线程池内,如果不想共用同一个线程池,则需要设置该参数。可以使用HystrixThreadPoolKey.Factory.asKey(String name)方法设置。


3 HystrixCommandGroupKey

Hystrix需要对HystrixCommand进行分组,便于统计、管理,所以需要一个分组名称,HystrixCommandGroupKey就是用于定义分组名称,可以使用HystrixCommandGroupKey.Factory.asKey(String name)方法定义一个分组名。每个HystrixCommand必须要配置一个分组名,一个是用于分组,还有如果没有配置HystrixThreadPoolKey,这个分组名将会用于线程池名。


4 HystrixThreadPoolProperties

从名称上可以看出这是线程池的属性配置,可以通过它设置核心线程数大小、最大线程数、任务队列大小等,当然它也又一些默认的配置参数。


5 HystrixCommandProperties

这个就是HystrixCommand的属性配置,它可以设置熔断器是否可用、熔断器熔断的错误百分比、依赖调用超时时间等,它有一些默认的配置参数,如熔断器熔断的错误百分比默认值是50%、依赖调用超时时间默认值是1000毫秒。



二. 使用@HystrixCommand方式

这里只讲注解的使用方式以及比较重要的部分,如果需要了解全部查看:https://github.com/Netflix/Hystrix/wiki/How-To-Use

2.1 Hystrix command

2.1.1 同步执行

public  class  UserService {
...
     @HystrixCommand
     public  User getUserById(String id) {
         return  userResource.getUserById(id);
     }
}

2.1.2 异步执行

public   class   UserService {
...
    @HystrixCommand
     public  Future<User> getUserByIdAsync( final  String id) {
         return  new  AsyncResult<User>() {
             @Override
             public  User invoke() {
                 return  userResource.getUserById(id);
             }
         };
     }
}

2.1.3 反应执行()

public class UserService {
    ...
    @HystrixCommand
     public  Observable<User> getUserById( final  String id) {
         return  Observable.create( new  Observable.OnSubscribe<User>() {
                 @Override
                 public  void  call(Subscriber<?  super  User> observer) {
                     try  {
                         if  (!observer.isUnsubscribed()) {
                             observer.onNext( userResource.getUserById(id) );
                             observer.onCompleted();
                         }
                     catch  (Exception e) {
                         observer.onError(e);
                     }
                 }
             });
     }
}

2.1.4 三种模式使用区别

  • 同步执行:当执行到注解方法时,程序会顺序执行。

  • 异步执行:当执行到注解方法时,会并发异步执行,返回一个Future对象,后面使用.get()方法来阻塞拿到结果。如果有多个方法时,执行时间就是其中最长的一个服务的执行时间。

  • 反应执行:当执行到注解方法时,返回一个观察者。支持EAGER和LAZY模式。和同步异步执行的区别是,当对多个方法之间的返回结果不需要做合并而是希望当多个方法返回时触发一些事件时比较适合使用该模式。

反应执行没太明白,如果需要了解可以先参考下这个https://mcxiaoke.gitbooks.io/rxdocs/content/Intro.html

2.2 Fallback

@HystrixCommand (fallbackMethod =  "fallback1" )
User getUserById(String id) {
     throw  new  RuntimeException( "getUserById command failed" );
}
 
@HystrixCommand (fallbackMethod =  "fallback2" )
User fallback1(String id, Throwable e) {
     assert  "getUserById command failed" .equals(e.getMessage());
     throw  new  RuntimeException( "fallback1 failed" );
}
 
@HystrixCommand (fallbackMethod =  "fallback3" )
User fallback2(String id) {
     throw  new  RuntimeException( "fallback2 failed" );
}

注意点:

  • fallback应该和注解方法在同一类下

  • fallback的返回值和参数列表应该和注解方法一致,如果需要异常,则在末尾添加Throwable参数,对访问修饰符无要求

  • fallback方法上可以继续添加fallback

command和fallback只支持以下几种组合:

  • sync command, sync fallback

  • async command, sync fallback

  • async command, async fallback

2.3 Error Propagation

@HystrixCommand (ignoreExceptions = {BadRequestException. class })
     public  User getUserById(String id) {
         return  userResource.getUserById(id);
     }

当遇到BadRequestException时不会进入fallback,而是直接抛出异常

2.4 Configuration

@HystrixCommand (groupKey= "UserGroup" , commandKey =  "GetUserByIdCommand"
                 commandProperties = {
                         @HystrixProperty (name =  "execution.isolation.thread.timeoutInMilliseconds" , value =  "500" )
                 },
                 threadPoolProperties = {
                         @HystrixProperty (name =  "coreSize" , value =  "30" ),
                         @HystrixProperty (name =  "maxQueueSize" , value =  "101" ),
                         @HystrixProperty (name =  "keepAliveTimeMinutes" , value =  "2" ),
                         @HystrixProperty (name =  "queueSizeRejectionThreshold" , value =  "15" ),
                         @HystrixProperty (name =  "metrics.rollingStats.numBuckets" , value =  "12" ),
                         @HystrixProperty (name =  "metrics.rollingStats.timeInMilliseconds" , value =  "1440" )
         })

参数

作用

备注

groupKey

表示所属的group,一个group共用线程池

默认值:getClass().getSimpleName();

commandKey


默认值:当前执行方法名

execution.isolation.strategy

隔离策略,有THREAD和SEMAPHORE

默认使用THREAD模式,以下几种可以使用SEMAPHORE模式:

  • 只想控制并发度

  • 外部的方法已经做了线程隔离

  • 调用的是本地方法或者可靠度非常高、耗时特别小的方法(如medis)

execution.isolation.thread.timeoutInMilliseconds

 

超时时间

默认值:1000

在THREAD模式下,达到超时时间,可以中断

在SEMAPHORE模式下,会等待执行完成后,再去判断是否超时

设置标准:

有retry,99meantime+avg meantime

没有retry,99.5meantime

execution.timeout.enabled

是否打开超时

execution.isolation.thread.interruptOnTimeout

是否打开超时线程中断THREAD模式有效

execution.isolation.semaphore.maxConcurrentRequests

信号量最大并发度SEMAPHORE模式有效,默认值:10

fallback.isolation.semaphore.maxConcurrentRequests

fallback最大并发度默认值:10

circuitBreaker.requestVolumeThreshold

熔断触发的最小个数/10s默认值:20

circuitBreaker.sleepWindowInMilliseconds

熔断多少秒后去尝试请求默认值:5000

circuitBreaker.errorThresholdPercentage

失败率达到多少百分比后熔断

默认值:50

主要根据依赖重要性进行调整

circuitBreaker.forceClosed

是否强制关闭熔断如果是强依赖,应该设置为true

coreSize

线程池coreSize

默认值:10

设置标准:qps*99meantime+breathing room

maxQueueSize

请求等待队列

默认值:-1

如果使用正数,队列将从SynchronizeQueue改为LinkedBlockingQueue