Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH v2] emerge: enable parallel-fetch during pkg_pretend (bug 710432)
Date: Mon, 21 Sep 2020 05:09:15
Message-Id: 20200921050656.460304-1-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH] emerge: enable parallel-fetch during pkg_pretend (bug 710432) by Zac Medico
1 Execute pkg_pretend phases in a coroutine while parallel-fetch
2 is running concurrently. When it's time to execute the pkg_pretend
3 phase for a remote binary package, use the new Scheduler._get_prefetcher
4 method to obtain a running prefetcher if one is available; otherwise
5 start a new fetcher.
6
7 Since pkg_pretend phases now run inside of the --keep-going retry
8 loop, --keep-going is now able to recover from pkg_pretend
9 failures, which fixes bug 404157.
10
11 Bug: https://bugs.gentoo.org/404157
12 Bug: https://bugs.gentoo.org/710432
13 Signed-off-by: Zac Medico <zmedico@g.o>
14 ---
15 Changes in v2: record failed packages (via _record_pkg_failure) so that
16 pkg_pretend failures interact correctly with emerge --keep-going, which fixes bug 404157.
17
18 lib/_emerge/Scheduler.py | 142 +++++++++++++++++++++++++++------------
19 1 file changed, 99 insertions(+), 43 deletions(-)
20
21 diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py
22 index a69421288..465f928a0 100644
23 --- a/lib/_emerge/Scheduler.py
24 +++ b/lib/_emerge/Scheduler.py
25 @@ -25,6 +25,7 @@ from portage._sets import SETPREFIX
26 from portage._sets.base import InternalPackageSet
27 from portage.util import ensure_dirs, writemsg, writemsg_level
28 from portage.util.futures import asyncio
29 +from portage.util.futures.compat_coroutine import coroutine, coroutine_return
30 from portage.util.SlotObject import SlotObject
31 from portage.util._async.SchedulerInterface import SchedulerInterface
32 from portage.package.ebuild.digestcheck import digestcheck
33 @@ -766,7 +767,8 @@ class Scheduler(PollScheduler):
34
35 return prefetcher
36
37 - def _run_pkg_pretend(self):
38 + @coroutine
39 + def _run_pkg_pretend(self, loop=None):
40 """
41 Since pkg_pretend output may be important, this method sends all
42 output directly to stdout (regardless of options like --quiet or
43 @@ -774,7 +776,7 @@ class Scheduler(PollScheduler):
44 """
45
46 failures = 0
47 - sched_iface = self._sched_iface
48 + sched_iface = loop = asyncio._wrap_loop(loop or self._sched_iface)
49
50 for x in self._mergelist:
51 if not isinstance(x, Package):
52 @@ -789,18 +791,28 @@ class Scheduler(PollScheduler):
53 if "pretend" not in x.defined_phases:
54 continue
55
56 - out_str =">>> Running pre-merge checks for " + colorize("INFORM", x.cpv) + "\n"
57 - portage.util.writemsg_stdout(out_str, noiselevel=-1)
58 + out_str = "Running pre-merge checks for " + colorize("INFORM", x.cpv)
59 + self._status_msg(out_str)
60
61 root_config = x.root_config
62 - settings = self.pkgsettings[root_config.root]
63 + settings = self._allocate_config(root_config.root)
64 settings.setcpv(x)
65 + if not x.built:
66 + # Get required SRC_URI metadata (it's not cached in x.metadata
67 + # because some packages have an extremely large SRC_URI value).
68 + portdb = root_config.trees["porttree"].dbapi
69 + (settings.configdict["pkg"]["SRC_URI"],) = yield portdb.async_aux_get(
70 + x.cpv, ["SRC_URI"], myrepo=x.repo, loop=loop
71 + )
72
73 # setcpv/package.env allows for per-package PORTAGE_TMPDIR so we
74 # have to validate it for each package
75 rval = _check_temp_dir(settings)
76 if rval != os.EX_OK:
77 - return rval
78 + failures += 1
79 + self._record_pkg_failure(x, settings, FAILURE)
80 + self._deallocate_config(settings)
81 + continue
82
83 build_dir_path = os.path.join(
84 os.path.realpath(settings["PORTAGE_TMPDIR"]),
85 @@ -809,7 +821,7 @@ class Scheduler(PollScheduler):
86 settings["PORTAGE_BUILDDIR"] = build_dir_path
87 build_dir = EbuildBuildDir(scheduler=sched_iface,
88 settings=settings)
89 - sched_iface.run_until_complete(build_dir.async_lock())
90 + yield build_dir.async_lock()
91 current_task = None
92
93 try:
94 @@ -835,7 +847,7 @@ class Scheduler(PollScheduler):
95 phase='clean', scheduler=sched_iface, settings=settings)
96 current_task = clean_phase
97 clean_phase.start()
98 - clean_phase.wait()
99 + yield clean_phase.async_wait()
100
101 if x.built:
102 tree = "bintree"
103 @@ -845,13 +857,19 @@ class Scheduler(PollScheduler):
104 # Display fetch on stdout, so that it's always clear what
105 # is consuming time here.
106 if bintree.isremote(x.cpv):
107 - fetcher = BinpkgFetcher(pkg=x,
108 - scheduler=sched_iface)
109 - fetcher.start()
110 - if fetcher.wait() != os.EX_OK:
111 + fetcher = self._get_prefetcher(x)
112 + if fetcher is None:
113 + fetcher = BinpkgFetcher(pkg=x, scheduler=loop)
114 + fetcher.start()
115 + # We only set the fetched value when fetcher
116 + # is a BinpkgFetcher, since BinpkgPrefetcher
117 + # handles fetch, verification, and the
118 + # bintree.inject call which moves the file.
119 + fetched = fetcher.pkg_path
120 + if (yield fetcher.async_wait()) != os.EX_OK:
121 failures += 1
122 + self._record_pkg_failure(x, settings, fetcher.returncode)
123 continue
124 - fetched = fetcher.pkg_path
125
126 if fetched is False:
127 filename = bintree.getname(x.cpv)
128 @@ -861,8 +879,9 @@ class Scheduler(PollScheduler):
129 scheduler=sched_iface, _pkg_path=filename)
130 current_task = verifier
131 verifier.start()
132 - if verifier.wait() != os.EX_OK:
133 + if (yield verifier.async_wait()) != os.EX_OK:
134 failures += 1
135 + self._record_pkg_failure(x, settings, verifier.returncode)
136 continue
137
138 if fetched:
139 @@ -870,8 +889,7 @@ class Scheduler(PollScheduler):
140
141 infloc = os.path.join(build_dir_path, "build-info")
142 ensure_dirs(infloc)
143 - self._sched_iface.run_until_complete(
144 - bintree.dbapi.unpack_metadata(settings, infloc, loop=self._sched_iface))
145 + yield bintree.dbapi.unpack_metadata(settings, infloc, loop=loop)
146 ebuild_path = os.path.join(infloc, x.pf + ".ebuild")
147 settings.configdict["pkg"]["EMERGE_FROM"] = "binary"
148 settings.configdict["pkg"]["MERGE_TYPE"] = "binary"
149 @@ -905,28 +923,45 @@ class Scheduler(PollScheduler):
150
151 current_task = pretend_phase
152 pretend_phase.start()
153 - ret = pretend_phase.wait()
154 + ret = yield pretend_phase.async_wait()
155 if ret != os.EX_OK:
156 failures += 1
157 + self._record_pkg_failure(x, settings, ret)
158 portage.elog.elog_process(x.cpv, settings)
159 finally:
160
161 if current_task is not None:
162 if current_task.isAlive():
163 current_task.cancel()
164 - current_task.wait()
165 if current_task.returncode == os.EX_OK:
166 clean_phase = EbuildPhase(background=False,
167 phase='clean', scheduler=sched_iface,
168 settings=settings)
169 clean_phase.start()
170 - clean_phase.wait()
171 + yield clean_phase.async_wait()
172
173 - sched_iface.run_until_complete(build_dir.async_unlock())
174 + yield build_dir.async_unlock()
175 + self._deallocate_config(settings)
176
177 if failures:
178 - return FAILURE
179 - return os.EX_OK
180 + return coroutine_return(FAILURE)
181 + coroutine_return(os.EX_OK)
182 +
183 + def _record_pkg_failure(self, pkg, settings, ret):
184 + """Record a package failure. This eliminates the package
185 + from the --keep-going merge list, and immediately calls
186 + _failed_pkg_msg if we have not been terminated."""
187 + self._failed_pkgs.append(
188 + self._failed_pkg(
189 + build_dir=settings.get("PORTAGE_BUILDDIR"),
190 + build_log=settings.get("PORTAGE_LOG_FILE"),
191 + pkg=pkg,
192 + returncode=ret,
193 + )
194 + )
195 + if not self._terminated_tasks:
196 + self._failed_pkg_msg(self._failed_pkgs[-1], "emerge", "for")
197 + self._status_display.failed = len(self._failed_pkgs)
198
199 def merge(self):
200 if "--resume" in self.myopts:
201 @@ -988,11 +1023,6 @@ class Scheduler(PollScheduler):
202 if rval != os.EX_OK and not keep_going:
203 return rval
204
205 - if not fetchonly:
206 - rval = self._run_pkg_pretend()
207 - if rval != os.EX_OK:
208 - return rval
209 -
210 while True:
211
212 received_signal = []
213 @@ -1389,8 +1419,6 @@ class Scheduler(PollScheduler):
214 if self._opts_no_background.intersection(self.myopts):
215 self._set_max_jobs(1)
216
217 - self._add_prefetchers()
218 - self._add_packages()
219 failed_pkgs = self._failed_pkgs
220 portage.locks._quiet = self._background
221 portage.elog.add_listener(self._elog_listener)
222 @@ -1406,6 +1434,30 @@ class Scheduler(PollScheduler):
223 rval = os.EX_OK
224
225 try:
226 + self._add_prefetchers()
227 + if not self._build_opts.fetchonly:
228 + # Run pkg_pretend concurrently with parallel-fetch, and be careful
229 + # to respond appropriately to termination, so that we don't start
230 + # any new tasks after we've been terminated. Temporarily make the
231 + # status display quiet so that its output is not interleaved with
232 + # pkg_pretend output.
233 + status_quiet = self._status_display.quiet
234 + self._status_display.quiet = True
235 + try:
236 + rval = self._sched_iface.run_until_complete(
237 + self._run_pkg_pretend(loop=self._sched_iface)
238 + )
239 + except asyncio.CancelledError:
240 + self.terminate()
241 + finally:
242 + self._status_display.quiet = status_quiet
243 + self._termination_check()
244 + if self._terminated_tasks:
245 + rval = 128 + signal.SIGINT
246 + if rval != os.EX_OK:
247 + return rval
248 +
249 + self._add_packages()
250 self._main_loop()
251 finally:
252 self._main_loop_cleanup()
253 @@ -1742,6 +1794,23 @@ class Scheduler(PollScheduler):
254
255 return bool(state_change)
256
257 + def _get_prefetcher(self, pkg):
258 + try:
259 + prefetcher = self._prefetchers.pop(pkg, None)
260 + except KeyError:
261 + # KeyError observed with PyPy 1.8, despite None given as default.
262 + # Note that PyPy 1.8 has the same WeakValueDictionary code as
263 + # CPython 2.7, so it may be possible for CPython to raise KeyError
264 + # here as well.
265 + prefetcher = None
266 + if prefetcher is not None and not prefetcher.isAlive():
267 + try:
268 + self._task_queues.fetch._task_queue.remove(prefetcher)
269 + except ValueError:
270 + pass
271 + prefetcher = None
272 + return prefetcher
273 +
274 def _task(self, pkg):
275
276 pkg_to_replace = None
277 @@ -1758,20 +1827,7 @@ class Scheduler(PollScheduler):
278 "installed", pkg.root_config, installed=True,
279 operation="uninstall")
280
281 - try:
282 - prefetcher = self._prefetchers.pop(pkg, None)
283 - except KeyError:
284 - # KeyError observed with PyPy 1.8, despite None given as default.
285 - # Note that PyPy 1.8 has the same WeakValueDictionary code as
286 - # CPython 2.7, so it may be possible for CPython to raise KeyError
287 - # here as well.
288 - prefetcher = None
289 - if prefetcher is not None and not prefetcher.isAlive():
290 - try:
291 - self._task_queues.fetch._task_queue.remove(prefetcher)
292 - except ValueError:
293 - pass
294 - prefetcher = None
295 + prefetcher = self._get_prefetcher(pkg)
296
297 task = MergeListItem(args_set=self._args_set,
298 background=self._background, binpkg_opts=self._binpkg_opts,
299 --
300 2.25.3