source: mainline/kernel/generic/src/synch/workqueue.c@ 82719589

Last change on this file since 82719589 was 9f8745c5, checked in by Adam Hraska <adam.hraska+hos@…>, 13 years ago

workq: Moved almost all tests to a single test entry function.

1/*
2 * Copyright (c) 2012 Adam Hraska
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup generic
30 * @{
31 */
32
33/**
34 * @file
35 * @brief Work queue/thread pool that automatically adjusts its size
36 * depending on the current load. Queued work functions may sleep.
37 */
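/*
 * Minimal usage sketch (illustrative only; my_job_t, process_job() and
 * submit_job() are hypothetical names, not part of this file). A queue
 * created with workq_create() must eventually be shut down with
 * workq_stop() or with workq_destroy(), which stops it first if needed.
 *
 * @code
 * typedef struct {
 *	work_t work;      // Must stay valid until process_job() is entered.
 *	int payload;
 * } my_job_t;
 *
 * static void process_job(work_t *work_item)
 * {
 *	// work is the first member, so a plain cast recovers the job.
 *	my_job_t *job = (my_job_t *) work_item;
 *	// ... may block/sleep here ...
 *	free(job);
 * }
 *
 * static void submit_job(struct work_queue *workq, int payload)
 * {
 *	my_job_t *job = malloc(sizeof(my_job_t), 0);
 *	job->payload = payload;
 *
 *	if (!workq_enqueue(workq, &job->work, process_job))
 *		free(job);    // Queue is stopping; the job was not queued.
 * }
 * @endcode
 */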
38
39#include <synch/workqueue.h>
40#include <synch/spinlock.h>
41#include <synch/condvar.h>
42#include <synch/mutex.h>
43#include <proc/thread.h>
44#include <config.h>
45#include <arch.h>
46#include <cpu.h>
47#include <macros.h>
48
49#define WORKQ_MAGIC 0xf00c1333U
50#define WORK_ITEM_MAGIC 0xfeec1777U
51
52
53struct work_queue {
54 /*
55 * Protects everything except activate_worker.
56 * Must be acquired after any thread->locks.
57 */
58 IRQ_SPINLOCK_DECLARE(lock);
59
60 /* Activates a worker if new work arrives or if shutting down the queue. */
61 condvar_t activate_worker;
62
63 /* Queue of work_items ready to be dispatched. */
64 list_t queue;
65
66 /* List of worker threads. */
67 list_t workers;
68
69 /* Number of work items queued. */
70 size_t item_cnt;
71
72 /* Indicates the work queue is shutting down. */
73 bool stopping;
74 const char *name;
75
76 /* Total number of created worker threads. */
77 size_t cur_worker_cnt;
78 /* Number of workers waiting for work to arrive. */
79 size_t idle_worker_cnt;
80 /* Number of idle workers signaled that have not yet been woken up. */
81 size_t activate_pending;
82 /* Number of blocked workers sleeping in work func() (i.e. not idle). */
83 size_t blocked_worker_cnt;
84
85 /* Number of pending signal_worker_op() operations. */
86 size_t pending_op_cnt;
87
88 link_t nb_link;
89
90#ifdef CONFIG_DEBUG
91 /* Magic cookie for integrity checks. Immutable. Accessed without lock. */
92 uint32_t cookie;
93#endif
94};
95
96
97/** Min number of idle workers to keep. */
98static size_t min_worker_cnt;
99/** Max total number of workers - be it blocked, idle, or active. */
100static size_t max_worker_cnt;
101/** Max number of concurrently running active workers, i.e. neither blocked nor idle. */
102static size_t max_concurrent_workers;
103/** Max number of work items per active worker before a new worker is activated. */
104static const size_t max_items_per_worker = 8;
105
106/** System wide work queue. */
107static struct work_queue g_work_queue;
108
109static int booting = true;
110
111
112typedef struct {
113 IRQ_SPINLOCK_DECLARE(lock);
114 condvar_t req_cv;
115 thread_t *thread;
116 list_t work_queues;
117} nonblock_adder_t;
118
119static nonblock_adder_t nonblock_adder;
120
121
122
123/** Typedef a worker thread signaling operation prototype. */
124typedef void (*signal_op_t)(struct work_queue *workq);
125
126
127/* Fwd decl. */
128static void workq_preinit(struct work_queue *workq, const char *name);
129static bool add_worker(struct work_queue *workq);
130static void interrupt_workers(struct work_queue *workq);
131static void wait_for_workers(struct work_queue *workq);
132static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
133 work_func_t func, bool can_block);
134static void init_work_item(work_t *work_item, work_func_t func);
135static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block);
136static void worker_thread(void *arg);
137static bool dequeue_work(struct work_queue *workq, work_t **pwork_item);
138static bool worker_unnecessary(struct work_queue *workq);
139static void cv_wait(struct work_queue *workq);
140static void nonblock_init(void);
141static bool workq_corrupted(struct work_queue *workq);
142static bool work_item_corrupted(work_t *work_item);
143
144
145/** Creates a worker thread for the system-wide work queue. */
146void workq_global_worker_init(void)
147{
148 /*
149 * No need for additional synchronization. Stores to word-sized
150 * variables are atomic and the change will eventually propagate.
151 * Moreover add_worker() includes the necessary memory barriers
152 * in spinlock lock/unlock().
153 */
154 booting = false;
155
156 nonblock_init();
157
158 if (!add_worker(&g_work_queue))
159 panic("Could not create a single global work queue worker!\n");
160
161}
162
163/** Initializes the system wide work queue and support for other work queues. */
164void workq_global_init(void)
165{
166 /* Keep one idle worker per 4 cpus, but at least 2 threads. */
167 min_worker_cnt = max(2, config.cpu_count / 4);
168 /* Allow max 8 sleeping work items per cpu. */
169 max_worker_cnt = max(32, 8 * config.cpu_count);
170 /* Maximum concurrency without slowing down the system. */
171 max_concurrent_workers = max(2, config.cpu_count);
172
173 workq_preinit(&g_work_queue, "kworkq");
174}
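/*
 * Sizing example (illustrative numbers, assuming config.cpu_count == 8):
 * min_worker_cnt = max(2, 8 / 4) = 2 idle workers are kept around,
 * max_worker_cnt = max(32, 8 * 8) = 64 workers may exist in total and
 * max_concurrent_workers = max(2, 8) = 8 workers may run work func()
 * concurrently.
 */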
175
176/** Stops the system-wide work queue and waits for all queued work items to complete. */
177void workq_global_stop(void)
178{
179 workq_stop(&g_work_queue);
180}
181
182/** Creates and initializes a work queue. Returns NULL upon failure. */
183struct work_queue * workq_create(const char *name)
184{
185 struct work_queue *workq = malloc(sizeof(struct work_queue), 0);
186
187 if (workq) {
188 if (workq_init(workq, name)) {
189 ASSERT(!workq_corrupted(workq));
190 return workq;
191 }
192
193 free(workq);
194 }
195
196 return NULL;
197}
198
199/** Frees work queue resources, stopping the queue first if that has not been done already. */
200void workq_destroy(struct work_queue *workq)
201{
202 ASSERT(!workq_corrupted(workq));
203
204 irq_spinlock_lock(&workq->lock, true);
205 bool stopped = workq->stopping;
206 size_t running_workers = workq->cur_worker_cnt;
207 irq_spinlock_unlock(&workq->lock, true);
208
209 if (!stopped) {
210 workq_stop(workq);
211 } else {
212 ASSERT(0 == running_workers);
213 }
214
215#ifdef CONFIG_DEBUG
216 workq->cookie = 0;
217#endif
218
219 free(workq);
220}
221
222/** Initializes workq structure without creating any workers. */
223static void workq_preinit(struct work_queue *workq, const char *name)
224{
225#ifdef CONFIG_DEBUG
226 workq->cookie = WORKQ_MAGIC;
227#endif
228
229 irq_spinlock_initialize(&workq->lock, name);
230 condvar_initialize(&workq->activate_worker);
231
232 list_initialize(&workq->queue);
233 list_initialize(&workq->workers);
234
235 workq->item_cnt = 0;
236 workq->stopping = false;
237 workq->name = name;
238
239 workq->cur_worker_cnt = 1;
240 workq->idle_worker_cnt = 0;
241 workq->activate_pending = 0;
242 workq->blocked_worker_cnt = 0;
243
244 workq->pending_op_cnt = 0;
245 link_initialize(&workq->nb_link);
246}
247
248/** Initializes a work queue. Returns true if successful.
249 *
250 * Before destroying a work queue it must be stopped via
251 * workq_stop().
252 */
253int workq_init(struct work_queue *workq, const char *name)
254{
255 workq_preinit(workq, name);
256 return add_worker(workq);
257}
258
259/** Add a new worker thread. Returns false if the thread could not be created. */
260static bool add_worker(struct work_queue *workq)
261{
262 ASSERT(!workq_corrupted(workq));
263
264 thread_t *thread = thread_create(worker_thread, workq, TASK,
265 THREAD_FLAG_NONE, workq->name);
266
267 if (!thread) {
268 irq_spinlock_lock(&workq->lock, true);
269
270 /* cur_worker_cnt proactively increased in signal_worker_logic(). */
271 ASSERT(0 < workq->cur_worker_cnt);
272 --workq->cur_worker_cnt;
273
274 irq_spinlock_unlock(&workq->lock, true);
275 return false;
276 }
277
278 /* Respect lock ordering. */
279 irq_spinlock_lock(&thread->lock, true);
280 irq_spinlock_lock(&workq->lock, false);
281
282 bool success;
283
284 if (!workq->stopping) {
285 success = true;
286
287 /* Try to distribute workers among cpus right away. */
288 unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;
289
290 if (!cpus[cpu_id].active)
291 cpu_id = CPU->id;
292
293 thread->workq = workq;
294 thread->cpu = &cpus[cpu_id];
295 thread->workq_blocked = false;
296 thread->workq_idling = false;
297 link_initialize(&thread->workq_link);
298
299 list_append(&thread->workq_link, &workq->workers);
300 } else {
301 /*
302 * Work queue is shutting down - we must not add the worker
303 * and we cannot destroy it without ready-ing it. Mark it
304 * interrupted so the worker exits right away without even
305 * touching workq.
306 */
307 success = false;
308
309 /* cur_worker_cnt proactively increased in signal_worker_logic(). */
310 ASSERT(0 < workq->cur_worker_cnt);
311 --workq->cur_worker_cnt;
312 }
313
314 irq_spinlock_unlock(&workq->lock, false);
315 irq_spinlock_unlock(&thread->lock, true);
316
317 if (!success) {
318 thread_interrupt(thread);
319 }
320
321 thread_ready(thread);
322
323 return success;
324}
325
326/** Shuts down the work queue. Waits for all pending work items to complete.
327 *
328 * workq_stop() may only be run once.
329 */
330void workq_stop(struct work_queue *workq)
331{
332 ASSERT(!workq_corrupted(workq));
333
334 interrupt_workers(workq);
335 wait_for_workers(workq);
336}
337
338/** Notifies worker threads the work queue is shutting down. */
339static void interrupt_workers(struct work_queue *workq)
340{
341 irq_spinlock_lock(&workq->lock, true);
342
343 /* workq_stop() may only be called once. */
344 ASSERT(!workq->stopping);
345 workq->stopping = true;
346
347 /* Respect lock ordering - do not hold workq->lock during broadcast. */
348 irq_spinlock_unlock(&workq->lock, true);
349
350 condvar_broadcast(&workq->activate_worker);
351}
352
353/** Waits for all worker threads to exit. */
354static void wait_for_workers(struct work_queue *workq)
355{
356 ASSERT(!PREEMPTION_DISABLED);
357
358 irq_spinlock_lock(&workq->lock, true);
359
360 list_foreach_safe(workq->workers, cur_worker, next_worker) {
361 thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
362 list_remove(cur_worker);
363
364 /* Wait without the lock. */
365 irq_spinlock_unlock(&workq->lock, true);
366
367 thread_join(worker);
368 thread_detach(worker);
369
370 irq_spinlock_lock(&workq->lock, true);
371 }
372
373 ASSERT(list_empty(&workq->workers));
374
375 /* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
376 while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
377 irq_spinlock_unlock(&workq->lock, true);
378
379 scheduler();
380
381 irq_spinlock_lock(&workq->lock, true);
382 }
383
384 irq_spinlock_unlock(&workq->lock, true);
385}
386
387/** Queues a function into the global work queue without blocking.
388 *
389 * See workq_enqueue_noblock() for more details.
390 */
391int workq_global_enqueue_noblock(work_t *work_item, work_func_t func)
392{
393 return workq_enqueue_noblock(&g_work_queue, work_item, func);
394}
395
396/** Queues a function into the global work queue; may block.
397 *
398 * See workq_enqueue() for more details.
399 */
400int workq_global_enqueue(work_t *work_item, work_func_t func)
401{
402 return workq_enqueue(&g_work_queue, work_item, func);
403}
404
405/** Adds a function to be invoked in a separate thread without blocking.
406 *
407 * workq_enqueue_noblock() is guaranteed not to block. It is safe
408 * to invoke from interrupt handlers.
409 *
410 * Consider using workq_enqueue() instead if at all possible. Otherwise,
411 * your work item may have to wait for previously enqueued sleeping
412 * work items to complete if you are unlucky.
413 *
414 * @param workq Work queue where to queue the work item.
415 * @param work_item Work item bookkeeping structure. Must be valid
416 * until func() is entered.
417 * @param func User supplied function to invoke in a worker thread.
418 *
419 * @return false if work queue is shutting down; function is not
420 * queued for further processing.
421 * @return true Otherwise. func() will be invoked in a separate thread.
422 */
423int workq_enqueue_noblock(struct work_queue *workq, work_t *work_item,
424 work_func_t func)
425{
426 return _workq_enqueue(workq, work_item, func, false);
427}
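/*
 * Sketch of use from an interrupt handler (illustrative; my_dev and
 * my_bottom_half() are hypothetical and not part of this file). The work
 * item is embedded in a statically allocated structure so no allocation
 * is needed in interrupt context.
 *
 * @code
 * static struct {
 *	work_t deferred_work;
 *	int pending_events;
 * } my_dev;
 *
 * static void my_bottom_half(work_t *work_item)
 * {
 *	// Runs later in a worker thread and may sleep.
 * }
 *
 * static void my_irq_handler(void)
 * {
 *	++my_dev.pending_events;
 *	// Never blocks; returns false only if the queue is stopping.
 *	workq_global_enqueue_noblock(&my_dev.deferred_work, my_bottom_half);
 * }
 * @endcode
 */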
428
429/** Adds a function to be invoked in a separate thread; may block.
430 *
431 * While workq_enqueue() is unlikely to block, it may do so if too
432 * many previously enqueued work items are blocked sleeping.
433 *
434 * @param workq Work queue where to queue the work item.
435 * @param work_item Work item bookkeeping structure. Must be valid
436 * until func() is entered.
437 * @param func User supplied function to invoke in a worker thread.
438 *
439 * @return false if work queue is shutting down; function is not
440 * queued for further processing.
441 * @return true Otherwise. func() will be invoked in a separate thread.
442 */
443int workq_enqueue(struct work_queue *workq, work_t *work_item, work_func_t func)
444{
445 return _workq_enqueue(workq, work_item, func, true);
446}
447
448/** Adds a work item that will be processed by a separate worker thread.
449 *
450 * func() will be invoked in another kernel thread and may block.
451 *
452 * Prefer to call _workq_enqueue() with can_block set. Otherwise
453 * your work item may have to wait for sleeping work items to complete.
454 * If all worker threads are blocked/sleeping, a new worker thread cannot
455 * be created without can_block set because creating a thread might
456 * block due to low memory conditions.
457 *
458 * @param workq Work queue where to queue the work item.
459 * @param work_item Work item bookkeeping structure. Must be valid
460 * until func() is entered.
461 * @param func User supplied function to invoke in a worker thread.
462 * @param can_block May adding this work item block?
463 *
464 * @return false if work queue is shutting down; function is not
465 * queued for further processing.
466 * @return true Otherwise.
467 */
468static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
469 work_func_t func, bool can_block)
470{
471 ASSERT(!workq_corrupted(workq));
472
473 bool success = true;
474 signal_op_t signal_op = NULL;
475
476 irq_spinlock_lock(&workq->lock, true);
477
478 if (workq->stopping) {
479 success = false;
480 } else {
481 init_work_item(work_item, func);
482 list_append(&work_item->queue_link, &workq->queue);
483 ++workq->item_cnt;
484 success = true;
485
486 if (!booting) {
487 signal_op = signal_worker_logic(workq, can_block);
488 } else {
489 /*
490 * During boot there are no workers to signal. Just queue
491 * the work and let future workers take care of it.
492 */
493 }
494 }
495
496 irq_spinlock_unlock(&workq->lock, true);
497
498 if (signal_op) {
499 signal_op(workq);
500 }
501
502 return success;
503}
504
505/** Prepare an item to be added to the work item queue. */
506static void init_work_item(work_t *work_item, work_func_t func)
507{
508#ifdef CONFIG_DEBUG
509 work_item->cookie = WORK_ITEM_MAGIC;
510#endif
511
512 link_initialize(&work_item->queue_link);
513 work_item->func = func;
514}
515
516/** Returns the number of workers running work func() that are not blocked. */
517static size_t active_workers_now(struct work_queue *workq)
518{
519 ASSERT(irq_spinlock_locked(&workq->lock));
520
521 /* Blocked workers are sleeping in the work function (i.e. not idle). */
522 ASSERT(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
523 /* Idle workers are waiting for more work to arrive in condvar_wait. */
524 ASSERT(workq->idle_worker_cnt <= workq->cur_worker_cnt);
525
526 /* Idle + blocked workers == sleeping worker threads. */
527 size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;
528
529 ASSERT(sleeping_workers <= workq->cur_worker_cnt);
530 /* Workers pending activation are idle workers not yet given a time slice. */
531 ASSERT(workq->activate_pending <= workq->idle_worker_cnt);
532
533 /*
534 * Return the number of workers running the work func() this very
535 * moment, i.e. those neither blocked nor idle. Exclude
536 * ->activate_pending workers since they will run their work func()
537 * only once they get a time slice; they are not running it right now.
538 */
539 return workq->cur_worker_cnt - sleeping_workers;
540}
541
542/**
543 * Returns the number of workers that are running or are about to run work
544 * func() and that are not blocked.
545 */
546static size_t active_workers(struct work_queue *workq)
547{
548 ASSERT(irq_spinlock_locked(&workq->lock));
549
550 /*
551 * Count workers actively running the work func(), i.e. those neither
552 * blocked nor idle. ->activate_pending workers will run their work
553 * func() once they get a time slice after waking from a condvar wait,
554 * so count them as well.
555 */
556 return active_workers_now(workq) + workq->activate_pending;
557}
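/*
 * Accounting example (illustrative numbers): with cur_worker_cnt == 10,
 * blocked_worker_cnt == 3, idle_worker_cnt == 4 and activate_pending == 1,
 * active_workers_now() returns 10 - (3 + 4) == 3 workers running their
 * work func() right now, while active_workers() returns 3 + 1 == 4,
 * also counting the already signaled but not yet scheduled worker.
 */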
558
559static void add_worker_noblock_op(struct work_queue *workq)
560{
561 condvar_signal(&nonblock_adder.req_cv);
562}
563
564static void add_worker_op(struct work_queue *workq)
565{
566 add_worker(workq);
567}
568
569static void signal_worker_op(struct work_queue *workq)
570{
571 ASSERT(!workq_corrupted(workq));
572
573 condvar_signal(&workq->activate_worker);
574
575 irq_spinlock_lock(&workq->lock, true);
576 ASSERT(0 < workq->pending_op_cnt);
577 --workq->pending_op_cnt;
578 irq_spinlock_unlock(&workq->lock, true);
579}
580
581/** Determines how to signal workers if at all.
582 *
583 * @param workq Work queue where a new work item was queued.
584 * @param can_block True if we may block while signaling a worker or creating
585 * a new worker.
586 *
587 * @return Function that will notify workers or NULL if no action is needed.
588 */
589static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block)
590{
591 ASSERT(!workq_corrupted(workq));
592 ASSERT(irq_spinlock_locked(&workq->lock));
593
594 /* Only signal workers if really necessary. */
595 signal_op_t signal_op = NULL;
596
597 /*
598 * Workers actively running the work func(), i.e. neither blocked nor
599 * idle, including ->activate_pending workers that will run their work
600 * func() once they get a time slice.
601 */
602 size_t active = active_workers(workq);
603 /* Max total allowed number of work items queued for active workers. */
604 size_t max_load = active * max_items_per_worker;
605
606 /* Active workers are getting overwhelmed - activate another. */
607 if (max_load < workq->item_cnt) {
608
609 size_t remaining_idle =
610 workq->idle_worker_cnt - workq->activate_pending;
611
612 /* Idle workers still exist - activate one. */
613 if (remaining_idle > 0) {
614 /*
615 * Directly changing idle_worker_cnt here would not allow
616 * workers to recognize spurious wake-ups. Change
617 * activate_pending instead.
618 */
619 ++workq->activate_pending;
620 ++workq->pending_op_cnt;
621 signal_op = signal_worker_op;
622 } else {
623 /* No idle workers remain. Request that a new one be created. */
624 bool need_worker = (active < max_concurrent_workers)
625 && (workq->cur_worker_cnt < max_worker_cnt);
626
627 if (need_worker && can_block) {
628 signal_op = add_worker_op;
629 /*
630 * It may take some time to actually create the worker.
631 * We don't want to swamp the thread pool with superfluous
632 * worker creation requests so pretend it was already
633 * created and proactively increase the worker count.
634 */
635 ++workq->cur_worker_cnt;
636 }
637
638 /*
639 * We cannot create a new worker but we need one desperately
640 * because all workers are blocked in their work functions.
641 */
642 if (need_worker && !can_block && 0 == active) {
643 ASSERT(0 == workq->idle_worker_cnt);
644
645 irq_spinlock_lock(&nonblock_adder.lock, true);
646
647 if (nonblock_adder.thread && !link_used(&workq->nb_link)) {
648 signal_op = add_worker_noblock_op;
649 ++workq->cur_worker_cnt;
650 list_append(&workq->nb_link, &nonblock_adder.work_queues);
651 }
652
653 irq_spinlock_unlock(&nonblock_adder.lock, true);
654 }
655 }
656 } else {
657 /*
658 * There are enough active/running workers to process the queue.
659 * No need to signal/activate any new workers.
660 */
661 signal_op = NULL;
662 }
663
664 return signal_op;
665}
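/*
 * Decision example (illustrative numbers): with active == 3 workers and
 * max_items_per_worker == 8, up to max_load == 24 queued items are
 * tolerated. Queueing a 25th item trips the max_load < item_cnt test, so
 * either an idle worker is signaled (signal_worker_op) or, if none remains
 * and the limits allow it, a new worker is requested via add_worker_op or
 * add_worker_noblock_op.
 */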
666
667/** Executes queued work items. */
668static void worker_thread(void *arg)
669{
670 /*
671 * The thread has been created after the work queue was ordered to stop.
672 * Do not access the work queue and return immediately.
673 */
674 if (thread_interrupted(THREAD)) {
675 thread_detach(THREAD);
676 return;
677 }
678
679 ASSERT(arg != NULL);
680
681 struct work_queue *workq = arg;
682 work_t *work_item;
683
684 while (dequeue_work(workq, &work_item)) {
685 /* Copy the func field so func() can safely free work_item. */
686 work_func_t func = work_item->func;
687
688 func(work_item);
689 }
690}
691
692/** Waits and retrieves a work item. Returns false if the worker should exit. */
693static bool dequeue_work(struct work_queue *workq, work_t **pwork_item)
694{
695 ASSERT(!workq_corrupted(workq));
696
697 irq_spinlock_lock(&workq->lock, true);
698
699 /* Check if we should exit if load is low. */
700 if (!workq->stopping && worker_unnecessary(workq)) {
701 /* There are too many workers for this load. Exit. */
702 ASSERT(0 < workq->cur_worker_cnt);
703 --workq->cur_worker_cnt;
704 list_remove(&THREAD->workq_link);
705 irq_spinlock_unlock(&workq->lock, true);
706
707 thread_detach(THREAD);
708 return false;
709 }
710
711 bool stop = false;
712
713 /* Wait for work to arrive. */
714 while (list_empty(&workq->queue) && !workq->stopping) {
715 cv_wait(workq);
716
717 if (0 < workq->activate_pending)
718 --workq->activate_pending;
719 }
720
721 /* Process remaining work even if requested to stop. */
722 if (!list_empty(&workq->queue)) {
723 link_t *work_link = list_first(&workq->queue);
724 *pwork_item = list_get_instance(work_link, work_t, queue_link);
725
726#ifdef CONFIG_DEBUG
727 ASSERT(!work_item_corrupted(*pwork_item));
728 (*pwork_item)->cookie = 0;
729#endif
730 list_remove(work_link);
731 --workq->item_cnt;
732
733 stop = false;
734 } else {
735 /* Requested to stop and no more work queued. */
736 ASSERT(workq->stopping);
737 --workq->cur_worker_cnt;
738 stop = true;
739 }
740
741 irq_spinlock_unlock(&workq->lock, true);
742
743 return !stop;
744}
745
746/** Returns true if there are too many workers for the given load. */
747static bool worker_unnecessary(struct work_queue *workq)
748{
749 ASSERT(irq_spinlock_locked(&workq->lock));
750
751 /* No work is pending. We don't need too many idle threads. */
752 if (list_empty(&workq->queue)) {
753 /* There are too many idle workers. Exit. */
754 return (min_worker_cnt <= workq->idle_worker_cnt);
755 } else {
756 /*
757 * There is work but we are swamped with too many active workers
758 * that were woken up from sleep at around the same time. We
759 * don't need another worker fighting for cpu time.
760 */
761 size_t active = active_workers_now(workq);
762 return (max_concurrent_workers < active);
763 }
764}
765
766/** Waits for a signal on activate_worker. The thread is marked idle while waiting. */
767static void cv_wait(struct work_queue *workq)
768{
769 ++workq->idle_worker_cnt;
770 THREAD->workq_idling = true;
771
772 /* Ignore lock ordering just here. */
773 ASSERT(irq_spinlock_locked(&workq->lock));
774
775 _condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
776 &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);
777
778 ASSERT(!workq_corrupted(workq));
779 ASSERT(irq_spinlock_locked(&workq->lock));
780
781 THREAD->workq_idling = false;
782 --workq->idle_worker_cnt;
783}
784
785
786/** Invoked from thread_ready() right before the thread is woken up. */
787void workq_before_thread_is_ready(thread_t *thread)
788{
789 ASSERT(thread);
790 ASSERT(irq_spinlock_locked(&thread->lock));
791
792 /* Worker's work func() is about to wake up from sleeping. */
793 if (thread->workq && thread->workq_blocked) {
794 /* Must be blocked in user work func() and not be waiting for work. */
795 ASSERT(!thread->workq_idling);
796 ASSERT(thread->state == Sleeping);
797 ASSERT(THREAD != thread);
798 ASSERT(!workq_corrupted(thread->workq));
799
800 /* Protected by thread->lock */
801 thread->workq_blocked = false;
802
803 irq_spinlock_lock(&thread->workq->lock, true);
804 --thread->workq->blocked_worker_cnt;
805 irq_spinlock_unlock(&thread->workq->lock, true);
806 }
807}
808
809/** Invoked from scheduler() before switching away from a thread. */
810void workq_after_thread_ran(void)
811{
812 ASSERT(THREAD);
813 ASSERT(irq_spinlock_locked(&THREAD->lock));
814
815 /* Worker's work func() is about to sleep/block. */
816 if (THREAD->workq && THREAD->state == Sleeping && !THREAD->workq_idling) {
817 ASSERT(!THREAD->workq_blocked);
818 ASSERT(!workq_corrupted(THREAD->workq));
819
820 THREAD->workq_blocked = true;
821
822 irq_spinlock_lock(&THREAD->workq->lock, false);
823
824 ++THREAD->workq->blocked_worker_cnt;
825
826 bool can_block = false;
827 signal_op_t op = signal_worker_logic(THREAD->workq, can_block);
828
829 irq_spinlock_unlock(&THREAD->workq->lock, false);
830
831 if (op) {
832 ASSERT(add_worker_noblock_op == op || signal_worker_op == op);
833 op(THREAD->workq);
834 }
835 }
836}
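/*
 * Scenario (illustrative): a work func() calls a sleeping primitive such as
 * mutex_lock(). On the way to sleep, scheduler() invokes
 * workq_after_thread_ran(), which marks the worker blocked, increments
 * blocked_worker_cnt and - since can_block is false here - may at most
 * signal an idle worker or hand worker creation over to the kworkq-nb
 * thread, so the queue keeps being drained.
 */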
837
838/** Prints stats of the work queue to the kernel console. */
839void workq_print_info(struct work_queue *workq)
840{
841 irq_spinlock_lock(&workq->lock, true);
842
843 size_t total = workq->cur_worker_cnt;
844 size_t blocked = workq->blocked_worker_cnt;
845 size_t idle = workq->idle_worker_cnt;
846 size_t active = active_workers(workq);
847 size_t items = workq->item_cnt;
848 bool stopping = workq->stopping;
849 bool worker_surplus = worker_unnecessary(workq);
850 const char *load_str = worker_surplus ? "decreasing" :
851 (0 < workq->activate_pending) ? "increasing" : "stable";
852
853 irq_spinlock_unlock(&workq->lock, true);
854
855 printf(
856 "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
857 " max_concurrent_workers=%zu, max_items_per_worker=%zu\n"
858 "Workers: %zu\n"
859 "Active: %zu (workers currently processing work)\n"
860 "Blocked: %zu (work functions sleeping/blocked)\n"
861 "Idle: %zu (idle workers waiting for more work)\n"
862 "Items: %zu (queued not yet dispatched work)\n"
863 "Stopping: %d\n"
864 "Load: %s\n",
865 max_worker_cnt, min_worker_cnt,
866 max_concurrent_workers, max_items_per_worker,
867 total,
868 active,
869 blocked,
870 idle,
871 items,
872 stopping,
873 load_str
874 );
875}
876
877/** Prints stats of the global work queue. */
878void workq_global_print_info(void)
879{
880 workq_print_info(&g_work_queue);
881}
882
883
884static bool dequeue_add_req(nonblock_adder_t *info, struct work_queue **pworkq)
885{
886 bool stop = false;
887
888 irq_spinlock_lock(&info->lock, true);
889
890 while (list_empty(&info->work_queues) && !stop) {
891 int ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
892 &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
893
894 stop = (ret == ESYNCH_INTERRUPTED);
895 }
896
897 if (!stop) {
898 *pworkq = list_get_instance(list_first(&info->work_queues),
899 struct work_queue, nb_link);
900
901 ASSERT(!workq_corrupted(*pworkq));
902
903 list_remove(&(*pworkq)->nb_link);
904 }
905
906 irq_spinlock_unlock(&info->lock, true);
907
908 return !stop;
909}
910
911static void thr_nonblock_add_worker(void *arg)
912{
913 nonblock_adder_t *info = arg;
914 struct work_queue *workq;
915
916 while (dequeue_add_req(info, &workq)) {
917 add_worker(workq);
918 }
919}
920
921
922static void nonblock_init(void)
923{
924 irq_spinlock_initialize(&nonblock_adder.lock, "kworkq-nb.lock");
925 condvar_initialize(&nonblock_adder.req_cv);
926 list_initialize(&nonblock_adder.work_queues);
927
928 nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
929 &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");
930
931 if (nonblock_adder.thread) {
932 thread_ready(nonblock_adder.thread);
933 } else {
934 /*
935 * We won't be able to add workers without blocking if all workers
936 * are asleep, but at least the system will boot.
937 */
938 printf("Failed to create kworkq-nb. Sleeping work may stall the workq.\n");
939 }
940}
941
942/** Returns true if the workq is definitely corrupted; false if not sure.
943 *
944 * Can be used outside of any locks.
945 */
946static bool workq_corrupted(struct work_queue *workq)
947{
948#ifdef CONFIG_DEBUG
949 /*
950 * Needed to make the most current cookie value set by workq_preinit()
951 * visible even if we access the workq right after it is created but
952 * on a different cpu. Otherwise, workq_corrupted() would not work
953 * outside a lock.
954 */
955 memory_barrier();
956 return NULL == workq || workq->cookie != WORKQ_MAGIC;
957#else
958 return false;
959#endif
960}
961
962/** Returns true if the work_item is definitely corrupted; false if not sure.
963 *
964 * Must be used with the work queue protecting spinlock locked.
965 */
966static bool work_item_corrupted(work_t *work_item)
967{
968#ifdef CONFIG_DEBUG
969 return NULL == work_item || work_item->cookie != WORK_ITEM_MAGIC;
970#else
971 return false;
972#endif
973}
974
975/** @}
976 */