dubbo+zipkin調用鏈監控


分布式環境下,對於線上出現問題往往比單體應用要復雜的多,原因是前端的一個請求可能對應后端多個系統的多個請求,錯綜復雜。
分布式調用

對於快速問題定位,我們一般希望是這樣的:

  • 從下到下關鍵節點的日志,入參,出差,異常等。
  • 關鍵節點的響應時間
  • 關鍵節點依賴關系

而這些需求原來在單體應用中可以比較容易實現,但到了分布式環境,可能會出現:

  • 每個系統的技術棧不同
  • 有的系統有日志有的連日志都沒有
  • 日志實現手段不相同

以上系統都是自治的,要想看整體的調用鏈非常困難。

分布式系統日志統一的手段有很多,比如常見的ELK,但這些日志都是文本,不太容易做分析。

更希望看到類似如下瀏覽器對於網絡請求的分析:將分散的請求串聯在一起

瀏覽器網絡

zipkin

這是推特的一個產品,通過API收集各系統的調用鏈信息然后做數據分析,展示調用鏈數據。
zipkin

核心功能:

  • 搜索調用鏈信息
    此處不多說,無非就是從存儲中按一定條件搜索請求信息。

zipkin默認是內存存儲,也可以是其它的比如:mysq,elasticsearch

  • 查看某條請求的詳細調用鏈

比如查詢產品明細,除了產品的基本信息還需要展示對產品的所有評論。下圖可以清晰的展示調用關系,product-dubbo-consumer調用product-dubbo-provider,product-dubbo-provider內部再調用comment-dubbo-provider。每步之間的時間也一目了然。

上面顯示的時間默認是指調用端發起遠程開始到從服務端接收到數據,其中包含網絡連接以及數據傳輸的時間。

詳細請求調用鏈

  • 查看服務之間的依賴關系

互聯網項目目前微服務比較流行,微服務之間可能會存在循環引用形成一個網狀關系。當項目規模越來越大后,微服務之間的依賴關系估計誰也理不清,現在可以從請求鏈中清楚查看依賴。

詳細請求調用依賴

幾個關鍵概念

  • traceId
    就是一個全局的跟蹤ID,是跟蹤的入口點,根據需求來決定在哪生成traceId。比如一個http請求,首先入口是web應用,一般看完整的調用鏈這里自然是traceId生成的起點,結束點在web請求返回點。

  • spanId
    這是下一層的請求跟蹤ID,這個也根據自己的需求,比如認為一次rpc,一次sql執行等都可以是一個span。一個traceId包含一個以上的spanId。

  • parentId
    上一次請求跟蹤ID,用來將前后的請求串聯起來。

  • cs
    客戶端發起請求的時間,比如dubbo調用端開始執行遠程調用之前。

  • cr
    客戶端收到處理完請求的時間。

  • ss
    服務端處理完邏輯的時間。

  • sr
    服務端收到調用端請求的時間。

客戶端調用時間=cr-cs
服務端處理時間=sr-ss

優化考慮

默認系統是通過http請求將數據發送到zipkin,如果系統的調用量比較大,需要考慮如下這些問題:

  • 網絡傳輸
    如果一次請求內部包含多次遠程請求,那么對應span生成的數據會相對較大,可以考慮壓縮之后再傳輸。

  • 阻塞
    調用鏈的功能只是輔助功能,不能影響現有業務系統(比如性能相比之前有下降,zipkin的穩定性影響現有業務等),所以在推送日志時最好采用異步+容錯方式進行。

  • 數據丟失
    如果日志在后台積壓,未處理完時服務器出現重啟就會導致未來的急處理的日志數據會丟失,盡管這種調用數據可以容忍,但如果想做到極致的話,也是有辦法的,比如用消息隊列做緩沖。

dubbo zipkin

由於工作中一直用dubbo這個rpc框架實現微服務,以前我們基本都是在kibana平台上查詢各自服務的日志然后分析,比較麻煩,特別是在分析性能瓶頸時。在dubbo中引入zipkin是非常方便的,因為無非就是寫filter,在請求處理前后發送日志數據,讓zipkin生成調用鏈數據。

調用鏈跟蹤自動配置

由於我的項目環境是spring boot,所以附帶做一個調用鏈追蹤的自動配置。

  • 自動配置的注解
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EnableTraceAutoConfigurationProperties {
}
  • 自動配置的實現,主要是將特定配置節點的值讀取到上下文對象中
@Configuration
@ConditionalOnBean(annotation = EnableTraceAutoConfigurationProperties.class)
@AutoConfigureAfter(SpringBootConfiguration.class)
@EnableConfigurationProperties(TraceConfig.class)
public class EnableTraceAutoConfiguration {

    @Autowired
    private TraceConfig traceConfig;

    @PostConstruct
    public void init() throws Exception {
        TraceContext.init(this.traceConfig);
    }
}
  • 配置類
@ConfigurationProperties(prefix = "dubbo.trace")
public class TraceConfig {

    private boolean enabled=true;

    private int connectTimeout;

    private int readTimeout;

    private int flushInterval=0;

    private boolean compressionEnabled=true;

    private String zipkinUrl;

    @Value("${server.port}")
    private int serverPort;

    @Value("${spring.application.name}")
    private String applicationName;

}
  • spring 配置
    按如下圖配置才能實現自動加載功能。
    spring自動配置

  • 啟動自動配置

最后在啟動類中增加@EnableTraceAutoConfigurationProperties即可顯示啟動。

追蹤上下文數據

因為一個請求內部會多次調用下級遠程服務,所以會共享traceId以及spanId等,設計一個TraceContext用來方便訪問這些共享數據。

這些上下文數據由於是請求級別,所以用ThreadLocal存儲

public class TraceContext extends AbstractContext {

    private static ThreadLocal<Long> TRACE_ID = new InheritableThreadLocal<>();

    private static ThreadLocal<Long> SPAN_ID = new InheritableThreadLocal<>();

    private static ThreadLocal<List<Span>> SPAN_LIST = new InheritableThreadLocal<>();

    public static final String TRACE_ID_KEY = "traceId";

    public static final String SPAN_ID_KEY = "spanId";

    public static final String ANNO_CS = "cs";

    public static final String ANNO_CR = "cr";

    public static final String ANNO_SR = "sr";

    public static final String ANNO_SS = "ss";

    private static TraceConfig traceConfig;


    public static void clear(){
        TRACE_ID.remove();
        SPAN_ID.remove();
        SPAN_LIST.remove();
    }

    public static void init(TraceConfig traceConfig) {
        setTraceConfig(traceConfig);
    }

    public static void start(){
        clear();
        SPAN_LIST.set(new ArrayList<Span>());
    }

}

zipkin日志收集器

這里直接使用http發送數據,詳細代碼就不貼了,核心功能就是將數據通過http傳送到zipkin,中間可以配合壓縮等優化手段。

日志收集器代理

由於考慮到會擴展到多種日志收集器,所以用代理做封裝。考慮到優化,可以結合線程池來異步執行日志發送,避免阻塞正常業務邏輯。

public class TraceAgent {
    private final AbstractSpanCollector collector;

    private final int THREAD_POOL_COUNT=5;

    private final ExecutorService executor =
            Executors.newFixedThreadPool(this.THREAD_POOL_COUNT, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread worker = new Thread(r);
                    worker.setName("TRACE-AGENT-WORKER");
                    worker.setDaemon(true);
                    return worker;
                }
            });

    public TraceAgent(String server) {

        SpanCollectorMetricsHandler metrics = new SimpleMetricsHandler();

        collector = HttpCollector.create(server, TraceContext.getTraceConfig(), metrics);
    }

    public void send(final List<Span> spans){
        if (spans != null && !spans.isEmpty()){
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    for (Span span : spans){
                        collector.collect(span);
                    }
                    collector.flush();
                }
            });
        }
    }
}

dubbo filter

上面做了那么的功能,都是為filter實現准備的。使用filter機制基本上可以認為對現有系統是無侵入性的,當然如果公司項目都直接引用dubbo原生包多少有些麻煩,最好的做法是公司對dubbo做一層包裝,然后項目引用包裝之后的包,這樣就可以避免上面提到的問題,如此一來,調用端只涉及到修改配置文件。

  • 調用端filter
    調用端是調用鏈的入口,但需要判斷是第一次調用還是內部多次調用。如果是第一次調用那么生成全新的traceId以及spanId。如果是內部多次調用,那么需要從TraceContext中獲取traceId以及spanId。
private Span startTrace(Invoker<?> invoker, Invocation invocation) {

    Span consumerSpan = new Span();

    Long traceId=null;
    long id = IdUtils.get();
    consumerSpan.setId(id);
    if(null==TraceContext.getTraceId()){
        TraceContext.start();
        traceId=id;
    }
    else {
        traceId=TraceContext.getTraceId();
    }

    consumerSpan.setTrace_id(traceId);
    consumerSpan.setParent_id(TraceContext.getSpanId());
    consumerSpan.setName(TraceContext.getTraceConfig().getApplicationName());
    long timestamp = System.currentTimeMillis()*1000;
    consumerSpan.setTimestamp(timestamp);

    consumerSpan.addToAnnotations(
            Annotation.create(timestamp, TraceContext.ANNO_CS,
                    Endpoint.create(
                            TraceContext.getTraceConfig().getApplicationName(),
                            NetworkUtils.ip2Num(NetworkUtils.getSiteIp()),
                            TraceContext.getTraceConfig().getServerPort() )));

    Map<String, String> attaches = invocation.getAttachments();
    attaches.put(TraceContext.TRACE_ID_KEY, String.valueOf(consumerSpan.getTrace_id()));
    attaches.put(TraceContext.SPAN_ID_KEY, String.valueOf(consumerSpan.getId()));
    return consumerSpan;
}

private void endTrace(Span span, Stopwatch watch) {

    span.addToAnnotations(
            Annotation.create(System.currentTimeMillis()*1000, TraceContext.ANNO_CR,
                    Endpoint.create(
                            span.getName(),
                            NetworkUtils.ip2Num(NetworkUtils.getSiteIp()),
                            TraceContext.getTraceConfig().getServerPort())));

    span.setDuration(watch.stop().elapsed(TimeUnit.MICROSECONDS));
    TraceAgent traceAgent=new TraceAgent(TraceContext.getTraceConfig().getZipkinUrl());

    traceAgent.send(TraceContext.getSpans());

}

調用端需要通過Invocation的參數列表將生成的traceId以及spanId傳遞到下游系統中。

Map<String, String> attaches = invocation.getAttachments();
attaches.put(TraceContext.TRACE_ID_KEY, String.valueOf(consumerSpan.getTrace_id()));
attaches.put(TraceContext.SPAN_ID_KEY, String.valueOf(consumerSpan.getId()));
  • 服務端filter
    與調用端的邏輯類似,核心區別在於發送給zipkin的數據是服務端的。
private Span startTrace(Map<String, String> attaches) {

    Long traceId = Long.valueOf(attaches.get(TraceContext.TRACE_ID_KEY));
    Long parentSpanId = Long.valueOf(attaches.get(TraceContext.SPAN_ID_KEY));

    TraceContext.start();
    TraceContext.setTraceId(traceId);
    TraceContext.setSpanId(parentSpanId);

    Span providerSpan = new Span();

    long id = IdUtils.get();
    providerSpan.setId(id);
    providerSpan.setParent_id(parentSpanId);
    providerSpan.setTrace_id(traceId);
    providerSpan.setName(TraceContext.getTraceConfig().getApplicationName());
    long timestamp = System.currentTimeMillis()*1000;
    providerSpan.setTimestamp(timestamp);

    providerSpan.addToAnnotations(
            Annotation.create(timestamp, TraceContext.ANNO_SR,
                    Endpoint.create(
                            TraceContext.getTraceConfig().getApplicationName(),
                            NetworkUtils.ip2Num(NetworkUtils.getSiteIp()),
                            TraceContext.getTraceConfig().getServerPort() )));

    TraceContext.addSpan(providerSpan);
    return providerSpan;
}

private void endTrace(Span span, Stopwatch watch) {

    span.addToAnnotations(
            Annotation.create(System.currentTimeMillis()*1000, TraceContext.ANNO_SS,
                    Endpoint.create(
                            span.getName(),
                            NetworkUtils.ip2Num(NetworkUtils.getSiteIp()),
                            TraceContext.getTraceConfig().getServerPort())));

    span.setDuration(watch.stop().elapsed(TimeUnit.MICROSECONDS));
    TraceAgent traceAgent=new TraceAgent(TraceContext.getTraceConfig().getZipkinUrl());

    traceAgent.send(TraceContext.getSpans());

}

RPC之間的調用之所以能夠串起來,主要是通過dubbo的Invocation所攜帶的參數來傳遞

filter應用

  • 調用端
<dubbo:consumer filter="traceConsumerFilter"></dubbo:consumer>
  • 服務端
<dubbo:provider filter="traceProviderFilter" />

埋點

要想生成調用鏈的數據,就需要確認關鍵節點,不限於遠程調用,也有可能是本地的服務方法的調用,這就需要根據不同的需求來做埋點。

  • web 請求,通過filter機制,粗粒度。
  • rpc 請求,通過filter機制(一般rpc框架都有實現filter做擴展,如果沒有就只能自己實現),粗粒度。
  • 內部服務,通過AOP機制,一般結合注解,類似於Spring Cache的使用,細粒度。
  • 數據庫持久層,比如select,update這類,像mybatis都提供了攔截接口,與filter類似,細粒度。

代碼下載

https://github.com/jiangmin168168/jim-framework

引用

上面博主的思路還是很不錯的,不僅完成了基本功能也提到了需要注意的一些地方,我在此基本上按自己的方式做了一些調整。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM