0、前言
最近突然想了解怎樣設計一個支持百萬連接的后台server架構。
要設計一個支持百萬連接的后台server,我們首先要知道會有哪些因素限制后台server的高並發連接,這里想到的因素有以下幾點:
1、操作系統的參數設置能否支持百萬並發連接;
2、操作系統維持百萬並發長連接需要多少內存;
3、應用層面上維持百萬並發長連接需要多少內存;
4、百萬並發長連接的吞吐量是否超過了硬件網卡的限制。
在學習的過程中,主要針對的是1、2、4,第3點一般跟業務相關,這里暫時沒有考慮。
本篇文章估計需要多次才能完成,現在初步的想法是先寫一個demo程序,然后后面再慢慢測試優化。
1、后台設計
1.1 后台設計圖
如下為后台的設計結構:
1、首先主進程根據機器CPU個數,創建對應數量的管道;
2、創建完對應的管道之后,再創建一樣數量的線程,每個線程綁定一個CPU;
3、主進程開始初始化socket,然后accept,當接收到一個客戶端連接時,就把conn_fd寫到某個pipe中;
4、每個線程創建epoll,然后監聽對應pipe的讀端fd,當監聽到pipe中有數據時,就讀取該數據,解析為fd,將該fd加入epoll進行監聽。

1.2 編碼實現
根據1.1的設計,我們編寫代碼,包括server模塊和worker模塊。server模塊負責創建pipe、線程、和監聽客戶端連接;worker模塊負責處理每個客戶端的連接。代碼如下所示:
1.2.0 common
#ifndef SERV_COMMON_H
#define SERV_COMMON_H

/*
 * Definitions shared by the server (acceptor) and the worker threads.
 *
 * The include guard deliberately avoids a leading underscore followed by an
 * upper-case letter: such identifiers are reserved for the implementation.
 */

/* Start-up argument handed to each worker thread. */
typedef struct {
    int id; /* worker index; the worker also uses it as the CPU core to pin to */
    int fd; /* pipe descriptor on which the worker receives connection fds */
} thread_arg;

#define SERV_PORT 9876 /* TCP port the server listens on */
#define MAX_LINE 1024  /* size in bytes of the per-request receive buffer */

#endif /* SERV_COMMON_H */
1.2.1 worker
worker.h
#ifndef SERV_WORKER_H
#define SERV_WORKER_H

/*
 * Thread entry point of a worker (signature matches pthread_create()).
 *
 * arg points to a thread_arg (declared in common.h) that carries the worker
 * index and the pipe descriptor on which the worker receives the descriptors
 * of newly accepted connections. The pointed-to object must stay valid while
 * the worker reads it.
 *
 * The include guard avoids the reserved "_" + upper-case identifier form.
 */
void *worker(void *arg);

#endif /* SERV_WORKER_H */
worker.cc
1 #include <errno.h> 2 #include <fcntl.h> 3 #include <stdio.h> 4 #include <stdlib.h> 5 #include <string.h> 6 #include <unistd.h> 7 #include <sched.h> 8 #include <pthread.h> 9 #include <sys/epoll.h> 10 #include <sys/types.h> 11 #include <sys/socket.h> 12 13 #include "common.h" 14 15 #define MAXFDS 1000000 16 #define EVENTSIZE 1000 17 18 int taskset_thread_core(int core_id) 19 { 20 cpu_set_t cpuset; 21 CPU_ZERO(&cpuset); 22 CPU_SET(core_id, &cpuset); 23 24 pthread_t curr_tid = pthread_self(); 25 return pthread_setaffinity_np(curr_tid, sizeof(cpu_set_t), &cpuset); 26 } 27 28 int setnonblocking(int fd) 29 { 30 if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFD, 0) | O_NONBLOCK) == -1) { 31 printf("fd %d set non blocking failed\n", fd); 32 return -1; 33 } 34 35 return 0; 36 } 37 38 void handle_req(int cli_fd) 39 { 40 char in_buff[MAX_LINE]; 41 int ret, rs = 1; 42 43 while (rs) { 44 ret = recv(cli_fd, in_buff, 1024, 0); 45 46 if (ret < 0) { 47 if (errno == EAGAIN) { 48 printf("EAGAIN\n"); 49 break; 50 } else { 51 printf("recv error: %d\n", errno); 52 close(cli_fd); 53 break; 54 } 55 } else if (ret == 0) { 56 rs = 0; 57 } 58 59 if (ret == sizeof(in_buff)) 60 rs = 1; 61 else 62 rs = 0; 63 } 64 65 if (ret > 0) { 66 send(cli_fd, in_buff, strlen(in_buff), 0); 67 } 68 } 69 70 void run_epoll(int epfd, int pipe_fd) 71 { 72 int i, cli_fd, nfds; 73 struct epoll_event ev, events[EVENTSIZE]; 74 char buff[16]; 75 76 ev.events = EPOLLIN | EPOLLET; 77 78 while (1) { 79 nfds = epoll_wait(epfd, events, EVENTSIZE , -1); 80 for (i = 0; i < nfds; i++) { 81 // pipe msg, add connected fd to epoll 82 if (events[i].data.fd == pipe_fd) { 83 read(pipe_fd, buff, 16); 84 cli_fd = atoi(buff); 85 setnonblocking(cli_fd); 86 ev.data.fd = cli_fd; 87 88 if (epoll_ctl(epfd, EPOLL_CTL_ADD, cli_fd, &ev) < 0) { 89 printf("epoll add fd %d failed\n", cli_fd); 90 } 91 } else { // socket msg 92 cli_fd = events[i].data.fd; 93 handle_req(cli_fd); 94 } 95 } 96 } 97 } 98 99 void *worker(void *arg) 100 { 101 int epfd, pipe_fd; 
102 struct epoll_event ev; 103 104 taskset_thread_core(((thread_arg*) arg)->id); 105 106 pipe_fd = ((thread_arg*) arg)->fd; 107 epfd = epoll_create(MAXFDS); 108 setnonblocking(pipe_fd); 109 ev.data.fd = pipe_fd; 110 ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT; 111 if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd, &ev) < 0) { 112 printf("epoll add mq fail\n"); 113 } 114 115 run_epoll(epfd, pipe_fd); 116 117 return 0; 118 }
1.2.2 server
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <string.h> 4 #include <unistd.h> 5 #include <sched.h> 6 #include <pthread.h> 7 #include <arpa/inet.h> 8 #include <sys/socket.h> 9 10 #include "common.h" 11 #include "worker.h" 12 13 int start_listen() 14 { 15 int fd, opt = 1; 16 struct sockaddr_in servaddr; 17 18 bzero(&servaddr, sizeof(servaddr)); 19 servaddr.sin_family = AF_INET; 20 servaddr.sin_port = htons(SERV_PORT); 21 servaddr.sin_addr.s_addr = htonl(INADDR_ANY); 22 23 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 24 printf("open socket failed!\n"); 25 exit(1); 26 } 27 28 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); 29 30 if ((bind(fd, (struct sockaddr*) &servaddr, sizeof(servaddr))) < 0) { 31 printf("bind failed!\n"); 32 exit(1); 33 } 34 35 if (listen(fd, SOMAXCONN) < 0) { 36 printf("listen failed!\n"); 37 exit(1); 38 } 39 40 return fd; 41 } 42 43 int main(int argc, char **argv) 44 { 45 int i, num_cores, listen_fd, cli_fd; 46 char name[32]; 47 48 listen_fd = start_listen(); 49 50 num_cores = sysconf(_SC_NPROCESSORS_ONLN); 51 printf("core num: %d\n", num_cores); 52 53 int pipe_fd[num_cores][2]; 54 thread_arg targ[num_cores]; 55 pthread_t tid[num_cores]; 56 pthread_attr_t attr; 57 58 if (pthread_attr_init(&attr) != 0) { 59 perror("pthrad attr init error: "); 60 exit(1); 61 } 62 63 if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0) { 64 perror("pthread set attr detached error: "); 65 exit(1); 66 } 67 68 for (i = 0; i < num_cores; i++) { 69 pipe(pipe_fd[i]); 70 targ[i] = (thread_arg) {i, pipe_fd[i][0]}; 71 72 if (pthread_create(&tid[i], &attr, worker, &targ[i]) != 0) { 73 perror("pthread create error: "); 74 exit(1); 75 } 76 } 77 78 pthread_attr_destroy(&attr); 79 sleep(2); 80 printf("server started\n\n"); 81 82 while ((cli_fd = accept(listen_fd, NULL, NULL)) > 0) { 83 sprintf(name, "%d", cli_fd); 84 i = cli_fd % num_cores; 85 write(pipe_fd[i][1], name, strlen(name)); 86 } 87 88 close(listen_fd); 89 90 for (i = 0; i < 
num_cores; i++) { 91 close(pipe_fd[i][1]); 92 } 93 94 return 0; 95 }
寫完后台代碼之后,開始測試能支持多少連接,但測試過程中一直有問題,會報如下的錯誤:error: Cannot assign requested address。
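Google了一下,有說法是短時間內大量短連接造成TIME_WAIT耗盡端口的問題,但不明白我的測試代碼為什么會被當成短連接,而不是長連接。(補充:connect()返回這個錯誤(EADDRNOTAVAIL)表示客戶端本機的臨時端口已經用完;即使全部是長連接,同一個源IP連向同一個目標IP和端口時,可建立的連接數也受net.ipv4.ip_local_port_range限制。)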
我的客戶端代碼如下,不知道是哪里出問題了。
#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE /* usleep */
#endif

#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

void process_conn_svr(const char *svr_ip, int svr_port);

/* Number of connections established so far; fd[0..connections-1] is in use. */
int connections = 0;

#define MAX_CONN 1005000
/* Connected sockets; an entry becomes -1 once its connection is closed. */
int fd[MAX_CONN];

/* Parse a decimal TCP port; returns the port (1..65535) or -1 if invalid. */
static int parse_port(const char *text)
{
    char *end = NULL;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (errno != 0 || end == text || *end != '\0' || value < 1 || value > 65535) {
        return -1;
    }

    return (int) value;
}

/*
 * Load-test client: opens as many connections to the server as possible and
 * then keeps them busy with periodic "keepalive!" messages.
 *
 * Everything runs in one process. Connecting in a fork()ed child does not
 * work with this design: the child records the sockets in its own copy of
 * fd[], so a keepalive loop in the parent only ever sees an empty table.
 */
int main(int argc, char **argv)
{
    if (argc <= 2) {
        printf("usage: %s ip port\n", argv[0]);
        exit(0);
    }

    const char *ip = argv[1];
    int port = parse_port(argv[2]);
    if (port < 0) {
        printf("invalid port: %s\n", argv[2]);
        exit(1);
    }

    process_conn_svr(ip, port);
    if (connections == 0) {
        return 1; /* nothing to keep alive */
    }

    const char buf[] = "keepalive!";
    for (;;) {
        usleep(1*1000);
        for (int i = 0; i < connections; ++i) {
            if (fd[i] < 0) {
                continue;
            }
            /* MSG_NOSIGNAL: a closed peer must not kill us with SIGPIPE. */
            if (send(fd[i], buf, sizeof(buf), MSG_NOSIGNAL) < 0
                && errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
                close(fd[i]);
                fd[i] = -1;
            }
        }
    }

    return 0;
}

/*
 * Open connections to svr_ip:svr_port one after another and record them in
 * fd[], until MAX_CONN is reached or socket()/connect() fails.
 *
 * The failure reason is printed. "Cannot assign requested address"
 * (EADDRNOTAVAIL) from connect() means that no free ephemeral local port is
 * left for this source address, not that the connections are short-lived.
 * The process pauses for Enter after every batch of 10000 connections.
 */
void process_conn_svr(const char *svr_ip, int svr_port)
{
    struct sockaddr_in serv_addr;

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons((in_port_t) svr_port);
    if (inet_pton(AF_INET, svr_ip, &serv_addr.sin_addr) != 1) {
        printf("error: invalid IPv4 address %s\n", svr_ip);
        return;
    }

    /* The bound keeps the index inside fd[]. */
    while (connections < MAX_CONN) {
        int cli_fd = socket(AF_INET, SOCK_STREAM, 0);
        if (cli_fd == -1) {
            printf("error: %s\n", strerror(errno));
            return;
        }

        if (connect(cli_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1) {
            printf("error: %s\n", strerror(errno)); /* before close() can change errno */
            close(cli_fd);
            return;
        }

        fd[connections] = cli_fd;
        connections++;
        printf("connections: %d, fd: %d\n", connections, cli_fd);

        if (connections % 10000 == 9999) {
            printf("press Enter to continue: ");
            fflush(stdout); /* the prompt has no newline to flush it */
            getchar();
        }
        usleep(1*1000);
    }
}
