source: mainline/uspace/lib/c/generic/thread/mpsc.c@ 269bc459

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since 269bc459 was 269bc459, checked in by Jakub Jermar <jakub@…>, 7 years ago

Add SYS_WAITQ_DESTROY

  • Property mode set to 100644
File size: 4.4 KB
Line 
1/*
2 * Copyright (c) 2018 CZ.NIC, z.s.p.o.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/*
30 * Authors:
31 * Jiří Zárevúcky (jzr) <zarevucky.jiri@gmail.com>
32 */
33
34#include <fibril.h>
35#include <fibril_synch.h>
36#include <mem.h>
37#include <stdlib.h>
38
39#include "../private/fibril.h"
40
41/*
42 * A multi-producer, single-consumer concurrent FIFO channel with unlimited
43 * buffering.
44 *
45 * The current implementation is based on the super simple two-lock queue
46 * by Michael and Scott
47 * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf)
48 *
49 * The original algorithm uses one lock on each side. Since this queue is
50 * single-consumer, we only use the tail lock.
51 */
52
53typedef struct mpsc_node mpsc_node_t;
54
55struct mpsc {
56 size_t elem_size;
57 fibril_rmutex_t t_lock;
58 mpsc_node_t *head;
59 mpsc_node_t *tail;
60 mpsc_node_t *close_node;
61 fibril_event_t event;
62};
63
64struct mpsc_node {
65 mpsc_node_t *next;
66 unsigned char data[];
67};
68
69mpsc_t *mpsc_create(size_t elem_size)
70{
71 mpsc_t *q = calloc(1, sizeof(mpsc_t));
72 mpsc_node_t *n = calloc(1, sizeof(mpsc_node_t) + elem_size);
73 mpsc_node_t *c = calloc(1, sizeof(mpsc_node_t) + elem_size);
74
75 if (!q || !n || !c) {
76 free(q);
77 free(n);
78 free(c);
79 return NULL;
80 }
81
82 q->elem_size = elem_size;
83 fibril_rmutex_initialize(&q->t_lock);
84 q->head = q->tail = n;
85 q->close_node = c;
86 return q;
87}
88
89void mpsc_destroy(mpsc_t *q)
90{
91 mpsc_node_t *n = q->head;
92 mpsc_node_t *next = NULL;
93 while (n != NULL) {
94 next = n->next;
95 free(n);
96 n = next;
97 }
98
99 fibril_rmutex_destroy(&q->t_lock);
100
101 free(q);
102}
103
104static errno_t _mpsc_push(mpsc_t *q, mpsc_node_t *n)
105{
106 fibril_rmutex_lock(&q->t_lock);
107
108 if (q->tail == q->close_node) {
109 fibril_rmutex_unlock(&q->t_lock);
110 return EINVAL;
111 }
112
113 __atomic_store_n(&q->tail->next, n, __ATOMIC_RELEASE);
114 q->tail = n;
115
116 fibril_rmutex_unlock(&q->t_lock);
117
118 fibril_notify(&q->event);
119 return EOK;
120}
121
122/**
123 * Send data on the channel.
124 * The length of data is equal to the `elem_size` value set in `mpsc_create`.
125 *
126 * This function is safe for use under restricted mutex lock.
127 *
128 * @return ENOMEM if allocation failed, EINVAL if the queue is closed.
129 */
130errno_t mpsc_send(mpsc_t *q, const void *b)
131{
132 mpsc_node_t *n = malloc(sizeof(mpsc_node_t) + q->elem_size);
133 if (!n)
134 return ENOMEM;
135
136 n->next = NULL;
137 memcpy(n->data, b, q->elem_size);
138
139 return _mpsc_push(q, n);
140}
141
142/**
143 * Receive data from the channel.
144 *
145 * @return ETIMEOUT if deadline expires, ENOENT if the queue is closed and
146 * there is no message left in the queue.
147 */
148errno_t mpsc_receive(mpsc_t *q, void *b, const struct timespec *expires)
149{
150 mpsc_node_t *n;
151 mpsc_node_t *new_head;
152
153 while (true) {
154 n = q->head;
155 new_head = __atomic_load_n(&n->next, __ATOMIC_ACQUIRE);
156 if (new_head)
157 break;
158
159 errno_t rc = fibril_wait_timeout(&q->event, expires);
160 if (rc != EOK)
161 return rc;
162 }
163
164 if (new_head == q->close_node)
165 return ENOENT;
166
167 memcpy(b, new_head->data, q->elem_size);
168 q->head = new_head;
169
170 free(n);
171 return EOK;
172}
173
174/**
175 * Close the channel.
176 *
177 * This function is safe for use under restricted mutex lock.
178 */
179void mpsc_close(mpsc_t *q)
180{
181 _mpsc_push(q, q->close_node);
182}
Note: See TracBrowser for help on using the repository browser.