Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH 1/2] PipeLogger: non-blocking write to pipe (bug 709746)
Date: Fri, 19 Jun 2020 20:39:48
Message-Id: 20200619203919.22519-2-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH 0/2] Support PORTAGE_LOG_FILTER_FILE_CMD (bug 709746) by Zac Medico
1 Add support to write to a non-blocking pipe instead of a
2 log file. This is needed for the purposes of bug 709746,
3 where PipeLogger will write to a pipe that is drained
4 by another PipeLogger instance which is running in the same
5 process.
6
7 Bug: https://bugs.gentoo.org/709746
8 Signed-off-by: Zac Medico <zmedico@g.o>
9 ---
10 lib/portage/tests/process/test_PipeLogger.py | 58 ++++++++++++++++
11 lib/portage/util/_async/PipeLogger.py | 73 +++++++++++++++-----
12 2 files changed, 115 insertions(+), 16 deletions(-)
13 create mode 100644 lib/portage/tests/process/test_PipeLogger.py
14
15 diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py
16 new file mode 100644
17 index 000000000..2bd94cf39
18 --- /dev/null
19 +++ b/lib/portage/tests/process/test_PipeLogger.py
20 @@ -0,0 +1,58 @@
21 +# Copyright 2020 Gentoo Authors
22 +# Distributed under the terms of the GNU General Public License v2
23 +
24 +from portage import os
25 +from portage.tests import TestCase
26 +from portage.util._async.PipeLogger import PipeLogger
27 +from portage.util.futures import asyncio
28 +from portage.util.futures._asyncio.streams import _reader, _writer
29 +from portage.util.futures.compat_coroutine import coroutine, coroutine_return
30 +from portage.util.futures.unix_events import _set_nonblocking
31 +
32 +
33 +class PipeLoggerTestCase(TestCase):
34 +
35 + @coroutine
36 + def _testPipeLoggerToPipe(self, test_string, loop=None):
37 + """
38 + Test PipeLogger writing to a pipe connected to a PipeReader.
39 + This verifies that PipeLogger does not deadlock when writing
40 + to a pipe that's drained by a PipeReader running in the same
41 + process (requires non-blocking write).
42 + """
43 +
44 + input_fd, writer_pipe = os.pipe()
45 + _set_nonblocking(writer_pipe)
46 + writer_pipe = os.fdopen(writer_pipe, 'wb', 0)
47 + writer = asyncio.ensure_future(_writer(writer_pipe, test_string.encode('ascii'), loop=loop), loop=loop)
48 + writer.add_done_callback(lambda writer: writer_pipe.close())
49 +
50 + pr, pw = os.pipe()
51 +
52 + consumer = PipeLogger(background=True,
53 + input_fd=input_fd,
54 + log_file_path=os.fdopen(pw, 'wb', 0),
55 + scheduler=loop)
56 + consumer.start()
57 +
58 + # Before starting the reader, wait here for a moment, in order
59 + # to exercise PipeLogger's handling of EAGAIN during write.
60 + yield asyncio.wait([writer], timeout=0.01)
61 +
62 + reader = _reader(pr, loop=loop)
63 + yield writer
64 + content = yield reader
65 + yield consumer.async_wait()
66 +
67 + self.assertEqual(consumer.returncode, os.EX_OK)
68 +
69 + coroutine_return(content.decode('ascii', 'replace'))
70 +
71 + def testPipeLogger(self):
72 + loop = asyncio._wrap_loop()
73 +
74 + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**17, 2**17 + 1):
75 + test_string = x * "a"
76 + output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
77 + self.assertEqual(test_string, output,
78 + "x = %s, len(output) = %s" % (x, len(output)))
79 diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
80 index a4258f350..ce8afb846 100644
81 --- a/lib/portage/util/_async/PipeLogger.py
82 +++ b/lib/portage/util/_async/PipeLogger.py
83 @@ -8,6 +8,10 @@ import sys
84
85 import portage
86 from portage import os, _encodings, _unicode_encode
87 +from portage.util.futures import asyncio
88 +from portage.util.futures._asyncio.streams import _writer
89 +from portage.util.futures.compat_coroutine import coroutine
90 +from portage.util.futures.unix_events import _set_nonblocking
91 from _emerge.AbstractPollTask import AbstractPollTask
92
93 class PipeLogger(AbstractPollTask):
94 @@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask):
95 """
96
97 __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
98 - ("_log_file", "_log_file_real")
99 + ("_io_loop_task", "_log_file", "_log_file_nb", "_log_file_real")
100
101 def _start(self):
102
103 log_file_path = self.log_file_path
104 - if log_file_path is not None:
105 -
106 + if hasattr(log_file_path, 'write'):
107 + self._log_file_nb = True
108 + self._log_file = log_file_path
109 + _set_nonblocking(self._log_file.fileno())
110 + elif log_file_path is not None:
111 self._log_file = open(_unicode_encode(log_file_path,
112 encoding=_encodings['fs'], errors='strict'), mode='ab')
113 if log_file_path.endswith('.gz'):
114 @@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask):
115 mode=0o660)
116
117 if isinstance(self.input_fd, int):
118 - fd = self.input_fd
119 - else:
120 - fd = self.input_fd.fileno()
121 + self.input_fd = os.fdopen(self.input_fd, 'rb', 0)
122 +
123 + fd = self.input_fd.fileno()
124
125 fcntl.fcntl(fd, fcntl.F_SETFL,
126 fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
127 @@ -57,7 +64,8 @@ class PipeLogger(AbstractPollTask):
128 fcntl.fcntl(fd, fcntl.F_SETFD,
129 fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
130
131 - self.scheduler.add_reader(fd, self._output_handler, fd)
132 + self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler)
133 + self._io_loop_task.add_done_callback(self._io_loop_done)
134 self._registered = True
135
136 def _cancel(self):
137 @@ -65,25 +73,36 @@ class PipeLogger(AbstractPollTask):
138 if self.returncode is None:
139 self.returncode = self._cancelled_returncode
140
141 - def _output_handler(self, fd):
142 -
143 + @coroutine
144 + def _io_loop(self, input_file):
145 background = self.background
146 stdout_fd = self.stdout_fd
147 log_file = self._log_file
148 + fd = input_file.fileno()
149
150 while True:
151 buf = self._read_buf(fd)
152
153 if buf is None:
154 # not a POLLIN event, EAGAIN, etc...
155 - break
156 + future = self.scheduler.create_future()
157 + self.scheduler.add_reader(fd, future.set_result, None)
158 + try:
159 + yield future
160 + finally:
161 + # The loop and input file may have been closed.
162 + if not self.scheduler.is_closed():
163 + future.done() or future.cancel()
164 + # Do not call remove_reader in cases where fd has
165 + # been closed and then re-allocated to a concurrent
166 + # coroutine as in bug 716636.
167 + if not input_file.closed:
168 + self.scheduler.remove_reader(fd)
169 + continue
170
171 if not buf:
172 # EOF
173 - self._unregister()
174 - self.returncode = self.returncode or os.EX_OK
175 - self._async_wait()
176 - break
177 + return
178
179 else:
180 if not background and stdout_fd is not None:
181 @@ -120,8 +139,25 @@ class PipeLogger(AbstractPollTask):
182 fcntl.F_GETFL) ^ os.O_NONBLOCK)
183
184 if log_file is not None:
185 - log_file.write(buf)
186 - log_file.flush()
187 + if self._log_file_nb:
188 + # Use the _writer function which uses os.write, since the
189 + # log_file.write method loses data when an EAGAIN occurs.
190 + yield _writer(log_file, buf, loop=self.scheduler)
191 + else:
192 + # For gzip.GzipFile instances, the above _writer function
193 + # will not work because data written directly to the file
194 + # descriptor bypasses compression.
195 + log_file.write(buf)
196 + log_file.flush()
197 +
198 + def _io_loop_done(self, future):
199 + try:
200 + future.result()
201 + except asyncio.CancelledError:
202 + self.cancel()
203 + self._was_cancelled()
204 + self.returncode = self.returncode or os.EX_OK
205 + self._async_wait()
206
207 def _unregister(self):
208 if self.input_fd is not None:
209 @@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask):
210 self.input_fd.close()
211 self.input_fd = None
212
213 + if self._io_loop_task is not None:
214 + self._io_loop_task.done() or self._io_loop_task.cancel()
215 + self._io_loop_task = None
216 +
217 if self.stdout_fd is not None:
218 os.close(self.stdout_fd)
219 self.stdout_fd = None
220
221 if self._log_file is not None:
222 + self.scheduler.remove_writer(self._log_file.fileno())
223 self._log_file.close()
224 self._log_file = None
225
226 --
227 2.25.3

Replies