Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/, pym/portage/util/futures/, ...
Date: Wed, 28 Feb 2018 18:41:07
Message-Id: 1519838540.e43f6c583ed9205abbdcb11340c81d7dd97ccc11.zmedico@gentoo
1 commit: e43f6c583ed9205abbdcb11340c81d7dd97ccc11
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Sun Feb 25 23:19:58 2018 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Wed Feb 28 17:22:20 2018 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=e43f6c58
7
8 Add iter_completed convenience function (bug 648790)
9
10 The iter_completed function is similar to asyncio.as_completed, but
11 takes an iterator of futures as input, and includes support for
12 max_jobs and max_load parameters. The default values for max_jobs
13 and max_load correspond to multiprocessing.cpu_count().
14
15 Example usage for async_aux_get:
16
17 import portage
18 from portage.util.futures.iter_completed import iter_completed
19
20 portdb = portage.portdb
21 # aux_get has many inputs, and the same cpv can exist in multiple
22 # repositories, so the caller is responsible for mapping futures
23 # back to their aux_get inputs
24 future_cpv = {}
25
26 def future_generator():
27     for cpv in portdb.cp_list('sys-apps/portage'):
28         future = portdb.async_aux_get(cpv, portage.auxdbkeys)
29         future_cpv[id(future)] = cpv
30         yield future
31
32 for future in iter_completed(future_generator()):
33     cpv = future_cpv.pop(id(future))
34     try:
35         result = future.result()
36     except KeyError as e:
37         # aux_get failed
38         print('error:', cpv, e)
39     else:
40         print(cpv, result)
41
42 See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
43 Bug: https://bugs.gentoo.org/648790
44
45 .../tests/util/futures/test_iter_completed.py | 50 ++++++++++
46 pym/portage/util/_async/AsyncTaskFuture.py | 31 +++++++
47 pym/portage/util/futures/iter_completed.py | 63 +++++++++++++
48 pym/portage/util/futures/wait.py | 102 +++++++++++++++++++++
49 4 files changed, 246 insertions(+)
50
51 diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py
52 new file mode 100644
53 index 000000000..d0a7dbb45
54 --- /dev/null
55 +++ b/pym/portage/tests/util/futures/test_iter_completed.py
56 @@ -0,0 +1,50 @@
57 +# Copyright 2018 Gentoo Foundation
58 +# Distributed under the terms of the GNU General Public License v2
59 +
60 +import time
61 +from portage.tests import TestCase
62 +from portage.util._async.ForkProcess import ForkProcess
63 +from portage.util._eventloop.global_event_loop import global_event_loop
64 +from portage.util.futures.iter_completed import iter_completed
65 +
66 +
67 +class SleepProcess(ForkProcess):
68 + __slots__ = ('future', 'seconds')
69 + def _start(self):
70 + self.addExitListener(self._future_done)
71 + ForkProcess._start(self)
72 +
73 + def _future_done(self, task):
74 + self.future.set_result(self.seconds)
75 +
76 + def _run(self):
77 + time.sleep(self.seconds)
78 +
79 +
80 +class IterCompletedTestCase(TestCase):
81 +
82 + def testIterCompleted(self):
83 +
84 + # Mark this as todo, since we don't want to fail if heavy system
85 + # load causes the tasks to finish in an unexpected order.
86 + self.todo = True
87 +
88 + loop = global_event_loop()
89 + tasks = [
90 + SleepProcess(seconds=0.200),
91 + SleepProcess(seconds=0.100),
92 + SleepProcess(seconds=0.001),
93 + ]
94 +
95 + expected_order = sorted(task.seconds for task in tasks)
96 +
97 + def future_generator():
98 + for task in tasks:
99 + task.future = loop.create_future()
100 + task.scheduler = loop
101 + task.start()
102 + yield task.future
103 +
104 + for seconds, future in zip(expected_order, iter_completed(future_generator(),
105 + max_jobs=True, max_load=None, loop=loop)):
106 + self.assertEqual(seconds, future.result())
107
108 diff --git a/pym/portage/util/_async/AsyncTaskFuture.py b/pym/portage/util/_async/AsyncTaskFuture.py
109 new file mode 100644
110 index 000000000..ee39183fe
111 --- /dev/null
112 +++ b/pym/portage/util/_async/AsyncTaskFuture.py
113 @@ -0,0 +1,31 @@
114 +# Copyright 2018 Gentoo Foundation
115 +# Distributed under the terms of the GNU General Public License v2
116 +
117 +import os
118 +import signal
119 +
120 +from _emerge.AsynchronousTask import AsynchronousTask
121 +
122 +
123 +class AsyncTaskFuture(AsynchronousTask):
124 + """
125 + Wraps a Future in an AsynchronousTask, which is useful for
126 + scheduling with TaskScheduler.
127 + """
128 + __slots__ = ('future', 'scheduler')
129 + def _start(self):
130 + self.future.add_done_callback(self._done_callback)
131 +
132 + def _cancel(self):
133 + if not self.future.done():
134 + self.future.cancel()
135 +
136 + def _done_callback(self, future):
137 + if future.cancelled():
138 + self.cancelled = True
139 + self.returncode = -signal.SIGINT
140 + elif future.exception() is None:
141 + self.returncode = os.EX_OK
142 + else:
143 + self.returncode = 1
144 + self.wait()
145
146 diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
147 new file mode 100644
148 index 000000000..1050b6fa7
149 --- /dev/null
150 +++ b/pym/portage/util/futures/iter_completed.py
151 @@ -0,0 +1,63 @@
152 +# Copyright 2018 Gentoo Foundation
153 +# Distributed under the terms of the GNU General Public License v2
154 +
155 +import multiprocessing
156 +
157 +from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
158 +from portage.util._async.TaskScheduler import TaskScheduler
159 +from portage.util._eventloop.global_event_loop import global_event_loop
160 +from portage.util.futures.wait import wait, FIRST_COMPLETED
161 +
162 +
163 +def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
164 + """
165 + This is similar to asyncio.as_completed, but takes an iterator of
166 + futures as input, and includes support for max_jobs and max_load
167 + parameters.
168 +
169 + @param futures: iterator of asyncio.Future (or compatible)
170 + @type futures: iterator
171 + @param max_jobs: max number of futures to process concurrently (default
172 + is multiprocessing.cpu_count())
173 + @type max_jobs: int
174 + @param max_load: max load allowed when scheduling a new future,
175 + otherwise schedule no more than 1 future at a time (default
176 + is multiprocessing.cpu_count())
177 + @type max_load: int or float
178 + @param loop: event loop
179 + @type loop: EventLoop
180 + @return: iterator of futures that are done
181 + @rtype: iterator
182 + """
183 + loop = loop or global_event_loop()
184 + max_jobs = max_jobs or multiprocessing.cpu_count()
185 + max_load = max_load or multiprocessing.cpu_count()
186 +
187 + future_map = {}
188 + def task_generator():
189 + for future in futures:
190 + future_map[id(future)] = future
191 + yield AsyncTaskFuture(future=future)
192 +
193 + scheduler = TaskScheduler(
194 + task_generator(),
195 + max_jobs=max_jobs,
196 + max_load=max_load,
197 + event_loop=loop)
198 +
199 + try:
200 + scheduler.start()
201 +
202 + # scheduler should ensure that future_map is non-empty until
203 + # task_generator is exhausted
204 + while future_map:
205 + done, pending = loop.run_until_complete(
206 + wait(*list(future_map.values()), return_when=FIRST_COMPLETED))
207 + for future in done:
208 + del future_map[id(future)]
209 + yield future
210 +
211 + finally:
212 + # cleanup in case of interruption by SIGINT, etc
213 + scheduler.cancel()
214 + scheduler.wait()
215
216 diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/wait.py
217 new file mode 100644
218 index 000000000..3f0bdbff5
219 --- /dev/null
220 +++ b/pym/portage/util/futures/wait.py
221 @@ -0,0 +1,102 @@
222 +# Copyright 2018 Gentoo Foundation
223 +# Distributed under the terms of the GNU General Public License v2
224 +
225 +try:
226 + from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
227 +except ImportError:
228 + ALL_COMPLETED = 'ALL_COMPLETED'
229 + FIRST_COMPLETED ='FIRST_COMPLETED'
230 + FIRST_EXCEPTION = 'FIRST_EXCEPTION'
231 +
232 +from portage.util._eventloop.global_event_loop import global_event_loop
233 +
234 +
235 +# Use **kwargs since python2.7 does not allow arguments with defaults
236 +# to follow *futures.
237 +def wait(*futures, **kwargs):
238 + """
239 + Use portage's internal EventLoop to emulate asyncio.wait:
240 + https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
241 +
242 + @param future: future to wait for
243 + @type future: asyncio.Future (or compatible)
244 + @param timeout: number of seconds to wait (wait indefinitely if
245 + not specified)
246 + @type timeout: int or float
247 + @param return_when: indicates when this function should return, must
248 + be one of the constants ALL_COMPLETED, FIRST_COMPLETED, or
249 + FIRST_EXCEPTION (default is ALL_COMPLETED)
250 + @type return_when: object
251 + @param loop: event loop
252 + @type loop: EventLoop
253 + @return: tuple of (done, pending).
254 + @rtype: asyncio.Future (or compatible)
255 + """
256 + if not futures:
257 + raise TypeError("wait() missing 1 required positional argument: 'future'")
258 + loop = kwargs.pop('loop', None)
259 + timeout = kwargs.pop('timeout', None)
260 + return_when = kwargs.pop('return_when', ALL_COMPLETED)
261 + if kwargs:
262 + raise TypeError("wait() got an unexpected keyword argument '{}'".\
263 + format(next(iter(kwargs))))
264 + loop = loop or global_event_loop()
265 + result_future = loop.create_future()
266 + _Waiter(futures, timeout, return_when, result_future, loop)
267 + return result_future
268 +
269 +
270 +class _Waiter(object):
271 + def __init__(self, futures, timeout, return_when, result_future, loop):
272 + self._futures = futures
273 + self._completed = set()
274 + self._exceptions = set()
275 + self._return_when = return_when
276 + self._result_future = result_future
277 + self._loop = loop
278 + self._ready = False
279 + self._timeout = None
280 + result_future.add_done_callback(self._cancel_callback)
281 + for future in self._futures:
282 + future.add_done_callback(self._done_callback)
283 + if timeout is not None:
284 + self._timeout = loop.call_later(timeout, self._timeout_callback)
285 +
286 + def _cancel_callback(self, future):
287 + if future.cancelled():
288 + self._ready_callback()
289 +
290 + def _timeout_callback(self):
291 + if not self._ready:
292 + self._ready = True
293 + self._ready_callback()
294 +
295 + def _done_callback(self, future):
296 + if future.cancelled() or future.exception() is None:
297 + self._completed.add(id(future))
298 + else:
299 + self._exceptions.add(id(future))
300 + if not self._ready and (
301 + (self._return_when is FIRST_COMPLETED and self._completed) or
302 + (self._return_when is FIRST_EXCEPTION and self._exceptions) or
303 + (len(self._futures) == len(self._completed) + len(self._exceptions))):
304 + self._ready = True
305 + # use call_soon in case multiple callbacks complete in quick succession
306 + self._loop.call_soon(self._ready_callback)
307 +
308 + def _ready_callback(self):
309 + if self._timeout is not None:
310 + self._timeout.cancel()
311 + self._timeout = None
312 + if self._result_future.cancelled():
313 + return
314 + done = []
315 + pending = []
316 + done_ids = self._completed.union(self._exceptions)
317 + for future in self._futures:
318 + if id(future) in done_ids:
319 + done.append(future)
320 + else:
321 + pending.append(future)
322 + future.remove_done_callback(self._done_callback)
323 + self._result_future.set_result((done, pending))