source: mainline/uspace/lib/c/generic/async/server.c@ 3b1cc8d

Last change on this file since 3b1cc8d was 3b1cc8d, checked in by GitHub <noreply@…>, 7 years ago

Add notification queue to the async framework. (#40)

Instead of running a notification handler in the same fibril that received the
notification, which forced us to allocate a new fibril whenever the handler
blocked, we now queue the notifications and let an arbitrary but fixed number
of dedicated fibrils handle them.

A service can increase the number of handler fibrils to reduce latency, but
received notifications no longer cause any dynamic allocations.
When the same notification arrives again while the older instance is still
in the queue, the new notification overwrites the old one and increments a
counter of received notifications.

The counter is currently unused, because passing it to the handler would
require extensive changes to user code, but it should be straightforward
to make use of it should the need arise.
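
A minimal sketch of how a service might use the new mechanism (the handler
name, interrupt number and irq code below are illustrative only, and the
handler is assumed to be a void function taking the call data and the user
argument, matching how handlers are invoked in this file):

    static void my_irq_handler(ipc_call_t *call, void *arg)
    {
            /* Runs in a dedicated notification fibril; it may block
               without forcing the framework to allocate anything. */
    }

    cap_irq_handle_t handle;
    async_irq_subscribe(MY_INR, my_irq_handler, NULL, &my_irq_code, &handle);

    /* Optionally spawn extra handler fibrils so more notifications can be
       processed concurrently, reducing the chance of coalescing. */
    async_spawn_notification_handler();
    async_spawn_notification_handler();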

  • Property mode set to 100644
File size: 47.2 KB
1/*
2 * Copyright (c) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup libc
30 * @{
31 */
32/** @file
33 */
34
35/**
36 * Asynchronous library
37 *
38 * The aim of this library is to provide a facility for writing programs which
 39 * utilize the asynchronous nature of HelenOS IPC, yet use a normal way of
40 * programming.
41 *
42 * You should be able to write very simple multithreaded programs. The async
43 * framework will automatically take care of most of the synchronization
44 * problems.
45 *
46 * Example of use (pseudo C):
47 *
48 * 1) Multithreaded client application
49 *
50 * fibril_create(fibril1, ...);
51 * fibril_create(fibril2, ...);
52 * ...
53 *
54 * int fibril1(void *arg)
55 * {
56 * conn = async_connect_me_to(...);
57 *
58 * exch = async_exchange_begin(conn);
59 * c1 = async_send(exch);
60 * async_exchange_end(exch);
61 *
62 * exch = async_exchange_begin(conn);
63 * c2 = async_send(exch);
64 * async_exchange_end(exch);
65 *
66 * async_wait_for(c1);
67 * async_wait_for(c2);
68 * ...
69 * }
70 *
71 *
72 * 2) Multithreaded server application
73 *
74 * main()
75 * {
76 * async_manager();
77 * }
78 *
79 * port_handler(ichandle, *icall)
80 * {
81 * if (want_refuse) {
82 * async_answer_0(ichandle, ELIMIT);
83 * return;
84 * }
85 * async_answer_0(ichandle, EOK);
86 *
87 * chandle = async_get_call(&call);
88 * somehow_handle_the_call(chandle, call);
89 * async_answer_2(chandle, 1, 2, 3);
90 *
91 * chandle = async_get_call(&call);
92 * ...
93 * }
94 *
95 */
96
97#define LIBC_ASYNC_C_
98#include <ipc/ipc.h>
99#include <async.h>
100#include "../private/async.h"
101#undef LIBC_ASYNC_C_
102
103#include <ipc/irq.h>
104#include <ipc/event.h>
105#include <futex.h>
106#include <fibril.h>
107#include <adt/hash_table.h>
108#include <adt/hash.h>
109#include <adt/list.h>
110#include <assert.h>
111#include <errno.h>
112#include <sys/time.h>
113#include <libarch/barrier.h>
114#include <stdbool.h>
115#include <stdlib.h>
116#include <mem.h>
117#include <stdlib.h>
118#include <macros.h>
119#include <as.h>
120#include <abi/mm/as.h>
121#include "../private/libc.h"
122
123/** Async framework global futex */
124futex_t async_futex = FUTEX_INITIALIZER;
125
126/** Number of threads waiting for IPC in the kernel. */
127static atomic_t threads_in_ipc_wait = { 0 };
128
129/** Call data */
130typedef struct {
131 link_t link;
132
133 cap_call_handle_t chandle;
134 ipc_call_t call;
135} msg_t;
136
137/* Client connection data */
138typedef struct {
139 ht_link_t link;
140
141 task_id_t in_task_id;
142 atomic_t refcnt;
143 void *data;
144} client_t;
145
146/* Server connection data */
147typedef struct {
148 awaiter_t wdata;
149
150 /** Hash table link. */
151 ht_link_t link;
152
153 /** Incoming client task ID. */
154 task_id_t in_task_id;
155
156 /** Incoming phone hash. */
157 sysarg_t in_phone_hash;
158
159 /** Link to the client tracking structure. */
160 client_t *client;
161
162 /** Messages that should be delivered to this fibril. */
163 list_t msg_queue;
164
165 /** Identification of the opening call. */
166 cap_call_handle_t chandle;
167
168 /** Call data of the opening call. */
169 ipc_call_t call;
170
171 /** Identification of the closing call. */
172 cap_call_handle_t close_chandle;
173
174 /** Fibril function that will be used to handle the connection. */
175 async_port_handler_t handler;
176
177 /** Client data */
178 void *data;
179} connection_t;
180
181/* Notification data */
182typedef struct {
183 /** notification_hash_table link */
184 ht_link_t htlink;
185
186 /** notification_queue link */
187 link_t qlink;
188
189 /** Notification method */
190 sysarg_t imethod;
191
192 /** Notification handler */
193 async_notification_handler_t handler;
194
195 /** Notification handler argument */
196 void *arg;
197
198 /** Data of the most recent notification. */
199 ipc_call_t calldata;
200
201 /**
202 * How many notifications with this `imethod` arrived since it was last
203 * handled. If `count` > 1, `calldata` only holds the data for the most
204 * recent such notification, all the older data being lost.
205 *
206 * `async_spawn_notification_handler()` can be used to increase the
207 * number of notifications that can be processed simultaneously,
208 * reducing the likelihood of losing them when the handler blocks.
209 */
210 long count;
211} notification_t;
212
213/** Identifier of the incoming connection handled by the current fibril. */
214static fibril_local connection_t *fibril_connection;
215
216static void *default_client_data_constructor(void)
217{
218 return NULL;
219}
220
221static void default_client_data_destructor(void *data)
222{
223}
224
225static async_client_data_ctor_t async_client_data_create =
226 default_client_data_constructor;
227static async_client_data_dtor_t async_client_data_destroy =
228 default_client_data_destructor;
229
230void async_set_client_data_constructor(async_client_data_ctor_t ctor)
231{
232 assert(async_client_data_create == default_client_data_constructor);
233 async_client_data_create = ctor;
234}
235
236void async_set_client_data_destructor(async_client_data_dtor_t dtor)
237{
238 assert(async_client_data_destroy == default_client_data_destructor);
239 async_client_data_destroy = dtor;
240}
241
242static hash_table_t client_hash_table;
243static hash_table_t conn_hash_table;
244
245// TODO: lockfree notification_queue?
246static futex_t notification_futex = FUTEX_INITIALIZER;
247static hash_table_t notification_hash_table;
248static LIST_INITIALIZE(notification_queue);
249static FIBRIL_SEMAPHORE_INITIALIZE(notification_semaphore, 0);
250
251static LIST_INITIALIZE(timeout_list);
252
253static sysarg_t notification_avail = 0;
254
255static size_t client_key_hash(void *key)
256{
257 task_id_t in_task_id = *(task_id_t *) key;
258 return in_task_id;
259}
260
261static size_t client_hash(const ht_link_t *item)
262{
263 client_t *client = hash_table_get_inst(item, client_t, link);
264 return client_key_hash(&client->in_task_id);
265}
266
267static bool client_key_equal(void *key, const ht_link_t *item)
268{
269 task_id_t in_task_id = *(task_id_t *) key;
270 client_t *client = hash_table_get_inst(item, client_t, link);
271 return in_task_id == client->in_task_id;
272}
273
274/** Operations for the client hash table. */
275static hash_table_ops_t client_hash_table_ops = {
276 .hash = client_hash,
277 .key_hash = client_key_hash,
278 .key_equal = client_key_equal,
279 .equal = NULL,
280 .remove_callback = NULL
281};
282
283typedef struct {
284 task_id_t task_id;
285 sysarg_t phone_hash;
286} conn_key_t;
287
288/** Compute hash into the connection hash table
289 *
290 * The hash is based on the source task ID and the source phone hash. The task
291 * ID is included in the hash because a phone hash alone might not be unique
292 * while we still track connections for killed tasks due to kernel's recycling
293 * of phone structures.
294 *
295 * @param key Pointer to the connection key structure.
296 *
297 * @return Index into the connection hash table.
298 *
299 */
300static size_t conn_key_hash(void *key)
301{
302 conn_key_t *ck = (conn_key_t *) key;
303
304 size_t hash = 0;
305 hash = hash_combine(hash, LOWER32(ck->task_id));
306 hash = hash_combine(hash, UPPER32(ck->task_id));
307 hash = hash_combine(hash, ck->phone_hash);
308 return hash;
309}
310
311static size_t conn_hash(const ht_link_t *item)
312{
313 connection_t *conn = hash_table_get_inst(item, connection_t, link);
314 return conn_key_hash(&(conn_key_t){
315 .task_id = conn->in_task_id,
316 .phone_hash = conn->in_phone_hash
317 });
318}
319
320static bool conn_key_equal(void *key, const ht_link_t *item)
321{
322 conn_key_t *ck = (conn_key_t *) key;
323 connection_t *conn = hash_table_get_inst(item, connection_t, link);
324 return ((ck->task_id == conn->in_task_id) &&
325 (ck->phone_hash == conn->in_phone_hash));
326}
327
328/** Operations for the connection hash table. */
329static hash_table_ops_t conn_hash_table_ops = {
330 .hash = conn_hash,
331 .key_hash = conn_key_hash,
332 .key_equal = conn_key_equal,
333 .equal = NULL,
334 .remove_callback = NULL
335};
336
337static client_t *async_client_get(task_id_t client_id, bool create)
338{
339 client_t *client = NULL;
340
341 futex_down(&async_futex);
342 ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
343 if (link) {
344 client = hash_table_get_inst(link, client_t, link);
345 atomic_inc(&client->refcnt);
346 } else if (create) {
347 client = malloc(sizeof(client_t));
348 if (client) {
349 client->in_task_id = client_id;
350 client->data = async_client_data_create();
351
352 atomic_set(&client->refcnt, 1);
353 hash_table_insert(&client_hash_table, &client->link);
354 }
355 }
356
357 futex_up(&async_futex);
358 return client;
359}
360
361static void async_client_put(client_t *client)
362{
363 bool destroy;
364
365 futex_down(&async_futex);
366
367 if (atomic_predec(&client->refcnt) == 0) {
368 hash_table_remove(&client_hash_table, &client->in_task_id);
369 destroy = true;
370 } else
371 destroy = false;
372
373 futex_up(&async_futex);
374
375 if (destroy) {
376 if (client->data)
377 async_client_data_destroy(client->data);
378
379 free(client);
380 }
381}
382
383/** Wrapper for client connection fibril.
384 *
 385 * When a new connection arrives, a new fibril executing this function
 386 * is created to service it.
387 *
388 * @param arg Connection structure pointer.
389 *
390 * @return Always zero.
391 *
392 */
393static errno_t connection_fibril(void *arg)
394{
395 assert(arg);
396
397 /*
398 * Setup fibril-local connection pointer.
399 */
400 fibril_connection = (connection_t *) arg;
401
402 /*
403 * Add our reference for the current connection in the client task
404 * tracking structure. If this is the first reference, create and
405 * hash in a new tracking structure.
406 */
407
408 client_t *client = async_client_get(fibril_connection->in_task_id, true);
409 if (!client) {
410 ipc_answer_0(fibril_connection->chandle, ENOMEM);
411 return 0;
412 }
413
414 fibril_connection->client = client;
415
416 /*
417 * Call the connection handler function.
418 */
419 fibril_connection->handler(fibril_connection->chandle,
420 &fibril_connection->call, fibril_connection->data);
421
422 /*
423 * Remove the reference for this client task connection.
424 */
425 async_client_put(client);
426
427 /*
428 * Remove myself from the connection hash table.
429 */
430 futex_down(&async_futex);
431 hash_table_remove(&conn_hash_table, &(conn_key_t){
432 .task_id = fibril_connection->in_task_id,
433 .phone_hash = fibril_connection->in_phone_hash
434 });
435 futex_up(&async_futex);
436
437 /*
438 * Answer all remaining messages with EHANGUP.
439 */
440 while (!list_empty(&fibril_connection->msg_queue)) {
441 msg_t *msg =
442 list_get_instance(list_first(&fibril_connection->msg_queue),
443 msg_t, link);
444
445 list_remove(&msg->link);
446 ipc_answer_0(msg->chandle, EHANGUP);
447 free(msg);
448 }
449
450 /*
451 * If the connection was hung-up, answer the last call,
452 * i.e. IPC_M_PHONE_HUNGUP.
453 */
454 if (fibril_connection->close_chandle)
455 ipc_answer_0(fibril_connection->close_chandle, EOK);
456
457 free(fibril_connection);
458 return EOK;
459}
460
461/** Create a new fibril for a new connection.
462 *
 463 * Create a new fibril for the connection, fill in the connection structures
 464 * and insert them into the hash table, so that we can later easily route
 465 * messages to particular fibrils.
466 *
467 * @param in_task_id Identification of the incoming connection.
468 * @param in_phone_hash Identification of the incoming connection.
469 * @param chandle Handle of the opening IPC_M_CONNECT_ME_TO call.
470 * If chandle is CAP_NIL, the connection was opened by
471 * accepting the IPC_M_CONNECT_TO_ME call and this
472 * function is called directly by the server.
473 * @param call Call data of the opening call.
474 * @param handler Connection handler.
475 * @param data Client argument to pass to the connection handler.
476 *
477 * @return New fibril id or NULL on failure.
478 *
479 */
480static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
481 cap_call_handle_t chandle, ipc_call_t *call, async_port_handler_t handler,
482 void *data)
483{
484 connection_t *conn = malloc(sizeof(*conn));
485 if (!conn) {
486 if (chandle != CAP_NIL)
487 ipc_answer_0(chandle, ENOMEM);
488
489 return (uintptr_t) NULL;
490 }
491
492 conn->in_task_id = in_task_id;
493 conn->in_phone_hash = in_phone_hash;
494 list_initialize(&conn->msg_queue);
495 conn->chandle = chandle;
496 conn->close_chandle = CAP_NIL;
497 conn->handler = handler;
498 conn->data = data;
499
500 if (call)
501 conn->call = *call;
502
503 /* We will activate the fibril ASAP */
504 conn->wdata.active = true;
505 conn->wdata.fid = fibril_create(connection_fibril, conn);
506
507 if (conn->wdata.fid == 0) {
508 free(conn);
509
510 if (chandle != CAP_NIL)
511 ipc_answer_0(chandle, ENOMEM);
512
513 return (uintptr_t) NULL;
514 }
515
516 /* Add connection to the connection hash table */
517
518 futex_down(&async_futex);
519 hash_table_insert(&conn_hash_table, &conn->link);
520 futex_up(&async_futex);
521
522 fibril_add_ready(conn->wdata.fid);
523
524 return conn->wdata.fid;
525}
526
527/** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
528 *
529 * Ask through phone for a new connection to some service.
530 *
531 * @param exch Exchange for sending the message.
532 * @param iface Callback interface.
533 * @param arg1 User defined argument.
534 * @param arg2 User defined argument.
535 * @param handler Callback handler.
536 * @param data Handler data.
537 * @param port_id ID of the newly created port.
538 *
539 * @return Zero on success or an error code.
540 *
541 */
542errno_t async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1,
543 sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id)
544{
545 if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK)
546 return EINVAL;
547
548 if (exch == NULL)
549 return ENOENT;
550
551 ipc_call_t answer;
552 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2,
553 &answer);
554
555 errno_t rc;
556 async_wait_for(req, &rc);
557 if (rc != EOK)
558 return rc;
559
560 rc = async_create_port_internal(iface, handler, data, port_id);
561 if (rc != EOK)
562 return rc;
563
564 sysarg_t phone_hash = IPC_GET_ARG5(answer);
565 fid_t fid = async_new_connection(answer.in_task_id, phone_hash,
566 CAP_NIL, NULL, handler, data);
567 if (fid == (uintptr_t) NULL)
568 return ENOMEM;
569
570 return EOK;
571}
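/*
 * Sketch of client-side usage of the wrapper above (illustrative only; the
 * interface constant and handler name are hypothetical, and the interface
 * must carry the IFACE_MOD_CALLBACK modifier, as checked above):
 *
 *   static void my_cb_handler(cap_call_handle_t chandle, ipc_call_t *call,
 *       void *arg)
 *   {
 *       ... handle calls arriving on the callback connection ...
 *   }
 *
 *   port_id_t port;
 *   errno_t rc = async_create_callback_port(exch, INTERFACE_MY_CB, 0, 0,
 *       my_cb_handler, NULL, &port);
 */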
572
573static size_t notification_key_hash(void *key)
574{
575 sysarg_t id = *(sysarg_t *) key;
576 return id;
577}
578
579static size_t notification_hash(const ht_link_t *item)
580{
581 notification_t *notification =
582 hash_table_get_inst(item, notification_t, htlink);
583 return notification_key_hash(&notification->imethod);
584}
585
586static bool notification_key_equal(void *key, const ht_link_t *item)
587{
588 sysarg_t id = *(sysarg_t *) key;
589 notification_t *notification =
590 hash_table_get_inst(item, notification_t, htlink);
591 return id == notification->imethod;
592}
593
594/** Operations for the notification hash table. */
595static hash_table_ops_t notification_hash_table_ops = {
596 .hash = notification_hash,
597 .key_hash = notification_key_hash,
598 .key_equal = notification_key_equal,
599 .equal = NULL,
600 .remove_callback = NULL
601};
602
603/** Sort in current fibril's timeout request.
604 *
605 * @param wd Wait data of the current fibril.
606 *
607 */
608void async_insert_timeout(awaiter_t *wd)
609{
610 assert(wd);
611
612 wd->to_event.occurred = false;
613 wd->to_event.inlist = true;
614
615 link_t *tmp = timeout_list.head.next;
616 while (tmp != &timeout_list.head) {
617 awaiter_t *cur =
618 list_get_instance(tmp, awaiter_t, to_event.link);
619
620 if (tv_gteq(&cur->to_event.expires, &wd->to_event.expires))
621 break;
622
623 tmp = tmp->next;
624 }
625
626 list_insert_before(&wd->to_event.link, tmp);
627}
628
629/** Try to route a call to an appropriate connection fibril.
630 *
631 * If the proper connection fibril is found, a message with the call is added to
632 * its message queue. If the fibril was not active, it is activated and all
633 * timeouts are unregistered.
634 *
635 * @param chandle Handle of the incoming call.
636 * @param call Data of the incoming call.
637 *
638 * @return False if the call doesn't match any connection.
639 * @return True if the call was passed to the respective connection fibril.
640 *
641 */
642static bool route_call(cap_call_handle_t chandle, ipc_call_t *call)
643{
644 assert(call);
645
646 futex_down(&async_futex);
647
648 ht_link_t *link = hash_table_find(&conn_hash_table, &(conn_key_t){
649 .task_id = call->in_task_id,
650 .phone_hash = call->in_phone_hash
651 });
652 if (!link) {
653 futex_up(&async_futex);
654 return false;
655 }
656
657 connection_t *conn = hash_table_get_inst(link, connection_t, link);
658
659 msg_t *msg = malloc(sizeof(*msg));
660 if (!msg) {
661 futex_up(&async_futex);
662 return false;
663 }
664
665 msg->chandle = chandle;
666 msg->call = *call;
667 list_append(&msg->link, &conn->msg_queue);
668
669 if (IPC_GET_IMETHOD(*call) == IPC_M_PHONE_HUNGUP)
670 conn->close_chandle = chandle;
671
672 /* If the connection fibril is waiting for an event, activate it */
673 if (!conn->wdata.active) {
674
675 /* If in timeout list, remove it */
676 if (conn->wdata.to_event.inlist) {
677 conn->wdata.to_event.inlist = false;
678 list_remove(&conn->wdata.to_event.link);
679 }
680
681 conn->wdata.active = true;
682 fibril_add_ready(conn->wdata.fid);
683 }
684
685 futex_up(&async_futex);
686 return true;
687}
688
689/** Function implementing the notification handler fibril. Never returns. */
690static errno_t notification_fibril_func(void *arg)
691{
692 (void) arg;
693
694 while (true) {
695 fibril_semaphore_down(&notification_semaphore);
696
697 futex_lock(&notification_futex);
698
699 /*
700 * The semaphore ensures that if we get this far,
701 * the queue must be non-empty.
702 */
703 assert(!list_empty(&notification_queue));
704
705 notification_t *notification = list_get_instance(
706 list_first(&notification_queue), notification_t, qlink);
707 list_remove(&notification->qlink);
708
709 async_notification_handler_t handler = notification->handler;
710 void *arg = notification->arg;
711 ipc_call_t calldata = notification->calldata;
712 long count = notification->count;
713
714 notification->count = 0;
715
716 futex_unlock(&notification_futex);
717
718 // FIXME: Pass count to the handler. It might be important.
719 (void) count;
720
721 if (handler)
722 handler(&calldata, arg);
723 }
724
725 /* Not reached. */
726 return EOK;
727}
728
729/**
730 * Creates a new dedicated fibril for handling notifications.
731 * By default, there is one such fibril. This function can be used to
 732 * create more in order to increase the number of notifications that can
733 * be processed concurrently.
734 *
735 * Currently, there is no way to destroy those fibrils after they are created.
736 */
737errno_t async_spawn_notification_handler(void)
738{
739 fid_t f = fibril_create(notification_fibril_func, NULL);
740 if (f == 0)
741 return ENOMEM;
742
743 fibril_add_ready(f);
744 return EOK;
745}
746
747/** Queue notification.
748 *
749 * @param call Data of the incoming call.
750 *
751 */
752static void queue_notification(ipc_call_t *call)
753{
754 assert(call);
755
756 futex_lock(&notification_futex);
757
758 ht_link_t *link = hash_table_find(&notification_hash_table,
759 &IPC_GET_IMETHOD(*call));
760 if (!link) {
761 /* Invalid notification. */
762 // TODO: Make sure this can't happen and turn it into assert.
763 futex_unlock(&notification_futex);
764 return;
765 }
766
767 notification_t *notification =
768 hash_table_get_inst(link, notification_t, htlink);
769
770 notification->count++;
771 notification->calldata = *call;
772
773 if (link_in_use(&notification->qlink)) {
774 /* Notification already queued. */
775 futex_unlock(&notification_futex);
776 return;
777 }
778
779 list_append(&notification->qlink, &notification_queue);
780 futex_unlock(&notification_futex);
781
782 fibril_semaphore_up(&notification_semaphore);
783}
784
785/**
786 * Creates a new notification structure and inserts it into the hash table.
787 *
788 * @param handler Function to call when notification is received.
789 * @param arg Argument for the handler function.
 790 * @return The newly created notification structure, or NULL on failure.
791 */
792static notification_t *notification_create(async_notification_handler_t handler, void *arg)
793{
794 notification_t *notification = calloc(1, sizeof(notification_t));
795 if (!notification)
796 return NULL;
797
798 notification->handler = handler;
799 notification->arg = arg;
800
801 fid_t fib = 0;
802
803 futex_lock(&notification_futex);
804
805 if (notification_avail == 0) {
806 /* Attempt to create the first handler fibril. */
807 fib = fibril_create(notification_fibril_func, NULL);
808 if (fib == 0) {
809 futex_unlock(&notification_futex);
810 free(notification);
811 return NULL;
812 }
813 }
814
815 sysarg_t imethod = notification_avail;
816 notification_avail++;
817
818 notification->imethod = imethod;
819 hash_table_insert(&notification_hash_table, &notification->htlink);
820
821 futex_unlock(&notification_futex);
822
823 if (imethod == 0) {
824 assert(fib);
825 fibril_add_ready(fib);
826 }
827
828 return notification;
829}
830
831/** Subscribe to IRQ notification.
832 *
833 * @param inr IRQ number.
834 * @param handler Notification handler.
835 * @param data Notification handler client data.
836 * @param ucode Top-half pseudocode handler.
837 *
838 * @param[out] handle IRQ capability handle on success.
839 *
840 * @return An error code.
841 *
842 */
843errno_t async_irq_subscribe(int inr, async_notification_handler_t handler,
844 void *data, const irq_code_t *ucode, cap_irq_handle_t *handle)
845{
846 notification_t *notification = notification_create(handler, data);
847 if (!notification)
848 return ENOMEM;
849
850 cap_irq_handle_t ihandle;
851 errno_t rc = ipc_irq_subscribe(inr, notification->imethod, ucode,
852 &ihandle);
853 if (rc == EOK && handle != NULL) {
854 *handle = ihandle;
855 }
856 return rc;
857}
858
859/** Unsubscribe from IRQ notification.
860 *
861 * @param handle IRQ capability handle.
862 *
863 * @return Zero on success or an error code.
864 *
865 */
866errno_t async_irq_unsubscribe(cap_irq_handle_t ihandle)
867{
868 // TODO: Remove entry from hash table
869 // to avoid memory leak
870
871 return ipc_irq_unsubscribe(ihandle);
872}
873
874/** Subscribe to event notifications.
875 *
876 * @param evno Event type to subscribe.
877 * @param handler Notification handler.
878 * @param data Notification handler client data.
879 *
880 * @return Zero on success or an error code.
881 *
882 */
883errno_t async_event_subscribe(event_type_t evno,
884 async_notification_handler_t handler, void *data)
885{
886 notification_t *notification = notification_create(handler, data);
887 if (!notification)
888 return ENOMEM;
889
890 return ipc_event_subscribe(evno, notification->imethod);
891}
892
893/** Subscribe to task event notifications.
894 *
895 * @param evno Event type to subscribe.
896 * @param handler Notification handler.
897 * @param data Notification handler client data.
898 *
899 * @return Zero on success or an error code.
900 *
901 */
902errno_t async_event_task_subscribe(event_task_type_t evno,
903 async_notification_handler_t handler, void *data)
904{
905 notification_t *notification = notification_create(handler, data);
906 if (!notification)
907 return ENOMEM;
908
909 return ipc_event_task_subscribe(evno, notification->imethod);
910}
911
912/** Unmask event notifications.
913 *
914 * @param evno Event type to unmask.
915 *
916 * @return Value returned by the kernel.
917 *
918 */
919errno_t async_event_unmask(event_type_t evno)
920{
921 return ipc_event_unmask(evno);
922}
923
924/** Unmask task event notifications.
925 *
926 * @param evno Event type to unmask.
927 *
928 * @return Value returned by the kernel.
929 *
930 */
931errno_t async_event_task_unmask(event_task_type_t evno)
932{
933 return ipc_event_task_unmask(evno);
934}
935
936/** Return new incoming message for the current (fibril-local) connection.
937 *
938 * @param call Storage where the incoming call data will be stored.
939 * @param usecs Timeout in microseconds. Zero denotes no timeout.
940 *
941 * @return If no timeout was specified, then a handle of the incoming call is
942 * returned. If a timeout is specified, then a handle of the incoming
943 * call is returned unless the timeout expires prior to receiving a
 944 * message. In that case CAP_NIL is returned.
945 */
946cap_call_handle_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs)
947{
948 assert(call);
949 assert(fibril_connection);
950
951 /*
 952 * Why do this?
953 * GCC 4.1.0 coughs on fibril_connection-> dereference.
 954 * GCC 4.1.1 happily puts the rdhwr instruction in a delay slot.
955 * I would never expect to find so many errors in
956 * a compiler.
957 */
958 connection_t *conn = fibril_connection;
959
960 futex_down(&async_futex);
961
962 if (usecs) {
963 getuptime(&conn->wdata.to_event.expires);
964 tv_add_diff(&conn->wdata.to_event.expires, usecs);
965 } else
966 conn->wdata.to_event.inlist = false;
967
968 /* If nothing in queue, wait until something arrives */
969 while (list_empty(&conn->msg_queue)) {
970 if (conn->close_chandle) {
971 /*
972 * Handle the case when the connection was already
973 * closed by the client but the server did not notice
974 * the first IPC_M_PHONE_HUNGUP call and continues to
975 * call async_get_call_timeout(). Repeat
976 * IPC_M_PHONE_HUNGUP until the caller notices.
977 */
978 memset(call, 0, sizeof(ipc_call_t));
979 IPC_SET_IMETHOD(*call, IPC_M_PHONE_HUNGUP);
980 futex_up(&async_futex);
981 return conn->close_chandle;
982 }
983
984 if (usecs)
985 async_insert_timeout(&conn->wdata);
986
987 conn->wdata.active = false;
988
989 /*
990 * Note: the current fibril will be rescheduled either due to a
991 * timeout or due to an arriving message destined to it. In the
992 * former case, handle_expired_timeouts() and, in the latter
993 * case, route_call() will perform the wakeup.
994 */
995 fibril_switch(FIBRIL_TO_MANAGER);
996
997 /*
998 * Futex is up after getting back from async_manager.
999 * Get it again.
1000 */
1001 futex_down(&async_futex);
1002 if ((usecs) && (conn->wdata.to_event.occurred) &&
1003 (list_empty(&conn->msg_queue))) {
1004 /* If we timed out -> exit */
1005 futex_up(&async_futex);
1006 return CAP_NIL;
1007 }
1008 }
1009
1010 msg_t *msg = list_get_instance(list_first(&conn->msg_queue),
1011 msg_t, link);
1012 list_remove(&msg->link);
1013
1014 cap_call_handle_t chandle = msg->chandle;
1015 *call = msg->call;
1016 free(msg);
1017
1018 futex_up(&async_futex);
1019 return chandle;
1020}
1021
1022void *async_get_client_data(void)
1023{
1024 assert(fibril_connection);
1025 return fibril_connection->client->data;
1026}
1027
1028void *async_get_client_data_by_id(task_id_t client_id)
1029{
1030 client_t *client = async_client_get(client_id, false);
1031 if (!client)
1032 return NULL;
1033
1034 if (!client->data) {
1035 async_client_put(client);
1036 return NULL;
1037 }
1038
1039 return client->data;
1040}
1041
1042void async_put_client_data_by_id(task_id_t client_id)
1043{
1044 client_t *client = async_client_get(client_id, false);
1045
1046 assert(client);
1047 assert(client->data);
1048
 1049 /* Drop the reference we got in async_get_client_data_by_id(). */
1050 async_client_put(client);
1051
1052 /* Drop our own reference we got at the beginning of this function. */
1053 async_client_put(client);
1054}
1055
1056/** Handle a call that was received.
1057 *
1058 * If the call has the IPC_M_CONNECT_ME_TO method, a new connection is created.
1059 * Otherwise the call is routed to its connection fibril.
1060 *
1061 * @param chandle Handle of the incoming call.
1062 * @param call Data of the incoming call.
1063 *
1064 */
1065static void handle_call(cap_call_handle_t chandle, ipc_call_t *call)
1066{
1067 assert(call);
1068
1069 /* Kernel notification */
1070 if ((chandle == CAP_NIL) && (call->flags & IPC_CALL_NOTIF)) {
1071 queue_notification(call);
1072 return;
1073 }
1074
1075 /* New connection */
1076 if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) {
1077 iface_t iface = (iface_t) IPC_GET_ARG1(*call);
1078 sysarg_t in_phone_hash = IPC_GET_ARG5(*call);
1079
1080 // TODO: Currently ignores all ports but the first one.
1081 void *data;
1082 async_port_handler_t handler =
1083 async_get_port_handler(iface, 0, &data);
1084
1085 async_new_connection(call->in_task_id, in_phone_hash, chandle,
1086 call, handler, data);
1087 return;
1088 }
1089
1090 /* Try to route the call through the connection hash table */
1091 if (route_call(chandle, call))
1092 return;
1093
1094 /* Unknown call from unknown phone - hang it up */
1095 ipc_answer_0(chandle, EHANGUP);
1096}
1097
1098/** Fire all timeouts that expired. */
1099static void handle_expired_timeouts(void)
1100{
1101 struct timeval tv;
1102 getuptime(&tv);
1103
1104 futex_down(&async_futex);
1105
1106 link_t *cur = list_first(&timeout_list);
1107 while (cur != NULL) {
1108 awaiter_t *waiter =
1109 list_get_instance(cur, awaiter_t, to_event.link);
1110
1111 if (tv_gt(&waiter->to_event.expires, &tv))
1112 break;
1113
1114 list_remove(&waiter->to_event.link);
1115 waiter->to_event.inlist = false;
1116 waiter->to_event.occurred = true;
1117
1118 /*
1119 * Redundant condition?
1120 * The fibril should not be active when it gets here.
1121 */
1122 if (!waiter->active) {
1123 waiter->active = true;
1124 fibril_add_ready(waiter->fid);
1125 }
1126
1127 cur = list_first(&timeout_list);
1128 }
1129
1130 futex_up(&async_futex);
1131}
1132
1133/** Endless loop dispatching incoming calls and answers.
1134 *
1135 * @return Never returns.
1136 *
1137 */
1138static errno_t async_manager_worker(void)
1139{
1140 while (true) {
1141 if (fibril_switch(FIBRIL_FROM_MANAGER)) {
1142 futex_up(&async_futex);
1143 /*
1144 * async_futex is always held when entering a manager
1145 * fibril.
1146 */
1147 continue;
1148 }
1149
1150 futex_down(&async_futex);
1151
1152 suseconds_t timeout;
1153 unsigned int flags = SYNCH_FLAGS_NONE;
1154 if (!list_empty(&timeout_list)) {
1155 awaiter_t *waiter = list_get_instance(
1156 list_first(&timeout_list), awaiter_t, to_event.link);
1157
1158 struct timeval tv;
1159 getuptime(&tv);
1160
1161 if (tv_gteq(&tv, &waiter->to_event.expires)) {
1162 futex_up(&async_futex);
1163 handle_expired_timeouts();
1164 /*
1165 * Notice that even if the event(s) already
1166 * expired (and thus the other fibril was
1167 * supposed to be running already),
1168 * we check for incoming IPC.
1169 *
1170 * Otherwise, a fibril that continuously
1171 * creates (almost) expired events could
1172 * prevent IPC retrieval from the kernel.
1173 */
1174 timeout = 0;
1175 flags = SYNCH_FLAGS_NON_BLOCKING;
1176
1177 } else {
1178 timeout = tv_sub_diff(&waiter->to_event.expires,
1179 &tv);
1180 futex_up(&async_futex);
1181 }
1182 } else {
1183 futex_up(&async_futex);
1184 timeout = SYNCH_NO_TIMEOUT;
1185 }
1186
1187 atomic_inc(&threads_in_ipc_wait);
1188
1189 ipc_call_t call;
1190 errno_t rc = ipc_wait_cycle(&call, timeout, flags);
1191
1192 atomic_dec(&threads_in_ipc_wait);
1193
1194 assert(rc == EOK);
1195
1196 if (call.cap_handle == CAP_NIL) {
1197 if ((call.flags &
1198 (IPC_CALL_NOTIF | IPC_CALL_ANSWERED)) == 0) {
1199 /* Neither a notification nor an answer. */
1200 handle_expired_timeouts();
1201 continue;
1202 }
1203 }
1204
1205 if (call.flags & IPC_CALL_ANSWERED)
1206 continue;
1207
1208 handle_call(call.cap_handle, &call);
1209 }
1210
1211 return 0;
1212}
1213
1214/** Function to start async_manager as a standalone fibril.
1215 *
1216 * When more kernel threads are used, one async manager should exist per thread.
1217 *
1218 * @param arg Unused.
1219 * @return Never returns.
1220 *
1221 */
1222static errno_t async_manager_fibril(void *arg)
1223{
1224 futex_up(&async_futex);
1225
1226 /*
1227 * async_futex is always locked when entering manager
1228 */
1229 async_manager_worker();
1230
1231 return 0;
1232}
1233
1234/** Add one manager to manager list. */
1235void async_create_manager(void)
1236{
1237 fid_t fid = fibril_create_generic(async_manager_fibril, NULL, PAGE_SIZE);
1238 if (fid != 0)
1239 fibril_add_manager(fid);
1240}
1241
1242/** Remove one manager from manager list */
1243void async_destroy_manager(void)
1244{
1245 fibril_remove_manager();
1246}
1247
1248/** Initialize the async framework.
1249 *
1250 */
1251void __async_server_init(void)
1252{
1253 if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops))
1254 abort();
1255
1256 if (!hash_table_create(&conn_hash_table, 0, 0, &conn_hash_table_ops))
1257 abort();
1258
1259 if (!hash_table_create(&notification_hash_table, 0, 0,
1260 &notification_hash_table_ops))
1261 abort();
1262}
1263
1264errno_t async_answer_0(cap_call_handle_t chandle, errno_t retval)
1265{
1266 return ipc_answer_0(chandle, retval);
1267}
1268
1269errno_t async_answer_1(cap_call_handle_t chandle, errno_t retval, sysarg_t arg1)
1270{
1271 return ipc_answer_1(chandle, retval, arg1);
1272}
1273
1274errno_t async_answer_2(cap_call_handle_t chandle, errno_t retval, sysarg_t arg1,
1275 sysarg_t arg2)
1276{
1277 return ipc_answer_2(chandle, retval, arg1, arg2);
1278}
1279
1280errno_t async_answer_3(cap_call_handle_t chandle, errno_t retval, sysarg_t arg1,
1281 sysarg_t arg2, sysarg_t arg3)
1282{
1283 return ipc_answer_3(chandle, retval, arg1, arg2, arg3);
1284}
1285
1286errno_t async_answer_4(cap_call_handle_t chandle, errno_t retval, sysarg_t arg1,
1287 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
1288{
1289 return ipc_answer_4(chandle, retval, arg1, arg2, arg3, arg4);
1290}
1291
1292errno_t async_answer_5(cap_call_handle_t chandle, errno_t retval, sysarg_t arg1,
1293 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
1294{
1295 return ipc_answer_5(chandle, retval, arg1, arg2, arg3, arg4, arg5);
1296}
1297
1298errno_t async_forward_fast(cap_call_handle_t chandle, async_exch_t *exch,
1299 sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, unsigned int mode)
1300{
1301 if (exch == NULL)
1302 return ENOENT;
1303
1304 return ipc_forward_fast(chandle, exch->phone, imethod, arg1, arg2, mode);
1305}
1306
1307errno_t async_forward_slow(cap_call_handle_t chandle, async_exch_t *exch,
1308 sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, sysarg_t arg3,
1309 sysarg_t arg4, sysarg_t arg5, unsigned int mode)
1310{
1311 if (exch == NULL)
1312 return ENOENT;
1313
1314 return ipc_forward_slow(chandle, exch->phone, imethod, arg1, arg2, arg3,
1315 arg4, arg5, mode);
1316}
1317
1318/** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
1319 *
1320 * Ask through phone for a new connection to some service.
1321 *
1322 * @param exch Exchange for sending the message.
1323 * @param arg1 User defined argument.
1324 * @param arg2 User defined argument.
1325 * @param arg3 User defined argument.
1326 *
1327 * @return Zero on success or an error code.
1328 *
1329 */
1330errno_t async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
1331 sysarg_t arg3)
1332{
1333 if (exch == NULL)
1334 return ENOENT;
1335
1336 ipc_call_t answer;
1337 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
1338 &answer);
1339
1340 errno_t rc;
1341 async_wait_for(req, &rc);
1342 if (rc != EOK)
1343 return (errno_t) rc;
1344
1345 return EOK;
1346}
1347
1348/** Interrupt one thread of this task from waiting for IPC. */
1349void async_poke(void)
1350{
1351 if (atomic_get(&threads_in_ipc_wait) > 0)
1352 ipc_poke();
1353}
1354
1355/** Wrapper for receiving the IPC_M_SHARE_IN calls using the async framework.
1356 *
1357 * This wrapper only makes it more comfortable to receive IPC_M_SHARE_IN
1358 * calls so that the user doesn't have to remember the meaning of each IPC
1359 * argument.
1360 *
1361 * So far, this wrapper is to be used from within a connection fibril.
1362 *
1363 * @param chandle Storage for the handle of the IPC_M_SHARE_IN call.
1364 * @param size Destination address space area size.
1365 *
1366 * @return True on success, false on failure.
1367 *
1368 */
1369bool async_share_in_receive(cap_call_handle_t *chandle, size_t *size)
1370{
1371 assert(chandle);
1372 assert(size);
1373
1374 ipc_call_t data;
1375 *chandle = async_get_call(&data);
1376
1377 if (IPC_GET_IMETHOD(data) != IPC_M_SHARE_IN)
1378 return false;
1379
1380 *size = (size_t) IPC_GET_ARG1(data);
1381 return true;
1382}
1383
1384/** Wrapper for answering the IPC_M_SHARE_IN calls using the async framework.
1385 *
1386 * This wrapper only makes it more comfortable to answer IPC_M_SHARE_IN
1387 * calls so that the user doesn't have to remember the meaning of each IPC
1388 * argument.
1389 *
 1390 * @param chandle Handle of the IPC_M_SHARE_IN call to answer.
1391 * @param src Source address space base.
1392 * @param flags Flags to be used for sharing. Bits can be only cleared.
1393 *
1394 * @return Zero on success or a value from @ref errno.h on failure.
1395 *
1396 */
1397errno_t async_share_in_finalize(cap_call_handle_t chandle, void *src,
1398 unsigned int flags)
1399{
1400 // FIXME: The source has no business deciding destination address.
1401 return ipc_answer_3(chandle, EOK, (sysarg_t) src, (sysarg_t) flags,
1402 (sysarg_t) _end);
1403}
1404
1405/** Wrapper for receiving the IPC_M_SHARE_OUT calls using the async framework.
1406 *
1407 * This wrapper only makes it more comfortable to receive IPC_M_SHARE_OUT
1408 * calls so that the user doesn't have to remember the meaning of each IPC
1409 * argument.
1410 *
1411 * So far, this wrapper is to be used from within a connection fibril.
1412 *
 1413 * @param chandle Storage for the handle of the IPC_M_SHARE_OUT call.
1414 * @param size Storage for the source address space area size.
1415 * @param flags Storage for the sharing flags.
1416 *
1417 * @return True on success, false on failure.
1418 *
1419 */
1420bool async_share_out_receive(cap_call_handle_t *chandle, size_t *size,
1421 unsigned int *flags)
1422{
1423 assert(chandle);
1424 assert(size);
1425 assert(flags);
1426
1427 ipc_call_t data;
1428 *chandle = async_get_call(&data);
1429
1430 if (IPC_GET_IMETHOD(data) != IPC_M_SHARE_OUT)
1431 return false;
1432
1433 *size = (size_t) IPC_GET_ARG2(data);
1434 *flags = (unsigned int) IPC_GET_ARG3(data);
1435 return true;
1436}
1437
1438/** Wrapper for answering the IPC_M_SHARE_OUT calls using the async framework.
1439 *
1440 * This wrapper only makes it more comfortable to answer IPC_M_SHARE_OUT
1441 * calls so that the user doesn't have to remember the meaning of each IPC
1442 * argument.
1443 *
 1444 * @param chandle Handle of the IPC_M_SHARE_OUT call to answer.
1445 * @param dst Address of the storage for the destination address space area
1446 * base address.
1447 *
1448 * @return Zero on success or a value from @ref errno.h on failure.
1449 *
1450 */
1451errno_t async_share_out_finalize(cap_call_handle_t chandle, void **dst)
1452{
1453 return ipc_answer_2(chandle, EOK, (sysarg_t) _end, (sysarg_t) dst);
1454}
1455
1456/** Wrapper for receiving the IPC_M_DATA_READ calls using the async framework.
1457 *
1458 * This wrapper only makes it more comfortable to receive IPC_M_DATA_READ
1459 * calls so that the user doesn't have to remember the meaning of each IPC
1460 * argument.
1461 *
1462 * So far, this wrapper is to be used from within a connection fibril.
1463 *
1464 * @param chandle Storage for the handle of the IPC_M_DATA_READ.
1465 * @param size Storage for the maximum size. Can be NULL.
1466 *
1467 * @return True on success, false on failure.
1468 *
1469 */
1470bool async_data_read_receive(cap_call_handle_t *chandle, size_t *size)
1471{
1472 ipc_call_t data;
1473 return async_data_read_receive_call(chandle, &data, size);
1474}
1475
1476/** Wrapper for receiving the IPC_M_DATA_READ calls using the async framework.
1477 *
1478 * This wrapper only makes it more comfortable to receive IPC_M_DATA_READ
1479 * calls so that the user doesn't have to remember the meaning of each IPC
1480 * argument.
1481 *
1482 * So far, this wrapper is to be used from within a connection fibril.
1483 *
1484 * @param chandle Storage for the handle of the IPC_M_DATA_READ.
1485 * @param size Storage for the maximum size. Can be NULL.
1486 *
1487 * @return True on success, false on failure.
1488 *
1489 */
1490bool async_data_read_receive_call(cap_call_handle_t *chandle, ipc_call_t *data,
1491 size_t *size)
1492{
1493 assert(chandle);
1494 assert(data);
1495
1496 *chandle = async_get_call(data);
1497
1498 if (IPC_GET_IMETHOD(*data) != IPC_M_DATA_READ)
1499 return false;
1500
1501 if (size)
1502 *size = (size_t) IPC_GET_ARG2(*data);
1503
1504 return true;
1505}
1506
1507/** Wrapper for answering the IPC_M_DATA_READ calls using the async framework.
1508 *
1509 * This wrapper only makes it more comfortable to answer IPC_M_DATA_READ
1510 * calls so that the user doesn't have to remember the meaning of each IPC
1511 * argument.
1512 *
1513 * @param chandle Handle of the IPC_M_DATA_READ call to answer.
1514 * @param src Source address for the IPC_M_DATA_READ call.
1515 * @param size Size for the IPC_M_DATA_READ call. Can be smaller than
1516 * the maximum size announced by the sender.
1517 *
1518 * @return Zero on success or a value from @ref errno.h on failure.
1519 *
1520 */
1521errno_t async_data_read_finalize(cap_call_handle_t chandle, const void *src,
1522 size_t size)
1523{
1524 return ipc_answer_2(chandle, EOK, (sysarg_t) src, (sysarg_t) size);
1525}
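/*
 * Typical server-side use of the two read wrappers above (a sketch only;
 * the buffer and its length are illustrative and not part of this file):
 *
 *   cap_call_handle_t chandle;
 *   size_t size;
 *   if (!async_data_read_receive(&chandle, &size)) {
 *       async_answer_0(chandle, EINVAL);
 *       return;
 *   }
 *   async_data_read_finalize(chandle, buffer,
 *       size < buffer_len ? size : buffer_len);
 */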
1526
1527/** Wrapper for forwarding any read request
1528 *
1529 */
1530errno_t async_data_read_forward_fast(async_exch_t *exch, sysarg_t imethod,
1531 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4,
1532 ipc_call_t *dataptr)
1533{
1534 if (exch == NULL)
1535 return ENOENT;
1536
1537 cap_call_handle_t chandle;
1538 if (!async_data_read_receive(&chandle, NULL)) {
1539 ipc_answer_0(chandle, EINVAL);
1540 return EINVAL;
1541 }
1542
1543 aid_t msg = async_send_fast(exch, imethod, arg1, arg2, arg3, arg4,
1544 dataptr);
1545 if (msg == 0) {
1546 ipc_answer_0(chandle, EINVAL);
1547 return EINVAL;
1548 }
1549
1550 errno_t retval = ipc_forward_fast(chandle, exch->phone, 0, 0, 0,
1551 IPC_FF_ROUTE_FROM_ME);
1552 if (retval != EOK) {
1553 async_forget(msg);
1554 ipc_answer_0(chandle, retval);
1555 return retval;
1556 }
1557
1558 errno_t rc;
1559 async_wait_for(msg, &rc);
1560
1561 return (errno_t) rc;
1562}
1563
1564/** Wrapper for receiving the IPC_M_DATA_WRITE calls using the async framework.
1565 *
1566 * This wrapper only makes it more comfortable to receive IPC_M_DATA_WRITE
1567 * calls so that the user doesn't have to remember the meaning of each IPC
1568 * argument.
1569 *
1570 * So far, this wrapper is to be used from within a connection fibril.
1571 *
1572 * @param chandle Storage for the handle of the IPC_M_DATA_WRITE.
1573 * @param size Storage for the suggested size. May be NULL.
1574 *
1575 * @return True on success, false on failure.
1576 *
1577 */
1578bool async_data_write_receive(cap_call_handle_t *chandle, size_t *size)
1579{
1580 ipc_call_t data;
1581 return async_data_write_receive_call(chandle, &data, size);
1582}
1583
1584/** Wrapper for receiving the IPC_M_DATA_WRITE calls using the async framework.
1585 *
1586 * This wrapper only makes it more comfortable to receive IPC_M_DATA_WRITE
1587 * calls so that the user doesn't have to remember the meaning of each IPC
1588 * argument.
1589 *
1590 * So far, this wrapper is to be used from within a connection fibril.
1591 *
1592 * @param chandle Storage for the handle of the IPC_M_DATA_WRITE.
1593 * @param data Storage for the ipc call data.
1594 * @param size Storage for the suggested size. May be NULL.
1595 *
1596 * @return True on success, false on failure.
1597 *
1598 */
1599bool async_data_write_receive_call(cap_call_handle_t *chandle, ipc_call_t *data,
1600 size_t *size)
1601{
1602 assert(chandle);
1603 assert(data);
1604
1605 *chandle = async_get_call(data);
1606
1607 if (IPC_GET_IMETHOD(*data) != IPC_M_DATA_WRITE)
1608 return false;
1609
1610 if (size)
1611 *size = (size_t) IPC_GET_ARG2(*data);
1612
1613 return true;
1614}
1615
1616/** Wrapper for answering the IPC_M_DATA_WRITE calls using the async framework.
1617 *
1618 * This wrapper only makes it more comfortable to answer IPC_M_DATA_WRITE
1619 * calls so that the user doesn't have to remember the meaning of each IPC
1620 * argument.
1621 *
1622 * @param chandle Handle of the IPC_M_DATA_WRITE call to answer.
1623 * @param dst Final destination address for the IPC_M_DATA_WRITE call.
1624 * @param size Final size for the IPC_M_DATA_WRITE call.
1625 *
1626 * @return Zero on success or a value from @ref errno.h on failure.
1627 *
1628 */
1629errno_t async_data_write_finalize(cap_call_handle_t chandle, void *dst,
1630 size_t size)
1631{
1632 return ipc_answer_2(chandle, EOK, (sysarg_t) dst, (sysarg_t) size);
1633}
1634
1635/** Wrapper for receiving binary data or strings
1636 *
1637 * This wrapper only makes it more comfortable to use async_data_write_*
1638 * functions to receive binary data or strings.
1639 *
1640 * @param data Pointer to data pointer (which should be later disposed
1641 * by free()). If the operation fails, the pointer is not
1642 * touched.
1643 * @param nullterm If true then the received data is always zero terminated.
 1644 * This also causes one extra byte to be allocated beyond
 1645 * the raw transmitted data.
1646 * @param min_size Minimum size (in bytes) of the data to receive.
1647 * @param max_size Maximum size (in bytes) of the data to receive. 0 means
1648 * no limit.
 1649 * @param granularity If non-zero then the size of the received data has to
1650 * be divisible by this value.
1651 * @param received If not NULL, the size of the received data is stored here.
1652 *
1653 * @return Zero on success or a value from @ref errno.h on failure.
1654 *
1655 */
1656errno_t async_data_write_accept(void **data, const bool nullterm,
1657 const size_t min_size, const size_t max_size, const size_t granularity,
1658 size_t *received)
1659{
1660 assert(data);
1661
1662 cap_call_handle_t chandle;
1663 size_t size;
1664 if (!async_data_write_receive(&chandle, &size)) {
1665 ipc_answer_0(chandle, EINVAL);
1666 return EINVAL;
1667 }
1668
1669 if (size < min_size) {
1670 ipc_answer_0(chandle, EINVAL);
1671 return EINVAL;
1672 }
1673
1674 if ((max_size > 0) && (size > max_size)) {
1675 ipc_answer_0(chandle, EINVAL);
1676 return EINVAL;
1677 }
1678
1679 if ((granularity > 0) && ((size % granularity) != 0)) {
1680 ipc_answer_0(chandle, EINVAL);
1681 return EINVAL;
1682 }
1683
1684 void *arg_data;
1685
1686 if (nullterm)
1687 arg_data = malloc(size + 1);
1688 else
1689 arg_data = malloc(size);
1690
1691 if (arg_data == NULL) {
1692 ipc_answer_0(chandle, ENOMEM);
1693 return ENOMEM;
1694 }
1695
1696 errno_t rc = async_data_write_finalize(chandle, arg_data, size);
1697 if (rc != EOK) {
1698 free(arg_data);
1699 return rc;
1700 }
1701
1702 if (nullterm)
1703 ((char *) arg_data)[size] = 0;
1704
1705 *data = arg_data;
1706 if (received != NULL)
1707 *received = size;
1708
1709 return EOK;
1710}
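/*
 * Example of receiving a zero-terminated string with the wrapper above
 * (a sketch only; the size limit and variable names are illustrative):
 *
 *   char *name = NULL;
 *   errno_t rc = async_data_write_accept((void **) &name, true, 1, 1024, 0,
 *       NULL);
 *   if (rc == EOK) {
 *       ... use name ...
 *       free(name);
 *   }
 */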
1711
1712/** Wrapper for voiding any data that is about to be received
1713 *
1714 * This wrapper can be used to void any pending data
1715 *
1716 * @param retval Error value from @ref errno.h to be returned to the caller.
1717 *
1718 */
1719void async_data_write_void(errno_t retval)
1720{
1721 cap_call_handle_t chandle;
1722 async_data_write_receive(&chandle, NULL);
1723 ipc_answer_0(chandle, retval);
1724}
1725
1726/** Wrapper for forwarding any data that is about to be received
1727 *
1728 */
1729errno_t async_data_write_forward_fast(async_exch_t *exch, sysarg_t imethod,
1730 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4,
1731 ipc_call_t *dataptr)
1732{
1733 if (exch == NULL)
1734 return ENOENT;
1735
1736 cap_call_handle_t chandle;
1737 if (!async_data_write_receive(&chandle, NULL)) {
1738 ipc_answer_0(chandle, EINVAL);
1739 return EINVAL;
1740 }
1741
1742 aid_t msg = async_send_fast(exch, imethod, arg1, arg2, arg3, arg4,
1743 dataptr);
1744 if (msg == 0) {
1745 ipc_answer_0(chandle, EINVAL);
1746 return EINVAL;
1747 }
1748
1749 errno_t retval = ipc_forward_fast(chandle, exch->phone, 0, 0, 0,
1750 IPC_FF_ROUTE_FROM_ME);
1751 if (retval != EOK) {
1752 async_forget(msg);
1753 ipc_answer_0(chandle, retval);
1754 return retval;
1755 }
1756
1757 errno_t rc;
1758 async_wait_for(msg, &rc);
1759
1760 return (errno_t) rc;
1761}
1762
1763/** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls.
1764 *
1765 * If the current call is IPC_M_CONNECT_TO_ME then a new
1766 * async session is created for the accepted phone.
1767 *
1768 * @param mgmt Exchange management style.
1769 *
1770 * @return New async session.
1771 * @return NULL on failure.
1772 *
1773 */
1774async_sess_t *async_callback_receive(exch_mgmt_t mgmt)
1775{
1776 /* Accept the phone */
1777 ipc_call_t call;
1778 cap_call_handle_t chandle = async_get_call(&call);
1779 cap_phone_handle_t phandle = (cap_handle_t) IPC_GET_ARG5(call);
1780
1781 if ((IPC_GET_IMETHOD(call) != IPC_M_CONNECT_TO_ME) ||
1782 !CAP_HANDLE_VALID((phandle))) {
1783 async_answer_0(chandle, EINVAL);
1784 return NULL;
1785 }
1786
1787 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
1788 if (sess == NULL) {
1789 async_answer_0(chandle, ENOMEM);
1790 return NULL;
1791 }
1792
1793 sess->iface = 0;
1794 sess->mgmt = mgmt;
1795 sess->phone = phandle;
1796 sess->arg1 = 0;
1797 sess->arg2 = 0;
1798 sess->arg3 = 0;
1799
1800 fibril_mutex_initialize(&sess->remote_state_mtx);
1801 sess->remote_state_data = NULL;
1802
1803 list_initialize(&sess->exch_list);
1804 fibril_mutex_initialize(&sess->mutex);
1805 atomic_set(&sess->refcnt, 0);
1806
1807 /* Acknowledge the connected phone */
1808 async_answer_0(chandle, EOK);
1809
1810 return sess;
1811}
1812
1813/** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls.
1814 *
1815 * If the call is IPC_M_CONNECT_TO_ME then a new
1816 * async session is created. However, the phone is
1817 * not accepted automatically.
1818 *
1819 * @param mgmt Exchange management style.
1820 * @param call Call data.
1821 *
1822 * @return New async session.
1823 * @return NULL on failure.
1824 * @return NULL if the call is not IPC_M_CONNECT_TO_ME.
1825 *
1826 */
1827async_sess_t *async_callback_receive_start(exch_mgmt_t mgmt, ipc_call_t *call)
1828{
1829 cap_phone_handle_t phandle = (cap_handle_t) IPC_GET_ARG5(*call);
1830
1831 if ((IPC_GET_IMETHOD(*call) != IPC_M_CONNECT_TO_ME) ||
1832 !CAP_HANDLE_VALID((phandle)))
1833 return NULL;
1834
1835 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
1836 if (sess == NULL)
1837 return NULL;
1838
1839 sess->iface = 0;
1840 sess->mgmt = mgmt;
1841 sess->phone = phandle;
1842 sess->arg1 = 0;
1843 sess->arg2 = 0;
1844 sess->arg3 = 0;
1845
1846 fibril_mutex_initialize(&sess->remote_state_mtx);
1847 sess->remote_state_data = NULL;
1848
1849 list_initialize(&sess->exch_list);
1850 fibril_mutex_initialize(&sess->mutex);
1851 atomic_set(&sess->refcnt, 0);
1852
1853 return sess;
1854}
1855
1856bool async_state_change_receive(cap_call_handle_t *chandle, sysarg_t *arg1,
1857 sysarg_t *arg2, sysarg_t *arg3)
1858{
1859 assert(chandle);
1860
1861 ipc_call_t call;
1862 *chandle = async_get_call(&call);
1863
1864 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
1865 return false;
1866
1867 if (arg1)
1868 *arg1 = IPC_GET_ARG1(call);
1869 if (arg2)
1870 *arg2 = IPC_GET_ARG2(call);
1871 if (arg3)
1872 *arg3 = IPC_GET_ARG3(call);
1873
1874 return true;
1875}
1876
1877errno_t async_state_change_finalize(cap_call_handle_t chandle,
1878 async_exch_t *other_exch)
1879{
1880 return ipc_answer_1(chandle, EOK, CAP_HANDLE_RAW(other_exch->phone));
1881}
1882
1883/** @}
1884 */