github上的esl-client已經N年未更新了,上面有一堆bug,記錄一下:
一、內存泄露
org.freeswitch.esl.client.transport.message.EslFrameDecoder 這個類,使用了netty的ByteBuf,對netty有了解的同學應該知道,netty底層大量使用了堆外內存,建議開發人員及時手動釋放。
https://github.com/esl-client/esl-client/issues/24 也有記載
參考下圖,手動加上釋放處理即可
二、線程池優化
org.freeswitch.esl.client.outbound.OutboundChannelInitializer 這個類,每次freeswitch有來電時,會以outbound外聯模式,通過tcp連接到esl client,初始化channel。callbackExector是一個單線程池,正常情況下問題倒不大,但是jdk源碼:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
LinkedBlockingQueue默認是一個無界隊列:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
有點風險,改成下面這樣更安全點:
private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("outbound-pool-%d").build(); public ExecutorService callbackExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10000), namedThreadFactory);
這個單線程池的用法也順帶研究了下,它真正使用的地方在於org.freeswitch.esl.client.outbound.OutboundClientHandler,用於處理freeswitch發過來的事件
@Override protected void handleEslEvent(final ChannelHandlerContext ctx, final EslEvent event) { callbackExecutor.execute(() -> clientHandler.onEslEvent( new Context(ctx.channel(), OutboundClientHandler.this), event)); }
大家知道Netty本身就有2個線程池:bossGroup,workerGroup,默認大小在io.netty.channel.MultithreadEventLoopGroup中
static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } }
即:核數*2。 既然已經是線程池了,為啥這里esl的事件又單獨交給1個單線程池來處理呢? 先來看OutboundChannelInitializer實例化的地方,在org.freeswitch.esl.client.outbound.SocketClient的doStart里
@Override protected void doStart() { final ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new OutboundChannelInitializer(clientHandlerFactory)); serverChannel = bootstrap.bind(bindAddress).syncUninterruptibly().channel(); notifyStarted(); log.info("SocketClient waiting for connections on [{}] ...", bindAddress); }
也就是說,只有outbound tcp server啟用時,才會對OutboundChannelInitializer做1次初始化,言外之意,剛才的單線程池實例也只會實例化1次。
試想一下,如果在outbound的處理過程中,一通電話進來,我們訂閱了一堆事件,這堆事件發過來后,如果讓workerGroup並行處理,事件的處理順序就得不到保證了,這在電話系統中是很重要的,比如:響鈴->接聽->掛斷。肯定要有順序的!所以為了保證事件處理的順序性,強制讓所有事件,都交給這個單線程池實例來處理,保證了順序性。
其實不光是outbound,inbound也是類似機制,保證事件接收時按順序處理。明白這個原理后,回過頭來想想,這個單線程池的callbackExector實例,應該處理成static靜態實例更穩妥,這樣強制讓jvm保證肯定只有一個實例,處理事件絕對有順序。
另外,在outbound的onConnect事件里,如果嘗試跟freeswitch發命令,會發現block住,后面的代碼完全無法執行,這也是一個大坑。解決辦法:
將onConnect的處理,放在另外1個專用線程池里
class OutboundClientHandler extends AbstractEslClientHandler { //這是保證事件接收順序的單線程池 private final ExecutorService onEslEventExecutor; //這是用於並發處理onConnect的多線程池 private final ExecutorService onConnectExecutor; public OutboundClientHandler(IClientHandler clientHandler, ExecutorService onEslEventExecutor, ExecutorService onConnectExecutor) { this.clientHandler = clientHandler; //構造函數里允許傳入 this.onEslEventExecutor = onEslEventExecutor; this.onConnectExecutor = onConnectExecutor; } @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); // Have received a connection from FreeSWITCH server, send connect response long threadId = Thread.currentThread().getId(); log.debug("Received new connection from server, sending connect message,threadId:" + threadId); sendApiSingleLineCommand(ctx.channel(), "connect") .thenAccept(response -> //這里改為線程池執行 onConnectExecutor.execute(() -> clientHandler.onConnect( new Context(ctx.channel(), OutboundClientHandler.this), new EslEvent(response, true))) ) .exceptionally(throwable -> { ctx.channel().close(); handleDisconnectionNotice(); return null; }); } @Override protected void handleEslEvent(final ChannelHandlerContext ctx, final EslEvent event) { //這里仍然用單一線程池處理,保證順序 onEslEventExecutor.execute(() -> clientHandler.onEslEvent( new Context(ctx.channel(), OutboundClientHandler.this), event)); } ... }
然后
public class OutboundChannelInitializer extends ChannelInitializer<SocketChannel> { private final IClientHandlerFactory clientHandlerFactory; private static ThreadFactory onEslThreadFactory = new ThreadFactoryBuilder() .setNameFormat("outbound-onEsl-pool-%d").build(); //專門接收訂閱事件的單一線程池(保證順序) private static ExecutorService onEslExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100000), onEslThreadFactory); private static ThreadFactory onConnectThreadFactory = new ThreadFactoryBuilder() .setNameFormat("outbound-onConnect-pool-%d").build(); //專用於處理新來電onConnect的多線程池 private static ExecutorService onConnectExecutor = new ThreadPoolExecutor(32, 512, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2048), onConnectThreadFactory); public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory) { this.clientHandlerFactory = clientHandlerFactory; } /** * 重載版本,允許開發人員初始化時,傳入自己的線程池 * @param clientHandlerFactory * @param connExecutor * @param eslExecutor */ public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory, ExecutorService connExecutor, ExecutorService eslExecutor) { this.clientHandlerFactory = clientHandlerFactory; onEslExecutor = eslExecutor; onConnectExecutor = connExecutor; } @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // Add the text line codec combination first pipeline.addLast("encoder", new StringEncoder()); // Note that outbound mode requires the decoder to treat many 'headers' as body lines pipeline.addLast("decoder", new EslFrameDecoder(8092, true)); // now the outbound client logic //將2個線程池,傳入實例 pipeline.addLast("clientHandler", new OutboundClientHandler(clientHandlerFactory.createClientHandler(), onEslExecutor, onConnectExecutor)); } }
三、源碼上的Test示例代碼各種錯誤
https://github.com/esl-client/esl-client/blob/master/src/test/java/OutboundTest.java 這是示例源碼
String uuid = eslEvent.getEventHeaders().get("unique-id");
45行,這里應該是"Unique-ID",小寫取不到值。
另外82行,outbound的onEslEvent方法,其實永遠也不會被觸發,因為根本沒訂閱任何事件,inbound的示例部分也有同樣問題。
56行,執行后,實測下來,后面的操作其實都是阻塞的,代碼無法向下執行,建議改在新線程里執行(或者參考上面的“線程池優化”分析,修改源碼)。
上述這些問題,筆者已經fork了一份代碼進行了修改,有興趣的同學,歡迎fork,地址:https://github.com/yjmyzz/esl-client