Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async.c

    rc170438 r7f9d97f3  
    7777 *   }
    7878 *
    79  *   port_handler(icallid, *icall)
     79 *   my_client_connection(icallid, *icall)
    8080 *   {
    8181 *     if (want_refuse) {
     
    123123        list_t exch_list;
    124124       
    125         /** Session interface */
    126         iface_t iface;
    127        
    128125        /** Exchange management style */
    129126        exch_mgmt_t mgmt;
     
    192189        /** If reply was received. */
    193190        bool done;
    194        
     191
    195192        /** If the message / reply should be discarded on arrival. */
    196193        bool forget;
    197        
     194
    198195        /** If already destroyed. */
    199196        bool destroyed;
     
    235232        /** Identification of the opening call. */
    236233        ipc_callid_t callid;
    237        
    238234        /** Call data of the opening call. */
    239235        ipc_call_t call;
     236        /** Local argument or NULL if none. */
     237        void *carg;
    240238       
    241239        /** Identification of the closing call. */
     
    243241       
    244242        /** Fibril function that will be used to handle the connection. */
    245         async_port_handler_t handler;
    246        
    247         /** Client data */
    248         void *data;
     243        async_client_conn_t cfibril;
    249244} connection_t;
    250 
    251 /** Interface data */
    252 typedef struct {
    253         ht_link_t link;
    254        
    255         /** Interface ID */
    256         iface_t iface;
    257        
    258         /** Futex protecting the hash table */
    259         futex_t futex;
    260        
    261         /** Interface ports */
    262         hash_table_t port_hash_table;
    263        
    264         /** Next available port ID */
    265         port_id_t port_id_avail;
    266 } interface_t;
    267 
    268 /* Port data */
    269 typedef struct {
    270         ht_link_t link;
    271        
    272         /** Port ID */
    273         port_id_t id;
    274        
    275         /** Port connection handler */
    276         async_port_handler_t handler;
    277        
    278         /** Client data */
    279         void *data;
    280 } port_t;
    281245
    282246/* Notification data */
     
    300264{
    301265        struct timeval tv = { 0, 0 };
    302        
     266
    303267        to->inlist = false;
    304268        to->occurred = false;
     
    323287static amsg_t *amsg_create(void)
    324288{
    325         amsg_t *msg = malloc(sizeof(amsg_t));
     289        amsg_t *msg;
     290
     291        msg = malloc(sizeof(amsg_t));
    326292        if (msg) {
    327293                msg->done = false;
     
    332298                awaiter_initialize(&msg->wdata);
    333299        }
    334        
     300
    335301        return msg;
    336302}
     
    369335}
    370336
    371 /** Default fallback fibril function.
    372  *
    373  * This fallback fibril function gets called on incomming
    374  * connections that do not have a specific handler defined.
     337/** Default fibril function that gets called to handle new connection.
     338 *
     339 * This function is defined as a weak symbol - to be redefined in user code.
    375340 *
    376341 * @param callid Hash of the incoming call.
     
    379344 *
    380345 */
    381 static void default_fallback_port_handler(ipc_callid_t callid, ipc_call_t *call,
     346static void default_client_connection(ipc_callid_t callid, ipc_call_t *call,
    382347    void *arg)
    383348{
     
    385350}
    386351
    387 static async_port_handler_t fallback_port_handler =
    388     default_fallback_port_handler;
    389 static void *fallback_port_data = NULL;
    390 
    391 static hash_table_t interface_hash_table;
    392 
    393 static size_t interface_key_hash(void *key)
    394 {
    395         iface_t iface = *(iface_t *) key;
    396         return iface;
    397 }
    398 
    399 static size_t interface_hash(const ht_link_t *item)
    400 {
    401         interface_t *interface = hash_table_get_inst(item, interface_t, link);
    402         return interface_key_hash(&interface->iface);
    403 }
    404 
    405 static bool interface_key_equal(void *key, const ht_link_t *item)
    406 {
    407         iface_t iface = *(iface_t *) key;
    408         interface_t *interface = hash_table_get_inst(item, interface_t, link);
    409         return iface == interface->iface;
    410 }
    411 
    412 /** Operations for the port hash table. */
    413 static hash_table_ops_t interface_hash_table_ops = {
    414         .hash = interface_hash,
    415         .key_hash = interface_key_hash,
    416         .key_equal = interface_key_equal,
    417         .equal = NULL,
    418         .remove_callback = NULL
    419 };
    420 
    421 static size_t port_key_hash(void *key)
    422 {
    423         port_id_t port_id = *(port_id_t *) key;
    424         return port_id;
    425 }
    426 
    427 static size_t port_hash(const ht_link_t *item)
    428 {
    429         port_t *port = hash_table_get_inst(item, port_t, link);
    430         return port_key_hash(&port->id);
    431 }
    432 
    433 static bool port_key_equal(void *key, const ht_link_t *item)
    434 {
    435         port_id_t port_id = *(port_id_t *) key;
    436         port_t *port = hash_table_get_inst(item, port_t, link);
    437         return port_id == port->id;
    438 }
    439 
    440 /** Operations for the port hash table. */
    441 static hash_table_ops_t port_hash_table_ops = {
    442         .hash = port_hash,
    443         .key_hash = port_key_hash,
    444         .key_equal = port_key_equal,
    445         .equal = NULL,
    446         .remove_callback = NULL
    447 };
    448 
    449 static interface_t *async_new_interface(iface_t iface)
    450 {
    451         interface_t *interface =
    452             (interface_t *) malloc(sizeof(interface_t));
    453         if (!interface)
    454                 return NULL;
    455        
    456         bool ret = hash_table_create(&interface->port_hash_table, 0, 0,
    457             &port_hash_table_ops);
    458         if (!ret) {
    459                 free(interface);
    460                 return NULL;
    461         }
    462        
    463         interface->iface = iface;
    464         futex_initialize(&interface->futex, 1);
    465         interface->port_id_avail = 0;
    466        
    467         hash_table_insert(&interface_hash_table, &interface->link);
    468        
    469         return interface;
    470 }
    471 
    472 static port_t *async_new_port(interface_t *interface,
    473     async_port_handler_t handler, void *data)
    474 {
    475         port_t *port = (port_t *) malloc(sizeof(port_t));
    476         if (!port)
    477                 return NULL;
    478        
    479         futex_down(&interface->futex);
    480        
    481         port_id_t id = interface->port_id_avail;
    482         interface->port_id_avail++;
    483        
    484         port->id = id;
    485         port->handler = handler;
    486         port->data = data;
    487        
    488         hash_table_insert(&interface->port_hash_table, &port->link);
    489        
    490         futex_up(&interface->futex);
    491        
    492         return port;
     352static async_client_conn_t client_connection = default_client_connection;
     353static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE;
     354
     355/** Setter for client_connection function pointer.
     356 *
     357 * @param conn Function that will implement a new connection fibril.
     358 *
     359 */
     360void async_set_client_connection(async_client_conn_t conn)
     361{
     362        assert(client_connection == default_client_connection);
     363        client_connection = conn;
     364}
     365
     366/** Set the stack size for the notification handler notification fibrils.
     367 *
     368 * @param size Stack size in bytes.
     369 */
     370void async_set_notification_handler_stack_size(size_t size)
     371{
     372        notification_handler_stksz = size;
    493373}
    494374
     
    507387 */
    508388static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
    509 
    510 int async_create_port(iface_t iface, async_port_handler_t handler,
    511     void *data, port_id_t *port_id)
    512 {
    513         if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK)
    514                 return EINVAL;
    515        
    516         interface_t *interface;
    517        
    518         futex_down(&async_futex);
    519        
    520         ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
    521         if (link)
    522                 interface = hash_table_get_inst(link, interface_t, link);
    523         else
    524                 interface = async_new_interface(iface);
    525        
    526         if (!interface) {
    527                 futex_up(&async_futex);
    528                 return ENOMEM;
    529         }
    530        
    531         port_t *port = async_new_port(interface, handler, data);
    532         if (!port) {
    533                 futex_up(&async_futex);
    534                 return ENOMEM;
    535         }
    536        
    537         *port_id = port->id;
    538        
    539         futex_up(&async_futex);
    540        
    541         return EOK;
    542 }
    543 
    544 void async_set_fallback_port_handler(async_port_handler_t handler, void *data)
    545 {
    546         assert(handler != NULL);
    547        
    548         fallback_port_handler = handler;
    549         fallback_port_data = data;
    550 }
    551389
    552390static hash_table_t client_hash_table;
     
    619457        .remove_callback = NULL
    620458};
    621 
    622 static client_t *async_client_get(task_id_t client_id, bool create)
    623 {
    624         client_t *client = NULL;
    625        
    626         futex_down(&async_futex);
    627         ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
    628         if (link) {
    629                 client = hash_table_get_inst(link, client_t, link);
    630                 atomic_inc(&client->refcnt);
    631         } else if (create) {
    632                 client = malloc(sizeof(client_t));
    633                 if (client) {
    634                         client->in_task_id = client_id;
    635                         client->data = async_client_data_create();
    636                        
    637                         atomic_set(&client->refcnt, 1);
    638                         hash_table_insert(&client_hash_table, &client->link);
    639                 }
    640         }
    641        
    642         futex_up(&async_futex);
    643         return client;
    644 }
    645 
    646 static void async_client_put(client_t *client)
    647 {
    648         bool destroy;
    649        
    650         futex_down(&async_futex);
    651        
    652         if (atomic_predec(&client->refcnt) == 0) {
    653                 hash_table_remove(&client_hash_table, &client->in_task_id);
    654                 destroy = true;
    655         } else
    656                 destroy = false;
    657        
    658         futex_up(&async_futex);
    659        
    660         if (destroy) {
    661                 if (client->data)
    662                         async_client_data_destroy(client->data);
    663                
    664                 free(client);
    665         }
    666 }
    667 
    668 /** Wrapper for client connection fibril.
    669  *
    670  * When a new connection arrives, a fibril with this implementing
    671  * function is created.
    672  *
    673  * @param arg Connection structure pointer.
    674  *
    675  * @return Always zero.
    676  *
    677  */
    678 static int connection_fibril(void *arg)
    679 {
    680         assert(arg);
    681        
    682         /*
    683          * Setup fibril-local connection pointer.
    684          */
    685         fibril_connection = (connection_t *) arg;
    686        
    687         /*
    688          * Add our reference for the current connection in the client task
    689          * tracking structure. If this is the first reference, create and
    690          * hash in a new tracking structure.
    691          */
    692        
    693         client_t *client = async_client_get(fibril_connection->in_task_id, true);
    694         if (!client) {
    695                 ipc_answer_0(fibril_connection->callid, ENOMEM);
    696                 return 0;
    697         }
    698        
    699         fibril_connection->client = client;
    700        
    701         /*
    702          * Call the connection handler function.
    703          */
    704         fibril_connection->handler(fibril_connection->callid,
    705             &fibril_connection->call, fibril_connection->data);
    706        
    707         /*
    708          * Remove the reference for this client task connection.
    709          */
    710         async_client_put(client);
    711        
    712         /*
    713          * Remove myself from the connection hash table.
    714          */
    715         futex_down(&async_futex);
    716         hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
    717         futex_up(&async_futex);
    718        
    719         /*
    720          * Answer all remaining messages with EHANGUP.
    721          */
    722         while (!list_empty(&fibril_connection->msg_queue)) {
    723                 msg_t *msg =
    724                     list_get_instance(list_first(&fibril_connection->msg_queue),
    725                     msg_t, link);
    726                
    727                 list_remove(&msg->link);
    728                 ipc_answer_0(msg->callid, EHANGUP);
    729                 free(msg);
    730         }
    731        
    732         /*
    733          * If the connection was hung-up, answer the last call,
    734          * i.e. IPC_M_PHONE_HUNGUP.
    735          */
    736         if (fibril_connection->close_callid)
    737                 ipc_answer_0(fibril_connection->close_callid, EOK);
    738        
    739         free(fibril_connection);
    740         return 0;
    741 }
    742 
    743 /** Create a new fibril for a new connection.
    744  *
    745  * Create new fibril for connection, fill in connection structures
    746  * and insert it into the hash table, so that later we can easily
    747  * do routing of messages to particular fibrils.
    748  *
    749  * @param in_task_id    Identification of the incoming connection.
    750  * @param in_phone_hash Identification of the incoming connection.
    751  * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
    752  *                      If callid is zero, the connection was opened by
    753  *                      accepting the IPC_M_CONNECT_TO_ME call and this
    754  *                      function is called directly by the server.
    755  * @param call          Call data of the opening call.
    756  * @param handler       Connection handler.
    757  * @param data          Client argument to pass to the connection handler.
    758  *
    759  * @return New fibril id or NULL on failure.
    760  *
    761  */
    762 static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
    763     ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler,
    764     void *data)
    765 {
    766         connection_t *conn = malloc(sizeof(*conn));
    767         if (!conn) {
    768                 if (callid)
    769                         ipc_answer_0(callid, ENOMEM);
    770                
    771                 return (uintptr_t) NULL;
    772         }
    773        
    774         conn->in_task_id = in_task_id;
    775         conn->in_phone_hash = in_phone_hash;
    776         list_initialize(&conn->msg_queue);
    777         conn->callid = callid;
    778         conn->close_callid = 0;
    779         conn->handler = handler;
    780         conn->data = data;
    781        
    782         if (call)
    783                 conn->call = *call;
    784        
    785         /* We will activate the fibril ASAP */
    786         conn->wdata.active = true;
    787         conn->wdata.fid = fibril_create(connection_fibril, conn);
    788        
    789         if (conn->wdata.fid == 0) {
    790                 free(conn);
    791                
    792                 if (callid)
    793                         ipc_answer_0(callid, ENOMEM);
    794                
    795                 return (uintptr_t) NULL;
    796         }
    797        
    798         /* Add connection to the connection hash table */
    799        
    800         futex_down(&async_futex);
    801         hash_table_insert(&conn_hash_table, &conn->link);
    802         futex_up(&async_futex);
    803        
    804         fibril_add_ready(conn->wdata.fid);
    805        
    806         return conn->wdata.fid;
    807 }
    808 
    809 /** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
    810  *
    811  * Ask through phone for a new connection to some service.
    812  *
    813  * @param exch    Exchange for sending the message.
    814  * @param iface   Callback interface.
    815  * @param arg1    User defined argument.
    816  * @param arg2    User defined argument.
    817  * @param handler Callback handler.
    818  * @param data    Handler data.
    819  * @param port_id ID of the newly created port.
    820  *
    821  * @return Zero on success or a negative error code.
    822  *
    823  */
    824 int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1,
    825     sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id)
    826 {
    827         if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK)
    828                 return EINVAL;
    829        
    830         if (exch == NULL)
    831                 return ENOENT;
    832        
    833         ipc_call_t answer;
    834         aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2,
    835             &answer);
    836        
    837         sysarg_t ret;
    838         async_wait_for(req, &ret);
    839         if (ret != EOK)
    840                 return (int) ret;
    841        
    842         sysarg_t phone_hash = IPC_GET_ARG5(answer);
    843         interface_t *interface;
    844        
    845         futex_down(&async_futex);
    846        
    847         ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
    848         if (link)
    849                 interface = hash_table_get_inst(link, interface_t, link);
    850         else
    851                 interface = async_new_interface(iface);
    852        
    853         if (!interface) {
    854                 futex_up(&async_futex);
    855                 return ENOMEM;
    856         }
    857        
    858         port_t *port = async_new_port(interface, handler, data);
    859         if (!port) {
    860                 futex_up(&async_futex);
    861                 return ENOMEM;
    862         }
    863        
    864         *port_id = port->id;
    865        
    866         futex_up(&async_futex);
    867        
    868         fid_t fid = async_new_connection(answer.in_task_id, phone_hash,
    869             0, NULL, handler, data);
    870         if (fid == (uintptr_t) NULL)
    871                 return ENOMEM;
    872        
    873         return EOK;
    874 }
    875459
    876460static size_t notification_key_hash(void *key)
     
    987571}
    988572
    989 /** Process notification.
    990  *
    991  * @param callid Hash of the incoming call.
    992  * @param call   Data of the incoming call.
    993  */
    994 static void process_notification(ipc_callid_t callid, ipc_call_t *call)
    995 {
     573/** Notification fibril.
     574 *
     575 * When a notification arrives, a fibril with this implementing function is
     576 * created. It calls the corresponding notification handler and does the final
     577 * cleanup.
     578 *
     579 * @param arg Message structure pointer.
     580 *
     581 * @return Always zero.
     582 *
     583 */
     584static int notification_fibril(void *arg)
     585{
     586        assert(arg);
     587       
     588        msg_t *msg = (msg_t *) arg;
    996589        async_notification_handler_t handler = NULL;
    997590        void *data = NULL;
    998 
    999         assert(call);
    1000591       
    1001592        futex_down(&async_futex);
    1002593       
    1003594        ht_link_t *link = hash_table_find(&notification_hash_table,
    1004             &IPC_GET_IMETHOD(*call));
     595            &IPC_GET_IMETHOD(msg->call));
    1005596        if (link) {
    1006597                notification_t *notification =
     
    1013604       
    1014605        if (handler)
    1015                 handler(callid, call, data);
     606                handler(msg->callid, &msg->call, data);
     607       
     608        free(msg);
     609        return 0;
     610}
     611
     612/** Process notification.
     613 *
     614 * A new fibril is created which would process the notification.
     615 *
     616 * @param callid Hash of the incoming call.
     617 * @param call   Data of the incoming call.
     618 *
     619 * @return False if an error occured.
     620 *         True if the call was passed to the notification fibril.
     621 *
     622 */
     623static bool process_notification(ipc_callid_t callid, ipc_call_t *call)
     624{
     625        assert(call);
     626       
     627        futex_down(&async_futex);
     628       
     629        msg_t *msg = malloc(sizeof(*msg));
     630        if (!msg) {
     631                futex_up(&async_futex);
     632                return false;
     633        }
     634       
     635        msg->callid = callid;
     636        msg->call = *call;
     637       
     638        fid_t fid = fibril_create_generic(notification_fibril, msg,
     639            notification_handler_stksz);
     640        if (fid == 0) {
     641                free(msg);
     642                futex_up(&async_futex);
     643                return false;
     644        }
     645       
     646        fibril_add_ready(fid);
     647       
     648        futex_up(&async_futex);
     649        return true;
    1016650}
    1017651
     
    1232866        }
    1233867       
    1234         msg_t *msg = list_get_instance(list_first(&conn->msg_queue),
    1235             msg_t, link);
     868        msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link);
    1236869        list_remove(&msg->link);
    1237870       
     
    1244877}
    1245878
     879static client_t *async_client_get(task_id_t client_id, bool create)
     880{
     881        client_t *client = NULL;
     882
     883        futex_down(&async_futex);
     884        ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
     885        if (link) {
     886                client = hash_table_get_inst(link, client_t, link);
     887                atomic_inc(&client->refcnt);
     888        } else if (create) {
     889                client = malloc(sizeof(client_t));
     890                if (client) {
     891                        client->in_task_id = client_id;
     892                        client->data = async_client_data_create();
     893               
     894                        atomic_set(&client->refcnt, 1);
     895                        hash_table_insert(&client_hash_table, &client->link);
     896                }
     897        }
     898
     899        futex_up(&async_futex);
     900        return client;
     901}
     902
     903static void async_client_put(client_t *client)
     904{
     905        bool destroy;
     906
     907        futex_down(&async_futex);
     908       
     909        if (atomic_predec(&client->refcnt) == 0) {
     910                hash_table_remove(&client_hash_table, &client->in_task_id);
     911                destroy = true;
     912        } else
     913                destroy = false;
     914       
     915        futex_up(&async_futex);
     916       
     917        if (destroy) {
     918                if (client->data)
     919                        async_client_data_destroy(client->data);
     920               
     921                free(client);
     922        }
     923}
     924
    1246925void *async_get_client_data(void)
    1247926{
     
    1255934        if (!client)
    1256935                return NULL;
    1257        
    1258936        if (!client->data) {
    1259937                async_client_put(client);
    1260938                return NULL;
    1261939        }
    1262        
     940
    1263941        return client->data;
    1264942}
     
    1267945{
    1268946        client_t *client = async_client_get(client_id, false);
    1269        
     947
    1270948        assert(client);
    1271949        assert(client->data);
    1272        
     950
    1273951        /* Drop the reference we got in async_get_client_data_by_hash(). */
    1274952        async_client_put(client);
    1275        
     953
    1276954        /* Drop our own reference we got at the beginning of this function. */
    1277955        async_client_put(client);
    1278956}
    1279957
    1280 static port_t *async_find_port(iface_t iface, port_id_t port_id)
    1281 {
    1282         port_t *port = NULL;
    1283        
     958/** Wrapper for client connection fibril.
     959 *
     960 * When a new connection arrives, a fibril with this implementing function is
     961 * created. It calls client_connection() and does the final cleanup.
     962 *
     963 * @param arg Connection structure pointer.
     964 *
     965 * @return Always zero.
     966 *
     967 */
     968static int connection_fibril(void *arg)
     969{
     970        assert(arg);
     971       
     972        /*
     973         * Setup fibril-local connection pointer.
     974         */
     975        fibril_connection = (connection_t *) arg;
     976       
     977        /*
     978         * Add our reference for the current connection in the client task
     979         * tracking structure. If this is the first reference, create and
     980         * hash in a new tracking structure.
     981         */
     982
     983        client_t *client = async_client_get(fibril_connection->in_task_id, true);
     984        if (!client) {
     985                ipc_answer_0(fibril_connection->callid, ENOMEM);
     986                return 0;
     987        }
     988
     989        fibril_connection->client = client;
     990       
     991        /*
     992         * Call the connection handler function.
     993         */
     994        fibril_connection->cfibril(fibril_connection->callid,
     995            &fibril_connection->call, fibril_connection->carg);
     996       
     997        /*
     998         * Remove the reference for this client task connection.
     999         */
     1000        async_client_put(client);
     1001       
     1002        /*
     1003         * Remove myself from the connection hash table.
     1004         */
    12841005        futex_down(&async_futex);
    1285        
    1286         ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
    1287         if (link) {
    1288                 interface_t *interface =
    1289                     hash_table_get_inst(link, interface_t, link);
     1006        hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
     1007        futex_up(&async_futex);
     1008       
     1009        /*
     1010         * Answer all remaining messages with EHANGUP.
     1011         */
     1012        while (!list_empty(&fibril_connection->msg_queue)) {
     1013                msg_t *msg =
     1014                    list_get_instance(list_first(&fibril_connection->msg_queue),
     1015                    msg_t, link);
    12901016               
    1291                 link = hash_table_find(&interface->port_hash_table, &port_id);
    1292                 if (link)
    1293                         port = hash_table_get_inst(link, port_t, link);
    1294         }
    1295        
     1017                list_remove(&msg->link);
     1018                ipc_answer_0(msg->callid, EHANGUP);
     1019                free(msg);
     1020        }
     1021       
     1022        /*
     1023         * If the connection was hung-up, answer the last call,
     1024         * i.e. IPC_M_PHONE_HUNGUP.
     1025         */
     1026        if (fibril_connection->close_callid)
     1027                ipc_answer_0(fibril_connection->close_callid, EOK);
     1028       
     1029        free(fibril_connection);
     1030        return 0;
     1031}
     1032
     1033/** Create a new fibril for a new connection.
     1034 *
     1035 * Create new fibril for connection, fill in connection structures and insert
     1036 * it into the hash table, so that later we can easily do routing of messages to
     1037 * particular fibrils.
     1038 *
     1039 * @param in_task_id    Identification of the incoming connection.
     1040 * @param in_phone_hash Identification of the incoming connection.
     1041 * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
     1042 *                      If callid is zero, the connection was opened by
     1043 *                      accepting the IPC_M_CONNECT_TO_ME call and this function
     1044 *                      is called directly by the server.
     1045 * @param call          Call data of the opening call.
     1046 * @param cfibril       Fibril function that should be called upon opening the
     1047 *                      connection.
     1048 * @param carg          Extra argument to pass to the connection fibril
     1049 *
     1050 * @return New fibril id or NULL on failure.
     1051 *
     1052 */
     1053fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
     1054    ipc_callid_t callid, ipc_call_t *call,
     1055    async_client_conn_t cfibril, void *carg)
     1056{
     1057        connection_t *conn = malloc(sizeof(*conn));
     1058        if (!conn) {
     1059                if (callid)
     1060                        ipc_answer_0(callid, ENOMEM);
     1061               
     1062                return (uintptr_t) NULL;
     1063        }
     1064       
     1065        conn->in_task_id = in_task_id;
     1066        conn->in_phone_hash = in_phone_hash;
     1067        list_initialize(&conn->msg_queue);
     1068        conn->callid = callid;
     1069        conn->close_callid = 0;
     1070        conn->carg = carg;
     1071       
     1072        if (call)
     1073                conn->call = *call;
     1074       
     1075        /* We will activate the fibril ASAP */
     1076        conn->wdata.active = true;
     1077        conn->cfibril = cfibril;
     1078        conn->wdata.fid = fibril_create(connection_fibril, conn);
     1079       
     1080        if (conn->wdata.fid == 0) {
     1081                free(conn);
     1082               
     1083                if (callid)
     1084                        ipc_answer_0(callid, ENOMEM);
     1085               
     1086                return (uintptr_t) NULL;
     1087        }
     1088       
     1089        /* Add connection to the connection hash table */
     1090       
     1091        futex_down(&async_futex);
     1092        hash_table_insert(&conn_hash_table, &conn->link);
    12961093        futex_up(&async_futex);
    12971094       
    1298         return port;
     1095        fibril_add_ready(conn->wdata.fid);
     1096       
     1097        return conn->wdata.fid;
    12991098}
    13001099
     
    13121111        assert(call);
    13131112       
    1314         /* Kernel notification */
     1113        /* Unrouted call - take some default action */
    13151114        if ((callid & IPC_CALLID_NOTIFICATION)) {
    1316                 fibril_t *fibril = (fibril_t *) __tcb_get()->fibril_data;
    1317                 unsigned oldsw = fibril->switches;
    1318 
    13191115                process_notification(callid, call);
    1320 
    1321                 if (oldsw != fibril->switches) {
    1322                         /*
    1323                          * The notification handler did not execute atomically
    1324                          * and so the current manager fibril assumed the role of
    1325                          * a notification fibril. While waiting for its
    1326                          * resources, it switched to another manager fibril that
    1327                          * had already existed or it created a new one. We
    1328                          * therefore know there is at least yet another
    1329                          * manager fibril that can take over. We now kill the
    1330                          * current 'notification' fibril to prevent fibril
    1331                          * population explosion.
    1332                          */
    1333                         futex_down(&async_futex);
    1334                         fibril_switch(FIBRIL_FROM_DEAD);
    1335                 }
    13361116                return;
    13371117        }
    13381118       
    1339         /* New connection */
    1340         if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) {
    1341                 iface_t iface = (iface_t) IPC_GET_ARG1(*call);
    1342                 sysarg_t in_phone_hash = IPC_GET_ARG5(*call);
    1343                
    1344                 async_notification_handler_t handler = fallback_port_handler;
    1345                 void *data = fallback_port_data;
    1346                
    1347                 // TODO: Currently ignores all ports but the first one
    1348                 port_t *port = async_find_port(iface, 0);
    1349                 if (port) {
    1350                         handler = port->handler;
    1351                         data = port->data;
    1352                 }
    1353                
    1354                 async_new_connection(call->in_task_id, in_phone_hash, callid,
    1355                     call, handler, data);
    1356                 return;
    1357         }
    1358        
    1359         /* Cloned connection */
    1360         if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) {
    1361                 // TODO: Currently ignores ports altogether
    1362                
     1119        switch (IPC_GET_IMETHOD(*call)) {
     1120        case IPC_M_CLONE_ESTABLISH:
     1121        case IPC_M_CONNECT_ME_TO:
    13631122                /* Open new connection with fibril, etc. */
    13641123                async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
    1365                     callid, call, fallback_port_handler, fallback_port_data);
     1124                    callid, call, client_connection, NULL);
    13661125                return;
    13671126        }
     
    15081267void async_create_manager(void)
    15091268{
    1510         fid_t fid = fibril_create_generic(async_manager_fibril, NULL, PAGE_SIZE);
     1269        fid_t fid = fibril_create(async_manager_fibril, NULL);
    15111270        if (fid != 0)
    15121271                fibril_add_manager(fid);
     
    15241283void __async_init(void)
    15251284{
    1526         if (!hash_table_create(&interface_hash_table, 0, 0,
    1527             &interface_hash_table_ops))
    1528                 abort();
    1529        
    15301285        if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops))
    15311286                abort();
     
    15421297                abort();
    15431298       
    1544         session_ns->iface = 0;
    15451299        session_ns->mgmt = EXCHANGE_ATOMIC;
    15461300        session_ns->phone = PHONE_NS;
     
    15891343       
    15901344        msg->done = true;
    1591        
     1345
    15921346        if (msg->forget) {
    15931347                assert(msg->wdata.active);
     
    15971351                fibril_add_ready(msg->wdata.fid);
    15981352        }
    1599        
     1353
    16001354        futex_up(&async_futex);
    16011355}
     
    16321386       
    16331387        ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg,
    1634             reply_received);
     1388            reply_received, true);
    16351389       
    16361390        return (aid_t) msg;
     
    16701424       
    16711425        ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,
    1672             msg, reply_received);
     1426            msg, reply_received, true);
    16731427       
    16741428        return (aid_t) msg;
     
    16891443       
    16901444        futex_down(&async_futex);
    1691        
     1445
    16921446        assert(!msg->forget);
    16931447        assert(!msg->destroyed);
    1694        
     1448
    16951449        if (msg->done) {
    16961450                futex_up(&async_futex);
     
    17331487       
    17341488        amsg_t *msg = (amsg_t *) amsgid;
    1735        
     1489
    17361490        futex_down(&async_futex);
    1737        
     1491
    17381492        assert(!msg->forget);
    17391493        assert(!msg->destroyed);
    1740        
     1494
    17411495        if (msg->done) {
    17421496                futex_up(&async_futex);
     
    17501504        if (timeout < 0)
    17511505                timeout = 0;
    1752        
     1506
    17531507        getuptime(&msg->wdata.to_event.expires);
    17541508        tv_add_diff(&msg->wdata.to_event.expires, timeout);
     
    18031557{
    18041558        amsg_t *msg = (amsg_t *) amsgid;
    1805        
     1559
    18061560        assert(msg);
    18071561        assert(!msg->forget);
    18081562        assert(!msg->destroyed);
    1809        
     1563
    18101564        futex_down(&async_futex);
    1811        
    18121565        if (msg->done) {
    18131566                amsg_destroy(msg);
     
    18161569                msg->forget = true;
    18171570        }
    1818        
    18191571        futex_up(&async_futex);
    18201572}
     
    19591711{
    19601712        if (exch != NULL)
    1961                 ipc_call_async_0(exch->phone, imethod, NULL, NULL);
     1713                ipc_call_async_0(exch->phone, imethod, NULL, NULL, true);
    19621714}
    19631715
     
    19651717{
    19661718        if (exch != NULL)
    1967                 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL);
     1719                ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL, true);
    19681720}
    19691721
     
    19721724{
    19731725        if (exch != NULL)
    1974                 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL);
     1726                ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL,
     1727                    true);
    19751728}
    19761729
     
    19801733        if (exch != NULL)
    19811734                ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL,
    1982                     NULL);
     1735                    NULL, true);
    19831736}
    19841737
     
    19881741        if (exch != NULL)
    19891742                ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
    1990                     NULL, NULL);
     1743                    NULL, NULL, true);
    19911744}
    19921745
     
    19961749        if (exch != NULL)
    19971750                ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
    1998                     arg5, NULL, NULL);
     1751                    arg5, NULL, NULL, true);
    19991752}
    20001753
     
    20611814 * @param arg2            User defined argument.
    20621815 * @param arg3            User defined argument.
     1816 * @param client_receiver Connection handing routine.
    20631817 *
    20641818 * @return Zero on success or a negative error code.
     
    20661820 */
    20671821int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
    2068     sysarg_t arg3)
     1822    sysarg_t arg3, async_client_conn_t client_receiver, void *carg)
    20691823{
    20701824        if (exch == NULL)
    20711825                return ENOENT;
    20721826       
     1827        sysarg_t phone_hash;
     1828        sysarg_t rc;
     1829
     1830        aid_t req;
    20731831        ipc_call_t answer;
    2074         aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
     1832        req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
    20751833            &answer);
    2076        
    2077         sysarg_t rc;
    20781834        async_wait_for(req, &rc);
    20791835        if (rc != EOK)
    20801836                return (int) rc;
     1837
     1838        phone_hash = IPC_GET_ARG5(answer);
     1839
     1840        if (client_receiver != NULL)
     1841                async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
     1842                    client_receiver, carg);
    20811843       
    20821844        return EOK;
     
    21191881       
    21201882        ipc_call_async_0(exch->phone, IPC_M_CLONE_ESTABLISH, msg,
    2121             reply_received);
     1883            reply_received, true);
    21221884       
    21231885        sysarg_t rc;
     
    21381900        }
    21391901       
    2140         sess->iface = 0;
    21411902        sess->mgmt = mgmt;
    21421903        sess->phone = phone;
     
    21681929       
    21691930        ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4,
    2170             msg, reply_received);
     1931            msg, reply_received, true);
    21711932       
    21721933        sysarg_t rc;
     
    22081969        int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
    22091970            0);
     1971       
    22101972        if (phone < 0) {
    22111973                errno = phone;
     
    22141976        }
    22151977       
    2216         sess->iface = 0;
    22171978        sess->mgmt = mgmt;
    22181979        sess->phone = phone;
     
    22311992}
    22321993
    2233 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
    2234  *
    2235  * Ask through phone for a new connection to some service and block until
    2236  * success.
    2237  *
    2238  * @param exch  Exchange for sending the message.
    2239  * @param iface Connection interface.
    2240  * @param arg2  User defined argument.
    2241  * @param arg3  User defined argument.
    2242  *
    2243  * @return New session on success or NULL on error.
    2244  *
    2245  */
    2246 async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,
    2247     sysarg_t arg2, sysarg_t arg3)
    2248 {
    2249         if (exch == NULL) {
    2250                 errno = ENOENT;
    2251                 return NULL;
    2252         }
    2253        
    2254         async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
    2255         if (sess == NULL) {
    2256                 errno = ENOMEM;
    2257                 return NULL;
    2258         }
    2259        
    2260         int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
    2261             arg3, 0);
    2262         if (phone < 0) {
    2263                 errno = phone;
    2264                 free(sess);
    2265                 return NULL;
    2266         }
    2267        
    2268         sess->iface = iface;
    2269         sess->phone = phone;
    2270         sess->arg1 = iface;
    2271         sess->arg2 = arg2;
    2272         sess->arg3 = arg3;
    2273        
    2274         fibril_mutex_initialize(&sess->remote_state_mtx);
    2275         sess->remote_state_data = NULL;
    2276        
    2277         list_initialize(&sess->exch_list);
    2278         fibril_mutex_initialize(&sess->mutex);
    2279         atomic_set(&sess->refcnt, 0);
    2280        
    2281         return sess;
    2282 }
    2283 
    22841994/** Set arguments for new connections.
    22851995 *
     
    23372047        }
    23382048       
    2339         sess->iface = 0;
    23402049        sess->mgmt = mgmt;
    23412050        sess->phone = phone;
     
    23542063}
    23552064
    2356 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
    2357  *
    2358  * Ask through phone for a new connection to some service and block until
    2359  * success.
    2360  *
    2361  * @param exch  Exchange for sending the message.
    2362  * @param iface Connection interface.
    2363  * @param arg2  User defined argument.
    2364  * @param arg3  User defined argument.
    2365  *
    2366  * @return New session on success or NULL on error.
    2367  *
    2368  */
    2369 async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,
    2370     sysarg_t arg2, sysarg_t arg3)
    2371 {
    2372         if (exch == NULL) {
    2373                 errno = ENOENT;
    2374                 return NULL;
    2375         }
    2376        
    2377         async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
    2378         if (sess == NULL) {
    2379                 errno = ENOMEM;
    2380                 return NULL;
    2381         }
    2382        
    2383         int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
    2384             arg3, IPC_FLAG_BLOCKING);
    2385         if (phone < 0) {
    2386                 errno = phone;
    2387                 free(sess);
    2388                 return NULL;
    2389         }
    2390        
    2391         sess->iface = iface;
    2392         sess->phone = phone;
    2393         sess->arg1 = iface;
    2394         sess->arg2 = arg2;
    2395         sess->arg3 = arg3;
    2396        
    2397         fibril_mutex_initialize(&sess->remote_state_mtx);
    2398         sess->remote_state_data = NULL;
    2399        
    2400         list_initialize(&sess->exch_list);
    2401         fibril_mutex_initialize(&sess->mutex);
    2402         atomic_set(&sess->refcnt, 0);
    2403        
    2404         return sess;
    2405 }
    2406 
    24072065/** Connect to a task specified by id.
    24082066 *
     
    24232081        }
    24242082       
    2425         sess->iface = 0;
    24262083        sess->mgmt = EXCHANGE_ATOMIC;
    24272084        sess->phone = phone;
     
    25012158                return NULL;
    25022159       
    2503         exch_mgmt_t mgmt = sess->mgmt;
    2504         if (sess->iface != 0)
    2505                 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
    2506        
    2507         async_exch_t *exch = NULL;
     2160        async_exch_t *exch;
    25082161       
    25092162        fibril_mutex_lock(&async_sess_mutex);
     
    25242177                 */
    25252178               
    2526                 if ((mgmt == EXCHANGE_ATOMIC) ||
    2527                     (mgmt == EXCHANGE_SERIALIZE)) {
     2179                if ((sess->mgmt == EXCHANGE_ATOMIC) ||
     2180                    (sess->mgmt == EXCHANGE_SERIALIZE)) {
    25282181                        exch = (async_exch_t *) malloc(sizeof(async_exch_t));
    25292182                        if (exch != NULL) {
     
    25332186                                exch->phone = sess->phone;
    25342187                        }
    2535                 } else if (mgmt == EXCHANGE_PARALLEL) {
    2536                         int phone;
    2537                        
    2538                 retry:
     2188                } else {  /* EXCHANGE_PARALLEL */
    25392189                        /*
    25402190                         * Make a one-time attempt to connect a new data phone.
    25412191                         */
     2192                       
     2193                        int phone;
     2194                       
     2195retry:
    25422196                        phone = async_connect_me_to_internal(sess->phone, sess->arg1,
    25432197                            sess->arg2, sess->arg3, 0);
     
    25812235                atomic_inc(&sess->refcnt);
    25822236               
    2583                 if (mgmt == EXCHANGE_SERIALIZE)
     2237                if (sess->mgmt == EXCHANGE_SERIALIZE)
    25842238                        fibril_mutex_lock(&sess->mutex);
    25852239        }
     
    26012255        assert(sess != NULL);
    26022256       
    2603         exch_mgmt_t mgmt = sess->mgmt;
    2604         if (sess->iface != 0)
    2605                 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
    2606        
    26072257        atomic_dec(&sess->refcnt);
    26082258       
    2609         if (mgmt == EXCHANGE_SERIALIZE)
     2259        if (sess->mgmt == EXCHANGE_SERIALIZE)
    26102260                fibril_mutex_unlock(&sess->mutex);
    26112261       
     
    30442694        }
    30452695       
    3046         void *arg_data;
     2696        void *_data;
    30472697       
    30482698        if (nullterm)
    3049                 arg_data = malloc(size + 1);
     2699                _data = malloc(size + 1);
    30502700        else
    3051                 arg_data = malloc(size);
    3052        
    3053         if (arg_data == NULL) {
     2701                _data = malloc(size);
     2702       
     2703        if (_data == NULL) {
    30542704                ipc_answer_0(callid, ENOMEM);
    30552705                return ENOMEM;
    30562706        }
    30572707       
    3058         int rc = async_data_write_finalize(callid, arg_data, size);
     2708        int rc = async_data_write_finalize(callid, _data, size);
    30592709        if (rc != EOK) {
    3060                 free(arg_data);
     2710                free(_data);
    30612711                return rc;
    30622712        }
    30632713       
    30642714        if (nullterm)
    3065                 ((char *) arg_data)[size] = 0;
    3066        
    3067         *data = arg_data;
     2715                ((char *) _data)[size] = 0;
     2716       
     2717        *data = _data;
    30682718        if (received != NULL)
    30692719                *received = size;
     
    31632813        }
    31642814       
    3165         sess->iface = 0;
    31662815        sess->mgmt = mgmt;
    31672816        sess->phone = phone;
     
    32132862        }
    32142863       
    3215         sess->iface = 0;
    32162864        sess->mgmt = mgmt;
    32172865        sess->phone = phone;
     
    32592907                return NULL;
    32602908       
    3261         sess->iface = 0;
    32622909        sess->mgmt = mgmt;
    32632910        sess->phone = phone;
     
    32872934{
    32882935        assert(callid);
    3289        
     2936
    32902937        ipc_call_t call;
    32912938        *callid = async_get_call(&call);
    3292        
     2939
    32932940        if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
    32942941                return false;
     
    33002947        if (arg3)
    33012948                *arg3 = IPC_GET_ARG3(call);
    3302        
     2949
    33032950        return true;
    33042951}
Note: See TracChangeset for help on using the changeset viewer.