Changeset 566992e1 in mainline for uspace/lib/c/generic/async.c


Ignore:
Timestamp:
2015-08-22T05:01:24Z (9 years ago)
Author:
Martin Decky <martin@…>
Branches:
lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
Children:
d9e68d0
Parents:
57dea62
Message:

extremely rudimentary support for interfaces and ports
(does not do much, but it is backward and forward compatible)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async.c

    r57dea62 r566992e1  
    123123        list_t exch_list;
    124124       
     125        /** Session interface */
     126        iface_t iface;
     127       
    125128        /** Exchange management style */
    126129        exch_mgmt_t mgmt;
     
    245248        void *data;
    246249} connection_t;
     250
     251/** Interface data */
     252typedef struct {
     253        ht_link_t link;
     254       
     255        /** Interface ID */
     256        iface_t iface;
     257       
     258        /** Futex protecting the hash table */
     259        futex_t futex;
     260       
     261        /** Interface ports */
     262        hash_table_t port_hash_table;
     263       
     264        /** Next available port ID */
     265        port_id_t port_id_avail;
     266} interface_t;
     267
     268/* Port data */
     269typedef struct {
     270        ht_link_t link;
     271       
     272        /** Port ID */
     273        port_id_t id;
     274       
     275        /** Port connection handler */
     276        async_port_handler_t handler;
     277       
     278        /** Client data */
     279        void *data;
     280} port_t;
    247281
    248282/* Notification data */
     
    355389static void *fallback_port_data = NULL;
    356390
     391static hash_table_t interface_hash_table;
     392
     393static size_t interface_key_hash(void *key)
     394{
     395        iface_t iface = *(iface_t *) key;
     396        return iface;
     397}
     398
     399static size_t interface_hash(const ht_link_t *item)
     400{
     401        interface_t *interface = hash_table_get_inst(item, interface_t, link);
     402        return interface_key_hash(&interface->iface);
     403}
     404
     405static bool interface_key_equal(void *key, const ht_link_t *item)
     406{
     407        iface_t iface = *(iface_t *) key;
     408        interface_t *interface = hash_table_get_inst(item, interface_t, link);
     409        return iface == interface->iface;
     410}
     411
     412/** Operations for the port hash table. */
     413static hash_table_ops_t interface_hash_table_ops = {
     414        .hash = interface_hash,
     415        .key_hash = interface_key_hash,
     416        .key_equal = interface_key_equal,
     417        .equal = NULL,
     418        .remove_callback = NULL
     419};
     420
     421static size_t port_key_hash(void *key)
     422{
     423        port_id_t port_id = *(port_id_t *) key;
     424        return port_id;
     425}
     426
     427static size_t port_hash(const ht_link_t *item)
     428{
     429        port_t *port = hash_table_get_inst(item, port_t, link);
     430        return port_key_hash(&port->id);
     431}
     432
     433static bool port_key_equal(void *key, const ht_link_t *item)
     434{
     435        port_id_t port_id = *(port_id_t *) key;
     436        port_t *port = hash_table_get_inst(item, port_t, link);
     437        return port_id == port->id;
     438}
     439
     440/** Operations for the port hash table. */
     441static hash_table_ops_t port_hash_table_ops = {
     442        .hash = port_hash,
     443        .key_hash = port_key_hash,
     444        .key_equal = port_key_equal,
     445        .equal = NULL,
     446        .remove_callback = NULL
     447};
     448
     449static interface_t *async_new_interface(iface_t iface)
     450{
     451        interface_t *interface =
     452            (interface_t *) malloc(sizeof(interface_t));
     453        if (!interface)
     454                return NULL;
     455       
     456        bool ret = hash_table_create(&interface->port_hash_table, 0, 0,
     457            &port_hash_table_ops);
     458        if (!ret) {
     459                free(interface);
     460                return NULL;
     461        }
     462       
     463        interface->iface = iface;
     464        futex_initialize(&interface->futex, 1);
     465        interface->port_id_avail = 0;
     466       
     467        hash_table_insert(&interface_hash_table, &interface->link);
     468       
     469        return interface;
     470}
     471
     472static port_t *async_new_port(interface_t *interface,
     473    async_port_handler_t handler, void *data)
     474{
     475        port_t *port = (port_t *) malloc(sizeof(port_t));
     476        if (!port)
     477                return NULL;
     478       
     479        futex_down(&interface->futex);
     480       
     481        port_id_t id = interface->port_id_avail;
     482        interface->port_id_avail++;
     483       
     484        port->id = id;
     485        port->handler = handler;
     486        port->data = data;
     487       
     488        hash_table_insert(&interface->port_hash_table, &port->link);
     489       
     490        futex_up(&interface->futex);
     491       
     492        return port;
     493}
     494
    357495static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE;
    358496
     
    380518 */
    381519static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
     520
     521int async_create_port(iface_t iface, async_port_handler_t handler,
     522    void *data, port_id_t *port_id)
     523{
     524        if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK)
     525                return EINVAL;
     526       
     527        interface_t *interface;
     528       
     529        futex_down(&async_futex);
     530       
     531        ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
     532        if (link)
     533                interface = hash_table_get_inst(link, interface_t, link);
     534        else
     535                interface = async_new_interface(iface);
     536       
     537        if (!interface) {
     538                futex_up(&async_futex);
     539                return ENOMEM;
     540        }
     541       
     542        port_t *port = async_new_port(interface, handler, data);
     543        if (!port) {
     544                futex_up(&async_futex);
     545                return ENOMEM;
     546        }
     547       
     548        *port_id = port->id;
     549       
     550        futex_up(&async_futex);
     551       
     552        return EOK;
     553}
    382554
    383555void async_set_fallback_port_handler(async_port_handler_t handler, void *data)
     
    9591131}
    9601132
     1133static port_t *async_find_port(iface_t iface, port_id_t port_id)
     1134{
     1135        port_t *port = NULL;
     1136       
     1137        futex_down(&async_futex);
     1138       
     1139        ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
     1140        if (link) {
     1141                interface_t *interface =
     1142                    hash_table_get_inst(link, interface_t, link);
     1143               
     1144                link = hash_table_find(&interface->port_hash_table, &port_id);
     1145                if (link)
     1146                        port = hash_table_get_inst(link, port_t, link);
     1147        }
     1148       
     1149        futex_up(&async_futex);
     1150       
     1151        return port;
     1152}
     1153
    9611154/** Wrapper for client connection fibril.
    9621155 *
     
    11201313        }
    11211314       
    1122         switch (IPC_GET_IMETHOD(*call)) {
    1123         case IPC_M_CLONE_ESTABLISH:
    1124         case IPC_M_CONNECT_ME_TO:
     1315        /* New connection */
     1316        if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) {
     1317                iface_t iface = (iface_t) IPC_GET_ARG1(*call);
     1318                sysarg_t in_phone_hash = IPC_GET_ARG5(*call);
     1319               
     1320                async_notification_handler_t handler = fallback_port_handler;
     1321                void *data = fallback_port_data;
     1322               
     1323                // TODO: Currently ignores all ports but the first one
     1324                port_t *port = async_find_port(iface, 0);
     1325                if (port) {
     1326                        handler = port->handler;
     1327                        data = port->data;
     1328                }
     1329               
     1330                async_new_connection(call->in_task_id, in_phone_hash, callid,
     1331                    call, handler, data);
     1332                return;
     1333        }
     1334       
     1335        /* Cloned connection */
     1336        if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) {
     1337                // TODO: Currently ignores ports altogether
     1338               
    11251339                /* Open new connection with fibril, etc. */
    11261340                async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
     
    12861500void __async_init(void)
    12871501{
     1502        if (!hash_table_create(&interface_hash_table, 0, 0,
     1503            &interface_hash_table_ops))
     1504                abort();
     1505       
    12881506        if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops))
    12891507                abort();
     
    13001518                abort();
    13011519       
     1520        session_ns->iface = 0;
    13021521        session_ns->mgmt = EXCHANGE_ATOMIC;
    13031522        session_ns->phone = PHONE_NS;
     
    19052124        }
    19062125       
     2126        sess->iface = 0;
    19072127        sess->mgmt = mgmt;
    19082128        sess->phone = phone;
     
    19812201        }
    19822202       
     2203        sess->iface = 0;
    19832204        sess->mgmt = mgmt;
    19842205        sess->phone = phone;
     
    20522273        }
    20532274       
     2275        sess->iface = 0;
    20542276        sess->mgmt = mgmt;
    20552277        sess->phone = phone;
     
    20682290}
    20692291
     2292/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
     2293 *
     2294 * Ask through phone for a new connection to some service and block until
     2295 * success.
     2296 *
     2297 * @param exch  Exchange for sending the message.
     2298 * @param iface Connection interface.
     2299 * @param arg2  User defined argument.
     2300 * @param arg3  User defined argument.
     2301 *
     2302 * @return New session on success or NULL on error.
     2303 *
     2304 */
     2305async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,
     2306    sysarg_t arg2, sysarg_t arg3)
     2307{
     2308        if (exch == NULL) {
     2309                errno = ENOENT;
     2310                return NULL;
     2311        }
     2312       
     2313        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     2314        if (sess == NULL) {
     2315                errno = ENOMEM;
     2316                return NULL;
     2317        }
     2318       
     2319        int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
     2320            arg3, IPC_FLAG_BLOCKING);
     2321        if (phone < 0) {
     2322                errno = phone;
     2323                free(sess);
     2324                return NULL;
     2325        }
     2326       
     2327        sess->iface = iface;
     2328        sess->phone = phone;
     2329        sess->arg1 = iface;
     2330        sess->arg2 = arg2;
     2331        sess->arg3 = arg3;
     2332       
     2333        fibril_mutex_initialize(&sess->remote_state_mtx);
     2334        sess->remote_state_data = NULL;
     2335       
     2336        list_initialize(&sess->exch_list);
     2337        fibril_mutex_initialize(&sess->mutex);
     2338        atomic_set(&sess->refcnt, 0);
     2339       
     2340        return sess;
     2341}
     2342
    20702343/** Connect to a task specified by id.
    20712344 *
     
    20862359        }
    20872360       
     2361        sess->iface = 0;
    20882362        sess->mgmt = EXCHANGE_ATOMIC;
    20892363        sess->phone = phone;
     
    21632437                return NULL;
    21642438       
    2165         async_exch_t *exch;
     2439        exch_mgmt_t mgmt = sess->mgmt;
     2440        if (sess->iface != 0)
     2441                mgmt = sess->iface & IFACE_EXCHANGE_MASK;
     2442       
     2443        async_exch_t *exch = NULL;
    21662444       
    21672445        fibril_mutex_lock(&async_sess_mutex);
     
    21822460                 */
    21832461               
    2184                 if ((sess->mgmt == EXCHANGE_ATOMIC) ||
    2185                     (sess->mgmt == EXCHANGE_SERIALIZE)) {
     2462                if ((mgmt == EXCHANGE_ATOMIC) ||
     2463                    (mgmt == EXCHANGE_SERIALIZE)) {
    21862464                        exch = (async_exch_t *) malloc(sizeof(async_exch_t));
    21872465                        if (exch != NULL) {
     
    21912469                                exch->phone = sess->phone;
    21922470                        }
    2193                 } else {  /* EXCHANGE_PARALLEL */
     2471                } else if (mgmt == EXCHANGE_PARALLEL) {
     2472                        int phone;
     2473                       
     2474                retry:
    21942475                        /*
    21952476                         * Make a one-time attempt to connect a new data phone.
    21962477                         */
    2197                        
    2198                         int phone;
    2199                        
    2200 retry:
    22012478                        phone = async_connect_me_to_internal(sess->phone, sess->arg1,
    22022479                            sess->arg2, sess->arg3, 0);
     
    22402517                atomic_inc(&sess->refcnt);
    22412518               
    2242                 if (sess->mgmt == EXCHANGE_SERIALIZE)
     2519                if (mgmt == EXCHANGE_SERIALIZE)
    22432520                        fibril_mutex_lock(&sess->mutex);
    22442521        }
     
    22602537        assert(sess != NULL);
    22612538       
     2539        exch_mgmt_t mgmt = sess->mgmt;
     2540        if (sess->iface != 0)
     2541                mgmt = sess->iface & IFACE_EXCHANGE_MASK;
     2542       
    22622543        atomic_dec(&sess->refcnt);
    22632544       
    2264         if (sess->mgmt == EXCHANGE_SERIALIZE)
     2545        if (mgmt == EXCHANGE_SERIALIZE)
    22652546                fibril_mutex_unlock(&sess->mutex);
    22662547       
     
    28183099        }
    28193100       
     3101        sess->iface = 0;
    28203102        sess->mgmt = mgmt;
    28213103        sess->phone = phone;
     
    28673149        }
    28683150       
     3151        sess->iface = 0;
    28693152        sess->mgmt = mgmt;
    28703153        sess->phone = phone;
     
    29123195                return NULL;
    29133196       
     3197        sess->iface = 0;
    29143198        sess->mgmt = mgmt;
    29153199        sess->phone = phone;
Note: See TracChangeset for help on using the changeset viewer.