source: mainline/uspace/lib/c/generic/async/client.c@ ab6edb6

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since ab6edb6 was ab6edb6, checked in by Jiří Zárevúcky <jiri.zarevucky@…>, 7 years ago

Simplify the interaction between async_futex and fibril_switch().

  • Property mode set to 100644
File size: 32.3 KB
Line 
1/*
2 * Copyright (c) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup libc
30 * @{
31 */
32/** @file
33 */
34
35/**
36 * Asynchronous library
37 *
38 * The aim of this library is to provide a facility for writing programs which
39 * utilize the asynchronous nature of HelenOS IPC, yet using a normal way of
40 * programming.
41 *
42 * You should be able to write very simple multithreaded programs. The async
43 * framework will automatically take care of most of the synchronization
44 * problems.
45 *
46 * Example of use (pseudo C):
47 *
48 * 1) Multithreaded client application
49 *
50 * fibril_create(fibril1, ...);
51 * fibril_create(fibril2, ...);
52 * ...
53 *
54 * int fibril1(void *arg)
55 * {
56 * conn = async_connect_me_to(...);
57 *
58 * exch = async_exchange_begin(conn);
59 * c1 = async_send(exch);
60 * async_exchange_end(exch);
61 *
62 * exch = async_exchange_begin(conn);
63 * c2 = async_send(exch);
64 * async_exchange_end(exch);
65 *
66 * async_wait_for(c1);
67 * async_wait_for(c2);
68 * ...
69 * }
70 *
71 *
72 * 2) Multithreaded server application
73 *
74 * main()
75 * {
76 * async_manager();
77 * }
78 *
79 * port_handler(ichandle, *icall)
80 * {
81 * if (want_refuse) {
82 * async_answer_0(ichandle, ELIMIT);
83 * return;
84 * }
85 * async_answer_0(ichandle, EOK);
86 *
87 * chandle = async_get_call(&call);
88 * somehow_handle_the_call(chandle, call);
89 * async_answer_2(chandle, 1, 2, 3);
90 *
91 * chandle = async_get_call(&call);
92 * ...
93 * }
94 *
95 */
96
97#define LIBC_ASYNC_C_
98#include <ipc/ipc.h>
99#include <async.h>
100#include "../private/async.h"
101#include "../private/ns.h"
102#undef LIBC_ASYNC_C_
103
104#include <ipc/irq.h>
105#include <ipc/event.h>
106#include <futex.h>
107#include <fibril.h>
108#include <adt/hash_table.h>
109#include <adt/hash.h>
110#include <adt/list.h>
111#include <assert.h>
112#include <errno.h>
113#include <sys/time.h>
114#include <libarch/barrier.h>
115#include <stdbool.h>
116#include <stdlib.h>
117#include <mem.h>
118#include <stdlib.h>
119#include <macros.h>
120#include <as.h>
121#include <abi/mm/as.h>
122#include "../private/libc.h"
123#include "../private/fibril.h"
124
125/** Naming service session */
126async_sess_t session_ns;
127
128/** Message data */
129typedef struct {
130 awaiter_t wdata;
131
132 /** If reply was received. */
133 bool done;
134
135 /** If the message / reply should be discarded on arrival. */
136 bool forget;
137
138 /** If already destroyed. */
139 bool destroyed;
140
141 /** Pointer to where the answer data is stored. */
142 ipc_call_t *dataptr;
143
144 errno_t retval;
145} amsg_t;
146
147static void to_event_initialize(to_event_t *to)
148{
149 struct timeval tv = { 0, 0 };
150
151 to->inlist = false;
152 to->occurred = false;
153 link_initialize(&to->link);
154 to->expires = tv;
155}
156
157static void wu_event_initialize(wu_event_t *wu)
158{
159 wu->inlist = false;
160 link_initialize(&wu->link);
161}
162
163void awaiter_initialize(awaiter_t *aw)
164{
165 aw->fid = 0;
166 aw->active = false;
167 to_event_initialize(&aw->to_event);
168 wu_event_initialize(&aw->wu_event);
169}
170
171static amsg_t *amsg_create(void)
172{
173 amsg_t *msg = malloc(sizeof(amsg_t));
174 if (msg) {
175 msg->done = false;
176 msg->forget = false;
177 msg->destroyed = false;
178 msg->dataptr = NULL;
179 msg->retval = EINVAL;
180 awaiter_initialize(&msg->wdata);
181 }
182
183 return msg;
184}
185
186static void amsg_destroy(amsg_t *msg)
187{
188 assert(!msg->destroyed);
189 msg->destroyed = true;
190 free(msg);
191}
192
193
194/** Mutex protecting inactive_exch_list and avail_phone_cv.
195 *
196 */
197static FIBRIL_MUTEX_INITIALIZE(async_sess_mutex);
198
199/** List of all currently inactive exchanges.
200 *
201 */
202static LIST_INITIALIZE(inactive_exch_list);
203
204/** Condition variable to wait for a phone to become available.
205 *
206 */
207static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
208
209/** Initialize the async framework.
210 *
211 */
212void __async_client_init(void)
213{
214 session_ns.iface = 0;
215 session_ns.mgmt = EXCHANGE_ATOMIC;
216 session_ns.phone = PHONE_NS;
217 session_ns.arg1 = 0;
218 session_ns.arg2 = 0;
219 session_ns.arg3 = 0;
220
221 fibril_mutex_initialize(&session_ns.remote_state_mtx);
222 session_ns.remote_state_data = NULL;
223
224 list_initialize(&session_ns.exch_list);
225 fibril_mutex_initialize(&session_ns.mutex);
226 atomic_set(&session_ns.refcnt, 0);
227}
228
229/** Reply received callback.
230 *
231 * This function is called whenever a reply for an asynchronous message sent out
232 * by the asynchronous framework is received.
233 *
234 * Notify the fibril which is waiting for this message that it has arrived.
235 *
236 * @param arg Pointer to the asynchronous message record.
237 * @param retval Value returned in the answer.
238 * @param data Call data of the answer.
239 *
240 */
241static void reply_received(void *arg, errno_t retval, ipc_call_t *data)
242{
243 assert(arg);
244
245 futex_lock(&async_futex);
246
247 amsg_t *msg = (amsg_t *) arg;
248 msg->retval = retval;
249
250 /* Copy data after futex_down, just in case the call was detached */
251 if ((msg->dataptr) && (data))
252 *msg->dataptr = *data;
253
254 write_barrier();
255
256 /* Remove message from timeout list */
257 if (msg->wdata.to_event.inlist)
258 list_remove(&msg->wdata.to_event.link);
259
260 msg->done = true;
261
262 if (msg->forget) {
263 assert(msg->wdata.active);
264 amsg_destroy(msg);
265 } else if (!msg->wdata.active) {
266 msg->wdata.active = true;
267 fibril_add_ready(msg->wdata.fid);
268 }
269
270 futex_unlock(&async_futex);
271}
272
273/** Send message and return id of the sent message.
274 *
275 * The return value can be used as input for async_wait() to wait for
276 * completion.
277 *
278 * @param exch Exchange for sending the message.
279 * @param imethod Service-defined interface and method.
280 * @param arg1 Service-defined payload argument.
281 * @param arg2 Service-defined payload argument.
282 * @param arg3 Service-defined payload argument.
283 * @param arg4 Service-defined payload argument.
284 * @param dataptr If non-NULL, storage where the reply data will be stored.
285 *
286 * @return Hash of the sent message or 0 on error.
287 *
288 */
289aid_t async_send_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
290 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr)
291{
292 if (exch == NULL)
293 return 0;
294
295 amsg_t *msg = amsg_create();
296 if (msg == NULL)
297 return 0;
298
299 msg->dataptr = dataptr;
300 msg->wdata.active = true;
301
302 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg,
303 reply_received);
304
305 return (aid_t) msg;
306}
307
308/** Send message and return id of the sent message
309 *
310 * The return value can be used as input for async_wait() to wait for
311 * completion.
312 *
313 * @param exch Exchange for sending the message.
314 * @param imethod Service-defined interface and method.
315 * @param arg1 Service-defined payload argument.
316 * @param arg2 Service-defined payload argument.
317 * @param arg3 Service-defined payload argument.
318 * @param arg4 Service-defined payload argument.
319 * @param arg5 Service-defined payload argument.
320 * @param dataptr If non-NULL, storage where the reply data will be
321 * stored.
322 *
323 * @return Hash of the sent message or 0 on error.
324 *
325 */
326aid_t async_send_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
327 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5,
328 ipc_call_t *dataptr)
329{
330 if (exch == NULL)
331 return 0;
332
333 amsg_t *msg = amsg_create();
334 if (msg == NULL)
335 return 0;
336
337 msg->dataptr = dataptr;
338 msg->wdata.active = true;
339
340 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,
341 msg, reply_received);
342
343 return (aid_t) msg;
344}
345
346/** Wait for a message sent by the async framework.
347 *
348 * @param amsgid Hash of the message to wait for.
349 * @param retval Pointer to storage where the retval of the answer will
350 * be stored.
351 *
352 */
353void async_wait_for(aid_t amsgid, errno_t *retval)
354{
355 assert(amsgid);
356
357 amsg_t *msg = (amsg_t *) amsgid;
358
359 futex_lock(&async_futex);
360
361 assert(!msg->forget);
362 assert(!msg->destroyed);
363
364 if (msg->done) {
365 futex_unlock(&async_futex);
366 goto done;
367 }
368
369 msg->wdata.fid = fibril_get_id();
370 msg->wdata.active = false;
371 msg->wdata.to_event.inlist = false;
372
373 /* Leave the async_futex locked when entering this function */
374 fibril_switch(FIBRIL_FROM_BLOCKED);
375 futex_unlock(&async_futex);
376
377done:
378 if (retval)
379 *retval = msg->retval;
380
381 amsg_destroy(msg);
382}
383
384/** Wait for a message sent by the async framework, timeout variant.
385 *
386 * If the wait times out, the caller may choose to either wait again by calling
387 * async_wait_for() or async_wait_timeout(), or forget the message via
388 * async_forget().
389 *
390 * @param amsgid Hash of the message to wait for.
391 * @param retval Pointer to storage where the retval of the answer will
392 * be stored.
393 * @param timeout Timeout in microseconds.
394 *
395 * @return Zero on success, ETIMEOUT if the timeout has expired.
396 *
397 */
398errno_t async_wait_timeout(aid_t amsgid, errno_t *retval, suseconds_t timeout)
399{
400 assert(amsgid);
401
402 amsg_t *msg = (amsg_t *) amsgid;
403
404 futex_lock(&async_futex);
405
406 assert(!msg->forget);
407 assert(!msg->destroyed);
408
409 if (msg->done) {
410 futex_unlock(&async_futex);
411 goto done;
412 }
413
414 /*
415 * Negative timeout is converted to zero timeout to avoid
416 * using tv_add with negative augmenter.
417 */
418 if (timeout < 0)
419 timeout = 0;
420
421 getuptime(&msg->wdata.to_event.expires);
422 tv_add_diff(&msg->wdata.to_event.expires, timeout);
423
424 /*
425 * Current fibril is inserted as waiting regardless of the
426 * "size" of the timeout.
427 *
428 * Checking for msg->done and immediately bailing out when
429 * timeout == 0 would mean that the manager fibril would never
430 * run (consider single threaded program).
431 * Thus the IPC answer would be never retrieved from the kernel.
432 *
433 * Notice that the actual delay would be very small because we
434 * - switch to manager fibril
435 * - the manager sees expired timeout
436 * - and thus adds us back to ready queue
437 * - manager switches back to some ready fibril
438 * (prior it, it checks for incoming IPC).
439 *
440 */
441 msg->wdata.fid = fibril_get_id();
442 msg->wdata.active = false;
443 async_insert_timeout(&msg->wdata);
444
445 /* Leave the async_futex locked when entering this function */
446 fibril_switch(FIBRIL_FROM_BLOCKED);
447 futex_unlock(&async_futex);
448
449 if (!msg->done)
450 return ETIMEOUT;
451
452done:
453 if (retval)
454 *retval = msg->retval;
455
456 amsg_destroy(msg);
457
458 return 0;
459}
460
461/** Discard the message / reply on arrival.
462 *
463 * The message will be marked to be discarded once the reply arrives in
464 * reply_received(). It is not allowed to call async_wait_for() or
465 * async_wait_timeout() on this message after a call to this function.
466 *
467 * @param amsgid Hash of the message to forget.
468 */
469void async_forget(aid_t amsgid)
470{
471 amsg_t *msg = (amsg_t *) amsgid;
472
473 assert(msg);
474 assert(!msg->forget);
475 assert(!msg->destroyed);
476
477 futex_lock(&async_futex);
478
479 if (msg->done) {
480 amsg_destroy(msg);
481 } else {
482 msg->dataptr = NULL;
483 msg->forget = true;
484 }
485
486 futex_unlock(&async_futex);
487}
488
489/** Wait for specified time.
490 *
491 * The current fibril is suspended but the thread continues to execute.
492 *
493 * @param timeout Duration of the wait in microseconds.
494 *
495 */
496void async_usleep(suseconds_t timeout)
497{
498 awaiter_t awaiter;
499 awaiter_initialize(&awaiter);
500
501 awaiter.fid = fibril_get_id();
502
503 getuptime(&awaiter.to_event.expires);
504 tv_add_diff(&awaiter.to_event.expires, timeout);
505
506 futex_lock(&async_futex);
507
508 async_insert_timeout(&awaiter);
509
510 /* Leave the async_futex locked when entering this function */
511 fibril_switch(FIBRIL_FROM_BLOCKED);
512 futex_unlock(&async_futex);
513}
514
515/** Delay execution for the specified number of seconds
516 *
517 * @param sec Number of seconds to sleep
518 */
519void async_sleep(unsigned int sec)
520{
521 /*
522 * Sleep in 1000 second steps to support
523 * full argument range
524 */
525
526 while (sec > 0) {
527 unsigned int period = (sec > 1000) ? 1000 : sec;
528
529 async_usleep(period * 1000000);
530 sec -= period;
531 }
532}
533
534/** Pseudo-synchronous message sending - fast version.
535 *
536 * Send message asynchronously and return only after the reply arrives.
537 *
538 * This function can only transfer 4 register payload arguments. For
539 * transferring more arguments, see the slower async_req_slow().
540 *
541 * @param exch Exchange for sending the message.
542 * @param imethod Interface and method of the call.
543 * @param arg1 Service-defined payload argument.
544 * @param arg2 Service-defined payload argument.
545 * @param arg3 Service-defined payload argument.
546 * @param arg4 Service-defined payload argument.
547 * @param r1 If non-NULL, storage for the 1st reply argument.
548 * @param r2 If non-NULL, storage for the 2nd reply argument.
549 * @param r3 If non-NULL, storage for the 3rd reply argument.
550 * @param r4 If non-NULL, storage for the 4th reply argument.
551 * @param r5 If non-NULL, storage for the 5th reply argument.
552 *
553 * @return Return code of the reply or an error code.
554 *
555 */
556errno_t async_req_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
557 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t *r1, sysarg_t *r2,
558 sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
559{
560 if (exch == NULL)
561 return ENOENT;
562
563 ipc_call_t result;
564 aid_t aid = async_send_4(exch, imethod, arg1, arg2, arg3, arg4,
565 &result);
566
567 errno_t rc;
568 async_wait_for(aid, &rc);
569
570 if (r1)
571 *r1 = IPC_GET_ARG1(result);
572
573 if (r2)
574 *r2 = IPC_GET_ARG2(result);
575
576 if (r3)
577 *r3 = IPC_GET_ARG3(result);
578
579 if (r4)
580 *r4 = IPC_GET_ARG4(result);
581
582 if (r5)
583 *r5 = IPC_GET_ARG5(result);
584
585 return rc;
586}
587
588/** Pseudo-synchronous message sending - slow version.
589 *
590 * Send message asynchronously and return only after the reply arrives.
591 *
592 * @param exch Exchange for sending the message.
593 * @param imethod Interface and method of the call.
594 * @param arg1 Service-defined payload argument.
595 * @param arg2 Service-defined payload argument.
596 * @param arg3 Service-defined payload argument.
597 * @param arg4 Service-defined payload argument.
598 * @param arg5 Service-defined payload argument.
599 * @param r1 If non-NULL, storage for the 1st reply argument.
600 * @param r2 If non-NULL, storage for the 2nd reply argument.
601 * @param r3 If non-NULL, storage for the 3rd reply argument.
602 * @param r4 If non-NULL, storage for the 4th reply argument.
603 * @param r5 If non-NULL, storage for the 5th reply argument.
604 *
605 * @return Return code of the reply or an error code.
606 *
607 */
608errno_t async_req_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
609 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, sysarg_t *r1,
610 sysarg_t *r2, sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
611{
612 if (exch == NULL)
613 return ENOENT;
614
615 ipc_call_t result;
616 aid_t aid = async_send_5(exch, imethod, arg1, arg2, arg3, arg4, arg5,
617 &result);
618
619 errno_t rc;
620 async_wait_for(aid, &rc);
621
622 if (r1)
623 *r1 = IPC_GET_ARG1(result);
624
625 if (r2)
626 *r2 = IPC_GET_ARG2(result);
627
628 if (r3)
629 *r3 = IPC_GET_ARG3(result);
630
631 if (r4)
632 *r4 = IPC_GET_ARG4(result);
633
634 if (r5)
635 *r5 = IPC_GET_ARG5(result);
636
637 return rc;
638}
639
640void async_msg_0(async_exch_t *exch, sysarg_t imethod)
641{
642 if (exch != NULL)
643 ipc_call_async_0(exch->phone, imethod, NULL, NULL);
644}
645
646void async_msg_1(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1)
647{
648 if (exch != NULL)
649 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL);
650}
651
652void async_msg_2(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
653 sysarg_t arg2)
654{
655 if (exch != NULL)
656 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL);
657}
658
659void async_msg_3(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
660 sysarg_t arg2, sysarg_t arg3)
661{
662 if (exch != NULL)
663 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL,
664 NULL);
665}
666
667void async_msg_4(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
668 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
669{
670 if (exch != NULL)
671 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
672 NULL, NULL);
673}
674
675void async_msg_5(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
676 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
677{
678 if (exch != NULL)
679 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
680 arg5, NULL, NULL);
681}
682
683static errno_t async_connect_me_to_internal(cap_phone_handle_t phone,
684 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4,
685 cap_phone_handle_t *out_phone)
686{
687 ipc_call_t result;
688
689 // XXX: Workaround for GCC's inability to infer association between
690 // rc == EOK and *out_phone being assigned.
691 *out_phone = CAP_NIL;
692
693 amsg_t *msg = amsg_create();
694 if (!msg)
695 return ENOENT;
696
697 msg->dataptr = &result;
698 msg->wdata.active = true;
699
700 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4,
701 msg, reply_received);
702
703 errno_t rc;
704 async_wait_for((aid_t) msg, &rc);
705
706 if (rc != EOK)
707 return rc;
708
709 *out_phone = (cap_phone_handle_t) IPC_GET_ARG5(result);
710 return EOK;
711}
712
713/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
714 *
715 * Ask through for a new connection to some service.
716 *
717 * @param mgmt Exchange management style.
718 * @param exch Exchange for sending the message.
719 * @param arg1 User defined argument.
720 * @param arg2 User defined argument.
721 * @param arg3 User defined argument.
722 *
723 * @return New session on success or NULL on error.
724 *
725 */
726async_sess_t *async_connect_me_to(exch_mgmt_t mgmt, async_exch_t *exch,
727 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
728{
729 if (exch == NULL) {
730 errno = ENOENT;
731 return NULL;
732 }
733
734 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
735 if (sess == NULL) {
736 errno = ENOMEM;
737 return NULL;
738 }
739
740 cap_phone_handle_t phone;
741 errno_t rc = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
742 0, &phone);
743 if (rc != EOK) {
744 errno = rc;
745 free(sess);
746 return NULL;
747 }
748
749 sess->iface = 0;
750 sess->mgmt = mgmt;
751 sess->phone = phone;
752 sess->arg1 = arg1;
753 sess->arg2 = arg2;
754 sess->arg3 = arg3;
755
756 fibril_mutex_initialize(&sess->remote_state_mtx);
757 sess->remote_state_data = NULL;
758
759 list_initialize(&sess->exch_list);
760 fibril_mutex_initialize(&sess->mutex);
761 atomic_set(&sess->refcnt, 0);
762
763 return sess;
764}
765
766/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
767 *
768 * Ask through phone for a new connection to some service and block until
769 * success.
770 *
771 * @param exch Exchange for sending the message.
772 * @param iface Connection interface.
773 * @param arg2 User defined argument.
774 * @param arg3 User defined argument.
775 *
776 * @return New session on success or NULL on error.
777 *
778 */
779async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,
780 sysarg_t arg2, sysarg_t arg3)
781{
782 if (exch == NULL) {
783 errno = ENOENT;
784 return NULL;
785 }
786
787 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
788 if (sess == NULL) {
789 errno = ENOMEM;
790 return NULL;
791 }
792
793 cap_phone_handle_t phone;
794 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
795 arg3, 0, &phone);
796 if (rc != EOK) {
797 errno = rc;
798 free(sess);
799 return NULL;
800 }
801
802 sess->iface = iface;
803 sess->phone = phone;
804 sess->arg1 = iface;
805 sess->arg2 = arg2;
806 sess->arg3 = arg3;
807
808 fibril_mutex_initialize(&sess->remote_state_mtx);
809 sess->remote_state_data = NULL;
810
811 list_initialize(&sess->exch_list);
812 fibril_mutex_initialize(&sess->mutex);
813 atomic_set(&sess->refcnt, 0);
814
815 return sess;
816}
817
818/** Set arguments for new connections.
819 *
820 * FIXME This is an ugly hack to work around the problem that parallel
821 * exchanges are implemented using parallel connections. When we create
822 * a callback session, the framework does not know arguments for the new
823 * connections.
824 *
825 * The proper solution seems to be to implement parallel exchanges using
826 * tagging.
827 */
828void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2,
829 sysarg_t arg3)
830{
831 sess->arg1 = arg1;
832 sess->arg2 = arg2;
833 sess->arg3 = arg3;
834}
835
836/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
837 *
838 * Ask through phone for a new connection to some service and block until
839 * success.
840 *
841 * @param mgmt Exchange management style.
842 * @param exch Exchange for sending the message.
843 * @param arg1 User defined argument.
844 * @param arg2 User defined argument.
845 * @param arg3 User defined argument.
846 *
847 * @return New session on success or NULL on error.
848 *
849 */
850async_sess_t *async_connect_me_to_blocking(exch_mgmt_t mgmt, async_exch_t *exch,
851 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
852{
853 if (exch == NULL) {
854 errno = ENOENT;
855 return NULL;
856 }
857
858 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
859 if (sess == NULL) {
860 errno = ENOMEM;
861 return NULL;
862 }
863
864 cap_phone_handle_t phone;
865 errno_t rc = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
866 IPC_FLAG_BLOCKING, &phone);
867
868 if (rc != EOK) {
869 errno = rc;
870 free(sess);
871 return NULL;
872 }
873
874 sess->iface = 0;
875 sess->mgmt = mgmt;
876 sess->phone = phone;
877 sess->arg1 = arg1;
878 sess->arg2 = arg2;
879 sess->arg3 = arg3;
880
881 fibril_mutex_initialize(&sess->remote_state_mtx);
882 sess->remote_state_data = NULL;
883
884 list_initialize(&sess->exch_list);
885 fibril_mutex_initialize(&sess->mutex);
886 atomic_set(&sess->refcnt, 0);
887
888 return sess;
889}
890
891/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
892 *
893 * Ask through phone for a new connection to some service and block until
894 * success.
895 *
896 * @param exch Exchange for sending the message.
897 * @param iface Connection interface.
898 * @param arg2 User defined argument.
899 * @param arg3 User defined argument.
900 *
901 * @return New session on success or NULL on error.
902 *
903 */
904async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,
905 sysarg_t arg2, sysarg_t arg3)
906{
907 if (exch == NULL) {
908 errno = ENOENT;
909 return NULL;
910 }
911
912 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
913 if (sess == NULL) {
914 errno = ENOMEM;
915 return NULL;
916 }
917
918 cap_phone_handle_t phone;
919 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
920 arg3, IPC_FLAG_BLOCKING, &phone);
921 if (rc != EOK) {
922 errno = rc;
923 free(sess);
924 return NULL;
925 }
926
927 sess->iface = iface;
928 sess->phone = phone;
929 sess->arg1 = iface;
930 sess->arg2 = arg2;
931 sess->arg3 = arg3;
932
933 fibril_mutex_initialize(&sess->remote_state_mtx);
934 sess->remote_state_data = NULL;
935
936 list_initialize(&sess->exch_list);
937 fibril_mutex_initialize(&sess->mutex);
938 atomic_set(&sess->refcnt, 0);
939
940 return sess;
941}
942
943/** Connect to a task specified by id.
944 *
945 */
946async_sess_t *async_connect_kbox(task_id_t id)
947{
948 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
949 if (sess == NULL) {
950 errno = ENOMEM;
951 return NULL;
952 }
953
954 cap_phone_handle_t phone;
955 errno_t rc = ipc_connect_kbox(id, &phone);
956 if (rc != EOK) {
957 errno = rc;
958 free(sess);
959 return NULL;
960 }
961
962 sess->iface = 0;
963 sess->mgmt = EXCHANGE_ATOMIC;
964 sess->phone = phone;
965 sess->arg1 = 0;
966 sess->arg2 = 0;
967 sess->arg3 = 0;
968
969 fibril_mutex_initialize(&sess->remote_state_mtx);
970 sess->remote_state_data = NULL;
971
972 list_initialize(&sess->exch_list);
973 fibril_mutex_initialize(&sess->mutex);
974 atomic_set(&sess->refcnt, 0);
975
976 return sess;
977}
978
979static errno_t async_hangup_internal(cap_phone_handle_t phone)
980{
981 return ipc_hangup(phone);
982}
983
984/** Wrapper for ipc_hangup.
985 *
986 * @param sess Session to hung up.
987 *
988 * @return Zero on success or an error code.
989 *
990 */
991errno_t async_hangup(async_sess_t *sess)
992{
993 async_exch_t *exch;
994
995 assert(sess);
996
997 if (atomic_get(&sess->refcnt) > 0)
998 return EBUSY;
999
1000 fibril_mutex_lock(&async_sess_mutex);
1001
1002 errno_t rc = async_hangup_internal(sess->phone);
1003
1004 while (!list_empty(&sess->exch_list)) {
1005 exch = (async_exch_t *)
1006 list_get_instance(list_first(&sess->exch_list),
1007 async_exch_t, sess_link);
1008
1009 list_remove(&exch->sess_link);
1010 list_remove(&exch->global_link);
1011 async_hangup_internal(exch->phone);
1012 free(exch);
1013 }
1014
1015 free(sess);
1016
1017 fibril_mutex_unlock(&async_sess_mutex);
1018
1019 return rc;
1020}
1021
1022/** Start new exchange in a session.
1023 *
1024 * @param session Session.
1025 *
1026 * @return New exchange or NULL on error.
1027 *
1028 */
1029async_exch_t *async_exchange_begin(async_sess_t *sess)
1030{
1031 if (sess == NULL)
1032 return NULL;
1033
1034 exch_mgmt_t mgmt = sess->mgmt;
1035 if (sess->iface != 0)
1036 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
1037
1038 async_exch_t *exch = NULL;
1039
1040 fibril_mutex_lock(&async_sess_mutex);
1041
1042 if (!list_empty(&sess->exch_list)) {
1043 /*
1044 * There are inactive exchanges in the session.
1045 */
1046 exch = (async_exch_t *)
1047 list_get_instance(list_first(&sess->exch_list),
1048 async_exch_t, sess_link);
1049
1050 list_remove(&exch->sess_link);
1051 list_remove(&exch->global_link);
1052 } else {
1053 /*
1054 * There are no available exchanges in the session.
1055 */
1056
1057 if ((mgmt == EXCHANGE_ATOMIC) ||
1058 (mgmt == EXCHANGE_SERIALIZE)) {
1059 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
1060 if (exch != NULL) {
1061 link_initialize(&exch->sess_link);
1062 link_initialize(&exch->global_link);
1063 exch->sess = sess;
1064 exch->phone = sess->phone;
1065 }
1066 } else if (mgmt == EXCHANGE_PARALLEL) {
1067 cap_phone_handle_t phone;
1068 errno_t rc;
1069
1070 retry:
1071 /*
1072 * Make a one-time attempt to connect a new data phone.
1073 */
1074 rc = async_connect_me_to_internal(sess->phone, sess->arg1,
1075 sess->arg2, sess->arg3, 0, &phone);
1076 if (rc == EOK) {
1077 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
1078 if (exch != NULL) {
1079 link_initialize(&exch->sess_link);
1080 link_initialize(&exch->global_link);
1081 exch->sess = sess;
1082 exch->phone = phone;
1083 } else
1084 async_hangup_internal(phone);
1085 } else if (!list_empty(&inactive_exch_list)) {
1086 /*
1087 * We did not manage to connect a new phone. But we
1088 * can try to close some of the currently inactive
1089 * connections in other sessions and try again.
1090 */
1091 exch = (async_exch_t *)
1092 list_get_instance(list_first(&inactive_exch_list),
1093 async_exch_t, global_link);
1094
1095 list_remove(&exch->sess_link);
1096 list_remove(&exch->global_link);
1097 async_hangup_internal(exch->phone);
1098 free(exch);
1099 goto retry;
1100 } else {
1101 /*
1102 * Wait for a phone to become available.
1103 */
1104 fibril_condvar_wait(&avail_phone_cv, &async_sess_mutex);
1105 goto retry;
1106 }
1107 }
1108 }
1109
1110 fibril_mutex_unlock(&async_sess_mutex);
1111
1112 if (exch != NULL) {
1113 atomic_inc(&sess->refcnt);
1114
1115 if (mgmt == EXCHANGE_SERIALIZE)
1116 fibril_mutex_lock(&sess->mutex);
1117 }
1118
1119 return exch;
1120}
1121
1122/** Finish an exchange.
1123 *
1124 * @param exch Exchange to finish.
1125 *
1126 */
1127void async_exchange_end(async_exch_t *exch)
1128{
1129 if (exch == NULL)
1130 return;
1131
1132 async_sess_t *sess = exch->sess;
1133 assert(sess != NULL);
1134
1135 exch_mgmt_t mgmt = sess->mgmt;
1136 if (sess->iface != 0)
1137 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
1138
1139 atomic_dec(&sess->refcnt);
1140
1141 if (mgmt == EXCHANGE_SERIALIZE)
1142 fibril_mutex_unlock(&sess->mutex);
1143
1144 fibril_mutex_lock(&async_sess_mutex);
1145
1146 list_append(&exch->sess_link, &sess->exch_list);
1147 list_append(&exch->global_link, &inactive_exch_list);
1148 fibril_condvar_signal(&avail_phone_cv);
1149
1150 fibril_mutex_unlock(&async_sess_mutex);
1151}
1152
1153/** Wrapper for IPC_M_SHARE_IN calls using the async framework.
1154 *
1155 * @param exch Exchange for sending the message.
1156 * @param size Size of the destination address space area.
1157 * @param arg User defined argument.
1158 * @param flags Storage for the received flags. Can be NULL.
1159 * @param dst Address of the storage for the destination address space area
1160 * base address. Cannot be NULL.
1161 *
1162 * @return Zero on success or an error code from errno.h.
1163 *
1164 */
1165errno_t async_share_in_start(async_exch_t *exch, size_t size, sysarg_t arg,
1166 unsigned int *flags, void **dst)
1167{
1168 if (exch == NULL)
1169 return ENOENT;
1170
1171 sysarg_t _flags = 0;
1172 sysarg_t _dst = (sysarg_t) -1;
1173 errno_t res = async_req_2_4(exch, IPC_M_SHARE_IN, (sysarg_t) size,
1174 arg, NULL, &_flags, NULL, &_dst);
1175
1176 if (flags)
1177 *flags = (unsigned int) _flags;
1178
1179 *dst = (void *) _dst;
1180 return res;
1181}
1182
1183/** Wrapper for IPC_M_SHARE_OUT calls using the async framework.
1184 *
1185 * @param exch Exchange for sending the message.
1186 * @param src Source address space area base address.
1187 * @param flags Flags to be used for sharing. Bits can be only cleared.
1188 *
1189 * @return Zero on success or an error code from errno.h.
1190 *
1191 */
1192errno_t async_share_out_start(async_exch_t *exch, void *src, unsigned int flags)
1193{
1194 if (exch == NULL)
1195 return ENOENT;
1196
1197 return async_req_3_0(exch, IPC_M_SHARE_OUT, (sysarg_t) src, 0,
1198 (sysarg_t) flags);
1199}
1200
1201/** Start IPC_M_DATA_READ using the async framework.
1202 *
1203 * @param exch Exchange for sending the message.
1204 * @param dst Address of the beginning of the destination buffer.
1205 * @param size Size of the destination buffer (in bytes).
1206 * @param dataptr Storage of call data (arg 2 holds actual data size).
1207 *
1208 * @return Hash of the sent message or 0 on error.
1209 *
1210 */
1211aid_t async_data_read(async_exch_t *exch, void *dst, size_t size,
1212 ipc_call_t *dataptr)
1213{
1214 return async_send_2(exch, IPC_M_DATA_READ, (sysarg_t) dst,
1215 (sysarg_t) size, dataptr);
1216}
1217
1218/** Wrapper for IPC_M_DATA_READ calls using the async framework.
1219 *
1220 * @param exch Exchange for sending the message.
1221 * @param dst Address of the beginning of the destination buffer.
1222 * @param size Size of the destination buffer.
1223 *
1224 * @return Zero on success or an error code from errno.h.
1225 *
1226 */
1227errno_t async_data_read_start(async_exch_t *exch, void *dst, size_t size)
1228{
1229 if (exch == NULL)
1230 return ENOENT;
1231
1232 return async_req_2_0(exch, IPC_M_DATA_READ, (sysarg_t) dst,
1233 (sysarg_t) size);
1234}
1235
1236/** Wrapper for IPC_M_DATA_WRITE calls using the async framework.
1237 *
1238 * @param exch Exchange for sending the message.
1239 * @param src Address of the beginning of the source buffer.
1240 * @param size Size of the source buffer.
1241 *
1242 * @return Zero on success or an error code from errno.h.
1243 *
1244 */
1245errno_t async_data_write_start(async_exch_t *exch, const void *src, size_t size)
1246{
1247 if (exch == NULL)
1248 return ENOENT;
1249
1250 return async_req_2_0(exch, IPC_M_DATA_WRITE, (sysarg_t) src,
1251 (sysarg_t) size);
1252}
1253
1254errno_t async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
1255 sysarg_t arg3, async_exch_t *other_exch)
1256{
1257 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
1258 arg1, arg2, arg3, 0, CAP_HANDLE_RAW(other_exch->phone));
1259}
1260
1261/** Lock and get session remote state
1262 *
1263 * Lock and get the local replica of the remote state
1264 * in stateful sessions. The call should be paired
1265 * with async_remote_state_release*().
1266 *
1267 * @param[in] sess Stateful session.
1268 *
1269 * @return Local replica of the remote state.
1270 *
1271 */
1272void *async_remote_state_acquire(async_sess_t *sess)
1273{
1274 fibril_mutex_lock(&sess->remote_state_mtx);
1275 return sess->remote_state_data;
1276}
1277
1278/** Update the session remote state
1279 *
1280 * Update the local replica of the remote state
1281 * in stateful sessions. The remote state must
1282 * be already locked.
1283 *
1284 * @param[in] sess Stateful session.
1285 * @param[in] state New local replica of the remote state.
1286 *
1287 */
1288void async_remote_state_update(async_sess_t *sess, void *state)
1289{
1290 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1291 sess->remote_state_data = state;
1292}
1293
1294/** Release the session remote state
1295 *
1296 * Unlock the local replica of the remote state
1297 * in stateful sessions.
1298 *
1299 * @param[in] sess Stateful session.
1300 *
1301 */
1302void async_remote_state_release(async_sess_t *sess)
1303{
1304 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1305
1306 fibril_mutex_unlock(&sess->remote_state_mtx);
1307}
1308
1309/** Release the session remote state and end an exchange
1310 *
1311 * Unlock the local replica of the remote state
1312 * in stateful sessions. This is convenience function
1313 * which gets the session pointer from the exchange
1314 * and also ends the exchange.
1315 *
1316 * @param[in] exch Stateful session's exchange.
1317 *
1318 */
1319void async_remote_state_release_exchange(async_exch_t *exch)
1320{
1321 if (exch == NULL)
1322 return;
1323
1324 async_sess_t *sess = exch->sess;
1325 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1326
1327 async_exchange_end(exch);
1328 fibril_mutex_unlock(&sess->remote_state_mtx);
1329}
1330
1331void *async_as_area_create(void *base, size_t size, unsigned int flags,
1332 async_sess_t *pager, sysarg_t id1, sysarg_t id2, sysarg_t id3)
1333{
1334 as_area_pager_info_t pager_info = {
1335 .pager = pager->phone,
1336 .id1 = id1,
1337 .id2 = id2,
1338 .id3 = id3
1339 };
1340 return as_area_create(base, size, flags, &pager_info);
1341}
1342
1343/** @}
1344 */
Note: See TracBrowser for help on using the repository browser.