1 |
This decorator will be useful for retrying asynchronous |
2 |
operations, such as gpg key refresh (bug 649276). The |
3 |
API is inspired by tenacity, but is simpler. Only |
4 |
asynchronous functions (like @asyncio.coroutine functions) |
5 |
are supported. In order to retry a synchronous function, |
6 |
first convert it to an asynchronous function as follows: |
7 |
|
8 |
asynchronous_func = functools.partial( |
9 |
loop.run_in_executor, None, synchronous_func) |
10 |
|
11 |
Bug: https://bugs.gentoo.org/649276 |
12 |
See: https://github.com/jd/tenacity |
13 |
--- |
14 |
pym/portage/tests/util/futures/test_retry.py | 147 ++++++++++++++++++++++ |
15 |
pym/portage/util/futures/futures.py | 6 + |
16 |
pym/portage/util/futures/retry.py | 178 +++++++++++++++++++++++++++ |
17 |
3 files changed, 331 insertions(+) |
18 |
create mode 100644 pym/portage/tests/util/futures/test_retry.py |
19 |
create mode 100644 pym/portage/util/futures/retry.py |
20 |
|
21 |
diff --git a/pym/portage/tests/util/futures/test_retry.py b/pym/portage/tests/util/futures/test_retry.py |
22 |
new file mode 100644 |
23 |
index 000000000..7641e4e92 |
24 |
--- /dev/null |
25 |
+++ b/pym/portage/tests/util/futures/test_retry.py |
26 |
@@ -0,0 +1,147 @@ |
27 |
+# Copyright 2018 Gentoo Foundation |
28 |
+# Distributed under the terms of the GNU General Public License v2 |
29 |
+ |
30 |
+import functools |
31 |
+ |
32 |
+try: |
33 |
+ import threading |
34 |
+except ImportError: |
35 |
+ import dummy_threading as threading |
36 |
+ |
37 |
+from portage.tests import TestCase |
38 |
+from portage.util._eventloop.global_event_loop import global_event_loop |
39 |
+from portage.util.backoff import RandomExponentialBackoff |
40 |
+from portage.util.futures.futures import TimeoutError |
41 |
+from portage.util.futures.retry import retry |
42 |
+from portage.util.futures.wait import wait |
43 |
+from portage.util.monotonic import monotonic |
44 |
+ |
45 |
+ |
46 |
+class SucceedLaterException(Exception): |
47 |
+ pass |
48 |
+ |
49 |
+ |
50 |
+class SucceedLater(object): |
51 |
+ """ |
52 |
+ A callable object that succeeds after some duration of time has passed. |
53 |
+ """ |
54 |
+ def __init__(self, duration): |
55 |
+ self._succeed_time = monotonic() + duration |
56 |
+ |
57 |
+ def __call__(self): |
58 |
+ remaining = self._succeed_time - monotonic() |
59 |
+ if remaining > 0: |
60 |
+ raise SucceedLaterException('time until success: {} seconds'.format(remaining)) |
61 |
+ return 'success' |
62 |
+ |
63 |
+ |
64 |
+class SucceedNeverException(Exception): |
65 |
+ pass |
66 |
+ |
67 |
+ |
68 |
+class SucceedNever(object): |
69 |
+ """ |
70 |
+ A callable object that never succeeds. |
71 |
+ """ |
72 |
+ def __call__(self): |
73 |
+ raise SucceedNeverException('expected failure') |
74 |
+ |
75 |
+ |
76 |
+class HangForever(object): |
77 |
+ """ |
78 |
+ A callable object that sleeps forever. |
79 |
+ """ |
80 |
+ def __call__(self): |
81 |
+ threading.Event().wait() |
82 |
+ |
83 |
+ |
84 |
+class RetryTestCase(TestCase): |
85 |
+ def testSucceedLater(self): |
86 |
+ loop = global_event_loop() |
87 |
+ func = SucceedLater(1) |
88 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
89 |
+ decorator = retry(try_max=9999, |
90 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
91 |
+ decorated_func = decorator(func_coroutine) |
92 |
+ result = loop.run_until_complete(decorated_func()) |
93 |
+ self.assertEqual(result, 'success') |
94 |
+ |
95 |
+ def testSucceedNever(self): |
96 |
+ loop = global_event_loop() |
97 |
+ func = SucceedNever() |
98 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
99 |
+ decorator = retry(try_max=4, try_timeout=None, |
100 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
101 |
+ decorated_func = decorator(func_coroutine) |
102 |
+ done, pending = loop.run_until_complete(wait([decorated_func()])) |
103 |
+ self.assertEqual(len(done), 1) |
104 |
+ self.assertTrue(isinstance(done[0].exception().__cause__, SucceedNeverException)) |
105 |
+ |
106 |
+ def testSucceedNeverReraise(self): |
107 |
+ loop = global_event_loop() |
108 |
+ func = SucceedNever() |
109 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
110 |
+ decorator = retry(reraise=True, try_max=4, try_timeout=None, |
111 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
112 |
+ decorated_func = decorator(func_coroutine) |
113 |
+ done, pending = loop.run_until_complete(wait([decorated_func()])) |
114 |
+ self.assertEqual(len(done), 1) |
115 |
+ self.assertTrue(isinstance(done[0].exception(), SucceedNeverException)) |
116 |
+ |
117 |
+ def testHangForever(self): |
118 |
+ loop = global_event_loop() |
119 |
+ func = HangForever() |
120 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
121 |
+ decorator = retry(try_max=2, try_timeout=0.1, |
122 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
123 |
+ decorated_func = decorator(func_coroutine) |
124 |
+ done, pending = loop.run_until_complete(wait([decorated_func()])) |
125 |
+ self.assertEqual(len(done), 1) |
126 |
+ self.assertTrue(isinstance(done[0].exception().__cause__, TimeoutError)) |
127 |
+ |
128 |
+ def testHangForeverReraise(self): |
129 |
+ loop = global_event_loop() |
130 |
+ func = HangForever() |
131 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
132 |
+ decorator = retry(reraise=True, try_max=2, try_timeout=0.1, |
133 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
134 |
+ decorated_func = decorator(func_coroutine) |
135 |
+ done, pending = loop.run_until_complete(wait([decorated_func()])) |
136 |
+ self.assertEqual(len(done), 1) |
137 |
+ self.assertTrue(isinstance(done[0].exception(), TimeoutError)) |
138 |
+ |
139 |
+ def testCancelRetry(self): |
140 |
+ loop = global_event_loop() |
141 |
+ func = SucceedNever() |
142 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
143 |
+ decorator = retry(try_timeout=0.1, |
144 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
145 |
+ decorated_func = decorator(func_coroutine) |
146 |
+ future = decorated_func() |
147 |
+ loop.call_later(0.3, future.cancel) |
148 |
+ done, pending = loop.run_until_complete(wait([future])) |
149 |
+ self.assertEqual(len(done), 1) |
150 |
+ self.assertTrue(done[0].cancelled()) |
151 |
+ |
152 |
+ def testOverallTimeoutWithException(self): |
153 |
+ loop = global_event_loop() |
154 |
+ func = SucceedNever() |
155 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
156 |
+ decorator = retry(try_timeout=0.1, overall_timeout=0.3, |
157 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
158 |
+ decorated_func = decorator(func_coroutine) |
159 |
+ done, pending = loop.run_until_complete(wait([decorated_func()])) |
160 |
+ self.assertEqual(len(done), 1) |
161 |
+ self.assertTrue(isinstance(done[0].exception().__cause__, SucceedNeverException)) |
162 |
+ |
163 |
+ def testOverallTimeoutWithTimeoutError(self): |
164 |
+ loop = global_event_loop() |
165 |
+ # results in TimeoutError because it hangs forever |
166 |
+ func = HangForever() |
167 |
+ func_coroutine = functools.partial(loop.run_in_executor, None, func) |
168 |
+ decorator = retry(try_timeout=0.1, overall_timeout=0.3, |
169 |
+ delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) |
170 |
+ decorated_func = decorator(func_coroutine) |
171 |
+ done, pending = loop.run_until_complete(wait([decorated_func()])) |
172 |
+ self.assertEqual(len(done), 1) |
173 |
+ self.assertTrue(isinstance(done[0].exception().__cause__, TimeoutError)) |
174 |
diff --git a/pym/portage/util/futures/futures.py b/pym/portage/util/futures/futures.py |
175 |
index dcf593c01..cd56a27eb 100644 |
176 |
--- a/pym/portage/util/futures/futures.py |
177 |
+++ b/pym/portage/util/futures/futures.py |
178 |
@@ -11,6 +11,7 @@ __all__ = ( |
179 |
'CancelledError', |
180 |
'Future', |
181 |
'InvalidStateError', |
182 |
+ 'TimeoutError', |
183 |
) |
184 |
|
185 |
try: |
186 |
@@ -18,6 +19,7 @@ try: |
187 |
CancelledError, |
188 |
Future, |
189 |
InvalidStateError, |
190 |
+ TimeoutError, |
191 |
) |
192 |
except ImportError: |
193 |
|
194 |
@@ -30,6 +32,10 @@ except ImportError: |
195 |
def __init__(self): |
196 |
Error.__init__(self, "cancelled") |
197 |
|
198 |
+ class TimeoutError(Error): |
199 |
+ def __init__(self): |
200 |
+ Error.__init__(self, "timed out") |
201 |
+ |
202 |
class InvalidStateError(Error): |
203 |
pass |
204 |
|
205 |
diff --git a/pym/portage/util/futures/retry.py b/pym/portage/util/futures/retry.py |
206 |
new file mode 100644 |
207 |
index 000000000..2caf1bbac |
208 |
--- /dev/null |
209 |
+++ b/pym/portage/util/futures/retry.py |
210 |
@@ -0,0 +1,178 @@ |
211 |
+# Copyright 2018 Gentoo Foundation |
212 |
+# Distributed under the terms of the GNU General Public License v2 |
213 |
+ |
214 |
+import functools |
215 |
+ |
216 |
+from portage.exception import PortageException |
217 |
+from portage.util._eventloop.global_event_loop import global_event_loop |
218 |
+from portage.util.futures.futures import TimeoutError |
219 |
+ |
220 |
+ |
221 |
+class RetryError(PortageException): |
222 |
+ """Raised when retry fails.""" |
223 |
+ def __init__(self): |
224 |
+ PortageException.__init__(self, "retry error") |
225 |
+ |
226 |
+ |
227 |
+def retry(try_max=None, try_timeout=None, overall_timeout=None, |
228 |
+ delay_func=None, reraise=False, loop=None): |
229 |
+ """ |
230 |
+ Create and return a retry decorator. The decorator is intended to |
231 |
+ operate only on a coroutine function. |
232 |
+ |
233 |
+ @param try_max: maximum number of tries |
234 |
+ @type try_max: int or None |
235 |
+ @param try_timeout: number of seconds to wait for a try to succeed |
236 |
+ before cancelling it, which is only effective if func returns |
237 |
+ tasks that support cancellation |
238 |
+ @type try_timeout: float or None |
239 |
+ @param overall_timeout: number of seconds to wait for retries to |
240 |
+ succeed before aborting, which is only effective if func returns |
241 |
+ tasks that support cancellation |
242 |
+ @type overall_timeout: float or None |
243 |
+ @param delay_func: function that takes an int argument corresponding |
244 |
+ to the number of previous tries and returns a number of seconds |
245 |
+ to wait before the next try |
246 |
+ @type delay_func: callable |
247 |
+ @param reraise: Reraise the last exception, instead of RetryError |
248 |
+ @type reraise: bool |
249 |
+ @param loop: event loop |
250 |
+ @type loop: EventLoop |
251 |
+ @return: decorator that adds retry support to a coroutine function |
252 |
+ @rtype: callable |
253 |
+ """ |
254 |
+ return functools.partial(_retry_wrapper, loop, try_max, try_timeout, |
255 |
+ overall_timeout, delay_func, reraise) |
256 |
+ |
257 |
+ |
258 |
+def _retry_wrapper(loop, try_max, try_timeout, overall_timeout, delay_func, |
259 |
+ reraise, func): |
260 |
+ """ |
261 |
+ Create and return a decorated function. |
262 |
+ """ |
263 |
+ return functools.partial(_retry, loop, try_max, try_timeout, |
264 |
+ overall_timeout, delay_func, reraise, func) |
265 |
+ |
266 |
+ |
267 |
+def _retry(loop, try_max, try_timeout, overall_timeout, delay_func, |
268 |
+ reraise, func, *args, **kwargs): |
269 |
+ """ |
270 |
+ Retry coroutine, used to implement retry decorator. |
271 |
+ |
272 |
+ @return: func return value |
273 |
+ @rtype: asyncio.Future (or compatible) |
274 |
+ """ |
275 |
+ loop = loop or global_event_loop() |
276 |
+ future = loop.create_future() |
277 |
+ _Retry(future, loop, try_max, try_timeout, overall_timeout, delay_func, |
278 |
+ reraise, functools.partial(func, *args, **kwargs)) |
279 |
+ return future |
280 |
+ |
281 |
+ |
282 |
+class _Retry(object): |
283 |
+ def __init__(self, future, loop, try_max, try_timeout, overall_timeout, |
284 |
+ delay_func, reraise, func): |
285 |
+ self._future = future |
286 |
+ self._loop = loop |
287 |
+ self._try_max = try_max |
288 |
+ self._try_timeout = try_timeout |
289 |
+ self._delay_func = delay_func |
290 |
+ self._reraise = reraise |
291 |
+ self._func = func |
292 |
+ |
293 |
+ self._try_timeout_handle = None |
294 |
+ self._overall_timeout_handle = None |
295 |
+ self._overall_timeout_expired = None |
296 |
+ self._tries = 0 |
297 |
+ self._current_task = None |
298 |
+ self._previous_result = None |
299 |
+ |
300 |
+ future.add_done_callback(self._cancel_callback) |
301 |
+ if overall_timeout is not None: |
302 |
+ self._overall_timeout_handle = loop.call_later( |
303 |
+ overall_timeout, self._overall_timeout_callback) |
304 |
+ self._begin_try() |
305 |
+ |
306 |
+ def _cancel_callback(self, future): |
307 |
+ if future.cancelled() and self._current_task is not None: |
308 |
+ self._current_task.cancel() |
309 |
+ |
310 |
+ def _try_timeout_callback(self): |
311 |
+ self._try_timeout_handle = None |
312 |
+ self._current_task.cancel() |
313 |
+ |
314 |
+ def _overall_timeout_callback(self): |
315 |
+ self._overall_timeout_handle = None |
316 |
+ self._overall_timeout_expired = True |
317 |
+ self._current_task.cancel() |
318 |
+ self._retry_error() |
319 |
+ |
320 |
+ def _begin_try(self): |
321 |
+ self._tries += 1 |
322 |
+ self._current_task = self._func() |
323 |
+ self._current_task.add_done_callback(self._try_done) |
324 |
+ if self._try_timeout is not None: |
325 |
+ self._try_timeout_handle = self._loop.call_later( |
326 |
+ self._try_timeout, self._try_timeout_callback) |
327 |
+ |
328 |
+ def _try_done(self, future): |
329 |
+ self._current_task = None |
330 |
+ |
331 |
+ if self._try_timeout_handle is not None: |
332 |
+ self._try_timeout_handle.cancel() |
333 |
+ self._try_timeout_handle = None |
334 |
+ |
335 |
+ if not future.cancelled(): |
336 |
+ # consume exception, so that the event loop |
337 |
+ # exception handler does not report it |
338 |
+ future.exception() |
339 |
+ |
340 |
+ if self._overall_timeout_expired: |
341 |
+ return |
342 |
+ |
343 |
+ try: |
344 |
+ if self._future.cancelled(): |
345 |
+ return |
346 |
+ |
347 |
+ self._previous_result = future |
348 |
+ if not (future.cancelled() or future.exception() is not None): |
349 |
+ # success |
350 |
+ self._future.set_result(future.result()) |
351 |
+ return |
352 |
+ finally: |
353 |
+ if self._future.done() and self._overall_timeout_handle is not None: |
354 |
+ self._overall_timeout_handle.cancel() |
355 |
+ self._overall_timeout_handle = None |
356 |
+ |
357 |
+ if self._try_max is not None and self._tries >= self._try_max: |
358 |
+ self._retry_error() |
359 |
+ return |
360 |
+ |
361 |
+ if self._delay_func is not None: |
362 |
+ delay = self._delay_func(self._tries) |
363 |
+ self._current_task = self._loop.call_later(delay, self._delay_done) |
364 |
+ return |
365 |
+ |
366 |
+ self._begin_try() |
367 |
+ |
368 |
+ def _delay_done(self): |
369 |
+ self._current_task = None |
370 |
+ |
371 |
+ if self._future.cancelled() or self._overall_timeout_expired: |
372 |
+ return |
373 |
+ |
374 |
+ self._begin_try() |
375 |
+ |
376 |
+ def _retry_error(self): |
377 |
+ if self._previous_result is None or self._previous_result.cancelled(): |
378 |
+ cause = TimeoutError() |
379 |
+ else: |
380 |
+ cause = self._previous_result.exception() |
381 |
+ |
382 |
+ if self._reraise: |
383 |
+ e = cause |
384 |
+ else: |
385 |
+ e = RetryError() |
386 |
+ e.__cause__ = cause |
387 |
+ |
388 |
+ self._future.set_exception(e) |
389 |
-- |
390 |
2.13.6 |