Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] Add asyncio.create_subprocess_exec support for python2 (bug 662388)
Date: Sun, 29 Jul 2018 03:50:18
Message-Id: 20180729034557.28791-1-zmedico@gentoo.org
1 The asyncio.create_subprocess_exec function is essential for
2 using subprocesses in coroutines, so add a compatible
3 implementation of it for python2. This paves the way for extensive
4 use of coroutines in portage, since coroutines are well-suited for
5 many portage tasks that involve subprocesses.
6
7 Bug: https://bugs.gentoo.org/662388
8 ---
9 .../util/futures/asyncio/test_subprocess_exec.py | 116 +++++++++------------
10 lib/portage/util/futures/_asyncio/__init__.py | 52 +++++++++
11 lib/portage/util/futures/_asyncio/process.py | 82 +++++++++++++++
12 lib/portage/util/futures/_asyncio/streams.py | 51 +++++++++
13 4 files changed, 237 insertions(+), 64 deletions(-)
14 create mode 100644 lib/portage/util/futures/_asyncio/process.py
15 create mode 100644 lib/portage/util/futures/_asyncio/streams.py
16
17 diff --git a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
18 index 5a812ba6a..15adfbfd6 100644
19 --- a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
20 +++ b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
21 @@ -3,61 +3,17 @@
22
23 import os
24 import subprocess
25 -
26 -try:
27 - from asyncio import create_subprocess_exec
28 -except ImportError:
29 - create_subprocess_exec = None
30 +import sys
31
32 from portage.process import find_binary
33 from portage.tests import TestCase
34 from portage.util._eventloop.global_event_loop import global_event_loop
35 from portage.util.futures import asyncio
36 +from portage.util.futures._asyncio import create_subprocess_exec
37 +from portage.util.futures._asyncio.streams import _reader as reader
38 +from portage.util.futures.compat_coroutine import coroutine, coroutine_return
39 from portage.util.futures.executor.fork import ForkExecutor
40 from portage.util.futures.unix_events import DefaultEventLoopPolicy
41 -from _emerge.PipeReader import PipeReader
42 -
43 -
44 -def reader(input_file, loop=None):
45 - """
46 - Asynchronously read a binary input file.
47 -
48 - @param input_file: binary input file
49 - @type input_file: file
50 - @param loop: event loop
51 - @type loop: EventLoop
52 - @return: bytes
53 - @rtype: asyncio.Future (or compatible)
54 - """
55 - loop = asyncio._wrap_loop(loop)
56 - future = loop.create_future()
57 - _Reader(future, input_file, loop)
58 - return future
59 -
60 -
61 -class _Reader(object):
62 - def __init__(self, future, input_file, loop):
63 - self._future = future
64 - self._pipe_reader = PipeReader(
65 - input_files={'input_file':input_file}, scheduler=loop)
66 -
67 - self._future.add_done_callback(self._cancel_callback)
68 - self._pipe_reader.addExitListener(self._eof)
69 - self._pipe_reader.start()
70 -
71 - def _cancel_callback(self, future):
72 - if future.cancelled():
73 - self._cancel()
74 -
75 - def _eof(self, pipe_reader):
76 - self._pipe_reader = None
77 - self._future.set_result(pipe_reader.getvalue())
78 -
79 - def _cancel(self):
80 - if self._pipe_reader is not None and self._pipe_reader.poll() is None:
81 - self._pipe_reader.removeExitListener(self._eof)
82 - self._pipe_reader.cancel()
83 - self._pipe_reader = None
84
85
86 class SubprocessExecTestCase(TestCase):
87 @@ -76,9 +32,6 @@ class SubprocessExecTestCase(TestCase):
88 self.assertFalse(global_event_loop().is_closed())
89
90 def testEcho(self):
91 - if create_subprocess_exec is None:
92 - self.skipTest('create_subprocess_exec not implemented for python2')
93 -
94 args_tuple = (b'hello', b'world')
95 echo_binary = find_binary("echo")
96 self.assertNotEqual(echo_binary, None)
97 @@ -98,7 +51,8 @@ class SubprocessExecTestCase(TestCase):
98 proc = loop.run_until_complete(
99 create_subprocess_exec(
100 echo_binary, *args_tuple,
101 - stdin=devnull, stdout=stdout_pw, stderr=stdout_pw))
102 + stdin=devnull, stdout=stdout_pw, stderr=stdout_pw,
103 + loop=loop))
104
105 # This belongs exclusively to the subprocess now.
106 stdout_pw.close()
107 @@ -110,6 +64,41 @@ class SubprocessExecTestCase(TestCase):
108 loop.run_until_complete(proc.wait()), os.EX_OK)
109 self.assertEqual(
110 tuple(loop.run_until_complete(output).split()), args_tuple)
111 +
112 + @coroutine
113 + def test_coroutine(loop=None):
114 + proc = (yield create_subprocess_exec(echo_binary, *args_tuple,
115 + stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
116 + loop=loop))
117 +
118 + out, err = (yield proc.communicate())
119 + self.assertEqual(tuple(out.split()), args_tuple)
120 + self.assertEqual(proc.returncode, os.EX_OK)
121 +
122 + proc = (yield create_subprocess_exec(
123 + 'bash', '-c', 'echo foo; echo bar 1>&2;',
124 + stdout=subprocess.PIPE, stderr=subprocess.PIPE,
125 + loop=loop))
126 +
127 + out, err = (yield proc.communicate())
128 + self.assertEqual(out, b'foo\n')
129 + self.assertEqual(err, b'bar\n')
130 + self.assertEqual(proc.returncode, os.EX_OK)
131 +
132 + proc = (yield create_subprocess_exec(
133 + 'bash', '-c', 'echo foo; echo bar 1>&2;',
134 + stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
135 + loop=loop))
136 +
137 + out, err = (yield proc.communicate())
138 + self.assertEqual(out, b'foo\nbar\n')
139 + self.assertEqual(err, None)
140 + self.assertEqual(proc.returncode, os.EX_OK)
141 +
142 + coroutine_return('success')
143 +
144 + self.assertEqual('success',
145 + loop.run_until_complete(test_coroutine(loop=loop)))
146 finally:
147 if output is not None and not output.done():
148 output.cancel()
149 @@ -119,9 +108,6 @@ class SubprocessExecTestCase(TestCase):
150 self._run_test(test)
151
152 def testCat(self):
153 - if create_subprocess_exec is None:
154 - self.skipTest('create_subprocess_exec not implemented for python2')
155 -
156 stdin_data = b'hello world'
157 cat_binary = find_binary("cat")
158 self.assertNotEqual(cat_binary, None)
159 @@ -143,9 +129,9 @@ class SubprocessExecTestCase(TestCase):
160 output = None
161 try:
162 proc = loop.run_until_complete(
163 - create_subprocess_exec(
164 - cat_binary,
165 - stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw))
166 + create_subprocess_exec(cat_binary,
167 + stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw,
168 + loop=loop))
169
170 # These belong exclusively to the subprocess now.
171 stdout_pw.close()
172 @@ -178,8 +164,8 @@ class SubprocessExecTestCase(TestCase):
173 requires an AbstractEventLoop.connect_read_pipe implementation
174 (and a ReadTransport implementation for it to return).
175 """
176 - if create_subprocess_exec is None:
177 - self.skipTest('create_subprocess_exec not implemented for python2')
178 + if sys.version_info.major < 3:
179 + self.skipTest('ReadTransport not implemented for python2')
180
181 args_tuple = (b'hello', b'world')
182 echo_binary = find_binary("echo")
183 @@ -192,7 +178,8 @@ class SubprocessExecTestCase(TestCase):
184 create_subprocess_exec(
185 echo_binary, *args_tuple,
186 stdin=devnull,
187 - stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
188 + stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
189 + loop=loop))
190
191 self.assertEqual(
192 tuple(loop.run_until_complete(proc.stdout.read()).split()),
193 @@ -207,8 +194,8 @@ class SubprocessExecTestCase(TestCase):
194 requires an AbstractEventLoop.connect_write_pipe implementation
195 (and a WriteTransport implementation for it to return).
196 """
197 - if create_subprocess_exec is None:
198 - self.skipTest('create_subprocess_exec not implemented for python2')
199 + if sys.version_info.major < 3:
200 + self.skipTest('WriteTransport not implemented for python2')
201
202 stdin_data = b'hello world'
203 cat_binary = find_binary("cat")
204 @@ -220,7 +207,8 @@ class SubprocessExecTestCase(TestCase):
205 create_subprocess_exec(
206 cat_binary,
207 stdin=subprocess.PIPE,
208 - stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
209 + stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
210 + loop=loop))
211
212 # This buffers data when necessary to avoid blocking.
213 proc.stdin.write(stdin_data)
214 diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py
215 index acfd59396..e3f678086 100644
216 --- a/lib/portage/util/futures/_asyncio/__init__.py
217 +++ b/lib/portage/util/futures/_asyncio/__init__.py
218 @@ -20,6 +20,9 @@ __all__ = (
219 'wait',
220 )
221
222 +import subprocess
223 +import sys
224 +
225 try:
226 import asyncio as _real_asyncio
227 except ImportError:
228 @@ -45,6 +48,7 @@ from portage.util.futures.futures import (
229 InvalidStateError,
230 TimeoutError,
231 )
232 +from portage.util.futures._asyncio.process import _Process
233 from portage.util.futures._asyncio.tasks import (
234 ALL_COMPLETED,
235 FIRST_COMPLETED,
236 @@ -105,6 +109,48 @@ def set_child_watcher(watcher):
237 return get_event_loop_policy().set_child_watcher(watcher)
238
239
240 +def create_subprocess_exec(*args, **kwargs):
241 + """
242 + Create a subprocess.
243 +
244 + @param args: program and arguments
245 + @type args: str
246 + @param stdin: stdin file descriptor
247 + @type stdin: file or int
248 + @param stdout: stdout file descriptor
249 + @type stdout: file or int
250 + @param stderr: stderr file descriptor
251 + @type stderr: file or int
252 + @param close_fds: close file descriptors
253 + @type close_fds: bool
254 + @param loop: asyncio.AbstractEventLoop (or compatible)
255 + @type loop: event loop
256 + @type kwargs: varies
257 + @param kwargs: subprocess.Popen parameters
258 + @rtype: asyncio.Future (or compatible)
259 + @return: subset of asyncio.subprocess.Process interface
260 + """
261 + loop = _wrap_loop(kwargs.pop('loop', None))
262 + if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop):
263 + # Use the real asyncio loop and create_subprocess_exec.
264 + return _real_asyncio.create_subprocess_exec(*args, loop=loop._loop, **kwargs)
265 +
266 + if sys.version_info < (3, 4):
267 + # Python 3.4 and later implement PEP 446, which makes newly
268 + # created file descriptors non-inheritable by default.
269 + kwargs.setdefault('close_fds', True)
270 +
271 + result = loop.create_future()
272 +
273 + result.set_result(_Process(subprocess.Popen(
274 + args,
275 + stdin=kwargs.pop('stdin', None),
276 + stdout=kwargs.pop('stdout', None),
277 + stderr=kwargs.pop('stderr', None), **kwargs), loop))
278 +
279 + return result
280 +
281 +
282 class Task(Future):
283 """
284 Schedule the execution of a coroutine: wrap it in a future. A task
285 @@ -127,6 +173,12 @@ def ensure_future(coro_or_future, loop=None):
286 @rtype: asyncio.Future (or compatible)
287 @return: an instance of Future
288 """
289 + loop = _wrap_loop(loop)
290 + if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop):
291 + # Use the real asyncio loop and ensure_future.
292 + return _real_asyncio.ensure_future(
293 + coro_or_future, loop=loop._loop)
294 +
295 if isinstance(coro_or_future, Future):
296 return coro_or_future
297 raise NotImplementedError
298 diff --git a/lib/portage/util/futures/_asyncio/process.py b/lib/portage/util/futures/_asyncio/process.py
299 new file mode 100644
300 index 000000000..3e448da9c
301 --- /dev/null
302 +++ b/lib/portage/util/futures/_asyncio/process.py
303 @@ -0,0 +1,82 @@
304 +# Copyright 2018 Gentoo Foundation
305 +# Distributed under the terms of the GNU General Public License v2
306 +
307 +import portage
308 +portage.proxy.lazyimport.lazyimport(globals(),
309 + 'portage.util.futures:asyncio',
310 +)
311 +from portage.util.futures._asyncio.streams import _reader
312 +
313 +
314 +class _Process(object):
315 + """
316 + Emulate a subset of the asyncio.subprocess.Process interface,
317 + for python2.
318 + """
319 + def __init__(self, proc, loop):
320 + """
321 + @param proc: process instance
322 + @type proc: subprocess.Popen
323 + @param loop: asyncio.AbstractEventLoop (or compatible)
324 + @type loop: event loop
325 + """
326 + self._proc = proc
327 + self._loop = loop
328 + self.terminate = proc.terminate
329 + self.kill = proc.kill
330 + self.send_signal = proc.send_signal
331 + self.pid = proc.pid
332 + self._waiters = []
333 + loop._asyncio_child_watcher.\
334 + add_child_handler(self.pid, self._proc_exit)
335 +
336 + @property
337 + def returncode(self):
338 + return self._proc.returncode
339 +
340 + def communicate(self):
341 + """
342 + Read data from stdout and stderr, until end-of-file is reached.
343 + Wait for process to terminate.
344 +
345 + @return: tuple (stdout_data, stderr_data)
346 + @rtype: asyncio.Future (or compatible)
347 + """
348 + futures = []
349 + for input_file in (self._proc.stdout, self._proc.stderr):
350 + if input_file is None:
351 + future = self._loop.create_future()
352 + future.set_result(None)
353 + else:
354 + future = _reader(input_file, loop=self._loop)
355 + futures.append(future)
356 +
357 + result = self._loop.create_future()
358 + asyncio.ensure_future(asyncio.wait(futures + [self.wait()], loop=self._loop),
359 + loop=self._loop).add_done_callback(
360 + lambda waiter: None if result.cancelled() else
361 + result.set_result(tuple(future.result() for future in futures)))
362 + return result
363 +
364 + def wait(self):
365 + """
366 + Wait for child process to terminate. Set and return returncode attribute.
367 +
368 + @return: returncode
369 + @rtype: asyncio.Future (or compatible)
370 + """
371 + waiter = self._loop.create_future()
372 + if self.returncode is None:
373 + self._waiters.append(waiter)
374 + waiter.add_done_callback(lambda waiter: self._waiters.remove(waiter)
375 + if waiter.cancelled() else None)
376 + else:
377 + waiter.set_result(self.returncode)
378 + return waiter
379 +
380 + def _proc_exit(self, pid, returncode):
381 + self._proc.returncode = returncode
382 + waiters = self._waiters
383 + self._waiters = []
384 + for waiter in waiters:
385 + waiter.set_result(returncode)
386 diff --git a/lib/portage/util/futures/_asyncio/streams.py b/lib/portage/util/futures/_asyncio/streams.py
387 new file mode 100644
388 index 000000000..95d626dea
389 --- /dev/null
390 +++ b/lib/portage/util/futures/_asyncio/streams.py
391 @@ -0,0 +1,51 @@
392 +# Copyright 2018 Gentoo Foundation
393 +# Distributed under the terms of the GNU General Public License v2
394 +
395 +import portage
396 +portage.proxy.lazyimport.lazyimport(globals(),
397 + '_emerge.PipeReader:PipeReader',
398 + 'portage.util.futures:asyncio',
399 +)
400 +
401 +
402 +def _reader(input_file, loop=None):
403 + """
404 + Asynchronously read a binary input file, and close it when
405 + it reaches EOF.
406 +
407 + @param input_file: binary input file descriptor
408 + @type input_file: file or int
409 + @param loop: asyncio.AbstractEventLoop (or compatible)
410 + @type loop: event loop
411 + @return: bytes
412 + @rtype: asyncio.Future (or compatible)
413 + """
414 + loop = asyncio._wrap_loop(loop)
415 + future = loop.create_future()
416 + _Reader(future, input_file, loop)
417 + return future
418 +
419 +
420 +class _Reader(object):
421 + def __init__(self, future, input_file, loop):
422 + self._future = future
423 + self._pipe_reader = PipeReader(
424 + input_files={'input_file':input_file}, scheduler=loop)
425 +
426 + self._future.add_done_callback(self._cancel_callback)
427 + self._pipe_reader.addExitListener(self._eof)
428 + self._pipe_reader.start()
429 +
430 + def _cancel_callback(self, future):
431 + if future.cancelled():
432 + self._cancel()
433 +
434 + def _eof(self, pipe_reader):
435 + self._pipe_reader = None
436 + self._future.set_result(pipe_reader.getvalue())
437 +
438 + def _cancel(self):
439 + if self._pipe_reader is not None and self._pipe_reader.poll() is None:
440 + self._pipe_reader.removeExitListener(self._eof)
441 + self._pipe_reader.cancel()
442 + self._pipe_reader = None
443 --
444 2.16.4