public inbox for gentoo-portage-dev@lists.gentoo.org
 help / color / mirror / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download: 
* [gentoo-portage-dev] [PATCH] sync repositories in parallel (bug 557426)
@ 2015-08-13  8:34 99% Zac Medico
  0 siblings, 0 replies; 1+ results
From: Zac Medico @ 2015-08-13  8:34 UTC (permalink / raw
  To: gentoo-portage-dev; +Cc: Zac Medico

Repos are synced in parallel, including their post-sync hooks. Output
from concurrent processes is currently interleaved (not an issue with
--quiet). FEATURES=metadata-transfer is handled in the main process,
since some cache backends (such as sqlite) may require that. A repo is
synced only after its master(s) have synced (in case that matters for
hooks).

X-Gentoo-Bug: 557426
X-Gentoo-Bug-url: https://bugs.gentoo.org/show_bug.cgi?id=557426
---
 pym/portage/emaint/modules/sync/sync.py   | 125 ++++++++++++++++++++++++++++--
 pym/portage/sync/controller.py            |  31 ++++++--
 pym/portage/tests/sync/test_sync_local.py |   6 +-
 pym/portage/util/_async/AsyncFunction.py  |  62 +++++++++++++++
 4 files changed, 210 insertions(+), 14 deletions(-)
 create mode 100644 pym/portage/util/_async/AsyncFunction.py

diff --git a/pym/portage/emaint/modules/sync/sync.py b/pym/portage/emaint/modules/sync/sync.py
index b463073..8a28f17 100644
--- a/pym/portage/emaint/modules/sync/sync.py
+++ b/pym/portage/emaint/modules/sync/sync.py
@@ -13,6 +13,10 @@ from portage.output import bold, red, create_color_func
 from portage._global_updates import _global_updates
 from portage.sync.controller import SyncManager
 from portage.util import writemsg_level
+from portage.util.digraph import digraph
+from portage.util._async.AsyncScheduler import AsyncScheduler
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util._eventloop.EventLoop import EventLoop
 
 import _emerge
 from _emerge.emergelog import emergelog
@@ -201,6 +205,7 @@ class SyncRepos(object):
 					k = "--" + k.replace("_", "-")
 					self.emerge_config.opts[k] = v
 
+		selected_repos = [repo for repo in selected_repos if repo.sync_type is not None]
 		msgs = []
 		if not selected_repos:
 			msgs.append("Emaint sync, nothing to sync... returning")
@@ -213,13 +218,17 @@ class SyncRepos(object):
 
 		sync_manager = SyncManager(
 			self.emerge_config.target_config.settings, emergelog)
-		retvals = []
-		for repo in selected_repos:
-			if repo.sync_type is not None:
-				returncode, message = sync_manager.sync(self.emerge_config, repo)
-				retvals.append((repo.name, returncode))
-				if message:
-					msgs.append(message)
+
+		sync_scheduler = SyncScheduler(emerge_config=self.emerge_config,
+			selected_repos=selected_repos, sync_manager=sync_manager,
+			max_jobs=self.emerge_config.opts.get('--jobs', 1),
+			event_loop=global_event_loop() if portage._internal_caller else
+				EventLoop(main=False))
+
+		sync_scheduler.start()
+		sync_scheduler.wait()
+		retvals = sync_scheduler.retvals  # list of (repo name, returncode) tuples
+		msgs.extend(sync_scheduler.msgs)  # non-empty per-repo sync messages
 
 		# Reload the whole config.
 		portage._sync_mode = False
@@ -287,3 +296,105 @@ class SyncRepos(object):
 			messages.append("Action: %s for repo: %s, returned code = %s"
 				% (action, rval[0], rval[1]))
 		return messages
+
+
+class SyncScheduler(AsyncScheduler):
+	'''
+	Sync repos in parallel, but don't sync a given repo until all
+	of its masters have synced.
+	'''
+	def __init__(self, **kwargs):
+		'''
+		@param emerge_config: an emerge_config instance
+		@param selected_repos: list of RepoConfig instances
+		@param sync_manager: a SyncManager instance
+		'''
+		self._emerge_config = kwargs.pop('emerge_config')
+		self._selected_repos = kwargs.pop('selected_repos')
+		self._sync_manager = kwargs.pop('sync_manager')
+		AsyncScheduler.__init__(self, **kwargs)
+		self._init_graph()
+		self._leaf_nodes = self._sync_graph.leaf_nodes()
+		self.retvals = []
+		self.msgs = []
+
+	def _init_graph(self):
+		'''
+		Graph relationships between selected repos and their selected masters.
+		'''
+		self._sync_graph = digraph()
+		self._repo_map = dict((repo.name, repo) for repo in self._selected_repos)
+		self._running_repos = set()
+		for repo in self._selected_repos:
+			self._sync_graph.add(repo.name, None)
+			for master in repo.masters:
+				# Unselected masters stay out of the graph, so they are not synced.
+				if master.name in self._repo_map:
+					self._sync_graph.add(master.name, repo.name)
+
+	def _task_exit(self, task):
+		'''
+		Remove the task from the graph, in order to expose
+		more leaf nodes.
+		'''
+		self._running_tasks.discard(task)
+		returncode = task.returncode
+		if task.returncode == os.EX_OK:
+			returncode, message, updatecache_flg = task.result
+			if message:
+				self.msgs.append(message)
+		repo = task.kwargs['repo'].name
+		self._running_repos.remove(repo)
+		self.retvals.append((repo, returncode))
+		self._sync_graph.remove(repo)
+		self._update_leaf_nodes()
+		super(SyncScheduler, self)._task_exit(task)
+
+	def _update_leaf_nodes(self):
+		'''
+		Populate self._leaf_nodes with current leaves from
+		self._sync_graph. If a circular master relationship
+		is discovered, choose an arbitrary node to break the cycle.
+		'''
+		if self._sync_graph and not self._leaf_nodes:
+			self._leaf_nodes = [obj for obj in
+				self._sync_graph.leaf_nodes()
+				if obj not in self._running_repos]
+
+			if not (self._leaf_nodes or self._running_repos):
+				# If there is a circular master relationship,
+				# choose an arbitrary node to break the cycle.
+				self._leaf_nodes = [next(iter(self._sync_graph))]
+
+	def _next_task(self):
+		'''
+		Return a task for the next available leaf node.
+		'''
+		if not self._sync_graph:
+			raise StopIteration()
+		# If self._sync_graph is non-empty, then self._leaf_nodes
+		# is guaranteed to be non-empty, since otherwise
+		# _can_add_job would have returned False and prevented
+		# _next_task from being immediately called.
+		node = self._leaf_nodes.pop()
+		self._running_repos.add(node)
+		self._update_leaf_nodes()
+
+		task = self._sync_manager.async(
+			self._emerge_config, self._repo_map[node])
+		return task
+
+	def _can_add_job(self):
+		'''
+		Returns False if there are no leaf nodes available.
+		'''
+		if not AsyncScheduler._can_add_job(self):
+			return False
+		return bool(self._leaf_nodes) and not self._terminated.is_set()
+
+	def _keep_scheduling(self):
+		'''
+		Schedule as long as the graph is non-empty, and we haven't
+		been terminated.
+		'''
+		return bool(self._sync_graph) and not self._terminated.is_set()
diff --git a/pym/portage/sync/controller.py b/pym/portage/sync/controller.py
index 307487f..e992cc4 100644
--- a/pym/portage/sync/controller.py
+++ b/pym/portage/sync/controller.py
@@ -21,6 +21,7 @@ bad = create_color_func("BAD")
 warn = create_color_func("WARN")
 from portage.package.ebuild.doebuild import _check_temp_dir
 from portage.metadata import action_metadata
+from portage.util._async.AsyncFunction import AsyncFunction
 from portage import OrderedDict
 from portage import _unicode_decode
 from portage import util
@@ -113,12 +114,24 @@ class SyncManager(object):
 			return desc
 		return []
 
+	def async(self, emerge_config=None, repo=None):
+		# NOTE(review): 'async' is a reserved keyword from Python 3.7 on.
+		# sync() runs in a forked child, so attributes it assigns are not
+		# visible to _sync_callback, which runs in this (parent) process.
+		# NOTE(review): _sync_callback also reads self.portdb; confirm it
+		# is assigned in the parent as well.
+		self.emerge_config = emerge_config
+		proc = AsyncFunction(target=self.sync,
+			kwargs=dict(emerge_config=emerge_config, repo=repo))
+		proc.addExitListener(self._sync_callback)
+		return proc
 
-	def sync(self, emerge_config=None, repo=None, callback=None):
+	def sync(self, emerge_config=None, repo=None):
 		self.emerge_config = emerge_config
-		self.callback = callback or self._sync_callback
+		self.callback = None
 		self.repo = repo
 		self.exitcode = 1
+		self.updatecache_flg = False
 		if repo.sync_type in self.module_names:
 			tasks = [self.module_controller.get_class(repo.sync_type)]
 		else:
@@ -149,13 +162,14 @@ class SyncManager(object):
 
 		self.perform_post_sync_hook(repo.name, repo.sync_uri, repo.location)
 
-		return self.exitcode, None
+		return self.exitcode, None, self.updatecache_flg
 
 
 	def do_callback(self, result):
 		#print("result:", result, "callback()", self.callback)
 		exitcode, updatecache_flg = result
 		self.exitcode = exitcode
+		self.updatecache_flg = updatecache_flg
 		if exitcode == 0:
 			msg = "=== Sync completed for %s" % self.repo.name
 			self.logger(self.xterm_titles, msg)
@@ -310,17 +324,28 @@ class SyncManager(object):
 		os.umask(0o022)
 		return os.EX_OK
 
+	def _sync_callback(self, proc):
+		"""
+		This is called in the parent process, serially, for each of the
+		sync jobs when they complete. Some cache backends such as sqlite
+		may require that cache access be performed serially in the
+		parent process like this.
+		"""
+		repo = proc.kwargs['repo']
+		exitcode = proc.returncode
+		updatecache_flg = False
+		if proc.returncode == os.EX_OK:
+			exitcode, message, updatecache_flg = proc.result

-	def _sync_callback(self, exitcode, updatecache_flg):
 		if updatecache_flg and "metadata-transfer" not in self.settings.features:
 			updatecache_flg = False
 
 		if updatecache_flg and \
 			os.path.exists(os.path.join(
-			self.repo.location, 'metadata', 'md5-cache')):
+			repo.location, 'metadata', 'md5-cache')):

 			# Only update cache for repo.location since that's
 			# the only one that's been synced here.
 			action_metadata(self.settings, self.portdb, self.emerge_config.opts,
-				porttrees=[self.repo.location])
+				porttrees=[repo.location])

diff --git a/pym/portage/tests/sync/test_sync_local.py b/pym/portage/tests/sync/test_sync_local.py
index f50caba..7753a26 100644
--- a/pym/portage/tests/sync/test_sync_local.py
+++ b/pym/portage/tests/sync/test_sync_local.py
@@ -55,8 +55,12 @@ class SyncLocalTestCase(TestCase):
 			"dev-libs/A-0": {}
 		}
 
+		user_config = {  # enable metadata-transfer to reach the post-sync cache-update check
+			'make.conf': ('FEATURES="metadata-transfer"',)
+		}
+
 		playground = ResolverPlayground(ebuilds=ebuilds,
-			profile=profile, user_config={}, debug=debug)
+			profile=profile, user_config=user_config, debug=debug)
 		settings = playground.settings
 		eprefix = settings["EPREFIX"]
 		eroot = settings["EROOT"]
diff --git a/pym/portage/util/_async/AsyncFunction.py b/pym/portage/util/_async/AsyncFunction.py
new file mode 100644
index 0000000..fda1fe0
--- /dev/null
+++ b/pym/portage/util/_async/AsyncFunction.py
@@ -0,0 +1,82 @@
+# Copyright 2015 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import pickle
+import traceback
+
+from portage import os
+from portage.util._async.ForkProcess import ForkProcess
+from _emerge.PipeReader import PipeReader
+
+class AsyncFunction(ForkProcess):
+	"""
+	Execute a function call in a fork, and retrieve the function
+	return value via pickling/unpickling, accessible as the
+	"result" attribute after the forked process has exited.
+	"""
+
+	__slots__ = ('args', 'kwargs', 'result', 'target',
+		'_async_func_reader', '_async_func_reader_pw')
+
+	def _start(self):
+		# The child writes the pickled result to pw; this process
+		# collects it from pr through a PipeReader.
+		pr, pw = os.pipe()
+		self.fd_pipes = {}
+		self.fd_pipes[pw] = pw
+		self._async_func_reader_pw = pw
+		self._async_func_reader = PipeReader(
+			input_files={"input":pr},
+			scheduler=self.scheduler)
+		self._async_func_reader.addExitListener(self._async_func_reader_exit)
+		self._async_func_reader.start()
+		ForkProcess._start(self)
+		# Drop this process's copy of the write end so that only the
+		# child holds it open and the reader can reach EOF.
+		os.close(pw)
+
+	def _run(self):
+		"""
+		Call target in the child and write its pickled return value
+		to the pipe. Returns 1 after printing a traceback if the call
+		or the pickling fails, and os.EX_OK otherwise.
+		"""
+		try:
+			result = self.target(*(self.args or []), **(self.kwargs or {}))
+			data = pickle.dumps(result)
+			# os.write may write fewer bytes than requested.
+			while data:
+				written = os.write(self._async_func_reader_pw, data)
+				data = data[written:]
+		except Exception:
+			traceback.print_exc()
+			return 1
+
+		return os.EX_OK
+
+	def _pipe_logger_exit(self, pipe_logger):
+		# Ignore this event, since we want to ensure that we exit
+		# only after _async_func_reader_exit has reached EOF.
+		self._pipe_logger = None
+
+	def _async_func_reader_exit(self, pipe_reader):
+		try:
+			self.result = pickle.loads(pipe_reader.getvalue())
+		except Exception:
+			# When the target raises, _run prints a traceback and returns
+			# 1 without writing anything, so there is nothing to unpickle.
+			# Leave self.result untouched; callers are expected to check
+			# returncode before reading it.
+			pass
+		self._async_func_reader = None
+		self._unregister()
+		self.wait()
+
+	def _unregister(self):
+		ForkProcess._unregister(self)
+
+		pipe_reader = self._async_func_reader
+		if pipe_reader is not None:
+			self._async_func_reader = None
+			pipe_reader.removeExitListener(self._async_func_reader_exit)
+			pipe_reader.cancel()
-- 
2.4.6



^ permalink raw reply related	[relevance 99%]

Results 1-1 of 1 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2015-08-13  8:34 99% [gentoo-portage-dev] [PATCH] sync repositories in parallel (bug 557426) Zac Medico

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox