source: mainline/uspace/lib/c/generic/async/client.c@ 5f97ef44

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since 5f97ef44 was 5f97ef44, checked in by Jiří Zárevúcky <jiri.zarevucky@…>, 7 years ago

Sleep is more natural as part of the fibril API.
(the implementation will move later)

  • Property mode set to 100644
File size: 30.1 KB
Line 
1/*
2 * Copyright (c) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup libc
30 * @{
31 */
32/** @file
33 */
34
35/**
36 * Asynchronous library
37 *
38 * The aim of this library is to provide a facility for writing programs which
39 * utilize the asynchronous nature of HelenOS IPC, yet using a normal way of
40 * programming.
41 *
42 * You should be able to write very simple multithreaded programs. The async
43 * framework will automatically take care of most of the synchronization
44 * problems.
45 *
46 * Example of use (pseudo C):
47 *
48 * 1) Multithreaded client application
49 *
50 * fibril_create(fibril1, ...);
51 * fibril_create(fibril2, ...);
52 * ...
53 *
54 * int fibril1(void *arg)
55 * {
56 * conn = async_connect_me_to(...);
57 *
58 * exch = async_exchange_begin(conn);
59 * c1 = async_send(exch);
60 * async_exchange_end(exch);
61 *
62 * exch = async_exchange_begin(conn);
63 * c2 = async_send(exch);
64 * async_exchange_end(exch);
65 *
66 * async_wait_for(c1);
67 * async_wait_for(c2);
68 * ...
69 * }
70 *
71 *
72 * 2) Multithreaded server application
73 *
74 * main()
75 * {
76 * async_manager();
77 * }
78 *
79 * port_handler(ichandle, *icall)
80 * {
81 * if (want_refuse) {
82 * async_answer_0(ichandle, ELIMIT);
83 * return;
84 * }
85 * async_answer_0(ichandle, EOK);
86 *
87 * chandle = async_get_call(&call);
88 * somehow_handle_the_call(chandle, call);
89 * async_answer_2(chandle, 1, 2, 3);
90 *
91 * chandle = async_get_call(&call);
92 * ...
93 * }
94 *
95 */
96
97#define LIBC_ASYNC_C_
98#include <ipc/ipc.h>
99#include <async.h>
100#include "../private/async.h"
101#include "../private/ns.h"
102#undef LIBC_ASYNC_C_
103
104#include <ipc/irq.h>
105#include <ipc/event.h>
106#include <futex.h>
107#include <fibril.h>
108#include <adt/hash_table.h>
109#include <adt/hash.h>
110#include <adt/list.h>
111#include <assert.h>
112#include <errno.h>
113#include <sys/time.h>
114#include <libarch/barrier.h>
115#include <stdbool.h>
116#include <stdlib.h>
117#include <mem.h>
118#include <stdlib.h>
119#include <macros.h>
120#include <as.h>
121#include <abi/mm/as.h>
122#include "../private/libc.h"
123#include "../private/fibril.h"
124
125/** Naming service session */
126async_sess_t session_ns;
127
128/** Message data */
129typedef struct {
130 awaiter_t wdata;
131
132 /** If reply was received. */
133 bool done;
134
135 /** If the message / reply should be discarded on arrival. */
136 bool forget;
137
138 /** If already destroyed. */
139 bool destroyed;
140
141 /** Pointer to where the answer data is stored. */
142 ipc_call_t *dataptr;
143
144 errno_t retval;
145} amsg_t;
146
147static void to_event_initialize(to_event_t *to)
148{
149 struct timeval tv = { 0, 0 };
150
151 to->inlist = false;
152 to->occurred = false;
153 link_initialize(&to->link);
154 to->expires = tv;
155}
156
157static void wu_event_initialize(wu_event_t *wu)
158{
159 wu->inlist = false;
160 link_initialize(&wu->link);
161}
162
163void awaiter_initialize(awaiter_t *aw)
164{
165 aw->fid = 0;
166 aw->active = false;
167 to_event_initialize(&aw->to_event);
168 wu_event_initialize(&aw->wu_event);
169}
170
171static amsg_t *amsg_create(void)
172{
173 amsg_t *msg = malloc(sizeof(amsg_t));
174 if (msg) {
175 msg->done = false;
176 msg->forget = false;
177 msg->destroyed = false;
178 msg->dataptr = NULL;
179 msg->retval = EINVAL;
180 awaiter_initialize(&msg->wdata);
181 }
182
183 return msg;
184}
185
186static void amsg_destroy(amsg_t *msg)
187{
188 if (!msg)
189 return;
190
191 assert(!msg->destroyed);
192 msg->destroyed = true;
193 free(msg);
194}
195
/** Mutex protecting inactive_exch_list and used together with
 * avail_phone_cv.
 */
static FIBRIL_MUTEX_INITIALIZE(async_sess_mutex);

/** Global list of all exchanges that are currently inactive. */
static LIST_INITIALIZE(inactive_exch_list);

/** Condition variable signalled whenever an exchange is ended, i.e. when
 * a phone may have become available.
 */
static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
210
211/** Initialize the async framework.
212 *
213 */
214void __async_client_init(void)
215{
216 session_ns.iface = 0;
217 session_ns.mgmt = EXCHANGE_ATOMIC;
218 session_ns.phone = PHONE_NS;
219 session_ns.arg1 = 0;
220 session_ns.arg2 = 0;
221 session_ns.arg3 = 0;
222
223 fibril_mutex_initialize(&session_ns.remote_state_mtx);
224 session_ns.remote_state_data = NULL;
225
226 list_initialize(&session_ns.exch_list);
227 fibril_mutex_initialize(&session_ns.mutex);
228 atomic_set(&session_ns.refcnt, 0);
229}
230
231/** Reply received callback.
232 *
233 * This function is called whenever a reply for an asynchronous message sent out
234 * by the asynchronous framework is received.
235 *
236 * Notify the fibril which is waiting for this message that it has arrived.
237 *
238 * @param arg Pointer to the asynchronous message record.
239 * @param retval Value returned in the answer.
240 * @param data Call data of the answer.
241 *
242 */
243void async_reply_received(ipc_call_t *data)
244{
245 amsg_t *msg = data->label;
246 if (!msg)
247 return;
248
249 futex_lock(&async_futex);
250
251 msg->retval = IPC_GET_RETVAL(*data);
252
253 /* Copy data after futex_down, just in case the call was detached */
254 if ((msg->dataptr) && (data))
255 *msg->dataptr = *data;
256
257 write_barrier();
258
259 /* Remove message from timeout list */
260 if (msg->wdata.to_event.inlist)
261 list_remove(&msg->wdata.to_event.link);
262
263 msg->done = true;
264
265 if (msg->forget) {
266 assert(msg->wdata.active);
267 amsg_destroy(msg);
268 } else if (!msg->wdata.active) {
269 msg->wdata.active = true;
270 fibril_add_ready(msg->wdata.fid);
271 }
272
273 futex_unlock(&async_futex);
274}
275
276/** Send message and return id of the sent message.
277 *
278 * The return value can be used as input for async_wait() to wait for
279 * completion.
280 *
281 * @param exch Exchange for sending the message.
282 * @param imethod Service-defined interface and method.
283 * @param arg1 Service-defined payload argument.
284 * @param arg2 Service-defined payload argument.
285 * @param arg3 Service-defined payload argument.
286 * @param arg4 Service-defined payload argument.
287 * @param dataptr If non-NULL, storage where the reply data will be stored.
288 *
289 * @return Hash of the sent message or 0 on error.
290 *
291 */
292aid_t async_send_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
293 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr)
294{
295 if (exch == NULL)
296 return 0;
297
298 amsg_t *msg = amsg_create();
299 if (msg == NULL)
300 return 0;
301
302 msg->dataptr = dataptr;
303 msg->wdata.active = true;
304
305 errno_t rc = ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3,
306 arg4, msg);
307 if (rc != EOK) {
308 msg->retval = rc;
309 msg->done = true;
310 }
311
312 return (aid_t) msg;
313}
314
315/** Send message and return id of the sent message
316 *
317 * The return value can be used as input for async_wait() to wait for
318 * completion.
319 *
320 * @param exch Exchange for sending the message.
321 * @param imethod Service-defined interface and method.
322 * @param arg1 Service-defined payload argument.
323 * @param arg2 Service-defined payload argument.
324 * @param arg3 Service-defined payload argument.
325 * @param arg4 Service-defined payload argument.
326 * @param arg5 Service-defined payload argument.
327 * @param dataptr If non-NULL, storage where the reply data will be
328 * stored.
329 *
330 * @return Hash of the sent message or 0 on error.
331 *
332 */
333aid_t async_send_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
334 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5,
335 ipc_call_t *dataptr)
336{
337 if (exch == NULL)
338 return 0;
339
340 amsg_t *msg = amsg_create();
341 if (msg == NULL)
342 return 0;
343
344 msg->dataptr = dataptr;
345 msg->wdata.active = true;
346
347 errno_t rc = ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3,
348 arg4, arg5, msg);
349 if (rc != EOK) {
350 msg->retval = rc;
351 msg->done = true;
352 }
353
354 return (aid_t) msg;
355}
356
357/** Wait for a message sent by the async framework.
358 *
359 * @param amsgid Hash of the message to wait for.
360 * @param retval Pointer to storage where the retval of the answer will
361 * be stored.
362 *
363 */
364void async_wait_for(aid_t amsgid, errno_t *retval)
365{
366 if (amsgid == 0) {
367 if (retval)
368 *retval = ENOMEM;
369 return;
370 }
371
372 amsg_t *msg = (amsg_t *) amsgid;
373
374 futex_lock(&async_futex);
375
376 assert(!msg->forget);
377 assert(!msg->destroyed);
378
379 if (msg->done) {
380 futex_unlock(&async_futex);
381 goto done;
382 }
383
384 msg->wdata.fid = fibril_get_id();
385 msg->wdata.active = false;
386 msg->wdata.to_event.inlist = false;
387
388 /* Leave the async_futex locked when entering this function */
389 fibril_switch(FIBRIL_FROM_BLOCKED);
390 futex_unlock(&async_futex);
391
392done:
393 if (retval)
394 *retval = msg->retval;
395
396 amsg_destroy(msg);
397}
398
399/** Wait for a message sent by the async framework, timeout variant.
400 *
401 * If the wait times out, the caller may choose to either wait again by calling
402 * async_wait_for() or async_wait_timeout(), or forget the message via
403 * async_forget().
404 *
405 * @param amsgid Hash of the message to wait for.
406 * @param retval Pointer to storage where the retval of the answer will
407 * be stored.
408 * @param timeout Timeout in microseconds.
409 *
410 * @return Zero on success, ETIMEOUT if the timeout has expired.
411 *
412 */
413errno_t async_wait_timeout(aid_t amsgid, errno_t *retval, suseconds_t timeout)
414{
415 if (amsgid == 0) {
416 if (retval)
417 *retval = ENOMEM;
418 return EOK;
419 }
420
421 amsg_t *msg = (amsg_t *) amsgid;
422
423 futex_lock(&async_futex);
424
425 assert(!msg->forget);
426 assert(!msg->destroyed);
427
428 if (msg->done) {
429 futex_unlock(&async_futex);
430 goto done;
431 }
432
433 /*
434 * Negative timeout is converted to zero timeout to avoid
435 * using tv_add with negative augmenter.
436 */
437 if (timeout < 0)
438 timeout = 0;
439
440 getuptime(&msg->wdata.to_event.expires);
441 tv_add_diff(&msg->wdata.to_event.expires, timeout);
442
443 /*
444 * Current fibril is inserted as waiting regardless of the
445 * "size" of the timeout.
446 *
447 * Checking for msg->done and immediately bailing out when
448 * timeout == 0 would mean that the manager fibril would never
449 * run (consider single threaded program).
450 * Thus the IPC answer would be never retrieved from the kernel.
451 *
452 * Notice that the actual delay would be very small because we
453 * - switch to manager fibril
454 * - the manager sees expired timeout
455 * - and thus adds us back to ready queue
456 * - manager switches back to some ready fibril
457 * (prior it, it checks for incoming IPC).
458 *
459 */
460 msg->wdata.fid = fibril_get_id();
461 msg->wdata.active = false;
462 async_insert_timeout(&msg->wdata);
463
464 /* Leave the async_futex locked when entering this function */
465 fibril_switch(FIBRIL_FROM_BLOCKED);
466 futex_unlock(&async_futex);
467
468 if (!msg->done)
469 return ETIMEOUT;
470
471done:
472 if (retval)
473 *retval = msg->retval;
474
475 amsg_destroy(msg);
476
477 return 0;
478}
479
480/** Discard the message / reply on arrival.
481 *
482 * The message will be marked to be discarded once the reply arrives in
483 * reply_received(). It is not allowed to call async_wait_for() or
484 * async_wait_timeout() on this message after a call to this function.
485 *
486 * @param amsgid Hash of the message to forget.
487 */
488void async_forget(aid_t amsgid)
489{
490 if (amsgid == 0)
491 return;
492
493 amsg_t *msg = (amsg_t *) amsgid;
494
495 assert(!msg->forget);
496 assert(!msg->destroyed);
497
498 futex_lock(&async_futex);
499
500 if (msg->done) {
501 amsg_destroy(msg);
502 } else {
503 msg->dataptr = NULL;
504 msg->forget = true;
505 }
506
507 futex_unlock(&async_futex);
508}
509
510/** Wait for specified time.
511 *
512 * The current fibril is suspended but the thread continues to execute.
513 *
514 * @param timeout Duration of the wait in microseconds.
515 *
516 */
517void fibril_usleep(suseconds_t timeout)
518{
519 awaiter_t awaiter;
520 awaiter_initialize(&awaiter);
521
522 awaiter.fid = fibril_get_id();
523
524 getuptime(&awaiter.to_event.expires);
525 tv_add_diff(&awaiter.to_event.expires, timeout);
526
527 futex_lock(&async_futex);
528
529 async_insert_timeout(&awaiter);
530
531 /* Leave the async_futex locked when entering this function */
532 fibril_switch(FIBRIL_FROM_BLOCKED);
533 futex_unlock(&async_futex);
534}
535
/** Delay execution of the current fibril for a number of seconds.
 *
 * The sleep is performed in steps of at most 1000 seconds so that the
 * microsecond count passed to fibril_usleep() stays small for any value
 * of sec.
 *
 * @param sec Number of seconds to sleep.
 */
void fibril_sleep(unsigned int sec)
{
	unsigned int remaining = sec;

	while (remaining != 0) {
		unsigned int step = remaining;
		if (step > 1000)
			step = 1000;

		fibril_usleep(step * 1000000);
		remaining -= step;
	}
}
554
555/** Pseudo-synchronous message sending - fast version.
556 *
557 * Send message asynchronously and return only after the reply arrives.
558 *
559 * This function can only transfer 4 register payload arguments. For
560 * transferring more arguments, see the slower async_req_slow().
561 *
562 * @param exch Exchange for sending the message.
563 * @param imethod Interface and method of the call.
564 * @param arg1 Service-defined payload argument.
565 * @param arg2 Service-defined payload argument.
566 * @param arg3 Service-defined payload argument.
567 * @param arg4 Service-defined payload argument.
568 * @param r1 If non-NULL, storage for the 1st reply argument.
569 * @param r2 If non-NULL, storage for the 2nd reply argument.
570 * @param r3 If non-NULL, storage for the 3rd reply argument.
571 * @param r4 If non-NULL, storage for the 4th reply argument.
572 * @param r5 If non-NULL, storage for the 5th reply argument.
573 *
574 * @return Return code of the reply or an error code.
575 *
576 */
577errno_t async_req_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
578 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t *r1, sysarg_t *r2,
579 sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
580{
581 if (exch == NULL)
582 return ENOENT;
583
584 ipc_call_t result;
585 aid_t aid = async_send_4(exch, imethod, arg1, arg2, arg3, arg4,
586 &result);
587
588 errno_t rc;
589 async_wait_for(aid, &rc);
590
591 if (r1)
592 *r1 = IPC_GET_ARG1(result);
593
594 if (r2)
595 *r2 = IPC_GET_ARG2(result);
596
597 if (r3)
598 *r3 = IPC_GET_ARG3(result);
599
600 if (r4)
601 *r4 = IPC_GET_ARG4(result);
602
603 if (r5)
604 *r5 = IPC_GET_ARG5(result);
605
606 return rc;
607}
608
609/** Pseudo-synchronous message sending - slow version.
610 *
611 * Send message asynchronously and return only after the reply arrives.
612 *
613 * @param exch Exchange for sending the message.
614 * @param imethod Interface and method of the call.
615 * @param arg1 Service-defined payload argument.
616 * @param arg2 Service-defined payload argument.
617 * @param arg3 Service-defined payload argument.
618 * @param arg4 Service-defined payload argument.
619 * @param arg5 Service-defined payload argument.
620 * @param r1 If non-NULL, storage for the 1st reply argument.
621 * @param r2 If non-NULL, storage for the 2nd reply argument.
622 * @param r3 If non-NULL, storage for the 3rd reply argument.
623 * @param r4 If non-NULL, storage for the 4th reply argument.
624 * @param r5 If non-NULL, storage for the 5th reply argument.
625 *
626 * @return Return code of the reply or an error code.
627 *
628 */
629errno_t async_req_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
630 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, sysarg_t *r1,
631 sysarg_t *r2, sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
632{
633 if (exch == NULL)
634 return ENOENT;
635
636 ipc_call_t result;
637 aid_t aid = async_send_5(exch, imethod, arg1, arg2, arg3, arg4, arg5,
638 &result);
639
640 errno_t rc;
641 async_wait_for(aid, &rc);
642
643 if (r1)
644 *r1 = IPC_GET_ARG1(result);
645
646 if (r2)
647 *r2 = IPC_GET_ARG2(result);
648
649 if (r3)
650 *r3 = IPC_GET_ARG3(result);
651
652 if (r4)
653 *r4 = IPC_GET_ARG4(result);
654
655 if (r5)
656 *r5 = IPC_GET_ARG5(result);
657
658 return rc;
659}
660
661void async_msg_0(async_exch_t *exch, sysarg_t imethod)
662{
663 if (exch != NULL)
664 ipc_call_async_0(exch->phone, imethod, NULL);
665}
666
667void async_msg_1(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1)
668{
669 if (exch != NULL)
670 ipc_call_async_1(exch->phone, imethod, arg1, NULL);
671}
672
673void async_msg_2(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
674 sysarg_t arg2)
675{
676 if (exch != NULL)
677 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL);
678}
679
680void async_msg_3(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
681 sysarg_t arg2, sysarg_t arg3)
682{
683 if (exch != NULL)
684 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL);
685}
686
687void async_msg_4(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
688 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
689{
690 if (exch != NULL)
691 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
692 NULL);
693}
694
695void async_msg_5(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
696 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
697{
698 if (exch != NULL)
699 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
700 arg5, NULL);
701}
702
703static errno_t async_connect_me_to_internal(cap_phone_handle_t phone,
704 iface_t iface, sysarg_t arg2, sysarg_t arg3, sysarg_t flags,
705 cap_phone_handle_t *out_phone)
706{
707 ipc_call_t result;
708
709 // XXX: Workaround for GCC's inability to infer association between
710 // rc == EOK and *out_phone being assigned.
711 *out_phone = CAP_NIL;
712
713 amsg_t *msg = amsg_create();
714 if (!msg)
715 return ENOENT;
716
717 msg->dataptr = &result;
718 msg->wdata.active = true;
719
720 errno_t rc = ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO,
721 (sysarg_t) iface, arg2, arg3, flags, msg);
722 if (rc != EOK) {
723 msg->retval = rc;
724 msg->done = true;
725 }
726
727 async_wait_for((aid_t) msg, &rc);
728
729 if (rc != EOK)
730 return rc;
731
732 *out_phone = (cap_phone_handle_t) IPC_GET_ARG5(result);
733 return EOK;
734}
735
736/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
737 *
738 * Ask through phone for a new connection to some service and block until
739 * success.
740 *
741 * @param exch Exchange for sending the message.
742 * @param iface Connection interface.
743 * @param arg2 User defined argument.
744 * @param arg3 User defined argument.
745 *
746 * @return New session on success or NULL on error.
747 *
748 */
749async_sess_t *async_connect_me_to(async_exch_t *exch, iface_t iface,
750 sysarg_t arg2, sysarg_t arg3)
751{
752 if (exch == NULL) {
753 errno = ENOENT;
754 return NULL;
755 }
756
757 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
758 if (sess == NULL) {
759 errno = ENOMEM;
760 return NULL;
761 }
762
763 cap_phone_handle_t phone;
764 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
765 arg3, 0, &phone);
766 if (rc != EOK) {
767 errno = rc;
768 free(sess);
769 return NULL;
770 }
771
772 sess->iface = iface;
773 sess->phone = phone;
774 sess->arg1 = iface;
775 sess->arg2 = arg2;
776 sess->arg3 = arg3;
777
778 fibril_mutex_initialize(&sess->remote_state_mtx);
779 sess->remote_state_data = NULL;
780
781 list_initialize(&sess->exch_list);
782 fibril_mutex_initialize(&sess->mutex);
783 atomic_set(&sess->refcnt, 0);
784
785 return sess;
786}
787
788/** Set arguments for new connections.
789 *
790 * FIXME This is an ugly hack to work around the problem that parallel
791 * exchanges are implemented using parallel connections. When we create
792 * a callback session, the framework does not know arguments for the new
793 * connections.
794 *
795 * The proper solution seems to be to implement parallel exchanges using
796 * tagging.
797 */
798void async_sess_args_set(async_sess_t *sess, iface_t iface, sysarg_t arg2,
799 sysarg_t arg3)
800{
801 sess->arg1 = iface;
802 sess->arg2 = arg2;
803 sess->arg3 = arg3;
804}
805
806/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
807 *
808 * Ask through phone for a new connection to some service and block until
809 * success.
810 *
811 * @param exch Exchange for sending the message.
812 * @param iface Connection interface.
813 * @param arg2 User defined argument.
814 * @param arg3 User defined argument.
815 *
816 * @return New session on success or NULL on error.
817 *
818 */
819async_sess_t *async_connect_me_to_blocking(async_exch_t *exch, iface_t iface,
820 sysarg_t arg2, sysarg_t arg3)
821{
822 if (exch == NULL) {
823 errno = ENOENT;
824 return NULL;
825 }
826
827 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
828 if (sess == NULL) {
829 errno = ENOMEM;
830 return NULL;
831 }
832
833 cap_phone_handle_t phone;
834 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
835 arg3, IPC_FLAG_BLOCKING, &phone);
836 if (rc != EOK) {
837 errno = rc;
838 free(sess);
839 return NULL;
840 }
841
842 sess->iface = iface;
843 sess->phone = phone;
844 sess->arg1 = iface;
845 sess->arg2 = arg2;
846 sess->arg3 = arg3;
847
848 fibril_mutex_initialize(&sess->remote_state_mtx);
849 sess->remote_state_data = NULL;
850
851 list_initialize(&sess->exch_list);
852 fibril_mutex_initialize(&sess->mutex);
853 atomic_set(&sess->refcnt, 0);
854
855 return sess;
856}
857
858/** Connect to a task specified by id.
859 *
860 */
861async_sess_t *async_connect_kbox(task_id_t id)
862{
863 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
864 if (sess == NULL) {
865 errno = ENOMEM;
866 return NULL;
867 }
868
869 cap_phone_handle_t phone;
870 errno_t rc = ipc_connect_kbox(id, &phone);
871 if (rc != EOK) {
872 errno = rc;
873 free(sess);
874 return NULL;
875 }
876
877 sess->iface = 0;
878 sess->mgmt = EXCHANGE_ATOMIC;
879 sess->phone = phone;
880 sess->arg1 = 0;
881 sess->arg2 = 0;
882 sess->arg3 = 0;
883
884 fibril_mutex_initialize(&sess->remote_state_mtx);
885 sess->remote_state_data = NULL;
886
887 list_initialize(&sess->exch_list);
888 fibril_mutex_initialize(&sess->mutex);
889 atomic_set(&sess->refcnt, 0);
890
891 return sess;
892}
893
894static errno_t async_hangup_internal(cap_phone_handle_t phone)
895{
896 return ipc_hangup(phone);
897}
898
899/** Wrapper for ipc_hangup.
900 *
901 * @param sess Session to hung up.
902 *
903 * @return Zero on success or an error code.
904 *
905 */
906errno_t async_hangup(async_sess_t *sess)
907{
908 async_exch_t *exch;
909
910 assert(sess);
911
912 if (atomic_get(&sess->refcnt) > 0)
913 return EBUSY;
914
915 fibril_mutex_lock(&async_sess_mutex);
916
917 errno_t rc = async_hangup_internal(sess->phone);
918
919 while (!list_empty(&sess->exch_list)) {
920 exch = (async_exch_t *)
921 list_get_instance(list_first(&sess->exch_list),
922 async_exch_t, sess_link);
923
924 list_remove(&exch->sess_link);
925 list_remove(&exch->global_link);
926 async_hangup_internal(exch->phone);
927 free(exch);
928 }
929
930 free(sess);
931
932 fibril_mutex_unlock(&async_sess_mutex);
933
934 return rc;
935}
936
937/** Start new exchange in a session.
938 *
939 * @param session Session.
940 *
941 * @return New exchange or NULL on error.
942 *
943 */
944async_exch_t *async_exchange_begin(async_sess_t *sess)
945{
946 if (sess == NULL)
947 return NULL;
948
949 exch_mgmt_t mgmt = sess->mgmt;
950 if (sess->iface != 0)
951 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
952
953 async_exch_t *exch = NULL;
954
955 fibril_mutex_lock(&async_sess_mutex);
956
957 if (!list_empty(&sess->exch_list)) {
958 /*
959 * There are inactive exchanges in the session.
960 */
961 exch = (async_exch_t *)
962 list_get_instance(list_first(&sess->exch_list),
963 async_exch_t, sess_link);
964
965 list_remove(&exch->sess_link);
966 list_remove(&exch->global_link);
967 } else {
968 /*
969 * There are no available exchanges in the session.
970 */
971
972 if ((mgmt == EXCHANGE_ATOMIC) ||
973 (mgmt == EXCHANGE_SERIALIZE)) {
974 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
975 if (exch != NULL) {
976 link_initialize(&exch->sess_link);
977 link_initialize(&exch->global_link);
978 exch->sess = sess;
979 exch->phone = sess->phone;
980 }
981 } else if (mgmt == EXCHANGE_PARALLEL) {
982 cap_phone_handle_t phone;
983 errno_t rc;
984
985 retry:
986 /*
987 * Make a one-time attempt to connect a new data phone.
988 */
989 rc = async_connect_me_to_internal(sess->phone, sess->arg1,
990 sess->arg2, sess->arg3, 0, &phone);
991 if (rc == EOK) {
992 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
993 if (exch != NULL) {
994 link_initialize(&exch->sess_link);
995 link_initialize(&exch->global_link);
996 exch->sess = sess;
997 exch->phone = phone;
998 } else
999 async_hangup_internal(phone);
1000 } else if (!list_empty(&inactive_exch_list)) {
1001 /*
1002 * We did not manage to connect a new phone. But we
1003 * can try to close some of the currently inactive
1004 * connections in other sessions and try again.
1005 */
1006 exch = (async_exch_t *)
1007 list_get_instance(list_first(&inactive_exch_list),
1008 async_exch_t, global_link);
1009
1010 list_remove(&exch->sess_link);
1011 list_remove(&exch->global_link);
1012 async_hangup_internal(exch->phone);
1013 free(exch);
1014 goto retry;
1015 } else {
1016 /*
1017 * Wait for a phone to become available.
1018 */
1019 fibril_condvar_wait(&avail_phone_cv, &async_sess_mutex);
1020 goto retry;
1021 }
1022 }
1023 }
1024
1025 fibril_mutex_unlock(&async_sess_mutex);
1026
1027 if (exch != NULL) {
1028 atomic_inc(&sess->refcnt);
1029
1030 if (mgmt == EXCHANGE_SERIALIZE)
1031 fibril_mutex_lock(&sess->mutex);
1032 }
1033
1034 return exch;
1035}
1036
1037/** Finish an exchange.
1038 *
1039 * @param exch Exchange to finish.
1040 *
1041 */
1042void async_exchange_end(async_exch_t *exch)
1043{
1044 if (exch == NULL)
1045 return;
1046
1047 async_sess_t *sess = exch->sess;
1048 assert(sess != NULL);
1049
1050 exch_mgmt_t mgmt = sess->mgmt;
1051 if (sess->iface != 0)
1052 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
1053
1054 atomic_dec(&sess->refcnt);
1055
1056 if (mgmt == EXCHANGE_SERIALIZE)
1057 fibril_mutex_unlock(&sess->mutex);
1058
1059 fibril_mutex_lock(&async_sess_mutex);
1060
1061 list_append(&exch->sess_link, &sess->exch_list);
1062 list_append(&exch->global_link, &inactive_exch_list);
1063 fibril_condvar_signal(&avail_phone_cv);
1064
1065 fibril_mutex_unlock(&async_sess_mutex);
1066}
1067
1068/** Wrapper for IPC_M_SHARE_IN calls using the async framework.
1069 *
1070 * @param exch Exchange for sending the message.
1071 * @param size Size of the destination address space area.
1072 * @param arg User defined argument.
1073 * @param flags Storage for the received flags. Can be NULL.
1074 * @param dst Address of the storage for the destination address space area
1075 * base address. Cannot be NULL.
1076 *
1077 * @return Zero on success or an error code from errno.h.
1078 *
1079 */
1080errno_t async_share_in_start(async_exch_t *exch, size_t size, sysarg_t arg,
1081 unsigned int *flags, void **dst)
1082{
1083 if (exch == NULL)
1084 return ENOENT;
1085
1086 sysarg_t _flags = 0;
1087 sysarg_t _dst = (sysarg_t) -1;
1088 errno_t res = async_req_2_4(exch, IPC_M_SHARE_IN, (sysarg_t) size,
1089 arg, NULL, &_flags, NULL, &_dst);
1090
1091 if (flags)
1092 *flags = (unsigned int) _flags;
1093
1094 *dst = (void *) _dst;
1095 return res;
1096}
1097
1098/** Wrapper for IPC_M_SHARE_OUT calls using the async framework.
1099 *
1100 * @param exch Exchange for sending the message.
1101 * @param src Source address space area base address.
1102 * @param flags Flags to be used for sharing. Bits can be only cleared.
1103 *
1104 * @return Zero on success or an error code from errno.h.
1105 *
1106 */
1107errno_t async_share_out_start(async_exch_t *exch, void *src, unsigned int flags)
1108{
1109 if (exch == NULL)
1110 return ENOENT;
1111
1112 return async_req_3_0(exch, IPC_M_SHARE_OUT, (sysarg_t) src, 0,
1113 (sysarg_t) flags);
1114}
1115
1116/** Start IPC_M_DATA_READ using the async framework.
1117 *
1118 * @param exch Exchange for sending the message.
1119 * @param dst Address of the beginning of the destination buffer.
1120 * @param size Size of the destination buffer (in bytes).
1121 * @param dataptr Storage of call data (arg 2 holds actual data size).
1122 *
1123 * @return Hash of the sent message or 0 on error.
1124 *
1125 */
1126aid_t async_data_read(async_exch_t *exch, void *dst, size_t size,
1127 ipc_call_t *dataptr)
1128{
1129 return async_send_2(exch, IPC_M_DATA_READ, (sysarg_t) dst,
1130 (sysarg_t) size, dataptr);
1131}
1132
1133/** Wrapper for IPC_M_DATA_READ calls using the async framework.
1134 *
1135 * @param exch Exchange for sending the message.
1136 * @param dst Address of the beginning of the destination buffer.
1137 * @param size Size of the destination buffer.
1138 *
1139 * @return Zero on success or an error code from errno.h.
1140 *
1141 */
1142errno_t async_data_read_start(async_exch_t *exch, void *dst, size_t size)
1143{
1144 if (exch == NULL)
1145 return ENOENT;
1146
1147 return async_req_2_0(exch, IPC_M_DATA_READ, (sysarg_t) dst,
1148 (sysarg_t) size);
1149}
1150
1151/** Wrapper for IPC_M_DATA_WRITE calls using the async framework.
1152 *
1153 * @param exch Exchange for sending the message.
1154 * @param src Address of the beginning of the source buffer.
1155 * @param size Size of the source buffer.
1156 *
1157 * @return Zero on success or an error code from errno.h.
1158 *
1159 */
1160errno_t async_data_write_start(async_exch_t *exch, const void *src, size_t size)
1161{
1162 if (exch == NULL)
1163 return ENOENT;
1164
1165 return async_req_2_0(exch, IPC_M_DATA_WRITE, (sysarg_t) src,
1166 (sysarg_t) size);
1167}
1168
1169errno_t async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
1170 sysarg_t arg3, async_exch_t *other_exch)
1171{
1172 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
1173 arg1, arg2, arg3, 0, CAP_HANDLE_RAW(other_exch->phone));
1174}
1175
1176/** Lock and get session remote state
1177 *
1178 * Lock and get the local replica of the remote state
1179 * in stateful sessions. The call should be paired
1180 * with async_remote_state_release*().
1181 *
1182 * @param[in] sess Stateful session.
1183 *
1184 * @return Local replica of the remote state.
1185 *
1186 */
1187void *async_remote_state_acquire(async_sess_t *sess)
1188{
1189 fibril_mutex_lock(&sess->remote_state_mtx);
1190 return sess->remote_state_data;
1191}
1192
1193/** Update the session remote state
1194 *
1195 * Update the local replica of the remote state
1196 * in stateful sessions. The remote state must
1197 * be already locked.
1198 *
1199 * @param[in] sess Stateful session.
1200 * @param[in] state New local replica of the remote state.
1201 *
1202 */
1203void async_remote_state_update(async_sess_t *sess, void *state)
1204{
1205 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1206 sess->remote_state_data = state;
1207}
1208
1209/** Release the session remote state
1210 *
1211 * Unlock the local replica of the remote state
1212 * in stateful sessions.
1213 *
1214 * @param[in] sess Stateful session.
1215 *
1216 */
1217void async_remote_state_release(async_sess_t *sess)
1218{
1219 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1220
1221 fibril_mutex_unlock(&sess->remote_state_mtx);
1222}
1223
1224/** Release the session remote state and end an exchange
1225 *
1226 * Unlock the local replica of the remote state
1227 * in stateful sessions. This is convenience function
1228 * which gets the session pointer from the exchange
1229 * and also ends the exchange.
1230 *
1231 * @param[in] exch Stateful session's exchange.
1232 *
1233 */
1234void async_remote_state_release_exchange(async_exch_t *exch)
1235{
1236 if (exch == NULL)
1237 return;
1238
1239 async_sess_t *sess = exch->sess;
1240 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1241
1242 async_exchange_end(exch);
1243 fibril_mutex_unlock(&sess->remote_state_mtx);
1244}
1245
1246void *async_as_area_create(void *base, size_t size, unsigned int flags,
1247 async_sess_t *pager, sysarg_t id1, sysarg_t id2, sysarg_t id3)
1248{
1249 as_area_pager_info_t pager_info = {
1250 .pager = pager->phone,
1251 .id1 = id1,
1252 .id2 = id2,
1253 .id3 = id3
1254 };
1255 return as_area_create(base, size, flags, &pager_info);
1256}
1257
1258/** @}
1259 */
Note: See TracBrowser for help on using the repository browser.