Changeset 514d561 in mainline for uspace/lib/c/generic/async


Ignore:
Timestamp:
2018-07-20T16:27:20Z (7 years ago)
Author:
Jiří Zárevúcky <jiri.zarevucky@…>
Branches:
lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
Children:
05208d9
Parents:
7137f74c
git-author:
Jiří Zárevúcky <jiri.zarevucky@…> (2018-07-19 21:52:47)
git-committer:
Jiří Zárevúcky <jiri.zarevucky@…> (2018-07-20 16:27:20)
Message:

Fibril/async implementation overhaul.

This commit marks the move towards treating the fibril library as a mere
implementation of a generic threading interface. Understood as a layer that
wraps the kernel threads, we not only have to wrap threading itself, but also
every syscall that blocks the kernel thread (by blocking, we mean thread not
doing useful work until an external event happens — e.g. locking a kernel
mutex or thread sleep is understood as blocking, but an as_area_create() is not,
despite potentially taking a long time to complete).

Consequently, we implement fibril_ipc_wait() as a fibril-native wrapper for
kernel's ipc_wait(), and also implement timer functionality like timeouts
as part of the fibril library. This removes the interdependency between fibril
implementation and the async framework — in theory, the fibril API could be
reimplemented as a simple 1:1 shim, and the async framework would continue
working normally (note that the current implementation of loader complicates
this).

To better isolate the fibril internals from the implementation of high-level
synchronization, a fibril_event_t is added. This object conceptually acts
like a single slot wait queue. All other synchronization is implemented in
terms of this primitive.

Location:
uspace/lib/c/generic/async
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async/client.c

    r7137f74c r514d561  
    104104#include <ipc/irq.h>
    105105#include <ipc/event.h>
    106 #include <futex.h>
    107106#include <fibril.h>
    108107#include <adt/hash_table.h>
     
    128127/** Message data */
    129128typedef struct {
    130         awaiter_t wdata;
     129        fibril_event_t received;
    131130
    132131        /** If reply was received. */
     
    136135        bool forget;
    137136
    138         /** If already destroyed. */
    139         bool destroyed;
    140 
    141137        /** Pointer to where the answer data is stored. */
    142138        ipc_call_t *dataptr;
     
    145141} amsg_t;
    146142
    147 static void to_event_initialize(to_event_t *to)
    148 {
    149         struct timeval tv = { 0, 0 };
    150 
    151         to->inlist = false;
    152         to->occurred = false;
    153         link_initialize(&to->link);
    154         to->expires = tv;
    155 }
    156 
    157 static void wu_event_initialize(wu_event_t *wu)
    158 {
    159         wu->inlist = false;
    160         link_initialize(&wu->link);
    161 }
    162 
    163 void awaiter_initialize(awaiter_t *aw)
    164 {
    165         aw->fid = 0;
    166         aw->active = false;
    167         to_event_initialize(&aw->to_event);
    168         wu_event_initialize(&aw->wu_event);
    169 }
    170 
    171143static amsg_t *amsg_create(void)
    172144{
    173         amsg_t *msg = malloc(sizeof(amsg_t));
    174         if (msg) {
    175                 msg->done = false;
    176                 msg->forget = false;
    177                 msg->destroyed = false;
    178                 msg->dataptr = NULL;
    179                 msg->retval = EINVAL;
    180                 awaiter_initialize(&msg->wdata);
    181         }
    182 
    183         return msg;
     145        return calloc(1, sizeof(amsg_t));
    184146}
    185147
    186148static void amsg_destroy(amsg_t *msg)
    187149{
    188         if (!msg)
    189                 return;
    190 
    191         assert(!msg->destroyed);
    192         msg->destroyed = true;
    193150        free(msg);
    194151}
     
    251208        msg->retval = IPC_GET_RETVAL(*data);
    252209
    253         /* Copy data after futex_down, just in case the call was detached */
     210        /* Copy data inside lock, just in case the call was detached */
    254211        if ((msg->dataptr) && (data))
    255212                *msg->dataptr = *data;
    256213
    257         write_barrier();
    258 
    259         /* Remove message from timeout list */
    260         if (msg->wdata.to_event.inlist)
    261                 list_remove(&msg->wdata.to_event.link);
    262 
    263214        msg->done = true;
    264215
    265216        if (msg->forget) {
    266                 assert(msg->wdata.active);
    267217                amsg_destroy(msg);
    268         } else if (!msg->wdata.active) {
    269                 msg->wdata.active = true;
    270                 fibril_add_ready(msg->wdata.fid);
     218        } else {
     219                fibril_notify(&msg->received);
    271220        }
    272221
     
    301250
    302251        msg->dataptr = dataptr;
    303         msg->wdata.active = true;
    304252
    305253        errno_t rc = ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3,
     
    343291
    344292        msg->dataptr = dataptr;
    345         msg->wdata.active = true;
    346293
    347294        errno_t rc = ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3,
     
    371318
    372319        amsg_t *msg = (amsg_t *) amsgid;
    373 
    374         futex_lock(&async_futex);
    375 
    376         assert(!msg->forget);
    377         assert(!msg->destroyed);
    378 
    379         if (msg->done) {
    380                 futex_unlock(&async_futex);
    381                 goto done;
    382         }
    383 
    384         msg->wdata.fid = fibril_get_id();
    385         msg->wdata.active = false;
    386         msg->wdata.to_event.inlist = false;
    387 
    388         /* Leave the async_futex locked when entering this function */
    389         fibril_switch(FIBRIL_FROM_BLOCKED);
    390         futex_unlock(&async_futex);
    391 
    392 done:
     320        fibril_wait_for(&msg->received);
     321
    393322        if (retval)
    394323                *retval = msg->retval;
     
    420349
    421350        amsg_t *msg = (amsg_t *) amsgid;
    422 
    423         futex_lock(&async_futex);
    424 
    425         assert(!msg->forget);
    426         assert(!msg->destroyed);
    427 
    428         if (msg->done) {
    429                 futex_unlock(&async_futex);
    430                 goto done;
    431         }
    432351
    433352        /*
     
    438357                timeout = 0;
    439358
    440         getuptime(&msg->wdata.to_event.expires);
    441         tv_add_diff(&msg->wdata.to_event.expires, timeout);
    442 
    443         /*
    444          * Current fibril is inserted as waiting regardless of the
    445          * "size" of the timeout.
    446          *
    447          * Checking for msg->done and immediately bailing out when
    448          * timeout == 0 would mean that the manager fibril would never
    449          * run (consider single threaded program).
    450          * Thus the IPC answer would be never retrieved from the kernel.
    451          *
    452          * Notice that the actual delay would be very small because we
    453          * - switch to manager fibril
    454          * - the manager sees expired timeout
    455          * - and thus adds us back to ready queue
    456          * - manager switches back to some ready fibril
    457          *   (prior it, it checks for incoming IPC).
    458          *
    459          */
    460         msg->wdata.fid = fibril_get_id();
    461         msg->wdata.active = false;
    462         async_insert_timeout(&msg->wdata);
    463 
    464         /* Leave the async_futex locked when entering this function */
    465         fibril_switch(FIBRIL_FROM_BLOCKED);
    466         futex_unlock(&async_futex);
    467 
    468         if (!msg->done)
    469                 return ETIMEOUT;
    470 
    471 done:
     359        struct timeval expires;
     360        getuptime(&expires);
     361        tv_add_diff(&expires, timeout);
     362
     363        errno_t rc = fibril_wait_timeout(&msg->received, &expires);
     364        if (rc != EOK)
     365                return rc;
     366
    472367        if (retval)
    473368                *retval = msg->retval;
     
    475370        amsg_destroy(msg);
    476371
    477         return 0;
     372        return EOK;
    478373}
    479374
     
    494389
    495390        assert(!msg->forget);
    496         assert(!msg->destroyed);
    497391
    498392        futex_lock(&async_futex);
     
    506400
    507401        futex_unlock(&async_futex);
    508 }
    509 
    510 /** Wait for specified time.
    511  *
    512  * The current fibril is suspended but the thread continues to execute.
    513  *
    514  * @param timeout Duration of the wait in microseconds.
    515  *
    516  */
    517 void fibril_usleep(suseconds_t timeout)
    518 {
    519         awaiter_t awaiter;
    520         awaiter_initialize(&awaiter);
    521 
    522         awaiter.fid = fibril_get_id();
    523 
    524         getuptime(&awaiter.to_event.expires);
    525         tv_add_diff(&awaiter.to_event.expires, timeout);
    526 
    527         futex_lock(&async_futex);
    528 
    529         async_insert_timeout(&awaiter);
    530 
    531         /* Leave the async_futex locked when entering this function */
    532         fibril_switch(FIBRIL_FROM_BLOCKED);
    533         futex_unlock(&async_futex);
    534 }
    535 
    536 /** Delay execution for the specified number of seconds
    537  *
    538  * @param sec Number of seconds to sleep
    539  */
    540 void fibril_sleep(unsigned int sec)
    541 {
    542         /*
    543          * Sleep in 1000 second steps to support
    544          * full argument range
    545          */
    546 
    547         while (sec > 0) {
    548                 unsigned int period = (sec > 1000) ? 1000 : sec;
    549 
    550                 fibril_usleep(period * 1000000);
    551                 sec -= period;
    552         }
    553402}
    554403
     
    716565
    717566        msg->dataptr = &result;
    718         msg->wdata.active = true;
    719567
    720568        errno_t rc = ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO,
  • uspace/lib/c/generic/async/server.c

    r7137f74c r514d561  
    104104#include <ipc/irq.h>
    105105#include <ipc/event.h>
    106 #include <futex.h>
    107106#include <fibril.h>
    108107#include <adt/hash_table.h>
     
    118117#include <stdlib.h>
    119118#include <macros.h>
     119#include <str_error.h>
    120120#include <as.h>
    121121#include <abi/mm/as.h>
     
    127127/** Async framework global futex */
    128128futex_t async_futex = FUTEX_INITIALIZER;
    129 
    130 /** Number of threads waiting for IPC in the kernel. */
    131 static atomic_t threads_in_ipc_wait = { 0 };
    132129
    133130/** Call data */
     
    148145/* Server connection data */
    149146typedef struct {
    150         awaiter_t wdata;
     147        /** Fibril handling the connection. */
     148        fid_t fid;
    151149
    152150        /** Hash table link. */
     
    161159        /** Link to the client tracking structure. */
    162160        client_t *client;
     161
     162        /** Message event. */
     163        fibril_event_t msg_arrived;
    163164
    164165        /** Messages that should be delivered to this fibril. */
     
    251252/* The remaining structures are guarded by async_futex. */
    252253static hash_table_t conn_hash_table;
    253 static LIST_INITIALIZE(timeout_list);
    254254
    255255static size_t client_key_hash(void *key)
     
    487487                        ipc_answer_0(call->cap_handle, ENOMEM);
    488488
    489                 return (uintptr_t) NULL;
     489                return (fid_t) NULL;
    490490        }
    491491
    492492        conn->in_task_id = in_task_id;
    493493        conn->in_phone_hash = in_phone_hash;
     494        conn->msg_arrived = FIBRIL_EVENT_INIT;
    494495        list_initialize(&conn->msg_queue);
    495496        conn->close_chandle = CAP_NIL;
     
    503504
    504505        /* We will activate the fibril ASAP */
    505         conn->wdata.active = true;
    506         conn->wdata.fid = fibril_create(connection_fibril, conn);
    507 
    508         if (conn->wdata.fid == 0) {
     506        conn->fid = fibril_create(connection_fibril, conn);
     507
     508        if (conn->fid == 0) {
    509509                free(conn);
    510510
     
    512512                        ipc_answer_0(call->cap_handle, ENOMEM);
    513513
    514                 return (uintptr_t) NULL;
     514                return (fid_t) NULL;
    515515        }
    516516
     
    521521        futex_unlock(&async_futex);
    522522
    523         fibril_add_ready(conn->wdata.fid);
    524 
    525         return conn->wdata.fid;
     523        fibril_add_ready(conn->fid);
     524
     525        return conn->fid;
    526526}
    527527
     
    566566        fid_t fid = async_new_connection(answer.in_task_id, phone_hash,
    567567            NULL, handler, data);
    568         if (fid == (uintptr_t) NULL)
     568        if (fid == (fid_t) NULL)
    569569                return ENOMEM;
    570570
     
    602602};
    603603
    604 /** Sort in current fibril's timeout request.
    605  *
    606  * @param wd Wait data of the current fibril.
    607  *
    608  */
    609 void async_insert_timeout(awaiter_t *wd)
    610 {
    611         assert(wd);
    612 
    613         wd->to_event.occurred = false;
    614         wd->to_event.inlist = true;
    615 
    616         link_t *tmp = timeout_list.head.next;
    617         while (tmp != &timeout_list.head) {
    618                 awaiter_t *cur =
    619                     list_get_instance(tmp, awaiter_t, to_event.link);
    620 
    621                 if (tv_gteq(&cur->to_event.expires, &wd->to_event.expires))
    622                         break;
    623 
    624                 tmp = tmp->next;
    625         }
    626 
    627         list_insert_before(&wd->to_event.link, tmp);
    628 }
    629 
    630604/** Try to route a call to an appropriate connection fibril.
    631605 *
     
    657631        connection_t *conn = hash_table_get_inst(link, connection_t, link);
    658632
     633        // FIXME: malloc in critical section
    659634        msg_t *msg = malloc(sizeof(*msg));
    660635        if (!msg) {
     
    670645
    671646        /* If the connection fibril is waiting for an event, activate it */
    672         if (!conn->wdata.active) {
    673 
    674                 /* If in timeout list, remove it */
    675                 if (conn->wdata.to_event.inlist) {
    676                         conn->wdata.to_event.inlist = false;
    677                         list_remove(&conn->wdata.to_event.link);
    678                 }
    679 
    680                 conn->wdata.active = true;
    681                 fibril_add_ready(conn->wdata.fid);
    682         }
     647        fibril_notify(&conn->msg_arrived);
    683648
    684649        futex_unlock(&async_futex);
     
    987952        connection_t *conn = fibril_connection;
    988953
     954        struct timeval tv;
     955        struct timeval *expires = NULL;
     956        if (usecs) {
     957                getuptime(&tv);
     958                tv_add_diff(&tv, usecs);
     959                expires = &tv;
     960        }
     961
    989962        futex_lock(&async_futex);
    990 
    991         if (usecs) {
    992                 getuptime(&conn->wdata.to_event.expires);
    993                 tv_add_diff(&conn->wdata.to_event.expires, usecs);
    994         } else
    995                 conn->wdata.to_event.inlist = false;
    996963
    997964        /* If nothing in queue, wait until something arrives */
     
    1011978                }
    1012979
    1013                 if (usecs)
    1014                         async_insert_timeout(&conn->wdata);
    1015 
    1016                 conn->wdata.active = false;
    1017 
    1018                 /*
    1019                  * Note: the current fibril will be rescheduled either due to a
    1020                  * timeout or due to an arriving message destined to it. In the
    1021                  * former case, handle_expired_timeouts() and, in the latter
    1022                  * case, route_call() will perform the wakeup.
    1023                  */
    1024                 fibril_switch(FIBRIL_FROM_BLOCKED);
    1025 
    1026                 if ((usecs) && (conn->wdata.to_event.occurred) &&
    1027                     (list_empty(&conn->msg_queue))) {
    1028                         /* If we timed out -> exit */
    1029                         futex_unlock(&async_futex);
     980                // TODO: replace with cvar
     981                futex_unlock(&async_futex);
     982
     983                errno_t rc = fibril_wait_timeout(&conn->msg_arrived, expires);
     984                if (rc == ETIMEOUT)
    1030985                        return false;
    1031                 }
     986
     987                futex_lock(&async_futex);
    1032988        }
    1033989
     
    11261082}
    11271083
    1128 /** Fire all timeouts that expired. */
    1129 static suseconds_t handle_expired_timeouts(unsigned int *flags)
    1130 {
    1131         /* Make sure the async_futex is held. */
    1132         futex_assert_is_locked(&async_futex);
    1133 
    1134         struct timeval tv;
    1135         getuptime(&tv);
    1136 
    1137         bool fired = false;
    1138 
    1139         link_t *cur = list_first(&timeout_list);
    1140         while (cur != NULL) {
    1141                 awaiter_t *waiter =
    1142                     list_get_instance(cur, awaiter_t, to_event.link);
    1143 
    1144                 if (tv_gt(&waiter->to_event.expires, &tv)) {
    1145                         if (fired) {
    1146                                 *flags = SYNCH_FLAGS_NON_BLOCKING;
    1147                                 return 0;
    1148                         }
    1149                         *flags = 0;
    1150                         return tv_sub_diff(&waiter->to_event.expires, &tv);
    1151                 }
    1152 
    1153                 list_remove(&waiter->to_event.link);
    1154                 waiter->to_event.inlist = false;
    1155                 waiter->to_event.occurred = true;
    1156 
    1157                 /*
    1158                  * Redundant condition?
    1159                  * The fibril should not be active when it gets here.
    1160                  */
    1161                 if (!waiter->active) {
    1162                         waiter->active = true;
    1163                         fibril_add_ready(waiter->fid);
    1164                         fired = true;
    1165                 }
    1166 
    1167                 cur = list_first(&timeout_list);
    1168         }
    1169 
    1170         if (fired) {
    1171                 *flags = SYNCH_FLAGS_NON_BLOCKING;
    1172                 return 0;
    1173         }
    1174 
    1175         return SYNCH_NO_TIMEOUT;
    1176 }
    1177 
    11781084/** Endless loop dispatching incoming calls and answers.
    11791085 *
     
    11831089static errno_t async_manager_worker(void)
    11841090{
     1091        ipc_call_t call;
     1092        errno_t rc;
     1093
    11851094        while (true) {
    1186                 futex_lock(&async_futex);
    1187                 fibril_switch(FIBRIL_FROM_MANAGER);
    1188 
    1189                 /*
    1190                  * The switch only returns when there is no non-manager fibril
    1191                  * it can run.
    1192                  */
    1193 
    1194                 unsigned int flags = SYNCH_FLAGS_NONE;
    1195                 suseconds_t next_timeout = handle_expired_timeouts(&flags);
    1196                 futex_unlock(&async_futex);
    1197 
    1198                 atomic_inc(&threads_in_ipc_wait);
    1199 
    1200                 ipc_call_t call;
    1201                 errno_t rc = ipc_wait(&call, next_timeout, flags);
    1202 
    1203                 atomic_dec(&threads_in_ipc_wait);
    1204 
     1095                rc = fibril_ipc_wait(&call, NULL);
    12051096                if (rc == EOK)
    12061097                        handle_call(&call);
     
    12251116
    12261117/** Add one manager to manager list. */
    1227 void async_create_manager(void)
     1118fid_t async_create_manager(void)
    12281119{
    12291120        fid_t fid = fibril_create_generic(async_manager_fibril, NULL, PAGE_SIZE);
    1230         if (fid != 0)
    1231                 fibril_add_manager(fid);
    1232 }
    1233 
    1234 /** Remove one manager from manager list */
    1235 void async_destroy_manager(void)
    1236 {
    1237         fibril_remove_manager();
     1121        fibril_start(fid);
     1122        return fid;
    12381123}
    12391124
     
    12521137            &notification_hash_table_ops))
    12531138                abort();
     1139
     1140        async_create_manager();
    12541141}
    12551142
     
    13421229
    13431230        return EOK;
    1344 }
    1345 
    1346 /** Interrupt one thread of this task from waiting for IPC. */
    1347 void async_poke(void)
    1348 {
    1349         if (atomic_get(&threads_in_ipc_wait) > 0)
    1350                 ipc_poke();
    13511231}
    13521232
     
    18341714__noreturn void async_manager(void)
    18351715{
    1836         futex_lock(&async_futex);
    1837         fibril_switch(FIBRIL_FROM_DEAD);
     1716        fibril_event_t ever = FIBRIL_EVENT_INIT;
     1717        fibril_wait_for(&ever);
    18381718        __builtin_unreachable();
    18391719}
Note: See TracChangeset for help on using the changeset viewer.