Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] emerge: enable parallel-fetch during pkg_pretend (bug 710432)
Date: Sun, 20 Sep 2020 02:11:21
Message-Id: 20200920020527.355385-1-zmedico@gentoo.org
1 Execute pkg_pretend phases in a coroutine while parallel-fetch
2 is running concurrently. When it's time to execute the pkg_pretend
3 phase for a remote binary package, use a Scheduler _get_prefetcher
4 method to get a running prefetcher if available, and otherwise
5 start a new fetcher.
6
7 Bug: https://bugs.gentoo.org/710432
8 Signed-off-by: Zac Medico <zmedico@g.o>
9 ---
10 lib/_emerge/Scheduler.py | 94 +++++++++++++++++++++++++---------------
11 1 file changed, 58 insertions(+), 36 deletions(-)
12
13 diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py
14 index a69421288..20884986f 100644
15 --- a/lib/_emerge/Scheduler.py
16 +++ b/lib/_emerge/Scheduler.py
17 @@ -25,6 +25,7 @@ from portage._sets import SETPREFIX
18 from portage._sets.base import InternalPackageSet
19 from portage.util import ensure_dirs, writemsg, writemsg_level
20 from portage.util.futures import asyncio
21 +from portage.util.futures.compat_coroutine import coroutine, coroutine_return
22 from portage.util.SlotObject import SlotObject
23 from portage.util._async.SchedulerInterface import SchedulerInterface
24 from portage.package.ebuild.digestcheck import digestcheck
25 @@ -766,7 +767,8 @@ class Scheduler(PollScheduler):
26
27 return prefetcher
28
29 - def _run_pkg_pretend(self):
30 + @coroutine
31 + def _run_pkg_pretend(self, loop=None):
32 """
33 Since pkg_pretend output may be important, this method sends all
34 output directly to stdout (regardless of options like --quiet or
35 @@ -774,7 +776,7 @@ class Scheduler(PollScheduler):
36 """
37
38 failures = 0
39 - sched_iface = self._sched_iface
40 + sched_iface = loop = asyncio._wrap_loop(loop or self._sched_iface)
41
42 for x in self._mergelist:
43 if not isinstance(x, Package):
44 @@ -795,12 +797,18 @@ class Scheduler(PollScheduler):
45 root_config = x.root_config
46 settings = self.pkgsettings[root_config.root]
47 settings.setcpv(x)
48 + if not x.built:
49 + # Get required SRC_URI metadata (it's not cached in x.metadata
50 + # because some packages have an extremely large SRC_URI value).
51 + portdb = root_config.trees["porttree"].dbapi
52 + settings.configdict["pkg"]["SRC_URI"], = (yield portdb.async_aux_get(
53 + x.cpv, ["SRC_URI"], myrepo=x.repo, loop=loop))
54
55 # setcpv/package.env allows for per-package PORTAGE_TMPDIR so we
56 # have to validate it for each package
57 rval = _check_temp_dir(settings)
58 if rval != os.EX_OK:
59 - return rval
60 + coroutine_return(rval)
61
62 build_dir_path = os.path.join(
63 os.path.realpath(settings["PORTAGE_TMPDIR"]),
64 @@ -809,7 +817,7 @@ class Scheduler(PollScheduler):
65 settings["PORTAGE_BUILDDIR"] = build_dir_path
66 build_dir = EbuildBuildDir(scheduler=sched_iface,
67 settings=settings)
68 - sched_iface.run_until_complete(build_dir.async_lock())
69 + yield build_dir.async_lock()
70 current_task = None
71
72 try:
73 @@ -835,7 +843,7 @@ class Scheduler(PollScheduler):
74 phase='clean', scheduler=sched_iface, settings=settings)
75 current_task = clean_phase
76 clean_phase.start()
77 - clean_phase.wait()
78 + yield clean_phase.async_wait()
79
80 if x.built:
81 tree = "bintree"
82 @@ -845,10 +853,11 @@ class Scheduler(PollScheduler):
83 # Display fetch on stdout, so that it's always clear what
84 # is consuming time here.
85 if bintree.isremote(x.cpv):
86 - fetcher = BinpkgFetcher(pkg=x,
87 - scheduler=sched_iface)
88 - fetcher.start()
89 - if fetcher.wait() != os.EX_OK:
90 + fetcher = self._get_prefetcher(x)
91 + if fetcher is None:
92 + fetcher = BinpkgFetcher(pkg=x, scheduler=loop)
93 + fetcher.start()
94 + if (yield fetcher.async_wait()) != os.EX_OK:
95 failures += 1
96 continue
97 fetched = fetcher.pkg_path
98 @@ -861,7 +870,7 @@ class Scheduler(PollScheduler):
99 scheduler=sched_iface, _pkg_path=filename)
100 current_task = verifier
101 verifier.start()
102 - if verifier.wait() != os.EX_OK:
103 + if (yield verifier.async_wait()) != os.EX_OK:
104 failures += 1
105 continue
106
107 @@ -870,8 +879,7 @@ class Scheduler(PollScheduler):
108
109 infloc = os.path.join(build_dir_path, "build-info")
110 ensure_dirs(infloc)
111 - self._sched_iface.run_until_complete(
112 - bintree.dbapi.unpack_metadata(settings, infloc, loop=self._sched_iface))
113 + yield bintree.dbapi.unpack_metadata(settings, infloc, loop=loop)
114 ebuild_path = os.path.join(infloc, x.pf + ".ebuild")
115 settings.configdict["pkg"]["EMERGE_FROM"] = "binary"
116 settings.configdict["pkg"]["MERGE_TYPE"] = "binary"
117 @@ -905,7 +913,7 @@ class Scheduler(PollScheduler):
118
119 current_task = pretend_phase
120 pretend_phase.start()
121 - ret = pretend_phase.wait()
122 + ret = (yield pretend_phase.async_wait())
123 if ret != os.EX_OK:
124 failures += 1
125 portage.elog.elog_process(x.cpv, settings)
126 @@ -920,13 +928,13 @@ class Scheduler(PollScheduler):
127 phase='clean', scheduler=sched_iface,
128 settings=settings)
129 clean_phase.start()
130 - clean_phase.wait()
131 + yield clean_phase.async_wait()
132
133 - sched_iface.run_until_complete(build_dir.async_unlock())
134 + yield build_dir.async_unlock()
135
136 if failures:
137 - return FAILURE
138 - return os.EX_OK
139 + return coroutine_return(FAILURE)
140 + coroutine_return(os.EX_OK)
141
142 def merge(self):
143 if "--resume" in self.myopts:
144 @@ -988,11 +996,6 @@ class Scheduler(PollScheduler):
145 if rval != os.EX_OK and not keep_going:
146 return rval
147
148 - if not fetchonly:
149 - rval = self._run_pkg_pretend()
150 - if rval != os.EX_OK:
151 - return rval
152 -
153 while True:
154
155 received_signal = []
156 @@ -1390,6 +1393,21 @@ class Scheduler(PollScheduler):
157 self._set_max_jobs(1)
158
159 self._add_prefetchers()
160 +
161 + if not self._build_opts.fetchonly:
162 + # Run pkg_pretend concurrently with parallel-fetch, and be careful
163 + # to respond appropriately to termination, so that we don't start
164 + # any new tasks after we've been terminated.
165 + try:
166 + rval = self._sched_iface.run_until_complete(self._run_pkg_pretend(loop=self._sched_iface))
167 + except asyncio.CancelledError:
168 + rval = None
169 + self._termination_check()
170 + if self._terminated_tasks or rval is None:
171 + rval = 128 + signal.SIGINT
172 + if rval != os.EX_OK:
173 + return rval
174 +
175 self._add_packages()
176 failed_pkgs = self._failed_pkgs
177 portage.locks._quiet = self._background
178 @@ -1742,6 +1760,23 @@ class Scheduler(PollScheduler):
179
180 return bool(state_change)
181
182 + def _get_prefetcher(self, pkg):
183 + try:
184 + prefetcher = self._prefetchers.pop(pkg, None)
185 + except KeyError:
186 + # KeyError observed with PyPy 1.8, despite None given as default.
187 + # Note that PyPy 1.8 has the same WeakValueDictionary code as
188 + # CPython 2.7, so it may be possible for CPython to raise KeyError
189 + # here as well.
190 + prefetcher = None
191 + if prefetcher is not None and not prefetcher.isAlive():
192 + try:
193 + self._task_queues.fetch._task_queue.remove(prefetcher)
194 + except ValueError:
195 + pass
196 + prefetcher = None
197 + return prefetcher
198 +
199 def _task(self, pkg):
200
201 pkg_to_replace = None
202 @@ -1758,20 +1793,7 @@ class Scheduler(PollScheduler):
203 "installed", pkg.root_config, installed=True,
204 operation="uninstall")
205
206 - try:
207 - prefetcher = self._prefetchers.pop(pkg, None)
208 - except KeyError:
209 - # KeyError observed with PyPy 1.8, despite None given as default.
210 - # Note that PyPy 1.8 has the same WeakValueDictionary code as
211 - # CPython 2.7, so it may be possible for CPython to raise KeyError
212 - # here as well.
213 - prefetcher = None
214 - if prefetcher is not None and not prefetcher.isAlive():
215 - try:
216 - self._task_queues.fetch._task_queue.remove(prefetcher)
217 - except ValueError:
218 - pass
219 - prefetcher = None
220 + prefetcher = self._get_prefetcher(pkg)
221
222 task = MergeListItem(args_set=self._args_set,
223 background=self._background, binpkg_opts=self._binpkg_opts,
224 --
225 2.25.3

Replies