tokio TCP 連接半關閉問題


TCP 連接半關閉問題

在用 rust + tokio 的網絡處理時,碰到一個很有意思的 tcp 連接並關閉的問題。

具體是這樣的,首先一個 tcp stream 拆分(split)為 SplitSink + SplitStream 的方式,各自單獨工作。

通過 SplitStream 讀取到客戶端的請求,進行處理並發送給用戶,同時,由於服務器會廣播消息,因此使用一個 mpsc 的通道,把收到的東西整合發到 SplitSink。

代碼如下:

let event_rx = ...                          // 服務器廣播
...
let (sink, mut stream) = framed.split();    // 拆分
let (tx, rx) = unbounded();                 // 通道,mpsc
...

// 客戶端消息處理
let tx_clone = tx.clone();
tokio::spawn(async move {
    let mut tx = tx_clone;
    while let Some(Ok(req)) = stream.next().await {
        let resp = handle(req);
        tx.send(resp).await.expecte("Channel failed");
    }
});

// 廣播處理
tokio::spawn(event_rx.forward(tx));

// 發送回客戶端
tokio::spawn(rx.map(|s| Ok(Bytes::from(s))).forward(sink));

本來好像也沒什么問題,但是用 netcat 連上訪問時,就發現有問題

nc 127.0.0.1 8080
...

ctrl+c 強制結束后,用 lsof -i -n -P 看一下監聽的端口,發現有很多連接處於 CLOSE_WAIT 狀態,即半關閉狀態。

從 tcp 的狀態轉換來看,當客戶端主動關閉時,會發個 FIN 給服務器,服務器進行 CLOSE_WAIT,如果此時服務正在忙着輸出或其它事情,就會卡在這個狀態。

問題就在廣播處理上,盡管 SplitStream 已進行了關閉(可在 while 后加個日志確定),但是廣播還在繼續,因此輸出的 SplitSink 並沒有關閉。
因此,必須讓廣播中斷,或 SplitSink 知道已完成了輸出。

方法1,通道中用 Option ,然后在 Sink 轉換為 Err

// 客戶端消息處理
let resp = handle(req);
tx.send(Some(resp)).await.expect("Channel failed");

// 發送回客戶端
tokio::spawn(rx.map(|s| {
    match s {
        Some(s) => Ok(Bytes::from(s)),
        None => Err(io::ErrorKink::Other.into())    // None 時中斷
    }
}).forward(sink));

方法2,在廣播時進行處理

let close_flag = Arc::new(AtomicBool::new(false));
let close_flag_clone = close_flag.clone();

// 客戶端消息處理
while ... {
    let resp = handle(req);
    tx.send(resp).await.expect("Channel failed");
}
close_flag_clone.store(false, Ordering::Relaxed);

// 廣播消息處理
tokio::spawn(async move {
    while let Some(event) = event_rx.next().await {
        if close_flag_clone.load(Ordering::Relaxed) {
            break;
        }
        ...
    }
});

2種方法,都可以停止相應的輸出,解決因為存在廣播其它的處理等導致半連接問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM