Changeset 80649a91 in mainline for libc/generic/async.c


Ignore:
Timestamp:
2006-05-21T19:28:37Z (19 years ago)
Author:
Ondrej Palkovsky <ondrap@…>
Branches:
lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
Children:
a410beb
Parents:
1ee11f4
Message:

Merged libadt into libc.
Made lot of psthread and thread stuff thread-safe.
Added new driver framework for easy C connection programming.
Changed FB code to use new API.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libc/generic/async.c

    r1ee11f4 r80649a91  
    2727 */
    2828
    29 ipc_wait_t call_func(args)
    30 {
    31 }
    32 
    33 ipc_wait_t fire_function(args)
    34 {
    35         stack = malloc(stacksize);
    36         setup(stack);
    37         add_to_list_of_ready_funcs(stack);
    38         if (threads_waiting_for_message)
    39                 send_message_to_self_to_one_up();
    40 }
    41 
    42 void discard_result(ipc_wait_t funcid)
    43 {
    44 }
    45 
    46 int wait_result(ipc_wait_t funcid);
    47 {
    48         save_context(self);
    49 restart:
    50         if result_available() {
    51                 if in_list_of_ready(self):
    52                         tear_off_list(self);
    53                 return retval;
    54         }
    55         add_to_waitlist_of(funcid);
    56 
    57         take_something_from_list_of_ready();
    58         if something {
    59                
    60                 restore_context(something);
    61         } else { /* nothing */
    62                 wait_for_call();
    63                 if (answer) {
    64                         mark_result_ready();
    65                         put_waiting_thread_to_waitlist();
    66 
    67                         goto restart;
     29/**
     30 * Asynchronous library
     31 *
     32 * The aim of this library is facilitating writing programs utilizing
     33 * the asynchronous nature of Helenos IPC, yet using a normal way
     34 * of programming.
     35 *
     36 * You should be able to write very simple multithreaded programs,
     37 * the async framework will automatically take care of most synchronization
     38 * problems.
     39 *
     40 * Default semantics:
     41 * - send() - send asynchronously. If the kernel refuses to send more
     42 *            messages, [ try to get responses from kernel, if nothing
     43 *            found, might try synchronous ]
     44 *
     45 * Example of use:
     46 *
     47 * 1) Multithreaded client application
     48 *  create_thread(thread1);
     49 *  create_thread(thread2);
     50 *  ...
     51 * 
     52 *  thread1() {
     53 *        conn = ipc_connect_me_to();
     54 *        c1 = send(conn);
     55 *        c2 = send(conn);
     56 *        wait_for(c1);
     57 *        wait_for(c2);
     58 *  }
     59 *
     60 *
     61 * 2) Multithreaded server application
     62 * main() {
     63 *      wait_for_connection(new_connection);
     64 * }
     65 *
     66 *
     67 * new_connection(int connection) {
     68 *       accept(connection);
     69 *       msg = get_msg();
     70 *       handle(msg);
     71 *       answer(msg);
     72 *
     73 *       msg = get_msg();
     74 *       ....
     75 * }
     76 */
     77#include <futex.h>
     78#include <async.h>
     79#include <psthread.h>
     80#include <stdio.h>
     81#include <libadt/hash_table.h>
     82#include <libadt/list.h>
     83#include <ipc/ipc.h>
     84#include <assert.h>
     85#include <errno.h>
     86
     87static atomic_t conn_futex = FUTEX_INITIALIZER;
     88static hash_table_t conn_hash_table;
     89
     90typedef struct {
     91        link_t link;
     92        ipc_callid_t callid;
     93        ipc_call_t call;
     94} msg_t;
     95
     96typedef struct {
     97        link_t link;
     98        ipcarg_t in_phone_hash;         /**< Incoming phone hash. */
     99        link_t msg_queue;              /**< Messages that should be delivered to this thread */
     100        pstid_t ptid;                /**< Thread associated with this connection */
     101        int active;                     /**< If this thread is currently active */
     102        int opened;                    /* If the connection was accepted */
     103        /* Structures for connection opening packet */
     104        ipc_callid_t callid;
     105        ipc_call_t call;
     106} connection_t;
     107
     108__thread connection_t *PS_connection;
     109
     110/* Hash table functions */
     111
     112#define ASYNC_HASH_TABLE_CHAINS 32
     113
     114static hash_index_t conn_hash(unsigned long *key)
     115{
     116        assert(key);
     117        return ((*key) >> 4) % ASYNC_HASH_TABLE_CHAINS;
     118}
     119
     120static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
     121{
     122        connection_t *hs;
     123
     124        hs = hash_table_get_instance(item, connection_t, link);
     125       
     126        return key[0] == hs->in_phone_hash;
     127}
     128
     129static void conn_remove(link_t *item)
     130{
     131        free(hash_table_get_instance(item, connection_t, link));
     132}
     133
     134
     135/** Operations for NS hash table. */
     136static hash_table_operations_t conn_hash_table_ops = {
     137        .hash = conn_hash,
     138        .compare = conn_compare,
     139        .remove_callback = conn_remove
     140};
     141
     142/** Try to route a call to an appropriate connection thread
     143 *
     144 */
     145static int route_call(ipc_callid_t callid, ipc_call_t *call)
     146{
     147        connection_t *conn;
     148        msg_t *msg;
     149        link_t *hlp;
     150        unsigned long key;
     151
     152        futex_down(&conn_futex);
     153
     154        key = call->in_phone_hash;
     155        hlp = hash_table_find(&conn_hash_table, &key);
     156        if (!hlp) {
     157                futex_up(&conn_futex);
     158                return 0;
     159        }
     160        conn = hash_table_get_instance(hlp, connection_t, link);
     161
     162        msg = malloc(sizeof(*msg));
     163        msg->callid = callid;
     164        msg->call = *call;
     165        list_append(&msg->link, &conn->msg_queue);
     166       
     167        if (!conn->active) {
     168                conn->active = 1;
     169                psthread_add_ready(conn->ptid);
     170        }
     171
     172        futex_up(&conn_futex);
     173
     174        return 1;
     175}
     176
     177ipc_callid_t async_get_call(ipc_call_t *call)
     178{
     179        msg_t *msg;
     180        ipc_callid_t callid;
     181        connection_t *conn;
     182       
     183        futex_down(&conn_futex);
     184
     185        conn = PS_connection;
     186        /* If nothing in queue, wait until something appears */
     187        if (list_empty(&conn->msg_queue)) {
     188                conn->active = 0;
     189                psthread_schedule_next_adv(PS_TO_MANAGER);
     190        }
     191       
     192        msg = list_get_instance(conn->msg_queue.next, msg_t, link);
     193        list_remove(&msg->link);
     194        callid = msg->callid;
     195        *call = msg->call;
     196        free(msg);
     197       
     198        futex_up(&conn_futex);
     199        return callid;
     200}
     201
     202void client_connection(ipc_callid_t callid, ipc_call_t *call)
     203{
     204        printf("Got connection - no handler.\n");
     205        _exit(1);
     206}
     207
     208static int connection_thread(void  *arg)
     209{
     210        /* Setup thread local connection pointer */
     211        PS_connection = (connection_t *)arg;
     212        client_connection(PS_connection->callid, &PS_connection->call);
     213
     214        futex_down(&conn_futex);
     215        /* TODO: remove myself from connection hash table */
     216        futex_up(&conn_futex);
     217        /* TODO: answer all unanswered messages in queue with
     218         *       EHANGUP */
     219}
     220
     221/** Create new thread for a new connection
     222 *
     223 * Creates new thread for connection, fills in connection
     224 * structures and inserts it into the hash table, so that
     225 * later we can easily do routing of messages to particular
     226 * threads.
     227 */
     228static void new_connection(ipc_callid_t callid, ipc_call_t *call)
     229{
     230        pstid_t ptid;
     231        connection_t *conn;
     232        unsigned long key;
     233
     234        conn = malloc(sizeof(*conn));
     235        if (!conn) {
     236                ipc_answer_fast(callid, ENOMEM, 0, 0);
     237                return;
     238        }
     239        conn->in_phone_hash = IPC_GET_ARG3(*call);
     240        list_initialize(&conn->msg_queue);
     241        conn->opened = 0;
     242        conn->ptid = psthread_create(connection_thread, conn);
     243        conn->callid = callid;
     244        conn->call = *call;
     245        conn->active = 1; /* We will activate it asap */
     246        list_initialize(&conn->link);
     247        if (!conn->ptid) {
     248                free(conn);
     249                ipc_answer_fast(callid, ENOMEM, 0, 0);
     250                return;
     251        }
     252        key = conn->in_phone_hash;
     253        futex_down(&conn_futex);
     254        /* Add connection to hash table */
     255        hash_table_insert(&conn_hash_table, &key, &conn->link);
     256        futex_up(&conn_futex);
     257
     258        psthread_add_ready(conn->ptid);
     259}
     260
     261/** Handle call to a task */
     262static void handle_call(ipc_callid_t callid, ipc_call_t *call)
     263{
     264        if (route_call(callid, call))
     265                return;
     266
     267        switch (IPC_GET_METHOD(*call)) {
     268        case IPC_M_INTERRUPT:
     269                break;
     270        case IPC_M_CONNECT_ME_TO:
     271                /* Open new connection with thread etc. */
     272                new_connection(callid, call);
     273                break;
     274        default:
     275                ipc_answer_fast(callid, EHANGUP, 0, 0);
     276        }
     277}
     278
     279/** Endless loop dispatching incoming calls and answers */
     280int async_manager()
     281{
     282        ipc_call_t call;
     283        ipc_callid_t callid;
     284
     285        while (1) {
     286                if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
     287                        futex_up(&conn_futex); /* conn_futex is always held
     288                                                * when entering manager thread
     289                                                */
     290                        continue;
    68291                }
    69         }
    70        
    71 }
    72 
    73 
    74 int ipc_call_sync(args)
    75 {
    76         return ipc_wait(call_func(args));
    77 }
     292                callid = ipc_wait_cycle(&call,SYNCH_NO_TIMEOUT,SYNCH_BLOCKING);
     293
     294                if (callid & IPC_CALLID_ANSWERED)
     295                        continue;
     296                handle_call(callid, &call);
     297        }
     298}
     299
     300static int async_manager_thread(void *arg)
     301{
     302        futex_up(&conn_futex); /* conn_futex is always locked when entering
     303                                * manager */
     304        async_manager();
     305}
     306
     307/** Add one manager to manager list */
     308void async_create_manager(void)
     309{
     310        pstid_t ptid;
     311
     312        ptid = psthread_create(async_manager_thread, NULL);
     313        psthread_add_manager(ptid);
     314}
     315
     316/** Remove one manager from manager list */
     317void async_destroy_manager(void)
     318{
     319        psthread_remove_manager();
     320}
     321
     322/** Initialize internal structures needed for async manager */
     323int _async_init(void)
     324{
     325        if (!hash_table_create(&conn_hash_table, ASYNC_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) {
     326                printf("%s: cannot create hash table\n", "async");
     327                return ENOMEM;
     328        }
     329       
     330}
Note: See TracChangeset for help on using the changeset viewer.