大眾點評CAT開源監控系統剖析


 參考文檔:

大眾點評的實時監控系統分析(一)

CAT_source_analyze

透過CAT,來看分布式實時監控系統的設計與實現

深度剖析開源分布式監控CAT

[分布式監控CAT] Client端源碼解析

大眾點評Cat--架構分析

大眾點評Cat--Server模塊架構分析

Plexus,Spring之外的IoC容器

plexus使用(一)

Spring Cloud Sleuth使用簡介

Spring Cloud Sleuth 整合Zipkin、RabbitMQ 和 (Mysql或Elasticsearch)

Cat監控Druid數據庫連接池

1. 介紹

1.1 概述

CAT(Central Application Tracking)基於Java開發的實時監控平台,主要包括移動端監控,應用側監控,核心網絡層監控,系統層監控等。

CAT是一個提供實時監控報警,應用性能分析診斷的工具。

1.2 CAT能做什么

在此之前,先來想一想對於線上應用我們希望能監控些什么?可能有如下這些:

  • 機器狀態信息。CPU負載、內存信息、磁盤使用率這些是必需的,另外可能還希望收集Java進程的數據,例如線程棧、堆、垃圾回收等信息,以幫助出現問題時快速debug。
  • 請求訪問情況。例如請求個數、響應時間、處理狀態,如果有處理過程中的時間分析那就更完美了。
  • 異常情況。譬如緩存服務時不時出現無響應,我們希望能夠監控到這種異常,從而做進一步的處理。
  • 業務情況。例如訂單量統計,銷售額等等。

CAT支持的監控消息類型包括:

  • Transaction 適合記錄跨越系統邊界的程序訪問行為,比如遠程調用,數據庫調用,也適合執行時間較長的業務邏輯監控,Transaction用來記錄一段代碼的執行時間和次數。
  • Event 用來記錄一件事發生的次數,比如記錄系統異常,它和transaction相比缺少了時間的統計,開銷比transaction要小。
  • Heartbeat 表示程序內定期產生的統計信息, 如CPU%, MEM%, 連接池狀態, 系統負載等。
  • Metric 用於記錄業務指標、指標可能包含對一個指標記錄次數、記錄平均值、記錄總和,業務指標最低統計粒度為1分鍾。
  • Trace 用於記錄基本的trace信息,類似於log4j的info信息,這些信息僅用於查看一些相關信息

在一個請求處理中可能產生有多種消息,CAT將其組織成消息樹的形式。

在處理開始時,默認開始一個類型為URL的Transaction,在這個Transaction中業務本身可以產生子消息。例如,產生一個數據庫訪問的子Transaction或者一個訂單統計的Metric。結構如下所示:

 

1.3 分布式監控系統要求

  • 方便安裝
  • 要求輕量
  • 界面盡可能友好
  • 監控策略豐富,監控元素多樣化
  • 可以嵌套開發
  • 占用服務器資源小,使用時不過多占用機器硬件方面資源,對實際業務影響較小

1.4 CAT使用特點

  • 異步化傳輸數據,不太影響正常業務
  • 實時監控
  • 輕量,部署簡單
  • 嵌入簡單
  • 有問題跟蹤報表
  • 消息樹形化
  • 日志不落地本地磁盤,較少IO,但很消耗網絡資源
  • 監控消息,按照分業務傳輸數據,如業務場景,時間等要求傳輸數據
  • 有報警機制
  • 可能復雜的消息存儲和消息ID查詢看起來麻煩,需要建立查詢索引(目前不考慮這個東東)
  • 消息隊列異步化發送
  • 開源(這個最重要)

2. CAT設計

2.1 整體設計

2.2 客戶端設計

2.3 服務端設計

 

2.4 領域建模

 

3. 模塊划分

 

3.1 模塊說明

3.1.1 client端

cat-client 提供給業務以及中間層埋點的底層SDK。

3.1.2 server端

cat-consumer 用於實時分析從客戶端提供的數據。

cat-home 作為用戶給用戶提供展示的控制端 ,並且cat-home做展示時,通過對cat-consumer的調用獲取其他節點的數據,將所有數據匯總展示。

consumer、home以及路由中心都是部署在一起的,每個服務端節點都可以充當任何一個角色。

CAT服務端在整個實時處理中,基本上實現了全異步化處理:

  • 消息消費基於Netty的NIO實現(Netty-Server);
  • 消息消費到服務端就存放內存隊列,然后程序開啟一個線程會消費這個消息做消息分發(異步消費處理);
  • 每個消息都會有一批線程並發消費各自隊列的數據,以做到消息處理的隔離。(每報表每線程,分別按照自己的規則解析消費這個消息,並且可以動態控制對某種報表類型的處理線程個數);
  • 消息(原始的消息logView)存儲是先存入本地磁盤,然后異步上傳到HDFS文件,這也避免了強依賴HDFS;

4. 設計原理

4.1 cat-client設計

作為一個日志上報的通用客戶端,考慮點至少有如下這些:

  • 為了盡可能減少對業務的影響,需要對消息進行異步處理。即業務線程將消息交給CAT客戶端與CAT客戶端上報這兩個過程需要異步。
  • 為了達到實時的目的以及適應高並發的情況,客戶端上報應該基於TCP而非HTTP開發。
  • 在線程安全的前提下盡可能的資源低消耗以及低延時。我們知道,線程競爭的情況是由於資源共享造成的,要達到線程安全通常需要減少資源共享或者加鎖,而這兩點則會導致系統資源冗余和高延時。

CAT客戶端實現並不復雜,但這些點都考慮到了。它的架構如下所示:

大概步驟為:

  • 業務線程產生消息,交給消息Producer,消息Producer將消息存放在該業務線程消息棧中;
  • 業務線程通知消息Producer消息結束時,消息Producer根據其消息棧產生消息樹放置在同步消息隊列中;
  • 消息上報線程監聽消息隊列,根據消息樹產生最終的消息報文上報CAT服務端。

4.1.1 cat-client包結構

└─com
    ├─dianping
    │  └─cat
    │      ├─build
    │      ├─configuration
    │      ├─log4j
    │      ├─message
    │      │  ├─internal
    │      │  ├─io
    │      │  └─spi
    │      │      ├─codec
    │      │      └─internal
    │      ├─servlet
    │      └─status
    └─site
        ├─helper
        └─lookup
            └─util

 

4.1.2 com.dianping.cat.message包介紹

包結構如下:

com.dianping.cat.message中主要包含了internal、io、spi這三個目錄:

  • internal目錄包含主要的CAT客戶端內部實現類;
  • io目錄包含建立服務端連接、重連、消息隊列監聽、上報等io實現類;
  • spi目錄為上報消息工具包,包含消息二進制編解碼、轉義等實現類。

其uml圖如下所示(可以放大看):

類的功能如下:

  • Message為所有上報消息的抽象,它的子類實現有Transaction、Metric、Event、HeartBeat、Trace這五種。
  • MessageProducer封裝了所有接口,業務在使用CAT時只需要通過MessageProducer來操作。
  • MessageManager為CAT客戶端核心類,相當於MVC中的Controller。
  • Context類保存消息上下文。
  • TransportManager提供發送消息的sender,具體實現有DefaultTransportManager,調用其getSender接口返回一個TcpSocketSender。
  • TcpSocketSender類負責發送消息。

1)Message

上面說到,Message有五類,分別為Transaction、Metric、Event、HeartBeat、Trace。其中Metric、Event、HeartBeat、Trace基本相同,保存的數據都為一個字符串;而Transaction則保存一個Message列表。換句話說,Transaction的結構為一個遞歸包含的結構,其他結構則為原子性結構。

下面為DefaultTransaction的關鍵數據成員及操作:

public class DefaultTransaction extends AbstractMessage implements Transaction {
    private List<Message> m_children;
    private MessageManager m_manager;
    ...

    //添加子消息
    public DefaultTransaction addChild(Message message) {
        ...
    }

    //Transaction結束時調用此方法
    public void complete() {
        ...
        m_manager.end(this); //調用MessageManager來結束Transaction 
        ...
    }

 

值得一提的是,Transaction(或者其他的Message)在創建時自動開始,消息結束時需要業務方調用complete方法,而在complete方法內部則調用MessageManager來完成消息。

2)MessageProducer

MessageProducer對業務方封裝了CAT內部的所有細節,它的主要方法如下:

public void logError(String message, Throwable cause);
public void logEvent(String type, String name, String status, String nameValuePairs);
public void logHeartbeat(String type, String name, String status, String nameValuePairs);
public void logMetric(String name, String status, String nameValuePairs);
public void logTrace(String type, String name, String status, String nameValuePairs);
...
public Event newEvent(String type, String name);
public Event newEvent(Transaction parent, String type, String name);
public Heartbeat newHeartbeat(String type, String name);
public Metric newMetric(String type, String name);
public Transaction newTransaction(String type, String name);
public Trace newTrace(String type, String name);

 

logXXX方法為方法糖(造詞小能手呵呵),這些方法在調用時需要傳入消息數據,方法結束后消息自動結束。

newXXX方法返回相應的Message,業務方需要調用Message方法設置數據,並最終調用Message.complete()方法結束消息。

MessageProducer只是接口封裝,消息處理主要實現依賴於MessageManager這個類。

3)MessageManager

MessageManager為CAT的核心類,但它只是定義了接口,具體實現為DefaultMessageManager。DefaultMessageManager這個類里面主要包含了兩個功能類,ContextTransportManager,分別用於保存上下文和消息傳輸。TransportManager運行期間為單例對象,而Context則包裝成ThreadLocal為每個線程保存上下文。

我們通過接口來了解DefaultMessageManager的主要功能:

public void add(Message message);
public void start(Transaction transaction, boolean forked);
public void end(Transaction transaction);

public void flush(MessageTree tree);

 

add()方法用來添加原子性的Message,也就是Metric、Event、HeartBeat、Trace。

start()和end()方法用來開始和結束Transaction這種消息。

flush()方法用來將當前業務線程的所有消息刷新到CAT服務端,當然,是異步的。

4)Context

Context用來保存消息上下文,我們可以通過它的主要接口來了解它功能:

public void add(Message message) {
    if (m_stack.isEmpty()) {
         MessageTree tree = m_tree.copy();

         tree.setMessage(message);
         flush(tree);
    } else {
         Transaction parent = m_stack.peek();

         addTransactionChild(message, parent);
     }
 }

add方法主要添加原子性消息,它先判斷該消息是否有上文消息(即判斷是否處於一個Transaction中)。如果有則m_stack不為空並且將該消息添加到上文Transaction的子消息隊列中;否則直接調用flush來將此原子性消息刷新到服務端。

public void start(Transaction transaction, boolean forked) {
    if (!m_stack.isEmpty()) {
        ...
        Transaction parent = m_stack.peek();
        addTransactionChild(transaction, parent);
    } else {
        m_tree.setMessage(transaction);
    }

    if (!forked) {
        m_stack.push(transaction);
    }
}

start方法用來開始Transaction(Transaction是消息里比較特殊的一種),如果當前消息棧為空則證明該Transaction為第一個Transaction,使用消息樹保存該消息,同時將該消息壓棧;否則將當前Transaction保存到上文Transaction的子消息隊列中,同時將該消息壓棧。

public boolean end(DefaultMessageManager manager, Transaction transaction) {
if (!m_stack.isEmpty()) {
        Transaction current = m_stack.pop();
        ...
        if (m_stack.isEmpty()) {
            MessageTree tree = m_tree.copy();

            m_tree.setMessageId(null);
            m_tree.setMessage(null);
            ...
            manager.flush(tree); //刷新消息到CAT服務端
            return true;
        }
    }

    return false;
}

end方法用來結束Transaction,每次調用都會pop消息棧,如果棧為空則調用flush來刷新消息到CAT服務端。

綜上,Context的m_stack的結構如下:

Transaction之間是有引用的,因此在end方法中只需要將第一個Transaction(封裝在MessageTree中)通過MessageManager來flush,在拼接消息時可以根據這個引用關系來找到所有的Transaction :)。

5)TransportManager和TcpSocketSender

這兩個類用來發送消息到服務端。MessageManager通過TransportManager獲取到MessageSender,調用sender.send()方法來發送消息。 TransportManager和MessageSender關系如下:

TCPSocketSender為MessageSender的具體子類,它里面主要的數據成員為:

private MessageCodec m_codec;
private MessageQueue m_queue = new DefaultMessageQueue(SIZE);
private ChannelManager m_manager;
  • MessageCodec:CAT基於TCP傳輸消息,因此在發送消息時需要對字符消息編碼成字節流,這個編碼的工作由MessageCodec負責實現。

  • MessageQueue:還記得剛才說業務方在添加消息時,CAT異步發送到服務端嗎?在添加消息時,消息會被放置在TCPSocketSender的m_queue中,如果超出queue大小則拋棄消息。

  • ChannelManager:CAT底層使用netty來實現TCP消息傳輸,ChannelManager負責維護通信Channel。通俗的說,維護連接。

TCPSocketSender主要方法為initialize、send和run,分別介紹如下:

public void initialize() {
    m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);

    Threads.forGroup("cat").start(this);
    Threads.forGroup("cat").start(m_manager);
    ...
}

initialize方法為初始化方法,在執行時主要創建兩個線程,一個用來運行自身run方法(TCPSocketSender實現了Runnable接口)監聽消息隊列;另一個則用來執行ChannelManager維護通信Channel。

public void send(MessageTree tree) {
    if (isAtomicMessage(tree)) {
        boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    } else {
        boolean result = m_queue.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    }
}

send方法被MessageManager調用,把消息放置在消息隊列中。

public void run() {
    m_active = true;

    while (m_active) {
        ChannelFuture channel = m_manager.channel();

        if (channel != null && checkWritable(channel)) {
            try {
                MessageTree tree = m_queue.poll();

                if (tree != null) {
                    sendInternal(tree);
                    tree.setMessage(null);
                }

            } catch (Throwable t) {
                m_logger.error("Error when sending message over TCP socket!", t);
            }
        } else {
            try {
                Thread.sleep(5);
            } catch (Exception e) {
                // ignore it
                m_active = false;
            }
        }
    }
}

private void sendInternal(MessageTree tree) {
    ChannelFuture future = m_manager.channel();
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K

    m_codec.encode(tree, buf);

    int size = buf.readableBytes();
    Channel channel = future.channel();

    channel.writeAndFlush(buf);
    if (m_statistics != null) {
        m_statistics.onBytes(size);
    }
}

run方法會一直執行直到進程退出,在循環里先獲取通信Channel,然后發送消息。值得注意的是,sendInternal方法在執行時調用m_codec.encode(tree, buf),參數為消息樹緩沖區。消息樹里面其實只保存了一個消息,還記得剛才說的Transaction上下文引用嗎?m_codec在encode的時候會判斷消息類型是否為Transaction,如果為Transaction則會遞歸獲取子Transaction,否則直接將該消息編碼。具體實現可以參考源代碼的PlainTextMessageCodec類的encode方法,此處不再贅述。

4.1.3 cat-client 主要類介紹

cat-client的主要入口是cat-client包中的Cat類

Cat類以及Cat的依賴類層級結構如下:

接口層

Cat類以及MessageProducer類。主要功能是為外部提供api,Cat主要作用是與plexus框架做集成,MessageProducer是處理api的主要類

PS:額外說一下,Cat這個項目很【有特色】地用了plexus作為管理容器,初次接觸的時候真是讓人頭大,plexus的基本功能和spring可以說別無二致,但是很多地方的注入竟然都需要手動處理,真是讓人尷尬,雖然作者說spring太重了,plexus的作用已經足夠

消息處理層

MessageManager以及其內部類Context。主要功能是管理消息的發送,Transaction類消息的歸集,等消息的管理工作。在MessageManager中,使用了ThreadLocal類型作為當前線程消息管理的上下文,通過這個對象線程安全地實現消息的添加,合並,發送等等。

PS:MessageManager管理的消息Message是基於Cat的監控模型創建的,其中最主要的區別是Transaction類和其他消息不太一樣,Transaction消息是一個鏈表的模型,每一個消息后面都鏈接着下一個消息,所以MessageManager對Transaction的處理也不同,別的消息都是放到Context中直接從消息處理層flush到下一層,Transaction是放到Context的棧中,直到過了預定時間,或者消息達到規定的最大長度才flush到下一層。

消息傳輸層

TransportManager以及TcpSocketSender以及ChannelManager。主要功能是把消息管理層發下來的消息進行發送,對於與多個發送的目的服務器進行Channel管理,保證有可用服務器能接受消息。TransportManager主要功能是根據配置文件初始化TcpSocketSender,TcpSocketSender主要實現把Message進行編碼(如果是Transaction還會進行合並)並放置到待發送隊列中,再同時由ChannelManager消費隊列中的消息,將消息發送給狀態為active的server端

PS:暫存消息的隊列用的是LinkedBlockingQueue,實際上LinkedBlockingQueue屬於生產消費者隊列的標配了,因為這個類對於添加和移除的消耗小,線程安全,而且達到隊列容量時會成為blocking狀態,所以基本上都會用這個類,或者基於這個類進行擴展來實現相關需求。相對來說還有ConcurrentLinkedQueue可以用,和blockingqueue的主要區別是,Concurrent超過主要容量會直接返回false,不會block,所以如果想馬上就返回的可以用Concurrent隊列。

4.1.4 Cat入口類

1)測試用例

        //靜態方法獲取Transaction對象
        Transaction t=Cat.newTransaction("logTransaction", "logTransaction");

        TimeUnit.SECONDS.sleep(30);
        t.setStatus("0");
        t.complete();

2)Cat源碼

    private static Cat s_instance = new Cat();
    private static volatile boolean s_init = false;

    private static void checkAndInitialize() {
        if (!s_init) {
            synchronized (s_instance) {
                if (!s_init) {
                    initialize(new File(getCatHome(), "client.xml"));
                    log("WARN", "Cat is lazy initialized!");
                    s_init = true;
                }
            }
        }
    }
    private Cat() {
    }

    public static MessageProducer getProducer() {
        checkAndInitialize();

        return s_instance.m_producer;
    }

Cat lazy Init

可以看到類加載時已經完成了Cat對象的初始化,內存中有且僅有一個Cat Object(static Cat s_instance = new Cat();),但是包含配置信息的完整的Cat對象並沒有完全初始化完成。調用Cat時會先嘗試獲取producer對象,並在獲取之前檢查客戶端配置是否加載完畢(checkAndInitialize)。

checkAndInitialize()通過使用doublecheck來對Cat相關配置填充的單次初始化加載。

cat-client首先會使用plexus(一個比較老的IOC容器)加載配置文件/META-INF/plexus/plexus.xml,完成IOC容器的初始化。

接着使用../../client.xml文件完成cat對象的配置信息填充初始化。並且啟動這四個daemon線程,后文詳細說明:

  • cat-StatusUpdateTask 用來每秒鍾上報客戶端基本信息(JVM等信息)
  • cat-merge-atomic-task(消息合並檢查)
  • cat-TcpSocketSender-ChannelManager(NIO 連接服務端檢查)
  • cat-TcpSocketSender(消息發送服務端)

4.1.5 CatClientModule

由於Cat用了十分low的plexus作為容器,所以在加載Cat類的時候會從靜態方法中加載各個Module,CatClientModule就是Cat client工程中首要Module

public class CatClientModule extends AbstractModule {
    public static final String ID = "cat-client";

    @Override
    protected void execute(final ModuleContext ctx) throws Exception {
        ctx.info("Current working directory is " + System.getProperty("user.dir"));

        // initialize milli-second resolution level timer
        MilliSecondTimer.initialize();

        // tracking thread start/stop,此處增加經典的hook,用於線程池關閉的清理工作。
        Threads.addListener(new CatThreadListener(ctx));

        // warm up Cat
        Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());

        // bring up TransportManager
        ctx.lookup(TransportManager.class);

        ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);

        if (clientConfigManager.isCatEnabled()) {
            // start status update task
            StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);

            Threads.forGroup("cat").start(statusUpdateTask);
            LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms

            // MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class);
            // Threads.forGroup("cat").start(mmapReaderTask);
        }
    }

這里plexusIOC的具體的初始化加載邏輯在org\unidal\framework\foundation-service\2.5.0\foundation-service-2.5.0.jar中,有興趣可以仔細查看。 
當准備工作做完之后,會執行具體的消息構造:

DefaultMessageProducer.newTransaction(String type, String name)

@Override
    public Transaction newTransaction(String type, String name) {
        // this enable CAT client logging cat message without explicit setup
        if (!m_manager.hasContext()) {
            //詳細可見下文源碼,此處就是用ThreadLocal存儲一個Context對象:ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
            m_manager.setup();

        }

        if (m_manager.isMessageEnabled()) {
            DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);

//向Context中填充構造的消息體:Context.m_tree;Context.m_stack;稍后看看Context這個對象
            m_manager.start(transaction, false);
            return transaction;
        } else {
            return NullMessage.TRANSACTION;
        }
    }

DefaultMessageManager.start(Transaction transaction, boolean forked)

@Override
    public void start(Transaction transaction, boolean forked) {
        Context ctx = getContext();//這里獲取上文中說到的ThreadLocal中構造的Context對象

        if (ctx != null) {
            ctx.start(transaction, forked);

            if (transaction instanceof TaggedTransaction) {
                TaggedTransaction tt = (TaggedTransaction) transaction;

                m_taggedTransactions.put(tt.getTag(), tt);
            }
        } else if (m_firstMessage) {
            m_firstMessage = false;
            m_logger.warn("CAT client is not enabled because it's not initialized yet");
        }
    }

DefaultMessageManager.Context.start(Transaction transaction, boolean forked)

        public void start(Transaction transaction, boolean forked) {
            if (!m_stack.isEmpty()) {//
                 {
                    Transaction parent = m_stack.peek();
                    addTransactionChild(transaction, parent);
                }
            } else {
                m_tree.setMessage(transaction);//在這里把返回的transaction放在tree上,如果有嵌套結構,后邊繼續在tree上添枝加葉
            }

            if (!forked) {
                m_stack.push(transaction);
            }
        }

這部分代碼可以看出, 

通過ThreadLocal<Context.>,使Context中實際的消息的構造保證了線程安全。

如果當前Context的棧m_stack不為空,那么接着之前的消息后邊,將當前消息構造為一個孩子結點。如果當前消息之前沒有其他消息,放入m_stack中,並setMessage.也就是當前消息時父節點。

至此,消息體構造完畢。 

這里需要看一下Context類,是DefaultMessageManager包私有的內部類。

Context.java

    class Context {
        private MessageTree m_tree;//初始化的時候構建一個MessageTree

        private Stack<Transaction> m_stack;

        private int m_length;

        private boolean m_traceMode;

        private long m_totalDurationInMicros; // for truncate message

        private Set<Throwable> m_knownExceptions;

        public Context(String domain, String hostName, String ipAddress) {
            m_tree = new DefaultMessageTree();
            m_stack = new Stack<Transaction>();

            Thread thread = Thread.currentThread();
            String groupName = thread.getThreadGroup().getName();

            m_tree.setThreadGroupName(groupName);
            m_tree.setThreadId(String.valueOf(thread.getId()));
            m_tree.setThreadName(thread.getName());

            m_tree.setDomain(domain);
            m_tree.setHostName(hostName);
            m_tree.setIpAddress(ipAddress);
            m_length = 1;
            m_knownExceptions = new HashSet<Throwable>();
        }

每個線程通過使用ThreadLocal構造一個Context對象並存儲。Context主要包含當前的消息體m_tree,和多個嵌套消息體填充的棧:m_stack :

再回到我們原來的UnitTest代碼, 

Transaction t=Cat.newTransaction("logTransaction", "logTransaction"); 

這行代碼完成了客戶端plexusIOC容器的初始化,cat-client的加載初始化、啟動了四個daemon線程,並返回了Transaction對象。

t.setStatus("0");//很簡單,就是這是一個屬性值
t.complete();

消息完成后,將消息放入一個隊列中,從而保證異步上報。

transaction.complete();的具體代碼如下:

........
    public void complete() {
        try {
            if (isCompleted()) {
                // complete() was called more than once
                DefaultEvent event = new DefaultEvent("cat", "BadInstrument");

                event.setStatus("TransactionAlreadyCompleted");
                event.complete();
                addChild(event);
            } else {
                m_durationInMicro = (System.nanoTime() - m_durationStart) / 1000L;

                setCompleted(true);

                if (m_manager != null) {
                    m_manager.end(this);
                }
            }
        } catch (Exception e) {
            // ignore
        }
    }
........
    @Override
    public void end(Transaction transaction) {
        Context ctx = getContext();

        if (ctx != null && transaction.isStandalone()) {
            if (ctx.end(this, transaction)) {
                m_context.remove();
            }
        }
    }
........

        public boolean end(DefaultMessageManager manager, Transaction transaction) {
            if (!m_stack.isEmpty()) {
                Transaction current = m_stack.pop();//Context的成員變量m_stack彈出棧頂元素,LIFO當然是最新的current元素。

                if (transaction == current) {
                    m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);
                } else {
                    while (transaction != current && !m_stack.empty()) {
                        m_validator.validate(m_stack.peek(), current);

                        current = m_stack.pop();
                    }
                }

                if (m_stack.isEmpty()) {//如果當前線程存儲的Context中m_stack無元素
                    MessageTree tree = m_tree.copy();

                    m_tree.setMessageId(null);//清理m_tree
                    m_tree.setMessage(null);

                    if (m_totalDurationInMicros > 0) {
                        adjustForTruncatedTransaction((Transaction) tree.getMessage());
                    }

                    manager.flush(tree);//將消息放入消費隊列中
                    return true;
                }
            }

            return false;
        }
........
    public void flush(MessageTree tree) {
        if (tree.getMessageId() == null) {
            tree.setMessageId(nextMessageId());//為消息體生產全局唯一ID,詳見snowflate算法
        }

        MessageSender sender = m_transportManager.getSender();

        if (sender != null && isMessageEnabled()) {
            sender.send(tree);

            reset();//ThreadLocal中存儲的Context清理
        } else {
            m_throttleTimes++;

            if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
                m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
            }
        }
    }
........
    private Context getContext() {
        if (Cat.isInitialized()) {
            Context ctx = m_context.get();//ThreadLocal存儲一個Context對象

            if (ctx != null) {
                return ctx;
            } else {
                if (m_domain != null) {
                    ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
                } else {
                    ctx = new Context("Unknown", m_hostName, "");
                }

                m_context.set(ctx);
                return ctx;
            }
        }

        return null;
    }

//TcpSocketSender.send(MessageTree tree)

    private MessageQueue m_queue = new DefaultMessageQueue(SIZE);

    private MessageQueue m_atomicTrees = new DefaultMessageQueue(SIZE);

    @Override
    public void send(MessageTree tree) {
        if (isAtomicMessage(tree)) {
            boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

            if (!result) {
                logQueueFullInfo(tree);
            }
        } else {
            boolean result = m_queue.offer(tree, m_manager.getSample());

            if (!result) {
                logQueueFullInfo(tree);
            }
        }
    }

至此,構造的消息體放入了阻塞隊列中等待上傳。

總結: 我們可以看到Cat-SDK通過ThreadLocal對消息進行收集, 

收集進來按照時間以及類型構造為Tree結構,在compele()方法中將這個構造的消息放入一個內存隊列中,等待TcpSockekSender這個Daemon線程異步上報給服務端。

 4.1.6 cat-TcpSocketSender

消息上傳服務端,會有一個線程cat-TcpSocketSender監聽消費隊列,並消費(上傳服務端)。

通信上報服務端使用了Netty-Client,並且自定義了消息協議。

    @Override
    public void run() {
        m_active = true;

        while (m_active) {
            ChannelFuture channel = m_manager.channel();

            if (channel != null && checkWritable(channel)) {
                try {
                    MessageTree tree = m_queue.poll();

                    if (tree != null) {
                        sendInternal(tree);//netty NIO編碼后TCP發送到服務端。
                        tree.setMessage(null);
                    }

                } catch (Throwable t) {
                    m_logger.error("Error when sending message over TCP socket!", t);
                }
            } else {
                long current = System.currentTimeMillis();
                long oldTimestamp = current - HOUR;

                while (true) {
                    try {
                        MessageTree tree = m_queue.peek();

                        if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {
                            MessageTree discradTree = m_queue.poll();

                            if (discradTree != null) {
                                m_statistics.onOverflowed(discradTree);
                            }
                        } else {
                            break;
                        }
                    } catch (Exception e) {
                        m_logger.error(e.getMessage(), e);
                        break;
                    }
                }

                try {
                    Thread.sleep(5);
                } catch (Exception e) {
                    // ignore it
                    m_active = false;
                }
            }
        }
    }

    private void sendInternal(MessageTree tree) {
        ChannelFuture future = m_manager.channel();
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K

        System.out.println(tree);

        m_codec.encode(tree, buf);//編碼后發送

        int size = buf.readableBytes();
        Channel channel = future.channel();

        channel.writeAndFlush(buf);
        if (m_statistics != null) {
            m_statistics.onBytes(size);
        }
    }

 

4.1.7 cat-merge-atomic-task

符合如下邏輯判斷的atomicMessage會放入m_atomicTrees消息隊列,然后由這個后台線程監聽並消費。 

具體代碼如下:

TcpSocketSender.java

private MessageQueue m_atomicTrees = new DefaultMessageQueue(SIZE);

......

        private boolean isAtomicMessage(MessageTree tree) {
        Message message = tree.getMessage();//從tree上拿去message

        if (message instanceof Transaction) {//如果這個message實現了Transaction接口,也就是Transaction類型的消息
            String type = message.getType();

            if (type.startsWith("Cache.") || "SQL".equals(type)) {//如果以Cache.,SQL開頭的則返回True
                return true;
            } else {
                return false;
            }
        } else {
            return true;
        }
        //看到這里,也就是說,"Cache","SQL"開頭的Transaction消息,或者非Transaction消息,認為是atomicMessage.
    }

......

public void send(MessageTree tree) {
        if (isAtomicMessage(tree)) {//如果符合atomicMessage
            boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

            if (!result) {
                logQueueFullInfo(tree);//隊列溢出處理
            }
        } else {
            boolean result = m_queue.offer(tree, m_manager.getSample());

            if (!result) {
                logQueueFullInfo(tree);
            }
        }
    }
......

 

 

public class DefaultMessageQueue implements MessageQueue {
    private BlockingQueue<MessageTree> m_queue;

    private AtomicInteger m_count = new AtomicInteger();

    public DefaultMessageQueue(int size) {
        m_queue = new LinkedBlockingQueue<MessageTree>(size);
    }

    @Override
    public boolean offer(MessageTree tree) {
        return m_queue.offer(tree);
    }

    @Override
    public boolean offer(MessageTree tree, double sampleRatio) {
        if (tree.isSample() && sampleRatio < 1.0) {//如果這個消息是sample,並且sampleRation大於1
            if (sampleRatio > 0) {//這段邏輯就是按采樣率去剔除一些消息,只選取其中一部分進行后續的消費上傳。
                int count = m_count.incrementAndGet();

                if (count % (1 / sampleRatio) == 0) {
                    return offer(tree);
                }
            }
            return false;
        } else {//不做采樣過濾,放入隊列
            return offer(tree);
        }
    }

    @Override
    public MessageTree peek() {
        return m_queue.peek();
    }

    @Override
    public MessageTree poll() {
        try {
            return m_queue.poll(5, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return null;
        }
    }

    @Override
    public int size() {
        return m_queue.size();
    }
}

 

這個后台進程的消費動作:

......

private boolean shouldMerge(MessageQueue trees) {
        MessageTree tree = trees.peek();//獲取對頭元素,非移除

        if (tree != null) {
            long firstTime = tree.getMessage().getTimestamp();
            int maxDuration = 1000 * 30;
            //消息在30s內生成,或者隊列擠壓消息超過200,則需要merge
            if (System.currentTimeMillis() - firstTime > maxDuration || trees.size() >= MAX_CHILD_NUMBER) {
                return true;
            }
        }
        return false;
    }

......

        @Override
        public void run() {
            while (true) {
                if (shouldMerge(m_atomicTrees)) {
                    MessageTree tree = mergeTree(m_atomicTrees);//把m_atomicTrees隊列中的消息merge為一條消息樹
                    boolean result = m_queue.offer(tree);//放入m_queue隊列,等待cat-TcpSocketSender線程正常消費

                    if (!result) {
                        logQueueFullInfo(tree);
                    }
                } else {
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            }
        }

.....

private MessageTree mergeTree(MessageQueue trees) {
        int max = MAX_CHILD_NUMBER;
        DefaultTransaction tran = new DefaultTransaction("_CatMergeTree", "_CatMergeTree", null);//增加merge處理埋點
        MessageTree first = trees.poll();//從隊列頭部移除

        tran.setStatus(Transaction.SUCCESS);
        tran.setCompleted(true);
        tran.addChild(first.getMessage());
        tran.setTimestamp(first.getMessage().getTimestamp());
        long lastTimestamp = 0;
        long lastDuration = 0;

        //這段邏輯就是不停從這個m_atomicTrees隊列頭部拿去messsage,並使用同一個messageId,把隊列中所有的消息合並為一條Transaction消息。
        while (max >= 0) {
            MessageTree tree = trees.poll();//接着 從隊列頭部移除

            if (tree == null) {
                tran.setDurationInMillis(lastTimestamp - tran.getTimestamp() + lastDuration);
                break;
            }
            lastTimestamp = tree.getMessage().getTimestamp();
            if(tree.getMessage() instanceof DefaultTransaction){
                lastDuration = ((DefaultTransaction) tree.getMessage()).getDurationInMillis();
            } else {
                lastDuration = 0;
            }
            tran.addChild(tree.getMessage());
            m_factory.reuse(tree.getMessageId());
            max--;
        }
        ((DefaultMessageTree) first).setMessage(tran);
        return first;
    } 

4.1.8 TcpSocketSender-ChannelManager 后台線程

這個線程是通過服務端配置的路由ip,10s輪詢一次,當滿足自旋n(n = m_count % 30)次,去檢查路由服務端ip是否變動,並保證連接正常。典型的拉取配置信息機制。

1)客戶端跟服務端連接建立,分兩步:

  • 初始ChannelMananger的時候 ;
  • ChannelManager異步線程,每隔10秒做一次檢查。

初始ChannelMananger的時候

實例化ChannelManager的時候,根據配置的第一個server,從遠程服務器讀取服務器列表,如果能讀取到,則順序建立連接,直到建立成功為止;如果不能讀到,則根據本地配置的列表,逐個建立連接,直到成功為止。

ChannelMananger異步線程,每隔10秒做一次檢查

  • 檢查Server列表是否變更

每間隔10s,檢查當前channelFuture是否活躍,活躍,則300s檢查一次,不活躍,則執行檢查。檢查的邏輯是:比較本地server列表跟遠程服務提供的列表是否相等,不相等則根據遠程服務提供的server列表順序的重新建立第一個能用的ChannelFuture

  • 查看當前客戶端是否有積壓,或者ChannelFuture是否被關閉

如果有積壓,或者關閉掉了,則關閉當前連接,將activeIndex=-1,表示當前連接不可用。

  • 重連默認Server

從0到activeIndex中找一個能連接的server,中心建立一個連接。如果activeIndex為-1,則從整個的server列表中順序的找一個可用的連接建立連接。

2)ChannelManager實例化,建立Netty連接邏輯

客戶端實例化DefaultTransportManager對象時,會按照如下流程先實例化m_tcpSocketSender,接着實例化ChannelManager。ChannelManager管理對服務端的netty連接。 實例化流程如下:

 ChannelManager通過ChannelHolder把netty的ChannnelFuture封裝起來。ChannelHolder結構如下:

public static class ChannelHolder {

        /**
         * 當前活躍的channelFuture
         */
        private ChannelFuture m_activeFuture; /** * 當前server在m_serverAddresses中的第幾個 */ private int m_activeIndex = -1; /** * 當前活躍的ChannelFuture對應的配置 */ private String m_activeServerConfig; /** * 從配置文件中讀取的服務端列表 */ private List<InetSocketAddress> m_serverAddresses; /** * 當前活躍的ChannelFutre對應的ip */ private String m_ip; /** * 連接從第一次初始化開始,是否發生過變更 */ private boolean m_connectChanged;

    //省略其它的代碼
}

3)ChannelManager內部異步線程,動態切換Netty連接邏輯。

ChannelManager內部每隔10秒鍾,檢查netty連接。這部分代碼如下:

    public void run() {
        while (m_active) { /** * make save message id index asyc * 本地存儲index,和 時間戳,防止重啟,導致本地的消息id重了 */ m_idfactory.saveMark(); /** * 檢查本地初始化的服務列表跟遠程的服務列表是否有差異,如果有差異,則取遠程第一個能建立連接的server,建立一個新的連接,關閉舊的連接 */ checkServerChanged(); ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture(); List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses(); /** * 檢查當前channelFuture是否有消息積壓(本地隊列長度超過4990),或者 channelFuture不是開的 */ doubleCheckActiveServer(activeFuture); /** * 從serverAddresses列表里面,重新順序選一個,重新連接 */ reconnectDefaultServer(activeFuture, serverAddresses); try { Thread.sleep(10 * 1000L); // check every 10 seconds } catch (InterruptedException e) { // ignore  } } }

 總結:服務端沒有做到負載均衡,連接會慢慢連接到server列表里面第一個可用的server上。

4.1.9 StatusUpdateTask

CatClientModule在加載過程中會從StatusUpdateTask中啟動一個線程來每隔一段時間發送一個HeartBeatMessage,其中包括了客戶端能拿到的各種信息,包括CPU,Memory,Disk等等,開發者也可以通過實現StatusExtension接口的方式來實現對於HeartBeatMessage發送內容的擴展。

這個線程很簡單,類似傳統的agent,每分鍾上報關於應用的各種信息(OS、MXBean信息等等)。而且,在每次線程啟動時上報一個Reboot消息表示重啟動。

其中比較重要的實現信息收集的是這行代碼

StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars);
status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));

m_statistics包含的是已經發送過信息的容量,m_jars是通過classLoader加載的jar包名稱,StatusInfoCollector通過大量訪問者模式的代碼實現了將各種指標set到status中的功能,之后將status封裝到HeartBeatMessage中,按照一般對於message的處理流程,flush到消息傳輸層中

4.1.10 MessageId的設計

CAT消息的Message-ID格式applicationName-0a010680-375030-2,CAT消息一共分為四段: 

第一段是應用名applicationName。 

第二段是當前這台機器的IP的16進制格式:

if (m_ipAddress == null) {
            String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
            List<String> items = Splitters.by(".").noEmptyItem().split(ip);
            byte[] bytes = new byte[4];

            for (int i = 0; i < 4; i++) {
                bytes[i] = (byte) Integer.parseInt(items.get(i));
            }

            StringBuilder sb = new StringBuilder(bytes.length / 2);

            for (byte b : bytes) {
                //1.一個byte 8位
                //2.先獲取高4位的16進制字符
                //3.在獲取低4位的16進制數            
                sb.append(Integer.toHexString((b >> 4) & 0x0F));//通常使用0x0f來與一個整數進行&運算,來獲取該整數的最低4個bit位
                sb.append(Integer.toHexString(b & 0x0F));
            }

            m_ipAddress = sb.toString();

第三段的375030,是系統當前時間除以小時得到的整點數。 
第四段的2,是表示當前這個客戶端在當前小時的順序遞增號(AtomicInteger自增,每小時結束后重置)。

public String getNextId() {
        String id = m_reusedIds.poll();

        if (id != null) {
            return id;
        } else {
            long timestamp = getTimestamp();

            if (timestamp != m_timestamp) {
                m_index = new AtomicInteger(0);
                m_timestamp = timestamp;
            }

            int index = m_index.getAndIncrement();

            StringBuilder sb = new StringBuilder(m_domain.length() + 32);

            sb.append(m_domain);
            sb.append('-');
            sb.append(m_ipAddress);
            sb.append('-');
            sb.append(timestamp);
            sb.append('-');
            sb.append(index);

            return sb.toString();
        }

 

總之,同一個小時內、同一個domain、同一個ip , messageId的唯一性需要 AtomicInteger保證。

4.2 cat-home設計

4.2.1 服務端初始化

1)Servlet容器加載、啟動

CAT目前是使用war包放入Servlet容器(如:tomcat或者jetty,以下假設使用tomcat容器)中的方式部署啟動。 

熟悉servlet容器的同學應該知道,容器啟動時會讀取每個Context(可理解為web工程)中的web.xml然后啟動Servlet等其他組件。

在cat-home模塊中的web.xml中可以看到,除了容器默認的Servlet之外,tomcat啟動時會啟動CatServlet、MVC這兩個Servlet(因為load-on-startup>0,也就是會調用init方法初始化):

<web-app>

<filter>...</filter>

<servlet>
        <servlet-name>cat-servlet</servlet-name>
        <servlet-class>com.dianping.cat.servlet.CatServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet>
        <servlet-name>mvc-servlet</servlet-name>
        <servlet-class>org.unidal.web.MVC</servlet-class>
        <init-param>
            <param-name>cat-client-xml</param-name>
            <param-value>client.xml</param-value>
        </init-param>
        <init-param>
            <param-name>init-modules</param-name>
            <param-value>false</param-value>
        </init-param>
        <load-on-startup>2</load-on-startup>
    </servlet>

<filter-mapping>...</filter-mapping>
<servlet-mapping>...</servlet-mapping>
<jsp-config>...</jsp-config>

</web-app>

 

2)com.dianping.cat.servlet.CatServlet

按照web.xml中Servlet的加載順序CatServlet會優先於MVC完成初始化。 

CatServlet的邏輯基本可以概括為如下兩條線:

CatServlet.init——>CatServlet.initComponents——>DefaultModuleInitializer.execute(...) 
            ——>com.dianping.cat.CatHomeModule.setup(ModuleContext ctx)
                ——>TCPSocketReceiver(netty服務器)

CatServlet.init——>CatServlet.initComponents——>DefaultModuleInitializer.execute(...) 
        ——>com.dianping.cat.***Module.execute(ModuleContext ctx)(完成各個模塊的初始化)

 

com.dianping.cat.servlet.CatServlet.init(ServletConfig servletConfig)

public void init(ServletConfig config) throws ServletException {
        super.init(config);

        try {//1.plexus IOC容器初始化(根據components.xml的設定完成IOC初始化)
            if (m_container == null) {
                m_container = ContainerLoader.getDefaultContainer();
            }
            //2.用來打印日志的m_logger對象實例化(根據plexus.xml設定完成實例化)
            m_logger = ((DefaultPlexusContainer) m_container).getLoggerManager().getLoggerForComponent(
                  getClass().getName());
            //3.初始化CAT-Server必備的組件模塊:cat-home\cat-consumer\cat-core
            initComponents(config);
        } catch (Exception e) {
            if (m_logger != null) {
                m_logger.error("Servlet initializing failed. " + e, e);
            } else {
                System.out.println("Servlet initializing failed. " + e);
                e.printStackTrace(System.out);
            }

            throw new ServletException("Servlet initializing failed. " + e, e);
        }
    }

進入initComponents(config); 我們繼續看下為了啟動server服務,各個cat-*模塊如何初始化。

com.dianping.cat.servlet.CatServlet.initComponents(ServletConfig servletConfig)

    @Override
    protected void initComponents(ServletConfig servletConfig) throws ServletException {
        try {
        //ModuleContext ctx這個對象里主要作用:
        //1.持有 plexus IOC 容器的引用;
        //2.持有 logger對象引用,用來打日志。
        //3.持有 需要使用到的配置文件路徑。
        //比如:cat-server-config-file=\data\appdatas\cat\server.xml 
        //cat-client-config-file=\data\appdatas\cat\client.xml

            ModuleContext ctx = new DefaultModuleContext(getContainer());
            ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
            File clientXmlFile = getConfigFile(servletConfig, "cat-client-xml", "client.xml");
            File serverXmlFile = getConfigFile(servletConfig, "cat-server-xml", "server.xml");

            ctx.setAttribute("cat-client-config-file", clientXmlFile);
            ctx.setAttribute("cat-server-config-file", serverXmlFile);
            //通過查找啟動cat-home必要的模塊,然后依次初始化各個模塊。
            initializer.execute(ctx);
        } catch (Exception e) {
            m_exception = e;
            System.err.println(e);
            throw new ServletException(e);
        }
    }

org.unidal.initialization.DefaultModuleInitializer.execute(…). 執行各個模塊的初始化

@Override
   public void execute(ModuleContext ctx) {

   //我們的topLevelModule是cat-home模塊,通過這個模塊去查找需要依賴的其他模塊並初始化他們。
      Module[] modules = m_manager.getTopLevelModules();
      execute(ctx, modules);
      }


   @Override
   public void execute(ModuleContext ctx, Module... modules) {
      Set<Module> all = new LinkedHashSet<Module>();

      info(ctx, "Initializing top level modules:");

      for (Module module : modules) {
         info(ctx, "   " + module.getClass().getName());
      }

      try {
      //1.根據頂層Module獲取到下層所有依賴到的modules,並分別調用他們的setup方法
         expandAll(ctx, modules, all);
      //2.依次調用module實現類的execute方法
         for (Module module : all) {
            if (!module.isInitialized()) {
               executeModule(ctx, module, m_index++);
            }
         }
      } catch (Exception e) {
         throw new RuntimeException("Error when initializing modules! Exception: " + e, e);
      }
   }

   private void expandAll(ModuleContext ctx, Module[] modules, Set<Module> all) throws Exception {
      if (modules != null) {
         for (Module module : modules) {
            expandAll(ctx, module.getDependencies(ctx), all);

            if (!all.contains(module)) {
               if (module instanceof AbstractModule) {
                  ((AbstractModule) module).setup(ctx);//調用各個module實現類的setup
               }
    //all 最終元素以及順序:
    //CatClientModule\CatCoreModule\CatConsumerModule\CatHomeModule

               all.add(module);
            }
         }
      }
   }

我們看到cat-home模塊是一個頂層模塊,接着根據這個模塊找到其他依賴模塊 (CatClientModule\CatConsumerModule\CatCoreModule),並且依次調用setup方法,解析依次調用模塊的execute方法完成初始化。

Modules之間的設計使用了典型的模板模式。 

模塊依賴關系: 
null<——CatClientModule<——CatClientModule<——CatCoreModule<——CatConsumerModule<——CatHomeModule

接着着重看一下子類 CatHomeModule的setup的實現。注意除了這個子類,Module的子類steup()方法為空 。

com.dianping.cat.CatHomeModule.setup(ModuleContext ctx)

@Override
    protected void setup(ModuleContext ctx) throws Exception {
        File serverConfigFile = ctx.getAttribute("cat-server-config-file");//獲取server.xml文件的路徑
        //通過 plexus IOC 初始化一個 ServerConfigManager bean
        ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
        //通過 plexus IOC 初始化一個 TcpSocketReceiver bean
        final TcpSocketReceiver messageReceiver = ctx.lookup(TcpSocketReceiver.class);
        //加載\...\server.xml中的配置
        serverConfigManager.initialize(serverConfigFile);
        //啟動TCPSocketReceiver,就是一個典型的 netty 事件驅動服務器,用來接收客戶端的TCP長連接請求
        messageReceiver.init();
        //增加一個進程觀察者,在這個JVM關閉時回調
        Runtime.getRuntime().addShutdownHook(new Thread() {

            @Override
            public void run() {
                messageReceiver.destory();
            }
        });
    }

 

各個模塊的啟動,executeModule 

各個模塊setup就說到這里,setup完成后,會依次調用module.execute(…)用來完成各個模塊的啟動。

依次調用: 

CatClientModule\CatCoreModule\CatConsumerModule\CatHomeModule.其中只有CatClientModule、CatHomeModule實現了有效的execute方法。

com.dianping.cat.CatClientModule.execute(ModuleContext ctx) 

注意:這里的客戶端是用來監控服務端的,具體client的解析可以參考cat-client設計

@Override
    protected void execute(final ModuleContext ctx) throws Exception {
        ctx.info("Current working directory is " + System.getProperty("user.dir"));

        // initialize milli-second resolution level timer
        MilliSecondTimer.initialize();

        // tracking thread start/stop
        // Threads用來對線程做管理的類。這里默認給每個新建的線程加上監聽器或者說是觀察者
        Threads.addListener(new CatThreadListener(ctx));

        // warm up Cat: setContainer
        Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());

        // bring up TransportManager:實例化這個類
        ctx.lookup(TransportManager.class);

        //ClientConfigManager對象是加載了client.xml的客戶端配置管理對象。
        //客戶端的解析不進行展開,請看之前寫的《分布式監控CAT源碼解析——cat-client》
        ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);
        if (clientConfigManager.isCatEnabled()) {
            StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);
            Threads.forGroup("cat").start(statusUpdateTask);
            LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms

        }
    }

com.dianping.cat.CatHomeModule.execute(ModuleContext ctx) 
CatHomeModule涉及很多可說的,此處暫時不做展開,繼續按照Servlet啟動的流程講解。

    @Override
    protected void execute(ModuleContext ctx) throws Exception {
        ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
        //初始化MessageConsumer子類RealtimeConsumer,不僅實例化這個類MessageConsumer對象,還會把這個類中的成員全部實例化
        //      <plexus>
        //      <components>
        //          <component>
        //              <role>com.dianping.cat.analysis.MessageConsumer</role>
        //              <implementation>com.dianping.cat.analysis.RealtimeConsumer</implementation>
        //              <requirements>
        //                  <requirement>
        //                      <role>com.dianping.cat.analysis.MessageAnalyzerManager</role>
        //                  </requirement>
        //                  <requirement>
        //                      <role>com.dianping.cat.statistic.ServerStatisticManager</role>
        //                  </requirement>
        //                  <requirement>
        //                      <role>com.dianping.cat.config.server.BlackListManager</role>
        //                  </requirement>
        //              </requirements>
        //          </component>

        ctx.lookup(MessageConsumer.class);

        ConfigReloadTask configReloadTask = ctx.lookup(ConfigReloadTask.class);
        Threads.forGroup("cat").start(configReloadTask);

        if (serverConfigManager.isJobMachine()) {
            DefaultTaskConsumer taskConsumer = ctx.lookup(DefaultTaskConsumer.class);

            Threads.forGroup("cat").start(taskConsumer);
        }

        if (serverConfigManager.isAlertMachine()) {//如果當前結點開啟了告警功能,則對每種報表啟動一個daemon線程。1分鍾檢查一次
            BusinessAlert metricAlert = ctx.lookup(BusinessAlert.class);
            NetworkAlert networkAlert = ctx.lookup(NetworkAlert.class);
            DatabaseAlert databaseAlert = ctx.lookup(DatabaseAlert.class);
            SystemAlert systemAlert = ctx.lookup(SystemAlert.class);
            ExceptionAlert exceptionAlert = ctx.lookup(ExceptionAlert.class);
            FrontEndExceptionAlert frontEndExceptionAlert = ctx.lookup(FrontEndExceptionAlert.class);
            HeartbeatAlert heartbeatAlert = ctx.lookup(HeartbeatAlert.class);
            ThirdPartyAlert thirdPartyAlert = ctx.lookup(ThirdPartyAlert.class);
            ThirdPartyAlertBuilder alertBuildingTask = ctx.lookup(ThirdPartyAlertBuilder.class);
            AppAlert appAlert = ctx.lookup(AppAlert.class);
            WebAlert webAlert = ctx.lookup(WebAlert.class);
            TransactionAlert transactionAlert = ctx.lookup(TransactionAlert.class);
            EventAlert eventAlert = ctx.lookup(EventAlert.class);
            StorageSQLAlert storageDatabaseAlert = ctx.lookup(StorageSQLAlert.class);
            StorageCacheAlert storageCacheAlert = ctx.lookup(StorageCacheAlert.class);

            Threads.forGroup("cat").start(networkAlert);
            Threads.forGroup("cat").start(databaseAlert);
            Threads.forGroup("cat").start(systemAlert);
            Threads.forGroup("cat").start(metricAlert);
            Threads.forGroup("cat").start(exceptionAlert);
            Threads.forGroup("cat").start(frontEndExceptionAlert);
            Threads.forGroup("cat").start(heartbeatAlert);
            Threads.forGroup("cat").start(thirdPartyAlert);
            Threads.forGroup("cat").start(alertBuildingTask);
            Threads.forGroup("cat").start(appAlert);
            Threads.forGroup("cat").start(webAlert);
            Threads.forGroup("cat").start(transactionAlert);
            Threads.forGroup("cat").start(eventAlert);
            Threads.forGroup("cat").start(storageDatabaseAlert);
            Threads.forGroup("cat").start(storageCacheAlert);
        }

        final MessageConsumer consumer = ctx.lookup(MessageConsumer.class);
        Runtime.getRuntime().addShutdownHook(new Thread() {

            @Override
            public void run() {
                consumer.doCheckpoint();
            }
        });
    }

至此,CatServlet初始化完成了,接下來會初始化org.unidal.web.MVC這個Servlet。 
我們接着看一下另外一個Servlet:mvc-servlet

3)org.unidal.web.MVC

MVC這個Servlet繼承了AbstractContainerServlet,與CatServlet非常類似,均是AbstractContainerServlet 的實現類。這個Servlet顧名思義就是用來處理請求的,類似Spring中的DispatcherServlet,集中分配進入的請求到對應的Controller。

public void init(ServletConfig config) throws ServletException {…} 

與CatServelet一樣,均繼承自父類:

public void init(ServletConfig config) throws ServletException {
        super.init(config);

        try {
            if (m_container == null) {
            //DefaultPlexusContainer m_container 是單例對象,在CATServlet中已經完成初始化了
                m_container = ContainerLoader.getDefaultContainer();
            }

            m_logger = ((DefaultPlexusContainer) m_container).getLoggerManager().getLoggerForComponent(
                  getClass().getName());

            initComponents(config);
        } ......

 

org.unidal.web.MVC.initComponents(ServletConfig config) throws Exception

  @Override
   protected void initComponents(ServletConfig config) throws Exception {
      // /cat
      String contextPath = config.getServletContext().getContextPath();
      // /cat
      String path = contextPath == null || contextPath.length() == 0 ? "/" : contextPath;

      getLogger().info("MVC is starting at " + path);
//使用client.xml初始化代表CATClient的com.dianping.cat.Cat對象(如果CAT未被初始化)。
      initializeCat(config);
      initializeModules(config);

      m_handler = lookup(RequestLifecycle.class, "mvc");
      m_handler.setServletContext(config.getServletContext());

      config.getServletContext().setAttribute(ID, this);
      getLogger().info("MVC started at " + path);
   }

至此,容器啟動成功,http://localhost:2281/cat/r 進入頁面。

4.2.2 報表展示

對於實時報表,直接通過HTTP請求分發到相應消費機,待結果返回后聚合展示(對分區數據做聚合);歷史報表則直接取數據庫並展示。

 

4.3 cat-core設計

Server的主要入口是cat-core包中的RealTimeConsumer類

RealTimeConsumer類以及RealTimeConsumer的依賴類層級結構如下:

Cat Server功能為解碼消息,解碼后按照固定時間間隔分片,將消息分發到各個Analyzer的消費隊列中,然后由各自的Analyzer進行消費。

TCPSocketReceiver,DefaultMessageHandler

TCPSocketReceiver主要負責使用netty建立server端,接受到tcp請求后將其解碼,通過DefaultMessageHandler將Message交由RealTimeConsumer消費

RealTimeConsumer

在內部初始化PeriodManager,並啟動periodManager的線程,該線程會不斷根據時間間隔生成新的Period對象,並啟動Period對象內的多個PeriodTask線程,PeriodTask線程會根據持有的Anaylyzer和MessageQueue進行消費

當RealTimeConsumer終止時會調用doCheckPoint方法

PeriodManager,PeriodStrategy

PeriodManager主要是以時間切片作為策略來拆分整體數據的,所以PeriodManager中包含的List類型是根據PeriodStrategy中的時間策略獲得的。PeriodManager實現Task接口,他的主要任務是在規定的存活期內,每隔一段固定的時間都會創建新的Period對象,並啟動Period對象內的多個消費線程

Period

Period中主要包含了一個類型為Map < String, List < PeriodTask > >的屬性,該屬性根據MessageAnalyzerManager構建。Map < String, List < PeriodTask > >屬性是一個在該Period時間片內,不同類型的Analyzer與各個PeriodTask之間的對應關系,因為偶爾有同一個Analyzer會有多個PeriodTask一同消費,根據Hash進行分配的情況,所以value的類型為List。

PeriodTask是消費Message的消費單元,每個PeriodTask中包含了一個queue,一個analyzer,PeriodTask會一直從queue中取出Message讓analyzer進行消費

distribute方法實現了將Message分發到該Period中所有PeriodTask中的功能

start方法啟動各個PeriodTask線程,對各個PeriodTask的queue中的Message開始消費

finish方法調用各個PeriodTask的finish方法

MessageAnalyzerManager

持有Map < Long, Map < String, List < MessageAnalyzer > > >,該屬性包含了各個Analyzer的實例,每個實例可以通過——時間片——analyzer類型/名字來獲得,analyzer的數量由各個MessageAnalyzer中getAnalyzerCount獲得

PeriodTask

PeriodTask實現Task接口,每個PeriodTask會持有自己專屬的analyzer和queue,在線程啟動后會調用analyzer的consume方法來消費queue。在調用finish時會調用checkPoint方法,執行analyzer實現的檢查點方法

4.3.1 CAT服務端接收MessageTree

消息通過netty發送到服務端,經過MessageDecoder將字節流轉換成文本,PlainTextMessageCode將文本消息轉換成一棵消息樹,DefaultMessageHandler調用RealtimeConsumer實時消費消息樹,RealtimeConsumer調用Period(沒有就生成),將消息樹分發對應的PeriodTask的隊列里面,供對應的Analyzer處理。

4.3.2 EventAnalyzer介紹

Cat server中,以PeriodTask為消費單元,使用MessageAnalyzer進行消息消費,本篇介紹一下EventAnalyzer的功能,並捎帶介紹一下MessageAnalyzer的實現

MessageAnalyzer接口實現結構如下

4.3.3 DumpAnalyzer介紹

 CatServer中,可以定時把消息存儲到hdfs中,dumpAnalyzer就是用來支持這種功能的

LocalMessageBucketManager

ConcurrentHashMap <String, LocalMessageBucket > m_buckets主要根據持久化的日志路徑保存LocalMessageBucket對象

BlockingQueue < MessageBlock > m_messageBlocks 保存MessageItem經過gzip壓縮的block

ConcurrentHashMap < Integer, LinkedBlockingQueue < MessageItem > > m_messageQueues 在內存中持有各個gzip執行線程壓縮隊列對象,根據線程的index作為索引

BlockDumper負責將gzip壓縮過的block持久化到本地文件

MessageGzip負責定時壓縮MessageItem

LogviewUploader負責上傳logview到hdfs

  • archive 把傳入時間范圍內的,將bucket已經壓縮到block,但是沒有flush的MessageBlock放入m_messageBlocks消費隊列中,供BlockDumper,LogviewUploader消費
  • initialize 啟動blockDumper線程,啟動LogviewUploader線程,啟動若干個MessageGzip線程,各自干活
  • loadMessage 從文件中將消息加載出來
  • storeMessage 根據domain,ip等的hash將MessageItem放入MessageGzip線程的消費隊列,供其壓縮生成MessageBlock

LocalMessageBucket

對Message進行讀寫,壓縮,解壓縮的處理單元,LocalMessageBucket使用basePath/{date,yyyyMMdd}/{date,HH}/{name}的路徑生成壓縮文件,對消息的各種讀寫壓縮操作大多都與該文件有關。

  • storeMessage 通過傳入的MessageItem的ByteBuf生成壓縮過的MessageBlock對象
  • findById 根據MessageId加載該Bucket對應文件中的MessageTree對象

MessageBlock

壓縮后的消息信息的持有者,index是Message的id,size是對應message的size

MessageGzip

MessageItem的消費者,消費存儲在ConcurrentHashMap <Integer, LinkedBlockingQueue > m_messageQueues中的MessageItem,將MessageItem壓縮成MessageBlock,每個MessageGzip線程都有一個自己的Queue,Integer是每個線程對應queue的索引

BlockDumper

MessageBlock的消費者,消費存儲在BlockingQueue m_messageBlocks中的MessageBlock,將其通過LocalMessageBucket持久化到本地

##LogViewUploader
上傳logView到hdfs

4.3.4 TaskConsumer介紹

后台的Analyzer在歸檔時會生成Task的記錄到數據庫中,server在CatConsumerModule中初始化過程中啟動了TaskConsumer線程來處理數據庫中的記錄

ReportFacade

此類會在初始化時,將注冊在plexus的所有builder加載到m_reportBuilders中,在執行builderReport時,根據傳入的task的reportName查找對應的TaskBuilder,根據時間片,domain以及reportName查詢已經入庫的report基本記錄,再將report的基本記錄合並,並將合並后的report,生成的graph入庫。

在生成聚合report的過程中,會根據層級樹自上至下歸並遞歸生成,比如月的根據周,周根據日,日根據小時生成。

EventReportBuilder

基於時間片,調用EventService實現入庫聚合報表,入庫聚合graph的功能

EventService

基於domain實現報表插入,報表查詢等功能,是業務執行的基本單元

4.3.5 TcpSocketReceiver

在CAT-Server啟動時會啟動Netty的Nio 多線程Reactor模塊來接收客戶端的請求:

  • 一個Accept線程池(Main Reactor Thread Pool )用來處理連接操作(通常還可以在這各Accept中加入權限驗證、名單過濾邏輯);
  • 接着Accept連接成功的socket請求被轉發到 專門處理IO操作的線程池(Sub Reactor Thread Pool ,實現異步);在這里做了消息的解碼處理;
  • 再接着,解碼處理后,將消息發送到每個報表解析器內置的內存隊列中。消息將被異步分發給各個解析器單獨處理(不存在數據競爭)。

消息的接受是在這個類TcpSocketReceiver.java完成的:

    // 在CatHomeModule啟動時被調用
    public void init() {
        try {
            startServer(m_port);
        } catch (Throwable e) {
            m_logger.error(e.getMessage(), e);
        }
    }
    /**
     * 啟動一個netty服務端
     * @param port
     * @throws InterruptedException
     */
    public synchronized void startServer(int port) throws InterruptedException {
        boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
        int threads = 24;
        ServerBootstrap bootstrap = new ServerBootstrap();
        //linux走epoll的事件驅動模型
        m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);//用來做為接受請求的線程池 master線程
        m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);//用來做為處理請求的線程池 slave線程
        bootstrap.group(m_bossGroup, m_workerGroup);
        bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);

        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//channel初始化設置
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();

                pipeline.addLast("decode", new MessageDecoder());//增加消息解碼器
            }
        });
        // 設置channel的參數
        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

        try {
            m_future = bootstrap.bind(port).sync();//綁定監聽端口,並同步等待啟動完成
            m_logger.info("start netty server!");
        } catch (Exception e) {
            m_logger.error("Started Netty Server Failed:" + port, e);
        }
    }

啟動netty,對每個客戶端上報的消息都會做解碼處理,從字節流轉換為消息樹MessageTree tree,接着交給DefaultMessageHandler處理。

public class DefaultMessageHandler extends ContainerHolder implements MessageHandler, LogEnabled {

    /*
     * MessageConsumer按每個period(整小時一個period)組合了多個解析器,用來解析生產多個報表(如:Transaction、
     * Event、Problem等等)。一個解析器對象-一個有界隊列-一個整小時時間組合了一個PeriodTask,輪詢的處理這個有界隊列中的消息
     */
    @Inject
    private MessageConsumer m_consumer;

    private Logger m_logger;

    @Override
    public void enableLogging(Logger logger) {
        m_logger = logger;
    }

    @Override
    public void handle(MessageTree tree) {
        if (m_consumer == null) {
            m_consumer = lookup(MessageConsumer.class);//從容器中加載MessageConsumer實例
        }

        try {
            m_consumer.consume(tree);//消息消費
        } catch (Throwable e) {
            m_logger.error("Error when consuming message in " + m_consumer + "! tree: " + tree, e);
        }
    }
}

 

OMS設計是按照每小時去匯總數據,為什么要使用一個小時的粒度呢? 

這個是一個trade-off,實時內存數據處理的復雜度與內存的開銷方面的折中方案。 

在這個小時結束后將生成的Transaction\Event\Problean報表存入Mysql、File(機器根目錄俠)。然而為了實時性,當前小時的報表是保存在內存中的。

PeriodManager 用來管理 OMS單位小時內的各種類型的解析器,包括將上報的客戶端數據派發給不同的解析器(這種派發可以理解為訂閱\發布)。每個解析器,將收到的消息存入內置隊列,並且用單獨的線程去獲取消息並處理。

com.dianping.cat.analysis.PeriodManager

public class PeriodManager implements Task {
     public void init() {
        long startTime = m_strategy.next(System.currentTimeMillis());//當前小時的起始時間

        startPeriod(startTime);
    }

    @Override
    public void run() {
   // 1s檢查一下當前小時的Period對象是否需要創建(一般都是新的小時需要創建一個Period代表當前小時)
        while (m_active) {
            try {
                long now = System.currentTimeMillis();
                //value>0表示當前小時的Period不存在,需要創建一個
                //如果當前線小時的Period存在,那么Value==0
                long value = m_strategy.next(now);
                if (value > 0) {
                    startPeriod(value);
                } else if (value < 0) {
                    //  //當這個小時結束后,會異步的調用endPeriod(..),將過期的Period對象移除,help GC
                    Threads.forGroup("cat").start(new EndTaskThread(-value));
                }
            } catch (Throwable e) {
                Cat.logError(e);
            }

            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
    //當這個小時結束后,會異步的調用這個方法,將過期的Period對象移除,help GC
    private void endPeriod(long startTime) {
        int len = m_periods.size();

        for (int i = 0; i < len; i++) {
            Period period = m_periods.get(i);

            if (period.isIn(startTime)) {
                period.finish();
                m_periods.remove(i);
                break;
            }
        }
    }

......
}   

 

消息消費是由MessageConsumer的實現類RealtimeConsumer處理

com.dianping.cat.analysis.RealtimeConsumer.consume(MessageTree tree)

    @Override
    public void consume(MessageTree tree) {
        String domain = tree.getDomain();
        String ip = tree.getIpAddress();

        if (!m_blackListManager.isBlack(domain, ip)) {// 全局黑名單 按domain-ip
            long timestamp = tree.getMessage().getTimestamp();
            //PeriodManager用來管理、啟動periodTask,可以理解為每小時的解析器。
            Period period = m_periodManager.findPeriod(timestamp);//根據消息產生的時間,查找這個小時所屬的對應Period

            if (period != null) {
                period.distribute(tree);//將解碼后的tree消息依次分發給所有類型解析器
            } else {
                m_serverStateManager.addNetworkTimeError(1);
            }
        } else {
            m_black++;

            if (m_black % CatConstants.SUCCESS_COUNT == 0) {
                Cat.logEvent("Discard", domain);
            }
        }
    }

 

分發消息給各個解析器(類似向訂閱者發布消息) 
void com.dianping.cat.analysis.Period.distribute(MessageTree tree)

    /**
     * 將解碼后的tree消息依次分發給所有類型解析器
     * @param tree
     */
    public void distribute(MessageTree tree) {
        m_serverStateManager.addMessageTotal(tree.getDomain(), 1);// 根據domain,統計消息量
        boolean success = true;
        String domain = tree.getDomain();

        for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
            List<PeriodTask> tasks = entry.getValue();//某種類型報表的解析器
            int length = tasks.size();
            int index = 0;
            boolean manyTasks = length > 1;

            if (manyTasks) {
                index = Math.abs(domain.hashCode()) % length;//hashCode的絕對值 % 長度 =0~length-1之間的任一個數
            }
            PeriodTask task = tasks.get(index);
            boolean enqueue = task.enqueue(tree);//注意:這里會把同一個消息依依放入各個報表解析中的隊列中

            if (enqueue == false) {
                if (manyTasks) {
                    task = tasks.get((index + 1) % length);
                    enqueue = task.enqueue(tree);//放入隊列,異步消費

                    if (enqueue == false) {
                        success = false;
                    }
                } else {
                    success = false;
                }
            }
        }

        if (!success) {
            m_serverStateManager.addMessageTotalLoss(tree.getDomain(), 1);
        }
    }

 

PeriodTask 
每個periodTask對應一個線程,m_analyzer對應解析器處理m_queue中的消息

public class PeriodTask implements Task, LogEnabled {
    @Override
    public void run() {//每個periodTask對應一個線程,m_analyzer對應解析器處理m_queue中的消息
        try {
            m_analyzer.analyze(m_queue);
        } catch (Exception e) {
            Cat.logError(e);
        }
    }

 

4.3.6 AbstractMessageAnalyzer

    @Override
    public void analyze(MessageQueue queue) {// 解析器在當前小時內自旋,不停從隊列中拿取消息,然后處理
        while (!isTimeout() && isActive()) {// timeOut:當前時間>小時的開始時間+一小時+三分鍾;
                                            // isActive默認為true,調用shutdown后為false
            MessageTree tree = queue.poll();// 非阻塞式獲取消息

            if (tree != null) {
                try {
                    process(tree);// 解析器實現類 override
                } catch (Throwable e) {
                    m_errors++;

                    if (m_errors == 1 || m_errors % 10000 == 0) {
                        Cat.logError(e);
                    }
                }
            }
        }
        // 如果當前解析器以及超時,那么處理完對應隊列內的消息后返回。
        while (true) {
            MessageTree tree = queue.poll();

            if (tree != null) {
                try {
                    process(tree);
                } catch (Throwable e) {
                    m_errors++;

                    if (m_errors == 1 || m_errors % 10000 == 0) {
                        Cat.logError(e);
                    }
                }
            } else {
                break;
            }
        }
    }

 消費流程圖:

 

總結:

消息發送到服務端,服務端解碼為 MessageTree准備消費。期間存在一個demon線程,1s檢查一下當前小時的Period對象是否需要創建(一般都是新的小時需要創建一個Period代表當前小時)。

如果當前小時的Period存在,那么我們的MessageTree會被分發給各個PeriodTask,這里其實就是把消息發送到每個PeriodTask中的內存隊列里,然后每個Task異步去消費。就是通過使用Queue實現了解耦與延遲異步消費。

每個PeriodTask持有MessageAnalyzer analyzer(Transaction\Event\Problean…每種報表都對應一個解析器的實現類)、MessageQueue queue對象,PeriodTask會不停地解析被分發進來的MessageTree,形成這個解析器所代表的報表。

當前時間進入下個小時,會創建一個新的當前小時的Period,並且異步的remove之前的Period。

注意,這里有個比較坑的地方是,作者沒有使用線程池,每小時各個解析器的線程並沒有池化,而是直接銷毀后再次創建!

 

4.4 cat-consumer設計

4.4.1 報表部分數值計算公式

1)數據結構

TRANSACTIONREPORT報表的數據結構如下:

 

CROSSREPORT內存報表數據結構:

 

數據示例:

<?xml version="1.0" encoding="utf-8"?>
<cross-report domain="monitor-cat" startTime="2017-12-28 18:00:00" endTime="2017-12-28 18:59:59">
   <domain>monitor-cat</domain>
   <domain>monitor-dubbo</domain>
   <ip>10.15.83.181</ip>
   <local id="10.15.83.181">


      <remote id="10.15.83.181(monitor-cat):Pigeon.Client" role="Pigeon.Client" app="monitor-cat" ip="10.15.83.181(monitor-cat)">
         <type id="PigeonService" totalCount="19" failCount="0" failPercent="0.00" avg="0.00" sum="28.89" tps="0.00">
            <name id="DubboProviderService.getProviderServiceName" totalCount="19" failCount="0" failPercent="0.00" avg="0.00" sum="28.89" tps="0.00"/>
         </type>
      </remote>


      <remote id="10.15.83.181(monitor-dubbo):Pigeon.Server" role="Pigeon.Server" app="monitor-dubbo" ip="10.15.83.181(monitor-dubbo)">
         <type id="PigeonCall" totalCount="19" failCount="0" failPercent="0.00" avg="0.00" sum="461.26" tps="0.00">
            <name id="DubboProviderService.getProviderServiceName" totalCount="19" failCount="0" failPercent="0.00" avg="0.00" sum="461.26" tps="0.00"/>
         </type>
      </remote>


   </local>
</cross-report>

 

2)計算95線、99線

    /**
     * 統計95線,99.9線。
     * 思路:
     *    求請求的總數,假設是100,求線余下的數目,如果是95線,那余下是5,
     *    將之前統計的durations放進treeMap里面,倒序
     *    從前向后遍歷,找到第5個元素,拿到對應的值
     *
     *    基本上是按照95線的定義來取數的
     * @param durations
     * @param percent
     * @return
     */
    private double computeLineValue(Map<Integer, AllDuration> durations, double percent) {
        int totalCount = 0;
        Map<Integer, AllDuration> sorted = new TreeMap<Integer, AllDuration>(TransactionComparator.DESC);

        sorted.putAll(durations);

        for (AllDuration duration : durations.values()) {
            totalCount += duration.getCount();
        }

        int remaining = (int) (totalCount * (100 - percent) / 100);

        for (Entry<Integer, AllDuration> entry : sorted.entrySet()) {
            remaining -= entry.getValue().getCount();

            if (remaining <= 0) {
                return entry.getKey();
            }
        }

        return 0.0;
    }

3)求方差

    /**
     * 求方差 value = 求和(xi * xi)/count  - 均值*均值
     *      value = 開方(value)
     * @param count
     * @param avg
     * @param sum2
     * @param max
     * @return
     */
    double std(long count, double avg, double sum2, double max) {
        double value = sum2 / count - avg * avg;

        if (value <= 0 || count <= 1) {
            return 0;
        } else if (count == 2) {
            return max - avg;
        } else {
            return Math.sqrt(value);
        }
    }

 4.5 存儲設計(cat-hadoop)

存儲主要分成兩類:一個是 報表(Transaction、Event、Problem….),一個是logview,也是就是原始的MessageTree。

所有原始消息會先存儲在本地文件系統,然后上傳到HDFS中保存;而對於報表,因其遠比原始日志小,則以K/V的方式保存在MySQL中。

報表存儲:在每個小時結束后,將內存中的各個XML報表 保存到MySQL、File(/data/appdatas/cat/bucket/report…)中

logView的保存有后台線程(默認20個,Daemon Thread [cat-Message-Gzip-n])輪詢處理:會間隔一段時間后從消息隊列中拿取MessageTree,並進行編碼壓縮,保存到\data\appdatas\cat\bucket\dump\年月\日\domain-ip1-ip2-ipn目錄下。

com.dianping.cat.consumer.dump.LocalMessageBucketManager.MessageGzip.run()

    @Override
        public void run() {
            try {
                while (true) {
                    MessageItem item = m_messageQueue.poll(5, TimeUnit.MILLISECONDS);

                    if (item != null) {
                        m_count++;
                        if (m_count % (10000) == 0) {
                            gzipMessageWithMonitor(item);//數量達到10000的整數倍,通過上報埋點記錄監控一下
                        } else {
                            gzipMessage(item);
                        }
                    }
                }
            } catch (InterruptedException e) {
                // ignore it
            }
        }


private void gzipMessage(MessageItem item) {
            try {
                MessageId id = item.getMessageId();
                String name = id.getDomain() + '-' + id.getIpAddress() + '-' + m_localIp;
                String path = m_pathBuilder.getLogviewPath(new Date(id.getTimestamp()), name);
                LocalMessageBucket bucket = m_buckets.get(path);

                if (bucket == null) {
                    synchronized (m_buckets) {
                        bucket = m_buckets.get(path);
                        if (bucket == null) {
                            bucket = (LocalMessageBucket) lookup(MessageBucket.class, LocalMessageBucket.ID);
                            bucket.setBaseDir(m_baseDir);
                            bucket.initialize(path);

                            m_buckets.put(path, bucket);
                        }
                    }
                }

                DefaultMessageTree tree = (DefaultMessageTree) item.getTree();
                ByteBuf buf = tree.getBuffer();
                MessageBlock block = bucket.storeMessage(buf, id);

                if (block != null) {
                    if (!m_messageBlocks.offer(block)) {
                        m_serverStateManager.addBlockLoss(1);
                        Cat.logEvent("DumpError", tree.getDomain());
                    }
                }
            } catch (Throwable e) {
                Cat.logError(e);
            }
        }

    public MessageBlock storeMessage(final ByteBuf buf, final MessageId id) throws IOException {
        synchronized (this) {
            int size = buf.readableBytes();

            m_dirty.set(true);
            m_lastAccessTime = System.currentTimeMillis();
            m_blockSize += size;
            m_block.addIndex(id.getIndex(), size);
            buf.getBytes(0, m_out, size); // write buffer and compress it

            if (m_blockSize >= MAX_BLOCK_SIZE) {
                return flushBlock();
            } else {
                return null;
            }
        }
    }

 

4.5.1 logView的文件存儲設計

6. 客戶端接入 

6.1 mybatis接入

使用ORM插件-MybatisPlugin

效果:

6.2 log4j日志接入

Throwable(及其子類)異常上報,使用log日志框架的appender機制

 <root level="INFO">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="CatAppender" />
 </root>

 

效果:

 

6.3 URL請求監控

使用Filter機制實現,並實現URL聚合等其他功能。

6.4 代碼級別監控

aop 做監控,內部封裝,將aop-expression暴露出來給用戶配置(填寫需要監控的實現類范圍)。字節碼織入技術。

 

6.5 接口抽象\靜態綁定

為了向后兼容,輕松替換埋點的實現。比如切換為zipkin或者其他產品的API。從而減少對業務線的影響。 

仿照slf4j的設計,使用了靜態綁定。

6.6 分布式調用鏈

CatCrossHttpClient作為 httpClient的代理類暴露給用戶使用。

@Component
public class CatCrossHttpClient
  extends HttpClientProxy
{
  private String serverToCall;

  public void setServerToCall(String serverToCall)
  {
    this.serverToCall = serverToCall;
  }

  public String execute(HttpRequestBase request, int socketTimeout, int connectTimeout)
    throws Exception
  {
    Transaction t = Cat.newTransaction("PigeonCall", request.getURI().getPath());
    createCrossReport(request, this.serverToCall);
    Cat.Context context = new CatContext();
    Cat.logRemoteCallClient(context);
    request.setHeader("_catRootMessageId", context.getProperty("_catRootMessageId"));
    request.setHeader("_catParentMessageId", context.getProperty("_catParentMessageId"));
    request.setHeader("_catChildMessageId", context.getProperty("_catChildMessageId"));
    request.setHeader("ClientApplication.Name", Cat.getManager().getDomain());
    try
    {
      String ret = super.execute(request, socketTimeout, connectTimeout);
      t.setStatus("0");
      return ret;
    }
    catch (Exception e)
    {
      Cat.logEvent("HTTP_REST_CAT_ERROR", request.getURI().toString(), e.getMessage(), " ");
      t.setStatus(e);
      throw e;
    }
    finally
    {
      t.complete();
    }
  }

  private void createCrossReport(HttpRequestBase request, String serverToCall)
    throws Exception
  {
    Cat.logEvent("PigeonCall.app", serverToCall, "0", " ");

    Cat.logEvent("PigeonCall.server", request.getURI().getHost(), "0", " ");

    Cat.logEvent("PigeonCall.port", String.valueOf(request.getURI().getPort()), "0", " ");
    MessageTree tree = Cat.getManager().getThreadLocalMessageTree();
    ((DefaultMessageTree)tree).setDomain(Cat.getManager().getDomain());
    ((DefaultMessageTree)tree).setIpAddress(InetAddress.getLocalHost().getHostAddress());
  }
}

 

當有外部請求進來,通過如下的Filter判斷請求中是否帶有分布式調用鏈埋點,如果有就繼續構造這個鏈條。當然分布式調用鏈得以使用的前提是被監控的應用中有引入這兩個功能!

public class HttpCatCrossFilter
  implements Filter
{
  private static final Logger logger = LoggerFactory.getLogger(HttpCatCrossFilter.class);

  public void doFilter(ServletRequest req, ServletResponse resp, FilterChain filterChain)
    throws IOException, ServletException
  {
    HttpServletRequest request = (HttpServletRequest)req;
    if (isCatTracing(request))
    {
      String requestURI = request.getRequestURI();

      Transaction t = Cat.newTransaction("PigeonService", requestURI);
      try
      {
        Cat.Context context = new CatContext();
        context.addProperty("_catRootMessageId", request.getHeader("_catRootMessageId"));
        context.addProperty("_catParentMessageId", request.getHeader("_catParentMessageId"));
        context.addProperty("_catChildMessageId", request.getHeader("_catChildMessageId"));
        Cat.logRemoteCallServer(context);

        createCrossReport(request, t);

        filterChain.doFilter(req, resp);

        t.setStatus("0");
      }
      catch (Exception e)
      {
        logger.error("Get cat msgtree error :" + e);
        Cat.logEvent("HTTP_REST_CAT_ERROR", request.getRequestURL().toString(), e.getMessage(), " ");
        t.setStatus(e);
      }
      finally
      {
        t.complete();
      }
    }
    else
    {
      filterChain.doFilter(req, resp);
    }
  }

  private void createCrossReport(HttpServletRequest request, Transaction t)
    throws Exception
  {
    Cat.logEvent("PigeonService.app", request.getHeader("ClientApplication.Name"), "0", " ");

    Cat.logEvent("PigeonService.client", request.getRemoteAddr(), "0", " ");

    MessageTree tree = Cat.getManager().getThreadLocalMessageTree();

    ((DefaultMessageTree)tree).setDomain(Cat.getManager().getDomain());
    ((DefaultMessageTree)tree).setIpAddress(InetAddress.getLocalHost().getHostName());
  }

  public void init(FilterConfig arg0)
    throws ServletException
  {}

  public void destroy() {}

  private boolean isCatTracing(HttpServletRequest request)
  {
    return (null != request.getHeader("_catRootMessageId")) && (null != request.getHeader("_catParentMessageId")) && (null != request.getHeader("_catChildMessageId"));
  }
}

 

效果:

7. 各開源監控對比

zipkin:

優點:分布式調用鏈理論的實現系統。最大的特點是分布式調用鏈。Spring Cloud Sleuth 可以方便的對zipkin元數據進行采集。 

缺點:功能單一,監控維度、監控信息不夠豐富。沒有告警功能。

pinpoint:

優點:使用字節碼織入技術,對用戶完全透明,實現自動埋點。可展示代碼級別監控。 

缺點:  功能不足夠豐富。對於其他非java程序,實現客戶端難度大。

Cat:  

優點:功能豐富,多模型報表展示。可展示代碼級別監控。以及特殊業務數據監控。支持多語言客戶端。多數情況可以替代日志的查看。 

缺點:  手動埋點,需要改造才能減少埋點的侵入性。

 

8. 技術代碼賞析

8.1 MilliSecondTimer

/**
 * This timer provides milli-second precise system time.
 */
public class MilliSecondTimer {
    private static long m_baseTime;

    private static long m_startNanoTime;

    private static boolean m_isWindows = false;

    public static long currentTimeMillis() {
        if (m_isWindows) {
            if (m_baseTime == 0) {
                initialize();
            }

            long elipsed = (long) ((System.nanoTime() - m_startNanoTime) / 1e6);

            return m_baseTime + elipsed;
        } else {
            return System.currentTimeMillis();
        }
    }

    public static void initialize() {
        String os = System.getProperty("os.name");

        if (os.startsWith("Windows")) {
            m_isWindows = true;
            m_baseTime = System.currentTimeMillis();

            while (true) {
                LockSupport.parkNanos(100000); // 0.1 ms

                long millis = System.currentTimeMillis();

                if (millis != m_baseTime) {
                    m_baseTime = millis;
                    m_startNanoTime = System.nanoTime();
                    break;
                }
            }
        } else {
            m_baseTime = System.currentTimeMillis();
            m_startNanoTime = System.nanoTime();
        }
    }
}

  System.currentTimeMillis()返回的毫秒,這個毫秒其實就是自1970年1月1日0時起的毫秒數。

  System.nanoTime()返回的是納秒,nanoTime而返回的可能是任意時間,甚至可能是負數。

  System.currentTimeMillis調用的是native方法,使用的是系統的時間,每個JVM對應的應該是相同的,但因為具體的取值依賴於操作系統的實現,不同JVM間可能會有略微的差異。

  System.nanoTime每個JVM維護一份,和系統時間無關,可用於計算時間間隔,比System.currentTimeMillis的精度要高。

  修改了系統時間會對System.currentTimeMillis造成影響,而對System.nanoTime沒有影響。修改系統時間后會有如下效果:Timmer有影響,Thread.sleep有影響,ScheduledThreadPoolExecutor無影響,可以查看方法的實現調用的是System.currentTimeMillis還是System.nanoTime。

java修改系統時間:

  • windows環境下:
  Runtime.getRuntime().exec("cmd /c date 2013-05-06");//Windows 系統

  Runtime.getRuntime().exec("cmd /c time 22:35:00");//Windows 系統
  • linux環境下:
  Runtime.getRuntime().exec(" sudo date -s 2013-05-06")//linux 系統為tomcat用戶分配了權限

  Runtime.getRuntime().exec(" sudo date -s 22:25:00")//linux 系統為tomcat用戶分配了權限

  Linux上獲取的時間不正確,總是相差幾小時考慮時差的問題,修改/etc/sysconfig/clock。

 

9. Q&A

9.1 為什么基於ThreadLocal收集消息?

CAT客戶端在收集端數據方面使用ThreadLocal(線程局部變量),是線程本地變量。保證了線程安全。

業務方在處理業務邏輯時基本都是在一個線程內部調用后端服務、數據庫、緩存等,將這些數據拿回來再進行業務邏輯封裝,最后將結果展示給用戶。所以將監控請求作為一個監控上下文存入線程變量就非常合適。

9.2 為什么要使用TCP協議?

AT使用了TCP協議上報消息(引入了netty框架)。那么為什么不適用http協議上報呢?

選擇TCP的理由:對於客戶端的數據采集盡量降低性能損耗,TCP協議比HTTP協議更加輕量級(比如TCP不需要header等額外的損耗),在高qps的場景下具備明顯的性能優勢。

另外,CAT的設計也不需要保留一個 Http鏈接供外部調用,這樣的埋點方式效率低下,並不考慮。

 

10. 自己實現小工具

10.1 采集阿里鷹眼的數據,轉換成CAT消息樹並展示

需求來源:

由於某種原因,現有采用HSF+淘寶TDDL+Diamond+ONS消息+Tair+Search的技術選型,但是缺乏阿里鷹眼的監控系統。

解決方案:

上述技術選型,會進行阿里鷹眼需要的數據跟蹤鏈進行打點,記錄日志。

把相關日志通過Flume或者ELK進行采集,投遞到Kafka中,實現一個eagleeye-over-cat的springboot應用,並監聽kafka消息。

可參考Spring Cloud Sleuth兼容方案。

10.2  自定義小工具,解析應用服務器的CAT dump文件

需求來源:

本地dump文件為壓縮文件,無法直觀查看,排查問題時對用戶為黑盒。

解決方案:

使用MessageBlockReader進行文件讀取,使用PlainTextMessageCodec把字節解碼為MessageTree。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM