Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH 2/2] Add iter_completed convenience function (bug 648790)
Date: Mon, 26 Feb 2018 01:52:56
Message-Id: 20180226015001.30049-2-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH 1/2] portdbapi: add async_aux_get method (bug 648790) by Zac Medico
1 The iter_completed function is similar to asyncio.as_completed, but
2 takes an iterator of futures as input and supports max_jobs and
3 max_load parameters to throttle how many futures are processed at
4 once. Both max_jobs and max_load default to multiprocessing.cpu_count().
5
6 Example usage for async_aux_get:
7
8 import portage
9 from portage.util.futures.iter_completed import iter_completed
10
portdb = portage.portdb
# Maps id(future) -> the cpv whose metadata that future is fetching.
future_cpv = {}

def future_generator():
	"""Start an async_aux_get for each sys-apps/portage cpv, lazily."""
	for cpv in portdb.cp_list('sys-apps/portage'):
		fut = portdb.async_aux_get(cpv, portage.auxdbkeys)
		future_cpv[id(fut)] = cpv
		yield fut

for future in iter_completed(future_generator()):
	cpv = future_cpv.pop(id(future))
	try:
		result = future.result()
	except KeyError as e:
		# aux_get failed
		print('error:', cpv, e)
		continue
	print(cpv, result)
29
30 See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
31 Bug: https://bugs.gentoo.org/648790
32 ---
33 .../tests/util/futures/test_iter_completed.py | 50 ++++++++++++
34 pym/portage/util/_async/FuturePollTask.py | 27 ++++++
35 pym/portage/util/futures/iter_completed.py | 63 ++++++++++++++
36 pym/portage/util/futures/wait.py | 95 ++++++++++++++++++++++
37 4 files changed, 235 insertions(+)
38 create mode 100644 pym/portage/tests/util/futures/test_iter_completed.py
39 create mode 100644 pym/portage/util/_async/FuturePollTask.py
40 create mode 100644 pym/portage/util/futures/iter_completed.py
41 create mode 100644 pym/portage/util/futures/wait.py
42
43 diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py
44 new file mode 100644
45 index 000000000..6607d871c
46 --- /dev/null
47 +++ b/pym/portage/tests/util/futures/test_iter_completed.py
48 @@ -0,0 +1,50 @@
49 +# Copyright 2018 Gentoo Foundation
50 +# Distributed under the terms of the GNU General Public License v2
51 +
52 +import time
53 +from portage.tests import TestCase
54 +from portage.util._async.ForkProcess import ForkProcess
55 +from portage.util._eventloop.global_event_loop import global_event_loop
56 +from portage.util.futures.iter_completed import iter_completed
57 +
58 +
class SleepProcess(ForkProcess):
	"""
	Test helper: a ForkProcess whose _run() sleeps for ``seconds``. An
	exit listener registered in _start() resolves ``future`` with
	``seconds``.
	"""
	__slots__ = ('future', 'seconds')

	def _run(self):
		time.sleep(self.seconds)

	def _start(self):
		# Register the listener before the process itself is started.
		self.addExitListener(self._future_done)
		super(SleepProcess, self)._start()

	def _future_done(self, task):
		self.future.set_result(self.seconds)
70 +
71 +
class IterCompletedTestCase(TestCase):

	def testIterCompleted(self):
		"""
		iter_completed should yield every submitted future, in the order
		in which the futures finish.
		"""

		# Mark this as todo, since we don't want to fail if heavy system
		# load causes the tasks to finish in an unexpected order.
		self.todo = True

		loop = global_event_loop()
		tasks = [
			SleepProcess(seconds=0.200),
			SleepProcess(seconds=0.100),
			SleepProcess(seconds=0.001),
		]

		expected_order = sorted(task.seconds for task in tasks)

		def future_generator():
			for task in tasks:
				task.future = loop.create_future()
				task.scheduler = loop
				task.start()
				yield task.future

		# Consume the whole iterator instead of zip()ing it against
		# expected_order: zip() stops at the shorter input, so a missing
		# future would go unnoticed, and the iterator would be left
		# suspended instead of running its cleanup.
		results = [future.result() for future in iter_completed(
			future_generator(), max_jobs=None, max_load=None, loop=loop)]
		self.assertEqual(expected_order, results)
99 diff --git a/pym/portage/util/_async/FuturePollTask.py b/pym/portage/util/_async/FuturePollTask.py
100 new file mode 100644
101 index 000000000..6b7cdf7d5
102 --- /dev/null
103 +++ b/pym/portage/util/_async/FuturePollTask.py
104 @@ -0,0 +1,27 @@
105 +# Copyright 2018 Gentoo Foundation
106 +# Distributed under the terms of the GNU General Public License v2
107 +
108 +import os
109 +import signal
110 +
111 +from _emerge.AbstractPollTask import AbstractPollTask
112 +
113 +
class FuturePollTask(AbstractPollTask):
	"""
	Wraps a Future in an AsynchronousTask, which is useful for
	scheduling with TaskScheduler.

	The task's returncode mirrors the wrapped future's outcome:
	-signal.SIGINT (with the task marked cancelled) if the future was
	cancelled, os.EX_OK if it produced a result, and 1 if it raised.
	"""
	__slots__ = ('future',)

	def _start(self):
		# Nothing to launch here; completion is driven by the future.
		self.future.add_done_callback(self._done_callback)

	def _done_callback(self, future):
		if future.cancelled():
			self.cancelled = True
			self.returncode = -signal.SIGINT
		else:
			self.returncode = os.EX_OK if future.exception() is None else 1
		# returncode is set by now; presumably wait() therefore returns
		# without blocking and completes the task's exit handling
		# (AsynchronousTask internals are not visible here).
		self.wait()
132 diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
133 new file mode 100644
134 index 000000000..0540cc986
135 --- /dev/null
136 +++ b/pym/portage/util/futures/iter_completed.py
137 @@ -0,0 +1,63 @@
138 +# Copyright 2018 Gentoo Foundation
139 +# Distributed under the terms of the GNU General Public License v2
140 +
141 +import multiprocessing
142 +
143 +from portage.util._async.FuturePollTask import FuturePollTask
144 +from portage.util._async.TaskScheduler import TaskScheduler
145 +from portage.util._eventloop.global_event_loop import global_event_loop
146 +from portage.util.futures.wait import wait, FIRST_COMPLETED
147 +
148 +
def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
	"""
	Yield futures as they complete. This is like asyncio.as_completed,
	except that the input is an iterator that is consumed lazily, and the
	number of futures in flight is throttled by max_jobs and max_load.

	@param futures: iterator of asyncio.Future (or compatible)
	@type futures: iterator
	@param max_jobs: max number of futures to process concurrently (default
		is multiprocessing.cpu_count())
	@type max_jobs: int
	@param max_load: max load allowed when scheduling a new future,
		otherwise schedule no more than 1 future at a time (default
		is multiprocessing.cpu_count())
	@type max_load: int or float
	@param loop: event loop
	@type loop: EventLoop
	@return: iterator of futures that are done
	@rtype: iterator
	"""
	if not loop:
		loop = global_event_loop()
	if not max_jobs:
		max_jobs = multiprocessing.cpu_count()
	if not max_load:
		max_load = multiprocessing.cpu_count()

	# Futures handed to the scheduler but not yet yielded, keyed by id().
	in_flight = {}

	def wrap_futures():
		for fut in futures:
			in_flight[id(fut)] = fut
			yield FuturePollTask(future=fut)

	scheduler = TaskScheduler(wrap_futures(), max_jobs=max_jobs,
		max_load=max_load, event_loop=loop)

	try:
		scheduler.start()
		# The scheduler is expected to keep in_flight non-empty for as
		# long as wrap_futures() still has futures to hand out.
		while in_flight:
			waiting = list(in_flight.values())
			done, _pending = loop.run_until_complete(
				wait(*waiting, return_when=FIRST_COMPLETED))
			for fut in done:
				in_flight.pop(id(fut))
				yield fut
	finally:
		# Clean up if interrupted (SIGINT, caller closing the generator
		# early, etc).
		scheduler.cancel()
		scheduler.wait()
201 diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/wait.py
202 new file mode 100644
203 index 000000000..a65a25ac4
204 --- /dev/null
205 +++ b/pym/portage/util/futures/wait.py
206 @@ -0,0 +1,95 @@
207 +# Copyright 2018 Gentoo Foundation
208 +# Distributed under the terms of the GNU General Public License v2
209 +
210 +try:
211 + from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
212 +except ImportError:
213 + ALL_COMPLETED = 'ALL_COMPLETED'
214 + FIRST_COMPLETED ='FIRST_COMPLETED'
215 + FIRST_EXCEPTION = 'FIRST_EXCEPTION'
216 +
217 +from portage.util._eventloop.global_event_loop import global_event_loop
218 +
219 +
# Use **kwargs since python2.7 does not allow arguments with defaults
# to follow *futures.
def wait(*futures, **kwargs):
	"""
	Use portage's internal EventLoop to emulate asyncio.wait:
	https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

	@param futures: futures to wait for (passed positionally)
	@type futures: asyncio.Future (or compatible)
	@param timeout: number of seconds to wait (wait indefinitely if
		not specified)
	@type timeout: int or float
	@param return_when: indicates when this function should return, must
		be one of the constants ALL_COMPLETED, FIRST_COMPLETED, or
		FIRST_EXCEPTION (default is ALL_COMPLETED)
	@type return_when: object
	@param loop: event loop
	@type loop: EventLoop
	@return: a future whose result is a tuple of (done, pending)
	@rtype: asyncio.Future (or compatible)
	@raise TypeError: if no futures are given, or if an unexpected
		keyword argument is given
	@raise ValueError: if return_when is not one of the supported constants
	"""
	if not futures:
		raise TypeError("wait() missing 1 required positional argument: 'future'")
	loop = kwargs.pop('loop', None)
	timeout = kwargs.pop('timeout', None)
	return_when = kwargs.pop('return_when', ALL_COMPLETED)
	if kwargs:
		raise TypeError("wait() got an unexpected keyword argument '{}'".format(
			next(iter(kwargs))))
	# Reject unknown values, as asyncio.wait does, instead of silently
	# behaving like ALL_COMPLETED. Equal values are mapped onto the module
	# constants themselves, so identity comparisons against them also work.
	for constant in (ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION):
		if return_when == constant:
			return_when = constant
			break
	else:
		raise ValueError('Invalid return_when value: {}'.format(return_when))
	loop = loop or global_event_loop()
	result_future = loop.create_future()
	_Waiter(futures, timeout, return_when, result_future, loop)
	return result_future
253 +
254 +
class _Waiter(object):
	"""
	Helper for wait(): watches the given futures and resolves
	result_future with a (done, pending) tuple of lists once the
	return_when condition is satisfied or the optional timeout expires.
	"""

	def __init__(self, futures, timeout, return_when, result_future, loop):
		# Drop duplicate futures (by identity), as asyncio.wait does. A
		# duplicate would be counted once when done but twice in the total,
		# so ALL_COMPLETED could never be satisfied.
		unique = []
		seen = set()
		for future in futures:
			if id(future) not in seen:
				seen.add(id(future))
				unique.append(future)
		self._futures = unique
		# ids of futures that finished normally or were cancelled
		self._completed = set()
		# ids of futures that finished by raising an exception
		self._exceptions = set()
		self._return_when = return_when
		self._result_future = result_future
		self._loop = loop
		# True once the result is (about to be) produced, so that it is
		# produced only once
		self._ready = False
		self._timeout = None
		# Stop watching the inputs if the caller cancels result_future.
		result_future.add_done_callback(self._result_done_callback)
		for future in self._futures:
			future.add_done_callback(self._done_callback)
		if timeout is not None:
			self._timeout = loop.call_later(timeout, self._timeout_callback)

	def _result_done_callback(self, result_future):
		# Only cancellation matters here; a normal result is set by
		# _ready_callback itself, after which _ready is already True.
		if result_future.cancelled() and not self._ready:
			self._ready = True
			self._ready_callback()

	def _timeout_callback(self):
		if not self._ready:
			self._ready = True
			self._ready_callback()

	def _done_callback(self, future):
		if future.cancelled() or future.exception() is None:
			self._completed.add(id(future))
		else:
			self._exceptions.add(id(future))
		if self._ready:
			return
		finished = len(self._completed) + len(self._exceptions)
		# FIRST_COMPLETED is satisfied by any finished future, including
		# one that raised, matching asyncio.wait.
		if (self._return_when == FIRST_COMPLETED or
			(self._return_when == FIRST_EXCEPTION and self._exceptions) or
			finished == len(self._futures)):
			self._ready = True
			# use call_soon in case multiple callbacks complete in quick succession
			self._loop.call_soon(self._ready_callback)

	def _ready_callback(self):
		if self._timeout is not None:
			self._timeout.cancel()
			self._timeout = None
		done = []
		pending = []
		for future in self._futures:
			# The inputs no longer need to be watched, done or not.
			future.remove_done_callback(self._done_callback)
			if future.done():
				done.append(future)
			else:
				pending.append(future)
		# The caller may already have cancelled result_future, and
		# set_result on a cancelled future raises InvalidStateError.
		if not self._result_future.done():
			self._result_future.set_result((done, pending))
302 --
303 2.13.6

Replies