為什么需要心跳檢測?
正常的情況客戶端斷開連接會向服務端發送一個fin包,服務端收到fin包后得知客戶端連接斷開,則立刻觸發onClose事件回調。
但是有些極端情況如客戶端掉電、網絡關閉、拔網線、路由故障等,這些極端情況客戶端無法發送fin包給服務端,服務端便無法知道連接已經斷開。如果客戶端與服務端定時有心跳數據傳輸,則會比較及時的發現連接斷開,觸發onClose事件回調。
另外路由節點防火牆會關閉長時間不通訊的socket連接,導致socket長連接斷開。所以需要客戶端與服務端定時發送心跳數據保持連接不被斷開。
心跳檢測的原理是什么?
客戶端定時每X秒(推薦小於60秒)向服務端發送特定數據(任意數據都可),服務端設定為X秒沒有收到客戶端心跳則認為客戶端掉線,並關閉連接觸發onClose回調。這樣即通過心跳檢測請求維持了連接(避免連接因長時間不活躍而被網關防火牆關閉),也能讓服務端比較及時的知道客戶端是否異常掉線。
保持對網絡的連接有效性的檢測,在項目中必須關注的點。
一、TCP Keep-Alive 選項
TCP 有一個保持活躍的機制叫做 Keep-Alive,該機制的原理如下:
定義一個時間段,在這個時間段內,如果沒有任何連接相關的活動,TCP 保活機制會開始作用,每隔一個時間間隔,發送一個探測報文,該探測報文包含的數據非常少,如果連續幾個探測報文都沒有得到響應,則認為當前的 TCP 連接已經死亡,系統內核將錯誤信息通知給上層應用程序。
可定義變量,分別被稱為保活時間、保活時間間隔和保活探測次數。在 Linux 系統中,這些變量分別對應 sysctl 變量net.ipv4.tcp_keepalive_time、net.ipv4.tcp_keepalive_intvl、 net.ipv4.tcp_keepalve_probes,默認設置是 7200 秒(2 小時)、75 秒和 9 次探測。
開啟了 TCP 保活,需要考慮以下幾種情況:
- 第一種,對端程序是正常工作的。當 TCP 保活的探測報文發送給對端, 對端會正常響應,這樣 TCP 保活時間會被重置,等待下一個 TCP 保活時間的到來。
- 第二種,對端程序崩潰並重啟。當 TCP 保活的探測報文發送給對端后,對端是可以響應的,但由於沒有該連接的有效信息,會產生一個 RST 報文,這樣很快就會發現 TCP 連接已經被重置。
- 第三種,是對端程序崩潰,或對端由於其他原因導致報文不可達。當 TCP 保活的探測報文發送給對端后,石沉大海,沒有響應,連續幾次,達到保活探測次數后,TCP 會報告該 TCP 連接已經死亡。
TCP 保活機制默認是關閉的,當我們選擇打開時,可以分別在連接的兩個方向上開啟,也可以單獨在一個方向上開啟。如果開啟服務器端到客戶端的檢測,就可以在客戶端非正常斷連的情況下清除在服務器端保留的“臟數據”;而開啟客戶端到服務器端的檢測,就可以在服務器無響應的情況下,重新發起連接。
應用層探活
通過在應用程序中模擬 TCP Keep-Alive 機制,來完成在應用層的連接探活
以設計一個 PING-PONG 的機制,需要保活的一方,比如客戶端,在保活時間達到后,發起對連接的 PING 操作,如果服務器端對 PING 操作有回應,則重新設置保活時間,否則對探測次數進行計數,如果最終探測次數達到了保活探測次數預先設置的值之后,則認為連接已經無效。
關鍵點:
第一個是需要使用定時器,這可以通過使用 I/O 復用自身的機制來實現;第二個是需要設計一個 PING-PONG 的協議。
消息格式設計:
typedef struct {
u_int32_t type;
char data[1024];
} messageObject;
#define MSG_PING 1
#define MSG_PONG 2
#define MSG_TYPE1 11
#define MSG_TYPE2 21
消息對象是一個結構體,前 4 個字節標識了消息類型
設計了MSG_PING、MSG_PONG、MSG_TYPE 1和MSG_TYPE 2四種消息類型。
客戶端程序
客戶端完全模擬 TCP Keep-Alive 的機制,在保活時間達到后,探活次數增加 1,同時向服務器端發送 PING 格式的消息,此后以預設的保活時間間隔,不斷地向服務器端發送 PING 格式的消息。如果能收到服務器端的應答,則結束保活,將保活時間置為 0。
#include "lib/common.h"
#include "message_objecte.h"
#define MAXLINE 4096
#define KEEP_ALIVE_TIME 10
#define KEEP_ALIVE_INTERVAL 3
#define KEEP_ALIVE_PROBETIMES 3
int main(int argc, char **argv) {
if (argc != 2) {
error(1, 0, "usage: tcpclient <IPaddress>");
}
int socket_fd;
socket_fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERV_PORT);
inet_pton(AF_INET, argv[1], &server_addr.sin_addr);
socklen_t server_len = sizeof(server_addr);
int connect_rt = connect(socket_fd, (struct sockaddr *) &server_addr, server_len);
if (connect_rt < 0) {
error(1, errno, "connect failed ");
}
char recv_line[MAXLINE + 1];
int n;
fd_set readmask;
fd_set allreads;
struct timeval tv;
int heartbeats = 0;
tv.tv_sec = KEEP_ALIVE_TIME;
tv.tv_usec = 0;
messageObject messageObject;
FD_ZERO(&allreads);
FD_SET(0, &allreads);
FD_SET(socket_fd, &allreads);
for (;;) {
readmask = allreads;
int rc = select(socket_fd + 1, &readmask, NULL, NULL, &tv);
if (rc < 0) {
error(1, errno, "select failed");
}
if (rc == 0) {
if (++heartbeats > KEEP_ALIVE_PROBETIMES) {
error(1, 0, "connection dead\n");
}
printf("sending heartbeat #%d\n", heartbeats);
messageObject.type = htonl(MSG_PING);
rc = send(socket_fd, (char *) &messageObject, sizeof(messageObject), 0);
if (rc < 0) {
error(1, errno, "send failure");
}
tv.tv_sec = KEEP_ALIVE_INTERVAL;
continue;
}
if (FD_ISSET(socket_fd, &readmask)) {
n = read(socket_fd, recv_line, MAXLINE);
if (n < 0) {
error(1, errno, "read error");
} else if (n == 0) {
error(1, 0, "server terminated \n");
}
printf("received heartbeat, make heartbeats to 0 \n");
heartbeats = 0;
tv.tv_sec = KEEP_ALIVE_TIME;
}
}
}
第一部分:socket套接字的連接
第二部分:select定時器准備,其中設置了超時時間KEEP_ALIVE_TIME,相當於保活時間,初始化select函數套接字
第三部分:處理心跳報文
當KEEP_ALIVE_TIME這段時間達到之后,select函數會返回0,進行相應的處理流程中
客戶端已經在KEEP_ALIVE_TIME這段時間內沒有收到任何對當前連接的反饋,於是發起PING消息,這里通過傳送一個類型為MSG_PING的消息對象來完成PING操作。
當客戶端在收到服務器的消息的處理中, 實際工作中,需要對報文進行解析后處理。這里只是簡單將探活計數器和探活時間置零,等待下一次探活時間的來臨。
服務器端程序
//
// Created by shengym on 2019-07-07.
//
#include "lib/common.h"
#include "message_objecte.h"
static int count;
static void sig_int(int signo) {
printf("\nreceived %d datagrams\n", count);
exit(0);
}
int main(int argc, char **argv) {
if (argc != 2) {
error(1, 0, "usage: tcpsever <sleepingtime>");
}
int sleepingTime = atoi(argv[1]);
int listenfd;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(SERV_PORT);
int rt1 = bind(listenfd, (struct sockaddr *) &server_addr, sizeof(server_addr));
if (rt1 < 0) {
error(1, errno, "bind failed ");
}
int rt2 = listen(listenfd, LISTENQ);
if (rt2 < 0) {
error(1, errno, "listen failed ");
}
signal(SIGINT, sig_int);
signal(SIGPIPE, SIG_IGN);
int connfd;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
if ((connfd = accept(listenfd, (struct sockaddr *) &client_addr, &client_len)) < 0) {
error(1, errno, "bind failed ");
}
messageObject message;
count = 0;
for (;;) {
int n = read(connfd, (char *) &message, sizeof(messageObject));
if (n < 0) {
error(1, errno, "error read");
} else if (n == 0) {
error(1, 0, "client closed \n");
}
printf("received %d bytes\n", n, message);
count++;
switch (ntohl(message.type)) {
case MSG_TYPE1 :
printf("process MSG_TYPE1 \n");
break;
case MSG_TYPE2 :
printf("process MSG_TYPE2 \n");
break;
case MSG_PING: {
messageObject pong_message;
pong_message.type = MSG_PONG;
sleep(sleepingTime);
ssize_t rc = send(connfd, (char *) &pong_message, sizeof(pong_message), 0);
if (rc < 0)
error(1, errno, "send failure");
break;
}
default :
error(1, 0, "unknown message type (%d)\n", ntohl(message.type));
}
}
}
效果:
服務端:
客戶端:
服務單休眠時間60秒,當客戶端在發送了三次心跳檢測報文PING報文后,判斷連接無效,直接退出,這是因為在這段時間內沒有收到來自服務器端任何PONG報文。