1 |
This serves as a wrapper around portage's internal TaskScheduler |
2 |
class, allowing TaskScheduler API consumers to be migrated to use |
3 |
asyncio interfaces. |
4 |
|
5 |
Bug: https://bugs.gentoo.org/591760 |
6 |
--- |
7 |
.../tests/util/futures/test_iter_completed.py | 37 ++++++++++++- |
8 |
pym/portage/util/futures/iter_completed.py | 61 +++++++++++++++++++--- |
9 |
2 files changed, 91 insertions(+), 7 deletions(-) |
10 |
|
11 |
diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py |
12 |
index 9c23aefb1..1344523c6 100644 |
13 |
--- a/pym/portage/tests/util/futures/test_iter_completed.py |
14 |
+++ b/pym/portage/tests/util/futures/test_iter_completed.py |
15 |
@@ -5,7 +5,11 @@ import time |
16 |
from portage.tests import TestCase |
17 |
from portage.util._async.ForkProcess import ForkProcess |
18 |
from portage.util._eventloop.global_event_loop import global_event_loop |
19 |
-from portage.util.futures.iter_completed import iter_completed |
20 |
+from portage.util.futures import asyncio |
21 |
+from portage.util.futures.iter_completed import ( |
22 |
+ iter_completed, |
23 |
+ async_iter_completed, |
24 |
+) |
25 |
|
26 |
|
27 |
class SleepProcess(ForkProcess): |
28 |
@@ -48,3 +52,34 @@ class IterCompletedTestCase(TestCase): |
29 |
for seconds, future in zip(expected_order, iter_completed(future_generator(), |
30 |
max_jobs=True, max_load=None, loop=loop)): |
31 |
self.assertEqual(seconds, future.result()) |
32 |
+ |
33 |
+ def testAsyncCancel(self): |
34 |
+ |
35 |
+ loop = global_event_loop()._asyncio_wrapper |
36 |
+ input_futures = set() |
37 |
+ future_count = 3 |
38 |
+ |
39 |
+ def future_generator(): |
40 |
+ for i in range(future_count): |
41 |
+ future = loop.create_future() |
42 |
+ loop.call_soon(lambda future: None if future.done() |
43 |
+ else future.set_result(None), future) |
44 |
+ input_futures.add(future) |
45 |
+ yield future |
46 |
+ |
47 |
+ for future_done_set in async_iter_completed(future_generator(), |
48 |
+ max_jobs=True, max_load=None, loop=loop): |
49 |
+ future_done_set.cancel() |
50 |
+ break |
51 |
+ |
52 |
+ # With max_jobs=True, async_iter_completed should have executed |
53 |
+ # the generator until it raised StopIteration. |
54 |
+ self.assertEqual(future_count, len(input_futures)) |
55 |
+ |
56 |
+ loop.run_until_complete(asyncio.wait(input_futures, loop=loop)) |
57 |
+ |
58 |
+ # The futures may have results or they may have been cancelled |
59 |
+ # by TaskScheduler, and behavior varies depending on the python |
60 |
+ # interpreter. |
61 |
+ for future in input_futures: |
62 |
+ future.cancelled() or future.result() |
63 |
diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py |
64 |
index 8d324de84..5ad075305 100644 |
65 |
--- a/pym/portage/util/futures/iter_completed.py |
66 |
+++ b/pym/portage/util/futures/iter_completed.py |
67 |
@@ -1,6 +1,7 @@ |
68 |
# Copyright 2018 Gentoo Foundation |
69 |
# Distributed under the terms of the GNU General Public License v2 |
70 |
|
71 |
+import functools |
72 |
import multiprocessing |
73 |
|
74 |
from portage.util._async.AsyncTaskFuture import AsyncTaskFuture |
75 |
@@ -31,6 +32,38 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None): |
76 |
""" |
77 |
loop = loop or global_event_loop() |
78 |
loop = getattr(loop, '_asyncio_wrapper', loop) |
79 |
+ |
80 |
+ for future_done_set in async_iter_completed(futures, |
81 |
+ max_jobs=max_jobs, max_load=max_load, loop=loop): |
82 |
+ for future in loop.run_until_complete(future_done_set): |
83 |
+ yield future |
84 |
+ |
85 |
+ |
86 |
+def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None): |
87 |
+ """ |
88 |
+ An asynchronous version of iter_completed. This yields futures, which |
89 |
+ when done, result in a set of input futures that are done. This serves |
90 |
+ as a wrapper around portage's internal TaskScheduler class, using |
91 |
+ standard asyncio interfaces. |
92 |
+ |
93 |
+ @param futures: iterator of asyncio.Future (or compatible) |
94 |
+ @type futures: iterator |
95 |
+ @param max_jobs: max number of futures to process concurrently (default |
96 |
+ is multiprocessing.cpu_count()) |
97 |
+ @type max_jobs: int |
98 |
+ @param max_load: max load allowed when scheduling a new future, |
99 |
+ otherwise schedule no more than 1 future at a time (default |
100 |
+ is multiprocessing.cpu_count()) |
101 |
+ @type max_load: int or float |
102 |
+ @param loop: event loop |
103 |
+ @type loop: EventLoop |
104 |
+ @return: iterator of futures, which when done, result in a set of |
105 |
+ input futures that are done |
106 |
+ @rtype: iterator |
107 |
+ """ |
108 |
+ loop = loop or global_event_loop() |
109 |
+ loop = getattr(loop, '_asyncio_wrapper', loop) |
110 |
+ |
111 |
max_jobs = max_jobs or multiprocessing.cpu_count() |
112 |
max_load = max_load or multiprocessing.cpu_count() |
113 |
|
114 |
@@ -46,19 +79,35 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None): |
115 |
max_load=max_load, |
116 |
event_loop=loop._loop) |
117 |
|
118 |
+ def done_callback(future_done_set, wait_result): |
119 |
+ """Propagate results from wait_result to future_done_set.""" |
120 |
+ if future_done_set.cancelled(): |
121 |
+ return |
122 |
+ done, pending = wait_result.result() |
123 |
+ for future in done: |
124 |
+ del future_map[id(future)] |
125 |
+ future_done_set.set_result(done) |
126 |
+ |
127 |
+ def cancel_callback(wait_result, future_done_set): |
128 |
+ """Cancel wait_result if future_done_set has been cancelled.""" |
129 |
+ if future_done_set.cancelled() and not wait_result.done(): |
130 |
+ wait_result.cancel() |
131 |
+ |
132 |
try: |
133 |
scheduler.start() |
134 |
|
135 |
# scheduler should ensure that future_map is non-empty until |
136 |
# task_generator is exhausted |
137 |
while future_map: |
138 |
- done, pending = loop.run_until_complete( |
139 |
+ wait_result = asyncio.ensure_future( |
140 |
asyncio.wait(list(future_map.values()), |
141 |
- return_when=asyncio.FIRST_COMPLETED, loop=loop)) |
142 |
- for future in done: |
143 |
- del future_map[id(future)] |
144 |
- yield future |
145 |
- |
146 |
+ return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop) |
147 |
+ future_done_set = loop.create_future() |
148 |
+ future_done_set.add_done_callback( |
149 |
+ functools.partial(cancel_callback, wait_result)) |
150 |
+ wait_result.add_done_callback( |
151 |
+ functools.partial(done_callback, future_done_set)) |
152 |
+ yield future_done_set |
153 |
finally: |
154 |
# cleanup in case of interruption by SIGINT, etc |
155 |
scheduler.cancel() |
156 |
-- |
157 |
2.13.6 |