The esl-client project on GitHub has not been updated in years and contains a pile of bugs. Recording them here:

1. Memory leak
The class org.freeswitch.esl.client.transport.message.EslFrameDecoder uses Netty's ByteBuf. Anyone familiar with Netty knows it makes heavy use of off-heap (direct) memory, so developers are expected to release buffers promptly by hand.
This is also recorded in https://github.com/esl-client/esl-client/issues/24
The fix is simply to add a manual release call at the point where the decoder is done with the buffer.
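The fix follows Netty's standard reference-counting discipline: whatever buffer the decoder holds must have release() called on it once the frame has been handed off, typically in a finally block. Below is a minimal, self-contained sketch of that pattern; RefCountedBuffer is a stand-in class for illustration only — in the real EslFrameDecoder you would call release() on Netty's actual ByteBuf (or use io.netty.util.ReferenceCountUtil.release):

```java
// Stand-in for a reference-counted Netty buffer (illustration only).
class RefCountedBuffer {
    private int refCnt = 1;            // Netty buffers start life with refCnt == 1

    int refCnt() { return refCnt; }

    void release() {                   // analogous to ByteBuf.release()
        if (refCnt <= 0) throw new IllegalStateException("already released");
        refCnt--;
    }
}

public class ReleaseDemo {
    // Mirrors what the decoder should do: consume the buffer, then always release it,
    // even if parsing throws. This is the step missing from the original EslFrameDecoder.
    static String decodeFrame(RefCountedBuffer buf) {
        try {
            return "frame";            // pretend we parsed an ESL frame from buf
        } finally {
            buf.release();
        }
    }

    public static void main(String[] args) {
        RefCountedBuffer buf = new RefCountedBuffer();
        String frame = decodeFrame(buf);
        System.out.println(frame + " refCnt=" + buf.refCnt()); // frame refCnt=0
    }
}
```

With the try/finally in place the buffer's reference count drops back to zero on every code path, so the direct memory backing it can be reclaimed.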

2. Thread pool tuning

Each time FreeSWITCH receives a call, it connects to the ESL client over TCP in outbound mode, and org.freeswitch.esl.client.outbound.OutboundChannelInitializer initializes the channel. Its callbackExecutor is a single-thread pool, which is normally not a big problem, but look at the JDK source:
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
and LinkedBlockingQueue's default constructor makes it an unbounded queue:
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
That carries some risk: the queue can grow without limit until memory is exhausted. It is safer to change it to something like this:
private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("outbound-pool-%d").build();

public ExecutorService callbackExecutor = new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(10000), namedThreadFactory);
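The practical difference is failure mode: with a bounded queue, a saturated pool fails fast with a RejectedExecutionException (the default AbortPolicy) instead of silently queueing work until the heap blows up. A quick self-contained demonstration using only the JDK (no Guava thread factory), with an artificially tiny queue so the overflow is easy to trigger:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    // Returns true when the third task is rejected: one busy worker + queue capacity 1.
    static boolean overflowRejected() throws InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1));
        CountDownLatch block = new CountDownLatch(1);

        pool.execute(() -> {                 // occupies the single worker thread
            try { block.await(); } catch (InterruptedException ignored) { }
        });
        pool.execute(() -> { });             // fills the queue (capacity 1)

        boolean rejected = false;
        try {
            pool.execute(() -> { });         // nowhere to go -> AbortPolicy throws
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        block.countDown();
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("third task rejected: " + overflowRejected()); // true
    }
}
```

In production you would of course size the queue generously (10000 above) and decide how to handle the rejection: log and drop, fall back, or use a different RejectedExecutionHandler such as CallerRunsPolicy.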
While at it, I also looked into how this single-thread pool is actually used. Its real consumer is org.freeswitch.esl.client.outbound.OutboundClientHandler, where it processes the events FreeSWITCH sends over:
@Override
protected void handleEslEvent(final ChannelHandlerContext ctx, final EslEvent event) {
    callbackExecutor.execute(() -> clientHandler.onEslEvent(
            new Context(ctx.channel(), OutboundClientHandler.this), event));
}
Netty itself already has two thread pools, bossGroup and workerGroup, whose default size is set in io.netty.channel.MultithreadEventLoopGroup:
static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}
That is, twice the number of CPU cores. Given that these pools already exist, why are ESL events handed off to a separate single-thread pool? First look at where OutboundChannelInitializer is instantiated, in doStart of org.freeswitch.esl.client.outbound.SocketClient:
@Override
protected void doStart() {
    final ServerBootstrap bootstrap = new ServerBootstrap()
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childHandler(new OutboundChannelInitializer(clientHandlerFactory));

    serverChannel = bootstrap.bind(bindAddress).syncUninterruptibly().channel();
    notifyStarted();
    log.info("SocketClient waiting for connections on [{}] ...", bindAddress);
}
In other words, OutboundChannelInitializer is initialized exactly once, when the outbound TCP server starts, which also means the single-thread pool above is instantiated exactly once.

Now consider: during outbound processing, a call comes in and we subscribe to a bunch of events. If those events were processed in parallel by workerGroup, their ordering could no longer be guaranteed, and ordering matters a great deal in a telephony system: ring -> answer -> hangup must happen in that sequence. So, to guarantee ordered event handling, every event is forced through this one single-thread pool instance.

This applies not only to outbound; inbound uses the same mechanism to keep event processing in order. With that understood, in hindsight the callbackExecutor would be safer as a static field, so the JVM guarantees there is exactly one instance and event processing is strictly ordered.
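The ordering guarantee is easy to see in isolation: a single-thread executor runs tasks strictly in submission order, because there is one worker pulling from one FIFO queue. A minimal sketch, with the executor held as a static final field as suggested above (the daemon thread factory is just so the demo JVM can exit without an explicit shutdown):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderedEventDemo {

    // One JVM-wide instance, as suggested above: a static single-thread executor.
    private static final ExecutorService EVENT_EXECUTOR =
            Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "esl-event-pool");
                t.setDaemon(true);
                return t;
            });

    static List<String> processInOrder(List<String> events) throws Exception {
        List<String> handled = new CopyOnWriteArrayList<>();
        for (String e : events) {
            EVENT_EXECUTOR.execute(() -> handled.add(e)); // FIFO: one thread, one queue
        }
        // A no-op barrier task: once it completes, everything before it has run.
        EVENT_EXECUTOR.submit(() -> { }).get();
        return handled;
    }

    public static void main(String[] args) throws Exception {
        List<String> out = processInOrder(List.of("RING", "ANSWER", "HANGUP"));
        System.out.println(out); // [RING, ANSWER, HANGUP]
    }
}
```

Had the tasks been submitted to a multi-thread pool instead, the handled list could come out in any interleaving, which is exactly the situation the library avoids by funneling all events through one thread.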
Another big pitfall: inside outbound's onConnect handler, any attempt to send a command to FreeSWITCH blocks, and the code after it never executes. The solution: run the onConnect handling on a separate, dedicated thread pool.
class OutboundClientHandler extends AbstractEslClientHandler {

    // single-thread pool that keeps event handling in order
    private final ExecutorService onEslEventExecutor;
    // multi-thread pool for handling onConnect concurrently
    private final ExecutorService onConnectExecutor;

    public OutboundClientHandler(IClientHandler clientHandler, ExecutorService onEslEventExecutor, ExecutorService onConnectExecutor) {
        this.clientHandler = clientHandler;
        // both pools can be passed in via the constructor
        this.onEslEventExecutor = onEslEventExecutor;
        this.onConnectExecutor = onConnectExecutor;
    }

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);

        // Have received a connection from FreeSWITCH server, send connect response
        long threadId = Thread.currentThread().getId();
        log.debug("Received new connection from server, sending connect message, threadId:" + threadId);

        sendApiSingleLineCommand(ctx.channel(), "connect")
                .thenAccept(response ->
                        // changed: run onConnect on the dedicated thread pool
                        onConnectExecutor.execute(() -> clientHandler.onConnect(
                                new Context(ctx.channel(), OutboundClientHandler.this),
                                new EslEvent(response, true)))
                )
                .exceptionally(throwable -> {
                    ctx.channel().close();
                    handleDisconnectionNotice();
                    return null;
                });
    }

    @Override
    protected void handleEslEvent(final ChannelHandlerContext ctx, final EslEvent event) {
        // still handled by the single-thread pool, to preserve ordering
        onEslEventExecutor.execute(() -> clientHandler.onEslEvent(
                new Context(ctx.channel(), OutboundClientHandler.this), event));
    }

    ...
}
Then:
public class OutboundChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final IClientHandlerFactory clientHandlerFactory;

    private static ThreadFactory onEslThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("outbound-onEsl-pool-%d").build();

    // dedicated single-thread pool for subscribed events (preserves ordering)
    private static ExecutorService onEslExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(100000), onEslThreadFactory);

    private static ThreadFactory onConnectThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("outbound-onConnect-pool-%d").build();

    // multi-thread pool dedicated to handling onConnect for new calls
    private static ExecutorService onConnectExecutor = new ThreadPoolExecutor(32, 512,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(2048), onConnectThreadFactory);

    public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory) {
        this.clientHandlerFactory = clientHandlerFactory;
    }

    /**
     * Overloaded constructor that lets developers pass in their own thread pools.
     *
     * @param clientHandlerFactory
     * @param connExecutor
     * @param eslExecutor
     */
    public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory, ExecutorService connExecutor, ExecutorService eslExecutor) {
        this.clientHandlerFactory = clientHandlerFactory;
        onEslExecutor = eslExecutor;
        onConnectExecutor = connExecutor;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // Add the text line codec combination first
        pipeline.addLast("encoder", new StringEncoder());
        // Note that outbound mode requires the decoder to treat many 'headers' as body lines
        pipeline.addLast("decoder", new EslFrameDecoder(8092, true));

        // now the outbound client logic
        // pass both pools into the handler instance
        pipeline.addLast("clientHandler",
                new OutboundClientHandler(clientHandlerFactory.createClientHandler(), onEslExecutor, onConnectExecutor));
    }
}
3. Assorted errors in the bundled test examples
The example source is at https://github.com/esl-client/esl-client/blob/master/src/test/java/OutboundTest.java
String uuid = eslEvent.getEventHeaders().get("unique-id");
At line 45, the header name should be "Unique-ID"; with the lowercase key no value is returned.
Also, at line 82, the outbound onEslEvent method can never fire, because no events are ever subscribed; the inbound example part has the same problem.
And at line 56, in actual testing everything after that call blocks and the code never proceeds. It is best to run it on a new thread (or patch the source as described in the thread pool tuning section above).
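For the line-56 issue, the workaround that needs no library patch is simply to move the blocking call off the callback thread. Below is a self-contained sketch of the idea; sendBlockingCommand and the "answer" command string are hypothetical stand-ins for whatever ESL command blocks in your handler:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class OnConnectOffloadDemo {

    private static final ExecutorService ON_CONNECT_POOL =
            Executors.newFixedThreadPool(4, r -> {
                Thread t = new Thread(r, "onConnect-pool");
                t.setDaemon(true);
                return t;
            });

    // Stand-in for an ESL command that blocks (hypothetical, for illustration only).
    static String sendBlockingCommand(String cmd) {
        try { TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException ignored) { }
        return "+OK " + cmd;
    }

    // The callback itself returns immediately; the blocking command runs elsewhere.
    static Future<String> onConnect(String uuid) {
        return ON_CONNECT_POOL.submit(() -> sendBlockingCommand("answer " + uuid));
    }

    public static void main(String[] args) throws Exception {
        Future<String> reply = onConnect("1234-abcd");
        System.out.println("onConnect returned without blocking");
        System.out.println(reply.get()); // +OK answer 1234-abcd
    }
}
```

This is the same shape as the onConnectExecutor change in section 2, just applied from the application side instead of inside the library.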
I have forked the code and fixed all of the issues above; anyone interested is welcome to fork it as well: https://github.com/yjmyzz/esl-client
