source: mainline/uspace/lib/c/generic/async.c@ b688fd8

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since b688fd8 was b688fd8, checked in by Martin Decky <martin@…>, 10 years ago

gradually introduce async ports, initial phase

The initial phase is to reimplement the traditional async client connections as an untyped fallback port. This creates the possibility to introduce ports typed by interface type gradually in later changesets.

  • Property mode set to 100644
File size: 73.1 KB
Line 
1/*
2 * Copyright (c) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup libc
30 * @{
31 */
32/** @file
33 */
34
35/**
36 * Asynchronous library
37 *
38 * The aim of this library is to provide a facility for writing programs which
39 * utilize the asynchronous nature of HelenOS IPC, yet using a normal way of
40 * programming.
41 *
42 * You should be able to write very simple multithreaded programs. The async
43 * framework will automatically take care of most of the synchronization
44 * problems.
45 *
46 * Example of use (pseudo C):
47 *
48 * 1) Multithreaded client application
49 *
50 * fibril_create(fibril1, ...);
51 * fibril_create(fibril2, ...);
52 * ...
53 *
54 * int fibril1(void *arg)
55 * {
56 * conn = async_connect_me_to(...);
57 *
58 * exch = async_exchange_begin(conn);
59 * c1 = async_send(exch);
60 * async_exchange_end(exch);
61 *
62 * exch = async_exchange_begin(conn);
63 * c2 = async_send(exch);
64 * async_exchange_end(exch);
65 *
66 * async_wait_for(c1);
67 * async_wait_for(c2);
68 * ...
69 * }
70 *
71 *
72 * 2) Multithreaded server application
73 *
74 * main()
75 * {
76 * async_manager();
77 * }
78 *
79 * port_handler(icallid, *icall)
80 * {
81 * if (want_refuse) {
82 * async_answer_0(icallid, ELIMIT);
83 * return;
84 * }
85 * async_answer_0(icallid, EOK);
86 *
87 * callid = async_get_call(&call);
88 * somehow_handle_the_call(callid, call);
89 * async_answer_2(callid, 1, 2, 3);
90 *
91 * callid = async_get_call(&call);
92 * ...
93 * }
94 *
95 */
96
97#define LIBC_ASYNC_C_
98#include <ipc/ipc.h>
99#include <async.h>
100#include "private/async.h"
101#undef LIBC_ASYNC_C_
102
103#include <ipc/irq.h>
104#include <ipc/event.h>
105#include <futex.h>
106#include <fibril.h>
107#include <adt/hash_table.h>
108#include <adt/list.h>
109#include <assert.h>
110#include <errno.h>
111#include <sys/time.h>
112#include <libarch/barrier.h>
113#include <stdbool.h>
114#include <malloc.h>
115#include <mem.h>
116#include <stdlib.h>
117#include <macros.h>
118#include "private/libc.h"
119
120/** Session data */
121struct async_sess {
122 /** List of inactive exchanges */
123 list_t exch_list;
124
125 /** Exchange management style */
126 exch_mgmt_t mgmt;
127
128 /** Session identification */
129 int phone;
130
131 /** First clone connection argument */
132 sysarg_t arg1;
133
134 /** Second clone connection argument */
135 sysarg_t arg2;
136
137 /** Third clone connection argument */
138 sysarg_t arg3;
139
140 /** Exchange mutex */
141 fibril_mutex_t mutex;
142
143 /** Number of opened exchanges */
144 atomic_t refcnt;
145
146 /** Mutex for stateful connections */
147 fibril_mutex_t remote_state_mtx;
148
149 /** Data for stateful connections */
150 void *remote_state_data;
151};
152
153/** Exchange data */
154struct async_exch {
155 /** Link into list of inactive exchanges */
156 link_t sess_link;
157
158 /** Link into global list of inactive exchanges */
159 link_t global_link;
160
161 /** Session pointer */
162 async_sess_t *sess;
163
164 /** Exchange identification */
165 int phone;
166};
167
168/** Async framework global futex */
169futex_t async_futex = FUTEX_INITIALIZER;
170
171/** Number of threads waiting for IPC in the kernel. */
172atomic_t threads_in_ipc_wait = { 0 };
173
174/** Naming service session */
175async_sess_t *session_ns;
176
177/** Call data */
178typedef struct {
179 link_t link;
180
181 ipc_callid_t callid;
182 ipc_call_t call;
183} msg_t;
184
185/** Message data */
186typedef struct {
187 awaiter_t wdata;
188
189 /** If reply was received. */
190 bool done;
191
192 /** If the message / reply should be discarded on arrival. */
193 bool forget;
194
195 /** If already destroyed. */
196 bool destroyed;
197
198 /** Pointer to where the answer data is stored. */
199 ipc_call_t *dataptr;
200
201 sysarg_t retval;
202} amsg_t;
203
204/* Client connection data */
205typedef struct {
206 ht_link_t link;
207
208 task_id_t in_task_id;
209 atomic_t refcnt;
210 void *data;
211} client_t;
212
213/* Server connection data */
214typedef struct {
215 awaiter_t wdata;
216
217 /** Hash table link. */
218 ht_link_t link;
219
220 /** Incoming client task ID. */
221 task_id_t in_task_id;
222
223 /** Incoming phone hash. */
224 sysarg_t in_phone_hash;
225
226 /** Link to the client tracking structure. */
227 client_t *client;
228
229 /** Messages that should be delivered to this fibril. */
230 list_t msg_queue;
231
232 /** Identification of the opening call. */
233 ipc_callid_t callid;
234
235 /** Call data of the opening call. */
236 ipc_call_t call;
237
238 /** Identification of the closing call. */
239 ipc_callid_t close_callid;
240
241 /** Fibril function that will be used to handle the connection. */
242 async_port_handler_t handler;
243
244 /** Client data */
245 void *data;
246} connection_t;
247
248/* Notification data */
249typedef struct {
250 ht_link_t link;
251
252 /** Notification method */
253 sysarg_t imethod;
254
255 /** Notification handler */
256 async_notification_handler_t handler;
257
258 /** Notification data */
259 void *data;
260} notification_t;
261
262/** Identifier of the incoming connection handled by the current fibril. */
263static fibril_local connection_t *fibril_connection;
264
265static void to_event_initialize(to_event_t *to)
266{
267 struct timeval tv = { 0, 0 };
268
269 to->inlist = false;
270 to->occurred = false;
271 link_initialize(&to->link);
272 to->expires = tv;
273}
274
275static void wu_event_initialize(wu_event_t *wu)
276{
277 wu->inlist = false;
278 link_initialize(&wu->link);
279}
280
281void awaiter_initialize(awaiter_t *aw)
282{
283 aw->fid = 0;
284 aw->active = false;
285 to_event_initialize(&aw->to_event);
286 wu_event_initialize(&aw->wu_event);
287}
288
289static amsg_t *amsg_create(void)
290{
291 amsg_t *msg;
292
293 msg = malloc(sizeof(amsg_t));
294 if (msg) {
295 msg->done = false;
296 msg->forget = false;
297 msg->destroyed = false;
298 msg->dataptr = NULL;
299 msg->retval = (sysarg_t) EINVAL;
300 awaiter_initialize(&msg->wdata);
301 }
302
303 return msg;
304}
305
306static void amsg_destroy(amsg_t *msg)
307{
308 assert(!msg->destroyed);
309 msg->destroyed = true;
310 free(msg);
311}
312
313static void *default_client_data_constructor(void)
314{
315 return NULL;
316}
317
318static void default_client_data_destructor(void *data)
319{
320}
321
322static async_client_data_ctor_t async_client_data_create =
323 default_client_data_constructor;
324static async_client_data_dtor_t async_client_data_destroy =
325 default_client_data_destructor;
326
327void async_set_client_data_constructor(async_client_data_ctor_t ctor)
328{
329 assert(async_client_data_create == default_client_data_constructor);
330 async_client_data_create = ctor;
331}
332
333void async_set_client_data_destructor(async_client_data_dtor_t dtor)
334{
335 assert(async_client_data_destroy == default_client_data_destructor);
336 async_client_data_destroy = dtor;
337}
338
339/** Default fallback fibril function.
340 *
341 * This fallback fibril function gets called on incomming
342 * connections that do not have a specific handler defined.
343 *
344 * @param callid Hash of the incoming call.
345 * @param call Data of the incoming call.
346 * @param arg Local argument
347 *
348 */
349static void default_fallback_port_handler(ipc_callid_t callid, ipc_call_t *call,
350 void *arg)
351{
352 ipc_answer_0(callid, ENOENT);
353}
354
355static async_port_handler_t fallback_port_handler =
356 default_fallback_port_handler;
357static void *fallback_port_data = NULL;
358
359static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE;
360
361/** Set the stack size for the notification handler notification fibrils.
362 *
363 * @param size Stack size in bytes.
364 */
365void async_set_notification_handler_stack_size(size_t size)
366{
367 notification_handler_stksz = size;
368}
369
370/** Mutex protecting inactive_exch_list and avail_phone_cv.
371 *
372 */
373static FIBRIL_MUTEX_INITIALIZE(async_sess_mutex);
374
375/** List of all currently inactive exchanges.
376 *
377 */
378static LIST_INITIALIZE(inactive_exch_list);
379
380/** Condition variable to wait for a phone to become available.
381 *
382 */
383static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
384
385void async_set_fallback_port_handler(async_port_handler_t handler, void *data)
386{
387 assert(handler != NULL);
388
389 fallback_port_handler = handler;
390 fallback_port_data = data;
391}
392
393static hash_table_t client_hash_table;
394static hash_table_t conn_hash_table;
395static hash_table_t notification_hash_table;
396static LIST_INITIALIZE(timeout_list);
397
398static sysarg_t notification_avail = 0;
399
400static size_t client_key_hash(void *key)
401{
402 task_id_t in_task_id = *(task_id_t *) key;
403 return in_task_id;
404}
405
406static size_t client_hash(const ht_link_t *item)
407{
408 client_t *client = hash_table_get_inst(item, client_t, link);
409 return client_key_hash(&client->in_task_id);
410}
411
412static bool client_key_equal(void *key, const ht_link_t *item)
413{
414 task_id_t in_task_id = *(task_id_t *) key;
415 client_t *client = hash_table_get_inst(item, client_t, link);
416 return in_task_id == client->in_task_id;
417}
418
419/** Operations for the client hash table. */
420static hash_table_ops_t client_hash_table_ops = {
421 .hash = client_hash,
422 .key_hash = client_key_hash,
423 .key_equal = client_key_equal,
424 .equal = NULL,
425 .remove_callback = NULL
426};
427
428/** Compute hash into the connection hash table based on the source phone hash.
429 *
430 * @param key Pointer to source phone hash.
431 *
432 * @return Index into the connection hash table.
433 *
434 */
435static size_t conn_key_hash(void *key)
436{
437 sysarg_t in_phone_hash = *(sysarg_t *) key;
438 return in_phone_hash;
439}
440
441static size_t conn_hash(const ht_link_t *item)
442{
443 connection_t *conn = hash_table_get_inst(item, connection_t, link);
444 return conn_key_hash(&conn->in_phone_hash);
445}
446
447static bool conn_key_equal(void *key, const ht_link_t *item)
448{
449 sysarg_t in_phone_hash = *(sysarg_t *) key;
450 connection_t *conn = hash_table_get_inst(item, connection_t, link);
451 return (in_phone_hash == conn->in_phone_hash);
452}
453
454/** Operations for the connection hash table. */
455static hash_table_ops_t conn_hash_table_ops = {
456 .hash = conn_hash,
457 .key_hash = conn_key_hash,
458 .key_equal = conn_key_equal,
459 .equal = NULL,
460 .remove_callback = NULL
461};
462
463static size_t notification_key_hash(void *key)
464{
465 sysarg_t id = *(sysarg_t *) key;
466 return id;
467}
468
469static size_t notification_hash(const ht_link_t *item)
470{
471 notification_t *notification =
472 hash_table_get_inst(item, notification_t, link);
473 return notification_key_hash(&notification->imethod);
474}
475
476static bool notification_key_equal(void *key, const ht_link_t *item)
477{
478 sysarg_t id = *(sysarg_t *) key;
479 notification_t *notification =
480 hash_table_get_inst(item, notification_t, link);
481 return id == notification->imethod;
482}
483
484/** Operations for the notification hash table. */
485static hash_table_ops_t notification_hash_table_ops = {
486 .hash = notification_hash,
487 .key_hash = notification_key_hash,
488 .key_equal = notification_key_equal,
489 .equal = NULL,
490 .remove_callback = NULL
491};
492
493/** Sort in current fibril's timeout request.
494 *
495 * @param wd Wait data of the current fibril.
496 *
497 */
498void async_insert_timeout(awaiter_t *wd)
499{
500 assert(wd);
501
502 wd->to_event.occurred = false;
503 wd->to_event.inlist = true;
504
505 link_t *tmp = timeout_list.head.next;
506 while (tmp != &timeout_list.head) {
507 awaiter_t *cur
508 = list_get_instance(tmp, awaiter_t, to_event.link);
509
510 if (tv_gteq(&cur->to_event.expires, &wd->to_event.expires))
511 break;
512
513 tmp = tmp->next;
514 }
515
516 list_insert_before(&wd->to_event.link, tmp);
517}
518
519/** Try to route a call to an appropriate connection fibril.
520 *
521 * If the proper connection fibril is found, a message with the call is added to
522 * its message queue. If the fibril was not active, it is activated and all
523 * timeouts are unregistered.
524 *
525 * @param callid Hash of the incoming call.
526 * @param call Data of the incoming call.
527 *
528 * @return False if the call doesn't match any connection.
529 * @return True if the call was passed to the respective connection fibril.
530 *
531 */
532static bool route_call(ipc_callid_t callid, ipc_call_t *call)
533{
534 assert(call);
535
536 futex_down(&async_futex);
537
538 ht_link_t *link = hash_table_find(&conn_hash_table, &call->in_phone_hash);
539 if (!link) {
540 futex_up(&async_futex);
541 return false;
542 }
543
544 connection_t *conn = hash_table_get_inst(link, connection_t, link);
545
546 msg_t *msg = malloc(sizeof(*msg));
547 if (!msg) {
548 futex_up(&async_futex);
549 return false;
550 }
551
552 msg->callid = callid;
553 msg->call = *call;
554 list_append(&msg->link, &conn->msg_queue);
555
556 if (IPC_GET_IMETHOD(*call) == IPC_M_PHONE_HUNGUP)
557 conn->close_callid = callid;
558
559 /* If the connection fibril is waiting for an event, activate it */
560 if (!conn->wdata.active) {
561
562 /* If in timeout list, remove it */
563 if (conn->wdata.to_event.inlist) {
564 conn->wdata.to_event.inlist = false;
565 list_remove(&conn->wdata.to_event.link);
566 }
567
568 conn->wdata.active = true;
569 fibril_add_ready(conn->wdata.fid);
570 }
571
572 futex_up(&async_futex);
573 return true;
574}
575
576/** Notification fibril.
577 *
578 * When a notification arrives, a fibril with this implementing function is
579 * created. It calls the corresponding notification handler and does the final
580 * cleanup.
581 *
582 * @param arg Message structure pointer.
583 *
584 * @return Always zero.
585 *
586 */
587static int notification_fibril(void *arg)
588{
589 assert(arg);
590
591 msg_t *msg = (msg_t *) arg;
592 async_notification_handler_t handler = NULL;
593 void *data = NULL;
594
595 futex_down(&async_futex);
596
597 ht_link_t *link = hash_table_find(&notification_hash_table,
598 &IPC_GET_IMETHOD(msg->call));
599 if (link) {
600 notification_t *notification =
601 hash_table_get_inst(link, notification_t, link);
602 handler = notification->handler;
603 data = notification->data;
604 }
605
606 futex_up(&async_futex);
607
608 if (handler)
609 handler(msg->callid, &msg->call, data);
610
611 free(msg);
612 return 0;
613}
614
615/** Process notification.
616 *
617 * A new fibril is created which would process the notification.
618 *
619 * @param callid Hash of the incoming call.
620 * @param call Data of the incoming call.
621 *
622 * @return False if an error occured.
623 * True if the call was passed to the notification fibril.
624 *
625 */
626static bool process_notification(ipc_callid_t callid, ipc_call_t *call)
627{
628 assert(call);
629
630 futex_down(&async_futex);
631
632 msg_t *msg = malloc(sizeof(*msg));
633 if (!msg) {
634 futex_up(&async_futex);
635 return false;
636 }
637
638 msg->callid = callid;
639 msg->call = *call;
640
641 fid_t fid = fibril_create_generic(notification_fibril, msg,
642 notification_handler_stksz);
643 if (fid == 0) {
644 free(msg);
645 futex_up(&async_futex);
646 return false;
647 }
648
649 fibril_add_ready(fid);
650
651 futex_up(&async_futex);
652 return true;
653}
654
655/** Subscribe to IRQ notification.
656 *
657 * @param inr IRQ number.
658 * @param devno Device number of the device generating inr.
659 * @param handler Notification handler.
660 * @param data Notification handler client data.
661 * @param ucode Top-half pseudocode handler.
662 *
663 * @return Zero on success or a negative error code.
664 *
665 */
666int async_irq_subscribe(int inr, int devno,
667 async_notification_handler_t handler, void *data, const irq_code_t *ucode)
668{
669 notification_t *notification =
670 (notification_t *) malloc(sizeof(notification_t));
671 if (!notification)
672 return ENOMEM;
673
674 futex_down(&async_futex);
675
676 sysarg_t imethod = notification_avail;
677 notification_avail++;
678
679 notification->imethod = imethod;
680 notification->handler = handler;
681 notification->data = data;
682
683 hash_table_insert(&notification_hash_table, &notification->link);
684
685 futex_up(&async_futex);
686
687 return ipc_irq_subscribe(inr, devno, imethod, ucode);
688}
689
690/** Unsubscribe from IRQ notification.
691 *
692 * @param inr IRQ number.
693 * @param devno Device number of the device generating inr.
694 *
695 * @return Zero on success or a negative error code.
696 *
697 */
698int async_irq_unsubscribe(int inr, int devno)
699{
700 // TODO: Remove entry from hash table
701 // to avoid memory leak
702
703 return ipc_irq_unsubscribe(inr, devno);
704}
705
706/** Subscribe to event notifications.
707 *
708 * @param evno Event type to subscribe.
709 * @param handler Notification handler.
710 * @param data Notification handler client data.
711 *
712 * @return Zero on success or a negative error code.
713 *
714 */
715int async_event_subscribe(event_type_t evno,
716 async_notification_handler_t handler, void *data)
717{
718 notification_t *notification =
719 (notification_t *) malloc(sizeof(notification_t));
720 if (!notification)
721 return ENOMEM;
722
723 futex_down(&async_futex);
724
725 sysarg_t imethod = notification_avail;
726 notification_avail++;
727
728 notification->imethod = imethod;
729 notification->handler = handler;
730 notification->data = data;
731
732 hash_table_insert(&notification_hash_table, &notification->link);
733
734 futex_up(&async_futex);
735
736 return ipc_event_subscribe(evno, imethod);
737}
738
739/** Subscribe to task event notifications.
740 *
741 * @param evno Event type to subscribe.
742 * @param handler Notification handler.
743 * @param data Notification handler client data.
744 *
745 * @return Zero on success or a negative error code.
746 *
747 */
748int async_event_task_subscribe(event_task_type_t evno,
749 async_notification_handler_t handler, void *data)
750{
751 notification_t *notification =
752 (notification_t *) malloc(sizeof(notification_t));
753 if (!notification)
754 return ENOMEM;
755
756 futex_down(&async_futex);
757
758 sysarg_t imethod = notification_avail;
759 notification_avail++;
760
761 notification->imethod = imethod;
762 notification->handler = handler;
763 notification->data = data;
764
765 hash_table_insert(&notification_hash_table, &notification->link);
766
767 futex_up(&async_futex);
768
769 return ipc_event_task_subscribe(evno, imethod);
770}
771
772/** Unmask event notifications.
773 *
774 * @param evno Event type to unmask.
775 *
776 * @return Value returned by the kernel.
777 *
778 */
779int async_event_unmask(event_type_t evno)
780{
781 return ipc_event_unmask(evno);
782}
783
784/** Unmask task event notifications.
785 *
786 * @param evno Event type to unmask.
787 *
788 * @return Value returned by the kernel.
789 *
790 */
791int async_event_task_unmask(event_task_type_t evno)
792{
793 return ipc_event_task_unmask(evno);
794}
795
796/** Return new incoming message for the current (fibril-local) connection.
797 *
798 * @param call Storage where the incoming call data will be stored.
799 * @param usecs Timeout in microseconds. Zero denotes no timeout.
800 *
801 * @return If no timeout was specified, then a hash of the
802 * incoming call is returned. If a timeout is specified,
803 * then a hash of the incoming call is returned unless
804 * the timeout expires prior to receiving a message. In
805 * that case zero is returned.
806 *
807 */
808ipc_callid_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs)
809{
810 assert(call);
811 assert(fibril_connection);
812
813 /* Why doing this?
814 * GCC 4.1.0 coughs on fibril_connection-> dereference.
815 * GCC 4.1.1 happilly puts the rdhwr instruction in delay slot.
816 * I would never expect to find so many errors in
817 * a compiler.
818 */
819 connection_t *conn = fibril_connection;
820
821 futex_down(&async_futex);
822
823 if (usecs) {
824 getuptime(&conn->wdata.to_event.expires);
825 tv_add_diff(&conn->wdata.to_event.expires, usecs);
826 } else
827 conn->wdata.to_event.inlist = false;
828
829 /* If nothing in queue, wait until something arrives */
830 while (list_empty(&conn->msg_queue)) {
831 if (conn->close_callid) {
832 /*
833 * Handle the case when the connection was already
834 * closed by the client but the server did not notice
835 * the first IPC_M_PHONE_HUNGUP call and continues to
836 * call async_get_call_timeout(). Repeat
837 * IPC_M_PHONE_HUNGUP until the caller notices.
838 */
839 memset(call, 0, sizeof(ipc_call_t));
840 IPC_SET_IMETHOD(*call, IPC_M_PHONE_HUNGUP);
841 futex_up(&async_futex);
842 return conn->close_callid;
843 }
844
845 if (usecs)
846 async_insert_timeout(&conn->wdata);
847
848 conn->wdata.active = false;
849
850 /*
851 * Note: the current fibril will be rescheduled either due to a
852 * timeout or due to an arriving message destined to it. In the
853 * former case, handle_expired_timeouts() and, in the latter
854 * case, route_call() will perform the wakeup.
855 */
856 fibril_switch(FIBRIL_TO_MANAGER);
857
858 /*
859 * Futex is up after getting back from async_manager.
860 * Get it again.
861 */
862 futex_down(&async_futex);
863 if ((usecs) && (conn->wdata.to_event.occurred)
864 && (list_empty(&conn->msg_queue))) {
865 /* If we timed out -> exit */
866 futex_up(&async_futex);
867 return 0;
868 }
869 }
870
871 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link);
872 list_remove(&msg->link);
873
874 ipc_callid_t callid = msg->callid;
875 *call = msg->call;
876 free(msg);
877
878 futex_up(&async_futex);
879 return callid;
880}
881
882static client_t *async_client_get(task_id_t client_id, bool create)
883{
884 client_t *client = NULL;
885
886 futex_down(&async_futex);
887 ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
888 if (link) {
889 client = hash_table_get_inst(link, client_t, link);
890 atomic_inc(&client->refcnt);
891 } else if (create) {
892 client = malloc(sizeof(client_t));
893 if (client) {
894 client->in_task_id = client_id;
895 client->data = async_client_data_create();
896
897 atomic_set(&client->refcnt, 1);
898 hash_table_insert(&client_hash_table, &client->link);
899 }
900 }
901
902 futex_up(&async_futex);
903 return client;
904}
905
906static void async_client_put(client_t *client)
907{
908 bool destroy;
909
910 futex_down(&async_futex);
911
912 if (atomic_predec(&client->refcnt) == 0) {
913 hash_table_remove(&client_hash_table, &client->in_task_id);
914 destroy = true;
915 } else
916 destroy = false;
917
918 futex_up(&async_futex);
919
920 if (destroy) {
921 if (client->data)
922 async_client_data_destroy(client->data);
923
924 free(client);
925 }
926}
927
928void *async_get_client_data(void)
929{
930 assert(fibril_connection);
931 return fibril_connection->client->data;
932}
933
934void *async_get_client_data_by_id(task_id_t client_id)
935{
936 client_t *client = async_client_get(client_id, false);
937 if (!client)
938 return NULL;
939 if (!client->data) {
940 async_client_put(client);
941 return NULL;
942 }
943
944 return client->data;
945}
946
947void async_put_client_data_by_id(task_id_t client_id)
948{
949 client_t *client = async_client_get(client_id, false);
950
951 assert(client);
952 assert(client->data);
953
954 /* Drop the reference we got in async_get_client_data_by_hash(). */
955 async_client_put(client);
956
957 /* Drop our own reference we got at the beginning of this function. */
958 async_client_put(client);
959}
960
961/** Wrapper for client connection fibril.
962 *
963 * When a new connection arrives, a fibril with this implementing function is
964 * created. It calls handler() and does the final cleanup.
965 *
966 * @param arg Connection structure pointer.
967 *
968 * @return Always zero.
969 *
970 */
971static int connection_fibril(void *arg)
972{
973 assert(arg);
974
975 /*
976 * Setup fibril-local connection pointer.
977 */
978 fibril_connection = (connection_t *) arg;
979
980 /*
981 * Add our reference for the current connection in the client task
982 * tracking structure. If this is the first reference, create and
983 * hash in a new tracking structure.
984 */
985
986 client_t *client = async_client_get(fibril_connection->in_task_id, true);
987 if (!client) {
988 ipc_answer_0(fibril_connection->callid, ENOMEM);
989 return 0;
990 }
991
992 fibril_connection->client = client;
993
994 /*
995 * Call the connection handler function.
996 */
997 fibril_connection->handler(fibril_connection->callid,
998 &fibril_connection->call, fibril_connection->data);
999
1000 /*
1001 * Remove the reference for this client task connection.
1002 */
1003 async_client_put(client);
1004
1005 /*
1006 * Remove myself from the connection hash table.
1007 */
1008 futex_down(&async_futex);
1009 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
1010 futex_up(&async_futex);
1011
1012 /*
1013 * Answer all remaining messages with EHANGUP.
1014 */
1015 while (!list_empty(&fibril_connection->msg_queue)) {
1016 msg_t *msg =
1017 list_get_instance(list_first(&fibril_connection->msg_queue),
1018 msg_t, link);
1019
1020 list_remove(&msg->link);
1021 ipc_answer_0(msg->callid, EHANGUP);
1022 free(msg);
1023 }
1024
1025 /*
1026 * If the connection was hung-up, answer the last call,
1027 * i.e. IPC_M_PHONE_HUNGUP.
1028 */
1029 if (fibril_connection->close_callid)
1030 ipc_answer_0(fibril_connection->close_callid, EOK);
1031
1032 free(fibril_connection);
1033 return 0;
1034}
1035
1036/** Create a new fibril for a new connection.
1037 *
1038 * Create new fibril for connection, fill in connection structures and insert
1039 * it into the hash table, so that later we can easily do routing of messages to
1040 * particular fibrils.
1041 *
1042 * @param in_task_id Identification of the incoming connection.
1043 * @param in_phone_hash Identification of the incoming connection.
1044 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call.
1045 * If callid is zero, the connection was opened by
1046 * accepting the IPC_M_CONNECT_TO_ME call and this function
1047 * is called directly by the server.
1048 * @param call Call data of the opening call.
1049 * @param handler Fibril function that should be called upon opening the
1050 * connection.
1051 * @param data Extra argument to pass to the connection fibril
1052 *
1053 * @return New fibril id or NULL on failure.
1054 *
1055 */
1056fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
1057 ipc_callid_t callid, ipc_call_t *call,
1058 async_port_handler_t handler, void *data)
1059{
1060 connection_t *conn = malloc(sizeof(*conn));
1061 if (!conn) {
1062 if (callid)
1063 ipc_answer_0(callid, ENOMEM);
1064
1065 return (uintptr_t) NULL;
1066 }
1067
1068 conn->in_task_id = in_task_id;
1069 conn->in_phone_hash = in_phone_hash;
1070 list_initialize(&conn->msg_queue);
1071 conn->callid = callid;
1072 conn->close_callid = 0;
1073 conn->data = data;
1074
1075 if (call)
1076 conn->call = *call;
1077
1078 /* We will activate the fibril ASAP */
1079 conn->wdata.active = true;
1080 conn->handler = handler;
1081 conn->wdata.fid = fibril_create(connection_fibril, conn);
1082
1083 if (conn->wdata.fid == 0) {
1084 free(conn);
1085
1086 if (callid)
1087 ipc_answer_0(callid, ENOMEM);
1088
1089 return (uintptr_t) NULL;
1090 }
1091
1092 /* Add connection to the connection hash table */
1093
1094 futex_down(&async_futex);
1095 hash_table_insert(&conn_hash_table, &conn->link);
1096 futex_up(&async_futex);
1097
1098 fibril_add_ready(conn->wdata.fid);
1099
1100 return conn->wdata.fid;
1101}
1102
1103/** Handle a call that was received.
1104 *
1105 * If the call has the IPC_M_CONNECT_ME_TO method, a new connection is created.
1106 * Otherwise the call is routed to its connection fibril.
1107 *
1108 * @param callid Hash of the incoming call.
1109 * @param call Data of the incoming call.
1110 *
1111 */
1112static void handle_call(ipc_callid_t callid, ipc_call_t *call)
1113{
1114 assert(call);
1115
1116 /* Kernel notification */
1117 if ((callid & IPC_CALLID_NOTIFICATION)) {
1118 process_notification(callid, call);
1119 return;
1120 }
1121
1122 switch (IPC_GET_IMETHOD(*call)) {
1123 case IPC_M_CLONE_ESTABLISH:
1124 case IPC_M_CONNECT_ME_TO:
1125 /* Open new connection with fibril, etc. */
1126 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
1127 callid, call, fallback_port_handler, fallback_port_data);
1128 return;
1129 }
1130
1131 /* Try to route the call through the connection hash table */
1132 if (route_call(callid, call))
1133 return;
1134
1135 /* Unknown call from unknown phone - hang it up */
1136 ipc_answer_0(callid, EHANGUP);
1137}
1138
1139/** Fire all timeouts that expired. */
1140static void handle_expired_timeouts(void)
1141{
1142 struct timeval tv;
1143 getuptime(&tv);
1144
1145 futex_down(&async_futex);
1146
1147 link_t *cur = list_first(&timeout_list);
1148 while (cur != NULL) {
1149 awaiter_t *waiter =
1150 list_get_instance(cur, awaiter_t, to_event.link);
1151
1152 if (tv_gt(&waiter->to_event.expires, &tv))
1153 break;
1154
1155 list_remove(&waiter->to_event.link);
1156 waiter->to_event.inlist = false;
1157 waiter->to_event.occurred = true;
1158
1159 /*
1160 * Redundant condition?
1161 * The fibril should not be active when it gets here.
1162 */
1163 if (!waiter->active) {
1164 waiter->active = true;
1165 fibril_add_ready(waiter->fid);
1166 }
1167
1168 cur = list_first(&timeout_list);
1169 }
1170
1171 futex_up(&async_futex);
1172}
1173
1174/** Endless loop dispatching incoming calls and answers.
1175 *
1176 * @return Never returns.
1177 *
1178 */
1179static int async_manager_worker(void)
1180{
1181 while (true) {
1182 if (fibril_switch(FIBRIL_FROM_MANAGER)) {
1183 futex_up(&async_futex);
1184 /*
1185 * async_futex is always held when entering a manager
1186 * fibril.
1187 */
1188 continue;
1189 }
1190
1191 futex_down(&async_futex);
1192
1193 suseconds_t timeout;
1194 unsigned int flags = SYNCH_FLAGS_NONE;
1195 if (!list_empty(&timeout_list)) {
1196 awaiter_t *waiter = list_get_instance(
1197 list_first(&timeout_list), awaiter_t, to_event.link);
1198
1199 struct timeval tv;
1200 getuptime(&tv);
1201
1202 if (tv_gteq(&tv, &waiter->to_event.expires)) {
1203 futex_up(&async_futex);
1204 handle_expired_timeouts();
1205 /*
1206 * Notice that even if the event(s) already
1207 * expired (and thus the other fibril was
1208 * supposed to be running already),
1209 * we check for incoming IPC.
1210 *
1211 * Otherwise, a fibril that continuously
1212 * creates (almost) expired events could
1213 * prevent IPC retrieval from the kernel.
1214 */
1215 timeout = 0;
1216 flags = SYNCH_FLAGS_NON_BLOCKING;
1217
1218 } else {
1219 timeout = tv_sub_diff(&waiter->to_event.expires,
1220 &tv);
1221 futex_up(&async_futex);
1222 }
1223 } else {
1224 futex_up(&async_futex);
1225 timeout = SYNCH_NO_TIMEOUT;
1226 }
1227
1228 atomic_inc(&threads_in_ipc_wait);
1229
1230 ipc_call_t call;
1231 ipc_callid_t callid = ipc_wait_cycle(&call, timeout, flags);
1232
1233 atomic_dec(&threads_in_ipc_wait);
1234
1235 if (!callid) {
1236 handle_expired_timeouts();
1237 continue;
1238 }
1239
1240 if (callid & IPC_CALLID_ANSWERED)
1241 continue;
1242
1243 handle_call(callid, &call);
1244 }
1245
1246 return 0;
1247}
1248
1249/** Function to start async_manager as a standalone fibril.
1250 *
1251 * When more kernel threads are used, one async manager should exist per thread.
1252 *
1253 * @param arg Unused.
1254 * @return Never returns.
1255 *
1256 */
1257static int async_manager_fibril(void *arg)
1258{
1259 futex_up(&async_futex);
1260
1261 /*
1262 * async_futex is always locked when entering manager
1263 */
1264 async_manager_worker();
1265
1266 return 0;
1267}
1268
1269/** Add one manager to manager list. */
1270void async_create_manager(void)
1271{
1272 fid_t fid = fibril_create(async_manager_fibril, NULL);
1273 if (fid != 0)
1274 fibril_add_manager(fid);
1275}
1276
1277/** Remove one manager from manager list */
1278void async_destroy_manager(void)
1279{
1280 fibril_remove_manager();
1281}
1282
1283/** Initialize the async framework.
1284 *
1285 */
1286void __async_init(void)
1287{
1288 if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops))
1289 abort();
1290
1291 if (!hash_table_create(&conn_hash_table, 0, 0, &conn_hash_table_ops))
1292 abort();
1293
1294 if (!hash_table_create(&notification_hash_table, 0, 0,
1295 &notification_hash_table_ops))
1296 abort();
1297
1298 session_ns = (async_sess_t *) malloc(sizeof(async_sess_t));
1299 if (session_ns == NULL)
1300 abort();
1301
1302 session_ns->mgmt = EXCHANGE_ATOMIC;
1303 session_ns->phone = PHONE_NS;
1304 session_ns->arg1 = 0;
1305 session_ns->arg2 = 0;
1306 session_ns->arg3 = 0;
1307
1308 fibril_mutex_initialize(&session_ns->remote_state_mtx);
1309 session_ns->remote_state_data = NULL;
1310
1311 list_initialize(&session_ns->exch_list);
1312 fibril_mutex_initialize(&session_ns->mutex);
1313 atomic_set(&session_ns->refcnt, 0);
1314}
1315
1316/** Reply received callback.
1317 *
1318 * This function is called whenever a reply for an asynchronous message sent out
1319 * by the asynchronous framework is received.
1320 *
1321 * Notify the fibril which is waiting for this message that it has arrived.
1322 *
1323 * @param arg Pointer to the asynchronous message record.
1324 * @param retval Value returned in the answer.
1325 * @param data Call data of the answer.
1326 *
1327 */
1328void reply_received(void *arg, int retval, ipc_call_t *data)
1329{
1330 assert(arg);
1331
1332 futex_down(&async_futex);
1333
1334 amsg_t *msg = (amsg_t *) arg;
1335 msg->retval = retval;
1336
1337 /* Copy data after futex_down, just in case the call was detached */
1338 if ((msg->dataptr) && (data))
1339 *msg->dataptr = *data;
1340
1341 write_barrier();
1342
1343 /* Remove message from timeout list */
1344 if (msg->wdata.to_event.inlist)
1345 list_remove(&msg->wdata.to_event.link);
1346
1347 msg->done = true;
1348
1349 if (msg->forget) {
1350 assert(msg->wdata.active);
1351 amsg_destroy(msg);
1352 } else if (!msg->wdata.active) {
1353 msg->wdata.active = true;
1354 fibril_add_ready(msg->wdata.fid);
1355 }
1356
1357 futex_up(&async_futex);
1358}
1359
1360/** Send message and return id of the sent message.
1361 *
1362 * The return value can be used as input for async_wait() to wait for
1363 * completion.
1364 *
1365 * @param exch Exchange for sending the message.
1366 * @param imethod Service-defined interface and method.
1367 * @param arg1 Service-defined payload argument.
1368 * @param arg2 Service-defined payload argument.
1369 * @param arg3 Service-defined payload argument.
1370 * @param arg4 Service-defined payload argument.
1371 * @param dataptr If non-NULL, storage where the reply data will be
1372 * stored.
1373 *
1374 * @return Hash of the sent message or 0 on error.
1375 *
1376 */
1377aid_t async_send_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1378 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr)
1379{
1380 if (exch == NULL)
1381 return 0;
1382
1383 amsg_t *msg = amsg_create();
1384 if (msg == NULL)
1385 return 0;
1386
1387 msg->dataptr = dataptr;
1388 msg->wdata.active = true;
1389
1390 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg,
1391 reply_received, true);
1392
1393 return (aid_t) msg;
1394}
1395
1396/** Send message and return id of the sent message
1397 *
1398 * The return value can be used as input for async_wait() to wait for
1399 * completion.
1400 *
1401 * @param exch Exchange for sending the message.
1402 * @param imethod Service-defined interface and method.
1403 * @param arg1 Service-defined payload argument.
1404 * @param arg2 Service-defined payload argument.
1405 * @param arg3 Service-defined payload argument.
1406 * @param arg4 Service-defined payload argument.
1407 * @param arg5 Service-defined payload argument.
1408 * @param dataptr If non-NULL, storage where the reply data will be
1409 * stored.
1410 *
1411 * @return Hash of the sent message or 0 on error.
1412 *
1413 */
1414aid_t async_send_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1415 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5,
1416 ipc_call_t *dataptr)
1417{
1418 if (exch == NULL)
1419 return 0;
1420
1421 amsg_t *msg = amsg_create();
1422 if (msg == NULL)
1423 return 0;
1424
1425 msg->dataptr = dataptr;
1426 msg->wdata.active = true;
1427
1428 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,
1429 msg, reply_received, true);
1430
1431 return (aid_t) msg;
1432}
1433
1434/** Wait for a message sent by the async framework.
1435 *
1436 * @param amsgid Hash of the message to wait for.
1437 * @param retval Pointer to storage where the retval of the answer will
1438 * be stored.
1439 *
1440 */
1441void async_wait_for(aid_t amsgid, sysarg_t *retval)
1442{
1443 assert(amsgid);
1444
1445 amsg_t *msg = (amsg_t *) amsgid;
1446
1447 futex_down(&async_futex);
1448
1449 assert(!msg->forget);
1450 assert(!msg->destroyed);
1451
1452 if (msg->done) {
1453 futex_up(&async_futex);
1454 goto done;
1455 }
1456
1457 msg->wdata.fid = fibril_get_id();
1458 msg->wdata.active = false;
1459 msg->wdata.to_event.inlist = false;
1460
1461 /* Leave the async_futex locked when entering this function */
1462 fibril_switch(FIBRIL_TO_MANAGER);
1463
1464 /* Futex is up automatically after fibril_switch */
1465
1466done:
1467 if (retval)
1468 *retval = msg->retval;
1469
1470 amsg_destroy(msg);
1471}
1472
1473/** Wait for a message sent by the async framework, timeout variant.
1474 *
1475 * If the wait times out, the caller may choose to either wait again by calling
1476 * async_wait_for() or async_wait_timeout(), or forget the message via
1477 * async_forget().
1478 *
1479 * @param amsgid Hash of the message to wait for.
1480 * @param retval Pointer to storage where the retval of the answer will
1481 * be stored.
1482 * @param timeout Timeout in microseconds.
1483 *
1484 * @return Zero on success, ETIMEOUT if the timeout has expired.
1485 *
1486 */
1487int async_wait_timeout(aid_t amsgid, sysarg_t *retval, suseconds_t timeout)
1488{
1489 assert(amsgid);
1490
1491 amsg_t *msg = (amsg_t *) amsgid;
1492
1493 futex_down(&async_futex);
1494
1495 assert(!msg->forget);
1496 assert(!msg->destroyed);
1497
1498 if (msg->done) {
1499 futex_up(&async_futex);
1500 goto done;
1501 }
1502
1503 /*
1504 * Negative timeout is converted to zero timeout to avoid
1505 * using tv_add with negative augmenter.
1506 */
1507 if (timeout < 0)
1508 timeout = 0;
1509
1510 getuptime(&msg->wdata.to_event.expires);
1511 tv_add_diff(&msg->wdata.to_event.expires, timeout);
1512
1513 /*
1514 * Current fibril is inserted as waiting regardless of the
1515 * "size" of the timeout.
1516 *
1517 * Checking for msg->done and immediately bailing out when
1518 * timeout == 0 would mean that the manager fibril would never
1519 * run (consider single threaded program).
1520 * Thus the IPC answer would be never retrieved from the kernel.
1521 *
1522 * Notice that the actual delay would be very small because we
1523 * - switch to manager fibril
1524 * - the manager sees expired timeout
1525 * - and thus adds us back to ready queue
1526 * - manager switches back to some ready fibril
1527 * (prior it, it checks for incoming IPC).
1528 *
1529 */
1530 msg->wdata.fid = fibril_get_id();
1531 msg->wdata.active = false;
1532 async_insert_timeout(&msg->wdata);
1533
1534 /* Leave the async_futex locked when entering this function */
1535 fibril_switch(FIBRIL_TO_MANAGER);
1536
1537 /* Futex is up automatically after fibril_switch */
1538
1539 if (!msg->done)
1540 return ETIMEOUT;
1541
1542done:
1543 if (retval)
1544 *retval = msg->retval;
1545
1546 amsg_destroy(msg);
1547
1548 return 0;
1549}
1550
1551/** Discard the message / reply on arrival.
1552 *
1553 * The message will be marked to be discarded once the reply arrives in
1554 * reply_received(). It is not allowed to call async_wait_for() or
1555 * async_wait_timeout() on this message after a call to this function.
1556 *
1557 * @param amsgid Hash of the message to forget.
1558 */
1559void async_forget(aid_t amsgid)
1560{
1561 amsg_t *msg = (amsg_t *) amsgid;
1562
1563 assert(msg);
1564 assert(!msg->forget);
1565 assert(!msg->destroyed);
1566
1567 futex_down(&async_futex);
1568 if (msg->done) {
1569 amsg_destroy(msg);
1570 } else {
1571 msg->dataptr = NULL;
1572 msg->forget = true;
1573 }
1574 futex_up(&async_futex);
1575}
1576
1577/** Wait for specified time.
1578 *
1579 * The current fibril is suspended but the thread continues to execute.
1580 *
1581 * @param timeout Duration of the wait in microseconds.
1582 *
1583 */
1584void async_usleep(suseconds_t timeout)
1585{
1586 amsg_t *msg = amsg_create();
1587 if (!msg)
1588 return;
1589
1590 msg->wdata.fid = fibril_get_id();
1591
1592 getuptime(&msg->wdata.to_event.expires);
1593 tv_add_diff(&msg->wdata.to_event.expires, timeout);
1594
1595 futex_down(&async_futex);
1596
1597 async_insert_timeout(&msg->wdata);
1598
1599 /* Leave the async_futex locked when entering this function */
1600 fibril_switch(FIBRIL_TO_MANAGER);
1601
1602 /* Futex is up automatically after fibril_switch() */
1603
1604 amsg_destroy(msg);
1605}
1606
1607/** Pseudo-synchronous message sending - fast version.
1608 *
1609 * Send message asynchronously and return only after the reply arrives.
1610 *
1611 * This function can only transfer 4 register payload arguments. For
1612 * transferring more arguments, see the slower async_req_slow().
1613 *
1614 * @param exch Exchange for sending the message.
1615 * @param imethod Interface and method of the call.
1616 * @param arg1 Service-defined payload argument.
1617 * @param arg2 Service-defined payload argument.
1618 * @param arg3 Service-defined payload argument.
1619 * @param arg4 Service-defined payload argument.
1620 * @param r1 If non-NULL, storage for the 1st reply argument.
1621 * @param r2 If non-NULL, storage for the 2nd reply argument.
1622 * @param r3 If non-NULL, storage for the 3rd reply argument.
1623 * @param r4 If non-NULL, storage for the 4th reply argument.
1624 * @param r5 If non-NULL, storage for the 5th reply argument.
1625 *
1626 * @return Return code of the reply or a negative error code.
1627 *
1628 */
1629sysarg_t async_req_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1630 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t *r1, sysarg_t *r2,
1631 sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
1632{
1633 if (exch == NULL)
1634 return ENOENT;
1635
1636 ipc_call_t result;
1637 aid_t aid = async_send_4(exch, imethod, arg1, arg2, arg3, arg4,
1638 &result);
1639
1640 sysarg_t rc;
1641 async_wait_for(aid, &rc);
1642
1643 if (r1)
1644 *r1 = IPC_GET_ARG1(result);
1645
1646 if (r2)
1647 *r2 = IPC_GET_ARG2(result);
1648
1649 if (r3)
1650 *r3 = IPC_GET_ARG3(result);
1651
1652 if (r4)
1653 *r4 = IPC_GET_ARG4(result);
1654
1655 if (r5)
1656 *r5 = IPC_GET_ARG5(result);
1657
1658 return rc;
1659}
1660
1661/** Pseudo-synchronous message sending - slow version.
1662 *
1663 * Send message asynchronously and return only after the reply arrives.
1664 *
1665 * @param exch Exchange for sending the message.
1666 * @param imethod Interface and method of the call.
1667 * @param arg1 Service-defined payload argument.
1668 * @param arg2 Service-defined payload argument.
1669 * @param arg3 Service-defined payload argument.
1670 * @param arg4 Service-defined payload argument.
1671 * @param arg5 Service-defined payload argument.
1672 * @param r1 If non-NULL, storage for the 1st reply argument.
1673 * @param r2 If non-NULL, storage for the 2nd reply argument.
1674 * @param r3 If non-NULL, storage for the 3rd reply argument.
1675 * @param r4 If non-NULL, storage for the 4th reply argument.
1676 * @param r5 If non-NULL, storage for the 5th reply argument.
1677 *
1678 * @return Return code of the reply or a negative error code.
1679 *
1680 */
1681sysarg_t async_req_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1682 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, sysarg_t *r1,
1683 sysarg_t *r2, sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
1684{
1685 if (exch == NULL)
1686 return ENOENT;
1687
1688 ipc_call_t result;
1689 aid_t aid = async_send_5(exch, imethod, arg1, arg2, arg3, arg4, arg5,
1690 &result);
1691
1692 sysarg_t rc;
1693 async_wait_for(aid, &rc);
1694
1695 if (r1)
1696 *r1 = IPC_GET_ARG1(result);
1697
1698 if (r2)
1699 *r2 = IPC_GET_ARG2(result);
1700
1701 if (r3)
1702 *r3 = IPC_GET_ARG3(result);
1703
1704 if (r4)
1705 *r4 = IPC_GET_ARG4(result);
1706
1707 if (r5)
1708 *r5 = IPC_GET_ARG5(result);
1709
1710 return rc;
1711}
1712
1713void async_msg_0(async_exch_t *exch, sysarg_t imethod)
1714{
1715 if (exch != NULL)
1716 ipc_call_async_0(exch->phone, imethod, NULL, NULL, true);
1717}
1718
1719void async_msg_1(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1)
1720{
1721 if (exch != NULL)
1722 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL, true);
1723}
1724
1725void async_msg_2(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1726 sysarg_t arg2)
1727{
1728 if (exch != NULL)
1729 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL,
1730 true);
1731}
1732
1733void async_msg_3(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1734 sysarg_t arg2, sysarg_t arg3)
1735{
1736 if (exch != NULL)
1737 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL,
1738 NULL, true);
1739}
1740
1741void async_msg_4(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1742 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
1743{
1744 if (exch != NULL)
1745 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
1746 NULL, NULL, true);
1747}
1748
1749void async_msg_5(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
1750 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
1751{
1752 if (exch != NULL)
1753 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
1754 arg5, NULL, NULL, true);
1755}
1756
1757sysarg_t async_answer_0(ipc_callid_t callid, sysarg_t retval)
1758{
1759 return ipc_answer_0(callid, retval);
1760}
1761
1762sysarg_t async_answer_1(ipc_callid_t callid, sysarg_t retval, sysarg_t arg1)
1763{
1764 return ipc_answer_1(callid, retval, arg1);
1765}
1766
1767sysarg_t async_answer_2(ipc_callid_t callid, sysarg_t retval, sysarg_t arg1,
1768 sysarg_t arg2)
1769{
1770 return ipc_answer_2(callid, retval, arg1, arg2);
1771}
1772
1773sysarg_t async_answer_3(ipc_callid_t callid, sysarg_t retval, sysarg_t arg1,
1774 sysarg_t arg2, sysarg_t arg3)
1775{
1776 return ipc_answer_3(callid, retval, arg1, arg2, arg3);
1777}
1778
1779sysarg_t async_answer_4(ipc_callid_t callid, sysarg_t retval, sysarg_t arg1,
1780 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
1781{
1782 return ipc_answer_4(callid, retval, arg1, arg2, arg3, arg4);
1783}
1784
1785sysarg_t async_answer_5(ipc_callid_t callid, sysarg_t retval, sysarg_t arg1,
1786 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
1787{
1788 return ipc_answer_5(callid, retval, arg1, arg2, arg3, arg4, arg5);
1789}
1790
1791int async_forward_fast(ipc_callid_t callid, async_exch_t *exch,
1792 sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, unsigned int mode)
1793{
1794 if (exch == NULL)
1795 return ENOENT;
1796
1797 return ipc_forward_fast(callid, exch->phone, imethod, arg1, arg2, mode);
1798}
1799
1800int async_forward_slow(ipc_callid_t callid, async_exch_t *exch,
1801 sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, sysarg_t arg3,
1802 sysarg_t arg4, sysarg_t arg5, unsigned int mode)
1803{
1804 if (exch == NULL)
1805 return ENOENT;
1806
1807 return ipc_forward_slow(callid, exch->phone, imethod, arg1, arg2, arg3,
1808 arg4, arg5, mode);
1809}
1810
1811/** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
1812 *
1813 * Ask through phone for a new connection to some service.
1814 *
1815 * @param exch Exchange for sending the message.
1816 * @param arg1 User defined argument.
1817 * @param arg2 User defined argument.
1818 * @param arg3 User defined argument.
1819 * @param client_receiver Connection handing routine.
1820 *
1821 * @return Zero on success or a negative error code.
1822 *
1823 */
1824int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
1825 sysarg_t arg3, async_port_handler_t client_receiver, void *data)
1826{
1827 if (exch == NULL)
1828 return ENOENT;
1829
1830 sysarg_t phone_hash;
1831 sysarg_t rc;
1832
1833 aid_t req;
1834 ipc_call_t answer;
1835 req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
1836 &answer);
1837 async_wait_for(req, &rc);
1838 if (rc != EOK)
1839 return (int) rc;
1840
1841 phone_hash = IPC_GET_ARG5(answer);
1842
1843 if (client_receiver != NULL)
1844 async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
1845 client_receiver, data);
1846
1847 return EOK;
1848}
1849
1850/** Wrapper for making IPC_M_CLONE_ESTABLISH calls using the async framework.
1851 *
1852 * Ask for a cloned connection to some service.
1853 *
1854 * @param mgmt Exchange management style.
1855 * @param exch Exchange for sending the message.
1856 *
1857 * @return New session on success or NULL on error.
1858 *
1859 */
1860async_sess_t *async_clone_establish(exch_mgmt_t mgmt, async_exch_t *exch)
1861{
1862 if (exch == NULL) {
1863 errno = ENOENT;
1864 return NULL;
1865 }
1866
1867 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
1868 if (sess == NULL) {
1869 errno = ENOMEM;
1870 return NULL;
1871 }
1872
1873 ipc_call_t result;
1874
1875 amsg_t *msg = amsg_create();
1876 if (!msg) {
1877 free(sess);
1878 errno = ENOMEM;
1879 return NULL;
1880 }
1881
1882 msg->dataptr = &result;
1883 msg->wdata.active = true;
1884
1885 ipc_call_async_0(exch->phone, IPC_M_CLONE_ESTABLISH, msg,
1886 reply_received, true);
1887
1888 sysarg_t rc;
1889 async_wait_for((aid_t) msg, &rc);
1890
1891 if (rc != EOK) {
1892 errno = rc;
1893 free(sess);
1894 return NULL;
1895 }
1896
1897 int phone = (int) IPC_GET_ARG5(result);
1898
1899 if (phone < 0) {
1900 errno = phone;
1901 free(sess);
1902 return NULL;
1903 }
1904
1905 sess->mgmt = mgmt;
1906 sess->phone = phone;
1907 sess->arg1 = 0;
1908 sess->arg2 = 0;
1909 sess->arg3 = 0;
1910
1911 fibril_mutex_initialize(&sess->remote_state_mtx);
1912 sess->remote_state_data = NULL;
1913
1914 list_initialize(&sess->exch_list);
1915 fibril_mutex_initialize(&sess->mutex);
1916 atomic_set(&sess->refcnt, 0);
1917
1918 return sess;
1919}
1920
1921static int async_connect_me_to_internal(int phone, sysarg_t arg1, sysarg_t arg2,
1922 sysarg_t arg3, sysarg_t arg4)
1923{
1924 ipc_call_t result;
1925
1926 amsg_t *msg = amsg_create();
1927 if (!msg)
1928 return ENOENT;
1929
1930 msg->dataptr = &result;
1931 msg->wdata.active = true;
1932
1933 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4,
1934 msg, reply_received, true);
1935
1936 sysarg_t rc;
1937 async_wait_for((aid_t) msg, &rc);
1938
1939 if (rc != EOK)
1940 return rc;
1941
1942 return (int) IPC_GET_ARG5(result);
1943}
1944
1945/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
1946 *
1947 * Ask through for a new connection to some service.
1948 *
1949 * @param mgmt Exchange management style.
1950 * @param exch Exchange for sending the message.
1951 * @param arg1 User defined argument.
1952 * @param arg2 User defined argument.
1953 * @param arg3 User defined argument.
1954 *
1955 * @return New session on success or NULL on error.
1956 *
1957 */
1958async_sess_t *async_connect_me_to(exch_mgmt_t mgmt, async_exch_t *exch,
1959 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
1960{
1961 if (exch == NULL) {
1962 errno = ENOENT;
1963 return NULL;
1964 }
1965
1966 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
1967 if (sess == NULL) {
1968 errno = ENOMEM;
1969 return NULL;
1970 }
1971
1972 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
1973 0);
1974
1975 if (phone < 0) {
1976 errno = phone;
1977 free(sess);
1978 return NULL;
1979 }
1980
1981 sess->mgmt = mgmt;
1982 sess->phone = phone;
1983 sess->arg1 = arg1;
1984 sess->arg2 = arg2;
1985 sess->arg3 = arg3;
1986
1987 fibril_mutex_initialize(&sess->remote_state_mtx);
1988 sess->remote_state_data = NULL;
1989
1990 list_initialize(&sess->exch_list);
1991 fibril_mutex_initialize(&sess->mutex);
1992 atomic_set(&sess->refcnt, 0);
1993
1994 return sess;
1995}
1996
1997/** Set arguments for new connections.
1998 *
1999 * FIXME This is an ugly hack to work around the problem that parallel
2000 * exchanges are implemented using parallel connections. When we create
2001 * a callback session, the framework does not know arguments for the new
2002 * connections.
2003 *
2004 * The proper solution seems to be to implement parallel exchanges using
2005 * tagging.
2006 */
2007void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2,
2008 sysarg_t arg3)
2009{
2010 sess->arg1 = arg1;
2011 sess->arg2 = arg2;
2012 sess->arg3 = arg3;
2013}
2014
2015/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
2016 *
2017 * Ask through phone for a new connection to some service and block until
2018 * success.
2019 *
2020 * @param mgmt Exchange management style.
2021 * @param exch Exchange for sending the message.
2022 * @param arg1 User defined argument.
2023 * @param arg2 User defined argument.
2024 * @param arg3 User defined argument.
2025 *
2026 * @return New session on success or NULL on error.
2027 *
2028 */
2029async_sess_t *async_connect_me_to_blocking(exch_mgmt_t mgmt, async_exch_t *exch,
2030 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
2031{
2032 if (exch == NULL) {
2033 errno = ENOENT;
2034 return NULL;
2035 }
2036
2037 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
2038 if (sess == NULL) {
2039 errno = ENOMEM;
2040 return NULL;
2041 }
2042
2043 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
2044 IPC_FLAG_BLOCKING);
2045
2046 if (phone < 0) {
2047 errno = phone;
2048 free(sess);
2049 return NULL;
2050 }
2051
2052 sess->mgmt = mgmt;
2053 sess->phone = phone;
2054 sess->arg1 = arg1;
2055 sess->arg2 = arg2;
2056 sess->arg3 = arg3;
2057
2058 fibril_mutex_initialize(&sess->remote_state_mtx);
2059 sess->remote_state_data = NULL;
2060
2061 list_initialize(&sess->exch_list);
2062 fibril_mutex_initialize(&sess->mutex);
2063 atomic_set(&sess->refcnt, 0);
2064
2065 return sess;
2066}
2067
2068/** Connect to a task specified by id.
2069 *
2070 */
2071async_sess_t *async_connect_kbox(task_id_t id)
2072{
2073 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
2074 if (sess == NULL) {
2075 errno = ENOMEM;
2076 return NULL;
2077 }
2078
2079 int phone = ipc_connect_kbox(id);
2080 if (phone < 0) {
2081 errno = phone;
2082 free(sess);
2083 return NULL;
2084 }
2085
2086 sess->mgmt = EXCHANGE_ATOMIC;
2087 sess->phone = phone;
2088 sess->arg1 = 0;
2089 sess->arg2 = 0;
2090 sess->arg3 = 0;
2091
2092 fibril_mutex_initialize(&sess->remote_state_mtx);
2093 sess->remote_state_data = NULL;
2094
2095 list_initialize(&sess->exch_list);
2096 fibril_mutex_initialize(&sess->mutex);
2097 atomic_set(&sess->refcnt, 0);
2098
2099 return sess;
2100}
2101
2102static int async_hangup_internal(int phone)
2103{
2104 return ipc_hangup(phone);
2105}
2106
2107/** Wrapper for ipc_hangup.
2108 *
2109 * @param sess Session to hung up.
2110 *
2111 * @return Zero on success or a negative error code.
2112 *
2113 */
2114int async_hangup(async_sess_t *sess)
2115{
2116 async_exch_t *exch;
2117
2118 assert(sess);
2119
2120 if (atomic_get(&sess->refcnt) > 0)
2121 return EBUSY;
2122
2123 fibril_mutex_lock(&async_sess_mutex);
2124
2125 int rc = async_hangup_internal(sess->phone);
2126
2127 while (!list_empty(&sess->exch_list)) {
2128 exch = (async_exch_t *)
2129 list_get_instance(list_first(&sess->exch_list),
2130 async_exch_t, sess_link);
2131
2132 list_remove(&exch->sess_link);
2133 list_remove(&exch->global_link);
2134 async_hangup_internal(exch->phone);
2135 free(exch);
2136 }
2137
2138 free(sess);
2139
2140 fibril_mutex_unlock(&async_sess_mutex);
2141
2142 return rc;
2143}
2144
2145/** Interrupt one thread of this task from waiting for IPC. */
2146void async_poke(void)
2147{
2148 ipc_poke();
2149}
2150
2151/** Start new exchange in a session.
2152 *
2153 * @param session Session.
2154 *
2155 * @return New exchange or NULL on error.
2156 *
2157 */
2158async_exch_t *async_exchange_begin(async_sess_t *sess)
2159{
2160 if (sess == NULL)
2161 return NULL;
2162
2163 async_exch_t *exch;
2164
2165 fibril_mutex_lock(&async_sess_mutex);
2166
2167 if (!list_empty(&sess->exch_list)) {
2168 /*
2169 * There are inactive exchanges in the session.
2170 */
2171 exch = (async_exch_t *)
2172 list_get_instance(list_first(&sess->exch_list),
2173 async_exch_t, sess_link);
2174
2175 list_remove(&exch->sess_link);
2176 list_remove(&exch->global_link);
2177 } else {
2178 /*
2179 * There are no available exchanges in the session.
2180 */
2181
2182 if ((sess->mgmt == EXCHANGE_ATOMIC) ||
2183 (sess->mgmt == EXCHANGE_SERIALIZE)) {
2184 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
2185 if (exch != NULL) {
2186 link_initialize(&exch->sess_link);
2187 link_initialize(&exch->global_link);
2188 exch->sess = sess;
2189 exch->phone = sess->phone;
2190 }
2191 } else { /* EXCHANGE_PARALLEL */
2192 /*
2193 * Make a one-time attempt to connect a new data phone.
2194 */
2195
2196 int phone;
2197
2198retry:
2199 phone = async_connect_me_to_internal(sess->phone, sess->arg1,
2200 sess->arg2, sess->arg3, 0);
2201 if (phone >= 0) {
2202 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
2203 if (exch != NULL) {
2204 link_initialize(&exch->sess_link);
2205 link_initialize(&exch->global_link);
2206 exch->sess = sess;
2207 exch->phone = phone;
2208 } else
2209 async_hangup_internal(phone);
2210 } else if (!list_empty(&inactive_exch_list)) {
2211 /*
2212 * We did not manage to connect a new phone. But we
2213 * can try to close some of the currently inactive
2214 * connections in other sessions and try again.
2215 */
2216 exch = (async_exch_t *)
2217 list_get_instance(list_first(&inactive_exch_list),
2218 async_exch_t, global_link);
2219
2220 list_remove(&exch->sess_link);
2221 list_remove(&exch->global_link);
2222 async_hangup_internal(exch->phone);
2223 free(exch);
2224 goto retry;
2225 } else {
2226 /*
2227 * Wait for a phone to become available.
2228 */
2229 fibril_condvar_wait(&avail_phone_cv, &async_sess_mutex);
2230 goto retry;
2231 }
2232 }
2233 }
2234
2235 fibril_mutex_unlock(&async_sess_mutex);
2236
2237 if (exch != NULL) {
2238 atomic_inc(&sess->refcnt);
2239
2240 if (sess->mgmt == EXCHANGE_SERIALIZE)
2241 fibril_mutex_lock(&sess->mutex);
2242 }
2243
2244 return exch;
2245}
2246
2247/** Finish an exchange.
2248 *
2249 * @param exch Exchange to finish.
2250 *
2251 */
2252void async_exchange_end(async_exch_t *exch)
2253{
2254 if (exch == NULL)
2255 return;
2256
2257 async_sess_t *sess = exch->sess;
2258 assert(sess != NULL);
2259
2260 atomic_dec(&sess->refcnt);
2261
2262 if (sess->mgmt == EXCHANGE_SERIALIZE)
2263 fibril_mutex_unlock(&sess->mutex);
2264
2265 fibril_mutex_lock(&async_sess_mutex);
2266
2267 list_append(&exch->sess_link, &sess->exch_list);
2268 list_append(&exch->global_link, &inactive_exch_list);
2269 fibril_condvar_signal(&avail_phone_cv);
2270
2271 fibril_mutex_unlock(&async_sess_mutex);
2272}
2273
2274/** Wrapper for IPC_M_SHARE_IN calls using the async framework.
2275 *
2276 * @param exch Exchange for sending the message.
2277 * @param size Size of the destination address space area.
2278 * @param arg User defined argument.
2279 * @param flags Storage for the received flags. Can be NULL.
2280 * @param dst Address of the storage for the destination address space area
2281 * base address. Cannot be NULL.
2282 *
2283 * @return Zero on success or a negative error code from errno.h.
2284 *
2285 */
2286int async_share_in_start(async_exch_t *exch, size_t size, sysarg_t arg,
2287 unsigned int *flags, void **dst)
2288{
2289 if (exch == NULL)
2290 return ENOENT;
2291
2292 sysarg_t _flags = 0;
2293 sysarg_t _dst = (sysarg_t) -1;
2294 int res = async_req_2_4(exch, IPC_M_SHARE_IN, (sysarg_t) size,
2295 arg, NULL, &_flags, NULL, &_dst);
2296
2297 if (flags)
2298 *flags = (unsigned int) _flags;
2299
2300 *dst = (void *) _dst;
2301 return res;
2302}
2303
2304/** Wrapper for receiving the IPC_M_SHARE_IN calls using the async framework.
2305 *
2306 * This wrapper only makes it more comfortable to receive IPC_M_SHARE_IN
2307 * calls so that the user doesn't have to remember the meaning of each IPC
2308 * argument.
2309 *
2310 * So far, this wrapper is to be used from within a connection fibril.
2311 *
2312 * @param callid Storage for the hash of the IPC_M_SHARE_IN call.
2313 * @param size Destination address space area size.
2314 *
2315 * @return True on success, false on failure.
2316 *
2317 */
2318bool async_share_in_receive(ipc_callid_t *callid, size_t *size)
2319{
2320 assert(callid);
2321 assert(size);
2322
2323 ipc_call_t data;
2324 *callid = async_get_call(&data);
2325
2326 if (IPC_GET_IMETHOD(data) != IPC_M_SHARE_IN)
2327 return false;
2328
2329 *size = (size_t) IPC_GET_ARG1(data);
2330 return true;
2331}
2332
2333/** Wrapper for answering the IPC_M_SHARE_IN calls using the async framework.
2334 *
2335 * This wrapper only makes it more comfortable to answer IPC_M_SHARE_IN
2336 * calls so that the user doesn't have to remember the meaning of each IPC
2337 * argument.
2338 *
2339 * @param callid Hash of the IPC_M_DATA_READ call to answer.
2340 * @param src Source address space base.
2341 * @param flags Flags to be used for sharing. Bits can be only cleared.
2342 *
2343 * @return Zero on success or a value from @ref errno.h on failure.
2344 *
2345 */
2346int async_share_in_finalize(ipc_callid_t callid, void *src, unsigned int flags)
2347{
2348 return ipc_answer_3(callid, EOK, (sysarg_t) src, (sysarg_t) flags,
2349 (sysarg_t) __entry);
2350}
2351
2352/** Wrapper for IPC_M_SHARE_OUT calls using the async framework.
2353 *
2354 * @param exch Exchange for sending the message.
2355 * @param src Source address space area base address.
2356 * @param flags Flags to be used for sharing. Bits can be only cleared.
2357 *
2358 * @return Zero on success or a negative error code from errno.h.
2359 *
2360 */
2361int async_share_out_start(async_exch_t *exch, void *src, unsigned int flags)
2362{
2363 if (exch == NULL)
2364 return ENOENT;
2365
2366 return async_req_3_0(exch, IPC_M_SHARE_OUT, (sysarg_t) src, 0,
2367 (sysarg_t) flags);
2368}
2369
2370/** Wrapper for receiving the IPC_M_SHARE_OUT calls using the async framework.
2371 *
2372 * This wrapper only makes it more comfortable to receive IPC_M_SHARE_OUT
2373 * calls so that the user doesn't have to remember the meaning of each IPC
2374 * argument.
2375 *
2376 * So far, this wrapper is to be used from within a connection fibril.
2377 *
2378 * @param callid Storage for the hash of the IPC_M_SHARE_OUT call.
2379 * @param size Storage for the source address space area size.
2380 * @param flags Storage for the sharing flags.
2381 *
2382 * @return True on success, false on failure.
2383 *
2384 */
2385bool async_share_out_receive(ipc_callid_t *callid, size_t *size, unsigned int *flags)
2386{
2387 assert(callid);
2388 assert(size);
2389 assert(flags);
2390
2391 ipc_call_t data;
2392 *callid = async_get_call(&data);
2393
2394 if (IPC_GET_IMETHOD(data) != IPC_M_SHARE_OUT)
2395 return false;
2396
2397 *size = (size_t) IPC_GET_ARG2(data);
2398 *flags = (unsigned int) IPC_GET_ARG3(data);
2399 return true;
2400}
2401
2402/** Wrapper for answering the IPC_M_SHARE_OUT calls using the async framework.
2403 *
2404 * This wrapper only makes it more comfortable to answer IPC_M_SHARE_OUT
2405 * calls so that the user doesn't have to remember the meaning of each IPC
2406 * argument.
2407 *
2408 * @param callid Hash of the IPC_M_DATA_WRITE call to answer.
2409 * @param dst Address of the storage for the destination address space area
2410 * base address.
2411 *
2412 * @return Zero on success or a value from @ref errno.h on failure.
2413 *
2414 */
2415int async_share_out_finalize(ipc_callid_t callid, void **dst)
2416{
2417 return ipc_answer_2(callid, EOK, (sysarg_t) __entry, (sysarg_t) dst);
2418}
2419
2420/** Start IPC_M_DATA_READ using the async framework.
2421 *
2422 * @param exch Exchange for sending the message.
2423 * @param dst Address of the beginning of the destination buffer.
2424 * @param size Size of the destination buffer (in bytes).
2425 * @param dataptr Storage of call data (arg 2 holds actual data size).
2426 *
2427 * @return Hash of the sent message or 0 on error.
2428 *
2429 */
2430aid_t async_data_read(async_exch_t *exch, void *dst, size_t size,
2431 ipc_call_t *dataptr)
2432{
2433 return async_send_2(exch, IPC_M_DATA_READ, (sysarg_t) dst,
2434 (sysarg_t) size, dataptr);
2435}
2436
2437/** Wrapper for IPC_M_DATA_READ calls using the async framework.
2438 *
2439 * @param exch Exchange for sending the message.
2440 * @param dst Address of the beginning of the destination buffer.
2441 * @param size Size of the destination buffer.
2442 *
2443 * @return Zero on success or a negative error code from errno.h.
2444 *
2445 */
2446int async_data_read_start(async_exch_t *exch, void *dst, size_t size)
2447{
2448 if (exch == NULL)
2449 return ENOENT;
2450
2451 return async_req_2_0(exch, IPC_M_DATA_READ, (sysarg_t) dst,
2452 (sysarg_t) size);
2453}
2454
2455/** Wrapper for receiving the IPC_M_DATA_READ calls using the async framework.
2456 *
2457 * This wrapper only makes it more comfortable to receive IPC_M_DATA_READ
2458 * calls so that the user doesn't have to remember the meaning of each IPC
2459 * argument.
2460 *
2461 * So far, this wrapper is to be used from within a connection fibril.
2462 *
2463 * @param callid Storage for the hash of the IPC_M_DATA_READ.
2464 * @param size Storage for the maximum size. Can be NULL.
2465 *
2466 * @return True on success, false on failure.
2467 *
2468 */
2469bool async_data_read_receive(ipc_callid_t *callid, size_t *size)
2470{
2471 ipc_call_t data;
2472 return async_data_read_receive_call(callid, &data, size);
2473}
2474
2475/** Wrapper for receiving the IPC_M_DATA_READ calls using the async framework.
2476 *
2477 * This wrapper only makes it more comfortable to receive IPC_M_DATA_READ
2478 * calls so that the user doesn't have to remember the meaning of each IPC
2479 * argument.
2480 *
2481 * So far, this wrapper is to be used from within a connection fibril.
2482 *
2483 * @param callid Storage for the hash of the IPC_M_DATA_READ.
2484 * @param size Storage for the maximum size. Can be NULL.
2485 *
2486 * @return True on success, false on failure.
2487 *
2488 */
2489bool async_data_read_receive_call(ipc_callid_t *callid, ipc_call_t *data,
2490 size_t *size)
2491{
2492 assert(callid);
2493 assert(data);
2494
2495 *callid = async_get_call(data);
2496
2497 if (IPC_GET_IMETHOD(*data) != IPC_M_DATA_READ)
2498 return false;
2499
2500 if (size)
2501 *size = (size_t) IPC_GET_ARG2(*data);
2502
2503 return true;
2504}
2505
2506/** Wrapper for answering the IPC_M_DATA_READ calls using the async framework.
2507 *
2508 * This wrapper only makes it more comfortable to answer IPC_M_DATA_READ
2509 * calls so that the user doesn't have to remember the meaning of each IPC
2510 * argument.
2511 *
2512 * @param callid Hash of the IPC_M_DATA_READ call to answer.
2513 * @param src Source address for the IPC_M_DATA_READ call.
2514 * @param size Size for the IPC_M_DATA_READ call. Can be smaller than
2515 * the maximum size announced by the sender.
2516 *
2517 * @return Zero on success or a value from @ref errno.h on failure.
2518 *
2519 */
2520int async_data_read_finalize(ipc_callid_t callid, const void *src, size_t size)
2521{
2522 return ipc_answer_2(callid, EOK, (sysarg_t) src, (sysarg_t) size);
2523}
2524
2525/** Wrapper for forwarding any read request
2526 *
2527 */
2528int async_data_read_forward_fast(async_exch_t *exch, sysarg_t imethod,
2529 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4,
2530 ipc_call_t *dataptr)
2531{
2532 if (exch == NULL)
2533 return ENOENT;
2534
2535 ipc_callid_t callid;
2536 if (!async_data_read_receive(&callid, NULL)) {
2537 ipc_answer_0(callid, EINVAL);
2538 return EINVAL;
2539 }
2540
2541 aid_t msg = async_send_fast(exch, imethod, arg1, arg2, arg3, arg4,
2542 dataptr);
2543 if (msg == 0) {
2544 ipc_answer_0(callid, EINVAL);
2545 return EINVAL;
2546 }
2547
2548 int retval = ipc_forward_fast(callid, exch->phone, 0, 0, 0,
2549 IPC_FF_ROUTE_FROM_ME);
2550 if (retval != EOK) {
2551 async_forget(msg);
2552 ipc_answer_0(callid, retval);
2553 return retval;
2554 }
2555
2556 sysarg_t rc;
2557 async_wait_for(msg, &rc);
2558
2559 return (int) rc;
2560}
2561
2562/** Wrapper for IPC_M_DATA_WRITE calls using the async framework.
2563 *
2564 * @param exch Exchange for sending the message.
2565 * @param src Address of the beginning of the source buffer.
2566 * @param size Size of the source buffer.
2567 *
2568 * @return Zero on success or a negative error code from errno.h.
2569 *
2570 */
2571int async_data_write_start(async_exch_t *exch, const void *src, size_t size)
2572{
2573 if (exch == NULL)
2574 return ENOENT;
2575
2576 return async_req_2_0(exch, IPC_M_DATA_WRITE, (sysarg_t) src,
2577 (sysarg_t) size);
2578}
2579
2580/** Wrapper for receiving the IPC_M_DATA_WRITE calls using the async framework.
2581 *
2582 * This wrapper only makes it more comfortable to receive IPC_M_DATA_WRITE
2583 * calls so that the user doesn't have to remember the meaning of each IPC
2584 * argument.
2585 *
2586 * So far, this wrapper is to be used from within a connection fibril.
2587 *
2588 * @param callid Storage for the hash of the IPC_M_DATA_WRITE.
2589 * @param size Storage for the suggested size. May be NULL.
2590 *
2591 * @return True on success, false on failure.
2592 *
2593 */
2594bool async_data_write_receive(ipc_callid_t *callid, size_t *size)
2595{
2596 ipc_call_t data;
2597 return async_data_write_receive_call(callid, &data, size);
2598}
2599
2600/** Wrapper for receiving the IPC_M_DATA_WRITE calls using the async framework.
2601 *
2602 * This wrapper only makes it more comfortable to receive IPC_M_DATA_WRITE
2603 * calls so that the user doesn't have to remember the meaning of each IPC
2604 * argument.
2605 *
2606 * So far, this wrapper is to be used from within a connection fibril.
2607 *
2608 * @param callid Storage for the hash of the IPC_M_DATA_WRITE.
2609 * @param data Storage for the ipc call data.
2610 * @param size Storage for the suggested size. May be NULL.
2611 *
2612 * @return True on success, false on failure.
2613 *
2614 */
2615bool async_data_write_receive_call(ipc_callid_t *callid, ipc_call_t *data,
2616 size_t *size)
2617{
2618 assert(callid);
2619 assert(data);
2620
2621 *callid = async_get_call(data);
2622
2623 if (IPC_GET_IMETHOD(*data) != IPC_M_DATA_WRITE)
2624 return false;
2625
2626 if (size)
2627 *size = (size_t) IPC_GET_ARG2(*data);
2628
2629 return true;
2630}
2631
2632/** Wrapper for answering the IPC_M_DATA_WRITE calls using the async framework.
2633 *
2634 * This wrapper only makes it more comfortable to answer IPC_M_DATA_WRITE
2635 * calls so that the user doesn't have to remember the meaning of each IPC
2636 * argument.
2637 *
2638 * @param callid Hash of the IPC_M_DATA_WRITE call to answer.
2639 * @param dst Final destination address for the IPC_M_DATA_WRITE call.
2640 * @param size Final size for the IPC_M_DATA_WRITE call.
2641 *
2642 * @return Zero on success or a value from @ref errno.h on failure.
2643 *
2644 */
2645int async_data_write_finalize(ipc_callid_t callid, void *dst, size_t size)
2646{
2647 return ipc_answer_2(callid, EOK, (sysarg_t) dst, (sysarg_t) size);
2648}
2649
2650/** Wrapper for receiving binary data or strings
2651 *
2652 * This wrapper only makes it more comfortable to use async_data_write_*
2653 * functions to receive binary data or strings.
2654 *
2655 * @param data Pointer to data pointer (which should be later disposed
2656 * by free()). If the operation fails, the pointer is not
2657 * touched.
2658 * @param nullterm If true then the received data is always zero terminated.
2659 * This also causes to allocate one extra byte beyond the
2660 * raw transmitted data.
2661 * @param min_size Minimum size (in bytes) of the data to receive.
2662 * @param max_size Maximum size (in bytes) of the data to receive. 0 means
2663 * no limit.
2664 * @param granulariy If non-zero then the size of the received data has to
2665 * be divisible by this value.
2666 * @param received If not NULL, the size of the received data is stored here.
2667 *
2668 * @return Zero on success or a value from @ref errno.h on failure.
2669 *
2670 */
2671int async_data_write_accept(void **data, const bool nullterm,
2672 const size_t min_size, const size_t max_size, const size_t granularity,
2673 size_t *received)
2674{
2675 assert(data);
2676
2677 ipc_callid_t callid;
2678 size_t size;
2679 if (!async_data_write_receive(&callid, &size)) {
2680 ipc_answer_0(callid, EINVAL);
2681 return EINVAL;
2682 }
2683
2684 if (size < min_size) {
2685 ipc_answer_0(callid, EINVAL);
2686 return EINVAL;
2687 }
2688
2689 if ((max_size > 0) && (size > max_size)) {
2690 ipc_answer_0(callid, EINVAL);
2691 return EINVAL;
2692 }
2693
2694 if ((granularity > 0) && ((size % granularity) != 0)) {
2695 ipc_answer_0(callid, EINVAL);
2696 return EINVAL;
2697 }
2698
2699 void *_data;
2700
2701 if (nullterm)
2702 _data = malloc(size + 1);
2703 else
2704 _data = malloc(size);
2705
2706 if (_data == NULL) {
2707 ipc_answer_0(callid, ENOMEM);
2708 return ENOMEM;
2709 }
2710
2711 int rc = async_data_write_finalize(callid, _data, size);
2712 if (rc != EOK) {
2713 free(_data);
2714 return rc;
2715 }
2716
2717 if (nullterm)
2718 ((char *) _data)[size] = 0;
2719
2720 *data = _data;
2721 if (received != NULL)
2722 *received = size;
2723
2724 return EOK;
2725}
2726
2727/** Wrapper for voiding any data that is about to be received
2728 *
2729 * This wrapper can be used to void any pending data
2730 *
2731 * @param retval Error value from @ref errno.h to be returned to the caller.
2732 *
2733 */
2734void async_data_write_void(sysarg_t retval)
2735{
2736 ipc_callid_t callid;
2737 async_data_write_receive(&callid, NULL);
2738 ipc_answer_0(callid, retval);
2739}
2740
2741/** Wrapper for forwarding any data that is about to be received
2742 *
2743 */
2744int async_data_write_forward_fast(async_exch_t *exch, sysarg_t imethod,
2745 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4,
2746 ipc_call_t *dataptr)
2747{
2748 if (exch == NULL)
2749 return ENOENT;
2750
2751 ipc_callid_t callid;
2752 if (!async_data_write_receive(&callid, NULL)) {
2753 ipc_answer_0(callid, EINVAL);
2754 return EINVAL;
2755 }
2756
2757 aid_t msg = async_send_fast(exch, imethod, arg1, arg2, arg3, arg4,
2758 dataptr);
2759 if (msg == 0) {
2760 ipc_answer_0(callid, EINVAL);
2761 return EINVAL;
2762 }
2763
2764 int retval = ipc_forward_fast(callid, exch->phone, 0, 0, 0,
2765 IPC_FF_ROUTE_FROM_ME);
2766 if (retval != EOK) {
2767 async_forget(msg);
2768 ipc_answer_0(callid, retval);
2769 return retval;
2770 }
2771
2772 sysarg_t rc;
2773 async_wait_for(msg, &rc);
2774
2775 return (int) rc;
2776}
2777
2778/** Wrapper for sending an exchange over different exchange for cloning
2779 *
2780 * @param exch Exchange to be used for sending.
2781 * @param clone_exch Exchange to be cloned.
2782 *
2783 */
2784int async_exchange_clone(async_exch_t *exch, async_exch_t *clone_exch)
2785{
2786 return async_req_1_0(exch, IPC_M_CONNECTION_CLONE, clone_exch->phone);
2787}
2788
2789/** Wrapper for receiving the IPC_M_CONNECTION_CLONE calls.
2790 *
2791 * If the current call is IPC_M_CONNECTION_CLONE then a new
2792 * async session is created for the accepted phone.
2793 *
2794 * @param mgmt Exchange management style.
2795 *
2796 * @return New async session or NULL on failure.
2797 *
2798 */
2799async_sess_t *async_clone_receive(exch_mgmt_t mgmt)
2800{
2801 /* Accept the phone */
2802 ipc_call_t call;
2803 ipc_callid_t callid = async_get_call(&call);
2804 int phone = (int) IPC_GET_ARG1(call);
2805
2806 if ((IPC_GET_IMETHOD(call) != IPC_M_CONNECTION_CLONE) ||
2807 (phone < 0)) {
2808 async_answer_0(callid, EINVAL);
2809 return NULL;
2810 }
2811
2812 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
2813 if (sess == NULL) {
2814 async_answer_0(callid, ENOMEM);
2815 return NULL;
2816 }
2817
2818 sess->mgmt = mgmt;
2819 sess->phone = phone;
2820 sess->arg1 = 0;
2821 sess->arg2 = 0;
2822 sess->arg3 = 0;
2823
2824 fibril_mutex_initialize(&sess->remote_state_mtx);
2825 sess->remote_state_data = NULL;
2826
2827 list_initialize(&sess->exch_list);
2828 fibril_mutex_initialize(&sess->mutex);
2829 atomic_set(&sess->refcnt, 0);
2830
2831 /* Acknowledge the cloned phone */
2832 async_answer_0(callid, EOK);
2833
2834 return sess;
2835}
2836
2837/** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls.
2838 *
2839 * If the current call is IPC_M_CONNECT_TO_ME then a new
2840 * async session is created for the accepted phone.
2841 *
2842 * @param mgmt Exchange management style.
2843 *
2844 * @return New async session.
2845 * @return NULL on failure.
2846 *
2847 */
2848async_sess_t *async_callback_receive(exch_mgmt_t mgmt)
2849{
2850 /* Accept the phone */
2851 ipc_call_t call;
2852 ipc_callid_t callid = async_get_call(&call);
2853 int phone = (int) IPC_GET_ARG5(call);
2854
2855 if ((IPC_GET_IMETHOD(call) != IPC_M_CONNECT_TO_ME) ||
2856 (phone < 0)) {
2857 async_answer_0(callid, EINVAL);
2858 return NULL;
2859 }
2860
2861 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
2862 if (sess == NULL) {
2863 async_answer_0(callid, ENOMEM);
2864 return NULL;
2865 }
2866
2867 sess->mgmt = mgmt;
2868 sess->phone = phone;
2869 sess->arg1 = 0;
2870 sess->arg2 = 0;
2871 sess->arg3 = 0;
2872
2873 fibril_mutex_initialize(&sess->remote_state_mtx);
2874 sess->remote_state_data = NULL;
2875
2876 list_initialize(&sess->exch_list);
2877 fibril_mutex_initialize(&sess->mutex);
2878 atomic_set(&sess->refcnt, 0);
2879
2880 /* Acknowledge the connected phone */
2881 async_answer_0(callid, EOK);
2882
2883 return sess;
2884}
2885
2886/** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls.
2887 *
2888 * If the call is IPC_M_CONNECT_TO_ME then a new
2889 * async session is created. However, the phone is
2890 * not accepted automatically.
2891 *
2892 * @param mgmt Exchange management style.
2893 * @param call Call data.
2894 *
2895 * @return New async session.
2896 * @return NULL on failure.
2897 * @return NULL if the call is not IPC_M_CONNECT_TO_ME.
2898 *
2899 */
2900async_sess_t *async_callback_receive_start(exch_mgmt_t mgmt, ipc_call_t *call)
2901{
2902 int phone = (int) IPC_GET_ARG5(*call);
2903
2904 if ((IPC_GET_IMETHOD(*call) != IPC_M_CONNECT_TO_ME) ||
2905 (phone < 0))
2906 return NULL;
2907
2908 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
2909 if (sess == NULL)
2910 return NULL;
2911
2912 sess->mgmt = mgmt;
2913 sess->phone = phone;
2914 sess->arg1 = 0;
2915 sess->arg2 = 0;
2916 sess->arg3 = 0;
2917
2918 fibril_mutex_initialize(&sess->remote_state_mtx);
2919 sess->remote_state_data = NULL;
2920
2921 list_initialize(&sess->exch_list);
2922 fibril_mutex_initialize(&sess->mutex);
2923 atomic_set(&sess->refcnt, 0);
2924
2925 return sess;
2926}
2927
2928int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
2929 sysarg_t arg3, async_exch_t *other_exch)
2930{
2931 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
2932 arg1, arg2, arg3, 0, other_exch->phone);
2933}
2934
2935bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1,
2936 sysarg_t *arg2, sysarg_t *arg3)
2937{
2938 assert(callid);
2939
2940 ipc_call_t call;
2941 *callid = async_get_call(&call);
2942
2943 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
2944 return false;
2945
2946 if (arg1)
2947 *arg1 = IPC_GET_ARG1(call);
2948 if (arg2)
2949 *arg2 = IPC_GET_ARG2(call);
2950 if (arg3)
2951 *arg3 = IPC_GET_ARG3(call);
2952
2953 return true;
2954}
2955
2956int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch)
2957{
2958 return ipc_answer_1(callid, EOK, other_exch->phone);
2959}
2960
2961/** Lock and get session remote state
2962 *
2963 * Lock and get the local replica of the remote state
2964 * in stateful sessions. The call should be paired
2965 * with async_remote_state_release*().
2966 *
2967 * @param[in] sess Stateful session.
2968 *
2969 * @return Local replica of the remote state.
2970 *
2971 */
2972void *async_remote_state_acquire(async_sess_t *sess)
2973{
2974 fibril_mutex_lock(&sess->remote_state_mtx);
2975 return sess->remote_state_data;
2976}
2977
2978/** Update the session remote state
2979 *
2980 * Update the local replica of the remote state
2981 * in stateful sessions. The remote state must
2982 * be already locked.
2983 *
2984 * @param[in] sess Stateful session.
2985 * @param[in] state New local replica of the remote state.
2986 *
2987 */
2988void async_remote_state_update(async_sess_t *sess, void *state)
2989{
2990 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
2991 sess->remote_state_data = state;
2992}
2993
2994/** Release the session remote state
2995 *
2996 * Unlock the local replica of the remote state
2997 * in stateful sessions.
2998 *
2999 * @param[in] sess Stateful session.
3000 *
3001 */
3002void async_remote_state_release(async_sess_t *sess)
3003{
3004 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
3005
3006 fibril_mutex_unlock(&sess->remote_state_mtx);
3007}
3008
3009/** Release the session remote state and end an exchange
3010 *
3011 * Unlock the local replica of the remote state
3012 * in stateful sessions. This is convenience function
3013 * which gets the session pointer from the exchange
3014 * and also ends the exchange.
3015 *
3016 * @param[in] exch Stateful session's exchange.
3017 *
3018 */
3019void async_remote_state_release_exchange(async_exch_t *exch)
3020{
3021 if (exch == NULL)
3022 return;
3023
3024 async_sess_t *sess = exch->sess;
3025 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
3026
3027 async_exchange_end(exch);
3028 fibril_mutex_unlock(&sess->remote_state_mtx);
3029}
3030
3031/** @}
3032 */
Note: See TracBrowser for help on using the repository browser.