1 |
Since there are many ways to manage repository storage, split out a repo |
2 |
storage framework. The HardlinkQuarantineRepoStorage class implements |
3 |
the existing default behavior, and the InplaceRepoStorage class |
4 |
implements the legacy behavior (when sync-allow-hardlinks is disabled in |
5 |
repos.conf). |
6 |
|
7 |
Each class implements RepoStorageInterface, which uses coroutine methods |
8 |
since coroutines are well-suited to the I/O bound tasks that these |
9 |
methods perform. The _sync_decorator is used to convert coroutine |
10 |
methods to synchronous methods, for smooth integration into the |
11 |
surrounding synchronous code. |
12 |
|
13 |
Bug: https://bugs.gentoo.org/662070 |
14 |
--- |
15 |
lib/portage/repository/storage/__init__.py | 0 |
16 |
.../repository/storage/hardlink_quarantine.py | 95 ++++++++++++++++++++++ |
17 |
lib/portage/repository/storage/inplace.py | 49 +++++++++++ |
18 |
lib/portage/repository/storage/interface.py | 87 ++++++++++++++++++++ |
19 |
lib/portage/sync/controller.py | 1 + |
20 |
lib/portage/sync/modules/rsync/rsync.py | 85 +++++-------------- |
21 |
lib/portage/sync/syncbase.py | 31 +++++++ |
22 |
7 files changed, 284 insertions(+), 64 deletions(-) |
23 |
create mode 100644 lib/portage/repository/storage/__init__.py |
24 |
create mode 100644 lib/portage/repository/storage/hardlink_quarantine.py |
25 |
create mode 100644 lib/portage/repository/storage/inplace.py |
26 |
create mode 100644 lib/portage/repository/storage/interface.py |
27 |
|
28 |
diff --git a/lib/portage/repository/storage/__init__.py b/lib/portage/repository/storage/__init__.py |
29 |
new file mode 100644 |
30 |
index 000000000..e69de29bb |
31 |
diff --git a/lib/portage/repository/storage/hardlink_quarantine.py b/lib/portage/repository/storage/hardlink_quarantine.py |
32 |
new file mode 100644 |
33 |
index 000000000..7e9cf4493 |
34 |
--- /dev/null |
35 |
+++ b/lib/portage/repository/storage/hardlink_quarantine.py |
36 |
@@ -0,0 +1,95 @@ |
37 |
+# Copyright 2018 Gentoo Foundation |
38 |
+# Distributed under the terms of the GNU General Public License v2 |
39 |
+ |
40 |
+from portage import os |
41 |
+from portage.repository.storage.interface import ( |
42 |
+ RepoStorageException, |
43 |
+ RepoStorageInterface, |
44 |
+) |
45 |
+from portage.util.futures import asyncio |
46 |
+from portage.util.futures.compat_coroutine import ( |
47 |
+ coroutine, |
48 |
+ coroutine_return, |
49 |
+) |
50 |
+ |
51 |
+from _emerge.SpawnProcess import SpawnProcess |
52 |
+ |
53 |
+ |
54 |
+class HardlinkQuarantineRepoStorage(RepoStorageInterface): |
55 |
+ """ |
56 |
+ This is the default storage module, since it's quite compatible with |
57 |
+ most configurations. |
58 |
+ |
59 |
+ It's desirable to be able to create shared hardlinks between the |
60 |
+ download directory and the normal repository, and this is facilitated |
61 |
+ by making the download directory be a subdirectory of the normal |
62 |
+ repository location (ensuring that no mountpoints are crossed). |
63 |
+ Shared hardlinks are created by using the rsync --link-dest option. |
64 |
+ |
65 |
+ Since the download is initially unverified, it is safest to save |
66 |
+ it in a quarantine directory. The quarantine directory is also |
67 |
+ useful for making the repository update more atomic, so that it |
68 |
+ is less likely that the normal repository location will be observed in |
69 |
+ a partially synced state. |
70 |
+ """ |
71 |
+ def __init__(self, repo, spawn_kwargs): |
72 |
+ self._user_location = repo.location |
73 |
+ self._update_location = None |
74 |
+ self._spawn_kwargs = spawn_kwargs |
75 |
+ self._current_update = None |
76 |
+ |
77 |
+ @coroutine |
78 |
+ def _check_call(self, cmd): |
79 |
+ """ |
80 |
+ Run cmd and raise RepoStorageException on failure. |
81 |
+ |
82 |
+ @param cmd: command to execute |
83 |
+ @type cmd: list |
84 |
+ """ |
85 |
+ p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), **self._spawn_kwargs) |
86 |
+ p.start() |
87 |
+ if (yield p.async_wait()) != os.EX_OK: |
88 |
+ raise RepoStorageException('command exited with status {}: {}'.\ |
89 |
+ format(p.returncode, ' '.join(cmd))) |
90 |
+ |
91 |
+ @coroutine |
92 |
+ def init_update(self): |
93 |
+ update_location = os.path.join(self._user_location, '.tmp-unverified-download-quarantine') |
94 |
+ yield self._check_call(['rm', '-rf', update_location]) |
95 |
+ |
96 |
+ # Use rsync --link-dest to hardlink files into self._update_location, |
97 |
+ # since cp -l is not portable. |
98 |
+ yield self._check_call(['rsync', '-a', '--link-dest', self._user_location, |
99 |
+ '--exclude', '/{}'.format(os.path.basename(update_location)), |
100 |
+ self._user_location + '/', update_location + '/']) |
101 |
+ |
102 |
+ self._update_location = update_location |
103 |
+ |
104 |
+ coroutine_return(self._update_location) |
105 |
+ |
106 |
+ @property |
107 |
+ def current_update(self): |
108 |
+ if self._update_location is None: |
109 |
+ raise RepoStorageException('current update does not exist') |
110 |
+ return self._update_location |
111 |
+ |
112 |
+ @coroutine |
113 |
+ def commit_update(self): |
114 |
+ update_location = self.current_update |
115 |
+ self._update_location = None |
116 |
+ yield self._check_call(['rsync', '-a', '--delete', |
117 |
+ '--exclude', '/{}'.format(os.path.basename(update_location)), |
118 |
+ update_location + '/', self._user_location + '/']) |
119 |
+ |
120 |
+ yield self._check_call(['rm', '-rf', update_location]) |
121 |
+ |
122 |
+ @coroutine |
123 |
+ def abort_update(self): |
124 |
+ if self._update_location is not None: |
125 |
+ update_location = self._update_location |
126 |
+ self._update_location = None |
127 |
+ yield self._check_call(['rm', '-rf', update_location]) |
128 |
+ |
129 |
+ @coroutine |
130 |
+ def garbage_collection(self): |
131 |
+ yield self.abort_update() |
132 |
diff --git a/lib/portage/repository/storage/inplace.py b/lib/portage/repository/storage/inplace.py |
133 |
new file mode 100644 |
134 |
index 000000000..f1117ad03 |
135 |
--- /dev/null |
136 |
+++ b/lib/portage/repository/storage/inplace.py |
137 |
@@ -0,0 +1,49 @@ |
138 |
+# Copyright 2018 Gentoo Foundation |
139 |
+# Distributed under the terms of the GNU General Public License v2 |
140 |
+ |
141 |
+from portage.repository.storage.interface import ( |
142 |
+ RepoStorageException, |
143 |
+ RepoStorageInterface, |
144 |
+) |
145 |
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return |
146 |
+ |
147 |
+ |
148 |
+class InplaceRepoStorage(RepoStorageInterface): |
149 |
+ """ |
150 |
+ Legacy repo storage behavior, where updates are applied in-place. |
151 |
+ This module is not recommended, since the repository is left in an |
152 |
+ unspecified (possibly malicious) state if the update fails. |
153 |
+ """ |
154 |
+ def __init__(self, repo, spawn_kwargs): |
155 |
+ self._user_location = repo.location |
156 |
+ self._update_location = None |
157 |
+ |
158 |
+ @coroutine |
159 |
+ def init_update(self): |
160 |
+ self._update_location = self._user_location |
161 |
+ coroutine_return(self._update_location) |
162 |
+ yield None |
163 |
+ |
164 |
+ @property |
165 |
+ def current_update(self): |
166 |
+ if self._update_location is None: |
167 |
+ raise RepoStorageException('current update does not exist') |
168 |
+ return self._update_location |
169 |
+ |
170 |
+ @coroutine |
171 |
+ def commit_update(self): |
172 |
+ self.current_update |
173 |
+ self._update_location = None |
174 |
+ coroutine_return() |
175 |
+ yield None |
176 |
+ |
177 |
+ @coroutine |
178 |
+ def abort_update(self): |
179 |
+ self._update_location = None |
180 |
+ coroutine_return() |
181 |
+ yield None |
182 |
+ |
183 |
+ @coroutine |
184 |
+ def garbage_collection(self): |
185 |
+ coroutine_return() |
186 |
+ yield None |
187 |
diff --git a/lib/portage/repository/storage/interface.py b/lib/portage/repository/storage/interface.py |
188 |
new file mode 100644 |
189 |
index 000000000..f83c42b84 |
190 |
--- /dev/null |
191 |
+++ b/lib/portage/repository/storage/interface.py |
192 |
@@ -0,0 +1,87 @@ |
193 |
+# Copyright 2018 Gentoo Foundation |
194 |
+# Distributed under the terms of the GNU General Public License v2 |
195 |
+ |
196 |
+from portage.exception import PortageException |
197 |
+from portage.util.futures.compat_coroutine import coroutine |
198 |
+ |
199 |
+ |
200 |
+class RepoStorageException(PortageException): |
201 |
+ """ |
202 |
+ Base class for exceptions raised by RepoStorageInterface. |
203 |
+ """ |
204 |
+ |
205 |
+ |
206 |
+class RepoStorageInterface(object): |
207 |
+ """ |
208 |
+ Abstract repository storage interface. |
209 |
+ |
210 |
+ Implementations can assume that the repo.location directory already |
211 |
+ exists with appropriate permissions (SyncManager handles this). |
212 |
+ |
213 |
+ TODO: Add a method to check for a previous uncommitted update, which |
214 |
+ typically indicates a verification failure: |
215 |
+ https://bugs.gentoo.org/662386 |
216 |
+ """ |
217 |
+ def __init__(self, repo, spawn_kwargs): |
218 |
+ """ |
219 |
+ @param repo: repository configuration |
220 |
+ @type repo: portage.repository.config.RepoConfig |
221 |
+ @param spawn_kwargs: keyword arguments supported by the |
222 |
+ portage.process.spawn function |
223 |
+ @type spawn_kwargs: dict |
224 |
+ """ |
225 |
+ raise NotImplementedError |
226 |
+ |
227 |
+ @coroutine |
228 |
+ def init_update(self): |
229 |
+ """ |
230 |
+ Create an update directory as a destination to sync updates to. |
231 |
+ The directory will be populated with files from the previous |
232 |
+ immutable snapshot, if available. Note that this directory |
233 |
+ may contain hardlinks that reference files in the previous |
234 |
+ immutable snapshot, so these files should not be modified |
235 |
+ (tools like rsync and git normally break hardlinks when |
236 |
+ files need to be modified). |
237 |
+ |
238 |
+ @rtype: str |
239 |
+ @return: path of directory to update, populated with files from |
240 |
+ the previous snapshot if available |
241 |
+ """ |
242 |
+ raise NotImplementedError |
243 |
+ |
244 |
+ @property |
245 |
+ def current_update(self): |
246 |
+ """ |
247 |
+ Get the current update directory which would have been returned |
248 |
+ from the most recent call to the init_update method. This raises |
249 |
+ RepoStorageException if the init_update method has not been |
250 |
+ called. |
251 |
+ |
252 |
+ @rtype: str |
253 |
+ @return: path of directory to update |
254 |
+ """ |
255 |
+ raise NotImplementedError |
256 |
+ |
257 |
+ @coroutine |
258 |
+ def commit_update(self): |
259 |
+ """ |
260 |
+ Commit the current update directory, so that it becomes the |
261 |
+ latest immutable snapshot. |
262 |
+ """ |
263 |
+ raise NotImplementedError |
264 |
+ |
265 |
+ @coroutine |
266 |
+ def abort_update(self): |
267 |
+ """ |
268 |
+ Delete the current update directory. If there was not an update |
269 |
+ in progress, or it has already been committed, then this has |
270 |
+ no effect. |
271 |
+ """ |
272 |
+ raise NotImplementedError |
273 |
+ |
274 |
+ @coroutine |
275 |
+ def garbage_collection(self): |
276 |
+ """ |
277 |
+ Remove expired snapshots. |
278 |
+ """ |
279 |
+ raise NotImplementedError |
280 |
diff --git a/lib/portage/sync/controller.py b/lib/portage/sync/controller.py |
281 |
index 3bccf6f74..bf5750f7f 100644 |
282 |
--- a/lib/portage/sync/controller.py |
283 |
+++ b/lib/portage/sync/controller.py |
284 |
@@ -327,6 +327,7 @@ class SyncManager(object): |
285 |
# override the defaults when sync_umask is set |
286 |
if repo.sync_umask is not None: |
287 |
spawn_kwargs["umask"] = int(repo.sync_umask, 8) |
288 |
+ spawn_kwargs.setdefault("umask", 0o022) |
289 |
self.spawn_kwargs = spawn_kwargs |
290 |
|
291 |
if self.usersync_uid is not None: |
292 |
diff --git a/lib/portage/sync/modules/rsync/rsync.py b/lib/portage/sync/modules/rsync/rsync.py |
293 |
index 56e38631e..17b1b9e7b 100644 |
294 |
--- a/lib/portage/sync/modules/rsync/rsync.py |
295 |
+++ b/lib/portage/sync/modules/rsync/rsync.py |
296 |
@@ -59,55 +59,6 @@ class RsyncSync(NewBase): |
297 |
def __init__(self): |
298 |
NewBase.__init__(self, "rsync", RSYNC_PACKAGE_ATOM) |
299 |
|
300 |
- def _select_download_dir(self): |
301 |
- ''' |
302 |
- Select and return the download directory. It's desirable to be able |
303 |
- to create shared hardlinks between the download directory to the |
304 |
- normal repository, and this is facilitated by making the download |
305 |
- directory be a subdirectory of the normal repository location |
306 |
- (ensuring that no mountpoints are crossed). Shared hardlinks are |
307 |
- created by using the rsync --link-dest option. |
308 |
- |
309 |
- Since the download is initially unverified, it is safest to save |
310 |
- it in a quarantine directory. The quarantine directory is also |
311 |
- useful for making the repository update more atomic, so that it |
312 |
- less likely that normal repository location will be observed in |
313 |
- a partially synced state. |
314 |
- |
315 |
- This method returns a quarantine directory if sync-allow-hardlinks |
316 |
- is enabled in repos.conf, and otherwise it returne the normal |
317 |
- repository location. |
318 |
- ''' |
319 |
- if self.repo.sync_allow_hardlinks: |
320 |
- return os.path.join(self.repo.location, '.tmp-unverified-download-quarantine') |
321 |
- else: |
322 |
- return self.repo.location |
323 |
- |
324 |
- def _commit_download(self, download_dir): |
325 |
- ''' |
326 |
- Commit changes from download_dir if it does not refer to the |
327 |
- normal repository location. |
328 |
- ''' |
329 |
- exitcode = 0 |
330 |
- if self.repo.location != download_dir: |
331 |
- rsynccommand = [self.bin_command] + self.rsync_opts + self.extra_rsync_opts |
332 |
- rsynccommand.append('--exclude=/%s' % os.path.basename(download_dir)) |
333 |
- rsynccommand.append('%s/' % download_dir.rstrip('/')) |
334 |
- rsynccommand.append('%s/' % self.repo.location) |
335 |
- exitcode = portage.process.spawn(rsynccommand, **self.spawn_kwargs) |
336 |
- |
337 |
- return exitcode |
338 |
- |
339 |
- def _remove_download(self, download_dir): |
340 |
- """ |
341 |
- Remove download_dir if it does not refer to the normal repository |
342 |
- location. |
343 |
- """ |
344 |
- exitcode = 0 |
345 |
- if self.repo.location != download_dir: |
346 |
- exitcode = subprocess.call(['rm', '-rf', download_dir]) |
347 |
- return exitcode |
348 |
- |
349 |
def update(self): |
350 |
'''Internal update function which performs the transfer''' |
351 |
opts = self.options.get('emerge_config').opts |
352 |
@@ -143,8 +94,8 @@ class RsyncSync(NewBase): |
353 |
self.extra_rsync_opts.extend(portage.util.shlex_split( |
354 |
self.repo.module_specific_options['sync-rsync-extra-opts'])) |
355 |
|
356 |
- download_dir = self._select_download_dir() |
357 |
exitcode = 0 |
358 |
+ verify_failure = False |
359 |
|
360 |
# Process GLEP74 verification options. |
361 |
# Default verification to 'no'; it's enabled for ::gentoo |
362 |
@@ -240,10 +191,14 @@ class RsyncSync(NewBase): |
363 |
self.proto = "file" |
364 |
dosyncuri = syncuri[7:] |
365 |
unchanged, is_synced, exitcode, updatecache_flg = self._do_rsync( |
366 |
- dosyncuri, timestamp, opts, download_dir) |
367 |
+ dosyncuri, timestamp, opts) |
368 |
self._process_exitcode(exitcode, dosyncuri, out, 1) |
369 |
- if exitcode == 0 and not unchanged: |
370 |
- self._commit_download(download_dir) |
371 |
+ if exitcode == 0: |
372 |
+ if unchanged: |
373 |
+ self.repo_storage.abort_update() |
374 |
+ else: |
375 |
+ self.repo_storage.commit_update() |
376 |
+ self.repo_storage.garbage_collection() |
377 |
return (exitcode, updatecache_flg) |
378 |
|
379 |
retries=0 |
380 |
@@ -375,7 +330,7 @@ class RsyncSync(NewBase): |
381 |
dosyncuri = dosyncuri[6:].replace('/', ':/', 1) |
382 |
|
383 |
unchanged, is_synced, exitcode, updatecache_flg = self._do_rsync( |
384 |
- dosyncuri, timestamp, opts, download_dir) |
385 |
+ dosyncuri, timestamp, opts) |
386 |
if not unchanged: |
387 |
local_state_unchanged = False |
388 |
if is_synced: |
389 |
@@ -390,6 +345,7 @@ class RsyncSync(NewBase): |
390 |
# exit loop |
391 |
exitcode = EXCEEDED_MAX_RETRIES |
392 |
break |
393 |
+ |
394 |
self._process_exitcode(exitcode, dosyncuri, out, maxretries) |
395 |
|
396 |
if local_state_unchanged: |
397 |
@@ -397,6 +353,8 @@ class RsyncSync(NewBase): |
398 |
# in this case, so refer gemato to the normal repository |
399 |
# location. |
400 |
download_dir = self.repo.location |
401 |
+ else: |
402 |
+ download_dir = self.download_dir |
403 |
|
404 |
# if synced successfully, verify now |
405 |
if exitcode == 0 and self.verify_metamanifest: |
406 |
@@ -448,14 +406,18 @@ class RsyncSync(NewBase): |
407 |
% (e,), |
408 |
level=logging.ERROR, noiselevel=-1) |
409 |
exitcode = 1 |
410 |
+ verify_failure = True |
411 |
|
412 |
if exitcode == 0 and not local_state_unchanged: |
413 |
- exitcode = self._commit_download(download_dir) |
414 |
+ self.repo_storage.commit_update() |
415 |
+ self.repo_storage.garbage_collection() |
416 |
|
417 |
return (exitcode, updatecache_flg) |
418 |
finally: |
419 |
- if exitcode == 0: |
420 |
- self._remove_download(download_dir) |
421 |
+ # Don't delete the update if verification failed, in case |
422 |
+ # the cause needs to be investigated. |
423 |
+ if not verify_failure: |
424 |
+ self.repo_storage.abort_update() |
425 |
if openpgp_env is not None: |
426 |
openpgp_env.close() |
427 |
|
428 |
@@ -594,7 +556,7 @@ class RsyncSync(NewBase): |
429 |
return rsync_opts |
430 |
|
431 |
|
432 |
- def _do_rsync(self, syncuri, timestamp, opts, download_dir): |
433 |
+ def _do_rsync(self, syncuri, timestamp, opts): |
434 |
updatecache_flg = False |
435 |
is_synced = False |
436 |
if timestamp != 0 and "--quiet" not in opts: |
437 |
@@ -720,11 +682,6 @@ class RsyncSync(NewBase): |
438 |
# actual sync |
439 |
command = rsynccommand[:] |
440 |
|
441 |
- if self.repo.location != download_dir: |
442 |
- # Use shared hardlinks for files that are identical |
443 |
- # in the previous snapshot of the repository. |
444 |
- command.append('--link-dest=%s' % self.repo.location) |
445 |
- |
446 |
submodule_paths = self._get_submodule_paths() |
447 |
if submodule_paths: |
448 |
# The only way to select multiple directories to |
449 |
@@ -738,7 +695,7 @@ class RsyncSync(NewBase): |
450 |
else: |
451 |
command.append(syncuri + "/") |
452 |
|
453 |
- command.append(download_dir) |
454 |
+ command.append(self.download_dir) |
455 |
|
456 |
exitcode = None |
457 |
try: |
458 |
diff --git a/lib/portage/sync/syncbase.py b/lib/portage/sync/syncbase.py |
459 |
index ce69a4fc0..1d2a00b7c 100644 |
460 |
--- a/lib/portage/sync/syncbase.py |
461 |
+++ b/lib/portage/sync/syncbase.py |
462 |
@@ -15,6 +15,7 @@ import portage |
463 |
from portage.util import writemsg_level |
464 |
from portage.util._eventloop.global_event_loop import global_event_loop |
465 |
from portage.util.backoff import RandomExponentialBackoff |
466 |
+from portage.util.futures._sync_decorator import _sync_methods |
467 |
from portage.util.futures.retry import retry |
468 |
from portage.util.futures.executor.fork import ForkExecutor |
469 |
from . import _SUBMODULE_PATH_MAP |
470 |
@@ -40,6 +41,8 @@ class SyncBase(object): |
471 |
self.repo = None |
472 |
self.xterm_titles = None |
473 |
self.spawn_kwargs = None |
474 |
+ self.repo_storage = None |
475 |
+ self._download_dir = None |
476 |
self.bin_command = None |
477 |
self._bin_command = bin_command |
478 |
self.bin_pkg = bin_pkg |
479 |
@@ -72,7 +75,35 @@ class SyncBase(object): |
480 |
self.repo = self.options.get('repo', None) |
481 |
self.xterm_titles = self.options.get('xterm_titles', False) |
482 |
self.spawn_kwargs = self.options.get('spawn_kwargs', None) |
483 |
+ storage_cls = portage.load_mod(self._select_storage_module()) |
484 |
+ self.repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs)) |
485 |
|
486 |
+ def _select_storage_module(self): |
487 |
+ ''' |
488 |
+ Select an appropriate implementation of RepoStorageInterface, based |
489 |
+ on repos.conf settings. |
490 |
+ |
491 |
+ @rtype: str |
492 |
+ @return: name of the selected repo storage constructor |
493 |
+ ''' |
494 |
+ if self.repo.sync_allow_hardlinks: |
495 |
+ mod_name = 'portage.repository.storage.hardlink_quarantine.HardlinkQuarantineRepoStorage' |
496 |
+ else: |
497 |
+ mod_name = 'portage.repository.storage.inplace.InplaceRepoStorage' |
498 |
+ return mod_name |
499 |
+ |
500 |
+ @property |
501 |
+ def download_dir(self): |
502 |
+ """ |
503 |
+ Get the path of the download directory, where the repository |
504 |
+ update is staged. The directory is initialized lazily, since |
505 |
+ the repository might already be at the latest revision, and |
506 |
+ there may be some cost associated with the directory |
507 |
+ initialization. |
508 |
+ """ |
509 |
+ if self._download_dir is None: |
510 |
+ self._download_dir = self.repo_storage.init_update() |
511 |
+ return self._download_dir |
512 |
|
513 |
def exists(self, **kwargs): |
514 |
'''Tests whether the repo actually exists''' |
515 |
-- |
516 |
2.16.4 |