source: mainline/kernel/generic/src/synch/workqueue.c @ 7c3fb9b

Last change on this file was 7c3fb9b, checked in by Jiri Svoboda <jiri@…>, 7 years ago

Fix block comment formatting (ccheck).

[9f8745c5]1/*
2 * Copyright (c) 2012 Adam Hraska
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup generic
30 * @{
31 */
32
33/**
34 * @file
35 * @brief Work queue/thread pool that automatically adjusts its size
36 * depending on the current load. Queued work functions may sleep.
37 */
38
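/*
 * Editorial usage sketch (not part of the original file): a caller embeds
 * a work_t in its own structure and hands it to a queue together with a
 * function to run; the function executes in a worker thread and may sleep:
 *
 *     workq_global_enqueue(&job->work, job_func);
 *
 * Here job and job_func() are hypothetical names. Fuller, compilable
 * sketches follow workq_destroy() and workq_enqueue_noblock() below.
 */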
[63e27ef]39#include <assert.h>
[897fd8f1]40#include <errno.h>
[8a64e81e]41#include <synch/workqueue.h>
42#include <synch/spinlock.h>
43#include <synch/condvar.h>
44#include <synch/mutex.h>
45#include <proc/thread.h>
46#include <config.h>
47#include <arch.h>
48#include <cpu.h>
49#include <macros.h>
50
[0d56712]51#define WORKQ_MAGIC 0xf00c1333U
52#define WORK_ITEM_MAGIC 0xfeec1777U
53
[8a64e81e]54
55struct work_queue {
[1b20da0]56 /*
57 * Protects everything except activate_worker.
[8a64e81e]58 * Must be acquired after any thread->locks.
59 */
60 IRQ_SPINLOCK_DECLARE(lock);
[a35b458]61
[8a64e81e]62 /* Activates a worker if new work arrives or if shutting down the queue. */
63 condvar_t activate_worker;
[a35b458]64
[8a64e81e]65 /* Queue of work_items ready to be dispatched. */
66 list_t queue;
[a35b458]67
[8a64e81e]68 /* List of worker threads. */
69 list_t workers;
[a35b458]70
[8a64e81e]71 /* Number of work items queued. */
72 size_t item_cnt;
[a35b458]73
[8a64e81e]74 /* Indicates the work queue is shutting down. */
75 bool stopping;
76 const char *name;
77
78 /* Total number of created worker threads. */
79 size_t cur_worker_cnt;
80 /* Number of workers waiting for work to arrive. */
81 size_t idle_worker_cnt;
82 /* Number of idle workers signaled that have not yet been woken up. */
83 size_t activate_pending;
84 /* Number of blocked workers sleeping in work func() (i.e. not idle). */
85 size_t blocked_worker_cnt;
[a35b458]86
[8a64e81e]87 /* Number of pending signal_worker_op() operations. */
88 size_t pending_op_cnt;
[a35b458]89
[8a64e81e]90 link_t nb_link;
[a35b458]91
[0d56712]92#ifdef CONFIG_DEBUG
93 /* Magic cookie for integrity checks. Immutable. Accessed without lock. */
94 uint32_t cookie;
[1b20da0]95#endif
[8a64e81e]96};
97
98
99/** Min number of idle workers to keep. */
100static size_t min_worker_cnt;
101/** Max total number of workers - be it blocked, idle, or active. */
102static size_t max_worker_cnt;
103/** Max number of concurrently running active workers, i.e. neither blocked nor idle. */
104static size_t max_concurrent_workers;
105/** Max number of work items per active worker before a new worker is activated. */
106static const size_t max_items_per_worker = 8;
[a35b458]107
[8a64e81e]108/** System wide work queue. */
109static struct work_queue g_work_queue;
110
111static int booting = true;
112
113
114typedef struct {
115 IRQ_SPINLOCK_DECLARE(lock);
116 condvar_t req_cv;
117 thread_t *thread;
118 list_t work_queues;
119} nonblock_adder_t;
120
121static nonblock_adder_t nonblock_adder;
122
123
124
125/** Typedef a worker thread signaling operation prototype. */
126typedef void (*signal_op_t)(struct work_queue *workq);
127
128
129/* Fwd decl. */
130static void workq_preinit(struct work_queue *workq, const char *name);
131static bool add_worker(struct work_queue *workq);
132static void interrupt_workers(struct work_queue *workq);
133static void wait_for_workers(struct work_queue *workq);
[1b20da0]134static bool _workq_enqueue(struct work_queue *workq, work_t *work_item,
[3bacee1]135 work_func_t func, bool can_block);
[0d56712]136static void init_work_item(work_t *work_item, work_func_t func);
[8a64e81e]137static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block);
138static void worker_thread(void *arg);
139static bool dequeue_work(struct work_queue *workq, work_t **pwork_item);
140static bool worker_unnecessary(struct work_queue *workq);
141static void cv_wait(struct work_queue *workq);
142static void nonblock_init(void);
[04552324]143
144#ifdef CONFIG_DEBUG
[0d56712]145static bool workq_corrupted(struct work_queue *workq);
146static bool work_item_corrupted(work_t *work_item);
[04552324]147#endif
[8a64e81e]148
149/** Creates a worker thread for the system-wide work queue. */
150void workq_global_worker_init(void)
151{
[1b20da0]152 /*
153 * No need for additional synchronization. Stores to word-sized
[8a64e81e]154 * variables are atomic and the change will eventually propagate.
155 * Moreover add_worker() includes the necessary memory barriers
156 * in spinlock lock/unlock().
157 */
158 booting = false;
[a35b458]159
[8a64e81e]160 nonblock_init();
[a35b458]161
[8a64e81e]162 if (!add_worker(&g_work_queue))
163 panic("Could not create a single global work queue worker!\n");
[a35b458]164
[8a64e81e]165}
166
167/** Initializes the system wide work queue and support for other work queues. */
168void workq_global_init(void)
169{
170 /* Keep one idle worker per four cpus, but at least 2 threads. */
171 min_worker_cnt = max(2, config.cpu_count / 4);
172 /* Allow max 8 sleeping work items per cpu. */
173 max_worker_cnt = max(32, 8 * config.cpu_count);
174 /* Maximum concurrency without slowing down the system. */
175 max_concurrent_workers = max(2, config.cpu_count);
[a35b458]176
[8a64e81e]177 workq_preinit(&g_work_queue, "kworkq");
178}
179
180/** Stops the system-wide work queue and waits for all work items to complete. */
181void workq_global_stop(void)
182{
183 workq_stop(&g_work_queue);
184}
185
186/** Creates and initializes a work queue. Returns NULL upon failure. */
[3bacee1]187struct work_queue *workq_create(const char *name)
[8a64e81e]188{
[11b285d]189 struct work_queue *workq = malloc(sizeof(struct work_queue));
[7473807]190 if (!workq)
191 return NULL;
[a35b458]192
[8a64e81e]193 if (workq) {
194 if (workq_init(workq, name)) {
[63e27ef]195 assert(!workq_corrupted(workq));
[8a64e81e]196 return workq;
197 }
[a35b458]198
[8a64e81e]199 free(workq);
200 }
[a35b458]201
[8a64e81e]202 return NULL;
203}
204
205/** Frees work queue resources and stops the queue if it has not been stopped already. */
206void workq_destroy(struct work_queue *workq)
207{
[63e27ef]208 assert(!workq_corrupted(workq));
[a35b458]209
[8a64e81e]210 irq_spinlock_lock(&workq->lock, true);
211 bool stopped = workq->stopping;
[04552324]212#ifdef CONFIG_DEBUG
[8a64e81e]213 size_t running_workers = workq->cur_worker_cnt;
[04552324]214#endif
[8a64e81e]215 irq_spinlock_unlock(&workq->lock, true);
[a35b458]216
[8a64e81e]217 if (!stopped) {
218 workq_stop(workq);
219 } else {
[63e27ef]220 assert(0 == running_workers);
[8a64e81e]221 }
[a35b458]222
[0d56712]223#ifdef CONFIG_DEBUG
224 workq->cookie = 0;
[1b20da0]225#endif
[a35b458]226
[8a64e81e]227 free(workq);
228}
229
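/*
 * Editorial example (a sketch, not part of the original file): typical
 * lifecycle of a dynamically created work queue. my_job_t, job_func() and
 * example_use() are hypothetical names; only workq_create(), workq_enqueue(),
 * workq_stop() and workq_destroy() come from this file's API.
 */
typedef struct {
    work_t work;       /* Must remain valid until job_func() is entered. */
    int payload;
} my_job_t;

static void job_func(work_t *work_item)
{
    /* work is the first member, so this cast recovers the whole job. */
    my_job_t *job = (my_job_t *) work_item;

    /* Runs in a worker thread; it may block or sleep here. */
    printf("processing payload %d\n", job->payload);
    free(job);
}

static void example_use(void)
{
    struct work_queue *workq = workq_create("my-workq");
    if (!workq)
        return;

    my_job_t *job = malloc(sizeof(my_job_t));
    if (job) {
        job->payload = 42;
        /* May block; returns false only if the queue is stopping. */
        if (!workq_enqueue(workq, &job->work, job_func))
            free(job);
    }

    /* Wait for queued items to complete, then release the queue. */
    workq_stop(workq);
    workq_destroy(workq);
}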
230/** Initializes workq structure without creating any workers. */
231static void workq_preinit(struct work_queue *workq, const char *name)
232{
[0d56712]233#ifdef CONFIG_DEBUG
234 workq->cookie = WORKQ_MAGIC;
[1b20da0]235#endif
[a35b458]236
[8a64e81e]237 irq_spinlock_initialize(&workq->lock, name);
238 condvar_initialize(&workq->activate_worker);
[a35b458]239
[8a64e81e]240 list_initialize(&workq->queue);
241 list_initialize(&workq->workers);
[a35b458]242
[8a64e81e]243 workq->item_cnt = 0;
244 workq->stopping = false;
245 workq->name = name;
[a35b458]246
[8a64e81e]247 workq->cur_worker_cnt = 1;
248 workq->idle_worker_cnt = 0;
249 workq->activate_pending = 0;
250 workq->blocked_worker_cnt = 0;
[a35b458]251
[8a64e81e]252 workq->pending_op_cnt = 0;
253 link_initialize(&workq->nb_link);
254}
255
[1b20da0]256/** Initializes a work queue. Returns true if successful.
257 *
[8a64e81e]258 * Before destroying a work queue it must be stopped via
259 * workq_stop().
260 */
[89ea2dc]261bool workq_init(struct work_queue *workq, const char *name)
[8a64e81e]262{
263 workq_preinit(workq, name);
264 return add_worker(workq);
265}
266
267/** Add a new worker thread. Returns false if the thread could not be created. */
268static bool add_worker(struct work_queue *workq)
269{
[63e27ef]270 assert(!workq_corrupted(workq));
[0d56712]271
[1b20da0]272 thread_t *thread = thread_create(worker_thread, workq, TASK,
[3bacee1]273 THREAD_FLAG_NONE, workq->name);
[a35b458]274
[8a64e81e]275 if (!thread) {
276 irq_spinlock_lock(&workq->lock, true);
[a35b458]277
[8a64e81e]278 /* cur_worker_cnt proactively increased in signal_worker_logic(). */
[63e27ef]279 assert(0 < workq->cur_worker_cnt);
[8a64e81e]280 --workq->cur_worker_cnt;
[a35b458]281
[8a64e81e]282 irq_spinlock_unlock(&workq->lock, true);
283 return false;
284 }
[a35b458]285
[8a64e81e]286 /* Respect lock ordering. */
287 irq_spinlock_lock(&thread->lock, true);
288 irq_spinlock_lock(&workq->lock, false);
289
290 bool success;
291
292 if (!workq->stopping) {
293 success = true;
[a35b458]294
[8a64e81e]295 /* Try to distribute workers among cpus right away. */
296 unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;
[a35b458]297
[8a64e81e]298 if (!cpus[cpu_id].active)
299 cpu_id = CPU->id;
300
[1b20da0]301 thread->workq = workq;
[8a64e81e]302 thread->cpu = &cpus[cpu_id];
303 thread->workq_blocked = false;
304 thread->workq_idling = false;
305 link_initialize(&thread->workq_link);
306
307 list_append(&thread->workq_link, &workq->workers);
308 } else {
[1b20da0]309 /*
[8a64e81e]310 * Work queue is shutting down - we must not add the worker
311 * and we cannot destroy it without ready-ing it. Mark it
312 * interrupted so the worker exits right away without even
313 * touching workq.
314 */
315 success = false;
[a35b458]316
[8a64e81e]317 /* cur_worker_cnt proactively increased in signal_worker_logic(). */
[63e27ef]318 assert(0 < workq->cur_worker_cnt);
[8a64e81e]319 --workq->cur_worker_cnt;
320 }
[a35b458]321
[8a64e81e]322 irq_spinlock_unlock(&workq->lock, false);
323 irq_spinlock_unlock(&thread->lock, true);
324
325 if (!success) {
326 thread_interrupt(thread);
327 }
[a35b458]328
[8a64e81e]329 thread_ready(thread);
[a35b458]330
[8a64e81e]331 return success;
332}
333
[1b20da0]334/** Shuts down the work queue. Waits for all pending work items to complete.
[8a64e81e]335 *
[1b20da0]336 * workq_stop() may only be run once.
[8a64e81e]337 */
338void workq_stop(struct work_queue *workq)
339{
[63e27ef]340 assert(!workq_corrupted(workq));
[a35b458]341
[8a64e81e]342 interrupt_workers(workq);
343 wait_for_workers(workq);
344}
345
346/** Notifies worker threads the work queue is shutting down. */
347static void interrupt_workers(struct work_queue *workq)
348{
349 irq_spinlock_lock(&workq->lock, true);
350
351 /* workq_stop() may only be called once. */
[63e27ef]352 assert(!workq->stopping);
[8a64e81e]353 workq->stopping = true;
[a35b458]354
[8a64e81e]355 /* Respect lock ordering - do not hold workq->lock during broadcast. */
356 irq_spinlock_unlock(&workq->lock, true);
[a35b458]357
[8a64e81e]358 condvar_broadcast(&workq->activate_worker);
359}
360
361/** Waits for all worker threads to exit. */
362static void wait_for_workers(struct work_queue *workq)
363{
[63e27ef]364 assert(!PREEMPTION_DISABLED);
[a35b458]365
[8a64e81e]366 irq_spinlock_lock(&workq->lock, true);
[a35b458]367
[8a64e81e]368 list_foreach_safe(workq->workers, cur_worker, next_worker) {
369 thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
370 list_remove(cur_worker);
371
372 /* Wait without the lock. */
373 irq_spinlock_unlock(&workq->lock, true);
[a35b458]374
[8a64e81e]375 thread_join(worker);
376 thread_detach(worker);
[a35b458]377
[8a64e81e]378 irq_spinlock_lock(&workq->lock, true);
379 }
[a35b458]380
[63e27ef]381 assert(list_empty(&workq->workers));
[a35b458]382
[8a64e81e]383 /* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
384 while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
385 irq_spinlock_unlock(&workq->lock, true);
[a35b458]386
[8a64e81e]387 scheduler();
[a35b458]388
[8a64e81e]389 irq_spinlock_lock(&workq->lock, true);
390 }
[a35b458]391
[8a64e81e]392 irq_spinlock_unlock(&workq->lock, true);
393}
394
[1b20da0]395/** Queues a function into the global work queue without blocking.
396 *
[8a64e81e]397 * See workq_enqueue_noblock() for more details.
398 */
[89ea2dc]399bool workq_global_enqueue_noblock(work_t *work_item, work_func_t func)
[8a64e81e]400{
401 return workq_enqueue_noblock(&g_work_queue, work_item, func);
402}
403
[1b20da0]404/** Queues a function into the global work queue; may block.
405 *
[8a64e81e]406 * See workq_enqueue() for more details.
407 */
[89ea2dc]408bool workq_global_enqueue(work_t *work_item, work_func_t func)
[8a64e81e]409{
410 return workq_enqueue(&g_work_queue, work_item, func);
411}
412
[1b20da0]413/** Adds a function to be invoked in a separate thread without blocking.
414 *
415 * workq_enqueue_noblock() is guaranteed not to block. It is safe
[8a64e81e]416 * to invoke from interrupt handlers.
[1b20da0]417 *
[8a64e81e]418 * Consider using workq_enqueue() instead if at all possible. Otherwise,
[1b20da0]419 * your work item may have to wait for previously enqueued sleeping
[8a64e81e]420 * work items to complete if you are unlucky.
[1b20da0]421 *
[8a64e81e]422 * @param workq Work queue where to queue the work item.
423 * @param work_item Work item bookkeeping structure. Must be valid
424 * until func() is entered.
425 * @param func User supplied function to invoke in a worker thread.
[7c3fb9b]426 *
[1b20da0]427 * @return false if work queue is shutting down; function is not
428 * queued for further processing.
[8a64e81e]429 * @return true Otherwise. func() will be invoked in a separate thread.
430 */
[1b20da0]431bool workq_enqueue_noblock(struct work_queue *workq, work_t *work_item,
[3bacee1]432 work_func_t func)
[8a64e81e]433{
434 return _workq_enqueue(workq, work_item, func, false);
435}
436
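/*
 * Editorial sketch (not part of the original file): queueing work from an
 * interrupt handler. my_dev_t, my_dev_bottom_half() and my_dev_irq_handler()
 * are hypothetical names; the point is that the work_t lives in a long-lived
 * structure (not on the interrupt stack) and that the noblock variant never
 * blocks or allocates.
 */
typedef struct {
    work_t work;        /* Embedded in the device, valid until the func runs. */
    /* ... device state ... */
} my_dev_t;

static void my_dev_bottom_half(work_t *work_item)
{
    my_dev_t *dev = (my_dev_t *) work_item;   /* work is the first member. */

    /* Heavy processing that may sleep happens here, outside irq context. */
    (void) dev;
}

static void my_dev_irq_handler(my_dev_t *dev)
{
    /* Safe in interrupt context. */
    if (!workq_global_enqueue_noblock(&dev->work, my_dev_bottom_half)) {
        /* The queue is shutting down; drop or defer the event here. */
    }
}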
[1b20da0]437/** Adds a function to be invoked in a separate thread; may block.
438 *
439 * While workq_enqueue() is unlikely to block, it may do so if too
[8a64e81e]440 * many previously enqueued work items are blocked sleeping.
[1b20da0]441 *
[8a64e81e]442 * @param workq Work queue where to queue the work item.
443 * @param work_item Work item bookkeeping structure. Must be valid
444 * until func() is entered.
445 * @param func User supplied function to invoke in a worker thread.
[7c3fb9b]446 *
[1b20da0]447 * @return false if work queue is shutting down; function is not
448 * queued for further processing.
[8a64e81e]449 * @return true Otherwise. func() will be invoked in a separate thread.
450 */
[89ea2dc]451bool workq_enqueue(struct work_queue *workq, work_t *work_item, work_func_t func)
[8a64e81e]452{
453 return _workq_enqueue(workq, work_item, func, true);
454}
455
456/** Adds a work item that will be processed by a separate worker thread.
[1b20da0]457 *
458 * func() will be invoked in another kernel thread and may block.
459 *
[8a64e81e]460 * Prefer to call _workq_enqueue() with can_block set. Otherwise
461 * your work item may have to wait for sleeping work items to complete.
462 * If all worker threads are blocked/sleeping, a new worker thread cannot
463 * be created without can_block set because creating a thread might
464 * block due to low memory conditions.
[1b20da0]465 *
[8a64e81e]466 * @param workq Work queue where to queue the work item.
467 * @param work_item Work item bookkeeping structure. Must be valid
468 * until func() is entered.
469 * @param func User supplied function to invoke in a worker thread.
470 * @param can_block May adding this work item block?
[7c3fb9b]471 *
[1b20da0]472 * @return false if work queue is shutting down; function is not
473 * queued for further processing.
[8a64e81e]474 * @return true Otherwise.
475 */
[1b20da0]476static bool _workq_enqueue(struct work_queue *workq, work_t *work_item,
[3bacee1]477 work_func_t func, bool can_block)
[8a64e81e]478{
[63e27ef]479 assert(!workq_corrupted(workq));
[a35b458]480
[8a64e81e]481 bool success = true;
482 signal_op_t signal_op = NULL;
[a35b458]483
[8a64e81e]484 irq_spinlock_lock(&workq->lock, true);
[a35b458]485
[8a64e81e]486 if (workq->stopping) {
487 success = false;
488 } else {
[0d56712]489 init_work_item(work_item, func);
[8a64e81e]490 list_append(&work_item->queue_link, &workq->queue);
491 ++workq->item_cnt;
492 success = true;
[a35b458]493
[8a64e81e]494 if (!booting) {
495 signal_op = signal_worker_logic(workq, can_block);
496 } else {
[1b20da0]497 /*
498 * During boot there are no workers to signal. Just queue
[8a64e81e]499 * the work and let future workers take care of it.
500 */
501 }
502 }
[a35b458]503
[8a64e81e]504 irq_spinlock_unlock(&workq->lock, true);
505
506 if (signal_op) {
507 signal_op(workq);
508 }
[a35b458]509
[8a64e81e]510 return success;
511}
512
[0d56712]513/** Prepare an item to be added to the work item queue. */
514static void init_work_item(work_t *work_item, work_func_t func)
515{
516#ifdef CONFIG_DEBUG
517 work_item->cookie = WORK_ITEM_MAGIC;
[1b20da0]518#endif
[a35b458]519
[0d56712]520 link_initialize(&work_item->queue_link);
521 work_item->func = func;
522}
523
[8a64e81e]524/** Returns the number of workers running work func() that are not blocked. */
525static size_t active_workers_now(struct work_queue *workq)
526{
[63e27ef]527 assert(irq_spinlock_locked(&workq->lock));
[a35b458]528
[8a64e81e]529 /* Blocked workers are sleeping in the work function (i.e. not idle). */
[63e27ef]530 assert(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
[8a64e81e]531 /* Idle workers are waiting for more work to arrive in condvar_wait. */
[63e27ef]532 assert(workq->idle_worker_cnt <= workq->cur_worker_cnt);
[a35b458]533
[8a64e81e]534 /* Idle + blocked workers == sleeping worker threads. */
535 size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;
[a35b458]536
[63e27ef]537 assert(sleeping_workers <= workq->cur_worker_cnt);
[8a64e81e]538 /* Workers pending activation are idle workers not yet given a time slice. */
[63e27ef]539 assert(workq->activate_pending <= workq->idle_worker_cnt);
[a35b458]540
[1b20da0]541 /*
542 * Workers actively running the work func() this very moment and
543 * are neither blocked nor idle. Exclude ->activate_pending workers
544 * since they will run their work func() once they get a time slice
[8a64e81e]545 * and are not running it right now.
546 */
547 return workq->cur_worker_cnt - sleeping_workers;
548}
549
[1b20da0]550/**
551 * Returns the number of workers that are running or are about to run work
552 * func() and that are not blocked.
[8a64e81e]553 */
554static size_t active_workers(struct work_queue *workq)
555{
[63e27ef]556 assert(irq_spinlock_locked(&workq->lock));
[a35b458]557
[1b20da0]558 /*
559 * Workers actively running the work func() and are neither blocked nor
[8a64e81e]560 * idle. ->activate_pending workers will run their work func() once they
561 * get a time slice after waking from a condvar wait, so count them
562 * as well.
563 */
564 return active_workers_now(workq) + workq->activate_pending;
565}
566
567static void add_worker_noblock_op(struct work_queue *workq)
568{
569 condvar_signal(&nonblock_adder.req_cv);
570}
571
572static void add_worker_op(struct work_queue *workq)
573{
574 add_worker(workq);
575}
576
577static void signal_worker_op(struct work_queue *workq)
578{
[63e27ef]579 assert(!workq_corrupted(workq));
[0d56712]580
[8a64e81e]581 condvar_signal(&workq->activate_worker);
[a35b458]582
[8a64e81e]583 irq_spinlock_lock(&workq->lock, true);
[63e27ef]584 assert(0 < workq->pending_op_cnt);
[8a64e81e]585 --workq->pending_op_cnt;
586 irq_spinlock_unlock(&workq->lock, true);
587}
588
589/** Determines how to signal workers if at all.
[1b20da0]590 *
[8a64e81e]591 * @param workq Work queue where a new work item was queued.
[1b20da0]592 * @param can_block True if we may block while signaling a worker or creating
[8a64e81e]593 * a new worker.
[1b20da0]594 *
[8a64e81e]595 * @return Function that will notify workers or NULL if no action is needed.
596 */
597static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block)
598{
[63e27ef]599 assert(!workq_corrupted(workq));
600 assert(irq_spinlock_locked(&workq->lock));
[a35b458]601
[8a64e81e]602 /* Only signal workers if really necessary. */
603 signal_op_t signal_op = NULL;
604
[1b20da0]605 /*
606 * Workers actively running the work func() and neither blocked nor idle.
607 * Including ->activate_pending workers that will run their work func()
[8a64e81e]608 * once they get a time slice.
609 */
610 size_t active = active_workers(workq);
611 /* Max total allowed number of work items queued for active workers. */
612 size_t max_load = active * max_items_per_worker;
613
614 /* Active workers are getting overwhelmed - activate another. */
615 if (max_load < workq->item_cnt) {
616
[1b20da0]617 size_t remaining_idle =
[3bacee1]618 workq->idle_worker_cnt - workq->activate_pending;
[8a64e81e]619
620 /* Idle workers still exist - activate one. */
621 if (remaining_idle > 0) {
[1b20da0]622 /*
[8a64e81e]623 * Directly changing idle_worker_cnt here would not allow
[1b20da0]624 * workers to recognize spurious wake-ups. Change
[8a64e81e]625 * activate_pending instead.
626 */
627 ++workq->activate_pending;
628 ++workq->pending_op_cnt;
629 signal_op = signal_worker_op;
630 } else {
631 /* No idle workers remain. Request that a new one be created. */
[3bacee1]632 bool need_worker = (active < max_concurrent_workers) &&
633 (workq->cur_worker_cnt < max_worker_cnt);
[a35b458]634
[8a64e81e]635 if (need_worker && can_block) {
636 signal_op = add_worker_op;
[1b20da0]637 /*
[8a64e81e]638 * It may take some time to actually create the worker.
639 * We don't want to swamp the thread pool with superfluous
640 * worker creation requests so pretend it was already
641 * created and proactively increase the worker count.
642 */
643 ++workq->cur_worker_cnt;
644 }
[a35b458]645
[1b20da0]646 /*
[8a64e81e]647 * We cannot create a new worker but we need one desperately
648 * because all workers are blocked in their work functions.
649 */
650 if (need_worker && !can_block && 0 == active) {
[63e27ef]651 assert(0 == workq->idle_worker_cnt);
[a35b458]652
[8a64e81e]653 irq_spinlock_lock(&nonblock_adder.lock, true);
654
655 if (nonblock_adder.thread && !link_used(&workq->nb_link)) {
656 signal_op = add_worker_noblock_op;
657 ++workq->cur_worker_cnt;
658 list_append(&workq->nb_link, &nonblock_adder.work_queues);
659 }
660
661 irq_spinlock_unlock(&nonblock_adder.lock, true);
662 }
663 }
664 } else {
[1b20da0]665 /*
666 * There are enough active/running workers to process the queue.
[8a64e81e]667 * No need to signal/activate any new workers.
668 */
669 signal_op = NULL;
670 }
[a35b458]671
[8a64e81e]672 return signal_op;
673}
674
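/*
 * Worked example of the threshold above (editorial note): with
 * max_items_per_worker == 8 and, say, 3 workers counted as active,
 * max_load is 24. Queueing a 25th item makes item_cnt exceed max_load,
 * so signal_worker_logic() wakes an idle worker via signal_worker_op(),
 * or, if none is idle and the limits allow, arranges for a new worker:
 * add_worker_op() when can_block, else add_worker_noblock_op() through
 * the kworkq-nb thread when every existing worker is blocked.
 */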
675/** Executes queued work items. */
676static void worker_thread(void *arg)
677{
[1b20da0]678 /*
679 * The thread has been created after the work queue was ordered to stop.
680 * Do not access the work queue and return immediately.
[8a64e81e]681 */
682 if (thread_interrupted(THREAD)) {
683 thread_detach(THREAD);
684 return;
685 }
[a35b458]686
[63e27ef]687 assert(arg != NULL);
[a35b458]688
[8a64e81e]689 struct work_queue *workq = arg;
690 work_t *work_item;
[a35b458]691
[8a64e81e]692 while (dequeue_work(workq, &work_item)) {
693 /* Copy the func field so func() can safely free work_item. */
694 work_func_t func = work_item->func;
[0d56712]695
[8a64e81e]696 func(work_item);
697 }
698}
699
700/** Waits and retrieves a work item. Returns false if the worker should exit. */
701static bool dequeue_work(struct work_queue *workq, work_t **pwork_item)
702{
[63e27ef]703 assert(!workq_corrupted(workq));
[a35b458]704
[8a64e81e]705 irq_spinlock_lock(&workq->lock, true);
[a35b458]706
[8a64e81e]707 /* Check if we should exit if load is low. */
708 if (!workq->stopping && worker_unnecessary(workq)) {
709 /* There are too many workers for this load. Exit. */
[63e27ef]710 assert(0 < workq->cur_worker_cnt);
[8a64e81e]711 --workq->cur_worker_cnt;
712 list_remove(&THREAD->workq_link);
713 irq_spinlock_unlock(&workq->lock, true);
[a35b458]714
[8a64e81e]715 thread_detach(THREAD);
716 return false;
717 }
[a35b458]718
[8a64e81e]719 bool stop = false;
[a35b458]720
[8a64e81e]721 /* Wait for work to arrive. */
722 while (list_empty(&workq->queue) && !workq->stopping) {
723 cv_wait(workq);
[a35b458]724
[8a64e81e]725 if (0 < workq->activate_pending)
726 --workq->activate_pending;
727 }
728
729 /* Process remaining work even if requested to stop. */
730 if (!list_empty(&workq->queue)) {
731 link_t *work_link = list_first(&workq->queue);
732 *pwork_item = list_get_instance(work_link, work_t, queue_link);
[a35b458]733
[0d56712]734#ifdef CONFIG_DEBUG
[63e27ef]735 assert(!work_item_corrupted(*pwork_item));
[0d56712]736 (*pwork_item)->cookie = 0;
737#endif
[8a64e81e]738 list_remove(work_link);
739 --workq->item_cnt;
[a35b458]740
[8a64e81e]741 stop = false;
742 } else {
743 /* Requested to stop and no more work queued. */
[63e27ef]744 assert(workq->stopping);
[8a64e81e]745 --workq->cur_worker_cnt;
746 stop = true;
747 }
[a35b458]748
[8a64e81e]749 irq_spinlock_unlock(&workq->lock, true);
[a35b458]750
[8a64e81e]751 return !stop;
752}
753
754/** Returns true if there are too many workers for the given load. */
755static bool worker_unnecessary(struct work_queue *workq)
756{
[63e27ef]757 assert(irq_spinlock_locked(&workq->lock));
[a35b458]758
[8a64e81e]759 /* No work is pending. We don't need too many idle threads. */
760 if (list_empty(&workq->queue)) {
761 /* There are too many idle workers. Exit. */
762 return (min_worker_cnt <= workq->idle_worker_cnt);
763 } else {
[1b20da0]764 /*
[8a64e81e]765 * There is work but we are swamped with too many active workers
766 * that were woken up from sleep at around the same time. We
767 * don't need another worker fighting for cpu time.
768 */
769 size_t active = active_workers_now(workq);
770 return (max_concurrent_workers < active);
771 }
772}
773
774/** Waits for a signal on activate_worker. The thread is marked idle while waiting. */
775static void cv_wait(struct work_queue *workq)
776{
777 ++workq->idle_worker_cnt;
778 THREAD->workq_idling = true;
[a35b458]779
[8a64e81e]780 /* Ignore lock ordering just here. */
[63e27ef]781 assert(irq_spinlock_locked(&workq->lock));
[a35b458]782
[8a64e81e]783 _condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
[3bacee1]784 &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);
[8a64e81e]785
[63e27ef]786 assert(!workq_corrupted(workq));
787 assert(irq_spinlock_locked(&workq->lock));
[a35b458]788
[8a64e81e]789 THREAD->workq_idling = false;
790 --workq->idle_worker_cnt;
791}
792
793
794/** Invoked from thread_ready() right before the thread is woken up. */
795void workq_before_thread_is_ready(thread_t *thread)
796{
[63e27ef]797 assert(thread);
798 assert(irq_spinlock_locked(&thread->lock));
[8a64e81e]799
800 /* Worker's work func() is about to wake up from sleeping. */
801 if (thread->workq && thread->workq_blocked) {
802 /* Must be blocked in user work func() and not be waiting for work. */
[63e27ef]803 assert(!thread->workq_idling);
804 assert(thread->state == Sleeping);
805 assert(THREAD != thread);
806 assert(!workq_corrupted(thread->workq));
[a35b458]807
[8a64e81e]808 /* Protected by thread->lock */
809 thread->workq_blocked = false;
[a35b458]810
[8a64e81e]811 irq_spinlock_lock(&thread->workq->lock, true);
812 --thread->workq->blocked_worker_cnt;
813 irq_spinlock_unlock(&thread->workq->lock, true);
814 }
815}
816
817/** Invoked from scheduler() before switching away from a thread. */
818void workq_after_thread_ran(void)
819{
[63e27ef]820 assert(THREAD);
821 assert(irq_spinlock_locked(&THREAD->lock));
[8a64e81e]822
823 /* Worker's work func() is about to sleep/block. */
824 if (THREAD->workq && THREAD->state == Sleeping && !THREAD->workq_idling) {
[63e27ef]825 assert(!THREAD->workq_blocked);
826 assert(!workq_corrupted(THREAD->workq));
[a35b458]827
[8a64e81e]828 THREAD->workq_blocked = true;
[a35b458]829
[8a64e81e]830 irq_spinlock_lock(&THREAD->workq->lock, false);
831
832 ++THREAD->workq->blocked_worker_cnt;
[a35b458]833
[8a64e81e]834 bool can_block = false;
835 signal_op_t op = signal_worker_logic(THREAD->workq, can_block);
[a35b458]836
[8a64e81e]837 irq_spinlock_unlock(&THREAD->workq->lock, false);
[a35b458]838
[8a64e81e]839 if (op) {
[63e27ef]840 assert(add_worker_noblock_op == op || signal_worker_op == op);
[8a64e81e]841 op(THREAD->workq);
842 }
843 }
844}
845
846/** Prints stats of the work queue to the kernel console. */
847void workq_print_info(struct work_queue *workq)
848{
849 irq_spinlock_lock(&workq->lock, true);
850
851 size_t total = workq->cur_worker_cnt;
852 size_t blocked = workq->blocked_worker_cnt;
853 size_t idle = workq->idle_worker_cnt;
854 size_t active = active_workers(workq);
855 size_t items = workq->item_cnt;
856 bool stopping = workq->stopping;
857 bool worker_surplus = worker_unnecessary(workq);
[1b20da0]858 const char *load_str = worker_surplus ? "decreasing" :
[3bacee1]859 (0 < workq->activate_pending) ? "increasing" : "stable";
[a35b458]860
[8a64e81e]861 irq_spinlock_unlock(&workq->lock, true);
[a35b458]862
[8a64e81e]863 printf(
[3bacee1]864 "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
865 " max_concurrent_workers=%zu, max_items_per_worker=%zu\n"
866 "Workers: %zu\n"
867 "Active: %zu (workers currently processing work)\n"
868 "Blocked: %zu (work functions sleeping/blocked)\n"
869 "Idle: %zu (idle workers waiting for more work)\n"
870 "Items: %zu (queued not yet dispatched work)\n"
871 "Stopping: %d\n"
872 "Load: %s\n",
873 max_worker_cnt, min_worker_cnt,
874 max_concurrent_workers, max_items_per_worker,
875 total,
876 active,
877 blocked,
878 idle,
879 items,
880 stopping,
881 load_str);
[8a64e81e]882}
883
884/** Prints stats of the global work queue. */
885void workq_global_print_info(void)
886{
887 workq_print_info(&g_work_queue);
888}
889
890
891static bool dequeue_add_req(nonblock_adder_t *info, struct work_queue **pworkq)
892{
893 bool stop = false;
894
895 irq_spinlock_lock(&info->lock, true);
[a35b458]896
[8a64e81e]897 while (list_empty(&info->work_queues) && !stop) {
[1b20da0]898 errno_t ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
[3bacee1]899 &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
[a35b458]900
[897fd8f1]901 stop = (ret == EINTR);
[8a64e81e]902 }
[a35b458]903
[8a64e81e]904 if (!stop) {
[1b20da0]905 *pworkq = list_get_instance(list_first(&info->work_queues),
[3bacee1]906 struct work_queue, nb_link);
[0d56712]907
[63e27ef]908 assert(!workq_corrupted(*pworkq));
[a35b458]909
[8a64e81e]910 list_remove(&(*pworkq)->nb_link);
911 }
[a35b458]912
[8a64e81e]913 irq_spinlock_unlock(&info->lock, true);
[a35b458]914
[8a64e81e]915 return !stop;
916}
917
918static void thr_nonblock_add_worker(void *arg)
919{
920 nonblock_adder_t *info = arg;
[0a4667a7]921 struct work_queue *workq = NULL;
[a35b458]922
[8a64e81e]923 while (dequeue_add_req(info, &workq)) {
924 add_worker(workq);
925 }
926}
927
928
929static void nonblock_init(void)
930{
[0d56712]931 irq_spinlock_initialize(&nonblock_adder.lock, "kworkq-nb.lock");
[8a64e81e]932 condvar_initialize(&nonblock_adder.req_cv);
933 list_initialize(&nonblock_adder.work_queues);
[a35b458]934
[1b20da0]935 nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
[3bacee1]936 &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");
[a35b458]937
[8a64e81e]938 if (nonblock_adder.thread) {
939 thread_ready(nonblock_adder.thread);
940 } else {
[1b20da0]941 /*
[8a64e81e]942 * We won't be able to add workers without blocking if all workers
943 * sleep, but at least boot the system.
944 */
[0d56712]945 printf("Failed to create kworkq-nb. Sleeping work may stall the workq.\n");
[8a64e81e]946 }
947}
948
[04552324]949#ifdef CONFIG_DEBUG
[1b20da0]950/** Returns true if the workq is definitely corrupted; false if not sure.
951 *
[0d56712]952 * Can be used outside of any locks.
953 */
954static bool workq_corrupted(struct work_queue *workq)
955{
[1b20da0]956 /*
[0d56712]957 * Needed to make the most current cookie value set by workq_preinit()
958 * visible even if we access the workq right after it is created but
959 * on a different cpu. Otherwise, workq_corrupted() would not work
960 * outside a lock.
961 */
962 memory_barrier();
963 return NULL == workq || workq->cookie != WORKQ_MAGIC;
964}
[8a64e81e]965
[1b20da0]966/** Returns true if the work_item is definitely corrupted; false if not sure.
967 *
[0d56712]968 * Must be used with the work queue protecting spinlock locked.
969 */
970static bool work_item_corrupted(work_t *work_item)
971{
972 return NULL == work_item || work_item->cookie != WORK_ITEM_MAGIC;
973}
[04552324]974#endif
[9f8745c5]975
976/** @}
977 */