thrift實現多服務多線程的匹配系統


thrift實現多服務多線程的匹配系統

thrift學習教程thrift官網
本博客代碼:thrift_match_server
官網教程:進入官網->Tutorial->tutorial.thrift

Apache Thrift軟件框架用於可伸縮的跨語言服務開發,它將軟件棧代碼生成引擎結合在一起,以構建在C++、Java、Python、PHP、Ruby、Erlang、Perl、Haskell、C#、Cocoa、JavaScript、Node.js、Smalltalk、OCaml和Delphi等語言之間高效、無縫地工作的服務。

Thrift使用C++進行編寫,在安裝使用的時候需要安裝依賴,windows安裝方式見官網即可。安裝方式:thrift官網介紹安裝方式

項目目的

模擬一個多服務的游戲匹配系統,具體業務邏輯如下圖:

image-20211210125800627

准備工作

創建項目結構

  1. 創建項目文件夾thrift_demo

  2. 游戲系統節點,創建game文件夾;

    匹配系統節點,創建match_system文件夾;

    thrift相關文件,創建thrift文件夾

結果如圖:image-20211210132628478

thrift簡單語法介紹

0.開發流程

  1. 對接口進行描述,定義接口描述文件:.thrift文件,比如match.thrift
  2. 使用thrift將接口的描述文件自動成對應語言的版本的代碼,包括服務端和客戶端

1.命名空間

thrift文件命名一般都是以.thrift作為后綴:XXX.thrift,可以在該文件的開頭為該文件加上命名空間限制,格式為:

namespace 語言名稱 名稱

例如對c++來說,有:

namespace cpp match_service

2.數據類型

大小寫敏感,它共支持以下幾種基本的數據類型:

  1. string, 字符串類型,注意是全部小寫形式;
  2. i16, 16位整形類型,
  3. i32,32位整形類型,對應C/C++/java中的int類型;
  4. i64,64位整形,對應C/C++/java中的long類型;
  5. byte,8位的字符類型,對應C/C++中的char,java中的byte類型
  6. bool, 布爾類型,對應C/C++中的bool,java中的boolean類型;
  7. double,雙精度浮點類型,對應C/C++/java中的double類型;
  8. void,空類型,對應C/C++/java中的void類型;該類型主要用作函數的返回值,

除上述基本類型外,ID還支持以下類型:

  1. map,map類型,例如,定義一個map對象:map[HTML_REMOVED] newmap;
  2. set,集合類型,例如,定義set[HTML_REMOVED]對象:set[HTML_REMOVED] aSet;
  3. list,鏈表類型,例如,定義一個list[HTML_REMOVED]對象:list[HTML_REMOVED] aList;

struct,自定義結構體類型,在IDL中可以自己定義結構體,對應C中的struct,c++中的struct和class,java中的class。例如:

struct User{
      1: i32 id,
      2: string name,
      3: i32 score
}

注意,在struct定義結構體時需要對每個結構體成員用序號標識:“序號: ”。

3.函數接口

文件中對所有接口函數的描述都放在service中,service的名字可以自己指定,該名字也將被用作生成的特定語言接口文件的名字。

接口函數需要對參數使用序號標號,除最后一個接口函數外,要以,結束對函數的描述。

比如:

namespace cpp match_service

struct User {
    1: i32 id,
    2: string name,
    3: i32 score
}

service Match {

    /**
     * user: 添加的用戶信息
     * info: 附加信息
     * 在匹配池中添加一個名用戶
     */
    i32 add_user(1: User user, 2: string info),

    /**
     * user: 刪除的用戶信息
     * info: 附加信息
     * 從匹配池中刪除一名用戶
     */
    i32 remove_user(1: User user, 2: string info),
}

具體運行流程,以下面匹配系統例子介紹。

服務端框架搭建

創建接口描述文件

對於匹配系統的thrift相關配置,我們在thrift文件夾下,創建match.thrift文件,用來生成匹配系統服務端的一系列文件。

vi thrift/match.thrift

參考:打開thrift官網,在上方選擇Tutorial項,查看thrift官方教程,點擊下方的tutorial.thrift進入一個示例文件。

編寫match.thrift配置文件,只需要在文件中寫明接口對象即可:

namespace cpp match_service

struct User {
    1: i32 id,
    2: string name,
    3: i32 score
}

service Match {

    /**
     * user: 添加的用戶信息
     * info: 附加信息
     * 在匹配池中添加一個名用戶
     */
    i32 add_user(1: User user, 2: string info),

    /**
     * user: 刪除的用戶信息
     * info: 附加信息
     * 從匹配池中刪除一名用戶
     */
    i32 remove_user(1: User user, 2: string info),
}

編譯成對應語言的版本

​ 進入到match_system文件夾,創建src文件夾。在src下執行語句:

# thrift -r --gen <語言名> <.thrift文件的路徑>
thrift -r --gen cpp ../../thrift/match.thrift

​ 這樣就會生成各種配置和連接文件,還有代碼框架,只需要在框架中實現自己的業務即可。默認是放在gen-cpp,可以修改為match_server,以便更好的划分業務模塊。

​ 同時其中Match_server.skeleton.cpp為服務端的代碼框架,具體業務就是在這個文件編寫實現,將Match_server.skeleton.cpp移動到match_system/src下並重命名為main.cppmatch_system的整個業務邏輯就是在這個文件中實現。

​ 最后的文件結構如下:

image-20211210144631868

​ 代碼生成以后最好先跑一下,然后再逐步添加功能,看一下是否成功,上述操作導致需要修改兩個地方:

  1. 之前main.cppmatch_server下,現在在match_system/src下,所以main.cpp中對Match.h頭文件的引入需要修改路徑。
  2. 文件中的兩個函數int32_t add_userint32_t remove_user需要有返回值,原來沒有,會報警告,需要手動加上。
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace  ::match_service;

class MatchHandler : virtual public MatchIf {
    public:
        MatchHandler() {
            // Your initialization goes here
        }

        /**
         * user: 添加的用戶信息
         * info: 附加信息
         * 在匹配池中添加一名用戶
         * 
         * @param user
         * @param info
         */
        int32_t add_user(const User& user, const std::string& info) {
            // Your implementation goes here
            printf("add_user\n");
            return 0;
        }

        /**
         * user: 刪除的用戶信息
         * info: 附加信息
         * 從匹配池中刪除一名用戶
         * 
         * @param user
         * @param info
         */
        int32_t remove_user(const User& user, const std::string& info) {
            // Your implementation goes here
            printf("remove_user\n");
            return 0;
        }

};

int main(int argc, char **argv) {
    int port = 9090;
    ::std::shared_ptr<MatchHandler> handler(new MatchHandler());
    ::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
    ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
    ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
    ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

    TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
    server.serve();
    return 0;
}


c++文件的編譯、鏈接和運行

C++的編譯過程

(注意大小寫)假設我有一個main.cpp文件

  1. -E:只對文件進行預處理,不進行編譯和匯編。g++ -E main.cpp——>在dos命令行查看某文件的預處理過程,如果你想查看詳細的預處理,可以重定向到一個文件中,如:g++ -E main.cpp -o main.i

  2. -s:編譯到匯編語言,不進行匯編和鏈接,即只激活預處理和編譯,生成匯編語言,如果你想查看詳細的編譯,可以重定向到一個文件中,如:g++ -S main.cpp -o main.s

  3. -c:編譯到目標代碼,g++ -c main.s -o 文件名.o

  4. -o:生成鏈接文件: 如果該文件是獨立的,與其他自己編寫的文件無依賴關系。直接g++ main.o -o 生成的可執行文件的文件名

假設該文件依賴其他源文件(不需要加入頭文件)temp.cpp,在對temp.cpp文件進行預處理->編譯->匯編后,使用指令g++ temp.o main.o -o main

  1. .\:執行文件,輸出結果。如: .\main,當然你可以直接g++ main.cpp temp.cpp -o main 生成目標文件讓編譯器自動為你處理其他流程。

步驟

  1. 編譯src文件夾下的所有.cpp文件

    g++ -c main.cpp match_server/*.cpp
    
  2. 將所有生成的.o文件鏈接成一個可執行文件,要用到thrift動態鏈接庫

    g++ *.o -o main -lthrift
    
  3. 執行生成的可執行文件main

    ./main
    

    為了判斷文件是否正確執行,可以在main.cpp中寫一些輸出語句,驗證效果

  4. 將項目版本提交git,提交時,一般會刪除中間生成的文件和可執行文件

    git add .
    git restore --stage *.o
    git restore --stage match_system/src/main
    git commit -m "first can run"	
    

客戶端框架搭建

跟上面一樣的步驟

  1. game下創建src,在src下執行:

    thrift -r --gen py ../../thrift/match.thrift
    

    這樣,thrift服務端的一系列文件就會生成在src文件夾中的gen-py文件夾下,為了划分業務模塊將gen-py重命名為match_client

    文件結構如下:

    .
    |-- Match.o
    |-- main
    |-- main.cpp
    |-- main.o
    |-- match_client
    |   |-- __init__.py
    |   `-- match
    |       |-- Match-remote
    |       |-- Match.py
    |       |-- __init__.py
    |       |-- constants.py
    |       `-- ttypes.py
    |-- match_server
    |   |-- Match.cpp
    |   |-- Match.h
    |   |-- match_types.cpp
    |   `-- match_types.h
    `-- match_types.o
    

    因為我們只需要實現客戶端,不需要服務端,所以可以把Match-remote刪除

  2. src下創建文件client.py,將 Apache Thrift - Python 頁面中,client中的代碼復制到該文件中,並將代碼進行適當的改動和刪除,client.py中的初始代碼如下:

    from match_client.match import Match
    from match_client.match.ttypes import User
    
    from thrift import Thrift
    from thrift.transport import TSocket
    from thrift.transport import TTransport
    from thrift.protocol import TBinaryProtocol
    
    
    def main():
        # Make socket
        transport = TSocket.TSocket('localhost', 9090)
    
        # Buffering is critical. Raw sockets are very slow
        transport = TTransport.TBufferedTransport(transport)
    
        # Wrap in a protocol
        protocol = TBinaryProtocol.TBinaryProtocol(transport)
    
        # Create a client to use the protocol encoder
        client = Match.Client(protocol)
    
        # Connect!
        transport.open()
    
        # 具體業務代碼
        user = User(1, 'sdz', 1500)
        client.add_user(user, "")
    
        # Close!
        transport.close()
    
    if __name__ == "__main__":
        main()
    
    
  3. 運行一下

    • 先在thrift_demo/match_system/src下,執行:./main,使服務端運行

    • 再在thrift_demo/game/src下,執行:python3 client.py,使客戶端運行

    • 觀察服務端運行處有無相應輸出,若有,說明成功運行

    image-20211210193911626

  4. git保存一下

    git add .
    git restore --stage *.pyc # pyc文件為中間結果文件,類似c++的.o文件
    git commit -m "add match client"
    

客戶端完善

from match_client.match import Match
from match_client.match.ttypes import User

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# python讀取命令行參數包
from sys import stdin

def operate(op, user_id, username, score):
    # Make socket
    transport = TSocket.TSocket('localhost', 9090)

    # Buffering is critical. Raw sockets are very slow
    transport = TTransport.TBufferedTransport(transport)

    # Wrap in a protocol
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    # Create a client to use the protocol encoder
    client = Match.Client(protocol)

    # Connect!
    transport.open()

    # 具體業務代碼
    user = User(user_id, username, score)
    if op == "add":
        client.add_user(user, "")
    elif op == "remove":
        client.remove_user(user, "")

    # Close!
    transport.close()

def main():
    for line in stdin:
        op, user_id, username, score = line.split(' ')
        operate(op, int(user_id), username, int(score))

if __name__ == "__main__":
    main()

進行運行查錯步驟並做正確輸入,如果服務端處有相應輸出,說明函數調用成功,運行成功

image-20211210202640610

git保存一下:

git add client.py
git commit -m "finsh client.py"

服務端完善

服務端主要有兩個功能:

  1. 接收客戶端(game)的添加和刪除用戶請求
  2. 完成匹配工作

這兩個功能需要並行執行,為了防止阻塞接收client請求,需要開一個線程去不停地進行匹配。實現邏輯圖如下:

image-20211211003137397

每一次只選前兩個匹配版main.cpp:

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace  ::match_service;
using namespace std;

struct Task
{
    User user;
    string type;
};

struct MessageQueue
{
    queue<Task> q;
    mutex m;
    condition_variable cv;
}message_queue;

class Pool
{
    public:
        void save_result(int a, int b){
            printf("Match result : %d, %d\n", a, b);
        }

        void match(){
            while (users.size() > 1){
                User a = users[0], b = users[1];
                users.erase(users.begin());
                users.erase(users.begin());
                save_result(a.id, b.id);
            }
        }

        void add(User user){
            users.push_back(user);
        }

        void remove(User user){
            for (uint32_t i = 0; i < users.size(); i++) {
                if (users[i].id == user.id) {
                    users.erase(users.begin() + i);
                    break;
                }
            }
        }
    private:
        vector<User> users;
}pool;

class MatchHandler : virtual public MatchIf {
    public:
        MatchHandler() {
            // Your initialization goes here
        }

        /**
         * user: 添加的用戶信息
         * info: 附加信息
         * 在匹配池中添加一名用戶
         * 
         * @param user
         * @param info
         */
        int32_t add_user(const User& user, const std::string& info) {
            // Your implementation goes here
            printf("add_user\n");

            //通過消息隊列中的鎖將方法鎖着。
            //好處:你不需要進行解鎖操作,當方法執行完畢,這個變量就會自動注銷
            unique_lock<mutex> lck(message_queue.m);
            message_queue.q.push({user, "add"});
            //喚醒所有條件變量
            message_queue.cv.notify_all();

            return 0;
        }

        /**
         * user: 刪除的用戶信息
         * info: 附加信息
         * 從匹配池中刪除一名用戶
         * 
         * @param user
         * @param info
         */
        int32_t remove_user(const User& user, const std::string& info) {
            // Your implementation goes here
            printf("remove_user\n");

            unique_lock<mutex> lck(message_queue.m);
            message_queue.q.push({user, "remove"});
            message_queue.cv.notify_all();

            return 0;
        }

};
void consume_task(){
    while (true){
        unique_lock<mutex> lck(message_queue.m);
        if (message_queue.q.empty()){
            /* 因為消費者線程(不止一個)會頻繁判斷隊列是否為空,導致CPU做無用功。
             * 所以使用條件變量的wait()函數可使得當前線程阻塞,直至條件變量喚醒。
             * 當線程阻塞的時候,該函數會自動解鎖,允許其他線程執行。
             **/
            message_queue.cv.wait(lck);
        } else {
            Task task = message_queue.q.front();
            message_queue.q.pop();
            lck.unlock();

            if (task.type == "add") pool.add(task.user);
            else if (task.type == "remove") pool.remove(task.user);

            pool.match();
        }
    }
}



int main(int argc, char **argv) {
    int port = 9090;
    ::std::shared_ptr<MatchHandler> handler(new MatchHandler());
    ::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
    ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
    ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
    ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

    TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);

    printf("Start Match Server\n");

    thread matching_thread(consume_task);

    server.serve();
    return 0;
}

只修改了main.cpp,只編譯鏈接main.cpp

先編譯main.cpp,在鏈接時,要用到thrift動態鏈接庫線程相關的動態鏈接庫,所以鏈接時應該執行:

g++ -c main.cpp
g++ *.o -o main -lthrift -pthread

測試一下

image-20211211013906809

git 保存一下

數據存儲客戶端的實現

  1. thrift文件夾下,編輯save.thrift,用來生成數據存儲客戶端的一系列文件

    namespace cpp save_service
    
    service Save {
    
        /**
         * username: myserver的名稱
         * password: myserver的密碼的md5sum的前8位
         * 用戶名密碼驗證成功會返回0,驗證失敗會返回1
         * 驗證成功后,結果會被保存到myserver:homework/lesson_6/result.txt中
         */
        i32 save_data(1: string username, 2: string password, 3: i32 player1_id, 4: i32 player2_id)
    }
    
  2. match_system/src下執行:

    thrift -r --gen cpp ../../thrift/save.thrift
    

    這樣,thrift服務端的一系列文件就會生成在src文件夾中的gen-cpp文件夾下,為了划分業務模塊將gen-cpp重命名為save_client

    注意:

    由於c++整個項目只能有一個main函數,而整個服務端的邏輯都在thrift_project/match_system/src下的main.cpp實現。所以一定要刪除thrift_project/match_system/src/save_client下的Save_server.skeleton.cpp。而python沒有這個問題,所以在用python實現客戶端時,主框架文件可刪可不刪。

  3. 改動main.cpp將數據存儲端的業務寫進去

    官方參考地址:Tutorial- > C++client,的主要改動點:

    • 引入缺少頭文件,即save_client/Save.hthrift/transport/TTransportUtils.h><thrift/transport/TSocket.h>

    • 補全命名空間,即添加using namespace ::save_service;

    • class Pool中的save_resut函數中,添加官網 C++樣例client中的main函數中的所有代碼

    • 由於數據存儲是實現在myserver上,所以在連接時要更改ip地址myserver的ip地址可以執行homework 4 getinfo查看。即自己的數據存儲服務器

    • CalculatorClient改為SaveClient

    • transport->open()transport->close();之間的教程代碼刪除,在此之間實現自己的業務

      // This autogenerated skeleton file illustrates how to build a server.
      // You should copy it to another filename to avoid overwriting it.
      
      #include "match_server/Match.h"
      #include "save_client/Save.h"
      #include <thrift/protocol/TBinaryProtocol.h>
      #include <thrift/server/TSimpleServer.h>
      #include <thrift/transport/TServerSocket.h>
      #include <thrift/transport/TBufferTransports.h>
      #include <thrift/transport/TSocket.h>
      #include <thrift/transport/TTransportUtils.h>
      
      #include <thread>
      #include <mutex>
      #include <condition_variable>
      #include <queue>
      #include <vector>
      #include <iostream>
      
      using namespace ::apache::thrift;
      using namespace ::apache::thrift::protocol;
      using namespace ::apache::thrift::transport;
      using namespace ::apache::thrift::server;
      
      using namespace  ::match_service;
      using namespace  ::save_service;
      using namespace std;
      
      struct Task
      {
          User user;
          string type;
      };
      
      struct MessageQueue
      {
          queue<Task> q;
          mutex m;
          condition_variable cv;
      }message_queue;
      
      class Pool
      {
          public:
              void save_result(int a, int b){
                  std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
                  std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
                  std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
                  SaveClient client(protocol);
      
                  puts("w");
                  try {
                      transport->open();
                      int res = client.save_data("acs_2104", "fcf05d68", a, b);
                      if (!res) puts("數據存儲成功!");
                      else puts("數據存取失敗!");
                      transport->close();
                  } catch (TException& tx) {
                      cout << "ERROR: " << tx.what() << endl;
                  }
                  printf("Match result : %d, %d\n", a, b);
              }
      
              void match(){
                  while (users.size() > 1){
                      User a = users[0], b = users[1];
                      users.erase(users.begin());
                      users.erase(users.begin());
                      save_result(a.id, b.id);
                  }
              }
      
              void add(User user){
                  users.push_back(user);
              }
      
              void remove(User user){
                  for (uint32_t i = 0; i < users.size(); i++) {
                      if (users[i].id == user.id) {
                          users.erase(users.begin() + i);
                          break;
                      }
                  }
              }
          private:
              vector<User> users;
      }pool;
      
      class MatchHandler : virtual public MatchIf {
          public:
              MatchHandler() {
                  // Your initialization goes here
              }
      
              /**
               * user: 添加的用戶信息
               * info: 附加信息
               * 在匹配池中添加一名用戶
               * 
               * @param user
               * @param info
               */
              int32_t add_user(const User& user, const std::string& info) {
                  // Your implementation goes here
                  printf("add_user\n");
      
                  //通過消息隊列中的鎖將方法鎖着。
                  //好處:你不需要進行解鎖操作,當方法執行完畢,這個變量就會自動注銷
                  unique_lock<mutex> lck(message_queue.m);
                  message_queue.q.push({user, "add"});
                  //喚醒所有條件變量
                  message_queue.cv.notify_all();
      
                  return 0;
              }
      
              /**
               * user: 刪除的用戶信息
               * info: 附加信息
               * 從匹配池中刪除一名用戶
               * 
               * @param user
               * @param info
               */
              int32_t remove_user(const User& user, const std::string& info) {
                  // Your implementation goes here
                  printf("remove_user\n");
      
                  unique_lock<mutex> lck(message_queue.m);
                  message_queue.q.push({user, "remove"});
                  message_queue.cv.notify_all();
      
                  return 0;
              }
      
      };
      void consume_task(){
          while (true){
              unique_lock<mutex> lck(message_queue.m);
              if (message_queue.q.empty()){
                  /* 因為消費者線程(不止一個)會頻繁判斷隊列是否為空,導致CPU做無用功。
                   * 所以使用條件變量的wait()函數可使得當前線程阻塞,直至條件變量喚醒。
                   * 當線程阻塞的時候,該函數會自動解鎖,允許其他線程執行。
                   **/
                  message_queue.cv.wait(lck);
              } else {
                  Task task = message_queue.q.front();
                  message_queue.q.pop();
                  lck.unlock();
      
                  if (task.type == "add") pool.add(task.user);
                  else if (task.type == "remove") pool.remove(task.user);
      
                  pool.match();
              }
          }
      }
      
      
      
      int main(int argc, char **argv) {
          int port = 9090;
          ::std::shared_ptr<MatchHandler> handler(new MatchHandler());
          ::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
          ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
          ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
          ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
      
          TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
      
          printf("Start Match Server\n");
      
          thread matching_thread(consume_task);
      
          server.serve();
          return 0;
      }
      
      
      
  4. 編譯運行

    g++ -c save_client/*.cpp
    g++ -c main.cpp
    g++ *.o -o main -lthrift -pthread
    
  5. 驗證結果,登錄到myserver服務器上查看存儲的結果:

    ssh myserver
    cd homework/lesson_6 
    cat result.txt
    

image-20211211031148202

注意保存提交到git

匹配系統升級1:按照分差匹配用戶

實現思路:每一秒鍾匹配一次,只要發現一對分差<=50的匹配成功。

注意,match函數的實現的功能只要匹配一對即可,修改下一消耗消息隊列函數和匹配函數即可。

main.cpp改動如下:

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <iostream>
#include <unistd.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace  ::match_service;
using namespace  ::save_service;
using namespace std;

struct Task
{
    User user;
    string type;
};

struct MessageQueue
{
    queue<Task> q;
    mutex m;
    condition_variable cv;
}message_queue;

class Pool
{
    public:
        void save_result(int a, int b){
            std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
            std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
            std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
            SaveClient client(protocol);

            try {
                transport->open();
                int res = client.save_data("acs_2104", "fcf05d68", a, b);
                if (!res) puts("數據存儲成功!");
                else puts("數據存取失敗!");
                transport->close();
            } catch (TException& tx) {
                cout << "ERROR: " << tx.what() << endl;
            }
            printf("Match result : %d, %d\n", a, b);
        }

        void match(){
            while (users.size() > 1){
                sort(users.begin(), users.end(), [&](User& a, User b){
                        return a.score < b.score;
                        });

                bool flag = true;
                for(uint32_t i = 1; i < users.size(); i++){
                    User a = users[i], b = users[i - 1];
                    if (a.score - b.score <= 50) {
                        save_result(a.id, b.id);
                        users.erase(users.begin() + i - 1, users.begin() + i + 1);
                        flag = false;
                        break;
                    }
                }
                if (flag) break;
            }
        }

        void add(User user){
            users.push_back(user);
        }

        void remove(User user){
            for (uint32_t i = 0; i < users.size(); i++) {
                if (users[i].id == user.id) {
                    users.erase(users.begin() + i);
                    break;
                }
            }
        }
    private:
        vector<User> users;
}pool;

class MatchHandler : virtual public MatchIf {
    public:
        MatchHandler() {
            // Your initialization goes here
        }

        /**
         * user: 添加的用戶信息
         * info: 附加信息
         * 在匹配池中添加一名用戶
         * 
         * @param user
         * @param info
         */
        int32_t add_user(const User& user, const std::string& info) {
            // Your implementation goes here
            printf("add_user\n");

            //通過消息隊列中的鎖將方法鎖着。
            //好處:你不需要進行解鎖操作,當方法執行完畢,這個變量就會自動注銷
            unique_lock<mutex> lck(message_queue.m);
            message_queue.q.push({user, "add"});
            //喚醒所有條件變量
            message_queue.cv.notify_all();

            return 0;
        }

        /**
         * user: 刪除的用戶信息
         * info: 附加信息
         * 從匹配池中刪除一名用戶
         * 
         * @param user
         * @param info
         */
        int32_t remove_user(const User& user, const std::string& info) {
            // Your implementation goes here
            printf("remove_user\n");

            unique_lock<mutex> lck(message_queue.m);
            message_queue.q.push({user, "remove"});
            message_queue.cv.notify_all();

            return 0;
        }

};
void consume_task(){
    while (true){
        unique_lock<mutex> lck(message_queue.m);
        if (message_queue.q.empty()){
            /* 因為消費者線程(不止一個)會頻繁判斷隊列是否為空,導致CPU做無用功。
             * 所以使用條件變量的wait()函數可使得當前線程阻塞,直至條件變量喚醒。
             * 當線程阻塞的時候,該函數會自動解鎖,允許其他線程執行。
             **/

            // message_queue.cv.wait(lck);
            lck.unlock();
            pool.match();
            sleep(1);
        } else {
            Task task = message_queue.q.front();
            message_queue.q.pop();
            lck.unlock();

            if (task.type == "add") pool.add(task.user);
            else if (task.type == "remove") pool.remove(task.user);

            pool.match();
        }
    }
}



int main(int argc, char **argv) {
    int port = 9090;
    ::std::shared_ptr<MatchHandler> handler(new MatchHandler());
    ::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
    ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
    ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
    ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

    TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);

    printf("Start Match Server\n");

    thread matching_thread(consume_task);

    server.serve();
    return 0;
}


測試一下然后git保存一下

image-20211211161342894

匹配系統升級2:多線程服務器

之前的版本都是用一個線程來add userremove user,想要提高效率和並發量,可以將服務端升級為多線程版本。

  1. 引入官網 C++樣例Server中,main.cpp沒有的頭文件。

  2. main函數中的TSimpleServer即相關函數,替換成官網 C++樣例Server中的main函數中的TThreadedServer相關內容

  3. 官網 C++樣例Server中的工廠類class CalculatorCloneFactory相關內容加進來

  4. 將文件中的所有Calculator替換為Match,在vim中的具體操作為:

    :1,$s/Calculator/Match/g
    
  5. ::shared::SharedServiceIf*改為MatchIf*

修改后的main.cpp為:

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/TToString.h>

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <iostream>
#include <unistd.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace  ::match_service;
using namespace  ::save_service;
using namespace std;

struct Task
{
    User user;
    string type;
};

struct MessageQueue
{
    queue<Task> q;
    mutex m;
    condition_variable cv;
}message_queue;

class Pool
{
    public:
        void save_result(int a, int b){
            std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
            std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
            std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
            SaveClient client(protocol);

            try {
                transport->open();
                int res = client.save_data("acs_2104", "fcf05d68", a, b);
                if (!res) puts("數據存儲成功!");
                else puts("數據存取失敗!");
                transport->close();
            } catch (TException& tx) {
                cout << "ERROR: " << tx.what() << endl;
            }
            printf("Match result : %d, %d\n", a, b);
        }

        void match(){
            while (users.size() > 1){
                sort(users.begin(), users.end(), [&](User& a, User b){
                        return a.score < b.score;
                        });

                bool flag = true;
                for(uint32_t i = 1; i < users.size(); i++){
                    User a = users[i], b = users[i - 1];
                    if (a.score - b.score <= 50) {
                        save_result(a.id, b.id);
                        users.erase(users.begin() + i - 1, users.begin() + i + 1);
                        flag = false;
                        break;
                    }
                }
                if (flag) break;
            }
        }

        void add(User user){
            users.push_back(user);
        }

        void remove(User user){
            for (uint32_t i = 0; i < users.size(); i++) {
                if (users[i].id == user.id) {
                    users.erase(users.begin() + i);
                    break;
                }
            }
        }
    private:
        vector<User> users;
}pool;

class MatchHandler : virtual public MatchIf {
    public:
        MatchHandler() {
            // Your initialization goes here
        }

        /**
         * user: 添加的用戶信息
         * info: 附加信息
         * 在匹配池中添加一名用戶
         * 
         * @param user
         * @param info
         */
        int32_t add_user(const User& user, const std::string& info) {
            // Your implementation goes here
            printf("add_user\n");

            //通過消息隊列中的鎖將方法鎖着。
            //好處:你不需要進行解鎖操作,當方法執行完畢,這個變量就會自動注銷
            unique_lock<mutex> lck(message_queue.m);
            message_queue.q.push({user, "add"});
            //喚醒所有條件變量
            message_queue.cv.notify_all();

            return 0;
        }

        /**
         * user: 刪除的用戶信息
         * info: 附加信息
         * 從匹配池中刪除一名用戶
         * 
         * @param user
         * @param info
         */
        int32_t remove_user(const User& user, const std::string& info) {
            // Your implementation goes here
            printf("remove_user\n");

            unique_lock<mutex> lck(message_queue.m);
            message_queue.q.push({user, "remove"});
            message_queue.cv.notify_all();

            return 0;
        }

};
void consume_task(){
    while (true){
        unique_lock<mutex> lck(message_queue.m);
        if (message_queue.q.empty()){
            /* 因為消費者線程(不止一個)會頻繁判斷隊列是否為空,導致CPU做無用功。
             * 所以使用條件變量的wait()函數可使得當前線程阻塞,直至條件變量喚醒。
             * 當線程阻塞的時候,該函數會自動解鎖,允許其他線程執行。
             **/

            // message_queue.cv.wait(lck);
            lck.unlock();
            pool.match();
            sleep(1);
        } else {
            Task task = message_queue.q.front();
            message_queue.q.pop();
            lck.unlock();

            if (task.type == "add") pool.add(task.user);
            else if (task.type == "remove") pool.remove(task.user);

            pool.match();
        }
    }
}

class MatchCloneFactory : virtual public MatchIfFactory {
    public:
        ~MatchCloneFactory() override = default;
        MatchIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
        {
            std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport);
            cout << "Incoming connection\n";
            cout << "\tSocketInfo: "  << sock->getSocketInfo() << "\n";
            cout << "\tPeerHost: "    << sock->getPeerHost() << "\n";
            cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
            cout << "\tPeerPort: "    << sock->getPeerPort() << "\n";
            return new MatchHandler;
        }
        void releaseHandler(MatchIf* handler) override {
            delete handler;
        }
};

int main(int argc, char **argv) {
    TThreadedServer server(
            std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()),
            std::make_shared<TServerSocket>(9090), //port
            std::make_shared<TBufferedTransportFactory>(),
            std::make_shared<TBinaryProtocolFactory>());

    printf("Start Match Server\n");

    thread matching_thread(consume_task);

    server.serve();
    return 0;
}


編譯運行測試並保存git:

image-20211211165140988

匹配系統升級3:隨時間擴大匹配閾值

匹配機制:等待時間越長,閾值越大。即匹配的范圍隨時間的推移而變大 故需要記錄當前玩家在匹配池中等待的秒數。

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/TToString.h>

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <iostream>
#include <unistd.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace  ::match_service;
using namespace  ::save_service;
using namespace std;

struct Task
{
    User user;
    string type;
};

struct MessageQueue
{
    queue<Task> q;
    mutex m;
    condition_variable cv;
}message_queue;

class Pool
{
    public:
        void save_result(int a, int b){
            std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
            std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
            std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
            SaveClient client(protocol);

            try {
                transport->open();
                int res = client.save_data("acs_2104", "fcf05d68", a, b);
                if (!res) puts("數據存儲成功!");
                else puts("數據存取失敗!");
                transport->close();
            } catch (TException& tx) {
                cout << "ERROR: " << tx.what() << endl;
            }
            printf("Match result : %d, %d\n", a, b);
        }

        bool check_match(int i, int j){
            User a = users[i], b = users[j];
            int dt = abs(a.score - b.score);
            int a_max_dif = wt[i] * 50;
            int b_max_dif = wt[j] * 50;
            return dt <= a_max_dif && dt <= b_max_dif;
        }


        void match(){
            for(uint32_t i = 0; i < wt.size(); i ++)
                wt[i]++; // 等待秒數 + 1

            while (users.size() > 1){

                bool flag = true;
                for (uint32_t i = 0; i < users.size(); i++){
                    for (uint32_t j = i + 1; j < users.size(); j++) {

                        User a = users[i], b = users[j];
                        if (check_match(i, j)){
                            // 注意刪除順序先j后i
                            users.erase(users.begin() + j);
                            users.erase(users.begin() + i);
                            wt.erase(wt.begin() + j);
                            wt.erase(wt.begin() + i);
                            save_result(a.id, b.id);
                            flag = false;
                            break;
                        }
                    }
                    if (!flag) break;
                }
                if (flag) break;
            }
        }

        void add(User user){
            users.push_back(user);
            wt.push_back(0);
        }

        void remove(User user){
            for (uint32_t i = 0; i < users.size(); i++) {
                if (users[i].id == user.id) {
                    users.erase(users.begin() + i);
                    wt.erase(wt.begin() + i);
                    break;
                }
            }
        }
    private:
        vector<User> users;
        vector<int> wt; // 等待時間,單位:s
}pool;

class MatchHandler : virtual public MatchIf {
    public:
        MatchHandler() {
            // Your initialization goes here
        }

        /**
         * user: 添加的用戶信息
         * info: 附加信息
         * 在匹配池中添加一名用戶
         * 
         * @param user
         * @param info
         */
        int32_t add_user(const User& user, const std::string& info) {
            // Your implementation goes here
            printf("add_user\n");

            //通過消息隊列中的鎖將方法鎖着。
            //好處:你不需要進行解鎖操作,當方法執行完畢,這個變量就會自動注銷
            unique_lock<mutex> lck(message_queue.m);
            message_queue.q.push({user, "add"});
            //喚醒所有條件變量
            message_queue.cv.notify_all();

            return 0;
        }

        /**
         * user: 刪除的用戶信息
         * info: 附加信息
         * 從匹配池中刪除一名用戶
         * 
         * @param user
         * @param info
         */
        int32_t remove_user(const User& user, const std::string& info) {
            // Your implementation goes here
            printf("remove_user\n");

            unique_lock<mutex> lck(message_queue.m);
            message_queue.q.push({user, "remove"});
            message_queue.cv.notify_all();

            return 0;
        }

};
void consume_task(){
    while (true){
        unique_lock<mutex> lck(message_queue.m);
        if (message_queue.q.empty()){
            /* 因為消費者線程(不止一個)會頻繁判斷隊列是否為空,導致CPU做無用功。
             * 所以使用條件變量的wait()函數可使得當前線程阻塞,直至條件變量喚醒。
             * 當線程阻塞的時候,該函數會自動解鎖,允許其他線程執行。
             **/

            // message_queue.cv.wait(lck);
            lck.unlock();
            pool.match();
            sleep(1);
        } else {
            Task task = message_queue.q.front();
            message_queue.q.pop();
            lck.unlock();

            if (task.type == "add") pool.add(task.user);
            else if (task.type == "remove") pool.remove(task.user);

        }
    }
}

class MatchCloneFactory : virtual public MatchIfFactory {
    public:
        ~MatchCloneFactory() override = default;
        MatchIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
        {
            std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport);
            cout << "Incoming connection\n";
            cout << "\tSocketInfo: "  << sock->getSocketInfo() << "\n";
            cout << "\tPeerHost: "    << sock->getPeerHost() << "\n";
            cout << "\tPeerAddress: " << sock->getPeerAddress() << "\n";
            cout << "\tPeerPort: "    << sock->getPeerPort() << "\n";
            return new MatchHandler;
        }
        void releaseHandler(MatchIf* handler) override {
            delete handler;
        }
};

int main(int argc, char **argv) {
    TThreadedServer server(
            std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()),
            std::make_shared<TServerSocket>(9090), //port
            std::make_shared<TBufferedTransportFactory>(),
            std::make_shared<TBinaryProtocolFactory>());

    printf("Start Match Server\n");

    thread matching_thread(consume_task);

    server.serve();
    return 0;
}


額外知識點

c++編譯很慢,所以對於未修改的文件不需要修改

make:識別那些文件修改。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM