source: mainline/kernel/generic/src/synch/workqueue.c@ 022d72ff

Last change on this file since 022d72ff was 04552324, checked in by Adam Hraska <adam.hraska+hos@…>, 13 years ago

cht, workq: Got rid of warnings in release builds.

1/*
2 * Copyright (c) 2012 Adam Hraska
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup generic
30 * @{
31 */
32
33/**
34 * @file
35 * @brief Work queue/thread pool that automatically adjusts its size
36 * depending on the current load. Queued work functions may sleep.
37 */
38
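/*
 * Illustrative usage sketch (editorial addition, not part of the original
 * file). It shows the intended pattern of embedding a work_t as the first
 * member of a caller-owned structure, queueing it on the global work queue
 * and releasing it from the work function. The names example_job_t,
 * example_process() and example_submit() are hypothetical.
 *
 * @code
 * typedef struct {
 *     work_t work;       // First member, so the work_t pointer casts back.
 *     int payload;
 * } example_job_t;
 *
 * static void example_job_func(work_t *work_item)
 * {
 *     example_job_t *job = (example_job_t *) work_item;
 *     example_process(job->payload);  // May sleep; runs in a worker thread.
 *     free(job);
 * }
 *
 * static void example_submit(int payload)
 * {
 *     example_job_t *job = malloc(sizeof(example_job_t), 0);
 *     if (!job)
 *         return;
 *
 *     job->payload = payload;
 *
 *     if (!workq_global_enqueue(&job->work, example_job_func))
 *         free(job);     // The queue is stopping; the item was not queued.
 * }
 * @endcode
 */
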
39#include <synch/workqueue.h>
40#include <synch/spinlock.h>
41#include <synch/condvar.h>
42#include <synch/mutex.h>
43#include <proc/thread.h>
44#include <config.h>
45#include <arch.h>
46#include <cpu.h>
47#include <macros.h>
48
49#define WORKQ_MAGIC 0xf00c1333U
50#define WORK_ITEM_MAGIC 0xfeec1777U
51
52
53struct work_queue {
54 /*
55 * Protects everything except activate_worker.
56 * Must be acquired after any thread's lock (thread->lock).
57 */
58 IRQ_SPINLOCK_DECLARE(lock);
59
60 /* Activates a worker if new work arrives or if shutting down the queue. */
61 condvar_t activate_worker;
62
63 /* Queue of work_items ready to be dispatched. */
64 list_t queue;
65
66 /* List of worker threads. */
67 list_t workers;
68
69 /* Number of work items queued. */
70 size_t item_cnt;
71
72 /* Indicates the work queue is shutting down. */
73 bool stopping;
74 const char *name;
75
76 /* Total number of created worker threads. */
77 size_t cur_worker_cnt;
78 /* Number of workers waiting for work to arrive. */
79 size_t idle_worker_cnt;
80 /* Number of idle workers signaled that have not yet been woken up. */
81 size_t activate_pending;
82 /* Number of blocked workers sleeping in work func() (i.e., not idle). */
83 size_t blocked_worker_cnt;
84
85 /* Number of pending signal_worker_op() operations. */
86 size_t pending_op_cnt;
87
88 link_t nb_link;
89
90#ifdef CONFIG_DEBUG
91 /* Magic cookie for integrity checks. Immutable. Accessed without lock. */
92 uint32_t cookie;
93#endif
94};
95
96
97/** Min number of idle workers to keep. */
98static size_t min_worker_cnt;
99/** Max total number of workers, whether blocked, idle, or active. */
100static size_t max_worker_cnt;
101/** Max number of concurrently running active workers, i.e., neither blocked nor idle. */
102static size_t max_concurrent_workers;
103/** Max number of work items per active worker before a new worker is activated. */
104static const size_t max_items_per_worker = 8;
105
106/** System wide work queue. */
107static struct work_queue g_work_queue;
108
109static int booting = true;
110
111
112typedef struct {
113 IRQ_SPINLOCK_DECLARE(lock);
114 condvar_t req_cv;
115 thread_t *thread;
116 list_t work_queues;
117} nonblock_adder_t;
118
119static nonblock_adder_t nonblock_adder;
120
121
122
123/** Typedef of a worker thread signaling operation. */
124typedef void (*signal_op_t)(struct work_queue *workq);
125
126
127/* Fwd decl. */
128static void workq_preinit(struct work_queue *workq, const char *name);
129static bool add_worker(struct work_queue *workq);
130static void interrupt_workers(struct work_queue *workq);
131static void wait_for_workers(struct work_queue *workq);
132static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
133 work_func_t func, bool can_block);
134static void init_work_item(work_t *work_item, work_func_t func);
135static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block);
136static void worker_thread(void *arg);
137static bool dequeue_work(struct work_queue *workq, work_t **pwork_item);
138static bool worker_unnecessary(struct work_queue *workq);
139static void cv_wait(struct work_queue *workq);
140static void nonblock_init(void);
141
142#ifdef CONFIG_DEBUG
143static bool workq_corrupted(struct work_queue *workq);
144static bool work_item_corrupted(work_t *work_item);
145#endif
146
147/** Creates a worker thread for the system-wide work queue. */
148void workq_global_worker_init(void)
149{
150 /*
151 * No need for additional synchronization. Stores to word-sized
152 * variables are atomic and the change will eventually propagate.
153 * Moreover add_worker() includes the necessary memory barriers
154 * in spinlock lock/unlock().
155 */
156 booting = false;
157
158 nonblock_init();
159
160 if (!add_worker(&g_work_queue))
161 panic("Could not create a single global work queue worker!\n");
162
163}
164
165/** Initializes the system wide work queue and support for other work queues. */
166void workq_global_init(void)
167{
168 /* Keep idle workers on 1/4 of the cpus, but at least 2 threads. */
169 min_worker_cnt = max(2, config.cpu_count / 4);
170 /* Allow max 8 sleeping work items per cpu. */
171 max_worker_cnt = max(32, 8 * config.cpu_count);
172 /* Maximum concurrency without slowing down the system. */
173 max_concurrent_workers = max(2, config.cpu_count);
174
175 workq_preinit(&g_work_queue, "kworkq");
176}
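
/*
 * Worked example of the sizing above (editorial note): with
 * config.cpu_count == 8 the limits come out as min_worker_cnt
 * == max(2, 8 / 4) == 2, max_worker_cnt == max(32, 8 * 8) == 64 and
 * max_concurrent_workers == max(2, 8) == 8. A uniprocessor still gets
 * at least 2 idle workers, at most 32 workers in total and at most 2
 * concurrently running ones.
 */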
177
178/** Stops the global system work queue and waits for all work items to complete. */
179void workq_global_stop(void)
180{
181 workq_stop(&g_work_queue);
182}
183
184/** Creates and initializes a work queue. Returns NULL upon failure. */
185struct work_queue * workq_create(const char *name)
186{
187 struct work_queue *workq = malloc(sizeof(struct work_queue), 0);
188
189 if (workq) {
190 if (workq_init(workq, name)) {
191 ASSERT(!workq_corrupted(workq));
192 return workq;
193 }
194
195 free(workq);
196 }
197
198 return NULL;
199}
200
201/** Frees work queue resources and stops the queue if that has not been done already. */
202void workq_destroy(struct work_queue *workq)
203{
204 ASSERT(!workq_corrupted(workq));
205
206 irq_spinlock_lock(&workq->lock, true);
207 bool stopped = workq->stopping;
208#ifdef CONFIG_DEBUG
209 size_t running_workers = workq->cur_worker_cnt;
210#endif
211 irq_spinlock_unlock(&workq->lock, true);
212
213 if (!stopped) {
214 workq_stop(workq);
215 } else {
216 ASSERT(0 == running_workers);
217 }
218
219#ifdef CONFIG_DEBUG
220 workq->cookie = 0;
221#endif
222
223 free(workq);
224}
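
/*
 * Illustrative lifecycle of a private work queue (editorial addition;
 * flush_work, example_run() and example_flush_func() are hypothetical,
 * the latter being a work_func_t defined elsewhere):
 *
 * @code
 * static work_t flush_work;
 *
 * static void example_run(void)
 * {
 *     struct work_queue *wq = workq_create("example-wq");
 *     if (!wq)
 *         return;
 *
 *     (void) workq_enqueue(wq, &flush_work, example_flush_func);
 *
 *     workq_stop(wq);     // Blocks until all queued items have completed.
 *     workq_destroy(wq);  // Frees wq; stops it first if not stopped yet.
 * }
 * @endcode
 */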
225
226/** Initializes workq structure without creating any workers. */
227static void workq_preinit(struct work_queue *workq, const char *name)
228{
229#ifdef CONFIG_DEBUG
230 workq->cookie = WORKQ_MAGIC;
231#endif
232
233 irq_spinlock_initialize(&workq->lock, name);
234 condvar_initialize(&workq->activate_worker);
235
236 list_initialize(&workq->queue);
237 list_initialize(&workq->workers);
238
239 workq->item_cnt = 0;
240 workq->stopping = false;
241 workq->name = name;
242
243 workq->cur_worker_cnt = 1;
244 workq->idle_worker_cnt = 0;
245 workq->activate_pending = 0;
246 workq->blocked_worker_cnt = 0;
247
248 workq->pending_op_cnt = 0;
249 link_initialize(&workq->nb_link);
250}
251
252/** Initializes a work queue. Returns true if successful.
253 *
254 * Before destroying a work queue it must be stopped via
255 * workq_stop().
256 */
257int workq_init(struct work_queue *workq, const char *name)
258{
259 workq_preinit(workq, name);
260 return add_worker(workq);
261}
262
263/** Add a new worker thread. Returns false if the thread could not be created. */
264static bool add_worker(struct work_queue *workq)
265{
266 ASSERT(!workq_corrupted(workq));
267
268 thread_t *thread = thread_create(worker_thread, workq, TASK,
269 THREAD_FLAG_NONE, workq->name);
270
271 if (!thread) {
272 irq_spinlock_lock(&workq->lock, true);
273
274 /* cur_worker_cnt proactively increased in signal_worker_logic(). */
275 ASSERT(0 < workq->cur_worker_cnt);
276 --workq->cur_worker_cnt;
277
278 irq_spinlock_unlock(&workq->lock, true);
279 return false;
280 }
281
282 /* Respect lock ordering. */
283 irq_spinlock_lock(&thread->lock, true);
284 irq_spinlock_lock(&workq->lock, false);
285
286 bool success;
287
288 if (!workq->stopping) {
289 success = true;
290
291 /* Try to distribute workers among cpus right away. */
292 unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;
293
294 if (!cpus[cpu_id].active)
295 cpu_id = CPU->id;
296
297 thread->workq = workq;
298 thread->cpu = &cpus[cpu_id];
299 thread->workq_blocked = false;
300 thread->workq_idling = false;
301 link_initialize(&thread->workq_link);
302
303 list_append(&thread->workq_link, &workq->workers);
304 } else {
305 /*
306 * Work queue is shutting down - we must not add the worker
307 * and we cannot destroy it without ready-ing it. Mark it
308 * interrupted so the worker exits right away without even
309 * touching workq.
310 */
311 success = false;
312
313 /* cur_worker_cnt proactively increased in signal_worker_logic(). */
314 ASSERT(0 < workq->cur_worker_cnt);
315 --workq->cur_worker_cnt;
316 }
317
318 irq_spinlock_unlock(&workq->lock, false);
319 irq_spinlock_unlock(&thread->lock, true);
320
321 if (!success) {
322 thread_interrupt(thread);
323 }
324
325 thread_ready(thread);
326
327 return success;
328}
329
330/** Shuts down the work queue. Waits for all pending work items to complete.
331 *
332 * workq_stop() may only be run once.
333 */
334void workq_stop(struct work_queue *workq)
335{
336 ASSERT(!workq_corrupted(workq));
337
338 interrupt_workers(workq);
339 wait_for_workers(workq);
340}
341
342/** Notifies worker threads that the work queue is shutting down. */
343static void interrupt_workers(struct work_queue *workq)
344{
345 irq_spinlock_lock(&workq->lock, true);
346
347 /* workq_stop() may only be called once. */
348 ASSERT(!workq->stopping);
349 workq->stopping = true;
350
351 /* Respect lock ordering - do not hold workq->lock during broadcast. */
352 irq_spinlock_unlock(&workq->lock, true);
353
354 condvar_broadcast(&workq->activate_worker);
355}
356
357/** Waits for all worker threads to exit. */
358static void wait_for_workers(struct work_queue *workq)
359{
360 ASSERT(!PREEMPTION_DISABLED);
361
362 irq_spinlock_lock(&workq->lock, true);
363
364 list_foreach_safe(workq->workers, cur_worker, next_worker) {
365 thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
366 list_remove(cur_worker);
367
368 /* Wait without the lock. */
369 irq_spinlock_unlock(&workq->lock, true);
370
371 thread_join(worker);
372 thread_detach(worker);
373
374 irq_spinlock_lock(&workq->lock, true);
375 }
376
377 ASSERT(list_empty(&workq->workers));
378
379 /* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
380 while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
381 irq_spinlock_unlock(&workq->lock, true);
382
383 scheduler();
384
385 irq_spinlock_lock(&workq->lock, true);
386 }
387
388 irq_spinlock_unlock(&workq->lock, true);
389}
390
391/** Queues a function into the global work queue without blocking.
392 *
393 * See workq_enqueue_noblock() for more details.
394 */
395int workq_global_enqueue_noblock(work_t *work_item, work_func_t func)
396{
397 return workq_enqueue_noblock(&g_work_queue, work_item, func);
398}
399
400/** Queues a function into the global work queue; may block.
401 *
402 * See workq_enqueue() for more details.
403 */
404int workq_global_enqueue(work_t *work_item, work_func_t func)
405{
406 return workq_enqueue(&g_work_queue, work_item, func);
407}
408
409/** Adds a function to be invoked in a separate thread without blocking.
410 *
411 * workq_enqueue_noblock() is guaranteed not to block. It is safe
412 * to invoke from interrupt handlers.
413 *
414 * Consider using workq_enqueue() instead if at all possible. Otherwise,
415 * your work item may have to wait for previously enqueued sleeping
416 * work items to complete if you are unlucky.
417 *
418 * @param workq Work queue where to queue the work item.
419 * @param work_item Work item bookkeeping structure. Must be valid
420 * until func() is entered.
421 * @param func User supplied function to invoke in a worker thread.
422 *
423 * @return false if work queue is shutting down; function is not
424 * queued for further processing.
425 * @return true Otherwise. func() will be invoked in a separate thread.
426 */
427int workq_enqueue_noblock(struct work_queue *workq, work_t *work_item,
428 work_func_t func)
429{
430 return _workq_enqueue(workq, work_item, func, false);
431}
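
/*
 * Sketch of the intended no-block hand-off (editorial addition;
 * example_dev_t, example_irq_handler() and example_bottom_half() are
 * hypothetical). A work item preallocated inside a device structure lets
 * an interrupt handler defer the sleeping part of its processing without
 * allocating memory or blocking:
 *
 * @code
 * typedef struct {
 *     work_t deferred;    // First member; valid until the work func runs.
 *     int pending_events;
 * } example_dev_t;
 *
 * static void example_bottom_half(work_t *work_item)
 * {
 *     example_dev_t *dev = (example_dev_t *) work_item;
 *     // May sleep here, unlike the interrupt handler below.
 *     (void) dev;
 * }
 *
 * static void example_irq_handler(example_dev_t *dev)
 * {
 *     // The item must not be re-queued before example_bottom_half() runs.
 *     (void) workq_global_enqueue_noblock(&dev->deferred,
 *         example_bottom_half);
 * }
 * @endcode
 */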
432
433/** Adds a function to be invoked in a separate thread; may block.
434 *
435 * While workq_enqueue() is unlikely to block, it may do so if too
436 * many previously queued work items are blocked sleeping.
437 *
438 * @param workq Work queue where to queue the work item.
439 * @param work_item Work item bookkeeping structure. Must be valid
440 * until func() is entered.
441 * @param func User supplied function to invoke in a worker thread.
442 *
443 * @return false if work queue is shutting down; function is not
444 * queued for further processing.
445 * @return true Otherwise. func() will be invoked in a separate thread.
446 */
447int workq_enqueue(struct work_queue *workq, work_t *work_item, work_func_t func)
448{
449 return _workq_enqueue(workq, work_item, func, true);
450}
451
452/** Adds a work item that will be processed by a separate worker thread.
453 *
454 * func() will be invoked in another kernel thread and may block.
455 *
456 * Prefer to call _workq_enqueue() with can_block set. Otherwise
457 * your work item may have to wait for sleeping work items to complete.
458 * If all worker threads are blocked/sleeping a new worker thread cannot
459 * be create without can_block set because creating a thread might
460 * block due to low memory conditions.
461 *
462 * @param workq Work queue where to queue the work item.
463 * @param work_item Work item bookkeeping structure. Must be valid
464 * until func() is entered.
465 * @param func User supplied function to invoke in a worker thread.
466 * @param can_block May adding this work item block?
467 *
468 * @return false if work queue is shutting down; function is not
469 * queued for further processing.
470 * @return true Otherwise.
471 */
472static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
473 work_func_t func, bool can_block)
474{
475 ASSERT(!workq_corrupted(workq));
476
477 bool success = true;
478 signal_op_t signal_op = NULL;
479
480 irq_spinlock_lock(&workq->lock, true);
481
482 if (workq->stopping) {
483 success = false;
484 } else {
485 init_work_item(work_item, func);
486 list_append(&work_item->queue_link, &workq->queue);
487 ++workq->item_cnt;
488 success = true;
489
490 if (!booting) {
491 signal_op = signal_worker_logic(workq, can_block);
492 } else {
493 /*
494 * During boot there are no workers to signal. Just queue
495 * the work and let future workers take care of it.
496 */
497 }
498 }
499
500 irq_spinlock_unlock(&workq->lock, true);
501
502 if (signal_op) {
503 signal_op(workq);
504 }
505
506 return success;
507}
508
509/** Prepare an item to be added to the work item queue. */
510static void init_work_item(work_t *work_item, work_func_t func)
511{
512#ifdef CONFIG_DEBUG
513 work_item->cookie = WORK_ITEM_MAGIC;
514#endif
515
516 link_initialize(&work_item->queue_link);
517 work_item->func = func;
518}
519
520/** Returns the number of workers running work func() that are not blocked. */
521static size_t active_workers_now(struct work_queue *workq)
522{
523 ASSERT(irq_spinlock_locked(&workq->lock));
524
525 /* Blocked workers are sleeping in the work function (i.e., not idle). */
526 ASSERT(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
527 /* Idle workers are waiting for more work to arrive in condvar_wait. */
528 ASSERT(workq->idle_worker_cnt <= workq->cur_worker_cnt);
529
530 /* Idle + blocked workers == sleeping worker threads. */
531 size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;
532
533 ASSERT(sleeping_workers <= workq->cur_worker_cnt);
534 /* Workers pending activation are idle workers not yet given a time slice. */
535 ASSERT(workq->activate_pending <= workq->idle_worker_cnt);
536
537 /*
538 * Workers actively running the work func() this very moment and
539 * are neither blocked nor idle. Exclude ->activate_pending workers
540 * since they will run their work func() once they get a time slice
541 * and are not running it right now.
542 */
543 return workq->cur_worker_cnt - sleeping_workers;
544}
545
546/**
547 * Returns the number of workers that are running or are about to run work
548 * func() and that are not blocked.
549 */
550static size_t active_workers(struct work_queue *workq)
551{
552 ASSERT(irq_spinlock_locked(&workq->lock));
553
554 /*
555 * Workers actively running the work func() and are neither blocked nor
556 * idle. ->activate_pending workers will run their work func() once they
557 * get a time slice after waking from a condvar wait, so count them
558 * as well.
559 */
560 return active_workers_now(workq) + workq->activate_pending;
561}
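
/*
 * Worked example of the accounting above (editorial note): with
 * cur_worker_cnt == 10, blocked_worker_cnt == 3, idle_worker_cnt == 4 and
 * activate_pending == 1, active_workers_now() returns 10 - (3 + 4) == 3
 * workers running their work func() right now, while active_workers()
 * returns 3 + 1 == 4, because the signaled but not yet woken worker will
 * start running as soon as it is scheduled.
 */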
562
563static void add_worker_noblock_op(struct work_queue *workq)
564{
565 condvar_signal(&nonblock_adder.req_cv);
566}
567
568static void add_worker_op(struct work_queue *workq)
569{
570 add_worker(workq);
571}
572
573static void signal_worker_op(struct work_queue *workq)
574{
575 ASSERT(!workq_corrupted(workq));
576
577 condvar_signal(&workq->activate_worker);
578
579 irq_spinlock_lock(&workq->lock, true);
580 ASSERT(0 < workq->pending_op_cnt);
581 --workq->pending_op_cnt;
582 irq_spinlock_unlock(&workq->lock, true);
583}
584
585/** Determines how to signal workers if at all.
586 *
587 * @param workq Work queue where a new work item was queued.
588 * @param can_block True if we may block while signaling a worker or creating
589 * a new worker.
590 *
591 * @return Function that will notify workers or NULL if no action is needed.
592 */
593static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block)
594{
595 ASSERT(!workq_corrupted(workq));
596 ASSERT(irq_spinlock_locked(&workq->lock));
597
598 /* Only signal workers if really necessary. */
599 signal_op_t signal_op = NULL;
600
601 /*
602 * Workers actively running the work func() and neither blocked nor idle.
603 * Including ->activate_pending workers that will run their work func()
604 * once they get a time slice.
605 */
606 size_t active = active_workers(workq);
607 /* Max total allowed number of work items queued for active workers. */
608 size_t max_load = active * max_items_per_worker;
609
610 /* Active workers are getting overwhelmed - activate another. */
611 if (max_load < workq->item_cnt) {
612
613 size_t remaining_idle =
614 workq->idle_worker_cnt - workq->activate_pending;
615
616 /* Idle workers still exist - activate one. */
617 if (remaining_idle > 0) {
618 /*
619 * Directly changing idle_worker_cnt here would not allow
620 * workers to recognize spurious wake-ups. Change
621 * activate_pending instead.
622 */
623 ++workq->activate_pending;
624 ++workq->pending_op_cnt;
625 signal_op = signal_worker_op;
626 } else {
627 /* No idle workers remain. Request that a new one be created. */
628 bool need_worker = (active < max_concurrent_workers)
629 && (workq->cur_worker_cnt < max_worker_cnt);
630
631 if (need_worker && can_block) {
632 signal_op = add_worker_op;
633 /*
634 * It may take some time to actually create the worker.
635 * We don't want to swamp the thread pool with superfluous
636 * worker creation requests so pretend it was already
637 * created and proactively increase the worker count.
638 */
639 ++workq->cur_worker_cnt;
640 }
641
642 /*
643 * We cannot create a new worker but we need one desperately
644 * because all workers are blocked in their work functions.
645 */
646 if (need_worker && !can_block && 0 == active) {
647 ASSERT(0 == workq->idle_worker_cnt);
648
649 irq_spinlock_lock(&nonblock_adder.lock, true);
650
651 if (nonblock_adder.thread && !link_used(&workq->nb_link)) {
652 signal_op = add_worker_noblock_op;
653 ++workq->cur_worker_cnt;
654 list_append(&workq->nb_link, &nonblock_adder.work_queues);
655 }
656
657 irq_spinlock_unlock(&nonblock_adder.lock, true);
658 }
659 }
660 } else {
661 /*
662 * There are enough active/running workers to process the queue.
663 * No need to signal/activate any new workers.
664 */
665 signal_op = NULL;
666 }
667
668 return signal_op;
669}
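
/*
 * Worked example of the threshold above (editorial note): with 2 active
 * workers and max_items_per_worker == 8, max_load is 16, so the 17th
 * queued item makes signal_worker_logic() either wake an idle worker (if
 * any remain unsignaled) or, when the worker limits allow, arrange for a
 * new one to be created. The kworkq-nb adder thread is used only in the
 * can_block == false case when every worker is blocked (active == 0).
 */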
670
671/** Executes queued work items. */
672static void worker_thread(void *arg)
673{
674 /*
675 * If the thread was created after the work queue was ordered to stop,
676 * do not access the work queue; return immediately.
677 */
678 if (thread_interrupted(THREAD)) {
679 thread_detach(THREAD);
680 return;
681 }
682
683 ASSERT(arg != NULL);
684
685 struct work_queue *workq = arg;
686 work_t *work_item;
687
688 while (dequeue_work(workq, &work_item)) {
689 /* Copy the func field so func() can safely free work_item. */
690 work_func_t func = work_item->func;
691
692 func(work_item);
693 }
694}
695
696/** Waits for and retrieves a work item. Returns false if the worker should exit. */
697static bool dequeue_work(struct work_queue *workq, work_t **pwork_item)
698{
699 ASSERT(!workq_corrupted(workq));
700
701 irq_spinlock_lock(&workq->lock, true);
702
703 /* Check if we should exit because the load is low. */
704 if (!workq->stopping && worker_unnecessary(workq)) {
705 /* There are too many workers for this load. Exit. */
706 ASSERT(0 < workq->cur_worker_cnt);
707 --workq->cur_worker_cnt;
708 list_remove(&THREAD->workq_link);
709 irq_spinlock_unlock(&workq->lock, true);
710
711 thread_detach(THREAD);
712 return false;
713 }
714
715 bool stop = false;
716
717 /* Wait for work to arrive. */
718 while (list_empty(&workq->queue) && !workq->stopping) {
719 cv_wait(workq);
720
721 if (0 < workq->activate_pending)
722 --workq->activate_pending;
723 }
724
725 /* Process remaining work even if requested to stop. */
726 if (!list_empty(&workq->queue)) {
727 link_t *work_link = list_first(&workq->queue);
728 *pwork_item = list_get_instance(work_link, work_t, queue_link);
729
730#ifdef CONFIG_DEBUG
731 ASSERT(!work_item_corrupted(*pwork_item));
732 (*pwork_item)->cookie = 0;
733#endif
734 list_remove(work_link);
735 --workq->item_cnt;
736
737 stop = false;
738 } else {
739 /* Requested to stop and no more work queued. */
740 ASSERT(workq->stopping);
741 --workq->cur_worker_cnt;
742 stop = true;
743 }
744
745 irq_spinlock_unlock(&workq->lock, true);
746
747 return !stop;
748}
749
750/** Returns true if there are too many workers for the given load. */
751static bool worker_unnecessary(struct work_queue *workq)
752{
753 ASSERT(irq_spinlock_locked(&workq->lock));
754
755 /* No work is pending. We don't need too many idle threads. */
756 if (list_empty(&workq->queue)) {
757 /* There are too many idle workers. Exit. */
758 return (min_worker_cnt <= workq->idle_worker_cnt);
759 } else {
760 /*
761 * There is work but we are swamped with too many active workers
762 * that were woken up from sleep at around the same time. We
763 * don't need another worker fighting for cpu time.
764 */
765 size_t active = active_workers_now(workq);
766 return (max_concurrent_workers < active);
767 }
768}
769
770/** Waits for a signal on activate_worker. The thread is marked idle while waiting. */
771static void cv_wait(struct work_queue *workq)
772{
773 ++workq->idle_worker_cnt;
774 THREAD->workq_idling = true;
775
776 /* Ignore lock ordering just here. */
777 ASSERT(irq_spinlock_locked(&workq->lock));
778
779 _condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
780 &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);
781
782 ASSERT(!workq_corrupted(workq));
783 ASSERT(irq_spinlock_locked(&workq->lock));
784
785 THREAD->workq_idling = false;
786 --workq->idle_worker_cnt;
787}
788
789
790/** Invoked from thread_ready() right before the thread is woken up. */
791void workq_before_thread_is_ready(thread_t *thread)
792{
793 ASSERT(thread);
794 ASSERT(irq_spinlock_locked(&thread->lock));
795
796 /* Worker's work func() is about to wake up from sleeping. */
797 if (thread->workq && thread->workq_blocked) {
798 /* Must be blocked in user work func() and not be waiting for work. */
799 ASSERT(!thread->workq_idling);
800 ASSERT(thread->state == Sleeping);
801 ASSERT(THREAD != thread);
802 ASSERT(!workq_corrupted(thread->workq));
803
804 /* Protected by thread->lock */
805 thread->workq_blocked = false;
806
807 irq_spinlock_lock(&thread->workq->lock, true);
808 --thread->workq->blocked_worker_cnt;
809 irq_spinlock_unlock(&thread->workq->lock, true);
810 }
811}
812
813/** Invoked from scheduler() before switching away from a thread. */
814void workq_after_thread_ran(void)
815{
816 ASSERT(THREAD);
817 ASSERT(irq_spinlock_locked(&THREAD->lock));
818
819 /* Worker's work func() is about to sleep/block. */
820 if (THREAD->workq && THREAD->state == Sleeping && !THREAD->workq_idling) {
821 ASSERT(!THREAD->workq_blocked);
822 ASSERT(!workq_corrupted(THREAD->workq));
823
824 THREAD->workq_blocked = true;
825
826 irq_spinlock_lock(&THREAD->workq->lock, false);
827
828 ++THREAD->workq->blocked_worker_cnt;
829
830 bool can_block = false;
831 signal_op_t op = signal_worker_logic(THREAD->workq, can_block);
832
833 irq_spinlock_unlock(&THREAD->workq->lock, false);
834
835 if (op) {
836 ASSERT(add_worker_noblock_op == op || signal_worker_op == op);
837 op(THREAD->workq);
838 }
839 }
840}
841
842/** Prints stats of the work queue to the kernel console. */
843void workq_print_info(struct work_queue *workq)
844{
845 irq_spinlock_lock(&workq->lock, true);
846
847 size_t total = workq->cur_worker_cnt;
848 size_t blocked = workq->blocked_worker_cnt;
849 size_t idle = workq->idle_worker_cnt;
850 size_t active = active_workers(workq);
851 size_t items = workq->item_cnt;
852 bool stopping = workq->stopping;
853 bool worker_surplus = worker_unnecessary(workq);
854 const char *load_str = worker_surplus ? "decreasing" :
855 (0 < workq->activate_pending) ? "increasing" : "stable";
856
857 irq_spinlock_unlock(&workq->lock, true);
858
859 printf(
860 "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
861 " max_concurrent_workers=%zu, max_items_per_worker=%zu\n"
862 "Workers: %zu\n"
863 "Active: %zu (workers currently processing work)\n"
864 "Blocked: %zu (work functions sleeping/blocked)\n"
865 "Idle: %zu (idle workers waiting for more work)\n"
866 "Items: %zu (queued not yet dispatched work)\n"
867 "Stopping: %d\n"
868 "Load: %s\n",
869 max_worker_cnt, min_worker_cnt,
870 max_concurrent_workers, max_items_per_worker,
871 total,
872 active,
873 blocked,
874 idle,
875 items,
876 stopping,
877 load_str
878 );
879}
880
881/** Prints stats of the global work queue. */
882void workq_global_print_info(void)
883{
884 workq_print_info(&g_work_queue);
885}
886
887
888static bool dequeue_add_req(nonblock_adder_t *info, struct work_queue **pworkq)
889{
890 bool stop = false;
891
892 irq_spinlock_lock(&info->lock, true);
893
894 while (list_empty(&info->work_queues) && !stop) {
895 int ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
896 &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
897
898 stop = (ret == ESYNCH_INTERRUPTED);
899 }
900
901 if (!stop) {
902 *pworkq = list_get_instance(list_first(&info->work_queues),
903 struct work_queue, nb_link);
904
905 ASSERT(!workq_corrupted(*pworkq));
906
907 list_remove(&(*pworkq)->nb_link);
908 }
909
910 irq_spinlock_unlock(&info->lock, true);
911
912 return !stop;
913}
914
915static void thr_nonblock_add_worker(void *arg)
916{
917 nonblock_adder_t *info = arg;
918 struct work_queue *workq;
919
920 while (dequeue_add_req(info, &workq)) {
921 add_worker(workq);
922 }
923}
924
925
926static void nonblock_init(void)
927{
928 irq_spinlock_initialize(&nonblock_adder.lock, "kworkq-nb.lock");
929 condvar_initialize(&nonblock_adder.req_cv);
930 list_initialize(&nonblock_adder.work_queues);
931
932 nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
933 &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");
934
935 if (nonblock_adder.thread) {
936 thread_ready(nonblock_adder.thread);
937 } else {
938 /*
939 * We won't be able to add workers without blocking if all workers
940 * sleep, but at least boot the system.
941 */
942 printf("Failed to create kworkq-nb. Sleeping work may stall the workq.\n");
943 }
944}
945
946#ifdef CONFIG_DEBUG
947/** Returns true if the workq is definitely corrupted; false if not sure.
948 *
949 * Can be used outside of any locks.
950 */
951static bool workq_corrupted(struct work_queue *workq)
952{
953 /*
954 * Needed to make the most current cookie value set by workq_preinit()
955 * visible even if we access the workq right after it is created but
956 * on a different cpu. Otherwise, workq_corrupted() would not work
957 * outside a lock.
958 */
959 memory_barrier();
960 return NULL == workq || workq->cookie != WORKQ_MAGIC;
961}
962
963/** Returns true if the work_item is definitely corrupted; false if not sure.
964 *
965 * Must be used with the work queue protecting spinlock locked.
966 */
967static bool work_item_corrupted(work_t *work_item)
968{
969 return NULL == work_item || work_item->cookie != WORK_ITEM_MAGIC;
970}
971#endif
972
973/** @}
974 */