Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH] Log changes between vdb_metadata.pickle updates
Date: Fri, 07 Nov 2014 08:46:03
Message-Id: 1415349955-10728-1-git-send-email-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] Re: [PATCH 4/5] Add IndexedVardb class. by Zac Medico
This adds support for generating a vdb_metadata_delta.json file
which tracks package merges/unmerges that occur between updates to
vdb_metadata.pickle. IndexedVardb can use the delta together with
vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg,
so that it can avoid expensive listdir calls in /var/db/pkg/*.
Note that vdb_metadata.pickle is only updated periodically, in
order to avoid excessive re-writes of a large file.

In order to test the performance gains from this patch, you need to
generate /var/cache/edb/vdb_metadata_delta.json first, which happens
automatically if you run 'emerge -p anything' with root privileges.
---
 pym/portage/dbapi/IndexedVardb.py |  35 ++++++++-
 pym/portage/dbapi/vartree.py      | 161 +++++++++++++++++++++++++++++++++++---
 2 files changed, 185 insertions(+), 11 deletions(-)
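
Illustration for reviewers (not part of the patch): given the fields
written by _cache_delta() and the vdb_metadata.pickle flush code below,
the delta file would look roughly like the following after one merge
and one unmerge. The package names, counter values and timestamp are
hypothetical.

# Hypothetical contents of /var/cache/edb/vdb_metadata_delta.json,
# shown as the Python object that json.dump() serializes.
example_delta = {
	"version": "1",  # _aux_cache_delta_version
	# Must equal the "timestamp" stored in vdb_metadata.pickle,
	# otherwise _cache_delta_load_race() will not use the deltas.
	"timestamp": 1415349955.0,
	"deltas": [
		{"event": "add", "package": "dev-libs/foo", "version": "1.1",
			"slot": "0", "counter": "1235"},
		{"event": "remove", "package": "dev-libs/bar", "version": "0.9",
			"slot": "0", "counter": "1190"},
	],
}

The file initially written at flush time contains only "version" and
"timestamp"; _cache_delta_load() adds an empty "deltas" list on first
use, and _cache_delta() appends one node per merge/unmerge event.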

diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py
index 424defc..e225ca1 100644
--- a/pym/portage/dbapi/IndexedVardb.py
+++ b/pym/portage/dbapi/IndexedVardb.py
@@ -3,6 +3,7 @@
 
 import portage
 from portage.dep import Atom
+from portage.exception import InvalidData
 from portage.versions import _pkg_str
 
 class IndexedVardb(object):
@@ -42,7 +43,39 @@ class IndexedVardb(object):
 		if self._cp_map is not None:
 			return iter(sorted(self._cp_map))
 
-		return self._iter_cp_all()
+		cache_delta = self._vardb._cache_delta_load_race()
+		if cache_delta is None:
+			return self._iter_cp_all()
+
+		packages = self._vardb._aux_cache["packages"]
+		for delta in cache_delta["deltas"]:
+			cpv = delta["package"] + "-" + delta["version"]
+			event = delta["event"]
+			if event == "add":
+				# Use aux_get to populate the cache
+				# for this cpv.
+				if cpv not in packages:
+					try:
+						self._vardb.aux_get(cpv, ["DESCRIPTION"])
+					except KeyError:
+						pass
+			elif event == "remove":
+				packages.pop(cpv, None)
+
+		self._cp_map = cp_map = {}
+		for cpv in packages:
+			try:
+				cpv = _pkg_str(cpv)
+			except InvalidData:
+				continue
+
+			cp_list = cp_map.get(cpv.cp)
+			if cp_list is None:
+				cp_list = []
+				cp_map[cpv.cp] = cp_list
+			cp_list.append(cpv)
+
+		return iter(sorted(self._cp_map))
 
 	def _iter_cp_all(self):
 		self._cp_map = cp_map = {}
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index 6ab4b92..fd4b099 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -76,6 +76,7 @@ import gc
 import grp
 import io
 from itertools import chain
+import json
 import logging
 import os as _os
 import platform
@@ -109,6 +110,7 @@ class vardbapi(dbapi):
 		"|".join(_excluded_dirs) + r')$')
 
 	_aux_cache_version = "1"
+	_aux_cache_delta_version = "1"
 	_owners_cache_version = "1"
 
 	# Number of uncached packages to trigger cache update, since
@@ -177,6 +179,8 @@ class vardbapi(dbapi):
 		self._aux_cache_obj = None
 		self._aux_cache_filename = os.path.join(self._eroot,
 			CACHE_PATH, "vdb_metadata.pickle")
+		self._cache_delta_filename = os.path.join(self._eroot,
+			CACHE_PATH, "vdb_metadata_delta.json")
 		self._counter_path = os.path.join(self._eroot,
 			CACHE_PATH, "counter")
 
@@ -511,6 +515,120 @@ class vardbapi(dbapi):
 		self.cpcache.pop(pkg_dblink.mysplit[0], None)
 		dircache.pop(pkg_dblink.dbcatdir, None)
 
+	def _cache_delta(self, event, cpv, slot, counter):
+
+		self.lock()
+		try:
+			deltas_obj = self._cache_delta_load()
+
+			if deltas_obj is None:
+				# We can't record meaningful deltas without
+				# a pre-existing state.
+				return
+
+			delta_node = {
+				"event": event,
+				"package": cpv.cp,
+				"version": cpv.version,
+				"slot": slot,
+				"counter": "%s" % counter
+			}
+
+			deltas_obj["deltas"].append(delta_node)
+
+			# Eliminate earlier nodes cancelled out by later nodes
+			# that have identical package and slot attributes.
+			filtered_list = []
+			slot_keys = set()
+			version_keys = set()
+			for delta_node in reversed(deltas_obj["deltas"]):
+				slot_key = (delta_node["package"],
+					delta_node["slot"])
+				version_key = (delta_node["package"],
+					delta_node["version"])
+				if not (slot_key in slot_keys or \
+					version_key in version_keys):
+					filtered_list.append(delta_node)
+					slot_keys.add(slot_key)
+					version_keys.add(version_key)
+
+			filtered_list.reverse()
+			deltas_obj["deltas"] = filtered_list
+
+			f = atomic_ofstream(self._cache_delta_filename,
+				mode='w', encoding=_encodings['repo.content'])
+			json.dump(deltas_obj, f, ensure_ascii=False)
+			f.close()
+
+		finally:
+			self.unlock()
+
+	def _cache_delta_load(self):
+
+		if not os.path.exists(self._aux_cache_filename):
+			# If the primary cache doesn't exist yet, then
+			# we can't record a delta against it.
+			return None
+
+		try:
+			with io.open(self._cache_delta_filename, 'r',
+				encoding=_encodings['repo.content'],
+				errors='strict') as f:
+				cache_obj = json.load(f)
+		except EnvironmentError as e:
+			if e.errno not in (errno.ENOENT, errno.ESTALE):
+				raise
+		except (SystemExit, KeyboardInterrupt):
+			raise
+		except Exception:
+			# Corrupt, or not json format.
+			pass
+		else:
+			try:
+				version = cache_obj["version"]
+			except KeyError:
+				pass
+			else:
+				# If the timestamp recorded in the deltas file
+				# doesn't match aux_cache_timestamp, then the
+				# deltas are not valid. This means that deltas
+				# cannot be recorded until after the next
+				# vdb_metadata.pickle update, in order to
+				# guarantee consistency.
+				if version == self._aux_cache_delta_version:
+					try:
+						deltas = cache_obj["deltas"]
+					except KeyError:
+						cache_obj["deltas"] = deltas = []
+
+					if isinstance(deltas, list):
+						return cache_obj
+
+		return None
+
+	def _cache_delta_load_race(self):
+		"""
+		This calls _cache_delta_load and validates the timestamp
+		against the currently loaded _aux_cache. If a concurrent
+		update causes the timestamps to be inconsistent, then
+		it reloads the caches and tries one more time before
+		it aborts. In practice, the race is very unlikely, so
+		this will usually succeed on the first try.
+		"""
+
+		tries = 2
+		while tries:
+			tries -= 1
+			cache_delta = self._cache_delta_load()
+			if cache_delta is not None and \
+				cache_delta.get("timestamp") != \
+				self._aux_cache.get("timestamp", False):
+				self._aux_cache_obj = None
+			else:
+				return cache_delta
+
+		return None
+
 	def match(self, origdep, use_cache=1):
 		"caching match function"
 		mydep = dep_expand(
@@ -556,22 +674,37 @@
 		long as at least part of the cache is still valid)."""
 		if self._flush_cache_enabled and \
 			self._aux_cache is not None and \
-			len(self._aux_cache["modified"]) >= self._aux_cache_threshold and \
-			secpass >= 2:
+			secpass >= 2 and \
+			(len(self._aux_cache["modified"]) >= self._aux_cache_threshold or
+			not os.path.exists(self._cache_delta_filename)):
+
+			ensure_dirs(os.path.dirname(self._aux_cache_filename))
+
 			self._owners.populate() # index any unindexed contents
 			valid_nodes = set(self.cpv_all())
 			for cpv in list(self._aux_cache["packages"]):
 				if cpv not in valid_nodes:
 					del self._aux_cache["packages"][cpv]
 			del self._aux_cache["modified"]
-			try:
-				f = atomic_ofstream(self._aux_cache_filename, 'wb')
-				pickle.dump(self._aux_cache, f, protocol=2)
-				f.close()
-				apply_secpass_permissions(
-					self._aux_cache_filename, gid=portage_gid, mode=0o644)
-			except (IOError, OSError) as e:
-				pass
+			timestamp = time.time()
+			self._aux_cache["timestamp"] = timestamp
+
+			f = atomic_ofstream(self._aux_cache_filename, 'wb')
+			pickle.dump(self._aux_cache, f, protocol=2)
+			f.close()
+			apply_secpass_permissions(
+				self._aux_cache_filename, mode=0o644)
+
+			f = atomic_ofstream(self._cache_delta_filename, 'w',
+				encoding=_encodings['repo.content'], errors='strict')
+			json.dump({
+				"version": self._aux_cache_delta_version,
+				"timestamp": timestamp
+			}, f, ensure_ascii=False)
+			f.close()
+			apply_secpass_permissions(
+				self._cache_delta_filename, mode=0o644)
+
 			self._aux_cache["modified"] = set()
 
 	@property
@@ -1590,6 +1723,12 @@ class dblink(object):
 				self.dbdir, noiselevel=-1)
 			return
 
+		if self.dbdir is self.dbpkgdir:
+			counter, = self.vartree.dbapi.aux_get(
+				self.mycpv, ["COUNTER"])
+			self.vartree.dbapi._cache_delta("remove", self.mycpv,
+				self.settings["SLOT"].split("/")[0], counter)
+
 		shutil.rmtree(self.dbdir)
 		# If empty, remove parent category directory.
 		try:
@@ -4196,6 +4335,8 @@ class dblink(object):
 			self.delete()
 			_movefile(self.dbtmpdir, self.dbpkgdir, mysettings=self.settings)
 			self._merged_path(self.dbpkgdir, os.lstat(self.dbpkgdir))
+			self.vartree.dbapi._cache_delta("add",
+				self.mycpv, slot, counter)
 		finally:
 			self.unlockdb()
 
--
2.0.4
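
Aside for reviewers (not part of the patch): the node-cancellation
loop in _cache_delta() keeps only the newest node per (package, slot)
and per (package, version) key, so repeated events for the same slot
do not accumulate between vdb_metadata.pickle updates. A standalone
sketch of that behaviour, using hypothetical delta nodes:

# Later nodes win; older nodes with a matching key are dropped.
deltas = [
	{"event": "add", "package": "dev-libs/foo", "version": "1.0", "slot": "0"},
	{"event": "remove", "package": "dev-libs/foo", "version": "1.0", "slot": "0"},
	{"event": "add", "package": "dev-libs/foo", "version": "1.1", "slot": "0"},
]

filtered, slot_keys, version_keys = [], set(), set()
for node in reversed(deltas):
	slot_key = (node["package"], node["slot"])
	version_key = (node["package"], node["version"])
	if slot_key not in slot_keys and version_key not in version_keys:
		filtered.append(node)
		slot_keys.add(slot_key)
		version_keys.add(version_key)
filtered.reverse()

# Only the final "add" for foo-1.1 remains, since the older add/remove
# pair for foo-1.0 shares the same (package, slot) key.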
