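This article does not discuss whether Thrift has a future. The focus is on how to use commons-pool2.
Background: I have recently been maintaining one of the company's legacy projects. Its purpose is to let our in-house products interact with a purchased product, and its main features are importing users, positions, permissions and so on. Because the purchased product has its own cache, we cannot modify data by importing it straight into the database and must go through the product's internal methods instead. The company chose Thrift for remote calls: we add a jar to the purchased product and deploy a Thrift server there, which accepts client requests and then calls the internal interface functions to do the work.
After taking over the project, I found that the code maintains a single long-lived org.apache.thrift.transport.TSocket connection, and every thread uses that connection to transfer data. I wondered: if all threads share one TSocket client, won't things go wrong under high concurrency? Let's test it directly with code (Thrift version 0.13.0).
Server code: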
package com.zhi.demo.server;

import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zhi.demo.api.HelloWorld;

/**
 * Thrift server example
 *
 * @author 張遠志
 * @since 2020-05-27 15:53:29
 */
public class ThriftServer {
    private final static Logger logger = LoggerFactory.getLogger(ThriftServer.class);

    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void main(String[] args) {
        try {
            TServerSocket serverTransport = new TServerSocket(9090);
            TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
            TProcessor processor = new HelloWorld.Processor(new HelloWorldImpl());

            TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
            serverArgs.processor(processor);
            serverArgs.protocolFactory(proFactory);
            serverArgs.maxWorkerThreads(100); // upper bound on concurrently served connections

            logger.info("Thrift server started, listening on port 9090");
            TServer server = new TThreadPoolServer(serverArgs);
            server.serve(); // blocks until the server is stopped
        } catch (Throwable e) {
            logger.error("RPC service error", e);
        }
    }
}
Client test code:
package com.zhi.demo.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.TestMethodOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zhi.demo.api.HelloWorld;

/**
 * TSocket test
 *
 * @author 張遠志
 * @since 2020-05-30 16:16:47
 */
@TestInstance(Lifecycle.PER_CLASS)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class TSocketTest {
    private final static Logger logger = LoggerFactory.getLogger(TSocketTest.class);
    private int jobCount = 10;
    private ExecutorService executor;

    /**
     * Initialization
     */
    @BeforeAll
    public void init() {
        executor = Executors.newFixedThreadPool(jobCount);
    }

    @Test
    public void test() throws InterruptedException, TTransportException {
        CountDownLatch latch = new CountDownLatch(jobCount);
        // One transport deliberately shared by every job, to reproduce the problem
        TTransport transport = new TSocket("127.0.0.1", 9090);
        transport.open();
        for (int i = 0; i < jobCount; i++) {
            executor.execute(new Job(latch, transport));
        }
        latch.await();
        transport.close();
    }

    class Job implements Runnable {
        CountDownLatch latch;
        TTransport transport;

        public Job(CountDownLatch latch, TTransport transport) {
            this.latch = latch;
            this.transport = transport;
        }

        @Override
        public void run() {
            try {
                HelloWorld.Client client = new HelloWorld.Client(new TBinaryProtocol(transport));
                client.sayHello("Zhang San");
            } catch (TException e) {
                logger.error("RPC call failed", e);
            } finally {
                // Count down even on unexpected runtime errors so the test cannot hang
                latch.countDown();
            }
        }
    }
}
Sure enough, as soon as a few threads run, the Thrift server reports an error:
21:48:51.553 [pool-1-thread-2] ERROR org.apache.thrift.server.TThreadPoolServer - Thrift Error occurred during processing of message.
org.apache.thrift.protocol.TProtocolException: Negative length: -2147418111
    at org.apache.thrift.protocol.TBinaryProtocol.checkStringReadLength(TBinaryProtocol.java:434)
    at org.apache.thrift.protocol.TBinaryProtocol.readString(TBinaryProtocol.java:396)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:249)
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:313)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
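The odd-looking number in that exception is worth decoding. The sketch below is only an illustration: the class name HeaderDecode is made up, and the constants are copied from Thrift's Java TBinaryProtocol and TMessageType. It shows that -2147418111 is the bit pattern 0x80010001, which is what a strict-mode message header for a CALL looks like. That is consistent with two threads writing their headers back to back on the shared socket, so the server read the second header where it expected the length of the method name. This is a likely reading of the trace, not a proven diagnosis.

```java
class HeaderDecode {
    // Values as defined in Thrift's Java TBinaryProtocol and TMessageType
    static final int VERSION_MASK = 0xffff0000;
    static final int VERSION_1 = 0x80010000;
    static final int CALL = 1;

    // Classifies a 32-bit word the way a strict-mode reader would
    static String describe(int word) {
        if ((word & VERSION_MASK) == VERSION_1) {
            return "strict header, message type " + (word & 0xff);
        }
        return "not a strict header";
    }

    public static void main(String[] args) {
        int fromLog = -2147418111; // the "Negative length" in the server log
        System.out.println(Integer.toHexString(fromLog)); // prints 80010001
        System.out.println(describe(fromLog));            // prints strict header, message type 1
        System.out.println((VERSION_1 | CALL) == fromLog); // prints true
    }
}
```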
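To solve this, each thread needs its own TSocket connection. But that raises another problem: under high concurrency the load on the server grows, and creating a connection takes time. Is there a way to pool TSocket objects, so that one is borrowed from the pool when needed and returned to the pool after use? I thought of commons-pool2. To implement pooling, I searched online for material and studied how Jedis does it, and eventually got TSocket pooling to work.
Step 1: Adapt TSocket so that close() does not close the connection directly but returns it to the pool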
package com.zhi.thrift.pool;

import org.apache.commons.pool2.ObjectPool;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Poolable TSocket connection
 *
 * @author 張遠志
 * @since 2020-05-27 20:29:39
 */
public class PoolableTSocket extends TSocket {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private ObjectPool<PoolableTSocket> pool;
    private boolean broken = false;

    public PoolableTSocket(String host, int port, int timeout) {
        super(host, port, timeout);
    }

    public void setPool(ObjectPool<PoolableTSocket> pool) {
        this.pool = pool;
    }

    /**
     * Overrides close(): when a pool is set, the socket is not closed directly
     * but returned to the pool (or invalidated if it is broken)
     */
    @Override
    public void close() {
        logger.trace("{} PoolableTSocket object, {}",
                pool == null ? "Closing" : (isBroken() ? "Destroying" : "Returning"), this);
        if (pool == null) {
            super.close();
        } else if (isBroken()) {
            try {
                pool.invalidateObject(this);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } else {
            try {
                pool.returnObject(this);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public boolean isBroken() {
        return broken;
    }

    /**
     * The real close, used by the pool factory when destroying the object
     */
    protected void reallyClose() {
        super.close();
    }

    @Override
    public void flush() throws TTransportException {
        try {
            super.flush();
        } catch (TTransportException e) {
            broken = true; // a failed flush marks the connection as unusable
            throw e;
        }
    }
}
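The idea behind this step can be shown in isolation. The following is a minimal sketch using only the JDK; it is not the PoolableTSocket code. PooledHandle and CloseReturnsToPoolDemo are hypothetical names, and a BlockingQueue stands in for the commons-pool2 pool. It illustrates why overriding close() fits well with try-with-resources: a healthy object goes back to the pool automatically, while a broken one is discarded.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a pooled connection (not a Thrift or commons-pool2 class)
class PooledHandle implements AutoCloseable {
    private final BlockingQueue<PooledHandle> owner; // the queue plays the role of the pool
    private boolean broken = false;
    private boolean reallyClosed = false;

    PooledHandle(BlockingQueue<PooledHandle> owner) {
        this.owner = owner;
    }

    void markBroken() {
        broken = true;
    }

    boolean isReallyClosed() {
        return reallyClosed;
    }

    @Override
    public void close() {
        if (broken) {
            reallyClosed = true; // broken: release it instead of returning it
        } else {
            owner.offer(this);   // healthy: hand it back for reuse
        }
    }
}

class CloseReturnsToPoolDemo {
    // Borrows the single handle, optionally breaks it, and reports how many are idle afterwards
    static int idleAfterUse(boolean fail) {
        BlockingQueue<PooledHandle> pool = new ArrayBlockingQueue<>(1);
        pool.offer(new PooledHandle(pool));
        try (PooledHandle handle = pool.poll()) { // "borrow"
            if (fail) {
                handle.markBroken();
            }
        } // close() runs here and decides between return and discard
        return pool.size();
    }

    public static void main(String[] args) {
        System.out.println(idleAfterUse(false)); // prints 1
        System.out.println(idleAfterUse(true));  // prints 0
    }
}
```

A real pool also has to create replacements, enforce limits and evict idle objects, which is exactly the part delegated to commons-pool2 in this article.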
Step 2: Create the pooled-object factory
package com.zhi.thrift.pool;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory for pooled PoolableTSocket objects
 *
 * @author 張遠志
 * @since 2020-05-27 17:12:46
 */
public class PooledTSocketFactory extends BasePooledObjectFactory<PoolableTSocket> {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private volatile ObjectPool<PoolableTSocket> pool;
    private String host;
    private int port;
    private int timeout;

    public PooledTSocketFactory(String host, int port, int timeout) {
        this.host = host;
        this.port = port;
        this.timeout = timeout;
    }

    @Override
    public PoolableTSocket create() throws Exception {
        PoolableTSocket socket = new PoolableTSocket(host, port, timeout);
        socket.open();
        socket.setPool(pool); // assign the pool only after the connection has been opened successfully
        logger.trace("Created PoolableTSocket object, {}, local port: {}", socket,
                socket.getSocket().getLocalPort());
        return socket;
    }

    @Override
    public PooledObject<PoolableTSocket> wrap(PoolableTSocket socket) {
        return new DefaultPooledObject<PoolableTSocket>(socket);
    }

    @Override
    public void destroyObject(PooledObject<PoolableTSocket> p) throws Exception {
        PoolableTSocket socket = p.getObject();
        logger.trace("Destroying PoolableTSocket object, {}", socket);
        if (socket.isOpen()) {
            socket.reallyClose(); // bypass the overridden close() and really close the socket
        }
        p.markAbandoned();
    }

    public synchronized void setPool(final ObjectPool<PoolableTSocket> pool) {
        if (null != this.pool && pool != this.pool) {
            try {
                this.pool.close(); // close the previous pool before switching to the new one
            } catch (final Exception e) {
                // ignored: the old pool is being discarded anyway
            }
        }
        this.pool = pool;
    }
}
Step 3: Create the pool
package com.zhi.thrift.pool;

import java.util.NoSuchElementException;

import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;

/**
 * PoolableTSocket connection pool
 *
 * @author 張遠志
 * @since 2020-05-30 01:35:02
 */
public class TSocketPool implements ObjectPool<PoolableTSocket> {
    private final GenericObjectPool<PoolableTSocket> pool;

    public TSocketPool() {
        this("127.0.0.1", 9090, 2000);
    }

    public TSocketPool(String host, int port, int timeout) {
        this(new TSocketPoolConfig(), host, port, timeout);
    }

    public TSocketPool(TSocketPoolConfig config, String host, int port, int timeout) {
        PooledTSocketFactory factory = new PooledTSocketFactory(host, port, timeout);
        pool = new GenericObjectPool<>(factory, config);
        factory.setPool(pool); // lets each created socket return itself to this pool
    }

    public GenericObjectPool<PoolableTSocket> getPool() {
        return pool;
    }

    @Override
    public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
        pool.addObject();
    }

    @Override
    public PoolableTSocket borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
        return pool.borrowObject();
    }

    @Override
    public void clear() throws Exception, UnsupportedOperationException {
        pool.clear();
    }

    @Override
    public void close() {
        pool.close();
    }

    @Override
    public int getNumActive() {
        return pool.getNumActive();
    }

    @Override
    public int getNumIdle() {
        return pool.getNumIdle();
    }

    @Override
    public void invalidateObject(PoolableTSocket obj) throws Exception {
        pool.invalidateObject(obj);
    }

    @Override
    public void returnObject(PoolableTSocket obj) throws Exception {
        pool.returnObject(obj);
    }
}
Step 4: Define your own pool configuration
package com.zhi.thrift.pool;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

/**
 * Configuration for the PoolableTSocket pool; the defaults follow JedisPoolConfig
 *
 * @author 張遠志
 * @since 2020-05-27 21:08:40
 */
public class TSocketPoolConfig extends GenericObjectPoolConfig<PoolableTSocket> {
    public TSocketPoolConfig() {
        setMaxTotal(20); // must not exceed the Thrift server's worker thread pool size
        setTestWhileIdle(true);
        setMinEvictableIdleTimeMillis(60000);    // idle longer than 60 s: eligible for eviction
        setTimeBetweenEvictionRunsMillis(30000); // the evictor runs every 30 s
        setNumTestsPerEvictionRun(-1);           // examine every idle object on each run
    }
}
With these four simple steps the TSocket pooling is complete, and we can now verify that it works:
package com.zhi.demo.client;

import java.util.Random;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zhi.demo.api.HelloWorld;
import com.zhi.thrift.pool.PoolableTSocket;
import com.zhi.thrift.pool.TSocketPool;

/**
 * Test of pooled Thrift connections
 *
 * @author 張遠志
 * @since 2020-05-30 01:32:15
 */
@TestInstance(Lifecycle.PER_CLASS)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class MultiClient {
    private final static Logger logger = LoggerFactory.getLogger(MultiClient.class);
    private TSocketPool pool;
    private volatile boolean flag = false; // set once at least one thread has borrowed a connection

    @Test
    public void test() throws Exception {
        pool = new TSocketPool();
        int jobCount = 20;
        for (int i = 0; i < jobCount; i++) {
            new Job().start();
        }
        while (true) {
            // Important: snapshot flag before reading the counters instead of using it directly in the exit check
            boolean tem = flag;
            int active = pool.getNumActive(), idle = pool.getNumIdle();
            logger.info("active={}, idle={}", active, idle);
            if (tem && active == 0 && idle == pool.getPool().getMinIdle()) {
                break;
            }
            Thread.sleep(1000);
        }
        logger.info("All pooled resources have been released; the pool can be closed");
        pool.close();
    }

    class Job extends Thread {
        @Override
        public void run() {
            MultiClient.sleep(new Random().nextInt(5) * 1000); // spread the borrow requests over time
            // Whatever is borrowed must be returned, otherwise the object stays
            // occupied and other threads cannot obtain it
            try (PoolableTSocket socket = pool.borrowObject()) {
                flag = true;
                TProtocol protocol = new TBinaryProtocol(socket);
                HelloWorld.Client client = new HelloWorld.Client(protocol);
                client.sayHello("Zhang San");
                MultiClient.sleep(1000); // simulate holding the resource for a while
            } catch (Exception e) {
                logger.error("Thrift remote call failed", e);
            }
        }
    }

    static void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}
Test result:
22:22:19.233 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=0
22:22:20.249 [main] INFO com.zhi.demo.client.MultiClient - active=6, idle=0
22:22:21.249 [main] INFO com.zhi.demo.client.MultiClient - active=5, idle=3
22:22:22.250 [main] INFO com.zhi.demo.client.MultiClient - active=3, idle=5
22:22:23.251 [main] INFO com.zhi.demo.client.MultiClient - active=6, idle=2
22:22:24.252 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=8
22:22:25.252 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=8
...
22:23:48.300 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=8
22:23:49.301 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=0
22:23:49.301 [main] INFO com.zhi.demo.client.MultiClient - All pooled resources have been released; the pool can be closed
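The jump from idle=8 to idle=0 at 22:23:49 lines up with the eviction settings in TSocketPoolConfig (an evictor run every 30 s, objects evictable once idle for more than 60 s). The sketch below redoes that arithmetic with the JDK only. It rests on assumptions read off the log, not on measurements: the pool was created at about 22:22:19, the last connection was returned at about 22:22:24, and the first evictor run happens one period after pool creation. EvictionTimeline is a made-up name.

```java
import java.time.Duration;
import java.time.LocalTime;

class EvictionTimeline {
    // Returns the first evictor run at which an object idle since idleSince
    // has been idle for strictly longer than minIdle
    static LocalTime firstEviction(LocalTime poolCreated, LocalTime idleSince,
                                   Duration period, Duration minIdle) {
        LocalTime run = poolCreated.plus(period); // assumed: first run is one period after creation
        while (Duration.between(idleSince, run).compareTo(minIdle) <= 0) {
            run = run.plus(period);
        }
        return run;
    }

    public static void main(String[] args) {
        LocalTime created = LocalTime.of(22, 22, 19);   // approximate, taken from the first log line
        LocalTime idleSince = LocalTime.of(22, 22, 24); // approximate time of the last return
        LocalTime evictedAt = firstEviction(created, idleSince,
                Duration.ofSeconds(30), Duration.ofSeconds(60));
        System.out.println(evictedAt); // prints 22:23:49
    }
}
```

Under these assumptions, the runs at 22:22:49 and 22:23:19 find the connections idle for only about 25 s and 55 s, so nothing is removed until the third run.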
It works well and does exactly what I needed.