Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] Implement AbstractEventLoop.connect_read_pipe (bug 649588)
Date: Sun, 15 Apr 2018 01:42:04
Message-Id: 20180415013942.5752-1-zmedico@gentoo.org
1 In python versions that support asyncio, this allows API consumers
2 to use subprocess.PIPE for asyncio.create_subprocess_exec() stdout
3 and stderr parameters.
4
5 Bug: https://bugs.gentoo.org/649588
6 ---
7 .../util/futures/asyncio/test_subprocess_exec.py | 30 ++++
8 pym/portage/util/futures/unix_events.py | 157 ++++++++++++++++++++-
9 2 files changed, 184 insertions(+), 3 deletions(-)
10
11 diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
12 index d30f48c43..94984fc93 100644
13 --- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
14 +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
15 @@ -2,6 +2,7 @@
16 # Distributed under the terms of the GNU General Public License v2
17
18 import os
19 +import subprocess
20
21 from portage.process import find_binary
22 from portage.tests import TestCase
23 @@ -161,3 +162,32 @@ class SubprocessExecTestCase(TestCase):
24 f.close()
25
26 self._run_test(test)
27 +
28 + def testReadTransport(self):
29 + """
30 + Test asyncio.create_subprocess_exec(stdout=subprocess.PIPE) which
31 + requires an AbstractEventLoop.connect_read_pipe implementation
32 + (and a ReadTransport implementation for it to return).
33 + """
34 + if not hasattr(asyncio, 'create_subprocess_exec'):
35 + self.skipTest('create_subprocess_exec not implemented for python2')
36 +
37 + args_tuple = (b'hello', b'world')
38 + echo_binary = find_binary("echo")
39 + self.assertNotEqual(echo_binary, None)
40 + echo_binary = echo_binary.encode()
41 +
42 + def test(loop):
43 + with open(os.devnull, 'rb', 0) as devnull:
44 + proc = loop.run_until_complete(
45 + asyncio.create_subprocess_exec(
46 + echo_binary, *args_tuple,
47 + stdin=devnull,
48 + stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
49 +
50 + self.assertEqual(
51 + tuple(loop.run_until_complete(proc.stdout.read()).split()),
52 + args_tuple)
53 + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
54 +
55 + self._run_test(test)
56 diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
57 index 1abc420e1..6ba0adff6 100644
58 --- a/pym/portage/util/futures/unix_events.py
59 +++ b/pym/portage/util/futures/unix_events.py
60 @@ -9,12 +9,18 @@ __all__ = (
61 try:
62 from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
63 from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
64 + from asyncio.transports import ReadTransport as _ReadTransport
65 except ImportError:
66 _AbstractChildWatcher = object
67 _BaseSubprocessTransport = object
68 + _ReadTransport = object
69
70 +import errno
71 +import fcntl
72 import functools
73 +import logging
74 import os
75 +import stat
76 import subprocess
77
78 from portage.util._eventloop.global_event_loop import (
79 @@ -81,6 +87,35 @@ class _PortageEventLoop(events.AbstractEventLoop):
80 """
81 return asyncio.Task(coro, loop=self)
82
83 + def connect_read_pipe(self, protocol_factory, pipe):
84 + """
85 + Register read pipe in event loop. Set the pipe to non-blocking mode.
86 +
87 + @type protocol_factory: callable
88 + @param protocol_factory: must instantiate object with Protocol interface
89 + @type pipe: file
90 + @param pipe: a pipe to read from
91 + @rtype: asyncio.Future
92 + @return: Return pair (transport, protocol), where transport supports the
93 + ReadTransport interface.
94 + """
95 + protocol = protocol_factory()
96 + result = self.create_future()
97 + waiter = self.create_future()
98 + transport = self._make_read_pipe_transport(pipe, protocol, waiter=waiter)
99 +
100 + def waiter_callback(waiter):
101 + try:
102 + waiter.result()
103 + except Exception as e:
104 + transport.close()
105 + result.set_exception(e)
106 + else:
107 + result.set_result((transport, protocol))
108 +
109 + waiter.add_done_callback(waiter_callback)
110 + return result
111 +
112 def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
113 """
114 Run subprocesses asynchronously using the subprocess module.
115 @@ -104,9 +139,9 @@ class _PortageEventLoop(events.AbstractEventLoop):
116 stdout = kwargs.pop('stdout', subprocess.PIPE)
117 stderr = kwargs.pop('stderr', subprocess.PIPE)
118
119 - if subprocess.PIPE in (stdin, stdout, stderr):
120 - # Requires connect_read/write_pipe implementation, for example
121 - # see asyncio.unix_events._UnixReadPipeTransport.
122 + if stdin == subprocess.PIPE:
123 + # Requires connect_write_pipe implementation, for example
124 + # see asyncio.unix_events._UnixWritePipeTransport.
125 raise NotImplementedError()
126
127 universal_newlines = kwargs.pop('universal_newlines', False)
128 @@ -131,6 +166,10 @@ class _PortageEventLoop(events.AbstractEventLoop):
129 bufsize, **kwargs)
130 return result
131
132 + def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
133 + extra=None):
134 + return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
135 +
136 def _make_subprocess_transport(self, result, protocol, args, shell,
137 stdin, stdout, stderr, bufsize, extra=None, **kwargs):
138 waiter = self.create_future()
139 @@ -162,6 +201,118 @@ class _PortageEventLoop(events.AbstractEventLoop):
140 result.set_exception(wait_transp.exception() or exception)
141
142
143 +if hasattr(os, 'set_blocking'):
144 + def _set_nonblocking(fd):
145 + os.set_blocking(fd, False)
146 +else:
147 + def _set_nonblocking(fd):
148 + flags = fcntl.fcntl(fd, fcntl.F_GETFL)
149 + flags = flags | os.O_NONBLOCK
150 + fcntl.fcntl(fd, fcntl.F_SETFL, flags)
151 +
152 +
153 +class _UnixReadPipeTransport(_ReadTransport):
154 + """
155 + This is identical to the standard library's private
156 + asyncio.unix_events._UnixReadPipeTransport class, except that it
157 + only calls public AbstractEventLoop methods.
158 + """
159 +
160 + max_size = 256 * 1024 # max bytes we read in one event loop iteration
161 +
162 + def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
163 + super().__init__(extra)
164 + self._extra['pipe'] = pipe
165 + self._loop = loop
166 + self._pipe = pipe
167 + self._fileno = pipe.fileno()
168 + self._protocol = protocol
169 + self._closing = False
170 +
171 + mode = os.fstat(self._fileno).st_mode
172 + if not (stat.S_ISFIFO(mode) or
173 + stat.S_ISSOCK(mode) or
174 + stat.S_ISCHR(mode)):
175 + self._pipe = None
176 + self._fileno = None
177 + self._protocol = None
178 + raise ValueError("Pipe transport is for pipes/sockets only.")
179 +
180 + _set_nonblocking(self._fileno)
181 +
182 + self._loop.call_soon(self._protocol.connection_made, self)
183 + # only start reading when connection_made() has been called
184 + self._loop.call_soon(self._loop.add_reader,
185 + self._fileno, self._read_ready)
186 + if waiter is not None:
187 + # only wake up the waiter when connection_made() has been called
188 + self._loop.call_soon(
189 + lambda: None if waiter.cancelled() else waiter.set_result(None))
190 +
191 + def _read_ready(self):
192 + try:
193 + data = os.read(self._fileno, self.max_size)
194 + except (BlockingIOError, InterruptedError):
195 + pass
196 + except OSError as exc:
197 + self._fatal_error(exc, 'Fatal read error on pipe transport')
198 + else:
199 + if data:
200 + self._protocol.data_received(data)
201 + else:
202 + self._closing = True
203 + self._loop.remove_reader(self._fileno)
204 + self._loop.call_soon(self._protocol.eof_received)
205 + self._loop.call_soon(self._call_connection_lost, None)
206 +
207 + def pause_reading(self):
208 + self._loop.remove_reader(self._fileno)
209 +
210 + def resume_reading(self):
211 + self._loop.add_reader(self._fileno, self._read_ready)
212 +
213 + def set_protocol(self, protocol):
214 + self._protocol = protocol
215 +
216 + def get_protocol(self):
217 + return self._protocol
218 +
219 + def is_closing(self):
220 + return self._closing
221 +
222 + def close(self):
223 + if not self._closing:
224 + self._close(None)
225 +
226 + def _fatal_error(self, exc, message='Fatal error on pipe transport'):
227 + # should be called by exception handler only
228 + if (isinstance(exc, OSError) and exc.errno == errno.EIO):
229 + if self._loop.get_debug():
230 + logging.debug("%r: %s", self, message, exc_info=True)
231 + else:
232 + self._loop.call_exception_handler({
233 + 'message': message,
234 + 'exception': exc,
235 + 'transport': self,
236 + 'protocol': self._protocol,
237 + })
238 + self._close(exc)
239 +
240 + def _close(self, exc):
241 + self._closing = True
242 + self._loop.remove_reader(self._fileno)
243 + self._loop.call_soon(self._call_connection_lost, exc)
244 +
245 + def _call_connection_lost(self, exc):
246 + try:
247 + self._protocol.connection_lost(exc)
248 + finally:
249 + self._pipe.close()
250 + self._pipe = None
251 + self._protocol = None
252 + self._loop = None
253 +
254 +
255 class _UnixSubprocessTransport(_BaseSubprocessTransport):
256 """
257 This is identical to the standard library's private
258 --
259 2.13.6