Cat模塊
Cat-client : cat客戶端,編譯后生成 cat-client-2.0.0.jar ,用戶可以通過它來向cat-home上報統一格式的日志信息,可以集成到 mybatis、spring、微服務 dubbo 的監控等等流行框架。
Cat-consumer: 用於實時分析從客戶端提供的數據。在實際開發和部署中,Cat-consumer和Cat-home是部署在一個JVM內部,每個CAT服務端都可以作為consumer也可以作為home,這樣既能減少整個層級結構,也可以增加系統穩定性。
Cat-core:Cat核心模塊
Cat-hadoop : 大數據統計依賴模塊。
cat-home:大眾點評CAT服務器端主程序,編譯安裝之后生成 cat-alpha-2.0.0.war 包部署於servlet容器中,我們用的是Tomcat,war包依賴cat-client.jar、cat-consumer.jar, cat-core.jar, cat-hadoop.jar 包,通過web.xml 配置,看到Cat會啟動 cat-servlet 和 mvc-servlet , mvc-servlet 是一個類似 spring MVC 的框架,用於處理用戶WEB管理平台請求。cat-servlet是CAT服務端監聽入口,CAT會在這里開啟監聽端口,接收處理客戶端的日志記錄請求,本章主要介紹cat-servlet。
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5"> ... <servlet> <servlet-name>cat-servlet</servlet-name> <servlet-class>com.dianping.cat.servlet.CatServlet</servlet-class> <load-on-startup>1</load-on-startup> </servlet> <servlet> <servlet-name>mvc-servlet</servlet-name> <servlet-class>org.unidal.web.MVC</servlet-class> <init-param> <param-name>cat-client-xml</param-name> <param-value>client.xml</param-value> </init-param> <init-param> <param-name>init-modules</param-name> <param-value>false</param-value> </init-param> <load-on-startup>2</load-on-startup> </servlet> ....
Cat-servlet初始化
圖1 - 容器初始化類圖
CatServlet 首先會調用父類 AbstractContainerServlet 的init方法做初始化工作, 可以認為這是CatServlet的入口,他主要做了3件事情,首先調用基類HttpServlet的init方法對Servlet進行初始化,然后初始化Plexus容器,最后調用子類initComponents初始化Module模塊。
public abstract class AbstractContainerServlet extends HttpServlet { public void init(ServletConfig config) throws ServletException { super.init(config); try { if(this.m_container == null) { this.m_container = ContainerLoader.getDefaultContainer(); } this.m_logger = this.m_container.getLogger(); this.initComponents(config); } catch (Exception var3) { ... } } }
plexus - IOC容器
上面講到init(...)方法在初始化完Servlet之后調用 ContainerLoader.getDefaultContainer() 初始化plexus容器。
注:這里可能大家不太了解plexus,它相當於Spring的IoC容器,但是它和Spring框架不同,它並不是一個完整的,擁有各種組件的大型框架,僅僅是一個純粹的IoC容器,它的開發者與Maven的開發者是同一撥人,最初開發Maven的時候,Spring並不成熟,所以Maven的開發者決定使用自己維護的IoC容器Plexus,它與Spring在語法和描述方式稍有不同。在Plexus中,有ROLE的概念,相當於Spring中的一個Bean。支持組件生命周期管理。
非JAVA開發者不懂IOC容器?簡單來說,IOC容器就相當於一個對象裝載器,對象不是由程序員new創建,而是框架在初始化的時候從配置文件中讀取需要實例化的類信息,將信息裝入一個對象裝載器,然后在需要的時候,從對象裝載器中找是否存在該類的信息,存在則返回類的對象。
plexus容器是如何工作的呢?就上面的類圖來說,
a. AbstractContainerServlet 通過容器工廠ContainerLoader 的 getDefaultContainer方法,該方法會創建 MyPlexusContainer 容器,MyPlexusContainer是接口 PlexusContainer 的實現,MyPlexusContainer在構造函數中會創建組件管理器(ComponentManager),可以認為每個類都是容器中的一個組件,ComponentManager就是用來管理這些組件的,包括他的生命周期,組件在Plexus容器配置文件中配置。
b.組件管理器(ComponentManager)會創建組件模型管理器(ComponentModelManager)以及組件生命周期管理器(ComponentLifecycle),ComponentModelManager用於存儲Plexus容器配置文件中的所有component組件信息,它的loadComponentsFromClasspath()方法會掃描各個jar包中存在的plexus容器配置文件,如圖2,將xml內容解析之后放入PlexusModel 列表中。
public class ComponentManager { private Map<String, ComponentBox<?>> m_components = new HashMap(); private PlexusContainer m_container; private ComponentLifecycle m_lifecycle; private ComponentModelManager m_modelManager; private LoggerManager m_loggerManager; public ComponentManager(PlexusContainer container, InputStream in) throws Exception { this.m_container = container; this.m_modelManager = new ComponentModelManager(); this.m_lifecycle = new ComponentLifecycle(this); if(in != null) { this.m_modelManager.loadComponents(in); } this.m_modelManager.loadComponentsFromClasspath(); this.m_loggerManager = (LoggerManager)this.lookup(new ComponentKey(LoggerManager.class, (String)null)); this.register(new ComponentKey(PlexusContainer.class, (String)null), container); this.register(new ComponentKey(Logger.class, (String)null), this.m_loggerManager.getLoggerForComponent("")); } }
我們也可以將我們自己寫的類交給容器管理,只需要將類配置到容器配置文件中,例如:cat-consumer/src/main/resources/META-INF/plexus/components-cat-consumer.xml, 只要是存在於 META-INF/plexus/ 目錄下,並且文件名以"components-" 開頭的 ".xml" 文件,都會被 ComponentModelManager 認為是容器配置文件。
圖2 - plexus IOC容器類配置文件
c.然后就可以通過lookup方法找到類,並在首次使用的時候實例化,並且xml配置中的該類依賴的其它類也會被一並實例化,另外如果類方法實現了 Initializable 接口,創建對象后會執行類的 initialize() 方法做一些初始化的工作。
if(component instanceof Initializable) { try { ((Initializable)component).initialize(); } catch (Throwable var5) { ComponentModel model = ctx.getComponentModel(); throw new ComponentLookupException("Error when initializing component!", model.getRole(), model.getHint(), var5); } }
模塊的加載 - 模型模式
init(...)函數最后會調用CatServlet的initComponents()方法初始化Module模塊。
圖3 - 模塊初始化類圖
initComponents()方法首先創建一個模塊上下文 DefaultModuleContext對象,該對象擁有plexus容器的指針,以及server.xml、client.xml配置文件信息 ,服務端配置server.xml中有消息存儲路徑、HDFS上傳等一系列配置,由於cat-home默認是服務端也是客戶端,也就是說cat-home自身也會被監控,所以我們在這里看到有client.xml配置,配置文件所在目錄由環境變量CAT_HOME指定,如果未指定,默認是/data/appdatas/cat。
隨后CatServlet創建一個模塊初始器 DefaultModuleInitializer,並調用他的execute(ctx)方法創建並初始化模塊。
注:DefaultModuleInitializer有一個模塊管理器DefaultModelManager m_manager, 讀者可能沒有看見m_manager的創建過程,實際上,對象在components-foundation-service.xml配置文件中配置的,然后在plexus容器實例化類對象的過程中創建的,后面還有很多對象的屬性也是通過plexus容器注入的。比如DefaultModuleManager的m_topLevelModules屬性通過以下配置注入。
<component> <role>org.unidal.initialization.ModuleManager</role> <implementation>org.unidal.initialization.DefaultModuleManager</implementation> <configuration> <topLevelModules>cat-home</topLevelModules> </configuration> </component>
上面XML配置顯示m_topLevelModules 指定為cat-home,這樣DefaultModuleInitializer通過DefaultModelManager的getTopLevelModules()方法獲取的就是CatHomeModule模塊對象,可以認為cat-home是一個頂層模塊,所有Module都包含getDependencies方法,該方法會找到當前模塊所依賴的其他模塊,並實例化模塊,比如下面cat-home就依賴cat-consumer模塊,
public class CatHomeModule extends AbstractModule { @Override public Module[] getDependencies(ModuleContext ctx) { return ctx.getModules(CatConsumerModule.ID); } }
從cat-consumer的getDependencies看出他依賴cat-core模塊,cat-core模塊又依賴cat-client模塊,這樣子我們就從頂層模塊引出了所有依賴的其它模塊,在實例化模塊的同時調用模塊的setup方法安裝模塊。在所有模塊安裝完成之后,依次調用模塊的execute方法完成初始化,但是初始化順序則是按照安裝順序反着來的,cat-client -> cat-core -> cat-consumer -> cat-home ,Modules之間的設計使用了典型的模板模式。
cat-home的setup
在上一章講到模塊初始化的時候, 講到setup安裝cat-home模塊,對於客戶端的請求的監聽處理,就是在這里完成的。
@Named(type = Module.class, value = CatHomeModule.ID) public class CatHomeModule extends AbstractModule { @Override protected void setup(ModuleContext ctx) throws Exception { if (!isInitialized()) { File serverConfigFile = ctx.getAttribute("cat-server-config-file"); ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class); final TcpSocketReceiver messageReceiver = ctx.lookup(TcpSocketReceiver.class); serverConfigManager.initialize(serverConfigFile); messageReceiver.init(); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { messageReceiver.destory(); } }); } } }
1、讀取 server.xml 配置,裝進配置管理器(ServerConfigManager)。
2、創建消息接收器 final TcpSocketReceiver messageReceiver;
3、messageReceiver.init() 初始化服務,采用的經典的 netty reactor 模型。
4、注冊一個JVM關閉的鈎子,在進程掛掉的時候,執行一些清理現場的代碼。
TcpSocketReceiver--- netty reactor 模式的應用
我們來看看CatHomeModule對TcpSocketReceiver的初始化做了什么,如下源碼:
public final class TcpSocketReceiver implements LogEnabled { public void init() { try { startServer(m_port); } catch (Throwable e) { m_logger.error(e.getMessage(), e); } } public synchronized void startServer(int port) throws InterruptedException { boolean linux = getOSMatches("Linux") || getOSMatches("LINUX"); int threads = 24; ServerBootstrap bootstrap = new ServerBootstrap(); m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads); m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads); bootstrap.group(m_bossGroup, m_workerGroup); bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decode", new MessageDecoder()); } }); bootstrap.childOption(ChannelOption.SO_REUSEADDR, true); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); try { m_future = bootstrap.bind(port).sync(); m_logger.info("start netty server!"); } catch (Exception e) { m_logger.error("Started Netty Server Failed:" + port, e); } } }
1、創建EventLoopGroup對象, EventLoopGroup是用來處理IO操作的多線程事件循環器,m_bossGroup作為一個acceptor負責接收來自客戶端的請求,然后分發給m_workerGroup用來所有的事件event和channel的IO。
2、創建ServerBootstrap對象,ServerBootstrap 是一個啟動Epoll(非Linux為NIO)服務的輔助啟動類,他將設置bossGroup和workerGroup兩個多線程時間循環器。
3、接下來的channel()方法設置了ServerBootstrap 的 ChannelFactory,這里傳入的參數是EpollServerSocketChannel.class (非Linux為NioServerSocketChannel.class),也就是說這個ChannelFactory創建的就是EpollServerSocketChannel/NioServerSocketChannel的實例。
Channel是Netty的核心概念之一,它是Netty網絡通信的主體,他從EventLoopGroup獲得一個EventLoop,並注冊到該EventLoop,channel生命周期內都和該EventLoop在一起,由它負責對網絡通信連接的打開、關閉、連接和讀寫操作。如果是對於讀寫事件,執行線程調度pipeline來處理用戶業務邏輯。
4、接下來bootstrap.childHandler的目的是添加一個handler,用來監聽已經連接的客戶端的Channel的動作和狀態,傳入的 ChannelInitializer重寫了initChannel方法,這個方法在Channel被注冊到EventLoop的時候會被調用。
5、initChannel會創建ChannelPipeline對象,並調用addLast添加ChannelHandler。有網絡請求時,ChannelPipeline會調用ChannelHandler來處理,有ChannelInboundHandler和ChannelOutboundHandler兩種,ChannelPipeline會從頭到尾順序調用ChannelInboundHandler處理網絡請求內容,從尾到頭調用ChannelOutboundHandler處理網絡請求內容。這也是Netty用來靈活處理網絡請求的機制之一,因為使用的時候可以用多個decoder和encoder進行組合,從而適應不同的網絡協議。而且這種類似分層的方式可以讓每一個Handler專注於處理自己的任務而不用管上下游,這也是pipeline機制的特點。這跟TCP/IP協議中的五層和七層的分層機制有異曲同工之妙。
在這里,ChannelPipeline添加的 ChannelHandler 是MessageDecoder ,MessageDecoder的祖先類實現了ChannelHandler接口,他本質上還是一個Handler,是網絡IO事件具體處理類,當客戶端將日志數據上傳到服務器之后,會交給MessageDecoder 解碼數據,然后進行后續處理。
6、調用 childOption 設置 channel 的參數。
7、最后調用bind()方法啟動服務。
關於netty ,我就講到這里,網上關於netty框架的文章非常多,大家可以自行去查。
消息的解碼
上一章我們講到Netty將接收到的消息交給 MessageDecoder 去做解碼,解碼是交由PlainTextMessageCodec對象將接收到的字節碼反序列化為MessageTree對象(所有的消息都是由消息樹來組織),具體的解碼邏輯在這里暫不做詳細闡述,在第三章我們會闡述編碼過程,解碼只是編碼的一個逆過程。
解碼之后調用 DefaultMessageHandler 的 handle方法對消息進行處理,handle方法就干了一件事情,就是調用 m_consumer.consume(tree) 方法去消費消息樹,在消費模塊,CAT實現了隊列化,異步化,在消息消費章節會詳細闡述。
當然netty handler也是支持異步處理的,我們也可以將 DefaultMessageHandler 像 MessageDecoder那樣向netty注冊handler, 再由netty來做線程池分發。
public class MessageDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception { if (buffer.readableBytes() < 4) { return; } buffer.markReaderIndex(); int length = buffer.readInt(); ... ByteBuf readBytes = buffer.readBytes(length + 4); ... DefaultMessageTree tree = (DefaultMessageTree) m_codec.decode(readBytes); readBytes.resetReaderIndex(); tree.setBuffer(readBytes); m_handler.handle(tree); m_processCount++; ... } }