Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] sync repositories in parallel (bug 557426)
Date: Thu, 13 Aug 2015 08:35:11
Message-Id: 1439454876-23070-1-git-send-email-zmedico@gentoo.org
Repos are synced in parallel (includes their post-sync hooks). Output
of concurrent processes is currently mixed (irrelevant with --quiet).
Support for FEATURES=metadata-transfer is handled in the main process,
which may be required for some backends (such as sqlite). Repos are
synced only after their master(s) have synced (in case that matters
for hooks).

X-Gentoo-Bug: 557426
X-Gentoo-Bug-url: https://bugs.gentoo.org/show_bug.cgi?id=557426
---
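The ordering constraint above (a repo is synced only after all of its
masters) can be illustrated with a small standalone sketch. This is not
part of the patch: it uses plain dicts instead of portage.util.digraph,
the repo names are made up, and it shows only the ordering, not the
parallel execution.

# Illustration only: compute a masters-first sync order.
masters = {
    'gentoo': [],
    'overlay-a': ['gentoo'],
    'overlay-b': ['gentoo', 'overlay-a'],
}

def sync_order(masters):
    """Yield repos so that each repo appears after all of its masters."""
    remaining = {repo: set(deps) for repo, deps in masters.items()}
    while remaining:
        # "Leaf" repos have no unsynced masters left.
        leaves = [repo for repo, deps in remaining.items() if not deps]
        if not leaves:
            # Circular master relationship: break the cycle arbitrarily,
            # as SyncScheduler._update_leaf_nodes does below.
            leaves = [next(iter(remaining))]
        for repo in sorted(leaves):
            yield repo
            del remaining[repo]
            for deps in remaining.values():
                deps.discard(repo)

print(list(sync_order(masters)))  # ['gentoo', 'overlay-a', 'overlay-b']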
pym/portage/emaint/modules/sync/sync.py | 125 ++++++++++++++++++++++++++++--
pym/portage/sync/controller.py | 31 ++++++--
pym/portage/tests/sync/test_sync_local.py | 6 +-
pym/portage/util/_async/AsyncFunction.py | 62 +++++++++++++++
4 files changed, 210 insertions(+), 14 deletions(-)
create mode 100644 pym/portage/util/_async/AsyncFunction.py

diff --git a/pym/portage/emaint/modules/sync/sync.py b/pym/portage/emaint/modules/sync/sync.py
index b463073..8a28f17 100644
--- a/pym/portage/emaint/modules/sync/sync.py
+++ b/pym/portage/emaint/modules/sync/sync.py
@@ -13,6 +13,10 @@ from portage.output import bold, red, create_color_func
from portage._global_updates import _global_updates
from portage.sync.controller import SyncManager
from portage.util import writemsg_level
+from portage.util.digraph import digraph
+from portage.util._async.AsyncScheduler import AsyncScheduler
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util._eventloop.EventLoop import EventLoop

import _emerge
from _emerge.emergelog import emergelog
@@ -201,6 +205,7 @@ class SyncRepos(object):
k = "--" + k.replace("_", "-")
self.emerge_config.opts[k] = v

+ selected_repos = [repo for repo in selected_repos if repo.sync_type is not None]
msgs = []
if not selected_repos:
msgs.append("Emaint sync, nothing to sync... returning")
@@ -213,13 +218,17 @@ class SyncRepos(object):

sync_manager = SyncManager(
self.emerge_config.target_config.settings, emergelog)
- retvals = []
- for repo in selected_repos:
- if repo.sync_type is not None:
- returncode, message = sync_manager.sync(self.emerge_config, repo)
- retvals.append((repo.name, returncode))
- if message:
- msgs.append(message)
+
+ sync_scheduler = SyncScheduler(emerge_config=self.emerge_config,
+ selected_repos=selected_repos, sync_manager=sync_manager,
+ max_jobs=self.emerge_config.opts.get('--jobs', 1),
+ event_loop=global_event_loop() if portage._internal_caller else
+ EventLoop(main=False))
+
+ sync_scheduler.start()
+ sync_scheduler.wait()
+ retvals = sync_scheduler.retvals
+ msgs.extend(sync_scheduler.msgs)

# Reload the whole config.
portage._sync_mode = False
@@ -287,3 +296,105 @@ class SyncRepos(object):
messages.append("Action: %s for repo: %s, returned code = %s"
% (action, rval[0], rval[1]))
return messages
+
+
+class SyncScheduler(AsyncScheduler):
+ '''
+ Sync repos in parallel, but don't sync a given repo until all
+ of its masters have synced.
+ '''
+ def __init__(self, **kwargs):
+ '''
+ @param emerge_config: an emerge_config instance
+ @param selected_repos: list of RepoConfig instances
+ @param sync_manager: a SyncManager instance
+ '''
+ self._emerge_config = kwargs.pop('emerge_config')
+ self._selected_repos = kwargs.pop('selected_repos')
+ self._sync_manager = kwargs.pop('sync_manager')
+ AsyncScheduler.__init__(self, **kwargs)
+ self._init_graph()
+ self._leaf_nodes = self._sync_graph.leaf_nodes()
+ self.retvals = []
+ self.msgs = []
+
+ def _init_graph(self):
+ '''
+ Graph relationships between repos and their masters.
+ '''
+ self._sync_graph = digraph()
+ self._repo_map = {}
+ self._running_repos = set()
+ for repo in self._selected_repos:
+ self._repo_map[repo.name] = repo
+ self._sync_graph.add(repo.name, None)
+ for master in repo.masters:
+ self._repo_map[master.name] = master
+ self._sync_graph.add(master.name, repo.name)
+
+ def _task_exit(self, task):
+ '''
+ Remove the task from the graph, in order to expose
+ more leaf nodes.
+ '''
+ self._running_tasks.discard(task)
+ returncode = task.returncode
+ if task.returncode == os.EX_OK:
+ returncode, message, updatecache_flg = task.result
+ if message:
+ self.msgs.append(message)
+ repo = task.kwargs['repo'].name
+ self._running_repos.remove(repo)
+ self.retvals.append((repo, returncode))
+ self._sync_graph.remove(repo)
+ self._update_leaf_nodes()
+ super(SyncScheduler, self)._task_exit(self)
+
+ def _update_leaf_nodes(self):
+ '''
+ Populate self._leaf_nodes with current leaves from
+ self._sync_graph. If a circular master relationship
+ is discovered, choose a random node to break the cycle.
+ '''
+ if self._sync_graph and not self._leaf_nodes:
+ self._leaf_nodes = [obj for obj in
+ self._sync_graph.leaf_nodes()
+ if obj not in self._running_repos]
+
+ if not (self._leaf_nodes or self._running_repos):
+ # If there is a circular master relationship,
+ # choose a random node to break the cycle.
+ self._leaf_nodes = [next(iter(self._sync_graph))]
+
+ def _next_task(self):
+ '''
+ Return a task for the next available leaf node.
+ '''
+ if not self._sync_graph:
+ raise StopIteration()
+ # If self._sync_graph is non-empty, then self._leaf_nodes
+ # is guaranteed to be non-empty, since otherwise
+ # _can_add_job would have returned False and prevented
+ # _next_task from being immediately called.
+ node = self._leaf_nodes.pop()
+ self._running_repos.add(node)
+ self._update_leaf_nodes()
+
+ task = self._sync_manager.async(
+ self._emerge_config, self._repo_map[node])
+ return task
+
+ def _can_add_job(self):
+ '''
+ Returns False if there are no leaf nodes available.
+ '''
+ if not AsyncScheduler._can_add_job(self):
+ return False
+ return bool(self._leaf_nodes) and not self._terminated.is_set()
+
+ def _keep_scheduling(self):
+ '''
+ Schedule as long as the graph is non-empty, and we haven't
+ been terminated.
+ '''
+ return bool(self._sync_graph) and not self._terminated.is_set()
diff --git a/pym/portage/sync/controller.py b/pym/portage/sync/controller.py
index 307487f..e992cc4 100644
--- a/pym/portage/sync/controller.py
+++ b/pym/portage/sync/controller.py
@@ -21,6 +21,7 @@ bad = create_color_func("BAD")
warn = create_color_func("WARN")
from portage.package.ebuild.doebuild import _check_temp_dir
from portage.metadata import action_metadata
+from portage.util._async.AsyncFunction import AsyncFunction
from portage import OrderedDict
from portage import _unicode_decode
from portage import util
@@ -113,12 +114,18 @@ class SyncManager(object):
return desc
return []

+ def async(self, emerge_config=None, repo=None):
+ proc = AsyncFunction(target=self.sync,
+ kwargs=dict(emerge_config=emerge_config, repo=repo))
+ proc.addExitListener(self._sync_callback)
+ return proc

- def sync(self, emerge_config=None, repo=None, callback=None):
+ def sync(self, emerge_config=None, repo=None):
self.emerge_config = emerge_config
- self.callback = callback or self._sync_callback
+ self.callback = None
self.repo = repo
self.exitcode = 1
+ self.updatecache_flg = False
if repo.sync_type in self.module_names:
tasks = [self.module_controller.get_class(repo.sync_type)]
else:
@@ -149,13 +156,14 @@ class SyncManager(object):

self.perform_post_sync_hook(repo.name, repo.sync_uri, repo.location)

- return self.exitcode, None
+ return self.exitcode, None, self.updatecache_flg


def do_callback(self, result):
#print("result:", result, "callback()", self.callback)
exitcode, updatecache_flg = result
self.exitcode = exitcode
+ self.updatecache_flg = updatecache_flg
if exitcode == 0:
msg = "=== Sync completed for %s" % self.repo.name
self.logger(self.xterm_titles, msg)
@@ -310,17 +318,28 @@ class SyncManager(object):
os.umask(0o022)
return os.EX_OK

+ def _sync_callback(self, proc):
+ """
+ This is called in the parent process, serially, for each of the
+ sync jobs when they complete. Some cache backends such as sqlite
+ may require that cache access be performed serially in the
+ parent process like this.
+ """
+ repo = proc.kwargs['repo']
+ exitcode = proc.returncode
+ updatecache_flg = False
+ if proc.returncode == os.EX_OK:
+ exitcode, message, updatecache_flg = proc.result

- def _sync_callback(self, exitcode, updatecache_flg):
if updatecache_flg and "metadata-transfer" not in self.settings.features:
updatecache_flg = False

if updatecache_flg and \
os.path.exists(os.path.join(
- self.repo.location, 'metadata', 'md5-cache')):
+ repo.location, 'metadata', 'md5-cache')):

# Only update cache for repo.location since that's
# the only one that's been synced here.
action_metadata(self.settings, self.portdb, self.emerge_config.opts,
- porttrees=[self.repo.location])
+ porttrees=[repo.location])

diff --git a/pym/portage/tests/sync/test_sync_local.py b/pym/portage/tests/sync/test_sync_local.py
index f50caba..7753a26 100644
--- a/pym/portage/tests/sync/test_sync_local.py
+++ b/pym/portage/tests/sync/test_sync_local.py
@@ -55,8 +55,12 @@ class SyncLocalTestCase(TestCase):
"dev-libs/A-0": {}
}

+ user_config = {
+ 'make.conf': ('FEATURES="metadata-transfer"',)
+ }
+
playground = ResolverPlayground(ebuilds=ebuilds,
- profile=profile, user_config={}, debug=debug)
+ profile=profile, user_config=user_config, debug=debug)
settings = playground.settings
eprefix = settings["EPREFIX"]
eroot = settings["EROOT"]
diff --git a/pym/portage/util/_async/AsyncFunction.py b/pym/portage/util/_async/AsyncFunction.py
new file mode 100644
index 0000000..fda1fe0
--- /dev/null
+++ b/pym/portage/util/_async/AsyncFunction.py
@@ -0,0 +1,62 @@
+# Copyright 2015 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import pickle
+import traceback
+
+from portage import os
+from portage.util._async.ForkProcess import ForkProcess
+from _emerge.PipeReader import PipeReader
+
+class AsyncFunction(ForkProcess):
+ """
+ Execute a function call in a fork, and retrieve the function
+ return value via pickling/unpickling, accessible as the
+ "result" attribute after the forked process has exited.
+ """
+
+ __slots__ = ('args', 'kwargs', 'result', 'target',
+ '_async_func_reader', '_async_func_reader_pw')
+
+ def _start(self):
+ pr, pw = os.pipe()
+ self.fd_pipes = {}
+ self.fd_pipes[pw] = pw
+ self._async_func_reader_pw = pw
+ self._async_func_reader = PipeReader(
+ input_files={"input":pr},
+ scheduler=self.scheduler)
+ self._async_func_reader.addExitListener(self._async_func_reader_exit)
+ self._async_func_reader.start()
+ ForkProcess._start(self)
+ os.close(pw)
+
+ def _run(self):
+ try:
+ result = self.target(*(self.args or []), **(self.kwargs or {}))
+ os.write(self._async_func_reader_pw, pickle.dumps(result))
+ except Exception:
+ traceback.print_exc()
+ return 1
+
+ return os.EX_OK
+
+ def _pipe_logger_exit(self, pipe_logger):
+ # Ignore this event, since we want to ensure that we exit
+ # only after _async_func_reader_exit has reached EOF.
+ self._pipe_logger = None
+
+ def _async_func_reader_exit(self, pipe_reader):
+ self.result = pickle.loads(pipe_reader.getvalue())
+ self._async_func_reader = None
+ self._unregister()
+ self.wait()
+
+ def _unregister(self):
+ ForkProcess._unregister(self)
+
+ pipe_reader = self._async_func_reader
+ if pipe_reader is not None:
+ self._async_func_reader = None
+ pipe_reader.removeExitListener(self._async_func_reader_exit)
+ pipe_reader.cancel()
--
2.4.6
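The new AsyncFunction class is driven like any other portage ForkProcess
task. A rough usage sketch, assuming the usual start()/wait() lifecycle
and a made-up target function (not taken from the patch):

from portage.util._async.AsyncFunction import AsyncFunction
from portage.util._eventloop.global_event_loop import global_event_loop

def _target(repo=None):
    # Runs in the forked child; the return value is pickled and sent
    # back to the parent, where it shows up as proc.result.
    return (0, "synced %s" % repo, True)

proc = AsyncFunction(target=_target,
    kwargs=dict(repo='gentoo'),
    scheduler=global_event_loop())
proc.start()
proc.wait()
if proc.returncode == 0:
    print(proc.result)  # (0, 'synced gentoo', True)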
