要用到多線程以及線程的讀寫鎖,之前寫的Socket類、ServerSocket要做相應的改變
因為服務器端要維持着一個存儲客戶端Socket信息到數據結構,當多個線程同時訪問這個結構時,要做同步處理,所以要在適當的時候加上讀鎖或寫鎖。
新的ServerSocket類
#ifndef SERVERSOCKET_H #define SERVERSOCKET_H #include "Socket.h" #include <list> #include <semaphore.h> #include "ThreadReadWriteLock.h" using std::list; class ServerSocket:public Socket { public: ServerSocket(const int port); ServerSocket(); virtual ~ServerSocket(); void Accept(Socket& socket); //run server to connect multi-clients void Run(); private: //accept multi-clients bool Accept(); void AddClient(Socket* clientSocket); static void DeleteClient(Socket* clientSocket); static void* ProcessMessage(void* arg); static void SendMsgToAllUsers(const std::string& message); static list<Socket*> clientSockets; static bool serviceFlag; //use thread-read-write-lock to synchronize threads static ThreadReadWriteLock readWriteLock; }; #endif
其中有static成員函數,因為創建一個新的線程時,要傳遞一個函數指針,不過類普通的成員函數的函數指針與一般的函數指針是不兼容的,所以要傳遞static成員函數的函數指針。
以下是ServerSocket的新實現:
ServerSocket.cpp
#include "ServerSocket.h" #include "SocketException.h" #include <pthread.h> #include <iostream> list<Socket*> ServerSocket::clientSockets; ThreadReadWriteLock ServerSocket::readWriteLock; bool ServerSocket::serviceFlag=true; ServerSocket::ServerSocket(const int port) { if ( ! Socket::Create() ) { throw SocketException ( "Could not create server socket." ); } if ( ! Socket::Bind ( port ) ) { throw SocketException ( "Could not bind to port." ); } if ( ! Socket::Listen() ) { throw SocketException ( "Could not listen to socket." ); } } ServerSocket::~ServerSocket() { list<Socket*>::iterator iter; for(iter=clientSockets.begin();iter!=clientSockets.end();iter++) delete (*iter); } void ServerSocket::Accept(Socket& socket) { if ( ! Socket::Accept ( socket ) ) { throw SocketException ( "Could not accept socket." ); } } bool ServerSocket::Accept() { Socket* clientSocket=new Socket; Accept(*clientSocket); AddClient(clientSocket); //create new thread for a new client pthread_t newThread; int result=pthread_create(&newThread,NULL,ProcessMessage,static_cast<void*>(clientSocket)); if(result!=0) return false;
//detach the newThread
//so when newThread exits it can release it's resource
result=pthread_detach(newThread);
if(result!=0)
perror("Failed to detach thread");
return true; } void ServerSocket::Run() { while(serviceFlag) { if(clientSockets.size()>=static_cast<unsigned int>(MAXCONNECTION)) serviceFlag=false; else serviceFlag=Accept(); sleep(1); } } void* ServerSocket::ProcessMessage(void* arg) { std::string message; Socket* clientSocket=static_cast<Socket*>(arg); Send(*clientSocket,"Welcome!"); while(serviceFlag) { Receive(*clientSocket,message); if(message=="exit") { Send(*clientSocket,"user_exit"); DeleteClient(clientSocket); break; } else SendMsgToAllUsers(message); sleep(1); } pthread_exit(NULL); return NULL; } void ServerSocket::AddClient(Socket* socket) { if(readWriteLock.SetWriteLock()) { clientSockets.push_back(socket); std::cout<<"Now "<<clientSockets.size()<<" users.."; std::cout<<"New User: "<<socket->GetAddress()<<" "<<socket->GetPort()<<"\n"; readWriteLock.UnLock(); } else serviceFlag=false; } void ServerSocket::DeleteClient(Socket* socket) { if(readWriteLock.SetWriteLock()) { list<Socket*>::iterator iter; for(iter=clientSockets.begin();iter!=clientSockets.end();iter++) if((*iter)->GetAddress()==socket->GetAddress() && (*iter)->GetPort()==socket->GetPort()) { //delete socket* in list
delete (*iter);
clientSockets.erase(iter); std::cout<<"Now "<<clientSockets.size()<<" users..\n"; break; } readWriteLock.UnLock(); } else serviceFlag=false; }
接下來是讀寫鎖操作的封裝 ThreadReadWriteLock.h
這個類封裝了對線程讀寫鎖pthread_rwlock_t的操作,這些操作包括pthread_rwlock_init,pthread_rwlock_rlock,pthread_rwlock_wrlock,pthread_rwlock_unlock等
#ifndef THREADREADWRITELOCK_H #define THREADREADWRITELOCK_H #include <pthread.h> class ThreadReadWriteLock { public: ThreadReadWriteLock(); ~ThreadReadWriteLock(); bool SetReadLock(); bool SetWriteLock(); void UnLock(); private: pthread_rwlock_t readWriteLock; }; #endif
然后客戶端做些許改變即可(開多一個線程接收服務器發來的信息,這樣發送和接收就可以並行了)
以下是測試結果:
服務器:

客戶端1:

客戶端2:

客戶端3:

可以看到服務器端顯示的客戶端數量的變化。。。。
測試一下當連接的客戶端數量超過我們設置的最大連接數時的情況(在Socket.h中定義這個連接數等於5)

最后說一下要注意的地方
由於pthread 庫不是 Linux 系統默認的庫,連接時需要使用靜態庫 libpthread.a,否則在使用一些與線程有關的函數時會報錯
如使用pthread_create會提示以下錯誤undefined reference to `pthread_create’,解決方法如下:
(1)使用gcc或g++要在編譯中要加 -lpthread參數
(2)如果使用eclipse的話,要設置
Project->Properties->C/C++ Build->Settings->GCC C++ Linker->Libraries
在Libraries(-l)中添加pthread即可
在Libraries(-l)中添加crypto即可
最后說一下程序的不足:
(1)首先我把服務器處理消息的操作暫時都放在了ProcessMessage函數里面了,如果處理的消息很復雜的話,那么ServerSocket這個類就會很臃腫,
所以必要時要將這些功能拆分。
(2)像檢測客戶端是否非正常的掉線,這些操作還沒做,可以通過設置Socket的keep alive來檢測,就是通過一個心跳包,在服務器和客戶端沒有通信時,隔一段時間發送一個
數據包,若客戶端沒有反應則認為客戶端已經掉線了。
(3)畢竟這只是個小程序,當然還有其他不足,如現在只有群聊功能,還可以加上私聊的功能。。。。。。
