1 |
The asyncio.create_subprocess_exec function is essential for |
2 |
using subprocesses in coroutines, so add support to do this |
3 |
for python2. This paves the way for extensive use of coroutines |
4 |
in portage, since coroutines are well-suited for many portage |
5 |
tasks that involve subprocesses. |
6 |
|
7 |
Bug: https://bugs.gentoo.org/662388 |
8 |
--- |
9 |
.../util/futures/asyncio/test_subprocess_exec.py | 116 +++++++++------------ |
10 |
lib/portage/util/futures/_asyncio/__init__.py | 52 +++++++++ |
11 |
lib/portage/util/futures/_asyncio/process.py | 82 +++++++++++++++ |
12 |
lib/portage/util/futures/_asyncio/streams.py | 51 +++++++++ |
13 |
4 files changed, 237 insertions(+), 64 deletions(-) |
14 |
create mode 100644 lib/portage/util/futures/_asyncio/process.py |
15 |
create mode 100644 lib/portage/util/futures/_asyncio/streams.py |
16 |
|
17 |
diff --git a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
18 |
index 5a812ba6a..15adfbfd6 100644 |
19 |
--- a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
20 |
+++ b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
21 |
@@ -3,61 +3,17 @@ |
22 |
|
23 |
import os |
24 |
import subprocess |
25 |
- |
26 |
-try: |
27 |
- from asyncio import create_subprocess_exec |
28 |
-except ImportError: |
29 |
- create_subprocess_exec = None |
30 |
+import sys |
31 |
|
32 |
from portage.process import find_binary |
33 |
from portage.tests import TestCase |
34 |
from portage.util._eventloop.global_event_loop import global_event_loop |
35 |
from portage.util.futures import asyncio |
36 |
+from portage.util.futures._asyncio import create_subprocess_exec |
37 |
+from portage.util.futures._asyncio.streams import _reader as reader |
38 |
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return |
39 |
from portage.util.futures.executor.fork import ForkExecutor |
40 |
from portage.util.futures.unix_events import DefaultEventLoopPolicy |
41 |
-from _emerge.PipeReader import PipeReader |
42 |
- |
43 |
- |
44 |
-def reader(input_file, loop=None): |
45 |
- """ |
46 |
- Asynchronously read a binary input file. |
47 |
- |
48 |
- @param input_file: binary input file |
49 |
- @type input_file: file |
50 |
- @param loop: event loop |
51 |
- @type loop: EventLoop |
52 |
- @return: bytes |
53 |
- @rtype: asyncio.Future (or compatible) |
54 |
- """ |
55 |
- loop = asyncio._wrap_loop(loop) |
56 |
- future = loop.create_future() |
57 |
- _Reader(future, input_file, loop) |
58 |
- return future |
59 |
- |
60 |
- |
61 |
-class _Reader(object): |
62 |
- def __init__(self, future, input_file, loop): |
63 |
- self._future = future |
64 |
- self._pipe_reader = PipeReader( |
65 |
- input_files={'input_file':input_file}, scheduler=loop) |
66 |
- |
67 |
- self._future.add_done_callback(self._cancel_callback) |
68 |
- self._pipe_reader.addExitListener(self._eof) |
69 |
- self._pipe_reader.start() |
70 |
- |
71 |
- def _cancel_callback(self, future): |
72 |
- if future.cancelled(): |
73 |
- self._cancel() |
74 |
- |
75 |
- def _eof(self, pipe_reader): |
76 |
- self._pipe_reader = None |
77 |
- self._future.set_result(pipe_reader.getvalue()) |
78 |
- |
79 |
- def _cancel(self): |
80 |
- if self._pipe_reader is not None and self._pipe_reader.poll() is None: |
81 |
- self._pipe_reader.removeExitListener(self._eof) |
82 |
- self._pipe_reader.cancel() |
83 |
- self._pipe_reader = None |
84 |
|
85 |
|
86 |
class SubprocessExecTestCase(TestCase): |
87 |
@@ -76,9 +32,6 @@ class SubprocessExecTestCase(TestCase): |
88 |
self.assertFalse(global_event_loop().is_closed()) |
89 |
|
90 |
def testEcho(self): |
91 |
- if create_subprocess_exec is None: |
92 |
- self.skipTest('create_subprocess_exec not implemented for python2') |
93 |
- |
94 |
args_tuple = (b'hello', b'world') |
95 |
echo_binary = find_binary("echo") |
96 |
self.assertNotEqual(echo_binary, None) |
97 |
@@ -98,7 +51,8 @@ class SubprocessExecTestCase(TestCase): |
98 |
proc = loop.run_until_complete( |
99 |
create_subprocess_exec( |
100 |
echo_binary, *args_tuple, |
101 |
- stdin=devnull, stdout=stdout_pw, stderr=stdout_pw)) |
102 |
+ stdin=devnull, stdout=stdout_pw, stderr=stdout_pw, |
103 |
+ loop=loop)) |
104 |
|
105 |
# This belongs exclusively to the subprocess now. |
106 |
stdout_pw.close() |
107 |
@@ -110,6 +64,41 @@ class SubprocessExecTestCase(TestCase): |
108 |
loop.run_until_complete(proc.wait()), os.EX_OK) |
109 |
self.assertEqual( |
110 |
tuple(loop.run_until_complete(output).split()), args_tuple) |
111 |
+ |
112 |
+ @coroutine |
113 |
+ def test_coroutine(loop=None): |
114 |
+ proc = (yield create_subprocess_exec(echo_binary, *args_tuple, |
115 |
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
116 |
+ loop=loop)) |
117 |
+ |
118 |
+ out, err = (yield proc.communicate()) |
119 |
+ self.assertEqual(tuple(out.split()), args_tuple) |
120 |
+ self.assertEqual(proc.returncode, os.EX_OK) |
121 |
+ |
122 |
+ proc = (yield create_subprocess_exec( |
123 |
+ 'bash', '-c', 'echo foo; echo bar 1>&2;', |
124 |
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE, |
125 |
+ loop=loop)) |
126 |
+ |
127 |
+ out, err = (yield proc.communicate()) |
128 |
+ self.assertEqual(out, b'foo\n') |
129 |
+ self.assertEqual(err, b'bar\n') |
130 |
+ self.assertEqual(proc.returncode, os.EX_OK) |
131 |
+ |
132 |
+ proc = (yield create_subprocess_exec( |
133 |
+ 'bash', '-c', 'echo foo; echo bar 1>&2;', |
134 |
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
135 |
+ loop=loop)) |
136 |
+ |
137 |
+ out, err = (yield proc.communicate()) |
138 |
+ self.assertEqual(out, b'foo\nbar\n') |
139 |
+ self.assertEqual(err, None) |
140 |
+ self.assertEqual(proc.returncode, os.EX_OK) |
141 |
+ |
142 |
+ coroutine_return('success') |
143 |
+ |
144 |
+ self.assertEqual('success', |
145 |
+ loop.run_until_complete(test_coroutine(loop=loop))) |
146 |
finally: |
147 |
if output is not None and not output.done(): |
148 |
output.cancel() |
149 |
@@ -119,9 +108,6 @@ class SubprocessExecTestCase(TestCase): |
150 |
self._run_test(test) |
151 |
|
152 |
def testCat(self): |
153 |
- if create_subprocess_exec is None: |
154 |
- self.skipTest('create_subprocess_exec not implemented for python2') |
155 |
- |
156 |
stdin_data = b'hello world' |
157 |
cat_binary = find_binary("cat") |
158 |
self.assertNotEqual(cat_binary, None) |
159 |
@@ -143,9 +129,9 @@ class SubprocessExecTestCase(TestCase): |
160 |
output = None |
161 |
try: |
162 |
proc = loop.run_until_complete( |
163 |
- create_subprocess_exec( |
164 |
- cat_binary, |
165 |
- stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw)) |
166 |
+ create_subprocess_exec(cat_binary, |
167 |
+ stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw, |
168 |
+ loop=loop)) |
169 |
|
170 |
# These belong exclusively to the subprocess now. |
171 |
stdout_pw.close() |
172 |
@@ -178,8 +164,8 @@ class SubprocessExecTestCase(TestCase): |
173 |
requires an AbstractEventLoop.connect_read_pipe implementation |
174 |
(and a ReadTransport implementation for it to return). |
175 |
""" |
176 |
- if create_subprocess_exec is None: |
177 |
- self.skipTest('create_subprocess_exec not implemented for python2') |
178 |
+ if sys.version_info.major < 3: |
179 |
+ self.skipTest('ReadTransport not implemented for python2') |
180 |
|
181 |
args_tuple = (b'hello', b'world') |
182 |
echo_binary = find_binary("echo") |
183 |
@@ -192,7 +178,8 @@ class SubprocessExecTestCase(TestCase): |
184 |
create_subprocess_exec( |
185 |
echo_binary, *args_tuple, |
186 |
stdin=devnull, |
187 |
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) |
188 |
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
189 |
+ loop=loop)) |
190 |
|
191 |
self.assertEqual( |
192 |
tuple(loop.run_until_complete(proc.stdout.read()).split()), |
193 |
@@ -207,8 +194,8 @@ class SubprocessExecTestCase(TestCase): |
194 |
requires an AbstractEventLoop.connect_write_pipe implementation |
195 |
(and a WriteTransport implementation for it to return). |
196 |
""" |
197 |
- if create_subprocess_exec is None: |
198 |
- self.skipTest('create_subprocess_exec not implemented for python2') |
199 |
+ if sys.version_info.major < 3: |
200 |
+ self.skipTest('WriteTransport not implemented for python2') |
201 |
|
202 |
stdin_data = b'hello world' |
203 |
cat_binary = find_binary("cat") |
204 |
@@ -220,7 +207,8 @@ class SubprocessExecTestCase(TestCase): |
205 |
create_subprocess_exec( |
206 |
cat_binary, |
207 |
stdin=subprocess.PIPE, |
208 |
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) |
209 |
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
210 |
+ loop=loop)) |
211 |
|
212 |
# This buffers data when necessary to avoid blocking. |
213 |
proc.stdin.write(stdin_data) |
214 |
diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py |
215 |
index acfd59396..e3f678086 100644 |
216 |
--- a/lib/portage/util/futures/_asyncio/__init__.py |
217 |
+++ b/lib/portage/util/futures/_asyncio/__init__.py |
218 |
@@ -20,6 +20,9 @@ __all__ = ( |
219 |
'wait', |
220 |
) |
221 |
|
222 |
+import subprocess |
223 |
+import sys |
224 |
+ |
225 |
try: |
226 |
import asyncio as _real_asyncio |
227 |
except ImportError: |
228 |
@@ -45,6 +48,7 @@ from portage.util.futures.futures import ( |
229 |
InvalidStateError, |
230 |
TimeoutError, |
231 |
) |
232 |
+from portage.util.futures._asyncio.process import _Process |
233 |
from portage.util.futures._asyncio.tasks import ( |
234 |
ALL_COMPLETED, |
235 |
FIRST_COMPLETED, |
236 |
@@ -105,6 +109,48 @@ def set_child_watcher(watcher): |
237 |
return get_event_loop_policy().set_child_watcher(watcher) |
238 |
|
239 |
|
240 |
+def create_subprocess_exec(*args, **kwargs): |
241 |
+ """ |
242 |
+ Create a subprocess. |
243 |
+ |
244 |
+ @param args: program and arguments |
245 |
+ @type args: str |
246 |
+ @param stdin: stdin file descriptor |
247 |
+ @type stdin: file or int |
248 |
+ @param stdout: stdout file descriptor |
249 |
+ @type stdout: file or int |
250 |
+ @param stderr: stderr file descriptor |
251 |
+ @type stderr: file or int |
252 |
+ @param close_fds: close file descriptors |
253 |
+ @type close_fds: bool |
254 |
+ @param loop: asyncio.AbstractEventLoop (or compatible) |
255 |
+ @type loop: event loop |
256 |
+ @type kwargs: varies |
257 |
+ @param kwargs: subprocess.Popen parameters |
258 |
+ @rtype: asyncio.Future (or compatible) |
259 |
+ @return: subset of asyncio.subprocess.Process interface |
260 |
+ """ |
261 |
+ loop = _wrap_loop(kwargs.pop('loop', None)) |
262 |
+ if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop): |
263 |
+ # Use the real asyncio loop and create_subprocess_exec. |
264 |
+ return _real_asyncio.create_subprocess_exec(*args, loop=loop._loop, **kwargs) |
265 |
+ |
266 |
+ if sys.version_info < (3, 4): |
267 |
+ # Python 3.4 and later implement PEP 446, which makes newly |
268 |
+ # created file descriptors non-inheritable by default. |
269 |
+ kwargs.setdefault('close_fds', True) |
270 |
+ |
271 |
+ result = loop.create_future() |
272 |
+ |
273 |
+ result.set_result(_Process(subprocess.Popen( |
274 |
+ args, |
275 |
+ stdin=kwargs.pop('stdin', None), |
276 |
+ stdout=kwargs.pop('stdout', None), |
277 |
+ stderr=kwargs.pop('stderr', None), **kwargs), loop)) |
278 |
+ |
279 |
+ return result |
280 |
+ |
281 |
+ |
282 |
class Task(Future): |
283 |
""" |
284 |
Schedule the execution of a coroutine: wrap it in a future. A task |
285 |
@@ -127,6 +173,12 @@ def ensure_future(coro_or_future, loop=None): |
286 |
@rtype: asyncio.Future (or compatible) |
287 |
@return: an instance of Future |
288 |
""" |
289 |
+ loop = _wrap_loop(loop) |
290 |
+ if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop): |
291 |
+ # Use the real asyncio loop and ensure_future. |
292 |
+ return _real_asyncio.ensure_future( |
293 |
+ coro_or_future, loop=loop._loop) |
294 |
+ |
295 |
if isinstance(coro_or_future, Future): |
296 |
return coro_or_future |
297 |
raise NotImplementedError |
298 |
diff --git a/lib/portage/util/futures/_asyncio/process.py b/lib/portage/util/futures/_asyncio/process.py |
299 |
new file mode 100644 |
300 |
index 000000000..3e448da9c |
301 |
--- /dev/null |
302 |
+++ b/lib/portage/util/futures/_asyncio/process.py |
303 |
@@ -0,0 +1,82 @@ |
304 |
+# Copyright 2018 Gentoo Foundation |
305 |
+# Distributed under the terms of the GNU General Public License v2 |
306 |
+ |
307 |
+import portage |
308 |
+portage.proxy.lazyimport.lazyimport(globals(), |
309 |
+ 'portage.util.futures:asyncio', |
310 |
+) |
311 |
+from portage.util.futures._asyncio.streams import _reader |
312 |
+ |
313 |
+ |
314 |
+class _Process(object): |
315 |
+ """ |
316 |
+ Emulate a subset of the asyncio.subprocess.Process interface, |
317 |
+ for python2. |
318 |
+ """ |
319 |
+ def __init__(self, proc, loop): |
320 |
+ """ |
321 |
+ @param proc: process instance |
322 |
+ @type proc: subprocess.Popen |
323 |
+ @param loop: asyncio.AbstractEventLoop (or compatible) |
324 |
+ @type loop: event loop |
325 |
+ """ |
326 |
+ self._proc = proc |
327 |
+ self._loop = loop |
328 |
+ self.terminate = proc.terminate |
329 |
+ self.kill = proc.kill |
330 |
+ self.send_signal = proc.send_signal |
331 |
+ self.pid = proc.pid |
332 |
+ self._waiters = [] |
333 |
+ loop._asyncio_child_watcher.\ |
334 |
+ add_child_handler(self.pid, self._proc_exit) |
335 |
+ |
336 |
+ @property |
337 |
+ def returncode(self): |
338 |
+ return self._proc.returncode |
339 |
+ |
340 |
+ def communicate(self): |
341 |
+ """ |
342 |
+ Read data from stdout and stderr, until end-of-file is reached. |
343 |
+ Wait for process to terminate. |
344 |
+ |
345 |
+ @return: tuple (stdout_data, stderr_data) |
346 |
+ @rtype: asyncio.Future (or compatible) |
347 |
+ """ |
348 |
+ futures = [] |
349 |
+ for input_file in (self._proc.stdout, self._proc.stderr): |
350 |
+ if input_file is None: |
351 |
+ future = self._loop.create_future() |
352 |
+ future.set_result(None) |
353 |
+ else: |
354 |
+ future = _reader(input_file, loop=self._loop) |
355 |
+ futures.append(future) |
356 |
+ |
357 |
+ result = self._loop.create_future() |
358 |
+ asyncio.ensure_future(asyncio.wait(futures + [self.wait()], loop=self._loop), |
359 |
+ loop=self._loop).add_done_callback( |
360 |
+ lambda waiter: None if result.cancelled() else |
361 |
+ result.set_result(tuple(future.result() for future in futures))) |
362 |
+ return result |
363 |
+ |
364 |
+ def wait(self): |
365 |
+ """ |
366 |
+ Wait for child process to terminate. Set and return returncode attribute. |
367 |
+ |
368 |
+ @return: returncode |
369 |
+ @rtype: asyncio.Future (or compatible) |
370 |
+ """ |
371 |
+ waiter = self._loop.create_future() |
372 |
+ if self.returncode is None: |
373 |
+ self._waiters.append(waiter) |
374 |
+ waiter.add_done_callback(lambda waiter: self._waiters.remove(waiter) |
375 |
+ if waiter.cancelled() else None) |
376 |
+ else: |
377 |
+ waiter.set_result(self.returncode) |
378 |
+ return waiter |
379 |
+ |
380 |
+ def _proc_exit(self, pid, returncode): |
381 |
+ self._proc.returncode = returncode |
382 |
+ waiters = self._waiters |
383 |
+ self._waiters = [] |
384 |
+ for waiter in waiters: |
385 |
+ waiter.set_result(returncode) |
386 |
diff --git a/lib/portage/util/futures/_asyncio/streams.py b/lib/portage/util/futures/_asyncio/streams.py |
387 |
new file mode 100644 |
388 |
index 000000000..95d626dea |
389 |
--- /dev/null |
390 |
+++ b/lib/portage/util/futures/_asyncio/streams.py |
391 |
@@ -0,0 +1,51 @@ |
392 |
+# Copyright 2018 Gentoo Foundation |
393 |
+# Distributed under the terms of the GNU General Public License v2 |
394 |
+ |
395 |
+import portage |
396 |
+portage.proxy.lazyimport.lazyimport(globals(), |
397 |
+ '_emerge.PipeReader:PipeReader', |
398 |
+ 'portage.util.futures:asyncio', |
399 |
+) |
400 |
+ |
401 |
+ |
402 |
+def _reader(input_file, loop=None): |
403 |
+ """ |
404 |
+ Asynchronously read a binary input file, and close it when |
405 |
+ it reaches EOF. |
406 |
+ |
407 |
+ @param input_file: binary input file descriptor |
408 |
+ @type input_file: file or int |
409 |
+ @param loop: asyncio.AbstractEventLoop (or compatible) |
410 |
+ @type loop: event loop |
411 |
+ @return: bytes |
412 |
+ @rtype: asyncio.Future (or compatible) |
413 |
+ """ |
414 |
+ loop = asyncio._wrap_loop(loop) |
415 |
+ future = loop.create_future() |
416 |
+ _Reader(future, input_file, loop) |
417 |
+ return future |
418 |
+ |
419 |
+ |
420 |
+class _Reader(object): |
421 |
+ def __init__(self, future, input_file, loop): |
422 |
+ self._future = future |
423 |
+ self._pipe_reader = PipeReader( |
424 |
+ input_files={'input_file':input_file}, scheduler=loop) |
425 |
+ |
426 |
+ self._future.add_done_callback(self._cancel_callback) |
427 |
+ self._pipe_reader.addExitListener(self._eof) |
428 |
+ self._pipe_reader.start() |
429 |
+ |
430 |
+ def _cancel_callback(self, future): |
431 |
+ if future.cancelled(): |
432 |
+ self._cancel() |
433 |
+ |
434 |
+ def _eof(self, pipe_reader): |
435 |
+ self._pipe_reader = None |
436 |
+ self._future.set_result(pipe_reader.getvalue()) |
437 |
+ |
438 |
+ def _cancel(self): |
439 |
+ if self._pipe_reader is not None and self._pipe_reader.poll() is None: |
440 |
+ self._pipe_reader.removeExitListener(self._eof) |
441 |
+ self._pipe_reader.cancel() |
442 |
+ self._pipe_reader = None |
443 |
-- |
444 |
2.16.4 |