Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH 1/2] Add iter_gather function (bug 653946)
Date: Tue, 24 Apr 2018 23:46:49
Message-Id: 20180424234559.26703-2-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH 0/2] ManifestScheduler: async fetchlist_dict (bug 653946) by Zac Medico
1 This is similar to asyncio.gather, but takes an iterator of
2 futures as input, and includes support for max_jobs and max_load
3 parameters. For bug 653946, this will be used to asynchronously
4 gather the results of the portdbapi.async_fetch_map calls that
5 are required to generate a Manifest, while using the max_jobs
6 parameter to limit the number of concurrent async_aux_get calls.
7
8 Bug: https://bugs.gentoo.org/653946
9 ---
10 pym/portage/util/futures/iter_completed.py | 68 ++++++++++++++++++++++++++++++
11 1 file changed, 68 insertions(+)
12
13 diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
14 index 5ad075305..4e52a499f 100644
15 --- a/pym/portage/util/futures/iter_completed.py
16 +++ b/pym/portage/util/futures/iter_completed.py
17 @@ -112,3 +112,71 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
18 # cleanup in case of interruption by SIGINT, etc
19 scheduler.cancel()
20 scheduler.wait()
21 +
22 +
23 +def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
24 + """
25 + This is similar to asyncio.gather, but takes an iterator of
26 + futures as input, and includes support for max_jobs and max_load
27 + parameters.
28 +
29 + @param futures: iterator of asyncio.Future (or compatible)
30 + @type futures: iterator
31 + @param max_jobs: max number of futures to process concurrently (default
32 + is multiprocessing.cpu_count())
33 + @type max_jobs: int
34 + @param max_load: max load allowed when scheduling a new future,
35 + otherwise schedule no more than 1 future at a time (default
36 + is multiprocessing.cpu_count())
37 + @type max_load: int or float
38 + @param loop: event loop
39 + @type loop: EventLoop
40 + @return: a Future resulting in a list of done input futures, in the
41 + same order that they were yielded from the input iterator
42 + @rtype: asyncio.Future (or compatible)
43 + """
44 + loop = loop or global_event_loop()
45 + loop = getattr(loop, '_asyncio_wrapper', loop)
46 + result = loop.create_future()
47 + futures_list = []
48 +
49 + def future_generator():
50 + for future in futures:
51 + futures_list.append(future)
52 + yield future
53 +
54 + completed_iter = async_iter_completed(
55 + future_generator(),
56 + max_jobs=max_jobs,
57 + max_load=max_load,
58 + loop=loop,
59 + )
60 +
61 + def handle_result(future_done_set):
62 + if result.cancelled():
63 + return
64 +
65 + try:
66 + handle_result.current_task = next(completed_iter)
67 + except StopIteration:
68 + result.set_result(futures_list)
69 + else:
70 + handle_result.current_task.add_done_callback(handle_result)
71 +
72 + try:
73 + handle_result.current_task = next(completed_iter)
74 + except StopIteration:
75 + handle_result.current_task = None
76 + result.set_result(futures_list)
77 + else:
78 + handle_result.current_task.add_done_callback(handle_result)
79 +
80 + def cancel_callback(result):
81 + if (result.cancelled() and
82 + handle_result.current_task is not None and
83 + not handle_result.current_task.done()):
84 + handle_result.current_task.cancel()
85 +
86 + result.add_done_callback(cancel_callback)
87 +
88 + return result
89 --
90 2.13.6