source: mainline/kernel/generic/src/synch/workqueue.c@ cc74cb5

/*
 * Copyright (c) 2012 Adam Hraska
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 * - The name of the author may not be used to endorse or promote products
 *   derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/** @addtogroup generic
 * @{
 */

/**
 * @file
 * @brief Work queue/thread pool that automatically adjusts its size
 * depending on the current load. Queued work functions may sleep.
 */
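/*
 * Illustrative usage sketch. The cleanup_job_t type and cleanup_fn() below
 * are hypothetical example names; only work_t, work_func_t and
 * workq_global_enqueue() come from this API. The caller embeds a work_t
 * (here as the first member, so a plain cast recovers the enclosing
 * structure) and the work function may sleep and may free the item:
 *
 *   typedef struct {
 *       work_t work;      // must stay valid until cleanup_fn() is entered
 *       void *payload;
 *   } cleanup_job_t;
 *
 *   static void cleanup_fn(work_t *work_item)
 *   {
 *       cleanup_job_t *job = (cleanup_job_t *) work_item;
 *       // ... possibly blocking cleanup of job->payload ...
 *       free(job);
 *   }
 *
 *   // In thread context (use workq_global_enqueue_noblock() from
 *   // interrupt handlers instead):
 *   //   cleanup_job_t *job = malloc(sizeof(cleanup_job_t));
 *   //   ... fill in job->payload ...
 *   //   workq_global_enqueue(&job->work, cleanup_fn);
 */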

#include <assert.h>
#include <errno.h>
#include <synch/workqueue.h>
#include <synch/spinlock.h>
#include <synch/condvar.h>
#include <synch/mutex.h>
#include <proc/thread.h>
#include <config.h>
#include <arch.h>
#include <cpu.h>
#include <macros.h>

#define WORKQ_MAGIC 0xf00c1333U
#define WORK_ITEM_MAGIC 0xfeec1777U

struct work_queue {
	/*
	 * Protects everything except activate_worker.
	 * Must be acquired after any thread->locks.
	 */
	IRQ_SPINLOCK_DECLARE(lock);

	/* Activates a worker if new work arrives or if shutting down the queue. */
	condvar_t activate_worker;

	/* Queue of work_items ready to be dispatched. */
	list_t queue;

	/* List of worker threads. */
	list_t workers;

	/* Number of work items queued. */
	size_t item_cnt;

	/* Indicates the work queue is shutting down. */
	bool stopping;
	const char *name;

	/* Total number of created worker threads. */
	size_t cur_worker_cnt;
	/* Number of workers waiting for work to arrive. */
	size_t idle_worker_cnt;
	/* Number of idle workers signaled that have not yet been woken up. */
	size_t activate_pending;
	/* Number of blocked workers sleeping in work func() (ie not idle). */
	size_t blocked_worker_cnt;

	/* Number of pending signal_worker_op() operations. */
	size_t pending_op_cnt;

	link_t nb_link;

#ifdef CONFIG_DEBUG
	/* Magic cookie for integrity checks. Immutable. Accessed without lock. */
	uint32_t cookie;
#endif
};

/** Min number of idle workers to keep. */
static size_t min_worker_cnt;
/** Max total number of workers - be it blocked, idle, or active. */
static size_t max_worker_cnt;
/** Max number of concurrently running active workers, ie not blocked nor idle. */
static size_t max_concurrent_workers;
/** Max number of work items per active worker before a new worker is activated. */
static const size_t max_items_per_worker = 8;

/** System wide work queue. */
static struct work_queue g_work_queue;

static int booting = true;

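/*
 * Bookkeeping for the dedicated "kworkq-nb" thread that creates workers on
 * behalf of enqueuers that must not block: signal_worker_logic() links a
 * starving work queue into work_queues and returns add_worker_noblock_op(),
 * which merely signals req_cv; thr_nonblock_add_worker() then performs the
 * potentially blocking add_worker() call from its own thread context.
 */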
typedef struct {
	IRQ_SPINLOCK_DECLARE(lock);
	condvar_t req_cv;
	thread_t *thread;
	list_t work_queues;
} nonblock_adder_t;

static nonblock_adder_t nonblock_adder;

/** Typedef a worker thread signaling operation prototype. */
typedef void (*signal_op_t)(struct work_queue *workq);

/* Fwd decl. */
static void workq_preinit(struct work_queue *workq, const char *name);
static bool add_worker(struct work_queue *workq);
static void interrupt_workers(struct work_queue *workq);
static void wait_for_workers(struct work_queue *workq);
static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
    work_func_t func, bool can_block);
static void init_work_item(work_t *work_item, work_func_t func);
static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block);
static void worker_thread(void *arg);
static bool dequeue_work(struct work_queue *workq, work_t **pwork_item);
static bool worker_unnecessary(struct work_queue *workq);
static void cv_wait(struct work_queue *workq);
static void nonblock_init(void);

#ifdef CONFIG_DEBUG
static bool workq_corrupted(struct work_queue *workq);
static bool work_item_corrupted(work_t *work_item);
#endif

/** Creates worker thread for the system-wide worker queue. */
void workq_global_worker_init(void)
{
	/*
	 * No need for additional synchronization. Stores to word-sized
	 * variables are atomic and the change will eventually propagate.
	 * Moreover add_worker() includes the necessary memory barriers
	 * in spinlock lock/unlock().
	 */
	booting = false;

	nonblock_init();

	if (!add_worker(&g_work_queue))
		panic("Could not create a single global work queue worker!\n");

}

/** Initializes the system wide work queue and support for other work queues. */
void workq_global_init(void)
{
	/* Keep idle workers on 1/4-th of cpus, but at least 2 threads. */
	min_worker_cnt = max(2, config.cpu_count / 4);
	/* Allow max 8 sleeping work items per cpu. */
	max_worker_cnt = max(32, 8 * config.cpu_count);
	/* Maximum concurrency without slowing down the system. */
	max_concurrent_workers = max(2, config.cpu_count);

	workq_preinit(&g_work_queue, "kworkq");
}

/** Stops the system global work queue and waits for all work items to complete. */
void workq_global_stop(void)
{
	workq_stop(&g_work_queue);
}

/** Creates and initializes a work queue. Returns NULL upon failure. */
struct work_queue *workq_create(const char *name)
{
	struct work_queue *workq = malloc(sizeof(struct work_queue));
	if (!workq)
		return NULL;

	if (workq_init(workq, name)) {
		assert(!workq_corrupted(workq));
		return workq;
	}

	free(workq);
	return NULL;
}

/** Frees work queue resources and stops it if that has not been done already. */
void workq_destroy(struct work_queue *workq)
{
	assert(!workq_corrupted(workq));

	irq_spinlock_lock(&workq->lock, true);
	bool stopped = workq->stopping;
#ifdef CONFIG_DEBUG
	size_t running_workers = workq->cur_worker_cnt;
#endif
	irq_spinlock_unlock(&workq->lock, true);

	if (!stopped) {
		workq_stop(workq);
	} else {
		assert(0 == running_workers);
	}

#ifdef CONFIG_DEBUG
	workq->cookie = 0;
#endif

	free(workq);
}

/** Initializes workq structure without creating any workers. */
static void workq_preinit(struct work_queue *workq, const char *name)
{
#ifdef CONFIG_DEBUG
	workq->cookie = WORKQ_MAGIC;
#endif

	irq_spinlock_initialize(&workq->lock, name);
	condvar_initialize(&workq->activate_worker);

	list_initialize(&workq->queue);
	list_initialize(&workq->workers);

	workq->item_cnt = 0;
	workq->stopping = false;
	workq->name = name;

	workq->cur_worker_cnt = 1;
	workq->idle_worker_cnt = 0;
	workq->activate_pending = 0;
	workq->blocked_worker_cnt = 0;

	workq->pending_op_cnt = 0;
	link_initialize(&workq->nb_link);
}

/** Initializes a work queue. Returns true if successful.
 *
 * Before destroying a work queue it must be stopped via
 * workq_stop().
 */
bool workq_init(struct work_queue *workq, const char *name)
{
	workq_preinit(workq, name);
	return add_worker(workq);
}

/** Add a new worker thread. Returns false if the thread could not be created. */
static bool add_worker(struct work_queue *workq)
{
	assert(!workq_corrupted(workq));

	thread_t *thread = thread_create(worker_thread, workq, TASK,
	    THREAD_FLAG_NONE, workq->name);

	if (!thread) {
		irq_spinlock_lock(&workq->lock, true);

		/* cur_worker_cnt proactively increased in signal_worker_logic(). */
		assert(0 < workq->cur_worker_cnt);
		--workq->cur_worker_cnt;

		irq_spinlock_unlock(&workq->lock, true);
		return false;
	}

	/* Respect lock ordering. */
	irq_spinlock_lock(&thread->lock, true);
	irq_spinlock_lock(&workq->lock, false);

	bool success;

	if (!workq->stopping) {
		success = true;

		/* Try to distribute workers among cpus right away. */
		unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;

		if (!cpus[cpu_id].active)
			cpu_id = CPU->id;

		thread->workq = workq;
		thread->cpu = &cpus[cpu_id];
		thread->workq_blocked = false;
		thread->workq_idling = false;
		link_initialize(&thread->workq_link);

		list_append(&thread->workq_link, &workq->workers);
	} else {
		/*
		 * Work queue is shutting down - we must not add the worker
		 * and we cannot destroy it without ready-ing it. Mark it
		 * interrupted so the worker exits right away without even
		 * touching workq.
		 */
		success = false;

		/* cur_worker_cnt proactively increased in signal_worker_logic(). */
		assert(0 < workq->cur_worker_cnt);
		--workq->cur_worker_cnt;
	}

	irq_spinlock_unlock(&workq->lock, false);
	irq_spinlock_unlock(&thread->lock, true);

	if (!success) {
		thread_interrupt(thread);
	}

	thread_ready(thread);

	return success;
}

/** Shuts down the work queue. Waits for all pending work items to complete.
 *
 * workq_stop() may only be run once.
 */
void workq_stop(struct work_queue *workq)
{
	assert(!workq_corrupted(workq));

	interrupt_workers(workq);
	wait_for_workers(workq);
}

/** Notifies worker threads the work queue is shutting down. */
static void interrupt_workers(struct work_queue *workq)
{
	irq_spinlock_lock(&workq->lock, true);

	/* workq_stop() may only be called once. */
	assert(!workq->stopping);
	workq->stopping = true;

	/* Respect lock ordering - do not hold workq->lock during broadcast. */
	irq_spinlock_unlock(&workq->lock, true);

	condvar_broadcast(&workq->activate_worker);
}

/** Waits for all worker threads to exit. */
static void wait_for_workers(struct work_queue *workq)
{
	assert(!PREEMPTION_DISABLED);

	irq_spinlock_lock(&workq->lock, true);

	list_foreach_safe(workq->workers, cur_worker, next_worker) {
		thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
		list_remove(cur_worker);

		/* Wait without the lock. */
		irq_spinlock_unlock(&workq->lock, true);

		thread_join(worker);
		thread_detach(worker);

		irq_spinlock_lock(&workq->lock, true);
	}

	assert(list_empty(&workq->workers));

	/* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
	while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
		irq_spinlock_unlock(&workq->lock, true);

		scheduler();

		irq_spinlock_lock(&workq->lock, true);
	}

	irq_spinlock_unlock(&workq->lock, true);
}

/** Queues a function into the global work queue without blocking.
 *
 * See workq_enqueue_noblock() for more details.
 */
bool workq_global_enqueue_noblock(work_t *work_item, work_func_t func)
{
	return workq_enqueue_noblock(&g_work_queue, work_item, func);
}

/** Queues a function into the global work queue; may block.
 *
 * See workq_enqueue() for more details.
 */
bool workq_global_enqueue(work_t *work_item, work_func_t func)
{
	return workq_enqueue(&g_work_queue, work_item, func);
}

/** Adds a function to be invoked in a separate thread without blocking.
 *
 * workq_enqueue_noblock() is guaranteed not to block. It is safe
 * to invoke from interrupt handlers.
 *
 * Consider using workq_enqueue() instead if at all possible. Otherwise,
 * your work item may have to wait for previously enqueued sleeping
 * work items to complete if you are unlucky.
 *
 * @param workq      Work queue where to queue the work item.
 * @param work_item  Work item bookkeeping structure. Must be valid
 *                   until func() is entered.
 * @param func       User supplied function to invoke in a worker thread.
 *
 * @return false if work queue is shutting down; function is not
 *         queued for further processing.
 * @return true Otherwise. func() will be invoked in a separate thread.
 */
bool workq_enqueue_noblock(struct work_queue *workq, work_t *work_item,
    work_func_t func)
{
	return _workq_enqueue(workq, work_item, func, false);
}

/** Adds a function to be invoked in a separate thread; may block.
 *
 * While workq_enqueue() is unlikely to block, it may do so if too
 * many previous work items blocked sleeping.
 *
 * @param workq      Work queue where to queue the work item.
 * @param work_item  Work item bookkeeping structure. Must be valid
 *                   until func() is entered.
 * @param func       User supplied function to invoke in a worker thread.
 *
 * @return false if work queue is shutting down; function is not
 *         queued for further processing.
 * @return true Otherwise. func() will be invoked in a separate thread.
 */
bool workq_enqueue(struct work_queue *workq, work_t *work_item, work_func_t func)
{
	return _workq_enqueue(workq, work_item, func, true);
}

/** Adds a work item that will be processed by a separate worker thread.
 *
 * func() will be invoked in another kernel thread and may block.
 *
 * Prefer to call _workq_enqueue() with can_block set. Otherwise
 * your work item may have to wait for sleeping work items to complete.
 * If all worker threads are blocked/sleeping a new worker thread cannot
 * be created without can_block set because creating a thread might
 * block due to low memory conditions.
 *
 * @param workq      Work queue where to queue the work item.
 * @param work_item  Work item bookkeeping structure. Must be valid
 *                   until func() is entered.
 * @param func       User supplied function to invoke in a worker thread.
 * @param can_block  May adding this work item block?
 *
 * @return false if work queue is shutting down; function is not
 *         queued for further processing.
 * @return true Otherwise.
 */
static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
    work_func_t func, bool can_block)
{
	assert(!workq_corrupted(workq));

	bool success = true;
	signal_op_t signal_op = NULL;

	irq_spinlock_lock(&workq->lock, true);

	if (workq->stopping) {
		success = false;
	} else {
		init_work_item(work_item, func);
		list_append(&work_item->queue_link, &workq->queue);
		++workq->item_cnt;
		success = true;

		if (!booting) {
			signal_op = signal_worker_logic(workq, can_block);
		} else {
			/*
			 * During boot there are no workers to signal. Just queue
			 * the work and let future workers take care of it.
			 */
		}
	}

	irq_spinlock_unlock(&workq->lock, true);

	if (signal_op) {
		signal_op(workq);
	}

	return success;
}

/** Prepare an item to be added to the work item queue. */
static void init_work_item(work_t *work_item, work_func_t func)
{
#ifdef CONFIG_DEBUG
	work_item->cookie = WORK_ITEM_MAGIC;
#endif

	link_initialize(&work_item->queue_link);
	work_item->func = func;
}

/** Returns the number of workers running work func() that are not blocked. */
static size_t active_workers_now(struct work_queue *workq)
{
	assert(irq_spinlock_locked(&workq->lock));

	/* Workers blocked are sleeping in the work function (ie not idle). */
	assert(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
	/* Idle workers are waiting for more work to arrive in condvar_wait. */
	assert(workq->idle_worker_cnt <= workq->cur_worker_cnt);

	/* Idle + blocked workers == sleeping worker threads. */
	size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;

	assert(sleeping_workers <= workq->cur_worker_cnt);
	/* Workers pending activation are idle workers not yet given a time slice. */
	assert(workq->activate_pending <= workq->idle_worker_cnt);

	/*
	 * Workers actively running the work func() this very moment and
	 * are neither blocked nor idle. Exclude ->activate_pending workers
	 * since they will run their work func() once they get a time slice
	 * and are not running it right now.
	 */
	return workq->cur_worker_cnt - sleeping_workers;
}

/**
 * Returns the number of workers that are running or are about to run work
 * func() and that are not blocked.
 */
static size_t active_workers(struct work_queue *workq)
{
	assert(irq_spinlock_locked(&workq->lock));

	/*
	 * Workers actively running the work func() and are neither blocked nor
	 * idle. ->activate_pending workers will run their work func() once they
	 * get a time slice after waking from a condvar wait, so count them
	 * as well.
	 */
	return active_workers_now(workq) + workq->activate_pending;
}

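/*
 * The three signal_op_t implementations returned by signal_worker_logic():
 * add_worker_noblock_op() defers worker creation to the kworkq-nb thread,
 * add_worker_op() creates a worker directly (and may block), and
 * signal_worker_op() wakes one already existing idle worker.
 */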
static void add_worker_noblock_op(struct work_queue *workq)
{
	condvar_signal(&nonblock_adder.req_cv);
}

static void add_worker_op(struct work_queue *workq)
{
	add_worker(workq);
}

static void signal_worker_op(struct work_queue *workq)
{
	assert(!workq_corrupted(workq));

	condvar_signal(&workq->activate_worker);

	irq_spinlock_lock(&workq->lock, true);
	assert(0 < workq->pending_op_cnt);
	--workq->pending_op_cnt;
	irq_spinlock_unlock(&workq->lock, true);
}

/** Determines how to signal workers if at all.
 *
 * @param workq      Work queue where a new work item was queued.
 * @param can_block  True if we may block while signaling a worker or creating
 *                   a new worker.
 *
 * @return Function that will notify workers or NULL if no action is needed.
 */
static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block)
{
	assert(!workq_corrupted(workq));
	assert(irq_spinlock_locked(&workq->lock));

	/* Only signal workers if really necessary. */
	signal_op_t signal_op = NULL;

	/*
	 * Workers actively running the work func() and neither blocked nor idle.
	 * Including ->activate_pending workers that will run their work func()
	 * once they get a time slice.
	 */
	size_t active = active_workers(workq);
	/* Max total allowed number of work items queued for active workers. */
	size_t max_load = active * max_items_per_worker;

	/* Active workers are getting overwhelmed - activate another. */
	if (max_load < workq->item_cnt) {

		size_t remaining_idle =
		    workq->idle_worker_cnt - workq->activate_pending;

		/* Idle workers still exist - activate one. */
		if (remaining_idle > 0) {
			/*
			 * Directly changing idle_worker_cnt here would not allow
			 * workers to recognize spurious wake-ups. Change
			 * activate_pending instead.
			 */
			++workq->activate_pending;
			++workq->pending_op_cnt;
			signal_op = signal_worker_op;
		} else {
			/* No idle workers remain. Request that a new one be created. */
			bool need_worker = (active < max_concurrent_workers) &&
			    (workq->cur_worker_cnt < max_worker_cnt);

			if (need_worker && can_block) {
				signal_op = add_worker_op;
				/*
				 * It may take some time to actually create the worker.
				 * We don't want to swamp the thread pool with superfluous
				 * worker creation requests so pretend it was already
				 * created and proactively increase the worker count.
				 */
				++workq->cur_worker_cnt;
			}

			/*
			 * We cannot create a new worker but we need one desperately
			 * because all workers are blocked in their work functions.
			 */
			if (need_worker && !can_block && 0 == active) {
				assert(0 == workq->idle_worker_cnt);

				irq_spinlock_lock(&nonblock_adder.lock, true);

				if (nonblock_adder.thread && !link_used(&workq->nb_link)) {
					signal_op = add_worker_noblock_op;
					++workq->cur_worker_cnt;
					list_append(&workq->nb_link, &nonblock_adder.work_queues);
				}

				irq_spinlock_unlock(&nonblock_adder.lock, true);
			}
		}
	} else {
		/*
		 * There are enough active/running workers to process the queue.
		 * No need to signal/activate any new workers.
		 */
		signal_op = NULL;
	}

	return signal_op;
}

/** Executes queued work items. */
static void worker_thread(void *arg)
{
	/*
	 * If the thread was created after the work queue was ordered to stop,
	 * do not access the work queue and return immediately.
	 */
	if (thread_interrupted(THREAD)) {
		thread_detach(THREAD);
		return;
	}

	assert(arg != NULL);

	struct work_queue *workq = arg;
	work_t *work_item;

	while (dequeue_work(workq, &work_item)) {
		/* Copy the func field so func() can safely free work_item. */
		work_func_t func = work_item->func;

		func(work_item);
	}
}

/** Waits and retrieves a work item. Returns false if the worker should exit. */
static bool dequeue_work(struct work_queue *workq, work_t **pwork_item)
{
	assert(!workq_corrupted(workq));

	irq_spinlock_lock(&workq->lock, true);

	/* Check if we should exit if load is low. */
	if (!workq->stopping && worker_unnecessary(workq)) {
		/* There are too many workers for this load. Exit. */
		assert(0 < workq->cur_worker_cnt);
		--workq->cur_worker_cnt;
		list_remove(&THREAD->workq_link);
		irq_spinlock_unlock(&workq->lock, true);

		thread_detach(THREAD);
		return false;
	}

	bool stop = false;

	/* Wait for work to arrive. */
	while (list_empty(&workq->queue) && !workq->stopping) {
		cv_wait(workq);

		if (0 < workq->activate_pending)
			--workq->activate_pending;
	}

	/* Process remaining work even if requested to stop. */
	if (!list_empty(&workq->queue)) {
		link_t *work_link = list_first(&workq->queue);
		*pwork_item = list_get_instance(work_link, work_t, queue_link);

#ifdef CONFIG_DEBUG
		assert(!work_item_corrupted(*pwork_item));
		(*pwork_item)->cookie = 0;
#endif
		list_remove(work_link);
		--workq->item_cnt;

		stop = false;
	} else {
		/* Requested to stop and no more work queued. */
		assert(workq->stopping);
		--workq->cur_worker_cnt;
		stop = true;
	}

	irq_spinlock_unlock(&workq->lock, true);

	return !stop;
}

/** Returns true if for the given load there are too many workers. */
static bool worker_unnecessary(struct work_queue *workq)
{
	assert(irq_spinlock_locked(&workq->lock));

	/* No work is pending. We don't need too many idle threads. */
	if (list_empty(&workq->queue)) {
		/* There are too many idle workers. Exit. */
		return (min_worker_cnt <= workq->idle_worker_cnt);
	} else {
		/*
		 * There is work but we are swamped with too many active workers
		 * that were woken up from sleep at around the same time. We
		 * don't need another worker fighting for cpu time.
		 */
		size_t active = active_workers_now(workq);
		return (max_concurrent_workers < active);
	}
}

/** Waits for a signal to activate_worker. Thread marked idle while waiting. */
static void cv_wait(struct work_queue *workq)
{
	++workq->idle_worker_cnt;
	THREAD->workq_idling = true;

	/* Ignore lock ordering just here. */
	assert(irq_spinlock_locked(&workq->lock));

	_condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
	    &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);

	assert(!workq_corrupted(workq));
	assert(irq_spinlock_locked(&workq->lock));

	THREAD->workq_idling = false;
	--workq->idle_worker_cnt;
}

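/*
 * The following two hooks let the pool react to work functions that block:
 * workq_after_thread_ran() runs in the scheduler as a worker goes to sleep
 * inside its work func() and may signal or request another worker (always
 * with can_block == false), while workq_before_thread_is_ready() reverses
 * the blocked-worker accounting when the worker is about to wake up again.
 */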
/** Invoked from thread_ready() right before the thread is woken up. */
void workq_before_thread_is_ready(thread_t *thread)
{
	assert(thread);
	assert(irq_spinlock_locked(&thread->lock));

	/* Worker's work func() is about to wake up from sleeping. */
	if (thread->workq && thread->workq_blocked) {
		/* Must be blocked in user work func() and not be waiting for work. */
		assert(!thread->workq_idling);
		assert(thread->state == Sleeping);
		assert(THREAD != thread);
		assert(!workq_corrupted(thread->workq));

		/* Protected by thread->lock */
		thread->workq_blocked = false;

		irq_spinlock_lock(&thread->workq->lock, true);
		--thread->workq->blocked_worker_cnt;
		irq_spinlock_unlock(&thread->workq->lock, true);
	}
}

/** Invoked from scheduler() before switching away from a thread. */
void workq_after_thread_ran(void)
{
	assert(THREAD);
	assert(irq_spinlock_locked(&THREAD->lock));

	/* Worker's work func() is about to sleep/block. */
	if (THREAD->workq && THREAD->state == Sleeping && !THREAD->workq_idling) {
		assert(!THREAD->workq_blocked);
		assert(!workq_corrupted(THREAD->workq));

		THREAD->workq_blocked = true;

		irq_spinlock_lock(&THREAD->workq->lock, false);

		++THREAD->workq->blocked_worker_cnt;

		bool can_block = false;
		signal_op_t op = signal_worker_logic(THREAD->workq, can_block);

		irq_spinlock_unlock(&THREAD->workq->lock, false);

		if (op) {
			assert(add_worker_noblock_op == op || signal_worker_op == op);
			op(THREAD->workq);
		}
	}
}

/** Prints stats of the work queue to the kernel console. */
void workq_print_info(struct work_queue *workq)
{
	irq_spinlock_lock(&workq->lock, true);

	size_t total = workq->cur_worker_cnt;
	size_t blocked = workq->blocked_worker_cnt;
	size_t idle = workq->idle_worker_cnt;
	size_t active = active_workers(workq);
	size_t items = workq->item_cnt;
	bool stopping = workq->stopping;
	bool worker_surplus = worker_unnecessary(workq);
	const char *load_str = worker_surplus ? "decreasing" :
	    (0 < workq->activate_pending) ? "increasing" : "stable";

	irq_spinlock_unlock(&workq->lock, true);

	printf(
	    "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
	    " max_concurrent_workers=%zu, max_items_per_worker=%zu\n"
	    "Workers: %zu\n"
	    "Active: %zu (workers currently processing work)\n"
	    "Blocked: %zu (work functions sleeping/blocked)\n"
	    "Idle: %zu (idle workers waiting for more work)\n"
	    "Items: %zu (queued not yet dispatched work)\n"
	    "Stopping: %d\n"
	    "Load: %s\n",
	    max_worker_cnt, min_worker_cnt,
	    max_concurrent_workers, max_items_per_worker,
	    total,
	    active,
	    blocked,
	    idle,
	    items,
	    stopping,
	    load_str);
}

/** Prints stats of the global work queue. */
void workq_global_print_info(void)
{
	workq_print_info(&g_work_queue);
}

static bool dequeue_add_req(nonblock_adder_t *info, struct work_queue **pworkq)
{
	bool stop = false;

	irq_spinlock_lock(&info->lock, true);

	while (list_empty(&info->work_queues) && !stop) {
		errno_t ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
		    &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);

		stop = (ret == EINTR);
	}

	if (!stop) {
		*pworkq = list_get_instance(list_first(&info->work_queues),
		    struct work_queue, nb_link);

		assert(!workq_corrupted(*pworkq));

		list_remove(&(*pworkq)->nb_link);
	}

	irq_spinlock_unlock(&info->lock, true);

	return !stop;
}

static void thr_nonblock_add_worker(void *arg)
{
	nonblock_adder_t *info = arg;
	struct work_queue *workq = NULL;

	while (dequeue_add_req(info, &workq)) {
		add_worker(workq);
	}
}

static void nonblock_init(void)
{
	irq_spinlock_initialize(&nonblock_adder.lock, "kworkq-nb.lock");
	condvar_initialize(&nonblock_adder.req_cv);
	list_initialize(&nonblock_adder.work_queues);

	nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
	    &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");

	if (nonblock_adder.thread) {
		thread_ready(nonblock_adder.thread);
	} else {
		/*
		 * We won't be able to add workers without blocking if all workers
		 * sleep, but at least boot the system.
		 */
		printf("Failed to create kworkq-nb. Sleeping work may stall the workq.\n");
	}
}

#ifdef CONFIG_DEBUG
/** Returns true if the workq is definitely corrupted; false if not sure.
 *
 * Can be used outside of any locks.
 */
static bool workq_corrupted(struct work_queue *workq)
{
	/*
	 * Needed to make the most current cookie value set by workq_preinit()
	 * visible even if we access the workq right after it is created but
	 * on a different cpu. Otherwise, workq_corrupted() would not work
	 * outside a lock.
	 */
	memory_barrier();
	return NULL == workq || workq->cookie != WORKQ_MAGIC;
}

/** Returns true if the work_item is definitely corrupted; false if not sure.
 *
 * Must be used with the work queue protecting spinlock locked.
 */
static bool work_item_corrupted(work_t *work_item)
{
	return NULL == work_item || work_item->cookie != WORK_ITEM_MAGIC;
}
#endif

/** @}
 */