This is similar to asyncio.gather, but takes an iterator of
futures as input, and includes support for max_jobs and max_load
parameters. For bug 653946, this will be used to asynchronously
gather the results of the portdbapi.async_fetch_map calls that
are required to generate a Manifest, while using the max_jobs
parameter to limit the number of concurrent async_aux_get calls.
7 |
|
8 |
Bug: https://bugs.gentoo.org/653946 |
9 |
--- |
10 |
pym/portage/util/futures/iter_completed.py | 68 ++++++++++++++++++++++++++++++ |
11 |
1 file changed, 68 insertions(+) |
12 |
|
13 |
diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py |
14 |
index 5ad075305..4e52a499f 100644 |
15 |
--- a/pym/portage/util/futures/iter_completed.py |
16 |
+++ b/pym/portage/util/futures/iter_completed.py |
17 |
@@ -112,3 +112,71 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None): |
18 |
# cleanup in case of interruption by SIGINT, etc |
19 |
scheduler.cancel() |
20 |
scheduler.wait() |
21 |
+ |
22 |
+ |
23 |
def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
    """
    Collect futures drawn from an iterator, in the spirit of
    asyncio.gather, while honoring max_jobs and max_load limits.

    The input iterator is consumed lazily through async_iter_completed,
    and every future pulled from it is remembered in the order it was
    yielded.

    @param futures: iterator of asyncio.Future (or compatible)
    @type futures: iterator
    @param max_jobs: max number of futures to process concurrently (default
        is multiprocessing.cpu_count())
    @type max_jobs: int
    @param max_load: max load allowed when scheduling a new future,
        otherwise schedule no more than 1 future at a time (default
        is multiprocessing.cpu_count())
    @type max_load: int or float
    @param loop: event loop
    @type loop: EventLoop
    @return: a Future resulting in a list of done input futures, in the
        same order that they were yielded from the input iterator
    @rtype: asyncio.Future (or compatible)
    """
    loop = loop or global_event_loop()
    # Prefer the loop's asyncio-compatible wrapper when it exposes one.
    loop = getattr(loop, '_asyncio_wrapper', loop)
    gathered = loop.create_future()
    yielded_in_order = []

    def record_and_forward():
        # Remember each input future as it is handed to the scheduler, so
        # the final result preserves the input iterator's ordering.
        for item in futures:
            yielded_in_order.append(item)
            yield item

    completed = async_iter_completed(
        record_and_forward(),
        max_jobs=max_jobs,
        max_load=max_load,
        loop=loop,
    )

    # One-slot cell shared by the callbacks below; holds the future most
    # recently obtained from ``completed``.
    cell = {'task': None}

    def advance():
        # Pull the next future from ``completed`` and chain on_task_done
        # to it. Returns False (leaving the cell untouched) once the
        # iterator is exhausted.
        try:
            cell['task'] = next(completed)
        except StopIteration:
            return False
        cell['task'].add_done_callback(on_task_done)
        return True

    def on_task_done(_finished):
        # Stop pulling more work once the caller has cancelled the
        # gathered future.
        if gathered.cancelled():
            return
        if not advance():
            gathered.set_result(yielded_in_order)

    # Kick off the chain; an exhausted iterator resolves immediately.
    if not advance():
        cell['task'] = None
        gathered.set_result(yielded_in_order)

    def propagate_cancel(outcome):
        # When the gathered future is cancelled, also cancel the
        # in-flight task that is still being waited on.
        if not outcome.cancelled():
            return
        task = cell['task']
        if task is None or task.done():
            return
        task.cancel()

    gathered.add_done_callback(propagate_cancel)

    return gathered
89 |
-- |
90 |
2.13.6 |