1 |
Replace os.fork with multiprocessing.Process, in order to leverage |
2 |
any pre-fork and post-fork interpreter housekeeping that it provides, |
3 |
promoting a healthy state for the forked interpreter. |
4 |
|
5 |
Since multiprocessing.Process closes sys.__stdin__, fix relevant |
6 |
code to use the portage._get_stdin() compatibility function. |
7 |
In case there's a legitimate need to inherit stdin for things like |
8 |
PROPERTIES=interactive support, create a temporary duplicate of |
9 |
fd_pipes[0] when appropriate, and restore sys.stdin and sys.__stdin__ |
10 |
in the subprocess. |
11 |
|
12 |
Bug: https://bugs.gentoo.org/730192 |
13 |
Signed-off-by: Zac Medico <zmedico@g.o> |
14 |
--- |
15 |
[PATCH v2] |
16 |
* Use sentinel for all Python versions |
17 |
* Add _proc_join coroutine for non-blocking join |
18 |
|
19 |
lib/portage/process.py | 4 +- |
20 |
lib/portage/sync/controller.py | 4 +- |
21 |
lib/portage/util/_async/ForkProcess.py | 146 +++++++++++++++++++------ |
22 |
3 files changed, 119 insertions(+), 35 deletions(-) |
23 |
|
24 |
diff --git a/lib/portage/process.py b/lib/portage/process.py |
25 |
index 6af668db4..b7316c89d 100644 |
26 |
--- a/lib/portage/process.py |
27 |
+++ b/lib/portage/process.py |
28 |
@@ -1,5 +1,5 @@ |
29 |
# portage.py -- core Portage functionality |
30 |
-# Copyright 1998-2019 Gentoo Authors |
31 |
+# Copyright 1998-2020 Gentoo Authors |
32 |
# Distributed under the terms of the GNU General Public License v2 |
33 |
|
34 |
|
35 |
@@ -107,7 +107,7 @@ def sanitize_fds(): |
36 |
if _set_inheritable is not None: |
37 |
|
38 |
whitelist = frozenset([ |
39 |
- sys.__stdin__.fileno(), |
40 |
+ portage._get_stdin().fileno(), |
41 |
sys.__stdout__.fileno(), |
42 |
sys.__stderr__.fileno(), |
43 |
]) |
44 |
diff --git a/lib/portage/sync/controller.py b/lib/portage/sync/controller.py |
45 |
index c4c72e73a..43bb5d520 100644 |
46 |
--- a/lib/portage/sync/controller.py |
47 |
+++ b/lib/portage/sync/controller.py |
48 |
@@ -1,4 +1,4 @@ |
49 |
-# Copyright 2014-2019 Gentoo Authors |
50 |
+# Copyright 2014-2020 Gentoo Authors |
51 |
# Distributed under the terms of the GNU General Public License v2 |
52 |
|
53 |
from __future__ import print_function |
54 |
@@ -231,7 +231,7 @@ class SyncManager(object): |
55 |
# Redirect command stderr to stdout, in order to prevent |
56 |
# spurious cron job emails (bug 566132). |
57 |
spawn_kwargs["fd_pipes"] = { |
58 |
- 0: sys.__stdin__.fileno(), |
59 |
+ 0: portage._get_stdin().fileno(), |
60 |
1: sys.__stdout__.fileno(), |
61 |
2: sys.__stdout__.fileno() |
62 |
} |
63 |
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py |
64 |
index d84e93833..eb01a6232 100644 |
65 |
--- a/lib/portage/util/_async/ForkProcess.py |
66 |
+++ b/lib/portage/util/_async/ForkProcess.py |
67 |
@@ -1,37 +1,123 @@ |
68 |
-# Copyright 2012-2013 Gentoo Foundation |
69 |
+# Copyright 2012-2020 Gentoo Authors |
70 |
# Distributed under the terms of the GNU General Public License v2 |
71 |
|
72 |
+import fcntl |
73 |
+import functools |
74 |
+import multiprocessing |
75 |
import signal |
76 |
import sys |
77 |
-import traceback |
78 |
|
79 |
import portage |
80 |
from portage import os |
81 |
+from portage.util.futures import asyncio |
82 |
+from portage.util.futures.compat_coroutine import coroutine |
83 |
from _emerge.SpawnProcess import SpawnProcess |
84 |
|
85 |
class ForkProcess(SpawnProcess): |
86 |
|
87 |
- __slots__ = () |
88 |
+ __slots__ = ('_proc', '_proc_join_task') |
89 |
+ |
90 |
+ # Number of seconds between poll attempts for process exit status |
91 |
+ # (after the sentinel has become ready). |
92 |
+ _proc_join_interval = 0.1 |
93 |
|
94 |
def _spawn(self, args, fd_pipes=None, **kwargs): |
95 |
""" |
96 |
- Fork a subprocess, apply local settings, and call fetch(). |
97 |
+ Override SpawnProcess._spawn to fork a subprocess that calls |
98 |
+ self._run(). This uses multiprocessing.Process in order to leverage |
99 |
+ any pre-fork and post-fork interpreter housekeeping that it provides, |
100 |
+ promoting a healthy state for the forked interpreter. |
101 |
""" |
102 |
- |
103 |
- parent_pid = os.getpid() |
104 |
- pid = None |
105 |
+ # Since multiprocessing.Process closes sys.__stdin__, create a |
106 |
+ # temporary duplicate of fd_pipes[0] so that sys.__stdin__ can |
107 |
+ # be restored in the subprocess, in case this is needed for |
108 |
+ # things like PROPERTIES=interactive support. |
109 |
+ stdin_dup = None |
110 |
try: |
111 |
- pid = os.fork() |
112 |
+ stdin_fd = fd_pipes.get(0) |
113 |
+ if stdin_fd is not None and stdin_fd == portage._get_stdin().fileno(): |
114 |
+ stdin_dup = os.dup(stdin_fd) |
115 |
+ fcntl.fcntl(stdin_dup, fcntl.F_SETFD, |
116 |
+ fcntl.fcntl(stdin_fd, fcntl.F_GETFD)) |
117 |
+ fd_pipes[0] = stdin_dup |
118 |
+ self._proc = multiprocessing.Process(target=self._bootstrap, args=(fd_pipes,)) |
119 |
+ self._proc.start() |
120 |
+ finally: |
121 |
+ if stdin_dup is not None: |
122 |
+ os.close(stdin_dup) |
123 |
+ |
124 |
+ self._proc_join_task = asyncio.ensure_future( |
125 |
+ self._proc_join(self._proc)) |
126 |
+ self._proc_join_task.add_done_callback( |
127 |
+ functools.partial(self._proc_join_done, self._proc)) |
128 |
+ |
129 |
+ return [self._proc.pid] |
130 |
+ |
131 |
+ def _cancel(self): |
132 |
+ if self._proc is None: |
133 |
+ super(ForkProcess, self)._cancel() |
134 |
+ else: |
135 |
+ self._proc.terminate() |
136 |
+ |
137 |
+ def _async_wait(self): |
138 |
+ if self._proc_join_task is None: |
139 |
+ super(ForkProcess, self)._async_wait() |
140 |
|
141 |
- if pid != 0: |
142 |
- if not isinstance(pid, int): |
143 |
- raise AssertionError( |
144 |
- "fork returned non-integer: %s" % (repr(pid),)) |
145 |
- return [pid] |
146 |
+ def _async_waitpid(self): |
147 |
+ if self._proc_join_task is None: |
148 |
+ super(ForkProcess, self)._async_waitpid() |
149 |
|
150 |
- rval = 1 |
151 |
+ @coroutine |
152 |
+ def _proc_join(self, proc): |
153 |
+ sentinel_reader = self.scheduler.create_future() |
154 |
+ self.scheduler.add_reader(proc.sentinel, |
155 |
+ lambda: sentinel_reader.done() or sentinel_reader.set_result(None)) |
156 |
+ try: |
157 |
+ yield sentinel_reader |
158 |
+ finally: |
159 |
+ # If multiprocessing.Process supports the close method, then |
160 |
+ # access to proc.sentinel will raise ValueError if the |
161 |
+ # sentinel has been closed. In this case it's not safe to call |
162 |
+ # remove_reader, since the file descriptor may have been closed |
163 |
+ # and then reallocated to a concurrent coroutine. When the |
164 |
+ # close method is not supported, proc.sentinel remains open |
165 |
+ # until proc's finalizer is called. |
166 |
try: |
167 |
+ self.scheduler.remove_reader(proc.sentinel) |
168 |
+ except ValueError: |
169 |
+ pass |
170 |
+ |
171 |
+ # Now that proc.sentinel is ready, poll until process exit |
172 |
+ # status has become available. |
173 |
+ while True: |
174 |
+ proc.join(0) |
175 |
+ if proc.exitcode is not None: |
176 |
+ break |
177 |
+ yield asyncio.sleep(self._proc_join_interval) |
178 |
+ |
179 |
+ def _proc_join_done(self, proc, future): |
180 |
+ future.cancelled() or future.result() |
181 |
+ self._was_cancelled() |
182 |
+ if self.returncode is None: |
183 |
+ self.returncode = proc.exitcode |
184 |
|
185 |
+ self._proc = None |
186 |
+ if hasattr(proc, 'close'): |
187 |
+ proc.close() |
188 |
+ self._proc_join_task = None |
189 |
+ self._async_wait() |
190 |
+ |
191 |
+ def _unregister(self): |
192 |
+ super(ForkProcess, self)._unregister() |
193 |
+ if self._proc is not None: |
194 |
+ if self._proc.is_alive(): |
195 |
+ self._proc.terminate() |
196 |
+ self._proc = None |
197 |
+ if self._proc_join_task is not None: |
198 |
+ self._proc_join_task.cancel() |
199 |
+ self._proc_join_task = None |
200 |
+ |
201 |
+ def _bootstrap(self, fd_pipes): |
202 |
# Use default signal handlers in order to avoid problems |
203 |
# killing subprocesses as reported in bug #353239. |
204 |
signal.signal(signal.SIGINT, signal.SIG_DFL) |
205 |
@@ -52,24 +138,22 @@ class ForkProcess(SpawnProcess): |
206 |
# (see _setup_pipes docstring). |
207 |
portage.process._setup_pipes(fd_pipes, close_fds=False) |
208 |
|
209 |
- rval = self._run() |
210 |
- except SystemExit: |
211 |
- raise |
212 |
- except: |
213 |
- traceback.print_exc() |
214 |
- # os._exit() skips stderr flush! |
215 |
- sys.stderr.flush() |
216 |
- finally: |
217 |
- os._exit(rval) |
218 |
+ # Since multiprocessing.Process closes sys.__stdin__ and |
219 |
+ # makes sys.stdin refer to os.devnull, restore it when |
220 |
+ # appropriate. |
221 |
+ if 0 in fd_pipes: |
222 |
+ # It's possible that sys.stdin.fileno() is already 0, |
223 |
+ # and in that case the above _setup_pipes call will |
224 |
+ # have already updated its identity via dup2. Otherwise, |
225 |
+ # perform the dup2 call now, and also copy the file |
226 |
+ # descriptor flags. |
227 |
+ if sys.stdin.fileno() != 0: |
228 |
+ os.dup2(0, sys.stdin.fileno()) |
229 |
+ fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFD, |
230 |
+ fcntl.fcntl(0, fcntl.F_GETFD)) |
231 |
+ sys.__stdin__ = sys.stdin |
232 |
|
233 |
- finally: |
234 |
- if pid == 0 or (pid is None and os.getpid() != parent_pid): |
235 |
- # Call os._exit() from a finally block in order |
236 |
- # to suppress any finally blocks from earlier |
237 |
- # in the call stack (see bug #345289). This |
238 |
- # finally block has to be setup before the fork |
239 |
- # in order to avoid a race condition. |
240 |
- os._exit(1) |
241 |
+ sys.exit(self._run()) |
242 |
|
243 |
def _run(self): |
244 |
raise NotImplementedError(self) |
245 |
-- |
246 |
2.25.3 |