
service.thrift
- // Data record for a user; the numeric prefix of each field is the field id used by the generated read/write code.
- struct User {
- 1:i64 id,
- 2:string name,
- 3:i64 timestamp,
- 4:bool vip
- }
- // RPC interface with a single method that looks a User up by its 64-bit id.
- service UserService {
- User getById(1:i64 id)
- }
- //windows平台下,將API文件輸出在service目錄下(此目錄需要存在)
- > thrift.exe --gen java -o service service.thrift
- public class UserServiceImpl implements UserService.Iface {
- @Override
- public User getById(long id){
- System.out.println("invoke...id:" + id);
- return new User();//for test
- }
- }
- //read方法逐個讀取字段,按照"索引",最終將"struct"對象封裝完畢.
- //write方法也非常類似,按照"索引"順序逐個輸出到流中.
- // Field-decoding loop of the generated struct reader: reads fields until STOP and dispatches on the field id.
- // The expected wire type must match service.thrift, where "id" is declared as i64 (not i32).
- while (true) {
-     schemeField = iprot.readFieldBegin();
-     if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
-         break;
-     }
-     switch (schemeField.id) {
-         case 1: // ID
-             if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
-                 struct.id = iprot.readI64();
-                 struct.setIdIsSet(true);
-             } else {
-                 // unexpected wire type for this field id: skip the value instead of misreading it
-                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-             }
-             break;
-         case 2: // NAME
-             ..
-     }
- }
- //參見:TServiceClient
- //API方法調用時,發送請求數據流
- // Sends the request for one API call: message header, serialized arguments, end marker, then a transport flush.
- protected void sendBase(String methodName, TBase args) throws TException {
- oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));// write the method name and the incremented seqid_ first
- args.write(oprot_);// serialize the arguments
- oprot_.writeMessageEnd();
- oprot_.getTransport().flush();
- }
- // Reads the response for methodName into result; rethrows a reported TApplicationException and rejects a response whose seqid differs from seqid_.
- protected void receiveBase(TBase result, String methodName) throws TException {
- TMessage msg = iprot_.readMessageBegin();// the message may report that execution failed
- if (msg.type == TMessageType.EXCEPTION) {
- TApplicationException x = TApplicationException.read(iprot_);
- iprot_.readMessageEnd();
- throw x;
- }// check that the seqid matches the one that was sent
- if (msg.seqid != seqid_) {
- throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
- }
- result.read(iprot_);// deserialize the result
- iprot_.readMessageEnd();
- }
- //參考: TBaseProcessor.java
- // Dispatches one incoming message to the ProcessFunction registered in processMap under the message's method name.
- @Override
- public boolean process(TProtocol in, TProtocol out) throws TException {
- TMessage msg = in.readMessageBegin();
- ProcessFunction fn = processMap.get(msg.name);// look up the handler ("inner class") by method name
- if (fn == null) {
- // unknown method: skip the argument struct and reply with an UNKNOWN_METHOD exception message
- TProtocolUtil.skip(in, TType.STRUCT);
- in.readMessageEnd();
- TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
- out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
- x.write(out);// serialize the response and write it out directly
- out.writeMessageEnd();
- out.getTransport().flush();
- return true;
- }
- fn.process(msg.seqid, in, out, iface);
- return true;
- }
- // ProcessFunction for the "getById" method: provides an empty argument holder and invokes the service implementation.
- public static class getById<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getById_args> {
- public getById() {
- super("getById");// "getById" is the identifier this function is registered under
- }
- public getById_args getEmptyArgsInstance() {
- return new getById_args();
- }
- protected boolean isOneway() {
- return false;
- }
- // the actual handler: calls iface.getById and stores the return value in result.success
- public getById_result getResult(I iface, getById_args args) throws org.apache.thrift.TException {
- getById_result result = new getById_result();
- result.success = iface.getById(args.id);
- return result;
- }
- }
- // Generated processor: builds the method-name -> ProcessFunction map and hands it to TBaseProcessor.
- public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
- public Processor(I iface) {
- super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
- }
- protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
- super(iface, getProcessMap(processMap));
- }
- private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
- // register the handler in the map under its method name
- processMap.put("getById", new getById());
- return processMap;
- }
- ....
- }
- TThreadPoolServer
- // Accept loop of the thread-pool server (excerpt): listens, then hands every accepted connection to the executor until stopped_ is set.
- public void serve() {
- try {
- // start listening
- serverTransport_.listen();
- } catch (TTransportException ttx) {
- LOGGER.error("Error occurred during listening.", ttx);
- return;
- }
- // Run the preServe event
- if (eventHandler_ != null) {
- eventHandler_.preServe();
- }
- stopped_ = false;
- setServing(true);
- // loop until the server is stopped
- while (!stopped_) {
- int failureCount = 0;
- try {
- // accept a client socket connection;
- // each new connection is wrapped in a runnable and submitted to the executor
- TTransport client = serverTransport_.accept();
- WorkerProcess wp = new WorkerProcess(client);
- executorService_.execute(wp);
- } catch (TTransportException ttx) {
- if (!stopped_) {
- ++failureCount;
- LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
- }
- }
- }
- //....
- }
- /** Minimal blocking client: connects to localhost:1234, calls getById once and always closes the transport. */
- public class UserServiceClient {
-     /**
-      * Opens the connection, performs a single getById(1000) call and releases the socket.
-      * Failures are printed; the transport is closed in a finally block so a failed call does not leak the socket.
-      */
-     public void startClient() {
-         TTransport transport = new TSocket("localhost", 1234);
-         try {
-             TProtocol protocol = new TBinaryProtocol(transport);
-             UserService.Client client = new UserService.Client(protocol);
-             transport.open();
-             User user = client.getById(1000);
-             ////
-         } catch (TTransportException e) {
-             e.printStackTrace();
-         } catch (TException e) {
-             e.printStackTrace();
-         } finally {
-             // previously only reached on success; an exception from open()/getById() left the socket open
-             transport.close();
-         }
-     }
- }
- /** Blocking server bootstrap: serves UserServiceImpl on port 1234 through a TThreadPoolServer with a TBinaryProtocol factory. */
- public class Server {
-     /** Builds the server arguments and calls serve(); a transport failure is printed. */
-     public void startServer() {
-         try {
-             TServerSocket listenSocket = new TServerSocket(1234);
-             UserService.Processor handler = new Processor(new UserServiceImpl());
-             Factory binaryFactory = new TBinaryProtocol.Factory(true, true);
-             Args serverArgs = new Args(listenSocket);
-             serverArgs.processor(handler);
-             serverArgs.protocolFactory(binaryFactory);
-             new TThreadPoolServer(serverArgs).serve();
-         } catch (TTransportException e) {
-             e.printStackTrace();
-         }
-     }
- }
- <dependencies>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- <version>3.0.7.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.5</version>
- <!--<exclusions>-->
- <!--<exclusion>-->
- <!--<groupId>log4j</groupId>-->
- <!--<artifactId>log4j</artifactId>-->
- <!--</exclusion>-->
- <!--</exclusions>-->
- </dependency>
- <!--
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>0.4</version>
- </dependency>
- -->
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.9.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>2.3.0</version>
- </dependency>
- <dependency>
- <groupId>commons-pool</groupId>
- <artifactId>commons-pool</artifactId>
- <version>1.6</version>
- </dependency>
- </dependencies>
- <!-- zookeeper -->
- <bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
- <property name="connectString" value="127.0.0.1:2181"></property>
- <property name="namespace" value="demo/thrift-service"></property>
- </bean>
- <bean id="sericeAddressReporter" class="com.demo.thrift.support.impl.DynamicAddressReporter" destroy-method="close">
- <property name="zookeeper" ref="thriftZookeeper"></property>
- </bean>
- <bean id="userService" class="com.demo.service.UserServiceImpl"/>
- <bean class="com.demo.thrift.ThriftServiceServerFactory" destroy-method="close">
- <property name="service" ref="userService"></property>
- <property name="configPath" value="UserServiceImpl"></property>
- <property name="port" value="9090"></property>
- <property name="addressReporter" ref="sericeAddressReporter"></property>
- </bean>
- /**
-  * Spring bean that hosts one Thrift service implementation in a TThreadPoolServer running on its
-  * own thread and, when an address reporter is configured, reports the address "ip:port:priority".
-  */
- public class ThriftServiceServerFactory implements InitializingBean {
-     private Integer port;
-     private Integer priority = 1;// default
-     private Object service;// service implementation
-     private ThriftServerIpTransfer ipTransfer;
-     private ThriftServerAddressReporter addressReporter;
-     private ServerThread serverThread;
-     private String configPath;
-     public void setService(Object service) {
-         this.service = service;
-     }
-     public void setPriority(Integer priority) {
-         this.priority = priority;
-     }
-     public void setPort(Integer port) {
-         this.port = port;
-     }
-     public void setIpTransfer(ThriftServerIpTransfer ipTransfer) {
-         this.ipTransfer = ipTransfer;
-     }
-     public void setAddressReporter(ThriftServerAddressReporter addressReporter) {
-         this.addressReporter = addressReporter;
-     }
-     public void setConfigPath(String configPath) {
-         this.configPath = configPath;
-     }
-     /**
-      * Resolves the local IP, loads the generated Processor for the service, starts the server
-      * thread and finally reports the address.
-      *
-      * @throws IllegalArgumentException if service or port was not configured
-      * @throws NullPointerException if no server IP could be resolved
-      * @throws IllegalClassFormatException if no usable Processor could be created for the service
-      */
-     @Override
-     public void afterPropertiesSet() throws Exception {
-         if (service == null) {
-             throw new IllegalArgumentException("service is required");
-         }
-         if (port == null) {
-             throw new IllegalArgumentException("port is required");
-         }
-         if (ipTransfer == null) {
-             ipTransfer = new LocalNetworkIpTransfer();
-         }
-         String ip = ipTransfer.getIp();
-         if (ip == null) {
-             throw new NullPointerException("cant find server ip...");
-         }
-         String hostname = ip + ":" + port + ":" + priority;
-         Processor processor = loadProcessor();
-         // serve() blocks, so the server needs a thread of its own
-         serverThread = new ServerThread(processor, port);
-         serverThread.start();
-         // report
-         if (addressReporter != null) {
-             addressReporter.report(configPath, hostname);
-         }
-     }
-     /**
-      * Finds the "Iface" interface implemented by the service, loads the sibling "$Processor"
-      * class of its enclosing class and instantiates it with the service.
-      */
-     private Processor loadProcessor() throws IllegalClassFormatException {
-         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-         Exception lastFailure = null;
-         for (Class<?> clazz : service.getClass().getInterfaces()) {
-             if (!"Iface".equals(clazz.getSimpleName()) || clazz.getEnclosingClass() == null) {
-                 continue;
-             }
-             String pname = clazz.getEnclosingClass().getName() + "$Processor";
-             try {
-                 Class<?> pclass = classLoader.loadClass(pname);
-                 // the loaded class must BE a Processor; the original check was inverted
-                 // (pclass.isAssignableFrom(Processor.class)) and rejected every generated processor
-                 if (!Processor.class.isAssignableFrom(pclass)) {
-                     continue;
-                 }
-                 Constructor<?> constructor = pclass.getConstructor(clazz);
-                 return (Processor) constructor.newInstance(service);
-             } catch (Exception e) {
-                 // keep trying the remaining interfaces, but remember why this one failed
-                 lastFailure = e;
-             }
-         }
-         IllegalClassFormatException failure = new IllegalClassFormatException("service-class should implements Iface");
-         if (lastFailure != null) {
-             failure.initCause(lastFailure);
-         }
-         throw failure;
-     }
-     /** Thread that owns the TThreadPoolServer and runs its blocking serve() loop. */
-     class ServerThread extends Thread {
-         private TServer server;
-         ServerThread(Processor processor, int port) throws Exception {
-             TServerSocket serverTransport = new TServerSocket(port);
-             Factory portFactory = new TBinaryProtocol.Factory(true, true);
-             Args args = new Args(serverTransport);
-             args.processor(processor);
-             args.protocolFactory(portFactory);
-             server = new TThreadPoolServer(args);
-         }
-         @Override
-         public void run() {
-             try {
-                 server.serve();
-             } catch (Exception e) {
-                 // the thread still ends here, but the failure is no longer invisible
-                 e.printStackTrace();
-             }
-         }
-         public void stopServer() {
-             server.stop();
-         }
-     }
-     /** Stops the server; safe to call when startup never got as far as creating the server thread. */
-     public void close() {
-         if (serverThread != null) {
-             serverThread.stopServer();
-         }
-     }
- }
- /** Reports a server address to ZooKeeper as an EPHEMERAL_SEQUENTIAL child node "i_" under the service path. */
- public class DynamicAddressReporter implements ThriftServerAddressReporter {
-     private CuratorFramework zookeeper;
-     public DynamicAddressReporter() {}
-     public DynamicAddressReporter(CuratorFramework zookeeper) {
-         this.zookeeper = zookeeper;
-     }
-     public void setZookeeper(CuratorFramework zookeeper) {
-         this.zookeeper = zookeeper;
-     }
-     /**
-      * Creates the node service + "/i_" holding the UTF-8 bytes of address, starting the client first if it is still LATENT.
-      *
-      * @param service path of the service the address belongs to
-      * @param address address text stored as the node data
-      * @throws Exception whatever the ZooKeeper client raises
-      */
-     @Override
-     public void report(String service, String address) throws Exception {
-         if (zookeeper.getState() == CuratorFrameworkState.LATENT) {
-             zookeeper.start();
-         }
-         // The former newNamespaceAwareEnsurePath(service) call was dropped: its result was discarded
-         // without being run, and creatingParentsIfNeeded() below already creates missing parents.
-         zookeeper.create()
-                 .creatingParentsIfNeeded()
-                 .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
-                 .forPath(service + "/i_", address.getBytes("utf-8"));
-     }
-     /** Closes the ZooKeeper client if one was configured. */
-     public void close() {
-         if (zookeeper != null) {
-             zookeeper.close();
-         }
-     }
- }
5. 測試類
- /** Test entry point: loads spring-thrift-server.xml and then sleeps so the JVM stays alive. */
- public class ServiceMain {
-     /**
-      * @param args unused
-      */
-     public static void main(String[] args) {
-         final long keepAliveMillis = 3000000;
-         try {
-             ApplicationContext context = new ClassPathXmlApplicationContext("spring-thrift-server.xml");
-             Thread.sleep(keepAliveMillis);
-         } catch (Exception failure) {
-             failure.printStackTrace();
-         }
-     }
- }
- <dependencies>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- <version>3.0.7.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.5</version>
- <!--<exclusions>-->
- <!--<exclusion>-->
- <!--<groupId>log4j</groupId>-->
- <!--<artifactId>log4j</artifactId>-->
- <!--</exclusion>-->
- <!--</exclusions>-->
- </dependency>
- <!--
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>0.4</version>
- </dependency>
- -->
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.9.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>2.3.0</version>
- </dependency>
- <dependency>
- <groupId>commons-pool</groupId>
- <artifactId>commons-pool</artifactId>
- <version>1.6</version>
- </dependency>
- </dependencies>
2. spring-thrift-client.xml
- <!-- fixedAddress -->
- <!--
- <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory">
- <property name="service" value="com.demo.service.UserService"></property>
- <property name="serverAddress" value="127.0.0.1:9090:2"></property>
- <property name="maxActive" value="5"></property>
- <property name="idleTime" value="10000"></property>
- </bean>
- -->
- <!-- zookeeper -->
- <bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
- <property name="connectString" value="127.0.0.1:2181"></property>
- <property name="namespace" value="demo/thrift-service"></property>
- </bean>
- <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory" destroy-method="close">
- <property name="service" value="com.demo.service.UserService"></property>
- <property name="maxActive" value="5"></property>
- <property name="idleTime" value="1800000"></property>
- <property name="addressProvider">
- <bean class="com.demo.thrift.support.impl.DynamicAddressProvider">
- <property name="configPath" value="UserServiceImpl"></property>
- <property name="zookeeper" ref="thriftZookeeper"></property>
- </bean>
- </property>
- </bean>
3. ThriftServiceClientProxyFactory.java
- @SuppressWarnings("rawtypes")
- public class ThriftServiceClientProxyFactory implements FactoryBean,InitializingBean {
- private String service;
- private String serverAddress;
- private Integer maxActive = 32;//最大活躍連接數
- ////ms,default 3 min,鏈接空閑時間
- //-1,關閉空閑檢測
- private Integer idleTime = 180000;
- private ThriftServerAddressProvider addressProvider;
- private Object proxyClient;
- public void setMaxActive(Integer maxActive) {
- this.maxActive = maxActive;
- }
- public void setIdleTime(Integer idleTime) {
- this.idleTime = idleTime;
- }
- public void setService(String service) {
- this.service = service;
- }
- public void setServerAddress(String serverAddress) {
- this.serverAddress = serverAddress;
- }
- public void setAddressProvider(ThriftServerAddressProvider addressProvider) {
- this.addressProvider = addressProvider;
- }
- private Class objectClass;
- private GenericObjectPool<TServiceClient> pool;
- private PoolOperationCallBack callback = new PoolOperationCallBack() {
- @Override
- public void make(TServiceClient client) {
- System.out.println("create");
- }
- @Override
- public void destroy(TServiceClient client) {
- System.out.println("destroy");
- }
- };
- @Override
- public void afterPropertiesSet() throws Exception {
- if(serverAddress != null){
- addressProvider = new FixedAddressProvider(serverAddress);
- }
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- //加載Iface接口
- objectClass = classLoader.loadClass(service + "$Iface");
- //加載Client.Factory類
- Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>)classLoader.loadClass(service + "$Client$Factory");
- TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
- ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(addressProvider, clientFactory,callback);
- GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
- poolConfig.maxActive = maxActive;
- poolConfig.minIdle = 0;
- poolConfig.minEvictableIdleTimeMillis = idleTime;
- poolConfig.timeBetweenEvictionRunsMillis = idleTime/2L;
- pool = new GenericObjectPool<TServiceClient>(clientPool,poolConfig);
- proxyClient = Proxy.newProxyInstance(classLoader,new Class[]{objectClass},new InvocationHandler() {
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- //
- TServiceClient client = pool.borrowObject();
- try{
- return method.invoke(client, args);
- }catch(Exception e){
- throw e;
- }finally{
- pool.returnObject(client);
- }
- }
- });
- }
- @Override
- public Object getObject() throws Exception {
- return proxyClient;
- }
- @Override
- public Class<?> getObjectType() {
- return objectClass;
- }
- @Override
- public boolean isSingleton() {
- return true; //To change body of implemented methods use File | Settings | File Templates.
- }
- public void close(){
- if(addressProvider != null){
- addressProvider.close();
- }
- }
- }
4. ThriftClientPoolFactory.java
- /**
-  * Connection-pool object factory (thrift-client for spring): creates, validates and destroys
-  * TServiceClient instances for a commons-pool object pool.
-  */
- public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient>{
-     private final ThriftServerAddressProvider addressProvider;
-     private final TServiceClientFactory<TServiceClient> clientFactory;
-     private PoolOperationCallBack callback;
-     protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
-         this.addressProvider = addressProvider;
-         this.clientFactory = clientFactory;
-     }
-     protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory,PoolOperationCallBack callback) throws Exception {
-         this.addressProvider = addressProvider;
-         this.clientFactory = clientFactory;
-         this.callback = callback;
-     }
-     /**
-      * Opens a socket to the address chosen by the address provider and wraps it in a client.
-      *
-      * @return a client whose transport has been opened
-      * @throws IllegalStateException if the address provider has no address to offer
-      * @throws Exception if the connection cannot be opened
-      */
-     @Override
-     public TServiceClient makeObject() throws Exception {
-         InetSocketAddress address = addressProvider.selector();
-         if (address == null) {
-             // the selector may return null when no address is known; fail with a clear message instead of an NPE
-             throw new IllegalStateException("no thrift server address available");
-         }
-         TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
-         TProtocol protocol = new TBinaryProtocol(tsocket);
-         TServiceClient client = this.clientFactory.getClient(protocol);
-         tsocket.open();
-         if (callback != null) {
-             try {
-                 callback.make(client);
-             } catch (Exception ignored) {
-                 // the callback is a best-effort notification and must not fail client creation
-             }
-         }
-         return client;
-     }
-     /** Notifies the callback (best effort) and closes the client's input transport. */
-     @Override
-     public void destroyObject(TServiceClient client) throws Exception {
-         if (callback != null) {
-             try {
-                 callback.destroy(client);
-             } catch (Exception ignored) {
-                 // the callback is a best-effort notification and must not prevent the close below
-             }
-         }
-         TTransport pin = client.getInputProtocol().getTransport();
-         pin.close();
-     }
-     /** A client is valid while its input transport reports being open. */
-     @Override
-     public boolean validateObject(TServiceClient client) {
-         TTransport pin = client.getInputProtocol().getTransport();
-         return pin.isOpen();
-     }
-     static interface PoolOperationCallBack {
-         // invoked before a client is destroyed
-         void destroy(TServiceClient client);
-         // invoked after a client has been created successfully
-         void make(TServiceClient client);
-     }
- }
5. DynamicAddressProvider.java
- /**
-  * Address provider whose server list can change at runtime. Possible designs:
-  * 1) poll a web service periodically for the addresses
-  * 2) receive the latest addresses passively through an event mechanism (e.g. MQ, nio)
-  * 3) use a ZooKeeper watcher to obtain the latest addresses
-  * <p/>
-  * This example uses ZooKeeper as the "config" centre and the apache-curator library to simplify
-  * the ZooKeeper code. It is a reference implementation only.
-  * <p/>
-  * All shared collections are guarded by one lock, because the cache listener and the callers of
-  * selector()/getAll() touch them from different code paths.
-  */
- public class DynamicAddressProvider implements ThriftServerAddressProvider, InitializingBean {
-     private String configPath;
-     private PathChildrenCache cachedPath;
-     private CuratorFramework zookeeper;
-     // every address this provider has seen so far;
-     // used as a "backup" when no address is currently published
-     private final Set<String> trace = new HashSet<String>();
-     private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
-     private final Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();
-     // guards trace, container and inner
-     private final Object lock = new Object();
-     private static final Integer DEFAULT_PRIORITY = 1;
-     public void setConfigPath(String configPath) {
-         this.configPath = configPath;
-     }
-     public void setZookeeper(CuratorFramework zookeeper) {
-         this.zookeeper = zookeeper;
-     }
-     @Override
-     public void afterPropertiesSet() throws Exception {
-         // start the zk client if it has not been started yet
-         if (zookeeper.getState() == CuratorFrameworkState.LATENT) {
-             zookeeper.start();
-         }
-         buildPathChildrenCache(zookeeper, configPath, true);
-         cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
-     }
-     /** Creates the children cache for path and registers the listener that refreshes the address list. */
-     private void buildPathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {
-         cachedPath = new PathChildrenCache(client, path, cacheData);
-         cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
-             @Override
-             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
-                 PathChildrenCacheEvent.Type eventType = event.getType();
-                 switch (eventType) {
-                     case CONNECTION_SUSPENDED:
-                     case CONNECTION_LOST:
-                         System.out.println("Connection error,waiting...");
-                         return;
-                     default:
-                         //
-                 }
-                 // any other event triggers a full rebuild; this is deliberately the "simple" approach
-                 cachedPath.rebuild();
-                 refreshAddresses();
-             }
-         });
-     }
-     /** Replaces the address list with the addresses currently stored in the cached child nodes. */
-     private void refreshAddresses() throws Exception {
-         List<ChildData> children = cachedPath.getCurrentData();
-         if (children == null || children.isEmpty()) {
-             // All thrift servers may have lost their ZooKeeper connection while the network between
-             // thrift client and thrift server is still fine, so whether the container should be
-             // cleared here is a trade-off; the existing behaviour (clear it) is kept.
-             synchronized (lock) {
-                 container.clear();
-             }
-             System.out.println("thrift server-cluster error....");
-             return;
-         }
-         List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
-         List<String> seen = new ArrayList<String>();
-         for (ChildData data : children) {
-             byte[] payload = data.getData();
-             if (payload == null) {
-                 // a child without data carries no address
-                 continue;
-             }
-             String address = new String(payload, "utf-8");
-             current.addAll(transfer(address));
-             seen.add(address);
-         }
-         Collections.shuffle(current);
-         synchronized (lock) {
-             trace.addAll(seen);
-             container.clear();
-             container.addAll(current);
-             inner.clear();
-             inner.addAll(current);
-         }
-     }
-     /**
-      * Parses "ip:port[:priority]" into a list that contains the address priority times,
-      * so a higher priority is selected proportionally more often.
-      */
-     private List<InetSocketAddress> transfer(String address) {
-         String[] hostname = address.split(":");
-         Integer priority = DEFAULT_PRIORITY;
-         if (hostname.length == 3) {
-             priority = Integer.valueOf(hostname[2]);
-         }
-         String ip = hostname[0];
-         Integer port = Integer.valueOf(hostname[1]);
-         List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
-         for (int i = 0; i < priority; i++) {
-             result.add(new InetSocketAddress(ip, port));
-         }
-         return result;
-     }
-     /** @return an unmodifiable snapshot of the current address list */
-     @Override
-     public List<InetSocketAddress> getAll() {
-         synchronized (lock) {
-             // copy under the lock so callers never iterate a list the listener is rewriting
-             return Collections.unmodifiableList(new ArrayList<InetSocketAddress>(container));
-         }
-     }
-     /**
-      * Takes the next address from the rotation queue, refilling the queue from the current list
-      * or, when that is empty, from every address seen so far.
-      *
-      * @return the selected address, or null when no address is known
-      */
-     @Override
-     public InetSocketAddress selector() {
-         // uses the same lock as the listener; the method-level monitor used before did not
-         // exclude the listener, which synchronizes on "lock"
-         synchronized (lock) {
-             if (inner.isEmpty()) {
-                 if (!container.isEmpty()) {
-                     inner.addAll(container);
-                 } else if (!trace.isEmpty()) {
-                     for (String hostname : trace) {
-                         container.addAll(transfer(hostname));
-                     }
-                     Collections.shuffle(container);
-                     inner.addAll(container);
-                 }
-             }
-             return inner.poll();// null
-         }
-     }
-     /** Closes the children cache and then the ZooKeeper client; a failure of the first no longer skips the second. */
-     @Override
-     public void close() {
-         try {
-             if (cachedPath != null) {
-                 cachedPath.close();
-             }
-         } catch (Exception ignored) {
-             // best-effort shutdown: nothing useful can be done if closing the cache fails
-         } finally {
-             if (zookeeper != null) {
-                 zookeeper.close();
-             }
-         }
-     }
- }
- // *) Client API 調用
- (EchoService.Client)client.echo("hello lilei"); ---(1)
- // *) Service 接口 調用
- (EchoService.Iface)service.echo("hello lilei"); ---(2)
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.9.2</version>
- </dependency>
- <dependency>
- <groupId>commons-pool</groupId>
- <artifactId>commons-pool</artifactId>
- <version>1.6</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- <version>4.0.9.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.6</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>2.7.1</version>
- </dependency>
二、使用zookeeper管理服務節點配置
RPC服務往平台化的方向發展, 會屏蔽掉更多的服務細節(服務的IP地址集群, 集群的擴容和遷移), 只暴露服務接口. 這部分的演化, 使得server端和client端完全的解耦合. 兩者的交互通過ConfigServer(MetaServer)的中介角色來搭線。
注: 該圖源自dubbo的官網
這邊借助Zookeeper來扮演該角色, server扮演發布者的角色, 而client扮演訂閱者的角色.
Zookeeper是分布式應用協作服務. 它基於ZAB協議(一種類似paxos的一致性算法)保證數據一致性, 在命名管理/配置推送/數據同步/主從切換方面扮演重要的角色。 其數據組織類似文件系統的目錄結構:
每個節點被稱為znode, znode節點依據其特性, 又可以分為如下類型:
1). PERSISTENT: 永久節點
2). EPHEMERAL: 臨時節點, 會隨session(client disconnect)的消失而消失
3). PERSISTENT_SEQUENTIAL: 永久節點, 其節點的名字編號是單調遞增的
4). EPHEMERAL_SEQUENTIAL: 臨時節點, 其節點的名字編號是單調遞增的
注: 臨時節點不能成為父節點
Watcher觀察模式, client可以注冊對節點的狀態/內容變更的事件回調機制. 其Event事件的兩類屬性需要關注下:
1). KeeperState: Disconnected,SyncConnected,Expired
2). EventType: None,NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged
RPC服務端:
作為具體業務服務的RPC服務發布方, 對其自身的服務描述由以下元素構成.
1). namespace: 命名空間,來區分不同應用
2). service: 服務接口, 采用發布方的類全名來表示
3). version: 版本號
借鑒了Maven的GAV坐標系, 三維坐標系更符合服務平台化的大環境.
*) 數據模型的設計
具體RPC服務的注冊路徑為: /rpc/{namespace}/{service}/{version}, 該路徑上的節點都是永久節點
RPC服務集群節點的注冊路徑為: /rpc/{namespace}/{service}/{version}/{ip:port:weight}, 末尾的節點是臨時節點.
1.定義Zookeeper的客戶端的管理
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.springframework.beans.factory.FactoryBean;
- import org.springframework.util.StringUtils;
- /**
-  * Spring FactoryBean that builds the ZooKeeper (Curator) client connection.
-  * Every path is placed under the root namespace "rpc", optionally followed by the configured namespace.
-  */
- public class ZookeeperFactory implements FactoryBean<CuratorFramework> {
-     private String zkHosts;
-     // session timeout
-     private int sessionTimeout = 30000;
-     private int connectionTimeout = 30000;
-     // share a single zk connection
-     private boolean singleton = true;
-     // global path prefix, usually used to separate different applications
-     private String namespace;
-     private final static String ROOT = "rpc";
-     private CuratorFramework zkClient;
-     public void setZkHosts(String zkHosts) {
-         this.zkHosts = zkHosts;
-     }
-     public void setSessionTimeout(int sessionTimeout) {
-         this.sessionTimeout = sessionTimeout;
-     }
-     public void setConnectionTimeout(int connectionTimeout) {
-         this.connectionTimeout = connectionTimeout;
-     }
-     public void setSingleton(boolean singleton) {
-         this.singleton = singleton;
-     }
-     public void setNamespace(String namespace) {
-         this.namespace = namespace;
-     }
-     public void setZkClient(CuratorFramework zkClient) {
-         this.zkClient = zkClient;
-     }
-     /**
-      * @return the shared, started client when singleton is true; otherwise a new client that
-      *         has not been started
-      */
-     @Override
-     public CuratorFramework getObject() throws Exception {
-         if (singleton) {
-             if (zkClient == null) {
-                 zkClient = create();
-                 zkClient.start();
-             }
-             return zkClient;
-         }
-         return create();
-     }
-     @Override
-     public Class<?> getObjectType() {
-         return CuratorFramework.class;
-     }
-     @Override
-     public boolean isSingleton() {
-         return singleton;
-     }
-     /** Builds a client from this factory's settings, using "rpc" or "rpc/" + namespace as its namespace. */
-     public CuratorFramework create() throws Exception {
-         // computed into a local: writing the result back to the field made every further call
-         // prepend the root again ("rpc/rpc/...") when singleton is false
-         String effectiveNamespace = StringUtils.isEmpty(namespace) ? ROOT : ROOT + "/" + namespace;
-         return create(zkHosts, sessionTimeout, connectionTimeout, effectiveNamespace);
-     }
-     /**
-      * Builds (without starting) a client.
-      *
-      * @param connectString ZooKeeper host list
-      * @param sessionTimeout session timeout in ms
-      * @param connectionTimeout connection timeout in ms (previously ignored in favour of a fixed 30000)
-      * @param namespace namespace applied to every path of the client
-      */
-     public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
-         CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
-         return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
-                 .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
-                 .defaultData(null).build();
-     }
-     /** Closes the shared client if one was created or injected. */
-     public void close() {
-         if (zkClient != null) {
-             zkClient.close();
-         }
-     }
- }
2.服務端注冊服務
由於服務端配置需要獲取本機的IP地址,因此定義IP獲取接口
ThriftServerIpResolve.java
- package cn.slimsmart.thrift.rpc.zookeeper;
- /**
- *
- * Resolves the IP address of the thrift server, which is used when registering the service.
- * 1) it may be parsed from a special file on a physical or virtual machine
- * 2) it may be the IP of a network interface with a given index
- * 3) other strategies
- */
- public interface ThriftServerIpResolve {
- String getServerIp() throws Exception;
- void reset();
- // when the IP changes, the reset method will be called
- static interface IpRestCalllBack{
- public void rest(String newIp);
- }
- }
可以對該接口做不同的實現,下面我們基於網卡獲取IP地址,也可以通過配置serverIp直接指定
ThriftServerIpLocalNetworkResolve.java
- package cn.slimsmart.thrift.rpc.zookeeper;
- import java.net.Inet6Address;
- import java.net.InetAddress;
- import java.net.NetworkInterface;
- import java.net.SocketException;
- import java.util.Enumeration;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
-  * Resolves the server IP from the local network interfaces, unless one was configured via setServerIp.
-  * The resolved value is cached until reset() is called.
-  */
- public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {
-     private Logger logger = LoggerFactory.getLogger(getClass());
-     // cached result
-     private String serverIp;
-     public void setServerIp(String serverIp) {
-         this.serverIp = serverIp;
-     }
-     /**
-      * @return the configured or cached IP; otherwise the site-local, non-loopback IPv4 address
-      *         found last while scanning all interfaces, or null if there is none
-      */
-     @Override
-     public String getServerIp() {
-         if (serverIp != null) {
-             return serverIp;
-         }
-         // a host has several network interfaces
-         try {
-             Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
-             // guard against a null enumeration, which would otherwise end in an NPE
-             while (netInterfaces != null && netInterfaces.hasMoreElements()) {
-                 NetworkInterface netInterface = netInterfaces.nextElement();
-                 // every interface has several addresses, e.g. a loopback address, a site-local address, IPv4 or IPv6
-                 Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
-                 while (addresses.hasMoreElements()) {
-                     InetAddress address = addresses.nextElement();
-                     if (address instanceof Inet6Address) {
-                         continue;
-                     }
-                     if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
-                         // NOTE(review): the scan goes on after a match, so the last match wins;
-                         // kept as is - confirm whether the first match was intended
-                         serverIp = address.getHostAddress();
-                         logger.info("resolve server ip :" + serverIp);
-                     }
-                 }
-             }
-         } catch (SocketException e) {
-             // log through the class logger instead of printing to stderr
-             logger.error("failed to enumerate network interfaces", e);
-         }
-         return serverIp;
-     }
-     /** Drops the cached IP so the next getServerIp() call scans the interfaces again. */
-     @Override
-     public void reset() {
-         serverIp = null;
-     }
- }
接下來我們定義發布服務接口,並實現將服務信息(服務接口、版本號,IP、port、weight)發布到zookeeper中。
ThriftServerAddressRegister.java
- package cn.slimsmart.thrift.rpc.zookeeper;
- /**
- * Publishes a service address and port to the service registry, which here is a zookeeper server.
- */
- public interface ThriftServerAddressRegister {
- /**
- * Publishes a service interface.
- * @param service name of the service interface; must be unique within a product
- * @param version version of the service interface, default 1.0.0
- * @param address address and port the service is published on
- */
- void register(String service,String version,String address);
- }
實現:ThriftServerAddressRegisterZookeeper.java
- package cn.slimsmart.thrift.rpc.zookeeper;
- import java.io.UnsupportedEncodingException;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.imps.CuratorFrameworkState;
- import org.apache.zookeeper.CreateMode;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.util.StringUtils;
- import cn.slimsmart.thrift.rpc.ThriftException;
- /**
- * 注冊服務列表到Zookeeper
- */
- public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{
- private Logger logger = LoggerFactory.getLogger(getClass());
- private CuratorFramework zkClient;
- public ThriftServerAddressRegisterZookeeper(){}
- public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){
- this.zkClient = zkClient;
- }
- public void setZkClient(CuratorFramework zkClient) {
- this.zkClient = zkClient;
- }
- @Override
- public void register(String service, String version, String address) {
- if(zkClient.getState() == CuratorFrameworkState.LATENT){
- zkClient.start();
- }
- if(StringUtils.isEmpty(version)){
- version="1.0.0";
- }
- //臨時節點
- try {
- zkClient.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.EPHEMERAL)
- .forPath("/"+service+"/"+version+"/"+address);
- } catch (UnsupportedEncodingException e) {
- logger.error("register service address to zookeeper exception:{}",e);
- throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);
- } catch (Exception e) {
- logger.error("register service address to zookeeper exception:{}",e);
- throw new ThriftException("register service address to zookeeper exception:{}", e);
- }
- }
- public void close(){
- zkClient.close();
- }
- }
3.客戶端發現服務
定義獲取服務地址接口
ThriftServerAddressProvider.java
- package cn.slimsmart.thrift.rpc.zookeeper;
- import java.net.InetSocketAddress;
- import java.util.List;
- /**
- * Provider of thrift server-service addresses, used to build the client connection pool.
- */
- public interface ThriftServerAddressProvider {
- // returns the service name
- String getService();
- /**
- * Returns all server addresses.
- * @return the server addresses
- */
- List<InetSocketAddress> findServerAddressList();
- /**
- * Selects one suitable address, e.g. picked at random;
- * implementations may use whatever algorithm fits.
- * @return the selected address
- */
- InetSocketAddress selector();
- void close();
- }
基於zookeeper服務地址自動發現實現:ThriftServerAddressProviderZookeeper.java
- package cn.slimsmart.thrift.rpc.zookeeper;
- import java.net.InetSocketAddress;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashSet;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Queue;
- import java.util.Set;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.imps.CuratorFrameworkState;
- import org.apache.curator.framework.recipes.cache.ChildData;
- import org.apache.curator.framework.recipes.cache.PathChildrenCache;
- import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
- import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
- import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.InitializingBean;
- /**
-  * {@link ThriftServerAddressProvider} that uses ZooKeeper as the "config" center,
-  * with the apache-curator library simplifying the ZooKeeper work.
-  *
-  * <p>Server addresses are the child node names under {@code /<service>/<version>},
-  * formatted as {@code ip:port[:weight]}. An address is added to the candidate list
-  * {@code weight} times. All address state is guarded by a single lock because it is
-  * updated from the cache listener and read from client threads.
-  */
- public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {
-     private final Logger logger = LoggerFactory.getLogger(getClass());
-     // Name of the service to discover
-     private String service;
-     // Service version
-     private String version = "1.0.0";
-     private PathChildrenCache cachedPath;
-     private CuratorFramework zkClient;
-     // Every address this provider has come across.
-     // When nothing is listed in ZooKeeper the traced addresses serve as a "backup".
-     private final Set<String> trace = new HashSet<String>();
-     // Current weighted address list
-     private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
-     // Addresses left in the current round; refilled from container once drained
-     private final Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();
-     // Guards trace, container and inner
-     private final Object lock = new Object();
-     // Default weight
-     private static final Integer DEFAULT_WEIGHT = 1;
-     public void setService(String service) {
-         this.service = service;
-     }
-     public void setVersion(String version) {
-         this.version = version;
-     }
-     public ThriftServerAddressProviderZookeeper() {
-     }
-     public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
-         this.zkClient = zkClient;
-     }
-     public void setZkClient(CuratorFramework zkClient) {
-         this.zkClient = zkClient;
-     }
-     /**
-      * Starts the Curator client if it has not been started yet, then starts a
-      * children cache on the service path.
-      */
-     @Override
-     public void afterPropertiesSet() throws Exception {
-         if (zkClient.getState() == CuratorFrameworkState.LATENT) {
-             zkClient.start();
-         }
-         buildPathChildrenCache(zkClient, getServicePath(), true);
-         cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
-     }
-     private String getServicePath() {
-         return "/" + service + "/" + version;
-     }
-     /** Creates the children cache and attaches the listener that refreshes the address list. */
-     private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
-         cachedPath = new PathChildrenCache(client, path, cacheData);
-         cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
-             @Override
-             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
-                 switch (event.getType()) {
-                 case CONNECTION_RECONNECTED:
-                     logger.info("Connection is reconection.");
-                     break;
-                 case CONNECTION_SUSPENDED:
-                     logger.info("Connection is suspended.");
-                     // Rebuilding needs a blocking read from ZooKeeper, which cannot succeed
-                     // while the connection is down; keep serving the addresses already known.
-                     return;
-                 case CONNECTION_LOST:
-                     logger.warn("Connection error,waiting...");
-                     return;
-                 default:
-                     break;
-                 }
-                 // Any change under the service path triggers a full rebuild; this is the "simple" approach.
-                 cachedPath.rebuild();
-                 rebuildAddresses();
-             }
-         });
-     }
-     /**
-      * Recomputes the weighted address list from the cache's current children.
-      * Malformed child names are logged and skipped so that one bad node cannot
-      * block discovery of the others.
-      */
-     private void rebuildAddresses() {
-         List<ChildData> children = cachedPath.getCurrentData();
-         if (children == null || children.isEmpty()) {
-             // Possibly every thrift server lost its link to ZooKeeper while the network between
-             // client and servers is still fine, so whether to clear the list here is debatable.
-             // selector() falls back to the traced addresses in that case.
-             synchronized (lock) {
-                 container.clear();
-             }
-             logger.error("thrift server-cluster error....");
-             return;
-         }
-         int prefixLength = getServicePath().length() + 1;
-         List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
-         List<String> valid = new ArrayList<String>();
-         for (ChildData data : children) {
-             String path = data.getPath();
-             logger.debug("get path:{}", path);
-             String address = path.substring(prefixLength);
-             logger.debug("get serviceAddress:{}", address);
-             try {
-                 current.addAll(transfer(address));
-                 valid.add(address);
-             } catch (IllegalArgumentException e) {
-                 logger.warn("ignore malformed server address:{}", address, e);
-             }
-         }
-         Collections.shuffle(current);
-         synchronized (lock) {
-             trace.addAll(valid);
-             container.clear();
-             container.addAll(current);
-             inner.clear();
-             inner.addAll(current);
-         }
-     }
-     /**
-      * Expands an {@code ip:port[:weight]} string into {@code weight} copies of the address.
-      *
-      * @throws IllegalArgumentException if the port is missing, or port/weight is not a valid number
-      */
-     private List<InetSocketAddress> transfer(String address) {
-         String[] hostname = address.split(":");
-         if (hostname.length < 2) {
-             throw new IllegalArgumentException("expected ip:port[:weight] but got: " + address);
-         }
-         int weight = DEFAULT_WEIGHT;
-         if (hostname.length == 3) {
-             weight = Integer.parseInt(hostname[2]);
-         }
-         String ip = hostname[0];
-         int port = Integer.parseInt(hostname[1]);
-         List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
-         // The address appears once per unit of weight, so heavier servers are picked more often.
-         for (int i = 0; i < weight; i++) {
-             result.add(new InetSocketAddress(ip, port));
-         }
-         return result;
-     }
-     /**
-      * Returns a read-only snapshot of the current weighted address list.
-      */
-     @Override
-     public List<InetSocketAddress> findServerAddressList() {
-         synchronized (lock) {
-             return Collections.unmodifiableList(new ArrayList<InetSocketAddress>(container));
-         }
-     }
-     /**
-      * Takes the next address of the current round. When the round is exhausted it is
-      * refilled from the current list, or from the traced addresses if that list is empty.
-      *
-      * @return the next address, or {@code null} when no address has ever been seen
-      */
-     @Override
-     public InetSocketAddress selector() {
-         synchronized (lock) {
-             if (inner.isEmpty()) {
-                 if (container.isEmpty()) {
-                     for (String hostname : trace) {
-                         container.addAll(transfer(hostname));
-                     }
-                     Collections.shuffle(container);
-                 }
-                 inner.addAll(container);
-             }
-             return inner.poll();
-         }
-     }
-     /**
-      * Closes the children cache and then the Curator client. A failure while closing
-      * the cache is logged and does not prevent the client from being closed.
-      */
-     @Override
-     public void close() {
-         try {
-             if (cachedPath != null) {
-                 cachedPath.close();
-             }
-         } catch (Exception e) {
-             logger.warn("close path cache exception", e);
-         }
-         try {
-             if (zkClient != null) {
-                 zkClient.close();
-             }
-         } catch (Exception e) {
-             logger.warn("close zookeeper client exception", e);
-         }
-     }
-     @Override
-     public String getService() {
-         return service;
-     }
- }
對此接口還做了一種實現,通過配置獲取服務地址,參考附件:FixedAddressProvider.java
三、服務端服務注冊實現
ThriftServiceServerFactory.java
- package cn.slimsmart.thrift.rpc;
- import java.lang.instrument.IllegalClassFormatException;
- import java.lang.reflect.Constructor;
- import org.apache.thrift.TProcessor;
- import org.apache.thrift.TProcessorFactory;
- import org.apache.thrift.protocol.TBinaryProtocol;
- import org.apache.thrift.server.TServer;
- import org.apache.thrift.server.TThreadedSelectorServer;
- import org.apache.thrift.transport.TFramedTransport;
- import org.apache.thrift.transport.TNonblockingServerSocket;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.util.StringUtils;
- import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegister;
- import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;
- import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpResolve;
- /**
-  * Server-side factory that exposes a Thrift service implementation and registers it.
-  *
-  * <p>On initialization it resolves the local IP, locates the generated
-  * {@code $Processor} class of the implementation's {@code Iface} by reflection,
-  * starts a {@link TThreadedSelectorServer} (framed transport, binary protocol) on
-  * its own thread, and finally registers {@code ip:port:weight} under the service
-  * name through the configured {@link ThriftServerAddressRegister}.
-  */
- public class ThriftServiceServerFactory implements InitializingBean {
-     // Local port the service listens on
-     private Integer port = 8299;
-     // Weight
-     private Integer weight = 1;// default
-     // Service implementation instance
-     private Object service;
-     // Service version
-     private String version;
-     // Resolves the local IP
-     private ThriftServerIpResolve thriftServerIpResolve;
-     // Service registration
-     private ThriftServerAddressRegister thriftServerAddressRegister;
-     private ServerThread serverThread;
-     public void setPort(Integer port) {
-         this.port = port;
-     }
-     public void setWeight(Integer weight) {
-         this.weight = weight;
-     }
-     public void setService(Object service) {
-         this.service = service;
-     }
-     public void setVersion(String version) {
-         this.version = version;
-     }
-     public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
-         this.thriftServerIpResolve = thriftServerIpResolve;
-     }
-     public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
-         this.thriftServerAddressRegister = thriftServerAddressRegister;
-     }
-     /**
-      * Starts the server and registers its address.
-      *
-      * @throws IllegalArgumentException if no service implementation was set
-      * @throws ThriftException if the local IP cannot be resolved
-      * @throws IllegalClassFormatException if no usable {@code Iface}/{@code Processor}
-      *         pair is found; the last loading failure, if any, is attached as the cause
-      */
-     @Override
-     public void afterPropertiesSet() throws Exception {
-         if (service == null) {
-             throw new IllegalArgumentException("service implementation must be set");
-         }
-         if (thriftServerIpResolve == null) {
-             thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
-         }
-         String serverIP = thriftServerIpResolve.getServerIp();
-         if (StringUtils.isEmpty(serverIP)) {
-             throw new ThriftException("cant find server ip...");
-         }
-         String hostname = serverIP + ":" + port + ":" + weight;
-         Class<?> serviceClass = service.getClass();
-         // Interfaces implemented by the service class
-         Class<?>[] interfaces = serviceClass.getInterfaces();
-         if (interfaces.length == 0) {
-             throw new IllegalClassFormatException("service-class should implements Iface");
-         }
-         // reflect,load "Processor";
-         TProcessor processor = null;
-         String serviceName = null;
-         Exception loadFailure = null;
-         for (Class<?> clazz : interfaces) {
-             if (!"Iface".equals(clazz.getSimpleName())) {
-                 continue;
-             }
-             Class<?> enclosing = clazz.getEnclosingClass();
-             if (enclosing == null) {
-                 // A top-level interface that merely happens to be called Iface.
-                 continue;
-             }
-             serviceName = enclosing.getName();
-             String pname = serviceName + "$Processor";
-             try {
-                 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-                 Class<?> pclass = classLoader.loadClass(pname);
-                 if (!TProcessor.class.isAssignableFrom(pclass)) {
-                     continue;
-                 }
-                 Constructor<?> constructor = pclass.getConstructor(clazz);
-                 processor = (TProcessor) constructor.newInstance(service);
-                 break;
-             } catch (Exception e) {
-                 // Remember why this candidate failed; another Iface may still work.
-                 loadFailure = e;
-             }
-         }
-         if (processor == null) {
-             IllegalClassFormatException failure = new IllegalClassFormatException("service-class should implements Iface");
-             if (loadFailure != null) {
-                 failure.initCause(loadFailure);
-             }
-             throw failure;
-         }
-         // A dedicated thread is required because serve() blocks.
-         serverThread = new ServerThread(processor, port);
-         serverThread.start();
-         // Register the service
-         if (thriftServerAddressRegister != null) {
-             try {
-                 thriftServerAddressRegister.register(serviceName, version, hostname);
-             } catch (RuntimeException e) {
-                 // Do not leave a server running that could not be registered.
-                 serverThread.stopServer();
-                 throw e;
-             }
-         }
-     }
-     /** Thread that owns the Thrift server and runs its blocking serve loop. */
-     class ServerThread extends Thread {
-         private final TServer server;
-         ServerThread(TProcessor processor, int port) throws Exception {
-             setName("thrift-server-" + port);
-             TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
-             TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
-             TProcessorFactory processorFactory = new TProcessorFactory(processor);
-             tArgs.processorFactory(processorFactory);
-             tArgs.transportFactory(new TFramedTransport.Factory());
-             tArgs.protocolFactory(new TBinaryProtocol.Factory(true, true));
-             server = new TThreadedSelectorServer(tArgs);
-         }
-         @Override
-         public void run() {
-             // Start serving. A failure is deliberately not caught here so that it reaches
-             // the thread's uncaught-exception handler instead of vanishing silently.
-             server.serve();
-         }
-         public void stopServer() {
-             server.stop();
-         }
-     }
-     /** Stops the server if it was started. */
-     public void close() {
-         if (serverThread != null) {
-             serverThread.stopServer();
-         }
-     }
- }
四、客戶端獲取服務代理及連接池實現
客戶端連接池實現:ThriftClientPoolFactory.java
- package cn.slimsmart.thrift.rpc;
- import java.net.InetSocketAddress;
- import org.apache.commons.pool.BasePoolableObjectFactory;
- import org.apache.thrift.TServiceClient;
- import org.apache.thrift.TServiceClientFactory;
- import org.apache.thrift.protocol.TBinaryProtocol;
- import org.apache.thrift.protocol.TProtocol;
- import org.apache.thrift.transport.TFramedTransport;
- import org.apache.thrift.transport.TSocket;
- import org.apache.thrift.transport.TTransport;
- import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;
- /**
-  * Connection pool object factory, thrift-client for spring.
-  *
-  * <p>Each pooled object is a {@link TServiceClient} bound to a framed, binary-protocol
-  * socket connection to an address chosen by the {@link ThriftServerAddressProvider}.
-  */
- public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {
-     private final ThriftServerAddressProvider serverAddressProvider;
-     private final TServiceClientFactory<TServiceClient> clientFactory;
-     // Optional lifecycle hook; may be null
-     private final PoolOperationCallBack callback;
-     protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
-         this(addressProvider, clientFactory, null);
-     }
-     protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,
-             PoolOperationCallBack callback) throws Exception {
-         this.serverAddressProvider = addressProvider;
-         this.clientFactory = clientFactory;
-         this.callback = callback;
-     }
-     static interface PoolOperationCallBack {
-         // Runs before a client is destroyed
-         void destroy(TServiceClient client);
-         // Runs after a client has been created successfully
-         void make(TServiceClient client);
-     }
-     /** Notifies the callback, then closes the client's transport. */
-     @Override
-     public void destroyObject(TServiceClient client) throws Exception {
-         if (callback != null) {
-             try {
-                 callback.destroy(client);
-             } catch (Exception ignored) {
-                 // The callback is only a notification; it must not prevent the close below.
-             }
-         }
-         TTransport pin = client.getInputProtocol().getTransport();
-         pin.close();
-     }
-     /** A client is considered valid while its transport reports itself open. */
-     @Override
-     public boolean validateObject(TServiceClient client) {
-         TTransport pin = client.getInputProtocol().getTransport();
-         return pin.isOpen();
-     }
-     /**
-      * Creates a client connected to an address picked by the provider.
-      *
-      * @throws IllegalStateException if the provider has no address to offer
-      */
-     @Override
-     public TServiceClient makeObject() throws Exception {
-         InetSocketAddress address = serverAddressProvider.selector();
-         if (address == null) {
-             // Fail with a clear message instead of a NullPointerException further down.
-             throw new IllegalStateException("no thrift server address available for service: " + serverAddressProvider.getService());
-         }
-         TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
-         TTransport transport = new TFramedTransport(tsocket);
-         TProtocol protocol = new TBinaryProtocol(transport);
-         TServiceClient client = this.clientFactory.getClient(protocol);
-         transport.open();
-         if (callback != null) {
-             try {
-                 callback.make(client);
-             } catch (Exception ignored) {
-                 // The callback is only a notification; the client is usable regardless.
-             }
-         }
-         return client;
-     }
- }
客戶端服務代理工廠實現:ThriftServiceClientProxyFactory.java
- package cn.slimsmart.thrift.rpc;
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.Method;
- import java.lang.reflect.Proxy;
- import org.apache.commons.pool.impl.GenericObjectPool;
- import org.apache.thrift.TServiceClient;
- import org.apache.thrift.TServiceClientFactory;
- import org.springframework.beans.factory.FactoryBean;
- import org.springframework.beans.factory.InitializingBean;
- import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;
- import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;
- /**
- * 客戶端代理
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {
- private Integer maxActive = 32;// 最大活躍連接數
- // ms,default 3 min,鏈接空閑時間
- // -1,關閉空閑檢測
- private Integer idleTime = 180000;
- private ThriftServerAddressProvider serverAddressProvider;
- private Object proxyClient;
- private Class<?> objectClass;
- private GenericObjectPool<TServiceClient> pool;
- private PoolOperationCallBack callback = new PoolOperationCallBack() {
- @Override
- public void make(TServiceClient client) {
- System.out.println("create");
- }
- @Override
- public void destroy(TServiceClient client) {
- System.out.println("destroy");
- }
- };
- public void setMaxActive(Integer maxActive) {
- this.maxActive = maxActive;
- }
- public void setIdleTime(Integer idleTime) {
- this.idleTime = idleTime;
- }
- public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
- this.serverAddressProvider = serverAddressProvider;
- }
- @Override
- public void afterPropertiesSet() throws Exception {
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- // 加載Iface接口
- objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
- // 加載Client.Factory類
- Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
- TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
- ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
- GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
- poolConfig.maxActive = maxActive;
- poolConfig.minIdle = 0;
- poolConfig.minEvictableIdleTimeMillis = idleTime;
- poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
- pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
- proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- //
- TServiceClient client = pool.borrowObject();
- try {
- return method.invoke(client, args);
- } catch (Exception e) {
- throw e;
- } finally {
- pool.returnObject(client);
- }
- }
- });
- }
- @Override
- public Object getObject() throws Exception {
- return proxyClient;
- }
- @Override
- public Class<?> getObjectType() {
- return objectClass;
- }
- @Override
- public boolean isSingleton() {
- return true;
- }
- public void close() {
- if (serverAddressProvider != null) {
- serverAddressProvider.close();
- }
- }
- }
下面我們看一下服務端和客戶端的配置:
服務端spring-context-thrift-server.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
- xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
- http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
- http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
- default-lazy-init="false">
- <!-- ZooKeeper client factory; closed via its close method when the context shuts down -->
- <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
- destroy-method="close">
- <property name="zkHosts"
- value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
- <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
- <property name="connectionTimeout" value="3000" />
- <property name="sessionTimeout" value="3000" />
- <property name="singleton" value="true" />
- </bean>
- <!-- Address register that the server factories below use; it is given the ZooKeeper client above -->
- <bean id="sericeAddressRegister"
- class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper"
- destroy-method="close">
- <property name="zkClient" ref="thriftZookeeper" />
- </bean>
- <!-- The service implementation shared by the three server factories below -->
- <bean id="echoSerivceImpl" class="cn.slimsmart.thrift.rpc.demo.EchoSerivceImpl" />
- <!-- Three server factories for the same implementation, differing only in port (9000, 9001, 9002) -->
- <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
- destroy-method="close">
- <property name="service" ref="echoSerivceImpl" />
- <property name="port" value="9000" />
- <property name="version" value="1.0.0" />
- <property name="weight" value="1" />
- <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
- </bean>
- <bean id="echoSerivce1" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
- destroy-method="close">
- <property name="service" ref="echoSerivceImpl" />
- <property name="port" value="9001" />
- <property name="version" value="1.0.0" />
- <property name="weight" value="1" />
- <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
- </bean>
- <bean id="echoSerivce2" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
- destroy-method="close">
- <property name="service" ref="echoSerivceImpl" />
- <property name="port" value="9002" />
- <property name="version" value="1.0.0" />
- <property name="weight" value="1" />
- <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
- </bean>
- </beans>
客戶端:spring-context-thrift-client.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
- xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
- http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
- http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
- default-lazy-init="false">
- <!-- fixedAddress: alternative setup with a fixed ip:port:weight list (kept commented out) -->
- <!--
- <bean id="fixedAddressProvider" class="cn.slimsmart.thrift.rpc.zookeeper.FixedAddressProvider">
- <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
- <property name="serverAddress" value="192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3" />
- </bean>
- <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">
- <property name="maxActive" value="5" />
- <property name="idleTime" value="10000" />
- <property name="serverAddressProvider" ref="fixedAddressProvider" />
- </bean>
- -->
- <!-- ZooKeeper client factory; closed via its close method when the context shuts down -->
- <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
- destroy-method="close">
- <property name="zkHosts"
- value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
- <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
- <property name="connectionTimeout" value="3000" />
- <property name="sessionTimeout" value="3000" />
- <property name="singleton" value="true" />
- </bean>
- <!-- Client proxy for the echo service; addresses come from the nested ZooKeeper-backed provider -->
- <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close">
- <property name="maxActive" value="5" />
- <property name="idleTime" value="1800000" />
- <property name="serverAddressProvider">
- <bean class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">
- <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
- <property name="version" value="1.0.0" />
- <property name="zkClient" ref="thriftZookeeper" />
- </bean>
- </property>
- </bean>
- </beans>