Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async.c

    r9ef495f r7f9d97f3  
    7777 *   }
    7878 *
    79  *   port_handler(icallid, *icall)
     79 *   my_client_connection(icallid, *icall)
    8080 *   {
    8181 *     if (want_refuse) {
     
    123123        list_t exch_list;
    124124       
    125         /** Session interface */
    126         iface_t iface;
    127        
    128125        /** Exchange management style */
    129126        exch_mgmt_t mgmt;
     
    192189        /** If reply was received. */
    193190        bool done;
    194        
     191
    195192        /** If the message / reply should be discarded on arrival. */
    196193        bool forget;
    197        
     194
    198195        /** If already destroyed. */
    199196        bool destroyed;
     
    235232        /** Identification of the opening call. */
    236233        ipc_callid_t callid;
    237        
    238234        /** Call data of the opening call. */
    239235        ipc_call_t call;
     236        /** Local argument or NULL if none. */
     237        void *carg;
    240238       
    241239        /** Identification of the closing call. */
     
    243241       
    244242        /** Fibril function that will be used to handle the connection. */
    245         async_port_handler_t handler;
    246        
    247         /** Client data */
    248         void *data;
     243        async_client_conn_t cfibril;
    249244} connection_t;
    250 
    251 /** Interface data */
    252 typedef struct {
    253         ht_link_t link;
    254        
    255         /** Interface ID */
    256         iface_t iface;
    257        
    258         /** Futex protecting the hash table */
    259         futex_t futex;
    260        
    261         /** Interface ports */
    262         hash_table_t port_hash_table;
    263        
    264         /** Next available port ID */
    265         port_id_t port_id_avail;
    266 } interface_t;
    267 
    268 /* Port data */
    269 typedef struct {
    270         ht_link_t link;
    271        
    272         /** Port ID */
    273         port_id_t id;
    274        
    275         /** Port connection handler */
    276         async_port_handler_t handler;
    277        
    278         /** Client data */
    279         void *data;
    280 } port_t;
    281245
    282246/* Notification data */
     
    300264{
    301265        struct timeval tv = { 0, 0 };
    302        
     266
    303267        to->inlist = false;
    304268        to->occurred = false;
     
    323287static amsg_t *amsg_create(void)
    324288{
    325         amsg_t *msg = malloc(sizeof(amsg_t));
     289        amsg_t *msg;
     290
     291        msg = malloc(sizeof(amsg_t));
    326292        if (msg) {
    327293                msg->done = false;
     
    332298                awaiter_initialize(&msg->wdata);
    333299        }
    334        
     300
    335301        return msg;
    336302}
     
    369335}
    370336
    371 /** Default fallback fibril function.
    372  *
    373  * This fallback fibril function gets called on incomming
    374  * connections that do not have a specific handler defined.
     337/** Default fibril function that gets called to handle new connection.
     338 *
     339 * This function is defined as a weak symbol - to be redefined in user code.
    375340 *
    376341 * @param callid Hash of the incoming call.
     
    379344 *
    380345 */
    381 static void default_fallback_port_handler(ipc_callid_t callid, ipc_call_t *call,
     346static void default_client_connection(ipc_callid_t callid, ipc_call_t *call,
    382347    void *arg)
    383348{
     
    385350}
    386351
    387 static async_port_handler_t fallback_port_handler =
    388     default_fallback_port_handler;
    389 static void *fallback_port_data = NULL;
    390 
    391 static hash_table_t interface_hash_table;
    392 
    393 static size_t interface_key_hash(void *key)
    394 {
    395         iface_t iface = *(iface_t *) key;
    396         return iface;
    397 }
    398 
    399 static size_t interface_hash(const ht_link_t *item)
    400 {
    401         interface_t *interface = hash_table_get_inst(item, interface_t, link);
    402         return interface_key_hash(&interface->iface);
    403 }
    404 
    405 static bool interface_key_equal(void *key, const ht_link_t *item)
    406 {
    407         iface_t iface = *(iface_t *) key;
    408         interface_t *interface = hash_table_get_inst(item, interface_t, link);
    409         return iface == interface->iface;
    410 }
    411 
    412 /** Operations for the port hash table. */
    413 static hash_table_ops_t interface_hash_table_ops = {
    414         .hash = interface_hash,
    415         .key_hash = interface_key_hash,
    416         .key_equal = interface_key_equal,
    417         .equal = NULL,
    418         .remove_callback = NULL
    419 };
    420 
    421 static size_t port_key_hash(void *key)
    422 {
    423         port_id_t port_id = *(port_id_t *) key;
    424         return port_id;
    425 }
    426 
    427 static size_t port_hash(const ht_link_t *item)
    428 {
    429         port_t *port = hash_table_get_inst(item, port_t, link);
    430         return port_key_hash(&port->id);
    431 }
    432 
    433 static bool port_key_equal(void *key, const ht_link_t *item)
    434 {
    435         port_id_t port_id = *(port_id_t *) key;
    436         port_t *port = hash_table_get_inst(item, port_t, link);
    437         return port_id == port->id;
    438 }
    439 
    440 /** Operations for the port hash table. */
    441 static hash_table_ops_t port_hash_table_ops = {
    442         .hash = port_hash,
    443         .key_hash = port_key_hash,
    444         .key_equal = port_key_equal,
    445         .equal = NULL,
    446         .remove_callback = NULL
    447 };
    448 
    449 static interface_t *async_new_interface(iface_t iface)
    450 {
    451         interface_t *interface =
    452             (interface_t *) malloc(sizeof(interface_t));
    453         if (!interface)
    454                 return NULL;
    455        
    456         bool ret = hash_table_create(&interface->port_hash_table, 0, 0,
    457             &port_hash_table_ops);
    458         if (!ret) {
    459                 free(interface);
    460                 return NULL;
    461         }
    462        
    463         interface->iface = iface;
    464         futex_initialize(&interface->futex, 1);
    465         interface->port_id_avail = 0;
    466        
    467         hash_table_insert(&interface_hash_table, &interface->link);
    468        
    469         return interface;
    470 }
    471 
    472 static port_t *async_new_port(interface_t *interface,
    473     async_port_handler_t handler, void *data)
    474 {
    475         port_t *port = (port_t *) malloc(sizeof(port_t));
    476         if (!port)
    477                 return NULL;
    478        
    479         futex_down(&interface->futex);
    480        
    481         port_id_t id = interface->port_id_avail;
    482         interface->port_id_avail++;
    483        
    484         port->id = id;
    485         port->handler = handler;
    486         port->data = data;
    487        
    488         hash_table_insert(&interface->port_hash_table, &port->link);
    489        
    490         futex_up(&interface->futex);
    491        
    492         return port;
    493 }
    494 
     352static async_client_conn_t client_connection = default_client_connection;
    495353static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE;
    496354
     355/** Setter for client_connection function pointer.
     356 *
     357 * @param conn Function that will implement a new connection fibril.
     358 *
     359 */
     360void async_set_client_connection(async_client_conn_t conn)
     361{
     362        assert(client_connection == default_client_connection);
     363        client_connection = conn;
     364}
     365
    497366/** Set the stack size for the notification handler notification fibrils.
    498367 *
     
    518387 */
    519388static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
    520 
    521 int async_create_port(iface_t iface, async_port_handler_t handler,
    522     void *data, port_id_t *port_id)
    523 {
    524         if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK)
    525                 return EINVAL;
    526        
    527         interface_t *interface;
    528        
    529         futex_down(&async_futex);
    530        
    531         ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
    532         if (link)
    533                 interface = hash_table_get_inst(link, interface_t, link);
    534         else
    535                 interface = async_new_interface(iface);
    536        
    537         if (!interface) {
    538                 futex_up(&async_futex);
    539                 return ENOMEM;
    540         }
    541        
    542         port_t *port = async_new_port(interface, handler, data);
    543         if (!port) {
    544                 futex_up(&async_futex);
    545                 return ENOMEM;
    546         }
    547        
    548         *port_id = port->id;
    549        
    550         futex_up(&async_futex);
    551        
    552         return EOK;
    553 }
    554 
    555 void async_set_fallback_port_handler(async_port_handler_t handler, void *data)
    556 {
    557         assert(handler != NULL);
    558        
    559         fallback_port_handler = handler;
    560         fallback_port_data = data;
    561 }
    562389
    563390static hash_table_t client_hash_table;
     
    630457        .remove_callback = NULL
    631458};
    632 
    633 static client_t *async_client_get(task_id_t client_id, bool create)
    634 {
    635         client_t *client = NULL;
    636        
    637         futex_down(&async_futex);
    638         ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
    639         if (link) {
    640                 client = hash_table_get_inst(link, client_t, link);
    641                 atomic_inc(&client->refcnt);
    642         } else if (create) {
    643                 client = malloc(sizeof(client_t));
    644                 if (client) {
    645                         client->in_task_id = client_id;
    646                         client->data = async_client_data_create();
    647                        
    648                         atomic_set(&client->refcnt, 1);
    649                         hash_table_insert(&client_hash_table, &client->link);
    650                 }
    651         }
    652        
    653         futex_up(&async_futex);
    654         return client;
    655 }
    656 
    657 static void async_client_put(client_t *client)
    658 {
    659         bool destroy;
    660        
    661         futex_down(&async_futex);
    662        
    663         if (atomic_predec(&client->refcnt) == 0) {
    664                 hash_table_remove(&client_hash_table, &client->in_task_id);
    665                 destroy = true;
    666         } else
    667                 destroy = false;
    668        
    669         futex_up(&async_futex);
    670        
    671         if (destroy) {
    672                 if (client->data)
    673                         async_client_data_destroy(client->data);
    674                
    675                 free(client);
    676         }
    677 }
    678 
    679 /** Wrapper for client connection fibril.
    680  *
    681  * When a new connection arrives, a fibril with this implementing
    682  * function is created.
    683  *
    684  * @param arg Connection structure pointer.
    685  *
    686  * @return Always zero.
    687  *
    688  */
    689 static int connection_fibril(void *arg)
    690 {
    691         assert(arg);
    692        
    693         /*
    694          * Setup fibril-local connection pointer.
    695          */
    696         fibril_connection = (connection_t *) arg;
    697        
    698         /*
    699          * Add our reference for the current connection in the client task
    700          * tracking structure. If this is the first reference, create and
    701          * hash in a new tracking structure.
    702          */
    703        
    704         client_t *client = async_client_get(fibril_connection->in_task_id, true);
    705         if (!client) {
    706                 ipc_answer_0(fibril_connection->callid, ENOMEM);
    707                 return 0;
    708         }
    709        
    710         fibril_connection->client = client;
    711        
    712         /*
    713          * Call the connection handler function.
    714          */
    715         fibril_connection->handler(fibril_connection->callid,
    716             &fibril_connection->call, fibril_connection->data);
    717        
    718         /*
    719          * Remove the reference for this client task connection.
    720          */
    721         async_client_put(client);
    722        
    723         /*
    724          * Remove myself from the connection hash table.
    725          */
    726         futex_down(&async_futex);
    727         hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
    728         futex_up(&async_futex);
    729        
    730         /*
    731          * Answer all remaining messages with EHANGUP.
    732          */
    733         while (!list_empty(&fibril_connection->msg_queue)) {
    734                 msg_t *msg =
    735                     list_get_instance(list_first(&fibril_connection->msg_queue),
    736                     msg_t, link);
    737                
    738                 list_remove(&msg->link);
    739                 ipc_answer_0(msg->callid, EHANGUP);
    740                 free(msg);
    741         }
    742        
    743         /*
    744          * If the connection was hung-up, answer the last call,
    745          * i.e. IPC_M_PHONE_HUNGUP.
    746          */
    747         if (fibril_connection->close_callid)
    748                 ipc_answer_0(fibril_connection->close_callid, EOK);
    749        
    750         free(fibril_connection);
    751         return 0;
    752 }
    753 
    754 /** Create a new fibril for a new connection.
    755  *
    756  * Create new fibril for connection, fill in connection structures
    757  * and insert it into the hash table, so that later we can easily
    758  * do routing of messages to particular fibrils.
    759  *
    760  * @param in_task_id    Identification of the incoming connection.
    761  * @param in_phone_hash Identification of the incoming connection.
    762  * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
    763  *                      If callid is zero, the connection was opened by
    764  *                      accepting the IPC_M_CONNECT_TO_ME call and this
    765  *                      function is called directly by the server.
    766  * @param call          Call data of the opening call.
    767  * @param handler       Connection handler.
    768  * @param data          Client argument to pass to the connection handler.
    769  *
    770  * @return New fibril id or NULL on failure.
    771  *
    772  */
    773 static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
    774     ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler,
    775     void *data)
    776 {
    777         connection_t *conn = malloc(sizeof(*conn));
    778         if (!conn) {
    779                 if (callid)
    780                         ipc_answer_0(callid, ENOMEM);
    781                
    782                 return (uintptr_t) NULL;
    783         }
    784        
    785         conn->in_task_id = in_task_id;
    786         conn->in_phone_hash = in_phone_hash;
    787         list_initialize(&conn->msg_queue);
    788         conn->callid = callid;
    789         conn->close_callid = 0;
    790         conn->handler = handler;
    791         conn->data = data;
    792        
    793         if (call)
    794                 conn->call = *call;
    795        
    796         /* We will activate the fibril ASAP */
    797         conn->wdata.active = true;
    798         conn->wdata.fid = fibril_create(connection_fibril, conn);
    799        
    800         if (conn->wdata.fid == 0) {
    801                 free(conn);
    802                
    803                 if (callid)
    804                         ipc_answer_0(callid, ENOMEM);
    805                
    806                 return (uintptr_t) NULL;
    807         }
    808        
    809         /* Add connection to the connection hash table */
    810        
    811         futex_down(&async_futex);
    812         hash_table_insert(&conn_hash_table, &conn->link);
    813         futex_up(&async_futex);
    814        
    815         fibril_add_ready(conn->wdata.fid);
    816        
    817         return conn->wdata.fid;
    818 }
    819 
    820 /** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
    821  *
    822  * Ask through phone for a new connection to some service.
    823  *
    824  * @param exch    Exchange for sending the message.
    825  * @param iface   Callback interface.
    826  * @param arg1    User defined argument.
    827  * @param arg2    User defined argument.
    828  * @param handler Callback handler.
    829  * @param data    Handler data.
    830  * @param port_id ID of the newly created port.
    831  *
    832  * @return Zero on success or a negative error code.
    833  *
    834  */
    835 int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1,
    836     sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id)
    837 {
    838         if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK)
    839                 return EINVAL;
    840        
    841         if (exch == NULL)
    842                 return ENOENT;
    843        
    844         ipc_call_t answer;
    845         aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2,
    846             &answer);
    847        
    848         sysarg_t ret;
    849         async_wait_for(req, &ret);
    850         if (ret != EOK)
    851                 return (int) ret;
    852        
    853         sysarg_t phone_hash = IPC_GET_ARG5(answer);
    854         interface_t *interface;
    855        
    856         futex_down(&async_futex);
    857        
    858         ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
    859         if (link)
    860                 interface = hash_table_get_inst(link, interface_t, link);
    861         else
    862                 interface = async_new_interface(iface);
    863        
    864         if (!interface) {
    865                 futex_up(&async_futex);
    866                 return ENOMEM;
    867         }
    868        
    869         port_t *port = async_new_port(interface, handler, data);
    870         if (!port) {
    871                 futex_up(&async_futex);
    872                 return ENOMEM;
    873         }
    874        
    875         *port_id = port->id;
    876        
    877         futex_up(&async_futex);
    878        
    879         fid_t fid = async_new_connection(answer.in_task_id, phone_hash,
    880             0, NULL, handler, data);
    881         if (fid == (uintptr_t) NULL)
    882                 return ENOMEM;
    883        
    884         return EOK;
    885 }
    886459
    887460static size_t notification_key_hash(void *key)
     
    1293866        }
    1294867       
    1295         msg_t *msg = list_get_instance(list_first(&conn->msg_queue),
    1296             msg_t, link);
     868        msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link);
    1297869        list_remove(&msg->link);
    1298870       
     
    1305877}
    1306878
     879static client_t *async_client_get(task_id_t client_id, bool create)
     880{
     881        client_t *client = NULL;
     882
     883        futex_down(&async_futex);
     884        ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
     885        if (link) {
     886                client = hash_table_get_inst(link, client_t, link);
     887                atomic_inc(&client->refcnt);
     888        } else if (create) {
     889                client = malloc(sizeof(client_t));
     890                if (client) {
     891                        client->in_task_id = client_id;
     892                        client->data = async_client_data_create();
     893               
     894                        atomic_set(&client->refcnt, 1);
     895                        hash_table_insert(&client_hash_table, &client->link);
     896                }
     897        }
     898
     899        futex_up(&async_futex);
     900        return client;
     901}
     902
     903static void async_client_put(client_t *client)
     904{
     905        bool destroy;
     906
     907        futex_down(&async_futex);
     908       
     909        if (atomic_predec(&client->refcnt) == 0) {
     910                hash_table_remove(&client_hash_table, &client->in_task_id);
     911                destroy = true;
     912        } else
     913                destroy = false;
     914       
     915        futex_up(&async_futex);
     916       
     917        if (destroy) {
     918                if (client->data)
     919                        async_client_data_destroy(client->data);
     920               
     921                free(client);
     922        }
     923}
     924
    1307925void *async_get_client_data(void)
    1308926{
     
    1316934        if (!client)
    1317935                return NULL;
    1318        
    1319936        if (!client->data) {
    1320937                async_client_put(client);
    1321938                return NULL;
    1322939        }
    1323        
     940
    1324941        return client->data;
    1325942}
     
    1328945{
    1329946        client_t *client = async_client_get(client_id, false);
    1330        
     947
    1331948        assert(client);
    1332949        assert(client->data);
    1333        
     950
    1334951        /* Drop the reference we got in async_get_client_data_by_hash(). */
    1335952        async_client_put(client);
    1336        
     953
    1337954        /* Drop our own reference we got at the beginning of this function. */
    1338955        async_client_put(client);
    1339956}
    1340957
    1341 static port_t *async_find_port(iface_t iface, port_id_t port_id)
    1342 {
    1343         port_t *port = NULL;
    1344        
     958/** Wrapper for client connection fibril.
     959 *
     960 * When a new connection arrives, a fibril with this implementing function is
     961 * created. It calls client_connection() and does the final cleanup.
     962 *
     963 * @param arg Connection structure pointer.
     964 *
     965 * @return Always zero.
     966 *
     967 */
     968static int connection_fibril(void *arg)
     969{
     970        assert(arg);
     971       
     972        /*
     973         * Setup fibril-local connection pointer.
     974         */
     975        fibril_connection = (connection_t *) arg;
     976       
     977        /*
     978         * Add our reference for the current connection in the client task
     979         * tracking structure. If this is the first reference, create and
     980         * hash in a new tracking structure.
     981         */
     982
     983        client_t *client = async_client_get(fibril_connection->in_task_id, true);
     984        if (!client) {
     985                ipc_answer_0(fibril_connection->callid, ENOMEM);
     986                return 0;
     987        }
     988
     989        fibril_connection->client = client;
     990       
     991        /*
     992         * Call the connection handler function.
     993         */
     994        fibril_connection->cfibril(fibril_connection->callid,
     995            &fibril_connection->call, fibril_connection->carg);
     996       
     997        /*
     998         * Remove the reference for this client task connection.
     999         */
     1000        async_client_put(client);
     1001       
     1002        /*
     1003         * Remove myself from the connection hash table.
     1004         */
    13451005        futex_down(&async_futex);
    1346        
    1347         ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
    1348         if (link) {
    1349                 interface_t *interface =
    1350                     hash_table_get_inst(link, interface_t, link);
     1006        hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
     1007        futex_up(&async_futex);
     1008       
     1009        /*
     1010         * Answer all remaining messages with EHANGUP.
     1011         */
     1012        while (!list_empty(&fibril_connection->msg_queue)) {
     1013                msg_t *msg =
     1014                    list_get_instance(list_first(&fibril_connection->msg_queue),
     1015                    msg_t, link);
    13511016               
    1352                 link = hash_table_find(&interface->port_hash_table, &port_id);
    1353                 if (link)
    1354                         port = hash_table_get_inst(link, port_t, link);
    1355         }
    1356        
     1017                list_remove(&msg->link);
     1018                ipc_answer_0(msg->callid, EHANGUP);
     1019                free(msg);
     1020        }
     1021       
     1022        /*
     1023         * If the connection was hung-up, answer the last call,
     1024         * i.e. IPC_M_PHONE_HUNGUP.
     1025         */
     1026        if (fibril_connection->close_callid)
     1027                ipc_answer_0(fibril_connection->close_callid, EOK);
     1028       
     1029        free(fibril_connection);
     1030        return 0;
     1031}
     1032
     1033/** Create a new fibril for a new connection.
     1034 *
     1035 * Create new fibril for connection, fill in connection structures and insert
     1036 * it into the hash table, so that later we can easily do routing of messages to
     1037 * particular fibrils.
     1038 *
     1039 * @param in_task_id    Identification of the incoming connection.
     1040 * @param in_phone_hash Identification of the incoming connection.
     1041 * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
     1042 *                      If callid is zero, the connection was opened by
     1043 *                      accepting the IPC_M_CONNECT_TO_ME call and this function
     1044 *                      is called directly by the server.
     1045 * @param call          Call data of the opening call.
     1046 * @param cfibril       Fibril function that should be called upon opening the
     1047 *                      connection.
     1048 * @param carg          Extra argument to pass to the connection fibril
     1049 *
     1050 * @return New fibril id or NULL on failure.
     1051 *
     1052 */
     1053fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
     1054    ipc_callid_t callid, ipc_call_t *call,
     1055    async_client_conn_t cfibril, void *carg)
     1056{
     1057        connection_t *conn = malloc(sizeof(*conn));
     1058        if (!conn) {
     1059                if (callid)
     1060                        ipc_answer_0(callid, ENOMEM);
     1061               
     1062                return (uintptr_t) NULL;
     1063        }
     1064       
     1065        conn->in_task_id = in_task_id;
     1066        conn->in_phone_hash = in_phone_hash;
     1067        list_initialize(&conn->msg_queue);
     1068        conn->callid = callid;
     1069        conn->close_callid = 0;
     1070        conn->carg = carg;
     1071       
     1072        if (call)
     1073                conn->call = *call;
     1074       
     1075        /* We will activate the fibril ASAP */
     1076        conn->wdata.active = true;
     1077        conn->cfibril = cfibril;
     1078        conn->wdata.fid = fibril_create(connection_fibril, conn);
     1079       
     1080        if (conn->wdata.fid == 0) {
     1081                free(conn);
     1082               
     1083                if (callid)
     1084                        ipc_answer_0(callid, ENOMEM);
     1085               
     1086                return (uintptr_t) NULL;
     1087        }
     1088       
     1089        /* Add connection to the connection hash table */
     1090       
     1091        futex_down(&async_futex);
     1092        hash_table_insert(&conn_hash_table, &conn->link);
    13571093        futex_up(&async_futex);
    13581094       
    1359         return port;
     1095        fibril_add_ready(conn->wdata.fid);
     1096       
     1097        return conn->wdata.fid;
    13601098}
    13611099
     
    13731111        assert(call);
    13741112       
    1375         /* Kernel notification */
     1113        /* Unrouted call - take some default action */
    13761114        if ((callid & IPC_CALLID_NOTIFICATION)) {
    13771115                process_notification(callid, call);
     
    13791117        }
    13801118       
    1381         /* New connection */
    1382         if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) {
    1383                 iface_t iface = (iface_t) IPC_GET_ARG1(*call);
    1384                 sysarg_t in_phone_hash = IPC_GET_ARG5(*call);
    1385                
    1386                 async_notification_handler_t handler = fallback_port_handler;
    1387                 void *data = fallback_port_data;
    1388                
    1389                 // TODO: Currently ignores all ports but the first one
    1390                 port_t *port = async_find_port(iface, 0);
    1391                 if (port) {
    1392                         handler = port->handler;
    1393                         data = port->data;
    1394                 }
    1395                
    1396                 async_new_connection(call->in_task_id, in_phone_hash, callid,
    1397                     call, handler, data);
    1398                 return;
    1399         }
    1400        
    1401         /* Cloned connection */
    1402         if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) {
    1403                 // TODO: Currently ignores ports altogether
    1404                
     1119        switch (IPC_GET_IMETHOD(*call)) {
     1120        case IPC_M_CLONE_ESTABLISH:
     1121        case IPC_M_CONNECT_ME_TO:
    14051122                /* Open new connection with fibril, etc. */
    14061123                async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
    1407                     callid, call, fallback_port_handler, fallback_port_data);
     1124                    callid, call, client_connection, NULL);
    14081125                return;
    14091126        }
     
    15661283void __async_init(void)
    15671284{
    1568         if (!hash_table_create(&interface_hash_table, 0, 0,
    1569             &interface_hash_table_ops))
    1570                 abort();
    1571        
    15721285        if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops))
    15731286                abort();
     
    15841297                abort();
    15851298       
    1586         session_ns->iface = 0;
    15871299        session_ns->mgmt = EXCHANGE_ATOMIC;
    15881300        session_ns->phone = PHONE_NS;
     
    16311343       
    16321344        msg->done = true;
    1633        
     1345
    16341346        if (msg->forget) {
    16351347                assert(msg->wdata.active);
     
    16391351                fibril_add_ready(msg->wdata.fid);
    16401352        }
    1641        
     1353
    16421354        futex_up(&async_futex);
    16431355}
     
    17311443       
    17321444        futex_down(&async_futex);
    1733        
     1445
    17341446        assert(!msg->forget);
    17351447        assert(!msg->destroyed);
    1736        
     1448
    17371449        if (msg->done) {
    17381450                futex_up(&async_futex);
     
    17751487       
    17761488        amsg_t *msg = (amsg_t *) amsgid;
    1777        
     1489
    17781490        futex_down(&async_futex);
    1779        
     1491
    17801492        assert(!msg->forget);
    17811493        assert(!msg->destroyed);
    1782        
     1494
    17831495        if (msg->done) {
    17841496                futex_up(&async_futex);
     
    17921504        if (timeout < 0)
    17931505                timeout = 0;
    1794        
     1506
    17951507        getuptime(&msg->wdata.to_event.expires);
    17961508        tv_add_diff(&msg->wdata.to_event.expires, timeout);
     
    18451557{
    18461558        amsg_t *msg = (amsg_t *) amsgid;
    1847        
     1559
    18481560        assert(msg);
    18491561        assert(!msg->forget);
    18501562        assert(!msg->destroyed);
    1851        
     1563
    18521564        futex_down(&async_futex);
    1853        
    18541565        if (msg->done) {
    18551566                amsg_destroy(msg);
     
    18581569                msg->forget = true;
    18591570        }
    1860        
    18611571        futex_up(&async_futex);
    18621572}
     
    21041814 * @param arg2            User defined argument.
    21051815 * @param arg3            User defined argument.
     1816 * @param client_receiver Connection handing routine.
    21061817 *
    21071818 * @return Zero on success or a negative error code.
     
    21091820 */
    21101821int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
    2111     sysarg_t arg3)
     1822    sysarg_t arg3, async_client_conn_t client_receiver, void *carg)
    21121823{
    21131824        if (exch == NULL)
    21141825                return ENOENT;
    21151826       
     1827        sysarg_t phone_hash;
     1828        sysarg_t rc;
     1829
     1830        aid_t req;
    21161831        ipc_call_t answer;
    2117         aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
     1832        req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
    21181833            &answer);
    2119        
    2120         sysarg_t rc;
    21211834        async_wait_for(req, &rc);
    21221835        if (rc != EOK)
    21231836                return (int) rc;
     1837
     1838        phone_hash = IPC_GET_ARG5(answer);
     1839
     1840        if (client_receiver != NULL)
     1841                async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
     1842                    client_receiver, carg);
    21241843       
    21251844        return EOK;
     
    21811900        }
    21821901       
    2183         sess->iface = 0;
    21841902        sess->mgmt = mgmt;
    21851903        sess->phone = phone;
     
    22511969        int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
    22521970            0);
     1971       
    22531972        if (phone < 0) {
    22541973                errno = phone;
     
    22571976        }
    22581977       
    2259         sess->iface = 0;
    22601978        sess->mgmt = mgmt;
    22611979        sess->phone = phone;
     
    22741992}
    22751993
    2276 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
    2277  *
    2278  * Ask through phone for a new connection to some service and block until
    2279  * success.
    2280  *
    2281  * @param exch  Exchange for sending the message.
    2282  * @param iface Connection interface.
    2283  * @param arg2  User defined argument.
    2284  * @param arg3  User defined argument.
    2285  *
    2286  * @return New session on success or NULL on error.
    2287  *
    2288  */
    2289 async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,
    2290     sysarg_t arg2, sysarg_t arg3)
    2291 {
    2292         if (exch == NULL) {
    2293                 errno = ENOENT;
    2294                 return NULL;
    2295         }
    2296        
    2297         async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
    2298         if (sess == NULL) {
    2299                 errno = ENOMEM;
    2300                 return NULL;
    2301         }
    2302        
    2303         int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
    2304             arg3, 0);
    2305         if (phone < 0) {
    2306                 errno = phone;
    2307                 free(sess);
    2308                 return NULL;
    2309         }
    2310        
    2311         sess->iface = iface;
    2312         sess->phone = phone;
    2313         sess->arg1 = iface;
    2314         sess->arg2 = arg2;
    2315         sess->arg3 = arg3;
    2316        
    2317         fibril_mutex_initialize(&sess->remote_state_mtx);
    2318         sess->remote_state_data = NULL;
    2319        
    2320         list_initialize(&sess->exch_list);
    2321         fibril_mutex_initialize(&sess->mutex);
    2322         atomic_set(&sess->refcnt, 0);
    2323        
    2324         return sess;
    2325 }
    2326 
    23271994/** Set arguments for new connections.
    23281995 *
     
    23802047        }
    23812048       
    2382         sess->iface = 0;
    23832049        sess->mgmt = mgmt;
    23842050        sess->phone = phone;
     
    23972063}
    23982064
    2399 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
    2400  *
    2401  * Ask through phone for a new connection to some service and block until
    2402  * success.
    2403  *
    2404  * @param exch  Exchange for sending the message.
    2405  * @param iface Connection interface.
    2406  * @param arg2  User defined argument.
    2407  * @param arg3  User defined argument.
    2408  *
    2409  * @return New session on success or NULL on error.
    2410  *
    2411  */
    2412 async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,
    2413     sysarg_t arg2, sysarg_t arg3)
    2414 {
    2415         if (exch == NULL) {
    2416                 errno = ENOENT;
    2417                 return NULL;
    2418         }
    2419        
    2420         async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
    2421         if (sess == NULL) {
    2422                 errno = ENOMEM;
    2423                 return NULL;
    2424         }
    2425        
    2426         int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
    2427             arg3, IPC_FLAG_BLOCKING);
    2428         if (phone < 0) {
    2429                 errno = phone;
    2430                 free(sess);
    2431                 return NULL;
    2432         }
    2433        
    2434         sess->iface = iface;
    2435         sess->phone = phone;
    2436         sess->arg1 = iface;
    2437         sess->arg2 = arg2;
    2438         sess->arg3 = arg3;
    2439        
    2440         fibril_mutex_initialize(&sess->remote_state_mtx);
    2441         sess->remote_state_data = NULL;
    2442        
    2443         list_initialize(&sess->exch_list);
    2444         fibril_mutex_initialize(&sess->mutex);
    2445         atomic_set(&sess->refcnt, 0);
    2446        
    2447         return sess;
    2448 }
    2449 
    24502065/** Connect to a task specified by id.
    24512066 *
     
    24662081        }
    24672082       
    2468         sess->iface = 0;
    24692083        sess->mgmt = EXCHANGE_ATOMIC;
    24702084        sess->phone = phone;
     
    25442158                return NULL;
    25452159       
    2546         exch_mgmt_t mgmt = sess->mgmt;
    2547         if (sess->iface != 0)
    2548                 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
    2549        
    2550         async_exch_t *exch = NULL;
     2160        async_exch_t *exch;
    25512161       
    25522162        fibril_mutex_lock(&async_sess_mutex);
     
    25672177                 */
    25682178               
    2569                 if ((mgmt == EXCHANGE_ATOMIC) ||
    2570                     (mgmt == EXCHANGE_SERIALIZE)) {
     2179                if ((sess->mgmt == EXCHANGE_ATOMIC) ||
     2180                    (sess->mgmt == EXCHANGE_SERIALIZE)) {
    25712181                        exch = (async_exch_t *) malloc(sizeof(async_exch_t));
    25722182                        if (exch != NULL) {
     
    25762186                                exch->phone = sess->phone;
    25772187                        }
    2578                 } else if (mgmt == EXCHANGE_PARALLEL) {
    2579                         int phone;
    2580                        
    2581                 retry:
     2188                } else {  /* EXCHANGE_PARALLEL */
    25822189                        /*
    25832190                         * Make a one-time attempt to connect a new data phone.
    25842191                         */
     2192                       
     2193                        int phone;
     2194                       
     2195retry:
    25852196                        phone = async_connect_me_to_internal(sess->phone, sess->arg1,
    25862197                            sess->arg2, sess->arg3, 0);
     
    26242235                atomic_inc(&sess->refcnt);
    26252236               
    2626                 if (mgmt == EXCHANGE_SERIALIZE)
     2237                if (sess->mgmt == EXCHANGE_SERIALIZE)
    26272238                        fibril_mutex_lock(&sess->mutex);
    26282239        }
     
    26442255        assert(sess != NULL);
    26452256       
    2646         exch_mgmt_t mgmt = sess->mgmt;
    2647         if (sess->iface != 0)
    2648                 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
    2649        
    26502257        atomic_dec(&sess->refcnt);
    26512258       
    2652         if (mgmt == EXCHANGE_SERIALIZE)
     2259        if (sess->mgmt == EXCHANGE_SERIALIZE)
    26532260                fibril_mutex_unlock(&sess->mutex);
    26542261       
     
    30872694        }
    30882695       
    3089         void *arg_data;
     2696        void *_data;
    30902697       
    30912698        if (nullterm)
    3092                 arg_data = malloc(size + 1);
     2699                _data = malloc(size + 1);
    30932700        else
    3094                 arg_data = malloc(size);
    3095        
    3096         if (arg_data == NULL) {
     2701                _data = malloc(size);
     2702       
     2703        if (_data == NULL) {
    30972704                ipc_answer_0(callid, ENOMEM);
    30982705                return ENOMEM;
    30992706        }
    31002707       
    3101         int rc = async_data_write_finalize(callid, arg_data, size);
     2708        int rc = async_data_write_finalize(callid, _data, size);
    31022709        if (rc != EOK) {
    3103                 free(arg_data);
     2710                free(_data);
    31042711                return rc;
    31052712        }
    31062713       
    31072714        if (nullterm)
    3108                 ((char *) arg_data)[size] = 0;
    3109        
    3110         *data = arg_data;
     2715                ((char *) _data)[size] = 0;
     2716       
     2717        *data = _data;
    31112718        if (received != NULL)
    31122719                *received = size;
     
    32062813        }
    32072814       
    3208         sess->iface = 0;
    32092815        sess->mgmt = mgmt;
    32102816        sess->phone = phone;
     
    32562862        }
    32572863       
    3258         sess->iface = 0;
    32592864        sess->mgmt = mgmt;
    32602865        sess->phone = phone;
     
    33022907                return NULL;
    33032908       
    3304         sess->iface = 0;
    33052909        sess->mgmt = mgmt;
    33062910        sess->phone = phone;
     
    33302934{
    33312935        assert(callid);
    3332        
     2936
    33332937        ipc_call_t call;
    33342938        *callid = async_get_call(&call);
    3335        
     2939
    33362940        if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
    33372941                return false;
     
    33432947        if (arg3)
    33442948                *arg3 = IPC_GET_ARG3(call);
    3345        
     2949
    33462950        return true;
    33472951}
Note: See TracChangeset for help on using the changeset viewer.