source: mainline/uspace/lib/cpp/include/__bits/thread/future.hpp@ 627dc41

lfn serial ticket/834-toolchain-update topic/msim-upgrade topic/simplify-dev-export
Last change on this file since 627dc41 was 627dc41, checked in by Jaroslav Jindrak <dzejrou@…>, 6 years ago

cpp: add deferred/async shared state for asynchronous function execution and implement async

  • Property mode set to 100644
File size: 24.0 KB
Line 
1/*
2 * Copyright (c) 2019 Jaroslav Jindrak
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#ifndef LIBCPP_BITS_THREAD_FUTURE
30#define LIBCPP_BITS_THREAD_FUTURE
31
32#include <__bits/functional/function.hpp>
33#include <__bits/functional/invoke.hpp>
34#include <__bits/refcount_obj.hpp>
35#include <__bits/thread/threading.hpp>
36#include <cassert>
37#include <memory>
38#include <system_error>
39#include <thread>
40#include <tuple>
41#include <type_traits>
42#include <utility>
43
44namespace std
45{
46 /**
47 * 30.6, futures:
48 */
49
50 enum class future_errc
51 { // The 5001 start is to not collide with system_error's codes.
52 broken_promise = 5001,
53 future_already_retrieved,
54 promise_already_satisfied,
55 no_state
56 };
57
58 enum class launch
59 {
60 async,
61 deferred
62 };
63
64 enum class future_status
65 {
66 ready,
67 timeout,
68 deferred
69 };
70
71 /**
72 * 30.6.2, error handling:
73 */
74
75 template<>
76 struct is_error_code_enum<future_errc>: true_type
77 { /* DUMMY BODY */ };
78
79 error_code make_error_code(future_errc) noexcept;
80 error_condition make_error_condition(future_errc) noexcept;
81
82 const error_category& future_category() noexcept;
83
84 /**
85 * 30.6.3, class future_error:
86 */
87
88 class future_error: public logic_error
89 {
90 public:
91 future_error(error_code ec);
92
93 const error_code& code() const noexcept;
94 const char* what() const noexcept;
95
96 private:
97 error_code code_;
98 };
99
100 /**
101 * 30.6.4, shared state:
102 */
103
104 namespace aux
105 {
106 template<class R>
107 class shared_state: public aux::refcount_obj
108 {
109 public:
110 shared_state()
111 : mutex_{}, condvar_{}, value_{}, value_set_{false},
112 exception_{}, has_exception_{false}
113 {
114 threading::mutex::init(mutex_);
115 threading::condvar::init(condvar_);
116 }
117
118 void destroy() override
119 {
120 /**
121 * Note: No need to act in this case, async shared
122 * state is the object that needs to sometimes
123 * invoke its payload.
124 */
125 }
126
127 void set_value(const R& val, bool set)
128 {
129 /**
130 * Note: This is the 'mark ready' move described
131 * in 30.6.4 (6).
132 */
133
134 aux::threading::mutex::lock(mutex_);
135 value_ = val;
136 value_set_ = set;
137 aux::threading::mutex::unlock(mutex_);
138
139 aux::threading::condvar::broadcast(condvar_);
140 }
141
142 void set_value(R&& val, bool set = true)
143 {
144 aux::threading::mutex::lock(mutex_);
145 value_ = std::move(val);
146 value_set_ = set;
147 aux::threading::mutex::unlock(mutex_);
148
149 aux::threading::condvar::broadcast(condvar_);
150 }
151
152 void mark_set(bool set = true) noexcept
153 {
154 value_set_ = set;
155 }
156
157 bool is_set() const noexcept
158 {
159 return value_set_;
160 }
161
162 R& get()
163 {
164 return value_;
165 }
166
167 void set_exception(exception_ptr ptr)
168 {
169 exception_ = ptr;
170 has_exception_ = true;
171 }
172
173 bool has_exception() const noexcept
174 {
175 return has_exception_;
176 }
177
178 void throw_stored_exception() const
179 {
180 // TODO: implement
181 }
182
183 /**
184 * TODO: This member function is supposed to be marked
185 * as 'const'. In such a case, however, we cannot
186 * use the underlying fibril API because these
187 * references get converted to pointers and the API
188 * does not accept e.g. 'const fibril_condvar_t*'.
189 *
190 * The same applies to the wait_for and wait_until
191 * functions.
192 */
193 virtual void wait()
194 {
195 aux::threading::mutex::lock(mutex_);
196 while (!value_set_)
197 aux::threading::condvar::wait(condvar_, mutex_);
198 aux::threading::mutex::unlock(mutex_);
199 }
200
201 template<class Rep, class Period>
202 bool wait_for(const chrono::duration<Rep, Period>& rel_time)
203 {
204 aux::threading::mutex::lock(mutex_);
205 aux::threading::condvar::wait_for(
206 condvar_, mutex_,
207 aux::threading::time::convert(rel_time)
208 );
209 aux::threading::mutex::unlock(mutex_);
210
211 return value_set_;
212 }
213
214 template<class Clock, class Duration>
215 bool wait_until(const chrono::time_point<Clock, Duration>& abs_time)
216 {
217 aux::threading::mutex::lock(mutex_);
218 aux::threading::condvar::wait_for(
219 condvar_, mutex_,
220 aux::threading::time::convert(abs_time - Clock::now())
221 );
222 aux::threading::mutex::unlock(mutex_);
223
224 return value_set_;
225 }
226
227 ~shared_state() override = default;
228
229 private:
230 aux::mutex_t mutex_;
231 aux::condvar_t condvar_;
232
233 R value_;
234 bool value_set_;
235
236 exception_ptr exception_;
237 bool has_exception_;
238 };
239
240 /**
241 * We could make one state for both async and
242 * deferred policies, but then we would be wasting
243 * memory and the only benefit would be the ability
244 * for additional implementation defined policies done
245 * directly in that state (as opposed to making new
246 * states for them).
247 *
248 * But since we have no plan (nor need) to make those,
249 * this approach seems to be the best one.
250 * TODO: Override wait_for and wait_until in both!
251 */
252
253 template<class R, class F, class... Args>
254 class async_shared_state: public shared_state<R>
255 {
256 public:
257 async_shared_state(F&& f, Args&&... args)
258 : shared_state<R>{}, thread_{}
259 {
260 thread_ = thread{
261 [=](){
262 try
263 {
264 this->set_value(invoke(f, args...));
265 }
266 catch(...) // TODO: Any exception.
267 {
268 // TODO: Store it.
269 }
270 }
271 };
272 }
273
274 void destroy() override
275 {
276 if (!this->is_set())
277 thread_.join();
278 }
279
280 void wait() override
281 {
282 if (!this->is_set())
283 thread_.join();
284 }
285
286 ~async_shared_state() override
287 {
288 destroy();
289 }
290
291 private:
292 thread thread_;
293 };
294
295 template<class R, class F, class... Args>
296 class deferred_shared_state: public shared_state<R>
297 {
298 public:
299 template<class G>
300 deferred_shared_state(G&& f, Args&&... args)
301 : shared_state<R>{}, func_{forward<F>(f)},
302 args_{forward<Args>(args)...}
303 { /* DUMMY BODY */ }
304
305 void destroy() override
306 {
307 if (!this->is_set())
308 invoke_(make_index_sequence<sizeof...(Args)>{});
309 }
310
311 void wait() override
312 {
313 // TODO: Should these be synchronized for async?
314 if (!this->is_set())
315 invoke_(make_index_sequence<sizeof...(Args)>{});
316 }
317
318 ~deferred_shared_state() override
319 {
320 destroy();
321 }
322
323 private:
324 function<R(decay_t<Args>...)> func_;
325 tuple<decay_t<Args>...> args_;
326
327 template<size_t... Is>
328 void invoke_(index_sequence<Is...>)
329 {
330 try
331 {
332 this->set_value(invoke(move(func_), get<Is>(move(args_))...));
333 }
334 catch(...)
335 {
336 // TODO: Store it.
337 }
338 }
339 };
340 }
341
342 template<class R>
343 class future;
344
345 template<class R>
346 class promise
347 {
348 public:
349 promise()
350 : state_{new aux::shared_state<R>{}}
351 { /* DUMMY BODY */ }
352
353 template<class Allocator>
354 promise(allocator_arg_t, const Allocator& a)
355 : promise{}
356 {
357 // TODO: Use the allocator.
358 }
359
360 promise(promise&& rhs) noexcept
361 : state_{}
362 {
363 state_ = rhs.state_;
364 rhs.state_ = nullptr;
365 }
366
367 promise(const promise&) = delete;
368
369 ~promise()
370 {
371 abandon_state_();
372 }
373
374 promise& operator=(promise&& rhs) noexcept
375 {
376 abandon_state_();
377 promise{std::move(rhs)}.swap(*this);
378 }
379
380 promise& operator=(const promise&) = delete;
381
382 void swap(promise& other) noexcept
383 {
384 std::swap(state_, other.state_);
385 }
386
387 future<R> get_future()
388 {
389 return future<R>{state_};
390 }
391
392 void set_value(const R& val)
393 {
394 if (!state_)
395 throw future_error{make_error_code(future_errc::no_state)};
396 if (state_->is_set())
397 {
398 throw future_error{
399 make_error_code(future_errc::promise_already_satisfied)
400 };
401 }
402
403 state_->set_value(val, true);
404 }
405
406 void set_value(R&& val)
407 {
408 if (!state_)
409 throw future_error{make_error_code(future_errc::no_state)};
410 if (state_->is_set())
411 {
412 throw future_error{
413 make_error_code(future_errc::promise_already_satisfied)
414 };
415 }
416
417 state_->set_value(std::forward<R>(val), true);
418 }
419
420 void set_exception(exception_ptr ptr)
421 {
422 assert(state_);
423
424 state_->set_exception(ptr);
425 }
426
427 void set_value_at_thread_exit(const R& val)
428 {
429 if (!state_)
430 throw future_error{make_error_code(future_errc::no_state)};
431 if (state_->is_set())
432 {
433 throw future_error{
434 make_error_code(future_errc::promise_already_satisfied)
435 };
436 }
437
438 state_->set_value(val, false);
439 // TODO: schedule it to be set as ready when thread exits
440 }
441
442 void set_value_at_thread_exit(R&& val)
443 {
444 if (!state_)
445 throw future_error{make_error_code(future_errc::no_state)};
446 if (state_->is_set())
447 {
448 throw future_error{
449 make_error_code(future_errc::promise_already_satisfied)
450 };
451 }
452
453 state_->set_value(std::forward<R>(val), false);
454 // TODO: schedule it to be set as ready when thread exits
455 }
456
457 void set_exception_at_thread_exit(exception_ptr)
458 {
459 // TODO: No exception handling, no-op at this time.
460 }
461
462 private:
463 void abandon_state_()
464 {
465 /**
466 * Note: This is the 'abandon' move described in
467 * 30.6.4 (7).
468 * 1) If state is not ready:
469 * a) Store exception of type future_error with
470 * error condition broken_promise.
471 * b) Mark state as ready.
472 * 2) Rekease the state.
473 */
474 }
475
476 aux::shared_state<R>* state_;
477 };
478
479 template<class R>
480 class promise<R&>
481 {
482 // TODO: Copy & modify once promise is done.
483 };
484
485 template<>
486 class promise<void>
487 {
488 // TODO: Copy & modify once promise is done.
489 };
490
491 template<class R>
492 void swap(promise<R>& lhs, promise<R>& rhs) noexcept
493 {
494 lhs.swap(rhs);
495 }
496
497 template<class R, class Alloc>
498 struct uses_allocator<promise<R>, Alloc>: true_type
499 { /* DUMMY BODY */ };
500
501 template<class R>
502 class shared_future;
503
504 template<class R>
505 class future
506 {
507 public:
508 future() noexcept
509 : state_{nullptr}
510 { /* DUMMY BODY */ }
511
512 future(const future&) = delete;
513
514 future(future&& rhs) noexcept
515 : state_{std::move(rhs.state_)}
516 {
517 rhs.state_ = nullptr;
518 }
519
520 future(aux::shared_state<R>* state)
521 : state_{state}
522 {
523 /**
524 * Note: This is a custom non-standard constructor that allows
525 * us to create a future directly from a shared state. This
526 * should never be a problem as aux::shared_state is a private
527 * type and future has no constructor templates.
528 */
529 }
530
531 ~future()
532 {
533 release_state_();
534 }
535
536 future& operator=(const future) = delete;
537
538 future& operator=(future&& rhs) noexcept
539 {
540 release_state_();
541 state_ = std::move(rhs.state_);
542 rhs.state_ = nullptr;
543 }
544
545 shared_future<R> share()
546 {
547 return shared_future<R>(std::move(*this));
548 }
549
550 R get()
551 {
552 assert(state_);
553
554 wait();
555
556 if (state_->has_exception())
557 state_->throw_stored_exception();
558 auto res = std::move(state_->get());
559
560 release_state_();
561
562 return res;
563 }
564
565 bool valid() const noexcept
566 {
567 return state_ != nullptr;
568 }
569
570 void wait() const noexcept
571 {
572 assert(state_);
573
574 state_->wait();
575 }
576
577 template<class Rep, class Period>
578 future_status wait_for(const chrono::duration<Rep, Period>& rel_time) const
579 {
580 assert(state_);
581 if (state_->is_deffered_function)
582 return future_status::deferred;
583
584 auto res = state_->wait_for(rel_time);
585
586 if (res)
587 return future_status::ready;
588 else
589 return future_status::timeout;
590 }
591
592 template<class Clock, class Duration>
593 future_status wait_until(const chrono::time_point<Clock, Duration>& abs_time) const
594 {
595 assert(state_);
596 if (state_->is_deffered_function)
597 return future_status::deferred;
598
599 auto res = state_->wait_until(abs_time);
600
601 if (res)
602 return future_status::ready;
603 else
604 return future_status::timeout;
605 }
606
607 private:
608 void release_state_()
609 {
610 if (!state_)
611 return;
612
613 /**
614 * Note: This is the 'release' move described in
615 * 30.6.4 (5).
616 * Last reference to state -> destroy state.
617 * Decrement refcount of state otherwise.
618 * Will not block, unless all following hold:
619 * 1) State was created by call to std::async.
620 * 2) State is not yet ready.
621 * 3) This was the last reference to the shared state.
622 */
623 if (state_->decrement())
624 {
625 /**
626 * The destroy call handles the special case
627 * when 1) - 3) hold.
628 */
629 state_->destroy();
630 delete state_;
631 state_ = nullptr;
632 }
633 }
634
635 aux::shared_state<R>* state_;
636 };
637
638 template<class R>
639 class future<R&>
640 {
641 // TODO: Copy & modify once future is done.
642 };
643
644 template<>
645 class future<void>
646 {
647 // TODO: Copy & modify once future is done.
648 };
649
650 // TODO: Make sure the move-future constructor of shared_future
651 // invalidates the state (i.e. sets to nullptr).
652 template<class R>
653 class shared_future
654 {
655 // TODO: Copy & modify once future is done.
656 };
657
658 template<class R>
659 class shared_future<R&>
660 {
661 // TODO: Copy & modify once future is done.
662 };
663
664 template<>
665 class shared_future<void>
666 {
667 // TODO: Copy & modify once future is done.
668 };
669
670 template<class>
671 class packaged_task; // undefined
672
673 template<class R, class... Args>
674 class packaged_task<R(Args...)>
675 {
676 packaged_task() noexcept
677 {}
678
679 template<class F>
680 explicit packaged_task(F&& f)
681 {}
682
683 template<class F, class Allocator>
684 explicit packaged_task(allocator_arg_t, const Allocator& a, F&& f)
685 {}
686
687 ~packaged_task()
688 {}
689
690 packaged_task(const packaged_task&) = delete;
691 packaged_task& operator=(const packaged_task&) = delete;
692
693 packaged_task(packaged_task&& rhs)
694 {}
695
696 packaged_task& operator=(packaged_task&& rhs)
697 {}
698
699 void swap(packaged_task& other) noexcept
700 {}
701
702 bool valid() const noexcept
703 {}
704
705 future<R> get_future()
706 {}
707
708 void operator()(Args...)
709 {}
710
711 void make_ready_at_thread_exit(Args...)
712 {}
713
714 void reset()
715 {}
716 };
717
718 template<class R, class... Args>
719 void swap(packaged_task<R(Args...)>& lhs, packaged_task<R(Args...)>& rhs) noexcept
720 {
721 lhs.swap(rhs);
722 };
723
724 template<class R, class Alloc>
725 struct uses_allocator<packaged_task<R>, Alloc>: true_type
726 { /* DUMMY BODY */ };
727
728 namespace aux
729 {
730 /**
731 * Note: The reason we keep the actual function
732 * within the aux namespace is that were the non-policy
733 * version of the function call the other one in the std
734 * namespace, we'd get resolution conflicts. This way
735 * aux::async is properly called even if std::async is
736 * called either with or without a launch policy.
737 */
738 template<class F, class... Args>
739 future<result_of_t<decay_t<F>(decay_t<Args>...)>>
740 async(launch policy, F&& f, Args&&... args)
741 {
742 using result_t = result_of_t<decay_t<F>(decay_t<Args>...)>;
743
744 bool async = (static_cast<int>(policy) &
745 static_cast<int>(launch::async)) != 0;
746 bool deferred = (static_cast<int>(policy) &
747 static_cast<int>(launch::deferred)) != 0;
748
749 /**
750 * Note: The case when async | deferred is set in policy
751 * is implementation defined, feel free to change.
752 */
753 if (async && deferred)
754 {
755 return future<result_t>{
756 new aux::deferred_shared_state<
757 result_t, F, Args...
758 >{forward<F>(f), forward<Args>(args)...}
759 };
760 }
761 else if (async)
762 {
763 return future<result_t>{
764 new aux::async_shared_state<
765 result_t, F, Args...
766 >{forward<F>(f), forward<Args>(args)...}
767 };
768 }
769 else if (deferred)
770 {
771 /**
772 * Duplicated on purpose because of the default.
773 * Do not remove!
774 */
775 return future<result_t>{
776 new aux::deferred_shared_state<
777 result_t, F, Args...
778 >{forward<F>(f), forward<Args>(args)...}
779 };
780 }
781
782 /**
783 * This is undefined behaviour, let's be nice though ;)
784 */
785 return future<result_t>{
786 new aux::deferred_shared_state<
787 result_t, F, Args...
788 >{forward<F>(f), forward<Args>(args)...}
789 };
790 }
791 }
792
793 template<class F>
794 decltype(auto) async(F&& f)
795 {
796 launch policy = static_cast<launch>(
797 static_cast<int>(launch::async) |
798 static_cast<int>(launch::deferred)
799 );
800
801 return aux::async(policy, forward<F>(f));
802 }
803
804 /**
805 * The async(launch, F, Args...) and async(F, Args...)
806 * overloards must not collide, so we check the first template
807 * argument and handle the special case of just a functor
808 * above.
809 */
810 template<class F, class Arg, class... Args>
811 decltype(auto) async(F&& f, Arg&& arg, Args&&... args)
812 {
813 if constexpr (is_same_v<decay_t<F>, launch>)
814 return aux::async(f, forward<Arg>(arg), forward<Args>(args)...);
815 else
816 {
817 launch policy = static_cast<launch>(
818 static_cast<int>(launch::async) |
819 static_cast<int>(launch::deferred)
820 );
821
822 return aux::async(policy, forward<F>(f), forward<Arg>(arg), forward<Args>(args)...);
823 }
824 }
825}
826
827#endif
Note: See TracBrowser for help on using the repository browser.