source: mainline/uspace/srv/sysman/job.c@ dd5c623

Last change on this file since dd5c623 was dd5c623, checked in by Matthieu Riolo <matthieu.riolo@…>, 6 years ago

dyn_array: Non-allocating initialization API

  • Property mode set to 100644
File size: 10.1 KB
Line 
1/*
2 * Copyright (c) 2015 Michal Koutny
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include <adt/list.h>
30#include <assert.h>
31#include <errno.h>
32#include <stdlib.h>
33
34#include "configuration.h"
35#include "dep.h"
36#include "job.h"
37#include "log.h"
38#include "sysman.h"
39
40static list_t job_queue;
41
42/*
43 * Static functions
44 */
45
46static int job_add_blocked_job(job_t *blocking_job, job_t *blocked_job)
47{
48 int rc = dyn_array_append(&blocking_job->blocked_jobs, job_ptr_t,
49 blocked_job);
50 if (rc != EOK) {
51 return ENOMEM;
52 }
53 job_add_ref(blocked_job);
54
55 blocked_job->blocking_jobs += 1;
56
57 return EOK;
58}
59
60/** Remove blocking_job from blocked job structure
61 *
62 * @note Caller must remove blocked_job from collection of blocked_jobs
63 */
64static void job_unblock(job_t *blocked_job, job_t *blocking_job)
65{
66 if (blocking_job->retval == JOB_FAILED) {
67 blocked_job->blocking_job_failed = true;
68 }
69 blocked_job->blocking_jobs -= 1;
70
71 job_del_ref(&blocked_job);
72}
73
74static void job_init(job_t *job, unit_t *u, unit_state_t target_state)
75{
76 assert(job);
77 assert(u);
78 assert(u->job == NULL);
79 memset(job, 0, sizeof(*job));
80
81 link_initialize(&job->job_queue);
82
83 atomic_set(&job->refcnt, 0);
84
85 job->target_state = target_state;
86 job->unit = u;
87
88 u->job = job;
89 job_add_ref(job);
90
91 dyn_array_initialize(&job->blocked_jobs, job_ptr_t);
92 job->blocking_jobs = 0;
93 job->blocking_job_failed = false;
94
95 job->state = JOB_UNQUEUED;
96 job->retval = JOB_UNDEFINED_;
97}
98
99static bool job_eval_retval(job_t *job)
100{
101 unit_t *u = job->unit;
102 if (u->state == job->target_state) {
103 job->retval = JOB_OK;
104 return true;
105 } else if (u->state == STATE_FAILED) {
106 job->retval = JOB_FAILED;
107 return true;
108 } else {
109 return false;
110 }
111}
112
113static void job_check(void *object, void *data)
114{
115 unit_t *u = object;
116 job_t *job = data;
117
118 /*
119 * We have one reference from caller for our disposal, *
120 * if needed, pass it to observer.
121 */
122 if (job_eval_retval(job)) {
123 job_finish(job);
124 job_del_ref(&job);
125 } else {
126 // TODO place for timeout
127 sysman_object_observer(u, &job_check, job);
128 }
129}
130
131static void job_destroy(job_t **job_ptr)
132{
133 job_t *job = *job_ptr;
134 if (job == NULL) {
135 return;
136 }
137
138 assert(!link_used(&job->job_queue));
139
140 dyn_array_foreach(job->blocked_jobs, job_ptr_t, job_it) {
141 job_del_ref(&(*job_it));
142 }
143 dyn_array_destroy(&job->blocked_jobs);
144
145 free(job);
146 *job_ptr = NULL;
147}
148
149static bool job_is_runnable(job_t *job)
150{
151 return job->state == JOB_QUEUED && job->blocking_jobs == 0;
152}
153
154/** Pop next runnable job
155 *
156 * @return runnable job or NULL when there's none
157 */
158static job_t *job_queue_pop_runnable(void)
159{
160 job_t *result = NULL;
161
162 /* Select first runnable job */
163 list_foreach(job_queue, job_queue, job_t, candidate) {
164 if (job_is_runnable(candidate)) {
165 result = candidate;
166 break;
167 }
168 }
169 if (result) {
170 /* Remove job from queue and pass reference to caller */
171 list_remove(&result->job_queue);
172 result->state = JOB_DEQUEUED;
173 }
174
175 return result;
176}
177
178/*
179 * Non-static functions
180 */
181
182void job_queue_init()
183{
184 list_initialize(&job_queue);
185}
186
187int job_queue_add_jobs(dyn_array_t *jobs)
188{
189 /* Check consistency with queue. */
190 dyn_array_foreach(*jobs, job_ptr_t, new_job_it) {
191 list_foreach(job_queue, job_queue, job_t, queued_job) {
192 /*
193 * Currently we have strict strategy not permitting
194 * multiple jobs for one unit in the queue at a time.
195 */
196 if ((*new_job_it)->unit == queued_job->unit) {
197 sysman_log(LVL_ERROR,
198 "Cannot queue multiple jobs for unit '%s'",
199 unit_name((*new_job_it)->unit));
200 return EEXIST;
201 }
202 }
203 }
204
205 /* Enqueue jobs */
206 dyn_array_foreach(*jobs, job_ptr_t, job_it) {
207 (*job_it)->state = JOB_QUEUED;
208 list_append(&(*job_it)->job_queue, &job_queue);
209 /* We pass reference from the closure to the queue */
210 }
211
212 return EOK;
213}
214
215/** Process all jobs that aren't transitively blocked
216 *
217 * Job can be blocked either by another job or by an incoming event, that will
218 * be queued after this job_queue_process call.
219 *
220 * TODO Write down rules from where this function can be called, to avoid stack
221 * overflow.
222 */
223void job_queue_process(void)
224{
225 job_t *job;
226 while ((job = job_queue_pop_runnable())) {
227 job_run(job);
228 job_del_ref(&job);
229 }
230}
231
232int job_create_closure(job_t *main_job, dyn_array_t *job_closure)
233{
234 int rc;
235 list_t units_fifo;
236 list_initialize(&units_fifo);
237
238 /* Zero BFS tags before use */
239 list_foreach(units, units, unit_t, u) {
240 u->bfs_tag = false;
241 }
242
243 unit_t *unit = main_job->unit;
244 list_append(&unit->bfs_link, &units_fifo);
245 unit->bfs_tag = true;
246
247 while (!list_empty(&units_fifo)) {
248 unit = list_get_instance(list_first(&units_fifo), unit_t,
249 bfs_link);
250 assert(unit->job);
251 list_remove(&unit->bfs_link);
252 job_t *job = unit->job;
253
254
255 // TODO more sophisticated check? (unit that is in transitional
256 // state cannot have currently multiple jobs queued)
257 if (job->target_state == unit->state) {
258 /*
259 * Job would do nothing, finish it on spot.
260 * No need to continue BFS search from it.
261 */
262 job->retval = JOB_OK;
263 job_finish(job);
264 continue;
265 }
266
267 job_add_ref(job);
268 dyn_array_append(job_closure, job_ptr_t, job);
269
270 /*
271 * Traverse dependencies edges
272 * Depending on dependency type and edge direction create
273 * appropriate jobs.
274 */
275 list_foreach(unit->dependencies, dependencies, unit_dependency_t, dep) {
276 unit_t *u = dep->dependency;
277 job_t *blocking_job;
278 if (u->bfs_tag) {
279 assert(u->job);
280 blocking_job = u->job;
281 } else {
282 u->bfs_tag = true;
283 blocking_job = job_create(u, job->target_state);
284 if (blocking_job == NULL) {
285 rc = ENOMEM;
286 goto finish;
287 }
288 /* Reference to job is kept in unit */
289 job_del_ref(&blocking_job);
290 list_append(&u->bfs_link, &units_fifo);
291 }
292
293 job_add_blocked_job(blocking_job, job);
294 }
295 }
296 sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit));
297 dyn_array_foreach(*job_closure, job_ptr_t, job_it) {
298 sysman_log(LVL_DEBUG2, "%s\t%s", __func__, unit_name((*job_it)->unit));
299 }
300 rc = EOK;
301
302finish:
303 /* Any unprocessed jobs may be referenced by units */
304 list_foreach(units_fifo, bfs_link, unit_t, u) {
305 job_del_ref(&u->job);
306 }
307 return rc;
308}
309
310/** Create job assigned to the unit
311 *
312 * @param[in] unit unit to be modified, its job must be empty
313 * @param[in] target_state
314 *
315 * @return NULL or newly created job
316 * There are two references to the job, one set in the unit and second
317 * is the return value.
318 */
319job_t *job_create(unit_t *u, unit_state_t target_state)
320{
321 job_t *job = malloc(sizeof(job_t));
322 if (job != NULL) {
323 job_init(job, u, target_state);
324
325 /* Add one reference for the creator */
326 job_add_ref(job);
327 }
328
329 return job;
330}
331
332/** Add one reference to job
333 *
334 * Usage:
335 * - adding observer which references the job,
336 * - raising and event that references the job,
337 * - anytime any other new reference is made.
338 */
339void job_add_ref(job_t *job)
340{
341 atomic_inc(&job->refcnt);
342}
343
344/** Remove one reference from job, last remover destroys the job
345 *
346 * Usage:
347 * - inside observer callback that references the job,
348 * - inside event handler that references the job,
349 * - anytime you dispose a reference to the job.
350 */
351void job_del_ref(job_t **job_ptr)
352{
353 job_t *job = *job_ptr;
354
355 assert(job != NULL);
356 assert(atomic_get(&job->refcnt) > 0);
357 if (atomic_predec(&job->refcnt) == 0) {
358 job_destroy(job_ptr);
359 }
360}
361
362void job_run(job_t *job)
363{
364 assert(job->state != JOB_RUNNING);
365 assert(job->state != JOB_FINISHED);
366
367 unit_t *u = job->unit;
368 sysman_log(LVL_DEBUG, "%s(%p), %s -> %i",
369 __func__, job, unit_name(u), job->target_state);
370
371 /* Propagate failure */
372 if (job->blocking_job_failed) {
373 goto fail;
374 }
375
376 int rc;
377 switch (job->target_state) {
378 case STATE_STARTED:
379 rc = unit_start(u);
380 break;
381 default:
382 // TODO implement other states
383 assert(false);
384 }
385 if (rc != EOK) {
386 sysman_log(LVL_DEBUG, "%s(%p), %s -> %i, error: %i",
387 __func__, job, unit_name(u), job->target_state, rc);
388 goto fail;
389 }
390
391 /*
392 * job_check deletes reference, we want job to remain to caller, thus
393 * add one dummy ref
394 */
395 job_add_ref(job);
396 job_check(job->unit, job);
397 return;
398
399fail:
400 job->retval = JOB_FAILED;
401 job_finish(job);
402}
403
404/** Unblocks blocked jobs and notify observers
405 *
406 * @param[in] job job with defined return value
407 */
408void job_finish(job_t *job)
409{
410 assert(job->state != JOB_FINISHED);
411 assert(job->retval != JOB_UNDEFINED_);
412 assert(job->unit->job == job);
413
414 sysman_log(LVL_DEBUG2, "%s(%p) %s -> %i",
415 __func__, job, unit_name(job->unit), job->retval);
416
417 job->state = JOB_FINISHED;
418
419 /* First remove references, then clear the array */
420 dyn_array_foreach(job->blocked_jobs, job_ptr_t, job_it) {
421 job_unblock(*job_it, job);
422 }
423 dyn_array_clear(&job->blocked_jobs);
424
425 /*
426 * Remove job from unit and pass the reference from the unit to the
427 * event.
428 */
429 job->unit->job = NULL;
430 sysman_raise_event(&sysman_event_job_finished, job);
431}
432
Note: See TracBrowser for help on using the repository browser.