Changeset a35b458 in mainline for kernel/generic/src/synch/workqueue.c
- Timestamp: 2018-03-02T20:10:49Z
- Branches: lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
- Children: f1380b7
- Parents: 3061bc1
- git-author: Jiří Zárevúcky <zarevucky.jiri@…> (2018-02-28 17:38:31)
- git-committer: Jiří Zárevúcky <zarevucky.jiri@…> (2018-03-02 20:10:49)
- File: 1 edited
kernel/generic/src/synch/workqueue.c
Diff from r3061bc1 to ra35b458 ("…" marks lines omitted between hunks):

     */
    IRQ_SPINLOCK_DECLARE(lock);

    /* Activates a worker if new work arrives or if shutting down the queue. */
    condvar_t activate_worker;

    /* Queue of work_items ready to be dispatched. */
    list_t queue;

    /* List of worker threads. */
    list_t workers;

    /* Number of work items queued. */
    size_t item_cnt;

    /* Indicates the work queue is shutting down. */
    bool stopping;
…
    /* Number of blocked workers sleeping in work func() (ie not idle). */
    size_t blocked_worker_cnt;

    /* Number of pending signal_worker_op() operations. */
    size_t pending_op_cnt;

    link_t nb_link;

#ifdef CONFIG_DEBUG
    /* Magic cookie for integrity checks. Immutable. Accessed without lock. */
…
/** Max number of work items per active worker before a new worker is activated.*/
static const size_t max_items_per_worker = 8;

/** System wide work queue. */
static struct work_queue g_work_queue;
…
     */
    booting = false;

    nonblock_init();

    if (!add_worker(&g_work_queue))
        panic("Could not create a single global work queue worker!\n");

}

…
    /* Maximum concurrency without slowing down the system. */
    max_concurrent_workers = max(2, config.cpu_count);

    workq_preinit(&g_work_queue, "kworkq");
}
…
{
    struct work_queue *workq = malloc(sizeof(struct work_queue), 0);

    if (workq) {
        if (workq_init(workq, name)) {
…
            return workq;
        }

        free(workq);
    }

    return NULL;
}
…
{
    assert(!workq_corrupted(workq));

    irq_spinlock_lock(&workq->lock, true);
    bool stopped = workq->stopping;
…
#endif
    irq_spinlock_unlock(&workq->lock, true);

    if (!stopped) {
        workq_stop(workq);
…
        assert(0 == running_workers);
    }

#ifdef CONFIG_DEBUG
    workq->cookie = 0;
#endif

    free(workq);
}
…
    workq->cookie = WORKQ_MAGIC;
#endif

    irq_spinlock_initialize(&workq->lock, name);
    condvar_initialize(&workq->activate_worker);

    list_initialize(&workq->queue);
    list_initialize(&workq->workers);

    workq->item_cnt = 0;
    workq->stopping = false;
    workq->name = name;

    workq->cur_worker_cnt = 1;
    workq->idle_worker_cnt = 0;
    workq->activate_pending = 0;
    workq->blocked_worker_cnt = 0;

    workq->pending_op_cnt = 0;
    link_initialize(&workq->nb_link);
…
    thread_t *thread = thread_create(worker_thread, workq, TASK,
        THREAD_FLAG_NONE, workq->name);

    if (!thread) {
        irq_spinlock_lock(&workq->lock, true);

        /* cur_worker_cnt proactively increased in signal_worker_logic() .*/
        assert(0 < workq->cur_worker_cnt);
        --workq->cur_worker_cnt;

        irq_spinlock_unlock(&workq->lock, true);
        return false;
    }

    /* Respect lock ordering. */
    irq_spinlock_lock(&thread->lock, true);
…
    if (!workq->stopping) {
        success = true;

        /* Try to distribute workers among cpus right away. */
        unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;

        if (!cpus[cpu_id].active)
            cpu_id = CPU->id;
…
         */
        success = false;

        /* cur_worker_cnt proactively increased in signal_worker() .*/
        assert(0 < workq->cur_worker_cnt);
        --workq->cur_worker_cnt;
    }

    irq_spinlock_unlock(&workq->lock, false);
    irq_spinlock_unlock(&thread->lock, true);
…
        thread_interrupt(thread);
    }

    thread_ready(thread);

    return success;
}
…
{
    assert(!workq_corrupted(workq));

    interrupt_workers(workq);
    wait_for_workers(workq);
…
    assert(!workq->stopping);
    workq->stopping = true;

    /* Respect lock ordering - do not hold workq->lock during broadcast. */
    irq_spinlock_unlock(&workq->lock, true);

    condvar_broadcast(&workq->activate_worker);
}
…
{
    assert(!PREEMPTION_DISABLED);

    irq_spinlock_lock(&workq->lock, true);

    list_foreach_safe(workq->workers, cur_worker, next_worker) {
        thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
…
        /* Wait without the lock. */
        irq_spinlock_unlock(&workq->lock, true);

        thread_join(worker);
        thread_detach(worker);

        irq_spinlock_lock(&workq->lock, true);
    }

    assert(list_empty(&workq->workers));

    /* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
    while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
        irq_spinlock_unlock(&workq->lock, true);

        scheduler();

        irq_spinlock_lock(&workq->lock, true);
    }

    irq_spinlock_unlock(&workq->lock, true);
}
…
 * until func() is entered.
 * @param func User supplied function to invoke in a worker thread.
 *
 * @return false if work queue is shutting down; function is not
 *               queued for further processing.
…
 * until func() is entered.
 * @param func User supplied function to invoke in a worker thread.
 *
 * @return false if work queue is shutting down; function is not
 *               queued for further processing.
…
 * @param func User supplied function to invoke in a worker thread.
 * @param can_block May adding this work item block?
 *
 * @return false if work queue is shutting down; function is not
 *               queued for further processing.
…
{
    assert(!workq_corrupted(workq));

    bool success = true;
    signal_op_t signal_op = NULL;

    irq_spinlock_lock(&workq->lock, true);

    if (workq->stopping) {
        success = false;
…
        ++workq->item_cnt;
        success = true;

        if (!booting) {
            signal_op = signal_worker_logic(workq, can_block);
…
        }
    }

    irq_spinlock_unlock(&workq->lock, true);

…
        signal_op(workq);
    }

    return success;
}
…
    work_item->cookie = WORK_ITEM_MAGIC;
#endif

    link_initialize(&work_item->queue_link);
    work_item->func = func;
…
{
    assert(irq_spinlock_locked(&workq->lock));

    /* Workers blocked are sleeping in the work function (ie not idle). */
    assert(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
    /* Idle workers are waiting for more work to arrive in condvar_wait. */
    assert(workq->idle_worker_cnt <= workq->cur_worker_cnt);

    /* Idle + blocked workers == sleeping worker threads. */
    size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;

    assert(sleeping_workers <= workq->cur_worker_cnt);
    /* Workers pending activation are idle workers not yet given a time slice. */
    assert(workq->activate_pending <= workq->idle_worker_cnt);

    /*
     * Workers actively running the work func() this very moment and
…
{
    assert(irq_spinlock_locked(&workq->lock));

    /*
     * Workers actively running the work func() and are neither blocked nor
…

    condvar_signal(&workq->activate_worker);

    irq_spinlock_lock(&workq->lock, true);
    assert(0 < workq->pending_op_cnt);
…
    assert(!workq_corrupted(workq));
    assert(irq_spinlock_locked(&workq->lock));

    /* Only signal workers if really necessary. */
    signal_op_t signal_op = NULL;
…
        bool need_worker = (active < max_concurrent_workers)
            && (workq->cur_worker_cnt < max_worker_cnt);

        if (need_worker && can_block) {
            signal_op = add_worker_op;
…
            ++workq->cur_worker_cnt;
        }

        /*
         * We cannot create a new worker but we need one desperately
…
        if (need_worker && !can_block && 0 == active) {
            assert(0 == workq->idle_worker_cnt);

            irq_spinlock_lock(&nonblock_adder.lock, true);
…
        signal_op = NULL;
    }

    return signal_op;
}
…
        return;
    }

    assert(arg != NULL);

    struct work_queue *workq = arg;
    work_t *work_item;

    while (dequeue_work(workq, &work_item)) {
        /* Copy the func field so func() can safely free work_item. */
…
{
    assert(!workq_corrupted(workq));

    irq_spinlock_lock(&workq->lock, true);

    /* Check if we should exit if load is low. */
    if (!workq->stopping && worker_unnecessary(workq)) {
…
        list_remove(&THREAD->workq_link);
        irq_spinlock_unlock(&workq->lock, true);

        thread_detach(THREAD);
        return false;
    }

    bool stop = false;

    /* Wait for work to arrive. */
    while (list_empty(&workq->queue) && !workq->stopping) {
        cv_wait(workq);

        if (0 < workq->activate_pending)
            --workq->activate_pending;
…
        link_t *work_link = list_first(&workq->queue);
        *pwork_item = list_get_instance(work_link, work_t, queue_link);

#ifdef CONFIG_DEBUG
        assert(!work_item_corrupted(*pwork_item));
…
        list_remove(work_link);
        --workq->item_cnt;

        stop = false;
    } else {
…
        stop = true;
    }

    irq_spinlock_unlock(&workq->lock, true);

    return !stop;
}
…
{
    assert(irq_spinlock_locked(&workq->lock));

    /* No work is pending. We don't need too many idle threads. */
    if (list_empty(&workq->queue)) {
…
    ++workq->idle_worker_cnt;
    THREAD->workq_idling = true;

    /* Ignore lock ordering just here. */
    assert(irq_spinlock_locked(&workq->lock));

    _condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
        &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);
…
    assert(!workq_corrupted(workq));
    assert(irq_spinlock_locked(&workq->lock));

    THREAD->workq_idling = false;
    --workq->idle_worker_cnt;
…
    assert(THREAD != thread);
    assert(!workq_corrupted(thread->workq));

    /* Protected by thread->lock */
    thread->workq_blocked = false;

    irq_spinlock_lock(&thread->workq->lock, true);
    --thread->workq->blocked_worker_cnt;
…
    assert(!THREAD->workq_blocked);
    assert(!workq_corrupted(THREAD->workq));

    THREAD->workq_blocked = true;

    irq_spinlock_lock(&THREAD->workq->lock, false);

    ++THREAD->workq->blocked_worker_cnt;

    bool can_block = false;
    signal_op_t op = signal_worker_logic(THREAD->workq, can_block);

    irq_spinlock_unlock(&THREAD->workq->lock, false);

    if (op) {
        assert(add_worker_noblock_op == op || signal_worker_op == op);
…
    const char *load_str = worker_surplus ? "decreasing" :
        (0 < workq->activate_pending) ? "increasing" : "stable";

    irq_spinlock_unlock(&workq->lock, true);

    printf(
        "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
…

    irq_spinlock_lock(&info->lock, true);

    while (list_empty(&info->work_queues) && !stop) {
        errno_t ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
            &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);

        stop = (ret == EINTR);
    }

    if (!stop) {
        *pworkq = list_get_instance(list_first(&info->work_queues),
…

        assert(!workq_corrupted(*pworkq));

        list_remove(&(*pworkq)->nb_link);
    }

    irq_spinlock_unlock(&info->lock, true);

    return !stop;
}
…
    nonblock_adder_t *info = arg;
    struct work_queue *workq;

    while (dequeue_add_req(info, &workq)) {
        add_worker(workq);
…
    condvar_initialize(&nonblock_adder.req_cv);
    list_initialize(&nonblock_adder.work_queues);

    nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
        &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");

    if (nonblock_adder.thread) {
        thread_ready(nonblock_adder.thread);
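For orientation, below is a minimal, hypothetical usage sketch of the work queue this file implements. It is not part of the changeset: workq_create(), workq_stop(), workq_destroy() and the work item's func field appear in the diff above, while the workq_enqueue() wrapper, the work_func_t signature, the header paths and the "egworkq" name are assumptions taken from the surrounding kernel sources and should be checked against synch/workqueue.h before use.

#include <synch/workqueue.h>   /* assumed header for struct work_queue, work_t, workq_*() */
#include <mm/slab.h>           /* assumed declaration of kernel malloc()/free() */

/* Work function; runs later in a worker thread and may free its own item. */
static void example_cleanup(work_t *work_item)
{
    free(work_item);
}

static void example_usage(void)
{
    /* A dedicated queue; the file also keeps a system-wide one ("kworkq"). */
    struct work_queue *workq = workq_create("egworkq");
    if (!workq)
        return;

    work_t *work_item = malloc(sizeof(work_t), 0);
    if (work_item) {
        /* Enqueue returns false if the queue is already shutting down. */
        if (!workq_enqueue(workq, work_item, example_cleanup))
            free(work_item);
    }

    /* Stops the queue first if it is still running, then frees it. */
    workq_destroy(workq);
}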