source: mainline/uspace/lib/c/generic/async.c@ e86a617a

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since e86a617a was c170438, checked in by Jakub Jermar <jakub@…>, 9 years ago

Do not create a new fibril for each IRQ notification

In the absence of fibril serialization, manager fibrils can
theoretically block in async IPC or on fibril synchronization
primitives. Consequently, it is safe to execute the IRQ handler directly
from the manager fibril. The manager fibril can block while processing
the notification, but most of the times it will not block and the
handler will execute atomically.

This changeset modifies the current behaviour so that we no longer spawn
a new notification fibril for each IRQ, but rather execute the handler
directly from the manager fibril and test if the execution blocked. If
it blocked, the manager fibril had assumed the role of a notification
fibril and we destroy it afterwards - merely to avoid fibril population
explosion. Otherwise, which is the usual behavior, we keep it so that
it resumes its job of a manager fibril.

  • Property mode set to 100644
File size: 81.0 KB
Line 
1/*
2 * Copyright (c) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup libc
30 * @{
31 */
32/** @file
33 */
34
35/**
36 * Asynchronous library
37 *
38 * The aim of this library is to provide a facility for writing programs which
39 * utilize the asynchronous nature of HelenOS IPC, yet using a normal way of
40 * programming.
41 *
42 * You should be able to write very simple multithreaded programs. The async
43 * framework will automatically take care of most of the synchronization
44 * problems.
45 *
46 * Example of use (pseudo C):
47 *
48 * 1) Multithreaded client application
49 *
50 * fibril_create(fibril1, ...);
51 * fibril_create(fibril2, ...);
52 * ...
53 *
54 * int fibril1(void *arg)
55 * {
56 * conn = async_connect_me_to(...);
57 *
58 * exch = async_exchange_begin(conn);
59 * c1 = async_send(exch);
60 * async_exchange_end(exch);
61 *
62 * exch = async_exchange_begin(conn);
63 * c2 = async_send(exch);
64 * async_exchange_end(exch);
65 *
66 * async_wait_for(c1);
67 * async_wait_for(c2);
68 * ...
69 * }
70 *
71 *
72 * 2) Multithreaded server application
73 *
74 * main()
75 * {
76 * async_manager();
77 * }
78 *
79 * port_handler(icallid, *icall)
80 * {
81 * if (want_refuse) {
82 * async_answer_0(icallid, ELIMIT);
83 * return;
84 * }
85 * async_answer_0(icallid, EOK);
86 *
87 * callid = async_get_call(&call);
88 * somehow_handle_the_call(callid, call);
89 * async_answer_2(callid, 1, 2, 3);
90 *
91 * callid = async_get_call(&call);
92 * ...
93 * }
94 *
95 */
96
97#define LIBC_ASYNC_C_
98#include <ipc/ipc.h>
99#include <async.h>
100#include "private/async.h"
101#undef LIBC_ASYNC_C_
102
103#include <ipc/irq.h>
104#include <ipc/event.h>
105#include <futex.h>
106#include <fibril.h>
107#include <adt/hash_table.h>
108#include <adt/list.h>
109#include <assert.h>
110#include <errno.h>
111#include <sys/time.h>
112#include <libarch/barrier.h>
113#include <stdbool.h>
114#include <malloc.h>
115#include <mem.h>
116#include <stdlib.h>
117#include <macros.h>
118#include "private/libc.h"
119
120/** Session data */
121struct async_sess {
122 /** List of inactive exchanges */
123 list_t exch_list;
124
125 /** Session interface */
126 iface_t iface;
127
128 /** Exchange management style */
129 exch_mgmt_t mgmt;
130
131 /** Session identification */
132 int phone;
133
134 /** First clone connection argument */
135 sysarg_t arg1;
136
137 /** Second clone connection argument */
138 sysarg_t arg2;
139
140 /** Third clone connection argument */
141 sysarg_t arg3;
142
143 /** Exchange mutex */
144 fibril_mutex_t mutex;
145
146 /** Number of opened exchanges */
147 atomic_t refcnt;
148
149 /** Mutex for stateful connections */
150 fibril_mutex_t remote_state_mtx;
151
152 /** Data for stateful connections */
153 void *remote_state_data;
154};
155
156/** Exchange data */
157struct async_exch {
158 /** Link into list of inactive exchanges */
159 link_t sess_link;
160
161 /** Link into global list of inactive exchanges */
162 link_t global_link;
163
164 /** Session pointer */
165 async_sess_t *sess;
166
167 /** Exchange identification */
168 int phone;
169};
170
171/** Async framework global futex */
172futex_t async_futex = FUTEX_INITIALIZER;
173
174/** Number of threads waiting for IPC in the kernel. */
175atomic_t threads_in_ipc_wait = { 0 };
176
177/** Naming service session */
178async_sess_t *session_ns;
179
180/** Call data */
181typedef struct {
182 link_t link;
183
184 ipc_callid_t callid;
185 ipc_call_t call;
186} msg_t;
187
188/** Message data */
189typedef struct {
190 awaiter_t wdata;
191
192 /** If reply was received. */
193 bool done;
194
195 /** If the message / reply should be discarded on arrival. */
196 bool forget;
197
198 /** If already destroyed. */
199 bool destroyed;
200
201 /** Pointer to where the answer data is stored. */
202 ipc_call_t *dataptr;
203
204 sysarg_t retval;
205} amsg_t;
206
207/* Client connection data */
208typedef struct {
209 ht_link_t link;
210
211 task_id_t in_task_id;
212 atomic_t refcnt;
213 void *data;
214} client_t;
215
216/* Server connection data */
217typedef struct {
218 awaiter_t wdata;
219
220 /** Hash table link. */
221 ht_link_t link;
222
223 /** Incoming client task ID. */
224 task_id_t in_task_id;
225
226 /** Incoming phone hash. */
227 sysarg_t in_phone_hash;
228
229 /** Link to the client tracking structure. */
230 client_t *client;
231
232 /** Messages that should be delivered to this fibril. */
233 list_t msg_queue;
234
235 /** Identification of the opening call. */
236 ipc_callid_t callid;
237
238 /** Call data of the opening call. */
239 ipc_call_t call;
240
241 /** Identification of the closing call. */
242 ipc_callid_t close_callid;
243
244 /** Fibril function that will be used to handle the connection. */
245 async_port_handler_t handler;
246
247 /** Client data */
248 void *data;
249} connection_t;
250
251/** Interface data */
252typedef struct {
253 ht_link_t link;
254
255 /** Interface ID */
256 iface_t iface;
257
258 /** Futex protecting the hash table */
259 futex_t futex;
260
261 /** Interface ports */
262 hash_table_t port_hash_table;
263
264 /** Next available port ID */
265 port_id_t port_id_avail;
266} interface_t;
267
268/* Port data */
269typedef struct {
270 ht_link_t link;
271
272 /** Port ID */
273 port_id_t id;
274
275 /** Port connection handler */
276 async_port_handler_t handler;
277
278 /** Client data */
279 void *data;
280} port_t;
281
282/* Notification data */
283typedef struct {
284 ht_link_t link;
285
286 /** Notification method */
287 sysarg_t imethod;
288
289 /** Notification handler */
290 async_notification_handler_t handler;
291
292 /** Notification data */
293 void *data;
294} notification_t;
295
296/** Identifier of the incoming connection handled by the current fibril. */
297static fibril_local connection_t *fibril_connection;
298
299static void to_event_initialize(to_event_t *to)
300{
301 struct timeval tv = { 0, 0 };
302
303 to->inlist = false;
304 to->occurred = false;
305 link_initialize(&to->link);
306 to->expires = tv;
307}
308
309static void wu_event_initialize(wu_event_t *wu)
310{
311 wu->inlist = false;
312 link_initialize(&wu->link);
313}
314
315void awaiter_initialize(awaiter_t *aw)
316{
317 aw->fid = 0;
318 aw->active = false;
319 to_event_initialize(&aw->to_event);
320 wu_event_initialize(&aw->wu_event);
321}
322
323static amsg_t *amsg_create(void)
324{
325 amsg_t *msg = malloc(sizeof(amsg_t));
326 if (msg) {
327 msg->done = false;
328 msg->forget = false;
329 msg->destroyed = false;
330 msg->dataptr = NULL;
331 msg->retval = (sysarg_t) EINVAL;
332 awaiter_initialize(&msg->wdata);
333 }
334
335 return msg;
336}
337
338static void amsg_destroy(amsg_t *msg)
339{
340 assert(!msg->destroyed);
341 msg->destroyed = true;
342 free(msg);
343}
344
345static void *default_client_data_constructor(void)
346{
347 return NULL;
348}
349
350static void default_client_data_destructor(void *data)
351{
352}
353
354static async_client_data_ctor_t async_client_data_create =
355 default_client_data_constructor;
356static async_client_data_dtor_t async_client_data_destroy =
357 default_client_data_destructor;
358
359void async_set_client_data_constructor(async_client_data_ctor_t ctor)
360{
361 assert(async_client_data_create == default_client_data_constructor);
362 async_client_data_create = ctor;
363}
364
365void async_set_client_data_destructor(async_client_data_dtor_t dtor)
366{
367 assert(async_client_data_destroy == default_client_data_destructor);
368 async_client_data_destroy = dtor;
369}
370
371/** Default fallback fibril function.
372 *
373 * This fallback fibril function gets called on incomming
374 * connections that do not have a specific handler defined.
375 *
376 * @param callid Hash of the incoming call.
377 * @param call Data of the incoming call.
378 * @param arg Local argument
379 *
380 */
381static void default_fallback_port_handler(ipc_callid_t callid, ipc_call_t *call,
382 void *arg)
383{
384 ipc_answer_0(callid, ENOENT);
385}
386
387static async_port_handler_t fallback_port_handler =
388 default_fallback_port_handler;
389static void *fallback_port_data = NULL;
390
391static hash_table_t interface_hash_table;
392
393static size_t interface_key_hash(void *key)
394{
395 iface_t iface = *(iface_t *) key;
396 return iface;
397}
398
399static size_t interface_hash(const ht_link_t *item)
400{
401 interface_t *interface = hash_table_get_inst(item, interface_t, link);
402 return interface_key_hash(&interface->iface);
403}
404
405static bool interface_key_equal(void *key, const ht_link_t *item)
406{
407 iface_t iface = *(iface_t *) key;
408 interface_t *interface = hash_table_get_inst(item, interface_t, link);
409 return iface == interface->iface;
410}
411
412/** Operations for the port hash table. */
413static hash_table_ops_t interface_hash_table_ops = {
414 .hash = interface_hash,
415 .key_hash = interface_key_hash,
416 .key_equal = interface_key_equal,
417 .equal = NULL,
418 .remove_callback = NULL
419};
420
421static size_t port_key_hash(void *key)
422{
423 port_id_t port_id = *(port_id_t *) key;
424 return port_id;
425}
426
427static size_t port_hash(const ht_link_t *item)
428{
429 port_t *port = hash_table_get_inst(item, port_t, link);
430 return port_key_hash(&port->id);
431}
432
433static bool port_key_equal(void *key, const ht_link_t *item)
434{
435 port_id_t port_id = *(port_id_t *) key;
436 port_t *port = hash_table_get_inst(item, port_t, link);
437 return port_id == port->id;
438}
439
440/** Operations for the port hash table. */
441static hash_table_ops_t port_hash_table_ops = {
442 .hash = port_hash,
443 .key_hash = port_key_hash,
444 .key_equal = port_key_equal,
445 .equal = NULL,
446 .remove_callback = NULL
447};
448
449static interface_t *async_new_interface(iface_t iface)
450{
451 interface_t *interface =
452 (interface_t *) malloc(sizeof(interface_t));
453 if (!interface)
454 return NULL;
455
456 bool ret = hash_table_create(&interface->port_hash_table, 0, 0,
457 &port_hash_table_ops);
458 if (!ret) {
459 free(interface);
460 return NULL;
461 }
462
463 interface->iface = iface;
464 futex_initialize(&interface->futex, 1);
465 interface->port_id_avail = 0;
466
467 hash_table_insert(&interface_hash_table, &interface->link);
468
469 return interface;
470}
471
472static port_t *async_new_port(interface_t *interface,
473 async_port_handler_t handler, void *data)
474{
475 port_t *port = (port_t *) malloc(sizeof(port_t));
476 if (!port)
477 return NULL;
478
479 futex_down(&interface->futex);
480
481 port_id_t id = interface->port_id_avail;
482 interface->port_id_avail++;
483
484 port->id = id;
485 port->handler = handler;
486 port->data = data;
487
488 hash_table_insert(&interface->port_hash_table, &port->link);
489
490 futex_up(&interface->futex);
491
492 return port;
493}
494
495/** Mutex protecting inactive_exch_list and avail_phone_cv.
496 *
497 */
498static FIBRIL_MUTEX_INITIALIZE(async_sess_mutex);
499
500/** List of all currently inactive exchanges.
501 *
502 */
503static LIST_INITIALIZE(inactive_exch_list);
504
505/** Condition variable to wait for a phone to become available.
506 *
507 */
508static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
509
510int async_create_port(iface_t iface, async_port_handler_t handler,
511 void *data, port_id_t *port_id)
512{
513 if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK)
514 return EINVAL;
515
516 interface_t *interface;
517
518 futex_down(&async_futex);
519
520 ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
521 if (link)
522 interface = hash_table_get_inst(link, interface_t, link);
523 else
524 interface = async_new_interface(iface);
525
526 if (!interface) {
527 futex_up(&async_futex);
528 return ENOMEM;
529 }
530
531 port_t *port = async_new_port(interface, handler, data);
532 if (!port) {
533 futex_up(&async_futex);
534 return ENOMEM;
535 }
536
537 *port_id = port->id;
538
539 futex_up(&async_futex);
540
541 return EOK;
542}
543
544void async_set_fallback_port_handler(async_port_handler_t handler, void *data)
545{
546 assert(handler != NULL);
547
548 fallback_port_handler = handler;
549 fallback_port_data = data;
550}
551
552static hash_table_t client_hash_table;
553static hash_table_t conn_hash_table;
554static hash_table_t notification_hash_table;
555static LIST_INITIALIZE(timeout_list);
556
557static sysarg_t notification_avail = 0;
558
559static size_t client_key_hash(void *key)
560{
561 task_id_t in_task_id = *(task_id_t *) key;
562 return in_task_id;
563}
564
565static size_t client_hash(const ht_link_t *item)
566{
567 client_t *client = hash_table_get_inst(item, client_t, link);
568 return client_key_hash(&client->in_task_id);
569}
570
571static bool client_key_equal(void *key, const ht_link_t *item)
572{
573 task_id_t in_task_id = *(task_id_t *) key;
574 client_t *client = hash_table_get_inst(item, client_t, link);
575 return in_task_id == client->in_task_id;
576}
577
578/** Operations for the client hash table. */
579static hash_table_ops_t client_hash_table_ops = {
580 .hash = client_hash,
581 .key_hash = client_key_hash,
582 .key_equal = client_key_equal,
583 .equal = NULL,
584 .remove_callback = NULL
585};
586
587/** Compute hash into the connection hash table based on the source phone hash.
588 *
589 * @param key Pointer to source phone hash.
590 *
591 * @return Index into the connection hash table.
592 *
593 */
594static size_t conn_key_hash(void *key)
595{
596 sysarg_t in_phone_hash = *(sysarg_t *) key;
597 return in_phone_hash;
598}
599
600static size_t conn_hash(const ht_link_t *item)
601{
602 connection_t *conn = hash_table_get_inst(item, connection_t, link);
603 return conn_key_hash(&conn->in_phone_hash);
604}
605
606static bool conn_key_equal(void *key, const ht_link_t *item)
607{
608 sysarg_t in_phone_hash = *(sysarg_t *) key;
609 connection_t *conn = hash_table_get_inst(item, connection_t, link);
610 return (in_phone_hash == conn->in_phone_hash);
611}
612
613/** Operations for the connection hash table. */
614static hash_table_ops_t conn_hash_table_ops = {
615 .hash = conn_hash,
616 .key_hash = conn_key_hash,
617 .key_equal = conn_key_equal,
618 .equal = NULL,
619 .remove_callback = NULL
620};
621
622static client_t *async_client_get(task_id_t client_id, bool create)
623{
624 client_t *client = NULL;
625
626 futex_down(&async_futex);
627 ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
628 if (link) {
629 client = hash_table_get_inst(link, client_t, link);
630 atomic_inc(&client->refcnt);
631 } else if (create) {
632 client = malloc(sizeof(client_t));
633 if (client) {
634 client->in_task_id = client_id;
635 client->data = async_client_data_create();
636
637 atomic_set(&client->refcnt, 1);
638 hash_table_insert(&client_hash_table, &client->link);
639 }
640 }
641
642 futex_up(&async_futex);
643 return client;
644}
645
646static void async_client_put(client_t *client)
647{
648 bool destroy;
649
650 futex_down(&async_futex);
651
652 if (atomic_predec(&client->refcnt) == 0) {
653 hash_table_remove(&client_hash_table, &client->in_task_id);
654 destroy = true;
655 } else
656 destroy = false;
657
658 futex_up(&async_futex);
659
660 if (destroy) {
661 if (client->data)
662 async_client_data_destroy(client->data);
663
664 free(client);
665 }
666}
667
668/** Wrapper for client connection fibril.
669 *
670 * When a new connection arrives, a fibril with this implementing
671 * function is created.
672 *
673 * @param arg Connection structure pointer.
674 *
675 * @return Always zero.
676 *
677 */
678static int connection_fibril(void *arg)
679{
680 assert(arg);
681
682 /*
683 * Setup fibril-local connection pointer.
684 */
685 fibril_connection = (connection_t *) arg;
686
687 /*
688 * Add our reference for the current connection in the client task
689 * tracking structure. If this is the first reference, create and
690 * hash in a new tracking structure.
691 */
692
693 client_t *client = async_client_get(fibril_connection->in_task_id, true);
694 if (!client) {
695 ipc_answer_0(fibril_connection->callid, ENOMEM);
696 return 0;
697 }
698
699 fibril_connection->client = client;
700
701 /*
702 * Call the connection handler function.
703 */
704 fibril_connection->handler(fibril_connection->callid,
705 &fibril_connection->call, fibril_connection->data);
706
707 /*
708 * Remove the reference for this client task connection.
709 */
710 async_client_put(client);
711
712 /*
713 * Remove myself from the connection hash table.
714 */
715 futex_down(&async_futex);
716 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
717 futex_up(&async_futex);
718
719 /*
720 * Answer all remaining messages with EHANGUP.
721 */
722 while (!list_empty(&fibril_connection->msg_queue)) {
723 msg_t *msg =
724 list_get_instance(list_first(&fibril_connection->msg_queue),
725 msg_t, link);
726
727 list_remove(&msg->link);
728 ipc_answer_0(msg->callid, EHANGUP);
729 free(msg);
730 }
731
732 /*
733 * If the connection was hung-up, answer the last call,
734 * i.e. IPC_M_PHONE_HUNGUP.
735 */
736 if (fibril_connection->close_callid)
737 ipc_answer_0(fibril_connection->close_callid, EOK);
738
739 free(fibril_connection);
740 return 0;
741}
742
743/** Create a new fibril for a new connection.
744 *
745 * Create new fibril for connection, fill in connection structures
746 * and insert it into the hash table, so that later we can easily
747 * do routing of messages to particular fibrils.
748 *
749 * @param in_task_id Identification of the incoming connection.
750 * @param in_phone_hash Identification of the incoming connection.
751 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call.
752 * If callid is zero, the connection was opened by
753 * accepting the IPC_M_CONNECT_TO_ME call and this
754 * function is called directly by the server.
755 * @param call Call data of the opening call.
756 * @param handler Connection handler.
757 * @param data Client argument to pass to the connection handler.
758 *
759 * @return New fibril id or NULL on failure.
760 *
761 */
762static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
763 ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler,
764 void *data)
765{
766 connection_t *conn = malloc(sizeof(*conn));
767 if (!conn) {
768 if (callid)
769 ipc_answer_0(callid, ENOMEM);
770
771 return (uintptr_t) NULL;
772 }
773
774 conn->in_task_id = in_task_id;
775 conn->in_phone_hash = in_phone_hash;
776 list_initialize(&conn->msg_queue);
777 conn->callid = callid;
778 conn->close_callid = 0;
779 conn->handler = handler;
780 conn->data = data;
781
782 if (call)
783 conn->call = *call;
784
785 /* We will activate the fibril ASAP */
786 conn->wdata.active = true;
787 conn->wdata.fid = fibril_create(connection_fibril, conn);
788
789 if (conn->wdata.fid == 0) {
790 free(conn);
791
792 if (callid)
793 ipc_answer_0(callid, ENOMEM);
794
795 return (uintptr_t) NULL;
796 }
797
798 /* Add connection to the connection hash table */
799
800 futex_down(&async_futex);
801 hash_table_insert(&conn_hash_table, &conn->link);
802 futex_up(&async_futex);
803
804 fibril_add_ready(conn->wdata.fid);
805
806 return conn->wdata.fid;
807}
808
809/** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
810 *
811 * Ask through phone for a new connection to some service.
812 *
813 * @param exch Exchange for sending the message.
814 * @param iface Callback interface.
815 * @param arg1 User defined argument.
816 * @param arg2 User defined argument.
817 * @param handler Callback handler.
818 * @param data Handler data.
819 * @param port_id ID of the newly created port.
820 *
821 * @return Zero on success or a negative error code.
822 *
823 */
824int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1,
825 sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id)
826{
827 if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK)
828 return EINVAL;
829
830 if (exch == NULL)
831 return ENOENT;
832
833 ipc_call_t answer;
834 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2,
835 &answer);
836
837 sysarg_t ret;
838 async_wait_for(req, &ret);
839 if (ret != EOK)
840 return (int) ret;
841
842 sysarg_t phone_hash = IPC_GET_ARG5(answer);
843 interface_t *interface;
844
845 futex_down(&async_futex);
846
847 ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
848 if (link)
849 interface = hash_table_get_inst(link, interface_t, link);
850 else
851 interface = async_new_interface(iface);
852
853 if (!interface) {
854 futex_up(&async_futex);
855 return ENOMEM;
856 }
857
858 port_t *port = async_new_port(interface, handler, data);
859 if (!port) {
860 futex_up(&async_futex);
861 return ENOMEM;
862 }
863
864 *port_id = port->id;
865
866 futex_up(&async_futex);
867
868 fid_t fid = async_new_connection(answer.in_task_id, phone_hash,
869 0, NULL, handler, data);
870 if (fid == (uintptr_t) NULL)
871 return ENOMEM;
872
873 return EOK;
874}
875
876static size_t notification_key_hash(void *key)
877{
878 sysarg_t id = *(sysarg_t *) key;
879 return id;
880}
881
882static size_t notification_hash(const ht_link_t *item)
883{
884 notification_t *notification =
885 hash_table_get_inst(item, notification_t, link);
886 return notification_key_hash(&notification->imethod);
887}
888
889static bool notification_key_equal(void *key, const ht_link_t *item)
890{
891 sysarg_t id = *(sysarg_t *) key;
892 notification_t *notification =
893 hash_table_get_inst(item, notification_t, link);
894 return id == notification->imethod;
895}
896
897/** Operations for the notification hash table. */
898static hash_table_ops_t notification_hash_table_ops = {
899 .hash = notification_hash,
900 .key_hash = notification_key_hash,
901 .key_equal = notification_key_equal,
902 .equal = NULL,
903 .remove_callback = NULL
904};
905
906/** Sort in current fibril's timeout request.
907 *
908 * @param wd Wait data of the current fibril.
909 *
910 */
911void async_insert_timeout(awaiter_t *wd)
912{
913 assert(wd);
914
915 wd->to_event.occurred = false;
916 wd->to_event.inlist = true;
917
918 link_t *tmp = timeout_list.head.next;
919 while (tmp != &timeout_list.head) {
920 awaiter_t *cur
921 = list_get_instance(tmp, awaiter_t, to_event.link);
922
923 if (tv_gteq(&cur->to_event.expires, &wd->to_event.expires))
924 break;
925
926 tmp = tmp->next;
927 }
928
929 list_insert_before(&wd->to_event.link, tmp);
930}
931
932/** Try to route a call to an appropriate connection fibril.
933 *
934 * If the proper connection fibril is found, a message with the call is added to
935 * its message queue. If the fibril was not active, it is activated and all
936 * timeouts are unregistered.
937 *
938 * @param callid Hash of the incoming call.
939 * @param call Data of the incoming call.
940 *
941 * @return False if the call doesn't match any connection.
942 * @return True if the call was passed to the respective connection fibril.
943 *
944 */
945static bool route_call(ipc_callid_t callid, ipc_call_t *call)
946{
947 assert(call);
948
949 futex_down(&async_futex);
950
951 ht_link_t *link = hash_table_find(&conn_hash_table, &call->in_phone_hash);
952 if (!link) {
953 futex_up(&async_futex);
954 return false;
955 }
956
957 connection_t *conn = hash_table_get_inst(link, connection_t, link);
958
959 msg_t *msg = malloc(sizeof(*msg));
960 if (!msg) {
961 futex_up(&async_futex);
962 return false;
963 }
964
965 msg->callid = callid;
966 msg->call = *call;
967 list_append(&msg->link, &conn->msg_queue);
968
969 if (IPC_GET_IMETHOD(*call) == IPC_M_PHONE_HUNGUP)
970 conn->close_callid = callid;
971
972 /* If the connection fibril is waiting for an event, activate it */
973 if (!conn->wdata.active) {
974
975 /* If in timeout list, remove it */
976 if (conn->wdata.to_event.inlist) {
977 conn->wdata.to_event.inlist = false;
978 list_remove(&conn->wdata.to_event.link);
979 }
980
981 conn->wdata.active = true;
982 fibril_add_ready(conn->wdata.fid);
983 }
984
985 futex_up(&async_futex);
986 return true;
987}
988
989/** Process notification.
990 *
991 * @param callid Hash of the incoming call.
992 * @param call Data of the incoming call.
993 */
994static void process_notification(ipc_callid_t callid, ipc_call_t *call)
995{
996 async_notification_handler_t handler = NULL;
997 void *data = NULL;
998
999 assert(call);
1000
1001 futex_down(&async_futex);
1002
1003 ht_link_t *link = hash_table_find(&notification_hash_table,
1004 &IPC_GET_IMETHOD(*call));
1005 if (link) {
1006 notification_t *notification =
1007 hash_table_get_inst(link, notification_t, link);
1008 handler = notification->handler;
1009 data = notification->data;
1010 }
1011
1012 futex_up(&async_futex);
1013
1014 if (handler)
1015 handler(callid, call, data);
1016}
1017
1018/** Subscribe to IRQ notification.
1019 *
1020 * @param inr IRQ number.
1021 * @param devno Device number of the device generating inr.
1022 * @param handler Notification handler.
1023 * @param data Notification handler client data.
1024 * @param ucode Top-half pseudocode handler.
1025 *
1026 * @return Zero on success or a negative error code.
1027 *
1028 */
1029int async_irq_subscribe(int inr, int devno,
1030 async_notification_handler_t handler, void *data, const irq_code_t *ucode)
1031{
1032 notification_t *notification =
1033 (notification_t *) malloc(sizeof(notification_t));
1034 if (!notification)
1035 return ENOMEM;
1036
1037 futex_down(&async_futex);
1038
1039 sysarg_t imethod = notification_avail;
1040 notification_avail++;
1041
1042 notification->imethod = imethod;
1043 notification->handler = handler;
1044 notification->data = data;
1045
1046 hash_table_insert(&notification_hash_table, &notification->link);
1047
1048 futex_up(&async_futex);
1049
1050 return ipc_irq_subscribe(inr, devno, imethod, ucode);
1051}
1052
1053/** Unsubscribe from IRQ notification.
1054 *
1055 * @param inr IRQ number.
1056 * @param devno Device number of the device generating inr.
1057 *
1058 * @return Zero on success or a negative error code.
1059 *
1060 */
1061int async_irq_unsubscribe(int inr, int devno)
1062{
1063 // TODO: Remove entry from hash table
1064 // to avoid memory leak
1065
1066 return ipc_irq_unsubscribe(inr, devno);
1067}
1068
1069/** Subscribe to event notifications.
1070 *
1071 * @param evno Event type to subscribe.
1072 * @param handler Notification handler.
1073 * @param data Notification handler client data.
1074 *
1075 * @return Zero on success or a negative error code.
1076 *
1077 */
1078int async_event_subscribe(event_type_t evno,
1079 async_notification_handler_t handler, void *data)
1080{
1081 notification_t *notification =
1082 (notification_t *) malloc(sizeof(notification_t));
1083 if (!notification)
1084 return ENOMEM;
1085
1086 futex_down(&async_futex);
1087
1088 sysarg_t imethod = notification_avail;
1089 notification_avail++;
1090
1091 notification->imethod = imethod;
1092 notification->handler = handler;
1093 notification->data = data;
1094
1095 hash_table_insert(&notification_hash_table, &notification->link);
1096
1097 futex_up(&async_futex);
1098
1099 return ipc_event_subscribe(evno, imethod);
1100}
1101
1102/** Subscribe to task event notifications.
1103 *
1104 * @param evno Event type to subscribe.
1105 * @param handler Notification handler.
1106 * @param data Notification handler client data.
1107 *
1108 * @return Zero on success or a negative error code.
1109 *
1110 */
1111int async_event_task_subscribe(event_task_type_t evno,
1112 async_notification_handler_t handler, void *data)
1113{
1114 notification_t *notification =
1115 (notification_t *) malloc(sizeof(notification_t));
1116 if (!notification)
1117 return ENOMEM;
1118
1119 futex_down(&async_futex);
1120
1121 sysarg_t imethod = notification_avail;
1122 notification_avail++;
1123
1124 notification->imethod = imethod;
1125 notification->handler = handler;
1126 notification->data = data;
1127
1128 hash_table_insert(&notification_hash_table, &notification->link);
1129
1130 futex_up(&async_futex);
1131
1132 return ipc_event_task_subscribe(evno, imethod);
1133}
1134
1135/** Unmask event notifications.
1136 *
1137 * @param evno Event type to unmask.
1138 *
1139 * @return Value returned by the kernel.
1140 *
1141 */
1142int async_event_unmask(event_type_t evno)
1143{
1144 return ipc_event_unmask(evno);
1145}
1146
1147/** Unmask task event notifications.
1148 *
1149 * @param evno Event type to unmask.
1150 *
1151 * @return Value returned by the kernel.
1152 *
1153 */
1154int async_event_task_unmask(event_task_type_t evno)
1155{
1156 return ipc_event_task_unmask(evno);
1157}
1158
1159/** Return new incoming message for the current (fibril-local) connection.
1160 *
1161 * @param call Storage where the incoming call data will be stored.
1162 * @param usecs Timeout in microseconds. Zero denotes no timeout.
1163 *
1164 * @return If no timeout was specified, then a hash of the
1165 * incoming call is returned. If a timeout is specified,
1166 * then a hash of the incoming call is returned unless
1167 * the timeout expires prior to receiving a message. In
1168 * that case zero is returned.
1169 *
1170 */
1171ipc_callid_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs)
1172{
1173 assert(call);
1174 assert(fibril_connection);
1175
1176 /* Why doing this?
1177 * GCC 4.1.0 coughs on fibril_connection-> dereference.
1178 * GCC 4.1.1 happilly puts the rdhwr instruction in delay slot.
1179 * I would never expect to find so many errors in
1180 * a compiler.
1181 */
1182 connection_t *conn = fibril_connection;
1183
1184 futex_down(&async_futex);
1185
1186 if (usecs) {
1187 getuptime(&conn->wdata.to_event.expires);
1188 tv_add_diff(&conn->wdata.to_event.expires, usecs);
1189 } else
1190 conn->wdata.to_event.inlist = false;
1191
1192 /* If nothing in queue, wait until something arrives */
1193 while (list_empty(&conn->msg_queue)) {
1194 if (conn->close_callid) {
1195 /*
1196 * Handle the case when the connection was already
1197 * closed by the client but the server did not notice
1198 * the first IPC_M_PHONE_HUNGUP call and continues to
1199 * call async_get_call_timeout(). Repeat
1200 * IPC_M_PHONE_HUNGUP until the caller notices.
1201 */
1202 memset(call, 0, sizeof(ipc_call_t));
1203 IPC_SET_IMETHOD(*call, IPC_M_PHONE_HUNGUP);
1204 futex_up(&async_futex);
1205 return conn->close_callid;
1206 }
1207
1208 if (usecs)
1209 async_insert_timeout(&conn->wdata);
1210
1211 conn->wdata.active = false;
1212
1213 /*
1214 * Note: the current fibril will be rescheduled either due to a
1215 * timeout or due to an arriving message destined to it. In the
1216 * former case, handle_expired_timeouts() and, in the latter
1217 * case, route_call() will perform the wakeup.
1218 */
1219 fibril_switch(FIBRIL_TO_MANAGER);
1220
1221 /*
1222 * Futex is up after getting back from async_manager.
1223 * Get it again.
1224 */
1225 futex_down(&async_futex);
1226 if ((usecs) && (conn->wdata.to_event.occurred)
1227 && (list_empty(&conn->msg_queue))) {
1228 /* If we timed out -> exit */
1229 futex_up(&async_futex);
1230 return 0;
1231 }
1232 }
1233
1234 msg_t *msg = list_get_instance(list_first(&conn->msg_queue),
1235 msg_t, link);
1236 list_remove(&msg->link);
1237
1238 ipc_callid_t callid = msg->callid;
1239 *call = msg->call;
1240 free(msg);
1241
1242 futex_up(&async_futex);
1243 return callid;
1244}
1245
1246void *async_get_client_data(void)
1247{
1248 assert(fibril_connection);
1249 return fibril_connection->client->data;
1250}
1251
1252void *async_get_client_data_by_id(task_id_t client_id)
1253{
1254 client_t *client = async_client_get(client_id, false);
1255 if (!client)
1256 return NULL;
1257
1258 if (!client->data) {
1259 async_client_put(client);
1260 return NULL;
1261 }
1262
1263 return client->data;
1264}
1265
1266void async_put_client_data_by_id(task_id_t client_id)
1267{
1268 client_t *client = async_client_get(client_id, false);
1269
1270 assert(client);
1271 assert(client->data);
1272
1273 /* Drop the reference we got in async_get_client_data_by_hash(). */
1274 async_client_put(client);
1275
1276 /* Drop our own reference we got at the beginning of this function. */
1277 async_client_put(client);
1278}
1279
1280static port_t *async_find_port(iface_t iface, port_id_t port_id)
1281{
1282 port_t *port = NULL;
1283
1284 futex_down(&async_futex);
1285
1286 ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
1287 if (link) {
1288 interface_t *interface =
1289 hash_table_get_inst(link, interface_t, link);
1290
1291 link = hash_table_find(&interface->port_hash_table, &port_id);
1292 if (link)
1293 port = hash_table_get_inst(link, port_t, link);
1294 }
1295
1296 futex_up(&async_futex);
1297
1298 return port;
1299}
1300
1301/** Handle a call that was received.
1302 *
1303 * If the call has the IPC_M_CONNECT_ME_TO method, a new connection is created.
1304 * Otherwise the call is routed to its connection fibril.
1305 *
1306 * @param callid Hash of the incoming call.
1307 * @param call Data of the incoming call.
1308 *
1309 */
1310static void handle_call(ipc_callid_t callid, ipc_call_t *call)
1311{
1312 assert(call);
1313
1314 /* Kernel notification */
1315 if ((callid & IPC_CALLID_NOTIFICATION)) {
1316 fibril_t *fibril = (fibril_t *) __tcb_get()->fibril_data;
1317 unsigned oldsw = fibril->switches;
1318
1319 process_notification(callid, call);
1320
1321 if (oldsw != fibril->switches) {
1322 /*
1323 * The notification handler did not execute atomically
1324 * and so the current manager fibril assumed the role of
1325 * a notification fibril. While waiting for its
1326 * resources, it switched to another manager fibril that
1327 * had already existed or it created a new one. We
1328 * therefore know there is at least yet another
1329 * manager fibril that can take over. We now kill the
1330 * current 'notification' fibril to prevent fibril
1331 * population explosion.
1332 */
1333 futex_down(&async_futex);
1334 fibril_switch(FIBRIL_FROM_DEAD);
1335 }
1336 return;
1337 }
1338
1339 /* New connection */
1340 if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) {
1341 iface_t iface = (iface_t) IPC_GET_ARG1(*call);
1342 sysarg_t in_phone_hash = IPC_GET_ARG5(*call);
1343
1344 async_notification_handler_t handler = fallback_port_handler;
1345 void *data = fallback_port_data;
1346
1347 // TODO: Currently ignores all ports but the first one
1348 port_t *port = async_find_port(iface, 0);
1349 if (port) {
1350 handler = port->handler;
1351 data = port->data;
1352 }
1353
1354 async_new_connection(call->in_task_id, in_phone_hash, callid,
1355 call, handler, data);
1356 return;
1357 }
1358
1359 /* Cloned connection */
1360 if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) {
1361 // TODO: Currently ignores ports altogether
1362
1363 /* Open new connection with fibril, etc. */
1364 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
1365 callid, call, fallback_port_handler, fallback_port_data);
1366 return;
1367 }
1368
1369 /* Try to route the call through the connection hash table */
1370 if (route_call(callid, call))
1371 return;
1372
1373 /* Unknown call from unknown phone - hang it up */
1374 ipc_answer_0(callid, EHANGUP);
1375}
1376
1377/** Fire all timeouts that expired. */
1378static void handle_expired_timeouts(void)
1379{
1380 struct timeval tv;
1381 getuptime(&tv);
1382
1383 futex_down(&async_futex);
1384
1385 link_t *cur = list_first(&timeout_list);
1386 while (cur != NULL) {
1387 awaiter_t *waiter =
1388 list_get_instance(cur, awaiter_t, to_event.link);
1389
1390 if (tv_gt(&waiter->to_event.expires, &tv))
1391 break;
1392
1393 list_remove(&waiter->to_event.link);
1394 waiter->to_event.inlist = false;
1395 waiter->to_event.occurred = true;
1396
1397 /*
1398 * Redundant condition?
1399 * The fibril should not be active when it gets here.
1400 */
1401 if (!waiter->active) {
1402 waiter->active = true;
1403 fibril_add_ready(waiter->fid);
1404 }
1405
1406 cur = list_first(&timeout_list);
1407 }
1408
1409 futex_up(&async_futex);
1410}
1411
1412/** Endless loop dispatching incoming calls and answers.
1413 *
1414 * @return Never returns.
1415 *
1416 */
1417static int async_manager_worker(void)
1418{
1419 while (true) {
1420 if (fibril_switch(FIBRIL_FROM_MANAGER)) {
1421 futex_up(&async_futex);
1422 /*
1423 * async_futex is always held when entering a manager
1424 * fibril.
1425 */
1426 continue;
1427 }
1428
1429 futex_down(&async_futex);
1430
1431 suseconds_t timeout;
1432 unsigned int flags = SYNCH_FLAGS_NONE;
1433 if (!list_empty(&timeout_list)) {
1434 awaiter_t *waiter = list_get_instance(
1435 list_first(&timeout_list), awaiter_t, to_event.link);
1436
1437 struct timeval tv;
1438 getuptime(&tv);
1439
1440 if (tv_gteq(&tv, &waiter->to_event.expires)) {
1441 futex_up(&async_futex);
1442 handle_expired_timeouts();
1443 /*
1444 * Notice that even if the event(s) already
1445 * expired (and thus the other fibril was
1446 * supposed to be running already),
1447 * we check for incoming IPC.
1448 *
1449 * Otherwise, a fibril that continuously
1450 * creates (almost) expired events could
1451 * prevent IPC retrieval from the kernel.
1452 */
1453 timeout = 0;
1454 flags = SYNCH_FLAGS_NON_BLOCKING;
1455
1456 } else {
1457 timeout = tv_sub_diff(&waiter->to_event.expires,
1458 &tv);
1459 futex_up(&async_futex);
1460 }
1461 } else {
1462 futex_up(&async_futex);
1463 timeout = SYNCH_NO_TIMEOUT;
1464 }
1465
1466 atomic_inc(&threads_in_ipc_wait);
1467
1468 ipc_call_t call;
1469 ipc_callid_t callid = ipc_wait_cycle(&call, timeout, flags);
1470
1471 atomic_dec(&threads_in_ipc_wait);
1472
1473 if (!callid) {
1474 handle_expired_timeouts();
1475 continue;
1476 }
1477
1478 if (callid & IPC_CALLID_ANSWERED)
1479 continue;
1480
1481 handle_call(callid, &call);
1482 }
1483
1484 return 0;
1485}
1486
1487/** Function to start async_manager as a standalone fibril.
1488 *
1489 * When more kernel threads are used, one async manager should exist per thread.
1490 *
1491 * @param arg Unused.
1492 * @return Never returns.
1493 *
1494 */
1495static int async_manager_fibril(void *arg)
1496{
1497 futex_up(&async_futex);
1498
1499 /*
1500 * async_futex is always locked when entering manager
1501 */
1502 async_manager_worker();
1503
1504 return 0;
1505}
1506
1507/** Add one manager to manager list. */
1508void async_create_manager(void)
1509{
1510 fid_t fid = fibril_create_generic(async_manager_fibril, NULL, PAGE_SIZE);
1511 if (fid != 0)
1512 fibril_add_manager(fid);
1513}
1514
1515/** Remove one manager from manager list */
1516void async_destroy_manager(void)
1517{
1518 fibril_remove_manager();
1519}
1520
1521/** Initialize the async framework.
1522 *
1523 */
1524void __async_init(void)
1525{
1526 if (!hash_table_create(&interface_hash_table, 0, 0,
1527 &interface_hash_table_ops))
1528 abort();
1529
1530 if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops))
1531 abort();
1532
1533 if (!hash_table_create(&conn_hash_table, 0, 0, &conn_hash_table_ops))
1534 abort();
1535
1536 if (!hash_table_create(&notification_hash_table, 0, 0,
1537 &notification_hash_table_ops))
1538 abort();
1539
1540 session_ns = (async_sess_t *) malloc(sizeof(async_sess_t));
1541 if (session_ns == NULL)
1542 abort();
1543
1544 session_ns->iface = 0;
1545 session_ns->mgmt = EXCHANGE_ATOMIC;
1546 session_ns->phone = PHONE_NS;
1547 session_ns->arg1 = 0;
1548 session_ns->arg2 = 0;
1549 session_ns->arg3 = 0;
1550
1551 fibril_mutex_initialize(&session_ns->remote_state_mtx);
1552 session_ns->remote_state_data = NULL;
1553
1554 list_initialize(&session_ns->exch_list);
1555 fibril_mutex_initialize(&session_ns->mutex);
1556 atomic_set(&session_ns->refcnt, 0);
1557}
1558
1559/** Reply received callback.
1560 *
1561 * This function is called whenever a reply for an asynchronous message sent out
1562 * by the asynchronous framework is received.
1563 *
1564 * Notify the fibril which is waiting for this message that it has arrived.
1565 *
1566 * @param arg Pointer to the asynchronous message record.
1567 * @param retval Value returned in the answer.
1568 * @param data Call data of the answer.
1569 *
1570 */
1571void reply_received(void *arg, int retval, ipc_call_t *data)
1572{
1573 assert(arg);
1574
1575 futex_down(&async_futex);
1576
1577 amsg_t *msg = (amsg_t *) arg;
1578 msg->retval = retval;
1579
1580 /* Copy data after futex_down, just in case the call was detached */
1581 if ((msg->dataptr) && (data))
1582 *msg->dataptr = *data;
1583
1584 write_barrier();
1585
1586 /* Remove message from timeout list */
1587 if (msg->wdata.to_event.inlist)
1588 list_remove(&msg->wdata.to_event.link);
1589
1590 msg->done = true;
1591
1592 if (msg->forget) {
1593 assert(msg->wdata.active);
1594 amsg_destroy(msg);
1595 } else if (!msg->wdata.active) {
1596 msg->wdata.active = true;
1597 fibril_add_ready(msg->wdata.fid);
1598 }
1599
1600 futex_up(&async_futex);
1601}
1602
1603/** Send message and return id of the sent message.
1604 *
1605 * The return value can be used as input for async_wait() to wait for
1606 * completion.
1607 *
1608 * @param exch Exchange for sending the message.
1609 * @param imethod Service-defined interface and method.
1610 * @param arg1 Service-defined payload argument.
1611 * @param arg2 Service-defined payload argument.
1612 * @param arg3 Service-defined payload argument.
1613 * @param arg4 Service-defined payload argument.
1614 * @param dataptr If non-NULL, storage where the reply data will be
1615 * stored.
1616 *
1617 * @return Hash of the sent message or 0 on error.
1618 *
1619 */
1620aid_t async_send_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1621 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr)
1622{
1623 if (exch == NULL)
1624 return 0;
1625
1626 amsg_t *msg = amsg_create();
1627 if (msg == NULL)
1628 return 0;
1629
1630 msg->dataptr = dataptr;
1631 msg->wdata.active = true;
1632
1633 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg,
1634 reply_received);
1635
1636 return (aid_t) msg;
1637}
1638
1639/** Send message and return id of the sent message
1640 *
1641 * The return value can be used as input for async_wait() to wait for
1642 * completion.
1643 *
1644 * @param exch Exchange for sending the message.
1645 * @param imethod Service-defined interface and method.
1646 * @param arg1 Service-defined payload argument.
1647 * @param arg2 Service-defined payload argument.
1648 * @param arg3 Service-defined payload argument.
1649 * @param arg4 Service-defined payload argument.
1650 * @param arg5 Service-defined payload argument.
1651 * @param dataptr If non-NULL, storage where the reply data will be
1652 * stored.
1653 *
1654 * @return Hash of the sent message or 0 on error.
1655 *
1656 */
1657aid_t async_send_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1658 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5,
1659 ipc_call_t *dataptr)
1660{
1661 if (exch == NULL)
1662 return 0;
1663
1664 amsg_t *msg = amsg_create();
1665 if (msg == NULL)
1666 return 0;
1667
1668 msg->dataptr = dataptr;
1669 msg->wdata.active = true;
1670
1671 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,
1672 msg, reply_received);
1673
1674 return (aid_t) msg;
1675}
1676
1677/** Wait for a message sent by the async framework.
1678 *
1679 * @param amsgid Hash of the message to wait for.
1680 * @param retval Pointer to storage where the retval of the answer will
1681 * be stored.
1682 *
1683 */
1684void async_wait_for(aid_t amsgid, sysarg_t *retval)
1685{
1686 assert(amsgid);
1687
1688 amsg_t *msg = (amsg_t *) amsgid;
1689
1690 futex_down(&async_futex);
1691
1692 assert(!msg->forget);
1693 assert(!msg->destroyed);
1694
1695 if (msg->done) {
1696 futex_up(&async_futex);
1697 goto done;
1698 }
1699
1700 msg->wdata.fid = fibril_get_id();
1701 msg->wdata.active = false;
1702 msg->wdata.to_event.inlist = false;
1703
1704 /* Leave the async_futex locked when entering this function */
1705 fibril_switch(FIBRIL_TO_MANAGER);
1706
1707 /* Futex is up automatically after fibril_switch */
1708
1709done:
1710 if (retval)
1711 *retval = msg->retval;
1712
1713 amsg_destroy(msg);
1714}
1715
1716/** Wait for a message sent by the async framework, timeout variant.
1717 *
1718 * If the wait times out, the caller may choose to either wait again by calling
1719 * async_wait_for() or async_wait_timeout(), or forget the message via
1720 * async_forget().
1721 *
1722 * @param amsgid Hash of the message to wait for.
1723 * @param retval Pointer to storage where the retval of the answer will
1724 * be stored.
1725 * @param timeout Timeout in microseconds.
1726 *
1727 * @return Zero on success, ETIMEOUT if the timeout has expired.
1728 *
1729 */
1730int async_wait_timeout(aid_t amsgid, sysarg_t *retval, suseconds_t timeout)
1731{
1732 assert(amsgid);
1733
1734 amsg_t *msg = (amsg_t *) amsgid;
1735
1736 futex_down(&async_futex);
1737
1738 assert(!msg->forget);
1739 assert(!msg->destroyed);
1740
1741 if (msg->done) {
1742 futex_up(&async_futex);
1743 goto done;
1744 }
1745
1746 /*
1747 * Negative timeout is converted to zero timeout to avoid
1748 * using tv_add with negative augmenter.
1749 */
1750 if (timeout < 0)
1751 timeout = 0;
1752
1753 getuptime(&msg->wdata.to_event.expires);
1754 tv_add_diff(&msg->wdata.to_event.expires, timeout);
1755
1756 /*
1757 * Current fibril is inserted as waiting regardless of the
1758 * "size" of the timeout.
1759 *
1760 * Checking for msg->done and immediately bailing out when
1761 * timeout == 0 would mean that the manager fibril would never
1762 * run (consider single threaded program).
1763 * Thus the IPC answer would be never retrieved from the kernel.
1764 *
1765 * Notice that the actual delay would be very small because we
1766 * - switch to manager fibril
1767 * - the manager sees expired timeout
1768 * - and thus adds us back to ready queue
1769 * - manager switches back to some ready fibril
1770 * (prior it, it checks for incoming IPC).
1771 *
1772 */
1773 msg->wdata.fid = fibril_get_id();
1774 msg->wdata.active = false;
1775 async_insert_timeout(&msg->wdata);
1776
1777 /* Leave the async_futex locked when entering this function */
1778 fibril_switch(FIBRIL_TO_MANAGER);
1779
1780 /* Futex is up automatically after fibril_switch */
1781
1782 if (!msg->done)
1783 return ETIMEOUT;
1784
1785done:
1786 if (retval)
1787 *retval = msg->retval;
1788
1789 amsg_destroy(msg);
1790
1791 return 0;
1792}
1793
1794/** Discard the message / reply on arrival.
1795 *
1796 * The message will be marked to be discarded once the reply arrives in
1797 * reply_received(). It is not allowed to call async_wait_for() or
1798 * async_wait_timeout() on this message after a call to this function.
1799 *
1800 * @param amsgid Hash of the message to forget.
1801 */
1802void async_forget(aid_t amsgid)
1803{
1804 amsg_t *msg = (amsg_t *) amsgid;
1805
1806 assert(msg);
1807 assert(!msg->forget);
1808 assert(!msg->destroyed);
1809
1810 futex_down(&async_futex);
1811
1812 if (msg->done) {
1813 amsg_destroy(msg);
1814 } else {
1815 msg->dataptr = NULL;
1816 msg->forget = true;
1817 }
1818
1819 futex_up(&async_futex);
1820}
1821
1822/** Wait for specified time.
1823 *
1824 * The current fibril is suspended but the thread continues to execute.
1825 *
1826 * @param timeout Duration of the wait in microseconds.
1827 *
1828 */
1829void async_usleep(suseconds_t timeout)
1830{
1831 amsg_t *msg = amsg_create();
1832 if (!msg)
1833 return;
1834
1835 msg->wdata.fid = fibril_get_id();
1836
1837 getuptime(&msg->wdata.to_event.expires);
1838 tv_add_diff(&msg->wdata.to_event.expires, timeout);
1839
1840 futex_down(&async_futex);
1841
1842 async_insert_timeout(&msg->wdata);
1843
1844 /* Leave the async_futex locked when entering this function */
1845 fibril_switch(FIBRIL_TO_MANAGER);
1846
1847 /* Futex is up automatically after fibril_switch() */
1848
1849 amsg_destroy(msg);
1850}
1851
1852/** Pseudo-synchronous message sending - fast version.
1853 *
1854 * Send message asynchronously and return only after the reply arrives.
1855 *
1856 * This function can only transfer 4 register payload arguments. For
1857 * transferring more arguments, see the slower async_req_slow().
1858 *
1859 * @param exch Exchange for sending the message.
1860 * @param imethod Interface and method of the call.
1861 * @param arg1 Service-defined payload argument.
1862 * @param arg2 Service-defined payload argument.
1863 * @param arg3 Service-defined payload argument.
1864 * @param arg4 Service-defined payload argument.
1865 * @param r1 If non-NULL, storage for the 1st reply argument.
1866 * @param r2 If non-NULL, storage for the 2nd reply argument.
1867 * @param r3 If non-NULL, storage for the 3rd reply argument.
1868 * @param r4 If non-NULL, storage for the 4th reply argument.
1869 * @param r5 If non-NULL, storage for the 5th reply argument.
1870 *
1871 * @return Return code of the reply or a negative error code.
1872 *
1873 */
1874sysarg_t async_req_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1875 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t *r1, sysarg_t *r2,
1876 sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
1877{
1878 if (exch == NULL)
1879 return ENOENT;
1880
1881 ipc_call_t result;
1882 aid_t aid = async_send_4(exch, imethod, arg1, arg2, arg3, arg4,
1883 &result);
1884
1885 sysarg_t rc;
1886 async_wait_for(aid, &rc);
1887
1888 if (r1)
1889 *r1 = IPC_GET_ARG1(result);
1890
1891 if (r2)
1892 *r2 = IPC_GET_ARG2(result);
1893
1894 if (r3)
1895 *r3 = IPC_GET_ARG3(result);
1896
1897 if (r4)
1898 *r4 = IPC_GET_ARG4(result);
1899
1900 if (r5)
1901 *r5 = IPC_GET_ARG5(result);
1902
1903 return rc;
1904}
1905
1906/** Pseudo-synchronous message sending - slow version.
1907 *
1908 * Send message asynchronously and return only after the reply arrives.
1909 *
1910 * @param exch Exchange for sending the message.
1911 * @param imethod Interface and method of the call.
1912 * @param arg1 Service-defined payload argument.
1913 * @param arg2 Service-defined payload argument.
1914 * @param arg3 Service-defined payload argument.
1915 * @param arg4 Service-defined payload argument.
1916 * @param arg5 Service-defined payload argument.
1917 * @param r1 If non-NULL, storage for the 1st reply argument.
1918 * @param r2 If non-NULL, storage for the 2nd reply argument.
1919 * @param r3 If non-NULL, storage for the 3rd reply argument.
1920 * @param r4 If non-NULL, storage for the 4th reply argument.
1921 * @param r5 If non-NULL, storage for the 5th reply argument.
1922 *
1923 * @return Return code of the reply or a negative error code.
1924 *
1925 */
1926sysarg_t async_req_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1927 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, sysarg_t *r1,
1928 sysarg_t *r2, sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
1929{
1930 if (exch == NULL)
1931 return ENOENT;
1932
1933 ipc_call_t result;
1934 aid_t aid = async_send_5(exch, imethod, arg1, arg2, arg3, arg4, arg5,
1935 &result);
1936
1937 sysarg_t rc;
1938 async_wait_for(aid, &rc);
1939
1940 if (r1)
1941 *r1 = IPC_GET_ARG1(result);
1942
1943 if (r2)
1944 *r2 = IPC_GET_ARG2(result);
1945
1946 if (r3)
1947 *r3 = IPC_GET_ARG3(result);
1948
1949 if (r4)
1950 *r4 = IPC_GET_ARG4(result);
1951
1952 if (r5)
1953 *r5 = IPC_GET_ARG5(result);
1954
1955 return rc;
1956}
1957
1958void async_msg_0(async_exch_t *exch, sysarg_t imethod)
1959{
1960 if (exch != NULL)
1961 ipc_call_async_0(exch->phone, imethod, NULL, NULL);
1962}
1963
1964void async_msg_1(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1)
1965{
1966 if (exch != NULL)
1967 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL);
1968}
1969
1970void async_msg_2(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1971 sysarg_t arg2)
1972{
1973 if (exch != NULL)
1974 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL);
1975}
1976
1977void async_msg_3(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1978 sysarg_t arg2, sysarg_t arg3)
1979{
1980 if (exch != NULL)
1981 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL,
1982 NULL);
1983}
1984
1985void async_msg_4(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1986 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
1987{
1988 if (exch != NULL)
1989 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
1990 NULL, NULL);
1991}
1992
1993void async_msg_5(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1994 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
1995{
1996 if (exch != NULL)
1997 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
1998 arg5, NULL, NULL);
1999}
2000
2001sysarg_t async_answer_0(ipc_callid_t callid, sysarg_t retval)
2002{
2003 return ipc_answer_0(callid, retval);
2004}
2005
2006sysarg_t async_answer_1(ipc_callid_t callid, sysarg_t retval, sysarg_t arg1)
2007{
2008 return ipc_answer_1(callid, retval, arg1);
2009}
2010
2011sysarg_t async_answer_2(ipc_callid_t callid, sysarg_t retval, sysarg_t arg1,
2012 sysarg_t arg2)
2013{
2014 return ipc_answer_2(callid, retval, arg1, arg2);
2015}
2016
2017sysarg_t async_answer_3(ipc_callid_t callid, sysarg_t retval, sysarg_t arg1,
2018 sysarg_t arg2, sysarg_t arg3)
2019{
2020 return ipc_answer_3(callid, retval, arg1, arg2, arg3);
2021}
2022
2023sysarg_t async_answer_4(ipc_callid_t callid, sysarg_t retval, sysarg_t arg1,
2024 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
2025{
2026 return ipc_answer_4(callid, retval, arg1, arg2, arg3, arg4);
2027}
2028
2029sysarg_t async_answer_5(ipc_callid_t callid, sysarg_t retval, sysarg_t arg1,
2030 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
2031{
2032 return ipc_answer_5(callid, retval, arg1, arg2, arg3, arg4, arg5);
2033}
2034
2035int async_forward_fast(ipc_callid_t callid, async_exch_t *exch,
2036 sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, unsigned int mode)
2037{
2038 if (exch == NULL)
2039 return ENOENT;
2040
2041 return ipc_forward_fast(callid, exch->phone, imethod, arg1, arg2, mode);
2042}
2043
2044int async_forward_slow(ipc_callid_t callid, async_exch_t *exch,
2045 sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, sysarg_t arg3,
2046 sysarg_t arg4, sysarg_t arg5, unsigned int mode)
2047{
2048 if (exch == NULL)
2049 return ENOENT;
2050
2051 return ipc_forward_slow(callid, exch->phone, imethod, arg1, arg2, arg3,
2052 arg4, arg5, mode);
2053}
2054
2055/** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
2056 *
2057 * Ask through phone for a new connection to some service.
2058 *
2059 * @param exch Exchange for sending the message.
2060 * @param arg1 User defined argument.
2061 * @param arg2 User defined argument.
2062 * @param arg3 User defined argument.
2063 *
2064 * @return Zero on success or a negative error code.
2065 *
2066 */
2067int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
2068 sysarg_t arg3)
2069{
2070 if (exch == NULL)
2071 return ENOENT;
2072
2073 ipc_call_t answer;
2074 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
2075 &answer);
2076
2077 sysarg_t rc;
2078 async_wait_for(req, &rc);
2079 if (rc != EOK)
2080 return (int) rc;
2081
2082 return EOK;
2083}
2084
2085/** Wrapper for making IPC_M_CLONE_ESTABLISH calls using the async framework.
2086 *
2087 * Ask for a cloned connection to some service.
2088 *
2089 * @param mgmt Exchange management style.
2090 * @param exch Exchange for sending the message.
2091 *
2092 * @return New session on success or NULL on error.
2093 *
2094 */
2095async_sess_t *async_clone_establish(exch_mgmt_t mgmt, async_exch_t *exch)
2096{
2097 if (exch == NULL) {
2098 errno = ENOENT;
2099 return NULL;
2100 }
2101
2102 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
2103 if (sess == NULL) {
2104 errno = ENOMEM;
2105 return NULL;
2106 }
2107
2108 ipc_call_t result;
2109
2110 amsg_t *msg = amsg_create();
2111 if (!msg) {
2112 free(sess);
2113 errno = ENOMEM;
2114 return NULL;
2115 }
2116
2117 msg->dataptr = &result;
2118 msg->wdata.active = true;
2119
2120 ipc_call_async_0(exch->phone, IPC_M_CLONE_ESTABLISH, msg,
2121 reply_received);
2122
2123 sysarg_t rc;
2124 async_wait_for((aid_t) msg, &rc);
2125
2126 if (rc != EOK) {
2127 errno = rc;
2128 free(sess);
2129 return NULL;
2130 }
2131
2132 int phone = (int) IPC_GET_ARG5(result);
2133
2134 if (phone < 0) {
2135 errno = phone;
2136 free(sess);
2137 return NULL;
2138 }
2139
2140 sess->iface = 0;
2141 sess->mgmt = mgmt;
2142 sess->phone = phone;
2143 sess->arg1 = 0;
2144 sess->arg2 = 0;
2145 sess->arg3 = 0;
2146
2147 fibril_mutex_initialize(&sess->remote_state_mtx);
2148 sess->remote_state_data = NULL;
2149
2150 list_initialize(&sess->exch_list);
2151 fibril_mutex_initialize(&sess->mutex);
2152 atomic_set(&sess->refcnt, 0);
2153
2154 return sess;
2155}
2156
2157static int async_connect_me_to_internal(int phone, sysarg_t arg1, sysarg_t arg2,
2158 sysarg_t arg3, sysarg_t arg4)
2159{
2160 ipc_call_t result;
2161
2162 amsg_t *msg = amsg_create();
2163 if (!msg)
2164 return ENOENT;
2165
2166 msg->dataptr = &result;
2167 msg->wdata.active = true;
2168
2169 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4,
2170 msg, reply_received);
2171
2172 sysarg_t rc;
2173 async_wait_for((aid_t) msg, &rc);
2174
2175 if (rc != EOK)
2176 return rc;
2177
2178 return (int) IPC_GET_ARG5(result);
2179}
2180
2181/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
2182 *
2183 * Ask through for a new connection to some service.
2184 *
2185 * @param mgmt Exchange management style.
2186 * @param exch Exchange for sending the message.
2187 * @param arg1 User defined argument.
2188 * @param arg2 User defined argument.
2189 * @param arg3 User defined argument.
2190 *
2191 * @return New session on success or NULL on error.
2192 *
2193 */
2194async_sess_t *async_connect_me_to(exch_mgmt_t mgmt, async_exch_t *exch,
2195 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
2196{
2197 if (exch == NULL) {
2198 errno = ENOENT;
2199 return NULL;
2200 }
2201
2202 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
2203 if (sess == NULL) {
2204 errno = ENOMEM;
2205 return NULL;
2206 }
2207
2208 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
2209 0);
2210 if (phone < 0) {
2211 errno = phone;
2212 free(sess);
2213 return NULL;
2214 }
2215
2216 sess->iface = 0;
2217 sess->mgmt = mgmt;
2218 sess->phone = phone;
2219 sess->arg1 = arg1;
2220 sess->arg2 = arg2;
2221 sess->arg3 = arg3;
2222
2223 fibril_mutex_initialize(&sess->remote_state_mtx);
2224 sess->remote_state_data = NULL;
2225
2226 list_initialize(&sess->exch_list);
2227 fibril_mutex_initialize(&sess->mutex);
2228 atomic_set(&sess->refcnt, 0);
2229
2230 return sess;
2231}
2232
2233/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
2234 *
2235 * Ask through phone for a new connection to some service and block until
2236 * success.
2237 *
2238 * @param exch Exchange for sending the message.
2239 * @param iface Connection interface.
2240 * @param arg2 User defined argument.
2241 * @param arg3 User defined argument.
2242 *
2243 * @return New session on success or NULL on error.
2244 *
2245 */
2246async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,
2247 sysarg_t arg2, sysarg_t arg3)
2248{
2249 if (exch == NULL) {
2250 errno = ENOENT;
2251 return NULL;
2252 }
2253
2254 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
2255 if (sess == NULL) {
2256 errno = ENOMEM;
2257 return NULL;
2258 }
2259
2260 int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
2261 arg3, 0);
2262 if (phone < 0) {
2263 errno = phone;
2264 free(sess);
2265 return NULL;
2266 }
2267
2268 sess->iface = iface;
2269 sess->phone = phone;
2270 sess->arg1 = iface;
2271 sess->arg2 = arg2;
2272 sess->arg3 = arg3;
2273
2274 fibril_mutex_initialize(&sess->remote_state_mtx);
2275 sess->remote_state_data = NULL;
2276
2277 list_initialize(&sess->exch_list);
2278 fibril_mutex_initialize(&sess->mutex);
2279 atomic_set(&sess->refcnt, 0);
2280
2281 return sess;
2282}
2283
2284/** Set arguments for new connections.
2285 *
2286 * FIXME This is an ugly hack to work around the problem that parallel
2287 * exchanges are implemented using parallel connections. When we create
2288 * a callback session, the framework does not know arguments for the new
2289 * connections.
2290 *
2291 * The proper solution seems to be to implement parallel exchanges using
2292 * tagging.
2293 */
2294void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2,
2295 sysarg_t arg3)
2296{
2297 sess->arg1 = arg1;
2298 sess->arg2 = arg2;
2299 sess->arg3 = arg3;
2300}
2301
2302/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
2303 *
2304 * Ask through phone for a new connection to some service and block until
2305 * success.
2306 *
2307 * @param mgmt Exchange management style.
2308 * @param exch Exchange for sending the message.
2309 * @param arg1 User defined argument.
2310 * @param arg2 User defined argument.
2311 * @param arg3 User defined argument.
2312 *
2313 * @return New session on success or NULL on error.
2314 *
2315 */
2316async_sess_t *async_connect_me_to_blocking(exch_mgmt_t mgmt, async_exch_t *exch,
2317 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
2318{
2319 if (exch == NULL) {
2320 errno = ENOENT;
2321 return NULL;
2322 }
2323
2324 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
2325 if (sess == NULL) {
2326 errno = ENOMEM;
2327 return NULL;
2328 }
2329
2330 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
2331 IPC_FLAG_BLOCKING);
2332
2333 if (phone < 0) {
2334 errno = phone;
2335 free(sess);
2336 return NULL;
2337 }
2338
2339 sess->iface = 0;
2340 sess->mgmt = mgmt;
2341 sess->phone = phone;
2342 sess->arg1 = arg1;
2343 sess->arg2 = arg2;
2344 sess->arg3 = arg3;
2345
2346 fibril_mutex_initialize(&sess->remote_state_mtx);
2347 sess->remote_state_data = NULL;
2348
2349 list_initialize(&sess->exch_list);
2350 fibril_mutex_initialize(&sess->mutex);
2351 atomic_set(&sess->refcnt, 0);
2352
2353 return sess;
2354}
2355
2356/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
2357 *
2358 * Ask through phone for a new connection to some service and block until
2359 * success.
2360 *
2361 * @param exch Exchange for sending the message.
2362 * @param iface Connection interface.
2363 * @param arg2 User defined argument.
2364 * @param arg3 User defined argument.
2365 *
2366 * @return New session on success or NULL on error.
2367 *
2368 */
2369async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,
2370 sysarg_t arg2, sysarg_t arg3)
2371{
2372 if (exch == NULL) {
2373 errno = ENOENT;
2374 return NULL;
2375 }
2376
2377 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
2378 if (sess == NULL) {
2379 errno = ENOMEM;
2380 return NULL;
2381 }
2382
2383 int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
2384 arg3, IPC_FLAG_BLOCKING);
2385 if (phone < 0) {
2386 errno = phone;
2387 free(sess);
2388 return NULL;
2389 }
2390
2391 sess->iface = iface;
2392 sess->phone = phone;
2393 sess->arg1 = iface;
2394 sess->arg2 = arg2;
2395 sess->arg3 = arg3;
2396
2397 fibril_mutex_initialize(&sess->remote_state_mtx);
2398 sess->remote_state_data = NULL;
2399
2400 list_initialize(&sess->exch_list);
2401 fibril_mutex_initialize(&sess->mutex);
2402 atomic_set(&sess->refcnt, 0);
2403
2404 return sess;
2405}
2406
2407/** Connect to a task specified by id.
2408 *
2409 */
2410async_sess_t *async_connect_kbox(task_id_t id)
2411{
2412 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
2413 if (sess == NULL) {
2414 errno = ENOMEM;
2415 return NULL;
2416 }
2417
2418 int phone = ipc_connect_kbox(id);
2419 if (phone < 0) {
2420 errno = phone;
2421 free(sess);
2422 return NULL;
2423 }
2424
2425 sess->iface = 0;
2426 sess->mgmt = EXCHANGE_ATOMIC;
2427 sess->phone = phone;
2428 sess->arg1 = 0;
2429 sess->arg2 = 0;
2430 sess->arg3 = 0;
2431
2432 fibril_mutex_initialize(&sess->remote_state_mtx);
2433 sess->remote_state_data = NULL;
2434
2435 list_initialize(&sess->exch_list);
2436 fibril_mutex_initialize(&sess->mutex);
2437 atomic_set(&sess->refcnt, 0);
2438
2439 return sess;
2440}
2441
2442static int async_hangup_internal(int phone)
2443{
2444 return ipc_hangup(phone);
2445}
2446
2447/** Wrapper for ipc_hangup.
2448 *
2449 * @param sess Session to hung up.
2450 *
2451 * @return Zero on success or a negative error code.
2452 *
2453 */
2454int async_hangup(async_sess_t *sess)
2455{
2456 async_exch_t *exch;
2457
2458 assert(sess);
2459
2460 if (atomic_get(&sess->refcnt) > 0)
2461 return EBUSY;
2462
2463 fibril_mutex_lock(&async_sess_mutex);
2464
2465 int rc = async_hangup_internal(sess->phone);
2466
2467 while (!list_empty(&sess->exch_list)) {
2468 exch = (async_exch_t *)
2469 list_get_instance(list_first(&sess->exch_list),
2470 async_exch_t, sess_link);
2471
2472 list_remove(&exch->sess_link);
2473 list_remove(&exch->global_link);
2474 async_hangup_internal(exch->phone);
2475 free(exch);
2476 }
2477
2478 free(sess);
2479
2480 fibril_mutex_unlock(&async_sess_mutex);
2481
2482 return rc;
2483}
2484
2485/** Interrupt one thread of this task from waiting for IPC. */
2486void async_poke(void)
2487{
2488 ipc_poke();
2489}
2490
2491/** Start new exchange in a session.
2492 *
2493 * @param session Session.
2494 *
2495 * @return New exchange or NULL on error.
2496 *
2497 */
2498async_exch_t *async_exchange_begin(async_sess_t *sess)
2499{
2500 if (sess == NULL)
2501 return NULL;
2502
2503 exch_mgmt_t mgmt = sess->mgmt;
2504 if (sess->iface != 0)
2505 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
2506
2507 async_exch_t *exch = NULL;
2508
2509 fibril_mutex_lock(&async_sess_mutex);
2510
2511 if (!list_empty(&sess->exch_list)) {
2512 /*
2513 * There are inactive exchanges in the session.
2514 */
2515 exch = (async_exch_t *)
2516 list_get_instance(list_first(&sess->exch_list),
2517 async_exch_t, sess_link);
2518
2519 list_remove(&exch->sess_link);
2520 list_remove(&exch->global_link);
2521 } else {
2522 /*
2523 * There are no available exchanges in the session.
2524 */
2525
2526 if ((mgmt == EXCHANGE_ATOMIC) ||
2527 (mgmt == EXCHANGE_SERIALIZE)) {
2528 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
2529 if (exch != NULL) {
2530 link_initialize(&exch->sess_link);
2531 link_initialize(&exch->global_link);
2532 exch->sess = sess;
2533 exch->phone = sess->phone;
2534 }
2535 } else if (mgmt == EXCHANGE_PARALLEL) {
2536 int phone;
2537
2538 retry:
2539 /*
2540 * Make a one-time attempt to connect a new data phone.
2541 */
2542 phone = async_connect_me_to_internal(sess->phone, sess->arg1,
2543 sess->arg2, sess->arg3, 0);
2544 if (phone >= 0) {
2545 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
2546 if (exch != NULL) {
2547 link_initialize(&exch->sess_link);
2548 link_initialize(&exch->global_link);
2549 exch->sess = sess;
2550 exch->phone = phone;
2551 } else
2552 async_hangup_internal(phone);
2553 } else if (!list_empty(&inactive_exch_list)) {
2554 /*
2555 * We did not manage to connect a new phone. But we
2556 * can try to close some of the currently inactive
2557 * connections in other sessions and try again.
2558 */
2559 exch = (async_exch_t *)
2560 list_get_instance(list_first(&inactive_exch_list),
2561 async_exch_t, global_link);
2562
2563 list_remove(&exch->sess_link);
2564 list_remove(&exch->global_link);
2565 async_hangup_internal(exch->phone);
2566 free(exch);
2567 goto retry;
2568 } else {
2569 /*
2570 * Wait for a phone to become available.
2571 */
2572 fibril_condvar_wait(&avail_phone_cv, &async_sess_mutex);
2573 goto retry;
2574 }
2575 }
2576 }
2577
2578 fibril_mutex_unlock(&async_sess_mutex);
2579
2580 if (exch != NULL) {
2581 atomic_inc(&sess->refcnt);
2582
2583 if (mgmt == EXCHANGE_SERIALIZE)
2584 fibril_mutex_lock(&sess->mutex);
2585 }
2586
2587 return exch;
2588}
2589
2590/** Finish an exchange.
2591 *
2592 * @param exch Exchange to finish.
2593 *
2594 */
2595void async_exchange_end(async_exch_t *exch)
2596{
2597 if (exch == NULL)
2598 return;
2599
2600 async_sess_t *sess = exch->sess;
2601 assert(sess != NULL);
2602
2603 exch_mgmt_t mgmt = sess->mgmt;
2604 if (sess->iface != 0)
2605 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
2606
2607 atomic_dec(&sess->refcnt);
2608
2609 if (mgmt == EXCHANGE_SERIALIZE)
2610 fibril_mutex_unlock(&sess->mutex);
2611
2612 fibril_mutex_lock(&async_sess_mutex);
2613
2614 list_append(&exch->sess_link, &sess->exch_list);
2615 list_append(&exch->global_link, &inactive_exch_list);
2616 fibril_condvar_signal(&avail_phone_cv);
2617
2618 fibril_mutex_unlock(&async_sess_mutex);
2619}
2620
2621/** Wrapper for IPC_M_SHARE_IN calls using the async framework.
2622 *
2623 * @param exch Exchange for sending the message.
2624 * @param size Size of the destination address space area.
2625 * @param arg User defined argument.
2626 * @param flags Storage for the received flags. Can be NULL.
2627 * @param dst Address of the storage for the destination address space area
2628 * base address. Cannot be NULL.
2629 *
2630 * @return Zero on success or a negative error code from errno.h.
2631 *
2632 */
2633int async_share_in_start(async_exch_t *exch, size_t size, sysarg_t arg,
2634 unsigned int *flags, void **dst)
2635{
2636 if (exch == NULL)
2637 return ENOENT;
2638
2639 sysarg_t _flags = 0;
2640 sysarg_t _dst = (sysarg_t) -1;
2641 int res = async_req_2_4(exch, IPC_M_SHARE_IN, (sysarg_t) size,
2642 arg, NULL, &_flags, NULL, &_dst);
2643
2644 if (flags)
2645 *flags = (unsigned int) _flags;
2646
2647 *dst = (void *) _dst;
2648 return res;
2649}
2650
2651/** Wrapper for receiving the IPC_M_SHARE_IN calls using the async framework.
2652 *
2653 * This wrapper only makes it more comfortable to receive IPC_M_SHARE_IN
2654 * calls so that the user doesn't have to remember the meaning of each IPC
2655 * argument.
2656 *
2657 * So far, this wrapper is to be used from within a connection fibril.
2658 *
2659 * @param callid Storage for the hash of the IPC_M_SHARE_IN call.
2660 * @param size Destination address space area size.
2661 *
2662 * @return True on success, false on failure.
2663 *
2664 */
2665bool async_share_in_receive(ipc_callid_t *callid, size_t *size)
2666{
2667 assert(callid);
2668 assert(size);
2669
2670 ipc_call_t data;
2671 *callid = async_get_call(&data);
2672
2673 if (IPC_GET_IMETHOD(data) != IPC_M_SHARE_IN)
2674 return false;
2675
2676 *size = (size_t) IPC_GET_ARG1(data);
2677 return true;
2678}
2679
2680/** Wrapper for answering the IPC_M_SHARE_IN calls using the async framework.
2681 *
2682 * This wrapper only makes it more comfortable to answer IPC_M_SHARE_IN
2683 * calls so that the user doesn't have to remember the meaning of each IPC
2684 * argument.
2685 *
2686 * @param callid Hash of the IPC_M_DATA_READ call to answer.
2687 * @param src Source address space base.
2688 * @param flags Flags to be used for sharing. Bits can be only cleared.
2689 *
2690 * @return Zero on success or a value from @ref errno.h on failure.
2691 *
2692 */
2693int async_share_in_finalize(ipc_callid_t callid, void *src, unsigned int flags)
2694{
2695 return ipc_answer_3(callid, EOK, (sysarg_t) src, (sysarg_t) flags,
2696 (sysarg_t) __entry);
2697}
2698
2699/** Wrapper for IPC_M_SHARE_OUT calls using the async framework.
2700 *
2701 * @param exch Exchange for sending the message.
2702 * @param src Source address space area base address.
2703 * @param flags Flags to be used for sharing. Bits can be only cleared.
2704 *
2705 * @return Zero on success or a negative error code from errno.h.
2706 *
2707 */
2708int async_share_out_start(async_exch_t *exch, void *src, unsigned int flags)
2709{
2710 if (exch == NULL)
2711 return ENOENT;
2712
2713 return async_req_3_0(exch, IPC_M_SHARE_OUT, (sysarg_t) src, 0,
2714 (sysarg_t) flags);
2715}
2716
2717/** Wrapper for receiving the IPC_M_SHARE_OUT calls using the async framework.
2718 *
2719 * This wrapper only makes it more comfortable to receive IPC_M_SHARE_OUT
2720 * calls so that the user doesn't have to remember the meaning of each IPC
2721 * argument.
2722 *
2723 * So far, this wrapper is to be used from within a connection fibril.
2724 *
2725 * @param callid Storage for the hash of the IPC_M_SHARE_OUT call.
2726 * @param size Storage for the source address space area size.
2727 * @param flags Storage for the sharing flags.
2728 *
2729 * @return True on success, false on failure.
2730 *
2731 */
2732bool async_share_out_receive(ipc_callid_t *callid, size_t *size, unsigned int *flags)
2733{
2734 assert(callid);
2735 assert(size);
2736 assert(flags);
2737
2738 ipc_call_t data;
2739 *callid = async_get_call(&data);
2740
2741 if (IPC_GET_IMETHOD(data) != IPC_M_SHARE_OUT)
2742 return false;
2743
2744 *size = (size_t) IPC_GET_ARG2(data);
2745 *flags = (unsigned int) IPC_GET_ARG3(data);
2746 return true;
2747}
2748
2749/** Wrapper for answering the IPC_M_SHARE_OUT calls using the async framework.
2750 *
2751 * This wrapper only makes it more comfortable to answer IPC_M_SHARE_OUT
2752 * calls so that the user doesn't have to remember the meaning of each IPC
2753 * argument.
2754 *
2755 * @param callid Hash of the IPC_M_DATA_WRITE call to answer.
2756 * @param dst Address of the storage for the destination address space area
2757 * base address.
2758 *
2759 * @return Zero on success or a value from @ref errno.h on failure.
2760 *
2761 */
2762int async_share_out_finalize(ipc_callid_t callid, void **dst)
2763{
2764 return ipc_answer_2(callid, EOK, (sysarg_t) __entry, (sysarg_t) dst);
2765}
2766
2767/** Start IPC_M_DATA_READ using the async framework.
2768 *
2769 * @param exch Exchange for sending the message.
2770 * @param dst Address of the beginning of the destination buffer.
2771 * @param size Size of the destination buffer (in bytes).
2772 * @param dataptr Storage of call data (arg 2 holds actual data size).
2773 *
2774 * @return Hash of the sent message or 0 on error.
2775 *
2776 */
2777aid_t async_data_read(async_exch_t *exch, void *dst, size_t size,
2778 ipc_call_t *dataptr)
2779{
2780 return async_send_2(exch, IPC_M_DATA_READ, (sysarg_t) dst,
2781 (sysarg_t) size, dataptr);
2782}
2783
2784/** Wrapper for IPC_M_DATA_READ calls using the async framework.
2785 *
2786 * @param exch Exchange for sending the message.
2787 * @param dst Address of the beginning of the destination buffer.
2788 * @param size Size of the destination buffer.
2789 *
2790 * @return Zero on success or a negative error code from errno.h.
2791 *
2792 */
2793int async_data_read_start(async_exch_t *exch, void *dst, size_t size)
2794{
2795 if (exch == NULL)
2796 return ENOENT;
2797
2798 return async_req_2_0(exch, IPC_M_DATA_READ, (sysarg_t) dst,
2799 (sysarg_t) size);
2800}
2801
2802/** Wrapper for receiving the IPC_M_DATA_READ calls using the async framework.
2803 *
2804 * This wrapper only makes it more comfortable to receive IPC_M_DATA_READ
2805 * calls so that the user doesn't have to remember the meaning of each IPC
2806 * argument.
2807 *
2808 * So far, this wrapper is to be used from within a connection fibril.
2809 *
2810 * @param callid Storage for the hash of the IPC_M_DATA_READ.
2811 * @param size Storage for the maximum size. Can be NULL.
2812 *
2813 * @return True on success, false on failure.
2814 *
2815 */
2816bool async_data_read_receive(ipc_callid_t *callid, size_t *size)
2817{
2818 ipc_call_t data;
2819 return async_data_read_receive_call(callid, &data, size);
2820}
2821
2822/** Wrapper for receiving the IPC_M_DATA_READ calls using the async framework.
2823 *
2824 * This wrapper only makes it more comfortable to receive IPC_M_DATA_READ
2825 * calls so that the user doesn't have to remember the meaning of each IPC
2826 * argument.
2827 *
2828 * So far, this wrapper is to be used from within a connection fibril.
2829 *
2830 * @param callid Storage for the hash of the IPC_M_DATA_READ.
2831 * @param size Storage for the maximum size. Can be NULL.
2832 *
2833 * @return True on success, false on failure.
2834 *
2835 */
2836bool async_data_read_receive_call(ipc_callid_t *callid, ipc_call_t *data,
2837 size_t *size)
2838{
2839 assert(callid);
2840 assert(data);
2841
2842 *callid = async_get_call(data);
2843
2844 if (IPC_GET_IMETHOD(*data) != IPC_M_DATA_READ)
2845 return false;
2846
2847 if (size)
2848 *size = (size_t) IPC_GET_ARG2(*data);
2849
2850 return true;
2851}
2852
2853/** Wrapper for answering the IPC_M_DATA_READ calls using the async framework.
2854 *
2855 * This wrapper only makes it more comfortable to answer IPC_M_DATA_READ
2856 * calls so that the user doesn't have to remember the meaning of each IPC
2857 * argument.
2858 *
2859 * @param callid Hash of the IPC_M_DATA_READ call to answer.
2860 * @param src Source address for the IPC_M_DATA_READ call.
2861 * @param size Size for the IPC_M_DATA_READ call. Can be smaller than
2862 * the maximum size announced by the sender.
2863 *
2864 * @return Zero on success or a value from @ref errno.h on failure.
2865 *
2866 */
2867int async_data_read_finalize(ipc_callid_t callid, const void *src, size_t size)
2868{
2869 return ipc_answer_2(callid, EOK, (sysarg_t) src, (sysarg_t) size);
2870}
2871
2872/** Wrapper for forwarding any read request
2873 *
2874 */
2875int async_data_read_forward_fast(async_exch_t *exch, sysarg_t imethod,
2876 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4,
2877 ipc_call_t *dataptr)
2878{
2879 if (exch == NULL)
2880 return ENOENT;
2881
2882 ipc_callid_t callid;
2883 if (!async_data_read_receive(&callid, NULL)) {
2884 ipc_answer_0(callid, EINVAL);
2885 return EINVAL;
2886 }
2887
2888 aid_t msg = async_send_fast(exch, imethod, arg1, arg2, arg3, arg4,
2889 dataptr);
2890 if (msg == 0) {
2891 ipc_answer_0(callid, EINVAL);
2892 return EINVAL;
2893 }
2894
2895 int retval = ipc_forward_fast(callid, exch->phone, 0, 0, 0,
2896 IPC_FF_ROUTE_FROM_ME);
2897 if (retval != EOK) {
2898 async_forget(msg);
2899 ipc_answer_0(callid, retval);
2900 return retval;
2901 }
2902
2903 sysarg_t rc;
2904 async_wait_for(msg, &rc);
2905
2906 return (int) rc;
2907}
2908
2909/** Wrapper for IPC_M_DATA_WRITE calls using the async framework.
2910 *
2911 * @param exch Exchange for sending the message.
2912 * @param src Address of the beginning of the source buffer.
2913 * @param size Size of the source buffer.
2914 *
2915 * @return Zero on success or a negative error code from errno.h.
2916 *
2917 */
2918int async_data_write_start(async_exch_t *exch, const void *src, size_t size)
2919{
2920 if (exch == NULL)
2921 return ENOENT;
2922
2923 return async_req_2_0(exch, IPC_M_DATA_WRITE, (sysarg_t) src,
2924 (sysarg_t) size);
2925}
2926
2927/** Wrapper for receiving the IPC_M_DATA_WRITE calls using the async framework.
2928 *
2929 * This wrapper only makes it more comfortable to receive IPC_M_DATA_WRITE
2930 * calls so that the user doesn't have to remember the meaning of each IPC
2931 * argument.
2932 *
2933 * So far, this wrapper is to be used from within a connection fibril.
2934 *
2935 * @param callid Storage for the hash of the IPC_M_DATA_WRITE.
2936 * @param size Storage for the suggested size. May be NULL.
2937 *
2938 * @return True on success, false on failure.
2939 *
2940 */
2941bool async_data_write_receive(ipc_callid_t *callid, size_t *size)
2942{
2943 ipc_call_t data;
2944 return async_data_write_receive_call(callid, &data, size);
2945}
2946
2947/** Wrapper for receiving the IPC_M_DATA_WRITE calls using the async framework.
2948 *
2949 * This wrapper only makes it more comfortable to receive IPC_M_DATA_WRITE
2950 * calls so that the user doesn't have to remember the meaning of each IPC
2951 * argument.
2952 *
2953 * So far, this wrapper is to be used from within a connection fibril.
2954 *
2955 * @param callid Storage for the hash of the IPC_M_DATA_WRITE.
2956 * @param data Storage for the ipc call data.
2957 * @param size Storage for the suggested size. May be NULL.
2958 *
2959 * @return True on success, false on failure.
2960 *
2961 */
2962bool async_data_write_receive_call(ipc_callid_t *callid, ipc_call_t *data,
2963 size_t *size)
2964{
2965 assert(callid);
2966 assert(data);
2967
2968 *callid = async_get_call(data);
2969
2970 if (IPC_GET_IMETHOD(*data) != IPC_M_DATA_WRITE)
2971 return false;
2972
2973 if (size)
2974 *size = (size_t) IPC_GET_ARG2(*data);
2975
2976 return true;
2977}
2978
2979/** Wrapper for answering the IPC_M_DATA_WRITE calls using the async framework.
2980 *
2981 * This wrapper only makes it more comfortable to answer IPC_M_DATA_WRITE
2982 * calls so that the user doesn't have to remember the meaning of each IPC
2983 * argument.
2984 *
2985 * @param callid Hash of the IPC_M_DATA_WRITE call to answer.
2986 * @param dst Final destination address for the IPC_M_DATA_WRITE call.
2987 * @param size Final size for the IPC_M_DATA_WRITE call.
2988 *
2989 * @return Zero on success or a value from @ref errno.h on failure.
2990 *
2991 */
2992int async_data_write_finalize(ipc_callid_t callid, void *dst, size_t size)
2993{
2994 return ipc_answer_2(callid, EOK, (sysarg_t) dst, (sysarg_t) size);
2995}
2996
2997/** Wrapper for receiving binary data or strings
2998 *
2999 * This wrapper only makes it more comfortable to use async_data_write_*
3000 * functions to receive binary data or strings.
3001 *
3002 * @param data Pointer to data pointer (which should be later disposed
3003 * by free()). If the operation fails, the pointer is not
3004 * touched.
3005 * @param nullterm If true then the received data is always zero terminated.
3006 * This also causes to allocate one extra byte beyond the
3007 * raw transmitted data.
3008 * @param min_size Minimum size (in bytes) of the data to receive.
3009 * @param max_size Maximum size (in bytes) of the data to receive. 0 means
3010 * no limit.
3011 * @param granulariy If non-zero then the size of the received data has to
3012 * be divisible by this value.
3013 * @param received If not NULL, the size of the received data is stored here.
3014 *
3015 * @return Zero on success or a value from @ref errno.h on failure.
3016 *
3017 */
3018int async_data_write_accept(void **data, const bool nullterm,
3019 const size_t min_size, const size_t max_size, const size_t granularity,
3020 size_t *received)
3021{
3022 assert(data);
3023
3024 ipc_callid_t callid;
3025 size_t size;
3026 if (!async_data_write_receive(&callid, &size)) {
3027 ipc_answer_0(callid, EINVAL);
3028 return EINVAL;
3029 }
3030
3031 if (size < min_size) {
3032 ipc_answer_0(callid, EINVAL);
3033 return EINVAL;
3034 }
3035
3036 if ((max_size > 0) && (size > max_size)) {
3037 ipc_answer_0(callid, EINVAL);
3038 return EINVAL;
3039 }
3040
3041 if ((granularity > 0) && ((size % granularity) != 0)) {
3042 ipc_answer_0(callid, EINVAL);
3043 return EINVAL;
3044 }
3045
3046 void *arg_data;
3047
3048 if (nullterm)
3049 arg_data = malloc(size + 1);
3050 else
3051 arg_data = malloc(size);
3052
3053 if (arg_data == NULL) {
3054 ipc_answer_0(callid, ENOMEM);
3055 return ENOMEM;
3056 }
3057
3058 int rc = async_data_write_finalize(callid, arg_data, size);
3059 if (rc != EOK) {
3060 free(arg_data);
3061 return rc;
3062 }
3063
3064 if (nullterm)
3065 ((char *) arg_data)[size] = 0;
3066
3067 *data = arg_data;
3068 if (received != NULL)
3069 *received = size;
3070
3071 return EOK;
3072}
3073
3074/** Wrapper for voiding any data that is about to be received
3075 *
3076 * This wrapper can be used to void any pending data
3077 *
3078 * @param retval Error value from @ref errno.h to be returned to the caller.
3079 *
3080 */
3081void async_data_write_void(sysarg_t retval)
3082{
3083 ipc_callid_t callid;
3084 async_data_write_receive(&callid, NULL);
3085 ipc_answer_0(callid, retval);
3086}
3087
3088/** Wrapper for forwarding any data that is about to be received
3089 *
3090 */
3091int async_data_write_forward_fast(async_exch_t *exch, sysarg_t imethod,
3092 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4,
3093 ipc_call_t *dataptr)
3094{
3095 if (exch == NULL)
3096 return ENOENT;
3097
3098 ipc_callid_t callid;
3099 if (!async_data_write_receive(&callid, NULL)) {
3100 ipc_answer_0(callid, EINVAL);
3101 return EINVAL;
3102 }
3103
3104 aid_t msg = async_send_fast(exch, imethod, arg1, arg2, arg3, arg4,
3105 dataptr);
3106 if (msg == 0) {
3107 ipc_answer_0(callid, EINVAL);
3108 return EINVAL;
3109 }
3110
3111 int retval = ipc_forward_fast(callid, exch->phone, 0, 0, 0,
3112 IPC_FF_ROUTE_FROM_ME);
3113 if (retval != EOK) {
3114 async_forget(msg);
3115 ipc_answer_0(callid, retval);
3116 return retval;
3117 }
3118
3119 sysarg_t rc;
3120 async_wait_for(msg, &rc);
3121
3122 return (int) rc;
3123}
3124
3125/** Wrapper for sending an exchange over different exchange for cloning
3126 *
3127 * @param exch Exchange to be used for sending.
3128 * @param clone_exch Exchange to be cloned.
3129 *
3130 */
3131int async_exchange_clone(async_exch_t *exch, async_exch_t *clone_exch)
3132{
3133 return async_req_1_0(exch, IPC_M_CONNECTION_CLONE, clone_exch->phone);
3134}
3135
3136/** Wrapper for receiving the IPC_M_CONNECTION_CLONE calls.
3137 *
3138 * If the current call is IPC_M_CONNECTION_CLONE then a new
3139 * async session is created for the accepted phone.
3140 *
3141 * @param mgmt Exchange management style.
3142 *
3143 * @return New async session or NULL on failure.
3144 *
3145 */
3146async_sess_t *async_clone_receive(exch_mgmt_t mgmt)
3147{
3148 /* Accept the phone */
3149 ipc_call_t call;
3150 ipc_callid_t callid = async_get_call(&call);
3151 int phone = (int) IPC_GET_ARG1(call);
3152
3153 if ((IPC_GET_IMETHOD(call) != IPC_M_CONNECTION_CLONE) ||
3154 (phone < 0)) {
3155 async_answer_0(callid, EINVAL);
3156 return NULL;
3157 }
3158
3159 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
3160 if (sess == NULL) {
3161 async_answer_0(callid, ENOMEM);
3162 return NULL;
3163 }
3164
3165 sess->iface = 0;
3166 sess->mgmt = mgmt;
3167 sess->phone = phone;
3168 sess->arg1 = 0;
3169 sess->arg2 = 0;
3170 sess->arg3 = 0;
3171
3172 fibril_mutex_initialize(&sess->remote_state_mtx);
3173 sess->remote_state_data = NULL;
3174
3175 list_initialize(&sess->exch_list);
3176 fibril_mutex_initialize(&sess->mutex);
3177 atomic_set(&sess->refcnt, 0);
3178
3179 /* Acknowledge the cloned phone */
3180 async_answer_0(callid, EOK);
3181
3182 return sess;
3183}
3184
3185/** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls.
3186 *
3187 * If the current call is IPC_M_CONNECT_TO_ME then a new
3188 * async session is created for the accepted phone.
3189 *
3190 * @param mgmt Exchange management style.
3191 *
3192 * @return New async session.
3193 * @return NULL on failure.
3194 *
3195 */
3196async_sess_t *async_callback_receive(exch_mgmt_t mgmt)
3197{
3198 /* Accept the phone */
3199 ipc_call_t call;
3200 ipc_callid_t callid = async_get_call(&call);
3201 int phone = (int) IPC_GET_ARG5(call);
3202
3203 if ((IPC_GET_IMETHOD(call) != IPC_M_CONNECT_TO_ME) ||
3204 (phone < 0)) {
3205 async_answer_0(callid, EINVAL);
3206 return NULL;
3207 }
3208
3209 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
3210 if (sess == NULL) {
3211 async_answer_0(callid, ENOMEM);
3212 return NULL;
3213 }
3214
3215 sess->iface = 0;
3216 sess->mgmt = mgmt;
3217 sess->phone = phone;
3218 sess->arg1 = 0;
3219 sess->arg2 = 0;
3220 sess->arg3 = 0;
3221
3222 fibril_mutex_initialize(&sess->remote_state_mtx);
3223 sess->remote_state_data = NULL;
3224
3225 list_initialize(&sess->exch_list);
3226 fibril_mutex_initialize(&sess->mutex);
3227 atomic_set(&sess->refcnt, 0);
3228
3229 /* Acknowledge the connected phone */
3230 async_answer_0(callid, EOK);
3231
3232 return sess;
3233}
3234
3235/** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls.
3236 *
3237 * If the call is IPC_M_CONNECT_TO_ME then a new
3238 * async session is created. However, the phone is
3239 * not accepted automatically.
3240 *
3241 * @param mgmt Exchange management style.
3242 * @param call Call data.
3243 *
3244 * @return New async session.
3245 * @return NULL on failure.
3246 * @return NULL if the call is not IPC_M_CONNECT_TO_ME.
3247 *
3248 */
3249async_sess_t *async_callback_receive_start(exch_mgmt_t mgmt, ipc_call_t *call)
3250{
3251 int phone = (int) IPC_GET_ARG5(*call);
3252
3253 if ((IPC_GET_IMETHOD(*call) != IPC_M_CONNECT_TO_ME) ||
3254 (phone < 0))
3255 return NULL;
3256
3257 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
3258 if (sess == NULL)
3259 return NULL;
3260
3261 sess->iface = 0;
3262 sess->mgmt = mgmt;
3263 sess->phone = phone;
3264 sess->arg1 = 0;
3265 sess->arg2 = 0;
3266 sess->arg3 = 0;
3267
3268 fibril_mutex_initialize(&sess->remote_state_mtx);
3269 sess->remote_state_data = NULL;
3270
3271 list_initialize(&sess->exch_list);
3272 fibril_mutex_initialize(&sess->mutex);
3273 atomic_set(&sess->refcnt, 0);
3274
3275 return sess;
3276}
3277
3278int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
3279 sysarg_t arg3, async_exch_t *other_exch)
3280{
3281 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
3282 arg1, arg2, arg3, 0, other_exch->phone);
3283}
3284
3285bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1,
3286 sysarg_t *arg2, sysarg_t *arg3)
3287{
3288 assert(callid);
3289
3290 ipc_call_t call;
3291 *callid = async_get_call(&call);
3292
3293 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
3294 return false;
3295
3296 if (arg1)
3297 *arg1 = IPC_GET_ARG1(call);
3298 if (arg2)
3299 *arg2 = IPC_GET_ARG2(call);
3300 if (arg3)
3301 *arg3 = IPC_GET_ARG3(call);
3302
3303 return true;
3304}
3305
3306int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch)
3307{
3308 return ipc_answer_1(callid, EOK, other_exch->phone);
3309}
3310
3311/** Lock and get session remote state
3312 *
3313 * Lock and get the local replica of the remote state
3314 * in stateful sessions. The call should be paired
3315 * with async_remote_state_release*().
3316 *
3317 * @param[in] sess Stateful session.
3318 *
3319 * @return Local replica of the remote state.
3320 *
3321 */
3322void *async_remote_state_acquire(async_sess_t *sess)
3323{
3324 fibril_mutex_lock(&sess->remote_state_mtx);
3325 return sess->remote_state_data;
3326}
3327
3328/** Update the session remote state
3329 *
3330 * Update the local replica of the remote state
3331 * in stateful sessions. The remote state must
3332 * be already locked.
3333 *
3334 * @param[in] sess Stateful session.
3335 * @param[in] state New local replica of the remote state.
3336 *
3337 */
3338void async_remote_state_update(async_sess_t *sess, void *state)
3339{
3340 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
3341 sess->remote_state_data = state;
3342}
3343
3344/** Release the session remote state
3345 *
3346 * Unlock the local replica of the remote state
3347 * in stateful sessions.
3348 *
3349 * @param[in] sess Stateful session.
3350 *
3351 */
3352void async_remote_state_release(async_sess_t *sess)
3353{
3354 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
3355
3356 fibril_mutex_unlock(&sess->remote_state_mtx);
3357}
3358
3359/** Release the session remote state and end an exchange
3360 *
3361 * Unlock the local replica of the remote state
3362 * in stateful sessions. This is convenience function
3363 * which gets the session pointer from the exchange
3364 * and also ends the exchange.
3365 *
3366 * @param[in] exch Stateful session's exchange.
3367 *
3368 */
3369void async_remote_state_release_exchange(async_exch_t *exch)
3370{
3371 if (exch == NULL)
3372 return;
3373
3374 async_sess_t *sess = exch->sess;
3375 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
3376
3377 async_exchange_end(exch);
3378 fibril_mutex_unlock(&sess->remote_state_mtx);
3379}
3380
3381/** @}
3382 */
Note: See TracBrowser for help on using the repository browser.