轉載出處:https://topic.atatech.org/articles/210737
傳統的一個大型而又全面的系統,隨着業務體量的增大已經很難滿足市場對技術的需求,通過對將整塊業務系統拆分為多個互聯依賴的子系統並針對子系統進行獨立優化,能夠有效提升整個系統的吞吐量。在進行系統拆分之后,完整的業務事務邏輯所對應的功能會部署在多個子系統上,此時用戶的一次點擊請求會觸發若干次子系統之間的相互功能調用,如何分析一次用戶請求所觸發的多次跨系統的調用過程、如何定位存在響應問題的調用鏈路等等問題是分布式鏈路追蹤技術所要解決的問題。
EagleEye是分布式系統跟蹤技術Dapper的Java語言實現,EagleEye通過在每一次系統調用前和調用后進行埋點同時收集和分析埋點日志來梳理應用的調用和依賴關系,能夠快速定位異常、分析系統調用鏈路和評估鏈路瓶頸等。在EagleEye控制台上使用一次請求的traceId能夠分析出整條調用鏈的信息,效果圖如下所示: 圖 1
EagleEye很強大,底層實現的原理是什么樣的呢?本文通過分析EagleEye的源碼,來解釋EagleEye的背后執行原理。EagleEye在應用服務器中主要做了兩件事情:第一件是生成traceId和rpcId,並將這兩個信息傳遞到調用鏈的下游應用中;第二件是在本地記錄日志,主要是記錄本次調用的相關信息。
本文目錄如下:
- 1.traceId和rpcId
- 1.1. traceId
- 1.2. rpcId
- 1.3. traceId和rpcId傳遞原理
- 1.3.1. MetaQ的EagleEye埋點邏輯
- 1.3.1.1. MetaQ producer埋點邏輯
- 1.3.1.2. MetaQ consumer埋點邏輯
- 1.3.2. HSF的EagleEye埋點邏輯
- 1.3.2.1. HSF consumer的埋點
- 1.3.2.2 HSF provider的埋點邏輯
- 1.3.3 tair的EagleEye埋點邏輯
- 1.3.4 tddl的EagleEye埋點邏輯
- 1.3.1. MetaQ的EagleEye埋點邏輯
- 2.EagleEye日志記錄原理
- 2.1 EagleEye日志記錄源碼解讀
- 2.1.1 日志隊列中添加日志對象
- 2.1.2 從日志隊列中消費日志對象
- 2.2 EagleEye控制台分析traceId的原理
- 2.1 EagleEye日志記錄源碼解讀
- 3.自定義EagleEye接入
- 3.1 EagleEye信息塞入
- 3.1.1 客戶端中塞入EagleEye信息
- 3.1.2 入口型應用塞入EagleEye信息
- 3.1.3 定時任務塞入EagleEye信息
- 3.2 EagleEye信息取出
- 3.3 EagleEye異步線程恢復
- 3.1 EagleEye信息塞入
1.traceId和rpcId
在用戶請求到達服務器時,應用容器在執行實際業務處理之前,會先執行EagleEye的埋點邏輯(基於Servlet的Filter的機制),為這個請求分配一個全局唯一的調用鏈ID,這個ID在EagleEye 里面被稱為 traceId,traceId在整個調用過程中都不會改變,用於唯一標識這一次用戶請求。EagleEye將traceId存儲在ThreadLocal中的上下文信息里面,上下文信息里還有一個rpcId(等價於Dapper論文中的SpanID),rpcId用於區分同一個調用鏈下的多個網絡調用的發生順序和嵌套層次關系。
1.1. traceId
traceId的作用是唯一標識一次請求的整個調用鏈路,同時traceId在調用過程中會被傳遞到調用鏈下游,並且在調用全過程中保持不變。通過traceId可以把一次前端請求在不同服務器記錄的調用日志關聯起來,經過組合可以得出該請求的調用鏈信息。因此,EagleEye不僅可以分析到應用之間的直接調用關系,還可以得到他們的間接調用關系、以及上下游的業務處理信息;對於調用鏈的底層系統,可以追溯到它的最上層請求來源以及中間經過的所有節點。
traceId的生成原理如下所示:
EagleEye 使用了帶有業務語義的traceId方案,由五個部分組成:
第一部分是生成traceId的機器的8個字符的IP地址;
第二部分是13個字符的生成traceId的毫秒級的生成時間;
第三部分是4位(1000-9999)的自增順序數,順序數用於避免多線程並發時traceId碰撞;
第四部分是一個字符的標志位,用於標識生成該traceId的應用模塊(例如nginx模塊的標志位為e,Java應用中的標志位固定為d);
第五部分是4個字符的進程id。
如圖3所示,應用A是接受到來自用請求的一條調用鏈的開始端,在請求收到后它會先調用EagleEye.StartTrace生成traceId並放置在當前線程的ThreadLocal中,在應用A調用應用B、C的HSF服務,或者發送MetaQ消息時,traceId被包含在EagleEye上下文中,隨網絡請求到達應用B、C、F、G之中,並放置在接收端的當前線程ThreadLocal內,因此后續調用到的這些系統都會有EagleEye這次請求的上下文。這些系統再發起網絡請求時,也類似的攜帶了上下文信息的。
1.2. rpcId
traceId能夠唯一標識一條調用鏈,但是無法標識該調用鏈路的每一次調用的順序和嵌套層次,因此EagleEye還額外使用了rpcId,rpcId的作用是標識當前調用過程在整條調用鏈路的位置。rpcId用0.X1.X2.X3…..Xi表示,Xi都是非負整數,根節點的rpcId固定從0開始,第一層網絡調用的rpcId是0.X1,第二層的則為0.X1.X2,依次類推,通過rpcId,可以准確的還原出調用鏈上每次調用的層次關系和先后順序。
通過rpcId,可以准確的還原出調用鏈上每次調用的層次關系和先后順序。下面的圖4與圖3是同一個調用鏈,但是展現的是rpcId的層次關系,可以看出對 TDDL的訪問0.1.1.1源於 B到 D 的調用0.1.1,對 Tair 的訪問0.2.1.2源於C到E的調用0.2.1。
1.3. traceId和rpcId傳遞原理
在分析源碼之前,首先要明確一點,EagleEye中的rpc不只是rpc框架HSF,而是廣義的rpc概念,即泛指任何的遠程調用過程。
EagleEye已經集成在HSF、Notify、MetaQ、TDDL、Tair等集團中間件產品中,這些中間件能夠自動執行EagleEye的埋點邏輯,完成traceId和rpcId的上下游傳遞,下面將深入源碼來分析這一過程。
1.3.1. MetaQ的EagleEye埋點邏輯
MetaQ有兩個邏輯主體:消息的發送端producer和消息的消費端consumer,下面將分別MetaQ的producer和consumer的EagleEye埋點邏輯。
1.3.1.1. MetaQ producer埋點邏輯
首先metaq的producer實現類MetaProducerImpl在初始化時會在registerHook()方法中注冊兩個和EagleEye相關的hook:MetaQSendMessageHookImpl和MetaQSendMessageTraceHookImpl。
public class MetaProducerImpl extends TransactionMQProducer {
//其余代碼
private void registerHooks() {
if (!isAuthEnabled) {
//注冊MetaQSendMessageHookImpl
this.getDefaultMQProducerImpl().registerSendMessageHook(new MetaQSendMessageHookImpl());
//注冊MetaQSendMessageTraceHookImpl
this.getDefaultMQProducerImpl().registerSendMessageHook(new MetaQSendMessageTraceHookImpl());
//其余代碼
}
//其余代碼
}
其中MetaQSendMessageHookImpl負責生成rpcId和traceId,並將rpcId和traceId放入到上下文信息中的消息結構體traceBean中;MetaQSendMessageTraceHookImpl負責生產消息時的TRACE埋點,進行MetaQ的消息上下文的traceBean封裝(此處會封裝traceBean的所有信息,封裝過程中會再次校驗和封裝traceId和rpcId)。
下面看着兩個hook的源代碼。
首先看MetaQSendMessageHookImpl:
public class MetaQSendMessageHookImpl implements SendMessageHook, MetaQTraceConstants {
@Override
public String hookName() {
return "EagleEyeSendMessageHook";
}
//發送消息前開啟鷹眼rpc
@Override
public void sendMessageBefore(SendMessageContext context) {
MetaQSendMessageTraceLog.startEagleEyeRpc(context);
}
//發送消息后關閉鷹眼rpc
@Override
public void sendMessageAfter(SendMessageContext context) {
MetaQSendMessageTraceLog.endEagleEyeRpc(context);
}
}
跟進MetaQSendMessageTraceLog.startEagleEyeRpc(context)方法,該方法的主要邏輯為開啟一次Rpc調用(主要任務是計算rpcId並放入到rpc上下文RpcContext_inner中),並記錄該事件同時在EagleEyeContextListener中注冊事件的回調邏輯。
跟進MetaQSendMessageTraceLog.endEagleEyeRpc(context)方法,該方法主要邏輯是提交調用上下文,生成本地的EagleEye日志,並且並在異步執行的情況下重新將EagleEye信息塞入到上下文內容中。具體代碼如下所示:
public class MetaQSendMessageTraceLog {
/**
* 其余代碼
*/
public static void startEagleEyeRpc(SendMessageContext context) {
if (context == null || context.getMessage() == null
|| !MetaQTraceLogUtils.isEagleEyeTraceOn(context.getProducerGroup())) {
return;
}
Message msg = context.getMessage();
String eagleLog = EagleEyeLogUtils.pubEagleLog(context.getMessage(), context.getProducerGroup());
//標識一次Rpc調用開始,計算rpcId並將rpcId放入RpcContext_inner
EagleEye.startRpc(sendServiceName, eagleLog);
////記錄本次RPC調用事件到日志中,並在EagleEyeContextListener中注冊本次事件的回調邏輯
EagleEye.rpcClientSend();
EagleEye.requestSize(msg.getBody().length);
MetaQTraceContext mqTraceContext;
if (context.getMqTraceContext() != null) {
mqTraceContext = (MetaQTraceContext) context.getMqTraceContext();
} else {
mqTraceContext = new MetaQTraceContext();
}
context.setMqTraceContext(mqTraceContext);
MetaQTraceBean traceBean;
if (mqTraceContext.getTraceBeans() == null || mqTraceContext.getTraceBeans().isEmpty()) {
traceBean = new MetaQTraceBean();
mqTraceContext.setTraceBeans(Arrays.asList(traceBean));
} else {
traceBean = mqTraceContext.getTraceBeans().get(0);
}
if (StringUtils.isBlank(traceBean.getTraceId())) {
traceBean.setTraceId(EagleEye.getTraceId());
// 這里是因為如果rpcId太長會導致消息發送失敗,因為Message的屬性(Short.MAX_VALUE)太長
// 這里給Rpc一半的屬性長度
traceBean.setRpcId(EagleEye.getRpcId() + ".1");
if (traceBean.getRpcId() != null && traceBean.getRpcId().length() >= (Short.MAX_VALUE / 2)) {
traceBean.setRpcId(EagleEye.MAL_ROOT_RPC_ID);
}
traceBean.setEagleEyeUserData(EagleEye.exportUserData());
}
// 如果采用異步的方式提交消息,需要將EagleEye的context在不同線程中進行傳遞(默認EagleEye的context是存放在ThreadLocal 中)
if (CommunicationMode.ASYNC.equals(context.getCommunicationMode())) {
mqTraceContext.setRpcContextInner(EagleEye.getRpcContext());
EagleEye.popRpcContext();
}
}
public static void endEagleEyeRpc(SendMessageContext context) {
if (context == null || context.getMessage() == null
|| !MetaQTraceLogUtils.isEagleEyeTraceOn(context.getProducerGroup())) {
return;
}
if (context.getMqTraceContext() == null) {
return;
}
MetaQTraceContext ctx = (MetaQTraceContext) context.getMqTraceContext();
if (ctx.getTraceBeans() == null || ctx.getTraceBeans().isEmpty()) {
return;
}
MetaQTraceBean traceBean = ctx.getTraceBeans().get(0);
// 如果采用異步的方式提交消息,需要將 EagleEye 的 context 在不同線程中進行傳遞
// (默認 EagleEye 的 context 是存放在 ThreadLocal 中)
if (ctx.isAsync()) {
//通過傳入context,設置threadlocal變量
EagleEye.setRpcContext(ctx.getRpcContextInner());
}
//追加遠程服務地址到rpc日志中
EagleEye.remoteIp(traceBean.getStoreHost());
if (CommunicationMode.ONEWAY.equals(context.getCommunicationMode())) {
//記錄RPC響應的事件到日志中
EagleEye.rpcClientRecv(EagleEye.RPC_RESULT_SUCCESS, EagleEye.TYPE_METAQ);
return;
}
if (context.getSendResult() == null && CommunicationMode.SYNC.equals(context.getCommunicationMode())) {
///記錄RPC響應的事件到日志中
EagleEye.rpcClientRecv(EagleEye.RPC_RESULT_FAILED, EagleEye.TYPE_METAQ);
return;
}
if (context.getSendResult() != null) {
// 消息發送成功后追加MsgId到rpc調用日志中
EagleEye.callBack(context.getSendResult().getOffsetMsgId());
//記錄RPC響應的事件到日志中
EagleEye.rpcClientRecv(EagleEye.RPC_RESULT_SUCCESS, EagleEye.TYPE_METAQ);
} else {
if (context.getException() != null) {
//記錄RPC響應的事件到日志中
EagleEye.rpcClientRecv(EagleEye.RPC_RESULT_FAILED, EagleEye.TYPE_METAQ);
}
}
}
/**
* 其余代碼
*/
}
接下來看MetaQSendMessageTraceHookImpl :
public class MetaQSendMessageTraceHookImpl implements SendMessageHook, MetaQTraceConstants {
@Override
public String hookName() {
return "TraceSendMessageHook";
}
//發消息前開啟TRACE記錄
@Override
public void sendMessageBefore(SendMessageContext context) {
MetaQSendMessageTraceLog.startTrace(context);
MetaQSendMessageTraceLog.putEagleEyeToMsgProp(context);
}
//發消息后關閉TRACE記錄
@Override
public void sendMessageAfter(SendMessageContext context) {
//關閉trace記錄,清空ThreadLocal變量
MetaQSendMessageTraceLog.endTrace(context);
}
}
跟進MetaQSendMessageTraceLog.startTrace(context)方法,主要邏輯為將traceId和消息topic、tag等信息塞入到EagleEye的上下文信息中。需要注意的是,startTrace方法並不計算traceId,只會將當前已有的traceId塞入到上下文信息中,沒有traceId則不塞入。在后續消費者consumer消息的時候,若是沒有traceId,會重新計算一個traceId放入到上下文中,這個下一小節再講。
跟進MetaQSendMessageTraceLog.endTrace(context)方法,該方法的主要邏輯為關閉當前traceId記錄並記錄trace日志。
了解了MetaQ的埋點邏輯之后,還有一個問題就是兩個hook的邏輯是如何被執行的:上述的發送消息兩個hook的邏輯會在發送消息前和發送消息后執行,執行邏輯如下:
public class DefaultMQProducerImpl implements MQProducerInner{
//其余代碼
//執行發送消息前的hook
public void executeSendMessageHookBefore(final SendMessageContext context) {
if (!this.sendMessageHookList.isEmpty()) {
for (SendMessageHook hook : this.sendMessageHookList) {
try {
hook.sendMessageBefore(context);
} catch (Throwable e) {
log.warn("failed to executeSendMessageHookBefore", e);
}
}
}
}
//執行發送消息后的hook
public void executeSendMessageHookAfter(final SendMessageContext context) {
if (!this.sendMessageHookList.isEmpty()) {
for (SendMessageHook hook : this.sendMessageHookList) {
try {
hook.sendMessageAfter(context);
} catch (Throwable e) {
log.warn("failed to executeSendMessageHookAfter", e);
}
}
}
}
//其余代碼
}
1.3.1.2. MetaQ consumer埋點邏輯
同樣的,metaQ的consumer在初始化時也注冊了一個跟EagleEye相關的hook:MetaQConsumeMessageHookImpl。
MetaQConsumeMessageHookImpl主要有兩個邏輯:一個是消息消費前將消息屬性中的traid,rpcId等信息放入到消息上下文中,一個是消息消費后計算耗時,再將EagleEye相關的信息打到本地日志中。
public class MetaQConsumeMessageHookImpl implements ConsumeMessageHook, MetaQTraceConstants {
@Override
public String hookName() {
return "EagleEyeConsumeMessageHook";
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
//其他代碼
for (MessageExt message : context.getMsgList()) {
if (message == null) {
continue;
}
MessageClientExt msg = (MessageClientExt) message;
MetaQTraceBean traceBean = new MetaQTraceBean();
//從消息屬性中取出traceId和rpcId裝配到traceBean中
traceBean.setTraceId(msg.getProperty(TRACE_ID_KEY));
traceBean.setRpcId(msg.getProperty(RPC_ID_KEY));
traceBean.setEagleEyeUserData(msg.getProperty(USER_DATA_KEY));
//其他代碼
}
mqTraceContext.setTraceBeans(beans);
MetaQConsumeMessageTraceLog.consumeMessageBefore(mqTraceContext);
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()
|| !MetaQTraceLogUtils.isTraceLogOn(context.getConsumerGroup())) {
return;
}
MetaQTraceContext mqTraceContext = (MetaQTraceContext) context.getMqTraceContext();
mqTraceContext.setSuccess(context.isSuccess());
mqTraceContext.setStatus(context.getStatus());
//記錄本地EagleEye日志,包括方法調用耗時等信息
MetaQConsumeMessageTraceLog.consumeMessageAfter(mqTraceContext);
}
}
1.3.2. HSF的EagleEye埋點邏輯
HSF的EagleEye埋點的原理是在hsf的consumer端和provider端分別寫了一個ClientFilter和ServerFilter,在消費服務和提供服務時執行響應的invoke邏輯,從而進行埋點。
1.3.2.1. HSF consumer的埋點
HSF consumer作為服務的調用端,其埋點目的是將本地的rpc上下文、traceId和rpcId等信息傳到調用端。HSF的consumer端使用EagleEyeClientFilter來執行響應的EagleEye埋點邏輯。具體如下所示:
public ListenableFuture<RPCResult> invoke(InvocationHandler invocationHandler, Invocation invocation)
throws Throwable {
/**
* 其余代碼
*/
String logName = methodLogNameService.convertToLogName(invocation, invocation.getMethodName(),
invocation.getMethodArgSigs(),
invocation.getMethodArgs());
// 調用EagleEye.startRpc()方法,計算rpcId並將rpcd放入到rpc上下文RpcContext_inner中
logService.startRpc(serviceUniqueName, logName, EagleEye.TYPE_HSF_CLIENT, invocation.getMethodArgs());
Object rpcCtx = logService.currentRpcContext();
if (null != rpcCtx) {
// 將rpc上下文信息塞入到調用對象invocation中,invocation最終會被傳遞到server端
invocation.setRequestProps(EagleEyeConstants.REQUEST_EAGLEEYE_CONTEXT, rpcCtx);
}
//在調用對象信息中塞入traceId和rpcId
invocation.put(EagleEyeConstants.EAGLEEYE_TRACE_ID_KEY, logService.getTraceId());
invocation.put(EagleEyeConstants.EAGLEEYE_RPC_ID_KEY, logService.getRpcId());
if (containerInfo.isSupportContainer()) {
invocation.setRequestProps(HSFConstants.CONTAINER_ID, containerInfo.getContainerId());
}
/**
* 其余代碼
*/
}
1.3.2.2 HSF provider的埋點邏輯
HSF provider作為服務的提供端,其埋點的目的是將調用端所傳來的rpc上下文、traceId和rpcId等信息進行還原。HSF provider端通過實現EagleEyeServerFilter來進行埋點,其invoke()方法源碼如下:
public ListenableFuture<RPCResult> invoke(InvocationHandler invocationHandler,
Invocation invocation) throws Throwable {
//取出rpc上下文信息
Object oldContext = EagleEye.getRpcContext();
try {
invocation.put(EAGLEEYE_EXECUTED_KEY, Boolean.TRUE);
//將rpc上下文信息放入到當前線程中
handleEagleEyeServerRecv(invocation);
//將rpc上下文放入到本次調用對象invocation中
invocation.put(INVOCATION_EAGLEEYE_CONTEXT_KEY, logService.getRpcContext());
invocation.addContextAware(eagleEyeServerContextAware);
//把traceId,rpcId放到本次調用對象invocation中
invocation.put(EagleEyeConstants.EAGLEEYE_TRACE_ID_KEY, logService.getTraceId());
invocation.put(EagleEyeConstants.EAGLEEYE_RPC_ID_KEY, logService.getRpcId());
ListenableFuture<RPCResult> rpcFuture = invocationHandler.invoke(invocation);
return rpcFuture;
} finally {
EagleEye.setRpcContext(oldContext);
}
}
1.3.3 tair的EagleEye埋點邏輯
tair的client在執行相應數據操作時也會進行EagleEye的埋點邏輯,包括put,get,invalid,delete等數據操作。在tair的基本數據操作源碼中,執行了相同的EagleEye埋點邏輯,下面以delete數據操作為例講述tair的EagleEye埋點邏輯。
public ResultCode delete(int namespace, Serializable key) {
if (asyncMethodDependent) {
return tairAsync.delete(namespace, key).get();
}
try {
//在執行數據操作前的EagleEye埋點
eagleEyeFirstOps(namespace, TairConstant.TAIR_REQ_REMOVE_PACKET);
/**
* 進行實際的delete數據操作過程
*/
} finally {
//在執行數據操作后的EagleEye埋點
eagleEyeLastOps(namespace);
}
}
tair中的EagleEye埋點主邏輯是在執行數據操作之前,調用eagleEyeFirstOps方法進行埋點,在數據操作完成后,再執行eagleEyeLastOps方法進行埋點,以記錄下完整的調用記錄。在tair的EagleEye埋點邏輯中,有兩個重要的狀態:isStart(當前步驟環境是否為開啟了一次tair操作)和isSend(當前步驟環境是否為已經發送tair數據請求),tair會在判斷這兩個狀態后才進行相應的埋點邏輯,具體代碼分析如下:
private void eagleEyeFirstOps(int namespace, int pcode) {
if (namespace != 0) {
//判斷當前步驟環境是否為開啟了一次tair操作
if (!EagleEyeState.isStartCall()) {
//EagleEye開啟一次rpc,計算rpcId並將rpcId放到上下文中
EagleEye.startRpc(String.valueOf(pcode), groupName);
}
//設置當前步驟環境為開啟了一次tair操作
EagleEyeState.setStart(true);
//設置當前步驟環境為未發送tair請求
EagleEyeState.setSend(false);
}
}
private void eagleEyeLastOps(int namespace) {
if (namespace != 0) {
//判斷當前步驟環境是否為已發送tair請求
if (!EagleEyeState.isSendCall()) {
//記錄發起本次調用的日志
EagleEye.rpcClientSend();
// 記錄響應本次調用的日志
EagleEye.rpcClientRecv(String.valueOf(EagleEyeState.getResultCode().getCode()),
EagleEye.TYPE_TAIR, String.valueOf(namespace));
EagleEyeState.setResultCode(ResultCode.ERROR_BEFORE_SEND);
}
//設置當前步驟環境為已發送tair請求
EagleEyeState.setSend(true);
//設置當前步驟環境為未開啟一次tair操作
EagleEyeState.setStart(false);
}
}
1.3.4 tddl的EagleEye埋點邏輯
tddl的接口TddlEagleeye負責進行tddl的EagleEye埋點邏輯,其實現類EagleEyeTaobaoImpl核心方法源碼分析如下所示:
@Activate(order = 2)
public class EagleEyeTaobaoImpl implements TddlEagleeye {
protected static Map<String, StatLogger> statLoggerMaps = new ConcurrentHashMap<String, StatLogger>();
@Override
public void startRpc(String ip, String port, String dbName, String sqlType) {
//開啟一次EagleEye rpc調用,計算rpcId並將rpcId放入到上下文變量中
EagleEye.startRpc(dbName, sqlType);
//將遠程服務器ip放入到上下文變量中
EagleEye.remoteIp(ip + ':' + port);
//記錄本次rpc調用的發起日志
EagleEye.rpcClientSend();
}
@Override
public void endRpc(SqlMetaData sqlMetaData, Exception e) {
String index = StringUtils.isEmpty(sqlMetaData.getLogicSql()) ? null : index("!" + sqlMetaData.getLogicSql());
if (e == null) {
//記錄本次rpc調用的響應日志,包含調用耗時等信息
EagleEye.rpcClientRecv(EagleEye.RPC_RESULT_SUCCESS, EagleEye.TYPE_TDDL, index);
} else {
EagleEye.rpcClientRecv(EagleEye.RPC_RESULT_FAILED, EagleEye.TYPE_TDDL, index);
}
}
/**
* 其余代碼
*/
}
2.EagleEye日志記錄原理
2.1 EagleEye日志記錄源碼解讀
EagleEye的日志記錄過程是高並發的,初期采用同步寫日志的方案,但是同步寫日志會導致在極端情況下線程池會被日志線程占滿,影響到業務應用的主線程。EagleEye作為輔助的功能,其記錄日志過程不應影響到主線程,因此EagleEye后期采用異步寫日志的方式:任何線程要寫日志,只需要把日志事件對象加入日志隊列就行了,后台會專門起一個線程從日志隊列中取出日志對象再寫入到本地文件中,雖然此舉會導致日志記錄有些許延時,但是保證了只有一個日志記錄線程,不會將線程池占滿。
前面小節的敘述中,大部分的情況下都是業務應用調用EagleEye.rpcClientRecv方法來進行打日志,跟進這個方法會發現底層調用了eagleeye自定義的日志appender來實現打日志的邏輯,下面將分析EagleEye自定義的appender的邏輯。
為了追求高性能,EagleEye采用異步的方式寫日志,並那么EagleEye就需要兩個邏輯流程:一個流程是往日志隊列中加入日志對象,一個流程是從日志隊列中取出日志對象,並寫日志到本地文件中。EagleEye維護了一個環形隊列,由 put、take 兩個游標來標識當前隊列的下一次調用 put 和 take 應該返回的位置,每當put一個元素則put指針往前移動一格,每當take一個元素也往前移動一格,並且保證put指針一直在take指針前面(“前面”是指逆時針方向上,put指針在take指針之前)。具體的樣式如圖5所示。有意思的是,EagleEye是采用一個環形數組來實現這個環形隊列的。
下面將從往隊列中添加日志對象與從隊列中取出日志對象兩個角度來分析EagleEye的記錄日志流程。
2.1.1 日志隊列中添加日志對象
往日志隊列中加入日志對象,是EagleEye的AsyncAppender類的append方法實現的,其原理是向一個環形隊列(由環形數組實現)中放入日志對象ctx,看一下具體過程:
class AsyncAppender extends EagleEyeAppender {
private static final int DEFAULT_NOTIFY_THRESHOLD = 512;
//環形數組實現的環形隊列
private final BaseContext[] entries;
private final int queueSize;
//下標掩碼,其值是queueSize-1
private final int indexMask;
private final ReentrantLock lock;
private final Condition notEmpty;
//put位置的游標
private AtomicLong putIndex;
//take位置的游標
private AtomicLong takeIndex;
private EagleEyeAppender appender;
int size() {
return (int) (putIndex.get() - takeIndex.get());
}
//塞入日志對象ctx的主邏輯
boolean append(BaseContext ctx) {
/**
* 其余代碼
*/
//將新的日志對象ctx放入到環形數組entries中,通過將當前put的值與下標掩碼indexMask進行與操作,計算出ctx應該被放入到的數組下標位置
if (putIndex.compareAndSet(put, put + 1)) {
entries[(int) put & indexMask] = ctx;���
if (size >= notifyThreshold && !running.get() && lock.tryLock()) {
try {
notEmpty.signal();
} catch (Exception e) {
EagleEye.selfLog("[ERROR] fail to signal notEmpty: " + workerName, e);
} finally {
lock.unlock();
}
}
return true;
}
}
}
/**
* 其余代碼
*/
}
其中,核心代碼是entries[(int)put & indexMask] = ctx,由於下標掩碼indexMask的值是queueSize-1,而queueSize的值是4096,所以indexMask的二進制碼是全為1的,因此能夠保證put與indexMask相與后得到的數組下標不會超過數組長度,同時能夠形成環形(當put的值開始超過數組長度時,與indexMask進行“與計算”后會重新被放入到數組的頭部)。
2.1.2 從日志隊列中消費日志對象
EagleEye的AsyncAppender類的start方法開啟了EagleEye日志消費的線程,具體源碼如下:
void start(EagleEyeAppender appender, BaseContextEncoder encoder, String workerName) {
/**
* 其他代碼
*/
//開啟記錄日志的AsyncAppender線程,並執行run方法
this.worker = new Thread(new AsyncAppender.AsyncRunnable(), "EagleEye-AsyncAppender-Thread-" + workerName);
this.worker.setDaemon(true);
this.worker.start();
}
再深入AsyncAppender的源碼,查看其run()方法的邏輯:
class AsyncRunnable implements Runnable{
public void run() {
/**
* 其余代碼
*/
//執行日志記錄邏輯
processContext(ctx, parent.appender, encoder);
}
private final void processContext(final BaseContext ctx,
final EagleEyeAppender appender, final BaseContextEncoder encoder) throws IOException {
if (ctx.isEvent()) {
final int logType = ctx.logType;
if (logType == LOG_TYPE_EVENT_FLUSH) {
appender.flush();
} else if (logType == LOG_TYPE_EVENT_ROLLOVER) {
appender.rollOver();
} else if (logType == LOG_TYPE_EVENT_RELOAD) {
appender.reload();
} else if (logType == LOG_TYPE_EVENT_CLOSE) {
doNotifyIfRequired(ctx);
throw closeEvent;
}
doNotifyIfRequired(ctx);
} else {
//調用encoder打印日志,encoder的實現類是DefaultRpcContextEncoder
encoder.encode(ctx, appender);
}
}
}
下面來看EagleEye的DefaultRpcContextEncoder類的具體打日志過程:
class DefaultRpcContextEncoder extends BaseContextEncoder {
@Override
public void encode(BaseContext base, EagleEyeAppender eea) throws IOException {
//定義日志輸出的StringBuilder對象
StringBuilder buffer = getBuffer();
/**
* 組裝日志StringBuilder的對象buffer,省略掉這部分略長的代碼
*/
//調用EagleEyeRollingFileAppender類的appender方法將日志流buffer輸出到本地文件中
eea.append(buffer.toString());
}
}
最后來看EagleEyeRollingFileAppender類中的日志寫入到文件的過程:
class EagleEyeRollingFileAppender extends EagleEyeAppender{
/**
* 其余代碼
*/
@Override
public void append(String log) {
BufferedOutputStream bos = this.bos;
if (bos != null) {
try {
waitUntilRollFinish();
//獲取到日志流的byte字節數組
byte[] bytes = log.getBytes(EagleEye.DEFAULT_CHARSET);
int len = bytes.length;
if (len > DEFAULT_BUFFER_SIZE && this.multiProcessDetected) {
len = DEFAULT_BUFFER_SIZE;
bytes[len - 1] = '\n';
}
//將字節數組寫入到緩沖區
bos.write(bytes, 0, len);
outputByteSize += len;
//若是超出文件大小限制則切片寫入到文件中,否則直接寫入到文件中
if (outputByteSize >= maxFileSize) {
rollOver();
} else {
if (System.currentTimeMillis() >= nextFlushTime) {
flush();
}
}
} catch (Exception e) {
//出現寫入異常則恢復現場
doSelfLog("[ERROR] fail to write log to file " + filePath + ", error=" + e.getMessage());
close();
setFile();
}
}
}
}
2.2 EagleEye控制台分析traceId的原理
EagleEye控制器能夠看到一次traceId全部調用過程並進行分析,如圖1所示,要實現如此功能必須要在EagleEye控制台的服務器集群中存儲所有應用的EagleEye日志。
站在設計者的角度去思考,要想EagleEye控制台獲取到所有應用的EagleEye日志,有兩種思路:第一種是所有的應用在執行EagleEye埋點邏輯時,也將本地的EagleEye日志上傳到EagleEye控制台上;第二種是EagleEye控制台的服務器定時去所有應用的服務器上拉取EagleEye日志。
EagleEye選擇的第二種方式,即通過starAgent定時去應用服務器上拉取指定路徑的EagleEye日志(在EagleEye的控制台上可以指定應用EagleEye日志的路徑),並根據traceId將日志進行重組和排列,最終得到了EagleEye控制台上展現的調用鏈。事后分析一下,選擇第二種方式的好處是拉取日志的速率可以由EagleEye自己決定,但是也有缺點比如EagleEye日志會有一些延時;轉而一想,如果選擇了第一種方式,則會存在更大的問題:所有應用無時無刻不在打EagleEye日志,第一種方式中所有應用每打一次日志就上傳一次,全集團的應用加在一起勢必會把EagleEye服務器給打掛,最終導致服務不可用。
3.自定義EagleEye接入
EagleEye的埋點邏輯已經集成到了中間件中,如果應用未使用到集成了EagleEye的中間件和接入端,也可以自定義EagleEye的接入方式。
以下內容參考自EagleEye官方文檔。
自定義的EagleEye接入需要在入口型應用或者客戶端中塞入EagleEye的相關信息,再在后面的應用中進行信息還原即可。
3.1 EagleEye信息塞入
3.1.1 客戶端中塞入EagleEye信息
EagleEye.startRpc(serviceName, methodName, 91, params); //開啟一次rpc調用,計算rpcId並放入到上下文中,91是自定義客戶端的EagleEye的RpcType碼
EagleEye.requestSize(reqSize); //設置請求大小(可選)
EagleEye.remoteIp(serverIp); //設置服務端IP,如果是非集團的外部應用可以填10.0.0.0
EagleEye.rpcClientSend(); //標識客戶端發送RPC調用請求
// 構造請求對象
RpcRequest request = new RpcRequest();
// 在請求對象中塞入eagleeye的相關信息
request.addAttachment("EagleEyeTraceId", EagleEye.getTraceId());
request.addAttachment("EagleEyeRpcId", EagleEye.getRpcId());
request.addAttachment("EagleEyeUserData", EagleEye.exportUserData());
//設置響應大小(可選)
EagleEye.responseSize(resSize);
//設置RPC類型名
EagleEye.attribute("rpcName", "RPC");
//記錄本次rpc調用的響應事件日志
EagleEye.rpcClientRecv(EagleEye.RPC_RESULT_SUCCESS, null);
3.1.2 入口型應用塞入EagleEye信息
String traceId = null;
//嘗試從本地上下文中獲取traceId,若為空,則重新生成traceId
if (null == EagleEye.getTraceId()) {
traceId = EagleEye.generateTraceId(localIP);
}
//標識一次trace調用的開始,traceName為用戶設置,90是入口型應用的rpcType
EagleEye.startTrace(traceId, traceName, 90);
try {
// 處理業務邏輯...
} finally {
EagleEye.attribute("rpcName", "HTTP"); //設置RPC類型名
//關閉trace記錄,清空ThreadLocal變量
EagleEye.endTrace("00", null);
}
3.1.3 定時任務塞入EagleEye信息
目前集團內的定時任務一般是用Schedulerx來調用,Schedulerx在進行任務調度時不會傳入traceId等信息,需要應用在自定義的任務類中手動塞入traceId並進行trace記錄,示例如下:
public class MyMapJobProcessor extends MapJobProcessor {
@Override
//定時任務方法
public ProcessResult process(JobContext context) throws Exception {
String traceId = EagleEye.getTraceId();
//表示是否新建了 traceId
boolean flag = false;
//若上下文中不存在traceId,需要手動生成traceId並開啟trace記錄
if (StringUtils.isBlank(traceId)) {
String id = EagleEye.generateTraceId(null);
EagleEye.startTrace(id, methodName, EagleEye.TYPE_HSF_SERVER);
flag = true;
}
try {
/**
*執行業務邏輯
*/
}finally {
if (flag) {
//若開啟了trace記錄,則必須要在線程結束時關閉trace記錄,以清空ThreadLocal變量
EagleEye.endTrace(EagleEye.RPC_RESULT_SUCCESS);
}
}
}
}
3.2 EagleEye信息取出
// 收到了 RPC 請求
RpcRequest request = ...
// 從請求獲取traceId、rpcId、UserData,還原EagleEye調用鏈上下文
String traceId = request.getAttachment("EagleEyeTraceId");
String rpcId = request.getAttachment("EagleEyeRpcId");
String userData = request.getAttachment("EagleEyeUserData");
// 如果沒有上下文,需要生成新的
if (traceId == null) {
traceId = EagleEye.generateTraceId(null);
}
if (rpcId == null) {
rpcId = EagleEye.MAL_ROOT_RPC_ID;
}
// userData 可以為 null
// 如果是一對多場景(即類似 Notify、MetaQ 的方式,客戶端發布一個消息,會對應多個服務端收到這條消息),服務端埋點需要增加下面一句:
rpcId = EagleEye.generateMulticastRpcId(rpcId, null);
// 重新構建上下文
Map<String, String> context = new HashMap<String, String>();
context.put(EagleEye.TRACE_ID_KEY, traceId);
context.put(EagleEye.RPC_ID_KEY, rpcId);
context.put(EagleEye.USER_DATA_KEY, userData);
EagleEye.setRpcContext(context);
EagleEye.rpcServerRecv(serviceName, methodName, 92, params); //params為參數,可以不填
EagleEye.remoteIp(clientIp); //設置遠程服務器ip
// 服務端處理請求...
// 生成響應返回
EagleEye.responseSize(respSize);
EagleEye.attribute("rpcName", "RPC"); //設置RPC類型名
EagleEye.rpcServerSend(EagleEye.RPC_RESULT_SUCCESS, null); // 自動清理線程上的EagleEye上下文,並設置RpcType
3.3 EagleEye異步線程恢復
EagleEye的上下文信息是存儲在ThreadLocal中的,所以在使用異步線程時需要手動傳遞上下文,當業務邏輯轉移到異步線程時,需要先備份 EagleEye 的調用上下文到異步任務中,保證鏈路的正確性。下面是一個例子:
先做EagleEye上下文備份:
Object ctx = EagleEye.getRpcContext(); // 從當前 ThreadLocal 備份
MyAsyncTask task = new MyAsyncTask(); // 這里的 MyAsyncTask 是一個業務自定義的 Runnable
task.setRpcContext(ctx); // 將 ctx 保存到 task 中
Future future = bizThreadPoolExecutor.submit(task); // 提交任務
// 后面繼續執行其他邏輯,或者用 future.get() 等待任務的結果,都沒有問題
// 如果 submit 多個 task,每個 task 都需要保存一份 ctx
再在執行邏輯時恢復上下文:
class MyAsyncTask implements Runnable {
private Object ctx; // 用於存放之前保存的 EagleEye 上下文
public void setRpcContext(Object ctx) { this.ctx = ctx; }
public void run() {
EagleEye.setRpcContext(ctx); // 還原到 ThreadLocal
try {
// 開始做異步邏輯,如調用 HSF、Notify、Tair 之類
// ...
} finally {
EagleEye.clearRpcContext(); // 務必清理 ThreadLocal 的上下文,避免異步線程復用時出現上下文互串的問題
}
}
}