Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH v2] ManifestScheduler: async fetchlist_dict (bug 653946)
Date: Tue, 24 Apr 2018 15:51:48
Message-Id: 20180424155122.23069-1-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH] ManifestScheduler: async fetchlist_dict (bug 653946) by Zac Medico
1 In order to avoid event loop recursion, pass fetchlist_dict to
2 ManifestTask as a Future.
3
4 Bug: https://bugs.gentoo.org/653946
5 ---
6 [PATCH v2] Fixed _future_fetchlist to limit concurrent async_aux_get
7 calls via async_iter_completed(max_jobs=1).
8
9 .../ebuild/_parallel_manifest/ManifestScheduler.py | 108 ++++++++++++++++++---
10 .../ebuild/_parallel_manifest/ManifestTask.py | 22 +++++
11 pym/portage/tests/dbapi/test_portdb_cache.py | 1 +
12 3 files changed, 117 insertions(+), 14 deletions(-)
13
14 diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
15 index 38ac4825e..2f453afc2 100644
16 --- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
17 +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
18 @@ -4,9 +4,9 @@
19 import portage
20 from portage import os
21 from portage.dep import _repo_separator
22 -from portage.exception import InvalidDependString
23 from portage.localization import _
24 from portage.util._async.AsyncScheduler import AsyncScheduler
25 +from portage.util.futures.iter_completed import async_iter_completed
26 from .ManifestTask import ManifestTask
27
28 class ManifestScheduler(AsyncScheduler):
29 @@ -63,21 +63,14 @@ class ManifestScheduler(AsyncScheduler):
30 cpv_list = portdb.cp_list(cp, mytree=[repo_config.location])
31 if not cpv_list:
32 continue
33 - fetchlist_dict = {}
34 - try:
35 - for cpv in cpv_list:
36 - fetchlist_dict[cpv] = \
37 - list(portdb.getFetchMap(cpv, mytree=mytree))
38 - except InvalidDependString as e:
39 - portage.writemsg(
40 - _("!!! %s%s%s: SRC_URI: %s\n") %
41 - (cp, _repo_separator, repo_config.name, e),
42 - noiselevel=-1)
43 - self._error_count += 1
44 - continue
45
46 + # Use _future_fetchlist(max_jobs=1), since we
47 + # spawn concurrent ManifestTask instances.
48 yield ManifestTask(cp=cp, distdir=distdir,
49 - fetchlist_dict=fetchlist_dict, repo_config=repo_config,
50 + fetchlist_dict=_future_fetchlist(
51 + self._event_loop, portdb, repo_config, cp, cpv_list,
52 + max_jobs=1),
53 + repo_config=repo_config,
54 gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars,
55 force_sign_key=self._force_sign_key)
56
57 @@ -91,3 +84,90 @@ class ManifestScheduler(AsyncScheduler):
58 noiselevel=-1)
59
60 AsyncScheduler._task_exit(self, task)
61 +
62 +
def _future_fetchlist(loop, portdb, repo_config, cp, cpv_list,
        max_jobs=None, max_load=None):
    """
    Asynchronous form of FetchlistDict, with max_jobs and max_load
    parameters in order to control async_aux_get concurrency.

    @param loop: event loop
    @type loop: EventLoop
    @param portdb: portdbapi instance
    @type portdb: portdbapi
    @param repo_config: repository configuration for a Manifest
    @type repo_config: RepoConfig
    @param cp: cp for a Manifest
    @type cp: str
    @param cpv_list: list of ebuild cpv values for a Manifest
    @type cpv_list: list
    @param max_jobs: max number of futures to process concurrently (default
        is multiprocessing.cpu_count())
    @type max_jobs: int
    @param max_load: max load allowed when scheduling a new future,
        otherwise schedule no more than 1 future at a time (default
        is multiprocessing.cpu_count())
    @type max_load: int or float
    @return: a Future resulting in a Mapping compatible with FetchlistDict.
        If any async_fetch_map future fails, the returned Future fails
        with that exception. If none fails but one was cancelled, the
        returned Future is cancelled.
    @rtype: asyncio.Future (or compatible)
    """
    loop = getattr(loop, '_asyncio_wrapper', loop)
    result = loop.create_future()
    # cpv -> async_fetch_map future. Populated lazily as
    # async_iter_completed pulls from future_generator.
    futures = {}
    # Holder for the async_iter_completed batch currently being waited
    # on. A dict is used instead of nonlocal for Python 2 compatibility.
    state = {'current_task': None}

    def future_generator():
        for cpv in cpv_list:
            future = futures[cpv] = portdb.async_fetch_map(
                cpv, mytree=repo_config.location, loop=loop)
            yield future

    completed_iter = async_iter_completed(
        future_generator(),
        max_jobs=max_jobs,
        max_load=max_load,
        loop=loop,
    )

    def schedule_next():
        """
        Start waiting on the next batch from completed_iter. Return
        False if completed_iter is exhausted, otherwise True.
        """
        try:
            state['current_task'] = next(completed_iter)
        except StopIteration:
            state['current_task'] = None
            return False
        state['current_task'].add_done_callback(handle_result)
        return True

    def finish():
        """
        Consume the outcome of every finished future, then resolve
        result unless it has already been resolved or cancelled.
        """
        e = None
        cancelled = False
        for future in futures.values():
            if not future.done():
                continue
            if future.cancelled():
                # exception() would raise CancelledError here.
                cancelled = True
            elif future.exception() is not None:
                # Retrieve exceptions from all futures, even when result
                # has been cancelled, in order to avoid triggering the
                # event loop's error handler.
                e = future.exception()

        if result.done():
            return
        if e is not None:
            result.set_exception(e)
        elif cancelled:
            # result() on a cancelled future would raise CancelledError
            # inside this callback and leave result pending forever.
            result.cancel()
        else:
            result.set_result(
                dict((k, list(v.result())) for k, v in futures.items()))

    def handle_result(future_done_set):
        # Keep consuming batches until exhausted, unless cancelled.
        if not result.cancelled() and schedule_next():
            return
        finish()

    if not schedule_next():
        # Empty cpv_list: finish() resolves result with an empty dict.
        finish()

    def cancel_callback(future):
        current_task = state['current_task']
        if (future.cancelled() and
                current_task is not None and
                not current_task.done()):
            current_task.cancel()

    result.add_done_callback(cancel_callback)

    return result
148 diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
149 index 0ee2b910d..6f5fe5b16 100644
150 --- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
151 +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
152 @@ -8,8 +8,12 @@ import subprocess
153 from portage import os
154 from portage import _unicode_encode, _encodings
155 from portage.const import MANIFEST2_IDENTIFIERS
156 +from portage.dep import _repo_separator
157 +from portage.exception import InvalidDependString
158 +from portage.localization import _
159 from portage.util import (atomic_ofstream, grablines,
160 shlex_split, varexpand, writemsg)
161 +from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
162 from portage.util._async.PipeLogger import PipeLogger
163 from portage.util._async.PopenProcess import PopenProcess
164 from _emerge.CompositeTask import CompositeTask
165 @@ -29,6 +33,24 @@ class ManifestTask(CompositeTask):
def _start(self):
    """
    Record the Manifest path, then wait for the fetchlist_dict Future;
    _start_with_fetchlist continues once it is done.
    """
    manifest_dir = self.repo_config.location
    self._manifest_path = os.path.join(manifest_dir, self.cp, "Manifest")
    fetchlist_task = AsyncTaskFuture(future=self.fetchlist_dict)
    self._start_task(fetchlist_task, self._start_with_fetchlist)
173 +
174 + def _start_with_fetchlist(self, fetchlist_task):
175 + if self._default_exit(fetchlist_task) != os.EX_OK:
176 + if not self.fetchlist_dict.cancelled():
177 + try:
178 + self.fetchlist_dict.result()
179 + except InvalidDependString as e:
180 + writemsg(
181 + _("!!! %s%s%s: SRC_URI: %s\n") %
182 + (self.cp, _repo_separator, self.repo_config.name, e),
183 + noiselevel=-1)
184 + self._async_wait()
185 + return
186 + self.fetchlist_dict = self.fetchlist_dict.result()
187 manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir,
188 fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config,
189 scheduler=self.scheduler)
190 diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py
191 index bd934460a..1f139b256 100644
192 --- a/pym/portage/tests/dbapi/test_portdb_cache.py
193 +++ b/pym/portage/tests/dbapi/test_portdb_cache.py
194 @@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase):
195 portage_python = portage._python_interpreter
196 egencache_cmd = (portage_python, "-b", "-Wd",
197 os.path.join(self.bindir, "egencache"),
198 + "--update-manifests", "--sign-manifests=n",
199 "--repo", "test_repo",
200 "--repositories-configuration", settings.repositories.config_string())
201 python_cmd = (portage_python, "-b", "-Wd", "-c")
202 --
203 2.13.6