Netflix網關zuul(1.x和2.x)全解析


zuul 是netflix開源的一個API Gateway 服務器, 本質上是一個web servlet應用。

Zuul可以通過加載動態過濾機制,從而實現以下各項功能:

  • 驗證與安全保障: 識別面向各類資源的驗證要求並拒絕那些與要求不符的請求。
  • 審查與監控: 在邊緣位置追蹤有意義數據及統計結果,從而為我們帶來准確的生產狀態結論。
  • 動態路由: 以動態方式根據需要將請求路由至不同后端集群處。
  • 壓力測試: 逐漸增加指向集群的負載流量,從而計算性能水平。
  • 負載分配: 為每一種負載類型分配對應容量,並棄用超出限定值的請求。
  • 靜態響應處理: 在邊緣位置直接建立部分響應,從而避免其流入內部集群。
  • 多區域彈性: 跨越AWS區域進行請求路由,旨在實現ELB使用多樣化並保證邊緣位置與使用者盡可能接近。

網關zuul從1.0到2.0 經歷了較大的變化,先從架構上看看吧

 zuul 1.0的架構

 

從上圖看,

1.ZuulServlet負責接收請求,對filter進行處理

/**
 * Core Zuul servlet which intializes and orchestrates zuulFilter execution
 *
 * @author Mikey Cohen
 *         Date: 12/23/11
 *         Time: 10:44 AM
 */
 @Override
    public void service(javax.servlet.ServletRequest servletRequest, javax.servlet.ServletResponse servletResponse) throws ServletException, IOException {
        try {
            init((HttpServletRequest) servletRequest, (HttpServletResponse) servletResponse);

            // Marks this request as having passed through the "Zuul engine", as opposed to servlets
            // explicitly bound in web.xml, for which requests will not have the same data attached
            RequestContext context = RequestContext.getCurrentContext();
            context.setZuulEngineRan();

            try {
                preRoute();
            } catch (ZuulException e) {
                error(e);
                postRoute();
                return;
            }
            try {
                route();
            } catch (ZuulException e) {
                error(e);
                postRoute();
                return;
            }
            try {
                postRoute();
            } catch (ZuulException e) {
                error(e);
                return;
            }

        } catch (Throwable e) {
            error(new ZuulException(e, 500, "UNHANDLED_EXCEPTION_" + e.getClass().getName()));
        } finally {
            RequestContext.getCurrentContext().unset();
        }
    }

其中

FilterProcessor處理核心類

前置filter

runFilters("pre"); //前置filter類型

跳轉filter

runFilters("route");

后置filter

runFilters("post");

2. zuul的核心是一系列的filters, 其作用可以類比Servlet框架的Filter,或者AOP。工作原理如下圖所示

Zuul可以對Groovy過濾器進行動態的加載,編譯,運行。FilterFileManager.java

/**
 * This class manages the directory polling for changes and new Groovy filters.
 * Polling interval and directories are specified in the initialization of the class, and a poller will check
 * for changes and additions.
 *
 * @author Mikey Cohen
 *         Date: 12/7/11
 *         Time: 12:09 PM
 */
    void processGroovyFiles(List<File> aFiles) throws Exception {

        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (File file : aFiles) {
            tasks.add(() -> {
                try {
                    return filterLoader.putFilter(file);
                }
                catch(Exception e) {
                    LOG.error("Error loading groovy filter from disk! file = " + String.valueOf(file), e);
                    return false;
                }
            });
        }
        processFilesService.invokeAll(tasks, FILE_PROCESSOR_TASKS_TIMEOUT_SECS.get(), TimeUnit.SECONDS);
    }

 3.對groovy文件的動態操作管理類FilterScriptManagerServlet

/**
 * Servlet for uploading/downloading/managing scripts.
 * <p/>
 * <ul>
 * <li>Upload scripts to the registry for a given endpoint.</li>
 * <li>Download scripts from the registry</li>
 * <li>List all revisions of scripts for a given endpoint.</li>
 * <li>Mark a particular script revision as active for production.</li>
 * </ul>
 */
   @Override
    protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {

        if (!adminEnabled.get()) {
            response.sendError(HttpServletResponse.SC_FORBIDDEN, "Filter admin is disabled. See the zuul.filters.admin.enabled FastProperty.");
            return;
        }

        // retrieve arguments and validate
        String action = request.getParameter("action");
        /* validate the action and method */
        if (!isValidAction(request, response)) {
            return;
        }

        // perform action
        if ("UPLOAD".equals(action)) {
            handleUploadAction(request, response);
        } else if ("ACTIVATE".equals(action)) {
            handleActivateAction(request, response);
        } else if ("CANARY".equals(action)) {
            handleCanaryAction(request, response);
        } else if ("DEACTIVATE".equals(action)) {
            handledeActivateAction(request, response);
        }

    }

zuul 2.0架構

 

從上圖可以看到:

1.Zuul引入了Netty和RxJava,正如之前的 ZuulFilter 分為了 Pre,Post,Route,Error,Zuul2的Filter分為三種類型

  • Inbound Filters: 在路由之前執行
  • Endpoint Filters: 路由操作
  • Outbound Filters: 得到相應數據之后執行

使用RxJava重寫了Pre,Post,Route ZuulFilter的結構如下

 

ZuulServerChannelInitializer.java

    @Override
    protected void initChannel(Channel ch) throws Exception
    {
        // Configure our pipeline of ChannelHandlerS.
        ChannelPipeline pipeline = ch.pipeline();

        storeChannel(ch);
        addTimeoutHandlers(pipeline);
        addPassportHandler(pipeline);
        addTcpRelatedHandlers(pipeline);
        addHttp1Handlers(pipeline);
        addHttpRelatedHandlers(pipeline);
        addZuulHandlers(pipeline);
    }

其父類實現了addZuulHandlers方法

   protected void addZuulHandlers(final ChannelPipeline pipeline)
    {
        pipeline.addLast("logger", nettyLogger);
        pipeline.addLast(new ClientRequestReceiver(sessionContextDecorator));
        pipeline.addLast(passportLoggingHandler);
        addZuulFilterChainHandler(pipeline);
        pipeline.addLast(new ClientResponseWriter(requestCompleteHandler, registry));
    }

    protected void addZuulFilterChainHandler(final ChannelPipeline pipeline) {
        final ZuulFilter<HttpResponseMessage, HttpResponseMessage>[] responseFilters = getFilters(   //1
                new OutboundPassportStampingFilter(FILTERS_OUTBOUND_START),
                new OutboundPassportStampingFilter(FILTERS_OUTBOUND_END));

        // response filter chain
        final ZuulFilterChainRunner<HttpResponseMessage> responseFilterChain = getFilterChainRunner(responseFilters,
                filterUsageNotifier);

        // endpoint | response filter chain
        final FilterRunner<HttpRequestMessage, HttpResponseMessage> endPoint = getEndpointRunner(responseFilterChain,  //2
                filterUsageNotifier, filterLoader);

        final ZuulFilter<HttpRequestMessage, HttpRequestMessage>[] requestFilters = getFilters(                   //3
                new InboundPassportStampingFilter(FILTERS_INBOUND_START),
                new InboundPassportStampingFilter(FILTERS_INBOUND_END));

        // request filter chain | end point | response filter chain
        final ZuulFilterChainRunner<HttpRequestMessage> requestFilterChain = getFilterChainRunner(requestFilters,
                filterUsageNotifier, endPoint);

        pipeline.addLast(new ZuulFilterChainHandler(requestFilterChain, responseFilterChain));
    }

調用Handler處理

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpRequestMessage) {
            zuulRequest = (HttpRequestMessage)msg;

            //Replace NETTY_SERVER_CHANNEL_HANDLER_CONTEXT in SessionContext
            final SessionContext zuulCtx = zuulRequest.getContext();
            zuulCtx.put(NETTY_SERVER_CHANNEL_HANDLER_CONTEXT, ctx);
            zuulCtx.put(ZUUL_FILTER_CHAIN, requestFilterChain);

            requestFilterChain.filter(zuulRequest);
        }
        else if ((msg instanceof HttpContent)&&(zuulRequest != null)) {
            requestFilterChain.filter(zuulRequest, (HttpContent) msg);
        }
        else {
            LOG.debug("Received unrecognized message type. " + msg.getClass().getName());
            ReferenceCountUtil.release(msg);
        }
    }

調用ZuulFilterChainRunner的filter方法

 @Override
    public void filter(T inMesg, HttpContent chunk) {
        String filterName = "-";
        try {
            Preconditions.checkNotNull(inMesg, "input message");

            final AtomicInteger runningFilterIdx = getRunningFilterIndex(inMesg);
            final int limit = runningFilterIdx.get();
            for (int i = 0; i < limit; i++) {
                final ZuulFilter<T, T> filter = filters[i];
                filterName = filter.filterName();
                if ((! filter.isDisabled()) && (! shouldSkipFilter(inMesg, filter))) {
                    final HttpContent newChunk = filter.processContentChunk(inMesg, chunk);
                    if (newChunk == null)  {
                        //Filter wants to break the chain and stop propagating this chunk any further
                        return;
                    }
                    //deallocate original chunk if necessary
                    if ((newChunk != chunk) && (chunk.refCnt() > 0)) {
                        chunk.release(chunk.refCnt());
                    }
                    chunk = newChunk;
                }
            }

            if (limit >= filters.length) {
                //Filter chain has run to end, pass down the channel pipeline
                invokeNextStage(inMesg, chunk);
            } else {
                inMesg.bufferBodyContents(chunk);

                boolean isAwaitingBody = isFilterAwaitingBody(inMesg);

                // Record passport states for start and end of buffering bodies.
                if (isAwaitingBody) {
                    CurrentPassport passport = CurrentPassport.fromSessionContext(inMesg.getContext());
                    if (inMesg.hasCompleteBody()) {
                        if (inMesg instanceof HttpRequestMessage) {
                            passport.addIfNotAlready(PassportState.FILTERS_INBOUND_BUF_END);
                        } else if (inMesg instanceof HttpResponseMessage) {
                            passport.addIfNotAlready(PassportState.FILTERS_OUTBOUND_BUF_END);
                        }
                    }
                    else {
                        if (inMesg instanceof HttpRequestMessage) {
                            passport.addIfNotAlready(PassportState.FILTERS_INBOUND_BUF_START);
                        } else if (inMesg instanceof HttpResponseMessage) {
                            passport.addIfNotAlready(PassportState.FILTERS_OUTBOUND_BUF_START);
                        }
                    }
                }

                if (isAwaitingBody && inMesg.hasCompleteBody()) {
                    //whole body has arrived, resume filter chain
                    runFilters(inMesg, runningFilterIdx);
                }
            }
        }
        catch (Exception ex) {
            handleException(inMesg, filterName, ex);
        }
    }

2.NettyClient 

            if (filter.getSyncType() == FilterSyncType.SYNC) {
                final SyncZuulFilter<I, O> syncFilter = (SyncZuulFilter) filter;
                final O outMesg = syncFilter.apply(inMesg);
                recordFilterCompletion(SUCCESS, filter, startTime, inMesg, snapshot);
                return (outMesg != null) ? outMesg : filter.getDefaultOutput(inMesg);
            }

            // async filter
            filter.incrementConcurrency();
            resumer = new FilterChainResumer(inMesg, filter, snapshot, startTime);
            filter.applyAsync(inMesg)
                .observeOn(Schedulers.from(getChannelHandlerContext(inMesg).executor()))
                .doOnUnsubscribe(resumer::decrementConcurrency)
                .subscribe(resumer);

ProxyEndpoint.java

    @Override
    public HttpResponseMessage apply(final HttpRequestMessage input) {
        // If no Origin has been selected, then just return a 404 static response.
        // handle any exception here
        try {

            if (origin == null) {
                handleNoOriginSelected();
                return null;
            }

            origin.getProxyTiming(zuulRequest).start();

            // To act the same as Ribbon, we must do this before starting execution (as well as before each attempt).
            IClientConfig requestConfig = origin.getExecutionContext(zuulRequest).getRequestConfig();
            originalReadTimeout = requestConfig.getProperty(ReadTimeout, null);
            setReadTimeoutOnContext(requestConfig, 1);

            origin.onRequestExecutionStart(zuulRequest);
            proxyRequestToOrigin(); //Doesn't return origin response to caller, calls invokeNext() internally in response filter chain
            return null;
        } catch (Exception ex) {
            handleError(ex);
            return null;
        }
    }

將請求轉發至遠端

    private void proxyRequestToOrigin() {
        Promise<PooledConnection> promise = null;
        try {
            attemptNum += 1;
            requestStat = createRequestStat();
            origin.preRequestChecks(zuulRequest);
            concurrentReqCount++;

            // update RPS trackers
            updateOriginRpsTrackers(origin, attemptNum);

            // We pass this AtomicReference<Server> here and the origin impl will assign the chosen server to it.
            promise = origin.connectToOrigin(zuulRequest, channelCtx.channel().eventLoop(), attemptNum, passport, chosenServer, chosenHostAddr);

            storeAndLogOriginRequestInfo();
            currentRequestAttempt = origin.newRequestAttempt(chosenServer.get(), context, attemptNum);
            requestAttempts.add(currentRequestAttempt);
            passport.add(PassportState.ORIGIN_CONN_ACQUIRE_START);

            if (promise.isDone()) {
                operationComplete(promise);
            } else {
                promise.addListener(this);
            }
        }
        catch (Exception ex) {
            LOG.error("Error while connecting to origin, UUID {} " + context.getUUID(), ex);
            storeAndLogOriginRequestInfo();
            if (promise != null && ! promise.isDone()) {
                promise.setFailure(ex);
            } else {
                errorFromOrigin(ex);
            }
        }
    }

調用BasicNettyOrigin

    @Override
    public Promise<PooledConnection> connectToOrigin(HttpRequestMessage zuulReq, EventLoop eventLoop, int attemptNumber,
                                                     CurrentPassport passport, AtomicReference<Server> chosenServer,
                                                     AtomicReference<String> chosenHostAddr) {
        return clientChannelManager.acquire(eventLoop, null, zuulReq.getMethod().toUpperCase(),
                zuulReq.getPath(), attemptNumber, passport, chosenServer, chosenHostAddr);
    }

 

3.小結

   >> zuul2通過啟動BaseServerStartup的實現類,啟動一個netty server

   >> netty server將ZuulFilter (InboundOutboundEndPoint)包裹成ChainRunner組合成netty的一個handler:ZuulFilterChainHandler

   >> ZuulFilterChainHandler將請求包裝成SyncZuulFilter封裝成NettyClient

4.zuul1和zuul2的選擇

  性能對比

Zuul 1 (阻塞)的應用場景

  cpu密集型任務

  簡單操作的需求

  開發簡單的需求

  實時請求高的

zuul2(非阻塞)的應用場景

  io密集的任務

  大請求或者大文件

  隊列的流式數據

  超大量的連接

參考文獻

【1】https://www.cnblogs.com/lexiaofei/p/7080257.html

【2】https://blog.csdn.net/lengyue309/article/details/82192118

【】https://github.com/strangeloop/StrangeLoop2017/blob/master/slides/ArthurGonigberg-ZuulsJourneyToNonBlocking.pdf


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM