1 |
In order to avoid event loop recursion, pass fetchlist_dict to |
2 |
ManifestTask as a Future. |
3 |
|
4 |
Bug: https://bugs.gentoo.org/653946 |
5 |
--- |
6 |
.../ebuild/_parallel_manifest/ManifestScheduler.py | 82 ++++++++++++++++++---- |
7 |
.../ebuild/_parallel_manifest/ManifestTask.py | 22 ++++++ |
8 |
pym/portage/tests/dbapi/test_portdb_cache.py | 1 + |
9 |
3 files changed, 91 insertions(+), 14 deletions(-) |
10 |
|
11 |
diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py |
12 |
index 38ac4825e..07794522e 100644 |
13 |
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py |
14 |
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py |
15 |
@@ -4,9 +4,9 @@ |
16 |
import portage |
17 |
from portage import os |
18 |
from portage.dep import _repo_separator |
19 |
-from portage.exception import InvalidDependString |
20 |
from portage.localization import _ |
21 |
from portage.util._async.AsyncScheduler import AsyncScheduler |
22 |
+from portage.util.futures.iter_completed import iter_gather |
23 |
from .ManifestTask import ManifestTask |
24 |
|
25 |
class ManifestScheduler(AsyncScheduler): |
26 |
@@ -63,21 +63,14 @@ class ManifestScheduler(AsyncScheduler): |
27 |
cpv_list = portdb.cp_list(cp, mytree=[repo_config.location]) |
28 |
if not cpv_list: |
29 |
continue |
30 |
- fetchlist_dict = {} |
31 |
- try: |
32 |
- for cpv in cpv_list: |
33 |
- fetchlist_dict[cpv] = \ |
34 |
- list(portdb.getFetchMap(cpv, mytree=mytree)) |
35 |
- except InvalidDependString as e: |
36 |
- portage.writemsg( |
37 |
- _("!!! %s%s%s: SRC_URI: %s\n") % |
38 |
- (cp, _repo_separator, repo_config.name, e), |
39 |
- noiselevel=-1) |
40 |
- self._error_count += 1 |
41 |
- continue |
42 |
|
43 |
+ # Use _future_fetchlist(max_jobs=1), since we |
44 |
+ # spawn concurrent ManifestTask instances. |
45 |
yield ManifestTask(cp=cp, distdir=distdir, |
46 |
- fetchlist_dict=fetchlist_dict, repo_config=repo_config, |
47 |
+ fetchlist_dict=_future_fetchlist( |
48 |
+ self._event_loop, portdb, repo_config, cp, cpv_list, |
49 |
+ max_jobs=1), |
50 |
+ repo_config=repo_config, |
51 |
gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars, |
52 |
force_sign_key=self._force_sign_key) |
53 |
|
54 |
@@ -91,3 +84,64 @@ class ManifestScheduler(AsyncScheduler): |
55 |
noiselevel=-1) |
56 |
|
57 |
AsyncScheduler._task_exit(self, task) |
58 |
+ |
59 |
+ |
60 |
def _future_fetchlist(loop, portdb, repo_config, cp, cpv_list,
    max_jobs=None, max_load=None):
    """
    Asynchronous form of FetchlistDict, with max_jobs and max_load
    parameters in order to control async_aux_get concurrency.

    One portdb.async_fetch_map() future is created per cpv and the
    futures are driven by iter_gather. When all of them are done, the
    returned future is resolved with a dict mapping each cpv to the
    list produced from its fetch map, or with an exception raised by
    one of the underlying futures. Cancelling the returned future
    cancels the underlying gather operation.

    @param loop: event loop
    @type loop: EventLoop
    @param portdb: portdbapi instance
    @type portdb: portdbapi
    @param repo_config: repository configuration for a Manifest
    @type repo_config: RepoConfig
    @param cp: cp for a Manifest
    @type cp: str
    @param cpv_list: list of ebuild cpv values for a Manifest
    @type cpv_list: list
    @param max_jobs: max number of futures to process concurrently (default
        is multiprocessing.cpu_count())
    @type max_jobs: int
    @param max_load: max load allowed when scheduling a new future,
        otherwise schedule no more than 1 future at a time (default
        is multiprocessing.cpu_count())
    @type max_load: int or float
    @return: a Future resulting in a Mapping compatible with FetchlistDict
    @rtype: asyncio.Future (or compatible)
    """
    # Prefer the asyncio-compatible wrapper when the loop provides one.
    loop = getattr(loop, '_asyncio_wrapper', loop)
    result = loop.create_future()

    def gather_done(gather_result):
        # The consumer gave up on the result, so there is nothing to set.
        if result.cancelled():
            return

        futures = gather_result.result()

        e = None
        for future in futures:
            if (future.done() and future.exception() is not None):
                # Retrieve exceptions from all futures in order to
                # avoid triggering the event loop's error handler.
                e = future.exception()

        if e is None:
            result.set_result(dict((k, list(v.result()))
                for k, v in zip(cpv_list, futures)))
        else:
            result.set_exception(e)

    def result_done(result):
        # Propagate cancellation of the returned future to the gather
        # operation. The cancel method has to be called; a bare
        # reference to it is a no-op.
        if result.cancelled():
            gather_result.cancel()

    gather_result = iter_gather(
        (portdb.async_fetch_map(cpv, mytree=repo_config.location, loop=loop)
            for cpv in cpv_list),
        max_jobs=max_jobs,
        max_load=max_load,
        loop=loop,
    )

    gather_result.add_done_callback(gather_done)
    result.add_done_callback(result_done)

    return result
119 |
diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py |
120 |
index 0ee2b910d..6f5fe5b16 100644 |
121 |
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py |
122 |
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py |
123 |
@@ -8,8 +8,12 @@ import subprocess |
124 |
from portage import os |
125 |
from portage import _unicode_encode, _encodings |
126 |
from portage.const import MANIFEST2_IDENTIFIERS |
127 |
+from portage.dep import _repo_separator |
128 |
+from portage.exception import InvalidDependString |
129 |
+from portage.localization import _ |
130 |
from portage.util import (atomic_ofstream, grablines, |
131 |
shlex_split, varexpand, writemsg) |
132 |
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture |
133 |
from portage.util._async.PipeLogger import PipeLogger |
134 |
from portage.util._async.PopenProcess import PopenProcess |
135 |
from _emerge.CompositeTask import CompositeTask |
136 |
@@ -29,6 +33,24 @@ class ManifestTask(CompositeTask): |
137 |
def _start(self): |
138 |
self._manifest_path = os.path.join(self.repo_config.location, |
139 |
self.cp, "Manifest") |
140 |
+ |
141 |
+ self._start_task( |
142 |
+ AsyncTaskFuture(future=self.fetchlist_dict), |
143 |
+ self._start_with_fetchlist) |
144 |
+ |
145 |
+ def _start_with_fetchlist(self, fetchlist_task): |
146 |
+ if self._default_exit(fetchlist_task) != os.EX_OK: |
147 |
+ if not self.fetchlist_dict.cancelled(): |
148 |
+ try: |
149 |
+ self.fetchlist_dict.result() |
150 |
+ except InvalidDependString as e: |
151 |
+ writemsg( |
152 |
+ _("!!! %s%s%s: SRC_URI: %s\n") % |
153 |
+ (self.cp, _repo_separator, self.repo_config.name, e), |
154 |
+ noiselevel=-1) |
155 |
+ self._async_wait() |
156 |
+ return |
157 |
+ self.fetchlist_dict = self.fetchlist_dict.result() |
158 |
manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir, |
159 |
fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config, |
160 |
scheduler=self.scheduler) |
161 |
diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py |
162 |
index bd934460a..1f139b256 100644 |
163 |
--- a/pym/portage/tests/dbapi/test_portdb_cache.py |
164 |
+++ b/pym/portage/tests/dbapi/test_portdb_cache.py |
165 |
@@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase): |
166 |
portage_python = portage._python_interpreter |
167 |
egencache_cmd = (portage_python, "-b", "-Wd", |
168 |
os.path.join(self.bindir, "egencache"), |
169 |
+ "--update-manifests", "--sign-manifests=n", |
170 |
"--repo", "test_repo", |
171 |
"--repositories-configuration", settings.repositories.config_string()) |
172 |
python_cmd = (portage_python, "-b", "-Wd", "-c") |
173 |
-- |
174 |
2.13.6 |