source: mainline/kernel/generic/src/synch/workqueue.c@ bf2042f9

Last change on this file since bf2042f9 was a35b458, checked in by Jiří Zárevúcky <zarevucky.jiri@…>, 7 years ago

style: Remove trailing whitespace on _all_ lines, including empty ones, for particular file types.

Command used: tools/srepl '\s\+$' '' -- *.c *.h *.py *.sh *.s *.S *.ag

Currently, whitespace on empty lines is very inconsistent.
There are two basic choices: Either remove the whitespace, or keep empty lines
indented to the level of surrounding code. The former is AFAICT more common,
and also much easier to do automatically.

Alternatively, we could write a script for automatic indentation, and use that
instead. However, if such a script exists, it's possible to use the indented
style locally, by having the editor apply relevant conversions on load/save,
without affecting the remote repository. IMO, it makes more sense to adopt
the simpler rule.

[9f8745c5]1/*
2 * Copyright (c) 2012 Adam Hraska
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup generic
30 * @{
31 */
32
33/**
34 * @file
35 * @brief Work queue/thread pool that automatically adjusts its size
 36 * depending on the current load. Queued work functions may sleep.
37 */
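/*
 * Illustrative usage sketch for the global queue (not part of this file;
 * flush_work and flush_caches() are hypothetical names):
 *
 *	static work_t flush_work;
 *
 *	static void flush_caches(work_t *work_item)
 *	{
 *		// Runs in a worker thread and may sleep.
 *	}
 *
 *	// The work item must remain valid until flush_caches() is entered.
 *	if (!workq_global_enqueue(&flush_work, flush_caches)) {
 *		// The global queue is shutting down; nothing was queued.
 *	}
 */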
38
[63e27ef]39#include <assert.h>
[897fd8f1]40#include <errno.h>
[8a64e81e]41#include <synch/workqueue.h>
42#include <synch/spinlock.h>
43#include <synch/condvar.h>
44#include <synch/mutex.h>
45#include <proc/thread.h>
46#include <config.h>
47#include <arch.h>
48#include <cpu.h>
49#include <macros.h>
50
[0d56712]51#define WORKQ_MAGIC 0xf00c1333U
52#define WORK_ITEM_MAGIC 0xfeec1777U
53
[8a64e81e]54
55struct work_queue {
[1b20da0]56 /*
57 * Protects everything except activate_worker.
[8a64e81e]58 * Must be acquired after any thread->locks.
59 */
60 IRQ_SPINLOCK_DECLARE(lock);
[a35b458]61
[8a64e81e]62 /* Activates a worker if new work arrives or if shutting down the queue. */
63 condvar_t activate_worker;
[a35b458]64
[8a64e81e]65 /* Queue of work_items ready to be dispatched. */
66 list_t queue;
[a35b458]67
[8a64e81e]68 /* List of worker threads. */
69 list_t workers;
[a35b458]70
[8a64e81e]71 /* Number of work items queued. */
72 size_t item_cnt;
[a35b458]73
[8a64e81e]74 /* Indicates the work queue is shutting down. */
75 bool stopping;
76 const char *name;
77
78 /* Total number of created worker threads. */
79 size_t cur_worker_cnt;
80 /* Number of workers waiting for work to arrive. */
81 size_t idle_worker_cnt;
 82 /* Number of idle workers that have been signaled but not yet woken up. */
83 size_t activate_pending;
84 /* Number of blocked workers sleeping in work func() (ie not idle). */
85 size_t blocked_worker_cnt;
[a35b458]86
[8a64e81e]87 /* Number of pending signal_worker_op() operations. */
88 size_t pending_op_cnt;
[a35b458]89
[8a64e81e]90 link_t nb_link;
[a35b458]91
[0d56712]92#ifdef CONFIG_DEBUG
93 /* Magic cookie for integrity checks. Immutable. Accessed without lock. */
94 uint32_t cookie;
[1b20da0]95#endif
[8a64e81e]96};
97
98
99/** Min number of idle workers to keep. */
100static size_t min_worker_cnt;
101/** Max total number of workers - be it blocked, idle, or active. */
102static size_t max_worker_cnt;
103/** Max number of concurrently running active workers, ie not blocked nor idle. */
104static size_t max_concurrent_workers;
 105/** Max number of work items per active worker before a new worker is activated. */
106static const size_t max_items_per_worker = 8;
[a35b458]107
[8a64e81e]108/** System wide work queue. */
109static struct work_queue g_work_queue;
110
111static int booting = true;
112
113
114typedef struct {
115 IRQ_SPINLOCK_DECLARE(lock);
116 condvar_t req_cv;
117 thread_t *thread;
118 list_t work_queues;
119} nonblock_adder_t;
120
121static nonblock_adder_t nonblock_adder;
122
123
124
 125/** Prototype of a worker thread signaling operation. */
126typedef void (*signal_op_t)(struct work_queue *workq);
127
128
129/* Fwd decl. */
130static void workq_preinit(struct work_queue *workq, const char *name);
131static bool add_worker(struct work_queue *workq);
132static void interrupt_workers(struct work_queue *workq);
133static void wait_for_workers(struct work_queue *workq);
[1b20da0]134static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
[8a64e81e]135 work_func_t func, bool can_block);
[0d56712]136static void init_work_item(work_t *work_item, work_func_t func);
[8a64e81e]137static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block);
138static void worker_thread(void *arg);
139static bool dequeue_work(struct work_queue *workq, work_t **pwork_item);
140static bool worker_unnecessary(struct work_queue *workq);
141static void cv_wait(struct work_queue *workq);
142static void nonblock_init(void);
[04552324]143
144#ifdef CONFIG_DEBUG
[0d56712]145static bool workq_corrupted(struct work_queue *workq);
146static bool work_item_corrupted(work_t *work_item);
[04552324]147#endif
[8a64e81e]148
 149/** Creates a worker thread for the system-wide work queue. */
150void workq_global_worker_init(void)
151{
[1b20da0]152 /*
153 * No need for additional synchronization. Stores to word-sized
[8a64e81e]154 * variables are atomic and the change will eventually propagate.
155 * Moreover add_worker() includes the necessary memory barriers
156 * in spinlock lock/unlock().
157 */
158 booting = false;
[a35b458]159
[8a64e81e]160 nonblock_init();
[a35b458]161
[8a64e81e]162 if (!add_worker(&g_work_queue))
163 panic("Could not create a single global work queue worker!\n");
[a35b458]164
[8a64e81e]165}
166
167/** Initializes the system wide work queue and support for other work queues. */
168void workq_global_init(void)
169{
 170 /* Keep an idle worker for every four cpus, but at least 2 threads. */
171 min_worker_cnt = max(2, config.cpu_count / 4);
172 /* Allow max 8 sleeping work items per cpu. */
173 max_worker_cnt = max(32, 8 * config.cpu_count);
174 /* Maximum concurrency without slowing down the system. */
175 max_concurrent_workers = max(2, config.cpu_count);
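	/*
	 * For instance, a hypothetical 8-cpu machine ends up with
	 * min_worker_cnt = max(2, 8 / 4) = 2, max_worker_cnt = max(32, 64) = 64
	 * and max_concurrent_workers = max(2, 8) = 8.
	 */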
[a35b458]176
[8a64e81e]177 workq_preinit(&g_work_queue, "kworkq");
178}
179
 180/** Stops the system global work queue and waits for all work items to complete. */
181void workq_global_stop(void)
182{
183 workq_stop(&g_work_queue);
184}
185
186/** Creates and initializes a work queue. Returns NULL upon failure. */
187struct work_queue * workq_create(const char *name)
188{
189 struct work_queue *workq = malloc(sizeof(struct work_queue), 0);
[a35b458]190
[8a64e81e]191 if (workq) {
192 if (workq_init(workq, name)) {
[63e27ef]193 assert(!workq_corrupted(workq));
[8a64e81e]194 return workq;
195 }
[a35b458]196
[8a64e81e]197 free(workq);
198 }
[a35b458]199
[8a64e81e]200 return NULL;
201}
202
 203/** Frees work queue resources, stopping the queue first if that has not been done already. */
204void workq_destroy(struct work_queue *workq)
205{
[63e27ef]206 assert(!workq_corrupted(workq));
[a35b458]207
[8a64e81e]208 irq_spinlock_lock(&workq->lock, true);
209 bool stopped = workq->stopping;
[04552324]210#ifdef CONFIG_DEBUG
[8a64e81e]211 size_t running_workers = workq->cur_worker_cnt;
[04552324]212#endif
[8a64e81e]213 irq_spinlock_unlock(&workq->lock, true);
[a35b458]214
[8a64e81e]215 if (!stopped) {
216 workq_stop(workq);
217 } else {
[63e27ef]218 assert(0 == running_workers);
[8a64e81e]219 }
[a35b458]220
[0d56712]221#ifdef CONFIG_DEBUG
222 workq->cookie = 0;
[1b20da0]223#endif
[a35b458]224
[8a64e81e]225 free(workq);
226}
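/*
 * Lifecycle sketch for a dedicated queue (illustrative only; "kexample",
 * example_item and example_func are hypothetical):
 *
 *	struct work_queue *wq = workq_create("kexample");
 *	if (wq) {
 *		workq_enqueue(wq, &example_item, example_func);
 *		...
 *		// workq_destroy() stops the queue first if workq_stop()
 *		// has not been called yet.
 *		workq_destroy(wq);
 *	}
 */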
227
228/** Initializes workq structure without creating any workers. */
229static void workq_preinit(struct work_queue *workq, const char *name)
230{
[0d56712]231#ifdef CONFIG_DEBUG
232 workq->cookie = WORKQ_MAGIC;
[1b20da0]233#endif
[a35b458]234
[8a64e81e]235 irq_spinlock_initialize(&workq->lock, name);
236 condvar_initialize(&workq->activate_worker);
[a35b458]237
[8a64e81e]238 list_initialize(&workq->queue);
239 list_initialize(&workq->workers);
[a35b458]240
[8a64e81e]241 workq->item_cnt = 0;
242 workq->stopping = false;
243 workq->name = name;
[a35b458]244
[8a64e81e]245 workq->cur_worker_cnt = 1;
246 workq->idle_worker_cnt = 0;
247 workq->activate_pending = 0;
248 workq->blocked_worker_cnt = 0;
[a35b458]249
[8a64e81e]250 workq->pending_op_cnt = 0;
251 link_initialize(&workq->nb_link);
252}
253
[1b20da0]254/** Initializes a work queue. Returns true if successful.
255 *
[8a64e81e]256 * Before destroying a work queue it must be stopped via
257 * workq_stop().
258 */
[89ea2dc]259bool workq_init(struct work_queue *workq, const char *name)
[8a64e81e]260{
261 workq_preinit(workq, name);
262 return add_worker(workq);
263}
264
265/** Add a new worker thread. Returns false if the thread could not be created. */
266static bool add_worker(struct work_queue *workq)
267{
[63e27ef]268 assert(!workq_corrupted(workq));
[0d56712]269
[1b20da0]270 thread_t *thread = thread_create(worker_thread, workq, TASK,
[8a64e81e]271 THREAD_FLAG_NONE, workq->name);
[a35b458]272
[8a64e81e]273 if (!thread) {
274 irq_spinlock_lock(&workq->lock, true);
[a35b458]275
[8a64e81e]276 /* cur_worker_cnt proactively increased in signal_worker_logic(). */
[63e27ef]277 assert(0 < workq->cur_worker_cnt);
[8a64e81e]278 --workq->cur_worker_cnt;
[a35b458]279
[8a64e81e]280 irq_spinlock_unlock(&workq->lock, true);
281 return false;
282 }
[a35b458]283
[8a64e81e]284 /* Respect lock ordering. */
285 irq_spinlock_lock(&thread->lock, true);
286 irq_spinlock_lock(&workq->lock, false);
287
288 bool success;
289
290 if (!workq->stopping) {
291 success = true;
[a35b458]292
[8a64e81e]293 /* Try to distribute workers among cpus right away. */
294 unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;
[a35b458]295
[8a64e81e]296 if (!cpus[cpu_id].active)
297 cpu_id = CPU->id;
298
[1b20da0]299 thread->workq = workq;
[8a64e81e]300 thread->cpu = &cpus[cpu_id];
301 thread->workq_blocked = false;
302 thread->workq_idling = false;
303 link_initialize(&thread->workq_link);
304
305 list_append(&thread->workq_link, &workq->workers);
306 } else {
[1b20da0]307 /*
[8a64e81e]308 * Work queue is shutting down - we must not add the worker
309 * and we cannot destroy it without ready-ing it. Mark it
310 * interrupted so the worker exits right away without even
311 * touching workq.
312 */
313 success = false;
[a35b458]314
[8a64e81e]315 /* cur_worker_cnt proactively increased in signal_worker_logic(). */
[63e27ef]316 assert(0 < workq->cur_worker_cnt);
[8a64e81e]317 --workq->cur_worker_cnt;
318 }
[a35b458]319
[8a64e81e]320 irq_spinlock_unlock(&workq->lock, false);
321 irq_spinlock_unlock(&thread->lock, true);
322
323 if (!success) {
324 thread_interrupt(thread);
325 }
[a35b458]326
[8a64e81e]327 thread_ready(thread);
[a35b458]328
[8a64e81e]329 return success;
330}
331
[1b20da0]332/** Shuts down the work queue. Waits for all pending work items to complete.
[8a64e81e]333 *
[1b20da0]334 * workq_stop() may only be run once.
[8a64e81e]335 */
336void workq_stop(struct work_queue *workq)
337{
[63e27ef]338 assert(!workq_corrupted(workq));
[a35b458]339
[8a64e81e]340 interrupt_workers(workq);
341 wait_for_workers(workq);
342}
343
344/** Notifies worker threads the work queue is shutting down. */
345static void interrupt_workers(struct work_queue *workq)
346{
347 irq_spinlock_lock(&workq->lock, true);
348
349 /* workq_stop() may only be called once. */
[63e27ef]350 assert(!workq->stopping);
[8a64e81e]351 workq->stopping = true;
[a35b458]352
[8a64e81e]353 /* Respect lock ordering - do not hold workq->lock during broadcast. */
354 irq_spinlock_unlock(&workq->lock, true);
[a35b458]355
[8a64e81e]356 condvar_broadcast(&workq->activate_worker);
357}
358
359/** Waits for all worker threads to exit. */
360static void wait_for_workers(struct work_queue *workq)
361{
[63e27ef]362 assert(!PREEMPTION_DISABLED);
[a35b458]363
[8a64e81e]364 irq_spinlock_lock(&workq->lock, true);
[a35b458]365
[8a64e81e]366 list_foreach_safe(workq->workers, cur_worker, next_worker) {
367 thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
368 list_remove(cur_worker);
369
370 /* Wait without the lock. */
371 irq_spinlock_unlock(&workq->lock, true);
[a35b458]372
[8a64e81e]373 thread_join(worker);
374 thread_detach(worker);
[a35b458]375
[8a64e81e]376 irq_spinlock_lock(&workq->lock, true);
377 }
[a35b458]378
[63e27ef]379 assert(list_empty(&workq->workers));
[a35b458]380
[8a64e81e]381 /* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
382 while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
383 irq_spinlock_unlock(&workq->lock, true);
[a35b458]384
[8a64e81e]385 scheduler();
[a35b458]386
[8a64e81e]387 irq_spinlock_lock(&workq->lock, true);
388 }
[a35b458]389
[8a64e81e]390 irq_spinlock_unlock(&workq->lock, true);
391}
392
[1b20da0]393/** Queues a function into the global work queue without blocking.
394 *
[8a64e81e]395 * See workq_enqueue_noblock() for more details.
396 */
[89ea2dc]397bool workq_global_enqueue_noblock(work_t *work_item, work_func_t func)
[8a64e81e]398{
399 return workq_enqueue_noblock(&g_work_queue, work_item, func);
400}
401
[1b20da0]402/** Queues a function into the global work queue; may block.
403 *
[8a64e81e]404 * See workq_enqueue() for more details.
405 */
[89ea2dc]406bool workq_global_enqueue(work_t *work_item, work_func_t func)
[8a64e81e]407{
408 return workq_enqueue(&g_work_queue, work_item, func);
409}
410
[1b20da0]411/** Adds a function to be invoked in a separate thread without blocking.
412 *
413 * workq_enqueue_noblock() is guaranteed not to block. It is safe
[8a64e81e]414 * to invoke from interrupt handlers.
[1b20da0]415 *
[8a64e81e]416 * Consider using workq_enqueue() instead if at all possible. Otherwise,
[1b20da0]417 * your work item may have to wait for previously enqueued sleeping
[8a64e81e]418 * work items to complete if you are unlucky.
[1b20da0]419 *
[8a64e81e]420 * @param workq Work queue where to queue the work item.
421 * @param work_item Work item bookkeeping structure. Must be valid
422 * until func() is entered.
423 * @param func User supplied function to invoke in a worker thread.
[a35b458]424 *
[1b20da0]425 * @return false if work queue is shutting down; function is not
426 * queued for further processing.
[8a64e81e]427 * @return true otherwise. func() will be invoked in a separate thread.
428 */
[1b20da0]429bool workq_enqueue_noblock(struct work_queue *workq, work_t *work_item,
[8a64e81e]430 work_func_t func)
431{
432 return _workq_enqueue(workq, work_item, func, false);
433}
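/*
 * Sketch of use from an interrupt handler (dev_workq, bh_work and
 * bottom_half() are hypothetical; the work item must remain valid until
 * bottom_half() is entered):
 *
 *	static work_t bh_work;
 *
 *	static void bottom_half(work_t *work_item)
 *	{
 *		// Deferred processing; may sleep here, unlike in the handler.
 *	}
 *
 *	// In the interrupt handler:
 *	workq_enqueue_noblock(&dev_workq, &bh_work, bottom_half);
 */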
434
[1b20da0]435/** Adds a function to be invoked in a separate thread; may block.
436 *
437 * While the workq_enqueue() is unlikely to block, it may do so if too
[8a64e81e]438 * many previous work items blocked sleeping.
[1b20da0]439 *
[8a64e81e]440 * @param workq Work queue where to queue the work item.
441 * @param work_item Work item bookkeeping structure. Must be valid
442 * until func() is entered.
443 * @param func User supplied function to invoke in a worker thread.
[a35b458]444 *
[1b20da0]445 * @return false if work queue is shutting down; function is not
446 * queued for further processing.
[8a64e81e]447 * @return true otherwise. func() will be invoked in a separate thread.
448 */
[89ea2dc]449bool workq_enqueue(struct work_queue *workq, work_t *work_item, work_func_t func)
[8a64e81e]450{
451 return _workq_enqueue(workq, work_item, func, true);
452}
453
454/** Adds a work item that will be processed by a separate worker thread.
[1b20da0]455 *
456 * func() will be invoked in another kernel thread and may block.
457 *
[8a64e81e]458 * Prefer to call _workq_enqueue() with can_block set. Otherwise
459 * your work item may have to wait for sleeping work items to complete.
 460 * If all worker threads are blocked/sleeping, a new worker thread cannot
 461 * be created without can_block set because creating a thread might
462 * block due to low memory conditions.
[1b20da0]463 *
[8a64e81e]464 * @param workq Work queue where to queue the work item.
465 * @param work_item Work item bookkeeping structure. Must be valid
466 * until func() is entered.
467 * @param func User supplied function to invoke in a worker thread.
468 * @param can_block May adding this work item block?
[a35b458]469 *
[1b20da0]470 * @return false if work queue is shutting down; function is not
471 * queued for further processing.
[8a64e81e]472 * @return true otherwise.
473 */
[1b20da0]474static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
[8a64e81e]475 work_func_t func, bool can_block)
476{
[63e27ef]477 assert(!workq_corrupted(workq));
[a35b458]478
[8a64e81e]479 bool success = true;
480 signal_op_t signal_op = NULL;
[a35b458]481
[8a64e81e]482 irq_spinlock_lock(&workq->lock, true);
[a35b458]483
[8a64e81e]484 if (workq->stopping) {
485 success = false;
486 } else {
[0d56712]487 init_work_item(work_item, func);
[8a64e81e]488 list_append(&work_item->queue_link, &workq->queue);
489 ++workq->item_cnt;
490 success = true;
[a35b458]491
[8a64e81e]492 if (!booting) {
493 signal_op = signal_worker_logic(workq, can_block);
494 } else {
[1b20da0]495 /*
496 * During boot there are no workers to signal. Just queue
[8a64e81e]497 * the work and let future workers take care of it.
498 */
499 }
500 }
[a35b458]501
[8a64e81e]502 irq_spinlock_unlock(&workq->lock, true);
503
504 if (signal_op) {
505 signal_op(workq);
506 }
[a35b458]507
[8a64e81e]508 return success;
509}
510
[0d56712]511/** Prepare an item to be added to the work item queue. */
512static void init_work_item(work_t *work_item, work_func_t func)
513{
514#ifdef CONFIG_DEBUG
515 work_item->cookie = WORK_ITEM_MAGIC;
[1b20da0]516#endif
[a35b458]517
[0d56712]518 link_initialize(&work_item->queue_link);
519 work_item->func = func;
520}
521
[8a64e81e]522/** Returns the number of workers running work func() that are not blocked. */
523static size_t active_workers_now(struct work_queue *workq)
524{
[63e27ef]525 assert(irq_spinlock_locked(&workq->lock));
[a35b458]526
[8a64e81e]527 /* Blocked workers are sleeping in the work function (ie not idle). */
[63e27ef]528 assert(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
[8a64e81e]529 /* Idle workers are waiting for more work to arrive in condvar_wait. */
[63e27ef]530 assert(workq->idle_worker_cnt <= workq->cur_worker_cnt);
[a35b458]531
[8a64e81e]532 /* Idle + blocked workers == sleeping worker threads. */
533 size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;
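	/*
	 * Put differently, cur_worker_cnt == running right now + idle + blocked.
	 * E.g. with 10 workers in total, 3 idle and 4 blocked, exactly 3 workers
	 * are running their work func() at this moment.
	 */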
[a35b458]534
[63e27ef]535 assert(sleeping_workers <= workq->cur_worker_cnt);
[8a64e81e]536 /* Workers pending activation are idle workers not yet given a time slice. */
[63e27ef]537 assert(workq->activate_pending <= workq->idle_worker_cnt);
[a35b458]538
[1b20da0]539 /*
 540 * Workers actively running the work func() this very moment,
 541 * neither blocked nor idle. Exclude ->activate_pending workers
542 * since they will run their work func() once they get a time slice
[8a64e81e]543 * and are not running it right now.
544 */
545 return workq->cur_worker_cnt - sleeping_workers;
546}
547
[1b20da0]548/**
549 * Returns the number of workers that are running or are about to run work
550 * func() and that are not blocked.
[8a64e81e]551 */
552static size_t active_workers(struct work_queue *workq)
553{
[63e27ef]554 assert(irq_spinlock_locked(&workq->lock));
[a35b458]555
[1b20da0]556 /*
 557 * Workers actively running the work func(), neither blocked nor
[8a64e81e]558 * idle. ->activate_pending workers will run their work func() once they
559 * get a time slice after waking from a condvar wait, so count them
560 * as well.
561 */
562 return active_workers_now(workq) + workq->activate_pending;
563}
564
565static void add_worker_noblock_op(struct work_queue *workq)
566{
567 condvar_signal(&nonblock_adder.req_cv);
568}
569
570static void add_worker_op(struct work_queue *workq)
571{
572 add_worker(workq);
573}
574
575static void signal_worker_op(struct work_queue *workq)
576{
[63e27ef]577 assert(!workq_corrupted(workq));
[0d56712]578
[8a64e81e]579 condvar_signal(&workq->activate_worker);
[a35b458]580
[8a64e81e]581 irq_spinlock_lock(&workq->lock, true);
[63e27ef]582 assert(0 < workq->pending_op_cnt);
[8a64e81e]583 --workq->pending_op_cnt;
584 irq_spinlock_unlock(&workq->lock, true);
585}
586
587/** Determines how to signal workers if at all.
[1b20da0]588 *
[8a64e81e]589 * @param workq Work queue where a new work item was queued.
[1b20da0]590 * @param can_block True if we may block while signaling a worker or creating
[8a64e81e]591 * a new worker.
[1b20da0]592 *
[8a64e81e]593 * @return Function that will notify workers or NULL if no action is needed.
594 */
595static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block)
596{
[63e27ef]597 assert(!workq_corrupted(workq));
598 assert(irq_spinlock_locked(&workq->lock));
[a35b458]599
[8a64e81e]600 /* Only signal workers if really necessary. */
601 signal_op_t signal_op = NULL;
602
[1b20da0]603 /*
604 * Workers actively running the work func() and neither blocked nor idle.
605 * Including ->activate_pending workers that will run their work func()
[8a64e81e]606 * once they get a time slice.
607 */
608 size_t active = active_workers(workq);
609 /* Max total allowed number of work items queued for active workers. */
610 size_t max_load = active * max_items_per_worker;
611
612 /* Active workers are getting overwhelmed - activate another. */
613 if (max_load < workq->item_cnt) {
614
[1b20da0]615 size_t remaining_idle =
[8a64e81e]616 workq->idle_worker_cnt - workq->activate_pending;
617
618 /* Idle workers still exist - activate one. */
619 if (remaining_idle > 0) {
[1b20da0]620 /*
[8a64e81e]621 * Directly changing idle_worker_cnt here would not allow
[1b20da0]622 * workers to recognize spurious wake-ups. Change
[8a64e81e]623 * activate_pending instead.
624 */
625 ++workq->activate_pending;
626 ++workq->pending_op_cnt;
627 signal_op = signal_worker_op;
628 } else {
629 /* No idle workers remain. Request that a new one be created. */
630 bool need_worker = (active < max_concurrent_workers)
631 && (workq->cur_worker_cnt < max_worker_cnt);
[a35b458]632
[8a64e81e]633 if (need_worker && can_block) {
634 signal_op = add_worker_op;
[1b20da0]635 /*
[8a64e81e]636 * It may take some time to actually create the worker.
637 * We don't want to swamp the thread pool with superfluous
638 * worker creation requests so pretend it was already
639 * created and proactively increase the worker count.
640 */
641 ++workq->cur_worker_cnt;
642 }
[a35b458]643
[1b20da0]644 /*
[8a64e81e]645 * We cannot create a new worker but we need one desperately
646 * because all workers are blocked in their work functions.
647 */
648 if (need_worker && !can_block && 0 == active) {
[63e27ef]649 assert(0 == workq->idle_worker_cnt);
[a35b458]650
[8a64e81e]651 irq_spinlock_lock(&nonblock_adder.lock, true);
652
653 if (nonblock_adder.thread && !link_used(&workq->nb_link)) {
654 signal_op = add_worker_noblock_op;
655 ++workq->cur_worker_cnt;
656 list_append(&workq->nb_link, &nonblock_adder.work_queues);
657 }
658
659 irq_spinlock_unlock(&nonblock_adder.lock, true);
660 }
661 }
662 } else {
[1b20da0]663 /*
664 * There are enough active/running workers to process the queue.
[8a64e81e]665 * No need to signal/activate any new workers.
666 */
667 signal_op = NULL;
668 }
[a35b458]669
[8a64e81e]670 return signal_op;
671}
672
673/** Executes queued work items. */
674static void worker_thread(void *arg)
675{
[1b20da0]676 /*
 677 * If the thread was created after the work queue was ordered to stop,
 678 * do not access the work queue and return immediately.
[8a64e81e]679 */
680 if (thread_interrupted(THREAD)) {
681 thread_detach(THREAD);
682 return;
683 }
[a35b458]684
[63e27ef]685 assert(arg != NULL);
[a35b458]686
[8a64e81e]687 struct work_queue *workq = arg;
688 work_t *work_item;
[a35b458]689
[8a64e81e]690 while (dequeue_work(workq, &work_item)) {
691 /* Copy the func field so func() can safely free work_item. */
692 work_func_t func = work_item->func;
[0d56712]693
[8a64e81e]694 func(work_item);
695 }
696}
697
 698/** Waits for and retrieves a work item. Returns false if the worker should exit. */
699static bool dequeue_work(struct work_queue *workq, work_t **pwork_item)
700{
[63e27ef]701 assert(!workq_corrupted(workq));
[a35b458]702
[8a64e81e]703 irq_spinlock_lock(&workq->lock, true);
[a35b458]704
[8a64e81e]705 /* Check if we should exit if load is low. */
706 if (!workq->stopping && worker_unnecessary(workq)) {
707 /* There are too many workers for this load. Exit. */
[63e27ef]708 assert(0 < workq->cur_worker_cnt);
[8a64e81e]709 --workq->cur_worker_cnt;
710 list_remove(&THREAD->workq_link);
711 irq_spinlock_unlock(&workq->lock, true);
[a35b458]712
[8a64e81e]713 thread_detach(THREAD);
714 return false;
715 }
[a35b458]716
[8a64e81e]717 bool stop = false;
[a35b458]718
[8a64e81e]719 /* Wait for work to arrive. */
720 while (list_empty(&workq->queue) && !workq->stopping) {
721 cv_wait(workq);
[a35b458]722
[8a64e81e]723 if (0 < workq->activate_pending)
724 --workq->activate_pending;
725 }
726
727 /* Process remaining work even if requested to stop. */
728 if (!list_empty(&workq->queue)) {
729 link_t *work_link = list_first(&workq->queue);
730 *pwork_item = list_get_instance(work_link, work_t, queue_link);
[a35b458]731
[0d56712]732#ifdef CONFIG_DEBUG
[63e27ef]733 assert(!work_item_corrupted(*pwork_item));
[0d56712]734 (*pwork_item)->cookie = 0;
735#endif
[8a64e81e]736 list_remove(work_link);
737 --workq->item_cnt;
[a35b458]738
[8a64e81e]739 stop = false;
740 } else {
741 /* Requested to stop and no more work queued. */
[63e27ef]742 assert(workq->stopping);
[8a64e81e]743 --workq->cur_worker_cnt;
744 stop = true;
745 }
[a35b458]746
[8a64e81e]747 irq_spinlock_unlock(&workq->lock, true);
[a35b458]748
[8a64e81e]749 return !stop;
750}
751
 752/** Returns true if there are too many workers for the given load. */
753static bool worker_unnecessary(struct work_queue *workq)
754{
[63e27ef]755 assert(irq_spinlock_locked(&workq->lock));
[a35b458]756
[8a64e81e]757 /* No work is pending. We don't need too many idle threads. */
758 if (list_empty(&workq->queue)) {
759 /* There are too many idle workers. Exit. */
760 return (min_worker_cnt <= workq->idle_worker_cnt);
761 } else {
[1b20da0]762 /*
[8a64e81e]763 * There is work but we are swamped with too many active workers
764 * that were woken up from sleep at around the same time. We
765 * don't need another worker fighting for cpu time.
766 */
767 size_t active = active_workers_now(workq);
768 return (max_concurrent_workers < active);
769 }
770}
771
 772/** Waits for a signal on activate_worker. The thread is marked idle while waiting. */
773static void cv_wait(struct work_queue *workq)
774{
775 ++workq->idle_worker_cnt;
776 THREAD->workq_idling = true;
[a35b458]777
[8a64e81e]778 /* Ignore lock ordering just here. */
[63e27ef]779 assert(irq_spinlock_locked(&workq->lock));
[a35b458]780
[8a64e81e]781 _condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
782 &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);
783
[63e27ef]784 assert(!workq_corrupted(workq));
785 assert(irq_spinlock_locked(&workq->lock));
[a35b458]786
[8a64e81e]787 THREAD->workq_idling = false;
788 --workq->idle_worker_cnt;
789}
790
791
792/** Invoked from thread_ready() right before the thread is woken up. */
793void workq_before_thread_is_ready(thread_t *thread)
794{
[63e27ef]795 assert(thread);
796 assert(irq_spinlock_locked(&thread->lock));
[8a64e81e]797
798 /* Worker's work func() is about to wake up from sleeping. */
799 if (thread->workq && thread->workq_blocked) {
800 /* Must be blocked in user work func() and not be waiting for work. */
[63e27ef]801 assert(!thread->workq_idling);
802 assert(thread->state == Sleeping);
803 assert(THREAD != thread);
804 assert(!workq_corrupted(thread->workq));
[a35b458]805
[8a64e81e]806 /* Protected by thread->lock */
807 thread->workq_blocked = false;
[a35b458]808
[8a64e81e]809 irq_spinlock_lock(&thread->workq->lock, true);
810 --thread->workq->blocked_worker_cnt;
811 irq_spinlock_unlock(&thread->workq->lock, true);
812 }
813}
814
815/** Invoked from scheduler() before switching away from a thread. */
816void workq_after_thread_ran(void)
817{
[63e27ef]818 assert(THREAD);
819 assert(irq_spinlock_locked(&THREAD->lock));
[8a64e81e]820
821 /* Worker's work func() is about to sleep/block. */
822 if (THREAD->workq && THREAD->state == Sleeping && !THREAD->workq_idling) {
[63e27ef]823 assert(!THREAD->workq_blocked);
824 assert(!workq_corrupted(THREAD->workq));
[a35b458]825
[8a64e81e]826 THREAD->workq_blocked = true;
[a35b458]827
[8a64e81e]828 irq_spinlock_lock(&THREAD->workq->lock, false);
829
830 ++THREAD->workq->blocked_worker_cnt;
[a35b458]831
[8a64e81e]832 bool can_block = false;
833 signal_op_t op = signal_worker_logic(THREAD->workq, can_block);
[a35b458]834
[8a64e81e]835 irq_spinlock_unlock(&THREAD->workq->lock, false);
[a35b458]836
[8a64e81e]837 if (op) {
[63e27ef]838 assert(add_worker_noblock_op == op || signal_worker_op == op);
[8a64e81e]839 op(THREAD->workq);
840 }
841 }
842}
843
844/** Prints stats of the work queue to the kernel console. */
845void workq_print_info(struct work_queue *workq)
846{
847 irq_spinlock_lock(&workq->lock, true);
848
849 size_t total = workq->cur_worker_cnt;
850 size_t blocked = workq->blocked_worker_cnt;
851 size_t idle = workq->idle_worker_cnt;
852 size_t active = active_workers(workq);
853 size_t items = workq->item_cnt;
854 bool stopping = workq->stopping;
855 bool worker_surplus = worker_unnecessary(workq);
[1b20da0]856 const char *load_str = worker_surplus ? "decreasing" :
[8a64e81e]857 (0 < workq->activate_pending) ? "increasing" : "stable";
[a35b458]858
[8a64e81e]859 irq_spinlock_unlock(&workq->lock, true);
[a35b458]860
[8a64e81e]861 printf(
[0d56712]862 "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
863 " max_concurrent_workers=%zu, max_items_per_worker=%zu\n"
[8a64e81e]864 "Workers: %zu\n"
865 "Active: %zu (workers currently processing work)\n"
866 "Blocked: %zu (work functions sleeping/blocked)\n"
867 "Idle: %zu (idle workers waiting for more work)\n"
868 "Items: %zu (queued not yet dispatched work)\n"
869 "Stopping: %d\n"
870 "Load: %s\n",
[1b20da0]871 max_worker_cnt, min_worker_cnt,
[8a64e81e]872 max_concurrent_workers, max_items_per_worker,
873 total,
874 active,
875 blocked,
876 idle,
877 items,
878 stopping,
879 load_str
880 );
881}
882
883/** Prints stats of the global work queue. */
884void workq_global_print_info(void)
885{
886 workq_print_info(&g_work_queue);
887}
888
889
890static bool dequeue_add_req(nonblock_adder_t *info, struct work_queue **pworkq)
891{
892 bool stop = false;
893
894 irq_spinlock_lock(&info->lock, true);
[a35b458]895
[8a64e81e]896 while (list_empty(&info->work_queues) && !stop) {
[1b20da0]897 errno_t ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
[8a64e81e]898 &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
[a35b458]899
[897fd8f1]900 stop = (ret == EINTR);
[8a64e81e]901 }
[a35b458]902
[8a64e81e]903 if (!stop) {
[1b20da0]904 *pworkq = list_get_instance(list_first(&info->work_queues),
[8a64e81e]905 struct work_queue, nb_link);
[0d56712]906
[63e27ef]907 assert(!workq_corrupted(*pworkq));
[a35b458]908
[8a64e81e]909 list_remove(&(*pworkq)->nb_link);
910 }
[a35b458]911
[8a64e81e]912 irq_spinlock_unlock(&info->lock, true);
[a35b458]913
[8a64e81e]914 return !stop;
915}
916
917static void thr_nonblock_add_worker(void *arg)
918{
919 nonblock_adder_t *info = arg;
920 struct work_queue *workq;
[a35b458]921
[8a64e81e]922 while (dequeue_add_req(info, &workq)) {
923 add_worker(workq);
924 }
925}
926
927
928static void nonblock_init(void)
929{
[0d56712]930 irq_spinlock_initialize(&nonblock_adder.lock, "kworkq-nb.lock");
[8a64e81e]931 condvar_initialize(&nonblock_adder.req_cv);
932 list_initialize(&nonblock_adder.work_queues);
[a35b458]933
[1b20da0]934 nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
[0d56712]935 &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");
[a35b458]936
[8a64e81e]937 if (nonblock_adder.thread) {
938 thread_ready(nonblock_adder.thread);
939 } else {
[1b20da0]940 /*
[8a64e81e]941 * We won't be able to add workers without blocking if all workers
942 * sleep, but at least boot the system.
943 */
[0d56712]944 printf("Failed to create kworkq-nb. Sleeping work may stall the workq.\n");
[8a64e81e]945 }
946}
947
[04552324]948#ifdef CONFIG_DEBUG
[1b20da0]949/** Returns true if the workq is definitely corrupted; false if not sure.
950 *
[0d56712]951 * Can be used outside of any locks.
952 */
953static bool workq_corrupted(struct work_queue *workq)
954{
[1b20da0]955 /*
[0d56712]956 * Needed to make the most current cookie value set by workq_preinit()
957 * visible even if we access the workq right after it is created but
958 * on a different cpu. Otherwise, workq_corrupted() would not work
959 * outside a lock.
960 */
961 memory_barrier();
962 return NULL == workq || workq->cookie != WORKQ_MAGIC;
963}
[8a64e81e]964
[1b20da0]965/** Returns true if the work_item is definitely corrupted; false if not sure.
966 *
[0d56712]967 * Must be used with the work queue protecting spinlock locked.
968 */
969static bool work_item_corrupted(work_t *work_item)
970{
971 return NULL == work_item || work_item->cookie != WORK_ITEM_MAGIC;
972}
[04552324]973#endif
[9f8745c5]974
975/** @}
976 */