Gentoo Archives: gentoo-portage-dev

From: Alec Warner <antarus@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: Re: [gentoo-portage-dev] [PATCH 3/4] Add retry decorator (API inspired by tenacity)
Date: Sun, 01 Apr 2018 14:31:03
Message-Id: CAAr7Pr_2opjd1gzZivncZpCanjAjFs+8fuuR2DwUYVOZsS6SqA@mail.gmail.com
In Reply to: [gentoo-portage-dev] [PATCH 3/4] Add retry decorator (API inspired by tenacity) by Zac Medico
1 also lgtm.
2
3 On Sat, Mar 31, 2018 at 10:46 PM, Zac Medico <zmedico@g.o> wrote:
4
5 > This decorator will be useful for retrying asynchronous
6 > operations, such as gpg key refresh (bug 649276). The
7 > API is inspired by tenacity, but is simpler. Only
8 > asynchronous functions (like @asyncio.coroutine functions)
9 > are supported. In order to retry a synchronous function,
10 > first convert it to an asynchronous function as follows:
11 >
12 > asynchronous_func = functools.partial(
13 > loop.run_in_executor, None, synchronous_func)
14 >
15 > Bug: https://bugs.gentoo.org/649276
16 > See: https://github.com/jd/tenacity
17 > ---
18 > pym/portage/tests/util/futures/test_retry.py | 147 ++++++++++++++++++++++
19 > pym/portage/util/futures/futures.py | 6 +
20 > pym/portage/util/futures/retry.py | 178
21 > +++++++++++++++++++++++++++
22 > 3 files changed, 331 insertions(+)
23 > create mode 100644 pym/portage/tests/util/futures/test_retry.py
24 > create mode 100644 pym/portage/util/futures/retry.py
25 >
26 > diff --git a/pym/portage/tests/util/futures/test_retry.py
27 > b/pym/portage/tests/util/futures/test_retry.py
28 > new file mode 100644
29 > index 000000000..7641e4e92
30 > --- /dev/null
31 > +++ b/pym/portage/tests/util/futures/test_retry.py
32 > @@ -0,0 +1,147 @@
33 > +# Copyright 2018 Gentoo Foundation
34 > +# Distributed under the terms of the GNU General Public License v2
35 > +
36 > +import functools
37 > +
38 > +try:
39 > + import threading
40 > +except ImportError:
41 > + import dummy_threading as threading
42 > +
43 > +from portage.tests import TestCase
44 > +from portage.util._eventloop.global_event_loop import global_event_loop
45 > +from portage.util.backoff import RandomExponentialBackoff
46 > +from portage.util.futures.futures import TimeoutError
47 > +from portage.util.futures.retry import retry
48 > +from portage.util.futures.wait import wait
49 > +from portage.util.monotonic import monotonic
50 > +
51 > +
52 > +class SucceedLaterException(Exception):
53 > + pass
54 > +
55 > +
56 > +class SucceedLater(object):
57 > + """
58 > + A callable object that succeeds after some duration of time has passed.
59 > + """
60 > + def __init__(self, duration):
61 > + self._succeed_time = monotonic() + duration
62 > +
63 > + def __call__(self):
64 > + remaining = self._succeed_time - monotonic()
65 > + if remaining > 0:
66 > + raise SucceedLaterException('time until success:
67 > {} seconds'.format(remaining))
68 > + return 'success'
69 > +
70 > +
71 > +class SucceedNeverException(Exception):
72 > + pass
73 > +
74 > +
75 > +class SucceedNever(object):
76 > + """
77 > + A callable object that never succeeds.
78 > + """
79 > + def __call__(self):
80 > + raise SucceedNeverException('expected failure')
81 > +
82 > +
83 > +class HangForever(object):
84 > + """
85 > + A callable object that sleeps forever.
86 > + """
87 > + def __call__(self):
88 > + threading.Event().wait()
89 > +
90 > +
91 > +class RetryTestCase(TestCase):
92 > + def testSucceedLater(self):
93 > + loop = global_event_loop()
94 > + func = SucceedLater(1)
95 > + func_coroutine = functools.partial(loop.run_in_executor,
96 > None, func)
97 > + decorator = retry(try_max=9999,
98 > + delay_func=RandomExponentialBackoff(multiplier=0.1,
99 > base=2))
100 > + decorated_func = decorator(func_coroutine)
101 > + result = loop.run_until_complete(decorated_func())
102 > + self.assertEqual(result, 'success')
103 > +
104 > + def testSucceedNever(self):
105 > + loop = global_event_loop()
106 > + func = SucceedNever()
107 > + func_coroutine = functools.partial(loop.run_in_executor,
108 > None, func)
109 > + decorator = retry(try_max=4, try_timeout=None,
110 > + delay_func=RandomExponentialBackoff(multiplier=0.1,
111 > base=2))
112 > + decorated_func = decorator(func_coroutine)
113 > + done, pending = loop.run_until_complete(wait([
114 > decorated_func()]))
115 > + self.assertEqual(len(done), 1)
116 > + self.assertTrue(isinstance(done[0].exception().__cause__,
117 > SucceedNeverException))
118 > +
119 > + def testSucceedNeverReraise(self):
120 > + loop = global_event_loop()
121 > + func = SucceedNever()
122 > + func_coroutine = functools.partial(loop.run_in_executor,
123 > None, func)
124 > + decorator = retry(reraise=True, try_max=4,
125 > try_timeout=None,
126 > + delay_func=RandomExponentialBackoff(multiplier=0.1,
127 > base=2))
128 > + decorated_func = decorator(func_coroutine)
129 > + done, pending = loop.run_until_complete(wait([
130 > decorated_func()]))
131 > + self.assertEqual(len(done), 1)
132 > + self.assertTrue(isinstance(done[0].exception(),
133 > SucceedNeverException))
134 > +
135 > + def testHangForever(self):
136 > + loop = global_event_loop()
137 > + func = HangForever()
138 > + func_coroutine = functools.partial(loop.run_in_executor,
139 > None, func)
140 > + decorator = retry(try_max=2, try_timeout=0.1,
141 > + delay_func=RandomExponentialBackoff(multiplier=0.1,
142 > base=2))
143 > + decorated_func = decorator(func_coroutine)
144 > + done, pending = loop.run_until_complete(wait([
145 > decorated_func()]))
146 > + self.assertEqual(len(done), 1)
147 > + self.assertTrue(isinstance(done[0].exception().__cause__,
148 > TimeoutError))
149 > +
150 > + def testHangForeverReraise(self):
151 > + loop = global_event_loop()
152 > + func = HangForever()
153 > + func_coroutine = functools.partial(loop.run_in_executor,
154 > None, func)
155 > + decorator = retry(reraise=True, try_max=2, try_timeout=0.1,
156 > + delay_func=RandomExponentialBackoff(multiplier=0.1,
157 > base=2))
158 > + decorated_func = decorator(func_coroutine)
159 > + done, pending = loop.run_until_complete(wait([
160 > decorated_func()]))
161 > + self.assertEqual(len(done), 1)
162 > + self.assertTrue(isinstance(done[0].exception(),
163 > TimeoutError))
164 > +
165 > + def testCancelRetry(self):
166 > + loop = global_event_loop()
167 > + func = SucceedNever()
168 > + func_coroutine = functools.partial(loop.run_in_executor,
169 > None, func)
170 > + decorator = retry(try_timeout=0.1,
171 > + delay_func=RandomExponentialBackoff(multiplier=0.1,
172 > base=2))
173 > + decorated_func = decorator(func_coroutine)
174 > + future = decorated_func()
175 > + loop.call_later(0.3, future.cancel)
176 > + done, pending = loop.run_until_complete(wait([future]))
177 > + self.assertEqual(len(done), 1)
178 > + self.assertTrue(done[0].cancelled())
179 > +
180 > + def testOverallTimeoutWithException(self):
181 > + loop = global_event_loop()
182 > + func = SucceedNever()
183 > + func_coroutine = functools.partial(loop.run_in_executor,
184 > None, func)
185 > + decorator = retry(try_timeout=0.1, overall_timeout=0.3,
186 > + delay_func=RandomExponentialBackoff(multiplier=0.1,
187 > base=2))
188 > + decorated_func = decorator(func_coroutine)
189 > + done, pending = loop.run_until_complete(wait([
190 > decorated_func()]))
191 > + self.assertEqual(len(done), 1)
192 > + self.assertTrue(isinstance(done[0].exception().__cause__,
193 > SucceedNeverException))
194 > +
195 > + def testOverallTimeoutWithTimeoutError(self):
196 > + loop = global_event_loop()
197 > + # results in TimeoutError because it hangs forever
198 > + func = HangForever()
199 > + func_coroutine = functools.partial(loop.run_in_executor,
200 > None, func)
201 > + decorator = retry(try_timeout=0.1, overall_timeout=0.3,
202 > + delay_func=RandomExponentialBackoff(multiplier=0.1,
203 > base=2))
204 > + decorated_func = decorator(func_coroutine)
205 > + done, pending = loop.run_until_complete(wait([
206 > decorated_func()]))
207 > + self.assertEqual(len(done), 1)
208 > + self.assertTrue(isinstance(done[0].exception().__cause__,
209 > TimeoutError))
210 > diff --git a/pym/portage/util/futures/futures.py
211 > b/pym/portage/util/futures/futures.py
212 > index dcf593c01..cd56a27eb 100644
213 > --- a/pym/portage/util/futures/futures.py
214 > +++ b/pym/portage/util/futures/futures.py
215 > @@ -11,6 +11,7 @@ __all__ = (
216 > 'CancelledError',
217 > 'Future',
218 > 'InvalidStateError',
219 > + 'TimeoutError',
220 > )
221 >
222 > try:
223 > @@ -18,6 +19,7 @@ try:
224 > CancelledError,
225 > Future,
226 > InvalidStateError,
227 > + TimeoutError,
228 > )
229 > except ImportError:
230 >
231 > @@ -30,6 +32,10 @@ except ImportError:
232 > def __init__(self):
233 > Error.__init__(self, "cancelled")
234 >
235 > + class TimeoutError(Error):
236 > + def __init__(self):
237 > + Error.__init__(self, "timed out")
238 > +
239 > class InvalidStateError(Error):
240 > pass
241 >
242 > diff --git a/pym/portage/util/futures/retry.py b/pym/portage/util/futures/
243 > retry.py
244 > new file mode 100644
245 > index 000000000..2caf1bbac
246 > --- /dev/null
247 > +++ b/pym/portage/util/futures/retry.py
248 > @@ -0,0 +1,178 @@
249 > +# Copyright 2018 Gentoo Foundation
250 > +# Distributed under the terms of the GNU General Public License v2
251 > +
252 > +import functools
253 > +
254 > +from portage.exception import PortageException
255 > +from portage.util._eventloop.global_event_loop import global_event_loop
256 > +from portage.util.futures.futures import TimeoutError
257 > +
258 > +
259 > +class RetryError(PortageException):
260 > + """Raised when retry fails."""
261 > + def __init__(self):
262 > + PortageException.__init__(self, "retry error")
263 > +
264 > +
265 > +def retry(try_max=None, try_timeout=None, overall_timeout=None,
266 > + delay_func=None, reraise=False, loop=None):
267 > + """
268 > + Create and return a retry decorator. The decorator is intended to
269 > + operate only on a coroutine function.
270 > +
271 > + @param try_max: maximum number of tries
272 > + @type try_max: int or None
273 > + @param try_timeout: number of seconds to wait for a try to succeed
274 > + before cancelling it, which is only effective if func
275 > returns
276 > + tasks that support cancellation
277 > + @type try_timeout: float or None
278 > + @param overall_timeout: number of seconds to wait for retries to
279 > + succeed before aborting, which is only effective if func
280 > returns
281 > + tasks that support cancellation
282 > + @type overall_timeout: float or None
283 > + @param delay_func: function that takes an int argument
284 > corresponding
285 > + to the number of previous tries and returns a number of
286 > seconds
287 > + to wait before the next try
288 > + @type delay_func: callable
289 > + @param reraise: Reraise the last exception, instead of RetryError
290 > + @type reraise: bool
291 > + @param loop: event loop
292 > + @type loop: EventLoop
293 > + @return: func decorated with retry support
294 > + @rtype: callable
295 > + """
296 > + return functools.partial(_retry_wrapper, loop, try_max,
297 > try_timeout,
298 > + overall_timeout, delay_func, reraise)
299 > +
300 > +
301 > +def _retry_wrapper(loop, try_max, try_timeout, overall_timeout,
302 > delay_func,
303 > + reraise, func):
304 > + """
305 > + Create and return a decorated function.
306 > + """
307 > + return functools.partial(_retry, loop, try_max, try_timeout,
308 > + overall_timeout, delay_func, reraise, func)
309 > +
310 > +
311 > +def _retry(loop, try_max, try_timeout, overall_timeout, delay_func,
312 > + reraise, func, *args, **kwargs):
313 > + """
314 > + Retry coroutine, used to implement retry decorator.
315 > +
316 > + @return: func return value
317 > + @rtype: asyncio.Future (or compatible)
318 > + """
319 > + loop = loop or global_event_loop()
320 > + future = loop.create_future()
321 > + _Retry(future, loop, try_max, try_timeout, overall_timeout,
322 > delay_func,
323 > + reraise, functools.partial(func, *args, **kwargs))
324 > + return future
325 > +
326 > +
327 > +class _Retry(object):
328 > + def __init__(self, future, loop, try_max, try_timeout,
329 > overall_timeout,
330 > + delay_func, reraise, func):
331 > + self._future = future
332 > + self._loop = loop
333 > + self._try_max = try_max
334 > + self._try_timeout = try_timeout
335 > + self._delay_func = delay_func
336 > + self._reraise = reraise
337 > + self._func = func
338 > +
339 > + self._try_timeout_handle = None
340 > + self._overall_timeout_handle = None
341 > + self._overall_timeout_expired = None
342 > + self._tries = 0
343 > + self._current_task = None
344 > + self._previous_result = None
345 > +
346 > + future.add_done_callback(self._cancel_callback)
347 > + if overall_timeout is not None:
348 > + self._overall_timeout_handle = loop.call_later(
349 > + overall_timeout, self._overall_timeout_
350 > callback)
351 > + self._begin_try()
352 > +
353 > + def _cancel_callback(self, future):
354 > + if future.cancelled() and self._current_task is not None:
355 > + self._current_task.cancel()
356 > +
357 > + def _try_timeout_callback(self):
358 > + self._try_timeout_handle = None
359 > + self._current_task.cancel()
360 > +
361 > + def _overall_timeout_callback(self):
362 > + self._overall_timeout_handle = None
363 > + self._overall_timeout_expired = True
364 > + self._current_task.cancel()
365 > + self._retry_error()
366 > +
367 > + def _begin_try(self):
368 > + self._tries += 1
369 > + self._current_task = self._func()
370 > + self._current_task.add_done_callback(self._try_done)
371 > + if self._try_timeout is not None:
372 > + self._try_timeout_handle = self._loop.call_later(
373 > + self._try_timeout,
374 > self._try_timeout_callback)
375 > +
376 > + def _try_done(self, future):
377 > + self._current_task = None
378 > +
379 > + if self._try_timeout_handle is not None:
380 > + self._try_timeout_handle.cancel()
381 > + self._try_timeout_handle = None
382 > +
383 > + if not future.cancelled():
384 > + # consume exception, so that the event loop
385 > + # exception handler does not report it
386 > + future.exception()
387 > +
388 > + if self._overall_timeout_expired:
389 > + return
390 > +
391 > + try:
392 > + if self._future.cancelled():
393 > + return
394 > +
395 > + self._previous_result = future
396 > + if not (future.cancelled() or future.exception()
397 > is not None):
398 > + # success
399 > + self._future.set_result(future.result())
400 > + return
401 > + finally:
402 > + if self._future.done() and
403 > self._overall_timeout_handle is not None:
404 > + self._overall_timeout_handle.cancel()
405 > + self._overall_timeout_handle = None
406 > +
407 > + if self._try_max is not None and self._tries >=
408 > self._try_max:
409 > + self._retry_error()
410 > + return
411 > +
412 > + if self._delay_func is not None:
413 > + delay = self._delay_func(self._tries)
414 > + self._current_task = self._loop.call_later(delay,
415 > self._delay_done)
416 > + return
417 > +
418 > + self._begin_try()
419 > +
420 > + def _delay_done(self):
421 > + self._current_task = None
422 > +
423 > + if self._future.cancelled() or
424 > self._overall_timeout_expired:
425 > + return
426 > +
427 > + self._begin_try()
428 > +
429 > + def _retry_error(self):
430 > + if self._previous_result is None or self._previous_result.
431 > cancelled():
432 > + cause = TimeoutError()
433 > + else:
434 > + cause = self._previous_result.exception()
435 > +
436 > + if self._reraise:
437 > + e = cause
438 > + else:
439 > + e = RetryError()
440 > + e.__cause__ = cause
441 > +
442 > + self._future.set_exception(e)
443 > --
444 > 2.13.6
445 >
446 >
447 >