Changeset a35b458 in mainline for kernel/generic/src/synch/workqueue.c


Timestamp: 2018-03-02T20:10:49Z
Author: Jiří Zárevúcky <zarevucky.jiri@…>
Branches: lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
Children: f1380b7
Parents: 3061bc1
git-author: Jiří Zárevúcky <zarevucky.jiri@…> (2018-02-28 17:38:31)
git-committer: Jiří Zárevúcky <zarevucky.jiri@…> (2018-03-02 20:10:49)
Message:

style: Remove trailing whitespace on _all_ lines, including empty ones, for particular file types.

Command used: tools/srepl '\s\+$' '' -- *.c *.h *.py *.sh *.s *.S *.ag

Currently, whitespace on empty lines is very inconsistent.
There are two basic choices: either remove the whitespace, or keep empty lines
indented to the level of the surrounding code. The former is AFAICT more common,
and also much easier to do automatically.
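For illustration, a rough equivalent of the srepl invocation above, using only
standard tools (a sketch: it assumes GNU sed and skips whatever extra
safeguards tools/srepl may add):

    # Strip trailing spaces and tabs, in place, from the same file types
    # as in the commit. GNU sed is assumed for -i and \+.
    find . -type f \( -name '*.c' -o -name '*.h' -o -name '*.py' \
        -o -name '*.sh' -o -name '*.s' -o -name '*.S' -o -name '*.ag' \) \
        -exec sed -i 's/[[:space:]]\+$//' {} +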

Alternatively, we could write a script for automatic indentation and use that
instead. However, if such a script exists, it's possible to use the indented
style locally, by having the editor apply the relevant conversions on load/save,
without affecting the remote repository. IMO, it makes more sense to adopt
the simpler rule.
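As a sketch of that "convert locally" alternative: git's clean/smudge filters
can apply such conversions per checkout without touching the remote repository.
The filter name and the indent-empty-lines script below are hypothetical:

    # Strip trailing whitespace when staging; re-indent empty lines on
    # checkout. "indent-empty-lines" stands in for the automatic
    # indentation script discussed above and does not exist in the tree.
    git config filter.wspace.clean "sed 's/[[:space:]]*\$//'"
    git config filter.wspace.smudge indent-empty-lines
    printf '%s\n' '*.c filter=wspace' '*.h filter=wspace' \
        >> .git/info/attributes    # local-only, never committed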

File: 1 edited

Legend:

    nnn nnn    unmodified line (old and new line number)
    nnn     -  removed line (contained only trailing whitespace)
        nnn +  added line (same line, now empty)
    …          unmodified lines omitted
  • kernel/generic/src/synch/workqueue.c

r3061bc1 → ra35b458

 59  59          */
 60  60         IRQ_SPINLOCK_DECLARE(lock);
 61      -
     61  +
 62  62         /* Activates a worker if new work arrives or if shutting down the queue. */
 63  63         condvar_t activate_worker;
 64      -
     64  +
 65  65         /* Queue of work_items ready to be dispatched. */
 66  66         list_t queue;
 67      -
     67  +
 68  68         /* List of worker threads. */
 69  69         list_t workers;
 70      -
     70  +
 71  71         /* Number of work items queued. */
 72  72         size_t item_cnt;
 73      -
     73  +
 74  74         /* Indicates the work queue is shutting down. */
 75  75         bool stopping;
  …
 84  84         /* Number of blocked workers sleeping in work func() (ie not idle). */
 85  85         size_t blocked_worker_cnt;
 86      -
     86  +
 87  87         /* Number of pending signal_worker_op() operations. */
 88  88         size_t pending_op_cnt;
 89      -
     89  +
 90  90         link_t nb_link;
 91      -
     91  +
 92  92 #ifdef CONFIG_DEBUG
 93  93         /* Magic cookie for integrity checks. Immutable. Accessed without lock. */
  …
105 105 /** Max number of work items per active worker before a new worker is activated.*/
106 106 static const size_t max_items_per_worker = 8;
107     -
    107 +
108 108 /** System wide work queue. */
109 109 static struct work_queue g_work_queue;
  …
157 157          */
158 158         booting = false;
159     -
    159 +
160 160         nonblock_init();
161     -
    161 +
162 162         if (!add_worker(&g_work_queue))
163 163                 panic("Could not create a single global work queue worker!\n");
164     -
    164 +
165 165 }
166 166
  …
174 174         /* Maximum concurrency without slowing down the system. */
175 175         max_concurrent_workers = max(2, config.cpu_count);
176     -
    176 +
177 177         workq_preinit(&g_work_queue, "kworkq");
178 178 }
  …
188 188 {
189 189         struct work_queue *workq = malloc(sizeof(struct work_queue), 0);
190     -
    190 +
191 191         if (workq) {
192 192                 if (workq_init(workq, name)) {
  …
194 194                         return workq;
195 195                 }
196     -
    196 +
197 197                 free(workq);
198 198         }
199     -
    199 +
200 200         return NULL;
201 201 }
  …
205 205 {
206 206         assert(!workq_corrupted(workq));
207     -
    207 +
208 208         irq_spinlock_lock(&workq->lock, true);
209 209         bool stopped = workq->stopping;
  …
212 212 #endif
213 213         irq_spinlock_unlock(&workq->lock, true);
214     -
    214 +
215 215         if (!stopped) {
216 216                 workq_stop(workq);
  …
218 218                 assert(0 == running_workers);
219 219         }
220     -
    220 +
221 221 #ifdef CONFIG_DEBUG
222 222         workq->cookie = 0;
223 223 #endif
224     -
    224 +
225 225         free(workq);
226 226 }
  …
232 232         workq->cookie = WORKQ_MAGIC;
233 233 #endif
234     -
    234 +
235 235         irq_spinlock_initialize(&workq->lock, name);
236 236         condvar_initialize(&workq->activate_worker);
237     -
    237 +
238 238         list_initialize(&workq->queue);
239 239         list_initialize(&workq->workers);
240     -
    240 +
241 241         workq->item_cnt = 0;
242 242         workq->stopping = false;
243 243         workq->name = name;
244     -
    244 +
245 245         workq->cur_worker_cnt = 1;
246 246         workq->idle_worker_cnt = 0;
247 247         workq->activate_pending = 0;
248 248         workq->blocked_worker_cnt = 0;
249     -
    249 +
250 250         workq->pending_op_cnt = 0;
251 251         link_initialize(&workq->nb_link);
  …
270 270         thread_t *thread = thread_create(worker_thread, workq, TASK,
271 271                 THREAD_FLAG_NONE, workq->name);
272     -
    272 +
273 273         if (!thread) {
274 274                 irq_spinlock_lock(&workq->lock, true);
275     -
    275 +
276 276                 /* cur_worker_cnt proactively increased in signal_worker_logic() .*/
277 277                 assert(0 < workq->cur_worker_cnt);
278 278                 --workq->cur_worker_cnt;
279     -
    279 +
280 280                 irq_spinlock_unlock(&workq->lock, true);
281 281                 return false;
282 282         }
283     -
    283 +
284 284         /* Respect lock ordering. */
285 285         irq_spinlock_lock(&thread->lock, true);
  …
290 290         if (!workq->stopping) {
291 291                 success = true;
292     -
    292 +
293 293                 /* Try to distribute workers among cpus right away. */
294 294                 unsigned int cpu_id = (workq->cur_worker_cnt) % config.cpu_active;
295     -
    295 +
296 296                 if (!cpus[cpu_id].active)
297 297                         cpu_id = CPU->id;
  …
312 312                  */
313 313                 success = false;
314     -
    314 +
315 315                 /* cur_worker_cnt proactively increased in signal_worker() .*/
316 316                 assert(0 < workq->cur_worker_cnt);
317 317                 --workq->cur_worker_cnt;
318 318         }
319     -
    319 +
320 320         irq_spinlock_unlock(&workq->lock, false);
321 321         irq_spinlock_unlock(&thread->lock, true);
  …
324 324                 thread_interrupt(thread);
325 325         }
326     -
    326 +
327 327         thread_ready(thread);
328     -
    328 +
329 329         return success;
330 330 }
  …
337 337 {
338 338         assert(!workq_corrupted(workq));
339     -
    339 +
340 340         interrupt_workers(workq);
341 341         wait_for_workers(workq);
  …
350 350         assert(!workq->stopping);
351 351         workq->stopping = true;
352     -
    352 +
353 353         /* Respect lock ordering - do not hold workq->lock during broadcast. */
354 354         irq_spinlock_unlock(&workq->lock, true);
355     -
    355 +
356 356         condvar_broadcast(&workq->activate_worker);
357 357 }
  …
361 361 {
362 362         assert(!PREEMPTION_DISABLED);
363     -
    363 +
364 364         irq_spinlock_lock(&workq->lock, true);
365     -
    365 +
366 366         list_foreach_safe(workq->workers, cur_worker, next_worker) {
367 367                 thread_t *worker = list_get_instance(cur_worker, thread_t, workq_link);
  …
370 370                 /* Wait without the lock. */
371 371                 irq_spinlock_unlock(&workq->lock, true);
372     -
    372 +
373 373                 thread_join(worker);
374 374                 thread_detach(worker);
375     -
    375 +
376 376                 irq_spinlock_lock(&workq->lock, true);
377 377         }
378     -
    378 +
379 379         assert(list_empty(&workq->workers));
380     -
    380 +
381 381         /* Wait for deferred add_worker_op(), signal_worker_op() to finish. */
382 382         while (0 < workq->cur_worker_cnt || 0 < workq->pending_op_cnt) {
383 383                 irq_spinlock_unlock(&workq->lock, true);
384     -
    384 +
385 385                 scheduler();
386     -
    386 +
387 387                 irq_spinlock_lock(&workq->lock, true);
388 388         }
389     -
    389 +
390 390         irq_spinlock_unlock(&workq->lock, true);
391 391 }
  …
422 422  *                  until func() is entered.
423 423  * @param func      User supplied function to invoke in a worker thread.
424     -
    424 +
425 425  * @return false if work queue is shutting down; function is not
426 426  *               queued for further processing.
  …
442 442  *                  until func() is entered.
443 443  * @param func      User supplied function to invoke in a worker thread.
444     -
    444 +
445 445  * @return false if work queue is shutting down; function is not
446 446  *               queued for further processing.
  …
467 467  * @param func      User supplied function to invoke in a worker thread.
468 468  * @param can_block May adding this work item block?
469     -
    469 +
470 470  * @return false if work queue is shutting down; function is not
471 471  *               queued for further processing.
  …
476 476 {
477 477         assert(!workq_corrupted(workq));
478     -
    478 +
479 479         bool success = true;
480 480         signal_op_t signal_op = NULL;
481     -
    481 +
482 482         irq_spinlock_lock(&workq->lock, true);
483     -
    483 +
484 484         if (workq->stopping) {
485 485                 success = false;
  …
489 489                 ++workq->item_cnt;
490 490                 success = true;
491     -
    491 +
492 492                 if (!booting) {
493 493                         signal_op = signal_worker_logic(workq, can_block);
  …
499 499                 }
500 500         }
501     -
    501 +
502 502         irq_spinlock_unlock(&workq->lock, true);
503 503
  …
505 505                 signal_op(workq);
506 506         }
507     -
    507 +
508 508         return success;
509 509 }
  …
515 515         work_item->cookie = WORK_ITEM_MAGIC;
516 516 #endif
517     -
    517 +
518 518         link_initialize(&work_item->queue_link);
519 519         work_item->func = func;
  …
524 524 {
525 525         assert(irq_spinlock_locked(&workq->lock));
526     -
    526 +
527 527         /* Workers blocked are sleeping in the work function (ie not idle). */
528 528         assert(workq->blocked_worker_cnt <= workq->cur_worker_cnt);
529 529         /* Idle workers are waiting for more work to arrive in condvar_wait. */
530 530         assert(workq->idle_worker_cnt <= workq->cur_worker_cnt);
531     -
    531 +
532 532         /* Idle + blocked workers == sleeping worker threads. */
533 533         size_t sleeping_workers = workq->blocked_worker_cnt + workq->idle_worker_cnt;
534     -
    534 +
535 535         assert(sleeping_workers <= workq->cur_worker_cnt);
536 536         /* Workers pending activation are idle workers not yet given a time slice. */
537 537         assert(workq->activate_pending <= workq->idle_worker_cnt);
538     -
    538 +
539 539         /*
540 540          * Workers actively running the work func() this very moment and
  …
553 553 {
554 554         assert(irq_spinlock_locked(&workq->lock));
555     -
    555 +
556 556         /*
557 557          * Workers actively running the work func() and are neither blocked nor
  …
578 578
579 579         condvar_signal(&workq->activate_worker);
580     -
    580 +
581 581         irq_spinlock_lock(&workq->lock, true);
582 582         assert(0 < workq->pending_op_cnt);
  …
597 597         assert(!workq_corrupted(workq));
598 598         assert(irq_spinlock_locked(&workq->lock));
599     -
    599 +
600 600         /* Only signal workers if really necessary. */
601 601         signal_op_t signal_op = NULL;
  …
630 630                         bool need_worker = (active < max_concurrent_workers)
631 631                                 && (workq->cur_worker_cnt < max_worker_cnt);
632     -
    632 +
633 633                         if (need_worker && can_block) {
634 634                                 signal_op = add_worker_op;
  …
641 641                                 ++workq->cur_worker_cnt;
642 642                         }
643     -
    643 +
644 644                         /*
645 645                          * We cannot create a new worker but we need one desperately
  …
648 648                         if (need_worker && !can_block && 0 == active) {
649 649                                 assert(0 == workq->idle_worker_cnt);
650     -
    650 +
651 651                                 irq_spinlock_lock(&nonblock_adder.lock, true);
652 652
  …
667 667                 signal_op = NULL;
668 668         }
669     -
    669 +
670 670         return signal_op;
671 671 }
  …
682 682                 return;
683 683         }
684     -
    684 +
685 685         assert(arg != NULL);
686     -
    686 +
687 687         struct work_queue *workq = arg;
688 688         work_t *work_item;
689     -
    689 +
690 690         while (dequeue_work(workq, &work_item)) {
691 691                 /* Copy the func field so func() can safely free work_item. */
  …
700 700 {
701 701         assert(!workq_corrupted(workq));
702     -
    702 +
703 703         irq_spinlock_lock(&workq->lock, true);
704     -
    704 +
705 705         /* Check if we should exit if load is low. */
706 706         if (!workq->stopping && worker_unnecessary(workq)) {
  …
710 710                 list_remove(&THREAD->workq_link);
711 711                 irq_spinlock_unlock(&workq->lock, true);
712     -
    712 +
713 713                 thread_detach(THREAD);
714 714                 return false;
715 715         }
716     -
    716 +
717 717         bool stop = false;
718     -
    718 +
719 719         /* Wait for work to arrive. */
720 720         while (list_empty(&workq->queue) && !workq->stopping) {
721 721                 cv_wait(workq);
722     -
    722 +
723 723                 if (0 < workq->activate_pending)
724 724                         --workq->activate_pending;
  …
729 729                 link_t *work_link = list_first(&workq->queue);
730 730                 *pwork_item = list_get_instance(work_link, work_t, queue_link);
731     -
    731 +
732 732 #ifdef CONFIG_DEBUG
733 733                 assert(!work_item_corrupted(*pwork_item));
  …
736 736                 list_remove(work_link);
737 737                 --workq->item_cnt;
738     -
    738 +
739 739                 stop = false;
740 740         } else {
  …
744 744                 stop = true;
745 745         }
746     -
    746 +
747 747         irq_spinlock_unlock(&workq->lock, true);
748     -
    748 +
749 749         return !stop;
750 750 }
  …
754 754 {
755 755         assert(irq_spinlock_locked(&workq->lock));
756     -
    756 +
757 757         /* No work is pending. We don't need too many idle threads. */
758 758         if (list_empty(&workq->queue)) {
  …
775 775         ++workq->idle_worker_cnt;
776 776         THREAD->workq_idling = true;
777     -
    777 +
778 778         /* Ignore lock ordering just here. */
779 779         assert(irq_spinlock_locked(&workq->lock));
780     -
    780 +
781 781         _condvar_wait_timeout_irq_spinlock(&workq->activate_worker,
782 782                 &workq->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_NONE);
  …
784 784         assert(!workq_corrupted(workq));
785 785         assert(irq_spinlock_locked(&workq->lock));
786     -
    786 +
787 787         THREAD->workq_idling = false;
788 788         --workq->idle_worker_cnt;
  …
803 803                 assert(THREAD != thread);
804 804                 assert(!workq_corrupted(thread->workq));
805     -
    805 +
806 806                 /* Protected by thread->lock */
807 807                 thread->workq_blocked = false;
808     -
    808 +
809 809                 irq_spinlock_lock(&thread->workq->lock, true);
810 810                 --thread->workq->blocked_worker_cnt;
  …
823 823                 assert(!THREAD->workq_blocked);
824 824                 assert(!workq_corrupted(THREAD->workq));
825     -
    825 +
826 826                 THREAD->workq_blocked = true;
827     -
    827 +
828 828                 irq_spinlock_lock(&THREAD->workq->lock, false);
829 829
830 830                 ++THREAD->workq->blocked_worker_cnt;
831     -
    831 +
832 832                 bool can_block = false;
833 833                 signal_op_t op = signal_worker_logic(THREAD->workq, can_block);
834     -
    834 +
835 835                 irq_spinlock_unlock(&THREAD->workq->lock, false);
836     -
    836 +
837 837                 if (op) {
838 838                         assert(add_worker_noblock_op == op || signal_worker_op == op);
  …
856 856         const char *load_str = worker_surplus ? "decreasing" :
857 857                 (0 < workq->activate_pending) ? "increasing" : "stable";
858     -
    858 +
859 859         irq_spinlock_unlock(&workq->lock, true);
860     -
    860 +
861 861         printf(
862 862                 "Configuration: max_worker_cnt=%zu, min_worker_cnt=%zu,\n"
  …
893 893
894 894         irq_spinlock_lock(&info->lock, true);
895     -
    895 +
896 896         while (list_empty(&info->work_queues) && !stop) {
897 897                 errno_t ret = _condvar_wait_timeout_irq_spinlock(&info->req_cv,
898 898                         &info->lock, SYNCH_NO_TIMEOUT, SYNCH_FLAGS_INTERRUPTIBLE);
899     -
    899 +
900 900                 stop = (ret == EINTR);
901 901         }
902     -
    902 +
903 903         if (!stop) {
904 904                 *pworkq = list_get_instance(list_first(&info->work_queues),
  …
906 906
907 907                 assert(!workq_corrupted(*pworkq));
908     -
    908 +
909 909                 list_remove(&(*pworkq)->nb_link);
910 910         }
911     -
    911 +
912 912         irq_spinlock_unlock(&info->lock, true);
913     -
    913 +
914 914         return !stop;
915 915 }
  …
919 919         nonblock_adder_t *info = arg;
920 920         struct work_queue *workq;
921     -
    921 +
922 922         while (dequeue_add_req(info, &workq)) {
923 923                 add_worker(workq);
  …
931 931         condvar_initialize(&nonblock_adder.req_cv);
932 932         list_initialize(&nonblock_adder.work_queues);
933     -
    933 +
934 934         nonblock_adder.thread = thread_create(thr_nonblock_add_worker,
935 935                 &nonblock_adder, TASK, THREAD_FLAG_NONE, "kworkq-nb");
936     -
    936 +
937 937         if (nonblock_adder.thread) {
938 938                 thread_ready(nonblock_adder.thread);
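After such a sweep, one quick way to check that no trailing whitespace remains
in tracked files (one option among several; git grep's -I skips binaries):

    # Lists offending lines; produces no output when the tree is clean.
    git grep -n -I '[[:space:]]$' -- '*.c' '*.h' '*.py' '*.sh' '*.s' '*.S' '*.ag'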