1 |
commit: be61882996099322bb3a1e82e71f475b4141ad40 |
2 |
Author: Zac Medico <zmedico <AT> gentoo <DOT> org> |
3 |
AuthorDate: Tue Apr 24 23:28:08 2018 +0000 |
4 |
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> |
5 |
CommitDate: Fri Apr 27 21:33:00 2018 +0000 |
6 |
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=be618829 |
7 |
|
8 |
Add iter_gather function (bug 653946) |
9 |
|
10 |
This is similar to asyncio.gather, but takes an iterator of |
11 |
futures as input, and includes support for max_jobs and max_load |
12 |
parameters. For bug 653946, this will be used to asynchronously |
13 |
gather the results of the portdbapi.async_fetch_map calls that |
14 |
are required to generate a Manifest, while using the max_jobs |
15 |
parameter to limit the number of concurrent async_aux_get calls. |
16 |
|
17 |
Bug: https://bugs.gentoo.org/653946 |
18 |
|
19 |
pym/portage/util/futures/iter_completed.py | 73 ++++++++++++++++++++++++++++++ |
20 |
1 file changed, 73 insertions(+) |
21 |
|
22 |
diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py |
23 |
index 5ad075305..1d6a9a4bd 100644 |
24 |
--- a/pym/portage/util/futures/iter_completed.py |
25 |
+++ b/pym/portage/util/futures/iter_completed.py |
26 |
@@ -112,3 +112,76 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None): |
27 |
# cleanup in case of interruption by SIGINT, etc |
28 |
scheduler.cancel() |
29 |
scheduler.wait() |
30 |
+ |
31 |
+ |
32 |
def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
    """
    Gather futures drawn from an iterator, in the manner of
    asyncio.gather, while honoring the max_jobs and max_load limits.

    @param futures: iterator of asyncio.Future (or compatible)
    @type futures: iterator
    @param max_jobs: max number of futures to process concurrently (default
        is multiprocessing.cpu_count())
    @type max_jobs: int
    @param max_load: max load allowed when scheduling a new future,
        otherwise schedule no more than 1 future at a time (default
        is multiprocessing.cpu_count())
    @type max_load: int or float
    @param loop: event loop
    @type loop: EventLoop
    @return: a Future resulting in a list of done input futures, in the
        same order that they were yielded from the input iterator
    @rtype: asyncio.Future (or compatible)
    """
    event_loop = loop or global_event_loop()
    # Use the loop's _asyncio_wrapper attribute when it has one,
    # otherwise keep using the loop object itself.
    event_loop = getattr(event_loop, '_asyncio_wrapper', event_loop)
    gathered = event_loop.create_future()
    yielded = []
    # Mutable cell holding the batch future that is currently awaited; a
    # dict lets the nested callbacks rebind it without needing nonlocal.
    pending = {'task': None}

    def record_order():
        # Remember every input future in the order it is consumed, since
        # that order defines the order of the final result list.
        for item in futures:
            yielded.append(item)
            yield item

    batches = async_iter_completed(
        record_order(),
        max_jobs=max_jobs,
        max_load=max_load,
        loop=event_loop,
    )

    def await_next_batch():
        # Chain onto the next batch future, or publish the result once
        # the batch iterator is exhausted.
        try:
            task = next(batches)
        except StopIteration:
            gathered.set_result(yielded)
            return
        pending['task'] = task
        task.add_done_callback(on_batch_done)

    def on_batch_done(done_set_future):
        if not gathered.cancelled():
            await_next_batch()
            return
        if done_set_future.cancelled():
            return
        # All exceptions must be consumed from done_set_future, in order
        # to avoid triggering the event loop's exception handler.
        for done in done_set_future.result():
            if not done.cancelled():
                done.exception()

    def on_gathered_done(outcome):
        # Propagate cancellation of the returned future to the batch
        # future that is still outstanding, if there is one.
        if not outcome.cancelled():
            return
        task = pending['task']
        if task is not None and not task.done():
            task.cancel()

    await_next_batch()
    gathered.add_done_callback(on_gathered_done)

    return gathered