Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] Implement _PortageEventLoop.subprocess_exec (bug 649588)
Date: Thu, 12 Apr 2018 09:02:44
Message-Id: 20180412090224.3500-1-zmedico@gentoo.org
1 In python versions that support asyncio, this allows API consumers
2 to use the asyncio.create_subprocess_exec() function with portage's
3 internal event loop. Currently, subprocess.PIPE is not implemented
4 because that would require an implementation of asyncio's private
5 asyncio.unix_events._UnixReadPipeTransport class. However, it's
6 possible to use pipes created with os.pipe() for stdin, stdout,
7 and stderr, as demonstrated in the included unit tests.
8
9 Bug: https://bugs.gentoo.org/649588
10 ---
11 .../util/futures/asyncio/test_subprocess_exec.py | 136 +++++++++++++++++++++
12 pym/portage/util/futures/unix_events.py | 98 +++++++++++++++
13 2 files changed, 234 insertions(+)
14 create mode 100644 pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
15
16 diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
17 new file mode 100644
18 index 000000000..f77d362f3
19 --- /dev/null
20 +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
21 @@ -0,0 +1,136 @@
22 +# Copyright 2018 Gentoo Foundation
23 +# Distributed under the terms of the GNU General Public License v2
24 +
25 +import errno
26 +import fcntl
27 +import os
28 +
29 +from portage.process import find_binary
30 +from portage.tests import TestCase
31 +from portage.util.futures import asyncio
32 +from portage.util.futures.executor.fork import ForkExecutor
33 +from portage.util.futures.unix_events import DefaultEventLoopPolicy
34 +
35 +
36 +class SubprocessExecTestCase(TestCase):
37 + def _run_test(self, test, cleanup):
38 + initial_policy = asyncio.get_event_loop_policy()
39 + if not isinstance(initial_policy, DefaultEventLoopPolicy):
40 + asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
41 +
42 + try:
43 + test(asyncio.get_event_loop())
44 + finally:
45 + cleanup(asyncio.get_event_loop())
46 + asyncio.set_event_loop_policy(initial_policy)
47 +
48 + def testEcho(self):
49 + if not hasattr(asyncio, 'create_subprocess_exec'):
50 + self.skipTest('create_subprocess_exec not implemented for python2')
51 +
52 + args_tuple = (b'hello', b'world')
53 + echo_binary = find_binary("echo")
54 + self.assertNotEqual(echo_binary, None)
55 + echo_binary = os.fsencode(echo_binary)
56 +
57 + # Use os.pipe(), since this loop does not implement the
58 + # ReadTransport necessary for subprocess.PIPE support.
59 + stdout_pr, stdout_pw = os.pipe()
60 + stdout_pr = open(stdout_pr, 'rb', 0)
61 + stdout_pw = open(stdout_pw, 'wb', 0)
62 + files = [stdout_pr, stdout_pw]
63 +
64 + def test(loop):
65 + with open(os.devnull, 'rb', 0) as devnull:
66 + proc = loop.run_until_complete(
67 + asyncio.create_subprocess_exec(
68 + echo_binary, *args_tuple,
69 + stdin=devnull, stdout=stdout_pw, stderr=stdout_pw))
70 +
71 + # This belongs exclusively to the subprocess now.
72 + stdout_pw.close()
73 +
74 + read_buffer = []
75 + fcntl.fcntl(stdout_pr.fileno(), fcntl.F_SETFL,
76 + fcntl.fcntl(stdout_pr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
77 + loop.add_reader(stdout_pr.fileno(),
78 + lambda: read_buffer.extend(self._read_stdio(stdout_pr)))
79 +
80 + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
81 + self.assertEqual(tuple(b''.join(read_buffer).split()), args_tuple)
82 +
83 + def cleanup(loop):
84 + loop.remove_reader(stdout_pr.fileno())
85 + for f in files:
86 + f.close()
87 +
88 + self._run_test(test, cleanup)
89 +
90 + def testCat(self):
91 + if not hasattr(asyncio, 'create_subprocess_exec'):
92 + self.skipTest('create_subprocess_exec not implemented for python2')
93 +
94 + stdin_data = b'hello world'
95 + cat_binary = find_binary("cat")
96 + self.assertNotEqual(cat_binary, None)
97 + cat_binary = os.fsencode(cat_binary)
98 +
99 + # Use os.pipe(), since this loop does not implement the
100 + # ReadTransport necessary for subprocess.PIPE support.
101 + stdout_pr, stdout_pw = os.pipe()
102 + stdout_pr = open(stdout_pr, 'rb', 0)
103 + stdout_pw = open(stdout_pw, 'wb', 0)
104 +
105 + stdin_pr, stdin_pw = os.pipe()
106 + stdin_pr = open(stdin_pr, 'rb', 0)
107 + stdin_pw = open(stdin_pw, 'wb', 0)
108 +
109 + files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw]
110 +
111 + def test(loop):
112 + proc = loop.run_until_complete(
113 + asyncio.create_subprocess_exec(
114 + cat_binary,
115 + stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw))
116 +
117 + # These belong exclusively to the subprocess now.
118 + stdout_pw.close()
119 + stdin_pr.close()
120 +
121 + read_buffer = []
122 + fcntl.fcntl(stdout_pr.fileno(), fcntl.F_SETFL,
123 + fcntl.fcntl(stdout_pr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
124 + loop.add_reader(stdout_pr.fileno(),
125 + lambda: read_buffer.extend(self._read_stdio(stdout_pr)))
126 +
127 + with ForkExecutor(loop=loop) as executor:
128 + writer = asyncio.ensure_future(loop.run_in_executor(
129 + executor, stdin_pw.write, stdin_data), loop=loop)
130 +
131 + # This belongs exlusively to the writer now.
132 + stdin_pw.close()
133 + loop.run_until_complete(writer)
134 +
135 + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
136 + self.assertEqual(b''.join(read_buffer), stdin_data)
137 +
138 + def cleanup(loop):
139 + loop.remove_reader(stdout_pr.fileno())
140 + for f in files:
141 + f.close()
142 +
143 + self._run_test(test, cleanup)
144 +
145 + @staticmethod
146 + def _read_stdio(stdio_pr):
147 + while True:
148 + try:
149 + buf = stdio_pr.read()
150 + except OSError as e:
151 + if e.errno == errno.EAGAIN:
152 + break
153 + else:
154 + if buf:
155 + yield buf
156 + else:
157 + break
158 diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
159 index 6fcef45fa..d5c0480bf 100644
160 --- a/pym/portage/util/futures/unix_events.py
161 +++ b/pym/portage/util/futures/unix_events.py
162 @@ -7,11 +7,15 @@ __all__ = (
163 )
164
165 try:
166 + from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
167 from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
168 except ImportError:
169 _AbstractChildWatcher = object
170 + _BaseSubprocessTransport = object
171
172 +import functools
173 import os
174 +import subprocess
175
176 from portage.util._eventloop.global_event_loop import (
177 global_event_loop as _global_event_loop,
178 @@ -75,6 +79,100 @@ class _PortageEventLoop(events.AbstractEventLoop):
179 """
180 return asyncio.Task(coro, loop=self)
181
182 + def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
183 + """
184 + Run subprocesses asynchronously using the subprocess module.
185 +
186 + @type protocol_factory: callable
187 + @param protocol_factory: must instantiate a subclass of the
188 + asyncio.SubprocessProtocol class
189 + @type program: str or bytes
190 + @param program: the program to execute
191 + @type args: str or bytes
192 + @param args: program's arguments
193 + @type kwargs: varies
194 + @param kwargs: subprocess.Popen parameters
195 + @rtype: asyncio.Future
196 + @return: Returns a pair of (transport, protocol), where transport
197 + is an instance of BaseSubprocessTransport
198 + """
199 +
200 + # python2.7 does not allow arguments with defaults after *args
201 + stdin = kwargs.pop('stdin', subprocess.PIPE)
202 + stdout = kwargs.pop('stdout', subprocess.PIPE)
203 + stderr = kwargs.pop('stderr', subprocess.PIPE)
204 +
205 + if subprocess.PIPE in (stdin, stdout, stderr):
206 + # Requires connect_read/write_pipe implementation, for example
207 + # see asyncio.unix_events._UnixReadPipeTransport.
208 + raise NotImplementedError()
209 +
210 + universal_newlines = kwargs.pop('universal_newlines', False)
211 + shell = kwargs.pop('shell', False)
212 + bufsize = kwargs.pop('bufsize', 0)
213 +
214 + if universal_newlines:
215 + raise ValueError("universal_newlines must be False")
216 + if shell:
217 + raise ValueError("shell must be False")
218 + if bufsize != 0:
219 + raise ValueError("bufsize must be 0")
220 + popen_args = (program,) + args
221 + for arg in popen_args:
222 + if not isinstance(arg, (str, bytes)):
223 + raise TypeError("program arguments must be "
224 + "a bytes or text string, not %s"
225 + % type(arg).__name__)
226 + result = self.create_future()
227 + self._make_subprocess_transport(
228 + result, protocol_factory(), popen_args, False, stdin, stdout, stderr,
229 + bufsize, **kwargs)
230 + return result
231 +
232 + def _make_subprocess_transport(self, result, protocol, args, shell,
233 + stdin, stdout, stderr, bufsize, extra=None, **kwargs):
234 + waiter = self.create_future()
235 + transp = _UnixSubprocessTransport(self,
236 + protocol, args, shell, stdin, stdout, stderr, bufsize,
237 + waiter=waiter, extra=extra,
238 + **kwargs)
239 +
240 + self._loop._asyncio_child_watcher.add_child_handler(
241 + transp.get_pid(), self._child_watcher_callback, transp)
242 +
243 + waiter.add_done_callback(functools.partial(
244 + self._subprocess_transport_callback, transp, protocol, result))
245 +
246 + def _subprocess_transport_callback(self, transp, protocol, result, waiter):
247 + if waiter.exception() is None:
248 + result.set_result((transp, protocol))
249 + else:
250 + transp.close()
251 + wait_transp = asyncio.ensure_future(transp._wait(), loop=self)
252 + wait_transp.add_done_callback(
253 + functools.partial(self._subprocess_transport_failure,
254 + result, waiter.exception()))
255 +
256 + def _child_watcher_callback(self, pid, returncode, transp):
257 + self.call_soon_threadsafe(transp._process_exited, returncode)
258 +
259 + def _subprocess_transport_failure(self, result, exception, wait_transp):
260 + result.set_exception(wait_transp.exception() or exception)
261 +
262 +
263 +class _UnixSubprocessTransport(_BaseSubprocessTransport):
264 + """
265 + This is identical to the standard library's private
266 + asyncio.unix_events._UnixSubprocessTransport class, except that
267 + subprocess.PIPE is not implemented for stdin, since that would
268 + require connect_write_pipe support in the event loop. For example,
269 + see the asyncio.unix_events._UnixWritePipeTransport class.
270 + """
271 + def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
272 + self._proc = subprocess.Popen(
273 + args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
274 + universal_newlines=False, bufsize=bufsize, **kwargs)
275 +
276
277 class AbstractChildWatcher(_AbstractChildWatcher):
278 def add_child_handler(self, pid, callback, *args):
279 --
280 2.13.6