linux rcu


RCU(Read-Copy Update)是一種同步機制,通過保存對象的多個副本來保障讀操作的連續性,並保證在預定的讀方臨界區沒有完成之前不會釋放這個對象。傳統的同步機制如spin lock,semaphore,rwlock等,並發線程不區分讀寫線程,或者並發線程允許同時讀,但是讀的時候不允許更新。RCU與這些機制最大的區別是允許在更新的同時讀數據。RCU允許同時有一個更新線程和多個讀線程並發;

RCU是如何做到上述的那種效果呢? RCU把更新操作分解為兩個部分removal 和reclaimation;在removal階段,刪除對該數據結構的引用,因為CPU對單個指針的寫入操作是原子的,因此刪除過程可以與新的讀線程並發執行;reclaimation階段必須等待removal階段所有的讀線程結束后才可以回收該數據結構,對於removal階段以后的讀線程看到的是更新后的數據結構,因此只需要考慮在removal階段已經存在的那些讀線程;

 

RCU實現過程主要解決以下2個問題:

1. 在一個讀線程遍歷鏈表的過程中,另外一個更新線程對鏈表進行插入操作,RCU需要保證讀線程要么能看見新的節點或者看不見新的節點;

2. 讀線程讀取了某個鏈表節點,更新線程可以從鏈表中刪除這個節點,但是不能直接回收這個節點,必須等到所有的讀線程完成后才進行回收操作;

 

經典RCU由三個基本機制組成:Publish-­Subscribe Mechanism,Waiting for All Pre-­existing RCU Readers to Complete,Maintain Multiple Version of Recently Updated Objects;

 

Publish-­Subscribe Mechanism

訂閱發布機制就是能夠並發插入鏈表的能力,允許即使鏈表正被修改,讀線程也可以安全的遍歷鏈表;考慮以下例子:

 

struct foo {
    int a;
    int b;
    int c;
};

struct foo *gp = NULL;

p = kmalloc(sizeof(*p), GFP_KERNEL);

p->a = 1;
p->b = 2;
p->c = 3;
gp = p;

對於不同的編譯器,有可能不能保證最后4條語句的順序執行。

RCU提供了rcu_assign_pointer用於發布新的數據結構;上面的代碼就可以修改為

p->a = 1;
p->b = 2;
p->c = 3;
rcu_assign_pointer(gp, p);

rcu_assign_pointer封裝了內存屏障,用於保證操作的順序;

 

讀線程考慮以下代碼:

p = gp;
if (p != NULL) {
    do_something_with(p-­>a, p-­>b, p-­>c);
}

看上去好像不會有執行順序問題,但是某些架構的CPU及其編譯器可能會在取p的值之前直接取p的成員。編譯器會先猜測p的值,然后取p的成員內容,然后才去取p的真實值來檢查之前的猜測是否正確;

RCU提供了rcu_dereference用於訂閱其他線程發布的值;

 

/**
 * rcu_dereference - fetch an RCU-protected pointer in an
 * RCU read-side critical section.  This pointer may later
 * be safely dereferenced.
 *
 * Inserts memory barriers on architectures that require them
 * (currently only the Alpha), and, more importantly, documents
 * exactly which pointers are protected by RCU.
 */

#define rcu_dereference(p)     ({ \
                typeof(p) _________p1 = p; \
                smp_read_barrier_depends(); \
                (_________p1); \
                })

讀線程的代碼就可以修改為

rcu_read_lock();
p = rcu_dereference(gp);
if (p != NULL) {
    do_something_with(p->a, p->b, p->c);
}
rcu_read_unlock();

 

Waiting for All Pre-­existing RCU Readers to Complete:RCU把所有已存在的讀線程完成的這段時間稱為grace period,如下圖所示:

image

從圖上可以看出grace period從removal階段發布新的指針開始一直到所有的已存在讀者完成對舊版本的節點的引用,直到reclaimation開始;圖中可以看出有4個讀線程引用了舊版本的數據,因此reclamation階段必須等到這4個讀線程完成后才可以開始;另外grace period開始后的讀線程看到的是更新后的節點,因此grace period可以忽略這些讀線程;

 

Linux中使用了一個小技巧來判斷讀線程是否已經完成對舊版本數據結構的引用,因為經典RCU中不允許阻塞或睡眠,因此可以通過該CPU是否完成了一次上下文切換來判斷讀線程是否已經完成對舊版本數據結構的引用。也就是說如果CPU完成了至少一次的上下文切換,讀線程已經安全地從臨界區退出了,因此可以安全地釋放舊版本的數據。CPU完成一次上下文切換也稱為經歷了一個quiescent state。

 

Maintain Multiple Version of Recently Updated Objects:對於RCU保護的數據,同時對數據結構進行讀和更新時,RCU的此項能力保證讀線程可以看到不同版本的數據結構,而不是部分更新的數據;

 

以下分析Linux kernel中RCU的實現。

 

1. 初始化:

static struct notifier_block __cpuinitdata rcu_nb = {
    .notifier_call    = rcu_cpu_notify,
};

 

/*
 * Initializes rcu mechanism.  Assumed to be called early.
 * That is before local timer(SMP) or jiffie timer (uniproc) is setup.
 * Note that rcu_qsctr and friends are implicitly
 * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
 */
void __init rcu_init(void)
{
    rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
            (void *)(long)smp_processor_id());
    /* Register notifier for non-boot CPUs */
    register_cpu_notifier(&rcu_nb);
}

為了支持熱拔插的CPU,注冊了一個CPU事件的回調,對於已經啟動的CPU直接調用rcu_cpu_notify的CPU_UP_PREPARE事件

 

static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
                unsigned long action, void *hcpu)
{
    long cpu = (long)hcpu;
    switch (action) {
    case CPU_UP_PREPARE:
        rcu_online_cpu(cpu);
        break;
    case CPU_DEAD:
        rcu_offline_cpu(cpu);
        break;
    default:
        break;
    }
    return NOTIFY_OK;
}

rcu_online_cpu中對每個CPU的rcu_data進行了初始化

static void __devinit rcu_online_cpu(int cpu)
{
    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
    struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);

    rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
    rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
    tasklet_init(&per_cpu(rcu_tasklet, cpu), rcu_process_callbacks, 0UL);
}

 

這里順便插入per_cpu的實現分析:

unsigned long __per_cpu_offset[NR_CPUS] __read_mostly;

EXPORT_SYMBOL(__per_cpu_offset);

static void __init setup_per_cpu_areas(void)
{
    unsigned long size, i;
    char *ptr;
    unsigned long nr_possible_cpus = num_possible_cpus();

    /* Copy section for each CPU (we discard the original) */
    size = ALIGN(__per_cpu_end - __per_cpu_start, SMP_CACHE_BYTES);
#ifdef CONFIG_MODULES
    if (size < PERCPU_ENOUGH_ROOM)
        size = PERCPU_ENOUGH_ROOM;
#endif
    ptr = alloc_bootmem(size * nr_possible_cpus);

    for_each_possible_cpu(i) {
        __per_cpu_offset[i] = ptr - __per_cpu_start;
        memcpy(ptr, __per_cpu_start, __per_cpu_end - __per_cpu_start);
        ptr += size;
    }
}

上述代碼在kernel初始化過程中調用,首先分配一段內存,然后把.data..percpu段中的數據為每個CPU都拷貝一份數據,並把每個CPU引用自己的那一段副本的地址偏移記錄下來;

 

19953775_1

因此后面每個CPU就可以通過該偏移地址來找到對應自己的那份副本

#define RELOC_HIDE(ptr, off)                    \
  ({ unsigned long __ptr;                    \
     __ptr = (unsigned long) (ptr);                \
    (typeof(ptr)) (__ptr + (off)); })

#define per_cpu(var, cpu) (*({                \
    extern int simple_identifier_##var(void);    \
    RELOC_HIDE(&per_cpu__##var, __per_cpu_offset[cpu]); }))

 

回到rcu_init_percpu_data中,

static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
                        struct rcu_data *rdp)
{
    memset(rdp, 0, sizeof(*rdp));
    rdp->curtail = &rdp->curlist;
    rdp->nxttail = &rdp->nxtlist;
    rdp->donetail = &rdp->donelist;
    /* 相等表示當前CPU無需等待quiescent state */
    rdp->quiescbatch = rcp->completed;
    rdp->qs_pending = 0;
    rdp->cpu = cpu;
    rdp->blimit = blimit;
}

其中第二個參數是全局rcu控制塊,結構如下:

/* Global control variables for rcupdate callback mechanism. */
struct rcu_ctrlblk {
    long    cur;        /* Current batch number.                      */
    long    completed;    /* Number of the last completed batch         */
    int    next_pending;    /* Is the next batch already waiting?         */

    int    signaled;

    spinlock_t    lock    ____cacheline_internodealigned_in_smp;
    cpumask_t    cpumask; /* CPUs that need to switch in order    */
                             /* for current batch to proceed.        */
} ____cacheline_internodealigned_in_smp;

rcu_data結構如下:

/*
 * Per-CPU data for Read-Copy UPdate.
 * nxtlist - new callbacks are added here
 * curlist - current batch for which quiescent cycle started if any
 */
struct rcu_data {
    /* 1) quiescent state handling : */
    long        quiescbatch;     /* Batch # for grace period, 正在等待的grace period號 */
    int        passed_quiesc;     /* User-mode/idle loop etc. 是否已經通過至少一次quiescent state */
    int        qs_pending;     /* core waits for quiesc state */

    /* 2) batch handling */
    long                 batch;     /* Batch # for current RCU batch,當前CPU正在進行的grace period號 */
    struct rcu_head *nxtlist;
    struct rcu_head **nxttail;
    long            qlen;           /* # of queued callbacks */
    struct rcu_head *curlist;
    struct rcu_head **curtail;
    struct rcu_head *donelist;
    struct rcu_head **donetail;
    long        blimit;         /* Upper limit on a processed batch */
    int cpu;
    struct rcu_head barrier;
};

DECLARE_PER_CPU(struct rcu_data, rcu_data);
DECLARE_PER_CPU(struct rcu_data, rcu_bh_data);

 

一次線程調度表明該CPU已經經歷了一次quiescent state, 在進程調度schedule中會調用rcu_qsctr_inc把rdp->passed_quiesc置為1。

/*
 * Increment the quiescent state counter.
 * The counter is a bit degenerated: We do not need to know
 * how many quiescent states passed, just if there was at least
 * one since the start of the grace period. Thus just a flag.
 */
static inline void rcu_qsctr_inc(int cpu)
{
    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
    rdp->passed_quiesc = 1;
}

另外在每次時鍾中斷都會檢查是否有RCU相關工作需要處理

/*
 * Called from the timer interrupt handler to charge one tick to the current 
 * process.  user_tick is 1 if the tick is user time, 0 for system.
 */
void update_process_times(int user_tick)
{
    struct task_struct *p = current;
    int cpu = smp_processor_id();

    /* Note: this timer irq context must be accounted for as well. */
    if (user_tick)
        account_user_time(p, jiffies_to_cputime(1));
    else
        account_system_time(p, HARDIRQ_OFFSET, jiffies_to_cputime(1));
    run_local_timers();
    if (rcu_pending(cpu))
        rcu_check_callbacks(cpu, user_tick);
    scheduler_tick();
     run_posix_cpu_timers(p);
}
static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
{
    /* This cpu has pending rcu entries and the grace period
     * for them has completed.
     */
    /* 已經完成的grace period號大於等於當前CPU等待的grace period號 */
    if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
        return 1;

    /* This cpu has no pending entries, but there are new entries */
    /* 上一個等待已完成,有新的call_rcu調用 */
    if (!rdp->curlist && rdp->nxtlist)
        return 1;

    /* This cpu has finished callbacks to invoke */
    /* reclaimation階段,等待已完成,調用其回調函數 */
    if (rdp->donelist)
        return 1;

    /* The rcu core waits for a quiescent state from the cpu */
    /* 當前CPU已經進入grace period在等待quiescent state */
    if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
        return 1;

    /* nothing to do */
    return 0;
}

/*
 * Check to see if there is any immediate RCU-related work to be done
 * by the current CPU, returning 1 if so.  This function is part of the
 * RCU implementation; it is -not- an exported member of the RCU API.
 */
int rcu_pending(int cpu)
{
    return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
        __rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
}

如果rcu_pending返回1,則進入rcu_check_callbacks,檢查當前CPU是否已經通過一次quiescent state,並調用rcu_process_callbacks進行處理

void rcu_check_callbacks(int cpu, int user)
{
    /* 處於用戶線程上下文中或者CPU空閑且不處於軟中斷上下文中且已經處理過一次硬件中斷 */
    if (user || 
        (idle_cpu(cpu) && !in_softirq() && 
                hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
        rcu_qsctr_inc(cpu);
        rcu_bh_qsctr_inc(cpu);
    } else if (!in_softirq())
        rcu_bh_qsctr_inc(cpu);
    tasklet_schedule(&per_cpu(rcu_tasklet, cpu));
}
/*
 * This does the RCU processing work from tasklet context. 
 */
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
                    struct rcu_data *rdp)
{
    /* 當前正在等待quiescent state鏈表不為空且當前等待的grace period已結束 */
    if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
        /* 把當前等待的rcu置為完成狀態表示可以進行reclaimation階段 */
        *rdp->donetail = rdp->curlist;
        rdp->donetail = rdp->curtail;
        rdp->curlist = NULL;
        rdp->curtail = &rdp->curlist;
    }

    /* 上一次等待quiescent state已完成且有新的rcu調用 */
    if (rdp->nxtlist && !rdp->curlist) {
        /* 把這些新的rcu調用置為正在等待狀態然后開始新的grace period等待 */
        local_irq_disable();
        rdp->curlist = rdp->nxtlist;
        rdp->curtail = rdp->nxttail;
        rdp->nxtlist = NULL;
        rdp->nxttail = &rdp->nxtlist;
        local_irq_enable();

        /*
         * start the next batch of callbacks
         */

        /* 當前CPU等待的grace period號為已經在等待的grace period的下一個 */
        /* determine batch number */
        rdp->batch = rcp->cur + 1;
        /* see the comment and corresponding wmb() in
         * the rcu_start_batch()
         */
        smp_rmb();

        /* 若其他CPU已經開始等待,等待當前的grace period完成后再通過序列號來判斷自己是否可以結束grace period */
        if (!rcp->next_pending) {
            /* and start it/schedule start if it's a new batch */
            spin_lock(&rcp->lock);
            /* 防止有多個CPU進來開始新的grace period等待 */
            rcp->next_pending = 1;
            rcu_start_batch(rcp);
            spin_unlock(&rcp->lock);
        }
    }

    rcu_check_quiescent_state(rcp, rdp);
    if (rdp->donelist)
        rcu_do_batch(rdp);
}

 

/*
 * Grace period handling:
 * The grace period handling consists out of two steps:
 * - A new grace period is started.
 *   This is done by rcu_start_batch. The start is not broadcasted to
 *   all cpus, they must pick this up by comparing rcp->cur with
 *   rdp->quiescbatch. All cpus are recorded  in the
 *   rcu_ctrlblk.cpumask bitmap.
 * - All cpus must go through a quiescent state.
 *   Since the start of the grace period is not broadcasted, at least two
 *   calls to rcu_check_quiescent_state are required:
 *   The first call just notices that a new grace period is running. The
 *   following calls check if there was a quiescent state since the beginning
 *   of the grace period. If so, it updates rcu_ctrlblk.cpumask. If
 *   the bitmap is empty, then the grace period is completed.
 *   rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace
 *   period (if necessary).
 */
/*
 * Register a new batch of callbacks, and start it up if there is currently no
 * active batch and the batch to be registered has not already occurred.
 * Caller must hold rcu_ctrlblk.lock.
 */
static void rcu_start_batch(struct rcu_ctrlblk *rcp)
{
    /* 當前沒有正在等待的quiescent state,等待完成后會把rcp->completed = rcp->cur */
    if (rcp->next_pending &&
            rcp->completed == rcp->cur) {
        rcp->next_pending = 0;
        /*
         * next_pending == 0 must be visible in
         * __rcu_process_callbacks() before it can see new value of cur.
         */
        smp_wmb();
        /* 啟動新的等待,當前grace period號+1 */
        rcp->cur++;

        /*
         * Accessing nohz_cpu_mask before incrementing rcp->cur needs a
         * Barrier  Otherwise it can cause tickless idle CPUs to be
         * included in rcp->cpumask, which will extend graceperiods
         * unnecessarily.
         */
        smp_mb();

        /* 把當前存在的CPU置1 */
        cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);

        rcp->signaled = 0;
    }
}

 

/*
 * Check if the cpu has gone through a quiescent state (say context
 * switch). If so and if it already hasn't done so in this RCU
 * quiescent cycle, then indicate that it has done so.
 */
static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
                    struct rcu_data *rdp)
{
    /* 開始了一個新的等待把qs_pending置為1表示處於grace period */
    if (rdp->quiescbatch != rcp->cur) {
        /* start new grace period: */
        rdp->qs_pending = 1;
        rdp->passed_quiesc = 0;
        rdp->quiescbatch = rcp->cur;
        return;
    }

    /* Grace period already completed for this cpu?
     * qs_pending is checked instead of the actual bitmap to avoid
     * cacheline trashing.
     */
    /* 等待未完成,返回 */
    if (!rdp->qs_pending)
        return;

    /* 
     * Was there a quiescent state since the beginning of the grace
     * period? If no, then exit and wait for the next call.
     */
    /* 還沒有經過上下文切換,grace period未完成 */
    if (!rdp->passed_quiesc)
        return;

    /* 已經經過了quiescent state */
    rdp->qs_pending = 0;

    spin_lock(&rcp->lock);
    /*
     * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
     * during cpu startup. Ignore the quiescent state.
     */
    /* 當前完成的quiescent state是當前等待的那個grace period則把rcp中當前CPU位清空 */
    if (likely(rdp->quiescbatch == rcp->cur))
        cpu_quiet(rdp->cpu, rcp);

    spin_unlock(&rcp->lock);
}

最后執行donelist中的reclaimation

/*
 * Invoke the completed RCU callbacks. They are expected to be in
 * a per-cpu list.
 */
static void rcu_do_batch(struct rcu_data *rdp)
{
    struct rcu_head *next, *list;
    int count = 0;

    list = rdp->donelist;
    while (list) {
        next = list->next;
        prefetch(next);
        list->func(list);
        list = next;
        /* 反之一次性調用過多耗時太久 */
        if (++count >= rdp->blimit)
            break;
    }
    rdp->donelist = list;

    local_irq_disable();
    rdp->qlen -= count;
    local_irq_enable();
    if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
        rdp->blimit = blimit;

    if (!rdp->donelist)
        rdp->donetail = &rdp->donelist;
    else
        tasklet_schedule(&per_cpu(rcu_tasklet, rdp->cpu));
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM