Changes in uspace/lib/c/generic/async.c [c170438:7f9d97f3] in mainline
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/c/generic/async.c
rc170438 r7f9d97f3 77 77 * } 78 78 * 79 * port_handler(icallid, *icall)79 * my_client_connection(icallid, *icall) 80 80 * { 81 81 * if (want_refuse) { … … 123 123 list_t exch_list; 124 124 125 /** Session interface */126 iface_t iface;127 128 125 /** Exchange management style */ 129 126 exch_mgmt_t mgmt; … … 192 189 /** If reply was received. */ 193 190 bool done; 194 191 195 192 /** If the message / reply should be discarded on arrival. */ 196 193 bool forget; 197 194 198 195 /** If already destroyed. */ 199 196 bool destroyed; … … 235 232 /** Identification of the opening call. */ 236 233 ipc_callid_t callid; 237 238 234 /** Call data of the opening call. */ 239 235 ipc_call_t call; 236 /** Local argument or NULL if none. */ 237 void *carg; 240 238 241 239 /** Identification of the closing call. */ … … 243 241 244 242 /** Fibril function that will be used to handle the connection. */ 245 async_port_handler_t handler; 246 247 /** Client data */ 248 void *data; 243 async_client_conn_t cfibril; 249 244 } connection_t; 250 251 /** Interface data */252 typedef struct {253 ht_link_t link;254 255 /** Interface ID */256 iface_t iface;257 258 /** Futex protecting the hash table */259 futex_t futex;260 261 /** Interface ports */262 hash_table_t port_hash_table;263 264 /** Next available port ID */265 port_id_t port_id_avail;266 } interface_t;267 268 /* Port data */269 typedef struct {270 ht_link_t link;271 272 /** Port ID */273 port_id_t id;274 275 /** Port connection handler */276 async_port_handler_t handler;277 278 /** Client data */279 void *data;280 } port_t;281 245 282 246 /* Notification data */ … … 300 264 { 301 265 struct timeval tv = { 0, 0 }; 302 266 303 267 to->inlist = false; 304 268 to->occurred = false; … … 323 287 static amsg_t *amsg_create(void) 324 288 { 325 amsg_t *msg = malloc(sizeof(amsg_t)); 289 amsg_t *msg; 290 291 msg = malloc(sizeof(amsg_t)); 326 292 if (msg) { 327 293 msg->done = false; … … 332 298 awaiter_initialize(&msg->wdata); 333 299 } 334 300 335 301 return msg; 336 302 } … … 369 335 } 370 336 371 /** Default fallback fibril function. 372 * 373 * This fallback fibril function gets called on incomming 374 * connections that do not have a specific handler defined. 337 /** Default fibril function that gets called to handle new connection. 338 * 339 * This function is defined as a weak symbol - to be redefined in user code. 375 340 * 376 341 * @param callid Hash of the incoming call. … … 379 344 * 380 345 */ 381 static void default_ fallback_port_handler(ipc_callid_t callid, ipc_call_t *call,346 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call, 382 347 void *arg) 383 348 { … … 385 350 } 386 351 387 static async_port_handler_t fallback_port_handler = 388 default_fallback_port_handler; 389 static void *fallback_port_data = NULL; 390 391 static hash_table_t interface_hash_table; 392 393 static size_t interface_key_hash(void *key) 394 { 395 iface_t iface = *(iface_t *) key; 396 return iface; 397 } 398 399 static size_t interface_hash(const ht_link_t *item) 400 { 401 interface_t *interface = hash_table_get_inst(item, interface_t, link); 402 return interface_key_hash(&interface->iface); 403 } 404 405 static bool interface_key_equal(void *key, const ht_link_t *item) 406 { 407 iface_t iface = *(iface_t *) key; 408 interface_t *interface = hash_table_get_inst(item, interface_t, link); 409 return iface == interface->iface; 410 } 411 412 /** Operations for the port hash table. */ 413 static hash_table_ops_t interface_hash_table_ops = { 414 .hash = interface_hash, 415 .key_hash = interface_key_hash, 416 .key_equal = interface_key_equal, 417 .equal = NULL, 418 .remove_callback = NULL 419 }; 420 421 static size_t port_key_hash(void *key) 422 { 423 port_id_t port_id = *(port_id_t *) key; 424 return port_id; 425 } 426 427 static size_t port_hash(const ht_link_t *item) 428 { 429 port_t *port = hash_table_get_inst(item, port_t, link); 430 return port_key_hash(&port->id); 431 } 432 433 static bool port_key_equal(void *key, const ht_link_t *item) 434 { 435 port_id_t port_id = *(port_id_t *) key; 436 port_t *port = hash_table_get_inst(item, port_t, link); 437 return port_id == port->id; 438 } 439 440 /** Operations for the port hash table. */ 441 static hash_table_ops_t port_hash_table_ops = { 442 .hash = port_hash, 443 .key_hash = port_key_hash, 444 .key_equal = port_key_equal, 445 .equal = NULL, 446 .remove_callback = NULL 447 }; 448 449 static interface_t *async_new_interface(iface_t iface) 450 { 451 interface_t *interface = 452 (interface_t *) malloc(sizeof(interface_t)); 453 if (!interface) 454 return NULL; 455 456 bool ret = hash_table_create(&interface->port_hash_table, 0, 0, 457 &port_hash_table_ops); 458 if (!ret) { 459 free(interface); 460 return NULL; 461 } 462 463 interface->iface = iface; 464 futex_initialize(&interface->futex, 1); 465 interface->port_id_avail = 0; 466 467 hash_table_insert(&interface_hash_table, &interface->link); 468 469 return interface; 470 } 471 472 static port_t *async_new_port(interface_t *interface, 473 async_port_handler_t handler, void *data) 474 { 475 port_t *port = (port_t *) malloc(sizeof(port_t)); 476 if (!port) 477 return NULL; 478 479 futex_down(&interface->futex); 480 481 port_id_t id = interface->port_id_avail; 482 interface->port_id_avail++; 483 484 port->id = id; 485 port->handler = handler; 486 port->data = data; 487 488 hash_table_insert(&interface->port_hash_table, &port->link); 489 490 futex_up(&interface->futex); 491 492 return port; 352 static async_client_conn_t client_connection = default_client_connection; 353 static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE; 354 355 /** Setter for client_connection function pointer. 356 * 357 * @param conn Function that will implement a new connection fibril. 358 * 359 */ 360 void async_set_client_connection(async_client_conn_t conn) 361 { 362 assert(client_connection == default_client_connection); 363 client_connection = conn; 364 } 365 366 /** Set the stack size for the notification handler notification fibrils. 367 * 368 * @param size Stack size in bytes. 369 */ 370 void async_set_notification_handler_stack_size(size_t size) 371 { 372 notification_handler_stksz = size; 493 373 } 494 374 … … 507 387 */ 508 388 static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv); 509 510 int async_create_port(iface_t iface, async_port_handler_t handler,511 void *data, port_id_t *port_id)512 {513 if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK)514 return EINVAL;515 516 interface_t *interface;517 518 futex_down(&async_futex);519 520 ht_link_t *link = hash_table_find(&interface_hash_table, &iface);521 if (link)522 interface = hash_table_get_inst(link, interface_t, link);523 else524 interface = async_new_interface(iface);525 526 if (!interface) {527 futex_up(&async_futex);528 return ENOMEM;529 }530 531 port_t *port = async_new_port(interface, handler, data);532 if (!port) {533 futex_up(&async_futex);534 return ENOMEM;535 }536 537 *port_id = port->id;538 539 futex_up(&async_futex);540 541 return EOK;542 }543 544 void async_set_fallback_port_handler(async_port_handler_t handler, void *data)545 {546 assert(handler != NULL);547 548 fallback_port_handler = handler;549 fallback_port_data = data;550 }551 389 552 390 static hash_table_t client_hash_table; … … 619 457 .remove_callback = NULL 620 458 }; 621 622 static client_t *async_client_get(task_id_t client_id, bool create)623 {624 client_t *client = NULL;625 626 futex_down(&async_futex);627 ht_link_t *link = hash_table_find(&client_hash_table, &client_id);628 if (link) {629 client = hash_table_get_inst(link, client_t, link);630 atomic_inc(&client->refcnt);631 } else if (create) {632 client = malloc(sizeof(client_t));633 if (client) {634 client->in_task_id = client_id;635 client->data = async_client_data_create();636 637 atomic_set(&client->refcnt, 1);638 hash_table_insert(&client_hash_table, &client->link);639 }640 }641 642 futex_up(&async_futex);643 return client;644 }645 646 static void async_client_put(client_t *client)647 {648 bool destroy;649 650 futex_down(&async_futex);651 652 if (atomic_predec(&client->refcnt) == 0) {653 hash_table_remove(&client_hash_table, &client->in_task_id);654 destroy = true;655 } else656 destroy = false;657 658 futex_up(&async_futex);659 660 if (destroy) {661 if (client->data)662 async_client_data_destroy(client->data);663 664 free(client);665 }666 }667 668 /** Wrapper for client connection fibril.669 *670 * When a new connection arrives, a fibril with this implementing671 * function is created.672 *673 * @param arg Connection structure pointer.674 *675 * @return Always zero.676 *677 */678 static int connection_fibril(void *arg)679 {680 assert(arg);681 682 /*683 * Setup fibril-local connection pointer.684 */685 fibril_connection = (connection_t *) arg;686 687 /*688 * Add our reference for the current connection in the client task689 * tracking structure. If this is the first reference, create and690 * hash in a new tracking structure.691 */692 693 client_t *client = async_client_get(fibril_connection->in_task_id, true);694 if (!client) {695 ipc_answer_0(fibril_connection->callid, ENOMEM);696 return 0;697 }698 699 fibril_connection->client = client;700 701 /*702 * Call the connection handler function.703 */704 fibril_connection->handler(fibril_connection->callid,705 &fibril_connection->call, fibril_connection->data);706 707 /*708 * Remove the reference for this client task connection.709 */710 async_client_put(client);711 712 /*713 * Remove myself from the connection hash table.714 */715 futex_down(&async_futex);716 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);717 futex_up(&async_futex);718 719 /*720 * Answer all remaining messages with EHANGUP.721 */722 while (!list_empty(&fibril_connection->msg_queue)) {723 msg_t *msg =724 list_get_instance(list_first(&fibril_connection->msg_queue),725 msg_t, link);726 727 list_remove(&msg->link);728 ipc_answer_0(msg->callid, EHANGUP);729 free(msg);730 }731 732 /*733 * If the connection was hung-up, answer the last call,734 * i.e. IPC_M_PHONE_HUNGUP.735 */736 if (fibril_connection->close_callid)737 ipc_answer_0(fibril_connection->close_callid, EOK);738 739 free(fibril_connection);740 return 0;741 }742 743 /** Create a new fibril for a new connection.744 *745 * Create new fibril for connection, fill in connection structures746 * and insert it into the hash table, so that later we can easily747 * do routing of messages to particular fibrils.748 *749 * @param in_task_id Identification of the incoming connection.750 * @param in_phone_hash Identification of the incoming connection.751 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call.752 * If callid is zero, the connection was opened by753 * accepting the IPC_M_CONNECT_TO_ME call and this754 * function is called directly by the server.755 * @param call Call data of the opening call.756 * @param handler Connection handler.757 * @param data Client argument to pass to the connection handler.758 *759 * @return New fibril id or NULL on failure.760 *761 */762 static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,763 ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler,764 void *data)765 {766 connection_t *conn = malloc(sizeof(*conn));767 if (!conn) {768 if (callid)769 ipc_answer_0(callid, ENOMEM);770 771 return (uintptr_t) NULL;772 }773 774 conn->in_task_id = in_task_id;775 conn->in_phone_hash = in_phone_hash;776 list_initialize(&conn->msg_queue);777 conn->callid = callid;778 conn->close_callid = 0;779 conn->handler = handler;780 conn->data = data;781 782 if (call)783 conn->call = *call;784 785 /* We will activate the fibril ASAP */786 conn->wdata.active = true;787 conn->wdata.fid = fibril_create(connection_fibril, conn);788 789 if (conn->wdata.fid == 0) {790 free(conn);791 792 if (callid)793 ipc_answer_0(callid, ENOMEM);794 795 return (uintptr_t) NULL;796 }797 798 /* Add connection to the connection hash table */799 800 futex_down(&async_futex);801 hash_table_insert(&conn_hash_table, &conn->link);802 futex_up(&async_futex);803 804 fibril_add_ready(conn->wdata.fid);805 806 return conn->wdata.fid;807 }808 809 /** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.810 *811 * Ask through phone for a new connection to some service.812 *813 * @param exch Exchange for sending the message.814 * @param iface Callback interface.815 * @param arg1 User defined argument.816 * @param arg2 User defined argument.817 * @param handler Callback handler.818 * @param data Handler data.819 * @param port_id ID of the newly created port.820 *821 * @return Zero on success or a negative error code.822 *823 */824 int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1,825 sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id)826 {827 if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK)828 return EINVAL;829 830 if (exch == NULL)831 return ENOENT;832 833 ipc_call_t answer;834 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2,835 &answer);836 837 sysarg_t ret;838 async_wait_for(req, &ret);839 if (ret != EOK)840 return (int) ret;841 842 sysarg_t phone_hash = IPC_GET_ARG5(answer);843 interface_t *interface;844 845 futex_down(&async_futex);846 847 ht_link_t *link = hash_table_find(&interface_hash_table, &iface);848 if (link)849 interface = hash_table_get_inst(link, interface_t, link);850 else851 interface = async_new_interface(iface);852 853 if (!interface) {854 futex_up(&async_futex);855 return ENOMEM;856 }857 858 port_t *port = async_new_port(interface, handler, data);859 if (!port) {860 futex_up(&async_futex);861 return ENOMEM;862 }863 864 *port_id = port->id;865 866 futex_up(&async_futex);867 868 fid_t fid = async_new_connection(answer.in_task_id, phone_hash,869 0, NULL, handler, data);870 if (fid == (uintptr_t) NULL)871 return ENOMEM;872 873 return EOK;874 }875 459 876 460 static size_t notification_key_hash(void *key) … … 987 571 } 988 572 989 /** Process notification. 990 * 991 * @param callid Hash of the incoming call. 992 * @param call Data of the incoming call. 993 */ 994 static void process_notification(ipc_callid_t callid, ipc_call_t *call) 995 { 573 /** Notification fibril. 574 * 575 * When a notification arrives, a fibril with this implementing function is 576 * created. It calls the corresponding notification handler and does the final 577 * cleanup. 578 * 579 * @param arg Message structure pointer. 580 * 581 * @return Always zero. 582 * 583 */ 584 static int notification_fibril(void *arg) 585 { 586 assert(arg); 587 588 msg_t *msg = (msg_t *) arg; 996 589 async_notification_handler_t handler = NULL; 997 590 void *data = NULL; 998 999 assert(call);1000 591 1001 592 futex_down(&async_futex); 1002 593 1003 594 ht_link_t *link = hash_table_find(¬ification_hash_table, 1004 &IPC_GET_IMETHOD( *call));595 &IPC_GET_IMETHOD(msg->call)); 1005 596 if (link) { 1006 597 notification_t *notification = … … 1013 604 1014 605 if (handler) 1015 handler(callid, call, data); 606 handler(msg->callid, &msg->call, data); 607 608 free(msg); 609 return 0; 610 } 611 612 /** Process notification. 613 * 614 * A new fibril is created which would process the notification. 615 * 616 * @param callid Hash of the incoming call. 617 * @param call Data of the incoming call. 618 * 619 * @return False if an error occured. 620 * True if the call was passed to the notification fibril. 621 * 622 */ 623 static bool process_notification(ipc_callid_t callid, ipc_call_t *call) 624 { 625 assert(call); 626 627 futex_down(&async_futex); 628 629 msg_t *msg = malloc(sizeof(*msg)); 630 if (!msg) { 631 futex_up(&async_futex); 632 return false; 633 } 634 635 msg->callid = callid; 636 msg->call = *call; 637 638 fid_t fid = fibril_create_generic(notification_fibril, msg, 639 notification_handler_stksz); 640 if (fid == 0) { 641 free(msg); 642 futex_up(&async_futex); 643 return false; 644 } 645 646 fibril_add_ready(fid); 647 648 futex_up(&async_futex); 649 return true; 1016 650 } 1017 651 … … 1232 866 } 1233 867 1234 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), 1235 msg_t, link); 868 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link); 1236 869 list_remove(&msg->link); 1237 870 … … 1244 877 } 1245 878 879 static client_t *async_client_get(task_id_t client_id, bool create) 880 { 881 client_t *client = NULL; 882 883 futex_down(&async_futex); 884 ht_link_t *link = hash_table_find(&client_hash_table, &client_id); 885 if (link) { 886 client = hash_table_get_inst(link, client_t, link); 887 atomic_inc(&client->refcnt); 888 } else if (create) { 889 client = malloc(sizeof(client_t)); 890 if (client) { 891 client->in_task_id = client_id; 892 client->data = async_client_data_create(); 893 894 atomic_set(&client->refcnt, 1); 895 hash_table_insert(&client_hash_table, &client->link); 896 } 897 } 898 899 futex_up(&async_futex); 900 return client; 901 } 902 903 static void async_client_put(client_t *client) 904 { 905 bool destroy; 906 907 futex_down(&async_futex); 908 909 if (atomic_predec(&client->refcnt) == 0) { 910 hash_table_remove(&client_hash_table, &client->in_task_id); 911 destroy = true; 912 } else 913 destroy = false; 914 915 futex_up(&async_futex); 916 917 if (destroy) { 918 if (client->data) 919 async_client_data_destroy(client->data); 920 921 free(client); 922 } 923 } 924 1246 925 void *async_get_client_data(void) 1247 926 { … … 1255 934 if (!client) 1256 935 return NULL; 1257 1258 936 if (!client->data) { 1259 937 async_client_put(client); 1260 938 return NULL; 1261 939 } 1262 940 1263 941 return client->data; 1264 942 } … … 1267 945 { 1268 946 client_t *client = async_client_get(client_id, false); 1269 947 1270 948 assert(client); 1271 949 assert(client->data); 1272 950 1273 951 /* Drop the reference we got in async_get_client_data_by_hash(). */ 1274 952 async_client_put(client); 1275 953 1276 954 /* Drop our own reference we got at the beginning of this function. */ 1277 955 async_client_put(client); 1278 956 } 1279 957 1280 static port_t *async_find_port(iface_t iface, port_id_t port_id) 1281 { 1282 port_t *port = NULL; 1283 958 /** Wrapper for client connection fibril. 959 * 960 * When a new connection arrives, a fibril with this implementing function is 961 * created. It calls client_connection() and does the final cleanup. 962 * 963 * @param arg Connection structure pointer. 964 * 965 * @return Always zero. 966 * 967 */ 968 static int connection_fibril(void *arg) 969 { 970 assert(arg); 971 972 /* 973 * Setup fibril-local connection pointer. 974 */ 975 fibril_connection = (connection_t *) arg; 976 977 /* 978 * Add our reference for the current connection in the client task 979 * tracking structure. If this is the first reference, create and 980 * hash in a new tracking structure. 981 */ 982 983 client_t *client = async_client_get(fibril_connection->in_task_id, true); 984 if (!client) { 985 ipc_answer_0(fibril_connection->callid, ENOMEM); 986 return 0; 987 } 988 989 fibril_connection->client = client; 990 991 /* 992 * Call the connection handler function. 993 */ 994 fibril_connection->cfibril(fibril_connection->callid, 995 &fibril_connection->call, fibril_connection->carg); 996 997 /* 998 * Remove the reference for this client task connection. 999 */ 1000 async_client_put(client); 1001 1002 /* 1003 * Remove myself from the connection hash table. 1004 */ 1284 1005 futex_down(&async_futex); 1285 1286 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 1287 if (link) { 1288 interface_t *interface = 1289 hash_table_get_inst(link, interface_t, link); 1006 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash); 1007 futex_up(&async_futex); 1008 1009 /* 1010 * Answer all remaining messages with EHANGUP. 1011 */ 1012 while (!list_empty(&fibril_connection->msg_queue)) { 1013 msg_t *msg = 1014 list_get_instance(list_first(&fibril_connection->msg_queue), 1015 msg_t, link); 1290 1016 1291 link = hash_table_find(&interface->port_hash_table, &port_id); 1292 if (link) 1293 port = hash_table_get_inst(link, port_t, link); 1294 } 1295 1017 list_remove(&msg->link); 1018 ipc_answer_0(msg->callid, EHANGUP); 1019 free(msg); 1020 } 1021 1022 /* 1023 * If the connection was hung-up, answer the last call, 1024 * i.e. IPC_M_PHONE_HUNGUP. 1025 */ 1026 if (fibril_connection->close_callid) 1027 ipc_answer_0(fibril_connection->close_callid, EOK); 1028 1029 free(fibril_connection); 1030 return 0; 1031 } 1032 1033 /** Create a new fibril for a new connection. 1034 * 1035 * Create new fibril for connection, fill in connection structures and insert 1036 * it into the hash table, so that later we can easily do routing of messages to 1037 * particular fibrils. 1038 * 1039 * @param in_task_id Identification of the incoming connection. 1040 * @param in_phone_hash Identification of the incoming connection. 1041 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. 1042 * If callid is zero, the connection was opened by 1043 * accepting the IPC_M_CONNECT_TO_ME call and this function 1044 * is called directly by the server. 1045 * @param call Call data of the opening call. 1046 * @param cfibril Fibril function that should be called upon opening the 1047 * connection. 1048 * @param carg Extra argument to pass to the connection fibril 1049 * 1050 * @return New fibril id or NULL on failure. 1051 * 1052 */ 1053 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 1054 ipc_callid_t callid, ipc_call_t *call, 1055 async_client_conn_t cfibril, void *carg) 1056 { 1057 connection_t *conn = malloc(sizeof(*conn)); 1058 if (!conn) { 1059 if (callid) 1060 ipc_answer_0(callid, ENOMEM); 1061 1062 return (uintptr_t) NULL; 1063 } 1064 1065 conn->in_task_id = in_task_id; 1066 conn->in_phone_hash = in_phone_hash; 1067 list_initialize(&conn->msg_queue); 1068 conn->callid = callid; 1069 conn->close_callid = 0; 1070 conn->carg = carg; 1071 1072 if (call) 1073 conn->call = *call; 1074 1075 /* We will activate the fibril ASAP */ 1076 conn->wdata.active = true; 1077 conn->cfibril = cfibril; 1078 conn->wdata.fid = fibril_create(connection_fibril, conn); 1079 1080 if (conn->wdata.fid == 0) { 1081 free(conn); 1082 1083 if (callid) 1084 ipc_answer_0(callid, ENOMEM); 1085 1086 return (uintptr_t) NULL; 1087 } 1088 1089 /* Add connection to the connection hash table */ 1090 1091 futex_down(&async_futex); 1092 hash_table_insert(&conn_hash_table, &conn->link); 1296 1093 futex_up(&async_futex); 1297 1094 1298 return port; 1095 fibril_add_ready(conn->wdata.fid); 1096 1097 return conn->wdata.fid; 1299 1098 } 1300 1099 … … 1312 1111 assert(call); 1313 1112 1314 /* Kernel notification */1113 /* Unrouted call - take some default action */ 1315 1114 if ((callid & IPC_CALLID_NOTIFICATION)) { 1316 fibril_t *fibril = (fibril_t *) __tcb_get()->fibril_data;1317 unsigned oldsw = fibril->switches;1318 1319 1115 process_notification(callid, call); 1320 1321 if (oldsw != fibril->switches) {1322 /*1323 * The notification handler did not execute atomically1324 * and so the current manager fibril assumed the role of1325 * a notification fibril. While waiting for its1326 * resources, it switched to another manager fibril that1327 * had already existed or it created a new one. We1328 * therefore know there is at least yet another1329 * manager fibril that can take over. We now kill the1330 * current 'notification' fibril to prevent fibril1331 * population explosion.1332 */1333 futex_down(&async_futex);1334 fibril_switch(FIBRIL_FROM_DEAD);1335 }1336 1116 return; 1337 1117 } 1338 1118 1339 /* New connection */ 1340 if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) { 1341 iface_t iface = (iface_t) IPC_GET_ARG1(*call); 1342 sysarg_t in_phone_hash = IPC_GET_ARG5(*call); 1343 1344 async_notification_handler_t handler = fallback_port_handler; 1345 void *data = fallback_port_data; 1346 1347 // TODO: Currently ignores all ports but the first one 1348 port_t *port = async_find_port(iface, 0); 1349 if (port) { 1350 handler = port->handler; 1351 data = port->data; 1352 } 1353 1354 async_new_connection(call->in_task_id, in_phone_hash, callid, 1355 call, handler, data); 1356 return; 1357 } 1358 1359 /* Cloned connection */ 1360 if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) { 1361 // TODO: Currently ignores ports altogether 1362 1119 switch (IPC_GET_IMETHOD(*call)) { 1120 case IPC_M_CLONE_ESTABLISH: 1121 case IPC_M_CONNECT_ME_TO: 1363 1122 /* Open new connection with fibril, etc. */ 1364 1123 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call), 1365 callid, call, fallback_port_handler, fallback_port_data);1124 callid, call, client_connection, NULL); 1366 1125 return; 1367 1126 } … … 1508 1267 void async_create_manager(void) 1509 1268 { 1510 fid_t fid = fibril_create _generic(async_manager_fibril, NULL, PAGE_SIZE);1269 fid_t fid = fibril_create(async_manager_fibril, NULL); 1511 1270 if (fid != 0) 1512 1271 fibril_add_manager(fid); … … 1524 1283 void __async_init(void) 1525 1284 { 1526 if (!hash_table_create(&interface_hash_table, 0, 0,1527 &interface_hash_table_ops))1528 abort();1529 1530 1285 if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops)) 1531 1286 abort(); … … 1542 1297 abort(); 1543 1298 1544 session_ns->iface = 0;1545 1299 session_ns->mgmt = EXCHANGE_ATOMIC; 1546 1300 session_ns->phone = PHONE_NS; … … 1589 1343 1590 1344 msg->done = true; 1591 1345 1592 1346 if (msg->forget) { 1593 1347 assert(msg->wdata.active); … … 1597 1351 fibril_add_ready(msg->wdata.fid); 1598 1352 } 1599 1353 1600 1354 futex_up(&async_futex); 1601 1355 } … … 1632 1386 1633 1387 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg, 1634 reply_received );1388 reply_received, true); 1635 1389 1636 1390 return (aid_t) msg; … … 1670 1424 1671 1425 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5, 1672 msg, reply_received );1426 msg, reply_received, true); 1673 1427 1674 1428 return (aid_t) msg; … … 1689 1443 1690 1444 futex_down(&async_futex); 1691 1445 1692 1446 assert(!msg->forget); 1693 1447 assert(!msg->destroyed); 1694 1448 1695 1449 if (msg->done) { 1696 1450 futex_up(&async_futex); … … 1733 1487 1734 1488 amsg_t *msg = (amsg_t *) amsgid; 1735 1489 1736 1490 futex_down(&async_futex); 1737 1491 1738 1492 assert(!msg->forget); 1739 1493 assert(!msg->destroyed); 1740 1494 1741 1495 if (msg->done) { 1742 1496 futex_up(&async_futex); … … 1750 1504 if (timeout < 0) 1751 1505 timeout = 0; 1752 1506 1753 1507 getuptime(&msg->wdata.to_event.expires); 1754 1508 tv_add_diff(&msg->wdata.to_event.expires, timeout); … … 1803 1557 { 1804 1558 amsg_t *msg = (amsg_t *) amsgid; 1805 1559 1806 1560 assert(msg); 1807 1561 assert(!msg->forget); 1808 1562 assert(!msg->destroyed); 1809 1563 1810 1564 futex_down(&async_futex); 1811 1812 1565 if (msg->done) { 1813 1566 amsg_destroy(msg); … … 1816 1569 msg->forget = true; 1817 1570 } 1818 1819 1571 futex_up(&async_futex); 1820 1572 } … … 1959 1711 { 1960 1712 if (exch != NULL) 1961 ipc_call_async_0(exch->phone, imethod, NULL, NULL );1713 ipc_call_async_0(exch->phone, imethod, NULL, NULL, true); 1962 1714 } 1963 1715 … … 1965 1717 { 1966 1718 if (exch != NULL) 1967 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL );1719 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL, true); 1968 1720 } 1969 1721 … … 1972 1724 { 1973 1725 if (exch != NULL) 1974 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL); 1726 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL, 1727 true); 1975 1728 } 1976 1729 … … 1980 1733 if (exch != NULL) 1981 1734 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL, 1982 NULL );1735 NULL, true); 1983 1736 } 1984 1737 … … 1988 1741 if (exch != NULL) 1989 1742 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, 1990 NULL, NULL );1743 NULL, NULL, true); 1991 1744 } 1992 1745 … … 1996 1749 if (exch != NULL) 1997 1750 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, 1998 arg5, NULL, NULL );1751 arg5, NULL, NULL, true); 1999 1752 } 2000 1753 … … 2061 1814 * @param arg2 User defined argument. 2062 1815 * @param arg3 User defined argument. 1816 * @param client_receiver Connection handing routine. 2063 1817 * 2064 1818 * @return Zero on success or a negative error code. … … 2066 1820 */ 2067 1821 int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 2068 sysarg_t arg3 )1822 sysarg_t arg3, async_client_conn_t client_receiver, void *carg) 2069 1823 { 2070 1824 if (exch == NULL) 2071 1825 return ENOENT; 2072 1826 1827 sysarg_t phone_hash; 1828 sysarg_t rc; 1829 1830 aid_t req; 2073 1831 ipc_call_t answer; 2074 aid_treq = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,1832 req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 2075 1833 &answer); 2076 2077 sysarg_t rc;2078 1834 async_wait_for(req, &rc); 2079 1835 if (rc != EOK) 2080 1836 return (int) rc; 1837 1838 phone_hash = IPC_GET_ARG5(answer); 1839 1840 if (client_receiver != NULL) 1841 async_new_connection(answer.in_task_id, phone_hash, 0, NULL, 1842 client_receiver, carg); 2081 1843 2082 1844 return EOK; … … 2119 1881 2120 1882 ipc_call_async_0(exch->phone, IPC_M_CLONE_ESTABLISH, msg, 2121 reply_received );1883 reply_received, true); 2122 1884 2123 1885 sysarg_t rc; … … 2138 1900 } 2139 1901 2140 sess->iface = 0;2141 1902 sess->mgmt = mgmt; 2142 1903 sess->phone = phone; … … 2168 1929 2169 1930 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4, 2170 msg, reply_received );1931 msg, reply_received, true); 2171 1932 2172 1933 sysarg_t rc; … … 2208 1969 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3, 2209 1970 0); 1971 2210 1972 if (phone < 0) { 2211 1973 errno = phone; … … 2214 1976 } 2215 1977 2216 sess->iface = 0;2217 1978 sess->mgmt = mgmt; 2218 1979 sess->phone = phone; … … 2231 1992 } 2232 1993 2233 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.2234 *2235 * Ask through phone for a new connection to some service and block until2236 * success.2237 *2238 * @param exch Exchange for sending the message.2239 * @param iface Connection interface.2240 * @param arg2 User defined argument.2241 * @param arg3 User defined argument.2242 *2243 * @return New session on success or NULL on error.2244 *2245 */2246 async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,2247 sysarg_t arg2, sysarg_t arg3)2248 {2249 if (exch == NULL) {2250 errno = ENOENT;2251 return NULL;2252 }2253 2254 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));2255 if (sess == NULL) {2256 errno = ENOMEM;2257 return NULL;2258 }2259 2260 int phone = async_connect_me_to_internal(exch->phone, iface, arg2,2261 arg3, 0);2262 if (phone < 0) {2263 errno = phone;2264 free(sess);2265 return NULL;2266 }2267 2268 sess->iface = iface;2269 sess->phone = phone;2270 sess->arg1 = iface;2271 sess->arg2 = arg2;2272 sess->arg3 = arg3;2273 2274 fibril_mutex_initialize(&sess->remote_state_mtx);2275 sess->remote_state_data = NULL;2276 2277 list_initialize(&sess->exch_list);2278 fibril_mutex_initialize(&sess->mutex);2279 atomic_set(&sess->refcnt, 0);2280 2281 return sess;2282 }2283 2284 1994 /** Set arguments for new connections. 2285 1995 * … … 2337 2047 } 2338 2048 2339 sess->iface = 0;2340 2049 sess->mgmt = mgmt; 2341 2050 sess->phone = phone; … … 2354 2063 } 2355 2064 2356 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.2357 *2358 * Ask through phone for a new connection to some service and block until2359 * success.2360 *2361 * @param exch Exchange for sending the message.2362 * @param iface Connection interface.2363 * @param arg2 User defined argument.2364 * @param arg3 User defined argument.2365 *2366 * @return New session on success or NULL on error.2367 *2368 */2369 async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,2370 sysarg_t arg2, sysarg_t arg3)2371 {2372 if (exch == NULL) {2373 errno = ENOENT;2374 return NULL;2375 }2376 2377 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));2378 if (sess == NULL) {2379 errno = ENOMEM;2380 return NULL;2381 }2382 2383 int phone = async_connect_me_to_internal(exch->phone, iface, arg2,2384 arg3, IPC_FLAG_BLOCKING);2385 if (phone < 0) {2386 errno = phone;2387 free(sess);2388 return NULL;2389 }2390 2391 sess->iface = iface;2392 sess->phone = phone;2393 sess->arg1 = iface;2394 sess->arg2 = arg2;2395 sess->arg3 = arg3;2396 2397 fibril_mutex_initialize(&sess->remote_state_mtx);2398 sess->remote_state_data = NULL;2399 2400 list_initialize(&sess->exch_list);2401 fibril_mutex_initialize(&sess->mutex);2402 atomic_set(&sess->refcnt, 0);2403 2404 return sess;2405 }2406 2407 2065 /** Connect to a task specified by id. 2408 2066 * … … 2423 2081 } 2424 2082 2425 sess->iface = 0;2426 2083 sess->mgmt = EXCHANGE_ATOMIC; 2427 2084 sess->phone = phone; … … 2501 2158 return NULL; 2502 2159 2503 exch_mgmt_t mgmt = sess->mgmt; 2504 if (sess->iface != 0) 2505 mgmt = sess->iface & IFACE_EXCHANGE_MASK; 2506 2507 async_exch_t *exch = NULL; 2160 async_exch_t *exch; 2508 2161 2509 2162 fibril_mutex_lock(&async_sess_mutex); … … 2524 2177 */ 2525 2178 2526 if (( mgmt == EXCHANGE_ATOMIC) ||2527 ( mgmt == EXCHANGE_SERIALIZE)) {2179 if ((sess->mgmt == EXCHANGE_ATOMIC) || 2180 (sess->mgmt == EXCHANGE_SERIALIZE)) { 2528 2181 exch = (async_exch_t *) malloc(sizeof(async_exch_t)); 2529 2182 if (exch != NULL) { … … 2533 2186 exch->phone = sess->phone; 2534 2187 } 2535 } else if (mgmt == EXCHANGE_PARALLEL) { 2536 int phone; 2537 2538 retry: 2188 } else { /* EXCHANGE_PARALLEL */ 2539 2189 /* 2540 2190 * Make a one-time attempt to connect a new data phone. 2541 2191 */ 2192 2193 int phone; 2194 2195 retry: 2542 2196 phone = async_connect_me_to_internal(sess->phone, sess->arg1, 2543 2197 sess->arg2, sess->arg3, 0); … … 2581 2235 atomic_inc(&sess->refcnt); 2582 2236 2583 if ( mgmt == EXCHANGE_SERIALIZE)2237 if (sess->mgmt == EXCHANGE_SERIALIZE) 2584 2238 fibril_mutex_lock(&sess->mutex); 2585 2239 } … … 2601 2255 assert(sess != NULL); 2602 2256 2603 exch_mgmt_t mgmt = sess->mgmt;2604 if (sess->iface != 0)2605 mgmt = sess->iface & IFACE_EXCHANGE_MASK;2606 2607 2257 atomic_dec(&sess->refcnt); 2608 2258 2609 if ( mgmt == EXCHANGE_SERIALIZE)2259 if (sess->mgmt == EXCHANGE_SERIALIZE) 2610 2260 fibril_mutex_unlock(&sess->mutex); 2611 2261 … … 3044 2694 } 3045 2695 3046 void * arg_data;2696 void *_data; 3047 2697 3048 2698 if (nullterm) 3049 arg_data = malloc(size + 1);2699 _data = malloc(size + 1); 3050 2700 else 3051 arg_data = malloc(size);3052 3053 if ( arg_data == NULL) {2701 _data = malloc(size); 2702 2703 if (_data == NULL) { 3054 2704 ipc_answer_0(callid, ENOMEM); 3055 2705 return ENOMEM; 3056 2706 } 3057 2707 3058 int rc = async_data_write_finalize(callid, arg_data, size);2708 int rc = async_data_write_finalize(callid, _data, size); 3059 2709 if (rc != EOK) { 3060 free( arg_data);2710 free(_data); 3061 2711 return rc; 3062 2712 } 3063 2713 3064 2714 if (nullterm) 3065 ((char *) arg_data)[size] = 0;3066 3067 *data = arg_data;2715 ((char *) _data)[size] = 0; 2716 2717 *data = _data; 3068 2718 if (received != NULL) 3069 2719 *received = size; … … 3163 2813 } 3164 2814 3165 sess->iface = 0;3166 2815 sess->mgmt = mgmt; 3167 2816 sess->phone = phone; … … 3213 2862 } 3214 2863 3215 sess->iface = 0;3216 2864 sess->mgmt = mgmt; 3217 2865 sess->phone = phone; … … 3259 2907 return NULL; 3260 2908 3261 sess->iface = 0;3262 2909 sess->mgmt = mgmt; 3263 2910 sess->phone = phone; … … 3287 2934 { 3288 2935 assert(callid); 3289 2936 3290 2937 ipc_call_t call; 3291 2938 *callid = async_get_call(&call); 3292 2939 3293 2940 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE) 3294 2941 return false; … … 3300 2947 if (arg3) 3301 2948 *arg3 = IPC_GET_ARG3(call); 3302 2949 3303 2950 return true; 3304 2951 }
Note:
See TracChangeset
for help on using the changeset viewer.