source: mainline/uspace/srv/sysman/job.c@ 63a3276

Last change on this file since 63a3276 was 63a3276, checked in by Matthieu Riolo <matthieu.riolo@…>, 6 years ago

sysman: Instrumented locsrv for autostart

  • also refactored unit name derivation in other brokers
  • exposee creation is not used in unit's lifecycle (failed assertion)

Conflicts:

uspace/lib/c/generic/loc.c
uspace/srv/devman/driver.c
uspace/srv/devman/drv_conn.c
uspace/srv/hid/compositor/compositor.c
uspace/srv/locsrv/locsrv.c
uspace/srv/vfs/vfs.h
uspace/srv/vfs/vfs_ops.c
uspace/srv/vfs/vfs_register.c

  • Property mode set to 100644
File size: 12.5 KB
RevLine 
[09a8006]1/*
2 * Copyright (c) 2015 Michal Koutny
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
[dda2602]29#include <adt/list.h>
[694253c]30#include <assert.h>
31#include <errno.h>
[dda2602]32#include <stdlib.h>
[694253c]33
[dda2602]34#include "configuration.h"
[3f7e1f24]35#include "dep.h"
[694253c]36#include "job.h"
[6efec7e3]37#include "log.h"
[3f7e1f24]38#include "sysman.h"
[694253c]39
40static list_t job_queue;
[c0c388d2]41
[3f7e1f24]42/*
43 * Static functions
44 */
[c0c388d2]45
[dda2602]46static int job_add_blocked_job(job_t *blocking_job, job_t *blocked_job)
[694253c]47{
[72c8f77]48 assert(blocking_job->blocked_jobs.size ==
49 blocking_job->blocked_jobs_count);
50
51 int rc = dyn_array_append(&blocking_job->blocked_jobs, job_t *,
[dda2602]52 blocked_job);
[694253c]53 if (rc != EOK) {
[3f7e1f24]54 return ENOMEM;
[694253c]55 }
[3f7e1f24]56 job_add_ref(blocked_job);
[8432ae1]57
[72c8f77]58 blocking_job->blocked_jobs_count += 1;
[3f7e1f24]59 blocked_job->blocking_jobs += 1;
[694253c]60
61 return EOK;
62}
63
[8432ae1]64/** Remove blocking_job from blocked job structure
65 *
66 * @note Caller must remove blocked_job from collection of blocked_jobs
67 */
68static void job_unblock(job_t *blocked_job, job_t *blocking_job)
69{
70 if (blocking_job->retval == JOB_FAILED) {
71 blocked_job->blocking_job_failed = true;
72 }
73 blocked_job->blocking_jobs -= 1;
74
75 job_del_ref(&blocked_job);
76}
77
[3f7e1f24]78static void job_init(job_t *job, unit_t *u, unit_state_t target_state)
[694253c]79{
[3f7e1f24]80 assert(job);
81 assert(u);
[dda2602]82 assert(u->job == NULL);
[5559712]83 memset(job, 0, sizeof(*job));
[694253c]84
[3f7e1f24]85 link_initialize(&job->job_queue);
[694253c]86
[8432ae1]87 atomic_set(&job->refcnt, 0);
[694253c]88
[3f7e1f24]89 job->target_state = target_state;
90 job->unit = u;
[694253c]91
[72c8f77]92 dyn_array_initialize(&job->blocked_jobs, job_t *);
[3f7e1f24]93 job->blocking_jobs = 0;
94 job->blocking_job_failed = false;
[694253c]95
[72c8f77]96 job->state = JOB_EMBRYO;
[3f7e1f24]97 job->retval = JOB_UNDEFINED_;
98}
99
100static bool job_eval_retval(job_t *job)
101{
102 unit_t *u = job->unit;
[72c8f77]103
[3f7e1f24]104 if (u->state == job->target_state) {
105 job->retval = JOB_OK;
106 return true;
107 } else if (u->state == STATE_FAILED) {
108 job->retval = JOB_FAILED;
109 return true;
110 } else {
111 return false;
[694253c]112 }
[3f7e1f24]113}
[694253c]114
[3f7e1f24]115static void job_check(void *object, void *data)
116{
117 unit_t *u = object;
118 job_t *job = data;
119
[8432ae1]120 /*
[72c8f77]121 * We have one reference from caller for our disposal,
[8432ae1]122 * if needed, pass it to observer.
123 */
[3f7e1f24]124 if (job_eval_retval(job)) {
125 job_finish(job);
[8432ae1]126 job_del_ref(&job);
[3f7e1f24]127 } else {
128 // TODO place for timeout
129 sysman_object_observer(u, &job_check, job);
130 }
131}
[694253c]132
[3f7e1f24]133static void job_destroy(job_t **job_ptr)
[694253c]134{
[3f7e1f24]135 job_t *job = *job_ptr;
136 if (job == NULL) {
137 return;
[694253c]138 }
139
[3f7e1f24]140 assert(!link_used(&job->job_queue));
[8432ae1]141
[72c8f77]142 dyn_array_foreach(job->blocked_jobs, job_t *, job_it) {
[8432ae1]143 job_del_ref(&(*job_it));
144 }
[3f7e1f24]145 dyn_array_destroy(&job->blocked_jobs);
146
147 free(job);
148 *job_ptr = NULL;
[694253c]149}
150
[5559712]151static bool job_is_runnable(job_t *job)
152{
[72c8f77]153 assert(job->state == JOB_PENDING);
154 return job->blocking_jobs == 0;
[5559712]155}
156
157/** Pop next runnable job
158 *
159 * @return runnable job or NULL when there's none
160 */
161static job_t *job_queue_pop_runnable(void)
162{
163 job_t *result = NULL;
164
165 /* Select first runnable job */
166 list_foreach(job_queue, job_queue, job_t, candidate) {
167 if (job_is_runnable(candidate)) {
168 result = candidate;
169 break;
170 }
171 }
172 if (result) {
173 /* Remove job from queue and pass reference to caller */
174 list_remove(&result->job_queue);
175 }
176
177 return result;
178}
179
[72c8f77]180/** Merge two jobs together
181 *
182 * @param[in/out] trunk job that
183 * @param[in] other job that will be cleared out
184 *
185 * @return EOK on success
186 * @return error code on fail
187 */
188static int job_pre_merge(job_t *trunk, job_t *other)
189{
190 assert(trunk->unit == other->unit);
191 assert(trunk->target_state == other->target_state);
192 assert(trunk->blocked_jobs.size == trunk->blocked_jobs_count);
193 assert(other->merged_into == NULL);
194
195 int rc = dyn_array_concat(&trunk->blocked_jobs, &other->blocked_jobs);
196 if (rc != EOK) {
197 return rc;
198 }
199 dyn_array_clear(&other->blocked_jobs);
200
201 // TODO allocate observed object
202
203 other->merged_into = trunk;
204
205 return EOK;
206}
207
208static void job_finish_merge(job_t *trunk, job_t *other)
209{
210 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);
211 //TODO aggregate merged blocked_jobs
212 trunk->blocked_jobs_count = other->blocked_jobs.size;
213
214 /* All allocation is done in job_pre_merge, cannot fail here. */
215 int rc = sysman_move_observers(other, trunk);
216 assert(rc == EOK);
217}
218
219static void job_undo_merge(job_t *trunk)
220{
221 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);
222 dyn_array_clear_range(&trunk->blocked_jobs,
223 trunk->blocked_jobs_count, trunk->blocked_jobs.size);
224}
225
[3f7e1f24]226/*
227 * Non-static functions
228 */
229
[694253c]230void job_queue_init()
231{
232 list_initialize(&job_queue);
233}
234
[72c8f77]235int job_queue_add_closure(dyn_array_t *closure)
[694253c]236{
[72c8f77]237 bool has_error = false;
238 int rc = EOK;
239
240 /* Check consistency with existing jobs. */
241 dyn_array_foreach(*closure, job_t *, job_it) {
242 job_t *job = *job_it;
243 job_t *other_job = job->unit->job;
244
245 if (other_job == NULL) {
246 continue;
247 }
248
249 if (other_job->target_state != job->target_state) {
250 switch (other_job->state) {
251 case JOB_RUNNING:
[3f7e1f24]252 sysman_log(LVL_ERROR,
[72c8f77]253 "Unit '%s' has already different job running.",
254 unit_name(job->unit));
255 has_error = true;
256 continue;
257 case JOB_PENDING:
258 /*
259 * Currently we have strict strategy not
260 * permitting multiple jobs for one unit in the
261 * queue at a time.
262 */
263 sysman_log(LVL_ERROR,
264 "Cannot queue multiple jobs for unit '%s'.",
265 unit_name(job->unit));
266 has_error = true;
267 continue;
268 default:
269 assert(false);
270 }
271 } else {
272 // TODO think about other options to merging
273 // (replacing, cancelling)
274 rc = job_pre_merge(other_job, job);
275 if (rc != EOK) {
276 break;
[694253c]277 }
278 }
279 }
280
[72c8f77]281 /* Aggregate merged jobs, or rollback any changes in existing jobs */
282 bool finish_merge = (rc == EOK) && !has_error;
283 dyn_array_foreach(*closure, job_t *, job_it) {
284 if ((*job_it)->merged_into == NULL) {
285 continue;
286 }
287 if (finish_merge) {
288 job_finish_merge((*job_it)->merged_into, *job_it);
289 } else {
290 job_undo_merge((*job_it)->merged_into);
291 }
292 }
293 if (has_error) {
294 return EBUSY;
295 } else if (rc != EOK) {
296 return rc;
297 }
298
299 /* Unmerged jobs are enqueued, merged are disposed */
300 dyn_array_foreach(*closure, job_t *, job_it) {
301 job_t *job = (*job_it);
302 if (job->merged_into != NULL) {
303 job_del_ref(&job);
304 continue;
305 }
306
307
308 unit_t *u = job->unit;
309 assert(u->bfs_job != NULL);
310 assert(u->job == NULL);
311 u->job = u->bfs_job;
312 u->bfs_job = NULL;
313
314
315 job->state = JOB_PENDING;
[8432ae1]316 /* We pass reference from the closure to the queue */
[72c8f77]317 list_append(&job->job_queue, &job_queue);
[694253c]318 }
319
320 return EOK;
321}
322
[5559712]323/** Process all jobs that aren't transitively blocked
[694253c]324 *
[5559712]325 * Job can be blocked either by another job or by an incoming event, that will
326 * be queued after this job_queue_process call.
327 *
328 * TODO Write down rules from where this function can be called, to avoid stack
329 * overflow.
[694253c]330 */
[5559712]331void job_queue_process(void)
[694253c]332{
[5559712]333 job_t *job;
334 while ((job = job_queue_pop_runnable())) {
335 job_run(job);
336 job_del_ref(&job);
[3f7e1f24]337 }
338}
339
340int job_create_closure(job_t *main_job, dyn_array_t *job_closure)
341{
[72c8f77]342 sysman_log(LVL_DEBUG2, "%s(%s)", __func__, unit_name(main_job->unit));
[3f7e1f24]343 int rc;
[dda2602]344 list_t units_fifo;
345 list_initialize(&units_fifo);
[3f7e1f24]346
[72c8f77]347 /* Check invariant */
[dda2602]348 list_foreach(units, units, unit_t, u) {
[72c8f77]349 assert(u->bfs_job == false);
[dda2602]350 }
[5559712]351
[dda2602]352 unit_t *unit = main_job->unit;
[72c8f77]353 job_add_ref(main_job);
354 unit->bfs_job = main_job;
[dda2602]355 list_append(&unit->bfs_link, &units_fifo);
356
357 while (!list_empty(&units_fifo)) {
358 unit = list_get_instance(list_first(&units_fifo), unit_t,
359 bfs_link);
360 list_remove(&unit->bfs_link);
[72c8f77]361 job_t *job = unit->bfs_job;
362 assert(job != NULL);
[3f7e1f24]363
[dda2602]364 job_add_ref(job);
[72c8f77]365 dyn_array_append(job_closure, job_t *, job);
[dda2602]366
367 /*
368 * Traverse dependencies edges
[72c8f77]369 * According to dependency type and edge direction create
370 * appropriate jobs (currently "After" only).
[dda2602]371 */
372 list_foreach(unit->dependencies, dependencies, unit_dependency_t, dep) {
373 unit_t *u = dep->dependency;
374 job_t *blocking_job;
[72c8f77]375
376 if (u->bfs_job == NULL) {
[dda2602]377 blocking_job = job_create(u, job->target_state);
378 if (blocking_job == NULL) {
379 rc = ENOMEM;
380 goto finish;
381 }
[72c8f77]382 /* Pass reference to unit */
383 u->bfs_job = blocking_job;
[dda2602]384 list_append(&u->bfs_link, &units_fifo);
[72c8f77]385 } else {
386 blocking_job = u->bfs_job;
[3f7e1f24]387 }
[72c8f77]388
[dda2602]389 job_add_blocked_job(blocking_job, job);
[3f7e1f24]390 }
391 }
[72c8f77]392// sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit));
393// dyn_array_foreach(*job_closure, job_t *, job_it) {
394// sysman_log(LVL_DEBUG2, "%s\t%s", __func__, unit_name((*job_it)->unit));
395// }
[3f7e1f24]396 rc = EOK;
397
398finish:
[72c8f77]399 /* Unreference any jobs in interrupted BFS queue */
400 list_foreach_safe(units_fifo, cur_link, next_link) {
401 unit_t *u = list_get_instance(cur_link, unit_t, bfs_link);
402 job_del_ref(&u->bfs_job);
403 list_remove(cur_link);
[dda2602]404 }
[72c8f77]405
[694253c]406 return rc;
407}
408
[dda2602]409/** Create job assigned to the unit
410 *
[72c8f77]411 * @param[in] unit
[dda2602]412 * @param[in] target_state
413 *
[72c8f77]414 * @return NULL or newly created job (there is a single refernce for the creator)
[dda2602]415 */
[3f7e1f24]416job_t *job_create(unit_t *u, unit_state_t target_state)
417{
418 job_t *job = malloc(sizeof(job_t));
419 if (job != NULL) {
420 job_init(job, u, target_state);
[8432ae1]421
[dda2602]422 /* Add one reference for the creator */
[8432ae1]423 job_add_ref(job);
[3f7e1f24]424 }
425
426 return job;
427}
428
[8432ae1]429/** Add one reference to job
430 *
431 * Usage:
432 * - adding observer which references the job,
433 * - raising and event that references the job,
434 * - anytime any other new reference is made.
435 */
[c0c388d2]436void job_add_ref(job_t *job)
437{
438 atomic_inc(&job->refcnt);
439}
440
[8432ae1]441/** Remove one reference from job, last remover destroys the job
442 *
443 * Usage:
444 * - inside observer callback that references the job,
445 * - inside event handler that references the job,
446 * - anytime you dispose a reference to the job.
447 */
[c0c388d2]448void job_del_ref(job_t **job_ptr)
449{
450 job_t *job = *job_ptr;
451
[8432ae1]452 assert(job != NULL);
[c0c388d2]453 assert(atomic_get(&job->refcnt) > 0);
454 if (atomic_predec(&job->refcnt) == 0) {
455 job_destroy(job_ptr);
456 }
457}
458
[3f7e1f24]459void job_run(job_t *job)
[694253c]460{
[72c8f77]461 assert(job->state == JOB_PENDING);
[694253c]462
[72c8f77]463 job->state = JOB_RUNNING;
[3f7e1f24]464 unit_t *u = job->unit;
[8432ae1]465 sysman_log(LVL_DEBUG, "%s(%p), %s -> %i",
466 __func__, job, unit_name(u), job->target_state);
[694253c]467
[3f7e1f24]468 /* Propagate failure */
469 if (job->blocking_job_failed) {
470 goto fail;
[694253c]471 }
472
[3f7e1f24]473 int rc;
474 switch (job->target_state) {
475 case STATE_STARTED:
[72c8f77]476 // TODO put here same evaluation as in job_check
477 // goal is to have job_run "idempotent"
478 if (u->state == job->target_state) {
479 rc = EOK;
480 } else {
481 rc = unit_start(u);
482 }
[3f7e1f24]483 break;
484 default:
485 // TODO implement other states
486 assert(false);
487 }
488 if (rc != EOK) {
[63a3276]489 //TODO here is 'rc' value "lost" (not propagated further)
[dda2602]490 sysman_log(LVL_DEBUG, "%s(%p), %s -> %i, error: %i",
491 __func__, job, unit_name(u), job->target_state, rc);
[3f7e1f24]492 goto fail;
[4fe7fcb]493 }
494
[8432ae1]495 /*
496 * job_check deletes reference, we want job to remain to caller, thus
497 * add one dummy ref
498 */
499 job_add_ref(job);
[3f7e1f24]500 job_check(job->unit, job);
501 return;
[4fe7fcb]502
[3f7e1f24]503fail:
504 job->retval = JOB_FAILED;
505 job_finish(job);
[4fe7fcb]506}
507
[3f7e1f24]508/** Unblocks blocked jobs and notify observers
509 *
510 * @param[in] job job with defined return value
511 */
512void job_finish(job_t *job)
[694253c]513{
[3f7e1f24]514 assert(job->state != JOB_FINISHED);
515 assert(job->retval != JOB_UNDEFINED_);
[dda2602]516 assert(job->unit->job == job);
[3f7e1f24]517
[72c8f77]518 sysman_log(LVL_DEBUG2, "%s(%p) %s ret %i",
[8432ae1]519 __func__, job, unit_name(job->unit), job->retval);
[3f7e1f24]520
521 job->state = JOB_FINISHED;
[694253c]522
[8432ae1]523 /* First remove references, then clear the array */
[72c8f77]524 assert(job->blocked_jobs.size == job->blocked_jobs_count);
525 dyn_array_foreach(job->blocked_jobs, job_t *, job_it) {
[3f7e1f24]526 job_unblock(*job_it, job);
[694253c]527 }
[8432ae1]528 dyn_array_clear(&job->blocked_jobs);
[694253c]529
[dda2602]530 /*
531 * Remove job from unit and pass the reference from the unit to the
532 * event.
533 */
534 job->unit->job = NULL;
[5559712]535 sysman_raise_event(&sysman_event_job_finished, job);
[694253c]536}
[3f7e1f24]537
Note: See TracBrowser for help on using the repository browser.