source: mainline/libc/generic/async.c@ 53ca318

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since 53ca318 was 53ca318, checked in by Ondrej Palkovsky <ondrap@…>, 19 years ago

Small updates to asynchronous framework.

  • Property mode set to 100644
File size: 9.7 KB
Line 
1/*
2 * Copyright (C) 2006 Ondrej Palkovsky
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29/**
30 * Asynchronous library
31 *
32 * The aim of this library is facilitating writing programs utilizing
33 * the asynchronous nature of Helenos IPC, yet using a normal way
34 * of programming.
35 *
36 * You should be able to write very simple multithreaded programs,
37 * the async framework will automatically take care of most synchronization
38 * problems.
39 *
40 * Default semantics:
41 * - send() - send asynchronously. If the kernel refuses to send more
42 * messages, [ try to get responses from kernel, if nothing
43 * found, might try synchronous ]
44 *
45 * Example of use:
46 *
47 * 1) Multithreaded client application
48 * create_thread(thread1);
49 * create_thread(thread2);
50 * ...
51 *
52 * thread1() {
53 * conn = ipc_connect_me_to();
54 * c1 = send(conn);
55 * c2 = send(conn);
56 * wait_for(c1);
57 * wait_for(c2);
58 * }
59 *
60 *
61 * 2) Multithreaded server application
62 * main() {
63 * async_manager();
64 * }
65 *
66 *
67 * client_connection(icallid, *icall) {
68 * if (want_refuse) {
69 * ipc_answer_fast(icallid, ELIMIT, 0, 0);
70 * return;
71 * }
72 * ipc_answer_fast(icallid, 0, 0, 0);
73 *
74 * callid = async_get_call(&call);
75 * handle(callid, call);
76 * ipc_answer_fast(callid, 1,2,3);
77 *
78 * callid = async_get_call(&call);
79 * ....
80 * }
81 *
82 * TODO: Detaching/joining dead psthreads?
83 */
84#include <futex.h>
85#include <async.h>
86#include <psthread.h>
87#include <stdio.h>
88#include <libadt/hash_table.h>
89#include <libadt/list.h>
90#include <ipc/ipc.h>
91#include <assert.h>
92#include <errno.h>
93
94static atomic_t conn_futex = FUTEX_INITIALIZER;
95static hash_table_t conn_hash_table;
96
97typedef struct {
98 link_t link;
99 ipc_callid_t callid;
100 ipc_call_t call;
101} msg_t;
102
103typedef struct {
104 link_t link;
105 ipcarg_t in_phone_hash; /**< Incoming phone hash. */
106 link_t msg_queue; /**< Messages that should be delivered to this thread */
107 pstid_t ptid; /**< Thread associated with this connection */
108 int active; /**< If this thread is currently active */
109 /* Structures for connection opening packet */
110 ipc_callid_t callid;
111 ipc_call_t call;
112 void (*cthread)(ipc_callid_t,ipc_call_t *);
113} connection_t;
114
115__thread connection_t *PS_connection;
116
117/* Hash table functions */
118
119#define CONN_HASH_TABLE_CHAINS 32
120
121static hash_index_t conn_hash(unsigned long *key)
122{
123 assert(key);
124 return ((*key) >> 4) % CONN_HASH_TABLE_CHAINS;
125}
126
127static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
128{
129 connection_t *hs;
130
131 hs = hash_table_get_instance(item, connection_t, link);
132
133 return key[0] == hs->in_phone_hash;
134}
135
136static void conn_remove(link_t *item)
137{
138 free(hash_table_get_instance(item, connection_t, link));
139}
140
141
142/** Operations for NS hash table. */
143static hash_table_operations_t conn_hash_table_ops = {
144 .hash = conn_hash,
145 .compare = conn_compare,
146 .remove_callback = conn_remove
147};
148
149/** Try to route a call to an appropriate connection thread
150 *
151 */
152static int route_call(ipc_callid_t callid, ipc_call_t *call)
153{
154 connection_t *conn;
155 msg_t *msg;
156 link_t *hlp;
157 unsigned long key;
158
159 futex_down(&conn_futex);
160
161 key = call->in_phone_hash;
162 hlp = hash_table_find(&conn_hash_table, &key);
163 if (!hlp) {
164 futex_up(&conn_futex);
165 return 0;
166 }
167 conn = hash_table_get_instance(hlp, connection_t, link);
168
169 msg = malloc(sizeof(*msg));
170 msg->callid = callid;
171 msg->call = *call;
172 list_append(&msg->link, &conn->msg_queue);
173
174 if (!conn->active) {
175 conn->active = 1;
176 psthread_add_ready(conn->ptid);
177 }
178
179 futex_up(&conn_futex);
180
181 return 1;
182}
183
184/** Return new incoming message for current(thread-local) connection */
185ipc_callid_t async_get_call(ipc_call_t *call)
186{
187 msg_t *msg;
188 ipc_callid_t callid;
189 connection_t *conn;
190
191 futex_down(&conn_futex);
192
193 conn = PS_connection;
194 /* If nothing in queue, wait until something appears */
195 if (list_empty(&conn->msg_queue)) {
196 conn->active = 0;
197 psthread_schedule_next_adv(PS_TO_MANAGER);
198 }
199
200 msg = list_get_instance(conn->msg_queue.next, msg_t, link);
201 list_remove(&msg->link);
202 callid = msg->callid;
203 *call = msg->call;
204 free(msg);
205
206 futex_up(&conn_futex);
207 return callid;
208}
209
210/** Thread function that gets created on new connection
211 *
212 * This function is defined as a weak symbol - to be redefined in
213 * user code.
214 */
215void client_connection(ipc_callid_t callid, ipc_call_t *call)
216{
217 ipc_answer_fast(callid, ENOENT, 0, 0);
218}
219
220/** Wrapper for client connection thread
221 *
222 * When new connection arrives, thread with this function is created.
223 * It calls client_connection and does final cleanup.
224 *
225 * @parameter arg Connection structure pointer
226 */
227static int connection_thread(void *arg)
228{
229 unsigned long key;
230 msg_t *msg;
231
232 /* Setup thread local connection pointer */
233 PS_connection = (connection_t *)arg;
234 PS_connection->cthread(PS_connection->callid, &PS_connection->call);
235
236 /* Remove myself from connection hash table */
237 futex_down(&conn_futex);
238 key = PS_connection->in_phone_hash;
239 hash_table_remove(&conn_hash_table, &key, 1);
240 futex_up(&conn_futex);
241 /* Answer all remaining messages with ehangup */
242 while (!list_empty(&PS_connection->msg_queue)) {
243 msg = list_get_instance(PS_connection->msg_queue.next, msg_t, link);
244 list_remove(&msg->link);
245 ipc_answer_fast(msg->callid, EHANGUP, 0, 0);
246 free(msg);
247 }
248}
249
250/** Create new thread for a new connection
251 *
252 * Creates new thread for connection, fills in connection
253 * structures and inserts it into the hash table, so that
254 * later we can easily do routing of messages to particular
255 * threads.
256 *
257 * @param callid Callid of the IPC_M_CONNECT_ME_TO packet
258 * @param call Call data of the opening packet
259 * @param cthread Thread function that should be called upon
260 * opening the connection
261 * @return New thread id
262 */
263pstid_t async_new_connection(ipc_callid_t callid, ipc_call_t *call,
264 void (*cthread)(ipc_callid_t,ipc_call_t *))
265{
266 pstid_t ptid;
267 connection_t *conn;
268 unsigned long key;
269
270 conn = malloc(sizeof(*conn));
271 if (!conn) {
272 ipc_answer_fast(callid, ENOMEM, 0, 0);
273 return NULL;
274 }
275 conn->in_phone_hash = IPC_GET_ARG3(*call);
276 list_initialize(&conn->msg_queue);
277 conn->ptid = psthread_create(connection_thread, conn);
278 conn->callid = callid;
279 conn->call = *call;
280 conn->active = 1; /* We will activate it asap */
281 conn->cthread = cthread;
282 list_initialize(&conn->link);
283 if (!conn->ptid) {
284 free(conn);
285 ipc_answer_fast(callid, ENOMEM, 0, 0);
286 return NULL;
287 }
288 key = conn->in_phone_hash;
289 futex_down(&conn_futex);
290 /* Add connection to hash table */
291 hash_table_insert(&conn_hash_table, &key, &conn->link);
292 futex_up(&conn_futex);
293
294 psthread_add_ready(conn->ptid);
295
296 return conn->ptid;
297}
298
299/** Handle call to a task */
300static void handle_call(ipc_callid_t callid, ipc_call_t *call)
301{
302 if (route_call(callid, call))
303 return;
304
305 switch (IPC_GET_METHOD(*call)) {
306 case IPC_M_INTERRUPT:
307 break;
308 case IPC_M_CONNECT_ME_TO:
309 /* Open new connection with thread etc. */
310 async_new_connection(callid, call, client_connection);
311 break;
312 default:
313 ipc_answer_fast(callid, EHANGUP, 0, 0);
314 }
315}
316
317/** Endless loop dispatching incoming calls and answers */
318int async_manager()
319{
320 ipc_call_t call;
321 ipc_callid_t callid;
322
323 while (1) {
324 if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
325 futex_up(&conn_futex); /* conn_futex is always held
326 * when entering manager thread
327 */
328 continue;
329 }
330 callid = ipc_wait_cycle(&call,SYNCH_NO_TIMEOUT,SYNCH_BLOCKING);
331
332 if (callid & IPC_CALLID_ANSWERED)
333 continue;
334 handle_call(callid, &call);
335 }
336}
337
338/** Function to start async_manager as a standalone thread
339 *
340 * When more kernel threads are used, one async manager should
341 * exist per thread. The particular implementation may change,
342 * currently one async_manager is started automatically per kernel
343 * thread except main thread.
344 */
345static int async_manager_thread(void *arg)
346{
347 futex_up(&conn_futex); /* conn_futex is always locked when entering
348 * manager */
349 async_manager();
350}
351
352/** Add one manager to manager list */
353void async_create_manager(void)
354{
355 pstid_t ptid;
356
357 ptid = psthread_create(async_manager_thread, NULL);
358 psthread_add_manager(ptid);
359}
360
361/** Remove one manager from manager list */
362void async_destroy_manager(void)
363{
364 psthread_remove_manager();
365}
366
367/** Initialize internal structures needed for async manager */
368int _async_init(void)
369{
370 if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) {
371 printf("%s: cannot create hash table\n", "async");
372 return ENOMEM;
373 }
374
375}
Note: See TracBrowser for help on using the repository browser.