Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/, pym/portage/util/futures/
Date: Mon, 02 Apr 2018 17:11:53
Message-Id: 1522688004.1aa9a71731f3ab05e10190493956c70998abf12a.zmedico@gentoo
1 commit: 1aa9a71731f3ab05e10190493956c70998abf12a
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Fri Mar 16 19:37:54 2018 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Mon Apr 2 16:53:24 2018 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=1aa9a717
7
8 Add retry decorator (API inspired by tenacity)
9
10 This decorator will be useful for retrying asynchronous
11 operations, such as gpg key refresh (bug 649276). The
12 API is inspired by tenacity, but is simpler. Only
13 asynchronous functions (like @asyncio.coroutine functions)
14 are supported. In order to retry a synchronous function,
15 first convert it to an asynchronous function as follows:
16
17 asynchronous_func = functools.partial(
18 loop.run_in_executor, None, synchronous_func)
19
20 Bug: https://bugs.gentoo.org/649276
21 See: https://github.com/jd/tenacity
22 Reviewed-by: Alec Warner <antarus <AT> gentoo.org>
23
24 pym/portage/tests/util/futures/test_retry.py | 147 +++++++++++++++++++++
25 pym/portage/util/futures/futures.py | 6 +
26 pym/portage/util/futures/retry.py | 183 +++++++++++++++++++++++++++
27 3 files changed, 336 insertions(+)
28
29 diff --git a/pym/portage/tests/util/futures/test_retry.py b/pym/portage/tests/util/futures/test_retry.py
30 new file mode 100644
31 index 000000000..7641e4e92
32 --- /dev/null
33 +++ b/pym/portage/tests/util/futures/test_retry.py
34 @@ -0,0 +1,147 @@
35 +# Copyright 2018 Gentoo Foundation
36 +# Distributed under the terms of the GNU General Public License v2
37 +
38 +import functools
39 +
40 +try:
41 + import threading
42 +except ImportError:
43 + import dummy_threading as threading
44 +
45 +from portage.tests import TestCase
46 +from portage.util._eventloop.global_event_loop import global_event_loop
47 +from portage.util.backoff import RandomExponentialBackoff
48 +from portage.util.futures.futures import TimeoutError
49 +from portage.util.futures.retry import retry
50 +from portage.util.futures.wait import wait
51 +from portage.util.monotonic import monotonic
52 +
53 +
54 +class SucceedLaterException(Exception):
55 + pass
56 +
57 +
58 +class SucceedLater(object):
59 + """
60 + A callable object that succeeds after some duration of time has passed.
61 + """
62 + def __init__(self, duration):
63 + self._succeed_time = monotonic() + duration
64 +
65 + def __call__(self):
66 + remaining = self._succeed_time - monotonic()
67 + if remaining > 0:
68 + raise SucceedLaterException('time until success: {} seconds'.format(remaining))
69 + return 'success'
70 +
71 +
72 +class SucceedNeverException(Exception):
73 + pass
74 +
75 +
76 +class SucceedNever(object):
77 + """
78 + A callable object that never succeeds.
79 + """
80 + def __call__(self):
81 + raise SucceedNeverException('expected failure')
82 +
83 +
84 +class HangForever(object):
85 + """
86 + A callable object that sleeps forever.
87 + """
88 + def __call__(self):
89 + threading.Event().wait()
90 +
91 +
92 +class RetryTestCase(TestCase):
93 + def testSucceedLater(self):
94 + loop = global_event_loop()
95 + func = SucceedLater(1)
96 + func_coroutine = functools.partial(loop.run_in_executor, None, func)
97 + decorator = retry(try_max=9999,
98 + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
99 + decorated_func = decorator(func_coroutine)
100 + result = loop.run_until_complete(decorated_func())
101 + self.assertEqual(result, 'success')
102 +
103 + def testSucceedNever(self):
104 + loop = global_event_loop()
105 + func = SucceedNever()
106 + func_coroutine = functools.partial(loop.run_in_executor, None, func)
107 + decorator = retry(try_max=4, try_timeout=None,
108 + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
109 + decorated_func = decorator(func_coroutine)
110 + done, pending = loop.run_until_complete(wait([decorated_func()]))
111 + self.assertEqual(len(done), 1)
112 + self.assertTrue(isinstance(done[0].exception().__cause__, SucceedNeverException))
113 +
114 + def testSucceedNeverReraise(self):
115 + loop = global_event_loop()
116 + func = SucceedNever()
117 + func_coroutine = functools.partial(loop.run_in_executor, None, func)
118 + decorator = retry(reraise=True, try_max=4, try_timeout=None,
119 + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
120 + decorated_func = decorator(func_coroutine)
121 + done, pending = loop.run_until_complete(wait([decorated_func()]))
122 + self.assertEqual(len(done), 1)
123 + self.assertTrue(isinstance(done[0].exception(), SucceedNeverException))
124 +
125 + def testHangForever(self):
126 + loop = global_event_loop()
127 + func = HangForever()
128 + func_coroutine = functools.partial(loop.run_in_executor, None, func)
129 + decorator = retry(try_max=2, try_timeout=0.1,
130 + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
131 + decorated_func = decorator(func_coroutine)
132 + done, pending = loop.run_until_complete(wait([decorated_func()]))
133 + self.assertEqual(len(done), 1)
134 + self.assertTrue(isinstance(done[0].exception().__cause__, TimeoutError))
135 +
136 + def testHangForeverReraise(self):
137 + loop = global_event_loop()
138 + func = HangForever()
139 + func_coroutine = functools.partial(loop.run_in_executor, None, func)
140 + decorator = retry(reraise=True, try_max=2, try_timeout=0.1,
141 + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
142 + decorated_func = decorator(func_coroutine)
143 + done, pending = loop.run_until_complete(wait([decorated_func()]))
144 + self.assertEqual(len(done), 1)
145 + self.assertTrue(isinstance(done[0].exception(), TimeoutError))
146 +
147 + def testCancelRetry(self):
148 + loop = global_event_loop()
149 + func = SucceedNever()
150 + func_coroutine = functools.partial(loop.run_in_executor, None, func)
151 + decorator = retry(try_timeout=0.1,
152 + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
153 + decorated_func = decorator(func_coroutine)
154 + future = decorated_func()
155 + loop.call_later(0.3, future.cancel)
156 + done, pending = loop.run_until_complete(wait([future]))
157 + self.assertEqual(len(done), 1)
158 + self.assertTrue(done[0].cancelled())
159 +
160 + def testOverallTimeoutWithException(self):
161 + loop = global_event_loop()
162 + func = SucceedNever()
163 + func_coroutine = functools.partial(loop.run_in_executor, None, func)
164 + decorator = retry(try_timeout=0.1, overall_timeout=0.3,
165 + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
166 + decorated_func = decorator(func_coroutine)
167 + done, pending = loop.run_until_complete(wait([decorated_func()]))
168 + self.assertEqual(len(done), 1)
169 + self.assertTrue(isinstance(done[0].exception().__cause__, SucceedNeverException))
170 +
171 + def testOverallTimeoutWithTimeoutError(self):
172 + loop = global_event_loop()
173 + # results in TimeoutError because it hangs forever
174 + func = HangForever()
175 + func_coroutine = functools.partial(loop.run_in_executor, None, func)
176 + decorator = retry(try_timeout=0.1, overall_timeout=0.3,
177 + delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
178 + decorated_func = decorator(func_coroutine)
179 + done, pending = loop.run_until_complete(wait([decorated_func()]))
180 + self.assertEqual(len(done), 1)
181 + self.assertTrue(isinstance(done[0].exception().__cause__, TimeoutError))
182
183 diff --git a/pym/portage/util/futures/futures.py b/pym/portage/util/futures/futures.py
184 index dcf593c01..cd56a27eb 100644
185 --- a/pym/portage/util/futures/futures.py
186 +++ b/pym/portage/util/futures/futures.py
187 @@ -11,6 +11,7 @@ __all__ = (
188 'CancelledError',
189 'Future',
190 'InvalidStateError',
191 + 'TimeoutError',
192 )
193
194 try:
195 @@ -18,6 +19,7 @@ try:
196 CancelledError,
197 Future,
198 InvalidStateError,
199 + TimeoutError,
200 )
201 except ImportError:
202
203 @@ -30,6 +32,10 @@ except ImportError:
204 def __init__(self):
205 Error.__init__(self, "cancelled")
206
207 + class TimeoutError(Error):
208 + def __init__(self):
209 + Error.__init__(self, "timed out")
210 +
211 class InvalidStateError(Error):
212 pass
213
214
215 diff --git a/pym/portage/util/futures/retry.py b/pym/portage/util/futures/retry.py
216 new file mode 100644
217 index 000000000..902ed163a
218 --- /dev/null
219 +++ b/pym/portage/util/futures/retry.py
220 @@ -0,0 +1,183 @@
221 +# Copyright 2018 Gentoo Foundation
222 +# Distributed under the terms of the GNU General Public License v2
223 +
224 +__all__ = (
225 + 'RetryError',
226 + 'retry',
227 +)
228 +
229 +import functools
230 +
231 +from portage.exception import PortageException
232 +from portage.util._eventloop.global_event_loop import global_event_loop
233 +from portage.util.futures.futures import TimeoutError
234 +
235 +
236 +class RetryError(PortageException):
237 + """Raised when retry fails."""
238 + def __init__(self):
239 + PortageException.__init__(self, "retry error")
240 +
241 +
242 +def retry(try_max=None, try_timeout=None, overall_timeout=None,
243 + delay_func=None, reraise=False, loop=None):
244 + """
245 + Create and return a retry decorator. The decorator is intended to
246 + operate only on a coroutine function.
247 +
248 + @param try_max: maximum number of tries
249 + @type try_max: int or None
250 + @param try_timeout: number of seconds to wait for a try to succeed
251 + before cancelling it, which is only effective if func returns
252 + tasks that support cancellation
253 + @type try_timeout: float or None
254 + @param overall_timeout: number of seconds to wait for retries to
255 + succeed before aborting, which is only effective if func returns
256 + tasks that support cancellation
257 + @type overall_timeout: float or None
258 + @param delay_func: function that takes an int argument corresponding
259 + to the number of previous tries and returns a number of seconds
260 + to wait before the next try
261 + @type delay_func: callable
262 + @param reraise: Reraise the last exception, instead of RetryError
263 + @type reraise: bool
264 + @param loop: event loop
265 + @type loop: EventLoop
266 + @return: func decorated with retry support
267 + @rtype: callable
268 + """
269 + return functools.partial(_retry_wrapper, loop, try_max, try_timeout,
270 + overall_timeout, delay_func, reraise)
271 +
272 +
273 +def _retry_wrapper(loop, try_max, try_timeout, overall_timeout, delay_func,
274 + reraise, func):
275 + """
276 + Create and return a decorated function.
277 + """
278 + return functools.partial(_retry, loop, try_max, try_timeout,
279 + overall_timeout, delay_func, reraise, func)
280 +
281 +
282 +def _retry(loop, try_max, try_timeout, overall_timeout, delay_func,
283 + reraise, func, *args, **kwargs):
284 + """
285 + Retry coroutine, used to implement retry decorator.
286 +
287 + @return: func return value
288 + @rtype: asyncio.Future (or compatible)
289 + """
290 + loop = loop or global_event_loop()
291 + future = loop.create_future()
292 + _Retry(future, loop, try_max, try_timeout, overall_timeout, delay_func,
293 + reraise, functools.partial(func, *args, **kwargs))
294 + return future
295 +
296 +
297 +class _Retry(object):
298 + def __init__(self, future, loop, try_max, try_timeout, overall_timeout,
299 + delay_func, reraise, func):
300 + self._future = future
301 + self._loop = loop
302 + self._try_max = try_max
303 + self._try_timeout = try_timeout
304 + self._delay_func = delay_func
305 + self._reraise = reraise
306 + self._func = func
307 +
308 + self._try_timeout_handle = None
309 + self._overall_timeout_handle = None
310 + self._overall_timeout_expired = None
311 + self._tries = 0
312 + self._current_task = None
313 + self._previous_result = None
314 +
315 + future.add_done_callback(self._cancel_callback)
316 + if overall_timeout is not None:
317 + self._overall_timeout_handle = loop.call_later(
318 + overall_timeout, self._overall_timeout_callback)
319 + self._begin_try()
320 +
321 + def _cancel_callback(self, future):
322 + if future.cancelled() and self._current_task is not None:
323 + self._current_task.cancel()
324 +
325 + def _try_timeout_callback(self):
326 + self._try_timeout_handle = None
327 + self._current_task.cancel()
328 +
329 + def _overall_timeout_callback(self):
330 + self._overall_timeout_handle = None
331 + self._overall_timeout_expired = True
332 + self._current_task.cancel()
333 + self._retry_error()
334 +
335 + def _begin_try(self):
336 + self._tries += 1
337 + self._current_task = self._func()
338 + self._current_task.add_done_callback(self._try_done)
339 + if self._try_timeout is not None:
340 + self._try_timeout_handle = self._loop.call_later(
341 + self._try_timeout, self._try_timeout_callback)
342 +
343 + def _try_done(self, future):
344 + self._current_task = None
345 +
346 + if self._try_timeout_handle is not None:
347 + self._try_timeout_handle.cancel()
348 + self._try_timeout_handle = None
349 +
350 + if not future.cancelled():
351 + # consume exception, so that the event loop
352 + # exception handler does not report it
353 + future.exception()
354 +
355 + if self._overall_timeout_expired:
356 + return
357 +
358 + try:
359 + if self._future.cancelled():
360 + return
361 +
362 + self._previous_result = future
363 + if not (future.cancelled() or future.exception() is not None):
364 + # success
365 + self._future.set_result(future.result())
366 + return
367 + finally:
368 + if self._future.done() and self._overall_timeout_handle is not None:
369 + self._overall_timeout_handle.cancel()
370 + self._overall_timeout_handle = None
371 +
372 + if self._try_max is not None and self._tries >= self._try_max:
373 + self._retry_error()
374 + return
375 +
376 + if self._delay_func is not None:
377 + delay = self._delay_func(self._tries)
378 + self._current_task = self._loop.call_later(delay, self._delay_done)
379 + return
380 +
381 + self._begin_try()
382 +
383 + def _delay_done(self):
384 + self._current_task = None
385 +
386 + if self._future.cancelled() or self._overall_timeout_expired:
387 + return
388 +
389 + self._begin_try()
390 +
391 + def _retry_error(self):
392 + if self._previous_result is None or self._previous_result.cancelled():
393 + cause = TimeoutError()
394 + else:
395 + cause = self._previous_result.exception()
396 +
397 + if self._reraise:
398 + e = cause
399 + else:
400 + e = RetryError()
401 + e.__cause__ = cause
402 +
403 + self._future.set_exception(e)