source: mainline/uspace/srv/sysman/job.c@ 9532981

Last change on this file since 9532981 was 9532981, checked in by Matthieu Riolo <matthieu.riolo@…>, 6 years ago

sysman: Rename dependency to edge (more generic)

  • Property mode set to 100644
File size: 14.0 KB
Line 
1/*
2 * Copyright (c) 2015 Michal Koutny
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include <adt/list.h>
30#include <assert.h>
31#include <errno.h>
32#include <stdlib.h>
33
34#include "repo.h"
35#include "edge.h"
36#include "job.h"
37#include "log.h"
38#include "sysman.h"
39
40static list_t job_queue;
41
42/*
43 * Static functions
44 */
45
46static int job_add_blocked_job(job_t *blocking_job, job_t *blocked_job)
47{
48 assert(blocking_job->blocked_jobs.size ==
49 blocking_job->blocked_jobs_count);
50
51 int rc = dyn_array_append(&blocking_job->blocked_jobs, job_t *,
52 blocked_job);
53 if (rc != EOK) {
54 return ENOMEM;
55 }
56 job_add_ref(blocked_job);
57
58 blocking_job->blocked_jobs_count += 1;
59 blocked_job->blocking_jobs += 1;
60
61 return EOK;
62}
63
64/** Remove blocking_job from blocked job structure
65 *
66 * @note Caller must remove blocked_job from collection of blocked_jobs
67 */
68static void job_unblock(job_t *blocked_job, job_t *blocking_job)
69{
70 if (blocking_job->retval == JOB_FAILED) {
71 blocked_job->blocking_job_failed = true;
72 }
73 blocked_job->blocking_jobs -= 1;
74
75 job_del_ref(&blocked_job);
76}
77
78static void job_init(job_t *job, unit_t *u, unit_state_t target_state)
79{
80 assert(job);
81 assert(u);
82 memset(job, 0, sizeof(*job));
83
84 link_initialize(&job->job_queue);
85
86 atomic_set(&job->refcnt, 0);
87
88 job->target_state = target_state;
89 job->unit = u;
90
91 dyn_array_initialize(&job->blocked_jobs, job_t *);
92 job->blocking_jobs = 0;
93 job->blocking_job_failed = false;
94
95 job->state = JOB_EMBRYO;
96 job->retval = JOB_UNDEFINED_;
97}
98
99static bool job_eval_retval(job_t *job)
100{
101 unit_t *u = job->unit;
102
103 if (u->state == job->target_state) {
104 job->retval = JOB_OK;
105 return true;
106 } else if (u->state == STATE_FAILED) {
107 job->retval = JOB_FAILED;
108 return true;
109 } else {
110 return false;
111 }
112}
113
114static void job_check(void *object, void *data)
115{
116 unit_t *u = object;
117 job_t *job = data;
118
119 /*
120 * We have one reference from caller for our disposal,
121 * if needed, pass it to observer.
122 */
123 if (job_eval_retval(job)) {
124 job_finish(job);
125 job_del_ref(&job);
126 } else {
127 // TODO place for timeout
128 sysman_object_observer(u, &job_check, job);
129 }
130}
131
132static void job_destroy(job_t **job_ptr)
133{
134 job_t *job = *job_ptr;
135 if (job == NULL) {
136 return;
137 }
138
139 assert(!link_used(&job->job_queue));
140
141 dyn_array_foreach(job->blocked_jobs, job_t *, job_it) {
142 job_del_ref(&(*job_it));
143 }
144 dyn_array_destroy(&job->blocked_jobs);
145
146 free(job);
147 *job_ptr = NULL;
148}
149
150static bool job_is_runnable(job_t *job)
151{
152 assert(job->state == JOB_PENDING);
153 return job->blocking_jobs == 0;
154}
155
156/** Pop next runnable job
157 *
158 * @return runnable job or NULL when there's none
159 */
160static job_t *job_queue_pop_runnable(void)
161{
162 job_t *result = NULL;
163
164 /* Select first runnable job */
165 list_foreach(job_queue, job_queue, job_t, candidate) {
166 if (job_is_runnable(candidate)) {
167 result = candidate;
168 break;
169 }
170 }
171 if (result) {
172 /* Remove job from queue and pass reference to caller */
173 list_remove(&result->job_queue);
174 }
175
176 return result;
177}
178
179/** Add multiple references to job
180 *
181 * Non-atomicity doesn't mind as long as individual increments are atomic.
182 *
183 * @note Function is not exported as other modules shouldn't need it.
184 */
185static inline void job_add_refs(job_t *job, size_t refs)
186{
187 for (size_t i = 0; i < refs; ++i) {
188 job_add_ref(job);
189 }
190}
191
192/** Delete multiple references to job
193 *
194 * Behavior of concurrent runs with job_add_refs aren't specified.
195 */
196static inline void job_del_refs(job_t **job_ptr, size_t refs)
197{
198 for (size_t i = 0; i < refs; ++i) {
199 job_del_ref(job_ptr);
200 }
201}
202
203/** Merge two jobs together
204 *
205 * @param[in/out] trunk job that
206 * @param[in] other job that will be cleared out
207 *
208 * @return EOK on success
209 * @return error code on fail
210 */
211static int job_pre_merge(job_t *trunk, job_t *other)
212{
213 assert(trunk->unit == other->unit);
214 assert(trunk->target_state == other->target_state);
215 assert(trunk->blocked_jobs.size == trunk->blocked_jobs_count);
216 assert(other->merged_into == NULL);
217
218 int rc = dyn_array_concat(&trunk->blocked_jobs, &other->blocked_jobs);
219 if (rc != EOK) {
220 return rc;
221 }
222 dyn_array_clear(&other->blocked_jobs);
223
224 // TODO allocate observed object
225
226 other->merged_into = trunk;
227
228 return EOK;
229}
230
231static void job_finish_merge(job_t *trunk, job_t *other)
232{
233 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);
234 //TODO aggregate merged blocked_jobs
235 trunk->blocked_jobs_count = other->blocked_jobs.size;
236
237 /*
238 * Note, the sysman_move_observers cannot fail here sice all necessary
239 * allocation is done in job_pre_merge.
240 */
241 size_t observers_refs = sysman_observers_count(other);
242 int rc = sysman_move_observers(other, trunk);
243 assert(rc == EOK);
244
245 /* When we move observers, don't forget to pass their references too. */
246 job_add_refs(trunk, observers_refs);
247 job_del_refs(&other, observers_refs);
248}
249
250static void job_undo_merge(job_t *trunk)
251{
252 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);
253 dyn_array_clear_range(&trunk->blocked_jobs,
254 trunk->blocked_jobs_count, trunk->blocked_jobs.size);
255}
256
257/*
258 * Non-static functions
259 */
260
261void job_queue_init()
262{
263 list_initialize(&job_queue);
264}
265
266/** Consistenly add jobs to the queue
267 *
268 * @param[in/out] closure jobs closure, on success it's emptied, otherwise
269 * you should take care of remaining jobs
270 *
271 * @return EOK on success
272 * @return EBUSY when any job in closure is conflicting
273 */
274int job_queue_add_closure(dyn_array_t *closure)
275{
276 bool has_error = false;
277 int rc = EOK;
278
279 /* Check consistency with existing jobs. */
280 dyn_array_foreach(*closure, job_t *, job_it) {
281 job_t *job = *job_it;
282 job_t *other_job = job->unit->job;
283
284 if (other_job == NULL) {
285 continue;
286 }
287
288 if (other_job->target_state != job->target_state) {
289 switch (other_job->state) {
290 case JOB_RUNNING:
291 sysman_log(LVL_ERROR,
292 "Unit '%s' has already different job running.",
293 unit_name(job->unit));
294 has_error = true;
295 continue;
296 case JOB_PENDING:
297 /*
298 * Currently we have strict strategy not
299 * permitting multiple jobs for one unit in the
300 * queue at a time.
301 */
302 sysman_log(LVL_ERROR,
303 "Cannot queue multiple jobs for unit '%s'.",
304 unit_name(job->unit));
305 has_error = true;
306 continue;
307 default:
308 assert(false);
309 }
310 } else {
311 // TODO think about other options to merging
312 // (replacing, cancelling)
313 rc = job_pre_merge(other_job, job);
314 if (rc != EOK) {
315 break;
316 }
317 }
318 }
319
320 /* Aggregate merged jobs, or rollback any changes in existing jobs */
321 bool finish_merge = (rc == EOK) && !has_error;
322 dyn_array_foreach(*closure, job_t *, job_it) {
323 if ((*job_it)->merged_into == NULL) {
324 continue;
325 }
326 if (finish_merge) {
327 job_finish_merge((*job_it)->merged_into, *job_it);
328 } else {
329 job_undo_merge((*job_it)->merged_into);
330 }
331 }
332 if (has_error) {
333 return EBUSY;
334 } else if (rc != EOK) {
335 return rc;
336 }
337
338 /* Unmerged jobs are enqueued, merged are disposed */
339 dyn_array_foreach(*closure, job_t *, job_it) {
340 job_t *job = (*job_it);
341 if (job->merged_into != NULL) {
342 job_del_ref(&job);
343 continue;
344 }
345
346
347 unit_t *u = job->unit;
348 assert(u->job == NULL);
349 /* Pass reference from the closure to the unit */
350 u->job = job;
351
352 /* Enqueue job (new reference) */
353 job->state = JOB_PENDING;
354 job_add_ref(job);
355 list_append(&job->job_queue, &job_queue);
356 }
357
358 /* We've stolen references from the closure, so erase it */
359 dyn_array_clear(closure);
360
361 return EOK;
362}
363
364/** Process all jobs that aren't transitively blocked
365 *
366 * Job can be blocked either by another job or by an incoming event, that will
367 * be queued after this job_queue_process call.
368 *
369 * TODO Write down rules from where this function can be called, to avoid stack
370 * overflow.
371 */
372void job_queue_process(void)
373{
374 job_t *job;
375 while ((job = job_queue_pop_runnable())) {
376 job_run(job);
377 job_del_ref(&job);
378 }
379}
380
381int job_create_closure(job_t *main_job, dyn_array_t *job_closure)
382{
383 sysman_log(LVL_DEBUG2, "%s(%s)", __func__, unit_name(main_job->unit));
384 int rc;
385 list_t units_fifo;
386 list_initialize(&units_fifo);
387
388 /* Check invariant */
389 list_foreach(units, units, unit_t, u) {
390 assert(u->bfs_job == NULL);
391 }
392
393 unit_t *unit = main_job->unit;
394 job_add_ref(main_job);
395 unit->bfs_job = main_job;
396 list_append(&unit->bfs_link, &units_fifo);
397
398 while (!list_empty(&units_fifo)) {
399 unit = list_get_instance(list_first(&units_fifo), unit_t,
400 bfs_link);
401 list_remove(&unit->bfs_link);
402 job_t *job = unit->bfs_job;
403 assert(job != NULL);
404
405 job_add_ref(job);
406 dyn_array_append(job_closure, job_t *, job);
407
408 /*
409 * Traverse dependencies edges
410 * According to dependency type and edge direction create
411 * appropriate jobs (currently "After" only).
412 */
413 list_foreach(unit->edges_out, edges_out, unit_edge_t, e) {
414 unit_t *u = e->output;
415 job_t *blocking_job;
416
417 if (u->bfs_job == NULL) {
418 blocking_job = job_create(u, job->target_state);
419 if (blocking_job == NULL) {
420 rc = ENOMEM;
421 goto finish;
422 }
423 /* Pass reference to unit */
424 u->bfs_job = blocking_job;
425 list_append(&u->bfs_link, &units_fifo);
426 } else {
427 blocking_job = u->bfs_job;
428 }
429
430 job_add_blocked_job(blocking_job, job);
431 }
432 }
433 sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit));
434 dyn_array_foreach(*job_closure, job_t *, job_it) {
435 sysman_log(LVL_DEBUG2, "%s\t%s, refs: %u", __func__,
436 unit_name((*job_it)->unit), atomic_get(&(*job_it)->refcnt));
437 }
438 rc = EOK;
439
440finish:
441 /* Unreference any jobs in interrupted BFS queue */
442 list_foreach_safe(units_fifo, cur_link, next_link) {
443 unit_t *u = list_get_instance(cur_link, unit_t, bfs_link);
444 job_del_ref(&u->bfs_job);
445 list_remove(cur_link);
446 }
447
448 /* Clean after ourselves (BFS tag jobs) */
449 dyn_array_foreach(*job_closure, job_t *, job_it) {
450 assert(*job_it == (*job_it)->unit->bfs_job);
451 job_del_ref(&(*job_it)->unit->bfs_job);
452 (*job_it)->unit->bfs_job = NULL;
453 }
454
455 return rc;
456}
457
458/** Create job assigned to the unit
459 *
460 * @param[in] unit
461 * @param[in] target_state
462 *
463 * @return NULL or newly created job (there is a single refernce for the creator)
464 */
465job_t *job_create(unit_t *u, unit_state_t target_state)
466{
467 job_t *job = malloc(sizeof(job_t));
468 if (job != NULL) {
469 job_init(job, u, target_state);
470
471 /* Add one reference for the creator */
472 job_add_ref(job);
473 }
474
475 return job;
476}
477
478/** Add one reference to job
479 *
480 * Usage:
481 * - adding observer which references the job,
482 * - raising and event that references the job,
483 * - anytime any other new reference is made.
484 */
485void job_add_ref(job_t *job)
486{
487 atomic_inc(&job->refcnt);
488}
489
490/** Remove one reference from job, last remover destroys the job
491 *
492 * Usage:
493 * - inside observer callback that references the job,
494 * - inside event handler that references the job,
495 * - anytime you dispose a reference to the job.
496 */
497void job_del_ref(job_t **job_ptr)
498{
499 job_t *job = *job_ptr;
500
501 assert(job != NULL);
502 assert(atomic_get(&job->refcnt) > 0);
503 if (atomic_predec(&job->refcnt) == 0) {
504 job_destroy(job_ptr);
505 }
506}
507
508void job_run(job_t *job)
509{
510 assert(job->state == JOB_PENDING);
511
512 job->state = JOB_RUNNING;
513 unit_t *u = job->unit;
514 sysman_log(LVL_DEBUG, "%s(%p), %s -> %i",
515 __func__, job, unit_name(u), job->target_state);
516
517 /* Propagate failure */
518 if (job->blocking_job_failed) {
519 goto fail;
520 }
521
522 int rc;
523 switch (job->target_state) {
524 case STATE_STARTED:
525 // TODO put here same evaluation as in job_check
526 // goal is to have job_run "idempotent"
527 if (u->state == job->target_state) {
528 rc = EOK;
529 } else {
530 rc = unit_start(u);
531 }
532 break;
533 default:
534 // TODO implement other states
535 assert(false);
536 }
537 if (rc != EOK) {
538 //TODO here is 'rc' value "lost" (not propagated further)
539 sysman_log(LVL_DEBUG, "%s(%p), %s -> %i, error: %i",
540 __func__, job, unit_name(u), job->target_state, rc);
541 goto fail;
542 }
543
544 /*
545 * job_check deletes reference, we want job to remain to caller, thus
546 * add one dummy ref
547 */
548 job_add_ref(job);
549 job_check(job->unit, job);
550 return;
551
552fail:
553 job->retval = JOB_FAILED;
554 job_finish(job);
555}
556
557/** Unblocks blocked jobs and notify observers
558 *
559 * @param[in] job job with defined return value
560 */
561void job_finish(job_t *job)
562{
563 assert(job->state != JOB_FINISHED);
564 assert(job->retval != JOB_UNDEFINED_);
565 assert(!job->unit->job || job->unit->job == job);
566
567 sysman_log(LVL_DEBUG2, "%s(%p) %s ret %i, ref %i",
568 __func__, job, unit_name(job->unit), job->retval,
569 atomic_get(&job->refcnt));
570
571 job->state = JOB_FINISHED;
572
573 /* First remove references, then clear the array */
574 assert(job->blocked_jobs.size == job->blocked_jobs_count);
575 dyn_array_foreach(job->blocked_jobs, job_t *, job_it) {
576 job_unblock(*job_it, job);
577 }
578 dyn_array_clear(&job->blocked_jobs);
579
580 /* Add reference for event handler */
581 if (job->unit->job == NULL) {
582 job_add_ref(job);
583 } else {
584 /* Pass reference from unit */
585 job->unit->job = NULL;
586 }
587 sysman_raise_event(&sysman_event_job_finished, job);
588}
589
Note: See TracBrowser for help on using the repository browser.