1 |
commit: 3e77f0199cb401acf974089fb6aa378fd45d0e90 |
2 |
Author: Zac Medico <zmedico <AT> gentoo <DOT> org> |
3 |
AuthorDate: Tue Apr 24 06:54:05 2018 +0000 |
4 |
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> |
5 |
CommitDate: Fri Apr 27 22:56:02 2018 +0000 |
6 |
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=3e77f019 |
7 |
|
8 |
ManifestScheduler: async fetchlist_dict (bug 653946) |
9 |
|
10 |
In order to avoid event loop recursion, pass fetchlist_dict to |
11 |
ManifestTask as a Future. |
12 |
|
13 |
Bug: https://bugs.gentoo.org/653946 |
14 |
|
15 |
pym/portage/dbapi/porttree.py | 70 ++++++++++++++++++++++ |
16 |
.../ebuild/_parallel_manifest/ManifestScheduler.py | 25 ++++---- |
17 |
.../ebuild/_parallel_manifest/ManifestTask.py | 24 +++++++- |
18 |
pym/portage/tests/dbapi/test_portdb_cache.py | 3 +- |
19 |
4 files changed, 105 insertions(+), 17 deletions(-) |
20 |
|
21 |
diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py |
22 |
index 975f03d5e..3ce214cd7 100644 |
23 |
--- a/pym/portage/dbapi/porttree.py |
24 |
+++ b/pym/portage/dbapi/porttree.py |
25 |
@@ -37,6 +37,7 @@ from portage import _unicode_encode |
26 |
from portage import OrderedDict |
27 |
from portage.util._eventloop.EventLoop import EventLoop |
28 |
from portage.util._eventloop.global_event_loop import global_event_loop |
29 |
+from portage.util.futures.iter_completed import iter_gather |
30 |
from _emerge.EbuildMetadataPhase import EbuildMetadataPhase |
31 |
|
32 |
import os as _os |
33 |
@@ -1393,6 +1394,75 @@ class FetchlistDict(Mapping): |
34 |
if sys.hexversion >= 0x3000000: |
35 |
keys = __iter__ |
36 |
|
37 |
+ |
38 |
+def _async_manifest_fetchlist(portdb, repo_config, cp, cpv_list=None, |
39 |
+ max_jobs=None, max_load=None, loop=None): |
40 |
+ """ |
41 |
+ Asynchronous form of FetchlistDict, with max_jobs and max_load |
42 |
+ parameters in order to control async_aux_get concurrency. |
43 |
+ |
44 |
+ @param portdb: portdbapi instance |
45 |
+ @type portdb: portdbapi |
46 |
+ @param repo_config: repository configuration for a Manifest |
47 |
+ @type repo_config: RepoConfig |
48 |
+ @param cp: cp for a Manifest |
49 |
+ @type cp: str |
50 |
+ @param cpv_list: list of ebuild cpv values for a Manifest |
51 |
+ @type cpv_list: list |
52 |
+ @param max_jobs: max number of futures to process concurrently (default |
53 |
+ is multiprocessing.cpu_count()) |
54 |
+ @type max_jobs: int |
55 |
+ @param max_load: max load allowed when scheduling a new future, |
56 |
+ otherwise schedule no more than 1 future at a time (default |
57 |
+ is multiprocessing.cpu_count()) |
58 |
+ @type max_load: int or float |
59 |
+ @param loop: event loop |
60 |
+ @type loop: EventLoop |
61 |
+ @return: a Future resulting in a Mapping compatible with FetchlistDict |
62 |
+ @rtype: asyncio.Future (or compatible) |
63 |
+ """ |
64 |
+ loop = loop or global_event_loop() |
65 |
+ loop = getattr(loop, '_asyncio_wrapper', loop) |
66 |
+ result = loop.create_future() |
67 |
+ cpv_list = (portdb.cp_list(cp, mytree=repo_config.location) |
68 |
+ if cpv_list is None else cpv_list) |
69 |
+ |
70 |
+ def gather_done(gather_result): |
71 |
+ # All exceptions must be consumed from gather_result before this |
72 |
+ # function returns, in order to avoid triggering the event loop's |
73 |
+ # exception handler. |
74 |
+ e = None |
75 |
+ if not gather_result.cancelled(): |
76 |
+ for future in gather_result.result(): |
77 |
+ if (future.done() and not future.cancelled() and |
78 |
+ future.exception() is not None): |
79 |
+ e = future.exception() |
80 |
+ |
81 |
+ if result.cancelled(): |
82 |
+ return |
83 |
+ elif e is None: |
84 |
+ result.set_result(dict((k, list(v.result())) |
85 |
+ for k, v in zip(cpv_list, gather_result.result()))) |
86 |
+ else: |
87 |
+ result.set_exception(e) |
88 |
+ |
89 |
+ gather_result = iter_gather( |
90 |
+ # Use a generator expression for lazy evaluation, so that iter_gather |
91 |
+ # controls the number of concurrent async_fetch_map calls. |
92 |
+ (portdb.async_fetch_map(cpv, mytree=repo_config.location, loop=loop) |
93 |
+ for cpv in cpv_list), |
94 |
+ max_jobs=max_jobs, |
95 |
+ max_load=max_load, |
96 |
+ loop=loop, |
97 |
+ ) |
98 |
+ |
99 |
+ gather_result.add_done_callback(gather_done) |
100 |
+ result.add_done_callback(lambda result: |
101 |
+ gather_result.cancel() if result.cancelled() else None) |
102 |
+ |
103 |
+ return result |
104 |
+ |
105 |
+ |
106 |
def _parse_uri_map(cpv, metadata, use=None): |
107 |
|
108 |
myuris = use_reduce(metadata.get('SRC_URI', ''), |
109 |
|
110 |
diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py |
111 |
index 38ac4825e..fabea9bc1 100644 |
112 |
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py |
113 |
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py |
114 |
@@ -1,10 +1,10 @@ |
115 |
-# Copyright 2012-2013 Gentoo Foundation |
116 |
+# Copyright 2012-2018 Gentoo Foundation |
117 |
# Distributed under the terms of the GNU General Public License v2 |
118 |
|
119 |
import portage |
120 |
from portage import os |
121 |
+from portage.dbapi.porttree import _async_manifest_fetchlist |
122 |
from portage.dep import _repo_separator |
123 |
-from portage.exception import InvalidDependString |
124 |
from portage.localization import _ |
125 |
from portage.util._async.AsyncScheduler import AsyncScheduler |
126 |
from .ManifestTask import ManifestTask |
127 |
@@ -63,21 +63,14 @@ class ManifestScheduler(AsyncScheduler): |
128 |
cpv_list = portdb.cp_list(cp, mytree=[repo_config.location]) |
129 |
if not cpv_list: |
130 |
continue |
131 |
- fetchlist_dict = {} |
132 |
- try: |
133 |
- for cpv in cpv_list: |
134 |
- fetchlist_dict[cpv] = \ |
135 |
- list(portdb.getFetchMap(cpv, mytree=mytree)) |
136 |
- except InvalidDependString as e: |
137 |
- portage.writemsg( |
138 |
- _("!!! %s%s%s: SRC_URI: %s\n") % |
139 |
- (cp, _repo_separator, repo_config.name, e), |
140 |
- noiselevel=-1) |
141 |
- self._error_count += 1 |
142 |
- continue |
143 |
|
144 |
+ # Use _async_manifest_fetchlist(max_jobs=1), since we |
145 |
+ # spawn concurrent ManifestTask instances. |
146 |
yield ManifestTask(cp=cp, distdir=distdir, |
147 |
- fetchlist_dict=fetchlist_dict, repo_config=repo_config, |
148 |
+ fetchlist_dict=_async_manifest_fetchlist( |
149 |
+ portdb, repo_config, cp, cpv_list=cpv_list, |
150 |
+ max_jobs=1, loop=self._event_loop), |
151 |
+ repo_config=repo_config, |
152 |
gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars, |
153 |
force_sign_key=self._force_sign_key) |
154 |
|
155 |
@@ -91,3 +84,5 @@ class ManifestScheduler(AsyncScheduler): |
156 |
noiselevel=-1) |
157 |
|
158 |
AsyncScheduler._task_exit(self, task) |
159 |
+ |
160 |
+ |
161 |
|
162 |
diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py |
163 |
index 0ee2b910d..6bf5e82ef 100644 |
164 |
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py |
165 |
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py |
166 |
@@ -1,4 +1,4 @@ |
167 |
-# Copyright 2012-2013 Gentoo Foundation |
168 |
+# Copyright 2012-2018 Gentoo Foundation |
169 |
# Distributed under the terms of the GNU General Public License v2 |
170 |
|
171 |
import errno |
172 |
@@ -8,8 +8,12 @@ import subprocess |
173 |
from portage import os |
174 |
from portage import _unicode_encode, _encodings |
175 |
from portage.const import MANIFEST2_IDENTIFIERS |
176 |
+from portage.dep import _repo_separator |
177 |
+from portage.exception import InvalidDependString |
178 |
+from portage.localization import _ |
179 |
from portage.util import (atomic_ofstream, grablines, |
180 |
shlex_split, varexpand, writemsg) |
181 |
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture |
182 |
from portage.util._async.PipeLogger import PipeLogger |
183 |
from portage.util._async.PopenProcess import PopenProcess |
184 |
from _emerge.CompositeTask import CompositeTask |
185 |
@@ -29,6 +33,24 @@ class ManifestTask(CompositeTask): |
186 |
def _start(self): |
187 |
self._manifest_path = os.path.join(self.repo_config.location, |
188 |
self.cp, "Manifest") |
189 |
+ |
190 |
+ self._start_task( |
191 |
+ AsyncTaskFuture(future=self.fetchlist_dict), |
192 |
+ self._start_with_fetchlist) |
193 |
+ |
194 |
+ def _start_with_fetchlist(self, fetchlist_task): |
195 |
+ if self._default_exit(fetchlist_task) != os.EX_OK: |
196 |
+ if not self.fetchlist_dict.cancelled(): |
197 |
+ try: |
198 |
+ self.fetchlist_dict.result() |
199 |
+ except InvalidDependString as e: |
200 |
+ writemsg( |
201 |
+ _("!!! %s%s%s: SRC_URI: %s\n") % |
202 |
+ (self.cp, _repo_separator, self.repo_config.name, e), |
203 |
+ noiselevel=-1) |
204 |
+ self._async_wait() |
205 |
+ return |
206 |
+ self.fetchlist_dict = self.fetchlist_dict.result() |
207 |
manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir, |
208 |
fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config, |
209 |
scheduler=self.scheduler) |
210 |
|
211 |
diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py |
212 |
index bd934460a..d3101b120 100644 |
213 |
--- a/pym/portage/tests/dbapi/test_portdb_cache.py |
214 |
+++ b/pym/portage/tests/dbapi/test_portdb_cache.py |
215 |
@@ -1,4 +1,4 @@ |
216 |
-# Copyright 2012-2015 Gentoo Foundation |
217 |
+# Copyright 2012-2018 Gentoo Foundation |
218 |
# Distributed under the terms of the GNU General Public License v2 |
219 |
|
220 |
import subprocess |
221 |
@@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase): |
222 |
portage_python = portage._python_interpreter |
223 |
egencache_cmd = (portage_python, "-b", "-Wd", |
224 |
os.path.join(self.bindir, "egencache"), |
225 |
+ "--update-manifests", "--sign-manifests=n", |
226 |
"--repo", "test_repo", |
227 |
"--repositories-configuration", settings.repositories.config_string()) |
228 |
python_cmd = (portage_python, "-b", "-Wd", "-c") |