1 |
In order to avoid event loop recursion, pass fetchlist_dict to |
2 |
ManifestTask as a Future. |
3 |
|
4 |
Bug: https://bugs.gentoo.org/653946 |
5 |
--- |
6 |
[PATCH v2] Fixed _future_fetchlist to limit concurrent async_aux_get |
7 |
calls by using async_iter_completed(max_jobs=1). |
8 |
|
9 |
.../ebuild/_parallel_manifest/ManifestScheduler.py | 108 ++++++++++++++++++--- |
10 |
.../ebuild/_parallel_manifest/ManifestTask.py | 22 +++++ |
11 |
pym/portage/tests/dbapi/test_portdb_cache.py | 1 + |
12 |
3 files changed, 117 insertions(+), 14 deletions(-) |
13 |
|
14 |
diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py |
15 |
index 38ac4825e..2f453afc2 100644 |
16 |
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py |
17 |
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py |
18 |
@@ -4,9 +4,9 @@ |
19 |
import portage |
20 |
from portage import os |
21 |
from portage.dep import _repo_separator |
22 |
-from portage.exception import InvalidDependString |
23 |
from portage.localization import _ |
24 |
from portage.util._async.AsyncScheduler import AsyncScheduler |
25 |
+from portage.util.futures.iter_completed import async_iter_completed |
26 |
from .ManifestTask import ManifestTask |
27 |
|
28 |
class ManifestScheduler(AsyncScheduler): |
29 |
@@ -63,21 +63,14 @@ class ManifestScheduler(AsyncScheduler): |
30 |
cpv_list = portdb.cp_list(cp, mytree=[repo_config.location]) |
31 |
if not cpv_list: |
32 |
continue |
33 |
- fetchlist_dict = {} |
34 |
- try: |
35 |
- for cpv in cpv_list: |
36 |
- fetchlist_dict[cpv] = \ |
37 |
- list(portdb.getFetchMap(cpv, mytree=mytree)) |
38 |
- except InvalidDependString as e: |
39 |
- portage.writemsg( |
40 |
- _("!!! %s%s%s: SRC_URI: %s\n") % |
41 |
- (cp, _repo_separator, repo_config.name, e), |
42 |
- noiselevel=-1) |
43 |
- self._error_count += 1 |
44 |
- continue |
45 |
|
46 |
+ # Use _future_fetchlist(max_jobs=1), since we |
47 |
+ # spawn concurrent ManifestTask instances. |
48 |
yield ManifestTask(cp=cp, distdir=distdir, |
49 |
- fetchlist_dict=fetchlist_dict, repo_config=repo_config, |
50 |
+ fetchlist_dict=_future_fetchlist( |
51 |
+ self._event_loop, portdb, repo_config, cp, cpv_list, |
52 |
+ max_jobs=1), |
53 |
+ repo_config=repo_config, |
54 |
gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars, |
55 |
force_sign_key=self._force_sign_key) |
56 |
|
57 |
@@ -91,3 +84,90 @@ class ManifestScheduler(AsyncScheduler): |
58 |
noiselevel=-1) |
59 |
|
60 |
AsyncScheduler._task_exit(self, task) |
61 |
+ |
62 |
+ |
63 |
+def _future_fetchlist(loop, portdb, repo_config, cp, cpv_list, |
64 |
+ max_jobs=None, max_load=None): |
65 |
+ """ |
66 |
+ Asynchronous form of FetchlistDict, with max_jobs and max_load |
67 |
+ parameters in order to control async_aux_get concurrency. |
68 |
+ |
69 |
+ @param loop: event loop |
70 |
+ @type loop: EventLoop |
71 |
+ @param portdb: portdbapi instance |
72 |
+ @type portdb: portdbapi |
73 |
+ @param repo_config: repository configuration for a Manifest |
74 |
+ @type repo_config: RepoConfig |
75 |
+ @param cp: cp for a Manifest |
76 |
+ @type cp: str |
77 |
+ @param cpv_list: list of ebuild cpv values for a Manifest |
78 |
+ @type cpv_list: list |
79 |
+ @param max_jobs: max number of futures to process concurrently (default |
80 |
+ is multiprocessing.cpu_count()) |
81 |
+ @type max_jobs: int |
82 |
+ @param max_load: max load allowed when scheduling a new future, |
83 |
+ otherwise schedule no more than 1 future at a time (default |
84 |
+ is multiprocessing.cpu_count()) |
85 |
+ @type max_load: int or float |
86 |
+ @return: a Future resulting in a Mapping compatible with FetchlistDict |
87 |
+ @rtype: asyncio.Future (or compatible) |
88 |
+ """ |
89 |
+ loop = getattr(loop, '_asyncio_wrapper', loop) |
90 |
+ result = loop.create_future() |
91 |
+ futures = {} |
92 |
+ |
93 |
+ def future_generator(): |
94 |
+ for cpv in cpv_list: |
95 |
+ future = futures[cpv] = portdb.async_fetch_map( |
96 |
+ cpv, mytree=repo_config.location, loop=loop) |
97 |
+ yield future |
98 |
+ |
99 |
+ completed_iter = async_iter_completed( |
100 |
+ future_generator(), |
101 |
+ max_jobs=max_jobs, |
102 |
+ max_load=max_load, |
103 |
+ loop=loop, |
104 |
+ ) |
105 |
+ |
106 |
+ def handle_result(future_done_set): |
107 |
+ if result.cancelled(): |
108 |
+ return |
109 |
+ |
110 |
+ try: |
111 |
+ handle_result.current_task = next(completed_iter) |
112 |
+ except StopIteration: |
113 |
+ pass |
114 |
+ else: |
115 |
+ handle_result.current_task.add_done_callback(handle_result) |
116 |
+ return |
117 |
+ |
118 |
+ e = None |
119 |
+ for future in futures.values(): |
120 |
+ if (future.done() and future.exception() is not None): |
121 |
+ # Retrieve exceptions from all futures in order to |
122 |
+ # avoid triggering the event loop's error handler. |
123 |
+ e = future.exception() |
124 |
+ |
125 |
+ if e is None: |
126 |
+ result.set_result(dict((k, list(v.result())) |
127 |
+ for k, v in futures.items())) |
128 |
+ else: |
129 |
+ result.set_exception(e) |
130 |
+ |
131 |
+ try: |
132 |
+ handle_result.current_task = next(completed_iter) |
133 |
+ except StopIteration: |
134 |
+ handle_result.current_task = None |
135 |
+ result.set_result({}) |
136 |
+ else: |
137 |
+ handle_result.current_task.add_done_callback(handle_result) |
138 |
+ |
139 |
+ def cancel_callback(result): |
140 |
+ if (result.cancelled() and |
141 |
+ handle_result.current_task is not None and |
142 |
+ not handle_result.current_task.done()): |
143 |
+ handle_result.current_task.cancel() |
144 |
+ |
145 |
+ result.add_done_callback(cancel_callback) |
146 |
+ |
147 |
+ return result |
148 |
diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py |
149 |
index 0ee2b910d..6f5fe5b16 100644 |
150 |
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py |
151 |
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py |
152 |
@@ -8,8 +8,12 @@ import subprocess |
153 |
from portage import os |
154 |
from portage import _unicode_encode, _encodings |
155 |
from portage.const import MANIFEST2_IDENTIFIERS |
156 |
+from portage.dep import _repo_separator |
157 |
+from portage.exception import InvalidDependString |
158 |
+from portage.localization import _ |
159 |
from portage.util import (atomic_ofstream, grablines, |
160 |
shlex_split, varexpand, writemsg) |
161 |
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture |
162 |
from portage.util._async.PipeLogger import PipeLogger |
163 |
from portage.util._async.PopenProcess import PopenProcess |
164 |
from _emerge.CompositeTask import CompositeTask |
165 |
@@ -29,6 +33,24 @@ class ManifestTask(CompositeTask): |
166 |
def _start(self): |
167 |
self._manifest_path = os.path.join(self.repo_config.location, |
168 |
self.cp, "Manifest") |
169 |
+ |
170 |
+ self._start_task( |
171 |
+ AsyncTaskFuture(future=self.fetchlist_dict), |
172 |
+ self._start_with_fetchlist) |
173 |
+ |
174 |
+ def _start_with_fetchlist(self, fetchlist_task): |
175 |
+ if self._default_exit(fetchlist_task) != os.EX_OK: |
176 |
+ if not self.fetchlist_dict.cancelled(): |
177 |
+ try: |
178 |
+ self.fetchlist_dict.result() |
179 |
+ except InvalidDependString as e: |
180 |
+ writemsg( |
181 |
+ _("!!! %s%s%s: SRC_URI: %s\n") % |
182 |
+ (self.cp, _repo_separator, self.repo_config.name, e), |
183 |
+ noiselevel=-1) |
184 |
+ self._async_wait() |
185 |
+ return |
186 |
+ self.fetchlist_dict = self.fetchlist_dict.result() |
187 |
manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir, |
188 |
fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config, |
189 |
scheduler=self.scheduler) |
190 |
diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py |
191 |
index bd934460a..1f139b256 100644 |
192 |
--- a/pym/portage/tests/dbapi/test_portdb_cache.py |
193 |
+++ b/pym/portage/tests/dbapi/test_portdb_cache.py |
194 |
@@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase): |
195 |
portage_python = portage._python_interpreter |
196 |
egencache_cmd = (portage_python, "-b", "-Wd", |
197 |
os.path.join(self.bindir, "egencache"), |
198 |
+ "--update-manifests", "--sign-manifests=n", |
199 |
"--repo", "test_repo", |
200 |
"--repositories-configuration", settings.repositories.config_string()) |
201 |
python_cmd = (portage_python, "-b", "-Wd", "-c") |
202 |
-- |
203 |
2.13.6 |