Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] FetchIterator: fix event loop recursion (bug 654038)
Date: Wed, 25 Apr 2018 09:20:29
Message-Id: 20180425092007.11959-1-zmedico@gentoo.org
1 Since construction of FetchTask instances requires results from
2 aux_get calls that would trigger event loop recursion when executed
3 synchronously, add a _fetch_tasks_future function to construct
4 FetchTask instances asynchronously and return a Future. Use an
5 _EbuildFetchTasks class to wait for the FetchTask instances to
6 become available, and then execute them.
7
8 Bug: https://bugs.gentoo.org/654038
9 ---
10 pym/portage/_emirrordist/FetchIterator.py | 311 ++++++++++++++++++++----------
11 1 file changed, 206 insertions(+), 105 deletions(-)
12
13 diff --git a/pym/portage/_emirrordist/FetchIterator.py b/pym/portage/_emirrordist/FetchIterator.py
14 index 38419799d..bd3b98cd0 100644
15 --- a/pym/portage/_emirrordist/FetchIterator.py
16 +++ b/pym/portage/_emirrordist/FetchIterator.py
17 @@ -7,14 +7,18 @@ from portage import os
18 from portage.checksum import (_apply_hash_filter,
19 _filter_unaccelarated_hashes, _hash_filter)
20 from portage.dep import use_reduce
21 -from portage.exception import PortageException
22 +from portage.exception import PortageException, PortageKeyError
23 +from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
24 +from portage.util._async.TaskScheduler import TaskScheduler
25 +from portage.util.futures.iter_completed import iter_gather
26 from .FetchTask import FetchTask
27 +from _emerge.CompositeTask import CompositeTask
28 +
29
30 class FetchIterator(object):
31
32 def __init__(self, config):
33 self._config = config
34 - self._log_failure = config.log_failure
35 self._terminated = threading.Event()
36
37 def terminate(self):
38 @@ -41,9 +45,6 @@ class FetchIterator(object):
39
40 portdb = self._config.portdb
41 get_repo_for_location = portdb.repositories.get_repo_for_location
42 - file_owners = self._config.file_owners
43 - file_failures = self._config.file_failures
44 - restrict_mirror_exemptions = self._config.restrict_mirror_exemptions
45
46 hash_filter = _hash_filter(
47 portdb.settings.get("PORTAGE_CHECKSUM_FILTER", ""))
48 @@ -59,110 +60,210 @@ class FetchIterator(object):
49
50 # Reset state so the Manifest is pulled once
51 # for this cp / tree combination.
52 - digests = None
53 repo_config = get_repo_for_location(tree)
54 + digests_future = portdb._event_loop.create_future()
55
56 for cpv in portdb.cp_list(cp, mytree=tree):
57
58 if self._terminated.is_set():
59 return
60
61 - try:
62 - restrict, = portdb.aux_get(cpv, ("RESTRICT",),
63 - mytree=tree)
64 - except (KeyError, PortageException) as e:
65 - self._log_failure("%s\t\taux_get exception %s" %
66 - (cpv, e))
67 - continue
68 -
69 - # Here we use matchnone=True to ignore conditional parts
70 - # of RESTRICT since they don't apply unconditionally.
71 - # Assume such conditionals only apply on the client side.
72 - try:
73 - restrict = frozenset(use_reduce(restrict,
74 - flat=True, matchnone=True))
75 - except PortageException as e:
76 - self._log_failure("%s\t\tuse_reduce exception %s" %
77 - (cpv, e))
78 - continue
79 -
80 - if "fetch" in restrict:
81 - continue
82 -
83 - try:
84 - uri_map = portdb.getFetchMap(cpv)
85 - except PortageException as e:
86 - self._log_failure("%s\t\tgetFetchMap exception %s" %
87 - (cpv, e))
88 - continue
89 -
90 - if not uri_map:
91 - continue
92 -
93 - if "mirror" in restrict:
94 - skip = False
95 - if restrict_mirror_exemptions is not None:
96 - new_uri_map = {}
97 - for filename, uri_tuple in uri_map.items():
98 - for uri in uri_tuple:
99 - if uri[:9] == "mirror://":
100 - i = uri.find("/", 9)
101 - if i != -1 and uri[9:i].strip("/") in \
102 - restrict_mirror_exemptions:
103 - new_uri_map[filename] = uri_tuple
104 - break
105 - if new_uri_map:
106 - uri_map = new_uri_map
107 - else:
108 - skip = True
109 - else:
110 - skip = True
111 -
112 - if skip:
113 - continue
114 -
115 - # Parse Manifest for this cp if we haven't yet.
116 - if digests is None:
117 - try:
118 - digests = repo_config.load_manifest(
119 - os.path.join(repo_config.location, cp)
120 - ).getTypeDigests("DIST")
121 - except (EnvironmentError, PortageException) as e:
122 - for filename in uri_map:
123 - self._log_failure(
124 - "%s\t%s\tManifest exception %s" %
125 - (cpv, filename, e))
126 - file_failures[filename] = cpv
127 - continue
128 -
129 - if not digests:
130 - for filename in uri_map:
131 - self._log_failure("%s\t%s\tdigest entry missing" %
132 - (cpv, filename))
133 - file_failures[filename] = cpv
134 - continue
135 -
136 - for filename, uri_tuple in uri_map.items():
137 - file_digests = digests.get(filename)
138 - if file_digests is None:
139 - self._log_failure("%s\t%s\tdigest entry missing" %
140 - (cpv, filename))
141 - file_failures[filename] = cpv
142 - continue
143 - if filename in file_owners:
144 - continue
145 - file_owners[filename] = cpv
146 -
147 - file_digests = \
148 - _filter_unaccelarated_hashes(file_digests)
149 - if hash_filter is not None:
150 - file_digests = _apply_hash_filter(
151 - file_digests, hash_filter)
152 -
153 - yield FetchTask(cpv=cpv,
154 - background=True,
155 - digests=file_digests,
156 - distfile=filename,
157 - restrict=restrict,
158 - uri_tuple=uri_tuple,
159 - config=self._config)
160 + yield _EbuildFetchTasks(
161 + fetch_tasks_future=_fetch_tasks_future(
162 + self._config,
163 + hash_filter,
164 + repo_config,
165 + digests_future,
166 + cpv)
167 + )
168 +
169 +
170 +class _EbuildFetchTasks(CompositeTask):
171 + """
172 + This executes asynchronously constructed FetchTask instances for
173 + each of the files referenced by an ebuild.
174 + """
175 + __slots__ = ('fetch_tasks_future',)
176 + def _start(self):
177 + self._start_task(AsyncTaskFuture(future=self.fetch_tasks_future),
178 + self._start_fetch_tasks)
179 +
180 + def _start_fetch_tasks(self, task):
181 + if self._default_exit(task) != os.EX_OK:
182 + self._async_wait()
183 + return
184 +
185 + self._start_task(
186 + TaskScheduler(
187 + iter(self.fetch_tasks_future.result()),
188 + max_jobs=1,
189 + event_loop=self.scheduler),
190 + self._default_final_exit)
191 +
192 +
193 +def _fetch_tasks_future(config, hash_filter, repo_config, digests_future, cpv):
194 + """
195 + Asynchronously construct FetchTask instances for each of the files
196 + referenced by an ebuild.
197 +
198 + @param config: emirrordist config
199 + @type config: portage._emirrordist.Config.Config
200 + @param hash_filter: PORTAGE_CHECKSUM_FILTER settings
201 + @type hash_filter: portage.checksum._hash_filter
202 + @param repo_config: repository configuration
203 + @type repo_config: RepoConfig
204 + @param digests_future: future that contains cached distfiles digests
205 + for the current cp if available
206 + @type digests_future: asyncio.Future
207 + @param cpv: current ebuild cpv
208 + @type cpv: portage.versions._pkg_str
209 +
210 + @return: A future that results in a list containing FetchTask
211 + instances for each of the files referenced by an ebuild.
212 + @rtype: asyncio.Future (or compatible)
213 + """
214 +
215 + loop = config.portdb._event_loop
216 + result = loop.create_future()
217 + fetch_tasks = []
218 +
219 + def aux_get_done(gather_result):
220 + if result.cancelled():
221 + return
222 +
223 + aux_get_result, fetch_map_result = gather_result.result()
224 + try:
225 + restrict, = aux_get_result.result()
226 + except (PortageKeyError, PortageException) as e:
227 + config.log_failure("%s\t\taux_get exception %s" %
228 + (cpv, e))
229 + result.set_result(fetch_tasks)
230 + return
231 +
232 + # Here we use matchnone=True to ignore conditional parts
233 + # of RESTRICT since they don't apply unconditionally.
234 + # Assume such conditionals only apply on the client side.
235 + try:
236 + restrict = frozenset(use_reduce(restrict,
237 + flat=True, matchnone=True))
238 + except PortageException as e:
239 + config.log_failure("%s\t\tuse_reduce exception %s" %
240 + (cpv, e))
241 + result.set_result(fetch_tasks)
242 + return
243 +
244 + if "fetch" in restrict:
245 + result.set_result(fetch_tasks)
246 + return
247 +
248 + try:
249 + uri_map = fetch_map_result.result()
250 + except PortageException as e:
251 + config.log_failure("%s\t\tgetFetchMap exception %s" %
252 + (cpv, e))
253 + result.set_result(fetch_tasks)
254 + return
255 +
256 + if not uri_map:
257 + result.set_result(fetch_tasks)
258 + return
259 +
260 + if "mirror" in restrict:
261 + skip = False
262 + if config.restrict_mirror_exemptions is not None:
263 + new_uri_map = {}
264 + for filename, uri_tuple in uri_map.items():
265 + for uri in uri_tuple:
266 + if uri[:9] == "mirror://":
267 + i = uri.find("/", 9)
268 + if i != -1 and uri[9:i].strip("/") in \
269 + config.restrict_mirror_exemptions:
270 + new_uri_map[filename] = uri_tuple
271 + break
272 + if new_uri_map:
273 + uri_map = new_uri_map
274 + else:
275 + skip = True
276 + else:
277 + skip = True
278 +
279 + if skip:
280 + result.set_result(fetch_tasks)
281 + return
282 +
283 + # Parse Manifest for this cp if we haven't yet.
284 + if not digests_future.done():
285 + try:
286 + digests = repo_config.load_manifest(
287 + os.path.join(repo_config.location, cpv.cp)).\
288 + getTypeDigests("DIST")
289 + except (EnvironmentError, PortageException) as e:
290 + digests_future.set_exception(e)
291 + for filename in uri_map:
292 + config.log_failure(
293 + "%s\t%s\tManifest exception %s" %
294 + (cpv, filename, e))
295 + config.file_failures[filename] = cpv
296 + result.set_result(fetch_tasks)
297 + return
298 + else:
299 + digests_future.set_result(digests)
300 +
301 + digests = digests_future.result()
302 + if not digests:
303 + for filename in uri_map:
304 + config.log_failure("%s\t%s\tdigest entry missing" %
305 + (cpv, filename))
306 + config.file_failures[filename] = cpv
307 + result.set_result(fetch_tasks)
308 + return
309 +
310 + for filename, uri_tuple in uri_map.items():
311 + file_digests = digests.get(filename)
312 + if file_digests is None:
313 + config.log_failure("%s\t%s\tdigest entry missing" %
314 + (cpv, filename))
315 + config.file_failures[filename] = cpv
316 + continue
317 + if filename in config.file_owners:
318 + continue
319 + config.file_owners[filename] = cpv
320 +
321 + file_digests = \
322 + _filter_unaccelarated_hashes(file_digests)
323 + if hash_filter is not None:
324 + file_digests = _apply_hash_filter(
325 + file_digests, hash_filter)
326 +
327 + fetch_tasks.append(FetchTask(
328 + cpv=cpv,
329 + background=True,
330 + digests=file_digests,
331 + distfile=filename,
332 + restrict=restrict,
333 + uri_tuple=uri_tuple,
334 + config=config))
335 +
336 + result.set_result(fetch_tasks)
337 +
338 + def future_generator():
339 + yield config.portdb.async_aux_get(cpv, ("RESTRICT",),
340 + myrepo=repo_config.name, loop=loop)
341 + yield config.portdb.async_fetch_map(cpv,
342 + mytree=repo_config.location, loop=loop)
343 +
344 + # Use iter_gather(max_jobs=1) to limit the number of processes per
345 + # _EbuildFetchTasks instance, and also to avoid spawning two bash
346 + # processes for the same cpv simultaneously (the second one can
347 + # use metadata cached by the first one).
348 + gather_result = iter_gather(
349 + future_generator(),
350 + max_jobs=1,
351 + loop=loop,
352 + )
353 + gather_result.add_done_callback(aux_get_done)
354 + result.add_done_callback(lambda result:
355 + gather_result.cancel() if result.cancelled() and
356 + not gather_result.done() else None)
357 +
358 + return result
359 --
360 2.13.6