中間件、MetaQ入門學習


目錄

1. 中間件技術
2. MetaQ中間件
3. MetaQ編程實踐

 

1. 中間件技術

0x1: 中間件簡介

中間件(Middleware)是提供系統軟件和應用軟件之間連接的軟件,以便於軟件各部件之間的溝通,特別是應用軟件對於系統軟件的集中的邏輯,在現代信息技術應用框架如Web服務、面向服務的體系結構等中應用比較廣泛,如:

1. 數據庫
2. Apache的Tomcat
3. IBM公司的WebSphere
4. BEA公司的WebLogic[[應用服務器]
5. 東方通公司的Tong系列中間件
6. Kingdee公司的等

都屬於中間件,中間件技術本質上就是在計算機系統不同層次的模塊之間的異構、跨協議的通信問題
嚴格來講,中間件技術已經不局限於應用服務器、數據庫服務器。中間件技術創建在對應用軟件部分常用功能的抽象上,將常用且重要的

1. 過程調用
2. 分布式組件
3. 消息隊列
4. 事務
5. 安全
6. 連結器
7. 商業流程
8. 網絡並發
9. HTTP服務器
10. Web Service

等功能集於一身或者分別在不同產品中分別完成 ,我國學術界一般認可的定義是中間件是指網絡環境下處於操作系統、數據庫等系統軟件和應用軟件之間的一種起連接作用的分布式軟件,主要解決異構網絡環境下分布式應用軟件的互連與互操作問題,提供標准接口、協議,屏蔽實現細節,提高應用系統易移植性

0x2: 中間件的特征(內涵)

總的來說,中間件有幾個非常重要的特征

1. 平台化
所謂平台就是能夠獨立運行並自主存在,為其所支撐的上層系統和應用提供運行所依賴的環境。中間件是一個平台,因此中間件是必須獨立存在,是運行時刻的"系統軟件",它為上層的網絡應用系統提供一個運行環境,並通過標准的接口和API來隔離其支撐的系統,實現其獨立性,也就是平台性。J2EE應用服務器提供JAVA應用的運行環境,就是經典的中間件

2. 應用支撐
中間件的最終目的是解決上層應用系統的問題,而且也是軟件技術發展到今天對應用軟件提供最完善徹底的解決方案。
    1) 高級程序設計語言的發明,使得軟件開發變成一個獨立的科學和技術體系,而操作系統平台的出現,使得應用軟件通過標准的API接口,實現了軟件與硬件的分離。
      2) 現代面向服務(SOA)的中間件在軟件的模型、結構、互操作以及開發方法等四個方面提供了更強的應用支撐能力:
          2.1) 模型:構件模型彈性粒度化
        通過抽象層度更高的構件模型,實現具備更高結構獨立性、內容自包含性和業務完整性的可復用構件,即服務(RESTFUL API就是一個最好的例子)。並且在細粒度服務基礎上,提供了更粗粒度的服務封裝方式,即業務層面的封裝,形成業務組件,就可以實現從組件模型到業務模型的全生命周期企業建模的能力。
          2.2) 結構:結構松散化
        將"服務描述""服務功能實現"分離,將"服務的使用者""提供者"分離,從而避免分布式應用系統構建和集成時常見的技術、組織、時間等不良約束。
          2.3) 互操作:交互過程標准化
        將與互操作相關的內容進行標准化定義,如服務封裝、描述、發布、發現、調用等契約,通信協議以及數據交換格式等等。最終實現訪問互操作、連接互操作和語義互操作(RESTFUL、SOAP、WPF、WebService、SCA/SDO)
          2.4) 開發集成方法:
        應用系統的構建方式由代碼編寫轉為主要通過服務間的快捷組合及編排,完成更為復雜的業務邏輯的按需提供和改善,從而大大簡化和加速應用系統的搭建及重構過程(一種典型的輕耦合思想)而要最終解決軟件的質量問題、效率問題、互操作問題、靈活應變問題這四大問題,需要在軟件技術的內在結構(Structure)、架構(Architecture)層面進行思考。解決這些問題,技術的本質是復用、松耦合、互操作(標准)等軟件技術的內在機制。這也是中間件技術和產品的本質特征

3. 軟件復用
軟件復用,即軟件的重用,也叫再用,是指同一事物不作修改或稍加改動就多次重復使用。從軟件復用技術的發展來看,就是不斷提升抽象級別,擴大復用范圍。最早的復用技術是子程序,人們發明子程序,就可以在不同系統之間進行復用了。但是,子程序是最原始的復用,因為這種復用范圍是一個可執行程序內復用,靜態開發期復用,如果子程序修改,意味着所有調用這個子程序的程序必須重新編譯、測試和發布
復用對象復用范圍
    1) 子程序: 一個可執行程序內復用: 靜態開發期復用
    2) 組件(DLL、Com..): 系統內復用,動態運行期復用
    3) 企業對象組件(Com+、.NET、EJB..): 企業網絡內復用,不同系統之間復用
    4) 服務(RESTFUL、SOAP、WPF、WebService、SCA/SDO): 不同企業之間、跨系統、跨異構環境復用,動態可配置

4. 耦合關系 
    1) 分布式對象技術將"連接邏輯"進行分離
    2) 消息中間件將"連接邏輯"進行異步處理,增加了更大的靈活性
    3) 消息代理和一些分布式對象中間件將數據轉換也進行了分離
    4) 而SOA架構,通過服務的封裝,實現了業務邏輯與網絡連接、數據轉換等進行完全的解耦

5. 互操作性
在軟件的互操作方面,傳統中間件只是實現了訪問互操作,即通過標准化的API實現了同類系統之間的調用互操作,而連接互操作還是依賴於特定的訪問協議,如JAVA使用RMI,CORBA使用IIOP等。而SOA通過標准的、支持Internet、與操作系統無關的SOAP協議實現了連接互操作。而且,服務的封裝是采用XML協議,具有自解析和自定義的特性,這樣,基於SOA的中間件還可以實現語義互操作
總之,服務化體現的是中間件在完整業務復用、靈活業務組織方面的發展趨勢,其核心目標是提升IT基礎設施的業務敏捷性。因此,中間件將成為SOA的主要實現平台

Relevant Link:

http://zh.wikipedia.org/wiki/%E4%B8%AD%E9%97%B4%E4%BB%B6
http://kb.cnblogs.com/page/196448/
http://jm.taobao.org/

 

2. MetaQ中間件

0x1: MetaQ的應用場景

假設我們有這么一個應用場景,為了完成一個用戶注冊操作,可能需要將用戶信息寫入到用戶庫中,然后通知給紅包中心給用戶發新手紅包,然后還需要通知博客系統給用戶准備對應的博客賬號,進行合法性驗證,告知SNS系統給用戶導入新的用戶等10步操作。
那么針對這個場景,一個最簡單的設計方法就是串行的執行整個流程

//全稱串行操作
用戶注冊->通知給紅包中心給用戶發新手紅包->博客系統給用戶准備對應的博客賬號->進行合法性驗證->告知SNS系統給用戶導入新的用戶 

通過業務分析我們能夠得知,用戶的實際的核心流程其實只有一個,就是用戶注冊。而后續的准備博客帳號,通知SNS等操作雖然必須要完成,但卻是不需要讓用戶等待的。
這種模式有個專業的名詞,就叫"最終一致",即實際上這並不是一個嚴格強制的串行操作,從業務的角度上來說,有很多步驟完全是可以異步完成的,只要最終的結果是"最終一致"的就可以

0x2: MetaQ技術原理

METAQ是一款完全的隊列模型消息中間件,服務器使用Java語言編寫,可在多種軟硬件平台上部署。客戶端支持Java、C++編程語言

MetaQ對外提供的是一個隊列服務,內部實現也是完全的隊列模型,這里的隊列是持久化的磁盤隊列,具有非常高的可靠性,並且充分利用了操作系統cache來提高性能

1. MetaQ是一個隊列模型的消息中間件,具有高性能、高可靠、高實時、分布式特點。
2. Producer、Consumer、隊列都可以分布式。
3. 能夠保證嚴格的消息順序
4. 提供豐富的消息拉取模式
5. 高效的訂閱者水平擴展能力
6. 實時的消息訂閱機制
7. 億級消息堆積能力

MetaQ的存儲結構是根據大規模互聯網應用需求,完全重新設計的一套存儲結構,使用這套存儲結構可以支持上萬的隊列模型,並且可以支持消息查詢、分布式事務、定時隊列等功能

MetaQ內部大部分功能都靠隊列來驅動,那么必須支持足夠多的隊列,才能更好的滿足業務需求,MetaQ可以在單機支持上萬隊列,這里的隊列全部為持久化磁盤方式,從而對IO性能提出了挑戰。MetaQ是這樣解決的

1. Message全部寫入到一個獨立的隊列,完全的順序寫
2. Message在文件的位置信息寫入到另外的文件,串行方式寫

通過以上方式,既做到數據可靠,又可以支持更多的隊列

Relevant Link:

http://blog.csdn.net/blogdevteam/article/details/8449916
https://github.com/killme2008/Metamorphosis/wiki
http://m.bianceng.cn/web/Skills/201407/42090.htm
http://www.bkjia.com/ASPjc/871354.html 

 

3. MetaQ編程實踐

消息中間件中有兩個角色: "消息生產者(Producer)"和"消息消費者(Consumer)"。Meta里同樣有這兩個概念,消息生產者負責創建消息並發送到Meta服務器(Broker),Meta服務器會將消息持久化到磁盤,消息消費者從Meta服務器拉取消息並提交給應用消費

回顧我們之前說的MetaQ的架構圖

要使用MetaQ進行分布式消息通信編程學習,就必須要實現最基本的架構搭建

0x1: 配置Zookeeper集群

MetaQ使用zookeeper發布和訂閱服務,並默認使用zookeeper存儲消費者offset,因此,你需要首先安裝一個zookeeper到某台機器上,或者使用某個現有的zk集群

/mate-queue/taobao/metamorphosis-server-wrapper/conf/server.ini

使用內置的zookeeper服務器進行搭建

[zookeeper]
zk.zkConnect=localhost:2181
zk.zkSessionTimeoutMs=30000
zk.zkConnectionTimeoutMs=30000
zk.zkSyncTimeMs=5000

0x2: 啟動Zookeeper

//停止local模式啟動的broker1,並重新以集群模式啟動
bin/metaServer.sh start 

0x3: 引入MetaQ需要依賴的JAR包

http://fnil.net/downloads/index.html

包括client、server的

0x4: Producer.java

package com.taobao.metamorphosis.example;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

public class Producer {
    public static void main(String[] args) throws Exception {
        final MetaClientConfig metaClientConfig = new MetaClientConfig();
        final ZKConfig zkConfig = new ZKConfig();
        //設置zookeeper地址
        zkConfig.zkConnect = "192.168.207.128:2181";
        metaClientConfig.setZkConfig(zkConfig);
        // New session factory,強烈建議使用單例
        MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
        /*
         *  create producer,強烈建議使用單例
         *  消息生產者的接口是MessageProducer,你可以通過它來發送消息
         */
        MessageProducer producer = sessionFactory.createProducer();
        // publish topic
        final String topic = "test";
        /*
         * 這一步在發送消息前是必須的,你必須發布你將要發送消息的topic
         * 這是為了讓會話工廠幫你去查找接收這些topic的meta服務器地址並初始化連接
         * 這個步驟針對每個topic只需要做一次,多次調用無影響
         */
        producer.publish(topic);

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        String line = null;
        while ((line = reader.readLine()) != null) 
        {
            /*
             * send message
             * 在Meta里,每個消息對象都是Message類的實例,Message表示一個消息對象,它包含這么幾個屬性:
             * 1) id: Long型的消息id,消息的唯一id,系統自動產生,用戶無法設置,在發送成功后由服務器返回,發送失敗則為0。
             * 2) topic: 消息的主題,訂閱者訂閱該主題即可接收發送到該主題下的消息,生產者通過指定發布的topic查找到需要連接的服務器地址,必須。
             * 3) data: 消息的有效載荷,二進制數據,也就是消息內容,meta永遠不會修改消息內容,你發送出去是什么樣子,接收到就是什么樣子。消息內容通常限制在1M以內,我的建議是最好不要發送超過上百K的消息,必須。數據是否壓縮也完全取決於用戶。
             * 4) attribute: 消息屬性,一個字符串,可選。發送者可設置消息屬性來讓消費者過濾。
             */
            SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));
            // check result
            if (!sendResult.isSuccess()) 
            {
                System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
            }
            else {
                System.out.println("Send message successfully,sent to " + sendResult.getPartition());
            }
        }
    }

}

0x5: AsyncConsumer.java

package com.taobao.metamorphosis.example;

import java.util.concurrent.Executor;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

public class AsyncConsumer {
    public static void main(String[] args) throws Exception {
        final MetaClientConfig metaClientConfig = new MetaClientConfig();
        final ZKConfig zkConfig = new ZKConfig();
        //設置zookeeper地址
        zkConfig.zkConnect = "192.168.207.128:2181";
        metaClientConfig.setZkConfig(zkConfig);
        // New session factory,強烈建議使用單例
        MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
        // subscribed topic
        final String topic = "test";
        // consumer group
        final String group = "meta-example";
        /*
         * create consumer,強烈建議使用單例
         * 通過createConsumer方法來創建MessageConsumer,注意到我們傳入一個ConsumerConfig參數,
         * 這是消費者的配置對象。每個消息者都必須有一個ConsumerConfig配置對象,
         * 我們這里只設置了group屬性,這是消費者的分組名稱。
         * Meta的Producer、Consumer和Broker都可以為集群。
         * 消費者可以組成一個集群共同消費同一個topic,發往這個topic的消息將按照一定的負載均衡規則發送給集群里的一台機器。
         * 同一個消費者集群必須擁有同一個分組名稱,也就是同一個group。我們這里將分組名稱設置為meta-example
         */
        MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
        /*
         * subscribe topic
         * 訂閱消息通過subscribe方法,這個方法接受三個參數 
         * 1) topic,訂閱的主題
         * 2) maxSize,因為meta是一個消費者主動拉取的模型,這個參數規定每次拉取的最大數據量,單位為字節,這里設置為1M,默認最大為1M。
         * 3) MessageListener,消息監聽器,負責消息消息。
         */
        consumer.subscribe(topic, 1024 * 1024, new MessageListener() {

            public void recieveMessages(Message message) {
                System.out.println("Receive message " + new String(message.getData()));
            }


            public Executor getExecutor() {
                // Thread pool to process messages,maybe null.
                return null;
            }
        });
        // complete subscribe
        consumer.completeSubscribe();
    }
}

Relevant Link:

https://github.com/killme2008/Metamorphosis/wiki/%E5%A6%82%E4%BD%95%E5%BC%80%E5%A7%8B
http://www.it165.net/admin/html/201402/2409.html

 

Copyright (c) 2014 LittleHann All rights reserved

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM