1 |
commit: 6b4252d3a0f12808a5bcce888b7f68e1f84b5301 |
2 |
Author: Zac Medico <zmedico <AT> gentoo <DOT> org> |
3 |
AuthorDate: Sat Jul 28 21:22:42 2018 +0000 |
4 |
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> |
5 |
CommitDate: Mon Aug 6 04:38:41 2018 +0000 |
6 |
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=6b4252d3 |
7 |
|
8 |
Add asyncio.create_subprocess_exec support for python2 (bug 662388) |
9 |
|
10 |
The asyncio.create_subprocess_exec function is essential for |
11 |
using subprocesses in coroutines, so add support to do this |
12 |
for python2. This paves the way for extensive use of coroutines |
13 |
in portage, since coroutines are well-suited for many portage |
14 |
tasks that involve subprocesses. |
15 |
|
16 |
Bug: https://bugs.gentoo.org/662388 |
17 |
|
18 |
.../util/futures/asyncio/test_subprocess_exec.py | 184 ++++++--------------- |
19 |
lib/portage/util/futures/_asyncio/__init__.py | 53 ++++++ |
20 |
lib/portage/util/futures/_asyncio/process.py | 107 ++++++++++++ |
21 |
lib/portage/util/futures/_asyncio/streams.py | 96 +++++++++++ |
22 |
lib/portage/util/futures/compat_coroutine.py | 6 +- |
23 |
5 files changed, 315 insertions(+), 131 deletions(-) |
24 |
|
25 |
diff --git a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
26 |
index 5a812ba6a..61646cb92 100644 |
27 |
--- a/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
28 |
+++ b/lib/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
29 |
@@ -3,61 +3,16 @@ |
30 |
|
31 |
import os |
32 |
import subprocess |
33 |
- |
34 |
-try: |
35 |
- from asyncio import create_subprocess_exec |
36 |
-except ImportError: |
37 |
- create_subprocess_exec = None |
38 |
+import sys |
39 |
|
40 |
from portage.process import find_binary |
41 |
from portage.tests import TestCase |
42 |
from portage.util._eventloop.global_event_loop import global_event_loop |
43 |
from portage.util.futures import asyncio |
44 |
-from portage.util.futures.executor.fork import ForkExecutor |
45 |
+from portage.util.futures._asyncio import create_subprocess_exec |
46 |
+from portage.util.futures._asyncio.streams import _reader as reader |
47 |
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return |
48 |
from portage.util.futures.unix_events import DefaultEventLoopPolicy |
49 |
-from _emerge.PipeReader import PipeReader |
50 |
- |
51 |
- |
52 |
-def reader(input_file, loop=None): |
53 |
- """ |
54 |
- Asynchronously read a binary input file. |
55 |
- |
56 |
- @param input_file: binary input file |
57 |
- @type input_file: file |
58 |
- @param loop: event loop |
59 |
- @type loop: EventLoop |
60 |
- @return: bytes |
61 |
- @rtype: asyncio.Future (or compatible) |
62 |
- """ |
63 |
- loop = asyncio._wrap_loop(loop) |
64 |
- future = loop.create_future() |
65 |
- _Reader(future, input_file, loop) |
66 |
- return future |
67 |
- |
68 |
- |
69 |
-class _Reader(object): |
70 |
- def __init__(self, future, input_file, loop): |
71 |
- self._future = future |
72 |
- self._pipe_reader = PipeReader( |
73 |
- input_files={'input_file':input_file}, scheduler=loop) |
74 |
- |
75 |
- self._future.add_done_callback(self._cancel_callback) |
76 |
- self._pipe_reader.addExitListener(self._eof) |
77 |
- self._pipe_reader.start() |
78 |
- |
79 |
- def _cancel_callback(self, future): |
80 |
- if future.cancelled(): |
81 |
- self._cancel() |
82 |
- |
83 |
- def _eof(self, pipe_reader): |
84 |
- self._pipe_reader = None |
85 |
- self._future.set_result(pipe_reader.getvalue()) |
86 |
- |
87 |
- def _cancel(self): |
88 |
- if self._pipe_reader is not None and self._pipe_reader.poll() is None: |
89 |
- self._pipe_reader.removeExitListener(self._eof) |
90 |
- self._pipe_reader.cancel() |
91 |
- self._pipe_reader = None |
92 |
|
93 |
|
94 |
class SubprocessExecTestCase(TestCase): |
95 |
@@ -76,99 +31,66 @@ class SubprocessExecTestCase(TestCase): |
96 |
self.assertFalse(global_event_loop().is_closed()) |
97 |
|
98 |
def testEcho(self): |
99 |
- if create_subprocess_exec is None: |
100 |
- self.skipTest('create_subprocess_exec not implemented for python2') |
101 |
- |
102 |
args_tuple = (b'hello', b'world') |
103 |
echo_binary = find_binary("echo") |
104 |
self.assertNotEqual(echo_binary, None) |
105 |
echo_binary = echo_binary.encode() |
106 |
|
107 |
- # Use os.pipe(), since this loop does not implement the |
108 |
- # ReadTransport necessary for subprocess.PIPE support. |
109 |
- stdout_pr, stdout_pw = os.pipe() |
110 |
- stdout_pr = os.fdopen(stdout_pr, 'rb', 0) |
111 |
- stdout_pw = os.fdopen(stdout_pw, 'wb', 0) |
112 |
- files = [stdout_pr, stdout_pw] |
113 |
- |
114 |
def test(loop): |
115 |
- output = None |
116 |
- try: |
117 |
- with open(os.devnull, 'rb', 0) as devnull: |
118 |
- proc = loop.run_until_complete( |
119 |
- create_subprocess_exec( |
120 |
- echo_binary, *args_tuple, |
121 |
- stdin=devnull, stdout=stdout_pw, stderr=stdout_pw)) |
122 |
- |
123 |
- # This belongs exclusively to the subprocess now. |
124 |
- stdout_pw.close() |
125 |
- |
126 |
- output = asyncio.ensure_future( |
127 |
- reader(stdout_pr, loop=loop), loop=loop) |
128 |
- |
129 |
- self.assertEqual( |
130 |
- loop.run_until_complete(proc.wait()), os.EX_OK) |
131 |
- self.assertEqual( |
132 |
- tuple(loop.run_until_complete(output).split()), args_tuple) |
133 |
- finally: |
134 |
- if output is not None and not output.done(): |
135 |
- output.cancel() |
136 |
- for f in files: |
137 |
- f.close() |
138 |
+ @coroutine |
139 |
+ def test_coroutine(loop=None): |
140 |
|
141 |
- self._run_test(test) |
142 |
+ proc = (yield create_subprocess_exec(echo_binary, *args_tuple, |
143 |
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
144 |
+ loop=loop)) |
145 |
|
146 |
- def testCat(self): |
147 |
- if create_subprocess_exec is None: |
148 |
- self.skipTest('create_subprocess_exec not implemented for python2') |
149 |
+ out, err = (yield proc.communicate()) |
150 |
+ self.assertEqual(tuple(out.split()), args_tuple) |
151 |
+ self.assertEqual(proc.returncode, os.EX_OK) |
152 |
|
153 |
- stdin_data = b'hello world' |
154 |
- cat_binary = find_binary("cat") |
155 |
- self.assertNotEqual(cat_binary, None) |
156 |
- cat_binary = cat_binary.encode() |
157 |
+ proc = (yield create_subprocess_exec( |
158 |
+ 'bash', '-c', 'echo foo; echo bar 1>&2;', |
159 |
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE, |
160 |
+ loop=loop)) |
161 |
|
162 |
- # Use os.pipe(), since this loop does not implement the |
163 |
- # ReadTransport necessary for subprocess.PIPE support. |
164 |
- stdout_pr, stdout_pw = os.pipe() |
165 |
- stdout_pr = os.fdopen(stdout_pr, 'rb', 0) |
166 |
- stdout_pw = os.fdopen(stdout_pw, 'wb', 0) |
167 |
+ out, err = (yield proc.communicate()) |
168 |
+ self.assertEqual(out, b'foo\n') |
169 |
+ self.assertEqual(err, b'bar\n') |
170 |
+ self.assertEqual(proc.returncode, os.EX_OK) |
171 |
|
172 |
- stdin_pr, stdin_pw = os.pipe() |
173 |
- stdin_pr = os.fdopen(stdin_pr, 'rb', 0) |
174 |
- stdin_pw = os.fdopen(stdin_pw, 'wb', 0) |
175 |
+ proc = (yield create_subprocess_exec( |
176 |
+ 'bash', '-c', 'echo foo; echo bar 1>&2;', |
177 |
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
178 |
+ loop=loop)) |
179 |
|
180 |
- files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw] |
181 |
+ out, err = (yield proc.communicate()) |
182 |
+ self.assertEqual(out, b'foo\nbar\n') |
183 |
+ self.assertEqual(err, None) |
184 |
+ self.assertEqual(proc.returncode, os.EX_OK) |
185 |
|
186 |
- def test(loop): |
187 |
- output = None |
188 |
- try: |
189 |
- proc = loop.run_until_complete( |
190 |
- create_subprocess_exec( |
191 |
- cat_binary, |
192 |
- stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw)) |
193 |
+ coroutine_return('success') |
194 |
|
195 |
- # These belong exclusively to the subprocess now. |
196 |
- stdout_pw.close() |
197 |
- stdin_pr.close() |
198 |
+ self.assertEqual('success', |
199 |
+ loop.run_until_complete(test_coroutine(loop=loop))) |
200 |
|
201 |
- output = asyncio.ensure_future( |
202 |
- reader(stdout_pr, loop=loop), loop=loop) |
203 |
+ self._run_test(test) |
204 |
|
205 |
- with ForkExecutor(loop=loop) as executor: |
206 |
- writer = asyncio.ensure_future(loop.run_in_executor( |
207 |
- executor, stdin_pw.write, stdin_data), loop=loop) |
208 |
+ def testCat(self): |
209 |
+ stdin_data = b'hello world' |
210 |
+ cat_binary = find_binary("cat") |
211 |
+ self.assertNotEqual(cat_binary, None) |
212 |
+ cat_binary = cat_binary.encode() |
213 |
|
214 |
- # This belongs exclusively to the writer now. |
215 |
- stdin_pw.close() |
216 |
- loop.run_until_complete(writer) |
217 |
+ def test(loop): |
218 |
+ proc = loop.run_until_complete( |
219 |
+ create_subprocess_exec(cat_binary, |
220 |
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, |
221 |
+ loop=loop)) |
222 |
+ |
223 |
+ out, err = loop.run_until_complete(proc.communicate(input=stdin_data)) |
224 |
|
225 |
- self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) |
226 |
- self.assertEqual(loop.run_until_complete(output), stdin_data) |
227 |
- finally: |
228 |
- if output is not None and not output.done(): |
229 |
- output.cancel() |
230 |
- for f in files: |
231 |
- f.close() |
232 |
+ self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) |
233 |
+ self.assertEqual(out, stdin_data) |
234 |
|
235 |
self._run_test(test) |
236 |
|
237 |
@@ -178,8 +100,8 @@ class SubprocessExecTestCase(TestCase): |
238 |
requires an AbstractEventLoop.connect_read_pipe implementation |
239 |
(and a ReadTransport implementation for it to return). |
240 |
""" |
241 |
- if create_subprocess_exec is None: |
242 |
- self.skipTest('create_subprocess_exec not implemented for python2') |
243 |
+ if sys.version_info.major < 3: |
244 |
+ self.skipTest('ReadTransport not implemented for python2') |
245 |
|
246 |
args_tuple = (b'hello', b'world') |
247 |
echo_binary = find_binary("echo") |
248 |
@@ -192,7 +114,8 @@ class SubprocessExecTestCase(TestCase): |
249 |
create_subprocess_exec( |
250 |
echo_binary, *args_tuple, |
251 |
stdin=devnull, |
252 |
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) |
253 |
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
254 |
+ loop=loop)) |
255 |
|
256 |
self.assertEqual( |
257 |
tuple(loop.run_until_complete(proc.stdout.read()).split()), |
258 |
@@ -207,8 +130,8 @@ class SubprocessExecTestCase(TestCase): |
259 |
requires an AbstractEventLoop.connect_write_pipe implementation |
260 |
(and a WriteTransport implementation for it to return). |
261 |
""" |
262 |
- if create_subprocess_exec is None: |
263 |
- self.skipTest('create_subprocess_exec not implemented for python2') |
264 |
+ if sys.version_info.major < 3: |
265 |
+ self.skipTest('WriteTransport not implemented for python2') |
266 |
|
267 |
stdin_data = b'hello world' |
268 |
cat_binary = find_binary("cat") |
269 |
@@ -220,7 +143,8 @@ class SubprocessExecTestCase(TestCase): |
270 |
create_subprocess_exec( |
271 |
cat_binary, |
272 |
stdin=subprocess.PIPE, |
273 |
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) |
274 |
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
275 |
+ loop=loop)) |
276 |
|
277 |
# This buffers data when necessary to avoid blocking. |
278 |
proc.stdin.write(stdin_data) |
279 |
|
280 |
diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py |
281 |
index acfd59396..faab98e47 100644 |
282 |
--- a/lib/portage/util/futures/_asyncio/__init__.py |
283 |
+++ b/lib/portage/util/futures/_asyncio/__init__.py |
284 |
@@ -20,6 +20,9 @@ __all__ = ( |
285 |
'wait', |
286 |
) |
287 |
|
288 |
+import subprocess |
289 |
+import sys |
290 |
+ |
291 |
try: |
292 |
import asyncio as _real_asyncio |
293 |
except ImportError: |
294 |
@@ -45,6 +48,7 @@ from portage.util.futures.futures import ( |
295 |
InvalidStateError, |
296 |
TimeoutError, |
297 |
) |
298 |
+from portage.util.futures._asyncio.process import _Process |
299 |
from portage.util.futures._asyncio.tasks import ( |
300 |
ALL_COMPLETED, |
301 |
FIRST_COMPLETED, |
302 |
@@ -105,6 +109,49 @@ def set_child_watcher(watcher): |
303 |
return get_event_loop_policy().set_child_watcher(watcher) |
304 |
|
305 |
|
306 |
+# Python 3.4 and later implement PEP 446, which makes newly |
307 |
+# created file descriptors non-inheritable by default. |
308 |
+_close_fds_default = sys.version_info < (3, 4) |
309 |
+ |
310 |
+ |
311 |
+def create_subprocess_exec(*args, **kwargs): |
312 |
+ """ |
313 |
+ Create a subprocess. |
314 |
+ |
315 |
+ @param args: program and arguments |
316 |
+ @type args: str |
317 |
+ @param stdin: stdin file descriptor |
318 |
+ @type stdin: file or int |
319 |
+ @param stdout: stdout file descriptor |
320 |
+ @type stdout: file or int |
321 |
+ @param stderr: stderr file descriptor |
322 |
+ @type stderr: file or int |
323 |
+ @param close_fds: close file descriptors |
324 |
+ @type close_fds: bool |
325 |
+ @param loop: asyncio.AbstractEventLoop (or compatible) |
326 |
+ @type loop: event loop |
327 |
+ @type kwargs: varies |
328 |
+ @param kwargs: subprocess.Popen parameters |
329 |
+ @rtype: asyncio.Future (or compatible) |
330 |
+ @return: subset of asyncio.subprocess.Process interface |
331 |
+ """ |
332 |
+ loop = _wrap_loop(kwargs.pop('loop', None)) |
333 |
+ kwargs.setdefault('close_fds', _close_fds_default) |
334 |
+ if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop): |
335 |
+ # Use the real asyncio loop and create_subprocess_exec. |
336 |
+ return _real_asyncio.create_subprocess_exec(*args, loop=loop._loop, **kwargs) |
337 |
+ |
338 |
+ result = loop.create_future() |
339 |
+ |
340 |
+ result.set_result(_Process(subprocess.Popen( |
341 |
+ args, |
342 |
+ stdin=kwargs.pop('stdin', None), |
343 |
+ stdout=kwargs.pop('stdout', None), |
344 |
+ stderr=kwargs.pop('stderr', None), **kwargs), loop)) |
345 |
+ |
346 |
+ return result |
347 |
+ |
348 |
+ |
349 |
class Task(Future): |
350 |
""" |
351 |
Schedule the execution of a coroutine: wrap it in a future. A task |
352 |
@@ -127,6 +174,12 @@ def ensure_future(coro_or_future, loop=None): |
353 |
@rtype: asyncio.Future (or compatible) |
354 |
@return: an instance of Future |
355 |
""" |
356 |
+ loop = _wrap_loop(loop) |
357 |
+ if _asyncio_enabled and isinstance(loop, _AsyncioEventLoop): |
358 |
+ # Use the real asyncio loop and ensure_future. |
359 |
+ return _real_asyncio.ensure_future( |
360 |
+ coro_or_future, loop=loop._loop) |
361 |
+ |
362 |
if isinstance(coro_or_future, Future): |
363 |
return coro_or_future |
364 |
raise NotImplementedError |
365 |
|
366 |
diff --git a/lib/portage/util/futures/_asyncio/process.py b/lib/portage/util/futures/_asyncio/process.py |
367 |
new file mode 100644 |
368 |
index 000000000..020164c9b |
369 |
--- /dev/null |
370 |
+++ b/lib/portage/util/futures/_asyncio/process.py |
371 |
@@ -0,0 +1,107 @@ |
372 |
+# Copyright 2018 Gentoo Foundation |
373 |
+# Distributed under the terms of the GNU General Public License v2 |
374 |
+ |
375 |
+import portage |
376 |
+portage.proxy.lazyimport.lazyimport(globals(), |
377 |
+ 'portage.util.futures:asyncio', |
378 |
+) |
379 |
+from portage.util.futures._asyncio.streams import _reader, _writer |
380 |
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return |
381 |
+ |
382 |
+ |
383 |
+class _Process(object): |
384 |
+ """ |
385 |
+ Emulate a subset of the asyncio.subprocess.Process interface, |
386 |
+ for python2. |
387 |
+ """ |
388 |
+ def __init__(self, proc, loop): |
389 |
+ """ |
390 |
+ @param proc: process instance |
391 |
+ @type proc: subprocess.Popen |
392 |
+ @param loop: asyncio.AbstractEventLoop (or compatible) |
393 |
+ @type loop: event loop |
394 |
+ """ |
395 |
+ self._proc = proc |
396 |
+ self._loop = loop |
397 |
+ self.terminate = proc.terminate |
398 |
+ self.kill = proc.kill |
399 |
+ self.send_signal = proc.send_signal |
400 |
+ self.pid = proc.pid |
401 |
+ self._waiters = [] |
402 |
+ loop._asyncio_child_watcher.\ |
403 |
+ add_child_handler(self.pid, self._proc_exit) |
404 |
+ |
405 |
+ @property |
406 |
+ def returncode(self): |
407 |
+ return self._proc.returncode |
408 |
+ |
409 |
+ @coroutine |
410 |
+ def communicate(self, input=None): |
411 |
+ """ |
412 |
+ Read data from stdout and stderr, until end-of-file is reached. |
413 |
+ Wait for process to terminate. |
414 |
+ |
415 |
+ @param input: stdin content to write |
416 |
+ @type input: bytes |
417 |
+ @return: tuple (stdout_data, stderr_data) |
418 |
+ @rtype: asyncio.Future (or compatible) |
419 |
+ """ |
420 |
+ futures = [] |
421 |
+ for input_file in (self._proc.stdout, self._proc.stderr): |
422 |
+ if input_file is None: |
423 |
+ future = self._loop.create_future() |
424 |
+ future.set_result(None) |
425 |
+ else: |
426 |
+ future = _reader(input_file, loop=self._loop) |
427 |
+ futures.append(future) |
428 |
+ |
429 |
+ writer = None |
430 |
+ if input is not None: |
431 |
+ if self._proc.stdin is None: |
432 |
+ raise TypeError('communicate: expected file or int, got {}'.format(type(self._proc.stdin))) |
433 |
+ writer = asyncio.ensure_future(_writer(self._proc.stdin, input), loop=self._loop) |
434 |
+ |
435 |
+ try: |
436 |
+ yield asyncio.wait(futures + [self.wait()], loop=self._loop) |
437 |
+ finally: |
438 |
+ if writer is not None: |
439 |
+ if writer.done(): |
440 |
+ # Consume expected exceptions. |
441 |
+ try: |
442 |
+ writer.result() |
443 |
+ except EnvironmentError: |
444 |
+ # This is normal if the other end of the pipe was closed. |
445 |
+ pass |
446 |
+ else: |
447 |
+ writer.cancel() |
448 |
+ |
449 |
+ coroutine_return(tuple(future.result() for future in futures)) |
450 |
+ |
451 |
+ def wait(self): |
452 |
+ """ |
453 |
+ Wait for child process to terminate. Set and return returncode attribute. |
454 |
+ |
455 |
+ @return: returncode |
456 |
+ @rtype: asyncio.Future (or compatible) |
457 |
+ """ |
458 |
+ waiter = self._loop.create_future() |
459 |
+ if self.returncode is None: |
460 |
+ self._waiters.append(waiter) |
461 |
+ waiter.add_done_callback(self._waiter_cancel) |
462 |
+ else: |
463 |
+ waiter.set_result(self.returncode) |
464 |
+ return waiter |
465 |
+ |
466 |
+ def _waiter_cancel(self, waiter): |
467 |
+ if waiter.cancelled(): |
468 |
+ try: |
469 |
+ self._waiters.remove(waiter) |
470 |
+ except ValueError: |
471 |
+ pass |
472 |
+ |
473 |
+ def _proc_exit(self, pid, returncode): |
474 |
+ self._proc.returncode = returncode |
475 |
+ waiters = self._waiters |
476 |
+ self._waiters = [] |
477 |
+ for waiter in waiters: |
478 |
+ waiter.set_result(returncode) |
479 |
|
480 |
diff --git a/lib/portage/util/futures/_asyncio/streams.py b/lib/portage/util/futures/_asyncio/streams.py |
481 |
new file mode 100644 |
482 |
index 000000000..650a16491 |
483 |
--- /dev/null |
484 |
+++ b/lib/portage/util/futures/_asyncio/streams.py |
485 |
@@ -0,0 +1,96 @@ |
486 |
+# Copyright 2018 Gentoo Foundation |
487 |
+# Distributed under the terms of the GNU General Public License v2 |
488 |
+ |
489 |
+import errno |
490 |
+import os |
491 |
+ |
492 |
+import portage |
493 |
+portage.proxy.lazyimport.lazyimport(globals(), |
494 |
+ '_emerge.PipeReader:PipeReader', |
495 |
+ 'portage.util.futures:asyncio', |
496 |
+ 'portage.util.futures.unix_events:_set_nonblocking', |
497 |
+) |
498 |
+from portage.util.futures.compat_coroutine import coroutine |
499 |
+ |
500 |
+ |
501 |
+def _reader(input_file, loop=None): |
502 |
+ """ |
503 |
+ Asynchronously read a binary input file, and close it when |
504 |
+ it reaches EOF. |
505 |
+ |
506 |
+ @param input_file: binary input file descriptor |
507 |
+ @type input_file: file or int |
508 |
+ @param loop: asyncio.AbstractEventLoop (or compatible) |
509 |
+ @type loop: event loop |
510 |
+ @return: bytes |
511 |
+ @rtype: asyncio.Future (or compatible) |
512 |
+ """ |
513 |
+ loop = asyncio._wrap_loop(loop) |
514 |
+ future = loop.create_future() |
515 |
+ _Reader(future, input_file, loop) |
516 |
+ return future |
517 |
+ |
518 |
+ |
519 |
+class _Reader(object): |
520 |
+ def __init__(self, future, input_file, loop): |
521 |
+ self._future = future |
522 |
+ self._pipe_reader = PipeReader( |
523 |
+ input_files={'input_file':input_file}, scheduler=loop) |
524 |
+ |
525 |
+ self._future.add_done_callback(self._cancel_callback) |
526 |
+ self._pipe_reader.addExitListener(self._eof) |
527 |
+ self._pipe_reader.start() |
528 |
+ |
529 |
+ def _cancel_callback(self, future): |
530 |
+ if future.cancelled(): |
531 |
+ self._cancel() |
532 |
+ |
533 |
+ def _eof(self, pipe_reader): |
534 |
+ self._pipe_reader = None |
535 |
+ self._future.set_result(pipe_reader.getvalue()) |
536 |
+ |
537 |
+ def _cancel(self): |
538 |
+ if self._pipe_reader is not None and self._pipe_reader.poll() is None: |
539 |
+ self._pipe_reader.removeExitListener(self._eof) |
540 |
+ self._pipe_reader.cancel() |
541 |
+ self._pipe_reader = None |
542 |
+ |
543 |
+ |
544 |
+@coroutine |
545 |
+def _writer(output_file, content, loop=None): |
546 |
+ """ |
547 |
+ Asynchronously write bytes to output file, and close it when |
548 |
+ done. If an EnvironmentError other than EAGAIN is encountered, |
549 |
+ which typically indicates that the other end of the pipe has |
550 |
+ close, the error is raised. This function is a coroutine. |
551 |
+ |
552 |
+ @param output_file: output file descriptor |
553 |
+ @type output_file: file or int |
554 |
+ @param content: content to write |
555 |
+ @type content: bytes |
556 |
+ @param loop: asyncio.AbstractEventLoop (or compatible) |
557 |
+ @type loop: event loop |
558 |
+ """ |
559 |
+ fd = output_file if isinstance(output_file, int) else output_file.fileno() |
560 |
+ _set_nonblocking(fd) |
561 |
+ loop = asyncio._wrap_loop(loop) |
562 |
+ try: |
563 |
+ while content: |
564 |
+ waiter = loop.create_future() |
565 |
+ loop.add_writer(fd, lambda: waiter.set_result(None)) |
566 |
+ try: |
567 |
+ yield waiter |
568 |
+ while content: |
569 |
+ try: |
570 |
+ content = content[os.write(fd, content):] |
571 |
+ except EnvironmentError as e: |
572 |
+ if e.errno == errno.EAGAIN: |
573 |
+ break |
574 |
+ else: |
575 |
+ raise |
576 |
+ finally: |
577 |
+ loop.remove_writer(fd) |
578 |
+ except GeneratorExit: |
579 |
+ raise |
580 |
+ finally: |
581 |
+ os.close(output_file) if isinstance(output_file, int) else output_file.close() |
582 |
|
583 |
diff --git a/lib/portage/util/futures/compat_coroutine.py b/lib/portage/util/futures/compat_coroutine.py |
584 |
index 17400b74d..59fdc31b6 100644 |
585 |
--- a/lib/portage/util/futures/compat_coroutine.py |
586 |
+++ b/lib/portage/util/futures/compat_coroutine.py |
587 |
@@ -1,9 +1,13 @@ |
588 |
# Copyright 2018 Gentoo Foundation |
589 |
# Distributed under the terms of the GNU General Public License v2 |
590 |
|
591 |
-from portage.util.futures import asyncio |
592 |
import functools |
593 |
|
594 |
+import portage |
595 |
+portage.proxy.lazyimport.lazyimport(globals(), |
596 |
+ 'portage.util.futures:asyncio', |
597 |
+) |
598 |
+ |
599 |
|
600 |
def coroutine(generator_func): |
601 |
""" |