本文涉及到的項目使用的版本如下:
Spring Boot:2.0.6.RELEASE
Spring Cloud:Finchley.SR2
背景:
微服務架構,在網關服務裡攔截每個請求,進行日誌信息記錄與管理,發現當請求體過長時,只能獲取到一部分body。查看攔截過濾器,發現Spring Cloud Gateway是基於reactor-core.jar進行請求數據的操作,獲取body內容時,用到了reactor-core.jar的Flux<DataBuffer>,即一個包含0-N個DataBuffer類型元素的異步序列。
之前嘗試了網上多種寫法,不管是使用subscribe還是block,都無效
subscribe是異步的:訂閱之後立刻去讀取結果時,往往只收到了最先發出的那一部分DataBuffer,後續的元素還沒有到達,所以會導致獲取不全,不管使用AtomicReference<String>還是StringBuilder來包裝獲取到的字符串,都無效。
翻看Spring Cloud Gateway包,會發現有個官方自帶的修改請求體內容的過濾器工廠類:ModifyRequestBodyGatewayFilterFactory(對應的還有修改輸出的body的過濾器工廠類)
不過因為要結合我們自己的業務邏輯,這個類我們無法直接使用,但是可以參照它自己定義一個類似的過濾器。
正確寫法:
package cn.miao.gateway.filter; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.gateway.filter.GatewayFilter; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.support.BodyInserterContext; import org.springframework.cloud.gateway.support.CachedBodyOutputMessage; import org.springframework.cloud.gateway.support.DefaultServerRequest; import org.springframework.core.Ordered; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.BodyInserter; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Slf4j @Component public class RequestBodyOperationFilter implements GatewayFilter, Ordered { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); if (request.getMethod() != HttpMethod.POST) { return chain.filter(exchange); } return operationExchange(exchange, chain); } private Mono<Void> operationExchange(ServerWebExchange exchange, GatewayFilterChain chain) { // mediaType MediaType mediaType = exchange.getRequest().getHeaders().getContentType(); // read & modify body ServerRequest serverRequest = new DefaultServerRequest(exchange); Mono<String> modifiedBody = serverRequest.bodyToMono(String.class) .flatMap(body -> { if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { // 對原先的body進行修改操作 String newBody = 
"{\"testName\":\"testValue\"}"; return Mono.just(newBody); } return Mono.empty(); }); BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class); HttpHeaders headers = new HttpHeaders(); headers.putAll(exchange.getRequest().getHeaders()); headers.remove(HttpHeaders.CONTENT_LENGTH); CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers); return bodyInserter.insert(outputMessage, new BodyInserterContext()) .then(Mono.defer(() -> { ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator( exchange.getRequest()) { @Override public HttpHeaders getHeaders() { long contentLength = headers.getContentLength(); HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.putAll(super.getHeaders()); if (contentLength > 0) { httpHeaders.setContentLength(contentLength); } else { httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked"); } return httpHeaders; } @Override public Flux<DataBuffer> getBody() { return outputMessage.getBody(); } }; return chain.filter(exchange.mutate().request(decorator).build()); })); } @Override public int getOrder() { return -1; } }
原先body會被截斷的寫法A:
package cn.miao.gateway.filter;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * Variant A of the body-rewriting filter, kept as the example of an approach that
 * truncates long bodies.
 *
 * <p>WARNING: {@code getBody()} below transforms the body with {@code Flux.map}, so every
 * {@link DataBuffer} emitted upstream is decoded and parsed as if it were the complete
 * JSON document. That per-buffer handling is intentionally left in place because this
 * class exists to illustrate the problem; do not copy it into production code.
 */
@Slf4j
@Component
public class RequestBodyOperationFilter implements GatewayFilter, Ordered {

    // Injected from the "unified.request.sign.flag" property; not read anywhere in this class.
    @Value("${unified.request.sign.flag}")
    private boolean signFlag;

    /**
     * Forwards non-POST requests untouched; POST requests are forwarded with a decorated
     * request whose body and headers are rewritten.
     *
     * @param exchange the current server exchange
     * @param chain    the remaining filter chain
     * @return a {@code Mono} that completes when the downstream chain completes
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        // Only intercept POST requests.
        if (request.getMethod() != HttpMethod.POST) {
            return chain.filter(exchange);
        }
        // Rewrite the body through a request decorator.
        ServerHttpRequestDecorator serverHttpRequestDecorator = requestDecorator(exchange);
        return chain.filter(exchange.mutate().request(serverHttpRequestDecorator).build());
    }

    /**
     * Builds a decorator around the exchange's request that rewrites each body buffer and
     * replaces Content-Length with chunked transfer encoding.
     *
     * @param exchange the current server exchange
     * @return the decorated request
     */
    private ServerHttpRequestDecorator requestDecorator(ServerWebExchange exchange) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {

            @Override
            public Flux<DataBuffer> getBody() {
                // One factory is enough for all buffers produced by this body.
                NettyDataBufferFactory bufferFactory =
                        new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
                Flux<DataBuffer> body = super.getBody();
                return body.map(dataBuffer -> {
                    byte[] content = new byte[dataBuffer.readableByteCount()];
                    dataBuffer.read(content);
                    // Release the consumed buffer.
                    DataBufferUtils.release(dataBuffer);

                    // JSON text carried by THIS buffer only (see class-level warning).
                    String bodyJson = new String(content, StandardCharsets.UTF_8);
                    JSONObject jsonObject = JSON.parseObject(bodyJson);

                    // Modify the original body.
                    jsonObject.put("testName", "testValue");
                    String result = jsonObject.toJSONString();

                    // Encode with the same charset used for decoding, rather than the
                    // platform default.
                    byte[] bytes = result.getBytes(StandardCharsets.UTF_8);
                    DataBuffer buffer = bufferFactory.allocateBuffer(bytes.length);
                    buffer.write(bytes);
                    return buffer;
                });
            }

            // Override getHeaders so the headers match the rewritten body.
            @Override
            public HttpHeaders getHeaders() {
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(super.getHeaders());
                // The body was modified, so its length is no longer known; drop the
                // original Content-Length and switch to chunked transfer encoding.
                httpHeaders.remove(HttpHeaders.CONTENT_LENGTH);
                httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                return httpHeaders;
            }
        };
    }

    @Override
    public int getOrder() {
        return -1;
    }
}
原先body會被截斷的寫法B:
package cn.miao.gateway.filter;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.nio.charset.StandardCharsets;

/**
 * Variant B of the body-rewriting filter, kept as the example of an approach that
 * loses part of the body.
 *
 * <p>WARNING: the collected text is read immediately after {@code subscribe(...)} is
 * called, without waiting for the body {@code Flux} to complete. This class exists to
 * illustrate that problem; do not copy it into production code.
 */
@Slf4j
@Component
public class RequestBodyOperationFilter implements GatewayFilter, Ordered {

    /**
     * Forwards non-POST requests untouched; POST requests are forwarded on the exchange
     * produced by {@link #requestExchange(ServerWebExchange)}.
     *
     * @param exchange the current server exchange
     * @param chain    the remaining filter chain
     * @return a {@code Mono} that completes when the downstream chain completes
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        boolean isPost = exchange.getRequest().getMethod() == HttpMethod.POST;
        ServerWebExchange target = isPost ? requestExchange(exchange) : exchange;
        return chain.filter(target);
    }

    /**
     * Builds a new exchange whose request carries a rewritten JSON body.
     *
     * @param exchange the current server exchange
     * @return an exchange wrapping the decorated request
     */
    private ServerWebExchange requestExchange(ServerWebExchange exchange) {
        ServerHttpRequest original = exchange.getRequest();
        URI encodedUri = UriComponentsBuilder.fromUri(original.getURI()).build(true).toUri();
        ServerHttpRequest rebuilt = original.mutate().uri(encodedUri).build();

        // Collect the body text. The builder is read right after subscribing, so it only
        // holds whatever has been delivered by then (see class-level warning).
        StringBuilder collected = new StringBuilder();
        original.getBody().subscribe(chunk -> collected.append(drain(chunk)));

        // Modify the original body.
        JSONObject json = JSON.parseObject(collected.toString());
        json.put("testName", "testValue");

        // Encode the result and wrap it in a single-buffer body.
        byte[] payload = json.toJSONString().getBytes(StandardCharsets.UTF_8);
        DataBuffer replacement =
                new NettyDataBufferFactory(ByteBufAllocator.DEFAULT).allocateBuffer(payload.length);
        replacement.write(payload);
        Flux<DataBuffer> replacementBody = Flux.just(replacement);

        ServerHttpRequest decorated = new ServerHttpRequestDecorator(rebuilt) {
            @Override
            public Flux<DataBuffer> getBody() {
                return replacementBody;
            }
        };
        return exchange.mutate().request(decorated).build();
    }

    /**
     * Reads every readable byte of the buffer, releases it, and decodes the bytes as UTF-8.
     *
     * @param chunk one buffer of the request body
     * @return the decoded text of that buffer
     */
    private static String drain(DataBuffer chunk) {
        byte[] raw = new byte[chunk.readableByteCount()];
        chunk.read(raw);
        DataBufferUtils.release(chunk);
        return new String(raw, StandardCharsets.UTF_8);
    }

    @Override
    public int getOrder() {
        return -1;
    }
}