source: mainline/uspace/lib/c/generic/async/client.c@ 95838f1

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since 95838f1 was 95838f1, checked in by Jiří Zárevúcky <jiri.zarevucky@…>, 7 years ago

Switch async_futex to using futex_lock/unlock.

  • Property mode set to 100644
File size: 32.3 KB
Line 
1/*
2 * Copyright (c) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup libc
30 * @{
31 */
32/** @file
33 */
34
35/**
36 * Asynchronous library
37 *
38 * The aim of this library is to provide a facility for writing programs which
39 * utilize the asynchronous nature of HelenOS IPC, yet using a normal way of
40 * programming.
41 *
42 * You should be able to write very simple multithreaded programs. The async
43 * framework will automatically take care of most of the synchronization
44 * problems.
45 *
46 * Example of use (pseudo C):
47 *
48 * 1) Multithreaded client application
49 *
50 * fibril_create(fibril1, ...);
51 * fibril_create(fibril2, ...);
52 * ...
53 *
54 * int fibril1(void *arg)
55 * {
56 * conn = async_connect_me_to(...);
57 *
58 * exch = async_exchange_begin(conn);
59 * c1 = async_send(exch);
60 * async_exchange_end(exch);
61 *
62 * exch = async_exchange_begin(conn);
63 * c2 = async_send(exch);
64 * async_exchange_end(exch);
65 *
66 * async_wait_for(c1);
67 * async_wait_for(c2);
68 * ...
69 * }
70 *
71 *
72 * 2) Multithreaded server application
73 *
74 * main()
75 * {
76 * async_manager();
77 * }
78 *
79 * port_handler(ichandle, *icall)
80 * {
81 * if (want_refuse) {
82 * async_answer_0(ichandle, ELIMIT);
83 * return;
84 * }
85 * async_answer_0(ichandle, EOK);
86 *
87 * chandle = async_get_call(&call);
88 * somehow_handle_the_call(chandle, call);
89 * async_answer_2(chandle, 1, 2, 3);
90 *
91 * chandle = async_get_call(&call);
92 * ...
93 * }
94 *
95 */
96
97#define LIBC_ASYNC_C_
98#include <ipc/ipc.h>
99#include <async.h>
100#include "../private/async.h"
101#include "../private/ns.h"
102#undef LIBC_ASYNC_C_
103
104#include <ipc/irq.h>
105#include <ipc/event.h>
106#include <futex.h>
107#include <fibril.h>
108#include <adt/hash_table.h>
109#include <adt/hash.h>
110#include <adt/list.h>
111#include <assert.h>
112#include <errno.h>
113#include <sys/time.h>
114#include <libarch/barrier.h>
115#include <stdbool.h>
116#include <stdlib.h>
117#include <mem.h>
118#include <stdlib.h>
119#include <macros.h>
120#include <as.h>
121#include <abi/mm/as.h>
122#include "../private/libc.h"
123#include "../private/fibril.h"
124
125/** Naming service session */
126async_sess_t session_ns;
127
128/** Message data */
129typedef struct {
130 awaiter_t wdata;
131
132 /** If reply was received. */
133 bool done;
134
135 /** If the message / reply should be discarded on arrival. */
136 bool forget;
137
138 /** If already destroyed. */
139 bool destroyed;
140
141 /** Pointer to where the answer data is stored. */
142 ipc_call_t *dataptr;
143
144 errno_t retval;
145} amsg_t;
146
147static void to_event_initialize(to_event_t *to)
148{
149 struct timeval tv = { 0, 0 };
150
151 to->inlist = false;
152 to->occurred = false;
153 link_initialize(&to->link);
154 to->expires = tv;
155}
156
157static void wu_event_initialize(wu_event_t *wu)
158{
159 wu->inlist = false;
160 link_initialize(&wu->link);
161}
162
163void awaiter_initialize(awaiter_t *aw)
164{
165 aw->fid = 0;
166 aw->active = false;
167 to_event_initialize(&aw->to_event);
168 wu_event_initialize(&aw->wu_event);
169}
170
171static amsg_t *amsg_create(void)
172{
173 amsg_t *msg = malloc(sizeof(amsg_t));
174 if (msg) {
175 msg->done = false;
176 msg->forget = false;
177 msg->destroyed = false;
178 msg->dataptr = NULL;
179 msg->retval = EINVAL;
180 awaiter_initialize(&msg->wdata);
181 }
182
183 return msg;
184}
185
186static void amsg_destroy(amsg_t *msg)
187{
188 assert(!msg->destroyed);
189 msg->destroyed = true;
190 free(msg);
191}
192
193
/** Mutex guarding inactive_exch_list and avail_phone_cv. */
static FIBRIL_MUTEX_INITIALIZE(async_sess_mutex);

/** All exchanges that are currently inactive, across all sessions. */
static LIST_INITIALIZE(inactive_exch_list);

/** Signalled when an exchange ends, i.e. when a phone may be available. */
static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
208
209/** Initialize the async framework.
210 *
211 */
212void __async_client_init(void)
213{
214 session_ns.iface = 0;
215 session_ns.mgmt = EXCHANGE_ATOMIC;
216 session_ns.phone = PHONE_NS;
217 session_ns.arg1 = 0;
218 session_ns.arg2 = 0;
219 session_ns.arg3 = 0;
220
221 fibril_mutex_initialize(&session_ns.remote_state_mtx);
222 session_ns.remote_state_data = NULL;
223
224 list_initialize(&session_ns.exch_list);
225 fibril_mutex_initialize(&session_ns.mutex);
226 atomic_set(&session_ns.refcnt, 0);
227}
228
229/** Reply received callback.
230 *
231 * This function is called whenever a reply for an asynchronous message sent out
232 * by the asynchronous framework is received.
233 *
234 * Notify the fibril which is waiting for this message that it has arrived.
235 *
236 * @param arg Pointer to the asynchronous message record.
237 * @param retval Value returned in the answer.
238 * @param data Call data of the answer.
239 *
240 */
241static void reply_received(void *arg, errno_t retval, ipc_call_t *data)
242{
243 assert(arg);
244
245 futex_lock(&async_futex);
246
247 amsg_t *msg = (amsg_t *) arg;
248 msg->retval = retval;
249
250 /* Copy data after futex_down, just in case the call was detached */
251 if ((msg->dataptr) && (data))
252 *msg->dataptr = *data;
253
254 write_barrier();
255
256 /* Remove message from timeout list */
257 if (msg->wdata.to_event.inlist)
258 list_remove(&msg->wdata.to_event.link);
259
260 msg->done = true;
261
262 if (msg->forget) {
263 assert(msg->wdata.active);
264 amsg_destroy(msg);
265 } else if (!msg->wdata.active) {
266 msg->wdata.active = true;
267 fibril_add_ready(msg->wdata.fid);
268 }
269
270 futex_unlock(&async_futex);
271}
272
273/** Send message and return id of the sent message.
274 *
275 * The return value can be used as input for async_wait() to wait for
276 * completion.
277 *
278 * @param exch Exchange for sending the message.
279 * @param imethod Service-defined interface and method.
280 * @param arg1 Service-defined payload argument.
281 * @param arg2 Service-defined payload argument.
282 * @param arg3 Service-defined payload argument.
283 * @param arg4 Service-defined payload argument.
284 * @param dataptr If non-NULL, storage where the reply data will be stored.
285 *
286 * @return Hash of the sent message or 0 on error.
287 *
288 */
289aid_t async_send_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
290 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr)
291{
292 if (exch == NULL)
293 return 0;
294
295 amsg_t *msg = amsg_create();
296 if (msg == NULL)
297 return 0;
298
299 msg->dataptr = dataptr;
300 msg->wdata.active = true;
301
302 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg,
303 reply_received);
304
305 return (aid_t) msg;
306}
307
308/** Send message and return id of the sent message
309 *
310 * The return value can be used as input for async_wait() to wait for
311 * completion.
312 *
313 * @param exch Exchange for sending the message.
314 * @param imethod Service-defined interface and method.
315 * @param arg1 Service-defined payload argument.
316 * @param arg2 Service-defined payload argument.
317 * @param arg3 Service-defined payload argument.
318 * @param arg4 Service-defined payload argument.
319 * @param arg5 Service-defined payload argument.
320 * @param dataptr If non-NULL, storage where the reply data will be
321 * stored.
322 *
323 * @return Hash of the sent message or 0 on error.
324 *
325 */
326aid_t async_send_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
327 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5,
328 ipc_call_t *dataptr)
329{
330 if (exch == NULL)
331 return 0;
332
333 amsg_t *msg = amsg_create();
334 if (msg == NULL)
335 return 0;
336
337 msg->dataptr = dataptr;
338 msg->wdata.active = true;
339
340 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,
341 msg, reply_received);
342
343 return (aid_t) msg;
344}
345
346/** Wait for a message sent by the async framework.
347 *
348 * @param amsgid Hash of the message to wait for.
349 * @param retval Pointer to storage where the retval of the answer will
350 * be stored.
351 *
352 */
353void async_wait_for(aid_t amsgid, errno_t *retval)
354{
355 assert(amsgid);
356
357 amsg_t *msg = (amsg_t *) amsgid;
358
359 futex_lock(&async_futex);
360
361 assert(!msg->forget);
362 assert(!msg->destroyed);
363
364 if (msg->done) {
365 futex_unlock(&async_futex);
366 goto done;
367 }
368
369 msg->wdata.fid = fibril_get_id();
370 msg->wdata.active = false;
371 msg->wdata.to_event.inlist = false;
372
373 /* Leave the async_futex locked when entering this function */
374 fibril_switch(FIBRIL_TO_MANAGER);
375
376 /* Futex is up automatically after fibril_switch */
377
378done:
379 if (retval)
380 *retval = msg->retval;
381
382 amsg_destroy(msg);
383}
384
385/** Wait for a message sent by the async framework, timeout variant.
386 *
387 * If the wait times out, the caller may choose to either wait again by calling
388 * async_wait_for() or async_wait_timeout(), or forget the message via
389 * async_forget().
390 *
391 * @param amsgid Hash of the message to wait for.
392 * @param retval Pointer to storage where the retval of the answer will
393 * be stored.
394 * @param timeout Timeout in microseconds.
395 *
396 * @return Zero on success, ETIMEOUT if the timeout has expired.
397 *
398 */
399errno_t async_wait_timeout(aid_t amsgid, errno_t *retval, suseconds_t timeout)
400{
401 assert(amsgid);
402
403 amsg_t *msg = (amsg_t *) amsgid;
404
405 futex_lock(&async_futex);
406
407 assert(!msg->forget);
408 assert(!msg->destroyed);
409
410 if (msg->done) {
411 futex_unlock(&async_futex);
412 goto done;
413 }
414
415 /*
416 * Negative timeout is converted to zero timeout to avoid
417 * using tv_add with negative augmenter.
418 */
419 if (timeout < 0)
420 timeout = 0;
421
422 getuptime(&msg->wdata.to_event.expires);
423 tv_add_diff(&msg->wdata.to_event.expires, timeout);
424
425 /*
426 * Current fibril is inserted as waiting regardless of the
427 * "size" of the timeout.
428 *
429 * Checking for msg->done and immediately bailing out when
430 * timeout == 0 would mean that the manager fibril would never
431 * run (consider single threaded program).
432 * Thus the IPC answer would be never retrieved from the kernel.
433 *
434 * Notice that the actual delay would be very small because we
435 * - switch to manager fibril
436 * - the manager sees expired timeout
437 * - and thus adds us back to ready queue
438 * - manager switches back to some ready fibril
439 * (prior it, it checks for incoming IPC).
440 *
441 */
442 msg->wdata.fid = fibril_get_id();
443 msg->wdata.active = false;
444 async_insert_timeout(&msg->wdata);
445
446 /* Leave the async_futex locked when entering this function */
447 fibril_switch(FIBRIL_TO_MANAGER);
448
449 /* Futex is up automatically after fibril_switch */
450
451 if (!msg->done)
452 return ETIMEOUT;
453
454done:
455 if (retval)
456 *retval = msg->retval;
457
458 amsg_destroy(msg);
459
460 return 0;
461}
462
463/** Discard the message / reply on arrival.
464 *
465 * The message will be marked to be discarded once the reply arrives in
466 * reply_received(). It is not allowed to call async_wait_for() or
467 * async_wait_timeout() on this message after a call to this function.
468 *
469 * @param amsgid Hash of the message to forget.
470 */
471void async_forget(aid_t amsgid)
472{
473 amsg_t *msg = (amsg_t *) amsgid;
474
475 assert(msg);
476 assert(!msg->forget);
477 assert(!msg->destroyed);
478
479 futex_lock(&async_futex);
480
481 if (msg->done) {
482 amsg_destroy(msg);
483 } else {
484 msg->dataptr = NULL;
485 msg->forget = true;
486 }
487
488 futex_unlock(&async_futex);
489}
490
491/** Wait for specified time.
492 *
493 * The current fibril is suspended but the thread continues to execute.
494 *
495 * @param timeout Duration of the wait in microseconds.
496 *
497 */
498void async_usleep(suseconds_t timeout)
499{
500 awaiter_t awaiter;
501 awaiter_initialize(&awaiter);
502
503 awaiter.fid = fibril_get_id();
504
505 getuptime(&awaiter.to_event.expires);
506 tv_add_diff(&awaiter.to_event.expires, timeout);
507
508 futex_lock(&async_futex);
509
510 async_insert_timeout(&awaiter);
511
512 /* Leave the async_futex locked when entering this function */
513 fibril_switch(FIBRIL_TO_MANAGER);
514
515 /* Futex is up automatically after fibril_switch() */
516}
517
/** Delay execution for the specified number of seconds.
 *
 * The sleep is split into steps of at most 1000 seconds, each passed to
 * async_usleep(), so that the full argument range is supported.
 *
 * @param sec Number of seconds to sleep.
 */
void async_sleep(unsigned int sec)
{
	unsigned int remaining = sec;

	while (remaining != 0) {
		unsigned int step = remaining;
		if (step > 1000)
			step = 1000;

		async_usleep(step * 1000000);
		remaining -= step;
	}
}
536
537/** Pseudo-synchronous message sending - fast version.
538 *
539 * Send message asynchronously and return only after the reply arrives.
540 *
541 * This function can only transfer 4 register payload arguments. For
542 * transferring more arguments, see the slower async_req_slow().
543 *
544 * @param exch Exchange for sending the message.
545 * @param imethod Interface and method of the call.
546 * @param arg1 Service-defined payload argument.
547 * @param arg2 Service-defined payload argument.
548 * @param arg3 Service-defined payload argument.
549 * @param arg4 Service-defined payload argument.
550 * @param r1 If non-NULL, storage for the 1st reply argument.
551 * @param r2 If non-NULL, storage for the 2nd reply argument.
552 * @param r3 If non-NULL, storage for the 3rd reply argument.
553 * @param r4 If non-NULL, storage for the 4th reply argument.
554 * @param r5 If non-NULL, storage for the 5th reply argument.
555 *
556 * @return Return code of the reply or an error code.
557 *
558 */
559errno_t async_req_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
560 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t *r1, sysarg_t *r2,
561 sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
562{
563 if (exch == NULL)
564 return ENOENT;
565
566 ipc_call_t result;
567 aid_t aid = async_send_4(exch, imethod, arg1, arg2, arg3, arg4,
568 &result);
569
570 errno_t rc;
571 async_wait_for(aid, &rc);
572
573 if (r1)
574 *r1 = IPC_GET_ARG1(result);
575
576 if (r2)
577 *r2 = IPC_GET_ARG2(result);
578
579 if (r3)
580 *r3 = IPC_GET_ARG3(result);
581
582 if (r4)
583 *r4 = IPC_GET_ARG4(result);
584
585 if (r5)
586 *r5 = IPC_GET_ARG5(result);
587
588 return rc;
589}
590
591/** Pseudo-synchronous message sending - slow version.
592 *
593 * Send message asynchronously and return only after the reply arrives.
594 *
595 * @param exch Exchange for sending the message.
596 * @param imethod Interface and method of the call.
597 * @param arg1 Service-defined payload argument.
598 * @param arg2 Service-defined payload argument.
599 * @param arg3 Service-defined payload argument.
600 * @param arg4 Service-defined payload argument.
601 * @param arg5 Service-defined payload argument.
602 * @param r1 If non-NULL, storage for the 1st reply argument.
603 * @param r2 If non-NULL, storage for the 2nd reply argument.
604 * @param r3 If non-NULL, storage for the 3rd reply argument.
605 * @param r4 If non-NULL, storage for the 4th reply argument.
606 * @param r5 If non-NULL, storage for the 5th reply argument.
607 *
608 * @return Return code of the reply or an error code.
609 *
610 */
611errno_t async_req_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
612 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, sysarg_t *r1,
613 sysarg_t *r2, sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
614{
615 if (exch == NULL)
616 return ENOENT;
617
618 ipc_call_t result;
619 aid_t aid = async_send_5(exch, imethod, arg1, arg2, arg3, arg4, arg5,
620 &result);
621
622 errno_t rc;
623 async_wait_for(aid, &rc);
624
625 if (r1)
626 *r1 = IPC_GET_ARG1(result);
627
628 if (r2)
629 *r2 = IPC_GET_ARG2(result);
630
631 if (r3)
632 *r3 = IPC_GET_ARG3(result);
633
634 if (r4)
635 *r4 = IPC_GET_ARG4(result);
636
637 if (r5)
638 *r5 = IPC_GET_ARG5(result);
639
640 return rc;
641}
642
643void async_msg_0(async_exch_t *exch, sysarg_t imethod)
644{
645 if (exch != NULL)
646 ipc_call_async_0(exch->phone, imethod, NULL, NULL);
647}
648
649void async_msg_1(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1)
650{
651 if (exch != NULL)
652 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL);
653}
654
655void async_msg_2(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
656 sysarg_t arg2)
657{
658 if (exch != NULL)
659 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL);
660}
661
662void async_msg_3(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
663 sysarg_t arg2, sysarg_t arg3)
664{
665 if (exch != NULL)
666 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL,
667 NULL);
668}
669
670void async_msg_4(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
671 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
672{
673 if (exch != NULL)
674 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
675 NULL, NULL);
676}
677
678void async_msg_5(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
679 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
680{
681 if (exch != NULL)
682 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
683 arg5, NULL, NULL);
684}
685
686static errno_t async_connect_me_to_internal(cap_phone_handle_t phone,
687 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4,
688 cap_phone_handle_t *out_phone)
689{
690 ipc_call_t result;
691
692 // XXX: Workaround for GCC's inability to infer association between
693 // rc == EOK and *out_phone being assigned.
694 *out_phone = CAP_NIL;
695
696 amsg_t *msg = amsg_create();
697 if (!msg)
698 return ENOENT;
699
700 msg->dataptr = &result;
701 msg->wdata.active = true;
702
703 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4,
704 msg, reply_received);
705
706 errno_t rc;
707 async_wait_for((aid_t) msg, &rc);
708
709 if (rc != EOK)
710 return rc;
711
712 *out_phone = (cap_phone_handle_t) IPC_GET_ARG5(result);
713 return EOK;
714}
715
716/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
717 *
718 * Ask through for a new connection to some service.
719 *
720 * @param mgmt Exchange management style.
721 * @param exch Exchange for sending the message.
722 * @param arg1 User defined argument.
723 * @param arg2 User defined argument.
724 * @param arg3 User defined argument.
725 *
726 * @return New session on success or NULL on error.
727 *
728 */
729async_sess_t *async_connect_me_to(exch_mgmt_t mgmt, async_exch_t *exch,
730 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
731{
732 if (exch == NULL) {
733 errno = ENOENT;
734 return NULL;
735 }
736
737 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
738 if (sess == NULL) {
739 errno = ENOMEM;
740 return NULL;
741 }
742
743 cap_phone_handle_t phone;
744 errno_t rc = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
745 0, &phone);
746 if (rc != EOK) {
747 errno = rc;
748 free(sess);
749 return NULL;
750 }
751
752 sess->iface = 0;
753 sess->mgmt = mgmt;
754 sess->phone = phone;
755 sess->arg1 = arg1;
756 sess->arg2 = arg2;
757 sess->arg3 = arg3;
758
759 fibril_mutex_initialize(&sess->remote_state_mtx);
760 sess->remote_state_data = NULL;
761
762 list_initialize(&sess->exch_list);
763 fibril_mutex_initialize(&sess->mutex);
764 atomic_set(&sess->refcnt, 0);
765
766 return sess;
767}
768
769/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
770 *
771 * Ask through phone for a new connection to some service and block until
772 * success.
773 *
774 * @param exch Exchange for sending the message.
775 * @param iface Connection interface.
776 * @param arg2 User defined argument.
777 * @param arg3 User defined argument.
778 *
779 * @return New session on success or NULL on error.
780 *
781 */
782async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,
783 sysarg_t arg2, sysarg_t arg3)
784{
785 if (exch == NULL) {
786 errno = ENOENT;
787 return NULL;
788 }
789
790 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
791 if (sess == NULL) {
792 errno = ENOMEM;
793 return NULL;
794 }
795
796 cap_phone_handle_t phone;
797 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
798 arg3, 0, &phone);
799 if (rc != EOK) {
800 errno = rc;
801 free(sess);
802 return NULL;
803 }
804
805 sess->iface = iface;
806 sess->phone = phone;
807 sess->arg1 = iface;
808 sess->arg2 = arg2;
809 sess->arg3 = arg3;
810
811 fibril_mutex_initialize(&sess->remote_state_mtx);
812 sess->remote_state_data = NULL;
813
814 list_initialize(&sess->exch_list);
815 fibril_mutex_initialize(&sess->mutex);
816 atomic_set(&sess->refcnt, 0);
817
818 return sess;
819}
820
821/** Set arguments for new connections.
822 *
823 * FIXME This is an ugly hack to work around the problem that parallel
824 * exchanges are implemented using parallel connections. When we create
825 * a callback session, the framework does not know arguments for the new
826 * connections.
827 *
828 * The proper solution seems to be to implement parallel exchanges using
829 * tagging.
830 */
831void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2,
832 sysarg_t arg3)
833{
834 sess->arg1 = arg1;
835 sess->arg2 = arg2;
836 sess->arg3 = arg3;
837}
838
839/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
840 *
841 * Ask through phone for a new connection to some service and block until
842 * success.
843 *
844 * @param mgmt Exchange management style.
845 * @param exch Exchange for sending the message.
846 * @param arg1 User defined argument.
847 * @param arg2 User defined argument.
848 * @param arg3 User defined argument.
849 *
850 * @return New session on success or NULL on error.
851 *
852 */
853async_sess_t *async_connect_me_to_blocking(exch_mgmt_t mgmt, async_exch_t *exch,
854 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
855{
856 if (exch == NULL) {
857 errno = ENOENT;
858 return NULL;
859 }
860
861 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
862 if (sess == NULL) {
863 errno = ENOMEM;
864 return NULL;
865 }
866
867 cap_phone_handle_t phone;
868 errno_t rc = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
869 IPC_FLAG_BLOCKING, &phone);
870
871 if (rc != EOK) {
872 errno = rc;
873 free(sess);
874 return NULL;
875 }
876
877 sess->iface = 0;
878 sess->mgmt = mgmt;
879 sess->phone = phone;
880 sess->arg1 = arg1;
881 sess->arg2 = arg2;
882 sess->arg3 = arg3;
883
884 fibril_mutex_initialize(&sess->remote_state_mtx);
885 sess->remote_state_data = NULL;
886
887 list_initialize(&sess->exch_list);
888 fibril_mutex_initialize(&sess->mutex);
889 atomic_set(&sess->refcnt, 0);
890
891 return sess;
892}
893
894/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
895 *
896 * Ask through phone for a new connection to some service and block until
897 * success.
898 *
899 * @param exch Exchange for sending the message.
900 * @param iface Connection interface.
901 * @param arg2 User defined argument.
902 * @param arg3 User defined argument.
903 *
904 * @return New session on success or NULL on error.
905 *
906 */
907async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,
908 sysarg_t arg2, sysarg_t arg3)
909{
910 if (exch == NULL) {
911 errno = ENOENT;
912 return NULL;
913 }
914
915 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
916 if (sess == NULL) {
917 errno = ENOMEM;
918 return NULL;
919 }
920
921 cap_phone_handle_t phone;
922 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
923 arg3, IPC_FLAG_BLOCKING, &phone);
924 if (rc != EOK) {
925 errno = rc;
926 free(sess);
927 return NULL;
928 }
929
930 sess->iface = iface;
931 sess->phone = phone;
932 sess->arg1 = iface;
933 sess->arg2 = arg2;
934 sess->arg3 = arg3;
935
936 fibril_mutex_initialize(&sess->remote_state_mtx);
937 sess->remote_state_data = NULL;
938
939 list_initialize(&sess->exch_list);
940 fibril_mutex_initialize(&sess->mutex);
941 atomic_set(&sess->refcnt, 0);
942
943 return sess;
944}
945
946/** Connect to a task specified by id.
947 *
948 */
949async_sess_t *async_connect_kbox(task_id_t id)
950{
951 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
952 if (sess == NULL) {
953 errno = ENOMEM;
954 return NULL;
955 }
956
957 cap_phone_handle_t phone;
958 errno_t rc = ipc_connect_kbox(id, &phone);
959 if (rc != EOK) {
960 errno = rc;
961 free(sess);
962 return NULL;
963 }
964
965 sess->iface = 0;
966 sess->mgmt = EXCHANGE_ATOMIC;
967 sess->phone = phone;
968 sess->arg1 = 0;
969 sess->arg2 = 0;
970 sess->arg3 = 0;
971
972 fibril_mutex_initialize(&sess->remote_state_mtx);
973 sess->remote_state_data = NULL;
974
975 list_initialize(&sess->exch_list);
976 fibril_mutex_initialize(&sess->mutex);
977 atomic_set(&sess->refcnt, 0);
978
979 return sess;
980}
981
982static errno_t async_hangup_internal(cap_phone_handle_t phone)
983{
984 return ipc_hangup(phone);
985}
986
987/** Wrapper for ipc_hangup.
988 *
989 * @param sess Session to hung up.
990 *
991 * @return Zero on success or an error code.
992 *
993 */
994errno_t async_hangup(async_sess_t *sess)
995{
996 async_exch_t *exch;
997
998 assert(sess);
999
1000 if (atomic_get(&sess->refcnt) > 0)
1001 return EBUSY;
1002
1003 fibril_mutex_lock(&async_sess_mutex);
1004
1005 errno_t rc = async_hangup_internal(sess->phone);
1006
1007 while (!list_empty(&sess->exch_list)) {
1008 exch = (async_exch_t *)
1009 list_get_instance(list_first(&sess->exch_list),
1010 async_exch_t, sess_link);
1011
1012 list_remove(&exch->sess_link);
1013 list_remove(&exch->global_link);
1014 async_hangup_internal(exch->phone);
1015 free(exch);
1016 }
1017
1018 free(sess);
1019
1020 fibril_mutex_unlock(&async_sess_mutex);
1021
1022 return rc;
1023}
1024
1025/** Start new exchange in a session.
1026 *
1027 * @param session Session.
1028 *
1029 * @return New exchange or NULL on error.
1030 *
1031 */
1032async_exch_t *async_exchange_begin(async_sess_t *sess)
1033{
1034 if (sess == NULL)
1035 return NULL;
1036
1037 exch_mgmt_t mgmt = sess->mgmt;
1038 if (sess->iface != 0)
1039 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
1040
1041 async_exch_t *exch = NULL;
1042
1043 fibril_mutex_lock(&async_sess_mutex);
1044
1045 if (!list_empty(&sess->exch_list)) {
1046 /*
1047 * There are inactive exchanges in the session.
1048 */
1049 exch = (async_exch_t *)
1050 list_get_instance(list_first(&sess->exch_list),
1051 async_exch_t, sess_link);
1052
1053 list_remove(&exch->sess_link);
1054 list_remove(&exch->global_link);
1055 } else {
1056 /*
1057 * There are no available exchanges in the session.
1058 */
1059
1060 if ((mgmt == EXCHANGE_ATOMIC) ||
1061 (mgmt == EXCHANGE_SERIALIZE)) {
1062 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
1063 if (exch != NULL) {
1064 link_initialize(&exch->sess_link);
1065 link_initialize(&exch->global_link);
1066 exch->sess = sess;
1067 exch->phone = sess->phone;
1068 }
1069 } else if (mgmt == EXCHANGE_PARALLEL) {
1070 cap_phone_handle_t phone;
1071 errno_t rc;
1072
1073 retry:
1074 /*
1075 * Make a one-time attempt to connect a new data phone.
1076 */
1077 rc = async_connect_me_to_internal(sess->phone, sess->arg1,
1078 sess->arg2, sess->arg3, 0, &phone);
1079 if (rc == EOK) {
1080 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
1081 if (exch != NULL) {
1082 link_initialize(&exch->sess_link);
1083 link_initialize(&exch->global_link);
1084 exch->sess = sess;
1085 exch->phone = phone;
1086 } else
1087 async_hangup_internal(phone);
1088 } else if (!list_empty(&inactive_exch_list)) {
1089 /*
1090 * We did not manage to connect a new phone. But we
1091 * can try to close some of the currently inactive
1092 * connections in other sessions and try again.
1093 */
1094 exch = (async_exch_t *)
1095 list_get_instance(list_first(&inactive_exch_list),
1096 async_exch_t, global_link);
1097
1098 list_remove(&exch->sess_link);
1099 list_remove(&exch->global_link);
1100 async_hangup_internal(exch->phone);
1101 free(exch);
1102 goto retry;
1103 } else {
1104 /*
1105 * Wait for a phone to become available.
1106 */
1107 fibril_condvar_wait(&avail_phone_cv, &async_sess_mutex);
1108 goto retry;
1109 }
1110 }
1111 }
1112
1113 fibril_mutex_unlock(&async_sess_mutex);
1114
1115 if (exch != NULL) {
1116 atomic_inc(&sess->refcnt);
1117
1118 if (mgmt == EXCHANGE_SERIALIZE)
1119 fibril_mutex_lock(&sess->mutex);
1120 }
1121
1122 return exch;
1123}
1124
1125/** Finish an exchange.
1126 *
1127 * @param exch Exchange to finish.
1128 *
1129 */
1130void async_exchange_end(async_exch_t *exch)
1131{
1132 if (exch == NULL)
1133 return;
1134
1135 async_sess_t *sess = exch->sess;
1136 assert(sess != NULL);
1137
1138 exch_mgmt_t mgmt = sess->mgmt;
1139 if (sess->iface != 0)
1140 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
1141
1142 atomic_dec(&sess->refcnt);
1143
1144 if (mgmt == EXCHANGE_SERIALIZE)
1145 fibril_mutex_unlock(&sess->mutex);
1146
1147 fibril_mutex_lock(&async_sess_mutex);
1148
1149 list_append(&exch->sess_link, &sess->exch_list);
1150 list_append(&exch->global_link, &inactive_exch_list);
1151 fibril_condvar_signal(&avail_phone_cv);
1152
1153 fibril_mutex_unlock(&async_sess_mutex);
1154}
1155
1156/** Wrapper for IPC_M_SHARE_IN calls using the async framework.
1157 *
1158 * @param exch Exchange for sending the message.
1159 * @param size Size of the destination address space area.
1160 * @param arg User defined argument.
1161 * @param flags Storage for the received flags. Can be NULL.
1162 * @param dst Address of the storage for the destination address space area
1163 * base address. Cannot be NULL.
1164 *
1165 * @return Zero on success or an error code from errno.h.
1166 *
1167 */
1168errno_t async_share_in_start(async_exch_t *exch, size_t size, sysarg_t arg,
1169 unsigned int *flags, void **dst)
1170{
1171 if (exch == NULL)
1172 return ENOENT;
1173
1174 sysarg_t _flags = 0;
1175 sysarg_t _dst = (sysarg_t) -1;
1176 errno_t res = async_req_2_4(exch, IPC_M_SHARE_IN, (sysarg_t) size,
1177 arg, NULL, &_flags, NULL, &_dst);
1178
1179 if (flags)
1180 *flags = (unsigned int) _flags;
1181
1182 *dst = (void *) _dst;
1183 return res;
1184}
1185
1186/** Wrapper for IPC_M_SHARE_OUT calls using the async framework.
1187 *
1188 * @param exch Exchange for sending the message.
1189 * @param src Source address space area base address.
1190 * @param flags Flags to be used for sharing. Bits can be only cleared.
1191 *
1192 * @return Zero on success or an error code from errno.h.
1193 *
1194 */
1195errno_t async_share_out_start(async_exch_t *exch, void *src, unsigned int flags)
1196{
1197 if (exch == NULL)
1198 return ENOENT;
1199
1200 return async_req_3_0(exch, IPC_M_SHARE_OUT, (sysarg_t) src, 0,
1201 (sysarg_t) flags);
1202}
1203
1204/** Start IPC_M_DATA_READ using the async framework.
1205 *
1206 * @param exch Exchange for sending the message.
1207 * @param dst Address of the beginning of the destination buffer.
1208 * @param size Size of the destination buffer (in bytes).
1209 * @param dataptr Storage of call data (arg 2 holds actual data size).
1210 *
1211 * @return Hash of the sent message or 0 on error.
1212 *
1213 */
1214aid_t async_data_read(async_exch_t *exch, void *dst, size_t size,
1215 ipc_call_t *dataptr)
1216{
1217 return async_send_2(exch, IPC_M_DATA_READ, (sysarg_t) dst,
1218 (sysarg_t) size, dataptr);
1219}
1220
1221/** Wrapper for IPC_M_DATA_READ calls using the async framework.
1222 *
1223 * @param exch Exchange for sending the message.
1224 * @param dst Address of the beginning of the destination buffer.
1225 * @param size Size of the destination buffer.
1226 *
1227 * @return Zero on success or an error code from errno.h.
1228 *
1229 */
1230errno_t async_data_read_start(async_exch_t *exch, void *dst, size_t size)
1231{
1232 if (exch == NULL)
1233 return ENOENT;
1234
1235 return async_req_2_0(exch, IPC_M_DATA_READ, (sysarg_t) dst,
1236 (sysarg_t) size);
1237}
1238
1239/** Wrapper for IPC_M_DATA_WRITE calls using the async framework.
1240 *
1241 * @param exch Exchange for sending the message.
1242 * @param src Address of the beginning of the source buffer.
1243 * @param size Size of the source buffer.
1244 *
1245 * @return Zero on success or an error code from errno.h.
1246 *
1247 */
1248errno_t async_data_write_start(async_exch_t *exch, const void *src, size_t size)
1249{
1250 if (exch == NULL)
1251 return ENOENT;
1252
1253 return async_req_2_0(exch, IPC_M_DATA_WRITE, (sysarg_t) src,
1254 (sysarg_t) size);
1255}
1256
1257errno_t async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
1258 sysarg_t arg3, async_exch_t *other_exch)
1259{
1260 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
1261 arg1, arg2, arg3, 0, CAP_HANDLE_RAW(other_exch->phone));
1262}
1263
1264/** Lock and get session remote state
1265 *
1266 * Lock and get the local replica of the remote state
1267 * in stateful sessions. The call should be paired
1268 * with async_remote_state_release*().
1269 *
1270 * @param[in] sess Stateful session.
1271 *
1272 * @return Local replica of the remote state.
1273 *
1274 */
1275void *async_remote_state_acquire(async_sess_t *sess)
1276{
1277 fibril_mutex_lock(&sess->remote_state_mtx);
1278 return sess->remote_state_data;
1279}
1280
1281/** Update the session remote state
1282 *
1283 * Update the local replica of the remote state
1284 * in stateful sessions. The remote state must
1285 * be already locked.
1286 *
1287 * @param[in] sess Stateful session.
1288 * @param[in] state New local replica of the remote state.
1289 *
1290 */
1291void async_remote_state_update(async_sess_t *sess, void *state)
1292{
1293 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1294 sess->remote_state_data = state;
1295}
1296
1297/** Release the session remote state
1298 *
1299 * Unlock the local replica of the remote state
1300 * in stateful sessions.
1301 *
1302 * @param[in] sess Stateful session.
1303 *
1304 */
1305void async_remote_state_release(async_sess_t *sess)
1306{
1307 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1308
1309 fibril_mutex_unlock(&sess->remote_state_mtx);
1310}
1311
1312/** Release the session remote state and end an exchange
1313 *
1314 * Unlock the local replica of the remote state
1315 * in stateful sessions. This is convenience function
1316 * which gets the session pointer from the exchange
1317 * and also ends the exchange.
1318 *
1319 * @param[in] exch Stateful session's exchange.
1320 *
1321 */
1322void async_remote_state_release_exchange(async_exch_t *exch)
1323{
1324 if (exch == NULL)
1325 return;
1326
1327 async_sess_t *sess = exch->sess;
1328 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1329
1330 async_exchange_end(exch);
1331 fibril_mutex_unlock(&sess->remote_state_mtx);
1332}
1333
1334void *async_as_area_create(void *base, size_t size, unsigned int flags,
1335 async_sess_t *pager, sysarg_t id1, sysarg_t id2, sysarg_t id3)
1336{
1337 as_area_pager_info_t pager_info = {
1338 .pager = pager->phone,
1339 .id1 = id1,
1340 .id2 = id2,
1341 .id3 = id3
1342 };
1343 return as_area_create(base, size, flags, &pager_info);
1344}
1345
1346/** @}
1347 */
Note: See TracBrowser for help on using the repository browser.