Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] Implement AbstractEventLoop.connect_write_pipe (bug 649588)
Date: Mon, 16 Apr 2018 01:12:21
Message-Id: 20180416010955.22236-1-zmedico@gentoo.org
1 In Python versions that support asyncio, this allows API consumers
2 to pass subprocess.PIPE as the stdin parameter of
3 asyncio.create_subprocess_exec().
4
5 Bug: https://bugs.gentoo.org/649588
6 ---
7 .../util/futures/asyncio/test_subprocess_exec.py | 34 +++
8 pym/portage/util/futures/transports.py | 90 +++++++
9 pym/portage/util/futures/unix_events.py | 259 ++++++++++++++++++++-
10 3 files changed, 372 insertions(+), 11 deletions(-)
11 create mode 100644 pym/portage/util/futures/transports.py
12
13 diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
14 index 94984fc93..8c8c395ca 100644
15 --- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
16 +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
17 @@ -191,3 +191,37 @@ class SubprocessExecTestCase(TestCase):
18 self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
19
20 self._run_test(test)
21 +
22 + def testWriteTransport(self):
23 + """
24 + Test asyncio.create_subprocess_exec(stdin=subprocess.PIPE), which
25 + requires an AbstractEventLoop.connect_write_pipe implementation
26 + (and a WriteTransport implementation for it to return).
27 + """
28 + if not hasattr(asyncio, 'create_subprocess_exec'):  # not available on python2
29 + self.skipTest('create_subprocess_exec not implemented for python2')
30 +
31 + stdin_data = b'hello world'
32 + cat_binary = find_binary("cat")
33 + self.assertNotEqual(cat_binary, None)  # presumably None when cat is not found on PATH
34 + cat_binary = cat_binary.encode()  # pass the program path as bytes
35 +
36 + def test(loop):  # passed to self._run_test() below
37 + proc = loop.run_until_complete(
38 + asyncio.create_subprocess_exec(
39 + cat_binary,
40 + stdin=subprocess.PIPE,
41 + stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
42 +
43 + # This buffers data when necessary to avoid blocking.
44 + proc.stdin.write(stdin_data)
45 + # Any buffered data is written asynchronously after the
46 + # close method is called.
47 + proc.stdin.close()
48 +
49 + self.assertEqual(  # cat should echo stdin_data back on stdout
50 + loop.run_until_complete(proc.stdout.read()),
51 + stdin_data)
52 + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
53 +
54 + self._run_test(test)
55 diff --git a/pym/portage/util/futures/transports.py b/pym/portage/util/futures/transports.py
56 new file mode 100644
57 index 000000000..60ea93073
58 --- /dev/null
59 +++ b/pym/portage/util/futures/transports.py
60 @@ -0,0 +1,90 @@
61 +# Copyright 2018 Gentoo Foundation
62 +# Distributed under the terms of the GNU General Public License v2
63 +
64 +try:
65 + from asyncio.transports import Transport as _Transport
66 +except ImportError:
67 + _Transport = object
68 +
69 +
70 +class _FlowControlMixin(_Transport):
71 + """
72 + This is identical to the standard library's private
73 + asyncio.transports._FlowControlMixin class.
74 +
75 + All the logic for (write) flow control in a mix-in base class.
76 +
77 + The subclass must implement get_write_buffer_size(). It must call
78 + _maybe_pause_protocol() whenever the write buffer size increases,
79 + and _maybe_resume_protocol() whenever it decreases. It may also
80 + override set_write_buffer_limits() (e.g. to specify different
81 + defaults).
82 +
83 + The subclass constructor must call super().__init__(extra, loop), with a non-None loop. This
84 + will call _set_write_buffer_limits() to apply the default limits.
85 +
86 + The user may call set_write_buffer_limits() and
87 + get_write_buffer_size(), and their protocol's pause_writing() and
88 + resume_writing() may be called.
89 + """
90 +
91 + def __init__(self, extra=None, loop=None):  # loop must not be None (see assert)
92 + super().__init__(extra)
93 + assert loop is not None
94 + self._loop = loop
95 + self._protocol_paused = False
96 + self._set_write_buffer_limits()
97 +
98 + def _maybe_pause_protocol(self):  # pause once the buffer exceeds the high-water mark
99 + size = self.get_write_buffer_size()
100 + if size <= self._high_water:
101 + return
102 + if not self._protocol_paused:  # notify the protocol only once per pause
103 + self._protocol_paused = True
104 + try:
105 + self._protocol.pause_writing()
106 + except Exception as exc:  # report protocol errors instead of propagating them
107 + self._loop.call_exception_handler({
108 + 'message': 'protocol.pause_writing() failed',
109 + 'exception': exc,
110 + 'transport': self,
111 + 'protocol': self._protocol,
112 + })
113 +
114 + def _maybe_resume_protocol(self):  # resume once the buffer drains to the low-water mark
115 + if (self._protocol_paused and
116 + self.get_write_buffer_size() <= self._low_water):
117 + self._protocol_paused = False
118 + try:
119 + self._protocol.resume_writing()
120 + except Exception as exc:  # report protocol errors instead of propagating them
121 + self._loop.call_exception_handler({
122 + 'message': 'protocol.resume_writing() failed',
123 + 'exception': exc,
124 + 'transport': self,
125 + 'protocol': self._protocol,
126 + })
127 +
128 + def get_write_buffer_limits(self):  # returns (low, high)
129 + return (self._low_water, self._high_water)
130 +
131 + def _set_write_buffer_limits(self, high=None, low=None):  # defaults: high=64 KiB, low=high//4
132 + if high is None:
133 + if low is None:
134 + high = 64*1024
135 + else:
136 + high = 4*low
137 + if low is None:
138 + low = high // 4
139 + if not high >= low >= 0:
140 + raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
141 + (high, low))
142 + self._high_water = high
143 + self._low_water = low
144 +
145 + def set_write_buffer_limits(self, high=None, low=None):  # public setter; may pause the protocol right away
146 + self._set_write_buffer_limits(high=high, low=low)
147 + self._maybe_pause_protocol()
148 +
149 + def get_write_buffer_size(self):  # subclasses must override
150 + raise NotImplementedError
151 diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
152 index 9d84ab6aa..a1d7cac80 100644
153 --- a/pym/portage/util/futures/unix_events.py
154 +++ b/pym/portage/util/futures/unix_events.py
155 @@ -9,19 +9,25 @@ __all__ = (
156 try:
157 from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
158 from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
159 - from asyncio.transports import ReadTransport as _ReadTransport
160 + from asyncio.transports import (
161 + ReadTransport as _ReadTransport,
162 + WriteTransport as _WriteTransport,
163 + )
164 except ImportError:
165 _AbstractChildWatcher = object
166 _BaseSubprocessTransport = object
167 _ReadTransport = object
168 + _WriteTransport = object
169
170 import errno
171 import fcntl
172 import functools
173 import logging
174 import os
175 +import socket
176 import stat
177 import subprocess
178 +import sys
179
180 from portage.util._eventloop.global_event_loop import (
181 global_event_loop as _global_event_loop,
182 @@ -30,7 +36,7 @@ from portage.util.futures import (
183 asyncio,
184 events,
185 )
186 -from portage.util.futures.futures import Future
187 +from portage.util.futures.transports import _FlowControlMixin
188
189
190 class _PortageEventLoop(events.AbstractEventLoop):
191 @@ -117,6 +123,35 @@ class _PortageEventLoop(events.AbstractEventLoop):
192 waiter.add_done_callback(waiter_callback)
193 return result
194
195 + def connect_write_pipe(self, protocol_factory, pipe):
196 + """
197 + Register a write pipe with the event loop and set it to non-blocking mode.
198 +
199 + @type protocol_factory: callable
200 + @param protocol_factory: must instantiate object with Protocol interface
201 + @type pipe: file
202 + @param pipe: a pipe to write to
203 + @rtype: asyncio.Future
204 + @return: a future whose result is the pair (transport, protocol), where transport supports the
205 + WriteTransport interface.
206 + """
207 + protocol = protocol_factory()
208 + result = self.create_future()
209 + waiter = self.create_future()
210 + transport = self._make_write_pipe_transport(pipe, protocol, waiter)  # the transport resolves waiter after connection_made()
211 +
212 + def waiter_callback(waiter):  # translate the waiter outcome into result
213 + try:
214 + waiter.result()
215 + except Exception as e:
216 + transport.close()  # don't leak the transport on failure
217 + result.set_exception(e)
218 + else:
219 + result.set_result((transport, protocol))  # NOTE(review): if the caller already cancelled result, this raises InvalidStateError and the transport is never closed; consider checking result.cancelled() first
220 +
221 + waiter.add_done_callback(waiter_callback)
222 + return result
223 +
224 def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
225 """
226 Run subprocesses asynchronously using the subprocess module.
227 @@ -140,11 +175,6 @@ class _PortageEventLoop(events.AbstractEventLoop):
228 stdout = kwargs.pop('stdout', subprocess.PIPE)
229 stderr = kwargs.pop('stderr', subprocess.PIPE)
230
231 - if stdin == subprocess.PIPE:
232 - # Requires connect_write_pipe implementation, for example
233 - # see asyncio.unix_events._UnixWritePipeTransport.
234 - raise NotImplementedError()
235 -
236 universal_newlines = kwargs.pop('universal_newlines', False)
237 shell = kwargs.pop('shell', False)
238 bufsize = kwargs.pop('bufsize', 0)
239 @@ -171,6 +201,10 @@ class _PortageEventLoop(events.AbstractEventLoop):
240 extra=None):
241 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
242
243 + def _make_write_pipe_transport(self, pipe, protocol, waiter=None,  # transport factory used by connect_write_pipe()
244 + extra=None):
245 + return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)  # self is passed as the transport's loop
246 +
247 def _make_subprocess_transport(self, result, protocol, args, shell,
248 stdin, stdout, stderr, bufsize, extra=None, **kwargs):
249 waiter = self.create_future()
250 @@ -314,18 +348,221 @@ class _UnixReadPipeTransport(_ReadTransport):
251 self._loop = None
252
253
254 +class _UnixWritePipeTransport(_FlowControlMixin, _WriteTransport):
255 + """
256 + This is identical to the standard library's private
257 + asyncio.unix_events._UnixWritePipeTransport class, except that it
258 + only calls public AbstractEventLoop methods.
259 + """
260 +
261 + def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
262 + super().__init__(extra, loop)
263 + self._extra['pipe'] = pipe
264 + self._pipe = pipe
265 + self._fileno = pipe.fileno()
266 + self._protocol = protocol
267 + self._buffer = bytearray()  # data not yet written to the pipe
268 + self._conn_lost = 0  # bumped on write errors and on writes after close
269 + self._closing = False # Set when close(), write_eof() or _close() is called.
270 +
271 + mode = os.fstat(self._fileno).st_mode
272 + is_char = stat.S_ISCHR(mode)
273 + is_fifo = stat.S_ISFIFO(mode)
274 + is_socket = stat.S_ISSOCK(mode)
275 + if not (is_char or is_fifo or is_socket):
276 + self._pipe = None
277 + self._fileno = None
278 + self._protocol = None
279 + raise ValueError("Pipe transport is only for "
280 + "pipes, sockets and character devices")
281 +
282 + _set_nonblocking(self._fileno)  # NOTE(review): helper not shown in this patch; presumably defined elsewhere in unix_events.py
283 + self._loop.call_soon(self._protocol.connection_made, self)
284 +
285 + # On AIX, the reader trick (to be notified when the read end of the
286 + # socket is closed) only works for sockets. On other platforms it
287 + # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
288 + if is_socket or (is_fifo and not sys.platform.startswith("aix")):
289 + # only start reading when connection_made() has been called
290 + self._loop.call_soon(self._loop.add_reader,
291 + self._fileno, self._read_ready)
292 +
293 + if waiter is not None:
294 + # only wake up the waiter when connection_made() has been called
295 + self._loop.call_soon(
296 + lambda: None if waiter.cancelled() else waiter.set_result(None))
297 +
298 + def get_write_buffer_size(self):  # required by _FlowControlMixin
299 + return len(self._buffer)
300 +
301 + def _read_ready(self):
302 + # A read event on the write end means the pipe was closed by peer.
303 + if self._loop.get_debug():
304 + logging.info("%r was closed by peer", self)
305 + if self._buffer:
306 + self._close(BrokenPipeError())
307 + else:
308 + self._close()
309 +
310 + def write(self, data):  # send now if possible, otherwise buffer the remainder
311 + assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
312 + if isinstance(data, bytearray):
313 + data = memoryview(data)
314 + if not data:
315 + return
316 +
317 + if self._conn_lost or self._closing:  # drop the data, counting the attempt
318 + self._conn_lost += 1
319 + return
320 +
321 + if not self._buffer:
322 + # Attempt to send it right away first.
323 + try:
324 + n = os.write(self._fileno, data)
325 + except (BlockingIOError, InterruptedError):
326 + n = 0
327 + except Exception as exc:
328 + self._conn_lost += 1
329 + self._fatal_error(exc, 'Fatal write error on pipe transport')
330 + return
331 + if n == len(data):
332 + return
333 + elif n > 0:
334 + data = memoryview(data)[n:]
335 + self._loop.add_writer(self._fileno, self._write_ready)  # flush the rest when writable
336 +
337 + self._buffer += data
338 + self._maybe_pause_protocol()
339 +
340 + def _write_ready(self):  # writer callback: flush buffered data
341 + assert self._buffer, 'Data should not be empty'
342 +
343 + try:
344 + n = os.write(self._fileno, self._buffer)
345 + except (BlockingIOError, InterruptedError):
346 + pass
347 + except Exception as exc:
348 + self._buffer.clear()
349 + self._conn_lost += 1
350 + # Remove the writer here; _fatal_error() won't do it
351 + # because _buffer is now empty.
352 + self._loop.remove_writer(self._fileno)
353 + self._fatal_error(exc, 'Fatal write error on pipe transport')
354 + else:
355 + if n == len(self._buffer):
356 + self._buffer.clear()
357 + self._loop.remove_writer(self._fileno)
358 + self._maybe_resume_protocol() # May append to buffer.
359 + if self._closing:
360 + self._loop.remove_reader(self._fileno)
361 + self._call_connection_lost(None)
362 + return
363 + elif n > 0:
364 + del self._buffer[:n]
365 +
366 + def can_write_eof(self):
367 + return True
368 +
369 + def write_eof(self):  # stop accepting writes; close once the buffer drains
370 + if self._closing:
371 + return
372 + assert self._pipe
373 + self._closing = True
374 + if not self._buffer:
375 + self._loop.remove_reader(self._fileno)
376 + self._loop.call_soon(self._call_connection_lost, None)
377 +
378 + def set_protocol(self, protocol):
379 + self._protocol = protocol
380 +
381 + def get_protocol(self):
382 + return self._protocol
383 +
384 + def is_closing(self):
385 + return self._closing
386 +
387 + def close(self):
388 + if self._pipe is not None and not self._closing:
389 + # write_eof is all that is needed to close the write pipe
390 + self.write_eof()
391 +
392 + def abort(self):  # close now, discarding any buffered data
393 + self._close(None)
394 +
395 + def _fatal_error(self, exc, message='Fatal error on pipe transport'):
396 + # should be called by exception handler only
397 + if isinstance(exc,
398 + (BrokenPipeError, ConnectionResetError, ConnectionAbortedError)):
399 + if self._loop.get_debug():
400 + logging.debug("%r: %s", self, message, exc_info=True)
401 + else:
402 + self._loop.call_exception_handler({
403 + 'message': message,
404 + 'exception': exc,
405 + 'transport': self,
406 + 'protocol': self._protocol,
407 + })
408 + self._close(exc)
409 +
410 + def _close(self, exc=None):  # discard buffered data and schedule connection_lost()
411 + self._closing = True
412 + if self._buffer:
413 + self._loop.remove_writer(self._fileno)
414 + self._buffer.clear()
415 + self._loop.remove_reader(self._fileno)
416 + self._loop.call_soon(self._call_connection_lost, exc)
417 +
418 + def _call_connection_lost(self, exc):  # notify the protocol, then release the pipe and references
419 + try:
420 + self._protocol.connection_lost(exc)
421 + finally:
422 + self._pipe.close()
423 + self._pipe = None
424 + self._protocol = None
425 + self._loop = None
426 +
427 +
428 +if hasattr(os, 'set_inheritable'):
429 + # Python 3.4 and newer provide os.set_inheritable()
430 + _set_inheritable = os.set_inheritable
431 +else:
432 + def _set_inheritable(fd, inheritable):  # fallback: toggle FD_CLOEXEC via fcntl
433 + cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)  # use 1 if fcntl lacks the constant
434 +
435 + old = fcntl.fcntl(fd, fcntl.F_GETFD)
436 + if not inheritable:
437 + fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
438 + else:
439 + fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
440 +
441 +
442 class _UnixSubprocessTransport(_BaseSubprocessTransport):
443 """
444 This is identical to the standard library's private
445 - asyncio.unix_events._UnixSubprocessTransport class, except that
446 - subprocess.PIPE is not implemented for stdin, since that would
447 - require connect_write_pipe support in the event loop. For example,
448 - see the asyncio.unix_events._UnixWritePipeTransport class.
449 + asyncio.unix_events._UnixSubprocessTransport class, except that it
450 + only calls public AbstractEventLoop methods.
451 """
452 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
453 + stdin_w = None  # write end of the stdin socket pair, if one is created
454 + if stdin == subprocess.PIPE:
455 + # Use a socket pair for stdin, since not all platforms
456 + # support selecting read events on the write end of a
457 + # socket (which we use in order to detect closing of the
458 + # other end). Notably this is needed on AIX, and works
459 + # just fine on other platforms.
460 + stdin, stdin_w = socket.socketpair()
461 +
462 + # Mark the write end of the stdin pipe as non-inheritable,
463 + # needed by close_fds=False on Python 3.3 and older
464 + # (Python 3.4 implements PEP 446, so socketpair returns
465 + # non-inheritable sockets)
466 + _set_inheritable(stdin_w.fileno(), False)
467 self._proc = subprocess.Popen(
468 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
469 universal_newlines=False, bufsize=bufsize, **kwargs)
470 + if stdin_w is not None:  # NOTE(review): if Popen() above raises, both socketpair ends (stdin, stdin_w) leak; newer CPython wraps Popen in try/finally to close them
471 + stdin.close()  # the parent no longer needs the read end
472 + self._proc.stdin = os.fdopen(stdin_w.detach(), 'wb', bufsize)  # expose the write end as the Popen object's stdin file
473
474
475 class AbstractChildWatcher(_AbstractChildWatcher):
476 --
477 2.13.6