How the SkyWalking agent creates a span (Part 2)


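  • In the Tomcat plugin's beforeMethod you can see a call to ContextManager.createEntrySpan. Reading the source, two parts matter. The first creates an AbstractTracerContext; stepping through with a debugger shows that the concrete instance is a TracingContext (the implementation class). The second creates the EntrySpan.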
private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
public static AbstractSpan createEntrySpan(String operationName, ContextCarrier carrier) {
        AbstractSpan span;
        AbstractTracerContext context;
        operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
        if (carrier != null && carrier.isValid()) {
            SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
            samplingService.forceSampled();
            context = getOrCreate(operationName, true);
            span = context.createEntrySpan(operationName);
            context.extract(carrier);
        } else {
            context = getOrCreate(operationName, false); // ①
            span = context.createEntrySpan(operationName); // ②
        }
        return span;
    }


private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {
    AbstractTracerContext context = CONTEXT.get();
    if (context == null) {
        if (StringUtil.isEmpty(operationName)) {
            if (LOGGER.isDebugEnable()) {
                LOGGER.debug("No operation name, ignore this trace.");
            }
            context = new IgnoredTracerContext();
        } else {
            if (EXTEND_SERVICE == null) {
                EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
            }
            context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);

        }
        CONTEXT.set(context);
    }
    return context;
}

The context created in step ① is stored in a ThreadLocal, which guarantees that each thread's data stays independent of every other thread's.
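To make the per-thread isolation concrete, here is a minimal sketch of the same get-or-create pattern. It is not SkyWalking code: the class ThreadLocalContextDemo, the helper contextSeenByNewThread, and the use of a plain String as the "context" are all assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for ContextManager; a String plays the role of the tracer context.
final class ThreadLocalContextDemo {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Same shape as getOrCreate above: create lazily, then cache per thread.
    static String getOrCreate(String operationName) {
        String context = CONTEXT.get();
        if (context == null) {
            context = operationName + "@" + Thread.currentThread().getName();
            CONTEXT.set(context);
        }
        return context;
    }

    // Runs getOrCreate on a fresh thread and reports the context that thread saw.
    static String contextSeenByNewThread(String threadName, String operationName) {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> seen.set(getOrCreate(operationName)), threadName);
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // Each thread builds and keeps its own context.
        System.out.println(contextSeenByNewThread("worker-a", "/orders")); // /orders@worker-a
        System.out.println(contextSeenByNewThread("worker-b", "/orders")); // /orders@worker-b
    }
}
```

Calling getOrCreate twice on the same thread returns the cached value, which is why every span created during one request lands in the same context.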

TracingContext defines a TraceSegment, and TraceSegment in turn defines a private List<AbstractTracingSpan> spans that stores the span information.

/**
     * Initialize all fields with default value.
     */
    TracingContext(String firstOPName, SpanLimitWatcher spanLimitWatcher) {
        this.segment = new TraceSegment();
        this.spanIdGenerator = 0;
        isRunningInAsyncMode = false;
        createTime = System.currentTimeMillis();
        running = true;

        // profiling status
        if (PROFILE_TASK_EXECUTION_SERVICE == null) {
            PROFILE_TASK_EXECUTION_SERVICE = ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
        }
        this.profileStatus = PROFILE_TASK_EXECUTION_SERVICE.addProfiling(
            this, segment.getTraceSegmentId(), firstOPName);

        this.correlationContext = new CorrelationContext();
        this.extensionContext = new ExtensionContext();
        this.spanLimitWatcher = spanLimitWatcher;
    }
private List<AbstractTracingSpan> spans; 

 public TraceSegment() {
        this.traceSegmentId = GlobalIdGenerator.generate();
        this.spans = new LinkedList<>();
        this.relatedGlobalTraceId = new NewDistributedTraceId();
        this.createTime = System.currentTimeMillis();

        // Collect information about the current thread
        threadID = Thread.currentThread().getId();
        threadName = Thread.currentThread().getName();
        ThreadGroup group = Thread.currentThread().getThreadGroup();
        threadGroup = (group == null ? null : group.getName());
    }

 

Step ② creates the span, which is pushed onto activeSpanStack. afterMethod later retrieves the span from there and fills in the remaining information.

 private LinkedList<AbstractSpan> activeSpanStack = new LinkedList<>(); 
/**
     * Add a new Span at the top of 'ActiveSpanStack'
     *
     * @param span the {@code span} to push
     */
    private AbstractSpan push(AbstractSpan span) {
        if (firstSpan == null) {
            firstSpan = span;
        }
        activeSpanStack.addLast(span);
        this.extensionContext.handle(span);
        return span;
    }


The afterMethod source shows that it fetches the activeSpan, that is, the span currently on activeSpanStack. Once afterMethod has finished enriching the span, it calls stopSpan.


 @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                    Object ret) throws Throwable {
        Request request = (Request) allArguments[0];
        HttpServletResponse response = (HttpServletResponse) allArguments[1];

        AbstractSpan span = ContextManager.activeSpan();

        // Record the name of the class this span belongs to
        Tags.LOCATION.set(span, objInst.getClass().getName());
        if (IS_SERVLET_GET_STATUS_METHOD_EXIST && response.getStatus() >= 400) {
            span.errorOccurred();
            Tags.STATUS_CODE.set(span, Integer.toString(response.getStatus()));
        }
        // Active HTTP parameter collection automatically in the profiling context.
        if (!TomcatPluginConfig.Plugin.Tomcat.COLLECT_HTTP_PARAMS && span.isProfiling()) {
            collectHttpParam(request, span);
        }
        ContextManager.getRuntimeContext().remove(Constants.FORWARD_REQUEST_FLAG);
        ContextManager.stopSpan();
        return ret;
    }

 

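Following stopSpan further, you can see that when context.stopSpan(span) returns true, the corresponding CONTEXT (the TracingContext mentioned earlier) is removed from the ThreadLocal.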
private static void stopSpan(AbstractSpan span, final AbstractTracerContext context) {
        if (context.stopSpan(span)) {
            CONTEXT.remove();
            RUNTIME_CONTEXT.remove();
        }
    }

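Tracing stopSpan down to the finish method, look at the first part, the isFinishedInMainThread check: it tests whether activeSpanStack still holds any spans, and the result is one of the conditions for calling notifyFinish, which hands the data off to be sent to the collector. Why do it this way? Picture Tomcat as the entry point: the agent creates an EntrySpan for it, and the Tomcat application may then call a database through JDBC (an ExitSpan). This works like bracket matching, "{[()]}": every span has a beforeMethod and an afterMethod, and only after every afterMethod has run can a complete TraceSegment be assembled (data is sent one TraceSegment at a time).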
/**
     * Finish this context, and notify all {@link TracingContextListener}s, managed by {@link
     * TracingContext.ListenerManager} and {@link TracingContext.TracingThreadListenerManager}
     */
    private void finish() {
        if (isRunningInAsyncMode) {
            asyncFinishLock.lock();
        }
        try {
            boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running;
            if (isFinishedInMainThread) {
                /*
                 * Notify after tracing finished in the main thread.
                 */
                TracingThreadListenerManager.notifyFinish(this);
            }

            if (isFinishedInMainThread && (!isRunningInAsyncMode || asyncSpanCounter == 0)) {
                TraceSegment finishedSegment = segment.finish(isLimitMechanismWorking());
                TracingContext.ListenerManager.notifyFinish(finishedSegment);
                running = false;
            }
        } finally {
            if (isRunningInAsyncMode) {
                asyncFinishLock.unlock();
            }
        }
    }
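The bracket-matching idea behind the activeSpanStack.isEmpty() check can be sketched with a toy model. This is an illustration only: SpanStackDemo, its fields, and the String spans are hypothetical, and the real TracingContext also handles async mode, span limits, and listeners.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the active-span stack; not SkyWalking code.
final class SpanStackDemo {
    private final Deque<String> activeSpanStack = new ArrayDeque<>();
    private final List<String> finishedSpans = new ArrayList<>();
    private boolean segmentReported = false;

    // Plays the role of beforeMethod: open a span and push it onto the stack.
    void createSpan(String operationName) {
        activeSpanStack.addLast(operationName);
    }

    // Plays the role of afterMethod: close the innermost open span.
    // Returns true only when no span is left open, i.e. the segment is complete.
    boolean stopSpan() {
        finishedSpans.add(activeSpanStack.removeLast());
        boolean segmentComplete = activeSpanStack.isEmpty();
        if (segmentComplete) {
            segmentReported = true; // stand-in for notifying the finish listeners
        }
        return segmentComplete;
    }

    boolean isSegmentReported() {
        return segmentReported;
    }

    List<String> finishedSpans() {
        return new ArrayList<>(finishedSpans);
    }

    public static void main(String[] args) {
        SpanStackDemo context = new SpanStackDemo();
        context.createSpan("Tomcat/entry"); // outer bracket opens
        context.createSpan("JDBC/exit");    // inner bracket opens
        System.out.println(context.stopSpan()); // false: the entry span is still open
        System.out.println(context.stopSpan()); // true: the stack is empty, segment complete
        System.out.println(context.finishedSpans()); // [JDBC/exit, Tomcat/entry]
    }
}
```

The inner span always closes first, so the segment is only handed off once the outermost entry span has closed.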

 

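Following notifyFinish all the way down to the save method of the Channels class, you can see that the data is placed in a local in-memory buffer, bufferChannels.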

public class Channels<T> {
    private final QueueBuffer<T>[] bufferChannels;
   
  ......
  ......
  
    public boolean save(T data) {
        int index = dataPartitioner.partition(bufferChannels.length, data);
        int retryCountDown = 1;
        if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
            int maxRetryCount = dataPartitioner.maxRetryCount();
            if (maxRetryCount > 1) {
                retryCountDown = maxRetryCount;
            }
        }
        for (; retryCountDown > 0; retryCountDown--) {
            if (bufferChannels[index].save(data)) {
                return true;
            }
        }
        return false;
    }

   ......
  ......
}

 

A separate consumer thread drains this buffer and sends the data.
 @Override
    public void run() {
        running = true;

        final List consumeList = new ArrayList(2000);
        while (running) {
            boolean hasData = false;
            for (Group target : consumeTargets) {
                boolean consume = consume(target, consumeList);
                hasData = hasData || consume;
            }

            if (!hasData) {
                try {
                    Thread.sleep(consumeCycle);
                } catch (InterruptedException e) {
                }
            }
        }

        // consumer thread is going to stop
        // consume the last time
        for (Group target : consumeTargets) {
            consume(target, consumeList);

            target.consumer.onExit();
        }
    }
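The save-then-consume hand-off can be illustrated with a small stand-alone sketch. It is an assumption-laden simplification, not the agent's implementation: BufferedReporterDemo is a hypothetical class, a single java.util.concurrent.ArrayBlockingQueue replaces the partitioned bufferChannels, and "sending" just appends to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of a bounded buffer drained by a consumer thread.
final class BufferedReporterDemo {
    private final BlockingQueue<String> buffer;
    private final List<String> sent = new ArrayList<>();
    private volatile boolean running = true;

    BufferedReporterDemo(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Producer side: non-blocking; returns false when the buffer is full.
    boolean save(String segment) {
        return buffer.offer(segment);
    }

    // Consumer side: drain everything currently buffered and "send" it.
    int consume() {
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch);
        synchronized (sent) {
            sent.addAll(batch);
        }
        return batch.size();
    }

    // Same shape as run() above: poll, sleep when idle, drain once more on exit.
    void runConsumerLoop(long idleSleepMillis) {
        while (running) {
            if (consume() == 0) {
                try {
                    Thread.sleep(idleSleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        consume(); // final drain before the consumer stops
    }

    void shutdown() {
        running = false;
    }

    List<String> sent() {
        synchronized (sent) {
            return new ArrayList<>(sent);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BufferedReporterDemo reporter = new BufferedReporterDemo(16);
        Thread consumer = new Thread(() -> reporter.runConsumerLoop(20), "consumer");
        consumer.start();
        reporter.save("segment-1");
        reporter.save("segment-2");
        reporter.shutdown();
        consumer.join();
        System.out.println(reporter.sent()); // [segment-1, segment-2]
    }
}
```

Because save never blocks, the instrumented application thread is not held up when the buffer is full; the cost is that a segment can be dropped, which is the trade-off the IF_POSSIBLE retry loop in Channels.save is trying to soften.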

 



