Changeset 181a746 in mainline


Timestamp:
2012-07-10T17:50:29Z (12 years ago)
Author:
Adam Hraska <adam.hraska+hos@…>
Branches:
lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
Children:
5b6c033
Parents:
22b5924
Message:

rcu: Added preemptible RCU's core API implementation.

Location:
kernel/generic
Files:
1 added
12 edited

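For orientation, a minimal reader-side sketch of the API this changeset introduces. This is illustrative only; foo_t, foo_list and use() are hypothetical names, not part of the changeset:

    rcu_read_lock();
    foo_t *f = rcu_access(foo_list);  /* consistent snapshot of the shared pointer */
    if (f)
            use(f->grade);            /* f stays valid until rcu_read_unlock() */
    rcu_read_unlock();
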
  • kernel/generic/include/cpu.h

    r22b5924 r181a746  
    3838#include <mm/tlb.h>
    3939#include <synch/spinlock.h>
     40#include <synch/rcu.h>
    4041#include <proc/scheduler.h>
    4142#include <arch/cpu.h>
    4243#include <arch/context.h>
     44#include <adt/list.h>
    4345
    4446/** CPU structure.
     
    99101        list_t smp_pending_calls;
    100102       
     103        /** RCU per-cpu data. Uses own locking. */
     104        rcu_cpu_data_t rcu;
     105       
    101106        /**
    102107         * Stack used by scheduler when there is no running thread.
  • kernel/generic/include/proc/thread.h

    r22b5924 r181a746  
    4141#include <cpu.h>
    4242#include <synch/spinlock.h>
     43#include <synch/rcu.h>
    4344#include <adt/avl.h>
    4445#include <mm/slab.h>
     
    185186        /** Links work queue threads. Protected by workq->lock. */
    186187        link_t workq_link;
    187         /** True if the worker was blocked and is not running.
    188          *
    189          * Protected by thread->lock.
    190          */
     188        /** True if the worker was blocked and is not running. Use thread->lock. */
    191189        bool workq_blocked;
    192         /** True if the worker will block in order to become idle.
    193          *
    194          * Protected by workq->lock.
    195          */
     190        /** True if the worker will block in order to become idle. Use workq->lock. */
    196191        bool workq_idling;
     192       
     193        /** RCU thread related data. Protected by its own locks. */
     194        rcu_thread_data_t rcu;
    197195       
    198196        /** Architecture-specific data. */
  • kernel/generic/include/synch/rcu.h

    r22b5924 r181a746  
    3636#define KERN_RCU_H_
    3737
     38#include <adt/list.h>
     39#include <synch/semaphore.h>
     40#include <compiler/barrier.h>
     41
     42
    3843/* Fwd decl. */
    39 struct rcu_head;
    40 
    41 /** RCU callback type. */
    42 typedef void (*rcu_cb_func_t)(struct rcu_head *head);
    43 
    44 /** */
    45 struct rcu_head {
    46         struct rcu_head *next;
    47         rcu_cb_func_t func;
    48 };
    49 
    50 
    51 
    52 void rcu_read_lock(void);
    53 void rcu_read_unlock(void);
    54 void rcu_synchronize(void);
    55 void rcu_call(struct rcu_head *head, rcu_cb_func_t func);
    56 
    57 
     44struct thread;
     45struct rcu_item;
     46
     47/** Grace period number typedef. */
     48typedef uint64_t rcu_gp_t;
     49
     50/** RCU callback type. The passed rcu_item_t may be freed. */
     51typedef void (*rcu_func_t)(struct rcu_item *rcu_item);
     52
     53typedef struct rcu_item {
     54        rcu_func_t func;
     55        struct rcu_item *next;
     56} rcu_item_t;
     57
     58
     59/** RCU related per-cpu data. */
     60typedef struct rcu_cpu_data {
     61        /** The last grace period during which this cpu recorded a quiescent state. */
     62        rcu_gp_t last_seen_gp;
     63       
     64        /** Pointer to the currently used nesting count (THREAD's or CPU's). */
     65        size_t *pnesting_cnt;
     66        /** Temporary nesting count if THREAD is NULL, eg in scheduler(). */
     67        size_t tmp_nesting_cnt;
     68
     69        /** Callbacks to invoke once the current grace period ends, ie cur_cbs_gp.
     70         * Accessed by the local reclaimer only.
     71         */
     72        rcu_item_t *cur_cbs;
     73        /** Callbacks to invoke once the next grace period ends, ie next_cbs_gp.
     74         * Accessed by the local reclaimer only.
     75         */
     76        rcu_item_t *next_cbs;
     77        /** New callbacks are placed at the end of this list. */
     78        rcu_item_t *arriving_cbs;
     79        /** Tail of arriving_cbs list. Disable interrupts to access. */
     80        rcu_item_t **parriving_cbs_tail;
     81        /** Number of callbacks currently in arriving_cbs.
     82         * Disable interrupts to access.
     83         */
     84        size_t arriving_cbs_cnt;
     85
     86        /** At the end of this grace period callbacks in cur_cbs will be invoked.*/
     87        rcu_gp_t cur_cbs_gp;
     88        /** At the end of this grace period callbacks in next_cbs will be invoked.
     89         *
     90         * Should be the next grace period but it allows the reclaimer to
     91         * notice if it missed a grace period end announcement. In that
     92         * case it can execute next_cbs without waiting for another GP.
     93         *
     94         * Invariant: next_cbs_gp >= cur_cbs_gp
     95         */
     96        rcu_gp_t next_cbs_gp;
     97       
     98        /** This cpu has not yet passed a quiescent state and it is delaying the
     99         * detector. Once it reaches a QS it must sema_up(rcu.remaining_readers).
     100         */
     101        bool is_delaying_gp;
     102       
     103        /** Positive if there are callbacks pending in arriving_cbs. */
     104        semaphore_t arrived_flag;
     105       
     106        /** The reclaimer should expedite GPs for cbs in arriving_cbs. */
     107        bool expedite_arriving;
     108       
     109        /** Interruptible attached reclaimer thread. */
     110        struct thread *reclaimer_thr;
     111       
     112        /* Some statistics. */
     113        size_t stat_max_cbs;
     114        size_t stat_avg_cbs;
     115        size_t stat_missed_gps;
     116} rcu_cpu_data_t;
     117
     118
     119/** RCU related per-thread data. */
     120typedef struct rcu_thread_data {
     121        /** The number of times an RCU reader section is nested.
     122         *
     123         * If positive, it is definitely executing reader code. If zero,
     124         * the thread might already be executing reader code thanks to
     125         * cpu instruction reordering.
     126         */
     127        size_t nesting_cnt;
     128       
     129        /** True if the thread was preempted in a reader section.
     130         *
     131         * The thread is placed into rcu.cur_preempted or rcu.next_preempted
     132         * and must remove itself in rcu_read_unlock().
     133         *
     134         * Access with interrupts disabled.
     135         */
     136        bool was_preempted;
     137        /** Preempted threads link. Access with rcu.preempt_lock. */
     138        link_t preempt_link;
     139} rcu_thread_data_t;
     140
     141
     142#ifndef member_to_inst
     143#define member_to_inst(ptr_member, type, member_identif) \
     144        ((type*) (((void*)(ptr_member)) - ((void*)&(((type*)0)->member_identif))))
    58145#endif
    59146
     147/** Use to assign a pointer to newly initialized data to an RCU reader
     148 * accessible pointer.
     149 *
     150 * Example:
     151 * @code
     152 * typedef struct exam {
     153 *     struct exam *next;
     154 *     int grade;
     155 * } exam_t;
     156 *
     157 * exam_t *exam_list;
     158 * // ..
     159 *
     160 * // Insert at the beginning of the list.
     161 * exam_t *my_exam = malloc(sizeof(exam_t), 0);
     162 * my_exam->grade = 5;
     163 * my_exam->next = exam_list;
     164 * rcu_assign(exam_list, my_exam);
     165 *
     166 * // Changes properly propagate. Every reader either sees
     167 * // the old version of exam_list or the new version with
     168 * // the fully initialized my_exam.
     169 * rcu_synchronize();
     170 * // Now we can be sure every reader sees my_exam.
     171 *
     172 * @endcode
     173 */
     174#define rcu_assign(ptr, value) \
     175        do { \
     176                memory_barrier(); \
     177                (ptr) = (value); \
     178        } while (0)
     179
     180/** Use to access RCU protected data in a reader section.
     181 *
     182 * Example:
     183 * @code
     184 * exam_t *exam_list;
     185 * // ...
     186 *
     187 * rcu_read_lock();
     188 * exam_t *first_exam = rcu_access(exam_list);
     189 * // We can now safely use first_exam, it won't change
     190 * // under us while we're using it.
     191 *
     192 * // ..
     193 * rcu_read_unlock();
     194 * @endcode
     195 */
     196#define rcu_access(ptr) ACCESS_ONCE(ptr)
     197
     198extern void rcu_read_lock(void);
     199extern void rcu_read_unlock(void);
     200extern void rcu_synchronize(void);
     201extern void rcu_call(rcu_item_t *rcu_item, rcu_func_t func);
     202
     203extern void rcu_print_stat(void);
     204
     205extern void rcu_init(void);
     206extern void rcu_stop(void);
     207extern void rcu_cpu_init(void);
     208extern void rcu_kinit_init(void);
     209extern void rcu_thread_init(struct thread*);
     210extern void rcu_thread_exiting(void);
     211extern void rcu_after_thread_ran(void);
     212extern void rcu_before_thread_runs(void);
     213
     214/* Debugging/testing support. Not part of public API. Do not use! */
     215extern uint64_t rcu_completed_gps(void);
     216extern void _rcu_call(bool expedite, rcu_item_t *rcu_item, rcu_func_t func);
     217
     218#endif
     219
    60220/** @}
    61221 */
    62  
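
As a complement to the reader example embedded in the header above, a hedged updater-side sketch of rcu_call() together with member_to_inst(), reusing the exam_t example from the header doc comment. The embedded rcu member and free_exam_cb() are assumptions added for illustration, not part of the changeset:

    typedef struct exam {
            struct exam *next;
            int grade;
            rcu_item_t rcu;    /* illustrative: embed an rcu_item_t for rcu_call() */
    } exam_t;

    static void free_exam_cb(rcu_item_t *item)
    {
            /* Recover the enclosing exam_t from its embedded rcu member. */
            exam_t *e = member_to_inst(item, exam_t, rcu);
            free(e);
    }

    /* After unlinking e from exam_list so no new reader can find it: */
    rcu_call(&e->rcu, free_exam_cb);

    /* Alternatively, block until all preexisting readers finish, then free: */
    rcu_synchronize();
    free(e);
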
  • kernel/generic/include/synch/semaphore.h

    r22b5924 r181a746  
    5353        _semaphore_down_timeout((s), (usec), SYNCH_FLAGS_NONE)
    5454
     55#define semaphore_down_interruptable(s) \
     56        (ESYNCH_INTERRUPTED != _semaphore_down_timeout((s), SYNCH_NO_TIMEOUT, \
     57                SYNCH_FLAGS_INTERRUPTIBLE))
     58
    5559extern void semaphore_initialize(semaphore_t *, int);
    5660extern int _semaphore_down_timeout(semaphore_t *, uint32_t, unsigned int);
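
The new macro evaluates to true once the semaphore is lowered and to false if the sleep was interrupted (e.g. via thread_interrupt()). The reclaimer added in rcu.c below uses it in exactly this shape:

    bool ok = true;
    while (arriving_cbs_empty() && ok) {
            /* Sleep until a callback arrives or the thread is interrupted. */
            ok = semaphore_down_interruptable(&CPU->rcu.arrived_flag);
    }
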
  • kernel/generic/src/console/cmd.c

    r22b5924 r181a746  
    6969#include <symtab.h>
    7070#include <synch/workqueue.h>
     71#include <synch/rcu.h>
    7172#include <errno.h>
    7273
     
    459460};
    460461
     462/* Data and methods for the 'rcu' command */
     463static int cmd_rcu(cmd_arg_t *argv);
     464static cmd_info_t rcu_info = {
     465        .name = "rcu",
     466        .description = "Show RCU run-time statistics.",
     467        .func = cmd_rcu,
     468        .argc = 0
     469};
     470
    461471/* Data and methods for 'ipc' command */
    462472static int cmd_ipc(cmd_arg_t *argv);
     
    522532        &physmem_info,
    523533        &reboot_info,
     534        &rcu_info,
    524535        &sched_info,
    525536        &set4_info,
     
    10381049}
    10391050
     1051/** Prints RCU statistics.
     1052 *
     1053 * @param argv Ignored
     1054 *
     1055 * @return Always 1
     1056 */
     1057int cmd_rcu(cmd_arg_t *argv)
     1058{
     1059        rcu_print_stat();
     1060        return 1;
     1061}
    10401062
    10411063/** Command for listing memory zones
  • kernel/generic/src/cpu/cpu.c

    r22b5924 r181a746  
    5050#include <sysinfo/sysinfo.h>
    5151#include <arch/cycle.h>
     52#include <synch/rcu.h>
    5253
    5354cpu_t *cpus;
     
    102103        cpu_identify();
    103104        cpu_arch_init();
     105        rcu_cpu_init();
    104106}
    105107
  • kernel/generic/src/interrupt/interrupt.c

    r22b5924 r181a746  
    110110        }
    111111       
    112         /* Account CPU usage if it has waked up from sleep */
    113         if (CPU) {
     112        /* Account CPU usage if it woke up from sleep */
     113        if (CPU && CPU->idle) {
    114114                irq_spinlock_lock(&CPU->lock, false);
    115                 if (CPU->idle) {
    116                         uint64_t now = get_cycle();
    117                         CPU->idle_cycles += now - CPU->last_cycle;
    118                         CPU->last_cycle = now;
    119                         CPU->idle = false;
    120                 }
     115                uint64_t now = get_cycle();
     116                CPU->idle_cycles += now - CPU->last_cycle;
     117                CPU->last_cycle = now;
     118                CPU->idle = false;
    121119                irq_spinlock_unlock(&CPU->lock, false);
    122120        }
  • kernel/generic/src/lib/str.c

    r22b5924 r181a746  
    554554        /* There must be space for a null terminator in the buffer. */
    555555        ASSERT(size > 0);
     556        ASSERT(src != NULL);
    556557       
    557558        size_t src_off = 0;
  • kernel/generic/src/main/kinit.c

    r22b5924 r181a746  
    7878#include <synch/spinlock.h>
    7979#include <synch/workqueue.h>
     80#include <synch/rcu.h>
    8081
    8182#define ALIVE_CHARS  4
     
    106107
    107108        interrupts_disable();
     109       
     110        /* Start processing RCU callbacks. RCU is fully functional afterwards. */
     111        rcu_kinit_init();
    108112       
    109113        /*
  • kernel/generic/src/proc/scheduler.c

    r22b5924 r181a746  
    5353#include <synch/spinlock.h>
    5454#include <synch/workqueue.h>
     55#include <synch/rcu.h>
    5556#include <config.h>
    5657#include <context.h>
     
    8788{
    8889        before_thread_runs_arch();
     90        rcu_before_thread_runs();
    8991       
    9092#ifdef CONFIG_FPU_LAZY
     
    128130{
    129131        workq_after_thread_ran();
     132        rcu_after_thread_ran();
    130133        after_thread_ran_arch();
    131134}
     
    220223                goto loop;
    221224        }
     225
     226        ASSERT(!CPU->idle);
    222227       
    223228        unsigned int i;
     
    422427               
    423428                case Exiting:
     429                        rcu_thread_exiting();
    424430repeat:
    425431                        if (THREAD->detached) {
  • kernel/generic/src/proc/thread.c

    r22b5924 r181a746  
    4747#include <synch/waitq.h>
    4848#include <synch/workqueue.h>
     49#include <synch/rcu.h>
    4950#include <cpu.h>
    5051#include <str.h>
     
    405406        /* Might depend on previous initialization */
    406407        thread_create_arch(thread);
     408       
     409        rcu_thread_init(thread);
    407410       
    408411        if ((flags & THREAD_FLAG_NOATTACH) != THREAD_FLAG_NOATTACH)
  • kernel/generic/src/synch/rcu.c

    r22b5924 r181a746  
    3434/**
    3535 * @file
    36  * @brief RCU.
     36 * @brief Preemptible read-copy update. Usable from interrupt handlers.
    3737 */
    3838 
    3939#include <synch/rcu.h>
    40 
    41 
    42 /** Delimits the start of an RCU reader critical section.
    43  *
     40#include <synch/condvar.h>
     41#include <synch/semaphore.h>
     42#include <synch/spinlock.h>
     43#include <proc/thread.h>
     44#include <cpu/cpu_mask.h>
     45#include <cpu.h>
     46#include <smp/smp_call.h>
     47#include <compiler/barrier.h>
     48#include <atomic.h>
     49#include <arch.h>
     50#include <macros.h>
     51
     52/*
     53 * Number of milliseconds to give to preexisting readers to finish
     54 * when non-expedited grace period detection is in progress.
     55 */
     56#define DETECT_SLEEP_MS    5
     57/*
     58 * Max number of pending callbacks in the local cpu's queue before
     59 * aggressively expediting the current grace period
     60 */
     61#define EXPEDITE_THRESHOLD 1000
     62/* Half the number of values a uint32 can hold. */
     63#define UINT32_MAX_HALF    2147483648U
     64
     65
     66/* Global RCU data. */
     67typedef struct rcu_data {
     68        /** The detector uses this to signal reclaimers that a grace period ended. */
     69        condvar_t gp_ended;
     70        /** Reclaimers notify the detector when they request more grace periods.*/
     71        condvar_t req_gp_changed;
     72        /** Reclaimers use this to notify the detector to accelerate GP detection. */
     73        condvar_t expedite_now;
     74        /**
     75         * The detector waits on this semaphore for any readers delaying the GP.
     76         *
     77         * Each of the cpus with readers that are delaying the current GP
     78         * must up() this sema once they reach a quiescent state. If there
     79         * are any readers in cur_preempted (ie preempted preexisting) and
     80         * they are already delaying GP detection, the last to unlock its
     81         * reader section must up() this sema once.
     82         */
     83        semaphore_t remaining_readers;
     84       
     85        /** Protects the 4 fields below. */
     86        SPINLOCK_DECLARE(gp_lock);
     87        /** Number of grace period ends the detector was requested to announce. */
     88        size_t req_gp_end_cnt;
     89        /** Number of consecutive grace periods to detect quickly and aggressively.*/
     90        size_t req_expedited_cnt;
     91        /**
     92         * The current grace period number. Increases monotonically.
     93         * Lock gp_lock or preempt_lock to get a current value.
     94         */
     95        rcu_gp_t cur_gp;
     96        /**
     97         * The number of the most recently completed grace period.
     98         * At most one behind cur_gp. If equal to cur_gp, a grace
     99         * period detection is not in progress and the detector
     100         * is idle.
     101         */
     102        rcu_gp_t completed_gp;
     103       
     104        /** Protects the following 3 fields. */
     105        IRQ_SPINLOCK_DECLARE(preempt_lock);
     106        /** Preexisting readers that have been preempted. */
     107        list_t cur_preempted;
     108        /** Readers that have been preempted and might delay the next grace period. */
     109        list_t next_preempted;
     110        /**
     111         * The detector is waiting for the last preempted reader
     112         * in cur_preempted to announce that it exited its reader
     113         * section by up()ing remaining_readers.
     114         */
     115        bool preempt_blocking_det;
     116       
     117        /**
     118         * Number of cpus with readers that are delaying the current GP.
     119         * They will up() remaining_readers.
     120         */
     121        atomic_t delaying_cpu_cnt;
     122       
     123        /** Interruptible attached detector thread pointer. */
     124        thread_t *detector_thr;
     125       
     126        /* Some statistics. */
     127        size_t stat_expedited_cnt;
     128        size_t stat_delayed_cnt;
     129        size_t stat_preempt_blocking_cnt;
     130        /* Does not contain self/local calls. */
     131        size_t stat_smp_call_cnt;
     132} rcu_data_t;
     133
     134
     135static rcu_data_t rcu;
     136
     137static void start_detector(void);
     138static void start_reclaimers(void);
     139static void rcu_read_unlock_impl(size_t *pnesting_cnt);
     140static void synch_complete(rcu_item_t *rcu_item);
     141static void check_qs(void);
     142static void record_qs(void);
     143static void signal_read_unlock(void);
     144static bool arriving_cbs_empty(void);
     145static bool next_cbs_empty(void);
     146static bool cur_cbs_empty(void);
     147static bool all_cbs_empty(void);
     148static void reclaimer(void *arg);
     149static bool wait_for_pending_cbs(void);
     150static bool advance_cbs(void);
     151static void exec_completed_cbs(rcu_gp_t last_completed_gp);
     152static void exec_cbs(rcu_item_t **phead);
     153static void req_detection(size_t req_cnt);
     154static bool wait_for_cur_cbs_gp_end(bool expedite, rcu_gp_t *last_completed_gp);
     155static bool cv_wait_for_gp(rcu_gp_t wait_on_gp);
     156static void detector(void *);
     157static bool wait_for_detect_req(void);
     158static void start_new_gp(void);
     159static void end_cur_gp(void);
     160static bool wait_for_readers(void);
     161static void rm_quiescent_cpus(cpu_mask_t *cpu_mask);
     162static bool gp_sleep(void);
     163static void interrupt_delaying_cpus(cpu_mask_t *cpu_mask);
     164static void sample_local_cpu(void *);
     165static bool wait_for_delaying_cpus(void);
     166static bool wait_for_preempt_reader(void);
     167
     168
     169
     170/** Initializes global RCU structures. */
     171void rcu_init(void)
     172{
     173        condvar_initialize(&rcu.gp_ended);
     174        condvar_initialize(&rcu.req_gp_changed);
     175        condvar_initialize(&rcu.expedite_now);
     176        semaphore_initialize(&rcu.remaining_readers, 0);
     177       
     178        spinlock_initialize(&rcu.gp_lock, "rcu.gp_lock");
     179        rcu.req_gp_end_cnt = 0;
     180        rcu.req_expedited_cnt = 0;
     181        rcu.cur_gp = 0;
     182        rcu.completed_gp = 0;
     183       
     184        irq_spinlock_initialize(&rcu.preempt_lock, "rcu.preempt_lock");
     185        list_initialize(&rcu.cur_preempted);
     186        list_initialize(&rcu.next_preempted);
     187        rcu.preempt_blocking_det = false;
     188       
     189        atomic_set(&rcu.delaying_cpu_cnt, 0);
     190       
     191        rcu.detector_thr = 0;
     192       
     193        rcu.stat_expedited_cnt = 0;
     194        rcu.stat_delayed_cnt = 0;
     195        rcu.stat_preempt_blocking_cnt = 0;
     196        rcu.stat_smp_call_cnt = 0;
     197}
     198
     199/** Initializes per-CPU RCU data. If called on the boot cpu, also initializes global data. */
     200void rcu_cpu_init(void)
     201{
     202        if (config.cpu_active == 1) {
     203                rcu_init();
     204        }
     205       
     206        CPU->rcu.last_seen_gp = 0;
     207       
     208        CPU->rcu.pnesting_cnt = &CPU->rcu.tmp_nesting_cnt;
     209        CPU->rcu.tmp_nesting_cnt = 0;
     210       
     211        CPU->rcu.cur_cbs = 0;
     212        CPU->rcu.next_cbs = 0;
     213        CPU->rcu.arriving_cbs = 0;
     214        CPU->rcu.parriving_cbs_tail = &CPU->rcu.arriving_cbs;
     215       
     216        CPU->rcu.arriving_cbs_cnt = 0;
     217
     218        CPU->rcu.cur_cbs_gp = 0;
     219        CPU->rcu.next_cbs_gp = 0;
     220       
     221        CPU->rcu.is_delaying_gp = false;
     222       
     223        semaphore_initialize(&CPU->rcu.arrived_flag, 0);
     224        CPU->rcu.reclaimer_thr = 0;
     225       
     226        CPU->rcu.stat_max_cbs = 0;
     227        CPU->rcu.stat_avg_cbs = 0;
     228        CPU->rcu.stat_missed_gps = 0;
     229}
     230
     231/** Completes RCU init. Creates and runs the detector and reclaimer threads.*/
     232void rcu_kinit_init(void)
     233{
     234        start_detector();
     235        start_reclaimers();
     236}
     237
     238/** Initializes any per-thread RCU structures. */
     239void rcu_thread_init(thread_t *thread)
     240{
     241        thread->rcu.nesting_cnt = 0;
     242        thread->rcu.was_preempted = false;
     243        link_initialize(&thread->rcu.preempt_link);
     244}
     245
     246/** Called from scheduler() when exiting the current thread.
     247 *
     248 * Preemption or interrupts are disabled and the scheduler() already
     249 * switched away from the current thread, calling rcu_after_thread_ran().
     250 */
     251void rcu_thread_exiting(void)
     252{
     253        ASSERT(THREAD != 0);
     254        ASSERT(THREAD->state == Exiting);
     255        ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
     256        /*
     257         * The scheduler() must have already switched to a temporary
     258         * nesting counter for interrupt handlers (we could be idle)
     259         * so that interrupt handlers do not modify the exiting thread's
     260         * reader section nesting count while we examine/process it.
     261         */
     262        ASSERT(&CPU->rcu.tmp_nesting_cnt == CPU->rcu.pnesting_cnt);
     263       
     264        /*
     265         * The thread forgot to exit its reader critical section.
     266         * It is a bug, but rather than letting the entire system lock up,
     267         * forcefully leave the reader section. The thread is not holding
     268         * any references anyway since it is exiting, so this is safe.
     269         */
     270        if (0 < THREAD->rcu.nesting_cnt) {
     271                THREAD->rcu.nesting_cnt = 1;
     272                rcu_read_unlock_impl(&THREAD->rcu.nesting_cnt);
     273        }
     274}
     275
     276/** Cleans up global RCU resources and stops dispatching callbacks.
     277 *
     278 * Call when shutting down the kernel. Outstanding callbacks will
     279 * not be processed. Instead they will linger forever.
     280 */
     281void rcu_stop(void)
     282{
     283        /* todo: stop accepting new callbacks instead of just letting them linger?*/
     284       
     285        /* Stop and wait for reclaimers. */
     286        for (unsigned int cpu_id = 0; cpu_id < config.cpu_active; ++cpu_id) {
     287                ASSERT(cpus[cpu_id].rcu.reclaimer_thr != 0);
     288       
     289                if (cpus[cpu_id].rcu.reclaimer_thr) {
     290                        thread_interrupt(cpus[cpu_id].rcu.reclaimer_thr);
     291                        thread_join(cpus[cpu_id].rcu.reclaimer_thr);
     292                        thread_detach(cpus[cpu_id].rcu.reclaimer_thr);
     293                        cpus[cpu_id].rcu.reclaimer_thr = 0;
     294                }
     295        }
     296
     297        /* Stop the detector and wait. */
     298        if (rcu.detector_thr) {
     299                thread_interrupt(rcu.detector_thr);
     300                thread_join(rcu.detector_thr);
     301                thread_detach(rcu.detector_thr);
     302                rcu.detector_thr = 0;
     303        }
     304}
     305
     306/** Starts the detector thread. */
     307static void start_detector(void)
     308{
     309        rcu.detector_thr =
     310                thread_create(detector, 0, TASK, THREAD_FLAG_NONE, "rcu-det");
     311       
     312        if (!rcu.detector_thr)
     313                panic("Failed to create RCU detector thread.");
     314       
     315        thread_ready(rcu.detector_thr);
     316}
     317
     318/** Creates and runs cpu-bound reclaimer threads. */
     319static void start_reclaimers(void)
     320{
     321        for (unsigned int cpu_id = 0; cpu_id < config.cpu_count; ++cpu_id) {
     322                char name[THREAD_NAME_BUFLEN] = {0};
     323               
     324                snprintf(name, THREAD_NAME_BUFLEN - 1, "rcu-rec/%u", cpu_id);
     325               
     326                cpus[cpu_id].rcu.reclaimer_thr =
     327                        thread_create(reclaimer, 0, TASK, THREAD_FLAG_NONE, name);
     328
     329                if (!cpus[cpu_id].rcu.reclaimer_thr)
     330                        panic("Failed to create RCU reclaimer thread on cpu%u.", cpu_id);
     331
     332                thread_wire(cpus[cpu_id].rcu.reclaimer_thr, &cpus[cpu_id]);
     333                thread_ready(cpus[cpu_id].rcu.reclaimer_thr);
     334        }
     335}
     336
     337/** Returns the number of elapsed grace periods since boot. */
     338uint64_t rcu_completed_gps(void)
     339{
     340        spinlock_lock(&rcu.gp_lock);
     341        uint64_t completed = rcu.completed_gp;
     342        spinlock_unlock(&rcu.gp_lock);
     343       
     344        return completed;
     345}
     346
     347/** Delimits the start of an RCU reader critical section.
     348 *
     349 * Reader sections may be nested and are preemptible. You must not,
     350 * however, block/sleep within reader sections.
    44351 */
    45352void rcu_read_lock(void)
    46353{
    47         /* todo: implement */
    48 }
    49 
    50 
    51 /** Delimits the end of an RCU reader critical section.
    52  *
    53  */
     354        ASSERT(CPU);
     355        preemption_disable();
     356
     357        check_qs();
     358        ++(*CPU->rcu.pnesting_cnt);
     359
     360        preemption_enable();
     361}
     362
     363/** Delimits the end of an RCU reader critical section. */
    54364void rcu_read_unlock(void)
    55365{
    56         /* todo: implement */
    57 }
    58 
    59 
    60 /** Blocks until all preexisting readers exit their critical sections.
    61  *
    62  */
     366        ASSERT(CPU);
     367        preemption_disable();
     368       
     369        rcu_read_unlock_impl(CPU->rcu.pnesting_cnt);
     370       
     371        preemption_enable();
     372}
     373
     374/** Unlocks the local reader section using the given nesting count.
     375 *
     376 * Preemption or interrupts must be disabled.
     377 *
     378 * @param pnesting_cnt Either &CPU->rcu.tmp_nesting_cnt or
     379 *           &THREAD->rcu.nesting_cnt.
     380 */
     381static void rcu_read_unlock_impl(size_t *pnesting_cnt)
     382{
     383        ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
     384       
     385        if (0 == --(*pnesting_cnt)) {
     386                record_qs();
     387               
     388                /*
     389                 * The thread was preempted while in a critical section or
     390                 * the detector is eagerly waiting for this cpu's reader
     391                 * to finish.
     392                 *
     393                 * Note that THREAD may be 0 in scheduler() and not just during boot.
     394                 */
     395                if ((THREAD && THREAD->rcu.was_preempted) || CPU->rcu.is_delaying_gp) {
     396                        /* Rechecks with disabled interrupts. */
     397                        signal_read_unlock();
     398                }
     399        }
     400}
     401
     402/** Records a QS if not in a reader critical section. */
     403static void check_qs(void)
     404{
     405        ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
     406       
     407        if (0 == *CPU->rcu.pnesting_cnt)
     408                record_qs();
     409}
     410
     411/** Unconditionally records a quiescent state for the local cpu. */
     412static void record_qs(void)
     413{
     414        ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
     415       
     416        /*
     417         * A new GP was started since the last time we passed a QS.
     418         * Notify the detector we have reached a new QS.
     419         */
     420        if (CPU->rcu.last_seen_gp != rcu.cur_gp) {
     421                rcu_gp_t cur_gp = ACCESS_ONCE(rcu.cur_gp);
     422                /*
     423                 * Contain memory accesses within a reader critical section.
     424                 * If we are in rcu_read_lock(), it also makes changes prior to the
     425                 * start of the GP visible in the reader section.
     426                 */
     427                memory_barrier();
     428                /*
     429                 * Acknowledge we passed a QS since the beginning of rcu.cur_gp.
     430                 * Cache coherency will lazily transport the value to the
     431                 * detector while it sleeps in gp_sleep().
     432                 *
     433                 * Note that there is a theoretical possibility that we
     434                 * overwrite a more recent/greater last_seen_gp here with
     435                 * an older/smaller value. If this cpu is interrupted here
     436                 * while in rcu_read_lock(), reader sections in the interrupt handler
     437                 * will update last_seen_gp to the same value as is currently
     438                 * in local cur_gp. However, if the cpu continues processing
     439                 * interrupts and the detector starts a new GP immediately,
     440                 * local interrupt handlers may update last_seen_gp again (ie
     441                 * properly ack the new GP) with a value greater than local cur_gp.
     442                 * Resetting last_seen_gp to a previous value here is however
     443                 * benign and we only have to remember that this reader may end up
     444                 * in cur_preempted even after the GP ends. That is why we
     445                 * append next_preempted to cur_preempted rather than overwriting
     446                 * it as if cur_preempted were empty.
     447                 */
     448                CPU->rcu.last_seen_gp = cur_gp;
     449        }
     450}
     451
     452/** If necessary, signals the detector that we exited a reader section. */
     453static void signal_read_unlock(void)
     454{
     455        ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
     456       
     457        /*
     458         * We have to disable interrupts in order to make checking
     459         * and resetting was_preempted and is_delaying_gp atomic
     460         * with respect to local interrupt handlers. Otherwise
     461         * an interrupt could beat us to calling semaphore_up()
     462         * before we reset the appropriate flag.
     463         */
     464        ipl_t ipl = interrupts_disable();
     465       
     466        /*
     467         * If the detector is eagerly waiting for this cpu's reader to unlock,
     468         * notify it that the reader did so.
     469         */
     470        if (CPU->rcu.is_delaying_gp) {
     471                CPU->rcu.is_delaying_gp = false;
     472                semaphore_up(&rcu.remaining_readers);
     473        }
     474       
     475        /*
     476         * This reader was preempted while in a reader section.
     477         * We might be holding up the current GP. Notify the
     478         * detector if so.
     479         */
     480        if (THREAD && THREAD->rcu.was_preempted) {
     481                ASSERT(link_used(&THREAD->rcu.preempt_link));
     482                THREAD->rcu.was_preempted = false;
     483
     484                irq_spinlock_lock(&rcu.preempt_lock, false);
     485               
     486                bool prev_empty = list_empty(&rcu.cur_preempted);
     487                list_remove(&THREAD->rcu.preempt_link);
     488                bool now_empty = list_empty(&rcu.cur_preempted);
     489               
     490                /* This was the last reader in cur_preempted. */
     491                bool last_removed = now_empty && !prev_empty;
     492               
     493                /*
     494                 * Preempted readers are blocking the detector and
     495                 * this was the last reader blocking the current GP.
     496                 */
     497                if (last_removed && rcu.preempt_blocking_det) {
     498                        rcu.preempt_blocking_det = false;
     499                        semaphore_up(&rcu.remaining_readers);
     500                }
     501               
     502                irq_spinlock_unlock(&rcu.preempt_lock, false);
     503        }
     504        interrupts_restore(ipl);
     505}
     506
     507typedef struct synch_item {
     508        waitq_t wq;
     509        rcu_item_t rcu_item;
     510} synch_item_t;
     511
     512/** Blocks until all preexisting readers exit their critical sections. */
    63513void rcu_synchronize(void)
    64514{
    65         /* todo: implement */
    66 }
    67 
    68 
    69 /** Adds a callback to invoke after all preexisting readers finish.
    70  *
    71  */
    72 void rcu_call(struct rcu_head *head, rcu_cb_func_t func)
    73 {
    74         /* todo: implement */
    75 }
    76 
     515        /* Calling from a reader section will deadlock. */
     516        ASSERT(THREAD == 0 || 0 == THREAD->rcu.nesting_cnt);
     517       
     518        synch_item_t completion;
     519
     520        waitq_initialize(&completion.wq);
     521        rcu_call(&completion.rcu_item, synch_complete);
     522        waitq_sleep(&completion.wq);
     523        waitq_complete_wakeup(&completion.wq);
     524}
     525
     526/** rcu_synchronize's callback. */
     527static void synch_complete(rcu_item_t *rcu_item)
     528{
     529        synch_item_t *completion = member_to_inst(rcu_item, synch_item_t, rcu_item);
     530        ASSERT(completion);
     531        waitq_wakeup(&completion->wq, WAKEUP_FIRST);
     532}
     533
     534/** Adds a callback to invoke after all preexisting readers finish.
     535 *
     536 * May be called from within interrupt handlers or RCU reader sections.
     537 *
     538 * @param rcu_item Used by RCU to track the call. Must remain valid
     539 *         until the user callback function is entered.
     540 * @param func User callback function that will be invoked once a full
     541 *         grace period elapsed, ie at a time when all preexisting
     542 *         readers have finished. The callback should be short and must
     543 *         not block. If you must sleep, enqueue your work in the system
     544 *         work queue from the callback (ie workq_global_enqueue()).
     545 */
     546void rcu_call(rcu_item_t *rcu_item, rcu_func_t func)
     547{
     548        _rcu_call(false, rcu_item, func);
     549}
     550
     551/** rcu_call() implementation. See rcu_call() for comments. */
     552void _rcu_call(bool expedite, rcu_item_t *rcu_item, rcu_func_t func)
     553{
     554        ASSERT(rcu_item);
     555       
     556        rcu_item->func = func;
     557        rcu_item->next = 0;
     558       
     559        preemption_disable();
     560       
     561        ipl_t ipl = interrupts_disable();
     562
     563        *CPU->rcu.parriving_cbs_tail = rcu_item;
     564        CPU->rcu.parriving_cbs_tail = &rcu_item->next;
     565       
     566        size_t cnt = ++CPU->rcu.arriving_cbs_cnt;
     567        interrupts_restore(ipl);
     568       
     569        if (expedite) {
     570                CPU->rcu.expedite_arriving = true;
     571        }
     572       
     573        /* Added first callback - notify the reclaimer. */
     574        if (cnt == 1 && !semaphore_count_get(&CPU->rcu.arrived_flag)) {
     575                semaphore_up(&CPU->rcu.arrived_flag);
     576        }
     577       
     578        preemption_enable();
     579}
     580
     581static bool cur_cbs_empty(void)
     582{
     583        ASSERT(THREAD && THREAD->wired);
     584        return 0 == CPU->rcu.cur_cbs;
     585}
     586
     587static bool next_cbs_empty(void)
     588{
     589        ASSERT(THREAD && THREAD->wired);
     590        return 0 == CPU->rcu.next_cbs;
     591}
     592
     593/** Disable interrupts to get an up-to-date result. */
     594static bool arriving_cbs_empty(void)
     595{
     596        ASSERT(THREAD && THREAD->wired);
     597        /*
     598         * Accessing with interrupts enabled may at worst lead to
     599         * a false negative if we race with a local interrupt handler.
     600         */
     601        return 0 == CPU->rcu.arriving_cbs;
     602}
     603
     604static bool all_cbs_empty(void)
     605{
     606        return cur_cbs_empty() && next_cbs_empty() && arriving_cbs_empty();
     607}
     608
     609/** Reclaimer thread dispatches locally queued callbacks once a GP ends. */
     610static void reclaimer(void *arg)
     611{
     612        ASSERT(THREAD && THREAD->wired);
     613
     614        rcu_gp_t last_compl_gp = 0;
     615        bool ok = true;
     616       
     617        while (ok && wait_for_pending_cbs()) {
     618                exec_completed_cbs(last_compl_gp);
     619               
     620                bool expedite = advance_cbs();
     621               
     622                ok = wait_for_cur_cbs_gp_end(expedite, &last_compl_gp);
     623        }
     624}
     625
     626/** Waits until there are callbacks waiting to be dispatched. */
     627static bool wait_for_pending_cbs(void)
     628{
     629        if (!all_cbs_empty())
     630                return true;
     631
     632        bool ok = true;
     633       
     634        while (arriving_cbs_empty() && ok) {
     635                ok = semaphore_down_interruptable(&CPU->rcu.arrived_flag);
     636        }
     637       
     638        return ok;
     639}
     640
     641static void upd_stat_missed_gp(rcu_gp_t compl, rcu_gp_t cb_gp)
     642{
     643        if (compl > cb_gp) {
     644                CPU->rcu.stat_missed_gps += (size_t)(compl - cb_gp);
     645        }
     646}
     647
     648/** Executes all callbacks for the given completed grace period. */
     649static void exec_completed_cbs(rcu_gp_t last_completed_gp)
     650{
     651        if (CPU->rcu.cur_cbs_gp <= last_completed_gp) {
     652                upd_stat_missed_gp(last_completed_gp, CPU->rcu.cur_cbs_gp);
     653                exec_cbs(&CPU->rcu.cur_cbs);
     654        }
     655       
     656        if (CPU->rcu.next_cbs_gp <= last_completed_gp) {
     657                upd_stat_missed_gp(last_completed_gp, CPU->rcu.next_cbs_gp);
     658                exec_cbs(&CPU->rcu.next_cbs);   
     659        }
     660}
     661
     662/** Executes callbacks in the single-linked list. The list is left empty. */
     663static void exec_cbs(rcu_item_t **phead)
     664{
     665        rcu_item_t *rcu_item = *phead;
     666
     667        while (rcu_item) {
     668                /* func() may free rcu_item. Get a local copy. */
     669                rcu_item_t *next = rcu_item->next;
     670                rcu_func_t func = rcu_item->func;
     671               
     672                func(rcu_item);
     673               
     674                rcu_item = next;
     675        }
     676       
     677        *phead = 0;
     678}
     679
     680static void upd_stat_cb_cnts(size_t arriving_cnt)
     681{
     682        CPU->rcu.stat_max_cbs = max(arriving_cnt, CPU->rcu.stat_max_cbs);
     683        if (0 < arriving_cnt) {
     684                CPU->rcu.stat_avg_cbs =
     685                        (99 * CPU->rcu.stat_avg_cbs + 1 * arriving_cnt) / 100;
     686        }
     687}
     688
     689
     690/** Prepares another batch of callbacks to dispatch at the next grace period.
     691 *
     692 * @return True if the next batch of callbacks must be expedited quickly.
     693 */
     694static bool advance_cbs(void)
     695{
     696        /* Move next_cbs to cur_cbs. */
     697        CPU->rcu.cur_cbs = CPU->rcu.next_cbs;
     698        CPU->rcu.cur_cbs_gp = CPU->rcu.next_cbs_gp;
     699       
     700        /* Move arriving_cbs to next_cbs. Empties arriving_cbs. */
     701        ipl_t ipl = interrupts_disable();
     702
     703        /*
     704         * Too many callbacks queued. Better speed up the detection
     705         * or risk exhausting all system memory.
     706         */
     707        bool expedite = (EXPEDITE_THRESHOLD < CPU->rcu.arriving_cbs_cnt)
     708                || CPU->rcu.expedite_arriving; 
     709
     710        /* Update statistics. */
     711        upd_stat_cb_cnts(CPU->rcu.arriving_cbs_cnt);
     712               
     713        CPU->rcu.expedite_arriving = false;
     714        CPU->rcu.next_cbs = CPU->rcu.arriving_cbs;
     715        CPU->rcu.arriving_cbs = 0;
     716        CPU->rcu.parriving_cbs_tail = &CPU->rcu.arriving_cbs;
     717        CPU->rcu.arriving_cbs_cnt = 0;
     718       
     719        interrupts_restore(ipl);
     720       
     721        /*
     722         * Make changes prior to queuing next_cbs visible to readers.
     723         * See comment in wait_for_readers().
     724         */
     725        memory_barrier(); /* MB A, B */
     726
     727        /* At the end of next_cbs_gp, exec next_cbs. Determine what GP that is. */
     728       
     729        if (!next_cbs_empty()) {
     730                spinlock_lock(&rcu.gp_lock);
     731       
     732                /* Exec next_cbs at the end of the next GP. */
     733                CPU->rcu.next_cbs_gp = rcu.cur_gp + 1;
     734               
     735                /*
     736                 * There are no callbacks to invoke before next_cbs. Instruct
     737                 * wait_for_cur_cbs_gp_end() to notify us of the nearest GP end.
     738                 * That could be sooner than next_cbs_gp (if the current GP
     739                 * had not yet completed), so we'll create a shorter batch
     740                 * of callbacks next time around.
     741                 */
     742                if (cur_cbs_empty()) {
     743                        CPU->rcu.cur_cbs_gp = rcu.completed_gp + 1;
     744                }
     745               
     746                spinlock_unlock(&rcu.gp_lock);
     747        } else {
     748                CPU->rcu.next_cbs_gp = CPU->rcu.cur_cbs_gp;
     749        }
     750       
     751        ASSERT(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
     752       
     753        return expedite;       
     754}
     755
     756/** Waits for the grace period associated with the callbacks in cur_cbs to elapse.
     757 *
     758 * @param expedite Instructs the detector to aggressively speed up grace
     759 *            period detection without any delay.
     760 * @param completed_gp Returns the most recent completed grace period
     761 *            number.
     762 * @return false if the thread was interrupted and should stop.
     763 */
     764static bool wait_for_cur_cbs_gp_end(bool expedite, rcu_gp_t *completed_gp)
     765{
     766        /*
     767         * Use a possibly outdated version of completed_gp to bypass checking
     768         * with the lock.
     769         *
     770         * Note that loading and storing rcu.completed_gp is not atomic
     771         * (it is 64bit wide). Reading a clobbered value that is less than
     772         * rcu.completed_gp is harmless - we'll recheck with a lock. The
     773         * only way to read a clobbered value that is greater than the actual
     774         * value is if the detector increases the higher-order word first and
     775         * then decreases the lower-order word (or we see stores in that order),
     776         * eg when incrementing from 2^32 - 1 to 2^32. The loaded value
     777         * suddenly jumps by 2^32. It would take hours for such an increase
     778         * to occur so it is safe to discard the value. We allow increases
     779         * of up to half the maximum to generously accommodate for loading an
     780         * outdated lower word.
     781         */
     782        rcu_gp_t compl_gp = ACCESS_ONCE(rcu.completed_gp);
     783        if (CPU->rcu.cur_cbs_gp <= compl_gp
     784                && compl_gp <= CPU->rcu.cur_cbs_gp + UINT32_MAX_HALF) {
     785                *completed_gp = compl_gp;
     786                return true;
     787        }
     788       
     789        spinlock_lock(&rcu.gp_lock);
     790       
     791        if (CPU->rcu.cur_cbs_gp <= rcu.completed_gp) {
     792                *completed_gp = rcu.completed_gp;
     793                spinlock_unlock(&rcu.gp_lock);
     794                return true;
     795        }
     796       
     797        ASSERT(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
     798        ASSERT(rcu.cur_gp <= CPU->rcu.cur_cbs_gp);
     799       
     800        /*
     801         * Notify the detector of how many GP ends we intend to wait for, so
     802         * it can avoid going to sleep unnecessarily. Optimistically assume
     803         * new callbacks will arrive while we're waiting; hence +1.
     804         */
     805        size_t remaining_gp_ends = (size_t) (CPU->rcu.next_cbs_gp - rcu.cur_gp);
     806        req_detection(remaining_gp_ends + (arriving_cbs_empty() ? 0 : 1));
     807       
     808        /*
     809         * Ask the detector to speed up GP detection if there are too many
     810         * pending callbacks and other reclaimers have not already done so.
     811         */
     812        if (expedite) {
     813                if(0 == rcu.req_expedited_cnt)
     814                        condvar_signal(&rcu.expedite_now);
     815               
     816                /*
     817                 * Expedite only cur_cbs. If there really is a surge of callbacks,
     818                 * the arriving batch will expedite the GP for the huge number
     819                 * of callbacks currently in next_cbs.
     820                 */
     821                rcu.req_expedited_cnt = 1;
     822        }
     823
     824        /* Wait for cur_cbs_gp to end. */
     825        bool interrupted = cv_wait_for_gp(CPU->rcu.cur_cbs_gp);
     826
     827        *completed_gp = rcu.completed_gp;
     828        spinlock_unlock(&rcu.gp_lock); 
     829       
     830        return !interrupted;
     831}
     832
     833/** Requests the detector to detect at least req_cnt consecutive grace periods.*/
     834static void req_detection(size_t req_cnt)
     835{
     836        if (rcu.req_gp_end_cnt < req_cnt) {
     837                bool detector_idle = (0 == rcu.req_gp_end_cnt);
     838                rcu.req_gp_end_cnt = req_cnt;
     839
     840                //printf("reqs:%d,idle:%d ", req_cnt, detector_idle);
     841               
     842                if (detector_idle) {
     843                        ASSERT(rcu.cur_gp == rcu.completed_gp);
     844                        condvar_signal(&rcu.req_gp_changed);
     845                }
     846        } else {
     847                //printf("myreqs:%d,detr:%d ", req_cnt, rcu.req_gp_end_cnt);
     848        }
     849}
     850
     851/** Waits for an announcement of the end of the grace period wait_on_gp. */
     852static bool cv_wait_for_gp(rcu_gp_t wait_on_gp)
     853{
     854        ASSERT(spinlock_locked(&rcu.gp_lock));
     855       
     856        bool interrupted = false;
     857       
     858        /* Wait until wait_on_gp ends. */
     859        while (rcu.completed_gp < wait_on_gp && !interrupted) {
     860                int ret = _condvar_wait_timeout_spinlock(&rcu.gp_ended, &rcu.gp_lock,
     861                        SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
     862                interrupted = (ret == ESYNCH_INTERRUPTED);
     863        }
     864       
     865        ASSERT(wait_on_gp <= rcu.completed_gp);
     866       
     867        return interrupted;
     868}
     869
     870/** The detector thread detects and notifies reclaimers of grace period ends. */
     871static void detector(void *arg)
     872{
     873        spinlock_lock(&rcu.gp_lock);
     874       
     875        while (wait_for_detect_req()) {
     876                /*
     877                 * Announce new GP started. Readers start lazily acknowledging that
     878                 * they passed a QS.
     879                 */
     880                start_new_gp();
     881               
     882                spinlock_unlock(&rcu.gp_lock);
     883               
     884                if (!wait_for_readers())
     885                        goto unlocked_out;
     886               
     887                spinlock_lock(&rcu.gp_lock);
     888
     889                /* Notify reclaimers that they may now invoke queued callbacks. */
     890                end_cur_gp();
     891        }
     892       
     893        spinlock_unlock(&rcu.gp_lock);
     894       
     895unlocked_out:
     896        return;
     897}
     898
     899/** Waits for a request from a reclaimer thread to detect a grace period. */
     900static bool wait_for_detect_req(void)
     901{
     902        ASSERT(spinlock_locked(&rcu.gp_lock));
     903       
     904        bool interrupted = false;
     905       
     906        while (0 == rcu.req_gp_end_cnt && !interrupted) {
     907                int ret = _condvar_wait_timeout_spinlock(&rcu.req_gp_changed,
     908                        &rcu.gp_lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
     909               
     910                interrupted = (ret == ESYNCH_INTERRUPTED);
     911        }
     912       
     913        return !interrupted;
     914}
     915
     916/** Announces the start of a new grace period for preexisting readers to ack. */
     917static void start_new_gp(void)
     918{
     919        ASSERT(spinlock_locked(&rcu.gp_lock));
     920       
     921        irq_spinlock_lock(&rcu.preempt_lock, true);
     922       
     923        /* Start a new GP. Announce to readers that a quiescent state is needed. */
     924        ++rcu.cur_gp;
     925       
     926        /*
     927         * Readers preempted before the start of this GP (next_preempted)
     928         * are preexisting readers now that a GP started and will hold up
     929         * the current GP until they exit their reader sections.
     930         *
     931         * Preempted readers from the previous GP have finished so
     932         * cur_preempted is empty, but see comment in record_qs().
     933         */
     934        list_append_list(&rcu.next_preempted, &rcu.cur_preempted);
     935       
     936        irq_spinlock_unlock(&rcu.preempt_lock, true);
     937}
     938
     939static void end_cur_gp(void)
     940{
     941        ASSERT(spinlock_locked(&rcu.gp_lock));
     942       
     943        rcu.completed_gp = rcu.cur_gp;
     944        --rcu.req_gp_end_cnt;
     945       
     946        condvar_broadcast(&rcu.gp_ended);
     947}
     948
     949/** Waits for readers that started before the current GP started to finish. */
     950static bool wait_for_readers(void)
     951{
     952        DEFINE_CPU_MASK(reading_cpus);
     953       
     954        /* All running cpus have potential readers. */
     955        cpu_mask_active(reading_cpus);
     956
     957        /*
     958         * Ensure the announcement of the start of a new GP (ie up-to-date
     959         * cur_gp) propagates to cpus that are just coming out of idle
     960         * mode before we sample their idle state flag.
     961         *
     962         * Cpus guarantee that after they set CPU->idle = true they will not
     963         * execute any RCU reader sections without first setting idle to
     964         * false and issuing a memory barrier. Therefore, if rm_quiescent_cpus()
     965         * later on sees an idle cpu, but the cpu is just exiting its idle mode,
     966         * the cpu must not have yet executed its memory barrier (otherwise
     967         * it would pair up with this mem barrier and we would see idle == false).
     968         * That memory barrier will pair up with the one below and ensure
     969         * that a reader on the now-non-idle cpu will see the most current
     970         * cur_gp. As a result, such a reader will never attempt to semaphore_up(
     971         * remaining_readers) during this GP, which allows the detector to
     972         * ignore that cpu (the detector thinks it is idle). Moreover, any
     973         * changes made by RCU updaters will have propagated to readers
     974         * on the previously idle cpu -- again thanks to issuing a memory
     975         * barrier after returning from idle mode.
     976         *
     977         * idle -> non-idle cpu      | detector      | reclaimer
     978         * ------------------------------------------------------
     979         * rcu reader 1              |               | rcu_call()
     980         * MB X                      |               |
     981         * idle = true               |               | rcu_call()
     982         * (no rcu readers allowed ) |               | MB A in advance_cbs()
     983         * MB Y                      | (...)         | (...)
     984         * (no rcu readers allowed)  |               | MB B in advance_cbs()
     985         * idle = false              | ++cur_gp      |
     986         * (no rcu readers allowed)  | MB C          |
     987         * MB Z                      | signal gp_end |
     988         * rcu reader 2              |               | exec_cur_cbs()
     989         *
     990         *
     991         * MB Y orders visibility of changes to idle for detector's sake.
     992         *
     993         * MB Z pairs up with MB C. The cpu making a transition from idle
     994         * will see the most current value of cur_gp and will not attempt
     995         * to notify the detector even if preempted during this GP.
     996         *
     997         * MB Z pairs up with MB A from the previous batch. Updaters' changes
     998         * are visible to reader 2 even when the detector thinks the cpu is idle
     999         * but it is not anymore.
     1000         *
     1001         * MB X pairs up with MB B. Late mem accesses of reader 1 are contained
     1002         * and visible before idling and before any callbacks are executed
     1003         * by reclaimers.
     1004         *
     1005         * In summary, the detector does not know of or wait for reader 2, but
     1006         * it does not have to since it is a new reader that will not access
     1007         * data from previous GPs and will see any changes.
     1008         */
     1009        memory_barrier(); /* MB C */
     1010       
     1011        /*
     1012         * Give readers time to pass through a QS. Also, batch arriving
     1013         * callbacks in order to amortize detection overhead.
     1014         */
     1015        if (!gp_sleep())
     1016                return false;
     1017       
     1018        /* Non-intrusively determine which cpus have yet to pass a QS. */
     1019        rm_quiescent_cpus(reading_cpus);
     1020       
     1021        /* Actively interrupt cpus delaying the current GP and demand a QS. */
     1022        interrupt_delaying_cpus(reading_cpus);
     1023       
     1024        /* Wait for the interrupted cpus to notify us that they reached a QS. */
     1025        if (!wait_for_delaying_cpus())
     1026                return false;
     1027        /*
     1028         * All cpus recorded a QS or are still idle. Any new readers will be added
      1029         * to next_preempted if preempted, i.e. the number of readers in
      1030         * cur_preempted monotonically decreases.
     1031         */
     1032       
     1033        /* Wait for the last reader in cur_preempted to notify us it is done. */
     1034        if (!wait_for_preempt_reader())
     1035                return false;
     1036       
     1037        return true;
     1038}
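
/*
 * For context, a sketch of the reader/updater pairing that the protocol
 * above serves (illustration only; the accessor/unlink helpers are
 * hypothetical and the exact rcu_call() prototype is assumed from the
 * accompanying rcu.h rather than shown in this hunk):
 *
 *   typedef struct {
 *           rcu_item_t rcu;   // first member, so a plain cast suffices
 *           int payload;
 *   } item_t;
 *
 *   static void reclaim(rcu_item_t *rcu_item)
 *   {
 *           // Runs only after all preexisting readers have finished.
 *           free((item_t *) rcu_item);
 *   }
 *
 *   // Reader side: a non-blocking read-side critical section.
 *   rcu_read_lock();
 *   item_t *it = lookup_shared_item();   // hypothetical accessor
 *   use(it);                             // hypothetical
 *   rcu_read_unlock();
 *
 *   // Updater side: unlink first, then defer reclamation past a GP.
 *   unlink_shared_item(it);              // hypothetical
 *   rcu_call(&it->rcu, reclaim);
 */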
     1039
     1040/** Remove those cpus from the mask that have already passed a quiescent
     1041 * state since the start of the current grace period.
     1042 */
     1043static void rm_quiescent_cpus(cpu_mask_t *cpu_mask)
     1044{
     1045        cpu_mask_for_each(*cpu_mask, cpu_id) {
     1046                /*
     1047                 * The cpu already checked for and passed through a quiescent
     1048                 * state since the beginning of this GP.
     1049                 *
      1050                 * rcu.cur_gp is modified by the detector thread only.
     1051                 * Therefore, it is up-to-date even without a lock.
     1052                 */
     1053                bool cpu_acked_gp = (cpus[cpu_id].rcu.last_seen_gp == rcu.cur_gp);
     1054               
     1055                /*
     1056                 * Either the cpu is idle or it is exiting away from idle mode
     1057                 * and already sees the most current rcu.cur_gp. See comment
     1058                 * in wait_for_readers().
     1059                 */
     1060                bool cpu_idle = cpus[cpu_id].idle;
     1061               
     1062                if (cpu_acked_gp || cpu_idle) {
     1063                        cpu_mask_reset(cpu_mask, cpu_id);
     1064                }
     1065        }
     1066}
     1067
     1068/** Sleeps a while if the current grace period is not to be expedited. */
     1069static bool gp_sleep(void)
     1070{
     1071        spinlock_lock(&rcu.gp_lock);
     1072
     1073        int ret = 0;
     1074        while (0 == rcu.req_expedited_cnt && 0 == ret) {
     1075                /* minor bug: sleeps for the same duration if woken up spuriously. */
     1076                ret = _condvar_wait_timeout_spinlock(&rcu.expedite_now, &rcu.gp_lock,
     1077                        DETECT_SLEEP_MS * 1000, SYNCH_FLAGS_INTERRUPTIBLE);
     1078        }
     1079       
     1080        if (0 < rcu.req_expedited_cnt) {
     1081                --rcu.req_expedited_cnt;
     1082                /* Update statistic. */
     1083                ++rcu.stat_expedited_cnt;
     1084        }
     1085       
     1086        spinlock_unlock(&rcu.gp_lock);
     1087       
     1088        return (ret != ESYNCH_INTERRUPTED);
     1089}
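
/*
 * Sketch of the requesting side that gp_sleep() reacts to (illustration
 * only; the real caller lives elsewhere in this changeset and may differ):
 *
 *   spinlock_lock(&rcu.gp_lock);
 *   ++rcu.req_expedited_cnt;              // ask to cut the sleep short
 *   condvar_signal(&rcu.expedite_now);    // wake the sleeping detector
 *   spinlock_unlock(&rcu.gp_lock);
 */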
     1090
     1091/** Actively interrupts and checks the offending cpus for quiescent states. */
     1092static void interrupt_delaying_cpus(cpu_mask_t *cpu_mask)
     1093{
      1094        const size_t max_concurrent_calls = 16;
      1095        smp_call_t call[max_concurrent_calls];
     1096        size_t outstanding_calls = 0;
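        /*
         * Note: call[] is allocated on this thread's stack, so the batch
         * size both bounds stack usage and caps the number of IPIs kept
         * in flight before the loop below waits and reuses the slots.
         */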
     1097       
     1098        atomic_set(&rcu.delaying_cpu_cnt, 0);
     1099       
     1100        cpu_mask_for_each(*cpu_mask, cpu_id) {
     1101                smp_call_async(cpu_id, sample_local_cpu, 0, &call[outstanding_calls]);
     1102                ++outstanding_calls;
     1103
     1104                /* Update statistic. */
     1105                if (CPU->id != cpu_id)
     1106                        ++rcu.stat_smp_call_cnt;
     1107               
      1108                if (outstanding_calls == max_concurrent_calls) {
     1109                        for (size_t k = 0; k < outstanding_calls; ++k) {
     1110                                smp_call_wait(&call[k]);
     1111                        }
     1112                       
     1113                        outstanding_calls = 0;
     1114                }
     1115        }
     1116       
     1117        for (size_t k = 0; k < outstanding_calls; ++k) {
     1118                smp_call_wait(&call[k]);
     1119        }
     1120}
     1121
     1122/** Invoked on a cpu delaying grace period detection.
     1123 *
      1124 * Induces a quiescent state for the cpu or instructs any remaining
     1125 * readers to notify the detector once they finish.
     1126 */
     1127static void sample_local_cpu(void *arg)
     1128{
     1129        ASSERT(interrupts_disabled());
     1130        ASSERT(!CPU->rcu.is_delaying_gp);
     1131       
     1132        /* Cpu did not pass a quiescent state yet. */
     1133        if (CPU->rcu.last_seen_gp != rcu.cur_gp) {
     1134                /* Interrupted a reader in a reader critical section. */
     1135                if (0 < (*CPU->rcu.pnesting_cnt)) {
     1136                        ASSERT(!CPU->idle);
     1137                        /* Note to notify the detector from rcu_read_unlock(). */
     1138                        CPU->rcu.is_delaying_gp = true;
     1139                        atomic_inc(&rcu.delaying_cpu_cnt);
     1140                } else {
     1141                        /*
     1142                         * The cpu did not enter any rcu reader sections since
     1143                         * the start of the current GP. Record a quiescent state.
     1144                         *
     1145                         * Or, we interrupted rcu_read_unlock_impl() right before
     1146                         * it recorded a QS. Record a QS for it. The memory barrier
     1147                         * contains the reader section's mem accesses before
     1148                         * updating last_seen_gp.
     1149                         *
     1150                         * Or, we interrupted rcu_read_lock() right after it recorded
     1151                         * a QS for the previous GP but before it got a chance to
     1152                         * increment its nesting count. The memory barrier again
     1153                         * stops the CS code from spilling out of the CS.
     1154                         */
     1155                        memory_barrier();
     1156                        CPU->rcu.last_seen_gp = rcu.cur_gp;
     1157                }
     1158        } else {
     1159                /*
     1160                 * This cpu already acknowledged that it had passed through
     1161                 * a quiescent state since the start of cur_gp.
     1162                 */
     1163        }
     1164       
     1165        /*
     1166         * smp_call() makes sure any changes propagate back to the caller.
     1167         * In particular, it makes the most current last_seen_gp visible
     1168         * to the detector.
     1169         */
     1170}
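
/*
 * Reader-side counterpart (sketch): when the interrupted reader finally
 * drops its nesting count to zero, rcu_read_unlock() is expected to act
 * on the flag set above, roughly as follows (the real rcu_read_unlock()/
 * rcu_read_unlock_impl() is outside this hunk and may differ in detail):
 *
 *   if (0 == --(*CPU->rcu.pnesting_cnt) && CPU->rcu.is_delaying_gp) {
 *           CPU->rcu.is_delaying_gp = false;
 *           semaphore_up(&rcu.remaining_readers);
 *   }
 */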
     1171
     1172/** Waits for cpus delaying the current grace period if there are any. */
     1173static bool wait_for_delaying_cpus(void)
     1174{
     1175        int delaying_cpu_cnt = atomic_get(&rcu.delaying_cpu_cnt);
     1176
      1177        for (int i = 0; i < delaying_cpu_cnt; ++i) {
     1178                if (!semaphore_down_interruptable(&rcu.remaining_readers))
     1179                        return false;
     1180        }
     1181       
     1182        /* Update statistic. */
     1183        rcu.stat_delayed_cnt += delaying_cpu_cnt;
     1184       
     1185        return true;
     1186}
     1187
      1188/** Waits for any preempted readers blocking this grace period to finish. */
     1189static bool wait_for_preempt_reader(void)
     1190{
     1191        irq_spinlock_lock(&rcu.preempt_lock, true);
     1192
     1193        bool reader_exists = !list_empty(&rcu.cur_preempted);
     1194        rcu.preempt_blocking_det = reader_exists;
     1195       
     1196        irq_spinlock_unlock(&rcu.preempt_lock, true);
     1197       
     1198        if (reader_exists) {
     1199                /* Update statistic. */
     1200                ++rcu.stat_preempt_blocking_cnt;
     1201               
     1202                return semaphore_down_interruptable(&rcu.remaining_readers);
     1203        }       
     1204       
     1205        return true;
     1206}
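
/*
 * Sketch of the hand-off this wait relies on (illustration only): a
 * preempted reader leaving its critical section presumably removes itself
 * from cur_preempted under preempt_lock and, if it was the last one while
 * preempt_blocking_det was set, releases the detector:
 *
 *   irq_spinlock_lock(&rcu.preempt_lock, true);
 *   list_remove(&THREAD->rcu.preempt_link);
 *   bool last_one = list_empty(&rcu.cur_preempted) && rcu.preempt_blocking_det;
 *   if (last_one)
 *           rcu.preempt_blocking_det = false;
 *   irq_spinlock_unlock(&rcu.preempt_lock, true);
 *   if (last_one)
 *           semaphore_up(&rcu.remaining_readers);
 */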
     1207
     1208/** Called by the scheduler() when switching away from the current thread. */
     1209void rcu_after_thread_ran(void)
     1210{
     1211        ASSERT(interrupts_disabled());
     1212        ASSERT(CPU->rcu.pnesting_cnt == &THREAD->rcu.nesting_cnt);
     1213       
     1214        /* Preempted a reader critical section for the first time. */
     1215        if (0 < THREAD->rcu.nesting_cnt && !THREAD->rcu.was_preempted) {
     1216                THREAD->rcu.was_preempted = true;
     1217               
     1218                irq_spinlock_lock(&rcu.preempt_lock, false);
     1219               
     1220                if (CPU->rcu.last_seen_gp != rcu.cur_gp) {
      1221                        /* The reader started before the GP started - we must wait for it. */
     1222                        list_append(&THREAD->rcu.preempt_link, &rcu.cur_preempted);
     1223                } else {
     1224                        /*
     1225                         * The reader started after the GP started and this cpu
     1226                         * already noted a quiescent state. We might block the next GP.
     1227                         */
     1228                        list_append(&THREAD->rcu.preempt_link, &rcu.next_preempted);
     1229                }
     1230
     1231                irq_spinlock_unlock(&rcu.preempt_lock, false);
     1232        }
     1233       
     1234        /*
     1235         * The preempted reader has been noted globally. There are therefore
     1236         * no readers running on this cpu so this is a quiescent state.
     1237         */
     1238        record_qs();
     1239
     1240        /*
     1241         * This cpu is holding up the current GP. Let the detector know
     1242         * it has just passed a quiescent state.
     1243         *
     1244         * The detector waits separately for preempted readers, so we have
     1245         * to notify the detector even if we have just preempted a reader.
     1246         */
     1247        if (CPU->rcu.is_delaying_gp) {
     1248                CPU->rcu.is_delaying_gp = false;
     1249                semaphore_up(&rcu.remaining_readers);
     1250        }
     1251       
     1252        /*
     1253         * After this point THREAD is 0 and stays 0 until the scheduler()
     1254         * switches to a new thread. Use a temporary nesting counter for readers
     1255         * in handlers of interrupts that are raised while idle in the scheduler.
     1256         */
     1257        CPU->rcu.pnesting_cnt = &CPU->rcu.tmp_nesting_cnt;
     1258
     1259        /*
     1260         * Forcefully associate the detector with the highest priority
     1261         * even if preempted due to its time slice running out.
     1262         *
      1263         * TODO: Replace with strict scheduler priority classes.
     1264         */
     1265        if (THREAD == rcu.detector_thr) {
     1266                THREAD->priority = -1;
     1267        }
     1268}
     1269
     1270/** Called by the scheduler() when switching to a newly scheduled thread. */
     1271void rcu_before_thread_runs(void)
     1272{
     1273        ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
     1274        ASSERT(&CPU->rcu.tmp_nesting_cnt == CPU->rcu.pnesting_cnt);
     1275       
     1276        CPU->rcu.pnesting_cnt = &THREAD->rcu.nesting_cnt;
     1277}
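
/*
 * Expected wiring in scheduler(), roughly (sketch; the actual call sites
 * are outside this hunk): rcu_after_thread_ran() runs with interrupts
 * disabled while THREAD still refers to the outgoing thread, and
 * rcu_before_thread_runs() runs with preemption (or interrupts) disabled
 * once the next thread has been picked:
 *
 *   rcu_after_thread_ran();     // THREAD == outgoing thread
 *   // ... THREAD becomes NULL, the scheduler selects the next thread ...
 *   rcu_before_thread_runs();   // THREAD == incoming thread
 */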
     1278
     1279
     1280/** Prints RCU run-time statistics. */
     1281void rcu_print_stat(void)
     1282{
     1283        /* Don't take locks. Worst case is we get out-dated values. */
     1284        printf("Configuration: expedite_threshold=%d, detect_sleep=%dms\n",
     1285                EXPEDITE_THRESHOLD, DETECT_SLEEP_MS);
     1286        printf("Completed GPs: %" PRIu64 "\n", rcu.completed_gp);
     1287        printf("Expedited GPs: %zu\n", rcu.stat_expedited_cnt);
     1288        printf("Delayed GPs:   %zu (cpus w/ still running readers after gp sleep)\n",
     1289                rcu.stat_delayed_cnt);
     1290        printf("Preempt blocked GPs: %zu (waited for preempted readers; "
      1291                "running or not)\n", rcu.stat_preempt_blocking_cnt);
     1292       
     1293        printf("Max callbacks per GP:\n");
     1294        for (unsigned i = 0; i < config.cpu_count; ++i) {
     1295                printf(" %zu", cpus[i].rcu.stat_max_cbs);
     1296        }
     1297
     1298        printf("\nAvg callbacks per GP (nonempty batches only):\n");
     1299        for (unsigned i = 0; i < config.cpu_count; ++i) {
     1300                printf(" %zu", cpus[i].rcu.stat_avg_cbs);
     1301        }
     1302       
     1303        printf("\nMissed GP notifications:\n");
     1304        for (unsigned i = 0; i < config.cpu_count; ++i) {
     1305                printf(" %zu", cpus[i].rcu.stat_missed_gps);
     1306        }
     1307        printf("\n");
     1308}
    771309
    781310/** @}
    791311 */
    80  
    81 