zipkin微服務調用鏈分析


1.zipkin的作用
在微服務架構下,一個http請求從發出到響應,中間可能經過了N多服務的調用,或者N多邏輯操作,
如何監控某個服務,或者某個邏輯操作的執行情況,對分析耗時操作,性能瓶頸具有很大價值,
zipkin幫助我們實現了這一監控功能。

2.啟動zipkin
下載可執行文件: https://zipkin.io/quickstart.sh | bash -s

java -jar zipkin.jar

運行結果:

zipkin監聽9411端口,通過瀏覽器查看:

3.python中zipkin的實現模塊py_zipkin
創建flask項目,新建demo.py

import requests
from flask import Flask
from py_zipkin.zipkin import zipkin_span,create_http_headers_for_new_span
import time

app = Flask(__name__)

app.config.update({
    "ZIPKIN_HOST":"127.0.0.1",
    "ZIPKIN_PORT":"9411",
    "APP_PORT":5000,
    # any other app config-y things
})


def do_stuff():
    time.sleep(2)
    headers = create_http_headers_for_new_span()
    requests.get('http://localhost:6000/service1/', headers=headers)
    return 'OK'


def http_transport(encoded_span):
    # encoding prefix explained in https://github.com/Yelp/py_zipkin#transport
    #body = b"\x0c\x00\x00\x00\x01"+encoded_span
    body=encoded_span
    zipkin_url="http://127.0.0.1:9411/api/v1/spans"
    #zipkin_url = "http://{host}:{port}/api/v1/spans".format(
     #   host=app.config["ZIPKIN_HOST"], port=app.config["ZIPKIN_PORT"])
    headers = {"Content-Type": "application/x-thrift"}

    # You'd probably want to wrap this in a try/except in case POSTing fails
    r=requests.post(zipkin_url, data=body, headers=headers)
    print(type(encoded_span))
    print(encoded_span)
    print(body)
    print(r)
    print(r.content)


@app.route('/')
def index():
    with zipkin_span(
        service_name='webapp',
        span_name='index',
        transport_handler=http_transport,
        port=5000,
        sample_rate=100, #0.05, # Value between 0.0 and 100.0
    ):
        with zipkin_span(service_name='webapp', span_name='do_stuff'):
            do_stuff()
        time.sleep(1)
    return 'OK', 200

if __name__=='__main__':
    app.run(host="0.0.0.0",port=5000,debug=True)

新建server1.py

from flask import request
import requests
from flask import Flask
from py_zipkin.zipkin import zipkin_span,ZipkinAttrs
import time

import MySQLdb

app = Flask(__name__)
app.config.update({
    "ZIPKIN_HOST":"127.0.0.1",
    "ZIPKIN_PORT":"9411",
    "APP_PORT":5000,
    # any other app config-y things
})


def do_stuff():
    time.sleep(2)
    with zipkin_span(service_name='service1', span_name='service1_db_search'):
        db_search()
    return 'OK'


def db_search():
    # 打開數據庫連接
    db = MySQLdb.connect("127.0.0.1", "username", "psw", "db", charset='utf8')
    # 使用cursor()方法獲取操作游標
    cursor = db.cursor()
    # 使用execute方法執行SQL語句
    cursor.execute("SELECT VERSION()")
    # 使用 fetchone() 方法獲取一條數據
    data = cursor.fetchone()
    print("Database version : %s " % data)
    # 關閉數據庫連接
    db.close()

def http_transport(encoded_span):
    # encoding prefix explained in https://github.com/Yelp/py_zipkin#transport
    #body = b"\x0c\x00\x00\x00\x01" + encoded_span
    body=encoded_span
    zipkin_url="http://127.0.0.1:9411/api/v1/spans"
    #zipkin_url = "http://{host}:{port}/api/v1/spans".format(
    #    host=app.config["ZIPKIN_HOST"], port=app.config["ZIPKIN_PORT"])
    headers = {"Content-Type": "application/x-thrift"}

    # You'd probably want to wrap this in a try/except in case POSTing fails
    requests.post(zipkin_url, data=body, headers=headers)


@app.route('/service1/')
def index():
    with zipkin_span(
        service_name='service1',
        zipkin_attrs=ZipkinAttrs(
            trace_id=request.headers['X-B3-TraceID'],
            span_id=request.headers['X-B3-SpanID'],
            parent_span_id=request.headers['X-B3-ParentSpanID'],
            flags=request.headers['X-B3-Flags'],
            is_sampled=request.headers['X-B3-Sampled'],
        ),
        span_name='index_service1',
        transport_handler=http_transport,
        port=6000,
        sample_rate=100, #0.05, # Value between 0.0 and 100.0
    ):
        with zipkin_span(service_name='service1', span_name='service1_do_stuff'):
            do_stuff()
    return 'OK', 200

if __name__=='__main__':
    app.run(host="0.0.0.0",port=6000,debug=True)

運行demo.py

運行server1.py

訪問5000端口

查看調用鏈:

可以看到,有webapp和services兩個service,5個span標簽,可以清楚看到service和service,service和span,span和span之間的關系,和各span耗時情況。

4.py_zipkin代碼分析
上例中我們主要使用了py_zipkin的兩個對象,zipkin_span和create_http_headers_for_new_span
1)zipkin_span

with zipkin_span(
        service_name='webapp',
        span_name='index',
        transport_handler=http_transport,
        port=5000,
        sample_rate=100, #0.05, # Value between 0.0 and 100.0
    ):

service_name:服務名
span_name:標簽名,用來標志服務里面的某個操作
transport_handler:處理函數,post數據到zipkin
port:服務端口號
sample_rate:待研究

需要注意的是,在一個服務里面,只有root-span需要定義transport_handler,port等參數,非root-span只有service_name是必須的,其他參數繼承root-span

with zipkin_span(service_name='webapp', span_name='do_stuff'):
    do_stuff()

zip_span還可以是用裝飾器的方式,來包裹對應的操作

@zipkin_span(service_name='webapp', span_name='do_stuff')
def do_stuff():
    time.sleep(2)
    headers = create_http_headers_for_new_span()
    requests.get('http://localhost:6000/service1/', headers=headers)
    return 'OK'

但是,我這邊試了裝飾器的方式,不起作用

transport_handler的定義如下:

def http_transport(encoded_span):
    # encoding prefix explained in https://github.com/Yelp/py_zipkin#transport
    #body = b"\x0c\x00\x00\x00\x01"+encoded_span
    body=encoded_span
    zipkin_url="http://127.0.0.1:9411/api/v1/spans"
    #zipkin_url = "http://{host}:{port}/api/v1/spans".format(
     #   host=app.config["ZIPKIN_HOST"], port=app.config["ZIPKIN_PORT"])
    headers = {"Content-Type": "application/x-thrift"}

    # You'd probably want to wrap this in a try/except in case POSTing fails
    r=requests.post(zipkin_url, data=body, headers=headers)

zipkin會把編碼后的span,通過接口post到zipkin,我們通過瀏覽器就可以看到調用鏈詳情了。
2)create_http_headers_for_new_span
跳到一個新的服務時,通過create_http_headers_for_new_span生成新的span信息,包括trace_id,span_id,parent_span_id等,服務之間就做好關聯了

with zipkin_span(
        service_name='service1',
        zipkin_attrs=ZipkinAttrs(
            trace_id=request.headers['X-B3-TraceID'],
            span_id=request.headers['X-B3-SpanID'],
            parent_span_id=request.headers['X-B3-ParentSpanID'],
            flags=request.headers['X-B3-Flags'],
            is_sampled=request.headers['X-B3-Sampled'],
        ),
        span_name='index_service1',
        transport_handler=http_transport,
        port=6000,
        sample_rate=100, #0.05, # Value between 0.0 and 100.0
    ):


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM