教程翻譯自Seastar官方文檔:https://github.com/scylladb/seastar/blob/master/doc/tutorial.md
轉載請注明出處:https://www.cnblogs.com/morningli/p/15963859.html
介紹 Seastar 的網絡堆棧
為了獲得最佳性能,Seastar 的網絡堆棧像 Seastar 應用程序一樣被分片:每個分片(線程)負責連接的不同子集。每個傳入的連接都指向其中一個線程,在建立連接后,它會繼續在同一個線程上處理。
在我們之前看到的示例中,main()
只在第一個線程上運行了一次我們的函數f()
。除非服務器使用"-c1
"選項運行(僅一個線程),否則這將意味着任何到達不同線程的連接都不會被處理。因此,在下面的所有示例中,我們將需要在所有內核上運行相同的服務循環。我們可以使用smp::submit_to
函數輕松做到這一點:
seastar::future<> service_loop();
seastar::future<> f() {
return seastar::parallel_for_each(boost::irange<unsigned>(0, seastar::smp::count),
[] (unsigned c) {
return seastar::smp::submit_to(c, service_loop);
});
}
在這里,我們要求每個 Seastar 內核(從 0 到smp::count
-1)運行相同的函數service_loop()
。這些調用中的每一個都會返回一個future
,f()
會在它們全部返回時返回(在下面的示例中,它們永遠不會返回 —— 我們將在后面的部分中討論關閉服務)。
我們從一個用 Seastar 編寫的 TCP 網絡服務器的簡單示例開始。此服務器反復接受 TCP 端口 1234 上的連接,並返回一個空響應:
#include <seastar/core/seastar.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/future-util.hh>
#include <seastar/net/api.hh>
seastar::future<> service_loop() {
return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234})),
[] (auto& listener) {
return seastar::keep_doing([&listener] () {
return listener.accept().then(
[] (seastar::accept_result res) {
std::cout << "Accepted connection from " << res.remote_address << "\n";
});
});
});
}
此代碼的工作原理如下:
listen()
調用創建了一個server_socket
對象 ,listener
,它偵聽 TCP 端口 1234(在任何網絡接口上)。- 我們使用
do_with()
用來確保監聽套接字在整個循環中都存在。 - 為了處理一個連接,我們調用
listener
的accept()
方法。該方法返回一個future<accept_result>
,即最終通過來自客戶端的傳入 TCP 連接 (accept_result.connection
) 以及客戶端的 IP 地址和端口 (accept_result.remote_address
) 進行解析。 - 為了反復接受新的連接,我們使用
keep_doing()
循環成語。keep_doing()
一遍又一遍地運行它的 lambda 參數,一旦上一次迭代返回的future
完成,就開始下一次迭代。只有遇到異常時迭代才會停止。僅當迭代停止時(即僅在異常情況下),keep_doing()
返回的future
才會完成。
此服務器的輸出類似於以下示例:
$ ./a.out
Accepted connection from 127.0.0.1:47578
Accepted connection from 127.0.0.1:47582
...
如果你在殺死之前的服務器后立即運行上面的示例服務器,它經常無法重新啟動,會返回下面的錯誤:
$ ./a.out
program failed with uncaught exception: bind: Address already in use
發生這種情況是因為默認情況下,如果使用該端口的舊連接有任何痕跡,Seastar 將拒絕重用本地端口。在我們這種愚蠢的服務器中,由於服務器是最先關閉連接的一方,每個連接在關閉后都會在“TIME_WAIT
”狀態下徘徊一段時間,這些都阻止了在同一個端口上listen()
的成功。幸運的是,我們可以給listen指定一個選項來忽略這些存在着的TIME_WAIT
。這個選項類似於socket(7)
的SO_REUSEADDR
選項:
seastar::listen_options lo;
lo.reuse_address = true;
return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo),
大多數服務器將始終打開reuse_address
監聽選項。Stevens 的《Unix 網絡編程》一書甚至說“所有 TCP 服務器都應指定此套接字選項以允許重新啟動服務器”。因此,未來 Seastar 可能應該默認啟用此選項 —— 即使出於歷史原因,這不是 Linux 套接字 API 中的默認設置。
讓我們通過向每個連接輸出一些預設響應來推進我們的示例服務器,而不是立即用空回復關閉每個連接。
#include <seastar/core/seastar.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/future-util.hh>
#include <seastar/net/api.hh>
const char* canned_response = "Seastar is the future!\n";
seastar::future<> service_loop() {
seastar::listen_options lo;
lo.reuse_address = true;
return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo),
[] (auto& listener) {
return seastar::keep_doing([&listener] () {
return listener.accept().then(
[] (seastar::accept_result res) {
auto s = std::move(res.connection);
auto out = s.output();
return seastar::do_with(std::move(s), std::move(out),
[] (auto& s, auto& out) {
return out.write(canned_response).then([&out] {
return out.close();
});
});
});
});
});
}
這段代碼的新部分以connected_socket
的output()開始,它返回一個output_stream<char>
對象。在這個輸出流out
上,我們可以使用write()
方法編寫我們的響應。看似簡單的write()
操作,其實是一個復雜的后台異步操作,可能會導致根據需要發送、重傳等多個數據包。write()
返回一個future
告訴我們什么時候可以再次write()
到這個輸出流;這並不一定保證遠程對等方接收到我們發送給它的所有數據,但它保證輸出流有足夠的緩沖區空間(或者在 TCP 的情況下,TCP 擁塞窗口中有足夠的空間)允許另一個寫入開始。
在write()
響應out
之后,示例代碼調用out.close()
並等待它返回的future
。這是必要的,因為write()
嘗試批量寫入,所以此時可能還沒有向 TCP 堆棧寫入任何內容,只有當 close()
結束時,我們才能確定我們寫入輸出流的所有數據實際上已經到達 TCP stack —— 只有在這一點上,我們才能最終處理out
和s
對象。
事實上,這個服務器返回了預期的響應:
$ telnet localhost 1234
...
Seastar is the future!
Connection closed by foreign host.
在上面的例子中,我們只看到了對套接字的寫入。真正的服務器也需要從套接字中讀取。connected_socket
的input()
方法返回一個可用於從套接字讀取的input_stream<char>
對象。從此流中讀取數據的最簡單方法是使用read()
方法,這個方法會返回一個future``temporary_buffer<char>
,該方法包含從套接字讀取的更多字節 —— 或遠程端關閉連接時的空緩沖區。
temporary_buffer<char>
是一種用來傳遞僅臨時需要的字節緩沖區(例如,在處理請求時)的方便且安全的方式。一旦該對象超出范圍(通過正常返回或異常),它持有的內存就會自動釋放。也可以通過std::move()
來轉移緩沖區的所有權。我們將在后面的部分中更詳細地討論temporary_buffer
。
讓我們看一個涉及讀取和寫入的簡單示例服務器。這是一個簡單的回顯服務器,如 RFC 862 中所述:服務器偵聽來自客戶端的連接,一旦建立連接,接收到的任何數據都會被簡單地返回 —— 直到客戶端關閉連接。
#include <seastar/core/seastar.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/future-util.hh>
#include <seastar/net/api.hh>
seastar::future<> handle_connection(seastar::connected_socket s,
seastar::socket_address a) {
auto out = s.output();
auto in = s.input();
return do_with(std::move(s), std::move(out), std::move(in),
[] (auto& s, auto& out, auto& in) {
return seastar::repeat([&out, &in] {
return in.read().then([&out] (auto buf) {
if (buf) {
return out.write(std::move(buf)).then([&out] {
return out.flush();
}).then([] {
return seastar::stop_iteration::no;
});
} else {
return seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::yes);
}
});
}).then([&out] {
return out.close();
});
});
}
seastar::future<> service_loop_3() {
seastar::listen_options lo;
lo.reuse_address = true;
return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo),
[] (auto& listener) {
return seastar::keep_doing([&listener] () {
return listener.accept().then(
[] (seastar::accept_result res) {
// Note we ignore, not return, the future returned by
// handle_connection(), so we do not wait for one
// connection to be handled before accepting the next one.
(void)handle_connection(std::move(res.connection), std::move(res.remote_address)).handle_exception(
[] (std::exception_ptr ep) {
fmt::print(stderr, "Could not handle connection: {}\n", ep);
});
});
});
});
}
主函數service_loop()
循環接受新的連接,並為每個連接調用handle_connection()
來處理這個連接。當處理這個連接完成時,我們handle_connection()
返回一個future
說明這個連接什么時候處理完成,但重要的是,我們不等待這個future
:記住,keep_doing
只有當前一個迭代返回的future
被解決時,才會開始下一個迭代。因為我們希望允許並行正在進行的連接,我們不希望下一個accept()
等到之前接受的連接關閉。所以我們調用handle_connection()
來開始處理連接,但沒有從continuation
中返回任何東西,這會立即解決這個future
,所以keep_doing
將繼續下一個accept()
。
這展示了在 Seastar 中運行並行fiber
(continuation
鏈)是多么容易—— 當continuation
運行異步函數但忽略它返回的future
時,異步操作將並行繼續,但從不等待。
默默地忽略異常通常是錯誤的,所以如果我們忽略的future
可能會用異常解決,建議處理這種情況,例如使用handle_exception()``continuation
。在我們的例子中,一個失敗的連接是沒問題的(例如,客戶端可能會關閉它的連接,我們會發送它輸出),所以我們沒有費心去處理這個異常。
handle_connection()
函數本身很簡單 —— 它在輸入流上反復調用 read()
,以接收帶有一些數據的temporary_buffer
,然后將此臨時緩沖區move
到對輸出流的write()
調用中。緩沖區最終將在write()
完成后自動釋放。當read()
最終返回一個表示輸入結束的空緩沖區時,我們repeat通過返回stop_iteration::yes
來停止迭代。
分片服務(sharded services)
在上一節中,我們看到 Seastar 應用程序通常需要在所有可用的 CPU 內核上運行其代碼。我們看到seastar::smp::submit_to()
函數允許最初僅在第一個內核(core
)上運行的main函數在所有seastar::smp::count
個內核上啟動服務器的代碼。
但是,通常不僅需要在每個內核上運行代碼,還需要有一個包含該代碼狀態的對象。此外,人們可能喜歡與那些不同的對象進行交互,並且還具有一種機制來停止在不同內核上運行的服務。
seastar::sharded<T>
模板提供了一種結構化的方式來創建這樣的sharded service
。它在每個核心中創建一個單獨的T
類型的對象,並提供與這些副本交互的機制,在每個核心上啟動一些代碼,最后徹底停止服務。
要使用seastar::sharded
,首先要為在單核上保存服務狀態的對象創建一個類。例如:
#include <seastar/core/future.hh>
#include <iostream>
class my_service {
public:
std::string _str;
my_service(const std::string& str) : _str(str) { }
seastar::future<> run() {
std::cerr << "running on " << seastar::engine().cpu_id() <<
", _str = " << _str << \n";
return seastar::make_ready_future<>();
}
seastar::future<> stop() {
return seastar::make_ready_future<>();
}
};
該對象中唯一必須要實現的方法是stop()
,當我們想要停止分片服務並希望等到它在所有核心上停止時,它將在每個核心中調用。
現在讓我們看看如何使用它:
#include <seastar/core/sharded.hh>
seastar::sharded<my_service> s;
seastar::future<> f() {
return s.start(std::string("hello")).then([] {
return s.invoke_on_all([] (my_service& local_service) {
return local_service.run();
});
}).then([] {
return s.stop();
});
}
s.start()
通過在每個核心上創建一個my_service
對象來啟動服務。s.start()
的參數,如果有的話(在這個例子中,std::string("hello")
),被傳遞給my_service
的構造函數。
但s.start()
還沒有開始運行任何代碼(除了對象的構造函數)。為此,我們有在所有內核上運行給定的 lambda 的s.invoke_on_all()
—— 為每個 lambda 提供該內核上的本地對象my_service
。在這個例子中,我們對每個對象都有一個run()
方法,所以我們運行它。
最后,在運行結束時,我們想讓所有核心上的服務有機會干凈地關閉,所以我們調用s.stop()
。這將調用每個核心對象的``方法,並等待它們全部完成。s
銷毀前調用s.stop()
是強制性的 —— 如果您忘記這樣做,Seastar 會警告您。
除了在所有分片上運行相同的代碼的invoke_on_all()
之外,分片服務通常需要的另一個功能是在一個分片上調用另一個特定分片的代碼。這是通過調用分片服務的invoke_on()
方法來完成的。例如:
seastar::sharded<my_service> s;
...
return s.invoke_on(0, [] (my_service& local_service) {
std::cerr << "invoked on " << seastar::engine().cpu_id() <<
", _str = " << local_service._str << "\n";
});
這將在分片 0 上運行 lambda 函數,並引用該分片上的本地my_service
對象。
命令行選項
標准 Seastar 命令行選項
所有 Seastar 應用程序都接受一組標准的命令行參數,例如我們在上面已經看到的那些:-c
用於控制使用的線程數的選項,或-m
用於確定分配給應用程序的內存量的選項。
每個 Seastar 應用程序還接受-h
(or --help
) 選項,它列出並解釋了所有可用選項 —— 標准 Seastar 選項和用戶定義選項,如下所述。
用戶定義的命令行選項
Seastar 在傳遞給app_template::run()
時解析命令行選項 (argv[]
) ,尋找自己的標准選項。因此,不建議應用程序嘗試自行解析argv[]
,因為應用程序可能無法理解某些標准 Seastar 選項並且無法正確跳過它們。
相反,想要擁有自己的命令行選項的應用程序應該告訴 Seastar 的命令行解析器這些額外的應用程序特定選項,並要求 Seastar 的命令行解析器也識別它們。Seastar 的命令行解析器實際上是 Boost 庫的boost::program_options
. 應用程序通過使用app_template
的add_options()
和add_positional_options()
方法添加自己的選項來定義選項,然后調用configuration()
以檢索這些選項的設置。例如,
#include <iostream>
#include <seastar/core/app-template.hh>
#include <seastar/core/reactor.hh>
int main(int argc, char** argv) {
seastar::app_template app;
namespace bpo = boost::program_options;
app.add_options()
("flag", "some optional flag")
("size,s", bpo::value<int>()->default_value(100), "size")
;
app.add_positional_options({
{ "filename", bpo::value<std::vector<seastar::sstring>>()->default_value({}),
"sstable files to verify", -1}
});
app.run(argc, argv, [&app] {
auto& args = app.configuration();
if (args.count("flag")) {
std::cout << "Flag is on\n";
}
std::cout << "Size is " << args["size"].as<int>() << "\n";
auto& filenames = args["filename"].as<std::vector<seastar::sstring>>();
for (auto&& fn : filenames) {
std::cout << fn << "\n";
}
return seastar::make_ready_future<>();
});
return 0;
}
在這個例子中,我們通過add_options()
添加兩個特定於應用程序的選項:--flag
是一個不帶任何附加參數的可選參數,--size
(或-s
)采用一個整數值,默認(如果缺少此選項)為 100。此外,我們通過add_positional_options()
詢問不以“ -
”開頭的無限數量的參數——所謂的positional arguments——被收集到“filename”選項下的字符串向量中。該程序的一些示例輸出:
$ ./a.out
Size is 100
$ ./a.out --flag
Flag is on
Size is 100
$ ./a.out --flag -s 3
Flag is on
Size is 3
$ ./a.out --size 3 hello hi
Size is 3
hello
hi
$ ./a.out --filename hello --size 3 hi
Size is 3
hello
hi
boost::program_options具有更強大的功能,例如所需選項,選項檢查和組合,各種選項類型等等。請參閱 Boost 的文檔以獲取更多信息。
promise 對象
正如我們在上面已經定義的那樣,異步函數(asynchronous function
),也稱為promise
,是一個返回未來並安排這個未來最終被解決的函數。正如我們已經看到的,一個異步函數通常是根據其他異步函數來編寫的,例如我們看到了等待現有異步函數sleep()完成,然后返回 3 的函數slow()
:
seastar::future<int> slow() {
using namespace std::chrono_literals;
return seastar::sleep(100ms).then([] { return 3; });
}
編寫promise
的最基本構建塊是promise
對象,它是一個promise<T>
類型的對象。promise<T>
有一個返回future
的方法future<T> get_future()
和一個來解決這個future
的方法set_value(T)
。一個異步函數可以創建一個promise
對象,返回它的future
,以及最終調用set_value
方法——這將最終解決它返回的future
。
Seastar 中的內存分配
每線程內存分配
Seastar 要求對應用程序進行分片,即運行在不同線程上的代碼對內存中的不同對象進行操作。我們已經在 [Seastar memory] 中看到 Seastar 如何接管給定數量的內存(通常是機器的大部分內存)並將其平均分配給不同的線程。現代多插槽機器具有非統一內存訪問(NUMA),這意味着內存的某些部分更接近某些內核,Seastar 在線程之間划分內存時會考慮到這一點。目前,線程之間的內存分配是靜態的,並且是相等的——線程預計會經歷大致相等的負載量並且需要大致相等的內存量。
為了實現這種按線程分配,Seastar 重新定義了 C 庫函數malloc()
、free()
和它們的眾多相關函數 --- calloc()
、realloc()
、posix_memalign()
、memalign()
、malloc_usable_size()
和malloc_trim()
。它還重新定義了 C++ 內存分配函數 、operator new
及其operator delete
所有變體(包括數組版本、C++14 delete要求size,以及 C++17 變體要求所需的對齊方式)。
重要的是要記住 Seastar 的不同線程可以看到其他線程分配的內存,但強烈建議不要實際這樣做。在現代多核機器上的線程之間共享數據對象會導致鎖定、內存屏障和高速緩存行彈跳導致嚴重的性能損失。相反,Seastar 鼓勵應用程序盡可能避免在線程之間共享對象(通過分片——每個線程擁有對象的一個子集),當線程確實需要交互時,它們會使用submit_to()
通過顯式消息傳遞來進行交互,正如我們之后將看到的那樣。
外來指針
在一個線程上分配的對象將歸該線程所有,最終應由同一線程釋放。強烈建議不要在錯誤的線程上釋放內存,但目前為了支持 Seastar 無法控制的庫代碼,這是支持的(盡管速度很慢)。比如std::exception_ptr
分配內存;因此,如果我們在遠程線程上調用異步操作並且該操作返回異常,則當我們釋放返回的std::exception_ptr
時,這將發生在“錯誤”的核心上。所以 Seastar 允許這樣做,但效率低下。
在大多數情況下,對象應該將其整個生命周期都花在一個線程上,並且只能由該線程使用。但是在某些情況下,我們希望將在一個線程上開始其生命的對象的所有權重新分配給另一個線程。這可以使用seastar::foreign_ptr<>
。 指向對象的指針或智能指針包含在seastar::foreign_ptr<P>
。然后可以將該包裝器移動到在不同線程中運行的代碼中(例如,使用submit_to()
)。
最常見的用例是seastar::foreign_ptr<std::unique_ptr<T>>
。接收到foreign_ptr
的線程將獲得該對象的獨占使用權,當它銷毀這個包裝器時,它會回到原來的線程銷毀該對象。請注意,該對象不僅在原始分片上被釋放 - 它還在那里被銷毀(即,它的析構函數運行)。當對象的析構函數需要訪問屬於原始分片的其他狀態時,這通常很重要 - 例如,將自身與容器取消鏈接。
雖然foreign_ptr
確保對象的析構函數自動在對象的主線程上運行,但它並不能免除用戶擔心在何處運行對象的其他方法的麻煩。一些簡單的方法,例如,從對象的字段中讀取的方法,可以在接收線程上運行。但是,其他方法可能需要訪問對象的主分片所擁有的其他數據,或者需要防止並發操作。即使我們確定該對象現在僅由接收線程使用,這些方法仍必須在主線程上顯式運行:
// fp is some foreign_ptr<>
return smp::submit_to(fp.get_owner_shard(), [p=fp.get()]
{ return p->some_method(); });
所以seastar::foreign_ptr<>不僅有功能上的好處(即在主分片上運行析構函數),它還有文檔上的好處——它警告程序員每次使用對象時都要小心,這是一個外部指針,如果我們想要要對指向的對象做任何重要的事情,我們可能需要在home shard
上做。
上面,我們討論了通過seastar::foreign_ptr<std::unique_ptr<T>>
將對象的所有權轉移到另一個分片的情況。但是,有時發送者不想放棄對象的所有權。有時,它希望遠程線程對其對象進行操作,並返回完整的對象。有時,它想將同一個對象發送到多個分片。在這種情況下,可以使用seastar::foreign_ptr<seastar::lw_shared_ptr<T>>
。使用者當然也要小心,不要從多個線程並行操作同一個對象。如果這不能通過程序邏輯來保證,必須使用一些串行化的方法——比如在home shard
使用上述的 submit_to()
來運行這些操作。
通常, seastar::foreign_ptr
不能被復制 - 只能move
。但是,當它擁有一個可以復制的智能指針(即shared_ptr
)時,可能需要制作該指針的額外副本並創建第二個foreign_ptr
。這樣做是低效且異步的(它需要與對象的原始所有者通信以創建副本),因此需要顯式使用方法future<foreign_ptr> copy()
而不是普通的復制構造函數。
Seastar::thread
Seastar 使用future
和continuation
的編程模型是非常強大和高效的。然而,正如我們在上面的示例中已經看到的那樣,它也相對冗長:每次在進行計算之前我們需要等待,我們需要編寫另一個continuation
。我們還需要擔心在不同continuation
之間傳遞數據(使用[生命周期管理]部分中描述的技術)。簡單的流控制結構(如循環)也更多地使用continuation
。例如,考慮這個簡單的經典同步代碼:
std::cout << "Hi.\n";
for (int i = 1; i < 4; i++) {
sleep(1);
std::cout << i << "\n";
}
在 Seastar 中,使用future
和continuation
,我們需要這樣寫:
std::cout << "Hi.\n";
return seastar::do_for_each(boost::counting_iterator<int>(1),
boost::counting_iterator<int>(4), [] (int i) {
return seastar::sleep(std::chrono::seconds(1)).then([i] {
std::cout << i << "\n";
});
});
但是 Seastar 還允許通過 seastar::thread
編寫看起來更像同步代碼的代碼。seastar::thread·提供了一個允許阻塞的執行環境;您可以發出一個異步函數,並在同一個函數中等待它,而不是建立一個要調用的回調future<>::then():
seastar::thread th([] {
std::cout << "Hi.\n";
for (int i = 1; i < 4; i++) {
seastar::sleep(std::chrono::seconds(1)).get();
std::cout << i << "\n";
}
});
seastar::thread
不是一個單獨的操作系統線程。它仍然使用在 Seastar 的單線程(每個核心)上安排的continuation
。它的工作原理如下:
seastar::thread
分配一個 128KB的堆棧,並運行給定的函數,直到它在future
的get()方法的調用上阻塞。在seastar::thread
上下文之外,get()
只能在已經可用的未來上調用。但是在線程內部,調用一個還不可用的future
上的get()
會停止運行線程函數,並為這個future
安排一個continuation
,當future
可用時繼續運行線程的函數(在同一個已保存的堆棧上)。
就像正常的 Seastar·continuation
一樣,seastar::threads
總是在啟動它們的同一內核上運行。它們也是合作的:它們永遠不會被搶占,除非seastar::future::get()
阻塞或顯式調用seastar::thread::yield()
.
值得重申的是,seastar::thread
不是 POSIX 線程,它只能阻塞 Seastar future
,不能阻塞系統調用。上面的例子使用seastar::sleep()了 ,而不是sleep()
系統調用。seastar::thread
的函數可以正常拋出和捕獲異常。請記住,如果future
以異常解決,get()
將拋出異常。
除seastar::future::get()
之外,我們還可以使用seastar::future::wait()
等待而不獲取future
的結果。當您想要避免在future
失敗時拋出異常時,這有時會很有用(就像get()
這樣)。例如:
future<char> getchar();
int try_getchar() noexcept { // run this in seastar::thread context
future fut = get_char();
fut.wait();
if (fut.failed()) {
return -1;
} else {
// Here we already know that get() will return immediately,
// and will not throw.
return fut.get();
}
}
開始和結束 seastar::thread
創建seastar::thread
對象后,我們需要使用它的方法join()
等到它結束。我們還需要讓該對象保持活動狀態直到join()
完成。一個seastar::thread
的完整的使用示例如下所示:
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
seastar::future<> f() {
seastar::thread th([] {
std::cout << "Hi.\n";
for (int i = 1; i < 4; i++) {
seastar::sleep(std::chrono::seconds(1)).get();
std::cout << i << "\n";
}
});
return do_with(std::move(th), [] (auto& th) {
return th.join();
});
}
seastar::async()
函數提供了一個方便的快捷方式來創建seastar::thread
和返回一個future
,它在線程完成時解決:
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
seastar::future<> f() {
return seastar::async([] {
std::cout << "Hi.\n";
for (int i = 1; i < 4; i++) {
seastar::sleep(std::chrono::seconds(1)).get();
std::cout << i << "\n";
}
});
}
seastar::async()
的 lambda 可能會返回一個值,並在seastar::async()
完成時返回它。例如:
seastar::future<seastar::sstring> read_file(sstring file_name) {
return seastar::async([file_name] () { // lambda executed in a thread
file f = seastar::open_file_dma(file_name).get0(); // get0() call "blocks"
auto buf = f.dma_read(0, 512).get0(); // "block" again
return seastar::sstring(buf.get(), buf.size());
});
};
雖然seastar::threads
和seastar::async()
使編程更方便,但它們也增加了直接使用延續編程之外的開銷。最值得注意的是,每個seastar::thread
堆棧都需要額外的內存。因此,使用 seastar::thread
來處理高度並發的操作並不是一個好主意。例如,如果您需要處理 10,000 個並發請求,請不要使用seastar::thread
來處理每個 --- 使用 future
和 continuation
。但是,如果您正在編寫您知道只有少數實例會同時運行的代碼,例如,在您的應用程序中進行后台清理操作,那么這seastar::thread
是一個很好的匹配。seastar::thread
也適用於不關心性能的代碼——例如測試代碼。
應用程序組件的隔離
Seastar 讓多任務處理變得非常簡單——就像運行異步函數一樣簡單。因此,服務器很容易並行執行許多不相關的事情。例如,一個服務器可能正在響應 100 個用戶的請求,同時也在進行一些長時間的后台操作。
但是在上面的例子中,后台操作會得到多少百分比的 CPU 和磁盤吞吐量呢?用戶的其中一個請求可以被后台操作延遲多長時間?如果沒有我們在本節中描述的機制,就無法可靠地回答這些問題:
- 后台操作可能是一個非常“周到”的單個
fiber
,即運行一個很短的continuation
,然后安排下一個continuation
稍后運行。在每一點,調度程序都會看到 100 個請求處理continuation
,並且只有一個准備好運行的后台延續。后台任務獲得大約 1% 的 CPU 時間,用戶的請求幾乎沒有延遲。 - 另一方面,后台操作可能會並行生成 1,000 個
fiber
,並且每次都有 1,000 個准備運行的continuation
。后台操作將獲得大約 90% 的運行時間,而處理用戶請求的continuation
可能會卡在 1,000 個這些后台continuation
之后,並經歷巨大的延遲。
復雜的 Seastar 應用程序通常具有不同的組件,這些組件並行運行並具有不同的性能目標。在上面的例子中,我們看到了兩個組件——用戶請求和后台操作。我們在本節中描述的機制的第一個目標是將每個組件的性能與其他組件隔離開來;換句話說,一個組件的吞吐量和延遲不應該取決於另一個組件做出的決定——例如,它並行運行多少個continuation
。第二個目標是允許應用程序控制這種隔離,例如,在上面的示例中,允許應用程序顯式控制后台操作接收的 CPU 數量,以便它以所需的速度完成。
在上面的示例中,我們使用 CPU 時間作為不同組件需要有效共享的有限資源。正如我們稍后將展示的,另一個重要的共享資源是磁盤 I/O。
調度組(CPU 調度程序)
考慮下面的異步函數loop()
,它循環直到某個共享變量stop
變為真。它保留迭代次數counter
直到停止,並在最終停止時返回此計數器。
seastar::future<long> loop(int parallelism, bool& stop) {
return seastar::do_with(0L, [parallelism, &stop] (long& counter) {
return seastar::parallel_for_each(boost::irange<unsigned>(0, parallelism),
[&stop, &counter] (unsigned c) {
return seastar::do_until([&stop] { return stop; }, [&counter] {
++counter;
return seastar::make_ready_future<>();
});
}).then([&counter] { return counter; });
});
}
parallelism
參數決定了計數操作的並行性:parallelism=1
意味着我們只有一個循環遞增計數器;parallelism=10
意味着我們並行啟動 10 個循環,所有循環都遞增相同的計數器。
如果我們並行啟動兩個loop()
調用並讓它們運行 10 秒會發生什么?
seastar::future<> f() {
return seastar::do_with(false, [] (bool& stop) {
seastar::sleep(std::chrono::seconds(10)).then([&stop] {
stop = true;
});
return seastar::when_all_succeed(loop(1, stop), loop(1, stop)).then(
[] (long n1, long n2) {
std::cout << "Counters: " << n1 << ", " << n2 << "\n";
});
});
}
事實證明,如果這兩個loop()
調用具有相同的並行度1
,它們的工作量大致相同:
Counters: 3'559'635'758, 3'254'521'376
但是,例如,如果我們將 loop(1)
與loop(10)
並行運行,則結果是loop(10)
完成了10 倍的工作:
Counters: 629'482'397, 6'320'167'297
為什么 loop(1)
可以在 10 秒內完成的工作量取決於其競爭對手選擇的並行度,我們如何解決這個問題?
發生這種情況的原因如下:當一個future
解析並且一個continuation
鏈接到它時,這個continuation
就可以運行了。默認情況下,Seastar 的調度程序保留一個准備運行的continuation
列表(當然在每個分片中),並按照它們准備好運行的相同順序運行continuation
。在上面的例子中,loop(1)總是有一個准備運行的continuation
,但是loop(10),並行運行 10 個循環,總是有十個准備運行的continuation
。因此,對於loop(1)
的每一個continuation
,Seastar 的默認調度程序將運行loop(10)的10個continuation
,這就是loop(10)
完成 10倍的工作的原因。
為了解決這個問題,Seastar 允許應用程序定義稱為調度組的單獨組件,每個組件都有一個單獨的准備運行延續列表。每個調度組都可以在所需的 CPU 時間百分比上運行自己的continuation
,但是一個調度組中可運行的continuation
的數量不會影響另一個調度組獲得的CPU量。讓我們看看這是如何完成的:
調度組由scheduling_group
類型的值定義。這個值是不透明的,但在內部它是一個小整數(類似於 Linux 中的進程 ID)。我們使用seastar::with_scheduling_group()
函數在所需的調度組中運行代碼:
seastar::future<long>
loop_in_sg(int parallelism, bool& stop, seastar::scheduling_group sg) {
return seastar::with_scheduling_group(sg, [parallelism, &stop] {
return loop(parallelism, stop);
});
}
現在讓我們創建兩個調度組,在第一個調度組運行loop(1)
和第二個調度組中運行loop(10)
:
seastar::future<> f() {
return seastar::when_all_succeed(
seastar::create_scheduling_group("loop1", 100),
seastar::create_scheduling_group("loop2", 100)).then(
[] (seastar::scheduling_group sg1, seastar::scheduling_group sg2) {
return seastar::do_with(false, [sg1, sg2] (bool& stop) {
seastar::sleep(std::chrono::seconds(10)).then([&stop] {
stop = true;
});
return seastar::when_all_succeed(loop_in_sg(1, stop, sg1), loop_in_sg(10, stop, sg2)).then(
[] (long n1, long n2) {
std::cout << "Counters: " << n1 << ", " << n2 << "\n";
});
});
});
}
在這里,我們創建了兩個調度組,sg1
和sg2
. 每個調度組都有一個名稱(僅用於診斷目的)和份額(shares)數,通常介於 1 和 1000 之間:如果一個調度組的份額數是第二個調度組的兩倍,它將得到CPU 時間的兩倍。在此示例中,我們為兩個組使用了相同數量的份額 (100),因此它們應該獲得相同的 CPU 時間。
與 Seastar 中的大多數對象每個分片都是獨立的不同,Seastar 希望調度組的標識和編號在所有分片上都相同,因為在遠程分片上調用任務時這很重要。因此,創建調度組的函數seastar::create_scheduling_group()
是一個返回future<scheduling_group>
的異步函數。
運行上面的示例,兩個調度組都設置了相同數量的份額(100),確實會導致兩個調度組獲得相同數量的 CPU 時間:
Counters: 3'353'900'256, 3'350'871'461
注意現在兩個循環如何完成相同數量的工作——盡管一個循環的並行度是第二個循環的 10 倍。
如果我們將第二個調度組的定義更改為擁有 200 個份額,是第一個調度組份額數的兩倍,我們將看到第二個調度組獲得兩倍的 CPU 時間:
Counters: 2'273'783'385, 4'549'995'716