thrift實現多服務多線程的匹配系統
thrift學習教程:thrift官網
本博客代碼:thrift_match_server
官網教程:進入官網->Tutorial->tutorial.thrift
Apache Thrift軟件框架用於可伸縮的跨語言服務開發,它將軟件棧和代碼生成引擎結合在一起,以構建在C++、Java、Python、PHP、Ruby、Erlang、Perl、Haskell、C#、Cocoa、JavaScript、Node.js、Smalltalk、OCaml和Delphi等語言之間高效、無縫地工作的服務。
Thrift使用C++進行編寫,在安裝使用的時候需要安裝依賴,windows安裝方式見官網即可。安裝方式:thrift官網介紹安裝方式
項目目的
模擬一個多服務的游戲匹配系統,具體業務邏輯如下圖:
准備工作
創建項目結構
-
創建項目文件夾
thrift_demo
-
游戲系統節點
,創建game
文件夾;匹配系統節點
,創建match_system
文件夾;thrift
相關文件,創建thrift
文件夾
結果如圖:
thrift簡單語法介紹
0.開發流程
- 對接口進行描述,定義接口描述文件:
.thrift
文件,比如match.thrift
- 使用
thrift
將接口的描述文件自動成對應語言的版本的代碼,包括服務端和客戶端
1.命名空間
thrift
文件命名一般都是以.thrift
作為后綴:XXX.thrift
,可以在該文件的開頭為該文件加上命名空間限制,格式為:
namespace 語言名稱 名稱
例如對c++來說,有:
namespace cpp match_service
2.數據類型
大小寫敏感,它共支持以下幾種基本的數據類型:
string
, 字符串類型,注意是全部小寫形式;i16
, 16位整形類型,i32
,32位整形類型,對應C/C++/java中的int類型;i64
,64位整形,對應C/C++/java中的long類型;byte
,8位的字符類型,對應C/C++中的char,java中的byte類型bool
, 布爾類型,對應C/C++中的bool,java中的boolean類型;double
,雙精度浮點類型,對應C/C++/java中的double類型;void
,空類型,對應C/C++/java中的void類型;該類型主要用作函數的返回值,除上述基本類型外,ID還支持以下類型:
map
,map類型,例如,定義一個map對象:map[HTML_REMOVED] newmap;set
,集合類型,例如,定義set[HTML_REMOVED]對象:set[HTML_REMOVED] aSet;list
,鏈表類型,例如,定義一個list[HTML_REMOVED]對象:list[HTML_REMOVED] aList;
struct,自定義結構體類型,在IDL中可以自己定義結構體,對應C中的struct,c++中的struct和class,java中的class。例如:
struct User{
1: i32 id,
2: string name,
3: i32 score
}
注意,在struct定義結構體時需要對每個結構體成員用序號標識:“序號: ”。
3.函數接口
文件中對所有接口函數的描述都放在service中,service的名字可以自己指定,該名字也將被用作生成的特定語言接口文件的名字。
接口函數需要對參數使用序號標號,除最后一個接口函數外,要以,
結束對函數的描述。
比如:
namespace cpp match_service
struct User {
1: i32 id,
2: string name,
3: i32 score
}
service Match {
/**
* user: 添加的用戶信息
* info: 附加信息
* 在匹配池中添加一個名用戶
*/
i32 add_user(1: User user, 2: string info),
/**
* user: 刪除的用戶信息
* info: 附加信息
* 從匹配池中刪除一名用戶
*/
i32 remove_user(1: User user, 2: string info),
}
具體運行流程,以下面匹配系統例子介紹。
服務端框架搭建
創建接口描述文件
對於匹配系統的thrift相關配置,我們在thrift
文件夾下,創建match.thrift
文件,用來生成匹配系統服務端
的一系列文件。
vi thrift/match.thrift
參考:打開thrift官網,在上方選擇
Tutorial
項,查看thrift官方教程,點擊下方的tutorial.thrift
進入一個示例文件。
編寫match.thrift
配置文件,只需要在文件中寫明接口
和對象
即可:
namespace cpp match_service
struct User {
1: i32 id,
2: string name,
3: i32 score
}
service Match {
/**
* user: 添加的用戶信息
* info: 附加信息
* 在匹配池中添加一個名用戶
*/
i32 add_user(1: User user, 2: string info),
/**
* user: 刪除的用戶信息
* info: 附加信息
* 從匹配池中刪除一名用戶
*/
i32 remove_user(1: User user, 2: string info),
}
編譯成對應語言的版本
進入到match_system
文件夾,創建src
文件夾。在src
下執行語句:
# thrift -r --gen <語言名> <.thrift文件的路徑>
thrift -r --gen cpp ../../thrift/match.thrift
這樣就會生成各種配置和連接文件,還有代碼框架
,只需要在框架中實現自己的業務即可。默認是放在gen-cpp
,可以修改為match_server
,以便更好的划分業務模塊。
同時其中Match_server.skeleton.cpp
為服務端的代碼框架,具體業務就是在這個文件編寫實現,將Match_server.skeleton.cpp
移動到match_system/src
下並重命名為main.cpp
,match_system
的整個業務邏輯就是在這個文件中實現。
最后的文件結構如下:
代碼生成以后最好先跑一下,然后再逐步添加功能,看一下是否成功,上述操作導致需要修改兩個地方:
- 之前
main.cpp
在match_server
下,現在在match_system/src
下,所以main.cpp
中對Match.h
頭文件的引入需要修改路徑。 - 文件中的兩個函數
int32_t add_user
和int32_t remove_user
需要有返回值,原來沒有,會報警告,需要手動加上。
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
/**
* user: 添加的用戶信息
* info: 附加信息
* 在匹配池中添加一名用戶
*
* @param user
* @param info
*/
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
return 0;
}
/**
* user: 刪除的用戶信息
* info: 附加信息
* 從匹配池中刪除一名用戶
*
* @param user
* @param info
*/
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
return 0;
}
};
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
server.serve();
return 0;
}
c++文件的編譯、鏈接和運行
C++的編譯過程
(注意大小寫)假設我有一個main.cpp文件
-E
:只對文件進行預處理,不進行編譯和匯編。g++ -E main.cpp
——>在dos命令行查看某文件的預處理過程,如果你想查看詳細的預處理,可以重定向到一個文件中,如:g++ -E main.cpp -o main.i
-s
:編譯到匯編語言,不進行匯編和鏈接,即只激活預處理和編譯,生成匯編語言,如果你想查看詳細的編譯,可以重定向到一個文件中,如:g++ -S main.cpp -o main.s
-c
:編譯到目標代碼,g++ -c main.s -o 文件名.o
-o
:生成鏈接文件: 如果該文件是獨立的,與其他自己編寫的文件無依賴關系。直接g++ main.o -o 生成的可執行文件的文件名
,假設該文件依賴其他源文件(不需要加入頭文件)
temp.cpp
,在對temp.cpp
文件進行預處理->編譯->匯編后,使用指令g++ temp.o main.o -o main
.\
:執行文件,輸出結果。如:.\main
,當然你可以直接g++ main.cpp temp.cpp -o main
生成目標文件讓編譯器自動為你處理其他流程。
步驟
-
編譯
src
文件夾下的所有.cpp
文件g++ -c main.cpp match_server/*.cpp
-
將所有生成的
.o
文件鏈接成一個可執行文件,要用到thrift
動態鏈接庫g++ *.o -o main -lthrift
-
執行生成的可執行文件
main
./main
為了判斷文件是否正確執行,可以在
main.cpp
中寫一些輸出語句,驗證效果 -
將項目版本提交git,提交時,一般會刪除中間生成的文件和可執行文件
git add . git restore --stage *.o git restore --stage match_system/src/main git commit -m "first can run"
客戶端框架搭建
跟上面一樣的步驟
-
在
game
下創建src
,在src
下執行:thrift -r --gen py ../../thrift/match.thrift
這樣,thrift服務端的一系列文件就會生成在
src
文件夾中的gen-py
文件夾下,為了划分業務模塊將gen-py
重命名為match_client
文件結構如下:
. |-- Match.o |-- main |-- main.cpp |-- main.o |-- match_client | |-- __init__.py | `-- match | |-- Match-remote | |-- Match.py | |-- __init__.py | |-- constants.py | `-- ttypes.py |-- match_server | |-- Match.cpp | |-- Match.h | |-- match_types.cpp | `-- match_types.h `-- match_types.o
因為我們只需要實現客戶端,不需要服務端,所以可以把
Match-remote
刪除 -
在
src
下創建文件client.py
,將 Apache Thrift - Python 頁面中,client
中的代碼復制到該文件中,並將代碼進行適當的改動和刪除,client.py
中的初始代碼如下:from match_client.match import Match from match_client.match.ttypes import User from thrift import Thrift from thrift.transport import TSocket from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol def main(): # Make socket transport = TSocket.TSocket('localhost', 9090) # Buffering is critical. Raw sockets are very slow transport = TTransport.TBufferedTransport(transport) # Wrap in a protocol protocol = TBinaryProtocol.TBinaryProtocol(transport) # Create a client to use the protocol encoder client = Match.Client(protocol) # Connect! transport.open() # 具體業務代碼 user = User(1, 'sdz', 1500) client.add_user(user, "") # Close! transport.close() if __name__ == "__main__": main()
-
運行一下
-
先在
thrift_demo/match_system/src
下,執行:./main
,使服務端運行 -
再在
thrift_demo/game/src
下,執行:python3 client.py
,使客戶端運行 -
觀察服務端運行處有無相應輸出,若有,說明成功運行
-
-
git
保存一下git add . git restore --stage *.pyc # pyc文件為中間結果文件,類似c++的.o文件 git commit -m "add match client"
客戶端完善
from match_client.match import Match
from match_client.match.ttypes import User
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
# python讀取命令行參數包
from sys import stdin
def operate(op, user_id, username, score):
# Make socket
transport = TSocket.TSocket('localhost', 9090)
# Buffering is critical. Raw sockets are very slow
transport = TTransport.TBufferedTransport(transport)
# Wrap in a protocol
protocol = TBinaryProtocol.TBinaryProtocol(transport)
# Create a client to use the protocol encoder
client = Match.Client(protocol)
# Connect!
transport.open()
# 具體業務代碼
user = User(user_id, username, score)
if op == "add":
client.add_user(user, "")
elif op == "remove":
client.remove_user(user, "")
# Close!
transport.close()
def main():
for line in stdin:
op, user_id, username, score = line.split(' ')
operate(op, int(user_id), username, int(score))
if __name__ == "__main__":
main()
進行運行查錯
步驟並做正確輸入,如果服務端處有相應輸出,說明函數調用成功,運行成功
git保存一下:
git add client.py
git commit -m "finsh client.py"
服務端完善
服務端主要有兩個功能:
- 接收客戶端(
game
)的添加和刪除用戶請求 - 完成匹配工作
這兩個功能需要並行執行,為了防止阻塞接收client
請求,需要開一個線程去不停地進行匹配。實現邏輯圖如下:
每一次只選前兩個匹配版main.cpp
:
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace std;
struct Task
{
User user;
string type;
};
struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
class Pool
{
public:
void save_result(int a, int b){
printf("Match result : %d, %d\n", a, b);
}
void match(){
while (users.size() > 1){
User a = users[0], b = users[1];
users.erase(users.begin());
users.erase(users.begin());
save_result(a.id, b.id);
}
}
void add(User user){
users.push_back(user);
}
void remove(User user){
for (uint32_t i = 0; i < users.size(); i++) {
if (users[i].id == user.id) {
users.erase(users.begin() + i);
break;
}
}
}
private:
vector<User> users;
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
/**
* user: 添加的用戶信息
* info: 附加信息
* 在匹配池中添加一名用戶
*
* @param user
* @param info
*/
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
//通過消息隊列中的鎖將方法鎖着。
//好處:你不需要進行解鎖操作,當方法執行完畢,這個變量就會自動注銷
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "add"});
//喚醒所有條件變量
message_queue.cv.notify_all();
return 0;
}
/**
* user: 刪除的用戶信息
* info: 附加信息
* 從匹配池中刪除一名用戶
*
* @param user
* @param info
*/
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "remove"});
message_queue.cv.notify_all();
return 0;
}
};
void consume_task(){
while (true){
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()){
/* 因為消費者線程(不止一個)會頻繁判斷隊列是否為空,導致CPU做無用功。
* 所以使用條件變量的wait()函數可使得當前線程阻塞,直至條件變量喚醒。
* 當線程阻塞的時候,該函數會自動解鎖,允許其他線程執行。
**/
message_queue.cv.wait(lck);
} else {
Task task = message_queue.q.front();
message_queue.q.pop();
lck.unlock();
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
pool.match();
}
}
}
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
printf("Start Match Server\n");
thread matching_thread(consume_task);
server.serve();
return 0;
}
只修改了main.cpp
,只編譯鏈接main.cpp
先編譯main.cpp
,在鏈接時,要用到thrift動態鏈接庫
和線程相關的動態鏈接庫
,所以鏈接時應該執行:
g++ -c main.cpp
g++ *.o -o main -lthrift -pthread
測試一下
git
保存一下
數據存儲客戶端的實現
-
在
thrift
文件夾下,編輯save.thrift
,用來生成數據存儲客戶端
的一系列文件namespace cpp save_service service Save { /** * username: myserver的名稱 * password: myserver的密碼的md5sum的前8位 * 用戶名密碼驗證成功會返回0,驗證失敗會返回1 * 驗證成功后,結果會被保存到myserver:homework/lesson_6/result.txt中 */ i32 save_data(1: string username, 2: string password, 3: i32 player1_id, 4: i32 player2_id) }
-
在
match_system/src
下執行:thrift -r --gen cpp ../../thrift/save.thrift
這樣,
thrift
服務端的一系列文件就會生成在src
文件夾中的gen-cpp
文件夾下,為了划分業務模塊將gen-cpp
重命名為save_client
注意:
由於c++整個項目只能有一個
main
函數,而整個服務端的邏輯都在thrift_project/match_system/src
下的main.cpp
實現。所以一定要刪除thrift_project/match_system/src/save_client
下的Save_server.skeleton.cpp
。而python沒有這個問題,所以在用python實現客戶端時,主框架文件可刪可不刪。 -
改動
main.cpp
將數據存儲端的業務寫進去官方參考地址:Tutorial- > C++的client,的主要改動點:
-
引入缺少頭文件,即
save_client/Save.h
,thrift/transport/TTransportUtils.h>
和<thrift/transport/TSocket.h>
-
補全命名空間,即添加
using namespace ::save_service;
-
在
class Pool
中的save_resut
函數中,添加官網 C++樣例的client
中的main
函數中的所有代碼 -
由於數據存儲是實現在
myserver
上,所以在連接時要更改ip地址
。myserver
的ip地址可以執行homework 4 getinfo
查看。即自己的數據存儲服務器 -
將
CalculatorClient
改為SaveClient
-
將
transport->open()
和transport->close();
之間的教程代碼刪除,在此之間實現自己的業務// This autogenerated skeleton file illustrates how to build a server. // You should copy it to another filename to avoid overwriting it. #include "match_server/Match.h" #include "save_client/Save.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TSocket.h> #include <thrift/transport/TTransportUtils.h> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <vector> #include <iostream> using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server; using namespace ::match_service; using namespace ::save_service; using namespace std; struct Task { User user; string type; }; struct MessageQueue { queue<Task> q; mutex m; condition_variable cv; }message_queue; class Pool { public: void save_result(int a, int b){ std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090)); std::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); SaveClient client(protocol); puts("w"); try { transport->open(); int res = client.save_data("acs_2104", "fcf05d68", a, b); if (!res) puts("數據存儲成功!"); else puts("數據存取失敗!"); transport->close(); } catch (TException& tx) { cout << "ERROR: " << tx.what() << endl; } printf("Match result : %d, %d\n", a, b); } void match(){ while (users.size() > 1){ User a = users[0], b = users[1]; users.erase(users.begin()); users.erase(users.begin()); save_result(a.id, b.id); } } void add(User user){ users.push_back(user); } void remove(User user){ for (uint32_t i = 0; i < users.size(); i++) { if (users[i].id == user.id) { users.erase(users.begin() + i); break; } } } private: vector<User> users; }pool; class MatchHandler : virtual public MatchIf { public: MatchHandler() { // Your initialization goes here } /** * user: 添加的用戶信息 * info: 附加信息 * 在匹配池中添加一名用戶 * * @param user * @param info */ int32_t add_user(const User& user, const std::string& info) { // Your implementation goes here printf("add_user\n"); //通過消息隊列中的鎖將方法鎖着。 //好處:你不需要進行解鎖操作,當方法執行完畢,這個變量就會自動注銷 unique_lock<mutex> lck(message_queue.m); message_queue.q.push({user, "add"}); //喚醒所有條件變量 message_queue.cv.notify_all(); return 0; } /** * user: 刪除的用戶信息 * info: 附加信息 * 從匹配池中刪除一名用戶 * * @param user * @param info */ int32_t remove_user(const User& user, const std::string& info) { // Your implementation goes here printf("remove_user\n"); unique_lock<mutex> lck(message_queue.m); message_queue.q.push({user, "remove"}); message_queue.cv.notify_all(); return 0; } }; void consume_task(){ while (true){ unique_lock<mutex> lck(message_queue.m); if (message_queue.q.empty()){ /* 因為消費者線程(不止一個)會頻繁判斷隊列是否為空,導致CPU做無用功。 * 所以使用條件變量的wait()函數可使得當前線程阻塞,直至條件變量喚醒。 * 當線程阻塞的時候,該函數會自動解鎖,允許其他線程執行。 **/ message_queue.cv.wait(lck); } else { Task task = message_queue.q.front(); message_queue.q.pop(); lck.unlock(); if (task.type == "add") pool.add(task.user); else if (task.type == "remove") pool.remove(task.user); pool.match(); } } } int main(int argc, char **argv) { int port = 9090; ::std::shared_ptr<MatchHandler> handler(new MatchHandler()); ::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler)); ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port)); ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory); printf("Start Match Server\n"); thread matching_thread(consume_task); server.serve(); return 0; }
-
-
編譯運行
g++ -c save_client/*.cpp g++ -c main.cpp g++ *.o -o main -lthrift -pthread
-
驗證結果,登錄到myserver服務器上查看存儲的結果:
ssh myserver cd homework/lesson_6 cat result.txt
注意保存提交到git
。
匹配系統升級1:按照分差匹配用戶
實現思路:每一秒鍾匹配一次,只要發現一對分差<=50的匹配成功。
注意,match
函數的實現的功能只要匹配一對即可,修改下一消耗消息隊列函數和匹配函數即可。
main.cpp
改動如下:
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <iostream>
#include <unistd.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace ::save_service;
using namespace std;
struct Task
{
User user;
string type;
};
struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
class Pool
{
public:
void save_result(int a, int b){
std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
SaveClient client(protocol);
try {
transport->open();
int res = client.save_data("acs_2104", "fcf05d68", a, b);
if (!res) puts("數據存儲成功!");
else puts("數據存取失敗!");
transport->close();
} catch (TException& tx) {
cout << "ERROR: " << tx.what() << endl;
}
printf("Match result : %d, %d\n", a, b);
}
void match(){
while (users.size() > 1){
sort(users.begin(), users.end(), [&](User& a, User b){
return a.score < b.score;
});
bool flag = true;
for(uint32_t i = 1; i < users.size(); i++){
User a = users[i], b = users[i - 1];
if (a.score - b.score <= 50) {
save_result(a.id, b.id);
users.erase(users.begin() + i - 1, users.begin() + i + 1);
flag = false;
break;
}
}
if (flag) break;
}
}
void add(User user){
users.push_back(user);
}
void remove(User user){
for (uint32_t i = 0; i < users.size(); i++) {
if (users[i].id == user.id) {
users.erase(users.begin() + i);
break;
}
}
}
private:
vector<User> users;
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
/**
* user: 添加的用戶信息
* info: 附加信息
* 在匹配池中添加一名用戶
*
* @param user
* @param info
*/
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
//通過消息隊列中的鎖將方法鎖着。
//好處:你不需要進行解鎖操作,當方法執行完畢,這個變量就會自動注銷
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "add"});
//喚醒所有條件變量
message_queue.cv.notify_all();
return 0;
}
/**
* user: 刪除的用戶信息
* info: 附加信息
* 從匹配池中刪除一名用戶
*
* @param user
* @param info
*/
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "remove"});
message_queue.cv.notify_all();
return 0;
}
};
void consume_task(){
while (true){
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()){
/* 因為消費者線程(不止一個)會頻繁判斷隊列是否為空,導致CPU做無用功。
* 所以使用條件變量的wait()函數可使得當前線程阻塞,直至條件變量喚醒。
* 當線程阻塞的時候,該函數會自動解鎖,允許其他線程執行。
**/
// message_queue.cv.wait(lck);
lck.unlock();
pool.match();
sleep(1);
} else {
Task task = message_queue.q.front();
message_queue.q.pop();
lck.unlock();
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
pool.match();
}
}
}
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
printf("Start Match Server\n");
thread matching_thread(consume_task);
server.serve();
return 0;
}
測試一下然后git保存一下:
匹配系統升級2:多線程服務器
之前的版本都是用一個線程來add user
和remove user
,想要提高效率和並發量,可以將服務端升級為多線程版本。
-
引入官網 C++樣例的
Server
中,main.cpp
沒有的頭文件。 -
將
main
函數中的TSimpleServer
即相關函數,替換成官網 C++樣例的Server
中的main
函數中的TThreadedServer
相關內容 -
將官網 C++樣例的
Server
中的工廠類class CalculatorCloneFactory
相關內容加進來 -
將文件中的所有
Calculator
替換為Match
,在vim中的具體操作為::1,$s/Calculator/Match/g
-
::shared::SharedServiceIf*
改為MatchIf*
修改后的main.cpp
為:
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/TToString.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <iostream>
#include <unistd.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace ::save_service;
using namespace std;
struct Task
{
User user;
string type;
};
struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
class Pool
{
public:
void save_result(int a, int b){
std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
SaveClient client(protocol);
try {
transport->open();
int res = client.save_data("acs_2104", "fcf05d68", a, b);
if (!res) puts("數據存儲成功!");
else puts("數據存取失敗!");
transport->close();
} catch (TException& tx) {
cout << "ERROR: " << tx.what() << endl;
}
printf("Match result : %d, %d\n", a, b);
}
void match(){
while (users.size() > 1){
sort(users.begin(), users.end(), [&](User& a, User b){
return a.score < b.score;
});
bool flag = true;
for(uint32_t i = 1; i < users.size(); i++){
User a = users[i], b = users[i - 1];
if (a.score - b.score <= 50) {
save_result(a.id, b.id);
users.erase(users.begin() + i - 1, users.begin() + i + 1);
flag = false;
break;
}
}
if (flag) break;
}
}
void add(User user){
users.push_back(user);
}
void remove(User user){
for (uint32_t i = 0; i < users.size(); i++) {
if (users[i].id == user.id) {
users.erase(users.begin() + i);
break;
}
}
}
private:
vector<User> users;
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
/**
* user: 添加的用戶信息
* info: 附加信息
* 在匹配池中添加一名用戶
*
* @param user
* @param info
*/
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
//通過消息隊列中的鎖將方法鎖着。
//好處:你不需要進行解鎖操作,當方法執行完畢,這個變量就會自動注銷
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "add"});
//喚醒所有條件變量
message_queue.cv.notify_all();
return 0;
}
/**
* user: 刪除的用戶信息
* info: 附加信息
* 從匹配池中刪除一名用戶
*
* @param user
* @param info
*/
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "remove"});
message_queue.cv.notify_all();
return 0;
}
};
void consume_task(){
while (true){
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()){
/* 因為消費者線程(不止一個)會頻繁判斷隊列是否為空,導致CPU做無用功。
* 所以使用條件變量的wait()函數可使得當前線程阻塞,直至條件變量喚醒。
* 當線程阻塞的時候,該函數會自動解鎖,允許其他線程執行。
**/
// message_queue.cv.wait(lck);
lck.unlock();
pool.match();
sleep(1);
} else {
Task task = message_queue.q.front();
message_queue.q.pop();
lck.unlock();
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
pool.match();
}
}
}
class MatchCloneFactory : virtual public MatchIfFactory {
public:
~MatchCloneFactory() override = default;
MatchIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
{
std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport);
cout << "Incoming connection\n";
cout << "\tSocketInfo: " << sock->getSocketInfo() << "\n";
cout << "\tPeerHost: " << sock->getPeerHost() << "\n";
cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
cout << "\tPeerPort: " << sock->getPeerPort() << "\n";
return new MatchHandler;
}
void releaseHandler(MatchIf* handler) override {
delete handler;
}
};
int main(int argc, char **argv) {
TThreadedServer server(
std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()),
std::make_shared<TServerSocket>(9090), //port
std::make_shared<TBufferedTransportFactory>(),
std::make_shared<TBinaryProtocolFactory>());
printf("Start Match Server\n");
thread matching_thread(consume_task);
server.serve();
return 0;
}
編譯運行測試並保存git
:
匹配系統升級3:隨時間擴大匹配閾值
匹配機制:等待時間越長,閾值越大。即匹配的范圍隨時間的推移而變大 故需要記錄當前玩家在匹配池中等待的秒數。
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/TToString.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <iostream>
#include <unistd.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace ::save_service;
using namespace std;
struct Task
{
User user;
string type;
};
struct MessageQueue
{
queue<Task> q;
mutex m;
condition_variable cv;
}message_queue;
class Pool
{
public:
void save_result(int a, int b){
std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
SaveClient client(protocol);
try {
transport->open();
int res = client.save_data("acs_2104", "fcf05d68", a, b);
if (!res) puts("數據存儲成功!");
else puts("數據存取失敗!");
transport->close();
} catch (TException& tx) {
cout << "ERROR: " << tx.what() << endl;
}
printf("Match result : %d, %d\n", a, b);
}
bool check_match(int i, int j){
User a = users[i], b = users[j];
int dt = abs(a.score - b.score);
int a_max_dif = wt[i] * 50;
int b_max_dif = wt[j] * 50;
return dt <= a_max_dif && dt <= b_max_dif;
}
void match(){
for(uint32_t i = 0; i < wt.size(); i ++)
wt[i]++; // 等待秒數 + 1
while (users.size() > 1){
bool flag = true;
for (uint32_t i = 0; i < users.size(); i++){
for (uint32_t j = i + 1; j < users.size(); j++) {
User a = users[i], b = users[j];
if (check_match(i, j)){
// 注意刪除順序先j后i
users.erase(users.begin() + j);
users.erase(users.begin() + i);
wt.erase(wt.begin() + j);
wt.erase(wt.begin() + i);
save_result(a.id, b.id);
flag = false;
break;
}
}
if (!flag) break;
}
if (flag) break;
}
}
void add(User user){
users.push_back(user);
wt.push_back(0);
}
void remove(User user){
for (uint32_t i = 0; i < users.size(); i++) {
if (users[i].id == user.id) {
users.erase(users.begin() + i);
wt.erase(wt.begin() + i);
break;
}
}
}
private:
vector<User> users;
vector<int> wt; // 等待時間,單位:s
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
/**
* user: 添加的用戶信息
* info: 附加信息
* 在匹配池中添加一名用戶
*
* @param user
* @param info
*/
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user\n");
//通過消息隊列中的鎖將方法鎖着。
//好處:你不需要進行解鎖操作,當方法執行完畢,這個變量就會自動注銷
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "add"});
//喚醒所有條件變量
message_queue.cv.notify_all();
return 0;
}
/**
* user: 刪除的用戶信息
* info: 附加信息
* 從匹配池中刪除一名用戶
*
* @param user
* @param info
*/
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user\n");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "remove"});
message_queue.cv.notify_all();
return 0;
}
};
void consume_task(){
while (true){
unique_lock<mutex> lck(message_queue.m);
if (message_queue.q.empty()){
/* 因為消費者線程(不止一個)會頻繁判斷隊列是否為空,導致CPU做無用功。
* 所以使用條件變量的wait()函數可使得當前線程阻塞,直至條件變量喚醒。
* 當線程阻塞的時候,該函數會自動解鎖,允許其他線程執行。
**/
// message_queue.cv.wait(lck);
lck.unlock();
pool.match();
sleep(1);
} else {
Task task = message_queue.q.front();
message_queue.q.pop();
lck.unlock();
if (task.type == "add") pool.add(task.user);
else if (task.type == "remove") pool.remove(task.user);
}
}
}
class MatchCloneFactory : virtual public MatchIfFactory {
public:
~MatchCloneFactory() override = default;
MatchIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
{
std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport);
cout << "Incoming connection\n";
cout << "\tSocketInfo: " << sock->getSocketInfo() << "\n";
cout << "\tPeerHost: " << sock->getPeerHost() << "\n";
cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
cout << "\tPeerPort: " << sock->getPeerPort() << "\n";
return new MatchHandler;
}
void releaseHandler(MatchIf* handler) override {
delete handler;
}
};
int main(int argc, char **argv) {
TThreadedServer server(
std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()),
std::make_shared<TServerSocket>(9090), //port
std::make_shared<TBufferedTransportFactory>(),
std::make_shared<TBinaryProtocolFactory>());
printf("Start Match Server\n");
thread matching_thread(consume_task);
server.serve();
return 0;
}
額外知識點
c++編譯很慢,所以對於未修改的文件不需要修改
make
:識別那些文件修改。