Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/_emirrordist/
Date: Sat, 28 Apr 2018 14:02:02
Message-Id: 1524923802.7c652d1b967f9c4c6a7fbd9fc5e46a0e57438a16.zmedico@gentoo
1 commit: 7c652d1b967f9c4c6a7fbd9fc5e46a0e57438a16
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Wed Apr 25 06:42:10 2018 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Sat Apr 28 13:56:42 2018 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=7c652d1b
7
8 FetchIterator: fix event loop recursion (bug 654038)
9
10 Since construction of FetchTask instances requires results from
11 aux_get calls that would trigger event loop recursion when executed
12 synchronously, add an _async_fetch_tasks function to construct
13 FetchTask instances asynchronously and return a Future. Use an
14 _EbuildFetchTasks class to wait for the FetchTask instances to
15 become available, and then execute them.
16
17 Bug: https://bugs.gentoo.org/654038
18
19 pym/portage/_emirrordist/FetchIterator.py | 324 ++++++++++++++++++++----------
20 1 file changed, 218 insertions(+), 106 deletions(-)
21
22 diff --git a/pym/portage/_emirrordist/FetchIterator.py b/pym/portage/_emirrordist/FetchIterator.py
23 index 38419799d..366453c12 100644
24 --- a/pym/portage/_emirrordist/FetchIterator.py
25 +++ b/pym/portage/_emirrordist/FetchIterator.py
26 @@ -1,4 +1,4 @@
27 -# Copyright 2013 Gentoo Foundation
28 +# Copyright 2013-2018 Gentoo Foundation
29 # Distributed under the terms of the GNU General Public License v2
30
31 import threading
32 @@ -7,14 +7,18 @@ from portage import os
33 from portage.checksum import (_apply_hash_filter,
34 _filter_unaccelarated_hashes, _hash_filter)
35 from portage.dep import use_reduce
36 -from portage.exception import PortageException
37 +from portage.exception import PortageException, PortageKeyError
38 +from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
39 +from portage.util._async.TaskScheduler import TaskScheduler
40 +from portage.util.futures.iter_completed import iter_gather
41 from .FetchTask import FetchTask
42 +from _emerge.CompositeTask import CompositeTask
43 +
44
45 class FetchIterator(object):
46
47 def __init__(self, config):
48 self._config = config
49 - self._log_failure = config.log_failure
50 self._terminated = threading.Event()
51
52 def terminate(self):
53 @@ -41,9 +45,6 @@ class FetchIterator(object):
54
55 portdb = self._config.portdb
56 get_repo_for_location = portdb.repositories.get_repo_for_location
57 - file_owners = self._config.file_owners
58 - file_failures = self._config.file_failures
59 - restrict_mirror_exemptions = self._config.restrict_mirror_exemptions
60
61 hash_filter = _hash_filter(
62 portdb.settings.get("PORTAGE_CHECKSUM_FILTER", ""))
63 @@ -59,110 +60,221 @@ class FetchIterator(object):
64
65 # Reset state so the Manifest is pulled once
66 # for this cp / tree combination.
67 - digests = None
68 repo_config = get_repo_for_location(tree)
69 + digests_future = portdb._event_loop.create_future()
70
71 for cpv in portdb.cp_list(cp, mytree=tree):
72
73 if self._terminated.is_set():
74 return
75
76 - try:
77 - restrict, = portdb.aux_get(cpv, ("RESTRICT",),
78 - mytree=tree)
79 - except (KeyError, PortageException) as e:
80 - self._log_failure("%s\t\taux_get exception %s" %
81 - (cpv, e))
82 - continue
83 -
84 - # Here we use matchnone=True to ignore conditional parts
85 - # of RESTRICT since they don't apply unconditionally.
86 - # Assume such conditionals only apply on the client side.
87 - try:
88 - restrict = frozenset(use_reduce(restrict,
89 - flat=True, matchnone=True))
90 - except PortageException as e:
91 - self._log_failure("%s\t\tuse_reduce exception %s" %
92 - (cpv, e))
93 - continue
94 -
95 - if "fetch" in restrict:
96 - continue
97 -
98 - try:
99 - uri_map = portdb.getFetchMap(cpv)
100 - except PortageException as e:
101 - self._log_failure("%s\t\tgetFetchMap exception %s" %
102 - (cpv, e))
103 - continue
104 -
105 - if not uri_map:
106 - continue
107 -
108 - if "mirror" in restrict:
109 - skip = False
110 - if restrict_mirror_exemptions is not None:
111 - new_uri_map = {}
112 - for filename, uri_tuple in uri_map.items():
113 - for uri in uri_tuple:
114 - if uri[:9] == "mirror://":
115 - i = uri.find("/", 9)
116 - if i != -1 and uri[9:i].strip("/") in \
117 - restrict_mirror_exemptions:
118 - new_uri_map[filename] = uri_tuple
119 - break
120 - if new_uri_map:
121 - uri_map = new_uri_map
122 - else:
123 - skip = True
124 - else:
125 - skip = True
126 -
127 - if skip:
128 - continue
129 -
130 - # Parse Manifest for this cp if we haven't yet.
131 - if digests is None:
132 - try:
133 - digests = repo_config.load_manifest(
134 - os.path.join(repo_config.location, cp)
135 - ).getTypeDigests("DIST")
136 - except (EnvironmentError, PortageException) as e:
137 - for filename in uri_map:
138 - self._log_failure(
139 - "%s\t%s\tManifest exception %s" %
140 - (cpv, filename, e))
141 - file_failures[filename] = cpv
142 - continue
143 -
144 - if not digests:
145 - for filename in uri_map:
146 - self._log_failure("%s\t%s\tdigest entry missing" %
147 - (cpv, filename))
148 - file_failures[filename] = cpv
149 - continue
150 -
151 - for filename, uri_tuple in uri_map.items():
152 - file_digests = digests.get(filename)
153 - if file_digests is None:
154 - self._log_failure("%s\t%s\tdigest entry missing" %
155 - (cpv, filename))
156 - file_failures[filename] = cpv
157 - continue
158 - if filename in file_owners:
159 - continue
160 - file_owners[filename] = cpv
161 -
162 - file_digests = \
163 - _filter_unaccelarated_hashes(file_digests)
164 - if hash_filter is not None:
165 - file_digests = _apply_hash_filter(
166 - file_digests, hash_filter)
167 -
168 - yield FetchTask(cpv=cpv,
169 - background=True,
170 - digests=file_digests,
171 - distfile=filename,
172 - restrict=restrict,
173 - uri_tuple=uri_tuple,
174 - config=self._config)
175 + yield _EbuildFetchTasks(
176 + fetch_tasks_future=_async_fetch_tasks(
177 + self._config,
178 + hash_filter,
179 + repo_config,
180 + digests_future,
181 + cpv,
182 + portdb._event_loop)
183 + )
184 +
185 +
186 +class _EbuildFetchTasks(CompositeTask):
187 + """
188 + Executes FetchTask instances (which are asynchronously constructed)
189 + for each of the files referenced by an ebuild.
190 + """
191 + __slots__ = ('fetch_tasks_future',)
192 + def _start(self):
193 + self._start_task(AsyncTaskFuture(future=self.fetch_tasks_future),
194 + self._start_fetch_tasks)
195 +
196 + def _start_fetch_tasks(self, task):
197 + if self._default_exit(task) != os.EX_OK:
198 + self._async_wait()
199 + return
200 +
201 + self._start_task(
202 + TaskScheduler(
203 + iter(self.fetch_tasks_future.result()),
204 + max_jobs=1,
205 + event_loop=self.scheduler),
206 + self._default_final_exit)
207 +
208 +
209 +def _async_fetch_tasks(config, hash_filter, repo_config, digests_future, cpv,
210 + loop):
211 + """
212 + Asynchronously construct FetchTask instances for each of the files
213 + referenced by an ebuild.
214 +
215 + @param config: emirrordist config
216 + @type config: portage._emirrordist.Config.Config
217 + @param hash_filter: PORTAGE_CHECKSUM_FILTER settings
218 + @type hash_filter: portage.checksum._hash_filter
219 + @param repo_config: repository configuration
220 + @type repo_config: RepoConfig
221 + @param digests_future: future that contains cached distfiles digests
222 + for the current cp if available
223 + @type digests_future: asyncio.Future
224 + @param cpv: current ebuild cpv
225 + @type cpv: portage.versions._pkg_str
226 + @param loop: event loop
227 + @type loop: EventLoop
228 + @return: A future that results in a list containing FetchTask
229 + instances for each of the files referenced by an ebuild.
230 + @rtype: asyncio.Future (or compatible)
231 + """
232 + loop = getattr(loop, '_asyncio_wrapper', loop)
233 + result = loop.create_future()
234 + fetch_tasks = []
235 +
236 + def aux_get_done(gather_result):
237 + # All exceptions must be consumed from gather_result before this
238 + # function returns, in order to avoid triggering the event loop's
239 + # exception handler.
240 + if not gather_result.cancelled():
241 + list(future.exception() for future in gather_result.result()
242 + if not future.cancelled())
243 +
244 + if result.cancelled():
245 + return
246 +
247 + aux_get_result, fetch_map_result = gather_result.result()
248 + try:
249 + restrict, = aux_get_result.result()
250 + except (PortageKeyError, PortageException) as e:
251 + config.log_failure("%s\t\taux_get exception %s" %
252 + (cpv, e))
253 + result.set_result(fetch_tasks)
254 + return
255 +
256 + # Here we use matchnone=True to ignore conditional parts
257 + # of RESTRICT since they don't apply unconditionally.
258 + # Assume such conditionals only apply on the client side.
259 + try:
260 + restrict = frozenset(use_reduce(restrict,
261 + flat=True, matchnone=True))
262 + except PortageException as e:
263 + config.log_failure("%s\t\tuse_reduce exception %s" %
264 + (cpv, e))
265 + result.set_result(fetch_tasks)
266 + return
267 +
268 + if "fetch" in restrict:
269 + result.set_result(fetch_tasks)
270 + return
271 +
272 + try:
273 + uri_map = fetch_map_result.result()
274 + except PortageException as e:
275 + config.log_failure("%s\t\tgetFetchMap exception %s" %
276 + (cpv, e))
277 + result.set_result(fetch_tasks)
278 + return
279 +
280 + if not uri_map:
281 + result.set_result(fetch_tasks)
282 + return
283 +
284 + if "mirror" in restrict:
285 + skip = False
286 + if config.restrict_mirror_exemptions is not None:
287 + new_uri_map = {}
288 + for filename, uri_tuple in uri_map.items():
289 + for uri in uri_tuple:
290 + if uri[:9] == "mirror://":
291 + i = uri.find("/", 9)
292 + if i != -1 and uri[9:i].strip("/") in \
293 + config.restrict_mirror_exemptions:
294 + new_uri_map[filename] = uri_tuple
295 + break
296 + if new_uri_map:
297 + uri_map = new_uri_map
298 + else:
299 + skip = True
300 + else:
301 + skip = True
302 +
303 + if skip:
304 + result.set_result(fetch_tasks)
305 + return
306 +
307 + # Parse Manifest for this cp if we haven't yet.
308 + try:
309 + if digests_future.done():
310 + # If there's an exception then raise it.
311 + digests = digests_future.result()
312 + else:
313 + digests = repo_config.load_manifest(
314 + os.path.join(repo_config.location, cpv.cp)).\
315 + getTypeDigests("DIST")
316 + except (EnvironmentError, PortageException) as e:
317 + digests_future.done() or digests_future.set_exception(e)
318 + for filename in uri_map:
319 + config.log_failure(
320 + "%s\t%s\tManifest exception %s" %
321 + (cpv, filename, e))
322 + config.file_failures[filename] = cpv
323 + result.set_result(fetch_tasks)
324 + return
325 + else:
326 + digests_future.done() or digests_future.set_result(digests)
327 +
328 + if not digests:
329 + for filename in uri_map:
330 + config.log_failure("%s\t%s\tdigest entry missing" %
331 + (cpv, filename))
332 + config.file_failures[filename] = cpv
333 + result.set_result(fetch_tasks)
334 + return
335 +
336 + for filename, uri_tuple in uri_map.items():
337 + file_digests = digests.get(filename)
338 + if file_digests is None:
339 + config.log_failure("%s\t%s\tdigest entry missing" %
340 + (cpv, filename))
341 + config.file_failures[filename] = cpv
342 + continue
343 + if filename in config.file_owners:
344 + continue
345 + config.file_owners[filename] = cpv
346 +
347 + file_digests = \
348 + _filter_unaccelarated_hashes(file_digests)
349 + if hash_filter is not None:
350 + file_digests = _apply_hash_filter(
351 + file_digests, hash_filter)
352 +
353 + fetch_tasks.append(FetchTask(
354 + cpv=cpv,
355 + background=True,
356 + digests=file_digests,
357 + distfile=filename,
358 + restrict=restrict,
359 + uri_tuple=uri_tuple,
360 + config=config))
361 +
362 + result.set_result(fetch_tasks)
363 +
364 + def future_generator():
365 + yield config.portdb.async_aux_get(cpv, ("RESTRICT",),
366 + myrepo=repo_config.name, loop=loop)
367 + yield config.portdb.async_fetch_map(cpv,
368 + mytree=repo_config.location, loop=loop)
369 +
370 + # Use iter_gather(max_jobs=1) to limit the number of processes per
371 + # _EbuildFetchTasks instance, and also to avoid spawning two bash
372 + # processes for the same cpv simultaneously (the second one can
373 + # use metadata cached by the first one).
374 + gather_result = iter_gather(
375 + future_generator(),
376 + max_jobs=1,
377 + loop=loop,
378 + )
379 + gather_result.add_done_callback(aux_get_done)
380 + result.add_done_callback(lambda result:
381 + gather_result.cancel() if result.cancelled() and
382 + not gather_result.done() else None)
383 +
384 + return result