source: mainline/libc/generic/async.c@ c0e674a

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since c0e674a was c0e674a, checked in by Ondrej Palkovsky <ondrap@…>, 19 years ago

Fix incorrect timeout handling in async framework.
Start tweak the tetris code.

  • Property mode set to 100644
File size: 16.5 KB
Line 
1/*
2 * Copyright (C) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/**
30 * Asynchronous library
31 *
32 * The aim of this library is facilitating writing programs utilizing
33 * the asynchronous nature of Helenos IPC, yet using a normal way
34 * of programming.
35 *
36 * You should be able to write very simple multithreaded programs,
37 * the async framework will automatically take care of most synchronization
38 * problems.
39 *
40 * Default semantics:
41 * - send() - send asynchronously. If the kernel refuses to send more
42 * messages, [ try to get responses from kernel, if nothing
43 * found, might try synchronous ]
44 *
45 * Example of use:
46 *
47 * 1) Multithreaded client application
48 * create_thread(thread1);
49 * create_thread(thread2);
50 * ...
51 *
52 * thread1() {
53 * conn = ipc_connect_me_to();
54 * c1 = send(conn);
55 * c2 = send(conn);
56 * wait_for(c1);
57 * wait_for(c2);
58 * }
59 *
60 *
61 * 2) Multithreaded server application
62 * main() {
63 * async_manager();
64 * }
65 *
66 *
67 * client_connection(icallid, *icall) {
68 * if (want_refuse) {
69 * ipc_answer_fast(icallid, ELIMIT, 0, 0);
70 * return;
71 * }
72 * ipc_answer_fast(icallid, 0, 0, 0);
73 *
74 * callid = async_get_call(&call);
75 * handle(callid, call);
76 * ipc_answer_fast(callid, 1,2,3);
77 *
78 * callid = async_get_call(&call);
79 * ....
80 * }
81 *
82 * TODO: Detaching/joining dead psthreads?
83 */
84#include <futex.h>
85#include <async.h>
86#include <psthread.h>
87#include <stdio.h>
88#include <libadt/hash_table.h>
89#include <libadt/list.h>
90#include <ipc/ipc.h>
91#include <assert.h>
92#include <errno.h>
93#include <time.h>
94#include <arch/barrier.h>
95
96atomic_t async_futex = FUTEX_INITIALIZER;
97static hash_table_t conn_hash_table;
98static LIST_INITIALIZE(timeout_list);
99
100typedef struct {
101 pstid_t ptid; /**< Thread waiting for this message */
102 int active; /**< If this thread is currently active */
103 int done; /**< If reply was received */
104 ipc_call_t *dataptr; /**< Pointer where the answer data
105 * should be stored */
106 struct timeval expires; /**< Expiration time for waiting thread */
107 int has_timeout; /**< If true, this struct is in timeout list */
108 link_t link;
109
110 ipcarg_t retval;
111} amsg_t;
112
113typedef struct {
114 link_t link;
115 ipc_callid_t callid;
116 ipc_call_t call;
117} msg_t;
118
119typedef struct {
120 link_t link;
121 ipcarg_t in_phone_hash; /**< Incoming phone hash. */
122 link_t msg_queue; /**< Messages that should be delivered to this thread */
123 pstid_t ptid; /**< Thread associated with this connection */
124 int active; /**< If this thread is currently active */
125 /* Structures for connection opening packet */
126 ipc_callid_t callid;
127 ipc_call_t call;
128 void (*cthread)(ipc_callid_t,ipc_call_t *);
129} connection_t;
130
131__thread connection_t *PS_connection;
132
133/** Add microseconds to give timeval */
134static void tv_add(struct timeval *tv, suseconds_t usecs)
135{
136 tv->tv_sec += usecs / 1000000;
137 tv->tv_usec += usecs % 1000000;
138 if (tv->tv_usec > 1000000) {
139 tv->tv_sec++;
140 tv->tv_usec -= 1000000;
141 }
142}
143
144/** Subtract 2 timevals, return microseconds difference */
145static suseconds_t tv_sub(struct timeval *tv1, struct timeval *tv2)
146{
147 suseconds_t result;
148
149 result = tv1->tv_usec - tv2->tv_usec;
150 result += (tv1->tv_sec - tv2->tv_sec) * 1000000;
151
152 return result;
153}
154
155/** Compare timeval
156 *
157 * @return 1 if tv1 > tv2, otherwise 0
158 */
159static int tv_gt(struct timeval *tv1, struct timeval *tv2)
160{
161 if (tv1->tv_sec > tv2->tv_sec)
162 return 1;
163 if (tv1->tv_sec == tv2->tv_sec && tv1->tv_usec > tv2->tv_usec)
164 return 1;
165 return 0;
166}
167static int tv_gteq(struct timeval *tv1, struct timeval *tv2)
168{
169 if (tv1->tv_sec > tv2->tv_sec)
170 return 1;
171 if (tv1->tv_sec == tv2->tv_sec && tv1->tv_usec >= tv2->tv_usec)
172 return 1;
173 return 0;
174}
175
176/* Hash table functions */
177#define CONN_HASH_TABLE_CHAINS 32
178
179static hash_index_t conn_hash(unsigned long *key)
180{
181 assert(key);
182 return ((*key) >> 4) % CONN_HASH_TABLE_CHAINS;
183}
184
185static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
186{
187 connection_t *hs;
188
189 hs = hash_table_get_instance(item, connection_t, link);
190
191 return key[0] == hs->in_phone_hash;
192}
193
194static void conn_remove(link_t *item)
195{
196 free(hash_table_get_instance(item, connection_t, link));
197}
198
199
200/** Operations for NS hash table. */
201static hash_table_operations_t conn_hash_table_ops = {
202 .hash = conn_hash,
203 .compare = conn_compare,
204 .remove_callback = conn_remove
205};
206
207/*************************************************/
208
209/** Try to route a call to an appropriate connection thread
210 *
211 */
212static int route_call(ipc_callid_t callid, ipc_call_t *call)
213{
214 connection_t *conn;
215 msg_t *msg;
216 link_t *hlp;
217 unsigned long key;
218
219 futex_down(&async_futex);
220
221 key = call->in_phone_hash;
222 hlp = hash_table_find(&conn_hash_table, &key);
223 if (!hlp) {
224 futex_up(&async_futex);
225 return 0;
226 }
227 conn = hash_table_get_instance(hlp, connection_t, link);
228
229 msg = malloc(sizeof(*msg));
230 msg->callid = callid;
231 msg->call = *call;
232 list_append(&msg->link, &conn->msg_queue);
233
234 if (!conn->active) {
235 conn->active = 1;
236 psthread_add_ready(conn->ptid);
237 }
238
239 futex_up(&async_futex);
240
241 return 1;
242}
243
244/** Return new incoming message for current(thread-local) connection */
245ipc_callid_t async_get_call(ipc_call_t *call)
246{
247 msg_t *msg;
248 ipc_callid_t callid;
249 connection_t *conn;
250
251 assert(PS_connection);
252
253 futex_down(&async_futex);
254
255 conn = PS_connection;
256 /* If nothing in queue, wait until something appears */
257 if (list_empty(&conn->msg_queue)) {
258 conn->active = 0;
259 psthread_schedule_next_adv(PS_TO_MANAGER);
260 }
261
262 msg = list_get_instance(conn->msg_queue.next, msg_t, link);
263 list_remove(&msg->link);
264 callid = msg->callid;
265 *call = msg->call;
266 free(msg);
267
268 futex_up(&async_futex);
269 return callid;
270}
271
272/** Thread function that gets created on new connection
273 *
274 * This function is defined as a weak symbol - to be redefined in
275 * user code.
276 */
277void client_connection(ipc_callid_t callid, ipc_call_t *call)
278{
279 ipc_answer_fast(callid, ENOENT, 0, 0);
280}
281
282/** Function that gets called on interrupt receival
283 *
284 * This function is defined as a weak symbol - to be redefined in
285 * user code.
286 */
287void interrupt_received(ipc_call_t *call)
288{
289}
290
291
292/** Wrapper for client connection thread
293 *
294 * When new connection arrives, thread with this function is created.
295 * It calls client_connection and does final cleanup.
296 *
297 * @parameter arg Connection structure pointer
298 */
299static int connection_thread(void *arg)
300{
301 unsigned long key;
302 msg_t *msg;
303 connection_t *conn;
304
305 /* Setup thread local connection pointer */
306 PS_connection = (connection_t *)arg;
307 conn = PS_connection;
308 conn->cthread(conn->callid, &conn->call);
309
310 /* Remove myself from connection hash table */
311 futex_down(&async_futex);
312 key = conn->in_phone_hash;
313 hash_table_remove(&conn_hash_table, &key, 1);
314 futex_up(&async_futex);
315 /* Answer all remaining messages with ehangup */
316 while (!list_empty(&conn->msg_queue)) {
317 msg = list_get_instance(conn->msg_queue.next, msg_t, link);
318 list_remove(&msg->link);
319 ipc_answer_fast(msg->callid, EHANGUP, 0, 0);
320 free(msg);
321 }
322}
323
324/** Create new thread for a new connection
325 *
326 * Creates new thread for connection, fills in connection
327 * structures and inserts it into the hash table, so that
328 * later we can easily do routing of messages to particular
329 * threads.
330 *
331 * @param in_phone_hash Identification of the incoming connection
332 * @param callid Callid of the IPC_M_CONNECT_ME_TO packet
333 * @param call Call data of the opening packet
334 * @param cthread Thread function that should be called upon
335 * opening the connection
336 * @return New thread id
337 */
338pstid_t async_new_connection(ipcarg_t in_phone_hash,ipc_callid_t callid,
339 ipc_call_t *call,
340 void (*cthread)(ipc_callid_t,ipc_call_t *))
341{
342 pstid_t ptid;
343 connection_t *conn;
344 unsigned long key;
345
346 conn = malloc(sizeof(*conn));
347 if (!conn) {
348 ipc_answer_fast(callid, ENOMEM, 0, 0);
349 return NULL;
350 }
351 conn->in_phone_hash = in_phone_hash;
352 list_initialize(&conn->msg_queue);
353 conn->ptid = psthread_create(connection_thread, conn);
354 conn->callid = callid;
355 if (call)
356 conn->call = *call;
357 conn->active = 1; /* We will activate it asap */
358 conn->cthread = cthread;
359 list_initialize(&conn->link);
360 if (!conn->ptid) {
361 free(conn);
362 ipc_answer_fast(callid, ENOMEM, 0, 0);
363 return NULL;
364 }
365 key = conn->in_phone_hash;
366 futex_down(&async_futex);
367 /* Add connection to hash table */
368 hash_table_insert(&conn_hash_table, &key, &conn->link);
369 futex_up(&async_futex);
370
371 psthread_add_ready(conn->ptid);
372
373 return conn->ptid;
374}
375
376/** Handle call that was received */
377static void handle_call(ipc_callid_t callid, ipc_call_t *call)
378{
379 /* Unrouted call - do some default behaviour */
380 switch (IPC_GET_METHOD(*call)) {
381 case IPC_M_INTERRUPT:
382 interrupt_received(call);
383 return;
384 case IPC_M_CONNECT_ME_TO:
385 /* Open new connection with thread etc. */
386 async_new_connection(IPC_GET_ARG3(*call), callid, call, client_connection);
387 return;
388 }
389
390 /* Try to route call through connection tables */
391 if (route_call(callid, call))
392 return;
393
394 /* Unknown call from unknown phone - hang it up */
395 ipc_answer_fast(callid, EHANGUP, 0, 0);
396}
397
398/** Fire all timeouts that expired */
399static void handle_expired_timeouts(void)
400{
401 struct timeval tv;
402 amsg_t *amsg;
403 link_t *cur;
404
405 gettimeofday(&tv,NULL);
406 futex_down(&async_futex);
407
408 cur = timeout_list.next;
409 while (cur != &timeout_list) {
410 amsg = list_get_instance(cur,amsg_t,link);
411 if (tv_gt(&amsg->expires, &tv))
412 break;
413 cur = cur->next;
414 list_remove(&amsg->link);
415 amsg->has_timeout = 0;
416 /* Redundant condition? The thread should not
417 * be active when it gets here.
418 */
419 if (!amsg->active) {
420 amsg->active = 1;
421 psthread_add_ready(amsg->ptid);
422 }
423 }
424
425 futex_up(&async_futex);
426}
427
428/** Endless loop dispatching incoming calls and answers */
429int async_manager(void)
430{
431 ipc_call_t call;
432 ipc_callid_t callid;
433 int timeout;
434 amsg_t *amsg;
435 struct timeval tv;
436
437 while (1) {
438 if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
439 futex_up(&async_futex); /* async_futex is always held
440 * when entering manager thread
441 */
442 continue;
443 }
444 futex_down(&async_futex);
445 if (!list_empty(&timeout_list)) {
446 amsg = list_get_instance(timeout_list.next,amsg_t,link);
447 gettimeofday(&tv,NULL);
448 if (tv_gteq(&tv, &amsg->expires)) {
449 handle_expired_timeouts();
450 continue;
451 } else
452 timeout = tv_sub(&amsg->expires, &tv);
453 } else
454 timeout = SYNCH_NO_TIMEOUT;
455 futex_up(&async_futex);
456
457 callid = ipc_wait_cycle(&call, timeout, SYNCH_BLOCKING);
458
459 if (!callid) {
460 handle_expired_timeouts();
461 continue;
462 }
463
464 if (callid & IPC_CALLID_ANSWERED)
465 continue;
466
467 handle_call(callid, &call);
468 }
469}
470
471/** Function to start async_manager as a standalone thread
472 *
473 * When more kernel threads are used, one async manager should
474 * exist per thread. The particular implementation may change,
475 * currently one async_manager is started automatically per kernel
476 * thread except main thread.
477 */
478static int async_manager_thread(void *arg)
479{
480 futex_up(&async_futex); /* async_futex is always locked when entering
481 * manager */
482 async_manager();
483}
484
485/** Add one manager to manager list */
486void async_create_manager(void)
487{
488 pstid_t ptid;
489
490 ptid = psthread_create(async_manager_thread, NULL);
491 psthread_add_manager(ptid);
492}
493
494/** Remove one manager from manager list */
495void async_destroy_manager(void)
496{
497 psthread_remove_manager();
498}
499
500/** Initialize internal structures needed for async manager */
501int _async_init(void)
502{
503 if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) {
504 printf("%s: cannot create hash table\n", "async");
505 return ENOMEM;
506 }
507
508}
509
510/** IPC handler for messages in async framework
511 *
512 * Notify thread that is waiting for this message, that it arrived
513 */
514static void reply_received(void *private, int retval,
515 ipc_call_t *data)
516{
517 amsg_t *msg = (amsg_t *) private;
518
519 msg->retval = retval;
520
521 futex_down(&async_futex);
522 /* Copy data after futex_down, just in case the
523 * call was detached
524 */
525 if (msg->dataptr)
526 *msg->dataptr = *data;
527
528 write_barrier();
529 /* Remove message from timeout list */
530 if (msg->has_timeout)
531 list_remove(&msg->link);
532 msg->done = 1;
533 if (! msg->active) {
534 msg->active = 1;
535 psthread_add_ready(msg->ptid);
536 }
537 futex_up(&async_futex);
538}
539
540/** Send message and return id of the sent message
541 *
542 * The return value can be used as input for async_wait() to wait
543 * for completion.
544 */
545aid_t async_send_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2,
546 ipc_call_t *dataptr)
547{
548 amsg_t *msg;
549
550 msg = malloc(sizeof(*msg));
551 msg->active = 1;
552 msg->done = 0;
553 msg->dataptr = dataptr;
554 ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received);
555
556 return (aid_t) msg;
557}
558
559/** Wait for a message sent by async framework
560 *
561 * @param amsgid Message ID to wait for
562 * @param retval Pointer to variable where will be stored retval
563 * of the answered message. If NULL, it is ignored.
564 *
565 */
566void async_wait_for(aid_t amsgid, ipcarg_t *retval)
567{
568 amsg_t *msg = (amsg_t *) amsgid;
569 connection_t *conn;
570
571 futex_down(&async_futex);
572 if (msg->done) {
573 futex_up(&async_futex);
574 goto done;
575 }
576
577 msg->ptid = psthread_get_id();
578 msg->active = 0;
579 msg->has_timeout = 0;
580 /* Leave locked async_futex when entering this function */
581 psthread_schedule_next_adv(PS_TO_MANAGER);
582 /* futex is up automatically after psthread_schedule_next...*/
583done:
584 if (retval)
585 *retval = msg->retval;
586 free(msg);
587}
588
589/** Insert sort timeout msg into timeouts list
590 *
591 * Assume async_futex is held
592 */
593static void insert_timeout(amsg_t *msg)
594{
595 link_t *tmp;
596 amsg_t *cur;
597
598 tmp = timeout_list.next;
599 while (tmp != &timeout_list) {
600 cur = list_get_instance(tmp, amsg_t, link);
601 if (tv_gteq(&cur->expires, &msg->expires))
602 break;
603 tmp = tmp->next;
604 }
605 list_append(&msg->link, tmp);
606}
607
608/** Wait for a message sent by async framework with timeout
609 *
610 * @param amsgid Message ID to wait for
611 * @param retval Pointer to variable where will be stored retval
612 * of the answered message. If NULL, it is ignored.
613 * @param timeout Timeout in usecs
614 * @return 0 on success, ETIMEOUT if timeout expired
615 *
616 */
617int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, suseconds_t timeout)
618{
619 amsg_t *msg = (amsg_t *) amsgid;
620 connection_t *conn;
621
622 futex_down(&async_futex);
623 if (msg->done) {
624 futex_up(&async_futex);
625 goto done;
626 }
627
628 msg->ptid = psthread_get_id();
629 msg->active = 0;
630 msg->has_timeout = 1;
631
632 gettimeofday(&msg->expires, NULL);
633 tv_add(&msg->expires, timeout);
634 insert_timeout(msg);
635
636 /* Leave locked async_futex when entering this function */
637 psthread_schedule_next_adv(PS_TO_MANAGER);
638 /* futex is up automatically after psthread_schedule_next...*/
639
640 if (!msg->done)
641 return ETIMEOUT;
642
643done:
644 if (retval)
645 *retval = msg->retval;
646 free(msg);
647
648 return 0;
649}
650
651/** Wait specified time, but in the meantime handle incoming events
652 *
653 * @param timeout Time in microseconds to wait
654 */
655void async_usleep(suseconds_t timeout)
656{
657 amsg_t *msg;
658
659 msg = malloc(sizeof(*msg));
660 if (!msg)
661 return;
662
663 msg->ptid = psthread_get_id();
664 msg->active = 0;
665 msg->has_timeout = 1;
666
667 gettimeofday(&msg->expires, NULL);
668 tv_add(&msg->expires, timeout);
669
670 futex_down(&async_futex);
671 insert_timeout(msg);
672 /* Leave locked async_futex when entering this function */
673 psthread_schedule_next_adv(PS_TO_MANAGER);
674 /* futex is up automatically after psthread_schedule_next...*/
675 free(msg);
676}
Note: See TracBrowser for help on using the repository browser.