source: mainline/uspace/srv/sysman/job_closure.c@ 102f641

Last change on this file since 102f641 was 102f641, checked in by Matthieu Riolo <matthieu.riolo@…>, 6 years ago

Correcting syntax according to ccheck

  • Property mode set to 100644
File size: 8.8 KB
Line 
1/*
2 * Copyright (c) 2015 Michal Koutny
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include <adt/list.h>
30#include <assert.h>
31#include <errno.h>
32#include <stdlib.h>
33
34#include "repo.h"
35#include "edge.h"
36#include "job_closure.h"
37#include "log.h"
38
39/** Struct describes how to traverse units graph */
40struct bfs_ops;
41typedef struct bfs_ops bfs_ops_t;
42
43struct bfs_ops {
44 enum {
45 BFS_FORWARD, /**< Follow oriented edges */
46 BFS_BACKWARD /**< Go against oriented edges */
47 } direction;
48
49 /** Visit a unit via edge
50 * unit, incoming edge, traversing ops, user data
51 * return result of visit (error stops further traversal)
52 */
53 int (*visit)(unit_t *, unit_edge_t *, bfs_ops_t *, void *);
54
55 /** Clean units remaining in BFS queue after error */
56 void (*clean)(unit_t *, bfs_ops_t *, void *);
57};
58
59/*
60 * Static functions
61 */
62
63static int job_add_blocked_job(job_t *blocking_job, job_t *blocked_job)
64{
65 assert(blocking_job->blocked_jobs.size ==
66 blocking_job->blocked_jobs_count);
67
68 int rc = dyn_array_append(&blocking_job->blocked_jobs, job_t *,
69 blocked_job);
70 if (rc != EOK) {
71 return ENOMEM;
72 }
73 job_add_ref(blocked_job);
74
75 blocking_job->blocked_jobs_count += 1;
76 blocked_job->blocking_jobs += 1;
77
78 return EOK;
79}
80
81/** During visit creates job and appends it to closure
82 *
83 * @note Assumes BFS origin unit's job is already present in the closure on the
84 * last position!
85 *
86 * @return EOK on success
87 */
88static int visit_propagate_job(unit_t *u, unit_edge_t *e, bfs_ops_t *ops,
89 void *arg)
90{
91 int rc = EOK;
92 job_t *created_job = NULL;
93 job_closure_t *closure = arg;
94
95 if (e == NULL) {
96 if (u->bfs_data != NULL) {
97 goto finish;
98 }
99 job_t *first_job = dyn_array_last(closure, job_t *);
100
101 job_add_ref(first_job);
102 u->bfs_data = first_job;
103
104 goto finish;
105 }
106
107 job_t *job = (ops->direction == BFS_FORWARD) ?
108 e->input->bfs_data :
109 e->output->bfs_data;
110
111 assert(job != NULL);
112
113 if (u->bfs_data == NULL) {
114 created_job = job_create(u, job->target_state);
115 if (created_job == NULL) {
116 rc = ENOMEM;
117 goto finish;
118 }
119
120 /* Pass job reference to closure and add one for unit */
121 rc = dyn_array_append(closure, job_t *, created_job);
122 if (rc != EOK) {
123 goto finish;
124 }
125
126 job_add_ref(created_job);
127 u->bfs_data = created_job;
128 }
129
130 /* Depending on edge type block existing jobs */
131 rc = job_add_blocked_job(u->bfs_data, job);
132
133finish:
134 if (rc != EOK) {
135 job_del_ref(&created_job);
136 }
137 return rc;
138}
139
140static int visit_isolate(unit_t *u, unit_edge_t *e, bfs_ops_t *ops, void *arg)
141{
142 int rc = EOK;
143 job_t *created_job = NULL;
144 job_closure_t *closure = arg;
145
146 sysman_log(LVL_DEBUG2, "%s(%s)", __func__, unit_name(u));
147 /*
148 * Unit can have starting job from original request or from isolation
149 * BFS with different origin.
150 *
151 * Don't check u->state == STATE_STOPPED, closure is created stateless
152 * and its upon merging procedure to correctly resolve conflicting
153 * jobs.
154 *
155 * If we're at the origin (no BFS incoming edge), create a stop job,
156 * put it to the closure and let propagate as if called the propagate
157 * visitor.
158 */
159 if (e == NULL && u->bfs_data == NULL) {
160 created_job = job_create(u, STATE_STOPPED);
161 if (created_job == NULL) {
162 rc = ENOMEM;
163 goto finish;
164 }
165
166 /* Pass job reference to closure and add one for unit */
167 rc = dyn_array_append(closure, job_t *, created_job);
168 if (rc != EOK) {
169 goto finish;
170 }
171 }
172 rc = visit_propagate_job(u, e, ops, closure);
173
174finish:
175 if (rc != EOK) {
176 job_del_ref(&created_job);
177 }
178 sysman_log(LVL_DEBUG2, "%s(%s) -> %i", __func__, unit_name(u), rc);
179 return rc;
180}
181
182static void traverse_clean(unit_t *u, bfs_ops_t *ops, void *arg)
183{
184 job_t *job = u->bfs_data;
185 job_del_ref(&job);
186 u->bfs_data = NULL;
187}
188
189static int bfs_traverse_component_internal(unit_t *origin, bfs_ops_t *ops,
190 void *arg)
191{
192 int rc;
193 list_t units_fifo;
194 list_initialize(&units_fifo);
195
196 unit_t *unit = origin;
197
198 rc = ops->visit(unit, NULL, ops, arg);
199 if (rc != EOK) {
200 goto finish;
201 }
202 unit->bfs_tag = true;
203 list_append(&unit->bfs_link, &units_fifo);
204
205 while (!list_empty(&units_fifo)) {
206 unit = list_get_instance(list_first(&units_fifo), unit_t,
207 bfs_link);
208 list_remove(&unit->bfs_link);
209
210 if (ops->direction == BFS_FORWARD) {
211 list_foreach(unit->edges_out, edges_out, unit_edge_t, e) {
212 unit_t *u = e->output;
213 if (!u->bfs_tag) {
214 u->bfs_tag = true;
215 list_append(&u->bfs_link, &units_fifo);
216 }
217 rc = ops->visit(u, e, ops, arg);
218 if (rc != EOK) {
219 goto finish;
220 }
221 }
222 } else {
223 list_foreach(unit->edges_in, edges_in, unit_edge_t, e) {
224 unit_t *u = e->input;
225 if (!u->bfs_tag) {
226 u->bfs_tag = true;
227 list_append(&u->bfs_link, &units_fifo);
228 }
229 rc = ops->visit(u, e, ops, arg);
230 if (rc != EOK) {
231 goto finish;
232 }
233 }
234 }
235 }
236 rc = EOK;
237
238finish:
239 /* Let user clean partially processed units */
240 list_foreach_safe(units_fifo, cur_link, next_link) {
241 unit_t *u = list_get_instance(cur_link, unit_t, bfs_link);
242 ops->clean(u, ops, arg);
243 list_remove(cur_link);
244 }
245
246 return rc;
247}
248
249static int bfs_traverse_component(unit_t *origin, bfs_ops_t *ops, void *arg)
250{
251 /* Check invariant */
252 repo_foreach(u) {
253 assert(u->bfs_tag == false);
254 }
255 int rc = bfs_traverse_component_internal(origin, ops, arg);
256
257 /* Clean after ourselves (BFS tag jobs) */
258 repo_foreach(u) {
259 u->bfs_tag = false;
260 }
261 return rc;
262}
263
264static int bfs_traverse_all(bfs_ops_t *ops, void *arg)
265{
266 /* Check invariant */
267 repo_foreach(u) {
268 assert(u->bfs_tag == false);
269 }
270 int rc = EOK;
271
272 repo_foreach(origin) {
273 sysman_log(LVL_DEBUG2, "%s: %p, %i", __func__, origin, origin->bfs_tag);
274 if (origin->bfs_tag == true) {
275 continue;
276 }
277 rc = bfs_traverse_component_internal(origin, ops, arg);
278 if (rc != EOK) {
279 goto finish;
280 }
281 }
282
283finish:
284 /* Clean after ourselves (BFS tag jobs) */
285 repo_foreach(u) {
286 u->bfs_tag = false;
287 }
288 return rc;
289}
290
291/*
292 * Non-static functions
293 */
294
295/** Creates job closure for given basic job
296 *
297 * @note It is caller's responsibility to clean job_closure (even on error).
298 *
299 * @return EOK on success otherwise propagated error
300 */
301int job_create_closure(job_t *main_job, job_closure_t *job_closure, int flags)
302{
303 sysman_log(LVL_DEBUG2, "%s(%s)", __func__, unit_name(main_job->unit));
304
305 if ((flags & CLOSURE_ISOLATE) && main_job->target_state != STATE_STARTED) {
306 // TODO EINVAL?
307 return ENOTSUP;
308 }
309
310 int rc = dyn_array_append(job_closure, job_t *, main_job);
311 if (rc != EOK) {
312 return rc;
313 }
314 job_add_ref(main_job); /* Add one for the closure */
315
316 /* Propagate main_job to other (dependent) units */
317 static bfs_ops_t propagate_ops = {
318 .clean = traverse_clean,
319 .visit = visit_propagate_job
320 };
321 switch (main_job->target_state) {
322 case STATE_STARTED:
323 propagate_ops.direction = BFS_FORWARD;
324 break;
325 case STATE_STOPPED:
326 propagate_ops.direction = BFS_BACKWARD;
327 break;
328 default:
329 assert(false);
330 }
331
332 rc = bfs_traverse_component(main_job->unit, &propagate_ops, job_closure);
333
334 sysman_log(LVL_DEBUG2, "%s: %i&%i", __func__, flags, CLOSURE_ISOLATE);
335 if (flags & CLOSURE_ISOLATE) {
336 static bfs_ops_t isolate_ops = {
337 .direction = BFS_BACKWARD,
338 .clean = traverse_clean,
339 .visit = visit_isolate
340 };
341 rc = bfs_traverse_all(&isolate_ops, job_closure);
342 }
343
344 if (rc == EOK) {
345 dyn_array_foreach(*job_closure, job_t *, job_it) {
346 sysman_log(LVL_DEBUG2, "%s\t%s, refs: %u", __func__,
347 unit_name((*job_it)->unit), atomic_load(&(*job_it)->refcnt));
348 }
349 }
350
351 /* Clean after ourselves (BFS tag jobs) */
352 dyn_array_foreach(*job_closure, job_t *, job_it) {
353 job_t *j = (*job_it)->unit->bfs_data;
354 assert(*job_it == j);
355 job_del_ref(&j);
356 (*job_it)->unit->bfs_data = NULL;
357 }
358
359 return rc;
360}
Note: See TracBrowser for help on using the repository browser.