Hystrix: request collapsing


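Introduction:

  Hystrix request collapsing takes repeated requests of the same kind and executes them in bulk with a single HystrixCommand, which reduces communication overhead and the number of threads in use. It is built on the abstract class HystrixCollapser, which places a collapser in front of the HystrixCommand: requests to the same dependency that arrive within a very short time window (10 ms by default) are merged and sent as one batch request. The service provider must also expose a matching batch endpoint. The example below shows how to use it.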
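To make the time-window idea concrete before bringing in Hystrix, here is a minimal plain-Java sketch of the same pattern. It is not Hystrix code: the class name WindowCollapser, its submit method, and the rule that the first request opens the window are all assumptions made for illustration (Hystrix's own timer and batching are more involved). Callers submit single arguments, a timer fires once the window elapses, and one batch call serves everything collected so far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical, simplified collapser. It only illustrates the batching idea. */
final class WindowCollapser<A, R> {

    /** One waiting caller: its argument and the future it will be answered through. */
    private static final class Pending<A, R> {
        final A argument;
        final CompletableFuture<R> future = new CompletableFuture<>();
        Pending(A argument) { this.argument = argument; }
    }

    private final Object lock = new Object();
    private final long windowMillis;
    private final Function<List<A>, List<R>> batchCall;
    // Daemon timer thread so that the JVM can exit when the demo finishes.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "collapser-timer");
                t.setDaemon(true);
                return t;
            });
    private List<Pending<A, R>> window = new ArrayList<>();

    WindowCollapser(long windowMillis, Function<List<A>, List<R>> batchCall) {
        this.windowMillis = windowMillis;
        this.batchCall = batchCall;
    }

    /** Queues one request; the first request of a window schedules the flush. */
    CompletableFuture<R> submit(A argument) {
        Pending<A, R> pending = new Pending<>(argument);
        synchronized (lock) {
            if (window.isEmpty()) {
                timer.schedule(this::flush, windowMillis, TimeUnit.MILLISECONDS);
            }
            window.add(pending);
        }
        return pending.future;
    }

    /** Sends everything collected so far as one batch and answers each caller by position. */
    private void flush() {
        List<Pending<A, R>> batch;
        synchronized (lock) {
            batch = window;
            window = new ArrayList<>();
        }
        try {
            List<A> arguments = new ArrayList<>();
            for (Pending<A, R> p : batch) {
                arguments.add(p.argument);
            }
            List<R> results = batchCall.apply(arguments);
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).future.complete(results.get(i));
            }
        } catch (RuntimeException e) {
            // A failed batch fails every caller that has not been answered yet.
            for (Pending<A, R> p : batch) {
                p.future.completeExceptionally(e);
            }
        }
    }

    public static void main(String[] args) {
        WindowCollapser<Long, String> collapser = new WindowCollapser<Long, String>(100L, ids -> {
            System.out.println("batch call with ids " + ids);
            List<String> names = new ArrayList<>();
            for (Long id : ids) {
                names.add("user-" + id);
            }
            return names;
        });
        CompletableFuture<String> f1 = collapser.submit(1L);
        CompletableFuture<String> f2 = collapser.submit(2L);
        System.out.println(f1.join() + ", " + f2.join());
    }
}
```

The trade-off is the same one Hystrix makes: each caller may wait up to one window longer, in exchange for fewer calls to the dependency.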

Example:

  This example is based on Spring Cloud; readers unfamiliar with it may want to read an introduction to Spring Cloud first.

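1. First we need a Eureka server to act as the registry. There is nothing special about it; see the eureka-server project in the code sample

2. Create a service provider project, eureka-server-service

  2.1 Create User.java. This model must have a no-argument default constructor, otherwise the later experiments fail with an error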

package org.hope.model;

public class User {
    // A no-argument constructor is required
    public User(){}

    public User(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    private Long id;
    private String name;

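    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }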
}

  2.2 Create a controller that provides the service

package org.hope.web;

import org.hope.model.User;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

@RestController
public class UserBatchController {

    @RequestMapping(value = "/users", method = RequestMethod.GET)
    public List<User> batchUser(String ids) {
        System.out.println("ids===:" + ids);
        List<User> lists = new ArrayList<User>();
        lists.add(new User(1L, "小明"));
        lists.add(new User(2L, "小紅"));
        lists.add(new User(3L, "小張"));
        lists.add(new User(4L, "小王"));
        lists.add(new User(5L, "小李"));

        return lists;
    }

    @RequestMapping(value = "/users/{id}", method = RequestMethod.GET)
    public User singleUser(@PathVariable("id") String id) {
        User user = new User(100L, "大王");
        return user;
    }

}
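Note that the batchUser handler above prints the ids parameter but always returns the same five users. A real batch endpoint would look up the requested ids. A minimal sketch of that step follows, in plain Java with no Spring dependency. The class UserLookupSketch, its method names, and the in-memory map standing in for a data store are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: turns the "ids" query parameter into a lookup. */
final class UserLookupSketch {

    /** Parses "1,2,3" into [1, 2, 3]; null or blank input gives an empty list. */
    static List<Long> parseIds(String ids) {
        List<Long> parsed = new ArrayList<>();
        if (ids == null || ids.trim().isEmpty()) {
            return parsed;
        }
        for (String part : ids.split(",")) {
            parsed.add(Long.parseLong(part.trim()));
        }
        return parsed;
    }

    /** Returns the names for the requested ids in request order; unknown ids map to null. */
    static List<String> namesFor(String ids, Map<Long, String> store) {
        List<String> names = new ArrayList<>();
        for (Long id : parseIds(ids)) {
            names.add(store.get(id));
        }
        return names;
    }

    public static void main(String[] args) {
        Map<Long, String> store = new HashMap<>();
        store.put(1L, "小明");
        store.put(2L, "小紅");
        System.out.println(namesFor("2,1", store));
    }
}
```

Returning results in request order matters here, because the collapser later in this article pairs responses with requests by position.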

  2.3 The Spring Boot main class

package org.hope;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient
public class ServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(ServiceApplication.class, args);
    }

}

  2.4 The configuration file application.yml

eureka:
  client:
    serviceUrl:
      # Address of the registry
      defaultZone: http://localhost:8761/eureka/
server:
  # Port of this service
  port: 8762
spring:
  application:
    # Name of this application
    name: service-batch

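 3. Create a service consumer project, ribbon-hystrix. In this project we test request collapsing for Hystrix batch service calls

  3.1 Create the model User.java. This model must have a no-argument constructor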

package org.hope.lee.model;

public class User {
    // A no-argument constructor is required
    public User(){}

    public User(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    private Long id;
    private String name;

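    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }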
}

   3.2 A service that calls the provider

package org.hope.lee.service;

import org.hope.lee.model.User;

import java.util.List;

public interface UserService {
    public User find(Long id);

    public List<User> findAll(List<Long> ids);
}

package org.hope.lee.service;

import org.apache.commons.lang.StringUtils;
import org.hope.lee.model.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.Arrays;
import java.util.List;

@Service("userService")
public class UserServiceImpl implements UserService{
    @Autowired
    private RestTemplate restTemplate;

    @Override
    public User find(Long id) {
        return restTemplate.getForObject("http://localhost:8762/users/{1}",User.class, id);
    }

    @Override
    public List<User> findAll(List<Long> ids) {
        System.out.println("findAll request:---------" + ids + "Thread.currentThread().getName():-------" + Thread.currentThread().getName());
        User[] users = restTemplate.getForObject("http://localhost:8762/users?ids={1}", User[].class, StringUtils.join(ids, ","));
        return Arrays.asList(users);
    }
}
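The findAll method above relies on StringUtils.join from commons-lang to turn the id list into the comma-separated ids parameter. For readers without that dependency, the same string can be built with the standard library. This is only a sketch: the class IdQuerySketch and its methods are hypothetical, and the base URL is the provider address used in this article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper showing the query string that the batch call sends. */
final class IdQuerySketch {

    /** Joins ids with commas, for example [1, 2, 3] becomes "1,2,3". */
    static String joinIds(List<Long> ids) {
        return ids.stream().map(String::valueOf).collect(Collectors.joining(","));
    }

    /** Builds the batch URL for the /users endpoint of the given provider. */
    static String batchUrl(String baseUrl, List<Long> ids) {
        return baseUrl + "/users?ids=" + joinIds(ids);
    }

    public static void main(String[] args) {
        System.out.println(batchUrl("http://localhost:8762", Arrays.asList(1L, 2L, 3L)));
    }
}
```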

   3.3 A HystrixCommand that executes the batch request

package org.hope.lee.command;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import org.hope.lee.model.User;
import org.hope.lee.service.UserService;

import java.util.ArrayList;
import java.util.List;

public class UserBatchCommand extends HystrixCommand<List<User>> {

    UserService userService;
    List<Long> userIds;

    public UserBatchCommand(UserService userService, List<Long> userIds) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")).
                andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
        this.userService = userService;
        this.userIds = userIds;
    }

    @Override
    protected List<User> run() throws Exception {
        return userService.findAll(userIds);
    }

    @Override
    protected List<User> getFallback() {
        // Fallback result returned when the batch call fails
        List<User> users = new ArrayList<User>();
        users.add(new User(99L, "失敗者"));
        return users;
    }
}

   3.4 A HystrixCollapser that merges the requests

package org.hope.lee.command;

import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;
import org.hope.lee.model.User;
import org.hope.lee.service.UserService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

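/**
 * Created by lisen on 2017/12/27.
 * From the source of the HystrixCollapser class:
 * public abstract class HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType>
 * we can see that List<User> is the return type of the merged batch request,
 * User is the return type of a single request,
 * and Long is the request argument type.
 */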

public class UserCollapseCommand extends HystrixCollapser<List<User>, User, Long> {

    private UserService userService;

    private Long userId;

    public UserCollapseCommand(UserService userService, Long userId) {
        super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("userCollapseCommand")).
                andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));
        this.userService = userService;
        this.userId = userId;
    }

    @Override
    public Long getRequestArgument() {
        return userId;
    }

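    /**
     * @param collapsedRequests all single-User requests collected during the delay window. Their arguments
     *                          are used to build an instance of our batch command, UserBatchCommand
     * @return the batch command that serves every collected request
     */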
    @Override
    protected HystrixCommand<List<User>> createCommand(Collection<CollapsedRequest<User, Long>> collapsedRequests) {
        List<Long> userIds = new ArrayList<>(collapsedRequests.size());
        userIds.addAll(collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()));
        return new UserBatchCommand(userService, userIds);
    }

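    /**
     * Runs after the UserBatchCommand batch command has finished executing.
     * Here the batch result, batchResponse, is used to set the response of each original single request
     * in collapsedRequests, which converts the batch result back into per-request results.
     * Responses are paired with requests by position, so the batch result is assumed to come back
     * in the same order as the collected requests.
     * @param batchResponse the result returned by the batch command built in createCommand
     * @param collapsedRequests the individual requests that were merged
     */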
    @Override
    protected void mapResponseToRequests(List<User> batchResponse, Collection<CollapsedRequest<User, Long>> collapsedRequests) {
        System.out.println("mapResponseToRequests========>");
        int count = 0;
        for(CollapsedRequest<User, Long> collapsedRequest : collapsedRequests) {
            User user = batchResponse.get(count++);
            collapsedRequest.setResponse(user);
        }
    }
}
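The mapResponseToRequests implementation above pairs responses with requests by position. If the provider may return users in a different order, or may omit some, one alternative is to index the batch result by id first. The sketch below shows only that indexing step in plain Java. BatchResponseIndex and its nested User are hypothetical stand-ins so that the block compiles without Hystrix on the classpath.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: indexes a batch result by user id. */
final class BatchResponseIndex {

    /** Minimal stand-in for the User model used in this article. */
    static final class User {
        private final Long id;
        private final String name;
        User(Long id, String name) { this.id = id; this.name = name; }
        Long getId() { return id; }
        String getName() { return name; }
    }

    /** Builds an id-to-user lookup; if an id repeats, the later entry wins. */
    static Map<Long, User> indexById(List<User> batchResponse) {
        Map<Long, User> byId = new HashMap<>();
        for (User user : batchResponse) {
            byId.put(user.getId(), user);
        }
        return byId;
    }

    public static void main(String[] args) {
        Map<Long, User> byId = indexById(Arrays.asList(new User(2L, "小紅"), new User(1L, "小明")));
        System.out.println(byId.get(1L).getName());
    }
}
```

Inside mapResponseToRequests, each request could then be answered with collapsedRequest.setResponse(byId.get(collapsedRequest.getArgument())), and a request whose id is missing from the map could be failed with collapsedRequest.setException(...) instead of receiving another user's data.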

  3.5 A controller to help with testing

package org.hope.lee.web;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import org.hope.lee.command.UserCollapseCommand;
import org.hope.lee.model.User;
import org.hope.lee.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.Future;

@RestController
public class CollapseCommandController {

    @Autowired
    private UserService userService;

    @RequestMapping(value = "/collapse", method = RequestMethod.GET)
    public void requestCollapse() {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            Future<User> f1 = new UserCollapseCommand(userService, 1L).queue();
            Future<User> f2 = new UserCollapseCommand(userService, 2L).queue();
            Future<User> f3 = new UserCollapseCommand(userService, 3L).queue();

            Thread.sleep(3000);

            Future<User> f4 = new UserCollapseCommand(userService, 4L).queue();
            Future<User> f5 = new UserCollapseCommand(userService, 5L).queue();

            User u1 = f1.get();
            User u2 = f2.get();
            User u3 = f3.get();

            User u4 = f4.get();
            User u5 = f5.get();
            System.out.println(u1.getName());
            System.out.println(u2.getName());
            System.out.println(u3.getName());
            System.out.println(u4.getName());
            System.out.println(u5.getName());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            context.close();
        }
    }

}

  3.6 The Spring Boot main method

package org.hope.lee;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
@EnableDiscoveryClient
public class ServiceRibbonApplication {

    public static void main(String[] args) {
        SpringApplication.run(ServiceRibbonApplication.class, args);
    }

    @Bean
    RestTemplate restTemplate() {
        return new RestTemplate();
    }

}

  3.7 The configuration file application.properties

eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
server.port=8763
spring.application.name=batch-customer

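Test:

  1. Run eureka-server to start the registry

  2. Run eureka-server-service to start the service provider

  3. Run ribbon-hystrix to start the service consumer

  4. Open http://localhost:8763/collapse in a browser

Result: the output shows that the first 3 requests were merged into one batch request and the last 2 requests were merged into another. The 3-second sleep between the two groups is far longer than the 100 ms collapser window, so they land in separate batches.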

Problems encountered:

  The model entity class must define a no-argument default constructor; otherwise an error is raised.
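  A likely reason for this requirement: RestTemplate normally deserializes the JSON response with Jackson, and Jackson by default creates the target object through its no-argument constructor before filling in the fields. The sketch below uses plain reflection to check whether a class could be instantiated that way. It is an approximation of the idea, not Jackson's actual code path, and the class names are hypothetical.

```java
/** Hypothetical demo: checks for the constructor that default deserialization relies on. */
final class NoArgConstructorDemo {

    /** Has both constructors, like the User model in this article. */
    static class WithDefault {
        WithDefault() {}
        WithDefault(Long id) {}
    }

    /** Declares only a constructor with arguments, so no default one exists. */
    static class WithoutDefault {
        WithoutDefault(Long id) {}
    }

    /** Returns true if the class declares a constructor that takes no arguments. */
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasNoArgConstructor(WithDefault.class));
        System.out.println(hasNoArgConstructor(WithoutDefault.class));
    }
}
```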

 

 Implementing the request collapser with annotations

package org.hope.lee.service;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import org.apache.commons.lang.StringUtils;
import org.hope.lee.model.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;

@Service("peopleService")
public class PeopleServiceImpl implements PeopleService {
    @Autowired
    private RestTemplate restTemplate;

    @HystrixCollapser(batchMethod = "findAll",
            collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds", value = "100")})
    public Future<User> find(Long id) {
        throw new RuntimeException("This method body should not be executed");
    }

    @HystrixCommand
    public List<User> findAll(List<Long> ids) {
        System.out.println("Annotation---------" + ids + "Thread.currentThread().getName():" + Thread.currentThread().getName());
        User[] users = restTemplate.getForObject("http://localhost:8762/users?ids={1}", User[].class, StringUtils.join(ids, ","));
        return Arrays.asList(users);
    }

}

 

 

Code sample: https://gitee.com/huayicompany/Hystrix-learn/tree/master/hystrix-request-collapsing

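References:

[1] Blog post, "Request collapsing with Hystrix in Spring Cloud", https://segmentfault.com/a/1190000011468804

[2] Official wiki, "Request Collapsing", https://github.com/Netflix/Hystrix/wiki/How-To-Use#Collapsing

[3] Zhai Yongchao, 《SpringCloud微服務實戰》 (Spring Cloud Microservices in Action), Publishing House of Electronics Industry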

 

