Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH v2] ForkProcess: replace os.fork with multiprocessing.Process (bug 730192)
Date: Sat, 18 Jul 2020 01:59:39
Message-Id: 20200718015720.73732-1-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH] ForkProcess: replace os.fork with multiprocessing.Process (bug 730192) by Zac Medico
1 Replace os.fork with multiprocessing.Process, in order to leverage
2 any pre-fork and post-fork interpreter housekeeping that it provides,
3 promoting a healthy state for the forked interpreter.
4
5 Since multiprocessing.Process closes sys.__stdin__, update the relevant
6 code to use the portage._get_stdin() compatibility function.
7 In case there's a legitimate need to inherit stdin for things like
8 PROPERTIES=interactive support, create a temporary duplicate of
9 fd_pipes[0] when appropriate, and restore sys.stdin and sys.__stdin__
10 in the subprocess.
11
12 Bug: https://bugs.gentoo.org/730192
13 Signed-off-by: Zac Medico <zmedico@g.o>
14 ---
15 [PATCH v2]
16 * Use sentinel for all Python versions
17 * Add _proc_join coroutine for non-blocking join
18
19 lib/portage/process.py | 4 +-
20 lib/portage/sync/controller.py | 4 +-
21 lib/portage/util/_async/ForkProcess.py | 146 +++++++++++++++++++------
22 3 files changed, 119 insertions(+), 35 deletions(-)
23
24 diff --git a/lib/portage/process.py b/lib/portage/process.py
25 index 6af668db4..b7316c89d 100644
26 --- a/lib/portage/process.py
27 +++ b/lib/portage/process.py
28 @@ -1,5 +1,5 @@
29 # portage.py -- core Portage functionality
30 -# Copyright 1998-2019 Gentoo Authors
31 +# Copyright 1998-2020 Gentoo Authors
32 # Distributed under the terms of the GNU General Public License v2
33
34
35 @@ -107,7 +107,7 @@ def sanitize_fds():
36 if _set_inheritable is not None:
37
38 whitelist = frozenset([
39 - sys.__stdin__.fileno(),
40 + portage._get_stdin().fileno(),
41 sys.__stdout__.fileno(),
42 sys.__stderr__.fileno(),
43 ])
44 diff --git a/lib/portage/sync/controller.py b/lib/portage/sync/controller.py
45 index c4c72e73a..43bb5d520 100644
46 --- a/lib/portage/sync/controller.py
47 +++ b/lib/portage/sync/controller.py
48 @@ -1,4 +1,4 @@
49 -# Copyright 2014-2019 Gentoo Authors
50 +# Copyright 2014-2020 Gentoo Authors
51 # Distributed under the terms of the GNU General Public License v2
52
53 from __future__ import print_function
54 @@ -231,7 +231,7 @@ class SyncManager(object):
55 # Redirect command stderr to stdout, in order to prevent
56 # spurious cron job emails (bug 566132).
57 spawn_kwargs["fd_pipes"] = {
58 - 0: sys.__stdin__.fileno(),
59 + 0: portage._get_stdin().fileno(),
60 1: sys.__stdout__.fileno(),
61 2: sys.__stdout__.fileno()
62 }
63 diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
64 index d84e93833..eb01a6232 100644
65 --- a/lib/portage/util/_async/ForkProcess.py
66 +++ b/lib/portage/util/_async/ForkProcess.py
67 @@ -1,37 +1,123 @@
68 -# Copyright 2012-2013 Gentoo Foundation
69 +# Copyright 2012-2020 Gentoo Authors
70 # Distributed under the terms of the GNU General Public License v2
71
72 +import fcntl
73 +import functools
74 +import multiprocessing
75 import signal
76 import sys
77 -import traceback
78
79 import portage
80 from portage import os
81 +from portage.util.futures import asyncio
82 +from portage.util.futures.compat_coroutine import coroutine
83 from _emerge.SpawnProcess import SpawnProcess
84
85 class ForkProcess(SpawnProcess):
86
87 - __slots__ = ()
88 + __slots__ = ('_proc', '_proc_join_task')
89 +
90 + # Number of seconds between poll attempts for process exit status
91 + # (after the sentinel has become ready).
92 + _proc_join_interval = 0.1
93
94 def _spawn(self, args, fd_pipes=None, **kwargs):
95 """
96 - Fork a subprocess, apply local settings, and call fetch().
97 + Override SpawnProcess._spawn to fork a subprocess that calls
98 + self._run(). This uses multiprocessing.Process in order to leverage
99 + any pre-fork and post-fork interpreter housekeeping that it provides,
100 + promoting a healthy state for the forked interpreter.
101 """
102 -
103 - parent_pid = os.getpid()
104 - pid = None
105 + # Since multiprocessing.Process closes sys.__stdin__, create a
106 + # temporary duplicate of fd_pipes[0] so that sys.__stdin__ can
107 + # be restored in the subprocess, in case this is needed for
108 + # things like PROPERTIES=interactive support.
109 + stdin_dup = None
110 try:
111 - pid = os.fork()
112 + stdin_fd = fd_pipes.get(0)
113 + if stdin_fd is not None and stdin_fd == portage._get_stdin().fileno():
114 + stdin_dup = os.dup(stdin_fd)
115 + fcntl.fcntl(stdin_dup, fcntl.F_SETFD,
116 + fcntl.fcntl(stdin_fd, fcntl.F_GETFD))
117 + fd_pipes[0] = stdin_dup
118 + self._proc = multiprocessing.Process(target=self._bootstrap, args=(fd_pipes,))
119 + self._proc.start()
120 + finally:
121 + if stdin_dup is not None:
122 + os.close(stdin_dup)
123 +
124 + self._proc_join_task = asyncio.ensure_future(
125 + self._proc_join(self._proc))
126 + self._proc_join_task.add_done_callback(
127 + functools.partial(self._proc_join_done, self._proc))
128 +
129 + return [self._proc.pid]
130 +
131 + def _cancel(self):
132 + if self._proc is None:
133 + super(ForkProcess, self)._cancel()
134 + else:
135 + self._proc.terminate()
136 +
137 + def _async_wait(self):
138 + if self._proc_join_task is None:
139 + super(ForkProcess, self)._async_wait()
140
141 - if pid != 0:
142 - if not isinstance(pid, int):
143 - raise AssertionError(
144 - "fork returned non-integer: %s" % (repr(pid),))
145 - return [pid]
146 + def _async_waitpid(self):
147 + if self._proc_join_task is None:
148 + super(ForkProcess, self)._async_waitpid()
149
150 - rval = 1
151 + @coroutine
152 + def _proc_join(self, proc):
153 + sentinel_reader = self.scheduler.create_future()
154 + self.scheduler.add_reader(proc.sentinel,
155 + lambda: sentinel_reader.done() or sentinel_reader.set_result(None))
156 + try:
157 + yield sentinel_reader
158 + finally:
159 + # If multiprocessing.Process supports the close method, then
160 + # access to proc.sentinel will raise ValueError if the
161 + # sentinel has been closed. In this case it's not safe to call
162 + # remove_reader, since the file descriptor may have been closed
163 + # and then reallocated to a concurrent coroutine. When the
164 + # close method is not supported, proc.sentinel remains open
165 + # until proc's finalizer is called.
166 try:
167 + self.scheduler.remove_reader(proc.sentinel)
168 + except ValueError:
169 + pass
170 +
171 + # Now that proc.sentinel is ready, poll until process exit
172 + # status has become available.
173 + while True:
174 + proc.join(0)
175 + if proc.exitcode is not None:
176 + break
177 + yield asyncio.sleep(self._proc_join_interval)
178 +
179 + def _proc_join_done(self, proc, future):
180 + future.cancelled() or future.result()
181 + self._was_cancelled()
182 + if self.returncode is None:
183 + self.returncode = proc.exitcode
184
185 + self._proc = None
186 + if hasattr(proc, 'close'):
187 + proc.close()
188 + self._proc_join_task = None
189 + self._async_wait()
190 +
191 + def _unregister(self):
192 + super(ForkProcess, self)._unregister()
193 + if self._proc is not None:
194 + if self._proc.is_alive():
195 + self._proc.terminate()
196 + self._proc = None
197 + if self._proc_join_task is not None:
198 + self._proc_join_task.cancel()
199 + self._proc_join_task = None
200 +
201 + def _bootstrap(self, fd_pipes):
202 # Use default signal handlers in order to avoid problems
203 # killing subprocesses as reported in bug #353239.
204 signal.signal(signal.SIGINT, signal.SIG_DFL)
205 @@ -52,24 +138,22 @@ class ForkProcess(SpawnProcess):
206 # (see _setup_pipes docstring).
207 portage.process._setup_pipes(fd_pipes, close_fds=False)
208
209 - rval = self._run()
210 - except SystemExit:
211 - raise
212 - except:
213 - traceback.print_exc()
214 - # os._exit() skips stderr flush!
215 - sys.stderr.flush()
216 - finally:
217 - os._exit(rval)
218 + # Since multiprocessing.Process closes sys.__stdin__ and
219 + # makes sys.stdin refer to os.devnull, restore it when
220 + # appropriate.
221 + if 0 in fd_pipes:
222 + # It's possible that sys.stdin.fileno() is already 0,
223 + # and in that case the above _setup_pipes call will
224 + # have already updated its identity via dup2. Otherwise,
225 + # perform the dup2 call now, and also copy the file
226 + # descriptor flags.
227 + if sys.stdin.fileno() != 0:
228 + os.dup2(0, sys.stdin.fileno())
229 + fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFD,
230 + fcntl.fcntl(0, fcntl.F_GETFD))
231 + sys.__stdin__ = sys.stdin
232
233 - finally:
234 - if pid == 0 or (pid is None and os.getpid() != parent_pid):
235 - # Call os._exit() from a finally block in order
236 - # to suppress any finally blocks from earlier
237 - # in the call stack (see bug #345289). This
238 - # finally block has to be setup before the fork
239 - # in order to avoid a race condition.
240 - os._exit(1)
241 + sys.exit(self._run())
242
243 def _run(self):
244 raise NotImplementedError(self)
245 --
246 2.25.3