source: mainline/kernel/generic/src/synch/workqueue.c@ bf2042f9

Last change on this file since bf2042f9 was a35b458, checked in by Jiří Zárevúcky <zarevucky.jiri@…>, 7 years ago

style: Remove trailing whitespace on _all_ lines, including empty ones, for particular file types.

Command used: tools/srepl '\s\+$' '' -- *.c *.h *.py *.sh *.s *.S *.ag

Currently, whitespace on empty lines is very inconsistent.
There are two basic choices: Either remove the whitespace, or keep empty lines
indented to the level of surrounding code. The former is AFAICT more common,
and also much easier to do automatically.

Alternatively, we could write a script for automatic indentation and use that
instead. However, if such a script exists, it's possible to use the indented
style locally, by having the editor apply relevant conversions on load/save,
without affecting the remote repository. IMO, it makes more sense to adopt
the simpler rule.

File size: 27.0 KB
/*
 * Copyright (c) 2012 Adam Hraska
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 * - The name of the author may not be used to endorse or promote products
 *   derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/** @addtogroup generic
 * @{
 */

/**
 * @file
 * @brief Work queue/thread pool that automatically adjusts its size
 * depending on the current load. Queued work functions may sleep.
 */

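/*
 * Minimal usage sketch (illustrative only, not part of the original file;
 * log_flush() is a made-up work function and flush_work a made-up work item):
 *
 *   static work_t flush_work;
 *
 *   static void log_flush(work_t *work_item)
 *   {
 *       // Runs in a kernel worker thread and may sleep.
 *   }
 *
 *   // flush_work must stay valid at least until log_flush() is entered.
 *   workq_global_enqueue(&flush_work, log_flush);
 */
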
#include <assert.h>
#include <errno.h>
#include <synch/workqueue.h>
#include <synch/spinlock.h>
#include <synch/condvar.h>
#include <synch/mutex.h>
#include <proc/thread.h>
#include <config.h>
#include <arch.h>
#include <cpu.h>
#include <macros.h>

#define WORKQ_MAGIC 0xf00c1333U
#define WORK_ITEM_MAGIC 0xfeec1777U


struct work_queue {
    /*
     * Protects everything except activate_worker.
     * Must be acquired after any thread->locks.
     */
    IRQ_SPINLOCK_DECLARE(lock);

    /* Activates a worker if new work arrives or if shutting down the queue. */
    condvar_t activate_worker;

    /* Queue of work_items ready to be dispatched. */
    list_t queue;

    /* List of worker threads. */
    list_t workers;

    /* Number of work items queued. */
    size_t item_cnt;

    /* Indicates the work queue is shutting down. */
    bool stopping;
    const char *name;

    /* Total number of created worker threads. */
    size_t cur_worker_cnt;
    /* Number of workers waiting for work to arrive. */
    size_t idle_worker_cnt;
    /* Number of idle workers signaled that have not yet been woken up. */
    size_t activate_pending;
    /* Number of blocked workers sleeping in work func() (i.e. not idle). */
    size_t blocked_worker_cnt;

    /* Number of pending signal_worker_op() operations. */
    size_t pending_op_cnt;

    /* Links this queue into nonblock_adder's list of queues awaiting a new worker. */
    link_t nb_link;

#ifdef CONFIG_DEBUG
    /* Magic cookie for integrity checks. Immutable. Accessed without lock. */
    uint32_t cookie;
#endif
};


/** Min number of idle workers to keep. */
static size_t min_worker_cnt;
/** Max total number of workers - be it blocked, idle, or active. */
static size_t max_worker_cnt;
/** Max number of concurrently running active workers, i.e. neither blocked nor idle. */
static size_t max_concurrent_workers;
/** Max number of work items per active worker before a new worker is activated. */
static const size_t max_items_per_worker = 8;
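
/*
 * Illustration (not in the original source): with max_items_per_worker == 8
 * and, say, 4 workers counted as active by active_workers(), the queue may
 * grow to 4 * 8 = 32 pending items before signal_worker_logic() wakes an
 * idle worker or asks for a new one to be created.
 */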

/** System wide work queue. */
static struct work_queue g_work_queue;

static int booting = true;


/** State of the helper thread that adds workers on behalf of callers that must not block. */
typedef struct {
    IRQ_SPINLOCK_DECLARE(lock);
    condvar_t req_cv;
    thread_t *thread;
    list_t work_queues;
} nonblock_adder_t;

static nonblock_adder_t nonblock_adder;


/** Typedef a worker thread signaling operation prototype. */
typedef void (*signal_op_t)(struct work_queue *workq);


/* Fwd decl. */
static void workq_preinit(struct work_queue *workq, const char *name);
static bool add_worker(struct work_queue *workq);
static void interrupt_workers(struct work_queue *workq);
static void wait_for_workers(struct work_queue *workq);
static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
    work_func_t func, bool can_block);
static void init_work_item(work_t *work_item, work_func_t func);
static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block);
static void worker_thread(void *arg);
static bool dequeue_work(struct work_queue *workq, work_t **pwork_item);
static bool worker_unnecessary(struct work_queue *workq);
static void cv_wait(struct work_queue *workq);
static void nonblock_init(void);

#ifdef CONFIG_DEBUG
static bool workq_corrupted(struct work_queue *workq);
static bool work_item_corrupted(work_t *work_item);
#endif

/** Creates a worker thread for the system-wide work queue. */
void workq_global_worker_init(void)
{
    /*
     * No need for additional synchronization. Stores to word-sized
     * variables are atomic and the change will eventually propagate.
     * Moreover add_worker() includes the necessary memory barriers
     * in spinlock lock/unlock().
     */
    booting = false;

    nonblock_init();

    if (!add_worker(&g_work_queue))
        panic("Could not create a single global work queue worker!\n");
}

/** Initializes the system wide work queue and support for other work queues. */
void workq_global_init(void)
{
    /* Keep idle workers on 1/4-th of cpus, but at least 2 threads. */
    min_worker_cnt = max(2, config.cpu_count / 4);
    /* Allow max 8 sleeping work items per cpu. */
    max_worker_cnt = max(32, 8 * config.cpu_count);
    /* Maximum concurrency without slowing down the system. */
    max_concurrent_workers = max(2, config.cpu_count);

    workq_preinit(&g_work_queue, "kworkq");
}

/** Stops the system global work queue and waits for all work items to complete. */
void workq_global_stop(void)
{
    workq_stop(&g_work_queue);
}

/** Creates and initializes a work queue. Returns NULL upon failure. */
struct work_queue *workq_create(const char *name)
{
    struct work_queue *workq = malloc(sizeof(struct work_queue), 0);

    if (workq) {
        if (workq_init(workq, name)) {
            assert(!workq_corrupted(workq));
            return workq;
        }

        free(workq);
    }

    return NULL;
}

/** Frees work queue resources and stops the queue if that has not been done already. */
void workq_destroy(struct work_queue *workq)
{
    assert(!workq_corrupted(workq));

    irq_spinlock_lock(&workq->lock, true);
    bool stopped = workq->stopping;
#ifdef CONFIG_DEBUG
    size_t running_workers = workq->cur_worker_cnt;
#endif
    irq_spinlock_unlock(&workq->lock, true);

    if (!stopped) {
        workq_stop(workq);
    } else {
        assert(0 == running_workers);
    }

#ifdef CONFIG_DEBUG
    workq->cookie = 0;
#endif

    free(workq);
}

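/*
 * Lifecycle sketch for a private queue (illustrative only; flush_job() and
 * the job variable are made-up client-side names, error handling omitted):
 *
 *   struct work_queue *wq = workq_create("kflush");
 *   if (wq) {
 *       workq_enqueue(wq, &job->work, flush_job);
 *       // ...
 *       workq_stop(wq);      // waits for all queued items to finish
 *       workq_destroy(wq);   // frees the queue; would also stop it if needed
 *   }
 */
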
/** Initializes workq structure without creating any workers. */
static void workq_preinit(struct work_queue *workq, const char *name)
{
#ifdef CONFIG_DEBUG
    workq->cookie = WORKQ_MAGIC;
#endif

    irq_spinlock_initialize(&workq->lock, name);
    condvar_initialize(&workq->activate_worker);

    list_initialize(&workq->queue);
    list_initialize(&workq->workers);

    workq->item_cnt = 0;
    workq->stopping = false;
    workq->name = name;

    workq->cur_worker_cnt = 1;
    workq->idle_worker_cnt = 0;
    workq->activate_pending = 0;
    workq->blocked_worker_cnt = 0;

    workq->pending_op_cnt = 0;
    link_initialize(&workq->nb_link);
}

/** Initializes a work queue. Returns true if successful.
 *
 * Before destroying a work queue it must be stopped via
 * workq_stop().
 */
bool workq_init(struct work_queue *workq, const char *name)
{
    workq_preinit(workq, name);
    return add_worker(workq);
}

/** Adds a new worker thread. Returns false if the thread could not be created. */
static bool add_worker(struct work_queue *workq)
{
    assert(!workq_corrupted(workq));

    thread_t *thread = thread_create(worker_thread, workq, TASK,
        THREAD_FLAG_NONE, workq->name);

    if (!thread) {
        irq_spinlock_lock(&workq->lock, true);

        /* cur_worker_cnt proactively increased in signal_worker_logic(). */
        assert(0 < workq->cur_worker_cnt);
        --workq->cur_worker_cnt;

        irq_spinlock_unlock(&workq->lock, true);
        return false;
    }

    /* Respect lock ordering. */
    irq_spinlock_lock(&thread->lock, true);
    irq_spinlock_lock(&workq->lock, false);

    bool success;

    if (!workq->stopping) {
        success = true;

        /* Try to distribute workers among cpus right away. */
        unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;

        if (!cpus[cpu_id].active)
            cpu_id = CPU->id;

        thread->workq = workq;
        thread->cpu = &cpus[cpu_id];
        thread->workq_blocked = false;
        thread->workq_idling = false;
        link_initialize(&thread->workq_link);

        list_append(&thread->workq_link, &workq->workers);
    } else {
        /*
         * Work queue is shutting down - we must not add the worker
         * and we cannot destroy it without ready-ing it. Mark it
         * interrupted so the worker exits right away without even
         * touching workq.
         */
        success = false;

        /* cur_worker_cnt proactively increased in signal_worker_logic(). */
        assert(0 < workq->cur_worker_cnt);
        --workq->cur_worker_cnt;
    }

    irq_spinlock_unlock(&workq->lock, false);
    irq_spinlock_unlock(&thread->lock, true);

    if (!success) {
        thread_interrupt(thread);
    }

    thread_ready(thread);

    return success;
}

/** Shuts down the work queue. Waits for all pending work items to complete.
 *
 * workq_stop() may only be run once.
 */
void workq_stop(struct work_queue *workq)
{
    assert(!workq_corrupted(workq));

    interrupt_workers(workq);
    wait_for_workers(workq);
}

/** Notifies worker threads the work queue is shutting down. */
static void interrupt_workers(struct work_queue *workq)
{
    irq_spinlock_lock(&workq->lock, true);

    /* workq_stop() may only be called once. */
    assert(!workq->stopping);
    workq->stopping = true;

    /* Respect lock ordering - do not hold workq->lock during broadcast. */
    irq_spinlock_unlock(&workq->lock, true);

    condvar_broadcast(&workq->activate_worker);
}

/** Waits for all worker threads to exit. */
static void wait_for_workers(struct work_queue *workq)
{
    assert(!PREEMPTION_DISABLED);

    irq_spinlock_lock(&workq->lock, true);

    list_foreach_safe(workq->workers, cur_worker, next_worker) {
        thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
        list_remove(cur_worker);

        /* Wait without the lock. */
        irq_spinlock_unlock(&workq->lock, true);

        thread_join(worker);
        thread_detach(worker);

        irq_spinlock_lock(&workq->lock, true);
    }

    assert(list_empty(&workq->workers));

    /* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
    while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
        irq_spinlock_unlock(&workq->lock, true);

        scheduler();

        irq_spinlock_lock(&workq->lock, true);
    }

    irq_spinlock_unlock(&workq->lock, true);
}

/** Queues a function into the global work queue without blocking.
 *
 * See workq_enqueue_noblock() for more details.
 */
bool workq_global_enqueue_noblock(work_t *work_item, work_func_t func)
{
    return workq_enqueue_noblock(&g_work_queue, work_item, func);
}

/** Queues a function into the global work queue; may block.
 *
 * See workq_enqueue() for more details.
 */
bool workq_global_enqueue(work_t *work_item, work_func_t func)
{
    return workq_enqueue(&g_work_queue, work_item, func);
}

/** Adds a function to be invoked in a separate thread without blocking.
 *
 * workq_enqueue_noblock() is guaranteed not to block. It is safe
 * to invoke from interrupt handlers.
 *
 * Consider using workq_enqueue() instead if at all possible. Otherwise,
 * your work item may have to wait for previously enqueued sleeping
 * work items to complete if you are unlucky.
 *
 * @param workq     Work queue where to queue the work item.
 * @param work_item Work item bookkeeping structure. Must be valid
 *                  until func() is entered.
 * @param func      User supplied function to invoke in a worker thread.
 *
 * @return false if the work queue is shutting down; the function is not
 *         queued for further processing.
 * @return true otherwise. func() will be invoked in a separate thread.
 */
bool workq_enqueue_noblock(struct work_queue *workq, work_t *work_item,
    work_func_t func)
{
    return _workq_enqueue(workq, work_item, func, false);
}

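/*
 * Sketch of use from an interrupt handler (illustrative only; irq_work and
 * handle_irq_bottom_half() are made-up names). The work item must not live
 * on the handler's stack - it has to remain valid until the work function
 * is entered:
 *
 *   static work_t irq_work;
 *
 *   // In the interrupt handler:
 *   workq_global_enqueue_noblock(&irq_work, handle_irq_bottom_half);
 */
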
/** Adds a function to be invoked in a separate thread; may block.
 *
 * While workq_enqueue() is unlikely to block, it may do so if too
 * many previous work items blocked sleeping.
 *
 * @param workq     Work queue where to queue the work item.
 * @param work_item Work item bookkeeping structure. Must be valid
 *                  until func() is entered.
 * @param func      User supplied function to invoke in a worker thread.
 *
 * @return false if the work queue is shutting down; the function is not
 *         queued for further processing.
 * @return true otherwise. func() will be invoked in a separate thread.
 */
bool workq_enqueue(struct work_queue *workq, work_t *work_item, work_func_t func)
{
    return _workq_enqueue(workq, work_item, func, true);
}

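/*
 * Sketch of the dynamic-allocation pattern (illustrative only; job_t,
 * run_job() and do_work() are made-up names). Because worker_thread()
 * copies the func field before calling it, the work function may free
 * the structure embedding its work item:
 *
 *   typedef struct {
 *       work_t work;   // first member, so the cast below is valid
 *       int payload;
 *   } job_t;
 *
 *   static void run_job(work_t *work_item)
 *   {
 *       job_t *job = (job_t *) work_item;
 *       do_work(job->payload);
 *       free(job);
 *   }
 *
 *   job_t *job = malloc(sizeof(job_t), 0);
 *   job->payload = 42;
 *   workq_enqueue(workq, &job->work, run_job);
 */
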
/** Adds a work item that will be processed by a separate worker thread.
 *
 * func() will be invoked in another kernel thread and may block.
 *
 * Prefer to call _workq_enqueue() with can_block set. Otherwise
 * your work item may have to wait for sleeping work items to complete.
 * If all worker threads are blocked/sleeping a new worker thread cannot
 * be created without can_block set because creating a thread might
 * block due to low memory conditions.
 *
 * @param workq     Work queue where to queue the work item.
 * @param work_item Work item bookkeeping structure. Must be valid
 *                  until func() is entered.
 * @param func      User supplied function to invoke in a worker thread.
 * @param can_block May adding this work item block?
 *
 * @return false if the work queue is shutting down; the function is not
 *         queued for further processing.
 * @return true otherwise.
 */
static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
    work_func_t func, bool can_block)
{
    assert(!workq_corrupted(workq));

    bool success = true;
    signal_op_t signal_op = NULL;

    irq_spinlock_lock(&workq->lock, true);

    if (workq->stopping) {
        success = false;
    } else {
        init_work_item(work_item, func);
        list_append(&work_item->queue_link, &workq->queue);
        ++workq->item_cnt;
        success = true;

        if (!booting) {
            signal_op = signal_worker_logic(workq, can_block);
        } else {
            /*
             * During boot there are no workers to signal. Just queue
             * the work and let future workers take care of it.
             */
        }
    }

    irq_spinlock_unlock(&workq->lock, true);

    if (signal_op) {
        signal_op(workq);
    }

    return success;
}

/** Prepares an item to be added to the work item queue. */
static void init_work_item(work_t *work_item, work_func_t func)
{
#ifdef CONFIG_DEBUG
    work_item->cookie = WORK_ITEM_MAGIC;
#endif

    link_initialize(&work_item->queue_link);
    work_item->func = func;
}

/** Returns the number of workers running work func() that are not blocked. */
static size_t active_workers_now(struct work_queue *workq)
{
    assert(irq_spinlock_locked(&workq->lock));

    /* Blocked workers are sleeping in the work function (i.e. not idle). */
    assert(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
    /* Idle workers are waiting for more work to arrive in condvar_wait. */
    assert(workq->idle_worker_cnt <= workq->cur_worker_cnt);

    /* Idle + blocked workers == sleeping worker threads. */
    size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;

    assert(sleeping_workers <= workq->cur_worker_cnt);
    /* Workers pending activation are idle workers not yet given a time slice. */
    assert(workq->activate_pending <= workq->idle_worker_cnt);

    /*
     * Workers running the work func() this very moment are neither
     * blocked nor idle. ->activate_pending workers are excluded because
     * they will run their work func() only once they get a time slice;
     * they are not running it right now.
     */
    return workq->cur_worker_cnt - sleeping_workers;
}

/**
 * Returns the number of workers that are running or are about to run work
 * func() and that are not blocked.
 */
static size_t active_workers(struct work_queue *workq)
{
    assert(irq_spinlock_locked(&workq->lock));

    /*
     * Workers actively running the work func() are neither blocked nor
     * idle. ->activate_pending workers will run their work func() once
     * they get a time slice after waking from a condvar wait, so count
     * them as well.
     */
    return active_workers_now(workq) + workq->activate_pending;
}

static void add_worker_noblock_op(struct work_queue *workq)
{
    condvar_signal(&nonblock_adder.req_cv);
}

static void add_worker_op(struct work_queue *workq)
{
    add_worker(workq);
}

static void signal_worker_op(struct work_queue *workq)
{
    assert(!workq_corrupted(workq));

    condvar_signal(&workq->activate_worker);

    irq_spinlock_lock(&workq->lock, true);
    assert(0 < workq->pending_op_cnt);
    --workq->pending_op_cnt;
    irq_spinlock_unlock(&workq->lock, true);
}

/** Determines how to signal workers, if at all.
 *
 * @param workq     Work queue where a new work item was queued.
 * @param can_block True if we may block while signaling a worker or creating
 *                  a new worker.
 *
 * @return Function that will notify workers or NULL if no action is needed.
 */
static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block)
{
    assert(!workq_corrupted(workq));
    assert(irq_spinlock_locked(&workq->lock));

    /* Only signal workers if really necessary. */
    signal_op_t signal_op = NULL;

    /*
     * Number of workers actively running the work func(), i.e. neither
     * blocked nor idle, including ->activate_pending workers that will
     * run their work func() once they get a time slice.
     */
    size_t active = active_workers(workq);
    /* Max total allowed number of work items queued for active workers. */
    size_t max_load = active * max_items_per_worker;

    /* Active workers are getting overwhelmed - activate another. */
    if (max_load < workq->item_cnt) {

        size_t remaining_idle =
            workq->idle_worker_cnt - workq->activate_pending;

        /* Idle workers still exist - activate one. */
        if (remaining_idle > 0) {
            /*
             * Directly changing idle_worker_cnt here would not allow
             * workers to recognize spurious wake-ups. Change
             * activate_pending instead.
             */
            ++workq->activate_pending;
            ++workq->pending_op_cnt;
            signal_op = signal_worker_op;
        } else {
            /* No idle workers remain. Request that a new one be created. */
            bool need_worker = (active < max_concurrent_workers) &&
                (workq->cur_worker_cnt < max_worker_cnt);

            if (need_worker && can_block) {
                signal_op = add_worker_op;
                /*
                 * It may take some time to actually create the worker.
                 * We don't want to swamp the thread pool with superfluous
                 * worker creation requests so pretend it was already
                 * created and proactively increase the worker count.
                 */
                ++workq->cur_worker_cnt;
            }

            /*
             * We cannot create a new worker but we need one desperately
             * because all workers are blocked in their work functions.
             */
            if (need_worker && !can_block && 0 == active) {
                assert(0 == workq->idle_worker_cnt);

                irq_spinlock_lock(&nonblock_adder.lock, true);

                if (nonblock_adder.thread && !link_used(&workq->nb_link)) {
                    signal_op = add_worker_noblock_op;
                    ++workq->cur_worker_cnt;
                    list_append(&workq->nb_link, &nonblock_adder.work_queues);
                }

                irq_spinlock_unlock(&nonblock_adder.lock, true);
            }
        }
    } else {
        /*
         * There are enough active/running workers to process the queue.
         * No need to signal/activate any new workers.
         */
        signal_op = NULL;
    }

    return signal_op;
}

/** Executes queued work items. */
static void worker_thread(void *arg)
{
    /*
     * If the thread was created after the work queue was ordered to stop,
     * do not access the work queue and return immediately.
     */
    if (thread_interrupted(THREAD)) {
        thread_detach(THREAD);
        return;
    }

    assert(arg != NULL);

    struct work_queue *workq = arg;
    work_t *work_item;

    while (dequeue_work(workq, &work_item)) {
        /* Copy the func field so func() can safely free work_item. */
        work_func_t func = work_item->func;

        func(work_item);
    }
}

/** Waits for and retrieves a work item. Returns false if the worker should exit. */
static bool dequeue_work(struct work_queue *workq, work_t **pwork_item)
{
    assert(!workq_corrupted(workq));

    irq_spinlock_lock(&workq->lock, true);

    /* Check if we should exit because the load is low. */
    if (!workq->stopping && worker_unnecessary(workq)) {
        /* There are too many workers for this load. Exit. */
        assert(0 < workq->cur_worker_cnt);
        --workq->cur_worker_cnt;
        list_remove(&THREAD->workq_link);
        irq_spinlock_unlock(&workq->lock, true);

        thread_detach(THREAD);
        return false;
    }

    bool stop = false;

    /* Wait for work to arrive. */
    while (list_empty(&workq->queue) && !workq->stopping) {
        cv_wait(workq);

        if (0 < workq->activate_pending)
            --workq->activate_pending;
    }

    /* Process remaining work even if requested to stop. */
    if (!list_empty(&workq->queue)) {
        link_t *work_link = list_first(&workq->queue);
        *pwork_item = list_get_instance(work_link, work_t, queue_link);

#ifdef CONFIG_DEBUG
        assert(!work_item_corrupted(*pwork_item));
        (*pwork_item)->cookie = 0;
#endif
        list_remove(work_link);
        --workq->item_cnt;

        stop = false;
    } else {
        /* Requested to stop and no more work queued. */
        assert(workq->stopping);
        --workq->cur_worker_cnt;
        stop = true;
    }

    irq_spinlock_unlock(&workq->lock, true);

    return !stop;
}

/** Returns true if there are too many workers for the given load. */
static bool worker_unnecessary(struct work_queue *workq)
{
    assert(irq_spinlock_locked(&workq->lock));

    /* No work is pending. We don't need too many idle threads. */
    if (list_empty(&workq->queue)) {
        /* There are too many idle workers. Exit. */
        return (min_worker_cnt <= workq->idle_worker_cnt);
    } else {
        /*
         * There is work but we are swamped with too many active workers
         * that were woken up from sleep at around the same time. We
         * don't need another worker fighting for cpu time.
         */
        size_t active = active_workers_now(workq);
        return (max_concurrent_workers < active);
    }
}

/** Waits for a signal on activate_worker. The thread is marked idle while waiting. */
static void cv_wait(struct work_queue *workq)
{
    ++workq->idle_worker_cnt;
    THREAD->workq_idling = true;

    /* Ignore lock ordering just here. */
    assert(irq_spinlock_locked(&workq->lock));

    _condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
        &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);

    assert(!workq_corrupted(workq));
    assert(irq_spinlock_locked(&workq->lock));

    THREAD->workq_idling = false;
    --workq->idle_worker_cnt;
}


/** Invoked from thread_ready() right before the thread is woken up. */
void workq_before_thread_is_ready(thread_t *thread)
{
    assert(thread);
    assert(irq_spinlock_locked(&thread->lock));

    /* Worker's work func() is about to wake up from sleeping. */
    if (thread->workq && thread->workq_blocked) {
        /* Must be blocked in user work func() and not be waiting for work. */
        assert(!thread->workq_idling);
        assert(thread->state == Sleeping);
        assert(THREAD != thread);
        assert(!workq_corrupted(thread->workq));

        /* Protected by thread->lock. */
        thread->workq_blocked = false;

        irq_spinlock_lock(&thread->workq->lock, true);
        --thread->workq->blocked_worker_cnt;
        irq_spinlock_unlock(&thread->workq->lock, true);
    }
}

/** Invoked from scheduler() before switching away from a thread. */
void workq_after_thread_ran(void)
{
    assert(THREAD);
    assert(irq_spinlock_locked(&THREAD->lock));

    /* Worker's work func() is about to sleep/block. */
    if (THREAD->workq && THREAD->state == Sleeping && !THREAD->workq_idling) {
        assert(!THREAD->workq_blocked);
        assert(!workq_corrupted(THREAD->workq));

        THREAD->workq_blocked = true;

        irq_spinlock_lock(&THREAD->workq->lock, false);

        ++THREAD->workq->blocked_worker_cnt;

        bool can_block = false;
        signal_op_t op = signal_worker_logic(THREAD->workq, can_block);

        irq_spinlock_unlock(&THREAD->workq->lock, false);

        if (op) {
            assert(add_worker_noblock_op == op || signal_worker_op == op);
            op(THREAD->workq);
        }
    }
}

/** Prints stats of the work queue to the kernel console. */
void workq_print_info(struct work_queue *workq)
{
    irq_spinlock_lock(&workq->lock, true);

    size_t total = workq->cur_worker_cnt;
    size_t blocked = workq->blocked_worker_cnt;
    size_t idle = workq->idle_worker_cnt;
    size_t active = active_workers(workq);
    size_t items = workq->item_cnt;
    bool stopping = workq->stopping;
    bool worker_surplus = worker_unnecessary(workq);
    const char *load_str = worker_surplus ? "decreasing" :
        (0 < workq->activate_pending) ? "increasing" : "stable";

    irq_spinlock_unlock(&workq->lock, true);

    printf(
        "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
        " max_concurrent_workers=%zu, max_items_per_worker=%zu\n"
        "Workers: %zu\n"
        "Active: %zu (workers currently processing work)\n"
        "Blocked: %zu (work functions sleeping/blocked)\n"
        "Idle: %zu (idle workers waiting for more work)\n"
        "Items: %zu (queued not yet dispatched work)\n"
        "Stopping: %d\n"
        "Load: %s\n",
        max_worker_cnt, min_worker_cnt,
        max_concurrent_workers, max_items_per_worker,
        total,
        active,
        blocked,
        idle,
        items,
        stopping,
        load_str
    );
}

/** Prints stats of the global work queue. */
void workq_global_print_info(void)
{
    workq_print_info(&g_work_queue);
}


static bool dequeue_add_req(nonblock_adder_t *info, struct work_queue **pworkq)
{
    bool stop = false;

    irq_spinlock_lock(&info->lock, true);

    while (list_empty(&info->work_queues) && !stop) {
        errno_t ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
            &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);

        stop = (ret == EINTR);
    }

    if (!stop) {
        *pworkq = list_get_instance(list_first(&info->work_queues),
            struct work_queue, nb_link);

        assert(!workq_corrupted(*pworkq));

        list_remove(&(*pworkq)->nb_link);
    }

    irq_spinlock_unlock(&info->lock, true);

    return !stop;
}

static void thr_nonblock_add_worker(void *arg)
{
    nonblock_adder_t *info = arg;
    struct work_queue *workq;

    while (dequeue_add_req(info, &workq)) {
        add_worker(workq);
    }
}


static void nonblock_init(void)
{
    irq_spinlock_initialize(&nonblock_adder.lock, "kworkq-nb.lock");
    condvar_initialize(&nonblock_adder.req_cv);
    list_initialize(&nonblock_adder.work_queues);

    nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
        &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");

    if (nonblock_adder.thread) {
        thread_ready(nonblock_adder.thread);
    } else {
        /*
         * We won't be able to add workers without blocking if all workers
         * sleep, but at least boot the system.
         */
        printf("Failed to create kworkq-nb. Sleeping work may stall the workq.\n");
    }
}

#ifdef CONFIG_DEBUG
/** Returns true if the workq is definitely corrupted; false if not sure.
 *
 * Can be used outside of any locks.
 */
static bool workq_corrupted(struct work_queue *workq)
{
    /*
     * Needed to make the most current cookie value set by workq_preinit()
     * visible even if we access the workq right after it is created but
     * on a different cpu. Otherwise, workq_corrupted() would not work
     * outside a lock.
     */
    memory_barrier();
    return NULL == workq || workq->cookie != WORKQ_MAGIC;
}

/** Returns true if the work_item is definitely corrupted; false if not sure.
 *
 * Must be used with the work queue protecting spinlock locked.
 */
static bool work_item_corrupted(work_t *work_item)
{
    return NULL == work_item || work_item->cookie != WORK_ITEM_MAGIC;
}
#endif

/** @}
 */