1 |
Add support to write to a non-blocking pipe instead of a |
2 |
log file. This is needed for the purposes of bug 709746, |
3 |
where PipeLogger will write to a pipe that is drained |
4 |
by another PipeLogger instance which is running in the same |
5 |
process. |
6 |
|
7 |
Bug: https://bugs.gentoo.org/709746 |
8 |
Signed-off-by: Zac Medico <zmedico@g.o> |
9 |
--- |
10 |
lib/portage/tests/process/test_PipeLogger.py | 58 ++++++++++++++++ |
11 |
lib/portage/util/_async/PipeLogger.py | 73 +++++++++++++++----- |
12 |
2 files changed, 115 insertions(+), 16 deletions(-) |
13 |
create mode 100644 lib/portage/tests/process/test_PipeLogger.py |
14 |
|
15 |
diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py |
16 |
new file mode 100644 |
17 |
index 000000000..2bd94cf39 |
18 |
--- /dev/null |
19 |
+++ b/lib/portage/tests/process/test_PipeLogger.py |
20 |
@@ -0,0 +1,58 @@ |
21 |
+# Copyright 2020 Gentoo Authors |
22 |
+# Distributed under the terms of the GNU General Public License v2 |
23 |
+ |
24 |
+from portage import os |
25 |
+from portage.tests import TestCase |
26 |
+from portage.util._async.PipeLogger import PipeLogger |
27 |
+from portage.util.futures import asyncio |
28 |
+from portage.util.futures._asyncio.streams import _reader, _writer |
29 |
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return |
30 |
+from portage.util.futures.unix_events import _set_nonblocking |
31 |
+ |
32 |
+ |
33 |
+class PipeLoggerTestCase(TestCase): |
34 |
+ |
35 |
+ @coroutine |
36 |
+ def _testPipeLoggerToPipe(self, test_string, loop=None): |
37 |
+ """ |
38 |
+ Test PipeLogger writing to a pipe connected to a PipeReader. |
39 |
+ This verifies that PipeLogger does not deadlock when writing |
40 |
+ to a pipe that's drained by a PipeReader running in the same |
41 |
+ process (requires non-blocking write). |
42 |
+ """ |
43 |
+ |
44 |
+ input_fd, writer_pipe = os.pipe() |
45 |
+ _set_nonblocking(writer_pipe) |
46 |
+ writer_pipe = os.fdopen(writer_pipe, 'wb', 0) |
47 |
+ writer = asyncio.ensure_future(_writer(writer_pipe, test_string.encode('ascii'), loop=loop), loop=loop) |
48 |
+ writer.add_done_callback(lambda writer: writer_pipe.close()) |
49 |
+ |
50 |
+ pr, pw = os.pipe() |
51 |
+ |
52 |
+ consumer = PipeLogger(background=True, |
53 |
+ input_fd=input_fd, |
54 |
+ log_file_path=os.fdopen(pw, 'wb', 0), |
55 |
+ scheduler=loop) |
56 |
+ consumer.start() |
57 |
+ |
58 |
+ # Before starting the reader, wait here for a moment, in order |
59 |
+ # to exercise PipeLogger's handling of EAGAIN during write. |
60 |
+ yield asyncio.wait([writer], timeout=0.01) |
61 |
+ |
62 |
+ reader = _reader(pr, loop=loop) |
63 |
+ yield writer |
64 |
+ content = yield reader |
65 |
+ yield consumer.async_wait() |
66 |
+ |
67 |
+ self.assertEqual(consumer.returncode, os.EX_OK) |
68 |
+ |
69 |
+ coroutine_return(content.decode('ascii', 'replace')) |
70 |
+ |
71 |
+ def testPipeLogger(self): |
72 |
+ loop = asyncio._wrap_loop() |
73 |
+ |
74 |
+ for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**17, 2**17 + 1): |
75 |
+ test_string = x * "a" |
76 |
+ output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop)) |
77 |
+ self.assertEqual(test_string, output, |
78 |
+ "x = %s, len(output) = %s" % (x, len(output))) |
79 |
diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py |
80 |
index a4258f350..ce8afb846 100644 |
81 |
--- a/lib/portage/util/_async/PipeLogger.py |
82 |
+++ b/lib/portage/util/_async/PipeLogger.py |
83 |
@@ -8,6 +8,10 @@ import sys |
84 |
|
85 |
import portage |
86 |
from portage import os, _encodings, _unicode_encode |
87 |
+from portage.util.futures import asyncio |
88 |
+from portage.util.futures._asyncio.streams import _writer |
89 |
+from portage.util.futures.compat_coroutine import coroutine |
90 |
+from portage.util.futures.unix_events import _set_nonblocking |
91 |
from _emerge.AbstractPollTask import AbstractPollTask |
92 |
|
93 |
class PipeLogger(AbstractPollTask): |
94 |
@@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask): |
95 |
""" |
96 |
|
97 |
__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ |
98 |
- ("_log_file", "_log_file_real") |
99 |
+ ("_io_loop_task", "_log_file", "_log_file_nb", "_log_file_real") |
100 |
|
101 |
def _start(self): |
102 |
|
103 |
log_file_path = self.log_file_path |
104 |
- if log_file_path is not None: |
105 |
- |
106 |
+ if hasattr(log_file_path, 'write'): |
107 |
+ self._log_file_nb = True |
108 |
+ self._log_file = log_file_path |
109 |
+ _set_nonblocking(self._log_file.fileno()) |
110 |
+ elif log_file_path is not None: |
111 |
self._log_file = open(_unicode_encode(log_file_path, |
112 |
encoding=_encodings['fs'], errors='strict'), mode='ab') |
113 |
if log_file_path.endswith('.gz'): |
114 |
@@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask): |
115 |
mode=0o660) |
116 |
|
117 |
if isinstance(self.input_fd, int): |
118 |
- fd = self.input_fd |
119 |
- else: |
120 |
- fd = self.input_fd.fileno() |
121 |
+ self.input_fd = os.fdopen(self.input_fd, 'rb', 0) |
122 |
+ |
123 |
+ fd = self.input_fd.fileno() |
124 |
|
125 |
fcntl.fcntl(fd, fcntl.F_SETFL, |
126 |
fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK) |
127 |
@@ -57,7 +64,8 @@ class PipeLogger(AbstractPollTask): |
128 |
fcntl.fcntl(fd, fcntl.F_SETFD, |
129 |
fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC) |
130 |
|
131 |
- self.scheduler.add_reader(fd, self._output_handler, fd) |
132 |
+ self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler) |
133 |
+ self._io_loop_task.add_done_callback(self._io_loop_done) |
134 |
self._registered = True |
135 |
|
136 |
def _cancel(self): |
137 |
@@ -65,25 +73,36 @@ class PipeLogger(AbstractPollTask): |
138 |
if self.returncode is None: |
139 |
self.returncode = self._cancelled_returncode |
140 |
|
141 |
- def _output_handler(self, fd): |
142 |
- |
143 |
+ @coroutine |
144 |
+ def _io_loop(self, input_file): |
145 |
background = self.background |
146 |
stdout_fd = self.stdout_fd |
147 |
log_file = self._log_file |
148 |
+ fd = input_file.fileno() |
149 |
|
150 |
while True: |
151 |
buf = self._read_buf(fd) |
152 |
|
153 |
if buf is None: |
154 |
# not a POLLIN event, EAGAIN, etc... |
155 |
- break |
156 |
+ future = self.scheduler.create_future() |
157 |
+ self.scheduler.add_reader(fd, future.set_result, None) |
158 |
+ try: |
159 |
+ yield future |
160 |
+ finally: |
161 |
+ # The loop and input file may have been closed. |
162 |
+ if not self.scheduler.is_closed(): |
163 |
+ future.done() or future.cancel() |
164 |
+ # Do not call remove_reader in cases where fd has |
165 |
+ # been closed and then re-allocated to a concurrent |
166 |
+ # coroutine as in bug 716636. |
167 |
+ if not input_file.closed: |
168 |
+ self.scheduler.remove_reader(fd) |
169 |
+ continue |
170 |
|
171 |
if not buf: |
172 |
# EOF |
173 |
- self._unregister() |
174 |
- self.returncode = self.returncode or os.EX_OK |
175 |
- self._async_wait() |
176 |
- break |
177 |
+ return |
178 |
|
179 |
else: |
180 |
if not background and stdout_fd is not None: |
181 |
@@ -120,8 +139,25 @@ class PipeLogger(AbstractPollTask): |
182 |
fcntl.F_GETFL) ^ os.O_NONBLOCK) |
183 |
|
184 |
if log_file is not None: |
185 |
- log_file.write(buf) |
186 |
- log_file.flush() |
187 |
+ if self._log_file_nb: |
188 |
+ # Use the _writer function which uses os.write, since the |
189 |
+ # log_file.write method loses data when an EAGAIN occurs. |
190 |
+ yield _writer(log_file, buf, loop=self.scheduler) |
191 |
+ else: |
192 |
+ # For gzip.GzipFile instances, the above _writer function |
193 |
+ # will not work because data written directly to the file |
194 |
+ # descriptor bypasses compression. |
195 |
+ log_file.write(buf) |
196 |
+ log_file.flush() |
197 |
+ |
198 |
+ def _io_loop_done(self, future): |
199 |
+ try: |
200 |
+ future.result() |
201 |
+ except asyncio.CancelledError: |
202 |
+ self.cancel() |
203 |
+ self._was_cancelled() |
204 |
+ self.returncode = self.returncode or os.EX_OK |
205 |
+ self._async_wait() |
206 |
|
207 |
def _unregister(self): |
208 |
if self.input_fd is not None: |
209 |
@@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask): |
210 |
self.input_fd.close() |
211 |
self.input_fd = None |
212 |
|
213 |
+ if self._io_loop_task is not None: |
214 |
+ self._io_loop_task.done() or self._io_loop_task.cancel() |
215 |
+ self._io_loop_task = None |
216 |
+ |
217 |
if self.stdout_fd is not None: |
218 |
os.close(self.stdout_fd) |
219 |
self.stdout_fd = None |
220 |
|
221 |
if self._log_file is not None: |
222 |
+ self.scheduler.remove_writer(self._log_file.fileno()) |
223 |
self._log_file.close() |
224 |
self._log_file = None |
225 |
|
226 |
-- |
227 |
2.25.3 |