source: mainline/uspace/srv/bd/hr/fge.c@ 7bfe468

Last change on this file since 7bfe468 was b15e534, checked in by Miroslav Cimerman <mc@…>, 5 months ago

hr/fge.c: add some comments

  • Property mode set to 100644
File size: 11.5 KB
Line 
1/*
2 * Copyright (c) 2025 Miroslav Cimerman
3 * Copyright (c) 2024 Vojtech Horky
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * - Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * - Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * - The name of the author may not be used to endorse or promote products
16 * derived from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 */
29
30/** @addtogroup hr
31 * @{
32 */
33/**
34 * @file
35 * @brief Fibril group executor
36 *
37 * Fibril pool with pre-allocated storage allowing
38 * execution of groups consisting of multiple work
39 * units.
40 */
41
42#include <adt/bitmap.h>
43#include <adt/circ_buf.h>
44#include <assert.h>
45#include <errno.h>
46#include <fibril_synch.h>
47#include <stdatomic.h>
48#include <stdbool.h>
49#include <stdio.h>
50#include <stdlib.h>
51#include <types/common.h>
52
53#include "fge.h"
54
/* Opaque forward declarations for the internal types defined below. */
struct fge_fibril_data;
typedef struct fge_fibril_data fge_fibril_data_t;
struct wu_queue;
typedef struct wu_queue wu_queue_t;

/* Internal helpers; definitions follow the public API functions. */
static void *hr_fpool_make_storage(hr_fpool_t *, ssize_t *);
static void hr_fpool_group_epilogue(hr_fpool_t *);
static errno_t fge_fibril(void *);
static errno_t wu_queue_init(wu_queue_t *, size_t);
static void wu_queue_push(wu_queue_t *, fge_fibril_data_t *);
static void wu_queue_pop(wu_queue_t *, fge_fibril_data_t *);
static ssize_t hr_fpool_get_free_slot(hr_fpool_t *);
67
/** One work unit as it travels through the executor queue. */
typedef struct fge_fibril_data {
	hr_wu_t wu; /* user-provided work unit fcn pointer */
	void *arg; /* argument handed to wu() */
	hr_fgroup_t *group; /* owning group, receives completion accounting */
	ssize_t memslot; /* index to pool bitmap slot; -1 = group's own memory */
} fge_fibril_data_t;
74
/** Bounded queue of pending work units, backed by a circular buffer. */
typedef struct wu_queue {
	fibril_mutex_t lock; /* protects cbuf */
	fibril_condvar_t not_empty; /* signaled after a push */
	fibril_condvar_t not_full; /* signaled after a pop */
	fge_fibril_data_t *fexecs; /* backing array for cbuf */
	circ_buf_t cbuf;
} wu_queue_t;
82
/** Fibril pool with pre-allocated work unit storage. */
struct hr_fpool {
	fibril_mutex_t lock; /* protects bitmap, counters and stop flag */
	fibril_condvar_t all_wus_done; /* signaled when active_groups drops to 0 */
	bitmap_t bitmap; /* tracks which wu_storage slots are in use */
	wu_queue_t queue; /* pending work units */
	fid_t *fibrils; /* worker fibril ids */
	uint8_t *wu_storage; /* pre-allocated wu storage, max_wus * wu_size bytes */
	size_t fibril_cnt;
	size_t max_wus;
	size_t active_groups;
	bool stop; /* set by hr_fpool_destroy(); workers exit when queue drains */
	size_t wu_size; /* bytes per work unit storage slot */
	size_t wu_storage_free_count;
};
97
/** A group of related work units executed on a pool. */
struct hr_fgroup {
	hr_fpool_t *pool; /* pool this group runs on */
	size_t wu_cnt; /* total wu count */
	size_t submitted; /* wus handed to the queue so far */
	size_t reserved_cnt; /* no. of reserved wu storage slots */
	size_t reserved_avail; /* reserved slots not yet claimed by alloc */
	size_t *memslots; /* indices to pool bitmap */
	void *own_mem; /* fallback storage when pool slots ran out */
	size_t own_used; /* fallback slots handed out */
	errno_t final_errno; /* sticky error reported to the waiter */
	size_t finished_okay; /* wus that returned EOK */
	size_t finished_fail; /* wus that returned an error */
	fibril_mutex_t lock; /* protects all counters above */
	fibril_condvar_t all_done; /* signaled when all submitted wus finished */
};
113
114hr_fpool_t *hr_fpool_create(size_t fibril_cnt, size_t max_wus,
115 size_t wu_storage_size)
116{
117 /* TODO: allow wu_storage_size to be 0 (we want to save mem) */
118 assert(max_wus > 0 && wu_storage_size > 0);
119
120 void *bitmap_data = NULL;
121
122 hr_fpool_t *result = calloc(1, sizeof(hr_fpool_t));
123 if (result == NULL)
124 return NULL;
125
126 result->fibrils = malloc(sizeof(fid_t) * fibril_cnt);
127 if (result->fibrils == NULL)
128 goto bad;
129
130 result->wu_storage = malloc(wu_storage_size * max_wus);
131 if (result->wu_storage == NULL)
132 goto bad;
133
134 bitmap_data = calloc(1, bitmap_size(max_wus));
135 if (bitmap_data == NULL)
136 goto bad;
137 bitmap_initialize(&result->bitmap, max_wus, bitmap_data);
138
139 if (wu_queue_init(&result->queue, max_wus) != EOK)
140 goto bad;
141
142 fibril_mutex_initialize(&result->lock);
143 fibril_condvar_initialize(&result->all_wus_done);
144
145 result->max_wus = max_wus;
146 result->fibril_cnt = fibril_cnt;
147 result->wu_size = wu_storage_size;
148 result->wu_storage_free_count = max_wus;
149 result->stop = false;
150 result->active_groups = 0;
151
152 for (size_t i = 0; i < fibril_cnt; i++) {
153 result->fibrils[i] = fibril_create(fge_fibril, result);
154 fibril_start(result->fibrils[i]);
155 }
156
157 return result;
158bad:
159 if (result->queue.fexecs != NULL)
160 free(result->queue.fexecs);
161 if (bitmap_data != NULL)
162 free(bitmap_data);
163 if (result->wu_storage != NULL)
164 free(result->wu_storage);
165 if (result->fibrils != NULL)
166 free(result->fibrils);
167 free(result);
168
169 return NULL;
170}
171
172void hr_fpool_destroy(hr_fpool_t *pool)
173{
174 fibril_mutex_lock(&pool->lock);
175 pool->stop = true;
176 while (pool->active_groups > 0)
177 fibril_condvar_wait(&pool->all_wus_done, &pool->lock);
178
179 fibril_mutex_unlock(&pool->lock);
180
181 free(pool->bitmap.bits);
182 free(pool->queue.fexecs);
183 free(pool->wu_storage);
184 free(pool->fibrils);
185 free(pool);
186}
187
188hr_fgroup_t *hr_fgroup_create(hr_fpool_t *parent, size_t wu_cnt)
189{
190 assert(wu_cnt > 0);
191
192 /*
193 * XXX: we can also get rid of this malloc() call,
194 * somewhat...
195 *
196 * Have some fgroups also pre-allocated for maximum
197 * pre-allocation power :-)
198 */
199 hr_fgroup_t *result = malloc(sizeof(hr_fgroup_t));
200 if (result == NULL)
201 return NULL;
202
203 result->reserved_cnt = 0;
204 result->own_mem = NULL;
205 result->memslots = NULL;
206
207 fibril_mutex_lock(&parent->lock);
208
209 parent->active_groups++;
210
211 if (parent->wu_storage_free_count >= wu_cnt) {
212 parent->wu_storage_free_count -= wu_cnt;
213 result->reserved_cnt = wu_cnt;
214 } else {
215 /*
216 * Could be more conservative with memory here and
217 * allocate space only for one work unit and execute
218 * work units sequentially like it was first intended with
219 * the fallback storage.
220 */
221 size_t taking = parent->wu_storage_free_count;
222 result->own_mem = malloc(parent->wu_size * (wu_cnt - taking));
223 if (result->own_mem == NULL)
224 goto bad;
225 result->reserved_cnt = taking;
226 parent->wu_storage_free_count = 0;
227 }
228
229 if (result->reserved_cnt > 0) {
230 result->memslots =
231 malloc(sizeof(size_t) * result->reserved_cnt);
232 if (result->memslots == NULL)
233 goto bad;
234 }
235
236 fibril_mutex_unlock(&parent->lock);
237
238 result->pool = parent;
239 result->wu_cnt = wu_cnt;
240 result->submitted = 0;
241 result->reserved_avail = result->reserved_cnt;
242 result->own_used = 0;
243 result->final_errno = EOK;
244 result->finished_okay = 0;
245 result->finished_fail = 0;
246
247 fibril_mutex_initialize(&result->lock);
248 fibril_condvar_initialize(&result->all_done);
249
250 return result;
251
252bad:
253 parent->wu_storage_free_count += result->reserved_cnt;
254 fibril_mutex_unlock(&parent->lock);
255
256 if (result->memslots != NULL)
257 free(result->memslots);
258 if (result->own_mem != NULL)
259 free(result->own_mem);
260 free(result);
261
262 return NULL;
263}
264
265void *hr_fgroup_alloc(hr_fgroup_t *group)
266{
267 void *storage;
268
269 fibril_mutex_lock(&group->lock);
270
271 if (group->reserved_avail > 0) {
272 ssize_t memslot;
273 storage = hr_fpool_make_storage(group->pool, &memslot);
274 assert(storage != NULL);
275 group->reserved_avail--;
276 group->memslots[group->submitted] = memslot;
277 } else {
278 storage =
279 group->own_mem + group->pool->wu_size * group->own_used;
280 group->own_used++;
281 }
282
283 fibril_mutex_unlock(&group->lock);
284
285 return storage;
286}
287
288void hr_fgroup_submit(hr_fgroup_t *group, hr_wu_t wu, void *arg)
289{
290 fibril_mutex_lock(&group->lock);
291 assert(group->submitted < group->wu_cnt);
292
293 fge_fibril_data_t executor;
294 executor.wu = wu;
295 executor.arg = arg;
296 executor.group = group;
297
298 if (group->submitted < group->reserved_cnt)
299 executor.memslot = group->memslots[group->submitted];
300 else
301 executor.memslot = -1;
302
303 group->submitted++;
304 fibril_mutex_unlock(&group->lock);
305
306 wu_queue_push(&group->pool->queue, &executor);
307}
308
309errno_t hr_fgroup_wait(hr_fgroup_t *group, size_t *rokay, size_t *rfailed)
310{
311 fibril_mutex_lock(&group->lock);
312 assert(group->submitted <= group->wu_cnt);
313
314 while (true) {
315 size_t finished = group->finished_fail + group->finished_okay;
316 if (finished == group->submitted)
317 break;
318
319 fibril_condvar_wait(&group->all_done, &group->lock);
320 }
321
322 if (rokay)
323 *rokay = group->finished_okay;
324 if (rfailed)
325 *rfailed = group->finished_fail;
326
327 errno_t rc = group->final_errno;
328
329 fibril_mutex_unlock(&group->lock);
330
331 hr_fpool_group_epilogue(group->pool);
332
333 if (group->memslots != NULL)
334 free(group->memslots);
335 if (group->own_mem != NULL)
336 free(group->own_mem);
337 free(group);
338
339 return rc;
340}
341
342static void *hr_fpool_make_storage(hr_fpool_t *pool, ssize_t *rmemslot)
343{
344 fibril_mutex_lock(&pool->lock);
345 ssize_t memslot = hr_fpool_get_free_slot(pool);
346 assert(memslot != -1);
347
348 bitmap_set(&pool->bitmap, memslot, 1);
349
350 fibril_mutex_unlock(&pool->lock);
351
352 if (rmemslot)
353 *rmemslot = memslot;
354
355 return pool->wu_storage + pool->wu_size * memslot;
356}
357
358static void hr_fpool_group_epilogue(hr_fpool_t *pool)
359{
360 fibril_mutex_lock(&pool->lock);
361
362 pool->active_groups--;
363 if (pool->active_groups == 0)
364 fibril_condvar_signal(&pool->all_wus_done);
365
366 fibril_mutex_unlock(&pool->lock);
367}
368
369static errno_t fge_fibril(void *arg)
370{
371 hr_fpool_t *pool = arg;
372 while (true) {
373 fge_fibril_data_t executor;
374 fibril_mutex_lock(&pool->lock);
375
376 while (circ_buf_nused(&pool->queue.cbuf) == 0 && !pool->stop) {
377 fibril_condvar_wait(&pool->queue.not_empty,
378 &pool->lock);
379 }
380
381 if (pool->stop && circ_buf_nused(&pool->queue.cbuf) == 0) {
382 fibril_mutex_unlock(&pool->lock);
383 break;
384 }
385
386 wu_queue_pop(&pool->queue, &executor);
387
388 fibril_mutex_unlock(&pool->lock);
389
390 hr_fgroup_t *group = executor.group;
391
392 errno_t rc = executor.wu(executor.arg);
393
394 if (rc == EOK) {
395 fibril_mutex_lock(&group->lock);
396 group->finished_okay++;
397 fibril_mutex_unlock(&group->lock);
398 } else {
399 fibril_mutex_lock(&group->lock);
400 group->finished_fail++;
401 if (rc == ENOMEM)
402 group->final_errno = ENOMEM;
403 fibril_mutex_unlock(&group->lock);
404 }
405
406 fibril_mutex_lock(&pool->lock);
407 if (executor.memslot > -1) {
408 bitmap_set(&pool->bitmap, executor.memslot, 0);
409 pool->wu_storage_free_count++;
410 }
411
412 fibril_mutex_lock(&group->lock);
413 size_t finished = group->finished_fail + group->finished_okay;
414 if (finished == group->submitted)
415 fibril_condvar_signal(&group->all_done);
416 fibril_mutex_unlock(&group->lock);
417
418 fibril_mutex_unlock(&pool->lock);
419 }
420 return EOK;
421}
422
423static errno_t wu_queue_init(wu_queue_t *queue, size_t nmemb)
424{
425 queue->fexecs = malloc(sizeof(fge_fibril_data_t) * nmemb);
426 if (queue->fexecs == NULL)
427 return ENOMEM;
428
429 circ_buf_init(&queue->cbuf, queue->fexecs, nmemb,
430 sizeof(fge_fibril_data_t));
431
432 fibril_mutex_initialize(&queue->lock);
433 fibril_condvar_initialize(&queue->not_empty);
434 fibril_condvar_initialize(&queue->not_full);
435
436 return EOK;
437}
438
439static void wu_queue_push(wu_queue_t *queue, fge_fibril_data_t *executor)
440{
441 fibril_mutex_lock(&queue->lock);
442
443 while (circ_buf_push(&queue->cbuf, executor) == EAGAIN)
444 fibril_condvar_wait(&queue->not_full, &queue->lock);
445
446 fibril_condvar_signal(&queue->not_empty);
447
448 fibril_mutex_unlock(&queue->lock);
449}
450
451static void wu_queue_pop(wu_queue_t *queue, fge_fibril_data_t *executor)
452{
453 fibril_mutex_lock(&queue->lock);
454
455 while (circ_buf_pop(&queue->cbuf, executor) == EAGAIN)
456 fibril_condvar_wait(&queue->not_empty, &queue->lock);
457
458 fibril_condvar_signal(&queue->not_full);
459
460 fibril_mutex_unlock(&queue->lock);
461}
462
463static ssize_t hr_fpool_get_free_slot(hr_fpool_t *pool)
464{
465 bitmap_t *bitmap = &pool->bitmap;
466 for (size_t i = 0; i < pool->max_wus; i++)
467 if (!bitmap_get(bitmap, i))
468 return i;
469 return -1;
470}
471
472/** @}
473 */
Note: See TracBrowser for help on using the repository browser.