Changeset 1b20da0 in mainline for kernel/generic/src/synch/workqueue.c
- Timestamp: 2018-02-28T17:52:03Z (7 years ago)
- Branches: lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
- Children: 3061bc1
- Parents: df6ded8
- git-author: Jiří Zárevúcky <zarevucky.jiri@…> (2018-02-28 17:26:03)
- git-committer: Jiří Zárevúcky <zarevucky.jiri@…> (2018-02-28 17:52:03)
- File: 1 edited
kernel/generic/src/synch/workqueue.c
df6ded8 → 1b20da0

The change is whitespace-only: in every hunk below the removed and added versions of the modified lines appear to differ only in trailing whitespace, so each hunk is shown once, with the modified lines noted in its header.

Lines 54-59 (modified: 56-57):

struct work_queue {
	/*
	 * Protects everything except activate_worker.
	 * Must be acquired after any thread->locks.
	 */

Lines 93-97 (modified: 95):
	/* Magic cookie for integrity checks. Immutable. Accessed without lock. */
	uint32_t cookie;
#endif
};

Lines 132-136 (modified: 134):
static void interrupt_workers(struct work_queue *workq);
static void wait_for_workers(struct work_queue *workq);
static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
    work_func_t func, bool can_block);
static void init_work_item(work_t *work_item, work_func_t func);

Lines 150-155 (modified: 152-153):
void workq_global_worker_init(void)
{
	/*
	 * No need for additional synchronization. Stores to word-sized
	 * variables are atomic and the change will eventually propagate.
	 * Moreover add_worker() includes the necessary memory barriers

Lines 221-225 (modified: 223):
#ifdef CONFIG_DEBUG
	workq->cookie = 0;
#endif

	free(workq);

Lines 231-235 (modified: 233):
#ifdef CONFIG_DEBUG
	workq->cookie = WORKQ_MAGIC;
#endif

	irq_spinlock_initialize(&workq->lock, name);

Lines 252-257 (modified: 254-255):
}

/** Initializes a work queue. Returns true if successful.
 *
 * Before destroying a work queue it must be stopped via
 * workq_stop().

Lines 268-272 (modified: 270):
	assert(!workq_corrupted(workq));

	thread_t *thread = thread_create(worker_thread, workq, TASK,
	    THREAD_FLAG_NONE, workq->name);

Lines 297-301 (modified: 299):
	cpu_id = CPU->id;

	thread->workq = workq;
	thread->cpu = &cpus[cpu_id];
	thread->workq_blocked = false;

Lines 305-309 (modified: 307):
		list_append(&thread->workq_link, &workq->workers);
	} else {
		/*
		 * Work queue is shutting down - we must not add the worker
		 * and we cannot destroy it without ready-ing it. Mark it

Lines 330-336 (modified: 332-334):
}

/** Shuts down the work queue. Waits for all pending work items to complete.
 *
 * workq_stop() may only be run once.
 */
void workq_stop(struct work_queue *workq)

Lines 391-396 (modified: 393-394):
}

/** Queues a function into the global wait queue without blocking.
 *
 * See workq_enqueue_noblock() for more details.
 */

Lines 400-405 (modified: 402-403):
}

/** Queues a function into the global wait queue; may block.
 *
 * See workq_enqueue() for more details.
 */

Lines 409-421 (modified: 411-413, 415, 417, 419):
}

/** Adds a function to be invoked in a separate thread without blocking.
 *
 * workq_enqueue_noblock() is guaranteed not to block. It is safe
 * to invoke from interrupt handlers.
 *
 * Consider using workq_enqueue() instead if at all possible. Otherwise,
 * your work item may have to wait for previously enqueued sleeping
 * work items to complete if you are unlucky.
 *
 * @param workq Work queue where to queue the work item.
 * @param work_item Work item bookkeeping structure. Must be valid

Lines 423-431 (modified: 425-426, 429):
 * @param func User supplied function to invoke in a worker thread.

 * @return false if work queue is shutting down; function is not
 * queued for further processing.
 * @return true Otherwise. func() will be invoked in a separate thread.
 */
bool workq_enqueue_noblock(struct work_queue *workq, work_t *work_item,
    work_func_t func)
{

Lines 433-441 (modified: 435-437, 439):
}

/** Adds a function to be invoked in a separate thread; may block.
 *
 * While the workq_enqueue() is unlikely to block, it may do so if too
 * many previous work items blocked sleeping.
 *
 * @param workq Work queue where to queue the work item.
 * @param work_item Work item bookkeeping structure. Must be valid

Lines 443-448 (modified: 445-446):
 * @param func User supplied function to invoke in a worker thread.

 * @return false if work queue is shutting down; function is not
 * queued for further processing.
 * @return true Otherwise. func() will be invoked in a separate thread.
 */

Lines 453-459 (modified: 455-457):

/** Adds a work item that will be processed by a separate worker thread.
 *
 * func() will be invoked in another kernel thread and may block.
 *
 * Prefer to call _workq_enqueue() with can_block set. Otherwise
 * your work item may have to wait for sleeping work items to complete.

Lines 461-465 (modified: 463):
 * be create without can_block set because creating a thread might
 * block due to low memory conditions.
 *
 * @param workq Work queue where to queue the work item.
 * @param work_item Work item bookkeeping structure. Must be valid

Lines 468-476 (modified: 470-471, 474):
 * @param can_block May adding this work item block?

 * @return false if work queue is shutting down; function is not
 * queued for further processing.
 * @return true Otherwise.
 */
static int _workq_enqueue(struct work_queue *workq, work_t *work_item,
    work_func_t func, bool can_block)
{

Lines 493-498 (modified: 495-496):
		signal_op = signal_worker_logic(workq, can_block);
	} else {
		/*
		 * During boot there are no workers to signal. Just queue
		 * the work and let future workers take care of it.
		 */

Lines 514-518 (modified: 516):
#ifdef CONFIG_DEBUG
	work_item->cookie = WORK_ITEM_MAGIC;
#endif

	link_initialize(&work_item->queue_link);

Lines 537-544 (modified: 539-542):
	assert(workq->activate_pending <= workq->idle_worker_cnt);

	/*
	 * Workers actively running the work func() this very moment and
	 * are neither blocked nor idle. Exclude ->activate_pending workers
	 * since they will run their work func() once they get a time slice
	 * and are not running it right now.
	 */

Lines 546-552 (modified: 548-550):
}

/**
 * Returns the number of workers that are running or are about to run work
 * func() and that are not blocked.
 */
static size_t active_workers(struct work_queue *workq)

Lines 554-559 (modified: 556-557):
	assert(irq_spinlock_locked(&workq->lock));

	/*
	 * Workers actively running the work func() and are neither blocked nor
	 * idle. ->activate_pending workers will run their work func() once they
	 * get a time slice after waking from a condvar wait, so count them

Lines 586-594 (modified: 588, 590, 592):

/** Determines how to signal workers if at all.
 *
 * @param workq Work queue where a new work item was queued.
 * @param can_block True if we may block while signaling a worker or creating
 * a new worker.
 *
 * @return Function that will notify workers or NULL if no action is needed.
 */

Lines 601-607 (modified: 603-605):
	signal_op_t signal_op = NULL;

	/*
	 * Workers actively running the work func() and neither blocked nor idle.
	 * Including ->activate_pending workers that will run their work func()
	 * once they get a time slice.
	 */

Lines 613-624 (modified: 615, 620, 622):
	if (max_load < workq->item_cnt) {

		size_t remaining_idle =
		    workq->idle_worker_cnt - workq->activate_pending;

		/* Idle workers still exist - activate one. */
		if (remaining_idle > 0) {
			/*
			 * Directly changing idle_worker_cnt here would not allow
			 * workers to recognize spurious wake-ups. Change
			 * activate_pending instead.
			 */

Lines 633-637 (modified: 635):
	if (need_worker && can_block) {
		signal_op = add_worker_op;
		/*
		 * It may take some time to actually create the worker.
		 * We don't want to swamp the thread pool with superfluous

Lines 642-646 (modified: 644):
	}

	/*
	 * We cannot create a new worker but we need one desperately
	 * because all workers are blocked in their work functions.

Lines 661-666 (modified: 663-664):
		}
	} else {
		/*
		 * There are enough active/running workers to process the queue.
		 * No need to signal/activate any new workers.
		 */

Lines 674-680 (modified: 676-678):
static void worker_thread(void *arg)
{
	/*
	 * The thread has been created after the work queue was ordered to stop.
	 * Do not access the work queue and return immediately.
	 */
	if (thread_interrupted(THREAD)) {

Lines 760-764 (modified: 762):
		return (min_worker_cnt <= workq->idle_worker_cnt);
	} else {
		/*
		 * There is work but we are swamped with too many active workers
		 * that were woken up from sleep at around the same time. We

Lines 854-858 (modified: 856):
	bool stopping = workq->stopping;
	bool worker_surplus = worker_unnecessary(workq);
	const char *load_str = worker_surplus ? "decreasing" :
	    (0 < workq->activate_pending) ? "increasing" : "stable";

Lines 869-873 (modified: 871):
	    "Stopping: %d\n"
	    "Load: %s\n",
	    max_worker_cnt, min_worker_cnt,
	    max_concurrent_workers, max_items_per_worker,
	    total,

Lines 895-899 (modified: 897):

	while (list_empty(&info->work_queues) && !stop) {
		errno_t ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
		    &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);

Lines 902-906 (modified: 904):

	if (!stop) {
		*pworkq = list_get_instance(list_first(&info->work_queues),
		    struct work_queue, nb_link);

Lines 932-935 (modified: 934):
	list_initialize(&nonblock_adder.work_queues);

	nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
	    &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");

Lines 938-942 (modified: 940):
		thread_ready(nonblock_adder.thread);
	} else {
		/*
		 * We won't be able to add workers without blocking if all workers
		 * sleep, but at least boot the system.

Lines 947-957 (modified: 949-950, 955):

#ifdef CONFIG_DEBUG
/** Returns true if the workq is definitely corrupted; false if not sure.
 *
 * Can be used outside of any locks.
 */
static bool workq_corrupted(struct work_queue *workq)
{
	/*
	 * Needed to make the most current cookie value set by workq_preinit()
	 * visible even if we access the workq right after it is created but

Lines 963-968 (modified: 965-966):
}

/** Returns true if the work_item is definitely corrupted; false if not sure.
 *
 * Must be used with the work queue protecting spinlock locked.
 */
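
For orientation, the public entry points whose doc comments these hunks touch are used roughly as sketched below. This is only a sketch: the workq_init() signature, the work_func_t prototype (void (*)(work_t *)) and the <synch/workqueue.h> header path are assumptions not shown in the excerpt above, and all flush_* names are hypothetical.

#include <synch/workqueue.h>	/* assumed header for struct work_queue, work_t, workq_*() */

/* Deferred work; runs in a worker thread and may block (assumed prototype). */
static void flush_work(work_t *work_item)
{
	(void) work_item;
	/* ... possibly blocking work goes here ... */
}

static struct work_queue flush_workq;
static work_t flush_item;	/* must stay valid until flush_work() completes */

static void flush_example(void)
{
	/* Assumed signature; the excerpt only says it returns true on success. */
	if (!workq_init(&flush_workq, "kflush-example"))
		return;

	/* Ordinary thread context: prefer the variant that may block. */
	if (!workq_enqueue(&flush_workq, &flush_item, flush_work)) {
		/* Queue is shutting down; flush_work() will not be invoked. */
	}

	/*
	 * From an interrupt handler the non-blocking variant would be used
	 * instead: workq_enqueue_noblock(&flush_workq, &flush_item, flush_work);
	 */

	/* Stop the queue (at most once) before it may be destroyed. */
	workq_stop(&flush_workq);
}

The split mirrors the doc comments above: workq_enqueue() may block briefly, while workq_enqueue_noblock() never blocks and is therefore safe from interrupt handlers, at the cost of possibly waiting behind sleeping work items.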
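
Several hunks also touch the CONFIG_DEBUG-only magic-cookie checks (WORKQ_MAGIC, WORK_ITEM_MAGIC, workq_corrupted()). As a generic illustration of that integrity-check pattern, using hypothetical names (example_t, EXAMPLE_MAGIC) rather than the kernel's own:

#include <stdbool.h>
#include <stdint.h>

#define EXAMPLE_MAGIC	UINT32_C(0x6578616d)	/* hypothetical magic value */

typedef struct {
#ifdef CONFIG_DEBUG
	/* Magic cookie for integrity checks. Set once, then only read. */
	uint32_t cookie;
#endif
	int payload;
} example_t;

static void example_init(example_t *e)
{
#ifdef CONFIG_DEBUG
	e->cookie = EXAMPLE_MAGIC;
#endif
	e->payload = 0;
}

/* Returns true if the structure is definitely corrupted; false if not sure. */
static bool example_corrupted(const example_t *e)
{
#ifdef CONFIG_DEBUG
	return e->cookie != EXAMPLE_MAGIC;
#else
	(void) e;
	return false;
#endif
}

Clearing the cookie on destruction, as the hunk at lines 221-225 does with workq->cookie = 0 before free(workq), additionally makes use-after-free of the structure detectable.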