Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/
Date: Fri, 27 Apr 2018 23:42:30
Message-Id: 1524864780.be61882996099322bb3a1e82e71f475b4141ad40.zmedico@gentoo
1 commit: be61882996099322bb3a1e82e71f475b4141ad40
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Tue Apr 24 23:28:08 2018 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Fri Apr 27 21:33:00 2018 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=be618829
7
8 Add iter_gather function (bug 653946)
9
10 This is similar to asyncio.gather, but takes an iterator of
11 futures as input, and includes support for max_jobs and max_load
12 parameters. For bug 653946, this will be used to asynchronously
13 gather the results of the portdbapi.async_fetch_map calls that
14 are required to generate a Manifest, while using the max_jobs
15 parameter to limit the number of concurrent async_aux_get calls.
16
17 Bug: https://bugs.gentoo.org/653946
18
19 pym/portage/util/futures/iter_completed.py | 73 ++++++++++++++++++++++++++++++
20 1 file changed, 73 insertions(+)
21
22 diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
23 index 5ad075305..1d6a9a4bd 100644
24 --- a/pym/portage/util/futures/iter_completed.py
25 +++ b/pym/portage/util/futures/iter_completed.py
26 @@ -112,3 +112,76 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
27 # cleanup in case of interruption by SIGINT, etc
28 scheduler.cancel()
29 scheduler.wait()
30 +
31 +
32 +def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
33 + """
34 + This is similar to asyncio.gather, but takes an iterator of
35 + futures as input, and includes support for max_jobs and max_load
36 + parameters.
37 +
38 + @param futures: iterator of asyncio.Future (or compatible)
39 + @type futures: iterator
40 + @param max_jobs: max number of futures to process concurrently (default
41 + is multiprocessing.cpu_count())
42 + @type max_jobs: int
43 + @param max_load: max load allowed when scheduling a new future,
44 + otherwise schedule no more than 1 future at a time (default
45 + is multiprocessing.cpu_count())
46 + @type max_load: int or float
47 + @param loop: event loop
48 + @type loop: EventLoop
49 + @return: a Future resulting in a list of done input futures, in the
50 + same order that they were yielded from the input iterator
51 + @rtype: asyncio.Future (or compatible)
52 + """
53 + loop = loop or global_event_loop()
54 + loop = getattr(loop, '_asyncio_wrapper', loop)
55 + result = loop.create_future()
56 + futures_list = []
57 +
58 + def future_generator():
59 + for future in futures:
60 + futures_list.append(future)
61 + yield future
62 +
63 + completed_iter = async_iter_completed(
64 + future_generator(),
65 + max_jobs=max_jobs,
66 + max_load=max_load,
67 + loop=loop,
68 + )
69 +
70 + def handle_result(future_done_set):
71 + if result.cancelled():
72 + if not future_done_set.cancelled():
73 + # All exceptions must be consumed from future_done_set, in order
74 + # to avoid triggering the event loop's exception handler.
75 + list(future.exception() for future in future_done_set.result()
76 + if not future.cancelled())
77 + return
78 +
79 + try:
80 + handle_result.current_task = next(completed_iter)
81 + except StopIteration:
82 + result.set_result(futures_list)
83 + else:
84 + handle_result.current_task.add_done_callback(handle_result)
85 +
86 + try:
87 + handle_result.current_task = next(completed_iter)
88 + except StopIteration:
89 + handle_result.current_task = None
90 + result.set_result(futures_list)
91 + else:
92 + handle_result.current_task.add_done_callback(handle_result)
93 +
94 + def cancel_callback(result):
95 + if (result.cancelled() and
96 + handle_result.current_task is not None and
97 + not handle_result.current_task.done()):
98 + handle_result.current_task.cancel()
99 +
100 + result.add_done_callback(cancel_callback)
101 +
102 + return result