source: mainline/uspace/srv/sysman/job.c@ 9532981

Last change on this file since 9532981 was 9532981, checked in by Matthieu Riolo <matthieu.riolo@…>, 6 years ago

sysman: Rename dependency to edge (more generic)

  • Property mode set to 100644
File size: 14.0 KB
RevLine 
[09a8006]1/*
2 * Copyright (c) 2015 Michal Koutny
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
[dda2602]29#include <adt/list.h>
[694253c]30#include <assert.h>
31#include <errno.h>
[dda2602]32#include <stdlib.h>
[694253c]33
[af92309]34#include "repo.h"
[9532981]35#include "edge.h"
[694253c]36#include "job.h"
[6efec7e3]37#include "log.h"
[3f7e1f24]38#include "sysman.h"
[694253c]39
40static list_t job_queue;
[c0c388d2]41
[3f7e1f24]42/*
43 * Static functions
44 */
[c0c388d2]45
[dda2602]46static int job_add_blocked_job(job_t *blocking_job, job_t *blocked_job)
[694253c]47{
[72c8f77]48 assert(blocking_job->blocked_jobs.size ==
49 blocking_job->blocked_jobs_count);
50
51 int rc = dyn_array_append(&blocking_job->blocked_jobs, job_t *,
[dda2602]52 blocked_job);
[694253c]53 if (rc != EOK) {
[3f7e1f24]54 return ENOMEM;
[694253c]55 }
[3f7e1f24]56 job_add_ref(blocked_job);
[8432ae1]57
[72c8f77]58 blocking_job->blocked_jobs_count += 1;
[3f7e1f24]59 blocked_job->blocking_jobs += 1;
[694253c]60
61 return EOK;
62}
63
[8432ae1]64/** Remove blocking_job from blocked job structure
65 *
66 * @note Caller must remove blocked_job from collection of blocked_jobs
67 */
68static void job_unblock(job_t *blocked_job, job_t *blocking_job)
69{
70 if (blocking_job->retval == JOB_FAILED) {
71 blocked_job->blocking_job_failed = true;
72 }
73 blocked_job->blocking_jobs -= 1;
74
75 job_del_ref(&blocked_job);
76}
77
[3f7e1f24]78static void job_init(job_t *job, unit_t *u, unit_state_t target_state)
[694253c]79{
[3f7e1f24]80 assert(job);
81 assert(u);
[5559712]82 memset(job, 0, sizeof(*job));
[694253c]83
[3f7e1f24]84 link_initialize(&job->job_queue);
[694253c]85
[8432ae1]86 atomic_set(&job->refcnt, 0);
[694253c]87
[3f7e1f24]88 job->target_state = target_state;
89 job->unit = u;
[694253c]90
[72c8f77]91 dyn_array_initialize(&job->blocked_jobs, job_t *);
[3f7e1f24]92 job->blocking_jobs = 0;
93 job->blocking_job_failed = false;
[694253c]94
[72c8f77]95 job->state = JOB_EMBRYO;
[3f7e1f24]96 job->retval = JOB_UNDEFINED_;
97}
98
99static bool job_eval_retval(job_t *job)
100{
101 unit_t *u = job->unit;
[72c8f77]102
[3f7e1f24]103 if (u->state == job->target_state) {
104 job->retval = JOB_OK;
105 return true;
106 } else if (u->state == STATE_FAILED) {
107 job->retval = JOB_FAILED;
108 return true;
109 } else {
110 return false;
[694253c]111 }
[3f7e1f24]112}
[694253c]113
[3f7e1f24]114static void job_check(void *object, void *data)
115{
116 unit_t *u = object;
117 job_t *job = data;
118
[8432ae1]119 /*
[72c8f77]120 * We have one reference from caller for our disposal,
[8432ae1]121 * if needed, pass it to observer.
122 */
[3f7e1f24]123 if (job_eval_retval(job)) {
124 job_finish(job);
[8432ae1]125 job_del_ref(&job);
[3f7e1f24]126 } else {
127 // TODO place for timeout
128 sysman_object_observer(u, &job_check, job);
129 }
130}
[694253c]131
[3f7e1f24]132static void job_destroy(job_t **job_ptr)
[694253c]133{
[3f7e1f24]134 job_t *job = *job_ptr;
135 if (job == NULL) {
136 return;
[694253c]137 }
138
[3f7e1f24]139 assert(!link_used(&job->job_queue));
[8432ae1]140
[72c8f77]141 dyn_array_foreach(job->blocked_jobs, job_t *, job_it) {
[8432ae1]142 job_del_ref(&(*job_it));
143 }
[3f7e1f24]144 dyn_array_destroy(&job->blocked_jobs);
145
146 free(job);
147 *job_ptr = NULL;
[694253c]148}
149
[5559712]150static bool job_is_runnable(job_t *job)
151{
[72c8f77]152 assert(job->state == JOB_PENDING);
153 return job->blocking_jobs == 0;
[5559712]154}
155
156/** Pop next runnable job
157 *
158 * @return runnable job or NULL when there's none
159 */
160static job_t *job_queue_pop_runnable(void)
161{
162 job_t *result = NULL;
163
164 /* Select first runnable job */
165 list_foreach(job_queue, job_queue, job_t, candidate) {
166 if (job_is_runnable(candidate)) {
167 result = candidate;
168 break;
169 }
170 }
171 if (result) {
172 /* Remove job from queue and pass reference to caller */
173 list_remove(&result->job_queue);
174 }
175
176 return result;
177}
178
[dba056b]179/** Add multiple references to job
180 *
181 * Non-atomicity doesn't mind as long as individual increments are atomic.
182 *
183 * @note Function is not exported as other modules shouldn't need it.
184 */
185static inline void job_add_refs(job_t *job, size_t refs)
186{
187 for (size_t i = 0; i < refs; ++i) {
188 job_add_ref(job);
189 }
190}
191
192/** Delete multiple references to job
193 *
194 * Behavior of concurrent runs with job_add_refs aren't specified.
195 */
196static inline void job_del_refs(job_t **job_ptr, size_t refs)
197{
198 for (size_t i = 0; i < refs; ++i) {
199 job_del_ref(job_ptr);
200 }
201}
202
[72c8f77]203/** Merge two jobs together
204 *
205 * @param[in/out] trunk job that
206 * @param[in] other job that will be cleared out
207 *
208 * @return EOK on success
209 * @return error code on fail
210 */
211static int job_pre_merge(job_t *trunk, job_t *other)
212{
213 assert(trunk->unit == other->unit);
214 assert(trunk->target_state == other->target_state);
215 assert(trunk->blocked_jobs.size == trunk->blocked_jobs_count);
216 assert(other->merged_into == NULL);
217
218 int rc = dyn_array_concat(&trunk->blocked_jobs, &other->blocked_jobs);
219 if (rc != EOK) {
220 return rc;
221 }
222 dyn_array_clear(&other->blocked_jobs);
223
224 // TODO allocate observed object
225
226 other->merged_into = trunk;
227
228 return EOK;
229}
230
231static void job_finish_merge(job_t *trunk, job_t *other)
232{
233 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);
234 //TODO aggregate merged blocked_jobs
235 trunk->blocked_jobs_count = other->blocked_jobs.size;
236
[dba056b]237 /*
238 * Note, the sysman_move_observers cannot fail here sice all necessary
239 * allocation is done in job_pre_merge.
240 */
241 size_t observers_refs = sysman_observers_count(other);
[72c8f77]242 int rc = sysman_move_observers(other, trunk);
243 assert(rc == EOK);
[dba056b]244
245 /* When we move observers, don't forget to pass their references too. */
246 job_add_refs(trunk, observers_refs);
247 job_del_refs(&other, observers_refs);
[72c8f77]248}
249
250static void job_undo_merge(job_t *trunk)
251{
252 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);
253 dyn_array_clear_range(&trunk->blocked_jobs,
254 trunk->blocked_jobs_count, trunk->blocked_jobs.size);
255}
256
[3f7e1f24]257/*
258 * Non-static functions
259 */
260
[694253c]261void job_queue_init()
262{
263 list_initialize(&job_queue);
264}
265
[dba056b]266/** Consistenly add jobs to the queue
267 *
268 * @param[in/out] closure jobs closure, on success it's emptied, otherwise
269 * you should take care of remaining jobs
270 *
271 * @return EOK on success
272 * @return EBUSY when any job in closure is conflicting
273 */
[72c8f77]274int job_queue_add_closure(dyn_array_t *closure)
[694253c]275{
[72c8f77]276 bool has_error = false;
277 int rc = EOK;
278
279 /* Check consistency with existing jobs. */
280 dyn_array_foreach(*closure, job_t *, job_it) {
281 job_t *job = *job_it;
282 job_t *other_job = job->unit->job;
283
284 if (other_job == NULL) {
285 continue;
286 }
287
288 if (other_job->target_state != job->target_state) {
289 switch (other_job->state) {
290 case JOB_RUNNING:
[3f7e1f24]291 sysman_log(LVL_ERROR,
[72c8f77]292 "Unit '%s' has already different job running.",
293 unit_name(job->unit));
294 has_error = true;
295 continue;
296 case JOB_PENDING:
297 /*
298 * Currently we have strict strategy not
299 * permitting multiple jobs for one unit in the
300 * queue at a time.
301 */
302 sysman_log(LVL_ERROR,
303 "Cannot queue multiple jobs for unit '%s'.",
304 unit_name(job->unit));
305 has_error = true;
306 continue;
307 default:
308 assert(false);
309 }
310 } else {
311 // TODO think about other options to merging
312 // (replacing, cancelling)
313 rc = job_pre_merge(other_job, job);
314 if (rc != EOK) {
315 break;
[694253c]316 }
317 }
318 }
319
[72c8f77]320 /* Aggregate merged jobs, or rollback any changes in existing jobs */
321 bool finish_merge = (rc == EOK) && !has_error;
322 dyn_array_foreach(*closure, job_t *, job_it) {
323 if ((*job_it)->merged_into == NULL) {
324 continue;
325 }
326 if (finish_merge) {
327 job_finish_merge((*job_it)->merged_into, *job_it);
328 } else {
329 job_undo_merge((*job_it)->merged_into);
330 }
331 }
332 if (has_error) {
333 return EBUSY;
334 } else if (rc != EOK) {
335 return rc;
336 }
337
338 /* Unmerged jobs are enqueued, merged are disposed */
339 dyn_array_foreach(*closure, job_t *, job_it) {
340 job_t *job = (*job_it);
341 if (job->merged_into != NULL) {
342 job_del_ref(&job);
343 continue;
344 }
345
346
347 unit_t *u = job->unit;
348 assert(u->job == NULL);
[dba056b]349 /* Pass reference from the closure to the unit */
350 u->job = job;
[72c8f77]351
[dba056b]352 /* Enqueue job (new reference) */
[72c8f77]353 job->state = JOB_PENDING;
[dba056b]354 job_add_ref(job);
[72c8f77]355 list_append(&job->job_queue, &job_queue);
[694253c]356 }
357
[dba056b]358 /* We've stolen references from the closure, so erase it */
359 dyn_array_clear(closure);
360
[694253c]361 return EOK;
362}
363
[5559712]364/** Process all jobs that aren't transitively blocked
[694253c]365 *
[5559712]366 * Job can be blocked either by another job or by an incoming event, that will
367 * be queued after this job_queue_process call.
368 *
369 * TODO Write down rules from where this function can be called, to avoid stack
370 * overflow.
[694253c]371 */
[5559712]372void job_queue_process(void)
[694253c]373{
[5559712]374 job_t *job;
375 while ((job = job_queue_pop_runnable())) {
376 job_run(job);
377 job_del_ref(&job);
[3f7e1f24]378 }
379}
380
381int job_create_closure(job_t *main_job, dyn_array_t *job_closure)
382{
[72c8f77]383 sysman_log(LVL_DEBUG2, "%s(%s)", __func__, unit_name(main_job->unit));
[3f7e1f24]384 int rc;
[dda2602]385 list_t units_fifo;
386 list_initialize(&units_fifo);
[3f7e1f24]387
[72c8f77]388 /* Check invariant */
[dda2602]389 list_foreach(units, units, unit_t, u) {
[dba056b]390 assert(u->bfs_job == NULL);
[dda2602]391 }
[5559712]392
[dda2602]393 unit_t *unit = main_job->unit;
[72c8f77]394 job_add_ref(main_job);
395 unit->bfs_job = main_job;
[dda2602]396 list_append(&unit->bfs_link, &units_fifo);
397
398 while (!list_empty(&units_fifo)) {
399 unit = list_get_instance(list_first(&units_fifo), unit_t,
400 bfs_link);
401 list_remove(&unit->bfs_link);
[72c8f77]402 job_t *job = unit->bfs_job;
403 assert(job != NULL);
[3f7e1f24]404
[dda2602]405 job_add_ref(job);
[72c8f77]406 dyn_array_append(job_closure, job_t *, job);
[dda2602]407
408 /*
409 * Traverse dependencies edges
[72c8f77]410 * According to dependency type and edge direction create
411 * appropriate jobs (currently "After" only).
[dda2602]412 */
[9532981]413 list_foreach(unit->edges_out, edges_out, unit_edge_t, e) {
414 unit_t *u = e->output;
[dda2602]415 job_t *blocking_job;
[72c8f77]416
417 if (u->bfs_job == NULL) {
[dda2602]418 blocking_job = job_create(u, job->target_state);
419 if (blocking_job == NULL) {
420 rc = ENOMEM;
421 goto finish;
422 }
[72c8f77]423 /* Pass reference to unit */
424 u->bfs_job = blocking_job;
[dda2602]425 list_append(&u->bfs_link, &units_fifo);
[72c8f77]426 } else {
427 blocking_job = u->bfs_job;
[3f7e1f24]428 }
[72c8f77]429
[dda2602]430 job_add_blocked_job(blocking_job, job);
[3f7e1f24]431 }
432 }
[dba056b]433 sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit));
434 dyn_array_foreach(*job_closure, job_t *, job_it) {
435 sysman_log(LVL_DEBUG2, "%s\t%s, refs: %u", __func__,
436 unit_name((*job_it)->unit), atomic_get(&(*job_it)->refcnt));
437 }
[3f7e1f24]438 rc = EOK;
439
440finish:
[72c8f77]441 /* Unreference any jobs in interrupted BFS queue */
442 list_foreach_safe(units_fifo, cur_link, next_link) {
443 unit_t *u = list_get_instance(cur_link, unit_t, bfs_link);
444 job_del_ref(&u->bfs_job);
445 list_remove(cur_link);
[dda2602]446 }
[dba056b]447
448 /* Clean after ourselves (BFS tag jobs) */
449 dyn_array_foreach(*job_closure, job_t *, job_it) {
450 assert(*job_it == (*job_it)->unit->bfs_job);
451 job_del_ref(&(*job_it)->unit->bfs_job);
452 (*job_it)->unit->bfs_job = NULL;
453 }
[72c8f77]454
[694253c]455 return rc;
456}
457
[dda2602]458/** Create job assigned to the unit
459 *
[72c8f77]460 * @param[in] unit
[dda2602]461 * @param[in] target_state
462 *
[72c8f77]463 * @return NULL or newly created job (there is a single refernce for the creator)
[dda2602]464 */
[3f7e1f24]465job_t *job_create(unit_t *u, unit_state_t target_state)
466{
467 job_t *job = malloc(sizeof(job_t));
468 if (job != NULL) {
469 job_init(job, u, target_state);
[8432ae1]470
[dda2602]471 /* Add one reference for the creator */
[8432ae1]472 job_add_ref(job);
[3f7e1f24]473 }
474
475 return job;
476}
477
[8432ae1]478/** Add one reference to job
479 *
480 * Usage:
481 * - adding observer which references the job,
482 * - raising and event that references the job,
483 * - anytime any other new reference is made.
484 */
[c0c388d2]485void job_add_ref(job_t *job)
486{
487 atomic_inc(&job->refcnt);
488}
489
[8432ae1]490/** Remove one reference from job, last remover destroys the job
491 *
492 * Usage:
493 * - inside observer callback that references the job,
494 * - inside event handler that references the job,
495 * - anytime you dispose a reference to the job.
496 */
[c0c388d2]497void job_del_ref(job_t **job_ptr)
498{
499 job_t *job = *job_ptr;
500
[8432ae1]501 assert(job != NULL);
[c0c388d2]502 assert(atomic_get(&job->refcnt) > 0);
503 if (atomic_predec(&job->refcnt) == 0) {
504 job_destroy(job_ptr);
505 }
506}
507
[3f7e1f24]508void job_run(job_t *job)
[694253c]509{
[72c8f77]510 assert(job->state == JOB_PENDING);
[694253c]511
[72c8f77]512 job->state = JOB_RUNNING;
[3f7e1f24]513 unit_t *u = job->unit;
[8432ae1]514 sysman_log(LVL_DEBUG, "%s(%p), %s -> %i",
515 __func__, job, unit_name(u), job->target_state);
[694253c]516
[3f7e1f24]517 /* Propagate failure */
518 if (job->blocking_job_failed) {
519 goto fail;
[694253c]520 }
521
[3f7e1f24]522 int rc;
523 switch (job->target_state) {
524 case STATE_STARTED:
[72c8f77]525 // TODO put here same evaluation as in job_check
526 // goal is to have job_run "idempotent"
527 if (u->state == job->target_state) {
528 rc = EOK;
529 } else {
530 rc = unit_start(u);
531 }
[3f7e1f24]532 break;
533 default:
534 // TODO implement other states
535 assert(false);
536 }
537 if (rc != EOK) {
[63a3276]538 //TODO here is 'rc' value "lost" (not propagated further)
[dda2602]539 sysman_log(LVL_DEBUG, "%s(%p), %s -> %i, error: %i",
540 __func__, job, unit_name(u), job->target_state, rc);
[3f7e1f24]541 goto fail;
[4fe7fcb]542 }
543
[8432ae1]544 /*
545 * job_check deletes reference, we want job to remain to caller, thus
546 * add one dummy ref
547 */
548 job_add_ref(job);
[3f7e1f24]549 job_check(job->unit, job);
550 return;
[4fe7fcb]551
[3f7e1f24]552fail:
553 job->retval = JOB_FAILED;
554 job_finish(job);
[4fe7fcb]555}
556
[3f7e1f24]557/** Unblocks blocked jobs and notify observers
558 *
559 * @param[in] job job with defined return value
560 */
561void job_finish(job_t *job)
[694253c]562{
[3f7e1f24]563 assert(job->state != JOB_FINISHED);
564 assert(job->retval != JOB_UNDEFINED_);
[dba056b]565 assert(!job->unit->job || job->unit->job == job);
[3f7e1f24]566
[dba056b]567 sysman_log(LVL_DEBUG2, "%s(%p) %s ret %i, ref %i",
568 __func__, job, unit_name(job->unit), job->retval,
569 atomic_get(&job->refcnt));
[3f7e1f24]570
571 job->state = JOB_FINISHED;
[694253c]572
[8432ae1]573 /* First remove references, then clear the array */
[72c8f77]574 assert(job->blocked_jobs.size == job->blocked_jobs_count);
575 dyn_array_foreach(job->blocked_jobs, job_t *, job_it) {
[3f7e1f24]576 job_unblock(*job_it, job);
[694253c]577 }
[8432ae1]578 dyn_array_clear(&job->blocked_jobs);
[694253c]579
[dba056b]580 /* Add reference for event handler */
581 if (job->unit->job == NULL) {
582 job_add_ref(job);
583 } else {
584 /* Pass reference from unit */
585 job->unit->job = NULL;
586 }
[5559712]587 sysman_raise_event(&sysman_event_job_finished, job);
[694253c]588}
[3f7e1f24]589
Note: See TracBrowser for help on using the repository browser.