基於 OpenTelemetry 的鏈路追蹤


鏈路追蹤的前世今生

分布式跟蹤(也稱為分布式請求跟蹤)是一種用於分析和監控應用程序的方法,尤其是使用微服務架構構建的應用程序。分布式跟蹤有助於精確定位故障發生的位置以及導致性能差的原因。

起源

鏈路追蹤(Distributed Tracing) 一詞最早出現於谷歌發布的論文 《Dapper, a Large-Scale Distributed Systems Tracing Infrastructure》 中,這篇論文對於實現鏈路追蹤,對於后來出現的 Jaeger、Zipkin 等開源分布式追蹤項目設計理念仍有很深的影響。

微服務架構是一個分布式的架構,會有很多個不同的服務。不同的服務之前相互調用,如果出現了錯誤由於一個請求經過了 N 個服務。隨着業務的增加越來越多的服務之間的調用,如果沒有一個工具去記錄調用鏈,解決問題的時候就會像下面圖片里小貓咪玩的毛線球一樣,毫無頭緒,無從下手

所以需要有一個工具能夠清楚的了解一個請求經過了哪些服務,順序是如何,從而能夠輕易的定位問題。

百家爭艷

從谷歌發布 Dapper 后,分布式鏈路追蹤工具越來越多,以下簡單列舉了一些常用的鏈路追蹤系統

  • Skywalking
  • 阿里 鷹眼
  • 大眾點評 CAT
  • Twitter Zipkin
  • Naver pinpoint
  • Uber Jaeger

爭鋒相對?

隨着鏈路追蹤工具越來越多,開源領域主要分為兩派,一派是以 CNCF技術委員 會為主的 OpenTracing 的規范,例如 jaeger zipkin 都是遵循了OpenTracing 的規范。而另一派則是谷歌作為發起者的 OpenCensus,而且谷歌本身還是最早提出鏈路追蹤概念的公司,后期連微軟也加入了 OpenCensus

OpenTelemetry 誕生

OpenTelemetric 是一組 API、SDK、模組和集成,專為創建和管理‎‎遙測數據‎‎(如追蹤、指標和日志)而設

微軟加入 OpenCensus 后,直接打破了之前平衡的局面,間接的導致了 OpenTelemetry 的誕生
谷歌和微軟下定決心結束江湖之亂,首要的問題是如何整合兩個兩個社區已有的項目,OpenTelemetry 主要的理念就是,兼容 OpenCensusOpenTracing ,可以讓使用者無需改動或者很小的改動就可以接入 OpenTelemetry

Kratos 的鏈路追蹤實踐

Kratos 一套輕量級 Go 微服務框架,包含大量微服務相關框架及工具。

tracing 中間件

kratos 框架提供的自帶中間件中有一個名為 tracing 中間件,它基於 Opentelemetry 實現了kratos 框架的鏈路追蹤功能,中間件的代碼可以從 middleware/tracing 中看到。

實現原理

kratos 的鏈路追蹤中間件由三個文件組成 carrie.go,tracer.go,tracing.go。client和 server 的實現原理基本相同,本文以 server 實現進行原理解析。

  1. 首先當請求進入時,tracing 中間件會被調用,首先調用了 tracer.go 中的 NewTracer 方法
// Server returns a new server middleware for OpenTelemetry.
func Server(opts ...Option) middleware.Middleware {
        // 調用 tracer.go 中的 NewTracer 傳入了一個 SpanKindServer 和配置項
	tracer := NewTracer(trace.SpanKindServer, opts...)
        // ... 省略代碼
}
  1. tracer.go 中的 NewTracer 方法被調用后會返回一個 Tracer,實現如下
func NewTracer(kind trace.SpanKind, opts ...Option) *Tracer {
	options := options{}
	for _, o := range opts {
		o(&options)
	}
	// 判斷是否存在 otel 追蹤提供者配置,如果存在則設置
	if options.TracerProvider != nil {
		otel.SetTracerProvider(options.TracerProvider)
	}
	/*
	判斷是否存在 Propagators 設置,如果存在設置則覆蓋,不存在則設置一個默認的TextMapPropagator
	注意如果沒有設置默認的TextMapPropagator,鏈路信息則無法正確的傳遞
	*/
	if options.Propagators != nil {
		otel.SetTextMapPropagator(options.Propagators)
	} else {	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{}))
	}


	var name string
	// 判斷當前中間件的類型,是 server 還是 client
	if kind == trace.SpanKindServer {
		name = "server"
	} else if kind == trace.SpanKindClient {
		name = "client"
	} else {
		panic(fmt.Sprintf("unsupported span kind: %v", kind))
	}
	// 調用 otel包的 Tracer 方法 傳入 name 用來創建一個 tracer 實例
	tracer := otel.Tracer(name)
	return &Tracer{tracer: tracer, kind: kind}
}
  1. 判斷當前請求類型,處理需要采集的數據,並調用 tracer.go 中的 Start 方法
var (
	component string
	operation string
	carrier   propagation.TextMapCarrier
)
// 判斷請求類型
if info, ok := http.FromServerContext(ctx); ok {
	// HTTP
	component = "HTTP"
	// 取出請求的地址
	operation = info.Request.RequestURI
	// 調用 otel/propagation包中的 HeaderCarrier,會處理 http.Header 以用來滿足TextMapCarrier interface
	// TextMapCarrier 是一個文本映射載體,用於承載信息
	carrier = propagation.HeaderCarrier(info.Request.Header)
	// otel.GetTextMapPropagator().Extract() 方法用於將文本映射載體,讀取到上下文中
	ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(info.Request.Header))
} else if info, ok := grpc.FromServerContext(ctx); ok {
	// Grpc
	component = "gRPC"
	operation = info.FullMethod
	//
	// 調用 grpc/metadata包中metadata.FromIncomingContext(ctx)傳入 ctx,轉換 grpc 的元數據
	if md, ok := metadata.FromIncomingContext(ctx); ok {
		// 調用carrier.go 中的 MetadataCarrier 將 MD 轉換 成文本映射載體
		carrier = MetadataCarrier(md)
	}
}
// 調用 tracer.Start 方法
ctx, span := tracer.Start(ctx, component, operation, carrier)
// ... 省略代碼
}
  1. 調用 tracing.go 中的 Start 方法
func (t *Tracer) Start(ctx context.Context, component string, operation string, carrier propagation.TextMapCarrier) (context.Context, trace.Span) {
	// 判斷當前中間件如果是 server則將 carrier 注入到上下文中
	if t.kind == trace.SpanKindServer {
		ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)
	}
	// 調用otel/tracer 包中的 start 方法,用來創建一個 span
	ctx, span := t.tracer.Start(ctx,
		// tracing.go 中聲明的請求路由作為 spanName
		operation,
		// 設置 span 的屬性,設置了一個 component,component的值為請求類型
		trace.WithAttributes(attribute.String("component", component)),
		// 設置 span種類
		trace.WithSpanKind(t.kind),
	)
	// 判斷如果當前中間件是 client 則將 carrier 注入到請求里面
	if t.kind == trace.SpanKindClient {
		otel.GetTextMapPropagator().Inject(ctx, carrier)
	}
	return ctx, span
}
  1. defer 聲明了一個閉包方法
// 這個地方要注意,需要使用閉包,因為 defer 的參數是實時計算的如果異常發生,err 會一直為 nil
// https://github.com/go-kratos/kratos/issues/927
defer func() { tracer.End(ctx, span, err) }()
  1. 中間件繼續執行
// tracing.go 69行
reply, err = handler(ctx, req)
  1. 中間件調用結束 defer 中的閉包被調用后執行了 tracer.go 中的 End 方法
func (t *Tracer) End(ctx context.Context, span trace.Span, err error) {
	// 判斷是否有異常發生,如果有則設置一些異常信息
	if err != nil {
		// 記錄異常
		span.RecordError(err)
		// 設置span 屬性
		span.SetAttributes(
			// 設置事件為異常
			attribute.String("event", "error"),
			// 設置 message 為 err.Error().
			attribute.String("message", err.Error()),
		)
		//設置了 span 的狀態
		span.SetStatus(codes.Error, err.Error())
	} else {
		// 如果沒有發生異常,span 狀態則為 ok
		span.SetStatus(codes.Ok, "OK")
	}
	// 中止 span
	span.End()
}

如何使用

tracing 中間件的使用示例可以從 kratos/examples/traces ,該示例簡單的實現了跨服務間的鏈路追蹤,以下代碼片段包含部分示例代碼。

// https://github.com/go-kratos/kratos/blob/7f835db398c9d0332e69b81bad4c652b4b45ae2e/examples/traces/app/message/main.go#L38
// 首先調用otel 庫方法,得到一個 TracerProvider
func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
	// examples/traces 中使用的是 jaeger,其他方式可以查看 opentelemetry 官方示例
	exp, err := jaeger.NewRawExporter(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
	if err != nil {
		return nil, err
	}
	tp := tracesdk.NewTracerProvider(
		tracesdk.WithSampler(tracesdk.AlwaysSample()),
		// 設置 Batcher,注冊jaeger導出程序
		tracesdk.WithBatcher(exp),
		// 記錄一些默認信息
		tracesdk.WithResource(resource.NewWithAttributes(
			semconv.ServiceNameKey.String(pb.User_ServiceDesc.ServiceName),
			attribute.String("environment", "development"),
			attribute.Int64("ID", 1),
		)),
	)
	return tp, nil
}

在 grpc/server 中使用

// https://github.com/go-kratos/kratos/blob/main/examples/traces/app/message/main.go
grpcSrv := grpc.NewServer(
	grpc.Address(":9000"),
	grpc.Middleware(
		// Configuring tracing Middleware
		tracing.Server(
			tracing.WithTracerProvider(tp),
		),
	),
)

在 grpc/client 中使用

// https://github.com/go-kratos/kratos/blob/149fc0195eb62ee1fbc2728adb92e1bcd1a12c4e/examples/traces/app/user/main.go#L63
conn, err := grpc.DialInsecure(ctx,
	grpc.WithEndpoint("127.0.0.1:9000"),
	grpc.WithMiddleware(
		tracing.Client(
			tracing.WithTracerProvider(s.tracer),
			tracing.WithPropagators(
				propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{}),
			),
		)
	),
	grpc.WithTimeout(2*time.Second),
)

在 http/server 中使用

// https://github.com/go-kratos/kratos/blob/main/examples/traces/app/user/main.go
httpSrv := http.NewServer(http.Address(":8000"))
httpSrv.HandlePrefix("/", pb.NewUserHandler(s,
	http.Middleware(
		// Configuring tracing middleware
		tracing.Server(
			tracing.WithTracerProvider(tp),
			tracing.WithPropagators(
				propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{}),
			),
		),
	),
)

在 http/client 中使用

http.NewClient(ctx, http.WithMiddleware(
	tracing.Client(
		tracing.WithTracerProvider(s.tracer),
	),
))

如何實現一個其他場景的 tracing

我們可以借鑒 kratostracing 中間件的代碼來實現例如數據庫的 tracing,如下面的代碼片段,作者借鑒了tracing 中間件,實現了 qmgo 庫操作 MongoDB 數據庫的 tracing

func mongoTracer(ctx context.Context,tp trace.TracerProvider, command interface{}) {
	var (
		commandName string
		failure     string
		nanos       int64
		reply       bson.Raw
		queryId     int64
		eventName   string
	)
	otel.SetTracerProvider(tp)
	reply = bson.Raw{}
	switch value := command.(type) {
	case *event.CommandStartedEvent:
		commandName = value.CommandName
		reply = value.Command
		queryId = value.RequestID
		eventName = "CommandStartedEvent"
	case *event.CommandSucceededEvent:
		commandName = value.CommandName
		nanos = value.DurationNanos
		queryId = value.RequestID
		eventName = "CommandSucceededEvent"
	case *event.CommandFailedEvent:
		commandName = value.CommandName
		failure = value.Failure
		nanos = value.DurationNanos
		queryId = value.RequestID
		eventName = "CommandFailedEvent"
	}
	duration, _ := time.ParseDuration(strconv.FormatInt(nanos, 10) + "ns")
	tracer := otel.Tracer("mongodb")
	kind := trace.SpanKindServer
	ctx, span := tracer.Start(ctx,
		commandName,
		trace.WithAttributes(
			attribute.String("event", eventName),
			attribute.String("command", commandName),
			attribute.String("query", reply.String()),
			attribute.Int64("queryId", queryId),
			attribute.String("ms", duration.String()),
		),
		trace.WithSpanKind(kind),
	)
	if failure != "" {
		span.RecordError(errors.New(failure))
	}
	span.End()
}

文章轉自



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM