先放出鏈接,喜歡的給個star:https://gitee.com/a1234567891/koalas-rpc

一:項目介紹
koalas-RPC 個人作品,提供大家交流學習,有意見請私信,歡迎拍磚。客戶端采用thrift協議,服務端支持netty和thrift的TThreadedSelectorServer半同步半異步線程模型,支持動態擴容,服務上下線,權重動態,可用性配置,頁面流量統計等,QPS統計,TP90,TP99,TP95等豐富可視化數據,持續為個人以及中小型公司提供可靠的RPC框架技術方案。
1:為什么要寫這個RPC
市面上常見的RPC框架很多,grpc,motan,dubbo等,但是隨着越來越多的元素加入,復雜的架構設計等因素似使得這些框架和spring一樣,雖然號稱是輕量級,但是用起來卻是讓我們很蹩腳,大量的配置,繁雜的API設計,其實,我們根本用不上這些東西!!! 我也算得上是在很多個互聯網企業廝殺過,見過很多很多的內部RPC框架,有些優秀的設計讓我非常贊賞,有一天我突然想着,為什么不對這些設計原型進行聚合歸類,自己搞一套【輕量級】RPC框架呢,礙於工作原因,一直沒有時間倒騰出空,十一期間工作閑暇,說搞就搞吧,落地不易,很多細節性問題,比如tcp中怎么解決大量的wait-time,如何做到thrift和netty的兼容等等大量細節的優化,希望源碼對大家對認識RPC框架起到推進的作用。東西越寫越多,有各種問題歡迎隨時拍磚
2:為什么叫koalas
樹袋熊英文翻譯,希望考拉RPC給那些不太喜歡動手自己去造輪子的人提供可靠的RPC使用環境
3:技術棧
- thrift 0.8.0
- spring-core-4.2.5,spring-context-4.2.5,spring-beans-4.2.5
- log4j,slf4j
- org.apache.commons(v2.0+)
- io.netty4
- fastJson
- zookeeper
- 點評cat(V3.0.0+ 做數據大盤統計上報等使用,可不配置)
- AOP,反射代理等
4:關於技術選型
- 序列化篇 考察了很多個序列化組件,其中包括jdk原生,kryo、hessian、protoStuff,thrift,json等,最終選擇了Thrift,原因如下 原生JDK序列化反序列化效率堪憂,其序列化內容太過全面kryo和hessian,json相對來說比原生JDK強一些,但是對跨語言支持一般,所以舍棄了,最終想在protoBuf和Thrift協議里面選擇一套框架,這倆框架很相通,支持跨語言,需要靜態編譯等等。但是protoBuf不帶RPC服務,本着提供多套服務端模式(thrift rpc,netty)的情況下,最終選擇了Thrift協議。
- IO線程模型篇 原生socket可以模擬出簡單的RPC框架,但是對於大規模並發,要求吞吐量的系統來說,也就算得上是一個demo級別的,所以BIO肯定是不考慮了,NIO的模型在序列化技術選型的時候已經說了,Thrift本身支持很多個io線程模型,同步,異步,半同步異步等(SimpleServer,TNonblockingServer,THsHaServer,TThreadedSelectorServer,TThreadPoolServer),其中吞吐量最高的肯定是半同步半異步的IO模TThreadedSelectorServer了,具體原因大家可自行google,這次不做多的闡述,選擇好了模型之后,發現thrift簡直就是神器一樣的存在,再一想,對於服務端來說,IO模型怎么能少得了Netty啊,所以下決心也要支持Netty,但是很遺憾Netty目前沒有對Thrift的序列化解析,拆包粘包的處理,但是有protoBuf,和http協議的封裝,怎么辦,自己在netty上寫對thrift的支持唄,雖然工作量大了一些,但是一想netty不就是干這個事兒的嘛- -!
- 服務發現 支持集群的RPC框架里面,像dubbo,或者是其他三方框架,對服務發現都進行的封裝,那么自研RPC的話,服務發現就要自己來寫了,那么簡單小巧容易上手的zookeeper肯定是首選了。

5:安裝教程
考拉RPC確保精簡,輕量的原則,只需要zk服務器進行服務發現(后續版本服務治理可能需要Datasource),對於zookeeper的各個環境安裝教程請自行google,不在本安裝教程內特意說明 如果需要cat的數據大盤功能,想更方便的查看服務的調用情況,需要安裝cat服務,至於cat的安裝就更簡單了,就是war包扔在tomcat里面運行,然后配置一些參數即可,當然你也可以不接入cat,單獨的作為RPC框架來使用。 CAT接入參考:https://github.com/dianping/cat
二:使用說明
1:前期准以及依賴
maven依賴
1 <dependency> 2 <groupId>koalas.rpc</groupId> 3 <artifactId>com.Koalas.rpc</artifactId> 4 <version>Koalas-1.0-SNAPSHOT</version> 5 </dependency>
關於私服的引用問題,記得全局文件不要把全局的依賴都代理掉,因為這么做只能從aliyun的私服上下載項目,由於koalas-rpc中的Cat依賴只在美團點評的私有倉庫中存在,這么做會下載依賴失敗,所以不要暴力的設置下面的代理做法。
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>*</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
正確的做法是將代理去掉,直接按照作者在pom.xml文件中給定的依賴倉庫地址就可以了。
首先需要編寫自己的thrift idl文件了,這里多說一句,在群里的小伙伴曾經說過idl文件編寫不熟悉,有可能出錯 這里順帶說一嘴,thrift的ldl文件和寫java的請求體和service幾乎沒有任何區別,熟能生巧,上手之后非常簡單 這里推薦幾篇thrift的文章,有興趣可以看一看 https://blog.csdn.net/lk10207160511/article/details/50450541,https://blog.csdn.net/hrn1216/article/details/51306395 下面截圖為測試的thrift文件
更新於2019年06月10日
如果大家實在不樂意手寫idl文件,那么作者給大家提供了一個簡單的插件。鏈接: https://pan.baidu.com/s/1d_Raox39zSdFrMGw--VUsQ 提取碼: y7yu ,下載之后在src/test/java下面寫自己的普通java接口對象,然后一鍵生成thrfit文件和便后之后的文件(前提條件是需要使用者把thrift編譯環境設置到path中,否則不能正常運行),使用方式如下:寫好了自己的接口文件之后直接運行ThriftFileBuilderTest測試類中方法。
1 @Test 2 public void testToOutputstream() throws Exception { 3 4 String baseDir = "src/test/java"; 5 Class clazz = ICommonUserService.class; 6 String outPutFile =baseDir.concat ( "/" ).concat (clazz.getPackage ().getName ().replaceAll ( "\\.","/" )).concat ( "/" ); 7 outPutFile=outPutFile.concat ( clazz.getSimpleName () ).concat ( "/" ); 8 outPutFile=outPutFile.concat ( clazz.getSimpleName ()+".thrift" ); 9 10 File file = new File ( outPutFile); 11 if (file.getParentFile() != null && !file.getParentFile().exists()) { 12 file.getParentFile().mkdirs(); 13 file.createNewFile (); 14 } 15 16 this.fileBuilder.setSourceDir(baseDir); 17 18 FileOutputStream fileOutputStream= new FileOutputStream(file); 19 this.fileBuilder.buildToOutputStream(clazz,fileOutputStream); 20 21 excuteThriftCommand(file.getAbsolutePath ()); 22 }
只需要修改clazz的接口就可以了,執行過后在當前包下會生成一個thrift文件和編譯過后的class文件,直接使用即可。
test0包是作者的測試包名,改成自己實際的包名就可以了。最后說明的是作者還是推薦自己練習寫idl文件,熟練過后就可以不依賴這個工具了。
1 namespace java thrift.service 2 3 include 'WmCreateAccountRequest.thrift' 4 include 'WmCreateAccountRespone.thrift' 5 6 service WmCreateAccountService { 7 WmCreateAccountRespone.WmCreateAccountRespone getRPC(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 8 WmCreateAccountRespone.WmCreateAccountRespone koaloasTest1(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 9 WmCreateAccountRespone.WmCreateAccountRespone koaloasTest2(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 10 WmCreateAccountRespone.WmCreateAccountRespone koaloasTest3(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 11 WmCreateAccountRespone.WmCreateAccountRespone koaloasTest4(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 12 WmCreateAccountRespone.WmCreateAccountRespone koaloasTest5(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 13 WmCreateAccountRespone.WmCreateAccountRespone koaloasTest6(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 14 WmCreateAccountRespone.WmCreateAccountRespone koaloasTest7(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 15 WmCreateAccountRespone.WmCreateAccountRespone koaloasTest8(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 16 WmCreateAccountRespone.WmCreateAccountRespone koaloasTest9(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 17 WmCreateAccountRespone.WmCreateAccountRespone koaloasTest10(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 18 WmCreateAccountRespone.WmCreateAccountRespone koaloasTest11(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 19 WmCreateAccountRespone.WmCreateAccountRespone koaloasTest12(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest); 20 }
1 namespace java thrift.domain 2 /** 3 * 測試類 4 **/ 5 struct WmCreateAccountRequest { 6 7 1:i32 source, 8 9 2:i32 accountType, 10 11 3:i64 partnerId, 12 13 4:i32 partnerType, 14 15 5:string partnerName, 16 17 6:i32 poiFlag, 18 } 19 namespace java thrift.domain 20 /** 21 * 測試類 22 **/ 23 struct WmCreateAccountRespone { 24 1:i32 code, 25 2:string message, 26 }
編譯器需要大家去下載對應的版本 windows和linux下不同的編譯器,下載地址http://archive.apache.org/dist/thrift/0.8.0/ 下載0.8.0版本即可,0.8.0版本是很老的版本了,但是相對穩定,后續會把thirft版本升級。如果上面地址下載不下來或者失效,可以上作者的網盤上下載zip包,上面有win版本和mac,linux版本的0.8.0的thrift編譯器,鏈接: https://pan.baidu.com/s/1JpLqVbmokTOe30nU_TznWw 提取碼: ntye, 編譯上面三個文件 thrift -gen java WmCreateAccountService.thrift, thrift -gen java WmCreateAccountRequest.thrift, thrift -gen java WmCreateAccountRespone.thrift 在當前目錄下會生成3個java文件 這三個文件分別是請求體,返回體,和服務類,就這么簡單 Ok作為開發者而言,所有的准備工作都結束了。下面就開始進入實際開發~
2:xml配置方式
1. 客戶端同步調用
首先在你的xml里面配置一下引用
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:koalas="http://www.koalas.com/schema/ch" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.koalas.com/schema/ch http://www.koalas.com/schema/ch.xsd"> <koalas:client id="wmCreateAccountService1" serviceInterface="thrift.service.WmCreateAccountService" zkPath="127.0.0.1:2181"/> </beans>
首先引用koalas的自定義schema,xmlns:koalas和xsi:schemaLocation, 其中serviceInterface為thrift自動生成的java類,zkPath為zk的服務地址,默認是同步調用,接下來就是在java里面的遠程調用了。
package thrift.service; import org.apache.thrift.TException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import thrift.domain.WmCreateAccountRequest; import thrift.domain.WmCreateAccountRespone; @Service("testService") public class TestService { @Autowired WmCreateAccountService.Iface wmCreateAccountService; public void getRemoteRpc() throws TException { WmCreateAccountRequest request= new WmCreateAccountRequest ( ); //request.setSource ( 10 ); request.setAccountType ( 1 ); request.setPartnerId ( 1 ); request.setPartnerType ( 1 ); request.setPartnerName ( "你好" ); request.setPoiFlag ( 1 ); WmCreateAccountRespone respone = wmCreateAccountService.getRPC ( request); System.out.println (respone); } }
就這么簡單一個高性能的RPC框架就誕生了。WmCreateAccountService是thrift自動生成的,作為使用者而言不需要做任何事情,只需要在spring bean中注入xxx.Iface即可。
2. 客戶端異步調用
剛剛我們看了客戶端的同步調用方式,下面我們一起來看看異步的使用方式, 首先在你的xml里面配置一下引用
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:koalas="http://www.koalas.com/schema/ch" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.koalas.com/schema/ch http://www.koalas.com/schema/ch.xsd"> <koalas:client id="wmCreateAccountService2" serviceInterface="thrift.service.WmCreateAccountService" zkPath="127.0.0.1:2181" async="true"/> </beans>
和同步的區別async=true,代表異步使用,接下來就是在java里面的異步遠程調用了
package thrift.service; import client.async.KoalasAsyncCallBack; import org.apache.thrift.TException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import thrift.domain.WmCreateAccountRequest; import thrift.domain.WmCreateAccountRespone; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @Service("testService") public class TestService2 { @Autowired WmCreateAccountService.AsyncIface wmCreateAccountService; public void getRemoteRpc() throws TException{ KoalasAsyncCallBack<WmCreateAccountRespone, WmCreateAccountService.AsyncClient.getRPC_call> koalasAsyncCallBack = new KoalasAsyncCallBack<> (); WmCreateAccountRequest request= new WmCreateAccountRequest ( ); request.setAccountType ( 1 ); request.setPartnerId ( 1 ); request.setPartnerType ( 1 ); request.setPartnerName ( "你好啊" ); request.setPoiFlag ( 1 ); wmCreateAccountService.getRPC ( request ,koalasAsyncCallBack); Future<WmCreateAccountRespone> future= koalasAsyncCallBack.getFuture (); try { //to get other things System.out.println (future.get ()); } catch (InterruptedException e) { e.printStackTrace (); } catch (ExecutionException e) { e.printStackTrace (); } } }
這次調用getRpc方法不會阻塞等待server同步結果了。而是可以去干一些自己的其他事情,然后在調用future.get ()來獲得返回resopne,當然future.get ()支持最大等待時間的,超時之后會拋出TimeOutException,當然這僅僅是client超時而已不會影響server的執行結果。
3. 服務端實現
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:koalas="http://www.koalas.com/schema/ch" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.koalas.com/schema/ch http://www.koalas.com/schema/ch.xsd"> <koalas:server id="WmCreateAccountService" serviceInterface="thrift.service.WmCreateAccountService" serviceImpl="wmCreateAccountServiceImpl" port="8001" zkpath="127.0.0.1:2181"/> </beans>
服務端只需要指定暴露的端口,zk服務地址和服務端實現即可。
@Service public class WmCreateAccountServiceImpl implements WmCreateAccountService.Iface { @Override public WmCreateAccountRespone getRPC(WmCreateAccountRequest wmCreateAccountRequest) throws TException { WmCreateAccountRespone wmCreateAccountRespone = new WmCreateAccountRespone (); wmCreateAccountRespone.setCode ( 1 ); wmCreateAccountRespone.setMessage ( "你好" ); if(new Random ( ).nextInt ( 5 )>100){ throw new RuntimeException ( "測試錯誤" ); } System.out.println ( "getRPC start ...." + wmCreateAccountRequest + "------" + atomicInteger.incrementAndGet () ); return wmCreateAccountRespone; } }
只需要實現xxxx.Iface即可
3:注解配置方式
有的小伙伴會覺得配置xml有點麻煩,koalas-rpc也提供了純注解的使用方式
1. 客戶端調用
xml中的配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:koalas="http://www.koalas.com/schema/ch" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.koalas.com/schema/ch http://www.koalas.com/schema/ch.xsd"> <koalas:annotation package="thrift.annotation.client.impl"/> </beans>
一個掃描標簽就行了,如果你在spring bean里想通過調用rpc遠程服務,那么掃描一下就行了
java中使用
@Service("testServiceSync")
public class TestServiceSync {
@KoalasClient(zkPath = "127.0.0.1:2181",readTimeout = 5000*1000)
WmCreateAccountService.Iface wmCreateAccountService;
public void getRemoteRpc() throws TException {
WmCreateAccountRequest request= new WmCreateAccountRequest ( );
//request.setSource ( 10 );
request.setAccountType ( 1 );
request.setPartnerId ( 1 );
request.setPartnerType ( 1 );
request.setPartnerName ( "你好啊-我是注解實現的" );
request.setPoiFlag ( 1 );
WmCreateAccountRespone respone = wmCreateAccountService.getRPC ( request);
System.out.println (respone);
}
}
只需要在你想遠程調用的類上加一個@KoalasClient注解就可以了,遠程調用就這么簡單,當然異步使用方式也類似
@Service("testServiceAsync")
public class TestServiceAsync {
@KoalasClient(zkPath = "127.0.0.1:2181",readTimeout = 5000*1000)
WmCreateAccountService.AsyncIface wmCreateAccountService;
public void getRemoteRpc() throws TException{
KoalasAsyncCallBack<WmCreateAccountRespone, WmCreateAccountService.AsyncClient.getRPC_call> koalasAsyncCallBack = new KoalasAsyncCallBack<> ();
WmCreateAccountRequest request= new WmCreateAccountRequest ( );
//request.setSource ( 10 );
request.setAccountType ( 1 );
request.setPartnerId ( 1 );
request.setPartnerType ( 1 );
request.setPartnerName ( "你好啊-我是注解實現的" );
request.setPoiFlag ( 1 );
wmCreateAccountService.getRPC ( request ,koalasAsyncCallBack);
Future<WmCreateAccountRespone> future= koalasAsyncCallBack.getFuture ();
try {
System.out.println (future.get ());
} catch (InterruptedException e) {
e.printStackTrace ();
} catch (ExecutionException e) {
e.printStackTrace ();
}
}
}
注意和同步調用不同的是自定義注解注入的接口是xxxx.AsyncIface,同步是xxxx.Iface。KoalasAsyncCallBack回調使用方式和上面的xml一樣。有一點需要說明
<koalas:annotation package="thrift.annotation.client.impl"/>
如果package屬性設置為空,那么所有的@KoalasClient都會生效,也就是說所有在spring bean中的自定義注解@KoalasClient都會自動注入。這里說另外一種用法
private WmCreateAccountService.Iface wmCreateAccountService; @KoalasClient(zkPath = "127.0.0.1:2181",readTimeout = 5000*1000) public void setWmCreateAccountService(WmCreateAccountService.Iface wmCreateAccountService){ this.wmCreateAccountService = wmCreateAccountService; }
直接注入方法的方式也是可以的。
2. 服務端實現
xml中的配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:koalas="http://www.koalas.com/schema/ch" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.koalas.com/schema/ch http://www.koalas.com/schema/ch.xsd"> <koalas:annotation package="thrift.annotation.server.impl"/> </beans>
配置和client中一樣只需要配置一個自定義標簽即可,java中的使用方式如下:
package thrift.annotation.server.impl; import annotation.KoalasServer; import org.apache.thrift.TException; import thrift.domain.WmCreateAccountRequest; import thrift.domain.WmCreateAccountRespone; import thrift.service.WmCreateAccountService; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; @KoalasServer ( port = 8801,zkpath="127.0.0.1:2181") public class WmCreateAccountServiceNettyImpl implements WmCreateAccountService.Iface { private AtomicInteger atomicInteger = new AtomicInteger ( 0 ); @Override public WmCreateAccountRespone getRPC(WmCreateAccountRequest wmCreateAccountRequest) throws TException { WmCreateAccountRespone wmCreateAccountRespone = new WmCreateAccountRespone (); wmCreateAccountRespone.setCode ( 1 ); wmCreateAccountRespone.setMessage ( "你好啊" ); if(new Random ( ).nextInt ( 5 )>100){ try { Thread.sleep ( 5000 ); } catch (InterruptedException e) { e.printStackTrace (); } } System.out.println ( "getRPC start ...." + wmCreateAccountRequest + "------" + atomicInteger.incrementAndGet () ); return wmCreateAccountRespone; } }
這樣服務實現就會主從注冊到zookeeper中提供給client端使用了。值得說明的是被掃描到並且類上有@KoalasServer的類會被加載到spring上下文中,可以當成一個普通的spring bean來處理,還有一點如果你不指定package,配置成如下情況
<koalas:annotation package=""/>
這樣配置會以spring的bean為基礎實現,那么使用方式需要改成
package thrift.annotation.server.impl; import annotation.KoalasServer; import org.apache.thrift.TException; import thrift.domain.WmCreateAccountRequest; import thrift.domain.WmCreateAccountRespone; import thrift.service.WmCreateAccountService; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; @KoalasServer ( port = 8801,zkpath="127.0.0.1:2181") @Service public class WmCreateAccountServiceNettyImpl implements WmCreateAccountService.Iface { private AtomicInteger atomicInteger = new AtomicInteger ( 0 ); @Override public WmCreateAccountRespone getRPC(WmCreateAccountRequest wmCreateAccountRequest) throws TException { WmCreateAccountRespone wmCreateAccountRespone = new WmCreateAccountRespone (); wmCreateAccountRespone.setCode ( 1 ); wmCreateAccountRespone.setMessage ( "你好啊" ); if(new Random ( ).nextInt ( 5 )>100){ try { Thread.sleep ( 5000 ); } catch (InterruptedException e) { e.printStackTrace (); } } System.out.println ( "getRPC start ...." + wmCreateAccountRequest + "------" + atomicInteger.incrementAndGet () ); return wmCreateAccountRespone; } }
就這么簡單即可。
3. 泛化調用
為什么需要泛化調用? 1:有一個通用壓測平台,想去壓測不同的server。那么現在就有一個問題了,不可能讓壓測平台服務端去依賴所有的下游服務,這樣依賴會很繁雜,這時候如果說只配置serviceName,request模型和request請求json就可以進行遠程調用,那么將大大的減少頭疼的依賴。 2:假設php同事對java代碼不熟悉,不可能讓他們去依賴spring,一共一套簡單的api來使用是很有必要的。 3:上游服務不想依賴下游服務的數據模型。
對於泛化調用來說,dubbo已經提供,soft-rpc也有提供。當然koalas-rpc也不會例外,並且支持xml,注解和java api的使用方式。下面幾個例子來說明一下使用方式。更多demo去源碼中查看,作者已經寫好,開箱即用。
xml使用方式
<koalas:client id="wmCreateAccountService3" serviceInterface="thrift.service.WmCreateAccountService" zkPath="127.0.0.1:2181" generic="true" readTimeout="50000000"/>
@Autowired @Qualifier("wmCreateAccountService3") GenericService.Iface wmGenericService; public void getGenericRpc() throws TException { GenericRequest request = new GenericRequest ( ); request.setMethodName ( "getRPC" ); request.setClassType ( new ArrayList<String> ( ){{ add ( "thrift.domain.WmCreateAccountRequest"); }} ); request.setRequestObj ( new ArrayList<String> ( ){{ add ( "{\"accountType\":1,\"partnerId\":1,\"partnerName\":\"你好\",\"partnerType\":1,\"poiFlag\":1,\"source\":0}"); }} ); String str = wmGenericService.invoke ( request ); System.out.println (str); }
簡單說明一下,GenericService.Iface是通用服務,有三個參數,第一個是方法名稱,第二個是請求體類型集合,第三個是請求體內容。直接調用即可,返回值是server端的json類型,使用json工具為阿里巴巴的Fast-json
注解使用方式
@KoalasClient(zkPath = "127.0.0.1:2181",readTimeout = 5000*1000,genericService = "thrift.service.WmCreateAccountService") GenericService.Iface genericService; public void getGenericRemoteRpc() throws TException { GenericRequest request = new GenericRequest ( ); request.setMethodName ( "getRPC" ); request.setClassType ( new ArrayList<String> ( ){{ add ( "thrift.domain.WmCreateAccountRequest"); }} ); request.setRequestObj ( new ArrayList<String> ( ){{ add ( "{\"accountType\":1,\"partnerId\":1,\"partnerName\":\"你好\",\"partnerType\":1,\"poiFlag\":1,\"setAccountType\":true,\"setPartnerId\":true,\"setPartnerName\":true,\"setPartnerType\":true,\"setPoiFlag\":true,\"setSource\":false,\"source\":0}"); }} ); String str = genericService.invoke ( request ); System.out.println (str); }
唯一區別的是注解要指定genericService,當genericService不為空時,默認開啟泛化調用 當然,java api方式也是支持的。
KoalasClientProxy koalasClientProxy = new KoalasClientProxy(); koalasClientProxy.setServiceInterface ( "thrift.service.WmCreateAccountService" ); koalasClientProxy.setZkPath ("127.0.0.1:2181" ); koalasClientProxy.setGeneric ( true ); koalasClientProxy.setReadTimeout ( 50000000 ); koalasClientProxy.afterPropertiesSet (); GenericService.Iface genericService = (GenericService.Iface) koalasClientProxy.getObject (); GenericRequest request = new GenericRequest ( ); request.setMethodName ( "getRPC" ); request.setClassType ( new ArrayList<String> ( ){{ add ( "thrift.domain.WmCreateAccountRequest"); }} ); request.setRequestObj ( new ArrayList<String> ( ){{ add ( "{\"accountType\":1,\"partnerId\":1,\"partnerName\":\"你好\",\"partnerType\":1,\"poiFlag\":1,\"setAccountType\":true,\"setPartnerId\":true,\"setPartnerName\":true,\"setPartnerType\":true,\"setPoiFlag\":true,\"setSource\":false,\"source\":0}"); }} ); String str = genericService.invoke ( request ); System.out.println (str); koalasClientProxy.destroy ();
特別注意的是KoalasClientProxy對象非常非常重,一定要在服務關閉的時候執行koalasClientProxy.destroy ();方法,並且需要帶應用程序中緩存該對象,千萬不要每次使用都要創建,這樣會極大的浪費資源,每個服務對應一個KoalasClientProxy,同步和異步也是不同的對象,這些使用者需要注意。
4. 原生調用支持
koalas-rpc在原生基礎上封裝了自定義協議和特定的傳輸類型,看過源碼的朋友一定覺得處理非常非常麻煩,但是在自定義協議的過程中koalas-rpc也同時支持原生的thrift請求,可以在本地做測試等等。請求調用demo:
package xml.client; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import thrift.domain.WmCreateAccountRequest; import thrift.domain.WmCreateAccountRespone; import thrift.service.WmCreateAccountService; public class ThriftNative { public static final String SERVER_IP = "localhost"; public static final int SERVER_PORT = 8001; public static final int TIMEOUT = 3000000; public static void main(String[] args) throws TException { TTransport transport = new TFramedTransport (new TSocket (SERVER_IP, SERVER_PORT, TIMEOUT)); TProtocol protocol = new TBinaryProtocol (transport); WmCreateAccountService.Client client = new WmCreateAccountService.Client(protocol); transport.open(); WmCreateAccountRequest request= new WmCreateAccountRequest ( ); //request.setSource ( 10 ); request.setAccountType ( 1 ); request.setPartnerId ( 1 ); request.setPartnerType ( 1 ); request.setPartnerName ( "你好啊-我是ThriftNative實現的服務端getRemoteRpc" ); request.setPoiFlag ( 1 ); WmCreateAccountRespone respone=client.getRPC (request ); System.out.println (respone); } }
三:參數配置文檔
1:客戶端
| 參數名 | 說明 | 是否必須 |
|---|---|---|
| serviceInterface | thrift生成的接口類 | Y |
| zkPath | zk的服務地址,集群中間逗號分隔 | Y |
| serverIpPorts | 不實用zk發現直接連接服務器server,格式ip:端口#權重。多個逗號分隔 | N |
| async | 是否異步 | N,默認false同步 |
| generic | 是否泛化調用(xml配置中使用) | N,默認false |
| genericService | 泛化調用的serviceName(注解配置中使用)使用方法參照代碼中demo | N,默認false |
| cat | 是否開啟CAT數據大盤,需要配置CAT服務,即可查看詳細調用情況) | N,默認false |
| connTimeout | 連接超時 | N,默認3000ms |
| readTimeout | 讀取超時 | N,默認5000ms,按照服務端指定時間適當調整 |
| localMockServiceImpl | 本地測試的實現 | N |
| retryRequest | 是否錯誤重試 | N,默認true |
| retryTimes | 重試次數 | N,默認3次 |
| maxTotal | TCP長連接池,參照Apache Pool參數 | 100 |
| maxIdle | TCP長連接池,參照Apache Pool參數 | 50 |
| minIdle | TCP長連接池,參照Apache Pool參數 | 10 |
| lifo | TCP長連接池,參照Apache Pool參數 | true |
| fairness | TCP長連接池,參照Apache Pool參數 | false |
| maxWaitMillis | TCP長連接池,參照Apache Pool參數 | 30 * 1000 |
| timeBetweenEvictionRunsMillis | TCP長連接池,參照Apache Pool參數 | 3 * 60 * 1000 |
| minEvictableIdleTimeMillis | TCP長連接池,參照Apache Pool參數 | 5 * 60 * 1000 |
| softMinEvictableIdleTimeMillis | TCP長連接池,參照Apache Pool參數 | 10 * 60 * 1000 |
| numTestsPerEvictionRun | TCP長連接池,參照Apache Pool參數 | 20 |
| testOnCreate | TCP長連接池,參照Apache Pool參數 | false |
| testOnBorrow | TCP長連接池,參照Apache Pool參數 | false |
| testOnReturn | TCP長連接池,參照Apache Pool參數 | false |
| testWhileIdle | TCP長連接池,參照Apache Pool參數 | true |
| iLoadBalancer | 負載略側,默認隨機 | N |
| env | 環境 | N,默認dev |
| removeAbandonedOnBorrow | TCP長連接池,參照Apache Pool參數 | true |
| removeAbandonedOnMaintenance | TCP長連接池,參照Apache Pool參數 | true |
| removeAbandonedTimeout | TCP長連接池,參照Apache Pool參數 | 30000ms |
| maxLength_ | 允許發送最大字節數 | N,10 * 1024 * 1024 |
| cores | selecter核心數量 | N,默認當前cpu數量 |
| asyncSelectorThreadCount | 異步請求時線程數量 | N,默認當前CPU核心數量*2 |
| privateKey | 私鑰 | N |
| publicKey | 公鑰 | N |
2:服務端
| 參數 | 說明 | 是否必須 |
|---|---|---|
| serviceImpl | 服務端實現 | Y |
| serviceInterface | thrift自動生成的類 | Y |
| port | 暴露的服務端口 | Y |
| zkpath | 服務端的zk路徑 | Y |
| cat | (是否開啟CAT數據大盤,需要配置CAT服務,即可查看詳細調用情況) | N,默認false |
| bossThreadCount | 處理連接線程 | N,當前CPU核心數 |
| workThreadCount | 讀取線程 | N,當前CPU核心數*2 |
| koalasThreadCount | 業務線程數 | 256 |
| maxLength | 最大接收字節數 | Integer.MAX_VALUE |
| env | 環境 | N,dev |
| weight | 權重 | N,10 |
| serverType | 采用哪些服務端,可以選NETTY和THRIFT,默認NETTY | N |
| workQueue | 當server超載時,可以容納等待任務的隊列長度 | 0 |
| privateKey | 私鑰 | N |
| publicKey | 公鑰 | N |
3:客戶端服務端RSA雙向加密
源碼中utils.KoalasRsaUtil的main方法已經為大家寫好生成私鑰和公鑰的代碼,執行即可 ,下面為核心源碼展示
public static String sign(byte[] data, String privateKey) throws Exception { byte[] keyBytes = Base64.decodeBase64 ( privateKey.getBytes ( "UTF-8" ) ); PKCS8EncodedKeySpec pkcs8KeySpec = new PKCS8EncodedKeySpec ( keyBytes ); KeyFactory keyFactory = KeyFactory.getInstance ( KEY_ALGORITHM ); PrivateKey privateK = keyFactory.generatePrivate ( pkcs8KeySpec ); Signature signature = Signature.getInstance ( SIGNATURE_ALGORITHM ); signature.initSign ( privateK ); signature.update ( data ); return new String ( Base64.encodeBase64 ( signature.sign () ), "UTF-8" ); } public static boolean verify(byte[] data, String publicKey, String sign) throws Exception { byte[] keyBytes = Base64.decodeBase64 ( publicKey.getBytes ("UTF-8") ); X509EncodedKeySpec keySpec = new X509EncodedKeySpec ( keyBytes ); KeyFactory keyFactory = KeyFactory.getInstance ( KEY_ALGORITHM ); PublicKey publicK = keyFactory.generatePublic ( keySpec ); Signature signature = Signature.getInstance ( SIGNATURE_ALGORITHM ); signature.initVerify ( publicK ); signature.update ( data ); return signature.verify ( Base64.decodeBase64 ( sign.getBytes ("UTF-8") ) );
}
執行main方法之后,會得到4個長長的字符串
下面四個字符串為koalas-rpc中客戶端和服務端使用的rsa非對稱秘鑰,復制使用即可 MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAIPQIc8/+wl5hTDT8fT4rCEA//pwSqdX8djur+UDwR/qg5iW3xBHUuxTGXRko/3SXYKJLugRmT2gV4ZggSHLpToSFYJZwATIbVD2p3oqZx4ZC5g3mZdTCScHbTb4CITFPacJCKads75Plrk8ryW7wP9dWlSmrF8f3CzReKUTjf5dAgMBAAECgYBRigXwK9cCNG8lFmc9sDriq7it1psHzApqtLSQifME6FCBqwrQCh8M3BcJ/lvH30NDRdODcaeHDNI36SjYnB5X25mMG95OEgLqPm7T8oB3DBY/BhJbAY43FbZSU3Lb+El5zknpTtH0M8DTlul1EmLbe+TJVL/x/SkpDx/HSS3GAQJBALtSSBeskQ4P+Pn5M4F2+GZJmFDxaOQHIuy/RdfckxV1aEMN425ieSrinSCXyBC8uTN0zF1NlJsfWLAUhtfSQ90CQQC0I+mEXsxWtTDT+fd3bDgiJtfOwPpyNT4HSObdq+aAqO44NL7fqD2plNZ3vBULfDbdbnTlvKJJnPUdt457WjyBAkAiM63SFMIPbT8qdSPAWbaVBo73CHz8VYk87NeVyEJawqscwyZpezVgbSv/TXdMBwlRqdu+lXGyuRB6ZeUQ9uVJAkAscjfpqyIruqUDiEdgtdjbxE22+7JPf4eAcKJVy1YiJIwyXgFCWdZtAwYvoL5oiQtYcypwjKxWEV4BKQsEsG0BAkBmlDi0wSPA2x7YjudQNWv+H51CsYDWMjOQ7AzUYABfkWVnbeYS/3uf7W56AHl3Rmdo7zUTBJFCyM/Rt28yZVLj MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDAAxbccTLuu12V2Le1mI5b+0kZMiQwN/WTSv8d2y0J/wVl+yMWgjZi4c8/kAs8pACEiFQ8hUUovmoAwceKEd5h3ISSV5lEPyBt+68DzinOrSGv7bZhGm5bwkRG7MMpSgAVSJj2lWTkf63fp2e/FwHs3WM64sSlbdlUN/57YtUC6QIDAQAB MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBAMADFtxxMu67XZXYt7WYjlv7SRkyJDA39ZNK/x3bLQn/BWX7IxaCNmLhzz+QCzykAISIVDyFRSi+agDBx4oR3mHchJJXmUQ/IG37rwPOKc6tIa/ttmEablvCREbswylKABVImPaVZOR/rd+nZ78XAezdYzrixKVt2VQ3/nti1QLpAgMBAAECgYEApwwI/4+b+AYZzRvV967Zazyaw8jTov+MLrC4cokUDfZIBAkQ5awzFKPPYkU3AXLM4ICaiGyJVoESR8ZOitgw1wB6tbI2DhP4FD5dqJkIOdUNujo+gAda3kfeCjAgWbtUL3Zhj7Ff+xFvSDDxUYKGG4fZwge3CFwyQ2vjxhPTXGECQQDpAkS6AW17LvWAiiu2924MEicJQW/s3w+chjuQ3VaauzotAHoSMi8VjBSlINbKxpklthKB4vubfA6AtTHae3hPAkEA0vVBKk9Qz8TkraN3QcILJwHjcjqP8+51n1jimSpZeZQL4BJxStdqqMP2nUzAVnh4ncEoFZ/3QA0sSwcdPtDLRwJBAIDpMmC+HXYDWuvMhbbqWUXwXQxv2Z5xIk/0q8vPyPQ+FUeEdgTPIuGG6H0bF/qDuYL1onOdwpoZHmTy2iwIF10CQBiVNdvNVFhx1EgbtWj3SL9p6+xCwMWnMxO3kuhQVA7j3qJk48jZ43b5JwLbj8pDzaJsgNRMSM6w+klf8duBDz8CQBMIMmhU84An2nv/CPNPArCC8BN8YhY1AH685zgRQBLv5untRhfZ+hJtqjSzTJlY7JHybMzc6wt2FZXrhvuopO4= MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCD0CHPP/sJeYUw0/H0+KwhAP/6cEqnV/HY7q/lA8Ef6oOYlt8QR1LsUxl0ZKP90l2CiS7oEZk9oFeGYIEhy6U6EhWCWcAEyG1Q9qd6KmceGQuYN5mXUwknB202+AiExT2nCQimnbO+T5a5PK8lu8D/XVpUpqxfH9ws0XilE43+XQIDAQAB 上面四個字符串為koalas-rpc中客戶端和服務端使用的rsa非對稱秘鑰,復制使用即可
得到上面的四個長長的字符串,可以由server端給client端提供。其中字符串1,字符串2分別對應client的privateKey,和publicKey,字符串3和字符串4分別對應server端的privateKey,和publicKey,提供rsa雙向加密的初衷是為了將非常重要的項目保護起來,不允許其他項目隨意調用,但是RSA雙向加密會對性能有所影響。當RSA驗證失敗的時候,client會拋RsaException。RSA對稱加密適合給三方系統進行調用,對稱加密會影響傳輸性能。
實際性能壓測
8C 16G mac開發本,單機10000次請求耗時截圖 
10w次請求,大約耗時12s,平均qps在8000左右,在集群環境下會有不錯的性能表現
數據大盤展示
開啟數據大盤,需要設置客戶端或者服務端的cat參數為true,默認為false。 koalas2.0已經接入了cat服務,cat服務支持qps統計,可用率,tp90line,tp99line,豐富自定義監控報警等,接入效果圖
豐富的可視參數,流量統計,日,周,月報表展示等。
鏈路跟蹤
對RPC服務來說,系統間的調用和排查異常接口,確定耗時代碼是非常重要的,只要接入了cat,koalsa-rpc天然的支持鏈路跟蹤,一切盡在眼前! 
代碼下載后如何測試
作者在src/test/java和resource下面有已經寫好了的豐富的xml配置和注解配置,下載后直接運行測試即可,注意測試的時候需要安裝zookeeper服務,如果不想通過zk做服務發現,那么客戶端可以進行直連,指定的server列表,逗號分隔,#分隔權重,格式,192.168.3.253:6666#10,192.168.3.253:6667#10 詳情見參數配置列表,但是這種辦法作者是不推薦的,在生產環境下沒有心跳和動態上下線功能。
CAT服務按需配置,不需要數據大盤不需要配置,不會影響RPC功能,CAT接入參考:https://github.com/dianping/cat
開源協議 :
Apache License Version 2.0 see http://www.apache.org/licenses/LICENSE-2.0.html
聯系作者 :
高級java QQ群:825199617 博客地址:https://www.cnblogs.com/zyl2016/
