1 |
also lgtm. |
2 |
|
3 |
On Sat, Mar 31, 2018 at 10:46 PM, Zac Medico <zmedico@g.o> wrote: |
4 |
|
5 |
> This decorator will be useful for retrying asynchronous |
6 |
> operations, such as gpg key refresh (bug 649276). The |
7 |
> API is inspired by tenacity, but is simpler. Only |
8 |
> asynchronous functions (like @asyncio.coroutine functions) |
9 |
> are supported. In order to retry a synchronous function, |
10 |
> first convert it to an asynchronous function as follows: |
11 |
> |
12 |
> asynchronous_func = functools.partial( |
13 |
> loop.run_in_executor, None, synchronous_func) |
14 |
> |
15 |
> Bug: https://bugs.gentoo.org/649276 |
16 |
> See: https://github.com/jd/tenacity |
17 |
> --- |
18 |
> pym/portage/tests/util/futures/test_retry.py | 147 ++++++++++++++++++++++ |
19 |
> pym/portage/util/futures/futures.py | 6 + |
20 |
> pym/portage/util/futures/retry.py | 178 |
21 |
> +++++++++++++++++++++++++++ |
22 |
> 3 files changed, 331 insertions(+) |
23 |
> create mode 100644 pym/portage/tests/util/futures/test_retry.py |
24 |
> create mode 100644 pym/portage/util/futures/retry.py |
25 |
> |
26 |
> diff --git a/pym/portage/tests/util/futures/test_retry.py |
27 |
> b/pym/portage/tests/util/futures/test_retry.py |
28 |
> new file mode 100644 |
29 |
> index 000000000..7641e4e92 |
30 |
> --- /dev/null |
31 |
> +++ b/pym/portage/tests/util/futures/test_retry.py |
32 |
> @@ -0,0 +1,147 @@ |
33 |
> +# Copyright 2018 Gentoo Foundation |
34 |
> +# Distributed under the terms of the GNU General Public License v2 |
35 |
> + |
36 |
> +import functools |
37 |
> + |
38 |
> +try: |
39 |
> + import threading |
40 |
> +except ImportError: |
41 |
> + import dummy_threading as threading |
42 |
> + |
43 |
> +from portage.tests import TestCase |
44 |
> +from portage.util._eventloop.global_event_loop import global_event_loop |
45 |
> +from portage.util.backoff import RandomExponentialBackoff |
46 |
> +from portage.util.futures.futures import TimeoutError |
47 |
> +from portage.util.futures.retry import retry |
48 |
> +from portage.util.futures.wait import wait |
49 |
> +from portage.util.monotonic import monotonic |
50 |
> + |
51 |
> + |
52 |
> +class SucceedLaterException(Exception): |
53 |
> + pass |
54 |
> + |
55 |
> + |
56 |
> +class SucceedLater(object): |
57 |
> + """ |
58 |
> + A callable object that succeeds after some duration of time has passed. |
59 |
> + """ |
60 |
> + def __init__(self, duration): |
61 |
> + self._succeed_time = monotonic() + duration |
62 |
> + |
63 |
> + def __call__(self): |
64 |
> + remaining = self._succeed_time - monotonic() |
65 |
> + if remaining > 0: |
66 |
> + raise SucceedLaterException('time until success: |
67 |
> {} seconds'.format(remaining)) |
68 |
> + return 'success' |
69 |
> + |
70 |
> + |
71 |
> +class SucceedNeverException(Exception): |
72 |
> + pass |
73 |
> + |
74 |
> + |
75 |
> +class SucceedNever(object): |
76 |
> + """ |
77 |
> + A callable object that never succeeds. |
78 |
> + """ |
79 |
> + def __call__(self): |
80 |
> + raise SucceedNeverException('expected failure') |
81 |
> + |
82 |
> + |
83 |
> +class HangForever(object): |
84 |
> + """ |
85 |
> + A callable object that sleeps forever. |
86 |
> + """ |
87 |
> + def __call__(self): |
88 |
> + threading.Event().wait() |
89 |
> + |
90 |
> + |
91 |
> +class RetryTestCase(TestCase): |
92 |
> + def testSucceedLater(self): |
93 |
> + loop = global_event_loop() |
94 |
> + func = SucceedLater(1) |
95 |
> + func_coroutine = functools.partial(loop.run_in_executor, |
96 |
> None, func) |
97 |
> + decorator = retry(try_max=9999, |
98 |
> + delay_func=RandomExponentialBackoff(multiplier=0.1, |
99 |
> base=2)) |
100 |
> + decorated_func = decorator(func_coroutine) |
101 |
> + result = loop.run_until_complete(decorated_func()) |
102 |
> + self.assertEqual(result, 'success') |
103 |
> + |
104 |
> + def testSucceedNever(self): |
105 |
> + loop = global_event_loop() |
106 |
> + func = SucceedNever() |
107 |
> + func_coroutine = functools.partial(loop.run_in_executor, |
108 |
> None, func) |
109 |
> + decorator = retry(try_max=4, try_timeout=None, |
110 |
> + delay_func=RandomExponentialBackoff(multiplier=0.1, |
111 |
> base=2)) |
112 |
> + decorated_func = decorator(func_coroutine) |
113 |
> + done, pending = loop.run_until_complete(wait([ |
114 |
> decorated_func()])) |
115 |
> + self.assertEqual(len(done), 1) |
116 |
> + self.assertTrue(isinstance(done[0].exception().__cause__, |
117 |
> SucceedNeverException)) |
118 |
> + |
119 |
> + def testSucceedNeverReraise(self): |
120 |
> + loop = global_event_loop() |
121 |
> + func = SucceedNever() |
122 |
> + func_coroutine = functools.partial(loop.run_in_executor, |
123 |
> None, func) |
124 |
> + decorator = retry(reraise=True, try_max=4, |
125 |
> try_timeout=None, |
126 |
> + delay_func=RandomExponentialBackoff(multiplier=0.1, |
127 |
> base=2)) |
128 |
> + decorated_func = decorator(func_coroutine) |
129 |
> + done, pending = loop.run_until_complete(wait([ |
130 |
> decorated_func()])) |
131 |
> + self.assertEqual(len(done), 1) |
132 |
> + self.assertTrue(isinstance(done[0].exception(), |
133 |
> SucceedNeverException)) |
134 |
> + |
135 |
> + def testHangForever(self): |
136 |
> + loop = global_event_loop() |
137 |
> + func = HangForever() |
138 |
> + func_coroutine = functools.partial(loop.run_in_executor, |
139 |
> None, func) |
140 |
> + decorator = retry(try_max=2, try_timeout=0.1, |
141 |
> + delay_func=RandomExponentialBackoff(multiplier=0.1, |
142 |
> base=2)) |
143 |
> + decorated_func = decorator(func_coroutine) |
144 |
> + done, pending = loop.run_until_complete(wait([ |
145 |
> decorated_func()])) |
146 |
> + self.assertEqual(len(done), 1) |
147 |
> + self.assertTrue(isinstance(done[0].exception().__cause__, |
148 |
> TimeoutError)) |
149 |
> + |
150 |
> + def testHangForeverReraise(self): |
151 |
> + loop = global_event_loop() |
152 |
> + func = HangForever() |
153 |
> + func_coroutine = functools.partial(loop.run_in_executor, |
154 |
> None, func) |
155 |
> + decorator = retry(reraise=True, try_max=2, try_timeout=0.1, |
156 |
> + delay_func=RandomExponentialBackoff(multiplier=0.1, |
157 |
> base=2)) |
158 |
> + decorated_func = decorator(func_coroutine) |
159 |
> + done, pending = loop.run_until_complete(wait([ |
160 |
> decorated_func()])) |
161 |
> + self.assertEqual(len(done), 1) |
162 |
> + self.assertTrue(isinstance(done[0].exception(), |
163 |
> TimeoutError)) |
164 |
> + |
165 |
> + def testCancelRetry(self): |
166 |
> + loop = global_event_loop() |
167 |
> + func = SucceedNever() |
168 |
> + func_coroutine = functools.partial(loop.run_in_executor, |
169 |
> None, func) |
170 |
> + decorator = retry(try_timeout=0.1, |
171 |
> + delay_func=RandomExponentialBackoff(multiplier=0.1, |
172 |
> base=2)) |
173 |
> + decorated_func = decorator(func_coroutine) |
174 |
> + future = decorated_func() |
175 |
> + loop.call_later(0.3, future.cancel) |
176 |
> + done, pending = loop.run_until_complete(wait([future])) |
177 |
> + self.assertEqual(len(done), 1) |
178 |
> + self.assertTrue(done[0].cancelled()) |
179 |
> + |
180 |
> + def testOverallTimeoutWithException(self): |
181 |
> + loop = global_event_loop() |
182 |
> + func = SucceedNever() |
183 |
> + func_coroutine = functools.partial(loop.run_in_executor, |
184 |
> None, func) |
185 |
> + decorator = retry(try_timeout=0.1, overall_timeout=0.3, |
186 |
> + delay_func=RandomExponentialBackoff(multiplier=0.1, |
187 |
> base=2)) |
188 |
> + decorated_func = decorator(func_coroutine) |
189 |
> + done, pending = loop.run_until_complete(wait([ |
190 |
> decorated_func()])) |
191 |
> + self.assertEqual(len(done), 1) |
192 |
> + self.assertTrue(isinstance(done[0].exception().__cause__, |
193 |
> SucceedNeverException)) |
194 |
> + |
195 |
> + def testOverallTimeoutWithTimeoutError(self): |
196 |
> + loop = global_event_loop() |
197 |
> + # results in TimeoutError because it hangs forever |
198 |
> + func = HangForever() |
199 |
> + func_coroutine = functools.partial(loop.run_in_executor, |
200 |
> None, func) |
201 |
> + decorator = retry(try_timeout=0.1, overall_timeout=0.3, |
202 |
> + delay_func=RandomExponentialBackoff(multiplier=0.1, |
203 |
> base=2)) |
204 |
> + decorated_func = decorator(func_coroutine) |
205 |
> + done, pending = loop.run_until_complete(wait([ |
206 |
> decorated_func()])) |
207 |
> + self.assertEqual(len(done), 1) |
208 |
> + self.assertTrue(isinstance(done[0].exception().__cause__, |
209 |
> TimeoutError)) |
210 |
> diff --git a/pym/portage/util/futures/futures.py |
211 |
> b/pym/portage/util/futures/futures.py |
212 |
> index dcf593c01..cd56a27eb 100644 |
213 |
> --- a/pym/portage/util/futures/futures.py |
214 |
> +++ b/pym/portage/util/futures/futures.py |
215 |
> @@ -11,6 +11,7 @@ __all__ = ( |
216 |
> 'CancelledError', |
217 |
> 'Future', |
218 |
> 'InvalidStateError', |
219 |
> + 'TimeoutError', |
220 |
> ) |
221 |
> |
222 |
> try: |
223 |
> @@ -18,6 +19,7 @@ try: |
224 |
> CancelledError, |
225 |
> Future, |
226 |
> InvalidStateError, |
227 |
> + TimeoutError, |
228 |
> ) |
229 |
> except ImportError: |
230 |
> |
231 |
> @@ -30,6 +32,10 @@ except ImportError: |
232 |
> def __init__(self): |
233 |
> Error.__init__(self, "cancelled") |
234 |
> |
235 |
> + class TimeoutError(Error): |
236 |
> + def __init__(self): |
237 |
> + Error.__init__(self, "timed out") |
238 |
> + |
239 |
> class InvalidStateError(Error): |
240 |
> pass |
241 |
> |
242 |
> diff --git a/pym/portage/util/futures/retry.py b/pym/portage/util/futures/ |
243 |
> retry.py |
244 |
> new file mode 100644 |
245 |
> index 000000000..2caf1bbac |
246 |
> --- /dev/null |
247 |
> +++ b/pym/portage/util/futures/retry.py |
248 |
> @@ -0,0 +1,178 @@ |
249 |
> +# Copyright 2018 Gentoo Foundation |
250 |
> +# Distributed under the terms of the GNU General Public License v2 |
251 |
> + |
252 |
> +import functools |
253 |
> + |
254 |
> +from portage.exception import PortageException |
255 |
> +from portage.util._eventloop.global_event_loop import global_event_loop |
256 |
> +from portage.util.futures.futures import TimeoutError |
257 |
> + |
258 |
> + |
259 |
> +class RetryError(PortageException): |
260 |
> + """Raised when retry fails.""" |
261 |
> + def __init__(self): |
262 |
> + PortageException.__init__(self, "retry error") |
263 |
> + |
264 |
> + |
265 |
> +def retry(try_max=None, try_timeout=None, overall_timeout=None, |
266 |
> + delay_func=None, reraise=False, loop=None): |
267 |
> + """ |
268 |
> + Create and return a retry decorator. The decorator is intended to |
269 |
> + operate only on a coroutine function. |
270 |
> + |
271 |
> + @param try_max: maximum number of tries |
272 |
> + @type try_max: int or None |
273 |
> + @param try_timeout: number of seconds to wait for a try to succeed |
274 |
> + before cancelling it, which is only effective if func |
275 |
> returns |
276 |
> + tasks that support cancellation |
277 |
> + @type try_timeout: float or None |
278 |
> + @param overall_timeout: number of seconds to wait for retries to |
279 |
> + succeed before aborting, which is only effective if func |
280 |
> returns |
281 |
> + tasks that support cancellation |
282 |
> + @type overall_timeout: float or None |
283 |
> + @param delay_func: function that takes an int argument |
284 |
> corresponding |
285 |
> + to the number of previous tries and returns a number of |
286 |
> seconds |
287 |
> + to wait before the next try |
288 |
> + @type delay_func: callable |
289 |
> + @param reraise: Reraise the last exception, instead of RetryError |
290 |
> + @type reraise: bool |
291 |
> + @param loop: event loop |
292 |
> + @type loop: EventLoop |
293 |
> + @return: func decorated with retry support |
294 |
> + @rtype: callable |
295 |
> + """ |
296 |
> + return functools.partial(_retry_wrapper, loop, try_max, |
297 |
> try_timeout, |
298 |
> + overall_timeout, delay_func, reraise) |
299 |
> + |
300 |
> + |
301 |
> +def _retry_wrapper(loop, try_max, try_timeout, overall_timeout, |
302 |
> delay_func, |
303 |
> + reraise, func): |
304 |
> + """ |
305 |
> + Create and return a decorated function. |
306 |
> + """ |
307 |
> + return functools.partial(_retry, loop, try_max, try_timeout, |
308 |
> + overall_timeout, delay_func, reraise, func) |
309 |
> + |
310 |
> + |
311 |
> +def _retry(loop, try_max, try_timeout, overall_timeout, delay_func, |
312 |
> + reraise, func, *args, **kwargs): |
313 |
> + """ |
314 |
> + Retry coroutine, used to implement retry decorator. |
315 |
> + |
316 |
> + @return: func return value |
317 |
> + @rtype: asyncio.Future (or compatible) |
318 |
> + """ |
319 |
> + loop = loop or global_event_loop() |
320 |
> + future = loop.create_future() |
321 |
> + _Retry(future, loop, try_max, try_timeout, overall_timeout, |
322 |
> delay_func, |
323 |
> + reraise, functools.partial(func, *args, **kwargs)) |
324 |
> + return future |
325 |
> + |
326 |
> + |
327 |
> +class _Retry(object): |
328 |
> + def __init__(self, future, loop, try_max, try_timeout, |
329 |
> overall_timeout, |
330 |
> + delay_func, reraise, func): |
331 |
> + self._future = future |
332 |
> + self._loop = loop |
333 |
> + self._try_max = try_max |
334 |
> + self._try_timeout = try_timeout |
335 |
> + self._delay_func = delay_func |
336 |
> + self._reraise = reraise |
337 |
> + self._func = func |
338 |
> + |
339 |
> + self._try_timeout_handle = None |
340 |
> + self._overall_timeout_handle = None |
341 |
> + self._overall_timeout_expired = None |
342 |
> + self._tries = 0 |
343 |
> + self._current_task = None |
344 |
> + self._previous_result = None |
345 |
> + |
346 |
> + future.add_done_callback(self._cancel_callback) |
347 |
> + if overall_timeout is not None: |
348 |
> + self._overall_timeout_handle = loop.call_later( |
349 |
> + overall_timeout, self._overall_timeout_ |
350 |
> callback) |
351 |
> + self._begin_try() |
352 |
> + |
353 |
> + def _cancel_callback(self, future): |
354 |
> + if future.cancelled() and self._current_task is not None: |
355 |
> + self._current_task.cancel() |
356 |
> + |
357 |
> + def _try_timeout_callback(self): |
358 |
> + self._try_timeout_handle = None |
359 |
> + self._current_task.cancel() |
360 |
> + |
361 |
> + def _overall_timeout_callback(self): |
362 |
> + self._overall_timeout_handle = None |
363 |
> + self._overall_timeout_expired = True |
364 |
> + self._current_task.cancel() |
365 |
> + self._retry_error() |
366 |
> + |
367 |
> + def _begin_try(self): |
368 |
> + self._tries += 1 |
369 |
> + self._current_task = self._func() |
370 |
> + self._current_task.add_done_callback(self._try_done) |
371 |
> + if self._try_timeout is not None: |
372 |
> + self._try_timeout_handle = self._loop.call_later( |
373 |
> + self._try_timeout, |
374 |
> self._try_timeout_callback) |
375 |
> + |
376 |
> + def _try_done(self, future): |
377 |
> + self._current_task = None |
378 |
> + |
379 |
> + if self._try_timeout_handle is not None: |
380 |
> + self._try_timeout_handle.cancel() |
381 |
> + self._try_timeout_handle = None |
382 |
> + |
383 |
> + if not future.cancelled(): |
384 |
> + # consume exception, so that the event loop |
385 |
> + # exception handler does not report it |
386 |
> + future.exception() |
387 |
> + |
388 |
> + if self._overall_timeout_expired: |
389 |
> + return |
390 |
> + |
391 |
> + try: |
392 |
> + if self._future.cancelled(): |
393 |
> + return |
394 |
> + |
395 |
> + self._previous_result = future |
396 |
> + if not (future.cancelled() or future.exception() |
397 |
> is not None): |
398 |
> + # success |
399 |
> + self._future.set_result(future.result()) |
400 |
> + return |
401 |
> + finally: |
402 |
> + if self._future.done() and |
403 |
> self._overall_timeout_handle is not None: |
404 |
> + self._overall_timeout_handle.cancel() |
405 |
> + self._overall_timeout_handle = None |
406 |
> + |
407 |
> + if self._try_max is not None and self._tries >= |
408 |
> self._try_max: |
409 |
> + self._retry_error() |
410 |
> + return |
411 |
> + |
412 |
> + if self._delay_func is not None: |
413 |
> + delay = self._delay_func(self._tries) |
414 |
> + self._current_task = self._loop.call_later(delay, |
415 |
> self._delay_done) |
416 |
> + return |
417 |
> + |
418 |
> + self._begin_try() |
419 |
> + |
420 |
> + def _delay_done(self): |
421 |
> + self._current_task = None |
422 |
> + |
423 |
> + if self._future.cancelled() or |
424 |
> self._overall_timeout_expired: |
425 |
> + return |
426 |
> + |
427 |
> + self._begin_try() |
428 |
> + |
429 |
> + def _retry_error(self): |
430 |
> + if self._previous_result is None or self._previous_result. |
431 |
> cancelled(): |
432 |
> + cause = TimeoutError() |
433 |
> + else: |
434 |
> + cause = self._previous_result.exception() |
435 |
> + |
436 |
> + if self._reraise: |
437 |
> + e = cause |
438 |
> + else: |
439 |
> + e = RetryError() |
440 |
> + e.__cause__ = cause |
441 |
> + |
442 |
> + self._future.set_exception(e) |
443 |
> -- |
444 |
> 2.13.6 |
445 |
> |
446 |
> |
447 |
> |