Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] dblink: add locks for parallel-install with blockers (bug 576888)
Date: Mon, 14 Mar 2016 00:24:16
Message-Id: 1457914987-3048-1-git-send-email-zmedico@gentoo.org
1 For parallel-install, lock package slots of the current package and
2 blocked packages, in order to account for blocked packages being
3 removed or replaced concurrently. Acquire locks in predictable order,
4 preventing deadlocks with competitors that may be trying to acquire
5 overlapping locks.
6
7 X-Gentoo-Bug: 576888
8 X-Gentoo-Bug-url: https://bugs.gentoo.org/show_bug.cgi?id=576888
9 ---
10 pym/_emerge/Scheduler.py | 9 +-
11 pym/portage/dbapi/vartree.py | 99 +++++++++++-
12 .../emerge/test_emerge_blocker_file_collision.py | 168 +++++++++++++++++++++
13 3 files changed, 267 insertions(+), 9 deletions(-)
14 create mode 100644 pym/portage/tests/emerge/test_emerge_blocker_file_collision.py
15
16 diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
17 index 20a4e85..97b826a 100644
18 --- a/pym/_emerge/Scheduler.py
19 +++ b/pym/_emerge/Scheduler.py
20 @@ -586,18 +586,15 @@ class Scheduler(PollScheduler):
21
22 blocker_db = self._blocker_db[new_pkg.root]
23
24 - blocker_dblinks = []
25 + blocked_pkgs = []
26 for blocking_pkg in blocker_db.findInstalledBlockers(new_pkg):
27 if new_pkg.slot_atom == blocking_pkg.slot_atom:
28 continue
29 if new_pkg.cpv == blocking_pkg.cpv:
30 continue
31 - blocker_dblinks.append(portage.dblink(
32 - blocking_pkg.category, blocking_pkg.pf, blocking_pkg.root,
33 - self.pkgsettings[blocking_pkg.root], treetype="vartree",
34 - vartree=self.trees[blocking_pkg.root]["vartree"]))
35 + blocked_pkgs.append(blocking_pkg)
36
37 - return blocker_dblinks
38 + return blocked_pkgs
39
40 def _generate_digests(self):
41 """
42 diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
43 index e7effca..6209a86 100644
44 --- a/pym/portage/dbapi/vartree.py
45 +++ b/pym/portage/dbapi/vartree.py
46 @@ -168,6 +168,7 @@ class vardbapi(dbapi):
47 self._conf_mem_file = self._eroot + CONFIG_MEMORY_FILE
48 self._fs_lock_obj = None
49 self._fs_lock_count = 0
50 + self._slot_locks = {}
51
52 if vartree is None:
53 vartree = portage.db[settings['EROOT']]['vartree']
54 @@ -284,6 +285,38 @@ class vardbapi(dbapi):
55 self._fs_lock_obj = None
56 self._fs_lock_count -= 1
57
58 + def _slot_lock(self, slot_atom):
59 + """
60 + Acquire a slot lock (reentrant).
61 +
62 + WARNING: The vardbapi._slot_lock method is not safe to call
63 + in the main process when that process is scheduling
64 + install/uninstall tasks in parallel, since the locks would
65 + be inherited by child processes. In order to avoid this sort
66 + of problem, this method should be called in a subprocess
67 + (typically spawned by the MergeProcess class).
68 + """
69 + lock, counter = self._slot_locks.get(slot_atom, (None, 0))
70 + if lock is None:
71 + lock_path = self.getpath("%s:%s" % (slot_atom.cp, slot_atom.slot))
72 + ensure_dirs(os.path.dirname(lock_path))
73 + lock = lockfile(lock_path, wantnewlockfile=True)
74 + self._slot_locks[slot_atom] = (lock, counter + 1)
75 +
76 + def _slot_unlock(self, slot_atom):
77 + """
78 + Release a slot lock (or decrement the recursion level).
79 + """
80 + lock, counter = self._slot_locks.get(slot_atom, (None, 0))
81 + if lock is None:
82 + raise AssertionError("not locked")
83 + counter -= 1
84 + if counter == 0:
85 + unlockfile(lock)
86 + del self._slot_locks[slot_atom]
87 + else:
88 + self._slot_locks[slot_atom] = (lock, counter)
89 +
90 def _bump_mtime(self, cpv):
91 """
92 This is called before an after any modifications, so that consumers
93 @@ -1590,6 +1623,7 @@ class dblink(object):
94 # compliance with RESTRICT=preserve-libs.
95 self._preserve_libs = "preserve-libs" in mysettings.features
96 self._contents = ContentsCaseSensitivityManager(self)
97 + self._slot_locks = []
98
99 def __hash__(self):
100 return hash(self._hash_key)
101 @@ -1623,6 +1657,58 @@ class dblink(object):
102 def unlockdb(self):
103 self.vartree.dbapi.unlock()
104
105 + def _slot_locked(f):
106 + """
107 + A decorator function which, when parallel-install is enabled,
108 + acquires and releases slot locks for the current package and
109 + blocked packages. This is required in order to account for
110 + interactions with blocked packages (involving resolution of
111 + file collisions).
112 + """
113 + def wrapper(self, *args, **kwargs):
114 + if "parallel-install" in self.settings.features:
115 + self._acquire_slot_locks(
116 + kwargs.get("mydbapi", self.vartree.dbapi))
117 + try:
118 + return f(self, *args, **kwargs)
119 + finally:
120 + self._release_slot_locks()
121 + return wrapper
122 +
123 + def _acquire_slot_locks(self, db):
124 + """
125 + Acquire slot locks for the current package and blocked packages.
126 + """
127 +
128 + slot_atoms = []
129 +
130 + try:
131 + slot = self.mycpv.slot
132 + except AttributeError:
133 + slot, = db.aux_get(self.mycpv, ["SLOT"])
134 + slot = slot.partition("/")[0]
135 +
136 + slot_atoms.append(portage.dep.Atom(
137 + "%s:%s" % (self.mycpv.cp, slot)))
138 +
139 + for blocker in self._blockers or []:
140 + slot_atoms.append(blocker.slot_atom)
141 +
142 + # Sort atoms so that locks are acquired in a predictable
143 + # order, preventing deadlocks with competitors that may
144 + # be trying to acquire overlapping locks.
145 + slot_atoms.sort()
146 + for slot_atom in slot_atoms:
147 + self.vartree.dbapi._slot_lock(slot_atom)
148 + self._slot_locks.append(slot_atom)
149 +
150 + def _release_slot_locks(self):
151 + """
152 + Release all slot locks.
153 + """
154 + while self._slot_locks:
155 + self.vartree.dbapi._slot_unlock(self._slot_locks.pop())
156 +
157 def getpath(self):
158 "return path to location of db information (for >>> informational display)"
159 return self.dbdir
160 @@ -1863,6 +1949,7 @@ class dblink(object):
161 plib_registry.unlock()
162 self.vartree.dbapi._fs_unlock()
163
164 + @_slot_locked
165 def unmerge(self, pkgfiles=None, trimworld=None, cleanup=True,
166 ldpath_mtimes=None, others_in_slot=None, needed=None,
167 preserve_paths=None):
168 @@ -3929,9 +4016,14 @@ class dblink(object):
169 prepare_build_dirs(settings=self.settings, cleanup=cleanup)
170
171 # check for package collisions
172 - blockers = self._blockers
173 - if blockers is None:
174 - blockers = []
175 + blockers = []
176 + for blocker in self._blockers or []:
177 + blocker = self.vartree.dbapi._dblink(blocker.cpv)
178 + # It may have been unmerged before lock(s)
179 + # were acquired.
180 + if blocker.exists():
181 + blockers.append(blocker)
182 +
183 collisions, dirs_ro, symlink_collisions, plib_collisions = \
184 self._collision_protect(srcroot, destroot,
185 others_in_slot + blockers, filelist, linklist)
186 @@ -4993,6 +5085,7 @@ class dblink(object):
187 else:
188 proc.wait()
189
190 + @_slot_locked
191 def merge(self, mergeroot, inforoot, myroot=None, myebuild=None, cleanup=0,
192 mydbapi=None, prev_mtimes=None, counter=None):
193 """
194 diff --git a/pym/portage/tests/emerge/test_emerge_blocker_file_collision.py b/pym/portage/tests/emerge/test_emerge_blocker_file_collision.py
195 new file mode 100644
196 index 0000000..10d09d8
197 --- /dev/null
198 +++ b/pym/portage/tests/emerge/test_emerge_blocker_file_collision.py
199 @@ -0,0 +1,168 @@
200 +# Copyright 2016 Gentoo Foundation
201 +# Distributed under the terms of the GNU General Public License v2
202 +
203 +import subprocess
204 +import sys
205 +
206 +import portage
207 +from portage import os
208 +from portage import _unicode_decode
209 +from portage.const import PORTAGE_PYM_PATH, USER_CONFIG_PATH
210 +from portage.process import find_binary
211 +from portage.tests import TestCase
212 +from portage.tests.resolver.ResolverPlayground import ResolverPlayground
213 +from portage.util import ensure_dirs
214 +
215 +class BlockerFileCollisionEmergeTestCase(TestCase):
216 +
217 + def testBlockerFileCollision(self):
218 +
219 + debug = False
220 +
221 + install_something = """
222 +S="${WORKDIR}"
223 +
224 +src_install() {
225 + einfo "installing something..."
226 + insinto /usr/lib
227 + echo "${PN}" > "${T}/file-collision"
228 + doins "${T}/file-collision"
229 +}
230 +"""
231 +
232 + ebuilds = {
233 + "dev-libs/A-1" : {
234 + "EAPI": "6",
235 + "MISC_CONTENT": install_something,
236 + "RDEPEND": "!dev-libs/B",
237 + },
238 + "dev-libs/B-1" : {
239 + "EAPI": "6",
240 + "MISC_CONTENT": install_something,
241 + "RDEPEND": "!dev-libs/A",
242 + },
243 + }
244 +
245 + playground = ResolverPlayground(ebuilds=ebuilds, debug=debug)
246 + settings = playground.settings
247 + eprefix = settings["EPREFIX"]
248 + eroot = settings["EROOT"]
249 + var_cache_edb = os.path.join(eprefix, "var", "cache", "edb")
250 + user_config_dir = os.path.join(eprefix, USER_CONFIG_PATH)
251 +
252 + portage_python = portage._python_interpreter
253 + emerge_cmd = (portage_python, "-b", "-Wd",
254 + os.path.join(self.bindir, "emerge"))
255 +
256 + file_collision = os.path.join(eroot, 'usr/lib/file-collision')
257 +
258 + test_commands = (
259 + emerge_cmd + ("--oneshot", "dev-libs/A",),
260 + (lambda: portage.util.grablines(file_collision) == ["A\n"],),
261 + emerge_cmd + ("--oneshot", "dev-libs/B",),
262 + (lambda: portage.util.grablines(file_collision) == ["B\n"],),
263 + emerge_cmd + ("--oneshot", "dev-libs/A",),
264 + (lambda: portage.util.grablines(file_collision) == ["A\n"],),
265 + ({"FEATURES":"parallel-install"},) + emerge_cmd + ("--oneshot", "dev-libs/B",),
266 + (lambda: portage.util.grablines(file_collision) == ["B\n"],),
267 + ({"FEATURES":"parallel-install"},) + emerge_cmd + ("-Cq", "dev-libs/B",),
268 + (lambda: not os.path.exists(file_collision),),
269 + )
270 +
271 + fake_bin = os.path.join(eprefix, "bin")
272 + portage_tmpdir = os.path.join(eprefix, "var", "tmp", "portage")
273 + profile_path = settings.profile_path
274 +
275 + path = os.environ.get("PATH")
276 + if path is not None and not path.strip():
277 + path = None
278 + if path is None:
279 + path = ""
280 + else:
281 + path = ":" + path
282 + path = fake_bin + path
283 +
284 + pythonpath = os.environ.get("PYTHONPATH")
285 + if pythonpath is not None and not pythonpath.strip():
286 + pythonpath = None
287 + if pythonpath is not None and \
288 + pythonpath.split(":")[0] == PORTAGE_PYM_PATH:
289 + pass
290 + else:
291 + if pythonpath is None:
292 + pythonpath = ""
293 + else:
294 + pythonpath = ":" + pythonpath
295 + pythonpath = PORTAGE_PYM_PATH + pythonpath
296 +
297 + env = {
298 + "PORTAGE_OVERRIDE_EPREFIX" : eprefix,
299 + "PATH" : path,
300 + "PORTAGE_PYTHON" : portage_python,
301 + "PORTAGE_REPOSITORIES" : settings.repositories.config_string(),
302 + "PYTHONDONTWRITEBYTECODE" : os.environ.get("PYTHONDONTWRITEBYTECODE", ""),
303 + "PYTHONPATH" : pythonpath,
304 + }
305 +
306 + if "__PORTAGE_TEST_HARDLINK_LOCKS" in os.environ:
307 + env["__PORTAGE_TEST_HARDLINK_LOCKS"] = \
308 + os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"]
309 +
310 + dirs = [playground.distdir, fake_bin, portage_tmpdir,
311 + user_config_dir, var_cache_edb]
312 + true_symlinks = ["chown", "chgrp"]
313 + true_binary = find_binary("true")
314 + self.assertEqual(true_binary is None, False,
315 + "true command not found")
316 + try:
317 + for d in dirs:
318 + ensure_dirs(d)
319 + for x in true_symlinks:
320 + os.symlink(true_binary, os.path.join(fake_bin, x))
321 + with open(os.path.join(var_cache_edb, "counter"), 'wb') as f:
322 + f.write(b"100")
323 + # non-empty system set keeps --depclean quiet
324 + with open(os.path.join(profile_path, "packages"), 'w') as f:
325 + f.write("*dev-libs/token-system-pkg")
326 +
327 + if debug:
328 + # The subprocess inherits both stdout and stderr, for
329 + # debugging purposes.
330 + stdout = None
331 + else:
332 + # The subprocess inherits stderr so that any warnings
333 + # triggered by python -Wd will be visible.
334 + stdout = subprocess.PIPE
335 +
336 + for i, args in enumerate(test_commands):
337 +
338 + if hasattr(args[0], '__call__'):
339 + self.assertTrue(args[0](),
340 + "callable at index %s failed" % (i,))
341 + continue
342 +
343 + if isinstance(args[0], dict):
344 + local_env = env.copy()
345 + local_env.update(args[0])
346 + args = args[1:]
347 + else:
348 + local_env = env
349 +
350 + proc = subprocess.Popen(args,
351 + env=local_env, stdout=stdout)
352 +
353 + if debug:
354 + proc.wait()
355 + else:
356 + output = proc.stdout.readlines()
357 + proc.wait()
358 + proc.stdout.close()
359 + if proc.returncode != os.EX_OK:
360 + for line in output:
361 + sys.stderr.write(_unicode_decode(line))
362 +
363 + self.assertEqual(os.EX_OK, proc.returncode,
364 + "emerge failed with args %s" % (args,))
365 + finally:
366 + playground.debug = False
367 + playground.cleanup()
368 --
369 2.7.2

Replies