1 |
On Thu, 13 Aug 2015 01:34:36 -0700 |
2 |
Zac Medico <zmedico@g.o> wrote: |
3 |
|
4 |
> Repos are synced in parallel (includes their post-sync hooks). Output |
5 |
> of concurrent processes is currently mixed (irrelevant with --quiet). |
6 |
> Support for FEATURES=metadata-transfer is handled in the main process, |
7 |
> which may be required for some backends (such as sqlite). Repos are |
8 |
> synced only after their master(s) have synced (in case that matters |
9 |
> for hooks). |
10 |
> |
11 |
> X-Gentoo-Bug: 557426 |
12 |
> X-Gentoo-Bug-url: https://bugs.gentoo.org/show_bug.cgi?id=557426 |
13 |
> --- |
14 |
> pym/portage/emaint/modules/sync/sync.py | 125 |
15 |
> ++++++++++++++++++++++++++++-- |
16 |
> pym/portage/sync/controller.py | 31 ++++++-- |
17 |
> pym/portage/tests/sync/test_sync_local.py | 6 +- |
18 |
> pym/portage/util/_async/AsyncFunction.py | 62 +++++++++++++++ 4 |
19 |
> files changed, 210 insertions(+), 14 deletions(-) create mode 100644 |
20 |
> pym/portage/util/_async/AsyncFunction.py |
21 |
> |
22 |
> diff --git a/pym/portage/emaint/modules/sync/sync.py |
23 |
> b/pym/portage/emaint/modules/sync/sync.py index b463073..8a28f17 |
24 |
> 100644 --- a/pym/portage/emaint/modules/sync/sync.py |
25 |
> +++ b/pym/portage/emaint/modules/sync/sync.py |
26 |
> @@ -13,6 +13,10 @@ from portage.output import bold, red, |
27 |
> create_color_func from portage._global_updates import _global_updates |
28 |
> from portage.sync.controller import SyncManager |
29 |
> from portage.util import writemsg_level |
30 |
> +from portage.util.digraph import digraph |
31 |
> +from portage.util._async.AsyncScheduler import AsyncScheduler |
32 |
> +from portage.util._eventloop.global_event_loop import |
33 |
> global_event_loop +from portage.util._eventloop.EventLoop import |
34 |
> EventLoop |
35 |
> import _emerge |
36 |
> from _emerge.emergelog import emergelog |
37 |
> @@ -201,6 +205,7 @@ class SyncRepos(object): |
38 |
> k = "--" + k.replace("_", |
39 |
> "-") self.emerge_config.opts[k] = v |
40 |
> |
41 |
> + selected_repos = [repo for repo in selected_repos if |
42 |
> repo.sync_type is not None] msgs = [] |
43 |
> if not selected_repos: |
44 |
> msgs.append("Emaint sync, nothing to sync... |
45 |
> returning") @@ -213,13 +218,17 @@ class SyncRepos(object): |
46 |
> |
47 |
> sync_manager = SyncManager( |
48 |
> self.emerge_config.target_config.settings, |
49 |
> emergelog) |
50 |
> - retvals = [] |
51 |
> - for repo in selected_repos: |
52 |
> - if repo.sync_type is not None: |
53 |
> - returncode, message = |
54 |
> sync_manager.sync(self.emerge_config, repo) |
55 |
> - retvals.append((repo.name, |
56 |
> returncode)) |
57 |
> - if message: |
58 |
> - msgs.append(message) |
59 |
> + |
60 |
> + sync_scheduler = |
61 |
> SyncScheduler(emerge_config=self.emerge_config, |
62 |
> + selected_repos=selected_repos, |
63 |
> sync_manager=sync_manager, |
64 |
> + |
65 |
> max_jobs=self.emerge_config.opts.get('--jobs', 1), |
66 |
> + event_loop=global_event_loop() if |
67 |
> portage._internal_caller else |
68 |
> + EventLoop(main=False)) |
69 |
> + |
70 |
> + sync_scheduler.start() |
71 |
> + sync_scheduler.wait() |
72 |
> + retvals = sync_scheduler.retvals |
73 |
> + msgs.extend(sync_scheduler.msgs) |
74 |
> |
75 |
> # Reload the whole config. |
76 |
> portage._sync_mode = False |
77 |
> @@ -287,3 +296,105 @@ class SyncRepos(object): |
78 |
> messages.append("Action: %s for repo: %s, |
79 |
> returned code = %s" % (action, rval[0], rval[1])) |
80 |
> return messages |
81 |
> + |
82 |
> + |
83 |
> +class SyncScheduler(AsyncScheduler): |
84 |
> + ''' |
85 |
> + Sync repos in parallel, but don't sync a given repo until all |
86 |
> + of it's masters have synced. |
87 |
> + ''' |
88 |
> + def __init__(self, **kwargs): |
89 |
> + ''' |
90 |
> + @param emerge_config: an emerge_config instance |
91 |
> + @param selected_repos: list of RepoConfig instances |
92 |
> + @param sync_manager: a SyncManger instance |
93 |
> + ''' |
94 |
> + self._emerge_config = kwargs.pop('emerge_config') |
95 |
> + self._selected_repos = kwargs.pop('selected_repos') |
96 |
> + self._sync_manager = kwargs.pop('sync_manager') |
97 |
> + AsyncScheduler.__init__(self, **kwargs) |
98 |
> + self._init_graph() |
99 |
> + self._leaf_nodes = self._sync_graph.leaf_nodes() |
100 |
> + self.retvals = [] |
101 |
> + self.msgs = [] |
102 |
> + |
103 |
> + def _init_graph(self): |
104 |
> + ''' |
105 |
> + Graph relationships between repos and their masters. |
106 |
> + ''' |
107 |
> + self._sync_graph = digraph() |
108 |
> + self._repo_map = {} |
109 |
> + self._running_repos = set() |
110 |
> + for repo in self._selected_repos: |
111 |
> + self._repo_map[repo.name] = repo |
112 |
> + self._sync_graph.add(repo.name, None) |
113 |
> + for master in repo.masters: |
114 |
> + self._repo_map[master.name] = master |
115 |
> + self._sync_graph.add(master.name, |
116 |
> repo.name) + |
117 |
> + def _task_exit(self, task): |
118 |
> + ''' |
119 |
> + Remove the task from the graph, in order to expose |
120 |
> + more leaf nodes. |
121 |
> + ''' |
122 |
> + self._running_tasks.discard(task) |
123 |
> + returncode = task.returncode |
124 |
> + if task.returncode == os.EX_OK: |
125 |
> + returncode, message, updatecache_flg = |
126 |
> task.result |
127 |
> + if message: |
128 |
> + self.msgs.append(message) |
129 |
> + repo = task.kwargs['repo'].name |
130 |
> + self._running_repos.remove(repo) |
131 |
> + self.retvals.append((repo, returncode)) |
132 |
> + self._sync_graph.remove(repo) |
133 |
> + self._update_leaf_nodes() |
134 |
> + super(SyncScheduler, self)._task_exit(self) |
135 |
> + |
136 |
|
137 |
|
138 |
returncode in this function seems to be assigned too many times for no |
139 |
real reason. Is there no other way to get message than to reassign |
140 |
returncode? And if task.returncode is not os.EX_OK, is task.result |
141 |
still accessible? Maybe there is a message still? It sounds like |
142 |
message should be made available like returncode and forget about |
143 |
splitting task.result... maybe it will be clearer when I finish |
144 |
reviewing the code. It is early in the morning ... still waking up |
145 |
|
146 |
|
147 |
|
148 |
|
149 |
|
150 |
|
151 |
|
152 |
> + def _update_leaf_nodes(self): |
153 |
> + ''' |
154 |
> + Populate self._leaf_nodes with current leaves from |
155 |
> + self._sync_graph. If a circular master relationship |
156 |
> + is discovered, choose a random node to break the |
157 |
> cycle. |
158 |
> + ''' |
159 |
> + if self._sync_graph and not self._leaf_nodes: |
160 |
> + self._leaf_nodes = [obj for obj in |
161 |
> + self._sync_graph.leaf_nodes() |
162 |
> + if obj not in self._running_repos] |
163 |
> + |
164 |
> + if not (self._leaf_nodes or |
165 |
> self._running_repos): |
166 |
> + # If there is a circular master |
167 |
> relationship, |
168 |
> + # choose a random node to break the |
169 |
> cycle. |
170 |
> + self._leaf_nodes = |
171 |
> [next(iter(self._sync_graph))] + |
172 |
> + def _next_task(self): |
173 |
> + ''' |
174 |
> + Return a task for the next available leaf node. |
175 |
> + ''' |
176 |
> + if not self._sync_graph: |
177 |
> + raise StopIteration() |
178 |
> + # If self._sync_graph is non-empty, then |
179 |
> self._leaf_nodes |
180 |
> + # is guaranteed to be non-empty, since otherwise |
181 |
> + # _can_add_job would have returned False and |
182 |
> prevented |
183 |
> + # _next_task from being immediately called. |
184 |
> + node = self._leaf_nodes.pop() |
185 |
> + self._running_repos.add(node) |
186 |
> + self._update_leaf_nodes() |
187 |
> + |
188 |
> + task = self._sync_manager.async( |
189 |
> + self._emerge_config, self._repo_map[node]) |
190 |
> + return task |
191 |
> + |
192 |
> + def _can_add_job(self): |
193 |
> + ''' |
194 |
> + Returns False if there are no leaf nodes available. |
195 |
> + ''' |
196 |
> + if not AsyncScheduler._can_add_job(self): |
197 |
> + return False |
198 |
> + return bool(self._leaf_nodes) and not |
199 |
> self._terminated.is_set() + |
200 |
> + def _keep_scheduling(self): |
201 |
> + ''' |
202 |
> + Schedule as long as the graph is non-empty, and we |
203 |
> haven't |
204 |
> + been terminated. |
205 |
> + ''' |
206 |
> + return bool(self._sync_graph) and not |
207 |
> self._terminated.is_set() |
208 |
|
209 |
|
210 |
|
211 |
These 2 functions look like barely different versions of the same |
212 |
thing. Doesn't bool(self._leaf_nodes) and bool(self._sync_graph) |
213 |
really evaluate the same? If there are no leaf nodes, then the sync graph |
214 |
should be false? |
215 |
|
216 |
|
217 |
|
218 |
> diff --git a/pym/portage/sync/controller.py |
219 |
> b/pym/portage/sync/controller.py index 307487f..e992cc4 100644 |
220 |
> --- a/pym/portage/sync/controller.py |
221 |
> +++ b/pym/portage/sync/controller.py |
222 |
> @@ -21,6 +21,7 @@ bad = create_color_func("BAD") |
223 |
> warn = create_color_func("WARN") |
224 |
> from portage.package.ebuild.doebuild import _check_temp_dir |
225 |
> from portage.metadata import action_metadata |
226 |
> +from portage.util._async.AsyncFunction import AsyncFunction |
227 |
> from portage import OrderedDict |
228 |
> from portage import _unicode_decode |
229 |
> from portage import util |
230 |
> @@ -113,12 +114,18 @@ class SyncManager(object): |
231 |
> return desc |
232 |
> return [] |
233 |
> |
234 |
> + def async(self, emerge_config=None, repo=None): |
235 |
> + proc = AsyncFunction(target=self.sync, |
236 |
> + kwargs=dict(emerge_config=emerge_config, |
237 |
> repo=repo)) |
238 |
> + proc.addExitListener(self._sync_callback) |
239 |
> + return proc |
240 |
> |
241 |
> - def sync(self, emerge_config=None, repo=None, callback=None): |
242 |
> + def sync(self, emerge_config=None, repo=None): |
243 |
> self.emerge_config = emerge_config |
244 |
> - self.callback = callback or self._sync_callback |
245 |
> + self.callback = None |
246 |
> self.repo = repo |
247 |
> self.exitcode = 1 |
248 |
> + self.updatecache_flg = False |
249 |
> if repo.sync_type in self.module_names: |
250 |
> tasks = |
251 |
> [self.module_controller.get_class(repo.sync_type)] else: |
252 |
> @@ -149,13 +156,14 @@ class SyncManager(object): |
253 |
> |
254 |
> self.perform_post_sync_hook(repo.name, |
255 |
> repo.sync_uri, repo.location) |
256 |
> - return self.exitcode, None |
257 |
> + return self.exitcode, None, self.updatecache_flg |
258 |
> |
259 |
> |
260 |
> def do_callback(self, result): |
261 |
> #print("result:", result, "callback()", |
262 |
> self.callback) exitcode, updatecache_flg = result |
263 |
> self.exitcode = exitcode |
264 |
> + self.updatecache_flg = updatecache_flg |
265 |
> if exitcode == 0: |
266 |
> msg = "=== Sync completed for %s" % |
267 |
> self.repo.name self.logger(self.xterm_titles, msg) |
268 |
> @@ -310,17 +318,28 @@ class SyncManager(object): |
269 |
> os.umask(0o022) |
270 |
> return os.EX_OK |
271 |
> |
272 |
> + def _sync_callback(self, proc): |
273 |
> + """ |
274 |
> + This is called in the parent process, serially, for |
275 |
> each of the |
276 |
> + sync jobs when they complete. Some cache backends |
277 |
> such as sqlite |
278 |
> + may require that cache access be performed serially |
279 |
> in the |
280 |
> + parent process like this. |
281 |
> + """ |
282 |
> + repo = proc.kwargs['repo'] |
283 |
> + exitcode = proc.returncode |
284 |
> + updatecache_flg = False |
285 |
> + if proc.returncode == os.EX_OK: |
286 |
> + exitcode, message, updatecache_flg = |
287 |
> proc.result |
288 |
> - def _sync_callback(self, exitcode, updatecache_flg): |
289 |
> if updatecache_flg and "metadata-transfer" not in |
290 |
> self.settings.features: updatecache_flg = False |
291 |
> |
292 |
> if updatecache_flg and \ |
293 |
> os.path.exists(os.path.join( |
294 |
> - self.repo.location, 'metadata', |
295 |
> 'md5-cache')): |
296 |
> + repo.location, 'metadata', 'md5-cache')): |
297 |
> |
298 |
> |
299 |
|
300 |
|
301 |
Again, this looks unnecessarily messy dealing with returncode/exitcode |
302 |
and updatecache_flg assigning and reassigning... |
303 |
|
304 |
|
305 |
|
306 |
> # Only update cache for repo.location since |
307 |
> that's # the only one that's been synced here. |
308 |
> action_metadata(self.settings, self.portdb, |
309 |
> self.emerge_config.opts, |
310 |
> - porttrees=[self.repo.location]) |
311 |
> + porttrees=[repo.location]) |
312 |
> |
313 |
> diff --git a/pym/portage/tests/sync/test_sync_local.py |
314 |
> b/pym/portage/tests/sync/test_sync_local.py index f50caba..7753a26 |
315 |
> 100644 --- a/pym/portage/tests/sync/test_sync_local.py |
316 |
> +++ b/pym/portage/tests/sync/test_sync_local.py |
317 |
> @@ -55,8 +55,12 @@ class SyncLocalTestCase(TestCase): |
318 |
> "dev-libs/A-0": {} |
319 |
> } |
320 |
> |
321 |
> + user_config = { |
322 |
> + 'make.conf': |
323 |
> ('FEATURES="metadata-transfer"',) |
324 |
> + } |
325 |
> + |
326 |
> playground = ResolverPlayground(ebuilds=ebuilds, |
327 |
> - profile=profile, user_config={}, debug=debug) |
328 |
> + profile=profile, user_config=user_config, |
329 |
> debug=debug) settings = playground.settings |
330 |
> eprefix = settings["EPREFIX"] |
331 |
> eroot = settings["EROOT"] |
332 |
> diff --git a/pym/portage/util/_async/AsyncFunction.py |
333 |
> b/pym/portage/util/_async/AsyncFunction.py new file mode 100644 |
334 |
> index 0000000..fda1fe0 |
335 |
> --- /dev/null |
336 |
> +++ b/pym/portage/util/_async/AsyncFunction.py |
337 |
> @@ -0,0 +1,62 @@ |
338 |
> +# Copyright 2015 Gentoo Foundation |
339 |
> +# Distributed under the terms of the GNU General Public License v2 |
340 |
> + |
341 |
> +import pickle |
342 |
> +import traceback |
343 |
> + |
344 |
> +from portage import os |
345 |
> +from portage.util._async.ForkProcess import ForkProcess |
346 |
> +from _emerge.PipeReader import PipeReader |
347 |
> + |
348 |
> +class AsyncFunction(ForkProcess): |
349 |
> + """ |
350 |
> + Execute a function call in a fork, and retrieve the function |
351 |
> + return value via pickling/unpickling, accessible as the |
352 |
> + "result" attribute after the forked process has exited. |
353 |
> + """ |
354 |
> + |
355 |
> + __slots__ = ('args', 'kwargs', 'result', 'target', |
356 |
> + '_async_func_reader', '_async_func_reader_pw') |
357 |
> + |
358 |
> + def _start(self): |
359 |
> + pr, pw = os.pipe() |
360 |
> + self.fd_pipes = {} |
361 |
> + self.fd_pipes[pw] = pw |
362 |
> + self._async_func_reader_pw = pw |
363 |
> + self._async_func_reader = PipeReader( |
364 |
> + input_files={"input":pr}, |
365 |
> + scheduler=self.scheduler) |
366 |
> + |
367 |
> self._async_func_reader.addExitListener(self._async_func_reader_exit) |
368 |
> + self._async_func_reader.start() |
369 |
> + ForkProcess._start(self) |
370 |
> + os.close(pw) |
371 |
> + |
372 |
> + def _run(self): |
373 |
> + try: |
374 |
> + result = self.target(*(self.args or []), |
375 |
> **(self.kwargs or {})) |
376 |
> + os.write(self._async_func_reader_pw, |
377 |
> pickle.dumps(result)) |
378 |
> + except Exception: |
379 |
> + traceback.print_exc() |
380 |
> + return 1 |
381 |
> + |
382 |
> + return os.EX_OK |
383 |
> + |
384 |
> + def _pipe_logger_exit(self, pipe_logger): |
385 |
> + # Ignore this event, since we want to ensure that we |
386 |
> exit |
387 |
> + # only after _async_func_reader_exit has reached EOF. |
388 |
> + self._pipe_logger = None |
389 |
> + |
390 |
> + def _async_func_reader_exit(self, pipe_reader): |
391 |
> + self.result = pickle.loads(pipe_reader.getvalue()) |
392 |
> + self._async_func_reader = None |
393 |
> + self._unregister() |
394 |
> + self.wait() |
395 |
> + |
396 |
> + def _unregister(self): |
397 |
> + ForkProcess._unregister(self) |
398 |
> + |
399 |
> + pipe_reader = self._async_func_reader |
400 |
> + if pipe_reader is not None: |
401 |
> + self._async_func_reader = None |
402 |
> + |
403 |
> pipe_reader.removeExitListener(self._async_func_reader_exit) |
404 |
> + pipe_reader.cancel() |
405 |
|
406 |
|
407 |
|
408 |
-- |
409 |
Brian Dolbec <dolsen> |