本文使用commons-pool2
来实现连接池应用
1、定义一个产生连接池的工厂,需要继承BasePooledObjectFactory,其用处是生产和销毁连接池中保存的对象。根据需求,现在池子里保存的应该是grpc客户端对象。
GrpcClientFactory类:
package com.oy.grpc; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import com.oy.grpc.client.GrpcClient; import com.oy.utils.UtilFunctions; public class GrpcClientFactory extends BasePooledObjectFactory<GrpcClient> { @Override public GrpcClient create() throws Exception { return new GrpcClient("localhost", 23333); } @Override public PooledObject<GrpcClient> wrap(GrpcClient client) { return new DefaultPooledObject<>(client); } @Override public void destroyObject(PooledObject<GrpcClient> p) throws Exception { UtilFunctions.log.info("==== GrpcClientFactory#destroyObject ===="); p.getObject().shutdown(); super.destroyObject(p); } }
2、连接池GrpcClientPool类
package com.oy.grpc; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import com.oy.grpc.client.GrpcClient; import com.oy.utils.UtilFunctions; public class GrpcClientPool { private static GenericObjectPool<GrpcClient> objectPool = null; static { GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); // 池中的最大连接数 poolConfig.setMaxTotal(8); // 最少的空闲连接数 poolConfig.setMinIdle(0); // 最多的空闲连接数 poolConfig.setMaxIdle(8); // 当连接池资源耗尽时,调用者最大阻塞的时间,超时时抛出异常 单位:毫秒数 poolConfig.setMaxWaitMillis(-1); // 连接池存放池化对象方式,true放在空闲队列最前面,false放在空闲队列最后 poolConfig.setLifo(true); // 连接空闲的最小时间,达到此值后空闲连接可能会被移除,默认即为30分钟 poolConfig.setMinEvictableIdleTimeMillis(1000L * 60L * 30L);// 连接耗尽时是否阻塞,默认为true poolConfig.setBlockWhenExhausted(true); objectPool = new GenericObjectPool<>(new GrpcClientFactory(), poolConfig); } public static GrpcClient borrowObject() { try { GrpcClient client = objectPool.borrowObject(); UtilFunctions.log.info("=======total threads created: " + objectPool.getCreatedCount()); return client; } catch (Exception e) { UtilFunctions.log.error("objectPool.borrowObject error, msg:{}, exception:{}", e.toString(), e); } return createClient(); } public static void returnObject(GrpcClient client) { try { objectPool.returnObject(client); } catch (Exception e) { UtilFunctions.log.error("objectPool.returnObject error, msg:{}, exception:{}", e.toString(), e); } } private static GrpcClient createClient() { return new GrpcClient("localhost", 23333); } }
3、客户端程序
这里仅仅简单列出了客户端GrpcClient类的代码,其他代码包括服务端代码见另一篇博客grpc(一)之helloworld。
package com.oy.grpc.client; import java.lang.reflect.Method; import java.util.concurrent.TimeUnit; import com.oy.grpc.BookServiceGrpc; import com.oy.grpc.GrpcClientPool; import com.oy.grpc.GrpcLib.GrpcReply; import com.oy.grpc.GrpcLib.addBookRequest; import com.oy.grpc.GrpcLib.getUserByIdRequest; import com.oy.grpc.UserServiceGrpc; import com.oy.utils.UtilFunctions; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; public class GrpcClient { public static String host = "localhost"; private final ManagedChannel channel; private final UserServiceGrpc.UserServiceBlockingStub userBlockingStub; private final BookServiceGrpc.BookServiceBlockingStub bookBlockingStub; public GrpcClient(String host, int port) { channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); userBlockingStub = UserServiceGrpc.newBlockingStub(channel); bookBlockingStub = BookServiceGrpc.newBlockingStub(channel); } public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(10, TimeUnit.SECONDS); } @SuppressWarnings({ "rawtypes" }) public static Object call(String rpcMethoddName, Object... args) throws Exception { UtilFunctions.log.info("=========== GrpcClient#call begin ==========="); GrpcClient client = null; try { client = GrpcClientPool.borrowObject(); // client = new GrpcClient(host, 23333); Class[] argsTypes = new Class[args.length]; for (int i = 0; i < args.length; i++) { UtilFunctions.log.info("args types: {}", args[i].getClass()); argsTypes[i] = args[i].getClass(); } Method method = client.getClass().getMethod(rpcMethoddName, argsTypes); Object result = method.invoke(client, args); UtilFunctions.log.info("=========== GrpcClient#call end ==========="); return result; } catch (Exception e) { UtilFunctions.log.error("GrpcClient#call error, msg:{}, exception:{}", e.toString(), e); return null; } finally { if (client != null) { GrpcClientPool.returnObject(client); // client.shutdown(); } } } // ============= User module ============= public Object getUserById(Integer id) { UtilFunctions.log.info("=========== GrpcClient#getUserById begin ==========="); getUserByIdRequest request = getUserByIdRequest.newBuilder().setId(id).build(); GrpcReply response; try { response = userBlockingStub.getUserById(request); UtilFunctions.log.info("GrpcClient#getUserById response, code:{}, data:{}", response.getCode(), response.getData()); } catch (StatusRuntimeException e) { UtilFunctions.log.error("GrpcClient#addBook error, msg:{}, exception:{}", e.toString(), e); return null; } return response; } // ============= Book module ============= public Object addBook(Integer id, String name, Double price) { UtilFunctions.log.info("=========== GrpcClient#addBook begin ==========="); addBookRequest request = addBookRequest.newBuilder().setId(id).setName(name).setPrice(price).build(); GrpcReply response; try { response = bookBlockingStub.addBook(request); UtilFunctions.log.info("GrpcClient#addBook response, code:{}, data:{}", response.getCode(), response.getData()); UtilFunctions.log.info("=========== GrpcClient#addBook end ==========="); } catch (StatusRuntimeException e) { UtilFunctions.log.error("GrpcClient#addBook error, msg:{}, exception:{}", e.toString(), e); return null; } return response; } }
4、客户端测试
package com.oy.grpc.client; import com.oy.grpc.GrpcClientPool; import com.oy.grpc.GrpcLib.GrpcReply; import com.oy.utils.UtilFunctions; public class TestService { public static void main(String[] args) throws Exception { for (int i = 0; i < 4; i++) { new Thread(new Runnable() { @Override public void run() { GrpcReply result = null; try { // result = (GrpcReply) GrpcClient.call("getUserById", Integer.valueOf("1")); // result = (GrpcReply) GrpcClient.call("getUserById", 2); result = (GrpcReply) GrpcClient.call("addBook", 1, "thinking in java", 50.0); } catch (Exception e) { e.printStackTrace(); } if (result != null) { UtilFunctions.log.info("client call interface, get code:{}, data:{}", result.getCode(), result.getData()); } // 如果注释掉下面两句,则客户端程序结束后,服务端报java.io.IOException: 远程主机强迫关闭了一个现有的连接。 // UtilFunctions.log.info("TestService#main: objectPool is closing..."); // GrpcClientPool.getObjectPool().close(); } }).start(); } } }
运行testService类的main()方法,客户端能正常调用grpc server得到数据,但是grpc服务端报错:
2019-04-11 14:29:30.458 INFO 1192 --- [-worker-ELG-3-1] i.g.n.NettyServerTransport.connections : Transport failed
java.io.IOException: 远程主机强迫关闭了一个现有的连接。
出现这个问题的原因:客户端强制断开连接。参考https://stackoverflow.com/questions/46802521/io-grpc-netty-nettyservertransport-notifyterminated,
我在GrpcClientFactory里面也实现了销毁方法:
@Override public void destroyObject(PooledObject<GrpcClient> p) throws Exception { UtilFunctions.log.info("==== GrpcClientFactory#destroyObject ===="); p.getObject().shutdown(); super.destroyObject(p); }
但是运行testService类的main()方法结束后服务端程序就结束了,程序没有主动调用destroyObject()方法销毁池子中的对象,所以grpcClient也没有shutdown,所以报错。
5、启动客户端springboot项目来测试
package com.oy; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import com.oy.grpc.client.TestService; @SpringBootApplication public class Grpc007ClientMainApplication { public static void main(String[] args) throws Exception { SpringApplication.run(Grpc007ClientMainApplication.class, args); TestService.main(args); } }
但是这样当关闭客户端程序,还是出现同样的问题。其实很好理解,因为关闭客户端程序时,池中的对象还处于空闲状态,没有销毁,destroyObject()方法没有调用,所以grpcClient也没有shutdown。
6、解决方法
客户端程序关闭时,池也要close。
package com.oy; import javax.annotation.PreDestroy; import org.apache.commons.pool2.impl.GenericObjectPool; import org.springframework.stereotype.Controller; import com.oy.grpc.GrpcClientPool; import com.oy.grpc.client.GrpcClient; import com.oy.utils.UtilFunctions; @Controller public class InitController { @PreDestroy public void destroy() { UtilFunctions.log.info("InitController#destroy running..."); GenericObjectPool<GrpcClient> objectPool = GrpcClientPool.getObjectPool(); UtilFunctions.log.info("InitController#destroy, total threads created: " + objectPool.getCreatedCount()); UtilFunctions.log.info("InitController#destroy objectPool is closing..."); objectPool.close(); } }