kyber調度器原理及源碼分析


linux內核塊層有kyber、mq-deadline以及bfq三個針對multi queue設計的調度器,這篇文章主要是講解kyber調度器的原理和源碼,分析的內核版本是4.20。

原理

我們知道當Block層不使用任何的調度器的時候,調度隊列是按照每個cpu核一個軟隊列,一定數量的硬隊列,並將軟隊列和硬隊列建立一定的映射關系“map”來調度IO的,通過當前的cpu_id能找到per_cpu的軟隊列,在軟隊列里面完成插入、合並等動作,然后通過map[cpu_id]找到其映射的硬隊列,從而將IO分發到硬隊列,最后硬隊列將IO下發到驅動層,由驅動層將IO通過某個總線(PCIe、FC、RoCE等)發送到設備層。

當kyber調度器啟用時,kyber舍棄軟隊列,創建read、write、discard、other四個隊列將IO分類處理。kyber不是消耗光了某個隊列再去分發下一個隊列,而是消耗到一定的個數就切換到下一個隊列,從而防止后面的隊列被餓死,這個個數分別是16、8、1、1,也就是分發了16個讀IO之后去分發寫,分發了8個寫之后再分發一個discard,最后分發一個other的IO,以此類推循環。

kyber是怎么找到應該往哪個硬隊列分發呢?

答案是根據當前cpu_id找到軟隊列,然后通過記錄在軟隊列里面的“該軟隊列在其對應的硬隊列里面的所有跟這個硬隊列關聯的軟隊列的編號”就能找到其對應的硬隊列。前面這句話有點繞,解釋一下,系統內最好的情況下是軟隊列個數等於硬隊列個數,這樣軟隊列和硬隊列就能一一對應,但是通常情況下受限於實際的塊設備的處理能力,硬隊列個數往往小於軟隊列的個數,因此需要將多個軟隊列跟一個硬隊列映射綁定,形成多對一的關系,在硬隊列里面有個數組struct blk_mq_ctx ctxs就記錄了該硬隊列對應的所有軟隊列,其下標則存在於軟隊列結構體里面unsigned int index_hw,詳細可參考源碼函數blk_mq_map_swqueue()。

下面說說kyber的這個“分發隊列”,讓人反直覺的是request(block層io結構體)不是直接insert到分發隊列的,而是先insert到“暫存隊列”(我們暫時給它這么個名字,后面有詳細的結構體),在暫存隊列IO被合並、分類,然后當分發隊列為空而需要分發一個IO的時候就會將暫存隊列的IO都掛到分發隊列上,然后選擇一個IO分發到硬隊列。

image

​ 圖中我們假設cpu0、cpu1對應這個硬隊列。

掛入分發隊列的IO個數理論是無限的,雖然每輪只會分發固定數量的IO,但是一輪循環完畢就會立馬進行下一輪循環,而此時前面分發的IO可能都還沒有回來,就沒有起到控制的作用了。kyber針對read、write、discard、other分別設置了總的token數目,分別為256、128、64、16,當要分發一個IO到硬隊列的時候,先得拿到這個token,然后才能分發,如果當前token已經耗光,也就是說底層硬處理地慢而上層下發地快了,就要將當前的分發隊列掛起,在下層的某個IO執行完畢回來后看這個IO的類型的分發隊列是否掛起的,有則將其喚醒去分發IO。

此外kyber還會統計IO的時延,並且針對read、write、discard設置了時延參考值,分別為2000000nsec、10000000nsec、5000000nsec。kyber通過將統計的時延與參考值進行對比,動態地調整每種類型IO的token數目,以求公平。

數據結構

kyber_queue_data

kyber_queue_data是kyber的主要數據結構之一,當將塊設備的調度器切換到kyber時就會生成這樣的結構體存放在elevator_queue的elevator_data中,通過request_queue可以找到elevator_queue,自然就能找到kyber_queue_data。

struct kyber_queue_data {
	struct request_queue *q; // 塊設備對應一個request_queue,也就對應一個kyber_queue_data

	/*
	 * Each scheduling domain has a limited number of in-flight requests
	 * device-wide, limited by these tokens.
	 */
    // 每種隊列的token占用情況,分發IO時從這里申請
	struct sbitmap_queue domain_tokens[KYBER_NUM_DOMAINS];

	/*
	 * Async request percentage, converted to per-word depth for
	 * sbitmap_get_shallow().
	 */
    // 用該值限制異步請求的帶寬,防止同步請求被餓死
	unsigned int async_depth;
	// per_cpu地統計時延信息,IO完成時就會統計時延到這里
	struct kyber_cpu_latency __percpu *cpu_latency;

	/* Timer for stats aggregation and adjusting domain tokens. */
    // 有一個timer,每隔一段時間統計一下時延情況,根據統計情況調整token數量
	struct timer_list timer;
	// 上面的timer每隔一段時間會將per_cpu統計的時延加到這里面計算新的token數量
	unsigned int latency_buckets[KYBER_OTHER][2][KYBER_LATENCY_BUCKETS];
	// 記錄上一次調整token的時間
	unsigned long latency_timeout[KYBER_OTHER];
	// 記錄上一次timer得到的時延好壞結果,如果下一次timer的時候時延的樣本數量不夠則用這一次的
    // 下一次timer如果采用了這一次計算的延時好壞結果,則將其值置為-1,不再使用,只使用一次
	int domain_p99[KYBER_OTHER];

	/* Target latencies in nanoseconds. */
    // 每種IO類型的時延參考值
	u64 latency_targets[KYBER_OTHER];
};

kyber_hctx_data

存放於硬隊列blk_mq_hw_ctx的sched_data字段,包含了原理一章中提到的“暫存隊列”和“分發隊列”。

struct kyber_hctx_data {
	spinlock_t lock;
	struct list_head rqs[KYBER_NUM_DOMAINS]; // 分發隊列,IO從這個隊列提交到硬隊列
	unsigned int cur_domain; // 當前分發的是read、write、discard還是other
	unsigned int batching; 
	struct kyber_ctx_queue *kcqs; // 暫存隊列,硬隊列對應的軟隊列有多少個就有多少個暫存隊列
    // kcq_map用於表示暫存隊列上是否有IO
	struct sbitmap kcq_map[KYBER_NUM_DOMAINS];
 	// 當token耗光的時候,分發隊列進入wait狀態,等待回來的io釋放token將之喚醒,以下是這個流程需要用到的結構
	wait_queue_entry_t domain_wait[KYBER_NUM_DOMAINS];
	struct sbq_wait_state *domain_ws[KYBER_NUM_DOMAINS];
	atomic_t wait_index[KYBER_NUM_DOMAINS];
};
struct kyber_ctx_queue { // 暫存隊列,由此可見一個暫存隊列就有4個種類的隊列
	/*
	 * Used to ensure operations on rq_list and kcq_map to be an atmoic one.
	 * Also protect the rqs on rq_list when merge.
	 */
	spinlock_t lock;
	struct list_head rq_list[KYBER_NUM_DOMAINS];
} ____cacheline_aligned_in_smp;

初始化

初始化kyber

當kyber模塊注冊時回調用kyber_init()函數注冊kyber給elevator,包括kyber的名字、屬性、debugfs相關信息、owner以及最重要的hook。

static int __init kyber_init(void)
{
	return elv_register(&kyber_sched);
}
static struct elevator_type kyber_sched = { // 這些hook在后續的章節中會一一講到
	.ops.mq = {
		.init_sched = kyber_init_sched,
		.exit_sched = kyber_exit_sched,
		.init_hctx = kyber_init_hctx,
		.exit_hctx = kyber_exit_hctx, // 以上是初始化和釋放kyber內部結構體的函數
		.limit_depth = kyber_limit_depth, // 限制request隊列的深度
		.bio_merge = kyber_bio_merge, // 將bio合並到kyber
		.prepare_request = kyber_prepare_request, // 初始化request時調用
		.insert_requests = kyber_insert_requests, // 將request插入到kyber
		.finish_request = kyber_finish_request, // 釋放request時調用
		.requeue_request = kyber_finish_request, // 將request重新入隊時調用
		.completed_request = kyber_completed_request, // request完成時調用
		.dispatch_request = kyber_dispatch_request, // 分發request
		.has_work = kyber_has_work, // kyber是否有未分發的request
	},
	.uses_mq = true,
#ifdef CONFIG_BLK_DEBUG_FS
	.queue_debugfs_attrs = kyber_queue_debugfs_attrs,
	.hctx_debugfs_attrs = kyber_hctx_debugfs_attrs,
#endif
	.elevator_attrs = kyber_sched_attrs,
	.elevator_name = "kyber",
	.elevator_owner = THIS_MODULE,
};

初始化kyber_queue_data

當塊設備的調度器被設置成kyber時會調用kyber_init_sched()函數初始化kyber_queue_data,將kyber_queue_data與request_queue綁定。

static int kyber_init_sched(struct request_queue *q, struct elevator_type *e)
{
	struct kyber_queue_data *kqd;
	struct elevator_queue *eq;

	eq = elevator_alloc(q, e); // 申請電梯隊列結構,request可通過該結構訪問到kyber_queue_data
	if (!eq)
		return -ENOMEM;

	kqd = kyber_queue_data_alloc(q); // 申請kyber_queue_data並初始化
	if (IS_ERR(kqd)) {
		kobject_put(&eq->kobj);
		return PTR_ERR(kqd);
	}

	blk_stat_enable_accounting(q);

	eq->elevator_data = kqd; // 將kyber_queue_data和elevator_queue綁定
	q->elevator = eq; // 將elevator_queue和request_queue綁定

	return 0;
}

kyber_queue_data_alloc函數

static struct kyber_queue_data *kyber_queue_data_alloc(struct request_queue *q)
{
	struct kyber_queue_data *kqd;
	unsigned int shift;
	int ret = -ENOMEM;
	int i;

	kqd = kzalloc_node(sizeof(*kqd), GFP_KERNEL, q->node);
	if (!kqd)
		goto err;

	kqd->q = q; // 將kyber_queue_data與request_queue綁定
	// 初始化per_cpu的時延統計
	kqd->cpu_latency = alloc_percpu_gfp(struct kyber_cpu_latency,
					    GFP_KERNEL | __GFP_ZERO);
	if (!kqd->cpu_latency)
		goto err_kqd;
	// 初始化時延統計timer
    // kyber_timer_fn會將所有per_cpu的時延統計加起來,然后通過一定的算法調整每個類型隊列的    token數目
	timer_setup(&kqd->timer, kyber_timer_fn, 0);
    
	// 初始化每種隊列的token數,kyber_depth全局變量顯示為256、128、64、16
    // token的申請、掛起、釋放是通過sbitmap實現的
	for (i = 0; i < KYBER_NUM_DOMAINS; i++) {
		WARN_ON(!kyber_depth[i]);
		WARN_ON(!kyber_batch_size[i]);
		ret = sbitmap_queue_init_node(&kqd->domain_tokens[i],
					      kyber_depth[i], -1, false,
					      GFP_KERNEL, q->node);
		if (ret) {
			while (--i >= 0)
				sbitmap_queue_free(&kqd->domain_tokens[i]);
			goto err_buckets;
		}
	}
	// 初始化總的時延統計和每種隊列的時延參考值(kyber_latency_targets)
	for (i = 0; i < KYBER_OTHER; i++) {
		kqd->domain_p99[i] = -1;
		kqd->latency_targets[i] = kyber_latency_targets[i];
	}

	shift = kyber_sched_tags_shift(q);
    // 限制異步IO的帶寬為百分之七十五,留百分之二十五給同步IO,防止同步IO被餓死
	kqd->async_depth = (1U << shift) * KYBER_ASYNC_PERCENT / 100U;

	return kqd;

err_buckets:
	free_percpu(kqd->cpu_latency);
err_kqd:
	kfree(kqd);
err:
	return ERR_PTR(ret);
}

初始化kyber_hctx_data

static int kyber_init_hctx(struct blk_mq_hw_ctx *hctx, unsigned int hctx_idx)
{
	struct kyber_queue_data *kqd = hctx->queue->elevator->elevator_data;
	struct kyber_hctx_data *khd;
	int i;
	// 申請kyber_hctx_data
	khd = kmalloc_node(sizeof(*khd), GFP_KERNEL, hctx->numa_node);
	if (!khd)
		return -ENOMEM;
	// 申請hctx->nr_ctx個kyber_ctx_queue(暫存隊列)
    // hctx->nr_ctx就是與硬隊列對應的軟隊列的個數
	khd->kcqs = kmalloc_array_node(hctx->nr_ctx,
				       sizeof(struct kyber_ctx_queue),
				       GFP_KERNEL, hctx->numa_node);
	if (!khd->kcqs)
		goto err_khd;
	// 初始化暫存隊列
	for (i = 0; i < hctx->nr_ctx; i++)
		kyber_ctx_queue_init(&khd->kcqs[i]);
	// 初始化kcq_map,用來記錄暫存隊列上是否有IO掛着,如果有當分發隊列為空時就從暫存隊列將IO都取到分發隊列。
	for (i = 0; i < KYBER_NUM_DOMAINS; i++) {
		if (sbitmap_init_node(&khd->kcq_map[i], hctx->nr_ctx,
				      ilog2(8), GFP_KERNEL, hctx->numa_node)) {
			while (--i >= 0)
				sbitmap_free(&khd->kcq_map[i]);
			goto err_kcqs;
		}
	}

	spin_lock_init(&khd->lock);
	
	for (i = 0; i < KYBER_NUM_DOMAINS; i++) {
        // 初始化分發隊列
		INIT_LIST_HEAD(&khd->rqs[i]);
        // 初始化token消耗完了時掛起的IO被喚醒的函數
		init_waitqueue_func_entry(&khd->domain_wait[i],
					  kyber_domain_wake);
		khd->domain_wait[i].private = hctx;
        // 掛起的IO就掛在這里
		INIT_LIST_HEAD(&khd->domain_wait[i].entry);
		atomic_set(&khd->wait_index[i], 0);
	}

	khd->cur_domain = 0;
	khd->batching = 0;

	hctx->sched_data = khd;
	sbitmap_queue_min_shallow_depth(&hctx->sched_tags->bitmap_tags,
					kqd->async_depth);

	return 0;

err_kcqs:
	kfree(khd->kcqs);
err_khd:
	kfree(khd);
	return -ENOMEM;
}

bio合並入kyber

當一個bio來到塊層時,首先看看這個bio是否能夠合並到當前已有的request里面。

static bool kyber_bio_merge(struct blk_mq_hw_ctx *hctx, struct bio *bio)
{
	struct kyber_hctx_data *khd = hctx->sched_data;
	struct blk_mq_ctx *ctx = blk_mq_get_ctx(hctx->queue);
    // 根據軟隊列在硬隊列里的下標找到應該合並哪個暫存隊列
	struct kyber_ctx_queue *kcq = &khd->kcqs[ctx->index_hw];
    // 根據op flag找到是read、write、discard還是other
	unsigned int sched_domain = kyber_sched_domain(bio->bi_opf);
    // 是什么類型的IO就嘗試合並到什么類型的暫存隊列
	struct list_head *rq_list = &kcq->rq_list[sched_domain];
	bool merged;

	spin_lock(&kcq->lock);
    // 調用block層通用函數去合並bio到某個request
    // blk_mq_bio_list_merge會從后往前遍歷隊列,檢查8次能否合並
	merged = blk_mq_bio_list_merge(hctx->queue, rq_list, bio);
	spin_unlock(&kcq->lock);
	blk_mq_put_ctx(ctx);

	return merged;
}

request插入到kyber

當發現bio並不能合並到已有的request時,根據這個bio生成一個新的request,並且將這個request插入到kyber當前的隊列里面。

生成時調用prepare函數進行request的調度器相關的初始化。

static void kyber_prepare_request(struct request *rq, struct bio *bio)
{
    // 設置token為-1,表示還未分配token
	rq_set_domain_token(rq, -1);
}

插入到相應的隊列上:

static void kyber_insert_requests(struct blk_mq_hw_ctx *hctx,
				  struct list_head *rq_list, bool at_head)
{
	struct kyber_hctx_data *khd = hctx->sched_data;
	struct request *rq, *next;

	list_for_each_entry_safe(rq, next, rq_list, queuelist) {
        // 與bio合並時同理找到sched_domain和kyber_ctx_queue
		unsigned int sched_domain = kyber_sched_domain(rq->cmd_flags);
		struct kyber_ctx_queue *kcq = &khd->kcqs[rq->mq_ctx->index_hw];
		struct list_head *head = &kcq->rq_list[sched_domain];

		spin_lock(&kcq->lock);
        // 將request插入到隊列上
		if (at_head)
			list_move(&rq->queuelist, head);
		else
			list_move_tail(&rq->queuelist, head);
        // 設置bit表示暫存隊列上有request
		sbitmap_set_bit(&khd->kcq_map[sched_domain],
				rq->mq_ctx->index_hw);
		blk_mq_sched_request_inserted(rq);
		spin_unlock(&kcq->lock);
	}
}

kyber分發request

kyber注冊的分發hook為kyber_dispatch_request()函數。kyber采用round robin的方式遍歷分發隊列的read、write、discard、other隊列,選擇一個IO分發到硬隊列,當分發隊列上沒有IO時會遍歷與這個分發隊列相關聯的所有暫存隊列,將暫存隊列上的所有IO都轉到分發隊列上,然后再看有沒有IO可以分發的。

static struct request *kyber_dispatch_request(struct blk_mq_hw_ctx *hctx)
{
	struct kyber_queue_data *kqd = hctx->queue->elevator->elevator_data;
	struct kyber_hctx_data *khd = hctx->sched_data;
	struct request *rq;
	int i;

	spin_lock(&khd->lock);

	/*
	 * First, if we are still entitled to batch, try to dispatch a request
	 * from the batch.
	 */
    // 如果當前隊列派發的IO個數還沒有達到最大值則繼續派發當前隊列的IO
	if (khd->batching < kyber_batch_size[khd->cur_domain]) {
		rq = kyber_dispatch_cur_domain(kqd, khd, hctx);
		if (rq)
			goto out;
	}

	/*
	 * Either,
	 * 1. We were no longer entitled to a batch.
	 * 2. The domain we were batching didn't have any requests.
	 * 3. The domain we were batching was out of tokens.
	 *
	 * Start another batch. Note that this wraps back around to the original
	 * domain if no other domains have requests or tokens.
	 */
    // 否則將batching置為0,選擇下一個派發的隊列,如果當前已經是other隊列了,
    // 則跳到第一個的read隊列
	khd->batching = 0;
	for (i = 0; i < KYBER_NUM_DOMAINS; i++) {
		if (khd->cur_domain == KYBER_NUM_DOMAINS - 1)
			khd->cur_domain = 0;
		else
			khd->cur_domain++;

		rq = kyber_dispatch_cur_domain(kqd, khd, hctx);
		if (rq)
			goto out;
	}

	rq = NULL;
out:
	spin_unlock(&khd->lock);
	return rq;
}

kyber_dispatch_cur_domain()函數

static struct request *
kyber_dispatch_cur_domain(struct kyber_queue_data *kqd,
			  struct kyber_hctx_data *khd,
			  struct blk_mq_hw_ctx *hctx)
{
	struct list_head *rqs;
	struct request *rq;
	int nr;

    // 獲取當前的分發隊列
	rqs = &khd->rqs[khd->cur_domain];

	/*
	 * If we already have a flushed request, then we just need to get a
	 * token for it. Otherwise, if there are pending requests in the kcqs,
	 * flush the kcqs, but only if we can get a token. If not, we should
	 * leave the requests in the kcqs so that they can be merged. Note that
	 * khd->lock serializes the flushes, so if we observed any bit set in
	 * the kcq_map, we will always get a request.
	 */
    // 選擇隊列里面第一個IO
	rq = list_first_entry_or_null(rqs, struct request, queuelist);
	if (rq) {
        // 獲取token
		nr = kyber_get_domain_token(kqd, khd, hctx);
		if (nr >= 0) {
            // 獲取到了token
			khd->batching++;
            // 將token保存在request的priv字段里面
			rq_set_domain_token(rq, nr);
            // 從分發隊列上摘鏈
			list_del_init(&rq->queuelist);
			return rq;
		} else {
			trace_kyber_throttled(kqd->q,
					      kyber_domain_names[khd->cur_domain]);
		}
       // kcq_map的bit位被設置表示當前分發的IO類型在暫存隊列是有IO的
       // kcq_map在insert request的時候置的
	} else if (sbitmap_any_bit_set(&khd->kcq_map[khd->cur_domain])) {
       
		nr = kyber_get_domain_token(kqd, khd, hctx);
		if (nr >= 0) {
            // 暫存隊列有IO,並且當前IO類型的token還沒有被消耗完
            // 將暫存隊列的IO轉到分發隊列上
			kyber_flush_busy_kcqs(khd, khd->cur_domain, rqs);
            // IO轉到分發隊列后肯定能獲取到IO進行分發
			rq = list_first_entry(rqs, struct request, queuelist);
			khd->batching++;
			rq_set_domain_token(rq, nr);
			list_del_init(&rq->queuelist);
			return rq;
		} else {
			trace_kyber_throttled(kqd->q,
					      kyber_domain_names[khd->cur_domain]);
		}
	}

	/* There were either no pending requests or no tokens. */
	return NULL;
}

kyber_get_domain_token()函數

static int kyber_get_domain_token(struct kyber_queue_data *kqd,
				  struct kyber_hctx_data *khd,
				  struct blk_mq_hw_ctx *hctx)
{
	unsigned int sched_domain = khd->cur_domain;
    // 根據當前分發的IO類型找到申請token的sbitmap_queue
	struct sbitmap_queue *domain_tokens = &kqd->domain_tokens[sched_domain];
    // 用於將當前分發隊列掛到等待隊列的結構
	wait_queue_entry_t *wait = &khd->domain_wait[sched_domain];
	struct sbq_wait_state *ws;
	int nr;
	// 從sbitmap_queue獲取一個沒有在用的token
	nr = __sbitmap_queue_get(domain_tokens);

	/*
	 * If we failed to get a domain token, make sure the hardware queue is
	 * run when one becomes available. Note that this is serialized on
	 * khd->lock, but we still need to be careful about the waker.
	 */
    // nr < 0表示沒有獲取到token,並且當前分發隊列沒有被掛起
	if (nr < 0 && list_empty_careful(&wait->entry)) {
		ws = sbq_wait_ptr(domain_tokens,
				  &khd->wait_index[sched_domain]);
		khd->domain_ws[sched_domain] = ws;
        // 將當前的分發隊列掛到相應的等待隊列
		add_wait_queue(&ws->wait, wait);

		/*
		 * Try again in case a token was freed before we got on the wait
		 * queue.
		 */
        // 可能在掛起的時候有IO回來釋放了token,在掛起之后再嘗試一次看能否獲取到token
        // 如果能獲取到則從等待隊列上取下
		nr = __sbitmap_queue_get(domain_tokens);
	}

	/*
	 * If we got a token while we were on the wait queue, remove ourselves
	 * from the wait queue to ensure that all wake ups make forward
	 * progress. It's possible that the waker already deleted the entry
	 * between the !list_empty_careful() check and us grabbing the lock, but
	 * list_del_init() is okay with that.
	 */
    // 如果獲取到了token,並且分發隊列是被掛起的
	if (nr >= 0 && !list_empty_careful(&wait->entry)) {
		ws = khd->domain_ws[sched_domain];
		spin_lock_irq(&ws->wait.lock);
        // 將分發隊列從等待隊列上摘下
		list_del_init(&wait->entry);
		spin_unlock_irq(&ws->wait.lock);
	}

	return nr;
}

kyber_flush_busy_kcqs()函數

static void kyber_flush_busy_kcqs(struct kyber_hctx_data *khd,
				  unsigned int sched_domain,
				  struct list_head *list)
{
	struct flush_kcq_data data = {
		.khd = khd,
		.sched_domain = sched_domain,
		.list = list,
	};
	// 這個函數會遍歷當前IO類型的kcq_map的每一個bit,
    // 然后執行flush_busy_kcq函數將暫存隊列的IO都摘到分發隊列上
    // 我們知道一個bit代表一個跟硬隊列對應的CPU核
	sbitmap_for_each_set(&khd->kcq_map[sched_domain],
			     flush_busy_kcq, &data);
}
// bitnr就代表是跟硬隊列對應的第幾個cpu核
static bool flush_busy_kcq(struct sbitmap *sb, unsigned int bitnr, void *data)
{
	struct flush_kcq_data *flush_data = data;
    // 獲取到這個核的暫存隊列
	struct kyber_ctx_queue *kcq = &flush_data->khd->kcqs[bitnr];

	spin_lock(&kcq->lock);
    // 將暫存隊列的sched_domain IO種類的隊列上的IO都摘到分發隊列去,
    // sched_domain是kyber_flush_busy_kcqs函數傳入的
	list_splice_tail_init(&kcq->rq_list[flush_data->sched_domain],
			      flush_data->list);
    // 清理到bit位,表示當前這個核的這個IO類型的暫存隊列已經沒有IO了
	sbitmap_clear_bit(sb, bitnr);
	spin_unlock(&kcq->lock);

	return true;
}

IO結束的時候釋放token,同時這也是kyber的requeue操作的hook,重新進入隊列要釋放掉已經拿到的token。

static void kyber_finish_request(struct request *rq)
{
	struct kyber_queue_data *kqd = rq->q->elevator->elevator_data;
	
    // 釋放掉request的priv字段記錄的token
	rq_clear_domain_token(kqd, rq);
}
static void rq_clear_domain_token(struct kyber_queue_data *kqd,
				  struct request *rq)
{
	unsigned int sched_domain;
	int nr;

	nr = rq_get_domain_token(rq); // token保存在request的priv字段
	if (nr != -1) {
        // 獲取IO類型
		sched_domain = kyber_sched_domain(rq->cmd_flags);
        // 釋放token
		sbitmap_queue_clear(&kqd->domain_tokens[sched_domain], nr,
				    rq->mq_ctx->cpu);
	}
}
void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,
			 unsigned int cpu)
{
	sbitmap_clear_bit_unlock(&sbq->sb, nr); // 清除bit位
	/*
	 * Pairs with the memory barrier in set_current_state() to ensure the
	 * proper ordering of clear_bit_unlock()/waitqueue_active() in the waker
	 * and test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
	 * waiter. See the comment on waitqueue_active().
	 */
	smp_mb__after_atomic();
	sbitmap_queue_wake_up(sbq); // 喚醒分發隊列

	if (likely(!sbq->round_robin && nr < sbq->sb.depth))
		*per_cpu_ptr(sbq->alloc_hint, cpu) = nr;
}

喚醒動作會調用在初始化流程注冊的kyber_domain_wake()函數。

static int kyber_domain_wake(wait_queue_entry_t *wait, unsigned mode, int flags,
			     void *key)
{
	struct blk_mq_hw_ctx *hctx = READ_ONCE(wait->private);

	list_del_init(&wait->entry); // 移出等待隊列
	blk_mq_run_hw_queue(hctx, true); // 執行run_hw_queue函數,去分發IO
	return 1;
}

request完成

IO從設備驅動回到block層的時候會調用kyber調度器注冊的hook--completed_request即kyber_completed_request()函數,在完成函數里面統計IO的時延以調整token數量。

kyber會統計兩種時延:

KYBER_TOTAL_LATENCY:表示IO在kernel里的時延,即總的時延。

KYBER_IO_LATENCY:表示IO在設備上的執行時延。

kyber_cpu_latency:

/* buckets是一個atomic_t的三維數組,第一維表示要統計的IO類型,第二維表示是KYBER_TOTAL_LATENCY還是KYBER_IO_LATENCY,第三維表示的是8個IO時延相對於參考值的倍數的桶,先將時延統計到per_cpu的kyber_cpu_latency中,然后通過timer將所有cpu的統計加起來,通過一定的算法調整token數。 */
/*
	KYBER_LATENCY_BUCKETS: 值為8,表示個桶,前KYBER_GOOD_BUCKETS(4)個桶表示“GOOD”的時延,后面4個表示“BAD”的時延。
	GOOD時延表示IO時延小於等於時延參考值。
	BAD時延表示IO時延大於時延參考值。
*/
struct kyber_cpu_latency {
	atomic_t buckets[KYBER_OTHER][2][KYBER_LATENCY_BUCKETS];
};
static void kyber_completed_request(struct request *rq, u64 now)
{
	struct kyber_queue_data *kqd = rq->q->elevator->elevator_data;
	struct kyber_cpu_latency *cpu_latency;
	unsigned int sched_domain;
	u64 target;
	// 獲取完成的IO的類型
	sched_domain = kyber_sched_domain(rq->cmd_flags);
    // other類型的IO不會統計時延,自然也不會去調整其token數量
	if (sched_domain == KYBER_OTHER)
		return;
	// cpu_latency延遲統計是per_cpu的,這里獲取當前CPU的指針,將IO的時延統計到當前CPU
	cpu_latency = get_cpu_ptr(kqd->cpu_latency);
    // 當前IO的類型的延遲參考值
	target = kqd->latency_targets[sched_domain];
    // 統計總的IO時延,與下面對比,now減去的起始時間不一樣的
	add_latency_sample(cpu_latency, sched_domain, KYBER_TOTAL_LATENCY,
			   target, now - rq->start_time_ns);
    // 統計IO在設備上的時延
	add_latency_sample(cpu_latency, sched_domain, KYBER_IO_LATENCY, target,
			   now - rq->io_start_time_ns);
	put_cpu_ptr(kqd->cpu_latency);
	
    // 減小timer的到期時間
	timer_reduce(&kqd->timer, jiffies + HZ / 10);
}

add_latency_sample()統計函數,kyber會統計當前IO的時延相對於參考時延的倍數(0~7倍),然后將這個倍數的計數記錄在per_cpu的kyber_cpu_latency中。

static void add_latency_sample(struct kyber_cpu_latency *cpu_latency,
			       unsigned int sched_domain, unsigned int type,
			       u64 target, u64 latency)
{
	unsigned int bucket;
	u64 divisor;

	if (latency > 0) {
        // 這里將參考值除以4,計算所得的bucket表示時延是
        // <= 1/4 * 時延參考值
        // <= 1/2 * 時延參考值
        // <= 3/4 * 時延參考值
        // <= 時延參考值
        // <= (1 + 1/4) * 時延參考值
        // <= (1 + 1/2) * 時延參考值
        // <= (1 + 3/4) * 時延參考值
        // > 3/4 * 時延參考值
		divisor = max_t(u64, target >> KYBER_LATENCY_SHIFT, 1);
        // 這里將latency減去1,除法所得的值可以准確的落入buckets的下標里面,因為數組下標是從0開始的。 
        // 如果時延過大,除法所得的值超過了7,則約束其到7,這里取一個最小值
        // 大家可以假設一些值來計算一下。
		bucket = min_t(unsigned int, div64_u64(latency - 1, divisor),
			       KYBER_LATENCY_BUCKETS - 1);
	} else {
		bucket = 0;
	}
	// 增加相應的桶的計數
	atomic_inc(&cpu_latency->buckets[sched_domain][type][bucket]);
}

根據時延統計調整token數

在初始化流程的kyber_queue_data_alloc()函數初始化了一個timer,timer每隔一段時間會執行來動態調整token數量,執行的函數為kyber_timer_fn()。

static void kyber_timer_fn(struct timer_list *t)
{
	struct kyber_queue_data *kqd = from_timer(kqd, t, timer);
	unsigned int sched_domain;
	int cpu;
	bool bad = false;

	/* Sum all of the per-cpu latency histograms. */
    // 將per_cpu的統計信息合並到kyber_queue_data的latency_buckets里面
	for_each_online_cpu(cpu) {
		struct kyber_cpu_latency *cpu_latency;

		cpu_latency = per_cpu_ptr(kqd->cpu_latency, cpu);
		for (sched_domain = 0; sched_domain < KYBER_OTHER; sched_domain++) {
			flush_latency_buckets(kqd, cpu_latency, sched_domain,
					      KYBER_TOTAL_LATENCY);
			flush_latency_buckets(kqd, cpu_latency, sched_domain,
					      KYBER_IO_LATENCY);
		}
	}

	/*
	 * Check if any domains have a high I/O latency, which might indicate
	 * congestion in the device. Note that we use the p90; we don't want to
	 * be too sensitive to outliers here.
	 */
    // 看IO在設備上的時延是否是BAD的
	for (sched_domain = 0; sched_domain < KYBER_OTHER; sched_domain++) {
		int p90;

		p90 = calculate_percentile(kqd, sched_domain, KYBER_IO_LATENCY,
					   90);
		if (p90 >= KYBER_GOOD_BUCKETS)
			bad = true;
	}

	/*
	 * Adjust the scheduling domain depths. If we determined that there was
	 * congestion, we throttle all domains with good latencies. Either way,
	 * we ease up on throttling domains with bad latencies.
	 */
	for (sched_domain = 0; sched_domain < KYBER_OTHER; sched_domain++) {
		unsigned int orig_depth, depth;
		int p99;

		p99 = calculate_percentile(kqd, sched_domain,
					   KYBER_TOTAL_LATENCY, 99);
		/*
		 * This is kind of subtle: different domains will not
		 * necessarily have enough samples to calculate the latency
		 * percentiles during the same window, so we have to remember
		 * the p99 for the next time we observe congestion; once we do,
		 * we don't want to throttle again until we get more data, so we
		 * reset it to -1.
		 */
		if (bad) {
            // 當前的樣本數不夠或者timer的時間不夠1s,則返回的p99 < 0
            // 采用上一次timer計算所得的p99
			if (p99 < 0)
				p99 = kqd->domain_p99[sched_domain];
			kqd->domain_p99[sched_domain] = -1;
		} else if (p99 >= 0) {
            // 記錄這一次計算的p99,下一次timer可能用到
			kqd->domain_p99[sched_domain] = p99;
		}
		if (p99 < 0)
			continue;

		/*
		 * If this domain has bad latency, throttle less. Otherwise,
		 * throttle more iff we determined that there is congestion.
		 *
		 * The new depth is scaled linearly with the p99 latency vs the
		 * latency target. E.g., if the p99 is 3/4 of the target, then
		 * we throttle down to 3/4 of the current depth, and if the p99
		 * is 2x the target, then we double the depth.
		 */
         // 根據總的IO時延統計動態調整token數量的算法比較的“啟發式”,也就是根據測試結果和經驗來判斷應該怎樣去調整token的數量
		if (bad || p99 >= KYBER_GOOD_BUCKETS) {
			orig_depth = kqd->domain_tokens[sched_domain].sb.depth;
			depth = (orig_depth * (p99 + 1)) >> KYBER_LATENCY_SHIFT;
			kyber_resize_domain(kqd, sched_domain, depth);
		}
	}
}
// 根據統計的時延計算時延的“好壞”
static int calculate_percentile(struct kyber_queue_data *kqd,
				unsigned int sched_domain, unsigned int type,
				unsigned int percentile)
{
	unsigned int *buckets = kqd->latency_buckets[sched_domain][type];
	unsigned int bucket, samples = 0, percentile_samples;

	for (bucket = 0; bucket < KYBER_LATENCY_BUCKETS; bucket++)
		samples += buckets[bucket];
	// 沒有樣本,表明這段時間沒有IO
	if (!samples)
		return -1;

	/*
	 * We do the calculation once we have 500 samples or one second passes
	 * since the first sample was recorded, whichever comes first.
	 */
    // 記錄超時時間為1s
	if (!kqd->latency_timeout[sched_domain])
		kqd->latency_timeout[sched_domain] = max(jiffies + HZ, 1UL);

    if (samples < 500 &&
	    time_is_after_jiffies(kqd->latency_timeout[sched_domain])) {
		return -1;
	}
	kqd->latency_timeout[sched_domain] = 0;
	// 找到最能代表延遲情況的那個bucket,
    // 至於這里的算法以及為什么要先計算一個百分比就不清楚了,代碼能看懂但卻不知為什么。
	percentile_samples = DIV_ROUND_UP(samples * percentile, 100);
	for (bucket = 0; bucket < KYBER_LATENCY_BUCKETS - 1; bucket++) {
		if (buckets[bucket] >= percentile_samples)
			break;
		percentile_samples -= buckets[bucket];
	}
	memset(buckets, 0, sizeof(kqd->latency_buckets[sched_domain][type]));

	trace_kyber_latency(kqd->q, kyber_domain_names[sched_domain],
			    kyber_latency_type_names[type], percentile,
			    bucket + 1, 1 << KYBER_LATENCY_SHIFT, samples);

	return bucket;
}
static void kyber_resize_domain(struct kyber_queue_data *kqd,
				unsigned int sched_domain, unsigned int depth)
{
    // 新的token數量不會大於最初的初始化值,也不會為0
	depth = clamp(depth, 1U, kyber_depth[sched_domain]);
	if (depth != kqd->domain_tokens[sched_domain].sb.depth) {
		sbitmap_queue_resize(&kqd->domain_tokens[sched_domain], depth);
		trace_kyber_adjust(kqd->q, kyber_domain_names[sched_domain],
				   depth);
	}
}

kyber退出

kyber退出的時候有兩個hook函數需要執行,kyber_exit_sched()和kyber_exit_hctx()。

static void kyber_exit_hctx(struct blk_mq_hw_ctx *hctx, unsigned int hctx_idx)
{
	struct kyber_hctx_data *khd = hctx->sched_data;
	int i;

	for (i = 0; i < KYBER_NUM_DOMAINS; i++)
		sbitmap_free(&khd->kcq_map[i]);
	kfree(khd->kcqs);
	kfree(hctx->sched_data);
}
static void kyber_exit_sched(struct elevator_queue *e)
{
	struct kyber_queue_data *kqd = e->elevator_data;
	int i;

	del_timer_sync(&kqd->timer);

	for (i = 0; i < KYBER_NUM_DOMAINS; i++)
		sbitmap_queue_free(&kqd->domain_tokens[i]);
	free_percpu(kqd->cpu_latency);
	kfree(kqd);
}

釋放掉timer和所有初始化時申請的內存。

總結

kyber調度器是一個適合於高速存儲介質(如NVMe)的IO調度器,文章對其做了機制介紹和源碼分析,關於根據時延統計動態調整token數目的部分還未弄懂開發者的用意,算是這篇文章的一個遺憾,如果有知道的大佬還請不吝賜教。

另外mq-deadline調度器和bfq調度器的分析文章也在撰寫中,bfq比kyber復雜的很多(僅從代碼量上看就能大概推斷),因此可能需要一些時間理解代碼,mq-deadline調度器則會盡快呈現。

參考資料

https://www.dazhuanlan.com/olunx/topics/1559753


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM