Index: kernel/generic/src/synch/condvar.c
===================================================================
--- kernel/generic/src/synch/condvar.c	(revision feeac0d8cd6e0ce736779bc5f1dc701c5cf42ba6)
+++ kernel/generic/src/synch/condvar.c	(revision 5e3fa9dbfffc978cdcfdaaea9fc6a50dbb0bca1a)
@@ -38,4 +38,5 @@
 #include <synch/condvar.h>
 #include <synch/mutex.h>
+#include <synch/spinlock.h>
 #include <synch/waitq.h>
 #include <arch.h>
@@ -90,4 +91,5 @@
 
 	ipl = waitq_sleep_prepare(&cv->wq);
+	/* Unlock only after the waitq is locked so we don't miss a wakeup. */
 	mutex_unlock(mtx);
 
@@ -95,10 +97,93 @@
 	rc = waitq_sleep_timeout_unsafe(&cv->wq, usec, flags);
 
+	waitq_sleep_finish(&cv->wq, rc, ipl);
+	/* Lock only after releasing the waitq to avoid a possible deadlock. */
 	mutex_lock(mtx);
-	waitq_sleep_finish(&cv->wq, rc, ipl);
 
 	return rc;
 }
 
+/** Wait for the condition to become true with a locked spinlock.
+ * 
+ * The function is not aware of irq_spinlock. Therefore do not even
+ * try passing irq_spinlock_t to it. Use _condvar_wait_timeout_irq_spinlock()
+ * instead.
+ *
+ * @param cv		Condition variable.
+ * @param lock		Locked spinlock.
+ * @param usec		Timeout value in microseconds.
+ * @param flags		Select mode of operation.
+ *
+ * For exact description of meaning of possible combinations of usec and flags,
+ * see comment for waitq_sleep_timeout().  Note that when
+ * SYNCH_FLAGS_NON_BLOCKING is specified here, ESYNCH_WOULD_BLOCK is always
+ * returned.
+ *
+ * @return See comment for waitq_sleep_timeout().
+ */
+int _condvar_wait_timeout_spinlock_impl(condvar_t *cv, spinlock_t *lock, 
+	uint32_t usec, int flags)
+{
+	int rc;
+	ipl_t ipl;
+	
+	ipl = waitq_sleep_prepare(&cv->wq);
+
+	/* Unlock only after the waitq is locked so we don't miss a wakeup. */
+	spinlock_unlock(lock);
+
+	cv->wq.missed_wakeups = 0;	/* Enforce blocking. */
+	rc = waitq_sleep_timeout_unsafe(&cv->wq, usec, flags);
+
+	waitq_sleep_finish(&cv->wq, rc, ipl);
+	/* Lock only after releasing the waitq to avoid a possible deadlock. */
+	spinlock_lock(lock);
+	
+	return rc;
+}
+
+/** Wait for the condition to become true with a locked irq spinlock.
+ * 
+ * @param cv		Condition variable.
+ * @param lock		Locked irq spinlock.
+ * @param usec		Timeout value in microseconds.
+ * @param flags		Select mode of operation.
+ *
+ * For exact description of meaning of possible combinations of usec and flags,
+ * see comment for waitq_sleep_timeout().  Note that when
+ * SYNCH_FLAGS_NON_BLOCKING is specified here, ESYNCH_WOULD_BLOCK is always
+ * returned.
+ *
+ * @return See comment for waitq_sleep_timeout().
+ */
+int _condvar_wait_timeout_irq_spinlock(condvar_t *cv, irq_spinlock_t *irq_lock, 
+	uint32_t usec, int flags)
+{
+	int rc;
+	/* Save spinlock's state so we can restore it correctly later on. */
+	ipl_t ipl = irq_lock->ipl;
+	bool guard = irq_lock->guard;
+	
+	irq_lock->guard = false;
+	
+	/* 
+	 * waitq_prepare() restores interrupts to the current state, 
+	 * ie disabled. Therefore, interrupts will remain disabled while 
+	 * it spins waiting for a pending timeout handler to complete. 
+	 * Although it spins with interrupts disabled there can only
+	 * be a pending timeout if we failed to cancel an imminent
+	 * timeout (on another cpu) during a wakeup. As a result the 
+	 * timeout handler is guaranteed to run (it is most likely already 
+	 * running) and there is no danger of a deadlock.
+	 */
+	rc = _condvar_wait_timeout_spinlock(cv, &irq_lock->lock, usec, flags);
+	
+	irq_lock->guard = guard;
+	irq_lock->ipl = ipl;
+	
+	return rc;
+}
+
+
 /** @}
  */
Index: kernel/generic/src/synch/futex.c
===================================================================
--- kernel/generic/src/synch/futex.c	(revision feeac0d8cd6e0ce736779bc5f1dc701c5cf42ba6)
+++ kernel/generic/src/synch/futex.c	(revision 5e3fa9dbfffc978cdcfdaaea9fc6a50dbb0bca1a)
@@ -1,4 +1,5 @@
 /*
  * Copyright (c) 2006 Jakub Jermar
+ * Copyright (c) 2012 Adam Hraska
  * All rights reserved.
  *
@@ -34,4 +35,28 @@
  * @file
  * @brief	Kernel backend for futexes.
+ * 
+ * Kernel futex objects are stored in a global hash table futex_ht 
+ * where the physical address of the futex variable (futex_t.paddr)
+ * is used as the lookup key. As a result multiple address spaces 
+ * may share the same futex variable. 
+ * 
+ * A kernel futex object is created the first time a task accesses
+ * the futex (having a futex variable at a physical address not 
+ * encountered before). Futex object's lifetime is governed by
+ * a reference count that represents the number of all the different
+ * user space virtual addresses from all tasks that map to the
+ * physical address of the futex variable. A futex object is freed
+ * when the last task having accessed the futex exits.
+ * 
+ * Each task keeps track of the futex objects it accessed in a list
+ * of pointers (futex_ptr_t, task->futex_list) to the different futex 
+ * objects.
+ * 
+ * To speed up translation of futex variables' virtual addresses
+ * to their physical addresses, futex pointers accessed by the
+ * task are furthermore stored in a concurrent hash table (CHT,
+ * task->futexes->ht). A single lookup without locks or accesses
+ * to the page table translates a futex variable's virtual address 
+ * into its futex kernel object. 
  */
 
@@ -39,4 +64,5 @@
 #include <synch/mutex.h>
 #include <synch/spinlock.h>
+#include <synch/rcu.h>
 #include <mm/frame.h>
 #include <mm/page.h>
@@ -46,4 +72,5 @@
 #include <genarch/mm/page_pt.h>
 #include <genarch/mm/page_ht.h>
+#include <adt/cht.h>
 #include <adt/hash_table.h>
 #include <adt/list.h>
@@ -52,26 +79,55 @@
 #include <panic.h>
 #include <errno.h>
-#include <print.h>
 
 #define FUTEX_HT_SIZE	1024	/* keep it a power of 2 */
 
-static void futex_initialize(futex_t *futex);
-
-static futex_t *futex_find(uintptr_t paddr);
+/** Task specific pointer to a global kernel futex object. */
+typedef struct futex_ptr {
+	/** CHT link. */
+	cht_link_t cht_link;
+	/** List of all futex pointers used by the task. */
+	link_t all_link;
+	/** Kernel futex object. */
+	futex_t *futex;
+	/** User space virtual address of the futex variable in the task. */
+	uintptr_t uaddr;
+} futex_ptr_t;
+
+
+static void destroy_task_cache(work_t *work);
+
+static void futex_initialize(futex_t *futex, uintptr_t paddr);
+static void futex_add_ref(futex_t *futex);
+static void futex_release_ref(futex_t *futex);
+static void futex_release_ref_locked(futex_t *futex);
+
+static futex_t *get_futex(uintptr_t uaddr);
+static futex_t *find_cached_futex(uintptr_t uaddr);
+static futex_t *get_and_cache_futex(uintptr_t phys_addr, uintptr_t uaddr);
+static bool find_futex_paddr(uintptr_t uaddr, uintptr_t *phys_addr);
+
 static size_t futex_ht_hash(sysarg_t *key);
 static bool futex_ht_compare(sysarg_t *key, size_t keys, link_t *item);
 static void futex_ht_remove_callback(link_t *item);
 
-/**
- * Mutex protecting global futex hash table.
- * It is also used to serialize access to all futex_t structures.
- * Must be acquired before the task futex B+tree lock.
- */
-static mutex_t futex_ht_lock;
-
-/** Futex hash table. */
+static size_t task_fut_ht_hash(const cht_link_t *link);
+static size_t task_fut_ht_key_hash(void *key);
+static bool task_fut_ht_equal(const cht_link_t *item1, const cht_link_t *item2);
+static bool task_fut_ht_key_equal(void *key, const cht_link_t *item);
+
+
+/** Mutex protecting the global futex hash table.
+ * 
+ * Acquire task specific TASK->futex_list_lock before this mutex.
+ */
+SPINLOCK_STATIC_INITIALIZE_NAME(futex_ht_lock, "futex-ht-lock");
+
+/** Global kernel futex hash table. Lock futex_ht_lock before accessing.
+ * 
+ * Physical address of the futex variable is the lookup key.
+ */
 static hash_table_t futex_ht;
 
-/** Futex hash table operations. */
+/** Global kernel futex hash table operations. */
 static hash_table_operations_t futex_ht_ops = {
 	.hash = futex_ht_hash,
@@ -80,21 +136,251 @@
 };
 
+/** Task futex cache CHT operations. */
+static cht_ops_t task_futex_ht_ops = {
+	.hash = task_fut_ht_hash,
+	.key_hash = task_fut_ht_key_hash,
+	.equal = task_fut_ht_equal,
+	.key_equal = task_fut_ht_key_equal,
+	.remove_callback = NULL
+};
+
 /** Initialize futex subsystem. */
 void futex_init(void)
 {
-	mutex_initialize(&futex_ht_lock, MUTEX_PASSIVE);
 	hash_table_create(&futex_ht, FUTEX_HT_SIZE, 1, &futex_ht_ops);
 }
 
-/** Initialize kernel futex structure.
- *
- * @param futex		Kernel futex structure.
- */
-void futex_initialize(futex_t *futex)
+/** Initializes the futex structures for the new task. */
+void futex_task_init(struct task *task)
+{
+	task->futexes = malloc(sizeof(struct futex_cache), 0);
+	
+	cht_create(&task->futexes->ht, 0, 0, 0, true, &task_futex_ht_ops);
+	
+	list_initialize(&task->futexes->list);
+	spinlock_initialize(&task->futexes->list_lock, "futex-list-lock");
+}
+
+/** Destroys the futex structures for the dying task. */
+void futex_task_deinit(task_t *task)
+{
+	/* Interrupts are disabled so we must not block (cannot run cht_destroy). */
+	if (interrupts_disabled()) {
+		/* Invoke the blocking cht_destroy in the background. */
+		workq_global_enqueue_noblock(&task->futexes->destroy_work, 
+			destroy_task_cache);
+	} else {
+		/* We can block. Invoke cht_destroy in this thread. */
+		destroy_task_cache(&task->futexes->destroy_work);
+	}
+}
+
+/** Deallocates a task's CHT futex cache (must already be empty). */
+static void destroy_task_cache(work_t *work)
+{
+	struct futex_cache *cache = 
+		member_to_inst(work, struct futex_cache, destroy_work);
+	
+	/* 
+	 * Destroy the cache before manually freeing items of the cache in case
+	 * table resize is in progress.
+	 */
+	cht_destroy_unsafe(&cache->ht);
+	
+	/* Manually free futex_ptr cache items. */
+	list_foreach_safe(cache->list, cur_link, next_link) {
+		futex_ptr_t *fut_ptr = member_to_inst(cur_link, futex_ptr_t, all_link);
+
+		list_remove(cur_link);
+		free(fut_ptr);
+	}
+	
+	free(cache);
+}
+
+/** Remove references from futexes known to the current task. */
+void futex_task_cleanup(void)
+{
+	struct futex_cache *futexes = TASK->futexes;
+	
+	/* All threads of this task have terminated. This is the last thread. */
+	spinlock_lock(&futexes->list_lock);
+	
+	list_foreach_safe(futexes->list, cur_link, next_link) {
+		futex_ptr_t *fut_ptr = member_to_inst(cur_link, futex_ptr_t, all_link);
+
+		/*
+		 * The function is free to free the futex. All other threads of this
+		 * task have already terminated, so they have also definitely
+		 * exited their CHT futex cache protecting rcu reader sections.
+		 * Moreover release_ref() only frees the futex if this is the 
+		 * last task referencing the futex. Therefore, only threads
+		 * of this task may have referenced the futex if it is to be freed.
+		 */
+		futex_release_ref_locked(fut_ptr->futex);
+	}
+	
+	spinlock_unlock(&futexes->list_lock);
+}
+
+
+/** Initialize the kernel futex structure.
+ *
+ * @param futex	Kernel futex structure.
+ * @param paddr Physical address of the futex variable.
+ */
+static void futex_initialize(futex_t *futex, uintptr_t paddr)
 {
 	waitq_initialize(&futex->wq);
 	link_initialize(&futex->ht_link);
-	futex->paddr = 0;
+	futex->paddr = paddr;
 	futex->refcount = 1;
+}
+
+/** Increments the counter of tasks referencing the futex. */
+static void futex_add_ref(futex_t *futex)
+{
+	ASSERT(spinlock_locked(&futex_ht_lock));
+	ASSERT(0 < futex->refcount);
+	++futex->refcount;
+}
+
+/** Decrements the counter of tasks referencing the futex. May free the futex.*/
+static void futex_release_ref(futex_t *futex)
+{
+	ASSERT(spinlock_locked(&futex_ht_lock));
+	ASSERT(0 < futex->refcount);
+	
+	--futex->refcount;
+	
+	if (0 == futex->refcount) {
+		hash_table_remove(&futex_ht, &futex->paddr, 1);
+	}
+}
+
+/** Decrements the counter of tasks referencing the futex. May free the futex.*/
+static void futex_release_ref_locked(futex_t *futex)
+{
+	spinlock_lock(&futex_ht_lock);
+	futex_release_ref(futex);
+	spinlock_unlock(&futex_ht_lock);
+}
+
+/** Returns a futex for the virtual address @a uaddr (or creates one). */
+static futex_t *get_futex(uintptr_t uaddr)
+{
+	futex_t *futex = find_cached_futex(uaddr);
+	
+	if (futex) 
+		return futex;
+
+	uintptr_t paddr;
+
+	if (!find_futex_paddr(uaddr, &paddr)) {
+		return 0;
+	}
+
+	return get_and_cache_futex(paddr, uaddr);
+}
+
+
+/** Finds the physical address of the futex variable. */
+static bool find_futex_paddr(uintptr_t uaddr, uintptr_t *paddr)
+{
+	spinlock_lock(&futex_ht_lock);
+	page_table_lock(AS, false);
+
+	bool found = false;
+	pte_t *t = page_mapping_find(AS, ALIGN_DOWN(uaddr, PAGE_SIZE), true);
+	
+	if (t && PTE_VALID(t) && PTE_PRESENT(t)) {
+		found = true;
+		*paddr = PTE_GET_FRAME(t) + (uaddr - ALIGN_DOWN(uaddr, PAGE_SIZE));
+	}
+	
+	page_table_unlock(AS, false);
+	spinlock_unlock(&futex_ht_lock);
+	
+	return found;
+}
+
+/** Returns the futex cached in this task with the virtual address uaddr. */
+static futex_t *find_cached_futex(uintptr_t uaddr)
+{
+	cht_read_lock();
+	
+	futex_t *futex;
+	cht_link_t *futex_ptr_link = cht_find_lazy(&TASK->futexes->ht, &uaddr);
+
+	if (futex_ptr_link) {
+		futex_ptr_t *futex_ptr 
+			= member_to_inst(futex_ptr_link, futex_ptr_t, cht_link);
+		
+		futex = futex_ptr->futex;
+	} else {
+		futex = NULL;
+	}
+	
+	cht_read_unlock();
+	
+	return futex;
+}
+
+
+/** 
+ * Returns a kernel futex for the physical address @a phys_addr and caches 
+ * it in this task under the virtual address @a uaddr (if not already cached).
+ */
+static futex_t *get_and_cache_futex(uintptr_t phys_addr, uintptr_t uaddr)
+{
+	futex_t *futex = malloc(sizeof(futex_t), 0);
+	
+	/* 
+	 * Find the futex object in the global futex table (or insert it 
+	 * if it is not present).
+	 */
+	spinlock_lock(&futex_ht_lock);
+	
+	link_t *fut_link = hash_table_find(&futex_ht, &phys_addr);
+	
+	if (fut_link) {
+		free(futex);
+		futex = member_to_inst(fut_link, futex_t, ht_link);
+		futex_add_ref(futex);
+	} else {
+		futex_initialize(futex, phys_addr);
+		hash_table_insert(&futex_ht, &phys_addr, &futex->ht_link);
+	}
+	
+	spinlock_unlock(&futex_ht_lock);
+	
+	/* 
+	 * Cache the link to the futex object for this task. 
+	 */
+	futex_ptr_t *fut_ptr = malloc(sizeof(futex_ptr_t), 0);
+	cht_link_t *dup_link;
+	
+	fut_ptr->futex = futex;
+	fut_ptr->uaddr = uaddr;
+	
+	cht_read_lock();
+	
+	/* Cache the mapping from the virtual address to the futex for this task. */
+	if (cht_insert_unique(&TASK->futexes->ht, &fut_ptr->cht_link, &dup_link)) {
+		spinlock_lock(&TASK->futexes->list_lock);
+		list_append(&fut_ptr->all_link, &TASK->futexes->list);
+		spinlock_unlock(&TASK->futexes->list_lock);
+	} else {
+		/* Another thread of this task beat us to it. Use that mapping instead.*/
+		free(fut_ptr);
+		futex_release_ref_locked(futex);
+		
+		futex_ptr_t *dup = member_to_inst(dup_link, futex_ptr_t, cht_link);
+		futex = dup->futex;		
+	}
+
+	cht_read_unlock();
+	
+	return futex;
 }
 
@@ -109,30 +395,11 @@
 sysarg_t sys_futex_sleep(uintptr_t uaddr)
 {
-	futex_t *futex;
-	uintptr_t paddr;
-	pte_t *t;
-	int rc;
-	
-	/*
-	 * Find physical address of futex counter.
-	 */
-	page_table_lock(AS, true);
-	t = page_mapping_find(AS, ALIGN_DOWN(uaddr, PAGE_SIZE), false);
-	if (!t || !PTE_VALID(t) || !PTE_PRESENT(t)) {
-		page_table_unlock(AS, true);
+	futex_t *futex = get_futex(uaddr);
+	
+	if (!futex) 
 		return (sysarg_t) ENOENT;
-	}
-	paddr = PTE_GET_FRAME(t) + (uaddr - ALIGN_DOWN(uaddr, PAGE_SIZE));
-	page_table_unlock(AS, true);
-	
-	futex = futex_find(paddr);
-
-#ifdef CONFIG_UDEBUG
-	udebug_stoppable_begin();
-#endif
-	rc = waitq_sleep_timeout(&futex->wq, 0, SYNCH_FLAGS_INTERRUPTIBLE); 
-#ifdef CONFIG_UDEBUG
-	udebug_stoppable_end();
-#endif
+
+	int rc = waitq_sleep_timeout(&futex->wq, 0, SYNCH_FLAGS_INTERRUPTIBLE); 
+
 	return (sysarg_t) rc;
 }
@@ -146,84 +413,14 @@
 sysarg_t sys_futex_wakeup(uintptr_t uaddr)
 {
-	futex_t *futex;
-	uintptr_t paddr;
-	pte_t *t;
-	
-	/*
-	 * Find physical address of futex counter.
-	 */
-	page_table_lock(AS, true);
-	t = page_mapping_find(AS, ALIGN_DOWN(uaddr, PAGE_SIZE), false);
-	if (!t || !PTE_VALID(t) || !PTE_PRESENT(t)) {
-		page_table_unlock(AS, true);
+	futex_t *futex = get_futex(uaddr);
+	
+	if (futex) {
+		waitq_wakeup(&futex->wq, WAKEUP_FIRST);
+		return 0;
+	} else {
 		return (sysarg_t) ENOENT;
 	}
-	paddr = PTE_GET_FRAME(t) + (uaddr - ALIGN_DOWN(uaddr, PAGE_SIZE));
-	page_table_unlock(AS, true);
-	
-	futex = futex_find(paddr);
-		
-	waitq_wakeup(&futex->wq, WAKEUP_FIRST);
-	
-	return 0;
-}
-
-/** Find kernel address of the futex structure corresponding to paddr.
- *
- * If the structure does not exist already, a new one is created.
- *
- * @param paddr		Physical address of the userspace futex counter.
- *
- * @return		Address of the kernel futex structure.
- */
-futex_t *futex_find(uintptr_t paddr)
-{
-	link_t *item;
-	futex_t *futex;
-	btree_node_t *leaf;
-	
-	/*
-	 * Find the respective futex structure
-	 * or allocate new one if it does not exist already.
-	 */
-	mutex_lock(&futex_ht_lock);
-	item = hash_table_find(&futex_ht, &paddr);
-	if (item) {
-		futex = hash_table_get_instance(item, futex_t, ht_link);
-
-		/*
-		 * See if the current task knows this futex.
-		 */
-		mutex_lock(&TASK->futexes_lock);
-		if (!btree_search(&TASK->futexes, paddr, &leaf)) {
-			/*
-			 * The futex is new to the current task.
-			 * Upgrade its reference count and put it to the
-			 * current task's B+tree of known futexes.
-			 */
-			futex->refcount++;
-			btree_insert(&TASK->futexes, paddr, futex, leaf);
-		}
-		mutex_unlock(&TASK->futexes_lock);
-	} else {
-		futex = (futex_t *) malloc(sizeof(futex_t), 0);
-		futex_initialize(futex);
-		futex->paddr = paddr;
-		hash_table_insert(&futex_ht, &paddr, &futex->ht_link);
-			
-		/*
-		 * This is the first task referencing the futex.
-		 * It can be directly inserted into its
-		 * B+tree of known futexes.
-		 */
-		mutex_lock(&TASK->futexes_lock);
-		btree_insert(&TASK->futexes, paddr, futex, NULL);
-		mutex_unlock(&TASK->futexes_lock);
-		
-	}
-	mutex_unlock(&futex_ht_lock);
-	
-	return futex;
-}
+}
+
 
 /** Compute hash index into futex hash table.
@@ -268,25 +465,34 @@
 }
 
-/** Remove references from futexes known to the current task. */
-void futex_cleanup(void)
-{
-	mutex_lock(&futex_ht_lock);
-	mutex_lock(&TASK->futexes_lock);
-
-	list_foreach(TASK->futexes.leaf_list, leaf_link, btree_node_t, node) {
-		unsigned int i;
-		
-		for (i = 0; i < node->keys; i++) {
-			futex_t *ftx;
-			uintptr_t paddr = node->key[i];
-			
-			ftx = (futex_t *) node->value[i];
-			if (--ftx->refcount == 0)
-				hash_table_remove(&futex_ht, &paddr, 1);
-		}
-	}
-	
-	mutex_unlock(&TASK->futexes_lock);
-	mutex_unlock(&futex_ht_lock);
+/*
+ * Operations of a task's CHT that caches mappings of futex user space 
+ * virtual addresses to kernel futex objects.
+ */
+
+static size_t task_fut_ht_hash(const cht_link_t *link)
+{
+	const futex_ptr_t *fut_ptr = member_to_inst(link, futex_ptr_t, cht_link);
+	return fut_ptr->uaddr;
+}
+
+static size_t task_fut_ht_key_hash(void *key)
+{
+	return *(uintptr_t*)key;
+}
+
+static bool task_fut_ht_equal(const cht_link_t *item1, const cht_link_t *item2)
+{
+	const futex_ptr_t *fut_ptr1 = member_to_inst(item1, futex_ptr_t, cht_link);
+	const futex_ptr_t *fut_ptr2 = member_to_inst(item2, futex_ptr_t, cht_link);
+	
+	return fut_ptr1->uaddr == fut_ptr2->uaddr;
+}
+
+static bool task_fut_ht_key_equal(void *key, const cht_link_t *item)
+{
+	const futex_ptr_t *fut_ptr = member_to_inst(item, futex_ptr_t, cht_link);
+	uintptr_t uaddr = *(uintptr_t*)key;
+	
+	return fut_ptr->uaddr == uaddr;
 }
 
Index: kernel/generic/src/synch/mutex.c
===================================================================
--- kernel/generic/src/synch/mutex.c	(revision feeac0d8cd6e0ce736779bc5f1dc701c5cf42ba6)
+++ kernel/generic/src/synch/mutex.c	(revision 5e3fa9dbfffc978cdcfdaaea9fc6a50dbb0bca1a)
@@ -41,4 +41,6 @@
 #include <arch.h>
 #include <stacktrace.h>
+#include <cpu.h>
+#include <proc/thread.h>
 
 /** Initialize mutex.
Index: kernel/generic/src/synch/rcu.c
===================================================================
--- kernel/generic/src/synch/rcu.c	(revision 5e3fa9dbfffc978cdcfdaaea9fc6a50dbb0bca1a)
+++ kernel/generic/src/synch/rcu.c	(revision 5e3fa9dbfffc978cdcfdaaea9fc6a50dbb0bca1a)
@@ -0,0 +1,1873 @@
+/*
+ * Copyright (c) 2012 Adam Hraska
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright
+ *   notice, this list of conditions and the following disclaimer in the
+ *   documentation and/or other materials provided with the distribution.
+ * - The name of the author may not be used to endorse or promote products
+ *   derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+ 
+ 
+/** @addtogroup sync
+ * @{
+ */
+
+/**
+ * @file
+ * @brief Preemptible read-copy update. Usable from interrupt handlers.
+ * 
+ * @par Podzimek-preempt-RCU (RCU_PREEMPT_PODZIMEK)
+ * 
+ * Podzimek-preempt-RCU is a preemptible variant of Podzimek's non-preemptible
+ * RCU algorithm [1, 2]. Grace period (GP) detection is centralized into a
+ * single detector thread. The detector requests that each cpu announces
+ * that it passed a quiescent state (QS), ie a state when the cpu is
+ * outside of an rcu reader section (CS). Cpus check for QSs during context
+ * switches and when entering and exiting rcu reader sections. Once all 
+ * cpus announce a QS and if there were no threads preempted in a CS, the 
+ * GP ends.
+ * 
+ * The detector increments the global GP counter, _rcu_cur_gp, in order 
+ * to start a new GP. Readers notice the new GP by comparing the changed 
+ * _rcu_cur_gp to a locally stored value last_seen_gp which denotes the
+ * the last GP number for which the cpu noted an explicit QS (and issued
+ * a memory barrier). Readers check for the change in the outer-most
+ * (ie not nested) rcu_read_lock()/unlock() as these functions represent 
+ * a QS. The reader first executes a memory barrier (MB) in order to contain 
+ * memory references within a CS (and to make changes made by writers 
+ * visible in the CS following rcu_read_lock()). Next, the reader notes 
+ * that it reached a QS by updating the cpu local last_seen_gp to the
+ * global GP counter, _rcu_cur_gp. Cache coherency eventually makes
+ * the updated last_seen_gp visible to the detector cpu, much like it
+ * delivered the changed _rcu_cur_gp to all cpus.
+ * 
+ * The detector waits a while after starting a GP and then reads each 
+ * cpu's last_seen_gp to see if it reached a QS. If a cpu did not record 
+ * a QS (might be a long running thread without an RCU reader CS; or cache
+ * coherency has yet to make the most current last_seen_gp visible to
+ * the detector; or the cpu is still in a CS) the cpu is interrupted
+ * via an IPI. If the IPI handler finds the cpu still in a CS, it instructs
+ * the cpu to notify the detector that it had exited the CS via a semaphore
+ * (CPU->rcu.is_delaying_gp). 
+ * The detector then waits on the semaphore for any cpus to exit their
+ * CSs. Lastly, it waits for the last reader preempted in a CS to 
+ * exit its CS if there were any and signals the end of the GP to
+ * separate reclaimer threads wired to each cpu. Reclaimers then
+ * execute the callbacks queued on each of the cpus.
+ * 
+ * 
+ * @par A-RCU algorithm (RCU_PREEMPT_A)
+ * 
+ * A-RCU is based on the user space rcu algorithm in [3] utilizing signals
+ * (urcu) and Podzimek's rcu [1]. Like in Podzimek's rcu, callbacks are 
+ * executed by cpu-bound reclaimer threads. There is however no dedicated 
+ * detector thread and the reclaimers take on the responsibilities of the 
+ * detector when they need to start a new GP. A new GP is again announced 
+ * and acknowledged with _rcu_cur_gp and the cpu local last_seen_gp. Unlike
+ * Podzimek's rcu, cpus check explicitly for QS only during context switches. 
+ * Like in urcu, rcu_read_lock()/unlock() only maintain the nesting count
+ * and never issue any memory barriers. This makes rcu_read_lock()/unlock()
+ * simple and fast.
+ * 
+ * If a new callback is queued for a reclaimer and no GP is in progress,
+ * the reclaimer takes on the role of a detector. The detector increments 
+ * _rcu_cur_gp in order to start a new GP. It waits a while to give cpus 
+ * a chance to switch a context (a natural QS). Then, it examines each
+ * non-idle cpu that has yet to pass a QS via an IPI. The IPI handler
+ * sees the most current _rcu_cur_gp and last_seen_gp and notes a QS
+ * with a memory barrier and an update to last_seen_gp. If the handler
+ * finds the cpu in a CS it does nothing and let the detector poll/interrupt
+ * the cpu again after a short sleep.
+ * 
+ * @par Caveats
+ * 
+ * last_seen_gp and _rcu_cur_gp are always 64bit variables and they
+ * are read non-atomically on 32bit machines. Reading a clobbered
+ * value of last_seen_gp or _rcu_cur_gp or writing a clobbered value
+ * of _rcu_cur_gp to last_seen_gp will at worst force the detector
+ * to unnecessarily interrupt a cpu. Interrupting a cpu makes the 
+ * correct value of _rcu_cur_gp visible to the cpu and correctly
+ * resets last_seen_gp in both algorithms.
+ * 
+ * 
+ * 
+ * [1] Read-copy-update for opensolaris,
+ *     2010, Podzimek
+ *     https://andrej.podzimek.org/thesis.pdf
+ * 
+ * [2] (podzimek-rcu) implementation file "rcu.patch"
+ *     http://d3s.mff.cuni.cz/projects/operating_systems/rcu/rcu.patch
+ * 
+ * [3] User-level implementations of read-copy update,
+ *     2012, appendix
+ *     http://www.rdrop.com/users/paulmck/RCU/urcu-supp-accepted.2011.08.30a.pdf
+ * 
+ */
+ 
+#include <synch/rcu.h>
+#include <synch/condvar.h>
+#include <synch/semaphore.h>
+#include <synch/spinlock.h>
+#include <synch/mutex.h>
+#include <proc/thread.h>
+#include <cpu/cpu_mask.h>
+#include <cpu.h>
+#include <smp/smp_call.h>
+#include <compiler/barrier.h>
+#include <atomic.h>
+#include <arch.h>
+#include <macros.h>
+
+/* 
+ * Number of milliseconds to give to preexisting readers to finish 
+ * when non-expedited grace period detection is in progress.
+ */
+#define DETECT_SLEEP_MS    10
+/* 
+ * Max number of pending callbacks in the local cpu's queue before 
+ * aggressively expediting the current grace period
+ */
+#define EXPEDITE_THRESHOLD 2000
+/*
+ * Max number of callbacks to execute in one go with preemption
+ * enabled. If there are more callbacks to be executed they will
+ * be run with preemption disabled in order to prolong reclaimer's
+ * time slice and give it a chance to catch up with callback producers.
+ */
+#define CRITICAL_THRESHOLD 30000
+/* Half the number of values a uint32 can hold. */
+#define UINT32_MAX_HALF    2147483648U
+
+/** 
+ * The current grace period number. Increases monotonically. 
+ * Lock rcu.gp_lock or rcu.preempt_lock to get a current value.
+ */
+rcu_gp_t _rcu_cur_gp;
+
+/** Global RCU data. */
+typedef struct rcu_data {
+	/** Detector uses so signal reclaimers that a grace period ended. */
+	condvar_t gp_ended;
+	/** Reclaimers use to notify the detector to accelerate GP detection. */
+	condvar_t expedite_now;
+	/** 
+	 * Protects: req_gp_end_cnt, req_expedited_cnt, completed_gp, _rcu_cur_gp;
+	 * or: completed_gp, _rcu_cur_gp
+	 */
+	SPINLOCK_DECLARE(gp_lock);
+	/**
+	 * The number of the most recently completed grace period. At most 
+	 * one behind _rcu_cur_gp. If equal to _rcu_cur_gp, a grace period 
+	 * detection is not in progress and the detector is idle.
+	 */
+	rcu_gp_t completed_gp;
+	
+	/** Protects the following 3 fields. */
+	IRQ_SPINLOCK_DECLARE(preempt_lock);
+	/** Preexisting readers that have been preempted. */
+	list_t cur_preempted;
+	/** Reader that have been preempted and might delay the next grace period.*/
+	list_t next_preempted;
+	/** 
+	 * The detector is waiting for the last preempted reader 
+	 * in cur_preempted to announce that it exited its reader 
+	 * section by up()ing remaining_readers.
+	 */
+	bool preempt_blocking_det;
+	
+#ifdef RCU_PREEMPT_A
+	
+	/** 
+	 * The detector waits on this semaphore for any preempted readers 
+	 * delaying the grace period once all cpus pass a quiescent state.
+	 */
+	semaphore_t remaining_readers;
+
+#elif defined(RCU_PREEMPT_PODZIMEK)
+	
+	/** Reclaimers notify the detector when they request more grace periods.*/
+	condvar_t req_gp_changed;
+	/** Number of grace period ends the detector was requested to announce. */
+	size_t req_gp_end_cnt;
+	/** Number of consecutive grace periods to detect quickly and aggressively.*/
+	size_t req_expedited_cnt;
+	/** 
+	 * Number of cpus with readers that are delaying the current GP.
+	 * They will up() remaining_readers.
+	 */
+	atomic_t delaying_cpu_cnt;
+	/** 
+	 * The detector waits on this semaphore for any readers delaying the GP.
+	 * 
+	 * Each of the cpus with readers that are delaying the current GP 
+	 * must up() this sema once they reach a quiescent state. If there 
+	 * are any readers in cur_preempted (ie preempted preexisting) and 
+	 * they are already delaying GP detection, the last to unlock its
+	 * reader section must up() this sema once.
+	 */
+	semaphore_t remaining_readers;
+#endif
+	
+	/** Excludes simultaneous rcu_barrier() calls. */
+	mutex_t barrier_mtx;
+	/** Number of cpus that we are waiting for to complete rcu_barrier(). */
+	atomic_t barrier_wait_cnt;
+	/** rcu_barrier() waits for the completion of barrier callbacks on this wq.*/
+	waitq_t barrier_wq;
+	
+	/** Interruptible attached detector thread pointer. */
+	thread_t *detector_thr;
+	
+	/* Some statistics. */
+	size_t stat_expedited_cnt;
+	size_t stat_delayed_cnt;
+	size_t stat_preempt_blocking_cnt;
+	/* Does not contain self/local calls. */
+	size_t stat_smp_call_cnt;
+} rcu_data_t;
+
+
+static rcu_data_t rcu;
+
+static void start_reclaimers(void);
+static void synch_complete(rcu_item_t *rcu_item);
+static inline void rcu_call_impl(bool expedite, rcu_item_t *rcu_item, 
+	rcu_func_t func);
+static void add_barrier_cb(void *arg);
+static void barrier_complete(rcu_item_t *barrier_item);
+static bool arriving_cbs_empty(void);
+static bool next_cbs_empty(void);
+static bool cur_cbs_empty(void);
+static bool all_cbs_empty(void);
+static void reclaimer(void *arg);
+static bool wait_for_pending_cbs(void);
+static bool advance_cbs(void);
+static void exec_completed_cbs(rcu_gp_t last_completed_gp);
+static void exec_cbs(rcu_item_t **phead);
+static bool wait_for_cur_cbs_gp_end(bool expedite, rcu_gp_t *last_completed_gp);
+static void upd_missed_gp_in_wait(rcu_gp_t completed_gp);
+
+#ifdef RCU_PREEMPT_PODZIMEK
+static void start_detector(void);
+static void read_unlock_impl(size_t *pnesting_cnt);
+static void req_detection(size_t req_cnt);
+static bool cv_wait_for_gp(rcu_gp_t wait_on_gp);
+static void detector(void *);
+static bool wait_for_detect_req(void);
+static void end_cur_gp(void);
+static bool wait_for_readers(void);
+static bool gp_sleep(void);
+static void interrupt_delaying_cpus(cpu_mask_t *cpu_mask);
+static bool wait_for_delaying_cpus(void);
+#elif defined(RCU_PREEMPT_A)
+static bool wait_for_readers(bool expedite);
+static bool gp_sleep(bool *expedite);
+#endif
+
+static void start_new_gp(void);
+static void rm_quiescent_cpus(cpu_mask_t *cpu_mask);
+static void sample_cpus(cpu_mask_t *reader_cpus, void *arg);
+static void sample_local_cpu(void *);
+static bool wait_for_preempt_reader(void);
+static void note_preempted_reader(void);
+static void rm_preempted_reader(void);
+static void upd_max_cbs_in_slice(size_t arriving_cbs_cnt);
+
+
+
+/** Initializes global RCU structures. */
+void rcu_init(void)
+{
+	condvar_initialize(&rcu.gp_ended);
+	condvar_initialize(&rcu.expedite_now);
+
+	spinlock_initialize(&rcu.gp_lock, "rcu.gp_lock");
+	_rcu_cur_gp = 0;
+	rcu.completed_gp = 0;
+	
+	irq_spinlock_initialize(&rcu.preempt_lock, "rcu.preempt_lock");
+	list_initialize(&rcu.cur_preempted);
+	list_initialize(&rcu.next_preempted);
+	rcu.preempt_blocking_det = false;
+	
+	mutex_initialize(&rcu.barrier_mtx, MUTEX_PASSIVE);
+	atomic_set(&rcu.barrier_wait_cnt, 0);
+	waitq_initialize(&rcu.barrier_wq);
+
+	semaphore_initialize(&rcu.remaining_readers, 0);
+	
+#ifdef RCU_PREEMPT_PODZIMEK
+	condvar_initialize(&rcu.req_gp_changed);
+	
+	rcu.req_gp_end_cnt = 0;
+	rcu.req_expedited_cnt = 0;
+	atomic_set(&rcu.delaying_cpu_cnt, 0);
+#endif
+	
+	rcu.detector_thr = NULL;
+	
+	rcu.stat_expedited_cnt = 0;
+	rcu.stat_delayed_cnt = 0;
+	rcu.stat_preempt_blocking_cnt = 0;
+	rcu.stat_smp_call_cnt = 0;
+}
+
+/** Initializes per-CPU RCU data. If on the boot cpu inits global data too.*/
+void rcu_cpu_init(void)
+{
+	if (config.cpu_active == 1) {
+		rcu_init();
+	}
+
+	CPU->rcu.last_seen_gp = 0;
+
+#ifdef RCU_PREEMPT_PODZIMEK
+	CPU->rcu.nesting_cnt = 0;
+	CPU->rcu.is_delaying_gp = false;
+	CPU->rcu.signal_unlock = false;
+#endif
+	
+	CPU->rcu.cur_cbs = NULL;
+	CPU->rcu.cur_cbs_cnt = 0;
+	CPU->rcu.next_cbs = NULL;
+	CPU->rcu.next_cbs_cnt = 0;
+	CPU->rcu.arriving_cbs = NULL;
+	CPU->rcu.parriving_cbs_tail = &CPU->rcu.arriving_cbs;
+	CPU->rcu.arriving_cbs_cnt = 0;
+
+	CPU->rcu.cur_cbs_gp = 0;
+	CPU->rcu.next_cbs_gp = 0;
+	
+	semaphore_initialize(&CPU->rcu.arrived_flag, 0);
+
+	/* BSP creates reclaimer threads before AP's rcu_cpu_init() runs. */
+	if (config.cpu_active == 1)
+		CPU->rcu.reclaimer_thr = NULL;
+	
+	CPU->rcu.stat_max_cbs = 0;
+	CPU->rcu.stat_avg_cbs = 0;
+	CPU->rcu.stat_missed_gps = 0;
+	CPU->rcu.stat_missed_gp_in_wait = 0;
+	CPU->rcu.stat_max_slice_cbs = 0;
+	CPU->rcu.last_arriving_cnt = 0;
+}
+
+/** Completes RCU init. Creates and runs the detector and reclaimer threads.*/
+void rcu_kinit_init(void)
+{
+#ifdef RCU_PREEMPT_PODZIMEK
+	start_detector();
+#endif
+	
+	start_reclaimers();
+}
+
+/** Initializes any per-thread RCU structures. */
+void rcu_thread_init(thread_t *thread)
+{
+	thread->rcu.nesting_cnt = 0;
+
+#ifdef RCU_PREEMPT_PODZIMEK
+	thread->rcu.was_preempted = false;
+#endif
+	
+	link_initialize(&thread->rcu.preempt_link);
+}
+
+
+/** Cleans up global RCU resources and stops dispatching callbacks. 
+ * 
+ * Call when shutting down the kernel. Outstanding callbacks will
+ * not be processed. Instead they will linger forever.
+ */
+void rcu_stop(void)
+{
+	/* Stop and wait for reclaimers. */
+	for (unsigned int cpu_id = 0; cpu_id < config.cpu_active; ++cpu_id) {
+		ASSERT(cpus[cpu_id].rcu.reclaimer_thr != NULL);
+	
+		if (cpus[cpu_id].rcu.reclaimer_thr) {
+			thread_interrupt(cpus[cpu_id].rcu.reclaimer_thr);
+			thread_join(cpus[cpu_id].rcu.reclaimer_thr);
+			thread_detach(cpus[cpu_id].rcu.reclaimer_thr);
+			cpus[cpu_id].rcu.reclaimer_thr = NULL;
+		}
+	}
+
+#ifdef RCU_PREEMPT_PODZIMEK
+	/* Stop the detector and wait. */
+	if (rcu.detector_thr) {
+		thread_interrupt(rcu.detector_thr);
+		thread_join(rcu.detector_thr);
+		thread_detach(rcu.detector_thr);
+		rcu.detector_thr = NULL;
+	}
+#endif
+}
+
+/** Returns the number of elapsed grace periods since boot. */
+uint64_t rcu_completed_gps(void)
+{
+	spinlock_lock(&rcu.gp_lock);
+	uint64_t completed = rcu.completed_gp;
+	spinlock_unlock(&rcu.gp_lock);
+	
+	return completed;
+}
+
+/** Creates and runs cpu-bound reclaimer threads. */
+static void start_reclaimers(void)
+{
+	for (unsigned int cpu_id = 0; cpu_id < config.cpu_count; ++cpu_id) {
+		char name[THREAD_NAME_BUFLEN] = {0};
+		
+		snprintf(name, THREAD_NAME_BUFLEN - 1, "rcu-rec/%u", cpu_id);
+		
+		cpus[cpu_id].rcu.reclaimer_thr = 
+			thread_create(reclaimer, NULL, TASK, THREAD_FLAG_NONE, name);
+
+		if (!cpus[cpu_id].rcu.reclaimer_thr) 
+			panic("Failed to create RCU reclaimer thread on cpu%u.", cpu_id);
+
+		thread_wire(cpus[cpu_id].rcu.reclaimer_thr, &cpus[cpu_id]);
+		thread_ready(cpus[cpu_id].rcu.reclaimer_thr);
+	}
+}
+
+#ifdef RCU_PREEMPT_PODZIMEK
+
+/** Starts the detector thread. */
+static void start_detector(void)
+{
+	rcu.detector_thr = 
+		thread_create(detector, NULL, TASK, THREAD_FLAG_NONE, "rcu-det");
+	
+	if (!rcu.detector_thr) 
+		panic("Failed to create RCU detector thread.");
+	
+	thread_ready(rcu.detector_thr);
+}
+
+/** Returns true if in an rcu reader section. */
+bool rcu_read_locked(void)
+{
+	preemption_disable();
+	bool locked = 0 < CPU->rcu.nesting_cnt;
+	preemption_enable();
+	
+	return locked;
+}
+
+/** Unlocks the local reader section using the given nesting count. 
+ * 
+ * Preemption or interrupts must be disabled. 
+ * 
+ * @param pnesting_cnt Either &CPU->rcu.tmp_nesting_cnt or 
+ *           THREAD->rcu.nesting_cnt.
+ */
+static void read_unlock_impl(size_t *pnesting_cnt)
+{
+	ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
+	
+	if (0 == --(*pnesting_cnt)) {
+		_rcu_record_qs();
+		
+		/* 
+		 * The thread was preempted while in a critical section or 
+		 * the detector is eagerly waiting for this cpu's reader 
+		 * to finish. 
+		 * 
+		 * Note that THREAD may be NULL in scheduler() and not just during boot.
+		 */
+		if ((THREAD && THREAD->rcu.was_preempted) || CPU->rcu.is_delaying_gp) {
+			/* Rechecks with disabled interrupts. */
+			_rcu_signal_read_unlock();
+		}
+	}
+}
+
+/** If necessary, signals the detector that we exited a reader section. */
+void _rcu_signal_read_unlock(void)
+{
+	ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
+	
+	/*
+	 * If an interrupt occurs here (even a NMI) it may beat us to
+	 * resetting .is_delaying_gp or .was_preempted and up the semaphore
+	 * for us.
+	 */
+	
+	/* 
+	 * If the detector is eagerly waiting for this cpu's reader to unlock,
+	 * notify it that the reader did so.
+	 */
+	if (local_atomic_exchange(&CPU->rcu.is_delaying_gp, false)) {
+		semaphore_up(&rcu.remaining_readers);
+	}
+	
+	/*
+	 * This reader was preempted while in a reader section.
+	 * We might be holding up the current GP. Notify the
+	 * detector if so.
+	 */
+	if (THREAD && local_atomic_exchange(&THREAD->rcu.was_preempted, false)) {
+		ASSERT(link_used(&THREAD->rcu.preempt_link));
+
+		rm_preempted_reader();
+	}
+	
+	/* If there was something to signal to the detector we have done so. */
+	CPU->rcu.signal_unlock = false;
+}
+
+#endif /* RCU_PREEMPT_PODZIMEK */
+
+typedef struct synch_item {
+	waitq_t wq;
+	rcu_item_t rcu_item;
+} synch_item_t;
+
+/** Blocks until all preexisting readers exit their critical sections. */
+void rcu_synchronize(void)
+{
+	_rcu_synchronize(false);
+}
+
+/** Blocks until all preexisting readers exit their critical sections. */
+void rcu_synchronize_expedite(void)
+{
+	_rcu_synchronize(true);
+}
+
+/** Blocks until all preexisting readers exit their critical sections. */
+void _rcu_synchronize(bool expedite)
+{
+	/* Calling from a reader section will deadlock. */
+	ASSERT(!rcu_read_locked());
+	
+	synch_item_t completion; 
+
+	waitq_initialize(&completion.wq);
+	_rcu_call(expedite, &completion.rcu_item, synch_complete);
+	waitq_sleep(&completion.wq);
+}
+
+/** rcu_synchronize's callback. */
+static void synch_complete(rcu_item_t *rcu_item)
+{
+	synch_item_t *completion = member_to_inst(rcu_item, synch_item_t, rcu_item);
+	ASSERT(completion);
+	waitq_wakeup(&completion->wq, WAKEUP_FIRST);
+}
+
+/** Waits for all outstanding rcu calls to complete. */
+void rcu_barrier(void)
+{
+	/* 
+	 * Serialize rcu_barrier() calls so we don't overwrite cpu.barrier_item
+	 * currently in use by rcu_barrier().
+	 */
+	mutex_lock(&rcu.barrier_mtx);
+	
+	/* 
+	 * Ensure we queue a barrier callback on all cpus before the already
+	 * enqueued barrier callbacks start signaling completion.
+	 */
+	atomic_set(&rcu.barrier_wait_cnt, 1);
+
+	DEFINE_CPU_MASK(cpu_mask);
+	cpu_mask_active(cpu_mask);
+	
+	cpu_mask_for_each(*cpu_mask, cpu_id) {
+		smp_call(cpu_id, add_barrier_cb, NULL);
+	}
+	
+	if (0 < atomic_predec(&rcu.barrier_wait_cnt)) {
+		waitq_sleep(&rcu.barrier_wq);
+	}
+	
+	mutex_unlock(&rcu.barrier_mtx);
+}
+
+/** Issues a rcu_barrier() callback on the local cpu. 
+ * 
+ * Executed with interrupts disabled.  
+ */
+static void add_barrier_cb(void *arg)
+{
+	ASSERT(interrupts_disabled() || PREEMPTION_DISABLED);
+	atomic_inc(&rcu.barrier_wait_cnt);
+	rcu_call(&CPU->rcu.barrier_item, barrier_complete);
+}
+
+/** Local cpu's rcu_barrier() completion callback. */
+static void barrier_complete(rcu_item_t *barrier_item)
+{
+	/* Is this the last barrier callback completed? */
+	if (0 == atomic_predec(&rcu.barrier_wait_cnt)) {
+		/* Notify rcu_barrier() that we're done. */
+		waitq_wakeup(&rcu.barrier_wq, WAKEUP_FIRST);
+	}
+}
+
+/** Adds a callback to invoke after all preexisting readers finish. 
+ * 
+ * May be called from within interrupt handlers or RCU reader sections.
+ * 
+ * @param rcu_item Used by RCU to track the call. Must remain
+ *         until the user callback function is entered.
+ * @param func User callback function that will be invoked once a full
+ *         grace period elapsed, ie at a time when all preexisting
+ *         readers have finished. The callback should be short and must
+ *         not block. If you must sleep, enqueue your work in the system
+ *         work queue from the callback (ie workq_global_enqueue()).
+ */
+void rcu_call(rcu_item_t *rcu_item, rcu_func_t func)
+{
+	rcu_call_impl(false, rcu_item, func);
+}
+
+/** rcu_call() implementation. See rcu_call() for comments. */
+void _rcu_call(bool expedite, rcu_item_t *rcu_item, rcu_func_t func)
+{
+	rcu_call_impl(expedite, rcu_item, func);
+}
+
+/** rcu_call() inline-able implementation. See rcu_call() for comments. */
+static inline void rcu_call_impl(bool expedite, rcu_item_t *rcu_item, 
+	rcu_func_t func)
+{
+	ASSERT(rcu_item);
+	
+	rcu_item->func = func;
+	rcu_item->next = NULL;
+	
+	preemption_disable();
+
+	rcu_cpu_data_t *r = &CPU->rcu;
+
+	rcu_item_t **prev_tail 
+		= local_atomic_exchange(&r->parriving_cbs_tail, &rcu_item->next);
+	*prev_tail = rcu_item;
+	
+	/* Approximate the number of callbacks present. */
+	++r->arriving_cbs_cnt;
+	
+	if (expedite) {
+		r->expedite_arriving = true;
+	}
+	
+	bool first_cb = (prev_tail == &CPU->rcu.arriving_cbs);
+	
+	/* Added first callback - notify the reclaimer. */
+	if (first_cb && !semaphore_count_get(&r->arrived_flag)) {
+		semaphore_up(&r->arrived_flag);
+	}
+	
+	preemption_enable();
+}
+
+static bool cur_cbs_empty(void)
+{
+	ASSERT(THREAD && THREAD->wired);
+	return NULL == CPU->rcu.cur_cbs;
+}
+
+static bool next_cbs_empty(void)
+{
+	ASSERT(THREAD && THREAD->wired);
+	return NULL == CPU->rcu.next_cbs;
+}
+
+/** Disable interrupts to get an up-to-date result. */
+static bool arriving_cbs_empty(void)
+{
+	ASSERT(THREAD && THREAD->wired);
+	/* 
+	 * Accessing with interrupts enabled may at worst lead to 
+	 * a false negative if we race with a local interrupt handler.
+	 */
+	return NULL == CPU->rcu.arriving_cbs;
+}
+
+static bool all_cbs_empty(void)
+{
+	return cur_cbs_empty() && next_cbs_empty() && arriving_cbs_empty();
+}
+
+
+/** Reclaimer thread dispatches locally queued callbacks once a GP ends. */
+static void reclaimer(void *arg)
+{
+	ASSERT(THREAD && THREAD->wired);
+	ASSERT(THREAD == CPU->rcu.reclaimer_thr);
+
+	rcu_gp_t last_compl_gp = 0;
+	bool ok = true;
+	
+	while (ok && wait_for_pending_cbs()) {
+		ASSERT(CPU->rcu.reclaimer_thr == THREAD);
+		
+		exec_completed_cbs(last_compl_gp);
+
+		bool expedite = advance_cbs();
+		
+		ok = wait_for_cur_cbs_gp_end(expedite, &last_compl_gp);
+	}
+}
+
+/** Waits until there are callbacks waiting to be dispatched. */
+static bool wait_for_pending_cbs(void)
+{
+	if (!all_cbs_empty()) 
+		return true;
+
+	bool ok = true;
+	
+	while (arriving_cbs_empty() && ok) {
+		ok = semaphore_down_interruptable(&CPU->rcu.arrived_flag);
+	}
+	
+	return ok;
+}
+
+static void upd_stat_missed_gp(rcu_gp_t compl)
+{
+	if (CPU->rcu.cur_cbs_gp < compl) {
+		CPU->rcu.stat_missed_gps += (size_t)(compl - CPU->rcu.cur_cbs_gp);
+	}
+}
+
+/** Executes all callbacks for the given completed grace period. */
+static void exec_completed_cbs(rcu_gp_t last_completed_gp)
+{
+	upd_stat_missed_gp(last_completed_gp);
+	
+	/* Both next_cbs and cur_cbs GP elapsed. */
+	if (CPU->rcu.next_cbs_gp <= last_completed_gp) {
+		ASSERT(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
+		
+		size_t exec_cnt = CPU->rcu.cur_cbs_cnt + CPU->rcu.next_cbs_cnt;
+		
+		if (exec_cnt < CRITICAL_THRESHOLD) {
+			exec_cbs(&CPU->rcu.cur_cbs);
+			exec_cbs(&CPU->rcu.next_cbs);	
+		} else {
+			/* 
+			 * Getting overwhelmed with too many callbacks to run. 
+			 * Disable preemption in order to prolong our time slice 
+			 * and catch up with updaters posting new callbacks.
+			 */
+			preemption_disable();
+			exec_cbs(&CPU->rcu.cur_cbs);
+			exec_cbs(&CPU->rcu.next_cbs);	
+			preemption_enable();
+		}
+		
+		CPU->rcu.cur_cbs_cnt = 0;
+		CPU->rcu.next_cbs_cnt = 0;
+	} else if (CPU->rcu.cur_cbs_gp <= last_completed_gp) {
+
+		if (CPU->rcu.cur_cbs_cnt < CRITICAL_THRESHOLD) {
+			exec_cbs(&CPU->rcu.cur_cbs);
+		} else {
+			/* 
+			 * Getting overwhelmed with too many callbacks to run. 
+			 * Disable preemption in order to prolong our time slice 
+			 * and catch up with updaters posting new callbacks.
+			 */
+			preemption_disable();
+			exec_cbs(&CPU->rcu.cur_cbs);
+			preemption_enable();
+		}
+
+		CPU->rcu.cur_cbs_cnt = 0;
+	}
+}
+
+/** Executes callbacks in the single-linked list. The list is left empty. */
+static void exec_cbs(rcu_item_t **phead)
+{
+	rcu_item_t *rcu_item = *phead;
+
+	while (rcu_item) {
+		/* func() may free rcu_item. Get a local copy. */
+		rcu_item_t *next = rcu_item->next;
+		rcu_func_t func = rcu_item->func;
+		
+		func(rcu_item);
+		
+		rcu_item = next;
+	}
+	
+	*phead = NULL;
+}
+
+static void upd_stat_cb_cnts(size_t arriving_cnt)
+{
+	CPU->rcu.stat_max_cbs = max(arriving_cnt, CPU->rcu.stat_max_cbs);
+	if (0 < arriving_cnt) {
+		CPU->rcu.stat_avg_cbs = 
+			(99 * CPU->rcu.stat_avg_cbs + 1 * arriving_cnt) / 100;
+	}
+}
+
+/** Prepares another batch of callbacks to dispatch at the nest grace period.
+ * 
+ * @return True if the next batch of callbacks must be expedited quickly.
+ */
+static bool advance_cbs(void)
+{
+	/* Move next_cbs to cur_cbs. */
+	CPU->rcu.cur_cbs = CPU->rcu.next_cbs;
+	CPU->rcu.cur_cbs_cnt = CPU->rcu.next_cbs_cnt;
+	CPU->rcu.cur_cbs_gp = CPU->rcu.next_cbs_gp;
+	
+	/* Move arriving_cbs to next_cbs. */
+	
+	CPU->rcu.next_cbs_cnt = CPU->rcu.arriving_cbs_cnt;
+	CPU->rcu.arriving_cbs_cnt = 0;
+	
+	/* 
+	 * Too many callbacks queued. Better speed up the detection
+	 * or risk exhausting all system memory.
+	 */
+	bool expedite = (EXPEDITE_THRESHOLD < CPU->rcu.next_cbs_cnt)
+		|| CPU->rcu.expedite_arriving;	
+	CPU->rcu.expedite_arriving = false;
+
+	/* Start moving the arriving_cbs list to next_cbs. */
+	CPU->rcu.next_cbs = CPU->rcu.arriving_cbs;
+	
+	/* 
+	 * At least one callback arrived. The tail therefore does not point
+	 * to the head of arriving_cbs and we can safely reset it to NULL.
+	 */
+	if (CPU->rcu.next_cbs) {
+		ASSERT(CPU->rcu.parriving_cbs_tail != &CPU->rcu.arriving_cbs);
+		
+		CPU->rcu.arriving_cbs = NULL;
+		/* Reset arriving_cbs before updating the tail pointer. */
+		compiler_barrier();
+		/* Updating the tail pointer completes the move of arriving_cbs. */
+		ACCESS_ONCE(CPU->rcu.parriving_cbs_tail) = &CPU->rcu.arriving_cbs;
+	} else {
+		/* 
+		 * arriving_cbs was null and parriving_cbs_tail pointed to it 
+		 * so leave it that way. Note that interrupt handlers may have
+		 * added a callback in the meantime so it is not safe to reset
+		 * arriving_cbs or parriving_cbs.
+		 */
+	}
+
+	/* Update statistics of arrived callbacks. */
+	upd_stat_cb_cnts(CPU->rcu.next_cbs_cnt);
+	
+	/* 
+	 * Make changes prior to queuing next_cbs visible to readers. 
+	 * See comment in wait_for_readers().
+	 */
+	memory_barrier(); /* MB A, B */
+
+	/* At the end of next_cbs_gp, exec next_cbs. Determine what GP that is. */
+	
+	if (!next_cbs_empty()) {
+		spinlock_lock(&rcu.gp_lock);
+	
+		/* Exec next_cbs at the end of the next GP. */
+		CPU->rcu.next_cbs_gp = _rcu_cur_gp + 1;
+		
+		/* 
+		 * There are no callbacks to invoke before next_cbs. Instruct
+		 * wait_for_cur_cbs_gp() to notify us of the nearest GP end.
+		 * That could be sooner than next_cbs_gp (if the current GP 
+		 * had not yet completed), so we'll create a shorter batch
+		 * of callbacks next time around.
+		 */
+		if (cur_cbs_empty()) {
+			CPU->rcu.cur_cbs_gp = rcu.completed_gp + 1;
+		} 
+		
+		spinlock_unlock(&rcu.gp_lock);
+	} else {
+		CPU->rcu.next_cbs_gp = CPU->rcu.cur_cbs_gp;
+	}
+	
+	ASSERT(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
+	
+	return expedite;	
+}
+
+
+#ifdef RCU_PREEMPT_A
+
+/** Waits for the grace period associated with callbacks cub_cbs to elapse. 
+ * 
+ * @param expedite Instructs the detector to aggressively speed up grace 
+ *            period detection without any delay.
+ * @param completed_gp Returns the most recent completed grace period 
+ *            number.
+ * @return false if the thread was interrupted and should stop.
+ */
+static bool wait_for_cur_cbs_gp_end(bool expedite, rcu_gp_t *completed_gp)
+{
+	spinlock_lock(&rcu.gp_lock);
+
+	ASSERT(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
+	ASSERT(CPU->rcu.cur_cbs_gp <= _rcu_cur_gp + 1);
+	
+	while (rcu.completed_gp < CPU->rcu.cur_cbs_gp) {
+		/* GP has not yet started - start a new one. */
+		if (rcu.completed_gp == _rcu_cur_gp) {
+			start_new_gp();
+			spinlock_unlock(&rcu.gp_lock);
+
+			if (!wait_for_readers(expedite))
+				return false;
+
+			spinlock_lock(&rcu.gp_lock);
+			/* Notify any reclaimers this GP had ended. */
+			rcu.completed_gp = _rcu_cur_gp;
+			condvar_broadcast(&rcu.gp_ended);
+		} else {
+			/* GP detection is in progress.*/ 
+			
+			if (expedite) 
+				condvar_signal(&rcu.expedite_now);
+			
+			/* Wait for the GP to complete. */
+			int ret = _condvar_wait_timeout_spinlock(&rcu.gp_ended, &rcu.gp_lock, 
+				SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
+			
+			if (ret == ESYNCH_INTERRUPTED) {
+				spinlock_unlock(&rcu.gp_lock);
+				return false;			
+			}
+		}
+	}
+	
+	upd_missed_gp_in_wait(rcu.completed_gp);
+	
+	*completed_gp = rcu.completed_gp;
+	spinlock_unlock(&rcu.gp_lock);
+	
+	return true;
+}
+
+static bool wait_for_readers(bool expedite)
+{
+	DEFINE_CPU_MASK(reader_cpus);
+	
+	cpu_mask_active(reader_cpus);
+	rm_quiescent_cpus(reader_cpus);
+	
+	while (!cpu_mask_is_none(reader_cpus)) {
+		/* Give cpus a chance to context switch (a QS) and batch callbacks. */
+		if(!gp_sleep(&expedite)) 
+			return false;
+		
+		rm_quiescent_cpus(reader_cpus);
+		sample_cpus(reader_cpus, reader_cpus);
+	}
+	
+	/* Update statistic. */
+	if (expedite) {
+		++rcu.stat_expedited_cnt;
+	}
+	
+	/* 
+	 * All cpus have passed through a QS and see the most recent _rcu_cur_gp.
+	 * As a result newly preempted readers will associate with next_preempted
+	 * and the number of old readers in cur_preempted will monotonically
+	 * decrease. Wait for those old/preexisting readers.
+	 */
+	return wait_for_preempt_reader();
+}
+
+static bool gp_sleep(bool *expedite)
+{
+	if (*expedite) {
+		scheduler();
+		return true;
+	} else {
+		spinlock_lock(&rcu.gp_lock);
+
+		int ret = 0;
+		ret = _condvar_wait_timeout_spinlock(&rcu.expedite_now, &rcu.gp_lock,
+			DETECT_SLEEP_MS * 1000, SYNCH_FLAGS_INTERRUPTIBLE);
+
+		/* rcu.expedite_now was signaled. */
+		if (ret == ESYNCH_OK_BLOCKED) {
+			*expedite = true;
+		}
+
+		spinlock_unlock(&rcu.gp_lock);
+
+		return (ret != ESYNCH_INTERRUPTED);
+	}
+}
+
+static void sample_local_cpu(void *arg)
+{
+	ASSERT(interrupts_disabled());
+	cpu_mask_t *reader_cpus = (cpu_mask_t *)arg;
+	
+	bool locked = RCU_CNT_INC <= THE->rcu_nesting;
+	/* smp_call machinery makes the most current _rcu_cur_gp visible. */
+	bool passed_qs = (CPU->rcu.last_seen_gp == _rcu_cur_gp);
+		
+	if (locked && !passed_qs) {
+		/* 
+		 * This cpu has not yet passed a quiescent state during this grace
+		 * period and it is currently in a reader section. We'll have to
+		 * try to sample this cpu again later.
+		 */
+	} else {
+		/* Either not in a reader section or already passed a QS. */
+		cpu_mask_reset(reader_cpus, CPU->id);
+		/* Contain new reader sections and make prior changes visible to them.*/
+		memory_barrier();
+		CPU->rcu.last_seen_gp = _rcu_cur_gp;
+	}
+}
+
+/** Called by the scheduler() when switching away from the current thread. */
+void rcu_after_thread_ran(void)
+{
+	ASSERT(interrupts_disabled());
+
+	/* 
+	 * In order not to worry about NMI seeing rcu_nesting change work 
+	 * with a local copy.
+	 */
+	size_t nesting_cnt = local_atomic_exchange(&THE->rcu_nesting, 0);
+	
+	/* 
+	 * Ensures NMIs see .rcu_nesting without the WAS_PREEMPTED mark and
+	 * do not accidentally call rm_preempted_reader() from unlock().
+	 */
+	compiler_barrier();
+	
+	/* Preempted a reader critical section for the first time. */
+	if (RCU_CNT_INC <= nesting_cnt && !(nesting_cnt & RCU_WAS_PREEMPTED)) {
+		nesting_cnt |= RCU_WAS_PREEMPTED;
+		note_preempted_reader();
+	}
+	
+	/* Save the thread's nesting count when it is not running. */
+	THREAD->rcu.nesting_cnt = nesting_cnt;
+
+	if (CPU->rcu.last_seen_gp != _rcu_cur_gp) {
+		/* 
+		 * Contain any memory accesses of old readers before announcing a QS. 
+		 * Also make changes from the previous GP visible to this cpu.
+		 * Moreover it separates writing to last_seen_gp from 
+		 * note_preempted_reader().
+		 */
+		memory_barrier();
+		/* 
+		 * The preempted reader has been noted globally. There are therefore
+		 * no readers running on this cpu so this is a quiescent state.
+		 * 
+		 * Reading the multiword _rcu_cur_gp non-atomically is benign. 
+		 * At worst, the read value will be different from the actual value.
+		 * As a result, both the detector and this cpu will believe
+		 * this cpu has not yet passed a QS although it really did.
+		 * 
+		 * Reloading _rcu_cur_gp is benign, because it cannot change
+		 * until this cpu acknowledges it passed a QS by writing to
+		 * last_seen_gp. Since interrupts are disabled, only this
+		 * code may to so (IPIs won't get through).
+		 */
+		CPU->rcu.last_seen_gp = _rcu_cur_gp;
+	}
+
+	/* 
+	 * Forcefully associate the reclaimer with the highest priority
+	 * even if preempted due to its time slice running out.
+	 */
+	if (THREAD == CPU->rcu.reclaimer_thr) {
+		THREAD->priority = -1;
+	} 
+	
+	upd_max_cbs_in_slice(CPU->rcu.arriving_cbs_cnt);
+}
+
+/** Called by the scheduler() when switching to a newly scheduled thread. */
+void rcu_before_thread_runs(void)
+{
+	ASSERT(!rcu_read_locked());
+	
+	/* Load the thread's saved nesting count from before it was preempted. */
+	THE->rcu_nesting = THREAD->rcu.nesting_cnt;
+}
+
+/** Called from scheduler() when exiting the current thread. 
+ * 
+ * Preemption or interrupts are disabled and the scheduler() already
+ * switched away from the current thread, calling rcu_after_thread_ran().
+ */
+void rcu_thread_exiting(void)
+{
+	ASSERT(THE->rcu_nesting == 0);
+	
+	/* 
+	 * The thread forgot to exit its reader critical section. 
+	 * It is a bug, but rather than letting the entire system lock up
+	 * forcefully leave the reader section. The thread is not holding 
+	 * any references anyway since it is exiting so it is safe.
+	 */
+	if (RCU_CNT_INC <= THREAD->rcu.nesting_cnt) {
+		/* Emulate _rcu_preempted_unlock() with the proper nesting count. */
+		if (THREAD->rcu.nesting_cnt & RCU_WAS_PREEMPTED) {
+			rm_preempted_reader();
+		}
+
+		printf("Bug: thread (id %" PRIu64 " \"%s\") exited while in RCU read"
+			" section.\n", THREAD->tid, THREAD->name);
+	}
+}
+
+/** Returns true if in an rcu reader section. */
+bool rcu_read_locked(void)
+{
+	return RCU_CNT_INC <= THE->rcu_nesting;
+}
+
+/** Invoked when a preempted reader finally exits its reader section. */
+void _rcu_preempted_unlock(void)
+{
+	ASSERT(0 == THE->rcu_nesting || RCU_WAS_PREEMPTED == THE->rcu_nesting);
+	
+	size_t prev = local_atomic_exchange(&THE->rcu_nesting, 0);
+	if (prev == RCU_WAS_PREEMPTED) {
+		/* 
+		 * NMI handlers are never preempted but may call rm_preempted_reader()
+		 * if a NMI occurred in _rcu_preempted_unlock() of a preempted thread.
+		 * The only other rcu code that may have been interrupted by the NMI
+		 * in _rcu_preempted_unlock() is: an IPI/sample_local_cpu() and
+		 * the initial part of rcu_after_thread_ran().
+		 * 
+		 * rm_preempted_reader() will not deadlock because none of the locks
+		 * it uses are locked in this case. Neither _rcu_preempted_unlock()
+		 * nor sample_local_cpu() nor the initial part of rcu_after_thread_ran()
+		 * acquire any locks.
+		 */
+		rm_preempted_reader();
+	}
+}
+
+#elif defined(RCU_PREEMPT_PODZIMEK)
+
+/** Waits for the grace period associated with callbacks cub_cbs to elapse. 
+ * 
+ * @param expedite Instructs the detector to aggressively speed up grace 
+ *            period detection without any delay.
+ * @param completed_gp Returns the most recent completed grace period 
+ *            number.
+ * @return false if the thread was interrupted and should stop.
+ */
+static bool wait_for_cur_cbs_gp_end(bool expedite, rcu_gp_t *completed_gp)
+{
+	/* 
+	 * Use a possibly outdated version of completed_gp to bypass checking
+	 * with the lock.
+	 * 
+	 * Note that loading and storing rcu.completed_gp is not atomic 
+	 * (it is 64bit wide). Reading a clobbered value that is less than 
+	 * rcu.completed_gp is harmless - we'll recheck with a lock. The 
+	 * only way to read a clobbered value that is greater than the actual 
+	 * value is if the detector increases the higher-order word first and 
+	 * then decreases the lower-order word (or we see stores in that order), 
+	 * eg when incrementing from 2^32 - 1 to 2^32. The loaded value 
+	 * suddenly jumps by 2^32. It would take hours for such an increase 
+	 * to occur so it is safe to discard the value. We allow increases 
+	 * of up to half the maximum to generously accommodate for loading an
+	 * outdated lower word.
+	 */
+	rcu_gp_t compl_gp = ACCESS_ONCE(rcu.completed_gp);
+	if (CPU->rcu.cur_cbs_gp <= compl_gp 
+		&& compl_gp <= CPU->rcu.cur_cbs_gp + UINT32_MAX_HALF) {
+		*completed_gp = compl_gp;
+		return true;
+	}
+	
+	spinlock_lock(&rcu.gp_lock);
+	
+	if (CPU->rcu.cur_cbs_gp <= rcu.completed_gp) {
+		*completed_gp = rcu.completed_gp;
+		spinlock_unlock(&rcu.gp_lock);
+		return true;
+	}
+	
+	ASSERT(CPU->rcu.cur_cbs_gp <= CPU->rcu.next_cbs_gp);
+	ASSERT(_rcu_cur_gp <= CPU->rcu.cur_cbs_gp);
+	
+	/* 
+	 * Notify the detector of how many GP ends we intend to wait for, so 
+	 * it can avoid going to sleep unnecessarily. Optimistically assume
+	 * new callbacks will arrive while we're waiting; hence +1.
+	 */
+	size_t remaining_gp_ends = (size_t) (CPU->rcu.next_cbs_gp - _rcu_cur_gp);
+	req_detection(remaining_gp_ends + (arriving_cbs_empty() ? 0 : 1));
+	
+	/* 
+	 * Ask the detector to speed up GP detection if there are too many 
+	 * pending callbacks and other reclaimers have not already done so.
+	 */
+	if (expedite) {
+		if(0 == rcu.req_expedited_cnt) 
+			condvar_signal(&rcu.expedite_now);
+		
+		/* 
+		 * Expedite only cub_cbs. If there really is a surge of callbacks 
+		 * the arriving batch will expedite the GP for the huge number
+		 * of callbacks currently in next_cbs
+		 */
+		rcu.req_expedited_cnt = 1;
+	}
+
+	/* Wait for cur_cbs_gp to end. */
+	bool interrupted = cv_wait_for_gp(CPU->rcu.cur_cbs_gp);
+	
+	*completed_gp = rcu.completed_gp;
+	spinlock_unlock(&rcu.gp_lock);	
+	
+	if (!interrupted)
+		upd_missed_gp_in_wait(*completed_gp);
+	
+	return !interrupted;
+}
+
+/** Waits for an announcement of the end of the grace period wait_on_gp. */
+static bool cv_wait_for_gp(rcu_gp_t wait_on_gp)
+{
+	ASSERT(spinlock_locked(&rcu.gp_lock));
+	
+	bool interrupted = false;
+	
+	/* Wait until wait_on_gp ends. */
+	while (rcu.completed_gp < wait_on_gp && !interrupted) {
+		int ret = _condvar_wait_timeout_spinlock(&rcu.gp_ended, &rcu.gp_lock, 
+			SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
+		interrupted = (ret == ESYNCH_INTERRUPTED);
+	}
+	
+	return interrupted;
+}
+
+/** Requests the detector to detect at least req_cnt consecutive grace periods.*/
+static void req_detection(size_t req_cnt)
+{
+	if (rcu.req_gp_end_cnt < req_cnt) {
+		bool detector_idle = (0 == rcu.req_gp_end_cnt);
+		rcu.req_gp_end_cnt = req_cnt;
+
+		if (detector_idle) {
+			ASSERT(_rcu_cur_gp == rcu.completed_gp);
+			condvar_signal(&rcu.req_gp_changed);
+		}
+	}
+}
+
+
+/** The detector thread detects and notifies reclaimers of grace period ends. */
+static void detector(void *arg)
+{
+	spinlock_lock(&rcu.gp_lock);
+	
+	while (wait_for_detect_req()) {
+		/* 
+		 * Announce new GP started. Readers start lazily acknowledging that
+		 * they passed a QS.
+		 */
+		start_new_gp();
+		
+		spinlock_unlock(&rcu.gp_lock);
+		
+		if (!wait_for_readers()) 
+			goto unlocked_out;
+		
+		spinlock_lock(&rcu.gp_lock);
+
+		/* Notify reclaimers that they may now invoke queued callbacks. */
+		end_cur_gp();
+	}
+	
+	spinlock_unlock(&rcu.gp_lock);
+	
+unlocked_out:
+	return;
+}
+
+/** Waits for a request from a reclaimer thread to detect a grace period. */
+static bool wait_for_detect_req(void)
+{
+	ASSERT(spinlock_locked(&rcu.gp_lock));
+	
+	bool interrupted = false;
+	
+	while (0 == rcu.req_gp_end_cnt && !interrupted) {
+		int ret = _condvar_wait_timeout_spinlock(&rcu.req_gp_changed, 
+			&rcu.gp_lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
+		
+		interrupted = (ret == ESYNCH_INTERRUPTED);
+	}
+	
+	return !interrupted;
+}
+
+
+static void end_cur_gp(void)
+{
+	ASSERT(spinlock_locked(&rcu.gp_lock));
+	
+	rcu.completed_gp = _rcu_cur_gp;
+	--rcu.req_gp_end_cnt;
+	
+	condvar_broadcast(&rcu.gp_ended);
+}
+
+/** Waits for readers that started before the current GP started to finish. */
+static bool wait_for_readers(void)
+{
+	DEFINE_CPU_MASK(reading_cpus);
+	
+	/* All running cpus have potential readers. */
+	cpu_mask_active(reading_cpus);
+
+	/* 
+	 * Give readers time to pass through a QS. Also, batch arriving 
+	 * callbacks in order to amortize detection overhead.
+	 */
+	if (!gp_sleep())
+		return false;
+	
+	/* Non-intrusively determine which cpus have yet to pass a QS. */
+	rm_quiescent_cpus(reading_cpus);
+	
+	/* Actively interrupt cpus delaying the current GP and demand a QS. */
+	interrupt_delaying_cpus(reading_cpus);
+	
+	/* Wait for the interrupted cpus to notify us that they reached a QS. */
+	if (!wait_for_delaying_cpus())
+		return false;
+	/*
+	 * All cpus recorded a QS or are still idle. Any new readers will be added
+	 * to next_preempt if preempted, ie the number of readers in cur_preempted
+	 * monotonically descreases.
+	 */
+	
+	/* Wait for the last reader in cur_preempted to notify us it is done. */
+	if (!wait_for_preempt_reader())
+		return false;
+	
+	return true;
+}
+
+/** Sleeps a while if the current grace period is not to be expedited. */
+static bool gp_sleep(void)
+{
+	spinlock_lock(&rcu.gp_lock);
+
+	int ret = 0;
+	while (0 == rcu.req_expedited_cnt && 0 == ret) {
+		/* minor bug: sleeps for the same duration if woken up spuriously. */
+		ret = _condvar_wait_timeout_spinlock(&rcu.expedite_now, &rcu.gp_lock,
+			DETECT_SLEEP_MS * 1000, SYNCH_FLAGS_INTERRUPTIBLE);
+	}
+	
+	if (0 < rcu.req_expedited_cnt) {
+		--rcu.req_expedited_cnt;
+		/* Update statistic. */
+		++rcu.stat_expedited_cnt;
+	}
+	
+	spinlock_unlock(&rcu.gp_lock);
+	
+	return (ret != ESYNCH_INTERRUPTED);
+}
+
+/** Actively interrupts and checks the offending cpus for quiescent states. */
+static void interrupt_delaying_cpus(cpu_mask_t *cpu_mask)
+{
+	atomic_set(&rcu.delaying_cpu_cnt, 0);
+	
+	sample_cpus(cpu_mask, NULL);
+}
+
+/** Invoked on a cpu delaying grace period detection. 
+ * 
+ * Induces a quiescent state for the cpu or it instructs remaining 
+ * readers to notify the detector once they finish.
+ */
+static void sample_local_cpu(void *arg)
+{
+	ASSERT(interrupts_disabled());
+	ASSERT(!CPU->rcu.is_delaying_gp);
+	
+	/* Cpu did not pass a quiescent state yet. */
+	if (CPU->rcu.last_seen_gp != _rcu_cur_gp) {
+		/* Interrupted a reader in a reader critical section. */
+		if (0 < CPU->rcu.nesting_cnt) {
+			ASSERT(!CPU->idle);
+			/* 
+			 * Note to notify the detector from rcu_read_unlock(). 
+			 * 
+			 * ACCESS_ONCE ensures the compiler writes to is_delaying_gp
+			 * only after it determines that we are in a reader CS.
+			 */
+			ACCESS_ONCE(CPU->rcu.is_delaying_gp) = true;
+			CPU->rcu.signal_unlock = true;
+			
+			atomic_inc(&rcu.delaying_cpu_cnt);
+		} else {
+			/* 
+			 * The cpu did not enter any rcu reader sections since 
+			 * the start of the current GP. Record a quiescent state.
+			 * 
+			 * Or, we interrupted rcu_read_unlock_impl() right before
+			 * it recorded a QS. Record a QS for it. The memory barrier 
+			 * contains the reader section's mem accesses before 
+			 * updating last_seen_gp.
+			 * 
+			 * Or, we interrupted rcu_read_lock() right after it recorded
+			 * a QS for the previous GP but before it got a chance to
+			 * increment its nesting count. The memory barrier again
+			 * stops the CS code from spilling out of the CS.
+			 */
+			memory_barrier();
+			CPU->rcu.last_seen_gp = _rcu_cur_gp;
+		}
+	} else {
+		/* 
+		 * This cpu already acknowledged that it had passed through 
+		 * a quiescent state since the start of cur_gp. 
+		 */
+	}
+	
+	/* 
+	 * smp_call() makes sure any changes propagate back to the caller.
+	 * In particular, it makes the most current last_seen_gp visible
+	 * to the detector.
+	 */
+}
+
+/** Waits for cpus delaying the current grace period if there are any. */
+static bool wait_for_delaying_cpus(void)
+{
+	int delaying_cpu_cnt = atomic_get(&rcu.delaying_cpu_cnt);
+
+	for (int i = 0; i < delaying_cpu_cnt; ++i){
+		if (!semaphore_down_interruptable(&rcu.remaining_readers))
+			return false;
+	}
+	
+	/* Update statistic. */
+	rcu.stat_delayed_cnt += delaying_cpu_cnt;
+	
+	return true;
+}
+
+/** Called by the scheduler() when switching away from the current thread. */
+void rcu_after_thread_ran(void)
+{
+	ASSERT(interrupts_disabled());
+
+	/* 
+	 * Prevent NMI handlers from interfering. The detector will be notified
+	 * in this function if CPU->rcu.is_delaying_gp. The current thread is 
+	 * no longer running so there is nothing else to signal to the detector.
+	 */
+	CPU->rcu.signal_unlock = false;
+	/* 
+	 * Separates clearing of .signal_unlock from accesses to 
+	 * THREAD->rcu.was_preempted and CPU->rcu.nesting_cnt.
+	 */
+	compiler_barrier();
+	
+	/* Save the thread's nesting count when it is not running. */
+	THREAD->rcu.nesting_cnt = CPU->rcu.nesting_cnt;
+	
+	/* Preempted a reader critical section for the first time. */
+	if (0 < THREAD->rcu.nesting_cnt && !THREAD->rcu.was_preempted) {
+		THREAD->rcu.was_preempted = true;
+		note_preempted_reader();
+	}
+	
+	/* 
+	 * The preempted reader has been noted globally. There are therefore
+	 * no readers running on this cpu so this is a quiescent state.
+	 */
+	_rcu_record_qs();
+
+	/* 
+	 * Interrupt handlers might use RCU while idle in scheduler(). 
+	 * The preempted reader has been noted globally, so the handlers 
+	 * may now start announcing quiescent states.
+	 */
+	CPU->rcu.nesting_cnt = 0;
+	
+	/* 
+	 * This cpu is holding up the current GP. Let the detector know 
+	 * it has just passed a quiescent state. 
+	 * 
+	 * The detector waits separately for preempted readers, so we have 
+	 * to notify the detector even if we have just preempted a reader.
+	 */
+	if (CPU->rcu.is_delaying_gp) {
+		CPU->rcu.is_delaying_gp = false;
+		semaphore_up(&rcu.remaining_readers);
+	}
+
+	/* 
+	 * Forcefully associate the detector with the highest priority
+	 * even if preempted due to its time slice running out.
+	 * 
+	 * todo: Replace with strict scheduler priority classes.
+	 */
+	if (THREAD == rcu.detector_thr) {
+		THREAD->priority = -1;
+	} 
+	else if (THREAD == CPU->rcu.reclaimer_thr) {
+		THREAD->priority = -1;
+	} 
+	
+	upd_max_cbs_in_slice(CPU->rcu.arriving_cbs_cnt);
+}
+
+/** Called by the scheduler() when switching to a newly scheduled thread. */
+void rcu_before_thread_runs(void)
+{
+	ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
+	ASSERT(0 == CPU->rcu.nesting_cnt);
+	
+	/* Load the thread's saved nesting count from before it was preempted. */
+	CPU->rcu.nesting_cnt = THREAD->rcu.nesting_cnt;
+	
+	/* 
+	 * Ensures NMI see the proper nesting count before .signal_unlock.
+	 * Otherwise the NMI may incorrectly signal that a preempted reader
+	 * exited its reader section.
+	 */
+	compiler_barrier();
+	
+	/* 
+	 * In the unlikely event that a NMI occurs between the loading of the 
+	 * variables and setting signal_unlock, the NMI handler may invoke 
+	 * rcu_read_unlock() and clear signal_unlock. In that case we will
+	 * incorrectly overwrite signal_unlock from false to true. This event
+	 * is benign and the next rcu_read_unlock() will at worst 
+	 * needlessly invoke _rcu_signal_unlock().
+	 */
+	CPU->rcu.signal_unlock = THREAD->rcu.was_preempted || CPU->rcu.is_delaying_gp;
+}
+
+/** Called from scheduler() when exiting the current thread. 
+ * 
+ * Preemption or interrupts are disabled and the scheduler() already
+ * switched away from the current thread, calling rcu_after_thread_ran().
+ */
+void rcu_thread_exiting(void)
+{
+	ASSERT(THREAD != NULL);
+	ASSERT(THREAD->state == Exiting);
+	ASSERT(PREEMPTION_DISABLED || interrupts_disabled());
+	
+	/* 
+	 * The thread forgot to exit its reader critical section. 
+	 * It is a bug, but rather than letting the entire system lock up
+	 * forcefully leave the reader section. The thread is not holding 
+	 * any references anyway since it is exiting so it is safe.
+	 */
+	if (0 < THREAD->rcu.nesting_cnt) {
+		THREAD->rcu.nesting_cnt = 1;
+		read_unlock_impl(&THREAD->rcu.nesting_cnt);
+
+		printf("Bug: thread (id %" PRIu64 " \"%s\") exited while in RCU read"
+			" section.\n", THREAD->tid, THREAD->name);
+	}
+}
+
+
+#endif /* RCU_PREEMPT_PODZIMEK */
+
+/** Announces the start of a new grace period for preexisting readers to ack. */
+static void start_new_gp(void)
+{
+	ASSERT(spinlock_locked(&rcu.gp_lock));
+	
+	irq_spinlock_lock(&rcu.preempt_lock, true);
+	
+	/* Start a new GP. Announce to readers that a quiescent state is needed. */
+	++_rcu_cur_gp;
+	
+	/* 
+	 * Readers preempted before the start of this GP (next_preempted)
+	 * are preexisting readers now that a GP started and will hold up 
+	 * the current GP until they exit their reader sections.
+	 * 
+	 * Preempted readers from the previous GP have finished so 
+	 * cur_preempted is empty, but see comment in _rcu_record_qs(). 
+	 */
+	list_concat(&rcu.cur_preempted, &rcu.next_preempted);
+	
+	irq_spinlock_unlock(&rcu.preempt_lock, true);
+}
+
+/** Remove those cpus from the mask that have already passed a quiescent
+ * state since the start of the current grace period.
+ */
+static void rm_quiescent_cpus(cpu_mask_t *cpu_mask)
+{
+	/*
+	 * Ensure the announcement of the start of a new GP (ie up-to-date 
+	 * cur_gp) propagates to cpus that are just coming out of idle 
+	 * mode before we sample their idle state flag.
+	 * 
+	 * Cpus guarantee that after they set CPU->idle = true they will not
+	 * execute any RCU reader sections without first setting idle to
+	 * false and issuing a memory barrier. Therefore, if rm_quiescent_cpus()
+	 * later on sees an idle cpu, but the cpu is just exiting its idle mode,
+	 * the cpu must not have yet executed its memory barrier (otherwise
+	 * it would pair up with this mem barrier and we would see idle == false).
+	 * That memory barrier will pair up with the one below and ensure
+	 * that a reader on the now-non-idle cpu will see the most current
+	 * cur_gp. As a result, such a reader will never attempt to semaphore_up(
+	 * pending_readers) during this GP, which allows the detector to
+	 * ignore that cpu (the detector thinks it is idle). Moreover, any
+	 * changes made by RCU updaters will have propagated to readers
+	 * on the previously idle cpu -- again thanks to issuing a memory
+	 * barrier after returning from idle mode.
+	 * 
+	 * idle -> non-idle cpu      | detector      | reclaimer
+	 * ------------------------------------------------------
+	 * rcu reader 1              |               | rcu_call()
+	 * MB X                      |               |
+	 * idle = true               |               | rcu_call() 
+	 * (no rcu readers allowed ) |               | MB A in advance_cbs() 
+	 * MB Y                      | (...)         | (...)
+	 * (no rcu readers allowed)  |               | MB B in advance_cbs() 
+	 * idle = false              | ++cur_gp      |
+	 * (no rcu readers allowed)  | MB C          |
+	 * MB Z                      | signal gp_end |
+	 * rcu reader 2              |               | exec_cur_cbs()
+	 * 
+	 * 
+	 * MB Y orders visibility of changes to idle for detector's sake.
+	 * 
+	 * MB Z pairs up with MB C. The cpu making a transition from idle 
+	 * will see the most current value of cur_gp and will not attempt
+	 * to notify the detector even if preempted during this GP.
+	 * 
+	 * MB Z pairs up with MB A from the previous batch. Updaters' changes
+	 * are visible to reader 2 even when the detector thinks the cpu is idle 
+	 * but it is not anymore.
+	 * 
+	 * MB X pairs up with MB B. Late mem accesses of reader 1 are contained
+	 * and visible before idling and before any callbacks are executed 
+	 * by reclaimers.
+	 * 
+	 * In summary, the detector does not know of or wait for reader 2, but
+	 * it does not have to since it is a new reader that will not access
+	 * data from previous GPs and will see any changes.
+	 */
+	memory_barrier(); /* MB C */
+	
+	cpu_mask_for_each(*cpu_mask, cpu_id) {
+		/* 
+		 * The cpu already checked for and passed through a quiescent 
+		 * state since the beginning of this GP.
+		 * 
+		 * _rcu_cur_gp is modified by local detector thread only. 
+		 * Therefore, it is up-to-date even without a lock. 
+		 * 
+		 * cpu.last_seen_gp may not be up-to-date. At worst, we will
+		 * unnecessarily sample its last_seen_gp with a smp_call. 
+		 */
+		bool cpu_acked_gp = (cpus[cpu_id].rcu.last_seen_gp == _rcu_cur_gp);
+		
+		/*
+		 * Either the cpu is idle or it is exiting away from idle mode
+		 * and already sees the most current _rcu_cur_gp. See comment
+		 * in wait_for_readers().
+		 */
+		bool cpu_idle = cpus[cpu_id].idle;
+		
+		if (cpu_acked_gp || cpu_idle) {
+			cpu_mask_reset(cpu_mask, cpu_id);
+		}
+	}
+}
+
+/** Serially invokes sample_local_cpu(arg) on each cpu of reader_cpus. */
+static void sample_cpus(cpu_mask_t *reader_cpus, void *arg)
+{
+	cpu_mask_for_each(*reader_cpus, cpu_id) {
+		smp_call(cpu_id, sample_local_cpu, arg);
+
+		/* Update statistic. */
+		if (CPU->id != cpu_id)
+			++rcu.stat_smp_call_cnt;
+	}
+}
+
+static void upd_missed_gp_in_wait(rcu_gp_t completed_gp)
+{
+	ASSERT(CPU->rcu.cur_cbs_gp <= completed_gp);
+	
+	size_t delta = (size_t)(completed_gp - CPU->rcu.cur_cbs_gp);
+	CPU->rcu.stat_missed_gp_in_wait += delta;
+}
+
+/** Globally note that the current thread was preempted in a reader section. */
+static void note_preempted_reader(void)
+{
+	irq_spinlock_lock(&rcu.preempt_lock, false);
+
+	if (CPU->rcu.last_seen_gp != _rcu_cur_gp) {
+		/* The reader started before the GP started - we must wait for it.*/
+		list_append(&THREAD->rcu.preempt_link, &rcu.cur_preempted);
+	} else {
+		/* 
+		 * The reader started after the GP started and this cpu
+		 * already noted a quiescent state. We might block the next GP.
+		 */
+		list_append(&THREAD->rcu.preempt_link, &rcu.next_preempted);
+	}
+
+	irq_spinlock_unlock(&rcu.preempt_lock, false);
+}
+
+/** Remove the current thread from the global list of preempted readers. */
+static void rm_preempted_reader(void)
+{
+	irq_spinlock_lock(&rcu.preempt_lock, true);
+	
+	ASSERT(link_used(&THREAD->rcu.preempt_link));
+
+	bool prev_empty = list_empty(&rcu.cur_preempted);
+	list_remove(&THREAD->rcu.preempt_link);
+	bool now_empty = list_empty(&rcu.cur_preempted);
+
+	/* This was the last reader in cur_preempted. */
+	bool last_removed = now_empty && !prev_empty;
+
+	/* 
+	 * Preempted readers are blocking the detector and 
+	 * this was the last reader blocking the current GP. 
+	 */
+	if (last_removed && rcu.preempt_blocking_det) {
+		rcu.preempt_blocking_det = false;
+		semaphore_up(&rcu.remaining_readers);
+	}
+
+	irq_spinlock_unlock(&rcu.preempt_lock, true);
+}
+
+/** Waits for any preempted readers blocking this grace period to finish.*/
+static bool wait_for_preempt_reader(void)
+{
+	irq_spinlock_lock(&rcu.preempt_lock, true);
+
+	bool reader_exists = !list_empty(&rcu.cur_preempted);
+	rcu.preempt_blocking_det = reader_exists;
+	
+	irq_spinlock_unlock(&rcu.preempt_lock, true);
+	
+	if (reader_exists) {
+		/* Update statistic. */
+		++rcu.stat_preempt_blocking_cnt;
+		
+		return semaphore_down_interruptable(&rcu.remaining_readers);
+	} 	
+	
+	return true;
+}
+
+static void upd_max_cbs_in_slice(size_t arriving_cbs_cnt)
+{
+	rcu_cpu_data_t *cr = &CPU->rcu;
+	
+	if (arriving_cbs_cnt > cr->last_arriving_cnt) {
+		size_t arrived_cnt = arriving_cbs_cnt - cr->last_arriving_cnt;
+		cr->stat_max_slice_cbs = max(arrived_cnt, cr->stat_max_slice_cbs);
+	}
+	
+	cr->last_arriving_cnt = arriving_cbs_cnt;
+}
+
+/** Prints RCU run-time statistics. */
+void rcu_print_stat(void)
+{
+	/* 
+	 * Don't take locks. Worst case is we get out-dated values. 
+	 * CPU local values are updated without any locks, so there 
+	 * are no locks to lock in order to get up-to-date values.
+	 */
+	
+#ifdef RCU_PREEMPT_PODZIMEK
+	const char *algo = "podzimek-preempt-rcu";
+#elif defined(RCU_PREEMPT_A)
+	const char *algo = "a-preempt-rcu";
+#endif
+	
+	printf("Config: expedite_threshold=%d, critical_threshold=%d,"
+		" detect_sleep=%dms, %s\n",	
+		EXPEDITE_THRESHOLD, CRITICAL_THRESHOLD, DETECT_SLEEP_MS, algo);
+	printf("Completed GPs: %" PRIu64 "\n", rcu.completed_gp);
+	printf("Expedited GPs: %zu\n", rcu.stat_expedited_cnt);
+	printf("Delayed GPs:   %zu (cpus w/ still running readers after gp sleep)\n", 
+		rcu.stat_delayed_cnt);
+	printf("Preempt blocked GPs: %zu (waited for preempted readers; "
+		"running or not)\n", rcu.stat_preempt_blocking_cnt);
+	printf("Smp calls:     %zu\n", rcu.stat_smp_call_cnt);
+	
+	printf("Max arrived callbacks per GP and CPU:\n");
+	for (unsigned int i = 0; i < config.cpu_count; ++i) {
+		printf(" %zu", cpus[i].rcu.stat_max_cbs);
+	}
+
+	printf("\nAvg arrived callbacks per GP and CPU (nonempty batches only):\n");
+	for (unsigned int i = 0; i < config.cpu_count; ++i) {
+		printf(" %zu", cpus[i].rcu.stat_avg_cbs);
+	}
+	
+	printf("\nMax arrived callbacks per time slice and CPU:\n");
+	for (unsigned int i = 0; i < config.cpu_count; ++i) {
+		printf(" %zu", cpus[i].rcu.stat_max_slice_cbs);
+	}
+
+	printf("\nMissed GP notifications per CPU:\n");
+	for (unsigned int i = 0; i < config.cpu_count; ++i) {
+		printf(" %zu", cpus[i].rcu.stat_missed_gps);
+	}
+
+	printf("\nMissed GP notifications per CPU while waking up:\n");
+	for (unsigned int i = 0; i < config.cpu_count; ++i) {
+		printf(" %zu", cpus[i].rcu.stat_missed_gp_in_wait);
+	}
+	printf("\n");
+}
+
+/** @}
+ */
Index: kernel/generic/src/synch/smc.c
===================================================================
--- kernel/generic/src/synch/smc.c	(revision feeac0d8cd6e0ce736779bc5f1dc701c5cf42ba6)
+++ kernel/generic/src/synch/smc.c	(revision 5e3fa9dbfffc978cdcfdaaea9fc6a50dbb0bca1a)
@@ -41,4 +41,5 @@
 #include <arch/barrier.h>
 #include <synch/smc.h>
+#include <mm/as.h>
 
 sysarg_t sys_smc_coherence(uintptr_t va, size_t size)
Index: kernel/generic/src/synch/smp_memory_barrier.c
===================================================================
--- kernel/generic/src/synch/smp_memory_barrier.c	(revision 5e3fa9dbfffc978cdcfdaaea9fc6a50dbb0bca1a)
+++ kernel/generic/src/synch/smp_memory_barrier.c	(revision 5e3fa9dbfffc978cdcfdaaea9fc6a50dbb0bca1a)
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2012 Adam Hraska
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright
+ *   notice, this list of conditions and the following disclaimer in the
+ *   documentation and/or other materials provided with the distribution.
+ * - The name of the author may not be used to endorse or promote products
+ *   derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/** @addtogroup generic
+ * @{
+ */
+
+/**
+ * @file
+ * @brief Syscall implementation that issues a memory barrier on all cpus.
+ */
+
+#include <synch/smp_memory_barrier.h>
+#include <smp/smp_call.h>
+#include <config.h>
+
+
+static void issue_mem_bar(void *arg)
+{
+	/* smp_call already issues memory barriers on return from this function */
+}
+
+/** Issues a memory barrier on each cpu that is running a thread of the current
+ * task.
+ * 
+ * @return Irrelevant.
+ */
+sysarg_t sys_smp_memory_barrier(void)
+{
+	for (unsigned int cpu_id = 0; cpu_id < config.cpu_active; ++cpu_id) {
+		smp_call(cpu_id, issue_mem_bar, NULL);
+	}
+	
+	return 0;
+}
+
+/** @}
+ */
Index: kernel/generic/src/synch/spinlock.c
===================================================================
--- kernel/generic/src/synch/spinlock.c	(revision feeac0d8cd6e0ce736779bc5f1dc701c5cf42ba6)
+++ kernel/generic/src/synch/spinlock.c	(revision 5e3fa9dbfffc978cdcfdaaea9fc6a50dbb0bca1a)
@@ -45,4 +45,5 @@
 #include <symtab.h>
 #include <stacktrace.h>
+#include <cpu.h>
 
 #ifdef CONFIG_SMP
@@ -198,7 +199,6 @@
  *
  * @param lock    IRQ spinlock to be locked.
- * @param irq_dis If true, interrupts are actually disabled
- *                prior locking the spinlock. If false, interrupts
- *                are expected to be already disabled.
+ * @param irq_dis If true, disables interrupts before locking the spinlock.
+ *                If false, interrupts are expected to be already disabled.
  *
  */
Index: kernel/generic/src/synch/waitq.c
===================================================================
--- kernel/generic/src/synch/waitq.c	(revision feeac0d8cd6e0ce736779bc5f1dc701c5cf42ba6)
+++ kernel/generic/src/synch/waitq.c	(revision 5e3fa9dbfffc978cdcfdaaea9fc6a50dbb0bca1a)
@@ -57,4 +57,6 @@
 
 static void waitq_sleep_timed_out(void *);
+static void waitq_complete_wakeup(waitq_t *);
+
 
 /** Initialize wait queue
@@ -330,4 +332,18 @@
 		break;
 	default:
+		/* 
+		 * Wait for a waitq_wakeup() or waitq_unsleep() to complete
+		 * before returning from waitq_sleep() to the caller. Otherwise
+		 * the caller might expect that the wait queue is no longer used 
+		 * and deallocate it (although the wakeup on a another cpu has 
+		 * not yet completed and is using the wait queue). 
+		 * 
+		 * Note that we have to do this for ESYNCH_OK_BLOCKED and
+		 * ESYNCH_INTERRUPTED, but not necessarily for ESYNCH_TIMEOUT
+		 * where the timeout handler stops using the waitq before waking 
+		 * us up. To be on the safe side, ensure the waitq is not in use 
+		 * anymore in this case as well.
+		 */
+		waitq_complete_wakeup(wq);
 		break;
 	}
@@ -357,5 +373,5 @@
 	} else {
 		if (PARAM_NON_BLOCKING(flags, usec)) {
-			/* Return immediatelly instead of going to sleep */
+			/* Return immediately instead of going to sleep */
 			return ESYNCH_WOULD_BLOCK;
 		}
@@ -442,4 +458,48 @@
 	irq_spinlock_unlock(&wq->lock, true);
 }
+
+/** If there is a wakeup in progress actively waits for it to complete.
+ * 
+ * The function returns once the concurrently running waitq_wakeup()
+ * exits. It returns immediately if there are no concurrent wakeups 
+ * at the time.
+ * 
+ * Interrupts must be disabled.
+ * 
+ * Example usage:
+ * @code
+ * void callback(waitq *wq)
+ * {
+ *     // Do something and notify wait_for_completion() that we're done.
+ *     waitq_wakeup(wq);
+ * }
+ * void wait_for_completion(void) 
+ * {
+ *     waitq wg;
+ *     waitq_initialize(&wq);
+ *     // Run callback() in the background, pass it wq.
+ *     do_asynchronously(callback, &wq);
+ *     // Wait for callback() to complete its work.
+ *     waitq_sleep(&wq);
+ *     // callback() completed its work, but it may still be accessing 
+ *     // wq in waitq_wakeup(). Therefore it is not yet safe to return 
+ *     // from waitq_sleep() or it would clobber up our stack (where wq 
+ *     // is stored). waitq_sleep() ensures the wait queue is no longer
+ *     // in use by invoking waitq_complete_wakeup() internally.
+ *     
+ *     // waitq_sleep() returned, it is safe to free wq.
+ * }
+ * @endcode
+ * 
+ * @param wq  Pointer to a wait queue.
+ */
+static void waitq_complete_wakeup(waitq_t *wq)
+{
+	ASSERT(interrupts_disabled());
+	
+	irq_spinlock_lock(&wq->lock, false);
+	irq_spinlock_unlock(&wq->lock, false);
+}
+
 
 /** Internal SMP- and IRQ-unsafe version of waitq_wakeup()
Index: kernel/generic/src/synch/workqueue.c
===================================================================
--- kernel/generic/src/synch/workqueue.c	(revision 5e3fa9dbfffc978cdcfdaaea9fc6a50dbb0bca1a)
+++ kernel/generic/src/synch/workqueue.c	(revision 5e3fa9dbfffc978cdcfdaaea9fc6a50dbb0bca1a)
@@ -0,0 +1,974 @@
+/*
+ * Copyright (c) 2012 Adam Hraska
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * - Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright
+ *   notice, this list of conditions and the following disclaimer in the
+ *   documentation and/or other materials provided with the distribution.
+ * - The name of the author may not be used to endorse or promote products
+ *   derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/** @addtogroup generic
+ * @{
+ */
+
+/**
+ * @file
+ * @brief Work queue/thread pool that automatically adjusts its size
+ *        depending on the current load. Queued work functions may sleep..
+ */
+
+#include <synch/workqueue.h>
+#include <synch/spinlock.h>
+#include <synch/condvar.h>
+#include <synch/mutex.h>
+#include <proc/thread.h>
+#include <config.h>
+#include <arch.h>
+#include <cpu.h>
+#include <macros.h>
+
+#define WORKQ_MAGIC      0xf00c1333U
+#define WORK_ITEM_MAGIC  0xfeec1777U
+
+
+struct work_queue {
+	/* 
+	 * Protects everything except activate_worker. 
+	 * Must be acquired after any thread->locks.
+	 */
+	IRQ_SPINLOCK_DECLARE(lock);
+	
+	/* Activates a worker if new work arrives or if shutting down the queue. */
+	condvar_t activate_worker;
+	
+	/* Queue of work_items ready to be dispatched. */
+	list_t queue;
+	
+	/* List of worker threads. */
+	list_t workers;
+	
+	/* Number of work items queued. */
+	size_t item_cnt;
+	
+	/* Indicates the work queue is shutting down. */
+	bool stopping;
+	const char *name;
+
+	/* Total number of created worker threads. */
+	size_t cur_worker_cnt;
+	/* Number of workers waiting for work to arrive. */
+	size_t idle_worker_cnt;
+	/* Number of idle workers signaled that have not yet been woken up. */
+	size_t activate_pending;
+	/* Number of blocked workers sleeping in work func() (ie not idle). */
+	size_t blocked_worker_cnt;
+	
+	/* Number of pending signal_worker_op() operations. */
+	size_t pending_op_cnt;
+	
+	link_t nb_link;
+	
+#ifdef CONFIG_DEBUG
+	/* Magic cookie for integrity checks. Immutable. Accessed without lock. */
+	uint32_t cookie;
+#endif 
+};
+
+
+/** Min number of idle workers to keep. */
+static size_t min_worker_cnt;
+/** Max total number of workers - be it blocked, idle, or active. */
+static size_t max_worker_cnt;
+/** Max number of concurrently running active workers, ie not blocked nor idle. */
+static size_t max_concurrent_workers;
+/** Max number of work items per active worker before a new worker is activated.*/
+static const size_t max_items_per_worker = 8;
+	
+/** System wide work queue. */
+static struct work_queue g_work_queue;
+
+static int booting = true;
+
+
+typedef struct {
+	IRQ_SPINLOCK_DECLARE(lock);
+	condvar_t req_cv;
+	thread_t *thread;
+	list_t work_queues;
+} nonblock_adder_t;
+
+static nonblock_adder_t nonblock_adder;
+
+
+
+/** Typedef a worker thread signaling operation prototype. */
+typedef void (*signal_op_t)(struct work_queue *workq);
+
+
+/* Fwd decl. */
+static void workq_preinit(struct work_queue *workq, const char *name);
+static bool add_worker(struct work_queue *workq);
+static void interrupt_workers(struct work_queue *workq);
+static void wait_for_workers(struct work_queue *workq);
+static int _workq_enqueue(struct work_queue *workq, work_t *work_item, 
+	work_func_t func, bool can_block);
+static void init_work_item(work_t *work_item, work_func_t func);
+static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block);
+static void worker_thread(void *arg);
+static bool dequeue_work(struct work_queue *workq, work_t **pwork_item);
+static bool worker_unnecessary(struct work_queue *workq);
+static void cv_wait(struct work_queue *workq);
+static void nonblock_init(void);
+
+#ifdef CONFIG_DEBUG
+static bool workq_corrupted(struct work_queue *workq);
+static bool work_item_corrupted(work_t *work_item);
+#endif
+
+/** Creates worker thread for the system-wide worker queue. */
+void workq_global_worker_init(void)
+{
+	/* 
+	 * No need for additional synchronization. Stores to word-sized 
+	 * variables are atomic and the change will eventually propagate.
+	 * Moreover add_worker() includes the necessary memory barriers
+	 * in spinlock lock/unlock().
+	 */
+	booting = false;
+	
+	nonblock_init();
+	
+	if (!add_worker(&g_work_queue))
+		panic("Could not create a single global work queue worker!\n");
+	
+}
+
+/** Initializes the system wide work queue and support for other work queues. */
+void workq_global_init(void)
+{
+	/* Keep idle workers on 1/4-th of cpus, but at least 2 threads. */
+	min_worker_cnt = max(2, config.cpu_count / 4);
+	/* Allow max 8 sleeping work items per cpu. */
+	max_worker_cnt = max(32, 8 * config.cpu_count);
+	/* Maximum concurrency without slowing down the system. */
+	max_concurrent_workers = max(2, config.cpu_count);
+	
+	workq_preinit(&g_work_queue, "kworkq");
+}
+
+/** Stops the system global work queue and waits for all work items to complete.*/
+void workq_global_stop(void)
+{
+	workq_stop(&g_work_queue);
+}
+
+/** Creates and initializes a work queue. Returns NULL upon failure. */
+struct work_queue * workq_create(const char *name)
+{
+	struct work_queue *workq = malloc(sizeof(struct work_queue), 0);
+	
+	if (workq) {
+		if (workq_init(workq, name)) {
+			ASSERT(!workq_corrupted(workq));
+			return workq;
+		}
+		
+		free(workq);
+	}
+	
+	return NULL;
+}
+
+/** Frees work queue resources and stops it if it had not been done so already.*/
+void workq_destroy(struct work_queue *workq)
+{
+	ASSERT(!workq_corrupted(workq));
+	
+	irq_spinlock_lock(&workq->lock, true);
+	bool stopped = workq->stopping;
+#ifdef CONFIG_DEBUG
+	size_t running_workers = workq->cur_worker_cnt;
+#endif
+	irq_spinlock_unlock(&workq->lock, true);
+	
+	if (!stopped) {
+		workq_stop(workq);
+	} else {
+		ASSERT(0 == running_workers);
+	}
+	
+#ifdef CONFIG_DEBUG
+	workq->cookie = 0;
+#endif 
+	
+	free(workq);
+}
+
+/** Initializes workq structure without creating any workers. */
+static void workq_preinit(struct work_queue *workq, const char *name)
+{
+#ifdef CONFIG_DEBUG
+	workq->cookie = WORKQ_MAGIC;
+#endif 
+	
+	irq_spinlock_initialize(&workq->lock, name);
+	condvar_initialize(&workq->activate_worker);
+	
+	list_initialize(&workq->queue);
+	list_initialize(&workq->workers);
+	
+	workq->item_cnt = 0;
+	workq->stopping = false;
+	workq->name = name;
+	
+	workq->cur_worker_cnt = 1;
+	workq->idle_worker_cnt = 0;
+	workq->activate_pending = 0;
+	workq->blocked_worker_cnt = 0;
+	
+	workq->pending_op_cnt = 0;
+	link_initialize(&workq->nb_link);
+}
+
+/** Initializes a work queue. Returns true if successful.  
+ * 
+ * Before destroying a work queue it must be stopped via
+ * workq_stop().
+ */
+int workq_init(struct work_queue *workq, const char *name)
+{
+	workq_preinit(workq, name);
+	return add_worker(workq);
+}
+
+/** Add a new worker thread. Returns false if the thread could not be created. */
+static bool add_worker(struct work_queue *workq)
+{
+	ASSERT(!workq_corrupted(workq));
+
+	thread_t *thread = thread_create(worker_thread, workq, TASK, 
+		THREAD_FLAG_NONE, workq->name);
+	
+	if (!thread) {
+		irq_spinlock_lock(&workq->lock, true);
+		
+		/* cur_worker_cnt proactively increased in signal_worker_logic() .*/
+		ASSERT(0 < workq->cur_worker_cnt);
+		--workq->cur_worker_cnt;
+		
+		irq_spinlock_unlock(&workq->lock, true);
+		return false;
+	}
+	
+	/* Respect lock ordering. */
+	irq_spinlock_lock(&thread->lock, true);
+	irq_spinlock_lock(&workq->lock, false);
+
+	bool success;
+
+	if (!workq->stopping) {
+		success = true;
+		
+		/* Try to distribute workers among cpus right away. */
+		unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;
+		
+		if (!cpus[cpu_id].active)
+			cpu_id = CPU->id;
+
+		thread->workq = workq;	
+		thread->cpu = &cpus[cpu_id];
+		thread->workq_blocked = false;
+		thread->workq_idling = false;
+		link_initialize(&thread->workq_link);
+
+		list_append(&thread->workq_link, &workq->workers);
+	} else {
+		/* 
+		 * Work queue is shutting down - we must not add the worker
+		 * and we cannot destroy it without ready-ing it. Mark it
+		 * interrupted so the worker exits right away without even
+		 * touching workq.
+		 */
+		success = false;
+		
+		/* cur_worker_cnt proactively increased in signal_worker() .*/
+		ASSERT(0 < workq->cur_worker_cnt);
+		--workq->cur_worker_cnt;
+	}
+	
+	irq_spinlock_unlock(&workq->lock, false);
+	irq_spinlock_unlock(&thread->lock, true);
+
+	if (!success) {
+		thread_interrupt(thread);
+	}
+		
+	thread_ready(thread);
+	
+	return success;
+}
+
+/** Shuts down the work queue. Waits for all pending work items to complete.  
+ *
+ * workq_stop() may only be run once. 
+ */
+void workq_stop(struct work_queue *workq)
+{
+	ASSERT(!workq_corrupted(workq));
+	
+	interrupt_workers(workq);
+	wait_for_workers(workq);
+}
+
+/** Notifies worker threads the work queue is shutting down. */
+static void interrupt_workers(struct work_queue *workq)
+{
+	irq_spinlock_lock(&workq->lock, true);
+
+	/* workq_stop() may only be called once. */
+	ASSERT(!workq->stopping);
+	workq->stopping = true;
+	
+	/* Respect lock ordering - do not hold workq->lock during broadcast. */
+	irq_spinlock_unlock(&workq->lock, true);
+	
+	condvar_broadcast(&workq->activate_worker);
+}
+
+/** Waits for all worker threads to exit. */
+static void wait_for_workers(struct work_queue *workq)
+{
+	ASSERT(!PREEMPTION_DISABLED);
+	
+	irq_spinlock_lock(&workq->lock, true);
+	
+	list_foreach_safe(workq->workers, cur_worker, next_worker) {
+		thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
+		list_remove(cur_worker);
+
+		/* Wait without the lock. */
+		irq_spinlock_unlock(&workq->lock, true);
+		
+		thread_join(worker);
+		thread_detach(worker);
+		
+		irq_spinlock_lock(&workq->lock, true);
+	}
+	
+	ASSERT(list_empty(&workq->workers));
+	
+	/* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
+	while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
+		irq_spinlock_unlock(&workq->lock, true);
+		
+		scheduler();
+		
+		irq_spinlock_lock(&workq->lock, true);
+	}
+	
+	irq_spinlock_unlock(&workq->lock, true);
+}
+
+/** Queues a function into the global wait queue without blocking. 
+ * 
+ * See workq_enqueue_noblock() for more details.
+ */
+int workq_global_enqueue_noblock(work_t *work_item, work_func_t func)
+{
+	return workq_enqueue_noblock(&g_work_queue, work_item, func);
+}
+
+/** Queues a function into the global wait queue; may block. 
+ * 
+ * See workq_enqueue() for more details.
+ */
+int workq_global_enqueue(work_t *work_item, work_func_t func)
+{
+	return workq_enqueue(&g_work_queue, work_item, func);
+}
+
+/** Adds a function to be invoked in a separate thread without blocking. 
+ * 
+ * workq_enqueue_noblock() is guaranteed not to block. It is safe 
+ * to invoke from interrupt handlers.
+ * 
+ * Consider using workq_enqueue() instead if at all possible. Otherwise,
+ * your work item may have to wait for previously enqueued sleeping 
+ * work items to complete if you are unlucky.
+ * 
+ * @param workq     Work queue where to queue the work item.
+ * @param work_item Work item bookkeeping structure. Must be valid
+ *                  until func() is entered.
+ * @param func      User supplied function to invoke in a worker thread.
+ 
+ * @return false if work queue is shutting down; function is not 
+ *               queued for further processing. 
+ * @return true  Otherwise. func() will be invoked in a separate thread.
+ */
+int workq_enqueue_noblock(struct work_queue *workq, work_t *work_item, 
+	work_func_t func)
+{
+	return _workq_enqueue(workq, work_item, func, false);
+}
+
+/** Adds a function to be invoked in a separate thread; may block. 
+ * 
+ * While the workq_enqueue() is unlikely to block, it may do so if too 
+ * many previous work items blocked sleeping.
+ * 
+ * @param workq     Work queue where to queue the work item.
+ * @param work_item Work item bookkeeping structure. Must be valid
+ *                  until func() is entered.
+ * @param func      User supplied function to invoke in a worker thread.
+ 
+ * @return false if work queue is shutting down; function is not 
+ *               queued for further processing. 
+ * @return true  Otherwise. func() will be invoked in a separate thread.
+ */
+int workq_enqueue(struct work_queue *workq, work_t *work_item, work_func_t func)
+{
+	return _workq_enqueue(workq, work_item, func, true);
+}
+
+/** Adds a work item that will be processed by a separate worker thread.
+ * 
+ * func() will be invoked in another kernel thread and may block. 
+ * 
+ * Prefer to call _workq_enqueue() with can_block set. Otherwise
+ * your work item may have to wait for sleeping work items to complete.
+ * If all worker threads are blocked/sleeping a new worker thread cannot
+ * be create without can_block set because creating a thread might
+ * block due to low memory conditions.
+ * 
+ * @param workq     Work queue where to queue the work item.
+ * @param work_item Work item bookkeeping structure. Must be valid
+ *                  until func() is entered.
+ * @param func      User supplied function to invoke in a worker thread.
+ * @param can_block May adding this work item block?
+ 
+ * @return false if work queue is shutting down; function is not 
+ *               queued for further processing. 
+ * @return true  Otherwise.
+ */
+static int _workq_enqueue(struct work_queue *workq, work_t *work_item, 
+	work_func_t func, bool can_block)
+{
+	ASSERT(!workq_corrupted(workq));
+	
+	bool success = true;
+	signal_op_t signal_op = NULL;
+	
+	irq_spinlock_lock(&workq->lock, true);
+	
+	if (workq->stopping) {
+		success = false;
+	} else {
+		init_work_item(work_item, func);
+		list_append(&work_item->queue_link, &workq->queue);
+		++workq->item_cnt;
+		success = true;
+		
+		if (!booting) {
+			signal_op = signal_worker_logic(workq, can_block);
+		} else {
+			/* 
+			 * During boot there are no workers to signal. Just queue 
+			 * the work and let future workers take care of it.
+			 */
+		}
+	}
+	
+	irq_spinlock_unlock(&workq->lock, true);
+
+	if (signal_op) {
+		signal_op(workq);
+	}
+	
+	return success;
+}
+
+/** Prepare an item to be added to the work item queue. */
+static void init_work_item(work_t *work_item, work_func_t func)
+{
+#ifdef CONFIG_DEBUG
+	work_item->cookie = WORK_ITEM_MAGIC;
+#endif 
+	
+	link_initialize(&work_item->queue_link);
+	work_item->func = func;
+}
+
+/** Returns the number of workers running work func() that are not blocked. */
+static size_t active_workers_now(struct work_queue *workq)
+{
+	ASSERT(irq_spinlock_locked(&workq->lock));
+	
+	/* Workers blocked are sleeping in the work function (ie not idle). */
+	ASSERT(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
+	/* Idle workers are waiting for more work to arrive in condvar_wait. */
+	ASSERT(workq->idle_worker_cnt <= workq->cur_worker_cnt);
+	
+	/* Idle + blocked workers == sleeping worker threads. */
+	size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;
+	
+	ASSERT(sleeping_workers	<= workq->cur_worker_cnt);
+	/* Workers pending activation are idle workers not yet given a time slice. */
+	ASSERT(workq->activate_pending <= workq->idle_worker_cnt);
+	
+	/* 
+	 * Workers actively running the work func() this very moment and 
+	 * are neither blocked nor idle. Exclude ->activate_pending workers 
+	 * since they will run their work func() once they get a time slice 
+	 * and are not running it right now.
+	 */
+	return workq->cur_worker_cnt - sleeping_workers;
+}
+
+/** 
+ * Returns the number of workers that are running or are about to run work 
+ * func() and that are not blocked. 
+ */
+static size_t active_workers(struct work_queue *workq)
+{
+	ASSERT(irq_spinlock_locked(&workq->lock));
+	
+	/* 
+	 * Workers actively running the work func() and are neither blocked nor 
+	 * idle. ->activate_pending workers will run their work func() once they
+	 * get a time slice after waking from a condvar wait, so count them
+	 * as well.
+	 */
+	return active_workers_now(workq) + workq->activate_pending;
+}
+
+static void add_worker_noblock_op(struct work_queue *workq)
+{
+	condvar_signal(&nonblock_adder.req_cv);
+}
+
+static void add_worker_op(struct work_queue *workq)
+{
+	add_worker(workq);
+}
+
+static void signal_worker_op(struct work_queue *workq)
+{
+	ASSERT(!workq_corrupted(workq));
+
+	condvar_signal(&workq->activate_worker);
+	
+	irq_spinlock_lock(&workq->lock, true);
+	ASSERT(0 < workq->pending_op_cnt);
+	--workq->pending_op_cnt;
+	irq_spinlock_unlock(&workq->lock, true);
+}
+
+/** Determines how to signal workers if at all.
+ * 
+ * @param workq     Work queue where a new work item was queued.
+ * @param can_block True if we may block while signaling a worker or creating 
+ *                  a new worker.
+ * 
+ * @return Function that will notify workers or NULL if no action is needed.
+ */
+static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block)
+{
+	ASSERT(!workq_corrupted(workq));
+	ASSERT(irq_spinlock_locked(&workq->lock));
+	
+	/* Only signal workers if really necessary. */
+	signal_op_t signal_op = NULL;
+
+	/* 
+	 * Workers actively running the work func() and neither blocked nor idle. 
+	 * Including ->activate_pending workers that will run their work func() 
+	 * once they get a time slice.
+	 */
+	size_t active = active_workers(workq);
+	/* Max total allowed number of work items queued for active workers. */
+	size_t max_load = active * max_items_per_worker;
+
+	/* Active workers are getting overwhelmed - activate another. */
+	if (max_load < workq->item_cnt) {
+
+		size_t remaining_idle = 
+			workq->idle_worker_cnt - workq->activate_pending;
+
+		/* Idle workers still exist - activate one. */
+		if (remaining_idle > 0) {
+			/* 
+			 * Directly changing idle_worker_cnt here would not allow
+			 * workers to recognize spurious wake-ups. Change 
+			 * activate_pending instead.
+			 */
+			++workq->activate_pending;
+			++workq->pending_op_cnt;
+			signal_op = signal_worker_op;
+		} else {
+			/* No idle workers remain. Request that a new one be created. */
+			bool need_worker = (active < max_concurrent_workers)
+				&& (workq->cur_worker_cnt < max_worker_cnt);
+			
+			if (need_worker && can_block) {
+				signal_op = add_worker_op;
+				/* 
+				 * It may take some time to actually create the worker.
+				 * We don't want to swamp the thread pool with superfluous
+				 * worker creation requests so pretend it was already
+				 * created and proactively increase the worker count.
+				 */
+				++workq->cur_worker_cnt;
+			}
+			
+			/* 
+			 * We cannot create a new worker but we need one desperately
+			 * because all workers are blocked in their work functions.
+			 */
+			if (need_worker && !can_block && 0 == active) {
+				ASSERT(0 == workq->idle_worker_cnt);
+				
+				irq_spinlock_lock(&nonblock_adder.lock, true);
+
+				if (nonblock_adder.thread && !link_used(&workq->nb_link)) {
+					signal_op = add_worker_noblock_op;
+					++workq->cur_worker_cnt;
+					list_append(&workq->nb_link, &nonblock_adder.work_queues);
+				}
+
+				irq_spinlock_unlock(&nonblock_adder.lock, true);
+			}
+		}
+	} else {
+		/* 
+		 * There are enough active/running workers to process the queue. 
+		 * No need to signal/activate any new workers.
+		 */
+		signal_op = NULL;
+	}
+	
+	return signal_op;
+}
+
+/** Executes queued work items. */
+static void worker_thread(void *arg)
+{
+	/* 
+	 * The thread has been created after the work queue was ordered to stop. 
+	 * Do not access the work queue and return immediately. 
+	 */
+	if (thread_interrupted(THREAD)) {
+		thread_detach(THREAD);
+		return;
+	}
+	
+	ASSERT(arg != NULL);
+	
+	struct work_queue *workq = arg;
+	work_t *work_item;
+	
+	while (dequeue_work(workq, &work_item)) {
+		/* Copy the func field so func() can safely free work_item. */
+		work_func_t func = work_item->func;
+
+		func(work_item);
+	}
+}
+
+/** Waits and retrieves a work item. Returns false if the worker should exit. */
+static bool dequeue_work(struct work_queue *workq, work_t **pwork_item)
+{
+	ASSERT(!workq_corrupted(workq));
+	
+	irq_spinlock_lock(&workq->lock, true);
+	
+	/* Check if we should exit if load is low. */
+	if (!workq->stopping && worker_unnecessary(workq)) {
+		/* There are too many workers for this load. Exit. */
+		ASSERT(0 < workq->cur_worker_cnt);
+		--workq->cur_worker_cnt;
+		list_remove(&THREAD->workq_link);
+		irq_spinlock_unlock(&workq->lock, true);
+		
+		thread_detach(THREAD);
+		return false;
+	}
+	
+	bool stop = false;
+	
+	/* Wait for work to arrive. */
+	while (list_empty(&workq->queue) && !workq->stopping) {
+		cv_wait(workq);
+		
+		if (0 < workq->activate_pending)
+			--workq->activate_pending;
+	}
+
+	/* Process remaining work even if requested to stop. */
+	if (!list_empty(&workq->queue)) {
+		link_t *work_link = list_first(&workq->queue);
+		*pwork_item = list_get_instance(work_link, work_t, queue_link);
+		
+#ifdef CONFIG_DEBUG
+		ASSERT(!work_item_corrupted(*pwork_item));
+		(*pwork_item)->cookie = 0;
+#endif
+		list_remove(work_link);
+		--workq->item_cnt;
+		
+		stop = false;
+	} else {
+		/* Requested to stop and no more work queued. */
+		ASSERT(workq->stopping);
+		--workq->cur_worker_cnt;
+		stop = true;
+	}
+	
+	irq_spinlock_unlock(&workq->lock, true);
+	
+	return !stop;
+}
+
+/** Returns true if for the given load there are too many workers. */
+static bool worker_unnecessary(struct work_queue *workq)
+{
+	ASSERT(irq_spinlock_locked(&workq->lock));
+	
+	/* No work is pending. We don't need too many idle threads. */
+	if (list_empty(&workq->queue)) {
+		/* There are too many idle workers. Exit. */
+		return (min_worker_cnt <= workq->idle_worker_cnt);
+	} else {
+		/* 
+		 * There is work but we are swamped with too many active workers
+		 * that were woken up from sleep at around the same time. We
+		 * don't need another worker fighting for cpu time.
+		 */
+		size_t active = active_workers_now(workq);
+		return (max_concurrent_workers < active);
+	}
+}
+
+/** Waits for a signal to activate_worker. Thread marked idle while waiting. */
+static void cv_wait(struct work_queue *workq)
+{
+	++workq->idle_worker_cnt;
+	THREAD->workq_idling = true;
+	
+	/* Ignore lock ordering just here. */
+	ASSERT(irq_spinlock_locked(&workq->lock));
+	
+	_condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
+		&workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);
+
+	ASSERT(!workq_corrupted(workq));
+	ASSERT(irq_spinlock_locked(&workq->lock));
+	
+	THREAD->workq_idling = false;
+	--workq->idle_worker_cnt;
+}
+
+
+/** Invoked from thread_ready() right before the thread is woken up. */
+void workq_before_thread_is_ready(thread_t *thread)
+{
+	ASSERT(thread);
+	ASSERT(irq_spinlock_locked(&thread->lock));
+
+	/* Worker's work func() is about to wake up from sleeping. */
+	if (thread->workq && thread->workq_blocked) {
+		/* Must be blocked in user work func() and not be waiting for work. */
+		ASSERT(!thread->workq_idling);
+		ASSERT(thread->state == Sleeping);
+		ASSERT(THREAD != thread);
+		ASSERT(!workq_corrupted(thread->workq));
+		
+		/* Protected by thread->lock */
+		thread->workq_blocked = false;
+		
+		irq_spinlock_lock(&thread->workq->lock, true);
+		--thread->workq->blocked_worker_cnt;
+		irq_spinlock_unlock(&thread->workq->lock, true);
+	}
+}
+
+/** Invoked from scheduler() before switching away from a thread. */
+void workq_after_thread_ran(void)
+{
+	ASSERT(THREAD);
+	ASSERT(irq_spinlock_locked(&THREAD->lock));
+
+	/* Worker's work func() is about to sleep/block. */
+	if (THREAD->workq && THREAD->state == Sleeping && !THREAD->workq_idling) {
+		ASSERT(!THREAD->workq_blocked);
+		ASSERT(!workq_corrupted(THREAD->workq));
+		
+		THREAD->workq_blocked = true;
+		
+		irq_spinlock_lock(&THREAD->workq->lock, false);
+
+		++THREAD->workq->blocked_worker_cnt;
+		
+		bool can_block = false;
+		signal_op_t op = signal_worker_logic(THREAD->workq, can_block);
+		
+		irq_spinlock_unlock(&THREAD->workq->lock, false);
+		
+		if (op) {
+			ASSERT(add_worker_noblock_op == op || signal_worker_op == op);
+			op(THREAD->workq);
+		}
+	}
+}
+
+/** Prints stats of the work queue to the kernel console. */
+void workq_print_info(struct work_queue *workq)
+{
+	irq_spinlock_lock(&workq->lock, true);
+
+	size_t total = workq->cur_worker_cnt;
+	size_t blocked = workq->blocked_worker_cnt;
+	size_t idle = workq->idle_worker_cnt;
+	size_t active = active_workers(workq);
+	size_t items = workq->item_cnt;
+	bool stopping = workq->stopping;
+	bool worker_surplus = worker_unnecessary(workq);
+	const char *load_str = worker_surplus ? "decreasing" : 
+		(0 < workq->activate_pending) ? "increasing" : "stable";
+	
+	irq_spinlock_unlock(&workq->lock, true);
+	
+	printf(
+		"Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
+		" max_concurrent_workers=%zu, max_items_per_worker=%zu\n"
+		"Workers: %zu\n"
+		"Active:  %zu (workers currently processing work)\n"
+		"Blocked: %zu (work functions sleeping/blocked)\n"
+		"Idle:    %zu (idle workers waiting for more work)\n"
+		"Items:   %zu (queued not yet dispatched work)\n"
+		"Stopping: %d\n"
+		"Load: %s\n",
+		max_worker_cnt, min_worker_cnt, 
+		max_concurrent_workers, max_items_per_worker,
+		total,
+		active,
+		blocked,
+		idle,
+		items,
+		stopping,
+		load_str
+	);
+}
+
+/** Prints stats of the global work queue. */
+void workq_global_print_info(void)
+{
+	workq_print_info(&g_work_queue);
+}
+
+
+static bool dequeue_add_req(nonblock_adder_t *info, struct work_queue **pworkq)
+{
+	bool stop = false;
+
+	irq_spinlock_lock(&info->lock, true);
+	
+	while (list_empty(&info->work_queues) && !stop) {
+		int ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv, 
+			&info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
+		
+		stop = (ret == ESYNCH_INTERRUPTED);
+	}
+	
+	if (!stop) {
+		*pworkq = list_get_instance(list_first(&info->work_queues), 
+			struct work_queue, nb_link);
+
+		ASSERT(!workq_corrupted(*pworkq));
+		
+		list_remove(&(*pworkq)->nb_link);
+	}
+	
+	irq_spinlock_unlock(&info->lock, true);
+	
+	return !stop;
+}
+
+static void thr_nonblock_add_worker(void *arg)
+{
+	nonblock_adder_t *info = arg;
+	struct work_queue *workq;
+	
+	while (dequeue_add_req(info, &workq)) {
+		add_worker(workq);
+	}
+}
+
+
+static void nonblock_init(void)
+{
+	irq_spinlock_initialize(&nonblock_adder.lock, "kworkq-nb.lock");
+	condvar_initialize(&nonblock_adder.req_cv);
+	list_initialize(&nonblock_adder.work_queues);
+	
+	nonblock_adder.thread = thread_create(thr_nonblock_add_worker, 
+		&nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");
+	
+	if (nonblock_adder.thread) {
+		thread_ready(nonblock_adder.thread);
+	} else {
+		/* 
+		 * We won't be able to add workers without blocking if all workers
+		 * sleep, but at least boot the system.
+		 */
+		printf("Failed to create kworkq-nb. Sleeping work may stall the workq.\n");
+	}
+}
+
+#ifdef CONFIG_DEBUG
+/** Returns true if the workq is definitely corrupted; false if not sure. 
+ * 
+ * Can be used outside of any locks.
+ */
+static bool workq_corrupted(struct work_queue *workq)
+{
+	/* 
+	 * Needed to make the most current cookie value set by workq_preinit()
+	 * visible even if we access the workq right after it is created but
+	 * on a different cpu. Otherwise, workq_corrupted() would not work
+	 * outside a lock.
+	 */
+	memory_barrier();
+	return NULL == workq || workq->cookie != WORKQ_MAGIC;
+}
+
+/** Returns true if the work_item is definitely corrupted; false if not sure. 
+ * 
+ * Must be used with the work queue protecting spinlock locked.
+ */
+static bool work_item_corrupted(work_t *work_item)
+{
+	return NULL == work_item || work_item->cookie != WORK_ITEM_MAGIC;
+}
+#endif
+
+/** @}
+ */
