背景
在互聯網的高並發場景下,請求會非常多,但是數據庫連接池比較少,或者說需要減少CPU壓力,減少處理邏輯的,需要把單個查詢,用某些手段,改為批量查詢多個后返回。
如:支付寶中,查詢“個人信息”,用戶只會觸發一次請求,查詢自己的信息,但是多個人同時這樣做就會產生多次數據庫連接。為了減少連接,需要在JAVA服務端進行合並請求,把多個“個人信息”查詢接口,合並為批量查詢多個“個人信息”接口,然后以個人信息在數據庫的id作為Key返回給上游系統或者頁面URL等調用方。
目的
- 減少訪問數據庫的次數
- 單位時間內的多個請求,合並為一個請求。讓業務邏輯層把單個查詢的sql,改為批量查詢的sql。或者邏輯里面需要調用redis,那批量邏輯里面就可以用redis的pipeline去實現。
點贊再看,關注公眾號:【地藏思維】給大家分享互聯網場景設計與架構設計方案
掘金:地藏Kelvin https://juejin.im/user/5d67da8d6fb9a06aff5e85f7
主要解決手段
- SpringCloud的Hystrix的自定義HystrixCollapse和HystrixCommand
- SpringCloud的Hystrix注解方式。
- 沒有服務治理框架時,利用JDK隊列、定時任務線程池處理。
鑒於現在大部分都有SpringCloud,所以先說第2種的注解方式,后續再說第3種,不用第1種是因為注解方式比較方便。
交互流程
- 主思路是接收請求后,從上一次計數開始累計等待200ms
- 一次過處理200ms內的接口入參
- 然后以id為key,批量查詢多個id的結果
- 批量查詢完后,以id為key,返回給上游系統的單個查詢
測試手段
- Postman
- 在本地系統創建單元測試方式,調用自己啟動的服務
- 建立上游系統工程來調用
- 手動在頁面請求多次
- Jmeter生成多線程請求
選其一種。建議1、4、5
開發
本文主要使用Hystrix注解的方式去實現,還有另外一種辦法實現的就是編碼自定義HystrixCollapser,那種方法是建立兩個類,一個繼承HystrixCollapser,另一個繼承HystrixCommand,這個方法比較顯式的編碼聲明有助於理解,但是不夠Hystrix方式便捷。
自定義HystrixCollapser方式和Hystrix注解方式實現請求合並的優劣
-
雖然Hystrix注解方式比較快,但是不能做到實時更改等待的單位時間,那個超時時間是放在注解上,如果要更改單位時間,其實都需要重啟服務或者重新編譯打包。
-
用自定義HystrixCollapser比較好的地方就是可以在運行過程中,讀字典表去更改單位時間,這樣線上出問題了就不用重啟了。
-
但是自定義HystrixCollapser方式缺點還是有的,因為綁定一個批量方法就要建立一個HystrixCommand類,如果有多個請求合並的情況,就只能建立多個HystrixCommand類了。
1. 添加POM
聲明springboot 和springcloud版本
我以前做的工程使用了1.4.7.RELEASE,Camden.SR2。
其實大家可以用新版本的,只是新版本的eureka、Feign依賴的artifactId改變了,但是后續使用方式是一樣的。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.7.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
添加關鍵依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.2.2</version>
</dependency>
2. 啟動注解
除了SpringCloud客戶端所基本需要的@SpringBootApplication @EnableEurekaClient,主要加上@EnableCircuitBreaker。因為使用到hystrix的都必須聲明這個注解,為了啟動斷路器的意思,如熔斷的時候也會使用,熔斷也是通過hystrix來實現的。
這個比較關鍵,不啟動的話,后續編碼怎么弄都不生效的
@SpringBootApplication
@EnableDiscoveryClient
//使用hystrix必須增加
@EnableCircuitBreaker
@EnableEurekaClient
@EnableSwagger2
public class ProviderRequestMergeApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderRequestMergeApplication.class, args);
}
}
3. 請求接口Controller
編寫兩個接口,user方法是沒有經過合並請求的樣例,在本案例實際沒有作用,只是用於校驗合並與不合並的效果。
userbyMerge方法在合並請求的方法,其作為請求接口入口,合並請求的邏輯,並不需要在Controller里面實現,使得Controller只作為請求這一層,不耦合其他功能。
/**
*
* @author kelvin.cai
*
*/
@RestController
public class UserController {
@Autowired
private UserBatchServiceImpl userBatchServiceImpl;
@RequestMapping(method = RequestMethod.POST,value = "/user/{id}")
public User user(@PathVariable Long id) {
User book = new User( 55L, "姚雪垠2");
return book;
}
@RequestMapping(method = RequestMethod.GET,value = "/userbyMerge/{id}")
public User userbyMerge(@PathVariable Long id) throws InterruptedException, ExecutionException {
Future<User> userFu = this.userBatchServiceImpl.getUserById(id);
User user = userFu.get();
return user;
}
}
4. 編寫請求合並邏輯
/**
*
* @author kelvin.cai
*
*/
@Component
public class UserBatchServiceImpl {
@HystrixCollapser(batchMethod = "getUserBatchById",scope=Scope.GLOBAL,
collapserProperties = {@HystrixProperty(name ="timerDelayInMilliseconds",value = "2000")})
public Future<User> getUserById(Long id) {
throw new RuntimeException("This method body should not be executed");
}
@HystrixCommand
public List<User> getUserBatchById(List<Long> ids) {
System.out.println("進入批量處理方法"+ids);
List<User> ps = new ArrayList<User>();
for (Long id : ids) {
User p = new User();
p.setId(id);
p.setUsername("dizang"+ids);
ps.add(p);
}
return ps;
}
}
這里有幾個關鍵點(如果沒生效可以看看)
- @HystrixCollapser參數batchMethod 的值為批量處理的方法的名字,批量處理方法必須在同一個類中。
- 單個處理方法和批量處理方法必須要同一個基本類型,只是批量方法需要使用List去包裹
- 單個處理方法,建議用Future,這個是jdk線程異步獲取的那個類,用於異步獲取結果。其實有另外的返回類型,讓調用getUserById使用同步阻塞的方式去使用,但是不是很建議。
- scope有兩個值一個是Scope.REQUEST,意思就是當次請求接口內調用UserBatchServiceImpl.getUserById多次才會合並。想想看,如果我一個接口內,調用多次單個插敘,為何不直接使用一個批量查詢呢?我沒想到有什么場景會需要這個值。
scope有另外一個值Scope.GLOBAL,就是樣例所示的值,意思就是,所有請求接口進來都合並。大家回顧一下需求目的,就比較符合要求了,如多個支付寶用戶查詢自己的信息時就是合並全局請求。 - @HystrixProperty填合並請求的單位時間,debug時可以把他設置為5秒,比較好測試。
這里有個包路徑的建議
這個合並請求類UserBatchServiceImpl 不建議放在業務邏輯層,為了保持業務邏輯service層代碼是干凈的只保留業務邏輯,所以這個UserBatchServiceImpl 類建議放在另外一個包collapser下,讓這個包路徑只是用於處理請求合並的事情。
因為這個類是利用springcloud框架實現,萬一以后不用springcloud來做合並請求而用原始隊列加線程池怎么辦?
而且有些工程設計時,是建立server工程只做請求和服務治理,搞另外一個工程專門寫domain領域下的東西,不包含其他框架的,這樣為了第三個工程叫job定時任務工程可以直接使用domain工程的依賴。
這個領域驅動設計,請看我之前的文章。
測試方法
1. 觸發測試
swagger-ui
如果你有添加swagger,那你打開http://localhost:7902/swagger-ui.html,對接口填一下參數請求兩次。
2. 結果輸出
下圖中,console日志已經輸出了兩次請求的入參
3. Jmeter
Postman不能測試並發請求,為了試驗並發,要么用上面的辦法,要么下載Jmeter來做測試。
總結
到這里相信大家都已經完成了合並請求了,其實原理還是基於原始做法,利用隊里存入參,然后利用線程池定時的獲取隊列的入參,再批量處理,利用線程的Future,異步返回結果。大致流程是這樣的就不再描述了,如果有空會繼續弄原始方法的請求合並。
大家還可以去看看Hystrix合並請求的其他參數,搜索相關信息來擴展hystrix功能。
本文Demo
都在我springcloud的demo里面了,看provider-hystrix-request-merge這個工程下的內容。
https://gitee.com/kelvin-cai/spring-cloud-demo
歡迎關注公眾號,文章更快一步
我的公眾號 :地藏思維
掘金:地藏Kelvin
簡書:地藏Kelvin
我的Gitee: 地藏Kelvin https://gitee.com/kelvin-cai