深入詳解美團點評CAT跨語言服務監控(二) CAT服務端初始化


Cat模塊

Cat-client : cat客戶端,編譯后生成 cat-client-2.0.0.jar ,用戶可以通過它來向cat-home上報統一格式的日志信息,可以集成到 mybatis、spring、微服務 dubbo 的監控等等流行框架。 

Cat-consumer: 用於實時分析從客戶端提供的數據。在實際開發和部署中,Cat-consumer和Cat-home是部署在一個JVM內部,每個CAT服務端都可以作為consumer也可以作為home,這樣既能減少整個層級結構,也可以增加系統穩定性。

Cat-core:Cat核心模塊

Cat-hadoop : 大數據統計依賴模塊。

cat-home:大眾點評CAT服務器端主程序,編譯安裝之后生成 cat-alpha-2.0.0.war 包部署於servlet容器中,我們用的是Tomcat,war包依賴cat-client.jar、cat-consumer.jar, cat-core.jar, cat-hadoop.jar 包,通過web.xml 配置,看到Cat會啟動 cat-servlet 和 mvc-servlet , mvc-servlet 是一個類似 spring MVC 的框架,用於處理用戶WEB管理平台請求。cat-servlet是CAT服務端監聽入口,CAT會在這里開啟監聽端口,接收處理客戶端的日志記錄請求,本章主要介紹cat-servlet。

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
    version="2.5">
...
    <servlet>
        <servlet-name>cat-servlet</servlet-name>
        <servlet-class>com.dianping.cat.servlet.CatServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet>
        <servlet-name>mvc-servlet</servlet-name>
        <servlet-class>org.unidal.web.MVC</servlet-class>
        <init-param>
            <param-name>cat-client-xml</param-name>
            <param-value>client.xml</param-value>
        </init-param>
        <init-param>
            <param-name>init-modules</param-name>
            <param-value>false</param-value>
        </init-param>
        <load-on-startup>2</load-on-startup>
    </servlet>
....

 

Cat-servlet初始化

 圖1 - 容器初始化類圖

 

CatServlet 首先會調用父類 AbstractContainerServlet 的init方法做初始化工作, 可以認為這是CatServlet的入口,他主要做了3件事情,首先調用基類HttpServlet的init方法對Servlet進行初始化,然后初始化Plexus容器,最后調用子類initComponents初始化Module模塊。

public abstract class AbstractContainerServlet extends HttpServlet {
    public void init(ServletConfig config) throws ServletException {
        super.init(config);
 
        try {
            if(this.m_container == null) {
                this.m_container = ContainerLoader.getDefaultContainer();
            }
 
            this.m_logger = this.m_container.getLogger();
            this.initComponents(config);
        } catch (Exception var3) {
            ...
        }
    }
}

 

plexus - IOC容器

 上面講到init(...)方法在初始化完Servlet之后調用 ContainerLoader.getDefaultContainer() 初始化plexus容器。

    注:這里可能大家不太了解plexus,它相當於Spring的IoC容器,但是它和Spring框架不同,它並不是一個完整的,擁有各種組件的大型框架,僅僅是一個純粹的IoC容器,它的開發者與Maven的開發者是同一撥人,最初開發Maven的時候,Spring並不成熟,所以Maven的開發者決定使用自己維護的IoC容器Plexus,它與Spring在語法和描述方式稍有不同。在Plexus中,有ROLE的概念,相當於Spring中的一個Bean。支持組件生命周期管理。

   非JAVA開發者不懂IOC容器?簡單來說,IOC容器就相當於一個對象裝載器,對象不是由程序員new創建,而是框架在初始化的時候從配置文件中讀取需要實例化的類信息,將信息裝入一個對象裝載器,然后在需要的時候,從對象裝載器中找是否存在該類的信息,存在則返回類的對象。

        plexus容器是如何工作的呢?就上面的類圖來說,

    a. AbstractContainerServlet 通過容器工廠ContainerLoader 的 getDefaultContainer方法,該方法會創建 MyPlexusContainer 容器,MyPlexusContainer是接口 PlexusContainer 的實現,MyPlexusContainer在構造函數中會創建組件管理器(ComponentManager),可以認為每個類都是容器中的一個組件,ComponentManager就是用來管理這些組件的,包括他的生命周期,組件在Plexus容器配置文件中配置。

     b.組件管理器(ComponentManager)會創建組件模型管理器(ComponentModelManager)以及組件生命周期管理器(ComponentLifecycle),ComponentModelManager用於存儲Plexus容器配置文件中的所有component組件信息,它的loadComponentsFromClasspath()方法會掃描各個jar包中存在的plexus容器配置文件,如圖2,將xml內容解析之后放入PlexusModel 列表中。

public class ComponentManager {
    private Map<String, ComponentBox<?>> m_components = new HashMap();
    private PlexusContainer m_container;
    private ComponentLifecycle m_lifecycle;
    private ComponentModelManager m_modelManager;
    private LoggerManager m_loggerManager;
    
    public ComponentManager(PlexusContainer container, InputStream in) throws Exception {
        this.m_container = container;
        this.m_modelManager = new ComponentModelManager();
        this.m_lifecycle = new ComponentLifecycle(this);
        if(in != null) {
            this.m_modelManager.loadComponents(in);
        }
 
        this.m_modelManager.loadComponentsFromClasspath();
        this.m_loggerManager = (LoggerManager)this.lookup(new ComponentKey(LoggerManager.class, (String)null));
        this.register(new ComponentKey(PlexusContainer.class, (String)null), container);
        this.register(new ComponentKey(Logger.class, (String)null), this.m_loggerManager.getLoggerForComponent(""));
    }
}

 

我們也可以將我們自己寫的類交給容器管理,只需要將類配置到容器配置文件中,例如:cat-consumer/src/main/resources/META-INF/plexus/components-cat-consumer.xml, 只要是存在於 META-INF/plexus/ 目錄下,並且文件名以"components-" 開頭的 ".xml" 文件,都會被 ComponentModelManager 認為是容器配置文件。

圖2 - plexus IOC容器類配置文件

 

 c.然后就可以通過lookup方法找到類,並在首次使用的時候實例化,並且xml配置中的該類依賴的其它類也會被一並實例化,另外如果類方法實現了 Initializable 接口,創建對象后會執行類的 initialize() 方法做一些初始化的工作。

if(component instanceof Initializable) {
    try {
        ((Initializable)component).initialize();
    } catch (Throwable var5) {
        ComponentModel model = ctx.getComponentModel();
        throw new ComponentLookupException("Error when initializing component!", model.getRole(), model.getHint(), var5);
    }
}

 

 

模塊的加載 - 模型模式

init(...)函數最后會調用CatServlet的initComponents()方法初始化Module模塊。

圖3 - 模塊初始化類圖

 

initComponents()方法首先創建一個模塊上下文 DefaultModuleContext對象,該對象擁有plexus容器的指針,以及server.xml、client.xml配置文件信息 ,服務端配置server.xml中有消息存儲路徑、HDFS上傳等一系列配置,由於cat-home默認是服務端也是客戶端,也就是說cat-home自身也會被監控,所以我們在這里看到有client.xml配置,配置文件所在目錄由環境變量CAT_HOME指定,如果未指定,默認是/data/appdatas/cat。

  隨后CatServlet創建一個模塊初始器 DefaultModuleInitializer,並調用他的execute(ctx)方法創建並初始化模塊。

  注:DefaultModuleInitializer有一個模塊管理器DefaultModelManager m_manager, 讀者可能沒有看見m_manager的創建過程,實際上,對象在components-foundation-service.xml配置文件中配置的,然后在plexus容器實例化類對象的過程中創建的,后面還有很多對象的屬性也是通過plexus容器注入的。比如DefaultModuleManager的m_topLevelModules屬性通過以下配置注入。

<component>
    <role>org.unidal.initialization.ModuleManager</role>
    <implementation>org.unidal.initialization.DefaultModuleManager</implementation>
    <configuration>
        <topLevelModules>cat-home</topLevelModules>
    </configuration>
</component>

 

上面XML配置顯示m_topLevelModules 指定為cat-home,這樣DefaultModuleInitializer通過DefaultModelManager的getTopLevelModules()方法獲取的就是CatHomeModule模塊對象,可以認為cat-home是一個頂層模塊,所有Module都包含getDependencies方法,該方法會找到當前模塊所依賴的其他模塊,並實例化模塊,比如下面cat-home就依賴cat-consumer模塊,

public class CatHomeModule extends AbstractModule {
    @Override
    public Module[] getDependencies(ModuleContext ctx) {
        return ctx.getModules(CatConsumerModule.ID);
    }
}

從cat-consumer的getDependencies看出他依賴cat-core模塊,cat-core模塊又依賴cat-client模塊,這樣子我們就從頂層模塊引出了所有依賴的其它模塊,在實例化模塊的同時調用模塊的setup方法安裝模塊。在所有模塊安裝完成之后,依次調用模塊的execute方法完成初始化,但是初始化順序則是按照安裝順序反着來的,cat-client -> cat-core -> cat-consumer -> cat-home ,Modules之間的設計使用了典型的模板模式。

cat-home的setup

在上一章講到模塊初始化的時候, 講到setup安裝cat-home模塊,對於客戶端的請求的監聽處理,就是在這里完成的。

@Named(type = Module.class, value = CatHomeModule.ID)
public class CatHomeModule extends AbstractModule {
    @Override
    protected void setup(ModuleContext ctx) throws Exception {
        if (!isInitialized()) {
            File serverConfigFile = ctx.getAttribute("cat-server-config-file");
            ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
            final TcpSocketReceiver messageReceiver = ctx.lookup(TcpSocketReceiver.class);
 
            serverConfigManager.initialize(serverConfigFile);
            messageReceiver.init();
 
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    messageReceiver.destory();
                }
            });
        }
    }
}

 

1、讀取 server.xml 配置,裝進配置管理器(ServerConfigManager)。

2、創建消息接收器 final TcpSocketReceiver messageReceiver;

3、messageReceiver.init() 初始化服務,采用的經典的 netty reactor 模型。

4、注冊一個JVM關閉的鈎子,在進程掛掉的時候,執行一些清理現場的代碼。

 

TcpSocketReceiver--- netty reactor 模式的應用

我們來看看CatHomeModule對TcpSocketReceiver的初始化做了什么,如下源碼:

 

public final class TcpSocketReceiver implements LogEnabled {
    public void init() {
        try {
            startServer(m_port);
        } catch (Throwable e) {
            m_logger.error(e.getMessage(), e);
        }
    }
    
    public synchronized void startServer(int port) throws InterruptedException {
        boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
        int threads = 24;
        ServerBootstrap bootstrap = new ServerBootstrap();
 
        m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
        m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
        bootstrap.group(m_bossGroup, m_workerGroup);
        bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);
 
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
 
                pipeline.addLast("decode", new MessageDecoder());
            }
        });
 
        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
 
        try {
            m_future = bootstrap.bind(port).sync();
            m_logger.info("start netty server!");
        } catch (Exception e) {
            m_logger.error("Started Netty Server Failed:" + port, e);
        }
    }
}

1、創建EventLoopGroup對象, EventLoopGroup是用來處理IO操作的多線程事件循環器,m_bossGroup作為一個acceptor負責接收來自客戶端的請求,然后分發給m_workerGroup用來所有的事件event和channel的IO。

2、創建ServerBootstrap對象,ServerBootstrap 是一個啟動Epoll(非Linux為NIO)服務的輔助啟動類,他將設置bossGroup和workerGroup兩個多線程時間循環器。

3、接下來的channel()方法設置了ServerBootstrap 的 ChannelFactory,這里傳入的參數是EpollServerSocketChannel.class (非Linux為NioServerSocketChannel.class),也就是說這個ChannelFactory創建的就是EpollServerSocketChannel/NioServerSocketChannel的實例。

    Channel是Netty的核心概念之一,它是Netty網絡通信的主體,他從EventLoopGroup獲得一個EventLoop,並注冊到該EventLoop,channel生命周期內都和該EventLoop在一起,由它負責對網絡通信連接的打開、關閉、連接和讀寫操作。如果是對於讀寫事件,執行線程調度pipeline來處理用戶業務邏輯。

4、接下來bootstrap.childHandler的目的是添加一個handler,用來監聽已經連接的客戶端的Channel的動作和狀態,傳入的 ChannelInitializer重寫了initChannel方法,這個方法在Channel被注冊到EventLoop的時候會被調用。

5、initChannel會創建ChannelPipeline對象,並調用addLast添加ChannelHandler。有網絡請求時,ChannelPipeline會調用ChannelHandler來處理,有ChannelInboundHandler和ChannelOutboundHandler兩種,ChannelPipeline會從頭到尾順序調用ChannelInboundHandler處理網絡請求內容,從尾到頭調用ChannelOutboundHandler處理網絡請求內容。這也是Netty用來靈活處理網絡請求的機制之一,因為使用的時候可以用多個decoder和encoder進行組合,從而適應不同的網絡協議。而且這種類似分層的方式可以讓每一個Handler專注於處理自己的任務而不用管上下游,這也是pipeline機制的特點。這跟TCP/IP協議中的五層和七層的分層機制有異曲同工之妙。

      在這里,ChannelPipeline添加的 ChannelHandler 是MessageDecoder ,MessageDecoder的祖先類實現了ChannelHandler接口,他本質上還是一個Handler,是網絡IO事件具體處理類,當客戶端將日志數據上傳到服務器之后,會交給MessageDecoder 解碼數據,然后進行后續處理。

6、調用 childOption 設置 channel 的參數。

7、最后調用bind()方法啟動服務。 

關於netty ,我就講到這里,網上關於netty框架的文章非常多,大家可以自行去查。

 

消息的解碼

 

 

  上一章我們講到Netty將接收到的消息交給 MessageDecoder 去做解碼,解碼是交由PlainTextMessageCodec對象將接收到的字節碼反序列化為MessageTree對象(所有的消息都是由消息樹來組織),具體的解碼邏輯在這里暫不做詳細闡述,在第三章我們會闡述編碼過程,解碼只是編碼的一個逆過程。

解碼之后調用 DefaultMessageHandler 的 handle方法對消息進行處理,handle方法就干了一件事情,就是調用 m_consumer.consume(tree) 方法去消費消息樹,在消費模塊,CAT實現了隊列化,異步化,在消息消費章節會詳細闡述。

    當然netty handler也是支持異步處理的,我們也可以將 DefaultMessageHandler 像 MessageDecoder那樣向netty注冊handler, 再由netty來做線程池分發。

public class MessageDecoder extends ByteToMessageDecoder {
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        if (buffer.readableBytes() < 4) {
            return;
        }
        buffer.markReaderIndex();
        int length = buffer.readInt();
        
        ...
        
        ByteBuf readBytes = buffer.readBytes(length + 4);
        
        ...
        
        DefaultMessageTree tree = (DefaultMessageTree) m_codec.decode(readBytes);
        
        readBytes.resetReaderIndex();
        tree.setBuffer(readBytes);
        m_handler.handle(tree);
        m_processCount++;
        
        ...
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM