1 |
commit: 1aa9a71731f3ab05e10190493956c70998abf12a |
2 |
Author: Zac Medico <zmedico <AT> gentoo <DOT> org> |
3 |
AuthorDate: Fri Mar 16 19:37:54 2018 +0000 |
4 |
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> |
5 |
CommitDate: Mon Apr 2 16:53:24 2018 +0000 |
6 |
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=1aa9a717 |
7 |
|
8 |
Add retry decorator (API inspired by tenacity) |
9 |
|
10 |
This decorator will be useful for retrying asynchronous |
11 |
operations, such as gpg key refresh (bug 649276). The |
12 |
API is inspired by tenacity, but is simpler. Only |
13 |
asynchronous functions (like @asyncio.coroutine functions) |
14 |
are supported. In order to retry a synchronous function, |
15 |
first convert it to an asynchronous function as follows: |
16 |
|
17 |
asynchronous_func = functools.partial( |
18 |
loop.run_in_executor, None, synchronous_func) |
19 |
|
20 |
Bug: https://bugs.gentoo.org/649276 |
21 |
See: https://github.com/jd/tenacity |
22 |
Reviewed-by: Alec Warner <antarus <AT> gentoo.org> |
23 |
|
24 |
pym/portage/tests/util/futures/test_retry.py | 147 +++++++++++++++++++++ |
25 |
pym/portage/util/futures/futures.py | 6 + |
26 |
pym/portage/util/futures/retry.py | 183 +++++++++++++++++++++++++++ |
27 |
3 files changed, 336 insertions(+) |
28 |
|
29 |
diff --git a/pym/portage/tests/util/futures/test_retry.py b/pym/portage/tests/util/futures/test_retry.py |
30 |
new file mode 100644 |
31 |
index 000000000..7641e4e92 |
32 |
--- /dev/null |
33 |
+++ b/pym/portage/tests/util/futures/test_retry.py |
34 |
@@ -0,0 +1,147 @@ |
35 |
+# Copyright 2018 Gentoo Foundation |
36 |
+# Distributed under the terms of the GNU General Public License v2 |
37 |
+ |
38 |
+import functools |
39 |
+ |
40 |
+try: |
41 |
+ import threading |
42 |
+except ImportError: |
43 |
+ import dummy_threading as threading |
44 |
+ |
45 |
+from portage.tests import TestCase |
46 |
+from portage.util._eventloop.global_event_loop import global_event_loop |
47 |
+from portage.util.backoff import RandomExponentialBackoff |
48 |
+from portage.util.futures.futures import TimeoutError |
49 |
+from portage.util.futures.retry import retry |
50 |
+from portage.util.futures.wait import wait |
51 |
+from portage.util.monotonic import monotonic |
52 |
+ |
53 |
+ |
54 |
+class SucceedLaterException(Exception): |
55 |
+ pass |
56 |
+ |
57 |
+ |
58 |
+class SucceedLater(object): |
59 |
+ """ |
60 |
+ A callable object that succeeds after some duration of time has passed. |
61 |
+ """ |
62 |
+ def __init__(self, duration): |
63 |
+ self._succeed_time = monotonic() + duration |
64 |
+ |
65 |
+ def __call__(self): |
66 |
+ remaining = self._succeed_time - monotonic() |
67 |
+ if remaining > 0: |
68 |
+ raise SucceedLaterException('time until success: {} seconds'.format(remaining)) |
69 |
+ return 'success' |
70 |
+ |
71 |
+ |
72 |
+class SucceedNeverException(Exception): |
73 |
+ pass |
74 |
+ |
75 |
+ |
76 |
+class SucceedNever(object): |
77 |
+ """ |
78 |
+ A callable object that never succeeds. |
79 |
+ """ |
80 |
+ def __call__(self): |
81 |
+ raise SucceedNeverException('expected failure') |
82 |
+ |
83 |
+ |
84 |
+class HangForever(object): |
85 |
+ """ |
86 |
+ A callable object that sleeps forever. |
87 |
+ """ |
88 |
+ def __call__(self): |
89 |
+ threading.Event().wait() |
90 |
+ |
91 |
+ |
92 |
+class RetryTestCase(TestCase): |
93 |
+ def testSucceedLater(self): |
94 |
+ loop = global_event_loop() |
95 |
+ func = SucceedLater(1) |
96 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
97 |
+ decorator = retry(try_max=9999, |
98 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
99 |
+ decorated_func = decorator(func_coroutine) |
100 |
+ result = loop.run_until_complete(decorated_func()) |
101 |
+ self.assertEqual(result, 'success') |
102 |
+ |
103 |
+ def testSucceedNever(self): |
104 |
+ loop = global_event_loop() |
105 |
+ func = SucceedNever() |
106 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
107 |
+ decorator = retry(try_max=4, try_timeout=None, |
108 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
109 |
+ decorated_func = decorator(func_coroutine) |
110 |
+ done, pending = loop.run_until_complete(wait([decorated_func()])) |
111 |
+ self.assertEqual(len(done), 1) |
112 |
+ self.assertTrue(isinstance(done[0].exception().__cause__, SucceedNeverException)) |
113 |
+ |
114 |
+ def testSucceedNeverReraise(self): |
115 |
+ loop = global_event_loop() |
116 |
+ func = SucceedNever() |
117 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
118 |
+ decorator = retry(reraise=True, try_max=4, try_timeout=None, |
119 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
120 |
+ decorated_func = decorator(func_coroutine) |
121 |
+ done, pending = loop.run_until_complete(wait([decorated_func()])) |
122 |
+ self.assertEqual(len(done), 1) |
123 |
+ self.assertTrue(isinstance(done[0].exception(), SucceedNeverException)) |
124 |
+ |
125 |
+ def testHangForever(self): |
126 |
+ loop = global_event_loop() |
127 |
+ func = HangForever() |
128 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
129 |
+ decorator = retry(try_max=2, try_timeout=0.1, |
130 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
131 |
+ decorated_func = decorator(func_coroutine) |
132 |
+ done, pending = loop.run_until_complete(wait([decorated_func()])) |
133 |
+ self.assertEqual(len(done), 1) |
134 |
+ self.assertTrue(isinstance(done[0].exception().__cause__, TimeoutError)) |
135 |
+ |
136 |
+ def testHangForeverReraise(self): |
137 |
+ loop = global_event_loop() |
138 |
+ func = HangForever() |
139 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
140 |
+ decorator = retry(reraise=True, try_max=2, try_timeout=0.1, |
141 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
142 |
+ decorated_func = decorator(func_coroutine) |
143 |
+ done, pending = loop.run_until_complete(wait([decorated_func()])) |
144 |
+ self.assertEqual(len(done), 1) |
145 |
+ self.assertTrue(isinstance(done[0].exception(), TimeoutError)) |
146 |
+ |
147 |
+ def testCancelRetry(self): |
148 |
+ loop = global_event_loop() |
149 |
+ func = SucceedNever() |
150 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
151 |
+ decorator = retry(try_timeout=0.1, |
152 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
153 |
+ decorated_func = decorator(func_coroutine) |
154 |
+ future = decorated_func() |
155 |
+ loop.call_later(0.3, future.cancel) |
156 |
+ done, pending = loop.run_until_complete(wait([future])) |
157 |
+ self.assertEqual(len(done), 1) |
158 |
+ self.assertTrue(done[0].cancelled()) |
159 |
+ |
160 |
+ def testOverallTimeoutWithException(self): |
161 |
+ loop = global_event_loop() |
162 |
+ func = SucceedNever() |
163 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
164 |
+ decorator = retry(try_timeout=0.1, overall_timeout=0.3, |
165 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
166 |
+ decorated_func = decorator(func_coroutine) |
167 |
+ done, pending = loop.run_until_complete(wait([decorated_func()])) |
168 |
+ self.assertEqual(len(done), 1) |
169 |
+ self.assertTrue(isinstance(done[0].exception().__cause__, SucceedNeverException)) |
170 |
+ |
171 |
+ def testOverallTimeoutWithTimeoutError(self): |
172 |
+ loop = global_event_loop() |
173 |
+ # results in TimeoutError because it hangs forever |
174 |
+ func = HangForever() |
175 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
176 |
+ decorator = retry(try_timeout=0.1, overall_timeout=0.3, |
177 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
178 |
+ decorated_func = decorator(func_coroutine) |
179 |
+ done, pending = loop.run_until_complete(wait([decorated_func()])) |
180 |
+ self.assertEqual(len(done), 1) |
181 |
+ self.assertTrue(isinstance(done[0].exception().__cause__, TimeoutError)) |
182 |
|
183 |
diff --git a/pym/portage/util/futures/futures.py b/pym/portage/util/futures/futures.py |
184 |
index dcf593c01..cd56a27eb 100644 |
185 |
--- a/pym/portage/util/futures/futures.py |
186 |
+++ b/pym/portage/util/futures/futures.py |
187 |
@@ -11,6 +11,7 @@ __all__ = ( |
188 |
'CancelledError', |
189 |
'Future', |
190 |
'InvalidStateError', |
191 |
+ 'TimeoutError', |
192 |
) |
193 |
|
194 |
try: |
195 |
@@ -18,6 +19,7 @@ try: |
196 |
CancelledError, |
197 |
Future, |
198 |
InvalidStateError, |
199 |
+ TimeoutError, |
200 |
) |
201 |
except ImportError: |
202 |
|
203 |
@@ -30,6 +32,10 @@ except ImportError: |
204 |
def __init__(self): |
205 |
Error.__init__(self, "cancelled") |
206 |
|
207 |
+ class TimeoutError(Error): |
208 |
+ def __init__(self): |
209 |
+ Error.__init__(self, "timed out") |
210 |
+ |
211 |
class InvalidStateError(Error): |
212 |
pass |
213 |
|
214 |
|
215 |
diff --git a/pym/portage/util/futures/retry.py b/pym/portage/util/futures/retry.py |
216 |
new file mode 100644 |
217 |
index 000000000..902ed163a |
218 |
--- /dev/null |
219 |
+++ b/pym/portage/util/futures/retry.py |
220 |
@@ -0,0 +1,183 @@ |
221 |
+# Copyright 2018 Gentoo Foundation |
222 |
+# Distributed under the terms of the GNU General Public License v2 |
223 |
+ |
224 |
+__all__ = ( |
225 |
+ 'RetryError', |
226 |
+ 'retry', |
227 |
+) |
228 |
+ |
229 |
+import functools |
230 |
+ |
231 |
+from portage.exception import PortageException |
232 |
+from portage.util._eventloop.global_event_loop import global_event_loop |
233 |
+from portage.util.futures.futures import TimeoutError |
234 |
+ |
235 |
+ |
236 |
+class RetryError(PortageException): |
237 |
+ """Raised when retry fails.""" |
238 |
+ def __init__(self): |
239 |
+ PortageException.__init__(self, "retry error") |
240 |
+ |
241 |
+ |
242 |
+def retry(try_max=None, try_timeout=None, overall_timeout=None, |
243 |
+ delay_func=None, reraise=False, loop=None): |
244 |
+ """ |
245 |
+ Create and return a retry decorator. The decorator is intended to |
246 |
+ operate only on a coroutine function. |
247 |
+ |
248 |
+ @param try_max: maximum number of tries |
249 |
+ @type try_max: int or None |
250 |
+ @param try_timeout: number of seconds to wait for a try to succeed |
251 |
+ before cancelling it, which is only effective if func returns |
252 |
+ tasks that support cancellation |
253 |
+ @type try_timeout: float or None |
254 |
+ @param overall_timeout: number of seconds to wait for retries to |
255 |
+ succeed before aborting, which is only effective if func returns |
256 |
+ tasks that support cancellation |
257 |
+ @type overall_timeout: float or None |
258 |
+ @param delay_func: function that takes an int argument corresponding |
259 |
+ to the number of previous tries and returns a number of seconds |
260 |
+ to wait before the next try |
261 |
+ @type delay_func: callable |
262 |
+ @param reraise: Reraise the last exception, instead of RetryError |
263 |
+ @type reraise: bool |
264 |
+ @param loop: event loop |
265 |
+ @type loop: EventLoop |
266 |
+ @return: func decorated with retry support |
267 |
+ @rtype: callable |
268 |
+ """ |
269 |
+ return functools.partial(_retry_wrapper, loop, try_max, try_timeout, |
270 |
+ overall_timeout, delay_func, reraise) |
271 |
+ |
272 |
+ |
273 |
+def _retry_wrapper(loop, try_max, try_timeout, overall_timeout, delay_func, |
274 |
+ reraise, func): |
275 |
+ """ |
276 |
+ Create and return a decorated function. |
277 |
+ """ |
278 |
+ return functools.partial(_retry, loop, try_max, try_timeout, |
279 |
+ overall_timeout, delay_func, reraise, func) |
280 |
+ |
281 |
+ |
282 |
+def _retry(loop, try_max, try_timeout, overall_timeout, delay_func, |
283 |
+ reraise, func, *args, **kwargs): |
284 |
+ """ |
285 |
+ Retry coroutine, used to implement retry decorator. |
286 |
+ |
287 |
+ @return: func return value |
288 |
+ @rtype: asyncio.Future (or compatible) |
289 |
+ """ |
290 |
+ loop = loop or global_event_loop() |
291 |
+ future = loop.create_future() |
292 |
+ _Retry(future, loop, try_max, try_timeout, overall_timeout, delay_func, |
293 |
+ reraise, functools.partial(func, *args, **kwargs)) |
294 |
+ return future |
295 |
+ |
296 |
+ |
297 |
+class _Retry(object): |
298 |
+ def __init__(self, future, loop, try_max, try_timeout, overall_timeout, |
299 |
+ delay_func, reraise, func): |
300 |
+ self._future = future |
301 |
+ self._loop = loop |
302 |
+ self._try_max = try_max |
303 |
+ self._try_timeout = try_timeout |
304 |
+ self._delay_func = delay_func |
305 |
+ self._reraise = reraise |
306 |
+ self._func = func |
307 |
+ |
308 |
+ self._try_timeout_handle = None |
309 |
+ self._overall_timeout_handle = None |
310 |
+ self._overall_timeout_expired = None |
311 |
+ self._tries = 0 |
312 |
+ self._current_task = None |
313 |
+ self._previous_result = None |
314 |
+ |
315 |
+ future.add_done_callback(self._cancel_callback) |
316 |
+ if overall_timeout is not None: |
317 |
+ self._overall_timeout_handle = loop.call_later( |
318 |
+ overall_timeout, self._overall_timeout_callback) |
319 |
+ self._begin_try() |
320 |
+ |
321 |
+ def _cancel_callback(self, future): |
322 |
+ if future.cancelled() and self._current_task is not None: |
323 |
+ self._current_task.cancel() |
324 |
+ |
325 |
+ def _try_timeout_callback(self): |
326 |
+ self._try_timeout_handle = None |
327 |
+ self._current_task.cancel() |
328 |
+ |
329 |
+ def _overall_timeout_callback(self): |
330 |
+ self._overall_timeout_handle = None |
331 |
+ self._overall_timeout_expired = True |
332 |
+ self._current_task.cancel() |
333 |
+ self._retry_error() |
334 |
+ |
335 |
+ def _begin_try(self): |
336 |
+ self._tries += 1 |
337 |
+ self._current_task = self._func() |
338 |
+ self._current_task.add_done_callback(self._try_done) |
339 |
+ if self._try_timeout is not None: |
340 |
+ self._try_timeout_handle = self._loop.call_later( |
341 |
+ self._try_timeout, self._try_timeout_callback) |
342 |
+ |
343 |
+ def _try_done(self, future): |
344 |
+ self._current_task = None |
345 |
+ |
346 |
+ if self._try_timeout_handle is not None: |
347 |
+ self._try_timeout_handle.cancel() |
348 |
+ self._try_timeout_handle = None |
349 |
+ |
350 |
+ if not future.cancelled(): |
351 |
+ # consume exception, so that the event loop |
352 |
+ # exception handler does not report it |
353 |
+ future.exception() |
354 |
+ |
355 |
+ if self._overall_timeout_expired: |
356 |
+ return |
357 |
+ |
358 |
+ try: |
359 |
+ if self._future.cancelled(): |
360 |
+ return |
361 |
+ |
362 |
+ self._previous_result = future |
363 |
+ if not (future.cancelled() or future.exception() is not None): |
364 |
+ # success |
365 |
+ self._future.set_result(future.result()) |
366 |
+ return |
367 |
+ finally: |
368 |
+ if self._future.done() and self._overall_timeout_handle is not None: |
369 |
+ self._overall_timeout_handle.cancel() |
370 |
+ self._overall_timeout_handle = None |
371 |
+ |
372 |
+ if self._try_max is not None and self._tries >= self._try_max: |
373 |
+ self._retry_error() |
374 |
+ return |
375 |
+ |
376 |
+ if self._delay_func is not None: |
377 |
+ delay = self._delay_func(self._tries) |
378 |
+ self._current_task = self._loop.call_later(delay, self._delay_done) |
379 |
+ return |
380 |
+ |
381 |
+ self._begin_try() |
382 |
+ |
383 |
+ def _delay_done(self): |
384 |
+ self._current_task = None |
385 |
+ |
386 |
+ if self._future.cancelled() or self._overall_timeout_expired: |
387 |
+ return |
388 |
+ |
389 |
+ self._begin_try() |
390 |
+ |
391 |
+ def _retry_error(self): |
392 |
+ if self._previous_result is None or self._previous_result.cancelled(): |
393 |
+ cause = TimeoutError() |
394 |
+ else: |
395 |
+ cause = self._previous_result.exception() |
396 |
+ |
397 |
+ if self._reraise: |
398 |
+ e = cause |
399 |
+ else: |
400 |
+ e = RetryError() |
401 |
+ e.__cause__ = cause |
402 |
+ |
403 |
+ self._future.set_exception(e) |