轉自 https://blog.csdn.net/weixin_37097605/article/details/101488760
SPDK Thread 模型是SPDK誕生以來十分重要的模塊,它的設計確保了spdk應用的無鎖化編程模型,本文基於spdk最新的release 19.07版本介紹了整體thread模型的設計與實現,並詳細分析了NVMe-oF的使用案例。
SPDK Thread 模型設計與實現
Reactor – 單個CPU Core抽象,主要包含了:
- Lcore對應的CPU Core id
- Threads在該核心下的線程
- Events 這是一個spdk ring,用於事件傳遞接收
Thread – 線程,但它是
spdk抽象出來的線程,主要包含了:
- io_channels資源的抽象,可以是bdev,也可以是具體的tgt
- tailq 線程隊列,用於連接下一個線程
- name 線程的名稱
- Stats 用於計時統計閑置和忙時時間的
- active_pollers 輪詢使用的poller,非定時
- timer_pollers 定時的poller
- messages 這是一個spdk ring,用於消息傳遞接收
- msg_cache 事件的緩存
1.1 Reactor
對象g_reactor_state有五個狀態對應了應用中reactors運行運行狀態,
enum spdk_reactor_state {
SPDK_REACTOR_STATE_INVALID = 0,
SPDK_REACTOR_STATE_INITIALIZED = 1,
SPDK_REACTOR_STATE_RUNNING = 2,
SPDK_REACTOR_STATE_EXITING = 3,
SPDK_REACTOR_STATE_SHUTDOWN = 4,
};
初始情況下是:
SPDK_REACTOR_STATE_INVALID狀態,在spdk app(任意一個target,比如nvmf_tgt)啟動時,即調用了spdk_app_start方法,會調用spdk_reactors_init,在這個方法中將會初始化所有需要被初始化的reactors(可以在配置文件中指定需要使用的Core,
CPU Core 和reactor是一對一的)。並且會將g_reactor_state設置為SPDK_REACTOR_STATE_INITIALIZED。具體代碼如下:
1 Int spdk_reactors_init(void) 2 { 3 // 初始化所有的event mempool 4 g_spdk_event_mempool = spdk_mempool_create(…); 5 // 為g_reactors分配內存,g_reactors是一個數組,管理了所有的reactors 6 posix_memalign((void **)&g_reactors, 64, (last_core + 1) * sizeof(struct spdk_reactor)); 7 // 這里設置了reactor創建線程的方法,之后需要初始化線程的時候將會調用該方法 8 spdk_thread_lib_init(spdk_reactor_schedule_thread, sizeof(struct spdk_lw_thread)); 9 // 對於每一個啟動的reactor,將會初始化它們 10 // 初始化reactor過程,即為綁定lcore,初始化spdk ring、threads,對rusage無操作 11 SPDK_ENV_FOREACH_CORE(i) { 12 reactor = spdk_reactor_get(i); 13 spdk_reactor_construct(reactor, i); 14 } 15 // 設置好狀態返回 16 g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED; 17 return 0; 18 }
在進入
SPDK_REACTOR_STATE_INITIALIZED狀態且spdk_app_start在創建了自己的線程並綁定到了reactors后,會調用spdk_reactors_start方法並將g_reactor_state設置為SPDK_REACTOR_STATE_RUNNING狀態並會創建所有reactor的線程且輪詢。
1 Void spdk_reactors_start(void) { 2 SPDK_ENV_FOREACH_CORE(i) { 3 if (i != current_core) { // 在非master reactor中 4 reactor = spdk_reactor_get(i); // 得到相應的reactor 5 // 設置好線程創建后的一個消息,該消息為輪詢函數 6 rc = spdk_env_thread_launch_pinned(reactor->lcore, _spdk_reactor_run, reactor); 7 // reactor創建好線程並且會自動執行第一個消息 8 spdk_thread_create(thread_name, tmp_cpumask); 9 } 10 } 11 // 當前CPU core得到reactor,並且開始輪詢 12 reactor = spdk_reactor_get(current_core); 13 _spdk_reactor_run(reactor); 14 }
之前提到spdk_reactors_init方法中調用了spdk_thread_lib_init方法傳入了創建thread的spdk_reactor_schedule_thread方法,在調用spdk_thread_create會回調該方法。這個方法它主要的功能就是告訴這個新創建的線程綁定創建該線程的reactor。
1 spdk_reactor_schedule_thread(struct spdk_thread *thread) 2 { 3 // 得到該線程設置的cpu mask 4 cpumask = spdk_thread_get_cpumask(thread); 5 for (i = 0; i < spdk_env_get_core_count(); i++) { 6 …. // 遍歷cpu core 7 // 通過cpu mask找到對應的核心,並產生event 8 if (spdk_cpuset_get_cpu(cpumask, core)) { 9 evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL); 10 break; 11 } 12 } 13 // 傳遞該event,即對應的reatcor會調用_schedule_thread方法, 14 spdk_event_call(evt); 15 } 16 _schedule_thread(void *arg1, void *arg2) 17 { 18 struct spdk_lw_thread *lw_thread = arg1; 19 struct spdk_reactor *reactor; 20 // 消息傳遞到對應的reactor后將該thread加入到reactor中 21 reactor = spdk_reactor_get(spdk_env_get_current_core()); 22 TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link); 23 } 24 在SPDK_REACTOR_STATE_RUNNING后,此時所有reactor就進入了輪詢狀態。_spdk_reactor_run函數為線程提供了輪詢方法: 25 static int _spdk_reactor_run(void *arg) { 26 while (1) { 27 // 處理reactor上的event消息,消息會在之后講到 28 _spdk_event_queue_run_batch(reactor); 29 // 每一個reactor上注冊的thread進行遍歷並且處理poller事件 30 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 31 rc = spdk_thread_poll(thread, 0, now); 32 } 33 // 檢查reactor的狀態 34 if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) { 35 break; 36 } 37 } 38 }
而當spdk app被調用spdk_app_stop方法后將會相應的通知每一個reactor調用spdk_reactors_stop方法,將g_reactor_state賦值為SPDK_REACTOR_STATE_EXITING,即開始退出了。回到_spdk_reactor_run函數中,輪詢將會被跳出,並且執行銷毀線程的代碼。
1 static int _spdk_reactor_run(void *arg) { 2 ….. // 輪詢 3 TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { 4 thread = spdk_thread_get_from_ctx(lw_thread); 5 TAILQ_REMOVE(&reactor->threads, lw_thread, link); 6 spdk_set_thread(thread); 7 spdk_thread_exit(thread); 8 spdk_thread_destroy(thread); 9 } 10 }
在這之后,主線程的_spdk_reactor_run會返回到spdk_reactors_start中,並將g_reactor_state賦值為SPDK_REACTOR_STATE_SHUTDOWN,返回到spdk_app_start中等待應用退出。
最后,總結一下reactors和CPU core以及spdk thread關系應該如圖1所示

圖1 CPU cores、reactors和thread關系圖
Reactor生命周期流程圖則如圖2所示
圖2 reactor生命周期流程圖
1.2 thread
當Reactors進行輪詢時,除了處理自己的事件消息之外,還會
調用注冊在該reactor下面的每一個線程進行輪詢。不過
通常一個reactor只有一個thread,在spdk應用中,更多的是注冊多個poller而不是注冊多個thread。具體的輪詢方法為:
1 Int spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) { 2 // 首先先處理ring傳遞過來的消息 3 msg_count = _spdk_msg_queue_run_batch(thread, max_msgs); 4 // 調用非定時poller中的方法 5 TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, 6 active_pollers_head, tailq, tmp) { 7 // 調用poller注冊的方法之前,會對poller狀態檢測且轉換 8 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { 9 TAILQ_REMOVE(&thread->active_pollers, poller, tailq); 10 free(poller); 11 continue; 12 } 13 poller->state = SPDK_POLLER_STATE_RUNNING; 14 // 調用poller注冊的方法 15 poller_rc = poller->fn(poller->arg); 16 // poller轉換狀態 17 poller->state = SPDK_POLLER_STATE_WAITING; 18 } 19 // 調用定時poller中的方法 20 TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) { 21 // 類似非定時poller過程,不過會檢查是否到了預定的時間 22 if (now < poller->next_run_tick) break; 23 } 24 // 最后統計時間 25 }
Io_device 和 io_channel在thread中也是非常重要的概念。它們的實現都在thread.c中,io_device是設備的抽象,io_channel是對該設備通道的抽象。一個線程可以創建多個io_channel . io_channel只能和一個io_device綁定,並且這個io_channel是別的線程使用不了的。

圖 3 io_device、io_channel和線程關系圖
Io_device結構
1 struct io_device { 2 void *io_device; // 抽象的device指針 3 char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; // 名字 4 spdk_io_channel_create_cb create_cb; // io_channel創建的回調函數 5 spdk_io_channel_destroy_cb destroy_cb; // io_channel銷毀的回調函數 6 spdk_io_device_unregister_cb unregister_cb; // io_device解綁的回調函數 7 struct spdk_thread *unregister_thread; // 不使用該device線程 8 uint32_t ctx_size; // ctx的大小,將會傳給io_channel處理 9 uint32_t for_each_count; // io_channel的數量 10 TAILQ_ENTRY(io_device) tailq; // device隊列頭 11 uint32_t refcnt; // 計數器 12 bool unregistered; // 是否該device被注冊 13 };
可以看到,io_device實際上只提供了一些自身io_device的操作和io_channel相關的方法,具體的io_device實體其實是那個名字叫io_device的void指針。因為thread中的io_device只提供了thread這一層接口,具體的io操作每一個設備很難被抽象出來,所以這一層的接口只負責管理io_channel的創建、銷毀和綁定等。
1 Io_channel的結構 2 struct spdk_io_channel { 3 struct spdk_thread *thread; // 綁定的線程 4 struct io_device *dev; // 綁定的io_device 5 uint32_t ref; // io_channel引用計數 6 uint32_t destroy_ref; // destroy前被引用的次數 7 TAILQ_ENTRY(spdk_io_channel) tailq; // io_channel 隊列頭 8 spdk_io_channel_destroy_cb destroy_cb; // io_channel銷毀的回調函數 9 };
雖然io_channel看起來是很簡單的結構體,實際上在創建一個io_device的時候,會要求使用者傳入一個io_channel_ctx的大小作為調用的參數,而在給io_channel分配內存的時候,除了分配本身io_channel結構體的大小外,還會額外分配一個io_channel_ctx的大小,這個context可以理解成一個void指針,當用戶在使用io_channel的時候,實際上還是通過context的部分去訪問io_device。
NVMe-oF實例
nvmf_tgt 是spdk中一個重要的模塊,這里詳細的寫一下它作為一個target實例是如何使用thread、io_device以及io_channel的。
在spdk應用剛啟動的時候,reactor模塊就會自動加載起來,然后在加載nvmf subsystem的時候,會調用spdk_nvmf_subsystem_init(lib/event/subsystems/nvmf/nvmf_tgt.c)方法,nvmf_tgt其實也是有生命周期,並且有一個狀態機去管理它的生命周期。
1 enum nvmf_tgt_state { 2 NVMF_TGT_INIT_NONE = 0, // 最初的狀態 3 NVMF_TGT_INIT_PARSE_CONFIG, // 解析配置文件 4 NVMF_TGT_INIT_CREATE_POLL_GROUPS, // 創建poll groups 5 NVMF_TGT_INIT_START_SUBSYSTEMS, // 啟動subsystem 6 NVMF_TGT_INIT_START_ACCEPTOR, // 開始接收 7 NVMF_TGT_RUNNING, // running 8 NVMF_TGT_FINI_STOP_SUBSYSTEMS, 9 NVMF_TGT_FINI_DESTROY_POLL_GROUPS, 10 NVMF_TGT_FINI_STOP_ACCEPTOR, 11 NVMF_TGT_FINI_FREE_RESOURCES, 12 NVMF_TGT_STOPPED, 13 NVMF_TGT_ERROR, 14 };
首先在
NVMF_TGT_INIT_PARSE_CONFIG狀態中,nvmf_tgt會去解析啟動時傳入的配置文件,當解析了[nvmf]這個label后,會調用spdk_nvmf_tgt_create這個方法,這個方法將初
始化了全局的g_nvmf_tgt變量,同時也將tgt注冊成了一個io_device。
1 spdk_io_device_register(tgt, 2 spdk_nvmf_tgt_create_poll_group, 3 spdk_nvmf_tgt_destroy_poll_group, 4 sizeof(struct spdk_nvmf_poll_group), 5 "nvmf_tgt");
spdk_nvmf_tgt_create_poll_group和spdk_nvmf_tgt_destroy_poll_group是io_channel創建和銷毀的回調方法(在
spdk_get_io_channel時調用 create_cb)。第三個參數是io_channel_ctx的size,既然這里傳入了spdk_nvmf_poll_group的大小,那么很明顯說明
在nvmf中io_channel_ctx對象就是spdk_nvmf_poll_group。
當config文件解析完了之后,nvmf_tgt狀態到了
NVMF_TGT_INIT_CREATE_POLL_GROUPS,這個狀態下會為每一個線程都創建相應的poll group。
1 spdk_for_each_thread(nvmf_tgt_create_poll_group, 2 NULL, 3 nvmf_tgt_create_poll_group_done); 4 static void nvmf_tgt_create_poll_group(void *ctx) 5 { 6 struct nvmf_tgt_poll_group *pg; 7 …. 8 pg->thread = spdk_get_thread(); 9 pg->group = spdk_nvmf_poll_group_create(g_spdk_nvmf_tgt); 10 …. 11 }
再看spdk_nvmf_poll_group_create中,
1 struct spdk_nvmf_poll_group * spdk_nvmf_poll_group_create(struct spdk_nvmf_tgt *tgt) 2 { 3 struct spdk_io_channel *ch; 4 ch = spdk_get_io_channel(tgt); 5 …. 6 return spdk_io_channel_get_ctx(ch); 7 }
在spdk_get_io_channel中,會先去檢查傳入的io_device是不是已經注冊好了的,如果已經注冊了,將會創建一個新的io_channel返回,
創建的過程會回調在注冊io_device時注冊的io_channel創建方法(即方法spdk_nvmf_tgt_create_poll_group)。
1 static int spdk_nvmf_tgt_create_poll_group(void *io_device, void *ctx_buf) 2 { 3 ….. // 初始化transport 、nvmf subsystem等 4 // 注冊一個poller 5 group->poller = spdk_poller_register(spdk_nvmf_poll_group_poll, group, 0); 6 group->thread = spdk_get_thread(); 7 return 0; 8 }
在spdk_nvmf_poll_group_poll中,因為spdk_nvmf_poll_group對象中有transport的poll group,所以它會調用對應的transport的poll_group_poll方法,比如rdma的poll_group_poll就會輪詢rdma注冊的poller處理每個在相應的qpair來的請求,進入rdma的狀態機將請求處理好。
然后這個狀態就結束了,之后再初始化好了nvmf subsystem相關的東西之后,到了狀態NVMF_TGT_INIT_START_ACCEPTOR。在這個狀態中,只注冊了一個poller。
1 g_acceptor_poller = spdk_poller_register(acceptor_poll, g_spdk_nvmf_tgt, 2 g_spdk_nvmf_tgt_conf->acceptor_poll_rate);
這個poller調用的transport的方法,不
斷的監聽是不是有新的fd連接進來,如果有就調用new_qpair的回調。
總結
spdk thread 模型是spdk無鎖化的基礎,在一個線程中,當分配一個任務后,一直會運行到任務結束為止,這確保了不需要進行線程之間的切換而帶來額外的損耗。同時,高效的spdk ring提供了不同線程之間的消息傳遞,這就使得任務結束的結果可以高效的傳遞給別的處理線程。而io_device和io_channel的設計保證了資源的抽象訪問以及獨立的路徑不去爭搶資源池,並且塊設備由於是對塊進行操作的所以也十分適合抽象成io_device。正是因為以上幾點才讓spdk線程模型能夠達到無鎖化且為多個target提供了基礎線程框架的支持。
————————————————
版權聲明:本文為CSDN博主「weixin_37097605」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/weixin_37097605/java/article/details/101488760
————————————————
版權聲明:本文為CSDN博主「weixin_37097605」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/weixin_37097605/java/article/details/101488760