grpc(三)之grpc客戶端使用連接池


  本文使用commons-pool2來實現連接池應用

1、定義一個產生連接池的工廠,需要繼承BasePooledObjectFactory,其用處是生產和銷毀連接池中保存的對象。根據需求,現在池子里保存的應該是grpc客戶端對象。

  GrpcClientFactory類:

package com.oy.grpc;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import com.oy.grpc.client.GrpcClient;
import com.oy.utils.UtilFunctions;

public class GrpcClientFactory extends BasePooledObjectFactory<GrpcClient> {

    @Override
    public GrpcClient create() throws Exception {
        return new GrpcClient("localhost", 23333);
    }

    @Override
    public PooledObject<GrpcClient> wrap(GrpcClient client) {
        return new DefaultPooledObject<>(client);
    }

    @Override
    public void destroyObject(PooledObject<GrpcClient> p) throws Exception {
        UtilFunctions.log.info("==== GrpcClientFactory#destroyObject ====");
        p.getObject().shutdown();
        super.destroyObject(p);
    }

}

 

2、連接池GrpcClientPool類

package com.oy.grpc;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import com.oy.grpc.client.GrpcClient;
import com.oy.utils.UtilFunctions;

public class GrpcClientPool {

    private static GenericObjectPool<GrpcClient> objectPool = null;

    static {
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        // 池中的最大連接數
        poolConfig.setMaxTotal(8);
        // 最少的空閑連接數
        poolConfig.setMinIdle(0);
        // 最多的空閑連接數
        poolConfig.setMaxIdle(8);
        // 當連接池資源耗盡時,調用者最大阻塞的時間,超時時拋出異常 單位:毫秒數
        poolConfig.setMaxWaitMillis(-1);
        // 連接池存放池化對象方式,true放在空閑隊列最前面,false放在空閑隊列最后
        poolConfig.setLifo(true);
        // 連接空閑的最小時間,達到此值后空閑連接可能會被移除,默認即為30分鍾
        poolConfig.setMinEvictableIdleTimeMillis(1000L * 60L * 30L);// 連接耗盡時是否阻塞,默認為true
        poolConfig.setBlockWhenExhausted(true);
        objectPool = new GenericObjectPool<>(new GrpcClientFactory(), poolConfig);
    }

    public static GrpcClient borrowObject() {
        try {
            GrpcClient client = objectPool.borrowObject();
            UtilFunctions.log.info("=======total threads created: " + objectPool.getCreatedCount());
            return client;
        } catch (Exception e) {
            UtilFunctions.log.error("objectPool.borrowObject error, msg:{}, exception:{}", e.toString(), e);
        }
        return createClient();
    }

    public static void returnObject(GrpcClient client) {
        try {
            objectPool.returnObject(client);
        } catch (Exception e) {
            UtilFunctions.log.error("objectPool.returnObject error, msg:{}, exception:{}", e.toString(), e);
        }
    }

    private static GrpcClient createClient() {
        return new GrpcClient("localhost", 23333);
    }

}

 

3、客戶端程序

  這里僅僅簡單列出了客戶端GrpcClient類的代碼,其他代碼包括服務端代碼見另一篇博客grpc(一)之helloworld。

package com.oy.grpc.client;

import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
import com.oy.grpc.BookServiceGrpc;
import com.oy.grpc.GrpcClientPool;
import com.oy.grpc.GrpcLib.GrpcReply;
import com.oy.grpc.GrpcLib.addBookRequest;
import com.oy.grpc.GrpcLib.getUserByIdRequest;
import com.oy.grpc.UserServiceGrpc;
import com.oy.utils.UtilFunctions;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

public class GrpcClient {
    public static String host = "localhost";
    private final ManagedChannel channel;
    private final UserServiceGrpc.UserServiceBlockingStub userBlockingStub;
    private final BookServiceGrpc.BookServiceBlockingStub bookBlockingStub;

    public GrpcClient(String host, int port) {
        channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
        userBlockingStub = UserServiceGrpc.newBlockingStub(channel);
        bookBlockingStub = BookServiceGrpc.newBlockingStub(channel);
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(10, TimeUnit.SECONDS);
    }

    @SuppressWarnings({ "rawtypes" })
    public static Object call(String rpcMethoddName, Object... args) throws Exception {
        UtilFunctions.log.info("=========== GrpcClient#call begin ===========");
        GrpcClient client = null;
        try {
            client = GrpcClientPool.borrowObject();
            // client = new GrpcClient(host, 23333);

            Class[] argsTypes = new Class[args.length];
            for (int i = 0; i < args.length; i++) {
                UtilFunctions.log.info("args types: {}", args[i].getClass());
                argsTypes[i] = args[i].getClass();
            }
            Method method = client.getClass().getMethod(rpcMethoddName, argsTypes);
            Object result = method.invoke(client, args);
            UtilFunctions.log.info("=========== GrpcClient#call end ===========");
            return result;
        } catch (Exception e) {
            UtilFunctions.log.error("GrpcClient#call error, msg:{}, exception:{}", e.toString(), e);
            return null;
        } finally {
            if (client != null) {
                GrpcClientPool.returnObject(client);
                // client.shutdown();
            }
        }
    }

    // ============= User module =============
    public Object getUserById(Integer id) {
        UtilFunctions.log.info("=========== GrpcClient#getUserById begin ===========");
        getUserByIdRequest request = getUserByIdRequest.newBuilder().setId(id).build();
        GrpcReply response;
        try {
            response = userBlockingStub.getUserById(request);
            UtilFunctions.log.info("GrpcClient#getUserById response, code:{}, data:{}", response.getCode(),
                    response.getData());
        } catch (StatusRuntimeException e) {
            UtilFunctions.log.error("GrpcClient#addBook error, msg:{}, exception:{}", e.toString(), e);
            return null;
        }
        return response;
    }

    // ============= Book module =============
    public Object addBook(Integer id, String name, Double price) {
        UtilFunctions.log.info("=========== GrpcClient#addBook begin ===========");
        addBookRequest request = addBookRequest.newBuilder().setId(id).setName(name).setPrice(price).build();
        GrpcReply response;
        try {
            response = bookBlockingStub.addBook(request);
            UtilFunctions.log.info("GrpcClient#addBook response, code:{}, data:{}", response.getCode(),
                    response.getData());
            UtilFunctions.log.info("=========== GrpcClient#addBook end ===========");
        } catch (StatusRuntimeException e) {
            UtilFunctions.log.error("GrpcClient#addBook error, msg:{}, exception:{}", e.toString(), e);
            return null;
        }
        return response;
    }

}

 

4、客戶端測試

package com.oy.grpc.client;

import com.oy.grpc.GrpcClientPool;
import com.oy.grpc.GrpcLib.GrpcReply;
import com.oy.utils.UtilFunctions;

public class TestService {

    public static void main(String[] args) throws Exception {

        for (int i = 0; i < 4; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    GrpcReply result = null;
                    try {
                        // result = (GrpcReply) GrpcClient.call("getUserById", Integer.valueOf("1"));
                        // result = (GrpcReply) GrpcClient.call("getUserById", 2);
                        result = (GrpcReply) GrpcClient.call("addBook", 1, "thinking in java", 50.0);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (result != null) {
                        UtilFunctions.log.info("client call interface, get code:{}, data:{}", result.getCode(),
                                result.getData());
                    }

            // 如果注釋掉下面兩句,則客戶端程序結束后,服務端報java.io.IOException: 遠程主機強迫關閉了一個現有的連接。
                    // UtilFunctions.log.info("TestService#main: objectPool is closing...");
                    // GrpcClientPool.getObjectPool().close();
                }
            }).start();
        }
    }
}

 

  運行testService類的main()方法,客戶端能正常調用grpc server得到數據,但是grpc服務端報錯:

2019-04-11 14:29:30.458  INFO 1192 --- [-worker-ELG-3-1] i.g.n.NettyServerTransport.connections   : Transport failed
java.io.IOException: 遠程主機強迫關閉了一個現有的連接。

  出現這個問題的原因:客戶端強制斷開連接。參考https://stackoverflow.com/questions/46802521/io-grpc-netty-nettyservertransport-notifyterminated,

  

 

  我在GrpcClientFactory里面也實現了銷毀方法:

@Override
public void destroyObject(PooledObject<GrpcClient> p) throws Exception {
    UtilFunctions.log.info("==== GrpcClientFactory#destroyObject ====");
    p.getObject().shutdown(); super.destroyObject(p);
}

  但是運行testService類的main()方法結束后服務端程序就結束了,程序沒有主動調用destroyObject()方法銷毀池子中的對象,所以grpcClient也沒有shutdown,所以報錯。

 

5、啟動客戶端springboot項目來測試

package com.oy;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.oy.grpc.client.TestService;

@SpringBootApplication
public class Grpc007ClientMainApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Grpc007ClientMainApplication.class, args);
        TestService.main(args);
    }

}

  但是這樣當關閉客戶端程序,還是出現同樣的問題。其實很好理解,因為關閉客戶端程序時,池中的對象還處於空閑狀態,沒有銷毀,destroyObject()方法沒有調用,所以grpcClient也沒有shutdown

  

6、解決方法

  客戶端程序關閉時,池也要close。

package com.oy;

import javax.annotation.PreDestroy;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.springframework.stereotype.Controller;
import com.oy.grpc.GrpcClientPool;
import com.oy.grpc.client.GrpcClient;
import com.oy.utils.UtilFunctions;

@Controller
public class InitController {

    @PreDestroy
    public void destroy() {
        UtilFunctions.log.info("InitController#destroy running...");
        
        GenericObjectPool<GrpcClient> objectPool = GrpcClientPool.getObjectPool();
        UtilFunctions.log.info("InitController#destroy, total threads created: " + objectPool.getCreatedCount());
        
        UtilFunctions.log.info("InitController#destroy objectPool is closing..."); objectPool.close();
    }
}

    

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM