Gentoo Archives: gentoo-portage-dev

From: Brian Dolbec <dolsen@g.o>
To: gentoo-portage-dev@l.g.o
Subject: Re: [gentoo-portage-dev] [PATCH] sync repositories in parallel (bug 557426)
Date: Thu, 13 Aug 2015 14:29:40
Message-Id: 20150813072932.205e99c0.dolsen@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH] sync repositories in parallel (bug 557426) by Zac Medico
1 On Thu, 13 Aug 2015 01:34:36 -0700
2 Zac Medico <zmedico@g.o> wrote:
3
4 > Repos are synced in parallel (includes their post-sync hooks). Output
5 > of concurrent processes is currently mixed (irrelevant with --quiet).
6 > Support for FEATURES=metadata-transfer is handled in the main process,
7 > which may be required for some backends (such as sqlite). Repos are
8 > synced only after their master(s) have synced (in case that matters
9 > for hooks).
10 >
11 > X-Gentoo-Bug: 557426
12 > X-Gentoo-Bug-url: https://bugs.gentoo.org/show_bug.cgi?id=557426
13 > ---
14 > pym/portage/emaint/modules/sync/sync.py | 125
15 > ++++++++++++++++++++++++++++--
16 > pym/portage/sync/controller.py | 31 ++++++--
17 > pym/portage/tests/sync/test_sync_local.py | 6 +-
18 > pym/portage/util/_async/AsyncFunction.py | 62 +++++++++++++++ 4
19 > files changed, 210 insertions(+), 14 deletions(-) create mode 100644
20 > pym/portage/util/_async/AsyncFunction.py
21 >
22 > diff --git a/pym/portage/emaint/modules/sync/sync.py
23 > b/pym/portage/emaint/modules/sync/sync.py index b463073..8a28f17
24 > 100644 --- a/pym/portage/emaint/modules/sync/sync.py
25 > +++ b/pym/portage/emaint/modules/sync/sync.py
26 > @@ -13,6 +13,10 @@ from portage.output import bold, red,
27 > create_color_func from portage._global_updates import _global_updates
28 > from portage.sync.controller import SyncManager
29 > from portage.util import writemsg_level
30 > +from portage.util.digraph import digraph
31 > +from portage.util._async.AsyncScheduler import AsyncScheduler
32 > +from portage.util._eventloop.global_event_loop import
33 > global_event_loop +from portage.util._eventloop.EventLoop import
34 > EventLoop
35 > import _emerge
36 > from _emerge.emergelog import emergelog
37 > @@ -201,6 +205,7 @@ class SyncRepos(object):
38 > k = "--" + k.replace("_",
39 > "-") self.emerge_config.opts[k] = v
40 >
41 > + selected_repos = [repo for repo in selected_repos if
42 > repo.sync_type is not None] msgs = []
43 > if not selected_repos:
44 > msgs.append("Emaint sync, nothing to sync...
45 > returning") @@ -213,13 +218,17 @@ class SyncRepos(object):
46 >
47 > sync_manager = SyncManager(
48 > self.emerge_config.target_config.settings,
49 > emergelog)
50 > - retvals = []
51 > - for repo in selected_repos:
52 > - if repo.sync_type is not None:
53 > - returncode, message =
54 > sync_manager.sync(self.emerge_config, repo)
55 > - retvals.append((repo.name,
56 > returncode))
57 > - if message:
58 > - msgs.append(message)
59 > +
60 > + sync_scheduler =
61 > SyncScheduler(emerge_config=self.emerge_config,
62 > + selected_repos=selected_repos,
63 > sync_manager=sync_manager,
64 > +
65 > max_jobs=self.emerge_config.opts.get('--jobs', 1),
66 > + event_loop=global_event_loop() if
67 > portage._internal_caller else
68 > + EventLoop(main=False))
69 > +
70 > + sync_scheduler.start()
71 > + sync_scheduler.wait()
72 > + retvals = sync_scheduler.retvals
73 > + msgs.extend(sync_scheduler.msgs)
74 >
75 > # Reload the whole config.
76 > portage._sync_mode = False
77 > @@ -287,3 +296,105 @@ class SyncRepos(object):
78 > messages.append("Action: %s for repo: %s,
79 > returned code = %s" % (action, rval[0], rval[1]))
80 > return messages
81 > +
82 > +
83 > +class SyncScheduler(AsyncScheduler):
84 > + '''
85 > + Sync repos in parallel, but don't sync a given repo until all
86 > + of it's masters have synced.
87 > + '''
88 > + def __init__(self, **kwargs):
89 > + '''
90 > + @param emerge_config: an emerge_config instance
91 > + @param selected_repos: list of RepoConfig instances
92 > + @param sync_manager: a SyncManger instance
93 > + '''
94 > + self._emerge_config = kwargs.pop('emerge_config')
95 > + self._selected_repos = kwargs.pop('selected_repos')
96 > + self._sync_manager = kwargs.pop('sync_manager')
97 > + AsyncScheduler.__init__(self, **kwargs)
98 > + self._init_graph()
99 > + self._leaf_nodes = self._sync_graph.leaf_nodes()
100 > + self.retvals = []
101 > + self.msgs = []
102 > +
103 > + def _init_graph(self):
104 > + '''
105 > + Graph relationships between repos and their masters.
106 > + '''
107 > + self._sync_graph = digraph()
108 > + self._repo_map = {}
109 > + self._running_repos = set()
110 > + for repo in self._selected_repos:
111 > + self._repo_map[repo.name] = repo
112 > + self._sync_graph.add(repo.name, None)
113 > + for master in repo.masters:
114 > + self._repo_map[master.name] = master
115 > + self._sync_graph.add(master.name,
116 > repo.name) +
117 > + def _task_exit(self, task):
118 > + '''
119 > + Remove the task from the graph, in order to expose
120 > + more leaf nodes.
121 > + '''
122 > + self._running_tasks.discard(task)
123 > + returncode = task.returncode
124 > + if task.returncode == os.EX_OK:
125 > + returncode, message, updatecache_flg =
126 > task.result
127 > + if message:
128 > + self.msgs.append(message)
129 > + repo = task.kwargs['repo'].name
130 > + self._running_repos.remove(repo)
131 > + self.retvals.append((repo, returncode))
132 > + self._sync_graph.remove(repo)
133 > + self._update_leaf_nodes()
134 > + super(SyncScheduler, self)._task_exit(self)
135 > +
136
137
138 returncode in this function seems to be assigned too many times for no
139 real reason. Is there no other way to get the message than to reassign
140 returncode? And if task.returncode is not os.EX_OK, is task.result not
141 still accessible? Maybe there is still a message? It sounds like the
142 message should be made available the same way as returncode, so we can
143 stop splitting task.result... Maybe it will be clearer when I finish
144 reviewing the code. It is early in the morning... still waking up.
145
146
147
148
149
150
151
152 > + def _update_leaf_nodes(self):
153 > + '''
154 > + Populate self._leaf_nodes with current leaves from
155 > + self._sync_graph. If a circular master relationship
156 > + is discovered, choose a random node to break the
157 > cycle.
158 > + '''
159 > + if self._sync_graph and not self._leaf_nodes:
160 > + self._leaf_nodes = [obj for obj in
161 > + self._sync_graph.leaf_nodes()
162 > + if obj not in self._running_repos]
163 > +
164 > + if not (self._leaf_nodes or
165 > self._running_repos):
166 > + # If there is a circular master
167 > relationship,
168 > + # choose a random node to break the
169 > cycle.
170 > + self._leaf_nodes =
171 > [next(iter(self._sync_graph))] +
172 > + def _next_task(self):
173 > + '''
174 > + Return a task for the next available leaf node.
175 > + '''
176 > + if not self._sync_graph:
177 > + raise StopIteration()
178 > + # If self._sync_graph is non-empty, then
179 > self._leaf_nodes
180 > + # is guaranteed to be non-empty, since otherwise
181 > + # _can_add_job would have returned False and
182 > prevented
183 > + # _next_task from being immediately called.
184 > + node = self._leaf_nodes.pop()
185 > + self._running_repos.add(node)
186 > + self._update_leaf_nodes()
187 > +
188 > + task = self._sync_manager.async(
189 > + self._emerge_config, self._repo_map[node])
190 > + return task
191 > +
192 > + def _can_add_job(self):
193 > + '''
194 > + Returns False if there are no leaf nodes available.
195 > + '''
196 > + if not AsyncScheduler._can_add_job(self):
197 > + return False
198 > + return bool(self._leaf_nodes) and not
199 > self._terminated.is_set() +
200 > + def _keep_scheduling(self):
201 > + '''
202 > + Schedule as long as the graph is non-empty, and we
203 > haven't
204 > + been terminated.
205 > + '''
206 > + return bool(self._sync_graph) and not
207 > self._terminated.is_set()
208
209
210
211 These two functions look like barely different versions of the same
212 thing. Don't bool(self._leaf_nodes) and bool(self._sync_graph) really
213 evaluate the same? If there are no leaf nodes, shouldn't the sync graph
214 be empty (i.e. evaluate to False)?
215
216
217
218 > diff --git a/pym/portage/sync/controller.py
219 > b/pym/portage/sync/controller.py index 307487f..e992cc4 100644
220 > --- a/pym/portage/sync/controller.py
221 > +++ b/pym/portage/sync/controller.py
222 > @@ -21,6 +21,7 @@ bad = create_color_func("BAD")
223 > warn = create_color_func("WARN")
224 > from portage.package.ebuild.doebuild import _check_temp_dir
225 > from portage.metadata import action_metadata
226 > +from portage.util._async.AsyncFunction import AsyncFunction
227 > from portage import OrderedDict
228 > from portage import _unicode_decode
229 > from portage import util
230 > @@ -113,12 +114,18 @@ class SyncManager(object):
231 > return desc
232 > return []
233 >
234 > + def async(self, emerge_config=None, repo=None):
235 > + proc = AsyncFunction(target=self.sync,
236 > + kwargs=dict(emerge_config=emerge_config,
237 > repo=repo))
238 > + proc.addExitListener(self._sync_callback)
239 > + return proc
240 >
241 > - def sync(self, emerge_config=None, repo=None, callback=None):
242 > + def sync(self, emerge_config=None, repo=None):
243 > self.emerge_config = emerge_config
244 > - self.callback = callback or self._sync_callback
245 > + self.callback = None
246 > self.repo = repo
247 > self.exitcode = 1
248 > + self.updatecache_flg = False
249 > if repo.sync_type in self.module_names:
250 > tasks =
251 > [self.module_controller.get_class(repo.sync_type)] else:
252 > @@ -149,13 +156,14 @@ class SyncManager(object):
253 >
254 > self.perform_post_sync_hook(repo.name,
255 > repo.sync_uri, repo.location)
256 > - return self.exitcode, None
257 > + return self.exitcode, None, self.updatecache_flg
258 >
259 >
260 > def do_callback(self, result):
261 > #print("result:", result, "callback()",
262 > self.callback) exitcode, updatecache_flg = result
263 > self.exitcode = exitcode
264 > + self.updatecache_flg = updatecache_flg
265 > if exitcode == 0:
266 > msg = "=== Sync completed for %s" %
267 > self.repo.name self.logger(self.xterm_titles, msg)
268 > @@ -310,17 +318,28 @@ class SyncManager(object):
269 > os.umask(0o022)
270 > return os.EX_OK
271 >
272 > + def _sync_callback(self, proc):
273 > + """
274 > + This is called in the parent process, serially, for
275 > each of the
276 > + sync jobs when they complete. Some cache backends
277 > such as sqlite
278 > + may require that cache access be performed serially
279 > in the
280 > + parent process like this.
281 > + """
282 > + repo = proc.kwargs['repo']
283 > + exitcode = proc.returncode
284 > + updatecache_flg = False
285 > + if proc.returncode == os.EX_OK:
286 > + exitcode, message, updatecache_flg =
287 > proc.result
288 > - def _sync_callback(self, exitcode, updatecache_flg):
289 > if updatecache_flg and "metadata-transfer" not in
290 > self.settings.features: updatecache_flg = False
291 >
292 > if updatecache_flg and \
293 > os.path.exists(os.path.join(
294 > - self.repo.location, 'metadata',
295 > 'md5-cache')):
296 > + repo.location, 'metadata', 'md5-cache')):
297 >
298 >
299
300
301 Again, this looks unnecessarily messy, with returncode/exitcode and
302 updatecache_flg being assigned and then reassigned...
303
304
305
306 > # Only update cache for repo.location since
307 > that's # the only one that's been synced here.
308 > action_metadata(self.settings, self.portdb,
309 > self.emerge_config.opts,
310 > - porttrees=[self.repo.location])
311 > + porttrees=[repo.location])
312 >
313 > diff --git a/pym/portage/tests/sync/test_sync_local.py
314 > b/pym/portage/tests/sync/test_sync_local.py index f50caba..7753a26
315 > 100644 --- a/pym/portage/tests/sync/test_sync_local.py
316 > +++ b/pym/portage/tests/sync/test_sync_local.py
317 > @@ -55,8 +55,12 @@ class SyncLocalTestCase(TestCase):
318 > "dev-libs/A-0": {}
319 > }
320 >
321 > + user_config = {
322 > + 'make.conf':
323 > ('FEATURES="metadata-transfer"',)
324 > + }
325 > +
326 > playground = ResolverPlayground(ebuilds=ebuilds,
327 > - profile=profile, user_config={}, debug=debug)
328 > + profile=profile, user_config=user_config,
329 > debug=debug) settings = playground.settings
330 > eprefix = settings["EPREFIX"]
331 > eroot = settings["EROOT"]
332 > diff --git a/pym/portage/util/_async/AsyncFunction.py
333 > b/pym/portage/util/_async/AsyncFunction.py new file mode 100644
334 > index 0000000..fda1fe0
335 > --- /dev/null
336 > +++ b/pym/portage/util/_async/AsyncFunction.py
337 > @@ -0,0 +1,62 @@
338 > +# Copyright 2015 Gentoo Foundation
339 > +# Distributed under the terms of the GNU General Public License v2
340 > +
341 > +import pickle
342 > +import traceback
343 > +
344 > +from portage import os
345 > +from portage.util._async.ForkProcess import ForkProcess
346 > +from _emerge.PipeReader import PipeReader
347 > +
348 > +class AsyncFunction(ForkProcess):
349 > + """
350 > + Execute a function call in a fork, and retrieve the function
351 > + return value via pickling/unpickling, accessible as the
352 > + "result" attribute after the forked process has exited.
353 > + """
354 > +
355 > + __slots__ = ('args', 'kwargs', 'result', 'target',
356 > + '_async_func_reader', '_async_func_reader_pw')
357 > +
358 > + def _start(self):
359 > + pr, pw = os.pipe()
360 > + self.fd_pipes = {}
361 > + self.fd_pipes[pw] = pw
362 > + self._async_func_reader_pw = pw
363 > + self._async_func_reader = PipeReader(
364 > + input_files={"input":pr},
365 > + scheduler=self.scheduler)
366 > +
367 > self._async_func_reader.addExitListener(self._async_func_reader_exit)
368 > + self._async_func_reader.start()
369 > + ForkProcess._start(self)
370 > + os.close(pw)
371 > +
372 > + def _run(self):
373 > + try:
374 > + result = self.target(*(self.args or []),
375 > **(self.kwargs or {}))
376 > + os.write(self._async_func_reader_pw,
377 > pickle.dumps(result))
378 > + except Exception:
379 > + traceback.print_exc()
380 > + return 1
381 > +
382 > + return os.EX_OK
383 > +
384 > + def _pipe_logger_exit(self, pipe_logger):
385 > + # Ignore this event, since we want to ensure that we
386 > exit
387 > + # only after _async_func_reader_exit has reached EOF.
388 > + self._pipe_logger = None
389 > +
390 > + def _async_func_reader_exit(self, pipe_reader):
391 > + self.result = pickle.loads(pipe_reader.getvalue())
392 > + self._async_func_reader = None
393 > + self._unregister()
394 > + self.wait()
395 > +
396 > + def _unregister(self):
397 > + ForkProcess._unregister(self)
398 > +
399 > + pipe_reader = self._async_func_reader
400 > + if pipe_reader is not None:
401 > + self._async_func_reader = None
402 > +
403 > pipe_reader.removeExitListener(self._async_func_reader_exit)
404 > + pipe_reader.cancel()
405
406
407
408 --
409 Brian Dolbec <dolsen>

Replies