介紹:
Hystrix的請求合並就是把重復的請求批量的用一個HystrixCommand命令去執行,以減少通信消耗和線程數的占用。Hystrix的請求合並用到了HystrixCollapser這個抽象類,它在HystrixCommand之前前放置一個合並處理器,將處於一個很短的時間窗(默認10ms)內對同一依賴服務的多個請求進行整合並以批量方式發起請求的功能(服務提供方也需要提供相應的匹狼實現接口)。下面我們通過一個例子來看看怎么使用。
示例:
這個示例是基於spring cloud的,對此不熟悉的可以參考這里了解。
1.首先我們需要一個EurekaServer來作為注冊中心。這個沒什么特別的說明,可以參考代碼示例中的 eureka-server工程
2.新建一個服務提供者工程eureka-server-service
2.1 新建一個User.java。這個model必須要有一個無參的默認構造器,否則后面的實驗會報錯
package org.hope.model; public class User { //必須要有一個無參的構造器 public User(){} public User(Long id, String name) { this.id = id; this.name = name; } private Long id; private String name; setters()&getters(); }
2.2 新建一個提供服務的controller
package org.hope.web; import org.hope.model.User; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import java.util.ArrayList; import java.util.List; @RestController public class UserBatchController { @RequestMapping(value = "/users", method = RequestMethod.GET) public List<User> batchUser(String ids) { System.out.println("ids===:" + ids); List<User> lists = new ArrayList<User>(); lists.add(new User(1l, "小明")); lists.add(new User(2l, "小紅")); lists.add(new User(3l, "小張")); lists.add(new User(4l, "小王")); lists.add(new User(5l, "小李")); return lists; } @RequestMapping(value = "/users/{id}", method = RequestMethod.GET) public User singleUser(@PathVariable("id") String id) { User user = new User(100L, "大王"); return user; } }
2.3 springboot的main函數
package org.hope; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.netflix.eureka.EnableEurekaClient; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @EnableEurekaClient @RestController public class ServiceApplication { public static void main(String[] args) { SpringApplication.run(ServiceApplication.class, args); } }
2.4 配置文件application.yml
eureka:
client:
serviceUrl:
#注冊中心的地址
defaultZone: http://localhost:8761/eureka/
server:
#當前服務端口號
port: 8762
spring:
application:
#當前應用名稱
name: service-batch
3.新建一個服務消費者工程ribbon-hystrix,在這個工程里我們測試hystrix批量服務調用的請求合並功能
3.1新建一個model User.java。這個model一定要有無參的構造器
package org.hope.lee.model; public class User { //一定要有無參的構造器 public User(){} public User(Long id, String name) { this.id = id; this.name = name; } private Long id; private String name; getters()&setters(); }
3.2 service負責調用服務
package org.hope.lee.service; import org.hope.lee.model.User; import java.util.List; public interface UserService { public User find(Long id); public List<User> findAll(List<Long> ids); }
package org.hope.lee.service; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.hope.lee.model.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.Arrays; import java.util.List; @Service("userService") public class UserServiceImpl implements UserService{ @Autowired private RestTemplate restTemplate; @Override public User find(Long id) { return restTemplate.getForObject("http://localhost:8762/users/{1}",User.class, id); } @Override public List<User> findAll(List<Long> ids) { System.out.println("finaAll request:---------" + ids + "Thread.currentThread().getName():-------" + Thread.currentThread().getName()); User[] users = restTemplate.getForObject("http://localhost:8762/users?ids={1}", User[].class, StringUtils.join(ids, ",")); return Arrays.asList(users); } }
3.3 HystrixCommand命令執行請求
package org.hope.lee.command; import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandKey; import org.hope.lee.model.User; import org.hope.lee.service.UserService; import java.util.ArrayList; import java.util.List; public class UserBatchCommand extends HystrixCommand<List<User>> { UserService userService; List<Long> userIds; public UserBatchCommand(UserService userService, List<Long> userIds) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")). andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey"))); this.userService = userService; this.userIds = userIds; } @Override protected List<User> run() throws Exception { return userService.findAll(userIds); } @Override protected List<User> getFallback() { List<User> users = new ArrayList<User>(); users.add(new User(99L, "失敗者")); return new ArrayList<User>(users); } }
3.4 HystrixCollapser命令來做請求合並
package org.hope.lee.command; import com.netflix.hystrix.HystrixCollapser; import com.netflix.hystrix.HystrixCollapserKey; import com.netflix.hystrix.HystrixCollapserProperties; import com.netflix.hystrix.HystrixCommand; import org.hope.lee.model.User; import org.hope.lee.service.UserService; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.stream.Collectors;
/**
* Created by lisen on 2017/12/27.
* 通過看HystrixCollapser類的源碼:
* public abstract class HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType>
* 我們可以知道List<User>表示:合並后批量請求的返回類型
* User表示:單個請求返回的類型
* Long表示:請求參數類型
*/
public class UserCollapseCommand extends HystrixCollapser<List<User>, User, Long> { private UserService userService; private Long userId; public UserCollapseCommand(UserService userService, Long userId) { super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("userCollapseCommand")). andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100))); this.userService = userService; this.userId = userId; } @Override public Long getRequestArgument() { return userId; } /** * * @param collapsedRequests 保存了延遲時間窗中收集到的所有獲取單個User的請求。通過獲取這些請求的參數來組織 * 我們准備的批量請求命令UserBatchCommand實例 * @return */ @Override protected HystrixCommand<List<User>> createCommand(Collection<CollapsedRequest<User, Long>> collapsedRequests) { List<Long> userIds = new ArrayList<>(collapsedRequests.size()); userIds.addAll(collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList())); return new UserBatchCommand(userService, userIds); } /** * 在批量請求命令UserBatchCommand實例被觸發執行完成后,該方法開始執行, * 在這里我們通過批量結果batchResponse對象,為collapsedRequests中每個合並前的單個請求設置返回結果。 * 來完成批量結果到單個請求結果的轉換 * @param batchResponse 保存了createCommand中組織的批量請求命令的返回結果 * @param collapsedRequests 代表了每個合並的請求 */ @Override protected void mapResponseToRequests(List<User> batchResponse, Collection<CollapsedRequest<User, Long>> collapsedRequests) { System.out.println("mapResponseToRequests========>"); int count = 0; for(CollapsedRequest<User, Long> collapsedRequest : collapsedRequests) { User user = batchResponse.get(count++); collapsedRequest.setResponse(user); } } }
3.5寫一個controller來輔助測試
package org.hope.lee.web; import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; import org.hope.lee.command.UserCollapseCommand; import org.hope.lee.model.User; import org.hope.lee.service.UserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.Future; @RestController public class CollapseCommandController { @Autowired private UserService userService; @RequestMapping(value = "/collapse", method = RequestMethod.GET) public void requestCollapse() { HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { Future<User> f1 = new UserCollapseCommand(userService, 1L).queue(); Future<User> f2 = new UserCollapseCommand(userService, 2L).queue(); Future<User> f3 = new UserCollapseCommand(userService, 3L).queue(); Thread.sleep(3000); Future<User> f4 = new UserCollapseCommand(userService, 4L).queue(); Future<User> f5 = new UserCollapseCommand(userService, 5L).queue(); User u1 = f1.get(); User u2 = f2.get(); User u3 = f3.get(); User u4 = f4.get(); User u5 = f5.get(); System.out.println(u1.getName()); System.out.println(u2.getName()); System.out.println(u3.getName()); System.out.println(u4.getName()); System.out.println(u5.getName()); } catch (Exception e) { e.printStackTrace(); }finally { context.close(); } } }
3.6 spring boot main方法
package org.hope.lee; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.cloud.netflix.hystrix.EnableHystrix; import org.springframework.cloud.netflix.hystrix.dashboard.EnableHystrixDashboard; import org.springframework.context.annotation.Bean; import org.springframework.web.client.RestTemplate; @SpringBootApplication @EnableDiscoveryClient public class ServiceRibbonApplication { public static void main(String[] args) { SpringApplication.run(ServiceRibbonApplication.class, args); } @Bean RestTemplate restTemplate() { return new RestTemplate(); } }
3.7.配置文件applicationi.properties
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
server.port=8763
spring.application.name=batch-customer
測試:
1.運行eureka-server啟動注冊中心
2.運行eureka-server-service啟動服務提供者
3.運行ribbon-hystrix啟動服務消費者
4.在瀏覽器輸入: http://localhost:8763/collapse
輸出結果:從結果中我們看到前3次請求合並為一個請求,后面2次請求合並為了一個請求
遇到的問題:
model的實體類一定要定義一個無參數的默認構造器。否則就會報錯。
使用注解實現請求合並器
package org.hope.lee.service; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty; import org.apache.commons.lang.StringUtils; import org.hope.lee.model.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.Arrays; import java.util.List; import java.util.concurrent.Future; @Service("peopleService") public class PeopleServiceImpl implements PeopleService { @Autowired private RestTemplate restTemplate; @HystrixCollapser(batchMethod = "findAll", collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds", value = "100")}) public Future<User> find(Long id) { throw new RuntimeException("This method body should not be executed"); } @HystrixCommand public List<User> findAll(List<Long> ids) { System.out.println("Annotation---------" + ids + "Thread.currentThread().getName():" + Thread.currentThread().getName()); User[] users = restTemplate.getForObject("http://localhost:8762/users?ids={1}", User[].class, StringUtils.join(ids, ",")); return Arrays.asList(users); } }
https://gitee.com/huayicompany/Hystrix-learn/tree/master/hystrix-request-collapsing
參考:
[1]博客,https://segmentfault.com/a/1190000011468804,Spring Cloud中Hystrix的請求合並
[2]官網,https://github.com/Netflix/Hystrix/wiki/How-To-Use#Collapsing, Request Collapsing
[3]《SpringCloud微服務實戰》,電子工業出版社,翟永超