1 |
The iter_completed function is similar to asyncio.as_completed, but |
2 |
takes an iterator of futures as input, and includes support for |
3 |
max_jobs and max_load parameters. The default values for max_jobs |
4 |
and max_load correspond to multiprocessing.cpu_count(). |
5 |
|
6 |
Example usage for async_aux_get: |
7 |
|
8 |
import portage |
9 |
from portage.util.futures.iter_completed import iter_completed |
10 |
|
11 |
portdb = portage.portdb |
12 |
future_cpv = {} |
13 |
|
14 |
def future_generator(): |
15 |
for cpv in portdb.cp_list('sys-apps/portage'): |
16 |
future = portdb.async_aux_get(cpv, portage.auxdbkeys) |
17 |
future_cpv[id(future)] = cpv |
18 |
yield future |
19 |
|
20 |
for future in iter_completed(future_generator()): |
21 |
cpv = future_cpv.pop(id(future)) |
22 |
try: |
23 |
result = future.result() |
24 |
except KeyError as e: |
25 |
# aux_get failed |
26 |
print('error:', cpv, e) |
27 |
else: |
28 |
print(cpv, result) |
29 |
|
30 |
See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed |
31 |
Bug: https://bugs.gentoo.org/648790 |
32 |
--- |
33 |
.../tests/util/futures/test_iter_completed.py | 50 ++++++++++++ |
34 |
pym/portage/util/_async/FuturePollTask.py | 27 ++++++ |
35 |
pym/portage/util/futures/iter_completed.py | 63 ++++++++++++++ |
36 |
pym/portage/util/futures/wait.py | 95 ++++++++++++++++++++++ |
37 |
4 files changed, 235 insertions(+) |
38 |
create mode 100644 pym/portage/tests/util/futures/test_iter_completed.py |
39 |
create mode 100644 pym/portage/util/_async/FuturePollTask.py |
40 |
create mode 100644 pym/portage/util/futures/iter_completed.py |
41 |
create mode 100644 pym/portage/util/futures/wait.py |
42 |
|
43 |
diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py |
44 |
new file mode 100644 |
45 |
index 000000000..6607d871c |
46 |
--- /dev/null |
47 |
+++ b/pym/portage/tests/util/futures/test_iter_completed.py |
48 |
@@ -0,0 +1,50 @@ |
49 |
+# Copyright 2018 Gentoo Foundation |
50 |
+# Distributed under the terms of the GNU General Public License v2 |
51 |
+ |
52 |
+import time |
53 |
+from portage.tests import TestCase |
54 |
+from portage.util._async.ForkProcess import ForkProcess |
55 |
+from portage.util._eventloop.global_event_loop import global_event_loop |
56 |
+from portage.util.futures.iter_completed import iter_completed |
57 |
+ |
58 |
+ |
59 |
class SleepProcess(ForkProcess):
    """
    Forked helper process that sleeps for ``seconds`` and then publishes
    that same value as the result of ``future`` once the task exits.
    """
    __slots__ = ('future', 'seconds')

    def _run(self):
        # The only work performed by this process is the sleep itself.
        time.sleep(self.seconds)

    def _future_done(self, task):
        # Exit listener: the listener argument is not used, the result
        # is always the configured sleep duration.
        self.future.set_result(self.seconds)

    def _start(self):
        # Register the exit listener before delegating to ForkProcess,
        # so that the future is resolved when the task exits.
        self.addExitListener(self._future_done)
        ForkProcess._start(self)
70 |
+ |
71 |
+ |
72 |
class IterCompletedTestCase(TestCase):
    """
    Checks that iter_completed yields futures in the order in which
    they complete, rather than the order in which they were submitted.
    """

    def testIterCompleted(self):
        """
        Start three sleeper processes, longest sleep first, and expect
        their futures to be yielded shortest sleep first.
        """

        # Mark this as todo, since we don't want to fail if heavy system
        # load causes the tasks to finish in an unexpected order.
        self.todo = True

        loop = global_event_loop()
        tasks = [
            SleepProcess(seconds=0.200),
            SleepProcess(seconds=0.100),
            SleepProcess(seconds=0.001),
        ]

        expected_order = sorted(task.seconds for task in tasks)

        def future_generator():
            # Each process is started lazily, at the moment that
            # iter_completed pulls its future from this generator.
            for task in tasks:
                task.future = loop.create_future()
                task.scheduler = loop
                task.start()
                yield task.future

        # Request one job slot per task. With max_jobs=None the limit
        # falls back to multiprocessing.cpu_count(), which may be less
        # than len(tasks), so the sleepers would not all be started up
        # front and the expected ordering could not be relied upon.
        completed = iter_completed(future_generator(),
            max_jobs=len(tasks), max_load=None, loop=loop)

        # Drain the iterator completely (instead of zip, which silently
        # truncates), so that a missing or extra future is detected and
        # the generator's cleanup code runs before the test returns.
        actual_order = [future.result() for future in completed]
        self.assertEqual(expected_order, actual_order)
99 |
diff --git a/pym/portage/util/_async/FuturePollTask.py b/pym/portage/util/_async/FuturePollTask.py |
100 |
new file mode 100644 |
101 |
index 000000000..6b7cdf7d5 |
102 |
--- /dev/null |
103 |
+++ b/pym/portage/util/_async/FuturePollTask.py |
104 |
@@ -0,0 +1,27 @@ |
105 |
+# Copyright 2018 Gentoo Foundation |
106 |
+# Distributed under the terms of the GNU General Public License v2 |
107 |
+ |
108 |
+import os |
109 |
+import signal |
110 |
+ |
111 |
+from _emerge.AbstractPollTask import AbstractPollTask |
112 |
+ |
113 |
+ |
114 |
class FuturePollTask(AbstractPollTask):
    """
    Wraps a Future in an AsynchronousTask, which is useful for
    scheduling with TaskScheduler.

    The task's returncode is derived from the outcome of the future:
    -SIGINT if it was cancelled, os.EX_OK if it finished without an
    exception, and 1 if it finished with an exception.
    """
    __slots__ = ('future',)

    def _start(self):
        # The task does no work of its own, it only observes the future.
        self.future.add_done_callback(self._done_callback)

    def _done_callback(self, future):
        # cancelled() is checked first, and exception() is only called
        # for a future that was not cancelled.
        if future.cancelled():
            self.cancelled = True
            self.returncode = -signal.SIGINT
        else:
            self.returncode = os.EX_OK if future.exception() is None else 1
        self.wait()
132 |
diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py |
133 |
new file mode 100644 |
134 |
index 000000000..0540cc986 |
135 |
--- /dev/null |
136 |
+++ b/pym/portage/util/futures/iter_completed.py |
137 |
@@ -0,0 +1,63 @@ |
138 |
+# Copyright 2018 Gentoo Foundation |
139 |
+# Distributed under the terms of the GNU General Public License v2 |
140 |
+ |
141 |
+import multiprocessing |
142 |
+ |
143 |
+from portage.util._async.FuturePollTask import FuturePollTask |
144 |
+from portage.util._async.TaskScheduler import TaskScheduler |
145 |
+from portage.util._eventloop.global_event_loop import global_event_loop |
146 |
+from portage.util.futures.wait import wait, FIRST_COMPLETED |
147 |
+ |
148 |
+ |
149 |
def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
    """
    This is similar to asyncio.as_completed, but takes an iterator of
    futures as input, and includes support for max_jobs and max_load
    parameters.

    The futures iterator is consumed lazily by a TaskScheduler, which
    wraps each future in a FuturePollTask. This generator drives the
    event loop and yields each future once it is done.

    @param futures: iterator of asyncio.Future (or compatible)
    @type futures: iterator
    @param max_jobs: max number of futures to process concurrently (default
        is multiprocessing.cpu_count())
    @type max_jobs: int
    @param max_load: max load allowed when scheduling a new future,
        otherwise schedule no more than 1 future at a time (default
        is multiprocessing.cpu_count())
    @type max_load: int or float
    @param loop: event loop (default is global_event_loop())
    @type loop: EventLoop
    @return: iterator of futures that are done
    @rtype: iterator
    """
    loop = loop or global_event_loop()
    # Any false value (None, 0) selects the cpu_count() default.
    max_jobs = max_jobs or multiprocessing.cpu_count()
    max_load = max_load or multiprocessing.cpu_count()

    # Futures that have been handed to the scheduler but not yet yielded
    # to the caller, keyed by id() since only identity is needed.
    future_map = {}

    def task_generator():
        for future in futures:
            future_map[id(future)] = future
            yield FuturePollTask(future=future)

    scheduler = TaskScheduler(
        task_generator(),
        max_jobs=max_jobs,
        max_load=max_load,
        event_loop=loop)

    try:
        scheduler.start()

        # scheduler should ensure that future_map is non-empty until
        # task_generator is exhausted
        while future_map:
            # Pass loop explicitly, otherwise wait() would fall back to
            # global_event_loop() and create its result future on a
            # different loop than the one that is being run here.
            done, _ = loop.run_until_complete(
                wait(*list(future_map.values()),
                    return_when=FIRST_COMPLETED, loop=loop))
            for future in done:
                del future_map[id(future)]
                yield future

    finally:
        # cleanup in case of interruption by SIGINT, etc
        scheduler.cancel()
        scheduler.wait()
201 |
diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/wait.py |
202 |
new file mode 100644 |
203 |
index 000000000..a65a25ac4 |
204 |
--- /dev/null |
205 |
+++ b/pym/portage/util/futures/wait.py |
206 |
@@ -0,0 +1,95 @@ |
207 |
+# Copyright 2018 Gentoo Foundation |
208 |
+# Distributed under the terms of the GNU General Public License v2 |
209 |
+ |
210 |
+try: |
211 |
+ from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION |
212 |
+except ImportError: |
213 |
+ ALL_COMPLETED = 'ALL_COMPLETED' |
214 |
+ FIRST_COMPLETED ='FIRST_COMPLETED' |
215 |
+ FIRST_EXCEPTION = 'FIRST_EXCEPTION' |
216 |
+ |
217 |
+from portage.util._eventloop.global_event_loop import global_event_loop |
218 |
+ |
219 |
+ |
220 |
+# Use **kwargs since python2.7 does not allow arguments with defaults |
221 |
+# to follow *futures. |
222 |
def wait(*futures, **kwargs):
    """
    Use portage's internal EventLoop to emulate asyncio.wait:
    https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

    @param futures: one or more futures to wait for, given as
        positional arguments (duplicates are only counted once)
    @type futures: asyncio.Future (or compatible)
    @param timeout: number of seconds to wait (wait indefinitely if
        not specified)
    @type timeout: int or float
    @param return_when: indicates when this function should return, must
        be one of the constants ALL_COMPLETED, FIRST_COMPLETED, or
        FIRST_EXCEPTION (default is ALL_COMPLETED)
    @type return_when: object
    @param loop: event loop (default is global_event_loop())
    @type loop: EventLoop
    @return: future whose result is a tuple of (done, pending), where
        each element is a list of futures
    @rtype: asyncio.Future (or compatible)
    @raise TypeError: if no futures are given, or if an unexpected
        keyword argument is given
    """
    if not futures:
        raise TypeError("wait() missing 1 required positional argument: 'future'")
    loop = kwargs.pop('loop', None)
    timeout = kwargs.pop('timeout', None)
    return_when = kwargs.pop('return_when', ALL_COMPLETED)
    if kwargs:
        raise TypeError("wait() got an unexpected keyword argument '{}'".\
            format(next(iter(kwargs))))
    loop = loop or global_event_loop()
    result_future = loop.create_future()
    # The waiter is kept alive by the done callbacks (and the timeout
    # handle) that it registers, so no reference needs to be held here.
    _Waiter(futures, timeout, return_when, result_future, loop)
    return result_future


class _Waiter(object):
    """
    Tracks a group of futures on behalf of wait(), and sets the result
    of result_future to (done, pending) as soon as the return_when
    condition is satisfied or the timeout expires.
    """

    def __init__(self, futures, timeout, return_when, result_future, loop):
        # Drop duplicates while preserving order. The bookkeeping below
        # is keyed by id(), so a duplicate would make len(self._futures)
        # unreachable and an ALL_COMPLETED wait would never finish.
        seen = set()
        unique = []
        for future in futures:
            if id(future) not in seen:
                seen.add(id(future))
                unique.append(future)
        self._futures = unique
        # ids of futures that finished without an exception (this
        # includes cancelled futures)
        self._completed = set()
        # ids of futures that finished with an exception
        self._exceptions = set()
        self._return_when = return_when
        self._result_future = result_future
        self._loop = loop
        # True once _ready_callback has been called or scheduled
        self._ready = False
        self._timeout = None
        for future in self._futures:
            future.add_done_callback(self._done_callback)
        if timeout is not None:
            self._timeout = loop.call_later(timeout, self._timeout_callback)

    def _timeout_callback(self):
        if not self._ready:
            self._ready = True
            self._ready_callback()

    def _done_callback(self, future):
        # exception() must not be called for a cancelled future, so
        # cancelled() is checked first.
        if future.cancelled() or future.exception() is None:
            self._completed.add(id(future))
        else:
            self._exceptions.add(id(future))
        if self._ready:
            return
        finished = len(self._completed) + len(self._exceptions)
        # Compare by equality, since the constants may be plain strings.
        # FIRST_COMPLETED is satisfied by any finished future, including
        # one that finished with an exception, as with asyncio.wait.
        if (self._return_when == FIRST_COMPLETED or
            (self._return_when == FIRST_EXCEPTION and self._exceptions) or
            finished == len(self._futures)):
            self._ready = True
            # use call_soon in case multiple callbacks complete in quick succession
            self._loop.call_soon(self._ready_callback)

    def _ready_callback(self):
        if self._timeout is not None:
            self._timeout.cancel()
            self._timeout = None
        done = []
        pending = []
        done_ids = self._completed.union(self._exceptions)
        for future in self._futures:
            if id(future) in done_ids:
                done.append(future)
            else:
                pending.append(future)
                # This waiter is finished, so stop observing the future.
                future.remove_done_callback(self._done_callback)
        # The caller may have cancelled the result future in the
        # meantime, in which case setting a result would raise.
        if not self._result_future.done():
            self._result_future.set_result((done, pending))
302 |
-- |
303 |
2.13.6 |