1 |
In Python versions that support asyncio, this allows API consumers |
2 |
to use the asyncio.create_subprocess_exec() function with portage's |
3 |
internal event loop. Currently, subprocess.PIPE is not implemented |
4 |
because that would require an implementation of asyncio's private |
5 |
asyncio.unix_events._UnixReadPipeTransport class. However, it's |
6 |
possible to use pipes created with os.pipe() for stdin, stdout, |
7 |
and stderr, as demonstrated in the included unit tests. |
8 |
|
9 |
Bug: https://bugs.gentoo.org/649588 |
10 |
--- |
11 |
.../util/futures/asyncio/test_subprocess_exec.py | 136 +++++++++++++++++++++ |
12 |
pym/portage/util/futures/unix_events.py | 98 +++++++++++++++ |
13 |
2 files changed, 234 insertions(+) |
14 |
create mode 100644 pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
15 |
|
16 |
diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
17 |
new file mode 100644 |
18 |
index 000000000..f77d362f3 |
19 |
--- /dev/null |
20 |
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
21 |
@@ -0,0 +1,136 @@ |
22 |
+# Copyright 2018 Gentoo Foundation |
23 |
+# Distributed under the terms of the GNU General Public License v2 |
24 |
+ |
25 |
+import errno |
26 |
+import fcntl |
27 |
+import os |
28 |
+ |
29 |
+from portage.process import find_binary |
30 |
+from portage.tests import TestCase |
31 |
+from portage.util.futures import asyncio |
32 |
+from portage.util.futures.executor.fork import ForkExecutor |
33 |
+from portage.util.futures.unix_events import DefaultEventLoopPolicy |
34 |
+ |
35 |
+ |
36 |
+class SubprocessExecTestCase(TestCase): |
37 |
+ def _run_test(self, test, cleanup): |
38 |
+ initial_policy = asyncio.get_event_loop_policy() |
39 |
+ if not isinstance(initial_policy, DefaultEventLoopPolicy): |
40 |
+ asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) |
41 |
+ |
42 |
+ try: |
43 |
+ test(asyncio.get_event_loop()) |
44 |
+ finally: |
45 |
+ cleanup(asyncio.get_event_loop()) |
46 |
+ asyncio.set_event_loop_policy(initial_policy) |
47 |
+ |
48 |
+ def testEcho(self): |
49 |
+ if not hasattr(asyncio, 'create_subprocess_exec'): |
50 |
+ self.skipTest('create_subprocess_exec not implemented for python2') |
51 |
+ |
52 |
+ args_tuple = (b'hello', b'world') |
53 |
+ echo_binary = find_binary("echo") |
54 |
+ self.assertNotEqual(echo_binary, None) |
55 |
+ echo_binary = os.fsencode(echo_binary) |
56 |
+ |
57 |
+ # Use os.pipe(), since this loop does not implement the |
58 |
+ # ReadTransport necessary for subprocess.PIPE support. |
59 |
+ stdout_pr, stdout_pw = os.pipe() |
60 |
+ stdout_pr = open(stdout_pr, 'rb', 0) |
61 |
+ stdout_pw = open(stdout_pw, 'wb', 0) |
62 |
+ files = [stdout_pr, stdout_pw] |
63 |
+ |
64 |
+ def test(loop): |
65 |
+ with open(os.devnull, 'rb', 0) as devnull: |
66 |
+ proc = loop.run_until_complete( |
67 |
+ asyncio.create_subprocess_exec( |
68 |
+ echo_binary, *args_tuple, |
69 |
+ stdin=devnull, stdout=stdout_pw, stderr=stdout_pw)) |
70 |
+ |
71 |
+ # This belongs exclusively to the subprocess now. |
72 |
+ stdout_pw.close() |
73 |
+ |
74 |
+ read_buffer = [] |
75 |
+ fcntl.fcntl(stdout_pr.fileno(), fcntl.F_SETFL, |
76 |
+ fcntl.fcntl(stdout_pr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) |
77 |
+ loop.add_reader(stdout_pr.fileno(), |
78 |
+ lambda: read_buffer.extend(self._read_stdio(stdout_pr))) |
79 |
+ |
80 |
+ self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) |
81 |
+ self.assertEqual(tuple(b''.join(read_buffer).split()), args_tuple) |
82 |
+ |
83 |
+ def cleanup(loop): |
84 |
+ loop.remove_reader(stdout_pr.fileno()) |
85 |
+ for f in files: |
86 |
+ f.close() |
87 |
+ |
88 |
+ self._run_test(test, cleanup) |
89 |
+ |
90 |
+ def testCat(self): |
91 |
+ if not hasattr(asyncio, 'create_subprocess_exec'): |
92 |
+ self.skipTest('create_subprocess_exec not implemented for python2') |
93 |
+ |
94 |
+ stdin_data = b'hello world' |
95 |
+ cat_binary = find_binary("cat") |
96 |
+ self.assertNotEqual(cat_binary, None) |
97 |
+ cat_binary = os.fsencode(cat_binary) |
98 |
+ |
99 |
+ # Use os.pipe(), since this loop does not implement the |
100 |
+ # ReadTransport necessary for subprocess.PIPE support. |
101 |
+ stdout_pr, stdout_pw = os.pipe() |
102 |
+ stdout_pr = open(stdout_pr, 'rb', 0) |
103 |
+ stdout_pw = open(stdout_pw, 'wb', 0) |
104 |
+ |
105 |
+ stdin_pr, stdin_pw = os.pipe() |
106 |
+ stdin_pr = open(stdin_pr, 'rb', 0) |
107 |
+ stdin_pw = open(stdin_pw, 'wb', 0) |
108 |
+ |
109 |
+ files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw] |
110 |
+ |
111 |
+ def test(loop): |
112 |
+ proc = loop.run_until_complete( |
113 |
+ asyncio.create_subprocess_exec( |
114 |
+ cat_binary, |
115 |
+ stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw)) |
116 |
+ |
117 |
+ # These belong exclusively to the subprocess now. |
118 |
+ stdout_pw.close() |
119 |
+ stdin_pr.close() |
120 |
+ |
121 |
+ read_buffer = [] |
122 |
+ fcntl.fcntl(stdout_pr.fileno(), fcntl.F_SETFL, |
123 |
+ fcntl.fcntl(stdout_pr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) |
124 |
+ loop.add_reader(stdout_pr.fileno(), |
125 |
+ lambda: read_buffer.extend(self._read_stdio(stdout_pr))) |
126 |
+ |
127 |
+ with ForkExecutor(loop=loop) as executor: |
128 |
+ writer = asyncio.ensure_future(loop.run_in_executor( |
129 |
+ executor, stdin_pw.write, stdin_data), loop=loop) |
130 |
+ |
131 |
+ # This belongs exclusively to the writer now. |
132 |
+ stdin_pw.close() |
133 |
+ loop.run_until_complete(writer) |
134 |
+ |
135 |
+ self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) |
136 |
+ self.assertEqual(b''.join(read_buffer), stdin_data) |
137 |
+ |
138 |
+ def cleanup(loop): |
139 |
+ loop.remove_reader(stdout_pr.fileno()) |
140 |
+ for f in files: |
141 |
+ f.close() |
142 |
+ |
143 |
+ self._run_test(test, cleanup) |
144 |
+ |
145 |
+ @staticmethod |
146 |
+ def _read_stdio(stdio_pr): |
147 |
+ while True: |
148 |
+ try: |
149 |
+ buf = stdio_pr.read() |
150 |
+ except OSError as e: |
151 |
+ if e.errno == errno.EAGAIN: |
152 |
+ break |
153 |
+ else: |
154 |
+ if buf: |
155 |
+ yield buf |
156 |
+ else: |
157 |
+ break |
158 |
diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py |
159 |
index 6fcef45fa..d5c0480bf 100644 |
160 |
--- a/pym/portage/util/futures/unix_events.py |
161 |
+++ b/pym/portage/util/futures/unix_events.py |
162 |
@@ -7,11 +7,15 @@ __all__ = ( |
163 |
) |
164 |
|
165 |
try: |
166 |
+ from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport |
167 |
from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher |
168 |
except ImportError: |
169 |
_AbstractChildWatcher = object |
170 |
+ _BaseSubprocessTransport = object |
171 |
|
172 |
+import functools |
173 |
import os |
174 |
+import subprocess |
175 |
|
176 |
from portage.util._eventloop.global_event_loop import ( |
177 |
global_event_loop as _global_event_loop, |
178 |
@@ -75,6 +79,100 @@ class _PortageEventLoop(events.AbstractEventLoop): |
179 |
""" |
180 |
return asyncio.Task(coro, loop=self) |
181 |
|
182 |
+ def subprocess_exec(self, protocol_factory, program, *args, **kwargs): |
183 |
+ """ |
184 |
+ Run subprocesses asynchronously using the subprocess module. |
185 |
+ |
186 |
+ @type protocol_factory: callable |
187 |
+ @param protocol_factory: must instantiate a subclass of the |
188 |
+ asyncio.SubprocessProtocol class |
189 |
+ @type program: str or bytes |
190 |
+ @param program: the program to execute |
191 |
+ @type args: str or bytes |
192 |
+ @param args: program's arguments |
193 |
+ @type kwargs: varies |
194 |
+ @param kwargs: subprocess.Popen parameters |
195 |
+ @rtype: asyncio.Future |
196 |
+ @return: a Future resolving to a pair of (transport, protocol), where transport |
197 |
+ is an instance of BaseSubprocessTransport |
198 |
+ """ |
199 |
+ |
200 |
+ # python2.7 does not allow arguments with defaults after *args |
201 |
+ stdin = kwargs.pop('stdin', subprocess.PIPE) |
202 |
+ stdout = kwargs.pop('stdout', subprocess.PIPE) |
203 |
+ stderr = kwargs.pop('stderr', subprocess.PIPE) |
204 |
+ |
205 |
+ if subprocess.PIPE in (stdin, stdout, stderr): |
206 |
+ # Requires connect_read/write_pipe implementation, for example |
207 |
+ # see asyncio.unix_events._UnixReadPipeTransport. |
208 |
+ raise NotImplementedError() |
209 |
+ |
210 |
+ universal_newlines = kwargs.pop('universal_newlines', False) |
211 |
+ shell = kwargs.pop('shell', False) |
212 |
+ bufsize = kwargs.pop('bufsize', 0) |
213 |
+ |
214 |
+ if universal_newlines: |
215 |
+ raise ValueError("universal_newlines must be False") |
216 |
+ if shell: |
217 |
+ raise ValueError("shell must be False") |
218 |
+ if bufsize != 0: |
219 |
+ raise ValueError("bufsize must be 0") |
220 |
+ popen_args = (program,) + args |
221 |
+ for arg in popen_args: |
222 |
+ if not isinstance(arg, (str, bytes)): |
223 |
+ raise TypeError("program arguments must be " |
224 |
+ "a bytes or text string, not %s" |
225 |
+ % type(arg).__name__) |
226 |
+ result = self.create_future() |
227 |
+ self._make_subprocess_transport( |
228 |
+ result, protocol_factory(), popen_args, False, stdin, stdout, stderr, |
229 |
+ bufsize, **kwargs) |
230 |
+ return result |
231 |
+ |
232 |
+ def _make_subprocess_transport(self, result, protocol, args, shell, |
233 |
+ stdin, stdout, stderr, bufsize, extra=None, **kwargs): |
234 |
+ waiter = self.create_future() |
235 |
+ transp = _UnixSubprocessTransport(self, |
236 |
+ protocol, args, shell, stdin, stdout, stderr, bufsize, |
237 |
+ waiter=waiter, extra=extra, |
238 |
+ **kwargs) |
239 |
+ |
240 |
+ self._loop._asyncio_child_watcher.add_child_handler( |
241 |
+ transp.get_pid(), self._child_watcher_callback, transp) |
242 |
+ |
243 |
+ waiter.add_done_callback(functools.partial( |
244 |
+ self._subprocess_transport_callback, transp, protocol, result)) |
245 |
+ |
246 |
+ def _subprocess_transport_callback(self, transp, protocol, result, waiter): |
247 |
+ if waiter.exception() is None: |
248 |
+ result.set_result((transp, protocol)) |
249 |
+ else: |
250 |
+ transp.close() |
251 |
+ wait_transp = asyncio.ensure_future(transp._wait(), loop=self) |
252 |
+ wait_transp.add_done_callback( |
253 |
+ functools.partial(self._subprocess_transport_failure, |
254 |
+ result, waiter.exception())) |
255 |
+ |
256 |
+ def _child_watcher_callback(self, pid, returncode, transp): |
257 |
+ self.call_soon_threadsafe(transp._process_exited, returncode) |
258 |
+ |
259 |
+ def _subprocess_transport_failure(self, result, exception, wait_transp): |
260 |
+ result.set_exception(wait_transp.exception() or exception) |
261 |
+ |
262 |
+ |
263 |
+class _UnixSubprocessTransport(_BaseSubprocessTransport): |
264 |
+ """ |
265 |
+ This is identical to the standard library's private |
266 |
+ asyncio.unix_events._UnixSubprocessTransport class, except that |
267 |
+ subprocess.PIPE is not implemented for stdin, since that would |
268 |
+ require connect_write_pipe support in the event loop. For example, |
269 |
+ see the asyncio.unix_events._UnixWritePipeTransport class. |
270 |
+ """ |
271 |
+ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): |
272 |
+ self._proc = subprocess.Popen( |
273 |
+ args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, |
274 |
+ universal_newlines=False, bufsize=bufsize, **kwargs) |
275 |
+ |
276 |
|
277 |
class AbstractChildWatcher(_AbstractChildWatcher): |
278 |
def add_child_handler(self, pid, callback, *args): |
279 |
-- |
280 |
2.13.6 |