source: mainline/kernel/generic/src/synch/workqueue.c@ 3954961e

Last change on this file since 3954961e was 0d56712, checked in by Adam Hraska <adam.hraska+hos@…>, 13 years ago

workq: Added magic cookie integrity checks. Fixed test workqueue1.

#include <synch/workqueue.h>
#include <synch/spinlock.h>
#include <synch/condvar.h>
#include <synch/mutex.h>
#include <proc/thread.h>
#include <config.h>
#include <arch.h>
#include <cpu.h>
#include <macros.h>

#define WORKQ_MAGIC      0xf00c1333U
#define WORK_ITEM_MAGIC  0xfeec1777U


struct work_queue {
    /*
     * Protects everything except activate_worker.
     * Must be acquired after any thread->locks.
     */
    IRQ_SPINLOCK_DECLARE(lock);

    /* Activates a worker if new work arrives or if the queue is shutting down. */
    condvar_t activate_worker;

    /* Queue of work_items ready to be dispatched. */
    list_t queue;

    /* List of worker threads. */
    list_t workers;

    /* Number of work items queued. */
    size_t item_cnt;

    /* Indicates the work queue is shutting down. */
    bool stopping;
    const char *name;

    /* Total number of created worker threads. */
    size_t cur_worker_cnt;
    /* Number of workers waiting for work to arrive. */
    size_t idle_worker_cnt;
    /* Number of idle workers signaled that have not yet been woken up. */
    size_t activate_pending;
    /* Number of blocked workers sleeping in work func() (ie not idle). */
    size_t blocked_worker_cnt;

    /* Number of pending signal_worker_op() operations. */
    size_t pending_op_cnt;

    link_t nb_link;

#ifdef CONFIG_DEBUG
    /* Magic cookie for integrity checks. Immutable. Accessed without lock. */
    uint32_t cookie;
#endif
};


/** Min number of idle workers to keep. */
static size_t min_worker_cnt;
/** Max total number of workers - be they blocked, idle, or active. */
static size_t max_worker_cnt;
/** Max number of concurrently running active workers, ie neither blocked nor idle. */
static size_t max_concurrent_workers;
/** Max number of work items per active worker before a new worker is activated. */
static const size_t max_items_per_worker = 8;

/** System wide work queue. */
static struct work_queue g_work_queue;

static int booting = true;


typedef struct {
    IRQ_SPINLOCK_DECLARE(lock);
    condvar_t req_cv;
    thread_t *thread;
    list_t work_queues;
} nonblock_adder_t;

static nonblock_adder_t nonblock_adder;


/** Typedef of a worker thread signaling operation prototype. */
typedef void (*signal_op_t)(struct work_queue *workq);


/* Fwd decl. */
static void workq_preinit(struct work_queue *workq, const char *name);
static bool add_worker(struct work_queue *workq);
static void interrupt_workers(struct work_queue *workq);
static void wait_for_workers(struct work_queue *workq);
static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
    work_func_t func, bool can_block);
static void init_work_item(work_t *work_item, work_func_t func);
static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block);
static void worker_thread(void *arg);
static bool dequeue_work(struct work_queue *workq, work_t **pwork_item);
static bool worker_unnecessary(struct work_queue *workq);
static void cv_wait(struct work_queue *workq);
static void nonblock_init(void);
static bool workq_corrupted(struct work_queue *workq);
static bool work_item_corrupted(work_t *work_item);


/** Creates a worker thread for the system-wide work queue. */
void workq_global_worker_init(void)
{
    /*
     * No need for additional synchronization. Stores to word-sized
     * variables are atomic and the change will eventually propagate.
     * Moreover, add_worker() includes the necessary memory barriers
     * in spinlock lock/unlock().
     */
    booting = false;

    nonblock_init();

    if (!add_worker(&g_work_queue))
        panic("Could not create a single global work queue worker!\n");
}

/** Initializes the system-wide work queue and support for other work queues. */
void workq_global_init(void)
{
    /* Keep idle workers on 1/4 of the cpus, but at least 2 threads. */
    min_worker_cnt = max(2, config.cpu_count / 4);
    /* Allow at most 8 sleeping work items per cpu. */
    max_worker_cnt = max(32, 8 * config.cpu_count);
    /* Maximum concurrency without slowing down the system. */
    max_concurrent_workers = max(2, config.cpu_count);

    workq_preinit(&g_work_queue, "kworkq");
}

/** Stops the system-wide work queue and waits for all work items to complete. */
void workq_global_stop(void)
{
    workq_stop(&g_work_queue);
}

/** Creates and initializes a work queue. Returns NULL upon failure. */
struct work_queue *workq_create(const char *name)
{
    struct work_queue *workq = malloc(sizeof(struct work_queue), 0);

    if (workq) {
        if (workq_init(workq, name)) {
            ASSERT(!workq_corrupted(workq));
            return workq;
        }

        free(workq);
    }

    return NULL;
}

/** Frees work queue resources and stops the queue if that has not been done already. */
void workq_destroy(struct work_queue *workq)
{
    ASSERT(!workq_corrupted(workq));

    irq_spinlock_lock(&workq->lock, true);
    bool stopped = workq->stopping;
    size_t running_workers = workq->cur_worker_cnt;
    irq_spinlock_unlock(&workq->lock, true);

    if (!stopped) {
        workq_stop(workq);
    } else {
        ASSERT(0 == running_workers);
    }

#ifdef CONFIG_DEBUG
    workq->cookie = 0;
#endif

    free(workq);
}

/** Initializes the workq structure without creating any workers. */
static void workq_preinit(struct work_queue *workq, const char *name)
{
#ifdef CONFIG_DEBUG
    workq->cookie = WORKQ_MAGIC;
#endif

    irq_spinlock_initialize(&workq->lock, name);
    condvar_initialize(&workq->activate_worker);

    list_initialize(&workq->queue);
    list_initialize(&workq->workers);

    workq->item_cnt = 0;
    workq->stopping = false;
    workq->name = name;

    workq->cur_worker_cnt = 1;
    workq->idle_worker_cnt = 0;
    workq->activate_pending = 0;
    workq->blocked_worker_cnt = 0;

    workq->pending_op_cnt = 0;
    link_initialize(&workq->nb_link);
}

/** Initializes a work queue. Returns true if successful.
 *
 * Before destroying a work queue it must be stopped via
 * workq_stop().
 */
int workq_init(struct work_queue *workq, const char *name)
{
    workq_preinit(workq, name);
    return add_worker(workq);
}

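/*
 * Example of a dedicated queue's typical lifecycle (an illustrative sketch,
 * not code used anywhere in this file; the queue name "myq", the work item
 * "job" and its function job_func() are hypothetical):
 *
 *   struct work_queue *myq = workq_create("myq");
 *
 *   if (myq) {
 *       (void) workq_enqueue(myq, &job->work, job_func);
 *       ...
 *       // workq_destroy() stops the queue first (waiting for all queued
 *       // items to complete) unless workq_stop() was already called.
 *       workq_destroy(myq);
 *   }
 */
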
/** Adds a new worker thread. Returns false if the thread could not be created. */
static bool add_worker(struct work_queue *workq)
{
    ASSERT(!workq_corrupted(workq));

    thread_t *thread = thread_create(worker_thread, workq, TASK,
        THREAD_FLAG_NONE, workq->name);

    if (!thread) {
        irq_spinlock_lock(&workq->lock, true);

        /* cur_worker_cnt proactively increased in signal_worker_logic(). */
        ASSERT(0 < workq->cur_worker_cnt);
        --workq->cur_worker_cnt;

        irq_spinlock_unlock(&workq->lock, true);
        return false;
    }

    /* Respect lock ordering. */
    irq_spinlock_lock(&thread->lock, true);
    irq_spinlock_lock(&workq->lock, false);

    bool success;

    if (!workq->stopping) {
        success = true;

        /* Try to distribute workers among cpus right away. */
        unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;

        if (!cpus[cpu_id].active)
            cpu_id = CPU->id;

        thread->workq = workq;
        thread->cpu = &cpus[cpu_id];
        thread->workq_blocked = false;
        thread->workq_idling = false;
        link_initialize(&thread->workq_link);

        list_append(&thread->workq_link, &workq->workers);
    } else {
        /*
         * The work queue is shutting down - we must not add the worker
         * and we cannot destroy it without readying it. Mark it
         * interrupted so the worker exits right away without even
         * touching workq.
         */
        success = false;

        /* cur_worker_cnt proactively increased in signal_worker_logic(). */
        ASSERT(0 < workq->cur_worker_cnt);
        --workq->cur_worker_cnt;
    }

    irq_spinlock_unlock(&workq->lock, false);
    irq_spinlock_unlock(&thread->lock, true);

    if (!success) {
        thread_interrupt(thread);
    }

    thread_ready(thread);

    return success;
}

/** Shuts down the work queue. Waits for all pending work items to complete.
 *
 * workq_stop() may only be run once.
 */
void workq_stop(struct work_queue *workq)
{
    ASSERT(!workq_corrupted(workq));

    interrupt_workers(workq);
    wait_for_workers(workq);
}

/** Notifies worker threads that the work queue is shutting down. */
static void interrupt_workers(struct work_queue *workq)
{
    irq_spinlock_lock(&workq->lock, true);

    /* workq_stop() may only be called once. */
    ASSERT(!workq->stopping);
    workq->stopping = true;

    /* Respect lock ordering - do not hold workq->lock during the broadcast. */
    irq_spinlock_unlock(&workq->lock, true);

    condvar_broadcast(&workq->activate_worker);
}

/** Waits for all worker threads to exit. */
static void wait_for_workers(struct work_queue *workq)
{
    ASSERT(!PREEMPTION_DISABLED);

    irq_spinlock_lock(&workq->lock, true);

    list_foreach_safe(workq->workers, cur_worker, next_worker) {
        thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
        list_remove(cur_worker);

        /* Wait without the lock. */
        irq_spinlock_unlock(&workq->lock, true);

        thread_join(worker);
        thread_detach(worker);

        irq_spinlock_lock(&workq->lock, true);
    }

    ASSERT(list_empty(&workq->workers));

    /* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
    while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
        irq_spinlock_unlock(&workq->lock, true);

        scheduler();

        irq_spinlock_lock(&workq->lock, true);
    }

    irq_spinlock_unlock(&workq->lock, true);
}

/** Queues a function into the global work queue without blocking.
 *
 * See workq_enqueue_noblock() for more details.
 */
int workq_global_enqueue_noblock(work_t *work_item, work_func_t func)
{
    return workq_enqueue_noblock(&g_work_queue, work_item, func);
}

/** Queues a function into the global work queue; may block.
 *
 * See workq_enqueue() for more details.
 */
int workq_global_enqueue(work_t *work_item, work_func_t func)
{
    return workq_enqueue(&g_work_queue, work_item, func);
}

/** Adds a function to be invoked in a separate thread without blocking.
 *
 * workq_enqueue_noblock() is guaranteed not to block. It is safe
 * to invoke from interrupt handlers.
 *
 * Consider using workq_enqueue() instead if at all possible. Otherwise,
 * your work item may have to wait for previously enqueued sleeping
 * work items to complete if you are unlucky.
 *
 * @param workq      Work queue where to queue the work item.
 * @param work_item  Work item bookkeeping structure. Must be valid
 *                   until func() is entered.
 * @param func       User supplied function to invoke in a worker thread.
 *
 * @return false if the work queue is shutting down; the function is not
 *         queued for further processing.
 * @return true otherwise. func() will be invoked in a separate thread.
 */
int workq_enqueue_noblock(struct work_queue *workq, work_t *work_item,
    work_func_t func)
{
    return _workq_enqueue(workq, work_item, func, false);
}

/** Adds a function to be invoked in a separate thread; may block.
 *
 * While workq_enqueue() is unlikely to block, it may do so if too
 * many previous work items blocked sleeping.
 *
 * @param workq      Work queue where to queue the work item.
 * @param work_item  Work item bookkeeping structure. Must be valid
 *                   until func() is entered.
 * @param func       User supplied function to invoke in a worker thread.
 *
 * @return false if the work queue is shutting down; the function is not
 *         queued for further processing.
 * @return true otherwise. func() will be invoked in a separate thread.
 */
int workq_enqueue(struct work_queue *workq, work_t *work_item, work_func_t func)
{
    return _workq_enqueue(workq, work_item, func, true);
}

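/*
 * Example use of the enqueue API above (an illustrative sketch; my_job_t,
 * my_work_func(), process() and submit() are hypothetical and not part of
 * this file). It assumes work_func_t receives the queued work_t pointer,
 * which is how worker_thread() below dispatches items:
 *
 *   typedef struct {
 *       work_t work;        // must stay valid until my_work_func() runs
 *       int payload;
 *   } my_job_t;
 *
 *   static void my_work_func(work_t *work_item)
 *   {
 *       // work_t is the first member, so a cast recovers the whole job.
 *       my_job_t *job = (my_job_t *) work_item;
 *       process(job->payload);
 *       free(job);          // func() may free the work item
 *   }
 *
 *   static void submit(int payload)
 *   {
 *       my_job_t *job = malloc(sizeof(my_job_t), 0);
 *       job->payload = payload;
 *
 *       if (!workq_global_enqueue(&job->work, my_work_func))
 *           free(job);      // the queue is stopping; the item was not queued
 *   }
 */
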
/** Adds a work item that will be processed by a separate worker thread.
 *
 * func() will be invoked in another kernel thread and may block.
 *
 * Prefer to call _workq_enqueue() with can_block set. Otherwise
 * your work item may have to wait for sleeping work items to complete.
 * If all worker threads are blocked/sleeping, a new worker thread cannot
 * be created without can_block set because creating a thread might
 * block due to low memory conditions.
 *
 * @param workq      Work queue where to queue the work item.
 * @param work_item  Work item bookkeeping structure. Must be valid
 *                   until func() is entered.
 * @param func       User supplied function to invoke in a worker thread.
 * @param can_block  May adding this work item block?
 *
 * @return false if the work queue is shutting down; the function is not
 *         queued for further processing.
 * @return true otherwise.
 */
static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
    work_func_t func, bool can_block)
{
    ASSERT(!workq_corrupted(workq));

    bool success = true;
    signal_op_t signal_op = NULL;

    irq_spinlock_lock(&workq->lock, true);

    if (workq->stopping) {
        success = false;
    } else {
        init_work_item(work_item, func);
        list_append(&work_item->queue_link, &workq->queue);
        ++workq->item_cnt;
        success = true;

        if (!booting) {
            signal_op = signal_worker_logic(workq, can_block);
        } else {
            /*
             * During boot there are no workers to signal. Just queue
             * the work and let future workers take care of it.
             */
        }
    }

    irq_spinlock_unlock(&workq->lock, true);

    if (signal_op) {
        signal_op(workq);
    }

    return success;
}

/** Prepares an item to be added to the work item queue. */
static void init_work_item(work_t *work_item, work_func_t func)
{
#ifdef CONFIG_DEBUG
    work_item->cookie = WORK_ITEM_MAGIC;
#endif

    link_initialize(&work_item->queue_link);
    work_item->func = func;
}

/** Returns the number of workers running work func() that are not blocked. */
static size_t active_workers_now(struct work_queue *workq)
{
    ASSERT(irq_spinlock_locked(&workq->lock));

    /* Blocked workers are sleeping in the work function (ie not idle). */
    ASSERT(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
    /* Idle workers are waiting for more work to arrive in condvar_wait. */
    ASSERT(workq->idle_worker_cnt <= workq->cur_worker_cnt);

    /* Idle + blocked workers == sleeping worker threads. */
    size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;

    ASSERT(sleeping_workers <= workq->cur_worker_cnt);
    /* Workers pending activation are idle workers not yet given a time slice. */
    ASSERT(workq->activate_pending <= workq->idle_worker_cnt);

    /*
     * Workers actively running the work func() this very moment,
     * ie neither blocked nor idle. Exclude ->activate_pending workers
     * since they will run their work func() once they get a time slice
     * and are not running it right now.
     */
    return workq->cur_worker_cnt - sleeping_workers;
}

/**
 * Returns the number of workers that are running or are about to run work
 * func() and that are not blocked.
 */
static size_t active_workers(struct work_queue *workq)
{
    ASSERT(irq_spinlock_locked(&workq->lock));

    /*
     * Workers actively running the work func() and neither blocked nor
     * idle. ->activate_pending workers will run their work func() once they
     * get a time slice after waking up from a condvar wait, so count them
     * as well.
     */
    return active_workers_now(workq) + workq->activate_pending;
}

static void add_worker_noblock_op(struct work_queue *workq)
{
    condvar_signal(&nonblock_adder.req_cv);
}

static void add_worker_op(struct work_queue *workq)
{
    add_worker(workq);
}

static void signal_worker_op(struct work_queue *workq)
{
    ASSERT(!workq_corrupted(workq));

    condvar_signal(&workq->activate_worker);

    irq_spinlock_lock(&workq->lock, true);
    ASSERT(0 < workq->pending_op_cnt);
    --workq->pending_op_cnt;
    irq_spinlock_unlock(&workq->lock, true);
}

/** Determines how to signal workers, if at all.
 *
 * @param workq      Work queue where a new work item was queued.
 * @param can_block  True if we may block while signaling a worker or creating
 *                   a new worker.
 *
 * @return Function that will notify workers, or NULL if no action is needed.
 */
static signal_op_t signal_worker_logic(struct work_queue *workq, bool can_block)
{
    ASSERT(!workq_corrupted(workq));
    ASSERT(irq_spinlock_locked(&workq->lock));

    /* Only signal workers if really necessary. */
    signal_op_t signal_op = NULL;

    /*
     * Workers actively running the work func() and neither blocked nor idle,
     * including ->activate_pending workers that will run their work func()
     * once they get a time slice.
     */
    size_t active = active_workers(workq);
    /* Max total allowed number of work items queued for active workers. */
    size_t max_load = active * max_items_per_worker;

    /* Active workers are getting overwhelmed - activate another. */
    if (max_load < workq->item_cnt) {

        size_t remaining_idle =
            workq->idle_worker_cnt - workq->activate_pending;

        /* Idle workers still exist - activate one. */
        if (remaining_idle > 0) {
            /*
             * Directly changing idle_worker_cnt here would not allow
             * workers to recognize spurious wake-ups. Change
             * activate_pending instead.
             */
            ++workq->activate_pending;
            ++workq->pending_op_cnt;
            signal_op = signal_worker_op;
        } else {
            /* No idle workers remain. Request that a new one be created. */
            bool need_worker = (active < max_concurrent_workers)
                && (workq->cur_worker_cnt < max_worker_cnt);

            if (need_worker && can_block) {
                signal_op = add_worker_op;
                /*
                 * It may take some time to actually create the worker.
                 * We don't want to swamp the thread pool with superfluous
                 * worker creation requests so pretend it was already
                 * created and proactively increase the worker count.
                 */
                ++workq->cur_worker_cnt;
            }

            /*
             * We cannot create a new worker but we need one desperately
             * because all workers are blocked in their work functions.
             */
            if (need_worker && !can_block && 0 == active) {
                ASSERT(0 == workq->idle_worker_cnt);

                irq_spinlock_lock(&nonblock_adder.lock, true);

                if (nonblock_adder.thread && !link_used(&workq->nb_link)) {
                    signal_op = add_worker_noblock_op;
                    ++workq->cur_worker_cnt;
                    list_append(&workq->nb_link, &nonblock_adder.work_queues);
                }

                irq_spinlock_unlock(&nonblock_adder.lock, true);
            }
        }
    } else {
        /*
         * There are enough active/running workers to process the queue.
         * No need to signal/activate any new workers.
         */
        signal_op = NULL;
    }

    return signal_op;
}

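/*
 * Worked example of the threshold in signal_worker_logic() above (numbers
 * are illustrative only): with max_items_per_worker == 8 and active == 2,
 * max_load == 16. Up to 16 queued items are tolerated; once item_cnt
 * reaches 17 (max_load < item_cnt), the logic either signals an idle
 * worker, if any remain, or requests a new one, subject to the
 * max_concurrent_workers and max_worker_cnt limits.
 */
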
/** Executes queued work items. */
static void worker_thread(void *arg)
{
    /*
     * The thread has been created after the work queue was ordered to stop.
     * Do not access the work queue and return immediately.
     */
    if (thread_interrupted(THREAD)) {
        thread_detach(THREAD);
        return;
    }

    ASSERT(arg != NULL);

    struct work_queue *workq = arg;
    work_t *work_item;

    while (dequeue_work(workq, &work_item)) {
        /* Copy the func field so func() can safely free work_item. */
        work_func_t func = work_item->func;

        func(work_item);
    }
}

/** Waits for and retrieves a work item. Returns false if the worker should exit. */
static bool dequeue_work(struct work_queue *workq, work_t **pwork_item)
{
    ASSERT(!workq_corrupted(workq));

    irq_spinlock_lock(&workq->lock, true);

    /* Check if we should exit because the load is low. */
    if (!workq->stopping && worker_unnecessary(workq)) {
        /* There are too many workers for this load. Exit. */
        ASSERT(0 < workq->cur_worker_cnt);
        --workq->cur_worker_cnt;
        list_remove(&THREAD->workq_link);
        irq_spinlock_unlock(&workq->lock, true);

        thread_detach(THREAD);
        return false;
    }

    bool stop = false;

    /* Wait for work to arrive. */
    while (list_empty(&workq->queue) && !workq->stopping) {
        cv_wait(workq);

        if (0 < workq->activate_pending)
            --workq->activate_pending;
    }

    /* Process remaining work even if requested to stop. */
    if (!list_empty(&workq->queue)) {
        link_t *work_link = list_first(&workq->queue);
        *pwork_item = list_get_instance(work_link, work_t, queue_link);

#ifdef CONFIG_DEBUG
        ASSERT(!work_item_corrupted(*pwork_item));
        (*pwork_item)->cookie = 0;
#endif
        list_remove(work_link);
        --workq->item_cnt;

        stop = false;
    } else {
        /* Requested to stop and no more work queued. */
        ASSERT(workq->stopping);
        --workq->cur_worker_cnt;
        stop = true;
    }

    irq_spinlock_unlock(&workq->lock, true);

    return !stop;
}

/** Returns true if there are too many workers for the given load. */
static bool worker_unnecessary(struct work_queue *workq)
{
    ASSERT(irq_spinlock_locked(&workq->lock));

    /* No work is pending. We don't need too many idle threads. */
    if (list_empty(&workq->queue)) {
        /* There are too many idle workers. Exit. */
        return (min_worker_cnt <= workq->idle_worker_cnt);
    } else {
        /*
         * There is work but we are swamped with too many active workers
         * that were woken up from sleep at around the same time. We
         * don't need another worker fighting for cpu time.
         */
        size_t active = active_workers_now(workq);
        return (max_concurrent_workers < active);
    }
}

/** Waits for a signal to activate_worker. The thread is marked idle while waiting. */
static void cv_wait(struct work_queue *workq)
{
    ++workq->idle_worker_cnt;
    THREAD->workq_idling = true;

    /* Ignore lock ordering just here. */
    ASSERT(irq_spinlock_locked(&workq->lock));

    _condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
        &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);

    ASSERT(!workq_corrupted(workq));
    ASSERT(irq_spinlock_locked(&workq->lock));

    THREAD->workq_idling = false;
    --workq->idle_worker_cnt;
}


/** Invoked from thread_ready() right before the thread is woken up. */
void workq_before_thread_is_ready(thread_t *thread)
{
    ASSERT(thread);
    ASSERT(irq_spinlock_locked(&thread->lock));

    /* The worker's work func() is about to wake up from sleeping. */
    if (thread->workq && thread->workq_blocked) {
        /* Must be blocked in the user work func() and not be waiting for work. */
        ASSERT(!thread->workq_idling);
        ASSERT(thread->state == Sleeping);
        ASSERT(THREAD != thread);
        ASSERT(!workq_corrupted(thread->workq));

        /* Protected by thread->lock */
        thread->workq_blocked = false;

        irq_spinlock_lock(&thread->workq->lock, true);
        --thread->workq->blocked_worker_cnt;
        irq_spinlock_unlock(&thread->workq->lock, true);
    }
}

/** Invoked from scheduler() before switching away from a thread. */
void workq_after_thread_ran(void)
{
    ASSERT(THREAD);
    ASSERT(irq_spinlock_locked(&THREAD->lock));

    /* The worker's work func() is about to sleep/block. */
    if (THREAD->workq && THREAD->state == Sleeping && !THREAD->workq_idling) {
        ASSERT(!THREAD->workq_blocked);
        ASSERT(!workq_corrupted(THREAD->workq));

        THREAD->workq_blocked = true;

        irq_spinlock_lock(&THREAD->workq->lock, false);

        ++THREAD->workq->blocked_worker_cnt;

        bool can_block = false;
        signal_op_t op = signal_worker_logic(THREAD->workq, can_block);

        irq_spinlock_unlock(&THREAD->workq->lock, false);

        if (op) {
            ASSERT(add_worker_noblock_op == op || signal_worker_op == op);
            op(THREAD->workq);
        }
    }
}

/** Prints stats of the work queue to the kernel console. */
void workq_print_info(struct work_queue *workq)
{
    irq_spinlock_lock(&workq->lock, true);

    size_t total = workq->cur_worker_cnt;
    size_t blocked = workq->blocked_worker_cnt;
    size_t idle = workq->idle_worker_cnt;
    size_t active = active_workers(workq);
    size_t items = workq->item_cnt;
    bool stopping = workq->stopping;
    bool worker_surplus = worker_unnecessary(workq);
    const char *load_str = worker_surplus ? "decreasing" :
        (0 < workq->activate_pending) ? "increasing" : "stable";

    irq_spinlock_unlock(&workq->lock, true);

    printf(
        "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
        " max_concurrent_workers=%zu, max_items_per_worker=%zu\n"
        "Workers: %zu\n"
        "Active: %zu (workers currently processing work)\n"
        "Blocked: %zu (work functions sleeping/blocked)\n"
        "Idle: %zu (idle workers waiting for more work)\n"
        "Items: %zu (queued not yet dispatched work)\n"
        "Stopping: %d\n"
        "Load: %s\n",
        max_worker_cnt, min_worker_cnt,
        max_concurrent_workers, max_items_per_worker,
        total,
        active,
        blocked,
        idle,
        items,
        stopping,
        load_str
    );
}

/** Prints stats of the global work queue. */
void workq_global_print_info(void)
{
    workq_print_info(&g_work_queue);
}


static bool dequeue_add_req(nonblock_adder_t *info, struct work_queue **pworkq)
{
    bool stop = false;

    irq_spinlock_lock(&info->lock, true);

    while (list_empty(&info->work_queues) && !stop) {
        int ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
            &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);

        stop = (ret == ESYNCH_INTERRUPTED);
    }

    if (!stop) {
        *pworkq = list_get_instance(list_first(&info->work_queues),
            struct work_queue, nb_link);

        ASSERT(!workq_corrupted(*pworkq));

        list_remove(&(*pworkq)->nb_link);
    }

    irq_spinlock_unlock(&info->lock, true);

    return !stop;
}

static void thr_nonblock_add_worker(void *arg)
{
    nonblock_adder_t *info = arg;
    struct work_queue *workq;

    while (dequeue_add_req(info, &workq)) {
        add_worker(workq);
    }
}


static void nonblock_init(void)
{
    irq_spinlock_initialize(&nonblock_adder.lock, "kworkq-nb.lock");
    condvar_initialize(&nonblock_adder.req_cv);
    list_initialize(&nonblock_adder.work_queues);

    nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
        &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");

    if (nonblock_adder.thread) {
        thread_ready(nonblock_adder.thread);
    } else {
        /*
         * We won't be able to add workers without blocking if all workers
         * sleep, but at least boot the system.
         */
        printf("Failed to create kworkq-nb. Sleeping work may stall the workq.\n");
    }
}

/** Returns true if the workq is definitely corrupted; false if not sure.
 *
 * Can be used outside of any locks.
 */
static bool workq_corrupted(struct work_queue *workq)
{
#ifdef CONFIG_DEBUG
    /*
     * Needed to make the most current cookie value set by workq_preinit()
     * visible even if we access the workq right after it is created but
     * on a different cpu. Otherwise, workq_corrupted() would not work
     * outside a lock.
     */
    memory_barrier();
    return NULL == workq || workq->cookie != WORKQ_MAGIC;
#else
    return false;
#endif
}

/** Returns true if the work_item is definitely corrupted; false if not sure.
 *
 * Must be used with the work queue's protecting spinlock locked.
 */
static bool work_item_corrupted(work_t *work_item)
{
#ifdef CONFIG_DEBUG
    return NULL == work_item || work_item->cookie != WORK_ITEM_MAGIC;
#else
    return false;
#endif
}