source: mainline/uspace/srv/bd/hr/fge.c@83c8bb2

Last change on this file since 83c8bb2 was f725787, checked in by Miroslav Cimerman <mc@…>, 10 months ago

hr: fge: fibril group executor

This fibril pool allows execution of grouped work units,
providing pre-allocated storage for the workers.

Based on an idea from Vojtech Horky.

/*
 * Copyright (c) 2024 Miroslav Cimerman
 * Copyright (c) 2024 Vojtech Horky
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 * - The name of the author may not be used to endorse or promote products
 *   derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/** @addtogroup hr
 * @{
 */
/**
 * @file
 * @brief Fibril group executor
 *
 * A fibril pool with pre-allocated storage that allows
 * execution of groups consisting of multiple work units.
 */
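
/*
 * A minimal usage sketch (not part of this file): my_wu, my_state_t
 * and the counts below are hypothetical, and hr_wu_t is assumed to be
 * errno_t (*)(void *), matching how the worker fibril invokes it below.
 *
 *	static errno_t my_wu(void *arg)
 *	{
 *		my_state_t *s = arg;
 *		...
 *		return EOK;
 *	}
 *
 *	hr_fpool_t *pool = hr_fpool_create(4, 16, sizeof(my_state_t));
 *	hr_fgroup_t *group = hr_fgroup_create(pool, 2);
 *	for (size_t i = 0; i < 2; i++) {
 *		my_state_t *s = hr_fgroup_alloc(group);
 *		...
 *		hr_fgroup_submit(group, my_wu, s);
 *	}
 *	size_t failed;
 *	errno_t rc = hr_fgroup_wait(group, &failed);
 *	hr_fpool_destroy(pool);
 */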

#include <adt/bitmap.h>
#include <assert.h>
#include <errno.h>
#include <fibril_synch.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <types/common.h>

#include "fge.h"

struct fge_fibril_data;
typedef struct fge_fibril_data fge_fibril_data_t;
struct wu_queue;
typedef struct wu_queue wu_queue_t;

static void *hr_fpool_make_storage(hr_fpool_t *, ssize_t *);
static errno_t fge_fibril(void *);
static errno_t wu_queue_init(wu_queue_t *, size_t);
static void wu_queue_push(wu_queue_t *, fge_fibril_data_t);
static fge_fibril_data_t wu_queue_pop(wu_queue_t *);
static ssize_t hr_fpool_get_free_slot(hr_fpool_t *);

typedef struct fge_fibril_data {
	hr_wu_t wu; /* user-provided work unit function pointer */
	void *arg;
	hr_fgroup_t *group;
	ssize_t memslot; /* index into pool bitmap, -1 = group's own memory */
} fge_fibril_data_t;

typedef struct wu_queue {
	fibril_mutex_t lock;
	fibril_condvar_t not_empty;
	fibril_condvar_t not_full;
	fge_fibril_data_t *fexecs;
	size_t capacity;
	size_t size;
	size_t front;
	size_t back;
} wu_queue_t;
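
/*
 * The queue above is a fixed-capacity circular buffer with classic
 * bounded-buffer blocking: producers sleep on not_full when
 * size == capacity and consumers sleep on not_empty when size == 0.
 * front and back wrap modulo capacity, so e.g. with capacity == 4
 * the back index advances 0, 1, 2, 3, 0, ...
 */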

struct hr_fpool {
	fibril_mutex_t lock;
	fibril_condvar_t all_wus_done;
	bitmap_t bitmap; /* tracks used wu_storage slots */
	wu_queue_t queue;
	fid_t *fibrils;
	uint8_t *wu_storage; /* pre-allocated storage for max_wus work units */
	size_t fibril_cnt;
	size_t max_wus;
	size_t active_groups;
	bool stop;
	size_t wu_size;
	size_t wu_storage_free_count;
};

struct hr_fgroup {
	hr_fpool_t *pool;
	size_t wu_cnt; /* total wu count */
	size_t submitted;
	size_t reserved_cnt; /* no. of reserved wu storage slots */
	size_t reserved_avail;
	size_t *memslots; /* indices to pool bitmap */
	void *own_mem; /* fallback storage */
	size_t own_used;
	errno_t final_errno;
	atomic_size_t finished_okay;
	atomic_size_t finished_fail;
	atomic_size_t wus_started;
	fibril_mutex_t lock;
	fibril_condvar_t all_done;
};
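
/*
 * Work unit storage for a group comes from two tiers: up to
 * reserved_cnt slots are reserved in the pool's pre-allocated
 * wu_storage (their bitmap indices are remembered in memslots),
 * and any work units beyond that use the group's own_mem fallback
 * allocation made in hr_fgroup_create().
 */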

hr_fpool_t *hr_fpool_create(size_t fibril_cnt, size_t max_wus,
    size_t wu_storage_size)
{
	void *bitmap_data = NULL;

	hr_fpool_t *result = calloc(1, sizeof(hr_fpool_t));
	if (result == NULL)
		return NULL;

	result->fibrils = malloc(sizeof(fid_t) * fibril_cnt);
	if (result->fibrils == NULL)
		goto bad;

	result->wu_storage = malloc(wu_storage_size * max_wus);
	if (result->wu_storage == NULL)
		goto bad;

	bitmap_data = calloc(1, bitmap_size(max_wus));
	if (bitmap_data == NULL)
		goto bad;
	bitmap_initialize(&result->bitmap, max_wus, bitmap_data);

	if (wu_queue_init(&result->queue, max_wus) != EOK)
		goto bad;

	fibril_mutex_initialize(&result->lock);
	fibril_condvar_initialize(&result->all_wus_done);

	result->max_wus = max_wus;
	result->fibril_cnt = fibril_cnt;
	result->wu_size = wu_storage_size;
	result->wu_storage_free_count = max_wus;
	result->stop = false;
	result->active_groups = 0;

	for (size_t i = 0; i < fibril_cnt; i++) {
		result->fibrils[i] = fibril_create(fge_fibril, result);
		fibril_start(result->fibrils[i]);
	}

	return result;
bad:
	if (result->queue.fexecs)
		free(result->queue.fexecs);
	if (bitmap_data)
		free(bitmap_data);
	if (result->wu_storage)
		free(result->wu_storage);
	if (result->fibrils)
		free(result->fibrils);
	free(result);

	return NULL;
}

void hr_fpool_destroy(hr_fpool_t *pool)
{
	fibril_mutex_lock(&pool->lock);
	pool->stop = true;

	/* wake any idle workers so they can observe the stop flag */
	fibril_condvar_broadcast(&pool->queue.not_empty);

	while (pool->active_groups > 0)
		fibril_condvar_wait(&pool->all_wus_done, &pool->lock);

	fibril_mutex_unlock(&pool->lock);

	free(pool->bitmap.bits);
	free(pool->queue.fexecs);
	free(pool->wu_storage);
	free(pool->fibrils);
	free(pool);
}

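/*
 * Hand out one slot of the pool's pre-allocated work unit storage.
 * Callers only request slots they reserved in hr_fgroup_create(),
 * hence the assertion that a free slot must exist.
 */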
static void *hr_fpool_make_storage(hr_fpool_t *pool, ssize_t *rmemslot)
{
	fibril_mutex_lock(&pool->lock);
	ssize_t memslot = hr_fpool_get_free_slot(pool);
	assert(memslot != -1);

	bitmap_set(&pool->bitmap, memslot, 1);

	fibril_mutex_unlock(&pool->lock);

	if (rmemslot)
		*rmemslot = memslot;

	return pool->wu_storage + pool->wu_size * memslot;
}

hr_fgroup_t *hr_fgroup_create(hr_fpool_t *parent, size_t wu_cnt)
{
	hr_fgroup_t *result = malloc(sizeof(hr_fgroup_t));
	if (result == NULL)
		return NULL;

	result->reserved_cnt = 0;
	result->own_mem = NULL;
	result->memslots = NULL;

	fibril_mutex_lock(&parent->lock);

	parent->active_groups++;

	if (parent->wu_storage_free_count >= wu_cnt) {
		parent->wu_storage_free_count -= wu_cnt;
		result->reserved_cnt = wu_cnt;
	} else {
		/*
		 * We could be more conservative with memory here and
		 * allocate space for only one work unit, executing
		 * work units sequentially as was first intended with
		 * the fallback storage.
		 */
		size_t taking = parent->wu_storage_free_count;
		result->own_mem = malloc(parent->wu_size * (wu_cnt - taking));
		result->reserved_cnt = taking;
		parent->wu_storage_free_count = 0;
		if (result->own_mem == NULL)
			goto bad;
	}

	if (result->reserved_cnt > 0) {
		result->memslots =
		    malloc(sizeof(size_t) * result->reserved_cnt);
		if (result->memslots == NULL)
			goto bad;
	}

	fibril_mutex_unlock(&parent->lock);

	result->pool = parent;
	result->wu_cnt = wu_cnt;
	result->submitted = 0;
	result->reserved_avail = result->reserved_cnt;
	result->own_used = 0;
	result->final_errno = EOK;
	result->finished_okay = 0;
	result->finished_fail = 0;
	result->wus_started = 0;

	fibril_mutex_initialize(&result->lock);
	fibril_condvar_initialize(&result->all_done);

	return result;

bad:
	/* both goto sites above jump here with parent->lock still held */
	parent->wu_storage_free_count += result->reserved_cnt;
	parent->active_groups--;
	fibril_mutex_unlock(&parent->lock);

	if (result->memslots)
		free(result->memslots);
	if (result->own_mem)
		free(result->own_mem);
	free(result);

	return NULL;
}

void *hr_fgroup_alloc(hr_fgroup_t *group)
{
	void *storage;

	fibril_mutex_lock(&group->lock);

	if (group->reserved_avail > 0) {
		ssize_t memslot;
		storage = hr_fpool_make_storage(group->pool, &memslot);
		assert(storage != NULL);
		group->reserved_avail--;
		group->memslots[group->submitted] = memslot;
	} else {
		storage =
		    group->own_mem + group->pool->wu_size * group->own_used;
		group->own_used++;
	}

	fibril_mutex_unlock(&group->lock);

	return storage;
}

void hr_fgroup_submit(hr_fgroup_t *group, hr_wu_t wu, void *arg)
{
	fibril_mutex_lock(&group->lock);
	assert(group->submitted < group->wu_cnt);

	fge_fibril_data_t executor;
	executor.wu = wu;
	executor.arg = arg;
	executor.group = group;

	if (group->submitted < group->reserved_cnt)
		executor.memslot = group->memslots[group->submitted];
	else
		executor.memslot = -1;

	group->submitted++;
	fibril_mutex_unlock(&group->lock);

	wu_queue_push(&group->pool->queue, executor);
}
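
/*
 * Note that hr_fgroup_alloc() and hr_fgroup_submit() are meant to be
 * called in alternating pairs: the slot recorded by an allocation is
 * looked up here under the same submitted index, so each allocation
 * has to be submitted before the next allocation is made.
 */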

static void hr_fpool_group_epilogue(hr_fpool_t *pool)
{
	fibril_mutex_lock(&pool->lock);

	pool->active_groups--;
	if (pool->active_groups == 0)
		fibril_condvar_signal(&pool->all_wus_done);

	fibril_mutex_unlock(&pool->lock);
}

errno_t hr_fgroup_wait(hr_fgroup_t *group, size_t *rfailed)
{
	fibril_mutex_lock(&group->lock);
	while (true) {
		/*
		 * Compare against the number of submitted work units
		 * rather than the number of started ones, so that we
		 * do not return (and free the group) while some work
		 * units still sit unstarted in the queue.
		 */
		size_t finished = group->finished_fail + group->finished_okay;
		if (group->submitted > 0 && group->submitted == finished)
			break;

		fibril_condvar_wait(&group->all_done, &group->lock);
	}

	if (rfailed)
		*rfailed = group->finished_fail;

	errno_t rc = EOK;
	if (group->finished_okay != group->submitted)
		rc = EIO;

	fibril_mutex_unlock(&group->lock);

	hr_fpool_group_epilogue(group->pool);

	if (group->memslots)
		free(group->memslots);
	if (group->own_mem)
		free(group->own_mem);
	free(group);

	return rc;
}

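/*
 * Worker fibril main loop: sleep until the shared queue has work,
 * execute one work unit at a time while accounting its result, and
 * exit once the pool is stopping and the queue has been drained.
 */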
static errno_t fge_fibril(void *arg)
{
	hr_fpool_t *pool = arg;
	while (true) {
		fge_fibril_data_t executor;
		fibril_mutex_lock(&pool->lock);

		while (pool->queue.size == 0 && !pool->stop) {
			fibril_condvar_wait(&pool->queue.not_empty,
			    &pool->lock);
		}

		if (pool->stop && pool->queue.size == 0) {
			fibril_mutex_unlock(&pool->lock);
			break;
		}

		executor = wu_queue_pop(&pool->queue);

		fibril_mutex_unlock(&pool->lock);

		hr_fgroup_t *group = executor.group;

		atomic_fetch_add_explicit(&group->wus_started, 1,
		    memory_order_relaxed);

		errno_t rc = executor.wu(executor.arg);

		if (rc == EOK)
			atomic_fetch_add_explicit(&group->finished_okay, 1,
			    memory_order_relaxed);
		else
			atomic_fetch_add_explicit(&group->finished_fail, 1,
			    memory_order_relaxed);

		fibril_mutex_lock(&pool->lock);
		if (executor.memslot > -1) {
			bitmap_set(&pool->bitmap, executor.memslot, 0);
			pool->wu_storage_free_count++;
		}

		size_t group_total_done = group->finished_fail +
		    group->finished_okay;
		if (group->wus_started == group_total_done)
			fibril_condvar_signal(&group->all_done);

		fibril_mutex_unlock(&pool->lock);
	}
	return EOK;
}

static errno_t wu_queue_init(wu_queue_t *queue, size_t capacity)
{
	queue->fexecs = malloc(sizeof(fge_fibril_data_t) * capacity);
	if (queue->fexecs == NULL)
		return ENOMEM;

	queue->capacity = capacity;
	queue->size = 0;
	queue->front = 0;
	queue->back = 0;
	fibril_mutex_initialize(&queue->lock);
	fibril_condvar_initialize(&queue->not_empty);
	fibril_condvar_initialize(&queue->not_full);

	return EOK;
}

static void wu_queue_push(wu_queue_t *queue, fge_fibril_data_t executor)
{
	fibril_mutex_lock(&queue->lock);

	while (queue->size == queue->capacity)
		fibril_condvar_wait(&queue->not_full, &queue->lock);

	queue->fexecs[queue->back] = executor;
	queue->back = (queue->back + 1) % queue->capacity;
	queue->size++;

	fibril_condvar_signal(&queue->not_empty);

	fibril_mutex_unlock(&queue->lock);
}

static fge_fibril_data_t wu_queue_pop(wu_queue_t *queue)
{
	fibril_mutex_lock(&queue->lock);

	while (queue->size == 0)
		fibril_condvar_wait(&queue->not_empty, &queue->lock);

	fge_fibril_data_t wu = queue->fexecs[queue->front];
	queue->front = (queue->front + 1) % queue->capacity;
	queue->size--;

	fibril_condvar_signal(&queue->not_full);

	fibril_mutex_unlock(&queue->lock);
	return wu;
}

static ssize_t hr_fpool_get_free_slot(hr_fpool_t *pool)
{
	bitmap_t *bitmap = &pool->bitmap;
	for (size_t i = 0; i < pool->max_wus; i++)
		if (!bitmap_get(bitmap, i))
			return i;
	return -1;
}

/** @}
 */