1 |
commit: 27712651aa7014a960b012dc89457df09677edc1 |
2 |
Author: Zac Medico <zmedico <AT> gentoo <DOT> org> |
3 |
AuthorDate: Mon Feb 24 08:06:11 2020 +0000 |
4 |
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> |
5 |
CommitDate: Mon Feb 24 10:26:33 2020 +0000 |
6 |
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=27712651 |
7 |
|
8 |
PipeLogger: non-blocking write to pipe (bug 709746) |
9 |
|
10 |
Add support to write to a non-blocking pipe instead of a |
11 |
log file. This is needed for the purposes of bug 709746, |
12 |
where PipeLogger will write to a pipe that is drained |
13 |
by another PipeLogger instance which is running in the same |
14 |
process. |
15 |
|
16 |
Bug: https://bugs.gentoo.org/709746 |
17 |
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> |
18 |
|
19 |
lib/portage/tests/process/test_PopenProcess.py | 41 +++++++++++++++- |
20 |
lib/portage/util/_async/PipeLogger.py | 67 +++++++++++++++++++++----- |
21 |
2 files changed, 94 insertions(+), 14 deletions(-) |
22 |
|
23 |
diff --git a/lib/portage/tests/process/test_PopenProcess.py b/lib/portage/tests/process/test_PopenProcess.py |
24 |
index ed506b814..d4e97f210 100644 |
25 |
--- a/lib/portage/tests/process/test_PopenProcess.py |
26 |
+++ b/lib/portage/tests/process/test_PopenProcess.py |
27 |
@@ -9,6 +9,8 @@ from portage.tests import TestCase |
28 |
from portage.util._async.PipeLogger import PipeLogger |
29 |
from portage.util._async.PopenProcess import PopenProcess |
30 |
from portage.util._eventloop.global_event_loop import global_event_loop |
31 |
+from portage.util.futures._asyncio.streams import _reader |
32 |
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return |
33 |
from _emerge.PipeReader import PipeReader |
34 |
|
35 |
class PopenPipeTestCase(TestCase): |
36 |
@@ -73,8 +75,41 @@ class PopenPipeTestCase(TestCase): |
37 |
|
38 |
return content.decode('ascii', 'replace') |
39 |
|
40 |
+ @coroutine |
41 |
+ def _testPipeLoggerToPipe(self, test_string, loop=None): |
42 |
+ """ |
43 |
+ Test PipeLogger writing to a pipe connected to a PipeReader. |
44 |
+ This verifies that PipeLogger does not deadlock when writing |
45 |
+ to a pipe that's drained by a PipeReader running in the same |
46 |
+ process (requires non-blocking write). |
47 |
+ """ |
48 |
+ |
49 |
+ producer = PopenProcess(proc=subprocess.Popen( |
50 |
+ ["bash", "-c", self._echo_cmd % test_string], |
51 |
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT), |
52 |
+ scheduler=loop) |
53 |
+ |
54 |
+ pr, pw = os.pipe() |
55 |
+ |
56 |
+ consumer = producer.pipe_reader = PipeLogger(background=True, |
57 |
+ input_fd=producer.proc.stdout, |
58 |
+ log_file_path=os.fdopen(pw, 'wb', 0)) |
59 |
+ |
60 |
+ reader = _reader(pr, loop=loop) |
61 |
+ yield producer.async_start() |
62 |
+ content = yield reader |
63 |
+ yield producer.async_wait() |
64 |
+ yield consumer.async_wait() |
65 |
+ |
66 |
+ self.assertEqual(producer.returncode, os.EX_OK) |
67 |
+ self.assertEqual(consumer.returncode, os.EX_OK) |
68 |
+ |
69 |
+ coroutine_return(content.decode('ascii', 'replace')) |
70 |
+ |
71 |
def testPopenPipe(self): |
72 |
- for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14): |
73 |
+ loop = global_event_loop() |
74 |
+ |
75 |
+ for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**15, 2**16): |
76 |
test_string = x * "a" |
77 |
output = self._testPipeReader(test_string) |
78 |
self.assertEqual(test_string, output, |
79 |
@@ -83,3 +118,7 @@ class PopenPipeTestCase(TestCase): |
80 |
output = self._testPipeLogger(test_string) |
81 |
self.assertEqual(test_string, output, |
82 |
"x = %s, len(output) = %s" % (x, len(output))) |
83 |
+ |
84 |
+ output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop)) |
85 |
+ self.assertEqual(test_string, output, |
86 |
+ "x = %s, len(output) = %s" % (x, len(output))) |
87 |
|
88 |
diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py |
89 |
index a4258f350..6b03988a1 100644 |
90 |
--- a/lib/portage/util/_async/PipeLogger.py |
91 |
+++ b/lib/portage/util/_async/PipeLogger.py |
92 |
@@ -8,6 +8,9 @@ import sys |
93 |
|
94 |
import portage |
95 |
from portage import os, _encodings, _unicode_encode |
96 |
+from portage.util.futures import asyncio |
97 |
+from portage.util.futures.compat_coroutine import coroutine |
98 |
+from portage.util.futures.unix_events import _set_nonblocking |
99 |
from _emerge.AbstractPollTask import AbstractPollTask |
100 |
|
101 |
class PipeLogger(AbstractPollTask): |
102 |
@@ -21,13 +24,15 @@ class PipeLogger(AbstractPollTask): |
103 |
""" |
104 |
|
105 |
__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ |
106 |
- ("_log_file", "_log_file_real") |
107 |
+ ("_io_loop_task", "_log_file", "_log_file_real") |
108 |
|
109 |
def _start(self): |
110 |
|
111 |
log_file_path = self.log_file_path |
112 |
- if log_file_path is not None: |
113 |
- |
114 |
+ if hasattr(log_file_path, 'write'): |
115 |
+ self._log_file = log_file_path |
116 |
+ _set_nonblocking(self._log_file.fileno()) |
117 |
+ elif log_file_path is not None: |
118 |
self._log_file = open(_unicode_encode(log_file_path, |
119 |
encoding=_encodings['fs'], errors='strict'), mode='ab') |
120 |
if log_file_path.endswith('.gz'): |
121 |
@@ -57,7 +62,8 @@ class PipeLogger(AbstractPollTask): |
122 |
fcntl.fcntl(fd, fcntl.F_SETFD, |
123 |
fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC) |
124 |
|
125 |
- self.scheduler.add_reader(fd, self._output_handler, fd) |
126 |
+ self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), loop=self.scheduler) |
127 |
+ self._io_loop_task.add_done_callback(self._io_loop_done) |
128 |
self._registered = True |
129 |
|
130 |
def _cancel(self): |
131 |
@@ -65,8 +71,8 @@ class PipeLogger(AbstractPollTask): |
132 |
if self.returncode is None: |
133 |
self.returncode = self._cancelled_returncode |
134 |
|
135 |
- def _output_handler(self, fd): |
136 |
- |
137 |
+ @coroutine |
138 |
+ def _io_loop(self, fd): |
139 |
background = self.background |
140 |
stdout_fd = self.stdout_fd |
141 |
log_file = self._log_file |
142 |
@@ -76,14 +82,18 @@ class PipeLogger(AbstractPollTask): |
143 |
|
144 |
if buf is None: |
145 |
# not a POLLIN event, EAGAIN, etc... |
146 |
- break |
147 |
+ future = self.scheduler.create_future() |
148 |
+ self.scheduler.add_reader(fd, future.set_result, None) |
149 |
+ try: |
150 |
+ yield future |
151 |
+ finally: |
152 |
+ self.scheduler.remove_reader(fd) |
153 |
+ future.done() or future.cancel() |
154 |
+ continue |
155 |
|
156 |
if not buf: |
157 |
# EOF |
158 |
- self._unregister() |
159 |
- self.returncode = self.returncode or os.EX_OK |
160 |
- self._async_wait() |
161 |
- break |
162 |
+ return |
163 |
|
164 |
else: |
165 |
if not background and stdout_fd is not None: |
166 |
@@ -120,8 +130,34 @@ class PipeLogger(AbstractPollTask): |
167 |
fcntl.F_GETFL) ^ os.O_NONBLOCK) |
168 |
|
169 |
if log_file is not None: |
170 |
- log_file.write(buf) |
171 |
- log_file.flush() |
172 |
+ write_buf = buf |
173 |
+ while True: |
174 |
+ try: |
175 |
+ if write_buf is not None: |
176 |
+ log_file.write(write_buf) |
177 |
+ write_buf = None |
178 |
+ log_file.flush() |
179 |
+ except EnvironmentError as e: |
180 |
+ if e.errno != errno.EAGAIN: |
181 |
+ raise |
182 |
+ future = self.scheduler.create_future() |
183 |
+ self.scheduler.add_writer(self._log_file.fileno(), future.set_result, None) |
184 |
+ try: |
185 |
+ yield future |
186 |
+ finally: |
187 |
+ self.scheduler.remove_writer(self._log_file.fileno()) |
188 |
+ future.done() or future.cancel() |
189 |
+ else: |
190 |
+ break |
191 |
+ |
192 |
+ def _io_loop_done(self, future): |
193 |
+ try: |
194 |
+ future.result() |
195 |
+ except asyncio.CancelledError: |
196 |
+ self.cancel() |
197 |
+ self._was_cancelled() |
198 |
+ self.returncode = self.returncode or os.EX_OK |
199 |
+ self._async_wait() |
200 |
|
201 |
def _unregister(self): |
202 |
if self.input_fd is not None: |
203 |
@@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask): |
204 |
self.input_fd.close() |
205 |
self.input_fd = None |
206 |
|
207 |
+ if self._io_loop_task is not None: |
208 |
+ self._io_loop_task.done() or self._io_loop_task.cancel() |
209 |
+ self._io_loop_task = None |
210 |
+ |
211 |
if self.stdout_fd is not None: |
212 |
os.close(self.stdout_fd) |
213 |
self.stdout_fd = None |
214 |
|
215 |
if self._log_file is not None: |
216 |
+ self.scheduler.remove_writer(self._log_file.fileno()) |
217 |
self._log_file.close() |
218 |
self._log_file = None |