Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH v2] Log changes between vdb_metadata.pickle updates
Date: Sat, 08 Nov 2014 09:16:38
Message-Id: 1415438188-3785-1-git-send-email-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH] Log changes between vdb_metadata.pickle updates by Zac Medico
This adds support to generate a vdb_metadata_delta.json file
which tracks package merges and unmerges that occur between updates to
vdb_metadata.pickle. IndexedVardb can use the delta together with
vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg,
so that it can avoid expensive listdir calls in /var/db/pkg/*.
Note that vdb_metadata.pickle is only updated periodically, in
order to avoid excessive re-writes of a large file.
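
For reference, the delta file is a small JSON document. Based on the
VdbMetadataDelta code below, its contents look roughly like this (the
package names, timestamp and counter values here are made up):

    {
        "version": "1",
        "timestamp": 1415438188.0,
        "deltas": [
            {"event": "add", "package": "app-misc/foo",
             "version": "1.0", "slot": "0", "counter": "12345"},
            {"event": "remove", "package": "app-misc/bar",
             "version": "2.1", "slot": "1", "counter": "12001"}
        ]
    }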

In order to test the performance gains from this patch, you need to
generate /var/cache/edb/vdb_metadata_delta.json first, which will
happen automatically if you run 'emerge -p anything' with root
privileges.
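
As a quick sanity check that the delta file exists and is being
populated, something like the following works (path taken from above;
note that the "deltas" list only appears on disk after the first merge
or unmerge event has been recorded):

    import io
    import json

    # Inspect the delta file written by VdbMetadataDelta.
    with io.open("/var/cache/edb/vdb_metadata_delta.json", "r",
        encoding="utf-8") as f:
        data = json.load(f)

    print("format version:", data.get("version"))
    print("recorded events:", len(data.get("deltas", [])))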

---
This updated patch splits the VdbMetadataDelta class out into a new
file, reducing vartree.py bloat.
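
For review purposes, the heart of VdbMetadataDelta.recordEvent() is the
compaction step; the sketch below restates that logic standalone (the
same algorithm as in the patch, extracted only for illustration):

    def compact(deltas):
        # Walk the events newest-first and keep only the first
        # occurrence of each (package, slot) and (package, version)
        # key, so that a later event cancels any earlier events
        # for the same package.
        filtered_list = []
        slot_keys = set()
        version_keys = set()
        for node in reversed(deltas):
            slot_key = (node["package"], node["slot"])
            version_key = (node["package"], node["version"])
            if slot_key not in slot_keys and version_key not in version_keys:
                filtered_list.append(node)
                slot_keys.add(slot_key)
                version_keys.add(version_key)
        filtered_list.reverse()
        return filtered_list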

 pym/portage/dbapi/IndexedVardb.py     |  22 ++++-
 pym/portage/dbapi/VdbMetadataDelta.py | 156 ++++++++++++++++++++++++++++++++++
 pym/portage/dbapi/vartree.py          |  42 ++++++---
 3 files changed, 209 insertions(+), 11 deletions(-)
 create mode 100644 pym/portage/dbapi/VdbMetadataDelta.py

diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py
index 424defc..38bfeed 100644
--- a/pym/portage/dbapi/IndexedVardb.py
+++ b/pym/portage/dbapi/IndexedVardb.py
@@ -3,6 +3,7 @@

 import portage
 from portage.dep import Atom
+from portage.exception import InvalidData
 from portage.versions import _pkg_str

 class IndexedVardb(object):
@@ -42,7 +43,26 @@ class IndexedVardb(object):
         if self._cp_map is not None:
             return iter(sorted(self._cp_map))

-        return self._iter_cp_all()
+        delta_data = self._vardb._cache_delta.loadRace()
+        if delta_data is None:
+            return self._iter_cp_all()
+
+        self._vardb._cache_delta.applyDelta(delta_data)
+
+        self._cp_map = cp_map = {}
+        for cpv in self._vardb._aux_cache["packages"]:
+            try:
+                cpv = _pkg_str(cpv)
+            except InvalidData:
+                continue
+
+            cp_list = cp_map.get(cpv.cp)
+            if cp_list is None:
+                cp_list = []
+                cp_map[cpv.cp] = cp_list
+            cp_list.append(cpv)
+
+        return iter(sorted(self._cp_map))

     def _iter_cp_all(self):
         self._cp_map = cp_map = {}
diff --git a/pym/portage/dbapi/VdbMetadataDelta.py b/pym/portage/dbapi/VdbMetadataDelta.py
new file mode 100644
index 0000000..e6a5c47
--- /dev/null
+++ b/pym/portage/dbapi/VdbMetadataDelta.py
@@ -0,0 +1,156 @@
+# Copyright 2014 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import io
+import json
+import os
+
+from portage import _encodings
+from portage.util import atomic_ofstream
+
+class VdbMetadataDelta(object):
+
+    _format_version = "1"
+
+    def __init__(self, vardb):
+        self._vardb = vardb
+
+    def initialize(self, timestamp):
+        f = atomic_ofstream(self._vardb._cache_delta_filename, 'w',
+            encoding=_encodings['repo.content'], errors='strict')
+        json.dump({
+            "version": self._format_version,
+            "timestamp": timestamp
+        }, f, ensure_ascii=False)
+        f.close()
+
+    def load(self):
+
+        if not os.path.exists(self._vardb._aux_cache_filename):
+            # If the primary cache doesn't exist yet, then
+            # we can't record a delta against it.
+            return None
+
+        try:
+            with io.open(self._vardb._cache_delta_filename, 'r',
+                encoding=_encodings['repo.content'],
+                errors='strict') as f:
+                cache_obj = json.load(f)
+        except EnvironmentError as e:
+            if e.errno not in (errno.ENOENT, errno.ESTALE):
+                raise
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except Exception:
+            # Corrupt, or not json format.
+            pass
+        else:
+            try:
+                version = cache_obj["version"]
+            except KeyError:
+                pass
+            else:
+                # If the timestamp recorded in the deltas file
+                # doesn't match aux_cache_timestamp, then the
+                # deltas are not valid. This means that deltas
+                # cannot be recorded until after the next
+                # vdb_metadata.pickle update, in order to
+                # guarantee consistency.
+                if version == self._format_version:
+                    try:
+                        deltas = cache_obj["deltas"]
+                    except KeyError:
+                        cache_obj["deltas"] = deltas = []
+
+                    if isinstance(deltas, list):
+                        return cache_obj
+
+        return None
+
+    def loadRace(self):
+        """
+        This calls self.load() and validates the timestamp
+        against the currently loaded self._vardb._aux_cache. If a
+        concurrent update causes the timestamps to be inconsistent,
+        then it reloads the caches and tries one more time before
+        it aborts. In practice, the race is very unlikely, so
+        this will usually succeed on the first try.
+        """

+        tries = 2
+        while tries:
+            tries -= 1
+            cache_delta = self.load()
+            if cache_delta is not None and \
+                cache_delta.get("timestamp") != \
+                self._vardb._aux_cache.get("timestamp", False):
+                self._vardb._aux_cache_obj = None
+            else:
+                return cache_delta
+
+        return None
+
+    def recordEvent(self, event, cpv, slot, counter):
+
+        self._vardb.lock()
+        try:
+            deltas_obj = self.load()
+
+            if deltas_obj is None:
+                # We can't record meaningful deltas without
+                # a pre-existing state.
+                return
+
+            delta_node = {
+                "event": event,
+                "package": cpv.cp,
+                "version": cpv.version,
+                "slot": slot,
+                "counter": "%s" % counter
+            }
+
+            deltas_obj["deltas"].append(delta_node)
+
+            # Eliminate earlier nodes cancelled out by later nodes
+            # that have identical package and slot attributes.
+            filtered_list = []
+            slot_keys = set()
+            version_keys = set()
+            for delta_node in reversed(deltas_obj["deltas"]):
+                slot_key = (delta_node["package"],
+                    delta_node["slot"])
+                version_key = (delta_node["package"],
+                    delta_node["version"])
+                if not (slot_key in slot_keys or \
+                    version_key in version_keys):
+                    filtered_list.append(delta_node)
+                    slot_keys.add(slot_key)
+                    version_keys.add(version_key)
+
+            filtered_list.reverse()
+            deltas_obj["deltas"] = filtered_list
+
+            f = atomic_ofstream(self._vardb._cache_delta_filename,
+                mode='w', encoding=_encodings['repo.content'])
+            json.dump(deltas_obj, f, ensure_ascii=False)
+            f.close()
+
+        finally:
+            self._vardb.unlock()
+
+    def applyDelta(self, data):
+        packages = self._vardb._aux_cache["packages"]
+        for delta in data["deltas"]:
+            cpv = delta["package"] + "-" + delta["version"]
+            event = delta["event"]
+            if event == "add":
+                # Use aux_get to populate the cache
+                # for this cpv.
+                if cpv not in packages:
+                    try:
+                        self._vardb.aux_get(cpv, ["DESCRIPTION"])
+                    except KeyError:
+                        pass
+            elif event == "remove":
+                packages.pop(cpv, None)
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index 6ab4b92..e0cd5f1 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -63,6 +63,7 @@ from portage import _os_merge
 from portage import _selinux_merge
 from portage import _unicode_decode
 from portage import _unicode_encode
+from .VdbMetadataDelta import VdbMetadataDelta

 from _emerge.EbuildBuildDir import EbuildBuildDir
 from _emerge.EbuildPhase import EbuildPhase
@@ -177,6 +178,9 @@ class vardbapi(dbapi):
         self._aux_cache_obj = None
         self._aux_cache_filename = os.path.join(self._eroot,
             CACHE_PATH, "vdb_metadata.pickle")
+        self._cache_delta_filename = os.path.join(self._eroot,
+            CACHE_PATH, "vdb_metadata_delta.json")
+        self._cache_delta = VdbMetadataDelta(self)
         self._counter_path = os.path.join(self._eroot,
             CACHE_PATH, "counter")

@@ -556,22 +560,31 @@
         long as at least part of the cache is still valid)."""
         if self._flush_cache_enabled and \
             self._aux_cache is not None and \
-            len(self._aux_cache["modified"]) >= self._aux_cache_threshold and \
-            secpass >= 2:
+            secpass >= 2 and \
+            (len(self._aux_cache["modified"]) >= self._aux_cache_threshold or
+            not os.path.exists(self._cache_delta_filename)):
+
+            ensure_dirs(os.path.dirname(self._aux_cache_filename))
+
             self._owners.populate() # index any unindexed contents
             valid_nodes = set(self.cpv_all())
             for cpv in list(self._aux_cache["packages"]):
                 if cpv not in valid_nodes:
                     del self._aux_cache["packages"][cpv]
             del self._aux_cache["modified"]
-            try:
-                f = atomic_ofstream(self._aux_cache_filename, 'wb')
-                pickle.dump(self._aux_cache, f, protocol=2)
-                f.close()
-                apply_secpass_permissions(
-                    self._aux_cache_filename, gid=portage_gid, mode=0o644)
-            except (IOError, OSError) as e:
-                pass
+            timestamp = time.time()
+            self._aux_cache["timestamp"] = timestamp
+
+            f = atomic_ofstream(self._aux_cache_filename, 'wb')
+            pickle.dump(self._aux_cache, f, protocol=2)
+            f.close()
+            apply_secpass_permissions(
+                self._aux_cache_filename, mode=0o644)
+
+            self._cache_delta.initialize(timestamp)
+            apply_secpass_permissions(
+                self._cache_delta_filename, mode=0o644)
+
             self._aux_cache["modified"] = set()

     @property
@@ -1590,6 +1603,13 @@
                 self.dbdir, noiselevel=-1)
             return

+        if self.dbdir is self.dbpkgdir:
+            counter, = self.vartree.dbapi.aux_get(
+                self.mycpv, ["COUNTER"])
+            self.vartree.dbapi._cache_delta.recordEvent(
+                "remove", self.mycpv,
+                self.settings["SLOT"].split("/")[0], counter)
+
         shutil.rmtree(self.dbdir)
         # If empty, remove parent category directory.
         try:
@@ -4196,6 +4216,8 @@
             self.delete()
             _movefile(self.dbtmpdir, self.dbpkgdir, mysettings=self.settings)
             self._merged_path(self.dbpkgdir, os.lstat(self.dbpkgdir))
+            self.vartree.dbapi._cache_delta.recordEvent(
+                "add", self.mycpv, slot, counter)
         finally:
             self.unlockdb()

--
2.0.4