由淺入深了解Thrift之客戶端連接池化


一、問題描述                                                                                        

    在上一篇《由淺入深了解Thrift之服務模型和序列化機制》文章中,我們已經了解了thrift的基本架構和網絡服務模型的優缺點。如今的互聯網圈中,RPC服務化的思想如火如荼。我們又該如何將thrift服務化應用到我們的項目中呢?實現thrift服務化前,我們先想想這幾個問題:服務注冊、服務發現、服務健康檢測、服務“Load Balance”、隱藏client和server端的交互細節、服務調用端的對象池化。

  • 服務的注冊、發現和健康檢測,我們使用zookeeper可以很好的解決
  • 服務“Load Balance”,我們可以使用簡單的算法“權重+隨機”,當然也可以使用成熟複雜的算法
  • 服務調用端的對象池化,我們可以使用Apache commons-pool,使用簡單又可以滿足我們的需求

二、實現思路                                                                                        

    1、thrift server端啟動時,每個實例向zk集群以臨時節點方式注冊(這樣,遍歷zk上/server下有多少個臨時節點就知道有哪些server實例)

        thrift server端可以單機多端口多實例或多機部署多實例方式運行。

   2、服務調用方實現一個連接池,連接池初始化時,通過zk將在線的server實例信息同步到本地並緩存,同時監聽zk下的節點變化。

   3、服務調用方與Server通訊時,從連接池中取一個可用的連接,用它實現RPC調用。

                 

三、具體實現                                                                                       

   1、thrift server端   

      thrift server端,向zk中注冊server address

package com.wy.thriftpool.commzkpool;

import java.lang.instrument.IllegalClassFormatException;
import java.lang.reflect.Constructor;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.springframework.beans.factory.InitializingBean;

import com.wy.thrift.service.UserService.Processor;
import com.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;
import com.wy.thriftpool.commzkpool.support.ThriftServerIpTransfer;
import com.wy.thriftpool.commzkpool.support.impl.LocalNetworkIpTransfer;

/**
 * Thrift server bootstrap that also registers the server address.
 *
 * <p>Once the bean properties are set, it locates the Thrift-generated {@code Processor} that
 * belongs to the configured service by reflection, starts a {@link TThreadedSelectorServer} on a
 * dedicated thread and, when an address reporter is configured, reports
 * {@code ip:port:priority} under {@code configPath}.
 *
 * @author wy
 */
public class ThriftServiceServerFactory implements InitializingBean {

    // Port the thrift server listens on.
    private Integer port;
    // Weight of this server instance; defaults to 1.
    private Integer priority = 1;
    // Service implementation object.
    private Object service;
    // Path under which the thrift server address is registered.
    private String configPath;

    // Resolves the IP address that is advertised for this server.
    private ThriftServerIpTransfer ipTransfer;
    // Publishes the server address; optional.
    private ThriftServerAddressReporter addressReporter;
    // Thread that runs the (blocking) thrift server.
    private ServerThread serverThread;

    /**
     * Starts the thrift server and reports its address.
     *
     * @throws NullPointerException if {@code service} or {@code port} is not set, or no server IP
     *         can be resolved
     * @throws IllegalClassFormatException if no usable {@code Processor} can be built for the
     *         service; the last reflection failure, if any, is attached as the cause
     * @throws Exception if the server cannot be created or the address cannot be reported
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Fail with a descriptive message instead of an anonymous NPE further down.
        if (service == null) {
            throw new NullPointerException("service must be set...");
        }
        if (port == null) {
            throw new NullPointerException("port must be set...");
        }
        if (ipTransfer == null) {
            ipTransfer = new LocalNetworkIpTransfer();
        }
        String ip = ipTransfer.getIp();
        if (ip == null) {
            throw new NullPointerException("cant find server ip...");
        }
        String hostname = ip + ":" + port + ":" + priority;

        Class<?>[] interfaces = service.getClass().getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        Processor<?> processor = createProcessor(interfaces);

        // serve() blocks, so the server needs its own thread.
        serverThread = new ServerThread(processor, port);
        serverThread.start();

        if (addressReporter != null) {
            try {
                addressReporter.report(configPath, hostname);
            } catch (Exception e) {
                // Do not leave an unregistered server running when registration fails.
                serverThread.stopServer();
                throw e;
            }
        }
    }

    /**
     * Builds the {@code Processor} for the first {@code Iface} interface that yields one.
     *
     * @param interfaces interfaces implemented by the service class
     * @return a processor wrapping {@code service}
     * @throws IllegalClassFormatException if none of the interfaces produces a processor
     */
    private Processor<?> createProcessor(Class<?>[] interfaces) throws IllegalClassFormatException {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Exception lastFailure = null;
        for (Class<?> clazz : interfaces) {
            if (!"Iface".equals(clazz.getSimpleName())) {
                continue;
            }
            Class<?> enclosing = clazz.getEnclosingClass();
            if (enclosing == null) {
                // A top-level interface that merely happens to be called Iface.
                continue;
            }
            String pname = enclosing.getName() + "$Processor";
            try {
                Class<?> pclass = classLoader.loadClass(pname);
                // The instance is cast to Processor below, so pclass must be Processor or a subtype.
                if (!Processor.class.isAssignableFrom(pclass)) {
                    continue;
                }
                Constructor<?> constructor = pclass.getConstructor(clazz);
                return (Processor<?>) constructor.newInstance(service);
            } catch (Exception e) {
                // Remember why this candidate failed and keep trying the remaining interfaces.
                lastFailure = e;
            }
        }
        IllegalClassFormatException failure =
                new IllegalClassFormatException("service-class should implements Iface");
        if (lastFailure != null) {
            failure.initCause(lastFailure);
        }
        throw failure;
    }

    /** Thread that owns the thrift server and runs its blocking serve loop. */
    class ServerThread extends Thread {
        private TServer server;

        /**
         * Configures a threaded-selector server on the given port; nothing is served until the
         * thread is started.
         *
         * @param processor processor that handles incoming calls
         * @param port      port to listen on
         * @throws Exception if the server socket or the server cannot be created
         */
        ServerThread(Processor<?> processor, int port) throws Exception {
            // Transport channel.
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
            // Binary protocol.
            Factory protocolFactory = new TBinaryProtocol.Factory();

            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
            tArgs.processor(processor);
            tArgs.transportFactory(new TFramedTransport.Factory());
            tArgs.protocolFactory(protocolFactory);
            int num = Runtime.getRuntime().availableProcessors() * 2 + 1;
            tArgs.selectorThreads(num);
            tArgs.workerThreads(num * 10);

            // Network service model.
            server = new TThreadedSelectorServer(tArgs);
        }

        @Override
        public void run() {
            try {
                server.serve();
            } catch (Exception e) {
                // Surface the failure through the thread's uncaught-exception handler
                // instead of letting the server die silently.
                throw new IllegalStateException("thrift server terminated unexpectedly", e);
            }
        }

        /** Stops the underlying thrift server. */
        public void stopServer() {
            server.stop();
        }
    }

    /** Stops the thrift server if it was started; does nothing otherwise. */
    public void close() {
        if (serverThread != null) {
            serverThread.stopServer();
        }
    }

    public void setService(Object service) {
        this.service = service;
    }

    public void setPriority(Integer priority) {
        this.priority = priority;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public void setIpTransfer(ThriftServerIpTransfer ipTransfer) {
        this.ipTransfer = ipTransfer;
    }

    public void setAddressReporter(ThriftServerAddressReporter addressReporter) {
        this.addressReporter = addressReporter;
    }

    public void setConfigPath(String configPath) {
        this.configPath = configPath;
    }
}
View Code

      thrift server address注冊到zk   

package com.wy.thriftpool.commzkpool.support.impl;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.CreateMode;

import com.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;

/**
 * Registers a thrift server address in ZooKeeper.
 *
 * @author wy
 *
 */
public class DynamicAddressReporter implements ThriftServerAddressReporter {

    private CuratorFramework zookeeper;

    public DynamicAddressReporter() {
    }

    public DynamicAddressReporter(CuratorFramework zookeeper) {
        this.zookeeper = zookeeper;
    }

    public void setZookeeper(CuratorFramework zookeeper) {
        this.zookeeper = zookeeper;
    }

    /**
     * Creates an ephemeral sequential node {@code <service>/i_NNN} whose data is the address.
     *
     * <p>The client is started first if it is still in the {@code LATENT} state. Missing parent
     * nodes are created by {@code creatingParentsIfNeeded()}, so no separate ensure-path step is
     * required.
     *
     * @param service path under which the address node is created
     *                (NOTE(review): presumably expected to start with '/' - confirm against callers)
     * @param address address string stored as UTF-8 bytes in the node
     * @throws IllegalStateException if no ZooKeeper client has been injected
     * @throws Exception if the node cannot be created
     */
    @Override
    public void report(String service, String address) throws Exception {
        if (zookeeper == null) {
            throw new IllegalStateException("zookeeper client is not set");
        }
        if (zookeeper.getState() == CuratorFrameworkState.LATENT) {
            zookeeper.start();
        }
        zookeeper.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(service + "/i_", address.getBytes("utf-8"));
    }

    /** Closes the ZooKeeper client if one was injected. */
    public void close() {
        if (zookeeper != null) {
            zookeeper.close();
        }
    }

}
View Code

      。。。

      spring配置文件

<!-- ZooKeeper client: connects to 127.0.0.1:2181 with namespace thrift/thrift-service; close() is called on context shutdown -->
    <bean id="thriftZookeeper" class="com.wy.thriftpool.commzkpool.zookeeper.ZookeeperFactory" destroy-method="close">
        <property name="connectString" value="127.0.0.1:2181"></property>
        <property name="namespace" value="thrift/thrift-service"></property>
    </bean>
    
    <!-- Reporter that registers the thrift server address through the ZooKeeper client above -->
    <bean id="serviceAddressReporter" class="com.wy.thriftpool.commzkpool.support.impl.DynamicAddressReporter" destroy-method="close">
        <property name="zookeeper" ref="thriftZookeeper"></property>
    </bean>
    
    <!-- Service implementation exposed over thrift -->
    <bean id="userService" class="com.wy.thrift.service.UserServiceImpl"/>
    
    <!-- Starts the thrift server for userService on port 9090 and reports its address under UserServiceImpl; close() stops the server -->
    <bean class="com.wy.thriftpool.commzkpool.ThriftServiceServerFactory" destroy-method="close">
        <property name="service" ref="userService"></property>
        <property name="configPath" value="UserServiceImpl"></property>
        <property name="port" value="9090"></property>
        <property name="addressReporter" ref="serviceAddressReporter"></property>
    </bean>
View Code

   2、服務調用端

      連接池實現  

     杯了個具,為啥就不能提交。代碼在評論中。

   連接池工廠,負責與Thrift server通信 

package com.wy.thriftpool.commzkconnpool;

import java.net.InetSocketAddress;

import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.wy.thriftpool.commzkpool.support.ThriftServerAddressProvider;

/**
 * Pool object factory that creates, validates and destroys the transports used to talk to a
 * thrift server.
 *
 * @author wy
 *
 */
public class ThriftPoolFactory implements PoolableObjectFactory<TTransport> {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    // Timeout passed to every TSocket this factory creates.
    public int timeOut;

    private final ThriftServerAddressProvider addressProvider;
    private PoolOperationCallBack callback;

    public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback) {
        super();
        this.addressProvider = addressProvider;
        this.callback = callback;
    }

    public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback, int timeOut) {
        super();
        this.addressProvider = addressProvider;
        this.callback = callback;
        this.timeOut = timeOut;
    }

    /**
     * Opens a transport to the server address chosen by the address provider and notifies the
     * callback, if any.
     *
     * @return the opened transport
     * @throws RuntimeException wrapping any failure; a transport that was already opened is
     *         closed before the exception is thrown
     */
    @Override
    public TTransport makeObject() throws Exception {
        TTransport transport = null;
        try {
            InetSocketAddress address = addressProvider.selector();
            transport = new TSocket(address.getHostName(), address.getPort(), this.timeOut);
            transport.open();
            if (callback != null) {
                callback.make(transport);
            }
            return transport;
        } catch (Exception e) {
            logger.error("creat transport error:", e);
            // The pool never sees this transport, so it has to be released here.
            if (transport != null && transport.isOpen()) {
                transport.close();
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Notifies the callback, if any, and then closes the transport if it is still open.
     *
     * @param transport transport being evicted from the pool; {@code null} is ignored
     */
    @Override
    public void destroyObject(TTransport transport) throws Exception {
        if (transport == null) {
            return;
        }
        try {
            if (callback != null) {
                callback.destroy(transport);
            }
        } finally {
            // Close even when the callback fails.
            if (transport.isOpen()) {
                transport.close();
            }
        }
    }

    /**
     * Checks whether the pool may hand out the transport.
     *
     * @return {@code true} only for an open {@link TSocket}; {@code false} otherwise, including
     *         when the check itself throws
     */
    @Override
    public boolean validateObject(TTransport transport) {
        try {
            return transport instanceof TSocket && transport.isOpen();
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void activateObject(TTransport obj) throws Exception {
        // Nothing to do when a transport is borrowed.
    }

    @Override
    public void passivateObject(TTransport obj) throws Exception {
        // Nothing to do when a transport is returned.
    }

    /** Hooks invoked around the lifecycle of pooled transports. */
    public static interface PoolOperationCallBack {
        // Invoked after a transport has been created successfully.
        void make(TTransport transport);

        // Invoked before a transport is destroyed.
        void destroy(TTransport transport);
    }
}
View Code

      連接池管理 

package com.wy.thriftpool.commzkconnpool;

import org.apache.thrift.transport.TSocket;  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Service;  

  
/**
 * Connection pool manager: hands out one pooled socket per thread.
 *
 * <p>A socket obtained through {@link #getSocket()} stays bound to the calling thread until
 * {@link #returnSocket()} is called; callers should release it in a {@code finally} block once
 * the RPC call is done.
 *
 * @author wy
 *
 */
@Service
public class ConnectionManager {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    // Socket currently borrowed by the calling thread, if any.
    ThreadLocal<TSocket> socketThreadSafe = new ThreadLocal<TSocket>();

    // Pool the connections are borrowed from.
    @Autowired
    private ConnectionProvider connectionProvider;

    /**
     * Returns the socket bound to the current thread, borrowing one from the pool if needed.
     *
     * <p>The socket is NOT handed back to the pool here: doing so before the caller has used it
     * would let another thread borrow the same socket concurrently.
     *
     * @return the socket, or {@code null} if borrowing failed (the failure is logged)
     */
    public TSocket getSocket() {
        TSocket bound = socketThreadSafe.get();
        if (bound != null) {
            if (bound.isOpen()) {
                return bound;
            }
            // The bound socket is closed: hand it back and borrow a fresh one.
            returnSocket();
        }
        try {
            TSocket socket = connectionProvider.getConnection();
            if (socket != null) {
                socketThreadSafe.set(socket);
            }
            return socket;
        } catch (Exception e) {
            logger.error("error ConnectionManager.invoke()", e);
            return null;
        }
    }

    /**
     * Hands the current thread's socket back to the pool; does nothing if the thread holds none.
     */
    public void returnSocket() {
        TSocket bound = socketThreadSafe.get();
        if (bound == null) {
            return;
        }
        // Unbind first so the thread never keeps a reference to a socket it no longer owns.
        socketThreadSafe.remove();
        connectionProvider.returnCon(bound);
    }

}
View Code

      spring配置文件  

<!-- ZooKeeper client: connects to 127.0.0.1:2181 with namespace thrift/thrift-service; close() is called on context shutdown -->
    <bean id="thriftZookeeper" class="com.wy.thriftpool.commzkpool.zookeeper.ZookeeperFactory" destroy-method="close">
        <property name="connectString" value="127.0.0.1:2181" />
        <property name="namespace" value="thrift/thrift-service" />
    </bean>
    <!-- Client-side connection provider; pool limits, timeout and validation flags are set below -->
    <bean id="connectionProvider" class="com.wy.thriftpool.commzkconnpool.impl.ConnectionProviderImpl">
        <property name="maxActive" value="10" />
        <property name="maxIdle" value="10" />
        <property name="conTimeOut" value="2000" />
        <property name="testOnBorrow" value="true" />
        <property name="testOnReturn" value="true" />
        <property name="testWhileIdle" value="true" />
        
        <!-- Server addresses are looked up under UserServiceImpl through the ZooKeeper client above -->
        <property name="addressProvider">
            <bean class="com.wy.thriftpool.commzkpool.support.impl.DynamicAddressProvider">
                <property name="configPath" value="UserServiceImpl" />
                <property name="zookeeper" ref="thriftZookeeper" />
            </bean>
        </property>
    </bean>
View Code

 

參考:http://www.cnblogs.com/mumuxinfei/p/3876187.html

 

由於本人經驗有限,文章中難免會有錯誤,請瀏覽文章的您指正或有不同的觀點共同探討!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM