1 |
In python versions that support asyncio, this allows API consumers |
2 |
to use subprocess.PIPE for asyncio.create_subprocess_exec() stdin |
3 |
parameters. |
4 |
|
5 |
Bug: https://bugs.gentoo.org/649588 |
6 |
--- |
7 |
.../util/futures/asyncio/test_subprocess_exec.py | 34 +++ |
8 |
pym/portage/util/futures/transports.py | 90 +++++++ |
9 |
pym/portage/util/futures/unix_events.py | 259 ++++++++++++++++++++- |
10 |
3 files changed, 372 insertions(+), 11 deletions(-) |
11 |
create mode 100644 pym/portage/util/futures/transports.py |
12 |
|
13 |
diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
14 |
index 94984fc93..8c8c395ca 100644 |
15 |
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
16 |
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
17 |
@@ -191,3 +191,37 @@ class SubprocessExecTestCase(TestCase): |
18 |
self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) |
19 |
|
20 |
self._run_test(test) |
21 |
+ |
22 |
+ def testWriteTransport(self): |
23 |
+ """ |
24 |
+ Test asyncio.create_subprocess_exec(stdin=subprocess.PIPE) which |
25 |
+ requires an AbstractEventLoop.connect_write_pipe implementation |
26 |
+ (and a WriteTransport implementation for it to return). |
27 |
+ """ |
28 |
+ if not hasattr(asyncio, 'create_subprocess_exec'): |
29 |
+ self.skipTest('create_subprocess_exec not implemented for python2') |
30 |
+ |
31 |
+ stdin_data = b'hello world' |
32 |
+ cat_binary = find_binary("cat") |
33 |
+ self.assertNotEqual(cat_binary, None) |
34 |
+ cat_binary = cat_binary.encode() |
35 |
+ |
36 |
+ def test(loop): |
37 |
+ proc = loop.run_until_complete( |
38 |
+ asyncio.create_subprocess_exec( |
39 |
+ cat_binary, |
40 |
+ stdin=subprocess.PIPE, |
41 |
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) |
42 |
+ |
43 |
+ # This buffers data when necessary to avoid blocking. |
44 |
+ proc.stdin.write(stdin_data) |
45 |
+ # Any buffered data is written asynchronously after the |
46 |
+ # close method is called. |
47 |
+ proc.stdin.close() |
48 |
+ |
49 |
+ self.assertEqual( |
50 |
+ loop.run_until_complete(proc.stdout.read()), |
51 |
+ stdin_data) |
52 |
+ self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) |
53 |
+ |
54 |
+ self._run_test(test) |
55 |
diff --git a/pym/portage/util/futures/transports.py b/pym/portage/util/futures/transports.py |
56 |
new file mode 100644 |
57 |
index 000000000..60ea93073 |
58 |
--- /dev/null |
59 |
+++ b/pym/portage/util/futures/transports.py |
60 |
@@ -0,0 +1,90 @@ |
61 |
+# Copyright 2018 Gentoo Foundation |
62 |
+# Distributed under the terms of the GNU General Public License v2 |
63 |
+ |
64 |
+try: |
65 |
+ from asyncio.transports import Transport as _Transport |
66 |
+except ImportError: |
67 |
+ _Transport = object |
68 |
+ |
69 |
+ |
70 |
+class _FlowControlMixin(_Transport): |
71 |
+ """ |
72 |
+ This is identical to the standard library's private |
73 |
+ asyncio.transports._FlowControlMixin class. |
74 |
+ |
75 |
+ All the logic for (write) flow control in a mix-in base class. |
76 |
+ |
77 |
+ The subclass must implement get_write_buffer_size(). It must call |
78 |
+ _maybe_pause_protocol() whenever the write buffer size increases, |
79 |
+ and _maybe_resume_protocol() whenever it decreases. It may also |
80 |
+ override set_write_buffer_limits() (e.g. to specify different |
81 |
+ defaults). |
82 |
+ |
83 |
+ The subclass constructor must call super().__init__(extra). This |
84 |
+ will call set_write_buffer_limits(). |
85 |
+ |
86 |
+ The user may call set_write_buffer_limits() and |
87 |
+ get_write_buffer_size(), and their protocol's pause_writing() and |
88 |
+ resume_writing() may be called. |
89 |
+ """ |
90 |
+ |
91 |
+ def __init__(self, extra=None, loop=None): |
92 |
+ super().__init__(extra) |
93 |
+ assert loop is not None |
94 |
+ self._loop = loop |
95 |
+ self._protocol_paused = False |
96 |
+ self._set_write_buffer_limits() |
97 |
+ |
98 |
+ def _maybe_pause_protocol(self): |
99 |
+ size = self.get_write_buffer_size() |
100 |
+ if size <= self._high_water: |
101 |
+ return |
102 |
+ if not self._protocol_paused: |
103 |
+ self._protocol_paused = True |
104 |
+ try: |
105 |
+ self._protocol.pause_writing() |
106 |
+ except Exception as exc: |
107 |
+ self._loop.call_exception_handler({ |
108 |
+ 'message': 'protocol.pause_writing() failed', |
109 |
+ 'exception': exc, |
110 |
+ 'transport': self, |
111 |
+ 'protocol': self._protocol, |
112 |
+ }) |
113 |
+ |
114 |
+ def _maybe_resume_protocol(self): |
115 |
+ if (self._protocol_paused and |
116 |
+ self.get_write_buffer_size() <= self._low_water): |
117 |
+ self._protocol_paused = False |
118 |
+ try: |
119 |
+ self._protocol.resume_writing() |
120 |
+ except Exception as exc: |
121 |
+ self._loop.call_exception_handler({ |
122 |
+ 'message': 'protocol.resume_writing() failed', |
123 |
+ 'exception': exc, |
124 |
+ 'transport': self, |
125 |
+ 'protocol': self._protocol, |
126 |
+ }) |
127 |
+ |
128 |
+ def get_write_buffer_limits(self): |
129 |
+ return (self._low_water, self._high_water) |
130 |
+ |
131 |
+ def _set_write_buffer_limits(self, high=None, low=None): |
132 |
+ if high is None: |
133 |
+ if low is None: |
134 |
+ high = 64*1024 |
135 |
+ else: |
136 |
+ high = 4*low |
137 |
+ if low is None: |
138 |
+ low = high // 4 |
139 |
+ if not high >= low >= 0: |
140 |
+ raise ValueError('high (%r) must be >= low (%r) must be >= 0' % |
141 |
+ (high, low)) |
142 |
+ self._high_water = high |
143 |
+ self._low_water = low |
144 |
+ |
145 |
+ def set_write_buffer_limits(self, high=None, low=None): |
146 |
+ self._set_write_buffer_limits(high=high, low=low) |
147 |
+ self._maybe_pause_protocol() |
148 |
+ |
149 |
+ def get_write_buffer_size(self): |
150 |
+ raise NotImplementedError |
151 |
diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py |
152 |
index 9d84ab6aa..a1d7cac80 100644 |
153 |
--- a/pym/portage/util/futures/unix_events.py |
154 |
+++ b/pym/portage/util/futures/unix_events.py |
155 |
@@ -9,19 +9,25 @@ __all__ = ( |
156 |
try: |
157 |
from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport |
158 |
from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher |
159 |
- from asyncio.transports import ReadTransport as _ReadTransport |
160 |
+ from asyncio.transports import ( |
161 |
+ ReadTransport as _ReadTransport, |
162 |
+ WriteTransport as _WriteTransport, |
163 |
+ ) |
164 |
except ImportError: |
165 |
_AbstractChildWatcher = object |
166 |
_BaseSubprocessTransport = object |
167 |
_ReadTransport = object |
168 |
+ _WriteTransport = object |
169 |
|
170 |
import errno |
171 |
import fcntl |
172 |
import functools |
173 |
import logging |
174 |
import os |
175 |
+import socket |
176 |
import stat |
177 |
import subprocess |
178 |
+import sys |
179 |
|
180 |
from portage.util._eventloop.global_event_loop import ( |
181 |
global_event_loop as _global_event_loop, |
182 |
@@ -30,7 +36,7 @@ from portage.util.futures import ( |
183 |
asyncio, |
184 |
events, |
185 |
) |
186 |
-from portage.util.futures.futures import Future |
187 |
+from portage.util.futures.transports import _FlowControlMixin |
188 |
|
189 |
|
190 |
class _PortageEventLoop(events.AbstractEventLoop): |
191 |
@@ -117,6 +123,35 @@ class _PortageEventLoop(events.AbstractEventLoop): |
192 |
waiter.add_done_callback(waiter_callback) |
193 |
return result |
194 |
|
195 |
+ def connect_write_pipe(self, protocol_factory, pipe): |
196 |
+ """ |
197 |
+ Register write pipe in event loop. Set the pipe to non-blocking mode. |
198 |
+ |
199 |
+ @type protocol_factory: callable |
200 |
+ @param protocol_factory: must instantiate object with Protocol interface |
201 |
+ @type pipe: file |
202 |
+ @param pipe: a pipe to write to |
203 |
+ @rtype: asyncio.Future |
204 |
+ @return: Return pair (transport, protocol), where transport supports the |
205 |
+ WriteTransport interface. |
206 |
+ """ |
207 |
+ protocol = protocol_factory() |
208 |
+ result = self.create_future() |
209 |
+ waiter = self.create_future() |
210 |
+ transport = self._make_write_pipe_transport(pipe, protocol, waiter) |
211 |
+ |
212 |
+ def waiter_callback(waiter): |
213 |
+ try: |
214 |
+ waiter.result() |
215 |
+ except Exception as e: |
216 |
+ transport.close() |
217 |
+ result.set_exception(e) |
218 |
+ else: |
219 |
+ result.set_result((transport, protocol)) |
220 |
+ |
221 |
+ waiter.add_done_callback(waiter_callback) |
222 |
+ return result |
223 |
+ |
224 |
def subprocess_exec(self, protocol_factory, program, *args, **kwargs): |
225 |
""" |
226 |
Run subprocesses asynchronously using the subprocess module. |
227 |
@@ -140,11 +175,6 @@ class _PortageEventLoop(events.AbstractEventLoop): |
228 |
stdout = kwargs.pop('stdout', subprocess.PIPE) |
229 |
stderr = kwargs.pop('stderr', subprocess.PIPE) |
230 |
|
231 |
- if stdin == subprocess.PIPE: |
232 |
- # Requires connect_write_pipe implementation, for example |
233 |
- # see asyncio.unix_events._UnixWritePipeTransport. |
234 |
- raise NotImplementedError() |
235 |
- |
236 |
universal_newlines = kwargs.pop('universal_newlines', False) |
237 |
shell = kwargs.pop('shell', False) |
238 |
bufsize = kwargs.pop('bufsize', 0) |
239 |
@@ -171,6 +201,10 @@ class _PortageEventLoop(events.AbstractEventLoop): |
240 |
extra=None): |
241 |
return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) |
242 |
|
243 |
+ def _make_write_pipe_transport(self, pipe, protocol, waiter=None, |
244 |
+ extra=None): |
245 |
+ return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra) |
246 |
+ |
247 |
def _make_subprocess_transport(self, result, protocol, args, shell, |
248 |
stdin, stdout, stderr, bufsize, extra=None, **kwargs): |
249 |
waiter = self.create_future() |
250 |
@@ -314,18 +348,221 @@ class _UnixReadPipeTransport(_ReadTransport): |
251 |
self._loop = None |
252 |
|
253 |
|
254 |
+class _UnixWritePipeTransport(_FlowControlMixin, _WriteTransport): |
255 |
+ """ |
256 |
+ This is identical to the standard library's private |
257 |
+ asyncio.unix_events._UnixWritePipeTransport class, except that it |
258 |
+ only calls public AbstractEventLoop methods. |
259 |
+ """ |
260 |
+ |
261 |
+ def __init__(self, loop, pipe, protocol, waiter=None, extra=None): |
262 |
+ super().__init__(extra, loop) |
263 |
+ self._extra['pipe'] = pipe |
264 |
+ self._pipe = pipe |
265 |
+ self._fileno = pipe.fileno() |
266 |
+ self._protocol = protocol |
267 |
+ self._buffer = bytearray() |
268 |
+ self._conn_lost = 0 |
269 |
+ self._closing = False # Set when close() or write_eof() called. |
270 |
+ |
271 |
+ mode = os.fstat(self._fileno).st_mode |
272 |
+ is_char = stat.S_ISCHR(mode) |
273 |
+ is_fifo = stat.S_ISFIFO(mode) |
274 |
+ is_socket = stat.S_ISSOCK(mode) |
275 |
+ if not (is_char or is_fifo or is_socket): |
276 |
+ self._pipe = None |
277 |
+ self._fileno = None |
278 |
+ self._protocol = None |
279 |
+ raise ValueError("Pipe transport is only for " |
280 |
+ "pipes, sockets and character devices") |
281 |
+ |
282 |
+ _set_nonblocking(self._fileno) |
283 |
+ self._loop.call_soon(self._protocol.connection_made, self) |
284 |
+ |
285 |
+ # On AIX, the reader trick (to be notified when the read end of the |
286 |
+ # socket is closed) only works for sockets. On other platforms it |
287 |
+ # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) |
288 |
+ if is_socket or (is_fifo and not sys.platform.startswith("aix")): |
289 |
+ # only start reading when connection_made() has been called |
290 |
+ self._loop.call_soon(self._loop.add_reader, |
291 |
+ self._fileno, self._read_ready) |
292 |
+ |
293 |
+ if waiter is not None: |
294 |
+ # only wake up the waiter when connection_made() has been called |
295 |
+ self._loop.call_soon( |
296 |
+ lambda: None if waiter.cancelled() else waiter.set_result(None)) |
297 |
+ |
298 |
+ def get_write_buffer_size(self): |
299 |
+ return len(self._buffer) |
300 |
+ |
301 |
+ def _read_ready(self): |
302 |
+ # Pipe was closed by peer. |
303 |
+ if self._loop.get_debug(): |
304 |
+ logging.info("%r was closed by peer", self) |
305 |
+ if self._buffer: |
306 |
+ self._close(BrokenPipeError()) |
307 |
+ else: |
308 |
+ self._close() |
309 |
+ |
310 |
+ def write(self, data): |
311 |
+ assert isinstance(data, (bytes, bytearray, memoryview)), repr(data) |
312 |
+ if isinstance(data, bytearray): |
313 |
+ data = memoryview(data) |
314 |
+ if not data: |
315 |
+ return |
316 |
+ |
317 |
+ if self._conn_lost or self._closing: |
318 |
+ self._conn_lost += 1 |
319 |
+ return |
320 |
+ |
321 |
+ if not self._buffer: |
322 |
+ # Attempt to send it right away first. |
323 |
+ try: |
324 |
+ n = os.write(self._fileno, data) |
325 |
+ except (BlockingIOError, InterruptedError): |
326 |
+ n = 0 |
327 |
+ except Exception as exc: |
328 |
+ self._conn_lost += 1 |
329 |
+ self._fatal_error(exc, 'Fatal write error on pipe transport') |
330 |
+ return |
331 |
+ if n == len(data): |
332 |
+ return |
333 |
+ elif n > 0: |
334 |
+ data = memoryview(data)[n:] |
335 |
+ self._loop.add_writer(self._fileno, self._write_ready) |
336 |
+ |
337 |
+ self._buffer += data |
338 |
+ self._maybe_pause_protocol() |
339 |
+ |
340 |
+ def _write_ready(self): |
341 |
+ assert self._buffer, 'Data should not be empty' |
342 |
+ |
343 |
+ try: |
344 |
+ n = os.write(self._fileno, self._buffer) |
345 |
+ except (BlockingIOError, InterruptedError): |
346 |
+ pass |
347 |
+ except Exception as exc: |
348 |
+ self._buffer.clear() |
349 |
+ self._conn_lost += 1 |
350 |
+ # Remove writer here, _fatal_error() doesn't it |
351 |
+ # because _buffer is empty. |
352 |
+ self._loop.remove_writer(self._fileno) |
353 |
+ self._fatal_error(exc, 'Fatal write error on pipe transport') |
354 |
+ else: |
355 |
+ if n == len(self._buffer): |
356 |
+ self._buffer.clear() |
357 |
+ self._loop.remove_writer(self._fileno) |
358 |
+ self._maybe_resume_protocol() # May append to buffer. |
359 |
+ if self._closing: |
360 |
+ self._loop.remove_reader(self._fileno) |
361 |
+ self._call_connection_lost(None) |
362 |
+ return |
363 |
+ elif n > 0: |
364 |
+ del self._buffer[:n] |
365 |
+ |
366 |
+ def can_write_eof(self): |
367 |
+ return True |
368 |
+ |
369 |
+ def write_eof(self): |
370 |
+ if self._closing: |
371 |
+ return |
372 |
+ assert self._pipe |
373 |
+ self._closing = True |
374 |
+ if not self._buffer: |
375 |
+ self._loop.remove_reader(self._fileno) |
376 |
+ self._loop.call_soon(self._call_connection_lost, None) |
377 |
+ |
378 |
+ def set_protocol(self, protocol): |
379 |
+ self._protocol = protocol |
380 |
+ |
381 |
+ def get_protocol(self): |
382 |
+ return self._protocol |
383 |
+ |
384 |
+ def is_closing(self): |
385 |
+ return self._closing |
386 |
+ |
387 |
+ def close(self): |
388 |
+ if self._pipe is not None and not self._closing: |
389 |
+ # write_eof is all what we needed to close the write pipe |
390 |
+ self.write_eof() |
391 |
+ |
392 |
+ def abort(self): |
393 |
+ self._close(None) |
394 |
+ |
395 |
+ def _fatal_error(self, exc, message='Fatal error on pipe transport'): |
396 |
+ # should be called by exception handler only |
397 |
+ if isinstance(exc, |
398 |
+ (BrokenPipeError, ConnectionResetError, ConnectionAbortedError)): |
399 |
+ if self._loop.get_debug(): |
400 |
+ logging.debug("%r: %s", self, message, exc_info=True) |
401 |
+ else: |
402 |
+ self._loop.call_exception_handler({ |
403 |
+ 'message': message, |
404 |
+ 'exception': exc, |
405 |
+ 'transport': self, |
406 |
+ 'protocol': self._protocol, |
407 |
+ }) |
408 |
+ self._close(exc) |
409 |
+ |
410 |
+ def _close(self, exc=None): |
411 |
+ self._closing = True |
412 |
+ if self._buffer: |
413 |
+ self._loop.remove_writer(self._fileno) |
414 |
+ self._buffer.clear() |
415 |
+ self._loop.remove_reader(self._fileno) |
416 |
+ self._loop.call_soon(self._call_connection_lost, exc) |
417 |
+ |
418 |
+ def _call_connection_lost(self, exc): |
419 |
+ try: |
420 |
+ self._protocol.connection_lost(exc) |
421 |
+ finally: |
422 |
+ self._pipe.close() |
423 |
+ self._pipe = None |
424 |
+ self._protocol = None |
425 |
+ self._loop = None |
426 |
+ |
427 |
+ |
428 |
+if hasattr(os, 'set_inheritable'): |
429 |
+ # Python 3.4 and newer |
430 |
+ _set_inheritable = os.set_inheritable |
431 |
+else: |
432 |
+ def _set_inheritable(fd, inheritable): |
433 |
+ cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1) |
434 |
+ |
435 |
+ old = fcntl.fcntl(fd, fcntl.F_GETFD) |
436 |
+ if not inheritable: |
437 |
+ fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag) |
438 |
+ else: |
439 |
+ fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag) |
440 |
+ |
441 |
+ |
442 |
class _UnixSubprocessTransport(_BaseSubprocessTransport): |
443 |
""" |
444 |
This is identical to the standard library's private |
445 |
- asyncio.unix_events._UnixSubprocessTransport class, except that |
446 |
- subprocess.PIPE is not implemented for stdin, since that would |
447 |
- require connect_write_pipe support in the event loop. For example, |
448 |
- see the asyncio.unix_events._UnixWritePipeTransport class. |
449 |
+ asyncio.unix_events._UnixSubprocessTransport class, except that it |
450 |
+ only calls public AbstractEventLoop methods. |
451 |
""" |
452 |
def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): |
453 |
+ stdin_w = None |
454 |
+ if stdin == subprocess.PIPE: |
455 |
+ # Use a socket pair for stdin, since not all platforms |
456 |
+ # support selecting read events on the write end of a |
457 |
+ # socket (which we use in order to detect closing of the |
458 |
+ # other end). Notably this is needed on AIX, and works |
459 |
+ # just fine on other platforms. |
460 |
+ stdin, stdin_w = socket.socketpair() |
461 |
+ |
462 |
+ # Mark the write end of the stdin pipe as non-inheritable, |
463 |
+ # needed by close_fds=False on Python 3.3 and older |
464 |
+ # (Python 3.4 implements the PEP 446, socketpair returns |
465 |
+ # non-inheritable sockets) |
466 |
+ _set_inheritable(stdin_w.fileno(), False) |
467 |
self._proc = subprocess.Popen( |
468 |
args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, |
469 |
universal_newlines=False, bufsize=bufsize, **kwargs) |
470 |
+ if stdin_w is not None: |
471 |
+ stdin.close() |
472 |
+ self._proc.stdin = os.fdopen(stdin_w.detach(), 'wb', bufsize) |
473 |
|
474 |
|
475 |
class AbstractChildWatcher(_AbstractChildWatcher): |
476 |
-- |
477 |
2.13.6 |