本文轉自:http://www.cnblogs.com/top15from/p/4899954.html
ZBUS = MQ + RPC + PROXY
- 支持消息隊列, 發布訂閱, RPC, 代理(TCP/DMZ)
- 億級消息堆積能力、支持HA高可用
- 單個Jar包無依賴 ~300K
- 服務代理 -- 適配改造已有業務系統,使之具備跨平台與語言
- 豐富的API--JAVA/C/C++/C#/Python/Node.JS多語言接入
zbus-dist選擇zbus.sh或者zbus.bat直接執行

總線默認占用 15555 端口, http://localhost:15555 可以直接進入監控,注意zbus因為原生兼容HTTP協議所以監控與消息隊列使用同一個端口
ZBUS 角色概要

ZBUS 消息通訊基礎(NET模塊)
ZBUS項目不依賴其他第三方庫,消息通訊基於NIO完成(NET子項目)。NET包對NIO做了簡潔的封裝,相對Netty而言,學習成本低幾個量級,模型簡單,但不失擴展性。

框架結構保持 Dispatcher + N SelectorThread + IoAdaptor
Dispatcher 負責管理N個Selector線程
SelectorThread 負責NIO讀寫事件分發
IoAdaptor 個性化讀寫事件
基於NET的服務器程序基本只要關心IoAdaptor的個性化,比如ZBUS入口就是MqAdaptor
ZBUS API
ZBUS PROXY
ZBUS 示例
Java Maven 依賴
<dependency>
<groupId>org.zbus</groupId>
<artifactId>zbus</artifactId>
<version>6.2.6</version>
</dependency>
生產者
public static void main(String[] args) throws Exception {
//創建Broker代理
BrokerConfig config = new BrokerConfig();
config.setServerAddress("127.0.0.1:15555");
final Broker broker = new SingleBroker(config);
Producer producer = new Producer(broker, "MyMQ");
producer.createMQ(); // 如果已經確定存在,不需要創建
//創建消息,消息體可以是任意binary,應用協議交給使用者
Message msg = new Message();
msg.setBody("hello world");
producer.sendSync(msg);
broker.close();
}
消費者
public static void main(String[] args) throws Exception{
//創建Broker代表
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setServerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(brokerConfig);
MqConfig config = new MqConfig();
config.setBroker(broker);
config.setMq("MyMQ");
//創建消費者
@SuppressWarnings("resource")
Consumer c = new Consumer(config);
c.onMessage(new MessageHandler() {
@Override
public void handle(Message msg, Session sess) throws IOException {
System.out.println(msg);
}
});
//啟動消費線程
c.start();
}
RPC動態代理【各類復雜類型】
參考源碼test目下的rpc部分
//1)創建Broker代表(可用高可用替代)
BrokerConfig config = new BrokerConfig();
config.setServerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(config);
//2)創建基於MQ的Invoker以及Rpc工廠,指定RPC采用的MQ為MyRpc
MqInvoker invoker = new MqInvoker(broker, "MyRpc");
RpcFactory factory = new RpcFactory(invoker);
//3) 動態代理出實現類
Interface hello = factory.getService(Interface.class);
test(hello);
broker.close();
Spring集成--服務端(RPC示例)
無任何代碼侵入使得你已有的業務接口接入到zbus,獲得跨平台和多語言支持
<!-- 暴露的的接口實現示例 -->
<bean id="interface" class="org.zbus.rpc.biz.InterfaceImpl"></bean>
<bean id="serviceProcessor" class="org.zbus.rpc.RpcProcessor">
<constructor-arg>
<list>
<!-- 放入你需要的暴露的的接口 -->
<ref bean="interface"/>
</list>
</constructor-arg>
</bean>
<bean id="broker" class="org.zbus.broker.SingleBroker">
<constructor-arg>
<bean class="org.zbus.broker.BrokerConfig">
<property name="serverAddress" value="127.0.0.1:15555" />
<property name="maxTotal" value="20"/>
<!-- 這里可以增加連接池參數配置,不配置使用默認值(參考commons-pool2) -->
</bean>
</constructor-arg>
</bean>
<!-- 默認調用了start方法,由Spring容器直接帶起來注冊到zbus總線上 -->
<bean id="myrpcService" class="org.zbus.rpc.mq.Service" init-method="start">
<constructor-arg>
<bean class="org.zbus.rpc.mq.ServiceConfig">
<!-- 支持多總線注冊 -->
<constructor-arg>
<list>
<ref bean="broker"/>
</list>
</constructor-arg>
<property name="mq" value="MyRpc"/>
<property name="consumerCount" value="2"/>
<property name="messageProcessor" ref="serviceProcessor"/>
</bean>
</constructor-arg>
</bean>
Spring集成--客戶端
<bean id="broker" class="org.zbus.broker.SingleBroker">
<constructor-arg>
<bean class="org.zbus.broker.BrokerConfig">
<property name="serverAddress" value="127.0.0.1:15555" />
</bean>
</constructor-arg>
</bean>
<bean id="myrpc" class="org.zbus.rpc.RpcFactory">
<constructor-arg>
<bean class="org.zbus.rpc.mq.MqInvoker">
<constructor-arg ref="broker"/>
<constructor-arg value="MyRpc"/>
</bean>
</constructor-arg>
</bean>
<bean id="interface" factory-bean="myrpc" factory-method="getService">
<constructor-arg type="java.lang.Class" value="org.zbus.rpc.biz.Interface"/>
</bean>
Spring完成zbus代理透明化,zbus設施從你的應用邏輯中徹底消失
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("SpringRpcClient.xml");
Interface intf = (Interface) context.getBean("interface");
for(int i=0;i<100;i++){
System.out.println(intf.listMap());
}
}
