Changeset 80649a91 in mainline for libc/generic/async.c
- Timestamp:
- 2006-05-21T19:28:37Z (19 years ago)
- Branches:
- lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
- Children:
- a410beb
- Parents:
- 1ee11f4
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libc/generic/async.c
r1ee11f4 r80649a91 27 27 */ 28 28 29 ipc_wait_t call_func(args) 30 { 31 } 32 33 ipc_wait_t fire_function(args) 34 { 35 stack = malloc(stacksize); 36 setup(stack); 37 add_to_list_of_ready_funcs(stack); 38 if (threads_waiting_for_message) 39 send_message_to_self_to_one_up(); 40 } 41 42 void discard_result(ipc_wait_t funcid) 43 { 44 } 45 46 int wait_result(ipc_wait_t funcid); 47 { 48 save_context(self); 49 restart: 50 if result_available() { 51 if in_list_of_ready(self): 52 tear_off_list(self); 53 return retval; 54 } 55 add_to_waitlist_of(funcid); 56 57 take_something_from_list_of_ready(); 58 if something { 59 60 restore_context(something); 61 } else { /* nothing */ 62 wait_for_call(); 63 if (answer) { 64 mark_result_ready(); 65 put_waiting_thread_to_waitlist(); 66 67 goto restart; 29 /** 30 * Asynchronous library 31 * 32 * The aim of this library is facilitating writing programs utilizing 33 * the asynchronous nature of Helenos IPC, yet using a normal way 34 * of programming. 35 * 36 * You should be able to write very simple multithreaded programs, 37 * the async framework will automatically take care of most synchronization 38 * problems. 39 * 40 * Default semantics: 41 * - send() - send asynchronously. If the kernel refuses to send more 42 * messages, [ try to get responses from kernel, if nothing 43 * found, might try synchronous ] 44 * 45 * Example of use: 46 * 47 * 1) Multithreaded client application 48 * create_thread(thread1); 49 * create_thread(thread2); 50 * ... 51 * 52 * thread1() { 53 * conn = ipc_connect_me_to(); 54 * c1 = send(conn); 55 * c2 = send(conn); 56 * wait_for(c1); 57 * wait_for(c2); 58 * } 59 * 60 * 61 * 2) Multithreaded server application 62 * main() { 63 * wait_for_connection(new_connection); 64 * } 65 * 66 * 67 * new_connection(int connection) { 68 * accept(connection); 69 * msg = get_msg(); 70 * handle(msg); 71 * answer(msg); 72 * 73 * msg = get_msg(); 74 * .... 75 * } 76 */ 77 #include <futex.h> 78 #include <async.h> 79 #include <psthread.h> 80 #include <stdio.h> 81 #include <libadt/hash_table.h> 82 #include <libadt/list.h> 83 #include <ipc/ipc.h> 84 #include <assert.h> 85 #include <errno.h> 86 87 static atomic_t conn_futex = FUTEX_INITIALIZER; 88 static hash_table_t conn_hash_table; 89 90 typedef struct { 91 link_t link; 92 ipc_callid_t callid; 93 ipc_call_t call; 94 } msg_t; 95 96 typedef struct { 97 link_t link; 98 ipcarg_t in_phone_hash; /**< Incoming phone hash. */ 99 link_t msg_queue; /**< Messages that should be delivered to this thread */ 100 pstid_t ptid; /**< Thread associated with this connection */ 101 int active; /**< If this thread is currently active */ 102 int opened; /* If the connection was accepted */ 103 /* Structures for connection opening packet */ 104 ipc_callid_t callid; 105 ipc_call_t call; 106 } connection_t; 107 108 __thread connection_t *PS_connection; 109 110 /* Hash table functions */ 111 112 #define ASYNC_HASH_TABLE_CHAINS 32 113 114 static hash_index_t conn_hash(unsigned long *key) 115 { 116 assert(key); 117 return ((*key) >> 4) % ASYNC_HASH_TABLE_CHAINS; 118 } 119 120 static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item) 121 { 122 connection_t *hs; 123 124 hs = hash_table_get_instance(item, connection_t, link); 125 126 return key[0] == hs->in_phone_hash; 127 } 128 129 static void conn_remove(link_t *item) 130 { 131 free(hash_table_get_instance(item, connection_t, link)); 132 } 133 134 135 /** Operations for NS hash table. */ 136 static hash_table_operations_t conn_hash_table_ops = { 137 .hash = conn_hash, 138 .compare = conn_compare, 139 .remove_callback = conn_remove 140 }; 141 142 /** Try to route a call to an appropriate connection thread 143 * 144 */ 145 static int route_call(ipc_callid_t callid, ipc_call_t *call) 146 { 147 connection_t *conn; 148 msg_t *msg; 149 link_t *hlp; 150 unsigned long key; 151 152 futex_down(&conn_futex); 153 154 key = call->in_phone_hash; 155 hlp = hash_table_find(&conn_hash_table, &key); 156 if (!hlp) { 157 futex_up(&conn_futex); 158 return 0; 159 } 160 conn = hash_table_get_instance(hlp, connection_t, link); 161 162 msg = malloc(sizeof(*msg)); 163 msg->callid = callid; 164 msg->call = *call; 165 list_append(&msg->link, &conn->msg_queue); 166 167 if (!conn->active) { 168 conn->active = 1; 169 psthread_add_ready(conn->ptid); 170 } 171 172 futex_up(&conn_futex); 173 174 return 1; 175 } 176 177 ipc_callid_t async_get_call(ipc_call_t *call) 178 { 179 msg_t *msg; 180 ipc_callid_t callid; 181 connection_t *conn; 182 183 futex_down(&conn_futex); 184 185 conn = PS_connection; 186 /* If nothing in queue, wait until something appears */ 187 if (list_empty(&conn->msg_queue)) { 188 conn->active = 0; 189 psthread_schedule_next_adv(PS_TO_MANAGER); 190 } 191 192 msg = list_get_instance(conn->msg_queue.next, msg_t, link); 193 list_remove(&msg->link); 194 callid = msg->callid; 195 *call = msg->call; 196 free(msg); 197 198 futex_up(&conn_futex); 199 return callid; 200 } 201 202 void client_connection(ipc_callid_t callid, ipc_call_t *call) 203 { 204 printf("Got connection - no handler.\n"); 205 _exit(1); 206 } 207 208 static int connection_thread(void *arg) 209 { 210 /* Setup thread local connection pointer */ 211 PS_connection = (connection_t *)arg; 212 client_connection(PS_connection->callid, &PS_connection->call); 213 214 futex_down(&conn_futex); 215 /* TODO: remove myself from connection hash table */ 216 futex_up(&conn_futex); 217 /* TODO: answer all unanswered messages in queue with 218 * EHANGUP */ 219 } 220 221 /** Create new thread for a new connection 222 * 223 * Creates new thread for connection, fills in connection 224 * structures and inserts it into the hash table, so that 225 * later we can easily do routing of messages to particular 226 * threads. 227 */ 228 static void new_connection(ipc_callid_t callid, ipc_call_t *call) 229 { 230 pstid_t ptid; 231 connection_t *conn; 232 unsigned long key; 233 234 conn = malloc(sizeof(*conn)); 235 if (!conn) { 236 ipc_answer_fast(callid, ENOMEM, 0, 0); 237 return; 238 } 239 conn->in_phone_hash = IPC_GET_ARG3(*call); 240 list_initialize(&conn->msg_queue); 241 conn->opened = 0; 242 conn->ptid = psthread_create(connection_thread, conn); 243 conn->callid = callid; 244 conn->call = *call; 245 conn->active = 1; /* We will activate it asap */ 246 list_initialize(&conn->link); 247 if (!conn->ptid) { 248 free(conn); 249 ipc_answer_fast(callid, ENOMEM, 0, 0); 250 return; 251 } 252 key = conn->in_phone_hash; 253 futex_down(&conn_futex); 254 /* Add connection to hash table */ 255 hash_table_insert(&conn_hash_table, &key, &conn->link); 256 futex_up(&conn_futex); 257 258 psthread_add_ready(conn->ptid); 259 } 260 261 /** Handle call to a task */ 262 static void handle_call(ipc_callid_t callid, ipc_call_t *call) 263 { 264 if (route_call(callid, call)) 265 return; 266 267 switch (IPC_GET_METHOD(*call)) { 268 case IPC_M_INTERRUPT: 269 break; 270 case IPC_M_CONNECT_ME_TO: 271 /* Open new connection with thread etc. */ 272 new_connection(callid, call); 273 break; 274 default: 275 ipc_answer_fast(callid, EHANGUP, 0, 0); 276 } 277 } 278 279 /** Endless loop dispatching incoming calls and answers */ 280 int async_manager() 281 { 282 ipc_call_t call; 283 ipc_callid_t callid; 284 285 while (1) { 286 if (psthread_schedule_next_adv(PS_FROM_MANAGER)) { 287 futex_up(&conn_futex); /* conn_futex is always held 288 * when entering manager thread 289 */ 290 continue; 68 291 } 69 } 70 71 } 72 73 74 int ipc_call_sync(args) 75 { 76 return ipc_wait(call_func(args)); 77 } 292 callid = ipc_wait_cycle(&call,SYNCH_NO_TIMEOUT,SYNCH_BLOCKING); 293 294 if (callid & IPC_CALLID_ANSWERED) 295 continue; 296 handle_call(callid, &call); 297 } 298 } 299 300 static int async_manager_thread(void *arg) 301 { 302 futex_up(&conn_futex); /* conn_futex is always locked when entering 303 * manager */ 304 async_manager(); 305 } 306 307 /** Add one manager to manager list */ 308 void async_create_manager(void) 309 { 310 pstid_t ptid; 311 312 ptid = psthread_create(async_manager_thread, NULL); 313 psthread_add_manager(ptid); 314 } 315 316 /** Remove one manager from manager list */ 317 void async_destroy_manager(void) 318 { 319 psthread_remove_manager(); 320 } 321 322 /** Initialize internal structures needed for async manager */ 323 int _async_init(void) 324 { 325 if (!hash_table_create(&conn_hash_table, ASYNC_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) { 326 printf("%s: cannot create hash table\n", "async"); 327 return ENOMEM; 328 } 329 330 }
Note:
See TracChangeset
for help on using the changeset viewer.