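I. Overview
The goal is to send POST requests that carry the same parameter value to one specific server (for example, age=25 always goes to service A). POST and PUT requests use the custom strategy; GET and DELETE requests use a round-robin strategy.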
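The "same parameter, same server" idea can be sketched in plain Java as follows. This is a minimal illustration rather than the gateway code itself: the class name StickyChooser and the instance addresses are hypothetical, and it assumes the list of instances does not change between calls.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: map a request key (e.g. the age field) to a fixed instance index. */
final class StickyChooser {

    /** Returns the same index for the same key as long as the instance count is unchanged. */
    static int indexFor(String key, int instanceCount) {
        if (instanceCount <= 0) {
            throw new IllegalArgumentException("no instances available");
        }
        // floorMod keeps the result in [0, instanceCount) even for negative hash codes.
        return Math.floorMod(key.hashCode(), instanceCount);
    }

    public static void main(String[] args) {
        // Hypothetical addresses; only the ports 7200 and 7300 come from this article.
        List<String> instances = Arrays.asList("service-a:7200", "service-b:7300");
        for (String age : Arrays.asList("25", "25", "30")) {
            System.out.println("age=" + age + " -> " + instances.get(indexFor(age, instances.size())));
        }
    }
}
```

Note that the mapping only stays stable while the number of instances stays the same; when an instance is added or removed, keys may move to a different server.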
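II. Custom load-balancing example
Main steps:
a. A custom global filter, CacheBodyGlobalFilter, caches the request body. It has a high priority; otherwise the custom load-balancing strategy cannot read the body of a POST request.
b. The routing rule interface, ICustomRule.
c. The routing rule implementation, CustomChooseRule.
d. The custom load balancer, CustomLoadBalancerFilter. Its priority must be lower than that of CacheBodyGlobalFilter; otherwise the request passes through it first, before the body has been cached.
e. A utility class for parsing the body, FilterRequestResponseUtil.
Routing is based on the age field of People: POST and PUT requests with the same age always go to the same service, while GET and DELETE requests use a custom round-robin strategy that behaves like the framework's.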
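The ordering requirement in steps a and d can be illustrated with a small stand-alone sketch. It assumes only what Spring's Ordered interface defines: a smaller order value runs earlier, HIGHEST_PRECEDENCE is Integer.MIN_VALUE and LOWEST_PRECEDENCE is Integer.MAX_VALUE. The class FilterOrderSketch and the filter named SomeOtherFilter are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Sketch: filters are executed in ascending order of their order value. */
final class FilterOrderSketch {

    static final class NamedFilter {
        final String name;
        final int order;

        NamedFilter(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    /** Returns the filter names sorted so that the smallest order value comes first. */
    static List<String> executionOrder(List<NamedFilter> filters) {
        List<NamedFilter> sorted = new ArrayList<>(filters);
        sorted.sort(Comparator.comparingInt((NamedFilter f) -> f.order));
        List<String> names = new ArrayList<>();
        for (NamedFilter f : sorted) {
            names.add(f.name);
        }
        return names;
    }

    /** Sample filters, deliberately listed in the wrong order. */
    static List<NamedFilter> sampleFilters() {
        return Arrays.asList(
                new NamedFilter("CustomLoadBalancerFilter", Integer.MAX_VALUE), // LOWEST_PRECEDENCE
                new NamedFilter("SomeOtherFilter", 0),                          // hypothetical filter
                new NamedFilter("CacheBodyGlobalFilter", Integer.MIN_VALUE));   // HIGHEST_PRECEDENCE
    }

    public static void main(String[] args) {
        System.out.println(executionOrder(sampleFilters()));
    }
}
```

Because the cache filter sorts first, the body is already cached by the time the load-balancing filter needs it.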
1 Creating cloud-parent, cloud-providea and cloud-provideb
(1) Parent project pom file (may contain unused dependencies) and creation of services A and B

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.dp</groupId>
    <artifactId>cloud-parent</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>pom</packaging>

    <properties>
        <java.version>1.8</java.version>
        <spring-boot.version>2.1.6.RELEASE</spring-boot.version>
        <spring-cloud.version>Greenwich.SR2</spring-cloud.version>
        <nacos.version>0.2.2.RELEASE</nacos.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>cn.miludeer</groupId>
            <artifactId>jsoncode</artifactId>
            <version>1.2.4</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.1.tmp</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus</artifactId>
            <version>3.3.1.tmp</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.20</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${nacos.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <modules>
        <module>cloud-providea</module>
        <module>cloud-getway</module>
        <module>cloud-provideb</module>
    </modules>
</project>
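(2) Directory structure of service A and service B (the shared entity class was not extracted; each service has its own copy, to avoid creating a separate common module)
Service A is used as the example (service B is identical except for the port).
The controller code: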
import org.cloud.provide.entity.People;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/hello")
public class HelloController {

    // POST /hello with a People JSON body
    @PostMapping
    public String helloPost(@RequestBody People people) {
        System.out.println("a==" + people.getName());
        return "Service A hello";
    }

    // PUT /hello/{age}. The original stacked @PutMapping on top of
    // @RequestMapping(method = PUT, value = "{age}"); one annotation is enough.
    @PutMapping("/{age}")
    public String helloPut(@PathVariable int age) {
        return "Service A put" + age;
    }

    // GET /hello?age=...
    @GetMapping
    public String helloGet(String age) {
        return "Service A get" + age;
    }
}
The People entity class:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class People {
    private String name;
    private Integer age;
}
yml configuration file (the database connection is not used, but was not removed):
server:
  port: 7200 # service B uses 7300
spring:
  application:
    name: nacos-provide
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    username: root
    password: 123
    url: jdbc:mysql://localhost:3306/cloud
    driver-class-name: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
mybatis-plus:
  configuration:
    map-underscore-to-camel-case: true
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
  mapper-locations: classpath:mapper/*.xml
Service B is the same, except that the port in its yml file is 7300.
2 Creating the gateway service cloud-getway
(1) The pom and yml files
pom.xml

<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.dp</groupId>
        <artifactId>cloud-parent</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <groupId>com.dp</groupId>
    <artifactId>cloud-getway</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cloud-getway</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <!-- The original declared this starter twice, once with exclusions for
             spring-boot-starter-web and spring-boot-starter-webflux. The gateway
             runs on WebFlux, so only the declaration without exclusions is kept. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
yml

spring:
  application:
    name: getway
  cloud:
    # Spring Cloud Gateway routing configuration
    gateway:
      discovery:
        locator:
          enabled: true
          lower-case-service-id: true # use lower-case service names
      routes:
        - id: nacos-provide # route ID; no fixed rule, but it must be unique; matching the service name is recommended
          uri: lb://nacos-provide # target of matched requests; lb means resolve the service from the registry and forward with load balancing
          predicates:
            - Path=/nacos-provide/** # predicate: route requests whose path matches
    nacos:
      discovery:
        server-addr: localhost:8848
  datasource:
    username: root
    password: 123
    url: jdbc:mysql://localhost:3306/cloud
    driver-class-name: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
mybatis-plus:
  configuration:
    map-underscore-to-camel-case: true
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
  mapper-locations: classpath:mapper/*.xml
server:
  port: 9000 # previously the HTTP default, 8080
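(2) The routing rule interface ICustomRule
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.server.ServerWebExchange;

/*
 * Routing rule
 */
public interface ICustomRule {
    ServiceInstance choose(ServerWebExchange exchange, DiscoveryClient discoveryClient);
}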
(3) The routing rule implementation
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;

import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.web.server.ServerWebExchange;

import com.alibaba.fastjson.JSONObject;

import reactor.core.publisher.Flux;

public class CustomChooseRule implements ICustomRule {

    private final AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public ServiceInstance choose(ServerWebExchange exchange, DiscoveryClient discoveryClient) {
        URI originalUrl = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String instancesId = originalUrl.getHost();
        Flux<DataBuffer> dataBufferFlux = exchange.getRequest().getBody();
        // Read the (cached) body
        String body = FilterRequestResponseUtil.resolveBodyFromRequest(dataBufferFlux);
        // All registered instances of the target service
        List<ServiceInstance> instances = discoveryClient.getInstances(instancesId);
        if (instances == null || instances.isEmpty()) {
            // Nothing to choose from; the caller falls back to the default load balancer
            return null;
        }
        String method = exchange.getRequest().getMethodValue();
        int index;
        // Intercept the nacos-provide service; specific URLs of the service could be matched too
        if ("nacos-provide".equals(instancesId)) {
            // Route by the value of age, e.g. age=25 always reaches the same service.
            // Strings are compared with equals (the original used ==), and floorMod
            // keeps the index non-negative when hashCode() is negative.
            if ("POST".equals(method)) {
                JSONObject json = JSONObject.parseObject(body);
                index = Math.floorMod(json.getString("age").hashCode(), instances.size());
            } else if ("PUT".equals(method)) {
                String str = originalUrl.toString();
                // The age is the last path segment, e.g. /hello/25
                String age = str.substring(str.lastIndexOf("/") + 1);
                index = Math.floorMod(age.hashCode(), instances.size());
            } else {
                // GET and DELETE requests use round robin
                index = this.getAndIncrement() % instances.size();
            }
        } else {
            // Other services use round robin
            index = this.getAndIncrement() % instances.size();
        }
        return instances.get(index);
    }

    /**
     * Computes the current call count, wrapping to 0 at Integer.MAX_VALUE.
     */
    public final int getAndIncrement() {
        int current;
        int next;
        do {
            current = atomicInteger.get();
            next = current >= Integer.MAX_VALUE ? 0 : current + 1;
        } while (!atomicInteger.compareAndSet(current, next));
        return next;
    }
}
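The round-robin part of the rule can be looked at in isolation. The sketch below is a variant of the counter above, not a copy of it: the class name RoundRobinCounter is hypothetical, it uses AtomicInteger.getAndUpdate instead of an explicit compare-and-set loop, and it starts at index 0 rather than 1.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: thread-safe round-robin index that wraps before overflowing. */
final class RoundRobinCounter {

    private final AtomicInteger counter = new AtomicInteger(0);

    /** Returns the next index in [0, instanceCount). */
    int nextIndex(int instanceCount) {
        if (instanceCount <= 0) {
            throw new IllegalArgumentException("no instances available");
        }
        // getAndUpdate returns the previous value and stores the wrapped successor atomically,
        // so the ticket is never negative.
        int ticket = counter.getAndUpdate(c -> c == Integer.MAX_VALUE ? 0 : c + 1);
        return ticket % instanceCount;
    }

    public static void main(String[] args) {
        RoundRobinCounter rr = new RoundRobinCounter();
        for (int i = 0; i < 4; i++) {
            System.out.println("request " + i + " -> instance " + rr.nextIndex(2));
        }
    }
}
```

With two instances, consecutive calls alternate between index 0 and index 1, which matches the behavior expected for GET and DELETE requests.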
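(3) CacheBodyGlobalFilter, a custom global filter that caches the request body; without it, the body cannot be read later
Its order is set to Ordered.HIGHEST_PRECEDENCE, which makes it the highest-priority filter. The reason for such a high priority is that some built-in filters may also read the body, in which case reading the body in our custom filter fails with an error saying that the body can only be read once.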
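The "body can only be read once" problem is not specific to the gateway. As a rough analogy in plain Java (this is not the reactive API; the class ReadOnceSketch is hypothetical), a stream yields nothing on a second read, while bytes cached from the first read can be handed to any number of consumers:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Analogy: a one-shot stream versus a cached copy of its bytes. */
final class ReadOnceSketch {

    /** Drains the stream completely and returns everything that was still unread. */
    static byte[] drain(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[256];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream("{\"age\":25}".getBytes(StandardCharsets.UTF_8));
        byte[] cached = drain(body); // first read consumes the stream
        System.out.println("second raw read: " + drain(body).length + " bytes");
        // Every consumer gets a fresh view over the cached bytes.
        System.out.println(new String(drain(new ByteArrayInputStream(cached)), StandardCharsets.UTF_8));
        System.out.println(new String(drain(new ByteArrayInputStream(cached)), StandardCharsets.UTF_8));
    }
}
```

The filter in this step follows the same idea: it reads the body once, keeps the buffer, and replaces the request with a decorator that returns a fresh view of the cached data for each caller.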
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * @Description Caches the data in the request body
 */
@Slf4j
@Component
public class CacheBodyGlobalFilter implements Ordered, GlobalFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // Requests without a Content-Type header are passed through without caching
        if (exchange.getRequest().getHeaders().getContentType() == null) {
            return chain.filter(exchange);
        } else {
            // Join all body chunks into one buffer and keep it for later reads
            return DataBufferUtils.join(exchange.getRequest().getBody())
                    .flatMap(dataBuffer -> {
                        DataBufferUtils.retain(dataBuffer);
                        // Each subscriber gets a fresh slice over the cached buffer
                        Flux<DataBuffer> cachedFlux = Flux
                                .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
                        ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
                                exchange.getRequest()) {
                            @Override
                            public Flux<DataBuffer> getBody() {
                                return cachedFlux;
                            }
                        };
                        //exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, cachedFlux);
                        return chain.filter(exchange.mutate().request(mutatedRequest).build());
                    });
        }
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
(4) The custom load-balancing strategy
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;

import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.LoadBalancerClientFilter;
import org.springframework.core.Ordered;
import org.springframework.util.CollectionUtils;
import org.springframework.web.server.ServerWebExchange;

/**
 * @Description Custom load balancing
 **/
public class CustomLoadBalancerFilter extends LoadBalancerClientFilter {

    private final DiscoveryClient discoveryClient;
    private final List<ICustomRule> chooseRules;

    public CustomLoadBalancerFilter(LoadBalancerClient loadBalancer, LoadBalancerProperties properties,
            DiscoveryClient discoveryClient) {
        super(loadBalancer, properties);
        this.discoveryClient = discoveryClient;
        this.chooseRules = new ArrayList<>();
        chooseRules.add(new CustomChooseRule());
    }

    @Override
    protected ServiceInstance choose(ServerWebExchange exchange) {
        // Ask each custom rule in turn; the first non-null result wins
        if (!CollectionUtils.isEmpty(chooseRules)) {
            Iterator<ICustomRule> iChooseRuleIterator = chooseRules.iterator();
            while (iChooseRuleIterator.hasNext()) {
                ICustomRule chooseRule = iChooseRuleIterator.next();
                ServiceInstance choose = chooseRule.choose(exchange, discoveryClient);
                if (choose != null) {
                    return choose;
                }
            }
        }
        // Fall back to the default load balancer. The attribute holds a URI;
        // the original cast it to ServiceInstance, which fails at runtime.
        URI requestUrl = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        return loadBalancer.choose(requestUrl.getHost());
    }

    /*
     * Lower the priority: it must be lower than that of CacheBodyGlobalFilter,
     * otherwise the body has not been cached yet.
     */
    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }
}
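The choose method above is a small chain of rules with a fallback. Stripped of the Spring types, the pattern can be sketched as follows; the class RuleChainSketch, the string-based request and the instance names are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Sketch: try each rule in order, use the first non-null answer, else fall back. */
final class RuleChainSketch {

    static String choose(List<Function<String, String>> rules, String request, String fallback) {
        for (Function<String, String> rule : rules) {
            String chosen = rule.apply(request);
            if (chosen != null) {
                return chosen; // first rule with an opinion wins
            }
        }
        return fallback; // no rule matched: default load balancing
    }

    /** One sample rule: POST requests are pinned to a fixed (hypothetical) instance. */
    static List<Function<String, String>> sampleRules() {
        List<Function<String, String>> rules = new ArrayList<>();
        rules.add(request -> request.startsWith("POST") ? "instance-a" : null);
        return rules;
    }

    public static void main(String[] args) {
        System.out.println(choose(sampleRules(), "POST /hello", "default-choice"));
        System.out.println(choose(sampleRules(), "GET /hello", "default-choice"));
    }
}
```

Returning null from a rule is what lets several rules coexist: a rule that does not apply to a request simply declines, and the next rule or the default load balancer takes over.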
(5) The utility class for parsing the body
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.publisher.Flux;

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class FilterRequestResponseUtil {

    /**
     * Reads the body of a POST request in Spring Cloud Gateway.
     * Relies on the body having been cached, so that subscribe() emits synchronously.
     * @param body the request body
     * @return the body as a string, or null if nothing was emitted
     */
    public static String resolveBodyFromRequest(Flux<DataBuffer> body) {
        AtomicReference<String> bodyRef = new AtomicReference<>();
        // Capture the request body as it is read
        body.subscribe(dataBuffer -> {
            CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
            DataBufferUtils.release(dataBuffer);
            bodyRef.set(charBuffer.toString());
        });
        // Return the request body
        return bodyRef.get();
    }

    /**
     * Reads the body content and strips whitespace from it.
     * @param body the request body
     * @return the body as a string without whitespace
     */
    public static String resolveBodyFromRequest2(Flux<DataBuffer> body) {
        StringBuilder sb = new StringBuilder();
        body.subscribe(buffer -> {
            byte[] bytes = new byte[buffer.readableByteCount()];
            buffer.read(bytes);
            DataBufferUtils.release(buffer);
            String bodyString = new String(bytes, StandardCharsets.UTF_8);
            sb.append(bodyString);
        });
        return formatStr(sb.toString());
    }

    /**
     * Removes spaces, line breaks and tabs.
     * @param str the input string
     * @return the string without whitespace
     */
    private static String formatStr(String str) {
        if (str != null && str.length() > 0) {
            Pattern p = Pattern.compile("\\s*|\t|\r|\n");
            Matcher m = p.matcher(str);
            return m.replaceAll("");
        }
        return str;
    }
}
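The whitespace stripping done by formatStr can be tried on its own. The sketch below uses the simpler pattern \s+ (an assumption; it should remove the same characters as the pattern in the utility class), and the class name WhitespaceSketch is hypothetical.

```java
import java.util.regex.Pattern;

/** Sketch: remove all whitespace from a JSON body string. */
final class WhitespaceSketch {

    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    static String strip(String s) {
        return s == null ? null : WHITESPACE.matcher(s).replaceAll("");
    }

    public static void main(String[] args) {
        System.out.println(strip("{ \"age\" : 25 }\n"));        // prints {"age":25}
        System.out.println(strip("{\"name\": \"Li Lei\"}"));    // prints {"name":"LiLei"}
    }
}
```

The second line of output shows the catch: whitespace inside string values is removed as well, so this kind of cleanup is only safe when the routed field (here age) cannot contain spaces.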
(6) Registering the custom class in the Spring container at startup
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.LoadBalancerClientFilter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class GetawayConfig {

    // Replaces the default LoadBalancerClientFilter with the custom one
    @Bean
    public LoadBalancerClientFilter loadBalancerClientFilter(LoadBalancerClient client,
            LoadBalancerProperties properties, DiscoveryClient discoveryClient) {
        return new CustomLoadBalancerFilter(client, properties, discoveryClient);
    }
}
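3 Simple test
(1) POST request (with the same age, every request goes to service A no matter how many times it is sent; PUT requests behave the same as POST):
(2) GET request (round-robin strategy; each request goes to a different service):
1 First request
2 Second request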
Source code: https://github.com/kangchangchang/gateway.git