Changeset 8565a42 in mainline for kernel/generic/src/synch/workqueue.c


Ignore:
Timestamp:
2018-03-02T20:34:50Z (7 years ago)
Author:
GitHub <noreply@…>
Branches:
lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
Children:
a1a81f69, d5e5fd1
Parents:
3061bc1 (diff), 34e1206 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
git-author:
Jiří Zárevúcky <zarevucky.jiri@…> (2018-03-02 20:34:50)
git-committer:
GitHub <noreply@…> (2018-03-02 20:34:50)
Message:

Remove all trailing whitespace, everywhere.

See individual commit messages for details.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • kernel/generic/src/synch/workqueue.c

    r3061bc1 r8565a42  
    5959         */
    6060        IRQ_SPINLOCK_DECLARE(lock);
    61        
     61
    6262        /* Activates a worker if new work arrives or if shutting down the queue. */
    6363        condvar_t activate_worker;
    64        
     64
    6565        /* Queue of work_items ready to be dispatched. */
    6666        list_t queue;
    67        
     67
    6868        /* List of worker threads. */
    6969        list_t workers;
    70        
     70
    7171        /* Number of work items queued. */
    7272        size_t item_cnt;
    73        
     73
    7474        /* Indicates the work queue is shutting down. */
    7575        bool stopping;
     
    8484        /* Number of blocked workers sleeping in work func() (ie not idle). */
    8585        size_t blocked_worker_cnt;
    86        
     86
    8787        /* Number of pending signal_worker_op() operations. */
    8888        size_t pending_op_cnt;
    89        
     89
    9090        link_t nb_link;
    91        
     91
    9292#ifdef CONFIG_DEBUG
    9393        /* Magic cookie for integrity checks. Immutable. Accessed without lock. */
     
    105105/** Max number of work items per active worker before a new worker is activated.*/
    106106static const size_t max_items_per_worker = 8;
    107        
     107
    108108/** System wide work queue. */
    109109static struct work_queue g_work_queue;
     
    157157         */
    158158        booting = false;
    159        
     159
    160160        nonblock_init();
    161        
     161
    162162        if (!add_worker(&g_work_queue))
    163163                panic("Could not create a single global work queue worker!\n");
    164        
     164
    165165}
    166166
     
    174174        /* Maximum concurrency without slowing down the system. */
    175175        max_concurrent_workers = max(2, config.cpu_count);
    176        
     176
    177177        workq_preinit(&g_work_queue, "kworkq");
    178178}
     
    188188{
    189189        struct work_queue *workq = malloc(sizeof(struct work_queue), 0);
    190        
     190
    191191        if (workq) {
    192192                if (workq_init(workq, name)) {
     
    194194                        return workq;
    195195                }
    196                
     196
    197197                free(workq);
    198198        }
    199        
     199
    200200        return NULL;
    201201}
     
    205205{
    206206        assert(!workq_corrupted(workq));
    207        
     207
    208208        irq_spinlock_lock(&workq->lock, true);
    209209        bool stopped = workq->stopping;
     
    212212#endif
    213213        irq_spinlock_unlock(&workq->lock, true);
    214        
     214
    215215        if (!stopped) {
    216216                workq_stop(workq);
     
    218218                assert(0 == running_workers);
    219219        }
    220        
     220
    221221#ifdef CONFIG_DEBUG
    222222        workq->cookie = 0;
    223223#endif
    224        
     224
    225225        free(workq);
    226226}
     
    232232        workq->cookie = WORKQ_MAGIC;
    233233#endif
    234        
     234
    235235        irq_spinlock_initialize(&workq->lock, name);
    236236        condvar_initialize(&workq->activate_worker);
    237        
     237
    238238        list_initialize(&workq->queue);
    239239        list_initialize(&workq->workers);
    240        
     240
    241241        workq->item_cnt = 0;
    242242        workq->stopping = false;
    243243        workq->name = name;
    244        
     244
    245245        workq->cur_worker_cnt = 1;
    246246        workq->idle_worker_cnt = 0;
    247247        workq->activate_pending = 0;
    248248        workq->blocked_worker_cnt = 0;
    249        
     249
    250250        workq->pending_op_cnt = 0;
    251251        link_initialize(&workq->nb_link);
     
    270270        thread_t *thread = thread_create(worker_thread, workq, TASK,
    271271                THREAD_FLAG_NONE, workq->name);
    272        
     272
    273273        if (!thread) {
    274274                irq_spinlock_lock(&workq->lock, true);
    275                
     275
    276276                /* cur_worker_cnt proactively increased in signal_worker_logic() .*/
    277277                assert(0 < workq->cur_worker_cnt);
    278278                --workq->cur_worker_cnt;
    279                
     279
    280280                irq_spinlock_unlock(&workq->lock, true);
    281281                return false;
    282282        }
    283        
     283
    284284        /* Respect lock ordering. */
    285285        irq_spinlock_lock(&thread->lock, true);
     
    290290        if (!workq->stopping) {
    291291                success = true;
    292                
     292
    293293                /* Try to distribute workers among cpus right away. */
    294294                unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;
    295                
     295
    296296                if (!cpus[cpu_id].active)
    297297                        cpu_id = CPU->id;
     
    312312                 */
    313313                success = false;
    314                
     314
    315315                /* cur_worker_cnt proactively increased in signal_worker() .*/
    316316                assert(0 < workq->cur_worker_cnt);
    317317                --workq->cur_worker_cnt;
    318318        }
    319        
     319
    320320        irq_spinlock_unlock(&workq->lock, false);
    321321        irq_spinlock_unlock(&thread->lock, true);
     
    324324                thread_interrupt(thread);
    325325        }
    326                
     326
    327327        thread_ready(thread);
    328        
     328
    329329        return success;
    330330}
     
    337337{
    338338        assert(!workq_corrupted(workq));
    339        
     339
    340340        interrupt_workers(workq);
    341341        wait_for_workers(workq);
     
    350350        assert(!workq->stopping);
    351351        workq->stopping = true;
    352        
     352
    353353        /* Respect lock ordering - do not hold workq->lock during broadcast. */
    354354        irq_spinlock_unlock(&workq->lock, true);
    355        
     355
    356356        condvar_broadcast(&workq->activate_worker);
    357357}
     
    361361{
    362362        assert(!PREEMPTION_DISABLED);
    363        
     363
    364364        irq_spinlock_lock(&workq->lock, true);
    365        
     365
    366366        list_foreach_safe(workq->workers, cur_worker, next_worker) {
    367367                thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
     
    370370                /* Wait without the lock. */
    371371                irq_spinlock_unlock(&workq->lock, true);
    372                
     372
    373373                thread_join(worker);
    374374                thread_detach(worker);
    375                
     375
    376376                irq_spinlock_lock(&workq->lock, true);
    377377        }
    378        
     378
    379379        assert(list_empty(&workq->workers));
    380        
     380
    381381        /* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
    382382        while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
    383383                irq_spinlock_unlock(&workq->lock, true);
    384                
     384
    385385                scheduler();
    386                
     386
    387387                irq_spinlock_lock(&workq->lock, true);
    388388        }
    389        
     389
    390390        irq_spinlock_unlock(&workq->lock, true);
    391391}
     
    422422 *                  until func() is entered.
    423423 * @param func      User supplied function to invoke in a worker thread.
    424  
     424
    425425 * @return false if work queue is shutting down; function is not
    426426 *               queued for further processing.
     
    442442 *                  until func() is entered.
    443443 * @param func      User supplied function to invoke in a worker thread.
    444  
     444
    445445 * @return false if work queue is shutting down; function is not
    446446 *               queued for further processing.
     
    467467 * @param func      User supplied function to invoke in a worker thread.
    468468 * @param can_block May adding this work item block?
    469  
     469
    470470 * @return false if work queue is shutting down; function is not
    471471 *               queued for further processing.
     
    476476{
    477477        assert(!workq_corrupted(workq));
    478        
     478
    479479        bool success = true;
    480480        signal_op_t signal_op = NULL;
    481        
     481
    482482        irq_spinlock_lock(&workq->lock, true);
    483        
     483
    484484        if (workq->stopping) {
    485485                success = false;
     
    489489                ++workq->item_cnt;
    490490                success = true;
    491                
     491
    492492                if (!booting) {
    493493                        signal_op = signal_worker_logic(workq, can_block);
     
    499499                }
    500500        }
    501        
     501
    502502        irq_spinlock_unlock(&workq->lock, true);
    503503
     
    505505                signal_op(workq);
    506506        }
    507        
     507
    508508        return success;
    509509}
     
    515515        work_item->cookie = WORK_ITEM_MAGIC;
    516516#endif
    517        
     517
    518518        link_initialize(&work_item->queue_link);
    519519        work_item->func = func;
     
    524524{
    525525        assert(irq_spinlock_locked(&workq->lock));
    526        
     526
    527527        /* Workers blocked are sleeping in the work function (ie not idle). */
    528528        assert(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
    529529        /* Idle workers are waiting for more work to arrive in condvar_wait. */
    530530        assert(workq->idle_worker_cnt <= workq->cur_worker_cnt);
    531        
     531
    532532        /* Idle + blocked workers == sleeping worker threads. */
    533533        size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;
    534        
     534
    535535        assert(sleeping_workers <= workq->cur_worker_cnt);
    536536        /* Workers pending activation are idle workers not yet given a time slice. */
    537537        assert(workq->activate_pending <= workq->idle_worker_cnt);
    538        
     538
    539539        /*
    540540         * Workers actively running the work func() this very moment and
     
    553553{
    554554        assert(irq_spinlock_locked(&workq->lock));
    555        
     555
    556556        /*
    557557         * Workers actively running the work func() and are neither blocked nor
     
    578578
    579579        condvar_signal(&workq->activate_worker);
    580        
     580
    581581        irq_spinlock_lock(&workq->lock, true);
    582582        assert(0 < workq->pending_op_cnt);
     
    597597        assert(!workq_corrupted(workq));
    598598        assert(irq_spinlock_locked(&workq->lock));
    599        
     599
    600600        /* Only signal workers if really necessary. */
    601601        signal_op_t signal_op = NULL;
     
    630630                        bool need_worker = (active < max_concurrent_workers)
    631631                                && (workq->cur_worker_cnt < max_worker_cnt);
    632                        
     632
    633633                        if (need_worker && can_block) {
    634634                                signal_op = add_worker_op;
     
    641641                                ++workq->cur_worker_cnt;
    642642                        }
    643                        
     643
    644644                        /*
    645645                         * We cannot create a new worker but we need one desperately
     
    648648                        if (need_worker && !can_block && 0 == active) {
    649649                                assert(0 == workq->idle_worker_cnt);
    650                                
     650
    651651                                irq_spinlock_lock(&nonblock_adder.lock, true);
    652652
     
    667667                signal_op = NULL;
    668668        }
    669        
     669
    670670        return signal_op;
    671671}
     
    682682                return;
    683683        }
    684        
     684
    685685        assert(arg != NULL);
    686        
     686
    687687        struct work_queue *workq = arg;
    688688        work_t *work_item;
    689        
     689
    690690        while (dequeue_work(workq, &work_item)) {
    691691                /* Copy the func field so func() can safely free work_item. */
     
    700700{
    701701        assert(!workq_corrupted(workq));
    702        
     702
    703703        irq_spinlock_lock(&workq->lock, true);
    704        
     704
    705705        /* Check if we should exit if load is low. */
    706706        if (!workq->stopping && worker_unnecessary(workq)) {
     
    710710                list_remove(&THREAD->workq_link);
    711711                irq_spinlock_unlock(&workq->lock, true);
    712                
     712
    713713                thread_detach(THREAD);
    714714                return false;
    715715        }
    716        
     716
    717717        bool stop = false;
    718        
     718
    719719        /* Wait for work to arrive. */
    720720        while (list_empty(&workq->queue) && !workq->stopping) {
    721721                cv_wait(workq);
    722                
     722
    723723                if (0 < workq->activate_pending)
    724724                        --workq->activate_pending;
     
    729729                link_t *work_link = list_first(&workq->queue);
    730730                *pwork_item = list_get_instance(work_link, work_t, queue_link);
    731                
     731
    732732#ifdef CONFIG_DEBUG
    733733                assert(!work_item_corrupted(*pwork_item));
     
    736736                list_remove(work_link);
    737737                --workq->item_cnt;
    738                
     738
    739739                stop = false;
    740740        } else {
     
    744744                stop = true;
    745745        }
    746        
     746
    747747        irq_spinlock_unlock(&workq->lock, true);
    748        
     748
    749749        return !stop;
    750750}
     
    754754{
    755755        assert(irq_spinlock_locked(&workq->lock));
    756        
     756
    757757        /* No work is pending. We don't need too many idle threads. */
    758758        if (list_empty(&workq->queue)) {
     
    775775        ++workq->idle_worker_cnt;
    776776        THREAD->workq_idling = true;
    777        
     777
    778778        /* Ignore lock ordering just here. */
    779779        assert(irq_spinlock_locked(&workq->lock));
    780        
     780
    781781        _condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
    782782                &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);
     
    784784        assert(!workq_corrupted(workq));
    785785        assert(irq_spinlock_locked(&workq->lock));
    786        
     786
    787787        THREAD->workq_idling = false;
    788788        --workq->idle_worker_cnt;
     
    803803                assert(THREAD != thread);
    804804                assert(!workq_corrupted(thread->workq));
    805                
     805
    806806                /* Protected by thread->lock */
    807807                thread->workq_blocked = false;
    808                
     808
    809809                irq_spinlock_lock(&thread->workq->lock, true);
    810810                --thread->workq->blocked_worker_cnt;
     
    823823                assert(!THREAD->workq_blocked);
    824824                assert(!workq_corrupted(THREAD->workq));
    825                
     825
    826826                THREAD->workq_blocked = true;
    827                
     827
    828828                irq_spinlock_lock(&THREAD->workq->lock, false);
    829829
    830830                ++THREAD->workq->blocked_worker_cnt;
    831                
     831
    832832                bool can_block = false;
    833833                signal_op_t op = signal_worker_logic(THREAD->workq, can_block);
    834                
     834
    835835                irq_spinlock_unlock(&THREAD->workq->lock, false);
    836                
     836
    837837                if (op) {
    838838                        assert(add_worker_noblock_op == op || signal_worker_op == op);
     
    856856        const char *load_str = worker_surplus ? "decreasing" :
    857857                (0 < workq->activate_pending) ? "increasing" : "stable";
    858        
     858
    859859        irq_spinlock_unlock(&workq->lock, true);
    860        
     860
    861861        printf(
    862862                "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
     
    893893
    894894        irq_spinlock_lock(&info->lock, true);
    895        
     895
    896896        while (list_empty(&info->work_queues) && !stop) {
    897897                errno_t ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
    898898                        &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
    899                
     899
    900900                stop = (ret == EINTR);
    901901        }
    902        
     902
    903903        if (!stop) {
    904904                *pworkq = list_get_instance(list_first(&info->work_queues),
     
    906906
    907907                assert(!workq_corrupted(*pworkq));
    908                
     908
    909909                list_remove(&(*pworkq)->nb_link);
    910910        }
    911        
     911
    912912        irq_spinlock_unlock(&info->lock, true);
    913        
     913
    914914        return !stop;
    915915}
     
    919919        nonblock_adder_t *info = arg;
    920920        struct work_queue *workq;
    921        
     921
    922922        while (dequeue_add_req(info, &workq)) {
    923923                add_worker(workq);
     
    931931        condvar_initialize(&nonblock_adder.req_cv);
    932932        list_initialize(&nonblock_adder.work_queues);
    933        
     933
    934934        nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
    935935                &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");
    936        
     936
    937937        if (nonblock_adder.thread) {
    938938                thread_ready(nonblock_adder.thread);
Note: See TracChangeset for help on using the changeset viewer.