1 |
On Fri, 19 Jun 2020 13:39:18 -0700 |
2 |
Zac Medico <zmedico@g.o> wrote: |
3 |
|
4 |
> Add support to write to a non-blocking pipe instead of a |
5 |
> log file. This is needed for the purposes of bug 709746, |
6 |
> where PipeLogger will write to a pipe that is drained |
7 |
> by another PipeLogger instance which is running in the same |
8 |
> process. |
9 |
> |
10 |
> Bug: https://bugs.gentoo.org/709746 |
11 |
> Signed-off-by: Zac Medico <zmedico@g.o> |
12 |
> --- |
13 |
> lib/portage/tests/process/test_PipeLogger.py | 58 ++++++++++++++++ |
14 |
> lib/portage/util/_async/PipeLogger.py | 73 |
15 |
> +++++++++++++++----- 2 files changed, 115 insertions(+), 16 |
16 |
> deletions(-) create mode 100644 |
17 |
> lib/portage/tests/process/test_PipeLogger.py |
18 |
> |
19 |
> diff --git a/lib/portage/tests/process/test_PipeLogger.py |
20 |
> b/lib/portage/tests/process/test_PipeLogger.py new file mode 100644 |
21 |
> index 000000000..2bd94cf39 |
22 |
> --- /dev/null |
23 |
> +++ b/lib/portage/tests/process/test_PipeLogger.py |
24 |
> @@ -0,0 +1,58 @@ |
25 |
> +# Copyright 2020 Gentoo Authors |
26 |
> +# Distributed under the terms of the GNU General Public License v2 |
27 |
> + |
28 |
> +from portage import os |
29 |
> +from portage.tests import TestCase |
30 |
> +from portage.util._async.PipeLogger import PipeLogger |
31 |
> +from portage.util.futures import asyncio |
32 |
> +from portage.util.futures._asyncio.streams import _reader, _writer |
33 |
> +from portage.util.futures.compat_coroutine import coroutine, |
34 |
> coroutine_return +from portage.util.futures.unix_events import |
35 |
> _set_nonblocking + |
36 |
> + |
37 |
> +class PipeLoggerTestCase(TestCase): |
38 |
> + |
39 |
> + @coroutine |
40 |
> + def _testPipeLoggerToPipe(self, test_string, loop=None): |
41 |
> + """ |
42 |
> + Test PipeLogger writing to a pipe connected to a |
43 |
> PipeReader. |
44 |
> + This verifies that PipeLogger does not deadlock when |
45 |
> writing |
46 |
> + to a pipe that's drained by a PipeReader running in |
47 |
> the same |
48 |
> + process (requires non-blocking write). |
49 |
> + """ |
50 |
> + |
51 |
> + input_fd, writer_pipe = os.pipe() |
52 |
> + _set_nonblocking(writer_pipe) |
53 |
> + writer_pipe = os.fdopen(writer_pipe, 'wb', 0) |
54 |
> + writer = asyncio.ensure_future(_writer(writer_pipe, |
55 |
> test_string.encode('ascii'), loop=loop), loop=loop) |
56 |
> + writer.add_done_callback(lambda writer: |
57 |
> writer_pipe.close()) + |
58 |
> + pr, pw = os.pipe() |
59 |
> + |
60 |
> + consumer = PipeLogger(background=True, |
61 |
> + input_fd=input_fd, |
62 |
> + log_file_path=os.fdopen(pw, 'wb', 0), |
63 |
> + scheduler=loop) |
64 |
> + consumer.start() |
65 |
> + |
66 |
> + # Before starting the reader, wait here for a |
67 |
> moment, in order |
68 |
> + # to exercise PipeLogger's handling of EAGAIN during |
69 |
> write. |
70 |
> + yield asyncio.wait([writer], timeout=0.01) |
71 |
> + |
72 |
> + reader = _reader(pr, loop=loop) |
73 |
> + yield writer |
74 |
> + content = yield reader |
75 |
> + yield consumer.async_wait() |
76 |
> + |
77 |
> + self.assertEqual(consumer.returncode, os.EX_OK) |
78 |
> + |
79 |
> + coroutine_return(content.decode('ascii', 'replace')) |
80 |
> + |
81 |
> + def testPipeLogger(self): |
82 |
> + loop = asyncio._wrap_loop() |
83 |
> + |
84 |
> + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, |
85 |
> 2**13, 2**14, 2**17, 2**17 + 1): |
86 |
> + test_string = x * "a" |
87 |
> + output = |
88 |
> loop.run_until_complete(self._testPipeLoggerToPipe(test_string, |
89 |
> loop=loop)) |
90 |
> + self.assertEqual(test_string, output, |
91 |
> + "x = %s, len(output) = %s" % (x, |
92 |
> len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py |
93 |
> b/lib/portage/util/_async/PipeLogger.py index a4258f350..ce8afb846 |
94 |
> 100644 --- a/lib/portage/util/_async/PipeLogger.py |
95 |
> +++ b/lib/portage/util/_async/PipeLogger.py |
96 |
> @@ -8,6 +8,10 @@ import sys |
97 |
> |
98 |
> import portage |
99 |
> from portage import os, _encodings, _unicode_encode |
100 |
> +from portage.util.futures import asyncio |
101 |
> +from portage.util.futures._asyncio.streams import _writer |
102 |
> +from portage.util.futures.compat_coroutine import coroutine |
103 |
> +from portage.util.futures.unix_events import _set_nonblocking |
104 |
> from _emerge.AbstractPollTask import AbstractPollTask |
105 |
> |
106 |
> class PipeLogger(AbstractPollTask): |
107 |
> @@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask): |
108 |
> """ |
109 |
> |
110 |
> __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ |
111 |
> - ("_log_file", "_log_file_real") |
112 |
> + ("_io_loop_task", "_log_file", "_log_file_nb", |
113 |
> "_log_file_real") |
114 |
> def _start(self): |
115 |
> |
116 |
> log_file_path = self.log_file_path |
117 |
> - if log_file_path is not None: |
118 |
> - |
119 |
> + if hasattr(log_file_path, 'write'): |
120 |
> + self._log_file_nb = True |
121 |
> + self._log_file = log_file_path |
122 |
> + _set_nonblocking(self._log_file.fileno()) |
123 |
> + elif log_file_path is not None: |
124 |
> self._log_file = |
125 |
> open(_unicode_encode(log_file_path, encoding=_encodings['fs'], |
126 |
> errors='strict'), mode='ab') if log_file_path.endswith('.gz'): |
127 |
> @@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask): |
128 |
> mode=0o660) |
129 |
> |
130 |
> if isinstance(self.input_fd, int): |
131 |
> - fd = self.input_fd |
132 |
> - else: |
133 |
> - fd = self.input_fd.fileno() |
134 |
> + self.input_fd = os.fdopen(self.input_fd, |
135 |
> 'rb', 0) + |
136 |
> + fd = self.input_fd.fileno() |
137 |
> |
138 |
> fcntl.fcntl(fd, fcntl.F_SETFL, |
139 |
> fcntl.fcntl(fd, fcntl.F_GETFL) | |
140 |
> os.O_NONBLOCK) @@ -57,7 +64,8 @@ class PipeLogger(AbstractPollTask): |
141 |
> fcntl.fcntl(fd, fcntl.F_SETFD, |
142 |
> fcntl.fcntl(fd, |
143 |
> fcntl.F_GETFD) | fcntl.FD_CLOEXEC) |
144 |
> - self.scheduler.add_reader(fd, self._output_handler, |
145 |
> fd) |
146 |
> + self._io_loop_task = |
147 |
> asyncio.ensure_future(self._io_loop(self.input_fd), |
148 |
> loop=self.scheduler) |
149 |
> + |
150 |
> self._io_loop_task.add_done_callback(self._io_loop_done) |
151 |
> self._registered = True |
152 |
> def _cancel(self): |
153 |
> @@ -65,25 +73,36 @@ class PipeLogger(AbstractPollTask): |
154 |
> if self.returncode is None: |
155 |
> self.returncode = self._cancelled_returncode |
156 |
> |
157 |
> - def _output_handler(self, fd): |
158 |
> - |
159 |
> + @coroutine |
160 |
> + def _io_loop(self, input_file): |
161 |
> background = self.background |
162 |
> stdout_fd = self.stdout_fd |
163 |
> log_file = self._log_file |
164 |
> + fd = input_file.fileno() |
165 |
> |
166 |
> while True: |
167 |
> buf = self._read_buf(fd) |
168 |
> |
169 |
> if buf is None: |
170 |
> # not a POLLIN event, EAGAIN, etc... |
171 |
> - break |
172 |
> + future = |
173 |
> self.scheduler.create_future() |
174 |
> + self.scheduler.add_reader(fd, |
175 |
> future.set_result, None) |
176 |
> + try: |
177 |
> + yield future |
178 |
> + finally: |
179 |
> + # The loop and input file |
180 |
> may have been closed. |
181 |
> + if not |
182 |
> self.scheduler.is_closed(): |
183 |
> + future.done() or |
184 |
> future.cancel() |
185 |
> + # Do not call |
186 |
> remove_reader in cases where fd has |
187 |
> + # been closed and |
188 |
> then re-allocated to a concurrent |
189 |
> + # coroutine as in |
190 |
> bug 716636. |
191 |
> + if not |
192 |
> input_file.closed: |
193 |
> + |
194 |
> self.scheduler.remove_reader(fd) |
195 |
> + continue |
196 |
> |
197 |
> if not buf: |
198 |
> # EOF |
199 |
> - self._unregister() |
200 |
> - self.returncode = self.returncode or |
201 |
> os.EX_OK |
202 |
> - self._async_wait() |
203 |
> - break |
204 |
> + return |
205 |
> |
206 |
> else: |
207 |
> if not background and stdout_fd is |
208 |
> not None: @@ -120,8 +139,25 @@ class PipeLogger(AbstractPollTask): |
209 |
> fcntl.F_GETFL) |
210 |
> ^ os.O_NONBLOCK) |
211 |
> if log_file is not None: |
212 |
> - log_file.write(buf) |
213 |
> - log_file.flush() |
214 |
> + if self._log_file_nb: |
215 |
> + # Use the _writer |
216 |
> function which uses os.write, since the |
217 |
> + # log_file.write |
218 |
> method looses data when an EAGAIN occurs. |
219 |
> + yield |
220 |
> _writer(log_file, buf, loop=self.scheduler) |
221 |
> + else: |
222 |
> + # For gzip.GzipFile |
223 |
> instances, the above _writer function |
224 |
> + # will not work |
225 |
> because data written directly to the file |
226 |
> + # descriptor |
227 |
> bypasses compression. |
228 |
> + log_file.write(buf) |
229 |
> + log_file.flush() |
230 |
> + |
231 |
> + def _io_loop_done(self, future): |
232 |
> + try: |
233 |
> + future.result() |
234 |
> + except asyncio.CancelledError: |
235 |
> + self.cancel() |
236 |
> + self._was_cancelled() |
237 |
> + self.returncode = self.returncode or os.EX_OK |
238 |
> + self._async_wait() |
239 |
> |
240 |
> def _unregister(self): |
241 |
> if self.input_fd is not None: |
242 |
> @@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask): |
243 |
> self.input_fd.close() |
244 |
> self.input_fd = None |
245 |
> |
246 |
> + if self._io_loop_task is not None: |
247 |
> + self._io_loop_task.done() or |
248 |
> self._io_loop_task.cancel() |
249 |
> + self._io_loop_task = None |
250 |
> + |
251 |
> if self.stdout_fd is not None: |
252 |
> os.close(self.stdout_fd) |
253 |
> self.stdout_fd = None |
254 |
> |
255 |
> if self._log_file is not None: |
256 |
> + |
257 |
> self.scheduler.remove_writer(self._log_file.fileno()) |
258 |
> self._log_file.close() self._log_file = None |
259 |
> |
260 |
|
261 |
Looks good |