source: mainline/kernel/generic/src/synch/rcu.c@ 9306cd7

Last change on this file since 9306cd7 was 63e27ef, checked in by Jiri Svoboda <jiri@…>, 8 years ago

ASSERT → assert

1/*
2 * Copyright (c) 2012 Adam Hraska
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30/** @addtogroup sync
31 * @{
32 */
33
34/**
35 * @file
36 * @brief Preemptible read-copy update. Usable from interrupt handlers.
37 *
38 * @par Podzimek-preempt-RCU (RCU_PREEMPT_PODZIMEK)
39 *
40 * Podzimek-preempt-RCU is a preemptible variant of Podzimek's non-preemptible
41 * RCU algorithm [1, 2]. Grace period (GP) detection is centralized into a
42 * single detector thread. The detector requests that each cpu announces
43 * that it passed a quiescent state (QS), ie a state when the cpu is
44 * outside of an rcu reader section (CS). Cpus check for QSs during context
45 * switches and when entering and exiting rcu reader sections. Once all
46 * cpus announce a QS and if there were no threads preempted in a CS, the
47 * GP ends.
48 *
49 * The detector increments the global GP counter, _rcu_cur_gp, in order
50 * to start a new GP. Readers notice the new GP by comparing the changed
51 * _rcu_cur_gp to a locally stored value last_seen_gp which denotes
52 * the last GP number for which the cpu noted an explicit QS (and issued
53 * a memory barrier). Readers check for the change in the outer-most
54 * (ie not nested) rcu_read_lock()/unlock() as these functions represent
55 * a QS. The reader first executes a memory barrier (MB) in order to contain
56 * memory references within a CS (and to make changes made by writers
57 * visible in the CS following rcu_read_lock()). Next, the reader notes
58 * that it reached a QS by updating the cpu local last_seen_gp to the
59 * global GP counter, _rcu_cur_gp. Cache coherency eventually makes
60 * the updated last_seen_gp visible to the detector cpu, much like it
61 * delivered the changed _rcu_cur_gp to all cpus.
62 *
63 * The detector waits a while after starting a GP and then reads each
64 * cpu's last_seen_gp to see if it reached a QS. If a cpu did not record
65 * a QS (might be a long running thread without an RCU reader CS; or cache
66 * coherency has yet to make the most current last_seen_gp visible to
67 * the detector; or the cpu is still in a CS) the cpu is interrupted
68 * via an IPI. If the IPI handler finds the cpu still in a CS, it instructs
69 * the cpu to notify the detector via a semaphore once it exits the CS
70 * (see CPU->rcu.is_delaying_gp and rcu.remaining_readers).
71 * The detector then waits on the semaphore for any cpus to exit their
72 * CSs. Lastly, it waits for the last reader preempted in a CS to
73 * exit its CS if there were any and signals the end of the GP to
74 * separate reclaimer threads wired to each cpu. Reclaimers then
75 * execute the callbacks queued on each of the cpus.
76 *
77 *
78 * @par A-RCU algorithm (RCU_PREEMPT_A)
79 *
80 * A-RCU is based on the user space rcu algorithm in [3] utilizing signals
81 * (urcu) and Podzimek's rcu [1]. Like in Podzimek's rcu, callbacks are
82 * executed by cpu-bound reclaimer threads. There is however no dedicated
83 * detector thread and the reclaimers take on the responsibilities of the
84 * detector when they need to start a new GP. A new GP is again announced
85 * and acknowledged with _rcu_cur_gp and the cpu local last_seen_gp. Unlike
86 * Podzimek's rcu, cpus check explicitly for QS only during context switches.
87 * Like in urcu, rcu_read_lock()/unlock() only maintain the nesting count
88 * and never issue any memory barriers. This makes rcu_read_lock()/unlock()
89 * simple and fast.
90 *
91 * If a new callback is queued for a reclaimer and no GP is in progress,
92 * the reclaimer takes on the role of a detector. The detector increments
93 * _rcu_cur_gp in order to start a new GP. It waits a while to give cpus
94 * a chance to switch a context (a natural QS). Then, it examines each
95 * non-idle cpu that has yet to pass a QS via an IPI. The IPI handler
96 * sees the most current _rcu_cur_gp and last_seen_gp and notes a QS
97 * with a memory barrier and an update to last_seen_gp. If the handler
98 * finds the cpu in a CS, it does nothing and lets the detector poll/interrupt
99 * the cpu again after a short sleep.
100 *
101 * @par Caveats
102 *
103 * last_seen_gp and _rcu_cur_gp are always 64bit variables and they
104 * are read non-atomically on 32bit machines. Reading a clobbered
105 * value of last_seen_gp or _rcu_cur_gp or writing a clobbered value
106 * of _rcu_cur_gp to last_seen_gp will at worst force the detector
107 * to unnecessarily interrupt a cpu. Interrupting a cpu makes the
108 * correct value of _rcu_cur_gp visible to the cpu and correctly
109 * resets last_seen_gp in both algorithms.
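 *
 * For example (an illustrative scenario only): on a 32bit machine a
 * reader may load the low word of _rcu_cur_gp just before the detector
 * increments it from 0x00000000ffffffff to 0x0000000100000000 and the
 * high word just after, observing 0x00000001ffffffff. Writing that
 * clobbered value to last_seen_gp only makes the cpu appear not to have
 * acked the current GP; the detector then interrupts the cpu, whose IPI
 * handler rereads _rcu_cur_gp and rewrites last_seen_gp consistently.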
110 *
111 *
112 *
113 * [1] Read-copy-update for opensolaris,
114 * 2010, Podzimek
115 * https://andrej.podzimek.org/thesis.pdf
116 *
117 * [2] (podzimek-rcu) implementation file "rcu.patch"
118 * http://d3s.mff.cuni.cz/projects/operating_systems/rcu/rcu.patch
119 *
120 * [3] User-level implementations of read-copy update,
121 * 2012, appendix
122 * http://www.rdrop.com/users/paulmck/RCU/urcu-supp-accepted.2011.08.30a.pdf
123 *
124 */
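
/*
 * Illustrative usage sketch (not part of this file): a reader-side
 * lookup protected by an RCU read-side section. The example_t type,
 * the list it lives in and example_find() are hypothetical; only
 * rcu_read_lock() and rcu_read_unlock() are the real API used here.
 *
 *	typedef struct example {
 *		link_t link;
 *		int key;
 *		int value;
 *		rcu_item_t rcu;
 *	} example_t;
 *
 *	int example_lookup(int key)
 *	{
 *		rcu_read_lock();
 *		example_t *e = example_find(key);  // hypothetical lookup
 *		int val = e ? e->value : -1;
 *		rcu_read_unlock();
 *		return val;
 *	}
 */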
125
126#include <assert.h>
127#include <synch/rcu.h>
128#include <synch/condvar.h>
129#include <synch/semaphore.h>
130#include <synch/spinlock.h>
131#include <synch/mutex.h>
132#include <proc/thread.h>
133#include <cpu/cpu_mask.h>
134#include <cpu.h>
135#include <smp/smp_call.h>
136#include <compiler/barrier.h>
137#include <atomic.h>
138#include <arch.h>
139#include <macros.h>
140
141/*
142 * Number of milliseconds to give to preexisting readers to finish
143 * when non-expedited grace period detection is in progress.
144 */
145#define DETECT_SLEEP_MS 10
146/*
147 * Max number of pending callbacks in the local cpu's queue before
148 * aggressively expediting the current grace period
149 */
150#define EXPEDITE_THRESHOLD 2000
151/*
152 * Max number of callbacks to execute in one go with preemption
153 * enabled. If there are more callbacks to be executed they will
154 * be run with preemption disabled in order to prolong reclaimer's
155 * time slice and give it a chance to catch up with callback producers.
156 */
157#define CRITICAL_THRESHOLD 30000
158/* Half the number of values a uint32 can hold. */
159#define UINT32_MAX_HALF 2147483648U
160
161/**
162 * The current grace period number. Increases monotonically.
163 * Lock rcu.gp_lock or rcu.preempt_lock to get a current value.
164 */
165rcu_gp_t _rcu_cur_gp;
166
167/** Global RCU data. */
168typedef struct rcu_data {
169 /** The detector uses this to signal reclaimers that a grace period ended. */
170 condvar_t gp_ended;
171 /** Reclaimers use this to notify the detector to accelerate GP detection. */
172 condvar_t expedite_now;
173 /**
174 * Protects req_gp_end_cnt, req_expedited_cnt, completed_gp and _rcu_cur_gp
175 * (RCU_PREEMPT_PODZIMEK); or just completed_gp and _rcu_cur_gp (RCU_PREEMPT_A).
176 */
177 SPINLOCK_DECLARE(gp_lock);
178 /**
179 * The number of the most recently completed grace period. At most
180 * one behind _rcu_cur_gp. If equal to _rcu_cur_gp, a grace period
181 * detection is not in progress and the detector is idle.
182 */
183 rcu_gp_t completed_gp;
184
185 /** Protects the following 3 fields. */
186 IRQ_SPINLOCK_DECLARE(preempt_lock);
187 /** Preexisting readers that have been preempted. */
188 list_t cur_preempted;
189 /** Readers that have been preempted and might delay the next grace period. */
190 list_t next_preempted;
191 /**
192 * The detector is waiting for the last preempted reader
193 * in cur_preempted to announce that it exited its reader
194 * section by up()ing remaining_readers.
195 */
196 bool preempt_blocking_det;
197
198#ifdef RCU_PREEMPT_A
199
200 /**
201 * The detector waits on this semaphore for any preempted readers
202 * delaying the grace period once all cpus pass a quiescent state.
203 */
204 semaphore_t remaining_readers;
205
206#elif defined(RCU_PREEMPT_PODZIMEK)
207
208 /** Reclaimers notify the detector when they request more grace periods.*/
209 condvar_t req_gp_changed;
210 /** Number of grace period ends the detector was requested to announce. */
211 size_t req_gp_end_cnt;
212 /** Number of consecutive grace periods to detect quickly and aggressively.*/
213 size_t req_expedited_cnt;
214 /**
215 * Number of cpus with readers that are delaying the current GP.
216 * They will up() remaining_readers.
217 */
218 atomic_t delaying_cpu_cnt;
219 /**
220 * The detector waits on this semaphore for any readers delaying the GP.
221 *
222 * Each of the cpus with readers that are delaying the current GP
223 * must up() this sema once they reach a quiescent state. If there
224 * are any readers in cur_preempted (ie preempted preexisting) and
225 * they are already delaying GP detection, the last to unlock its
226 * reader section must up() this sema once.
227 */
228 semaphore_t remaining_readers;
229#endif
230
231 /** Excludes simultaneous rcu_barrier() calls. */
232 mutex_t barrier_mtx;
233 /** Number of cpus that we are waiting for to complete rcu_barrier(). */
234 atomic_t barrier_wait_cnt;
235 /** rcu_barrier() waits for the completion of barrier callbacks on this wq.*/
236 waitq_t barrier_wq;
237
238 /** Pointer to the interruptible, attached detector thread. */
239 thread_t *detector_thr;
240
241 /* Some statistics. */
242 size_t stat_expedited_cnt;
243 size_t stat_delayed_cnt;
244 size_t stat_preempt_blocking_cnt;
245 /* Does not contain self/local calls. */
246 size_t stat_smp_call_cnt;
247} rcu_data_t;
248
249
250static rcu_data_t rcu;
251
252static void start_reclaimers(void);
253static void synch_complete(rcu_item_t *rcu_item);
254static inline void rcu_call_impl(bool expedite, rcu_item_t *rcu_item,
255 rcu_func_t func);
256static void add_barrier_cb(void *arg);
257static void barrier_complete(rcu_item_t *barrier_item);
258static bool arriving_cbs_empty(void);
259static bool next_cbs_empty(void);
260static bool cur_cbs_empty(void);
261static bool all_cbs_empty(void);
262static void reclaimer(void *arg);
263static bool wait_for_pending_cbs(void);
264static bool advance_cbs(void);
265static void exec_completed_cbs(rcu_gp_t last_completed_gp);
266static void exec_cbs(rcu_item_t **phead);
267static bool wait_for_cur_cbs_gp_end(bool expedite, rcu_gp_t *last_completed_gp);
268static void upd_missed_gp_in_wait(rcu_gp_t completed_gp);
269
270#ifdef RCU_PREEMPT_PODZIMEK
271static void start_detector(void);
272static void read_unlock_impl(size_t *pnesting_cnt);
273static void req_detection(size_t req_cnt);
274static bool cv_wait_for_gp(rcu_gp_t wait_on_gp);
275static void detector(void *);
276static bool wait_for_detect_req(void);
277static void end_cur_gp(void);
278static bool wait_for_readers(void);
279static bool gp_sleep(void);
280static void interrupt_delaying_cpus(cpu_mask_t *cpu_mask);
281static bool wait_for_delaying_cpus(void);
282#elif defined(RCU_PREEMPT_A)
283static bool wait_for_readers(bool expedite);
284static bool gp_sleep(bool *expedite);
285#endif
286
287static void start_new_gp(void);
288static void rm_quiescent_cpus(cpu_mask_t *cpu_mask);
289static void sample_cpus(cpu_mask_t *reader_cpus, void *arg);
290static void sample_local_cpu(void *);
291static bool wait_for_preempt_reader(void);
292static void note_preempted_reader(void);
293static void rm_preempted_reader(void);
294static void upd_max_cbs_in_slice(size_t arriving_cbs_cnt);
295
296
297
298/** Initializes global RCU structures. */
299void rcu_init(void)
300{
301 condvar_initialize(&rcu.gp_ended);
302 condvar_initialize(&rcu.expedite_now);
303
304 spinlock_initialize(&rcu.gp_lock, "rcu.gp_lock");
305 _rcu_cur_gp = 0;
306 rcu.completed_gp = 0;
307
308 irq_spinlock_initialize(&rcu.preempt_lock, "rcu.preempt_lock");
309 list_initialize(&rcu.cur_preempted);
310 list_initialize(&rcu.next_preempted);
311 rcu.preempt_blocking_det = false;
312
313 mutex_initialize(&rcu.barrier_mtx, MUTEX_PASSIVE);
314 atomic_set(&rcu.barrier_wait_cnt, 0);
315 waitq_initialize(&rcu.barrier_wq);
316
317 semaphore_initialize(&rcu.remaining_readers, 0);
318
319#ifdef RCU_PREEMPT_PODZIMEK
320 condvar_initialize(&rcu.req_gp_changed);
321
322 rcu.req_gp_end_cnt = 0;
323 rcu.req_expedited_cnt = 0;
324 atomic_set(&rcu.delaying_cpu_cnt, 0);
325#endif
326
327 rcu.detector_thr = NULL;
328
329 rcu.stat_expedited_cnt = 0;
330 rcu.stat_delayed_cnt = 0;
331 rcu.stat_preempt_blocking_cnt = 0;
332 rcu.stat_smp_call_cnt = 0;
333}
334
335/** Initializes per-CPU RCU data. If run on the boot cpu, also initializes global data. */
336void rcu_cpu_init(void)
337{
338 if (config.cpu_active == 1) {
339 rcu_init();
340 }
341
342 CPU->rcu.last_seen_gp = 0;
343
344#ifdef RCU_PREEMPT_PODZIMEK
345 CPU->rcu.nesting_cnt = 0;
346 CPU->rcu.is_delaying_gp = false;
347 CPU->rcu.signal_unlock = false;
348#endif
349
350 CPU->rcu.cur_cbs = NULL;
351 CPU->rcu.cur_cbs_cnt = 0;
352 CPU->rcu.next_cbs = NULL;
353 CPU->rcu.next_cbs_cnt = 0;
354 CPU->rcu.arriving_cbs = NULL;
355 CPU->rcu.parriving_cbs_tail = &CPU->rcu.arriving_cbs;
356 CPU->rcu.arriving_cbs_cnt = 0;
357
358 CPU->rcu.cur_cbs_gp = 0;
359 CPU->rcu.next_cbs_gp = 0;
360
361 semaphore_initialize(&CPU->rcu.arrived_flag, 0);
362
363 /* BSP creates reclaimer threads before AP's rcu_cpu_init() runs. */
364 if (config.cpu_active == 1)
365 CPU->rcu.reclaimer_thr = NULL;
366
367 CPU->rcu.stat_max_cbs = 0;
368 CPU->rcu.stat_avg_cbs = 0;
369 CPU->rcu.stat_missed_gps = 0;
370 CPU->rcu.stat_missed_gp_in_wait = 0;
371 CPU->rcu.stat_max_slice_cbs = 0;
372 CPU->rcu.last_arriving_cnt = 0;
373}
374
375/** Completes RCU init. Creates and runs the detector and reclaimer threads.*/
376void rcu_kinit_init(void)
377{
378#ifdef RCU_PREEMPT_PODZIMEK
379 start_detector();
380#endif
381
382 start_reclaimers();
383}
384
385/** Initializes any per-thread RCU structures. */
386void rcu_thread_init(thread_t *thread)
387{
388 thread->rcu.nesting_cnt = 0;
389
390#ifdef RCU_PREEMPT_PODZIMEK
391 thread->rcu.was_preempted = false;
392#endif
393
394 link_initialize(&thread->rcu.preempt_link);
395}
396
397
398/** Cleans up global RCU resources and stops dispatching callbacks.
399 *
400 * Call when shutting down the kernel. Outstanding callbacks will
401 * not be processed. Instead they will linger forever.
402 */
403void rcu_stop(void)
404{
405 /* Stop and wait for reclaimers. */
406 for (unsigned int cpu_id = 0; cpu_id < config.cpu_active; ++cpu_id) {
407 assert(cpus[cpu_id].rcu.reclaimer_thr != NULL);
408
409 if (cpus[cpu_id].rcu.reclaimer_thr) {
410 thread_interrupt(cpus[cpu_id].rcu.reclaimer_thr);
411 thread_join(cpus[cpu_id].rcu.reclaimer_thr);
412 thread_detach(cpus[cpu_id].rcu.reclaimer_thr);
413 cpus[cpu_id].rcu.reclaimer_thr = NULL;
414 }
415 }
416
417#ifdef RCU_PREEMPT_PODZIMEK
418 /* Stop the detector and wait. */
419 if (rcu.detector_thr) {
420 thread_interrupt(rcu.detector_thr);
421 thread_join(rcu.detector_thr);
422 thread_detach(rcu.detector_thr);
423 rcu.detector_thr = NULL;
424 }
425#endif
426}
427
428/** Returns the number of elapsed grace periods since boot. */
429uint64_t rcu_completed_gps(void)
430{
431 spinlock_lock(&rcu.gp_lock);
432 uint64_t completed = rcu.completed_gp;
433 spinlock_unlock(&rcu.gp_lock);
434
435 return completed;
436}
437
438/** Creates and runs cpu-bound reclaimer threads. */
439static void start_reclaimers(void)
440{
441 for (unsigned int cpu_id = 0; cpu_id < config.cpu_count; ++cpu_id) {
442 char name[THREAD_NAME_BUFLEN] = {0};
443
444 snprintf(name, THREAD_NAME_BUFLEN - 1, "rcu-rec/%u", cpu_id);
445
446 cpus[cpu_id].rcu.reclaimer_thr =
447 thread_create(reclaimer, NULL, TASK, THREAD_FLAG_NONE, name);
448
449 if (!cpus[cpu_id].rcu.reclaimer_thr)
450 panic("Failed to create RCU reclaimer thread on cpu%u.", cpu_id);
451
452 thread_wire(cpus[cpu_id].rcu.reclaimer_thr, &cpus[cpu_id]);
453 thread_ready(cpus[cpu_id].rcu.reclaimer_thr);
454 }
455}
456
457#ifdef RCU_PREEMPT_PODZIMEK
458
459/** Starts the detector thread. */
460static void start_detector(void)
461{
462 rcu.detector_thr =
463 thread_create(detector, NULL, TASK, THREAD_FLAG_NONE, "rcu-det");
464
465 if (!rcu.detector_thr)
466 panic("Failed to create RCU detector thread.");
467
468 thread_ready(rcu.detector_thr);
469}
470
471/** Returns true if in an rcu reader section. */
472bool rcu_read_locked(void)
473{
474 preemption_disable();
475 bool locked = 0 < CPU->rcu.nesting_cnt;
476 preemption_enable();
477
478 return locked;
479}
480
481/** Unlocks the local reader section using the given nesting count.
482 *
483 * Preemption or interrupts must be disabled.
484 *
485 * @param pnesting_cnt Either &CPU->rcu.tmp_nesting_cnt or
486 * &THREAD->rcu.nesting_cnt.
487 */
488static void read_unlock_impl(size_t *pnesting_cnt)
489{
490 assert(PREEMPTION_DISABLED || interrupts_disabled());
491
492 if (0 == --(*pnesting_cnt)) {
493 _rcu_record_qs();
494
495 /*
496 * The thread was preempted while in a critical section or
497 * the detector is eagerly waiting for this cpu's reader
498 * to finish.
499 *
500 * Note that THREAD may be NULL in scheduler() and not just during boot.
501 */
502 if ((THREAD && THREAD->rcu.was_preempted) || CPU->rcu.is_delaying_gp) {
503 /* Rechecks with disabled interrupts. */
504 _rcu_signal_read_unlock();
505 }
506 }
507}
508
509/** If necessary, signals the detector that we exited a reader section. */
510void _rcu_signal_read_unlock(void)
511{
512 assert(PREEMPTION_DISABLED || interrupts_disabled());
513
514 /*
515 * If an interrupt occurs here (even a NMI) it may beat us to
516 * resetting .is_delaying_gp or .was_preempted and up the semaphore
517 * for us.
518 */
519
520 /*
521 * If the detector is eagerly waiting for this cpu's reader to unlock,
522 * notify it that the reader did so.
523 */
524 if (local_atomic_exchange(&CPU->rcu.is_delaying_gp, false)) {
525 semaphore_up(&rcu.remaining_readers);
526 }
527
528 /*
529 * This reader was preempted while in a reader section.
530 * We might be holding up the current GP. Notify the
531 * detector if so.
532 */
533 if (THREAD && local_atomic_exchange(&THREAD->rcu.was_preempted, false)) {
534 assert(link_used(&THREAD->rcu.preempt_link));
535
536 rm_preempted_reader();
537 }
538
539 /* If there was something to signal to the detector we have done so. */
540 CPU->rcu.signal_unlock = false;
541}
542
543#endif /* RCU_PREEMPT_PODZIMEK */
544
545typedef struct synch_item {
546 waitq_t wq;
547 rcu_item_t rcu_item;
548} synch_item_t;
549
550/** Blocks until all preexisting readers exit their critical sections. */
551void rcu_synchronize(void)
552{
553 _rcu_synchronize(false);
554}
555
556/** Like rcu_synchronize(), but requests an expedited grace period. */
557void rcu_synchronize_expedite(void)
558{
559 _rcu_synchronize(true);
560}
561
562/** Blocks until all preexisting readers exit their critical sections. */
563void _rcu_synchronize(bool expedite)
564{
565 /* Calling from a reader section will deadlock. */
566 assert(!rcu_read_locked());
567
568 synch_item_t completion;
569
570 waitq_initialize(&completion.wq);
571 _rcu_call(expedite, &completion.rcu_item, synch_complete);
572 waitq_sleep(&completion.wq);
573}
574
575/** rcu_synchronize's callback. */
576static void synch_complete(rcu_item_t *rcu_item)
577{
578 synch_item_t *completion = member_to_inst(rcu_item, synch_item_t, rcu_item);
579 assert(completion);
580 waitq_wakeup(&completion->wq, WAKEUP_FIRST);
581}
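
/*
 * Illustrative sketch (not part of this file) of the typical updater
 * pattern built on rcu_synchronize(): unlink an item under a
 * hypothetical update-side lock, wait out preexisting readers, then
 * free it. example_t, example_lock and the use of free() are assumptions.
 *
 *	void example_remove(example_t *e)
 *	{
 *		irq_spinlock_lock(&example_lock, true);
 *		list_remove(&e->link);
 *		irq_spinlock_unlock(&example_lock, true);
 *
 *		// New readers can no longer find e; wait for old ones.
 *		rcu_synchronize();
 *		free(e);
 *	}
 */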
582
583/** Waits for all outstanding rcu calls to complete. */
584void rcu_barrier(void)
585{
586 /*
587 * Serialize rcu_barrier() calls so we don't overwrite cpu.barrier_item
588 * currently in use by rcu_barrier().
589 */
590 mutex_lock(&rcu.barrier_mtx);
591
592 /*
593 * Ensure we queue a barrier callback on all cpus before the already
594 * enqueued barrier callbacks start signaling completion.
595 */
596 atomic_set(&rcu.barrier_wait_cnt, 1);
597
598 DEFINE_CPU_MASK(cpu_mask);
599 cpu_mask_active(cpu_mask);
600
601 cpu_mask_for_each(*cpu_mask, cpu_id) {
602 smp_call(cpu_id, add_barrier_cb, NULL);
603 }
604
605 if (0 < atomic_predec(&rcu.barrier_wait_cnt)) {
606 waitq_sleep(&rcu.barrier_wq);
607 }
608
609 mutex_unlock(&rcu.barrier_mtx);
610}
611
612/** Issues a rcu_barrier() callback on the local cpu.
613 *
614 * Executed with interrupts disabled.
615 */
616static void add_barrier_cb(void *arg)
617{
618 assert(interrupts_disabled() || PREEMPTION_DISABLED);
619 atomic_inc(&rcu.barrier_wait_cnt);
620 rcu_call(&CPU->rcu.barrier_item, barrier_complete);
621}
622
623/** Local cpu's rcu_barrier() completion callback. */
624static void barrier_complete(rcu_item_t *barrier_item)
625{
626 /* Is this the last barrier callback completed? */
627 if (0 == atomic_predec(&rcu.barrier_wait_cnt)) {
628 /* Notify rcu_barrier() that we're done. */
629 waitq_wakeup(&rcu.barrier_wq, WAKEUP_FIRST);
630 }
631}
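
/*
 * A short worked example of the barrier_wait_cnt scheme above, assuming
 * three active cpus: rcu_barrier() first sets the counter to 1 so that
 * no barrier callback can drop it to zero while callbacks are still
 * being queued; each add_barrier_cb() then increments it (1 -> 2 -> 3
 * -> 4); the atomic_predec() in rcu_barrier() removes the initial one
 * (4 -> 3) and each barrier_complete() decrements it further
 * (3 -> 2 -> 1 -> 0), with the callback that reaches zero waking
 * rcu_barrier() up via barrier_wq.
 */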
632
633/** Adds a callback to invoke after all preexisting readers finish.
634 *
635 * May be called from within interrupt handlers or RCU reader sections.
636 *
637 * @param rcu_item Used by RCU to track the call. Must remain valid
638 * until the user callback function is entered.
639 * @param func User callback function that will be invoked once a full
640 * grace period elapsed, ie at a time when all preexisting
641 * readers have finished. The callback should be short and must
642 * not block. If you must sleep, enqueue your work in the system
643 * work queue from the callback (ie workq_global_enqueue()).
644 */
645void rcu_call(rcu_item_t *rcu_item, rcu_func_t func)
646{
647 rcu_call_impl(false, rcu_item, func);
648}
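
/*
 * Illustrative sketch (not part of this file) of deferring reclamation
 * with rcu_call(). The example_t type is hypothetical; member_to_inst()
 * is used exactly as in synch_complete() above and free() stands for
 * whatever allocator the item came from.
 *
 *	static void example_reclaim(rcu_item_t *item)
 *	{
 *		example_t *e = member_to_inst(item, example_t, rcu);
 *		free(e);
 *	}
 *
 *	// After unlinking e so that no new reader can reach it:
 *	rcu_call(&e->rcu, example_reclaim);
 */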
649
650/** rcu_call() implementation. See rcu_call() for comments. */
651void _rcu_call(bool expedite, rcu_item_t *rcu_item, rcu_func_t func)
652{
653 rcu_call_impl(expedite, rcu_item, func);
654}
655
656/** rcu_call() inline-able implementation. See rcu_call() for comments. */
657static inline void rcu_call_impl(bool expedite, rcu_item_t *rcu_item,
658 rcu_func_t func)
659{
660 assert(rcu_item);
661
662 rcu_item->func = func;
663 rcu_item->next = NULL;
664
665 preemption_disable();
666
667 rcu_cpu_data_t *r = &CPU->rcu;
668
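	/*
	 * Lock-free enqueue into the cpu-local arriving_cbs list: first
	 * atomically swing the tail pointer to &rcu_item->next, then link
	 * the new item through the previous tail. local_atomic_exchange()
	 * with preemption disabled keeps this correct even if a local
	 * interrupt handler calls rcu_call() in between the two steps.
	 */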
669 rcu_item_t **prev_tail
670 = local_atomic_exchange(&r->parriving_cbs_tail, &rcu_item->next);
671 *prev_tail = rcu_item;
672
673 /* Approximate the number of callbacks present. */
674 ++r->arriving_cbs_cnt;
675
676 if (expedite) {
677 r->expedite_arriving = true;
678 }
679
680 bool first_cb = (prev_tail == &CPU->rcu.arriving_cbs);
681
682 /* Added first callback - notify the reclaimer. */
683 if (first_cb && !semaphore_count_get(&r->arrived_flag)) {
684 semaphore_up(&r->arrived_flag);
685 }
686
687 preemption_enable();
688}
689
690static bool cur_cbs_empty(void)
691{
692 assert(THREAD && THREAD->wired);
693 return NULL == CPU->rcu.cur_cbs;
694}
695
696static bool next_cbs_empty(void)
697{
698 assert(THREAD && THREAD->wired);
699 return NULL == CPU->rcu.next_cbs;
700}
701
702/** Call with interrupts disabled to get an up-to-date result. */
703static bool arriving_cbs_empty(void)
704{
705 assert(THREAD && THREAD->wired);
706 /*
707 * Accessing with interrupts enabled may at worst lead to
708 * a false negative if we race with a local interrupt handler.
709 */
710 return NULL == CPU->rcu.arriving_cbs;
711}
712
713static bool all_cbs_empty(void)
714{
715 return cur_cbs_empty() && next_cbs_empty() && arriving_cbs_empty();
716}
717
718
719/** Reclaimer thread dispatches locally queued callbacks once a GP ends. */
720static void reclaimer(void *arg)
721{
722 assert(THREAD && THREAD->wired);
723 assert(THREAD == CPU->rcu.reclaimer_thr);
724
725 rcu_gp_t last_compl_gp = 0;
726 bool ok = true;
727
728 while (ok && wait_for_pending_cbs()) {
729 assert(CPU->rcu.reclaimer_thr == THREAD);
730
731 exec_completed_cbs(last_compl_gp);
732
733 bool expedite = advance_cbs();
734
735 ok = wait_for_cur_cbs_gp_end(expedite, &last_compl_gp);
736 }
737}
738
739/** Waits until there are callbacks waiting to be dispatched. */
740static bool wait_for_pending_cbs(void)
741{
742 if (!all_cbs_empty())
743 return true;
744
745 bool ok = true;
746
747 while (arriving_cbs_empty() && ok) {
748 ok = semaphore_down_interruptable(&CPU->rcu.arrived_flag);
749 }
750
751 return ok;
752}
753
754static void upd_stat_missed_gp(rcu_gp_t compl)
755{
756 if (CPU->rcu.cur_cbs_gp < compl) {
757 CPU->rcu.stat_missed_gps += (size_t)(compl - CPU->rcu.cur_cbs_gp);
758 }
759}
760
761/** Executes all callbacks for the given completed grace period. */
762static void exec_completed_cbs(rcu_gp_t last_completed_gp)
763{
764 upd_stat_missed_gp(last_completed_gp);
765
766 /* Both next_cbs and cur_cbs GP elapsed. */
767 if (CPU->rcu.next_cbs_gp <= last_completed_gp) {
768 assert(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
769
770 size_t exec_cnt = CPU->rcu.cur_cbs_cnt + CPU->rcu.next_cbs_cnt;
771
772 if (exec_cnt < CRITICAL_THRESHOLD) {
773 exec_cbs(&CPU->rcu.cur_cbs);
774 exec_cbs(&CPU->rcu.next_cbs);
775 } else {
776 /*
777 * Getting overwhelmed with too many callbacks to run.
778 * Disable preemption in order to prolong our time slice
779 * and catch up with updaters posting new callbacks.
780 */
781 preemption_disable();
782 exec_cbs(&CPU->rcu.cur_cbs);
783 exec_cbs(&CPU->rcu.next_cbs);
784 preemption_enable();
785 }
786
787 CPU->rcu.cur_cbs_cnt = 0;
788 CPU->rcu.next_cbs_cnt = 0;
789 } else if (CPU->rcu.cur_cbs_gp <= last_completed_gp) {
790
791 if (CPU->rcu.cur_cbs_cnt < CRITICAL_THRESHOLD) {
792 exec_cbs(&CPU->rcu.cur_cbs);
793 } else {
794 /*
795 * Getting overwhelmed with too many callbacks to run.
796 * Disable preemption in order to prolong our time slice
797 * and catch up with updaters posting new callbacks.
798 */
799 preemption_disable();
800 exec_cbs(&CPU->rcu.cur_cbs);
801 preemption_enable();
802 }
803
804 CPU->rcu.cur_cbs_cnt = 0;
805 }
806}
807
808/** Executes callbacks in the single-linked list. The list is left empty. */
809static void exec_cbs(rcu_item_t **phead)
810{
811 rcu_item_t *rcu_item = *phead;
812
813 while (rcu_item) {
814 /* func() may free rcu_item. Get a local copy. */
815 rcu_item_t *next = rcu_item->next;
816 rcu_func_t func = rcu_item->func;
817
818 func(rcu_item);
819
820 rcu_item = next;
821 }
822
823 *phead = NULL;
824}
825
826static void upd_stat_cb_cnts(size_t arriving_cnt)
827{
828 CPU->rcu.stat_max_cbs = max(arriving_cnt, CPU->rcu.stat_max_cbs);
829 if (0 < arriving_cnt) {
830 CPU->rcu.stat_avg_cbs =
831 (99 * CPU->rcu.stat_avg_cbs + 1 * arriving_cnt) / 100;
832 }
833}
834
835/** Prepares another batch of callbacks to dispatch at the next grace period.
836 *
837 * @return True if the next batch of callbacks must be expedited.
838 */
839static bool advance_cbs(void)
840{
841 /* Move next_cbs to cur_cbs. */
842 CPU->rcu.cur_cbs = CPU->rcu.next_cbs;
843 CPU->rcu.cur_cbs_cnt = CPU->rcu.next_cbs_cnt;
844 CPU->rcu.cur_cbs_gp = CPU->rcu.next_cbs_gp;
845
846 /* Move arriving_cbs to next_cbs. */
847
848 CPU->rcu.next_cbs_cnt = CPU->rcu.arriving_cbs_cnt;
849 CPU->rcu.arriving_cbs_cnt = 0;
850
851 /*
852 * Too many callbacks queued. Better speed up the detection
853 * or risk exhausting all system memory.
854 */
855 bool expedite = (EXPEDITE_THRESHOLD < CPU->rcu.next_cbs_cnt)
856 || CPU->rcu.expedite_arriving;
857 CPU->rcu.expedite_arriving = false;
858
859 /* Start moving the arriving_cbs list to next_cbs. */
860 CPU->rcu.next_cbs = CPU->rcu.arriving_cbs;
861
862 /*
863 * At least one callback arrived. The tail therefore does not point
864 * to the head of arriving_cbs and we can safely reset it to NULL.
865 */
866 if (CPU->rcu.next_cbs) {
867 assert(CPU->rcu.parriving_cbs_tail != &CPU->rcu.arriving_cbs);
868
869 CPU->rcu.arriving_cbs = NULL;
870 /* Reset arriving_cbs before updating the tail pointer. */
871 compiler_barrier();
872 /* Updating the tail pointer completes the move of arriving_cbs. */
873 ACCESS_ONCE(CPU->rcu.parriving_cbs_tail) = &CPU->rcu.arriving_cbs;
874 } else {
875 /*
876 * arriving_cbs was null and parriving_cbs_tail pointed to it
877 * so leave it that way. Note that interrupt handlers may have
878 * added a callback in the meantime so it is not safe to reset
879 * arriving_cbs or parriving_cbs.
880 */
881 }
882
883 /* Update statistics of arrived callbacks. */
884 upd_stat_cb_cnts(CPU->rcu.next_cbs_cnt);
885
886 /*
887 * Make changes prior to queuing next_cbs visible to readers.
888 * See comment in wait_for_readers().
889 */
890 memory_barrier(); /* MB A, B */
891
892 /* At the end of next_cbs_gp, exec next_cbs. Determine what GP that is. */
893
894 if (!next_cbs_empty()) {
895 spinlock_lock(&rcu.gp_lock);
896
897 /* Exec next_cbs at the end of the next GP. */
898 CPU->rcu.next_cbs_gp = _rcu_cur_gp + 1;
899
900 /*
901 * There are no callbacks to invoke before next_cbs. Instruct
902 * wait_for_cur_cbs_gp() to notify us of the nearest GP end.
903 * That could be sooner than next_cbs_gp (if the current GP
904 * had not yet completed), so we'll create a shorter batch
905 * of callbacks next time around.
906 */
907 if (cur_cbs_empty()) {
908 CPU->rcu.cur_cbs_gp = rcu.completed_gp + 1;
909 }
910
911 spinlock_unlock(&rcu.gp_lock);
912 } else {
913 CPU->rcu.next_cbs_gp = CPU->rcu.cur_cbs_gp;
914 }
915
916 assert(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
917
918 return expedite;
919}
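
/*
 * Descriptive note on the per-cpu callback pipeline that advance_cbs()
 * maintains:
 *
 *   rcu_call() -> arriving_cbs -> next_cbs -> cur_cbs -> exec_cbs()
 *
 * arriving_cbs collects newly posted callbacks; on each pass of the
 * reclaimer loop it becomes next_cbs and is assigned the grace period
 * _rcu_cur_gp + 1, while the former next_cbs becomes cur_cbs and is
 * executed once rcu.completed_gp reaches cur_cbs_gp.
 */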
920
921
922#ifdef RCU_PREEMPT_A
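
/*
 * Note on the A-RCU nesting count encoding as it is used below
 * (RCU_CNT_INC and RCU_WAS_PREEMPTED presumably come from the RCU
 * header): THE->rcu_nesting appears to hold the reader nesting level in
 * multiples of RCU_CNT_INC with RCU_WAS_PREEMPTED as a separate flag
 * bit, so (RCU_CNT_INC <= nesting) tests "inside a reader section" and
 * (nesting & RCU_WAS_PREEMPTED) tests "was preempted while in one".
 */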
923
924/** Waits for the grace period associated with callbacks cur_cbs to elapse.
925 *
926 * @param expedite Instructs the detector to aggressively speed up grace
927 * period detection without any delay.
928 * @param completed_gp Returns the most recent completed grace period
929 * number.
930 * @return false if the thread was interrupted and should stop.
931 */
932static bool wait_for_cur_cbs_gp_end(bool expedite, rcu_gp_t *completed_gp)
933{
934 spinlock_lock(&rcu.gp_lock);
935
936 assert(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
937 assert(CPU->rcu.cur_cbs_gp <= _rcu_cur_gp + 1);
938
939 while (rcu.completed_gp < CPU->rcu.cur_cbs_gp) {
940 /* GP has not yet started - start a new one. */
941 if (rcu.completed_gp == _rcu_cur_gp) {
942 start_new_gp();
943 spinlock_unlock(&rcu.gp_lock);
944
945 if (!wait_for_readers(expedite))
946 return false;
947
948 spinlock_lock(&rcu.gp_lock);
949 /* Notify any reclaimers this GP had ended. */
950 rcu.completed_gp = _rcu_cur_gp;
951 condvar_broadcast(&rcu.gp_ended);
952 } else {
953 /* GP detection is in progress.*/
954
955 if (expedite)
956 condvar_signal(&rcu.expedite_now);
957
958 /* Wait for the GP to complete. */
959 int ret = _condvar_wait_timeout_spinlock(&rcu.gp_ended, &rcu.gp_lock,
960 SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
961
962 if (ret == ESYNCH_INTERRUPTED) {
963 spinlock_unlock(&rcu.gp_lock);
964 return false;
965 }
966 }
967 }
968
969 upd_missed_gp_in_wait(rcu.completed_gp);
970
971 *completed_gp = rcu.completed_gp;
972 spinlock_unlock(&rcu.gp_lock);
973
974 return true;
975}
976
977static bool wait_for_readers(bool expedite)
978{
979 DEFINE_CPU_MASK(reader_cpus);
980
981 cpu_mask_active(reader_cpus);
982 rm_quiescent_cpus(reader_cpus);
983
984 while (!cpu_mask_is_none(reader_cpus)) {
985 /* Give cpus a chance to context switch (a QS) and batch callbacks. */
986 if (!gp_sleep(&expedite))
987 return false;
988
989 rm_quiescent_cpus(reader_cpus);
990 sample_cpus(reader_cpus, reader_cpus);
991 }
992
993 /* Update statistic. */
994 if (expedite) {
995 ++rcu.stat_expedited_cnt;
996 }
997
998 /*
999 * All cpus have passed through a QS and see the most recent _rcu_cur_gp.
1000 * As a result newly preempted readers will associate with next_preempted
1001 * and the number of old readers in cur_preempted will monotonically
1002 * decrease. Wait for those old/preexisting readers.
1003 */
1004 return wait_for_preempt_reader();
1005}
1006
1007static bool gp_sleep(bool *expedite)
1008{
1009 if (*expedite) {
1010 scheduler();
1011 return true;
1012 } else {
1013 spinlock_lock(&rcu.gp_lock);
1014
1015 int ret = 0;
1016 ret = _condvar_wait_timeout_spinlock(&rcu.expedite_now, &rcu.gp_lock,
1017 DETECT_SLEEP_MS * 1000, SYNCH_FLAGS_INTERRUPTIBLE);
1018
1019 /* rcu.expedite_now was signaled. */
1020 if (ret == ESYNCH_OK_BLOCKED) {
1021 *expedite = true;
1022 }
1023
1024 spinlock_unlock(&rcu.gp_lock);
1025
1026 return (ret != ESYNCH_INTERRUPTED);
1027 }
1028}
1029
1030static void sample_local_cpu(void *arg)
1031{
1032 assert(interrupts_disabled());
1033 cpu_mask_t *reader_cpus = (cpu_mask_t *)arg;
1034
1035 bool locked = RCU_CNT_INC <= THE->rcu_nesting;
1036 /* smp_call machinery makes the most current _rcu_cur_gp visible. */
1037 bool passed_qs = (CPU->rcu.last_seen_gp == _rcu_cur_gp);
1038
1039 if (locked && !passed_qs) {
1040 /*
1041 * This cpu has not yet passed a quiescent state during this grace
1042 * period and it is currently in a reader section. We'll have to
1043 * try to sample this cpu again later.
1044 */
1045 } else {
1046 /* Either not in a reader section or already passed a QS. */
1047 cpu_mask_reset(reader_cpus, CPU->id);
1048 /* Contain new reader sections and make prior changes visible to them.*/
1049 memory_barrier();
1050 CPU->rcu.last_seen_gp = _rcu_cur_gp;
1051 }
1052}
1053
1054/** Called by the scheduler() when switching away from the current thread. */
1055void rcu_after_thread_ran(void)
1056{
1057 assert(interrupts_disabled());
1058
1059 /*
1060 * In order not to worry about NMIs seeing rcu_nesting change,
1061 * work with a local copy.
1062 */
1063 size_t nesting_cnt = local_atomic_exchange(&THE->rcu_nesting, 0);
1064
1065 /*
1066 * Ensures NMIs see .rcu_nesting without the WAS_PREEMPTED mark and
1067 * do not accidentally call rm_preempted_reader() from unlock().
1068 */
1069 compiler_barrier();
1070
1071 /* Preempted a reader critical section for the first time. */
1072 if (RCU_CNT_INC <= nesting_cnt && !(nesting_cnt & RCU_WAS_PREEMPTED)) {
1073 nesting_cnt |= RCU_WAS_PREEMPTED;
1074 note_preempted_reader();
1075 }
1076
1077 /* Save the thread's nesting count when it is not running. */
1078 THREAD->rcu.nesting_cnt = nesting_cnt;
1079
1080 if (CPU->rcu.last_seen_gp != _rcu_cur_gp) {
1081 /*
1082 * Contain any memory accesses of old readers before announcing a QS.
1083 * Also make changes from the previous GP visible to this cpu.
1084 * Moreover it separates writing to last_seen_gp from
1085 * note_preempted_reader().
1086 */
1087 memory_barrier();
1088 /*
1089 * The preempted reader has been noted globally. There are therefore
1090 * no readers running on this cpu so this is a quiescent state.
1091 *
1092 * Reading the multiword _rcu_cur_gp non-atomically is benign.
1093 * At worst, the read value will be different from the actual value.
1094 * As a result, both the detector and this cpu will believe
1095 * this cpu has not yet passed a QS although it really did.
1096 *
1097 * Reloading _rcu_cur_gp is benign, because it cannot change
1098 * until this cpu acknowledges it passed a QS by writing to
1099 * last_seen_gp. Since interrupts are disabled, only this
1100 * code may do so (IPIs won't get through).
1101 */
1102 CPU->rcu.last_seen_gp = _rcu_cur_gp;
1103 }
1104
1105 /*
1106 * Forcefully associate the reclaimer with the highest priority
1107 * even if preempted due to its time slice running out.
1108 */
1109 if (THREAD == CPU->rcu.reclaimer_thr) {
1110 THREAD->priority = -1;
1111 }
1112
1113 upd_max_cbs_in_slice(CPU->rcu.arriving_cbs_cnt);
1114}
1115
1116/** Called by the scheduler() when switching to a newly scheduled thread. */
1117void rcu_before_thread_runs(void)
1118{
1119 assert(!rcu_read_locked());
1120
1121 /* Load the thread's saved nesting count from before it was preempted. */
1122 THE->rcu_nesting = THREAD->rcu.nesting_cnt;
1123}
1124
1125/** Called from scheduler() when exiting the current thread.
1126 *
1127 * Preemption or interrupts are disabled and the scheduler() already
1128 * switched away from the current thread, calling rcu_after_thread_ran().
1129 */
1130void rcu_thread_exiting(void)
1131{
1132 assert(THE->rcu_nesting == 0);
1133
1134 /*
1135 * The thread forgot to exit its reader critical section.
1136 * It is a bug, but rather than letting the entire system lock up,
1137 * forcefully leave the reader section. The thread is not holding
1138 * any references anyway since it is exiting so it is safe.
1139 */
1140 if (RCU_CNT_INC <= THREAD->rcu.nesting_cnt) {
1141 /* Emulate _rcu_preempted_unlock() with the proper nesting count. */
1142 if (THREAD->rcu.nesting_cnt & RCU_WAS_PREEMPTED) {
1143 rm_preempted_reader();
1144 }
1145
1146 printf("Bug: thread (id %" PRIu64 " \"%s\") exited while in RCU read"
1147 " section.\n", THREAD->tid, THREAD->name);
1148 }
1149}
1150
1151/** Returns true if in an rcu reader section. */
1152bool rcu_read_locked(void)
1153{
1154 return RCU_CNT_INC <= THE->rcu_nesting;
1155}
1156
1157/** Invoked when a preempted reader finally exits its reader section. */
1158void _rcu_preempted_unlock(void)
1159{
1160 assert(0 == THE->rcu_nesting || RCU_WAS_PREEMPTED == THE->rcu_nesting);
1161
1162 size_t prev = local_atomic_exchange(&THE->rcu_nesting, 0);
1163 if (prev == RCU_WAS_PREEMPTED) {
1164 /*
1165 * NMI handlers are never preempted but may call rm_preempted_reader()
1166 * if an NMI occurred in _rcu_preempted_unlock() of a preempted thread.
1167 * The only other rcu code that may have been interrupted by the NMI
1168 * in _rcu_preempted_unlock() is: an IPI/sample_local_cpu() and
1169 * the initial part of rcu_after_thread_ran().
1170 *
1171 * rm_preempted_reader() will not deadlock because none of the locks
1172 * it uses are locked in this case. Neither _rcu_preempted_unlock()
1173 * nor sample_local_cpu() nor the initial part of rcu_after_thread_ran()
1174 * acquire any locks.
1175 */
1176 rm_preempted_reader();
1177 }
1178}
1179
1180#elif defined(RCU_PREEMPT_PODZIMEK)
1181
1182/** Waits for the grace period associated with callbacks cur_cbs to elapse.
1183 *
1184 * @param expedite Instructs the detector to aggressively speed up grace
1185 * period detection without any delay.
1186 * @param completed_gp Returns the most recent completed grace period
1187 * number.
1188 * @return false if the thread was interrupted and should stop.
1189 */
1190static bool wait_for_cur_cbs_gp_end(bool expedite, rcu_gp_t *completed_gp)
1191{
1192 /*
1193 * Use a possibly outdated version of completed_gp to bypass checking
1194 * with the lock.
1195 *
1196 * Note that loading and storing rcu.completed_gp is not atomic
1197 * (it is 64bit wide). Reading a clobbered value that is less than
1198 * rcu.completed_gp is harmless - we'll recheck with a lock. The
1199 * only way to read a clobbered value that is greater than the actual
1200 * value is if the detector increases the higher-order word first and
1201 * then decreases the lower-order word (or we see stores in that order),
1202 * eg when incrementing from 2^32 - 1 to 2^32. The loaded value
1203 * suddenly jumps by 2^32. It would take hours for such an increase
1204 * to occur so it is safe to discard the value. We allow increases
1205 * of up to half the maximum to generously accommodate loading an
1206 * outdated lower word.
1207 */
1208 rcu_gp_t compl_gp = ACCESS_ONCE(rcu.completed_gp);
1209 if (CPU->rcu.cur_cbs_gp <= compl_gp
1210 && compl_gp <= CPU->rcu.cur_cbs_gp + UINT32_MAX_HALF) {
1211 *completed_gp = compl_gp;
1212 return true;
1213 }
1214
1215 spinlock_lock(&rcu.gp_lock);
1216
1217 if (CPU->rcu.cur_cbs_gp <= rcu.completed_gp) {
1218 *completed_gp = rcu.completed_gp;
1219 spinlock_unlock(&rcu.gp_lock);
1220 return true;
1221 }
1222
1223 assert(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
1224 assert(_rcu_cur_gp <= CPU->rcu.cur_cbs_gp);
1225
1226 /*
1227 * Notify the detector of how many GP ends we intend to wait for, so
1228 * it can avoid going to sleep unnecessarily. Optimistically assume
1229 * new callbacks will arrive while we're waiting; hence +1.
1230 */
1231 size_t remaining_gp_ends = (size_t) (CPU->rcu.next_cbs_gp - _rcu_cur_gp);
1232 req_detection(remaining_gp_ends + (arriving_cbs_empty() ? 0 : 1));
1233
1234 /*
1235 * Ask the detector to speed up GP detection if there are too many
1236 * pending callbacks and other reclaimers have not already done so.
1237 */
1238 if (expedite) {
1239 if (0 == rcu.req_expedited_cnt)
1240 condvar_signal(&rcu.expedite_now);
1241
1242 /*
1243 * Expedite only cur_cbs. If there really is a surge of callbacks,
1244 * the arriving batch will expedite the GP for the huge number
1245 * of callbacks currently in next_cbs.
1246 */
1247 rcu.req_expedited_cnt = 1;
1248 }
1249
1250 /* Wait for cur_cbs_gp to end. */
1251 bool interrupted = cv_wait_for_gp(CPU->rcu.cur_cbs_gp);
1252
1253 *completed_gp = rcu.completed_gp;
1254 spinlock_unlock(&rcu.gp_lock);
1255
1256 if (!interrupted)
1257 upd_missed_gp_in_wait(*completed_gp);
1258
1259 return !interrupted;
1260}
1261
1262/** Waits for an announcement of the end of the grace period wait_on_gp. */
1263static bool cv_wait_for_gp(rcu_gp_t wait_on_gp)
1264{
1265 assert(spinlock_locked(&rcu.gp_lock));
1266
1267 bool interrupted = false;
1268
1269 /* Wait until wait_on_gp ends. */
1270 while (rcu.completed_gp < wait_on_gp && !interrupted) {
1271 int ret = _condvar_wait_timeout_spinlock(&rcu.gp_ended, &rcu.gp_lock,
1272 SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
1273 interrupted = (ret == ESYNCH_INTERRUPTED);
1274 }
1275
1276 return interrupted;
1277}
1278
1279/** Requests the detector to detect at least req_cnt consecutive grace periods.*/
1280static void req_detection(size_t req_cnt)
1281{
1282 if (rcu.req_gp_end_cnt < req_cnt) {
1283 bool detector_idle = (0 == rcu.req_gp_end_cnt);
1284 rcu.req_gp_end_cnt = req_cnt;
1285
1286 if (detector_idle) {
1287 assert(_rcu_cur_gp == rcu.completed_gp);
1288 condvar_signal(&rcu.req_gp_changed);
1289 }
1290 }
1291}
1292
1293
1294/** The detector thread detects and notifies reclaimers of grace period ends. */
1295static void detector(void *arg)
1296{
1297 spinlock_lock(&rcu.gp_lock);
1298
1299 while (wait_for_detect_req()) {
1300 /*
1301 * Announce new GP started. Readers start lazily acknowledging that
1302 * they passed a QS.
1303 */
1304 start_new_gp();
1305
1306 spinlock_unlock(&rcu.gp_lock);
1307
1308 if (!wait_for_readers())
1309 goto unlocked_out;
1310
1311 spinlock_lock(&rcu.gp_lock);
1312
1313 /* Notify reclaimers that they may now invoke queued callbacks. */
1314 end_cur_gp();
1315 }
1316
1317 spinlock_unlock(&rcu.gp_lock);
1318
1319unlocked_out:
1320 return;
1321}
1322
1323/** Waits for a request from a reclaimer thread to detect a grace period. */
1324static bool wait_for_detect_req(void)
1325{
1326 assert(spinlock_locked(&rcu.gp_lock));
1327
1328 bool interrupted = false;
1329
1330 while (0 == rcu.req_gp_end_cnt && !interrupted) {
1331 int ret = _condvar_wait_timeout_spinlock(&rcu.req_gp_changed,
1332 &rcu.gp_lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
1333
1334 interrupted = (ret == ESYNCH_INTERRUPTED);
1335 }
1336
1337 return !interrupted;
1338}
1339
1340
1341static void end_cur_gp(void)
1342{
1343 assert(spinlock_locked(&rcu.gp_lock));
1344
1345 rcu.completed_gp = _rcu_cur_gp;
1346 --rcu.req_gp_end_cnt;
1347
1348 condvar_broadcast(&rcu.gp_ended);
1349}
1350
1351/** Waits for readers that started before the current GP started to finish. */
1352static bool wait_for_readers(void)
1353{
1354 DEFINE_CPU_MASK(reading_cpus);
1355
1356 /* All running cpus have potential readers. */
1357 cpu_mask_active(reading_cpus);
1358
1359 /*
1360 * Give readers time to pass through a QS. Also, batch arriving
1361 * callbacks in order to amortize detection overhead.
1362 */
1363 if (!gp_sleep())
1364 return false;
1365
1366 /* Non-intrusively determine which cpus have yet to pass a QS. */
1367 rm_quiescent_cpus(reading_cpus);
1368
1369 /* Actively interrupt cpus delaying the current GP and demand a QS. */
1370 interrupt_delaying_cpus(reading_cpus);
1371
1372 /* Wait for the interrupted cpus to notify us that they reached a QS. */
1373 if (!wait_for_delaying_cpus())
1374 return false;
1375 /*
1376 * All cpus recorded a QS or are still idle. Any new readers will be added
1377 * to next_preempted if preempted, ie the number of readers in cur_preempted
1378 * monotonically decreases.
1379 */
1380
1381 /* Wait for the last reader in cur_preempted to notify us it is done. */
1382 if (!wait_for_preempt_reader())
1383 return false;
1384
1385 return true;
1386}
1387
1388/** Sleeps a while if the current grace period is not to be expedited. */
1389static bool gp_sleep(void)
1390{
1391 spinlock_lock(&rcu.gp_lock);
1392
1393 int ret = 0;
1394 while (0 == rcu.req_expedited_cnt && 0 == ret) {
1395 /* minor bug: sleeps for the same duration if woken up spuriously. */
1396 ret = _condvar_wait_timeout_spinlock(&rcu.expedite_now, &rcu.gp_lock,
1397 DETECT_SLEEP_MS * 1000, SYNCH_FLAGS_INTERRUPTIBLE);
1398 }
1399
1400 if (0 < rcu.req_expedited_cnt) {
1401 --rcu.req_expedited_cnt;
1402 /* Update statistic. */
1403 ++rcu.stat_expedited_cnt;
1404 }
1405
1406 spinlock_unlock(&rcu.gp_lock);
1407
1408 return (ret != ESYNCH_INTERRUPTED);
1409}
1410
1411/** Actively interrupts and checks the offending cpus for quiescent states. */
1412static void interrupt_delaying_cpus(cpu_mask_t *cpu_mask)
1413{
1414 atomic_set(&rcu.delaying_cpu_cnt, 0);
1415
1416 sample_cpus(cpu_mask, NULL);
1417}
1418
1419/** Invoked on a cpu delaying grace period detection.
1420 *
1421 * Induces a quiescent state for the cpu or instructs the remaining
1422 * readers to notify the detector once they finish.
1423 */
1424static void sample_local_cpu(void *arg)
1425{
1426 assert(interrupts_disabled());
1427 assert(!CPU->rcu.is_delaying_gp);
1428
1429 /* Cpu did not pass a quiescent state yet. */
1430 if (CPU->rcu.last_seen_gp != _rcu_cur_gp) {
1431 /* Interrupted a reader in a reader critical section. */
1432 if (0 < CPU->rcu.nesting_cnt) {
1433 assert(!CPU->idle);
1434 /*
1435 * Note to notify the detector from rcu_read_unlock().
1436 *
1437 * ACCESS_ONCE ensures the compiler writes to is_delaying_gp
1438 * only after it determines that we are in a reader CS.
1439 */
1440 ACCESS_ONCE(CPU->rcu.is_delaying_gp) = true;
1441 CPU->rcu.signal_unlock = true;
1442
1443 atomic_inc(&rcu.delaying_cpu_cnt);
1444 } else {
1445 /*
1446 * The cpu did not enter any rcu reader sections since
1447 * the start of the current GP. Record a quiescent state.
1448 *
1449 * Or, we interrupted rcu_read_unlock_impl() right before
1450 * it recorded a QS. Record a QS for it. The memory barrier
1451 * contains the reader section's mem accesses before
1452 * updating last_seen_gp.
1453 *
1454 * Or, we interrupted rcu_read_lock() right after it recorded
1455 * a QS for the previous GP but before it got a chance to
1456 * increment its nesting count. The memory barrier again
1457 * stops the CS code from spilling out of the CS.
1458 */
1459 memory_barrier();
1460 CPU->rcu.last_seen_gp = _rcu_cur_gp;
1461 }
1462 } else {
1463 /*
1464 * This cpu already acknowledged that it had passed through
1465 * a quiescent state since the start of cur_gp.
1466 */
1467 }
1468
1469 /*
1470 * smp_call() makes sure any changes propagate back to the caller.
1471 * In particular, it makes the most current last_seen_gp visible
1472 * to the detector.
1473 */
1474}
1475
1476/** Waits for cpus delaying the current grace period if there are any. */
1477static bool wait_for_delaying_cpus(void)
1478{
1479 int delaying_cpu_cnt = atomic_get(&rcu.delaying_cpu_cnt);
1480
1481 for (int i = 0; i < delaying_cpu_cnt; ++i) {
1482 if (!semaphore_down_interruptable(&rcu.remaining_readers))
1483 return false;
1484 }
1485
1486 /* Update statistic. */
1487 rcu.stat_delayed_cnt += delaying_cpu_cnt;
1488
1489 return true;
1490}
1491
1492/** Called by the scheduler() when switching away from the current thread. */
1493void rcu_after_thread_ran(void)
1494{
1495 assert(interrupts_disabled());
1496
1497 /*
1498 * Prevent NMI handlers from interfering. The detector will be notified
1499 * in this function if CPU->rcu.is_delaying_gp. The current thread is
1500 * no longer running so there is nothing else to signal to the detector.
1501 */
1502 CPU->rcu.signal_unlock = false;
1503 /*
1504 * Separates clearing of .signal_unlock from accesses to
1505 * THREAD->rcu.was_preempted and CPU->rcu.nesting_cnt.
1506 */
1507 compiler_barrier();
1508
1509 /* Save the thread's nesting count when it is not running. */
1510 THREAD->rcu.nesting_cnt = CPU->rcu.nesting_cnt;
1511
1512 /* Preempted a reader critical section for the first time. */
1513 if (0 < THREAD->rcu.nesting_cnt && !THREAD->rcu.was_preempted) {
1514 THREAD->rcu.was_preempted = true;
1515 note_preempted_reader();
1516 }
1517
1518 /*
1519 * The preempted reader has been noted globally. There are therefore
1520 * no readers running on this cpu so this is a quiescent state.
1521 */
1522 _rcu_record_qs();
1523
1524 /*
1525 * Interrupt handlers might use RCU while idle in scheduler().
1526 * The preempted reader has been noted globally, so the handlers
1527 * may now start announcing quiescent states.
1528 */
1529 CPU->rcu.nesting_cnt = 0;
1530
1531 /*
1532 * This cpu is holding up the current GP. Let the detector know
1533 * it has just passed a quiescent state.
1534 *
1535 * The detector waits separately for preempted readers, so we have
1536 * to notify the detector even if we have just preempted a reader.
1537 */
1538 if (CPU->rcu.is_delaying_gp) {
1539 CPU->rcu.is_delaying_gp = false;
1540 semaphore_up(&rcu.remaining_readers);
1541 }
1542
1543 /*
1544 * Forcefully associate the detector with the highest priority
1545 * even if preempted due to its time slice running out.
1546 *
1547 * todo: Replace with strict scheduler priority classes.
1548 */
1549 if (THREAD == rcu.detector_thr) {
1550 THREAD->priority = -1;
1551 }
1552 else if (THREAD == CPU->rcu.reclaimer_thr) {
1553 THREAD->priority = -1;
1554 }
1555
1556 upd_max_cbs_in_slice(CPU->rcu.arriving_cbs_cnt);
1557}
1558
1559/** Called by the scheduler() when switching to a newly scheduled thread. */
1560void rcu_before_thread_runs(void)
1561{
1562 assert(PREEMPTION_DISABLED || interrupts_disabled());
1563 assert(0 == CPU->rcu.nesting_cnt);
1564
1565 /* Load the thread's saved nesting count from before it was preempted. */
1566 CPU->rcu.nesting_cnt = THREAD->rcu.nesting_cnt;
1567
1568 /*
1569 * Ensures NMIs see the proper nesting count before .signal_unlock.
1570 * Otherwise the NMI may incorrectly signal that a preempted reader
1571 * exited its reader section.
1572 */
1573 compiler_barrier();
1574
1575 /*
1576 * In the unlikely event that an NMI occurs between the loading of the
1577 * variables and setting signal_unlock, the NMI handler may invoke
1578 * rcu_read_unlock() and clear signal_unlock. In that case we will
1579 * incorrectly overwrite signal_unlock from false to true. This event
1580 * is benign and the next rcu_read_unlock() will at worst
1581 * needlessly invoke _rcu_signal_read_unlock().
1582 */
1583 CPU->rcu.signal_unlock = THREAD->rcu.was_preempted || CPU->rcu.is_delaying_gp;
1584}
1585
1586/** Called from scheduler() when exiting the current thread.
1587 *
1588 * Preemption or interrupts are disabled and the scheduler() already
1589 * switched away from the current thread, calling rcu_after_thread_ran().
1590 */
1591void rcu_thread_exiting(void)
1592{
1593 assert(THREAD != NULL);
1594 assert(THREAD->state == Exiting);
1595 assert(PREEMPTION_DISABLED || interrupts_disabled());
1596
1597 /*
1598 * The thread forgot to exit its reader critical section.
1599 * It is a bug, but rather than letting the entire system lock up,
1600 * forcefully leave the reader section. The thread is not holding
1601 * any references anyway since it is exiting so it is safe.
1602 */
1603 if (0 < THREAD->rcu.nesting_cnt) {
1604 THREAD->rcu.nesting_cnt = 1;
1605 read_unlock_impl(&THREAD->rcu.nesting_cnt);
1606
1607 printf("Bug: thread (id %" PRIu64 " \"%s\") exited while in RCU read"
1608 " section.\n", THREAD->tid, THREAD->name);
1609 }
1610}
1611
1612
1613#endif /* RCU_PREEMPT_PODZIMEK */
1614
1615/** Announces the start of a new grace period for preexisting readers to ack. */
1616static void start_new_gp(void)
1617{
1618 assert(spinlock_locked(&rcu.gp_lock));
1619
1620 irq_spinlock_lock(&rcu.preempt_lock, true);
1621
1622 /* Start a new GP. Announce to readers that a quiescent state is needed. */
1623 ++_rcu_cur_gp;
1624
1625 /*
1626 * Readers preempted before the start of this GP (next_preempted)
1627 * are preexisting readers now that a GP started and will hold up
1628 * the current GP until they exit their reader sections.
1629 *
1630 * Preempted readers from the previous GP have finished so
1631 * cur_preempted is empty, but see comment in _rcu_record_qs().
1632 */
1633 list_concat(&rcu.cur_preempted, &rcu.next_preempted);
1634
1635 irq_spinlock_unlock(&rcu.preempt_lock, true);
1636}
1637
1638/** Remove those cpus from the mask that have already passed a quiescent
1639 * state since the start of the current grace period.
1640 */
1641static void rm_quiescent_cpus(cpu_mask_t *cpu_mask)
1642{
1643 /*
1644 * Ensure the announcement of the start of a new GP (ie up-to-date
1645 * cur_gp) propagates to cpus that are just coming out of idle
1646 * mode before we sample their idle state flag.
1647 *
1648 * Cpus guarantee that after they set CPU->idle = true they will not
1649 * execute any RCU reader sections without first setting idle to
1650 * false and issuing a memory barrier. Therefore, if rm_quiescent_cpus()
1651 * later on sees an idle cpu, but the cpu is just exiting its idle mode,
1652 * the cpu must not have yet executed its memory barrier (otherwise
1653 * it would pair up with this mem barrier and we would see idle == false).
1654 * That memory barrier will pair up with the one below and ensure
1655 * that a reader on the now-non-idle cpu will see the most current
1656 * cur_gp. As a result, such a reader will never attempt to semaphore_up(
1657 * pending_readers) during this GP, which allows the detector to
1658 * ignore that cpu (the detector thinks it is idle). Moreover, any
1659 * changes made by RCU updaters will have propagated to readers
1660 * on the previously idle cpu -- again thanks to issuing a memory
1661 * barrier after returning from idle mode.
1662 *
1663 * idle -> non-idle cpu | detector | reclaimer
1664 * ------------------------------------------------------
1665 * rcu reader 1 | | rcu_call()
1666 * MB X | |
1667 * idle = true | | rcu_call()
1668 * (no rcu readers allowed ) | | MB A in advance_cbs()
1669 * MB Y | (...) | (...)
1670 * (no rcu readers allowed) | | MB B in advance_cbs()
1671 * idle = false | ++cur_gp |
1672 * (no rcu readers allowed) | MB C |
1673 * MB Z | signal gp_end |
1674 * rcu reader 2 | | exec_cur_cbs()
1675 *
1676 *
1677 * MB Y orders visibility of changes to idle for detector's sake.
1678 *
1679 * MB Z pairs up with MB C. The cpu making a transition from idle
1680 * will see the most current value of cur_gp and will not attempt
1681 * to notify the detector even if preempted during this GP.
1682 *
1683 * MB Z pairs up with MB A from the previous batch. Updaters' changes
1684 * are visible to reader 2 even when the detector thinks the cpu is idle
1685 * but it is not anymore.
1686 *
1687 * MB X pairs up with MB B. Late mem accesses of reader 1 are contained
1688 * and visible before idling and before any callbacks are executed
1689 * by reclaimers.
1690 *
1691 * In summary, the detector does not know of or wait for reader 2, but
1692 * it does not have to since it is a new reader that will not access
1693 * data from previous GPs and will see any changes.
1694 */
1695 memory_barrier(); /* MB C */
1696
1697 cpu_mask_for_each(*cpu_mask, cpu_id) {
1698 /*
1699 * The cpu already checked for and passed through a quiescent
1700 * state since the beginning of this GP.
1701 *
1702 * _rcu_cur_gp is modified by local detector thread only.
1703 * Therefore, it is up-to-date even without a lock.
1704 *
1705 * cpu.last_seen_gp may not be up-to-date. At worst, we will
1706 * unnecessarily sample its last_seen_gp with a smp_call.
1707 */
1708 bool cpu_acked_gp = (cpus[cpu_id].rcu.last_seen_gp == _rcu_cur_gp);
1709
1710 /*
1711 * Either the cpu is idle or it is exiting away from idle mode
1712 * and already sees the most current _rcu_cur_gp. See comment
1713 * in wait_for_readers().
1714 */
1715 bool cpu_idle = cpus[cpu_id].idle;
1716
1717 if (cpu_acked_gp || cpu_idle) {
1718 cpu_mask_reset(cpu_mask, cpu_id);
1719 }
1720 }
1721}
1722
1723/** Serially invokes sample_local_cpu(arg) on each cpu of reader_cpus. */
1724static void sample_cpus(cpu_mask_t *reader_cpus, void *arg)
1725{
1726 cpu_mask_for_each(*reader_cpus, cpu_id) {
1727 smp_call(cpu_id, sample_local_cpu, arg);
1728
1729 /* Update statistic. */
1730 if (CPU->id != cpu_id)
1731 ++rcu.stat_smp_call_cnt;
1732 }
1733}
1734
1735static void upd_missed_gp_in_wait(rcu_gp_t completed_gp)
1736{
1737 assert(CPU->rcu.cur_cbs_gp <= completed_gp);
1738
1739 size_t delta = (size_t)(completed_gp - CPU->rcu.cur_cbs_gp);
1740 CPU->rcu.stat_missed_gp_in_wait += delta;
1741}
1742
1743/** Globally note that the current thread was preempted in a reader section. */
1744static void note_preempted_reader(void)
1745{
1746 irq_spinlock_lock(&rcu.preempt_lock, false);
1747
1748 if (CPU->rcu.last_seen_gp != _rcu_cur_gp) {
1749 /* The reader started before the GP started - we must wait for it.*/
1750 list_append(&THREAD->rcu.preempt_link, &rcu.cur_preempted);
1751 } else {
1752 /*
1753 * The reader started after the GP started and this cpu
1754 * already noted a quiescent state. We might block the next GP.
1755 */
1756 list_append(&THREAD->rcu.preempt_link, &rcu.next_preempted);
1757 }
1758
1759 irq_spinlock_unlock(&rcu.preempt_lock, false);
1760}
1761
1762/** Remove the current thread from the global list of preempted readers. */
1763static void rm_preempted_reader(void)
1764{
1765 irq_spinlock_lock(&rcu.preempt_lock, true);
1766
1767 assert(link_used(&THREAD->rcu.preempt_link));
1768
1769 bool prev_empty = list_empty(&rcu.cur_preempted);
1770 list_remove(&THREAD->rcu.preempt_link);
1771 bool now_empty = list_empty(&rcu.cur_preempted);
1772
1773 /* This was the last reader in cur_preempted. */
1774 bool last_removed = now_empty && !prev_empty;
1775
1776 /*
1777 * Preempted readers are blocking the detector and
1778 * this was the last reader blocking the current GP.
1779 */
1780 if (last_removed && rcu.preempt_blocking_det) {
1781 rcu.preempt_blocking_det = false;
1782 semaphore_up(&rcu.remaining_readers);
1783 }
1784
1785 irq_spinlock_unlock(&rcu.preempt_lock, true);
1786}
1787
1788/** Waits for any preempted readers blocking this grace period to finish.*/
1789static bool wait_for_preempt_reader(void)
1790{
1791 irq_spinlock_lock(&rcu.preempt_lock, true);
1792
1793 bool reader_exists = !list_empty(&rcu.cur_preempted);
1794 rcu.preempt_blocking_det = reader_exists;
1795
1796 irq_spinlock_unlock(&rcu.preempt_lock, true);
1797
1798 if (reader_exists) {
1799 /* Update statistic. */
1800 ++rcu.stat_preempt_blocking_cnt;
1801
1802 return semaphore_down_interruptable(&rcu.remaining_readers);
1803 }
1804
1805 return true;
1806}
1807
1808static void upd_max_cbs_in_slice(size_t arriving_cbs_cnt)
1809{
1810 rcu_cpu_data_t *cr = &CPU->rcu;
1811
1812 if (arriving_cbs_cnt > cr->last_arriving_cnt) {
1813 size_t arrived_cnt = arriving_cbs_cnt - cr->last_arriving_cnt;
1814 cr->stat_max_slice_cbs = max(arrived_cnt, cr->stat_max_slice_cbs);
1815 }
1816
1817 cr->last_arriving_cnt = arriving_cbs_cnt;
1818}
1819
1820/** Prints RCU run-time statistics. */
1821void rcu_print_stat(void)
1822{
1823 /*
1824 * Don't take locks. Worst case is we get out-dated values.
1825 * CPU local values are updated without any locks, so there
1826 * are no locks to lock in order to get up-to-date values.
1827 */
1828
1829#ifdef RCU_PREEMPT_PODZIMEK
1830 const char *algo = "podzimek-preempt-rcu";
1831#elif defined(RCU_PREEMPT_A)
1832 const char *algo = "a-preempt-rcu";
1833#endif
1834
1835 printf("Config: expedite_threshold=%d, critical_threshold=%d,"
1836 " detect_sleep=%dms, %s\n",
1837 EXPEDITE_THRESHOLD, CRITICAL_THRESHOLD, DETECT_SLEEP_MS, algo);
1838 printf("Completed GPs: %" PRIu64 "\n", rcu.completed_gp);
1839 printf("Expedited GPs: %zu\n", rcu.stat_expedited_cnt);
1840 printf("Delayed GPs: %zu (cpus w/ still running readers after gp sleep)\n",
1841 rcu.stat_delayed_cnt);
1842 printf("Preempt blocked GPs: %zu (waited for preempted readers; "
1843 "running or not)\n", rcu.stat_preempt_blocking_cnt);
1844 printf("Smp calls: %zu\n", rcu.stat_smp_call_cnt);
1845
1846 printf("Max arrived callbacks per GP and CPU:\n");
1847 for (unsigned int i = 0; i < config.cpu_count; ++i) {
1848 printf(" %zu", cpus[i].rcu.stat_max_cbs);
1849 }
1850
1851 printf("\nAvg arrived callbacks per GP and CPU (nonempty batches only):\n");
1852 for (unsigned int i = 0; i < config.cpu_count; ++i) {
1853 printf(" %zu", cpus[i].rcu.stat_avg_cbs);
1854 }
1855
1856 printf("\nMax arrived callbacks per time slice and CPU:\n");
1857 for (unsigned int i = 0; i < config.cpu_count; ++i) {
1858 printf(" %zu", cpus[i].rcu.stat_max_slice_cbs);
1859 }
1860
1861 printf("\nMissed GP notifications per CPU:\n");
1862 for (unsigned int i = 0; i < config.cpu_count; ++i) {
1863 printf(" %zu", cpus[i].rcu.stat_missed_gps);
1864 }
1865
1866 printf("\nMissed GP notifications per CPU while waking up:\n");
1867 for (unsigned int i = 0; i < config.cpu_count; ++i) {
1868 printf(" %zu", cpus[i].rcu.stat_missed_gp_in_wait);
1869 }
1870 printf("\n");
1871}
1872
1873/** @}
1874 */