C\C++語言利用epoll實現高並發聊天室Demo


2019-11-19

00:19:02

參考大佬:https://github.com/LinHaoo/chat

 

 

 Makefile:

all:server client
server:server.cpp
    g++ $^ -o $@
client:client.cpp
    g++ $^ -o $@
clean:
    rm server client

utility.h

#ifndef CHAT_UTILITY_H
#define CHAT_UTILITY_H

#include <iostream>
#include <list>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
using namespace std;

//clients_list save all the clients's socket
list<int> clients_list;

/***** macro defintion *****/
//server ip
#define SERVER_IP "127.0.0.1"

//server port
#define SERVER_PORT 8888

//epoll size
#define EPOLL_SIZE 5000

//message buffer size
#define BUF_SIZE 0xFFFF

#define SERVER_WELCOME "Welcome you join to the chat room! Your chat ID is: Client #%d"

#define SERVER_MESSAGE "ClientID %d say >> %s"

//exit
#define EXIT "EXIT"

#define CAUTION "There is only ont int the char root!"

/****** some function *****/
/**
 *設置非阻塞
 */
int setnonblockint(int sockfd) {
    fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0) | O_NONBLOCK);
    return 0;
}

/**
 * 將文件描述符 fd 添加到 epollfd 標示的內核事件表中,
 * 並注冊 EPOLLIN 和 EPOOLET 事件,
 * EPOLLIN 是數據可讀事件;EPOOLET 表明是 ET 工作方式。
 * 最后將文件描述符設置非阻塞方式
 * @param epollfd:epoll句柄
 * @param fd:文件描述符
 * @param enable_et:enable_et = true,
 * 是否采用epoll的ET(邊緣觸發)工作方式;否則采用LT(水平觸發)工作方式
 */
void addfd(int epollfd, int fd, bool enable_et) {
    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLIN;
    if (enable_et) {
        ev.events = EPOLLIN | EPOLLET;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
    setnonblockint(fd);
    printf("fd added to epoll!\n\n");
}

//發送廣播
int sendBroadcastmessage(int clientfd) {
    char buf[BUF_SIZE];
    char message[BUF_SIZE];
    bzero(buf, BUF_SIZE);
    bzero(buf, BUF_SIZE);

    printf("read from client(clientID = %d)\n", clientfd);
    int len = recv(clientfd, buf, BUF_SIZE, 0);

    if (0 == len) {
        close(clientfd);
        clients_list.remove(clientfd);
        printf("ClientID = %d closed.\n now there are %d client in the char room\n",
        clientfd, (int)clients_list.size());
    } else {
        if (1 == clients_list.size()) {
            send(clientfd, CAUTION, strlen(CAUTION), 0);
            return 0;
        }
        sprintf(message, SERVER_MESSAGE, clientfd, buf);
        list<int>::iterator it;
        for (it = clients_list.begin(); it != clients_list.end(); ++it) {
            if (*it != clientfd) {
                if (send(*it, message, BUF_SIZE, 0) < 0) {
                    perror("error");
                    exit(-1);
                }
            }
        }
    }
    return len;
}

#endif //CHAT_UTILITY_H

client.cpp

#include "utility.h"

#define error(msg) \
    do {perror(msg); exit(EXIT_FAILURE); } while (0)

int main(int argc, char *argv[]) {
    /**
     * TCP 客戶端通信
     * 1.創建套接字(socket)
     * 2.使用 connect() 建立到達服務器的連接(connect)
     * 3.客戶端進行通信(使用 write()/send() 或 send()/recv() )
     * 4.使用 close() 關閉客戶連接
     */

    /**
     * 1:創建套接字socket
     * param1:指定地址族為IPv4;param2:指定傳輸協議為流式套接字;param3:指定傳輸協議為TCP,可設為0,由系統推導
     */
    int clientfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (clientfd < 0) { error("socket error"); }

    // 填充sockadd結構,指定ip與端口
    struct sockaddr_in serverAddr;
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(SERVER_PORT);
    serverAddr.sin_addr.s_addr = inet_addr(SERVER_IP);

    // 2:連接服務端
    if (connect(clientfd, (struct sockaddr *) &serverAddr, sizeof(serverAddr)) < 0) {
        error("connect error");
    }

    // 創建管道,其中fd[0]用於父進程讀,fd[1]用於子進程寫
    int pipefd[2];
    if (pipe(pipefd) < 0) { error("pipe error"); }

    /**
     * epoll使用
     * 1:調用 epoll_create 函數在 Linux 內核中創建一個事件表;
     * 2:然后將文件描述符添加到所創建的事件表中 (epoll_ctl);
     * 3:在主循環中,調用 epoll_wait 等待返回就緒的文件描述符集合;
     * 4:分別處理就緒的事件集合
     */
    // 創建epoll
    int epfd = epoll_create(EPOLL_SIZE);
    if (epfd < 0) { error("epfd error"); }

    static struct epoll_event events[2];
    //將sock和管道讀端描述符都添加到內核事件表中
    addfd(epfd, clientfd, true);
    addfd(epfd, pipefd[0], true);

    // 表示客戶端是否正常工作
    bool isClientwork = true;

    // 聊天信息緩沖區
    char message[BUF_SIZE];

    // Fork
    int pid = fork();
    if (pid < 0) {
        error("fork error");
    } else if (pid == 0) {      // 子進程
        //子進程負責寫入管道,因此先關閉讀端
        close(pipefd[0]);
        printf("Please input 'exit' to exit the chat room\n");

        while (isClientwork) {
            bzero(&message, BUF_SIZE);
            fgets(message, BUF_SIZE, stdin);

            // 客戶輸出exit,退出
            if (strncasecmp(message, EXIT, strlen(EXIT)) == 0) {
                isClientwork = 0;
            } else {    // 子進程將信息寫入管道
                if (write(pipefd[1], message, strlen(message) - 1) < 0) {
                    error("fork error");
                }
            }
        }
    } else { //pid > 0 父進程
        //父進程負責讀管道數據,因此先關閉寫端
        close(pipefd[1]);

        // 主循環(epoll_wait)
        while (isClientwork) {
            // 等待事件的產生,函數返回需要處理的事件數目
            int epoll_events_count = epoll_wait(epfd, events, 2, -1);
            // 處理就緒事件
            for (int i = 0; i < epoll_events_count; ++i) {
                bzero(&message, BUF_SIZE);

                //服務端發來消息
                if (events[i].data.fd == clientfd) {
                    //接受服務端消息
                    int ret = recv(clientfd, message, BUF_SIZE, 0);

                    // ret= 0 服務端關閉
                    if (ret == 0) {
                        printf("Server closed connection: %d\n", clientfd);
                        close(clientfd);
                        isClientwork = 0;
                    } else printf("%s\n", message);
                }
                    //子進程寫入事件發生,父進程處理並發送服務端
                else {
                    //父進程從管道中讀取數據
                    int ret = read(events[i].data.fd, message, BUF_SIZE);

                    // ret = 0
                    if (ret == 0) {
                        isClientwork = 0;
                    } else {   // 將信息發送給服務端
                        send(clientfd, message, BUF_SIZE, 0);
                    }
                }
            }//for
        }//while
    }

    if (pid) {
        //關閉父進程和sock
        close(pipefd[0]);
        close(clientfd);
    } else {
        //關閉子進程
        close(pipefd[1]);
    }
    return 0;
}

server.cpp

#include "utility.h"

#define error(msg) \
    do {perror(msg); exit(EXIT_FAILURE); } while (0)

int main(int argc, char *argv[]) {
    /**
     * TCP服務端通信
     * 1:使用 socket()創建 TCP 套接字(socket)
     * 2:將創建的套接字綁定到一個本地地址和端口上(bind)
     * 3:將套接字設為監聽模式,准備接收客戶端請求(listen)
     * 4:等待客戶請求到來: 當請求到來后,接受連接請求,返回一個對應於此次連接的新的套接字(accept)
     * 5:用 accept 返回的套接字和客戶端進行通信(使用write()/send()或send()/recv())
     * 6:返回,等待另一個客戶請求
     * 7:關閉套接字
     */

    /**
     * 1:創建套接字socket
     * param1:指定地址族為IPv4;param2:指定傳輸協議為流式套接字;param3:指定傳輸協議為TCP,可設為0,由系統推導
     */
    int listener = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (listener < 0) {
        error("socket error");
    }
    printf("listen socket created \n");

    //地址復用
    int on = 1;
    if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
        error("setsockopt");
    }

    struct sockaddr_in serverAddr;
    serverAddr.sin_family = PF_INET;
    serverAddr.sin_port = htons(SERVER_PORT);
    serverAddr.sin_addr.s_addr = inet_addr(SERVER_IP);
    //綁定地址
    if (bind(listener, (struct sockaddr *) &serverAddr, sizeof(serverAddr)) < 0) {
        error("bind error");
    }

    //監聽
    if (listen(listener, SOMAXCONN) < 0) {
        error("listen error");
    }

    printf("Start to listen: %s\n", SERVER_IP);

    //在內核中創建事件表
    int epfd = epoll_create(EPOLL_SIZE);
    if (epfd < 0) {
        error("epfd error");
    }
    printf("epoll created, epollfd = %d\n", epfd);
    static struct epoll_event events[EPOLL_SIZE];
    //往內核事件表里添加事件
    addfd(epfd, listener, true);

    //主循環
    while (1) {
        //epoll_events_count表示就緒事件的數目
        int epoll_events_count = epoll_wait(epfd, events, EPOLL_SIZE, -1);
        if (epoll_events_count < 0) {
            perror("epoll failure");
            break;
        }

        printf("epoll_events_count = %d\n", epoll_events_count);
        //處理這epoll_events_count個就緒事件
        for (int i = 0; i < epoll_events_count; ++i) {
            int sockfd = events[i].data.fd;
            //新用戶連接
            if (sockfd == listener) {
                struct sockaddr_in client_address;
                socklen_t client_addrLength = sizeof(struct sockaddr_in);
                int clientfd = accept(listener, (struct sockaddr *) &client_address, &client_addrLength);

                printf("client connection from: %s : % d(IP : port), clientfd = %d \n",
                       inet_ntoa(client_address.sin_addr),
                       ntohs(client_address.sin_port),
                       clientfd);

                addfd(epfd, clientfd, true);

                // 服務端用list保存用戶連接
                clients_list.push_back(clientfd);
                printf("Add new clientfd = %d to epoll\n", clientfd);
                printf("Now there are %d clients int the chat room\n", (int) clients_list.size());

                // 服務端發送歡迎信息
                printf("welcome message\n");
                char message[BUF_SIZE];
                bzero(message, BUF_SIZE);
                sprintf(message, SERVER_WELCOME, clientfd);
                int ret = send(clientfd, message, BUF_SIZE, 0);
                if (ret < 0) {
                    error("send error");
                }
            } else {           //處理用戶發來的消息,並廣播,使其他用戶收到信息
                int ret = sendBroadcastmessage(sockfd);
                if (ret < 0) {
                    error("error");
                }
            }
        }
    }
    close(listener); //關閉socket
    close(epfd);    //關閉內核
    return 0;
}

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM