freeswitch筆記(8)-esl outbound 填坑筆記


github上的esl-client已經N年未更新了,上面有一堆bug,記錄一下:

 

一、內存泄露

org.freeswitch.esl.client.transport.message.EslFrameDecoder 這個類,使用了netty的ByteBuf,對netty有了解的同學應該知道,netty底層大量使用了堆外內存,建議開發人員及時手動釋放。

https://github.com/esl-client/esl-client/issues/24 也有記載

參考下圖,手動加上釋放處理即可

 

 

二、線程池優化

org.freeswitch.esl.client.outbound.OutboundChannelInitializer 這個類,每次freeswitch有來電時,會以outbound外聯模式,通過tcp連接到esl client,初始化channel。callbackExector是一個單線程池,正常情況下問題倒不大,但是jdk源碼:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

LinkedBlockingQueue默認是一個無界隊列:

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

有點風險,改成下面這樣更安全點:

    private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("outbound-pool-%d").build();

    public ExecutorService callbackExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(10000), namedThreadFactory);

這個單線程池的用法也順帶研究了下,它真正使用的地方在於org.freeswitch.esl.client.outbound.OutboundClientHandler,用於處理freeswitch發過來的事件

	@Override
	protected void handleEslEvent(final ChannelHandlerContext ctx, final EslEvent event) {
		callbackExecutor.execute(() -> clientHandler.onEslEvent(
				new Context(ctx.channel(), OutboundClientHandler.this), event));
	}

大家知道Netty本身就有2個線程池:bossGroup,workerGroup,默認大小在io.netty.channel.MultithreadEventLoopGroup中

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

即:核數*2。 既然已經是線程池了,為啥這里esl的事件又單獨交給1個單線程池來處理呢? 先來看OutboundChannelInitializer實例化的地方,在org.freeswitch.esl.client.outbound.SocketClient的doStart里

	@Override
	protected void doStart() {
		final ServerBootstrap bootstrap = new ServerBootstrap()
				.group(bossGroup, workerGroup)
				.channel(NioServerSocketChannel.class)
				.childOption(ChannelOption.TCP_NODELAY, true)
				.childOption(ChannelOption.SO_KEEPALIVE, true)
				.childHandler(new OutboundChannelInitializer(clientHandlerFactory));

		serverChannel = bootstrap.bind(bindAddress).syncUninterruptibly().channel();
		notifyStarted();
		log.info("SocketClient waiting for connections on [{}] ...", bindAddress);
	}

也就是說,只有outbound tcp server啟用時,才會對OutboundChannelInitializer做1次初始化,言外之意,剛才的單線程池實例也只會實例化1次。

試想一下,如果在outbound的處理過程中,一通電話進來,我們訂閱了一堆事件,這堆事件發過來后,如果讓workerGroup並行處理,事件的處理順序就得不到保證了,這在電話系統中是很重要的,比如:響鈴->接聽->掛斷。肯定要有順序的!所以為了保證事件處理的順序性,強制讓所有事件,都交給這個單線程池實例來處理,保證了順序性。

其實不光是outbound,inbound也是類似機制,保證事件接收時按順序處理。明白這個原理后,回過頭來想想,這個單線程池的callbackExector實例,應該處理成static靜態實例更穩妥,這樣強制讓jvm保證肯定只有一個實例,處理事件絕對有順序。

 

另外,在outbound的onConnect事件里,如果嘗試跟freeswitch發命令,會發現block住,后面的代碼完全無法執行,這也是一個大坑。解決辦法:

將onConnect的處理,放在另外1個專用線程池里

class OutboundClientHandler extends AbstractEslClientHandler {

    //這是保證事件接收順序的單線程池 
    private final ExecutorService onEslEventExecutor;
    //這是用於並發處理onConnect的多線程池
    private final ExecutorService onConnectExecutor;


    public OutboundClientHandler(IClientHandler clientHandler, ExecutorService onEslEventExecutor, ExecutorService onConnectExecutor) {
        this.clientHandler = clientHandler;
        //構造函數里允許傳入
        this.onEslEventExecutor = onEslEventExecutor;
        this.onConnectExecutor = onConnectExecutor;
    }

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);

        // Have received a connection from FreeSWITCH server, send connect response
        long threadId = Thread.currentThread().getId();
        log.debug("Received new connection from server, sending connect message,threadId:" + threadId);

        sendApiSingleLineCommand(ctx.channel(), "connect")
                .thenAccept(response ->
                        //這里改為線程池執行
                        onConnectExecutor.execute(() -> clientHandler.onConnect(
                                new Context(ctx.channel(), OutboundClientHandler.this),
                                new EslEvent(response, true)))
                )
                .exceptionally(throwable -> {
                    ctx.channel().close();
                    handleDisconnectionNotice();
                    return null;
                });
    }

    @Override
    protected void handleEslEvent(final ChannelHandlerContext ctx, final EslEvent event) {
        //這里仍然用單一線程池處理,保證順序 
        onEslEventExecutor.execute(() -> clientHandler.onEslEvent(
                new Context(ctx.channel(), OutboundClientHandler.this), event));
    }

    ...
}

然后

public class OutboundChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final IClientHandlerFactory clientHandlerFactory;

    private static ThreadFactory onEslThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("outbound-onEsl-pool-%d").build();

    //專門接收訂閱事件的單一線程池(保證順序)
    private static ExecutorService onEslExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(100000), onEslThreadFactory);

    private static ThreadFactory onConnectThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("outbound-onConnect-pool-%d").build();

    //專用於處理新來電onConnect的多線程池
    private static ExecutorService onConnectExecutor = new ThreadPoolExecutor(32, 512,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(2048), onConnectThreadFactory);


    public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory) {
        this.clientHandlerFactory = clientHandlerFactory;
    }

    /**
     * 重載版本,允許開發人員初始化時,傳入自己的線程池
     * @param clientHandlerFactory
     * @param connExecutor
     * @param eslExecutor
     */
    public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory, ExecutorService connExecutor, ExecutorService eslExecutor) {
        this.clientHandlerFactory = clientHandlerFactory;
        onEslExecutor = eslExecutor;
        onConnectExecutor = connExecutor;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Add the text line codec combination first
        pipeline.addLast("encoder", new StringEncoder());
        // Note that outbound mode requires the decoder to treat many 'headers' as body lines
        pipeline.addLast("decoder", new EslFrameDecoder(8092, true));

        // now the outbound client logic
        //將2個線程池,傳入實例
        pipeline.addLast("clientHandler",
                new OutboundClientHandler(clientHandlerFactory.createClientHandler(), onEslExecutor, onConnectExecutor));

    }
}

   

三、源碼上的Test示例代碼各種錯誤

https://github.com/esl-client/esl-client/blob/master/src/test/java/OutboundTest.java 這是示例源碼

String uuid = eslEvent.getEventHeaders().get("unique-id");

45行,這里應該是"Unique-ID",小寫取不到值。

另外82行,outbound的onEslEvent方法,其實永遠也不會被觸發,因為根本沒訂閱任何事件,inbound的示例部分也有同樣問題。

56行,執行后,實測下來,后面的操作其實都是阻塞的,代碼無法向下執行,建議改在新線程里執行(或者參考上面的“線程池優化”分析,修改源碼)。

 

上述這些問題,筆者已經fork了一份代碼進行了修改,有興趣的同學,歡迎fork,地址:https://github.com/yjmyzz/esl-client


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM