source: mainline/uspace/srv/bd/hr/fge.c@ 6d0fc11

Last change on this file since 6d0fc11 was 6d0fc11, checked in by Miroslav Cimerman <mc@…>, 3 months ago

hr: style: align structures, function prototypes

  • Property mode set to 100644
File size: 11.9 KB
Line 
1/*
2 * Copyright (c) 2025 Miroslav Cimerman
3 * Copyright (c) 2024 Vojtech Horky
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * - Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * - Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * - The name of the author may not be used to endorse or promote products
16 * derived from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 */
29
30/** @addtogroup hr
31 * @{
32 */
33/**
34 * @file
35 * @brief Fibril group executor
36 *
37 * Fibril pool with pre-allocated storage allowing
38 * execution of groups consisting of multiple work
39 * units.
40 */
41
42#include <adt/bitmap.h>
43#include <adt/circ_buf.h>
44#include <assert.h>
45#include <errno.h>
46#include <fibril_synch.h>
47#include <stdatomic.h>
48#include <stdbool.h>
49#include <stdio.h>
50#include <stdlib.h>
51#include <types/common.h>
52
53#include "fge.h"
54
55/* forward declarations */
56struct fge_fibril_data;
57typedef struct fge_fibril_data fge_fibril_data_t;
58struct wu_queue;
59typedef struct wu_queue wu_queue_t;
60
61static void *hr_fpool_make_storage(hr_fpool_t *, ssize_t *);
62static void hr_fpool_group_epilogue(hr_fpool_t *);
63static errno_t fge_fibril(void *);
64static errno_t wu_queue_init(wu_queue_t *, size_t);
65static void wu_queue_push(wu_queue_t *, fge_fibril_data_t *);
66static void wu_queue_pop(wu_queue_t *, fge_fibril_data_t *);
67static ssize_t hr_fpool_get_free_slot(hr_fpool_t *);
68
69struct fge_fibril_data {
70 hr_wu_t wu; /* work unit function pointer */
71 void *arg; /* work unit function argument */
72 hr_fgroup_t *group; /* back-pointer to group */
73 ssize_t memslot; /* index to pool bitmap slot */
74};
75
76struct wu_queue {
77 fibril_mutex_t lock;
78 fibril_condvar_t not_empty;
79 fibril_condvar_t not_full;
80 fge_fibril_data_t *fexecs; /* circ-buf memory */
81 circ_buf_t cbuf;
82};
83
84struct hr_fpool {
85 fibril_mutex_t lock;
86 bitmap_t bitmap; /* memory slot bitmap */
87 wu_queue_t queue;
88 fid_t *fibrils;
89 uint8_t *wu_storage; /* pre-allocated pool storage */
90 size_t fibril_cnt;
91 size_t max_wus;
92 size_t active_groups;
93 bool stop;
94 size_t wu_size;
95 size_t wu_storage_free_count;
96 fibril_condvar_t all_wus_done;
97};
98
99struct hr_fgroup {
100 hr_fpool_t *pool; /* back-pointer to pool */
101 size_t wu_cnt; /* upper bound of work units */
102 size_t submitted; /* number of submitted jobs */
103 size_t reserved_cnt; /* no. of reserved wu storage slots */
104 size_t reserved_avail;
105 size_t *memslots; /* indices to pool bitmap */
106 void *own_mem; /* own allocated memory */
107 size_t own_used; /* own memory slots used counter */
108 errno_t final_errno; /* agreggated errno */
109 size_t finished_okay; /* no. of wus that ended with EOK */
110 size_t finished_fail; /* no. of wus that ended with != EOK */
111 fibril_mutex_t lock;
112 fibril_condvar_t all_done;
113};
114
115hr_fpool_t *hr_fpool_create(size_t fibril_cnt, size_t max_wus,
116 size_t wu_storage_size)
117{
118 /* TODO: allow wu_storage_size to be 0 (we want to save mem) */
119 assert(max_wus > 0 && wu_storage_size > 0);
120
121 void *bitmap_data = NULL;
122
123 hr_fpool_t *result = calloc(1, sizeof(hr_fpool_t));
124 if (result == NULL)
125 return NULL;
126
127 result->fibrils = malloc(sizeof(fid_t) * fibril_cnt);
128 if (result->fibrils == NULL)
129 goto bad;
130
131 result->wu_storage = malloc(wu_storage_size * max_wus);
132 if (result->wu_storage == NULL)
133 goto bad;
134
135 bitmap_data = calloc(1, bitmap_size(max_wus));
136 if (bitmap_data == NULL)
137 goto bad;
138 bitmap_initialize(&result->bitmap, max_wus, bitmap_data);
139
140 if (wu_queue_init(&result->queue, max_wus) != EOK)
141 goto bad;
142
143 fibril_mutex_initialize(&result->lock);
144 fibril_condvar_initialize(&result->all_wus_done);
145
146 result->max_wus = max_wus;
147 result->fibril_cnt = fibril_cnt;
148 result->wu_size = wu_storage_size;
149 result->wu_storage_free_count = max_wus;
150 result->stop = false;
151 result->active_groups = 0;
152
153 for (size_t i = 0; i < fibril_cnt; i++) {
154 result->fibrils[i] = fibril_create(fge_fibril, result);
155 fibril_start(result->fibrils[i]);
156 }
157
158 return result;
159bad:
160 if (result->queue.fexecs != NULL)
161 free(result->queue.fexecs);
162 if (bitmap_data != NULL)
163 free(bitmap_data);
164 if (result->wu_storage != NULL)
165 free(result->wu_storage);
166 if (result->fibrils != NULL)
167 free(result->fibrils);
168 free(result);
169
170 return NULL;
171}
172
173void hr_fpool_destroy(hr_fpool_t *pool)
174{
175 fibril_mutex_lock(&pool->lock);
176 pool->stop = true;
177 while (pool->active_groups > 0)
178 fibril_condvar_wait(&pool->all_wus_done, &pool->lock);
179
180 fibril_mutex_unlock(&pool->lock);
181
182 free(pool->bitmap.bits);
183 free(pool->queue.fexecs);
184 free(pool->wu_storage);
185 free(pool->fibrils);
186 free(pool);
187}
188
189hr_fgroup_t *hr_fgroup_create(hr_fpool_t *parent, size_t wu_cnt)
190{
191 assert(wu_cnt > 0);
192
193 /*
194 * XXX: we can also get rid of this malloc() call,
195 * somewhat...
196 *
197 * Have some fgroups also pre-allocated for maximum
198 * pre-allocation power :-)
199 */
200 hr_fgroup_t *result = malloc(sizeof(hr_fgroup_t));
201 if (result == NULL)
202 return NULL;
203
204 result->reserved_cnt = 0;
205 result->own_mem = NULL;
206 result->memslots = NULL;
207
208 fibril_mutex_lock(&parent->lock);
209
210 parent->active_groups++;
211
212 if (parent->wu_storage_free_count >= wu_cnt) {
213 parent->wu_storage_free_count -= wu_cnt;
214 result->reserved_cnt = wu_cnt;
215 } else {
216 /*
217 * Could be more conservative with memory here and
218 * allocate space only for one work unit and execute
219 * work units sequentially like it was first intended with
220 * the fallback storage.
221 */
222 size_t taking = parent->wu_storage_free_count;
223 result->own_mem = malloc(parent->wu_size * (wu_cnt - taking));
224 if (result->own_mem == NULL)
225 goto bad;
226 result->reserved_cnt = taking;
227 parent->wu_storage_free_count = 0;
228 }
229
230 if (result->reserved_cnt > 0) {
231 result->memslots =
232 malloc(sizeof(size_t) * result->reserved_cnt);
233 if (result->memslots == NULL)
234 goto bad;
235 }
236
237 fibril_mutex_unlock(&parent->lock);
238
239 result->pool = parent;
240 result->wu_cnt = wu_cnt;
241 result->submitted = 0;
242 result->reserved_avail = result->reserved_cnt;
243 result->own_used = 0;
244 result->final_errno = EOK;
245 result->finished_okay = 0;
246 result->finished_fail = 0;
247
248 fibril_mutex_initialize(&result->lock);
249 fibril_condvar_initialize(&result->all_done);
250
251 return result;
252
253bad:
254 parent->wu_storage_free_count += result->reserved_cnt;
255 fibril_mutex_unlock(&parent->lock);
256
257 if (result->memslots != NULL)
258 free(result->memslots);
259 if (result->own_mem != NULL)
260 free(result->own_mem);
261 free(result);
262
263 return NULL;
264}
265
266void *hr_fgroup_alloc(hr_fgroup_t *group)
267{
268 void *storage;
269
270 fibril_mutex_lock(&group->lock);
271
272 if (group->reserved_avail > 0) {
273 ssize_t memslot;
274 storage = hr_fpool_make_storage(group->pool, &memslot);
275 assert(storage != NULL);
276 group->reserved_avail--;
277 group->memslots[group->submitted] = memslot;
278 } else {
279 storage =
280 group->own_mem + group->pool->wu_size * group->own_used;
281 group->own_used++;
282 }
283
284 fibril_mutex_unlock(&group->lock);
285
286 return storage;
287}
288
289void hr_fgroup_submit(hr_fgroup_t *group, hr_wu_t wu, void *arg)
290{
291 fibril_mutex_lock(&group->lock);
292 assert(group->submitted < group->wu_cnt);
293
294 fge_fibril_data_t executor;
295 executor.wu = wu;
296 executor.arg = arg;
297 executor.group = group;
298
299 if (group->submitted < group->reserved_cnt)
300 executor.memslot = group->memslots[group->submitted];
301 else
302 executor.memslot = -1;
303
304 group->submitted++;
305 fibril_mutex_unlock(&group->lock);
306
307 wu_queue_push(&group->pool->queue, &executor);
308}
309
310errno_t hr_fgroup_wait(hr_fgroup_t *group, size_t *rokay, size_t *rfailed)
311{
312 fibril_mutex_lock(&group->lock);
313 assert(group->submitted <= group->wu_cnt);
314
315 while (true) {
316 size_t finished = group->finished_fail + group->finished_okay;
317 if (finished == group->submitted)
318 break;
319
320 fibril_condvar_wait(&group->all_done, &group->lock);
321 }
322
323 if (rokay)
324 *rokay = group->finished_okay;
325 if (rfailed)
326 *rfailed = group->finished_fail;
327
328 errno_t rc = group->final_errno;
329
330 fibril_mutex_unlock(&group->lock);
331
332 hr_fpool_group_epilogue(group->pool);
333
334 if (group->memslots != NULL)
335 free(group->memslots);
336 if (group->own_mem != NULL)
337 free(group->own_mem);
338 free(group);
339
340 return rc;
341}
342
343static void *hr_fpool_make_storage(hr_fpool_t *pool, ssize_t *rmemslot)
344{
345 fibril_mutex_lock(&pool->lock);
346 ssize_t memslot = hr_fpool_get_free_slot(pool);
347 assert(memslot != -1);
348
349 bitmap_set(&pool->bitmap, memslot, 1);
350
351 fibril_mutex_unlock(&pool->lock);
352
353 if (rmemslot)
354 *rmemslot = memslot;
355
356 return pool->wu_storage + pool->wu_size * memslot;
357}
358
359static void hr_fpool_group_epilogue(hr_fpool_t *pool)
360{
361 fibril_mutex_lock(&pool->lock);
362
363 pool->active_groups--;
364 if (pool->active_groups == 0)
365 fibril_condvar_signal(&pool->all_wus_done);
366
367 fibril_mutex_unlock(&pool->lock);
368}
369
370static errno_t fge_fibril(void *arg)
371{
372 hr_fpool_t *pool = arg;
373 while (true) {
374 fge_fibril_data_t executor;
375 fibril_mutex_lock(&pool->lock);
376
377 while (circ_buf_nused(&pool->queue.cbuf) == 0 && !pool->stop) {
378 fibril_condvar_wait(&pool->queue.not_empty,
379 &pool->lock);
380 }
381
382 if (pool->stop && circ_buf_nused(&pool->queue.cbuf) == 0) {
383 fibril_mutex_unlock(&pool->lock);
384 break;
385 }
386
387 wu_queue_pop(&pool->queue, &executor);
388
389 fibril_mutex_unlock(&pool->lock);
390
391 hr_fgroup_t *group = executor.group;
392
393 errno_t rc = executor.wu(executor.arg);
394
395 if (rc == EOK) {
396 fibril_mutex_lock(&group->lock);
397 group->finished_okay++;
398 fibril_mutex_unlock(&group->lock);
399 } else {
400 fibril_mutex_lock(&group->lock);
401 group->finished_fail++;
402 if (rc == ENOMEM)
403 group->final_errno = ENOMEM;
404 fibril_mutex_unlock(&group->lock);
405 }
406
407 fibril_mutex_lock(&pool->lock);
408 if (executor.memslot > -1) {
409 bitmap_set(&pool->bitmap, executor.memslot, 0);
410 pool->wu_storage_free_count++;
411 }
412
413 fibril_mutex_lock(&group->lock);
414 size_t finished = group->finished_fail + group->finished_okay;
415 if (finished == group->submitted)
416 fibril_condvar_signal(&group->all_done);
417 fibril_mutex_unlock(&group->lock);
418
419 fibril_mutex_unlock(&pool->lock);
420 }
421 return EOK;
422}
423
424static errno_t wu_queue_init(wu_queue_t *queue, size_t nmemb)
425{
426 queue->fexecs = malloc(sizeof(fge_fibril_data_t) * nmemb);
427 if (queue->fexecs == NULL)
428 return ENOMEM;
429
430 circ_buf_init(&queue->cbuf, queue->fexecs, nmemb,
431 sizeof(fge_fibril_data_t));
432
433 fibril_mutex_initialize(&queue->lock);
434 fibril_condvar_initialize(&queue->not_empty);
435 fibril_condvar_initialize(&queue->not_full);
436
437 return EOK;
438}
439
440static void wu_queue_push(wu_queue_t *queue, fge_fibril_data_t *executor)
441{
442 fibril_mutex_lock(&queue->lock);
443
444 while (circ_buf_push(&queue->cbuf, executor) == EAGAIN)
445 fibril_condvar_wait(&queue->not_full, &queue->lock);
446
447 fibril_condvar_signal(&queue->not_empty);
448
449 fibril_mutex_unlock(&queue->lock);
450}
451
452static void wu_queue_pop(wu_queue_t *queue, fge_fibril_data_t *executor)
453{
454 fibril_mutex_lock(&queue->lock);
455
456 while (circ_buf_pop(&queue->cbuf, executor) == EAGAIN)
457 fibril_condvar_wait(&queue->not_empty, &queue->lock);
458
459 fibril_condvar_signal(&queue->not_full);
460
461 fibril_mutex_unlock(&queue->lock);
462}
463
464static ssize_t hr_fpool_get_free_slot(hr_fpool_t *pool)
465{
466 bitmap_t *bitmap = &pool->bitmap;
467 for (size_t i = 0; i < pool->max_wus; i++)
468 if (!bitmap_get(bitmap, i))
469 return i;
470 return -1;
471}
472
473/** @}
474 */
Note: See TracBrowser for help on using the repository browser.