Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] Add minimal asyncio.AbstractEventLoop implementation (bug 649588)
Date: Mon, 09 Apr 2018 03:39:49
Message-Id: 20180409033419.24908-1-zmedico@gentoo.org
1 This provides minimal interoperability with existing asyncio code,
2 by adding a portage.util.futures.unix_events.DefaultEventLoopPolicy
3 class that makes asyncio use portage's internal event loop when an
4 instance is passed into asyncio.set_event_loop_policy(). The
5 get_event_loop() method of this policy returns an instance of a
6 _PortageEventLoop class that wraps portage's internal event loop and
7 implements asyncio's AbstractEventLoop interface.
8
9 The portage.util.futures.asyncio module refers to the real
10 asyncio module when available, and otherwise falls back to a
11 minimal implementation that works with python2.7. The included
12 EventLoopInForkTestCase demonstrates usage, and works with all
13 supported versions of python, include python2.7.
14
15 Bug: https://bugs.gentoo.org/649588
16 ---
17 pym/portage/tests/util/futures/asyncio/__init__.py | 0
18 pym/portage/tests/util/futures/asyncio/__test__.py | 0
19 .../futures/asyncio/test_event_loop_in_fork.py | 62 +++++++
20 pym/portage/util/_eventloop/EventLoop.py | 11 +-
21 pym/portage/util/futures/__init__.py | 9 +
22 pym/portage/util/futures/_asyncio.py | 116 +++++++++++++
23 pym/portage/util/futures/events.py | 191 +++++++++++++++++++++
24 pym/portage/util/futures/futures.py | 7 +-
25 pym/portage/util/futures/unix_events.py | 91 ++++++++++
26 9 files changed, 479 insertions(+), 8 deletions(-)
27 create mode 100644 pym/portage/tests/util/futures/asyncio/__init__.py
28 create mode 100644 pym/portage/tests/util/futures/asyncio/__test__.py
29 create mode 100644 pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
30 create mode 100644 pym/portage/util/futures/_asyncio.py
31 create mode 100644 pym/portage/util/futures/events.py
32 create mode 100644 pym/portage/util/futures/unix_events.py
33
34 diff --git a/pym/portage/tests/util/futures/asyncio/__init__.py b/pym/portage/tests/util/futures/asyncio/__init__.py
35 new file mode 100644
36 index 000000000..e69de29bb
37 diff --git a/pym/portage/tests/util/futures/asyncio/__test__.py b/pym/portage/tests/util/futures/asyncio/__test__.py
38 new file mode 100644
39 index 000000000..e69de29bb
40 diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
41 new file mode 100644
42 index 000000000..1ef46229b
43 --- /dev/null
44 +++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
45 @@ -0,0 +1,62 @@
46 +# Copyright 2018 Gentoo Foundation
47 +# Distributed under the terms of the GNU General Public License v2
48 +
49 +import multiprocessing
50 +from portage.tests import TestCase
51 +from portage.util.futures import asyncio
52 +from portage.util.futures.unix_events import DefaultEventLoopPolicy
53 +
54 +
55 +def fork_main(parent_conn, child_conn):
56 + parent_conn.close()
57 + loop = asyncio.get_event_loop()
58 + # This fails with python's default event loop policy,
59 + # see https://bugs.python.org/issue22087.
60 + loop.run_until_complete(asyncio.sleep(0.1))
61 +
62 +
63 +def async_main(loop=None):
64 + loop = loop or asyncio.get_event_loop()
65 + future = loop.create_future()
66 +
67 + # Since python2.7 does not support Process.sentinel, use Pipe to
68 + # monitor for process exit.
69 + parent_conn, child_conn = multiprocessing.Pipe()
70 +
71 + def eof_callback():
72 + loop.remove_reader(parent_conn.fileno())
73 + parent_conn.close()
74 + future.set_result(None)
75 +
76 + loop.add_reader(parent_conn.fileno(), eof_callback)
77 + proc = multiprocessing.Process(target=fork_main, args=(parent_conn, child_conn))
78 + proc.start()
79 + child_conn.close()
80 +
81 + return future
82 +
83 +
84 +class EventLoopInForkTestCase(TestCase):
85 + """
86 + The default asyncio event loop policy does not support loops
87 + running in forks, see https://bugs.python.org/issue22087.
88 + Portage's DefaultEventLoopPolicy supports forks.
89 + """
90 +
91 + def testEventLoopInForkTestCase(self):
92 + initial_policy = asyncio.get_event_loop_policy()
93 + if not isinstance(initial_policy, DefaultEventLoopPolicy):
94 + asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
95 + try:
96 + loop = asyncio.get_event_loop()
97 + exit_future = loop.create_future()
98 + def trigger_exit(*args):
99 + exit_future.set_result(True)
100 +
101 + def start_async_main():
102 + async_main(loop=loop).add_done_callback(trigger_exit)
103 +
104 + loop.call_soon(start_async_main)
105 + loop.run_until_complete(exit_future)
106 + finally:
107 + asyncio.set_event_loop_policy(initial_policy)
108 diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
109 index 72eb407fc..d53a76ba1 100644
110 --- a/pym/portage/util/_eventloop/EventLoop.py
111 +++ b/pym/portage/util/_eventloop/EventLoop.py
112 @@ -23,8 +23,9 @@ except ImportError:
113
114 import portage
115 portage.proxy.lazyimport.lazyimport(globals(),
116 - 'portage.util.futures.futures:_EventLoopFuture',
117 + 'portage.util.futures.futures:Future',
118 'portage.util.futures.executor.fork:ForkExecutor',
119 + 'portage.util.futures.unix_events:_PortageEventLoop',
120 )
121
122 from portage import OrderedDict
123 @@ -188,15 +189,13 @@ class EventLoop(object):
124 self._sigchld_write = None
125 self._sigchld_src_id = None
126 self._pid = os.getpid()
127 + self._asyncio_wrapper = _PortageEventLoop(loop=self)
128
129 def create_future(self):
130 """
131 - Create a Future object attached to the loop. This returns
132 - an instance of _EventLoopFuture, because EventLoop is currently
133 - missing some of the asyncio.AbstractEventLoop methods that
134 - asyncio.Future requires.
135 + Create a Future object attached to the loop.
136 """
137 - return _EventLoopFuture(loop=self)
138 + return Future(loop=self._asyncio_wrapper)
139
140 def _new_source_id(self):
141 """
142 diff --git a/pym/portage/util/futures/__init__.py b/pym/portage/util/futures/__init__.py
143 index e69de29bb..789080c85 100644
144 --- a/pym/portage/util/futures/__init__.py
145 +++ b/pym/portage/util/futures/__init__.py
146 @@ -0,0 +1,9 @@
147 +
148 +__all__ = (
149 + 'asyncio',
150 +)
151 +
152 +try:
153 + import asyncio
154 +except ImportError:
155 + from portage.util.futures import _asyncio as asyncio
156 diff --git a/pym/portage/util/futures/_asyncio.py b/pym/portage/util/futures/_asyncio.py
157 new file mode 100644
158 index 000000000..6874e133f
159 --- /dev/null
160 +++ b/pym/portage/util/futures/_asyncio.py
161 @@ -0,0 +1,116 @@
162 +# Copyright 2018 Gentoo Foundation
163 +# Distributed under the terms of the GNU General Public License v2
164 +
165 +__all__ = (
166 + 'ensure_future',
167 + 'get_event_loop',
168 + 'get_event_loop_policy',
169 + 'set_event_loop_policy',
170 + 'sleep',
171 + 'Task',
172 +)
173 +
174 +import functools
175 +
176 +try:
177 + import threading
178 +except ImportError:
179 + import dummy_threading as threading
180 +
181 +import portage
182 +portage.proxy.lazyimport.lazyimport(globals(),
183 + 'portage.util.futures.unix_events:DefaultEventLoopPolicy',
184 +)
185 +from portage.util.futures.futures import Future
186 +
187 +_lock = threading.Lock()
188 +_policy = None
189 +
190 +
191 +def get_event_loop_policy():
192 + """
193 + Get the current event loop policy.
194 +
195 + @rtype: asyncio.AbstractEventLoopPolicy (or compatible)
196 + @return: the current event loop policy
197 + """
198 + global _lock, _policy
199 + with _lock:
200 + if _policy is None:
201 + _policy = DefaultEventLoopPolicy()
202 + return _policy
203 +
204 +
205 +def set_event_loop_policy(policy):
206 + """
207 + Set the current event loop policy. If policy is None, the default
208 + policy is restored.
209 +
210 + @type policy: asyncio.AbstractEventLoopPolicy or None
211 + @param policy: new event loop policy
212 + """
213 + global _lock, _policy
214 + with _lock:
215 + _policy = policy or DefaultEventLoopPolicy()
216 +
217 +
218 +def get_event_loop():
219 + """
220 + Equivalent to calling get_event_loop_policy().get_event_loop().
221 +
222 + @rtype: asyncio.AbstractEventLoop (or compatible)
223 + @return: the event loop for the current context
224 + """
225 + return get_event_loop_policy().get_event_loop()
226 +
227 +
228 +class Task(Future):
229 + """
230 + Schedule the execution of a coroutine: wrap it in a future. A task
231 + is a subclass of Future.
232 + """
233 + def __init__(self, coro, loop=None):
234 + raise NotImplementedError
235 +
236 +
237 +def ensure_future(coro_or_future, loop=None):
238 + """
239 + Wrap a coroutine or an awaitable in a future.
240 +
241 + If the argument is a Future, it is returned directly.
242 +
243 + @type coro_or_future: coroutine or Future
244 + @param coro_or_future: coroutine or future to wrap
245 + @type loop: asyncio.AbstractEventLoop (or compatible)
246 + @param loop: event loop
247 + @rtype: asyncio.Future (or compatible)
248 + @return: an instance of Future
249 + """
250 + if isinstance(coro_or_future, Future):
251 + return coro_or_future
252 + raise NotImplementedError
253 +
254 +
255 +def sleep(delay, result=None, loop=None):
256 + """
257 + Create a future that completes after a given time (in seconds). If
258 + result is provided, it is produced to the caller when the future
259 + completes.
260 +
261 + @type delay: int or float
262 + @param delay: delay seconds
263 + @type result: object
264 + @param result: result of the future
265 + @type loop: asyncio.AbstractEventLoop (or compatible)
266 + @param loop: event loop
267 + @rtype: asyncio.Future (or compatible)
268 + @return: an instance of Future
269 + """
270 + loop = loop or get_event_loop()
271 + future = loop.create_future()
272 + handle = loop.call_later(delay, functools.partial(future.set_result, result))
273 + def cancel_callback(future):
274 + if future.cancelled():
275 + handle.cancel()
276 + future.add_done_callback(cancel_callback)
277 + return future
278 diff --git a/pym/portage/util/futures/events.py b/pym/portage/util/futures/events.py
279 new file mode 100644
280 index 000000000..b772bc242
281 --- /dev/null
282 +++ b/pym/portage/util/futures/events.py
283 @@ -0,0 +1,191 @@
284 +# Copyright 2018 Gentoo Foundation
285 +# Distributed under the terms of the GNU General Public License v2
286 +
287 +__all__ = (
288 + 'AbstractEventLoopPolicy',
289 + 'AbstractEventLoop',
290 +)
291 +
292 +import socket
293 +import subprocess
294 +
295 +try:
296 + from asyncio.events import (
297 + AbstractEventLoop as _AbstractEventLoop,
298 + AbstractEventLoopPolicy as _AbstractEventLoopPolicy,
299 + )
300 +except ImportError:
301 + _AbstractEventLoop = object
302 + _AbstractEventLoopPolicy = object
303 +
304 +
305 +class AbstractEventLoopPolicy(_AbstractEventLoopPolicy):
306 + """Abstract policy for accessing the event loop."""
307 +
308 + def get_event_loop(self):
309 + raise NotImplementedError
310 +
311 + def set_event_loop(self, loop):
312 + raise NotImplementedError
313 +
314 + def new_event_loop(self):
315 + raise NotImplementedError
316 +
317 + def get_child_watcher(self):
318 + raise NotImplementedError
319 +
320 + def set_child_watcher(self, watcher):
321 + raise NotImplementedError
322 +
323 +
324 +class AbstractEventLoop(_AbstractEventLoop):
325 + """Abstract event loop."""
326 +
327 + def run_forever(self):
328 + raise NotImplementedError
329 +
330 + def run_until_complete(self, future):
331 + raise NotImplementedError
332 +
333 + def stop(self):
334 + raise NotImplementedError
335 +
336 + def is_running(self):
337 + raise NotImplementedError
338 +
339 + def is_closed(self):
340 + raise NotImplementedError
341 +
342 + def close(self):
343 + raise NotImplementedError
344 +
345 + def shutdown_asyncgens(self):
346 + raise NotImplementedError
347 +
348 + def _timer_handle_cancelled(self, handle):
349 + raise NotImplementedError
350 +
351 + def call_soon(self, callback, *args):
352 + return self.call_later(0, callback, *args)
353 +
354 + def call_later(self, delay, callback, *args):
355 + raise NotImplementedError
356 +
357 + def call_at(self, when, callback, *args):
358 + raise NotImplementedError
359 +
360 + def time(self):
361 + raise NotImplementedError
362 +
363 + def create_future(self):
364 + raise NotImplementedError
365 +
366 + def create_task(self, coro):
367 + raise NotImplementedError
368 +
369 + def call_soon_threadsafe(self, callback, *args):
370 + raise NotImplementedError
371 +
372 + def run_in_executor(self, executor, func, *args):
373 + raise NotImplementedError
374 +
375 + def set_default_executor(self, executor):
376 + raise NotImplementedError
377 +
378 + def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
379 + raise NotImplementedError
380 +
381 + def getnameinfo(self, sockaddr, flags=0):
382 + raise NotImplementedError
383 +
384 + def create_connection(self, protocol_factory, host=None, port=None,
385 + ssl=None, family=0, proto=0, flags=0, sock=None,
386 + local_addr=None, server_hostname=None):
387 + raise NotImplementedError
388 +
389 + def create_server(self, protocol_factory, host=None, port=None,
390 + family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
391 + sock=None, backlog=100, ssl=None, reuse_address=None,
392 + reuse_port=None):
393 + raise NotImplementedError
394 +
395 + def create_unix_connection(self, protocol_factory, path,
396 + ssl=None, sock=None,
397 + server_hostname=None):
398 + raise NotImplementedError
399 +
400 + def create_unix_server(self, protocol_factory, path,
401 + sock=None, backlog=100, ssl=None):
402 + raise NotImplementedError
403 +
404 + def create_datagram_endpoint(self, protocol_factory,
405 + local_addr=None, remote_addr=None,
406 + family=0, proto=0, flags=0,
407 + reuse_address=None, reuse_port=None,
408 + allow_broadcast=None, sock=None):
409 + raise NotImplementedError
410 +
411 + def connect_read_pipe(self, protocol_factory, pipe):
412 + raise NotImplementedError
413 +
414 + def connect_write_pipe(self, protocol_factory, pipe):
415 + raise NotImplementedError
416 +
417 + def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
418 + stdout=subprocess.PIPE, stderr=subprocess.PIPE,
419 + **kwargs):
420 + raise NotImplementedError
421 +
422 + def subprocess_exec(self, protocol_factory, *args, **kwargs):
423 + for k in ('stdin', 'stdout', 'stderr'):
424 + kwargs.setdefault(k, subprocess.PIPE)
425 + raise NotImplementedError
426 +
427 + def add_writer(self, fd, callback, *args):
428 + raise NotImplementedError
429 +
430 + def remove_writer(self, fd):
431 + raise NotImplementedError
432 +
433 + def sock_recv(self, sock, nbytes):
434 + raise NotImplementedError
435 +
436 + def sock_sendall(self, sock, data):
437 + raise NotImplementedError
438 +
439 + def sock_connect(self, sock, address):
440 + raise NotImplementedError
441 +
442 + def sock_accept(self, sock):
443 + raise NotImplementedError
444 +
445 + def add_signal_handler(self, sig, callback, *args):
446 + raise NotImplementedError
447 +
448 + def remove_signal_handler(self, sig):
449 + raise NotImplementedError
450 +
451 + def set_task_factory(self, factory):
452 + raise NotImplementedError
453 +
454 + def get_task_factory(self):
455 + raise NotImplementedError
456 +
457 + def get_exception_handler(self):
458 + raise NotImplementedError
459 +
460 + def set_exception_handler(self, handler):
461 + raise NotImplementedError
462 +
463 + def default_exception_handler(self, context):
464 + raise NotImplementedError
465 +
466 + def call_exception_handler(self, context):
467 + raise NotImplementedError
468 +
469 + def get_debug(self):
470 + raise NotImplementedError
471 +
472 + def set_debug(self, enabled):
473 + raise NotImplementedError
474 +
475 diff --git a/pym/portage/util/futures/futures.py b/pym/portage/util/futures/futures.py
476 index cd56a27eb..dc4e0a7d7 100644
477 --- a/pym/portage/util/futures/futures.py
478 +++ b/pym/portage/util/futures/futures.py
479 @@ -41,7 +41,10 @@ except ImportError:
480
481 Future = None
482
483 -from portage.util._eventloop.global_event_loop import global_event_loop
484 +import portage
485 +portage.proxy.lazyimport.lazyimport(globals(),
486 + 'portage.util._eventloop.global_event_loop:global_event_loop@_global_event_loop',
487 +)
488
489 _PENDING = 'PENDING'
490 _CANCELLED = 'CANCELLED'
491 @@ -69,7 +72,7 @@ class _EventLoopFuture(object):
492 the default event loop.
493 """
494 if loop is None:
495 - self._loop = global_event_loop()
496 + self._loop = _global_event_loop()._asyncio_wrapper
497 else:
498 self._loop = loop
499 self._callbacks = []
500 diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
501 new file mode 100644
502 index 000000000..ed4c6e519
503 --- /dev/null
504 +++ b/pym/portage/util/futures/unix_events.py
505 @@ -0,0 +1,91 @@
506 +# Copyright 2018 Gentoo Foundation
507 +# Distributed under the terms of the GNU General Public License v2
508 +
509 +__all__ = (
510 + 'DefaultEventLoopPolicy',
511 +)
512 +
513 +from portage.util._eventloop.global_event_loop import (
514 + global_event_loop as _global_event_loop,
515 +)
516 +from portage.util.futures import (
517 + asyncio,
518 + events,
519 +)
520 +from portage.util.futures.futures import Future
521 +
522 +
523 +class _PortageEventLoop(events.AbstractEventLoop):
524 + """
525 + Implementation of asyncio.AbstractEventLoop which wraps portage's
526 + internal event loop.
527 + """
528 +
529 + def __init__(self, loop):
530 + """
531 + @type loop: EventLoop
532 + @param loop: an instance of portage's internal event loop
533 + """
534 + self._loop = loop
535 + self.call_soon = loop.call_soon
536 + self.call_soon_threadsafe = loop.call_soon_threadsafe
537 + self.call_later = loop.call_later
538 + self.call_at = loop.call_at
539 + self.is_closed = loop.is_closed
540 + self.close = loop.close
541 + self.create_future = loop.create_future
542 + self.add_reader = loop.add_reader
543 + self.remove_reader = loop.remove_reader
544 + self.add_writer = loop.add_writer
545 + self.remove_writer = loop.remove_writer
546 + self.run_in_executor = loop.run_in_executor
547 + self.time = loop.time
548 + self.set_debug = loop.set_debug
549 + self.get_debug = loop.get_debug
550 +
551 + def run_until_complete(self, future):
552 + """
553 + Run the event loop until a Future is done.
554 +
555 + @type future: asyncio.Future
556 + @param future: a Future to wait for
557 + @rtype: object
558 + @return: the Future's result
559 + @raise: the Future's exception
560 + """
561 + return self._loop.run_until_complete(
562 + asyncio.ensure_future(future, loop=self))
563 +
564 + def create_task(self, coro):
565 + """
566 + Schedule a coroutine object.
567 +
568 + @type coro: coroutine
569 + @param coro: a coroutine to schedule
570 + @rtype: asyncio.Task
571 + @return: a task object
572 + """
573 + return asyncio.Task(coro, loop=self)
574 +
575 +
576 +class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy):
577 + """
578 + Implementation of asyncio.AbstractEventLoopPolicy based on portage's
579 + internal event loop. This supports running event loops in forks,
580 + which is not supported by the default asyncio event loop policy,
581 + see https://bugs.python.org/issue22087.
582 + """
583 + def get_event_loop(self):
584 + """
585 + Get the event loop for the current context.
586 +
587 + Returns an event loop object implementing the AbstractEventLoop
588 + interface.
589 +
590 + @rtype: asyncio.AbstractEventLoop (or compatible)
591 + @return: the current event loop policy
592 + """
593 + return _global_event_loop()._asyncio_wrapper
594 +
595 +
596 +DefaultEventLoopPolicy = _PortageEventLoopPolicy
597 --
598 2.13.6