Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] Implement AbstractEventLoopPolicy.get_child_watcher() (bug 649588)
Date: Wed, 11 Apr 2018 07:08:07
Message-Id: 20180411070734.29264-1-zmedico@gentoo.org
1 Use a _PortageChildWatcher class to wrap portage's internal event loop
2 and implement asyncio's AbstractChildWatcher interface.
3
4 Bug: https://bugs.gentoo.org/649588
5 ---
6 .../util/futures/asyncio/test_child_watcher.py | 45 +++++++++++
7 pym/portage/util/_eventloop/EventLoop.py | 3 +-
8 pym/portage/util/futures/_asyncio.py | 13 ++++
9 pym/portage/util/futures/unix_events.py | 90 ++++++++++++++++++++++
10 4 files changed, 150 insertions(+), 1 deletion(-)
11 create mode 100644 pym/portage/tests/util/futures/asyncio/test_child_watcher.py
12
13 diff --git a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
14 new file mode 100644
15 index 000000000..dca01be56
16 --- /dev/null
17 +++ b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
18 @@ -0,0 +1,45 @@
19 +# Copyright 2018 Gentoo Foundation
20 +# Distributed under the terms of the GNU General Public License v2
21 +
22 +import os
23 +
24 +from portage.process import find_binary, spawn
25 +from portage.tests import TestCase
26 +from portage.util.futures import asyncio
27 +from portage.util.futures.unix_events import DefaultEventLoopPolicy
28 +
29 +
30 +class ChildWatcherTestCase(TestCase):
31 + def testChildWatcher(self):
32 + true_binary = find_binary("true")
33 + self.assertNotEqual(true_binary, None)
34 +
35 + initial_policy = asyncio.get_event_loop_policy()
36 + if not isinstance(initial_policy, DefaultEventLoopPolicy):
37 + asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
38 +
39 + try:
40 + try:
41 + asyncio.set_child_watcher(None)
42 + except NotImplementedError:
43 + pass
44 + else:
45 + self.assertTrue(False)
46 +
47 + args_tuple = ('hello', 'world')
48 +
49 + loop = asyncio.get_event_loop()
50 + future = loop.create_future()
51 +
52 + def callback(pid, returncode, *args):
53 + future.set_result((pid, returncode, args))
54 +
55 + with asyncio.get_child_watcher() as watcher:
56 + pids = spawn([true_binary], returnpid=True)
57 + watcher.add_child_handler(pids[0], callback, *args_tuple)
58 +
59 + self.assertEqual(
60 + loop.run_until_complete(future),
61 + (pids[0], os.EX_OK, args_tuple))
62 + finally:
63 + asyncio.set_event_loop_policy(initial_policy)
64 diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
65 index d53a76ba1..1bb49092b 100644
66 --- a/pym/portage/util/_eventloop/EventLoop.py
67 +++ b/pym/portage/util/_eventloop/EventLoop.py
68 @@ -25,7 +25,7 @@ import portage
69 portage.proxy.lazyimport.lazyimport(globals(),
70 'portage.util.futures.futures:Future',
71 'portage.util.futures.executor.fork:ForkExecutor',
72 - 'portage.util.futures.unix_events:_PortageEventLoop',
73 + 'portage.util.futures.unix_events:_PortageEventLoop,_PortageChildWatcher',
74 )
75
76 from portage import OrderedDict
77 @@ -190,6 +190,7 @@ class EventLoop(object):
78 self._sigchld_src_id = None
79 self._pid = os.getpid()
80 self._asyncio_wrapper = _PortageEventLoop(loop=self)
81 + self._asyncio_child_watcher = _PortageChildWatcher(self)
82
83 def create_future(self):
84 """
85 diff --git a/pym/portage/util/futures/_asyncio.py b/pym/portage/util/futures/_asyncio.py
86 index 02ab59999..0f84f14b7 100644
87 --- a/pym/portage/util/futures/_asyncio.py
88 +++ b/pym/portage/util/futures/_asyncio.py
89 @@ -3,7 +3,9 @@
90
91 __all__ = (
92 'ensure_future',
93 + 'get_child_watcher',
94 'get_event_loop',
95 + 'set_child_watcher',
96 'get_event_loop_policy',
97 'set_event_loop_policy',
98 'sleep',
99 @@ -62,6 +64,17 @@ def get_event_loop():
100 return get_event_loop_policy().get_event_loop()
101
102
103 +def get_child_watcher():
104 + """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
105 + return get_event_loop_policy().get_child_watcher()
106 +
107 +
108 +def set_child_watcher(watcher):
109 + """Equivalent to calling
110 + get_event_loop_policy().set_child_watcher(watcher)."""
111 + return get_event_loop_policy().set_child_watcher(watcher)
112 +
113 +
114 class Task(Future):
115 """
116 Schedule the execution of a coroutine: wrap it in a future. A task
117 diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
118 index ed4c6e519..6fcef45fa 100644
119 --- a/pym/portage/util/futures/unix_events.py
120 +++ b/pym/portage/util/futures/unix_events.py
121 @@ -2,9 +2,17 @@
122 # Distributed under the terms of the GNU General Public License v2
123
124 __all__ = (
125 + 'AbstractChildWatcher',
126 'DefaultEventLoopPolicy',
127 )
128
129 +try:
130 + from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
131 +except ImportError:
132 + _AbstractChildWatcher = object
133 +
134 +import os
135 +
136 from portage.util._eventloop.global_event_loop import (
137 global_event_loop as _global_event_loop,
138 )
139 @@ -68,6 +76,84 @@ class _PortageEventLoop(events.AbstractEventLoop):
140 return asyncio.Task(coro, loop=self)
141
142
143 +class AbstractChildWatcher(_AbstractChildWatcher):
144 + def add_child_handler(self, pid, callback, *args):
145 + raise NotImplementedError()
146 +
147 + def remove_child_handler(self, pid):
148 + raise NotImplementedError()
149 +
150 + def attach_loop(self, loop):
151 + raise NotImplementedError()
152 +
153 + def close(self):
154 + raise NotImplementedError()
155 +
156 + def __enter__(self):
157 + raise NotImplementedError()
158 +
159 + def __exit__(self, a, b, c):
160 + raise NotImplementedError()
161 +
162 +
163 +class _PortageChildWatcher(_AbstractChildWatcher):
164 + def __init__(self, loop):
165 + """
166 + @type loop: EventLoop
167 + @param loop: an instance of portage's internal event loop
168 + """
169 + self._loop = loop
170 + self._callbacks = {}
171 +
172 + def close(self):
173 + pass
174 +
175 + def __enter__(self):
176 + return self
177 +
178 + def __exit__(self, a, b, c):
179 + pass
180 +
181 + def _child_exit(self, pid, status, data):
182 + self._callbacks.pop(pid)
183 + callback, args = data
184 + callback(pid, self._compute_returncode(status), *args)
185 +
186 + def _compute_returncode(self, status):
187 + if os.WIFSIGNALED(status):
188 + return -os.WTERMSIG(status)
189 + elif os.WIFEXITED(status):
190 + return os.WEXITSTATUS(status)
191 + else:
192 + return status
193 +
194 + def add_child_handler(self, pid, callback, *args):
195 + """
196 + Register a new child handler.
197 +
198 + Arrange for callback(pid, returncode, *args) to be called when
199 + process 'pid' terminates. Specifying another callback for the same
200 + process replaces the previous handler.
201 + """
202 + source_id = self._callbacks.get(pid)
203 + if source_id is not None:
204 + self._loop.source_remove(source_id)
205 + self._callbacks[pid] = self._loop.child_watch_add(
206 + pid, self._child_exit, data=(callback, args))
207 +
208 + def remove_child_handler(self, pid):
209 + """
210 + Removes the handler for process 'pid'.
211 +
212 + The function returns True if the handler was successfully removed,
213 + False if there was nothing to remove.
214 + """
215 + source_id = self._callbacks.pop(pid, None)
216 + if source_id is not None:
217 + return self._loop.source_remove(source_id)
218 + return False
219 +
220 +
221 class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy):
222 """
223 Implementation of asyncio.AbstractEventLoopPolicy based on portage's
224 @@ -87,5 +173,9 @@ class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy):
225 """
226 return _global_event_loop()._asyncio_wrapper
227
228 + def get_child_watcher(self):
229 + """Get the watcher for child processes."""
230 + return _global_event_loop()._asyncio_child_watcher
231 +
232
233 DefaultEventLoopPolicy = _PortageEventLoopPolicy
234 --
235 2.13.6