source: mainline/uspace/lib/c/generic/async/client.c@ 05208d9

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since 05208d9 was 514d561, checked in by Jiří Zárevúcky <jiri.zarevucky@…>, 7 years ago

Fibril/async implementation overhaul.

This commit marks the move towards treating the fibril library as a mere
implementation of a generic threading interface. Understood as a layer that
wraps the kernel threads, we not only have to wrap threading itself, but also
every syscall that blocks the kernel thread (by blocking, we mean thread not
doing useful work until an external event happens — e.g. locking a kernel
mutex or thread sleep is understood as blocking, but an as_area_create() is not,
despite potentially taking a long time to complete).

Consequently, we implement fibril_ipc_wait() as a fibril-native wrapper for
kernel's ipc_wait(), and also implement timer functionality like timeouts
as part of the fibril library. This removes the interdependency between fibril
implementation and the async framework — in theory, the fibril API could be
reimplemented as a simple 1:1 shim, and the async framework would continue
working normally (note that the current implementation of loader complicates
this).

To better isolate the fibril internals from the implementation of high-level
synchronization, a fibril_event_t is added. This object conceptually acts
like a single slot wait queue. All other synchronization is implemented in
terms of this primitive.

  • Property mode set to 100644
File size: 26.8 KB
RevLine 
[49a796f1]1/*
2 * Copyright (c) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/** @addtogroup libc
30 * @{
31 */
32/** @file
33 */
34
35/**
36 * Asynchronous library
37 *
38 * The aim of this library is to provide a facility for writing programs which
39 * utilize the asynchronous nature of HelenOS IPC, yet using a normal way of
40 * programming.
41 *
42 * You should be able to write very simple multithreaded programs. The async
43 * framework will automatically take care of most of the synchronization
44 * problems.
45 *
46 * Example of use (pseudo C):
47 *
48 * 1) Multithreaded client application
49 *
50 * fibril_create(fibril1, ...);
51 * fibril_create(fibril2, ...);
52 * ...
53 *
54 * int fibril1(void *arg)
55 * {
56 * conn = async_connect_me_to(...);
57 *
58 * exch = async_exchange_begin(conn);
59 * c1 = async_send(exch);
60 * async_exchange_end(exch);
61 *
62 * exch = async_exchange_begin(conn);
63 * c2 = async_send(exch);
64 * async_exchange_end(exch);
65 *
66 * async_wait_for(c1);
67 * async_wait_for(c2);
68 * ...
69 * }
70 *
71 *
72 * 2) Multithreaded server application
73 *
74 * main()
75 * {
76 * async_manager();
77 * }
78 *
79 * port_handler(ichandle, *icall)
80 * {
81 * if (want_refuse) {
82 * async_answer_0(ichandle, ELIMIT);
83 * return;
84 * }
85 * async_answer_0(ichandle, EOK);
86 *
87 * chandle = async_get_call(&call);
88 * somehow_handle_the_call(chandle, call);
89 * async_answer_2(chandle, 1, 2, 3);
90 *
91 * chandle = async_get_call(&call);
92 * ...
93 * }
94 *
95 */
96
97#define LIBC_ASYNC_C_
98#include <ipc/ipc.h>
99#include <async.h>
100#include "../private/async.h"
[9f272d9]101#include "../private/ns.h"
[49a796f1]102#undef LIBC_ASYNC_C_
103
104#include <ipc/irq.h>
105#include <ipc/event.h>
106#include <fibril.h>
107#include <adt/hash_table.h>
108#include <adt/hash.h>
109#include <adt/list.h>
110#include <assert.h>
111#include <errno.h>
112#include <sys/time.h>
113#include <libarch/barrier.h>
114#include <stdbool.h>
115#include <stdlib.h>
116#include <mem.h>
117#include <stdlib.h>
118#include <macros.h>
119#include <as.h>
120#include <abi/mm/as.h>
121#include "../private/libc.h"
[d73d992]122#include "../private/fibril.h"
[49a796f1]123
124/** Naming service session */
[9f272d9]125async_sess_t session_ns;
[49a796f1]126
127/** Message data */
128typedef struct {
[514d561]129 fibril_event_t received;
[49a796f1]130
131 /** If reply was received. */
132 bool done;
133
134 /** If the message / reply should be discarded on arrival. */
135 bool forget;
136
137 /** Pointer to where the answer data is stored. */
138 ipc_call_t *dataptr;
139
140 errno_t retval;
141} amsg_t;
142
143static amsg_t *amsg_create(void)
144{
[514d561]145 return calloc(1, sizeof(amsg_t));
[49a796f1]146}
147
148static void amsg_destroy(amsg_t *msg)
149{
150 free(msg);
151}
152
153/** Mutex protecting inactive_exch_list and avail_phone_cv.
154 *
155 */
156static FIBRIL_MUTEX_INITIALIZE(async_sess_mutex);
157
158/** List of all currently inactive exchanges.
159 *
160 */
161static LIST_INITIALIZE(inactive_exch_list);
162
163/** Condition variable to wait for a phone to become available.
164 *
165 */
166static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
167
168/** Initialize the async framework.
169 *
170 */
171void __async_client_init(void)
172{
[9f272d9]173 session_ns.iface = 0;
174 session_ns.mgmt = EXCHANGE_ATOMIC;
175 session_ns.phone = PHONE_NS;
176 session_ns.arg1 = 0;
177 session_ns.arg2 = 0;
178 session_ns.arg3 = 0;
179
180 fibril_mutex_initialize(&session_ns.remote_state_mtx);
181 session_ns.remote_state_data = NULL;
182
183 list_initialize(&session_ns.exch_list);
184 fibril_mutex_initialize(&session_ns.mutex);
185 atomic_set(&session_ns.refcnt, 0);
[49a796f1]186}
187
188/** Reply received callback.
189 *
190 * This function is called whenever a reply for an asynchronous message sent out
191 * by the asynchronous framework is received.
192 *
193 * Notify the fibril which is waiting for this message that it has arrived.
194 *
195 * @param arg Pointer to the asynchronous message record.
196 * @param retval Value returned in the answer.
197 * @param data Call data of the answer.
198 *
199 */
[d054ad3]200void async_reply_received(ipc_call_t *data)
[49a796f1]201{
[d054ad3]202 amsg_t *msg = data->label;
203 if (!msg)
204 return;
[49a796f1]205
[95838f1]206 futex_lock(&async_futex);
[49a796f1]207
[d054ad3]208 msg->retval = IPC_GET_RETVAL(*data);
[49a796f1]209
[514d561]210 /* Copy data inside lock, just in case the call was detached */
[49a796f1]211 if ((msg->dataptr) && (data))
212 *msg->dataptr = *data;
213
214 msg->done = true;
215
216 if (msg->forget) {
217 amsg_destroy(msg);
[514d561]218 } else {
219 fibril_notify(&msg->received);
[49a796f1]220 }
221
[95838f1]222 futex_unlock(&async_futex);
[49a796f1]223}
224
225/** Send message and return id of the sent message.
226 *
227 * The return value can be used as input for async_wait() to wait for
228 * completion.
229 *
230 * @param exch Exchange for sending the message.
231 * @param imethod Service-defined interface and method.
232 * @param arg1 Service-defined payload argument.
233 * @param arg2 Service-defined payload argument.
234 * @param arg3 Service-defined payload argument.
235 * @param arg4 Service-defined payload argument.
236 * @param dataptr If non-NULL, storage where the reply data will be stored.
237 *
238 * @return Hash of the sent message or 0 on error.
239 *
240 */
241aid_t async_send_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
242 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr)
243{
244 if (exch == NULL)
245 return 0;
246
247 amsg_t *msg = amsg_create();
248 if (msg == NULL)
249 return 0;
250
251 msg->dataptr = dataptr;
252
[d054ad3]253 errno_t rc = ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3,
254 arg4, msg);
255 if (rc != EOK) {
256 msg->retval = rc;
257 msg->done = true;
258 }
[49a796f1]259
260 return (aid_t) msg;
261}
262
263/** Send message and return id of the sent message
264 *
265 * The return value can be used as input for async_wait() to wait for
266 * completion.
267 *
268 * @param exch Exchange for sending the message.
269 * @param imethod Service-defined interface and method.
270 * @param arg1 Service-defined payload argument.
271 * @param arg2 Service-defined payload argument.
272 * @param arg3 Service-defined payload argument.
273 * @param arg4 Service-defined payload argument.
274 * @param arg5 Service-defined payload argument.
275 * @param dataptr If non-NULL, storage where the reply data will be
276 * stored.
277 *
278 * @return Hash of the sent message or 0 on error.
279 *
280 */
281aid_t async_send_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
282 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5,
283 ipc_call_t *dataptr)
284{
285 if (exch == NULL)
286 return 0;
287
288 amsg_t *msg = amsg_create();
289 if (msg == NULL)
290 return 0;
291
292 msg->dataptr = dataptr;
293
[d054ad3]294 errno_t rc = ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3,
295 arg4, arg5, msg);
296 if (rc != EOK) {
297 msg->retval = rc;
298 msg->done = true;
299 }
[49a796f1]300
301 return (aid_t) msg;
302}
303
304/** Wait for a message sent by the async framework.
305 *
306 * @param amsgid Hash of the message to wait for.
307 * @param retval Pointer to storage where the retval of the answer will
308 * be stored.
309 *
310 */
311void async_wait_for(aid_t amsgid, errno_t *retval)
312{
[bd9e868]313 if (amsgid == 0) {
314 if (retval)
315 *retval = ENOMEM;
316 return;
317 }
[49a796f1]318
319 amsg_t *msg = (amsg_t *) amsgid;
[514d561]320 fibril_wait_for(&msg->received);
[49a796f1]321
322 if (retval)
323 *retval = msg->retval;
324
325 amsg_destroy(msg);
326}
327
328/** Wait for a message sent by the async framework, timeout variant.
329 *
330 * If the wait times out, the caller may choose to either wait again by calling
331 * async_wait_for() or async_wait_timeout(), or forget the message via
332 * async_forget().
333 *
334 * @param amsgid Hash of the message to wait for.
335 * @param retval Pointer to storage where the retval of the answer will
336 * be stored.
337 * @param timeout Timeout in microseconds.
338 *
339 * @return Zero on success, ETIMEOUT if the timeout has expired.
340 *
341 */
342errno_t async_wait_timeout(aid_t amsgid, errno_t *retval, suseconds_t timeout)
343{
[bd9e868]344 if (amsgid == 0) {
345 if (retval)
346 *retval = ENOMEM;
347 return EOK;
348 }
[49a796f1]349
350 amsg_t *msg = (amsg_t *) amsgid;
351
352 /*
353 * Negative timeout is converted to zero timeout to avoid
354 * using tv_add with negative augmenter.
355 */
356 if (timeout < 0)
357 timeout = 0;
358
[514d561]359 struct timeval expires;
360 getuptime(&expires);
361 tv_add_diff(&expires, timeout);
[49a796f1]362
[514d561]363 errno_t rc = fibril_wait_timeout(&msg->received, &expires);
364 if (rc != EOK)
365 return rc;
[49a796f1]366
367 if (retval)
368 *retval = msg->retval;
369
370 amsg_destroy(msg);
371
[514d561]372 return EOK;
[49a796f1]373}
374
375/** Discard the message / reply on arrival.
376 *
377 * The message will be marked to be discarded once the reply arrives in
378 * reply_received(). It is not allowed to call async_wait_for() or
379 * async_wait_timeout() on this message after a call to this function.
380 *
381 * @param amsgid Hash of the message to forget.
382 */
383void async_forget(aid_t amsgid)
384{
[bd9e868]385 if (amsgid == 0)
386 return;
387
[49a796f1]388 amsg_t *msg = (amsg_t *) amsgid;
389
390 assert(!msg->forget);
391
[95838f1]392 futex_lock(&async_futex);
[49a796f1]393
394 if (msg->done) {
395 amsg_destroy(msg);
396 } else {
397 msg->dataptr = NULL;
398 msg->forget = true;
399 }
400
[95838f1]401 futex_unlock(&async_futex);
[49a796f1]402}
403
404/** Pseudo-synchronous message sending - fast version.
405 *
406 * Send message asynchronously and return only after the reply arrives.
407 *
408 * This function can only transfer 4 register payload arguments. For
409 * transferring more arguments, see the slower async_req_slow().
410 *
411 * @param exch Exchange for sending the message.
412 * @param imethod Interface and method of the call.
413 * @param arg1 Service-defined payload argument.
414 * @param arg2 Service-defined payload argument.
415 * @param arg3 Service-defined payload argument.
416 * @param arg4 Service-defined payload argument.
417 * @param r1 If non-NULL, storage for the 1st reply argument.
418 * @param r2 If non-NULL, storage for the 2nd reply argument.
419 * @param r3 If non-NULL, storage for the 3rd reply argument.
420 * @param r4 If non-NULL, storage for the 4th reply argument.
421 * @param r5 If non-NULL, storage for the 5th reply argument.
422 *
423 * @return Return code of the reply or an error code.
424 *
425 */
426errno_t async_req_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
427 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t *r1, sysarg_t *r2,
428 sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
429{
430 if (exch == NULL)
431 return ENOENT;
432
433 ipc_call_t result;
434 aid_t aid = async_send_4(exch, imethod, arg1, arg2, arg3, arg4,
435 &result);
436
437 errno_t rc;
438 async_wait_for(aid, &rc);
439
440 if (r1)
441 *r1 = IPC_GET_ARG1(result);
442
443 if (r2)
444 *r2 = IPC_GET_ARG2(result);
445
446 if (r3)
447 *r3 = IPC_GET_ARG3(result);
448
449 if (r4)
450 *r4 = IPC_GET_ARG4(result);
451
452 if (r5)
453 *r5 = IPC_GET_ARG5(result);
454
455 return rc;
456}
457
458/** Pseudo-synchronous message sending - slow version.
459 *
460 * Send message asynchronously and return only after the reply arrives.
461 *
462 * @param exch Exchange for sending the message.
463 * @param imethod Interface and method of the call.
464 * @param arg1 Service-defined payload argument.
465 * @param arg2 Service-defined payload argument.
466 * @param arg3 Service-defined payload argument.
467 * @param arg4 Service-defined payload argument.
468 * @param arg5 Service-defined payload argument.
469 * @param r1 If non-NULL, storage for the 1st reply argument.
470 * @param r2 If non-NULL, storage for the 2nd reply argument.
471 * @param r3 If non-NULL, storage for the 3rd reply argument.
472 * @param r4 If non-NULL, storage for the 4th reply argument.
473 * @param r5 If non-NULL, storage for the 5th reply argument.
474 *
475 * @return Return code of the reply or an error code.
476 *
477 */
478errno_t async_req_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
479 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, sysarg_t *r1,
480 sysarg_t *r2, sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
481{
482 if (exch == NULL)
483 return ENOENT;
484
485 ipc_call_t result;
486 aid_t aid = async_send_5(exch, imethod, arg1, arg2, arg3, arg4, arg5,
487 &result);
488
489 errno_t rc;
490 async_wait_for(aid, &rc);
491
492 if (r1)
493 *r1 = IPC_GET_ARG1(result);
494
495 if (r2)
496 *r2 = IPC_GET_ARG2(result);
497
498 if (r3)
499 *r3 = IPC_GET_ARG3(result);
500
501 if (r4)
502 *r4 = IPC_GET_ARG4(result);
503
504 if (r5)
505 *r5 = IPC_GET_ARG5(result);
506
507 return rc;
508}
509
510void async_msg_0(async_exch_t *exch, sysarg_t imethod)
511{
512 if (exch != NULL)
[d054ad3]513 ipc_call_async_0(exch->phone, imethod, NULL);
[49a796f1]514}
515
516void async_msg_1(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1)
517{
518 if (exch != NULL)
[d054ad3]519 ipc_call_async_1(exch->phone, imethod, arg1, NULL);
[49a796f1]520}
521
522void async_msg_2(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
523 sysarg_t arg2)
524{
525 if (exch != NULL)
[d054ad3]526 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL);
[49a796f1]527}
528
529void async_msg_3(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
530 sysarg_t arg2, sysarg_t arg3)
531{
532 if (exch != NULL)
[d054ad3]533 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL);
[49a796f1]534}
535
536void async_msg_4(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
537 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
538{
539 if (exch != NULL)
540 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
[d054ad3]541 NULL);
[49a796f1]542}
543
544void async_msg_5(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
545 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
546{
547 if (exch != NULL)
548 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
[d054ad3]549 arg5, NULL);
[49a796f1]550}
551
552static errno_t async_connect_me_to_internal(cap_phone_handle_t phone,
[914c693]553 iface_t iface, sysarg_t arg2, sysarg_t arg3, sysarg_t flags,
[49a796f1]554 cap_phone_handle_t *out_phone)
555{
556 ipc_call_t result;
557
558 // XXX: Workaround for GCC's inability to infer association between
559 // rc == EOK and *out_phone being assigned.
560 *out_phone = CAP_NIL;
561
562 amsg_t *msg = amsg_create();
563 if (!msg)
564 return ENOENT;
565
566 msg->dataptr = &result;
567
[d054ad3]568 errno_t rc = ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO,
569 (sysarg_t) iface, arg2, arg3, flags, msg);
570 if (rc != EOK) {
571 msg->retval = rc;
572 msg->done = true;
573 }
[49a796f1]574
575 async_wait_for((aid_t) msg, &rc);
576
577 if (rc != EOK)
578 return rc;
579
580 *out_phone = (cap_phone_handle_t) IPC_GET_ARG5(result);
581 return EOK;
582}
583
584/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
585 *
586 * Ask through phone for a new connection to some service and block until
587 * success.
588 *
589 * @param exch Exchange for sending the message.
590 * @param iface Connection interface.
591 * @param arg2 User defined argument.
592 * @param arg3 User defined argument.
593 *
594 * @return New session on success or NULL on error.
595 *
596 */
[914c693]597async_sess_t *async_connect_me_to(async_exch_t *exch, iface_t iface,
[49a796f1]598 sysarg_t arg2, sysarg_t arg3)
599{
600 if (exch == NULL) {
601 errno = ENOENT;
602 return NULL;
603 }
604
605 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
606 if (sess == NULL) {
607 errno = ENOMEM;
608 return NULL;
609 }
610
611 cap_phone_handle_t phone;
612 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
613 arg3, 0, &phone);
614 if (rc != EOK) {
615 errno = rc;
616 free(sess);
617 return NULL;
618 }
619
620 sess->iface = iface;
621 sess->phone = phone;
622 sess->arg1 = iface;
623 sess->arg2 = arg2;
624 sess->arg3 = arg3;
625
626 fibril_mutex_initialize(&sess->remote_state_mtx);
627 sess->remote_state_data = NULL;
628
629 list_initialize(&sess->exch_list);
630 fibril_mutex_initialize(&sess->mutex);
631 atomic_set(&sess->refcnt, 0);
632
633 return sess;
634}
635
636/** Set arguments for new connections.
637 *
638 * FIXME This is an ugly hack to work around the problem that parallel
639 * exchanges are implemented using parallel connections. When we create
640 * a callback session, the framework does not know arguments for the new
641 * connections.
642 *
643 * The proper solution seems to be to implement parallel exchanges using
644 * tagging.
645 */
[914c693]646void async_sess_args_set(async_sess_t *sess, iface_t iface, sysarg_t arg2,
[49a796f1]647 sysarg_t arg3)
648{
[914c693]649 sess->arg1 = iface;
[49a796f1]650 sess->arg2 = arg2;
651 sess->arg3 = arg3;
652}
653
654/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
655 *
656 * Ask through phone for a new connection to some service and block until
657 * success.
658 *
659 * @param exch Exchange for sending the message.
660 * @param iface Connection interface.
661 * @param arg2 User defined argument.
662 * @param arg3 User defined argument.
663 *
664 * @return New session on success or NULL on error.
665 *
666 */
[914c693]667async_sess_t *async_connect_me_to_blocking(async_exch_t *exch, iface_t iface,
[49a796f1]668 sysarg_t arg2, sysarg_t arg3)
669{
670 if (exch == NULL) {
671 errno = ENOENT;
672 return NULL;
673 }
674
675 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
676 if (sess == NULL) {
677 errno = ENOMEM;
678 return NULL;
679 }
680
681 cap_phone_handle_t phone;
682 errno_t rc = async_connect_me_to_internal(exch->phone, iface, arg2,
683 arg3, IPC_FLAG_BLOCKING, &phone);
684 if (rc != EOK) {
685 errno = rc;
686 free(sess);
687 return NULL;
688 }
689
690 sess->iface = iface;
691 sess->phone = phone;
692 sess->arg1 = iface;
693 sess->arg2 = arg2;
694 sess->arg3 = arg3;
695
696 fibril_mutex_initialize(&sess->remote_state_mtx);
697 sess->remote_state_data = NULL;
698
699 list_initialize(&sess->exch_list);
700 fibril_mutex_initialize(&sess->mutex);
701 atomic_set(&sess->refcnt, 0);
702
703 return sess;
704}
705
706/** Connect to a task specified by id.
707 *
708 */
709async_sess_t *async_connect_kbox(task_id_t id)
710{
711 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
712 if (sess == NULL) {
713 errno = ENOMEM;
714 return NULL;
715 }
716
717 cap_phone_handle_t phone;
718 errno_t rc = ipc_connect_kbox(id, &phone);
719 if (rc != EOK) {
720 errno = rc;
721 free(sess);
722 return NULL;
723 }
724
725 sess->iface = 0;
726 sess->mgmt = EXCHANGE_ATOMIC;
727 sess->phone = phone;
728 sess->arg1 = 0;
729 sess->arg2 = 0;
730 sess->arg3 = 0;
731
732 fibril_mutex_initialize(&sess->remote_state_mtx);
733 sess->remote_state_data = NULL;
734
735 list_initialize(&sess->exch_list);
736 fibril_mutex_initialize(&sess->mutex);
737 atomic_set(&sess->refcnt, 0);
738
739 return sess;
740}
741
742static errno_t async_hangup_internal(cap_phone_handle_t phone)
743{
744 return ipc_hangup(phone);
745}
746
747/** Wrapper for ipc_hangup.
748 *
749 * @param sess Session to hung up.
750 *
751 * @return Zero on success or an error code.
752 *
753 */
754errno_t async_hangup(async_sess_t *sess)
755{
756 async_exch_t *exch;
757
758 assert(sess);
759
760 if (atomic_get(&sess->refcnt) > 0)
761 return EBUSY;
762
763 fibril_mutex_lock(&async_sess_mutex);
764
765 errno_t rc = async_hangup_internal(sess->phone);
766
767 while (!list_empty(&sess->exch_list)) {
768 exch = (async_exch_t *)
769 list_get_instance(list_first(&sess->exch_list),
770 async_exch_t, sess_link);
771
772 list_remove(&exch->sess_link);
773 list_remove(&exch->global_link);
774 async_hangup_internal(exch->phone);
775 free(exch);
776 }
777
778 free(sess);
779
780 fibril_mutex_unlock(&async_sess_mutex);
781
782 return rc;
783}
784
785/** Start new exchange in a session.
786 *
787 * @param session Session.
788 *
789 * @return New exchange or NULL on error.
790 *
791 */
792async_exch_t *async_exchange_begin(async_sess_t *sess)
793{
794 if (sess == NULL)
795 return NULL;
796
797 exch_mgmt_t mgmt = sess->mgmt;
798 if (sess->iface != 0)
799 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
800
801 async_exch_t *exch = NULL;
802
803 fibril_mutex_lock(&async_sess_mutex);
804
805 if (!list_empty(&sess->exch_list)) {
806 /*
807 * There are inactive exchanges in the session.
808 */
809 exch = (async_exch_t *)
810 list_get_instance(list_first(&sess->exch_list),
811 async_exch_t, sess_link);
812
813 list_remove(&exch->sess_link);
814 list_remove(&exch->global_link);
815 } else {
816 /*
817 * There are no available exchanges in the session.
818 */
819
820 if ((mgmt == EXCHANGE_ATOMIC) ||
821 (mgmt == EXCHANGE_SERIALIZE)) {
822 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
823 if (exch != NULL) {
824 link_initialize(&exch->sess_link);
825 link_initialize(&exch->global_link);
826 exch->sess = sess;
827 exch->phone = sess->phone;
828 }
829 } else if (mgmt == EXCHANGE_PARALLEL) {
830 cap_phone_handle_t phone;
831 errno_t rc;
832
833 retry:
834 /*
835 * Make a one-time attempt to connect a new data phone.
836 */
837 rc = async_connect_me_to_internal(sess->phone, sess->arg1,
838 sess->arg2, sess->arg3, 0, &phone);
839 if (rc == EOK) {
840 exch = (async_exch_t *) malloc(sizeof(async_exch_t));
841 if (exch != NULL) {
842 link_initialize(&exch->sess_link);
843 link_initialize(&exch->global_link);
844 exch->sess = sess;
845 exch->phone = phone;
846 } else
847 async_hangup_internal(phone);
848 } else if (!list_empty(&inactive_exch_list)) {
849 /*
850 * We did not manage to connect a new phone. But we
851 * can try to close some of the currently inactive
852 * connections in other sessions and try again.
853 */
854 exch = (async_exch_t *)
855 list_get_instance(list_first(&inactive_exch_list),
856 async_exch_t, global_link);
857
858 list_remove(&exch->sess_link);
859 list_remove(&exch->global_link);
860 async_hangup_internal(exch->phone);
861 free(exch);
862 goto retry;
863 } else {
864 /*
865 * Wait for a phone to become available.
866 */
867 fibril_condvar_wait(&avail_phone_cv, &async_sess_mutex);
868 goto retry;
869 }
870 }
871 }
872
873 fibril_mutex_unlock(&async_sess_mutex);
874
875 if (exch != NULL) {
876 atomic_inc(&sess->refcnt);
877
878 if (mgmt == EXCHANGE_SERIALIZE)
879 fibril_mutex_lock(&sess->mutex);
880 }
881
882 return exch;
883}
884
885/** Finish an exchange.
886 *
887 * @param exch Exchange to finish.
888 *
889 */
890void async_exchange_end(async_exch_t *exch)
891{
892 if (exch == NULL)
893 return;
894
895 async_sess_t *sess = exch->sess;
896 assert(sess != NULL);
897
898 exch_mgmt_t mgmt = sess->mgmt;
899 if (sess->iface != 0)
900 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
901
902 atomic_dec(&sess->refcnt);
903
904 if (mgmt == EXCHANGE_SERIALIZE)
905 fibril_mutex_unlock(&sess->mutex);
906
907 fibril_mutex_lock(&async_sess_mutex);
908
909 list_append(&exch->sess_link, &sess->exch_list);
910 list_append(&exch->global_link, &inactive_exch_list);
911 fibril_condvar_signal(&avail_phone_cv);
912
913 fibril_mutex_unlock(&async_sess_mutex);
914}
915
916/** Wrapper for IPC_M_SHARE_IN calls using the async framework.
917 *
918 * @param exch Exchange for sending the message.
919 * @param size Size of the destination address space area.
920 * @param arg User defined argument.
921 * @param flags Storage for the received flags. Can be NULL.
922 * @param dst Address of the storage for the destination address space area
923 * base address. Cannot be NULL.
924 *
925 * @return Zero on success or an error code from errno.h.
926 *
927 */
928errno_t async_share_in_start(async_exch_t *exch, size_t size, sysarg_t arg,
929 unsigned int *flags, void **dst)
930{
931 if (exch == NULL)
932 return ENOENT;
933
934 sysarg_t _flags = 0;
935 sysarg_t _dst = (sysarg_t) -1;
936 errno_t res = async_req_2_4(exch, IPC_M_SHARE_IN, (sysarg_t) size,
937 arg, NULL, &_flags, NULL, &_dst);
938
939 if (flags)
940 *flags = (unsigned int) _flags;
941
942 *dst = (void *) _dst;
943 return res;
944}
945
946/** Wrapper for IPC_M_SHARE_OUT calls using the async framework.
947 *
948 * @param exch Exchange for sending the message.
949 * @param src Source address space area base address.
950 * @param flags Flags to be used for sharing. Bits can be only cleared.
951 *
952 * @return Zero on success or an error code from errno.h.
953 *
954 */
955errno_t async_share_out_start(async_exch_t *exch, void *src, unsigned int flags)
956{
957 if (exch == NULL)
958 return ENOENT;
959
960 return async_req_3_0(exch, IPC_M_SHARE_OUT, (sysarg_t) src, 0,
961 (sysarg_t) flags);
962}
963
964/** Start IPC_M_DATA_READ using the async framework.
965 *
966 * @param exch Exchange for sending the message.
967 * @param dst Address of the beginning of the destination buffer.
968 * @param size Size of the destination buffer (in bytes).
969 * @param dataptr Storage of call data (arg 2 holds actual data size).
970 *
971 * @return Hash of the sent message or 0 on error.
972 *
973 */
974aid_t async_data_read(async_exch_t *exch, void *dst, size_t size,
975 ipc_call_t *dataptr)
976{
977 return async_send_2(exch, IPC_M_DATA_READ, (sysarg_t) dst,
978 (sysarg_t) size, dataptr);
979}
980
981/** Wrapper for IPC_M_DATA_READ calls using the async framework.
982 *
983 * @param exch Exchange for sending the message.
984 * @param dst Address of the beginning of the destination buffer.
985 * @param size Size of the destination buffer.
986 *
987 * @return Zero on success or an error code from errno.h.
988 *
989 */
990errno_t async_data_read_start(async_exch_t *exch, void *dst, size_t size)
991{
992 if (exch == NULL)
993 return ENOENT;
994
995 return async_req_2_0(exch, IPC_M_DATA_READ, (sysarg_t) dst,
996 (sysarg_t) size);
997}
998
999/** Wrapper for IPC_M_DATA_WRITE calls using the async framework.
1000 *
1001 * @param exch Exchange for sending the message.
1002 * @param src Address of the beginning of the source buffer.
1003 * @param size Size of the source buffer.
1004 *
1005 * @return Zero on success or an error code from errno.h.
1006 *
1007 */
1008errno_t async_data_write_start(async_exch_t *exch, const void *src, size_t size)
1009{
1010 if (exch == NULL)
1011 return ENOENT;
1012
1013 return async_req_2_0(exch, IPC_M_DATA_WRITE, (sysarg_t) src,
1014 (sysarg_t) size);
1015}
1016
1017errno_t async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
1018 sysarg_t arg3, async_exch_t *other_exch)
1019{
1020 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
1021 arg1, arg2, arg3, 0, CAP_HANDLE_RAW(other_exch->phone));
1022}
1023
1024/** Lock and get session remote state
1025 *
1026 * Lock and get the local replica of the remote state
1027 * in stateful sessions. The call should be paired
1028 * with async_remote_state_release*().
1029 *
1030 * @param[in] sess Stateful session.
1031 *
1032 * @return Local replica of the remote state.
1033 *
1034 */
1035void *async_remote_state_acquire(async_sess_t *sess)
1036{
1037 fibril_mutex_lock(&sess->remote_state_mtx);
1038 return sess->remote_state_data;
1039}
1040
1041/** Update the session remote state
1042 *
1043 * Update the local replica of the remote state
1044 * in stateful sessions. The remote state must
1045 * be already locked.
1046 *
1047 * @param[in] sess Stateful session.
1048 * @param[in] state New local replica of the remote state.
1049 *
1050 */
1051void async_remote_state_update(async_sess_t *sess, void *state)
1052{
1053 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1054 sess->remote_state_data = state;
1055}
1056
1057/** Release the session remote state
1058 *
1059 * Unlock the local replica of the remote state
1060 * in stateful sessions.
1061 *
1062 * @param[in] sess Stateful session.
1063 *
1064 */
1065void async_remote_state_release(async_sess_t *sess)
1066{
1067 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1068
1069 fibril_mutex_unlock(&sess->remote_state_mtx);
1070}
1071
1072/** Release the session remote state and end an exchange
1073 *
1074 * Unlock the local replica of the remote state
1075 * in stateful sessions. This is convenience function
1076 * which gets the session pointer from the exchange
1077 * and also ends the exchange.
1078 *
1079 * @param[in] exch Stateful session's exchange.
1080 *
1081 */
1082void async_remote_state_release_exchange(async_exch_t *exch)
1083{
1084 if (exch == NULL)
1085 return;
1086
1087 async_sess_t *sess = exch->sess;
1088 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
1089
1090 async_exchange_end(exch);
1091 fibril_mutex_unlock(&sess->remote_state_mtx);
1092}
1093
1094void *async_as_area_create(void *base, size_t size, unsigned int flags,
1095 async_sess_t *pager, sysarg_t id1, sysarg_t id2, sysarg_t id3)
1096{
1097 as_area_pager_info_t pager_info = {
1098 .pager = pager->phone,
1099 .id1 = id1,
1100 .id2 = id2,
1101 .id3 = id3
1102 };
1103 return as_area_create(base, size, flags, &pager_info);
1104}
1105
1106/** @}
1107 */
Note: See TracBrowser for help on using the repository browser.