Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async.c

    r7f9d97f3 rc170438  
    7777 *   }
    7878 *
    79  *   my_client_connection(icallid, *icall)
     79 *   port_handler(icallid, *icall)
    8080 *   {
    8181 *     if (want_refuse) {
     
    123123        list_t exch_list;
    124124       
     125        /** Session interface */
     126        iface_t iface;
     127       
    125128        /** Exchange management style */
    126129        exch_mgmt_t mgmt;
     
    189192        /** If reply was received. */
    190193        bool done;
    191 
     194       
    192195        /** If the message / reply should be discarded on arrival. */
    193196        bool forget;
    194 
     197       
    195198        /** If already destroyed. */
    196199        bool destroyed;
     
    232235        /** Identification of the opening call. */
    233236        ipc_callid_t callid;
     237       
    234238        /** Call data of the opening call. */
    235239        ipc_call_t call;
    236         /** Local argument or NULL if none. */
    237         void *carg;
    238240       
    239241        /** Identification of the closing call. */
     
    241243       
    242244        /** Fibril function that will be used to handle the connection. */
    243         async_client_conn_t cfibril;
     245        async_port_handler_t handler;
     246       
     247        /** Client data */
     248        void *data;
    244249} connection_t;
     250
     251/** Interface data */
     252typedef struct {
     253        ht_link_t link;
     254       
     255        /** Interface ID */
     256        iface_t iface;
     257       
     258        /** Futex protecting the hash table */
     259        futex_t futex;
     260       
     261        /** Interface ports */
     262        hash_table_t port_hash_table;
     263       
     264        /** Next available port ID */
     265        port_id_t port_id_avail;
     266} interface_t;
     267
     268/* Port data */
     269typedef struct {
     270        ht_link_t link;
     271       
     272        /** Port ID */
     273        port_id_t id;
     274       
     275        /** Port connection handler */
     276        async_port_handler_t handler;
     277       
     278        /** Client data */
     279        void *data;
     280} port_t;
    245281
    246282/* Notification data */
     
    264300{
    265301        struct timeval tv = { 0, 0 };
    266 
     302       
    267303        to->inlist = false;
    268304        to->occurred = false;
     
    287323static amsg_t *amsg_create(void)
    288324{
    289         amsg_t *msg;
    290 
    291         msg = malloc(sizeof(amsg_t));
     325        amsg_t *msg = malloc(sizeof(amsg_t));
    292326        if (msg) {
    293327                msg->done = false;
     
    298332                awaiter_initialize(&msg->wdata);
    299333        }
    300 
     334       
    301335        return msg;
    302336}
     
    335369}
    336370
    337 /** Default fibril function that gets called to handle new connection.
    338  *
    339  * This function is defined as a weak symbol - to be redefined in user code.
     371/** Default fallback fibril function.
     372 *
     373 * This fallback fibril function gets called on incomming
     374 * connections that do not have a specific handler defined.
    340375 *
    341376 * @param callid Hash of the incoming call.
     
    344379 *
    345380 */
    346 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call,
     381static void default_fallback_port_handler(ipc_callid_t callid, ipc_call_t *call,
    347382    void *arg)
    348383{
     
    350385}
    351386
    352 static async_client_conn_t client_connection = default_client_connection;
    353 static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE;
    354 
    355 /** Setter for client_connection function pointer.
    356  *
    357  * @param conn Function that will implement a new connection fibril.
    358  *
    359  */
    360 void async_set_client_connection(async_client_conn_t conn)
    361 {
    362         assert(client_connection == default_client_connection);
    363         client_connection = conn;
    364 }
    365 
    366 /** Set the stack size for the notification handler notification fibrils.
    367  *
    368  * @param size Stack size in bytes.
    369  */
    370 void async_set_notification_handler_stack_size(size_t size)
    371 {
    372         notification_handler_stksz = size;
     387static async_port_handler_t fallback_port_handler =
     388    default_fallback_port_handler;
     389static void *fallback_port_data = NULL;
     390
     391static hash_table_t interface_hash_table;
     392
     393static size_t interface_key_hash(void *key)
     394{
     395        iface_t iface = *(iface_t *) key;
     396        return iface;
     397}
     398
     399static size_t interface_hash(const ht_link_t *item)
     400{
     401        interface_t *interface = hash_table_get_inst(item, interface_t, link);
     402        return interface_key_hash(&interface->iface);
     403}
     404
     405static bool interface_key_equal(void *key, const ht_link_t *item)
     406{
     407        iface_t iface = *(iface_t *) key;
     408        interface_t *interface = hash_table_get_inst(item, interface_t, link);
     409        return iface == interface->iface;
     410}
     411
     412/** Operations for the port hash table. */
     413static hash_table_ops_t interface_hash_table_ops = {
     414        .hash = interface_hash,
     415        .key_hash = interface_key_hash,
     416        .key_equal = interface_key_equal,
     417        .equal = NULL,
     418        .remove_callback = NULL
     419};
     420
     421static size_t port_key_hash(void *key)
     422{
     423        port_id_t port_id = *(port_id_t *) key;
     424        return port_id;
     425}
     426
     427static size_t port_hash(const ht_link_t *item)
     428{
     429        port_t *port = hash_table_get_inst(item, port_t, link);
     430        return port_key_hash(&port->id);
     431}
     432
     433static bool port_key_equal(void *key, const ht_link_t *item)
     434{
     435        port_id_t port_id = *(port_id_t *) key;
     436        port_t *port = hash_table_get_inst(item, port_t, link);
     437        return port_id == port->id;
     438}
     439
     440/** Operations for the port hash table. */
     441static hash_table_ops_t port_hash_table_ops = {
     442        .hash = port_hash,
     443        .key_hash = port_key_hash,
     444        .key_equal = port_key_equal,
     445        .equal = NULL,
     446        .remove_callback = NULL
     447};
     448
     449static interface_t *async_new_interface(iface_t iface)
     450{
     451        interface_t *interface =
     452            (interface_t *) malloc(sizeof(interface_t));
     453        if (!interface)
     454                return NULL;
     455       
     456        bool ret = hash_table_create(&interface->port_hash_table, 0, 0,
     457            &port_hash_table_ops);
     458        if (!ret) {
     459                free(interface);
     460                return NULL;
     461        }
     462       
     463        interface->iface = iface;
     464        futex_initialize(&interface->futex, 1);
     465        interface->port_id_avail = 0;
     466       
     467        hash_table_insert(&interface_hash_table, &interface->link);
     468       
     469        return interface;
     470}
     471
     472static port_t *async_new_port(interface_t *interface,
     473    async_port_handler_t handler, void *data)
     474{
     475        port_t *port = (port_t *) malloc(sizeof(port_t));
     476        if (!port)
     477                return NULL;
     478       
     479        futex_down(&interface->futex);
     480       
     481        port_id_t id = interface->port_id_avail;
     482        interface->port_id_avail++;
     483       
     484        port->id = id;
     485        port->handler = handler;
     486        port->data = data;
     487       
     488        hash_table_insert(&interface->port_hash_table, &port->link);
     489       
     490        futex_up(&interface->futex);
     491       
     492        return port;
    373493}
    374494
     
    387507 */
    388508static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
     509
     510int async_create_port(iface_t iface, async_port_handler_t handler,
     511    void *data, port_id_t *port_id)
     512{
     513        if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK)
     514                return EINVAL;
     515       
     516        interface_t *interface;
     517       
     518        futex_down(&async_futex);
     519       
     520        ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
     521        if (link)
     522                interface = hash_table_get_inst(link, interface_t, link);
     523        else
     524                interface = async_new_interface(iface);
     525       
     526        if (!interface) {
     527                futex_up(&async_futex);
     528                return ENOMEM;
     529        }
     530       
     531        port_t *port = async_new_port(interface, handler, data);
     532        if (!port) {
     533                futex_up(&async_futex);
     534                return ENOMEM;
     535        }
     536       
     537        *port_id = port->id;
     538       
     539        futex_up(&async_futex);
     540       
     541        return EOK;
     542}
     543
     544void async_set_fallback_port_handler(async_port_handler_t handler, void *data)
     545{
     546        assert(handler != NULL);
     547       
     548        fallback_port_handler = handler;
     549        fallback_port_data = data;
     550}
    389551
    390552static hash_table_t client_hash_table;
     
    457619        .remove_callback = NULL
    458620};
     621
     622static client_t *async_client_get(task_id_t client_id, bool create)
     623{
     624        client_t *client = NULL;
     625       
     626        futex_down(&async_futex);
     627        ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
     628        if (link) {
     629                client = hash_table_get_inst(link, client_t, link);
     630                atomic_inc(&client->refcnt);
     631        } else if (create) {
     632                client = malloc(sizeof(client_t));
     633                if (client) {
     634                        client->in_task_id = client_id;
     635                        client->data = async_client_data_create();
     636                       
     637                        atomic_set(&client->refcnt, 1);
     638                        hash_table_insert(&client_hash_table, &client->link);
     639                }
     640        }
     641       
     642        futex_up(&async_futex);
     643        return client;
     644}
     645
     646static void async_client_put(client_t *client)
     647{
     648        bool destroy;
     649       
     650        futex_down(&async_futex);
     651       
     652        if (atomic_predec(&client->refcnt) == 0) {
     653                hash_table_remove(&client_hash_table, &client->in_task_id);
     654                destroy = true;
     655        } else
     656                destroy = false;
     657       
     658        futex_up(&async_futex);
     659       
     660        if (destroy) {
     661                if (client->data)
     662                        async_client_data_destroy(client->data);
     663               
     664                free(client);
     665        }
     666}
     667
     668/** Wrapper for client connection fibril.
     669 *
     670 * When a new connection arrives, a fibril with this implementing
     671 * function is created.
     672 *
     673 * @param arg Connection structure pointer.
     674 *
     675 * @return Always zero.
     676 *
     677 */
     678static int connection_fibril(void *arg)
     679{
     680        assert(arg);
     681       
     682        /*
     683         * Setup fibril-local connection pointer.
     684         */
     685        fibril_connection = (connection_t *) arg;
     686       
     687        /*
     688         * Add our reference for the current connection in the client task
     689         * tracking structure. If this is the first reference, create and
     690         * hash in a new tracking structure.
     691         */
     692       
     693        client_t *client = async_client_get(fibril_connection->in_task_id, true);
     694        if (!client) {
     695                ipc_answer_0(fibril_connection->callid, ENOMEM);
     696                return 0;
     697        }
     698       
     699        fibril_connection->client = client;
     700       
     701        /*
     702         * Call the connection handler function.
     703         */
     704        fibril_connection->handler(fibril_connection->callid,
     705            &fibril_connection->call, fibril_connection->data);
     706       
     707        /*
     708         * Remove the reference for this client task connection.
     709         */
     710        async_client_put(client);
     711       
     712        /*
     713         * Remove myself from the connection hash table.
     714         */
     715        futex_down(&async_futex);
     716        hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
     717        futex_up(&async_futex);
     718       
     719        /*
     720         * Answer all remaining messages with EHANGUP.
     721         */
     722        while (!list_empty(&fibril_connection->msg_queue)) {
     723                msg_t *msg =
     724                    list_get_instance(list_first(&fibril_connection->msg_queue),
     725                    msg_t, link);
     726               
     727                list_remove(&msg->link);
     728                ipc_answer_0(msg->callid, EHANGUP);
     729                free(msg);
     730        }
     731       
     732        /*
     733         * If the connection was hung-up, answer the last call,
     734         * i.e. IPC_M_PHONE_HUNGUP.
     735         */
     736        if (fibril_connection->close_callid)
     737                ipc_answer_0(fibril_connection->close_callid, EOK);
     738       
     739        free(fibril_connection);
     740        return 0;
     741}
     742
     743/** Create a new fibril for a new connection.
     744 *
     745 * Create new fibril for connection, fill in connection structures
     746 * and insert it into the hash table, so that later we can easily
     747 * do routing of messages to particular fibrils.
     748 *
     749 * @param in_task_id    Identification of the incoming connection.
     750 * @param in_phone_hash Identification of the incoming connection.
     751 * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
     752 *                      If callid is zero, the connection was opened by
     753 *                      accepting the IPC_M_CONNECT_TO_ME call and this
     754 *                      function is called directly by the server.
     755 * @param call          Call data of the opening call.
     756 * @param handler       Connection handler.
     757 * @param data          Client argument to pass to the connection handler.
     758 *
     759 * @return New fibril id or NULL on failure.
     760 *
     761 */
     762static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
     763    ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler,
     764    void *data)
     765{
     766        connection_t *conn = malloc(sizeof(*conn));
     767        if (!conn) {
     768                if (callid)
     769                        ipc_answer_0(callid, ENOMEM);
     770               
     771                return (uintptr_t) NULL;
     772        }
     773       
     774        conn->in_task_id = in_task_id;
     775        conn->in_phone_hash = in_phone_hash;
     776        list_initialize(&conn->msg_queue);
     777        conn->callid = callid;
     778        conn->close_callid = 0;
     779        conn->handler = handler;
     780        conn->data = data;
     781       
     782        if (call)
     783                conn->call = *call;
     784       
     785        /* We will activate the fibril ASAP */
     786        conn->wdata.active = true;
     787        conn->wdata.fid = fibril_create(connection_fibril, conn);
     788       
     789        if (conn->wdata.fid == 0) {
     790                free(conn);
     791               
     792                if (callid)
     793                        ipc_answer_0(callid, ENOMEM);
     794               
     795                return (uintptr_t) NULL;
     796        }
     797       
     798        /* Add connection to the connection hash table */
     799       
     800        futex_down(&async_futex);
     801        hash_table_insert(&conn_hash_table, &conn->link);
     802        futex_up(&async_futex);
     803       
     804        fibril_add_ready(conn->wdata.fid);
     805       
     806        return conn->wdata.fid;
     807}
     808
     809/** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
     810 *
     811 * Ask through phone for a new connection to some service.
     812 *
     813 * @param exch    Exchange for sending the message.
     814 * @param iface   Callback interface.
     815 * @param arg1    User defined argument.
     816 * @param arg2    User defined argument.
     817 * @param handler Callback handler.
     818 * @param data    Handler data.
     819 * @param port_id ID of the newly created port.
     820 *
     821 * @return Zero on success or a negative error code.
     822 *
     823 */
     824int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1,
     825    sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id)
     826{
     827        if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK)
     828                return EINVAL;
     829       
     830        if (exch == NULL)
     831                return ENOENT;
     832       
     833        ipc_call_t answer;
     834        aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2,
     835            &answer);
     836       
     837        sysarg_t ret;
     838        async_wait_for(req, &ret);
     839        if (ret != EOK)
     840                return (int) ret;
     841       
     842        sysarg_t phone_hash = IPC_GET_ARG5(answer);
     843        interface_t *interface;
     844       
     845        futex_down(&async_futex);
     846       
     847        ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
     848        if (link)
     849                interface = hash_table_get_inst(link, interface_t, link);
     850        else
     851                interface = async_new_interface(iface);
     852       
     853        if (!interface) {
     854                futex_up(&async_futex);
     855                return ENOMEM;
     856        }
     857       
     858        port_t *port = async_new_port(interface, handler, data);
     859        if (!port) {
     860                futex_up(&async_futex);
     861                return ENOMEM;
     862        }
     863       
     864        *port_id = port->id;
     865       
     866        futex_up(&async_futex);
     867       
     868        fid_t fid = async_new_connection(answer.in_task_id, phone_hash,
     869            0, NULL, handler, data);
     870        if (fid == (uintptr_t) NULL)
     871                return ENOMEM;
     872       
     873        return EOK;
     874}
    459875
    460876static size_t notification_key_hash(void *key)
     
    571987}
    572988
    573 /** Notification fibril.
    574  *
    575  * When a notification arrives, a fibril with this implementing function is
    576  * created. It calls the corresponding notification handler and does the final
    577  * cleanup.
    578  *
    579  * @param arg Message structure pointer.
    580  *
    581  * @return Always zero.
    582  *
    583  */
    584 static int notification_fibril(void *arg)
    585 {
    586         assert(arg);
    587        
    588         msg_t *msg = (msg_t *) arg;
     989/** Process notification.
     990 *
     991 * @param callid Hash of the incoming call.
     992 * @param call   Data of the incoming call.
     993 */
     994static void process_notification(ipc_callid_t callid, ipc_call_t *call)
     995{
    589996        async_notification_handler_t handler = NULL;
    590997        void *data = NULL;
     998
     999        assert(call);
    5911000       
    5921001        futex_down(&async_futex);
    5931002       
    5941003        ht_link_t *link = hash_table_find(&notification_hash_table,
    595             &IPC_GET_IMETHOD(msg->call));
     1004            &IPC_GET_IMETHOD(*call));
    5961005        if (link) {
    5971006                notification_t *notification =
     
    6041013       
    6051014        if (handler)
    606                 handler(msg->callid, &msg->call, data);
    607        
    608         free(msg);
    609         return 0;
    610 }
    611 
    612 /** Process notification.
    613  *
    614  * A new fibril is created which would process the notification.
    615  *
    616  * @param callid Hash of the incoming call.
    617  * @param call   Data of the incoming call.
    618  *
    619  * @return False if an error occured.
    620  *         True if the call was passed to the notification fibril.
    621  *
    622  */
    623 static bool process_notification(ipc_callid_t callid, ipc_call_t *call)
    624 {
    625         assert(call);
    626        
    627         futex_down(&async_futex);
    628        
    629         msg_t *msg = malloc(sizeof(*msg));
    630         if (!msg) {
    631                 futex_up(&async_futex);
    632                 return false;
    633         }
    634        
    635         msg->callid = callid;
    636         msg->call = *call;
    637        
    638         fid_t fid = fibril_create_generic(notification_fibril, msg,
    639             notification_handler_stksz);
    640         if (fid == 0) {
    641                 free(msg);
    642                 futex_up(&async_futex);
    643                 return false;
    644         }
    645        
    646         fibril_add_ready(fid);
    647        
    648         futex_up(&async_futex);
    649         return true;
     1015                handler(callid, call, data);
    6501016}
    6511017
     
    8661232        }
    8671233       
    868         msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link);
     1234        msg_t *msg = list_get_instance(list_first(&conn->msg_queue),
     1235            msg_t, link);
    8691236        list_remove(&msg->link);
    8701237       
     
    8771244}
    8781245
    879 static client_t *async_client_get(task_id_t client_id, bool create)
    880 {
    881         client_t *client = NULL;
    882 
    883         futex_down(&async_futex);
    884         ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
    885         if (link) {
    886                 client = hash_table_get_inst(link, client_t, link);
    887                 atomic_inc(&client->refcnt);
    888         } else if (create) {
    889                 client = malloc(sizeof(client_t));
    890                 if (client) {
    891                         client->in_task_id = client_id;
    892                         client->data = async_client_data_create();
    893                
    894                         atomic_set(&client->refcnt, 1);
    895                         hash_table_insert(&client_hash_table, &client->link);
    896                 }
    897         }
    898 
    899         futex_up(&async_futex);
    900         return client;
    901 }
    902 
    903 static void async_client_put(client_t *client)
    904 {
    905         bool destroy;
    906 
    907         futex_down(&async_futex);
    908        
    909         if (atomic_predec(&client->refcnt) == 0) {
    910                 hash_table_remove(&client_hash_table, &client->in_task_id);
    911                 destroy = true;
    912         } else
    913                 destroy = false;
    914        
    915         futex_up(&async_futex);
    916        
    917         if (destroy) {
    918                 if (client->data)
    919                         async_client_data_destroy(client->data);
    920                
    921                 free(client);
    922         }
    923 }
    924 
    9251246void *async_get_client_data(void)
    9261247{
     
    9341255        if (!client)
    9351256                return NULL;
     1257       
    9361258        if (!client->data) {
    9371259                async_client_put(client);
    9381260                return NULL;
    9391261        }
    940 
     1262       
    9411263        return client->data;
    9421264}
     
    9451267{
    9461268        client_t *client = async_client_get(client_id, false);
    947 
     1269       
    9481270        assert(client);
    9491271        assert(client->data);
    950 
     1272       
    9511273        /* Drop the reference we got in async_get_client_data_by_hash(). */
    9521274        async_client_put(client);
    953 
     1275       
    9541276        /* Drop our own reference we got at the beginning of this function. */
    9551277        async_client_put(client);
    9561278}
    9571279
    958 /** Wrapper for client connection fibril.
    959  *
    960  * When a new connection arrives, a fibril with this implementing function is
    961  * created. It calls client_connection() and does the final cleanup.
    962  *
    963  * @param arg Connection structure pointer.
    964  *
    965  * @return Always zero.
    966  *
    967  */
    968 static int connection_fibril(void *arg)
    969 {
    970         assert(arg);
    971        
    972         /*
    973          * Setup fibril-local connection pointer.
    974          */
    975         fibril_connection = (connection_t *) arg;
    976        
    977         /*
    978          * Add our reference for the current connection in the client task
    979          * tracking structure. If this is the first reference, create and
    980          * hash in a new tracking structure.
    981          */
    982 
    983         client_t *client = async_client_get(fibril_connection->in_task_id, true);
    984         if (!client) {
    985                 ipc_answer_0(fibril_connection->callid, ENOMEM);
    986                 return 0;
    987         }
    988 
    989         fibril_connection->client = client;
    990        
    991         /*
    992          * Call the connection handler function.
    993          */
    994         fibril_connection->cfibril(fibril_connection->callid,
    995             &fibril_connection->call, fibril_connection->carg);
    996        
    997         /*
    998          * Remove the reference for this client task connection.
    999          */
    1000         async_client_put(client);
    1001        
    1002         /*
    1003          * Remove myself from the connection hash table.
    1004          */
     1280static port_t *async_find_port(iface_t iface, port_id_t port_id)
     1281{
     1282        port_t *port = NULL;
     1283       
    10051284        futex_down(&async_futex);
    1006         hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
     1285       
     1286        ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
     1287        if (link) {
     1288                interface_t *interface =
     1289                    hash_table_get_inst(link, interface_t, link);
     1290               
     1291                link = hash_table_find(&interface->port_hash_table, &port_id);
     1292                if (link)
     1293                        port = hash_table_get_inst(link, port_t, link);
     1294        }
     1295       
    10071296        futex_up(&async_futex);
    10081297       
    1009         /*
    1010          * Answer all remaining messages with EHANGUP.
    1011          */
    1012         while (!list_empty(&fibril_connection->msg_queue)) {
    1013                 msg_t *msg =
    1014                     list_get_instance(list_first(&fibril_connection->msg_queue),
    1015                     msg_t, link);
    1016                
    1017                 list_remove(&msg->link);
    1018                 ipc_answer_0(msg->callid, EHANGUP);
    1019                 free(msg);
    1020         }
    1021        
    1022         /*
    1023          * If the connection was hung-up, answer the last call,
    1024          * i.e. IPC_M_PHONE_HUNGUP.
    1025          */
    1026         if (fibril_connection->close_callid)
    1027                 ipc_answer_0(fibril_connection->close_callid, EOK);
    1028        
    1029         free(fibril_connection);
    1030         return 0;
    1031 }
    1032 
    1033 /** Create a new fibril for a new connection.
    1034  *
    1035  * Create new fibril for connection, fill in connection structures and insert
    1036  * it into the hash table, so that later we can easily do routing of messages to
    1037  * particular fibrils.
    1038  *
    1039  * @param in_task_id    Identification of the incoming connection.
    1040  * @param in_phone_hash Identification of the incoming connection.
    1041  * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
    1042  *                      If callid is zero, the connection was opened by
    1043  *                      accepting the IPC_M_CONNECT_TO_ME call and this function
    1044  *                      is called directly by the server.
    1045  * @param call          Call data of the opening call.
    1046  * @param cfibril       Fibril function that should be called upon opening the
    1047  *                      connection.
    1048  * @param carg          Extra argument to pass to the connection fibril
    1049  *
    1050  * @return New fibril id or NULL on failure.
    1051  *
    1052  */
    1053 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
    1054     ipc_callid_t callid, ipc_call_t *call,
    1055     async_client_conn_t cfibril, void *carg)
    1056 {
    1057         connection_t *conn = malloc(sizeof(*conn));
    1058         if (!conn) {
    1059                 if (callid)
    1060                         ipc_answer_0(callid, ENOMEM);
    1061                
    1062                 return (uintptr_t) NULL;
    1063         }
    1064        
    1065         conn->in_task_id = in_task_id;
    1066         conn->in_phone_hash = in_phone_hash;
    1067         list_initialize(&conn->msg_queue);
    1068         conn->callid = callid;
    1069         conn->close_callid = 0;
    1070         conn->carg = carg;
    1071        
    1072         if (call)
    1073                 conn->call = *call;
    1074        
    1075         /* We will activate the fibril ASAP */
    1076         conn->wdata.active = true;
    1077         conn->cfibril = cfibril;
    1078         conn->wdata.fid = fibril_create(connection_fibril, conn);
    1079        
    1080         if (conn->wdata.fid == 0) {
    1081                 free(conn);
    1082                
    1083                 if (callid)
    1084                         ipc_answer_0(callid, ENOMEM);
    1085                
    1086                 return (uintptr_t) NULL;
    1087         }
    1088        
    1089         /* Add connection to the connection hash table */
    1090        
    1091         futex_down(&async_futex);
    1092         hash_table_insert(&conn_hash_table, &conn->link);
    1093         futex_up(&async_futex);
    1094        
    1095         fibril_add_ready(conn->wdata.fid);
    1096        
    1097         return conn->wdata.fid;
     1298        return port;
    10981299}
    10991300
     
    11111312        assert(call);
    11121313       
    1113         /* Unrouted call - take some default action */
     1314        /* Kernel notification */
    11141315        if ((callid & IPC_CALLID_NOTIFICATION)) {
     1316                fibril_t *fibril = (fibril_t *) __tcb_get()->fibril_data;
     1317                unsigned oldsw = fibril->switches;
     1318
    11151319                process_notification(callid, call);
     1320
     1321                if (oldsw != fibril->switches) {
     1322                        /*
     1323                         * The notification handler did not execute atomically
     1324                         * and so the current manager fibril assumed the role of
     1325                         * a notification fibril. While waiting for its
     1326                         * resources, it switched to another manager fibril that
     1327                         * had already existed or it created a new one. We
     1328                         * therefore know there is at least yet another
     1329                         * manager fibril that can take over. We now kill the
     1330                         * current 'notification' fibril to prevent fibril
     1331                         * population explosion.
     1332                         */
     1333                        futex_down(&async_futex);
     1334                        fibril_switch(FIBRIL_FROM_DEAD);
     1335                }
    11161336                return;
    11171337        }
    11181338       
    1119         switch (IPC_GET_IMETHOD(*call)) {
    1120         case IPC_M_CLONE_ESTABLISH:
    1121         case IPC_M_CONNECT_ME_TO:
     1339        /* New connection */
     1340        if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) {
     1341                iface_t iface = (iface_t) IPC_GET_ARG1(*call);
     1342                sysarg_t in_phone_hash = IPC_GET_ARG5(*call);
     1343               
     1344                async_notification_handler_t handler = fallback_port_handler;
     1345                void *data = fallback_port_data;
     1346               
     1347                // TODO: Currently ignores all ports but the first one
     1348                port_t *port = async_find_port(iface, 0);
     1349                if (port) {
     1350                        handler = port->handler;
     1351                        data = port->data;
     1352                }
     1353               
     1354                async_new_connection(call->in_task_id, in_phone_hash, callid,
     1355                    call, handler, data);
     1356                return;
     1357        }
     1358       
     1359        /* Cloned connection */
     1360        if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) {
     1361                // TODO: Currently ignores ports altogether
     1362               
    11221363                /* Open new connection with fibril, etc. */
    11231364                async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
    1124                     callid, call, client_connection, NULL);
     1365                    callid, call, fallback_port_handler, fallback_port_data);
    11251366                return;
    11261367        }
     
    12671508void async_create_manager(void)
    12681509{
    1269         fid_t fid = fibril_create(async_manager_fibril, NULL);
     1510        fid_t fid = fibril_create_generic(async_manager_fibril, NULL, PAGE_SIZE);
    12701511        if (fid != 0)
    12711512                fibril_add_manager(fid);
     
    12831524void __async_init(void)
    12841525{
     1526        if (!hash_table_create(&interface_hash_table, 0, 0,
     1527            &interface_hash_table_ops))
     1528                abort();
     1529       
    12851530        if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops))
    12861531                abort();
     
    12971542                abort();
    12981543       
     1544        session_ns->iface = 0;
    12991545        session_ns->mgmt = EXCHANGE_ATOMIC;
    13001546        session_ns->phone = PHONE_NS;
     
    13431589       
    13441590        msg->done = true;
    1345 
     1591       
    13461592        if (msg->forget) {
    13471593                assert(msg->wdata.active);
     
    13511597                fibril_add_ready(msg->wdata.fid);
    13521598        }
    1353 
     1599       
    13541600        futex_up(&async_futex);
    13551601}
     
    13861632       
    13871633        ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg,
    1388             reply_received, true);
     1634            reply_received);
    13891635       
    13901636        return (aid_t) msg;
     
    14241670       
    14251671        ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,
    1426             msg, reply_received, true);
     1672            msg, reply_received);
    14271673       
    14281674        return (aid_t) msg;
     
    14431689       
    14441690        futex_down(&async_futex);
    1445 
     1691       
    14461692        assert(!msg->forget);
    14471693        assert(!msg->destroyed);
    1448 
     1694       
    14491695        if (msg->done) {
    14501696                futex_up(&async_futex);
     
    14871733       
    14881734        amsg_t *msg = (amsg_t *) amsgid;
    1489 
     1735       
    14901736        futex_down(&async_futex);
    1491 
     1737       
    14921738        assert(!msg->forget);
    14931739        assert(!msg->destroyed);
    1494 
     1740       
    14951741        if (msg->done) {
    14961742                futex_up(&async_futex);
     
    15041750        if (timeout < 0)
    15051751                timeout = 0;
    1506 
     1752       
    15071753        getuptime(&msg->wdata.to_event.expires);
    15081754        tv_add_diff(&msg->wdata.to_event.expires, timeout);
     
    15571803{
    15581804        amsg_t *msg = (amsg_t *) amsgid;
    1559 
     1805       
    15601806        assert(msg);
    15611807        assert(!msg->forget);
    15621808        assert(!msg->destroyed);
    1563 
     1809       
    15641810        futex_down(&async_futex);
     1811       
    15651812        if (msg->done) {
    15661813                amsg_destroy(msg);
     
    15691816                msg->forget = true;
    15701817        }
     1818       
    15711819        futex_up(&async_futex);
    15721820}
     
    17111959{
    17121960        if (exch != NULL)
    1713                 ipc_call_async_0(exch->phone, imethod, NULL, NULL, true);
     1961                ipc_call_async_0(exch->phone, imethod, NULL, NULL);
    17141962}
    17151963
     
    17171965{
    17181966        if (exch != NULL)
    1719                 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL, true);
     1967                ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL);
    17201968}
    17211969
     
    17241972{
    17251973        if (exch != NULL)
    1726                 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL,
    1727                     true);
     1974                ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL);
    17281975}
    17291976
     
    17331980        if (exch != NULL)
    17341981                ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL,
    1735                     NULL, true);
     1982                    NULL);
    17361983}
    17371984
     
    17411988        if (exch != NULL)
    17421989                ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
    1743                     NULL, NULL, true);
     1990                    NULL, NULL);
    17441991}
    17451992
     
    17491996        if (exch != NULL)
    17501997                ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
    1751                     arg5, NULL, NULL, true);
     1998                    arg5, NULL, NULL);
    17521999}
    17532000
     
    18142061 * @param arg2            User defined argument.
    18152062 * @param arg3            User defined argument.
    1816  * @param client_receiver Connection handing routine.
    18172063 *
    18182064 * @return Zero on success or a negative error code.
     
    18202066 */
    18212067int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
    1822     sysarg_t arg3, async_client_conn_t client_receiver, void *carg)
     2068    sysarg_t arg3)
    18232069{
    18242070        if (exch == NULL)
    18252071                return ENOENT;
    18262072       
    1827         sysarg_t phone_hash;
     2073        ipc_call_t answer;
     2074        aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
     2075            &answer);
     2076       
    18282077        sysarg_t rc;
    1829 
    1830         aid_t req;
    1831         ipc_call_t answer;
    1832         req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
    1833             &answer);
    18342078        async_wait_for(req, &rc);
    18352079        if (rc != EOK)
    18362080                return (int) rc;
    1837 
    1838         phone_hash = IPC_GET_ARG5(answer);
    1839 
    1840         if (client_receiver != NULL)
    1841                 async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
    1842                     client_receiver, carg);
    18432081       
    18442082        return EOK;
     
    18812119       
    18822120        ipc_call_async_0(exch->phone, IPC_M_CLONE_ESTABLISH, msg,
    1883             reply_received, true);
     2121            reply_received);
    18842122       
    18852123        sysarg_t rc;
     
    19002138        }
    19012139       
     2140        sess->iface = 0;
    19022141        sess->mgmt = mgmt;
    19032142        sess->phone = phone;
     
    19292168       
    19302169        ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4,
    1931             msg, reply_received, true);
     2170            msg, reply_received);
    19322171       
    19332172        sysarg_t rc;
     
    19692208        int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
    19702209            0);
    1971        
    19722210        if (phone < 0) {
    19732211                errno = phone;
     
    19762214        }
    19772215       
     2216        sess->iface = 0;
    19782217        sess->mgmt = mgmt;
    19792218        sess->phone = phone;
     
    19922231}
    19932232
     2233/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
     2234 *
     2235 * Ask through phone for a new connection to some service and block until
     2236 * success.
     2237 *
     2238 * @param exch  Exchange for sending the message.
     2239 * @param iface Connection interface.
     2240 * @param arg2  User defined argument.
     2241 * @param arg3  User defined argument.
     2242 *
     2243 * @return New session on success or NULL on error.
     2244 *
     2245 */
     2246async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,
     2247    sysarg_t arg2, sysarg_t arg3)
     2248{
     2249        if (exch == NULL) {
     2250                errno = ENOENT;
     2251                return NULL;
     2252        }
     2253       
     2254        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     2255        if (sess == NULL) {
     2256                errno = ENOMEM;
     2257                return NULL;
     2258        }
     2259       
     2260        int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
     2261            arg3, 0);
     2262        if (phone < 0) {
     2263                errno = phone;
     2264                free(sess);
     2265                return NULL;
     2266        }
     2267       
     2268        sess->iface = iface;
     2269        sess->phone = phone;
     2270        sess->arg1 = iface;
     2271        sess->arg2 = arg2;
     2272        sess->arg3 = arg3;
     2273       
     2274        fibril_mutex_initialize(&sess->remote_state_mtx);
     2275        sess->remote_state_data = NULL;
     2276       
     2277        list_initialize(&sess->exch_list);
     2278        fibril_mutex_initialize(&sess->mutex);
     2279        atomic_set(&sess->refcnt, 0);
     2280       
     2281        return sess;
     2282}
     2283
    19942284/** Set arguments for new connections.
    19952285 *
     
    20472337        }
    20482338       
     2339        sess->iface = 0;
    20492340        sess->mgmt = mgmt;
    20502341        sess->phone = phone;
     
    20632354}
    20642355
     2356/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
     2357 *
     2358 * Ask through phone for a new connection to some service and block until
     2359 * success.
     2360 *
     2361 * @param exch  Exchange for sending the message.
     2362 * @param iface Connection interface.
     2363 * @param arg2  User defined argument.
     2364 * @param arg3  User defined argument.
     2365 *
     2366 * @return New session on success or NULL on error.
     2367 *
     2368 */
     2369async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,
     2370    sysarg_t arg2, sysarg_t arg3)
     2371{
     2372        if (exch == NULL) {
     2373                errno = ENOENT;
     2374                return NULL;
     2375        }
     2376       
     2377        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     2378        if (sess == NULL) {
     2379                errno = ENOMEM;
     2380                return NULL;
     2381        }
     2382       
     2383        int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
     2384            arg3, IPC_FLAG_BLOCKING);
     2385        if (phone < 0) {
     2386                errno = phone;
     2387                free(sess);
     2388                return NULL;
     2389        }
     2390       
     2391        sess->iface = iface;
     2392        sess->phone = phone;
     2393        sess->arg1 = iface;
     2394        sess->arg2 = arg2;
     2395        sess->arg3 = arg3;
     2396       
     2397        fibril_mutex_initialize(&sess->remote_state_mtx);
     2398        sess->remote_state_data = NULL;
     2399       
     2400        list_initialize(&sess->exch_list);
     2401        fibril_mutex_initialize(&sess->mutex);
     2402        atomic_set(&sess->refcnt, 0);
     2403       
     2404        return sess;
     2405}
     2406
    20652407/** Connect to a task specified by id.
    20662408 *
     
    20812423        }
    20822424       
     2425        sess->iface = 0;
    20832426        sess->mgmt = EXCHANGE_ATOMIC;
    20842427        sess->phone = phone;
     
    21582501                return NULL;
    21592502       
    2160         async_exch_t *exch;
     2503        exch_mgmt_t mgmt = sess->mgmt;
     2504        if (sess->iface != 0)
     2505                mgmt = sess->iface & IFACE_EXCHANGE_MASK;
     2506       
     2507        async_exch_t *exch = NULL;
    21612508       
    21622509        fibril_mutex_lock(&async_sess_mutex);
     
    21772524                 */
    21782525               
    2179                 if ((sess->mgmt == EXCHANGE_ATOMIC) ||
    2180                     (sess->mgmt == EXCHANGE_SERIALIZE)) {
     2526                if ((mgmt == EXCHANGE_ATOMIC) ||
     2527                    (mgmt == EXCHANGE_SERIALIZE)) {
    21812528                        exch = (async_exch_t *) malloc(sizeof(async_exch_t));
    21822529                        if (exch != NULL) {
     
    21862533                                exch->phone = sess->phone;
    21872534                        }
    2188                 } else {  /* EXCHANGE_PARALLEL */
     2535                } else if (mgmt == EXCHANGE_PARALLEL) {
     2536                        int phone;
     2537                       
     2538                retry:
    21892539                        /*
    21902540                         * Make a one-time attempt to connect a new data phone.
    21912541                         */
    2192                        
    2193                         int phone;
    2194                        
    2195 retry:
    21962542                        phone = async_connect_me_to_internal(sess->phone, sess->arg1,
    21972543                            sess->arg2, sess->arg3, 0);
     
    22352581                atomic_inc(&sess->refcnt);
    22362582               
    2237                 if (sess->mgmt == EXCHANGE_SERIALIZE)
     2583                if (mgmt == EXCHANGE_SERIALIZE)
    22382584                        fibril_mutex_lock(&sess->mutex);
    22392585        }
     
    22552601        assert(sess != NULL);
    22562602       
     2603        exch_mgmt_t mgmt = sess->mgmt;
     2604        if (sess->iface != 0)
     2605                mgmt = sess->iface & IFACE_EXCHANGE_MASK;
     2606       
    22572607        atomic_dec(&sess->refcnt);
    22582608       
    2259         if (sess->mgmt == EXCHANGE_SERIALIZE)
     2609        if (mgmt == EXCHANGE_SERIALIZE)
    22602610                fibril_mutex_unlock(&sess->mutex);
    22612611       
     
    26943044        }
    26953045       
    2696         void *_data;
     3046        void *arg_data;
    26973047       
    26983048        if (nullterm)
    2699                 _data = malloc(size + 1);
     3049                arg_data = malloc(size + 1);
    27003050        else
    2701                 _data = malloc(size);
    2702        
    2703         if (_data == NULL) {
     3051                arg_data = malloc(size);
     3052       
     3053        if (arg_data == NULL) {
    27043054                ipc_answer_0(callid, ENOMEM);
    27053055                return ENOMEM;
    27063056        }
    27073057       
    2708         int rc = async_data_write_finalize(callid, _data, size);
     3058        int rc = async_data_write_finalize(callid, arg_data, size);
    27093059        if (rc != EOK) {
    2710                 free(_data);
     3060                free(arg_data);
    27113061                return rc;
    27123062        }
    27133063       
    27143064        if (nullterm)
    2715                 ((char *) _data)[size] = 0;
    2716        
    2717         *data = _data;
     3065                ((char *) arg_data)[size] = 0;
     3066       
     3067        *data = arg_data;
    27183068        if (received != NULL)
    27193069                *received = size;
     
    28133163        }
    28143164       
     3165        sess->iface = 0;
    28153166        sess->mgmt = mgmt;
    28163167        sess->phone = phone;
     
    28623213        }
    28633214       
     3215        sess->iface = 0;
    28643216        sess->mgmt = mgmt;
    28653217        sess->phone = phone;
     
    29073259                return NULL;
    29083260       
     3261        sess->iface = 0;
    29093262        sess->mgmt = mgmt;
    29103263        sess->phone = phone;
     
    29343287{
    29353288        assert(callid);
    2936 
     3289       
    29373290        ipc_call_t call;
    29383291        *callid = async_get_call(&call);
    2939 
     3292       
    29403293        if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
    29413294                return false;
     
    29473300        if (arg3)
    29483301                *arg3 = IPC_GET_ARG3(call);
    2949 
     3302       
    29503303        return true;
    29513304}
Note: See TracChangeset for help on using the changeset viewer.