source: mainline/kernel/generic/src/synch/workqueue.c@ 3bacee1

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since 3bacee1 was 3bacee1, checked in by Jiri Svoboda <jiri@…>, 7 years ago

Make ccheck-fix again and commit more good files.

  • Property mode set to 100644
File size: 27.1 KB
Line 
1/*
2 * Copyright (c) 2012 Adam Hraska
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup generic
30 * @{
31 */
32
/**
 * @file
 * @brief Work queue/thread pool that automatically adjusts its size
 *        depending on the current load. Queued work functions may sleep.
 */
38
39#include <assert.h>
40#include <errno.h>
41#include <synch/workqueue.h>
42#include <synch/spinlock.h>
43#include <synch/condvar.h>
44#include <synch/mutex.h>
45#include <proc/thread.h>
46#include <config.h>
47#include <arch.h>
48#include <cpu.h>
49#include <macros.h>
50
/** Cookie stored in an initialized work_queue (CONFIG_DEBUG integrity check). */
#define WORKQ_MAGIC      0xf00c1333U
/** Cookie stored in a queued work item (CONFIG_DEBUG integrity check). */
#define WORK_ITEM_MAGIC  0xfeec1777U
53
54
55struct work_queue {
56 /*
57 * Protects everything except activate_worker.
58 * Must be acquired after any thread->locks.
59 */
60 IRQ_SPINLOCK_DECLARE(lock);
61
62 /* Activates a worker if new work arrives or if shutting down the queue. */
63 condvar_t activate_worker;
64
65 /* Queue of work_items ready to be dispatched. */
66 list_t queue;
67
68 /* List of worker threads. */
69 list_t workers;
70
71 /* Number of work items queued. */
72 size_t item_cnt;
73
74 /* Indicates the work queue is shutting down. */
75 bool stopping;
76 const char *name;
77
78 /* Total number of created worker threads. */
79 size_t cur_worker_cnt;
80 /* Number of workers waiting for work to arrive. */
81 size_t idle_worker_cnt;
82 /* Number of idle workers signaled that have not yet been woken up. */
83 size_t activate_pending;
84 /* Number of blocked workers sleeping in work func() (ie not idle). */
85 size_t blocked_worker_cnt;
86
87 /* Number of pending signal_worker_op() operations. */
88 size_t pending_op_cnt;
89
90 link_t nb_link;
91
92#ifdef CONFIG_DEBUG
93 /* Magic cookie for integrity checks. Immutable. Accessed without lock. */
94 uint32_t cookie;
95#endif
96};
97
98
99/** Min number of idle workers to keep. */
100static size_t min_worker_cnt;
101/** Max total number of workers - be it blocked, idle, or active. */
102static size_t max_worker_cnt;
103/** Max number of concurrently running active workers, ie not blocked nor idle. */
104static size_t max_concurrent_workers;
105/** Max number of work items per active worker before a new worker is activated.*/
106static const size_t max_items_per_worker = 8;
107
108/** System wide work queue. */
109static struct work_queue g_work_queue;
110
111static int booting = true;
112
113
114typedef struct {
115 IRQ_SPINLOCK_DECLARE(lock);
116 condvar_t req_cv;
117 thread_t *thread;
118 list_t work_queues;
119} nonblock_adder_t;
120
121static nonblock_adder_t nonblock_adder;
122
123
124
/**
 * Deferred worker signaling operation. Chosen by signal_worker_logic() with
 * the work queue lock held and invoked after the lock has been released.
 */
typedef void (*signal_op_t)(struct work_queue *workq);
127
128
129/* Fwd decl. */
130static void workq_preinit(struct work_queue *workq, const char *name);
131static bool add_worker(struct work_queue *workq);
132static void interrupt_workers(struct work_queue *workq);
133static void wait_for_workers(struct work_queue *workq);
134static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
135 work_func_t func, bool can_block);
136static void init_work_item(work_t *work_item, work_func_t func);
137static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block);
138static void worker_thread(void *arg);
139static bool dequeue_work(struct work_queue *workq, work_t **pwork_item);
140static bool worker_unnecessary(struct work_queue *workq);
141static void cv_wait(struct work_queue *workq);
142static void nonblock_init(void);
143
144#ifdef CONFIG_DEBUG
145static bool workq_corrupted(struct work_queue *workq);
146static bool work_item_corrupted(work_t *work_item);
147#endif
148
149/** Creates worker thread for the system-wide worker queue. */
150void workq_global_worker_init(void)
151{
152 /*
153 * No need for additional synchronization. Stores to word-sized
154 * variables are atomic and the change will eventually propagate.
155 * Moreover add_worker() includes the necessary memory barriers
156 * in spinlock lock/unlock().
157 */
158 booting = false;
159
160 nonblock_init();
161
162 if (!add_worker(&g_work_queue))
163 panic("Could not create a single global work queue worker!\n");
164
165}
166
167/** Initializes the system wide work queue and support for other work queues. */
168void workq_global_init(void)
169{
170 /* Keep idle workers on 1/4-th of cpus, but at least 2 threads. */
171 min_worker_cnt = max(2, config.cpu_count / 4);
172 /* Allow max 8 sleeping work items per cpu. */
173 max_worker_cnt = max(32, 8 * config.cpu_count);
174 /* Maximum concurrency without slowing down the system. */
175 max_concurrent_workers = max(2, config.cpu_count);
176
177 workq_preinit(&g_work_queue, "kworkq");
178}
179
180/** Stops the system global work queue and waits for all work items to complete.*/
181void workq_global_stop(void)
182{
183 workq_stop(&g_work_queue);
184}
185
186/** Creates and initializes a work queue. Returns NULL upon failure. */
187struct work_queue *workq_create(const char *name)
188{
189 struct work_queue *workq = malloc(sizeof(struct work_queue), 0);
190
191 if (workq) {
192 if (workq_init(workq, name)) {
193 assert(!workq_corrupted(workq));
194 return workq;
195 }
196
197 free(workq);
198 }
199
200 return NULL;
201}
202
203/** Frees work queue resources and stops it if it had not been done so already.*/
204void workq_destroy(struct work_queue *workq)
205{
206 assert(!workq_corrupted(workq));
207
208 irq_spinlock_lock(&workq->lock, true);
209 bool stopped = workq->stopping;
210#ifdef CONFIG_DEBUG
211 size_t running_workers = workq->cur_worker_cnt;
212#endif
213 irq_spinlock_unlock(&workq->lock, true);
214
215 if (!stopped) {
216 workq_stop(workq);
217 } else {
218 assert(0 == running_workers);
219 }
220
221#ifdef CONFIG_DEBUG
222 workq->cookie = 0;
223#endif
224
225 free(workq);
226}
227
228/** Initializes workq structure without creating any workers. */
229static void workq_preinit(struct work_queue *workq, const char *name)
230{
231#ifdef CONFIG_DEBUG
232 workq->cookie = WORKQ_MAGIC;
233#endif
234
235 irq_spinlock_initialize(&workq->lock, name);
236 condvar_initialize(&workq->activate_worker);
237
238 list_initialize(&workq->queue);
239 list_initialize(&workq->workers);
240
241 workq->item_cnt = 0;
242 workq->stopping = false;
243 workq->name = name;
244
245 workq->cur_worker_cnt = 1;
246 workq->idle_worker_cnt = 0;
247 workq->activate_pending = 0;
248 workq->blocked_worker_cnt = 0;
249
250 workq->pending_op_cnt = 0;
251 link_initialize(&workq->nb_link);
252}
253
/** Initializes a work queue and creates its first worker.
 *
 * A work queue must be stopped via workq_stop() before it is destroyed.
 *
 * @param workq  Work queue to initialize.
 * @param name   Name of the work queue.
 *
 * @return True if the first worker was added, false otherwise.
 */
bool workq_init(struct work_queue *workq, const char *name)
{
	workq_preinit(workq, name);

	bool worker_added = add_worker(workq);
	return worker_added;
}
264
265/** Add a new worker thread. Returns false if the thread could not be created. */
266static bool add_worker(struct work_queue *workq)
267{
268 assert(!workq_corrupted(workq));
269
270 thread_t *thread = thread_create(worker_thread, workq, TASK,
271 THREAD_FLAG_NONE, workq->name);
272
273 if (!thread) {
274 irq_spinlock_lock(&workq->lock, true);
275
276 /* cur_worker_cnt proactively increased in signal_worker_logic() .*/
277 assert(0 < workq->cur_worker_cnt);
278 --workq->cur_worker_cnt;
279
280 irq_spinlock_unlock(&workq->lock, true);
281 return false;
282 }
283
284 /* Respect lock ordering. */
285 irq_spinlock_lock(&thread->lock, true);
286 irq_spinlock_lock(&workq->lock, false);
287
288 bool success;
289
290 if (!workq->stopping) {
291 success = true;
292
293 /* Try to distribute workers among cpus right away. */
294 unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;
295
296 if (!cpus[cpu_id].active)
297 cpu_id = CPU->id;
298
299 thread->workq = workq;
300 thread->cpu = &cpus[cpu_id];
301 thread->workq_blocked = false;
302 thread->workq_idling = false;
303 link_initialize(&thread->workq_link);
304
305 list_append(&thread->workq_link, &workq->workers);
306 } else {
307 /*
308 * Work queue is shutting down - we must not add the worker
309 * and we cannot destroy it without ready-ing it. Mark it
310 * interrupted so the worker exits right away without even
311 * touching workq.
312 */
313 success = false;
314
315 /* cur_worker_cnt proactively increased in signal_worker() .*/
316 assert(0 < workq->cur_worker_cnt);
317 --workq->cur_worker_cnt;
318 }
319
320 irq_spinlock_unlock(&workq->lock, false);
321 irq_spinlock_unlock(&thread->lock, true);
322
323 if (!success) {
324 thread_interrupt(thread);
325 }
326
327 thread_ready(thread);
328
329 return success;
330}
331
/** Shuts down the work queue and waits for its workers to exit.
 *
 * Workers process all remaining queued work items before they exit.
 * workq_stop() may be invoked only once per work queue.
 *
 * @param workq  Work queue to stop.
 */
void workq_stop(struct work_queue *workq)
{
	assert(!workq_corrupted(workq));

	/* First tell the workers to stop, then wait until they are gone. */
	interrupt_workers(workq);
	wait_for_workers(workq);
}
343
344/** Notifies worker threads the work queue is shutting down. */
345static void interrupt_workers(struct work_queue *workq)
346{
347 irq_spinlock_lock(&workq->lock, true);
348
349 /* workq_stop() may only be called once. */
350 assert(!workq->stopping);
351 workq->stopping = true;
352
353 /* Respect lock ordering - do not hold workq->lock during broadcast. */
354 irq_spinlock_unlock(&workq->lock, true);
355
356 condvar_broadcast(&workq->activate_worker);
357}
358
359/** Waits for all worker threads to exit. */
360static void wait_for_workers(struct work_queue *workq)
361{
362 assert(!PREEMPTION_DISABLED);
363
364 irq_spinlock_lock(&workq->lock, true);
365
366 list_foreach_safe(workq->workers, cur_worker, next_worker) {
367 thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
368 list_remove(cur_worker);
369
370 /* Wait without the lock. */
371 irq_spinlock_unlock(&workq->lock, true);
372
373 thread_join(worker);
374 thread_detach(worker);
375
376 irq_spinlock_lock(&workq->lock, true);
377 }
378
379 assert(list_empty(&workq->workers));
380
381 /* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
382 while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
383 irq_spinlock_unlock(&workq->lock, true);
384
385 scheduler();
386
387 irq_spinlock_lock(&workq->lock, true);
388 }
389
390 irq_spinlock_unlock(&workq->lock, true);
391}
392
393/** Queues a function into the global wait queue without blocking.
394 *
395 * See workq_enqueue_noblock() for more details.
396 */
397bool workq_global_enqueue_noblock(work_t *work_item, work_func_t func)
398{
399 return workq_enqueue_noblock(&g_work_queue, work_item, func);
400}
401
402/** Queues a function into the global wait queue; may block.
403 *
404 * See workq_enqueue() for more details.
405 */
406bool workq_global_enqueue(work_t *work_item, work_func_t func)
407{
408 return workq_enqueue(&g_work_queue, work_item, func);
409}
410
411/** Adds a function to be invoked in a separate thread without blocking.
412 *
413 * workq_enqueue_noblock() is guaranteed not to block. It is safe
414 * to invoke from interrupt handlers.
415 *
416 * Consider using workq_enqueue() instead if at all possible. Otherwise,
417 * your work item may have to wait for previously enqueued sleeping
418 * work items to complete if you are unlucky.
419 *
420 * @param workq Work queue where to queue the work item.
421 * @param work_item Work item bookkeeping structure. Must be valid
422 * until func() is entered.
423 * @param func User supplied function to invoke in a worker thread.
424
425 * @return false if work queue is shutting down; function is not
426 * queued for further processing.
427 * @return true Otherwise. func() will be invoked in a separate thread.
428 */
429bool workq_enqueue_noblock(struct work_queue *workq, work_t *work_item,
430 work_func_t func)
431{
432 return _workq_enqueue(workq, work_item, func, false);
433}
434
435/** Adds a function to be invoked in a separate thread; may block.
436 *
437 * While the workq_enqueue() is unlikely to block, it may do so if too
438 * many previous work items blocked sleeping.
439 *
440 * @param workq Work queue where to queue the work item.
441 * @param work_item Work item bookkeeping structure. Must be valid
442 * until func() is entered.
443 * @param func User supplied function to invoke in a worker thread.
444
445 * @return false if work queue is shutting down; function is not
446 * queued for further processing.
447 * @return true Otherwise. func() will be invoked in a separate thread.
448 */
449bool workq_enqueue(struct work_queue *workq, work_t *work_item, work_func_t func)
450{
451 return _workq_enqueue(workq, work_item, func, true);
452}
453
454/** Adds a work item that will be processed by a separate worker thread.
455 *
456 * func() will be invoked in another kernel thread and may block.
457 *
458 * Prefer to call _workq_enqueue() with can_block set. Otherwise
459 * your work item may have to wait for sleeping work items to complete.
460 * If all worker threads are blocked/sleeping a new worker thread cannot
461 * be create without can_block set because creating a thread might
462 * block due to low memory conditions.
463 *
464 * @param workq Work queue where to queue the work item.
465 * @param work_item Work item bookkeeping structure. Must be valid
466 * until func() is entered.
467 * @param func User supplied function to invoke in a worker thread.
468 * @param can_block May adding this work item block?
469
470 * @return false if work queue is shutting down; function is not
471 * queued for further processing.
472 * @return true Otherwise.
473 */
474static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
475 work_func_t func, bool can_block)
476{
477 assert(!workq_corrupted(workq));
478
479 bool success = true;
480 signal_op_t signal_op = NULL;
481
482 irq_spinlock_lock(&workq->lock, true);
483
484 if (workq->stopping) {
485 success = false;
486 } else {
487 init_work_item(work_item, func);
488 list_append(&work_item->queue_link, &workq->queue);
489 ++workq->item_cnt;
490 success = true;
491
492 if (!booting) {
493 signal_op = signal_worker_logic(workq, can_block);
494 } else {
495 /*
496 * During boot there are no workers to signal. Just queue
497 * the work and let future workers take care of it.
498 */
499 }
500 }
501
502 irq_spinlock_unlock(&workq->lock, true);
503
504 if (signal_op) {
505 signal_op(workq);
506 }
507
508 return success;
509}
510
511/** Prepare an item to be added to the work item queue. */
512static void init_work_item(work_t *work_item, work_func_t func)
513{
514#ifdef CONFIG_DEBUG
515 work_item->cookie = WORK_ITEM_MAGIC;
516#endif
517
518 link_initialize(&work_item->queue_link);
519 work_item->func = func;
520}
521
522/** Returns the number of workers running work func() that are not blocked. */
523static size_t active_workers_now(struct work_queue *workq)
524{
525 assert(irq_spinlock_locked(&workq->lock));
526
527 /* Workers blocked are sleeping in the work function (ie not idle). */
528 assert(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
529 /* Idle workers are waiting for more work to arrive in condvar_wait. */
530 assert(workq->idle_worker_cnt <= workq->cur_worker_cnt);
531
532 /* Idle + blocked workers == sleeping worker threads. */
533 size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;
534
535 assert(sleeping_workers <= workq->cur_worker_cnt);
536 /* Workers pending activation are idle workers not yet given a time slice. */
537 assert(workq->activate_pending <= workq->idle_worker_cnt);
538
539 /*
540 * Workers actively running the work func() this very moment and
541 * are neither blocked nor idle. Exclude ->activate_pending workers
542 * since they will run their work func() once they get a time slice
543 * and are not running it right now.
544 */
545 return workq->cur_worker_cnt - sleeping_workers;
546}
547
548/**
549 * Returns the number of workers that are running or are about to run work
550 * func() and that are not blocked.
551 */
552static size_t active_workers(struct work_queue *workq)
553{
554 assert(irq_spinlock_locked(&workq->lock));
555
556 /*
557 * Workers actively running the work func() and are neither blocked nor
558 * idle. ->activate_pending workers will run their work func() once they
559 * get a time slice after waking from a condvar wait, so count them
560 * as well.
561 */
562 return active_workers_now(workq) + workq->activate_pending;
563}
564
565static void add_worker_noblock_op(struct work_queue *workq)
566{
567 condvar_signal(&nonblock_adder.req_cv);
568}
569
/**
 * Signal operation: adds a new worker to the work queue directly. The
 * result of add_worker() is ignored.
 */
static void add_worker_op(struct work_queue *workq)
{
	(void) add_worker(workq);
}
574
575static void signal_worker_op(struct work_queue *workq)
576{
577 assert(!workq_corrupted(workq));
578
579 condvar_signal(&workq->activate_worker);
580
581 irq_spinlock_lock(&workq->lock, true);
582 assert(0 < workq->pending_op_cnt);
583 --workq->pending_op_cnt;
584 irq_spinlock_unlock(&workq->lock, true);
585}
586
587/** Determines how to signal workers if at all.
588 *
589 * @param workq Work queue where a new work item was queued.
590 * @param can_block True if we may block while signaling a worker or creating
591 * a new worker.
592 *
593 * @return Function that will notify workers or NULL if no action is needed.
594 */
595static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block)
596{
597 assert(!workq_corrupted(workq));
598 assert(irq_spinlock_locked(&workq->lock));
599
600 /* Only signal workers if really necessary. */
601 signal_op_t signal_op = NULL;
602
603 /*
604 * Workers actively running the work func() and neither blocked nor idle.
605 * Including ->activate_pending workers that will run their work func()
606 * once they get a time slice.
607 */
608 size_t active = active_workers(workq);
609 /* Max total allowed number of work items queued for active workers. */
610 size_t max_load = active * max_items_per_worker;
611
612 /* Active workers are getting overwhelmed - activate another. */
613 if (max_load < workq->item_cnt) {
614
615 size_t remaining_idle =
616 workq->idle_worker_cnt - workq->activate_pending;
617
618 /* Idle workers still exist - activate one. */
619 if (remaining_idle > 0) {
620 /*
621 * Directly changing idle_worker_cnt here would not allow
622 * workers to recognize spurious wake-ups. Change
623 * activate_pending instead.
624 */
625 ++workq->activate_pending;
626 ++workq->pending_op_cnt;
627 signal_op = signal_worker_op;
628 } else {
629 /* No idle workers remain. Request that a new one be created. */
630 bool need_worker = (active < max_concurrent_workers) &&
631 (workq->cur_worker_cnt < max_worker_cnt);
632
633 if (need_worker && can_block) {
634 signal_op = add_worker_op;
635 /*
636 * It may take some time to actually create the worker.
637 * We don't want to swamp the thread pool with superfluous
638 * worker creation requests so pretend it was already
639 * created and proactively increase the worker count.
640 */
641 ++workq->cur_worker_cnt;
642 }
643
644 /*
645 * We cannot create a new worker but we need one desperately
646 * because all workers are blocked in their work functions.
647 */
648 if (need_worker && !can_block && 0 == active) {
649 assert(0 == workq->idle_worker_cnt);
650
651 irq_spinlock_lock(&nonblock_adder.lock, true);
652
653 if (nonblock_adder.thread && !link_used(&workq->nb_link)) {
654 signal_op = add_worker_noblock_op;
655 ++workq->cur_worker_cnt;
656 list_append(&workq->nb_link, &nonblock_adder.work_queues);
657 }
658
659 irq_spinlock_unlock(&nonblock_adder.lock, true);
660 }
661 }
662 } else {
663 /*
664 * There are enough active/running workers to process the queue.
665 * No need to signal/activate any new workers.
666 */
667 signal_op = NULL;
668 }
669
670 return signal_op;
671}
672
673/** Executes queued work items. */
674static void worker_thread(void *arg)
675{
676 /*
677 * The thread has been created after the work queue was ordered to stop.
678 * Do not access the work queue and return immediately.
679 */
680 if (thread_interrupted(THREAD)) {
681 thread_detach(THREAD);
682 return;
683 }
684
685 assert(arg != NULL);
686
687 struct work_queue *workq = arg;
688 work_t *work_item;
689
690 while (dequeue_work(workq, &work_item)) {
691 /* Copy the func field so func() can safely free work_item. */
692 work_func_t func = work_item->func;
693
694 func(work_item);
695 }
696}
697
698/** Waits and retrieves a work item. Returns false if the worker should exit. */
699static bool dequeue_work(struct work_queue *workq, work_t **pwork_item)
700{
701 assert(!workq_corrupted(workq));
702
703 irq_spinlock_lock(&workq->lock, true);
704
705 /* Check if we should exit if load is low. */
706 if (!workq->stopping && worker_unnecessary(workq)) {
707 /* There are too many workers for this load. Exit. */
708 assert(0 < workq->cur_worker_cnt);
709 --workq->cur_worker_cnt;
710 list_remove(&THREAD->workq_link);
711 irq_spinlock_unlock(&workq->lock, true);
712
713 thread_detach(THREAD);
714 return false;
715 }
716
717 bool stop = false;
718
719 /* Wait for work to arrive. */
720 while (list_empty(&workq->queue) && !workq->stopping) {
721 cv_wait(workq);
722
723 if (0 < workq->activate_pending)
724 --workq->activate_pending;
725 }
726
727 /* Process remaining work even if requested to stop. */
728 if (!list_empty(&workq->queue)) {
729 link_t *work_link = list_first(&workq->queue);
730 *pwork_item = list_get_instance(work_link, work_t, queue_link);
731
732#ifdef CONFIG_DEBUG
733 assert(!work_item_corrupted(*pwork_item));
734 (*pwork_item)->cookie = 0;
735#endif
736 list_remove(work_link);
737 --workq->item_cnt;
738
739 stop = false;
740 } else {
741 /* Requested to stop and no more work queued. */
742 assert(workq->stopping);
743 --workq->cur_worker_cnt;
744 stop = true;
745 }
746
747 irq_spinlock_unlock(&workq->lock, true);
748
749 return !stop;
750}
751
752/** Returns true if for the given load there are too many workers. */
753static bool worker_unnecessary(struct work_queue *workq)
754{
755 assert(irq_spinlock_locked(&workq->lock));
756
757 /* No work is pending. We don't need too many idle threads. */
758 if (list_empty(&workq->queue)) {
759 /* There are too many idle workers. Exit. */
760 return (min_worker_cnt <= workq->idle_worker_cnt);
761 } else {
762 /*
763 * There is work but we are swamped with too many active workers
764 * that were woken up from sleep at around the same time. We
765 * don't need another worker fighting for cpu time.
766 */
767 size_t active = active_workers_now(workq);
768 return (max_concurrent_workers < active);
769 }
770}
771
772/** Waits for a signal to activate_worker. Thread marked idle while waiting. */
773static void cv_wait(struct work_queue *workq)
774{
775 ++workq->idle_worker_cnt;
776 THREAD->workq_idling = true;
777
778 /* Ignore lock ordering just here. */
779 assert(irq_spinlock_locked(&workq->lock));
780
781 _condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
782 &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);
783
784 assert(!workq_corrupted(workq));
785 assert(irq_spinlock_locked(&workq->lock));
786
787 THREAD->workq_idling = false;
788 --workq->idle_worker_cnt;
789}
790
791
792/** Invoked from thread_ready() right before the thread is woken up. */
793void workq_before_thread_is_ready(thread_t *thread)
794{
795 assert(thread);
796 assert(irq_spinlock_locked(&thread->lock));
797
798 /* Worker's work func() is about to wake up from sleeping. */
799 if (thread->workq && thread->workq_blocked) {
800 /* Must be blocked in user work func() and not be waiting for work. */
801 assert(!thread->workq_idling);
802 assert(thread->state == Sleeping);
803 assert(THREAD != thread);
804 assert(!workq_corrupted(thread->workq));
805
806 /* Protected by thread->lock */
807 thread->workq_blocked = false;
808
809 irq_spinlock_lock(&thread->workq->lock, true);
810 --thread->workq->blocked_worker_cnt;
811 irq_spinlock_unlock(&thread->workq->lock, true);
812 }
813}
814
815/** Invoked from scheduler() before switching away from a thread. */
816void workq_after_thread_ran(void)
817{
818 assert(THREAD);
819 assert(irq_spinlock_locked(&THREAD->lock));
820
821 /* Worker's work func() is about to sleep/block. */
822 if (THREAD->workq && THREAD->state == Sleeping && !THREAD->workq_idling) {
823 assert(!THREAD->workq_blocked);
824 assert(!workq_corrupted(THREAD->workq));
825
826 THREAD->workq_blocked = true;
827
828 irq_spinlock_lock(&THREAD->workq->lock, false);
829
830 ++THREAD->workq->blocked_worker_cnt;
831
832 bool can_block = false;
833 signal_op_t op = signal_worker_logic(THREAD->workq, can_block);
834
835 irq_spinlock_unlock(&THREAD->workq->lock, false);
836
837 if (op) {
838 assert(add_worker_noblock_op == op || signal_worker_op == op);
839 op(THREAD->workq);
840 }
841 }
842}
843
844/** Prints stats of the work queue to the kernel console. */
845void workq_print_info(struct work_queue *workq)
846{
847 irq_spinlock_lock(&workq->lock, true);
848
849 size_t total = workq->cur_worker_cnt;
850 size_t blocked = workq->blocked_worker_cnt;
851 size_t idle = workq->idle_worker_cnt;
852 size_t active = active_workers(workq);
853 size_t items = workq->item_cnt;
854 bool stopping = workq->stopping;
855 bool worker_surplus = worker_unnecessary(workq);
856 const char *load_str = worker_surplus ? "decreasing" :
857 (0 < workq->activate_pending) ? "increasing" : "stable";
858
859 irq_spinlock_unlock(&workq->lock, true);
860
861 printf(
862 "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
863 " max_concurrent_workers=%zu, max_items_per_worker=%zu\n"
864 "Workers: %zu\n"
865 "Active: %zu (workers currently processing work)\n"
866 "Blocked: %zu (work functions sleeping/blocked)\n"
867 "Idle: %zu (idle workers waiting for more work)\n"
868 "Items: %zu (queued not yet dispatched work)\n"
869 "Stopping: %d\n"
870 "Load: %s\n",
871 max_worker_cnt, min_worker_cnt,
872 max_concurrent_workers, max_items_per_worker,
873 total,
874 active,
875 blocked,
876 idle,
877 items,
878 stopping,
879 load_str);
880}
881
882/** Prints stats of the global work queue. */
883void workq_global_print_info(void)
884{
885 workq_print_info(&g_work_queue);
886}
887
888
889static bool dequeue_add_req(nonblock_adder_t *info, struct work_queue **pworkq)
890{
891 bool stop = false;
892
893 irq_spinlock_lock(&info->lock, true);
894
895 while (list_empty(&info->work_queues) && !stop) {
896 errno_t ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
897 &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
898
899 stop = (ret == EINTR);
900 }
901
902 if (!stop) {
903 *pworkq = list_get_instance(list_first(&info->work_queues),
904 struct work_queue, nb_link);
905
906 assert(!workq_corrupted(*pworkq));
907
908 list_remove(&(*pworkq)->nb_link);
909 }
910
911 irq_spinlock_unlock(&info->lock, true);
912
913 return !stop;
914}
915
916static void thr_nonblock_add_worker(void *arg)
917{
918 nonblock_adder_t *info = arg;
919 struct work_queue *workq = NULL;
920
921 while (dequeue_add_req(info, &workq)) {
922 add_worker(workq);
923 }
924}
925
926
927static void nonblock_init(void)
928{
929 irq_spinlock_initialize(&nonblock_adder.lock, "kworkq-nb.lock");
930 condvar_initialize(&nonblock_adder.req_cv);
931 list_initialize(&nonblock_adder.work_queues);
932
933 nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
934 &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");
935
936 if (nonblock_adder.thread) {
937 thread_ready(nonblock_adder.thread);
938 } else {
939 /*
940 * We won't be able to add workers without blocking if all workers
941 * sleep, but at least boot the system.
942 */
943 printf("Failed to create kworkq-nb. Sleeping work may stall the workq.\n");
944 }
945}
946
#ifdef CONFIG_DEBUG
/** Returns true if the workq is definitely corrupted; false if not sure.
 *
 * May be used outside of any locks.
 */
static bool workq_corrupted(struct work_queue *workq)
{
	/*
	 * The barrier makes the cookie value written by workq_preinit()
	 * visible even if the workq is accessed on a different cpu right
	 * after it was created. Without it, workq_corrupted() would not
	 * work outside of a lock.
	 */
	memory_barrier();

	if (NULL == workq)
		return true;

	return workq->cookie != WORKQ_MAGIC;
}

/** Returns true if the work_item is definitely corrupted; false if not sure.
 *
 * Must be used with the spinlock protecting the work queue held.
 */
static bool work_item_corrupted(work_t *work_item)
{
	if (NULL == work_item)
		return true;

	return work_item->cookie != WORK_ITEM_MAGIC;
}
#endif
973
974/** @}
975 */
Note: See TracBrowser for help on using the repository browser.