source: mainline/libc/generic/async.c@ eaf34f7

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since eaf34f7 was eaf34f7, checked in by Ondrej Palkovsky <ondrap@…>, 19 years ago

Modified console to use new async framework.

  • Property mode set to 100644
File size: 16.3 KB
Line 
1/*
2 * Copyright (C) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/**
30 * Asynchronous library
31 *
32 * The aim of this library is facilitating writing programs utilizing
33 * the asynchronous nature of HelenOS IPC, yet using a normal way
34 * of programming.
35 *
36 * You should be able to write very simple multithreaded programs,
37 * the async framework will automatically take care of most synchronization
38 * problems.
39 *
40 * Default semantics:
41 * - send() - send asynchronously. If the kernel refuses to send more
42 * messages, [ try to get responses from kernel, if nothing
43 * found, might try synchronous ]
44 *
45 * Example of use:
46 *
47 * 1) Multithreaded client application
48 * create_thread(thread1);
49 * create_thread(thread2);
50 * ...
51 *
52 * thread1() {
53 * conn = ipc_connect_me_to();
54 * c1 = send(conn);
55 * c2 = send(conn);
56 * wait_for(c1);
57 * wait_for(c2);
58 * }
59 *
60 *
61 * 2) Multithreaded server application
62 * main() {
63 * async_manager();
64 * }
65 *
66 *
67 * client_connection(icallid, *icall) {
68 * if (want_refuse) {
69 * ipc_answer_fast(icallid, ELIMIT, 0, 0);
70 * return;
71 * }
72 * ipc_answer_fast(icallid, 0, 0, 0);
73 *
74 * callid = async_get_call(&call);
75 * handle(callid, call);
76 * ipc_answer_fast(callid, 1,2,3);
77 *
78 * callid = async_get_call(&call);
79 * ....
80 * }
81 *
82 * TODO: Detaching/joining dead psthreads?
83 */
84#include <futex.h>
85#include <async.h>
86#include <psthread.h>
87#include <stdio.h>
88#include <libadt/hash_table.h>
89#include <libadt/list.h>
90#include <ipc/ipc.h>
91#include <assert.h>
92#include <errno.h>
93#include <time.h>
94#include <arch/barrier.h>
95
96static atomic_t async_futex = FUTEX_INITIALIZER;
97static hash_table_t conn_hash_table;
98static LIST_INITIALIZE(timeout_list);
99
100typedef struct {
101 pstid_t ptid; /**< Thread waiting for this message */
102 int active; /**< If this thread is currently active */
103 int done; /**< If reply was received */
104 ipc_call_t *dataptr; /**< Pointer where the answer data
105 * should be stored */
106 struct timeval expires; /**< Expiration time for waiting thread */
107 int has_timeout; /**< If true, this struct is in timeout list */
108 link_t link;
109
110 ipcarg_t retval;
111} amsg_t;
112
113typedef struct {
114 link_t link;
115 ipc_callid_t callid;
116 ipc_call_t call;
117} msg_t;
118
119typedef struct {
120 link_t link;
121 ipcarg_t in_phone_hash; /**< Incoming phone hash. */
122 link_t msg_queue; /**< Messages that should be delivered to this thread */
123 pstid_t ptid; /**< Thread associated with this connection */
124 int active; /**< If this thread is currently active */
125 /* Structures for connection opening packet */
126 ipc_callid_t callid;
127 ipc_call_t call;
128 void (*cthread)(ipc_callid_t,ipc_call_t *);
129} connection_t;
130
131__thread connection_t *PS_connection;
132
/** Add microseconds to the given timeval.
 *
 * The result is normalized so that tv_usec ends up in [0, 999999].
 *
 * @param tv    Time value updated in place.
 * @param usecs Microseconds to add; may exceed one second.
 */
static void tv_add(struct timeval *tv, suseconds_t usecs)
{
	tv->tv_sec += usecs / 1000000;
	tv->tv_usec += usecs % 1000000;
	/* Exactly 1000000 usec must carry as well, hence >= and not >. */
	if (tv->tv_usec >= 1000000) {
		tv->tv_sec++;
		tv->tv_usec -= 1000000;
	} else if (tv->tv_usec < 0) {
		/* A negative addend may borrow from the seconds. */
		tv->tv_sec--;
		tv->tv_usec += 1000000;
	}
}
143
/** Difference of two timevals in microseconds.
 *
 * @param tv1 Minuend.
 * @param tv2 Subtrahend.
 * @return tv1 - tv2 expressed in microseconds.
 */
static suseconds_t tv_sub(struct timeval *tv1, struct timeval *tv2)
{
	suseconds_t diff = tv1->tv_usec - tv2->tv_usec;

	return diff + (tv1->tv_sec - tv2->tv_sec) * 1000000;
}
154
/** Strict "later than" comparison of two timevals.
 *
 * @param tv1 Left operand.
 * @param tv2 Right operand.
 * @return 1 if tv1 > tv2, otherwise 0 (also when they are equal).
 */
static int tv_gt(struct timeval *tv1, struct timeval *tv2)
{
	/* Seconds decide unless they tie; then microseconds decide. */
	if (tv1->tv_sec != tv2->tv_sec)
		return tv1->tv_sec > tv2->tv_sec;
	return tv1->tv_usec > tv2->tv_usec;
}
167
168/* Hash table functions */
169#define CONN_HASH_TABLE_CHAINS 32
170
171static hash_index_t conn_hash(unsigned long *key)
172{
173 assert(key);
174 return ((*key) >> 4) % CONN_HASH_TABLE_CHAINS;
175}
176
177static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
178{
179 connection_t *hs;
180
181 hs = hash_table_get_instance(item, connection_t, link);
182
183 return key[0] == hs->in_phone_hash;
184}
185
186static void conn_remove(link_t *item)
187{
188 free(hash_table_get_instance(item, connection_t, link));
189}
190
191
192/** Operations for NS hash table. */
193static hash_table_operations_t conn_hash_table_ops = {
194 .hash = conn_hash,
195 .compare = conn_compare,
196 .remove_callback = conn_remove
197};
198
199/*************************************************/
200
201/** Try to route a call to an appropriate connection thread
202 *
203 */
204static int route_call(ipc_callid_t callid, ipc_call_t *call)
205{
206 connection_t *conn;
207 msg_t *msg;
208 link_t *hlp;
209 unsigned long key;
210
211 futex_down(&async_futex);
212
213 key = call->in_phone_hash;
214 hlp = hash_table_find(&conn_hash_table, &key);
215 if (!hlp) {
216 futex_up(&async_futex);
217 return 0;
218 }
219 conn = hash_table_get_instance(hlp, connection_t, link);
220
221 msg = malloc(sizeof(*msg));
222 msg->callid = callid;
223 msg->call = *call;
224 list_append(&msg->link, &conn->msg_queue);
225
226 if (!conn->active) {
227 conn->active = 1;
228 psthread_add_ready(conn->ptid);
229 }
230
231 futex_up(&async_futex);
232
233 return 1;
234}
235
236/** Return new incoming message for current(thread-local) connection */
237ipc_callid_t async_get_call(ipc_call_t *call)
238{
239 msg_t *msg;
240 ipc_callid_t callid;
241 connection_t *conn;
242
243 futex_down(&async_futex);
244
245 conn = PS_connection;
246 /* If nothing in queue, wait until something appears */
247 if (list_empty(&conn->msg_queue)) {
248 conn->active = 0;
249 psthread_schedule_next_adv(PS_TO_MANAGER);
250 }
251
252 msg = list_get_instance(conn->msg_queue.next, msg_t, link);
253 list_remove(&msg->link);
254 callid = msg->callid;
255 *call = msg->call;
256 free(msg);
257
258 futex_up(&async_futex);
259 return callid;
260}
261
262/** Thread function that gets created on new connection
263 *
264 * This function is defined as a weak symbol - to be redefined in
265 * user code.
266 */
267void client_connection(ipc_callid_t callid, ipc_call_t *call)
268{
269 ipc_answer_fast(callid, ENOENT, 0, 0);
270}
271
272/** Function that gets called on interrupt receival
273 *
274 * This function is defined as a weak symbol - to be redefined in
275 * user code.
276 */
277void interrupt_received(ipc_call_t *call)
278{
279}
280
281
282/** Wrapper for client connection thread
283 *
284 * When new connection arrives, thread with this function is created.
285 * It calls client_connection and does final cleanup.
286 *
287 * @parameter arg Connection structure pointer
288 */
289static int connection_thread(void *arg)
290{
291 unsigned long key;
292 msg_t *msg;
293 connection_t *conn;
294
295 /* Setup thread local connection pointer */
296 PS_connection = (connection_t *)arg;
297 conn = PS_connection;
298 conn->cthread(conn->callid, &conn->call);
299
300 /* Remove myself from connection hash table */
301 futex_down(&async_futex);
302 key = conn->in_phone_hash;
303 hash_table_remove(&conn_hash_table, &key, 1);
304 futex_up(&async_futex);
305 /* Answer all remaining messages with ehangup */
306 while (!list_empty(&conn->msg_queue)) {
307 msg = list_get_instance(conn->msg_queue.next, msg_t, link);
308 list_remove(&msg->link);
309 ipc_answer_fast(msg->callid, EHANGUP, 0, 0);
310 free(msg);
311 }
312}
313
314/** Create new thread for a new connection
315 *
316 * Creates new thread for connection, fills in connection
317 * structures and inserts it into the hash table, so that
318 * later we can easily do routing of messages to particular
319 * threads.
320 *
321 * @param in_phone_hash Identification of the incoming connection
322 * @param callid Callid of the IPC_M_CONNECT_ME_TO packet
323 * @param call Call data of the opening packet
324 * @param cthread Thread function that should be called upon
325 * opening the connection
326 * @return New thread id
327 */
328pstid_t async_new_connection(ipcarg_t in_phone_hash,ipc_callid_t callid,
329 ipc_call_t *call,
330 void (*cthread)(ipc_callid_t,ipc_call_t *))
331{
332 pstid_t ptid;
333 connection_t *conn;
334 unsigned long key;
335
336 conn = malloc(sizeof(*conn));
337 if (!conn) {
338 ipc_answer_fast(callid, ENOMEM, 0, 0);
339 return NULL;
340 }
341 conn->in_phone_hash = in_phone_hash;
342 list_initialize(&conn->msg_queue);
343 conn->ptid = psthread_create(connection_thread, conn);
344 conn->callid = callid;
345 if (call)
346 conn->call = *call;
347 conn->active = 1; /* We will activate it asap */
348 conn->cthread = cthread;
349 list_initialize(&conn->link);
350 if (!conn->ptid) {
351 free(conn);
352 ipc_answer_fast(callid, ENOMEM, 0, 0);
353 return NULL;
354 }
355 key = conn->in_phone_hash;
356 futex_down(&async_futex);
357 /* Add connection to hash table */
358 hash_table_insert(&conn_hash_table, &key, &conn->link);
359 futex_up(&async_futex);
360
361 psthread_add_ready(conn->ptid);
362
363 return conn->ptid;
364}
365
366/** Handle call that was received */
367static void handle_call(ipc_callid_t callid, ipc_call_t *call)
368{
369 /* Unrouted call - do some default behaviour */
370 switch (IPC_GET_METHOD(*call)) {
371 case IPC_M_INTERRUPT:
372 interrupt_received(call);
373 return;
374 case IPC_M_CONNECT_ME_TO:
375 /* Open new connection with thread etc. */
376 async_new_connection(IPC_GET_ARG3(*call), callid, call, client_connection);
377 return;
378 }
379
380 /* Try to route call through connection tables */
381 if (route_call(callid, call))
382 return;
383
384 /* Unknown call from unknown phone - hang it up */
385 ipc_answer_fast(callid, EHANGUP, 0, 0);
386}
387
388/** Fire all timeouts that expired */
389static void handle_expired_timeouts(void)
390{
391 struct timeval tv;
392 amsg_t *amsg;
393 link_t *cur;
394
395 gettimeofday(&tv,NULL);
396 futex_down(&async_futex);
397
398 cur = timeout_list.next;
399 while (cur != &timeout_list) {
400 amsg = list_get_instance(cur,amsg_t,link);
401 if (tv_gt(&amsg->expires, &tv))
402 break;
403 cur = cur->next;
404 list_remove(&amsg->link);
405 amsg->has_timeout = 0;
406 /* Redundant condition? The thread should not
407 * be active when it gets here.
408 */
409 if (!amsg->active) {
410 amsg->active = 1;
411 psthread_add_ready(amsg->ptid);
412 }
413 }
414
415 futex_up(&async_futex);
416}
417
418/** Endless loop dispatching incoming calls and answers */
419int async_manager(void)
420{
421 ipc_call_t call;
422 ipc_callid_t callid;
423 int timeout;
424 amsg_t *amsg;
425 struct timeval tv;
426
427 while (1) {
428 if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
429 futex_up(&async_futex); /* async_futex is always held
430 * when entering manager thread
431 */
432 continue;
433 }
434 futex_down(&async_futex);
435 if (!list_empty(&timeout_list)) {
436 amsg = list_get_instance(timeout_list.next,amsg_t,link);
437 gettimeofday(&tv,NULL);
438 if (tv_gt(&tv, &amsg->expires)) {
439 handle_expired_timeouts();
440 continue;
441 } else
442 timeout = tv_sub(&amsg->expires, &tv);
443 } else
444 timeout = SYNCH_NO_TIMEOUT;
445 futex_up(&async_futex);
446
447 callid = ipc_wait_cycle(&call, timeout, SYNCH_BLOCKING);
448
449 if (!callid) {
450 handle_expired_timeouts();
451 continue;
452 }
453
454 if (callid & IPC_CALLID_ANSWERED)
455 continue;
456
457 handle_call(callid, &call);
458 }
459}
460
461/** Function to start async_manager as a standalone thread
462 *
463 * When more kernel threads are used, one async manager should
464 * exist per thread. The particular implementation may change,
465 * currently one async_manager is started automatically per kernel
466 * thread except main thread.
467 */
468static int async_manager_thread(void *arg)
469{
470 futex_up(&async_futex); /* async_futex is always locked when entering
471 * manager */
472 async_manager();
473}
474
475/** Add one manager to manager list */
476void async_create_manager(void)
477{
478 pstid_t ptid;
479
480 ptid = psthread_create(async_manager_thread, NULL);
481 psthread_add_manager(ptid);
482}
483
/** Take one manager off the manager list
 *
 * Thin wrapper around psthread_remove_manager().
 */
void async_destroy_manager(void)
{
	psthread_remove_manager();
}
489
490/** Initialize internal structures needed for async manager */
491int _async_init(void)
492{
493 if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) {
494 printf("%s: cannot create hash table\n", "async");
495 return ENOMEM;
496 }
497
498}
499
500/** IPC handler for messages in async framework
501 *
502 * Notify thread that is waiting for this message, that it arrived
503 */
504static void reply_received(void *private, int retval,
505 ipc_call_t *data)
506{
507 amsg_t *msg = (amsg_t *) private;
508
509 msg->retval = retval;
510
511 futex_down(&async_futex);
512 /* Copy data after futex_down, just in case the
513 * call was detached
514 */
515 if (msg->dataptr)
516 *msg->dataptr = *data;
517
518 write_barrier();
519 /* Remove message from timeout list */
520 if (msg->has_timeout)
521 list_remove(&msg->link);
522 msg->done = 1;
523 if (! msg->active) {
524 msg->active = 1;
525 psthread_add_ready(msg->ptid);
526 }
527 futex_up(&async_futex);
528}
529
530/** Send message and return id of the sent message
531 *
532 * The return value can be used as input for async_wait() to wait
533 * for completion.
534 */
535aid_t async_send_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2,
536 ipc_call_t *dataptr)
537{
538 amsg_t *msg;
539
540 msg = malloc(sizeof(*msg));
541 msg->active = 1;
542 msg->done = 0;
543 msg->dataptr = dataptr;
544 ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received);
545
546 return (aid_t) msg;
547}
548
549/** Wait for a message sent by async framework
550 *
551 * @param amsgid Message ID to wait for
552 * @param retval Pointer to variable where will be stored retval
553 * of the answered message. If NULL, it is ignored.
554 *
555 */
556void async_wait_for(aid_t amsgid, ipcarg_t *retval)
557{
558 amsg_t *msg = (amsg_t *) amsgid;
559 connection_t *conn;
560
561 futex_down(&async_futex);
562 if (msg->done) {
563 futex_up(&async_futex);
564 goto done;
565 }
566
567 msg->ptid = psthread_get_id();
568 msg->active = 0;
569 msg->has_timeout = 0;
570 /* Leave locked async_futex when entering this function */
571 psthread_schedule_next_adv(PS_TO_MANAGER);
572 /* futex is up automatically after psthread_schedule_next...*/
573done:
574 if (retval)
575 *retval = msg->retval;
576 free(msg);
577}
578
579/** Insert sort timeout msg into timeouts list
580 *
581 * Assume async_futex is held
582 */
583static void insert_timeout(amsg_t *msg)
584{
585 link_t *tmp;
586 amsg_t *cur;
587
588 tmp = timeout_list.next;
589 while (tmp != &timeout_list) {
590 cur = list_get_instance(tmp, amsg_t, link);
591 if (tv_gt(&cur->expires, &msg->expires))
592 break;
593 tmp = tmp->next;
594 }
595 list_append(&msg->link, tmp);
596}
597
598/** Wait for a message sent by async framework with timeout
599 *
600 * @param amsgid Message ID to wait for
601 * @param retval Pointer to variable where will be stored retval
602 * of the answered message. If NULL, it is ignored.
603 * @param timeout Timeout in usecs
604 * @return 0 on success, ETIMEOUT if timeout expired
605 *
606 */
607int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, suseconds_t timeout)
608{
609 amsg_t *msg = (amsg_t *) amsgid;
610 connection_t *conn;
611
612 futex_down(&async_futex);
613 if (msg->done) {
614 futex_up(&async_futex);
615 goto done;
616 }
617
618 msg->ptid = psthread_get_id();
619 msg->active = 0;
620 msg->has_timeout = 1;
621
622 gettimeofday(&msg->expires, NULL);
623 tv_add(&msg->expires, timeout);
624 insert_timeout(msg);
625
626 /* Leave locked async_futex when entering this function */
627 psthread_schedule_next_adv(PS_TO_MANAGER);
628 /* futex is up automatically after psthread_schedule_next...*/
629
630 if (!msg->done)
631 return ETIMEOUT;
632
633done:
634 if (retval)
635 *retval = msg->retval;
636 free(msg);
637
638 return 0;
639}
640
641/** Wait specified time, but in the meantime handle incoming events
642 *
643 * @param timeout Time in microseconds to wait
644 */
645void async_usleep(suseconds_t timeout)
646{
647 amsg_t *msg;
648
649 msg = malloc(sizeof(*msg));
650 if (!msg)
651 return;
652
653 msg->ptid = psthread_get_id();
654 msg->active = 0;
655 msg->has_timeout = 1;
656
657 gettimeofday(&msg->expires, NULL);
658 tv_add(&msg->expires, timeout);
659
660 futex_down(&async_futex);
661 insert_timeout(msg);
662 /* Leave locked async_futex when entering this function */
663 psthread_schedule_next_adv(PS_TO_MANAGER);
664 /* futex is up automatically after psthread_schedule_next...*/
665 free(msg);
666}
Note: See TracBrowser for help on using the repository browser.