Changeset ff381a7 in mainline for uspace/lib/c/generic/async.c
- Timestamp:
- 2015-11-02T20:54:19Z (8 years ago)
- Branches:
- lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
- Children:
- d8513177
- Parents:
- 3feeab2 (diff), 5265eea4 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/c/generic/async.c
r3feeab2 rff381a7 77 77 * } 78 78 * 79 * my_client_connection(icallid, *icall)79 * port_handler(icallid, *icall) 80 80 * { 81 81 * if (want_refuse) { … … 123 123 list_t exch_list; 124 124 125 /** Session interface */ 126 iface_t iface; 127 125 128 /** Exchange management style */ 126 129 exch_mgmt_t mgmt; … … 189 192 /** If reply was received. */ 190 193 bool done; 191 194 192 195 /** If the message / reply should be discarded on arrival. */ 193 196 bool forget; 194 197 195 198 /** If already destroyed. */ 196 199 bool destroyed; … … 232 235 /** Identification of the opening call. */ 233 236 ipc_callid_t callid; 237 234 238 /** Call data of the opening call. */ 235 239 ipc_call_t call; 236 /** Local argument or NULL if none. */237 void *carg;238 240 239 241 /** Identification of the closing call. */ … … 241 243 242 244 /** Fibril function that will be used to handle the connection. */ 243 async_client_conn_t cfibril; 245 async_port_handler_t handler; 246 247 /** Client data */ 248 void *data; 244 249 } connection_t; 250 251 /** Interface data */ 252 typedef struct { 253 ht_link_t link; 254 255 /** Interface ID */ 256 iface_t iface; 257 258 /** Futex protecting the hash table */ 259 futex_t futex; 260 261 /** Interface ports */ 262 hash_table_t port_hash_table; 263 264 /** Next available port ID */ 265 port_id_t port_id_avail; 266 } interface_t; 267 268 /* Port data */ 269 typedef struct { 270 ht_link_t link; 271 272 /** Port ID */ 273 port_id_t id; 274 275 /** Port connection handler */ 276 async_port_handler_t handler; 277 278 /** Client data */ 279 void *data; 280 } port_t; 245 281 246 282 /* Notification data */ … … 264 300 { 265 301 struct timeval tv = { 0, 0 }; 266 302 267 303 to->inlist = false; 268 304 to->occurred = false; … … 287 323 static amsg_t *amsg_create(void) 288 324 { 289 amsg_t *msg; 290 291 msg = malloc(sizeof(amsg_t)); 325 amsg_t *msg = malloc(sizeof(amsg_t)); 292 326 if (msg) { 293 327 msg->done = false; … … 298 332 awaiter_initialize(&msg->wdata); 299 333 } 300 334 301 335 return msg; 302 336 } … … 335 369 } 336 370 337 /** Default fibril function that gets called to handle new connection. 338 * 339 * This function is defined as a weak symbol - to be redefined in user code. 371 /** Default fallback fibril function. 372 * 373 * This fallback fibril function gets called on incomming 374 * connections that do not have a specific handler defined. 340 375 * 341 376 * @param callid Hash of the incoming call. … … 344 379 * 345 380 */ 346 static void default_ client_connection(ipc_callid_t callid, ipc_call_t *call,381 static void default_fallback_port_handler(ipc_callid_t callid, ipc_call_t *call, 347 382 void *arg) 348 383 { … … 350 385 } 351 386 352 static async_client_conn_t client_connection = default_client_connection; 387 static async_port_handler_t fallback_port_handler = 388 default_fallback_port_handler; 389 static void *fallback_port_data = NULL; 390 391 static hash_table_t interface_hash_table; 392 393 static size_t interface_key_hash(void *key) 394 { 395 iface_t iface = *(iface_t *) key; 396 return iface; 397 } 398 399 static size_t interface_hash(const ht_link_t *item) 400 { 401 interface_t *interface = hash_table_get_inst(item, interface_t, link); 402 return interface_key_hash(&interface->iface); 403 } 404 405 static bool interface_key_equal(void *key, const ht_link_t *item) 406 { 407 iface_t iface = *(iface_t *) key; 408 interface_t *interface = hash_table_get_inst(item, interface_t, link); 409 return iface == interface->iface; 410 } 411 412 /** Operations for the port hash table. */ 413 static hash_table_ops_t interface_hash_table_ops = { 414 .hash = interface_hash, 415 .key_hash = interface_key_hash, 416 .key_equal = interface_key_equal, 417 .equal = NULL, 418 .remove_callback = NULL 419 }; 420 421 static size_t port_key_hash(void *key) 422 { 423 port_id_t port_id = *(port_id_t *) key; 424 return port_id; 425 } 426 427 static size_t port_hash(const ht_link_t *item) 428 { 429 port_t *port = hash_table_get_inst(item, port_t, link); 430 return port_key_hash(&port->id); 431 } 432 433 static bool port_key_equal(void *key, const ht_link_t *item) 434 { 435 port_id_t port_id = *(port_id_t *) key; 436 port_t *port = hash_table_get_inst(item, port_t, link); 437 return port_id == port->id; 438 } 439 440 /** Operations for the port hash table. */ 441 static hash_table_ops_t port_hash_table_ops = { 442 .hash = port_hash, 443 .key_hash = port_key_hash, 444 .key_equal = port_key_equal, 445 .equal = NULL, 446 .remove_callback = NULL 447 }; 448 449 static interface_t *async_new_interface(iface_t iface) 450 { 451 interface_t *interface = 452 (interface_t *) malloc(sizeof(interface_t)); 453 if (!interface) 454 return NULL; 455 456 bool ret = hash_table_create(&interface->port_hash_table, 0, 0, 457 &port_hash_table_ops); 458 if (!ret) { 459 free(interface); 460 return NULL; 461 } 462 463 interface->iface = iface; 464 futex_initialize(&interface->futex, 1); 465 interface->port_id_avail = 0; 466 467 hash_table_insert(&interface_hash_table, &interface->link); 468 469 return interface; 470 } 471 472 static port_t *async_new_port(interface_t *interface, 473 async_port_handler_t handler, void *data) 474 { 475 port_t *port = (port_t *) malloc(sizeof(port_t)); 476 if (!port) 477 return NULL; 478 479 futex_down(&interface->futex); 480 481 port_id_t id = interface->port_id_avail; 482 interface->port_id_avail++; 483 484 port->id = id; 485 port->handler = handler; 486 port->data = data; 487 488 hash_table_insert(&interface->port_hash_table, &port->link); 489 490 futex_up(&interface->futex); 491 492 return port; 493 } 494 353 495 static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE; 354 496 355 /** Setter for client_connection function pointer.356 *357 * @param conn Function that will implement a new connection fibril.358 *359 */360 void async_set_client_connection(async_client_conn_t conn)361 {362 assert(client_connection == default_client_connection);363 client_connection = conn;364 }365 366 497 /** Set the stack size for the notification handler notification fibrils. 367 498 * … … 387 518 */ 388 519 static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv); 520 521 int async_create_port(iface_t iface, async_port_handler_t handler, 522 void *data, port_id_t *port_id) 523 { 524 if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK) 525 return EINVAL; 526 527 interface_t *interface; 528 529 futex_down(&async_futex); 530 531 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 532 if (link) 533 interface = hash_table_get_inst(link, interface_t, link); 534 else 535 interface = async_new_interface(iface); 536 537 if (!interface) { 538 futex_up(&async_futex); 539 return ENOMEM; 540 } 541 542 port_t *port = async_new_port(interface, handler, data); 543 if (!port) { 544 futex_up(&async_futex); 545 return ENOMEM; 546 } 547 548 *port_id = port->id; 549 550 futex_up(&async_futex); 551 552 return EOK; 553 } 554 555 void async_set_fallback_port_handler(async_port_handler_t handler, void *data) 556 { 557 assert(handler != NULL); 558 559 fallback_port_handler = handler; 560 fallback_port_data = data; 561 } 389 562 390 563 static hash_table_t client_hash_table; … … 457 630 .remove_callback = NULL 458 631 }; 632 633 static client_t *async_client_get(task_id_t client_id, bool create) 634 { 635 client_t *client = NULL; 636 637 futex_down(&async_futex); 638 ht_link_t *link = hash_table_find(&client_hash_table, &client_id); 639 if (link) { 640 client = hash_table_get_inst(link, client_t, link); 641 atomic_inc(&client->refcnt); 642 } else if (create) { 643 client = malloc(sizeof(client_t)); 644 if (client) { 645 client->in_task_id = client_id; 646 client->data = async_client_data_create(); 647 648 atomic_set(&client->refcnt, 1); 649 hash_table_insert(&client_hash_table, &client->link); 650 } 651 } 652 653 futex_up(&async_futex); 654 return client; 655 } 656 657 static void async_client_put(client_t *client) 658 { 659 bool destroy; 660 661 futex_down(&async_futex); 662 663 if (atomic_predec(&client->refcnt) == 0) { 664 hash_table_remove(&client_hash_table, &client->in_task_id); 665 destroy = true; 666 } else 667 destroy = false; 668 669 futex_up(&async_futex); 670 671 if (destroy) { 672 if (client->data) 673 async_client_data_destroy(client->data); 674 675 free(client); 676 } 677 } 678 679 /** Wrapper for client connection fibril. 680 * 681 * When a new connection arrives, a fibril with this implementing 682 * function is created. 683 * 684 * @param arg Connection structure pointer. 685 * 686 * @return Always zero. 687 * 688 */ 689 static int connection_fibril(void *arg) 690 { 691 assert(arg); 692 693 /* 694 * Setup fibril-local connection pointer. 695 */ 696 fibril_connection = (connection_t *) arg; 697 698 /* 699 * Add our reference for the current connection in the client task 700 * tracking structure. If this is the first reference, create and 701 * hash in a new tracking structure. 702 */ 703 704 client_t *client = async_client_get(fibril_connection->in_task_id, true); 705 if (!client) { 706 ipc_answer_0(fibril_connection->callid, ENOMEM); 707 return 0; 708 } 709 710 fibril_connection->client = client; 711 712 /* 713 * Call the connection handler function. 714 */ 715 fibril_connection->handler(fibril_connection->callid, 716 &fibril_connection->call, fibril_connection->data); 717 718 /* 719 * Remove the reference for this client task connection. 720 */ 721 async_client_put(client); 722 723 /* 724 * Remove myself from the connection hash table. 725 */ 726 futex_down(&async_futex); 727 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash); 728 futex_up(&async_futex); 729 730 /* 731 * Answer all remaining messages with EHANGUP. 732 */ 733 while (!list_empty(&fibril_connection->msg_queue)) { 734 msg_t *msg = 735 list_get_instance(list_first(&fibril_connection->msg_queue), 736 msg_t, link); 737 738 list_remove(&msg->link); 739 ipc_answer_0(msg->callid, EHANGUP); 740 free(msg); 741 } 742 743 /* 744 * If the connection was hung-up, answer the last call, 745 * i.e. IPC_M_PHONE_HUNGUP. 746 */ 747 if (fibril_connection->close_callid) 748 ipc_answer_0(fibril_connection->close_callid, EOK); 749 750 free(fibril_connection); 751 return 0; 752 } 753 754 /** Create a new fibril for a new connection. 755 * 756 * Create new fibril for connection, fill in connection structures 757 * and insert it into the hash table, so that later we can easily 758 * do routing of messages to particular fibrils. 759 * 760 * @param in_task_id Identification of the incoming connection. 761 * @param in_phone_hash Identification of the incoming connection. 762 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. 763 * If callid is zero, the connection was opened by 764 * accepting the IPC_M_CONNECT_TO_ME call and this 765 * function is called directly by the server. 766 * @param call Call data of the opening call. 767 * @param handler Connection handler. 768 * @param data Client argument to pass to the connection handler. 769 * 770 * @return New fibril id or NULL on failure. 771 * 772 */ 773 static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 774 ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler, 775 void *data) 776 { 777 connection_t *conn = malloc(sizeof(*conn)); 778 if (!conn) { 779 if (callid) 780 ipc_answer_0(callid, ENOMEM); 781 782 return (uintptr_t) NULL; 783 } 784 785 conn->in_task_id = in_task_id; 786 conn->in_phone_hash = in_phone_hash; 787 list_initialize(&conn->msg_queue); 788 conn->callid = callid; 789 conn->close_callid = 0; 790 conn->handler = handler; 791 conn->data = data; 792 793 if (call) 794 conn->call = *call; 795 796 /* We will activate the fibril ASAP */ 797 conn->wdata.active = true; 798 conn->wdata.fid = fibril_create(connection_fibril, conn); 799 800 if (conn->wdata.fid == 0) { 801 free(conn); 802 803 if (callid) 804 ipc_answer_0(callid, ENOMEM); 805 806 return (uintptr_t) NULL; 807 } 808 809 /* Add connection to the connection hash table */ 810 811 futex_down(&async_futex); 812 hash_table_insert(&conn_hash_table, &conn->link); 813 futex_up(&async_futex); 814 815 fibril_add_ready(conn->wdata.fid); 816 817 return conn->wdata.fid; 818 } 819 820 /** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework. 821 * 822 * Ask through phone for a new connection to some service. 823 * 824 * @param exch Exchange for sending the message. 825 * @param iface Callback interface. 826 * @param arg1 User defined argument. 827 * @param arg2 User defined argument. 828 * @param handler Callback handler. 829 * @param data Handler data. 830 * @param port_id ID of the newly created port. 831 * 832 * @return Zero on success or a negative error code. 833 * 834 */ 835 int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1, 836 sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id) 837 { 838 if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK) 839 return EINVAL; 840 841 if (exch == NULL) 842 return ENOENT; 843 844 ipc_call_t answer; 845 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2, 846 &answer); 847 848 sysarg_t ret; 849 async_wait_for(req, &ret); 850 if (ret != EOK) 851 return (int) ret; 852 853 sysarg_t phone_hash = IPC_GET_ARG5(answer); 854 interface_t *interface; 855 856 futex_down(&async_futex); 857 858 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 859 if (link) 860 interface = hash_table_get_inst(link, interface_t, link); 861 else 862 interface = async_new_interface(iface); 863 864 if (!interface) { 865 futex_up(&async_futex); 866 return ENOMEM; 867 } 868 869 port_t *port = async_new_port(interface, handler, data); 870 if (!port) { 871 futex_up(&async_futex); 872 return ENOMEM; 873 } 874 875 *port_id = port->id; 876 877 futex_up(&async_futex); 878 879 fid_t fid = async_new_connection(answer.in_task_id, phone_hash, 880 0, NULL, handler, data); 881 if (fid == (uintptr_t) NULL) 882 return ENOMEM; 883 884 return EOK; 885 } 459 886 460 887 static size_t notification_key_hash(void *key) … … 866 1293 } 867 1294 868 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link); 1295 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), 1296 msg_t, link); 869 1297 list_remove(&msg->link); 870 1298 … … 877 1305 } 878 1306 879 static client_t *async_client_get(task_id_t client_id, bool create)880 {881 client_t *client = NULL;882 883 futex_down(&async_futex);884 ht_link_t *link = hash_table_find(&client_hash_table, &client_id);885 if (link) {886 client = hash_table_get_inst(link, client_t, link);887 atomic_inc(&client->refcnt);888 } else if (create) {889 client = malloc(sizeof(client_t));890 if (client) {891 client->in_task_id = client_id;892 client->data = async_client_data_create();893 894 atomic_set(&client->refcnt, 1);895 hash_table_insert(&client_hash_table, &client->link);896 }897 }898 899 futex_up(&async_futex);900 return client;901 }902 903 static void async_client_put(client_t *client)904 {905 bool destroy;906 907 futex_down(&async_futex);908 909 if (atomic_predec(&client->refcnt) == 0) {910 hash_table_remove(&client_hash_table, &client->in_task_id);911 destroy = true;912 } else913 destroy = false;914 915 futex_up(&async_futex);916 917 if (destroy) {918 if (client->data)919 async_client_data_destroy(client->data);920 921 free(client);922 }923 }924 925 1307 void *async_get_client_data(void) 926 1308 { … … 934 1316 if (!client) 935 1317 return NULL; 1318 936 1319 if (!client->data) { 937 1320 async_client_put(client); 938 1321 return NULL; 939 1322 } 940 1323 941 1324 return client->data; 942 1325 } … … 945 1328 { 946 1329 client_t *client = async_client_get(client_id, false); 947 1330 948 1331 assert(client); 949 1332 assert(client->data); 950 1333 951 1334 /* Drop the reference we got in async_get_client_data_by_hash(). */ 952 1335 async_client_put(client); 953 1336 954 1337 /* Drop our own reference we got at the beginning of this function. */ 955 1338 async_client_put(client); 956 1339 } 957 1340 958 /** Wrapper for client connection fibril. 959 * 960 * When a new connection arrives, a fibril with this implementing function is 961 * created. It calls client_connection() and does the final cleanup. 962 * 963 * @param arg Connection structure pointer. 964 * 965 * @return Always zero. 966 * 967 */ 968 static int connection_fibril(void *arg) 969 { 970 assert(arg); 971 972 /* 973 * Setup fibril-local connection pointer. 974 */ 975 fibril_connection = (connection_t *) arg; 976 977 /* 978 * Add our reference for the current connection in the client task 979 * tracking structure. If this is the first reference, create and 980 * hash in a new tracking structure. 981 */ 982 983 client_t *client = async_client_get(fibril_connection->in_task_id, true); 984 if (!client) { 985 ipc_answer_0(fibril_connection->callid, ENOMEM); 986 return 0; 987 } 988 989 fibril_connection->client = client; 990 991 /* 992 * Call the connection handler function. 993 */ 994 fibril_connection->cfibril(fibril_connection->callid, 995 &fibril_connection->call, fibril_connection->carg); 996 997 /* 998 * Remove the reference for this client task connection. 999 */ 1000 async_client_put(client); 1001 1002 /* 1003 * Remove myself from the connection hash table. 1004 */ 1341 static port_t *async_find_port(iface_t iface, port_id_t port_id) 1342 { 1343 port_t *port = NULL; 1344 1005 1345 futex_down(&async_futex); 1006 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash); 1346 1347 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 1348 if (link) { 1349 interface_t *interface = 1350 hash_table_get_inst(link, interface_t, link); 1351 1352 link = hash_table_find(&interface->port_hash_table, &port_id); 1353 if (link) 1354 port = hash_table_get_inst(link, port_t, link); 1355 } 1356 1007 1357 futex_up(&async_futex); 1008 1358 1009 /* 1010 * Answer all remaining messages with EHANGUP. 1011 */ 1012 while (!list_empty(&fibril_connection->msg_queue)) { 1013 msg_t *msg = 1014 list_get_instance(list_first(&fibril_connection->msg_queue), 1015 msg_t, link); 1016 1017 list_remove(&msg->link); 1018 ipc_answer_0(msg->callid, EHANGUP); 1019 free(msg); 1020 } 1021 1022 /* 1023 * If the connection was hung-up, answer the last call, 1024 * i.e. IPC_M_PHONE_HUNGUP. 1025 */ 1026 if (fibril_connection->close_callid) 1027 ipc_answer_0(fibril_connection->close_callid, EOK); 1028 1029 free(fibril_connection); 1030 return 0; 1031 } 1032 1033 /** Create a new fibril for a new connection. 1034 * 1035 * Create new fibril for connection, fill in connection structures and insert 1036 * it into the hash table, so that later we can easily do routing of messages to 1037 * particular fibrils. 1038 * 1039 * @param in_task_id Identification of the incoming connection. 1040 * @param in_phone_hash Identification of the incoming connection. 1041 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. 1042 * If callid is zero, the connection was opened by 1043 * accepting the IPC_M_CONNECT_TO_ME call and this function 1044 * is called directly by the server. 1045 * @param call Call data of the opening call. 1046 * @param cfibril Fibril function that should be called upon opening the 1047 * connection. 1048 * @param carg Extra argument to pass to the connection fibril 1049 * 1050 * @return New fibril id or NULL on failure. 1051 * 1052 */ 1053 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 1054 ipc_callid_t callid, ipc_call_t *call, 1055 async_client_conn_t cfibril, void *carg) 1056 { 1057 connection_t *conn = malloc(sizeof(*conn)); 1058 if (!conn) { 1059 if (callid) 1060 ipc_answer_0(callid, ENOMEM); 1061 1062 return (uintptr_t) NULL; 1063 } 1064 1065 conn->in_task_id = in_task_id; 1066 conn->in_phone_hash = in_phone_hash; 1067 list_initialize(&conn->msg_queue); 1068 conn->callid = callid; 1069 conn->close_callid = 0; 1070 conn->carg = carg; 1071 1072 if (call) 1073 conn->call = *call; 1074 1075 /* We will activate the fibril ASAP */ 1076 conn->wdata.active = true; 1077 conn->cfibril = cfibril; 1078 conn->wdata.fid = fibril_create(connection_fibril, conn); 1079 1080 if (conn->wdata.fid == 0) { 1081 free(conn); 1082 1083 if (callid) 1084 ipc_answer_0(callid, ENOMEM); 1085 1086 return (uintptr_t) NULL; 1087 } 1088 1089 /* Add connection to the connection hash table */ 1090 1091 futex_down(&async_futex); 1092 hash_table_insert(&conn_hash_table, &conn->link); 1093 futex_up(&async_futex); 1094 1095 fibril_add_ready(conn->wdata.fid); 1096 1097 return conn->wdata.fid; 1359 return port; 1098 1360 } 1099 1361 … … 1111 1373 assert(call); 1112 1374 1113 /* Unrouted call - take some default action */1375 /* Kernel notification */ 1114 1376 if ((callid & IPC_CALLID_NOTIFICATION)) { 1115 1377 process_notification(callid, call); … … 1117 1379 } 1118 1380 1119 switch (IPC_GET_IMETHOD(*call)) { 1120 case IPC_M_CLONE_ESTABLISH: 1121 case IPC_M_CONNECT_ME_TO: 1381 /* New connection */ 1382 if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) { 1383 iface_t iface = (iface_t) IPC_GET_ARG1(*call); 1384 sysarg_t in_phone_hash = IPC_GET_ARG5(*call); 1385 1386 async_notification_handler_t handler = fallback_port_handler; 1387 void *data = fallback_port_data; 1388 1389 // TODO: Currently ignores all ports but the first one 1390 port_t *port = async_find_port(iface, 0); 1391 if (port) { 1392 handler = port->handler; 1393 data = port->data; 1394 } 1395 1396 async_new_connection(call->in_task_id, in_phone_hash, callid, 1397 call, handler, data); 1398 return; 1399 } 1400 1401 /* Cloned connection */ 1402 if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) { 1403 // TODO: Currently ignores ports altogether 1404 1122 1405 /* Open new connection with fibril, etc. */ 1123 1406 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call), 1124 callid, call, client_connection, NULL);1407 callid, call, fallback_port_handler, fallback_port_data); 1125 1408 return; 1126 1409 } … … 1283 1566 void __async_init(void) 1284 1567 { 1568 if (!hash_table_create(&interface_hash_table, 0, 0, 1569 &interface_hash_table_ops)) 1570 abort(); 1571 1285 1572 if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops)) 1286 1573 abort(); … … 1297 1584 abort(); 1298 1585 1586 session_ns->iface = 0; 1299 1587 session_ns->mgmt = EXCHANGE_ATOMIC; 1300 1588 session_ns->phone = PHONE_NS; … … 1343 1631 1344 1632 msg->done = true; 1345 1633 1346 1634 if (msg->forget) { 1347 1635 assert(msg->wdata.active); … … 1351 1639 fibril_add_ready(msg->wdata.fid); 1352 1640 } 1353 1641 1354 1642 futex_up(&async_futex); 1355 1643 } … … 1443 1731 1444 1732 futex_down(&async_futex); 1445 1733 1446 1734 assert(!msg->forget); 1447 1735 assert(!msg->destroyed); 1448 1736 1449 1737 if (msg->done) { 1450 1738 futex_up(&async_futex); … … 1487 1775 1488 1776 amsg_t *msg = (amsg_t *) amsgid; 1489 1777 1490 1778 futex_down(&async_futex); 1491 1779 1492 1780 assert(!msg->forget); 1493 1781 assert(!msg->destroyed); 1494 1782 1495 1783 if (msg->done) { 1496 1784 futex_up(&async_futex); … … 1504 1792 if (timeout < 0) 1505 1793 timeout = 0; 1506 1794 1507 1795 getuptime(&msg->wdata.to_event.expires); 1508 1796 tv_add_diff(&msg->wdata.to_event.expires, timeout); … … 1557 1845 { 1558 1846 amsg_t *msg = (amsg_t *) amsgid; 1559 1847 1560 1848 assert(msg); 1561 1849 assert(!msg->forget); 1562 1850 assert(!msg->destroyed); 1563 1851 1564 1852 futex_down(&async_futex); 1853 1565 1854 if (msg->done) { 1566 1855 amsg_destroy(msg); … … 1569 1858 msg->forget = true; 1570 1859 } 1860 1571 1861 futex_up(&async_futex); 1572 1862 } … … 1814 2104 * @param arg2 User defined argument. 1815 2105 * @param arg3 User defined argument. 1816 * @param client_receiver Connection handing routine.1817 2106 * 1818 2107 * @return Zero on success or a negative error code. … … 1820 2109 */ 1821 2110 int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 1822 sysarg_t arg3 , async_client_conn_t client_receiver, void *carg)2111 sysarg_t arg3) 1823 2112 { 1824 2113 if (exch == NULL) 1825 2114 return ENOENT; 1826 2115 1827 sysarg_t phone_hash; 2116 ipc_call_t answer; 2117 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 2118 &answer); 2119 1828 2120 sysarg_t rc; 1829 1830 aid_t req;1831 ipc_call_t answer;1832 req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,1833 &answer);1834 2121 async_wait_for(req, &rc); 1835 2122 if (rc != EOK) 1836 2123 return (int) rc; 1837 1838 phone_hash = IPC_GET_ARG5(answer);1839 1840 if (client_receiver != NULL)1841 async_new_connection(answer.in_task_id, phone_hash, 0, NULL,1842 client_receiver, carg);1843 2124 1844 2125 return EOK; … … 1900 2181 } 1901 2182 2183 sess->iface = 0; 1902 2184 sess->mgmt = mgmt; 1903 2185 sess->phone = phone; … … 1969 2251 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3, 1970 2252 0); 1971 1972 2253 if (phone < 0) { 1973 2254 errno = phone; … … 1976 2257 } 1977 2258 2259 sess->iface = 0; 1978 2260 sess->mgmt = mgmt; 1979 2261 sess->phone = phone; … … 1992 2274 } 1993 2275 2276 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework. 2277 * 2278 * Ask through phone for a new connection to some service and block until 2279 * success. 2280 * 2281 * @param exch Exchange for sending the message. 2282 * @param iface Connection interface. 2283 * @param arg2 User defined argument. 2284 * @param arg3 User defined argument. 2285 * 2286 * @return New session on success or NULL on error. 2287 * 2288 */ 2289 async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface, 2290 sysarg_t arg2, sysarg_t arg3) 2291 { 2292 if (exch == NULL) { 2293 errno = ENOENT; 2294 return NULL; 2295 } 2296 2297 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 2298 if (sess == NULL) { 2299 errno = ENOMEM; 2300 return NULL; 2301 } 2302 2303 int phone = async_connect_me_to_internal(exch->phone, iface, arg2, 2304 arg3, 0); 2305 if (phone < 0) { 2306 errno = phone; 2307 free(sess); 2308 return NULL; 2309 } 2310 2311 sess->iface = iface; 2312 sess->phone = phone; 2313 sess->arg1 = iface; 2314 sess->arg2 = arg2; 2315 sess->arg3 = arg3; 2316 2317 fibril_mutex_initialize(&sess->remote_state_mtx); 2318 sess->remote_state_data = NULL; 2319 2320 list_initialize(&sess->exch_list); 2321 fibril_mutex_initialize(&sess->mutex); 2322 atomic_set(&sess->refcnt, 0); 2323 2324 return sess; 2325 } 2326 1994 2327 /** Set arguments for new connections. 1995 2328 * … … 2047 2380 } 2048 2381 2382 sess->iface = 0; 2049 2383 sess->mgmt = mgmt; 2050 2384 sess->phone = phone; … … 2063 2397 } 2064 2398 2399 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework. 2400 * 2401 * Ask through phone for a new connection to some service and block until 2402 * success. 2403 * 2404 * @param exch Exchange for sending the message. 2405 * @param iface Connection interface. 2406 * @param arg2 User defined argument. 2407 * @param arg3 User defined argument. 2408 * 2409 * @return New session on success or NULL on error. 2410 * 2411 */ 2412 async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface, 2413 sysarg_t arg2, sysarg_t arg3) 2414 { 2415 if (exch == NULL) { 2416 errno = ENOENT; 2417 return NULL; 2418 } 2419 2420 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 2421 if (sess == NULL) { 2422 errno = ENOMEM; 2423 return NULL; 2424 } 2425 2426 int phone = async_connect_me_to_internal(exch->phone, iface, arg2, 2427 arg3, IPC_FLAG_BLOCKING); 2428 if (phone < 0) { 2429 errno = phone; 2430 free(sess); 2431 return NULL; 2432 } 2433 2434 sess->iface = iface; 2435 sess->phone = phone; 2436 sess->arg1 = iface; 2437 sess->arg2 = arg2; 2438 sess->arg3 = arg3; 2439 2440 fibril_mutex_initialize(&sess->remote_state_mtx); 2441 sess->remote_state_data = NULL; 2442 2443 list_initialize(&sess->exch_list); 2444 fibril_mutex_initialize(&sess->mutex); 2445 atomic_set(&sess->refcnt, 0); 2446 2447 return sess; 2448 } 2449 2065 2450 /** Connect to a task specified by id. 2066 2451 * … … 2081 2466 } 2082 2467 2468 sess->iface = 0; 2083 2469 sess->mgmt = EXCHANGE_ATOMIC; 2084 2470 sess->phone = phone; … … 2158 2544 return NULL; 2159 2545 2160 async_exch_t *exch; 2546 exch_mgmt_t mgmt = sess->mgmt; 2547 if (sess->iface != 0) 2548 mgmt = sess->iface & IFACE_EXCHANGE_MASK; 2549 2550 async_exch_t *exch = NULL; 2161 2551 2162 2552 fibril_mutex_lock(&async_sess_mutex); … … 2177 2567 */ 2178 2568 2179 if (( sess->mgmt == EXCHANGE_ATOMIC) ||2180 ( sess->mgmt == EXCHANGE_SERIALIZE)) {2569 if ((mgmt == EXCHANGE_ATOMIC) || 2570 (mgmt == EXCHANGE_SERIALIZE)) { 2181 2571 exch = (async_exch_t *) malloc(sizeof(async_exch_t)); 2182 2572 if (exch != NULL) { … … 2186 2576 exch->phone = sess->phone; 2187 2577 } 2188 } else { /* EXCHANGE_PARALLEL */ 2578 } else if (mgmt == EXCHANGE_PARALLEL) { 2579 int phone; 2580 2581 retry: 2189 2582 /* 2190 2583 * Make a one-time attempt to connect a new data phone. 2191 2584 */ 2192 2193 int phone;2194 2195 retry:2196 2585 phone = async_connect_me_to_internal(sess->phone, sess->arg1, 2197 2586 sess->arg2, sess->arg3, 0); … … 2235 2624 atomic_inc(&sess->refcnt); 2236 2625 2237 if ( sess->mgmt == EXCHANGE_SERIALIZE)2626 if (mgmt == EXCHANGE_SERIALIZE) 2238 2627 fibril_mutex_lock(&sess->mutex); 2239 2628 } … … 2255 2644 assert(sess != NULL); 2256 2645 2646 exch_mgmt_t mgmt = sess->mgmt; 2647 if (sess->iface != 0) 2648 mgmt = sess->iface & IFACE_EXCHANGE_MASK; 2649 2257 2650 atomic_dec(&sess->refcnt); 2258 2651 2259 if ( sess->mgmt == EXCHANGE_SERIALIZE)2652 if (mgmt == EXCHANGE_SERIALIZE) 2260 2653 fibril_mutex_unlock(&sess->mutex); 2261 2654 … … 2694 3087 } 2695 3088 2696 void * _data;3089 void *arg_data; 2697 3090 2698 3091 if (nullterm) 2699 _data = malloc(size + 1);3092 arg_data = malloc(size + 1); 2700 3093 else 2701 _data = malloc(size);2702 2703 if ( _data == NULL) {3094 arg_data = malloc(size); 3095 3096 if (arg_data == NULL) { 2704 3097 ipc_answer_0(callid, ENOMEM); 2705 3098 return ENOMEM; 2706 3099 } 2707 3100 2708 int rc = async_data_write_finalize(callid, _data, size);3101 int rc = async_data_write_finalize(callid, arg_data, size); 2709 3102 if (rc != EOK) { 2710 free( _data);3103 free(arg_data); 2711 3104 return rc; 2712 3105 } 2713 3106 2714 3107 if (nullterm) 2715 ((char *) _data)[size] = 0;2716 2717 *data = _data;3108 ((char *) arg_data)[size] = 0; 3109 3110 *data = arg_data; 2718 3111 if (received != NULL) 2719 3112 *received = size; … … 2813 3206 } 2814 3207 3208 sess->iface = 0; 2815 3209 sess->mgmt = mgmt; 2816 3210 sess->phone = phone; … … 2862 3256 } 2863 3257 3258 sess->iface = 0; 2864 3259 sess->mgmt = mgmt; 2865 3260 sess->phone = phone; … … 2907 3302 return NULL; 2908 3303 3304 sess->iface = 0; 2909 3305 sess->mgmt = mgmt; 2910 3306 sess->phone = phone; … … 2934 3330 { 2935 3331 assert(callid); 2936 3332 2937 3333 ipc_call_t call; 2938 3334 *callid = async_get_call(&call); 2939 3335 2940 3336 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE) 2941 3337 return false; … … 2947 3343 if (arg3) 2948 3344 *arg3 = IPC_GET_ARG3(call); 2949 3345 2950 3346 return true; 2951 3347 }
Note:
See TracChangeset
for help on using the changeset viewer.