source: mainline/uspace/lib/c/generic/async/client.c@ fb0ec570

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since fb0ec570 was fb0ec570, checked in by Martin Decky <martin@…>, 7 years ago

cstyle improvements (no change in functionality)

  • Property mode set to 100644
File size: 32.3 KB
Line 
1/*
2 * Copyright (c) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup libc
30 * @{
31 */
32/** @file
33 */
34
35/**
36 * Asynchronous library
37 *
38 * The aim of this library is to provide a facility for writing programs which
39 * utilize the asynchronous nature of HelenOS IPC, yet using a normal way of
40 * programming.
41 *
42 * You should be able to write very simple multithreaded programs. The async
43 * framework will automatically take care of most of the synchronization
44 * problems.
45 *
46 * Example of use (pseudo C):
47 *
48 * 1) Multithreaded client application
49 *
50 * fibril_create(fibril1, ...);
51 * fibril_create(fibril2, ...);
52 * ...
53 *
54 * int fibril1(void *arg)
55 * {
56 * conn = async_connect_me_to(...);
57 *
58 * exch = async_exchange_begin(conn);
59 * c1 = async_send(exch);
60 * async_exchange_end(exch);
61 *
62 * exch = async_exchange_begin(conn);
63 * c2 = async_send(exch);
64 * async_exchange_end(exch);
65 *
66 * async_wait_for(c1);
67 * async_wait_for(c2);
68 * ...
69 * }
70 *
71 *
72 * 2) Multithreaded server application
73 *
74 * main()
75 * {
76 * async_manager();
77 * }
78 *
79 * port_handler(ichandle, *icall)
80 * {
81 * if (want_refuse) {
82 * async_answer_0(ichandle, ELIMIT);
83 * return;
84 * }
85 * async_answer_0(ichandle, EOK);
86 *
87 * chandle = async_get_call(&call);
88 * somehow_handle_the_call(chandle, call);
89 * async_answer_2(chandle, 1, 2, 3);
90 *
91 * chandle = async_get_call(&call);
92 * ...
93 * }
94 *
95 */
96
97#define LIBC_ASYNC_C_
98#include <ipc/ipc.h>
99#include <async.h>
100#include "../private/async.h"
101#include "../private/ns.h"
102#undef LIBC_ASYNC_C_
103
104#include <ipc/irq.h>
105#include <ipc/event.h>
106#include <futex.h>
107#include <fibril.h>
108#include <adt/hash_table.h>
109#include <adt/hash.h>
110#include <adt/list.h>
111#include <assert.h>
112#include <errno.h>
113#include <sys/time.h>
114#include <libarch/barrier.h>
115#include <stdbool.h>
116#include <stdlib.h>
117#include <mem.h>
118#include <stdlib.h>
119#include <macros.h>
120#include <as.h>
121#include <abi/mm/as.h>
122#include "../private/libc.h"
123
124/** Naming service session */
125async_sess_t session_ns;
126
127/** Message data */
128typedef struct {
129 awaiter_t wdata;
130
131 /** If reply was received. */
132 bool done;
133
134 /** If the message / reply should be discarded on arrival. */
135 bool forget;
136
137 /** If already destroyed. */
138 bool destroyed;
139
140 /** Pointer to where the answer data is stored. */
141 ipc_call_t *dataptr;
142
143 errno_t retval;
144} amsg_t;
145
146static void to_event_initialize(to_event_t *to)
147{
148 struct timeval tv = { 0, 0 };
149
150 to->inlist = false;
151 to->occurred = false;
152 link_initialize(&to->link);
153 to->expires = tv;
154}
155
156static void wu_event_initialize(wu_event_t *wu)
157{
158 wu->inlist = false;
159 link_initialize(&wu->link);
160}
161
162void awaiter_initialize(awaiter_t *aw)
163{
164 aw->fid = 0;
165 aw->active = false;
166 to_event_initialize(&aw->to_event);
167 wu_event_initialize(&aw->wu_event);
168}
169
170static amsg_t *amsg_create(void)
171{
172 amsg_t *msg = malloc(sizeof(amsg_t));
173 if (msg) {
174 msg->done = false;
175 msg->forget = false;
176 msg->destroyed = false;
177 msg->dataptr = NULL;
178 msg->retval = EINVAL;
179 awaiter_initialize(&msg->wdata);
180 }
181
182 return msg;
183}
184
185static void amsg_destroy(amsg_t *msg)
186{
187 assert(!msg->destroyed);
188 msg->destroyed = true;
189 free(msg);
190}
191
192/** Mutex protecting inactive_exch_list and avail_phone_cv.
193 *
194 */
195static FIBRIL_MUTEX_INITIALIZE(async_sess_mutex);
196
197/** List of all currently inactive exchanges.
198 *
199 */
200static LIST_INITIALIZE(inactive_exch_list);
201
202/** Condition variable to wait for a phone to become available.
203 *
204 */
205static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
206
207/** Initialize the async framework.
208 *
209 */
210void __async_client_init(void)
211{
212 session_ns.iface = 0;
213 session_ns.mgmt = EXCHANGE_ATOMIC;
214 session_ns.phone = PHONE_NS;
215 session_ns.arg1 = 0;
216 session_ns.arg2 = 0;
217 session_ns.arg3 = 0;
218
219 fibril_mutex_initialize(&session_ns.remote_state_mtx);
220 session_ns.remote_state_data = NULL;
221
222 list_initialize(&session_ns.exch_list);
223 fibril_mutex_initialize(&session_ns.mutex);
224 atomic_set(&session_ns.refcnt, 0);
225}
226
227/** Reply received callback.
228 *
229 * This function is called whenever a reply for an asynchronous message sent out
230 * by the asynchronous framework is received.
231 *
232 * Notify the fibril which is waiting for this message that it has arrived.
233 *
234 * @param arg Pointer to the asynchronous message record.
235 * @param retval Value returned in the answer.
236 * @param data Call data of the answer.
237 *
238 */
239static void reply_received(void *arg, errno_t retval, ipc_call_t *data)
240{
241 assert(arg);
242
243 futex_down(&async_futex);
244
245 amsg_t *msg = (amsg_t *) arg;
246 msg->retval = retval;
247
248 /* Copy data after futex_down, just in case the call was detached */
249 if ((msg->dataptr) && (data))
250 *msg->dataptr = *data;
251
252 write_barrier();
253
254 /* Remove message from timeout list */
255 if (msg->wdata.to_event.inlist)
256 list_remove(&msg->wdata.to_event.link);
257
258 msg->done = true;
259
260 if (msg->forget) {
261 assert(msg->wdata.active);
262 amsg_destroy(msg);
263 } else if (!msg->wdata.active) {
264 msg->wdata.active = true;
265 fibril_add_ready(msg->wdata.fid);
266 }
267
268 futex_up(&async_futex);
269}
270
271/** Send message and return id of the sent message.
272 *
273 * The return value can be used as input for async_wait() to wait for
274 * completion.
275 *
276 * @param exch Exchange for sending the message.
277 * @param imethod Service-defined interface and method.
278 * @param arg1 Service-defined payload argument.
279 * @param arg2 Service-defined payload argument.
280 * @param arg3 Service-defined payload argument.
281 * @param arg4 Service-defined payload argument.
282 * @param dataptr If non-NULL, storage where the reply data will be stored.
283 *
284 * @return Hash of the sent message or 0 on error.
285 *
286 */
287aid_t async_send_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
288 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr)
289{
290 if (exch == NULL)
291 return 0;
292
293 amsg_t *msg = amsg_create();
294 if (msg == NULL)
295 return 0;
296
297 msg->dataptr = dataptr;
298 msg->wdata.active = true;
299
300 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg,
301 reply_received);
302
303 return (aid_t) msg;
304}
305
306/** Send message and return id of the sent message
307 *
308 * The return value can be used as input for async_wait() to wait for
309 * completion.
310 *
311 * @param exch Exchange for sending the message.
312 * @param imethod Service-defined interface and method.
313 * @param arg1 Service-defined payload argument.
314 * @param arg2 Service-defined payload argument.
315 * @param arg3 Service-defined payload argument.
316 * @param arg4 Service-defined payload argument.
317 * @param arg5 Service-defined payload argument.
318 * @param dataptr If non-NULL, storage where the reply data will be
319 * stored.
320 *
321 * @return Hash of the sent message or 0 on error.
322 *
323 */
324aid_t async_send_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
325 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5,
326 ipc_call_t *dataptr)
327{
328 if (exch == NULL)
329 return 0;
330
331 amsg_t *msg = amsg_create();
332 if (msg == NULL)
333 return 0;
334
335 msg->dataptr = dataptr;
336 msg->wdata.active = true;
337
338 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,
339 msg, reply_received);
340
341 return (aid_t) msg;
342}
343
344/** Wait for a message sent by the async framework.
345 *
346 * @param amsgid Hash of the message to wait for.
347 * @param retval Pointer to storage where the retval of the answer will
348 * be stored.
349 *
350 */
351void async_wait_for(aid_t amsgid, errno_t *retval)
352{
353 assert(amsgid);
354
355 amsg_t *msg = (amsg_t *) amsgid;
356
357 futex_down(&async_futex);
358
359 assert(!msg->forget);
360 assert(!msg->destroyed);
361
362 if (msg->done) {
363 futex_up(&async_futex);
364 goto done;
365 }
366
367 msg->wdata.fid = fibril_get_id();
368 msg->wdata.active = false;
369 msg->wdata.to_event.inlist = false;
370
371 /* Leave the async_futex locked when entering this function */
372 fibril_switch(FIBRIL_TO_MANAGER);
373
374 /* Futex is up automatically after fibril_switch */
375
376done:
377 if (retval)
378 *retval = msg->retval;
379
380 amsg_destroy(msg);
381}
382
383/** Wait for a message sent by the async framework, timeout variant.
384 *
385 * If the wait times out, the caller may choose to either wait again by calling
386 * async_wait_for() or async_wait_timeout(), or forget the message via
387 * async_forget().
388 *
389 * @param amsgid Hash of the message to wait for.
390 * @param retval Pointer to storage where the retval of the answer will
391 * be stored.
392 * @param timeout Timeout in microseconds.
393 *
394 * @return Zero on success, ETIMEOUT if the timeout has expired.
395 *
396 */
397errno_t async_wait_timeout(aid_t amsgid, errno_t *retval, suseconds_t timeout)
398{
399 assert(amsgid);
400
401 amsg_t *msg = (amsg_t *) amsgid;
402
403 futex_down(&async_futex);
404
405 assert(!msg->forget);
406 assert(!msg->destroyed);
407
408 if (msg->done) {
409 futex_up(&async_futex);
410 goto done;
411 }
412
413 /*
414 * Negative timeout is converted to zero timeout to avoid
415 * using tv_add with negative augmenter.
416 */
417 if (timeout < 0)
418 timeout = 0;
419
420 getuptime(&msg->wdata.to_event.expires);
421 tv_add_diff(&msg->wdata.to_event.expires, timeout);
422
423 /*
424 * Current fibril is inserted as waiting regardless of the
425 * "size" of the timeout.
426 *
427 * Checking for msg->done and immediately bailing out when
428 * timeout == 0 would mean that the manager fibril would never
429 * run (consider single threaded program).
430 * Thus the IPC answer would be never retrieved from the kernel.
431 *
432 * Notice that the actual delay would be very small because we
433 * - switch to manager fibril
434 * - the manager sees expired timeout
435 * - and thus adds us back to ready queue
436 * - manager switches back to some ready fibril
437 * (prior it, it checks for incoming IPC).
438 *
439 */
440 msg->wdata.fid = fibril_get_id();
441 msg->wdata.active = false;
442 async_insert_timeout(&msg->wdata);
443
444 /* Leave the async_futex locked when entering this function */
445 fibril_switch(FIBRIL_TO_MANAGER);
446
447 /* Futex is up automatically after fibril_switch */
448
449 if (!msg->done)
450 return ETIMEOUT;
451
452done:
453 if (retval)
454 *retval = msg->retval;
455
456 amsg_destroy(msg);
457
458 return 0;
459}
460
461/** Discard the message / reply on arrival.
462 *
463 * The message will be marked to be discarded once the reply arrives in
464 * reply_received(). It is not allowed to call async_wait_for() or
465 * async_wait_timeout() on this message after a call to this function.
466 *
467 * @param amsgid Hash of the message to forget.
468 */
469void async_forget(aid_t amsgid)
470{
471 amsg_t *msg = (amsg_t *) amsgid;
472
473 assert(msg);
474 assert(!msg->forget);
475 assert(!msg->destroyed);
476
477 futex_down(&async_futex);
478
479 if (msg->done) {
480 amsg_destroy(msg);
481 } else {
482 msg->dataptr = NULL;
483 msg->forget = true;
484 }
485
486 futex_up(&async_futex);
487}
488
489/** Wait for specified time.
490 *
491 * The current fibril is suspended but the thread continues to execute.
492 *
493 * @param timeout Duration of the wait in microseconds.
494 *
495 */
496void async_usleep(suseconds_t timeout)
497{
498 awaiter_t awaiter;
499 awaiter_initialize(&awaiter);
500
501 awaiter.fid = fibril_get_id();
502
503 getuptime(&awaiter.to_event.expires);
504 tv_add_diff(&awaiter.to_event.expires, timeout);
505
506 futex_down(&async_futex);
507
508 async_insert_timeout(&awaiter);
509
510 /* Leave the async_futex locked when entering this function */
511 fibril_switch(FIBRIL_TO_MANAGER);
512
513 /* Futex is up automatically after fibril_switch() */
514}
515
516/** Delay execution for the specified number of seconds
517 *
518 * @param sec Number of seconds to sleep
519 */
520void async_sleep(unsigned int sec)
521{
522 /*
523 * Sleep in 1000 second steps to support
524 * full argument range
525 */
526
527 while (sec > 0) {
528 unsigned int period = (sec > 1000) ? 1000 : sec;
529
530 async_usleep(period * 1000000);
531 sec -= period;
532 }
533}
534
535/** Pseudo-synchronous message sending - fast version.
536 *
537 * Send message asynchronously and return only after the reply arrives.
538 *
539 * This function can only transfer 4 register payload arguments. For
540 * transferring more arguments, see the slower async_req_slow().
541 *
542 * @param exch Exchange for sending the message.
543 * @param imethod Interface and method of the call.
544 * @param arg1 Service-defined payload argument.
545 * @param arg2 Service-defined payload argument.
546 * @param arg3 Service-defined payload argument.
547 * @param arg4 Service-defined payload argument.
548 * @param r1 If non-NULL, storage for the 1st reply argument.
549 * @param r2 If non-NULL, storage for the 2nd reply argument.
550 * @param r3 If non-NULL, storage for the 3rd reply argument.
551 * @param r4 If non-NULL, storage for the 4th reply argument.
552 * @param r5 If non-NULL, storage for the 5th reply argument.
553 *
554 * @return Return code of the reply or an error code.
555 *
556 */
557errno_t async_req_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
558 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t *r1, sysarg_t *r2,
559 sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
560{
561 if (exch == NULL)
562 return ENOENT;
563
564 ipc_call_t result;
565 aid_t aid = async_send_4(exch, imethod, arg1, arg2, arg3, arg4,
566 &result);
567
568 errno_t rc;
569 async_wait_for(aid, &rc);
570
571 if (r1)
572 *r1 = IPC_GET_ARG1(result);
573
574 if (r2)
575 *r2 = IPC_GET_ARG2(result);
576
577 if (r3)
578 *r3 = IPC_GET_ARG3(result);
579
580 if (r4)
581 *r4 = IPC_GET_ARG4(result);
582
583 if (r5)
584 *r5 = IPC_GET_ARG5(result);
585
586 return rc;
587}
588
589/** Pseudo-synchronous message sending - slow version.
590 *
591 * Send message asynchronously and return only after the reply arrives.
592 *
593 * @param exch Exchange for sending the message.
594 * @param imethod Interface and method of the call.
595 * @param arg1 Service-defined payload argument.
596 * @param arg2 Service-defined payload argument.
597 * @param arg3 Service-defined payload argument.
598 * @param arg4 Service-defined payload argument.
599 * @param arg5 Service-defined payload argument.
600 * @param r1 If non-NULL, storage for the 1st reply argument.
601 * @param r2 If non-NULL, storage for the 2nd reply argument.
602 * @param r3 If non-NULL, storage for the 3rd reply argument.
603 * @param r4 If non-NULL, storage for the 4th reply argument.
604 * @param r5 If non-NULL, storage for the 5th reply argument.
605 *
606 * @return Return code of the reply or an error code.
607 *
608 */
609errno_t async_req_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
610 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, sysarg_t *r1,
611 sysarg_t *r2, sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
612{
613 if (exch == NULL)
614 return ENOENT;
615
616 ipc_call_t result;
617 aid_t aid = async_send_5(exch, imethod, arg1, arg2, arg3, arg4, arg5,
618 &result);
619
620 errno_t rc;
621 async_wait_for(aid, &rc);
622
623 if (r1)
624 *r1 = IPC_GET_ARG1(result);
625
626 if (r2)
627 *r2 = IPC_GET_ARG2(result);
628
629 if (r3)
630 *r3 = IPC_GET_ARG3(result);
631
632 if (r4)
633 *r4 = IPC_GET_ARG4(result);
634
635 if (r5)
636 *r5 = IPC_GET_ARG5(result);
637
638 return rc;
639}
640
641void async_msg_0(async_exch_t *exch, sysarg_t imethod)
642{
643 if (exch != NULL)
644 ipc_call_async_0(exch->phone, imethod, NULL, NULL);
645}
646
647void async_msg_1(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1)
648{
649 if (exch != NULL)
650 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL);
651}
652
653void async_msg_2(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
654 sysarg_t arg2)
655{
656 if (exch != NULL)
657 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL);
658}
659
660void async_msg_3(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
661 sysarg_t arg2, sysarg_t arg3)
662{
663 if (exch != NULL)
664 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL,
665 NULL);
666}
667
668void async_msg_4(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
669 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
670{
671 if (exch != NULL)
672 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
673 NULL, NULL);
674}
675
676void async_msg_5(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
677 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
678{
679 if (exch != NULL)
680 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
681 arg5, NULL, NULL);
682}
683
684static errno_t async_connect_me_to_internal(cap_phone_handle_t phone,
685 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4,
686 cap_phone_handle_t *out_phone)
687{
688 ipc_call_t result;
689
690 // XXX: Workaround for GCC's inability to infer association between
691 // rc == EOK and *out_phone being assigned.
692 *out_phone = CAP_NIL;
693
694 amsg_t *msg = amsg_create();
695 if (!msg)
696 return ENOENT;
697
698 msg->dataptr = &result;
699 msg->wdata.active = true;
700
701 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4,
702 msg, reply_received);
703
704 errno_t rc;
705 async_wait_for((aid_t) msg, &rc);
706
707 if (rc != EOK)
708 return rc;
709
710 *out_phone = (cap_phone_handle_t) IPC_GET_ARG5(result);
711 return EOK;
712}
713
714/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
715 *
716 * Ask through for a new connection to some service.
717 *
718 * @param mgmt Exchange management style.
719 * @param exch Exchange for sending the message.
720 * @param arg1 User defined argument.
721 * @param arg2 User defined argument.
722 * @param arg3 User defined argument.
723 *
724 * @return New session on success or NULL on error.
725 *
726 */
727async_sess_t *async_connect_me_to(exch_mgmt_t mgmt, async_exch_t *exch,
728 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
729{
730 if (exch == NULL) {
731 errno = ENOENT;
732 return NULL;
733 }
734
735 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
736 if (sess == NULL) {
737 errno = ENOMEM;
738 return NULL;
739 }
740
741 cap_phone_handle_t phone;
742 errno_t rc = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
743 0, &phone);
744 if (rc != EOK) {
745 errno = rc;
746 free(sess);
747 return NULL;
748 }
749
750 sess->iface = 0;
751 sess->mgmt = mgmt;
752 sess->phone = phone;
753 sess->arg1 = arg1;
754 sess->arg2 = arg2;
755 sess->arg3 = arg3;
756
757 fibril_mutex_initialize(&sess->remote_state_mtx);
758 sess->remote_state_data = NULL;
759
760 list_initialize(&sess->exch_list);
761 fibril_mutex_initialize(&sess->mutex);
762 atomic_set(&sess->refcnt, 0);
763
764 return sess;
765}
766
767/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
768 *
769 * Ask through phone for a new connection to some service and block until
770 * success.
771 *
772 * @param exch Exchange for sending the message.
773 * @param iface Connection interface.
774 * @param arg2 User defined argument.
775 * @param arg3 User defined argument.
776 *
777 * @return New session on success or NULL on error.
778 *
779 */
780async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,
781 sysarg_t arg2, sysarg_t arg3)
782{
783 if (exch == NULL) {
784 errno = ENOENT;
785 return NULL;
786 }
787
788 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
789 if (sess == NULL) {
790 errno = ENOMEM;
791 return NULL;
792 }
793
794 cap_phone_handle_t phone;
795 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
796 arg3, 0, &phone);
797 if (rc != EOK) {
798 errno = rc;
799 free(sess);
800 return NULL;
801 }
802
803 sess->iface = iface;
804 sess->phone = phone;
805 sess->arg1 = iface;
806 sess->arg2 = arg2;
807 sess->arg3 = arg3;
808
809 fibril_mutex_initialize(&sess->remote_state_mtx);
810 sess->remote_state_data = NULL;
811
812 list_initialize(&sess->exch_list);
813 fibril_mutex_initialize(&sess->mutex);
814 atomic_set(&sess->refcnt, 0);
815
816 return sess;
817}
818
819/** Set arguments for new connections.
820 *
821 * FIXME This is an ugly hack to work around the problem that parallel
822 * exchanges are implemented using parallel connections. When we create
823 * a callback session, the framework does not know arguments for the new
824 * connections.
825 *
826 * The proper solution seems to be to implement parallel exchanges using
827 * tagging.
828 */
829void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2,
830 sysarg_t arg3)
831{
832 sess->arg1 = arg1;
833 sess->arg2 = arg2;
834 sess->arg3 = arg3;
835}
836
837/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
838 *
839 * Ask through phone for a new connection to some service and block until
840 * success.
841 *
842 * @param mgmt Exchange management style.
843 * @param exch Exchange for sending the message.
844 * @param arg1 User defined argument.
845 * @param arg2 User defined argument.
846 * @param arg3 User defined argument.
847 *
848 * @return New session on success or NULL on error.
849 *
850 */
851async_sess_t *async_connect_me_to_blocking(exch_mgmt_t mgmt, async_exch_t *exch,
852 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
853{
854 if (exch == NULL) {
855 errno = ENOENT;
856 return NULL;
857 }
858
859 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
860 if (sess == NULL) {
861 errno = ENOMEM;
862 return NULL;
863 }
864
865 cap_phone_handle_t phone;
866 errno_t rc = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
867 IPC_FLAG_BLOCKING, &phone);
868
869 if (rc != EOK) {
870 errno = rc;
871 free(sess);
872 return NULL;
873 }
874
875 sess->iface = 0;
876 sess->mgmt = mgmt;
877 sess->phone = phone;
878 sess->arg1 = arg1;
879 sess->arg2 = arg2;
880 sess->arg3 = arg3;
881
882 fibril_mutex_initialize(&sess->remote_state_mtx);
883 sess->remote_state_data = NULL;
884
885 list_initialize(&sess->exch_list);
886 fibril_mutex_initialize(&sess->mutex);
887 atomic_set(&sess->refcnt, 0);
888
889 return sess;
890}
891
892/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
893 *
894 * Ask through phone for a new connection to some service and block until
895 * success.
896 *
897 * @param exch Exchange for sending the message.
898 * @param iface Connection interface.
899 * @param arg2 User defined argument.
900 * @param arg3 User defined argument.
901 *
902 * @return New session on success or NULL on error.
903 *
904 */
905async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,
906 sysarg_t arg2, sysarg_t arg3)
907{
908 if (exch == NULL) {
909 errno = ENOENT;
910 return NULL;
911 }
912
913 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
914 if (sess == NULL) {
915 errno = ENOMEM;
916 return NULL;
917 }
918
919 cap_phone_handle_t phone;
920 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
921 arg3, IPC_FLAG_BLOCKING, &phone);
922 if (rc != EOK) {
923 errno = rc;
924 free(sess);
925 return NULL;
926 }
927
928 sess->iface = iface;
929 sess->phone = phone;
930 sess->arg1 = iface;
931 sess->arg2 = arg2;
932 sess->arg3 = arg3;
933
934 fibril_mutex_initialize(&sess->remote_state_mtx);
935 sess->remote_state_data = NULL;
936
937 list_initialize(&sess->exch_list);
938 fibril_mutex_initialize(&sess->mutex);
939 atomic_set(&sess->refcnt, 0);
940
941 return sess;
942}
943
944/** Connect to a task specified by id.
945 *
946 */
947async_sess_t *async_connect_kbox(task_id_t id)
948{
949 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
950 if (sess == NULL) {
951 errno = ENOMEM;
952 return NULL;
953 }
954
955 cap_phone_handle_t phone;
956 errno_t rc = ipc_connect_kbox(id, &phone);
957 if (rc != EOK) {
958 errno = rc;
959 free(sess);
960 return NULL;
961 }
962
963 sess->iface = 0;
964 sess->mgmt = EXCHANGE_ATOMIC;
965 sess->phone = phone;
966 sess->arg1 = 0;
967 sess->arg2 = 0;
968 sess->arg3 = 0;
969
970 fibril_mutex_initialize(&sess->remote_state_mtx);
971 sess->remote_state_data = NULL;
972
973 list_initialize(&sess->exch_list);
974 fibril_mutex_initialize(&sess->mutex);
975 atomic_set(&sess->refcnt, 0);
976
977 return sess;
978}
979
980static errno_t async_hangup_internal(cap_phone_handle_t phone)
981{
982 return ipc_hangup(phone);
983}
984
985/** Wrapper for ipc_hangup.
986 *
987 * @param sess Session to hung up.
988 *
989 * @return Zero on success or an error code.
990 *
991 */
992errno_t async_hangup(async_sess_t *sess)
993{
994 async_exch_t *exch;
995
996 assert(sess);
997
998 if (atomic_get(&sess->refcnt) > 0)
999 return EBUSY;
1000
1001 fibril_mutex_lock(&async_sess_mutex);
1002
1003 errno_t rc = async_hangup_internal(sess->phone);
1004
1005 while (!list_empty(&sess->exch_list)) {
1006 exch = (async_exch_t *)
1007 list_get_instance(list_first(&sess->exch_list),
1008 async_exch_t, sess_link);
1009
1010 list_remove(&exch->sess_link);
1011 list_remove(&exch->global_link);
1012 async_hangup_internal(exch->phone);
1013 free(exch);
1014 }
1015
1016 free(sess);
1017
1018 fibril_mutex_unlock(&async_sess_mutex);
1019
1020 return rc;
1021}
1022
1023/** Start new exchange in a session.
1024 *
1025 * @param session Session.
1026 *
1027 * @return New exchange or NULL on error.
1028 *
1029 */
1030async_exch_t *async_exchange_begin(async_sess_t *sess)
1031{
1032 if (sess == NULL)
1033 return NULL;
1034
1035 exch_mgmt_t mgmt = sess->mgmt;
1036 if (sess->iface != 0)
1037 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
1038
1039 async_exch_t *exch = NULL;
1040
1041 fibril_mutex_lock(&async_sess_mutex);
1042
1043 if (!list_empty(&sess->exch_list)) {
1044 /*
1045 * There are inactive exchanges in the session.
1046 */
1047 exch = (async_exch_t *)
1048 list_get_instance(list_first(&sess->exch_list),
1049 async_exch_t, sess_link);
1050
1051 list_remove(&exch->sess_link);
1052 list_remove(&exch->global_link);
1053 } else {
1054 /*
1055 * There are no available exchanges in the session.
1056 */
1057
1058 if ((mgmt == EXCHANGE_ATOMIC) ||
1059 (mgmt == EXCHANGE_SERIALIZE)) {
1060 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
1061 if (exch != NULL) {
1062 link_initialize(&exch->sess_link);
1063 link_initialize(&exch->global_link);
1064 exch->sess = sess;
1065 exch->phone = sess->phone;
1066 }
1067 } else if (mgmt == EXCHANGE_PARALLEL) {
1068 cap_phone_handle_t phone;
1069 errno_t rc;
1070
1071 retry:
1072 /*
1073 * Make a one-time attempt to connect a new data phone.
1074 */
1075 rc = async_connect_me_to_internal(sess->phone, sess->arg1,
1076 sess->arg2, sess->arg3, 0, &phone);
1077 if (rc == EOK) {
1078 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
1079 if (exch != NULL) {
1080 link_initialize(&exch->sess_link);
1081 link_initialize(&exch->global_link);
1082 exch->sess = sess;
1083 exch->phone = phone;
1084 } else
1085 async_hangup_internal(phone);
1086 } else if (!list_empty(&inactive_exch_list)) {
1087 /*
1088 * We did not manage to connect a new phone. But we
1089 * can try to close some of the currently inactive
1090 * connections in other sessions and try again.
1091 */
1092 exch = (async_exch_t *)
1093 list_get_instance(list_first(&inactive_exch_list),
1094 async_exch_t, global_link);
1095
1096 list_remove(&exch->sess_link);
1097 list_remove(&exch->global_link);
1098 async_hangup_internal(exch->phone);
1099 free(exch);
1100 goto retry;
1101 } else {
1102 /*
1103 * Wait for a phone to become available.
1104 */
1105 fibril_condvar_wait(&avail_phone_cv, &async_sess_mutex);
1106 goto retry;
1107 }
1108 }
1109 }
1110
1111 fibril_mutex_unlock(&async_sess_mutex);
1112
1113 if (exch != NULL) {
1114 atomic_inc(&sess->refcnt);
1115
1116 if (mgmt == EXCHANGE_SERIALIZE)
1117 fibril_mutex_lock(&sess->mutex);
1118 }
1119
1120 return exch;
1121}
1122
1123/** Finish an exchange.
1124 *
1125 * @param exch Exchange to finish.
1126 *
1127 */
1128void async_exchange_end(async_exch_t *exch)
1129{
1130 if (exch == NULL)
1131 return;
1132
1133 async_sess_t *sess = exch->sess;
1134 assert(sess != NULL);
1135
1136 exch_mgmt_t mgmt = sess->mgmt;
1137 if (sess->iface != 0)
1138 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
1139
1140 atomic_dec(&sess->refcnt);
1141
1142 if (mgmt == EXCHANGE_SERIALIZE)
1143 fibril_mutex_unlock(&sess->mutex);
1144
1145 fibril_mutex_lock(&async_sess_mutex);
1146
1147 list_append(&exch->sess_link, &sess->exch_list);
1148 list_append(&exch->global_link, &inactive_exch_list);
1149 fibril_condvar_signal(&avail_phone_cv);
1150
1151 fibril_mutex_unlock(&async_sess_mutex);
1152}
1153
1154/** Wrapper for IPC_M_SHARE_IN calls using the async framework.
1155 *
1156 * @param exch Exchange for sending the message.
1157 * @param size Size of the destination address space area.
1158 * @param arg User defined argument.
1159 * @param flags Storage for the received flags. Can be NULL.
1160 * @param dst Address of the storage for the destination address space area
1161 * base address. Cannot be NULL.
1162 *
1163 * @return Zero on success or an error code from errno.h.
1164 *
1165 */
1166errno_t async_share_in_start(async_exch_t *exch, size_t size, sysarg_t arg,
1167 unsigned int *flags, void **dst)
1168{
1169 if (exch == NULL)
1170 return ENOENT;
1171
1172 sysarg_t _flags = 0;
1173 sysarg_t _dst = (sysarg_t) -1;
1174 errno_t res = async_req_2_4(exch, IPC_M_SHARE_IN, (sysarg_t) size,
1175 arg, NULL, &_flags, NULL, &_dst);
1176
1177 if (flags)
1178 *flags = (unsigned int) _flags;
1179
1180 *dst = (void *) _dst;
1181 return res;
1182}
1183
1184/** Wrapper for IPC_M_SHARE_OUT calls using the async framework.
1185 *
1186 * @param exch Exchange for sending the message.
1187 * @param src Source address space area base address.
1188 * @param flags Flags to be used for sharing. Bits can be only cleared.
1189 *
1190 * @return Zero on success or an error code from errno.h.
1191 *
1192 */
1193errno_t async_share_out_start(async_exch_t *exch, void *src, unsigned int flags)
1194{
1195 if (exch == NULL)
1196 return ENOENT;
1197
1198 return async_req_3_0(exch, IPC_M_SHARE_OUT, (sysarg_t) src, 0,
1199 (sysarg_t) flags);
1200}
1201
1202/** Start IPC_M_DATA_READ using the async framework.
1203 *
1204 * @param exch Exchange for sending the message.
1205 * @param dst Address of the beginning of the destination buffer.
1206 * @param size Size of the destination buffer (in bytes).
1207 * @param dataptr Storage of call data (arg 2 holds actual data size).
1208 *
1209 * @return Hash of the sent message or 0 on error.
1210 *
1211 */
1212aid_t async_data_read(async_exch_t *exch, void *dst, size_t size,
1213 ipc_call_t *dataptr)
1214{
1215 return async_send_2(exch, IPC_M_DATA_READ, (sysarg_t) dst,
1216 (sysarg_t) size, dataptr);
1217}
1218
1219/** Wrapper for IPC_M_DATA_READ calls using the async framework.
1220 *
1221 * @param exch Exchange for sending the message.
1222 * @param dst Address of the beginning of the destination buffer.
1223 * @param size Size of the destination buffer.
1224 *
1225 * @return Zero on success or an error code from errno.h.
1226 *
1227 */
1228errno_t async_data_read_start(async_exch_t *exch, void *dst, size_t size)
1229{
1230 if (exch == NULL)
1231 return ENOENT;
1232
1233 return async_req_2_0(exch, IPC_M_DATA_READ, (sysarg_t) dst,
1234 (sysarg_t) size);
1235}
1236
1237/** Wrapper for IPC_M_DATA_WRITE calls using the async framework.
1238 *
1239 * @param exch Exchange for sending the message.
1240 * @param src Address of the beginning of the source buffer.
1241 * @param size Size of the source buffer.
1242 *
1243 * @return Zero on success or an error code from errno.h.
1244 *
1245 */
1246errno_t async_data_write_start(async_exch_t *exch, const void *src, size_t size)
1247{
1248 if (exch == NULL)
1249 return ENOENT;
1250
1251 return async_req_2_0(exch, IPC_M_DATA_WRITE, (sysarg_t) src,
1252 (sysarg_t) size);
1253}
1254
1255errno_t async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
1256 sysarg_t arg3, async_exch_t *other_exch)
1257{
1258 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
1259 arg1, arg2, arg3, 0, CAP_HANDLE_RAW(other_exch->phone));
1260}
1261
1262/** Lock and get session remote state
1263 *
1264 * Lock and get the local replica of the remote state
1265 * in stateful sessions. The call should be paired
1266 * with async_remote_state_release*().
1267 *
1268 * @param[in] sess Stateful session.
1269 *
1270 * @return Local replica of the remote state.
1271 *
1272 */
1273void *async_remote_state_acquire(async_sess_t *sess)
1274{
1275 fibril_mutex_lock(&sess->remote_state_mtx);
1276 return sess->remote_state_data;
1277}
1278
1279/** Update the session remote state
1280 *
1281 * Update the local replica of the remote state
1282 * in stateful sessions. The remote state must
1283 * be already locked.
1284 *
1285 * @param[in] sess Stateful session.
1286 * @param[in] state New local replica of the remote state.
1287 *
1288 */
1289void async_remote_state_update(async_sess_t *sess, void *state)
1290{
1291 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1292 sess->remote_state_data = state;
1293}
1294
1295/** Release the session remote state
1296 *
1297 * Unlock the local replica of the remote state
1298 * in stateful sessions.
1299 *
1300 * @param[in] sess Stateful session.
1301 *
1302 */
1303void async_remote_state_release(async_sess_t *sess)
1304{
1305 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1306
1307 fibril_mutex_unlock(&sess->remote_state_mtx);
1308}
1309
1310/** Release the session remote state and end an exchange
1311 *
1312 * Unlock the local replica of the remote state
1313 * in stateful sessions. This is convenience function
1314 * which gets the session pointer from the exchange
1315 * and also ends the exchange.
1316 *
1317 * @param[in] exch Stateful session's exchange.
1318 *
1319 */
1320void async_remote_state_release_exchange(async_exch_t *exch)
1321{
1322 if (exch == NULL)
1323 return;
1324
1325 async_sess_t *sess = exch->sess;
1326 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1327
1328 async_exchange_end(exch);
1329 fibril_mutex_unlock(&sess->remote_state_mtx);
1330}
1331
1332void *async_as_area_create(void *base, size_t size, unsigned int flags,
1333 async_sess_t *pager, sysarg_t id1, sysarg_t id2, sysarg_t id3)
1334{
1335 as_area_pager_info_t pager_info = {
1336 .pager = pager->phone,
1337 .id1 = id1,
1338 .id2 = id2,
1339 .id3 = id3
1340 };
1341 return as_area_create(base, size, flags, &pager_info);
1342}
1343
1344/** @}
1345 */
Note: See TracBrowser for help on using the repository browser.