1 |
Execute pkg_pretend phases in a coroutine while parallel-fetch |
2 |
is running concurrently. When it's time to execute the pkg_pretend |
3 |
phase for a remote binary package, use the new Scheduler._get_prefetcher |
4 |
method to get a running prefetcher if available, and otherwise |
5 |
start a new fetcher. |
6 |
|
7 |
Bug: https://bugs.gentoo.org/710432 |
8 |
Signed-off-by: Zac Medico <zmedico@g.o> |
9 |
--- |
10 |
lib/_emerge/Scheduler.py | 94 +++++++++++++++++++++++++--------------- |
11 |
1 file changed, 58 insertions(+), 36 deletions(-) |
12 |
|
13 |
diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py |
14 |
index a69421288..20884986f 100644 |
15 |
--- a/lib/_emerge/Scheduler.py |
16 |
+++ b/lib/_emerge/Scheduler.py |
17 |
@@ -25,6 +25,7 @@ from portage._sets import SETPREFIX |
18 |
from portage._sets.base import InternalPackageSet |
19 |
from portage.util import ensure_dirs, writemsg, writemsg_level |
20 |
from portage.util.futures import asyncio |
21 |
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return |
22 |
from portage.util.SlotObject import SlotObject |
23 |
from portage.util._async.SchedulerInterface import SchedulerInterface |
24 |
from portage.package.ebuild.digestcheck import digestcheck |
25 |
@@ -766,7 +767,8 @@ class Scheduler(PollScheduler): |
26 |
|
27 |
return prefetcher |
28 |
|
29 |
- def _run_pkg_pretend(self): |
30 |
+ @coroutine |
31 |
+ def _run_pkg_pretend(self, loop=None): |
32 |
""" |
33 |
Since pkg_pretend output may be important, this method sends all |
34 |
output directly to stdout (regardless of options like --quiet or |
35 |
@@ -774,7 +776,7 @@ class Scheduler(PollScheduler): |
36 |
""" |
37 |
|
38 |
failures = 0 |
39 |
- sched_iface = self._sched_iface |
40 |
+ sched_iface = loop = asyncio._wrap_loop(loop or self._sched_iface) |
41 |
|
42 |
for x in self._mergelist: |
43 |
if not isinstance(x, Package): |
44 |
@@ -795,12 +797,18 @@ class Scheduler(PollScheduler): |
45 |
root_config = x.root_config |
46 |
settings = self.pkgsettings[root_config.root] |
47 |
settings.setcpv(x) |
48 |
+ if not x.built: |
49 |
+ # Get required SRC_URI metadata (it's not cached in x.metadata |
50 |
+ # because some packages have an extremely large SRC_URI value). |
51 |
+ portdb = root_config.trees["porttree"].dbapi |
52 |
+ settings.configdict["pkg"]["SRC_URI"], = (yield portdb.async_aux_get( |
53 |
+ x.cpv, ["SRC_URI"], myrepo=x.repo, loop=loop)) |
54 |
|
55 |
# setcpv/package.env allows for per-package PORTAGE_TMPDIR so we |
56 |
# have to validate it for each package |
57 |
rval = _check_temp_dir(settings) |
58 |
if rval != os.EX_OK: |
59 |
- return rval |
60 |
+ coroutine_return(rval) |
61 |
|
62 |
build_dir_path = os.path.join( |
63 |
os.path.realpath(settings["PORTAGE_TMPDIR"]), |
64 |
@@ -809,7 +817,7 @@ class Scheduler(PollScheduler): |
65 |
settings["PORTAGE_BUILDDIR"] = build_dir_path |
66 |
build_dir = EbuildBuildDir(scheduler=sched_iface, |
67 |
settings=settings) |
68 |
- sched_iface.run_until_complete(build_dir.async_lock()) |
69 |
+ yield build_dir.async_lock() |
70 |
current_task = None |
71 |
|
72 |
try: |
73 |
@@ -835,7 +843,7 @@ class Scheduler(PollScheduler): |
74 |
phase='clean', scheduler=sched_iface, settings=settings) |
75 |
current_task = clean_phase |
76 |
clean_phase.start() |
77 |
- clean_phase.wait() |
78 |
+ yield clean_phase.async_wait() |
79 |
|
80 |
if x.built: |
81 |
tree = "bintree" |
82 |
@@ -845,10 +853,11 @@ class Scheduler(PollScheduler): |
83 |
# Display fetch on stdout, so that it's always clear what |
84 |
# is consuming time here. |
85 |
if bintree.isremote(x.cpv): |
86 |
- fetcher = BinpkgFetcher(pkg=x, |
87 |
- scheduler=sched_iface) |
88 |
- fetcher.start() |
89 |
- if fetcher.wait() != os.EX_OK: |
90 |
+ fetcher = self._get_prefetcher(x) |
91 |
+ if fetcher is None: |
92 |
+ fetcher = BinpkgFetcher(pkg=x, scheduler=loop) |
93 |
+ fetcher.start() |
94 |
+ if (yield fetcher.async_wait()) != os.EX_OK: |
95 |
failures += 1 |
96 |
continue |
97 |
fetched = fetcher.pkg_path |
98 |
@@ -861,7 +870,7 @@ class Scheduler(PollScheduler): |
99 |
scheduler=sched_iface, _pkg_path=filename) |
100 |
current_task = verifier |
101 |
verifier.start() |
102 |
- if verifier.wait() != os.EX_OK: |
103 |
+ if (yield verifier.async_wait()) != os.EX_OK: |
104 |
failures += 1 |
105 |
continue |
106 |
|
107 |
@@ -870,8 +879,7 @@ class Scheduler(PollScheduler): |
108 |
|
109 |
infloc = os.path.join(build_dir_path, "build-info") |
110 |
ensure_dirs(infloc) |
111 |
- self._sched_iface.run_until_complete( |
112 |
- bintree.dbapi.unpack_metadata(settings, infloc, loop=self._sched_iface)) |
113 |
+ yield bintree.dbapi.unpack_metadata(settings, infloc, loop=loop) |
114 |
ebuild_path = os.path.join(infloc, x.pf + ".ebuild") |
115 |
settings.configdict["pkg"]["EMERGE_FROM"] = "binary" |
116 |
settings.configdict["pkg"]["MERGE_TYPE"] = "binary" |
117 |
@@ -905,7 +913,7 @@ class Scheduler(PollScheduler): |
118 |
|
119 |
current_task = pretend_phase |
120 |
pretend_phase.start() |
121 |
- ret = pretend_phase.wait() |
122 |
+ ret = (yield pretend_phase.async_wait()) |
123 |
if ret != os.EX_OK: |
124 |
failures += 1 |
125 |
portage.elog.elog_process(x.cpv, settings) |
126 |
@@ -920,13 +928,13 @@ class Scheduler(PollScheduler): |
127 |
phase='clean', scheduler=sched_iface, |
128 |
settings=settings) |
129 |
clean_phase.start() |
130 |
- clean_phase.wait() |
131 |
+ yield clean_phase.async_wait() |
132 |
|
133 |
- sched_iface.run_until_complete(build_dir.async_unlock()) |
134 |
+ yield build_dir.async_unlock() |
135 |
|
136 |
if failures: |
137 |
- return FAILURE |
138 |
- return os.EX_OK |
139 |
+ return coroutine_return(FAILURE) |
140 |
+ coroutine_return(os.EX_OK) |
141 |
|
142 |
def merge(self): |
143 |
if "--resume" in self.myopts: |
144 |
@@ -988,11 +996,6 @@ class Scheduler(PollScheduler): |
145 |
if rval != os.EX_OK and not keep_going: |
146 |
return rval |
147 |
|
148 |
- if not fetchonly: |
149 |
- rval = self._run_pkg_pretend() |
150 |
- if rval != os.EX_OK: |
151 |
- return rval |
152 |
- |
153 |
while True: |
154 |
|
155 |
received_signal = [] |
156 |
@@ -1390,6 +1393,21 @@ class Scheduler(PollScheduler): |
157 |
self._set_max_jobs(1) |
158 |
|
159 |
self._add_prefetchers() |
160 |
+ |
161 |
+ if not self._build_opts.fetchonly: |
162 |
+ # Run pkg_pretend concurrently with parallel-fetch, and be careful |
163 |
+ # to respond appropriately to termination, so that we don't start |
164 |
+ # any new tasks after we've been terminated. |
165 |
+ try: |
166 |
+ rval = self._sched_iface.run_until_complete(self._run_pkg_pretend(loop=self._sched_iface)) |
167 |
+ except asyncio.CancelledError: |
168 |
+ rval = None |
169 |
+ self._termination_check() |
170 |
+ if self._terminated_tasks or rval is None: |
171 |
+ rval = 128 + signal.SIGINT |
172 |
+ if rval != os.EX_OK: |
173 |
+ return rval |
174 |
+ |
175 |
self._add_packages() |
176 |
failed_pkgs = self._failed_pkgs |
177 |
portage.locks._quiet = self._background |
178 |
@@ -1742,6 +1760,23 @@ class Scheduler(PollScheduler): |
179 |
|
180 |
return bool(state_change) |
181 |
|
182 |
+ def _get_prefetcher(self, pkg): |
183 |
+ try: |
184 |
+ prefetcher = self._prefetchers.pop(pkg, None) |
185 |
+ except KeyError: |
186 |
+ # KeyError observed with PyPy 1.8, despite None given as default. |
187 |
+ # Note that PyPy 1.8 has the same WeakValueDictionary code as |
188 |
+ # CPython 2.7, so it may be possible for CPython to raise KeyError |
189 |
+ # here as well. |
190 |
+ prefetcher = None |
191 |
+ if prefetcher is not None and not prefetcher.isAlive(): |
192 |
+ try: |
193 |
+ self._task_queues.fetch._task_queue.remove(prefetcher) |
194 |
+ except ValueError: |
195 |
+ pass |
196 |
+ prefetcher = None |
197 |
+ return prefetcher |
198 |
+ |
199 |
def _task(self, pkg): |
200 |
|
201 |
pkg_to_replace = None |
202 |
@@ -1758,20 +1793,7 @@ class Scheduler(PollScheduler): |
203 |
"installed", pkg.root_config, installed=True, |
204 |
operation="uninstall") |
205 |
|
206 |
- try: |
207 |
- prefetcher = self._prefetchers.pop(pkg, None) |
208 |
- except KeyError: |
209 |
- # KeyError observed with PyPy 1.8, despite None given as default. |
210 |
- # Note that PyPy 1.8 has the same WeakValueDictionary code as |
211 |
- # CPython 2.7, so it may be possible for CPython to raise KeyError |
212 |
- # here as well. |
213 |
- prefetcher = None |
214 |
- if prefetcher is not None and not prefetcher.isAlive(): |
215 |
- try: |
216 |
- self._task_queues.fetch._task_queue.remove(prefetcher) |
217 |
- except ValueError: |
218 |
- pass |
219 |
- prefetcher = None |
220 |
+ prefetcher = self._get_prefetcher(pkg) |
221 |
|
222 |
task = MergeListItem(args_set=self._args_set, |
223 |
background=self._background, binpkg_opts=self._binpkg_opts, |
224 |
-- |
225 |
2.25.3 |