Fork us on GitHub Follow us on Facebook Follow us on Twitter

Changeset 3b1cc8d in mainline


Ignore:
Timestamp:
2018-06-14T19:19:51Z (3 years ago)
Author:
GitHub <noreply@…>
Branches:
master
Children:
e6bab27b
Parents:
587478b
git-author:
Jiří Zárevúcky <zarevucky.jiri@…> (2018-06-14 19:19:51)
git-committer:
GitHub <noreply@…> (2018-06-14 19:19:51)
Message:

Add notification queue to the async framework. (#40)

Instead of running notification handlers in the same fibril that received it,
forcing us to allocate a new fibril when the handler blocks, we instead queue
the notifications, and allow an arbitrary but fixed number of dedicated fibrils
handle them.

Although a service can increase the number of handler fibrils to reduce latency,
there are now no dynamic allocations due to received notifications.
When the same notification is received again while the older instance is still
in queue, the new notification overwrites the old and increments a counter
of received notifications.

The counter is currently unused, because passing it to the handler would
require extensive changes to user code, but it should be straightforward
to make use of it should the need arise.

Location:
uspace/lib/c
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async/server.c

    r587478b r3b1cc8d  
    181181/* Notification data */
    182182typedef struct {
    183         ht_link_t link;
     183        /** notification_hash_table link */
     184        ht_link_t htlink;
     185
     186        /** notification_queue link */
     187        link_t qlink;
    184188
    185189        /** Notification method */
     
    189193        async_notification_handler_t handler;
    190194
    191         /** Notification data */
    192         void *data;
     195        /** Notification handler argument */
     196        void *arg;
     197
     198        /** Data of the most recent notification. */
     199        ipc_call_t calldata;
     200
     201        /**
     202         * How many notifications with this `imethod` arrived since it was last
     203         * handled. If `count` > 1, `calldata` only holds the data for the most
     204         * recent such notification, all the older data being lost.
     205         *
     206         * `async_spawn_notification_handler()` can be used to increase the
     207         * number of notifications that can be processed simultaneously,
     208         * reducing the likelihood of losing them when the handler blocks.
     209         */
     210        long count;
    193211} notification_t;
    194212
     
    224242static hash_table_t client_hash_table;
    225243static hash_table_t conn_hash_table;
     244
     245// TODO: lockfree notification_queue?
     246static futex_t notification_futex = FUTEX_INITIALIZER;
    226247static hash_table_t notification_hash_table;
     248static LIST_INITIALIZE(notification_queue);
     249static FIBRIL_SEMAPHORE_INITIALIZE(notification_semaphore, 0);
     250
    227251static LIST_INITIALIZE(timeout_list);
    228252
     
    556580{
    557581        notification_t *notification =
    558             hash_table_get_inst(item, notification_t, link);
     582            hash_table_get_inst(item, notification_t, htlink);
    559583        return notification_key_hash(&notification->imethod);
    560584}
     
    564588        sysarg_t id = *(sysarg_t *) key;
    565589        notification_t *notification =
    566             hash_table_get_inst(item, notification_t, link);
     590            hash_table_get_inst(item, notification_t, htlink);
    567591        return id == notification->imethod;
    568592}
     
    663687}
    664688
    665 /** Process notification.
     689/** Function implementing the notification handler fibril. Never returns. */
     690static errno_t notification_fibril_func(void *arg)
     691{
     692        (void) arg;
     693
     694        while (true) {
     695                fibril_semaphore_down(&notification_semaphore);
     696
     697                futex_lock(&notification_futex);
     698
     699                /*
     700                 * The semaphore ensures that if we get this far,
     701                 * the queue must be non-empty.
     702                 */
     703                assert(!list_empty(&notification_queue));
     704
     705                notification_t *notification = list_get_instance(
     706                    list_first(&notification_queue), notification_t, qlink);
     707                list_remove(&notification->qlink);
     708
     709                async_notification_handler_t handler = notification->handler;
     710                void *arg = notification->arg;
     711                ipc_call_t calldata = notification->calldata;
     712                long count = notification->count;
     713
     714                notification->count = 0;
     715
     716                futex_unlock(&notification_futex);
     717
     718                // FIXME: Pass count to the handler. It might be important.
     719                (void) count;
     720
     721                if (handler)
     722                        handler(&calldata, arg);
     723        }
     724
     725        /* Not reached. */
     726        return EOK;
     727}
     728
     729/**
     730 * Creates a new dedicated fibril for handling notifications.
     731 * By default, there is one such fibril. This function can be used to
     732 * create more in order to increase the number of notification that can
     733 * be processed concurrently.
     734 *
     735 * Currently, there is no way to destroy those fibrils after they are created.
     736 */
     737errno_t async_spawn_notification_handler(void)
     738{
     739        fid_t f = fibril_create(notification_fibril_func, NULL);
     740        if (f == 0)
     741                return ENOMEM;
     742
     743        fibril_add_ready(f);
     744        return EOK;
     745}
     746
     747/** Queue notification.
    666748 *
    667749 * @param call   Data of the incoming call.
    668750 *
    669751 */
    670 static void process_notification(ipc_call_t *call)
    671 {
    672         async_notification_handler_t handler = NULL;
    673         void *data = NULL;
    674 
     752static void queue_notification(ipc_call_t *call)
     753{
    675754        assert(call);
    676755
    677         futex_down(&async_futex);
     756        futex_lock(&notification_futex);
    678757
    679758        ht_link_t *link = hash_table_find(&notification_hash_table,
    680759            &IPC_GET_IMETHOD(*call));
    681         if (link) {
    682                 notification_t *notification =
    683                     hash_table_get_inst(link, notification_t, link);
    684                 handler = notification->handler;
    685                 data = notification->data;
    686         }
    687 
    688         futex_up(&async_futex);
    689 
    690         if (handler)
    691                 handler(call, data);
     760        if (!link) {
     761                /* Invalid notification. */
     762                // TODO: Make sure this can't happen and turn it into assert.
     763                futex_unlock(&notification_futex);
     764                return;
     765        }
     766
     767        notification_t *notification =
     768            hash_table_get_inst(link, notification_t, htlink);
     769
     770        notification->count++;
     771        notification->calldata = *call;
     772
     773        if (link_in_use(&notification->qlink)) {
     774                /* Notification already queued. */
     775                futex_unlock(&notification_futex);
     776                return;
     777        }
     778
     779        list_append(&notification->qlink, &notification_queue);
     780        futex_unlock(&notification_futex);
     781
     782        fibril_semaphore_up(&notification_semaphore);
     783}
     784
     785/**
     786 * Creates a new notification structure and inserts it into the hash table.
     787 *
     788 * @param handler  Function to call when notification is received.
     789 * @param arg      Argument for the handler function.
     790 * @return         The newly created notification structure.
     791 */
     792static notification_t *notification_create(async_notification_handler_t handler, void *arg)
     793{
     794        notification_t *notification = calloc(1, sizeof(notification_t));
     795        if (!notification)
     796                return NULL;
     797
     798        notification->handler = handler;
     799        notification->arg = arg;
     800
     801        fid_t fib = 0;
     802
     803        futex_lock(&notification_futex);
     804
     805        if (notification_avail == 0) {
     806                /* Attempt to create the first handler fibril. */
     807                fib = fibril_create(notification_fibril_func, NULL);
     808                if (fib == 0) {
     809                        futex_unlock(&notification_futex);
     810                        free(notification);
     811                        return NULL;
     812                }
     813        }
     814
     815        sysarg_t imethod = notification_avail;
     816        notification_avail++;
     817
     818        notification->imethod = imethod;
     819        hash_table_insert(&notification_hash_table, &notification->htlink);
     820
     821        futex_unlock(&notification_futex);
     822
     823        if (imethod == 0) {
     824                assert(fib);
     825                fibril_add_ready(fib);
     826        }
     827
     828        return notification;
    692829}
    693830
     
    707844    void *data, const irq_code_t *ucode, cap_irq_handle_t *handle)
    708845{
    709         notification_t *notification =
    710             (notification_t *) malloc(sizeof(notification_t));
     846        notification_t *notification = notification_create(handler, data);
    711847        if (!notification)
    712848                return ENOMEM;
    713849
    714         futex_down(&async_futex);
    715 
    716         sysarg_t imethod = notification_avail;
    717         notification_avail++;
    718 
    719         notification->imethod = imethod;
    720         notification->handler = handler;
    721         notification->data = data;
    722 
    723         hash_table_insert(&notification_hash_table, &notification->link);
    724 
    725         futex_up(&async_futex);
    726 
    727850        cap_irq_handle_t ihandle;
    728         errno_t rc = ipc_irq_subscribe(inr, imethod, ucode, &ihandle);
     851        errno_t rc = ipc_irq_subscribe(inr, notification->imethod, ucode,
     852            &ihandle);
    729853        if (rc == EOK && handle != NULL) {
    730854                *handle = ihandle;
     
    760884    async_notification_handler_t handler, void *data)
    761885{
    762         notification_t *notification =
    763             (notification_t *) malloc(sizeof(notification_t));
     886        notification_t *notification = notification_create(handler, data);
    764887        if (!notification)
    765888                return ENOMEM;
    766889
    767         futex_down(&async_futex);
    768 
    769         sysarg_t imethod = notification_avail;
    770         notification_avail++;
    771 
    772         notification->imethod = imethod;
    773         notification->handler = handler;
    774         notification->data = data;
    775 
    776         hash_table_insert(&notification_hash_table, &notification->link);
    777 
    778         futex_up(&async_futex);
    779 
    780         return ipc_event_subscribe(evno, imethod);
     890        return ipc_event_subscribe(evno, notification->imethod);
    781891}
    782892
     
    793903    async_notification_handler_t handler, void *data)
    794904{
    795         notification_t *notification =
    796             (notification_t *) malloc(sizeof(notification_t));
     905        notification_t *notification = notification_create(handler, data);
    797906        if (!notification)
    798907                return ENOMEM;
    799908
    800         futex_down(&async_futex);
    801 
    802         sysarg_t imethod = notification_avail;
    803         notification_avail++;
    804 
    805         notification->imethod = imethod;
    806         notification->handler = handler;
    807         notification->data = data;
    808 
    809         hash_table_insert(&notification_hash_table, &notification->link);
    810 
    811         futex_up(&async_futex);
    812 
    813         return ipc_event_task_subscribe(evno, imethod);
     909        return ipc_event_task_subscribe(evno, notification->imethod);
    814910}
    815911
     
    9731069        /* Kernel notification */
    9741070        if ((chandle == CAP_NIL) && (call->flags & IPC_CALL_NOTIF)) {
    975                 fibril_t *fibril = (fibril_t *) __tcb_get()->fibril_data;
    976                 unsigned oldsw = fibril->switches;
    977 
    978                 process_notification(call);
    979 
    980                 if (oldsw != fibril->switches) {
    981                         /*
    982                          * The notification handler did not execute atomically
    983                          * and so the current manager fibril assumed the role of
    984                          * a notification fibril. While waiting for its
    985                          * resources, it switched to another manager fibril that
    986                          * had already existed or it created a new one. We
    987                          * therefore know there is at least yet another
    988                          * manager fibril that can take over. We now kill the
    989                          * current 'notification' fibril to prevent fibril
    990                          * population explosion.
    991                          */
    992                         futex_down(&async_futex);
    993                         fibril_switch(FIBRIL_FROM_DEAD);
    994                 }
    995 
     1071                queue_notification(call);
    9961072                return;
    9971073        }
  • uspace/lib/c/generic/fibril.c

    r587478b r3b1cc8d  
    116116        fibril->waits_for = NULL;
    117117
    118         fibril->switches = 0;
    119 
    120118        /*
    121119         * We are called before __tcb_set(), so we need to use
     
    206204                break;
    207205        case FIBRIL_TO_MANAGER:
    208                 srcf->switches++;
    209206                /*
    210207                 * Don't put the current fibril into any list, it should
  • uspace/lib/c/include/async.h

    r587478b r3b1cc8d  
    492492    sysarg_t, sysarg_t, sysarg_t);
    493493
     494errno_t async_spawn_notification_handler(void);
     495
    494496#endif
    495497
  • uspace/lib/c/include/fibril.h

    r587478b r3b1cc8d  
    7272
    7373        fibril_owner_info_t *waits_for;
    74 
    75         unsigned int switches;
    7674} fibril_t;
    7775
Note: See TracChangeset for help on using the changeset viewer.