Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH 3/4] rsync: split out repo storage framework
Date: Mon, 06 Aug 2018 07:44:08
Message-Id: 20180806074033.30318-4-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH 0/4] Add sync-rcu support for rsync (bug 662070) by Zac Medico
1 Since there are many ways to manage repository storage, split out a repo
2 storage framework. The HardlinkQuarantineRepoStorage class implements
3 the existing default behavior, and the InplaceRepoStorage class
4 implements the legacy behavior (when sync-allow-hardlinks is disabled in
5 repos.conf).
6
7 Each class implements RepoStorageInterface, which uses coroutine methods
8 since coroutines are well-suited to the I/O bound tasks that these
9 methods perform. The _sync_decorator is used to convert coroutine
10 methods to synchronous methods, for smooth integration into the
11 surrounding synchronous code.
12
13 Bug: https://bugs.gentoo.org/662070
14 ---
15 lib/portage/repository/storage/__init__.py | 0
16 .../repository/storage/hardlink_quarantine.py | 95 ++++++++++++++++++++++
17 lib/portage/repository/storage/inplace.py | 49 +++++++++++
18 lib/portage/repository/storage/interface.py | 87 ++++++++++++++++++++
19 lib/portage/sync/controller.py | 1 +
20 lib/portage/sync/modules/rsync/rsync.py | 85 +++++--------------
21 lib/portage/sync/syncbase.py | 31 +++++++
22 7 files changed, 284 insertions(+), 64 deletions(-)
23 create mode 100644 lib/portage/repository/storage/__init__.py
24 create mode 100644 lib/portage/repository/storage/hardlink_quarantine.py
25 create mode 100644 lib/portage/repository/storage/inplace.py
26 create mode 100644 lib/portage/repository/storage/interface.py
27
28 diff --git a/lib/portage/repository/storage/__init__.py b/lib/portage/repository/storage/__init__.py
29 new file mode 100644
30 index 000000000..e69de29bb
31 diff --git a/lib/portage/repository/storage/hardlink_quarantine.py b/lib/portage/repository/storage/hardlink_quarantine.py
32 new file mode 100644
33 index 000000000..7e9cf4493
34 --- /dev/null
35 +++ b/lib/portage/repository/storage/hardlink_quarantine.py
36 @@ -0,0 +1,95 @@
37 +# Copyright 2018 Gentoo Foundation
38 +# Distributed under the terms of the GNU General Public License v2
39 +
40 +from portage import os
41 +from portage.repository.storage.interface import (
42 + RepoStorageException,
43 + RepoStorageInterface,
44 +)
45 +from portage.util.futures import asyncio
46 +from portage.util.futures.compat_coroutine import (
47 + coroutine,
48 + coroutine_return,
49 +)
50 +
51 +from _emerge.SpawnProcess import SpawnProcess
52 +
53 +
54 +class HardlinkQuarantineRepoStorage(RepoStorageInterface):
55 + """
56 + This is the default storage module, since it's quite compatible with
57 + most configurations.
58 +
59 + It's desirable to be able to create shared hardlinks between the
60 + download directory and the normal repository, and this is facilitated
61 + by making the download directory be a subdirectory of the normal
62 + repository location (ensuring that no mountpoints are crossed).
63 + Shared hardlinks are created by using the rsync --link-dest option.
64 +
65 + Since the download is initially unverified, it is safest to save
66 + it in a quarantine directory. The quarantine directory is also
67 + useful for making the repository update more atomic, so that it is
68 + less likely that the normal repository location will be observed in
69 + a partially synced state.
70 + """
71 + def __init__(self, repo, spawn_kwargs):
72 + self._user_location = repo.location
73 + self._update_location = None
74 + self._spawn_kwargs = spawn_kwargs
75 + self._current_update = None
76 +
77 + @coroutine
78 + def _check_call(self, cmd):
79 + """
80 + Run cmd and raise RepoStorageException on failure.
81 +
82 + @param cmd: command to execute
83 + @type cmd: list
84 + """
85 + p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), **self._spawn_kwargs)
86 + p.start()
87 + if (yield p.async_wait()) != os.EX_OK:
88 + raise RepoStorageException('command exited with status {}: {}'.\
89 + format(p.returncode, ' '.join(cmd)))
90 +
91 + @coroutine
92 + def init_update(self):
93 + update_location = os.path.join(self._user_location, '.tmp-unverified-download-quarantine')
94 + yield self._check_call(['rm', '-rf', update_location])
95 +
96 + # Use rsync --link-dest to hardlink files into self._update_location,
97 + # since cp -l is not portable.
98 + yield self._check_call(['rsync', '-a', '--link-dest', self._user_location,
99 + '--exclude', '/{}'.format(os.path.basename(update_location)),
100 + self._user_location + '/', update_location + '/'])
101 +
102 + self._update_location = update_location
103 +
104 + coroutine_return(self._update_location)
105 +
106 + @property
107 + def current_update(self):
108 + if self._update_location is None:
109 + raise RepoStorageException('current update does not exist')
110 + return self._update_location
111 +
112 + @coroutine
113 + def commit_update(self):
114 + update_location = self.current_update
115 + self._update_location = None
116 + yield self._check_call(['rsync', '-a', '--delete',
117 + '--exclude', '/{}'.format(os.path.basename(update_location)),
118 + update_location + '/', self._user_location + '/'])
119 +
120 + yield self._check_call(['rm', '-rf', update_location])
121 +
122 + @coroutine
123 + def abort_update(self):
124 + if self._update_location is not None:
125 + update_location = self._update_location
126 + self._update_location = None
127 + yield self._check_call(['rm', '-rf', update_location])
128 +
129 + @coroutine
130 + def garbage_collection(self):
131 + yield self.abort_update()
132 diff --git a/lib/portage/repository/storage/inplace.py b/lib/portage/repository/storage/inplace.py
133 new file mode 100644
134 index 000000000..f1117ad03
135 --- /dev/null
136 +++ b/lib/portage/repository/storage/inplace.py
137 @@ -0,0 +1,49 @@
138 +# Copyright 2018 Gentoo Foundation
139 +# Distributed under the terms of the GNU General Public License v2
140 +
141 +from portage.repository.storage.interface import (
142 + RepoStorageException,
143 + RepoStorageInterface,
144 +)
145 +from portage.util.futures.compat_coroutine import coroutine, coroutine_return
146 +
147 +
148 +class InplaceRepoStorage(RepoStorageInterface):
149 + """
150 + Legacy repo storage behavior, where updates are applied in-place.
151 + This module is not recommended, since the repository is left in an
152 + unspecified (possibly malicious) state if the update fails.
153 + """
154 + def __init__(self, repo, spawn_kwargs):
155 + self._user_location = repo.location
156 + self._update_location = None
157 +
158 + @coroutine
159 + def init_update(self):
160 + self._update_location = self._user_location
161 + coroutine_return(self._update_location)
162 + yield None
163 +
164 + @property
165 + def current_update(self):
166 + if self._update_location is None:
167 + raise RepoStorageException('current update does not exist')
168 + return self._update_location
169 +
170 + @coroutine
171 + def commit_update(self):
172 + self.current_update
173 + self._update_location = None
174 + coroutine_return()
175 + yield None
176 +
177 + @coroutine
178 + def abort_update(self):
179 + self._update_location = None
180 + coroutine_return()
181 + yield None
182 +
183 + @coroutine
184 + def garbage_collection(self):
185 + coroutine_return()
186 + yield None
187 diff --git a/lib/portage/repository/storage/interface.py b/lib/portage/repository/storage/interface.py
188 new file mode 100644
189 index 000000000..f83c42b84
190 --- /dev/null
191 +++ b/lib/portage/repository/storage/interface.py
192 @@ -0,0 +1,87 @@
193 +# Copyright 2018 Gentoo Foundation
194 +# Distributed under the terms of the GNU General Public License v2
195 +
196 +from portage.exception import PortageException
197 +from portage.util.futures.compat_coroutine import coroutine
198 +
199 +
200 +class RepoStorageException(PortageException):
201 + """
202 + Base class for exceptions raised by RepoStorageInterface.
203 + """
204 +
205 +
206 +class RepoStorageInterface(object):
207 + """
208 + Abstract repository storage interface.
209 +
210 + Implementations can assume that the repo.location directory already
211 + exists with appropriate permissions (SyncManager handles this).
212 +
213 + TODO: Add a method to check for a previous uncommitted update, which
214 + typically indicates a verification failure:
215 + https://bugs.gentoo.org/662386
216 + """
217 + def __init__(self, repo, spawn_kwargs):
218 + """
219 + @param repo: repository configuration
220 + @type repo: portage.repository.config.RepoConfig
221 + @param spawn_kwargs: keyword arguments supported by the
222 + portage.process.spawn function
223 + @type spawn_kwargs: dict
224 + """
225 + raise NotImplementedError
226 +
227 + @coroutine
228 + def init_update(self):
229 + """
230 + Create an update directory as a destination to sync updates to.
231 + The directory will be populated with files from the previous
232 + immutable snapshot, if available. Note that this directory
233 + may contain hardlinks that reference files in the previous
234 + immutable snapshot, so these files should not be modified
235 + (tools like rsync and git normally break hardlinks when
236 + files need to be modified).
237 +
238 + @rtype: str
239 + @return: path of directory to update, populated with files from
240 + the previous snapshot if available
241 + """
242 + raise NotImplementedError
243 +
244 + @property
245 + def current_update(self):
246 + """
247 + Get the current update directory which would have been returned
248 + from the most recent call to the init_update method. This raises
249 + RepoStorageException if the init_update method has not been
250 + called.
251 +
252 + @rtype: str
253 + @return: path of directory to update
254 + """
255 + raise NotImplementedError
256 +
257 + @coroutine
258 + def commit_update(self):
259 + """
260 + Commit the current update directory, so that it becomes the
261 + latest immutable snapshot.
262 + """
263 + raise NotImplementedError
264 +
265 + @coroutine
266 + def abort_update(self):
267 + """
268 + Delete the current update directory. If there was not an update
269 + in progress, or it has already been committed, then this has
270 + no effect.
271 + """
272 + raise NotImplementedError
273 +
274 + @coroutine
275 + def garbage_collection(self):
276 + """
277 + Remove expired snapshots.
278 + """
279 + raise NotImplementedError
280 diff --git a/lib/portage/sync/controller.py b/lib/portage/sync/controller.py
281 index 3bccf6f74..bf5750f7f 100644
282 --- a/lib/portage/sync/controller.py
283 +++ b/lib/portage/sync/controller.py
284 @@ -327,6 +327,7 @@ class SyncManager(object):
285 # override the defaults when sync_umask is set
286 if repo.sync_umask is not None:
287 spawn_kwargs["umask"] = int(repo.sync_umask, 8)
288 + spawn_kwargs.setdefault("umask", 0o022)
289 self.spawn_kwargs = spawn_kwargs
290
291 if self.usersync_uid is not None:
292 diff --git a/lib/portage/sync/modules/rsync/rsync.py b/lib/portage/sync/modules/rsync/rsync.py
293 index 56e38631e..17b1b9e7b 100644
294 --- a/lib/portage/sync/modules/rsync/rsync.py
295 +++ b/lib/portage/sync/modules/rsync/rsync.py
296 @@ -59,55 +59,6 @@ class RsyncSync(NewBase):
297 def __init__(self):
298 NewBase.__init__(self, "rsync", RSYNC_PACKAGE_ATOM)
299
300 - def _select_download_dir(self):
301 - '''
302 - Select and return the download directory. It's desirable to be able
303 - to create shared hardlinks between the download directory to the
304 - normal repository, and this is facilitated by making the download
305 - directory be a subdirectory of the normal repository location
306 - (ensuring that no mountpoints are crossed). Shared hardlinks are
307 - created by using the rsync --link-dest option.
308 -
309 - Since the download is initially unverified, it is safest to save
310 - it in a quarantine directory. The quarantine directory is also
311 - useful for making the repository update more atomic, so that it
312 - less likely that normal repository location will be observed in
313 - a partially synced state.
314 -
315 - This method returns a quarantine directory if sync-allow-hardlinks
316 - is enabled in repos.conf, and otherwise it returne the normal
317 - repository location.
318 - '''
319 - if self.repo.sync_allow_hardlinks:
320 - return os.path.join(self.repo.location, '.tmp-unverified-download-quarantine')
321 - else:
322 - return self.repo.location
323 -
324 - def _commit_download(self, download_dir):
325 - '''
326 - Commit changes from download_dir if it does not refer to the
327 - normal repository location.
328 - '''
329 - exitcode = 0
330 - if self.repo.location != download_dir:
331 - rsynccommand = [self.bin_command] + self.rsync_opts + self.extra_rsync_opts
332 - rsynccommand.append('--exclude=/%s' % os.path.basename(download_dir))
333 - rsynccommand.append('%s/' % download_dir.rstrip('/'))
334 - rsynccommand.append('%s/' % self.repo.location)
335 - exitcode = portage.process.spawn(rsynccommand, **self.spawn_kwargs)
336 -
337 - return exitcode
338 -
339 - def _remove_download(self, download_dir):
340 - """
341 - Remove download_dir if it does not refer to the normal repository
342 - location.
343 - """
344 - exitcode = 0
345 - if self.repo.location != download_dir:
346 - exitcode = subprocess.call(['rm', '-rf', download_dir])
347 - return exitcode
348 -
349 def update(self):
350 '''Internal update function which performs the transfer'''
351 opts = self.options.get('emerge_config').opts
352 @@ -143,8 +94,8 @@ class RsyncSync(NewBase):
353 self.extra_rsync_opts.extend(portage.util.shlex_split(
354 self.repo.module_specific_options['sync-rsync-extra-opts']))
355
356 - download_dir = self._select_download_dir()
357 exitcode = 0
358 + verify_failure = False
359
360 # Process GLEP74 verification options.
361 # Default verification to 'no'; it's enabled for ::gentoo
362 @@ -240,10 +191,14 @@ class RsyncSync(NewBase):
363 self.proto = "file"
364 dosyncuri = syncuri[7:]
365 unchanged, is_synced, exitcode, updatecache_flg = self._do_rsync(
366 - dosyncuri, timestamp, opts, download_dir)
367 + dosyncuri, timestamp, opts)
368 self._process_exitcode(exitcode, dosyncuri, out, 1)
369 - if exitcode == 0 and not unchanged:
370 - self._commit_download(download_dir)
371 + if exitcode == 0:
372 + if unchanged:
373 + self.repo_storage.abort_update()
374 + else:
375 + self.repo_storage.commit_update()
376 + self.repo_storage.garbage_collection()
377 return (exitcode, updatecache_flg)
378
379 retries=0
380 @@ -375,7 +330,7 @@ class RsyncSync(NewBase):
381 dosyncuri = dosyncuri[6:].replace('/', ':/', 1)
382
383 unchanged, is_synced, exitcode, updatecache_flg = self._do_rsync(
384 - dosyncuri, timestamp, opts, download_dir)
385 + dosyncuri, timestamp, opts)
386 if not unchanged:
387 local_state_unchanged = False
388 if is_synced:
389 @@ -390,6 +345,7 @@ class RsyncSync(NewBase):
390 # exit loop
391 exitcode = EXCEEDED_MAX_RETRIES
392 break
393 +
394 self._process_exitcode(exitcode, dosyncuri, out, maxretries)
395
396 if local_state_unchanged:
397 @@ -397,6 +353,8 @@ class RsyncSync(NewBase):
398 # in this case, so refer gemato to the normal repository
399 # location.
400 download_dir = self.repo.location
401 + else:
402 + download_dir = self.download_dir
403
404 # if synced successfully, verify now
405 if exitcode == 0 and self.verify_metamanifest:
406 @@ -448,14 +406,18 @@ class RsyncSync(NewBase):
407 % (e,),
408 level=logging.ERROR, noiselevel=-1)
409 exitcode = 1
410 + verify_failure = True
411
412 if exitcode == 0 and not local_state_unchanged:
413 - exitcode = self._commit_download(download_dir)
414 + self.repo_storage.commit_update()
415 + self.repo_storage.garbage_collection()
416
417 return (exitcode, updatecache_flg)
418 finally:
419 - if exitcode == 0:
420 - self._remove_download(download_dir)
421 + # Don't delete the update if verification failed, in case
422 + # the cause needs to be investigated.
423 + if not verify_failure:
424 + self.repo_storage.abort_update()
425 if openpgp_env is not None:
426 openpgp_env.close()
427
428 @@ -594,7 +556,7 @@ class RsyncSync(NewBase):
429 return rsync_opts
430
431
432 - def _do_rsync(self, syncuri, timestamp, opts, download_dir):
433 + def _do_rsync(self, syncuri, timestamp, opts):
434 updatecache_flg = False
435 is_synced = False
436 if timestamp != 0 and "--quiet" not in opts:
437 @@ -720,11 +682,6 @@ class RsyncSync(NewBase):
438 # actual sync
439 command = rsynccommand[:]
440
441 - if self.repo.location != download_dir:
442 - # Use shared hardlinks for files that are identical
443 - # in the previous snapshot of the repository.
444 - command.append('--link-dest=%s' % self.repo.location)
445 -
446 submodule_paths = self._get_submodule_paths()
447 if submodule_paths:
448 # The only way to select multiple directories to
449 @@ -738,7 +695,7 @@ class RsyncSync(NewBase):
450 else:
451 command.append(syncuri + "/")
452
453 - command.append(download_dir)
454 + command.append(self.download_dir)
455
456 exitcode = None
457 try:
458 diff --git a/lib/portage/sync/syncbase.py b/lib/portage/sync/syncbase.py
459 index ce69a4fc0..1d2a00b7c 100644
460 --- a/lib/portage/sync/syncbase.py
461 +++ b/lib/portage/sync/syncbase.py
462 @@ -15,6 +15,7 @@ import portage
463 from portage.util import writemsg_level
464 from portage.util._eventloop.global_event_loop import global_event_loop
465 from portage.util.backoff import RandomExponentialBackoff
466 +from portage.util.futures._sync_decorator import _sync_methods
467 from portage.util.futures.retry import retry
468 from portage.util.futures.executor.fork import ForkExecutor
469 from . import _SUBMODULE_PATH_MAP
470 @@ -40,6 +41,8 @@ class SyncBase(object):
471 self.repo = None
472 self.xterm_titles = None
473 self.spawn_kwargs = None
474 + self.repo_storage = None
475 + self._download_dir = None
476 self.bin_command = None
477 self._bin_command = bin_command
478 self.bin_pkg = bin_pkg
479 @@ -72,7 +75,35 @@ class SyncBase(object):
480 self.repo = self.options.get('repo', None)
481 self.xterm_titles = self.options.get('xterm_titles', False)
482 self.spawn_kwargs = self.options.get('spawn_kwargs', None)
483 + storage_cls = portage.load_mod(self._select_storage_module())
484 + self.repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs))
485
486 + def _select_storage_module(self):
487 + '''
488 + Select an appropriate implementation of RepoStorageInterface, based
489 + on repos.conf settings.
490 +
491 + @rtype: str
492 + @return: name of the selected repo storage constructor
493 + '''
494 + if self.repo.sync_allow_hardlinks:
495 + mod_name = 'portage.repository.storage.hardlink_quarantine.HardlinkQuarantineRepoStorage'
496 + else:
497 + mod_name = 'portage.repository.storage.inplace.InplaceRepoStorage'
498 + return mod_name
499 +
500 + @property
501 + def download_dir(self):
502 + """
503 + Get the path of the download directory, where the repository
504 + update is staged. The directory is initialized lazily, since
505 + the repository might already be at the latest revision, and
506 + there may be some cost associated with the directory
507 + initialization.
508 + """
509 + if self._download_dir is None:
510 + self._download_dir = self.repo_storage.init_update()
511 + return self._download_dir
512
513 def exists(self, **kwargs):
514 '''Tests whether the repo actually exists'''
515 --
516 2.16.4

Replies