Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] Future: implement add_done_callback for asyncio compat (bug 591760)
Date: Sun, 26 Mar 2017 10:16:24
Message-Id: 20170326101311.24248-1-zmedico@gentoo.org
1 Implement the add_done_callback and remove_done_callback methods, since
2 they are required in order to make further progress toward asyncio
3 compatibility.
4
5 Also implement the AbstractEventLoop create_future method for the
6 EventLoop class, so that it returns an instance of _EventLoopFuture.
7 EventLoop currently does not implement some of the
8 asyncio.AbstractEventLoop methods that asyncio.Future requires for
9 its add_done_callback implementation, and the create_future method
10 conveniently solves this problem.
11
12 X-Gentoo-bug: 591760
13 X-Gentoo-bug-url: https://bugs.gentoo.org/show_bug.cgi?id=591760
14 ---
15 pym/portage/tests/ebuild/test_ipc_daemon.py | 3 +-
16 .../tests/util/eventloop/test_call_soon_fifo.py | 6 +-
17 pym/portage/tests/util/futures/__init__.py | 0
18 pym/portage/tests/util/futures/__test__.py | 0
19 .../tests/util/futures/test_done_callback.py | 35 +++++++++
20 pym/portage/util/_async/SchedulerInterface.py | 3 +-
21 pym/portage/util/_eventloop/EventLoop.py | 14 ++++
22 pym/portage/util/futures/futures.py | 82 ++++++++++++++++++++--
23 8 files changed, 132 insertions(+), 11 deletions(-)
24 create mode 100644 pym/portage/tests/util/futures/__init__.py
25 create mode 100644 pym/portage/tests/util/futures/__test__.py
26 create mode 100644 pym/portage/tests/util/futures/test_done_callback.py
27
28 diff --git a/pym/portage/tests/ebuild/test_ipc_daemon.py b/pym/portage/tests/ebuild/test_ipc_daemon.py
29 index 68f139a..fc79165 100644
30 --- a/pym/portage/tests/ebuild/test_ipc_daemon.py
31 +++ b/pym/portage/tests/ebuild/test_ipc_daemon.py
32 @@ -16,7 +16,6 @@ from portage.util import ensure_dirs
33 from portage.util._async.ForkProcess import ForkProcess
34 from portage.util._async.TaskScheduler import TaskScheduler
35 from portage.util._eventloop.global_event_loop import global_event_loop
36 -from portage.util.futures.futures import Future
37 from _emerge.SpawnProcess import SpawnProcess
38 from _emerge.EbuildBuildDir import EbuildBuildDir
39 from _emerge.EbuildIpcDaemon import EbuildIpcDaemon
40 @@ -150,7 +149,7 @@ class IpcDaemonTestCase(TestCase):
41 self._run_done.set_result(True)
42
43 def _run(self, event_loop, task_scheduler, timeout):
44 - self._run_done = Future()
45 + self._run_done = event_loop.create_future()
46 timeout_id = event_loop.timeout_add(timeout,
47 self._timeout_callback, task_scheduler)
48 task_scheduler.addExitListener(self._exit_callback)
49 diff --git a/pym/portage/tests/util/eventloop/test_call_soon_fifo.py b/pym/portage/tests/util/eventloop/test_call_soon_fifo.py
50 index 5ecc13f..f970c67 100644
51 --- a/pym/portage/tests/util/eventloop/test_call_soon_fifo.py
52 +++ b/pym/portage/tests/util/eventloop/test_call_soon_fifo.py
53 @@ -7,22 +7,22 @@ import random
54 from portage import os
55 from portage.tests import TestCase
56 from portage.util._eventloop.global_event_loop import global_event_loop
57 -from portage.util.futures.futures import Future
58 +
59
60 class CallSoonFifoTestCase(TestCase):
61
62 def testCallSoonFifo(self):
63
64 + event_loop = global_event_loop()
65 inputs = [random.random() for index in range(10)]
66 outputs = []
67 - finished = Future()
68 + finished = event_loop.create_future()
69
70 def add_output(value):
71 outputs.append(value)
72 if len(outputs) == len(inputs):
73 finished.set_result(True)
74
75 - event_loop = global_event_loop()
76 for value in inputs:
77 event_loop.call_soon(functools.partial(add_output, value))
78
79 diff --git a/pym/portage/tests/util/futures/__init__.py b/pym/portage/tests/util/futures/__init__.py
80 new file mode 100644
81 index 0000000..e69de29
82 diff --git a/pym/portage/tests/util/futures/__test__.py b/pym/portage/tests/util/futures/__test__.py
83 new file mode 100644
84 index 0000000..e69de29
85 diff --git a/pym/portage/tests/util/futures/test_done_callback.py b/pym/portage/tests/util/futures/test_done_callback.py
86 new file mode 100644
87 index 0000000..76b727b
88 --- /dev/null
89 +++ b/pym/portage/tests/util/futures/test_done_callback.py
90 @@ -0,0 +1,35 @@
91 +# Copyright 2017 Gentoo Foundation
92 +# Distributed under the terms of the GNU General Public License v2
93 +
94 +from portage.tests import TestCase
95 +from portage.util._eventloop.global_event_loop import global_event_loop
96 +
97 +
98 +class FutureDoneCallbackTestCase(TestCase):
99 +
100 + def testFutureDoneCallback(self):
101 +
102 + event_loop = global_event_loop()
103 +
104 + def done_callback(finished):
105 + done_callback_called.set_result(True)
106 +
107 + done_callback_called = event_loop.create_future()
108 + finished = event_loop.create_future()
109 + finished.add_done_callback(done_callback)
110 + event_loop.call_soon(finished.set_result, True)
111 + event_loop.run_until_complete(done_callback_called)
112 +
113 + def done_callback2(finished):
114 + done_callback2_called.set_result(True)
115 +
116 + done_callback_called = event_loop.create_future()
117 + done_callback2_called = event_loop.create_future()
118 + finished = event_loop.create_future()
119 + finished.add_done_callback(done_callback)
120 + finished.add_done_callback(done_callback2)
121 + finished.remove_done_callback(done_callback)
122 + event_loop.call_soon(finished.set_result, True)
123 + event_loop.run_until_complete(done_callback2_called)
124 +
125 + self.assertFalse(done_callback_called.done())
126 diff --git a/pym/portage/util/_async/SchedulerInterface.py b/pym/portage/util/_async/SchedulerInterface.py
127 index 6028fd9..21420ae 100644
128 --- a/pym/portage/util/_async/SchedulerInterface.py
129 +++ b/pym/portage/util/_async/SchedulerInterface.py
130 @@ -13,7 +13,8 @@ class SchedulerInterface(SlotObject):
131
132 _event_loop_attrs = ("IO_ERR", "IO_HUP", "IO_IN",
133 "IO_NVAL", "IO_OUT", "IO_PRI",
134 - "call_soon", "call_soon_threadsafe", "child_watch_add",
135 + "call_soon", "call_soon_threadsafe",
136 + "child_watch_add", "create_future",
137 "idle_add", "io_add_watch", "iteration", "run_until_complete",
138 "source_remove", "timeout_add")
139
140 diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
141 index 308157b..712838e 100644
142 --- a/pym/portage/util/_eventloop/EventLoop.py
143 +++ b/pym/portage/util/_eventloop/EventLoop.py
144 @@ -22,6 +22,11 @@ try:
145 except ImportError:
146 import dummy_threading as threading
147
148 +import portage
149 +portage.proxy.lazyimport.lazyimport(globals(),
150 + 'portage.util.futures.futures:_EventLoopFuture',
151 +)
152 +
153 from portage import OrderedDict
154 from portage.util import writemsg_level
155 from ..SlotObject import SlotObject
156 @@ -157,6 +162,15 @@ class EventLoop(object):
157 self._sigchld_src_id = None
158 self._pid = os.getpid()
159
160 + def create_future(self):
161 + """
162 + Create a Future object attached to the loop. This returns
163 + an instance of _EventLoopFuture, because EventLoop is currently
164 + missing some of the asyncio.AbstractEventLoop methods that
165 + asyncio.Future requires.
166 + """
167 + return _EventLoopFuture(loop=self)
168 +
169 def _new_source_id(self):
170 """
171 Generate a new source id. This method is thread-safe.
172 diff --git a/pym/portage/util/futures/futures.py b/pym/portage/util/futures/futures.py
173 index c648f10..dd913a1 100644
174 --- a/pym/portage/util/futures/futures.py
175 +++ b/pym/portage/util/futures/futures.py
176 @@ -23,10 +23,6 @@ except ImportError:
177
178 from portage.exception import PortageException
179
180 - _PENDING = 'PENDING'
181 - _CANCELLED = 'CANCELLED'
182 - _FINISHED = 'FINISHED'
183 -
184 class Error(PortageException):
185 pass
186
187 @@ -37,12 +33,40 @@ except ImportError:
188 class InvalidStateError(Error):
189 pass
190
191 - class Future(object):
192 + Future = None
193 +
194 +from portage.util._eventloop.global_event_loop import global_event_loop
195 +
196 +_PENDING = 'PENDING'
197 +_CANCELLED = 'CANCELLED'
198 +_FINISHED = 'FINISHED'
199 +
200 +class _EventLoopFuture(object):
201 + """
202 + This class provides (a subset of) the asyncio.Future interface, for
203 + use with the EventLoop class, because EventLoop is currently
204 + missing some of the asyncio.AbstractEventLoop methods that
205 + asyncio.Future requires.
206 + """
207
208 # Class variables serving as defaults for instance variables.
209 _state = _PENDING
210 _result = None
211 _exception = None
212 + _loop = None
213 +
214 + def __init__(self, loop=None):
215 + """Initialize the future.
216 +
217 + The optional loop argument allows explicitly setting the event
218 + loop object used by the future. If it's not provided, the future uses
219 + the default event loop.
220 + """
221 + if loop is None:
222 + self._loop = global_event_loop()
223 + else:
224 + self._loop = loop
225 + self._callbacks = []
226
227 def cancel(self):
228 """Cancel the future and schedule callbacks.
229 @@ -54,8 +78,27 @@ except ImportError:
230 if self._state != _PENDING:
231 return False
232 self._state = _CANCELLED
233 + self._schedule_callbacks()
234 return True
235
236 + def _schedule_callbacks(self):
237 + """Internal: Ask the event loop to call all callbacks.
238 +
239 + The callbacks are scheduled to be called as soon as possible. Also
240 + clears the callback list.
241 + """
242 + callbacks = self._callbacks[:]
243 + if not callbacks:
244 + return
245 +
246 + self._callbacks[:] = []
247 + for callback in callbacks:
248 + self._loop.call_soon(callback, self)
249 +
250 + def cancelled(self):
251 + """Return True if the future was cancelled."""
252 + return self._state == _CANCELLED
253 +
254 def done(self):
255 """Return True if the future is done.
256
257 @@ -93,6 +136,29 @@ except ImportError:
258 raise InvalidStateError('Exception is not set.')
259 return self._exception
260
261 + def add_done_callback(self, fn):
262 + """Add a callback to be run when the future becomes done.
263 +
264 + The callback is called with a single argument - the future object. If
265 + the future is already done when this is called, the callback is
266 + scheduled with call_soon.
267 + """
268 + if self._state != _PENDING:
269 + self._loop.call_soon(fn, self)
270 + else:
271 + self._callbacks.append(fn)
272 +
273 + def remove_done_callback(self, fn):
274 + """Remove all instances of a callback from the "call when done" list.
275 +
276 + Returns the number of callbacks removed.
277 + """
278 + filtered_callbacks = [f for f in self._callbacks if f != fn]
279 + removed_count = len(self._callbacks) - len(filtered_callbacks)
280 + if removed_count:
281 + self._callbacks[:] = filtered_callbacks
282 + return removed_count
283 +
284 def set_result(self, result):
285 """Mark the future done and set its result.
286
287 @@ -103,6 +169,7 @@ except ImportError:
288 raise InvalidStateError('{}: {!r}'.format(self._state, self))
289 self._result = result
290 self._state = _FINISHED
291 + self._schedule_callbacks()
292
293 def set_exception(self, exception):
294 """Mark the future done and set an exception.
295 @@ -116,3 +183,8 @@ except ImportError:
296 exception = exception()
297 self._exception = exception
298 self._state = _FINISHED
299 + self._schedule_callbacks()
300 +
301 +
302 +if Future is None:
303 + Future = _EventLoopFuture
304 --
305 2.10.2

Replies