Linux socket編程(三) 簡單的多線程聊天室


要用到多線程以及線程的讀寫鎖,之前寫的Socket類、ServerSocket要做相應的改變

因為服務器端要維持着一個存儲客戶端Socket信息到數據結構,當多個線程同時訪問這個結構時,要做同步處理,所以要在適當的時候加上讀鎖或寫鎖。

新的ServerSocket類

#ifndef SERVERSOCKET_H
#define SERVERSOCKET_H

#include "Socket.h"
#include <list>
#include <semaphore.h>
#include "ThreadReadWriteLock.h"

using std::list;

class ServerSocket:public Socket
{
    public:
        ServerSocket(const int port);
        ServerSocket();
        virtual ~ServerSocket();

        void Accept(Socket& socket);
        //run server to connect multi-clients
        void Run();

    private:
        //accept multi-clients
        bool Accept();
        void AddClient(Socket* clientSocket);
        static void DeleteClient(Socket* clientSocket);
        static void* ProcessMessage(void* arg);
        static void SendMsgToAllUsers(const std::string& message);

        static list<Socket*> clientSockets;
        static bool serviceFlag;
        //use thread-read-write-lock to synchronize threads
        static ThreadReadWriteLock readWriteLock;
};

#endif

其中有static成員函數,因為創建一個新的線程時,要傳遞一個函數指針,不過類普通的成員函數的函數指針與一般的函數指針是不兼容的,所以要傳遞static成員函數的函數指針。

以下是ServerSocket的新實現:

ServerSocket.cpp

#include "ServerSocket.h"
#include "SocketException.h"
#include <pthread.h>
#include <iostream>

list<Socket*> ServerSocket::clientSockets;
ThreadReadWriteLock ServerSocket::readWriteLock;
bool ServerSocket::serviceFlag=true;

ServerSocket::ServerSocket(const int port)
{
      if ( ! Socket::Create() )
        {
          throw SocketException ( "Could not create server socket." );
        }

      if ( ! Socket::Bind ( port ) )
        {
          throw SocketException ( "Could not bind to port." );
        }

      if ( ! Socket::Listen() )
        {
          throw SocketException ( "Could not listen to socket." );
        }
}

ServerSocket::~ServerSocket()
{
    list<Socket*>::iterator iter;
    for(iter=clientSockets.begin();iter!=clientSockets.end();iter++)
        delete (*iter);
}

void ServerSocket::Accept(Socket& socket)
{
      if ( ! Socket::Accept ( socket ) )
        {
          throw SocketException ( "Could not accept socket." );
        }
}

bool ServerSocket::Accept()
{
    Socket* clientSocket=new Socket;
    Accept(*clientSocket);
    AddClient(clientSocket);

    //create new thread for a new client
    pthread_t newThread;
    int result=pthread_create(&newThread,NULL,ProcessMessage,static_cast<void*>(clientSocket));
    if(result!=0)
        return false;

    //detach the newThread
    //so when newThread exits it can release it's resource
    result=pthread_detach(newThread);
    if(result!=0)
        perror("Failed to detach thread");
return true; } void ServerSocket::Run() { while(serviceFlag) { if(clientSockets.size()>=static_cast<unsigned int>(MAXCONNECTION)) serviceFlag=false; else serviceFlag=Accept(); sleep(1); } } void* ServerSocket::ProcessMessage(void* arg) { std::string message; Socket* clientSocket=static_cast<Socket*>(arg); Send(*clientSocket,"Welcome!"); while(serviceFlag) { Receive(*clientSocket,message); if(message=="exit") { Send(*clientSocket,"user_exit"); DeleteClient(clientSocket); break; } else SendMsgToAllUsers(message); sleep(1); } pthread_exit(NULL); return NULL; } void ServerSocket::AddClient(Socket* socket) { if(readWriteLock.SetWriteLock()) { clientSockets.push_back(socket); std::cout<<"Now "<<clientSockets.size()<<" users.."; std::cout<<"New User: "<<socket->GetAddress()<<" "<<socket->GetPort()<<"\n"; readWriteLock.UnLock(); } else serviceFlag=false; } void ServerSocket::DeleteClient(Socket* socket) { if(readWriteLock.SetWriteLock()) { list<Socket*>::iterator iter; for(iter=clientSockets.begin();iter!=clientSockets.end();iter++) if((*iter)->GetAddress()==socket->GetAddress() && (*iter)->GetPort()==socket->GetPort()) {                 //delete socket* in list
                delete (*iter);
clientSockets.erase(iter); std::cout
<<"Now "<<clientSockets.size()<<" users..\n"; break; } readWriteLock.UnLock(); } else serviceFlag=false; }

 

接下來是讀寫鎖操作的封裝  ThreadReadWriteLock.h

這個類封裝了對線程讀寫鎖pthread_rwlock_t的操作,這些操作包括pthread_rwlock_init,pthread_rwlock_rlock,pthread_rwlock_wrlock,pthread_rwlock_unlock

#ifndef THREADREADWRITELOCK_H
#define THREADREADWRITELOCK_H

#include <pthread.h>

class ThreadReadWriteLock
{
    public:
        ThreadReadWriteLock();
        ~ThreadReadWriteLock();

        bool SetReadLock();
        bool SetWriteLock();
        void UnLock();

    private:
        pthread_rwlock_t readWriteLock;
};

#endif

然后客戶端做些許改變即可(開多一個線程接收服務器發來的信息,這樣發送和接收就可以並行了)

以下是測試結果:

服務器:

客戶端1:

客戶端2:

客戶端3:

可以看到服務器端顯示的客戶端數量的變化。。。。

 

測試一下當連接的客戶端數量超過我們設置的最大連接數時的情況(在Socket.h中定義這個連接數等於5)

最后說一下要注意的地方

由於pthread 庫不是 Linux 系統默認的庫,連接時需要使用靜態庫 libpthread.a,否則在使用一些與線程有關的函數時會報錯

如使用pthread_create會提示以下錯誤undefined reference to `pthread_create’,解決方法如下:

(1)使用gcc或g++要在編譯中要加 -lpthread參數

(2)如果使用eclipse的話,要設置
Project->Properties->C/C++ Build->Settings->GCC C++ Linker->Libraries
在Libraries(-l)中添加pthread即可
在Libraries(-l)中添加crypto即可

 

最后說一下程序的不足:

(1)首先我把服務器處理消息的操作暫時都放在了ProcessMessage函數里面了,如果處理的消息很復雜的話,那么ServerSocket這個類就會很臃腫,

所以必要時要將這些功能拆分。

(2)像檢測客戶端是否非正常的掉線,這些操作還沒做,可以通過設置Socket的keep alive來檢測,就是通過一個心跳包,在服務器和客戶端沒有通信時,隔一段時間發送一個

數據包,若客戶端沒有反應則認為客戶端已經掉線了。

(3)畢竟這只是個小程序,當然還有其他不足,如現在只有群聊功能,還可以加上私聊的功能。。。。。。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM