在实现一个大型分布式系统时,往往还需要更多高级的配置功能。下面将详细介绍Hystrix各接口和注解的使用方法。
创建请求命令
Hystrix命令就是前面的HystrixCommand,它用来封装具体的依赖服务调用逻辑。
- 使用继承的方式,比如:
1
2
3
4
5
6
7
8
9
10
11
12
13public class UserCommand extends HystrixCommand<User> {
private RestTemplate restTemplate;
private Long id;
public UserCommand(RestTemplate restTemplate, Long id) {
super(Setter.withGroupKey(asKey("")));
this.restTemplate = restTemplate;
this.id = id;
}
@Override
protected User run() {
return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
}
}
通过继承的方式,我们可以实现请求的同步或异步执行。
- 同步执行:User user=new UserCommand(restTemplate,10L).execute();
- 异步执行:Future
futureUser = new UserCommand(restTemplate, 11L).queue();
异步执行时,可以通过对返回的futureUser调用get方法来获取结果。
- 使用“快速入门”中讲解的方式
1
2
3
4
5
6
7
8
9@Service
public class UserService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand
public User getUserById(Long id){
return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
}
}
虽然@HystrixCommand注解可以非常优雅地定义Hystrix命令的实现,但是getUserById只是实现同步执行,如需实现异步执行,要另外定义:1
2
3
4
5
6
7
8
9@HystrixCommand
public Future<User> getUserByIdAsync(final Long id){
return new AsyncResult<User>() {
@Override
public User invoke() {
return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
}
};
}
- 通过Observable来实现响应式执行方式
通过调用observe()和toObservable()方法可以返回Observable对象,比如:1
2Observable<User> ho = new UserCommand(restTemplate,14L).observe();
Observable<User> co = new UserCommand(restTemplate,15L).toObservable();
虽然HystrixCommand具备了observe()和toObservable()的功能,但它返回的Observable只能发射一次数据,所以Hystrix还提供了另外一个特殊的命令封装HystrixObsercableCommand,通过它实现的命令可以获取能发射多次的Observable。
命令的执行逻辑需要在construct方法中重载:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class UserObservableCommand extends HystrixObservableCommand<User> {
private RestTemplate restTemplate;
private Long id;
public UserObservableCommand(RestTemplate restTemplate, Long id) {
super(Setter.withGroupKey(asKey("")));
this.restTemplate=restTemplate;
this.id=id;
}
@Override
protected Observable<User> construct() {
return Observable.create(new Observable.OnSubscribe<User>() {
@Override
public void call(Subscriber<? super User> observer) {
try {
if (!observer.isUnsubscribed()) {
User user = restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
observer.onNext(user);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
}
}
- 使用@HystrixCommand注解实现响应式命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class ObservableUserService {
@Autowired
RestTemplate restTemplate;
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER)
public Observable<User> getUserById(final Long id) {
return Observable.create(new Observable.OnSubscribe<User>() {
@Override
public void call(Subscriber<? super User> observer) {
try {
if (!observer.isUnsubscribed()) {
User user = restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
observer.onNext(user);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
}
}
- observableExecutionMode = ObservableExecutionMode.EAGER:表示使用observe()执行方式
- observableExecutionMode = ObservableExecutionMode.LAZY:表示使用toObservable()执行方式
HystrixObservableCommand和HystrixCommand进行比较:
- HystrixCommand提供了同步和异步两种执行方式,而HystrixObservableCommand只有异步方式
- HystrixCommand的run方法是用内部线程池的线程来执行的,同一个HystrixCommand对象只能执行一次run(),而HystrixObservableCommand则是由调用方(例如Tomcat容器)的线程来执行的,因为是异步,所以两种方式都能很好的起到资源隔离的效果。
- HystrixCommand一次只能发送单条数据返回,而HystrixObservableCommand一次可以发送多条数据返回.
定义服务降级
fallback是Hystrix命令执行失败时使用的后备方法,用来实现服务降级处理逻辑。
针对前面创建请求命令的三种方式,当命令执行过程中出现错误、超时、线程池拒绝、断路器熔断等情况时,分别执行对应创建请求命令方式的降级处理逻辑。
使用继承的方式——重载getFallback()方法
1
2
3
4@Override
protected User getFallback(){
return new User();
}通过Observable来实现响应式执行方式——重载resumeWithFallback()方法
1 | @Override |
- 使用@HystrixCommand注解(对象创建请求命令的第2、4种方式)——使用@HystrixCommand中的fallbackMethod参数指定服务降级实现方法
具体见快速入门中的例子。
如果降级实现方法并不是一个稳定逻辑,它依然可能发生异常,那么我们也可以为它添加@HystrixCommand注解以生成Hystrix命令,同时使用fallbackMethod来指定服务降级逻辑。比如:1
2
3
4
5
6
7
8
9@HystrixCommand(fallbackMethod = "defaultUserSec")
//observableFailed()已经是降级逻辑
public User observableFailed(Long id) {
//此处可能是另外一个网络请求来获取,所以也可能失败
return new User();
}
public User defaultUserSec() {
return new User();
}
注意:
- fallbackMethod指定的服务降级方法返回值类型和函数签名必须与对应的请求命令执行方法一致
- 通过继承HystrixCommand
方式创建的请求命令,run()方法的返回类型为T - 通过继承HystrixObservableCommand
方式创建的请求命令,重写的construct返回类型为Observable
以下情况可以不实现降级逻辑
- 执行文件的写操作:当写失败时候只需要通知调用者即可,因为写操作一般占据不少时间
- 执行批处理或离线计算的命令:当失败的时候告诉调用者重试即可,不用去进行备用方法的调用
命令名称、分组以及线程池划分
以继承方式实现的Hystrix命令设置命名名称、分组和线程池划分方式
1
2
3
4
5
6
7public UserCommand(RestTemplate restTemplate, Long id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CommandGroupKey"))
.andCommandKey(HystrixCommandKey.Factory.asKey("CommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")));
this.restTemplate = restTemplate;
this.id = id;
}使用注解创建的请求命令设置命名名称、分组和线程池划分方式
1
2
3
4@HystrixCommand(commandKey = "getUserById",groupKey = "UserGroup",threadPoolKey = "getUserByIdThread",fallbackMethod = "getError")
public User getUserById(Long id) {
return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
}- CommandKey:命令名称
默认情况下,以继承方式实现的Hystrix的命令名称是类名,注解实现的Hystrix的命令名称是方法名。
每个CommandKey代表一个依赖抽象,相同的依赖要使用相同的CommandKey名称。依赖隔离的根本就是对相同CommandKey的依赖做隔离。 - CommandGroup:分组
继承实现方式中的必选参数,只有withGroupKey静态函数可以创建Setter实例。Hystrix会根据组来组织和统计命令的告警、仪表盘等信息。在不指定ThreadPoolKey的情况下,字面值用于对不同依赖的线程池/信号区分。 - ThreadPoolKey:线程池/信号
实现更细粒度的线程池划分,通常情况下,尽量通过HystrixThreadPoolKey的方式来指定线程池的划分,因为多个不同的命令可能从业务逻辑上来看属于同一个组,但是往往从实现本身上需要跟其他命令进行隔离。
如:对同一依赖的不同远程调用(一个是redis 一个是http),使用HystrixThreadPoolKey做隔离区分.
- CommandKey:命令名称
请求缓存
请求依赖服务的资源需要通过通信来实现,类似于对数据库这样的外部资源进行读写操作,在高并发情况下会成为系统的瓶颈。开启和使用请求缓存,减轻高并发时的请求线程消耗、降低请求响应时间。
开启请求缓存功能
- 请求命令的创建通过继承HystrixCommand或HystrixObservableCommand时,重载getCachekey()方法
UserCommand:1
2
3
4@Override
protected String getCacheKey(){
return String.valueOf(id);
}
服务提供者中:1
2
3
4
5
6
7
8
9
10
11
12@RequestMapping(value = "/{id}", method = RequestMethod.GET)
public User getUser(@PathVariable Long id) {
if (id == 1) {
System.out.println(">>>>>>>>/users/{id}");
User userCache = new User("dodo", 18);
userCache.setId(id);
return userCache;
}
User user = new User("didi", 20);
user.setId(id);
return user;
}
服务消费者的controller中:1
2
3
4
5
6
7
8
9
10
11@RequestMapping(value = "test-cache")
public User testCahce() {
HystrixRequestContext.initializeContext();
User user1 = new UserCommand(restTemplate, 1L).execute();
User user2 = new UserCommand(restTemplate, 1L).execute();
User user3 = new UserCommand(restTemplate, 1L).execute();
System.out.println("user1:" + user1);
System.out.println("user2:" + user2);
System.out.println("user3:" + user3);
return user1;
}
- 通过注解创建请求命令,使用@CacheResult注解开启缓存,指定cacheKeyMethod为缓存key值的生成策略。若不指定则默认使用HystrixCommand的所有参数的组合作为缓存的key
1
2
3
4
5
6
7
8@HystrixCommand(commandKey = "getUserById",groupKey = "UserGroup",threadPoolKey = "getUserByIdThread",fallbackMethod = "getError")
@CacheResult(cacheKeyMethod = "getCacheKey")
public User getUserById(Long id) {
return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class, id);
}
public String getCacheKey(Long id){
return String.valueOf(id);
}
系统在运行时会根据getCacheKey方法的返回值来判断这个请求是否和之前执行过的请求一样,即被缓存,如果被缓存,则直接使用缓存数据而不去请求服务提供者.
好处:
- 减少重复请求数,降低依赖服务的并发度。
- 在同一用户请求的上下文中,相同依赖服务的返回数据始终保持一致。
- 请求缓存在run()和construct()执行之前生效,所以可以有效减少不必要的线程开销。
注意:
开启请求缓存功能的Hystrix命令调用时,要初始化Hystrix请求上下文:HystrixRequestContext.initializeContext();清理失效缓存功能
有一种特殊的情况:如果我将服务提供者的数据修改了,那么缓存的数据就应该被清除,否则用户在读取的时候就有可能获取到一个错误的数据,我们可以通过HystrixRequestCache.clear()方法来清除缓存。1
2
3
4
5
6
7
8
9
10
11
12@RequestMapping(value = "test-cache-clear")
public User testCacheClear() {
HystrixRequestContext.initializeContext();
User user1 = new UserCommand(restTemplate, 1L).execute();
UserCommand.flushCache(1L);
User user2 = new UserCommand(restTemplate, 1L).execute();
User user3 = new UserCommand(restTemplate, 1L).execute();
System.out.println("user1:" + user1);
System.out.println("user2:" + user2);
System.out.println("user3:" + user3);
return user1;
}
请求合并
使用继承实现请求合并器
通常微服务架构中的依赖通过远程调用实现,而远程调用中最常见的问题就是通信消耗与连接数占用。在高并发的情况之下,因通信次数的增加,总的通信时间消耗将会变的不那么理想。同时,因为对依赖服务的线程池资源有限,将出现排队等待与响应延迟的情况。为了优化这两个问题,Hystrix提供了HystrixCollapser来实现请求的合并,以减少通信消耗和线程数的占用。
HystrixCollapser实现了在HystrixCommand之前放置一个合并处理器,它将处于一个很短时间窗(默认10毫秒)内对同一依赖服务的多个请求进行整合并以批量方式发起请求的功能(服务提供方也需要提供相应的批量实现接口)。通过HystrixCollapser的封装,我们不需要去关注线程合并的细节过程,只需要关注批量化服务和处理。下面是HystrixCollapser的使用示例:
增加服务提供方USER-SERVICE批量实现接口:
1
2
3
4
5
6
7
8
9
10@RequestMapping(method = RequestMethod.GET)
public List<User> findAll(@RequestParam String ids){
System.out.println(">>>>>>>>请求合并:"+ids);
ArrayList<User> users=new ArrayList<User>();
users.add(new User(ids,11));
users.add(new User(ids,12));
users.add(new User(ids,13));
users.add(new User(ids,14));
return users;
}服务消费端,通过RestTemplate实现简单调用
1
2
3
4
5
6@HystrixCommand
public List<User> findAll(List<Long> ids) {
System.out.println("findAll---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName());
User[] users=restTemplate.getForObject("http://USER-SERVICE/users?ids={1}", User[].class, StringUtils.join(ids, ","));
return Arrays.asList(users);
}创建UserBatchCommand对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14public class UserBatchCommand extends HystrixCommand<List<User>> {
UserService userService;
List<Long> userIds;
public UserBatchCommand(UserService userService, List<Long> userIds) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CollapsingGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("CollapsingKey")));
this.userService = userService;
this.userIds = userIds;
}
@Override
protected List<User> run() throws Exception {
return userService.findAll(userIds);
}
}通过继承HystrixCollapser实现请求合并器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32public class UserCollapseCommand extends HystrixCollapser<List<User>, User, Long> {
private UserService userService;
private Long userId;
public UserCollapseCommand(UserService userService, Long userId) {
super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("userCollapseCommand")).andCollapserPropertiesDefaults(
HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));
this.userService = userService;
this.userId = userId;
}
@Override
public Long getRequestArgument() {
return userId;
}
@Override
protected HystrixCommand<List<User>> createCommand(Collection<CollapsedRequest<User, Long>> collapsedRequests) {
List<Long> userIds = new ArrayList<>(collapsedRequests.size());
userIds.addAll(collapsedRequests
.stream()
.map(CollapsedRequest::getArgument)
.collect(Collectors.toList()));
return new UserBatchCommand(userService, userIds);
}
@Override
protected void mapResponseToRequests(List<User> batchResponse, Collection<CollapsedRequest<User, Long>> collapsedRequests) {
System.out.println("mapResponseToRequests");
int count = 0;
for (CollapsedRequest<User, Long> collapsedRequest : collapsedRequests) {
User user = batchResponse.get(count++);
collapsedRequest.setResponse(user);
}
}
}
在上面的构造函数中,我们为请求合并器设置了时间延迟属性,即请求时间间隔在100ms之内的请求会被合并为一个请求。合并器会在该时间窗内收集获取单个User的请求并在时间窗结束时进行合并组装成单个批量请求。下面getRequestArgument方法返回给定的单个请求参数userId,而createCommand和mapResponseToRequests是请求合并器的两个核心:
- createCommand:该方法的collapsedRequests参数中保存了延迟时间窗中收集到的所有获取单个User的请求。通过获取这些请求的参数来组织上面我们准备的批量请求命令UserBatchCommand实例。
- mapResponseToRequests:在批量命令UserBatchCommand实例被触发执行完成之后,该方法开始执行,其中batchResponse参数保存了createCommand中组织的批量请求命令的返回结果,而collapsedRequests参数则代表了每个被合并的请求。在这里我们通过遍历批量结果batchResponse对象,为collapsedRequests中每个合并前的单个请求设置返回结果,以此完成批量结果到单个请求结果的转换.
- 测试,在服务消费者端创建访问接口,来测试合并请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20@RequestMapping(value = "teest-request-merger")
public void testMerger() throws ExecutionException, InterruptedException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
UserCollapseCommand uc1 = new UserCollapseCommand(userCollapseByExtendsService, 1L);
UserCollapseCommand uc2 = new UserCollapseCommand(userCollapseByExtendsService, 2L);
UserCollapseCommand uc3 = new UserCollapseCommand(userCollapseByExtendsService, 3L);
Future<User> f1=uc1.queue();
Future<User> f2=uc2.queue();
User user1 = f1.get();
User user2 = f2.get();
User user3 = uc3.queue().get();
Thread.sleep(3000);
UserCollapseCommand uc4 = new UserCollapseCommand(userCollapseByExtendsService, 4L);
User user4 = uc4.queue().get();
System.out.println("book1>>>" + user1);
System.out.println("book2>>>" + user2);
System.out.println("book3>>>" + user3);
System.out.println("book4>>>" + user4);
context.close();
}
- 测试,在服务消费者端创建访问接口,来测试合并请求
说明:
- 初始化HystrixRequestContext
- 创建UserCollapseCommand类的实例来发起请求,先发送3个请求,然后睡眠3秒钟,再发起1个请求,这样,前3个请求就会被合并为一个请求,第四个请求因为间隔的时间比较久,所以不会被合并,而是单独创建一个线程去处理。
但是:
如果没有uc3.queue()这步来得到Future
使用注解实现请求合并器
1 | @Service |
测试:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17@RequestMapping(value = "test-request-merger2")
public void testMerger2() throws ExecutionException, InterruptedException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
Future<User> f1=userCollapseService.find(1L);
Future<User> f2=userCollapseService.find(2L);
Future<User> f3=userCollapseService.find(3L);
User user1 = f1.get();
User user2 = f2.get();
User user3 = f3.get();
Thread.sleep(3000);
User user4=userCollapseService.find(4L).get();
System.out.println("book1>>>" + user1);
System.out.println("book2>>>" + user2);
System.out.println("book3>>>" + user3);
System.out.println("book4>>>" + user4);
context.close();
}
请求合并的额外开销
虽然通过请求合并可以减少请求的数量以缓解依赖服务线程池的资源,但是在使用的时候也需要注意它所带来的额外开销:用于请求合并的延迟时间窗会使得依赖服务的请求延迟增高。比如:某个请求在不通过请求合并器访问的平均耗时为5ms,请求合并的延迟时间窗为10ms(默认值),那么当该请求的设置了请求合并器之后,最坏情况下(在延迟时间窗结束时才发起请求)该请求需要15ms才能完成。
由于请求合并器的延迟时间窗会带来额外开销,所以我们是否使用请求合并器需要根据依赖服务调用的实际情况来选择,主要考虑下面两个方面:
- 请求命令本身的延迟。如果依赖服务的请求命令本身是一个高延迟的命令,那么可以使用请求合并器,因为延迟时间窗的时间消耗就显得莫不足道了。
- 延迟时间窗内的并发量。如果一个时间窗内只有1-2个请求,那么这样的依赖服务不适合使用请求合并器,这种情况下不但不能提升系统性能,反而会成为系统瓶颈,因为每个请求都需要多消耗一个时间窗才响应。相反,如果一个时间窗内具有很高的并发量,并且服务提供方也实现了批量处理接口,那么使用请求合并器可以有效的减少网络连接数量并极大地提升系统吞吐量,此时延迟时间窗所增加的消耗就可以忽略不计了。