Gentoo Archives: gentoo-portage-dev

From: Brian Dolbec <dolsen@g.o>
To: gentoo-portage-dev@l.g.o
Subject: Re: [gentoo-portage-dev] [PATCH 1/2] PipeLogger: non-blocking write to pipe (bug 709746)
Date: Mon, 22 Jun 2020 14:35:51
Message-Id: 20200622073545.2edc73c8@storm
In Reply to: [gentoo-portage-dev] [PATCH 1/2] PipeLogger: non-blocking write to pipe (bug 709746) by Zac Medico
1 On Fri, 19 Jun 2020 13:39:18 -0700
2 Zac Medico <zmedico@g.o> wrote:
3
4 > Add support to write to a non-blocking pipe instead of a
5 > log file. This is needed for the purposes of bug 709746,
6 > where PipeLogger will write to a pipe that is drained
7 > by another PipeLogger instance which is running in the same
8 > process.
9 >
10 > Bug: https://bugs.gentoo.org/709746
11 > Signed-off-by: Zac Medico <zmedico@g.o>
12 > ---
13 > lib/portage/tests/process/test_PipeLogger.py | 58 ++++++++++++++++
14 > lib/portage/util/_async/PipeLogger.py | 73
15 > +++++++++++++++----- 2 files changed, 115 insertions(+), 16
16 > deletions(-) create mode 100644
17 > lib/portage/tests/process/test_PipeLogger.py
18 >
19 > diff --git a/lib/portage/tests/process/test_PipeLogger.py
20 > b/lib/portage/tests/process/test_PipeLogger.py new file mode 100644
21 > index 000000000..2bd94cf39
22 > --- /dev/null
23 > +++ b/lib/portage/tests/process/test_PipeLogger.py
24 > @@ -0,0 +1,58 @@
25 > +# Copyright 2020 Gentoo Authors
26 > +# Distributed under the terms of the GNU General Public License v2
27 > +
28 > +from portage import os
29 > +from portage.tests import TestCase
30 > +from portage.util._async.PipeLogger import PipeLogger
31 > +from portage.util.futures import asyncio
32 > +from portage.util.futures._asyncio.streams import _reader, _writer
33 > +from portage.util.futures.compat_coroutine import coroutine, coroutine_return
34 > +from portage.util.futures.unix_events import _set_nonblocking
35 > +
36 > +
37 > +class PipeLoggerTestCase(TestCase):
38 > +
39 > + @coroutine
40 > + def _testPipeLoggerToPipe(self, test_string, loop=None):
41 > + """
42 > + Test PipeLogger writing to a pipe connected to a
43 > PipeReader.
44 > + This verifies that PipeLogger does not deadlock when
45 > writing
46 > + to a pipe that's drained by a PipeReader running in
47 > the same
48 > + process (requires non-blocking write).
49 > + """
50 > +
51 > + input_fd, writer_pipe = os.pipe()
52 > + _set_nonblocking(writer_pipe)
53 > + writer_pipe = os.fdopen(writer_pipe, 'wb', 0)
54 > + writer = asyncio.ensure_future(_writer(writer_pipe,
55 > test_string.encode('ascii'), loop=loop), loop=loop)
56 > + writer.add_done_callback(lambda writer:
57 > writer_pipe.close()) +
58 > + pr, pw = os.pipe()
59 > +
60 > + consumer = PipeLogger(background=True,
61 > + input_fd=input_fd,
62 > + log_file_path=os.fdopen(pw, 'wb', 0),
63 > + scheduler=loop)
64 > + consumer.start()
65 > +
66 > + # Before starting the reader, wait here for a
67 > moment, in order
68 > + # to exercise PipeLogger's handling of EAGAIN during
69 > write.
70 > + yield asyncio.wait([writer], timeout=0.01)
71 > +
72 > + reader = _reader(pr, loop=loop)
73 > + yield writer
74 > + content = yield reader
75 > + yield consumer.async_wait()
76 > +
77 > + self.assertEqual(consumer.returncode, os.EX_OK)
78 > +
79 > + coroutine_return(content.decode('ascii', 'replace'))
80 > +
81 > + def testPipeLogger(self):
82 > + loop = asyncio._wrap_loop()
83 > +
84 > + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12,
85 > 2**13, 2**14, 2**17, 2**17 + 1):
86 > + test_string = x * "a"
87 > + output =
88 > loop.run_until_complete(self._testPipeLoggerToPipe(test_string,
89 > loop=loop))
90 > + self.assertEqual(test_string, output,
91 > + "x = %s, len(output) = %s" % (x,
92 > len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py
93 > b/lib/portage/util/_async/PipeLogger.py index a4258f350..ce8afb846
94 > 100644 --- a/lib/portage/util/_async/PipeLogger.py
95 > +++ b/lib/portage/util/_async/PipeLogger.py
96 > @@ -8,6 +8,10 @@ import sys
97 >
98 > import portage
99 > from portage import os, _encodings, _unicode_encode
100 > +from portage.util.futures import asyncio
101 > +from portage.util.futures._asyncio.streams import _writer
102 > +from portage.util.futures.compat_coroutine import coroutine
103 > +from portage.util.futures.unix_events import _set_nonblocking
104 > from _emerge.AbstractPollTask import AbstractPollTask
105 >
106 > class PipeLogger(AbstractPollTask):
107 > @@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask):
108 > """
109 >
110 > __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
111 > - ("_log_file", "_log_file_real")
112 > + ("_io_loop_task", "_log_file", "_log_file_nb",
113 > "_log_file_real")
114 > def _start(self):
115 >
116 > log_file_path = self.log_file_path
117 > - if log_file_path is not None:
118 > -
119 > + if hasattr(log_file_path, 'write'):
120 > + self._log_file_nb = True
121 > + self._log_file = log_file_path
122 > + _set_nonblocking(self._log_file.fileno())
123 > + elif log_file_path is not None:
124 > self._log_file =
125 > open(_unicode_encode(log_file_path, encoding=_encodings['fs'],
126 > errors='strict'), mode='ab') if log_file_path.endswith('.gz'):
127 > @@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask):
128 > mode=0o660)
129 >
130 > if isinstance(self.input_fd, int):
131 > - fd = self.input_fd
132 > - else:
133 > - fd = self.input_fd.fileno()
134 > + self.input_fd = os.fdopen(self.input_fd,
135 > 'rb', 0) +
136 > + fd = self.input_fd.fileno()
137 >
138 > fcntl.fcntl(fd, fcntl.F_SETFL,
139 > fcntl.fcntl(fd, fcntl.F_GETFL) |
140 > os.O_NONBLOCK) @@ -57,7 +64,8 @@ class PipeLogger(AbstractPollTask):
141 > fcntl.fcntl(fd, fcntl.F_SETFD,
142 > fcntl.fcntl(fd,
143 > fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
144 > - self.scheduler.add_reader(fd, self._output_handler,
145 > fd)
146 > + self._io_loop_task =
147 > asyncio.ensure_future(self._io_loop(self.input_fd),
148 > loop=self.scheduler)
149 > +
150 > self._io_loop_task.add_done_callback(self._io_loop_done)
151 > self._registered = True
152 > def _cancel(self):
153 > @@ -65,25 +73,36 @@ class PipeLogger(AbstractPollTask):
154 > if self.returncode is None:
155 > self.returncode = self._cancelled_returncode
156 >
157 > - def _output_handler(self, fd):
158 > -
159 > + @coroutine
160 > + def _io_loop(self, input_file):
161 > background = self.background
162 > stdout_fd = self.stdout_fd
163 > log_file = self._log_file
164 > + fd = input_file.fileno()
165 >
166 > while True:
167 > buf = self._read_buf(fd)
168 >
169 > if buf is None:
170 > # not a POLLIN event, EAGAIN, etc...
171 > - break
172 > + future =
173 > self.scheduler.create_future()
174 > + self.scheduler.add_reader(fd,
175 > future.set_result, None)
176 > + try:
177 > + yield future
178 > + finally:
179 > + # The loop and input file
180 > may have been closed.
181 > + if not
182 > self.scheduler.is_closed():
183 > + future.done() or
184 > future.cancel()
185 > + # Do not call
186 > remove_reader in cases where fd has
187 > + # been closed and
188 > then re-allocated to a concurrent
189 > + # coroutine as in
190 > bug 716636.
191 > + if not
192 > input_file.closed:
193 > +
194 > self.scheduler.remove_reader(fd)
195 > + continue
196 >
197 > if not buf:
198 > # EOF
199 > - self._unregister()
200 > - self.returncode = self.returncode or
201 > os.EX_OK
202 > - self._async_wait()
203 > - break
204 > + return
205 >
206 > else:
207 > if not background and stdout_fd is
208 > not None: @@ -120,8 +139,25 @@ class PipeLogger(AbstractPollTask):
209 > fcntl.F_GETFL)
210 > ^ os.O_NONBLOCK)
211 > if log_file is not None:
212 > - log_file.write(buf)
213 > - log_file.flush()
214 > + if self._log_file_nb:
215 > + # Use the _writer
216 > function which uses os.write, since the
217 > + # log_file.write
218 > method loses data when an EAGAIN occurs.
219 > + yield
220 > _writer(log_file, buf, loop=self.scheduler)
221 > + else:
222 > + # For gzip.GzipFile
223 > instances, the above _writer function
224 > + # will not work
225 > because data written directly to the file
226 > + # descriptor
227 > bypasses compression.
228 > + log_file.write(buf)
229 > + log_file.flush()
230 > +
231 > + def _io_loop_done(self, future):
232 > + try:
233 > + future.result()
234 > + except asyncio.CancelledError:
235 > + self.cancel()
236 > + self._was_cancelled()
237 > + self.returncode = self.returncode or os.EX_OK
238 > + self._async_wait()
239 >
240 > def _unregister(self):
241 > if self.input_fd is not None:
242 > @@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask):
243 > self.input_fd.close()
244 > self.input_fd = None
245 >
246 > + if self._io_loop_task is not None:
247 > + self._io_loop_task.done() or
248 > self._io_loop_task.cancel()
249 > + self._io_loop_task = None
250 > +
251 > if self.stdout_fd is not None:
252 > os.close(self.stdout_fd)
253 > self.stdout_fd = None
254 >
255 > if self._log_file is not None:
256 > +
257 > self.scheduler.remove_writer(self._log_file.fileno())
258 > self._log_file.close() self._log_file = None
259 >
260
261 Looks good