1 |
commit: e43f6c583ed9205abbdcb11340c81d7dd97ccc11 |
2 |
Author: Zac Medico <zmedico <AT> gentoo <DOT> org> |
3 |
AuthorDate: Sun Feb 25 23:19:58 2018 +0000 |
4 |
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> |
5 |
CommitDate: Wed Feb 28 17:22:20 2018 +0000 |
6 |
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=e43f6c58 |
7 |
|
8 |
Add iter_completed convenience function (bug 648790) |
9 |
|
10 |
The iter_completed function is similar to asyncio.as_completed, but |
11 |
takes an iterator of futures as input, and includes support for |
12 |
max_jobs and max_load parameters. The default values for max_jobs |
13 |
and max_load correspond to multiprocessing.cpu_count(). |
14 |
|
15 |
Example usage for async_aux_get: |
16 |
|
17 |
import portage |
18 |
from portage.util.futures.iter_completed import iter_completed |
19 |
|
20 |
portdb = portage.portdb |
21 |
# aux_get has many inputs, and the same cpv can exist in multiple |
22 |
# repositories, so the caller is responsible for mapping futures |
23 |
# back to their aux_get inputs |
24 |
future_cpv = {} |
25 |
|
26 |
def future_generator(): |
27 |
for cpv in portdb.cp_list('sys-apps/portage'): |
28 |
future = portdb.async_aux_get(cpv, portage.auxdbkeys) |
29 |
future_cpv[id(future)] = cpv |
30 |
yield future |
31 |
|
32 |
for future in iter_completed(future_generator()): |
33 |
cpv = future_cpv.pop(id(future)) |
34 |
try: |
35 |
result = future.result() |
36 |
except KeyError as e: |
37 |
# aux_get failed |
38 |
print('error:', cpv, e) |
39 |
else: |
40 |
print(cpv, result) |
41 |
|
42 |
See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed |
43 |
Bug: https://bugs.gentoo.org/648790 |
44 |
|
45 |
.../tests/util/futures/test_iter_completed.py | 50 ++++++++++ |
46 |
pym/portage/util/_async/AsyncTaskFuture.py | 31 +++++++ |
47 |
pym/portage/util/futures/iter_completed.py | 63 +++++++++++++ |
48 |
pym/portage/util/futures/wait.py | 102 +++++++++++++++++++++ |
49 |
4 files changed, 246 insertions(+) |
50 |
|
51 |
diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py |
52 |
new file mode 100644 |
53 |
index 000000000..d0a7dbb45 |
54 |
--- /dev/null |
55 |
+++ b/pym/portage/tests/util/futures/test_iter_completed.py |
56 |
@@ -0,0 +1,50 @@ |
57 |
+# Copyright 2018 Gentoo Foundation |
58 |
+# Distributed under the terms of the GNU General Public License v2 |
59 |
+ |
60 |
+import time |
61 |
+from portage.tests import TestCase |
62 |
+from portage.util._async.ForkProcess import ForkProcess |
63 |
+from portage.util._eventloop.global_event_loop import global_event_loop |
64 |
+from portage.util.futures.iter_completed import iter_completed |
65 |
+ |
66 |
+ |
67 |
class SleepProcess(ForkProcess):
    """
    Test helper: a forked process that sleeps for ``seconds`` and then
    publishes ``seconds`` as the result of ``future`` when it exits.

    Both ``future`` and ``seconds`` must be assigned by the caller
    before the process is started.
    """
    __slots__ = ('future', 'seconds')

    def _start(self):
        # Register the exit listener before starting, so that the
        # future is resolved when the process exits.
        self.addExitListener(self._future_done)
        ForkProcess._start(self)

    def _future_done(self, task):
        # The consumer may already have cancelled the future (for
        # example AsyncTaskFuture._cancel during iter_completed
        # cleanup). Setting a result on a finished future is an error
        # for asyncio-compatible futures, so only resolve it if it is
        # still pending.
        if not self.future.done():
            self.future.set_result(self.seconds)

    def _run(self):
        # Executed by ForkProcess; presumably in the forked child
        # (ForkProcess is not visible here).
        time.sleep(self.seconds)
78 |
+ |
79 |
+ |
80 |
class IterCompletedTestCase(TestCase):
    """
    Verify that iter_completed yields futures in completion order.
    """

    def testIterCompleted(self):
        """
        Start three sleeping processes with different durations and
        check that their futures are yielded shortest-sleep first.
        """

        # Mark this as todo, since we don't want to fail if heavy system
        # load causes the tasks to finish in an unexpected order.
        self.todo = True

        loop = global_event_loop()
        tasks = [
            SleepProcess(seconds=0.200),
            SleepProcess(seconds=0.100),
            SleepProcess(seconds=0.001),
        ]

        expected_order = sorted(task.seconds for task in tasks)

        def future_generator():
            # Tasks are started lazily, as iter_completed pulls futures
            # from this generator.
            for task in tasks:
                task.future = loop.create_future()
                task.scheduler = loop
                task.start()
                yield task.future

        # Consume iter_completed fully instead of zipping it with the
        # expected values: zip() would silently stop early if too few
        # futures were yielded, and would leave the generator suspended
        # so that its cleanup only runs at garbage collection.
        completed_order = [
            future.result() for future in iter_completed(
                future_generator(), max_jobs=True, max_load=None, loop=loop)
        ]

        self.assertEqual(expected_order, completed_order)
107 |
|
108 |
diff --git a/pym/portage/util/_async/AsyncTaskFuture.py b/pym/portage/util/_async/AsyncTaskFuture.py |
109 |
new file mode 100644 |
110 |
index 000000000..ee39183fe |
111 |
--- /dev/null |
112 |
+++ b/pym/portage/util/_async/AsyncTaskFuture.py |
113 |
@@ -0,0 +1,31 @@ |
114 |
+# Copyright 2018 Gentoo Foundation |
115 |
+# Distributed under the terms of the GNU General Public License v2 |
116 |
+ |
117 |
+import os |
118 |
+import signal |
119 |
+ |
120 |
+from _emerge.AsynchronousTask import AsynchronousTask |
121 |
+ |
122 |
+ |
123 |
class AsyncTaskFuture(AsynchronousTask):
    """
    Adapter that presents a Future as an AsynchronousTask, so that it
    can be scheduled with TaskScheduler.

    The task finishes when the wrapped future finishes. The resulting
    returncode is -signal.SIGINT if the future was cancelled, os.EX_OK
    if it finished without an exception, and 1 otherwise.
    """
    __slots__ = ('future', 'scheduler')

    def _start(self):
        # Nothing to launch: just observe the wrapped future.
        self.future.add_done_callback(self._done_callback)

    def _cancel(self):
        # Propagate cancellation to the future unless it already finished.
        if self.future.done():
            return
        self.future.cancel()

    def _done_callback(self, future):
        # Translate the future's outcome into a process-style returncode.
        if future.cancelled():
            self.cancelled = True
            status = -signal.SIGINT
        else:
            status = os.EX_OK if future.exception() is None else 1
        self.returncode = status
        # Presumably notifies listeners that the task is complete
        # (wait() is inherited from AsynchronousTask, not visible here).
        self.wait()
145 |
|
146 |
diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py |
147 |
new file mode 100644 |
148 |
index 000000000..1050b6fa7 |
149 |
--- /dev/null |
150 |
+++ b/pym/portage/util/futures/iter_completed.py |
151 |
@@ -0,0 +1,63 @@ |
152 |
+# Copyright 2018 Gentoo Foundation |
153 |
+# Distributed under the terms of the GNU General Public License v2 |
154 |
+ |
155 |
+import multiprocessing |
156 |
+ |
157 |
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture |
158 |
+from portage.util._async.TaskScheduler import TaskScheduler |
159 |
+from portage.util._eventloop.global_event_loop import global_event_loop |
160 |
+from portage.util.futures.wait import wait, FIRST_COMPLETED |
161 |
+ |
162 |
+ |
163 |
def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
    """
    This is similar to asyncio.as_completed, but takes an iterator of
    futures as input, and includes support for max_jobs and max_load
    parameters.

    Futures are pulled lazily from the input iterator by a TaskScheduler
    (each one wrapped in an AsyncTaskFuture), and each future is yielded
    once it is done. When the generator exits, including early exit, the
    scheduler is cancelled and waited for.

    @param futures: iterator of asyncio.Future (or compatible)
    @type futures: iterator
    @param max_jobs: max number of futures to process concurrently (default
        is multiprocessing.cpu_count(), also used for any false value)
    @type max_jobs: int
    @param max_load: max load allowed when scheduling a new future,
        otherwise schedule no more than 1 future at a time (default
        is multiprocessing.cpu_count(), also used for any false value)
    @type max_load: int or float
    @param loop: event loop (default is global_event_loop())
    @type loop: EventLoop
    @return: iterator of futures that are done
    @rtype: iterator
    """
    loop = loop or global_event_loop()
    max_jobs = max_jobs or multiprocessing.cpu_count()
    max_load = max_load or multiprocessing.cpu_count()

    # Futures that have been handed to the scheduler but not yet yielded,
    # keyed by id() so that unhashable future types are supported.
    future_map = {}
    def task_generator():
        for future in futures:
            future_map[id(future)] = future
            yield AsyncTaskFuture(future=future)

    scheduler = TaskScheduler(
        task_generator(),
        max_jobs=max_jobs,
        max_load=max_load,
        event_loop=loop)

    try:
        scheduler.start()

        # scheduler should ensure that future_map is non-empty until
        # task_generator is exhausted
        while future_map:
            # Pass the loop through explicitly: otherwise wait() would
            # create its result future on global_event_loop() even when
            # the caller supplied a different loop to run on.
            done, pending = loop.run_until_complete(
                wait(*list(future_map.values()),
                    return_when=FIRST_COMPLETED, loop=loop))
            for future in done:
                del future_map[id(future)]
                yield future

    finally:
        # cleanup in case of interruption by SIGINT, etc
        scheduler.cancel()
        scheduler.wait()
215 |
|
216 |
diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/wait.py |
217 |
new file mode 100644 |
218 |
index 000000000..3f0bdbff5 |
219 |
--- /dev/null |
220 |
+++ b/pym/portage/util/futures/wait.py |
221 |
@@ -0,0 +1,102 @@ |
222 |
+# Copyright 2018 Gentoo Foundation |
223 |
+# Distributed under the terms of the GNU General Public License v2 |
224 |
+ |
225 |
+try: |
226 |
+ from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION |
227 |
+except ImportError: |
228 |
+ ALL_COMPLETED = 'ALL_COMPLETED' |
229 |
+ FIRST_COMPLETED ='FIRST_COMPLETED' |
230 |
+ FIRST_EXCEPTION = 'FIRST_EXCEPTION' |
231 |
+ |
232 |
+from portage.util._eventloop.global_event_loop import global_event_loop |
233 |
+ |
234 |
+ |
235 |
+# Use **kwargs since python2.7 does not allow arguments with defaults |
236 |
+# to follow *futures. |
237 |
def wait(*futures, **kwargs):
    """
    Emulate asyncio.wait on top of portage's internal EventLoop:
    https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

    @param futures: one or more futures to wait for, given as
        positional arguments
    @type futures: asyncio.Future (or compatible)
    @param timeout: number of seconds to wait (wait indefinitely if
        not specified)
    @type timeout: int or float
    @param return_when: indicates when the returned future should be
        resolved, must be one of the constants ALL_COMPLETED,
        FIRST_COMPLETED, or FIRST_EXCEPTION (default is ALL_COMPLETED)
    @type return_when: object
    @param loop: event loop (default is global_event_loop())
    @type loop: EventLoop
    @return: a future whose result is a tuple of (done, pending)
    @rtype: asyncio.Future (or compatible)
    @raise TypeError: if no futures are given, or if an unsupported
        keyword argument is given
    """
    if len(futures) == 0:
        raise TypeError("wait() missing 1 required positional argument: 'future'")

    # Keyword-only options are extracted by hand for python2.7 support.
    event_loop = kwargs.pop('loop', None)
    timeout = kwargs.pop('timeout', None)
    return_when = kwargs.pop('return_when', ALL_COMPLETED)

    # Anything still left in kwargs is unsupported; report the first one.
    for unexpected in kwargs:
        raise TypeError(
            "wait() got an unexpected keyword argument '{}'".format(unexpected))

    if not event_loop:
        event_loop = global_event_loop()

    result_future = event_loop.create_future()
    # The waiter registers itself as a callback on the futures, so no
    # reference to it needs to be kept here.
    _Waiter(futures, timeout, return_when, result_future, event_loop)
    return result_future
268 |
+ |
269 |
+ |
270 |
class _Waiter(object):
    """
    Tracks a group of futures on behalf of wait(), and resolves
    result_future with a (done, pending) tuple of lists once the
    return_when condition is satisfied or the timeout expires.

    If result_future is cancelled, the waiter stops tracking: the
    timeout handle is cancelled and the done callbacks are removed
    from the futures that are still pending.
    """

    def __init__(self, futures, timeout, return_when, result_future, loop):
        """
        @param futures: futures to wait for
        @type futures: iterable of asyncio.Future (or compatible)
        @param timeout: number of seconds to wait, or None to wait
            indefinitely
        @type timeout: int or float or None
        @param return_when: one of ALL_COMPLETED, FIRST_COMPLETED,
            or FIRST_EXCEPTION
        @type return_when: object
        @param result_future: future that receives the (done, pending)
            result
        @type result_future: asyncio.Future (or compatible)
        @param loop: event loop
        @type loop: EventLoop
        """
        # Drop duplicate futures (preserving order). Completion is
        # tracked by id(), so a future passed twice would otherwise make
        # the "all futures finished" count unreachable.
        seen_ids = set()
        unique_futures = []
        for future in futures:
            if id(future) not in seen_ids:
                seen_ids.add(id(future))
                unique_futures.append(future)
        self._futures = tuple(unique_futures)
        # ids of futures that finished normally or were cancelled
        self._completed = set()
        # ids of futures that finished with an exception
        self._exceptions = set()
        self._return_when = return_when
        self._result_future = result_future
        self._loop = loop
        # True once delivery of the result has been scheduled or performed
        self._ready = False
        self._timeout = None
        result_future.add_done_callback(self._cancel_callback)
        for future in self._futures:
            future.add_done_callback(self._done_callback)
        if timeout is not None:
            self._timeout = loop.call_later(timeout, self._timeout_callback)

    def _cancel_callback(self, future):
        # Called when result_future is done. Only cancellation requires
        # action here: stop tracking and release the callbacks.
        if future.cancelled():
            self._ready = True
            self._ready_callback()

    def _timeout_callback(self):
        if not self._ready:
            self._ready = True
            self._ready_callback()

    def _done_callback(self, future):
        if future.cancelled() or future.exception() is None:
            self._completed.add(id(future))
        else:
            self._exceptions.add(id(future))
        if self._ready:
            return
        finished_count = len(self._completed) + len(self._exceptions)
        # The constants are compared with == since they are plain
        # strings. At this point at least one future has finished, which
        # is all that FIRST_COMPLETED requires, whether it finished
        # normally, was cancelled, or raised (as with asyncio.wait).
        if (self._return_when == FIRST_COMPLETED or
            (self._return_when == FIRST_EXCEPTION and self._exceptions) or
            finished_count == len(self._futures)):
            self._ready = True
            # use call_soon in case multiple callbacks complete in quick succession
            self._loop.call_soon(self._ready_callback)

    def _ready_callback(self):
        if self._timeout is not None:
            self._timeout.cancel()
            self._timeout = None
        done = []
        pending = []
        done_ids = self._completed.union(self._exceptions)
        for future in self._futures:
            if id(future) in done_ids:
                done.append(future)
            else:
                pending.append(future)
                # Always unregister from pending futures, even when the
                # result has been cancelled, so that they do not keep a
                # reference to this waiter.
                future.remove_done_callback(self._done_callback)
        # Skip delivery if result_future was cancelled or has otherwise
        # already been resolved.
        if not self._result_future.done():
            self._result_future.set_result((done, pending))