Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] ForkProcess: replace os.fork with multiprocessing.Process (bug 730192)
Date: Thu, 16 Jul 2020 20:28:46
Message-Id: 20200716202816.24649-1-zmedico@gentoo.org
1 Replace os.fork with multiprocessing.Process, in order to leverage
2 any pre-fork and post-fork interpreter housekeeping that it provides,
3 promoting a healthy state for the forked interpreter.
4
5 Since multiprocessing.Process closes sys.__stdin__, fix relevant
6 code to use the portage._get_stdin() compatibility function.
7 In case there's a legitimate need to inherit stdin for things like
8 PROPERTIES=interactive support, create a temporary duplicate of
9 fd_pipes[0] when appropriate, and restore sys.stdin and sys.__stdin__
10 in the subprocess.
11
12 Bug: https://bugs.gentoo.org/730192
13 Signed-off-by: Zac Medico <zmedico@g.o>
14 ---
15 lib/portage/process.py | 4 +-
16 lib/portage/sync/controller.py | 4 +-
17 lib/portage/util/_async/ForkProcess.py | 135 +++++++++++++++++++------
18 3 files changed, 108 insertions(+), 35 deletions(-)
19
20 diff --git a/lib/portage/process.py b/lib/portage/process.py
21 index f550bcb30..2affd4d4b 100644
22 --- a/lib/portage/process.py
23 +++ b/lib/portage/process.py
24 @@ -1,5 +1,5 @@
25 # portage.py -- core Portage functionality
26 -# Copyright 1998-2019 Gentoo Authors
27 +# Copyright 1998-2020 Gentoo Authors
28 # Distributed under the terms of the GNU General Public License v2
29
30
31 @@ -107,7 +107,7 @@ def sanitize_fds():
32 if _set_inheritable is not None:
33
34 whitelist = frozenset([
35 - sys.__stdin__.fileno(),
36 + portage._get_stdin().fileno(),
37 sys.__stdout__.fileno(),
38 sys.__stderr__.fileno(),
39 ])
40 diff --git a/lib/portage/sync/controller.py b/lib/portage/sync/controller.py
41 index c4c72e73a..43bb5d520 100644
42 --- a/lib/portage/sync/controller.py
43 +++ b/lib/portage/sync/controller.py
44 @@ -1,4 +1,4 @@
45 -# Copyright 2014-2019 Gentoo Authors
46 +# Copyright 2014-2020 Gentoo Authors
47 # Distributed under the terms of the GNU General Public License v2
48
49 from __future__ import print_function
50 @@ -231,7 +231,7 @@ class SyncManager(object):
51 # Redirect command stderr to stdout, in order to prevent
52 # spurious cron job emails (bug 566132).
53 spawn_kwargs["fd_pipes"] = {
54 - 0: sys.__stdin__.fileno(),
55 + 0: portage._get_stdin().fileno(),
56 1: sys.__stdout__.fileno(),
57 2: sys.__stdout__.fileno()
58 }
59 diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
60 index d84e93833..ade5c67ad 100644
61 --- a/lib/portage/util/_async/ForkProcess.py
62 +++ b/lib/portage/util/_async/ForkProcess.py
63 @@ -1,6 +1,8 @@
64 -# Copyright 2012-2013 Gentoo Foundation
65 +# Copyright 2012-2020 Gentoo Authors
66 # Distributed under the terms of the GNU General Public License v2
67
68 +import fcntl
69 +import multiprocessing
70 import signal
71 import sys
72 import traceback
73 @@ -11,27 +13,100 @@ from _emerge.SpawnProcess import SpawnProcess
74
75 class ForkProcess(SpawnProcess):
76
77 - __slots__ = ()
78 + __slots__ = ('_proc',)
79
80 def _spawn(self, args, fd_pipes=None, **kwargs):
81 """
82 - Fork a subprocess, apply local settings, and call fetch().
83 + Override SpawnProcess._spawn to fork a subprocess that calls
84 + self._run(). This uses multiprocessing.Process in order to leverage
85 + any pre-fork and post-fork interpreter housekeeping that it provides,
86 + promoting a healthy state for the forked interpreter.
87 """
88 -
89 - parent_pid = os.getpid()
90 - pid = None
91 + # Since multiprocessing.Process closes sys.__stdin__, create a
92 + # temporary duplicate of fd_pipes[0] so that sys.__stdin__ can
93 + # be restored in the subprocess, in case this is needed for
94 + # things like PROPERTIES=interactive support.
95 + stdin_dup = None
96 try:
97 - pid = os.fork()
98 + stdin_fd = fd_pipes.get(0)
99 + if stdin_fd is not None and stdin_fd == portage._get_stdin().fileno():
100 + stdin_dup = os.dup(stdin_fd)
101 + fcntl.fcntl(stdin_dup, fcntl.F_SETFD,
102 + fcntl.fcntl(stdin_fd, fcntl.F_GETFD))
103 + fd_pipes[0] = stdin_dup
104 + self._proc = multiprocessing.Process(target=self._bootstrap, args=(fd_pipes,))
105 + self._proc.start()
106 + finally:
107 + if stdin_dup is not None:
108 + os.close(stdin_dup)
109 +
110 + if self._use_proc_sentinel(self._proc):
111 + self.scheduler.add_reader(self._proc.sentinel, self._proc_sentinel_exit, self._proc)
112 +
113 + return [self._proc.pid]
114 +
115 + def _use_proc_sentinel(self, proc):
116 + """
117 + Use sentinel for versions of python that support the close
118 + method, since a premature call to self._proc.close() may
119 + raise ValueError. It's not safe to call remove_reader on
120 + proc.sentinel unless the close method is supported, since
121 + otherwise it's not possible to know when the corresponding
122 + file descriptor has been closed and then reallocated to a
123 + concurrent coroutine.
124
125 - if pid != 0:
126 - if not isinstance(pid, int):
127 - raise AssertionError(
128 - "fork returned non-integer: %s" % (repr(pid),))
129 - return [pid]
130 + @param proc: process instance
131 + @type proc: multiprocessing.Process
132 + @rtype: bool
133 + @returns: True if multiprocessing.Process sentinal should be used
134 + """
135 + return hasattr(proc, 'close')
136 +
137 + def _cancel(self):
138 + if self._proc is None:
139 + super(ForkProcess, self)._cancel()
140 + else:
141 + self._proc.terminate()
142 +
143 + def _async_wait(self):
144 + # Wait for _proc_sentinel_exit when appropriate, since a premature
145 + # call to self._proc.close() may raise ValueError.
146 + if self._proc is None or not self._use_proc_sentinel(self._proc):
147 + super(ForkProcess, self)._async_wait()
148
149 - rval = 1
150 - try:
151 + def _async_waitpid(self):
152 + # Wait for _proc_sentinel_exit when appropriate, since a premature
153 + # call to self._proc.close() may raise ValueError.
154 + if self._proc is None or not self._use_proc_sentinel(self._proc):
155 + super(ForkProcess, self)._async_waitpid()
156
157 + def _proc_sentinel_exit(self, proc):
158 + self._proc = None
159 + # If multiprocessing.Process supports the close method, then
160 + # access to the sentinel value should raise ValueError if the
161 + # sentinel has been closed. In this case it's not safe to call
162 + # remove_reader, since file descriptor has been closed and then
163 + # reallocated to a concurrent coroutine. After proc is closed,
164 + # most method calls and attribute accesses will raise ValueError,
165 + # so we assume that it is ready to close here and that no other
166 + # code will ever close it (the sentinel provides the only means
167 + # to predict when it is safe to close).
168 + self.scheduler.remove_reader(proc.sentinel)
169 + proc.join()
170 + self._was_cancelled()
171 + if self.returncode is None:
172 + self.returncode = proc.exitcode
173 + proc.close()
174 + self._async_wait()
175 +
176 + def _unregister(self):
177 + super(ForkProcess, self)._unregister()
178 + if self._proc is not None:
179 + if self._proc.is_alive():
180 + self._proc.terminate()
181 + self._proc = None
182 +
183 + def _bootstrap(self, fd_pipes):
184 # Use default signal handlers in order to avoid problems
185 # killing subprocesses as reported in bug #353239.
186 signal.signal(signal.SIGINT, signal.SIG_DFL)
187 @@ -52,24 +127,22 @@ class ForkProcess(SpawnProcess):
188 # (see _setup_pipes docstring).
189 portage.process._setup_pipes(fd_pipes, close_fds=False)
190
191 - rval = self._run()
192 - except SystemExit:
193 - raise
194 - except:
195 - traceback.print_exc()
196 - # os._exit() skips stderr flush!
197 - sys.stderr.flush()
198 - finally:
199 - os._exit(rval)
200 + # Since multiprocessing.Process closes sys.__stdin__ and
201 + # makes sys.stdin refer to os.devnull, restore it when
202 + # appropriate.
203 + if 0 in fd_pipes:
204 + # It's possible that sys.stdin.fileno() is already 0,
205 + # and in that case the above _setup_pipes call will
206 + # have already updated its identity via dup2. Otherwise,
207 + # perform the dup2 call now, and also copy the file
208 + # descriptor flags.
209 + if sys.stdin.fileno() != 0:
210 + os.dup2(0, sys.stdin.fileno())
211 + fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFD,
212 + fcntl.fcntl(0, fcntl.F_GETFD))
213 + sys.__stdin__ = sys.stdin
214
215 - finally:
216 - if pid == 0 or (pid is None and os.getpid() != parent_pid):
217 - # Call os._exit() from a finally block in order
218 - # to suppress any finally blocks from earlier
219 - # in the call stack (see bug #345289). This
220 - # finally block has to be setup before the fork
221 - # in order to avoid a race condition.
222 - os._exit(1)
223 + sys.exit(self._run())
224
225 def _run(self):
226 raise NotImplementedError(self)
227 --
228 2.25.3

Replies