source: mainline/kernel/generic/src/synch/workqueue.c@ c46bfbc

Last change on this file since c46bfbc was 63e27ef, checked in by Jiri Svoboda <jiri@…>, 8 years ago

ASSERT → assert

/*
 * Copyright (c) 2012 Adam Hraska
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 * - The name of the author may not be used to endorse or promote products
 *   derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/** @addtogroup generic
 * @{
 */

/**
 * @file
 * @brief Work queue/thread pool that automatically adjusts its size
 * depending on the current load. Queued work functions may sleep.
 */
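
/*
 * Illustrative usage sketch (not part of the original file; my_job_t,
 * process_job() and submit_job() are hypothetical names): a caller embeds a
 * work_t in its own structure, enqueues it, and the work function frees it.
 *
 *     typedef struct {
 *         work_t work;        // must be the first member for the cast below
 *         int payload;
 *     } my_job_t;
 *
 *     static void process_job(work_t *work_item)
 *     {
 *         // work is the first member of my_job_t, so this cast is valid
 *         my_job_t *job = (my_job_t *) work_item;
 *         // ... may sleep here ...
 *         free(job);
 *     }
 *
 *     static void submit_job(int payload)
 *     {
 *         my_job_t *job = malloc(sizeof(my_job_t), 0);
 *         job->payload = payload;
 *         if (!workq_global_enqueue(&job->work, process_job))
 *             free(job);    // the queue is shutting down
 *     }
 */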

#include <assert.h>
#include <synch/workqueue.h>
#include <synch/spinlock.h>
#include <synch/condvar.h>
#include <synch/mutex.h>
#include <proc/thread.h>
#include <config.h>
#include <arch.h>
#include <cpu.h>
#include <macros.h>

#define WORKQ_MAGIC 0xf00c1333U
#define WORK_ITEM_MAGIC 0xfeec1777U


struct work_queue {
    /*
     * Protects everything except activate_worker.
     * Must be acquired after any thread->locks.
     */
    IRQ_SPINLOCK_DECLARE(lock);

    /* Activates a worker if new work arrives or if shutting down the queue. */
    condvar_t activate_worker;

    /* Queue of work_items ready to be dispatched. */
    list_t queue;

    /* List of worker threads. */
    list_t workers;

    /* Number of work items queued. */
    size_t item_cnt;

    /* Indicates the work queue is shutting down. */
    bool stopping;
    const char *name;

    /* Total number of created worker threads. */
    size_t cur_worker_cnt;
    /* Number of workers waiting for work to arrive. */
    size_t idle_worker_cnt;
    /* Number of idle workers signaled that have not yet been woken up. */
    size_t activate_pending;
    /* Number of blocked workers sleeping in work func() (ie not idle). */
    size_t blocked_worker_cnt;

    /* Number of pending signal_worker_op() operations. */
    size_t pending_op_cnt;

    link_t nb_link;

#ifdef CONFIG_DEBUG
    /* Magic cookie for integrity checks. Immutable. Accessed without lock. */
    uint32_t cookie;
#endif
};


/** Min number of idle workers to keep. */
static size_t min_worker_cnt;
/** Max total number of workers - be it blocked, idle, or active. */
static size_t max_worker_cnt;
/** Max number of concurrently running active workers, ie not blocked nor idle. */
static size_t max_concurrent_workers;
/** Max number of work items per active worker before a new worker is activated. */
static const size_t max_items_per_worker = 8;

/** System wide work queue. */
static struct work_queue g_work_queue;

static int booting = true;


typedef struct {
    IRQ_SPINLOCK_DECLARE(lock);
    condvar_t req_cv;
    thread_t *thread;
    list_t work_queues;
} nonblock_adder_t;

static nonblock_adder_t nonblock_adder;



/** Typedef a worker thread signaling operation prototype. */
typedef void (*signal_op_t)(struct work_queue *workq);


/* Fwd decl. */
static void workq_preinit(struct work_queue *workq, const char *name);
static bool add_worker(struct work_queue *workq);
static void interrupt_workers(struct work_queue *workq);
static void wait_for_workers(struct work_queue *workq);
static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
    work_func_t func, bool can_block);
static void init_work_item(work_t *work_item, work_func_t func);
static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block);
static void worker_thread(void *arg);
static bool dequeue_work(struct work_queue *workq, work_t **pwork_item);
static bool worker_unnecessary(struct work_queue *workq);
static void cv_wait(struct work_queue *workq);
static void nonblock_init(void);

#ifdef CONFIG_DEBUG
static bool workq_corrupted(struct work_queue *workq);
static bool work_item_corrupted(work_t *work_item);
#endif

/** Creates worker thread for the system-wide worker queue. */
void workq_global_worker_init(void)
{
    /*
     * No need for additional synchronization. Stores to word-sized
     * variables are atomic and the change will eventually propagate.
     * Moreover add_worker() includes the necessary memory barriers
     * in spinlock lock/unlock().
     */
    booting = false;

    nonblock_init();

    if (!add_worker(&g_work_queue))
        panic("Could not create a single global work queue worker!\n");
}

/** Initializes the system wide work queue and support for other work queues. */
void workq_global_init(void)
{
    /* Keep idle workers on 1/4-th of cpus, but at least 2 threads. */
    min_worker_cnt = max(2, config.cpu_count / 4);
    /* Allow max 8 sleeping work items per cpu. */
    max_worker_cnt = max(32, 8 * config.cpu_count);
    /* Maximum concurrency without slowing down the system. */
    max_concurrent_workers = max(2, config.cpu_count);

    workq_preinit(&g_work_queue, "kworkq");
}
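
/*
 * For illustration (derived from the formulas above): on a 4-cpu machine
 * workq_global_init() yields min_worker_cnt = max(2, 4 / 4) = 2,
 * max_worker_cnt = max(32, 8 * 4) = 32 and max_concurrent_workers =
 * max(2, 4) = 4. A single-cpu machine still gets the floor values 2, 32
 * and 2.
 */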

/** Stops the system global work queue and waits for all work items to complete. */
void workq_global_stop(void)
{
    workq_stop(&g_work_queue);
}

/** Creates and initializes a work queue. Returns NULL upon failure. */
struct work_queue *workq_create(const char *name)
{
    struct work_queue *workq = malloc(sizeof(struct work_queue), 0);

    if (workq) {
        if (workq_init(workq, name)) {
            assert(!workq_corrupted(workq));
            return workq;
        }

        free(workq);
    }

    return NULL;
}

/** Frees work queue resources and stops it if that has not been done already. */
void workq_destroy(struct work_queue *workq)
{
    assert(!workq_corrupted(workq));

    irq_spinlock_lock(&workq->lock, true);
    bool stopped = workq->stopping;
#ifdef CONFIG_DEBUG
    size_t running_workers = workq->cur_worker_cnt;
#endif
    irq_spinlock_unlock(&workq->lock, true);

    if (!stopped) {
        workq_stop(workq);
    } else {
        assert(0 == running_workers);
    }

#ifdef CONFIG_DEBUG
    workq->cookie = 0;
#endif

    free(workq);
}

/** Initializes workq structure without creating any workers. */
static void workq_preinit(struct work_queue *workq, const char *name)
{
#ifdef CONFIG_DEBUG
    workq->cookie = WORKQ_MAGIC;
#endif

    irq_spinlock_initialize(&workq->lock, name);
    condvar_initialize(&workq->activate_worker);

    list_initialize(&workq->queue);
    list_initialize(&workq->workers);

    workq->item_cnt = 0;
    workq->stopping = false;
    workq->name = name;

    workq->cur_worker_cnt = 1;
    workq->idle_worker_cnt = 0;
    workq->activate_pending = 0;
    workq->blocked_worker_cnt = 0;

    workq->pending_op_cnt = 0;
    link_initialize(&workq->nb_link);
}

/** Initializes a work queue. Returns true if successful.
 *
 * Before destroying a work queue it must be stopped via
 * workq_stop().
 */
int workq_init(struct work_queue *workq, const char *name)
{
    workq_preinit(workq, name);
    return add_worker(workq);
}

/** Add a new worker thread. Returns false if the thread could not be created. */
static bool add_worker(struct work_queue *workq)
{
    assert(!workq_corrupted(workq));

    thread_t *thread = thread_create(worker_thread, workq, TASK,
        THREAD_FLAG_NONE, workq->name);

    if (!thread) {
        irq_spinlock_lock(&workq->lock, true);

        /* cur_worker_cnt proactively increased in signal_worker_logic(). */
        assert(0 < workq->cur_worker_cnt);
        --workq->cur_worker_cnt;

        irq_spinlock_unlock(&workq->lock, true);
        return false;
    }

    /* Respect lock ordering. */
    irq_spinlock_lock(&thread->lock, true);
    irq_spinlock_lock(&workq->lock, false);

    bool success;

    if (!workq->stopping) {
        success = true;

        /* Try to distribute workers among cpus right away. */
        unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;

        if (!cpus[cpu_id].active)
            cpu_id = CPU->id;

        thread->workq = workq;
        thread->cpu = &cpus[cpu_id];
        thread->workq_blocked = false;
        thread->workq_idling = false;
        link_initialize(&thread->workq_link);

        list_append(&thread->workq_link, &workq->workers);
    } else {
        /*
         * Work queue is shutting down - we must not add the worker
         * and we cannot destroy it without ready-ing it. Mark it
         * interrupted so the worker exits right away without even
         * touching workq.
         */
        success = false;

        /* cur_worker_cnt proactively increased in signal_worker_logic(). */
        assert(0 < workq->cur_worker_cnt);
        --workq->cur_worker_cnt;
    }

    irq_spinlock_unlock(&workq->lock, false);
    irq_spinlock_unlock(&thread->lock, true);

    if (!success) {
        thread_interrupt(thread);
    }

    thread_ready(thread);

    return success;
}

/** Shuts down the work queue. Waits for all pending work items to complete.
 *
 * workq_stop() may only be run once.
 */
void workq_stop(struct work_queue *workq)
{
    assert(!workq_corrupted(workq));

    interrupt_workers(workq);
    wait_for_workers(workq);
}

/** Notifies worker threads the work queue is shutting down. */
static void interrupt_workers(struct work_queue *workq)
{
    irq_spinlock_lock(&workq->lock, true);

    /* workq_stop() may only be called once. */
    assert(!workq->stopping);
    workq->stopping = true;

    /* Respect lock ordering - do not hold workq->lock during broadcast. */
    irq_spinlock_unlock(&workq->lock, true);

    condvar_broadcast(&workq->activate_worker);
}

/** Waits for all worker threads to exit. */
static void wait_for_workers(struct work_queue *workq)
{
    assert(!PREEMPTION_DISABLED);

    irq_spinlock_lock(&workq->lock, true);

    list_foreach_safe(workq->workers, cur_worker, next_worker) {
        thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
        list_remove(cur_worker);

        /* Wait without the lock. */
        irq_spinlock_unlock(&workq->lock, true);

        thread_join(worker);
        thread_detach(worker);

        irq_spinlock_lock(&workq->lock, true);
    }

    assert(list_empty(&workq->workers));

    /* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
    while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
        irq_spinlock_unlock(&workq->lock, true);

        scheduler();

        irq_spinlock_lock(&workq->lock, true);
    }

    irq_spinlock_unlock(&workq->lock, true);
}

/** Queues a function into the global work queue without blocking.
 *
 * See workq_enqueue_noblock() for more details.
 */
int workq_global_enqueue_noblock(work_t *work_item, work_func_t func)
{
    return workq_enqueue_noblock(&g_work_queue, work_item, func);
}

/** Queues a function into the global work queue; may block.
 *
 * See workq_enqueue() for more details.
 */
int workq_global_enqueue(work_t *work_item, work_func_t func)
{
    return workq_enqueue(&g_work_queue, work_item, func);
}

/** Adds a function to be invoked in a separate thread without blocking.
 *
 * workq_enqueue_noblock() is guaranteed not to block. It is safe
 * to invoke from interrupt handlers.
 *
 * Consider using workq_enqueue() instead if at all possible. Otherwise,
 * your work item may have to wait for previously enqueued sleeping
 * work items to complete if you are unlucky.
 *
 * @param workq      Work queue where to queue the work item.
 * @param work_item  Work item bookkeeping structure. Must be valid
 *                   until func() is entered.
 * @param func       User supplied function to invoke in a worker thread.
 *
 * @return false if the work queue is shutting down; the function is not
 *         queued for further processing.
 * @return true otherwise. func() will be invoked in a separate thread.
 */
int workq_enqueue_noblock(struct work_queue *workq, work_t *work_item,
    work_func_t func)
{
    return _workq_enqueue(workq, work_item, func, false);
}

/** Adds a function to be invoked in a separate thread; may block.
 *
 * While workq_enqueue() is unlikely to block, it may do so if too
 * many previous work items blocked sleeping.
 *
 * @param workq      Work queue where to queue the work item.
 * @param work_item  Work item bookkeeping structure. Must be valid
 *                   until func() is entered.
 * @param func       User supplied function to invoke in a worker thread.
 *
 * @return false if the work queue is shutting down; the function is not
 *         queued for further processing.
 * @return true otherwise. func() will be invoked in a separate thread.
 */
int workq_enqueue(struct work_queue *workq, work_t *work_item, work_func_t func)
{
    return _workq_enqueue(workq, work_item, func, true);
}
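
/*
 * Illustrative sketch (timeout_work, timeout_handler() and process_timeout()
 * are hypothetical names, not part of this file): from interrupt context only
 * the noblock variant may be used, and the work_t must not live on the
 * handler's stack because it has to stay valid until func() runs in a worker.
 *
 *     static work_t timeout_work;    // pre-allocated; outlives the handler
 *
 *     static void timeout_handler(void)
 *     {
 *         // Interrupt context: only the noblock variant is allowed here.
 *         if (!workq_global_enqueue_noblock(&timeout_work, process_timeout)) {
 *             // The queue is shutting down; the item was not queued.
 *         }
 *     }
 */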

/** Adds a work item that will be processed by a separate worker thread.
 *
 * func() will be invoked in another kernel thread and may block.
 *
 * Prefer to call _workq_enqueue() with can_block set. Otherwise
 * your work item may have to wait for sleeping work items to complete.
 * If all worker threads are blocked/sleeping, a new worker thread cannot
 * be created without can_block set because creating a thread might
 * block due to low memory conditions.
 *
 * @param workq      Work queue where to queue the work item.
 * @param work_item  Work item bookkeeping structure. Must be valid
 *                   until func() is entered.
 * @param func       User supplied function to invoke in a worker thread.
 * @param can_block  May adding this work item block?
 *
 * @return false if the work queue is shutting down; the function is not
 *         queued for further processing.
 * @return true otherwise.
 */
static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
    work_func_t func, bool can_block)
{
    assert(!workq_corrupted(workq));

    bool success = true;
    signal_op_t signal_op = NULL;

    irq_spinlock_lock(&workq->lock, true);

    if (workq->stopping) {
        success = false;
    } else {
        init_work_item(work_item, func);
        list_append(&work_item->queue_link, &workq->queue);
        ++workq->item_cnt;
        success = true;

        if (!booting) {
            signal_op = signal_worker_logic(workq, can_block);
        } else {
            /*
             * During boot there are no workers to signal. Just queue
             * the work and let future workers take care of it.
             */
        }
    }

    irq_spinlock_unlock(&workq->lock, true);

    if (signal_op) {
        signal_op(workq);
    }

    return success;
}

/** Prepare an item to be added to the work item queue. */
static void init_work_item(work_t *work_item, work_func_t func)
{
#ifdef CONFIG_DEBUG
    work_item->cookie = WORK_ITEM_MAGIC;
#endif

    link_initialize(&work_item->queue_link);
    work_item->func = func;
}

/** Returns the number of workers running work func() that are not blocked. */
static size_t active_workers_now(struct work_queue *workq)
{
    assert(irq_spinlock_locked(&workq->lock));

    /* Workers blocked are sleeping in the work function (ie not idle). */
    assert(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
    /* Idle workers are waiting for more work to arrive in condvar_wait. */
    assert(workq->idle_worker_cnt <= workq->cur_worker_cnt);

    /* Idle + blocked workers == sleeping worker threads. */
    size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;

    assert(sleeping_workers <= workq->cur_worker_cnt);
    /* Workers pending activation are idle workers not yet given a time slice. */
    assert(workq->activate_pending <= workq->idle_worker_cnt);

    /*
     * Workers actively running the work func() this very moment, neither
     * blocked nor idle. Exclude ->activate_pending workers since they
     * will run their work func() only once they get a time slice and
     * are not running it right now.
     */
    return workq->cur_worker_cnt - sleeping_workers;
}

/**
 * Returns the number of workers that are running or are about to run work
 * func() and that are not blocked.
 */
static size_t active_workers(struct work_queue *workq)
{
    assert(irq_spinlock_locked(&workq->lock));

    /*
     * Workers actively running the work func(), neither blocked nor idle.
     * ->activate_pending workers will run their work func() once they
     * get a time slice after waking from a condvar wait, so count them
     * as well.
     */
    return active_workers_now(workq) + workq->activate_pending;
}

static void add_worker_noblock_op(struct work_queue *workq)
{
    condvar_signal(&nonblock_adder.req_cv);
}

static void add_worker_op(struct work_queue *workq)
{
    add_worker(workq);
}

static void signal_worker_op(struct work_queue *workq)
{
    assert(!workq_corrupted(workq));

    condvar_signal(&workq->activate_worker);

    irq_spinlock_lock(&workq->lock, true);
    assert(0 < workq->pending_op_cnt);
    --workq->pending_op_cnt;
    irq_spinlock_unlock(&workq->lock, true);
}

/** Determines how to signal workers if at all.
 *
 * @param workq      Work queue where a new work item was queued.
 * @param can_block  True if we may block while signaling a worker or creating
 *                   a new worker.
 *
 * @return Function that will notify workers or NULL if no action is needed.
 */
static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block)
{
    assert(!workq_corrupted(workq));
    assert(irq_spinlock_locked(&workq->lock));

    /* Only signal workers if really necessary. */
    signal_op_t signal_op = NULL;

    /*
     * Workers actively running the work func() and neither blocked nor idle.
     * Including ->activate_pending workers that will run their work func()
     * once they get a time slice.
     */
    size_t active = active_workers(workq);
    /* Max total allowed number of work items queued for active workers. */
    size_t max_load = active * max_items_per_worker;

    /* Active workers are getting overwhelmed - activate another. */
    if (max_load < workq->item_cnt) {

        size_t remaining_idle =
            workq->idle_worker_cnt - workq->activate_pending;

        /* Idle workers still exist - activate one. */
        if (remaining_idle > 0) {
            /*
             * Directly changing idle_worker_cnt here would not allow
             * workers to recognize spurious wake-ups. Change
             * activate_pending instead.
             */
            ++workq->activate_pending;
            ++workq->pending_op_cnt;
            signal_op = signal_worker_op;
        } else {
            /* No idle workers remain. Request that a new one be created. */
            bool need_worker = (active < max_concurrent_workers)
                && (workq->cur_worker_cnt < max_worker_cnt);

            if (need_worker && can_block) {
                signal_op = add_worker_op;
                /*
                 * It may take some time to actually create the worker.
                 * We don't want to swamp the thread pool with superfluous
                 * worker creation requests so pretend it was already
                 * created and proactively increase the worker count.
                 */
                ++workq->cur_worker_cnt;
            }

            /*
             * We cannot create a new worker but we need one desperately
             * because all workers are blocked in their work functions.
             */
            if (need_worker && !can_block && 0 == active) {
                assert(0 == workq->idle_worker_cnt);

                irq_spinlock_lock(&nonblock_adder.lock, true);

                if (nonblock_adder.thread && !link_used(&workq->nb_link)) {
                    signal_op = add_worker_noblock_op;
                    ++workq->cur_worker_cnt;
                    list_append(&workq->nb_link, &nonblock_adder.work_queues);
                }

                irq_spinlock_unlock(&nonblock_adder.lock, true);
            }
        }
    } else {
        /*
         * There are enough active/running workers to process the queue.
         * No need to signal/activate any new workers.
         */
        signal_op = NULL;
    }

    return signal_op;
}
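
/*
 * Worked example of the threshold above: with max_items_per_worker == 8 and
 * two active workers, max_load == 16, so the 17th queued item makes
 * signal_worker_logic() either wake an idle worker (bumping activate_pending)
 * or, if no idle worker remains and the limits permit, request that a new
 * worker be created.
 */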

/** Executes queued work items. */
static void worker_thread(void *arg)
{
    /*
     * The thread may have been created after the work queue was ordered
     * to stop. In that case do not access the work queue and return
     * immediately.
     */
    if (thread_interrupted(THREAD)) {
        thread_detach(THREAD);
        return;
    }

    assert(arg != NULL);

    struct work_queue *workq = arg;
    work_t *work_item;

    while (dequeue_work(workq, &work_item)) {
        /* Copy the func field so func() can safely free work_item. */
        work_func_t func = work_item->func;

        func(work_item);
    }
}

/** Waits and retrieves a work item. Returns false if the worker should exit. */
static bool dequeue_work(struct work_queue *workq, work_t **pwork_item)
{
    assert(!workq_corrupted(workq));

    irq_spinlock_lock(&workq->lock, true);

    /* Exit if the load is low and this worker is no longer needed. */
    if (!workq->stopping && worker_unnecessary(workq)) {
        /* There are too many workers for this load. Exit. */
        assert(0 < workq->cur_worker_cnt);
        --workq->cur_worker_cnt;
        list_remove(&THREAD->workq_link);
        irq_spinlock_unlock(&workq->lock, true);

        thread_detach(THREAD);
        return false;
    }

    bool stop = false;

    /* Wait for work to arrive. */
    while (list_empty(&workq->queue) && !workq->stopping) {
        cv_wait(workq);

        if (0 < workq->activate_pending)
            --workq->activate_pending;
    }

    /* Process remaining work even if requested to stop. */
    if (!list_empty(&workq->queue)) {
        link_t *work_link = list_first(&workq->queue);
        *pwork_item = list_get_instance(work_link, work_t, queue_link);

#ifdef CONFIG_DEBUG
        assert(!work_item_corrupted(*pwork_item));
        (*pwork_item)->cookie = 0;
#endif
        list_remove(work_link);
        --workq->item_cnt;

        stop = false;
    } else {
        /* Requested to stop and no more work queued. */
        assert(workq->stopping);
        --workq->cur_worker_cnt;
        stop = true;
    }

    irq_spinlock_unlock(&workq->lock, true);

    return !stop;
}

/** Returns true if there are too many workers for the given load. */
static bool worker_unnecessary(struct work_queue *workq)
{
    assert(irq_spinlock_locked(&workq->lock));

    /* No work is pending. We don't need too many idle threads. */
    if (list_empty(&workq->queue)) {
        /* There are too many idle workers. Exit. */
        return (min_worker_cnt <= workq->idle_worker_cnt);
    } else {
        /*
         * There is work but we are swamped with too many active workers
         * that were woken up from sleep at around the same time. We
         * don't need another worker fighting for cpu time.
         */
        size_t active = active_workers_now(workq);
        return (max_concurrent_workers < active);
    }
}

/** Waits for a signal to activate_worker. Thread marked idle while waiting. */
static void cv_wait(struct work_queue *workq)
{
    ++workq->idle_worker_cnt;
    THREAD->workq_idling = true;

    /* Ignore lock ordering just here. */
    assert(irq_spinlock_locked(&workq->lock));

    _condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
        &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);

    assert(!workq_corrupted(workq));
    assert(irq_spinlock_locked(&workq->lock));

    THREAD->workq_idling = false;
    --workq->idle_worker_cnt;
}


/** Invoked from thread_ready() right before the thread is woken up. */
void workq_before_thread_is_ready(thread_t *thread)
{
    assert(thread);
    assert(irq_spinlock_locked(&thread->lock));

    /* Worker's work func() is about to wake up from sleeping. */
    if (thread->workq && thread->workq_blocked) {
        /* Must be blocked in user work func() and not be waiting for work. */
        assert(!thread->workq_idling);
        assert(thread->state == Sleeping);
        assert(THREAD != thread);
        assert(!workq_corrupted(thread->workq));

        /* Protected by thread->lock */
        thread->workq_blocked = false;

        irq_spinlock_lock(&thread->workq->lock, true);
        --thread->workq->blocked_worker_cnt;
        irq_spinlock_unlock(&thread->workq->lock, true);
    }
}

/** Invoked from scheduler() before switching away from a thread. */
void workq_after_thread_ran(void)
{
    assert(THREAD);
    assert(irq_spinlock_locked(&THREAD->lock));

    /* Worker's work func() is about to sleep/block. */
    if (THREAD->workq && THREAD->state == Sleeping && !THREAD->workq_idling) {
        assert(!THREAD->workq_blocked);
        assert(!workq_corrupted(THREAD->workq));

        THREAD->workq_blocked = true;

        irq_spinlock_lock(&THREAD->workq->lock, false);

        ++THREAD->workq->blocked_worker_cnt;

        bool can_block = false;
        signal_op_t op = signal_worker_logic(THREAD->workq, can_block);

        irq_spinlock_unlock(&THREAD->workq->lock, false);

        if (op) {
            assert(add_worker_noblock_op == op || signal_worker_op == op);
            op(THREAD->workq);
        }
    }
}

/** Prints stats of the work queue to the kernel console. */
void workq_print_info(struct work_queue *workq)
{
    irq_spinlock_lock(&workq->lock, true);

    size_t total = workq->cur_worker_cnt;
    size_t blocked = workq->blocked_worker_cnt;
    size_t idle = workq->idle_worker_cnt;
    size_t active = active_workers(workq);
    size_t items = workq->item_cnt;
    bool stopping = workq->stopping;
    bool worker_surplus = worker_unnecessary(workq);
    const char *load_str = worker_surplus ? "decreasing" :
        (0 < workq->activate_pending) ? "increasing" : "stable";

    irq_spinlock_unlock(&workq->lock, true);

    printf(
        "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
        " max_concurrent_workers=%zu, max_items_per_worker=%zu\n"
        "Workers: %zu\n"
        "Active: %zu (workers currently processing work)\n"
        "Blocked: %zu (work functions sleeping/blocked)\n"
        "Idle: %zu (idle workers waiting for more work)\n"
        "Items: %zu (queued not yet dispatched work)\n"
        "Stopping: %d\n"
        "Load: %s\n",
        max_worker_cnt, min_worker_cnt,
        max_concurrent_workers, max_items_per_worker,
        total,
        active,
        blocked,
        idle,
        items,
        stopping,
        load_str
    );
}

/** Prints stats of the global work queue. */
void workq_global_print_info(void)
{
    workq_print_info(&g_work_queue);
}


static bool dequeue_add_req(nonblock_adder_t *info, struct work_queue **pworkq)
{
    bool stop = false;

    irq_spinlock_lock(&info->lock, true);

    while (list_empty(&info->work_queues) && !stop) {
        int ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
            &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);

        stop = (ret == ESYNCH_INTERRUPTED);
    }

    if (!stop) {
        *pworkq = list_get_instance(list_first(&info->work_queues),
            struct work_queue, nb_link);

        assert(!workq_corrupted(*pworkq));

        list_remove(&(*pworkq)->nb_link);
    }

    irq_spinlock_unlock(&info->lock, true);

    return !stop;
}

static void thr_nonblock_add_worker(void *arg)
{
    nonblock_adder_t *info = arg;
    struct work_queue *workq;

    while (dequeue_add_req(info, &workq)) {
        add_worker(workq);
    }
}


static void nonblock_init(void)
{
    irq_spinlock_initialize(&nonblock_adder.lock, "kworkq-nb.lock");
    condvar_initialize(&nonblock_adder.req_cv);
    list_initialize(&nonblock_adder.work_queues);

    nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
        &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");

    if (nonblock_adder.thread) {
        thread_ready(nonblock_adder.thread);
    } else {
        /*
         * We won't be able to add workers without blocking if all workers
         * sleep, but at least boot the system.
         */
        printf("Failed to create kworkq-nb. Sleeping work may stall the workq.\n");
    }
}

#ifdef CONFIG_DEBUG
/** Returns true if the workq is definitely corrupted; false if not sure.
 *
 * Can be used outside of any locks.
 */
static bool workq_corrupted(struct work_queue *workq)
{
    /*
     * Needed to make the most current cookie value set by workq_preinit()
     * visible even if we access the workq right after it is created but
     * on a different cpu. Otherwise, workq_corrupted() would not work
     * outside a lock.
     */
    memory_barrier();
    return NULL == workq || workq->cookie != WORKQ_MAGIC;
}

/** Returns true if the work_item is definitely corrupted; false if not sure.
 *
 * Must be used with the work queue protecting spinlock locked.
 */
static bool work_item_corrupted(work_t *work_item)
{
    return NULL == work_item || work_item->cookie != WORK_ITEM_MAGIC;
}
#endif

/** @}
 */