JAVA RPC 生產級高可用RPC框架使用分享


 先放出鏈接,喜歡的給個star:https://gitee.com/a1234567891/koalas-rpc

 


一:項目介紹

koalas-RPC 個人作品,提供大家交流學習,有意見請私信,歡迎拍磚。客戶端采用thrift協議,服務端支持netty和thrift的TThreadedSelectorServer半同步半異步線程模型,支持動態擴容,服務上下線,權重動態,可用性配置,頁面流量統計等,QPS統計,TP90,TP99,TP95等豐富可視化數據,持續為個人以及中小型公司提供可靠的RPC框架技術方案。

1:為什么要寫這個RPC

市面上常見的RPC框架很多,grpc,motan,dubbo等,但是隨着越來越多的元素加入,復雜的架構設計等因素似使得這些框架和spring一樣,雖然號稱是輕量級,但是用起來卻是讓我們很蹩腳,大量的配置,繁雜的API設計,其實,我們根本用不上這些東西!!! 我也算得上是在很多個互聯網企業廝殺過,見過很多很多的內部RPC框架,有些優秀的設計讓我非常贊賞,有一天我突然想着,為什么不對這些設計原型進行聚合歸類,自己搞一套【輕量級】RPC框架呢,礙於工作原因,一直沒有時間倒騰出空,十一期間工作閑暇,說搞就搞吧,落地不易,很多細節性問題,比如tcp中怎么解決大量的wait-time,如何做到thrift和netty的兼容等等大量細節的優化,希望源碼對大家對認識RPC框架起到推進的作用。東西越寫越多,有各種問題歡迎隨時拍磚

2:為什么叫koalas

樹袋熊英文翻譯,希望考拉RPC給那些不太喜歡動手自己去造輪子的人提供可靠的RPC使用環境

3:技術棧
  •  thrift 0.8.0
  •  spring-core-4.2.5,spring-context-4.2.5,spring-beans-4.2.5
  •  log4j,slf4j
  •  org.apache.commons(v2.0+)
  •  io.netty4
  •  fastJson
  •  zookeeper
  •  點評cat(V3.0.0+ 做數據大盤統計上報等使用,可不配置)
  •  AOP,反射代理等
4:關於技術選型
  1. 序列化篇 考察了很多個序列化組件,其中包括jdk原生,kryo、hessian、protoStuff,thrift,json等,最終選擇了Thrift,原因如下 原生JDK序列化反序列化效率堪憂,其序列化內容太過全面kryo和hessian,json相對來說比原生JDK強一些,但是對跨語言支持一般,所以舍棄了,最終想在protoBuf和Thrift協議里面選擇一套框架,這倆框架很相通,支持跨語言,需要靜態編譯等等。但是protoBuf不帶RPC服務,本着提供多套服務端模式(thrift rpc,netty)的情況下,最終選擇了Thrift協議。
  2. IO線程模型篇 原生socket可以模擬出簡單的RPC框架,但是對於大規模並發,要求吞吐量的系統來說,也就算得上是一個demo級別的,所以BIO肯定是不考慮了,NIO的模型在序列化技術選型的時候已經說了,Thrift本身支持很多個io線程模型,同步,異步,半同步異步等(SimpleServer,TNonblockingServer,THsHaServer,TThreadedSelectorServer,TThreadPoolServer),其中吞吐量最高的肯定是半同步半異步的IO模TThreadedSelectorServer了,具體原因大家可自行google,這次不做多的闡述,選擇好了模型之后,發現thrift簡直就是神器一樣的存在,再一想,對於服務端來說,IO模型怎么能少得了Netty啊,所以下決心也要支持Netty,但是很遺憾Netty目前沒有對Thrift的序列化解析,拆包粘包的處理,但是有protoBuf,和http協議的封裝,怎么辦,自己在netty上寫對thrift的支持唄,雖然工作量大了一些,但是一想netty不就是干這個事兒的嘛- -!
  3. 服務發現 支持集群的RPC框架里面,像dubbo,或者是其他三方框架,對服務發現都進行的封裝,那么自研RPC的話,服務發現就要自己來寫了,那么簡單小巧容易上手的zookeeper肯定是首選了。

5:安裝教程

考拉RPC確保精簡,輕量的原則,只需要zk服務器進行服務發現(后續版本服務治理可能需要Datasource),對於zookeeper的各個環境安裝教程請自行google,不在本安裝教程內特意說明 如果需要cat的數據大盤功能,想更方便的查看服務的調用情況,需要安裝cat服務,至於cat的安裝就更簡單了,就是war包扔在tomcat里面運行,然后配置一些參數即可,當然你也可以不接入cat,單獨的作為RPC框架來使用。 CAT接入參考:https://github.com/dianping/cat

 

二:使用說明

1:前期准以及依賴

maven依賴

1 <dependency>
2         <groupId>koalas.rpc</groupId>
3         <artifactId>com.Koalas.rpc</artifactId>
4         <version>Koalas-1.0-SNAPSHOT</version>
5     </dependency>

 

關於私服的引用問題,記得全局文件不要把全局的依賴都代理掉,因為這么做只能從aliyun的私服上下載項目,由於koalas-rpc中的Cat依賴只在美團點評的私有倉庫中存在,這么做會下載依賴失敗,所以不要暴力的設置下面的代理做法。

<mirror>
        <id>nexus-aliyun</id>
        <mirrorOf>*</mirrorOf>
        <name>Nexus aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>

 

正確的做法是將代理去掉,直接按照作者在pom.xml文件中給定的依賴倉庫地址就可以了。

首先需要編寫自己的thrift idl文件了,這里多說一句,在群里的小伙伴曾經說過idl文件編寫不熟悉,有可能出錯 這里順帶說一嘴,thrift的ldl文件和寫java的請求體和service幾乎沒有任何區別,熟能生巧,上手之后非常簡單 這里推薦幾篇thrift的文章,有興趣可以看一看 https://blog.csdn.net/lk10207160511/article/details/50450541,https://blog.csdn.net/hrn1216/article/details/51306395 下面截圖為測試的thrift文件

更新於2019年06月10日

如果大家實在不樂意手寫idl文件,那么作者給大家提供了一個簡單的插件。鏈接: https://pan.baidu.com/s/1d_Raox39zSdFrMGw--VUsQ 提取碼: y7yu ,下載之后在src/test/java下面寫自己的普通java接口對象,然后一鍵生成thrfit文件和便后之后的文件(前提條件是需要使用者把thrift編譯環境設置到path中,否則不能正常運行),使用方式如下:寫好了自己的接口文件之后直接運行ThriftFileBuilderTest測試類中方法。

 1 @Test
 2     public void testToOutputstream() throws Exception {
 3 
 4         String baseDir = "src/test/java";
 5         Class clazz = ICommonUserService.class;
 6         String outPutFile =baseDir.concat ( "/" ).concat (clazz.getPackage ().getName ().replaceAll ( "\\.","/" )).concat ( "/" );
 7         outPutFile=outPutFile.concat ( clazz.getSimpleName () ).concat ( "/" );
 8         outPutFile=outPutFile.concat ( clazz.getSimpleName ()+".thrift" );
 9 
10         File file = new File (  outPutFile);
11         if (file.getParentFile() != null && !file.getParentFile().exists()) {
12             file.getParentFile().mkdirs();
13             file.createNewFile ();
14         }
15 
16         this.fileBuilder.setSourceDir(baseDir);
17 
18         FileOutputStream fileOutputStream= new FileOutputStream(file);
19         this.fileBuilder.buildToOutputStream(clazz,fileOutputStream);
20 
21         excuteThriftCommand(file.getAbsolutePath ());
22     }

 

只需要修改clazz的接口就可以了,執行過后在當前包下會生成一個thrift文件和編譯過后的class文件,直接使用即可。 輸入圖片說明test0包是作者的測試包名,改成自己實際的包名就可以了。最后說明的是作者還是推薦自己練習寫idl文件,熟練過后就可以不依賴這個工具了。

 

 
 1 namespace java thrift.service
 2 
 3 include 'WmCreateAccountRequest.thrift'
 4 include 'WmCreateAccountRespone.thrift'
 5 
 6 service WmCreateAccountService {
 7       WmCreateAccountRespone.WmCreateAccountRespone getRPC(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
 8       WmCreateAccountRespone.WmCreateAccountRespone koaloasTest1(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
 9       WmCreateAccountRespone.WmCreateAccountRespone koaloasTest2(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
10       WmCreateAccountRespone.WmCreateAccountRespone koaloasTest3(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
11       WmCreateAccountRespone.WmCreateAccountRespone koaloasTest4(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
12       WmCreateAccountRespone.WmCreateAccountRespone koaloasTest5(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
13       WmCreateAccountRespone.WmCreateAccountRespone koaloasTest6(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
14       WmCreateAccountRespone.WmCreateAccountRespone koaloasTest7(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
15       WmCreateAccountRespone.WmCreateAccountRespone koaloasTest8(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
16       WmCreateAccountRespone.WmCreateAccountRespone koaloasTest9(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
17       WmCreateAccountRespone.WmCreateAccountRespone koaloasTest10(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
18       WmCreateAccountRespone.WmCreateAccountRespone koaloasTest11(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
19       WmCreateAccountRespone.WmCreateAccountRespone koaloasTest12(1:WmCreateAccountRequest.WmCreateAccountRequest wmCreateAccountRequest);
20 }

 

 1 namespace java thrift.domain
 2 /**
 3 * 測試類
 4 **/
 5 struct WmCreateAccountRequest {
 6 
 7     1:i32 source,
 8 
 9     2:i32 accountType,
10 
11     3:i64 partnerId,
12 
13     4:i32 partnerType,
14 
15     5:string partnerName,
16 
17     6:i32 poiFlag,
18 }
19 namespace java thrift.domain
20 /**
21 * 測試類
22 **/
23 struct WmCreateAccountRespone {
24     1:i32 code,
25     2:string message,
26 }

 

編譯器需要大家去下載對應的版本 windows和linux下不同的編譯器,下載地址http://archive.apache.org/dist/thrift/0.8.0/ 下載0.8.0版本即可,0.8.0版本是很老的版本了,但是相對穩定,后續會把thirft版本升級。如果上面地址下載不下來或者失效,可以上作者的網盤上下載zip包,上面有win版本和mac,linux版本的0.8.0的thrift編譯器,鏈接: https://pan.baidu.com/s/1JpLqVbmokTOe30nU_TznWw 提取碼: ntye, 編譯上面三個文件 thrift -gen java WmCreateAccountService.thrift, thrift -gen java WmCreateAccountRequest.thrift, thrift -gen java WmCreateAccountRespone.thrift 在當前目錄下會生成3個java文件 這三個文件分別是請求體,返回體,和服務類,就這么簡單 Ok作為開發者而言,所有的准備工作都結束了。下面就開始進入實際開發~

 

2:xml配置方式

1. 客戶端同步調用


首先在你的xml里面配置一下引用

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:koalas="http://www.koalas.com/schema/ch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
                           http://www.koalas.com/schema/ch
                           http://www.koalas.com/schema/ch.xsd">

    <koalas:client id="wmCreateAccountService1"
               serviceInterface="thrift.service.WmCreateAccountService"
               zkPath="127.0.0.1:2181"/>
</beans>

 

首先引用koalas的自定義schema,xmlns:koalas和xsi:schemaLocation, 其中serviceInterface為thrift自動生成的java類,zkPath為zk的服務地址,默認是同步調用,接下來就是在java里面的遠程調用了。

package thrift.service;

import org.apache.thrift.TException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import thrift.domain.WmCreateAccountRequest;
import thrift.domain.WmCreateAccountRespone;

@Service("testService")
public class TestService {

    @Autowired
    WmCreateAccountService.Iface wmCreateAccountService;

    public void getRemoteRpc() throws TException {

        WmCreateAccountRequest request= new WmCreateAccountRequest (  );
        //request.setSource ( 10 );
        request.setAccountType ( 1 );
        request.setPartnerId ( 1 );
        request.setPartnerType ( 1 );
        request.setPartnerName ( "你好" );
        request.setPoiFlag ( 1 );
        WmCreateAccountRespone respone = wmCreateAccountService.getRPC (  request);
        System.out.println (respone);
     }
}

 

就這么簡單一個高性能的RPC框架就誕生了。WmCreateAccountService是thrift自動生成的,作為使用者而言不需要做任何事情,只需要在spring bean中注入xxx.Iface即可。

2. 客戶端異步調用


剛剛我們看了客戶端的同步調用方式,下面我們一起來看看異步的使用方式, 首先在你的xml里面配置一下引用

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:koalas="http://www.koalas.com/schema/ch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
                           http://www.koalas.com/schema/ch
                           http://www.koalas.com/schema/ch.xsd">

    <koalas:client id="wmCreateAccountService2"
           serviceInterface="thrift.service.WmCreateAccountService"
               zkPath="127.0.0.1:2181"
           async="true"/>
</beans>

 

和同步的區別async=true,代表異步使用,接下來就是在java里面的異步遠程調用了

package thrift.service;

import client.async.KoalasAsyncCallBack;
import org.apache.thrift.TException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import thrift.domain.WmCreateAccountRequest;
import thrift.domain.WmCreateAccountRespone;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@Service("testService")
public class TestService2 {
    @Autowired
    WmCreateAccountService.AsyncIface wmCreateAccountService;
    public void getRemoteRpc() throws TException{
        KoalasAsyncCallBack<WmCreateAccountRespone, WmCreateAccountService.AsyncClient.getRPC_call> 
        koalasAsyncCallBack = new KoalasAsyncCallBack<> ();
        WmCreateAccountRequest request= new WmCreateAccountRequest (  );
        request.setAccountType ( 1 );
        request.setPartnerId ( 1 );
        request.setPartnerType ( 1 );
        request.setPartnerName ( "你好啊" );
        request.setPoiFlag ( 1 );
        wmCreateAccountService.getRPC ( request ,koalasAsyncCallBack);
        Future<WmCreateAccountRespone> future= koalasAsyncCallBack.getFuture ();
        try {
            //to get other things
            System.out.println (future.get ());
        } catch (InterruptedException e) {
            e.printStackTrace ();
        } catch (ExecutionException e) {
            e.printStackTrace ();
        }
    }

}

 

這次調用getRpc方法不會阻塞等待server同步結果了。而是可以去干一些自己的其他事情,然后在調用future.get ()來獲得返回resopne,當然future.get ()支持最大等待時間的,超時之后會拋出TimeOutException,當然這僅僅是client超時而已不會影響server的執行結果。

3. 服務端實現

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:koalas="http://www.koalas.com/schema/ch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-4.2.xsd
       http://www.koalas.com/schema/ch
       http://www.koalas.com/schema/ch.xsd">

    <koalas:server id="WmCreateAccountService"
                   serviceInterface="thrift.service.WmCreateAccountService"
                   serviceImpl="wmCreateAccountServiceImpl"
                   port="8001"
                   zkpath="127.0.0.1:2181"/>
</beans>

 

服務端只需要指定暴露的端口,zk服務地址和服務端實現即可。
@Service
public class WmCreateAccountServiceImpl implements WmCreateAccountService.Iface {
    @Override
    public WmCreateAccountRespone getRPC(WmCreateAccountRequest wmCreateAccountRequest) throws TException {
        WmCreateAccountRespone wmCreateAccountRespone = new WmCreateAccountRespone ();
        wmCreateAccountRespone.setCode ( 1 );
        wmCreateAccountRespone.setMessage ( "你好" );
        if(new Random (  ).nextInt ( 5 )>100){
            throw new RuntimeException ( "測試錯誤" );
        }
        System.out.println ( "getRPC  start ...." + wmCreateAccountRequest + "------" + atomicInteger.incrementAndGet () );

        return wmCreateAccountRespone;
    }
}

 

只需要實現xxxx.Iface即可

3:注解配置方式

有的小伙伴會覺得配置xml有點麻煩,koalas-rpc也提供了純注解的使用方式

1. 客戶端調用

xml中的配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:koalas="http://www.koalas.com/schema/ch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
                           http://www.koalas.com/schema/ch
                           http://www.koalas.com/schema/ch.xsd">

    <koalas:annotation package="thrift.annotation.client.impl"/>
</beans>

 

一個掃描標簽就行了,如果你在spring bean里想通過調用rpc遠程服務,那么掃描一下就行了

java中使用

@Service("testServiceSync")
public class TestServiceSync {

    @KoalasClient(zkPath = "127.0.0.1:2181",readTimeout = 5000*1000)
    WmCreateAccountService.Iface wmCreateAccountService;

    public void getRemoteRpc() throws TException {
        WmCreateAccountRequest request= new WmCreateAccountRequest (  );
        //request.setSource ( 10 );
        request.setAccountType ( 1 );
        request.setPartnerId ( 1 );
        request.setPartnerType ( 1 );
        request.setPartnerName ( "你好啊-我是注解實現的" );
        request.setPoiFlag ( 1 );
        WmCreateAccountRespone respone = wmCreateAccountService.getRPC (  request);
        System.out.println (respone);
     }

}

 

只需要在你想遠程調用的類上加一個@KoalasClient注解就可以了,遠程調用就這么簡單,當然異步使用方式也類似

@Service("testServiceAsync")
public class TestServiceAsync {
    @KoalasClient(zkPath = "127.0.0.1:2181",readTimeout = 5000*1000)
    WmCreateAccountService.AsyncIface wmCreateAccountService;
    public void getRemoteRpc() throws TException{
        KoalasAsyncCallBack<WmCreateAccountRespone, WmCreateAccountService.AsyncClient.getRPC_call> koalasAsyncCallBack = new KoalasAsyncCallBack<> ();
        WmCreateAccountRequest request= new WmCreateAccountRequest (  );
        //request.setSource ( 10 );
        request.setAccountType ( 1 );
        request.setPartnerId ( 1 );
        request.setPartnerType ( 1 );
        request.setPartnerName ( "你好啊-我是注解實現的" );
        request.setPoiFlag ( 1 );
        wmCreateAccountService.getRPC ( request ,koalasAsyncCallBack);
        Future<WmCreateAccountRespone> future= koalasAsyncCallBack.getFuture ();
        try {
            System.out.println (future.get ());
        } catch (InterruptedException e) {
            e.printStackTrace ();
        } catch (ExecutionException e) {
            e.printStackTrace ();
        }
    }

}

 

注意和同步調用不同的是自定義注解注入的接口是xxxx.AsyncIface,同步是xxxx.Iface。KoalasAsyncCallBack回調使用方式和上面的xml一樣。有一點需要說明

<koalas:annotation package="thrift.annotation.client.impl"/>

 

如果package屬性設置為空,那么所有的@KoalasClient都會生效,也就是說所有在spring bean中的自定義注解@KoalasClient都會自動注入。這里說另外一種用法

private WmCreateAccountService.Iface wmCreateAccountService;

@KoalasClient(zkPath = "127.0.0.1:2181",readTimeout = 5000*1000)
public void setWmCreateAccountService(WmCreateAccountService.Iface wmCreateAccountService){
    this.wmCreateAccountService = wmCreateAccountService;
}

 

直接注入方法的方式也是可以的。

2. 服務端實現

xml中的配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:koalas="http://www.koalas.com/schema/ch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
       http://www.koalas.com/schema/ch
       http://www.koalas.com/schema/ch.xsd">
    <koalas:annotation package="thrift.annotation.server.impl"/>
</beans>

 

配置和client中一樣只需要配置一個自定義標簽即可,java中的使用方式如下:

package thrift.annotation.server.impl;

import annotation.KoalasServer;
import org.apache.thrift.TException;
import thrift.domain.WmCreateAccountRequest;
import thrift.domain.WmCreateAccountRespone;
import thrift.service.WmCreateAccountService;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

@KoalasServer ( port = 8801,zkpath="127.0.0.1:2181")
public class WmCreateAccountServiceNettyImpl implements WmCreateAccountService.Iface {
    private AtomicInteger atomicInteger = new AtomicInteger ( 0 );
    @Override
    public WmCreateAccountRespone getRPC(WmCreateAccountRequest wmCreateAccountRequest) throws TException {
        WmCreateAccountRespone wmCreateAccountRespone = new WmCreateAccountRespone ();
        wmCreateAccountRespone.setCode ( 1 );
        wmCreateAccountRespone.setMessage ( "你好啊" );
        if(new Random (  ).nextInt ( 5 )>100){
            try {
                Thread.sleep ( 5000 );
            } catch (InterruptedException e) {
                e.printStackTrace ();
            }
        }
        System.out.println ( "getRPC  start ...." + wmCreateAccountRequest + "------" + atomicInteger.incrementAndGet () );

        return wmCreateAccountRespone;
    }
}

 

這樣服務實現就會主從注冊到zookeeper中提供給client端使用了。值得說明的是被掃描到並且類上有@KoalasServer的類會被加載到spring上下文中,可以當成一個普通的spring bean來處理,還有一點如果你不指定package,配置成如下情況

 <koalas:annotation package=""/>

 

這樣配置會以spring的bean為基礎實現,那么使用方式需要改成

package thrift.annotation.server.impl;

import annotation.KoalasServer;
import org.apache.thrift.TException;
import thrift.domain.WmCreateAccountRequest;
import thrift.domain.WmCreateAccountRespone;
import thrift.service.WmCreateAccountService;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;


@KoalasServer ( port = 8801,zkpath="127.0.0.1:2181")
@Service
public class WmCreateAccountServiceNettyImpl implements WmCreateAccountService.Iface {
    private AtomicInteger atomicInteger = new AtomicInteger ( 0 );
    @Override
    public WmCreateAccountRespone getRPC(WmCreateAccountRequest wmCreateAccountRequest) throws TException {
        WmCreateAccountRespone wmCreateAccountRespone = new WmCreateAccountRespone ();
        wmCreateAccountRespone.setCode ( 1 );
        wmCreateAccountRespone.setMessage ( "你好啊" );
        if(new Random (  ).nextInt ( 5 )>100){
            try {
                Thread.sleep ( 5000 );
            } catch (InterruptedException e) {
                e.printStackTrace ();
            }
        }
        System.out.println ( "getRPC  start ...." + wmCreateAccountRequest + "------" + atomicInteger.incrementAndGet () );

        return wmCreateAccountRespone;
    }
}

 

就這么簡單即可。

3. 泛化調用

為什么需要泛化調用? 1:有一個通用壓測平台,想去壓測不同的server。那么現在就有一個問題了,不可能讓壓測平台服務端去依賴所有的下游服務,這樣依賴會很繁雜,這時候如果說只配置serviceName,request模型和request請求json就可以進行遠程調用,那么將大大的減少頭疼的依賴。 2:假設php同事對java代碼不熟悉,不可能讓他們去依賴spring,一共一套簡單的api來使用是很有必要的。 3:上游服務不想依賴下游服務的數據模型。

對於泛化調用來說,dubbo已經提供,soft-rpc也有提供。當然koalas-rpc也不會例外,並且支持xml,注解和java api的使用方式。下面幾個例子來說明一下使用方式。更多demo去源碼中查看,作者已經寫好,開箱即用。

xml使用方式

<koalas:client id="wmCreateAccountService3"
                   serviceInterface="thrift.service.WmCreateAccountService"
                   zkPath="127.0.0.1:2181"
                   generic="true"
                   readTimeout="50000000"/>

 

@Autowired
@Qualifier("wmCreateAccountService3")
GenericService.Iface wmGenericService;

public void getGenericRpc() throws TException {
        GenericRequest request = new GenericRequest (  );
        request.setMethodName ( "getRPC" );

        request.setClassType ( new ArrayList<String> (  ){{
            add ( "thrift.domain.WmCreateAccountRequest");
        }} );

        request.setRequestObj ( new ArrayList<String> (  ){{
            add ( "{\"accountType\":1,\"partnerId\":1,\"partnerName\":\"你好\",\"partnerType\":1,\"poiFlag\":1,\"source\":0}");
        }} );

        String str = wmGenericService.invoke ( request );
        System.out.println (str);
    }

 

簡單說明一下,GenericService.Iface是通用服務,有三個參數,第一個是方法名稱,第二個是請求體類型集合,第三個是請求體內容。直接調用即可,返回值是server端的json類型,使用json工具為阿里巴巴的Fast-json

注解使用方式

@KoalasClient(zkPath = "127.0.0.1:2181",readTimeout = 5000*1000,genericService = "thrift.service.WmCreateAccountService")
 GenericService.Iface genericService;
 
 public void getGenericRemoteRpc() throws TException {
        GenericRequest request = new GenericRequest (  );
        request.setMethodName ( "getRPC" );

        request.setClassType ( new ArrayList<String> (  ){{
            add ( "thrift.domain.WmCreateAccountRequest");
        }} );

        request.setRequestObj ( new ArrayList<String> (  ){{
            add ( "{\"accountType\":1,\"partnerId\":1,\"partnerName\":\"你好\",\"partnerType\":1,\"poiFlag\":1,\"setAccountType\":true,\"setPartnerId\":true,\"setPartnerName\":true,\"setPartnerType\":true,\"setPoiFlag\":true,\"setSource\":false,\"source\":0}");
        }} );

        String str = genericService.invoke ( request );
        System.out.println (str);
    }
    

 

唯一區別的是注解要指定genericService,當genericService不為空時,默認開啟泛化調用 當然,java api方式也是支持的。

 KoalasClientProxy koalasClientProxy = new KoalasClientProxy();
        koalasClientProxy.setServiceInterface ( "thrift.service.WmCreateAccountService" );
        koalasClientProxy.setZkPath ("127.0.0.1:2181"  );
        koalasClientProxy.setGeneric ( true );
        koalasClientProxy.setReadTimeout ( 50000000 );
        koalasClientProxy.afterPropertiesSet ();
        GenericService.Iface genericService = (GenericService.Iface) koalasClientProxy.getObject ();
        GenericRequest request = new GenericRequest (  );
        request.setMethodName ( "getRPC" );

        request.setClassType ( new ArrayList<String> (  ){{
            add ( "thrift.domain.WmCreateAccountRequest");
        }} );

        request.setRequestObj ( new ArrayList<String> (  ){{
            add ( "{\"accountType\":1,\"partnerId\":1,\"partnerName\":\"你好\",\"partnerType\":1,\"poiFlag\":1,\"setAccountType\":true,\"setPartnerId\":true,\"setPartnerName\":true,\"setPartnerType\":true,\"setPoiFlag\":true,\"setSource\":false,\"source\":0}");
        }} );

        String str = genericService.invoke ( request );
        System.out.println (str);
        koalasClientProxy.destroy ();

 

特別注意的是KoalasClientProxy對象非常非常重,一定要在服務關閉的時候執行koalasClientProxy.destroy ();方法,並且需要帶應用程序中緩存該對象,千萬不要每次使用都要創建,這樣會極大的浪費資源,每個服務對應一個KoalasClientProxy,同步和異步也是不同的對象,這些使用者需要注意。

4. 原生調用支持

koalas-rpc在原生基礎上封裝了自定義協議和特定的傳輸類型,看過源碼的朋友一定覺得處理非常非常麻煩,但是在自定義協議的過程中koalas-rpc也同時支持原生的thrift請求,可以在本地做測試等等。請求調用demo:

package xml.client;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import thrift.domain.WmCreateAccountRequest;
import thrift.domain.WmCreateAccountRespone;
import thrift.service.WmCreateAccountService;

public class ThriftNative {
    public static final String SERVER_IP = "localhost";
    public static final int SERVER_PORT = 8001;
    public static final int TIMEOUT = 3000000;

    public static void main(String[] args) throws TException {
        TTransport transport = new TFramedTransport (new TSocket (SERVER_IP, SERVER_PORT, TIMEOUT));
        TProtocol protocol = new TBinaryProtocol (transport);
        WmCreateAccountService.Client client = new WmCreateAccountService.Client(protocol);
        transport.open();

        WmCreateAccountRequest request= new WmCreateAccountRequest (  );
        //request.setSource ( 10 );
        request.setAccountType ( 1 );
        request.setPartnerId ( 1 );
        request.setPartnerType ( 1 );
        request.setPartnerName ( "你好啊-我是ThriftNative實現的服務端getRemoteRpc" );
        request.setPoiFlag ( 1 );

        WmCreateAccountRespone respone=client.getRPC (request  );
        System.out.println (respone);

    }
}

 

三:參數配置文檔

1:客戶端

參數名 說明 是否必須
serviceInterface thrift生成的接口類 Y
zkPath zk的服務地址,集群中間逗號分隔 Y
serverIpPorts 不實用zk發現直接連接服務器server,格式ip:端口#權重。多個逗號分隔 N
async 是否異步 N,默認false同步
generic 是否泛化調用(xml配置中使用) N,默認false
genericService 泛化調用的serviceName(注解配置中使用)使用方法參照代碼中demo N,默認false
cat 是否開啟CAT數據大盤,需要配置CAT服務,即可查看詳細調用情況) N,默認false
connTimeout 連接超時 N,默認3000ms
readTimeout 讀取超時 N,默認5000ms,按照服務端指定時間適當調整
localMockServiceImpl 本地測試的實現 N
retryRequest 是否錯誤重試 N,默認true
retryTimes 重試次數 N,默認3次
maxTotal TCP長連接池,參照Apache Pool參數 100
maxIdle TCP長連接池,參照Apache Pool參數 50
minIdle TCP長連接池,參照Apache Pool參數 10
lifo TCP長連接池,參照Apache Pool參數 true
fairness TCP長連接池,參照Apache Pool參數 false
maxWaitMillis TCP長連接池,參照Apache Pool參數 30 * 1000
timeBetweenEvictionRunsMillis TCP長連接池,參照Apache Pool參數 3 * 60 * 1000
minEvictableIdleTimeMillis TCP長連接池,參照Apache Pool參數 5 * 60 * 1000
softMinEvictableIdleTimeMillis TCP長連接池,參照Apache Pool參數 10 * 60 * 1000
numTestsPerEvictionRun TCP長連接池,參照Apache Pool參數 20
testOnCreate TCP長連接池,參照Apache Pool參數 false
testOnBorrow TCP長連接池,參照Apache Pool參數 false
testOnReturn TCP長連接池,參照Apache Pool參數 false
testWhileIdle TCP長連接池,參照Apache Pool參數 true
iLoadBalancer 負載略側,默認隨機 N
env 環境 N,默認dev
removeAbandonedOnBorrow TCP長連接池,參照Apache Pool參數 true
removeAbandonedOnMaintenance TCP長連接池,參照Apache Pool參數 true
removeAbandonedTimeout TCP長連接池,參照Apache Pool參數 30000ms
maxLength_ 允許發送最大字節數 N,10 * 1024 * 1024
cores selecter核心數量 N,默認當前cpu數量
asyncSelectorThreadCount 異步請求時線程數量 N,默認當前CPU核心數量*2
privateKey 私鑰 N
publicKey 公鑰 N

2:服務端

參數 說明 是否必須
serviceImpl 服務端實現 Y
serviceInterface thrift自動生成的類 Y
port 暴露的服務端口 Y
zkpath 服務端的zk路徑 Y
cat (是否開啟CAT數據大盤,需要配置CAT服務,即可查看詳細調用情況) N,默認false
bossThreadCount 處理連接線程 N,當前CPU核心數
workThreadCount 讀取線程 N,當前CPU核心數*2
koalasThreadCount 業務線程數 256
maxLength 最大接收字節數 Integer.MAX_VALUE
env 環境 N,dev
weight 權重 N,10
serverType 采用哪些服務端,可以選NETTY和THRIFT,默認NETTY N
workQueue 當server超載時,可以容納等待任務的隊列長度 0
privateKey 私鑰 N
publicKey 公鑰 N
3:客戶端服務端RSA雙向加密

源碼中utils.KoalasRsaUtil的main方法已經為大家寫好生成私鑰和公鑰的代碼,執行即可 ,下面為核心源碼展示

public static String sign(byte[] data, String privateKey) throws Exception {
        byte[] keyBytes = Base64.decodeBase64 ( privateKey.getBytes ( "UTF-8" ) );
        PKCS8EncodedKeySpec pkcs8KeySpec = new PKCS8EncodedKeySpec ( keyBytes );
        KeyFactory keyFactory = KeyFactory.getInstance ( KEY_ALGORITHM );
        PrivateKey privateK = keyFactory.generatePrivate ( pkcs8KeySpec );
        Signature signature = Signature.getInstance ( SIGNATURE_ALGORITHM );
        signature.initSign ( privateK );
        signature.update ( data );
        return new String ( Base64.encodeBase64 ( signature.sign () ), "UTF-8" );
    }
public static boolean verify(byte[] data, String publicKey, String sign)
            throws Exception {
        byte[] keyBytes = Base64.decodeBase64 ( publicKey.getBytes ("UTF-8") );
        X509EncodedKeySpec keySpec = new X509EncodedKeySpec ( keyBytes );
        KeyFactory keyFactory = KeyFactory.getInstance ( KEY_ALGORITHM );
        PublicKey publicK = keyFactory.generatePublic ( keySpec );
        Signature signature = Signature.getInstance ( SIGNATURE_ALGORITHM );
        signature.initVerify ( publicK );
        signature.update ( data );
        return signature.verify ( Base64.decodeBase64 ( sign.getBytes ("UTF-8") ) );

 

 }

執行main方法之后,會得到4個長長的字符串

下面四個字符串為koalas-rpc中客戶端和服務端使用的rsa非對稱秘鑰,復制使用即可
MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAIPQIc8/+wl5hTDT8fT4rCEA//pwSqdX8djur+UDwR/qg5iW3xBHUuxTGXRko/3SXYKJLugRmT2gV4ZggSHLpToSFYJZwATIbVD2p3oqZx4ZC5g3mZdTCScHbTb4CITFPacJCKads75Plrk8ryW7wP9dWlSmrF8f3CzReKUTjf5dAgMBAAECgYBRigXwK9cCNG8lFmc9sDriq7it1psHzApqtLSQifME6FCBqwrQCh8M3BcJ/lvH30NDRdODcaeHDNI36SjYnB5X25mMG95OEgLqPm7T8oB3DBY/BhJbAY43FbZSU3Lb+El5zknpTtH0M8DTlul1EmLbe+TJVL/x/SkpDx/HSS3GAQJBALtSSBeskQ4P+Pn5M4F2+GZJmFDxaOQHIuy/RdfckxV1aEMN425ieSrinSCXyBC8uTN0zF1NlJsfWLAUhtfSQ90CQQC0I+mEXsxWtTDT+fd3bDgiJtfOwPpyNT4HSObdq+aAqO44NL7fqD2plNZ3vBULfDbdbnTlvKJJnPUdt457WjyBAkAiM63SFMIPbT8qdSPAWbaVBo73CHz8VYk87NeVyEJawqscwyZpezVgbSv/TXdMBwlRqdu+lXGyuRB6ZeUQ9uVJAkAscjfpqyIruqUDiEdgtdjbxE22+7JPf4eAcKJVy1YiJIwyXgFCWdZtAwYvoL5oiQtYcypwjKxWEV4BKQsEsG0BAkBmlDi0wSPA2x7YjudQNWv+H51CsYDWMjOQ7AzUYABfkWVnbeYS/3uf7W56AHl3Rmdo7zUTBJFCyM/Rt28yZVLj
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDAAxbccTLuu12V2Le1mI5b+0kZMiQwN/WTSv8d2y0J/wVl+yMWgjZi4c8/kAs8pACEiFQ8hUUovmoAwceKEd5h3ISSV5lEPyBt+68DzinOrSGv7bZhGm5bwkRG7MMpSgAVSJj2lWTkf63fp2e/FwHs3WM64sSlbdlUN/57YtUC6QIDAQAB
MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBAMADFtxxMu67XZXYt7WYjlv7SRkyJDA39ZNK/x3bLQn/BWX7IxaCNmLhzz+QCzykAISIVDyFRSi+agDBx4oR3mHchJJXmUQ/IG37rwPOKc6tIa/ttmEablvCREbswylKABVImPaVZOR/rd+nZ78XAezdYzrixKVt2VQ3/nti1QLpAgMBAAECgYEApwwI/4+b+AYZzRvV967Zazyaw8jTov+MLrC4cokUDfZIBAkQ5awzFKPPYkU3AXLM4ICaiGyJVoESR8ZOitgw1wB6tbI2DhP4FD5dqJkIOdUNujo+gAda3kfeCjAgWbtUL3Zhj7Ff+xFvSDDxUYKGG4fZwge3CFwyQ2vjxhPTXGECQQDpAkS6AW17LvWAiiu2924MEicJQW/s3w+chjuQ3VaauzotAHoSMi8VjBSlINbKxpklthKB4vubfA6AtTHae3hPAkEA0vVBKk9Qz8TkraN3QcILJwHjcjqP8+51n1jimSpZeZQL4BJxStdqqMP2nUzAVnh4ncEoFZ/3QA0sSwcdPtDLRwJBAIDpMmC+HXYDWuvMhbbqWUXwXQxv2Z5xIk/0q8vPyPQ+FUeEdgTPIuGG6H0bF/qDuYL1onOdwpoZHmTy2iwIF10CQBiVNdvNVFhx1EgbtWj3SL9p6+xCwMWnMxO3kuhQVA7j3qJk48jZ43b5JwLbj8pDzaJsgNRMSM6w+klf8duBDz8CQBMIMmhU84An2nv/CPNPArCC8BN8YhY1AH685zgRQBLv5untRhfZ+hJtqjSzTJlY7JHybMzc6wt2FZXrhvuopO4=
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCD0CHPP/sJeYUw0/H0+KwhAP/6cEqnV/HY7q/lA8Ef6oOYlt8QR1LsUxl0ZKP90l2CiS7oEZk9oFeGYIEhy6U6EhWCWcAEyG1Q9qd6KmceGQuYN5mXUwknB202+AiExT2nCQimnbO+T5a5PK8lu8D/XVpUpqxfH9ws0XilE43+XQIDAQAB
上面四個字符串為koalas-rpc中客戶端和服務端使用的rsa非對稱秘鑰,復制使用即可

 

得到上面的四個長長的字符串,可以由server端給client端提供。其中字符串1,字符串2分別對應client的privateKey,和publicKey,字符串3和字符串4分別對應server端的privateKey,和publicKey,提供rsa雙向加密的初衷是為了將非常重要的項目保護起來,不允許其他項目隨意調用,但是RSA雙向加密會對性能有所影響。當RSA驗證失敗的時候,client會拋RsaException。RSA對稱加密適合給三方系統進行調用,對稱加密會影響傳輸性能。

實際性能壓測

8C 16G mac開發本,單機10000次請求耗時截圖 輸入圖片說明

10w次請求,大約耗時12s,平均qps在8000左右,在集群環境下會有不錯的性能表現

數據大盤展示

開啟數據大盤,需要設置客戶端或者服務端的cat參數為true,默認為false。 koalas2.0已經接入了cat服務,cat服務支持qps統計,可用率,tp90line,tp99line,豐富自定義監控報警等,接入效果圖 輸入圖片說明豐富的可視參數,流量統計,日,周,月報表展示等。

鏈路跟蹤

對RPC服務來說,系統間的調用和排查異常接口,確定耗時代碼是非常重要的,只要接入了cat,koalsa-rpc天然的支持鏈路跟蹤,一切盡在眼前! 輸入圖片說明

代碼下載后如何測試

作者在src/test/java和resource下面有已經寫好了的豐富的xml配置和注解配置,下載后直接運行測試即可,注意測試的時候需要安裝zookeeper服務,如果不想通過zk做服務發現,那么客戶端可以進行直連,指定的server列表,逗號分隔,#分隔權重,格式,192.168.3.253:6666#10,192.168.3.253:6667#10 詳情見參數配置列表,但是這種辦法作者是不推薦的,在生產環境下沒有心跳和動態上下線功能。

CAT服務按需配置,不需要數據大盤不需要配置,不會影響RPC功能,CAT接入參考:https://github.com/dianping/cat

開源協議 :

Apache License Version 2.0 see http://www.apache.org/licenses/LICENSE-2.0.html

聯系作者 :

高級java QQ群:825199617 博客地址:https://www.cnblogs.com/zyl2016/ 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM