1 |
commit: edba848a013e1797faeacc1911a5e6571fca3ca7 |
2 |
Author: Zac Medico <zmedico <AT> gentoo <DOT> org> |
3 |
AuthorDate: Thu Jul 5 04:44:42 2018 +0000 |
4 |
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> |
5 |
CommitDate: Wed Jul 11 07:40:59 2018 +0000 |
6 |
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=edba848a |
7 |
|
8 |
Add python2 compatible coroutine support (bug 660426) |
9 |
|
10 |
For readability, it's desirable to make asynchronous code use |
11 |
coroutines to avoid callbacks when possible. For python2 compatibility, |
12 |
generators that yield Futures can be used to implement coroutines. |
13 |
|
14 |
Add a compat_coroutine module which provides a @coroutine decorator |
15 |
and a coroutine_return function that can be used to return a value |
16 |
from a generator. The decorated function returns a Future which is |
17 |
done when the generator is exhausted. Usage is very similar to asyncio |
18 |
coroutine usage in python3.4 (see unit tests). |
19 |
|
20 |
Bug: https://bugs.gentoo.org/660426 |
21 |
|
22 |
.../tests/util/futures/test_compat_coroutine.py | 159 +++++++++++++++++++++ |
23 |
pym/portage/util/futures/compat_coroutine.py | 112 +++++++++++++++ |
24 |
2 files changed, 271 insertions(+) |
25 |
|
26 |
diff --git a/pym/portage/tests/util/futures/test_compat_coroutine.py b/pym/portage/tests/util/futures/test_compat_coroutine.py |
27 |
new file mode 100644 |
28 |
index 000000000..cbc070869 |
29 |
--- /dev/null |
30 |
+++ b/pym/portage/tests/util/futures/test_compat_coroutine.py |
31 |
@@ -0,0 +1,159 @@ |
32 |
+# Copyright 2018 Gentoo Foundation |
33 |
+# Distributed under the terms of the GNU General Public License v2 |
34 |
+ |
35 |
+from portage.util.futures import asyncio |
36 |
+from portage.util.futures.compat_coroutine import ( |
37 |
+ coroutine, |
38 |
+ coroutine_return, |
39 |
+) |
40 |
+from portage.tests import TestCase |
41 |
+ |
42 |
+ |
43 |
+class CompatCoroutineTestCase(TestCase): |
44 |
+ |
45 |
+ def test_returning_coroutine(self): |
46 |
+ @coroutine |
47 |
+ def returning_coroutine(): |
48 |
+ yield asyncio.sleep(0) |
49 |
+ coroutine_return('success') |
50 |
+ |
51 |
+ self.assertEqual('success', |
52 |
+ asyncio.get_event_loop().run_until_complete(returning_coroutine())) |
53 |
+ |
54 |
+ def test_raising_coroutine(self): |
55 |
+ |
56 |
+ class TestException(Exception): |
57 |
+ pass |
58 |
+ |
59 |
+ @coroutine |
60 |
+ def raising_coroutine(): |
61 |
+ yield asyncio.sleep(0) |
62 |
+ raise TestException('exception') |
63 |
+ |
64 |
+ self.assertRaises(TestException, |
65 |
+ asyncio.get_event_loop().run_until_complete, raising_coroutine()) |
66 |
+ |
67 |
+ def test_catching_coroutine(self): |
68 |
+ |
69 |
+ class TestException(Exception): |
70 |
+ pass |
71 |
+ |
72 |
+ @coroutine |
73 |
+ def catching_coroutine(loop=None): |
74 |
+ loop = asyncio._wrap_loop(loop) |
75 |
+ future = loop.create_future() |
76 |
+ loop.call_soon(future.set_exception, TestException('exception')) |
77 |
+ try: |
78 |
+ yield future |
79 |
+ except TestException: |
80 |
+ self.assertTrue(True) |
81 |
+ else: |
82 |
+ self.assertTrue(False) |
83 |
+ coroutine_return('success') |
84 |
+ |
85 |
+ loop = asyncio.get_event_loop() |
86 |
+ self.assertEqual('success', |
87 |
+ loop.run_until_complete(catching_coroutine(loop=loop))) |
88 |
+ |
89 |
+ def test_cancelled_coroutine(self): |
90 |
+ |
91 |
+ @coroutine |
92 |
+ def cancelled_coroutine(loop=None): |
93 |
+ loop = asyncio._wrap_loop(loop) |
94 |
+ while True: |
95 |
+ yield loop.create_future() |
96 |
+ |
97 |
+ loop = asyncio.get_event_loop() |
98 |
+ future = cancelled_coroutine(loop=loop) |
99 |
+ loop.call_soon(future.cancel) |
100 |
+ |
101 |
+ self.assertRaises(asyncio.CancelledError, |
102 |
+ loop.run_until_complete, future) |
103 |
+ |
104 |
+ def test_cancelled_future(self): |
105 |
+ |
106 |
+ @coroutine |
107 |
+ def cancelled_future_coroutine(loop=None): |
108 |
+ loop = asyncio._wrap_loop(loop) |
109 |
+ while True: |
110 |
+ future = loop.create_future() |
111 |
+ loop.call_soon(future.cancel) |
112 |
+ yield future |
113 |
+ |
114 |
+ loop = asyncio.get_event_loop() |
115 |
+ self.assertRaises(asyncio.CancelledError, |
116 |
+ loop.run_until_complete, cancelled_future_coroutine(loop=loop)) |
117 |
+ |
118 |
+ def test_yield_expression_result(self): |
119 |
+ @coroutine |
120 |
+ def yield_expression_coroutine(): |
121 |
+ for i in range(3): |
122 |
+ x = yield asyncio.sleep(0, result=i) |
123 |
+ self.assertEqual(x, i) |
124 |
+ |
125 |
+ asyncio.get_event_loop().run_until_complete(yield_expression_coroutine()) |
126 |
+ |
127 |
+ def test_method_coroutine(self): |
128 |
+ |
129 |
+ class Cubby(object): |
130 |
+ |
131 |
+ _empty = object() |
132 |
+ |
133 |
+ def __init__(self, loop): |
134 |
+ self._loop = loop |
135 |
+ self._value = self._empty |
136 |
+ self._waiters = [] |
137 |
+ |
138 |
+ def _notify(self): |
139 |
+ waiters = self._waiters |
140 |
+ self._waiters = [] |
141 |
+ for waiter in waiters: |
142 |
+ waiter.cancelled() or waiter.set_result(None) |
143 |
+ |
144 |
+ def _wait(self): |
145 |
+ waiter = self._loop.create_future() |
146 |
+ self._waiters.append(waiter) |
147 |
+ return waiter |
148 |
+ |
149 |
+ @coroutine |
150 |
+ def read(self): |
151 |
+ while self._value is self._empty: |
152 |
+ yield self._wait() |
153 |
+ |
154 |
+ value = self._value |
155 |
+ self._value = self._empty |
156 |
+ self._notify() |
157 |
+ coroutine_return(value) |
158 |
+ |
159 |
+ @coroutine |
160 |
+ def write(self, value): |
161 |
+ while self._value is not self._empty: |
162 |
+ yield self._wait() |
163 |
+ |
164 |
+ self._value = value |
165 |
+ self._notify() |
166 |
+ |
167 |
+ @coroutine |
168 |
+ def writer_coroutine(cubby, values, sentinel): |
169 |
+ for value in values: |
170 |
+ yield cubby.write(value) |
171 |
+ yield cubby.write(sentinel) |
172 |
+ |
173 |
+ @coroutine |
174 |
+ def reader_coroutine(cubby, sentinel): |
175 |
+ results = [] |
176 |
+ while True: |
177 |
+ result = yield cubby.read() |
178 |
+ if result == sentinel: |
179 |
+ break |
180 |
+ results.append(result) |
181 |
+ coroutine_return(results) |
182 |
+ |
183 |
+ loop = asyncio.get_event_loop() |
184 |
+ cubby = Cubby(loop) |
185 |
+ values = list(range(3)) |
186 |
+ writer = asyncio.ensure_future(writer_coroutine(cubby, values, None), loop=loop) |
187 |
+ reader = asyncio.ensure_future(reader_coroutine(cubby, None), loop=loop) |
188 |
+ loop.run_until_complete(asyncio.wait([writer, reader])) |
189 |
+ |
190 |
+ self.assertEqual(reader.result(), values) |
191 |
|
192 |
diff --git a/pym/portage/util/futures/compat_coroutine.py b/pym/portage/util/futures/compat_coroutine.py |
193 |
new file mode 100644 |
194 |
index 000000000..17400b74d |
195 |
--- /dev/null |
196 |
+++ b/pym/portage/util/futures/compat_coroutine.py |
197 |
@@ -0,0 +1,112 @@ |
198 |
+# Copyright 2018 Gentoo Foundation |
199 |
+# Distributed under the terms of the GNU General Public License v2 |
200 |
+ |
201 |
+from portage.util.futures import asyncio |
202 |
+import functools |
203 |
+ |
204 |
+ |
205 |
+def coroutine(generator_func): |
206 |
+ """ |
207 |
+ A decorator for a generator function that behaves as coroutine function. |
208 |
+ The generator should yield a Future instance in order to wait for it, |
209 |
+ and the result becomes the result of the current yield-expression, |
210 |
+ via the PEP 342 generator send() method. |
211 |
+ |
212 |
+ The decorated function returns a Future which is done when the generator |
213 |
+ is exhausted. The generator can return a value via the coroutine_return |
214 |
+ function. |
215 |
+ |
216 |
+ @param generator_func: A generator function that yields Futures, and |
217 |
+ will receive the result of each Future as the result of the |
218 |
+ corresponding yield-expression. |
219 |
+ @type generator_func: function |
220 |
+ @rtype: function |
221 |
+ @return: A function which calls the given generator function and |
222 |
+ returns a Future that is done when the generator is exhausted. |
223 |
+ """ |
224 |
+ # Note that functools.partial does not work for decoration of |
225 |
+ # methods, since it doesn't implement the descriptor protocol. |
226 |
+ # This problem is solve by defining a wrapper function. |
227 |
+ @functools.wraps(generator_func) |
228 |
+ def wrapped(*args, **kwargs): |
229 |
+ return _generator_future(generator_func, *args, **kwargs) |
230 |
+ return wrapped |
231 |
+ |
232 |
+ |
233 |
+def coroutine_return(result=None): |
234 |
+ """ |
235 |
+ Terminate the current coroutine and set the result of the associated |
236 |
+ Future. |
237 |
+ |
238 |
+ @param result: of the current coroutine's Future |
239 |
+ @type object |
240 |
+ """ |
241 |
+ raise _CoroutineReturnValue(result) |
242 |
+ |
243 |
+ |
244 |
+def _generator_future(generator_func, *args, **kwargs): |
245 |
+ """ |
246 |
+ Call generator_func with the given arguments, and return a Future |
247 |
+ that is done when the resulting generation is exhausted. If a |
248 |
+ keyword argument named 'loop' is given, then it is used instead of |
249 |
+ the default event loop. |
250 |
+ """ |
251 |
+ loop = asyncio._wrap_loop(kwargs.get('loop')) |
252 |
+ result = loop.create_future() |
253 |
+ _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop) |
254 |
+ return result |
255 |
+ |
256 |
+ |
257 |
+class _CoroutineReturnValue(Exception): |
258 |
+ def __init__(self, result): |
259 |
+ self.result = result |
260 |
+ |
261 |
+ |
262 |
+class _GeneratorTask(object): |
263 |
+ """ |
264 |
+ Asynchronously executes the generator to completion, waiting for |
265 |
+ the result of each Future that it yields, and sending the result |
266 |
+ to the generator. |
267 |
+ """ |
268 |
+ def __init__(self, generator, result, loop): |
269 |
+ self._generator = generator |
270 |
+ self._result = result |
271 |
+ self._loop = loop |
272 |
+ result.add_done_callback(self._cancel_callback) |
273 |
+ loop.call_soon(self._next) |
274 |
+ |
275 |
+ def _cancel_callback(self, result): |
276 |
+ if result.cancelled(): |
277 |
+ self._generator.close() |
278 |
+ |
279 |
+ def _next(self, previous=None): |
280 |
+ if self._result.cancelled(): |
281 |
+ if previous is not None: |
282 |
+ # Consume exceptions, in order to avoid triggering |
283 |
+ # the event loop's exception handler. |
284 |
+ previous.cancelled() or previous.exception() |
285 |
+ return |
286 |
+ try: |
287 |
+ if previous is None: |
288 |
+ future = next(self._generator) |
289 |
+ elif previous.cancelled(): |
290 |
+ self._generator.throw(asyncio.CancelledError()) |
291 |
+ future = next(self._generator) |
292 |
+ elif previous.exception() is None: |
293 |
+ future = self._generator.send(previous.result()) |
294 |
+ else: |
295 |
+ self._generator.throw(previous.exception()) |
296 |
+ future = next(self._generator) |
297 |
+ |
298 |
+ except _CoroutineReturnValue as e: |
299 |
+ if not self._result.cancelled(): |
300 |
+ self._result.set_result(e.result) |
301 |
+ except StopIteration: |
302 |
+ if not self._result.cancelled(): |
303 |
+ self._result.set_result(None) |
304 |
+ except Exception as e: |
305 |
+ if not self._result.cancelled(): |
306 |
+ self._result.set_exception(e) |
307 |
+ else: |
308 |
+ future = asyncio.ensure_future(future, loop=self._loop) |
309 |
+ future.add_done_callback(self._next) |