一、問題描述
在上一篇《由淺入深了解Thrift之服務模型和序列化機制》文章中,我們已經了解了thrift的基本架構和網絡服務模型的優缺點。如今的互聯網圈中,RPC服務化的思想如火如荼。我們又該如何將thrift服務化應用到我們的項目中呢?實現thrift服務化前,我們先想想這幾個問題:服務注冊、服務發現、服務健康檢測、服務“Load Balance”、隱藏client和server端的交互細節、服務調用端的對象池化。
- 服務的注冊、發現和健康檢測,我們使用zookeeper可以很好的解決
- 服務“Load Balance”,我們可以使用簡單的算法“權重+隨機”,當然也可以使用成熟複雜的算法
- 服務調用端的對象池化,我們可以使用Apache Commons Pool(commons-pool),使用簡單又可以滿足我們的需求
二、實現思路
1、thrift server端啟動時,每個實例向zk集群以臨時節點方式注冊(這樣,遍歷zk上/server下有多少個臨時節點就知道有哪些server實例)
thrift server端可以單機多端口多實例或多機部署多實例方式運行。
2、服務調用方實現一個連接池,連接池初始化時,通過zk將在線的server實例信息同步到本地並緩存,同時監聽zk下的節點變化。
3、服務調用方與Server通訊時,從連接池中取一個可用的連接,用它實現RPC調用。
三、具體實現
1、thrift server端
thrift server端,向zk中注冊server address

package com.wy.thriftpool.commzkpool; import java.lang.instrument.IllegalClassFormatException; import java.lang.reflect.Constructor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TBinaryProtocol.Factory; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.springframework.beans.factory.InitializingBean; import com.wy.thrift.service.UserService.Processor; import com.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter; import com.wy.thriftpool.commzkpool.support.ThriftServerIpTransfer; import com.wy.thriftpool.commzkpool.support.impl.LocalNetworkIpTransfer; /** * thrift server端,向zk中注冊server address * * @author wy * */ public class ThriftServiceServerFactory implements InitializingBean { // thrift server 服務端口 private Integer port; // default 權重 private Integer priority = 1; // service實現類 private Object service; // thrift server 注冊路徑 private String configPath; private ThriftServerIpTransfer ipTransfer; // thrift server注冊類 private ThriftServerAddressReporter addressReporter; // thrift server開啟服務 private ServerThread serverThread; @Override public void afterPropertiesSet() throws Exception { if (ipTransfer == null) { ipTransfer = new LocalNetworkIpTransfer(); } String ip = ipTransfer.getIp(); if (ip == null) { throw new NullPointerException("cant find server ip..."); } String hostname = ip + ":" + port + ":" + priority; Class<? 
extends Object> serviceClass = service.getClass(); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Class<?>[] interfaces = serviceClass.getInterfaces(); if (interfaces.length == 0) { throw new IllegalClassFormatException("service-class should implements Iface"); } // reflect,load "Processor"; Processor<?> processor = null; for (Class<?> clazz : interfaces) { String cname = clazz.getSimpleName(); if (!cname.equals("Iface")) { continue; } String pname = clazz.getEnclosingClass().getName() + "$Processor"; try { Class<?> pclass = classLoader.loadClass(pname); if (!pclass.isAssignableFrom(Processor.class)) { continue; } Constructor<?> constructor = pclass.getConstructor(clazz); processor = (Processor<?>) constructor.newInstance(service); break; } catch (Exception e) { // TODO } } if (processor == null) { throw new IllegalClassFormatException("service-class should implements Iface"); } // 需要單獨的線程,因為serve方法是阻塞的. serverThread = new ServerThread(processor, port); serverThread.start(); // report if (addressReporter != null) { addressReporter.report(configPath, hostname); } } class ServerThread extends Thread { private TServer server; ServerThread(Processor<?> processor, int port) throws Exception { // 設置傳輸通道 TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port); // 設置二進制協議 Factory protocolFactory = new TBinaryProtocol.Factory(); TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport); tArgs.processor(processor); tArgs.transportFactory(new TFramedTransport.Factory()); tArgs.protocolFactory(protocolFactory); int num = Runtime.getRuntime().availableProcessors() * 2 + 1; tArgs.selectorThreads(num); tArgs.workerThreads(num * 10); // 網絡服務模型 server = new TThreadedSelectorServer(tArgs); } @Override public void run() { try { server.serve(); } catch (Exception e) { //TODO } } public void stopServer() { server.stop(); } } public void close() { serverThread.stopServer(); } public void setService(Object 
service) { this.service = service; } public void setPriority(Integer priority) { this.priority = priority; } public void setPort(Integer port) { this.port = port; } public void setIpTransfer(ThriftServerIpTransfer ipTransfer) { this.ipTransfer = ipTransfer; } public void setAddressReporter(ThriftServerAddressReporter addressReporter) { this.addressReporter = addressReporter; } public void setConfigPath(String configPath) { this.configPath = configPath; } }
thrift server address注冊到zk

package com.wy.thriftpool.commzkpool.support.impl; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.zookeeper.CreateMode; import com.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter; /** * thrift server address注冊到zk * * @author wy * */ public class DynamicAddressReporter implements ThriftServerAddressReporter { private CuratorFramework zookeeper; public DynamicAddressReporter() { } public DynamicAddressReporter(CuratorFramework zookeeper) { this.zookeeper = zookeeper; } public void setZookeeper(CuratorFramework zookeeper) { this.zookeeper = zookeeper; } @Override public void report(String service, String address) throws Exception { if (zookeeper.getState() == CuratorFrameworkState.LATENT) { zookeeper.start(); zookeeper.newNamespaceAwareEnsurePath(service); } zookeeper.create().creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(service + "/i_", address.getBytes("utf-8")); } public void close() { zookeeper.close(); } }
。。。
spring配置文件

<!-- ZooKeeper client factory; close() is invoked when the context shuts down -->
<bean id="thriftZookeeper"
      class="com.wy.thriftpool.commzkpool.zookeeper.ZookeeperFactory"
      destroy-method="close">
    <property name="connectString" value="127.0.0.1:2181" />
    <property name="namespace" value="thrift/thrift-service" />
</bean>

<!-- Reports the thrift server address through the ZooKeeper client above -->
<bean id="serviceAddressReporter"
      class="com.wy.thriftpool.commzkpool.support.impl.DynamicAddressReporter"
      destroy-method="close">
    <property name="zookeeper" ref="thriftZookeeper" />
</bean>

<!-- Service implementation exposed over thrift -->
<bean id="userService" class="com.wy.thrift.service.UserServiceImpl" />

<!-- Starts the thrift server on port 9090 and registers it under configPath -->
<bean class="com.wy.thriftpool.commzkpool.ThriftServiceServerFactory"
      destroy-method="close">
    <property name="service" ref="userService" />
    <property name="configPath" value="UserServiceImpl" />
    <property name="port" value="9090" />
    <property name="addressReporter" ref="serviceAddressReporter" />
</bean>
2、服務調用端
連接池實現
杯了個具,為啥就不能提交。代碼在評論中。
連接池工廠,負責與Thrift server通信

package com.wy.thriftpool.commzkconnpool; import java.net.InetSocketAddress; import org.apache.commons.pool.PoolableObjectFactory; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.wy.thriftpool.commzkpool.support.ThriftServerAddressProvider; /** * 連接池工廠,負責與Thrift server通信 * * @author wy * */ public class ThriftPoolFactory implements PoolableObjectFactory<TTransport> { private final Logger logger = LoggerFactory.getLogger(getClass()); // 超時設置 public int timeOut; private final ThriftServerAddressProvider addressProvider; private PoolOperationCallBack callback; public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback) { super(); this.addressProvider = addressProvider; this.callback = callback; } public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback, int timeOut) { super(); this.addressProvider = addressProvider; this.callback = callback; this.timeOut = timeOut; } /** * 創建對象 */ @Override public TTransport makeObject() throws Exception { try { InetSocketAddress address = addressProvider.selector(); TTransport transport = new TSocket(address.getHostName(), address.getPort(), this.timeOut); transport.open(); if (callback != null) { callback.make(transport); } return transport; } catch (Exception e) { logger.error("creat transport error:", e); throw new RuntimeException(e); } } /** * 銷毀對象 */ @Override public void destroyObject(TTransport transport) throws Exception { if (transport != null && transport.isOpen()) { transport.close(); } } /** * 檢驗對象是否可以由pool安全返回 */ @Override public boolean validateObject(TTransport transport) { try { if (transport != null && transport instanceof TSocket) { TSocket thriftSocket = (TSocket) transport; if (thriftSocket.isOpen()) { return true; } else { return false; } } else { return false; } } catch (Exception e) { return false; } } @Override 
public void activateObject(TTransport obj) throws Exception { // TODO Auto-generated method stub } @Override public void passivateObject(TTransport obj) throws Exception { // TODO Auto-generated method stub } public static interface PoolOperationCallBack { // 創建成功是執行 void make(TTransport transport); // 銷毀之前執行 void destroy(TTransport transport); } }
連接池管理

package com.wy.thriftpool.commzkconnpool; import org.apache.thrift.transport.TSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * 連接池管理 * * @author wy * */ @Service public class ConnectionManager { private final Logger logger = LoggerFactory.getLogger(getClass()); // 保存local對象 ThreadLocal<TSocket> socketThreadSafe = new ThreadLocal<TSocket>(); // 連接提供池 @Autowired private ConnectionProvider connectionProvider; public TSocket getSocket() { TSocket socket = null; try { socket = connectionProvider.getConnection(); socketThreadSafe.set(socket); return socketThreadSafe.get(); } catch (Exception e) { logger.error("error ConnectionManager.invoke()", e); } finally { connectionProvider.returnCon(socket); socketThreadSafe.remove(); } return socket; } }
spring配置文件

<!-- ZooKeeper client factory; close() is invoked when the context shuts down -->
<bean id="thriftZookeeper"
      class="com.wy.thriftpool.commzkpool.zookeeper.ZookeeperFactory"
      destroy-method="close">
    <property name="connectString" value="127.0.0.1:2181" />
    <property name="namespace" value="thrift/thrift-service" />
</bean>

<!-- Connection provider with its pool settings -->
<bean id="connectionProvider"
      class="com.wy.thriftpool.commzkconnpool.impl.ConnectionProviderImpl">
    <property name="maxActive" value="10" />
    <property name="maxIdle" value="10" />
    <property name="conTimeOut" value="2000" />
    <property name="testOnBorrow" value="true" />
    <property name="testOnReturn" value="true" />
    <property name="testWhileIdle" value="true" />
    <!-- Address provider configured with the same configPath the server registers under -->
    <property name="addressProvider">
        <bean class="com.wy.thriftpool.commzkpool.support.impl.DynamicAddressProvider">
            <property name="configPath" value="UserServiceImpl" />
            <property name="zookeeper" ref="thriftZookeeper" />
        </bean>
    </property>
</bean>
參考:http://www.cnblogs.com/mumuxinfei/p/3876187.html
由於本人經驗有限,文章中難免會有錯誤,請瀏覽文章的您指正或有不同的觀點共同探討!