source: mainline/libc/generic/async.c@ 1c20e22

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since 1c20e22 was 1c20e22, checked in by Ondrej Palkovsky <ondrap@…>, 19 years ago

Removed workaround for gcc 4.1.0

  • Property mode set to 100644
File size: 16.5 KB
Line 
1/*
2 * Copyright (C) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/**
30 * Asynchronous library
31 *
32 * The aim of this library is to facilitate writing programs that utilize
33 * the asynchronous nature of HelenOS IPC while still using a conventional
34 * style of programming.
35 *
36 * You should be able to write very simple multithreaded programs,
37 * the async framework will automatically take care of most synchronization
38 * problems.
39 *
40 * Default semantics:
41 * - send() - send asynchronously. If the kernel refuses to send more
42 * messages, [ try to get responses from kernel, if nothing
43 * found, might try synchronous ]
44 *
45 * Example of use:
46 *
47 * 1) Multithreaded client application
48 * create_thread(thread1);
49 * create_thread(thread2);
50 * ...
51 *
52 * thread1() {
53 * conn = ipc_connect_me_to();
54 * c1 = send(conn);
55 * c2 = send(conn);
56 * wait_for(c1);
57 * wait_for(c2);
58 * }
59 *
60 *
61 * 2) Multithreaded server application
62 * main() {
63 * async_manager();
64 * }
65 *
66 *
67 * client_connection(icallid, *icall) {
68 * if (want_refuse) {
69 * ipc_answer_fast(icallid, ELIMIT, 0, 0);
70 * return;
71 * }
72 * ipc_answer_fast(icallid, 0, 0, 0);
73 *
74 * callid = async_get_call(&call);
75 * handle(callid, call);
76 * ipc_answer_fast(callid, 1,2,3);
77 *
78 * callid = async_get_call(&call);
79 * ....
80 * }
81 *
82 * TODO: Detaching/joining dead psthreads?
83 */
84#include <futex.h>
85#include <async.h>
86#include <psthread.h>
87#include <stdio.h>
88#include <libadt/hash_table.h>
89#include <libadt/list.h>
90#include <ipc/ipc.h>
91#include <assert.h>
92#include <errno.h>
93#include <time.h>
94#include <arch/barrier.h>
95
96atomic_t async_futex = FUTEX_INITIALIZER;
97static hash_table_t conn_hash_table;
98static LIST_INITIALIZE(timeout_list);
99
100typedef struct {
101 pstid_t ptid; /**< Thread waiting for this message */
102 int active; /**< If this thread is currently active */
103 int done; /**< If reply was received */
104 ipc_call_t *dataptr; /**< Pointer where the answer data
105 * should be stored */
106 struct timeval expires; /**< Expiration time for waiting thread */
107 int has_timeout; /**< If true, this struct is in timeout list */
108 link_t link;
109
110 ipcarg_t retval;
111} amsg_t;
112
113typedef struct {
114 link_t link;
115 ipc_callid_t callid;
116 ipc_call_t call;
117} msg_t;
118
119typedef struct {
120 link_t link;
121 ipcarg_t in_phone_hash; /**< Incoming phone hash. */
122 link_t msg_queue; /**< Messages that should be delivered to this thread */
123 pstid_t ptid; /**< Thread associated with this connection */
124 int active; /**< If this thread is currently active */
125 /* Structures for connection opening packet */
126 ipc_callid_t callid;
127 ipc_call_t call;
128 void (*cthread)(ipc_callid_t,ipc_call_t *);
129} connection_t;
130
131__thread connection_t *PS_connection;
132
/** Add microseconds to the given timeval
 *
 * The result is normalized so that tv_usec stays within
 * 0 .. 999999, provided it was within that range on entry.
 *
 * @param tv	Time value that is advanced in place
 * @param usecs	Number of microseconds to add (expected to be non-negative)
 */
static void tv_add(struct timeval *tv, suseconds_t usecs)
{
	tv->tv_sec += usecs / 1000000;
	tv->tv_usec += usecs % 1000000;
	/*
	 * Exactly 1000000 microseconds is a full second as well, so the
	 * carry must be taken with >=, not >.
	 */
	if (tv->tv_usec >= 1000000) {
		tv->tv_sec++;
		tv->tv_usec -= 1000000;
	}
}
143
/** Compute the difference of two timevals
 *
 * @param tv1	Minuend
 * @param tv2	Subtrahend
 * @return	tv1 - tv2 expressed in microseconds
 */
static suseconds_t tv_sub(struct timeval *tv1, struct timeval *tv2)
{
	suseconds_t usec_delta = tv1->tv_usec - tv2->tv_usec;

	return usec_delta + (tv1->tv_sec - tv2->tv_sec) * 1000000;
}
154
/** Test whether one timeval is strictly later than another
 *
 * @param tv1	Left-hand operand
 * @param tv2	Right-hand operand
 * @return	1 if tv1 > tv2, otherwise 0
 */
static int tv_gt(struct timeval *tv1, struct timeval *tv2)
{
	/* Seconds decide unless they are equal; then microseconds do. */
	if (tv1->tv_sec != tv2->tv_sec)
		return tv1->tv_sec > tv2->tv_sec;

	return tv1->tv_usec > tv2->tv_usec;
}
/** Test whether one timeval is later than or equal to another
 *
 * @param tv1	Left-hand operand
 * @param tv2	Right-hand operand
 * @return	1 if tv1 >= tv2, otherwise 0
 */
static int tv_gteq(struct timeval *tv1, struct timeval *tv2)
{
	/* Seconds decide unless they are equal; then microseconds do. */
	if (tv1->tv_sec != tv2->tv_sec)
		return tv1->tv_sec > tv2->tv_sec;

	return tv1->tv_usec >= tv2->tv_usec;
}
175
176/* Hash table functions */
177#define CONN_HASH_TABLE_CHAINS 32
178
179static hash_index_t conn_hash(unsigned long *key)
180{
181 assert(key);
182 return ((*key) >> 4) % CONN_HASH_TABLE_CHAINS;
183}
184
185static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
186{
187 connection_t *hs;
188
189 hs = hash_table_get_instance(item, connection_t, link);
190
191 return key[0] == hs->in_phone_hash;
192}
193
194static void conn_remove(link_t *item)
195{
196 free(hash_table_get_instance(item, connection_t, link));
197}
198
199
200/** Operations for NS hash table. */
201static hash_table_operations_t conn_hash_table_ops = {
202 .hash = conn_hash,
203 .compare = conn_compare,
204 .remove_callback = conn_remove
205};
206
207/*************************************************/
208
209/** Try to route a call to an appropriate connection thread
210 *
211 */
212static int route_call(ipc_callid_t callid, ipc_call_t *call)
213{
214 connection_t *conn;
215 msg_t *msg;
216 link_t *hlp;
217 unsigned long key;
218
219 futex_down(&async_futex);
220
221 key = call->in_phone_hash;
222 hlp = hash_table_find(&conn_hash_table, &key);
223 if (!hlp) {
224 futex_up(&async_futex);
225 return 0;
226 }
227 conn = hash_table_get_instance(hlp, connection_t, link);
228
229 msg = malloc(sizeof(*msg));
230 msg->callid = callid;
231 msg->call = *call;
232 list_append(&msg->link, &conn->msg_queue);
233
234 if (!conn->active) {
235 conn->active = 1;
236 psthread_add_ready(conn->ptid);
237 }
238
239 futex_up(&async_futex);
240
241 return 1;
242}
243
244/** Return new incoming message for current(thread-local) connection */
245ipc_callid_t async_get_call(ipc_call_t *call)
246{
247 msg_t *msg;
248 ipc_callid_t callid;
249
250 assert(PS_connection);
251
252 futex_down(&async_futex);
253
254 /* If nothing in queue, wait until something appears */
255 if (list_empty(&PS_connection->msg_queue)) {
256 PS_connection->active = 0;
257 psthread_schedule_next_adv(PS_TO_MANAGER);
258 }
259
260 msg = list_get_instance(PS_connection->msg_queue.next, msg_t, link);
261 list_remove(&msg->link);
262 callid = msg->callid;
263 *call = msg->call;
264 free(msg);
265
266 futex_up(&async_futex);
267 return callid;
268}
269
270/** Thread function that gets created on new connection
271 *
272 * This function is defined as a weak symbol - to be redefined in
273 * user code.
274 */
275void client_connection(ipc_callid_t callid, ipc_call_t *call)
276{
277 ipc_answer_fast(callid, ENOENT, 0, 0);
278}
279
280/** Function that gets called on interrupt receival
281 *
282 * This function is defined as a weak symbol - to be redefined in
283 * user code.
284 */
285void interrupt_received(ipc_call_t *call)
286{
287}
288
289
290/** Wrapper for client connection thread
291 *
292 * When new connection arrives, thread with this function is created.
293 * It calls client_connection and does final cleanup.
294 *
295 * @parameter arg Connection structure pointer
296 */
297static int connection_thread(void *arg)
298{
299 unsigned long key;
300 msg_t *msg;
301
302 /* Setup thread local connection pointer */
303 PS_connection = (connection_t *)arg;
304 PS_connection->cthread(PS_connection->callid, &PS_connection->call);
305
306 /* Remove myself from connection hash table */
307 futex_down(&async_futex);
308 key = PS_connection->in_phone_hash;
309 hash_table_remove(&conn_hash_table, &key, 1);
310 futex_up(&async_futex);
311 /* Answer all remaining messages with ehangup */
312 while (!list_empty(&PS_connection->msg_queue)) {
313 msg = list_get_instance(PS_connection->msg_queue.next, msg_t, link);
314 list_remove(&msg->link);
315 ipc_answer_fast(msg->callid, EHANGUP, 0, 0);
316 free(msg);
317 }
318}
319
320/** Create new thread for a new connection
321 *
322 * Creates new thread for connection, fills in connection
323 * structures and inserts it into the hash table, so that
324 * later we can easily do routing of messages to particular
325 * threads.
326 *
327 * @param in_phone_hash Identification of the incoming connection
328 * @param callid Callid of the IPC_M_CONNECT_ME_TO packet
329 * @param call Call data of the opening packet
330 * @param cthread Thread function that should be called upon
331 * opening the connection
332 * @return New thread id
333 */
334pstid_t async_new_connection(ipcarg_t in_phone_hash,ipc_callid_t callid,
335 ipc_call_t *call,
336 void (*cthread)(ipc_callid_t,ipc_call_t *))
337{
338 pstid_t ptid;
339 connection_t *conn;
340 unsigned long key;
341
342 conn = malloc(sizeof(*conn));
343 if (!conn) {
344 ipc_answer_fast(callid, ENOMEM, 0, 0);
345 return NULL;
346 }
347 conn->in_phone_hash = in_phone_hash;
348 list_initialize(&conn->msg_queue);
349 conn->ptid = psthread_create(connection_thread, conn);
350 conn->callid = callid;
351 if (call)
352 conn->call = *call;
353 conn->active = 1; /* We will activate it asap */
354 conn->cthread = cthread;
355 list_initialize(&conn->link);
356 if (!conn->ptid) {
357 free(conn);
358 ipc_answer_fast(callid, ENOMEM, 0, 0);
359 return NULL;
360 }
361 key = conn->in_phone_hash;
362 futex_down(&async_futex);
363 /* Add connection to hash table */
364 hash_table_insert(&conn_hash_table, &key, &conn->link);
365 futex_up(&async_futex);
366
367 psthread_add_ready(conn->ptid);
368
369 return conn->ptid;
370}
371
372/** Handle call that was received */
373static void handle_call(ipc_callid_t callid, ipc_call_t *call)
374{
375 /* Unrouted call - do some default behaviour */
376 switch (IPC_GET_METHOD(*call)) {
377 case IPC_M_INTERRUPT:
378 interrupt_received(call);
379 return;
380 case IPC_M_CONNECT_ME_TO:
381 /* Open new connection with thread etc. */
382 async_new_connection(IPC_GET_ARG3(*call), callid, call, client_connection);
383 return;
384 }
385
386 /* Try to route call through connection tables */
387 if (route_call(callid, call))
388 return;
389
390 /* Unknown call from unknown phone - hang it up */
391 ipc_answer_fast(callid, EHANGUP, 0, 0);
392}
393
394/** Fire all timeouts that expired */
395static void handle_expired_timeouts(void)
396{
397 struct timeval tv;
398 amsg_t *amsg;
399 link_t *cur;
400
401 gettimeofday(&tv,NULL);
402 futex_down(&async_futex);
403
404 cur = timeout_list.next;
405 while (cur != &timeout_list) {
406 amsg = list_get_instance(cur,amsg_t,link);
407 if (tv_gt(&amsg->expires, &tv))
408 break;
409 cur = cur->next;
410 list_remove(&amsg->link);
411 amsg->has_timeout = 0;
412 /* Redundant condition? The thread should not
413 * be active when it gets here.
414 */
415 if (!amsg->active) {
416 amsg->active = 1;
417 psthread_add_ready(amsg->ptid);
418 }
419 }
420
421 futex_up(&async_futex);
422}
423
424/** Endless loop dispatching incoming calls and answers */
425int async_manager(void)
426{
427 ipc_call_t call;
428 ipc_callid_t callid;
429 int timeout;
430 amsg_t *amsg;
431 struct timeval tv;
432
433 while (1) {
434 if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
435 futex_up(&async_futex); /* async_futex is always held
436 * when entering manager thread
437 */
438 continue;
439 }
440 futex_down(&async_futex);
441 if (!list_empty(&timeout_list)) {
442 amsg = list_get_instance(timeout_list.next,amsg_t,link);
443 gettimeofday(&tv,NULL);
444 if (tv_gteq(&tv, &amsg->expires)) {
445 handle_expired_timeouts();
446 continue;
447 } else
448 timeout = tv_sub(&amsg->expires, &tv);
449 } else
450 timeout = SYNCH_NO_TIMEOUT;
451 futex_up(&async_futex);
452
453 callid = ipc_wait_cycle(&call, timeout, SYNCH_BLOCKING);
454
455 if (!callid) {
456 handle_expired_timeouts();
457 continue;
458 }
459
460 if (callid & IPC_CALLID_ANSWERED)
461 continue;
462
463 handle_call(callid, &call);
464 }
465}
466
467/** Function to start async_manager as a standalone thread
468 *
469 * When more kernel threads are used, one async manager should
470 * exist per thread. The particular implementation may change,
471 * currently one async_manager is started automatically per kernel
472 * thread except main thread.
473 */
474static int async_manager_thread(void *arg)
475{
476 futex_up(&async_futex); /* async_futex is always locked when entering
477 * manager */
478 async_manager();
479}
480
481/** Add one manager to manager list */
482void async_create_manager(void)
483{
484 pstid_t ptid;
485
486 ptid = psthread_create(async_manager_thread, NULL);
487 psthread_add_manager(ptid);
488}
489
/** Take one manager off the manager list
 *
 * Thin wrapper around psthread_remove_manager().
 */
void async_destroy_manager(void)
{
	psthread_remove_manager();
}
495
496/** Initialize internal structures needed for async manager */
497int _async_init(void)
498{
499 if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) {
500 printf("%s: cannot create hash table\n", "async");
501 return ENOMEM;
502 }
503
504}
505
506/** IPC handler for messages in async framework
507 *
508 * Notify thread that is waiting for this message, that it arrived
509 */
510static void reply_received(void *private, int retval,
511 ipc_call_t *data)
512{
513 amsg_t *msg = (amsg_t *) private;
514
515 msg->retval = retval;
516
517 futex_down(&async_futex);
518 /* Copy data after futex_down, just in case the
519 * call was detached
520 */
521 if (msg->dataptr)
522 *msg->dataptr = *data;
523
524 write_barrier();
525 /* Remove message from timeout list */
526 if (msg->has_timeout)
527 list_remove(&msg->link);
528 msg->done = 1;
529 if (! msg->active) {
530 msg->active = 1;
531 psthread_add_ready(msg->ptid);
532 }
533 futex_up(&async_futex);
534}
535
536/** Send message and return id of the sent message
537 *
538 * The return value can be used as input for async_wait() to wait
539 * for completion.
540 */
541aid_t async_send_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2,
542 ipc_call_t *dataptr)
543{
544 amsg_t *msg;
545
546 msg = malloc(sizeof(*msg));
547 msg->active = 1;
548 msg->done = 0;
549 msg->dataptr = dataptr;
550 ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received);
551
552 return (aid_t) msg;
553}
554
555/** Wait for a message sent by async framework
556 *
557 * @param amsgid Message ID to wait for
558 * @param retval Pointer to variable where will be stored retval
559 * of the answered message. If NULL, it is ignored.
560 *
561 */
562void async_wait_for(aid_t amsgid, ipcarg_t *retval)
563{
564 amsg_t *msg = (amsg_t *) amsgid;
565 connection_t *conn;
566
567 futex_down(&async_futex);
568 if (msg->done) {
569 futex_up(&async_futex);
570 goto done;
571 }
572
573 msg->ptid = psthread_get_id();
574 msg->active = 0;
575 msg->has_timeout = 0;
576 /* Leave locked async_futex when entering this function */
577 psthread_schedule_next_adv(PS_TO_MANAGER);
578 /* futex is up automatically after psthread_schedule_next...*/
579done:
580 if (retval)
581 *retval = msg->retval;
582 free(msg);
583}
584
585/** Insert sort timeout msg into timeouts list
586 *
587 * Assume async_futex is held
588 */
589static void insert_timeout(amsg_t *msg)
590{
591 link_t *tmp;
592 amsg_t *cur;
593
594 tmp = timeout_list.next;
595 while (tmp != &timeout_list) {
596 cur = list_get_instance(tmp, amsg_t, link);
597 if (tv_gteq(&cur->expires, &msg->expires))
598 break;
599 tmp = tmp->next;
600 }
601 list_append(&msg->link, tmp);
602}
603
604/** Wait for a message sent by async framework with timeout
605 *
606 * @param amsgid Message ID to wait for
607 * @param retval Pointer to variable where will be stored retval
608 * of the answered message. If NULL, it is ignored.
609 * @param timeout Timeout in usecs
610 * @return 0 on success, ETIMEOUT if timeout expired
611 *
612 */
613int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, suseconds_t timeout)
614{
615 amsg_t *msg = (amsg_t *) amsgid;
616 connection_t *conn;
617
618 futex_down(&async_futex);
619 if (msg->done) {
620 futex_up(&async_futex);
621 goto done;
622 }
623
624 msg->ptid = psthread_get_id();
625 msg->active = 0;
626 msg->has_timeout = 1;
627
628 gettimeofday(&msg->expires, NULL);
629 tv_add(&msg->expires, timeout);
630 insert_timeout(msg);
631
632 /* Leave locked async_futex when entering this function */
633 psthread_schedule_next_adv(PS_TO_MANAGER);
634 /* futex is up automatically after psthread_schedule_next...*/
635
636 if (!msg->done)
637 return ETIMEOUT;
638
639done:
640 if (retval)
641 *retval = msg->retval;
642 free(msg);
643
644 return 0;
645}
646
647/** Wait specified time, but in the meantime handle incoming events
648 *
649 * @param timeout Time in microseconds to wait
650 */
651void async_usleep(suseconds_t timeout)
652{
653 amsg_t *msg;
654
655 msg = malloc(sizeof(*msg));
656 if (!msg)
657 return;
658
659 msg->ptid = psthread_get_id();
660 msg->active = 0;
661 msg->has_timeout = 1;
662
663 gettimeofday(&msg->expires, NULL);
664 tv_add(&msg->expires, timeout);
665
666 futex_down(&async_futex);
667 insert_timeout(msg);
668 /* Leave locked async_futex when entering this function */
669 psthread_schedule_next_adv(PS_TO_MANAGER);
670 /* futex is up automatically after psthread_schedule_next...*/
671 free(msg);
672}
Note: See TracBrowser for help on using the repository browser.