Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
Date: Mon, 24 Feb 2020 10:51:07
Message-Id: 1582539993.27712651aa7014a960b012dc89457df09677edc1.zmedico@gentoo
1 commit: 27712651aa7014a960b012dc89457df09677edc1
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Mon Feb 24 08:06:11 2020 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Mon Feb 24 10:26:33 2020 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=27712651
7
8 PipeLogger: non-blocking write to pipe (bug 709746)
9
10 Add support to write to a non-blocking pipe instead of a
11 log file. This is needed for the purposes of bug 709746,
12 where PipeLogger will write to a pipe that is drained
13 by anoher PipeLogger instance which is running in the same
14 process.
15
16 Bug: https://bugs.gentoo.org/709746
17 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
18
19 lib/portage/tests/process/test_PopenProcess.py | 41 +++++++++++++++-
20 lib/portage/util/_async/PipeLogger.py | 67 +++++++++++++++++++++-----
21 2 files changed, 94 insertions(+), 14 deletions(-)
22
23 diff --git a/lib/portage/tests/process/test_PopenProcess.py b/lib/portage/tests/process/test_PopenProcess.py
24 index ed506b814..d4e97f210 100644
25 --- a/lib/portage/tests/process/test_PopenProcess.py
26 +++ b/lib/portage/tests/process/test_PopenProcess.py
27 @@ -9,6 +9,8 @@ from portage.tests import TestCase
28 from portage.util._async.PipeLogger import PipeLogger
29 from portage.util._async.PopenProcess import PopenProcess
30 from portage.util._eventloop.global_event_loop import global_event_loop
31 +from portage.util.futures._asyncio.streams import _reader
32 +from portage.util.futures.compat_coroutine import coroutine, coroutine_return
33 from _emerge.PipeReader import PipeReader
34
35 class PopenPipeTestCase(TestCase):
36 @@ -73,8 +75,41 @@ class PopenPipeTestCase(TestCase):
37
38 return content.decode('ascii', 'replace')
39
40 + @coroutine
41 + def _testPipeLoggerToPipe(self, test_string, loop=None):
42 + """
43 + Test PipeLogger writing to a pipe connected to a PipeReader.
44 + This verifies that PipeLogger does not deadlock when writing
45 + to a pipe that's drained by a PipeReader running in the same
46 + process (requires non-blocking write).
47 + """
48 +
49 + producer = PopenProcess(proc=subprocess.Popen(
50 + ["bash", "-c", self._echo_cmd % test_string],
51 + stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
52 + scheduler=loop)
53 +
54 + pr, pw = os.pipe()
55 +
56 + consumer = producer.pipe_reader = PipeLogger(background=True,
57 + input_fd=producer.proc.stdout,
58 + log_file_path=os.fdopen(pw, 'wb', 0))
59 +
60 + reader = _reader(pr, loop=loop)
61 + yield producer.async_start()
62 + content = yield reader
63 + yield producer.async_wait()
64 + yield consumer.async_wait()
65 +
66 + self.assertEqual(producer.returncode, os.EX_OK)
67 + self.assertEqual(consumer.returncode, os.EX_OK)
68 +
69 + coroutine_return(content.decode('ascii', 'replace'))
70 +
71 def testPopenPipe(self):
72 - for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
73 + loop = global_event_loop()
74 +
75 + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**15, 2**16):
76 test_string = x * "a"
77 output = self._testPipeReader(test_string)
78 self.assertEqual(test_string, output,
79 @@ -83,3 +118,7 @@ class PopenPipeTestCase(TestCase):
80 output = self._testPipeLogger(test_string)
81 self.assertEqual(test_string, output,
82 "x = %s, len(output) = %s" % (x, len(output)))
83 +
84 + output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
85 + self.assertEqual(test_string, output,
86 + "x = %s, len(output) = %s" % (x, len(output)))
87
88 diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
89 index a4258f350..6b03988a1 100644
90 --- a/lib/portage/util/_async/PipeLogger.py
91 +++ b/lib/portage/util/_async/PipeLogger.py
92 @@ -8,6 +8,9 @@ import sys
93
94 import portage
95 from portage import os, _encodings, _unicode_encode
96 +from portage.util.futures import asyncio
97 +from portage.util.futures.compat_coroutine import coroutine
98 +from portage.util.futures.unix_events import _set_nonblocking
99 from _emerge.AbstractPollTask import AbstractPollTask
100
101 class PipeLogger(AbstractPollTask):
102 @@ -21,13 +24,15 @@ class PipeLogger(AbstractPollTask):
103 """
104
105 __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
106 - ("_log_file", "_log_file_real")
107 + ("_io_loop_task", "_log_file", "_log_file_real")
108
109 def _start(self):
110
111 log_file_path = self.log_file_path
112 - if log_file_path is not None:
113 -
114 + if hasattr(log_file_path, 'write'):
115 + self._log_file = log_file_path
116 + _set_nonblocking(self._log_file.fileno())
117 + elif log_file_path is not None:
118 self._log_file = open(_unicode_encode(log_file_path,
119 encoding=_encodings['fs'], errors='strict'), mode='ab')
120 if log_file_path.endswith('.gz'):
121 @@ -57,7 +62,8 @@ class PipeLogger(AbstractPollTask):
122 fcntl.fcntl(fd, fcntl.F_SETFD,
123 fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
124
125 - self.scheduler.add_reader(fd, self._output_handler, fd)
126 + self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), loop=self.scheduler)
127 + self._io_loop_task.add_done_callback(self._io_loop_done)
128 self._registered = True
129
130 def _cancel(self):
131 @@ -65,8 +71,8 @@ class PipeLogger(AbstractPollTask):
132 if self.returncode is None:
133 self.returncode = self._cancelled_returncode
134
135 - def _output_handler(self, fd):
136 -
137 + @coroutine
138 + def _io_loop(self, fd):
139 background = self.background
140 stdout_fd = self.stdout_fd
141 log_file = self._log_file
142 @@ -76,14 +82,18 @@ class PipeLogger(AbstractPollTask):
143
144 if buf is None:
145 # not a POLLIN event, EAGAIN, etc...
146 - break
147 + future = self.scheduler.create_future()
148 + self.scheduler.add_reader(fd, future.set_result, None)
149 + try:
150 + yield future
151 + finally:
152 + self.scheduler.remove_reader(fd)
153 + future.done() or future.cancel()
154 + continue
155
156 if not buf:
157 # EOF
158 - self._unregister()
159 - self.returncode = self.returncode or os.EX_OK
160 - self._async_wait()
161 - break
162 + return
163
164 else:
165 if not background and stdout_fd is not None:
166 @@ -120,8 +130,34 @@ class PipeLogger(AbstractPollTask):
167 fcntl.F_GETFL) ^ os.O_NONBLOCK)
168
169 if log_file is not None:
170 - log_file.write(buf)
171 - log_file.flush()
172 + write_buf = buf
173 + while True:
174 + try:
175 + if write_buf is not None:
176 + log_file.write(write_buf)
177 + write_buf = None
178 + log_file.flush()
179 + except EnvironmentError as e:
180 + if e.errno != errno.EAGAIN:
181 + raise
182 + future = self.scheduler.create_future()
183 + self.scheduler.add_writer(self._log_file.fileno(), future.set_result, None)
184 + try:
185 + yield future
186 + finally:
187 + self.scheduler.remove_writer(self._log_file.fileno())
188 + future.done() or future.cancel()
189 + else:
190 + break
191 +
192 + def _io_loop_done(self, future):
193 + try:
194 + future.result()
195 + except asyncio.CancelledError:
196 + self.cancel()
197 + self._was_cancelled()
198 + self.returncode = self.returncode or os.EX_OK
199 + self._async_wait()
200
201 def _unregister(self):
202 if self.input_fd is not None:
203 @@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask):
204 self.input_fd.close()
205 self.input_fd = None
206
207 + if self._io_loop_task is not None:
208 + self._io_loop_task.done() or self._io_loop_task.cancel()
209 + self._io_loop_task = None
210 +
211 if self.stdout_fd is not None:
212 os.close(self.stdout_fd)
213 self.stdout_fd = None
214
215 if self._log_file is not None:
216 + self.scheduler.remove_writer(self._log_file.fileno())
217 self._log_file.close()
218 self._log_file = None