一般我們常見的RPC框架都包含如下三個部分:

- 注冊中心,用於服務端注冊遠程服務以及客戶端發現服務
- 服務端,對外提供后台服務,將自己的服務信息注冊到注冊中心
- 客戶端,從注冊中心獲取遠程服務的注冊信息,然后進行遠程過程調用
上面提到的注冊中心其實屬於服務治理,即使沒有注冊中心,RPC的功能也是完整的。之前我大多接觸的是基於zookeeper的注冊中心,這里基於consul來實現注冊中心的基本功能。
Consul的一些特點:
- Raft相比Paxos直接
此外不多描述,還沒研究raft
- 支持數據中心,可以用來解決單點故障之類的問題
- 集成相比zookeeper更加簡單(代碼量少,邏輯清晰簡單)
- 支持健康檢查,支持http以及tcp
- 自帶UI管理功能,不需要額外第三方支持。(zookeeper需要單獨部署zkui之類的第三方工具)
- 支持key/value存儲
啟動consul之后訪問管理頁面
RPC集成
提取出服務注冊與服務發現兩個接口,然后使用Consul實現,這里主要通過consul-client來實現(也可以是consul-api),需要在pom中引入:
<dependency>
<groupId>com.orbitz.consul</groupId>
<artifactId>consul-client</artifactId>
<version>0.14.1</version>
</dependency>
服務注冊
- RegistryService
提供服務的注冊與刪除功能
public interface RegistryService {
void register(RpcURL url);
void unregister(RpcURL url);
}
- AbstractConsulService
consul的基類,用於構建Consl對象,服務於服務端以及客戶端。
public class AbstractConsulService {
private static final Logger logger = LoggerFactory.getLogger(AbstractConsulService.class);
protected final static String CONSUL_NAME="consul_node_jim";
protected final static String CONSUL_ID="consul_node_id";
protected final static String CONSUL_TAGS="v3";
protected final static String CONSUL_HEALTH_INTERVAL="1s";
protected Consul buildConsul(String registryHost, int registryPort){
return Consul.builder().withHostAndPort(HostAndPort.fromString(registryHost+":"+registryPort)).build();
}
}
- ConsulRegistryService
服務注冊實現類,在注冊服務的同時,指定了健康檢查。
服務的刪除暫時未實現
public class ConsulRegistryService extends AbstractConsulService implements RegistryService {
private final static int CONSUL_CONNECT_PERIOD=1*1000;
@Override
public void register(RpcURL url) {
Consul consul = this.buildConsul(url.getRegistryHost(),url.getRegistryPort());
AgentClient agent = consul.agentClient();
ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();
ImmutableRegistration.Builder builder = ImmutableRegistration.builder();
builder.id(CONSUL_ID).name(CONSUL_NAME).addTags(CONSUL_TAGS).address(url.getHost()).port(url.getPort()).addChecks(check);
agent.register(builder.build());
}
@Override
public void unregister(RpcURL url) {
}
}
由於我實現的RPC是基於TCP的,所以服務注冊的健康檢查也指定為TCP,consul會按指定的IP以及端口建立連接以此判斷服務的健康狀態。如果是http,則需要調用http方法,同時指定健康檢查地址。
ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();
后台的監控信息如下:

雖然只是指定了TCP,可能出於某種機制后台依然會發起HTTP的健康檢查請求,上圖第一條請求日志。
服務發現
- DiscoveryService
獲取所有注冊的有效的服務信息。
public interface DiscoveryService {
List<RpcURL> getUrls(String registryHost, int registryPort);
}
- ConsulDiscoveryService
首先是獲取有效的服務列表:
List<RpcURL> urls= Lists.newArrayList();
Consul consul = this.buildConsul(registryHost,registryPort);
HealthClient client = consul.healthClient();
String name = CONSUL_NAME;
ConsulResponse object= client.getAllServiceInstances(name);
List<ImmutableServiceHealth> serviceHealths=(List<ImmutableServiceHealth>)object.getResponse();
for(ImmutableServiceHealth serviceHealth:serviceHealths){
RpcURL url=new RpcURL();
url.setHost(serviceHealth.getService().getAddress());
url.setPort(serviceHealth.getService().getPort());
urls.add(url);
}
服務更新監聽,當可用服務列表發現變化時需要通知調用端。
try {
ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache(client, name);
serviceHealthCache.addListener(new ConsulCache.Listener<ServiceHealthKey, ServiceHealth>() {
@Override
public void notify(Map<ServiceHealthKey, ServiceHealth> map) {
logger.info("serviceHealthCache.addListener notify");
RpcClientInvokerCache.clear();
}
});
serviceHealthCache.start();
} catch (Exception e) {
logger.info("serviceHealthCache.start error:",e);
}
由於之前對客戶端的Invoker有緩存,所以當服務列表有變化時需要對緩存信息進行更新。
這里簡單的直接對緩存做清除處理,其實好一點的方法應該只對有變化的做處理。
- RpcClientInvokerCache
對客戶端實例化后的Invoker的緩存類
public class RpcClientInvokerCache {
private static CopyOnWriteArrayList<RpcClientInvoker> connectedHandlers = new CopyOnWriteArrayList<>();
public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlersClone(){
return (CopyOnWriteArrayList<RpcClientInvoker>) RpcClientInvokerCache.getConnectedHandlers().clone();
}
public static void addHandler(RpcClientInvoker handler) {
CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
newHandlers.add(handler);
connectedHandlers=newHandlers;
}
public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlers(){
return connectedHandlers;
}
public static RpcClientInvoker get(int i){
return connectedHandlers.get(i);
}
public static int size(){
return connectedHandlers.size();
}
public static void clear(){
CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
newHandlers.clear();
connectedHandlers=newHandlers;
}
}
- 負載均衡
當同一個接口有多個服務同時提供服務時,客戶端需要有一定的負載均衡機制去決策將客戶端的請求分配給哪一台服務器,這里實現一個簡易的輪詢實現方式。請求次數累加,累加的值與服務列表的大小做取模操作。
代碼中取服務列表的方法有小問題,未按接口信息取,后續再完成
public class RoundRobinLoadbalanceService implements LoadbalanceService {
private AtomicInteger roundRobin = new AtomicInteger(0);
private static final int MAX_VALUE=1000;
private static final int MIN_VALUE=1;
private AtomicInteger getRoundRobinValue(){
if(this.roundRobin.getAndAdd(1)>MAX_VALUE){
this.roundRobin.set(MIN_VALUE);
}
return this.roundRobin;
}
@Override
public int index(int size) {
return (this.getRoundRobinValue().get() + size) % size;
}
}
待完善的功能
- 代碼中取服務列表的方法有小問題,未按接口信息取
- 注冊中心的可用服務地址信息變化時,需要優化為按需更新
- 注冊中心的服務刪除未實現


