source: mainline/kernel/generic/src/synch/workqueue.c@ df96271

Last change on this file since df96271 was 7473807, checked in by Jakub Jermar <jakub@…>, 8 years ago

Use atomic malloc allocations

We can safely use atomic allocations in places that use the non-failing
version of malloc(), but test the return value anyway. And also in some
places that can afford to return failure but did not because of comfort.

  • Property mode set to 100644
File size: 27.1 KB
1/*
2 * Copyright (c) 2012 Adam Hraska
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup generic
30 * @{
31 */
32
33/**
34 * @file
35 * @brief Work queue/thread pool that automatically adjusts its size
36 * depending on the current load. Queued work functions may sleep.
37 */
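/*
 * Example (sketch): a typical client embeds work_t in its own request
 * structure, queues the request on the system-wide queue and releases it
 * from the work function. The type deferred_free_t and the two helper
 * functions are illustrative names only, not part of this subsystem.
 *
 *     typedef struct {
 *         work_t work;   // Must be the first member for the cast below.
 *         void *buf;
 *     } deferred_free_t;
 *
 *     static void deferred_free_func(work_t *work_item)
 *     {
 *         deferred_free_t *req = (deferred_free_t *) work_item;
 *         free(req->buf);    // May block - that is fine in a worker thread.
 *         free(req);
 *     }
 *
 *     static void defer_free(void *buf)
 *     {
 *         deferred_free_t *req = malloc(sizeof(deferred_free_t), FRAME_ATOMIC);
 *         if (!req) {
 *             free(buf);     // Out of memory - free synchronously instead.
 *             return;
 *         }
 *         req->buf = buf;
 *         if (!workq_global_enqueue(&req->work, deferred_free_func)) {
 *             // The queue is stopping - fall back to immediate cleanup.
 *             free(req->buf);
 *             free(req);
 *         }
 *     }
 */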
38
39#include <assert.h>
40#include <errno.h>
41#include <synch/workqueue.h>
42#include <synch/spinlock.h>
43#include <synch/condvar.h>
44#include <synch/mutex.h>
45#include <proc/thread.h>
46#include <config.h>
47#include <arch.h>
48#include <cpu.h>
49#include <macros.h>
50
51#define WORKQ_MAGIC 0xf00c1333U
52#define WORK_ITEM_MAGIC 0xfeec1777U
53
54
55struct work_queue {
56 /*
57 * Protects everything except activate_worker.
58 * Must be acquired after any thread's lock (thread->lock).
59 */
60 IRQ_SPINLOCK_DECLARE(lock);
61
62 /* Activates a worker if new work arrives or if shutting down the queue. */
63 condvar_t activate_worker;
64
65 /* Queue of work_items ready to be dispatched. */
66 list_t queue;
67
68 /* List of worker threads. */
69 list_t workers;
70
71 /* Number of work items queued. */
72 size_t item_cnt;
73
74 /* Indicates the work queue is shutting down. */
75 bool stopping;
76 const char *name;
77
78 /* Total number of created worker threads. */
79 size_t cur_worker_cnt;
80 /* Number of workers waiting for work to arrive. */
81 size_t idle_worker_cnt;
82 /* Number of idle workers signaled that have not yet been woken up. */
83 size_t activate_pending;
84 /* Number of blocked workers sleeping in work func() (ie not idle). */
85 size_t blocked_worker_cnt;
86
87 /* Number of pending signal_worker_op() operations. */
88 size_t pending_op_cnt;
89
90 link_t nb_link;
91
92#ifdef CONFIG_DEBUG
93 /* Magic cookie for integrity checks. Immutable. Accessed without lock. */
94 uint32_t cookie;
95#endif
96};
97
98
99/** Min number of idle workers to keep. */
100static size_t min_worker_cnt;
101/** Max total number of workers - be it blocked, idle, or active. */
102static size_t max_worker_cnt;
103 /** Max number of concurrently running active workers, ie neither blocked nor idle. */
104static size_t max_concurrent_workers;
105 /** Max number of work items per active worker before a new worker is activated. */
106static const size_t max_items_per_worker = 8;
107
108/** System wide work queue. */
109static struct work_queue g_work_queue;
110
111static int booting = true;
112
113
114typedef struct {
115 IRQ_SPINLOCK_DECLARE(lock);
116 condvar_t req_cv;
117 thread_t *thread;
118 list_t work_queues;
119} nonblock_adder_t;
120
121static nonblock_adder_t nonblock_adder;
122
123
124
125 /** Prototype of an operation used to signal or add worker threads. */
126typedef void (*signal_op_t)(struct work_queue *workq);
127
128
129/* Fwd decl. */
130static void workq_preinit(struct work_queue *workq, const char *name);
131static bool add_worker(struct work_queue *workq);
132static void interrupt_workers(struct work_queue *workq);
133static void wait_for_workers(struct work_queue *workq);
134 static bool _workq_enqueue(struct work_queue *workq, work_t *work_item,
135 work_func_t func, bool can_block);
136static void init_work_item(work_t *work_item, work_func_t func);
137static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block);
138static void worker_thread(void *arg);
139static bool dequeue_work(struct work_queue *workq, work_t **pwork_item);
140static bool worker_unnecessary(struct work_queue *workq);
141static void cv_wait(struct work_queue *workq);
142static void nonblock_init(void);
143
144#ifdef CONFIG_DEBUG
145static bool workq_corrupted(struct work_queue *workq);
146static bool work_item_corrupted(work_t *work_item);
147#endif
148
149 /** Creates a worker thread for the system-wide work queue. */
150void workq_global_worker_init(void)
151{
152 /*
153 * No need for additional synchronization. Stores to word-sized
154 * variables are atomic and the change will eventually propagate.
155 * Moreover add_worker() includes the necessary memory barriers
156 * in spinlock lock/unlock().
157 */
158 booting = false;
159
160 nonblock_init();
161
162 if (!add_worker(&g_work_queue))
163 panic("Could not create a single global work queue worker!\n");
164
165}
166
167/** Initializes the system wide work queue and support for other work queues. */
168void workq_global_init(void)
169{
170 /* Keep idle workers on a quarter of the cpus, but at least 2 threads. */
171 min_worker_cnt = max(2, config.cpu_count / 4);
172 /* Allow at most 8 sleeping work items per cpu, but at least 32 workers in total. */
173 max_worker_cnt = max(32, 8 * config.cpu_count);
174 /* Maximum concurrency without slowing down the system. */
175 max_concurrent_workers = max(2, config.cpu_count);
176
177 workq_preinit(&g_work_queue, "kworkq");
178}
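/*
 * Worked example: with config.cpu_count == 8 the formulas above yield
 * min_worker_cnt = max(2, 8 / 4) = 2, max_worker_cnt = max(32, 8 * 8) = 64
 * and max_concurrent_workers = max(2, 8) = 8.
 */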
179
180 /** Stops the system-wide work queue and waits for all work items to complete. */
181void workq_global_stop(void)
182{
183 workq_stop(&g_work_queue);
184}
185
186/** Creates and initializes a work queue. Returns NULL upon failure. */
187struct work_queue *workq_create(const char *name)
188{
189 struct work_queue *workq = malloc(sizeof(struct work_queue),
190 FRAME_ATOMIC);
191 if (!workq)
192 return NULL;
193
194 if (!workq_init(workq, name)) {
195 /* Initialization failed - most likely no worker thread could be created. */
196 free(workq);
197 return NULL;
198 }
199
200 assert(!workq_corrupted(workq));
201
202 return workq;
203
204}
205
206 /** Frees work queue resources, stopping the queue first if it has not been stopped already. */
207void workq_destroy(struct work_queue *workq)
208{
209 assert(!workq_corrupted(workq));
210
211 irq_spinlock_lock(&workq->lock, true);
212 bool stopped = workq->stopping;
213#ifdef CONFIG_DEBUG
214 size_t running_workers = workq->cur_worker_cnt;
215#endif
216 irq_spinlock_unlock(&workq->lock, true);
217
218 if (!stopped) {
219 workq_stop(workq);
220 } else {
221 assert(0 == running_workers);
222 }
223
224#ifdef CONFIG_DEBUG
225 workq->cookie = 0;
226#endif
227
228 free(workq);
229}
230
231/** Initializes workq structure without creating any workers. */
232static void workq_preinit(struct work_queue *workq, const char *name)
233{
234#ifdef CONFIG_DEBUG
235 workq->cookie = WORKQ_MAGIC;
236#endif
237
238 irq_spinlock_initialize(&workq->lock, name);
239 condvar_initialize(&workq->activate_worker);
240
241 list_initialize(&workq->queue);
242 list_initialize(&workq->workers);
243
244 workq->item_cnt = 0;
245 workq->stopping = false;
246 workq->name = name;
247
248 workq->cur_worker_cnt = 1;
249 workq->idle_worker_cnt = 0;
250 workq->activate_pending = 0;
251 workq->blocked_worker_cnt = 0;
252
253 workq->pending_op_cnt = 0;
254 link_initialize(&workq->nb_link);
255}
256
257/** Initializes a work queue. Returns true if successful.
258 *
259 * Before destroying a work queue it must be stopped via
260 * workq_stop().
261 */
262bool workq_init(struct work_queue *workq, const char *name)
263{
264 workq_preinit(workq, name);
265 return add_worker(workq);
266}
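/*
 * Example (sketch): typical life cycle of a dedicated queue created with
 * workq_create(). The identifiers my_workq, my_work and my_func are
 * illustrative only.
 *
 *     struct work_queue *my_workq = workq_create("my-wq");
 *     if (!my_workq)
 *         return ENOMEM;
 *
 *     // ... workq_enqueue(my_workq, &my_work, my_func); ...
 *
 *     // Stop the queue (waits for queued items to finish), then free it.
 *     workq_stop(my_workq);
 *     workq_destroy(my_workq);
 */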
267
268/** Add a new worker thread. Returns false if the thread could not be created. */
269static bool add_worker(struct work_queue *workq)
270{
271 assert(!workq_corrupted(workq));
272
273 thread_t *thread = thread_create(worker_thread, workq, TASK,
274 THREAD_FLAG_NONE, workq->name);
275
276 if (!thread) {
277 irq_spinlock_lock(&workq->lock, true);
278
279 /* cur_worker_cnt proactively increased in signal_worker_logic(). */
280 assert(0 < workq->cur_worker_cnt);
281 --workq->cur_worker_cnt;
282
283 irq_spinlock_unlock(&workq->lock, true);
284 return false;
285 }
286
287 /* Respect lock ordering. */
288 irq_spinlock_lock(&thread->lock, true);
289 irq_spinlock_lock(&workq->lock, false);
290
291 bool success;
292
293 if (!workq->stopping) {
294 success = true;
295
296 /* Try to distribute workers among cpus right away. */
297 unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;
298
299 if (!cpus[cpu_id].active)
300 cpu_id = CPU->id;
301
302 thread->workq = workq;
303 thread->cpu = &cpus[cpu_id];
304 thread->workq_blocked = false;
305 thread->workq_idling = false;
306 link_initialize(&thread->workq_link);
307
308 list_append(&thread->workq_link, &workq->workers);
309 } else {
310 /*
311 * Work queue is shutting down - we must not add the worker
312 * and we cannot destroy it without ready-ing it. Mark it
313 * interrupted so the worker exits right away without even
314 * touching workq.
315 */
316 success = false;
317
318 /* cur_worker_cnt proactively increased in signal_worker_logic(). */
319 assert(0 < workq->cur_worker_cnt);
320 --workq->cur_worker_cnt;
321 }
322
323 irq_spinlock_unlock(&workq->lock, false);
324 irq_spinlock_unlock(&thread->lock, true);
325
326 if (!success) {
327 thread_interrupt(thread);
328 }
329
330 thread_ready(thread);
331
332 return success;
333}
334
335/** Shuts down the work queue. Waits for all pending work items to complete.
336 *
337 * workq_stop() may only be run once.
338 */
339void workq_stop(struct work_queue *workq)
340{
341 assert(!workq_corrupted(workq));
342
343 interrupt_workers(workq);
344 wait_for_workers(workq);
345}
346
347/** Notifies worker threads the work queue is shutting down. */
348static void interrupt_workers(struct work_queue *workq)
349{
350 irq_spinlock_lock(&workq->lock, true);
351
352 /* workq_stop() may only be called once. */
353 assert(!workq->stopping);
354 workq->stopping = true;
355
356 /* Respect lock ordering - do not hold workq->lock during broadcast. */
357 irq_spinlock_unlock(&workq->lock, true);
358
359 condvar_broadcast(&workq->activate_worker);
360}
361
362/** Waits for all worker threads to exit. */
363static void wait_for_workers(struct work_queue *workq)
364{
365 assert(!PREEMPTION_DISABLED);
366
367 irq_spinlock_lock(&workq->lock, true);
368
369 list_foreach_safe(workq->workers, cur_worker, next_worker) {
370 thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
371 list_remove(cur_worker);
372
373 /* Wait without the lock. */
374 irq_spinlock_unlock(&workq->lock, true);
375
376 thread_join(worker);
377 thread_detach(worker);
378
379 irq_spinlock_lock(&workq->lock, true);
380 }
381
382 assert(list_empty(&workq->workers));
383
384 /* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
385 while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
386 irq_spinlock_unlock(&workq->lock, true);
387
388 scheduler();
389
390 irq_spinlock_lock(&workq->lock, true);
391 }
392
393 irq_spinlock_unlock(&workq->lock, true);
394}
395
396 /** Queues a function into the global work queue without blocking.
397 *
398 * See workq_enqueue_noblock() for more details.
399 */
400bool workq_global_enqueue_noblock(work_t *work_item, work_func_t func)
401{
402 return workq_enqueue_noblock(&g_work_queue, work_item, func);
403}
404
405 /** Queues a function into the global work queue; may block.
406 *
407 * See workq_enqueue() for more details.
408 */
409bool workq_global_enqueue(work_t *work_item, work_func_t func)
410{
411 return workq_enqueue(&g_work_queue, work_item, func);
412}
413
414/** Adds a function to be invoked in a separate thread without blocking.
415 *
416 * workq_enqueue_noblock() is guaranteed not to block. It is safe
417 * to invoke from interrupt handlers.
418 *
419 * Consider using workq_enqueue() instead if at all possible. Otherwise,
420 * your work item may have to wait for previously enqueued sleeping
421 * work items to complete if you are unlucky.
422 *
423 * @param workq Work queue where to queue the work item.
424 * @param work_item Work item bookkeeping structure. Must be valid
425 * until func() is entered.
426 * @param func User supplied function to invoke in a worker thread.
427 *
428 * @return false if work queue is shutting down; function is not
429 * queued for further processing.
430 * @return true Otherwise. func() will be invoked in a separate thread.
431 */
432bool workq_enqueue_noblock(struct work_queue *workq, work_t *work_item,
433 work_func_t func)
434{
435 return _workq_enqueue(workq, work_item, func, false);
436}
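/*
 * Example (sketch): deferring work from an interrupt handler with the
 * non-blocking variant. Here work_t lives inside a driver's soft-state
 * structure (my_dev_t, my_dev_bh and my_dev_irq_handler are illustrative
 * names), so no allocation happens in the handler. A given work_t must
 * not be queued again before its func() has been entered.
 *
 *     typedef struct {
 *         work_t work;   // Must be the first member for the cast below.
 *         // ... device state ...
 *     } my_dev_t;
 *
 *     static void my_dev_bh(work_t *work_item)
 *     {
 *         my_dev_t *dev = (my_dev_t *) work_item;
 *         // Heavy lifting that may sleep goes here, using dev's state.
 *     }
 *
 *     static void my_dev_irq_handler(my_dev_t *dev)
 *     {
 *         // Never blocks, so it is safe in interrupt context.
 *         if (!workq_global_enqueue_noblock(&dev->work, my_dev_bh)) {
 *             // The queue is stopping - drop the request or handle it inline.
 *         }
 *     }
 */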
437
438/** Adds a function to be invoked in a separate thread; may block.
439 *
440 * While workq_enqueue() is unlikely to block, it may do so if too
441 * many previously queued work items are blocked sleeping.
442 *
443 * @param workq Work queue where to queue the work item.
444 * @param work_item Work item bookkeeping structure. Must be valid
445 * until func() is entered.
446 * @param func User supplied function to invoke in a worker thread.
447 *
448 * @return false if work queue is shutting down; function is not
449 * queued for further processing.
450 * @return true Otherwise. func() will be invoked in a separate thread.
451 */
452bool workq_enqueue(struct work_queue *workq, work_t *work_item, work_func_t func)
453{
454 return _workq_enqueue(workq, work_item, func, true);
455}
456
457/** Adds a work item that will be processed by a separate worker thread.
458 *
459 * func() will be invoked in another kernel thread and may block.
460 *
461 * Prefer to call _workq_enqueue() with can_block set. Otherwise
462 * your work item may have to wait for sleeping work items to complete.
463 * If all worker threads are blocked/sleeping, a new worker thread cannot
464 * be created without can_block set because creating a thread might
465 * block due to low memory conditions.
466 *
467 * @param workq Work queue where to queue the work item.
468 * @param work_item Work item bookkeeping structure. Must be valid
469 * until func() is entered.
470 * @param func User supplied function to invoke in a worker thread.
471 * @param can_block May adding this work item block?
472 *
473 * @return false if work queue is shutting down; function is not
474 * queued for further processing.
475 * @return true Otherwise.
476 */
477 static bool _workq_enqueue(struct work_queue *workq, work_t *work_item,
478 work_func_t func, bool can_block)
479{
480 assert(!workq_corrupted(workq));
481
482 bool success = true;
483 signal_op_t signal_op = NULL;
484
485 irq_spinlock_lock(&workq->lock, true);
486
487 if (workq->stopping) {
488 success = false;
489 } else {
490 init_work_item(work_item, func);
491 list_append(&work_item->queue_link, &workq->queue);
492 ++workq->item_cnt;
493 success = true;
494
495 if (!booting) {
496 signal_op = signal_worker_logic(workq, can_block);
497 } else {
498 /*
499 * During boot there are no workers to signal. Just queue
500 * the work and let future workers take care of it.
501 */
502 }
503 }
504
505 irq_spinlock_unlock(&workq->lock, true);
506
507 if (signal_op) {
508 signal_op(workq);
509 }
510
511 return success;
512}
513
514/** Prepare an item to be added to the work item queue. */
515static void init_work_item(work_t *work_item, work_func_t func)
516{
517#ifdef CONFIG_DEBUG
518 work_item->cookie = WORK_ITEM_MAGIC;
519#endif
520
521 link_initialize(&work_item->queue_link);
522 work_item->func = func;
523}
524
525/** Returns the number of workers running work func() that are not blocked. */
526static size_t active_workers_now(struct work_queue *workq)
527{
528 assert(irq_spinlock_locked(&workq->lock));
529
530 /* Workers blocked are sleeping in the work function (ie not idle). */
531 assert(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
532 /* Idle workers are waiting for more work to arrive in condvar_wait. */
533 assert(workq->idle_worker_cnt <= workq->cur_worker_cnt);
534
535 /* Idle + blocked workers == sleeping worker threads. */
536 size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;
537
538 assert(sleeping_workers <= workq->cur_worker_cnt);
539 /* Workers pending activation are idle workers not yet given a time slice. */
540 assert(workq->activate_pending <= workq->idle_worker_cnt);
541
542 /*
543 * Workers actively running the work func() this very moment and
544 * are neither blocked nor idle. Exclude ->activate_pending workers
545 * since they will run their work func() once they get a time slice
546 * and are not running it right now.
547 */
548 return workq->cur_worker_cnt - sleeping_workers;
549}
550
551/**
552 * Returns the number of workers that are running or are about to run work
553 * func() and that are not blocked.
554 */
555static size_t active_workers(struct work_queue *workq)
556{
557 assert(irq_spinlock_locked(&workq->lock));
558
559 /*
560 * Workers actively running the work func() and are neither blocked nor
561 * idle. ->activate_pending workers will run their work func() once they
562 * get a time slice after waking from a condvar wait, so count them
563 * as well.
564 */
565 return active_workers_now(workq) + workq->activate_pending;
566}
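/*
 * In other words, the counters always satisfy:
 *
 *     cur_worker_cnt   == active_workers_now() + idle_worker_cnt + blocked_worker_cnt
 *     active_workers() == active_workers_now() + activate_pending
 *
 * with the activate_pending workers forming a subset of the idle workers.
 */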
567
568static void add_worker_noblock_op(struct work_queue *workq)
569{
570 condvar_signal(&nonblock_adder.req_cv);
571}
572
573static void add_worker_op(struct work_queue *workq)
574{
575 add_worker(workq);
576}
577
578static void signal_worker_op(struct work_queue *workq)
579{
580 assert(!workq_corrupted(workq));
581
582 condvar_signal(&workq->activate_worker);
583
584 irq_spinlock_lock(&workq->lock, true);
585 assert(0 < workq->pending_op_cnt);
586 --workq->pending_op_cnt;
587 irq_spinlock_unlock(&workq->lock, true);
588}
589
590/** Determines how to signal workers if at all.
591 *
592 * @param workq Work queue where a new work item was queued.
593 * @param can_block True if we may block while signaling a worker or creating
594 * a new worker.
595 *
596 * @return Function that will notify workers or NULL if no action is needed.
597 */
598static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block)
599{
600 assert(!workq_corrupted(workq));
601 assert(irq_spinlock_locked(&workq->lock));
602
603 /* Only signal workers if really necessary. */
604 signal_op_t signal_op = NULL;
605
606 /*
607 * Workers actively running the work func() and neither blocked nor idle.
608 * Including ->activate_pending workers that will run their work func()
609 * once they get a time slice.
610 */
611 size_t active = active_workers(workq);
612 /* Max total allowed number of work items queued for active workers. */
613 size_t max_load = active * max_items_per_worker;
614
615 /* Active workers are getting overwhelmed - activate another. */
616 if (max_load < workq->item_cnt) {
617
618 size_t remaining_idle =
619 workq->idle_worker_cnt - workq->activate_pending;
620
621 /* Idle workers still exist - activate one. */
622 if (remaining_idle > 0) {
623 /*
624 * Directly changing idle_worker_cnt here would not allow
625 * workers to recognize spurious wake-ups. Change
626 * activate_pending instead.
627 */
628 ++workq->activate_pending;
629 ++workq->pending_op_cnt;
630 signal_op = signal_worker_op;
631 } else {
632 /* No idle workers remain. Request that a new one be created. */
633 bool need_worker = (active < max_concurrent_workers) &&
634 (workq->cur_worker_cnt < max_worker_cnt);
635
636 if (need_worker && can_block) {
637 signal_op = add_worker_op;
638 /*
639 * It may take some time to actually create the worker.
640 * We don't want to swamp the thread pool with superfluous
641 * worker creation requests so pretend it was already
642 * created and proactively increase the worker count.
643 */
644 ++workq->cur_worker_cnt;
645 }
646
647 /*
648 * We cannot create a new worker but we need one desperately
649 * because all workers are blocked in their work functions.
650 */
651 if (need_worker && !can_block && 0 == active) {
652 assert(0 == workq->idle_worker_cnt);
653
654 irq_spinlock_lock(&nonblock_adder.lock, true);
655
656 if (nonblock_adder.thread && !link_used(&workq->nb_link)) {
657 signal_op = add_worker_noblock_op;
658 ++workq->cur_worker_cnt;
659 list_append(&workq->nb_link, &nonblock_adder.work_queues);
660 }
661
662 irq_spinlock_unlock(&nonblock_adder.lock, true);
663 }
664 }
665 } else {
666 /*
667 * There are enough active/running workers to process the queue.
668 * No need to signal/activate any new workers.
669 */
670 signal_op = NULL;
671 }
672
673 return signal_op;
674}
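/*
 * Worked example: with two active workers and max_items_per_worker == 8,
 * max_load is 16, so queueing a 17th item triggers the logic above. An
 * idle worker is signaled if one remains unsignaled; otherwise a new
 * worker is requested, provided fewer than max_concurrent_workers are
 * active and fewer than max_worker_cnt workers exist in total (via the
 * kworkq-nb thread when the caller must not block and every worker is
 * blocked).
 */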
675
676/** Executes queued work items. */
677static void worker_thread(void *arg)
678{
679 /*
680 * The thread may have been created after the work queue was ordered to
681 * stop. In that case do not access the work queue and return immediately.
682 */
683 if (thread_interrupted(THREAD)) {
684 thread_detach(THREAD);
685 return;
686 }
687
688 assert(arg != NULL);
689
690 struct work_queue *workq = arg;
691 work_t *work_item;
692
693 while (dequeue_work(workq, &work_item)) {
694 /* Copy the func field so func() can safely free work_item. */
695 work_func_t func = work_item->func;
696
697 func(work_item);
698 }
699}
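/*
 * Because dequeue_work() detaches the item before func() runs, a work
 * function may free its item (as in the examples above) or queue it again.
 * Sketch of a batch processor that re-arms itself while data remains
 * (my_batch_t and the helpers are illustrative names):
 *
 *     static void my_batch_func(work_t *work_item)
 *     {
 *         my_batch_t *batch = (my_batch_t *) work_item;
 *         my_batch_process_some(batch);
 *         if (!my_batch_done(batch))
 *             workq_global_enqueue(&batch->work, my_batch_func);
 *     }
 */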
700
701 /** Waits for and retrieves a work item. Returns false if the worker should exit. */
702static bool dequeue_work(struct work_queue *workq, work_t **pwork_item)
703{
704 assert(!workq_corrupted(workq));
705
706 irq_spinlock_lock(&workq->lock, true);
707
708 /* Check whether we should exit because the load is low. */
709 if (!workq->stopping && worker_unnecessary(workq)) {
710 /* There are too many workers for this load. Exit. */
711 assert(0 < workq->cur_worker_cnt);
712 --workq->cur_worker_cnt;
713 list_remove(&THREAD->workq_link);
714 irq_spinlock_unlock(&workq->lock, true);
715
716 thread_detach(THREAD);
717 return false;
718 }
719
720 bool stop = false;
721
722 /* Wait for work to arrive. */
723 while (list_empty(&workq->queue) && !workq->stopping) {
724 cv_wait(workq);
725
726 if (0 < workq->activate_pending)
727 --workq->activate_pending;
728 }
729
730 /* Process remaining work even if requested to stop. */
731 if (!list_empty(&workq->queue)) {
732 link_t *work_link = list_first(&workq->queue);
733 *pwork_item = list_get_instance(work_link, work_t, queue_link);
734
735#ifdef CONFIG_DEBUG
736 assert(!work_item_corrupted(*pwork_item));
737 (*pwork_item)->cookie = 0;
738#endif
739 list_remove(work_link);
740 --workq->item_cnt;
741
742 stop = false;
743 } else {
744 /* Requested to stop and no more work queued. */
745 assert(workq->stopping);
746 --workq->cur_worker_cnt;
747 stop = true;
748 }
749
750 irq_spinlock_unlock(&workq->lock, true);
751
752 return !stop;
753}
754
755 /** Returns true if there are too many workers for the given load. */
756static bool worker_unnecessary(struct work_queue *workq)
757{
758 assert(irq_spinlock_locked(&workq->lock));
759
760 /* No work is pending. We don't need too many idle threads. */
761 if (list_empty(&workq->queue)) {
762 /* There are too many idle workers. Exit. */
763 return (min_worker_cnt <= workq->idle_worker_cnt);
764 } else {
765 /*
766 * There is work but we are swamped with too many active workers
767 * that were woken up from sleep at around the same time. We
768 * don't need another worker fighting for cpu time.
769 */
770 size_t active = active_workers_now(workq);
771 return (max_concurrent_workers < active);
772 }
773}
774
775 /** Waits for a signal on activate_worker. The thread is marked idle while waiting. */
776static void cv_wait(struct work_queue *workq)
777{
778 ++workq->idle_worker_cnt;
779 THREAD->workq_idling = true;
780
781 /* Ignore lock ordering just here. */
782 assert(irq_spinlock_locked(&workq->lock));
783
784 _condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
785 &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);
786
787 assert(!workq_corrupted(workq));
788 assert(irq_spinlock_locked(&workq->lock));
789
790 THREAD->workq_idling = false;
791 --workq->idle_worker_cnt;
792}
793
794
795/** Invoked from thread_ready() right before the thread is woken up. */
796void workq_before_thread_is_ready(thread_t *thread)
797{
798 assert(thread);
799 assert(irq_spinlock_locked(&thread->lock));
800
801 /* Worker's work func() is about to wake up from sleeping. */
802 if (thread->workq && thread->workq_blocked) {
803 /* Must be blocked in user work func() and not be waiting for work. */
804 assert(!thread->workq_idling);
805 assert(thread->state == Sleeping);
806 assert(THREAD != thread);
807 assert(!workq_corrupted(thread->workq));
808
809 /* Protected by thread->lock */
810 thread->workq_blocked = false;
811
812 irq_spinlock_lock(&thread->workq->lock, true);
813 --thread->workq->blocked_worker_cnt;
814 irq_spinlock_unlock(&thread->workq->lock, true);
815 }
816}
817
818/** Invoked from scheduler() before switching away from a thread. */
819void workq_after_thread_ran(void)
820{
821 assert(THREAD);
822 assert(irq_spinlock_locked(&THREAD->lock));
823
824 /* Worker's work func() is about to sleep/block. */
825 if (THREAD->workq && THREAD->state == Sleeping && !THREAD->workq_idling) {
826 assert(!THREAD->workq_blocked);
827 assert(!workq_corrupted(THREAD->workq));
828
829 THREAD->workq_blocked = true;
830
831 irq_spinlock_lock(&THREAD->workq->lock, false);
832
833 ++THREAD->workq->blocked_worker_cnt;
834
835 bool can_block = false;
836 signal_op_t op = signal_worker_logic(THREAD->workq, can_block);
837
838 irq_spinlock_unlock(&THREAD->workq->lock, false);
839
840 if (op) {
841 assert(add_worker_noblock_op == op || signal_worker_op == op);
842 op(THREAD->workq);
843 }
844 }
845}
846
847/** Prints stats of the work queue to the kernel console. */
848void workq_print_info(struct work_queue *workq)
849{
850 irq_spinlock_lock(&workq->lock, true);
851
852 size_t total = workq->cur_worker_cnt;
853 size_t blocked = workq->blocked_worker_cnt;
854 size_t idle = workq->idle_worker_cnt;
855 size_t active = active_workers(workq);
856 size_t items = workq->item_cnt;
857 bool stopping = workq->stopping;
858 bool worker_surplus = worker_unnecessary(workq);
859 const char *load_str = worker_surplus ? "decreasing" :
860 (0 < workq->activate_pending) ? "increasing" : "stable";
861
862 irq_spinlock_unlock(&workq->lock, true);
863
864 printf(
865 "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
866 " max_concurrent_workers=%zu, max_items_per_worker=%zu\n"
867 "Workers: %zu\n"
868 "Active: %zu (workers currently processing work)\n"
869 "Blocked: %zu (work functions sleeping/blocked)\n"
870 "Idle: %zu (idle workers waiting for more work)\n"
871 "Items: %zu (queued not yet dispatched work)\n"
872 "Stopping: %d\n"
873 "Load: %s\n",
874 max_worker_cnt, min_worker_cnt,
875 max_concurrent_workers, max_items_per_worker,
876 total,
877 active,
878 blocked,
879 idle,
880 items,
881 stopping,
882 load_str);
883}
884
885/** Prints stats of the global work queue. */
886void workq_global_print_info(void)
887{
888 workq_print_info(&g_work_queue);
889}
890
891
892static bool dequeue_add_req(nonblock_adder_t *info, struct work_queue **pworkq)
893{
894 bool stop = false;
895
896 irq_spinlock_lock(&info->lock, true);
897
898 while (list_empty(&info->work_queues) && !stop) {
899 errno_t ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
900 &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
901
902 stop = (ret == EINTR);
903 }
904
905 if (!stop) {
906 *pworkq = list_get_instance(list_first(&info->work_queues),
907 struct work_queue, nb_link);
908
909 assert(!workq_corrupted(*pworkq));
910
911 list_remove(&(*pworkq)->nb_link);
912 }
913
914 irq_spinlock_unlock(&info->lock, true);
915
916 return !stop;
917}
918
919static void thr_nonblock_add_worker(void *arg)
920{
921 nonblock_adder_t *info = arg;
922 struct work_queue *workq = NULL;
923
924 while (dequeue_add_req(info, &workq)) {
925 add_worker(workq);
926 }
927}
928
929
930static void nonblock_init(void)
931{
932 irq_spinlock_initialize(&nonblock_adder.lock, "kworkq-nb.lock");
933 condvar_initialize(&nonblock_adder.req_cv);
934 list_initialize(&nonblock_adder.work_queues);
935
936 nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
937 &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");
938
939 if (nonblock_adder.thread) {
940 thread_ready(nonblock_adder.thread);
941 } else {
942 /*
943 * We won't be able to add workers without blocking if all workers
944 * sleep, but the system will at least boot.
945 */
946 printf("Failed to create kworkq-nb. Sleeping work may stall the workq.\n");
947 }
948}
949
950#ifdef CONFIG_DEBUG
951/** Returns true if the workq is definitely corrupted; false if not sure.
952 *
953 * Can be used outside of any locks.
954 */
955static bool workq_corrupted(struct work_queue *workq)
956{
957 /*
958 * Needed to make the most current cookie value set by workq_preinit()
959 * visible even if we access the workq right after it is created but
960 * on a different cpu. Otherwise, workq_corrupted() would not work
961 * outside a lock.
962 */
963 memory_barrier();
964 return NULL == workq || workq->cookie != WORKQ_MAGIC;
965}
966
967/** Returns true if the work_item is definitely corrupted; false if not sure.
968 *
969 * Must be used with the work queue protecting spinlock locked.
970 */
971static bool work_item_corrupted(work_t *work_item)
972{
973 return NULL == work_item || work_item->cookie != WORK_ITEM_MAGIC;
974}
975#endif
976
977/** @}
978 */