Changes in uspace/lib/c/generic/async.c [9ef495f:7f9d97f3] in mainline
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/c/generic/async.c
r9ef495f r7f9d97f3 77 77 * } 78 78 * 79 * port_handler(icallid, *icall)79 * my_client_connection(icallid, *icall) 80 80 * { 81 81 * if (want_refuse) { … … 123 123 list_t exch_list; 124 124 125 /** Session interface */126 iface_t iface;127 128 125 /** Exchange management style */ 129 126 exch_mgmt_t mgmt; … … 192 189 /** If reply was received. */ 193 190 bool done; 194 191 195 192 /** If the message / reply should be discarded on arrival. */ 196 193 bool forget; 197 194 198 195 /** If already destroyed. */ 199 196 bool destroyed; … … 235 232 /** Identification of the opening call. */ 236 233 ipc_callid_t callid; 237 238 234 /** Call data of the opening call. */ 239 235 ipc_call_t call; 236 /** Local argument or NULL if none. */ 237 void *carg; 240 238 241 239 /** Identification of the closing call. */ … … 243 241 244 242 /** Fibril function that will be used to handle the connection. */ 245 async_port_handler_t handler; 246 247 /** Client data */ 248 void *data; 243 async_client_conn_t cfibril; 249 244 } connection_t; 250 251 /** Interface data */252 typedef struct {253 ht_link_t link;254 255 /** Interface ID */256 iface_t iface;257 258 /** Futex protecting the hash table */259 futex_t futex;260 261 /** Interface ports */262 hash_table_t port_hash_table;263 264 /** Next available port ID */265 port_id_t port_id_avail;266 } interface_t;267 268 /* Port data */269 typedef struct {270 ht_link_t link;271 272 /** Port ID */273 port_id_t id;274 275 /** Port connection handler */276 async_port_handler_t handler;277 278 /** Client data */279 void *data;280 } port_t;281 245 282 246 /* Notification data */ … … 300 264 { 301 265 struct timeval tv = { 0, 0 }; 302 266 303 267 to->inlist = false; 304 268 to->occurred = false; … … 323 287 static amsg_t *amsg_create(void) 324 288 { 325 amsg_t *msg = malloc(sizeof(amsg_t)); 289 amsg_t *msg; 290 291 msg = malloc(sizeof(amsg_t)); 326 292 if (msg) { 327 293 msg->done = false; … … 332 298 awaiter_initialize(&msg->wdata); 333 299 } 334 300 335 301 return msg; 336 302 } … … 369 335 } 370 336 371 /** Default fallback fibril function. 372 * 373 * This fallback fibril function gets called on incomming 374 * connections that do not have a specific handler defined. 337 /** Default fibril function that gets called to handle new connection. 338 * 339 * This function is defined as a weak symbol - to be redefined in user code. 375 340 * 376 341 * @param callid Hash of the incoming call. … … 379 344 * 380 345 */ 381 static void default_ fallback_port_handler(ipc_callid_t callid, ipc_call_t *call,346 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call, 382 347 void *arg) 383 348 { … … 385 350 } 386 351 387 static async_port_handler_t fallback_port_handler = 388 default_fallback_port_handler; 389 static void *fallback_port_data = NULL; 390 391 static hash_table_t interface_hash_table; 392 393 static size_t interface_key_hash(void *key) 394 { 395 iface_t iface = *(iface_t *) key; 396 return iface; 397 } 398 399 static size_t interface_hash(const ht_link_t *item) 400 { 401 interface_t *interface = hash_table_get_inst(item, interface_t, link); 402 return interface_key_hash(&interface->iface); 403 } 404 405 static bool interface_key_equal(void *key, const ht_link_t *item) 406 { 407 iface_t iface = *(iface_t *) key; 408 interface_t *interface = hash_table_get_inst(item, interface_t, link); 409 return iface == interface->iface; 410 } 411 412 /** Operations for the port hash table. */ 413 static hash_table_ops_t interface_hash_table_ops = { 414 .hash = interface_hash, 415 .key_hash = interface_key_hash, 416 .key_equal = interface_key_equal, 417 .equal = NULL, 418 .remove_callback = NULL 419 }; 420 421 static size_t port_key_hash(void *key) 422 { 423 port_id_t port_id = *(port_id_t *) key; 424 return port_id; 425 } 426 427 static size_t port_hash(const ht_link_t *item) 428 { 429 port_t *port = hash_table_get_inst(item, port_t, link); 430 return port_key_hash(&port->id); 431 } 432 433 static bool port_key_equal(void *key, const ht_link_t *item) 434 { 435 port_id_t port_id = *(port_id_t *) key; 436 port_t *port = hash_table_get_inst(item, port_t, link); 437 return port_id == port->id; 438 } 439 440 /** Operations for the port hash table. */ 441 static hash_table_ops_t port_hash_table_ops = { 442 .hash = port_hash, 443 .key_hash = port_key_hash, 444 .key_equal = port_key_equal, 445 .equal = NULL, 446 .remove_callback = NULL 447 }; 448 449 static interface_t *async_new_interface(iface_t iface) 450 { 451 interface_t *interface = 452 (interface_t *) malloc(sizeof(interface_t)); 453 if (!interface) 454 return NULL; 455 456 bool ret = hash_table_create(&interface->port_hash_table, 0, 0, 457 &port_hash_table_ops); 458 if (!ret) { 459 free(interface); 460 return NULL; 461 } 462 463 interface->iface = iface; 464 futex_initialize(&interface->futex, 1); 465 interface->port_id_avail = 0; 466 467 hash_table_insert(&interface_hash_table, &interface->link); 468 469 return interface; 470 } 471 472 static port_t *async_new_port(interface_t *interface, 473 async_port_handler_t handler, void *data) 474 { 475 port_t *port = (port_t *) malloc(sizeof(port_t)); 476 if (!port) 477 return NULL; 478 479 futex_down(&interface->futex); 480 481 port_id_t id = interface->port_id_avail; 482 interface->port_id_avail++; 483 484 port->id = id; 485 port->handler = handler; 486 port->data = data; 487 488 hash_table_insert(&interface->port_hash_table, &port->link); 489 490 futex_up(&interface->futex); 491 492 return port; 493 } 494 352 static async_client_conn_t client_connection = default_client_connection; 495 353 static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE; 496 354 355 /** Setter for client_connection function pointer. 356 * 357 * @param conn Function that will implement a new connection fibril. 358 * 359 */ 360 void async_set_client_connection(async_client_conn_t conn) 361 { 362 assert(client_connection == default_client_connection); 363 client_connection = conn; 364 } 365 497 366 /** Set the stack size for the notification handler notification fibrils. 498 367 * … … 518 387 */ 519 388 static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv); 520 521 int async_create_port(iface_t iface, async_port_handler_t handler,522 void *data, port_id_t *port_id)523 {524 if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK)525 return EINVAL;526 527 interface_t *interface;528 529 futex_down(&async_futex);530 531 ht_link_t *link = hash_table_find(&interface_hash_table, &iface);532 if (link)533 interface = hash_table_get_inst(link, interface_t, link);534 else535 interface = async_new_interface(iface);536 537 if (!interface) {538 futex_up(&async_futex);539 return ENOMEM;540 }541 542 port_t *port = async_new_port(interface, handler, data);543 if (!port) {544 futex_up(&async_futex);545 return ENOMEM;546 }547 548 *port_id = port->id;549 550 futex_up(&async_futex);551 552 return EOK;553 }554 555 void async_set_fallback_port_handler(async_port_handler_t handler, void *data)556 {557 assert(handler != NULL);558 559 fallback_port_handler = handler;560 fallback_port_data = data;561 }562 389 563 390 static hash_table_t client_hash_table; … … 630 457 .remove_callback = NULL 631 458 }; 632 633 static client_t *async_client_get(task_id_t client_id, bool create)634 {635 client_t *client = NULL;636 637 futex_down(&async_futex);638 ht_link_t *link = hash_table_find(&client_hash_table, &client_id);639 if (link) {640 client = hash_table_get_inst(link, client_t, link);641 atomic_inc(&client->refcnt);642 } else if (create) {643 client = malloc(sizeof(client_t));644 if (client) {645 client->in_task_id = client_id;646 client->data = async_client_data_create();647 648 atomic_set(&client->refcnt, 1);649 hash_table_insert(&client_hash_table, &client->link);650 }651 }652 653 futex_up(&async_futex);654 return client;655 }656 657 static void async_client_put(client_t *client)658 {659 bool destroy;660 661 futex_down(&async_futex);662 663 if (atomic_predec(&client->refcnt) == 0) {664 hash_table_remove(&client_hash_table, &client->in_task_id);665 destroy = true;666 } else667 destroy = false;668 669 futex_up(&async_futex);670 671 if (destroy) {672 if (client->data)673 async_client_data_destroy(client->data);674 675 free(client);676 }677 }678 679 /** Wrapper for client connection fibril.680 *681 * When a new connection arrives, a fibril with this implementing682 * function is created.683 *684 * @param arg Connection structure pointer.685 *686 * @return Always zero.687 *688 */689 static int connection_fibril(void *arg)690 {691 assert(arg);692 693 /*694 * Setup fibril-local connection pointer.695 */696 fibril_connection = (connection_t *) arg;697 698 /*699 * Add our reference for the current connection in the client task700 * tracking structure. If this is the first reference, create and701 * hash in a new tracking structure.702 */703 704 client_t *client = async_client_get(fibril_connection->in_task_id, true);705 if (!client) {706 ipc_answer_0(fibril_connection->callid, ENOMEM);707 return 0;708 }709 710 fibril_connection->client = client;711 712 /*713 * Call the connection handler function.714 */715 fibril_connection->handler(fibril_connection->callid,716 &fibril_connection->call, fibril_connection->data);717 718 /*719 * Remove the reference for this client task connection.720 */721 async_client_put(client);722 723 /*724 * Remove myself from the connection hash table.725 */726 futex_down(&async_futex);727 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);728 futex_up(&async_futex);729 730 /*731 * Answer all remaining messages with EHANGUP.732 */733 while (!list_empty(&fibril_connection->msg_queue)) {734 msg_t *msg =735 list_get_instance(list_first(&fibril_connection->msg_queue),736 msg_t, link);737 738 list_remove(&msg->link);739 ipc_answer_0(msg->callid, EHANGUP);740 free(msg);741 }742 743 /*744 * If the connection was hung-up, answer the last call,745 * i.e. IPC_M_PHONE_HUNGUP.746 */747 if (fibril_connection->close_callid)748 ipc_answer_0(fibril_connection->close_callid, EOK);749 750 free(fibril_connection);751 return 0;752 }753 754 /** Create a new fibril for a new connection.755 *756 * Create new fibril for connection, fill in connection structures757 * and insert it into the hash table, so that later we can easily758 * do routing of messages to particular fibrils.759 *760 * @param in_task_id Identification of the incoming connection.761 * @param in_phone_hash Identification of the incoming connection.762 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call.763 * If callid is zero, the connection was opened by764 * accepting the IPC_M_CONNECT_TO_ME call and this765 * function is called directly by the server.766 * @param call Call data of the opening call.767 * @param handler Connection handler.768 * @param data Client argument to pass to the connection handler.769 *770 * @return New fibril id or NULL on failure.771 *772 */773 static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,774 ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler,775 void *data)776 {777 connection_t *conn = malloc(sizeof(*conn));778 if (!conn) {779 if (callid)780 ipc_answer_0(callid, ENOMEM);781 782 return (uintptr_t) NULL;783 }784 785 conn->in_task_id = in_task_id;786 conn->in_phone_hash = in_phone_hash;787 list_initialize(&conn->msg_queue);788 conn->callid = callid;789 conn->close_callid = 0;790 conn->handler = handler;791 conn->data = data;792 793 if (call)794 conn->call = *call;795 796 /* We will activate the fibril ASAP */797 conn->wdata.active = true;798 conn->wdata.fid = fibril_create(connection_fibril, conn);799 800 if (conn->wdata.fid == 0) {801 free(conn);802 803 if (callid)804 ipc_answer_0(callid, ENOMEM);805 806 return (uintptr_t) NULL;807 }808 809 /* Add connection to the connection hash table */810 811 futex_down(&async_futex);812 hash_table_insert(&conn_hash_table, &conn->link);813 futex_up(&async_futex);814 815 fibril_add_ready(conn->wdata.fid);816 817 return conn->wdata.fid;818 }819 820 /** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.821 *822 * Ask through phone for a new connection to some service.823 *824 * @param exch Exchange for sending the message.825 * @param iface Callback interface.826 * @param arg1 User defined argument.827 * @param arg2 User defined argument.828 * @param handler Callback handler.829 * @param data Handler data.830 * @param port_id ID of the newly created port.831 *832 * @return Zero on success or a negative error code.833 *834 */835 int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1,836 sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id)837 {838 if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK)839 return EINVAL;840 841 if (exch == NULL)842 return ENOENT;843 844 ipc_call_t answer;845 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2,846 &answer);847 848 sysarg_t ret;849 async_wait_for(req, &ret);850 if (ret != EOK)851 return (int) ret;852 853 sysarg_t phone_hash = IPC_GET_ARG5(answer);854 interface_t *interface;855 856 futex_down(&async_futex);857 858 ht_link_t *link = hash_table_find(&interface_hash_table, &iface);859 if (link)860 interface = hash_table_get_inst(link, interface_t, link);861 else862 interface = async_new_interface(iface);863 864 if (!interface) {865 futex_up(&async_futex);866 return ENOMEM;867 }868 869 port_t *port = async_new_port(interface, handler, data);870 if (!port) {871 futex_up(&async_futex);872 return ENOMEM;873 }874 875 *port_id = port->id;876 877 futex_up(&async_futex);878 879 fid_t fid = async_new_connection(answer.in_task_id, phone_hash,880 0, NULL, handler, data);881 if (fid == (uintptr_t) NULL)882 return ENOMEM;883 884 return EOK;885 }886 459 887 460 static size_t notification_key_hash(void *key) … … 1293 866 } 1294 867 1295 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), 1296 msg_t, link); 868 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link); 1297 869 list_remove(&msg->link); 1298 870 … … 1305 877 } 1306 878 879 static client_t *async_client_get(task_id_t client_id, bool create) 880 { 881 client_t *client = NULL; 882 883 futex_down(&async_futex); 884 ht_link_t *link = hash_table_find(&client_hash_table, &client_id); 885 if (link) { 886 client = hash_table_get_inst(link, client_t, link); 887 atomic_inc(&client->refcnt); 888 } else if (create) { 889 client = malloc(sizeof(client_t)); 890 if (client) { 891 client->in_task_id = client_id; 892 client->data = async_client_data_create(); 893 894 atomic_set(&client->refcnt, 1); 895 hash_table_insert(&client_hash_table, &client->link); 896 } 897 } 898 899 futex_up(&async_futex); 900 return client; 901 } 902 903 static void async_client_put(client_t *client) 904 { 905 bool destroy; 906 907 futex_down(&async_futex); 908 909 if (atomic_predec(&client->refcnt) == 0) { 910 hash_table_remove(&client_hash_table, &client->in_task_id); 911 destroy = true; 912 } else 913 destroy = false; 914 915 futex_up(&async_futex); 916 917 if (destroy) { 918 if (client->data) 919 async_client_data_destroy(client->data); 920 921 free(client); 922 } 923 } 924 1307 925 void *async_get_client_data(void) 1308 926 { … … 1316 934 if (!client) 1317 935 return NULL; 1318 1319 936 if (!client->data) { 1320 937 async_client_put(client); 1321 938 return NULL; 1322 939 } 1323 940 1324 941 return client->data; 1325 942 } … … 1328 945 { 1329 946 client_t *client = async_client_get(client_id, false); 1330 947 1331 948 assert(client); 1332 949 assert(client->data); 1333 950 1334 951 /* Drop the reference we got in async_get_client_data_by_hash(). */ 1335 952 async_client_put(client); 1336 953 1337 954 /* Drop our own reference we got at the beginning of this function. */ 1338 955 async_client_put(client); 1339 956 } 1340 957 1341 static port_t *async_find_port(iface_t iface, port_id_t port_id) 1342 { 1343 port_t *port = NULL; 1344 958 /** Wrapper for client connection fibril. 959 * 960 * When a new connection arrives, a fibril with this implementing function is 961 * created. It calls client_connection() and does the final cleanup. 962 * 963 * @param arg Connection structure pointer. 964 * 965 * @return Always zero. 966 * 967 */ 968 static int connection_fibril(void *arg) 969 { 970 assert(arg); 971 972 /* 973 * Setup fibril-local connection pointer. 974 */ 975 fibril_connection = (connection_t *) arg; 976 977 /* 978 * Add our reference for the current connection in the client task 979 * tracking structure. If this is the first reference, create and 980 * hash in a new tracking structure. 981 */ 982 983 client_t *client = async_client_get(fibril_connection->in_task_id, true); 984 if (!client) { 985 ipc_answer_0(fibril_connection->callid, ENOMEM); 986 return 0; 987 } 988 989 fibril_connection->client = client; 990 991 /* 992 * Call the connection handler function. 993 */ 994 fibril_connection->cfibril(fibril_connection->callid, 995 &fibril_connection->call, fibril_connection->carg); 996 997 /* 998 * Remove the reference for this client task connection. 999 */ 1000 async_client_put(client); 1001 1002 /* 1003 * Remove myself from the connection hash table. 1004 */ 1345 1005 futex_down(&async_futex); 1346 1347 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 1348 if (link) { 1349 interface_t *interface = 1350 hash_table_get_inst(link, interface_t, link); 1006 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash); 1007 futex_up(&async_futex); 1008 1009 /* 1010 * Answer all remaining messages with EHANGUP. 1011 */ 1012 while (!list_empty(&fibril_connection->msg_queue)) { 1013 msg_t *msg = 1014 list_get_instance(list_first(&fibril_connection->msg_queue), 1015 msg_t, link); 1351 1016 1352 link = hash_table_find(&interface->port_hash_table, &port_id); 1353 if (link) 1354 port = hash_table_get_inst(link, port_t, link); 1355 } 1356 1017 list_remove(&msg->link); 1018 ipc_answer_0(msg->callid, EHANGUP); 1019 free(msg); 1020 } 1021 1022 /* 1023 * If the connection was hung-up, answer the last call, 1024 * i.e. IPC_M_PHONE_HUNGUP. 1025 */ 1026 if (fibril_connection->close_callid) 1027 ipc_answer_0(fibril_connection->close_callid, EOK); 1028 1029 free(fibril_connection); 1030 return 0; 1031 } 1032 1033 /** Create a new fibril for a new connection. 1034 * 1035 * Create new fibril for connection, fill in connection structures and insert 1036 * it into the hash table, so that later we can easily do routing of messages to 1037 * particular fibrils. 1038 * 1039 * @param in_task_id Identification of the incoming connection. 1040 * @param in_phone_hash Identification of the incoming connection. 1041 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. 1042 * If callid is zero, the connection was opened by 1043 * accepting the IPC_M_CONNECT_TO_ME call and this function 1044 * is called directly by the server. 1045 * @param call Call data of the opening call. 1046 * @param cfibril Fibril function that should be called upon opening the 1047 * connection. 1048 * @param carg Extra argument to pass to the connection fibril 1049 * 1050 * @return New fibril id or NULL on failure. 1051 * 1052 */ 1053 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 1054 ipc_callid_t callid, ipc_call_t *call, 1055 async_client_conn_t cfibril, void *carg) 1056 { 1057 connection_t *conn = malloc(sizeof(*conn)); 1058 if (!conn) { 1059 if (callid) 1060 ipc_answer_0(callid, ENOMEM); 1061 1062 return (uintptr_t) NULL; 1063 } 1064 1065 conn->in_task_id = in_task_id; 1066 conn->in_phone_hash = in_phone_hash; 1067 list_initialize(&conn->msg_queue); 1068 conn->callid = callid; 1069 conn->close_callid = 0; 1070 conn->carg = carg; 1071 1072 if (call) 1073 conn->call = *call; 1074 1075 /* We will activate the fibril ASAP */ 1076 conn->wdata.active = true; 1077 conn->cfibril = cfibril; 1078 conn->wdata.fid = fibril_create(connection_fibril, conn); 1079 1080 if (conn->wdata.fid == 0) { 1081 free(conn); 1082 1083 if (callid) 1084 ipc_answer_0(callid, ENOMEM); 1085 1086 return (uintptr_t) NULL; 1087 } 1088 1089 /* Add connection to the connection hash table */ 1090 1091 futex_down(&async_futex); 1092 hash_table_insert(&conn_hash_table, &conn->link); 1357 1093 futex_up(&async_futex); 1358 1094 1359 return port; 1095 fibril_add_ready(conn->wdata.fid); 1096 1097 return conn->wdata.fid; 1360 1098 } 1361 1099 … … 1373 1111 assert(call); 1374 1112 1375 /* Kernel notification */1113 /* Unrouted call - take some default action */ 1376 1114 if ((callid & IPC_CALLID_NOTIFICATION)) { 1377 1115 process_notification(callid, call); … … 1379 1117 } 1380 1118 1381 /* New connection */ 1382 if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) { 1383 iface_t iface = (iface_t) IPC_GET_ARG1(*call); 1384 sysarg_t in_phone_hash = IPC_GET_ARG5(*call); 1385 1386 async_notification_handler_t handler = fallback_port_handler; 1387 void *data = fallback_port_data; 1388 1389 // TODO: Currently ignores all ports but the first one 1390 port_t *port = async_find_port(iface, 0); 1391 if (port) { 1392 handler = port->handler; 1393 data = port->data; 1394 } 1395 1396 async_new_connection(call->in_task_id, in_phone_hash, callid, 1397 call, handler, data); 1398 return; 1399 } 1400 1401 /* Cloned connection */ 1402 if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) { 1403 // TODO: Currently ignores ports altogether 1404 1119 switch (IPC_GET_IMETHOD(*call)) { 1120 case IPC_M_CLONE_ESTABLISH: 1121 case IPC_M_CONNECT_ME_TO: 1405 1122 /* Open new connection with fibril, etc. */ 1406 1123 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call), 1407 callid, call, fallback_port_handler, fallback_port_data);1124 callid, call, client_connection, NULL); 1408 1125 return; 1409 1126 } … … 1566 1283 void __async_init(void) 1567 1284 { 1568 if (!hash_table_create(&interface_hash_table, 0, 0,1569 &interface_hash_table_ops))1570 abort();1571 1572 1285 if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops)) 1573 1286 abort(); … … 1584 1297 abort(); 1585 1298 1586 session_ns->iface = 0;1587 1299 session_ns->mgmt = EXCHANGE_ATOMIC; 1588 1300 session_ns->phone = PHONE_NS; … … 1631 1343 1632 1344 msg->done = true; 1633 1345 1634 1346 if (msg->forget) { 1635 1347 assert(msg->wdata.active); … … 1639 1351 fibril_add_ready(msg->wdata.fid); 1640 1352 } 1641 1353 1642 1354 futex_up(&async_futex); 1643 1355 } … … 1731 1443 1732 1444 futex_down(&async_futex); 1733 1445 1734 1446 assert(!msg->forget); 1735 1447 assert(!msg->destroyed); 1736 1448 1737 1449 if (msg->done) { 1738 1450 futex_up(&async_futex); … … 1775 1487 1776 1488 amsg_t *msg = (amsg_t *) amsgid; 1777 1489 1778 1490 futex_down(&async_futex); 1779 1491 1780 1492 assert(!msg->forget); 1781 1493 assert(!msg->destroyed); 1782 1494 1783 1495 if (msg->done) { 1784 1496 futex_up(&async_futex); … … 1792 1504 if (timeout < 0) 1793 1505 timeout = 0; 1794 1506 1795 1507 getuptime(&msg->wdata.to_event.expires); 1796 1508 tv_add_diff(&msg->wdata.to_event.expires, timeout); … … 1845 1557 { 1846 1558 amsg_t *msg = (amsg_t *) amsgid; 1847 1559 1848 1560 assert(msg); 1849 1561 assert(!msg->forget); 1850 1562 assert(!msg->destroyed); 1851 1563 1852 1564 futex_down(&async_futex); 1853 1854 1565 if (msg->done) { 1855 1566 amsg_destroy(msg); … … 1858 1569 msg->forget = true; 1859 1570 } 1860 1861 1571 futex_up(&async_futex); 1862 1572 } … … 2104 1814 * @param arg2 User defined argument. 2105 1815 * @param arg3 User defined argument. 1816 * @param client_receiver Connection handing routine. 2106 1817 * 2107 1818 * @return Zero on success or a negative error code. … … 2109 1820 */ 2110 1821 int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 2111 sysarg_t arg3 )1822 sysarg_t arg3, async_client_conn_t client_receiver, void *carg) 2112 1823 { 2113 1824 if (exch == NULL) 2114 1825 return ENOENT; 2115 1826 1827 sysarg_t phone_hash; 1828 sysarg_t rc; 1829 1830 aid_t req; 2116 1831 ipc_call_t answer; 2117 aid_treq = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,1832 req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 2118 1833 &answer); 2119 2120 sysarg_t rc;2121 1834 async_wait_for(req, &rc); 2122 1835 if (rc != EOK) 2123 1836 return (int) rc; 1837 1838 phone_hash = IPC_GET_ARG5(answer); 1839 1840 if (client_receiver != NULL) 1841 async_new_connection(answer.in_task_id, phone_hash, 0, NULL, 1842 client_receiver, carg); 2124 1843 2125 1844 return EOK; … … 2181 1900 } 2182 1901 2183 sess->iface = 0;2184 1902 sess->mgmt = mgmt; 2185 1903 sess->phone = phone; … … 2251 1969 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3, 2252 1970 0); 1971 2253 1972 if (phone < 0) { 2254 1973 errno = phone; … … 2257 1976 } 2258 1977 2259 sess->iface = 0;2260 1978 sess->mgmt = mgmt; 2261 1979 sess->phone = phone; … … 2274 1992 } 2275 1993 2276 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.2277 *2278 * Ask through phone for a new connection to some service and block until2279 * success.2280 *2281 * @param exch Exchange for sending the message.2282 * @param iface Connection interface.2283 * @param arg2 User defined argument.2284 * @param arg3 User defined argument.2285 *2286 * @return New session on success or NULL on error.2287 *2288 */2289 async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,2290 sysarg_t arg2, sysarg_t arg3)2291 {2292 if (exch == NULL) {2293 errno = ENOENT;2294 return NULL;2295 }2296 2297 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));2298 if (sess == NULL) {2299 errno = ENOMEM;2300 return NULL;2301 }2302 2303 int phone = async_connect_me_to_internal(exch->phone, iface, arg2,2304 arg3, 0);2305 if (phone < 0) {2306 errno = phone;2307 free(sess);2308 return NULL;2309 }2310 2311 sess->iface = iface;2312 sess->phone = phone;2313 sess->arg1 = iface;2314 sess->arg2 = arg2;2315 sess->arg3 = arg3;2316 2317 fibril_mutex_initialize(&sess->remote_state_mtx);2318 sess->remote_state_data = NULL;2319 2320 list_initialize(&sess->exch_list);2321 fibril_mutex_initialize(&sess->mutex);2322 atomic_set(&sess->refcnt, 0);2323 2324 return sess;2325 }2326 2327 1994 /** Set arguments for new connections. 2328 1995 * … … 2380 2047 } 2381 2048 2382 sess->iface = 0;2383 2049 sess->mgmt = mgmt; 2384 2050 sess->phone = phone; … … 2397 2063 } 2398 2064 2399 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.2400 *2401 * Ask through phone for a new connection to some service and block until2402 * success.2403 *2404 * @param exch Exchange for sending the message.2405 * @param iface Connection interface.2406 * @param arg2 User defined argument.2407 * @param arg3 User defined argument.2408 *2409 * @return New session on success or NULL on error.2410 *2411 */2412 async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,2413 sysarg_t arg2, sysarg_t arg3)2414 {2415 if (exch == NULL) {2416 errno = ENOENT;2417 return NULL;2418 }2419 2420 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));2421 if (sess == NULL) {2422 errno = ENOMEM;2423 return NULL;2424 }2425 2426 int phone = async_connect_me_to_internal(exch->phone, iface, arg2,2427 arg3, IPC_FLAG_BLOCKING);2428 if (phone < 0) {2429 errno = phone;2430 free(sess);2431 return NULL;2432 }2433 2434 sess->iface = iface;2435 sess->phone = phone;2436 sess->arg1 = iface;2437 sess->arg2 = arg2;2438 sess->arg3 = arg3;2439 2440 fibril_mutex_initialize(&sess->remote_state_mtx);2441 sess->remote_state_data = NULL;2442 2443 list_initialize(&sess->exch_list);2444 fibril_mutex_initialize(&sess->mutex);2445 atomic_set(&sess->refcnt, 0);2446 2447 return sess;2448 }2449 2450 2065 /** Connect to a task specified by id. 2451 2066 * … … 2466 2081 } 2467 2082 2468 sess->iface = 0;2469 2083 sess->mgmt = EXCHANGE_ATOMIC; 2470 2084 sess->phone = phone; … … 2544 2158 return NULL; 2545 2159 2546 exch_mgmt_t mgmt = sess->mgmt; 2547 if (sess->iface != 0) 2548 mgmt = sess->iface & IFACE_EXCHANGE_MASK; 2549 2550 async_exch_t *exch = NULL; 2160 async_exch_t *exch; 2551 2161 2552 2162 fibril_mutex_lock(&async_sess_mutex); … … 2567 2177 */ 2568 2178 2569 if (( mgmt == EXCHANGE_ATOMIC) ||2570 ( mgmt == EXCHANGE_SERIALIZE)) {2179 if ((sess->mgmt == EXCHANGE_ATOMIC) || 2180 (sess->mgmt == EXCHANGE_SERIALIZE)) { 2571 2181 exch = (async_exch_t *) malloc(sizeof(async_exch_t)); 2572 2182 if (exch != NULL) { … … 2576 2186 exch->phone = sess->phone; 2577 2187 } 2578 } else if (mgmt == EXCHANGE_PARALLEL) { 2579 int phone; 2580 2581 retry: 2188 } else { /* EXCHANGE_PARALLEL */ 2582 2189 /* 2583 2190 * Make a one-time attempt to connect a new data phone. 2584 2191 */ 2192 2193 int phone; 2194 2195 retry: 2585 2196 phone = async_connect_me_to_internal(sess->phone, sess->arg1, 2586 2197 sess->arg2, sess->arg3, 0); … … 2624 2235 atomic_inc(&sess->refcnt); 2625 2236 2626 if ( mgmt == EXCHANGE_SERIALIZE)2237 if (sess->mgmt == EXCHANGE_SERIALIZE) 2627 2238 fibril_mutex_lock(&sess->mutex); 2628 2239 } … … 2644 2255 assert(sess != NULL); 2645 2256 2646 exch_mgmt_t mgmt = sess->mgmt;2647 if (sess->iface != 0)2648 mgmt = sess->iface & IFACE_EXCHANGE_MASK;2649 2650 2257 atomic_dec(&sess->refcnt); 2651 2258 2652 if ( mgmt == EXCHANGE_SERIALIZE)2259 if (sess->mgmt == EXCHANGE_SERIALIZE) 2653 2260 fibril_mutex_unlock(&sess->mutex); 2654 2261 … … 3087 2694 } 3088 2695 3089 void * arg_data;2696 void *_data; 3090 2697 3091 2698 if (nullterm) 3092 arg_data = malloc(size + 1);2699 _data = malloc(size + 1); 3093 2700 else 3094 arg_data = malloc(size);3095 3096 if ( arg_data == NULL) {2701 _data = malloc(size); 2702 2703 if (_data == NULL) { 3097 2704 ipc_answer_0(callid, ENOMEM); 3098 2705 return ENOMEM; 3099 2706 } 3100 2707 3101 int rc = async_data_write_finalize(callid, arg_data, size);2708 int rc = async_data_write_finalize(callid, _data, size); 3102 2709 if (rc != EOK) { 3103 free( arg_data);2710 free(_data); 3104 2711 return rc; 3105 2712 } 3106 2713 3107 2714 if (nullterm) 3108 ((char *) arg_data)[size] = 0;3109 3110 *data = arg_data;2715 ((char *) _data)[size] = 0; 2716 2717 *data = _data; 3111 2718 if (received != NULL) 3112 2719 *received = size; … … 3206 2813 } 3207 2814 3208 sess->iface = 0;3209 2815 sess->mgmt = mgmt; 3210 2816 sess->phone = phone; … … 3256 2862 } 3257 2863 3258 sess->iface = 0;3259 2864 sess->mgmt = mgmt; 3260 2865 sess->phone = phone; … … 3302 2907 return NULL; 3303 2908 3304 sess->iface = 0;3305 2909 sess->mgmt = mgmt; 3306 2910 sess->phone = phone; … … 3330 2934 { 3331 2935 assert(callid); 3332 2936 3333 2937 ipc_call_t call; 3334 2938 *callid = async_get_call(&call); 3335 2939 3336 2940 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE) 3337 2941 return false; … … 3343 2947 if (arg3) 3344 2948 *arg3 = IPC_GET_ARG3(call); 3345 2949 3346 2950 return true; 3347 2951 }
Note:
See TracChangeset
for help on using the changeset viewer.