基於Protobuf的分布式高性能RPC框架——Navi-Pbrpc
1 簡介
Navi-pbrpc框架是一個高性能的遠程調用RPC框架,使用netty4技術提供非阻塞、異步、全雙工的信道,使用protobuf作為序列化協議,同時提供長、短連接模式,支持non-blocking和傳統的blocking io,以及負載均衡,容錯處理策略等,對於基於socket的分布式調用提供通信基礎。
如果你的項目中需要高性能的RPC解決方案,那么navi-pbrpc可以幫助到你構建一個強大的遠程調用系統。
Navi-pbrpc使用netty nio開發,全雙工、異步、非阻塞的通信模型,保證了高性能和理想的QPS,了解詳細性能測試報告見附錄性能測試。
單測覆蓋率見附錄。
設計關於UML類圖見附錄。
github已開源,鏈接請點此https://github.com/neoremind/navi-pbrpc。
2 協議介紹
------------- ------------- | | | | | 客戶端 | | 服務端 | | | | | | | | | | | | | | 應用層 | ----NsHead + protobuf序列化body(byte[])-----| 應用層 | |-------------| |-------------| | | ----------- 全雙工短連接tcp socket --------| | | | ------------[全雙工長連接tcp socket]---------| | | | . | | | | . | | | 傳輸層 | (1-n條channel) | 傳輸層 | | | . | | | | . | | | | ------------[全雙工長連接tcp socket]---------| | |-------------| |-------------| | 網絡層 | | 網絡層 | |-------------| |-------------| | 鏈路層 | | 鏈路層 | |-------------| |-------------| | 物理層 | ================== <<->> ================= | 物理層 | ------------- ------------- |
Header在框架內部叫做NsHead,NsHead + protobuf序列化body包結構示意如下,關於NsHead頭結構更多信息見附錄。
Byte/ 0 | 1 | 2 | 3 | / | | | | |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| +---------------+---------------+---------------+---------------+ 0/ NsHead / / / / / / / +---------------+---------------+---------------+---------------+ 36/ protobuf序列化后的數據 / +/ (body長度在NsHead中定義) / +---------------+---------------+---------------+---------------+ |
3 使用方法
3.1 准備工作
使用Maven管理的工程POM依賴請添加:
<dependency> <groupId>com.baidu.beidou</groupId> <artifactId>navi-pbrpc</artifactId> <version>1.1.1</version> </dependency> |
最新依賴請查找:Sonatype(https://oss.sonatype.org/#nexus-search;quick%7Enavi-pbrpc)
Maven依賴樹如下:
+- commons-pool:commons-pool:jar:1.5.7:compile +- com.google.protobuf:protobuf-java:jar:2.5.0:compile +- io.netty:netty-all:jar:4.0.28.Final:compile +- org.javassist:javassist:jar:3.18.1-GA:compile +- org.slf4j:slf4j-api:jar:1.7.7:compile +- org.slf4j:slf4j-log4j12:jar:1.7.7:compile | \- log4j:log4j:jar:1.2.17:compile |
3.2 服務端開發
3.2.1 protoc生成代碼
首先定義服務的proto,例如新建一個demo.proto文件,內容如下:
package com.baidu.beidou.navi.pbrpc.demo.proto; option cc_generic_services = true; message DemoRequest { optional int32 user_id = 1; } message DemoResponse { optional int32 user_id = 1; optional string user_name = 2; enum GenderType { MALE = 1; FEMALE = 2; } optional GenderType gender_type = 3; } |
使用protoc命令編譯,生成Demo.java,方法見附錄。
3.2.2 開發服務實現
開發一個服務端的實現,例如DemoServiceImpl,代碼如下:
public class DemoServiceImpl implements DemoService { @Override public DemoResponse doSmth(DemoRequest req) { DemoResponse.Builder builder = DemoResponse.newBuilder(); builder.setUserId(1); builder.setUserName("name-1"); builder.setGenderType(DemoResponse.GenderType.MALE); return builder.build(); } } |
特別注意,一個方法若想暴露為服務必須滿足如下限制:
- 參數必須只有1個。
- 參數和返回值類型必須為繼承自com.google.protobuf.GeneratedMessage。由protoc生成的java bean都會繼承這個類。
3.2.3 暴露並且啟動服務
啟動服務端,代碼如下:
PbrpcServer server = new PbrpcServer(8088); server.register(100, new DemoServiceImpl()); server.start(); |
表示開放端口為8088,將DemoServiceImpl這個對象中的方法注入server,作為服務。register(int, Object)中的第一個參數作為服務標示的起始值,默認會遍歷Object中的所有方法,把符合上述限制條件的方法暴露為服務,其標示從int起始值開始,依次遞增1,這個例子中DemoServiceImpl.doSmth(..)方法的標示就是100,如果還有其他方法可以暴露,則從101開始遞增。
這里注意,服務端默認如果全雙工的channel鏈路在1個小時之內沒有任何數據寫入,那么會自動關閉該鏈路,避免浪費服務端資源。Navi-rpc短連接調用不受影響,對於池化的長連接再下次發起請求的時候會重新make connection,如果是非Navi-rpc客戶端的其他長連接接入,請注意這個限制。
3.2.4 關閉服務
安全關閉連接的方法如下:
server.shutdown(); |
4 客戶端開發
4.1 同步調用與異步調用
在下面的代碼示例中,會看到client調用遠程RPC,會有同步以及異步的方式,作為異步方式的調用示例如下:
// 異步調用 CallFuture<demoresponse> future = client.asyncTransport(DemoResponse.class, msg); // 阻塞線程,等待結果 DemoResponse res = future.get(); </demoresponse> |
調用客戶端可以發送完請求后,拿到future,選擇做其他邏輯,或者在get()上阻塞等待。
作為同步方式的調用示例如下:
// 同步調用 DemoResponse res = client.syncTransport(DemoResponse.class, msg); |
調用客戶端會一直阻塞等待。
4.2 nio短連接調用
// 構造客戶端 PbrpcClient client = PbrpcClientFactory.buildShortLiveConnection("127.0.0.1", 8088, 60000); // 構建請求數據 DemoRequest.Builder req = DemoRequest.newBuilder(); req.setUserId(1); byte[] data = req.build().toByteArray(); // 構造請求消息 PbrpcMsg msg = new PbrpcMsg(); msg.setServiceId(100); msg.setProvider("beidou"); msg.setData(data); // 異步調用 CallFuture<demoresponse> future = client.asyncTransport(DemoResponse.class, msg); // 阻塞線程,等待結果 DemoResponse res = future.get(); // 打印結果 System.out.println(res); 這里注意,一旦PbrpcClient建立好是可以復用的,無需每次重新新建。 </demoresponse> |
PbrpcClientFactory是一個client工廠,幫助構造短連接調用,其他參數如下:
public static SimplePbrpcClient buildShortLiveConnection(String ip, int port); public static SimplePbrpcClient buildShortLiveConnection(String ip, int port, int readTimeout); public static SimplePbrpcClient buildShortLiveConnection(String ip, int port, int connTimeout, int readTimeout); |
其中connTimeout表示客戶端連接時間,單位毫秒。
readTimeout表示客戶端調用時間,單位毫秒,超時會拋出TimeoutException。例如如下:
Exception in thread "main" java.lang.RuntimeException: Error occurrs due to Client call timeout, request logId=1696636656 at com.baidu.beidou.navi.pbrpc.client.callback.CallFuture.get(CallFuture.java:97) at com.baidu.beidou.navi.pbrpc.client.PooledPbrpcClient.syncTransport(PooledPbrpcClient.java:109) at com.baidu.unbiz.soma.biz.siconf.rpc.pbrpc.product.protocol.TestBiz.main(TestBiz.java:31) Caused by: com.baidu.beidou.navi.pbrpc.exception.TimeoutException: Client call timeout, request logId=1696636656 at com.baidu.beidou.navi.pbrpc.client.TimeoutEvictor.detectTimetout(TimeoutEvictor.java:68) at com.baidu.beidou.navi.pbrpc.client.TimeoutEvictor.run(TimeoutEvictor.java:47) at java.util.TimerThread.mainLoop(Timer.java:512) at java.util.TimerThread.run(Timer.java:462) |
4.3 nio長連接池調用
連接池默認開啟8個keepAlive長連接,代碼如下:
// 構造客戶端 PbrpcClient client = PbrpcClientFactory.buildPooledConnection(new PooledConfiguration(), "127.0.0.1", 8088, 60000); // 構建請求數據 DemoRequest.Builder req = DemoRequest.newBuilder(); req.setUserId(1); byte[] data = req.build().toByteArray(); // 構造請求消息 PbrpcMsg msg = new PbrpcMsg(); msg.setServiceId(100); msg.setProvider("beidou"); msg.setData(data); // 異步調用 CallFuture<demoresponse> future = client.asyncTransport(DemoResponse.class, msg); // 阻塞線程,等待結果 DemoResponse res = future.get(); // 打印結果 System.out.println(res); </demoresponse> |
其中PooledConfiguration可以設置連接池相關的參數,例如多少個長連接等策略。
PbrpcClientFactory是一個client工廠,幫助構造長連接池調用,其他參數如下:
public static PooledPbrpcClient buildPooledConnection(String ip, int port); public static PooledPbrpcClient buildPooledConnection(String ip, int port, int readTimeout); public static PooledPbrpcClient buildPooledConnection(PooledConfiguration configuration, String ip, int port, int readTimeout); public static PooledPbrpcClient buildPooledConnection(PooledConfiguration configuration, String ip, int port, int connTimeout, int readTimeout); |
其中connTimeout表示客戶端連接時間,單位毫秒。
readTimeout表示客戶端調用時間,單位毫秒,超時會拋出TimeoutException。
4.4 Blocking IO短連接調用
// 構造客戶端 PbrpcClient client = PbrpcClientFactory.buildShortLiveBlockingIOConnection("127.0.0.1", 8088, 60000); // 構建請求數據 DemoRequest.Builder req = DemoRequest.newBuilder(); req.setUserId(1); byte[] data = req.build().toByteArray(); // 構造請求消息 PbrpcMsg msg = new PbrpcMsg(); msg.setServiceId(100); msg.setProvider("beidou"); msg.setData(data); // 同步調用,blocking IO只支持同步調用 DemoResponse res = client.syncTransport(DemoResponse.class, msg); // 打印結果 System.out.println(res); |
默認只支持同步調用,其他構造方法如下:
public static BlockingIOPbrpcClient buildShortLiveBlockingIOConnection(String ip, int port); public static BlockingIOPbrpcClient buildShortLiveBlockingIOConnection(String ip, int port, int readTimeout); public static BlockingIOPbrpcClient buildShortLiveBlockingIOConnection(String ip, int port, int connTimeout, int readTimeout); |
特別注意,調用一個不能定位logId的pbrpc服務,請必須使用blocking IO方式,半雙工通信方式,即一問一答,流程如下圖所示:
1.request -------------------------> client --------single TCP connection-------- server <-------------------------2.response |
對於netty nio來說無法標示到全雙工后服務端發送回來的一個包到底映射到本地哪個調用請求上,對於通過Navi-pbrpc暴露的service服務,各種方式可以隨意使用。
4.5 Blocking IO長連接池調用
// 構造客戶端 PbrpcClient client = PbrpcClientFactory.buildPooledBlockingIOConnection("127.0.0.1", 8088, 60000); // 構建請求數據 DemoRequest.Builder req = DemoRequest.newBuilder(); req.setUserId(1); byte[] data = req.build().toByteArray(); // 構造請求消息 PbrpcMsg msg = new PbrpcMsg(); msg.setServiceId(100); msg.setProvider("beidou"); msg.setData(data); // 同步調用,blocking IO只支持同步調用 DemoResponse res = client.syncTransport(DemoResponse.class, msg); // 打印結果 System.out.println(res); |
默認只支持同步調用,其他構造方法如下:
public static BlockingIOPooledPbrpcClient buildPooledBlockingIOConnection(String ip, int port); public static BlockingIOPooledPbrpcClient buildPooledBlockingIOConnection(String ip, int port, int readTimeout); public static BlockingIOPooledPbrpcClient buildPooledBlockingIOConnection( PooledConfiguration configuration, String ip, int port, int readTimeout); public static BlockingIOPooledPbrpcClient buildPooledBlockingIOConnection( PooledConfiguration configuration, String ip, int port, int connTimeout, int readTimeout); |
4.6 帶有負載均衡以及容錯策略的HA客戶端調用
// 構造客戶端 PbrpcClient client = HAPbrpcClientFactory.buildShortLiveConnection("127.0.0.1:8088,1.1.1.1:9999", new RRLoadBalanceStrategy(new FailOverStrategy(2))); // 構建請求數據 DemoRequest.Builder req = DemoRequest.newBuilder(); req.setUserId(1); byte[] data = req.build().toByteArray(); // 構造請求消息 PbrpcMsg msg = new PbrpcMsg(); msg.setServiceId(100); msg.setProvider("beidou"); msg.setData(data); // 異步調用 CallFuture<demoresponse> future = client.asyncTransport(DemoResponse.class, msg); // 阻塞線程,等待結果 DemoResponse res = future.get(); // 打印結果 System.out.println(res); </demoresponse> |
其中HAPbrpcClientFactory是負責構造高可用客戶端的工廠,第一個參數是一個IP:PORT串,按照逗號分隔。
其后面的參數是可擴展的負載均衡策略和容錯處理策略,RRLoadBalanceStrategy表示使用輪訓(Round Robin)策略,FailOverStrategy表示容錯策略為失敗重試,最多重試次數為2。
還支持的其他策略組合為RandomLoadBalanceStrategy標示隨機策略,FailFastStrategy表示失敗立即退出。可以隨意組合。
其他構造方法如下:
public static HAPbrpcClient buildShortLiveConnection(String connectString, LoadBalanceStrategy lb); public static HAPbrpcClient buildShortLiveConnection(String connectString, int readTimeout, LoadBalanceStrategy lb); public static HAPbrpcClient buildShortLiveConnection(String connectString, int connTimeout, int readTimeout, LoadBalanceStrategy lb); public static HAPbrpcClient buildPooledConnection(String connectString, LoadBalanceStrategy lb); public static HAPbrpcClient buildPooledConnection(String connectString, int readTimeout, LoadBalanceStrategy lb); public static HAPbrpcClient buildPooledConnection(PooledConfiguration configuration, String connectString, int readTimeout, LoadBalanceStrategy lb); public static HAPbrpcClient buildPooledConnection(PooledConfiguration configuration, String connectString, int connTimeout, int readTimeout, LoadBalanceStrategy lb); public static HAPbrpcClient buildShortLiveBlockingIOConnection(String connectString, LoadBalanceStrategy lb); public static HAPbrpcClient buildShortLiveBlockingIOConnection(String connectString, int readTimeout, LoadBalanceStrategy lb); public static HAPbrpcClient buildShortLiveBlockingIOConnection(String connectString, int connTimeout, int readTimeout, LoadBalanceStrategy lb); public static HAPbrpcClient buildShortLiveBlockingIOConnection( PbrpcClientConfiguration configuration, String connectString, int connTimeout, int readTimeout, LoadBalanceStrategy lb); public static HAPbrpcClient buildPooledBlockingIOConnection(String connectString, LoadBalanceStrategy lb); public static HAPbrpcClient buildPooledBlockingIOConnection(String connectString, int readTimeout, LoadBalanceStrategy lb); public static HAPbrpcClient buildPooledBlockingIOConnection(PooledConfiguration configuration, String connectString, int readTimeout, LoadBalanceStrategy lb); public static HAPbrpcClient buildPooledBlockingIOConnection(String connectString, int connTimeout, int readTimeout, LoadBalanceStrategy lb); public static HAPbrpcClient buildPooledBlockingIOConnection(PooledConfiguration configuration, String connectString, int connTimeout, int readTimeout, LoadBalanceStrategy lb); public static HAPbrpcClient buildPooledBlockingIOConnection(PooledConfiguration configuration, PbrpcClientConfiguration clientConfig, String connectString, int connTimeout, int readTimeout, LoadBalanceStrategy lb); |
4.7 關閉連接
安全關閉連接和各種連接池的方法如下:
client.shutdown(); |
5 與Spring集成
5.1 准備工作
Maven POM依賴請添加:
<dependency> <groupId>com.baidu.beidou</groupId> <artifactId>navi-pbrpc-spring</artifactId> <version>1.1.1</version> </dependency> |
5.2 開發服務接口
1)根據服務提供方的proto文件生成java代碼。此處省略具體方法。詳細見第一部分。
2)開發一個Java的Interface
接口名稱隨意,達意即可。
入參有且僅有一個請求類型,參數和返回值類型必須繼承自com.google.protobuf.GeneratedMessage。由protoc生成的java bean都會繼承這個類。
方法名隨意,達意即可。
方法上加入一個PbrpcMethodId的注解,標明遠程服務的method id,如果沒有注解則默認為0。
一個實例如下,這里的DemoResponse和DemoRequest都是根據proto生成的java類定義,100標示遠程服務的method id標識。
/** * ClassName: DemoService <br /> * Function: 遠程服務接口demo * * @author Zhang Xu */ public interface DemoService { /** * 干點什么 * * @param req 請求 * @return 響應 */ @PbrpcMethodId(100) DemoResponse doSmth(DemoRequest req); } |
5.3 配置XML
通常項目均會與Spring集成,利用Spring的IoC配置管理,可以做到功能的靈活插拔可擴展,一個最常用的典型配置是
使用properties文件中配置的IP:PORT列表標示遠程服務
使用短連接blocking io訪問遠程服務
將下面的配置加入到你的XML文件即可,說明全在注釋中。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd"> <aop:aspectj-autoproxy proxy-target-class="true"/> <context:annotation-config/> <context:component-scan base-package="com.baidu.beidou"/> <!-- properties配置文件,內含ip端口列表或者一些timeout設置 --> <bean id="propertyPlaceholderConfigurerConfig" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/> <property name="ignoreResourceNotFound" value="true"/> <property name="ignoreUnresolvablePlaceholders" value="true"/> <property name="locations"> <list> <value>classpath:autoevict/application.properties</value> </list> </property> </bean> <!-- 自動剔除傳輸回調callback,單位時間內調用失敗率大於某個百分比,則剔除掉該客戶端 --> <!-- 下面的例子表示服務啟動后2s(initDelay)開始第一次檢查,檢查周期是6s(checkPeriod), --> <!-- 檢查周期內錯誤率大於80%(maxFailPercentage)並且調用次數大於3次(minInvokeNumber)則剔除 --> <bean id="autoEvictTransportCallback" class="com.baidu.beidou.navi.pbrpc.client.AutoEvictTransportCallback"> <property name="checkPeriod" value="6000"/> <property name="minInvokeNumber" value="3"/> <property name="initDelay" value="2000"/> <property name="maxFailPercentage" value="80"/> </bean> <!-- 高可用相關配置,FailOverStrategy代表失敗重試,FailFastStrategy代表失敗立即退出 --> <!-- 負載均衡配置中,RRLoadBalanceStrategy代表輪訓調用服務器,RandomLoadBalanceStrategy代表隨機選擇服務器調用 --> <!-- 默認transportCallback不做任何事情,可以配置AutoEvictTransportCallback做自動剔除失效鏈接 --> <bean id="failoverStrategy" class="com.baidu.beidou.navi.pbrpc.client.ha.FailOverStrategy"/> <bean id="roundRobinLoadBalanceStrategy" class="com.baidu.beidou.navi.pbrpc.client.ha.RRLoadBalanceStrategy"> <property name="failStrategy" ref="failoverStrategy"/> <property name="transportCallback" ref="autoEvictTransportCallback"/> </bean> <!-- Pbprc服務server定位locator工廠,這里使用BlockingIO短連接 --> <bean id="pbrpcServerLocator" class="com.baidu.beidou.navi.pbrpc.client.IpPortShortLiveBlockingIOPbrpcServerLocator"/> <!-- 通過Pbprc服務server定位locator工廠構造高可用客戶端 --> <bean id="haPbrpcClient" factory-bean="pbrpcServerLocator" factory-method="factory"> <constructor-arg value="${pbrpc.client.server}"/> <constructor-arg value="${pbrpc.client.connect.timeout}"/> <constructor-arg value="${pbrpc.client.read.timeout}"/> <constructor-arg ref="roundRobinLoadBalanceStrategy"/> </bean> <!-- Pbprc代理proxy生成器,需要指定高可用pbrpc客戶端和provider標示 --> <!-- 這里的proxy是利用jdk的動態代理技術構建的,proxy也可以使用javassist動態字節碼技術生成 --> <bean id="integrationProxy" class="com.baidu.beidou.navi.pbrpc.spring.JdkDynamicIntegrationProxy"> <property name="pbrpcClient" ref="haPbrpcClient"/> <property name="provider" value="beidou"/> </bean> <!-- 服務bean定義,使用Spring的FactoryBean來做bean代理,可以使用Resource注解注入這個bean --> <bean id="demoService" class="com.baidu.beidou.navi.pbrpc.spring.PbrpcProxyFactoryBean"> <property name="integrationProxy" ref="integrationProxy"/> <property name="serviceInterface"> <value>com.baidu.beidou.navi.pbrpc.demo.service.DemoService</value> </property> </bean> </beans> |
properties配置如下:
pbrpc.client.server=127.0.0.1:14419,127.0.0.1:14420 pbrpc.client.connect.timeout=2000 pbrpc.client.read.timeout=5000 |
了解更多可選配置見下面小節。
5.4 開始調用
由於上面配置了DemoService的代理,因此可以用@Resource很自然地來使用bean,一個testcase如下。
@ContextConfiguration(locations = {"classpath*:applicationContext.xml"}) public class SpringIntegrationIpPortListTest extends AbstractJUnit4SpringContextTests { @Autowired private DemoService demoService; @Test public void testDoSmth() { Demo.DemoRequest.Builder req = Demo.DemoRequest.newBuilder(); req.setUserId(1); Demo.DemoResponse response = demoService.doSmth(req.build()); System.out.println(response); assertThat(response.getUserId(), is(1)); } } |
5.5 其他配置
5.5.1 單點的配置IP:PORT並且不啟用自動失效剔除
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd"> <aop:aspectj-autoproxy proxy-target-class="true"/> <context:annotation-config/> <context:component-scan base-package="com.baidu.beidou"/> <!-- properties配置文件,內含ip端口列表或者一些timeout設置 --> <bean id="propertyPlaceholderConfigurerConfig" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/> <property name="ignoreResourceNotFound" value="true"/> <property name="ignoreUnresolvablePlaceholders" value="true"/> <property name="locations"> <list> <value>classpath:ipportlist/application.properties</value> </list> </property> </bean> <!-- 高可用相關配置,FailOverStrategy代表失敗重試,FailFastStrategy代表失敗立即退出 --> <!-- 負載均衡配置中,RRLoadBalanceStrategy代表輪訓調用服務器,RandomLoadBalanceStrategy代表隨機選擇服務器調用 --> <!-- 默認transportCallback不做任何事情,可以配置AutoEvictTransportCallback做自動剔除失效鏈接 --> <bean id="failoverStrategy" class="com.baidu.beidou.navi.pbrpc.client.ha.FailOverStrategy"/> <bean id="roundRobinLoadBalanceStrategy" class="com.baidu.beidou.navi.pbrpc.client.ha.RRLoadBalanceStrategy"> <property name="failStrategy" ref="failoverStrategy"/> </bean> <!-- 手工配置單點pbrpc客戶端,可以配置1到多個 --> <!-- 這里使用BlockingIO短連接 --> <bean id="pbrpcClient1" class="com.baidu.beidou.navi.pbrpc.client.BlockingIOPbrpcClient"> <property name="ip" value="${pbrpc.client1.ip}"/> <property name="port" value="${pbrpc.client1.port}"/> <property name="readTimeout" value="${pbrpc.client.read.timeout}"/> <property name="connTimeout" value="${pbrpc.client.connect.timeout}"/> </bean> <bean id="pbrpcClient2" class="com.baidu.beidou.navi.pbrpc.client.BlockingIOPbrpcClient"> <property name="ip" value="${pbrpc.client2.ip}"/> <property name="port" value="${pbrpc.client2.port}"/> <property name="readTimeout" value="${pbrpc.client.read.timeout}"/> <property name="connTimeout" value="${pbrpc.client.connect.timeout}"/> </bean> <!-- 高可用pbrpc客戶端,集成多個單點客戶端以及負載均衡策略 --> <bean id="haPbrpcClient" class="com.baidu.beidou.navi.pbrpc.client.HAPbrpcClient"> <property name="loadBalanceStrategy" ref="roundRobinLoadBalanceStrategy"/> <property name="clientList"> <list> <ref bean="pbrpcClient1"/> <ref bean="pbrpcClient2"/> </list> </property> </bean> <!-- Pbprc代理proxy生成器,需要指定高可用pbrpc客戶端和provider標示 --> <bean id="integrationProxy" class="com.baidu.beidou.navi.pbrpc.spring.JdkDynamicIntegrationProxy"> <property name="pbrpcClient" ref="haPbrpcClient"/> <property name="provider" value="beidou"/> </bean> <!-- 服務bean定義,使用Spring的FactoryBean來做代理 --> <bean id="demoService" class="com.baidu.beidou.navi.pbrpc.spring.PbrpcProxyFactoryBean"> <property name="integrationProxy" ref="integrationProxy"/> <property name="serviceInterface"> <value>com.baidu.beidou.navi.pbrpc.demo.service.DemoService</value> </property> </bean> </beans> |
5.5.2 使用長連接池
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd"> <aop:aspectj-autoproxy proxy-target-class="true"/> <context:annotation-config/> <context:component-scan base-package="com.baidu.beidou"/> <!-- properties配置文件,內含ip端口列表或者一些timeout設置 --> <bean id="propertyPlaceholderConfigurerConfig" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/> <property name="ignoreResourceNotFound" value="true"/> <property name="ignoreUnresolvablePlaceholders" value="true"/> <property name="locations"> <list> <value>classpath:ipportstring_pooled/application.properties</value> </list> </property> </bean> <!-- 高可用相關配置,FailOverStrategy代表失敗重試,FailFastStrategy代表失敗立即退出 --> <!-- 負載均衡配置中,RRLoadBalanceStrategy代表輪訓調用服務器,RandomLoadBalanceStrategy代表隨機選擇服務器調用 --> <!-- 默認transportCallback不做任何事情,可以配置AutoEvictTransportCallback做自動剔除失效鏈接 --> <bean id="failoverStrategy" class="com.baidu.beidou.navi.pbrpc.client.ha.FailOverStrategy"/> <bean id="roundRobinLoadBalanceStrategy" class="com.baidu.beidou.navi.pbrpc.client.ha.RRLoadBalanceStrategy"> <property name="failStrategy" ref="failoverStrategy"/> </bean> <!-- Pbprc服務server定位locator工廠,這里使用BlockingIO長連接池 --> <bean id="pbrpcServerLocator" class="com.baidu.beidou.navi.pbrpc.client.IpPortPooledBlockingIOPbrpcServerLocator"/> <!-- 通過Pbprc服務server定位locator工廠構造高可用客戶端 --> <bean id="haPbrpcClient" factory-bean="pbrpcServerLocator" factory-method="factory"> <constructor-arg value="${pbrpc.client.server}"/> <constructor-arg value="${pbrpc.client.connect.timeout}"/> <constructor-arg value="${pbrpc.client.read.timeout}"/> <constructor-arg ref="roundRobinLoadBalanceStrategy"/> </bean> <!-- Pbprc代理proxy生成器,需要指定高可用pbrpc客戶端和provider標示 --> <bean id="integrationProxy" class="com.baidu.beidou.navi.pbrpc.spring.JdkDynamicIntegrationProxy"> <property name="pbrpcClient" ref="haPbrpcClient"/> <property name="provider" value="beidou"/> </bean> <!-- 服務bean定義,使用Spring的FactoryBean來做代理 --> <bean id="demoService" class="com.baidu.beidou.navi.pbrpc.spring.PbrpcProxyFactoryBean"> <property name="integrationProxy" ref="integrationProxy"/> <property name="serviceInterface"> <value>com.baidu.beidou.navi.pbrpc.demo.service.DemoService</value> </property> </bean> </beans> |
6 附錄
6.1 NsHead頭結構
Byte/ 0 | 1 | 2 | 3 | / | | | | |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| +---------------+---------------+---------------+---------------+ 0| id | flags | +---------------+---------------+---------------+---------------+ 4| log id | +---------------+---------------+---------------+---------------+ 8| provider | + + | | + + 16| | + + 20| | +---------------+---------------+---------------+---------------+ 24| magic number | +---------------+---------------+---------------+---------------+ 28| method id | +---------------+---------------+---------------+---------------+ 32| body length | +---------------+---------------+---------------+---------------+ Total 36 bytes |
Header各字段含義
- id請求的id。目前未使用。建議設置為0。
- flags本次請求的一些標志符。目前框架用於傳輸errorCode。
- log-id。本次請求的日志id。Navi-rpc服務端用該id定位一個唯一的客戶端請求。
- provider標識調用方的表示。
- magic-number特殊標識,用於標識一個包的完整性。目前未使用。
- method-id是RPC方法的序列號。根據proto文件中定義的service順序,從注冊進入的起始值開始依次遞增。
- body-length消息體長度。
6.2 設計——UML類圖
6.3 性能測試報告
測試環境: Linux內核版本:2.6.32_1-11-0-0 CPU:Intel(R) Xeon(R) CPU E5-2620 0 @ 2.00GHz processor_count : 12 內存:64G 在同一台物理機上測試。
JVM參數: -Xms512m -Xmx512m
測試壓力: 10w請求,20並發,測試期間會有4個以上的核全部100%負荷。
測試case: 客戶端發起請求,要求字符串長度以及數量,服務端返回一個指定數量的List給予客戶端,字符串為隨機生成。
測試結果: 可以看出在常見的請求區間10k左右數據大小,QPS能在18000+。
傳輸數據大小 響應時間(毫秒) QPS 50byte 3186 31387 1k 4063 24612 10k 5354 18677 20k 7833 12766 50k 12658 7900 |
6.4 長連接池PooledConfiguration配置詳解
/** * 控制池中空閑的對象的最大數量。 默認值是8,如果是負值表示沒限制。 */ private int maxIdle = GenericObjectPool.DEFAULT_MAX_IDLE; /** * whenExhaustedAction如果是WHEN_EXHAUSTED_BLOCK,指定等待的毫秒數。<br /> * 如果maxWait是正數,那么會等待maxWait的毫秒的時間,超時會拋出NoSuchElementException異常 ;<br /> * 如果maxWait為負值,會永久等待。 maxWait的默認值是-1。 */ private long maxWait = GenericObjectPool.DEFAULT_MAX_WAIT; /** * 如果testOnBorrow被設置,pool會在borrowObject返回對象之前使用PoolableObjectFactory的validateObject來驗證這個對象是否有效,要是對象沒通過驗證,這個對象會被丟棄, * 然后重新選擇一個新的對象。 testOnBorrow的默認值是false,可以使用GenericObjectPool.DEFAULT_TEST_ON_BORROW;。 * <p> </p> * 注意,對於長期idle的連接,服務端會默認關閉channel此時客戶端並不知曉,因此不能使用已經失效的channel,為保證客戶端可用,這里暫時使用這個策略每次borrow的時候都test */ private boolean testOnBorrow = true; /** * 控制池中空閑的對象的最小數量。 默認值是0。 */ private int minIdle = GenericObjectPool.DEFAULT_MIN_IDLE; /** * 控制池中對象的最大數量。 默認值是8,如果是負值表示沒限制。 */ private int maxActive = GenericObjectPool.DEFAULT_MAX_ACTIVE; /** * 如果testOnReturn被設置,pool會在returnObject的時候通過PoolableObjectFactory的validateObject方法驗證對象,如果對象沒通過驗證,對象會被丟棄,不會被放到池中。 * testOnReturn的默認值是false。 */ private boolean testOnReturn = GenericObjectPool.DEFAULT_TEST_ON_RETURN; /** * 指定idle對象是否應該使用PoolableObjectFactory的validateObject校驗,如果校驗失敗,這個對象會從對象池中被清除。 * 這個設置僅在timeBetweenEvictionRunsMillis被設置成正值(>0)的時候才會生效。 testWhileIdle的默認值是false。 */ private boolean testWhileIdle = GenericObjectPool.DEFAULT_TEST_WHILE_IDLE; /** * 指定驅逐線程的休眠時間。如果這個值不是正數(>0),不會有驅逐線程運行。 timeBetweenEvictionRunsMillis的默認值是-1。 */ private long timeBetweenEvictionRunsMillis = GenericObjectPool.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS; /** * 設置驅逐線程每次檢測對象的數量。 這個設置僅在timeBetweenEvictionRunsMillis被設置成正值(>0)的時候才會生效。 numTestsPerEvictionRun的默認值是3。 */ private int numTestsPerEvictionRun = GenericObjectPool.DEFAULT_NUM_TESTS_PER_EVICTION_RUN; /** * 指定最小的空閑驅逐的時間間隔(空閑超過指定的時間的對象,會被清除掉)。 這個設置僅在timeBetweenEvictionRunsMillis被設置成正值(>0)的時候才會生效。 * minEvictableIdleTimeMillis默認值是30分鍾。 */ private long minEvictableIdleTimeMillis = GenericObjectPool.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS; /** * 與minEvictableIdleTimeMillis類似,也是指定最小的空閑驅逐的時間間隔(空閑超過指定的時間的對象,會被清除掉),不過會參考minIdle的值,只有idle對象的數量超過minIdle的值,對象才會被清除。 * 這個設置僅在timeBetweenEvictionRunsMillis被設置成正值 * (>0)的時候才會生效,並且這個配置能被minEvictableIdleTimeMillis配置取代(minEvictableIdleTimeMillis配置項的優先級更高)。 * softMinEvictableIdleTimeMillis的默認值是-1。 */ private long softMinEvictableIdleTimeMillis = GenericObjectPool.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS; /** * 設置后進先出的池策略。pool可以被配置成LIFO隊列(last-in-first-out)或FIFO隊列(first-in-first-out),來指定空閑對象被使用的次序。 lifo的默認值是true。 */ private boolean lifo = GenericObjectPool.DEFAULT_LIFO; /** * 指定池中對象被消耗完以后的行為,有下面這些選擇: WHEN_EXHAUSTED_FAIL 0 WHEN_EXHAUSTED_GROW 2 WHEN_EXHAUSTED_BLOCK 1 * 如果是WHEN_EXHAUSTED_FAIL,當池中對象達到上限以后,繼續borrowObject會拋出NoSuchElementException異常。 * 如果是WHEN_EXHAUSTED_GROW,當池中對象達到上限以后,會創建一個新對象,並返回它。 * 如果是WHEN_EXHAUSTED_BLOCK,當池中對象達到上限以后,會一直等待,直到有一個對象可用。這個行為還與maxWait有關 * ,如果maxWait是正數,那么會等待maxWait的毫秒的時間,超時會拋出NoSuchElementException異常;如果maxWait為負值,會永久等待。 * whenExhaustedAction的默認值是WHEN_EXHAUSTED_BLOCK,maxWait的默認值是-1。 */ private byte whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK; |
6.5 默認值手冊
/** * 默認客戶端連接超時時間,單位毫秒 */ public static final int DEFAULT_CLIENT_CONN_TIMEOUT = 4000; /** * 默認客戶端調用讀超時時間,單位毫秒 */ public static final int DEFAULT_CLIENT_READ_TIMEOUT = 60000; /** * 默認客戶端超時調用檢測器啟動時間,單位毫秒 */ public static int CLIENT_TIMEOUT_EVICTOR_DELAY_START_TIME = 5000; /** * 默認客戶端超時調用檢測器檢測間隔,單位毫秒 */ public static int CLIENT_TIMEOUT_EVICTOR_CHECK_INTERVAL = 5000; |
6.6 protoc生成原生proto代碼方法
1)下載的protobuffer編譯客戶端: github:https://github.com/google/protobuf/releases 目前常用的是2.5.0版本
2)重命名問xxx.proto為自己想生成類名稱
3)修改文件中package為自己的包前綴
4)調用命令:protoc –java_out=xxx.proto
其他生成方法可以使用各種IDE或者編輯器(如sublime text)直接生成。