自定义Gateway负载均衡(根据请求参数路由到不同服务器)


一 概述

   想让post请求中相同参数走特定的服务器(例如age=25只能走服务a).post\get请求走自定义策略,get\delete请求走轮训策略.

 

二 自定义负载均衡实例

   主要步骤:

      a 自定义全局过滤器CacheBodyGlobalFilter,把body中的数据缓存起来,此过滤器优先级较高,负责自定义负载均衡策略时拿不到post请求中body的数据

      b 路由规则接口 ICustomRule

      c 路由规则实现类CustomChooseRule

      d 自定义负载均衡CustomLoadBalancerFilter此优先级要比CacheBodyGlobalFilter低,负责先经过它,body数据没缓存

      e 解析body的工具类 FilterRequestResponseUtil

   以People中的age内容路由(对psot put请求相同age只能通过相同服务,get\delete请求走自定义轮训策略和框架一样)

  1 cloud-parent  cloud-providea  cloud-provideb的创建

          

      (1)父工程pom文件(可能包含无用依赖) \及服务A B的创建

        

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.dp</groupId>
  <artifactId>cloud-parent</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>pom</packaging>


    <properties>
        <java.version>1.8</java.version>
        <spring-boot.version>2.1.6.RELEASE</spring-boot.version>
        <spring-cloud.version>Greenwich.SR2</spring-cloud.version>
        <nacos.version>0.2.2.RELEASE</nacos.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>



        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
     <dependency>
       <groupId>cn.miludeer</groupId>
       <artifactId>jsoncode</artifactId>
       <version>1.2.4</version>
    </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
         <dependency>
              <groupId>com.baomidou</groupId>
               <artifactId>mybatis-plus-boot-starter</artifactId>
                <version>3.3.1.tmp</version>
        </dependency>
        <dependency>
                <groupId>com.baomidou</groupId>
                <artifactId>mybatis-plus</artifactId>
                <version>3.3.1.tmp</version>
        </dependency>
         <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

<dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.20</version>
 </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
         </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${nacos.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>


    <modules>
        <module>cloud-providea</module>
    <module>cloud-getway</module>
    <module>cloud-provideb</module>
    </modules>
</project>
View Code

      (2)服务A服务B目录结构(共用实体类没提出来,都单独创建了,懒得再创建common服务)

         以创建服务A为列(服务B和A一样,只不过端口不一样)

        

       Controller层代码如下:

          

import org.cloud.provide.entity.People;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/hello")
public class HelloController {

    
    @PostMapping
    public String   helloPost(@RequestBody People people) {                
        System.out.println("a=="+people.getName());
        return  "服务A  hello";
                
    }
    
    
    @PutMapping
    @RequestMapping(method=RequestMethod.PUT,value = "{age}")
    public String helloPut(@PathVariable int age) {        
        return "服务A  put"+age;
    }
    
    @GetMapping
    public String  helloGet(String age) {        
        return "服务A get"+age;
    }
}

        实体类Peopler如下:

        

@Data
@AllArgsConstructor
@NoArgsConstructor
public class People {
    
    
    private String name;
    private Integer age;
}

      yml配置文件(数据库连接用不到没删除掉)

      

server:
  port: 7200 #服务B是7300
spring:
  application:
    name: nacos-provide
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848


  datasource:
    username: root
    password: 123
    url: jdbc:mysql://localhost:3306/cloud
    driver-class-name: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource

mybatis-plus: 
    configuration : 
      map-underscore-to-camel-case : true
      log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    mapper-locations: classpath:mapper/*.xml

    服务B同上,只不过yml配置文件端口是7300

  

        2 网关服务cloud-gateway的创建

      (1) pom yml文件如下

   pom.xml     

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.dp</groupId>
    <artifactId>cloud-parent</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <groupId>com.dp</groupId>
  <artifactId>cloud-getway</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>cloud-getway</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-web</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-webflux</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

  </dependencies>
  
  
  <build>
      <plugins>
       <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>   
      
      </plugins>
  </build>
</project>
View Code

 

   yml

    

spring:
  application:
    name: getway
  cloud: #配置SpringCloudGateway的路由
    gateway:
      discovery:
        locator:
          enabled: true
          lower-case-service-id: true # 服务名小写
      routes:
        - id: nacos-provide  #路由的ID,没有固定规则但求唯一,建议配合服务名
          uri: lb://nacos-provide # 匹配后提供服务的路由地址,lb代表从注册中心获取服务,且以负载均衡方式转发
          predicates:
            - Path=/nacos-provide/** #断言,路径相匹配的进行路由
    nacos:
       discovery:
        server-addr: localhost:8848
  datasource:
    username: root
    password: 123
    url: jdbc:mysql://localhost:3306/cloud
    driver-class-name: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
mybatis-plus: 
    configuration : 
      map-underscore-to-camel-case : true
      log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    mapper-locations: classpath:mapper/*.xml 
    
    
    
server:
  port: 9000 #原来是http的8080
View Code

 

     

 (2)路由规则接口ICustomRule

   

/*
 * 路由规则
 */
public interface ICustomRule {

    
    ServiceInstance choose(ServerWebExchange exchange, DiscoveryClient discoveryClient);

}

      (3)路由规则实现类

      

import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.web.server.ServerWebExchange;
import com.alibaba.fastjson.JSONObject;
import reactor.core.publisher.Flux;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;

public class CustomChooseRule    implements ICustomRule {
    
    private AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public ServiceInstance choose(ServerWebExchange exchange, DiscoveryClient discoveryClient) {

          URI originalUrl = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
          String instancesId = originalUrl.getHost();                     
          Flux<DataBuffer> dataBufferFlux = exchange.getRequest().getBody();
          //获取body中的数据
          String body = FilterRequestResponseUtil.resolveBodyFromRequest(dataBufferFlux);
          //所有服务数据
          List<ServiceInstance> instances = discoveryClient.getInstances(instancesId);
          int index=0;
          
          //拦截nacos-provide服务也可拦截服务中特定url
          if(instancesId.equals("nacos-provide")){
             /*
              * 根据age中的数据访问特定的服务,例如age=25 只能访问服务a
              */
              if("POST"==exchange.getRequest().getMethodValue()) {
                  JSONObject json = JSONObject.parseObject(new String(body));
                  index =json.getString("age").hashCode() % instances.size();
              }else if("PUT"==exchange.getRequest().getMethodValue()) {
                  String str=originalUrl.toString();
                  String json=str.substring(str.lastIndexOf("/")+1, str.length());
                  index=json.hashCode()% instances.size();
              }else {
                  /*
                   * GET Delete请求采用轮训算法
                   */
                   index = this.getAndIncrement() % instances.size();
              }

         }else {
           /*
            * 别的服务采用轮训
            */
             index = this.getAndIncrement() % instances.size();
         }

        return instances.get(index);
    }


    
    /**
     * 计算得到当前调用次数
     * @return
     */
    public final int getAndIncrement(){
        int current;
        int next;

        do {
            current = atomicInteger.get();
            next = current >= Integer.MAX_VALUE ? 0 : current+1;
        }while (!atomicInteger.compareAndSet(current,next));   

        return next;
    }
}

     (3)CacheBodyGlobalFilter自定义全局过滤器把body中的数据缓存起来,负责拿不动body中的数据

      order设置的是Ordered.HIGHEST_PRECEDENCE,即最高优先级的过滤器。优先级设置这么高的原因是某些系统内置的过滤器可能也会去读body,这样就会导致我们自定义过滤器中获取body的时候报body只能读取一次这样的错误

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * @Description 把body中的数据缓存起来
 */
@Slf4j
@Component
public class CacheBodyGlobalFilter implements Ordered, GlobalFilter {


  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    if (exchange.getRequest().getHeaders().getContentType() == null) {
      return chain.filter(exchange);
    } else {
      return DataBufferUtils.join(exchange.getRequest().getBody())
          .flatMap(dataBuffer -> {
            DataBufferUtils.retain(dataBuffer);
            Flux<DataBuffer> cachedFlux = Flux
                .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
            ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
                exchange.getRequest()) {
              @Override
              public Flux<DataBuffer> getBody() {
                return cachedFlux;
              }
            };
            //exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, cachedFlux);

            return chain.filter(exchange.mutate().request(mutatedRequest).build());
          });
    }
  }

  @Override
  public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE;
  }

}

 

      (4)自定义负责均衡策略

      

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.LoadBalancerClientFilter;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR;
/**
 * @Description 自定义负载均衡
 **/
public class CustomLoadBalancerFilter extends LoadBalancerClientFilter   {

    private final DiscoveryClient discoveryClient;

    private final List<ICustomRule> chooseRules;

    public CustomLoadBalancerFilter(LoadBalancerClient loadBalancer,
                                          LoadBalancerProperties properties,
                                          DiscoveryClient discoveryClient) {
        super(loadBalancer, properties);
        this.discoveryClient = discoveryClient;
        this.chooseRules = new ArrayList<>();
        chooseRules.add(new CustomChooseRule());
    }

    protected ServiceInstance choose(ServerWebExchange exchange) {
        if(!CollectionUtils.isEmpty(chooseRules)){
            Iterator<ICustomRule> iChooseRuleIterator = chooseRules.iterator();
            while (iChooseRuleIterator.hasNext()){
                ICustomRule chooseRule = iChooseRuleIterator.next();
                ServiceInstance choose = chooseRule.choose(exchange,discoveryClient);
                if(choose != null){
                    return choose;
                }
            }
        }
        return loadBalancer.choose(
                 ((ServiceInstance) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost());
    }
    
    /*
     * 降低优先级/要比CacheBodyGlobalFilter优先级低,要不然缓存不了body数据
     */
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
      }
    
}

      (5)解析body数据的工具类

      

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.publisher.Flux;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class FilterRequestResponseUtil {

  /**
   * spring cloud gateway 获取post请求的body体
   * @param body
   * @return
   */
  public static String resolveBodyFromRequest( Flux<DataBuffer> body){
    AtomicReference<String> bodyRef = new AtomicReference<>();
    // 缓存读取的request body信息
    body.subscribe(dataBuffer -> {
      CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
      DataBufferUtils.release(dataBuffer);
      bodyRef.set(charBuffer.toString());
    });
    //获取request body
    return bodyRef.get();

  }

  /**
   * 读取body内容
   * @param body
   * @return
   */
  public static String resolveBodyFromRequest2( Flux<DataBuffer> body){
    StringBuilder sb = new StringBuilder();

    body.subscribe(buffer -> {
      byte[] bytes = new byte[buffer.readableByteCount()];
      buffer.read(bytes);
      DataBufferUtils.release(buffer);
      String bodyString = new String(bytes, StandardCharsets.UTF_8);
      sb.append(bodyString);
    });
    return formatStr(sb.toString());
  }

  /**
   * 去掉空格,换行和制表符
   * @param str
   * @return
   */
  private static String formatStr(String str){
    if (str != null && str.length() > 0) {
      Pattern p = Pattern.compile("\\s*|\t|\r|\n");
      Matcher m = p.matcher(str);
      return m.replaceAll("");
    }
    return str;
  }
}

    (6)把自定义类启动时加载到spring容器

    

import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.LoadBalancerClientFilter;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpRequest;
import org.springframework.http.HttpStatus;

import reactor.core.publisher.Mono;

@Configuration
public class GetawayConfig {
    
        @Bean
        public LoadBalancerClientFilter loadBalancerClientFilter(LoadBalancerClient client,
                                                                 LoadBalancerProperties properties,
                                                                DiscoveryClient discoveryClient) {
                return new CustomLoadBalancerFilter(client, properties,discoveryClient);
        }

}

 

     yml文件:

      

spring:
  application:
    name: getway
  cloud: #配置SpringCloudGateway的路由
    gateway:
      discovery:
        locator:
          enabled: true
          lower-case-service-id: true # 服务名小写
      routes:
        - id: nacos-provide  #路由的ID,没有固定规则但求唯一,建议配合服务名
          uri: lb://nacos-provide # 匹配后提供服务的路由地址,lb代表从注册中心获取服务,且以负载均衡方式转发
          predicates:
            - Path=/nacos-provide/** #断言,路径相匹配的进行路由
    nacos:
       discovery:
        server-addr: localhost:8848
  datasource:
    username: root
    password: 123
    url: jdbc:mysql://localhost:3306/cloud
    driver-class-name: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
mybatis-plus: 
    configuration : 
      map-underscore-to-camel-case : true
      log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    mapper-locations: classpath:mapper/*.xml 
    
    
    
server:
  port: 9000 #原来是http的8080
View Code

    

3 简单测试 

  (1)post请求如下(age相同请求多少次都是走服务A put请求同post请求):

          

        (2)  get请求测试如下(走轮训策略,每次请求服务不一样)

      1第一次请求

        

      2 第二次请求

        

 

 

代码路径:  https://github.com/kangchangchang/gateway.git

    


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM