【轉】Spring mvc集成ZBUS--輕量級MQ、RPC、服務總線


本文轉自:http://www.cnblogs.com/top15from/p/4899954.html

ZBUS = MQ + RPC + PROXY

  • 支持消息隊列, 發布訂閱, RPC, 代理(TCP/DMZ)
  • 億級消息堆積能力、支持HA高可用
  • 單個Jar包無依賴 ~300K
  • 服務代理 -- 適配改造已有業務系統,使之具備跨平台與語言
  • 豐富的API--JAVA/C/C++/C#/Python/Node.JS多語言接入

zbus-dist選擇zbus.sh或者zbus.bat直接執行

簡單監控

總線默認占用 15555 端口, http://localhost:15555 可以直接進入監控,注意zbus因為原生兼容HTTP協議所以監控與消息隊列使用同一個端口

ZBUS 角色概要

zbus-arch

ZBUS 消息通訊基礎(NET模塊)

ZBUS項目不依賴其他第三方庫,消息通訊基於NIO完成(NET子項目)。NET包對NIO做了簡潔的封裝,相對Netty而言,學習成本低幾個量級,模型簡單,但不失擴展性。

znet-arch

框架結構保持 Dispatcher + N SelectorThread + IoAdaptor

Dispatcher 負責管理N個Selector線程

SelectorThread 負責NIO讀寫事件分發

IoAdaptor 個性化讀寫事件

基於NET的服務器程序基本只要關心IoAdaptor的個性化,比如ZBUS入口就是MqAdaptor

ZBUS API

ZBUS PROXY

ZBUS 示例

Java Maven 依賴

<dependency>
    <groupId>org.zbus</groupId>
    <artifactId>zbus</artifactId>
    <version>6.2.6</version>
</dependency>

生產者

public static void main(String[] args) throws Exception { 
    //創建Broker代理
    BrokerConfig config = new BrokerConfig();
    config.setServerAddress("127.0.0.1:15555");
    final Broker broker = new SingleBroker(config);

    Producer producer = new Producer(broker, "MyMQ");
    producer.createMQ(); // 如果已經確定存在,不需要創建

    //創建消息,消息體可以是任意binary,應用協議交給使用者
    Message msg = new Message();
    msg.setBody("hello world");
    producer.sendSync(msg);  

    broker.close();
}

消費者

public static void main(String[] args) throws Exception{  
    //創建Broker代表
    BrokerConfig brokerConfig = new BrokerConfig();
    brokerConfig.setServerAddress("127.0.0.1:15555");
    Broker broker = new SingleBroker(brokerConfig);

    MqConfig config = new MqConfig(); 
    config.setBroker(broker);
    config.setMq("MyMQ");

    //創建消費者
    @SuppressWarnings("resource")
    Consumer c = new Consumer(config);  

    c.onMessage(new MessageHandler() { 
        @Override
        public void handle(Message msg, Session sess) throws IOException {
            System.out.println(msg);
        }
    });

    //啟動消費線程
    c.start();   

}  

RPC動態代理【各類復雜類型】

參考源碼test目下的rpc部分

    //1)創建Broker代表(可用高可用替代)
    BrokerConfig config = new BrokerConfig();
    config.setServerAddress("127.0.0.1:15555");
    Broker broker = new SingleBroker(config);

    //2)創建基於MQ的Invoker以及Rpc工廠,指定RPC采用的MQ為MyRpc
    MqInvoker invoker = new MqInvoker(broker, "MyRpc"); 
    RpcFactory factory = new RpcFactory(invoker); 

    //3) 動態代理出實現類
    Interface hello = factory.getService(Interface.class);

    test(hello);  

    broker.close();

Spring集成--服務端(RPC示例)

無任何代碼侵入使得你已有的業務接口接入到zbus,獲得跨平台和多語言支持

<!-- 暴露的的接口實現示例 -->
<bean id="interface" class="org.zbus.rpc.biz.InterfaceImpl"></bean>

<bean id="serviceProcessor" class="org.zbus.rpc.RpcProcessor">
    <constructor-arg>
        <list>
            <!-- 放入你需要的暴露的的接口 -->
            <ref bean="interface"/>
        </list>
    </constructor-arg>
</bean>

<bean id="broker" class="org.zbus.broker.SingleBroker">
    <constructor-arg>
        <bean class="org.zbus.broker.BrokerConfig">
            <property name="serverAddress" value="127.0.0.1:15555" />
            <property name="maxTotal" value="20"/>
            <!-- 這里可以增加連接池參數配置,不配置使用默認值(參考commons-pool2) -->
        </bean>
    </constructor-arg>
</bean>

<!-- 默認調用了start方法,由Spring容器直接帶起來注冊到zbus總線上 -->
<bean id="myrpcService" class="org.zbus.rpc.mq.Service" init-method="start">
    <constructor-arg>  
        <bean class="org.zbus.rpc.mq.ServiceConfig">
            <!-- 支持多總線注冊 -->
            <constructor-arg> 
                <list>
                    <ref bean="broker"/> 
                </list>
            </constructor-arg>  
            <property name="mq" value="MyRpc"/>
            <property name="consumerCount" value="2"/> 
            <property name="messageProcessor" ref="serviceProcessor"/>
        </bean>
    </constructor-arg>
</bean>

Spring集成--客戶端

<bean id="broker" class="org.zbus.broker.SingleBroker">
    <constructor-arg>
        <bean class="org.zbus.broker.BrokerConfig">
            <property name="serverAddress" value="127.0.0.1:15555" /> 
        </bean>
    </constructor-arg>
</bean>

<bean id="myrpc" class="org.zbus.rpc.RpcFactory">
    <constructor-arg> 
        <bean class="org.zbus.rpc.mq.MqInvoker"> 
            <constructor-arg ref="broker"/>
            <constructor-arg value="MyRpc"/> 
        </bean>
    </constructor-arg>
</bean>


<bean id="interface" factory-bean="myrpc" factory-method="getService">
    <constructor-arg type="java.lang.Class" value="org.zbus.rpc.biz.Interface"/> 
</bean> 

Spring完成zbus代理透明化,zbus設施從你的應用邏輯中徹底消失

public static void main(String[] args) { 
    ApplicationContext context = new ClassPathXmlApplicationContext("SpringRpcClient.xml");

    Interface intf = (Interface) context.getBean("interface"); 
    for(int i=0;i<100;i++){
        System.out.println(intf.listMap());
    } 
} 

ZBUS消息協議


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM