Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: lib/portage/repository/storage/, lib/portage/sync/, ...
Date: Mon, 24 Sep 2018 06:12:16
Message-Id: 1537763050.884ad951d700d1871cab2e321e4d8635b1a0f698.zmedico@gentoo
1 commit: 884ad951d700d1871cab2e321e4d8635b1a0f698
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Mon Jul 30 06:21:30 2018 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Mon Sep 24 04:24:10 2018 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=884ad951
7
8 rsync: split out repo storage framework
9
10 Since there are many ways to manage repository storage, split out a repo
11 storage framework. The HardlinkQuarantineRepoStorage class implements
12 the existing default behavior, and the InplaceRepoStorage class
13 implements the legacy behavior (when sync-allow-hardlinks is disabled in
14 repos.conf).
15
16 Each class implements RepoStorageInterface, which uses coroutine methods
17 since coroutines are well-suited to the I/O bound tasks that these
18 methods perform. The _sync_decorator is used to convert coroutine
19 methods to synchronous methods, for smooth integration into the
20 surrounding synchronous code.
21
22 Bug: https://bugs.gentoo.org/662070
23 Reviewed-by: Brian Dolbec <dolsen <AT> gentoo.org>
24 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
25
26 lib/portage/repository/storage/__init__.py | 2 +
27 .../repository/storage/hardlink_quarantine.py | 95 ++++++++++++++++++++++
28 lib/portage/repository/storage/inplace.py | 49 +++++++++++
29 lib/portage/repository/storage/interface.py | 87 ++++++++++++++++++++
30 lib/portage/sync/controller.py | 1 +
31 lib/portage/sync/modules/rsync/rsync.py | 85 +++++--------------
32 lib/portage/sync/syncbase.py | 53 +++++++++++-
33 7 files changed, 306 insertions(+), 66 deletions(-)
34
35 diff --git a/lib/portage/repository/storage/__init__.py b/lib/portage/repository/storage/__init__.py
36 new file mode 100644
37 index 000000000..58496758f
38 --- /dev/null
39 +++ b/lib/portage/repository/storage/__init__.py
40 @@ -0,0 +1,2 @@
41 +# Copyright 2018 Gentoo Foundation
42 +# Distributed under the terms of the GNU General Public License v2
43
44 diff --git a/lib/portage/repository/storage/hardlink_quarantine.py b/lib/portage/repository/storage/hardlink_quarantine.py
45 new file mode 100644
46 index 000000000..7e9cf4493
47 --- /dev/null
48 +++ b/lib/portage/repository/storage/hardlink_quarantine.py
49 @@ -0,0 +1,95 @@
50 +# Copyright 2018 Gentoo Foundation
51 +# Distributed under the terms of the GNU General Public License v2
52 +
53 +from portage import os
54 +from portage.repository.storage.interface import (
55 + RepoStorageException,
56 + RepoStorageInterface,
57 +)
58 +from portage.util.futures import asyncio
59 +from portage.util.futures.compat_coroutine import (
60 + coroutine,
61 + coroutine_return,
62 +)
63 +
64 +from _emerge.SpawnProcess import SpawnProcess
65 +
66 +
67 +class HardlinkQuarantineRepoStorage(RepoStorageInterface):
68 + """
69 + This is the default storage module, since it's quite compatible with
70 + most configurations.
71 +
72 + It's desirable to be able to create shared hardlinks between the
73 + download directory and the normal repository, and this is facilitated
74 + by making the download directory be a subdirectory of the normal
75 + repository location (ensuring that no mountpoints are crossed).
76 + Shared hardlinks are created by using the rsync --link-dest option.
77 +
78 + Since the download is initially unverified, it is safest to save
79 + it in a quarantine directory. The quarantine directory is also
80 + useful for making the repository update more atomic, so that it
81 + is less likely that the normal repository location will be observed in
82 + a partially synced state.
83 + """
84 + def __init__(self, repo, spawn_kwargs):
85 + self._user_location = repo.location
86 + self._update_location = None
87 + self._spawn_kwargs = spawn_kwargs
88 + self._current_update = None  # NOTE(review): never read here; current_update uses _update_location
89 +
90 + @coroutine
91 + def _check_call(self, cmd):
92 + """
93 + Run cmd and raise RepoStorageException on failure.
94 +
95 + @param cmd: command to execute
96 + @type cmd: list
97 + """
98 + p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), **self._spawn_kwargs)
99 + p.start()
100 + if (yield p.async_wait()) != os.EX_OK:
101 + raise RepoStorageException('command exited with status {}: {}'.\
102 + format(p.returncode, ' '.join(cmd)))
103 +
104 + @coroutine
105 + def init_update(self):
106 + update_location = os.path.join(self._user_location, '.tmp-unverified-download-quarantine')
107 + yield self._check_call(['rm', '-rf', update_location])
108 +
109 + # Use rsync --link-dest to hardlink files into update_location,
110 + # since cp -l is not portable.
111 + yield self._check_call(['rsync', '-a', '--link-dest', self._user_location,
112 + '--exclude', '/{}'.format(os.path.basename(update_location)),
113 + self._user_location + '/', update_location + '/'])
114 +
115 + self._update_location = update_location
116 +
117 + coroutine_return(self._update_location)
118 +
119 + @property
120 + def current_update(self):
121 + if self._update_location is None:
122 + raise RepoStorageException('current update does not exist')
123 + return self._update_location
124 +
125 + @coroutine
126 + def commit_update(self):
127 + update_location = self.current_update
128 + self._update_location = None  # cleared up front: abort_update() will not remove the quarantine dir if the rsync below fails
129 + yield self._check_call(['rsync', '-a', '--delete',
130 + '--exclude', '/{}'.format(os.path.basename(update_location)),
131 + update_location + '/', self._user_location + '/'])
132 +
133 + yield self._check_call(['rm', '-rf', update_location])
134 +
135 + @coroutine
136 + def abort_update(self):
137 + if self._update_location is not None:
138 + update_location = self._update_location
139 + self._update_location = None
140 + yield self._check_call(['rm', '-rf', update_location])
141 +
142 + @coroutine
143 + def garbage_collection(self):
144 + yield self.abort_update()
145
146 diff --git a/lib/portage/repository/storage/inplace.py b/lib/portage/repository/storage/inplace.py
147 new file mode 100644
148 index 000000000..f1117ad03
149 --- /dev/null
150 +++ b/lib/portage/repository/storage/inplace.py
151 @@ -0,0 +1,49 @@
152 +# Copyright 2018 Gentoo Foundation
153 +# Distributed under the terms of the GNU General Public License v2
154 +
155 +from portage.repository.storage.interface import (
156 + RepoStorageException,
157 + RepoStorageInterface,
158 +)
159 +from portage.util.futures.compat_coroutine import coroutine, coroutine_return
160 +
161 +
162 +class InplaceRepoStorage(RepoStorageInterface):
163 + """
164 + Legacy repo storage behavior, where updates are applied in-place.
165 + This module is not recommended, since the repository is left in an
166 + unspecified (possibly malicious) state if the update fails.
167 + """
168 + def __init__(self, repo, spawn_kwargs):  # spawn_kwargs is unused: this driver spawns no commands
169 + self._user_location = repo.location
170 + self._update_location = None
171 +
172 + @coroutine
173 + def init_update(self):
174 + self._update_location = self._user_location  # updates are written directly into the repository location
175 + coroutine_return(self._update_location)
176 + yield None  # the yield makes this a generator function (presumably what @coroutine expects)
177 +
178 + @property
179 + def current_update(self):
180 + if self._update_location is None:
181 + raise RepoStorageException('current update does not exist')
182 + return self._update_location
183 +
184 + @coroutine
185 + def commit_update(self):
186 + self.current_update  # raises RepoStorageException if no update is in progress
187 + self._update_location = None
188 + coroutine_return()
189 + yield None
190 +
191 + @coroutine
192 + def abort_update(self):
193 + self._update_location = None
194 + coroutine_return()
195 + yield None
196 +
197 + @coroutine
198 + def garbage_collection(self):
199 + coroutine_return()
200 + yield None
201
202 diff --git a/lib/portage/repository/storage/interface.py b/lib/portage/repository/storage/interface.py
203 new file mode 100644
204 index 000000000..f83c42b84
205 --- /dev/null
206 +++ b/lib/portage/repository/storage/interface.py
207 @@ -0,0 +1,87 @@
208 +# Copyright 2018 Gentoo Foundation
209 +# Distributed under the terms of the GNU General Public License v2
210 +
211 +from portage.exception import PortageException
212 +from portage.util.futures.compat_coroutine import coroutine
213 +
214 +
215 +class RepoStorageException(PortageException):
216 + """
217 + Base class for exceptions raised by RepoStorageInterface implementations.
218 + """
219 +
220 +
221 +class RepoStorageInterface(object):
222 + """
223 + Abstract repository storage interface.
224 +
225 + Implementations can assume that the repo.location directory already
226 + exists with appropriate permissions (SyncManager handles this).
227 +
228 + TODO: Add a method to check for a previous uncommitted update, which
229 + typically indicates a verification failure:
230 + https://bugs.gentoo.org/662386
231 + """
232 + def __init__(self, repo, spawn_kwargs):
233 + """
234 + @param repo: repository configuration
235 + @type repo: portage.repository.config.RepoConfig
236 + @param spawn_kwargs: keyword arguments supported by the
237 + portage.process.spawn function
238 + @type spawn_kwargs: dict
239 + """
240 + raise NotImplementedError
241 +
242 + @coroutine
243 + def init_update(self):
244 + """
245 + Create an update directory as a destination to sync updates to.
246 + The directory will be populated with files from the previous
247 + immutable snapshot, if available. Note that this directory
248 + may contain hardlinks that reference files in the previous
249 + immutable snapshot, so these files should not be modified
250 + (tools like rsync and git normally break hardlinks when
251 + files need to be modified).
252 +
253 + @rtype: str
254 + @return: path of directory to update, populated with files from
255 + the previous snapshot if available
256 + """
257 + raise NotImplementedError
258 +
259 + @property
260 + def current_update(self):
261 + """
262 + Get the current update directory which would have been returned
263 + from the most recent call to the init_update method. This raises
264 + RepoStorageException if the init_update method has not been
265 + called, or if the update has already been committed or aborted.
266 +
267 + @rtype: str
268 + @return: path of directory to update
269 + """
270 + raise NotImplementedError
271 +
272 + @coroutine
273 + def commit_update(self):
274 + """
275 + Commit the current update directory, so that it becomes the
276 + latest immutable snapshot.
277 + """
278 + raise NotImplementedError
279 +
280 + @coroutine
281 + def abort_update(self):
282 + """
283 + Delete the current update directory. If there was not an update
284 + in progress, or it has already been committed, then this has
285 + no effect.
286 + """
287 + raise NotImplementedError
288 +
289 + @coroutine
290 + def garbage_collection(self):
291 + """
292 + Remove expired snapshots.
293 + """
294 + raise NotImplementedError
295
296 diff --git a/lib/portage/sync/controller.py b/lib/portage/sync/controller.py
297 index 3bccf6f74..bf5750f7f 100644
298 --- a/lib/portage/sync/controller.py
299 +++ b/lib/portage/sync/controller.py
300 @@ -327,6 +327,7 @@ class SyncManager(object):
301 # override the defaults when sync_umask is set
302 if repo.sync_umask is not None:
303 spawn_kwargs["umask"] = int(repo.sync_umask, 8)
304 + spawn_kwargs.setdefault("umask", 0o022)
305 self.spawn_kwargs = spawn_kwargs
306
307 if self.usersync_uid is not None:
308
309 diff --git a/lib/portage/sync/modules/rsync/rsync.py b/lib/portage/sync/modules/rsync/rsync.py
310 index e0748794a..0f8221776 100644
311 --- a/lib/portage/sync/modules/rsync/rsync.py
312 +++ b/lib/portage/sync/modules/rsync/rsync.py
313 @@ -59,55 +59,6 @@ class RsyncSync(NewBase):
314 def __init__(self):
315 NewBase.__init__(self, "rsync", RSYNC_PACKAGE_ATOM)
316
317 - def _select_download_dir(self):
318 - '''
319 - Select and return the download directory. It's desirable to be able
320 - to create shared hardlinks between the download directory to the
321 - normal repository, and this is facilitated by making the download
322 - directory be a subdirectory of the normal repository location
323 - (ensuring that no mountpoints are crossed). Shared hardlinks are
324 - created by using the rsync --link-dest option.
325 -
326 - Since the download is initially unverified, it is safest to save
327 - it in a quarantine directory. The quarantine directory is also
328 - useful for making the repository update more atomic, so that it
329 - less likely that normal repository location will be observed in
330 - a partially synced state.
331 -
332 - This method returns a quarantine directory if sync-allow-hardlinks
333 - is enabled in repos.conf, and otherwise it returne the normal
334 - repository location.
335 - '''
336 - if self.repo.sync_allow_hardlinks:
337 - return os.path.join(self.repo.location, '.tmp-unverified-download-quarantine')
338 - else:
339 - return self.repo.location
340 -
341 - def _commit_download(self, download_dir):
342 - '''
343 - Commit changes from download_dir if it does not refer to the
344 - normal repository location.
345 - '''
346 - exitcode = 0
347 - if self.repo.location != download_dir:
348 - rsynccommand = [self.bin_command] + self.rsync_opts + self.extra_rsync_opts
349 - rsynccommand.append('--exclude=/%s' % os.path.basename(download_dir))
350 - rsynccommand.append('%s/' % download_dir.rstrip('/'))
351 - rsynccommand.append('%s/' % self.repo.location)
352 - exitcode = portage.process.spawn(rsynccommand, **self.spawn_kwargs)
353 -
354 - return exitcode
355 -
356 - def _remove_download(self, download_dir):
357 - """
358 - Remove download_dir if it does not refer to the normal repository
359 - location.
360 - """
361 - exitcode = 0
362 - if self.repo.location != download_dir:
363 - exitcode = subprocess.call(['rm', '-rf', download_dir])
364 - return exitcode
365 -
366 def update(self):
367 '''Internal update function which performs the transfer'''
368 opts = self.options.get('emerge_config').opts
369 @@ -143,8 +94,8 @@ class RsyncSync(NewBase):
370 self.extra_rsync_opts.extend(portage.util.shlex_split(
371 self.repo.module_specific_options['sync-rsync-extra-opts']))
372
373 - download_dir = self._select_download_dir()
374 exitcode = 0
375 + verify_failure = False
376
377 # Process GLEP74 verification options.
378 # Default verification to 'no'; it's enabled for ::gentoo
379 @@ -240,10 +191,14 @@ class RsyncSync(NewBase):
380 self.proto = "file"
381 dosyncuri = syncuri[7:]
382 unchanged, is_synced, exitcode, updatecache_flg = self._do_rsync(
383 - dosyncuri, timestamp, opts, download_dir)
384 + dosyncuri, timestamp, opts)
385 self._process_exitcode(exitcode, dosyncuri, out, 1)
386 - if exitcode == 0 and not unchanged:
387 - self._commit_download(download_dir)
388 + if exitcode == 0:
389 + if unchanged:
390 + self.repo_storage.abort_update()
391 + else:
392 + self.repo_storage.commit_update()
393 + self.repo_storage.garbage_collection()
394 return (exitcode, updatecache_flg)
395
396 retries=0
397 @@ -375,7 +330,7 @@ class RsyncSync(NewBase):
398 dosyncuri = dosyncuri[6:].replace('/', ':/', 1)
399
400 unchanged, is_synced, exitcode, updatecache_flg = self._do_rsync(
401 - dosyncuri, timestamp, opts, download_dir)
402 + dosyncuri, timestamp, opts)
403 if not unchanged:
404 local_state_unchanged = False
405 if is_synced:
406 @@ -390,6 +345,7 @@ class RsyncSync(NewBase):
407 # exit loop
408 exitcode = EXCEEDED_MAX_RETRIES
409 break
410 +
411 self._process_exitcode(exitcode, dosyncuri, out, maxretries)
412
413 if local_state_unchanged:
414 @@ -397,6 +353,8 @@ class RsyncSync(NewBase):
415 # in this case, so refer gemato to the normal repository
416 # location.
417 download_dir = self.repo.location
418 + else:
419 + download_dir = self.download_dir
420
421 # if synced successfully, verify now
422 if exitcode == 0 and self.verify_metamanifest:
423 @@ -448,14 +406,18 @@ class RsyncSync(NewBase):
424 % (e,),
425 level=logging.ERROR, noiselevel=-1)
426 exitcode = 1
427 + verify_failure = True
428
429 if exitcode == 0 and not local_state_unchanged:
430 - exitcode = self._commit_download(download_dir)
431 + self.repo_storage.commit_update()
432 + self.repo_storage.garbage_collection()
433
434 return (exitcode, updatecache_flg)
435 finally:
436 - if exitcode == 0:
437 - self._remove_download(download_dir)
438 + # Don't delete the update if verification failed, in case
439 + # the cause needs to be investigated.
440 + if not verify_failure:
441 + self.repo_storage.abort_update()
442 if openpgp_env is not None:
443 openpgp_env.close()
444
445 @@ -594,7 +556,7 @@ class RsyncSync(NewBase):
446 return rsync_opts
447
448
449 - def _do_rsync(self, syncuri, timestamp, opts, download_dir):
450 + def _do_rsync(self, syncuri, timestamp, opts):
451 updatecache_flg = False
452 is_synced = False
453 if timestamp != 0 and "--quiet" not in opts:
454 @@ -720,11 +682,6 @@ class RsyncSync(NewBase):
455 # actual sync
456 command = rsynccommand[:]
457
458 - if self.repo.location != download_dir:
459 - # Use shared hardlinks for files that are identical
460 - # in the previous snapshot of the repository.
461 - command.append('--link-dest=%s' % self.repo.location)
462 -
463 submodule_paths = self._get_submodule_paths()
464 if submodule_paths:
465 # The only way to select multiple directories to
466 @@ -738,7 +695,7 @@ class RsyncSync(NewBase):
467 else:
468 command.append(syncuri + "/")
469
470 - command.append(download_dir)
471 + command.append(self.download_dir)
472
473 exitcode = None
474 try:
475
476 diff --git a/lib/portage/sync/syncbase.py b/lib/portage/sync/syncbase.py
477 index ce69a4fc0..e9b6ede4e 100644
478 --- a/lib/portage/sync/syncbase.py
479 +++ b/lib/portage/sync/syncbase.py
480 @@ -12,9 +12,11 @@ import logging
481 import os
482
483 import portage
484 +from portage.repository.storage.interface import RepoStorageException
485 from portage.util import writemsg_level
486 from portage.util._eventloop.global_event_loop import global_event_loop
487 from portage.util.backoff import RandomExponentialBackoff
488 +from portage.util.futures._sync_decorator import _sync_methods
489 from portage.util.futures.retry import retry
490 from portage.util.futures.executor.fork import ForkExecutor
491 from . import _SUBMODULE_PATH_MAP
492 @@ -40,6 +42,8 @@ class SyncBase(object):
493 self.repo = None
494 self.xterm_titles = None
495 self.spawn_kwargs = None
496 + self._repo_storage = None
497 + self._download_dir = None
498 self.bin_command = None
499 self._bin_command = bin_command
500 self.bin_pkg = bin_pkg
501 @@ -49,7 +53,8 @@ class SyncBase(object):
502
503 @property
504 def has_bin(self):
505 - '''Checks for existance of the external binary.
506 + '''Checks for existance of the external binary, and also
507 + checks for storage driver configuration problems.
508
509 MUST only be called after _kwargs() has set the logger
510 '''
511 @@ -61,8 +66,15 @@ class SyncBase(object):
512 writemsg_level("!!! %s\n" % l,
513 level=logging.ERROR, noiselevel=-1)
514 return False
515 - return True
516
517 + try:
518 + self.repo_storage
519 + except RepoStorageException as e:
520 + writemsg_level("!!! %s\n" % (e,),
521 + level=logging.ERROR, noiselevel=-1)
522 + return False
523 +
524 + return True
525
526 def _kwargs(self, kwargs):
527 '''Sets internal variables from kwargs'''
528 @@ -73,6 +85,43 @@ class SyncBase(object):
529 self.xterm_titles = self.options.get('xterm_titles', False)
530 self.spawn_kwargs = self.options.get('spawn_kwargs', None)
531
532 + def _select_storage_module(self):
533 + '''
534 + Select an appropriate implementation of RepoStorageInterface, based
535 + on repos.conf settings (currently only sync-allow-hardlinks).
536 +
537 + @rtype: str
538 + @return: fully qualified name of the selected repo storage class, as passed to portage.load_mod
539 + '''
540 + if self.repo.sync_allow_hardlinks:
541 + mod_name = 'portage.repository.storage.hardlink_quarantine.HardlinkQuarantineRepoStorage'
542 + else:
543 + mod_name = 'portage.repository.storage.inplace.InplaceRepoStorage'
544 + return mod_name
545 +
546 + @property
547 + def repo_storage(self):
548 + """
549 + Get the repo storage driver instance. Raise RepoStorageException
550 + if there is a configuration problem. The instance is created on first access and cached.
551 + """
552 + if self._repo_storage is None:
553 + storage_cls = portage.load_mod(self._select_storage_module())
554 + self._repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs))  # wrap coroutine methods so callers can invoke them synchronously
555 + return self._repo_storage
556 +
557 + @property
558 + def download_dir(self):
559 + """
560 + Get the path of the download directory, where the repository
561 + update is staged. The directory is initialized lazily, since
562 + the repository might already be at the latest revision, and
563 + there may be some cost associated with the directory
564 + initialization (repo_storage.init_update() is called on first access).
565 + """
566 + if self._download_dir is None:
567 + self._download_dir = self.repo_storage.init_update()
568 + return self._download_dir  # NOTE(review): cached path is not reset by commit/abort in the code visible here
569
570 def exists(self, **kwargs):
571 '''Tests whether the repo actually exists'''