source: mainline/uspace/srv/bd/hr/fge.c@ 95158dac

Last change on this file since 95158dac was 95158dac, checked in by Miroslav Cimerman <mc@…>, 6 months ago

hr: fge: can ask for wus that finished with EOK

/*
 * Copyright (c) 2024 Miroslav Cimerman
 * Copyright (c) 2024 Vojtech Horky
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright
 *   notice, this list of conditions and the following disclaimer in the
 *   documentation and/or other materials provided with the distribution.
 * - The name of the author may not be used to endorse or promote products
 *   derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/** @addtogroup hr
 * @{
 */
/**
 * @file
 * @brief Fibril group executor
 *
 * Fibril pool with pre-allocated storage allowing
 * execution of groups consisting of multiple work
 * units.
 */
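
/*
 * Intended usage, as a minimal sketch of a hypothetical caller.
 * hr_wu_t is assumed to be errno_t (*)(void *), matching the
 * invocation in fge_fibril() below; my_wu_t, my_wu_fcn() and
 * prepare() are illustrative names, not part of this API.
 *
 *	static errno_t my_wu_fcn(void *arg)
 *	{
 *		my_wu_t *w = arg;	// storage from hr_fgroup_alloc()
 *		...			// do the actual work
 *		return EOK;		// EOK counts toward rokay below
 *	}
 *
 *	hr_fpool_t *pool = hr_fpool_create(4, 16, sizeof(my_wu_t));
 *	hr_fgroup_t *group = hr_fgroup_create(pool, wu_cnt);
 *	for (size_t i = 0; i < wu_cnt; i++) {
 *		my_wu_t *w = hr_fgroup_alloc(group);
 *		prepare(w, i);
 *		hr_fgroup_submit(group, my_wu_fcn, w);
 *	}
 *	size_t okay, failed;
 *	errno_t rc = hr_fgroup_wait(group, &okay, &failed);
 *
 * hr_fgroup_wait() frees the group, so each group is waited on
 * exactly once; the pool can be reused for further groups and is
 * torn down with hr_fpool_destroy(pool).
 */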

#include <adt/bitmap.h>
#include <assert.h>
#include <errno.h>
#include <fibril_synch.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <types/common.h>

#include "fge.h"

struct fge_fibril_data;
typedef struct fge_fibril_data fge_fibril_data_t;
struct wu_queue;
typedef struct wu_queue wu_queue_t;

static void *hr_fpool_make_storage(hr_fpool_t *, ssize_t *);
static errno_t fge_fibril(void *);
static errno_t wu_queue_init(wu_queue_t *, size_t);
static void wu_queue_push(wu_queue_t *, fge_fibril_data_t);
static fge_fibril_data_t wu_queue_pop(wu_queue_t *);
static ssize_t hr_fpool_get_free_slot(hr_fpool_t *);

typedef struct fge_fibril_data {
	hr_wu_t wu;		/* user-provided work unit fcn pointer */
	void *arg;		/* argument passed to the work unit */
	hr_fgroup_t *group;	/* group the work unit belongs to */
	ssize_t memslot;	/* index to pool bitmap slot, -1 if none */
} fge_fibril_data_t;

typedef struct wu_queue {
	fibril_mutex_t lock;
	fibril_condvar_t not_empty;
	fibril_condvar_t not_full;
	fge_fibril_data_t *fexecs;	/* circular buffer */
	size_t capacity;
	size_t size;
	size_t front;
	size_t back;
} wu_queue_t;

struct hr_fpool {
	fibril_mutex_t lock;
	fibril_condvar_t all_wus_done;
	bitmap_t bitmap;	/* free/used pre-allocated storage slots */
	wu_queue_t queue;
	fid_t *fibrils;
	uint8_t *wu_storage;	/* pre-allocated work unit storage */
	size_t fibril_cnt;
	size_t max_wus;
	size_t active_groups;
	bool stop;		/* set by hr_fpool_destroy() */
	size_t wu_size;
	size_t wu_storage_free_count;
};

struct hr_fgroup {
	hr_fpool_t *pool;
	size_t wu_cnt;		/* total wu count */
	size_t submitted;
	size_t reserved_cnt;	/* no. of reserved wu storage slots */
	size_t reserved_avail;	/* reserved slots not yet handed out */
	size_t *memslots;	/* indices to pool bitmap */
	void *own_mem;		/* fallback storage beyond the reserved slots */
	size_t own_used;	/* fallback slots handed out so far */
	errno_t final_errno;
	atomic_size_t finished_okay;	/* work units that returned EOK */
	atomic_size_t finished_fail;	/* work units that failed */
	atomic_size_t wus_started;
	fibril_mutex_t lock;
	fibril_condvar_t all_done;
};

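/**
 * Create a fibril pool.
 *
 * Spawns @a fibril_cnt worker fibrils and pre-allocates @a max_wus
 * work unit storage slots of @a wu_storage_size bytes each.
 *
 * @return Pointer to the new pool, or NULL on allocation failure.
 */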
hr_fpool_t *hr_fpool_create(size_t fibril_cnt, size_t max_wus,
    size_t wu_storage_size)
{
	void *bitmap_data = NULL;

	hr_fpool_t *result = calloc(1, sizeof(hr_fpool_t));
	if (result == NULL)
		return NULL;

	result->fibrils = malloc(sizeof(fid_t) * fibril_cnt);
	if (result->fibrils == NULL)
		goto bad;

	result->wu_storage = malloc(wu_storage_size * max_wus);
	if (result->wu_storage == NULL)
		goto bad;

	bitmap_data = calloc(1, bitmap_size(max_wus));
	if (bitmap_data == NULL)
		goto bad;
	bitmap_initialize(&result->bitmap, max_wus, bitmap_data);

	if (wu_queue_init(&result->queue, max_wus) != EOK)
		goto bad;

	fibril_mutex_initialize(&result->lock);
	fibril_condvar_initialize(&result->all_wus_done);

	result->max_wus = max_wus;
	result->fibril_cnt = fibril_cnt;
	result->wu_size = wu_storage_size;
	result->wu_storage_free_count = max_wus;
	result->stop = false;
	result->active_groups = 0;

	for (size_t i = 0; i < fibril_cnt; i++) {
		result->fibrils[i] = fibril_create(fge_fibril, result);
		fibril_start(result->fibrils[i]);
	}

	return result;
bad:
	free(result->queue.fexecs);
	free(bitmap_data);
	free(result->wu_storage);
	free(result->fibrils);
	free(result);

	return NULL;
}

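/**
 * Destroy a fibril pool.
 *
 * Blocks until all active groups have finished,
 * then frees the pool's resources.
 */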
void hr_fpool_destroy(hr_fpool_t *pool)
{
	fibril_mutex_lock(&pool->lock);
	pool->stop = true;
	while (pool->active_groups > 0)
		fibril_condvar_wait(&pool->all_wus_done, &pool->lock);

	fibril_mutex_unlock(&pool->lock);

	free(pool->bitmap.bits);
	free(pool->queue.fexecs);
	free(pool->wu_storage);
	free(pool->fibrils);
	free(pool);
}

static void *hr_fpool_make_storage(hr_fpool_t *pool, ssize_t *rmemslot)
{
	fibril_mutex_lock(&pool->lock);
	ssize_t memslot = hr_fpool_get_free_slot(pool);
	assert(memslot != -1);

	bitmap_set(&pool->bitmap, memslot, 1);

	fibril_mutex_unlock(&pool->lock);

	if (rmemslot)
		*rmemslot = memslot;

	return pool->wu_storage + pool->wu_size * memslot;
}

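/**
 * Create a work unit group on top of a pool.
 *
 * Tries to reserve @a wu_cnt pre-allocated pool storage slots; any
 * shortfall is covered by the group's own fallback allocation.
 *
 * @return Pointer to the new group, or NULL on allocation failure.
 */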
hr_fgroup_t *hr_fgroup_create(hr_fpool_t *parent, size_t wu_cnt)
{
	hr_fgroup_t *result = malloc(sizeof(hr_fgroup_t));
	if (result == NULL)
		return NULL;

	result->reserved_cnt = 0;
	result->own_mem = NULL;
	result->memslots = NULL;

	fibril_mutex_lock(&parent->lock);

	parent->active_groups++;

	if (parent->wu_storage_free_count >= wu_cnt) {
		parent->wu_storage_free_count -= wu_cnt;
		result->reserved_cnt = wu_cnt;
	} else {
		/*
		 * Could be more conservative with memory here and
		 * allocate space only for one work unit and execute
		 * work units sequentially, as was first intended with
		 * the fallback storage.
		 */
		size_t taking = parent->wu_storage_free_count;
		result->own_mem = malloc(parent->wu_size * (wu_cnt - taking));
		result->reserved_cnt = taking;
		parent->wu_storage_free_count = 0;
		if (result->own_mem == NULL)
			goto bad;
	}

	if (result->reserved_cnt > 0) {
		result->memslots =
		    malloc(sizeof(size_t) * result->reserved_cnt);
		if (result->memslots == NULL)
			goto bad;
	}

	fibril_mutex_unlock(&parent->lock);

	result->pool = parent;
	result->wu_cnt = wu_cnt;
	result->submitted = 0;
	result->reserved_avail = result->reserved_cnt;
	result->own_used = 0;
	result->final_errno = EOK;
	result->finished_okay = 0;
	result->finished_fail = 0;
	result->wus_started = 0;

	fibril_mutex_initialize(&result->lock);
	fibril_condvar_initialize(&result->all_done);

	return result;

bad:
	/*
	 * Both failure paths jump here with parent->lock still held,
	 * so it must not be reacquired; roll back the group accounting
	 * before unlocking.
	 */
	parent->wu_storage_free_count += result->reserved_cnt;
	parent->active_groups--;
	fibril_mutex_unlock(&parent->lock);

	free(result->memslots);
	free(result->own_mem);
	free(result);

	return NULL;
}

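/**
 * Get storage for one work unit.
 *
 * Hands out a reserved pool slot while any remain, then falls back
 * to the group's own memory. Each call is expected to be paired with
 * a following hr_fgroup_submit(), which consumes the slot recorded
 * at the current submission index.
 */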
void *hr_fgroup_alloc(hr_fgroup_t *group)
{
	void *storage;

	fibril_mutex_lock(&group->lock);

	if (group->reserved_avail > 0) {
		ssize_t memslot;
		storage = hr_fpool_make_storage(group->pool, &memslot);
		assert(storage != NULL);
		group->reserved_avail--;
		group->memslots[group->submitted] = memslot;
	} else {
		/* fall back to the group's own memory */
		storage = (uint8_t *)group->own_mem +
		    group->pool->wu_size * group->own_used;
		group->own_used++;
	}

	fibril_mutex_unlock(&group->lock);

	return storage;
}

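/**
 * Submit a work unit to the group.
 *
 * Queues @a wu with argument @a arg for execution by the pool's
 * worker fibrils, blocking while the work unit queue is full.
 */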
void hr_fgroup_submit(hr_fgroup_t *group, hr_wu_t wu, void *arg)
{
	fibril_mutex_lock(&group->lock);
	assert(group->submitted < group->wu_cnt);

	fge_fibril_data_t executor;
	executor.wu = wu;
	executor.arg = arg;
	executor.group = group;

	if (group->submitted < group->reserved_cnt)
		executor.memslot = group->memslots[group->submitted];
	else
		executor.memslot = -1;	/* work unit lives in the group's own memory */

	group->submitted++;
	fibril_mutex_unlock(&group->lock);

	wu_queue_push(&group->pool->queue, executor);
}

static void hr_fpool_group_epilogue(hr_fpool_t *pool)
{
	fibril_mutex_lock(&pool->lock);

	pool->active_groups--;
	if (pool->active_groups == 0)
		fibril_condvar_signal(&pool->all_wus_done);

	fibril_mutex_unlock(&pool->lock);
}

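/**
 * Wait until all submitted work units have finished.
 *
 * Optionally reports how many work units returned EOK (@a rokay)
 * and how many failed (@a rfailed). Frees the group, which must
 * not be used afterwards.
 *
 * @return EOK if all work units succeeded, EIO otherwise.
 */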
errno_t hr_fgroup_wait(hr_fgroup_t *group, size_t *rokay, size_t *rfailed)
{
	fibril_mutex_lock(&group->lock);
	while (true) {
		/*
		 * Wait until every submitted work unit has finished,
		 * not merely every one that has already been started,
		 * so the group cannot be freed from under a worker
		 * fibril that has not yet popped its work unit.
		 */
		size_t finished = group->finished_fail + group->finished_okay;
		if (group->submitted != 0 && group->submitted == finished)
			break;

		fibril_condvar_wait(&group->all_done, &group->lock);
	}

	if (rokay)
		*rokay = group->finished_okay;
	if (rfailed)
		*rfailed = group->finished_fail;

	errno_t rc = EOK;
	if (group->finished_okay != group->submitted)
		rc = EIO;

	fibril_mutex_unlock(&group->lock);

	hr_fpool_group_epilogue(group->pool);

	free(group->memslots);
	free(group->own_mem);
	free(group);

	return rc;
}

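/** Worker fibril: pops and executes work units until the pool is stopped. */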
static errno_t fge_fibril(void *arg)
{
	hr_fpool_t *pool = arg;
	while (true) {
		fge_fibril_data_t executor;
		fibril_mutex_lock(&pool->lock);

		while (pool->queue.size == 0 && !pool->stop) {
			fibril_condvar_wait(&pool->queue.not_empty,
			    &pool->lock);
		}

		if (pool->stop && pool->queue.size == 0) {
			fibril_mutex_unlock(&pool->lock);
			break;
		}

		executor = wu_queue_pop(&pool->queue);

		fibril_mutex_unlock(&pool->lock);

		hr_fgroup_t *group = executor.group;

		atomic_fetch_add_explicit(&group->wus_started, 1,
		    memory_order_relaxed);

		errno_t rc = executor.wu(executor.arg);

		if (rc == EOK)
			atomic_fetch_add_explicit(&group->finished_okay, 1,
			    memory_order_relaxed);
		else
			atomic_fetch_add_explicit(&group->finished_fail, 1,
			    memory_order_relaxed);

		fibril_mutex_lock(&pool->lock);
		if (executor.memslot > -1) {
			bitmap_set(&pool->bitmap, executor.memslot, 0);
			pool->wu_storage_free_count++;
		}

		size_t group_total_done = group->finished_fail +
		    group->finished_okay;
		if (group->wus_started == group_total_done)
			fibril_condvar_signal(&group->all_done);

		fibril_mutex_unlock(&pool->lock);
	}
	return EOK;
}

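/** Initialize the bounded circular queue of work units. */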
static errno_t wu_queue_init(wu_queue_t *queue, size_t capacity)
{
	queue->fexecs = malloc(sizeof(fge_fibril_data_t) * capacity);
	if (queue->fexecs == NULL)
		return ENOMEM;

	queue->capacity = capacity;
	queue->size = 0;
	queue->front = 0;
	queue->back = 0;
	fibril_mutex_initialize(&queue->lock);
	fibril_condvar_initialize(&queue->not_empty);
	fibril_condvar_initialize(&queue->not_full);

	return EOK;
}

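/** Enqueue a work unit, blocking while the queue is full. */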
static void wu_queue_push(wu_queue_t *queue, fge_fibril_data_t executor)
{
	fibril_mutex_lock(&queue->lock);

	while (queue->size == queue->capacity)
		fibril_condvar_wait(&queue->not_full, &queue->lock);

	queue->fexecs[queue->back] = executor;
	queue->back = (queue->back + 1) % queue->capacity;
	queue->size++;

	fibril_condvar_signal(&queue->not_empty);

	fibril_mutex_unlock(&queue->lock);
}

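/** Dequeue a work unit, blocking while the queue is empty. */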
static fge_fibril_data_t wu_queue_pop(wu_queue_t *queue)
{
	fibril_mutex_lock(&queue->lock);

	while (queue->size == 0)
		fibril_condvar_wait(&queue->not_empty, &queue->lock);

	fge_fibril_data_t wu = queue->fexecs[queue->front];
	queue->front = (queue->front + 1) % queue->capacity;
	queue->size--;

	fibril_condvar_signal(&queue->not_full);

	fibril_mutex_unlock(&queue->lock);
	return wu;
}

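/**
 * Find the first free storage slot in the pool bitmap.
 *
 * The caller must hold pool->lock.
 *
 * @return Slot index, or -1 if no slot is free.
 */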
static ssize_t hr_fpool_get_free_slot(hr_fpool_t *pool)
{
	bitmap_t *bitmap = &pool->bitmap;
	for (size_t i = 0; i < pool->max_wus; i++)
		if (!bitmap_get(bitmap, i))
			return i;
	return -1;
}

/** @}
 */