source: mainline/uspace/srv/sysman/job.c@ 4fe7fcb

Last change on this file since 4fe7fcb was 4fe7fcb, checked in by Matthieu Riolo <matthieu.riolo@…>, 6 years ago

Transform unit dependencies to job ordering

  • Property mode set to 100644
File size: 4.8 KB
Line 
1#include <adt/list.h>
2#include <assert.h>
3#include <errno.h>
4#include <fibril.h>
5#include <fibril_synch.h>
6#include <stdio.h>
7#include <stdlib.h>
8
9#include "job.h"
10#include "unit.h"
11
12static list_t job_queue;
13static fibril_mutex_t job_queue_mtx;
14static fibril_condvar_t job_queue_cv;
15
16static void job_destroy(job_t **);
17
18
19static int job_run_start(job_t *job)
20{
21 unit_t *unit = job->unit;
22
23 int rc = unit_start(unit);
24 if (rc != EOK) {
25 return rc;
26 }
27
28 fibril_mutex_lock(&unit->state_mtx);
29 while (unit->state != STATE_STARTED) {
30 fibril_condvar_wait(&unit->state_cv, &unit->state_mtx);
31 }
32 fibril_mutex_unlock(&unit->state_mtx);
33
34 // TODO react to failed state
35 return EOK;
36}
37
38static int job_runner(void *arg)
39{
40 job_t *job = (job_t *)arg;
41
42 int retval = EOK;
43
44 /* Wait for previous jobs */
45 list_foreach(job->blocking_jobs, link, job_link_t, jl) {
46 retval = job_wait(jl->job);
47 if (retval != EOK) {
48 break;
49 }
50 }
51
52 if (retval != EOK) {
53 goto finish;
54 }
55
56 /* Run the job itself */
57 fibril_mutex_lock(&job->state_mtx);
58 job->state = JOB_RUNNING;
59 fibril_condvar_broadcast(&job->state_cv);
60 fibril_mutex_unlock(&job->state_mtx);
61
62 switch (job->type) {
63 case JOB_START:
64 retval = job_run_start(job);
65 break;
66 default:
67 assert(false);
68 }
69
70
71finish:
72 fibril_mutex_lock(&job->state_mtx);
73 job->state = JOB_FINISHED;
74 job->retval = retval;
75 fibril_condvar_broadcast(&job->state_cv);
76 fibril_mutex_unlock(&job->state_mtx);
77
78 job_del_ref(&job);
79
80 return EOK;
81}
82
83static int job_dispatcher(void *arg)
84{
85 fibril_mutex_lock(&job_queue_mtx);
86 while (1) {
87 while (list_empty(&job_queue)) {
88 fibril_condvar_wait(&job_queue_cv, &job_queue_mtx);
89 }
90
91 link_t *link = list_first(&job_queue);
92 assert(link);
93 list_remove(link);
94
95 /*
96 * Note that possible use of fibril pool must hold invariant
97 * that job is started asynchronously. In the case there exists
98 * circular dependency between jobs, it may result in a deadlock.
99 */
100 job_t *job = list_get_instance(link, job_t, link);
101 fid_t runner_fibril = fibril_create(job_runner, job);
102 fibril_add_ready(runner_fibril);
103 }
104
105 fibril_mutex_unlock(&job_queue_mtx);
106 return EOK;
107}
108
109void job_queue_init()
110{
111 list_initialize(&job_queue);
112 fibril_mutex_initialize(&job_queue_mtx);
113 fibril_condvar_initialize(&job_queue_cv);
114
115 fid_t dispatcher_fibril = fibril_create(job_dispatcher, NULL);
116 fibril_add_ready(dispatcher_fibril);
117}
118
119int job_queue_jobs(list_t *jobs)
120{
121 fibril_mutex_lock(&job_queue_mtx);
122
123 /* Check consistency with queue. */
124 list_foreach(*jobs, link, job_t, new_job) {
125 list_foreach(job_queue, link, job_t, queued_job) {
126 /*
127 * Currently we have strict strategy not permitting
128 * multiple jobs for one unit in the queue.
129 */
130 if (new_job->unit == queued_job->unit) {
131 return EEXIST;
132 }
133 }
134 }
135
136 /* Enqueue jobs */
137 list_foreach_safe(*jobs, cur_link, next_lin) {
138 list_remove(cur_link);
139 list_append(cur_link, &job_queue);
140 }
141
142 /* Only job dispatcher waits, it's correct to notify one only. */
143 fibril_condvar_signal(&job_queue_cv);
144 fibril_mutex_unlock(&job_queue_mtx);
145
146 return EOK;
147}
148
149/** Blocking wait for job finishing.
150 *
151 * Multiple fibrils may wait for the same job.
152 *
153 * @return Return code of the job
154 */
155int job_wait(job_t *job)
156{
157 fibril_mutex_lock(&job->state_mtx);
158 while (job->state != JOB_FINISHED) {
159 fibril_condvar_wait(&job->state_cv, &job->state_mtx);
160 }
161
162 int rc = job->retval;
163 fibril_mutex_unlock(&job->state_mtx);
164
165 return rc;
166}
167
168void job_add_ref(job_t *job)
169{
170 atomic_inc(&job->refcnt);
171}
172
173void job_del_ref(job_t **job_ptr)
174{
175 job_t *job = *job_ptr;
176 if (job == NULL) {
177 return;
178 }
179
180 assert(atomic_get(&job->refcnt) > 0);
181 if (atomic_predec(&job->refcnt) == 0) {
182 job_destroy(job_ptr);
183 }
184}
185
186static void job_init(job_t *job, job_type_t type)
187{
188 assert(job);
189
190 link_initialize(&job->link);
191 list_initialize(&job->blocking_jobs);
192
193 /* Start with one reference for the creator */
194 atomic_set(&job->refcnt, 1);
195
196 job->type = type;
197 job->unit = NULL;
198
199 job->state = JOB_WAITING;
200 fibril_mutex_initialize(&job->state_mtx);
201 fibril_condvar_initialize(&job->state_cv);
202}
203
204job_t *job_create(job_type_t type)
205{
206 job_t *job = malloc(sizeof(job_t));
207 if (job != NULL) {
208 job_init(job, type);
209 }
210
211 return job;
212}
213
214int job_add_blocking_job(job_t *job, job_t *blocking_job)
215{
216 job_link_t *job_link = malloc(sizeof(job_link_t));
217 if (job_link == NULL) {
218 return ENOMEM;
219 }
220
221 link_initialize(&job_link->link);
222 list_append(&job_link->link, &job->blocking_jobs);
223
224 job_link->job = blocking_job;
225 job_add_ref(blocking_job);
226
227 return EOK;
228}
229
230static void job_destroy(job_t **job_ptr)
231{
232 job_t *job = *job_ptr;
233 if (job == NULL) {
234 return;
235 }
236
237 list_foreach_safe(job->blocking_jobs, cur_link, next_link) {
238 job_link_t *jl = list_get_instance(cur_link, job_link_t, link);
239 list_remove(cur_link);
240 job_del_ref(&jl->job);
241 free(jl);
242 }
243 free(job);
244
245 *job_ptr = NULL;
246}
Note: See TracBrowser for help on using the repository browser.