source: mainline/uspace/lib/c/generic/thread/mpsc.c@ 5e801dc

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since 5e801dc was 45c8eea, checked in by Jakub Jermar <jakub@…>, 7 years ago

Preallocate waitq handle during initialization

Do not clutter futex_down_composable() with the preallocation of the
wait queue handle and do it single-threadedly in futex_initialize().

  • Property mode set to 100644
File size: 4.5 KB
Line 
1/*
2 * Copyright (c) 2018 CZ.NIC, z.s.p.o.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/*
30 * Authors:
31 * Jiří Zárevúcky (jzr) <zarevucky.jiri@gmail.com>
32 */
33
34#include <fibril.h>
35#include <fibril_synch.h>
36#include <mem.h>
37#include <stdlib.h>
38
39#include "../private/fibril.h"
40
41/*
42 * A multi-producer, single-consumer concurrent FIFO channel with unlimited
43 * buffering.
44 *
45 * The current implementation is based on the super simple two-lock queue
46 * by Michael and Scott
47 * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf)
48 *
49 * The original algorithm uses one lock on each side. Since this queue is
50 * single-consumer, we only use the tail lock.
51 */
52
53typedef struct mpsc_node mpsc_node_t;
54
55struct mpsc {
56 size_t elem_size;
57 fibril_rmutex_t t_lock;
58 mpsc_node_t *head;
59 mpsc_node_t *tail;
60 mpsc_node_t *close_node;
61 fibril_event_t event;
62};
63
/** One element of the singly linked message list. */
struct mpsc_node {
	/** Following node, or NULL while this node is the last one linked. */
	struct mpsc_node *next;
	/** Message payload; the enclosing allocation determines its length. */
	unsigned char data[];
};
68
69mpsc_t *mpsc_create(size_t elem_size)
70{
71 mpsc_t *q = calloc(1, sizeof(mpsc_t));
72 mpsc_node_t *n = calloc(1, sizeof(mpsc_node_t) + elem_size);
73 mpsc_node_t *c = calloc(1, sizeof(mpsc_node_t) + elem_size);
74
75 if (!q || !n || !c) {
76 free(q);
77 free(n);
78 free(c);
79 return NULL;
80 }
81
82 if (fibril_rmutex_initialize(&q->t_lock) != EOK) {
83 free(q);
84 free(n);
85 free(c);
86 return NULL;
87 }
88
89 q->elem_size = elem_size;
90 q->head = q->tail = n;
91 q->close_node = c;
92 return q;
93}
94
95void mpsc_destroy(mpsc_t *q)
96{
97 mpsc_node_t *n = q->head;
98 mpsc_node_t *next = NULL;
99 while (n != NULL) {
100 next = n->next;
101 free(n);
102 n = next;
103 }
104
105 fibril_rmutex_destroy(&q->t_lock);
106
107 free(q);
108}
109
110static errno_t _mpsc_push(mpsc_t *q, mpsc_node_t *n)
111{
112 fibril_rmutex_lock(&q->t_lock);
113
114 if (q->tail == q->close_node) {
115 fibril_rmutex_unlock(&q->t_lock);
116 return EINVAL;
117 }
118
119 __atomic_store_n(&q->tail->next, n, __ATOMIC_RELEASE);
120 q->tail = n;
121
122 fibril_rmutex_unlock(&q->t_lock);
123
124 fibril_notify(&q->event);
125 return EOK;
126}
127
128/**
129 * Send data on the channel.
130 * The length of data is equal to the `elem_size` value set in `mpsc_create`.
131 *
132 * This function is safe for use under restricted mutex lock.
133 *
134 * @return ENOMEM if allocation failed, EINVAL if the queue is closed.
135 */
136errno_t mpsc_send(mpsc_t *q, const void *b)
137{
138 mpsc_node_t *n = malloc(sizeof(mpsc_node_t) + q->elem_size);
139 if (!n)
140 return ENOMEM;
141
142 n->next = NULL;
143 memcpy(n->data, b, q->elem_size);
144
145 return _mpsc_push(q, n);
146}
147
148/**
149 * Receive data from the channel.
150 *
151 * @return ETIMEOUT if deadline expires, ENOENT if the queue is closed and
152 * there is no message left in the queue.
153 */
154errno_t mpsc_receive(mpsc_t *q, void *b, const struct timespec *expires)
155{
156 mpsc_node_t *n;
157 mpsc_node_t *new_head;
158
159 while (true) {
160 n = q->head;
161 new_head = __atomic_load_n(&n->next, __ATOMIC_ACQUIRE);
162 if (new_head)
163 break;
164
165 errno_t rc = fibril_wait_timeout(&q->event, expires);
166 if (rc != EOK)
167 return rc;
168 }
169
170 if (new_head == q->close_node)
171 return ENOENT;
172
173 memcpy(b, new_head->data, q->elem_size);
174 q->head = new_head;
175
176 free(n);
177 return EOK;
178}
179
180/**
181 * Close the channel.
182 *
183 * This function is safe for use under restricted mutex lock.
184 */
185void mpsc_close(mpsc_t *q)
186{
187 _mpsc_push(q, q->close_node);
188}
Note: See TracBrowser for help on using the repository browser.