Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: lib/portage/util/futures/_asyncio/, lib/portage/util/futures/, ...
Date: Mon, 06 Aug 2018 05:01:31
Message-Id: 1533530321.6b4252d3a0f12808a5bcce888b7f68e1f84b5301.zmedico@gentoo
1 commit: 6b4252d3a0f12808a5bcce888b7f68e1f84b5301
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Sat Jul 28 21:22:42 2018 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Mon Aug 6 04:38:41 2018 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=6b4252d3
7
8 Add asyncio.create_subprocess_exec support for python2 (bug 662388)
9
10 The asyncio.create_subprocess_exec function is essential for
11 using subprocesses in coroutines, so add support to do this
12 for python2. This paves the way for extensive use of coroutines
13 in portage, since coroutines are well-suited for many portage
14 tasks that involve subprocesses.
15
16 Bug: https://bugs.gentoo.org/662388
17
18 .../util/futures/asyncio/test_subprocess_exec.py | 184 ++++++---------------
19 lib/portage/util/futures/_asyncio/__init__.py | 53 ++++++
20 lib/portage/util/futures/_asyncio/process.py | 107 ++++++++++++
21 lib/portage/util/futures/_asyncio/streams.py | 96 +++++++++++
22 lib/portage/util/futures/compat_coroutine.py | 6 +-
23 5 files changed, 315 insertions(+), 131 deletions(-)
24
25 diff --git a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
26 index 5a812ba6a..61646cb92 100644
27 --- a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
28 +++ b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py
29 @@ -3,61 +3,16 @@
30
31 import os
32 import subprocess
33 -
34 -try:
35 - from asyncio import create_subprocess_exec
36 -except ImportError:
37 - create_subprocess_exec = None
38 +import sys
39
40 from portage.process import find_binary
41 from portage.tests import TestCase
42 from portage.util._eventloop.global_event_loop import global_event_loop
43 from portage.util.futures import asyncio
44 -from portage.util.futures.executor.fork import ForkExecutor
45 +from portage.util.futures._asyncio import create_subprocess_exec
46 +from portage.util.futures._asyncio.streams import _reader as reader
47 +from portage.util.futures.compat_coroutine import coroutine, coroutine_return
48 from portage.util.futures.unix_events import DefaultEventLoopPolicy
49 -from _emerge.PipeReader import PipeReader
50 -
51 -
52 -def reader(input_file, loop=None):
53 - """
54 - Asynchronously read a binary input file.
55 -
56 - @param input_file: binary input file
57 - @type input_file: file
58 - @param loop: event loop
59 - @type loop: EventLoop
60 - @return: bytes
61 - @rtype: asyncio.Future (or compatible)
62 - """
63 - loop = asyncio._wrap_loop(loop)
64 - future = loop.create_future()
65 - _Reader(future, input_file, loop)
66 - return future
67 -
68 -
69 -class _Reader(object):
70 - def __init__(self, future, input_file, loop):
71 - self._future = future
72 - self._pipe_reader = PipeReader(
73 - input_files={'input_file':input_file}, scheduler=loop)
74 -
75 - self._future.add_done_callback(self._cancel_callback)
76 - self._pipe_reader.addExitListener(self._eof)
77 - self._pipe_reader.start()
78 -
79 - def _cancel_callback(self, future):
80 - if future.cancelled():
81 - self._cancel()
82 -
83 - def _eof(self, pipe_reader):
84 - self._pipe_reader = None
85 - self._future.set_result(pipe_reader.getvalue())
86 -
87 - def _cancel(self):
88 - if self._pipe_reader is not None and self._pipe_reader.poll() is None:
89 - self._pipe_reader.removeExitListener(self._eof)
90 - self._pipe_reader.cancel()
91 - self._pipe_reader = None
92
93
94 class SubprocessExecTestCase(TestCase):
95 @@ -76,99 +31,66 @@ class SubprocessExecTestCase(TestCase):
96 self.assertFalse(global_event_loop().is_closed())
97
98 def testEcho(self):
99 - if create_subprocess_exec is None:
100 - self.skipTest('create_subprocess_exec not implemented for python2')
101 -
102 args_tuple = (b'hello', b'world')
103 echo_binary = find_binary("echo")
104 self.assertNotEqual(echo_binary, None)
105 echo_binary = echo_binary.encode()
106
107 - # Use os.pipe(), since this loop does not implement the
108 - # ReadTransport necessary for subprocess.PIPE support.
109 - stdout_pr, stdout_pw = os.pipe()
110 - stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
111 - stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
112 - files = [stdout_pr, stdout_pw]
113 -
114 def test(loop):
115 - output = None
116 - try:
117 - with open(os.devnull, 'rb', 0) as devnull:
118 - proc = loop.run_until_complete(
119 - create_subprocess_exec(
120 - echo_binary, *args_tuple,
121 - stdin=devnull, stdout=stdout_pw, stderr=stdout_pw))
122 -
123 - # This belongs exclusively to the subprocess now.
124 - stdout_pw.close()
125 -
126 - output = asyncio.ensure_future(
127 - reader(stdout_pr, loop=loop), loop=loop)
128 -
129 - self.assertEqual(
130 - loop.run_until_complete(proc.wait()), os.EX_OK)
131 - self.assertEqual(
132 - tuple(loop.run_until_complete(output).split()), args_tuple)
133 - finally:
134 - if output is not None and not output.done():
135 - output.cancel()
136 - for f in files:
137 - f.close()
138 + @coroutine
139 + def test_coroutine(loop=None):
140
141 - self._run_test(test)
142 + proc = (yield create_subprocess_exec(echo_binary, *args_tuple,
143 + stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
144 + loop=loop))
145
146 - def testCat(self):
147 - if create_subprocess_exec is None:
148 - self.skipTest('create_subprocess_exec not implemented for python2')
149 + out, err = (yield proc.communicate())
150 + self.assertEqual(tuple(out.split()), args_tuple)
151 + self.assertEqual(proc.returncode, os.EX_OK)
152
153 - stdin_data = b'hello world'
154 - cat_binary = find_binary("cat")
155 - self.assertNotEqual(cat_binary, None)
156 - cat_binary = cat_binary.encode()
157 + proc = (yield create_subprocess_exec(
158 + 'bash', '-c', 'echo foo; echo bar 1>&2;',
159 + stdout=subprocess.PIPE, stderr=subprocess.PIPE,
160 + loop=loop))
161
162 - # Use os.pipe(), since this loop does not implement the
163 - # ReadTransport necessary for subprocess.PIPE support.
164 - stdout_pr, stdout_pw = os.pipe()
165 - stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
166 - stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
167 + out, err = (yield proc.communicate())
168 + self.assertEqual(out, b'foo\n')
169 + self.assertEqual(err, b'bar\n')
170 + self.assertEqual(proc.returncode, os.EX_OK)
171
172 - stdin_pr, stdin_pw = os.pipe()
173 - stdin_pr = os.fdopen(stdin_pr, 'rb', 0)
174 - stdin_pw = os.fdopen(stdin_pw, 'wb', 0)
175 + proc = (yield create_subprocess_exec(
176 + 'bash', '-c', 'echo foo; echo bar 1>&2;',
177 + stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
178 + loop=loop))
179
180 - files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw]
181 + out, err = (yield proc.communicate())
182 + self.assertEqual(out, b'foo\nbar\n')
183 + self.assertEqual(err, None)
184 + self.assertEqual(proc.returncode, os.EX_OK)
185
186 - def test(loop):
187 - output = None
188 - try:
189 - proc = loop.run_until_complete(
190 - create_subprocess_exec(
191 - cat_binary,
192 - stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw))
193 + coroutine_return('success')
194
195 - # These belong exclusively to the subprocess now.
196 - stdout_pw.close()
197 - stdin_pr.close()
198 + self.assertEqual('success',
199 + loop.run_until_complete(test_coroutine(loop=loop)))
200
201 - output = asyncio.ensure_future(
202 - reader(stdout_pr, loop=loop), loop=loop)
203 + self._run_test(test)
204
205 - with ForkExecutor(loop=loop) as executor:
206 - writer = asyncio.ensure_future(loop.run_in_executor(
207 - executor, stdin_pw.write, stdin_data), loop=loop)
208 + def testCat(self):
209 + stdin_data = b'hello world'
210 + cat_binary = find_binary("cat")
211 + self.assertNotEqual(cat_binary, None)
212 + cat_binary = cat_binary.encode()
213
214 - # This belongs exclusively to the writer now.
215 - stdin_pw.close()
216 - loop.run_until_complete(writer)
217 + def test(loop):
218 + proc = loop.run_until_complete(
219 + create_subprocess_exec(cat_binary,
220 + stdin=subprocess.PIPE, stdout=subprocess.PIPE,
221 + loop=loop))
222 +
223 + out, err = loop.run_until_complete(proc.communicate(input=stdin_data))
224
225 - self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
226 - self.assertEqual(loop.run_until_complete(output), stdin_data)
227 - finally:
228 - if output is not None and not output.done():
229 - output.cancel()
230 - for f in files:
231 - f.close()
232 + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
233 + self.assertEqual(out, stdin_data)
234
235 self._run_test(test)
236
237 @@ -178,8 +100,8 @@ class SubprocessExecTestCase(TestCase):
238 requires an AbstractEventLoop.connect_read_pipe implementation
239 (and a ReadTransport implementation for it to return).
240 """
241 - if create_subprocess_exec is None:
242 - self.skipTest('create_subprocess_exec not implemented for python2')
243 + if sys.version_info.major < 3:
244 + self.skipTest('ReadTransport not implemented for python2')
245
246 args_tuple = (b'hello', b'world')
247 echo_binary = find_binary("echo")
248 @@ -192,7 +114,8 @@ class SubprocessExecTestCase(TestCase):
249 create_subprocess_exec(
250 echo_binary, *args_tuple,
251 stdin=devnull,
252 - stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
253 + stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
254 + loop=loop))
255
256 self.assertEqual(
257 tuple(loop.run_until_complete(proc.stdout.read()).split()),
258 @@ -207,8 +130,8 @@ class SubprocessExecTestCase(TestCase):
259 requires an AbstractEventLoop.connect_write_pipe implementation
260 (and a WriteTransport implementation for it to return).
261 """
262 - if create_subprocess_exec is None:
263 - self.skipTest('create_subprocess_exec not implemented for python2')
264 + if sys.version_info.major < 3:
265 + self.skipTest('WriteTransport not implemented for python2')
266
267 stdin_data = b'hello world'
268 cat_binary = find_binary("cat")
269 @@ -220,7 +143,8 @@ class SubprocessExecTestCase(TestCase):
270 create_subprocess_exec(
271 cat_binary,
272 stdin=subprocess.PIPE,
273 - stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
274 + stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
275 + loop=loop))
276
277 # This buffers data when necessary to avoid blocking.
278 proc.stdin.write(stdin_data)
279
280 diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py
281 index acfd59396..faab98e47 100644
282 --- a/lib/portage/util/futures/_asyncio/__init__.py
283 +++ b/lib/portage/util/futures/_asyncio/__init__.py
284 @@ -20,6 +20,9 @@ __all__ = (
285 'wait',
286 )
287
288 +import subprocess
289 +import sys
290 +
291 try:
292 import asyncio as _real_asyncio
293 except ImportError:
294 @@ -45,6 +48,7 @@ from portage.util.futures.futures import (
295 InvalidStateError,
296 TimeoutError,
297 )
298 +from portage.util.futures._asyncio.process import _Process
299 from portage.util.futures._asyncio.tasks import (
300 ALL_COMPLETED,
301 FIRST_COMPLETED,
302 @@ -105,6 +109,49 @@ def set_child_watcher(watcher):
303 return get_event_loop_policy().set_child_watcher(watcher)
304
305
306 +# Python 3.4 and later implement PEP 446, which makes newly
307 +# created file descriptors non-inheritable by default.
308 +_close_fds_default = sys.version_info < (3, 4)
309 +
310 +
311 +def create_subprocess_exec(*args, **kwargs):
312 + """
313 + Create a subprocess.
314 +
315 + @param args: program and arguments
316 + @type args: str
317 + @param stdin: stdin file descriptor
318 + @type stdin: file or int
319 + @param stdout: stdout file descriptor
320 + @type stdout: file or int
321 + @param stderr: stderr file descriptor
322 + @type stderr: file or int
323 + @param close_fds: close file descriptors
324 + @type close_fds: bool
325 + @param loop: asyncio.AbstractEventLoop (or compatible)
326 + @type loop: event loop
327 + @type kwargs: varies
328 + @param kwargs: subprocess.Popen parameters
329 + @rtype: asyncio.Future (or compatible)
330 + @return: subset of asyncio.subprocess.Process interface
331 + """
332 + loop = _wrap_loop(kwargs.pop('loop', None))
333 + kwargs.setdefault('close_fds', _close_fds_default)
334 + if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop):
335 + # Use the real asyncio loop and create_subprocess_exec.
336 + return _real_asyncio.create_subprocess_exec(*args, loop=loop._loop, **kwargs)
337 +
338 + result = loop.create_future()
339 +
340 + result.set_result(_Process(subprocess.Popen(
341 + args,
342 + stdin=kwargs.pop('stdin', None),
343 + stdout=kwargs.pop('stdout', None),
344 + stderr=kwargs.pop('stderr', None), **kwargs), loop))
345 +
346 + return result
347 +
348 +
349 class Task(Future):
350 """
351 Schedule the execution of a coroutine: wrap it in a future. A task
352 @@ -127,6 +174,12 @@ def ensure_future(coro_or_future, loop=None):
353 @rtype: asyncio.Future (or compatible)
354 @return: an instance of Future
355 """
356 + loop = _wrap_loop(loop)
357 + if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop):
358 + # Use the real asyncio loop and ensure_future.
359 + return _real_asyncio.ensure_future(
360 + coro_or_future, loop=loop._loop)
361 +
362 if isinstance(coro_or_future, Future):
363 return coro_or_future
364 raise NotImplementedError
365
366 diff --git a/lib/portage/util/futures/_asyncio/process.py b/lib/portage/util/futures/_asyncio/process.py
367 new file mode 100644
368 index 000000000..020164c9b
369 --- /dev/null
370 +++ b/lib/portage/util/futures/_asyncio/process.py
371 @@ -0,0 +1,107 @@
372 +# Copyright 2018 Gentoo Foundation
373 +# Distributed under the terms of the GNU General Public License v2
374 +
375 +import portage
376 +portage.proxy.lazyimport.lazyimport(globals(),
377 + 'portage.util.futures:asyncio',
378 +)
379 +from portage.util.futures._asyncio.streams import _reader, _writer
380 +from portage.util.futures.compat_coroutine import coroutine, coroutine_return
381 +
382 +
383 +class _Process(object):
384 + """
385 + Emulate a subset of the asyncio.subprocess.Process interface,
386 + for python2.
387 + """
388 + def __init__(self, proc, loop):
389 + """
390 + @param proc: process instance
391 + @type proc: subprocess.Popen
392 + @param loop: asyncio.AbstractEventLoop (or compatible)
393 + @type loop: event loop
394 + """
395 + self._proc = proc
396 + self._loop = loop
397 + self.terminate = proc.terminate
398 + self.kill = proc.kill
399 + self.send_signal = proc.send_signal
400 + self.pid = proc.pid
401 + self._waiters = []
402 + loop._asyncio_child_watcher.\
403 + add_child_handler(self.pid, self._proc_exit)
404 +
405 + @property
406 + def returncode(self):
407 + return self._proc.returncode
408 +
409 + @coroutine
410 + def communicate(self, input=None):
411 + """
412 + Read data from stdout and stderr, until end-of-file is reached.
413 + Wait for process to terminate.
414 +
415 + @param input: stdin content to write
416 + @type input: bytes
417 + @return: tuple (stdout_data, stderr_data)
418 + @rtype: asyncio.Future (or compatible)
419 + """
420 + futures = []
421 + for input_file in (self._proc.stdout, self._proc.stderr):
422 + if input_file is None:
423 + future = self._loop.create_future()
424 + future.set_result(None)
425 + else:
426 + future = _reader(input_file, loop=self._loop)
427 + futures.append(future)
428 +
429 + writer = None
430 + if input is not None:
431 + if self._proc.stdin is None:
432 + raise TypeError('communicate: expected file or int, got {}'.format(type(self._proc.stdin)))
433 + writer = asyncio.ensure_future(_writer(self._proc.stdin, input), loop=self._loop)
434 +
435 + try:
436 + yield asyncio.wait(futures + [self.wait()], loop=self._loop)
437 + finally:
438 + if writer is not None:
439 + if writer.done():
440 + # Consume expected exceptions.
441 + try:
442 + writer.result()
443 + except EnvironmentError:
444 + # This is normal if the other end of the pipe was closed.
445 + pass
446 + else:
447 + writer.cancel()
448 +
449 + coroutine_return(tuple(future.result() for future in futures))
450 +
451 + def wait(self):
452 + """
453 + Wait for child process to terminate. Set and return returncode attribute.
454 +
455 + @return: returncode
456 + @rtype: asyncio.Future (or compatible)
457 + """
458 + waiter = self._loop.create_future()
459 + if self.returncode is None:
460 + self._waiters.append(waiter)
461 + waiter.add_done_callback(self._waiter_cancel)
462 + else:
463 + waiter.set_result(self.returncode)
464 + return waiter
465 +
466 + def _waiter_cancel(self, waiter):
467 + if waiter.cancelled():
468 + try:
469 + self._waiters.remove(waiter)
470 + except ValueError:
471 + pass
472 +
473 + def _proc_exit(self, pid, returncode):
474 + self._proc.returncode = returncode
475 + waiters = self._waiters
476 + self._waiters = []
477 + for waiter in waiters:
478 + waiter.set_result(returncode)
479
480 diff --git a/lib/portage/util/futures/_asyncio/streams.py b/lib/portage/util/futures/_asyncio/streams.py
481 new file mode 100644
482 index 000000000..650a16491
483 --- /dev/null
484 +++ b/lib/portage/util/futures/_asyncio/streams.py
485 @@ -0,0 +1,96 @@
486 +# Copyright 2018 Gentoo Foundation
487 +# Distributed under the terms of the GNU General Public License v2
488 +
489 +import errno
490 +import os
491 +
492 +import portage
493 +portage.proxy.lazyimport.lazyimport(globals(),
494 + '_emerge.PipeReader:PipeReader',
495 + 'portage.util.futures:asyncio',
496 + 'portage.util.futures.unix_events:_set_nonblocking',
497 +)
498 +from portage.util.futures.compat_coroutine import coroutine
499 +
500 +
501 +def _reader(input_file, loop=None):
502 + """
503 + Asynchronously read a binary input file, and close it when
504 + it reaches EOF.
505 +
506 + @param input_file: binary input file descriptor
507 + @type input_file: file or int
508 + @param loop: asyncio.AbstractEventLoop (or compatible)
509 + @type loop: event loop
510 + @return: bytes
511 + @rtype: asyncio.Future (or compatible)
512 + """
513 + loop = asyncio._wrap_loop(loop)
514 + future = loop.create_future()
515 + _Reader(future, input_file, loop)
516 + return future
517 +
518 +
519 +class _Reader(object):
520 + def __init__(self, future, input_file, loop):
521 + self._future = future
522 + self._pipe_reader = PipeReader(
523 + input_files={'input_file':input_file}, scheduler=loop)
524 +
525 + self._future.add_done_callback(self._cancel_callback)
526 + self._pipe_reader.addExitListener(self._eof)
527 + self._pipe_reader.start()
528 +
529 + def _cancel_callback(self, future):
530 + if future.cancelled():
531 + self._cancel()
532 +
533 + def _eof(self, pipe_reader):
534 + self._pipe_reader = None
535 + self._future.set_result(pipe_reader.getvalue())
536 +
537 + def _cancel(self):
538 + if self._pipe_reader is not None and self._pipe_reader.poll() is None:
539 + self._pipe_reader.removeExitListener(self._eof)
540 + self._pipe_reader.cancel()
541 + self._pipe_reader = None
542 +
543 +
544 +@coroutine
545 +def _writer(output_file, content, loop=None):
546 + """
547 + Asynchronously write bytes to output file, and close it when
548 + done. If an EnvironmentError other than EAGAIN is encountered,
549 + which typically indicates that the other end of the pipe has
550 + closed, the error is raised. This function is a coroutine.
551 +
552 + @param output_file: output file descriptor
553 + @type output_file: file or int
554 + @param content: content to write
555 + @type content: bytes
556 + @param loop: asyncio.AbstractEventLoop (or compatible)
557 + @type loop: event loop
558 + """
559 + fd = output_file if isinstance(output_file, int) else output_file.fileno()
560 + _set_nonblocking(fd)
561 + loop = asyncio._wrap_loop(loop)
562 + try:
563 + while content:
564 + waiter = loop.create_future()
565 + loop.add_writer(fd, lambda: waiter.set_result(None))
566 + try:
567 + yield waiter
568 + while content:
569 + try:
570 + content = content[os.write(fd, content):]
571 + except EnvironmentError as e:
572 + if e.errno == errno.EAGAIN:
573 + break
574 + else:
575 + raise
576 + finally:
577 + loop.remove_writer(fd)
578 + except GeneratorExit:
579 + raise
580 + finally:
581 + os.close(output_file) if isinstance(output_file, int) else output_file.close()
582
583 diff --git a/lib/portage/util/futures/compat_coroutine.py b/lib/portage/util/futures/compat_coroutine.py
584 index 17400b74d..59fdc31b6 100644
585 --- a/lib/portage/util/futures/compat_coroutine.py
586 +++ b/lib/portage/util/futures/compat_coroutine.py
587 @@ -1,9 +1,13 @@
588 # Copyright 2018 Gentoo Foundation
589 # Distributed under the terms of the GNU General Public License v2
590
591 -from portage.util.futures import asyncio
592 import functools
593
594 +import portage
595 +portage.proxy.lazyimport.lazyimport(globals(),
596 + 'portage.util.futures:asyncio',
597 +)
598 +
599
600 def coroutine(generator_func):
601 """