1 |
On Sun, Feb 25, 2018 at 8:50 PM, Zac Medico <zmedico@g.o> wrote: |
2 |
|
3 |
> The iter_completed function is similar to asyncio.as_completed, but |
4 |
> takes an iterator of futures as input, and includes support for |
5 |
> max_jobs and max_load parameters. The default values for max_jobs |
6 |
> and max_load correspond to multiprocessing.cpu_count(). |
7 |
> |
8 |
> Example usage for async_aux_get: |
9 |
> |
10 |
> import portage |
11 |
> from portage.util.futures.iter_completed import iter_completed |
12 |
> |
13 |
> portdb = portage.portdb |
14 |
> future_cpv = {} |
15 |
> |
16 |
|
17 |
I'm not sure I grasp the purpose of this dict; can't we just modify |
18 |
async_aux_get to return the cpv (along with the result) from the future? |
19 |
|
20 |
|
21 |
> |
22 |
> def future_generator(): |
23 |
> for cpv in portdb.cp_list('sys-apps/portage'): |
24 |
> future = portdb.async_aux_get(cpv, portage.auxdbkeys) |
25 |
> future_cpv[id(future)] = cpv |
26 |
> yield future |
27 |
> |
28 |
> |
29 |
for cpv in portdb.cp_list('...'): |
30 |
yield portdb.async_aux_get(cpv, portage.auxdbkeys) |
31 |
|
32 |
|
33 |
> for future in iter_completed(future_generator()): |
34 |
> cpv = future_cpv.pop(id(future)) |
35 |
> try: |
36 |
> result = future.result() |
37 |
> except KeyError as e: |
38 |
> # aux_get failed |
39 |
> print('error:', cpv, e) |
40 |
> else: |
41 |
> print(cpv, result) |
42 |
> |
43 |
|
44 |
for future in iter_completed(future_generator()): |
45 |
try: |
46 |
cpv, result = future.result() |
47 |
except KeyError as e: |
48 |
print('error', cpv, e) |
49 |
|
50 |
|
51 |
Or do we expect callers of this API to need other things to key the results off of? |
52 |
|
53 |
-A |
54 |
|
55 |
|
56 |
> See: https://docs.python.org/3/library/asyncio-task.html# |
57 |
> asyncio.as_completed |
58 |
> Bug: https://bugs.gentoo.org/648790 |
59 |
> --- |
60 |
> .../tests/util/futures/test_iter_completed.py | 50 ++++++++++++ |
61 |
> pym/portage/util/_async/FuturePollTask.py | 27 ++++++ |
62 |
> pym/portage/util/futures/iter_completed.py | 63 ++++++++++++++ |
63 |
> pym/portage/util/futures/wait.py | 95 |
64 |
> ++++++++++++++++++++++ |
65 |
> 4 files changed, 235 insertions(+) |
66 |
> create mode 100644 pym/portage/tests/util/futures/test_iter_completed.py |
67 |
> create mode 100644 pym/portage/util/_async/FuturePollTask.py |
68 |
> create mode 100644 pym/portage/util/futures/iter_completed.py |
69 |
> create mode 100644 pym/portage/util/futures/wait.py |
70 |
> |
71 |
> diff --git a/pym/portage/tests/util/futures/test_iter_completed.py |
72 |
> b/pym/portage/tests/util/futures/test_iter_completed.py |
73 |
> new file mode 100644 |
74 |
> index 000000000..6607d871c |
75 |
> --- /dev/null |
76 |
> +++ b/pym/portage/tests/util/futures/test_iter_completed.py |
77 |
> @@ -0,0 +1,50 @@ |
78 |
> +# Copyright 2018 Gentoo Foundation |
79 |
> +# Distributed under the terms of the GNU General Public License v2 |
80 |
> + |
81 |
> +import time |
82 |
> +from portage.tests import TestCase |
83 |
> +from portage.util._async.ForkProcess import ForkProcess |
84 |
> +from portage.util._eventloop.global_event_loop import global_event_loop |
85 |
> +from portage.util.futures.iter_completed import iter_completed |
86 |
> + |
87 |
> + |
88 |
> +class SleepProcess(ForkProcess): |
89 |
> + __slots__ = ('future', 'seconds') |
90 |
> + def _start(self): |
91 |
> + self.addExitListener(self._future_done) |
92 |
> + ForkProcess._start(self) |
93 |
> + |
94 |
> + def _future_done(self, task): |
95 |
> + self.future.set_result(self.seconds) |
96 |
> + |
97 |
> + def _run(self): |
98 |
> + time.sleep(self.seconds) |
99 |
> + |
100 |
> + |
101 |
> +class IterCompletedTestCase(TestCase): |
102 |
> + |
103 |
> + def testIterCompleted(self): |
104 |
> + |
105 |
> + # Mark this as todo, since we don't want to fail if heavy |
106 |
> system |
107 |
> + # load causes the tasks to finish in an unexpected order. |
108 |
> + self.todo = True |
109 |
> + |
110 |
> + loop = global_event_loop() |
111 |
> + tasks = [ |
112 |
> + SleepProcess(seconds=0.200), |
113 |
> + SleepProcess(seconds=0.100), |
114 |
> + SleepProcess(seconds=0.001), |
115 |
> + ] |
116 |
> + |
117 |
> + expected_order = sorted(task.seconds for task in tasks) |
118 |
> + |
119 |
> + def future_generator(): |
120 |
> + for task in tasks: |
121 |
> + task.future = loop.create_future() |
122 |
> + task.scheduler = loop |
123 |
> + task.start() |
124 |
> + yield task.future |
125 |
> + |
126 |
> + for seconds, future in zip(expected_order, |
127 |
> iter_completed(future_generator(), |
128 |
> + max_jobs=None, max_load=None, loop=loop)): |
129 |
> + self.assertEqual(seconds, future.result()) |
130 |
> diff --git a/pym/portage/util/_async/FuturePollTask.py |
131 |
> b/pym/portage/util/_async/FuturePollTask.py |
132 |
> new file mode 100644 |
133 |
> index 000000000..6b7cdf7d5 |
134 |
> --- /dev/null |
135 |
> +++ b/pym/portage/util/_async/FuturePollTask.py |
136 |
> @@ -0,0 +1,27 @@ |
137 |
> +# Copyright 2018 Gentoo Foundation |
138 |
> +# Distributed under the terms of the GNU General Public License v2 |
139 |
> + |
140 |
> +import os |
141 |
> +import signal |
142 |
> + |
143 |
> +from _emerge.AbstractPollTask import AbstractPollTask |
144 |
> + |
145 |
> + |
146 |
> +class FuturePollTask(AbstractPollTask): |
147 |
> + """ |
148 |
> + Wraps a Future in an AsynchronousTask, which is useful for |
149 |
> + scheduling with TaskScheduler. |
150 |
> + """ |
151 |
> + __slots__ = ('future',) |
152 |
> + def _start(self): |
153 |
> + self.future.add_done_callback(self._done_callback) |
154 |
> + |
155 |
> + def _done_callback(self, future): |
156 |
> + if future.cancelled(): |
157 |
> + self.cancelled = True |
158 |
> + self.returncode = -signal.SIGINT |
159 |
> + elif future.exception() is None: |
160 |
> + self.returncode = os.EX_OK |
161 |
> + else: |
162 |
> + self.returncode = 1 |
163 |
> + self.wait() |
164 |
> diff --git a/pym/portage/util/futures/iter_completed.py |
165 |
> b/pym/portage/util/futures/iter_completed.py |
166 |
> new file mode 100644 |
167 |
> index 000000000..0540cc986 |
168 |
> --- /dev/null |
169 |
> +++ b/pym/portage/util/futures/iter_completed.py |
170 |
> @@ -0,0 +1,63 @@ |
171 |
> +# Copyright 2018 Gentoo Foundation |
172 |
> +# Distributed under the terms of the GNU General Public License v2 |
173 |
> + |
174 |
> +import multiprocessing |
175 |
> + |
176 |
> +from portage.util._async.FuturePollTask import FuturePollTask |
177 |
> +from portage.util._async.TaskScheduler import TaskScheduler |
178 |
> +from portage.util._eventloop.global_event_loop import global_event_loop |
179 |
> +from portage.util.futures.wait import wait, FIRST_COMPLETED |
180 |
> + |
181 |
> + |
182 |
> +def iter_completed(futures, max_jobs=None, max_load=None, loop=None): |
183 |
> + """ |
184 |
> + This is similar to asyncio.as_completed, but takes an iterator of |
185 |
> + futures as input, and includes support for max_jobs and max_load |
186 |
> + parameters. |
187 |
> + |
188 |
> + @param futures: iterator of asyncio.Future (or compatible) |
189 |
> + @type futures: iterator |
190 |
> + @param max_jobs: max number of futures to process concurrently |
191 |
> (default |
192 |
> + is multiprocessing.cpu_count()) |
193 |
> + @type max_jobs: int |
194 |
> + @param max_load: max load allowed when scheduling a new future, |
195 |
> + otherwise schedule no more than 1 future at a time (default |
196 |
> + is multiprocessing.cpu_count()) |
197 |
> + @type max_load: int or float |
198 |
> + @param loop: event loop |
199 |
> + @type loop: EventLoop |
200 |
> + @return: iterator of futures that are done |
201 |
> + @rtype: iterator |
202 |
> + """ |
203 |
> + loop = loop or global_event_loop() |
204 |
> + max_jobs = max_jobs or multiprocessing.cpu_count() |
205 |
> + max_load = max_load or multiprocessing.cpu_count() |
206 |
> + |
207 |
> + future_map = {} |
208 |
> + def task_generator(): |
209 |
> + for future in futures: |
210 |
> + future_map[id(future)] = future |
211 |
> + yield FuturePollTask(future=future) |
212 |
> + |
213 |
> + scheduler = TaskScheduler( |
214 |
> + task_generator(), |
215 |
> + max_jobs=max_jobs, |
216 |
> + max_load=max_load, |
217 |
> + event_loop=loop) |
218 |
> + |
219 |
> + try: |
220 |
> + scheduler.start() |
221 |
> + |
222 |
> + # scheduler should ensure that future_map is non-empty |
223 |
> until |
224 |
> + # task_generator is exhausted |
225 |
> + while future_map: |
226 |
> + done, pending = loop.run_until_complete( |
227 |
> + wait(*list(future_map.values()), |
228 |
> return_when=FIRST_COMPLETED)) |
229 |
> + for future in done: |
230 |
> + del future_map[id(future)] |
231 |
> + yield future |
232 |
> + |
233 |
> + finally: |
234 |
> + # cleanup in case of interruption by SIGINT, etc |
235 |
> + scheduler.cancel() |
236 |
> + scheduler.wait() |
237 |
> diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/ |
238 |
> wait.py |
239 |
> new file mode 100644 |
240 |
> index 000000000..a65a25ac4 |
241 |
> --- /dev/null |
242 |
> +++ b/pym/portage/util/futures/wait.py |
243 |
> @@ -0,0 +1,95 @@ |
244 |
> +# Copyright 2018 Gentoo Foundation |
245 |
> +# Distributed under the terms of the GNU General Public License v2 |
246 |
> + |
247 |
> +try: |
248 |
> + from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION |
249 |
> +except ImportError: |
250 |
> + ALL_COMPLETED = 'ALL_COMPLETED' |
251 |
> + FIRST_COMPLETED ='FIRST_COMPLETED' |
252 |
> + FIRST_EXCEPTION = 'FIRST_EXCEPTION' |
253 |
> + |
254 |
> +from portage.util._eventloop.global_event_loop import global_event_loop |
255 |
> + |
256 |
> + |
257 |
> +# Use **kwargs since python2.7 does not allow arguments with defaults |
258 |
> +# to follow *futures. |
259 |
> +def wait(*futures, **kwargs): |
260 |
> + """ |
261 |
> + Use portage's internal EventLoop to emulate asyncio.wait: |
262 |
> + https://docs.python.org/3/library/asyncio-task.html#asyncio.wait |
263 |
> + |
264 |
> + @param future: future to wait for |
265 |
> + @type future: asyncio.Future (or compatible) |
266 |
> + @param timeout: number of seconds to wait (wait indefinitely if |
267 |
> + not specified) |
268 |
> + @type timeout: int or float |
269 |
> + @param return_when: indicates when this function should return, |
270 |
> must |
271 |
> + be one of the constants ALL_COMPLETED, FIRST_COMPLETED, or |
272 |
> + FIRST_EXCEPTION (default is ALL_COMPLETED) |
273 |
> + @type return_when: object |
274 |
> + @param loop: event loop |
275 |
> + @type loop: EventLoop |
276 |
> + @return: tuple of (done, pending). |
277 |
> + @rtype: asyncio.Future (or compatible) |
278 |
> + """ |
279 |
> + if not futures: |
280 |
> + raise TypeError("wait() missing 1 required positional |
281 |
> argument: 'future'") |
282 |
> + loop = kwargs.pop('loop', None) |
283 |
> + timeout = kwargs.pop('timeout', None) |
284 |
> + return_when = kwargs.pop('return_when', ALL_COMPLETED) |
285 |
> + if kwargs: |
286 |
> + raise TypeError("wait() got an unexpected keyword argument |
287 |
> '{}'".\ |
288 |
> + format(next(iter(kwargs)))) |
289 |
> + loop = loop or global_event_loop() |
290 |
> + result_future = loop.create_future() |
291 |
> + _Waiter(futures, timeout, return_when, result_future, loop) |
292 |
> + return result_future |
293 |
> + |
294 |
> + |
295 |
> +class _Waiter(object): |
296 |
> + def __init__(self, futures, timeout, return_when, result_future, |
297 |
> loop): |
298 |
> + self._futures = futures |
299 |
> + self._completed = set() |
300 |
> + self._exceptions = set() |
301 |
> + self._return_when = return_when |
302 |
> + self._result_future = result_future |
303 |
> + self._loop = loop |
304 |
> + self._ready = False |
305 |
> + self._timeout = None |
306 |
> + for future in self._futures: |
307 |
> + future.add_done_callback(self._done_callback) |
308 |
> + if timeout is not None: |
309 |
> + self._timeout = loop.call_later(timeout, |
310 |
> self._timeout_callback) |
311 |
> + |
312 |
> + def _timeout_callback(self): |
313 |
> + if not self._ready: |
314 |
> + self._ready = True |
315 |
> + self._ready_callback() |
316 |
> + |
317 |
> + def _done_callback(self, future): |
318 |
> + if future.cancelled() or future.exception() is None: |
319 |
> + self._completed.add(id(future)) |
320 |
> + else: |
321 |
> + self._exceptions.add(id(future)) |
322 |
> + if not self._ready and ( |
323 |
> + (self._return_when is FIRST_COMPLETED and |
324 |
> self._completed) or |
325 |
> + (self._return_when is FIRST_EXCEPTION and |
326 |
> self._exceptions) or |
327 |
> + (len(self._futures) == len(self._completed) + |
328 |
> len(self._exceptions))): |
329 |
> + self._ready = True |
330 |
> + # use call_soon in case multiple callbacks |
331 |
> complete in quick succession |
332 |
> + self._loop.call_soon(self._ready_callback) |
333 |
> + |
334 |
> + def _ready_callback(self): |
335 |
> + if self._timeout is not None: |
336 |
> + self._timeout.cancel() |
337 |
> + self._timeout = None |
338 |
> + done = [] |
339 |
> + pending = [] |
340 |
> + done_ids = self._completed.union(self._exceptions) |
341 |
> + for future in self._futures: |
342 |
> + if id(future) in done_ids: |
343 |
> + done.append(future) |
344 |
> + else: |
345 |
> + pending.append(future) |
346 |
> + future.remove_done_callback( |
347 |
> self._done_callback) |
348 |
> + self._result_future.set_result((done, pending)) |
349 |
> -- |
350 |
> 2.13.6 |
351 |
> |
352 |
> |
353 |
> |