source: mainline/uspace/srv/sysman/job.c@ 63a3276

Last change on this file since 63a3276 was 63a3276, checked in by Matthieu Riolo <matthieu.riolo@…>, 6 years ago

sysman: Instrumented locsrv for autostart

  • also refactored unit name derivation in other brokers
  • exposee creation is not used in unit's lifecycle (failed assertion)

Conflicts:

uspace/lib/c/generic/loc.c
uspace/srv/devman/driver.c
uspace/srv/devman/drv_conn.c
uspace/srv/hid/compositor/compositor.c
uspace/srv/locsrv/locsrv.c
uspace/srv/vfs/vfs.h
uspace/srv/vfs/vfs_ops.c
uspace/srv/vfs/vfs_register.c

  • Property mode set to 100644
File size: 12.5 KB
Line 
1/*
2 * Copyright (c) 2015 Michal Koutny
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include <adt/list.h>
30#include <assert.h>
31#include <errno.h>
32#include <stdlib.h>
33
34#include "configuration.h"
35#include "dep.h"
36#include "job.h"
37#include "log.h"
38#include "sysman.h"
39
40static list_t job_queue;
41
42/*
43 * Static functions
44 */
45
46static int job_add_blocked_job(job_t *blocking_job, job_t *blocked_job)
47{
48 assert(blocking_job->blocked_jobs.size ==
49 blocking_job->blocked_jobs_count);
50
51 int rc = dyn_array_append(&blocking_job->blocked_jobs, job_t *,
52 blocked_job);
53 if (rc != EOK) {
54 return ENOMEM;
55 }
56 job_add_ref(blocked_job);
57
58 blocking_job->blocked_jobs_count += 1;
59 blocked_job->blocking_jobs += 1;
60
61 return EOK;
62}
63
64/** Remove blocking_job from blocked job structure
65 *
66 * @note Caller must remove blocked_job from collection of blocked_jobs
67 */
68static void job_unblock(job_t *blocked_job, job_t *blocking_job)
69{
70 if (blocking_job->retval == JOB_FAILED) {
71 blocked_job->blocking_job_failed = true;
72 }
73 blocked_job->blocking_jobs -= 1;
74
75 job_del_ref(&blocked_job);
76}
77
78static void job_init(job_t *job, unit_t *u, unit_state_t target_state)
79{
80 assert(job);
81 assert(u);
82 assert(u->job == NULL);
83 memset(job, 0, sizeof(*job));
84
85 link_initialize(&job->job_queue);
86
87 atomic_set(&job->refcnt, 0);
88
89 job->target_state = target_state;
90 job->unit = u;
91
92 dyn_array_initialize(&job->blocked_jobs, job_t *);
93 job->blocking_jobs = 0;
94 job->blocking_job_failed = false;
95
96 job->state = JOB_EMBRYO;
97 job->retval = JOB_UNDEFINED_;
98}
99
100static bool job_eval_retval(job_t *job)
101{
102 unit_t *u = job->unit;
103
104 if (u->state == job->target_state) {
105 job->retval = JOB_OK;
106 return true;
107 } else if (u->state == STATE_FAILED) {
108 job->retval = JOB_FAILED;
109 return true;
110 } else {
111 return false;
112 }
113}
114
115static void job_check(void *object, void *data)
116{
117 unit_t *u = object;
118 job_t *job = data;
119
120 /*
121 * We have one reference from caller for our disposal,
122 * if needed, pass it to observer.
123 */
124 if (job_eval_retval(job)) {
125 job_finish(job);
126 job_del_ref(&job);
127 } else {
128 // TODO place for timeout
129 sysman_object_observer(u, &job_check, job);
130 }
131}
132
133static void job_destroy(job_t **job_ptr)
134{
135 job_t *job = *job_ptr;
136 if (job == NULL) {
137 return;
138 }
139
140 assert(!link_used(&job->job_queue));
141
142 dyn_array_foreach(job->blocked_jobs, job_t *, job_it) {
143 job_del_ref(&(*job_it));
144 }
145 dyn_array_destroy(&job->blocked_jobs);
146
147 free(job);
148 *job_ptr = NULL;
149}
150
151static bool job_is_runnable(job_t *job)
152{
153 assert(job->state == JOB_PENDING);
154 return job->blocking_jobs == 0;
155}
156
157/** Pop next runnable job
158 *
159 * @return runnable job or NULL when there's none
160 */
161static job_t *job_queue_pop_runnable(void)
162{
163 job_t *result = NULL;
164
165 /* Select first runnable job */
166 list_foreach(job_queue, job_queue, job_t, candidate) {
167 if (job_is_runnable(candidate)) {
168 result = candidate;
169 break;
170 }
171 }
172 if (result) {
173 /* Remove job from queue and pass reference to caller */
174 list_remove(&result->job_queue);
175 }
176
177 return result;
178}
179
180/** Merge two jobs together
181 *
182 * @param[in/out] trunk job that
183 * @param[in] other job that will be cleared out
184 *
185 * @return EOK on success
186 * @return error code on fail
187 */
188static int job_pre_merge(job_t *trunk, job_t *other)
189{
190 assert(trunk->unit == other->unit);
191 assert(trunk->target_state == other->target_state);
192 assert(trunk->blocked_jobs.size == trunk->blocked_jobs_count);
193 assert(other->merged_into == NULL);
194
195 int rc = dyn_array_concat(&trunk->blocked_jobs, &other->blocked_jobs);
196 if (rc != EOK) {
197 return rc;
198 }
199 dyn_array_clear(&other->blocked_jobs);
200
201 // TODO allocate observed object
202
203 other->merged_into = trunk;
204
205 return EOK;
206}
207
208static void job_finish_merge(job_t *trunk, job_t *other)
209{
210 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);
211 //TODO aggregate merged blocked_jobs
212 trunk->blocked_jobs_count = other->blocked_jobs.size;
213
214 /* All allocation is done in job_pre_merge, cannot fail here. */
215 int rc = sysman_move_observers(other, trunk);
216 assert(rc == EOK);
217}
218
219static void job_undo_merge(job_t *trunk)
220{
221 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);
222 dyn_array_clear_range(&trunk->blocked_jobs,
223 trunk->blocked_jobs_count, trunk->blocked_jobs.size);
224}
225
226/*
227 * Non-static functions
228 */
229
230void job_queue_init()
231{
232 list_initialize(&job_queue);
233}
234
235int job_queue_add_closure(dyn_array_t *closure)
236{
237 bool has_error = false;
238 int rc = EOK;
239
240 /* Check consistency with existing jobs. */
241 dyn_array_foreach(*closure, job_t *, job_it) {
242 job_t *job = *job_it;
243 job_t *other_job = job->unit->job;
244
245 if (other_job == NULL) {
246 continue;
247 }
248
249 if (other_job->target_state != job->target_state) {
250 switch (other_job->state) {
251 case JOB_RUNNING:
252 sysman_log(LVL_ERROR,
253 "Unit '%s' has already different job running.",
254 unit_name(job->unit));
255 has_error = true;
256 continue;
257 case JOB_PENDING:
258 /*
259 * Currently we have strict strategy not
260 * permitting multiple jobs for one unit in the
261 * queue at a time.
262 */
263 sysman_log(LVL_ERROR,
264 "Cannot queue multiple jobs for unit '%s'.",
265 unit_name(job->unit));
266 has_error = true;
267 continue;
268 default:
269 assert(false);
270 }
271 } else {
272 // TODO think about other options to merging
273 // (replacing, cancelling)
274 rc = job_pre_merge(other_job, job);
275 if (rc != EOK) {
276 break;
277 }
278 }
279 }
280
281 /* Aggregate merged jobs, or rollback any changes in existing jobs */
282 bool finish_merge = (rc == EOK) && !has_error;
283 dyn_array_foreach(*closure, job_t *, job_it) {
284 if ((*job_it)->merged_into == NULL) {
285 continue;
286 }
287 if (finish_merge) {
288 job_finish_merge((*job_it)->merged_into, *job_it);
289 } else {
290 job_undo_merge((*job_it)->merged_into);
291 }
292 }
293 if (has_error) {
294 return EBUSY;
295 } else if (rc != EOK) {
296 return rc;
297 }
298
299 /* Unmerged jobs are enqueued, merged are disposed */
300 dyn_array_foreach(*closure, job_t *, job_it) {
301 job_t *job = (*job_it);
302 if (job->merged_into != NULL) {
303 job_del_ref(&job);
304 continue;
305 }
306
307
308 unit_t *u = job->unit;
309 assert(u->bfs_job != NULL);
310 assert(u->job == NULL);
311 u->job = u->bfs_job;
312 u->bfs_job = NULL;
313
314
315 job->state = JOB_PENDING;
316 /* We pass reference from the closure to the queue */
317 list_append(&job->job_queue, &job_queue);
318 }
319
320 return EOK;
321}
322
323/** Process all jobs that aren't transitively blocked
324 *
325 * Job can be blocked either by another job or by an incoming event, that will
326 * be queued after this job_queue_process call.
327 *
328 * TODO Write down rules from where this function can be called, to avoid stack
329 * overflow.
330 */
331void job_queue_process(void)
332{
333 job_t *job;
334 while ((job = job_queue_pop_runnable())) {
335 job_run(job);
336 job_del_ref(&job);
337 }
338}
339
340int job_create_closure(job_t *main_job, dyn_array_t *job_closure)
341{
342 sysman_log(LVL_DEBUG2, "%s(%s)", __func__, unit_name(main_job->unit));
343 int rc;
344 list_t units_fifo;
345 list_initialize(&units_fifo);
346
347 /* Check invariant */
348 list_foreach(units, units, unit_t, u) {
349 assert(u->bfs_job == false);
350 }
351
352 unit_t *unit = main_job->unit;
353 job_add_ref(main_job);
354 unit->bfs_job = main_job;
355 list_append(&unit->bfs_link, &units_fifo);
356
357 while (!list_empty(&units_fifo)) {
358 unit = list_get_instance(list_first(&units_fifo), unit_t,
359 bfs_link);
360 list_remove(&unit->bfs_link);
361 job_t *job = unit->bfs_job;
362 assert(job != NULL);
363
364 job_add_ref(job);
365 dyn_array_append(job_closure, job_t *, job);
366
367 /*
368 * Traverse dependencies edges
369 * According to dependency type and edge direction create
370 * appropriate jobs (currently "After" only).
371 */
372 list_foreach(unit->dependencies, dependencies, unit_dependency_t, dep) {
373 unit_t *u = dep->dependency;
374 job_t *blocking_job;
375
376 if (u->bfs_job == NULL) {
377 blocking_job = job_create(u, job->target_state);
378 if (blocking_job == NULL) {
379 rc = ENOMEM;
380 goto finish;
381 }
382 /* Pass reference to unit */
383 u->bfs_job = blocking_job;
384 list_append(&u->bfs_link, &units_fifo);
385 } else {
386 blocking_job = u->bfs_job;
387 }
388
389 job_add_blocked_job(blocking_job, job);
390 }
391 }
392// sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit));
393// dyn_array_foreach(*job_closure, job_t *, job_it) {
394// sysman_log(LVL_DEBUG2, "%s\t%s", __func__, unit_name((*job_it)->unit));
395// }
396 rc = EOK;
397
398finish:
399 /* Unreference any jobs in interrupted BFS queue */
400 list_foreach_safe(units_fifo, cur_link, next_link) {
401 unit_t *u = list_get_instance(cur_link, unit_t, bfs_link);
402 job_del_ref(&u->bfs_job);
403 list_remove(cur_link);
404 }
405
406 return rc;
407}
408
409/** Create job assigned to the unit
410 *
411 * @param[in] unit
412 * @param[in] target_state
413 *
414 * @return NULL or newly created job (there is a single refernce for the creator)
415 */
416job_t *job_create(unit_t *u, unit_state_t target_state)
417{
418 job_t *job = malloc(sizeof(job_t));
419 if (job != NULL) {
420 job_init(job, u, target_state);
421
422 /* Add one reference for the creator */
423 job_add_ref(job);
424 }
425
426 return job;
427}
428
429/** Add one reference to job
430 *
431 * Usage:
432 * - adding observer which references the job,
433 * - raising and event that references the job,
434 * - anytime any other new reference is made.
435 */
436void job_add_ref(job_t *job)
437{
438 atomic_inc(&job->refcnt);
439}
440
441/** Remove one reference from job, last remover destroys the job
442 *
443 * Usage:
444 * - inside observer callback that references the job,
445 * - inside event handler that references the job,
446 * - anytime you dispose a reference to the job.
447 */
448void job_del_ref(job_t **job_ptr)
449{
450 job_t *job = *job_ptr;
451
452 assert(job != NULL);
453 assert(atomic_get(&job->refcnt) > 0);
454 if (atomic_predec(&job->refcnt) == 0) {
455 job_destroy(job_ptr);
456 }
457}
458
459void job_run(job_t *job)
460{
461 assert(job->state == JOB_PENDING);
462
463 job->state = JOB_RUNNING;
464 unit_t *u = job->unit;
465 sysman_log(LVL_DEBUG, "%s(%p), %s -> %i",
466 __func__, job, unit_name(u), job->target_state);
467
468 /* Propagate failure */
469 if (job->blocking_job_failed) {
470 goto fail;
471 }
472
473 int rc;
474 switch (job->target_state) {
475 case STATE_STARTED:
476 // TODO put here same evaluation as in job_check
477 // goal is to have job_run "idempotent"
478 if (u->state == job->target_state) {
479 rc = EOK;
480 } else {
481 rc = unit_start(u);
482 }
483 break;
484 default:
485 // TODO implement other states
486 assert(false);
487 }
488 if (rc != EOK) {
489 //TODO here is 'rc' value "lost" (not propagated further)
490 sysman_log(LVL_DEBUG, "%s(%p), %s -> %i, error: %i",
491 __func__, job, unit_name(u), job->target_state, rc);
492 goto fail;
493 }
494
495 /*
496 * job_check deletes reference, we want job to remain to caller, thus
497 * add one dummy ref
498 */
499 job_add_ref(job);
500 job_check(job->unit, job);
501 return;
502
503fail:
504 job->retval = JOB_FAILED;
505 job_finish(job);
506}
507
508/** Unblocks blocked jobs and notify observers
509 *
510 * @param[in] job job with defined return value
511 */
512void job_finish(job_t *job)
513{
514 assert(job->state != JOB_FINISHED);
515 assert(job->retval != JOB_UNDEFINED_);
516 assert(job->unit->job == job);
517
518 sysman_log(LVL_DEBUG2, "%s(%p) %s ret %i",
519 __func__, job, unit_name(job->unit), job->retval);
520
521 job->state = JOB_FINISHED;
522
523 /* First remove references, then clear the array */
524 assert(job->blocked_jobs.size == job->blocked_jobs_count);
525 dyn_array_foreach(job->blocked_jobs, job_t *, job_it) {
526 job_unblock(*job_it, job);
527 }
528 dyn_array_clear(&job->blocked_jobs);
529
530 /*
531 * Remove job from unit and pass the reference from the unit to the
532 * event.
533 */
534 job->unit->job = NULL;
535 sysman_raise_event(&sysman_event_job_finished, job);
536}
537
Note: See TracBrowser for help on using the repository browser.