1 |
Repos are synced in parallel (includes their post-sync hooks). Output |
2 |
of concurrent processes is currently mixed (irrelevant with --quiet). |
3 |
Support for FEATURES=metadata-transfer is handled in the main process, |
4 |
which may be required for some backends (such as sqlite). Repos are |
5 |
synced only after their master(s) have synced (in case that matters |
6 |
for hooks). |
7 |
|
8 |
X-Gentoo-Bug: 557426 |
9 |
X-Gentoo-Bug-url: https://bugs.gentoo.org/show_bug.cgi?id=557426 |
10 |
--- |
11 |
pym/portage/emaint/modules/sync/sync.py | 125 ++++++++++++++++++++++++++++-- |
12 |
pym/portage/sync/controller.py | 31 ++++++-- |
13 |
pym/portage/tests/sync/test_sync_local.py | 6 +- |
14 |
pym/portage/util/_async/AsyncFunction.py | 62 +++++++++++++++ |
15 |
4 files changed, 210 insertions(+), 14 deletions(-) |
16 |
create mode 100644 pym/portage/util/_async/AsyncFunction.py |
17 |
|
18 |
diff --git a/pym/portage/emaint/modules/sync/sync.py b/pym/portage/emaint/modules/sync/sync.py |
19 |
index b463073..8a28f17 100644 |
20 |
--- a/pym/portage/emaint/modules/sync/sync.py |
21 |
+++ b/pym/portage/emaint/modules/sync/sync.py |
22 |
@@ -13,6 +13,10 @@ from portage.output import bold, red, create_color_func |
23 |
from portage._global_updates import _global_updates |
24 |
from portage.sync.controller import SyncManager |
25 |
from portage.util import writemsg_level |
26 |
+from portage.util.digraph import digraph |
27 |
+from portage.util._async.AsyncScheduler import AsyncScheduler |
28 |
+from portage.util._eventloop.global_event_loop import global_event_loop |
29 |
+from portage.util._eventloop.EventLoop import EventLoop |
30 |
|
31 |
import _emerge |
32 |
from _emerge.emergelog import emergelog |
33 |
@@ -201,6 +205,7 @@ class SyncRepos(object): |
34 |
k = "--" + k.replace("_", "-") |
35 |
self.emerge_config.opts[k] = v |
36 |
|
37 |
+ selected_repos = [repo for repo in selected_repos if repo.sync_type is not None] |
38 |
msgs = [] |
39 |
if not selected_repos: |
40 |
msgs.append("Emaint sync, nothing to sync... returning") |
41 |
@@ -213,13 +218,17 @@ class SyncRepos(object): |
42 |
|
43 |
sync_manager = SyncManager( |
44 |
self.emerge_config.target_config.settings, emergelog) |
45 |
- retvals = [] |
46 |
- for repo in selected_repos: |
47 |
- if repo.sync_type is not None: |
48 |
- returncode, message = sync_manager.sync(self.emerge_config, repo) |
49 |
- retvals.append((repo.name, returncode)) |
50 |
- if message: |
51 |
- msgs.append(message) |
52 |
+ |
53 |
+ sync_scheduler = SyncScheduler(emerge_config=self.emerge_config, |
54 |
+ selected_repos=selected_repos, sync_manager=sync_manager, |
55 |
+ max_jobs=self.emerge_config.opts.get('--jobs', 1), |
56 |
+ event_loop=global_event_loop() if portage._internal_caller else |
57 |
+ EventLoop(main=False)) |
58 |
+ |
59 |
+ sync_scheduler.start() |
60 |
+ sync_scheduler.wait() |
61 |
+ retvals = sync_scheduler.retvals |
62 |
+ msgs.extend(sync_scheduler.msgs) |
63 |
|
64 |
# Reload the whole config. |
65 |
portage._sync_mode = False |
66 |
@@ -287,3 +296,105 @@ class SyncRepos(object): |
67 |
messages.append("Action: %s for repo: %s, returned code = %s" |
68 |
% (action, rval[0], rval[1])) |
69 |
return messages |
70 |
+ |
71 |
+ |
72 |
+class SyncScheduler(AsyncScheduler): |
73 |
+ ''' |
74 |
+ Sync repos in parallel, but don't sync a given repo until all |
75 |
+ of its masters have synced. |
76 |
+ ''' |
77 |
+ def __init__(self, **kwargs): |
78 |
+ ''' |
79 |
+ @param emerge_config: an emerge_config instance |
80 |
+ @param selected_repos: list of RepoConfig instances |
81 |
+ @param sync_manager: a SyncManager instance |
82 |
+ ''' |
83 |
+ self._emerge_config = kwargs.pop('emerge_config') |
84 |
+ self._selected_repos = kwargs.pop('selected_repos') |
85 |
+ self._sync_manager = kwargs.pop('sync_manager') |
86 |
+ AsyncScheduler.__init__(self, **kwargs) |
87 |
+ self._init_graph() |
88 |
+ self._leaf_nodes = self._sync_graph.leaf_nodes() |
89 |
+ self.retvals = [] |
90 |
+ self.msgs = [] |
91 |
+ |
92 |
+ def _init_graph(self): |
93 |
+ ''' |
94 |
+ Graph relationships between repos and their masters. |
95 |
+ ''' |
96 |
+ self._sync_graph = digraph() |
97 |
+ self._repo_map = {} |
98 |
+ self._running_repos = set() |
99 |
+ for repo in self._selected_repos: |
100 |
+ self._repo_map[repo.name] = repo |
101 |
+ self._sync_graph.add(repo.name, None) |
102 |
+ for master in repo.masters: |
103 |
+ self._repo_map[master.name] = master |
104 |
+ self._sync_graph.add(master.name, repo.name) |
105 |
+ |
106 |
+ def _task_exit(self, task): |
107 |
+ ''' |
108 |
+ Remove the task from the graph, in order to expose |
109 |
+ more leaf nodes. |
110 |
+ ''' |
111 |
+ self._running_tasks.discard(task) |
112 |
+ returncode = task.returncode |
113 |
+ if task.returncode == os.EX_OK: |
114 |
+ returncode, message, updatecache_flg = task.result |
115 |
+ if message: |
116 |
+ self.msgs.append(message) |
117 |
+ repo = task.kwargs['repo'].name |
118 |
+ self._running_repos.remove(repo) |
119 |
+ self.retvals.append((repo, returncode)) |
120 |
+ self._sync_graph.remove(repo) |
121 |
+ self._update_leaf_nodes() |
122 |
+ super(SyncScheduler, self)._task_exit(self) |
123 |
+ |
124 |
+ def _update_leaf_nodes(self): |
125 |
+ ''' |
126 |
+ Populate self._leaf_nodes with current leaves from |
127 |
+ self._sync_graph. If a circular master relationship |
128 |
+ is discovered, choose a random node to break the cycle. |
129 |
+ ''' |
130 |
+ if self._sync_graph and not self._leaf_nodes: |
131 |
+ self._leaf_nodes = [obj for obj in |
132 |
+ self._sync_graph.leaf_nodes() |
133 |
+ if obj not in self._running_repos] |
134 |
+ |
135 |
+ if not (self._leaf_nodes or self._running_repos): |
136 |
+ # If there is a circular master relationship, |
137 |
+ # choose a random node to break the cycle. |
138 |
+ self._leaf_nodes = [next(iter(self._sync_graph))] |
139 |
+ |
140 |
+ def _next_task(self): |
141 |
+ ''' |
142 |
+ Return a task for the next available leaf node. |
143 |
+ ''' |
144 |
+ if not self._sync_graph: |
145 |
+ raise StopIteration() |
146 |
+ # If self._sync_graph is non-empty, then self._leaf_nodes |
147 |
+ # is guaranteed to be non-empty, since otherwise |
148 |
+ # _can_add_job would have returned False and prevented |
149 |
+ # _next_task from being immediately called. |
150 |
+ node = self._leaf_nodes.pop() |
151 |
+ self._running_repos.add(node) |
152 |
+ self._update_leaf_nodes() |
153 |
+ |
154 |
+ task = self._sync_manager.async( |
155 |
+ self._emerge_config, self._repo_map[node]) |
156 |
+ return task |
157 |
+ |
158 |
+ def _can_add_job(self): |
159 |
+ ''' |
160 |
+ Returns False if there are no leaf nodes available. |
161 |
+ ''' |
162 |
+ if not AsyncScheduler._can_add_job(self): |
163 |
+ return False |
164 |
+ return bool(self._leaf_nodes) and not self._terminated.is_set() |
165 |
+ |
166 |
+ def _keep_scheduling(self): |
167 |
+ ''' |
168 |
+ Schedule as long as the graph is non-empty, and we haven't |
169 |
+ been terminated. |
170 |
+ ''' |
171 |
+ return bool(self._sync_graph) and not self._terminated.is_set() |
172 |
diff --git a/pym/portage/sync/controller.py b/pym/portage/sync/controller.py |
173 |
index 307487f..e992cc4 100644 |
174 |
--- a/pym/portage/sync/controller.py |
175 |
+++ b/pym/portage/sync/controller.py |
176 |
@@ -21,6 +21,7 @@ bad = create_color_func("BAD") |
177 |
warn = create_color_func("WARN") |
178 |
from portage.package.ebuild.doebuild import _check_temp_dir |
179 |
from portage.metadata import action_metadata |
180 |
+from portage.util._async.AsyncFunction import AsyncFunction |
181 |
from portage import OrderedDict |
182 |
from portage import _unicode_decode |
183 |
from portage import util |
184 |
@@ -113,12 +114,18 @@ class SyncManager(object): |
185 |
return desc |
186 |
return [] |
187 |
|
188 |
+ def async(self, emerge_config=None, repo=None): |
189 |
+ proc = AsyncFunction(target=self.sync, |
190 |
+ kwargs=dict(emerge_config=emerge_config, repo=repo)) |
191 |
+ proc.addExitListener(self._sync_callback) |
192 |
+ return proc |
193 |
|
194 |
- def sync(self, emerge_config=None, repo=None, callback=None): |
195 |
+ def sync(self, emerge_config=None, repo=None): |
196 |
self.emerge_config = emerge_config |
197 |
- self.callback = callback or self._sync_callback |
198 |
+ self.callback = None |
199 |
self.repo = repo |
200 |
self.exitcode = 1 |
201 |
+ self.updatecache_flg = False |
202 |
if repo.sync_type in self.module_names: |
203 |
tasks = [self.module_controller.get_class(repo.sync_type)] |
204 |
else: |
205 |
@@ -149,13 +156,14 @@ class SyncManager(object): |
206 |
|
207 |
self.perform_post_sync_hook(repo.name, repo.sync_uri, repo.location) |
208 |
|
209 |
- return self.exitcode, None |
210 |
+ return self.exitcode, None, self.updatecache_flg |
211 |
|
212 |
|
213 |
def do_callback(self, result): |
214 |
#print("result:", result, "callback()", self.callback) |
215 |
exitcode, updatecache_flg = result |
216 |
self.exitcode = exitcode |
217 |
+ self.updatecache_flg = updatecache_flg |
218 |
if exitcode == 0: |
219 |
msg = "=== Sync completed for %s" % self.repo.name |
220 |
self.logger(self.xterm_titles, msg) |
221 |
@@ -310,17 +318,28 @@ class SyncManager(object): |
222 |
os.umask(0o022) |
223 |
return os.EX_OK |
224 |
|
225 |
+ def _sync_callback(self, proc): |
226 |
+ """ |
227 |
+ This is called in the parent process, serially, for each of the |
228 |
+ sync jobs when they complete. Some cache backends such as sqlite |
229 |
+ may require that cache access be performed serially in the |
230 |
+ parent process like this. |
231 |
+ """ |
232 |
+ repo = proc.kwargs['repo'] |
233 |
+ exitcode = proc.returncode |
234 |
+ updatecache_flg = False |
235 |
+ if proc.returncode == os.EX_OK: |
236 |
+ exitcode, message, updatecache_flg = proc.result |
237 |
|
238 |
- def _sync_callback(self, exitcode, updatecache_flg): |
239 |
if updatecache_flg and "metadata-transfer" not in self.settings.features: |
240 |
updatecache_flg = False |
241 |
|
242 |
if updatecache_flg and \ |
243 |
os.path.exists(os.path.join( |
244 |
- self.repo.location, 'metadata', 'md5-cache')): |
245 |
+ repo.location, 'metadata', 'md5-cache')): |
246 |
|
247 |
# Only update cache for repo.location since that's |
248 |
# the only one that's been synced here. |
249 |
action_metadata(self.settings, self.portdb, self.emerge_config.opts, |
250 |
- porttrees=[self.repo.location]) |
251 |
+ porttrees=[repo.location]) |
252 |
|
253 |
diff --git a/pym/portage/tests/sync/test_sync_local.py b/pym/portage/tests/sync/test_sync_local.py |
254 |
index f50caba..7753a26 100644 |
255 |
--- a/pym/portage/tests/sync/test_sync_local.py |
256 |
+++ b/pym/portage/tests/sync/test_sync_local.py |
257 |
@@ -55,8 +55,12 @@ class SyncLocalTestCase(TestCase): |
258 |
"dev-libs/A-0": {} |
259 |
} |
260 |
|
261 |
+ user_config = { |
262 |
+ 'make.conf': ('FEATURES="metadata-transfer"',) |
263 |
+ } |
264 |
+ |
265 |
playground = ResolverPlayground(ebuilds=ebuilds, |
266 |
- profile=profile, user_config={}, debug=debug) |
267 |
+ profile=profile, user_config=user_config, debug=debug) |
268 |
settings = playground.settings |
269 |
eprefix = settings["EPREFIX"] |
270 |
eroot = settings["EROOT"] |
271 |
diff --git a/pym/portage/util/_async/AsyncFunction.py b/pym/portage/util/_async/AsyncFunction.py |
272 |
new file mode 100644 |
273 |
index 0000000..fda1fe0 |
274 |
--- /dev/null |
275 |
+++ b/pym/portage/util/_async/AsyncFunction.py |
276 |
@@ -0,0 +1,62 @@ |
277 |
+# Copyright 2015 Gentoo Foundation |
278 |
+# Distributed under the terms of the GNU General Public License v2 |
279 |
+ |
280 |
+import pickle |
281 |
+import traceback |
282 |
+ |
283 |
+from portage import os |
284 |
+from portage.util._async.ForkProcess import ForkProcess |
285 |
+from _emerge.PipeReader import PipeReader |
286 |
+ |
287 |
+class AsyncFunction(ForkProcess): |
288 |
+ """ |
289 |
+ Execute a function call in a fork, and retrieve the function |
290 |
+ return value via pickling/unpickling, accessible as the |
291 |
+ "result" attribute after the forked process has exited. |
292 |
+ """ |
293 |
+ |
294 |
+ __slots__ = ('args', 'kwargs', 'result', 'target', |
295 |
+ '_async_func_reader', '_async_func_reader_pw') |
296 |
+ |
297 |
+ def _start(self): |
298 |
+ pr, pw = os.pipe() |
299 |
+ self.fd_pipes = {} |
300 |
+ self.fd_pipes[pw] = pw |
301 |
+ self._async_func_reader_pw = pw |
302 |
+ self._async_func_reader = PipeReader( |
303 |
+ input_files={"input":pr}, |
304 |
+ scheduler=self.scheduler) |
305 |
+ self._async_func_reader.addExitListener(self._async_func_reader_exit) |
306 |
+ self._async_func_reader.start() |
307 |
+ ForkProcess._start(self) |
308 |
+ os.close(pw) |
309 |
+ |
310 |
+ def _run(self): |
311 |
+ try: |
312 |
+ result = self.target(*(self.args or []), **(self.kwargs or {})) |
313 |
+ os.write(self._async_func_reader_pw, pickle.dumps(result)) |
314 |
+ except Exception: |
315 |
+ traceback.print_exc() |
316 |
+ return 1 |
317 |
+ |
318 |
+ return os.EX_OK |
319 |
+ |
320 |
+ def _pipe_logger_exit(self, pipe_logger): |
321 |
+ # Ignore this event, since we want to ensure that we exit |
322 |
+ # only after _async_func_reader_exit has reached EOF. |
323 |
+ self._pipe_logger = None |
324 |
+ |
325 |
+ def _async_func_reader_exit(self, pipe_reader): |
326 |
+ self.result = pickle.loads(pipe_reader.getvalue()) |
327 |
+ self._async_func_reader = None |
328 |
+ self._unregister() |
329 |
+ self.wait() |
330 |
+ |
331 |
+ def _unregister(self): |
332 |
+ ForkProcess._unregister(self) |
333 |
+ |
334 |
+ pipe_reader = self._async_func_reader |
335 |
+ if pipe_reader is not None: |
336 |
+ self._async_func_reader = None |
337 |
+ pipe_reader.removeExitListener(self._async_func_reader_exit) |
338 |
+ pipe_reader.cancel() |
339 |
-- |
340 |
2.4.6 |