Changes in uspace/lib/c/generic/async.c [7f9d97f3:c170438] in mainline
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/c/generic/async.c
r7f9d97f3 rc170438 77 77 * } 78 78 * 79 * my_client_connection(icallid, *icall)79 * port_handler(icallid, *icall) 80 80 * { 81 81 * if (want_refuse) { … … 123 123 list_t exch_list; 124 124 125 /** Session interface */ 126 iface_t iface; 127 125 128 /** Exchange management style */ 126 129 exch_mgmt_t mgmt; … … 189 192 /** If reply was received. */ 190 193 bool done; 191 194 192 195 /** If the message / reply should be discarded on arrival. */ 193 196 bool forget; 194 197 195 198 /** If already destroyed. */ 196 199 bool destroyed; … … 232 235 /** Identification of the opening call. */ 233 236 ipc_callid_t callid; 237 234 238 /** Call data of the opening call. */ 235 239 ipc_call_t call; 236 /** Local argument or NULL if none. */237 void *carg;238 240 239 241 /** Identification of the closing call. */ … … 241 243 242 244 /** Fibril function that will be used to handle the connection. */ 243 async_client_conn_t cfibril; 245 async_port_handler_t handler; 246 247 /** Client data */ 248 void *data; 244 249 } connection_t; 250 251 /** Interface data */ 252 typedef struct { 253 ht_link_t link; 254 255 /** Interface ID */ 256 iface_t iface; 257 258 /** Futex protecting the hash table */ 259 futex_t futex; 260 261 /** Interface ports */ 262 hash_table_t port_hash_table; 263 264 /** Next available port ID */ 265 port_id_t port_id_avail; 266 } interface_t; 267 268 /* Port data */ 269 typedef struct { 270 ht_link_t link; 271 272 /** Port ID */ 273 port_id_t id; 274 275 /** Port connection handler */ 276 async_port_handler_t handler; 277 278 /** Client data */ 279 void *data; 280 } port_t; 245 281 246 282 /* Notification data */ … … 264 300 { 265 301 struct timeval tv = { 0, 0 }; 266 302 267 303 to->inlist = false; 268 304 to->occurred = false; … … 287 323 static amsg_t *amsg_create(void) 288 324 { 289 amsg_t *msg; 290 291 msg = malloc(sizeof(amsg_t)); 325 amsg_t *msg = malloc(sizeof(amsg_t)); 292 326 if (msg) { 293 327 msg->done = false; … … 298 332 awaiter_initialize(&msg->wdata); 299 333 } 300 334 301 335 return msg; 302 336 } … … 335 369 } 336 370 337 /** Default fibril function that gets called to handle new connection. 338 * 339 * This function is defined as a weak symbol - to be redefined in user code. 371 /** Default fallback fibril function. 372 * 373 * This fallback fibril function gets called on incomming 374 * connections that do not have a specific handler defined. 340 375 * 341 376 * @param callid Hash of the incoming call. … … 344 379 * 345 380 */ 346 static void default_ client_connection(ipc_callid_t callid, ipc_call_t *call,381 static void default_fallback_port_handler(ipc_callid_t callid, ipc_call_t *call, 347 382 void *arg) 348 383 { … … 350 385 } 351 386 352 static async_client_conn_t client_connection = default_client_connection; 353 static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE; 354 355 /** Setter for client_connection function pointer. 356 * 357 * @param conn Function that will implement a new connection fibril. 358 * 359 */ 360 void async_set_client_connection(async_client_conn_t conn) 361 { 362 assert(client_connection == default_client_connection); 363 client_connection = conn; 364 } 365 366 /** Set the stack size for the notification handler notification fibrils. 367 * 368 * @param size Stack size in bytes. 369 */ 370 void async_set_notification_handler_stack_size(size_t size) 371 { 372 notification_handler_stksz = size; 387 static async_port_handler_t fallback_port_handler = 388 default_fallback_port_handler; 389 static void *fallback_port_data = NULL; 390 391 static hash_table_t interface_hash_table; 392 393 static size_t interface_key_hash(void *key) 394 { 395 iface_t iface = *(iface_t *) key; 396 return iface; 397 } 398 399 static size_t interface_hash(const ht_link_t *item) 400 { 401 interface_t *interface = hash_table_get_inst(item, interface_t, link); 402 return interface_key_hash(&interface->iface); 403 } 404 405 static bool interface_key_equal(void *key, const ht_link_t *item) 406 { 407 iface_t iface = *(iface_t *) key; 408 interface_t *interface = hash_table_get_inst(item, interface_t, link); 409 return iface == interface->iface; 410 } 411 412 /** Operations for the port hash table. */ 413 static hash_table_ops_t interface_hash_table_ops = { 414 .hash = interface_hash, 415 .key_hash = interface_key_hash, 416 .key_equal = interface_key_equal, 417 .equal = NULL, 418 .remove_callback = NULL 419 }; 420 421 static size_t port_key_hash(void *key) 422 { 423 port_id_t port_id = *(port_id_t *) key; 424 return port_id; 425 } 426 427 static size_t port_hash(const ht_link_t *item) 428 { 429 port_t *port = hash_table_get_inst(item, port_t, link); 430 return port_key_hash(&port->id); 431 } 432 433 static bool port_key_equal(void *key, const ht_link_t *item) 434 { 435 port_id_t port_id = *(port_id_t *) key; 436 port_t *port = hash_table_get_inst(item, port_t, link); 437 return port_id == port->id; 438 } 439 440 /** Operations for the port hash table. */ 441 static hash_table_ops_t port_hash_table_ops = { 442 .hash = port_hash, 443 .key_hash = port_key_hash, 444 .key_equal = port_key_equal, 445 .equal = NULL, 446 .remove_callback = NULL 447 }; 448 449 static interface_t *async_new_interface(iface_t iface) 450 { 451 interface_t *interface = 452 (interface_t *) malloc(sizeof(interface_t)); 453 if (!interface) 454 return NULL; 455 456 bool ret = hash_table_create(&interface->port_hash_table, 0, 0, 457 &port_hash_table_ops); 458 if (!ret) { 459 free(interface); 460 return NULL; 461 } 462 463 interface->iface = iface; 464 futex_initialize(&interface->futex, 1); 465 interface->port_id_avail = 0; 466 467 hash_table_insert(&interface_hash_table, &interface->link); 468 469 return interface; 470 } 471 472 static port_t *async_new_port(interface_t *interface, 473 async_port_handler_t handler, void *data) 474 { 475 port_t *port = (port_t *) malloc(sizeof(port_t)); 476 if (!port) 477 return NULL; 478 479 futex_down(&interface->futex); 480 481 port_id_t id = interface->port_id_avail; 482 interface->port_id_avail++; 483 484 port->id = id; 485 port->handler = handler; 486 port->data = data; 487 488 hash_table_insert(&interface->port_hash_table, &port->link); 489 490 futex_up(&interface->futex); 491 492 return port; 373 493 } 374 494 … … 387 507 */ 388 508 static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv); 509 510 int async_create_port(iface_t iface, async_port_handler_t handler, 511 void *data, port_id_t *port_id) 512 { 513 if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK) 514 return EINVAL; 515 516 interface_t *interface; 517 518 futex_down(&async_futex); 519 520 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 521 if (link) 522 interface = hash_table_get_inst(link, interface_t, link); 523 else 524 interface = async_new_interface(iface); 525 526 if (!interface) { 527 futex_up(&async_futex); 528 return ENOMEM; 529 } 530 531 port_t *port = async_new_port(interface, handler, data); 532 if (!port) { 533 futex_up(&async_futex); 534 return ENOMEM; 535 } 536 537 *port_id = port->id; 538 539 futex_up(&async_futex); 540 541 return EOK; 542 } 543 544 void async_set_fallback_port_handler(async_port_handler_t handler, void *data) 545 { 546 assert(handler != NULL); 547 548 fallback_port_handler = handler; 549 fallback_port_data = data; 550 } 389 551 390 552 static hash_table_t client_hash_table; … … 457 619 .remove_callback = NULL 458 620 }; 621 622 static client_t *async_client_get(task_id_t client_id, bool create) 623 { 624 client_t *client = NULL; 625 626 futex_down(&async_futex); 627 ht_link_t *link = hash_table_find(&client_hash_table, &client_id); 628 if (link) { 629 client = hash_table_get_inst(link, client_t, link); 630 atomic_inc(&client->refcnt); 631 } else if (create) { 632 client = malloc(sizeof(client_t)); 633 if (client) { 634 client->in_task_id = client_id; 635 client->data = async_client_data_create(); 636 637 atomic_set(&client->refcnt, 1); 638 hash_table_insert(&client_hash_table, &client->link); 639 } 640 } 641 642 futex_up(&async_futex); 643 return client; 644 } 645 646 static void async_client_put(client_t *client) 647 { 648 bool destroy; 649 650 futex_down(&async_futex); 651 652 if (atomic_predec(&client->refcnt) == 0) { 653 hash_table_remove(&client_hash_table, &client->in_task_id); 654 destroy = true; 655 } else 656 destroy = false; 657 658 futex_up(&async_futex); 659 660 if (destroy) { 661 if (client->data) 662 async_client_data_destroy(client->data); 663 664 free(client); 665 } 666 } 667 668 /** Wrapper for client connection fibril. 669 * 670 * When a new connection arrives, a fibril with this implementing 671 * function is created. 672 * 673 * @param arg Connection structure pointer. 674 * 675 * @return Always zero. 676 * 677 */ 678 static int connection_fibril(void *arg) 679 { 680 assert(arg); 681 682 /* 683 * Setup fibril-local connection pointer. 684 */ 685 fibril_connection = (connection_t *) arg; 686 687 /* 688 * Add our reference for the current connection in the client task 689 * tracking structure. If this is the first reference, create and 690 * hash in a new tracking structure. 691 */ 692 693 client_t *client = async_client_get(fibril_connection->in_task_id, true); 694 if (!client) { 695 ipc_answer_0(fibril_connection->callid, ENOMEM); 696 return 0; 697 } 698 699 fibril_connection->client = client; 700 701 /* 702 * Call the connection handler function. 703 */ 704 fibril_connection->handler(fibril_connection->callid, 705 &fibril_connection->call, fibril_connection->data); 706 707 /* 708 * Remove the reference for this client task connection. 709 */ 710 async_client_put(client); 711 712 /* 713 * Remove myself from the connection hash table. 714 */ 715 futex_down(&async_futex); 716 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash); 717 futex_up(&async_futex); 718 719 /* 720 * Answer all remaining messages with EHANGUP. 721 */ 722 while (!list_empty(&fibril_connection->msg_queue)) { 723 msg_t *msg = 724 list_get_instance(list_first(&fibril_connection->msg_queue), 725 msg_t, link); 726 727 list_remove(&msg->link); 728 ipc_answer_0(msg->callid, EHANGUP); 729 free(msg); 730 } 731 732 /* 733 * If the connection was hung-up, answer the last call, 734 * i.e. IPC_M_PHONE_HUNGUP. 735 */ 736 if (fibril_connection->close_callid) 737 ipc_answer_0(fibril_connection->close_callid, EOK); 738 739 free(fibril_connection); 740 return 0; 741 } 742 743 /** Create a new fibril for a new connection. 744 * 745 * Create new fibril for connection, fill in connection structures 746 * and insert it into the hash table, so that later we can easily 747 * do routing of messages to particular fibrils. 748 * 749 * @param in_task_id Identification of the incoming connection. 750 * @param in_phone_hash Identification of the incoming connection. 751 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. 752 * If callid is zero, the connection was opened by 753 * accepting the IPC_M_CONNECT_TO_ME call and this 754 * function is called directly by the server. 755 * @param call Call data of the opening call. 756 * @param handler Connection handler. 757 * @param data Client argument to pass to the connection handler. 758 * 759 * @return New fibril id or NULL on failure. 760 * 761 */ 762 static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 763 ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler, 764 void *data) 765 { 766 connection_t *conn = malloc(sizeof(*conn)); 767 if (!conn) { 768 if (callid) 769 ipc_answer_0(callid, ENOMEM); 770 771 return (uintptr_t) NULL; 772 } 773 774 conn->in_task_id = in_task_id; 775 conn->in_phone_hash = in_phone_hash; 776 list_initialize(&conn->msg_queue); 777 conn->callid = callid; 778 conn->close_callid = 0; 779 conn->handler = handler; 780 conn->data = data; 781 782 if (call) 783 conn->call = *call; 784 785 /* We will activate the fibril ASAP */ 786 conn->wdata.active = true; 787 conn->wdata.fid = fibril_create(connection_fibril, conn); 788 789 if (conn->wdata.fid == 0) { 790 free(conn); 791 792 if (callid) 793 ipc_answer_0(callid, ENOMEM); 794 795 return (uintptr_t) NULL; 796 } 797 798 /* Add connection to the connection hash table */ 799 800 futex_down(&async_futex); 801 hash_table_insert(&conn_hash_table, &conn->link); 802 futex_up(&async_futex); 803 804 fibril_add_ready(conn->wdata.fid); 805 806 return conn->wdata.fid; 807 } 808 809 /** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework. 810 * 811 * Ask through phone for a new connection to some service. 812 * 813 * @param exch Exchange for sending the message. 814 * @param iface Callback interface. 815 * @param arg1 User defined argument. 816 * @param arg2 User defined argument. 817 * @param handler Callback handler. 818 * @param data Handler data. 819 * @param port_id ID of the newly created port. 820 * 821 * @return Zero on success or a negative error code. 822 * 823 */ 824 int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1, 825 sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id) 826 { 827 if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK) 828 return EINVAL; 829 830 if (exch == NULL) 831 return ENOENT; 832 833 ipc_call_t answer; 834 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2, 835 &answer); 836 837 sysarg_t ret; 838 async_wait_for(req, &ret); 839 if (ret != EOK) 840 return (int) ret; 841 842 sysarg_t phone_hash = IPC_GET_ARG5(answer); 843 interface_t *interface; 844 845 futex_down(&async_futex); 846 847 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 848 if (link) 849 interface = hash_table_get_inst(link, interface_t, link); 850 else 851 interface = async_new_interface(iface); 852 853 if (!interface) { 854 futex_up(&async_futex); 855 return ENOMEM; 856 } 857 858 port_t *port = async_new_port(interface, handler, data); 859 if (!port) { 860 futex_up(&async_futex); 861 return ENOMEM; 862 } 863 864 *port_id = port->id; 865 866 futex_up(&async_futex); 867 868 fid_t fid = async_new_connection(answer.in_task_id, phone_hash, 869 0, NULL, handler, data); 870 if (fid == (uintptr_t) NULL) 871 return ENOMEM; 872 873 return EOK; 874 } 459 875 460 876 static size_t notification_key_hash(void *key) … … 571 987 } 572 988 573 /** Notification fibril. 574 * 575 * When a notification arrives, a fibril with this implementing function is 576 * created. It calls the corresponding notification handler and does the final 577 * cleanup. 578 * 579 * @param arg Message structure pointer. 580 * 581 * @return Always zero. 582 * 583 */ 584 static int notification_fibril(void *arg) 585 { 586 assert(arg); 587 588 msg_t *msg = (msg_t *) arg; 989 /** Process notification. 990 * 991 * @param callid Hash of the incoming call. 992 * @param call Data of the incoming call. 993 */ 994 static void process_notification(ipc_callid_t callid, ipc_call_t *call) 995 { 589 996 async_notification_handler_t handler = NULL; 590 997 void *data = NULL; 998 999 assert(call); 591 1000 592 1001 futex_down(&async_futex); 593 1002 594 1003 ht_link_t *link = hash_table_find(¬ification_hash_table, 595 &IPC_GET_IMETHOD( msg->call));1004 &IPC_GET_IMETHOD(*call)); 596 1005 if (link) { 597 1006 notification_t *notification = … … 604 1013 605 1014 if (handler) 606 handler(msg->callid, &msg->call, data); 607 608 free(msg); 609 return 0; 610 } 611 612 /** Process notification. 613 * 614 * A new fibril is created which would process the notification. 615 * 616 * @param callid Hash of the incoming call. 617 * @param call Data of the incoming call. 618 * 619 * @return False if an error occured. 620 * True if the call was passed to the notification fibril. 621 * 622 */ 623 static bool process_notification(ipc_callid_t callid, ipc_call_t *call) 624 { 625 assert(call); 626 627 futex_down(&async_futex); 628 629 msg_t *msg = malloc(sizeof(*msg)); 630 if (!msg) { 631 futex_up(&async_futex); 632 return false; 633 } 634 635 msg->callid = callid; 636 msg->call = *call; 637 638 fid_t fid = fibril_create_generic(notification_fibril, msg, 639 notification_handler_stksz); 640 if (fid == 0) { 641 free(msg); 642 futex_up(&async_futex); 643 return false; 644 } 645 646 fibril_add_ready(fid); 647 648 futex_up(&async_futex); 649 return true; 1015 handler(callid, call, data); 650 1016 } 651 1017 … … 866 1232 } 867 1233 868 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link); 1234 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), 1235 msg_t, link); 869 1236 list_remove(&msg->link); 870 1237 … … 877 1244 } 878 1245 879 static client_t *async_client_get(task_id_t client_id, bool create)880 {881 client_t *client = NULL;882 883 futex_down(&async_futex);884 ht_link_t *link = hash_table_find(&client_hash_table, &client_id);885 if (link) {886 client = hash_table_get_inst(link, client_t, link);887 atomic_inc(&client->refcnt);888 } else if (create) {889 client = malloc(sizeof(client_t));890 if (client) {891 client->in_task_id = client_id;892 client->data = async_client_data_create();893 894 atomic_set(&client->refcnt, 1);895 hash_table_insert(&client_hash_table, &client->link);896 }897 }898 899 futex_up(&async_futex);900 return client;901 }902 903 static void async_client_put(client_t *client)904 {905 bool destroy;906 907 futex_down(&async_futex);908 909 if (atomic_predec(&client->refcnt) == 0) {910 hash_table_remove(&client_hash_table, &client->in_task_id);911 destroy = true;912 } else913 destroy = false;914 915 futex_up(&async_futex);916 917 if (destroy) {918 if (client->data)919 async_client_data_destroy(client->data);920 921 free(client);922 }923 }924 925 1246 void *async_get_client_data(void) 926 1247 { … … 934 1255 if (!client) 935 1256 return NULL; 1257 936 1258 if (!client->data) { 937 1259 async_client_put(client); 938 1260 return NULL; 939 1261 } 940 1262 941 1263 return client->data; 942 1264 } … … 945 1267 { 946 1268 client_t *client = async_client_get(client_id, false); 947 1269 948 1270 assert(client); 949 1271 assert(client->data); 950 1272 951 1273 /* Drop the reference we got in async_get_client_data_by_hash(). */ 952 1274 async_client_put(client); 953 1275 954 1276 /* Drop our own reference we got at the beginning of this function. */ 955 1277 async_client_put(client); 956 1278 } 957 1279 958 /** Wrapper for client connection fibril. 959 * 960 * When a new connection arrives, a fibril with this implementing function is 961 * created. It calls client_connection() and does the final cleanup. 962 * 963 * @param arg Connection structure pointer. 964 * 965 * @return Always zero. 966 * 967 */ 968 static int connection_fibril(void *arg) 969 { 970 assert(arg); 971 972 /* 973 * Setup fibril-local connection pointer. 974 */ 975 fibril_connection = (connection_t *) arg; 976 977 /* 978 * Add our reference for the current connection in the client task 979 * tracking structure. If this is the first reference, create and 980 * hash in a new tracking structure. 981 */ 982 983 client_t *client = async_client_get(fibril_connection->in_task_id, true); 984 if (!client) { 985 ipc_answer_0(fibril_connection->callid, ENOMEM); 986 return 0; 987 } 988 989 fibril_connection->client = client; 990 991 /* 992 * Call the connection handler function. 993 */ 994 fibril_connection->cfibril(fibril_connection->callid, 995 &fibril_connection->call, fibril_connection->carg); 996 997 /* 998 * Remove the reference for this client task connection. 999 */ 1000 async_client_put(client); 1001 1002 /* 1003 * Remove myself from the connection hash table. 1004 */ 1280 static port_t *async_find_port(iface_t iface, port_id_t port_id) 1281 { 1282 port_t *port = NULL; 1283 1005 1284 futex_down(&async_futex); 1006 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash); 1285 1286 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 1287 if (link) { 1288 interface_t *interface = 1289 hash_table_get_inst(link, interface_t, link); 1290 1291 link = hash_table_find(&interface->port_hash_table, &port_id); 1292 if (link) 1293 port = hash_table_get_inst(link, port_t, link); 1294 } 1295 1007 1296 futex_up(&async_futex); 1008 1297 1009 /* 1010 * Answer all remaining messages with EHANGUP. 1011 */ 1012 while (!list_empty(&fibril_connection->msg_queue)) { 1013 msg_t *msg = 1014 list_get_instance(list_first(&fibril_connection->msg_queue), 1015 msg_t, link); 1016 1017 list_remove(&msg->link); 1018 ipc_answer_0(msg->callid, EHANGUP); 1019 free(msg); 1020 } 1021 1022 /* 1023 * If the connection was hung-up, answer the last call, 1024 * i.e. IPC_M_PHONE_HUNGUP. 1025 */ 1026 if (fibril_connection->close_callid) 1027 ipc_answer_0(fibril_connection->close_callid, EOK); 1028 1029 free(fibril_connection); 1030 return 0; 1031 } 1032 1033 /** Create a new fibril for a new connection. 1034 * 1035 * Create new fibril for connection, fill in connection structures and insert 1036 * it into the hash table, so that later we can easily do routing of messages to 1037 * particular fibrils. 1038 * 1039 * @param in_task_id Identification of the incoming connection. 1040 * @param in_phone_hash Identification of the incoming connection. 1041 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. 1042 * If callid is zero, the connection was opened by 1043 * accepting the IPC_M_CONNECT_TO_ME call and this function 1044 * is called directly by the server. 1045 * @param call Call data of the opening call. 1046 * @param cfibril Fibril function that should be called upon opening the 1047 * connection. 1048 * @param carg Extra argument to pass to the connection fibril 1049 * 1050 * @return New fibril id or NULL on failure. 1051 * 1052 */ 1053 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 1054 ipc_callid_t callid, ipc_call_t *call, 1055 async_client_conn_t cfibril, void *carg) 1056 { 1057 connection_t *conn = malloc(sizeof(*conn)); 1058 if (!conn) { 1059 if (callid) 1060 ipc_answer_0(callid, ENOMEM); 1061 1062 return (uintptr_t) NULL; 1063 } 1064 1065 conn->in_task_id = in_task_id; 1066 conn->in_phone_hash = in_phone_hash; 1067 list_initialize(&conn->msg_queue); 1068 conn->callid = callid; 1069 conn->close_callid = 0; 1070 conn->carg = carg; 1071 1072 if (call) 1073 conn->call = *call; 1074 1075 /* We will activate the fibril ASAP */ 1076 conn->wdata.active = true; 1077 conn->cfibril = cfibril; 1078 conn->wdata.fid = fibril_create(connection_fibril, conn); 1079 1080 if (conn->wdata.fid == 0) { 1081 free(conn); 1082 1083 if (callid) 1084 ipc_answer_0(callid, ENOMEM); 1085 1086 return (uintptr_t) NULL; 1087 } 1088 1089 /* Add connection to the connection hash table */ 1090 1091 futex_down(&async_futex); 1092 hash_table_insert(&conn_hash_table, &conn->link); 1093 futex_up(&async_futex); 1094 1095 fibril_add_ready(conn->wdata.fid); 1096 1097 return conn->wdata.fid; 1298 return port; 1098 1299 } 1099 1300 … … 1111 1312 assert(call); 1112 1313 1113 /* Unrouted call - take some default action */1314 /* Kernel notification */ 1114 1315 if ((callid & IPC_CALLID_NOTIFICATION)) { 1316 fibril_t *fibril = (fibril_t *) __tcb_get()->fibril_data; 1317 unsigned oldsw = fibril->switches; 1318 1115 1319 process_notification(callid, call); 1320 1321 if (oldsw != fibril->switches) { 1322 /* 1323 * The notification handler did not execute atomically 1324 * and so the current manager fibril assumed the role of 1325 * a notification fibril. While waiting for its 1326 * resources, it switched to another manager fibril that 1327 * had already existed or it created a new one. We 1328 * therefore know there is at least yet another 1329 * manager fibril that can take over. We now kill the 1330 * current 'notification' fibril to prevent fibril 1331 * population explosion. 1332 */ 1333 futex_down(&async_futex); 1334 fibril_switch(FIBRIL_FROM_DEAD); 1335 } 1116 1336 return; 1117 1337 } 1118 1338 1119 switch (IPC_GET_IMETHOD(*call)) { 1120 case IPC_M_CLONE_ESTABLISH: 1121 case IPC_M_CONNECT_ME_TO: 1339 /* New connection */ 1340 if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) { 1341 iface_t iface = (iface_t) IPC_GET_ARG1(*call); 1342 sysarg_t in_phone_hash = IPC_GET_ARG5(*call); 1343 1344 async_notification_handler_t handler = fallback_port_handler; 1345 void *data = fallback_port_data; 1346 1347 // TODO: Currently ignores all ports but the first one 1348 port_t *port = async_find_port(iface, 0); 1349 if (port) { 1350 handler = port->handler; 1351 data = port->data; 1352 } 1353 1354 async_new_connection(call->in_task_id, in_phone_hash, callid, 1355 call, handler, data); 1356 return; 1357 } 1358 1359 /* Cloned connection */ 1360 if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) { 1361 // TODO: Currently ignores ports altogether 1362 1122 1363 /* Open new connection with fibril, etc. */ 1123 1364 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call), 1124 callid, call, client_connection, NULL);1365 callid, call, fallback_port_handler, fallback_port_data); 1125 1366 return; 1126 1367 } … … 1267 1508 void async_create_manager(void) 1268 1509 { 1269 fid_t fid = fibril_create (async_manager_fibril, NULL);1510 fid_t fid = fibril_create_generic(async_manager_fibril, NULL, PAGE_SIZE); 1270 1511 if (fid != 0) 1271 1512 fibril_add_manager(fid); … … 1283 1524 void __async_init(void) 1284 1525 { 1526 if (!hash_table_create(&interface_hash_table, 0, 0, 1527 &interface_hash_table_ops)) 1528 abort(); 1529 1285 1530 if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops)) 1286 1531 abort(); … … 1297 1542 abort(); 1298 1543 1544 session_ns->iface = 0; 1299 1545 session_ns->mgmt = EXCHANGE_ATOMIC; 1300 1546 session_ns->phone = PHONE_NS; … … 1343 1589 1344 1590 msg->done = true; 1345 1591 1346 1592 if (msg->forget) { 1347 1593 assert(msg->wdata.active); … … 1351 1597 fibril_add_ready(msg->wdata.fid); 1352 1598 } 1353 1599 1354 1600 futex_up(&async_futex); 1355 1601 } … … 1386 1632 1387 1633 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg, 1388 reply_received , true);1634 reply_received); 1389 1635 1390 1636 return (aid_t) msg; … … 1424 1670 1425 1671 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5, 1426 msg, reply_received , true);1672 msg, reply_received); 1427 1673 1428 1674 return (aid_t) msg; … … 1443 1689 1444 1690 futex_down(&async_futex); 1445 1691 1446 1692 assert(!msg->forget); 1447 1693 assert(!msg->destroyed); 1448 1694 1449 1695 if (msg->done) { 1450 1696 futex_up(&async_futex); … … 1487 1733 1488 1734 amsg_t *msg = (amsg_t *) amsgid; 1489 1735 1490 1736 futex_down(&async_futex); 1491 1737 1492 1738 assert(!msg->forget); 1493 1739 assert(!msg->destroyed); 1494 1740 1495 1741 if (msg->done) { 1496 1742 futex_up(&async_futex); … … 1504 1750 if (timeout < 0) 1505 1751 timeout = 0; 1506 1752 1507 1753 getuptime(&msg->wdata.to_event.expires); 1508 1754 tv_add_diff(&msg->wdata.to_event.expires, timeout); … … 1557 1803 { 1558 1804 amsg_t *msg = (amsg_t *) amsgid; 1559 1805 1560 1806 assert(msg); 1561 1807 assert(!msg->forget); 1562 1808 assert(!msg->destroyed); 1563 1809 1564 1810 futex_down(&async_futex); 1811 1565 1812 if (msg->done) { 1566 1813 amsg_destroy(msg); … … 1569 1816 msg->forget = true; 1570 1817 } 1818 1571 1819 futex_up(&async_futex); 1572 1820 } … … 1711 1959 { 1712 1960 if (exch != NULL) 1713 ipc_call_async_0(exch->phone, imethod, NULL, NULL , true);1961 ipc_call_async_0(exch->phone, imethod, NULL, NULL); 1714 1962 } 1715 1963 … … 1717 1965 { 1718 1966 if (exch != NULL) 1719 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL , true);1967 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL); 1720 1968 } 1721 1969 … … 1724 1972 { 1725 1973 if (exch != NULL) 1726 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL, 1727 true); 1974 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL); 1728 1975 } 1729 1976 … … 1733 1980 if (exch != NULL) 1734 1981 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL, 1735 NULL , true);1982 NULL); 1736 1983 } 1737 1984 … … 1741 1988 if (exch != NULL) 1742 1989 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, 1743 NULL, NULL , true);1990 NULL, NULL); 1744 1991 } 1745 1992 … … 1749 1996 if (exch != NULL) 1750 1997 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, 1751 arg5, NULL, NULL , true);1998 arg5, NULL, NULL); 1752 1999 } 1753 2000 … … 1814 2061 * @param arg2 User defined argument. 1815 2062 * @param arg3 User defined argument. 1816 * @param client_receiver Connection handing routine.1817 2063 * 1818 2064 * @return Zero on success or a negative error code. … … 1820 2066 */ 1821 2067 int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 1822 sysarg_t arg3 , async_client_conn_t client_receiver, void *carg)2068 sysarg_t arg3) 1823 2069 { 1824 2070 if (exch == NULL) 1825 2071 return ENOENT; 1826 2072 1827 sysarg_t phone_hash; 2073 ipc_call_t answer; 2074 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 2075 &answer); 2076 1828 2077 sysarg_t rc; 1829 1830 aid_t req;1831 ipc_call_t answer;1832 req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,1833 &answer);1834 2078 async_wait_for(req, &rc); 1835 2079 if (rc != EOK) 1836 2080 return (int) rc; 1837 1838 phone_hash = IPC_GET_ARG5(answer);1839 1840 if (client_receiver != NULL)1841 async_new_connection(answer.in_task_id, phone_hash, 0, NULL,1842 client_receiver, carg);1843 2081 1844 2082 return EOK; … … 1881 2119 1882 2120 ipc_call_async_0(exch->phone, IPC_M_CLONE_ESTABLISH, msg, 1883 reply_received , true);2121 reply_received); 1884 2122 1885 2123 sysarg_t rc; … … 1900 2138 } 1901 2139 2140 sess->iface = 0; 1902 2141 sess->mgmt = mgmt; 1903 2142 sess->phone = phone; … … 1929 2168 1930 2169 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4, 1931 msg, reply_received , true);2170 msg, reply_received); 1932 2171 1933 2172 sysarg_t rc; … … 1969 2208 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3, 1970 2209 0); 1971 1972 2210 if (phone < 0) { 1973 2211 errno = phone; … … 1976 2214 } 1977 2215 2216 sess->iface = 0; 1978 2217 sess->mgmt = mgmt; 1979 2218 sess->phone = phone; … … 1992 2231 } 1993 2232 2233 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework. 2234 * 2235 * Ask through phone for a new connection to some service and block until 2236 * success. 2237 * 2238 * @param exch Exchange for sending the message. 2239 * @param iface Connection interface. 2240 * @param arg2 User defined argument. 2241 * @param arg3 User defined argument. 2242 * 2243 * @return New session on success or NULL on error. 2244 * 2245 */ 2246 async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface, 2247 sysarg_t arg2, sysarg_t arg3) 2248 { 2249 if (exch == NULL) { 2250 errno = ENOENT; 2251 return NULL; 2252 } 2253 2254 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 2255 if (sess == NULL) { 2256 errno = ENOMEM; 2257 return NULL; 2258 } 2259 2260 int phone = async_connect_me_to_internal(exch->phone, iface, arg2, 2261 arg3, 0); 2262 if (phone < 0) { 2263 errno = phone; 2264 free(sess); 2265 return NULL; 2266 } 2267 2268 sess->iface = iface; 2269 sess->phone = phone; 2270 sess->arg1 = iface; 2271 sess->arg2 = arg2; 2272 sess->arg3 = arg3; 2273 2274 fibril_mutex_initialize(&sess->remote_state_mtx); 2275 sess->remote_state_data = NULL; 2276 2277 list_initialize(&sess->exch_list); 2278 fibril_mutex_initialize(&sess->mutex); 2279 atomic_set(&sess->refcnt, 0); 2280 2281 return sess; 2282 } 2283 1994 2284 /** Set arguments for new connections. 1995 2285 * … … 2047 2337 } 2048 2338 2339 sess->iface = 0; 2049 2340 sess->mgmt = mgmt; 2050 2341 sess->phone = phone; … … 2063 2354 } 2064 2355 2356 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework. 2357 * 2358 * Ask through phone for a new connection to some service and block until 2359 * success. 2360 * 2361 * @param exch Exchange for sending the message. 2362 * @param iface Connection interface. 2363 * @param arg2 User defined argument. 2364 * @param arg3 User defined argument. 2365 * 2366 * @return New session on success or NULL on error. 2367 * 2368 */ 2369 async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface, 2370 sysarg_t arg2, sysarg_t arg3) 2371 { 2372 if (exch == NULL) { 2373 errno = ENOENT; 2374 return NULL; 2375 } 2376 2377 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 2378 if (sess == NULL) { 2379 errno = ENOMEM; 2380 return NULL; 2381 } 2382 2383 int phone = async_connect_me_to_internal(exch->phone, iface, arg2, 2384 arg3, IPC_FLAG_BLOCKING); 2385 if (phone < 0) { 2386 errno = phone; 2387 free(sess); 2388 return NULL; 2389 } 2390 2391 sess->iface = iface; 2392 sess->phone = phone; 2393 sess->arg1 = iface; 2394 sess->arg2 = arg2; 2395 sess->arg3 = arg3; 2396 2397 fibril_mutex_initialize(&sess->remote_state_mtx); 2398 sess->remote_state_data = NULL; 2399 2400 list_initialize(&sess->exch_list); 2401 fibril_mutex_initialize(&sess->mutex); 2402 atomic_set(&sess->refcnt, 0); 2403 2404 return sess; 2405 } 2406 2065 2407 /** Connect to a task specified by id. 2066 2408 * … … 2081 2423 } 2082 2424 2425 sess->iface = 0; 2083 2426 sess->mgmt = EXCHANGE_ATOMIC; 2084 2427 sess->phone = phone; … … 2158 2501 return NULL; 2159 2502 2160 async_exch_t *exch; 2503 exch_mgmt_t mgmt = sess->mgmt; 2504 if (sess->iface != 0) 2505 mgmt = sess->iface & IFACE_EXCHANGE_MASK; 2506 2507 async_exch_t *exch = NULL; 2161 2508 2162 2509 fibril_mutex_lock(&async_sess_mutex); … … 2177 2524 */ 2178 2525 2179 if (( sess->mgmt == EXCHANGE_ATOMIC) ||2180 ( sess->mgmt == EXCHANGE_SERIALIZE)) {2526 if ((mgmt == EXCHANGE_ATOMIC) || 2527 (mgmt == EXCHANGE_SERIALIZE)) { 2181 2528 exch = (async_exch_t *) malloc(sizeof(async_exch_t)); 2182 2529 if (exch != NULL) { … … 2186 2533 exch->phone = sess->phone; 2187 2534 } 2188 } else { /* EXCHANGE_PARALLEL */ 2535 } else if (mgmt == EXCHANGE_PARALLEL) { 2536 int phone; 2537 2538 retry: 2189 2539 /* 2190 2540 * Make a one-time attempt to connect a new data phone. 2191 2541 */ 2192 2193 int phone;2194 2195 retry:2196 2542 phone = async_connect_me_to_internal(sess->phone, sess->arg1, 2197 2543 sess->arg2, sess->arg3, 0); … … 2235 2581 atomic_inc(&sess->refcnt); 2236 2582 2237 if ( sess->mgmt == EXCHANGE_SERIALIZE)2583 if (mgmt == EXCHANGE_SERIALIZE) 2238 2584 fibril_mutex_lock(&sess->mutex); 2239 2585 } … … 2255 2601 assert(sess != NULL); 2256 2602 2603 exch_mgmt_t mgmt = sess->mgmt; 2604 if (sess->iface != 0) 2605 mgmt = sess->iface & IFACE_EXCHANGE_MASK; 2606 2257 2607 atomic_dec(&sess->refcnt); 2258 2608 2259 if ( sess->mgmt == EXCHANGE_SERIALIZE)2609 if (mgmt == EXCHANGE_SERIALIZE) 2260 2610 fibril_mutex_unlock(&sess->mutex); 2261 2611 … … 2694 3044 } 2695 3045 2696 void * _data;3046 void *arg_data; 2697 3047 2698 3048 if (nullterm) 2699 _data = malloc(size + 1);3049 arg_data = malloc(size + 1); 2700 3050 else 2701 _data = malloc(size);2702 2703 if ( _data == NULL) {3051 arg_data = malloc(size); 3052 3053 if (arg_data == NULL) { 2704 3054 ipc_answer_0(callid, ENOMEM); 2705 3055 return ENOMEM; 2706 3056 } 2707 3057 2708 int rc = async_data_write_finalize(callid, _data, size);3058 int rc = async_data_write_finalize(callid, arg_data, size); 2709 3059 if (rc != EOK) { 2710 free( _data);3060 free(arg_data); 2711 3061 return rc; 2712 3062 } 2713 3063 2714 3064 if (nullterm) 2715 ((char *) _data)[size] = 0;2716 2717 *data = _data;3065 ((char *) arg_data)[size] = 0; 3066 3067 *data = arg_data; 2718 3068 if (received != NULL) 2719 3069 *received = size; … … 2813 3163 } 2814 3164 3165 sess->iface = 0; 2815 3166 sess->mgmt = mgmt; 2816 3167 sess->phone = phone; … … 2862 3213 } 2863 3214 3215 sess->iface = 0; 2864 3216 sess->mgmt = mgmt; 2865 3217 sess->phone = phone; … … 2907 3259 return NULL; 2908 3260 3261 sess->iface = 0; 2909 3262 sess->mgmt = mgmt; 2910 3263 sess->phone = phone; … … 2934 3287 { 2935 3288 assert(callid); 2936 3289 2937 3290 ipc_call_t call; 2938 3291 *callid = async_get_call(&call); 2939 3292 2940 3293 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE) 2941 3294 return false; … … 2947 3300 if (arg3) 2948 3301 *arg3 = IPC_GET_ARG3(call); 2949 3302 2950 3303 return true; 2951 3304 }
Note:
See TracChangeset
for help on using the changeset viewer.