source: mainline/libc/generic/async.c@ c042bdd

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since c042bdd was c042bdd, checked in by Ondrej Palkovsky <ondrap@…>, 19 years ago

Added support for async_wait_timeout.

  • Property mode set to 100644
File size: 15.3 KB
Line 
1/*
2 * Copyright (C) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/**
30 * Asynchronous library
31 *
32 * The aim of this library is to facilitate writing programs that utilize
33 * the asynchronous nature of HelenOS IPC while still using a conventional,
34 * sequential style of programming.
35 *
36 * You should be able to write very simple multithreaded programs,
37 * the async framework will automatically take care of most synchronization
38 * problems.
39 *
40 * Default semantics:
41 * - send() - send asynchronously. If the kernel refuses to send more
42 * messages, [ try to get responses from kernel, if nothing
43 * found, might try synchronous ]
44 *
45 * Example of use:
46 *
47 * 1) Multithreaded client application
48 * create_thread(thread1);
49 * create_thread(thread2);
50 * ...
51 *
52 * thread1() {
53 * conn = ipc_connect_me_to();
54 * c1 = send(conn);
55 * c2 = send(conn);
56 * wait_for(c1);
57 * wait_for(c2);
58 * }
59 *
60 *
61 * 2) Multithreaded server application
62 * main() {
63 * async_manager();
64 * }
65 *
66 *
67 * client_connection(icallid, *icall) {
68 * if (want_refuse) {
69 * ipc_answer_fast(icallid, ELIMIT, 0, 0);
70 * return;
71 * }
72 * ipc_answer_fast(icallid, 0, 0, 0);
73 *
74 * callid = async_get_call(&call);
75 * handle(callid, call);
76 * ipc_answer_fast(callid, 1,2,3);
77 *
78 * callid = async_get_call(&call);
79 * ....
80 * }
81 *
82 * TODO: Detaching/joining dead psthreads?
83 */
84#include <futex.h>
85#include <async.h>
86#include <psthread.h>
87#include <stdio.h>
88#include <libadt/hash_table.h>
89#include <libadt/list.h>
90#include <ipc/ipc.h>
91#include <assert.h>
92#include <errno.h>
93#include <time.h>
94#include <arch/barrier.h>
95
96static atomic_t async_futex = FUTEX_INITIALIZER;
97static hash_table_t conn_hash_table;
98static LIST_INITIALIZE(timeout_list);
99
100typedef struct {
101 pstid_t ptid; /**< Thread waiting for this message */
102 int active; /**< If this thread is currently active */
103 int done; /**< If reply was received */
104 ipc_call_t *dataptr; /**< Pointer where the answer data
105 * should be stored */
106 struct timeval expires; /**< Expiration time for waiting thread */
107 int has_timeout; /**< If true, this struct is in timeout list */
108 link_t link;
109
110 ipcarg_t retval;
111} amsg_t;
112
113typedef struct {
114 link_t link;
115 ipc_callid_t callid;
116 ipc_call_t call;
117} msg_t;
118
119typedef struct {
120 link_t link;
121 ipcarg_t in_phone_hash; /**< Incoming phone hash. */
122 link_t msg_queue; /**< Messages that should be delivered to this thread */
123 pstid_t ptid; /**< Thread associated with this connection */
124 int active; /**< If this thread is currently active */
125 /* Structures for connection opening packet */
126 ipc_callid_t callid;
127 ipc_call_t call;
128 void (*cthread)(ipc_callid_t,ipc_call_t *);
129} connection_t;
130
131__thread connection_t *PS_connection;
132
/** Add microseconds to the given timeval.
 *
 * The result is normalised so that 0 <= tv_usec < 1000000, provided the
 * input timeval was normalised as well.
 *
 * @param tv	Time value to modify in place.
 * @param usecs	Number of microseconds to add; may be negative.
 */
static void tv_add(struct timeval *tv, suseconds_t usecs)
{
	tv->tv_sec += usecs / 1000000;
	tv->tv_usec += usecs % 1000000;

	/*
	 * Carry into seconds. The comparison must be >=, otherwise a sum of
	 * exactly 1000000 microseconds would be left un-normalised.
	 */
	if (tv->tv_usec >= 1000000) {
		tv->tv_sec++;
		tv->tv_usec -= 1000000;
	} else if (tv->tv_usec < 0) {
		/* Borrow from seconds after adding a negative amount. */
		tv->tv_sec--;
		tv->tv_usec += 1000000;
	}
}
143
/** Compute the difference of two timevals in microseconds.
 *
 * @param tv1	Minuend.
 * @param tv2	Subtrahend.
 * @return	tv1 - tv2 expressed in microseconds.
 */
static suseconds_t tv_sub(struct timeval *tv1, struct timeval *tv2)
{
	return (tv1->tv_usec - tv2->tv_usec) +
	    (tv1->tv_sec - tv2->tv_sec) * 1000000;
}
154
/** Test whether one timeval is strictly later than another.
 *
 * @param tv1	First time value.
 * @param tv2	Second time value.
 * @return	1 if tv1 > tv2, otherwise 0.
 */
static int tv_gt(struct timeval *tv1, struct timeval *tv2)
{
	/* Seconds decide unless they are equal. */
	if (tv1->tv_sec != tv2->tv_sec)
		return tv1->tv_sec > tv2->tv_sec;

	return tv1->tv_usec > tv2->tv_usec;
}
167
168/* Hash table functions */
169#define CONN_HASH_TABLE_CHAINS 32
170
171static hash_index_t conn_hash(unsigned long *key)
172{
173 assert(key);
174 return ((*key) >> 4) % CONN_HASH_TABLE_CHAINS;
175}
176
177static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
178{
179 connection_t *hs;
180
181 hs = hash_table_get_instance(item, connection_t, link);
182
183 return key[0] == hs->in_phone_hash;
184}
185
186static void conn_remove(link_t *item)
187{
188 free(hash_table_get_instance(item, connection_t, link));
189}
190
191
192/** Operations for NS hash table. */
193static hash_table_operations_t conn_hash_table_ops = {
194 .hash = conn_hash,
195 .compare = conn_compare,
196 .remove_callback = conn_remove
197};
198
199/*************************************************/
200
201/** Try to route a call to an appropriate connection thread
202 *
203 */
204static int route_call(ipc_callid_t callid, ipc_call_t *call)
205{
206 connection_t *conn;
207 msg_t *msg;
208 link_t *hlp;
209 unsigned long key;
210
211 futex_down(&async_futex);
212
213 key = call->in_phone_hash;
214 hlp = hash_table_find(&conn_hash_table, &key);
215 if (!hlp) {
216 futex_up(&async_futex);
217 return 0;
218 }
219 conn = hash_table_get_instance(hlp, connection_t, link);
220
221 msg = malloc(sizeof(*msg));
222 msg->callid = callid;
223 msg->call = *call;
224 list_append(&msg->link, &conn->msg_queue);
225
226 if (!conn->active) {
227 conn->active = 1;
228 psthread_add_ready(conn->ptid);
229 }
230
231 futex_up(&async_futex);
232
233 return 1;
234}
235
236/** Return new incoming message for current(thread-local) connection */
237ipc_callid_t async_get_call(ipc_call_t *call)
238{
239 msg_t *msg;
240 ipc_callid_t callid;
241 connection_t *conn;
242
243 futex_down(&async_futex);
244
245 conn = PS_connection;
246 /* If nothing in queue, wait until something appears */
247 if (list_empty(&conn->msg_queue)) {
248 conn->active = 0;
249 psthread_schedule_next_adv(PS_TO_MANAGER);
250 }
251
252 msg = list_get_instance(conn->msg_queue.next, msg_t, link);
253 list_remove(&msg->link);
254 callid = msg->callid;
255 *call = msg->call;
256 free(msg);
257
258 futex_up(&async_futex);
259 return callid;
260}
261
262/** Thread function that gets created on new connection
263 *
264 * This function is defined as a weak symbol - to be redefined in
265 * user code.
266 */
267void client_connection(ipc_callid_t callid, ipc_call_t *call)
268{
269 ipc_answer_fast(callid, ENOENT, 0, 0);
270}
271
272/** Wrapper for client connection thread
273 *
274 * When new connection arrives, thread with this function is created.
275 * It calls client_connection and does final cleanup.
276 *
277 * @parameter arg Connection structure pointer
278 */
279static int connection_thread(void *arg)
280{
281 unsigned long key;
282 msg_t *msg;
283 connection_t *conn;
284
285 /* Setup thread local connection pointer */
286 PS_connection = (connection_t *)arg;
287 conn = PS_connection;
288 conn->cthread(conn->callid, &conn->call);
289
290 /* Remove myself from connection hash table */
291 futex_down(&async_futex);
292 key = conn->in_phone_hash;
293 hash_table_remove(&conn_hash_table, &key, 1);
294 futex_up(&async_futex);
295 /* Answer all remaining messages with ehangup */
296 while (!list_empty(&conn->msg_queue)) {
297 msg = list_get_instance(conn->msg_queue.next, msg_t, link);
298 list_remove(&msg->link);
299 ipc_answer_fast(msg->callid, EHANGUP, 0, 0);
300 free(msg);
301 }
302}
303
304/** Create new thread for a new connection
305 *
306 * Creates new thread for connection, fills in connection
307 * structures and inserts it into the hash table, so that
308 * later we can easily do routing of messages to particular
309 * threads.
310 *
311 * @param callid Callid of the IPC_M_CONNECT_ME_TO packet
312 * @param call Call data of the opening packet
313 * @param cthread Thread function that should be called upon
314 * opening the connection
315 * @return New thread id
316 */
317pstid_t async_new_connection(ipc_callid_t callid, ipc_call_t *call,
318 void (*cthread)(ipc_callid_t,ipc_call_t *))
319{
320 pstid_t ptid;
321 connection_t *conn;
322 unsigned long key;
323
324 conn = malloc(sizeof(*conn));
325 if (!conn) {
326 ipc_answer_fast(callid, ENOMEM, 0, 0);
327 return NULL;
328 }
329 conn->in_phone_hash = IPC_GET_ARG3(*call);
330 list_initialize(&conn->msg_queue);
331 conn->ptid = psthread_create(connection_thread, conn);
332 conn->callid = callid;
333 conn->call = *call;
334 conn->active = 1; /* We will activate it asap */
335 conn->cthread = cthread;
336 list_initialize(&conn->link);
337 if (!conn->ptid) {
338 free(conn);
339 ipc_answer_fast(callid, ENOMEM, 0, 0);
340 return NULL;
341 }
342 key = conn->in_phone_hash;
343 futex_down(&async_futex);
344 /* Add connection to hash table */
345 hash_table_insert(&conn_hash_table, &key, &conn->link);
346 futex_up(&async_futex);
347
348 psthread_add_ready(conn->ptid);
349
350 return conn->ptid;
351}
352
353/** Handle call that was received */
354static void handle_call(ipc_callid_t callid, ipc_call_t *call)
355{
356 if (route_call(callid, call))
357 return;
358
359 switch (IPC_GET_METHOD(*call)) {
360 case IPC_M_INTERRUPT:
361 break;
362 case IPC_M_CONNECT_ME_TO:
363 /* Open new connection with thread etc. */
364 async_new_connection(callid, call, client_connection);
365 break;
366 default:
367 ipc_answer_fast(callid, EHANGUP, 0, 0);
368 }
369}
370
371/** Fire all timeouts that expired */
372static void handle_expired_timeouts(void)
373{
374 struct timeval tv;
375 amsg_t *amsg;
376 link_t *cur;
377
378 gettimeofday(&tv,NULL);
379 futex_down(&async_futex);
380
381 cur = timeout_list.next;
382 while (cur != &timeout_list) {
383 amsg = list_get_instance(cur,amsg_t,link);
384 if (tv_gt(&amsg->expires, &tv))
385 break;
386 cur = cur->next;
387 list_remove(&amsg->link);
388 amsg->has_timeout = 0;
389 /* Redundant condition? The thread should not
390 * be active when it gets here.
391 */
392 if (!amsg->active) {
393 amsg->active = 1;
394 psthread_add_ready(amsg->ptid);
395 }
396 }
397
398 futex_up(&async_futex);
399}
400
401/** Endless loop dispatching incoming calls and answers */
402int async_manager(void)
403{
404 ipc_call_t call;
405 ipc_callid_t callid;
406 int timeout;
407 amsg_t *amsg;
408 struct timeval tv;
409
410 while (1) {
411 if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
412 futex_up(&async_futex); /* async_futex is always held
413 * when entering manager thread
414 */
415 continue;
416 }
417 futex_down(&async_futex);
418 if (!list_empty(&timeout_list)) {
419 amsg = list_get_instance(timeout_list.next,amsg_t,link);
420 gettimeofday(&tv,NULL);
421 if (tv_gt(&tv, &amsg->expires)) {
422 handle_expired_timeouts();
423 continue;
424 } else
425 timeout = tv_sub(&amsg->expires, &tv);
426 } else
427 timeout = SYNCH_NO_TIMEOUT;
428 futex_up(&async_futex);
429
430 callid = ipc_wait_cycle(&call, timeout, SYNCH_BLOCKING);
431
432 if (!callid) {
433 handle_expired_timeouts();
434 continue;
435 }
436
437 if (callid & IPC_CALLID_ANSWERED)
438 continue;
439
440 handle_call(callid, &call);
441 }
442}
443
444/** Function to start async_manager as a standalone thread
445 *
446 * When more kernel threads are used, one async manager should
447 * exist per thread. The particular implementation may change,
448 * currently one async_manager is started automatically per kernel
449 * thread except main thread.
450 */
451static int async_manager_thread(void *arg)
452{
453 futex_up(&async_futex); /* async_futex is always locked when entering
454 * manager */
455 async_manager();
456}
457
458/** Add one manager to manager list */
459void async_create_manager(void)
460{
461 pstid_t ptid;
462
463 ptid = psthread_create(async_manager_thread, NULL);
464 psthread_add_manager(ptid);
465}
466
/** Remove one manager from the manager list.
 *
 * Thin wrapper around psthread_remove_manager().
 */
void async_destroy_manager(void)
{
	psthread_remove_manager();
}
472
473/** Initialize internal structures needed for async manager */
474int _async_init(void)
475{
476 if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) {
477 printf("%s: cannot create hash table\n", "async");
478 return ENOMEM;
479 }
480
481}
482
483/** IPC handler for messages in async framework
484 *
485 * Notify thread that is waiting for this message, that it arrived
486 */
487static void reply_received(void *private, int retval,
488 ipc_call_t *data)
489{
490 amsg_t *msg = (amsg_t *) private;
491
492 msg->retval = retval;
493
494 futex_down(&async_futex);
495 /* Copy data after futex_down, just in case the
496 * call was detached
497 */
498 if (msg->dataptr)
499 *msg->dataptr = *data;
500
501 write_barrier();
502 /* Remove message from timeout list */
503 if (msg->has_timeout)
504 list_remove(&msg->link);
505 msg->done = 1;
506 if (! msg->active) {
507 msg->active = 1;
508 psthread_add_ready(msg->ptid);
509 }
510 futex_up(&async_futex);
511}
512
513/** Send message and return id of the sent message
514 *
515 * The return value can be used as input for async_wait() to wait
516 * for completion.
517 */
518aid_t async_send_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2,
519 ipc_call_t *dataptr)
520{
521 amsg_t *msg;
522
523 msg = malloc(sizeof(*msg));
524 msg->active = 1;
525 msg->done = 0;
526 msg->dataptr = dataptr;
527 ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received);
528
529 return (aid_t) msg;
530}
531
532/** Wait for a message sent by async framework
533 *
534 * @param amsgid Message ID to wait for
535 * @param retval Pointer to variable where will be stored retval
536 * of the answered message. If NULL, it is ignored.
537 *
538 */
539void async_wait_for(aid_t amsgid, ipcarg_t *retval)
540{
541 amsg_t *msg = (amsg_t *) amsgid;
542 connection_t *conn;
543
544 futex_down(&async_futex);
545 if (msg->done) {
546 futex_up(&async_futex);
547 goto done;
548 }
549
550 msg->ptid = psthread_get_id();
551 msg->active = 0;
552 msg->has_timeout = 0;
553 /* Leave locked async_futex when entering this function */
554 psthread_schedule_next_adv(PS_TO_MANAGER);
555 /* futex is up automatically after psthread_schedule_next...*/
556done:
557 if (retval)
558 *retval = msg->retval;
559 free(msg);
560}
561
562/** Insert sort timeout msg into timeouts list
563 *
564 * Assume async_futex is held
565 */
566static void insert_timeout(amsg_t *msg)
567{
568 link_t *tmp;
569 amsg_t *cur;
570
571 tmp = timeout_list.next;
572 while (tmp != &timeout_list) {
573 cur = list_get_instance(tmp, amsg_t, link);
574 if (tv_gt(&cur->expires, &msg->expires))
575 break;
576 tmp = tmp->next;
577 }
578 list_append(&msg->link, tmp);
579}
580
581/** Wait for a message sent by async framework with timeout
582 *
583 * @param amsgid Message ID to wait for
584 * @param retval Pointer to variable where will be stored retval
585 * of the answered message. If NULL, it is ignored.
586 * @param timeout Timeout in usecs
587 * @return 0 on success, ETIMEOUT if timeout expired
588 *
589 */
590int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, suseconds_t timeout)
591{
592 amsg_t *msg = (amsg_t *) amsgid;
593 connection_t *conn;
594
595 futex_down(&async_futex);
596 if (msg->done) {
597 futex_up(&async_futex);
598 goto done;
599 }
600
601 msg->ptid = psthread_get_id();
602 msg->active = 0;
603 msg->has_timeout = 1;
604
605 gettimeofday(&msg->expires, NULL);
606 tv_add(&msg->expires, timeout);
607 insert_timeout(msg);
608
609 /* Leave locked async_futex when entering this function */
610 psthread_schedule_next_adv(PS_TO_MANAGER);
611 /* futex is up automatically after psthread_schedule_next...*/
612
613 if (!msg->done)
614 return ETIMEOUT;
615
616done:
617 if (retval)
618 *retval = msg->retval;
619 free(msg);
620
621 return 0;
622}
623
Note: See TracBrowser for help on using the repository browser.