Gentoo Archives: gentoo-portage-dev

From: Brian Dolbec <dolsen@g.o>
To: gentoo-portage-dev@l.g.o
Subject: Re: [gentoo-portage-dev] [PATCH] Log changes between vdb_metadata.pickle updates
Date: Fri, 07 Nov 2014 16:52:13
Message-Id: 20141107085114.601cd93a.dolsen@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH] Log changes between vdb_metadata.pickle updates by Zac Medico
On Fri, 7 Nov 2014 00:45:55 -0800
Zac Medico <zmedico@g.o> wrote:

> This adds support to generate a vdb_metadata_delta.json file
> which tracks package merges / unmerges that occur between updates to
> vdb_metadata.pickle. IndexedVardb can use the delta together with
> vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg,
> so that it can avoid expensive listdir calls in /var/db/pkg/*.
> Note that vdb_metadata.pickle is only updated periodically, in
> order to avoid excessive re-writes of a large file.
>
> In order to test the performance gains from this patch, you need to
> generate /var/cache/edb/vdb_metadata_delta.json first, which will
> happen automatically if you run 'emerge -p anything' with root
> privileges.
> ---
>  pym/portage/dbapi/IndexedVardb.py |  35 ++++++++-
>  pym/portage/dbapi/vartree.py      | 161 +++++++++++++++++++++++++++++++++++---
>  2 files changed, 185 insertions(+), 11 deletions(-)
>
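For anyone skimming the thread: as I read the patch, the delta file is a
small json object with a "version" string, a "timestamp" that has to match
the one stored in vdb_metadata.pickle, and a "deltas" list of event nodes.
Roughly like this (the values are invented, only the field names come from
the patch):

    {
        "version": "1",
        "timestamp": 1415346733.0,
        "deltas": [
            {"event": "add", "package": "app-misc/foo",
                "version": "1.0", "slot": "0", "counter": "12345"},
            {"event": "remove", "package": "app-misc/bar",
                "version": "2.1", "slot": "1", "counter": "12001"}
        ]
    }
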
> diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py
> index 424defc..e225ca1 100644
> --- a/pym/portage/dbapi/IndexedVardb.py
> +++ b/pym/portage/dbapi/IndexedVardb.py
> @@ -3,6 +3,7 @@
>  
>  import portage
>  from portage.dep import Atom
> +from portage.exception import InvalidData
>  from portage.versions import _pkg_str
>  
>  class IndexedVardb(object):
> @@ -42,7 +43,39 @@ class IndexedVardb(object):
>          if self._cp_map is not None:
>              return iter(sorted(self._cp_map))
>  
> -        return self._iter_cp_all()
> +        cache_delta = self._vardb._cache_delta_load_race()
> +        if cache_delta is None:
> +            return self._iter_cp_all()
> +
> +        packages = self._vardb._aux_cache["packages"]
> +        for delta in cache_delta["deltas"]:
> +            cpv = delta["package"] + "-" + delta["version"]
> +            event = delta["event"]
> +            if event == "add":
> +                # Use aux_get to populate the cache
> +                # for this cpv.
> +                if cpv not in packages:
> +                    try:
> +                        self._vardb.aux_get(cpv, ["DESCRIPTION"])
> +                    except KeyError:
> +                        pass
> +            elif event == "remove":
> +                packages.pop(cpv, None)
> +
> +        self._cp_map = cp_map = {}
> +        for cpv in packages:
> +            try:
> +                cpv = _pkg_str(cpv)
> +            except InvalidData:
> +                continue
> +
> +            cp_list = cp_map.get(cpv.cp)
> +            if cp_list is None:
> +                cp_list = []
> +                cp_map[cpv.cp] = cp_list
> +            cp_list.append(cpv)
> +
> +        return iter(sorted(self._cp_map))
>  
>      def _iter_cp_all(self):
>          self._cp_map = cp_map = {}

looks good

> diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
> index 6ab4b92..fd4b099 100644
> --- a/pym/portage/dbapi/vartree.py
> +++ b/pym/portage/dbapi/vartree.py
> @@ -76,6 +76,7 @@ import gc
>  import grp
>  import io
>  from itertools import chain
> +import json
>  import logging
>  import os as _os
>  import platform
> @@ -109,6 +110,7 @@ class vardbapi(dbapi):
>          "|".join(_excluded_dirs) + r')$')
>  
>      _aux_cache_version = "1"
> +    _aux_cache_delta_version = "1"
>      _owners_cache_version = "1"
>  
>      # Number of uncached packages to trigger cache update, since
> @@ -177,6 +179,8 @@ class vardbapi(dbapi):
>          self._aux_cache_obj = None
>          self._aux_cache_filename = os.path.join(self._eroot,
>              CACHE_PATH, "vdb_metadata.pickle")
> +        self._cache_delta_filename = os.path.join(self._eroot,
> +            CACHE_PATH, "vdb_metadata_delta.json")
>          self._counter_path = os.path.join(self._eroot,
>              CACHE_PATH, "counter")
>  
> @@ -511,6 +515,120 @@ class vardbapi(dbapi):
>          self.cpcache.pop(pkg_dblink.mysplit[0], None)
>          dircache.pop(pkg_dblink.dbcatdir, None)
>  

115 The following code I would like to see either as an independant class
116 and file if possible, then just instantiated here in the main vardbapi.
117 Looking over the code, I didn't see much use of other class functions.
118 This class is already too large in many ways. Also is there a
119 possibility this code could be re-used as a generic delta cache
120 anywhere else?
121
122 Another possibility is moving this code and the aux_cache code to
123 another class that the vardbapi class also subclasses. This would move
124 all the cache code to a small class easily viewed, edited, maintained.
125
126 This file is already 5k+ LOC and primarily the vardbapi class
127
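Just to make the suggestion concrete, something along these lines is the
shape I have in mind. The file name, class name and method names below are
made up for illustration only, nothing that exists today:

    # pym/portage/dbapi/_VdbMetadataDelta.py  (name is only a suggestion)

    import json

    from portage import _encodings
    from portage.util import atomic_ofstream

    class VdbMetadataDelta(object):

        _format_version = "1"

        def __init__(self, vardb):
            self._vardb = vardb

        def initialize(self, timestamp):
            # Start a fresh delta file, tied to the timestamp that
            # _aux_cache_flush() stores in vdb_metadata.pickle.
            f = atomic_ofstream(self._vardb._cache_delta_filename, 'w',
                encoding=_encodings['repo.content'], errors='strict')
            json.dump({
                "version": self._format_version,
                "timestamp": timestamp,
                "deltas": [],
            }, f, ensure_ascii=False)
            f.close()

        def load(self):
            # The body of _cache_delta_load() below would move here
            # unchanged.
            raise NotImplementedError

        def record_event(self, event, cpv, slot, counter):
            # Likewise the body of _cache_delta(); vardbapi would call
            # this under its own lock instead of inlining the logic.
            raise NotImplementedError

vardbapi.__init__ would then just instantiate it, and dblink would call
the record method through the vardbapi instance.
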
> +    def _cache_delta(self, event, cpv, slot, counter):
> +
> +        self.lock()
> +        try:
> +            deltas_obj = self._cache_delta_load()
> +
> +            if deltas_obj is None:
> +                # We can't record meaningful deltas without
> +                # a pre-existing state.
> +                return
> +
> +            delta_node = {
> +                "event": event,
> +                "package": cpv.cp,
> +                "version": cpv.version,
> +                "slot": slot,
> +                "counter": "%s" % counter
> +            }
> +
> +            deltas_obj["deltas"].append(delta_node)
> +
> +            # Eliminate earlier nodes cancelled out by later nodes
> +            # that have identical package and slot attributes.
> +            filtered_list = []
> +            slot_keys = set()
> +            version_keys = set()
> +            for delta_node in reversed(deltas_obj["deltas"]):
> +                slot_key = (delta_node["package"],
> +                    delta_node["slot"])
> +                version_key = (delta_node["package"],
> +                    delta_node["version"])
> +                if not (slot_key in slot_keys or \
> +                    version_key in version_keys):
> +                    filtered_list.append(delta_node)
> +                    slot_keys.add(slot_key)
> +                    version_keys.add(version_key)
> +
> +            filtered_list.reverse()
> +            deltas_obj["deltas"] = filtered_list
> +
> +            f = atomic_ofstream(self._cache_delta_filename,
> +                mode='w', encoding=_encodings['repo.content'])
> +            json.dump(deltas_obj, f, ensure_ascii=False)
> +            f.close()
> +
> +        finally:
> +            self.unlock()
> +
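The reverse scan that drops cancelled-out nodes reads fine to me: only the
newest node per (package, slot) and per (package, version) key survives, so
an add that is later removed or re-merged no longer drags its stale
predecessors along. A quick standalone illustration of that loop, with
made-up data:

    deltas = [
        {"event": "add",    "package": "app-misc/foo", "version": "1.0", "slot": "0"},
        {"event": "add",    "package": "app-misc/bar", "version": "2.1", "slot": "1"},
        {"event": "remove", "package": "app-misc/foo", "version": "1.0", "slot": "0"},
    ]

    filtered_list = []
    slot_keys = set()
    version_keys = set()
    for delta_node in reversed(deltas):
        slot_key = (delta_node["package"], delta_node["slot"])
        version_key = (delta_node["package"], delta_node["version"])
        if not (slot_key in slot_keys or version_key in version_keys):
            filtered_list.append(delta_node)
            slot_keys.add(slot_key)
            version_keys.add(version_key)
    filtered_list.reverse()

    # filtered_list is now [bar add, foo remove]; the earlier foo add
    # was cancelled out by the later remove for the same slot/version.
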
> +    def _cache_delta_load(self):
> +
> +        if not os.path.exists(self._aux_cache_filename):
> +            # If the primary cache doesn't exist yet, then
> +            # we can't record a delta against it.
> +            return None
> +
> +        try:
> +            with io.open(self._cache_delta_filename, 'r',
> +                encoding=_encodings['repo.content'],
> +                errors='strict') as f:
> +                cache_obj = json.load(f)
> +        except EnvironmentError as e:
> +            if e.errno not in (errno.ENOENT, errno.ESTALE):
> +                raise
> +        except (SystemExit, KeyboardInterrupt):
> +            raise
> +        except Exception:
> +            # Corrupt, or not json format.
> +            pass
> +        else:
> +            try:
> +                version = cache_obj["version"]
> +            except KeyError:
> +                pass
> +            else:
> +                # If the timestamp recorded in the deltas file
> +                # doesn't match aux_cache_timestamp, then the
> +                # deltas are not valid. This means that deltas
> +                # cannot be recorded until after the next
> +                # vdb_metadata.pickle update, in order to
> +                # guarantee consistency.
> +                if version == self._aux_cache_delta_version:
> +                    try:
> +                        deltas = cache_obj["deltas"]
> +                    except KeyError:
> +                        cache_obj["deltas"] = deltas = []
> +
> +                    if isinstance(deltas, list):
> +                        return cache_obj
> +
> +        return None
> +
> +    def _cache_delta_load_race(self):
> +        """
> +        This calls _cache_delta_load and validates the timestamp
> +        against the currently loaded _aux_cache. If a concurrent
> +        update causes the timestamps to be inconsistent, then
> +        it reloads the caches and tries one more time before
> +        it aborts. In practice, the race is very unlikely, so
> +        this will usually succeed on the first try.
> +        """
> +
> +        tries = 2
> +        while tries:
> +            tries -= 1
> +            cache_delta = self._cache_delta_load()
> +            if cache_delta is not None and \
> +                cache_delta.get("timestamp") != \
> +                self._aux_cache.get("timestamp", False):
> +                self._aux_cache_obj = None
> +            else:
> +                return cache_delta
> +
> +        return None
> +
>      def match(self, origdep, use_cache=1):
>          "caching match function"
>          mydep = dep_expand(
> @@ -556,22 +674,37 @@ class vardbapi(dbapi):
>          long as at least part of the cache is still valid)."""
>          if self._flush_cache_enabled and \
>              self._aux_cache is not None and \
> -            len(self._aux_cache["modified"]) >= self._aux_cache_threshold and \
> -            secpass >= 2:
> +            secpass >= 2 and \
> +            (len(self._aux_cache["modified"]) >= self._aux_cache_threshold or
> +            not os.path.exists(self._cache_delta_filename)):
> +
> +            ensure_dirs(os.path.dirname(self._aux_cache_filename))
> +
>              self._owners.populate() # index any unindexed contents
>              valid_nodes = set(self.cpv_all())
>              for cpv in list(self._aux_cache["packages"]):
>                  if cpv not in valid_nodes:
>                      del self._aux_cache["packages"][cpv]
>              del self._aux_cache["modified"]
> -            try:
> -                f = atomic_ofstream(self._aux_cache_filename, 'wb')
> -                pickle.dump(self._aux_cache, f, protocol=2)
> -                f.close()
> -                apply_secpass_permissions(
> -                    self._aux_cache_filename,
> -                    gid=portage_gid, mode=0o644)
> -            except (IOError, OSError) as e:
> -                pass
> +            timestamp = time.time()
> +            self._aux_cache["timestamp"] = timestamp
> +
> +            f = atomic_ofstream(self._aux_cache_filename, 'wb')
> +            pickle.dump(self._aux_cache, f, protocol=2)
> +            f.close()
> +            apply_secpass_permissions(
> +                self._aux_cache_filename, mode=0o644)
> +
> +            f = atomic_ofstream(self._cache_delta_filename, 'w',
> +                encoding=_encodings['repo.content'],
> +                errors='strict')
> +            json.dump({
> +                "version": self._aux_cache_delta_version,
> +                "timestamp": timestamp
> +            }, f, ensure_ascii=False)
> +            f.close()
> +            apply_secpass_permissions(
> +                self._cache_delta_filename, mode=0o644)
> +
>              self._aux_cache["modified"] = set()
>  
>      @property
> @@ -1590,6 +1723,12 @@ class dblink(object):
>                  self.dbdir, noiselevel=-1)
>              return
>  
> +        if self.dbdir is self.dbpkgdir:
> +            counter, = self.vartree.dbapi.aux_get(
> +                self.mycpv, ["COUNTER"])
> +            self.vartree.dbapi._cache_delta("remove", self.mycpv,
> +                self.settings["SLOT"].split("/")[0], counter)
> +
>          shutil.rmtree(self.dbdir)
>          # If empty, remove parent category directory.
>          try:
> @@ -4196,6 +4335,8 @@ class dblink(object):
>              self.delete()
>              _movefile(self.dbtmpdir, self.dbpkgdir, mysettings=self.settings)
>              self._merged_path(self.dbpkgdir, os.lstat(self.dbpkgdir))
> +            self.vartree.dbapi._cache_delta("add",
> +                self.mycpv, slot, counter)
>          finally:
>              self.unlockdb()


-- 
Brian Dolbec <dolsen>
