Gentoo Archives: gentoo-portage-dev

From: Alec Warner <antarus@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: Re: [gentoo-portage-dev] [PATCH 2/2] Add iter_completed convenience function (bug 648790)
Date: Mon, 26 Feb 2018 03:17:15
Message-Id: CAAr7Pr-SyTu6dNu=zp9jD4yyYnp-azFFhFFeMvZ+tOP84XKb7w@mail.gmail.com
In Reply to: [gentoo-portage-dev] [PATCH 2/2] Add iter_completed convenience function (bug 648790) by Zac Medico
1 On Sun, Feb 25, 2018 at 8:50 PM, Zac Medico <zmedico@g.o> wrote:
2
3 > The iter_completed function is similar to asyncio.as_completed, but
4 > takes an iterator of futures as input, and includes support for
5 > max_jobs and max_load parameters. The default values for max_jobs
6 > and max_load correspond to multiprocessing.cpu_count().
7 >
8 > Example usage for async_aux_get:
9 >
10 > import portage
11 > from portage.util.futures.iter_completed import iter_completed
12 >
13 > portdb = portage.portdb
14 > future_cpv = {}
15 >
16
17 I'm not sure I grasp the purpose of this dict, can't we just modify the
18 async aux get to return the cpv from the future?
19
20
21 >
22 > def future_generator():
23 > for cpv in portdb.cp_list('sys-apps/portage'):
24 > future = portdb.async_aux_get(cpv, portage.auxdbkeys)
25 > future_cpv[id(future)] = cpv
26 > yield future
27 >
28 >
29 for cpv in portdb.cp_list('...'):
30 yield portdb.async_aux_get(cpv, portage.auxdbkeys)
31
32
33 > for future in iter_completed(future_generator()):
34 > cpv = future_cpv.pop(id(future))
35 > try:
36 > result = future.result()
37 > except KeyError as e:
38 > # aux_get failed
39 > print('error:', cpv, e)
40 > else:
41 > print(cpv, result)
42 >
43
44 for future in iter_completed(future_generator()):
45 try:
46 cpv, result = future.result()
47 except KeyError as e:
48 print('error', cpv, e)
49
50
51 Or do we expect callers to need other things to key off of in this API?
52
53 -A
54
55
56 > See: https://docs.python.org/3/library/asyncio-task.html#
57 > asyncio.as_completed
58 > Bug: https://bugs.gentoo.org/648790
59 > ---
60 > .../tests/util/futures/test_iter_completed.py | 50 ++++++++++++
61 > pym/portage/util/_async/FuturePollTask.py | 27 ++++++
62 > pym/portage/util/futures/iter_completed.py | 63 ++++++++++++++
63 > pym/portage/util/futures/wait.py | 95
64 > ++++++++++++++++++++++
65 > 4 files changed, 235 insertions(+)
66 > create mode 100644 pym/portage/tests/util/futures/test_iter_completed.py
67 > create mode 100644 pym/portage/util/_async/FuturePollTask.py
68 > create mode 100644 pym/portage/util/futures/iter_completed.py
69 > create mode 100644 pym/portage/util/futures/wait.py
70 >
71 > diff --git a/pym/portage/tests/util/futures/test_iter_completed.py
72 > b/pym/portage/tests/util/futures/test_iter_completed.py
73 > new file mode 100644
74 > index 000000000..6607d871c
75 > --- /dev/null
76 > +++ b/pym/portage/tests/util/futures/test_iter_completed.py
77 > @@ -0,0 +1,50 @@
78 > +# Copyright 2018 Gentoo Foundation
79 > +# Distributed under the terms of the GNU General Public License v2
80 > +
81 > +import time
82 > +from portage.tests import TestCase
83 > +from portage.util._async.ForkProcess import ForkProcess
84 > +from portage.util._eventloop.global_event_loop import global_event_loop
85 > +from portage.util.futures.iter_completed import iter_completed
86 > +
87 > +
88 > +class SleepProcess(ForkProcess):
89 > + __slots__ = ('future', 'seconds')
90 > + def _start(self):
91 > + self.addExitListener(self._future_done)
92 > + ForkProcess._start(self)
93 > +
94 > + def _future_done(self, task):
95 > + self.future.set_result(self.seconds)
96 > +
97 > + def _run(self):
98 > + time.sleep(self.seconds)
99 > +
100 > +
101 > +class IterCompletedTestCase(TestCase):
102 > +
103 > + def testIterCompleted(self):
104 > +
105 > + # Mark this as todo, since we don't want to fail if heavy
106 > system
107 > + # load causes the tasks to finish in an unexpected order.
108 > + self.todo = True
109 > +
110 > + loop = global_event_loop()
111 > + tasks = [
112 > + SleepProcess(seconds=0.200),
113 > + SleepProcess(seconds=0.100),
114 > + SleepProcess(seconds=0.001),
115 > + ]
116 > +
117 > + expected_order = sorted(task.seconds for task in tasks)
118 > +
119 > + def future_generator():
120 > + for task in tasks:
121 > + task.future = loop.create_future()
122 > + task.scheduler = loop
123 > + task.start()
124 > + yield task.future
125 > +
126 > + for seconds, future in zip(expected_order,
127 > iter_completed(future_generator(),
128 > + max_jobs=None, max_load=None, loop=loop)):
129 > + self.assertEqual(seconds, future.result())
130 > diff --git a/pym/portage/util/_async/FuturePollTask.py
131 > b/pym/portage/util/_async/FuturePollTask.py
132 > new file mode 100644
133 > index 000000000..6b7cdf7d5
134 > --- /dev/null
135 > +++ b/pym/portage/util/_async/FuturePollTask.py
136 > @@ -0,0 +1,27 @@
137 > +# Copyright 2018 Gentoo Foundation
138 > +# Distributed under the terms of the GNU General Public License v2
139 > +
140 > +import os
141 > +import signal
142 > +
143 > +from _emerge.AbstractPollTask import AbstractPollTask
144 > +
145 > +
146 > +class FuturePollTask(AbstractPollTask):
147 > + """
148 > + Wraps a Future in an AsynchronousTask, which is useful for
149 > + scheduling with TaskScheduler.
150 > + """
151 > + __slots__ = ('future',)
152 > + def _start(self):
153 > + self.future.add_done_callback(self._done_callback)
154 > +
155 > + def _done_callback(self, future):
156 > + if future.cancelled():
157 > + self.cancelled = True
158 > + self.returncode = -signal.SIGINT
159 > + elif future.exception() is None:
160 > + self.returncode = os.EX_OK
161 > + else:
162 > + self.returncode = 1
163 > + self.wait()
164 > diff --git a/pym/portage/util/futures/iter_completed.py
165 > b/pym/portage/util/futures/iter_completed.py
166 > new file mode 100644
167 > index 000000000..0540cc986
168 > --- /dev/null
169 > +++ b/pym/portage/util/futures/iter_completed.py
170 > @@ -0,0 +1,63 @@
171 > +# Copyright 2018 Gentoo Foundation
172 > +# Distributed under the terms of the GNU General Public License v2
173 > +
174 > +import multiprocessing
175 > +
176 > +from portage.util._async.FuturePollTask import FuturePollTask
177 > +from portage.util._async.TaskScheduler import TaskScheduler
178 > +from portage.util._eventloop.global_event_loop import global_event_loop
179 > +from portage.util.futures.wait import wait, FIRST_COMPLETED
180 > +
181 > +
182 > +def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
183 > + """
184 > + This is similar to asyncio.as_completed, but takes an iterator of
185 > + futures as input, and includes support for max_jobs and max_load
186 > + parameters.
187 > +
188 > + @param futures: iterator of asyncio.Future (or compatible)
189 > + @type futures: iterator
190 > + @param max_jobs: max number of futures to process concurrently
191 > (default
192 > + is multiprocessing.cpu_count())
193 > + @type max_jobs: int
194 > + @param max_load: max load allowed when scheduling a new future,
195 > + otherwise schedule no more than 1 future at a time (default
196 > + is multiprocessing.cpu_count())
197 > + @type max_load: int or float
198 > + @param loop: event loop
199 > + @type loop: EventLoop
200 > + @return: iterator of futures that are done
201 > + @rtype: iterator
202 > + """
203 > + loop = loop or global_event_loop()
204 > + max_jobs = max_jobs or multiprocessing.cpu_count()
205 > + max_load = max_load or multiprocessing.cpu_count()
206 > +
207 > + future_map = {}
208 > + def task_generator():
209 > + for future in futures:
210 > + future_map[id(future)] = future
211 > + yield FuturePollTask(future=future)
212 > +
213 > + scheduler = TaskScheduler(
214 > + task_generator(),
215 > + max_jobs=max_jobs,
216 > + max_load=max_load,
217 > + event_loop=loop)
218 > +
219 > + try:
220 > + scheduler.start()
221 > +
222 > + # scheduler should ensure that future_map is non-empty
223 > until
224 > + # task_generator is exhausted
225 > + while future_map:
226 > + done, pending = loop.run_until_complete(
227 > + wait(*list(future_map.values()),
228 > return_when=FIRST_COMPLETED))
229 > + for future in done:
230 > + del future_map[id(future)]
231 > + yield future
232 > +
233 > + finally:
234 > + # cleanup in case of interruption by SIGINT, etc
235 > + scheduler.cancel()
236 > + scheduler.wait()
237 > diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/
238 > wait.py
239 > new file mode 100644
240 > index 000000000..a65a25ac4
241 > --- /dev/null
242 > +++ b/pym/portage/util/futures/wait.py
243 > @@ -0,0 +1,95 @@
244 > +# Copyright 2018 Gentoo Foundation
245 > +# Distributed under the terms of the GNU General Public License v2
246 > +
247 > +try:
248 > + from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
249 > +except ImportError:
250 > + ALL_COMPLETED = 'ALL_COMPLETED'
251 > + FIRST_COMPLETED ='FIRST_COMPLETED'
252 > + FIRST_EXCEPTION = 'FIRST_EXCEPTION'
253 > +
254 > +from portage.util._eventloop.global_event_loop import global_event_loop
255 > +
256 > +
257 > +# Use **kwargs since python2.7 does not allow arguments with defaults
258 > +# to follow *futures.
259 > +def wait(*futures, **kwargs):
260 > + """
261 > + Use portage's internal EventLoop to emulate asyncio.wait:
262 > + https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
263 > +
264 > + @param future: future to wait for
265 > + @type future: asyncio.Future (or compatible)
266 > + @param timeout: number of seconds to wait (wait indefinitely if
267 > + not specified)
268 > + @type timeout: int or float
269 > + @param return_when: indicates when this function should return,
270 > must
271 > + be one of the constants ALL_COMPLETED, FIRST_COMPLETED, or
272 > + FIRST_EXCEPTION (default is ALL_COMPLETED)
273 > + @type return_when: object
274 > + @param loop: event loop
275 > + @type loop: EventLoop
276 > + @return: tuple of (done, pending).
277 > + @rtype: asyncio.Future (or compatible)
278 > + """
279 > + if not futures:
280 > + raise TypeError("wait() missing 1 required positional
281 > argument: 'future'")
282 > + loop = kwargs.pop('loop', None)
283 > + timeout = kwargs.pop('timeout', None)
284 > + return_when = kwargs.pop('return_when', ALL_COMPLETED)
285 > + if kwargs:
286 > + raise TypeError("wait() got an unexpected keyword argument
287 > '{}'".\
288 > + format(next(iter(kwargs))))
289 > + loop = loop or global_event_loop()
290 > + result_future = loop.create_future()
291 > + _Waiter(futures, timeout, return_when, result_future, loop)
292 > + return result_future
293 > +
294 > +
295 > +class _Waiter(object):
296 > + def __init__(self, futures, timeout, return_when, result_future,
297 > loop):
298 > + self._futures = futures
299 > + self._completed = set()
300 > + self._exceptions = set()
301 > + self._return_when = return_when
302 > + self._result_future = result_future
303 > + self._loop = loop
304 > + self._ready = False
305 > + self._timeout = None
306 > + for future in self._futures:
307 > + future.add_done_callback(self._done_callback)
308 > + if timeout is not None:
309 > + self._timeout = loop.call_later(timeout,
310 > self._timeout_callback)
311 > +
312 > + def _timeout_callback(self):
313 > + if not self._ready:
314 > + self._ready = True
315 > + self._ready_callback()
316 > +
317 > + def _done_callback(self, future):
318 > + if future.cancelled() or future.exception() is None:
319 > + self._completed.add(id(future))
320 > + else:
321 > + self._exceptions.add(id(future))
322 > + if not self._ready and (
323 > + (self._return_when is FIRST_COMPLETED and
324 > self._completed) or
325 > + (self._return_when is FIRST_EXCEPTION and
326 > self._exceptions) or
327 > + (len(self._futures) == len(self._completed) +
328 > len(self._exceptions))):
329 > + self._ready = True
330 > + # use call_soon in case multiple callbacks
331 > complete in quick succession
332 > + self._loop.call_soon(self._ready_callback)
333 > +
334 > + def _ready_callback(self):
335 > + if self._timeout is not None:
336 > + self._timeout.cancel()
337 > + self._timeout = None
338 > + done = []
339 > + pending = []
340 > + done_ids = self._completed.union(self._exceptions)
341 > + for future in self._futures:
342 > + if id(future) in done_ids:
343 > + done.append(future)
344 > + else:
345 > + pending.append(future)
346 > + future.remove_done_callback(
347 > self._done_callback)
348 > + self._result_future.set_result((done, pending))
349 > --
350 > 2.13.6
351 >
352 >
353 >

Replies