Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/tests/dbapi/, pym/portage/package/ebuild/_parallel_manifest/, ...
Date: Fri, 27 Apr 2018 23:42:31
Message-Id: 1524869762.3e77f0199cb401acf974089fb6aa378fd45d0e90.zmedico@gentoo
1 commit: 3e77f0199cb401acf974089fb6aa378fd45d0e90
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Tue Apr 24 06:54:05 2018 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Fri Apr 27 22:56:02 2018 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=3e77f019
7
8 ManifestScheduler: async fetchlist_dict (bug 653946)
9
10 In order to avoid event loop recursion, pass fetchlist_dict to
11 ManifestTask as a Future.
12
13 Bug: https://bugs.gentoo.org/653946
14
15 pym/portage/dbapi/porttree.py | 70 ++++++++++++++++++++++
16 .../ebuild/_parallel_manifest/ManifestScheduler.py | 25 ++++----
17 .../ebuild/_parallel_manifest/ManifestTask.py | 24 +++++++-
18 pym/portage/tests/dbapi/test_portdb_cache.py | 3 +-
19 4 files changed, 105 insertions(+), 17 deletions(-)
20
21 diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py
22 index 975f03d5e..3ce214cd7 100644
23 --- a/pym/portage/dbapi/porttree.py
24 +++ b/pym/portage/dbapi/porttree.py
25 @@ -37,6 +37,7 @@ from portage import _unicode_encode
26 from portage import OrderedDict
27 from portage.util._eventloop.EventLoop import EventLoop
28 from portage.util._eventloop.global_event_loop import global_event_loop
29 +from portage.util.futures.iter_completed import iter_gather
30 from _emerge.EbuildMetadataPhase import EbuildMetadataPhase
31
32 import os as _os
33 @@ -1393,6 +1394,75 @@ class FetchlistDict(Mapping):
34 if sys.hexversion >= 0x3000000:
35 keys = __iter__
36
37 +
38 +def _async_manifest_fetchlist(portdb, repo_config, cp, cpv_list=None,
39 + max_jobs=None, max_load=None, loop=None):
40 + """
41 + Asynchronous form of FetchlistDict, with max_jobs and max_load
42 + parameters in order to control async_aux_get concurrency.
43 +
44 + @param portdb: portdbapi instance
45 + @type portdb: portdbapi
46 + @param repo_config: repository configuration for a Manifest
47 + @type repo_config: RepoConfig
48 + @param cp: cp for a Manifest
49 + @type cp: str
50 + @param cpv_list: list of ebuild cpv values for a Manifest
51 + @type cpv_list: list
52 + @param max_jobs: max number of futures to process concurrently (default
53 + is multiprocessing.cpu_count())
54 + @type max_jobs: int
55 + @param max_load: max load allowed when scheduling a new future,
56 + otherwise schedule no more than 1 future at a time (default
57 + is multiprocessing.cpu_count())
58 + @type max_load: int or float
59 + @param loop: event loop
60 + @type loop: EventLoop
61 + @return: a Future resulting in a Mapping compatible with FetchlistDict
62 + @rtype: asyncio.Future (or compatible)
63 + """
64 + loop = loop or global_event_loop()
65 + loop = getattr(loop, '_asyncio_wrapper', loop)
66 + result = loop.create_future()
67 + cpv_list = (portdb.cp_list(cp, mytree=repo_config.location)
68 + if cpv_list is None else cpv_list)
69 +
70 + def gather_done(gather_result):
71 + # All exceptions must be consumed from gather_result before this
72 + # function returns, in order to avoid triggering the event loop's
73 + # exception handler.
74 + e = None
75 + if not gather_result.cancelled():
76 + for future in gather_result.result():
77 + if (future.done() and not future.cancelled() and
78 + future.exception() is not None):
79 + e = future.exception()
80 +
81 + if result.cancelled():
82 + return
83 + elif e is None:
84 + result.set_result(dict((k, list(v.result()))
85 + for k, v in zip(cpv_list, gather_result.result())))
86 + else:
87 + result.set_exception(e)
88 +
89 + gather_result = iter_gather(
90 + # Use a generator expression for lazy evaluation, so that iter_gather
91 + # controls the number of concurrent async_fetch_map calls.
92 + (portdb.async_fetch_map(cpv, mytree=repo_config.location, loop=loop)
93 + for cpv in cpv_list),
94 + max_jobs=max_jobs,
95 + max_load=max_load,
96 + loop=loop,
97 + )
98 +
99 + gather_result.add_done_callback(gather_done)
100 + result.add_done_callback(lambda result:
101 + gather_result.cancel() if result.cancelled() else None)
102 +
103 + return result
104 +
105 +
106 def _parse_uri_map(cpv, metadata, use=None):
107
108 myuris = use_reduce(metadata.get('SRC_URI', ''),
109
110 diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
111 index 38ac4825e..fabea9bc1 100644
112 --- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
113 +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
114 @@ -1,10 +1,10 @@
115 -# Copyright 2012-2013 Gentoo Foundation
116 +# Copyright 2012-2018 Gentoo Foundation
117 # Distributed under the terms of the GNU General Public License v2
118
119 import portage
120 from portage import os
121 +from portage.dbapi.porttree import _async_manifest_fetchlist
122 from portage.dep import _repo_separator
123 -from portage.exception import InvalidDependString
124 from portage.localization import _
125 from portage.util._async.AsyncScheduler import AsyncScheduler
126 from .ManifestTask import ManifestTask
127 @@ -63,21 +63,14 @@ class ManifestScheduler(AsyncScheduler):
128 cpv_list = portdb.cp_list(cp, mytree=[repo_config.location])
129 if not cpv_list:
130 continue
131 - fetchlist_dict = {}
132 - try:
133 - for cpv in cpv_list:
134 - fetchlist_dict[cpv] = \
135 - list(portdb.getFetchMap(cpv, mytree=mytree))
136 - except InvalidDependString as e:
137 - portage.writemsg(
138 - _("!!! %s%s%s: SRC_URI: %s\n") %
139 - (cp, _repo_separator, repo_config.name, e),
140 - noiselevel=-1)
141 - self._error_count += 1
142 - continue
143
144 + # Use _async_manifest_fetchlist(max_jobs=1), since we
145 + # spawn concurrent ManifestTask instances.
146 yield ManifestTask(cp=cp, distdir=distdir,
147 - fetchlist_dict=fetchlist_dict, repo_config=repo_config,
148 + fetchlist_dict=_async_manifest_fetchlist(
149 + portdb, repo_config, cp, cpv_list=cpv_list,
150 + max_jobs=1, loop=self._event_loop),
151 + repo_config=repo_config,
152 gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars,
153 force_sign_key=self._force_sign_key)
154
155 @@ -91,3 +84,5 @@ class ManifestScheduler(AsyncScheduler):
156 noiselevel=-1)
157
158 AsyncScheduler._task_exit(self, task)
159 +
160 +
161
162 diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
163 index 0ee2b910d..6bf5e82ef 100644
164 --- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
165 +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
166 @@ -1,4 +1,4 @@
167 -# Copyright 2012-2013 Gentoo Foundation
168 +# Copyright 2012-2018 Gentoo Foundation
169 # Distributed under the terms of the GNU General Public License v2
170
171 import errno
172 @@ -8,8 +8,12 @@ import subprocess
173 from portage import os
174 from portage import _unicode_encode, _encodings
175 from portage.const import MANIFEST2_IDENTIFIERS
176 +from portage.dep import _repo_separator
177 +from portage.exception import InvalidDependString
178 +from portage.localization import _
179 from portage.util import (atomic_ofstream, grablines,
180 shlex_split, varexpand, writemsg)
181 +from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
182 from portage.util._async.PipeLogger import PipeLogger
183 from portage.util._async.PopenProcess import PopenProcess
184 from _emerge.CompositeTask import CompositeTask
185 @@ -29,6 +33,24 @@ class ManifestTask(CompositeTask):
186 def _start(self):
187 self._manifest_path = os.path.join(self.repo_config.location,
188 self.cp, "Manifest")
189 +
190 + self._start_task(
191 + AsyncTaskFuture(future=self.fetchlist_dict),
192 + self._start_with_fetchlist)
193 +
194 + def _start_with_fetchlist(self, fetchlist_task):
195 + if self._default_exit(fetchlist_task) != os.EX_OK:
196 + if not self.fetchlist_dict.cancelled():
197 + try:
198 + self.fetchlist_dict.result()
199 + except InvalidDependString as e:
200 + writemsg(
201 + _("!!! %s%s%s: SRC_URI: %s\n") %
202 + (self.cp, _repo_separator, self.repo_config.name, e),
203 + noiselevel=-1)
204 + self._async_wait()
205 + return
206 + self.fetchlist_dict = self.fetchlist_dict.result()
207 manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir,
208 fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config,
209 scheduler=self.scheduler)
210
211 diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py
212 index bd934460a..d3101b120 100644
213 --- a/pym/portage/tests/dbapi/test_portdb_cache.py
214 +++ b/pym/portage/tests/dbapi/test_portdb_cache.py
215 @@ -1,4 +1,4 @@
216 -# Copyright 2012-2015 Gentoo Foundation
217 +# Copyright 2012-2018 Gentoo Foundation
218 # Distributed under the terms of the GNU General Public License v2
219
220 import subprocess
221 @@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase):
222 portage_python = portage._python_interpreter
223 egencache_cmd = (portage_python, "-b", "-Wd",
224 os.path.join(self.bindir, "egencache"),
225 + "--update-manifests", "--sign-manifests=n",
226 "--repo", "test_repo",
227 "--repositories-configuration", settings.repositories.config_string())
228 python_cmd = (portage_python, "-b", "-Wd", "-c")