source: mainline/uspace/lib/libc/generic/async.c@ 12f91130

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since 12f91130 was 12f91130, checked in by Jakub Jermar <jakub@…>, 18 years ago

Remove fibril_join().
We cannot guarantee our assumptions that easily.
This broken feature is removed instead of fixing because there are now users of it.

  • Property mode set to 100644
File size: 20.2 KB
RevLine 
[06502f7d]1/*
[df4ed85]2 * Copyright (c) 2006 Ondrej Palkovsky
[06502f7d]3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
[b2951e2]27 */
28
[a46da63]29/** @addtogroup libc
[b2951e2]30 * @{
31 */
32/** @file
[450cd3a]33 */
[06502f7d]34
[80649a91]35/**
36 * Asynchronous library
37 *
38 * The aim of this library is facilitating writing programs utilizing
[14de0dd8]39 * the asynchronous nature of HelenOS IPC, yet using a normal way
[80649a91]40 * of programming.
41 *
42 * You should be able to write very simple multithreaded programs,
43 * the async framework will automatically take care of most synchronization
44 * problems.
45 *
46 * Default semantics:
47 * - send() - send asynchronously. If the kernel refuses to send more
48 * messages, [ try to get responses from kernel, if nothing
49 * found, might try synchronous ]
50 *
51 * Example of use:
52 *
53 * 1) Multithreaded client application
[12f91130]54 * fibril_create(fibril1);
55 * fibril_create(fibril2);
[80649a91]56 * ...
57 *
[12f91130]58 * fibril1() {
[80649a91]59 * conn = ipc_connect_me_to();
60 * c1 = send(conn);
61 * c2 = send(conn);
62 * wait_for(c1);
63 * wait_for(c2);
64 * }
65 *
66 *
67 * 2) Multithreaded server application
68 * main() {
[53ca318]69 * async_manager();
[80649a91]70 * }
71 *
72 *
[53ca318]73 * client_connection(icallid, *icall) {
74 * if (want_refuse) {
75 * ipc_answer_fast(icallid, ELIMIT, 0, 0);
76 * return;
77 * }
78 * ipc_answer_fast(icallid, 0, 0, 0);
[80649a91]79 *
[53ca318]80 * callid = async_get_call(&call);
81 * handle(callid, call);
[14de0dd8]82 * ipc_answer_fast(callid, 1, 2, 3);
[53ca318]83 *
84 * callid = async_get_call(&call);
[80649a91]85 * ....
86 * }
[a2cd194]87 *
[80649a91]88 */
89#include <futex.h>
90#include <async.h>
[bc1f1c2]91#include <fibril.h>
[80649a91]92#include <stdio.h>
93#include <libadt/hash_table.h>
94#include <libadt/list.h>
95#include <ipc/ipc.h>
96#include <assert.h>
97#include <errno.h>
[c042bdd]98#include <time.h>
99#include <arch/barrier.h>
[80649a91]100
[fc42b28]101atomic_t async_futex = FUTEX_INITIALIZER;
[80649a91]102static hash_table_t conn_hash_table;
[c042bdd]103static LIST_INITIALIZE(timeout_list);
[80649a91]104
[01ff41c]105typedef struct {
[bc1f1c2]106 /** Expiration time for waiting fibril. */
107 struct timeval expires;
108 /** If true, this struct is in the timeout list. */
109 int inlist;
[49d072e]110 link_t link;
111
[bc1f1c2]112 /** Fibril waiting for this message. */
113 fid_t fid;
114 /** If this fibril is currently active. */
115 int active;
116 /** If true, we timed out. */
117 int timedout;
[49d072e]118} awaiter_t;
119
120typedef struct {
121 awaiter_t wdata;
122
[14de0dd8]123 int done; /**< If reply was received */
124 ipc_call_t *dataptr; /**< Pointer where the answer data
[bc1f1c2]125 * is stored */
[01ff41c]126 ipcarg_t retval;
127} amsg_t;
128
[80649a91]129typedef struct {
130 link_t link;
131 ipc_callid_t callid;
132 ipc_call_t call;
133} msg_t;
134
135typedef struct {
[49d072e]136 awaiter_t wdata;
137
[bc1f1c2]138 link_t link; /**< Hash table link. */
[14de0dd8]139 ipcarg_t in_phone_hash; /**< Incoming phone hash. */
[bc1f1c2]140 link_t msg_queue; /**< Messages that should be delivered
141 * to this fibril. */
[80649a91]142 /* Structures for connection opening packet */
143 ipc_callid_t callid;
144 ipc_call_t call;
[bc1f1c2]145 ipc_callid_t close_callid; /* Identification of closing packet. */
146 void (*cfibril)(ipc_callid_t, ipc_call_t *);
[80649a91]147} connection_t;
148
[bc1f1c2]149/** Identifier of the incoming connection handled by the current fibril. */
150__thread connection_t *FIBRIL_connection;
[085bd54]151/** If true, it is forbidden to use async_req functions and
152 * all preemption is disabled */
153__thread int in_interrupt_handler;
[80649a91]154
[da0c91e7]155static void default_client_connection(ipc_callid_t callid, ipc_call_t *call);
[51dbadf3]156static void default_interrupt_received(ipc_callid_t callid, ipc_call_t *call);
[da0c91e7]157static async_client_conn_t client_connection = default_client_connection;
[51dbadf3]158static async_client_conn_t interrupt_received = default_interrupt_received;
[da0c91e7]159
[c042bdd]160/** Add microseconds to give timeval */
161static void tv_add(struct timeval *tv, suseconds_t usecs)
162{
163 tv->tv_sec += usecs / 1000000;
164 tv->tv_usec += usecs % 1000000;
165 if (tv->tv_usec > 1000000) {
166 tv->tv_sec++;
167 tv->tv_usec -= 1000000;
168 }
169}
170
171/** Subtract 2 timevals, return microseconds difference */
172static suseconds_t tv_sub(struct timeval *tv1, struct timeval *tv2)
173{
174 suseconds_t result;
175
176 result = tv1->tv_usec - tv2->tv_usec;
177 result += (tv1->tv_sec - tv2->tv_sec) * 1000000;
178
179 return result;
180}
181
182/** Compare timeval
183 *
184 * @return 1 if tv1 > tv2, otherwise 0
185 */
186static int tv_gt(struct timeval *tv1, struct timeval *tv2)
187{
188 if (tv1->tv_sec > tv2->tv_sec)
189 return 1;
190 if (tv1->tv_sec == tv2->tv_sec && tv1->tv_usec > tv2->tv_usec)
191 return 1;
192 return 0;
193}
[c0e674a]194static int tv_gteq(struct timeval *tv1, struct timeval *tv2)
195{
196 if (tv1->tv_sec > tv2->tv_sec)
197 return 1;
198 if (tv1->tv_sec == tv2->tv_sec && tv1->tv_usec >= tv2->tv_usec)
199 return 1;
200 return 0;
201}
[c042bdd]202
[80649a91]203/* Hash table functions */
[a2cd194]204#define CONN_HASH_TABLE_CHAINS 32
[80649a91]205
206static hash_index_t conn_hash(unsigned long *key)
[450cd3a]207{
[80649a91]208 assert(key);
[a2cd194]209 return ((*key) >> 4) % CONN_HASH_TABLE_CHAINS;
[450cd3a]210}
[06502f7d]211
[80649a91]212static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
[450cd3a]213{
[80649a91]214 connection_t *hs;
215
216 hs = hash_table_get_instance(item, connection_t, link);
217
218 return key[0] == hs->in_phone_hash;
[450cd3a]219}
[06502f7d]220
[80649a91]221static void conn_remove(link_t *item)
[450cd3a]222{
[80649a91]223 free(hash_table_get_instance(item, connection_t, link));
[450cd3a]224}
225
[80649a91]226
227/** Operations for NS hash table. */
228static hash_table_operations_t conn_hash_table_ops = {
229 .hash = conn_hash,
230 .compare = conn_compare,
231 .remove_callback = conn_remove
232};
233
[49d072e]234/** Insert sort timeout msg into timeouts list
235 *
236 */
237static void insert_timeout(awaiter_t *wd)
238{
239 link_t *tmp;
240 awaiter_t *cur;
241
242 wd->timedout = 0;
[085bd54]243 wd->inlist = 1;
[49d072e]244
245 tmp = timeout_list.next;
246 while (tmp != &timeout_list) {
247 cur = list_get_instance(tmp, awaiter_t, link);
248 if (tv_gteq(&cur->expires, &wd->expires))
249 break;
250 tmp = tmp->next;
251 }
252 list_append(&wd->link, tmp);
253}
254
[01ff41c]255/*************************************************/
256
[80649a91]257/** Try to route a call to an appropriate connection thread
258 *
259 */
260static int route_call(ipc_callid_t callid, ipc_call_t *call)
[450cd3a]261{
[80649a91]262 connection_t *conn;
263 msg_t *msg;
264 link_t *hlp;
265 unsigned long key;
266
[01ff41c]267 futex_down(&async_futex);
[80649a91]268
269 key = call->in_phone_hash;
270 hlp = hash_table_find(&conn_hash_table, &key);
271 if (!hlp) {
[01ff41c]272 futex_up(&async_futex);
[80649a91]273 return 0;
[450cd3a]274 }
[80649a91]275 conn = hash_table_get_instance(hlp, connection_t, link);
276
277 msg = malloc(sizeof(*msg));
278 msg->callid = callid;
279 msg->call = *call;
280 list_append(&msg->link, &conn->msg_queue);
[41269bd]281
282 if (IPC_GET_METHOD(*call) == IPC_M_PHONE_HUNGUP)
283 conn->close_callid = callid;
[80649a91]284
[49d072e]285 /* If the call is waiting for event, run it */
286 if (!conn->wdata.active) {
287 /* If in timeout list, remove it */
288 if (conn->wdata.inlist) {
289 conn->wdata.inlist = 0;
290 list_remove(&conn->wdata.link);
291 }
292 conn->wdata.active = 1;
[bc1f1c2]293 fibril_add_ready(conn->wdata.fid);
[80649a91]294 }
295
[01ff41c]296 futex_up(&async_futex);
[80649a91]297
298 return 1;
299}
300
[a2cd194]301/** Return new incoming message for current(thread-local) connection */
[49d072e]302ipc_callid_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs)
[80649a91]303{
304 msg_t *msg;
305 ipc_callid_t callid;
[6c46350]306 connection_t *conn;
[80649a91]307
[bc1f1c2]308 assert(FIBRIL_connection);
309 /* GCC 4.1.0 coughs on FIBRIL_connection-> dereference,
[6c46350]310 * GCC 4.1.1 happilly puts the rdhwr instruction in delay slot.
311 * I would never expect to find so many errors in
312 * compiler *($&$(*&$
313 */
[bc1f1c2]314 conn = FIBRIL_connection;
[c0e674a]315
[01ff41c]316 futex_down(&async_futex);
[80649a91]317
[49d072e]318 if (usecs) {
[6c46350]319 gettimeofday(&conn->wdata.expires, NULL);
320 tv_add(&conn->wdata.expires, usecs);
[49d072e]321 } else {
[6c46350]322 conn->wdata.inlist = 0;
[49d072e]323 }
[80649a91]324 /* If nothing in queue, wait until something appears */
[6c46350]325 while (list_empty(&conn->msg_queue)) {
[085bd54]326 if (usecs)
[6c46350]327 insert_timeout(&conn->wdata);
[085bd54]328
[6c46350]329 conn->wdata.active = 0;
[bc1f1c2]330 fibril_schedule_next_adv(FIBRIL_TO_MANAGER);
[49d072e]331 /* Futex is up after getting back from async_manager
332 * get it again */
333 futex_down(&async_futex);
[bc1f1c2]334 if (usecs && conn->wdata.timedout &&
[6c46350]335 list_empty(&conn->msg_queue)) {
[49d072e]336 /* If we timed out-> exit */
337 futex_up(&async_futex);
338 return 0;
339 }
[450cd3a]340 }
341
[6c46350]342 msg = list_get_instance(conn->msg_queue.next, msg_t, link);
[80649a91]343 list_remove(&msg->link);
344 callid = msg->callid;
345 *call = msg->call;
346 free(msg);
347
[01ff41c]348 futex_up(&async_futex);
[80649a91]349 return callid;
350}
351
[a2cd194]352/** Thread function that gets created on new connection
353 *
354 * This function is defined as a weak symbol - to be redefined in
355 * user code.
356 */
[da0c91e7]357static void default_client_connection(ipc_callid_t callid, ipc_call_t *call)
[80649a91]358{
[a2cd194]359 ipc_answer_fast(callid, ENOENT, 0, 0);
[80649a91]360}
[51dbadf3]361static void default_interrupt_received(ipc_callid_t callid, ipc_call_t *call)
[44c6d88d]362{
363}
364
[a2cd194]365/** Wrapper for client connection thread
366 *
367 * When new connection arrives, thread with this function is created.
368 * It calls client_connection and does final cleanup.
369 *
[b2951e2]370 * @param arg Connection structure pointer
[a2cd194]371 */
[bc1f1c2]372static int connection_fibril(void *arg)
[80649a91]373{
[a2cd194]374 unsigned long key;
375 msg_t *msg;
[41269bd]376 int close_answered = 0;
[a2cd194]377
[80649a91]378 /* Setup thread local connection pointer */
[bc1f1c2]379 FIBRIL_connection = (connection_t *) arg;
380 FIBRIL_connection->cfibril(FIBRIL_connection->callid,
381 &FIBRIL_connection->call);
[a46da63]382
[a2cd194]383 /* Remove myself from connection hash table */
[01ff41c]384 futex_down(&async_futex);
[bc1f1c2]385 key = FIBRIL_connection->in_phone_hash;
[a2cd194]386 hash_table_remove(&conn_hash_table, &key, 1);
[01ff41c]387 futex_up(&async_futex);
[a46da63]388
[a2cd194]389 /* Answer all remaining messages with ehangup */
[bc1f1c2]390 while (!list_empty(&FIBRIL_connection->msg_queue)) {
391 msg = list_get_instance(FIBRIL_connection->msg_queue.next,
392 msg_t, link);
[a2cd194]393 list_remove(&msg->link);
[bc1f1c2]394 if (msg->callid == FIBRIL_connection->close_callid)
[41269bd]395 close_answered = 1;
[a2cd194]396 ipc_answer_fast(msg->callid, EHANGUP, 0, 0);
397 free(msg);
398 }
[bc1f1c2]399 if (FIBRIL_connection->close_callid)
400 ipc_answer_fast(FIBRIL_connection->close_callid, 0, 0, 0);
[a46da63]401
402 return 0;
[80649a91]403}
404
405/** Create new thread for a new connection
406 *
407 * Creates new thread for connection, fills in connection
408 * structures and inserts it into the hash table, so that
409 * later we can easily do routing of messages to particular
410 * threads.
[53ca318]411 *
[44c6d88d]412 * @param in_phone_hash Identification of the incoming connection
[53ca318]413 * @param callid Callid of the IPC_M_CONNECT_ME_TO packet
414 * @param call Call data of the opening packet
[bc1f1c2]415 * @param cfibril Fibril function that should be called upon
[53ca318]416 * opening the connection
[bc1f1c2]417 * @return New fibril id.
[80649a91]418 */
[bc1f1c2]419fid_t async_new_connection(ipcarg_t in_phone_hash, ipc_callid_t callid,
420 ipc_call_t *call, void (*cfibril)(ipc_callid_t, ipc_call_t *))
[80649a91]421{
422 connection_t *conn;
423 unsigned long key;
424
425 conn = malloc(sizeof(*conn));
426 if (!conn) {
427 ipc_answer_fast(callid, ENOMEM, 0, 0);
[53ca318]428 return NULL;
[80649a91]429 }
[44c6d88d]430 conn->in_phone_hash = in_phone_hash;
[80649a91]431 list_initialize(&conn->msg_queue);
432 conn->callid = callid;
[41269bd]433 conn->close_callid = 0;
[eaf34f7]434 if (call)
435 conn->call = *call;
[49d072e]436 conn->wdata.active = 1; /* We will activate it asap */
[bc1f1c2]437 conn->cfibril = cfibril;
[49d072e]438
[bc1f1c2]439 conn->wdata.fid = fibril_create(connection_fibril, conn);
440 if (!conn->wdata.fid) {
[80649a91]441 free(conn);
442 ipc_answer_fast(callid, ENOMEM, 0, 0);
[53ca318]443 return NULL;
[80649a91]444 }
[49d072e]445 /* Add connection to hash table */
[80649a91]446 key = conn->in_phone_hash;
[01ff41c]447 futex_down(&async_futex);
[80649a91]448 hash_table_insert(&conn_hash_table, &key, &conn->link);
[01ff41c]449 futex_up(&async_futex);
[80649a91]450
[bc1f1c2]451 fibril_add_ready(conn->wdata.fid);
[53ca318]452
[bc1f1c2]453 return conn->wdata.fid;
[80649a91]454}
455
[01ff41c]456/** Handle call that was received */
[80649a91]457static void handle_call(ipc_callid_t callid, ipc_call_t *call)
458{
[44c6d88d]459 /* Unrouted call - do some default behaviour */
[15039b67]460 if ((callid & IPC_CALLID_NOTIFICATION)) {
[085bd54]461 in_interrupt_handler = 1;
[51dbadf3]462 (*interrupt_received)(callid,call);
[085bd54]463 in_interrupt_handler = 0;
[44c6d88d]464 return;
[15039b67]465 }
466
467 switch (IPC_GET_METHOD(*call)) {
[80649a91]468 case IPC_M_CONNECT_ME_TO:
469 /* Open new connection with thread etc. */
[bc1f1c2]470 async_new_connection(IPC_GET_ARG3(*call), callid, call,
471 client_connection);
[44c6d88d]472 return;
[80649a91]473 }
[44c6d88d]474
475 /* Try to route call through connection tables */
476 if (route_call(callid, call))
477 return;
478
479 /* Unknown call from unknown phone - hang it up */
480 ipc_answer_fast(callid, EHANGUP, 0, 0);
[450cd3a]481}
482
[6c46350]483/** Fire all timeouts that expired
484 *
485 */
[c042bdd]486static void handle_expired_timeouts(void)
487{
488 struct timeval tv;
[49d072e]489 awaiter_t *waiter;
[c042bdd]490 link_t *cur;
491
492 gettimeofday(&tv,NULL);
493 futex_down(&async_futex);
494
495 cur = timeout_list.next;
496 while (cur != &timeout_list) {
[bc1f1c2]497 waiter = list_get_instance(cur, awaiter_t, link);
[49d072e]498 if (tv_gt(&waiter->expires, &tv))
[c042bdd]499 break;
500 cur = cur->next;
[49d072e]501 list_remove(&waiter->link);
502 waiter->inlist = 0;
503 waiter->timedout = 1;
[c042bdd]504 /* Redundant condition? The thread should not
505 * be active when it gets here.
506 */
[49d072e]507 if (!waiter->active) {
508 waiter->active = 1;
[bc1f1c2]509 fibril_add_ready(waiter->fid);
[c042bdd]510 }
511 }
512
513 futex_up(&async_futex);
514}
515
[80649a91]516/** Endless loop dispatching incoming calls and answers */
[085bd54]517static int async_manager_worker(void)
[80649a91]518{
519 ipc_call_t call;
520 ipc_callid_t callid;
[0b99e40]521 int timeout;
[49d072e]522 awaiter_t *waiter;
[c042bdd]523 struct timeval tv;
[80649a91]524
525 while (1) {
[bc1f1c2]526 if (fibril_schedule_next_adv(FIBRIL_FROM_MANAGER)) {
[a46da63]527 futex_up(&async_futex);
528 /* async_futex is always held
529 * when entering manager thread
530 */
[80649a91]531 continue;
532 }
[c042bdd]533 futex_down(&async_futex);
534 if (!list_empty(&timeout_list)) {
[bc1f1c2]535 waiter = list_get_instance(timeout_list.next, awaiter_t,
536 link);
537 gettimeofday(&tv, NULL);
[49d072e]538 if (tv_gteq(&tv, &waiter->expires)) {
[6c46350]539 futex_up(&async_futex);
[c042bdd]540 handle_expired_timeouts();
541 continue;
542 } else
[49d072e]543 timeout = tv_sub(&waiter->expires, &tv);
[c042bdd]544 } else
[0b99e40]545 timeout = SYNCH_NO_TIMEOUT;
[c042bdd]546 futex_up(&async_futex);
547
[2d22049]548 callid = ipc_wait_cycle(&call, timeout, SYNCH_FLAGS_NONE);
[0b99e40]549
550 if (!callid) {
[c042bdd]551 handle_expired_timeouts();
[0b99e40]552 continue;
553 }
[80649a91]554
[085bd54]555 if (callid & IPC_CALLID_ANSWERED) {
[80649a91]556 continue;
[085bd54]557 }
[01ff41c]558
[80649a91]559 handle_call(callid, &call);
560 }
[a46da63]561
562 return 0;
[80649a91]563}
564
[a2cd194]565/** Function to start async_manager as a standalone thread
566 *
567 * When more kernel threads are used, one async manager should
568 * exist per thread. The particular implementation may change,
569 * currently one async_manager is started automatically per kernel
570 * thread except main thread.
571 */
[80649a91]572static int async_manager_thread(void *arg)
573{
[a46da63]574 futex_up(&async_futex);
575 /* async_futex is always locked when entering
576 * manager */
[085bd54]577 async_manager_worker();
[a46da63]578
579 return 0;
[80649a91]580}
[450cd3a]581
[80649a91]582/** Add one manager to manager list */
583void async_create_manager(void)
[450cd3a]584{
[bc1f1c2]585 fid_t fid;
[80649a91]586
[bc1f1c2]587 fid = fibril_create(async_manager_thread, NULL);
588 fibril_add_manager(fid);
[80649a91]589}
590
591/** Remove one manager from manager list */
592void async_destroy_manager(void)
593{
[bc1f1c2]594 fibril_remove_manager();
[80649a91]595}
596
597/** Initialize internal structures needed for async manager */
598int _async_init(void)
599{
[bc1f1c2]600 if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_CHAINS, 1,
601 &conn_hash_table_ops)) {
[80649a91]602 printf("%s: cannot create hash table\n", "async");
603 return ENOMEM;
604 }
605
[a46da63]606 return 0;
[450cd3a]607}
[01ff41c]608
609/** IPC handler for messages in async framework
610 *
[bc1f1c2]611 * Notify the fibril which is waiting for this message, that it arrived
[01ff41c]612 */
613static void reply_received(void *private, int retval,
614 ipc_call_t *data)
615{
616 amsg_t *msg = (amsg_t *) private;
617
618 msg->retval = retval;
619
620 futex_down(&async_futex);
621 /* Copy data after futex_down, just in case the
622 * call was detached
623 */
624 if (msg->dataptr)
625 *msg->dataptr = *data;
[0b99e40]626
[c042bdd]627 write_barrier();
628 /* Remove message from timeout list */
[49d072e]629 if (msg->wdata.inlist)
630 list_remove(&msg->wdata.link);
[01ff41c]631 msg->done = 1;
[49d072e]632 if (! msg->wdata.active) {
633 msg->wdata.active = 1;
[bc1f1c2]634 fibril_add_ready(msg->wdata.fid);
[01ff41c]635 }
636 futex_up(&async_futex);
637}
638
639/** Send message and return id of the sent message
640 *
641 * The return value can be used as input for async_wait() to wait
642 * for completion.
643 */
644aid_t async_send_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2,
645 ipc_call_t *dataptr)
646{
647 amsg_t *msg;
648
[085bd54]649 if (in_interrupt_handler) {
[bc1f1c2]650 printf("Cannot send asynchronous request in interrupt "
651 "handler.\n");
[085bd54]652 _exit(1);
653 }
654
[01ff41c]655 msg = malloc(sizeof(*msg));
656 msg->done = 0;
657 msg->dataptr = dataptr;
[49d072e]658
659 msg->wdata.active = 1; /* We may sleep in next method, but it
660 * will use it's own mechanism */
[bc1f1c2]661 ipc_call_async_2(phoneid, method, arg1, arg2, msg, reply_received, 1);
[01ff41c]662
663 return (aid_t) msg;
664}
665
[90f5d64]666/** Send message and return id of the sent message
667 *
668 * The return value can be used as input for async_wait() to wait
669 * for completion.
670 */
671aid_t async_send_3(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2,
672 ipcarg_t arg3, ipc_call_t *dataptr)
673{
674 amsg_t *msg;
675
[085bd54]676 if (in_interrupt_handler) {
[2b017ba]677 printf("Cannot send asynchronous request in interrupt handler.\n");
[085bd54]678 _exit(1);
679 }
680
[90f5d64]681 msg = malloc(sizeof(*msg));
682 msg->done = 0;
683 msg->dataptr = dataptr;
684
685 msg->wdata.active = 1; /* We may sleep in next method, but it
686 * will use it's own mechanism */
[bc1f1c2]687 ipc_call_async_3(phoneid, method, arg1, arg2, arg3, msg, reply_received,
688 1);
[90f5d64]689
690 return (aid_t) msg;
691}
692
[01ff41c]693/** Wait for a message sent by async framework
694 *
695 * @param amsgid Message ID to wait for
696 * @param retval Pointer to variable where will be stored retval
697 * of the answered message. If NULL, it is ignored.
698 *
699 */
700void async_wait_for(aid_t amsgid, ipcarg_t *retval)
701{
702 amsg_t *msg = (amsg_t *) amsgid;
703
704 futex_down(&async_futex);
705 if (msg->done) {
706 futex_up(&async_futex);
707 goto done;
708 }
709
[bc1f1c2]710 msg->wdata.fid = fibril_get_id();
[49d072e]711 msg->wdata.active = 0;
712 msg->wdata.inlist = 0;
[01ff41c]713 /* Leave locked async_futex when entering this function */
[bc1f1c2]714 fibril_schedule_next_adv(FIBRIL_TO_MANAGER);
715 /* futex is up automatically after fibril_schedule_next...*/
[01ff41c]716done:
717 if (retval)
718 *retval = msg->retval;
719 free(msg);
720}
[0b99e40]721
[c042bdd]722/** Wait for a message sent by async framework with timeout
723 *
724 * @param amsgid Message ID to wait for
725 * @param retval Pointer to variable where will be stored retval
726 * of the answered message. If NULL, it is ignored.
727 * @param timeout Timeout in usecs
728 * @return 0 on success, ETIMEOUT if timeout expired
729 *
730 */
731int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, suseconds_t timeout)
732{
733 amsg_t *msg = (amsg_t *) amsgid;
[0b99e40]734
[86029498]735 /* TODO: Let it go through the event read at least once */
736 if (timeout < 0)
737 return ETIMEOUT;
738
[c042bdd]739 futex_down(&async_futex);
740 if (msg->done) {
741 futex_up(&async_futex);
742 goto done;
743 }
[0b99e40]744
[49d072e]745 gettimeofday(&msg->wdata.expires, NULL);
746 tv_add(&msg->wdata.expires, timeout);
747
[bc1f1c2]748 msg->wdata.fid = fibril_get_id();
[49d072e]749 msg->wdata.active = 0;
750 insert_timeout(&msg->wdata);
[0b99e40]751
[c042bdd]752 /* Leave locked async_futex when entering this function */
[bc1f1c2]753 fibril_schedule_next_adv(FIBRIL_TO_MANAGER);
754 /* futex is up automatically after fibril_schedule_next...*/
[0b99e40]755
[c042bdd]756 if (!msg->done)
757 return ETIMEOUT;
[0b99e40]758
[c042bdd]759done:
760 if (retval)
761 *retval = msg->retval;
762 free(msg);
763
764 return 0;
765}
[0b99e40]766
[44c6d88d]767/** Wait specified time, but in the meantime handle incoming events
768 *
769 * @param timeout Time in microseconds to wait
770 */
771void async_usleep(suseconds_t timeout)
772{
773 amsg_t *msg;
774
[085bd54]775 if (in_interrupt_handler) {
776 printf("Cannot call async_usleep in interrupt handler.\n");
777 _exit(1);
778 }
779
[44c6d88d]780 msg = malloc(sizeof(*msg));
781 if (!msg)
782 return;
783
[bc1f1c2]784 msg->wdata.fid = fibril_get_id();
[49d072e]785 msg->wdata.active = 0;
[44c6d88d]786
[49d072e]787 gettimeofday(&msg->wdata.expires, NULL);
788 tv_add(&msg->wdata.expires, timeout);
[44c6d88d]789
790 futex_down(&async_futex);
[49d072e]791 insert_timeout(&msg->wdata);
[44c6d88d]792 /* Leave locked async_futex when entering this function */
[bc1f1c2]793 fibril_schedule_next_adv(FIBRIL_TO_MANAGER);
794 /* futex is up automatically after fibril_schedule_next...*/
[44c6d88d]795 free(msg);
796}
[da0c91e7]797
798/** Set function that is called, IPC_M_CONNECT_ME_TO is received
799 *
800 * @param conn Function that will form new psthread.
801 */
802void async_set_client_connection(async_client_conn_t conn)
803{
804 client_connection = conn;
805}
[51dbadf3]806void async_set_interrupt_received(async_client_conn_t conn)
807{
808 interrupt_received = conn;
809}
[085bd54]810
811/* Primitive functions for simple communication */
812void async_msg_3(int phoneid, ipcarg_t method, ipcarg_t arg1,
813 ipcarg_t arg2, ipcarg_t arg3)
814{
[bc1f1c2]815 ipc_call_async_3(phoneid, method, arg1, arg2, arg3, NULL, NULL,
816 !in_interrupt_handler);
[085bd54]817}
818
819void async_msg_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2)
820{
[bc1f1c2]821 ipc_call_async_2(phoneid, method, arg1, arg2, NULL, NULL,
822 !in_interrupt_handler);
[085bd54]823}
[b2951e2]824
[a46da63]825/** @}
[b2951e2]826 */
Note: See TracBrowser for help on using the repository browser.