source: mainline/libc/generic/async.c@ da0c91e7

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since da0c91e7 was da0c91e7, checked in by Ondrej Palkovsky <ondrap@…>, 19 years ago

Added very preliminary support for console on architectures
that do not support framebuffer.

  • Property mode set to 100644
File size: 16.9 KB
Line 
1/*
2 * Copyright (C) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/**
30 * Asynchronous library
31 *
32 * The aim of this library is facilitating writing programs utilizing
33 * the asynchronous nature of Helenos IPC, yet using a normal way
34 * of programming.
35 *
36 * You should be able to write very simple multithreaded programs,
37 * the async framework will automatically take care of most synchronization
38 * problems.
39 *
40 * Default semantics:
41 * - send() - send asynchronously. If the kernel refuses to send more
42 * messages, [ try to get responses from kernel, if nothing
43 * found, might try synchronous ]
44 *
45 * Example of use:
46 *
47 * 1) Multithreaded client application
48 * create_thread(thread1);
49 * create_thread(thread2);
50 * ...
51 *
52 * thread1() {
53 * conn = ipc_connect_me_to();
54 * c1 = send(conn);
55 * c2 = send(conn);
56 * wait_for(c1);
57 * wait_for(c2);
58 * }
59 *
60 *
61 * 2) Multithreaded server application
62 * main() {
63 * async_manager();
64 * }
65 *
66 *
67 * client_connection(icallid, *icall) {
68 * if (want_refuse) {
69 * ipc_answer_fast(icallid, ELIMIT, 0, 0);
70 * return;
71 * }
72 * ipc_answer_fast(icallid, 0, 0, 0);
73 *
74 * callid = async_get_call(&call);
75 * handle(callid, call);
76 * ipc_answer_fast(callid, 1,2,3);
77 *
78 * callid = async_get_call(&call);
79 * ....
80 * }
81 *
82 * TODO: Detaching/joining dead psthreads?
83 */
84#include <futex.h>
85#include <async.h>
86#include <psthread.h>
87#include <stdio.h>
88#include <libadt/hash_table.h>
89#include <libadt/list.h>
90#include <ipc/ipc.h>
91#include <assert.h>
92#include <errno.h>
93#include <time.h>
94#include <arch/barrier.h>
95
96atomic_t async_futex = FUTEX_INITIALIZER;
97static hash_table_t conn_hash_table;
98static LIST_INITIALIZE(timeout_list);
99
100typedef struct {
101 pstid_t ptid; /**< Thread waiting for this message */
102 int active; /**< If this thread is currently active */
103 int done; /**< If reply was received */
104 ipc_call_t *dataptr; /**< Pointer where the answer data
105 * should be stored */
106 struct timeval expires; /**< Expiration time for waiting thread */
107 int has_timeout; /**< If true, this struct is in timeout list */
108 link_t link;
109
110 ipcarg_t retval;
111} amsg_t;
112
113typedef struct {
114 link_t link;
115 ipc_callid_t callid;
116 ipc_call_t call;
117} msg_t;
118
119typedef struct {
120 link_t link;
121 ipcarg_t in_phone_hash; /**< Incoming phone hash. */
122 link_t msg_queue; /**< Messages that should be delivered to this thread */
123 pstid_t ptid; /**< Thread associated with this connection */
124 int active; /**< If this thread is currently active */
125 /* Structures for connection opening packet */
126 ipc_callid_t callid;
127 ipc_call_t call;
128 void (*cthread)(ipc_callid_t,ipc_call_t *);
129} connection_t;
130
131
132__thread connection_t *PS_connection;
133
134static void default_client_connection(ipc_callid_t callid, ipc_call_t *call);
135static async_client_conn_t client_connection = default_client_connection;
136
137/** Add microseconds to give timeval */
138static void tv_add(struct timeval *tv, suseconds_t usecs)
139{
140 tv->tv_sec += usecs / 1000000;
141 tv->tv_usec += usecs % 1000000;
142 if (tv->tv_usec > 1000000) {
143 tv->tv_sec++;
144 tv->tv_usec -= 1000000;
145 }
146}
147
148/** Subtract 2 timevals, return microseconds difference */
149static suseconds_t tv_sub(struct timeval *tv1, struct timeval *tv2)
150{
151 suseconds_t result;
152
153 result = tv1->tv_usec - tv2->tv_usec;
154 result += (tv1->tv_sec - tv2->tv_sec) * 1000000;
155
156 return result;
157}
158
159/** Compare timeval
160 *
161 * @return 1 if tv1 > tv2, otherwise 0
162 */
163static int tv_gt(struct timeval *tv1, struct timeval *tv2)
164{
165 if (tv1->tv_sec > tv2->tv_sec)
166 return 1;
167 if (tv1->tv_sec == tv2->tv_sec && tv1->tv_usec > tv2->tv_usec)
168 return 1;
169 return 0;
170}
171static int tv_gteq(struct timeval *tv1, struct timeval *tv2)
172{
173 if (tv1->tv_sec > tv2->tv_sec)
174 return 1;
175 if (tv1->tv_sec == tv2->tv_sec && tv1->tv_usec >= tv2->tv_usec)
176 return 1;
177 return 0;
178}
179
180/* Hash table functions */
181#define CONN_HASH_TABLE_CHAINS 32
182
183static hash_index_t conn_hash(unsigned long *key)
184{
185 assert(key);
186 return ((*key) >> 4) % CONN_HASH_TABLE_CHAINS;
187}
188
189static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
190{
191 connection_t *hs;
192
193 hs = hash_table_get_instance(item, connection_t, link);
194
195 return key[0] == hs->in_phone_hash;
196}
197
198static void conn_remove(link_t *item)
199{
200 free(hash_table_get_instance(item, connection_t, link));
201}
202
203
204/** Operations for NS hash table. */
205static hash_table_operations_t conn_hash_table_ops = {
206 .hash = conn_hash,
207 .compare = conn_compare,
208 .remove_callback = conn_remove
209};
210
211/*************************************************/
212
213/** Try to route a call to an appropriate connection thread
214 *
215 */
216static int route_call(ipc_callid_t callid, ipc_call_t *call)
217{
218 connection_t *conn;
219 msg_t *msg;
220 link_t *hlp;
221 unsigned long key;
222
223 futex_down(&async_futex);
224
225 key = call->in_phone_hash;
226 hlp = hash_table_find(&conn_hash_table, &key);
227 if (!hlp) {
228 futex_up(&async_futex);
229 return 0;
230 }
231 conn = hash_table_get_instance(hlp, connection_t, link);
232
233 msg = malloc(sizeof(*msg));
234 msg->callid = callid;
235 msg->call = *call;
236 list_append(&msg->link, &conn->msg_queue);
237
238 if (!conn->active) {
239 conn->active = 1;
240 psthread_add_ready(conn->ptid);
241 }
242
243 futex_up(&async_futex);
244
245 return 1;
246}
247
248/** Return new incoming message for current(thread-local) connection */
249ipc_callid_t async_get_call(ipc_call_t *call)
250{
251 msg_t *msg;
252 ipc_callid_t callid;
253
254 assert(PS_connection);
255
256 futex_down(&async_futex);
257
258 /* If nothing in queue, wait until something appears */
259 if (list_empty(&PS_connection->msg_queue)) {
260 PS_connection->active = 0;
261 psthread_schedule_next_adv(PS_TO_MANAGER);
262 }
263
264 msg = list_get_instance(PS_connection->msg_queue.next, msg_t, link);
265 list_remove(&msg->link);
266 callid = msg->callid;
267 *call = msg->call;
268 free(msg);
269
270 futex_up(&async_futex);
271 return callid;
272}
273
274/** Thread function that gets created on new connection
275 *
276 * This function is defined as a weak symbol - to be redefined in
277 * user code.
278 */
279static void default_client_connection(ipc_callid_t callid, ipc_call_t *call)
280{
281 ipc_answer_fast(callid, ENOENT, 0, 0);
282}
283
284/** Function that gets called on interrupt receival
285 *
286 * This function is defined as a weak symbol - to be redefined in
287 * user code.
288 */
289void interrupt_received(ipc_call_t *call)
290{
291}
292
293
294/** Wrapper for client connection thread
295 *
296 * When new connection arrives, thread with this function is created.
297 * It calls client_connection and does final cleanup.
298 *
299 * @parameter arg Connection structure pointer
300 */
301static int connection_thread(void *arg)
302{
303 unsigned long key;
304 msg_t *msg;
305
306 /* Setup thread local connection pointer */
307 PS_connection = (connection_t *)arg;
308 PS_connection->cthread(PS_connection->callid, &PS_connection->call);
309
310 /* Remove myself from connection hash table */
311 futex_down(&async_futex);
312 key = PS_connection->in_phone_hash;
313 hash_table_remove(&conn_hash_table, &key, 1);
314 futex_up(&async_futex);
315 /* Answer all remaining messages with ehangup */
316 while (!list_empty(&PS_connection->msg_queue)) {
317 msg = list_get_instance(PS_connection->msg_queue.next, msg_t, link);
318 list_remove(&msg->link);
319 ipc_answer_fast(msg->callid, EHANGUP, 0, 0);
320 free(msg);
321 }
322}
323
324/** Create new thread for a new connection
325 *
326 * Creates new thread for connection, fills in connection
327 * structures and inserts it into the hash table, so that
328 * later we can easily do routing of messages to particular
329 * threads.
330 *
331 * @param in_phone_hash Identification of the incoming connection
332 * @param callid Callid of the IPC_M_CONNECT_ME_TO packet
333 * @param call Call data of the opening packet
334 * @param cthread Thread function that should be called upon
335 * opening the connection
336 * @return New thread id
337 */
338pstid_t async_new_connection(ipcarg_t in_phone_hash,ipc_callid_t callid,
339 ipc_call_t *call,
340 void (*cthread)(ipc_callid_t,ipc_call_t *))
341{
342 pstid_t ptid;
343 connection_t *conn;
344 unsigned long key;
345
346 conn = malloc(sizeof(*conn));
347 if (!conn) {
348 ipc_answer_fast(callid, ENOMEM, 0, 0);
349 return NULL;
350 }
351 conn->in_phone_hash = in_phone_hash;
352 list_initialize(&conn->msg_queue);
353 conn->ptid = psthread_create(connection_thread, conn);
354 conn->callid = callid;
355 if (call)
356 conn->call = *call;
357 conn->active = 1; /* We will activate it asap */
358 conn->cthread = cthread;
359 list_initialize(&conn->link);
360 if (!conn->ptid) {
361 free(conn);
362 ipc_answer_fast(callid, ENOMEM, 0, 0);
363 return NULL;
364 }
365 key = conn->in_phone_hash;
366 futex_down(&async_futex);
367 /* Add connection to hash table */
368 hash_table_insert(&conn_hash_table, &key, &conn->link);
369 futex_up(&async_futex);
370
371 psthread_add_ready(conn->ptid);
372
373 return conn->ptid;
374}
375
376/** Handle call that was received */
377static void handle_call(ipc_callid_t callid, ipc_call_t *call)
378{
379 /* Unrouted call - do some default behaviour */
380 switch (IPC_GET_METHOD(*call)) {
381 case IPC_M_INTERRUPT:
382 interrupt_received(call);
383 return;
384 case IPC_M_CONNECT_ME_TO:
385 /* Open new connection with thread etc. */
386 async_new_connection(IPC_GET_ARG3(*call), callid, call, client_connection);
387 return;
388 }
389
390 /* Try to route call through connection tables */
391 if (route_call(callid, call))
392 return;
393
394 /* Unknown call from unknown phone - hang it up */
395 ipc_answer_fast(callid, EHANGUP, 0, 0);
396}
397
398/** Fire all timeouts that expired */
399static void handle_expired_timeouts(void)
400{
401 struct timeval tv;
402 amsg_t *amsg;
403 link_t *cur;
404
405 gettimeofday(&tv,NULL);
406 futex_down(&async_futex);
407
408 cur = timeout_list.next;
409 while (cur != &timeout_list) {
410 amsg = list_get_instance(cur,amsg_t,link);
411 if (tv_gt(&amsg->expires, &tv))
412 break;
413 cur = cur->next;
414 list_remove(&amsg->link);
415 amsg->has_timeout = 0;
416 /* Redundant condition? The thread should not
417 * be active when it gets here.
418 */
419 if (!amsg->active) {
420 amsg->active = 1;
421 psthread_add_ready(amsg->ptid);
422 }
423 }
424
425 futex_up(&async_futex);
426}
427
428/** Endless loop dispatching incoming calls and answers */
429int async_manager(void)
430{
431 ipc_call_t call;
432 ipc_callid_t callid;
433 int timeout;
434 amsg_t *amsg;
435 struct timeval tv;
436
437 while (1) {
438 if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
439 futex_up(&async_futex); /* async_futex is always held
440 * when entering manager thread
441 */
442 continue;
443 }
444 futex_down(&async_futex);
445 if (!list_empty(&timeout_list)) {
446 amsg = list_get_instance(timeout_list.next,amsg_t,link);
447 gettimeofday(&tv,NULL);
448 if (tv_gteq(&tv, &amsg->expires)) {
449 handle_expired_timeouts();
450 continue;
451 } else
452 timeout = tv_sub(&amsg->expires, &tv);
453 } else
454 timeout = SYNCH_NO_TIMEOUT;
455 futex_up(&async_futex);
456
457 callid = ipc_wait_cycle(&call, timeout, SYNCH_BLOCKING);
458
459 if (!callid) {
460 handle_expired_timeouts();
461 continue;
462 }
463
464 if (callid & IPC_CALLID_ANSWERED)
465 continue;
466
467 handle_call(callid, &call);
468 }
469}
470
471/** Function to start async_manager as a standalone thread
472 *
473 * When more kernel threads are used, one async manager should
474 * exist per thread. The particular implementation may change,
475 * currently one async_manager is started automatically per kernel
476 * thread except main thread.
477 */
478static int async_manager_thread(void *arg)
479{
480 futex_up(&async_futex); /* async_futex is always locked when entering
481 * manager */
482 async_manager();
483}
484
485/** Add one manager to manager list */
486void async_create_manager(void)
487{
488 pstid_t ptid;
489
490 ptid = psthread_create(async_manager_thread, NULL);
491 psthread_add_manager(ptid);
492}
493
494/** Remove one manager from manager list */
495void async_destroy_manager(void)
496{
497 psthread_remove_manager();
498}
499
500/** Initialize internal structures needed for async manager */
501int _async_init(void)
502{
503 if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) {
504 printf("%s: cannot create hash table\n", "async");
505 return ENOMEM;
506 }
507
508}
509
510/** IPC handler for messages in async framework
511 *
512 * Notify thread that is waiting for this message, that it arrived
513 */
514static void reply_received(void *private, int retval,
515 ipc_call_t *data)
516{
517 amsg_t *msg = (amsg_t *) private;
518
519 msg->retval = retval;
520
521 futex_down(&async_futex);
522 /* Copy data after futex_down, just in case the
523 * call was detached
524 */
525 if (msg->dataptr)
526 *msg->dataptr = *data;
527
528 write_barrier();
529 /* Remove message from timeout list */
530 if (msg->has_timeout)
531 list_remove(&msg->link);
532 msg->done = 1;
533 if (! msg->active) {
534 msg->active = 1;
535 psthread_add_ready(msg->ptid);
536 }
537 futex_up(&async_futex);
538}
539
540/** Send message and return id of the sent message
541 *
542 * The return value can be used as input for async_wait() to wait
543 * for completion.
544 */
545aid_t async_send_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2,
546 ipc_call_t *dataptr)
547{
548 amsg_t *msg;
549
550 msg = malloc(sizeof(*msg));
551 msg->active = 1;
552 msg->done = 0;
553 msg->dataptr = dataptr;
554 ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received);
555
556 return (aid_t) msg;
557}
558
559/** Wait for a message sent by async framework
560 *
561 * @param amsgid Message ID to wait for
562 * @param retval Pointer to variable where will be stored retval
563 * of the answered message. If NULL, it is ignored.
564 *
565 */
566void async_wait_for(aid_t amsgid, ipcarg_t *retval)
567{
568 amsg_t *msg = (amsg_t *) amsgid;
569 connection_t *conn;
570
571 futex_down(&async_futex);
572 if (msg->done) {
573 futex_up(&async_futex);
574 goto done;
575 }
576
577 msg->ptid = psthread_get_id();
578 msg->active = 0;
579 msg->has_timeout = 0;
580 /* Leave locked async_futex when entering this function */
581 psthread_schedule_next_adv(PS_TO_MANAGER);
582 /* futex is up automatically after psthread_schedule_next...*/
583done:
584 if (retval)
585 *retval = msg->retval;
586 free(msg);
587}
588
589/** Insert sort timeout msg into timeouts list
590 *
591 * Assume async_futex is held
592 */
593static void insert_timeout(amsg_t *msg)
594{
595 link_t *tmp;
596 amsg_t *cur;
597
598 tmp = timeout_list.next;
599 while (tmp != &timeout_list) {
600 cur = list_get_instance(tmp, amsg_t, link);
601 if (tv_gteq(&cur->expires, &msg->expires))
602 break;
603 tmp = tmp->next;
604 }
605 list_append(&msg->link, tmp);
606}
607
608/** Wait for a message sent by async framework with timeout
609 *
610 * @param amsgid Message ID to wait for
611 * @param retval Pointer to variable where will be stored retval
612 * of the answered message. If NULL, it is ignored.
613 * @param timeout Timeout in usecs
614 * @return 0 on success, ETIMEOUT if timeout expired
615 *
616 */
617int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, suseconds_t timeout)
618{
619 amsg_t *msg = (amsg_t *) amsgid;
620 connection_t *conn;
621
622 futex_down(&async_futex);
623 if (msg->done) {
624 futex_up(&async_futex);
625 goto done;
626 }
627
628 msg->ptid = psthread_get_id();
629 msg->active = 0;
630 msg->has_timeout = 1;
631
632 gettimeofday(&msg->expires, NULL);
633 tv_add(&msg->expires, timeout);
634 insert_timeout(msg);
635
636 /* Leave locked async_futex when entering this function */
637 psthread_schedule_next_adv(PS_TO_MANAGER);
638 /* futex is up automatically after psthread_schedule_next...*/
639
640 if (!msg->done)
641 return ETIMEOUT;
642
643done:
644 if (retval)
645 *retval = msg->retval;
646 free(msg);
647
648 return 0;
649}
650
651/** Wait specified time, but in the meantime handle incoming events
652 *
653 * @param timeout Time in microseconds to wait
654 */
655void async_usleep(suseconds_t timeout)
656{
657 amsg_t *msg;
658
659 msg = malloc(sizeof(*msg));
660 if (!msg)
661 return;
662
663 msg->ptid = psthread_get_id();
664 msg->active = 0;
665 msg->has_timeout = 1;
666
667 gettimeofday(&msg->expires, NULL);
668 tv_add(&msg->expires, timeout);
669
670 futex_down(&async_futex);
671 insert_timeout(msg);
672 /* Leave locked async_futex when entering this function */
673 psthread_schedule_next_adv(PS_TO_MANAGER);
674 /* futex is up automatically after psthread_schedule_next...*/
675 free(msg);
676}
677
678/** Set function that is called, IPC_M_CONNECT_ME_TO is received
679 *
680 * @param conn Function that will form new psthread.
681 */
682void async_set_client_connection(async_client_conn_t conn)
683{
684 client_connection = conn;
685}
Note: See TracBrowser for help on using the repository browser.