[spring cloud] [error] java.lang.IllegalStateException: Only one connection receive subscriber allowed.


前言

最近在開發api-gateway的時候遇到了一個問題,網上能夠找到的解決方案也很少,之后由公司的大佬解決了這個問題。寫下這篇文章記錄一下解決方案。希望可以幫助到更多的人。

 

環境

  • java版本:8
  • 框架:spring-cloud 2.0.0.RC1

 

介紹

api-gateway主要接收前端請求,然后對請求的數據進行驗證,驗證之后請求反向代理到服務器。當請求 method 為 GET 時,可以順利通過api-gateway。當請求 method 為 POST 時,api-gateway則會報如下錯誤:

2018-07-18 11:49:04.073 ERROR 3025 --- [ctor-http-nio-4] .a.w.r.e.DefaultErrorWebExceptionHandler : Failed to handle request [POST http://localhost:9000/api/hello]

java.lang.IllegalStateException: Only one connection receive subscriber allowed.
    at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:276) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
    at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:127) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.22.Final.jar:4.1.22.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.22.Final.jar:4.1.22.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_171]

 

錯誤分析

實際上spring-cloud-gateway反向代理的原理是,首先讀取原請求的數據,然后構造一個新的請求,將原請求的數據封裝到新的請求中,然后再轉發出去。然而我們在他封裝之前讀取了一次request body,而request body只能讀取一次。因此就出現了上面的錯誤。

 

解決方案

對於上面的錯誤我們給出的解決方案是:

讀取request body的時候,我們再封裝一次request,轉發出去

 

下面是我們的代碼:

@Component
public class PostFilter extends AbstractNameValueGatewayFilterFactory {


    private static final String X_APP_ID_HEADER = "X-app-id";
    public static final String X_FORWARDED_FOR = "X-Forwarded-For";

    @Override
    public GatewayFilter apply(NameValueConfig nameValueConfig) {
        return (exchange, chain) -> {
            URI uri = exchange.getRequest().getURI();
            URI ex = UriComponentsBuilder.fromUri(uri).build(true).toUri();
            ServerHttpRequest request = exchange.getRequest().mutate().uri(ex).build();
            if("POST".equalsIgnoreCase(request.getMethodValue())){//判斷是否為POST請求
                Flux<DataBuffer> body = request.getBody();
                AtomicReference<String> bodyRef = new AtomicReference<>();//緩存讀取的request body信息
                body.subscribe(dataBuffer -> {
                    CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
                    DataBufferUtils.release(dataBuffer);
                    bodyRef.set(charBuffer.toString());
                });//讀取request body到緩存
                String bodyStr = bodyRef.get();//獲取request body
                System.out.println(bodyStr);//這里是我們需要做的操作
                DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
                Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);

                request = new ServerHttpRequestDecorator(request){
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return bodyFlux;
                    }
                };//封裝我們的request
            }
            return chain.filter(exchange.mutate().request(request).build());
        };
    }

    protected DataBuffer stringBuffer(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);

        NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
        DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
        buffer.write(bytes);
        return buffer;
    }
}

至此,該問題得到解決。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM