source: mainline/kernel/generic/src/synch/rcu.c@ d4d36f9

Last change on this file was d4d36f9, checked in by Adam Hraska <adam.hraska+hos@…>, 13 years ago

rcu: Added another preemptible kernel rcu - A-RCU.

[79d74fe]1/*
2 * Copyright (c) 2012 Adam Hraska
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30/** @addtogroup sync
31 * @{
32 */
33
34/**
35 * @file
[181a746]36 * @brief Preemptible read-copy update. Usable from interrupt handlers.
[79d74fe]37 */
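/*
 * Illustrative usage sketch, not part of the implementation below. It shows
 * the intended reader/updater pattern: readers access shared data between
 * rcu_read_lock() and rcu_read_unlock(), while an updater unlinks an item
 * and defers its reclamation with rcu_call(). The item_t type, the lookup
 * function and the allocator are hypothetical placeholders.
 *
 *   typedef struct item {
 *       link_t link;      // membership in an RCU-protected list
 *       rcu_item_t rcu;   // used by rcu_call() to track the callback
 *       int value;
 *   } item_t;
 *
 *   static void free_item(rcu_item_t *rcu_item)
 *   {
 *       item_t *it = member_to_inst(rcu_item, item_t, rcu);
 *       free(it);   // assuming the item came from malloc()
 *   }
 *
 *   // Reader side - may run in a thread or an interrupt handler:
 *   rcu_read_lock();
 *   item_t *it = lookup_item(key);   // hypothetical lookup of shared data
 *   if (it) {
 *       int v = it->value;           // safe to use until rcu_read_unlock()
 *       (void) v;
 *   }
 *   rcu_read_unlock();
 *
 *   // Updater side - unlink first, then defer freeing until all
 *   // preexisting readers have finished:
 *   list_remove(&doomed->link);
 *   rcu_call(&doomed->rcu, free_item);
 */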
38
39#include <synch/rcu.h>
[181a746]40#include <synch/condvar.h>
41#include <synch/semaphore.h>
42#include <synch/spinlock.h>
[4ec9ea41]43#include <synch/mutex.h>
[181a746]44#include <proc/thread.h>
45#include <cpu/cpu_mask.h>
46#include <cpu.h>
47#include <smp/smp_call.h>
48#include <compiler/barrier.h>
49#include <atomic.h>
50#include <arch.h>
51#include <macros.h>
[79d74fe]52
[181a746]53/*
54 * Number of milliseconds to give to preexisting readers to finish
55 * when non-expedited grace period detection is in progress.
56 */
[0cf813d]57#define DETECT_SLEEP_MS 10
[181a746]58/*
59 * Max number of pending callbacks in the local cpu's queue before
60 * aggressively expediting the current grace period
61 */
[0cf813d]62#define EXPEDITE_THRESHOLD 2000
63/*
64 * Max number of callbacks to execute in one go with preemption
65 * enabled. If there are more callbacks to be executed they will
66 * be run with preemption disabled in order to prolong reclaimer's
67 * time slice and give it a chance to catch up with callback producers.
68 */
69#define CRITICAL_THRESHOLD 30000
[181a746]70/* Half the number of values a uint32 can hold. */
71#define UINT32_MAX_HALF 2147483648U
[79d74fe]72
[8e3ed06]73/**
74 * The current grace period number. Increases monotonically.
75 * Lock rcu.gp_lock or rcu.preempt_lock to get a current value.
76 */
77rcu_gp_t _rcu_cur_gp;
[181a746]78
[0cf813d]79/** Global RCU data. */
[181a746]80typedef struct rcu_data {
81 /** The detector uses this to signal reclaimers that a grace period ended. */
82 condvar_t gp_ended;
83 /** Reclaimers use this to notify the detector to accelerate GP detection. */
84 condvar_t expedite_now;
85 /**
[d4d36f9]86 * Protects: req_gp_end_cnt, req_expedited_cnt, completed_gp, _rcu_cur_gp;
87 * or: completed_gp, _rcu_cur_gp
[181a746]88 */
89 SPINLOCK_DECLARE(gp_lock);
90 /**
[8e3ed06]91 * The number of the most recently completed grace period. At most
92 * one behind _rcu_cur_gp. If equal to _rcu_cur_gp, a grace period
93 * detection is not in progress and the detector is idle.
[181a746]94 */
95 rcu_gp_t completed_gp;
96
[0cf813d]97 /** Protects the following 3 fields. */
[181a746]98 IRQ_SPINLOCK_DECLARE(preempt_lock);
99 /** Preexisting readers that have been preempted. */
100 list_t cur_preempted;
101 /** Readers that have been preempted and might delay the next grace period. */
102 list_t next_preempted;
103 /**
104 * The detector is waiting for the last preempted reader
105 * in cur_preempted to announce that it exited its reader
106 * section by up()ing remaining_readers.
107 */
108 bool preempt_blocking_det;
109
[d4d36f9]110#ifdef RCU_PREEMPT_A
111
112 /**
113 * The detector waits on this semaphore for any preempted readers
114 * delaying the grace period once all cpus pass a quiescent state.
115 */
116 semaphore_t remaining_readers;
117
118#elif defined(RCU_PREEMPT_PODZIMEK)
119
120 /** Reclaimers notify the detector when they request more grace periods.*/
121 condvar_t req_gp_changed;
122 /** Number of grace period ends the detector was requested to announce. */
123 size_t req_gp_end_cnt;
124 /** Number of consecutive grace periods to detect quickly and aggressively.*/
125 size_t req_expedited_cnt;
[181a746]126 /**
127 * Number of cpus with readers that are delaying the current GP.
128 * They will up() remaining_readers.
129 */
130 atomic_t delaying_cpu_cnt;
[d4d36f9]131 /**
132 * The detector waits on this semaphore for any readers delaying the GP.
133 *
134 * Each of the cpus with readers that are delaying the current GP
135 * must up() this sema once they reach a quiescent state. If there
136 * are any readers in cur_preempted (ie preempted preexisting) and
137 * they are already delaying GP detection, the last to unlock its
138 * reader section must up() this sema once.
139 */
140 semaphore_t remaining_readers;
141#endif
[181a746]142
[4ec9ea41]143 /** Excludes simultaneous rcu_barrier() calls. */
144 mutex_t barrier_mtx;
145 /** Number of cpus that we are waiting for to complete rcu_barrier(). */
146 atomic_t barrier_wait_cnt;
147 /** rcu_barrier() waits for the completion of barrier callbacks on this wq.*/
148 waitq_t barrier_wq;
149
[0cf813d]150 /** Interruptible attached detector thread pointer. */
[181a746]151 thread_t *detector_thr;
152
153 /* Some statistics. */
154 size_t stat_expedited_cnt;
155 size_t stat_delayed_cnt;
156 size_t stat_preempt_blocking_cnt;
157 /* Does not contain self/local calls. */
158 size_t stat_smp_call_cnt;
159} rcu_data_t;
160
161
162static rcu_data_t rcu;
163
164static void start_reclaimers(void);
165static void synch_complete(rcu_item_t *rcu_item);
[4ec9ea41]166static void add_barrier_cb(void *arg);
167static void barrier_complete(rcu_item_t *barrier_item);
[181a746]168static bool arriving_cbs_empty(void);
169static bool next_cbs_empty(void);
170static bool cur_cbs_empty(void);
171static bool all_cbs_empty(void);
172static void reclaimer(void *arg);
173static bool wait_for_pending_cbs(void);
174static bool advance_cbs(void);
175static void exec_completed_cbs(rcu_gp_t last_completed_gp);
176static void exec_cbs(rcu_item_t **phead);
177static bool wait_for_cur_cbs_gp_end(bool expedite, rcu_gp_t *last_completed_gp);
[0cf813d]178static void upd_missed_gp_in_wait(rcu_gp_t completed_gp);
[d4d36f9]179
180#ifdef RCU_PREEMPT_PODZIMEK
181static void start_detector(void);
182static void read_unlock_impl(size_t *pnesting_cnt);
183static void req_detection(size_t req_cnt);
[181a746]184static bool cv_wait_for_gp(rcu_gp_t wait_on_gp);
185static void detector(void *);
186static bool wait_for_detect_req(void);
187static void end_cur_gp(void);
188static bool wait_for_readers(void);
189static bool gp_sleep(void);
190static void interrupt_delaying_cpus(cpu_mask_t *cpu_mask);
191static bool wait_for_delaying_cpus(void);
[d4d36f9]192#elif defined(RCU_PREEMPT_A)
193static bool wait_for_readers(bool expedite);
194static bool gp_sleep(bool *expedite);
195#endif
196
197static void start_new_gp(void);
198static void rm_quiescent_cpus(cpu_mask_t *cpu_mask);
199static void sample_cpus(cpu_mask_t *reader_cpus, void *arg);
200static void sample_local_cpu(void *);
[181a746]201static bool wait_for_preempt_reader(void);
[d4d36f9]202static void note_preempted_reader(void);
203static void rm_preempted_reader(void);
[0cf813d]204static void upd_max_cbs_in_slice(void);
[181a746]205
206
207
208/** Initializes global RCU structures. */
209void rcu_init(void)
210{
211 condvar_initialize(&rcu.gp_ended);
212 condvar_initialize(&rcu.expedite_now);
[d4d36f9]213
[181a746]214 spinlock_initialize(&rcu.gp_lock, "rcu.gp_lock");
[8e3ed06]215 _rcu_cur_gp = 0;
[181a746]216 rcu.completed_gp = 0;
217
218 irq_spinlock_initialize(&rcu.preempt_lock, "rcu.preempt_lock");
219 list_initialize(&rcu.cur_preempted);
220 list_initialize(&rcu.next_preempted);
221 rcu.preempt_blocking_det = false;
222
[4ec9ea41]223 mutex_initialize(&rcu.barrier_mtx, MUTEX_PASSIVE);
224 atomic_set(&rcu.barrier_wait_cnt, 0);
225 waitq_initialize(&rcu.barrier_wq);
[d4d36f9]226
227 semaphore_initialize(&rcu.remaining_readers, 0);
228
229#ifdef RCU_PREEMPT_PODZIMEK
230 condvar_initialize(&rcu.req_gp_changed);
[4ec9ea41]231
[d4d36f9]232 rcu.req_gp_end_cnt = 0;
233 rcu.req_expedited_cnt = 0;
[181a746]234 atomic_set(&rcu.delaying_cpu_cnt, 0);
[d4d36f9]235#endif
[181a746]236
237 rcu.detector_thr = 0;
238
239 rcu.stat_expedited_cnt = 0;
240 rcu.stat_delayed_cnt = 0;
241 rcu.stat_preempt_blocking_cnt = 0;
242 rcu.stat_smp_call_cnt = 0;
243}
244
245/** Initializes per-CPU RCU data. If run on the boot cpu, also initializes global data. */
246void rcu_cpu_init(void)
247{
248 if (config.cpu_active == 1) {
249 rcu_init();
250 }
[d4d36f9]251
[181a746]252 CPU->rcu.last_seen_gp = 0;
[d4d36f9]253
254#ifdef RCU_PREEMPT_PODZIMEK
[5b03a72]255 CPU->rcu.nesting_cnt = 0;
[d4d36f9]256 CPU->rcu.is_delaying_gp = false;
257 CPU->rcu.signal_unlock = false;
258#endif
[181a746]259
260 CPU->rcu.cur_cbs = 0;
[0cf813d]261 CPU->rcu.cur_cbs_cnt = 0;
[181a746]262 CPU->rcu.next_cbs = 0;
[0cf813d]263 CPU->rcu.next_cbs_cnt = 0;
[181a746]264 CPU->rcu.arriving_cbs = 0;
265 CPU->rcu.parriving_cbs_tail = &CPU->rcu.arriving_cbs;
266 CPU->rcu.arriving_cbs_cnt = 0;
267
268 CPU->rcu.cur_cbs_gp = 0;
269 CPU->rcu.next_cbs_gp = 0;
270
271 semaphore_initialize(&CPU->rcu.arrived_flag, 0);
[0cf813d]272
273 /* BSP creates reclaimer threads before AP's rcu_cpu_init() runs. */
274 if (config.cpu_active == 1)
275 CPU->rcu.reclaimer_thr = 0;
[181a746]276
277 CPU->rcu.stat_max_cbs = 0;
278 CPU->rcu.stat_avg_cbs = 0;
279 CPU->rcu.stat_missed_gps = 0;
[0cf813d]280 CPU->rcu.stat_missed_gp_in_wait = 0;
281 CPU->rcu.stat_max_slice_cbs = 0;
282 CPU->rcu.last_arriving_cnt = 0;
[181a746]283}
284
285/** Completes RCU init. Creates and runs the detector and reclaimer threads.*/
286void rcu_kinit_init(void)
287{
[d4d36f9]288#ifdef RCU_PREEMPT_PODZIMEK
[181a746]289 start_detector();
[d4d36f9]290#endif
291
[181a746]292 start_reclaimers();
293}
294
295/** Initializes any per-thread RCU structures. */
296void rcu_thread_init(thread_t *thread)
297{
298 thread->rcu.nesting_cnt = 0;
[d4d36f9]299
300#ifdef RCU_PREEMPT_PODZIMEK
[181a746]301 thread->rcu.was_preempted = false;
[d4d36f9]302#endif
303
[181a746]304 link_initialize(&thread->rcu.preempt_link);
305}
306
[79d74fe]307
[181a746]308/** Cleans up global RCU resources and stops dispatching callbacks.
309 *
310 * Call when shutting down the kernel. Outstanding callbacks will
311 * not be processed. Instead they will linger forever.
312 */
313void rcu_stop(void)
314{
315 /* Stop and wait for reclaimers. */
316 for (unsigned int cpu_id = 0; cpu_id < config.cpu_active; ++cpu_id) {
317 ASSERT(cpus[cpu_id].rcu.reclaimer_thr != 0);
318
319 if (cpus[cpu_id].rcu.reclaimer_thr) {
320 thread_interrupt(cpus[cpu_id].rcu.reclaimer_thr);
321 thread_join(cpus[cpu_id].rcu.reclaimer_thr);
322 thread_detach(cpus[cpu_id].rcu.reclaimer_thr);
323 cpus[cpu_id].rcu.reclaimer_thr = 0;
324 }
325 }
326
[d4d36f9]327#ifdef RCU_PREEMPT_PODZIMEK
[181a746]328 /* Stop the detector and wait. */
329 if (rcu.detector_thr) {
330 thread_interrupt(rcu.detector_thr);
331 thread_join(rcu.detector_thr);
332 thread_detach(rcu.detector_thr);
333 rcu.detector_thr = 0;
334 }
[d4d36f9]335#endif
[181a746]336}
[79d74fe]337
[d4d36f9]338/** Returns the number of elapsed grace periods since boot. */
339uint64_t rcu_completed_gps(void)
[181a746]340{
[d4d36f9]341 spinlock_lock(&rcu.gp_lock);
342 uint64_t completed = rcu.completed_gp;
343 spinlock_unlock(&rcu.gp_lock);
[181a746]344
[d4d36f9]345 return completed;
[181a746]346}
347
348/** Creates and runs cpu-bound reclaimer threads. */
349static void start_reclaimers(void)
350{
351 for (unsigned int cpu_id = 0; cpu_id < config.cpu_count; ++cpu_id) {
352 char name[THREAD_NAME_BUFLEN] = {0};
353
354 snprintf(name, THREAD_NAME_BUFLEN - 1, "rcu-rec/%u", cpu_id);
355
356 cpus[cpu_id].rcu.reclaimer_thr =
357 thread_create(reclaimer, 0, TASK, THREAD_FLAG_NONE, name);
358
359 if (!cpus[cpu_id].rcu.reclaimer_thr)
360 panic("Failed to create RCU reclaimer thread on cpu%u.", cpu_id);
361
362 thread_wire(cpus[cpu_id].rcu.reclaimer_thr, &cpus[cpu_id]);
363 thread_ready(cpus[cpu_id].rcu.reclaimer_thr);
364 }
365}
366
[d4d36f9]367#ifdef RCU_PREEMPT_PODZIMEK
368
369/** Starts the detector thread. */
370static void start_detector(void)
[181a746]371{
[d4d36f9]372 rcu.detector_thr =
373 thread_create(detector, 0, TASK, THREAD_FLAG_NONE, "rcu-det");
[181a746]374
[d4d36f9]375 if (!rcu.detector_thr)
376 panic("Failed to create RCU detector thread.");
377
378 thread_ready(rcu.detector_thr);
[181a746]379}
380
[4a6da62]381/** Returns true if in an rcu reader section. */
382bool rcu_read_locked(void)
383{
384 preemption_disable();
[5b03a72]385 bool locked = 0 < CPU->rcu.nesting_cnt;
[4a6da62]386 preemption_enable();
387
388 return locked;
389}
390
[181a746]391/** Unlocks the local reader section using the given nesting count.
392 *
393 * Preemption or interrupts must be disabled.
394 *
395 * @param pnesting_cnt Either &CPU->rcu.tmp_nesting_cnt or
396 * &THREAD->rcu.nesting_cnt.
[79d74fe]397 */
[8e3ed06]398static void read_unlock_impl(size_t *pnesting_cnt)
[181a746]399{
400 ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
401
402 if (0 == --(*pnesting_cnt)) {
[8e3ed06]403 _rcu_record_qs();
[181a746]404
405 /*
406 * The thread was preempted while in a critical section or
407 * the detector is eagerly waiting for this cpu's reader
408 * to finish.
409 *
410 * Note that THREAD may be 0 in scheduler() and not just during boot.
411 */
412 if ((THREAD && THREAD->rcu.was_preempted) || CPU->rcu.is_delaying_gp) {
413 /* Rechecks with disabled interrupts. */
[8e3ed06]414 _rcu_signal_read_unlock();
[181a746]415 }
416 }
417}
418
419/** If necessary, signals the detector that we exited a reader section. */
[8e3ed06]420void _rcu_signal_read_unlock(void)
[181a746]421{
422 ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
423
[f0fcb04]424 /* todo: make NMI safe with cpu-local atomic ops. */
425
[181a746]426 /*
427 * We have to disable interrupts in order to make checking
428 * and resetting was_preempted and is_delaying_gp atomic
429 * with respect to local interrupt handlers. Otherwise
430 * an interrupt could beat us to calling semaphore_up()
431 * before we reset the appropriate flag.
432 */
433 ipl_t ipl = interrupts_disable();
434
435 /*
436 * If the detector is eagerly waiting for this cpu's reader to unlock,
437 * notify it that the reader did so.
438 */
439 if (CPU->rcu.is_delaying_gp) {
440 CPU->rcu.is_delaying_gp = false;
441 semaphore_up(&rcu.remaining_readers);
442 }
443
444 /*
445 * This reader was preempted while in a reader section.
446 * We might be holding up the current GP. Notify the
447 * detector if so.
448 */
449 if (THREAD && THREAD->rcu.was_preempted) {
450 ASSERT(link_used(&THREAD->rcu.preempt_link));
451 THREAD->rcu.was_preempted = false;
452
[d4d36f9]453 rm_preempted_reader();
[181a746]454 }
[f0fcb04]455
456 /* If there was something to signal to the detector we have done so. */
457 CPU->rcu.signal_unlock = false;
458
[181a746]459 interrupts_restore(ipl);
460}
461
[d4d36f9]462#endif /* RCU_PREEMPT_PODZIMEK */
463
[181a746]464typedef struct synch_item {
465 waitq_t wq;
466 rcu_item_t rcu_item;
467} synch_item_t;
468
469/** Blocks until all preexisting readers exit their critical sections. */
[79d74fe]470void rcu_synchronize(void)
[4ec9ea41]471{
472 _rcu_synchronize(false);
473}
474
475/** Blocks until all preexisting readers exit their critical sections. */
476void rcu_synchronize_expedite(void)
477{
478 _rcu_synchronize(true);
479}
480
481/** Blocks until all preexisting readers exit their critical sections. */
482void _rcu_synchronize(bool expedite)
[79d74fe]483{
[181a746]484 /* Calling from a reader section will deadlock. */
[d4d36f9]485 ASSERT(!rcu_read_locked());
[181a746]486
487 synch_item_t completion;
488
489 waitq_initialize(&completion.wq);
[4ec9ea41]490 _rcu_call(expedite, &completion.rcu_item, synch_complete);
[181a746]491 waitq_sleep(&completion.wq);
492 waitq_complete_wakeup(&completion.wq);
[79d74fe]493}
494
[181a746]495/** rcu_synchronize's callback. */
496static void synch_complete(rcu_item_t *rcu_item)
497{
498 synch_item_t *completion = member_to_inst(rcu_item, synch_item_t, rcu_item);
499 ASSERT(completion);
500 waitq_wakeup(&completion->wq, WAKEUP_FIRST);
501}
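/*
 * Illustrative sketch of the typical rcu_synchronize() update pattern
 * (hypothetical shared_cfg pointer and config_t type; publication ordering
 * of the new pointer is elided for brevity): publish a new version, wait
 * for all preexisting readers, then reclaim the old one synchronously.
 *
 *   config_t *old_cfg = shared_cfg;
 *   shared_cfg = new_cfg;   // readers now start seeing the new version
 *   rcu_synchronize();      // wait until no reader can still hold old_cfg
 *   free(old_cfg);          // assuming it was allocated with malloc()
 *
 * rcu_synchronize_expedite() follows the same pattern but asks the detector
 * to finish the grace period aggressively instead of batching callbacks.
 */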
[79d74fe]502
[4ec9ea41]503/** Waits for all outstanding rcu calls to complete. */
504void rcu_barrier(void)
505{
506 /*
507 * Serialize rcu_barrier() calls so we don't overwrite cpu.barrier_item
508 * currently in use by rcu_barrier().
509 */
510 mutex_lock(&rcu.barrier_mtx);
511
512 /*
513 * Ensure we queue a barrier callback on all cpus before the already
514 * enqueued barrier callbacks start signaling completion.
515 */
516 atomic_set(&rcu.barrier_wait_cnt, 1);
517
518 DEFINE_CPU_MASK(cpu_mask);
519 cpu_mask_active(cpu_mask);
520
521 cpu_mask_for_each(*cpu_mask, cpu_id) {
522 smp_call(cpu_id, add_barrier_cb, 0);
523 }
524
525 if (0 < atomic_predec(&rcu.barrier_wait_cnt)) {
526 waitq_sleep(&rcu.barrier_wq);
527 }
528
529 mutex_unlock(&rcu.barrier_mtx);
530}
531
532/** Issues a rcu_barrier() callback on the local cpu.
533 *
534 * Executed with interrupts disabled.
535 */
536static void add_barrier_cb(void *arg)
537{
538 ASSERT(interrupts_disabled() || PREEMPTION_DISABLED);
539 atomic_inc(&rcu.barrier_wait_cnt);
540 rcu_call(&CPU->rcu.barrier_item, barrier_complete);
541}
542
543/** Local cpu's rcu_barrier() completion callback. */
544static void barrier_complete(rcu_item_t *barrier_item)
545{
546 /* Is this the last barrier callback completed? */
547 if (0 == atomic_predec(&rcu.barrier_wait_cnt)) {
548 /* Notify rcu_barrier() that we're done. */
549 waitq_wakeup(&rcu.barrier_wq, WAKEUP_FIRST);
550 }
551}
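/*
 * Illustrative sketch of when to use rcu_barrier() (hypothetical module
 * teardown): rcu_synchronize() only waits for preexisting readers, whereas
 * rcu_barrier() waits until every callback already queued via rcu_call()
 * has executed, so it is the right tool before freeing resources that
 * those callbacks may still touch.
 *
 *   // Stop queueing new callbacks for this subsystem first, then:
 *   rcu_barrier();           // all previously queued callbacks have run
 *   destroy_item_cache();    // hypothetical; safe to tear down now
 */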
552
[181a746]553/** Adds a callback to invoke after all preexisting readers finish.
554 *
555 * May be called from within interrupt handlers or RCU reader sections.
556 *
557 * @param rcu_item Used by RCU to track the call. Must remain
558 * until the user callback function is entered.
559 * @param func User callback function that will be invoked once a full
560 * grace period elapsed, ie at a time when all preexisting
561 * readers have finished. The callback should be short and must
562 * not block. If you must sleep, enqueue your work in the system
563 * work queue from the callback (ie workq_global_enqueue()).
[79d74fe]564 */
[181a746]565void rcu_call(rcu_item_t *rcu_item, rcu_func_t func)
[79d74fe]566{
[181a746]567 _rcu_call(false, rcu_item, func);
[79d74fe]568}
569
[181a746]570/** rcu_call() implementation. See rcu_call() for comments. */
571void _rcu_call(bool expedite, rcu_item_t *rcu_item, rcu_func_t func)
572{
573 ASSERT(rcu_item);
574
575 rcu_item->func = func;
576 rcu_item->next = 0;
577
578 preemption_disable();
579
580 ipl_t ipl = interrupts_disable();
[79d74fe]581
[181a746]582 *CPU->rcu.parriving_cbs_tail = rcu_item;
583 CPU->rcu.parriving_cbs_tail = &rcu_item->next;
584
585 size_t cnt = ++CPU->rcu.arriving_cbs_cnt;
586 interrupts_restore(ipl);
587
588 if (expedite) {
589 CPU->rcu.expedite_arriving = true;
590 }
591
592 /* Added first callback - notify the reclaimer. */
593 if (cnt == 1 && !semaphore_count_get(&CPU->rcu.arrived_flag)) {
594 semaphore_up(&CPU->rcu.arrived_flag);
595 }
596
597 preemption_enable();
598}
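/*
 * Illustrative sketch of a callback whose cleanup must block. Callbacks
 * passed to rcu_call() run in the reclaimer and must not sleep, so heavy
 * work is handed off to the system work queue as the comment above
 * suggests. The item_t type is hypothetical and the exact
 * workq_global_enqueue() signature is assumed here.
 *
 *   typedef struct item {
 *       rcu_item_t rcu;
 *       work_t work;
 *       // ... payload whose cleanup may block ...
 *   } item_t;
 *
 *   static void item_cleanup(work_t *work)
 *   {
 *       item_t *it = member_to_inst(work, item_t, work);
 *       // May block here: release backing store, wait on locks, ...
 *       free(it);
 *   }
 *
 *   static void item_reclaim(rcu_item_t *rcu_item)
 *   {
 *       // Runs after a full grace period; must not block.
 *       item_t *it = member_to_inst(rcu_item, item_t, rcu);
 *       workq_global_enqueue(&it->work, item_cleanup);
 *   }
 *
 *   // Updater:
 *   rcu_call(&doomed->rcu, item_reclaim);
 */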
599
600static bool cur_cbs_empty(void)
601{
602 ASSERT(THREAD && THREAD->wired);
603 return 0 == CPU->rcu.cur_cbs;
604}
605
606static bool next_cbs_empty(void)
607{
608 ASSERT(THREAD && THREAD->wired);
609 return 0 == CPU->rcu.next_cbs;
610}
611
612/** Disable interrupts to get an up-to-date result. */
613static bool arriving_cbs_empty(void)
614{
615 ASSERT(THREAD && THREAD->wired);
616 /*
617 * Accessing with interrupts enabled may at worst lead to
618 * a false negative if we race with a local interrupt handler.
619 */
620 return 0 == CPU->rcu.arriving_cbs;
621}
622
623static bool all_cbs_empty(void)
624{
625 return cur_cbs_empty() && next_cbs_empty() && arriving_cbs_empty();
626}
627
[d4d36f9]628
[181a746]629/** Reclaimer thread dispatches locally queued callbacks once a GP ends. */
630static void reclaimer(void *arg)
631{
632 ASSERT(THREAD && THREAD->wired);
[0cf813d]633 ASSERT(THREAD == CPU->rcu.reclaimer_thr);
[181a746]634
635 rcu_gp_t last_compl_gp = 0;
636 bool ok = true;
637
638 while (ok && wait_for_pending_cbs()) {
[0cf813d]639 ASSERT(CPU->rcu.reclaimer_thr == THREAD);
[181a746]640
[0cf813d]641 exec_completed_cbs(last_compl_gp);
642
[181a746]643 bool expedite = advance_cbs();
644
645 ok = wait_for_cur_cbs_gp_end(expedite, &last_compl_gp);
646 }
647}
648
649/** Waits until there are callbacks waiting to be dispatched. */
650static bool wait_for_pending_cbs(void)
651{
652 if (!all_cbs_empty())
653 return true;
654
655 bool ok = true;
656
657 while (arriving_cbs_empty() && ok) {
658 ok = semaphore_down_interruptable(&CPU->rcu.arrived_flag);
659 }
660
661 return ok;
662}
663
[b68ae24]664static void upd_stat_missed_gp(rcu_gp_t compl)
[181a746]665{
[b68ae24]666 if (CPU->rcu.cur_cbs_gp < compl) {
667 CPU->rcu.stat_missed_gps += (size_t)(compl - CPU->rcu.cur_cbs_gp);
[181a746]668 }
669}
670
671/** Executes all callbacks for the given completed grace period. */
672static void exec_completed_cbs(rcu_gp_t last_completed_gp)
673{
[b68ae24]674 upd_stat_missed_gp(last_completed_gp);
675
[0cf813d]676 /* Both next_cbs and cur_cbs GP elapsed. */
[181a746]677 if (CPU->rcu.next_cbs_gp <= last_completed_gp) {
[0cf813d]678 ASSERT(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
679
680 size_t exec_cnt = CPU->rcu.cur_cbs_cnt + CPU->rcu.next_cbs_cnt;
681
682 if (exec_cnt < CRITICAL_THRESHOLD) {
683 exec_cbs(&CPU->rcu.cur_cbs);
684 exec_cbs(&CPU->rcu.next_cbs);
685 } else {
686 /*
687 * Getting overwhelmed with too many callbacks to run.
688 * Disable preemption in order to prolong our time slice
689 * and catch up with updaters posting new callbacks.
690 */
691 preemption_disable();
692 exec_cbs(&CPU->rcu.cur_cbs);
693 exec_cbs(&CPU->rcu.next_cbs);
694 preemption_enable();
695 }
696
697 CPU->rcu.cur_cbs_cnt = 0;
698 CPU->rcu.next_cbs_cnt = 0;
699 } else if (CPU->rcu.cur_cbs_gp <= last_completed_gp) {
700
701 if (CPU->rcu.cur_cbs_cnt < CRITICAL_THRESHOLD) {
702 exec_cbs(&CPU->rcu.cur_cbs);
703 } else {
704 /*
705 * Getting overwhelmed with too many callbacks to run.
706 * Disable preemption in order to prolong our time slice
707 * and catch up with updaters posting new callbacks.
708 */
709 preemption_disable();
710 exec_cbs(&CPU->rcu.cur_cbs);
711 preemption_enable();
712 }
713
714 CPU->rcu.cur_cbs_cnt = 0;
[181a746]715 }
716}
717
718/** Executes callbacks in the single-linked list. The list is left empty. */
719static void exec_cbs(rcu_item_t **phead)
720{
721 rcu_item_t *rcu_item = *phead;
722
723 while (rcu_item) {
724 /* func() may free rcu_item. Get a local copy. */
725 rcu_item_t *next = rcu_item->next;
726 rcu_func_t func = rcu_item->func;
727
728 func(rcu_item);
729
730 rcu_item = next;
731 }
732
733 *phead = 0;
734}
735
736static void upd_stat_cb_cnts(size_t arriving_cnt)
737{
738 CPU->rcu.stat_max_cbs = max(arriving_cnt, CPU->rcu.stat_max_cbs);
739 if (0 < arriving_cnt) {
740 CPU->rcu.stat_avg_cbs =
741 (99 * CPU->rcu.stat_avg_cbs + 1 * arriving_cnt) / 100;
742 }
743}
744
745/** Prepares another batch of callbacks to dispatch at the next grace period.
746 *
747 * @return True if the next batch of callbacks must be expedited quickly.
[79d74fe]748 */
[181a746]749static bool advance_cbs(void)
750{
751 /* Move next_cbs to cur_cbs. */
752 CPU->rcu.cur_cbs = CPU->rcu.next_cbs;
[0cf813d]753 CPU->rcu.cur_cbs_cnt = CPU->rcu.next_cbs_cnt;
[181a746]754 CPU->rcu.cur_cbs_gp = CPU->rcu.next_cbs_gp;
755
756 /* Move arriving_cbs to next_cbs. Empties arriving_cbs. */
757 ipl_t ipl = interrupts_disable();
758
759 /*
760 * Too many callbacks queued. Better speed up the detection
761 * or risk exhausting all system memory.
762 */
763 bool expedite = (EXPEDITE_THRESHOLD < CPU->rcu.arriving_cbs_cnt)
764 || CPU->rcu.expedite_arriving;
765
766 CPU->rcu.expedite_arriving = false;
[0cf813d]767
[181a746]768 CPU->rcu.next_cbs = CPU->rcu.arriving_cbs;
[0cf813d]769 CPU->rcu.next_cbs_cnt = CPU->rcu.arriving_cbs_cnt;
770
[181a746]771 CPU->rcu.arriving_cbs = 0;
772 CPU->rcu.parriving_cbs_tail = &CPU->rcu.arriving_cbs;
773 CPU->rcu.arriving_cbs_cnt = 0;
774
775 interrupts_restore(ipl);
[0cf813d]776
777 /* Update statistics of arrived callbacks. */
778 upd_stat_cb_cnts(CPU->rcu.next_cbs_cnt);
[181a746]779
780 /*
781 * Make changes prior to queuing next_cbs visible to readers.
782 * See comment in wait_for_readers().
783 */
784 memory_barrier(); /* MB A, B */
785
786 /* At the end of next_cbs_gp, exec next_cbs. Determine what GP that is. */
787
788 if (!next_cbs_empty()) {
789 spinlock_lock(&rcu.gp_lock);
790
791 /* Exec next_cbs at the end of the next GP. */
[8e3ed06]792 CPU->rcu.next_cbs_gp = _rcu_cur_gp + 1;
[181a746]793
794 /*
795 * There are no callbacks to invoke before next_cbs. Instruct
796 * wait_for_cur_cbs_gp() to notify us of the nearest GP end.
797 * That could be sooner than next_cbs_gp (if the current GP
798 * had not yet completed), so we'll create a shorter batch
799 * of callbacks next time around.
800 */
801 if (cur_cbs_empty()) {
802 CPU->rcu.cur_cbs_gp = rcu.completed_gp + 1;
803 }
804
805 spinlock_unlock(&rcu.gp_lock);
806 } else {
807 CPU->rcu.next_cbs_gp = CPU->rcu.cur_cbs_gp;
808 }
809
810 ASSERT(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
811
812 return expedite;
813}
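/*
 * Summary of the per-cpu callback pipeline that advance_cbs() shifts along
 * (descriptive note derived from the code above):
 *
 *   rcu_call() --> arriving_cbs --> next_cbs --> cur_cbs --> exec_cbs()
 *
 * New callbacks always land in arriving_cbs. On each reclaimer iteration
 * arriving_cbs becomes next_cbs (to run after the grace period following
 * the current one) and the old next_cbs becomes cur_cbs (run as soon as
 * cur_cbs_gp completes). A callback therefore always waits for at least
 * one full grace period that started after it was queued.
 */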
814
[d4d36f9]815
816#ifdef RCU_PREEMPT_A
817
[181a746]818/** Waits for the grace period associated with callbacks cur_cbs to elapse.
819 *
820 * @param expedite Instructs the detector to aggressively speed up grace
821 * period detection without any delay.
822 * @param completed_gp Returns the most recent completed grace period
823 * number.
824 * @return false if the thread was interrupted and should stop.
825 */
826static bool wait_for_cur_cbs_gp_end(bool expedite, rcu_gp_t *completed_gp)
827{
828 spinlock_lock(&rcu.gp_lock);
[d4d36f9]829
[181a746]830 ASSERT(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
[d4d36f9]831 ASSERT(CPU->rcu.cur_cbs_gp <= _rcu_cur_gp + 1);
[181a746]832
[d4d36f9]833 while (rcu.completed_gp < CPU->rcu.cur_cbs_gp) {
834 /* GP has not yet started - start a new one. */
835 if (rcu.completed_gp == _rcu_cur_gp) {
836 start_new_gp();
837 spinlock_unlock(&rcu.gp_lock);
[181a746]838
[d4d36f9]839 if (!wait_for_readers(expedite))
840 return false;
841
842 spinlock_lock(&rcu.gp_lock);
843 /* Notify any reclaimers this GP had ended. */
844 rcu.completed_gp = _rcu_cur_gp;
845 condvar_broadcast(&rcu.gp_ended);
846 } else {
847 /* GP detection is in progress.*/
848
849 if (expedite)
850 condvar_signal(&rcu.expedite_now);
851
852 /* Wait for the GP to complete. */
853 int ret = _condvar_wait_timeout_spinlock(&rcu.gp_ended,
854 &rcu.gp_lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
855
856 if (ret == ESYNCH_INTERRUPTED) {
857 spinlock_unlock(&rcu.gp_lock);
858 return false;
859 }
860
861 upd_missed_gp_in_wait(rcu.completed_gp);
862 }
863 }
[0cf813d]864
[181a746]865 *completed_gp = rcu.completed_gp;
[d4d36f9]866 spinlock_unlock(&rcu.gp_lock);
[0cf813d]867
[d4d36f9]868 return true;
[181a746]869}
870
[d4d36f9]871static bool wait_for_readers(bool expedite)
[0cf813d]872{
[d4d36f9]873 DEFINE_CPU_MASK(reader_cpus);
[0cf813d]874
[d4d36f9]875 cpu_mask_active(reader_cpus);
876 rm_quiescent_cpus(reader_cpus);
877
878 while (!cpu_mask_is_none(reader_cpus)) {
879 /* Give cpus a chance to context switch (a QS) and batch callbacks. */
880 if (!gp_sleep(&expedite))
881 return false;
882
883 rm_quiescent_cpus(reader_cpus);
884 sample_cpus(reader_cpus, reader_cpus);
885 }
886
887 /* Update statistic. */
888 if (expedite) {
889 ++rcu.stat_expedited_cnt;
890 }
891
892 /*
893 * All cpus have passed through a QS and see the most recent _rcu_cur_gp.
894 * As a result newly preempted readers will associate with next_preempted
895 * and the number of old readers in cur_preempted will monotonically
896 * decrease. Wait for those old/preexisting readers.
897 */
898 return wait_for_preempt_reader();
[0cf813d]899}
900
[d4d36f9]901static bool gp_sleep(bool *expedite)
[181a746]902{
[d4d36f9]903 if (*expedite) {
904 scheduler();
905 return true;
906 } else {
907 spinlock_lock(&rcu.gp_lock);
[181a746]908
[d4d36f9]909 int ret = 0;
910 ret = _condvar_wait_timeout_spinlock(&rcu.expedite_now, &rcu.gp_lock,
911 DETECT_SLEEP_MS * 1000, SYNCH_FLAGS_INTERRUPTIBLE);
912
913 /* rcu.expedite_now was signaled. */
914 if (ret == ESYNCH_OK_BLOCKED) {
915 *expedite = true;
[181a746]916 }
[d4d36f9]917
918 spinlock_unlock(&rcu.gp_lock);
919
920 return (ret != ESYNCH_INTERRUPTED);
[181a746]921 }
922}
923
[d4d36f9]924static void sample_local_cpu(void *arg)
[181a746]925{
[d4d36f9]926 ASSERT(interrupts_disabled());
927 cpu_mask_t *reader_cpus = (cpu_mask_t *)arg;
[181a746]928
[d4d36f9]929 bool locked = RCU_CNT_INC <= THE->rcu_nesting;
930 bool passed_qs = (CPU->rcu.last_seen_gp == _rcu_cur_gp);
931
932 if (locked && !passed_qs) {
933 /*
934 * This cpu has not yet passed a quiescent state during this grace
935 * period and it is currently in a reader section. We'll have to
936 * try to sample this cpu again later.
937 */
938 } else {
939 /* Either not in a reader section or already passed a QS. */
940 cpu_mask_reset(reader_cpus, CPU->id);
941 /* Contain new reader sections and make prior changes visible to them.*/
942 memory_barrier();
943 CPU->rcu.last_seen_gp = _rcu_cur_gp;
944 }
945}
946
947/** Called by the scheduler() when switching away from the current thread. */
948void rcu_after_thread_ran(void)
949{
950 ASSERT(interrupts_disabled());
951
952 /* Preempted a reader critical section for the first time. */
953 if (rcu_read_locked() && !(THE->rcu_nesting & RCU_WAS_PREEMPTED)) {
954 THE->rcu_nesting |= RCU_WAS_PREEMPTED;
955 note_preempted_reader();
956 }
957
958 /* Save the thread's nesting count when it is not running. */
959 THREAD->rcu.nesting_cnt = THE->rcu_nesting;
960
961 /* Clear rcu_nesting only after noting that a thread was preempted. */
962 compiler_barrier();
963 THE->rcu_nesting = 0;
964
965 if (CPU->rcu.last_seen_gp != _rcu_cur_gp) {
966 /*
967 * Contain any memory accesses of old readers before announcing a QS.
968 * Also make changes from the previous GP visible to this cpu.
969 */
970 memory_barrier();
971 /*
972 * The preempted reader has been noted globally. There are therefore
973 * no readers running on this cpu so this is a quiescent state.
974 */
975 CPU->rcu.last_seen_gp = _rcu_cur_gp;
976 }
977
978 /*
979 * Forcefully associate the reclaimer with the highest priority
980 * even if preempted due to its time slice running out.
981 */
982 if (THREAD == CPU->rcu.reclaimer_thr) {
983 THREAD->priority = -1;
984 }
985
986 upd_max_cbs_in_slice();
987}
988
989/** Called by the scheduler() when switching to a newly scheduled thread. */
990void rcu_before_thread_runs(void)
991{
992 ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
993 ASSERT(!rcu_read_locked());
994
995 /* Load the thread's saved nesting count from before it was preempted. */
996 THE->rcu_nesting = THREAD->rcu.nesting_cnt;
997}
998
999/** Called from scheduler() when exiting the current thread.
1000 *
1001 * Preemption or interrupts are disabled and the scheduler() already
1002 * switched away from the current thread, calling rcu_after_thread_ran().
1003 */
1004void rcu_thread_exiting(void)
1005{
1006 ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
1007 /*
1008 * The thread forgot to exit its reader critical section.
1009 * It is a bug, but rather than letting the entire system lock up
1010 * forcefully leave the reader section. The thread is not holding
1011 * any references anyway since it is exiting so it is safe.
1012 */
1013 if (RCU_CNT_INC <= THREAD->rcu.nesting_cnt) {
1014 /* Emulate _rcu_preempted_unlock() with the proper nesting count. */
1015 if (THREAD->rcu.nesting_cnt & RCU_WAS_PREEMPTED) {
1016 ipl_t ipl = interrupts_disable();
1017 rm_preempted_reader();
1018 interrupts_restore(ipl);
1019 }
1020 }
1021}
1022
1023/** Returns true if in an rcu reader section. */
1024bool rcu_read_locked(void)
1025{
1026 return RCU_CNT_INC <= THE->rcu_nesting;
1027}
1028
1029/** Invoked when a preempted reader finally exits its reader section. */
1030void _rcu_preempted_unlock(void)
1031{
1032 ipl_t ipl = interrupts_disable();
1033
1034 if (THE->rcu_nesting == RCU_WAS_PREEMPTED) {
1035 THE->rcu_nesting = 0;
1036 rm_preempted_reader();
1037 }
1038
1039 interrupts_restore(ipl);
1040}
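/*
 * Descriptive note on the A-RCU nesting word used above. The exact
 * constants are defined in rcu.h; the layout described here is inferred
 * from their use in this file: THE->rcu_nesting holds the reader nesting
 * depth in multiples of RCU_CNT_INC, with the low-order RCU_WAS_PREEMPTED
 * bit recording that the reader was preempted. Hence
 * RCU_CNT_INC <= rcu_nesting means "inside a reader section", and
 * rcu_nesting == RCU_WAS_PREEMPTED means a preempted reader just dropped
 * its last nesting level and must notify the detector via
 * rm_preempted_reader().
 */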
1041
1042#elif defined(RCU_PREEMPT_PODZIMEK)
1043
1044/** Waits for the grace period associated with callbacks cur_cbs to elapse.
1045 *
1046 * @param expedite Instructs the detector to aggressively speed up grace
1047 * period detection without any delay.
1048 * @param completed_gp Returns the most recent completed grace period
1049 * number.
1050 * @return false if the thread was interrupted and should stop.
1051 */
1052static bool wait_for_cur_cbs_gp_end(bool expedite, rcu_gp_t *completed_gp)
1053{
1054 /*
1055 * Use a possibly outdated version of completed_gp to bypass checking
1056 * with the lock.
1057 *
1058 * Note that loading and storing rcu.completed_gp is not atomic
1059 * (it is 64bit wide). Reading a clobbered value that is less than
1060 * rcu.completed_gp is harmless - we'll recheck with a lock. The
1061 * only way to read a clobbered value that is greater than the actual
1062 * value is if the detector increases the higher-order word first and
1063 * then decreases the lower-order word (or we see stores in that order),
1064 * eg when incrementing from 2^32 - 1 to 2^32. The loaded value
1065 * suddenly jumps by 2^32. It would take hours for such an increase
1066 * to occur so it is safe to discard the value. We allow increases
1067 * of up to half the maximum to generously accommodate for loading an
1068 * outdated lower word.
1069 */
1070 rcu_gp_t compl_gp = ACCESS_ONCE(rcu.completed_gp);
1071 if (CPU->rcu.cur_cbs_gp <= compl_gp
1072 && compl_gp <= CPU->rcu.cur_cbs_gp + UINT32_MAX_HALF) {
1073 *completed_gp = compl_gp;
1074 return true;
1075 }
1076
1077 spinlock_lock(&rcu.gp_lock);
1078
1079 if (CPU->rcu.cur_cbs_gp <= rcu.completed_gp) {
1080 *completed_gp = rcu.completed_gp;
1081 spinlock_unlock(&rcu.gp_lock);
1082 return true;
1083 }
1084
1085 ASSERT(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
1086 ASSERT(_rcu_cur_gp <= CPU->rcu.cur_cbs_gp);
1087
1088 /*
1089 * Notify the detector of how many GP ends we intend to wait for, so
1090 * it can avoid going to sleep unnecessarily. Optimistically assume
1091 * new callbacks will arrive while we're waiting; hence +1.
1092 */
1093 size_t remaining_gp_ends = (size_t) (CPU->rcu.next_cbs_gp - _rcu_cur_gp);
1094 req_detection(remaining_gp_ends + (arriving_cbs_empty() ? 0 : 1));
1095
1096 /*
1097 * Ask the detector to speed up GP detection if there are too many
1098 * pending callbacks and other reclaimers have not already done so.
1099 */
1100 if (expedite) {
1101 if (0 == rcu.req_expedited_cnt)
1102 condvar_signal(&rcu.expedite_now);
1103
1104 /*
1105 * Expedite only cur_cbs. If there really is a surge of callbacks
1106 * the arriving batch will expedite the GP for the huge number
1107 * of callbacks currently in next_cbs.
1108 */
1109 rcu.req_expedited_cnt = 1;
1110 }
1111
1112 /* Wait for cur_cbs_gp to end. */
1113 bool interrupted = cv_wait_for_gp(CPU->rcu.cur_cbs_gp);
1114
1115 *completed_gp = rcu.completed_gp;
1116 spinlock_unlock(&rcu.gp_lock);
1117
1118 upd_missed_gp_in_wait(*completed_gp);
1119
1120 return !interrupted;
1121}
1122
1123/** Waits for an announcement of the end of the grace period wait_on_gp. */
1124static bool cv_wait_for_gp(rcu_gp_t wait_on_gp)
1125{
1126 ASSERT(spinlock_locked(&rcu.gp_lock));
1127
1128 bool interrupted = false;
1129
1130 /* Wait until wait_on_gp ends. */
1131 while (rcu.completed_gp < wait_on_gp && !interrupted) {
[181a746]1132 int ret = _condvar_wait_timeout_spinlock(&rcu.gp_ended, &rcu.gp_lock,
1133 SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
1134 interrupted = (ret == ESYNCH_INTERRUPTED);
1135 }
1136
1137 ASSERT(wait_on_gp <= rcu.completed_gp);
1138
1139 return interrupted;
1140}
1141
[d4d36f9]1142/** Requests the detector to detect at least req_cnt consecutive grace periods.*/
1143static void req_detection(size_t req_cnt)
1144{
1145 if (rcu.req_gp_end_cnt < req_cnt) {
1146 bool detector_idle = (0 == rcu.req_gp_end_cnt);
1147 rcu.req_gp_end_cnt = req_cnt;
1148
1149 if (detector_idle) {
1150 ASSERT(_rcu_cur_gp == rcu.completed_gp);
1151 condvar_signal(&rcu.req_gp_changed);
1152 }
1153 }
1154}
1155
1156
[181a746]1157/** The detector thread detects and notifies reclaimers of grace period ends. */
1158static void detector(void *arg)
1159{
1160 spinlock_lock(&rcu.gp_lock);
1161
1162 while (wait_for_detect_req()) {
1163 /*
1164 * Announce new GP started. Readers start lazily acknowledging that
1165 * they passed a QS.
1166 */
1167 start_new_gp();
1168
1169 spinlock_unlock(&rcu.gp_lock);
1170
1171 if (!wait_for_readers())
1172 goto unlocked_out;
1173
1174 spinlock_lock(&rcu.gp_lock);
1175
1176 /* Notify reclaimers that they may now invoke queued callbacks. */
1177 end_cur_gp();
1178 }
1179
1180 spinlock_unlock(&rcu.gp_lock);
1181
1182unlocked_out:
1183 return;
1184}
1185
1186/** Waits for a request from a reclaimer thread to detect a grace period. */
1187static bool wait_for_detect_req(void)
1188{
1189 ASSERT(spinlock_locked(&rcu.gp_lock));
1190
1191 bool interrupted = false;
1192
1193 while (0 == rcu.req_gp_end_cnt && !interrupted) {
1194 int ret = _condvar_wait_timeout_spinlock(&rcu.req_gp_changed,
1195 &rcu.gp_lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
1196
1197 interrupted = (ret == ESYNCH_INTERRUPTED);
1198 }
[d4d36f9]1199
1200 return !interrupted;
1201}
1202
1203
1204static void end_cur_gp(void)
1205{
1206 ASSERT(spinlock_locked(&rcu.gp_lock));
1207
1208 rcu.completed_gp = _rcu_cur_gp;
1209 --rcu.req_gp_end_cnt;
1210
1211 condvar_broadcast(&rcu.gp_ended);
1212}
1213
1214/** Waits for readers that started before the current GP started to finish. */
1215static bool wait_for_readers(void)
1216{
1217 DEFINE_CPU_MASK(reading_cpus);
1218
1219 /* All running cpus have potential readers. */
1220 cpu_mask_active(reading_cpus);
1221
1222 /*
1223 * Give readers time to pass through a QS. Also, batch arriving
1224 * callbacks in order to amortize detection overhead.
1225 */
1226 if (!gp_sleep())
1227 return false;
1228
1229 /* Non-intrusively determine which cpus have yet to pass a QS. */
1230 rm_quiescent_cpus(reading_cpus);
1231
1232 /* Actively interrupt cpus delaying the current GP and demand a QS. */
1233 interrupt_delaying_cpus(reading_cpus);
1234
1235 /* Wait for the interrupted cpus to notify us that they reached a QS. */
1236 if (!wait_for_delaying_cpus())
1237 return false;
1238 /*
1239 * All cpus recorded a QS or are still idle. Any new readers will be added
1240 * to next_preempted if preempted, ie the number of readers in cur_preempted
1241 * monotonically decreases.
1242 */
1243
1244 /* Wait for the last reader in cur_preempted to notify us it is done. */
1245 if (!wait_for_preempt_reader())
1246 return false;
1247
1248 return true;
1249}
1250
1251/** Sleeps a while if the current grace period is not to be expedited. */
1252static bool gp_sleep(void)
1253{
1254 spinlock_lock(&rcu.gp_lock);
1255
1256 int ret = 0;
1257 while (0 == rcu.req_expedited_cnt && 0 == ret) {
1258 /* minor bug: sleeps for the same duration if woken up spuriously. */
1259 ret = _condvar_wait_timeout_spinlock(&rcu.expedite_now, &rcu.gp_lock,
1260 DETECT_SLEEP_MS * 1000, SYNCH_FLAGS_INTERRUPTIBLE);
1261 }
1262
1263 if (0 < rcu.req_expedited_cnt) {
1264 --rcu.req_expedited_cnt;
1265 /* Update statistic. */
1266 ++rcu.stat_expedited_cnt;
1267 }
1268
1269 spinlock_unlock(&rcu.gp_lock);
1270
1271 return (ret != ESYNCH_INTERRUPTED);
1272}
1273
1274/** Actively interrupts and checks the offending cpus for quiescent states. */
1275static void interrupt_delaying_cpus(cpu_mask_t *cpu_mask)
1276{
1277 atomic_set(&rcu.delaying_cpu_cnt, 0);
1278
1279 sample_cpus(cpu_mask, 0);
1280}
1281
1282/** Invoked on a cpu delaying grace period detection.
1283 *
1284 * Induces a quiescent state for the cpu or it instructs remaining
1285 * readers to notify the detector once they finish.
1286 */
1287static void sample_local_cpu(void *arg)
1288{
1289 ASSERT(interrupts_disabled());
1290 ASSERT(!CPU->rcu.is_delaying_gp);
1291
1292 /* Cpu did not pass a quiescent state yet. */
1293 if (CPU->rcu.last_seen_gp != _rcu_cur_gp) {
1294 /* Interrupted a reader in a reader critical section. */
1295 if (0 < CPU->rcu.nesting_cnt) {
1296 ASSERT(!CPU->idle);
1297 /* Note to notify the detector from rcu_read_unlock(). */
1298 CPU->rcu.is_delaying_gp = true;
1299 /*
1300 * Set signal_unlock only after setting is_delaying_gp so
1301 * that NMI handlers do not accidentally clear it in unlock()
1302 * before seeing and acting upon is_delaying_gp.
1303 */
1304 compiler_barrier();
1305 CPU->rcu.signal_unlock = true;
1306
1307 atomic_inc(&rcu.delaying_cpu_cnt);
1308 } else {
1309 /*
1310 * The cpu did not enter any rcu reader sections since
1311 * the start of the current GP. Record a quiescent state.
1312 *
1313 * Or, we interrupted rcu_read_unlock_impl() right before
1314 * it recorded a QS. Record a QS for it. The memory barrier
1315 * contains the reader section's mem accesses before
1316 * updating last_seen_gp.
1317 *
1318 * Or, we interrupted rcu_read_lock() right after it recorded
1319 * a QS for the previous GP but before it got a chance to
1320 * increment its nesting count. The memory barrier again
1321 * stops the CS code from spilling out of the CS.
1322 */
1323 memory_barrier();
1324 CPU->rcu.last_seen_gp = _rcu_cur_gp;
1325 }
1326 } else {
1327 /*
1328 * This cpu already acknowledged that it had passed through
1329 * a quiescent state since the start of cur_gp.
1330 */
1331 }
1332
1333 /*
1334 * smp_call() makes sure any changes propagate back to the caller.
1335 * In particular, it makes the most current last_seen_gp visible
1336 * to the detector.
1337 */
1338}
1339
1340/** Waits for cpus delaying the current grace period if there are any. */
1341static bool wait_for_delaying_cpus(void)
1342{
1343 int delaying_cpu_cnt = atomic_get(&rcu.delaying_cpu_cnt);
1344
1345 for (int i = 0; i < delaying_cpu_cnt; ++i){
1346 if (!semaphore_down_interruptable(&rcu.remaining_readers))
1347 return false;
1348 }
1349
1350 /* Update statistic. */
1351 rcu.stat_delayed_cnt += delaying_cpu_cnt;
1352
1353 return true;
1354}
1355
1356/** Called by the scheduler() when switching away from the current thread. */
1357void rcu_after_thread_ran(void)
1358{
1359 ASSERT(interrupts_disabled());
1360 /* todo: make is_delaying_gp and was_preempted NMI safe via local atomics.*/
1361
1362 /*
1363 * Prevent NMI handlers from interfering. The detector will be notified
1364 * here if CPU->rcu.is_delaying_gp and the current thread is no longer
1365 * running so there is nothing to signal to the detector.
1366 */
1367 CPU->rcu.signal_unlock = false;
1368 /* Separates clearing of .signal_unlock from CPU->rcu.nesting_cnt = 0. */
1369 compiler_barrier();
1370
1371 /* Save the thread's nesting count when it is not running. */
1372 THREAD->rcu.nesting_cnt = CPU->rcu.nesting_cnt;
1373 /* Interrupt handlers might use RCU while idle in scheduler(). */
1374 CPU->rcu.nesting_cnt = 0;
1375
1376 /* Preempted a reader critical section for the first time. */
1377 if (0 < THREAD->rcu.nesting_cnt && !THREAD->rcu.was_preempted) {
1378 THREAD->rcu.was_preempted = true;
1379 note_preempted_reader();
1380 }
1381
1382 /*
1383 * The preempted reader has been noted globally. There are therefore
1384 * no readers running on this cpu so this is a quiescent state.
1385 */
1386 _rcu_record_qs();
1387
1388 /*
1389 * This cpu is holding up the current GP. Let the detector know
1390 * it has just passed a quiescent state.
1391 *
1392 * The detector waits separately for preempted readers, so we have
1393 * to notify the detector even if we have just preempted a reader.
1394 */
1395 if (CPU->rcu.is_delaying_gp) {
1396 CPU->rcu.is_delaying_gp = false;
1397 semaphore_up(&rcu.remaining_readers);
1398 }
1399
1400 /*
1401 * Forcefully associate the detector with the highest priority
1402 * even if preempted due to its time slice running out.
1403 *
1404 * todo: Replace with strict scheduler priority classes.
1405 */
1406 if (THREAD == rcu.detector_thr) {
1407 THREAD->priority = -1;
1408 }
1409 else if (THREAD == CPU->rcu.reclaimer_thr) {
1410 THREAD->priority = -1;
1411 }
1412
1413 upd_max_cbs_in_slice();
1414}
1415
1416/** Called by the scheduler() when switching to a newly scheduled thread. */
1417void rcu_before_thread_runs(void)
1418{
1419 ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
1420 ASSERT(0 == CPU->rcu.nesting_cnt);
1421
1422 /* Load the thread's saved nesting count from before it was preempted. */
1423 CPU->rcu.nesting_cnt = THREAD->rcu.nesting_cnt;
1424 /*
1425 * In the unlikely event that a NMI occurs between the loading of the
1426 * variables and setting signal_unlock, the NMI handler may invoke
1427 * rcu_read_unlock() and clear signal_unlock. In that case we will
1428 * incorrectly overwrite signal_unlock from false to true. This
1429 * situation is benign and the next rcu_read_unlock() will at worst
1430 * needlessly invoke _rcu_signal_read_unlock().
1431 */
1432 CPU->rcu.signal_unlock = THREAD->rcu.was_preempted || CPU->rcu.is_delaying_gp;
1433}
1434
1435/** Called from scheduler() when exiting the current thread.
1436 *
1437 * Preemption or interrupts are disabled and the scheduler() already
1438 * switched away from the current thread, calling rcu_after_thread_ran().
1439 */
1440void rcu_thread_exiting(void)
1441{
1442 ASSERT(THREAD != 0);
1443 ASSERT(THREAD->state == Exiting);
1444 ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
1445 /*
1446 * The thread forgot to exit its reader critical section.
1447 * It is a bug, but rather than letting the entire system lock up
1448 * forcefully leave the reader section. The thread is not holding
1449 * any references anyway since it is exiting so it is safe.
1450 */
1451 if (0 < THREAD->rcu.nesting_cnt) {
1452 THREAD->rcu.nesting_cnt = 1;
1453 read_unlock_impl(&THREAD->rcu.nesting_cnt);
1454 }
[181a746]1455}
1456
[d4d36f9]1457
1458#endif /* RCU_PREEMPT_PODZIMEK */
1459
[181a746]1460/** Announces the start of a new grace period for preexisting readers to ack. */
1461static void start_new_gp(void)
1462{
1463 ASSERT(spinlock_locked(&rcu.gp_lock));
1464
1465 irq_spinlock_lock(&rcu.preempt_lock, true);
1466
1467 /* Start a new GP. Announce to readers that a quiescent state is needed. */
[8e3ed06]1468 ++_rcu_cur_gp;
[181a746]1469
1470 /*
1471 * Readers preempted before the start of this GP (next_preempted)
1472 * are preexisting readers now that a GP started and will hold up
1473 * the current GP until they exit their reader sections.
1474 *
1475 * Preempted readers from the previous GP have finished so
[8e3ed06]1476 * cur_preempted is empty, but see comment in _rcu_record_qs().
[181a746]1477 */
[c14762e]1478 list_concat(&rcu.cur_preempted, &rcu.next_preempted);
[181a746]1479
1480 irq_spinlock_unlock(&rcu.preempt_lock, true);
1481}
1482
[d4d36f9]1483/** Remove those cpus from the mask that have already passed a quiescent
1484 * state since the start of the current grace period.
1485 */
1486static void rm_quiescent_cpus(cpu_mask_t *cpu_mask)
[181a746]1487{
1488 /*
1489 * Ensure the announcement of the start of a new GP (ie up-to-date
1490 * cur_gp) propagates to cpus that are just coming out of idle
1491 * mode before we sample their idle state flag.
1492 *
1493 * Cpus guarantee that after they set CPU->idle = true they will not
1494 * execute any RCU reader sections without first setting idle to
1495 * false and issuing a memory barrier. Therefore, if rm_quiescent_cpus()
1496 * later on sees an idle cpu, but the cpu is just exiting its idle mode,
1497 * the cpu must not have yet executed its memory barrier (otherwise
1498 * it would pair up with this mem barrier and we would see idle == false).
1499 * That memory barrier will pair up with the one below and ensure
1500 * that a reader on the now-non-idle cpu will see the most current
1501 * cur_gp. As a result, such a reader will never attempt to semaphore_up(
1502 * remaining_readers) during this GP, which allows the detector to
1503 * ignore that cpu (the detector thinks it is idle). Moreover, any
1504 * changes made by RCU updaters will have propagated to readers
1505 * on the previously idle cpu -- again thanks to issuing a memory
1506 * barrier after returning from idle mode.
1507 *
1508 * idle -> non-idle cpu | detector | reclaimer
1509 * ------------------------------------------------------
1510 * rcu reader 1 | | rcu_call()
1511 * MB X | |
1512 * idle = true | | rcu_call()
1513 * (no rcu readers allowed ) | | MB A in advance_cbs()
1514 * MB Y | (...) | (...)
1515 * (no rcu readers allowed) | | MB B in advance_cbs()
1516 * idle = false | ++cur_gp |
1517 * (no rcu readers allowed) | MB C |
1518 * MB Z | signal gp_end |
1519 * rcu reader 2 | | exec_cur_cbs()
1520 *
1521 *
1522 * MB Y orders visibility of changes to idle for detector's sake.
1523 *
1524 * MB Z pairs up with MB C. The cpu making a transition from idle
1525 * will see the most current value of cur_gp and will not attempt
1526 * to notify the detector even if preempted during this GP.
1527 *
1528 * MB Z pairs up with MB A from the previous batch. Updaters' changes
1529 * are visible to reader 2 even when the detector thinks the cpu is idle
1530 * but it is not anymore.
1531 *
1532 * MB X pairs up with MB B. Late mem accesses of reader 1 are contained
1533 * and visible before idling and before any callbacks are executed
1534 * by reclaimers.
1535 *
1536 * In summary, the detector does not know of or wait for reader 2, but
1537 * it does not have to since it is a new reader that will not access
1538 * data from previous GPs and will see any changes.
1539 */
1540 memory_barrier(); /* MB C */
1541
1542 cpu_mask_for_each(*cpu_mask, cpu_id) {
1543 /*
1544 * The cpu already checked for and passed through a quiescent
1545 * state since the beginning of this GP.
1546 *
[8e3ed06]1547 * _rcu_cur_gp is modified by local detector thread only.
[181a746]1548 * Therefore, it is up-to-date even without a lock.
1549 */
[8e3ed06]1550 bool cpu_acked_gp = (cpus[cpu_id].rcu.last_seen_gp == _rcu_cur_gp);
[181a746]1551
1552 /*
1553 * Either the cpu is idle or it is exiting away from idle mode
[8e3ed06]1554 * and already sees the most current _rcu_cur_gp. See comment
[181a746]1555 * in wait_for_readers().
1556 */
1557 bool cpu_idle = cpus[cpu_id].idle;
1558
1559 if (cpu_acked_gp || cpu_idle) {
1560 cpu_mask_reset(cpu_mask, cpu_id);
1561 }
1562 }
1563}
1564
[d4d36f9]1565/** Invokes sample_local_cpu(arg) on each cpu of reader_cpus. */
1566static void sample_cpus(cpu_mask_t *reader_cpus, void *arg)
[181a746]1567{
1568 const size_t max_concurrent_calls = 16;
1569 smp_call_t call[max_concurrent_calls];
1570 size_t outstanding_calls = 0;
1571
[d4d36f9]1572 cpu_mask_for_each(*reader_cpus, cpu_id) {
1573 smp_call_async(cpu_id, sample_local_cpu, arg, &call[outstanding_calls]);
[181a746]1574 ++outstanding_calls;
1575
1576 /* Update statistic. */
1577 if (CPU->id != cpu_id)
1578 ++rcu.stat_smp_call_cnt;
1579
1580 if (outstanding_calls == max_concurrent_calls) {
1581 for (size_t k = 0; k < outstanding_calls; ++k) {
1582 smp_call_wait(&call[k]);
1583 }
1584
1585 outstanding_calls = 0;
1586 }
1587 }
1588
1589 for (size_t k = 0; k < outstanding_calls; ++k) {
1590 smp_call_wait(&call[k]);
1591 }
1592}
1593
[d4d36f9]1594static void upd_missed_gp_in_wait(rcu_gp_t completed_gp)
[181a746]1595{
[d4d36f9]1596 ASSERT(CPU->rcu.cur_cbs_gp <= completed_gp);
[181a746]1597
[d4d36f9]1598 size_t delta = (size_t)(completed_gp - CPU->rcu.cur_cbs_gp);
1599 CPU->rcu.stat_missed_gp_in_wait += delta;
1600}
1601
1602/** Globally note that the current thread was preempted in a reader section. */
1603static void note_preempted_reader(void)
1604{
1605 irq_spinlock_lock(&rcu.preempt_lock, false);
1606
[8e3ed06]1607 if (CPU->rcu.last_seen_gp != _rcu_cur_gp) {
[d4d36f9]1608 /* The reader started before the GP started - we must wait for it.*/
1609 list_append(&THREAD->rcu.preempt_link, &rcu.cur_preempted);
[181a746]1610 } else {
1611 /*
[d4d36f9]1612 * The reader started after the GP started and this cpu
1613 * already noted a quiescent state. We might block the next GP.
[181a746]1614 */
[d4d36f9]1615 list_append(&THREAD->rcu.preempt_link, &rcu.next_preempted);
[181a746]1616 }
[d4d36f9]1617
1618 irq_spinlock_unlock(&rcu.preempt_lock, false);
[181a746]1619}
1620
[d4d36f9]1621/** Remove the current thread from the global list of preempted readers. */
1622static void rm_preempted_reader(void)
[181a746]1623{
[d4d36f9]1624 irq_spinlock_lock(&rcu.preempt_lock, false);
1625
1626 ASSERT(link_used(&THREAD->rcu.preempt_link));
[79d74fe]1627
[d4d36f9]1628 bool prev_empty = list_empty(&rcu.cur_preempted);
1629 list_remove(&THREAD->rcu.preempt_link);
1630 bool now_empty = list_empty(&rcu.cur_preempted);
1631
1632 /* This was the last reader in cur_preempted. */
1633 bool last_removed = now_empty && !prev_empty;
1634
1635 /*
1636 * Preempted readers are blocking the detector and
1637 * this was the last reader blocking the current GP.
1638 */
1639 if (last_removed && rcu.preempt_blocking_det) {
1640 rcu.preempt_blocking_det = false;
1641 semaphore_up(&rcu.remaining_readers);
[181a746]1642 }
[d4d36f9]1643
1644 irq_spinlock_unlock(&rcu.preempt_lock, false);
[181a746]1645}
1646
1647/** Waits for any preempted readers blocking this grace period to finish.*/
1648static bool wait_for_preempt_reader(void)
1649{
1650 irq_spinlock_lock(&rcu.preempt_lock, true);
1651
1652 bool reader_exists = !list_empty(&rcu.cur_preempted);
1653 rcu.preempt_blocking_det = reader_exists;
1654
1655 irq_spinlock_unlock(&rcu.preempt_lock, true);
1656
1657 if (reader_exists) {
1658 /* Update statistic. */
1659 ++rcu.stat_preempt_blocking_cnt;
1660
1661 return semaphore_down_interruptable(&rcu.remaining_readers);
1662 }
1663
1664 return true;
1665}
1666
[0cf813d]1667static void upd_max_cbs_in_slice(void)
1668{
1669 rcu_cpu_data_t *cr = &CPU->rcu;
1670
1671 if (cr->arriving_cbs_cnt > cr->last_arriving_cnt) {
1672 size_t arrived_cnt = cr->arriving_cbs_cnt - cr->last_arriving_cnt;
1673 cr->stat_max_slice_cbs = max(arrived_cnt, cr->stat_max_slice_cbs);
1674 }
1675
1676 cr->last_arriving_cnt = cr->arriving_cbs_cnt;
[181a746]1677}
1678
1679/** Prints RCU run-time statistics. */
1680void rcu_print_stat(void)
1681{
[0cf813d]1682 /*
1683 * Don't take locks. Worst case is we get outdated values.
1684 * CPU-local values are updated without any locks, so there
1685 * are no locks to take in order to get up-to-date values.
1686 */
1687
[d4d36f9]1688#ifdef RCU_PREEMPT_PODZIMEK
1689 const char *algo = "podzimek-preempt-rcu";
1690#elif defined(RCU_PREEMPT_A)
1691 const char *algo = "a-preempt-rcu";
1692#endif
1693
1694 printf("Config: expedite_threshold=%d, critical_threshold=%d,"
1695 " detect_sleep=%dms, %s\n",
1696 EXPEDITE_THRESHOLD, CRITICAL_THRESHOLD, DETECT_SLEEP_MS, algo);
[181a746]1697 printf("Completed GPs: %" PRIu64 "\n", rcu.completed_gp);
1698 printf("Expedited GPs: %zu\n", rcu.stat_expedited_cnt);
1699 printf("Delayed GPs: %zu (cpus w/ still running readers after gp sleep)\n",
1700 rcu.stat_delayed_cnt);
1701 printf("Preempt blocked GPs: %zu (waited for preempted readers; "
[b68ae24]1702 "running or not)\n", rcu.stat_preempt_blocking_cnt);
1703 printf("Smp calls: %zu\n", rcu.stat_smp_call_cnt);
[181a746]1704
[0cf813d]1705 printf("Max arrived callbacks per GP and CPU:\n");
1706 for (unsigned int i = 0; i < config.cpu_count; ++i) {
[181a746]1707 printf(" %zu", cpus[i].rcu.stat_max_cbs);
1708 }
1709
[0cf813d]1710 printf("\nAvg arrived callbacks per GP and CPU (nonempty batches only):\n");
1711 for (unsigned int i = 0; i < config.cpu_count; ++i) {
[181a746]1712 printf(" %zu", cpus[i].rcu.stat_avg_cbs);
1713 }
1714
[0cf813d]1715 printf("\nMax arrived callbacks per time slice and CPU:\n");
1716 for (unsigned int i = 0; i < config.cpu_count; ++i) {
1717 printf(" %zu", cpus[i].rcu.stat_max_slice_cbs);
1718 }
1719
1720 printf("\nMissed GP notifications per CPU:\n");
1721 for (unsigned int i = 0; i < config.cpu_count; ++i) {
[181a746]1722 printf(" %zu", cpus[i].rcu.stat_missed_gps);
1723 }
[0cf813d]1724
1725 printf("\nMissed GP notifications per CPU while waking up:\n");
1726 for (unsigned int i = 0; i < config.cpu_count; ++i) {
1727 printf(" %zu", cpus[i].rcu.stat_missed_gp_in_wait);
1728 }
[181a746]1729 printf("\n");
1730}
1731
1732/** @}
1733 */