TCP 連接半關閉問題
在用 rust + tokio 的網絡處理時,碰到一個很有意思的 tcp 連接並關閉的問題。
具體是這樣的,首先一個 tcp stream 拆分(split)為 SplitSink + SplitStream 的方式,各自單獨工作。
通過 SplitStream 讀取到客戶端的請求,進行處理並發送給用戶,同時,由於服務器會廣播消息,因此使用一個 mpsc 的通道,把收到的東西整合發到 SplitSink。
代碼如下:
let event_rx = ... // 服務器廣播
...
let (sink, mut stream) = framed.split(); // 拆分
let (tx, rx) = unbounded(); // 通道,mpsc
...
// 客戶端消息處理
let tx_clone = tx.clone();
tokio::spawn(async move {
let mut tx = tx_clone;
while let Some(Ok(req)) = stream.next().await {
let resp = handle(req);
tx.send(resp).await.expecte("Channel failed");
}
});
// 廣播處理
tokio::spawn(event_rx.forward(tx));
// 發送回客戶端
tokio::spawn(rx.map(|s| Ok(Bytes::from(s))).forward(sink));
本來好像也沒什么問題,但是用 netcat 連上訪問時,就發現有問題
nc 127.0.0.1 8080
...
用 ctrl+c
強制結束后,用 lsof -i -n -P
看一下監聽的端口,發現有很多連接處於 CLOSE_WAIT
狀態,即半關閉狀態。
從 tcp 的狀態轉換來看,當客戶端主動關閉時,會發個 FIN 給服務器,服務器進行 CLOSE_WAIT
,如果此時服務正在忙着輸出或其它事情,就會卡在這個狀態。
問題就在廣播處理上,盡管 SplitStream 已進行了關閉(可在 while 后加個日志確定),但是廣播還在繼續,因此輸出的 SplitSink 並沒有關閉。
因此,必須讓廣播中斷,或 SplitSink 知道已完成了輸出。
方法1,通道中用 Option ,然后在 Sink 轉換為 Err
// 客戶端消息處理
let resp = handle(req);
tx.send(Some(resp)).await.expect("Channel failed");
// 發送回客戶端
tokio::spawn(rx.map(|s| {
match s {
Some(s) => Ok(Bytes::from(s)),
None => Err(io::ErrorKink::Other.into()) // None 時中斷
}
}).forward(sink));
方法2,在廣播時進行處理
let close_flag = Arc::new(AtomicBool::new(false));
let close_flag_clone = close_flag.clone();
// 客戶端消息處理
while ... {
let resp = handle(req);
tx.send(resp).await.expect("Channel failed");
}
close_flag_clone.store(false, Ordering::Relaxed);
// 廣播消息處理
tokio::spawn(async move {
while let Some(event) = event_rx.next().await {
if close_flag_clone.load(Ordering::Relaxed) {
break;
}
...
}
});
2種方法,都可以停止相應的輸出,解決因為存在廣播其它的處理等導致半連接問題。