Changeset ff381a7 in mainline for uspace/lib/c/generic/async.c


Ignore:
Timestamp:
2015-11-02T20:54:19Z (8 years ago)
Author:
Jiri Svoboda <jiri@…>
Branches:
lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
Children:
d8513177
Parents:
3feeab2 (diff), 5265eea4 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge mainline changes.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async.c

    r3feeab2 rff381a7  
    7777 *   }
    7878 *
    79  *   my_client_connection(icallid, *icall)
     79 *   port_handler(icallid, *icall)
    8080 *   {
    8181 *     if (want_refuse) {
     
    123123        list_t exch_list;
    124124       
     125        /** Session interface */
     126        iface_t iface;
     127       
    125128        /** Exchange management style */
    126129        exch_mgmt_t mgmt;
     
    189192        /** If reply was received. */
    190193        bool done;
    191 
     194       
    192195        /** If the message / reply should be discarded on arrival. */
    193196        bool forget;
    194 
     197       
    195198        /** If already destroyed. */
    196199        bool destroyed;
     
    232235        /** Identification of the opening call. */
    233236        ipc_callid_t callid;
     237       
    234238        /** Call data of the opening call. */
    235239        ipc_call_t call;
    236         /** Local argument or NULL if none. */
    237         void *carg;
    238240       
    239241        /** Identification of the closing call. */
     
    241243       
    242244        /** Fibril function that will be used to handle the connection. */
    243         async_client_conn_t cfibril;
     245        async_port_handler_t handler;
     246       
     247        /** Client data */
     248        void *data;
    244249} connection_t;
     250
     251/** Interface data */
     252typedef struct {
     253        ht_link_t link;
     254       
     255        /** Interface ID */
     256        iface_t iface;
     257       
     258        /** Futex protecting the hash table */
     259        futex_t futex;
     260       
     261        /** Interface ports */
     262        hash_table_t port_hash_table;
     263       
     264        /** Next available port ID */
     265        port_id_t port_id_avail;
     266} interface_t;
     267
     268/* Port data */
     269typedef struct {
     270        ht_link_t link;
     271       
     272        /** Port ID */
     273        port_id_t id;
     274       
     275        /** Port connection handler */
     276        async_port_handler_t handler;
     277       
     278        /** Client data */
     279        void *data;
     280} port_t;
    245281
    246282/* Notification data */
     
    264300{
    265301        struct timeval tv = { 0, 0 };
    266 
     302       
    267303        to->inlist = false;
    268304        to->occurred = false;
     
    287323static amsg_t *amsg_create(void)
    288324{
    289         amsg_t *msg;
    290 
    291         msg = malloc(sizeof(amsg_t));
     325        amsg_t *msg = malloc(sizeof(amsg_t));
    292326        if (msg) {
    293327                msg->done = false;
     
    298332                awaiter_initialize(&msg->wdata);
    299333        }
    300 
     334       
    301335        return msg;
    302336}
     
    335369}
    336370
    337 /** Default fibril function that gets called to handle new connection.
    338  *
    339  * This function is defined as a weak symbol - to be redefined in user code.
     371/** Default fallback fibril function.
     372 *
     373 * This fallback fibril function gets called on incomming
     374 * connections that do not have a specific handler defined.
    340375 *
    341376 * @param callid Hash of the incoming call.
     
    344379 *
    345380 */
    346 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call,
     381static void default_fallback_port_handler(ipc_callid_t callid, ipc_call_t *call,
    347382    void *arg)
    348383{
     
    350385}
    351386
    352 static async_client_conn_t client_connection = default_client_connection;
     387static async_port_handler_t fallback_port_handler =
     388    default_fallback_port_handler;
     389static void *fallback_port_data = NULL;
     390
     391static hash_table_t interface_hash_table;
     392
     393static size_t interface_key_hash(void *key)
     394{
     395        iface_t iface = *(iface_t *) key;
     396        return iface;
     397}
     398
     399static size_t interface_hash(const ht_link_t *item)
     400{
     401        interface_t *interface = hash_table_get_inst(item, interface_t, link);
     402        return interface_key_hash(&interface->iface);
     403}
     404
     405static bool interface_key_equal(void *key, const ht_link_t *item)
     406{
     407        iface_t iface = *(iface_t *) key;
     408        interface_t *interface = hash_table_get_inst(item, interface_t, link);
     409        return iface == interface->iface;
     410}
     411
     412/** Operations for the port hash table. */
     413static hash_table_ops_t interface_hash_table_ops = {
     414        .hash = interface_hash,
     415        .key_hash = interface_key_hash,
     416        .key_equal = interface_key_equal,
     417        .equal = NULL,
     418        .remove_callback = NULL
     419};
     420
     421static size_t port_key_hash(void *key)
     422{
     423        port_id_t port_id = *(port_id_t *) key;
     424        return port_id;
     425}
     426
     427static size_t port_hash(const ht_link_t *item)
     428{
     429        port_t *port = hash_table_get_inst(item, port_t, link);
     430        return port_key_hash(&port->id);
     431}
     432
     433static bool port_key_equal(void *key, const ht_link_t *item)
     434{
     435        port_id_t port_id = *(port_id_t *) key;
     436        port_t *port = hash_table_get_inst(item, port_t, link);
     437        return port_id == port->id;
     438}
     439
     440/** Operations for the port hash table. */
     441static hash_table_ops_t port_hash_table_ops = {
     442        .hash = port_hash,
     443        .key_hash = port_key_hash,
     444        .key_equal = port_key_equal,
     445        .equal = NULL,
     446        .remove_callback = NULL
     447};
     448
     449static interface_t *async_new_interface(iface_t iface)
     450{
     451        interface_t *interface =
     452            (interface_t *) malloc(sizeof(interface_t));
     453        if (!interface)
     454                return NULL;
     455       
     456        bool ret = hash_table_create(&interface->port_hash_table, 0, 0,
     457            &port_hash_table_ops);
     458        if (!ret) {
     459                free(interface);
     460                return NULL;
     461        }
     462       
     463        interface->iface = iface;
     464        futex_initialize(&interface->futex, 1);
     465        interface->port_id_avail = 0;
     466       
     467        hash_table_insert(&interface_hash_table, &interface->link);
     468       
     469        return interface;
     470}
     471
     472static port_t *async_new_port(interface_t *interface,
     473    async_port_handler_t handler, void *data)
     474{
     475        port_t *port = (port_t *) malloc(sizeof(port_t));
     476        if (!port)
     477                return NULL;
     478       
     479        futex_down(&interface->futex);
     480       
     481        port_id_t id = interface->port_id_avail;
     482        interface->port_id_avail++;
     483       
     484        port->id = id;
     485        port->handler = handler;
     486        port->data = data;
     487       
     488        hash_table_insert(&interface->port_hash_table, &port->link);
     489       
     490        futex_up(&interface->futex);
     491       
     492        return port;
     493}
     494
    353495static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE;
    354496
    355 /** Setter for client_connection function pointer.
    356  *
    357  * @param conn Function that will implement a new connection fibril.
    358  *
    359  */
    360 void async_set_client_connection(async_client_conn_t conn)
    361 {
    362         assert(client_connection == default_client_connection);
    363         client_connection = conn;
    364 }
    365 
    366497/** Set the stack size for the notification handler notification fibrils.
    367498 *
     
    387518 */
    388519static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
     520
     521int async_create_port(iface_t iface, async_port_handler_t handler,
     522    void *data, port_id_t *port_id)
     523{
     524        if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK)
     525                return EINVAL;
     526       
     527        interface_t *interface;
     528       
     529        futex_down(&async_futex);
     530       
     531        ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
     532        if (link)
     533                interface = hash_table_get_inst(link, interface_t, link);
     534        else
     535                interface = async_new_interface(iface);
     536       
     537        if (!interface) {
     538                futex_up(&async_futex);
     539                return ENOMEM;
     540        }
     541       
     542        port_t *port = async_new_port(interface, handler, data);
     543        if (!port) {
     544                futex_up(&async_futex);
     545                return ENOMEM;
     546        }
     547       
     548        *port_id = port->id;
     549       
     550        futex_up(&async_futex);
     551       
     552        return EOK;
     553}
     554
     555void async_set_fallback_port_handler(async_port_handler_t handler, void *data)
     556{
     557        assert(handler != NULL);
     558       
     559        fallback_port_handler = handler;
     560        fallback_port_data = data;
     561}
    389562
    390563static hash_table_t client_hash_table;
     
    457630        .remove_callback = NULL
    458631};
     632
     633static client_t *async_client_get(task_id_t client_id, bool create)
     634{
     635        client_t *client = NULL;
     636       
     637        futex_down(&async_futex);
     638        ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
     639        if (link) {
     640                client = hash_table_get_inst(link, client_t, link);
     641                atomic_inc(&client->refcnt);
     642        } else if (create) {
     643                client = malloc(sizeof(client_t));
     644                if (client) {
     645                        client->in_task_id = client_id;
     646                        client->data = async_client_data_create();
     647                       
     648                        atomic_set(&client->refcnt, 1);
     649                        hash_table_insert(&client_hash_table, &client->link);
     650                }
     651        }
     652       
     653        futex_up(&async_futex);
     654        return client;
     655}
     656
     657static void async_client_put(client_t *client)
     658{
     659        bool destroy;
     660       
     661        futex_down(&async_futex);
     662       
     663        if (atomic_predec(&client->refcnt) == 0) {
     664                hash_table_remove(&client_hash_table, &client->in_task_id);
     665                destroy = true;
     666        } else
     667                destroy = false;
     668       
     669        futex_up(&async_futex);
     670       
     671        if (destroy) {
     672                if (client->data)
     673                        async_client_data_destroy(client->data);
     674               
     675                free(client);
     676        }
     677}
     678
     679/** Wrapper for client connection fibril.
     680 *
     681 * When a new connection arrives, a fibril with this implementing
     682 * function is created.
     683 *
     684 * @param arg Connection structure pointer.
     685 *
     686 * @return Always zero.
     687 *
     688 */
     689static int connection_fibril(void *arg)
     690{
     691        assert(arg);
     692       
     693        /*
     694         * Setup fibril-local connection pointer.
     695         */
     696        fibril_connection = (connection_t *) arg;
     697       
     698        /*
     699         * Add our reference for the current connection in the client task
     700         * tracking structure. If this is the first reference, create and
     701         * hash in a new tracking structure.
     702         */
     703       
     704        client_t *client = async_client_get(fibril_connection->in_task_id, true);
     705        if (!client) {
     706                ipc_answer_0(fibril_connection->callid, ENOMEM);
     707                return 0;
     708        }
     709       
     710        fibril_connection->client = client;
     711       
     712        /*
     713         * Call the connection handler function.
     714         */
     715        fibril_connection->handler(fibril_connection->callid,
     716            &fibril_connection->call, fibril_connection->data);
     717       
     718        /*
     719         * Remove the reference for this client task connection.
     720         */
     721        async_client_put(client);
     722       
     723        /*
     724         * Remove myself from the connection hash table.
     725         */
     726        futex_down(&async_futex);
     727        hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
     728        futex_up(&async_futex);
     729       
     730        /*
     731         * Answer all remaining messages with EHANGUP.
     732         */
     733        while (!list_empty(&fibril_connection->msg_queue)) {
     734                msg_t *msg =
     735                    list_get_instance(list_first(&fibril_connection->msg_queue),
     736                    msg_t, link);
     737               
     738                list_remove(&msg->link);
     739                ipc_answer_0(msg->callid, EHANGUP);
     740                free(msg);
     741        }
     742       
     743        /*
     744         * If the connection was hung-up, answer the last call,
     745         * i.e. IPC_M_PHONE_HUNGUP.
     746         */
     747        if (fibril_connection->close_callid)
     748                ipc_answer_0(fibril_connection->close_callid, EOK);
     749       
     750        free(fibril_connection);
     751        return 0;
     752}
     753
     754/** Create a new fibril for a new connection.
     755 *
     756 * Create new fibril for connection, fill in connection structures
     757 * and insert it into the hash table, so that later we can easily
     758 * do routing of messages to particular fibrils.
     759 *
     760 * @param in_task_id    Identification of the incoming connection.
     761 * @param in_phone_hash Identification of the incoming connection.
     762 * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
     763 *                      If callid is zero, the connection was opened by
     764 *                      accepting the IPC_M_CONNECT_TO_ME call and this
     765 *                      function is called directly by the server.
     766 * @param call          Call data of the opening call.
     767 * @param handler       Connection handler.
     768 * @param data          Client argument to pass to the connection handler.
     769 *
     770 * @return New fibril id or NULL on failure.
     771 *
     772 */
     773static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
     774    ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler,
     775    void *data)
     776{
     777        connection_t *conn = malloc(sizeof(*conn));
     778        if (!conn) {
     779                if (callid)
     780                        ipc_answer_0(callid, ENOMEM);
     781               
     782                return (uintptr_t) NULL;
     783        }
     784       
     785        conn->in_task_id = in_task_id;
     786        conn->in_phone_hash = in_phone_hash;
     787        list_initialize(&conn->msg_queue);
     788        conn->callid = callid;
     789        conn->close_callid = 0;
     790        conn->handler = handler;
     791        conn->data = data;
     792       
     793        if (call)
     794                conn->call = *call;
     795       
     796        /* We will activate the fibril ASAP */
     797        conn->wdata.active = true;
     798        conn->wdata.fid = fibril_create(connection_fibril, conn);
     799       
     800        if (conn->wdata.fid == 0) {
     801                free(conn);
     802               
     803                if (callid)
     804                        ipc_answer_0(callid, ENOMEM);
     805               
     806                return (uintptr_t) NULL;
     807        }
     808       
     809        /* Add connection to the connection hash table */
     810       
     811        futex_down(&async_futex);
     812        hash_table_insert(&conn_hash_table, &conn->link);
     813        futex_up(&async_futex);
     814       
     815        fibril_add_ready(conn->wdata.fid);
     816       
     817        return conn->wdata.fid;
     818}
     819
     820/** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
     821 *
     822 * Ask through phone for a new connection to some service.
     823 *
     824 * @param exch    Exchange for sending the message.
     825 * @param iface   Callback interface.
     826 * @param arg1    User defined argument.
     827 * @param arg2    User defined argument.
     828 * @param handler Callback handler.
     829 * @param data    Handler data.
     830 * @param port_id ID of the newly created port.
     831 *
     832 * @return Zero on success or a negative error code.
     833 *
     834 */
     835int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1,
     836    sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id)
     837{
     838        if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK)
     839                return EINVAL;
     840       
     841        if (exch == NULL)
     842                return ENOENT;
     843       
     844        ipc_call_t answer;
     845        aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2,
     846            &answer);
     847       
     848        sysarg_t ret;
     849        async_wait_for(req, &ret);
     850        if (ret != EOK)
     851                return (int) ret;
     852       
     853        sysarg_t phone_hash = IPC_GET_ARG5(answer);
     854        interface_t *interface;
     855       
     856        futex_down(&async_futex);
     857       
     858        ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
     859        if (link)
     860                interface = hash_table_get_inst(link, interface_t, link);
     861        else
     862                interface = async_new_interface(iface);
     863       
     864        if (!interface) {
     865                futex_up(&async_futex);
     866                return ENOMEM;
     867        }
     868       
     869        port_t *port = async_new_port(interface, handler, data);
     870        if (!port) {
     871                futex_up(&async_futex);
     872                return ENOMEM;
     873        }
     874       
     875        *port_id = port->id;
     876       
     877        futex_up(&async_futex);
     878       
     879        fid_t fid = async_new_connection(answer.in_task_id, phone_hash,
     880            0, NULL, handler, data);
     881        if (fid == (uintptr_t) NULL)
     882                return ENOMEM;
     883       
     884        return EOK;
     885}
    459886
    460887static size_t notification_key_hash(void *key)
     
    8661293        }
    8671294       
    868         msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link);
     1295        msg_t *msg = list_get_instance(list_first(&conn->msg_queue),
     1296            msg_t, link);
    8691297        list_remove(&msg->link);
    8701298       
     
    8771305}
    8781306
    879 static client_t *async_client_get(task_id_t client_id, bool create)
    880 {
    881         client_t *client = NULL;
    882 
    883         futex_down(&async_futex);
    884         ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
    885         if (link) {
    886                 client = hash_table_get_inst(link, client_t, link);
    887                 atomic_inc(&client->refcnt);
    888         } else if (create) {
    889                 client = malloc(sizeof(client_t));
    890                 if (client) {
    891                         client->in_task_id = client_id;
    892                         client->data = async_client_data_create();
    893                
    894                         atomic_set(&client->refcnt, 1);
    895                         hash_table_insert(&client_hash_table, &client->link);
    896                 }
    897         }
    898 
    899         futex_up(&async_futex);
    900         return client;
    901 }
    902 
    903 static void async_client_put(client_t *client)
    904 {
    905         bool destroy;
    906 
    907         futex_down(&async_futex);
    908        
    909         if (atomic_predec(&client->refcnt) == 0) {
    910                 hash_table_remove(&client_hash_table, &client->in_task_id);
    911                 destroy = true;
    912         } else
    913                 destroy = false;
    914        
    915         futex_up(&async_futex);
    916        
    917         if (destroy) {
    918                 if (client->data)
    919                         async_client_data_destroy(client->data);
    920                
    921                 free(client);
    922         }
    923 }
    924 
    9251307void *async_get_client_data(void)
    9261308{
     
    9341316        if (!client)
    9351317                return NULL;
     1318       
    9361319        if (!client->data) {
    9371320                async_client_put(client);
    9381321                return NULL;
    9391322        }
    940 
     1323       
    9411324        return client->data;
    9421325}
     
    9451328{
    9461329        client_t *client = async_client_get(client_id, false);
    947 
     1330       
    9481331        assert(client);
    9491332        assert(client->data);
    950 
     1333       
    9511334        /* Drop the reference we got in async_get_client_data_by_hash(). */
    9521335        async_client_put(client);
    953 
     1336       
    9541337        /* Drop our own reference we got at the beginning of this function. */
    9551338        async_client_put(client);
    9561339}
    9571340
    958 /** Wrapper for client connection fibril.
    959  *
    960  * When a new connection arrives, a fibril with this implementing function is
    961  * created. It calls client_connection() and does the final cleanup.
    962  *
    963  * @param arg Connection structure pointer.
    964  *
    965  * @return Always zero.
    966  *
    967  */
    968 static int connection_fibril(void *arg)
    969 {
    970         assert(arg);
    971        
    972         /*
    973          * Setup fibril-local connection pointer.
    974          */
    975         fibril_connection = (connection_t *) arg;
    976        
    977         /*
    978          * Add our reference for the current connection in the client task
    979          * tracking structure. If this is the first reference, create and
    980          * hash in a new tracking structure.
    981          */
    982 
    983         client_t *client = async_client_get(fibril_connection->in_task_id, true);
    984         if (!client) {
    985                 ipc_answer_0(fibril_connection->callid, ENOMEM);
    986                 return 0;
    987         }
    988 
    989         fibril_connection->client = client;
    990        
    991         /*
    992          * Call the connection handler function.
    993          */
    994         fibril_connection->cfibril(fibril_connection->callid,
    995             &fibril_connection->call, fibril_connection->carg);
    996        
    997         /*
    998          * Remove the reference for this client task connection.
    999          */
    1000         async_client_put(client);
    1001        
    1002         /*
    1003          * Remove myself from the connection hash table.
    1004          */
     1341static port_t *async_find_port(iface_t iface, port_id_t port_id)
     1342{
     1343        port_t *port = NULL;
     1344       
    10051345        futex_down(&async_futex);
    1006         hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
     1346       
     1347        ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
     1348        if (link) {
     1349                interface_t *interface =
     1350                    hash_table_get_inst(link, interface_t, link);
     1351               
     1352                link = hash_table_find(&interface->port_hash_table, &port_id);
     1353                if (link)
     1354                        port = hash_table_get_inst(link, port_t, link);
     1355        }
     1356       
    10071357        futex_up(&async_futex);
    10081358       
    1009         /*
    1010          * Answer all remaining messages with EHANGUP.
    1011          */
    1012         while (!list_empty(&fibril_connection->msg_queue)) {
    1013                 msg_t *msg =
    1014                     list_get_instance(list_first(&fibril_connection->msg_queue),
    1015                     msg_t, link);
    1016                
    1017                 list_remove(&msg->link);
    1018                 ipc_answer_0(msg->callid, EHANGUP);
    1019                 free(msg);
    1020         }
    1021        
    1022         /*
    1023          * If the connection was hung-up, answer the last call,
    1024          * i.e. IPC_M_PHONE_HUNGUP.
    1025          */
    1026         if (fibril_connection->close_callid)
    1027                 ipc_answer_0(fibril_connection->close_callid, EOK);
    1028        
    1029         free(fibril_connection);
    1030         return 0;
    1031 }
    1032 
    1033 /** Create a new fibril for a new connection.
    1034  *
    1035  * Create new fibril for connection, fill in connection structures and insert
    1036  * it into the hash table, so that later we can easily do routing of messages to
    1037  * particular fibrils.
    1038  *
    1039  * @param in_task_id    Identification of the incoming connection.
    1040  * @param in_phone_hash Identification of the incoming connection.
    1041  * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
    1042  *                      If callid is zero, the connection was opened by
    1043  *                      accepting the IPC_M_CONNECT_TO_ME call and this function
    1044  *                      is called directly by the server.
    1045  * @param call          Call data of the opening call.
    1046  * @param cfibril       Fibril function that should be called upon opening the
    1047  *                      connection.
    1048  * @param carg          Extra argument to pass to the connection fibril
    1049  *
    1050  * @return New fibril id or NULL on failure.
    1051  *
    1052  */
    1053 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
    1054     ipc_callid_t callid, ipc_call_t *call,
    1055     async_client_conn_t cfibril, void *carg)
    1056 {
    1057         connection_t *conn = malloc(sizeof(*conn));
    1058         if (!conn) {
    1059                 if (callid)
    1060                         ipc_answer_0(callid, ENOMEM);
    1061                
    1062                 return (uintptr_t) NULL;
    1063         }
    1064        
    1065         conn->in_task_id = in_task_id;
    1066         conn->in_phone_hash = in_phone_hash;
    1067         list_initialize(&conn->msg_queue);
    1068         conn->callid = callid;
    1069         conn->close_callid = 0;
    1070         conn->carg = carg;
    1071        
    1072         if (call)
    1073                 conn->call = *call;
    1074        
    1075         /* We will activate the fibril ASAP */
    1076         conn->wdata.active = true;
    1077         conn->cfibril = cfibril;
    1078         conn->wdata.fid = fibril_create(connection_fibril, conn);
    1079        
    1080         if (conn->wdata.fid == 0) {
    1081                 free(conn);
    1082                
    1083                 if (callid)
    1084                         ipc_answer_0(callid, ENOMEM);
    1085                
    1086                 return (uintptr_t) NULL;
    1087         }
    1088        
    1089         /* Add connection to the connection hash table */
    1090        
    1091         futex_down(&async_futex);
    1092         hash_table_insert(&conn_hash_table, &conn->link);
    1093         futex_up(&async_futex);
    1094        
    1095         fibril_add_ready(conn->wdata.fid);
    1096        
    1097         return conn->wdata.fid;
     1359        return port;
    10981360}
    10991361
     
    11111373        assert(call);
    11121374       
    1113         /* Unrouted call - take some default action */
     1375        /* Kernel notification */
    11141376        if ((callid & IPC_CALLID_NOTIFICATION)) {
    11151377                process_notification(callid, call);
     
    11171379        }
    11181380       
    1119         switch (IPC_GET_IMETHOD(*call)) {
    1120         case IPC_M_CLONE_ESTABLISH:
    1121         case IPC_M_CONNECT_ME_TO:
     1381        /* New connection */
     1382        if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) {
     1383                iface_t iface = (iface_t) IPC_GET_ARG1(*call);
     1384                sysarg_t in_phone_hash = IPC_GET_ARG5(*call);
     1385               
     1386                async_notification_handler_t handler = fallback_port_handler;
     1387                void *data = fallback_port_data;
     1388               
     1389                // TODO: Currently ignores all ports but the first one
     1390                port_t *port = async_find_port(iface, 0);
     1391                if (port) {
     1392                        handler = port->handler;
     1393                        data = port->data;
     1394                }
     1395               
     1396                async_new_connection(call->in_task_id, in_phone_hash, callid,
     1397                    call, handler, data);
     1398                return;
     1399        }
     1400       
     1401        /* Cloned connection */
     1402        if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) {
     1403                // TODO: Currently ignores ports altogether
     1404               
    11221405                /* Open new connection with fibril, etc. */
    11231406                async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
    1124                     callid, call, client_connection, NULL);
     1407                    callid, call, fallback_port_handler, fallback_port_data);
    11251408                return;
    11261409        }
     
    12831566void __async_init(void)
    12841567{
     1568        if (!hash_table_create(&interface_hash_table, 0, 0,
     1569            &interface_hash_table_ops))
     1570                abort();
     1571       
    12851572        if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops))
    12861573                abort();
     
    12971584                abort();
    12981585       
     1586        session_ns->iface = 0;
    12991587        session_ns->mgmt = EXCHANGE_ATOMIC;
    13001588        session_ns->phone = PHONE_NS;
     
    13431631       
    13441632        msg->done = true;
    1345 
     1633       
    13461634        if (msg->forget) {
    13471635                assert(msg->wdata.active);
     
    13511639                fibril_add_ready(msg->wdata.fid);
    13521640        }
    1353 
     1641       
    13541642        futex_up(&async_futex);
    13551643}
     
    14431731       
    14441732        futex_down(&async_futex);
    1445 
     1733       
    14461734        assert(!msg->forget);
    14471735        assert(!msg->destroyed);
    1448 
     1736       
    14491737        if (msg->done) {
    14501738                futex_up(&async_futex);
     
    14871775       
    14881776        amsg_t *msg = (amsg_t *) amsgid;
    1489 
     1777       
    14901778        futex_down(&async_futex);
    1491 
     1779       
    14921780        assert(!msg->forget);
    14931781        assert(!msg->destroyed);
    1494 
     1782       
    14951783        if (msg->done) {
    14961784                futex_up(&async_futex);
     
    15041792        if (timeout < 0)
    15051793                timeout = 0;
    1506 
     1794       
    15071795        getuptime(&msg->wdata.to_event.expires);
    15081796        tv_add_diff(&msg->wdata.to_event.expires, timeout);
     
    15571845{
    15581846        amsg_t *msg = (amsg_t *) amsgid;
    1559 
     1847       
    15601848        assert(msg);
    15611849        assert(!msg->forget);
    15621850        assert(!msg->destroyed);
    1563 
     1851       
    15641852        futex_down(&async_futex);
     1853       
    15651854        if (msg->done) {
    15661855                amsg_destroy(msg);
     
    15691858                msg->forget = true;
    15701859        }
     1860       
    15711861        futex_up(&async_futex);
    15721862}
     
    18142104 * @param arg2            User defined argument.
    18152105 * @param arg3            User defined argument.
    1816  * @param client_receiver Connection handing routine.
    18172106 *
    18182107 * @return Zero on success or a negative error code.
     
    18202109 */
    18212110int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
    1822     sysarg_t arg3, async_client_conn_t client_receiver, void *carg)
     2111    sysarg_t arg3)
    18232112{
    18242113        if (exch == NULL)
    18252114                return ENOENT;
    18262115       
    1827         sysarg_t phone_hash;
     2116        ipc_call_t answer;
     2117        aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
     2118            &answer);
     2119       
    18282120        sysarg_t rc;
    1829 
    1830         aid_t req;
    1831         ipc_call_t answer;
    1832         req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
    1833             &answer);
    18342121        async_wait_for(req, &rc);
    18352122        if (rc != EOK)
    18362123                return (int) rc;
    1837 
    1838         phone_hash = IPC_GET_ARG5(answer);
    1839 
    1840         if (client_receiver != NULL)
    1841                 async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
    1842                     client_receiver, carg);
    18432124       
    18442125        return EOK;
     
    19002181        }
    19012182       
     2183        sess->iface = 0;
    19022184        sess->mgmt = mgmt;
    19032185        sess->phone = phone;
     
    19692251        int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
    19702252            0);
    1971        
    19722253        if (phone < 0) {
    19732254                errno = phone;
     
    19762257        }
    19772258       
     2259        sess->iface = 0;
    19782260        sess->mgmt = mgmt;
    19792261        sess->phone = phone;
     
    19922274}
    19932275
     2276/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
     2277 *
     2278 * Ask through phone for a new connection to some service and block until
     2279 * success.
     2280 *
     2281 * @param exch  Exchange for sending the message.
     2282 * @param iface Connection interface.
     2283 * @param arg2  User defined argument.
     2284 * @param arg3  User defined argument.
     2285 *
     2286 * @return New session on success or NULL on error.
     2287 *
     2288 */
     2289async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,
     2290    sysarg_t arg2, sysarg_t arg3)
     2291{
     2292        if (exch == NULL) {
     2293                errno = ENOENT;
     2294                return NULL;
     2295        }
     2296       
     2297        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     2298        if (sess == NULL) {
     2299                errno = ENOMEM;
     2300                return NULL;
     2301        }
     2302       
     2303        int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
     2304            arg3, 0);
     2305        if (phone < 0) {
     2306                errno = phone;
     2307                free(sess);
     2308                return NULL;
     2309        }
     2310       
     2311        sess->iface = iface;
     2312        sess->phone = phone;
     2313        sess->arg1 = iface;
     2314        sess->arg2 = arg2;
     2315        sess->arg3 = arg3;
     2316       
     2317        fibril_mutex_initialize(&sess->remote_state_mtx);
     2318        sess->remote_state_data = NULL;
     2319       
     2320        list_initialize(&sess->exch_list);
     2321        fibril_mutex_initialize(&sess->mutex);
     2322        atomic_set(&sess->refcnt, 0);
     2323       
     2324        return sess;
     2325}
     2326
    19942327/** Set arguments for new connections.
    19952328 *
     
    20472380        }
    20482381       
     2382        sess->iface = 0;
    20492383        sess->mgmt = mgmt;
    20502384        sess->phone = phone;
     
    20632397}
    20642398
     2399/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
     2400 *
     2401 * Ask through phone for a new connection to some service and block until
     2402 * success.
     2403 *
     2404 * @param exch  Exchange for sending the message.
     2405 * @param iface Connection interface.
     2406 * @param arg2  User defined argument.
     2407 * @param arg3  User defined argument.
     2408 *
     2409 * @return New session on success or NULL on error.
     2410 *
     2411 */
     2412async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,
     2413    sysarg_t arg2, sysarg_t arg3)
     2414{
     2415        if (exch == NULL) {
     2416                errno = ENOENT;
     2417                return NULL;
     2418        }
     2419       
     2420        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     2421        if (sess == NULL) {
     2422                errno = ENOMEM;
     2423                return NULL;
     2424        }
     2425       
     2426        int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
     2427            arg3, IPC_FLAG_BLOCKING);
     2428        if (phone < 0) {
     2429                errno = phone;
     2430                free(sess);
     2431                return NULL;
     2432        }
     2433       
     2434        sess->iface = iface;
     2435        sess->phone = phone;
     2436        sess->arg1 = iface;
     2437        sess->arg2 = arg2;
     2438        sess->arg3 = arg3;
     2439       
     2440        fibril_mutex_initialize(&sess->remote_state_mtx);
     2441        sess->remote_state_data = NULL;
     2442       
     2443        list_initialize(&sess->exch_list);
     2444        fibril_mutex_initialize(&sess->mutex);
     2445        atomic_set(&sess->refcnt, 0);
     2446       
     2447        return sess;
     2448}
     2449
    20652450/** Connect to a task specified by id.
    20662451 *
     
    20812466        }
    20822467       
     2468        sess->iface = 0;
    20832469        sess->mgmt = EXCHANGE_ATOMIC;
    20842470        sess->phone = phone;
     
    21582544                return NULL;
    21592545       
    2160         async_exch_t *exch;
     2546        exch_mgmt_t mgmt = sess->mgmt;
     2547        if (sess->iface != 0)
     2548                mgmt = sess->iface & IFACE_EXCHANGE_MASK;
     2549       
     2550        async_exch_t *exch = NULL;
    21612551       
    21622552        fibril_mutex_lock(&async_sess_mutex);
     
    21772567                 */
    21782568               
    2179                 if ((sess->mgmt == EXCHANGE_ATOMIC) ||
    2180                     (sess->mgmt == EXCHANGE_SERIALIZE)) {
     2569                if ((mgmt == EXCHANGE_ATOMIC) ||
     2570                    (mgmt == EXCHANGE_SERIALIZE)) {
    21812571                        exch = (async_exch_t *) malloc(sizeof(async_exch_t));
    21822572                        if (exch != NULL) {
     
    21862576                                exch->phone = sess->phone;
    21872577                        }
    2188                 } else {  /* EXCHANGE_PARALLEL */
     2578                } else if (mgmt == EXCHANGE_PARALLEL) {
     2579                        int phone;
     2580                       
     2581                retry:
    21892582                        /*
    21902583                         * Make a one-time attempt to connect a new data phone.
    21912584                         */
    2192                        
    2193                         int phone;
    2194                        
    2195 retry:
    21962585                        phone = async_connect_me_to_internal(sess->phone, sess->arg1,
    21972586                            sess->arg2, sess->arg3, 0);
     
    22352624                atomic_inc(&sess->refcnt);
    22362625               
    2237                 if (sess->mgmt == EXCHANGE_SERIALIZE)
     2626                if (mgmt == EXCHANGE_SERIALIZE)
    22382627                        fibril_mutex_lock(&sess->mutex);
    22392628        }
     
    22552644        assert(sess != NULL);
    22562645       
     2646        exch_mgmt_t mgmt = sess->mgmt;
     2647        if (sess->iface != 0)
     2648                mgmt = sess->iface & IFACE_EXCHANGE_MASK;
     2649       
    22572650        atomic_dec(&sess->refcnt);
    22582651       
    2259         if (sess->mgmt == EXCHANGE_SERIALIZE)
     2652        if (mgmt == EXCHANGE_SERIALIZE)
    22602653                fibril_mutex_unlock(&sess->mutex);
    22612654       
     
    26943087        }
    26953088       
    2696         void *_data;
     3089        void *arg_data;
    26973090       
    26983091        if (nullterm)
    2699                 _data = malloc(size + 1);
     3092                arg_data = malloc(size + 1);
    27003093        else
    2701                 _data = malloc(size);
    2702        
    2703         if (_data == NULL) {
     3094                arg_data = malloc(size);
     3095       
     3096        if (arg_data == NULL) {
    27043097                ipc_answer_0(callid, ENOMEM);
    27053098                return ENOMEM;
    27063099        }
    27073100       
    2708         int rc = async_data_write_finalize(callid, _data, size);
     3101        int rc = async_data_write_finalize(callid, arg_data, size);
    27093102        if (rc != EOK) {
    2710                 free(_data);
     3103                free(arg_data);
    27113104                return rc;
    27123105        }
    27133106       
    27143107        if (nullterm)
    2715                 ((char *) _data)[size] = 0;
    2716        
    2717         *data = _data;
     3108                ((char *) arg_data)[size] = 0;
     3109       
     3110        *data = arg_data;
    27183111        if (received != NULL)
    27193112                *received = size;
     
    28133206        }
    28143207       
     3208        sess->iface = 0;
    28153209        sess->mgmt = mgmt;
    28163210        sess->phone = phone;
     
    28623256        }
    28633257       
     3258        sess->iface = 0;
    28643259        sess->mgmt = mgmt;
    28653260        sess->phone = phone;
     
    29073302                return NULL;
    29083303       
     3304        sess->iface = 0;
    29093305        sess->mgmt = mgmt;
    29103306        sess->phone = phone;
     
    29343330{
    29353331        assert(callid);
    2936 
     3332       
    29373333        ipc_call_t call;
    29383334        *callid = async_get_call(&call);
    2939 
     3335       
    29403336        if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
    29413337                return false;
     
    29473343        if (arg3)
    29483344                *arg3 = IPC_GET_ARG3(call);
    2949 
     3345       
    29503346        return true;
    29513347}
Note: See TracChangeset for help on using the changeset viewer.