async.c

Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2006 Ondrej Palkovsky
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions
00007  * are met:
00008  *
00009  * - Redistributions of source code must retain the above copyright
00010  *   notice, this list of conditions and the following disclaimer.
00011  * - Redistributions in binary form must reproduce the above copyright
00012  *   notice, this list of conditions and the following disclaimer in the
00013  *   documentation and/or other materials provided with the distribution.
00014  * - The name of the author may not be used to endorse or promote products
00015  *   derived from this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
00018  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
00019  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
00020  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
00021  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
00022  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00023  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00024  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00025  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
00026  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00027  */
00028 
00090 #include <futex.h>
00091 #include <async.h>
00092 #include <psthread.h>
00093 #include <stdio.h>
00094 #include <libadt/hash_table.h>
00095 #include <libadt/list.h>
00096 #include <ipc/ipc.h>
00097 #include <assert.h>
00098 #include <errno.h>
00099 #include <time.h>
00100 #include <arch/barrier.h>
00101 
/* Futex taken around every access in this file to the connection hash
 * table, the per-connection message queues and the timeout list. */
atomic_t async_futex = FUTEX_INITIALIZER;
/* Open connections; looked up by the in_phone_hash of an incoming call. */
static hash_table_t conn_hash_table;
/* Waiters with a pending timeout; insert_timeout() keeps the list ordered
 * by expiration time, earliest first. */
static LIST_INITIALIZE(timeout_list);
00105 
/* Wait state embedded in both connections and pending asynchronous messages. */
typedef struct {
        struct timeval expires;      /* Absolute time at which the wait times out */
        int inlist;                  /* Non-zero while linked in timeout_list */
        link_t link;                 /* Link used for timeout_list */

        pstid_t ptid;                /* Pseudo thread handed to psthread_add_ready() on wake-up */
        int active;                  /* Zero while the pseudo thread sleeps and must be woken */
        int timedout;                /* Set to 1 by handle_expired_timeouts() when the timeout fired */
} awaiter_t;
00115 
/* Bookkeeping for one asynchronous request sent by async_send_*(). */
typedef struct {
        awaiter_t wdata;             /* Wait state of the thread waiting for the answer */

        int done;                    /* Set to 1 by reply_received() once the answer arrived */
        ipc_call_t *dataptr;         /* Where reply_received() copies the answer; may be NULL */
        ipcarg_t retval;             /* Return value delivered with the answer */
} amsg_t;
00124 
/* One incoming call queued on a connection until the connection thread
 * picks it up. */
typedef struct {
        link_t link;                 /* Link in connection_t.msg_queue */
        ipc_callid_t callid;         /* Identification of the queued call */
        ipc_call_t call;             /* Copy of the call data */
} msg_t;
00130 
/* State of one incoming connection served by its own pseudo thread. */
typedef struct {
        awaiter_t wdata;         /* Wait state of the connection thread */

        link_t link;             /* Link in conn_hash_table */
        ipcarg_t in_phone_hash;  /* Key of the connection in conn_hash_table */
        link_t msg_queue;        /* Head of the queue of msg_t items not yet processed */
        /* Structures for connection opening packet */
        ipc_callid_t callid;
        ipc_call_t call;
        ipc_callid_t close_callid; /* Identification of closing packet */
        /* Handler run by connection_thread() with the opening callid/call */
        void (*cthread)(ipc_callid_t,ipc_call_t *);
} connection_t;
00143 
/* Connection served by the current pseudo thread; set in connection_thread(). */
__thread connection_t *PS_connection;
/* Set to 1 by handle_call() while the interrupt_received handler runs;
 * async_send_*() and async_usleep() refuse to work while it is set. */
__thread int in_interrupt_handler;

static void default_client_connection(ipc_callid_t callid, ipc_call_t *call);
static void default_interrupt_received(ipc_callid_t callid, ipc_call_t *call);
/* Handler started for each IPC_M_CONNECT_ME_TO call; replaceable through
 * async_set_client_connection(). */
static async_client_conn_t client_connection = default_client_connection;
/* Handler invoked for notification calls; replaceable through
 * async_set_interrupt_received(). */
static async_client_conn_t interrupt_received = default_interrupt_received;
00154 
/** Add microseconds to a timeval.
 *
 * @param tv    Value to advance in place; assumed to be normalized on entry
 *              (0 <= tv_usec < 1000000).
 * @param usecs Number of microseconds to add.
 */
static void tv_add(struct timeval *tv, suseconds_t usecs)
{
        tv->tv_sec += usecs / 1000000;
        tv->tv_usec += usecs % 1000000;
        /*
         * Carry into the seconds as soon as a full second has accumulated.
         * Testing with ">" would leave tv_usec == 1000000, which is not a
         * normalized timeval and confuses the tv_gt()/tv_gteq() comparisons.
         */
        if (tv->tv_usec >= 1000000) {
                tv->tv_sec++;
                tv->tv_usec -= 1000000;
        }
}
00165 
/** Compute the difference tv1 - tv2 in microseconds.
 *
 * @param tv1 Minuend.
 * @param tv2 Subtrahend.
 *
 * @return Number of microseconds between the two times; negative when
 *         tv1 is earlier than tv2.
 */
static suseconds_t tv_sub(struct timeval *tv1, struct timeval *tv2)
{
        suseconds_t delta = tv1->tv_usec - tv2->tv_usec;

        delta += (tv1->tv_sec - tv2->tv_sec) * 1000000;
        return delta;
}
00176 
/** Test whether tv1 is strictly later than tv2.
 *
 * @return 1 if tv1 > tv2, 0 otherwise.
 */
static int tv_gt(struct timeval *tv1, struct timeval *tv2)
{
        /* Seconds decide unless they are equal; then microseconds do. */
        if (tv1->tv_sec != tv2->tv_sec)
                return tv1->tv_sec > tv2->tv_sec;

        return tv1->tv_usec > tv2->tv_usec;
}
/** Test whether tv1 is later than or equal to tv2.
 *
 * @return 1 if tv1 >= tv2, 0 otherwise.
 */
static int tv_gteq(struct timeval *tv1, struct timeval *tv2)
{
        /* Seconds decide unless they are equal; then microseconds do. */
        if (tv1->tv_sec != tv2->tv_sec)
                return tv1->tv_sec > tv2->tv_sec;

        return tv1->tv_usec >= tv2->tv_usec;
}
00197 
00198 /* Hash table functions */
00199 #define CONN_HASH_TABLE_CHAINS  32
00200 
00201 static hash_index_t conn_hash(unsigned long *key)
00202 {
00203         assert(key);
00204         return ((*key) >> 4) % CONN_HASH_TABLE_CHAINS;
00205 }
00206 
00207 static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
00208 {
00209         connection_t *hs;
00210 
00211         hs = hash_table_get_instance(item, connection_t, link);
00212         
00213         return key[0] == hs->in_phone_hash;
00214 }
00215 
00216 static void conn_remove(link_t *item)
00217 {
00218         free(hash_table_get_instance(item, connection_t, link));
00219 }
00220 
00221 
/* Operations handed to hash_table_create() for conn_hash_table. Note that
 * remove_callback (conn_remove) frees the connection_t it is given. */
static hash_table_operations_t conn_hash_table_ops = {
        .hash = conn_hash,
        .compare = conn_compare,
        .remove_callback = conn_remove
};
00228 
00232 static void insert_timeout(awaiter_t *wd)
00233 {
00234         link_t *tmp;
00235         awaiter_t *cur;
00236 
00237         wd->timedout = 0;
00238         wd->inlist = 1;
00239 
00240         tmp = timeout_list.next;
00241         while (tmp != &timeout_list) {
00242                 cur = list_get_instance(tmp, awaiter_t, link);
00243                 if (tv_gteq(&cur->expires, &wd->expires))
00244                         break;
00245                 tmp = tmp->next;
00246         }
00247         list_append(&wd->link, tmp);
00248 }
00249 
00250 /*************************************************/
00251 
00255 static int route_call(ipc_callid_t callid, ipc_call_t *call)
00256 {
00257         connection_t *conn;
00258         msg_t *msg;
00259         link_t *hlp;
00260         unsigned long key;
00261 
00262         futex_down(&async_futex);
00263 
00264         key = call->in_phone_hash;
00265         hlp = hash_table_find(&conn_hash_table, &key);
00266         if (!hlp) {
00267                 futex_up(&async_futex);
00268                 return 0;
00269         }
00270         conn = hash_table_get_instance(hlp, connection_t, link);
00271 
00272         msg = malloc(sizeof(*msg));
00273         msg->callid = callid;
00274         msg->call = *call;
00275         list_append(&msg->link, &conn->msg_queue);
00276 
00277         if (IPC_GET_METHOD(*call) == IPC_M_PHONE_HUNGUP)
00278                 conn->close_callid = callid;
00279         
00280         /* If the call is waiting for event, run it */
00281         if (!conn->wdata.active) {
00282                 /* If in timeout list, remove it */
00283                 if (conn->wdata.inlist) {
00284                         conn->wdata.inlist = 0;
00285                         list_remove(&conn->wdata.link);
00286                 }
00287                 conn->wdata.active = 1;
00288                 psthread_add_ready(conn->wdata.ptid);
00289         }
00290 
00291         futex_up(&async_futex);
00292 
00293         return 1;
00294 }
00295 
/** Fetch the next message queued on the current connection.
 *
 * Must be called from a connection pseudo thread (PS_connection set).
 * When the queue is empty the pseudo thread switches to the manager and
 * is woken by route_call() or, if a timeout was given, by
 * handle_expired_timeouts().
 *
 * @param call  Filled with the data of the dequeued call.
 * @param usecs Timeout in microseconds; 0 means wait without a timeout.
 *
 * @return Callid of the dequeued message, or 0 if the timeout expired
 *         while the queue was still empty.
 */
ipc_callid_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs)
{
        msg_t *msg;
        ipc_callid_t callid;
        connection_t *conn;
        
        assert(PS_connection);
        /* GCC 4.1.0 coughs on PS_connection-> dereference,
         * GCC 4.1.1 happily puts the rdhwr instruction in delay slot.
         *           I would never expect to find so many errors in 
         *           compiler *($&$(*&$
         */
        conn = PS_connection; 

        futex_down(&async_futex);

        if (usecs) {
                /* The deadline is computed once, before the first wait */
                gettimeofday(&conn->wdata.expires, NULL);
                tv_add(&conn->wdata.expires, usecs);
        } else {
                /* No timeout: route_call() must not try to unlink us */
                conn->wdata.inlist = 0;
        }
        /* If nothing in queue, wait until something appears */
        while (list_empty(&conn->msg_queue)) {
                if (usecs)
                        insert_timeout(&conn->wdata);

                /* Mark ourselves as sleeping so that a waker makes us ready */
                conn->wdata.active = 0;
                psthread_schedule_next_adv(PS_TO_MANAGER);
                /* Futex is up after getting back from async_manager 
                 * get it again */
                futex_down(&async_futex);
                if (usecs && conn->wdata.timedout && \
                    list_empty(&conn->msg_queue)) {
                        /* If we timed out-> exit */
                        futex_up(&async_futex);
                        return 0;
                }
        }
        
        /* Dequeue the oldest message and hand its contents to the caller */
        msg = list_get_instance(conn->msg_queue.next, msg_t, link);
        list_remove(&msg->link);
        callid = msg->callid;
        *call = msg->call;
        free(msg);
        
        futex_up(&async_futex);
        return callid;
}
00346 
00352 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call)
00353 {
00354         ipc_answer_fast(callid, ENOENT, 0, 0);
00355 }
00356 static void default_interrupt_received(ipc_callid_t callid, ipc_call_t *call)
00357 {
00358 }
00359 
00367 static int connection_thread(void  *arg)
00368 {
00369         unsigned long key;
00370         msg_t *msg;
00371         int close_answered = 0;
00372 
00373         /* Setup thread local connection pointer */
00374         PS_connection = (connection_t *)arg;
00375         PS_connection->cthread(PS_connection->callid, &PS_connection->call);
00376         
00377         /* Remove myself from connection hash table */
00378         futex_down(&async_futex);
00379         key = PS_connection->in_phone_hash;
00380         hash_table_remove(&conn_hash_table, &key, 1);
00381         futex_up(&async_futex);
00382         
00383         /* Answer all remaining messages with ehangup */
00384         while (!list_empty(&PS_connection->msg_queue)) {
00385                 msg = list_get_instance(PS_connection->msg_queue.next, msg_t, link);
00386                 list_remove(&msg->link);
00387                 if (msg->callid == PS_connection->close_callid)
00388                         close_answered = 1;
00389                 ipc_answer_fast(msg->callid, EHANGUP, 0, 0);
00390                 free(msg);
00391         }
00392         if (PS_connection->close_callid)
00393                 ipc_answer_fast(PS_connection->close_callid, 0, 0, 0);
00394         
00395         return 0;
00396 }
00397 
00412 pstid_t async_new_connection(ipcarg_t in_phone_hash,ipc_callid_t callid, ipc_call_t *call, void (*cthread)(ipc_callid_t, ipc_call_t *))
00413 {
00414         connection_t *conn;
00415         unsigned long key;
00416 
00417         conn = malloc(sizeof(*conn));
00418         if (!conn) {
00419                 ipc_answer_fast(callid, ENOMEM, 0, 0);
00420                 return NULL;
00421         }
00422         conn->in_phone_hash = in_phone_hash;
00423         list_initialize(&conn->msg_queue);
00424         conn->callid = callid;
00425         conn->close_callid = 0;
00426         if (call)
00427                 conn->call = *call;
00428         conn->wdata.active = 1; /* We will activate it asap */
00429         conn->cthread = cthread;
00430 
00431         conn->wdata.ptid = psthread_create(connection_thread, conn);
00432         if (!conn->wdata.ptid) {
00433                 free(conn);
00434                 ipc_answer_fast(callid, ENOMEM, 0, 0);
00435                 return NULL;
00436         }
00437         /* Add connection to hash table */
00438         key = conn->in_phone_hash;
00439         futex_down(&async_futex);
00440         hash_table_insert(&conn_hash_table, &key, &conn->link);
00441         futex_up(&async_futex);
00442 
00443         psthread_add_ready(conn->wdata.ptid);
00444 
00445         return conn->wdata.ptid;
00446 }
00447 
00449 static void handle_call(ipc_callid_t callid, ipc_call_t *call)
00450 {
00451         /* Unrouted call - do some default behaviour */
00452         if ((callid & IPC_CALLID_NOTIFICATION)) {
00453                 in_interrupt_handler = 1;
00454                 (*interrupt_received)(callid,call);
00455                 in_interrupt_handler = 0;
00456                 return;
00457         }               
00458 
00459         switch (IPC_GET_METHOD(*call)) {
00460         case IPC_M_CONNECT_ME_TO:
00461                 /* Open new connection with thread etc. */
00462                 async_new_connection(IPC_GET_ARG3(*call), callid, call, client_connection);
00463                 return;
00464         }
00465 
00466         /* Try to route call through connection tables */
00467         if (route_call(callid, call))
00468                 return;
00469 
00470         /* Unknown call from unknown phone - hang it up */
00471         ipc_answer_fast(callid, EHANGUP, 0, 0);
00472 }
00473 
00477 static void handle_expired_timeouts(void)
00478 {
00479         struct timeval tv;
00480         awaiter_t *waiter;
00481         link_t *cur;
00482 
00483         gettimeofday(&tv,NULL);
00484         futex_down(&async_futex);
00485 
00486         cur = timeout_list.next;
00487         while (cur != &timeout_list) {
00488                 waiter = list_get_instance(cur,awaiter_t,link);
00489                 if (tv_gt(&waiter->expires, &tv))
00490                         break;
00491                 cur = cur->next;
00492                 list_remove(&waiter->link);
00493                 waiter->inlist = 0;
00494                 waiter->timedout = 1;
00495                 /* Redundant condition? The thread should not
00496                  * be active when it gets here.
00497                  */
00498                 if (!waiter->active) {
00499                         waiter->active = 1;
00500                         psthread_add_ready(waiter->ptid);
00501                 }
00502         }
00503 
00504         futex_up(&async_futex);
00505 }
00506 
/** Main loop of the manager pseudo thread.
 *
 * Repeatedly: lets other pseudo threads run, fires expired timeouts,
 * waits for an IPC call (at most until the nearest timeout) and
 * dispatches the received call through handle_call().
 *
 * @return 0; the loop itself never terminates.
 */
static int async_manager_worker(void)
{
        ipc_call_t call;
        ipc_callid_t callid;
        int timeout;
        awaiter_t *waiter;
        struct timeval tv;

        while (1) {
                /* NOTE(review): a non-zero result presumably means another
                 * pseudo thread ran and control came back - confirm against
                 * psthread_schedule_next_adv(). */
                if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
                        futex_up(&async_futex); 
                        /* async_futex is always held
                         * when entering manager thread
                         */
                        continue;
                }
                futex_down(&async_futex);
                if (!list_empty(&timeout_list)) {
                        /* The list is ordered, so its head expires first */
                        waiter = list_get_instance(timeout_list.next,awaiter_t,link);
                        gettimeofday(&tv,NULL);
                        if (tv_gteq(&tv, &waiter->expires)) {
                                /* Already expired: fire it without waiting */
                                futex_up(&async_futex);
                                handle_expired_timeouts();
                                continue;
                        } else
                                /* Wait no longer than until the nearest expiry */
                                timeout = tv_sub(&waiter->expires, &tv);
                } else
                        timeout = SYNCH_NO_TIMEOUT;
                futex_up(&async_futex);

                callid = ipc_wait_cycle(&call, timeout, SYNCH_FLAGS_NONE);

                /* No call arrived: treated as the wait having timed out */
                if (!callid) {
                        handle_expired_timeouts();
                        continue;
                }

                /* Calls flagged as answers are not dispatched here */
                if (callid & IPC_CALLID_ANSWERED) {
                        continue;
                }

                handle_call(callid, &call);
        }
        
        return 0;
}
00554 
00562 static int async_manager_thread(void *arg)
00563 {
00564         futex_up(&async_futex);
00565         /* async_futex is always locked when entering
00566          * manager */
00567         async_manager_worker();
00568         
00569         return 0;
00570 }
00571 
00573 void async_create_manager(void)
00574 {
00575         pstid_t ptid;
00576 
00577         ptid = psthread_create(async_manager_thread, NULL);
00578         psthread_add_manager(ptid);
00579 }
00580 
/** Remove a manager pseudo thread; thin wrapper around
 * psthread_remove_manager(). */
void async_destroy_manager(void)
{
        psthread_remove_manager();
}
00586 
00588 int _async_init(void)
00589 {
00590         if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) {
00591                 printf("%s: cannot create hash table\n", "async");
00592                 return ENOMEM;
00593         }
00594         
00595         return 0;
00596 }
00597 
00602 static void reply_received(void *private, int retval,
00603                            ipc_call_t *data)
00604 {
00605         amsg_t *msg = (amsg_t *) private;
00606 
00607         msg->retval = retval;
00608 
00609         futex_down(&async_futex);
00610         /* Copy data after futex_down, just in case the
00611          * call was detached 
00612          */
00613         if (msg->dataptr)
00614                 *msg->dataptr = *data; 
00615 
00616         write_barrier();
00617         /* Remove message from timeout list */
00618         if (msg->wdata.inlist)
00619                 list_remove(&msg->wdata.link);
00620         msg->done = 1;
00621         if (! msg->wdata.active) {
00622                 msg->wdata.active = 1;
00623                 psthread_add_ready(msg->wdata.ptid);
00624         }
00625         futex_up(&async_futex);
00626 }
00627 
00633 aid_t async_send_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2,
00634                    ipc_call_t *dataptr)
00635 {
00636         amsg_t *msg;
00637 
00638         if (in_interrupt_handler) {
00639                 printf("Cannot send asynchronou request in interrupt handler.\n");
00640                 _exit(1);
00641         }
00642 
00643         msg = malloc(sizeof(*msg));
00644         msg->done = 0;
00645         msg->dataptr = dataptr;
00646 
00647         msg->wdata.active = 1; /* We may sleep in next method, but it
00648                                 * will use it's own mechanism */
00649         ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received,1);
00650 
00651         return (aid_t) msg;
00652 }
00653 
00659 aid_t async_send_3(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2,
00660                    ipcarg_t arg3, ipc_call_t *dataptr)
00661 {
00662         amsg_t *msg;
00663 
00664         if (in_interrupt_handler) {
00665                 printf("Cannot send asynchronou request in interrupt handler.\n");
00666                 _exit(1);
00667         }
00668 
00669         msg = malloc(sizeof(*msg));
00670         msg->done = 0;
00671         msg->dataptr = dataptr;
00672 
00673         msg->wdata.active = 1; /* We may sleep in next method, but it
00674                                 * will use it's own mechanism */
00675         ipc_call_async_3(phoneid,method,arg1,arg2,arg3, msg,reply_received,1);
00676 
00677         return (aid_t) msg;
00678 }
00679 
00687 void async_wait_for(aid_t amsgid, ipcarg_t *retval)
00688 {
00689         amsg_t *msg = (amsg_t *) amsgid;
00690 
00691         futex_down(&async_futex);
00692         if (msg->done) {
00693                 futex_up(&async_futex);
00694                 goto done;
00695         }
00696 
00697         msg->wdata.ptid = psthread_get_id();
00698         msg->wdata.active = 0;
00699         msg->wdata.inlist = 0;
00700         /* Leave locked async_futex when entering this function */
00701         psthread_schedule_next_adv(PS_TO_MANAGER);
00702         /* futex is up automatically after psthread_schedule_next...*/
00703 done:
00704         if (retval)
00705                 *retval = msg->retval;
00706         free(msg);
00707 }
00708 
00718 int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, suseconds_t timeout)
00719 {
00720         amsg_t *msg = (amsg_t *) amsgid;
00721 
00722         /* TODO: Let it go through the event read at least once */
00723         if (timeout < 0)
00724                 return ETIMEOUT;
00725 
00726         futex_down(&async_futex);
00727         if (msg->done) {
00728                 futex_up(&async_futex);
00729                 goto done;
00730         }
00731 
00732         gettimeofday(&msg->wdata.expires, NULL);
00733         tv_add(&msg->wdata.expires, timeout);
00734 
00735         msg->wdata.ptid = psthread_get_id();
00736         msg->wdata.active = 0;
00737         insert_timeout(&msg->wdata);
00738 
00739         /* Leave locked async_futex when entering this function */
00740         psthread_schedule_next_adv(PS_TO_MANAGER);
00741         /* futex is up automatically after psthread_schedule_next...*/
00742 
00743         if (!msg->done)
00744                 return ETIMEOUT;
00745 
00746 done:
00747         if (retval)
00748                 *retval = msg->retval;
00749         free(msg);
00750 
00751         return 0;
00752 }
00753 
00758 void async_usleep(suseconds_t timeout)
00759 {
00760         amsg_t *msg;
00761         
00762         if (in_interrupt_handler) {
00763                 printf("Cannot call async_usleep in interrupt handler.\n");
00764                 _exit(1);
00765         }
00766 
00767         msg = malloc(sizeof(*msg));
00768         if (!msg)
00769                 return;
00770 
00771         msg->wdata.ptid = psthread_get_id();
00772         msg->wdata.active = 0;
00773 
00774         gettimeofday(&msg->wdata.expires, NULL);
00775         tv_add(&msg->wdata.expires, timeout);
00776 
00777         futex_down(&async_futex);
00778         insert_timeout(&msg->wdata);
00779         /* Leave locked async_futex when entering this function */
00780         psthread_schedule_next_adv(PS_TO_MANAGER);
00781         /* futex is up automatically after psthread_schedule_next...*/
00782         free(msg);
00783 }
00784 
/** Install the handler that handle_call() starts for each new incoming
 * connection (IPC_M_CONNECT_ME_TO).
 *
 * @param conn New connection handler.
 */
void async_set_client_connection(async_client_conn_t conn)
{
        client_connection = conn;
}
/** Install the handler that handle_call() invokes for notification calls.
 *
 * @param conn New notification handler.
 */
void async_set_interrupt_received(async_client_conn_t conn)
{
        interrupt_received = conn;
}
00797 
00798 /* Primitive functions for simple communication */
00799 void async_msg_3(int phoneid, ipcarg_t method, ipcarg_t arg1,
00800                  ipcarg_t arg2, ipcarg_t arg3)
00801 {
00802         ipc_call_async_3(phoneid, method, arg1, arg2, arg3, NULL, NULL, !in_interrupt_handler);
00803 }
00804 
00805 void async_msg_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2)
00806 {
00807         ipc_call_async_2(phoneid, method, arg1, arg2, NULL, NULL, !in_interrupt_handler);
00808 }
00809 
00810 

Generated on Sun Jun 18 18:00:18 2006 for HelenOS Userspace (ia64) by  doxygen 1.4.6