Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH 2/2] ManifestScheduler: async fetchlist_dict (bug 653946)
Date: Tue, 24 Apr 2018 23:46:52
Message-Id: 20180424234559.26703-3-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH 0/2] ManifestScheduler: async fetchlist_dict (bug 653946) by Zac Medico
1 In order to avoid event loop recursion, pass fetchlist_dict to
2 ManifestTask as a Future.
3
4 Bug: https://bugs.gentoo.org/653946
5 ---
6 .../ebuild/_parallel_manifest/ManifestScheduler.py | 82 ++++++++++++++++++----
7 .../ebuild/_parallel_manifest/ManifestTask.py | 22 ++++++
8 pym/portage/tests/dbapi/test_portdb_cache.py | 1 +
9 3 files changed, 91 insertions(+), 14 deletions(-)
10
11 diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
12 index 38ac4825e..07794522e 100644
13 --- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
14 +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
15 @@ -4,9 +4,9 @@
16 import portage
17 from portage import os
18 from portage.dep import _repo_separator
19 -from portage.exception import InvalidDependString
20 from portage.localization import _
21 from portage.util._async.AsyncScheduler import AsyncScheduler
22 +from portage.util.futures.iter_completed import iter_gather
23 from .ManifestTask import ManifestTask
24
25 class ManifestScheduler(AsyncScheduler):
26 @@ -63,21 +63,14 @@ class ManifestScheduler(AsyncScheduler):
27 cpv_list = portdb.cp_list(cp, mytree=[repo_config.location])
28 if not cpv_list:
29 continue
30 - fetchlist_dict = {}
31 - try:
32 - for cpv in cpv_list:
33 - fetchlist_dict[cpv] = \
34 - list(portdb.getFetchMap(cpv, mytree=mytree))
35 - except InvalidDependString as e:
36 - portage.writemsg(
37 - _("!!! %s%s%s: SRC_URI: %s\n") %
38 - (cp, _repo_separator, repo_config.name, e),
39 - noiselevel=-1)
40 - self._error_count += 1
41 - continue
42
43 + # Use _future_fetchlist(max_jobs=1), since we
44 + # spawn concurrent ManifestTask instances.
45 yield ManifestTask(cp=cp, distdir=distdir,
46 - fetchlist_dict=fetchlist_dict, repo_config=repo_config,
47 + fetchlist_dict=_future_fetchlist(
48 + self._event_loop, portdb, repo_config, cp, cpv_list,
49 + max_jobs=1),
50 + repo_config=repo_config,
51 gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars,
52 force_sign_key=self._force_sign_key)
53
54 @@ -91,3 +84,64 @@ class ManifestScheduler(AsyncScheduler):
55 noiselevel=-1)
56
57 AsyncScheduler._task_exit(self, task)
58 +
59 +
60 +def _future_fetchlist(loop, portdb, repo_config, cp, cpv_list,
61 + max_jobs=None, max_load=None):
62 + """
63 + Asynchronous form of FetchlistDict, with max_jobs and max_load
64 + parameters in order to control async_aux_get concurrency.
65 +
66 + @param loop: event loop
67 + @type loop: EventLoop
68 + @param portdb: portdbapi instance
69 + @type portdb: portdbapi
70 + @param repo_config: repository configuration for a Manifest
71 + @type repo_config: RepoConfig
72 + @param cp: cp for a Manifest
73 + @type cp: str
74 + @param cpv_list: list of ebuild cpv values for a Manifest
75 + @type cpv_list: list
76 + @param max_jobs: max number of futures to process concurrently (default
77 + is multiprocessing.cpu_count())
78 + @type max_jobs: int
79 + @param max_load: max load allowed when scheduling a new future,
80 + otherwise schedule no more than 1 future at a time (default
81 + is multiprocessing.cpu_count())
82 + @type max_load: int or float
83 + @return: a Future resulting in a Mapping compatible with FetchlistDict
84 + @rtype: asyncio.Future (or compatible); cancelling it cancels the gather
85 + """
86 + loop = getattr(loop, '_asyncio_wrapper', loop)
87 + result = loop.create_future()
88 +
89 + def gather_done(gather_result):
90 + if result.cancelled():
91 + return
92 +
93 + e = None
94 + for future in gather_result.result():
95 + if (future.done() and future.exception() is not None):
96 + # Retrieve exceptions from all futures in order to
97 + # avoid triggering the event loop's error handler.
98 + e = future.exception()
99 +
100 + if e is None:
101 + result.set_result(dict((k, list(v.result()))
102 + for k, v in zip(cpv_list, gather_result.result())))
103 + else:
104 + result.set_exception(e)
105 +
106 + gather_result = iter_gather(
107 + (portdb.async_fetch_map(cpv, mytree=repo_config.location, loop=loop)
108 + for cpv in cpv_list),
109 + max_jobs=max_jobs,
110 + max_load=max_load,
111 + loop=loop,
112 + )
113 + # Link both ways: gather completion fills result; cancelling result cancels gather_result.
114 + gather_result.add_done_callback(gather_done)
115 + result.add_done_callback(lambda result:
116 + gather_result.cancel() if result.cancelled() else None)
117 +
118 + return result
119 diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
120 index 0ee2b910d..6f5fe5b16 100644
121 --- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
122 +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
123 @@ -8,8 +8,12 @@ import subprocess
124 from portage import os
125 from portage import _unicode_encode, _encodings
126 from portage.const import MANIFEST2_IDENTIFIERS
127 +from portage.dep import _repo_separator
128 +from portage.exception import InvalidDependString
129 +from portage.localization import _
130 from portage.util import (atomic_ofstream, grablines,
131 shlex_split, varexpand, writemsg)
132 +from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
133 from portage.util._async.PipeLogger import PipeLogger
134 from portage.util._async.PopenProcess import PopenProcess
135 from _emerge.CompositeTask import CompositeTask
136 @@ -29,6 +33,24 @@ class ManifestTask(CompositeTask):
137 def _start(self):
138 self._manifest_path = os.path.join(self.repo_config.location,
139 self.cp, "Manifest")
140 + # fetchlist_dict is a Future here; _start_with_fetchlist runs once it completes.
141 + self._start_task(
142 + AsyncTaskFuture(future=self.fetchlist_dict),
143 + self._start_with_fetchlist)
144 +
145 + def _start_with_fetchlist(self, fetchlist_task):
146 + if self._default_exit(fetchlist_task) != os.EX_OK:
147 + if not self.fetchlist_dict.cancelled():
148 + try:
149 + self.fetchlist_dict.result()
150 + except InvalidDependString as e:
151 + writemsg(
152 + _("!!! %s%s%s: SRC_URI: %s\n") %
153 + (self.cp, _repo_separator, self.repo_config.name, e),
154 + noiselevel=-1)
155 + self._async_wait()
156 + return
157 + self.fetchlist_dict = self.fetchlist_dict.result()
158 manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir,
159 fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config,
160 scheduler=self.scheduler)
161 diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py
162 index bd934460a..1f139b256 100644
163 --- a/pym/portage/tests/dbapi/test_portdb_cache.py
164 +++ b/pym/portage/tests/dbapi/test_portdb_cache.py
165 @@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase):
166 portage_python = portage._python_interpreter
167 egencache_cmd = (portage_python, "-b", "-Wd",
168 os.path.join(self.bindir, "egencache"),
169 + "--update-manifests", "--sign-manifests=n",
170 "--repo", "test_repo",
171 "--repositories-configuration", settings.repositories.config_string())
172 python_cmd = (portage_python, "-b", "-Wd", "-c")
173 --
174 2.13.6