Node.js socket end、finish、close事件與stream


源文:https://github.com/cool-firer/docs/blob/main/node.js_socket_stream.md

 

net socket與stream事件

測試程序

tcp_server.js

查看代碼
const net = require('net');

net.createServer(function(c) {
  console.log('conneceted');

  c.on('finish', function() {
    console.log('finish 111');
  })
  
  c.on('close', function() {
    console.log('close');
  })
  
  c.on('finish', function() {
    console.log('finish 222');
  })

  c.on('end', function() {
    console.log('end');
  });

}).listen(9988);

console.log('listen on 9988', ' pid:', process.pid)

 

tcp_client.js

查看代碼
const net = require('net');

const c = net.createConnection({
  port: 9988
})

c.on('finish', function() {
  console.log('finish 111');
})

c.on('close', function() {
  console.log('close');
})

c.on('finish', function() {
  console.log('finish 222');
})

 

啟動server,再啟動cilent,ctrl + c 直接退出client,server端打印出:

$ node tcp_server.js 
listen on 9988  pid: 27157
conneceted
end
finish 111
finish 222
close

 

需要查socket的文檔和stream的文檔,再配合tcp的四次揮手理解。 

 

socket的end事件:

https://nodejs.org/docs/latest-v10.x/api/net.html#net_event_end

Emitted when the other end of the socket sends a FIN packet, thus ending the readable side of the socket.

By default (allowHalfOpen is false) the socket will send a FIN packet back and destroy its file descriptor once it has written out its pending write queue. However, if allowHalfOpen is set to true, the socket will not automatically end() its writable side, allowing the user to write arbitrary amounts of data. The user must call end() explicitly to close the connection (i.e. sending a FIN packet back).

意思是收到了對端發來的FIN包就觸發'end'事件,表示不再可讀。

為了更好的理解,看一下stream的end事件:

https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_event_end

The 'end' event is emitted when there is no more data to be consumed from the stream.

注意,只有Readable stream才有end事件。

socket是Duplex stream,可以看到socket的end與Readable stream的end意義上是對應的,表示不再有數據可讀。

socket_stream_normal

所以,在1觸發,最先打印出了end。

 

socket沒有finish事件,那么只能是stream里的:

https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_event_finish

The 'finish' event is emitted after the stream.end() method has been called, and all data has been flushed to the underlying system.

意思是所有的內部buffer數據都被刷到底層系統。同時注意,只有Writable stream才有finish事件。可以猜測,只有當前socket端不再可寫時,才會觸發,而這正是當前socket向對端發送FIN后。

socket_stream_normal

對應2,打印出finish。

 

之后,socket的close事件:

https://nodejs.org/docs/latest-v10.x/api/net.html#net_event_close_1

Added in: v0.1.90

  • hadError true if the socket had a transmission error.

Emitted once the socket is fully closed. The argument hadError is a boolean which says if the socket was closed due to a transmission error.

socket完全被關閉時觸發。

同時Readable stream和Writable stream都有close事件,看一下:

Readable:

https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_event_close_1

The 'close' event is emitted when the stream and any of its underlying resources (a file descriptor, for example) have been closed. The event indicates that no more events will be emitted, and no further computation will occur.

 

Writable:

https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_event_close

The 'close' event is emitted when the stream and any of its underlying resources (a file descriptor, for example) have been closed. The event indicates that no more events will be emitted, and no further computation will occur.

Readable和Writable兩種流對close事件的描述高度一致,都是說流的底層資源(文件描述符)被關閉了,這也與socket的close事件相對應。

socket_stream_normal

對應3,打印close。

 

socket.end與消費

如果我們改一下tcp_client.js的代碼,把ctrl + c換成socket.end()方法,服務端保持不變呢?

查看代碼
// tcp_client.js
const net = require('net');

const c = net.createConnection({
  port: 9988
})

c.on('end', function() {
  console.log('end');
})
c.on('finish', function() {
  console.log('finish 111');
})
c.on('close', function() {
  console.log('close');
})
c.on('finish', function() {
  console.log('finish 222');
})
setTimeout(function() {
  c.end('what the hell');
}, 3000)

3s后,調用end()方法,關閉當前連接。

先看一下socket.end()方法描述:

https://nodejs.org/docs/latest-v10.x/api/net.html#net_socket_end_data_encoding_callback

socket.end([data][, encoding][, callback])[src]#

Added in: v0.1.90

  • data | |
  • encoding Only used when data is stringDefault: 'utf8'.
  • callback Optional callback for when the socket is finished.
  • Returns: <net.Socket> The socket itself.

Half-closes the socket. i.e., it sends a FIN packet. It is possible the server will still send some data.

If data is specified, it is equivalent to calling socket.write(data, encoding) followed by socket.end().

半關閉socket,向對端發送FIN包。

那么,按照新改的代碼,服務端是不是就會走四次揮手流程,依次打印出'end'、'finish'、'close'呢?先看客戶端的輸出:

$ node tcp_cilent.js 
finish 111
finish 222

再看服務端的輸出:

$ node tcp_server.js 
listen on 9988  pid: 32405
conneceted

調用了end()方法,連接竟然沒有斷開?而且服務端也沒有觸發'end'事件?這。。。

ineedav

線索在stream的end事件描述里:

https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_event_end

Event: 'end'#

Added in: v0.9.4

The 'end' event is emitted when there is no more data to be consumed from the stream.

The 'end' event will not be emitted unless the data is completely consumed. This can be accomplished by switching the stream into flowing mode, or by calling stream.read() repeatedly until all data has been consumed.

除非data被完全消費,否則'end'不會觸發。

還有在文檔的最后面,也有講,並給出了例子:

https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_compatibility_with_older_node_js_versions

 

再去看服務端的代碼,沒有為新來的socket綁定'data'事件、也沒有'readable' + read()方法消費內部data,socket處於pause mode。或者可以理解為,FIN包被排到了內部buffer的尾部,只有消費完了前面的data,才能輪到FIN包。

tcp_demo1

所以,要讓他正常走完四次揮手,需要消費一下服務端的socket,像這樣:

查看代碼
const net = require('net');


net.createServer(function(c) {
  console.log('conneceted');

  c.on('finish', function() {
    console.log('finish 111');
  })
  c.on('close', function() {
    console.log('close');
  })
  c.on('finish', function() {
    console.log('finish 222');
  })
  c.on('end', function() {
    console.log('end');
  });

  setTimeout(async () => {
    /**
      幾種方法選一種
    */
    
    // 方法1: 用flow mode
    c.on('data', (chunk) => {
      console.log(`Received ${chunk.length} bytes of data. chunkStr:${chunk.toString()}`);
    });
  }, 5000);
  
  	// 方法2: pause mode readable + read方法
    c.on('readable', () => {
      let chunk;
      while (null !== (chunk = c.read())) {
        console.log(`Received ${chunk.length} bytes of data. chunkStr:${chunk.toString()}`);
      }
    });
  
  	// 方法3: pause mode 直接read
    for(let i = 0; i < 16;i++) {
      const internelBuf = c.read(1);
      console.log(`${i} Received ${internelBuf ? internelBuf.length + ' bytes of data. chunkStr:' +  internelBuf.toString() : null }`);
  
      await new Promise((r,j) => {
        setTimeout(() => {
          r(true);
        }, 2000)
      })
    }
  
  	// 方法4: flow mode resume方法
    c.resume();
  

}).listen(9988);

console.log('listen on 9988', ' pid:', process.pid)

 如此一來,客戶端、服務端都正常打印。

$ node tcp_cilent.js 
finish 111
finish 222
end
close

 

$ node tcp_server.js 
listen on 9988  pid: 32627
conneceted
end
finish 111
finish 222
close

所以,socket 'end'事件的觸發,需要加上一個條件:就是當前socket需要被消費完並且收到FIN包,才會觸發。

 

socket.destroy與finish

如果,把end改為destroy呢?

查看代碼
// tcp_client.js
const net = require('net');

const c = net.createConnection({
  port: 9988
})

c.on('end', function() {
  console.log('end');
})
c.on('finish', function() {
  console.log('finish 111');
})
c.on('close', function() {
  console.log('close');
})
c.on('finish', function() {
  console.log('finish 222');
})

setTimeout(function() {
  // c.end('what the hell');
  c.destroy();

}, 3000)

在官方文檔里,關於destroy的描述是這樣:

socket部分: https://nodejs.org/docs/latest-v10.x/api/net.html#net_socket_destroy_exception

socket.destroy([exception])

Added in: v0.1.90

Ensures that no more I/O activity happens on this socket. Only necessary in case of errors (parse error or so).

If exception is specified, an 'error' event will be emitted and any listeners for that event will receive exception as an argument.

 

stream部分: https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_writable_destroy_error

writable.destroy([error])#

Added in: v8.0.0

Destroy the stream, and emit the passed 'error' and a 'close' event. After this call, the writable stream has ended and subsequent calls to write() or end() will result in an ERR_STREAM_DESTROYED error. Implementors should not override this method, but instead implement writable._destroy().

stream部分說,銷毀流,並觸發'close'事件,客戶端確實是這樣:

$ node tcp_cilent.js 
close

而服務端,不管有沒有消費socket,都正常打印:

$ node tcp_server.js 
listen on 9988  pid: 32712
conneceted
end
finish 111
finish 222
close

之前說過,發送FIN包后會觸發'finish',但這里destroy並沒有觸發'finish',按照來說,不管是end還是destroy,都會向對端發送FIN,只是destroy發完后就直接銷毀fd, 不等對端的ACK。

Event: 'finish'#

Added in: v0.9.4

The 'finish' event is emitted after the stream.end() method has been called, and all data has been flushed to the underlying system.

所以,發送FIN包后就不會馬上觸發'finish',而是發送FIN包,並且內部buffer被刷到底層fd后才會觸發。

 

socket與Transform

再來變一下,如果把socket pipe進一個自定義的Transform呢?很多網絡NPM客戶端庫都是這么做的,比如mysql2、AMQ等。

改寫一下服務端代碼:

查看代碼
// tcp_server.js
const net = require('net');

class Incoming extends require('stream').Transform {
  _flush(callback) {
    console.log('  incoming _flush')
    this.push(null);
    callback();
  }

  _transform(chunk, encoding, callback) {
    callback(null, chunk);
  }
}
const income = new Incoming();


net.createServer(function(c) {
  console.log('conneceted');
  
  c.on('finish', function() {
    console.log('finish 111');
  })
  c.on('close', function() {
    console.log('close');
  })
  c.on('finish', function() {
    console.log('finish 222');
  })
  c.on('end', function() {
    console.log('end');
  })

  }, 5000)

  c.pipe(income);

  income.on('end' ,function() {
    console.log('  incoming end')
  })
  income.on('finish' ,function() {
    console.log('  incoming finish')
  })
  income.on('close' ,function() {
    console.log('  incoming close')
  })

}).listen(9988);

console.log('listen on 9988', ' pid:', process.pid)
客戶端保持不變,2s后end
查看代碼
// tcp_client.js
const net = require('net');


const c = net.createConnection({
  port: 9988
})

c.on('end', function() {
  console.log('end');
})
c.on('finish', function() {
  console.log('finish 111');
})
c.on('close', function() {
  console.log('close');
})
c.on('finish', function() {
  console.log('finish 222');
})

setTimeout(function() {
  c.end('what the hell');
}, 3000)


此場景下,客戶端輸出:

$ node tcp_cilent.js 
finish 111
finish 222
end
close

 

服務端輸出:

$ node tcp_server.js 
listen on 9988  pid: 34930
conneceted
end
  incoming _flush
  incoming finish
finish 111
finish 222
close

可以看到,兩端的socket都正常關閉了。

socket.pipe(income)實現上會為socket綁定data事件,把從socket讀取的數據forward到income。

對於socket來說,是有消費數據的,所以socket可以正常走完end、finish、close。

那么,要如何理解income的finish呢?

socket_pipe

前面說過,對於socket,finish事件是在當前端發送FIN,且flush到底層fd后觸發,表示不能往當前端寫入任何數據,確切的說,是不能再寫入數據到當前端的內部writable buffer,體現在代碼上,就是socket.write('xxx')。觸發finish的事件可以用這樣的偽代碼表示:

socket.write('xxx') ---> socket.write(FIN) ---> flushTo(fd) ---> emit('finish')

對於income,沒有底層fd,它的底層fd就是它自己,由socket轉來的數據在代碼上相當於income.write('xxx'),同樣表示不能再往income里寫數據。用偽代碼表示:

income.write('xxx') ---> income.write(FIN) ---> flushTo(income buffer) ---> emit('finish')

至於_flush方法在finish之前打印,是因為:

https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_transform_flush_callback

transform._flush(callback)#

  • callback Function A callback function (optionally with an error argument and data) to be called when remaining data has been flushed.

Custom Transform implementations may implement the transform._flush() method. This will be called when there is no more written data to be consumed, but before the 'end' event is emitted signaling the end of the Readable stream.

 這是寫入數據鏈會調用的最后一個方法,此時數據還沒flush,必然會在finish事件之前。

 

如果想讓income像socket一樣,正常走完end、finish、close,那么同樣的,需要消費完income的內部數據才會觸發,方法也是跟前面的方法一樣,綁定data事件、調用resume、多次調用read。

查看代碼
const net = require('net');

class Incoming extends require('stream').Transform {
  _flush(callback) {
    console.log('  incoming _flush')
    this.push(null);
    callback();
  }

  _transform(chunk, encoding, callback) {
    callback(null, chunk);
  }
}
const income = new Incoming();


net.createServer(function(c) {
  console.log('conneceted');

  c.on('finish', function() {
    console.log('finish 111');
  })
  
  c.on('close', function() {
    console.log('close');
  })
  
  c.on('finish', function() {
    console.log('finish 222');
  })

  c.on('end', function() {
    console.log('end');
  });

  c.pipe(income);
  income.on('end' ,function() {
    console.log('  incoming end')
  })
  income.on('finish' ,function() {
    console.log('  incoming finish')
  })
  income.on('close' ,function() {
    console.log('  incoming close')
  })

  setTimeout(async () => {

    // 方法1: 用flow mode
    income.on('data', (chunk) => {
      console.log(`income Received ${chunk.length} bytes of data. chunkStr:${chunk.toString()}`);
    })
    
  	// 方法2: pause mode readable + read方法
    income.on('readable', () => {
      let chunk;
      while (null !== (chunk = income.read())) {
        console.log(`income Received ${chunk.length} bytes of data. chunkStr:${chunk.toString()}`);
      }
    });

  	// 方法3: pause mode 直接read
    for(let i = 0; i < 16;i++) {
      const internelBuf = income.read(1);
      console.log(`${i} income Received ${internelBuf ? internelBuf.length + ' bytes of data. chunkStr:' +  internelBuf.toString() : null }`);
  
      await new Promise((r,j) => {
        setTimeout(() => {
          r(true);
        }, 2000)
      })
    }

  	// 方法4: flow mode resume方法
    income.resume();
  }, 5000)


}).listen(9988);

console.log('listen on 9988', ' pid:', process.pid)

此時服務端就可以正常輸出走完end、finish、close:

$ node tcp_server.js 
listen on 9988  pid: 35495
conneceted
end
  incoming _flush
  incoming finish
finish 111
finish 222
close
income Received 13 bytes of data. chunkStr:what the hell
  incoming end
  incoming close


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM