fluent-logger-java is a Java library for recording events from a Java application via Fluentd. https://github.com/fluent/fluent-logger-java
While using this SDK, I found that once the TCP connection is dropped, the SDK's reconnect mechanism does not work.
2018-01-26 12:36:25,620 ERROR [org.fluentd.logger.sender.RawSocketSender] - <org.fluentd.logger.sender.RawSocketSender>
java.net.SocketException: Software caused connection abort: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at org.fluentd.logger.sender.RawSocketSender.flush(RawSocketSender.java:200)
    at org.fluentd.logger.sender.RawSocketSender.send(RawSocketSender.java:188)
    at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:158)
    at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:140)
    at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:135)
    at org.fluentd.logger.FluentLogger.log(FluentLogger.java:101)
    at org.fluentd.logger.FluentLogger.log(FluentLogger.java:86)
    at fluentdDemo.fluentdDemo.main(fluentdDemo.java:90)
Looking at the source, see the RawSocketSender class:
    private void reconnect() throws IOException {
        if (socket == null) {
            connect();
        } else if (socket.isClosed() || (!socket.isConnected())) {
            close();
            connect();
        }
    }
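The intent is to rebuild the connection when the remote end of the Socket has closed. The Socket class offers several ready-made methods for querying state, such as isClosed(), isConnected(), isInputShutdown() and isOutputShutdown().
In testing, however, I found that all of these methods report the state of the local end only; they cannot tell whether the remote end has dropped the connection.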
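This is easy to reproduce on the loopback interface. The following is a minimal sketch, not code from fluent-logger-java: the class name LocalStateDemo, the method name and the 200 ms pause are illustrative assumptions. It lets the peer hang up and then reads the client's flags.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class, not part of fluent-logger-java.
class LocalStateDemo {
    /** Returns {isConnected, isClosed} of a client socket after the peer has closed its end. */
    static boolean[] flagsAfterPeerClose() throws IOException, InterruptedException {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
            server.accept().close(); // the remote end hangs up
            Thread.sleep(200);       // give the close time to reach the client
            // Both flags only track what was done to the local Socket object.
            return new boolean[] { client.isConnected(), client.isClosed() };
        }
    }
}
```

The call returns {true, false}: isConnected() stays true once the socket has been connected, and isClosed() only becomes true after close() is called locally, so the condition in reconnect() never fires for a remote disconnect.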
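Some people handle this kind of problem by sending a piece of test data through the OutputStream, much like a ping: if the send fails, the remote end has disconnected. The drawback is that this interferes with the normal output, because the remote end cannot tell test data apart from real data.
In fact this approach works, as long as you skip the test data and simply send the data you actually need to send. If the send fails, close the socket yourself, open a new connection, and send the data again.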
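The send, close, reconnect and resend sequence can be sketched as follows. This is a simplified illustration under stated assumptions, not the RawSocketSender implementation: the class RetryingSender and all of its members are hypothetical names, and it retries exactly once.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

// Hypothetical sender, not part of fluent-logger-java.
class RetryingSender {
    private final String host;
    private final int port;
    Socket socket; // package-private so a test can simulate a broken connection

    RetryingSender(String host, int port) {
        this.host = host;
        this.port = port;
    }

    private void closeQuietly() {
        if (socket != null) {
            try { socket.close(); } catch (IOException ignored) { }
            socket = null;
        }
    }

    // Connects lazily, then writes and flushes the payload.
    private void writeOnce(byte[] payload) throws IOException {
        if (socket == null) {
            socket = new Socket(host, port);
        }
        OutputStream out = socket.getOutputStream();
        out.write(payload);
        out.flush();
    }

    /** Sends the real payload; on failure drops the socket, reconnects and resends once. */
    synchronized void send(byte[] payload) throws IOException {
        try {
            writeOnce(payload);
        } catch (IOException firstFailure) {
            closeQuietly();
            writeOnce(payload); // a second failure propagates to the caller
        }
    }
}
```

Note that this only recovers from a write that actually throws. A write that the local stack accepts without error after the peer has gone away is still lost.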
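Others have suggested verifying the connection state by sending urgent data (see the Socket class, below); if that fails, close the socket and open a new connection.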
    /**
     * Send one byte of urgent data on the socket. The byte to be sent is the lowest eight
     * bits of the data parameter. The urgent byte is
     * sent after any preceding writes to the socket OutputStream
     * and before any future writes to the OutputStream.
     * @param data The byte of data to send
     * @exception IOException if there is an error
     *  sending the data.
     * @since 1.4
     */
    public void sendUrgentData (int data) throws IOException {
        if (!getImpl().supportsUrgentData ()) {
            throw new SocketException ("Urgent data not supported");
        }
        getImpl().sendUrgentData (data);
    }
It can be implemented like this:
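    /**
     * Checks whether the connection has been dropped.
     * @param socket the socket to probe
     * @return true if the connection is broken, false otherwise
     */
    public boolean isServerClose(Socket socket) {
        try {
            // Send one byte of urgent data. By default the server side does not
            // process urgent data, so normal traffic is not affected.
            socket.sendUrgentData(0xFF);
            return false;
        } catch (IOException se) {
            return true;
        }
    }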
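Precondition: the SO_OOBINLINE option must be off on the peer's Socket, in which case the byte is discarded automatically. SO_OOBINLINE is off by default.
See the SocketOptions interface: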
    /**
     * When the OOBINLINE option is set, any TCP urgent data received on
     * the socket will be received through the socket input stream.
     * When the option is disabled (which is the default) urgent data
     * is silently discarded.
     *
     * @see Socket#setOOBInline
     * @see Socket#getOOBInline
     */
    @Native public final static int SO_OOBINLINE = 0x1003;
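Of course, I think urgent data could also be sent on a timer as a heartbeat to keep a long-lived TCP connection alive; the peer does not need to reply.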
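A timer-driven heartbeat along these lines might look like the sketch below. It is only an illustration of the idea: UrgentHeartbeat, beat(), start(), stop() and the onFailure callback are hypothetical names, and the code assumes the peer leaves SO_OOBINLINE off. In a real sender the heartbeat would also need to be coordinated with normal writes on the same socket.

```java
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical heartbeat helper, not part of fluent-logger-java.
class UrgentHeartbeat {
    private final Socket socket;
    private final Runnable onFailure; // e.g. close the socket and reconnect
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    UrgentHeartbeat(Socket socket, Runnable onFailure) {
        this.socket = socket;
        this.onFailure = onFailure;
    }

    /** Sends one urgent byte; returns false and runs the callback if the send fails. */
    boolean beat() {
        try {
            socket.sendUrgentData(0xFF);
            return true;
        } catch (IOException e) {
            onFailure.run();
            return false;
        }
    }

    /** Schedules beat() every periodSeconds seconds. */
    void start(long periodSeconds) {
        timer.scheduleAtFixedRate(this::beat, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    void stop() {
        timer.shutdownNow();
    }
}
```

A caller would create it with the live socket and a reconnect callback, call start(...) with a period of its choosing, and call stop() when the sender is closed.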
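Test results:
With both approaches, the first send after the connection is dropped raises no exception, yet the server never receives the data. The broken connection is only detected on the second send.
One explanation I have seen is that the data sent by sendUrgentData() or through a PrintWriter is too small, so it sits in the buffer instead of being sent immediately. After setting setSendBufferSize(1), the exception appears as expected, which makes it possible to detect a dropped connection in real time. (Online sources say that sendUrgentData sends in real time without going through the buffer, but that does not match what I saw in testing; this still needs to be verified.)
I checked the source: according to the Javadoc of sendUrgentData, the urgent byte is sent after any preceding writes to the OutputStream and before any future writes to the OutputStream.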
Trying setSendBufferSize(1) does make the exception show up as expected, so a dropped connection can be detected in real time.
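For reference, a minimal sketch of that setting is shown below. The class SmallSendBuffer and its open method are hypothetical names. Per the Socket Javadoc, the value passed to setSendBufferSize is only a hint to the platform, so the sketch leaves it to the caller to read back the size that was actually applied.

```java
import java.io.IOException;
import java.net.Socket;

// Hypothetical helper, not part of fluent-logger-java.
class SmallSendBuffer {
    /** Opens a socket and asks the platform for the smallest possible send buffer. */
    static Socket open(String host, int port) throws IOException {
        Socket socket = new Socket(host, port);
        // SO_SNDBUF is a hint; the operating system may round it up to its own minimum.
        socket.setSendBufferSize(1);
        return socket;
    }
}
```

Calling getSendBufferSize() on the returned socket shows the effective value, which may be larger than 1 on some platforms. A very small send buffer is also likely to hurt throughput, so this is better suited to experiments than to a production default.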
Fluentd's in_forward plugin provides a UDP-based heartbeat listener; unfortunately, fluent-logger-java does not implement a matching heartbeat mechanism.
https://docs.fluentd.org/v0.12/articles/in_forward