Building a Thrift Client Connection Pool with commons-pool2


  This article does not discuss whether Thrift has a future; the focus is on how to use commons-pool2.

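  Background: I have recently been maintaining one of my company's legacy projects. Its purpose is to let our in-house products interact with a purchased third-party product; the main functions are importing users, positions, permissions, and so on. Because the purchased product caches its data, we cannot modify data by importing directly into its database and must go through its internal methods instead. The company chose Thrift for remote interface calls: we add a jar to the purchased product and deploy a Thrift server inside it, which accepts client requests and then calls the product's internal functions to do the work.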

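  After taking over the project, I found that the code maintains a single long-lived org.apache.thrift.transport.TSocket connection, and every thread uses it to transfer data. I wondered: if all threads share one TSocket client, won't things go wrong under high concurrency? Let's test it directly with code (Thrift version 0.13.0).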

  Server code:

package com.zhi.demo.server;

import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zhi.demo.api.HelloWorld;

/**
 * Thrift server example
 * 
 * @author 張遠志
 * @since 2020-05-27 15:53:29
 *
 */
public class ThriftServer {
    private final static Logger logger = LoggerFactory.getLogger(ThriftServer.class);

    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void main(String[] args) {
        try {
            TServerSocket serverTransport = new TServerSocket(9090);
            TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
            TProcessor processor = new HelloWorld.Processor(new HelloWorldImpl());

            TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
            serverArgs.processor(processor);
            serverArgs.protocolFactory(proFactory);
            serverArgs.maxWorkerThreads(100);

            logger.info("Thrift服務端啟動,監聽端口9090");

            TServer server = new TThreadPoolServer(serverArgs);
            server.serve();
        } catch (Throwable e) {
            logger.error("RPC服務報錯", e);
        }
    }
}

  Client test code:

package com.zhi.demo.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.TestMethodOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zhi.demo.api.HelloWorld;

/**
 * TSocket test<br>
 * 
 * @author 張遠志
 * @since 2020-05-30 16:16:47
 *
 */
@TestInstance(Lifecycle.PER_CLASS)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class TSocketTest {
    private final static Logger logger = LoggerFactory.getLogger(TSocketTest.class);
    private int jobCount = 10;
    private ExecutorService executor;

    /**
     * Initialization
     */
    @BeforeAll
    public void init() {
        executor = Executors.newFixedThreadPool(jobCount);
    }

    @Test
    public void test() throws InterruptedException, TTransportException {
        CountDownLatch latch = new CountDownLatch(jobCount);
        TTransport transport = new TSocket("127.0.0.1", 9090);
        transport.open();
        for (int i = 0; i < jobCount; i++) {
            executor.execute(new Job(latch, transport));
        }
        latch.await();
        transport.close();
    }

    class Job implements Runnable {
        CountDownLatch latch;
        TTransport transport;

        public Job(CountDownLatch latch, TTransport transport) {
            this.latch = latch;
            this.transport = transport;
        }

        @Override
        public void run() {
            try {
                HelloWorld.Client client = new HelloWorld.Client(new TBinaryProtocol(transport));
                client.sayHello("張三");
            } catch (TException e) {
                logger.error("調用RPC服務報錯", e);
            } finally {
                latch.countDown(); // always release the latch, even if an unchecked exception is thrown
            }
        }
    }
}

  Sure enough, as soon as a few threads run, the Thrift server reports an error:

21:48:51.553 [pool-1-thread-2] ERROR org.apache.thrift.server.TThreadPoolServer - Thrift Error occurred during processing of message.
org.apache.thrift.protocol.TProtocolException: Negative length: -2147418111
	at org.apache.thrift.protocol.TBinaryProtocol.checkStringReadLength(TBinaryProtocol.java:434)
	at org.apache.thrift.protocol.TBinaryProtocol.readString(TBinaryProtocol.java:396)
	at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:249)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:313)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
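
  The number in the exception is itself a clue. -2147418111 is 0x80010001, which matches the header that the binary protocol writes at the start of a strict-mode message (version marker 0x80010000 combined with message type CALL = 1). The server received a second message header at the point where it expected the length of the method name, which is what you would see if two threads interleaved their writes on the same socket. The sketch below replays that interleaving deterministically with plain java.io streams. It does not use Thrift itself, and the class and method names are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: replays two clients interleaving writes on one stream. */
final class InterleavedWriteDemo {
    static final int VERSION_1 = 0x80010000; // strict-mode version marker of the binary protocol
    static final int CALL = 1;               // message type of a request

    /** Returns the int that the reader would take as the method-name length. */
    static int lengthSeenByServer() {
        try {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(wire);
            byte[] name = "sayHello".getBytes(StandardCharsets.UTF_8);

            out.writeInt(VERSION_1 | CALL); // thread A starts its message
            out.writeInt(VERSION_1 | CALL); // thread B starts its message before A continues
            out.writeInt(name.length);      // thread A writes the name length, too late
            out.write(name);

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
            in.readInt();        // the server consumes A's header
            return in.readInt(); // it expects the name length but gets B's header
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int length = lengthSeenByServer();
        System.out.println(length + " = " + String.format("0x%08X", length));
        // prints "-2147418111 = 0x80010001"
    }
}
```

  The value is the same one reported in the stack trace above, so the failure is consistent with interleaved writes rather than with a defect in the server.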

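  To solve this problem, each thread must get its own TSocket connection. But that raises another problem: under high concurrency the load on the server grows, and creating a connection takes time. Is there a way to pool TSocket connections, taking one from the pool when needed and putting it back after use? I thought of commons-pool2. To implement pooling, I looked up some material on Baidu and studied how Jedis does it, and eventually got TSocket pooling working.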

  Step 1: Adapt TSocket so that close() does not close the connection directly but returns it to the pool

package com.zhi.thrift.pool;

import org.apache.commons.pool2.ObjectPool;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A poolable TSocket connection
 * 
 * @author 張遠志
 * @since 2020-05-27 20:29:39
 *
 */
public class PoolableTSocket extends TSocket {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private ObjectPool<PoolableTSocket> pool;
    private boolean broken = false;

    public PoolableTSocket(String host, int port, int timeout) {
        super(host, port, timeout);
    }

    public void setPool(ObjectPool<PoolableTSocket> pool) {
        this.pool = pool;
    }

    /**
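     * Overrides close(): when a pool is set, the socket is returned to the pool (or invalidated if broken) instead of being closed directly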
     */
    @Override
    public void close() {
        logger.trace("{}PoolableTSocket對象,{}", pool == null ? "關閉" : (isBroken() ? "銷毀" : "歸還"), this);
        if (pool == null) {
            super.close();
        } else if (isBroken()) {
            try {
                pool.invalidateObject(this);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } else {
            try {
                pool.returnObject(this);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public boolean isBroken() {
        return broken;
    }

    /**
     * Actually closes the underlying socket
     */
    protected void reallyClose() {
        super.close();
    }

    @Override
    public void flush() throws TTransportException {
        try {
            super.flush();
        } catch (TTransportException e) {
            broken = true;
            throw e;
        }
    }
}
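
  The core trick in this step is that close() no longer closes: it hands the object back to whoever lent it out, so callers can keep using try-with-resources. Stripped of Thrift and commons-pool2, the idea can be sketched with a plain queue standing in for the pool. The names PooledHandle and TinyPool are hypothetical, and a real pool (validation, eviction, size limits) does much more than this.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical stand-in for a pooled connection. */
final class PooledHandle implements AutoCloseable {
    private final BlockingQueue<PooledHandle> owner;
    private boolean broken = false;
    private boolean reallyClosed = false;

    PooledHandle(BlockingQueue<PooledHandle> owner) {
        this.owner = owner;
    }

    void markBroken() { broken = true; }

    boolean isReallyClosed() { return reallyClosed; }

    /** Healthy handles go back to the queue; broken ones are discarded for real. */
    @Override
    public void close() {
        if (broken) {
            reallyClosed = true;
        } else {
            owner.offer(this);
        }
    }
}

/** Hypothetical fixed-size pool backed by a queue of idle handles. */
final class TinyPool {
    private final BlockingQueue<PooledHandle> idle;

    TinyPool(int size) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(new PooledHandle(idle));
        }
    }

    PooledHandle borrow() {
        PooledHandle handle = idle.poll();
        if (handle == null) {
            throw new NoSuchElementException("pool exhausted");
        }
        return handle;
    }

    int idleCount() { return idle.size(); }

    public static void main(String[] args) {
        TinyPool pool = new TinyPool(2);
        try (PooledHandle handle = pool.borrow()) {
            System.out.println("idle while borrowed: " + pool.idleCount()); // 1
        }
        System.out.println("idle after close(): " + pool.idleCount()); // 2
    }
}
```

  The broken flag plays the same role as in PoolableTSocket: an object that failed during use should not be handed to the next caller.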

  Step 2: Create the pooled-object factory

package com.zhi.thrift.pool;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory for pooled PoolableTSocket objects
 * 
 * @author 張遠志
 * @since 2020-05-27 17:12:46
 *
 */
public class PooledTSocketFactory extends BasePooledObjectFactory<PoolableTSocket> {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private volatile ObjectPool<PoolableTSocket> pool;
    private String host;
    private int port;
    private int timeout;

    public PooledTSocketFactory(String host, int port, int timeout) {
        this.host = host;
        this.port = port;
        this.timeout = timeout;
    }

    @Override
    public PoolableTSocket create() throws Exception {
        PoolableTSocket socket = new PoolableTSocket(host, port, timeout);
        socket.open();
        socket.setPool(pool); // assign the pool only after the connection has been opened successfully
        logger.trace("成功創建PoolableTSocket對象,{},本地端口:{}", socket, socket.getSocket().getLocalPort());
        return socket;
    }

    @Override
    public PooledObject<PoolableTSocket> wrap(PoolableTSocket socket) {
        return new DefaultPooledObject<PoolableTSocket>(socket);
    }

    @Override
    public void destroyObject(PooledObject<PoolableTSocket> p) throws Exception {
        PoolableTSocket socket = p.getObject();
        logger.trace("銷毀PoolableTSocket對象,{}", socket);
        if (socket.isOpen()) {
            socket.reallyClose();
        }
        p.markAbandoned();
    }

    public synchronized void setPool(final ObjectPool<PoolableTSocket> pool) {
        if (null != this.pool && pool != this.pool) {
            try {
                this.pool.close();
            } catch (final Exception e) {
                // best effort: ignore failures while closing the previous pool
            }
        }
        this.pool = pool;
    }

}

  Step 3: Create the pool

package com.zhi.thrift.pool;

import java.util.NoSuchElementException;

import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;

/**
 * PoolableTSocket connection pool
 * 
 * @author 張遠志
 * @since 2020-05-30 01:35:02
 *
 */
public class TSocketPool implements ObjectPool<PoolableTSocket> {
    private final GenericObjectPool<PoolableTSocket> pool;

    public TSocketPool() {
        this("127.0.0.1", 9090, 2000);
    }

    public TSocketPool(String host, int port, int timeout) {
        this(new TSocketPoolConfig(), host, port, timeout);
    }

    public TSocketPool(TSocketPoolConfig config, String host, int port, int timeout) {
        PooledTSocketFactory factory = new PooledTSocketFactory(host, port, timeout);
        pool = new GenericObjectPool<>(factory, config);
        factory.setPool(pool);
    }

    public GenericObjectPool<PoolableTSocket> getPool() {
        return pool;
    }

    @Override
    public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
        pool.addObject();
    }

    @Override
    public PoolableTSocket borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
        return pool.borrowObject();
    }

    @Override
    public void clear() throws Exception, UnsupportedOperationException {
        pool.clear();
    }

    @Override
    public void close() {
        pool.close();
    }

    @Override
    public int getNumActive() {
        return pool.getNumActive();
    }

    @Override
    public int getNumIdle() {
        return pool.getNumIdle();
    }

    @Override
    public void invalidateObject(PoolableTSocket obj) throws Exception {
        pool.invalidateObject(obj);
    }

    @Override
    public void returnObject(PoolableTSocket obj) throws Exception {
        pool.returnObject(obj);
    }
}

  Step 4: Define a custom pool configuration

package com.zhi.thrift.pool;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

/**
 * Configuration for the PoolableTSocket pool; the defaults follow JedisPoolConfig
 * 
 * @author 張遠志
 * @since 2020-05-27 21:08:40
 *
 */
public class TSocketPoolConfig extends GenericObjectPoolConfig<PoolableTSocket> {
    public TSocketPoolConfig() {
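        setMaxTotal(20); // must not exceed the Thrift server's thread pool size
        setTestWhileIdle(true); // only effective if the factory overrides validateObject(); the inherited default returns true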
        setMinEvictableIdleTimeMillis(60000);
        setTimeBetweenEvictionRunsMillis(30000);
        setNumTestsPerEvictionRun(-1);
    }
}

  With these four simple steps the TSocket pooling is done. Now we can verify that it works:

package com.zhi.demo.client;

import java.util.Random;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zhi.demo.api.HelloWorld;
import com.zhi.thrift.pool.PoolableTSocket;
import com.zhi.thrift.pool.TSocketPool;

/**
 * Thrift pooled connection test
 * 
 * @author 張遠志
 * @since 2020-05-30 01:32:15
 *
 */
@TestInstance(Lifecycle.PER_CLASS)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class MultiClient {
    private final static Logger logger = LoggerFactory.getLogger(MultiClient.class);
    private TSocketPool pool;
    private volatile boolean flag = false; // set once at least one thread has borrowed a connection

    @Test
    public void test() throws Exception {
        pool = new TSocketPool();

        int jobCount = 20;
        for (int i = 0; i < jobCount; i++) {
            new Job().start();
        }
        while (true) {
            boolean tem = flag; // important: snapshot flag before reading the counters; do not read flag directly in the exit condition
            int active = pool.getNumActive(), idle = pool.getNumIdle();
            logger.info("active={}, idle={}", active, idle);
            if (tem && active == 0 && idle == pool.getPool().getMinIdle()) {
                break;
            }
            Thread.sleep(1000);
        }
        logger.info("池中資源已完成釋放,可以關閉池了");
        pool.close();
    }

    class Job extends Thread {
        @Override
        public void run() {
            MultiClient.sleep(new Random().nextInt(5) * 1000); // stagger the borrow requests over time
            // Whatever is borrowed must be returned; otherwise the object stays occupied and other threads cannot borrow it
            try (PoolableTSocket socket = pool.borrowObject()) {
                flag = true;
                TProtocol protocol = new TBinaryProtocol(socket);
                HelloWorld.Client client = new HelloWorld.Client(protocol);
                client.sayHello("張三");
                MultiClient.sleep(1000); // simulate holding the resource for a while
            } catch (Exception e) {
                logger.error("調用Thrift遠程接口失敗", e);
            }
        }
    }

    static void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            // ignored for this demo
        }
    }
}
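
  If the goal is only to wait until every worker has finished, rather than to watch the evictor drain the pool, the polling loop and the volatile flag could be replaced by an ExecutorService. A minimal sketch, with the RPC call replaced by a counter so that it runs without a server; WaitForJobs and runJobs are hypothetical names and not part of the project above:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: wait for all workers without polling a flag. */
final class WaitForJobs {
    /** Runs jobCount tasks and returns how many completed before the timeout. */
    static int runJobs(int jobCount) {
        AtomicInteger done = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(jobCount);
        for (int i = 0; i < jobCount; i++) {
            // Placeholder for: borrow a socket, call the service, return the socket
            executor.execute(done::incrementAndGet);
        }
        executor.shutdown(); // accept no new tasks; already submitted ones still run
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("finished jobs: " + runJobs(20));
    }
}
```

  After runJobs returns, every borrowed object has been returned, so the pool can be closed immediately. The original loop waits longer on purpose, because it also shows idle connections being evicted.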

  Test results:

22:22:19.233 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=0
22:22:20.249 [main] INFO com.zhi.demo.client.MultiClient - active=6, idle=0
22:22:21.249 [main] INFO com.zhi.demo.client.MultiClient - active=5, idle=3
22:22:22.250 [main] INFO com.zhi.demo.client.MultiClient - active=3, idle=5
22:22:23.251 [main] INFO com.zhi.demo.client.MultiClient - active=6, idle=2
22:22:24.252 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=8
22:22:25.252 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=8
...
22:23:48.300 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=8
22:23:49.301 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=0
22:23:49.301 [main] INFO com.zhi.demo.client.MultiClient - 池中資源已完成釋放,可以關閉池了

  It works well and does exactly what I needed.
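
  The log also shows the eviction settings from TSocketPoolConfig at work: the eight idle connections stay in the pool from 22:22:24 until they all disappear at 22:23:49, roughly 90 seconds after the pool was created. That fits an evictor that runs every 30 seconds and removes objects that have been idle for more than 60 seconds. The arithmetic can be sketched as follows. This is a simplified model: it assumes the first evictor run happens one period after pool creation and ignores minIdle and the soft idle limit, and the class and method names are hypothetical.

```java
/** Hypothetical model of when the idle evictor removes an object. */
final class EvictionTiming {
    /**
     * Returns the time (ms after pool creation) of the first evictor run at which
     * an object idle since idleSinceMs has been idle for longer than minIdleMs.
     */
    static long firstEvictionRunMs(long idleSinceMs, long periodMs, long minIdleMs) {
        long run = periodMs; // assumed: runs happen at period, 2 * period, 3 * period, ...
        while (run - idleSinceMs <= minIdleMs) {
            run += periodMs;
        }
        return run;
    }

    public static void main(String[] args) {
        // A connection returned about 5 s after the pool was created, as in the log above:
        // idle for 25 s at the 30 s run, 55 s at the 60 s run, 85 s at the 90 s run.
        System.out.println(firstEvictionRunMs(5_000, 30_000, 60_000)); // prints 90000
    }
}
```

  Under these assumptions a connection returned in the first few seconds survives the runs at 30 s and 60 s and is removed at the 90 s run, which lines up with the timestamps in the log.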

