source: mainline/uspace/lib/c/generic/async/client.c@ 8dab988

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since 8dab988 was 8119363, checked in by Jiří Zárevúcky <jiri.zarevucky@…>, 7 years ago

Merge some preliminary async/fibril framework changes.

  • Property mode set to 100644
File size: 32.3 KB
Line 
1/*
2 * Copyright (c) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup libc
30 * @{
31 */
32/** @file
33 */
34
35/**
36 * Asynchronous library
37 *
38 * The aim of this library is to provide a facility for writing programs which
39 * utilize the asynchronous nature of HelenOS IPC, yet using a normal way of
40 * programming.
41 *
42 * You should be able to write very simple multithreaded programs. The async
43 * framework will automatically take care of most of the synchronization
44 * problems.
45 *
46 * Example of use (pseudo C):
47 *
48 * 1) Multithreaded client application
49 *
50 * fibril_create(fibril1, ...);
51 * fibril_create(fibril2, ...);
52 * ...
53 *
54 * int fibril1(void *arg)
55 * {
56 * conn = async_connect_me_to(...);
57 *
58 * exch = async_exchange_begin(conn);
59 * c1 = async_send(exch);
60 * async_exchange_end(exch);
61 *
62 * exch = async_exchange_begin(conn);
63 * c2 = async_send(exch);
64 * async_exchange_end(exch);
65 *
66 * async_wait_for(c1);
67 * async_wait_for(c2);
68 * ...
69 * }
70 *
71 *
72 * 2) Multithreaded server application
73 *
74 * main()
75 * {
76 * async_manager();
77 * }
78 *
79 * port_handler(ichandle, *icall)
80 * {
81 * if (want_refuse) {
82 * async_answer_0(ichandle, ELIMIT);
83 * return;
84 * }
85 * async_answer_0(ichandle, EOK);
86 *
87 * chandle = async_get_call(&call);
88 * somehow_handle_the_call(chandle, call);
89 * async_answer_2(chandle, 1, 2, 3);
90 *
91 * chandle = async_get_call(&call);
92 * ...
93 * }
94 *
95 */
96
97#define LIBC_ASYNC_C_
98#include <ipc/ipc.h>
99#include <async.h>
100#include "../private/async.h"
101#include "../private/ns.h"
102#undef LIBC_ASYNC_C_
103
104#include <ipc/irq.h>
105#include <ipc/event.h>
106#include <futex.h>
107#include <fibril.h>
108#include <adt/hash_table.h>
109#include <adt/hash.h>
110#include <adt/list.h>
111#include <assert.h>
112#include <errno.h>
113#include <sys/time.h>
114#include <libarch/barrier.h>
115#include <stdbool.h>
116#include <stdlib.h>
117#include <mem.h>
118#include <stdlib.h>
119#include <macros.h>
120#include <as.h>
121#include <abi/mm/as.h>
122#include "../private/libc.h"
123#include "../private/fibril.h"
124
125/** Naming service session */
126async_sess_t session_ns;
127
128/** Message data */
129typedef struct {
130 awaiter_t wdata;
131
132 /** If reply was received. */
133 bool done;
134
135 /** If the message / reply should be discarded on arrival. */
136 bool forget;
137
138 /** If already destroyed. */
139 bool destroyed;
140
141 /** Pointer to where the answer data is stored. */
142 ipc_call_t *dataptr;
143
144 errno_t retval;
145} amsg_t;
146
147static void to_event_initialize(to_event_t *to)
148{
149 struct timeval tv = { 0, 0 };
150
151 to->inlist = false;
152 to->occurred = false;
153 link_initialize(&to->link);
154 to->expires = tv;
155}
156
157static void wu_event_initialize(wu_event_t *wu)
158{
159 wu->inlist = false;
160 link_initialize(&wu->link);
161}
162
163void awaiter_initialize(awaiter_t *aw)
164{
165 aw->fid = 0;
166 aw->active = false;
167 to_event_initialize(&aw->to_event);
168 wu_event_initialize(&aw->wu_event);
169}
170
171static amsg_t *amsg_create(void)
172{
173 amsg_t *msg = malloc(sizeof(amsg_t));
174 if (msg) {
175 msg->done = false;
176 msg->forget = false;
177 msg->destroyed = false;
178 msg->dataptr = NULL;
179 msg->retval = EINVAL;
180 awaiter_initialize(&msg->wdata);
181 }
182
183 return msg;
184}
185
186static void amsg_destroy(amsg_t *msg)
187{
188 assert(!msg->destroyed);
189 msg->destroyed = true;
190 free(msg);
191}
192
193/** Mutex protecting inactive_exch_list and avail_phone_cv.
194 *
195 */
196static FIBRIL_MUTEX_INITIALIZE(async_sess_mutex);
197
198/** List of all currently inactive exchanges.
199 *
200 */
201static LIST_INITIALIZE(inactive_exch_list);
202
203/** Condition variable to wait for a phone to become available.
204 *
205 */
206static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
207
208/** Initialize the async framework.
209 *
210 */
211void __async_client_init(void)
212{
213 session_ns.iface = 0;
214 session_ns.mgmt = EXCHANGE_ATOMIC;
215 session_ns.phone = PHONE_NS;
216 session_ns.arg1 = 0;
217 session_ns.arg2 = 0;
218 session_ns.arg3 = 0;
219
220 fibril_mutex_initialize(&session_ns.remote_state_mtx);
221 session_ns.remote_state_data = NULL;
222
223 list_initialize(&session_ns.exch_list);
224 fibril_mutex_initialize(&session_ns.mutex);
225 atomic_set(&session_ns.refcnt, 0);
226}
227
228/** Reply received callback.
229 *
230 * This function is called whenever a reply for an asynchronous message sent out
231 * by the asynchronous framework is received.
232 *
233 * Notify the fibril which is waiting for this message that it has arrived.
234 *
235 * @param arg Pointer to the asynchronous message record.
236 * @param retval Value returned in the answer.
237 * @param data Call data of the answer.
238 *
239 */
240static void reply_received(void *arg, errno_t retval, ipc_call_t *data)
241{
242 assert(arg);
243
244 futex_lock(&async_futex);
245
246 amsg_t *msg = (amsg_t *) arg;
247 msg->retval = retval;
248
249 /* Copy data after futex_down, just in case the call was detached */
250 if ((msg->dataptr) && (data))
251 *msg->dataptr = *data;
252
253 write_barrier();
254
255 /* Remove message from timeout list */
256 if (msg->wdata.to_event.inlist)
257 list_remove(&msg->wdata.to_event.link);
258
259 msg->done = true;
260
261 if (msg->forget) {
262 assert(msg->wdata.active);
263 amsg_destroy(msg);
264 } else if (!msg->wdata.active) {
265 msg->wdata.active = true;
266 fibril_add_ready(msg->wdata.fid);
267 }
268
269 futex_unlock(&async_futex);
270}
271
272/** Send message and return id of the sent message.
273 *
274 * The return value can be used as input for async_wait() to wait for
275 * completion.
276 *
277 * @param exch Exchange for sending the message.
278 * @param imethod Service-defined interface and method.
279 * @param arg1 Service-defined payload argument.
280 * @param arg2 Service-defined payload argument.
281 * @param arg3 Service-defined payload argument.
282 * @param arg4 Service-defined payload argument.
283 * @param dataptr If non-NULL, storage where the reply data will be stored.
284 *
285 * @return Hash of the sent message or 0 on error.
286 *
287 */
288aid_t async_send_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
289 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr)
290{
291 if (exch == NULL)
292 return 0;
293
294 amsg_t *msg = amsg_create();
295 if (msg == NULL)
296 return 0;
297
298 msg->dataptr = dataptr;
299 msg->wdata.active = true;
300
301 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg,
302 reply_received);
303
304 return (aid_t) msg;
305}
306
307/** Send message and return id of the sent message
308 *
309 * The return value can be used as input for async_wait() to wait for
310 * completion.
311 *
312 * @param exch Exchange for sending the message.
313 * @param imethod Service-defined interface and method.
314 * @param arg1 Service-defined payload argument.
315 * @param arg2 Service-defined payload argument.
316 * @param arg3 Service-defined payload argument.
317 * @param arg4 Service-defined payload argument.
318 * @param arg5 Service-defined payload argument.
319 * @param dataptr If non-NULL, storage where the reply data will be
320 * stored.
321 *
322 * @return Hash of the sent message or 0 on error.
323 *
324 */
325aid_t async_send_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
326 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5,
327 ipc_call_t *dataptr)
328{
329 if (exch == NULL)
330 return 0;
331
332 amsg_t *msg = amsg_create();
333 if (msg == NULL)
334 return 0;
335
336 msg->dataptr = dataptr;
337 msg->wdata.active = true;
338
339 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,
340 msg, reply_received);
341
342 return (aid_t) msg;
343}
344
345/** Wait for a message sent by the async framework.
346 *
347 * @param amsgid Hash of the message to wait for.
348 * @param retval Pointer to storage where the retval of the answer will
349 * be stored.
350 *
351 */
352void async_wait_for(aid_t amsgid, errno_t *retval)
353{
354 assert(amsgid);
355
356 amsg_t *msg = (amsg_t *) amsgid;
357
358 futex_lock(&async_futex);
359
360 assert(!msg->forget);
361 assert(!msg->destroyed);
362
363 if (msg->done) {
364 futex_unlock(&async_futex);
365 goto done;
366 }
367
368 msg->wdata.fid = fibril_get_id();
369 msg->wdata.active = false;
370 msg->wdata.to_event.inlist = false;
371
372 /* Leave the async_futex locked when entering this function */
373 fibril_switch(FIBRIL_FROM_BLOCKED);
374 futex_unlock(&async_futex);
375
376done:
377 if (retval)
378 *retval = msg->retval;
379
380 amsg_destroy(msg);
381}
382
383/** Wait for a message sent by the async framework, timeout variant.
384 *
385 * If the wait times out, the caller may choose to either wait again by calling
386 * async_wait_for() or async_wait_timeout(), or forget the message via
387 * async_forget().
388 *
389 * @param amsgid Hash of the message to wait for.
390 * @param retval Pointer to storage where the retval of the answer will
391 * be stored.
392 * @param timeout Timeout in microseconds.
393 *
394 * @return Zero on success, ETIMEOUT if the timeout has expired.
395 *
396 */
397errno_t async_wait_timeout(aid_t amsgid, errno_t *retval, suseconds_t timeout)
398{
399 assert(amsgid);
400
401 amsg_t *msg = (amsg_t *) amsgid;
402
403 futex_lock(&async_futex);
404
405 assert(!msg->forget);
406 assert(!msg->destroyed);
407
408 if (msg->done) {
409 futex_unlock(&async_futex);
410 goto done;
411 }
412
413 /*
414 * Negative timeout is converted to zero timeout to avoid
415 * using tv_add with negative augmenter.
416 */
417 if (timeout < 0)
418 timeout = 0;
419
420 getuptime(&msg->wdata.to_event.expires);
421 tv_add_diff(&msg->wdata.to_event.expires, timeout);
422
423 /*
424 * Current fibril is inserted as waiting regardless of the
425 * "size" of the timeout.
426 *
427 * Checking for msg->done and immediately bailing out when
428 * timeout == 0 would mean that the manager fibril would never
429 * run (consider single threaded program).
430 * Thus the IPC answer would be never retrieved from the kernel.
431 *
432 * Notice that the actual delay would be very small because we
433 * - switch to manager fibril
434 * - the manager sees expired timeout
435 * - and thus adds us back to ready queue
436 * - manager switches back to some ready fibril
437 * (prior it, it checks for incoming IPC).
438 *
439 */
440 msg->wdata.fid = fibril_get_id();
441 msg->wdata.active = false;
442 async_insert_timeout(&msg->wdata);
443
444 /* Leave the async_futex locked when entering this function */
445 fibril_switch(FIBRIL_FROM_BLOCKED);
446 futex_unlock(&async_futex);
447
448 if (!msg->done)
449 return ETIMEOUT;
450
451done:
452 if (retval)
453 *retval = msg->retval;
454
455 amsg_destroy(msg);
456
457 return 0;
458}
459
460/** Discard the message / reply on arrival.
461 *
462 * The message will be marked to be discarded once the reply arrives in
463 * reply_received(). It is not allowed to call async_wait_for() or
464 * async_wait_timeout() on this message after a call to this function.
465 *
466 * @param amsgid Hash of the message to forget.
467 */
468void async_forget(aid_t amsgid)
469{
470 amsg_t *msg = (amsg_t *) amsgid;
471
472 assert(msg);
473 assert(!msg->forget);
474 assert(!msg->destroyed);
475
476 futex_lock(&async_futex);
477
478 if (msg->done) {
479 amsg_destroy(msg);
480 } else {
481 msg->dataptr = NULL;
482 msg->forget = true;
483 }
484
485 futex_unlock(&async_futex);
486}
487
488/** Wait for specified time.
489 *
490 * The current fibril is suspended but the thread continues to execute.
491 *
492 * @param timeout Duration of the wait in microseconds.
493 *
494 */
495void async_usleep(suseconds_t timeout)
496{
497 awaiter_t awaiter;
498 awaiter_initialize(&awaiter);
499
500 awaiter.fid = fibril_get_id();
501
502 getuptime(&awaiter.to_event.expires);
503 tv_add_diff(&awaiter.to_event.expires, timeout);
504
505 futex_lock(&async_futex);
506
507 async_insert_timeout(&awaiter);
508
509 /* Leave the async_futex locked when entering this function */
510 fibril_switch(FIBRIL_FROM_BLOCKED);
511 futex_unlock(&async_futex);
512}
513
514/** Delay execution for the specified number of seconds
515 *
516 * @param sec Number of seconds to sleep
517 */
518void async_sleep(unsigned int sec)
519{
520 /*
521 * Sleep in 1000 second steps to support
522 * full argument range
523 */
524
525 while (sec > 0) {
526 unsigned int period = (sec > 1000) ? 1000 : sec;
527
528 async_usleep(period * 1000000);
529 sec -= period;
530 }
531}
532
533/** Pseudo-synchronous message sending - fast version.
534 *
535 * Send message asynchronously and return only after the reply arrives.
536 *
537 * This function can only transfer 4 register payload arguments. For
538 * transferring more arguments, see the slower async_req_slow().
539 *
540 * @param exch Exchange for sending the message.
541 * @param imethod Interface and method of the call.
542 * @param arg1 Service-defined payload argument.
543 * @param arg2 Service-defined payload argument.
544 * @param arg3 Service-defined payload argument.
545 * @param arg4 Service-defined payload argument.
546 * @param r1 If non-NULL, storage for the 1st reply argument.
547 * @param r2 If non-NULL, storage for the 2nd reply argument.
548 * @param r3 If non-NULL, storage for the 3rd reply argument.
549 * @param r4 If non-NULL, storage for the 4th reply argument.
550 * @param r5 If non-NULL, storage for the 5th reply argument.
551 *
552 * @return Return code of the reply or an error code.
553 *
554 */
555errno_t async_req_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
556 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t *r1, sysarg_t *r2,
557 sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
558{
559 if (exch == NULL)
560 return ENOENT;
561
562 ipc_call_t result;
563 aid_t aid = async_send_4(exch, imethod, arg1, arg2, arg3, arg4,
564 &result);
565
566 errno_t rc;
567 async_wait_for(aid, &rc);
568
569 if (r1)
570 *r1 = IPC_GET_ARG1(result);
571
572 if (r2)
573 *r2 = IPC_GET_ARG2(result);
574
575 if (r3)
576 *r3 = IPC_GET_ARG3(result);
577
578 if (r4)
579 *r4 = IPC_GET_ARG4(result);
580
581 if (r5)
582 *r5 = IPC_GET_ARG5(result);
583
584 return rc;
585}
586
587/** Pseudo-synchronous message sending - slow version.
588 *
589 * Send message asynchronously and return only after the reply arrives.
590 *
591 * @param exch Exchange for sending the message.
592 * @param imethod Interface and method of the call.
593 * @param arg1 Service-defined payload argument.
594 * @param arg2 Service-defined payload argument.
595 * @param arg3 Service-defined payload argument.
596 * @param arg4 Service-defined payload argument.
597 * @param arg5 Service-defined payload argument.
598 * @param r1 If non-NULL, storage for the 1st reply argument.
599 * @param r2 If non-NULL, storage for the 2nd reply argument.
600 * @param r3 If non-NULL, storage for the 3rd reply argument.
601 * @param r4 If non-NULL, storage for the 4th reply argument.
602 * @param r5 If non-NULL, storage for the 5th reply argument.
603 *
604 * @return Return code of the reply or an error code.
605 *
606 */
607errno_t async_req_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
608 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, sysarg_t *r1,
609 sysarg_t *r2, sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
610{
611 if (exch == NULL)
612 return ENOENT;
613
614 ipc_call_t result;
615 aid_t aid = async_send_5(exch, imethod, arg1, arg2, arg3, arg4, arg5,
616 &result);
617
618 errno_t rc;
619 async_wait_for(aid, &rc);
620
621 if (r1)
622 *r1 = IPC_GET_ARG1(result);
623
624 if (r2)
625 *r2 = IPC_GET_ARG2(result);
626
627 if (r3)
628 *r3 = IPC_GET_ARG3(result);
629
630 if (r4)
631 *r4 = IPC_GET_ARG4(result);
632
633 if (r5)
634 *r5 = IPC_GET_ARG5(result);
635
636 return rc;
637}
638
639void async_msg_0(async_exch_t *exch, sysarg_t imethod)
640{
641 if (exch != NULL)
642 ipc_call_async_0(exch->phone, imethod, NULL, NULL);
643}
644
645void async_msg_1(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1)
646{
647 if (exch != NULL)
648 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL);
649}
650
651void async_msg_2(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
652 sysarg_t arg2)
653{
654 if (exch != NULL)
655 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL);
656}
657
658void async_msg_3(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
659 sysarg_t arg2, sysarg_t arg3)
660{
661 if (exch != NULL)
662 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL,
663 NULL);
664}
665
666void async_msg_4(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
667 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
668{
669 if (exch != NULL)
670 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
671 NULL, NULL);
672}
673
674void async_msg_5(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
675 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
676{
677 if (exch != NULL)
678 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
679 arg5, NULL, NULL);
680}
681
682static errno_t async_connect_me_to_internal(cap_phone_handle_t phone,
683 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4,
684 cap_phone_handle_t *out_phone)
685{
686 ipc_call_t result;
687
688 // XXX: Workaround for GCC's inability to infer association between
689 // rc == EOK and *out_phone being assigned.
690 *out_phone = CAP_NIL;
691
692 amsg_t *msg = amsg_create();
693 if (!msg)
694 return ENOENT;
695
696 msg->dataptr = &result;
697 msg->wdata.active = true;
698
699 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4,
700 msg, reply_received);
701
702 errno_t rc;
703 async_wait_for((aid_t) msg, &rc);
704
705 if (rc != EOK)
706 return rc;
707
708 *out_phone = (cap_phone_handle_t) IPC_GET_ARG5(result);
709 return EOK;
710}
711
712/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
713 *
714 * Ask through for a new connection to some service.
715 *
716 * @param mgmt Exchange management style.
717 * @param exch Exchange for sending the message.
718 * @param arg1 User defined argument.
719 * @param arg2 User defined argument.
720 * @param arg3 User defined argument.
721 *
722 * @return New session on success or NULL on error.
723 *
724 */
725async_sess_t *async_connect_me_to(exch_mgmt_t mgmt, async_exch_t *exch,
726 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
727{
728 if (exch == NULL) {
729 errno = ENOENT;
730 return NULL;
731 }
732
733 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
734 if (sess == NULL) {
735 errno = ENOMEM;
736 return NULL;
737 }
738
739 cap_phone_handle_t phone;
740 errno_t rc = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
741 0, &phone);
742 if (rc != EOK) {
743 errno = rc;
744 free(sess);
745 return NULL;
746 }
747
748 sess->iface = 0;
749 sess->mgmt = mgmt;
750 sess->phone = phone;
751 sess->arg1 = arg1;
752 sess->arg2 = arg2;
753 sess->arg3 = arg3;
754
755 fibril_mutex_initialize(&sess->remote_state_mtx);
756 sess->remote_state_data = NULL;
757
758 list_initialize(&sess->exch_list);
759 fibril_mutex_initialize(&sess->mutex);
760 atomic_set(&sess->refcnt, 0);
761
762 return sess;
763}
764
765/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
766 *
767 * Ask through phone for a new connection to some service and block until
768 * success.
769 *
770 * @param exch Exchange for sending the message.
771 * @param iface Connection interface.
772 * @param arg2 User defined argument.
773 * @param arg3 User defined argument.
774 *
775 * @return New session on success or NULL on error.
776 *
777 */
778async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,
779 sysarg_t arg2, sysarg_t arg3)
780{
781 if (exch == NULL) {
782 errno = ENOENT;
783 return NULL;
784 }
785
786 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
787 if (sess == NULL) {
788 errno = ENOMEM;
789 return NULL;
790 }
791
792 cap_phone_handle_t phone;
793 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
794 arg3, 0, &phone);
795 if (rc != EOK) {
796 errno = rc;
797 free(sess);
798 return NULL;
799 }
800
801 sess->iface = iface;
802 sess->phone = phone;
803 sess->arg1 = iface;
804 sess->arg2 = arg2;
805 sess->arg3 = arg3;
806
807 fibril_mutex_initialize(&sess->remote_state_mtx);
808 sess->remote_state_data = NULL;
809
810 list_initialize(&sess->exch_list);
811 fibril_mutex_initialize(&sess->mutex);
812 atomic_set(&sess->refcnt, 0);
813
814 return sess;
815}
816
817/** Set arguments for new connections.
818 *
819 * FIXME This is an ugly hack to work around the problem that parallel
820 * exchanges are implemented using parallel connections. When we create
821 * a callback session, the framework does not know arguments for the new
822 * connections.
823 *
824 * The proper solution seems to be to implement parallel exchanges using
825 * tagging.
826 */
827void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2,
828 sysarg_t arg3)
829{
830 sess->arg1 = arg1;
831 sess->arg2 = arg2;
832 sess->arg3 = arg3;
833}
834
835/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
836 *
837 * Ask through phone for a new connection to some service and block until
838 * success.
839 *
840 * @param mgmt Exchange management style.
841 * @param exch Exchange for sending the message.
842 * @param arg1 User defined argument.
843 * @param arg2 User defined argument.
844 * @param arg3 User defined argument.
845 *
846 * @return New session on success or NULL on error.
847 *
848 */
849async_sess_t *async_connect_me_to_blocking(exch_mgmt_t mgmt, async_exch_t *exch,
850 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
851{
852 if (exch == NULL) {
853 errno = ENOENT;
854 return NULL;
855 }
856
857 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
858 if (sess == NULL) {
859 errno = ENOMEM;
860 return NULL;
861 }
862
863 cap_phone_handle_t phone;
864 errno_t rc = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
865 IPC_FLAG_BLOCKING, &phone);
866
867 if (rc != EOK) {
868 errno = rc;
869 free(sess);
870 return NULL;
871 }
872
873 sess->iface = 0;
874 sess->mgmt = mgmt;
875 sess->phone = phone;
876 sess->arg1 = arg1;
877 sess->arg2 = arg2;
878 sess->arg3 = arg3;
879
880 fibril_mutex_initialize(&sess->remote_state_mtx);
881 sess->remote_state_data = NULL;
882
883 list_initialize(&sess->exch_list);
884 fibril_mutex_initialize(&sess->mutex);
885 atomic_set(&sess->refcnt, 0);
886
887 return sess;
888}
889
890/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
891 *
892 * Ask through phone for a new connection to some service and block until
893 * success.
894 *
895 * @param exch Exchange for sending the message.
896 * @param iface Connection interface.
897 * @param arg2 User defined argument.
898 * @param arg3 User defined argument.
899 *
900 * @return New session on success or NULL on error.
901 *
902 */
903async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,
904 sysarg_t arg2, sysarg_t arg3)
905{
906 if (exch == NULL) {
907 errno = ENOENT;
908 return NULL;
909 }
910
911 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
912 if (sess == NULL) {
913 errno = ENOMEM;
914 return NULL;
915 }
916
917 cap_phone_handle_t phone;
918 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
919 arg3, IPC_FLAG_BLOCKING, &phone);
920 if (rc != EOK) {
921 errno = rc;
922 free(sess);
923 return NULL;
924 }
925
926 sess->iface = iface;
927 sess->phone = phone;
928 sess->arg1 = iface;
929 sess->arg2 = arg2;
930 sess->arg3 = arg3;
931
932 fibril_mutex_initialize(&sess->remote_state_mtx);
933 sess->remote_state_data = NULL;
934
935 list_initialize(&sess->exch_list);
936 fibril_mutex_initialize(&sess->mutex);
937 atomic_set(&sess->refcnt, 0);
938
939 return sess;
940}
941
942/** Connect to a task specified by id.
943 *
944 */
945async_sess_t *async_connect_kbox(task_id_t id)
946{
947 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
948 if (sess == NULL) {
949 errno = ENOMEM;
950 return NULL;
951 }
952
953 cap_phone_handle_t phone;
954 errno_t rc = ipc_connect_kbox(id, &phone);
955 if (rc != EOK) {
956 errno = rc;
957 free(sess);
958 return NULL;
959 }
960
961 sess->iface = 0;
962 sess->mgmt = EXCHANGE_ATOMIC;
963 sess->phone = phone;
964 sess->arg1 = 0;
965 sess->arg2 = 0;
966 sess->arg3 = 0;
967
968 fibril_mutex_initialize(&sess->remote_state_mtx);
969 sess->remote_state_data = NULL;
970
971 list_initialize(&sess->exch_list);
972 fibril_mutex_initialize(&sess->mutex);
973 atomic_set(&sess->refcnt, 0);
974
975 return sess;
976}
977
978static errno_t async_hangup_internal(cap_phone_handle_t phone)
979{
980 return ipc_hangup(phone);
981}
982
983/** Wrapper for ipc_hangup.
984 *
985 * @param sess Session to hung up.
986 *
987 * @return Zero on success or an error code.
988 *
989 */
990errno_t async_hangup(async_sess_t *sess)
991{
992 async_exch_t *exch;
993
994 assert(sess);
995
996 if (atomic_get(&sess->refcnt) > 0)
997 return EBUSY;
998
999 fibril_mutex_lock(&async_sess_mutex);
1000
1001 errno_t rc = async_hangup_internal(sess->phone);
1002
1003 while (!list_empty(&sess->exch_list)) {
1004 exch = (async_exch_t *)
1005 list_get_instance(list_first(&sess->exch_list),
1006 async_exch_t, sess_link);
1007
1008 list_remove(&exch->sess_link);
1009 list_remove(&exch->global_link);
1010 async_hangup_internal(exch->phone);
1011 free(exch);
1012 }
1013
1014 free(sess);
1015
1016 fibril_mutex_unlock(&async_sess_mutex);
1017
1018 return rc;
1019}
1020
1021/** Start new exchange in a session.
1022 *
1023 * @param session Session.
1024 *
1025 * @return New exchange or NULL on error.
1026 *
1027 */
1028async_exch_t *async_exchange_begin(async_sess_t *sess)
1029{
1030 if (sess == NULL)
1031 return NULL;
1032
1033 exch_mgmt_t mgmt = sess->mgmt;
1034 if (sess->iface != 0)
1035 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
1036
1037 async_exch_t *exch = NULL;
1038
1039 fibril_mutex_lock(&async_sess_mutex);
1040
1041 if (!list_empty(&sess->exch_list)) {
1042 /*
1043 * There are inactive exchanges in the session.
1044 */
1045 exch = (async_exch_t *)
1046 list_get_instance(list_first(&sess->exch_list),
1047 async_exch_t, sess_link);
1048
1049 list_remove(&exch->sess_link);
1050 list_remove(&exch->global_link);
1051 } else {
1052 /*
1053 * There are no available exchanges in the session.
1054 */
1055
1056 if ((mgmt == EXCHANGE_ATOMIC) ||
1057 (mgmt == EXCHANGE_SERIALIZE)) {
1058 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
1059 if (exch != NULL) {
1060 link_initialize(&exch->sess_link);
1061 link_initialize(&exch->global_link);
1062 exch->sess = sess;
1063 exch->phone = sess->phone;
1064 }
1065 } else if (mgmt == EXCHANGE_PARALLEL) {
1066 cap_phone_handle_t phone;
1067 errno_t rc;
1068
1069 retry:
1070 /*
1071 * Make a one-time attempt to connect a new data phone.
1072 */
1073 rc = async_connect_me_to_internal(sess->phone, sess->arg1,
1074 sess->arg2, sess->arg3, 0, &phone);
1075 if (rc == EOK) {
1076 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
1077 if (exch != NULL) {
1078 link_initialize(&exch->sess_link);
1079 link_initialize(&exch->global_link);
1080 exch->sess = sess;
1081 exch->phone = phone;
1082 } else
1083 async_hangup_internal(phone);
1084 } else if (!list_empty(&inactive_exch_list)) {
1085 /*
1086 * We did not manage to connect a new phone. But we
1087 * can try to close some of the currently inactive
1088 * connections in other sessions and try again.
1089 */
1090 exch = (async_exch_t *)
1091 list_get_instance(list_first(&inactive_exch_list),
1092 async_exch_t, global_link);
1093
1094 list_remove(&exch->sess_link);
1095 list_remove(&exch->global_link);
1096 async_hangup_internal(exch->phone);
1097 free(exch);
1098 goto retry;
1099 } else {
1100 /*
1101 * Wait for a phone to become available.
1102 */
1103 fibril_condvar_wait(&avail_phone_cv, &async_sess_mutex);
1104 goto retry;
1105 }
1106 }
1107 }
1108
1109 fibril_mutex_unlock(&async_sess_mutex);
1110
1111 if (exch != NULL) {
1112 atomic_inc(&sess->refcnt);
1113
1114 if (mgmt == EXCHANGE_SERIALIZE)
1115 fibril_mutex_lock(&sess->mutex);
1116 }
1117
1118 return exch;
1119}
1120
1121/** Finish an exchange.
1122 *
1123 * @param exch Exchange to finish.
1124 *
1125 */
1126void async_exchange_end(async_exch_t *exch)
1127{
1128 if (exch == NULL)
1129 return;
1130
1131 async_sess_t *sess = exch->sess;
1132 assert(sess != NULL);
1133
1134 exch_mgmt_t mgmt = sess->mgmt;
1135 if (sess->iface != 0)
1136 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
1137
1138 atomic_dec(&sess->refcnt);
1139
1140 if (mgmt == EXCHANGE_SERIALIZE)
1141 fibril_mutex_unlock(&sess->mutex);
1142
1143 fibril_mutex_lock(&async_sess_mutex);
1144
1145 list_append(&exch->sess_link, &sess->exch_list);
1146 list_append(&exch->global_link, &inactive_exch_list);
1147 fibril_condvar_signal(&avail_phone_cv);
1148
1149 fibril_mutex_unlock(&async_sess_mutex);
1150}
1151
1152/** Wrapper for IPC_M_SHARE_IN calls using the async framework.
1153 *
1154 * @param exch Exchange for sending the message.
1155 * @param size Size of the destination address space area.
1156 * @param arg User defined argument.
1157 * @param flags Storage for the received flags. Can be NULL.
1158 * @param dst Address of the storage for the destination address space area
1159 * base address. Cannot be NULL.
1160 *
1161 * @return Zero on success or an error code from errno.h.
1162 *
1163 */
1164errno_t async_share_in_start(async_exch_t *exch, size_t size, sysarg_t arg,
1165 unsigned int *flags, void **dst)
1166{
1167 if (exch == NULL)
1168 return ENOENT;
1169
1170 sysarg_t _flags = 0;
1171 sysarg_t _dst = (sysarg_t) -1;
1172 errno_t res = async_req_2_4(exch, IPC_M_SHARE_IN, (sysarg_t) size,
1173 arg, NULL, &_flags, NULL, &_dst);
1174
1175 if (flags)
1176 *flags = (unsigned int) _flags;
1177
1178 *dst = (void *) _dst;
1179 return res;
1180}
1181
1182/** Wrapper for IPC_M_SHARE_OUT calls using the async framework.
1183 *
1184 * @param exch Exchange for sending the message.
1185 * @param src Source address space area base address.
1186 * @param flags Flags to be used for sharing. Bits can be only cleared.
1187 *
1188 * @return Zero on success or an error code from errno.h.
1189 *
1190 */
1191errno_t async_share_out_start(async_exch_t *exch, void *src, unsigned int flags)
1192{
1193 if (exch == NULL)
1194 return ENOENT;
1195
1196 return async_req_3_0(exch, IPC_M_SHARE_OUT, (sysarg_t) src, 0,
1197 (sysarg_t) flags);
1198}
1199
1200/** Start IPC_M_DATA_READ using the async framework.
1201 *
1202 * @param exch Exchange for sending the message.
1203 * @param dst Address of the beginning of the destination buffer.
1204 * @param size Size of the destination buffer (in bytes).
1205 * @param dataptr Storage of call data (arg 2 holds actual data size).
1206 *
1207 * @return Hash of the sent message or 0 on error.
1208 *
1209 */
1210aid_t async_data_read(async_exch_t *exch, void *dst, size_t size,
1211 ipc_call_t *dataptr)
1212{
1213 return async_send_2(exch, IPC_M_DATA_READ, (sysarg_t) dst,
1214 (sysarg_t) size, dataptr);
1215}
1216
1217/** Wrapper for IPC_M_DATA_READ calls using the async framework.
1218 *
1219 * @param exch Exchange for sending the message.
1220 * @param dst Address of the beginning of the destination buffer.
1221 * @param size Size of the destination buffer.
1222 *
1223 * @return Zero on success or an error code from errno.h.
1224 *
1225 */
1226errno_t async_data_read_start(async_exch_t *exch, void *dst, size_t size)
1227{
1228 if (exch == NULL)
1229 return ENOENT;
1230
1231 return async_req_2_0(exch, IPC_M_DATA_READ, (sysarg_t) dst,
1232 (sysarg_t) size);
1233}
1234
1235/** Wrapper for IPC_M_DATA_WRITE calls using the async framework.
1236 *
1237 * @param exch Exchange for sending the message.
1238 * @param src Address of the beginning of the source buffer.
1239 * @param size Size of the source buffer.
1240 *
1241 * @return Zero on success or an error code from errno.h.
1242 *
1243 */
1244errno_t async_data_write_start(async_exch_t *exch, const void *src, size_t size)
1245{
1246 if (exch == NULL)
1247 return ENOENT;
1248
1249 return async_req_2_0(exch, IPC_M_DATA_WRITE, (sysarg_t) src,
1250 (sysarg_t) size);
1251}
1252
1253errno_t async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
1254 sysarg_t arg3, async_exch_t *other_exch)
1255{
1256 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
1257 arg1, arg2, arg3, 0, CAP_HANDLE_RAW(other_exch->phone));
1258}
1259
1260/** Lock and get session remote state
1261 *
1262 * Lock and get the local replica of the remote state
1263 * in stateful sessions. The call should be paired
1264 * with async_remote_state_release*().
1265 *
1266 * @param[in] sess Stateful session.
1267 *
1268 * @return Local replica of the remote state.
1269 *
1270 */
1271void *async_remote_state_acquire(async_sess_t *sess)
1272{
1273 fibril_mutex_lock(&sess->remote_state_mtx);
1274 return sess->remote_state_data;
1275}
1276
1277/** Update the session remote state
1278 *
1279 * Update the local replica of the remote state
1280 * in stateful sessions. The remote state must
1281 * be already locked.
1282 *
1283 * @param[in] sess Stateful session.
1284 * @param[in] state New local replica of the remote state.
1285 *
1286 */
1287void async_remote_state_update(async_sess_t *sess, void *state)
1288{
1289 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1290 sess->remote_state_data = state;
1291}
1292
1293/** Release the session remote state
1294 *
1295 * Unlock the local replica of the remote state
1296 * in stateful sessions.
1297 *
1298 * @param[in] sess Stateful session.
1299 *
1300 */
1301void async_remote_state_release(async_sess_t *sess)
1302{
1303 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1304
1305 fibril_mutex_unlock(&sess->remote_state_mtx);
1306}
1307
1308/** Release the session remote state and end an exchange
1309 *
1310 * Unlock the local replica of the remote state
1311 * in stateful sessions. This is convenience function
1312 * which gets the session pointer from the exchange
1313 * and also ends the exchange.
1314 *
1315 * @param[in] exch Stateful session's exchange.
1316 *
1317 */
1318void async_remote_state_release_exchange(async_exch_t *exch)
1319{
1320 if (exch == NULL)
1321 return;
1322
1323 async_sess_t *sess = exch->sess;
1324 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1325
1326 async_exchange_end(exch);
1327 fibril_mutex_unlock(&sess->remote_state_mtx);
1328}
1329
1330void *async_as_area_create(void *base, size_t size, unsigned int flags,
1331 async_sess_t *pager, sysarg_t id1, sysarg_t id2, sysarg_t id3)
1332{
1333 as_area_pager_info_t pager_info = {
1334 .pager = pager->phone,
1335 .id1 = id1,
1336 .id2 = id2,
1337 .id3 = id3
1338 };
1339 return as_area_create(base, size, flags, &pager_info);
1340}
1341
1342/** @}
1343 */
Note: See TracBrowser for help on using the repository browser.