Changeset 566992e1 in mainline for uspace/lib/c/generic/async.c
- Timestamp:
- 2015-08-22T05:01:24Z (9 years ago)
- Branches:
- lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
- Children:
- d9e68d0
- Parents:
- 57dea62
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/c/generic/async.c
r57dea62 r566992e1 123 123 list_t exch_list; 124 124 125 /** Session interface */ 126 iface_t iface; 127 125 128 /** Exchange management style */ 126 129 exch_mgmt_t mgmt; … … 245 248 void *data; 246 249 } connection_t; 250 251 /** Interface data */ 252 typedef struct { 253 ht_link_t link; 254 255 /** Interface ID */ 256 iface_t iface; 257 258 /** Futex protecting the hash table */ 259 futex_t futex; 260 261 /** Interface ports */ 262 hash_table_t port_hash_table; 263 264 /** Next available port ID */ 265 port_id_t port_id_avail; 266 } interface_t; 267 268 /* Port data */ 269 typedef struct { 270 ht_link_t link; 271 272 /** Port ID */ 273 port_id_t id; 274 275 /** Port connection handler */ 276 async_port_handler_t handler; 277 278 /** Client data */ 279 void *data; 280 } port_t; 247 281 248 282 /* Notification data */ … … 355 389 static void *fallback_port_data = NULL; 356 390 391 static hash_table_t interface_hash_table; 392 393 static size_t interface_key_hash(void *key) 394 { 395 iface_t iface = *(iface_t *) key; 396 return iface; 397 } 398 399 static size_t interface_hash(const ht_link_t *item) 400 { 401 interface_t *interface = hash_table_get_inst(item, interface_t, link); 402 return interface_key_hash(&interface->iface); 403 } 404 405 static bool interface_key_equal(void *key, const ht_link_t *item) 406 { 407 iface_t iface = *(iface_t *) key; 408 interface_t *interface = hash_table_get_inst(item, interface_t, link); 409 return iface == interface->iface; 410 } 411 412 /** Operations for the port hash table. */ 413 static hash_table_ops_t interface_hash_table_ops = { 414 .hash = interface_hash, 415 .key_hash = interface_key_hash, 416 .key_equal = interface_key_equal, 417 .equal = NULL, 418 .remove_callback = NULL 419 }; 420 421 static size_t port_key_hash(void *key) 422 { 423 port_id_t port_id = *(port_id_t *) key; 424 return port_id; 425 } 426 427 static size_t port_hash(const ht_link_t *item) 428 { 429 port_t *port = hash_table_get_inst(item, port_t, link); 430 return port_key_hash(&port->id); 431 } 432 433 static bool port_key_equal(void *key, const ht_link_t *item) 434 { 435 port_id_t port_id = *(port_id_t *) key; 436 port_t *port = hash_table_get_inst(item, port_t, link); 437 return port_id == port->id; 438 } 439 440 /** Operations for the port hash table. */ 441 static hash_table_ops_t port_hash_table_ops = { 442 .hash = port_hash, 443 .key_hash = port_key_hash, 444 .key_equal = port_key_equal, 445 .equal = NULL, 446 .remove_callback = NULL 447 }; 448 449 static interface_t *async_new_interface(iface_t iface) 450 { 451 interface_t *interface = 452 (interface_t *) malloc(sizeof(interface_t)); 453 if (!interface) 454 return NULL; 455 456 bool ret = hash_table_create(&interface->port_hash_table, 0, 0, 457 &port_hash_table_ops); 458 if (!ret) { 459 free(interface); 460 return NULL; 461 } 462 463 interface->iface = iface; 464 futex_initialize(&interface->futex, 1); 465 interface->port_id_avail = 0; 466 467 hash_table_insert(&interface_hash_table, &interface->link); 468 469 return interface; 470 } 471 472 static port_t *async_new_port(interface_t *interface, 473 async_port_handler_t handler, void *data) 474 { 475 port_t *port = (port_t *) malloc(sizeof(port_t)); 476 if (!port) 477 return NULL; 478 479 futex_down(&interface->futex); 480 481 port_id_t id = interface->port_id_avail; 482 interface->port_id_avail++; 483 484 port->id = id; 485 port->handler = handler; 486 port->data = data; 487 488 hash_table_insert(&interface->port_hash_table, &port->link); 489 490 futex_up(&interface->futex); 491 492 return port; 493 } 494 357 495 static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE; 358 496 … … 380 518 */ 381 519 static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv); 520 521 int async_create_port(iface_t iface, async_port_handler_t handler, 522 void *data, port_id_t *port_id) 523 { 524 if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK) 525 return EINVAL; 526 527 interface_t *interface; 528 529 futex_down(&async_futex); 530 531 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 532 if (link) 533 interface = hash_table_get_inst(link, interface_t, link); 534 else 535 interface = async_new_interface(iface); 536 537 if (!interface) { 538 futex_up(&async_futex); 539 return ENOMEM; 540 } 541 542 port_t *port = async_new_port(interface, handler, data); 543 if (!port) { 544 futex_up(&async_futex); 545 return ENOMEM; 546 } 547 548 *port_id = port->id; 549 550 futex_up(&async_futex); 551 552 return EOK; 553 } 382 554 383 555 void async_set_fallback_port_handler(async_port_handler_t handler, void *data) … … 959 1131 } 960 1132 1133 static port_t *async_find_port(iface_t iface, port_id_t port_id) 1134 { 1135 port_t *port = NULL; 1136 1137 futex_down(&async_futex); 1138 1139 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 1140 if (link) { 1141 interface_t *interface = 1142 hash_table_get_inst(link, interface_t, link); 1143 1144 link = hash_table_find(&interface->port_hash_table, &port_id); 1145 if (link) 1146 port = hash_table_get_inst(link, port_t, link); 1147 } 1148 1149 futex_up(&async_futex); 1150 1151 return port; 1152 } 1153 961 1154 /** Wrapper for client connection fibril. 962 1155 * … … 1120 1313 } 1121 1314 1122 switch (IPC_GET_IMETHOD(*call)) { 1123 case IPC_M_CLONE_ESTABLISH: 1124 case IPC_M_CONNECT_ME_TO: 1315 /* New connection */ 1316 if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) { 1317 iface_t iface = (iface_t) IPC_GET_ARG1(*call); 1318 sysarg_t in_phone_hash = IPC_GET_ARG5(*call); 1319 1320 async_notification_handler_t handler = fallback_port_handler; 1321 void *data = fallback_port_data; 1322 1323 // TODO: Currently ignores all ports but the first one 1324 port_t *port = async_find_port(iface, 0); 1325 if (port) { 1326 handler = port->handler; 1327 data = port->data; 1328 } 1329 1330 async_new_connection(call->in_task_id, in_phone_hash, callid, 1331 call, handler, data); 1332 return; 1333 } 1334 1335 /* Cloned connection */ 1336 if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) { 1337 // TODO: Currently ignores ports altogether 1338 1125 1339 /* Open new connection with fibril, etc. */ 1126 1340 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call), … … 1286 1500 void __async_init(void) 1287 1501 { 1502 if (!hash_table_create(&interface_hash_table, 0, 0, 1503 &interface_hash_table_ops)) 1504 abort(); 1505 1288 1506 if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops)) 1289 1507 abort(); … … 1300 1518 abort(); 1301 1519 1520 session_ns->iface = 0; 1302 1521 session_ns->mgmt = EXCHANGE_ATOMIC; 1303 1522 session_ns->phone = PHONE_NS; … … 1905 2124 } 1906 2125 2126 sess->iface = 0; 1907 2127 sess->mgmt = mgmt; 1908 2128 sess->phone = phone; … … 1981 2201 } 1982 2202 2203 sess->iface = 0; 1983 2204 sess->mgmt = mgmt; 1984 2205 sess->phone = phone; … … 2052 2273 } 2053 2274 2275 sess->iface = 0; 2054 2276 sess->mgmt = mgmt; 2055 2277 sess->phone = phone; … … 2068 2290 } 2069 2291 2292 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework. 2293 * 2294 * Ask through phone for a new connection to some service and block until 2295 * success. 2296 * 2297 * @param exch Exchange for sending the message. 2298 * @param iface Connection interface. 2299 * @param arg2 User defined argument. 2300 * @param arg3 User defined argument. 2301 * 2302 * @return New session on success or NULL on error. 2303 * 2304 */ 2305 async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface, 2306 sysarg_t arg2, sysarg_t arg3) 2307 { 2308 if (exch == NULL) { 2309 errno = ENOENT; 2310 return NULL; 2311 } 2312 2313 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 2314 if (sess == NULL) { 2315 errno = ENOMEM; 2316 return NULL; 2317 } 2318 2319 int phone = async_connect_me_to_internal(exch->phone, iface, arg2, 2320 arg3, IPC_FLAG_BLOCKING); 2321 if (phone < 0) { 2322 errno = phone; 2323 free(sess); 2324 return NULL; 2325 } 2326 2327 sess->iface = iface; 2328 sess->phone = phone; 2329 sess->arg1 = iface; 2330 sess->arg2 = arg2; 2331 sess->arg3 = arg3; 2332 2333 fibril_mutex_initialize(&sess->remote_state_mtx); 2334 sess->remote_state_data = NULL; 2335 2336 list_initialize(&sess->exch_list); 2337 fibril_mutex_initialize(&sess->mutex); 2338 atomic_set(&sess->refcnt, 0); 2339 2340 return sess; 2341 } 2342 2070 2343 /** Connect to a task specified by id. 2071 2344 * … … 2086 2359 } 2087 2360 2361 sess->iface = 0; 2088 2362 sess->mgmt = EXCHANGE_ATOMIC; 2089 2363 sess->phone = phone; … … 2163 2437 return NULL; 2164 2438 2165 async_exch_t *exch; 2439 exch_mgmt_t mgmt = sess->mgmt; 2440 if (sess->iface != 0) 2441 mgmt = sess->iface & IFACE_EXCHANGE_MASK; 2442 2443 async_exch_t *exch = NULL; 2166 2444 2167 2445 fibril_mutex_lock(&async_sess_mutex); … … 2182 2460 */ 2183 2461 2184 if (( sess->mgmt == EXCHANGE_ATOMIC) ||2185 ( sess->mgmt == EXCHANGE_SERIALIZE)) {2462 if ((mgmt == EXCHANGE_ATOMIC) || 2463 (mgmt == EXCHANGE_SERIALIZE)) { 2186 2464 exch = (async_exch_t *) malloc(sizeof(async_exch_t)); 2187 2465 if (exch != NULL) { … … 2191 2469 exch->phone = sess->phone; 2192 2470 } 2193 } else { /* EXCHANGE_PARALLEL */ 2471 } else if (mgmt == EXCHANGE_PARALLEL) { 2472 int phone; 2473 2474 retry: 2194 2475 /* 2195 2476 * Make a one-time attempt to connect a new data phone. 2196 2477 */ 2197 2198 int phone;2199 2200 retry:2201 2478 phone = async_connect_me_to_internal(sess->phone, sess->arg1, 2202 2479 sess->arg2, sess->arg3, 0); … … 2240 2517 atomic_inc(&sess->refcnt); 2241 2518 2242 if ( sess->mgmt == EXCHANGE_SERIALIZE)2519 if (mgmt == EXCHANGE_SERIALIZE) 2243 2520 fibril_mutex_lock(&sess->mutex); 2244 2521 } … … 2260 2537 assert(sess != NULL); 2261 2538 2539 exch_mgmt_t mgmt = sess->mgmt; 2540 if (sess->iface != 0) 2541 mgmt = sess->iface & IFACE_EXCHANGE_MASK; 2542 2262 2543 atomic_dec(&sess->refcnt); 2263 2544 2264 if ( sess->mgmt == EXCHANGE_SERIALIZE)2545 if (mgmt == EXCHANGE_SERIALIZE) 2265 2546 fibril_mutex_unlock(&sess->mutex); 2266 2547 … … 2818 3099 } 2819 3100 3101 sess->iface = 0; 2820 3102 sess->mgmt = mgmt; 2821 3103 sess->phone = phone; … … 2867 3149 } 2868 3150 3151 sess->iface = 0; 2869 3152 sess->mgmt = mgmt; 2870 3153 sess->phone = phone; … … 2912 3195 return NULL; 2913 3196 3197 sess->iface = 0; 2914 3198 sess->mgmt = mgmt; 2915 3199 sess->phone = phone;
Note:
See TracChangeset
for help on using the changeset viewer.