Changeset 26e7d6d in mainline for uspace/lib/c/generic/async.c


Ignore:
Timestamp:
2011-09-19T16:31:00Z (13 years ago)
Author:
Vojtech Horky <vojtechhorky@…>
Branches:
lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
Children:
a347a11
Parents:
3842a955 (diff), 086290d (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge mainline changes

File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async.c

    r3842a955 r26e7d6d  
    9898#include <ipc/ipc.h>
    9999#include <async.h>
     100#include "private/async.h"
    100101#undef LIBC_ASYNC_C_
    101102
     
    107108#include <errno.h>
    108109#include <sys/time.h>
    109 #include <arch/barrier.h>
     110#include <libarch/barrier.h>
    110111#include <bool.h>
    111112#include <malloc.h>
    112113#include <mem.h>
    113114#include <stdlib.h>
    114 #include "private/async.h"
     115#include <macros.h>
    115116
    116117#define CLIENT_HASH_TABLE_BUCKETS  32
    117118#define CONN_HASH_TABLE_BUCKETS    32
     119
     120/** Session data */
     121struct async_sess {
     122        /** List of inactive exchanges */
     123        list_t exch_list;
     124       
     125        /** Exchange management style */
     126        exch_mgmt_t mgmt;
     127       
     128        /** Session identification */
     129        int phone;
     130       
     131        /** First clone connection argument */
     132        sysarg_t arg1;
     133       
     134        /** Second clone connection argument */
     135        sysarg_t arg2;
     136       
     137        /** Third clone connection argument */
     138        sysarg_t arg3;
     139       
     140        /** Exchange mutex */
     141        fibril_mutex_t mutex;
     142       
     143        /** Number of opened exchanges */
     144        atomic_t refcnt;
     145       
     146        /** Mutex for stateful connections */
     147        fibril_mutex_t remote_state_mtx;
     148       
     149        /** Data for stateful connections */
     150        void *remote_state_data;
     151};
     152
     153/** Exchange data */
     154struct async_exch {
     155        /** Link into list of inactive exchanges */
     156        link_t sess_link;
     157       
     158        /** Link into global list of inactive exchanges */
     159        link_t global_link;
     160       
     161        /** Session pointer */
     162        async_sess_t *sess;
     163       
     164        /** Exchange identification */
     165        int phone;
     166};
    118167
    119168/** Async framework global futex */
     
    134183} msg_t;
    135184
     185/** Message data */
     186typedef struct {
     187        awaiter_t wdata;
     188       
     189        /** If reply was received. */
     190        bool done;
     191       
     192        /** Pointer to where the answer data is stored. */
     193        ipc_call_t *dataptr;
     194       
     195        sysarg_t retval;
     196} amsg_t;
     197
    136198/* Client connection data */
    137199typedef struct {
    138200        link_t link;
    139201       
    140         sysarg_t in_task_hash;
     202        task_id_t in_task_id;
    141203        atomic_t refcnt;
    142204        void *data;
     
    150212        link_t link;
    151213       
    152         /** Incoming client task hash. */
    153         sysarg_t in_task_hash;
     214        /** Incoming client task ID. */
     215        task_id_t in_task_id;
    154216       
    155217        /** Incoming phone hash. */
     
    203265}
    204266
    205 void *async_get_client_data(void)
    206 {
    207         assert(fibril_connection);
    208         return fibril_connection->client->data;
    209 }
    210 
    211267/** Default fibril function that gets called to handle new connection.
    212268 *
    213269 * This function is defined as a weak symbol - to be redefined in user code.
    214270 *
    215  * @param callid        Hash of the incoming call.
    216  * @param call          Data of the incoming call.
    217  * @param arg           Local argument
     271 * @param callid Hash of the incoming call.
     272 * @param call   Data of the incoming call.
     273 * @param arg    Local argument
    218274 *
    219275 */
     
    228284 * This function is defined as a weak symbol - to be redefined in user code.
    229285 *
    230  * @param callid        Hash of the incoming call.
    231  * @param call          Data of the incoming call.
    232  * @param arg           Local argument.
     286 * @param callid Hash of the incoming call.
     287 * @param call   Data of the incoming call.
     288 * @param arg    Local argument.
    233289 *
    234290 */
     
    289345{
    290346        assert(key);
     347        assert(keys == 2);
    291348        assert(item);
    292349       
    293350        client_t *client = hash_table_get_instance(item, client_t, link);
    294         return (key[0] == client->in_task_hash);
     351        return (key[0] == LOWER32(client->in_task_id) &&
     352            (key[1] == UPPER32(client->in_task_id)));
    295353}
    296354
     
    580638}
    581639
     640static client_t *async_client_get(task_id_t client_id, bool create)
     641{
     642        unsigned long key[2] = {
     643                LOWER32(client_id),
     644                UPPER32(client_id),
     645        };
     646        client_t *client = NULL;
     647
     648        futex_down(&async_futex);
     649        link_t *lnk = hash_table_find(&client_hash_table, key);
     650        if (lnk) {
     651                client = hash_table_get_instance(lnk, client_t, link);
     652                atomic_inc(&client->refcnt);
     653        } else if (create) {
     654                client = malloc(sizeof(client_t));
     655                if (client) {
     656                        client->in_task_id = client_id;
     657                        client->data = async_client_data_create();
     658               
     659                        atomic_set(&client->refcnt, 1);
     660                        hash_table_insert(&client_hash_table, key, &client->link);
     661                }
     662        }
     663
     664        futex_up(&async_futex);
     665        return client;
     666}
     667
     668static void async_client_put(client_t *client)
     669{
     670        bool destroy;
     671        unsigned long key[2] = {
     672                LOWER32(client->in_task_id),
     673                UPPER32(client->in_task_id)
     674        };
     675       
     676        futex_down(&async_futex);
     677       
     678        if (atomic_predec(&client->refcnt) == 0) {
     679                hash_table_remove(&client_hash_table, key, 2);
     680                destroy = true;
     681        } else
     682                destroy = false;
     683       
     684        futex_up(&async_futex);
     685       
     686        if (destroy) {
     687                if (client->data)
     688                        async_client_data_destroy(client->data);
     689               
     690                free(client);
     691        }
     692}
     693
     694void *async_get_client_data(void)
     695{
     696        assert(fibril_connection);
     697        return fibril_connection->client->data;
     698}
     699
     700void *async_get_client_data_by_id(task_id_t client_id)
     701{
     702        client_t *client = async_client_get(client_id, false);
     703        if (!client)
     704                return NULL;
     705        if (!client->data) {
     706                async_client_put(client);
     707                return NULL;
     708        }
     709
     710        return client->data;
     711}
     712
     713void async_put_client_data_by_id(task_id_t client_id)
     714{
     715        client_t *client = async_client_get(client_id, false);
     716
     717        assert(client);
     718        assert(client->data);
     719
     720        /* Drop the reference we got in async_get_client_data_by_hash(). */
     721        async_client_put(client);
     722
     723        /* Drop our own reference we got at the beginning of this function. */
     724        async_client_put(client);
     725}
     726
    582727/** Wrapper for client connection fibril.
    583728 *
     
    598743         */
    599744        fibril_connection = (connection_t *) arg;
    600        
    601         futex_down(&async_futex);
    602745       
    603746        /*
     
    606749         * hash in a new tracking structure.
    607750         */
    608        
    609         unsigned long key = fibril_connection->in_task_hash;
    610         link_t *lnk = hash_table_find(&client_hash_table, &key);
    611        
    612         client_t *client;
    613        
    614         if (lnk) {
    615                 client = hash_table_get_instance(lnk, client_t, link);
    616                 atomic_inc(&client->refcnt);
    617         } else {
    618                 client = malloc(sizeof(client_t));
    619                 if (!client) {
    620                         ipc_answer_0(fibril_connection->callid, ENOMEM);
    621                         futex_up(&async_futex);
    622                         return 0;
    623                 }
    624                
    625                 client->in_task_hash = fibril_connection->in_task_hash;
    626                 client->data = async_client_data_create();
    627                
    628                 atomic_set(&client->refcnt, 1);
    629                 hash_table_insert(&client_hash_table, &key, &client->link);
    630         }
    631        
    632         futex_up(&async_futex);
    633        
     751
     752        client_t *client = async_client_get(fibril_connection->in_task_id, true);
     753        if (!client) {
     754                ipc_answer_0(fibril_connection->callid, ENOMEM);
     755                return 0;
     756        }
     757
    634758        fibril_connection->client = client;
    635759       
     
    643767         * Remove the reference for this client task connection.
    644768         */
    645         bool destroy;
    646        
    647         futex_down(&async_futex);
    648        
    649         if (atomic_predec(&client->refcnt) == 0) {
    650                 hash_table_remove(&client_hash_table, &key, 1);
    651                 destroy = true;
    652         } else
    653                 destroy = false;
    654        
    655         futex_up(&async_futex);
    656        
    657         if (destroy) {
    658                 if (client->data)
    659                         async_client_data_destroy(client->data);
    660                
    661                 free(client);
    662         }
     769        async_client_put(client);
    663770       
    664771        /*
     
    666773         */
    667774        futex_down(&async_futex);
    668         key = fibril_connection->in_phone_hash;
     775        unsigned long key = fibril_connection->in_phone_hash;
    669776        hash_table_remove(&conn_hash_table, &key, 1);
    670777        futex_up(&async_futex);
     
    700807 * particular fibrils.
    701808 *
    702  * @param in_task_hash  Identification of the incoming connection.
     809 * @param in_task_id    Identification of the incoming connection.
    703810 * @param in_phone_hash Identification of the incoming connection.
    704811 * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
     
    709816 * @param cfibril       Fibril function that should be called upon opening the
    710817 *                      connection.
    711  * @param carg          Extra argument to pass to the connection fibril
     818 * @param carg          Extra argument to pass to the connection fibril
    712819 *
    713820 * @return New fibril id or NULL on failure.
    714821 *
    715822 */
    716 fid_t async_new_connection(sysarg_t in_task_hash, sysarg_t in_phone_hash,
     823fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
    717824    ipc_callid_t callid, ipc_call_t *call,
    718825    async_client_conn_t cfibril, void *carg)
     
    726833        }
    727834       
    728         conn->in_task_hash = in_task_hash;
     835        conn->in_task_id = in_task_id;
    729836        conn->in_phone_hash = in_phone_hash;
    730837        list_initialize(&conn->msg_queue);
     
    785892        case IPC_M_CONNECT_ME_TO:
    786893                /* Open new connection with fibril, etc. */
    787                 async_new_connection(call->in_task_hash, IPC_GET_ARG5(*call),
     894                async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
    788895                    callid, call, client_connection, NULL);
    789896                return;
     
    9331040{
    9341041        if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS,
    935             1, &client_hash_table_ops))
     1042            2, &client_hash_table_ops))
    9361043                abort();
    9371044       
     
    9491056        session_ns->arg2 = 0;
    9501057        session_ns->arg3 = 0;
     1058       
     1059        fibril_mutex_initialize(&session_ns->remote_state_mtx);
     1060        session_ns->remote_state_data = NULL;
    9511061       
    9521062        list_initialize(&session_ns->exch_list);
     
    14261536                return ENOENT;
    14271537       
    1428         sysarg_t task_hash;
    14291538        sysarg_t phone_hash;
    1430         int rc = async_req_3_5(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
    1431             NULL, NULL, NULL, &task_hash, &phone_hash);
     1539        sysarg_t rc;
     1540
     1541        aid_t req;
     1542        ipc_call_t answer;
     1543        req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
     1544            &answer);
     1545        async_wait_for(req, &rc);
    14321546        if (rc != EOK)
    1433                 return rc;
    1434        
     1547                return (int) rc;
     1548
     1549        phone_hash = IPC_GET_ARG5(answer);
     1550
    14351551        if (client_receiver != NULL)
    1436                 async_new_connection(task_hash, phone_hash, 0, NULL,
     1552                async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
    14371553                    client_receiver, carg);
    14381554       
     
    15091625        sess->arg3 = 0;
    15101626       
     1627        fibril_mutex_initialize(&sess->remote_state_mtx);
     1628        sess->remote_state_data = NULL;
     1629       
    15111630        list_initialize(&sess->exch_list);
    15121631        fibril_mutex_initialize(&sess->mutex);
     
    15901709        sess->arg3 = arg3;
    15911710       
     1711        fibril_mutex_initialize(&sess->remote_state_mtx);
     1712        sess->remote_state_data = NULL;
     1713       
    15921714        list_initialize(&sess->exch_list);
    15931715        fibril_mutex_initialize(&sess->mutex);
     
    15951717       
    15961718        return sess;
     1719}
     1720
     1721/** Set arguments for new connections.
     1722 *
     1723 * FIXME This is an ugly hack to work around the problem that parallel
     1724 * exchanges are implemented using parallel connections. When we create
     1725 * a callback session, the framework does not know arguments for the new
     1726 * connections.
     1727 *
     1728 * The proper solution seems to be to implement parallel exchanges using
     1729 * tagging.
     1730 */
     1731void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2,
     1732    sysarg_t arg3)
     1733{
     1734        sess->arg1 = arg1;
     1735        sess->arg2 = arg2;
     1736        sess->arg3 = arg3;
    15971737}
    15981738
     
    16401780        sess->arg3 = arg3;
    16411781       
     1782        fibril_mutex_initialize(&sess->remote_state_mtx);
     1783        sess->remote_state_data = NULL;
     1784       
    16421785        list_initialize(&sess->exch_list);
    16431786        fibril_mutex_initialize(&sess->mutex);
     
    16711814        sess->arg3 = 0;
    16721815       
     1816        fibril_mutex_initialize(&sess->remote_state_mtx);
     1817        sess->remote_state_data = NULL;
     1818       
    16731819        list_initialize(&sess->exch_list);
    16741820        fibril_mutex_initialize(&sess->mutex);
     
    16921838int async_hangup(async_sess_t *sess)
    16931839{
     1840        async_exch_t *exch;
     1841       
    16941842        assert(sess);
    16951843       
    16961844        if (atomic_get(&sess->refcnt) > 0)
    16971845                return EBUSY;
     1846       
     1847        fibril_mutex_lock(&async_sess_mutex);
    16981848       
    16991849        int rc = async_hangup_internal(sess->phone);
    17001850        if (rc == EOK)
    17011851                free(sess);
     1852       
     1853        while (!list_empty(&sess->exch_list)) {
     1854                exch = (async_exch_t *)
     1855                    list_get_instance(list_first(&sess->exch_list),
     1856                    async_exch_t, sess_link);
     1857               
     1858                list_remove(&exch->sess_link);
     1859                list_remove(&exch->global_link);
     1860                async_hangup_internal(exch->phone);
     1861                free(exch);
     1862        }
     1863       
     1864        fibril_mutex_unlock(&async_sess_mutex);
    17021865       
    17031866        return rc;
     
    23342497        sess->arg3 = 0;
    23352498       
     2499        fibril_mutex_initialize(&sess->remote_state_mtx);
     2500        sess->remote_state_data = NULL;
     2501       
    23362502        list_initialize(&sess->exch_list);
    23372503        fibril_mutex_initialize(&sess->mutex);
     
    23802546        sess->arg3 = 0;
    23812547       
     2548        fibril_mutex_initialize(&sess->remote_state_mtx);
     2549        sess->remote_state_data = NULL;
     2550       
    23822551        list_initialize(&sess->exch_list);
    23832552        fibril_mutex_initialize(&sess->mutex);
     
    24222591        sess->arg3 = 0;
    24232592       
     2593        fibril_mutex_initialize(&sess->remote_state_mtx);
     2594        sess->remote_state_data = NULL;
     2595       
    24242596        list_initialize(&sess->exch_list);
    24252597        fibril_mutex_initialize(&sess->mutex);
     
    24292601}
    24302602
     2603int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
     2604    sysarg_t arg3, async_exch_t *other_exch)
     2605{
     2606        return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
     2607            arg1, arg2, arg3, 0, other_exch->phone);
     2608}
     2609
     2610bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1,
     2611    sysarg_t *arg2, sysarg_t *arg3)
     2612{
     2613        assert(callid);
     2614
     2615        ipc_call_t call;
     2616        *callid = async_get_call(&call);
     2617
     2618        if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
     2619                return false;
     2620       
     2621        if (arg1)
     2622                *arg1 = IPC_GET_ARG1(call);
     2623        if (arg2)
     2624                *arg2 = IPC_GET_ARG2(call);
     2625        if (arg3)
     2626                *arg3 = IPC_GET_ARG3(call);
     2627
     2628        return true;
     2629}
     2630
     2631int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch)
     2632{
     2633        return ipc_answer_1(callid, EOK, other_exch->phone);
     2634}
     2635
     2636/** Lock and get session remote state
     2637 *
     2638 * Lock and get the local replica of the remote state
     2639 * in stateful sessions. The call should be paired
     2640 * with async_remote_state_release*().
     2641 *
     2642 * @param[in] sess Stateful session.
     2643 *
     2644 * @return Local replica of the remote state.
     2645 *
     2646 */
     2647void *async_remote_state_acquire(async_sess_t *sess)
     2648{
     2649        fibril_mutex_lock(&sess->remote_state_mtx);
     2650        return sess->remote_state_data;
     2651}
     2652
     2653/** Update the session remote state
     2654 *
     2655 * Update the local replica of the remote state
     2656 * in stateful sessions. The remote state must
     2657 * be already locked.
     2658 *
     2659 * @param[in] sess  Stateful session.
     2660 * @param[in] state New local replica of the remote state.
     2661 *
     2662 */
     2663void async_remote_state_update(async_sess_t *sess, void *state)
     2664{
     2665        assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
     2666        sess->remote_state_data = state;
     2667}
     2668
     2669/** Release the session remote state
     2670 *
     2671 * Unlock the local replica of the remote state
     2672 * in stateful sessions.
     2673 *
     2674 * @param[in] sess Stateful session.
     2675 *
     2676 */
     2677void async_remote_state_release(async_sess_t *sess)
     2678{
     2679        assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
     2680       
     2681        fibril_mutex_unlock(&sess->remote_state_mtx);
     2682}
     2683
     2684/** Release the session remote state and end an exchange
     2685 *
     2686 * Unlock the local replica of the remote state
     2687 * in stateful sessions. This is convenience function
     2688 * which gets the session pointer from the exchange
     2689 * and also ends the exchange.
     2690 *
     2691 * @param[in] exch Stateful session's exchange.
     2692 *
     2693 */
     2694void async_remote_state_release_exchange(async_exch_t *exch)
     2695{
     2696        if (exch == NULL)
     2697                return;
     2698       
     2699        async_sess_t *sess = exch->sess;
     2700        assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
     2701       
     2702        async_exchange_end(exch);
     2703        fibril_mutex_unlock(&sess->remote_state_mtx);
     2704}
     2705
    24312706/** @}
    24322707 */
Note: See TracChangeset for help on using the changeset viewer.