Changeset a35b458 in mainline for kernel/generic/src/synch
- Timestamp: 2018-03-02T20:10:49Z (7 years ago)
- Branches: lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
- Children: f1380b7
- Parents: 3061bc1
- git-author: Jiří Zárevúcky <zarevucky.jiri@…> (2018-02-28 17:38:31)
- git-committer: Jiří Zárevúcky <zarevucky.jiri@…> (2018-03-02 20:10:49)
- Location: kernel/generic/src/synch
- Files: 8 edited
Each file below is shown in the changeset viewer as a diff between r3061bc1 (the parent revision) and ra35b458 (this changeset). In every visible hunk the old and new line numbers match and the highlighted lines are blank, so the edits appear to be whitespace-only (trailing whitespace stripped from otherwise-empty lines); no lines are added or removed.

- kernel/generic/src/synch/condvar.c (hunks at lines 143-147 and 168-189)
- kernel/generic/src/synch/futex.c (hunks between lines 157 and 504)
- kernel/generic/src/synch/mutex.c (hunk at lines 108-112)
- kernel/generic/src/synch/rcu.c (hunks between lines 26 and 1857)
- kernel/generic/src/synch/smp_memory_barrier.c (hunk at lines 56-60)
- kernel/generic/src/synch/spinlock.c (hunks between lines 77 and 325)
- kernel/generic/src/synch/waitq.c (hunks between lines 94 and 582)
- kernel/generic/src/synch/workqueue.c (hunks between lines 59 and 938)
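Assuming a local clone of the HelenOS mainline repository in which the abbreviated hashes above resolve uniquely, the whitespace-only nature of this changeset can be double-checked with git's standard whitespace-ignoring diff options; the revisions and path are taken from the metadata above, nothing else is assumed:

    # per-file change summary between the parent and this changeset
    git diff --stat 3061bc1 a35b458 -- kernel/generic/src/synch

    # same comparison with all whitespace differences ignored;
    # empty output confirms the changeset touched only whitespace here
    git diff -w 3061bc1 a35b458 -- kernel/generic/src/synch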