source: mainline/uspace/srv/bd/hr/fge.c

Last change on this file was 81b4c795, checked in by Miroslav Cimerman <mc@…>, 5 weeks ago

hr: rename malloc_waitok() to hr_malloc_waitok()

/*
 * Copyright (c) 2025 Miroslav Cimerman
 * Copyright (c) 2024 Vojtech Horky
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 * - The name of the author may not be used to endorse or promote products
 *   derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/** @addtogroup hr
 * @{
 */
/**
 * @file
 * @brief Fibril group executor
 *
 * Fibril pool with pre-allocated storage allowing
 * execution of groups consisting of multiple work
 * units.
 */
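
/*
 * Intended usage, as a minimal sketch. The work unit do_work()
 * below and the chosen pool sizes are illustrative only, not
 * part of this file:
 *
 *	static errno_t do_work(void *arg)
 *	{
 *		int *no = arg;
 *		printf("work unit %d\n", *no);
 *		return EOK;
 *	}
 *
 *	hr_fpool_t *pool = hr_fpool_create(4, 16, sizeof(int));
 *	hr_fgroup_t *group = hr_fgroup_create(pool, 2);
 *
 *	for (int i = 0; i < 2; i++) {
 *		int *arg = hr_fgroup_alloc(group);
 *		*arg = i;
 *		hr_fgroup_submit(group, do_work, arg);
 *	}
 *
 *	size_t okay, failed;
 *	errno_t rc = hr_fgroup_wait(group, &okay, &failed);
 *
 * The group frees itself inside hr_fgroup_wait(); only the pool
 * needs hr_fpool_destroy() once no more groups will be created.
 */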

#include <adt/bitmap.h>
#include <adt/circ_buf.h>
#include <assert.h>
#include <errno.h>
#include <fibril_synch.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <types/common.h>

#include "fge.h"
#include "util.h"

static void *hr_fpool_make_storage(hr_fpool_t *, ssize_t *);
static void hr_fpool_group_epilogue(hr_fpool_t *);
static errno_t fge_fibril(void *);
static errno_t wu_queue_init(wu_queue_t *, size_t);
static void wu_queue_push(wu_queue_t *, fge_fibril_data_t *);
static void wu_queue_pop(wu_queue_t *, fge_fibril_data_t *);
static ssize_t hr_fpool_get_free_slot(hr_fpool_t *);

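/** Create a fibril pool.
 *
 * @param fibril_cnt      Number of executor fibrils to spawn
 * @param max_wus         Number of pre-allocated work unit storage slots
 * @param wu_storage_size Size of one work unit's storage in bytes
 *
 * @return Pointer to the new pool or NULL on allocation failure.
 */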
hr_fpool_t *hr_fpool_create(size_t fibril_cnt, size_t max_wus,
    size_t wu_storage_size)
{
	assert(max_wus > 0 && wu_storage_size > 0);

	void *bitmap_data = NULL;

	hr_fpool_t *result = calloc(1, sizeof(hr_fpool_t));
	if (result == NULL)
		return NULL;

	result->fibrils = malloc(sizeof(fid_t) * fibril_cnt);
	if (result->fibrils == NULL)
		goto bad;

	result->wu_storage = malloc(wu_storage_size * max_wus);
	if (result->wu_storage == NULL)
		goto bad;

	bitmap_data = calloc(1, bitmap_size(max_wus));
	if (bitmap_data == NULL)
		goto bad;
	bitmap_initialize(&result->bitmap, max_wus, bitmap_data);

	if (wu_queue_init(&result->queue, max_wus) != EOK)
		goto bad;

	fibril_mutex_initialize(&result->lock);
	fibril_condvar_initialize(&result->all_wus_done);

	result->max_wus = max_wus;
	result->fibril_cnt = fibril_cnt;
	result->wu_size = wu_storage_size;
	result->wu_storage_free_count = max_wus;
	result->stop = false;
	result->active_groups = 0;

	for (size_t i = 0; i < fibril_cnt; i++) {
		result->fibrils[i] = fibril_create(fge_fibril, result);
		fibril_start(result->fibrils[i]);
		/* fibril_detach(result->fibrils[i]); */
	}

	return result;
bad:
	if (result->queue.fexecs != NULL)
		free(result->queue.fexecs);
	if (bitmap_data != NULL)
		free(bitmap_data);
	if (result->wu_storage != NULL)
		free(result->wu_storage);
	if (result->fibrils != NULL)
		free(result->fibrils);
	free(result);

	return NULL;
}

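/** Destroy a fibril pool.
 *
 * Tells the executor fibrils to stop, waits until no groups are
 * active and frees all pool resources.
 */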
void hr_fpool_destroy(hr_fpool_t *pool)
{
	fibril_mutex_lock(&pool->lock);
	pool->stop = true;
	while (pool->active_groups > 0)
		fibril_condvar_wait(&pool->all_wus_done, &pool->lock);

	fibril_mutex_unlock(&pool->lock);

	free(pool->bitmap.bits);
	free(pool->queue.fexecs);
	free(pool->wu_storage);
	free(pool->fibrils);
	free(pool);
}

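/** Create a work unit group on top of a pool.
 *
 * Reserves as many of the pool's pre-allocated storage slots as are
 * still free; storage for any remaining work units is allocated here
 * as group-owned memory.
 *
 * @param parent Pool the group executes on
 * @param wu_cnt Total number of work units the group will submit
 */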
hr_fgroup_t *hr_fgroup_create(hr_fpool_t *parent, size_t wu_cnt)
{
	assert(wu_cnt > 0);

	hr_fgroup_t *result = hr_malloc_waitok(sizeof(hr_fgroup_t));

	result->reserved_cnt = 0;
	result->own_mem = NULL;
	result->memslots = NULL;

	fibril_mutex_lock(&parent->lock);

	parent->active_groups++;

	if (parent->wu_storage_free_count >= wu_cnt) {
		parent->wu_storage_free_count -= wu_cnt;
		result->reserved_cnt = wu_cnt;
	} else {
		/*
		 * Could be more conservative with memory here and
		 * allocate space for only one work unit, executing
		 * work units sequentially as was first intended with
		 * the fallback storage.
		 */
		size_t taking = parent->wu_storage_free_count;
		result->own_mem =
		    hr_malloc_waitok(parent->wu_size * (wu_cnt - taking));
		result->reserved_cnt = taking;
		parent->wu_storage_free_count = 0;
	}

	if (result->reserved_cnt > 0) {
		result->memslots =
		    hr_malloc_waitok(sizeof(size_t) * result->reserved_cnt);
	}

	fibril_mutex_unlock(&parent->lock);

	result->pool = parent;
	result->wu_cnt = wu_cnt;
	result->submitted = 0;
	result->reserved_avail = result->reserved_cnt;
	result->own_used = 0;
	result->final_errno = EOK;
	result->finished_okay = 0;
	result->finished_fail = 0;

	fibril_mutex_initialize(&result->lock);
	fibril_condvar_initialize(&result->all_done);

	return result;
}

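/** Get storage for the next work unit.
 *
 * Hands out a reserved pre-allocated pool slot while any remain, then
 * falls back to the group's own memory. The slot index is recorded
 * under the current submit index, so each call must be paired with
 * hr_fgroup_submit() before the next allocation.
 */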
void *hr_fgroup_alloc(hr_fgroup_t *group)
{
	void *storage;

	fibril_mutex_lock(&group->lock);

	assert(group->submitted < group->wu_cnt);

	if (group->reserved_avail > 0) {
		ssize_t memslot;
		storage = hr_fpool_make_storage(group->pool, &memslot);
		assert(storage != NULL);
		group->reserved_avail--;
		group->memslots[group->submitted] = memslot;
	} else {
		assert(group->own_mem != NULL);
		storage =
		    group->own_mem + group->pool->wu_size * group->own_used;
		group->own_used++;
	}

	fibril_mutex_unlock(&group->lock);

	return storage;
}

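/** Submit a work unit for execution.
 *
 * @param group Group the work unit belongs to
 * @param wu    Work unit function
 * @param arg   Argument passed to the work unit, usually obtained
 *              from hr_fgroup_alloc()
 */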
void hr_fgroup_submit(hr_fgroup_t *group, hr_wu_t wu, void *arg)
{
	fibril_mutex_lock(&group->lock);
	assert(group->submitted < group->wu_cnt);

	fge_fibril_data_t executor;
	executor.wu = wu;
	executor.arg = arg;
	executor.group = group;

	if (group->submitted < group->reserved_cnt)
		executor.memslot = group->memslots[group->submitted];
	else
		executor.memslot = -1;

	group->submitted++;
	fibril_mutex_unlock(&group->lock);

	wu_queue_push(&group->pool->queue, &executor);
}

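/** Wait until all submitted work units have finished.
 *
 * Frees the group; it must not be used after this call returns.
 *
 * @param group   Group to wait for
 * @param rokay   If not NULL, set to the number of successful work units
 * @param rfailed If not NULL, set to the number of failed work units
 *
 * @return EAGAIN if any work unit failed with EAGAIN, EOK otherwise.
 */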
errno_t hr_fgroup_wait(hr_fgroup_t *group, size_t *rokay, size_t *rfailed)
{
	fibril_mutex_lock(&group->lock);
	assert(group->submitted <= group->wu_cnt);

	while (true) {
		size_t finished = group->finished_fail + group->finished_okay;
		if (finished == group->submitted)
			break;

		fibril_condvar_wait(&group->all_done, &group->lock);
	}

	if (rokay)
		*rokay = group->finished_okay;
	if (rfailed)
		*rfailed = group->finished_fail;

	errno_t rc = group->final_errno;

	fibril_mutex_unlock(&group->lock);

	hr_fpool_group_epilogue(group->pool);

	if (group->memslots != NULL)
		free(group->memslots);
	if (group->own_mem != NULL)
		free(group->own_mem);
	free(group);

	return rc;
}

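/** Claim a free pre-allocated storage slot from the pool. */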
static void *hr_fpool_make_storage(hr_fpool_t *pool, ssize_t *rmemslot)
{
	fibril_mutex_lock(&pool->lock);
	ssize_t memslot = hr_fpool_get_free_slot(pool);
	assert(memslot != -1);

	bitmap_set(&pool->bitmap, memslot, 1);

	fibril_mutex_unlock(&pool->lock);

	if (rmemslot)
		*rmemslot = memslot;

	return pool->wu_storage + pool->wu_size * memslot;
}

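/** Account for a finished group, waking hr_fpool_destroy() when the
 * last active group finishes.
 */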
static void hr_fpool_group_epilogue(hr_fpool_t *pool)
{
	fibril_mutex_lock(&pool->lock);

	pool->active_groups--;
	if (pool->active_groups == 0)
		fibril_condvar_signal(&pool->all_wus_done);

	fibril_mutex_unlock(&pool->lock);
}

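/** Executor fibril main loop.
 *
 * Pops work units from the shared queue, runs them and records the
 * results in their group, until the pool is stopped and drained.
 */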
static errno_t fge_fibril(void *arg)
{
	hr_fpool_t *pool = arg;
	while (true) {
		fge_fibril_data_t executor;
		fibril_mutex_lock(&pool->lock);

		while (circ_buf_nused(&pool->queue.cbuf) == 0 && !pool->stop) {
			fibril_condvar_wait(&pool->queue.not_empty,
			    &pool->lock);
		}

		if (pool->stop && circ_buf_nused(&pool->queue.cbuf) == 0) {
			fibril_mutex_unlock(&pool->lock);
			break;
		}

		wu_queue_pop(&pool->queue, &executor);

		fibril_mutex_unlock(&pool->lock);

		hr_fgroup_t *group = executor.group;

		errno_t rc = executor.wu(executor.arg);

		if (rc == EOK) {
			fibril_mutex_lock(&group->lock);
			group->finished_okay++;
			fibril_mutex_unlock(&group->lock);
		} else {
			fibril_mutex_lock(&group->lock);
			group->finished_fail++;
			if (rc == EAGAIN)
				group->final_errno = EAGAIN;
			fibril_mutex_unlock(&group->lock);
		}

		fibril_mutex_lock(&pool->lock);
		if (executor.memslot > -1) {
			bitmap_set(&pool->bitmap, executor.memslot, 0);
			pool->wu_storage_free_count++;
		}

		fibril_mutex_lock(&group->lock);
		size_t finished = group->finished_fail + group->finished_okay;
		if (finished == group->submitted)
			fibril_condvar_signal(&group->all_done);
		fibril_mutex_unlock(&group->lock);

		fibril_mutex_unlock(&pool->lock);
	}
	return EOK;
}

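/** Initialize the bounded work unit queue. */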
static errno_t wu_queue_init(wu_queue_t *queue, size_t nmemb)
{
	queue->fexecs = malloc(sizeof(fge_fibril_data_t) * nmemb);
	if (queue->fexecs == NULL)
		return ENOMEM;

	circ_buf_init(&queue->cbuf, queue->fexecs, nmemb,
	    sizeof(fge_fibril_data_t));

	fibril_mutex_initialize(&queue->lock);
	fibril_condvar_initialize(&queue->not_empty);
	fibril_condvar_initialize(&queue->not_full);

	return EOK;
}

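/** Push a work unit into the queue, blocking while the queue is full. */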
static void wu_queue_push(wu_queue_t *queue, fge_fibril_data_t *executor)
{
	fibril_mutex_lock(&queue->lock);

	while (circ_buf_push(&queue->cbuf, executor) == EAGAIN)
		fibril_condvar_wait(&queue->not_full, &queue->lock);

	fibril_condvar_signal(&queue->not_empty);

	fibril_mutex_unlock(&queue->lock);
}

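/** Pop a work unit from the queue, blocking while the queue is empty. */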
static void wu_queue_pop(wu_queue_t *queue, fge_fibril_data_t *executor)
{
	fibril_mutex_lock(&queue->lock);

	while (circ_buf_pop(&queue->cbuf, executor) == EAGAIN)
		fibril_condvar_wait(&queue->not_empty, &queue->lock);

	fibril_condvar_signal(&queue->not_full);

	fibril_mutex_unlock(&queue->lock);
}

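/** Return the index of a free storage slot, or -1 if all are taken. */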
static ssize_t hr_fpool_get_free_slot(hr_fpool_t *pool)
{
	bitmap_t *bitmap = &pool->bitmap;
	for (size_t i = 0; i < pool->max_wus; i++)
		if (!bitmap_get(bitmap, i))
			return i;
	return -1;
}

/** @}
 */