Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] Add async_iter_completed for asyncio migration (bug 591760)
Date: Tue, 17 Apr 2018 10:06:18
Message-Id: 20180417100507.1248-1-zmedico@gentoo.org
1 This serves as a wrapper around portage's internal TaskScheduler
2 class, allowing TaskScheduler API consumers to be migrated to use
3 asyncio interfaces.
4
5 Bug: https://bugs.gentoo.org/591760
6 ---
7 .../tests/util/futures/test_iter_completed.py | 37 ++++++++++++-
8 pym/portage/util/futures/iter_completed.py | 61 +++++++++++++++++++---
9 2 files changed, 91 insertions(+), 7 deletions(-)
10
11 diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py
12 index 9c23aefb1..1344523c6 100644
13 --- a/pym/portage/tests/util/futures/test_iter_completed.py
14 +++ b/pym/portage/tests/util/futures/test_iter_completed.py
15 @@ -5,7 +5,11 @@ import time
16 from portage.tests import TestCase
17 from portage.util._async.ForkProcess import ForkProcess
18 from portage.util._eventloop.global_event_loop import global_event_loop
19 -from portage.util.futures.iter_completed import iter_completed
20 +from portage.util.futures import asyncio
21 +from portage.util.futures.iter_completed import (
22 + iter_completed,
23 + async_iter_completed,
24 +)
25
26
27 class SleepProcess(ForkProcess):
28 @@ -48,3 +52,34 @@ class IterCompletedTestCase(TestCase):
29 for seconds, future in zip(expected_order, iter_completed(future_generator(),
30 max_jobs=True, max_load=None, loop=loop)):
31 self.assertEqual(seconds, future.result())
32 +
33 + def testAsyncCancel(self):
34 +
35 + loop = global_event_loop()._asyncio_wrapper
36 + input_futures = set()
37 + future_count = 3
38 +
39 + def future_generator():
40 + for i in range(future_count):
41 + future = loop.create_future()
42 + loop.call_soon(lambda future: None if future.done()
43 + else future.set_result(None), future)
44 + input_futures.add(future)
45 + yield future
46 +
47 + for future_done_set in async_iter_completed(future_generator(),
48 + max_jobs=True, max_load=None, loop=loop):
49 + future_done_set.cancel()
50 + break
51 +
52 + # With max_jobs=True, async_iter_completed should have consumed the
53 + # whole generator (until StopIteration), creating every input future.
54 + self.assertEqual(future_count, len(input_futures))
55 +
56 + loop.run_until_complete(asyncio.wait(input_futures, loop=loop))
57 +
58 + # Each future is now done: either cancelled (by TaskScheduler, which
59 + # varies by python interpreter) or holding a result. result() re-raises
60 + # any exception set on the future, so an unexpected error fails here.
61 + for future in input_futures:
62 + future.cancelled() or future.result()
63 diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
64 index 8d324de84..5ad075305 100644
65 --- a/pym/portage/util/futures/iter_completed.py
66 +++ b/pym/portage/util/futures/iter_completed.py
67 @@ -1,6 +1,7 @@
68 # Copyright 2018 Gentoo Foundation
69 # Distributed under the terms of the GNU General Public License v2
70
71 +import functools
72 import multiprocessing
73
74 from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
75 @@ -31,6 +32,38 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
76 """
77 loop = loop or global_event_loop()
78 loop = getattr(loop, '_asyncio_wrapper', loop)
79 +
80 + for future_done_set in async_iter_completed(futures,
81 + max_jobs=max_jobs, max_load=max_load, loop=loop):
82 + for future in loop.run_until_complete(future_done_set):
83 + yield future
84 +
85 +
86 +def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
87 + """
88 + An asynchronous version of iter_completed. This yields futures, which
89 + when done, result in a set of input futures that are done. This serves
90 + as a wrapper around portage's internal TaskScheduler class, using
91 + standard asyncio interfaces.
92 +
93 + @param futures: iterator of asyncio.Future (or compatible)
94 + @type futures: iterator
95 + @param max_jobs: max number of futures to process concurrently (default
96 + is multiprocessing.cpu_count())
97 + @type max_jobs: int
98 + @param max_load: max load allowed when scheduling a new future,
99 + otherwise schedule no more than 1 future at a time (default
100 + is multiprocessing.cpu_count())
101 + @type max_load: int or float
102 + @param loop: event loop
103 + @type loop: EventLoop
104 + @return: iterator of futures, which when done, result in a set of
105 + input futures that are done
106 + @rtype: iterator
107 + """
108 + loop = loop or global_event_loop()
109 + loop = getattr(loop, '_asyncio_wrapper', loop)
110 +
111 max_jobs = max_jobs or multiprocessing.cpu_count()
112 max_load = max_load or multiprocessing.cpu_count()
113
114 @@ -46,19 +79,35 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
115 max_load=max_load,
116 event_loop=loop._loop)
117
118 + def done_callback(future_done_set, wait_result):
119 + """Set future_done_set's result to wait_result's done set and drop those futures from future_map; no-op if future_done_set was cancelled."""
120 + if future_done_set.cancelled():
121 + return
122 + done, pending = wait_result.result()
123 + for future in done:
124 + del future_map[id(future)]
125 + future_done_set.set_result(done)
126 +
127 + def cancel_callback(wait_result, future_done_set):
128 + """Propagate cancellation: cancel wait_result if future_done_set was cancelled while wait_result is still pending."""
129 + if future_done_set.cancelled() and not wait_result.done():
130 + wait_result.cancel()
131 +
132 try:
133 scheduler.start()
134
135 # scheduler should ensure that future_map is non-empty until
136 # task_generator is exhausted
137 while future_map:
138 - done, pending = loop.run_until_complete(
139 + wait_result = asyncio.ensure_future(
140 asyncio.wait(list(future_map.values()),
141 - return_when=asyncio.FIRST_COMPLETED, loop=loop))
142 - for future in done:
143 - del future_map[id(future)]
144 - yield future
145 -
146 + return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop)
147 + future_done_set = loop.create_future()
148 + future_done_set.add_done_callback(
149 + functools.partial(cancel_callback, wait_result))
150 + wait_result.add_done_callback(
151 + functools.partial(done_callback, future_done_set))
152 + yield future_done_set
153 finally:
154 # cleanup in case of interruption by SIGINT, etc
155 scheduler.cancel()
156 --
157 2.13.6