source: mainline/uspace/lib/c/generic/async/client.c@ 914c693

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since 914c693 was 914c693, checked in by Martin Decky <martin@…>, 7 years ago

remove obsolete non-interface connection routines

  • Property mode set to 100644
File size: 30.0 KB
Line 
1/*
2 * Copyright (c) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup libc
30 * @{
31 */
32/** @file
33 */
34
35/**
36 * Asynchronous library
37 *
38 * The aim of this library is to provide a facility for writing programs which
39 * utilize the asynchronous nature of HelenOS IPC, yet using a normal way of
40 * programming.
41 *
42 * You should be able to write very simple multithreaded programs. The async
43 * framework will automatically take care of most of the synchronization
44 * problems.
45 *
46 * Example of use (pseudo C):
47 *
48 * 1) Multithreaded client application
49 *
50 * fibril_create(fibril1, ...);
51 * fibril_create(fibril2, ...);
52 * ...
53 *
54 * int fibril1(void *arg)
55 * {
56 * conn = async_connect_me_to(...);
57 *
58 * exch = async_exchange_begin(conn);
59 * c1 = async_send(exch);
60 * async_exchange_end(exch);
61 *
62 * exch = async_exchange_begin(conn);
63 * c2 = async_send(exch);
64 * async_exchange_end(exch);
65 *
66 * async_wait_for(c1);
67 * async_wait_for(c2);
68 * ...
69 * }
70 *
71 *
72 * 2) Multithreaded server application
73 *
74 * main()
75 * {
76 * async_manager();
77 * }
78 *
79 * port_handler(ichandle, *icall)
80 * {
81 * if (want_refuse) {
82 * async_answer_0(ichandle, ELIMIT);
83 * return;
84 * }
85 * async_answer_0(ichandle, EOK);
86 *
87 * chandle = async_get_call(&call);
88 * somehow_handle_the_call(chandle, call);
89 * async_answer_2(chandle, 1, 2, 3);
90 *
91 * chandle = async_get_call(&call);
92 * ...
93 * }
94 *
95 */
96
97#define LIBC_ASYNC_C_
98#include <ipc/ipc.h>
99#include <async.h>
100#include "../private/async.h"
101#include "../private/ns.h"
102#undef LIBC_ASYNC_C_
103
104#include <ipc/irq.h>
105#include <ipc/event.h>
106#include <futex.h>
107#include <fibril.h>
108#include <adt/hash_table.h>
109#include <adt/hash.h>
110#include <adt/list.h>
111#include <assert.h>
112#include <errno.h>
113#include <sys/time.h>
114#include <libarch/barrier.h>
115#include <stdbool.h>
116#include <stdlib.h>
117#include <mem.h>
118#include <stdlib.h>
119#include <macros.h>
120#include <as.h>
121#include <abi/mm/as.h>
122#include "../private/libc.h"
123#include "../private/fibril.h"
124
125/** Naming service session */
126async_sess_t session_ns;
127
128/** Message data */
129typedef struct {
130 awaiter_t wdata;
131
132 /** If reply was received. */
133 bool done;
134
135 /** If the message / reply should be discarded on arrival. */
136 bool forget;
137
138 /** If already destroyed. */
139 bool destroyed;
140
141 /** Pointer to where the answer data is stored. */
142 ipc_call_t *dataptr;
143
144 errno_t retval;
145} amsg_t;
146
147static void to_event_initialize(to_event_t *to)
148{
149 struct timeval tv = { 0, 0 };
150
151 to->inlist = false;
152 to->occurred = false;
153 link_initialize(&to->link);
154 to->expires = tv;
155}
156
157static void wu_event_initialize(wu_event_t *wu)
158{
159 wu->inlist = false;
160 link_initialize(&wu->link);
161}
162
163void awaiter_initialize(awaiter_t *aw)
164{
165 aw->fid = 0;
166 aw->active = false;
167 to_event_initialize(&aw->to_event);
168 wu_event_initialize(&aw->wu_event);
169}
170
171static amsg_t *amsg_create(void)
172{
173 amsg_t *msg = malloc(sizeof(amsg_t));
174 if (msg) {
175 msg->done = false;
176 msg->forget = false;
177 msg->destroyed = false;
178 msg->dataptr = NULL;
179 msg->retval = EINVAL;
180 awaiter_initialize(&msg->wdata);
181 }
182
183 return msg;
184}
185
186static void amsg_destroy(amsg_t *msg)
187{
188 if (!msg)
189 return;
190
191 assert(!msg->destroyed);
192 msg->destroyed = true;
193 free(msg);
194}
195
/** Mutex guarding inactive_exch_list; also paired with avail_phone_cv.
 *
 */
static FIBRIL_MUTEX_INITIALIZE(async_sess_mutex);

/** Global list of all exchanges that are currently inactive.
 *
 */
static LIST_INITIALIZE(inactive_exch_list);

/** Condition variable for waiting until a phone becomes available.
 *
 */
static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
210
211/** Initialize the async framework.
212 *
213 */
214void __async_client_init(void)
215{
216 session_ns.iface = 0;
217 session_ns.mgmt = EXCHANGE_ATOMIC;
218 session_ns.phone = PHONE_NS;
219 session_ns.arg1 = 0;
220 session_ns.arg2 = 0;
221 session_ns.arg3 = 0;
222
223 fibril_mutex_initialize(&session_ns.remote_state_mtx);
224 session_ns.remote_state_data = NULL;
225
226 list_initialize(&session_ns.exch_list);
227 fibril_mutex_initialize(&session_ns.mutex);
228 atomic_set(&session_ns.refcnt, 0);
229}
230
231/** Reply received callback.
232 *
233 * This function is called whenever a reply for an asynchronous message sent out
234 * by the asynchronous framework is received.
235 *
236 * Notify the fibril which is waiting for this message that it has arrived.
237 *
238 * @param arg Pointer to the asynchronous message record.
239 * @param retval Value returned in the answer.
240 * @param data Call data of the answer.
241 *
242 */
243static void reply_received(void *arg, errno_t retval, ipc_call_t *data)
244{
245 assert(arg);
246
247 futex_lock(&async_futex);
248
249 amsg_t *msg = (amsg_t *) arg;
250 msg->retval = retval;
251
252 /* Copy data after futex_down, just in case the call was detached */
253 if ((msg->dataptr) && (data))
254 *msg->dataptr = *data;
255
256 write_barrier();
257
258 /* Remove message from timeout list */
259 if (msg->wdata.to_event.inlist)
260 list_remove(&msg->wdata.to_event.link);
261
262 msg->done = true;
263
264 if (msg->forget) {
265 assert(msg->wdata.active);
266 amsg_destroy(msg);
267 } else if (!msg->wdata.active) {
268 msg->wdata.active = true;
269 fibril_add_ready(msg->wdata.fid);
270 }
271
272 futex_unlock(&async_futex);
273}
274
275/** Send message and return id of the sent message.
276 *
277 * The return value can be used as input for async_wait() to wait for
278 * completion.
279 *
280 * @param exch Exchange for sending the message.
281 * @param imethod Service-defined interface and method.
282 * @param arg1 Service-defined payload argument.
283 * @param arg2 Service-defined payload argument.
284 * @param arg3 Service-defined payload argument.
285 * @param arg4 Service-defined payload argument.
286 * @param dataptr If non-NULL, storage where the reply data will be stored.
287 *
288 * @return Hash of the sent message or 0 on error.
289 *
290 */
291aid_t async_send_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
292 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr)
293{
294 if (exch == NULL)
295 return 0;
296
297 amsg_t *msg = amsg_create();
298 if (msg == NULL)
299 return 0;
300
301 msg->dataptr = dataptr;
302 msg->wdata.active = true;
303
304 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg,
305 reply_received);
306
307 return (aid_t) msg;
308}
309
310/** Send message and return id of the sent message
311 *
312 * The return value can be used as input for async_wait() to wait for
313 * completion.
314 *
315 * @param exch Exchange for sending the message.
316 * @param imethod Service-defined interface and method.
317 * @param arg1 Service-defined payload argument.
318 * @param arg2 Service-defined payload argument.
319 * @param arg3 Service-defined payload argument.
320 * @param arg4 Service-defined payload argument.
321 * @param arg5 Service-defined payload argument.
322 * @param dataptr If non-NULL, storage where the reply data will be
323 * stored.
324 *
325 * @return Hash of the sent message or 0 on error.
326 *
327 */
328aid_t async_send_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
329 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5,
330 ipc_call_t *dataptr)
331{
332 if (exch == NULL)
333 return 0;
334
335 amsg_t *msg = amsg_create();
336 if (msg == NULL)
337 return 0;
338
339 msg->dataptr = dataptr;
340 msg->wdata.active = true;
341
342 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,
343 msg, reply_received);
344
345 return (aid_t) msg;
346}
347
348/** Wait for a message sent by the async framework.
349 *
350 * @param amsgid Hash of the message to wait for.
351 * @param retval Pointer to storage where the retval of the answer will
352 * be stored.
353 *
354 */
355void async_wait_for(aid_t amsgid, errno_t *retval)
356{
357 if (amsgid == 0) {
358 if (retval)
359 *retval = ENOMEM;
360 return;
361 }
362
363 amsg_t *msg = (amsg_t *) amsgid;
364
365 futex_lock(&async_futex);
366
367 assert(!msg->forget);
368 assert(!msg->destroyed);
369
370 if (msg->done) {
371 futex_unlock(&async_futex);
372 goto done;
373 }
374
375 msg->wdata.fid = fibril_get_id();
376 msg->wdata.active = false;
377 msg->wdata.to_event.inlist = false;
378
379 /* Leave the async_futex locked when entering this function */
380 fibril_switch(FIBRIL_FROM_BLOCKED);
381 futex_unlock(&async_futex);
382
383done:
384 if (retval)
385 *retval = msg->retval;
386
387 amsg_destroy(msg);
388}
389
390/** Wait for a message sent by the async framework, timeout variant.
391 *
392 * If the wait times out, the caller may choose to either wait again by calling
393 * async_wait_for() or async_wait_timeout(), or forget the message via
394 * async_forget().
395 *
396 * @param amsgid Hash of the message to wait for.
397 * @param retval Pointer to storage where the retval of the answer will
398 * be stored.
399 * @param timeout Timeout in microseconds.
400 *
401 * @return Zero on success, ETIMEOUT if the timeout has expired.
402 *
403 */
404errno_t async_wait_timeout(aid_t amsgid, errno_t *retval, suseconds_t timeout)
405{
406 if (amsgid == 0) {
407 if (retval)
408 *retval = ENOMEM;
409 return EOK;
410 }
411
412 amsg_t *msg = (amsg_t *) amsgid;
413
414 futex_lock(&async_futex);
415
416 assert(!msg->forget);
417 assert(!msg->destroyed);
418
419 if (msg->done) {
420 futex_unlock(&async_futex);
421 goto done;
422 }
423
424 /*
425 * Negative timeout is converted to zero timeout to avoid
426 * using tv_add with negative augmenter.
427 */
428 if (timeout < 0)
429 timeout = 0;
430
431 getuptime(&msg->wdata.to_event.expires);
432 tv_add_diff(&msg->wdata.to_event.expires, timeout);
433
434 /*
435 * Current fibril is inserted as waiting regardless of the
436 * "size" of the timeout.
437 *
438 * Checking for msg->done and immediately bailing out when
439 * timeout == 0 would mean that the manager fibril would never
440 * run (consider single threaded program).
441 * Thus the IPC answer would be never retrieved from the kernel.
442 *
443 * Notice that the actual delay would be very small because we
444 * - switch to manager fibril
445 * - the manager sees expired timeout
446 * - and thus adds us back to ready queue
447 * - manager switches back to some ready fibril
448 * (prior it, it checks for incoming IPC).
449 *
450 */
451 msg->wdata.fid = fibril_get_id();
452 msg->wdata.active = false;
453 async_insert_timeout(&msg->wdata);
454
455 /* Leave the async_futex locked when entering this function */
456 fibril_switch(FIBRIL_FROM_BLOCKED);
457 futex_unlock(&async_futex);
458
459 if (!msg->done)
460 return ETIMEOUT;
461
462done:
463 if (retval)
464 *retval = msg->retval;
465
466 amsg_destroy(msg);
467
468 return 0;
469}
470
471/** Discard the message / reply on arrival.
472 *
473 * The message will be marked to be discarded once the reply arrives in
474 * reply_received(). It is not allowed to call async_wait_for() or
475 * async_wait_timeout() on this message after a call to this function.
476 *
477 * @param amsgid Hash of the message to forget.
478 */
479void async_forget(aid_t amsgid)
480{
481 if (amsgid == 0)
482 return;
483
484 amsg_t *msg = (amsg_t *) amsgid;
485
486 assert(!msg->forget);
487 assert(!msg->destroyed);
488
489 futex_lock(&async_futex);
490
491 if (msg->done) {
492 amsg_destroy(msg);
493 } else {
494 msg->dataptr = NULL;
495 msg->forget = true;
496 }
497
498 futex_unlock(&async_futex);
499}
500
501/** Wait for specified time.
502 *
503 * The current fibril is suspended but the thread continues to execute.
504 *
505 * @param timeout Duration of the wait in microseconds.
506 *
507 */
508void async_usleep(suseconds_t timeout)
509{
510 awaiter_t awaiter;
511 awaiter_initialize(&awaiter);
512
513 awaiter.fid = fibril_get_id();
514
515 getuptime(&awaiter.to_event.expires);
516 tv_add_diff(&awaiter.to_event.expires, timeout);
517
518 futex_lock(&async_futex);
519
520 async_insert_timeout(&awaiter);
521
522 /* Leave the async_futex locked when entering this function */
523 fibril_switch(FIBRIL_FROM_BLOCKED);
524 futex_unlock(&async_futex);
525}
526
/** Suspend the current fibril for the given number of seconds.
 *
 * @param sec Number of seconds to sleep.
 */
void async_sleep(unsigned int sec)
{
	/*
	 * The sleep is split into steps of at most 1000 seconds so that
	 * the full range of the argument is supported.
	 */
	unsigned int remaining = sec;

	while (remaining != 0) {
		unsigned int step = remaining;
		if (step > 1000)
			step = 1000;

		async_usleep(step * 1000000);
		remaining -= step;
	}
}
545
546/** Pseudo-synchronous message sending - fast version.
547 *
548 * Send message asynchronously and return only after the reply arrives.
549 *
550 * This function can only transfer 4 register payload arguments. For
551 * transferring more arguments, see the slower async_req_slow().
552 *
553 * @param exch Exchange for sending the message.
554 * @param imethod Interface and method of the call.
555 * @param arg1 Service-defined payload argument.
556 * @param arg2 Service-defined payload argument.
557 * @param arg3 Service-defined payload argument.
558 * @param arg4 Service-defined payload argument.
559 * @param r1 If non-NULL, storage for the 1st reply argument.
560 * @param r2 If non-NULL, storage for the 2nd reply argument.
561 * @param r3 If non-NULL, storage for the 3rd reply argument.
562 * @param r4 If non-NULL, storage for the 4th reply argument.
563 * @param r5 If non-NULL, storage for the 5th reply argument.
564 *
565 * @return Return code of the reply or an error code.
566 *
567 */
568errno_t async_req_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
569 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t *r1, sysarg_t *r2,
570 sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
571{
572 if (exch == NULL)
573 return ENOENT;
574
575 ipc_call_t result;
576 aid_t aid = async_send_4(exch, imethod, arg1, arg2, arg3, arg4,
577 &result);
578
579 errno_t rc;
580 async_wait_for(aid, &rc);
581
582 if (r1)
583 *r1 = IPC_GET_ARG1(result);
584
585 if (r2)
586 *r2 = IPC_GET_ARG2(result);
587
588 if (r3)
589 *r3 = IPC_GET_ARG3(result);
590
591 if (r4)
592 *r4 = IPC_GET_ARG4(result);
593
594 if (r5)
595 *r5 = IPC_GET_ARG5(result);
596
597 return rc;
598}
599
600/** Pseudo-synchronous message sending - slow version.
601 *
602 * Send message asynchronously and return only after the reply arrives.
603 *
604 * @param exch Exchange for sending the message.
605 * @param imethod Interface and method of the call.
606 * @param arg1 Service-defined payload argument.
607 * @param arg2 Service-defined payload argument.
608 * @param arg3 Service-defined payload argument.
609 * @param arg4 Service-defined payload argument.
610 * @param arg5 Service-defined payload argument.
611 * @param r1 If non-NULL, storage for the 1st reply argument.
612 * @param r2 If non-NULL, storage for the 2nd reply argument.
613 * @param r3 If non-NULL, storage for the 3rd reply argument.
614 * @param r4 If non-NULL, storage for the 4th reply argument.
615 * @param r5 If non-NULL, storage for the 5th reply argument.
616 *
617 * @return Return code of the reply or an error code.
618 *
619 */
620errno_t async_req_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
621 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, sysarg_t *r1,
622 sysarg_t *r2, sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
623{
624 if (exch == NULL)
625 return ENOENT;
626
627 ipc_call_t result;
628 aid_t aid = async_send_5(exch, imethod, arg1, arg2, arg3, arg4, arg5,
629 &result);
630
631 errno_t rc;
632 async_wait_for(aid, &rc);
633
634 if (r1)
635 *r1 = IPC_GET_ARG1(result);
636
637 if (r2)
638 *r2 = IPC_GET_ARG2(result);
639
640 if (r3)
641 *r3 = IPC_GET_ARG3(result);
642
643 if (r4)
644 *r4 = IPC_GET_ARG4(result);
645
646 if (r5)
647 *r5 = IPC_GET_ARG5(result);
648
649 return rc;
650}
651
652void async_msg_0(async_exch_t *exch, sysarg_t imethod)
653{
654 if (exch != NULL)
655 ipc_call_async_0(exch->phone, imethod, NULL, NULL);
656}
657
658void async_msg_1(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1)
659{
660 if (exch != NULL)
661 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL);
662}
663
664void async_msg_2(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
665 sysarg_t arg2)
666{
667 if (exch != NULL)
668 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL);
669}
670
671void async_msg_3(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
672 sysarg_t arg2, sysarg_t arg3)
673{
674 if (exch != NULL)
675 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL,
676 NULL);
677}
678
679void async_msg_4(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
680 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
681{
682 if (exch != NULL)
683 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
684 NULL, NULL);
685}
686
687void async_msg_5(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
688 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
689{
690 if (exch != NULL)
691 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
692 arg5, NULL, NULL);
693}
694
695static errno_t async_connect_me_to_internal(cap_phone_handle_t phone,
696 iface_t iface, sysarg_t arg2, sysarg_t arg3, sysarg_t flags,
697 cap_phone_handle_t *out_phone)
698{
699 ipc_call_t result;
700
701 // XXX: Workaround for GCC's inability to infer association between
702 // rc == EOK and *out_phone being assigned.
703 *out_phone = CAP_NIL;
704
705 amsg_t *msg = amsg_create();
706 if (!msg)
707 return ENOENT;
708
709 msg->dataptr = &result;
710 msg->wdata.active = true;
711
712 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, (sysarg_t) iface, arg2,
713 arg3, flags, msg, reply_received);
714
715 errno_t rc;
716 async_wait_for((aid_t) msg, &rc);
717
718 if (rc != EOK)
719 return rc;
720
721 *out_phone = (cap_phone_handle_t) IPC_GET_ARG5(result);
722 return EOK;
723}
724
725/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
726 *
727 * Ask through phone for a new connection to some service and block until
728 * success.
729 *
730 * @param exch Exchange for sending the message.
731 * @param iface Connection interface.
732 * @param arg2 User defined argument.
733 * @param arg3 User defined argument.
734 *
735 * @return New session on success or NULL on error.
736 *
737 */
738async_sess_t *async_connect_me_to(async_exch_t *exch, iface_t iface,
739 sysarg_t arg2, sysarg_t arg3)
740{
741 if (exch == NULL) {
742 errno = ENOENT;
743 return NULL;
744 }
745
746 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
747 if (sess == NULL) {
748 errno = ENOMEM;
749 return NULL;
750 }
751
752 cap_phone_handle_t phone;
753 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
754 arg3, 0, &phone);
755 if (rc != EOK) {
756 errno = rc;
757 free(sess);
758 return NULL;
759 }
760
761 sess->iface = iface;
762 sess->phone = phone;
763 sess->arg1 = iface;
764 sess->arg2 = arg2;
765 sess->arg3 = arg3;
766
767 fibril_mutex_initialize(&sess->remote_state_mtx);
768 sess->remote_state_data = NULL;
769
770 list_initialize(&sess->exch_list);
771 fibril_mutex_initialize(&sess->mutex);
772 atomic_set(&sess->refcnt, 0);
773
774 return sess;
775}
776
777/** Set arguments for new connections.
778 *
779 * FIXME This is an ugly hack to work around the problem that parallel
780 * exchanges are implemented using parallel connections. When we create
781 * a callback session, the framework does not know arguments for the new
782 * connections.
783 *
784 * The proper solution seems to be to implement parallel exchanges using
785 * tagging.
786 */
787void async_sess_args_set(async_sess_t *sess, iface_t iface, sysarg_t arg2,
788 sysarg_t arg3)
789{
790 sess->arg1 = iface;
791 sess->arg2 = arg2;
792 sess->arg3 = arg3;
793}
794
795/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
796 *
797 * Ask through phone for a new connection to some service and block until
798 * success.
799 *
800 * @param exch Exchange for sending the message.
801 * @param iface Connection interface.
802 * @param arg2 User defined argument.
803 * @param arg3 User defined argument.
804 *
805 * @return New session on success or NULL on error.
806 *
807 */
808async_sess_t *async_connect_me_to_blocking(async_exch_t *exch, iface_t iface,
809 sysarg_t arg2, sysarg_t arg3)
810{
811 if (exch == NULL) {
812 errno = ENOENT;
813 return NULL;
814 }
815
816 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
817 if (sess == NULL) {
818 errno = ENOMEM;
819 return NULL;
820 }
821
822 cap_phone_handle_t phone;
823 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
824 arg3, IPC_FLAG_BLOCKING, &phone);
825 if (rc != EOK) {
826 errno = rc;
827 free(sess);
828 return NULL;
829 }
830
831 sess->iface = iface;
832 sess->phone = phone;
833 sess->arg1 = iface;
834 sess->arg2 = arg2;
835 sess->arg3 = arg3;
836
837 fibril_mutex_initialize(&sess->remote_state_mtx);
838 sess->remote_state_data = NULL;
839
840 list_initialize(&sess->exch_list);
841 fibril_mutex_initialize(&sess->mutex);
842 atomic_set(&sess->refcnt, 0);
843
844 return sess;
845}
846
847/** Connect to a task specified by id.
848 *
849 */
850async_sess_t *async_connect_kbox(task_id_t id)
851{
852 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
853 if (sess == NULL) {
854 errno = ENOMEM;
855 return NULL;
856 }
857
858 cap_phone_handle_t phone;
859 errno_t rc = ipc_connect_kbox(id, &phone);
860 if (rc != EOK) {
861 errno = rc;
862 free(sess);
863 return NULL;
864 }
865
866 sess->iface = 0;
867 sess->mgmt = EXCHANGE_ATOMIC;
868 sess->phone = phone;
869 sess->arg1 = 0;
870 sess->arg2 = 0;
871 sess->arg3 = 0;
872
873 fibril_mutex_initialize(&sess->remote_state_mtx);
874 sess->remote_state_data = NULL;
875
876 list_initialize(&sess->exch_list);
877 fibril_mutex_initialize(&sess->mutex);
878 atomic_set(&sess->refcnt, 0);
879
880 return sess;
881}
882
883static errno_t async_hangup_internal(cap_phone_handle_t phone)
884{
885 return ipc_hangup(phone);
886}
887
888/** Wrapper for ipc_hangup.
889 *
890 * @param sess Session to hung up.
891 *
892 * @return Zero on success or an error code.
893 *
894 */
895errno_t async_hangup(async_sess_t *sess)
896{
897 async_exch_t *exch;
898
899 assert(sess);
900
901 if (atomic_get(&sess->refcnt) > 0)
902 return EBUSY;
903
904 fibril_mutex_lock(&async_sess_mutex);
905
906 errno_t rc = async_hangup_internal(sess->phone);
907
908 while (!list_empty(&sess->exch_list)) {
909 exch = (async_exch_t *)
910 list_get_instance(list_first(&sess->exch_list),
911 async_exch_t, sess_link);
912
913 list_remove(&exch->sess_link);
914 list_remove(&exch->global_link);
915 async_hangup_internal(exch->phone);
916 free(exch);
917 }
918
919 free(sess);
920
921 fibril_mutex_unlock(&async_sess_mutex);
922
923 return rc;
924}
925
926/** Start new exchange in a session.
927 *
928 * @param session Session.
929 *
930 * @return New exchange or NULL on error.
931 *
932 */
933async_exch_t *async_exchange_begin(async_sess_t *sess)
934{
935 if (sess == NULL)
936 return NULL;
937
938 exch_mgmt_t mgmt = sess->mgmt;
939 if (sess->iface != 0)
940 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
941
942 async_exch_t *exch = NULL;
943
944 fibril_mutex_lock(&async_sess_mutex);
945
946 if (!list_empty(&sess->exch_list)) {
947 /*
948 * There are inactive exchanges in the session.
949 */
950 exch = (async_exch_t *)
951 list_get_instance(list_first(&sess->exch_list),
952 async_exch_t, sess_link);
953
954 list_remove(&exch->sess_link);
955 list_remove(&exch->global_link);
956 } else {
957 /*
958 * There are no available exchanges in the session.
959 */
960
961 if ((mgmt == EXCHANGE_ATOMIC) ||
962 (mgmt == EXCHANGE_SERIALIZE)) {
963 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
964 if (exch != NULL) {
965 link_initialize(&exch->sess_link);
966 link_initialize(&exch->global_link);
967 exch->sess = sess;
968 exch->phone = sess->phone;
969 }
970 } else if (mgmt == EXCHANGE_PARALLEL) {
971 cap_phone_handle_t phone;
972 errno_t rc;
973
974 retry:
975 /*
976 * Make a one-time attempt to connect a new data phone.
977 */
978 rc = async_connect_me_to_internal(sess->phone, sess->arg1,
979 sess->arg2, sess->arg3, 0, &phone);
980 if (rc == EOK) {
981 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
982 if (exch != NULL) {
983 link_initialize(&exch->sess_link);
984 link_initialize(&exch->global_link);
985 exch->sess = sess;
986 exch->phone = phone;
987 } else
988 async_hangup_internal(phone);
989 } else if (!list_empty(&inactive_exch_list)) {
990 /*
991 * We did not manage to connect a new phone. But we
992 * can try to close some of the currently inactive
993 * connections in other sessions and try again.
994 */
995 exch = (async_exch_t *)
996 list_get_instance(list_first(&inactive_exch_list),
997 async_exch_t, global_link);
998
999 list_remove(&exch->sess_link);
1000 list_remove(&exch->global_link);
1001 async_hangup_internal(exch->phone);
1002 free(exch);
1003 goto retry;
1004 } else {
1005 /*
1006 * Wait for a phone to become available.
1007 */
1008 fibril_condvar_wait(&avail_phone_cv, &async_sess_mutex);
1009 goto retry;
1010 }
1011 }
1012 }
1013
1014 fibril_mutex_unlock(&async_sess_mutex);
1015
1016 if (exch != NULL) {
1017 atomic_inc(&sess->refcnt);
1018
1019 if (mgmt == EXCHANGE_SERIALIZE)
1020 fibril_mutex_lock(&sess->mutex);
1021 }
1022
1023 return exch;
1024}
1025
1026/** Finish an exchange.
1027 *
1028 * @param exch Exchange to finish.
1029 *
1030 */
1031void async_exchange_end(async_exch_t *exch)
1032{
1033 if (exch == NULL)
1034 return;
1035
1036 async_sess_t *sess = exch->sess;
1037 assert(sess != NULL);
1038
1039 exch_mgmt_t mgmt = sess->mgmt;
1040 if (sess->iface != 0)
1041 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
1042
1043 atomic_dec(&sess->refcnt);
1044
1045 if (mgmt == EXCHANGE_SERIALIZE)
1046 fibril_mutex_unlock(&sess->mutex);
1047
1048 fibril_mutex_lock(&async_sess_mutex);
1049
1050 list_append(&exch->sess_link, &sess->exch_list);
1051 list_append(&exch->global_link, &inactive_exch_list);
1052 fibril_condvar_signal(&avail_phone_cv);
1053
1054 fibril_mutex_unlock(&async_sess_mutex);
1055}
1056
1057/** Wrapper for IPC_M_SHARE_IN calls using the async framework.
1058 *
1059 * @param exch Exchange for sending the message.
1060 * @param size Size of the destination address space area.
1061 * @param arg User defined argument.
1062 * @param flags Storage for the received flags. Can be NULL.
1063 * @param dst Address of the storage for the destination address space area
1064 * base address. Cannot be NULL.
1065 *
1066 * @return Zero on success or an error code from errno.h.
1067 *
1068 */
1069errno_t async_share_in_start(async_exch_t *exch, size_t size, sysarg_t arg,
1070 unsigned int *flags, void **dst)
1071{
1072 if (exch == NULL)
1073 return ENOENT;
1074
1075 sysarg_t _flags = 0;
1076 sysarg_t _dst = (sysarg_t) -1;
1077 errno_t res = async_req_2_4(exch, IPC_M_SHARE_IN, (sysarg_t) size,
1078 arg, NULL, &_flags, NULL, &_dst);
1079
1080 if (flags)
1081 *flags = (unsigned int) _flags;
1082
1083 *dst = (void *) _dst;
1084 return res;
1085}
1086
1087/** Wrapper for IPC_M_SHARE_OUT calls using the async framework.
1088 *
1089 * @param exch Exchange for sending the message.
1090 * @param src Source address space area base address.
1091 * @param flags Flags to be used for sharing. Bits can be only cleared.
1092 *
1093 * @return Zero on success or an error code from errno.h.
1094 *
1095 */
1096errno_t async_share_out_start(async_exch_t *exch, void *src, unsigned int flags)
1097{
1098 if (exch == NULL)
1099 return ENOENT;
1100
1101 return async_req_3_0(exch, IPC_M_SHARE_OUT, (sysarg_t) src, 0,
1102 (sysarg_t) flags);
1103}
1104
1105/** Start IPC_M_DATA_READ using the async framework.
1106 *
1107 * @param exch Exchange for sending the message.
1108 * @param dst Address of the beginning of the destination buffer.
1109 * @param size Size of the destination buffer (in bytes).
1110 * @param dataptr Storage of call data (arg 2 holds actual data size).
1111 *
1112 * @return Hash of the sent message or 0 on error.
1113 *
1114 */
1115aid_t async_data_read(async_exch_t *exch, void *dst, size_t size,
1116 ipc_call_t *dataptr)
1117{
1118 return async_send_2(exch, IPC_M_DATA_READ, (sysarg_t) dst,
1119 (sysarg_t) size, dataptr);
1120}
1121
1122/** Wrapper for IPC_M_DATA_READ calls using the async framework.
1123 *
1124 * @param exch Exchange for sending the message.
1125 * @param dst Address of the beginning of the destination buffer.
1126 * @param size Size of the destination buffer.
1127 *
1128 * @return Zero on success or an error code from errno.h.
1129 *
1130 */
1131errno_t async_data_read_start(async_exch_t *exch, void *dst, size_t size)
1132{
1133 if (exch == NULL)
1134 return ENOENT;
1135
1136 return async_req_2_0(exch, IPC_M_DATA_READ, (sysarg_t) dst,
1137 (sysarg_t) size);
1138}
1139
1140/** Wrapper for IPC_M_DATA_WRITE calls using the async framework.
1141 *
1142 * @param exch Exchange for sending the message.
1143 * @param src Address of the beginning of the source buffer.
1144 * @param size Size of the source buffer.
1145 *
1146 * @return Zero on success or an error code from errno.h.
1147 *
1148 */
1149errno_t async_data_write_start(async_exch_t *exch, const void *src, size_t size)
1150{
1151 if (exch == NULL)
1152 return ENOENT;
1153
1154 return async_req_2_0(exch, IPC_M_DATA_WRITE, (sysarg_t) src,
1155 (sysarg_t) size);
1156}
1157
1158errno_t async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
1159 sysarg_t arg3, async_exch_t *other_exch)
1160{
1161 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
1162 arg1, arg2, arg3, 0, CAP_HANDLE_RAW(other_exch->phone));
1163}
1164
1165/** Lock and get session remote state
1166 *
1167 * Lock and get the local replica of the remote state
1168 * in stateful sessions. The call should be paired
1169 * with async_remote_state_release*().
1170 *
1171 * @param[in] sess Stateful session.
1172 *
1173 * @return Local replica of the remote state.
1174 *
1175 */
1176void *async_remote_state_acquire(async_sess_t *sess)
1177{
1178 fibril_mutex_lock(&sess->remote_state_mtx);
1179 return sess->remote_state_data;
1180}
1181
1182/** Update the session remote state
1183 *
1184 * Update the local replica of the remote state
1185 * in stateful sessions. The remote state must
1186 * be already locked.
1187 *
1188 * @param[in] sess Stateful session.
1189 * @param[in] state New local replica of the remote state.
1190 *
1191 */
1192void async_remote_state_update(async_sess_t *sess, void *state)
1193{
1194 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1195 sess->remote_state_data = state;
1196}
1197
1198/** Release the session remote state
1199 *
1200 * Unlock the local replica of the remote state
1201 * in stateful sessions.
1202 *
1203 * @param[in] sess Stateful session.
1204 *
1205 */
1206void async_remote_state_release(async_sess_t *sess)
1207{
1208 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1209
1210 fibril_mutex_unlock(&sess->remote_state_mtx);
1211}
1212
1213/** Release the session remote state and end an exchange
1214 *
1215 * Unlock the local replica of the remote state
1216 * in stateful sessions. This is convenience function
1217 * which gets the session pointer from the exchange
1218 * and also ends the exchange.
1219 *
1220 * @param[in] exch Stateful session's exchange.
1221 *
1222 */
1223void async_remote_state_release_exchange(async_exch_t *exch)
1224{
1225 if (exch == NULL)
1226 return;
1227
1228 async_sess_t *sess = exch->sess;
1229 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1230
1231 async_exchange_end(exch);
1232 fibril_mutex_unlock(&sess->remote_state_mtx);
1233}
1234
1235void *async_as_area_create(void *base, size_t size, unsigned int flags,
1236 async_sess_t *pager, sysarg_t id1, sysarg_t id2, sysarg_t id3)
1237{
1238 as_area_pager_info_t pager_info = {
1239 .pager = pager->phone,
1240 .id1 = id1,
1241 .id2 = id2,
1242 .id3 = id3
1243 };
1244 return as_area_create(base, size, flags, &pager_info);
1245}
1246
1247/** @}
1248 */
Note: See TracBrowser for help on using the repository browser.