1 |
In Python versions that support asyncio, this allows API consumers |
2 |
to use subprocess.PIPE for asyncio.create_subprocess_exec() stdout |
3 |
and stderr parameters. |
4 |
|
5 |
Bug: https://bugs.gentoo.org/649588 |
6 |
--- |
7 |
.../util/futures/asyncio/test_subprocess_exec.py | 30 ++++ |
8 |
pym/portage/util/futures/unix_events.py | 157 ++++++++++++++++++++- |
9 |
2 files changed, 184 insertions(+), 3 deletions(-) |
10 |
|
11 |
diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
12 |
index d30f48c43..94984fc93 100644 |
13 |
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
14 |
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
15 |
@@ -2,6 +2,7 @@ |
16 |
# Distributed under the terms of the GNU General Public License v2 |
17 |
|
18 |
import os |
19 |
+import subprocess |
20 |
|
21 |
from portage.process import find_binary |
22 |
from portage.tests import TestCase |
23 |
@@ -161,3 +162,32 @@ class SubprocessExecTestCase(TestCase): |
24 |
f.close() |
25 |
|
26 |
self._run_test(test) |
27 |
+ |
28 |
+ def testReadTransport(self): |
29 |
+ """ |
30 |
+ Test asyncio.create_subprocess_exec(stdout=subprocess.PIPE) which |
31 |
+ requires an AbstractEventLoop.connect_read_pipe implementation |
32 |
+ (and a ReadTransport implementation for it to return). |
33 |
+ """ |
34 |
+ if not hasattr(asyncio, 'create_subprocess_exec'): |
35 |
+ self.skipTest('create_subprocess_exec not implemented for python2') |
36 |
+ |
37 |
+ args_tuple = (b'hello', b'world') |
38 |
+ echo_binary = find_binary("echo") |
39 |
+ self.assertNotEqual(echo_binary, None) |
40 |
+ echo_binary = echo_binary.encode() |
41 |
+ |
42 |
+ def test(loop): |
43 |
+ with open(os.devnull, 'rb', 0) as devnull: |
44 |
+ proc = loop.run_until_complete( |
45 |
+ asyncio.create_subprocess_exec( |
46 |
+ echo_binary, *args_tuple, |
47 |
+ stdin=devnull, |
48 |
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) |
49 |
+ |
50 |
+ self.assertEqual( |
51 |
+ tuple(loop.run_until_complete(proc.stdout.read()).split()), |
52 |
+ args_tuple) |
53 |
+ self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) |
54 |
+ |
55 |
+ self._run_test(test) |
56 |
diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py |
57 |
index 1abc420e1..6ba0adff6 100644 |
58 |
--- a/pym/portage/util/futures/unix_events.py |
59 |
+++ b/pym/portage/util/futures/unix_events.py |
60 |
@@ -9,12 +9,18 @@ __all__ = ( |
61 |
try: |
62 |
from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport |
63 |
from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher |
64 |
+ from asyncio.transports import ReadTransport as _ReadTransport |
65 |
except ImportError: |
66 |
_AbstractChildWatcher = object |
67 |
_BaseSubprocessTransport = object |
68 |
+ _ReadTransport = object |
69 |
|
70 |
+import errno |
71 |
+import fcntl |
72 |
import functools |
73 |
+import logging |
74 |
import os |
75 |
+import stat |
76 |
import subprocess |
77 |
|
78 |
from portage.util._eventloop.global_event_loop import ( |
79 |
@@ -81,6 +87,35 @@ class _PortageEventLoop(events.AbstractEventLoop): |
80 |
""" |
81 |
return asyncio.Task(coro, loop=self) |
82 |
|
83 |
+ def connect_read_pipe(self, protocol_factory, pipe): |
84 |
+ """ |
85 |
+ Register read pipe in event loop. Set the pipe to non-blocking mode. |
86 |
+ |
87 |
+ @type protocol_factory: callable |
88 |
+ @param protocol_factory: must instantiate object with Protocol interface |
89 |
+ @type pipe: file |
90 |
+ @param pipe: a pipe to read from |
91 |
+ @rtype: asyncio.Future |
92 |
+ @return: Return pair (transport, protocol), where transport supports the |
93 |
+ ReadTransport interface. |
94 |
+ """ |
95 |
+ protocol = protocol_factory() |
96 |
+ result = self.create_future() |
97 |
+ waiter = self.create_future() |
98 |
+ transport = self._make_read_pipe_transport(pipe, protocol, waiter=waiter) |
99 |
+ |
100 |
+ def waiter_callback(waiter): |
101 |
+ try: |
102 |
+ waiter.result() |
103 |
+ except Exception as e: |
104 |
+ transport.close() |
105 |
+ result.set_exception(e) |
106 |
+ else: |
107 |
+ result.set_result((transport, protocol)) |
108 |
+ |
109 |
+ waiter.add_done_callback(waiter_callback) |
110 |
+ return result |
111 |
+ |
112 |
def subprocess_exec(self, protocol_factory, program, *args, **kwargs): |
113 |
""" |
114 |
Run subprocesses asynchronously using the subprocess module. |
115 |
@@ -104,9 +139,9 @@ class _PortageEventLoop(events.AbstractEventLoop): |
116 |
stdout = kwargs.pop('stdout', subprocess.PIPE) |
117 |
stderr = kwargs.pop('stderr', subprocess.PIPE) |
118 |
|
119 |
- if subprocess.PIPE in (stdin, stdout, stderr): |
120 |
- # Requires connect_read/write_pipe implementation, for example |
121 |
- # see asyncio.unix_events._UnixReadPipeTransport. |
122 |
+ if stdin == subprocess.PIPE: |
123 |
+ # Requires connect_write_pipe implementation, for example |
124 |
+ # see asyncio.unix_events._UnixWritePipeTransport. |
125 |
raise NotImplementedError() |
126 |
|
127 |
universal_newlines = kwargs.pop('universal_newlines', False) |
128 |
@@ -131,6 +166,10 @@ class _PortageEventLoop(events.AbstractEventLoop): |
129 |
bufsize, **kwargs) |
130 |
return result |
131 |
|
132 |
+ def _make_read_pipe_transport(self, pipe, protocol, waiter=None, |
133 |
+ extra=None): |
134 |
+ return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) |
135 |
+ |
136 |
def _make_subprocess_transport(self, result, protocol, args, shell, |
137 |
stdin, stdout, stderr, bufsize, extra=None, **kwargs): |
138 |
waiter = self.create_future() |
139 |
@@ -162,6 +201,118 @@ class _PortageEventLoop(events.AbstractEventLoop): |
140 |
result.set_exception(wait_transp.exception() or exception) |
141 |
|
142 |
|
143 |
+if hasattr(os, 'set_blocking'): |
144 |
+ def _set_nonblocking(fd): |
145 |
+ os.set_blocking(fd, False) |
146 |
+else: |
147 |
+ def _set_nonblocking(fd): |
148 |
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL) |
149 |
+ flags = flags | os.O_NONBLOCK |
150 |
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags) |
151 |
+ |
152 |
+ |
153 |
+class _UnixReadPipeTransport(_ReadTransport): |
154 |
+ """ |
155 |
+ This is identical to the standard library's private |
156 |
+ asyncio.unix_events._UnixReadPipeTransport class, except that it |
157 |
+ only calls public AbstractEventLoop methods. |
158 |
+ """ |
159 |
+ |
160 |
+ max_size = 256 * 1024 # max bytes we read in one event loop iteration |
161 |
+ |
162 |
+ def __init__(self, loop, pipe, protocol, waiter=None, extra=None): |
163 |
+ super().__init__(extra) |
164 |
+ self._extra['pipe'] = pipe |
165 |
+ self._loop = loop |
166 |
+ self._pipe = pipe |
167 |
+ self._fileno = pipe.fileno() |
168 |
+ self._protocol = protocol |
169 |
+ self._closing = False |
170 |
+ |
171 |
+ mode = os.fstat(self._fileno).st_mode |
172 |
+ if not (stat.S_ISFIFO(mode) or |
173 |
+ stat.S_ISSOCK(mode) or |
174 |
+ stat.S_ISCHR(mode)): |
175 |
+ self._pipe = None |
176 |
+ self._fileno = None |
177 |
+ self._protocol = None |
178 |
+ raise ValueError("Pipe transport is for pipes/sockets only.") |
179 |
+ |
180 |
+ _set_nonblocking(self._fileno) |
181 |
+ |
182 |
+ self._loop.call_soon(self._protocol.connection_made, self) |
183 |
+ # only start reading when connection_made() has been called |
184 |
+ self._loop.call_soon(self._loop.add_reader, |
185 |
+ self._fileno, self._read_ready) |
186 |
+ if waiter is not None: |
187 |
+ # only wake up the waiter when connection_made() has been called |
188 |
+ self._loop.call_soon( |
189 |
+ lambda: None if waiter.cancelled() else waiter.set_result(None)) |
190 |
+ |
191 |
+ def _read_ready(self): |
192 |
+ try: |
193 |
+ data = os.read(self._fileno, self.max_size) |
194 |
+ except (BlockingIOError, InterruptedError): |
195 |
+ pass |
196 |
+ except OSError as exc: |
197 |
+ self._fatal_error(exc, 'Fatal read error on pipe transport') |
198 |
+ else: |
199 |
+ if data: |
200 |
+ self._protocol.data_received(data) |
201 |
+ else: |
202 |
+ self._closing = True |
203 |
+ self._loop.remove_reader(self._fileno) |
204 |
+ self._loop.call_soon(self._protocol.eof_received) |
205 |
+ self._loop.call_soon(self._call_connection_lost, None) |
206 |
+ |
207 |
+ def pause_reading(self): |
208 |
+ self._loop.remove_reader(self._fileno) |
209 |
+ |
210 |
+ def resume_reading(self): |
211 |
+ self._loop.add_reader(self._fileno, self._read_ready) |
212 |
+ |
213 |
+ def set_protocol(self, protocol): |
214 |
+ self._protocol = protocol |
215 |
+ |
216 |
+ def get_protocol(self): |
217 |
+ return self._protocol |
218 |
+ |
219 |
+ def is_closing(self): |
220 |
+ return self._closing |
221 |
+ |
222 |
+ def close(self): |
223 |
+ if not self._closing: |
224 |
+ self._close(None) |
225 |
+ |
226 |
+ def _fatal_error(self, exc, message='Fatal error on pipe transport'): |
227 |
+ # should be called by exception handler only |
228 |
+ if (isinstance(exc, OSError) and exc.errno == errno.EIO): |
229 |
+ if self._loop.get_debug(): |
230 |
+ logging.debug("%r: %s", self, message, exc_info=True) |
231 |
+ else: |
232 |
+ self._loop.call_exception_handler({ |
233 |
+ 'message': message, |
234 |
+ 'exception': exc, |
235 |
+ 'transport': self, |
236 |
+ 'protocol': self._protocol, |
237 |
+ }) |
238 |
+ self._close(exc) |
239 |
+ |
240 |
+ def _close(self, exc): |
241 |
+ self._closing = True |
242 |
+ self._loop.remove_reader(self._fileno) |
243 |
+ self._loop.call_soon(self._call_connection_lost, exc) |
244 |
+ |
245 |
+ def _call_connection_lost(self, exc): |
246 |
+ try: |
247 |
+ self._protocol.connection_lost(exc) |
248 |
+ finally: |
249 |
+ self._pipe.close() |
250 |
+ self._pipe = None |
251 |
+ self._protocol = None |
252 |
+ self._loop = None |
253 |
+ |
254 |
+ |
255 |
class _UnixSubprocessTransport(_BaseSubprocessTransport): |
256 |
""" |
257 |
This is identical to the standard library's private |
258 |
-- |
259 |
2.13.6 |