source: mainline/uspace/srv/sysman/job_closure.c@ a097c50

Last change on this file since a097c50 was ed5367b, checked in by Matthieu Riolo <matthieu.riolo@…>, 6 years ago

sysman: Implement stopping units

Currently fails service monitoring because of taskman flawed event flags.
However, job closure works well.

  • Property mode set to 100644
File size: 6.8 KB
Line 
1/*
2 * Copyright (c) 2015 Michal Koutny
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include <adt/list.h>
30#include <assert.h>
31#include <errno.h>
32#include <stdlib.h>
33
34#include "repo.h"
35#include "edge.h"
36#include "job_closure.h"
37#include "log.h"
38
39
40/** Struct describes how to traverse units graph */
41struct bfs_ops;
42typedef struct bfs_ops bfs_ops_t;
43
44struct bfs_ops {
45 enum {
46 BFS_FORWARD, /**< Follow oriented edges */
47 BFS_BACKWARD /**< Go against oriented edges */
48 } direction;
49
50 /** Visit a unit via edge
51 * unit, incoming edge, traversing ops, user data
52 * return result of visit (error stops further traversal)
53 */
54 int (* visit)(unit_t *, unit_edge_t *, bfs_ops_t *, void *);
55
56 /** Clean units remaining in BFS queue after error */
57 void (* clean)(unit_t *, bfs_ops_t *, void *);
58};
59
60/*
61 * Static functions
62 */
63
64static int job_add_blocked_job(job_t *blocking_job, job_t *blocked_job)
65{
66 assert(blocking_job->blocked_jobs.size ==
67 blocking_job->blocked_jobs_count);
68
69 int rc = dyn_array_append(&blocking_job->blocked_jobs, job_t *,
70 blocked_job);
71 if (rc != EOK) {
72 return ENOMEM;
73 }
74 job_add_ref(blocked_job);
75
76 blocking_job->blocked_jobs_count += 1;
77 blocked_job->blocking_jobs += 1;
78
79 return EOK;
80}
81
82/** During visit creates job and appends it to closure
83 *
84 * @note Assumes BFS start unit's job is already present in closure!
85 *
86 * @return EOK on success
87 */
88static int visit_propagate_job(unit_t *u, unit_edge_t *e, bfs_ops_t *ops,
89 void *arg)
90{
91 int rc = EOK;
92 job_t *created_job = NULL;
93 job_closure_t *closure = arg;
94
95 if (e == NULL) {
96 assert(u->bfs_data == NULL);
97 job_t *first_job = dyn_array_last(closure, job_t *);
98
99 job_add_ref(first_job);
100 u->bfs_data = first_job;
101
102 goto finish;
103 }
104
105 job_t *job = (ops->direction == BFS_FORWARD) ?
106 e->input->bfs_data :
107 e->output->bfs_data;
108
109 assert(job != NULL);
110
111 if (u->bfs_data == NULL) {
112 created_job = job_create(u, job->target_state);
113 if (created_job == NULL) {
114 rc = ENOMEM;
115 goto finish;
116 }
117
118 /* Pass job reference to closure and add one for unit */
119 rc = dyn_array_append(closure, job_t *, created_job);
120 if (rc != EOK) {
121 goto finish;
122 }
123
124 job_add_ref(created_job);
125 u->bfs_data = created_job;
126 }
127
128 /* Depending on edge type block existing jobs */
129 rc = job_add_blocked_job(u->bfs_data, job);
130
131finish:
132 if (rc != EOK) {
133 job_del_ref(&created_job);
134 }
135 return rc;
136}
137
138static void traverse_clean(unit_t *u, bfs_ops_t *ops, void *arg)
139{
140 job_t *job = u->bfs_data;
141 job_del_ref(&job);
142}
143
144static int bfs_traverse_component_internal(unit_t *origin, bfs_ops_t *ops,
145 void *arg)
146{
147 int rc;
148 list_t units_fifo;
149 list_initialize(&units_fifo);
150
151 unit_t *unit = origin;
152
153 rc = ops->visit(unit, NULL, ops, arg);
154 if (rc != EOK) {
155 goto finish;
156 }
157 unit->bfs_tag = true;
158 list_append(&unit->bfs_link, &units_fifo);
159
160 while (!list_empty(&units_fifo)) {
161 unit = list_get_instance(list_first(&units_fifo), unit_t,
162 bfs_link);
163 list_remove(&unit->bfs_link);
164
165
166 if (ops->direction == BFS_FORWARD)
167 list_foreach(unit->edges_out, edges_out, unit_edge_t, e) {
168 unit_t *u = e->output;
169 if (!u->bfs_tag) {
170 u->bfs_tag = true;
171 list_append(&u->bfs_link, &units_fifo);
172 }
173 rc = ops->visit(u, e, ops, arg);
174 if (rc != EOK) {
175 goto finish;
176 }
177 } else
178 list_foreach(unit->edges_in, edges_in, unit_edge_t, e) {
179 unit_t *u = e->input;
180 if (!u->bfs_tag) {
181 u->bfs_tag = true;
182 list_append(&u->bfs_link, &units_fifo);
183 }
184 rc = ops->visit(u, e, ops, arg);
185 if (rc != EOK) {
186 goto finish;
187 }
188 }
189 }
190 rc = EOK;
191
192finish:
193 /* Let user clean partially processed units */
194 list_foreach_safe(units_fifo, cur_link, next_link) {
195 unit_t *u = list_get_instance(cur_link, unit_t, bfs_link);
196 ops->clean(u, ops, arg);
197 list_remove(cur_link);
198 }
199
200 return rc;
201}
202
203static int bfs_traverse_component(unit_t *origin, bfs_ops_t *ops, void *arg)
204{
205 /* Check invariant */
206 list_foreach(units, units, unit_t, u) {
207 assert(u->bfs_tag == false);
208 }
209 int rc = bfs_traverse_component_internal(origin, ops, arg);
210
211 /* Clean after ourselves (BFS tag jobs) */
212 list_foreach(units, units, unit_t, u) {
213 u->bfs_tag = false;
214 }
215 return rc;
216}
217
218// TODO bfs_traverse_all
219
220
221/*
222 * Non-static functions
223 */
224
225/** Creates job closure for given basic job
226 *
227 * @note It is caller's responsibility to clean job_closure (event on error).
228 *
229 * @return EOK on success otherwise propagated error
230 */
231int job_create_closure(job_t *main_job, job_closure_t *job_closure)
232{
233 sysman_log(LVL_DEBUG2, "%s(%s)", __func__, unit_name(main_job->unit));
234
235 static bfs_ops_t ops = {
236 .clean = traverse_clean,
237 .visit = visit_propagate_job
238 };
239
240 switch (main_job->target_state) {
241 case STATE_STARTED:
242 ops.direction = BFS_FORWARD;
243 break;
244 case STATE_STOPPED:
245 ops.direction = BFS_BACKWARD;
246 break;
247 default:
248 assert(false);
249 }
250
251 int rc = dyn_array_append(job_closure, job_t *, main_job);
252 if (rc != EOK) {
253 return rc;
254 }
255 job_add_ref(main_job); /* Add one for the closure */
256
257 rc = bfs_traverse_component(main_job->unit, &ops, job_closure);
258
259 if (rc == EOK) {
260 sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit));
261 dyn_array_foreach(*job_closure, job_t *, job_it) {
262 sysman_log(LVL_DEBUG2, "%s\t%s, refs: %u", __func__,
263 unit_name((*job_it)->unit), atomic_get(&(*job_it)->refcnt));
264 }
265 }
266
267
268 /* Clean after ourselves (BFS tag jobs) */
269 dyn_array_foreach(*job_closure, job_t *, job_it) {
270 job_t *j = (*job_it)->unit->bfs_data;
271 assert(*job_it == j);
272 job_del_ref(&j);
273 (*job_it)->unit->bfs_data = NULL;
274 }
275
276 return rc;
277}
278
Note: See TracBrowser for help on using the repository browser.