基於管道通知的百萬並發長連接server模型


0、前言

最近突然想了解怎樣設計一個支持百萬連接的后台server架構。

要設計一個支持百萬連接的后台server,我們首先要知道會有哪些因素限制后台server的高並發連接,這里想到的因素有以下幾點:

1、操作系統的參數設置能否支持百萬並發連接;

2、操作系統維持百萬並發長連接需要多少內存;

3、應用層面上維持百萬並發長連接需要多少內存;

4、百萬並發長連接的吞吐量是否超過了硬件網卡的限制。

在學習的過程中,主要針對的是1、2、4,第3點一般跟業務相關,這里暫時沒有考慮。

本篇文章估計需要多次才能完成,現在初步的想法是先寫一個demo程序,然后后面再慢慢測試優化。

1、后台設計

1.1 后台設計圖

如下為后台的設計結構:

1、首先主進程根據機器CPU個數,創建對應數量的管道;

2、創建完對應的管道之后,再創建一樣數量的線程,每個線程綁定一個CPU;

3、主進程開始初始化socket,然后accept,當接收到一個客戶端連接時,就把conn_fd寫到某個pipe中;

3、每個線程創建epoll,然后監聽對應pipe的寫端fd,當監聽到pipe中有數據時,就讀取該數據,格式化為fd,將該fd加入epoll進行監聽。

1.2 編碼實現

根據1.1的設計,我們編寫代碼,包括server模塊和worker模塊。server模塊負責創建pipe、線程、和監聽客戶端連接;worker模塊負責處理每個客戶端的連接。代碼如下所示:

1.2.0 common

 1 #ifndef _SERV_COMMON_H
 2 #define _SERV_COMMON_H
 3 
 4 typedef struct {
 5     int id;
 6     int fd;
 7 } thread_arg;
 8 
 9 #define SERV_PORT 9876
10 #define MAX_LINE  1024
11 
12 #endif
View Code

1.2.1 worker

worker.h

1 #ifndef _SERV_WORKER_H
2 #define _SERV_WORKER_H
3 
4 void *worker(void *arg);
5 
6 #endif
View Code

worker.cc

  1 #include <errno.h>
  2 #include <fcntl.h>
  3 #include <stdio.h>
  4 #include <stdlib.h>
  5 #include <string.h>
  6 #include <unistd.h>
  7 #include <sched.h>
  8 #include <pthread.h>
  9 #include <sys/epoll.h>
 10 #include <sys/types.h>
 11 #include <sys/socket.h>
 12 
 13 #include "common.h"
 14 
 15 #define MAXFDS 1000000
 16 #define EVENTSIZE 1000
 17 
 18 int taskset_thread_core(int core_id)
 19 {
 20     cpu_set_t cpuset;
 21     CPU_ZERO(&cpuset);
 22     CPU_SET(core_id, &cpuset);
 23 
 24     pthread_t curr_tid = pthread_self();
 25     return pthread_setaffinity_np(curr_tid, sizeof(cpu_set_t), &cpuset);
 26 }
 27 
 28 int setnonblocking(int fd)
 29 {
 30     if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFD, 0) | O_NONBLOCK) == -1) {
 31         printf("fd %d set non blocking failed\n", fd);
 32         return -1;
 33     }
 34 
 35     return 0;
 36 }
 37 
 38 void handle_req(int cli_fd)
 39 {
 40     char in_buff[MAX_LINE];
 41     int ret, rs = 1;
 42 
 43     while (rs) {
 44         ret = recv(cli_fd, in_buff, 1024, 0);
 45 
 46         if (ret < 0) {
 47             if (errno == EAGAIN) {
 48                 printf("EAGAIN\n");
 49                 break;
 50             } else {
 51                 printf("recv error: %d\n", errno);
 52                 close(cli_fd);
 53                 break;
 54             }
 55         } else if (ret == 0) {
 56             rs = 0;
 57         }
 58 
 59         if (ret == sizeof(in_buff))
 60             rs = 1;
 61         else
 62             rs = 0;
 63     }
 64 
 65     if (ret > 0) {
 66         send(cli_fd, in_buff, strlen(in_buff), 0);
 67     }
 68 }
 69 
 70 void run_epoll(int epfd, int pipe_fd)
 71 {
 72     int i, cli_fd, nfds;
 73     struct epoll_event ev, events[EVENTSIZE];
 74     char buff[16];
 75 
 76     ev.events = EPOLLIN | EPOLLET;
 77 
 78     while (1) {
 79         nfds = epoll_wait(epfd, events, EVENTSIZE , -1);
 80         for (i = 0; i < nfds; i++) {
 81             // pipe msg, add connected fd to epoll
 82             if (events[i].data.fd == pipe_fd) {
 83                 read(pipe_fd, buff, 16);
 84                 cli_fd = atoi(buff);
 85                 setnonblocking(cli_fd);
 86                 ev.data.fd = cli_fd;
 87 
 88                 if (epoll_ctl(epfd, EPOLL_CTL_ADD, cli_fd, &ev) < 0) {
 89                     printf("epoll add fd %d failed\n", cli_fd);
 90                 }
 91             } else {  // socket msg
 92                 cli_fd = events[i].data.fd;
 93                 handle_req(cli_fd);
 94             }
 95         }
 96     }
 97 }
 98 
 99 void *worker(void *arg)
100 {
101     int epfd, pipe_fd;
102     struct epoll_event ev;
103 
104     taskset_thread_core(((thread_arg*) arg)->id);
105     
106     pipe_fd = ((thread_arg*) arg)->fd;
107     epfd = epoll_create(MAXFDS);
108     setnonblocking(pipe_fd);
109     ev.data.fd = pipe_fd;
110     ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
111     if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd, &ev) < 0) {
112         printf("epoll add mq fail\n");
113     }
114 
115     run_epoll(epfd, pipe_fd);
116 
117     return 0;
118 }
View Code

1.2.2 server

 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <string.h>
 4 #include <unistd.h>
 5 #include <sched.h>
 6 #include <pthread.h>
 7 #include <arpa/inet.h>
 8 #include <sys/socket.h>
 9 
10 #include "common.h"
11 #include "worker.h"
12 
13 int start_listen()
14 {
15     int fd, opt = 1;
16     struct sockaddr_in servaddr;
17 
18     bzero(&servaddr, sizeof(servaddr));
19     servaddr.sin_family = AF_INET;
20     servaddr.sin_port = htons(SERV_PORT);
21     servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
22 
23     if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
24         printf("open socket failed!\n");
25         exit(1);
26     }
27 
28     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
29 
30     if ((bind(fd, (struct sockaddr*) &servaddr, sizeof(servaddr))) < 0) {
31         printf("bind failed!\n");
32         exit(1);
33     }
34 
35     if (listen(fd, SOMAXCONN) < 0) {
36         printf("listen failed!\n");
37         exit(1);
38     }
39 
40     return fd;
41 }
42 
43 int main(int argc, char **argv)
44 {
45     int i, num_cores, listen_fd, cli_fd;
46     char name[32];
47 
48     listen_fd = start_listen();
49 
50     num_cores = sysconf(_SC_NPROCESSORS_ONLN);
51     printf("core num: %d\n", num_cores);
52 
53     int pipe_fd[num_cores][2];
54     thread_arg targ[num_cores];
55     pthread_t tid[num_cores];
56     pthread_attr_t attr;
57 
58     if (pthread_attr_init(&attr) != 0) {
59         perror("pthrad attr init error: ");
60         exit(1);
61     }
62 
63     if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0) {
64         perror("pthread set attr detached error: ");
65         exit(1);
66     }
67 
68     for (i = 0; i < num_cores; i++) {
69         pipe(pipe_fd[i]);
70         targ[i] = (thread_arg) {i, pipe_fd[i][0]};
71 
72         if (pthread_create(&tid[i], &attr, worker, &targ[i]) != 0) {
73             perror("pthread create error: ");
74             exit(1);
75         }
76     }
77 
78     pthread_attr_destroy(&attr);
79     sleep(2);
80     printf("server started\n\n");
81 
82     while ((cli_fd = accept(listen_fd, NULL, NULL)) > 0) {
83         sprintf(name, "%d", cli_fd);
84         i = cli_fd % num_cores;
85         write(pipe_fd[i][1], name, strlen(name));
86     }
87 
88     close(listen_fd);
89 
90     for (i = 0; i < num_cores; i++) {
91         close(pipe_fd[i][1]);
92     }
93 
94     return 0;
95 }
View Code

 寫完后台代碼之后,開始測試能支持多少連接,但測試過程中一直有問題,會報如下的錯誤:error: Cannot assign requested address。

google了一下,說是因為短時間內大量短連接造成TIME_WAIT耗盡端口問題,不明白我的測試代碼怎么是短連接,而不是長連接。

我的客戶端代碼如下,不知道是哪里出問題了。

#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

void process_conn_svr(const char *svr_ip, int svr_port);

int connections = 0;

#define MAX_CONN 1005000
int fd[MAX_CONN];
 
int main(int argc, char **argv) 
{
    if (argc <= 2) { 
        printf("usage: %s ip port\n", argv[0]);
        exit(0);
    }
    const char *ip = argv[1];
    int port = atoi(argv[2]);

   
    pid_t pid = fork(); 
    if (pid == 0) { 
        process_conn_svr(ip, port);
    }

    const char buf[] = "keepalive!";
    for (;;) {
        usleep(1*1000);
        for (int i = 0; i < MAX_CONN; ++i) {
            if (fd[i] != 0) { 
                send(fd[i], buf, sizeof(buf), 0);
            }       
        }       
    }
    return 0;   
    
}

void process_conn_svr(const char *svr_ip, int svr_port)
{
    int conn_idx = 0;
     for (;;) {
        struct sockaddr_in serv_addr;
        bzero(&serv_addr, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        inet_pton(AF_INET, svr_ip, &serv_addr.sin_addr);

        serv_addr.sin_port = htons(svr_port);
        int cli_fd = socket(AF_INET, SOCK_STREAM, 0);
        if (cli_fd == -1) {
            goto sock_err;
        }

        if (connect(cli_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1) {
            goto sock_err;
        }

        fd[conn_idx] = cli_fd;
        conn_idx++;

        connections++;
        printf("connections: %d, fd: %d\n", connections, cli_fd);

        if (connections % 10000 == 9999) {
            printf("press Enter to continue: ");
            getchar();
        }
        usleep(1*1000);
    }

sock_err:
    printf("error: %s\n", strerror(errno));
}
View Code


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM