CAT 是由大眾點評開發的、基於 Java 的實時應用監控平台,提供實時應用監控與業務監控,對於及時發現線上問題非常有用。(不知道大家有沒有在用)
應用自然是最初級的,用完之後,還想了解下其背後的原理,所以有了源碼閱讀一說。
今天來看看 cat-client 模塊,重在調用方。
打開文件,首先看一下使用說明,背景,資料。ok,進入正題。
先大致看一下目錄結構:
接下來,從樣例開始著手,在這裡從單元測試開始幹活。
// Sample unit test: calls the static Cat API once for each kind of message it can log
// (transaction, event, trace, heartbeat, error, metric) and finally asserts that the
// client reports itself as initialized.
public class CatTest {

    @Test
    public void test() {
        // Open a transaction first; it is given a status and completed at the end of the test.
        Transaction trans = Cat.newTransaction("logTransaction", "logTransaction");
        Cat.newEvent("logEvent", "logEvent");
        Cat.newTrace("logTrace", "logTrace");
        Cat.newHeartbeat("logHeartbeat", "logHeartbeat");
        Throwable cause = new Throwable();
        Cat.logError(cause);
        Cat.logError("message", cause);
        Cat.logTrace("logTrace", "<trace>");
        Cat.logTrace("logTrace", "<trace>", Trace.SUCCESS, "data");
        Cat.logMetric("logMetric", "test", "test");
        Cat.logMetricForCount("logMetricForCount");
        Cat.logMetricForCount("logMetricForCount", 4);
        Cat.logMetricForDuration("logMetricForDuration", 100);
        Cat.logMetricForSum("logMetricForSum", 100);
        Cat.logMetricForSum("logMetricForSum", 100, 100);
        Cat.logEvent("RemoteLink", "Call", Message.SUCCESS, "Cat-0a010680-384736-2061");
        Cat.logEvent("EventType", "EventName");
        Cat.logHeartbeat("logHeartbeat", "logHeartbeat", Message.SUCCESS, null);
        trans.setStatus(Transaction.SUCCESS);
        // trans.setStatus(cause);
        trans.complete();

        // The calls above go through Cat's static API; the test only checks the initialized flag.
        Assert.assertEquals(true, Cat.isInitialized());
    }
}
看得出來,cat把其主要功能都列舉在了這個單元測試里。大概功能就是,記錄event,trace,error,metrics.
不過,咱們只討論下其中個別類型的處理就OK了。
先來看第一個創建事務的方法:
// Walkthrough of Cat.newTransaction(...). The methods below are excerpts taken from several
// classes and listed in call order; they are not one compilable unit.
Cat.newTransaction("logTransaction", "logTransaction");

// Entry point: 1. obtain the producer; 2. ask it to create the transaction.
public static Transaction newTransaction(String type, String name) {
    return Cat.getProducer().newTransaction(type, name);
}

// Obtaining the producer: runs checkAndInitialize() first, then returns the producer held by
// the singleton instance. (The walkthrough does not go deeper than this here.)
public static MessageProducer getProducer() {
    checkAndInitialize();

    return s_instance.m_producer;
}

// 2. Creating the transaction:
//    1. set up a context on the manager if there is none yet;
//    2. if messages are enabled, create a DefaultTransaction and start it on the manager;
//    3. return it to the caller; otherwise return the NullMessage.TRANSACTION placeholder.
@Override
public Transaction newTransaction(String type, String name) {
    // this enable CAT client logging cat message without explicit setup
    if (!m_manager.hasContext()) {
        m_manager.setup();
    }

    if (m_manager.isMessageEnabled()) {
        DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);

        m_manager.start(transaction, false);
        return transaction;
    } else {
        return NullMessage.TRANSACTION;
    }
}

// 2.1. Setting up the context: built from the domain id/ip when m_domain is present, otherwise
// with the "Unknown" domain and an empty ip; the result is stored in m_context.
@Override
public void setup() {
    Context ctx;

    if (m_domain != null) {
        ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
    } else {
        ctx = new Context("Unknown", m_hostName, "");
    }

    m_context.set(ctx);
}

// 2.2. Whether a context has already been set for the current thread.
@Override
public boolean hasContext() {
    return m_context.get() != null;
}

// 2.3. The context is held in a ThreadLocal, so each thread works with its own Context.
private ThreadLocal<Context> m_context = new ThreadLocal<Context>();

// 2.4. Starting a transaction on the manager:
//    1. get the context;
//    2. start the transaction on that context;
//    3. a TaggedTransaction is additionally registered in m_taggedTransactions by its tag.
// When no context is available, a warning is logged only once (guarded by m_firstMessage).
@Override
public void start(Transaction transaction, boolean forked) {
    Context ctx = getContext();

    if (ctx != null) {
        ctx.start(transaction, forked);

        if (transaction instanceof TaggedTransaction) {
            TaggedTransaction tt = (TaggedTransaction) transaction;

            m_taggedTransactions.put(tt.getTag(), tt);
        }
    } else if (m_firstMessage) {
        m_firstMessage = false;
        m_logger.warn("CAT client is not enabled because it's not initialized yet");
    }
}

// 2.4.1. Getting the context: returns null unless Cat.isInitialized(); otherwise returns the
// thread's context, creating and storing one (same rules as setup()) when it is missing.
private Context getContext() {
    if (Cat.isInitialized()) {
        Context ctx = m_context.get();

        if (ctx != null) {
            return ctx;
        } else {
            if (m_domain != null) {
                ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
            } else {
                ctx = new Context("Unknown", m_hostName, "");
            }

            m_context.set(ctx);
            return ctx;
        }
    }

    return null;
}

// 2.4.2. The start(...) invoked on the context (ctx.start above):
//    1. with an empty stack the transaction becomes the message of m_tree; otherwise it is added
//       as a child of the transaction on top of the stack (ForkedTransaction instances excepted);
//    2. the transaction is pushed onto the stack unless `forked` is true.
public void start(Transaction transaction, boolean forked) {
    if (!m_stack.isEmpty()) {
        // Do NOT make strong reference from parent transaction to forked transaction.
        // Instead, we create a "soft" reference to forked transaction later, via linkAsRunAway()
        // By doing so, there is no need for synchronization between parent and child threads.
        // Both threads can complete() anytime despite the other thread.
        if (!(transaction instanceof ForkedTransaction)) {
            Transaction parent = m_stack.peek();
            addTransactionChild(transaction, parent);
        }
    } else {
        m_tree.setMessage(transaction);
    }

    if (!forked) {
        m_stack.push(transaction);
    }
}

// 2.4.3. Structure of the context: a message tree, a stack of transactions, a length counter
// starting at 1 and a set of known exceptions. The tree is stamped with the current thread's
// group name, id and name, plus the given domain, host name and ip address.
public Context(String domain, String hostName, String ipAddress) {
    m_tree = new DefaultMessageTree(); // create the message tree
    m_stack = new Stack<Transaction>(); // stack of transactions
    Thread thread = Thread.currentThread();
    String groupName = thread.getThreadGroup().getName();

    m_tree.setThreadGroupName(groupName);
    m_tree.setThreadId(String.valueOf(thread.getId()));
    m_tree.setThreadName(thread.getName());
    m_tree.setDomain(domain);
    m_tree.setHostName(hostName);
    m_tree.setIpAddress(ipAddress);
    m_length = 1;
    m_knownExceptions = new HashSet<Throwable>();
}

// DefaultModuleInitializer: logs the top-level modules, expands them into the ordered set `all`
// via expandAll(...), then runs executeModule(...) for every module not yet initialized.
// Any exception is rethrown wrapped in a RuntimeException that keeps the cause.
@Override
public void execute(ModuleContext ctx, Module... modules) {
    Set<Module> all = new LinkedHashSet<Module>();

    info(ctx, "Initializing top level modules:");

    for (Module module : modules) {
        info(ctx, " " + module.getClass().getName());
    }

    try {
        expandAll(ctx, modules, all);

        for (Module module : all) {
            if (!module.isInitialized()) {
                executeModule(ctx, module, m_index++);
            }
        }
    } catch (Exception e) {
        throw new RuntimeException("Error when initializing modules! Exception: " + e, e);
    }
}

// executeModule: marks the module initialized, calls module.initialize(ctx) and logs the
// elapsed time in milliseconds.
private synchronized void executeModule(ModuleContext ctx, Module module, int index) throws Exception {
    long start = System.currentTimeMillis();

    // set flat to avoid re-entrance
    module.setInitialized(true);

    info(ctx, index + " ------ " + module.getClass().getName());

    // execute itself after its dependencies
    module.initialize(ctx);

    long end = System.currentTimeMillis();
    info(ctx, index + " ------ " + module.getClass().getName() + " DONE in " + (end - start) + " ms.");
}

// CAT initialization
// this should be called during application initialization time
public static void initialize(File configFile) {
    PlexusContainer container = ContainerLoader.getDefaultContainer();

    initialize(container, configFile);
}

// Looks up the client module and, if it is not initialized yet, stores the config file in the
// module context under "cat-client-config-file" and runs the ModuleInitializer on it.
public static void initialize(PlexusContainer container, File configFile) {
    ModuleContext ctx = new DefaultModuleContext(container);
    // Per the article, this lookup resolves the org.unidal.initialization.Module implementation
    // declared in components.xml.
    Module module = ctx.lookup(Module.class, CatClientModule.ID);

    if (!module.isInitialized()) {
        ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);

        ctx.setAttribute("cat-client-config-file", configFile);
        initializer.execute(ctx, module);
    }
}
// components.xml 中配置的 Module, 加載入 CatClientModule
<component> <role>org.unidal.initialization.Module</role> <role-hint>cat-client</role-hint> <implementation>com.dianping.cat.CatClientModule</implementation> </component>
// plexus.xml 中 配置日志輸出
<plexus> <components> <component> <role>org.codehaus.plexus.logging.LoggerManager</role> <implementation>org.unidal.lookup.logger.TimedConsoleLoggerManager</implementation> <configuration> <dateFormat>MM-dd HH:mm:ss.SSS</dateFormat> <showClass>true</showClass> <logFilePattern>cat_{0,date,yyyyMMdd}.log</logFilePattern> <baseDirRef>CAT_HOME</baseDirRef> <defaultBaseDir>/data/applogs/cat</defaultBaseDir> </configuration> </component> </components> </plexus>
// logEvent 舉個例子,event處理過程
// Walkthrough of Cat.logEvent(...). The methods below are excerpts taken from several classes
// and listed in call order; they are not one compilable unit.
Cat.logEvent("RemoteLink", "Call", Message.SUCCESS, "Cat-0a010680-384736-2061");

// Entry point: delegates to the producer.
public static void logEvent(String type, String name, String status, String nameValuePairs) {
    Cat.getProducer().logEvent(type, name, status, nameValuePairs);
}

// DefaultMessageProducer, logEvent: creates the event, attaches the name/value data when it is
// non-empty, sets the status and completes the event.
@Override
public void logEvent(String type, String name, String status, String nameValuePairs) {
    Event event = newEvent(type, name);

    if (nameValuePairs != null && nameValuePairs.length() > 0) {
        event.addData(nameValuePairs);
    }

    event.setStatus(status);
    event.complete();
}

// DefaultEvent, complete: marks the event completed and hands it to the manager, if any.
@Override
public void complete() {
    setCompleted(true);

    if (m_manager != null) {
        m_manager.add(this);
    }
}

// DefaultMessageManager, add: forwards the message to the current context, if there is one.
@Override
public void add(Message message) {
    Context ctx = getContext();

    if (ctx != null) {
        ctx.add(message);
    }
}

// The add(...) invoked on the context (ctx.add above), where the message is finally placed:
// with an empty transaction stack the message is flushed on its own in a copy of the tree;
// otherwise it becomes a child of the transaction on top of the stack.
public void add(Message message) {
    if (m_stack.isEmpty()) {
        MessageTree tree = m_tree.copy();

        tree.setMessage(message);
        flush(tree);
    } else {
        Transaction parent = m_stack.peek();

        addTransactionChild(message, parent);
    }
}

// DefaultMessageManager, flush: assigns a message id when missing, then sends the tree through
// the transport sender and resets; when there is no sender or messages are disabled, the tree
// is not sent and a throttle counter is incremented (logged on the 1st and every 10000th time).
public void flush(MessageTree tree) {
    if (tree.getMessageId() == null) {
        tree.setMessageId(nextMessageId());
    }

    MessageSender sender = m_transportManager.getSender();

    if (sender != null && isMessageEnabled()) {
        sender.send(tree);

        reset();
    } else {
        m_throttleTimes++;

        if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
            m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
        }
    }
}

// TcpSocketSender, send: offers the tree into m_atomicTrees when isAtomicMessage(tree) holds,
// otherwise into m_queue; a rejected offer is reported through logQueueFullInfo.
// NOTE(review): the article describes m_queue as a BlockingQueue<MessageTree>; the two-argument
// offer(tree, sample) used here is not the java.util.concurrent signature — confirm the type.
@Override
public void send(MessageTree tree) {
    if (isAtomicMessage(tree)) {
        boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    } else {
        boolean result = m_queue.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    }
}

// Queue-full handling: notifies the statistics object when present, counts the failure and
// logs an error on the 1st and every 1000th occurrence.
private void logQueueFullInfo(MessageTree tree) {
    if (m_statistics != null) {
        m_statistics.onOverflowed(tree);
    }

    int count = m_errors.incrementAndGet();

    if (count % 1000 == 0 || count == 1) {
        m_logger.error("Message queue is full in tcp socket sender! Count: " + count);
    }

    // Only clears the local parameter; the caller's reference is unaffected.
    tree = null;
}

// Used when the transaction stack is not empty: appends the message as a child of the given
// transaction. Before that, the tree is truncated and flushed when the message falls into a
// later hour than the tree's root message or the tree has reached the configured max length.
private void addTransactionChild(Message message, Transaction transaction) {
    long treePeriod = trimToHour(m_tree.getMessage().getTimestamp());
    long messagePeriod = trimToHour(message.getTimestamp() - 10 * 1000L); // 10 seconds extra time allowed

    if (treePeriod < messagePeriod || m_length >= m_configManager.getMaxMessageLength()) {
        m_validator.truncateAndFlush(this, message.getTimestamp());
    }

    transaction.addChild(message);
    m_length++;
}

// DefaultTransaction, addChild: lazily creates the child list and appends the message; a null
// message is reported through Cat.logError instead of being added. Returns this transaction.
@Override
public DefaultTransaction addChild(Message message) {
    if (m_children == null) {
        m_children = new ArrayList<Message>();
    }

    if (message != null) {
        m_children.add(message);
    } else {
        Cat.logError(new Exception("null child message"));
    }

    return this;
}
// Transaction 的 complete 實現,最終的提交
// Walkthrough of Transaction.complete(), the final submit. The methods below are excerpts taken
// from several classes and listed in call order; they are not one compilable unit.
trans.complete();

// Entry point: a second call on an already completed transaction is treated as an
// instrumentation error and recorded as a "BadInstrument" child event; otherwise the duration
// is computed in microseconds, the transaction is marked completed and the manager is told to
// end it. Any exception thrown along the way is swallowed.
@Override
public void complete() {
    try {
        if (isCompleted()) {
            // complete() was called more than once
            DefaultEvent event = new DefaultEvent("cat", "BadInstrument");

            event.setStatus("TransactionAlreadyCompleted");
            event.complete();
            addChild(event);
        } else {
            m_durationInMicro = (System.nanoTime() - m_durationStart) / 1000L;

            setCompleted(true); // prevents taking this branch again on a later call
            if (m_manager != null) {
                m_manager.end(this);
            }
        }
    } catch (Exception e) {
        // ignore
    }
}

// DefaultMessageManager, end: for a standalone transaction, ends it on the current context and
// removes the thread-local context when the context reports that the tree was flushed.
@Override
public void end(Transaction transaction) {
    Context ctx = getContext();

    if (ctx != null && transaction.isStandalone()) {
        if (ctx.end(this, transaction)) {
            m_context.remove();
        }
    }
}

// The end(...) invoked on the context (ctx.end above): pops the stack until the given
// transaction is reached, validating each popped transaction; once the stack is empty, a copy
// of the tree is flushed through the manager and the context's own tree is cleared.
// Returns true only when the tree was flushed.
public boolean end(DefaultMessageManager manager, Transaction transaction) {
    if (!m_stack.isEmpty()) {
        Transaction current = m_stack.pop();

        if (transaction == current) {
            m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);
        } else {
            while (transaction != current && !m_stack.empty()) {
                m_validator.validate(m_stack.peek(), current);

                current = m_stack.pop();
            }
        }

        if (m_stack.isEmpty()) {
            MessageTree tree = m_tree.copy();

            m_tree.setMessageId(null);
            m_tree.setMessage(null);

            if (m_totalDurationInMicros > 0) {
                adjustForTruncatedTransaction((Transaction) tree.getMessage());
            }

            manager.flush(tree);
            return true;
        }
    }

    return false;
}

// Validates a transaction and, recursively, its nested child transactions. A standalone
// DefaultTransaction that was never completed is marked as not completed; a non-standalone,
// uncompleted forked or tagged transaction is linked/marked as run-away.
public void validate(Transaction parent, Transaction transaction) {
    if (transaction.isStandalone()) {
        List<Message> children = transaction.getChildren();
        int len = children.size();

        for (int i = 0; i < len; i++) {
            Message message = children.get(i);

            if (message instanceof Transaction) {
                validate(transaction, (Transaction) message);
            }
        }

        if (!transaction.isCompleted() && transaction instanceof DefaultTransaction) {
            // missing transaction end, log a BadInstrument event so that
            // developer can fix the code
            markAsNotCompleted((DefaultTransaction) transaction);
        }
    } else if (!transaction.isCompleted()) {
        if (transaction instanceof DefaultForkedTransaction) {
            // link it as run away message since the forked transaction is not completed yet
            linkAsRunAway((DefaultForkedTransaction) transaction);
        } else if (transaction instanceof DefaultTaggedTransaction) {
            // link it as run away message since the forked transaction is not completed yet
            markAsRunAway(parent, (DefaultTaggedTransaction) transaction);
        }
    }
}

// Adjustment for a truncated transaction: adds a "TruncatedTransaction"/"TotalDuration" event
// to the root carrying the accumulated duration plus the root's own duration (microseconds),
// then resets the accumulator to 0.
private void adjustForTruncatedTransaction(Transaction root) {
    DefaultEvent next = new DefaultEvent("TruncatedTransaction", "TotalDuration");
    long actualDurationInMicros = m_totalDurationInMicros + root.getDurationInMicros();

    next.addData(String.valueOf(actualDurationInMicros));
    next.setStatus(Message.SUCCESS);
    root.addChild(next);

    m_totalDurationInMicros = 0;
}

// Sending the final data: assigns a message id when missing, then sends the tree through the
// transport sender and resets; when there is no sender or messages are disabled, the tree is
// not sent and a throttle counter is incremented (logged on the 1st and every 10000th time).
public void flush(MessageTree tree) {
    if (tree.getMessageId() == null) {
        tree.setMessageId(nextMessageId());
    }

    MessageSender sender = m_transportManager.getSender();

    if (sender != null && isMessageEnabled()) {
        sender.send(tree);

        reset();
    } else {
        m_throttleTimes++;

        if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
            m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
        }
    }
}

// Messages are recorded only when every condition holds: a domain is configured and enabled,
// the current thread has a context, and CAT is enabled in the configuration.
@Override
public boolean isMessageEnabled() {
    return m_domain != null && m_domain.isEnabled() && m_context.get() != null && m_configManager.isCatEnabled();
}

// Sending the message tree: offered into m_atomicTrees when isAtomicMessage(tree) holds,
// otherwise into m_queue; a rejected offer is reported through logQueueFullInfo.
@Override
public void send(MessageTree tree) {
    if (isAtomicMessage(tree)) {
        boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    } else {
        boolean result = m_queue.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    }
}

// After sending, the thread's state is cleared so it can be reused: with no accumulated
// truncated duration the stack and known exceptions are cleared and the context is removed;
// otherwise only the known exceptions are cleared and the context is kept.
@Override
public void reset() {
    // destroy current thread local data
    Context ctx = m_context.get();

    if (ctx != null) {
        if (ctx.m_totalDurationInMicros == 0) {
            ctx.m_stack.clear();
            ctx.m_knownExceptions.clear();
            m_context.remove();
        } else {
            ctx.m_knownExceptions.clear();
        }
    }
}

// Removal of the context: removes this thread-local's entry from the current thread's map.
// NOTE(review): this appears to be the JDK's ThreadLocal.remove() reached via m_context.remove().
public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

// Shown again by the article: the manager's end(...) removes the context once more when the
// context's end(...) returns true, making sure the thread-local is cleared.
@Override
public void end(Transaction transaction) {
    Context ctx = getContext();

    if (ctx != null && transaction.isStandalone()) {
        if (ctx.end(this, transaction)) {
            m_context.remove();
        }
    }
}
// 寫入隊列後,由 TcpSocketSender 線程進行輪詢發送到cat後台
// Sender loop: runs while m_active is true. When a channel is available and writable, one tree
// is polled from m_queue and sent; otherwise trees at the head of the queue that are older than
// HOUR are discarded, and the thread sleeps for 5 ms before trying again.
@Override
public void run() {
    m_active = true;

    while (m_active) {
        ChannelFuture channel = m_manager.channel();

        if (channel != null && checkWritable(channel)) {
            try {
                MessageTree tree = m_queue.poll();

                if (tree != null) {
                    sendInternal(tree);
                    // Drop the reference to the message once the tree has been sent.
                    tree.setMessage(null);
                }
            } catch (Throwable t) {
                m_logger.error("Error when sending message over TCP socket!", t);
            }
        } else {
            long current = System.currentTimeMillis();
            long oldTimestamp = current - HOUR;

            // No usable channel: discard queued trees whose root message is older than the cutoff.
            while (true) {
                try {
                    MessageTree tree = m_queue.peek();

                    // NOTE(review): tree.getMessage() is dereferenced without a null check; an NPE
                    // here would be caught below, logged, and end this cleanup pass.
                    if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {
                        MessageTree discradTree = m_queue.poll();

                        if (discradTree != null) {
                            // NOTE(review): m_statistics is used without a null check here, while
                            // logQueueFullInfo guards it — confirm it can never be null.
                            m_statistics.onOverflowed(discradTree);
                        }
                    } else {
                        break;
                    }
                } catch (Exception e) {
                    m_logger.error(e.getMessage(), e);
                    break;
                }
            }

            try {
                Thread.sleep(5);
            } catch (Exception e) {
                // ignore it
                // Any exception from sleep (e.g. an interrupt) stops the loop; the interrupt
                // status is not restored.
                m_active = false;
            }
        }
    }
}
如此,整個cat埋點的過程就搞定了。關鍵技術就是:
1. ThreadLocal 用於保存上下文埋點,保證線程安全。
2. LinkedBlockingQueue 用於保存消息樹,作為生產線程與消費線程的溝通橋梁!
3. AtomicInteger 用於計數,保證準確性。
4. 心跳線程用於發送本機的狀態到cat後台。
5. 懶加載,單例模式的使用。
等等,來個圖: