1 |
This adds support to generate a vdb_metadata_delta.json file |
2 |
which tracks package merges / unmerges that occur between updates to |
3 |
vdb_metadata.pickle. IndexedVardb can use the delta together with |
4 |
vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg, |
5 |
so that it can avoid expensive listdir calls in /var/db/pkg/*. |
6 |
Note that vdb_metadata.pickle is only updated periodically, in |
7 |
order to avoid excessive re-writes of a large file. |
8 |
|
9 |
In order to test the performance gains from this patch, you need to |
10 |
generate /var/cache/edb/vdb_metadata_delta.json first, which will |
11 |
happen automatically if you run 'emerge -p anything' with root |
12 |
privileges. |
13 |
--- |
14 |
This updated patch splits out a VdbMetadataDelta class into a new file, reducing |
15 |
vartree.py bloat. |
16 |
|
17 |
pym/portage/dbapi/IndexedVardb.py | 22 ++++- |
18 |
pym/portage/dbapi/VdbMetadataDelta.py | 156 ++++++++++++++++++++++++++++++++++ |
19 |
pym/portage/dbapi/vartree.py | 42 ++++++--- |
20 |
3 files changed, 209 insertions(+), 11 deletions(-) |
21 |
create mode 100644 pym/portage/dbapi/VdbMetadataDelta.py |
22 |
|
23 |
diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py |
24 |
index 424defc..38bfeed 100644 |
25 |
--- a/pym/portage/dbapi/IndexedVardb.py |
26 |
+++ b/pym/portage/dbapi/IndexedVardb.py |
27 |
@@ -3,6 +3,7 @@ |
28 |
|
29 |
import portage |
30 |
from portage.dep import Atom |
31 |
+from portage.exception import InvalidData |
32 |
from portage.versions import _pkg_str |
33 |
|
34 |
class IndexedVardb(object): |
35 |
@@ -42,7 +43,26 @@ class IndexedVardb(object): |
36 |
if self._cp_map is not None: |
37 |
return iter(sorted(self._cp_map)) |
38 |
|
39 |
- return self._iter_cp_all() |
40 |
+ delta_data = self._vardb._cache_delta.loadRace() |
41 |
+ if delta_data is None: |
42 |
+ return self._iter_cp_all() |
43 |
+ |
44 |
+ self._vardb._cache_delta.applyDelta(delta_data) |
45 |
+ |
46 |
+ self._cp_map = cp_map = {} |
47 |
+ for cpv in self._vardb._aux_cache["packages"]: |
48 |
+ try: |
49 |
+ cpv = _pkg_str(cpv) |
50 |
+ except InvalidData: |
51 |
+ continue |
52 |
+ |
53 |
+ cp_list = cp_map.get(cpv.cp) |
54 |
+ if cp_list is None: |
55 |
+ cp_list = [] |
56 |
+ cp_map[cpv.cp] = cp_list |
57 |
+ cp_list.append(cpv) |
58 |
+ |
59 |
+ return iter(sorted(self._cp_map)) |
60 |
|
61 |
def _iter_cp_all(self): |
62 |
self._cp_map = cp_map = {} |
63 |
diff --git a/pym/portage/dbapi/VdbMetadataDelta.py b/pym/portage/dbapi/VdbMetadataDelta.py |
64 |
new file mode 100644 |
65 |
index 0000000..e6a5c47 |
66 |
--- /dev/null |
67 |
+++ b/pym/portage/dbapi/VdbMetadataDelta.py |
68 |
@@ -0,0 +1,156 @@ |
69 |
+# Copyright 2014 Gentoo Foundation |
70 |
+# Distributed under the terms of the GNU General Public License v2 |
71 |
+ |
72 |
+import errno |
73 |
+import io |
74 |
+import json |
75 |
+import os |
76 |
+ |
77 |
+from portage import _encodings |
78 |
+from portage.util import atomic_ofstream |
79 |
+ |
80 |
+class VdbMetadataDelta(object): |
81 |
+ |
82 |
+ _format_version = "1" |
83 |
+ |
84 |
+ def __init__(self, vardb): |
85 |
+ self._vardb = vardb |
86 |
+ |
87 |
+ def initialize(self, timestamp): |
88 |
+ f = atomic_ofstream(self._vardb._cache_delta_filename, 'w', |
89 |
+ encoding=_encodings['repo.content'], errors='strict') |
90 |
+ json.dump({ |
91 |
+ "version": self._format_version, |
92 |
+ "timestamp": timestamp |
93 |
+ }, f, ensure_ascii=False) |
94 |
+ f.close() |
95 |
+ |
96 |
+ def load(self): |
97 |
+ |
98 |
+ if not os.path.exists(self._vardb._aux_cache_filename): |
99 |
+ # If the primary cache doesn't exist yet, then |
100 |
+ # we can't record a delta against it. |
101 |
+ return None |
102 |
+ |
103 |
+ try: |
104 |
+ with io.open(self._vardb._cache_delta_filename, 'r', |
105 |
+ encoding=_encodings['repo.content'], |
106 |
+ errors='strict') as f: |
107 |
+ cache_obj = json.load(f) |
108 |
+ except EnvironmentError as e: |
109 |
+ if e.errno not in (errno.ENOENT, errno.ESTALE): |
110 |
+ raise |
111 |
+ except (SystemExit, KeyboardInterrupt): |
112 |
+ raise |
113 |
+ except Exception: |
114 |
+ # Corrupt, or not json format. |
115 |
+ pass |
116 |
+ else: |
117 |
+ try: |
118 |
+ version = cache_obj["version"] |
119 |
+ except KeyError: |
120 |
+ pass |
121 |
+ else: |
122 |
+ # If the timestamp recorded in the deltas file |
123 |
+ # doesn't match aux_cache_timestamp, then the |
124 |
+ # deltas are not valid. This means that deltas |
125 |
+ # cannot be recorded until after the next |
126 |
+ # vdb_metadata.pickle update, in order to |
127 |
+ # guarantee consistency. |
128 |
+ if version == self._format_version: |
129 |
+ try: |
130 |
+ deltas = cache_obj["deltas"] |
131 |
+ except KeyError: |
132 |
+ cache_obj["deltas"] = deltas = [] |
133 |
+ |
134 |
+ if isinstance(deltas, list): |
135 |
+ return cache_obj |
136 |
+ |
137 |
+ return None |
138 |
+ |
139 |
+ def loadRace(self): |
140 |
+ """ |
141 |
+ This calls self.load() and validates the timestamp |
142 |
+ against the currently loaded self._vardb._aux_cache. If a |
143 |
+ concurrent update causes the timestamps to be inconsistent, |
144 |
+ then it reloads the caches and tries one more time before |
145 |
+ it aborts. In practice, the race is very unlikely, so |
146 |
+ this will usually succeed on the first try. |
147 |
+ """ |
148 |
+ |
149 |
+ tries = 2 |
150 |
+ while tries: |
151 |
+ tries -= 1 |
152 |
+ cache_delta = self.load() |
153 |
+ if cache_delta is not None and \ |
154 |
+ cache_delta.get("timestamp") != \ |
155 |
+ self._vardb._aux_cache.get("timestamp", False): |
156 |
+ self._vardb._aux_cache_obj = None |
157 |
+ else: |
158 |
+ return cache_delta |
159 |
+ |
160 |
+ return None |
161 |
+ |
162 |
+ def recordEvent(self, event, cpv, slot, counter): |
163 |
+ |
164 |
+ self._vardb.lock() |
165 |
+ try: |
166 |
+ deltas_obj = self.load() |
167 |
+ |
168 |
+ if deltas_obj is None: |
169 |
+ # We can't record meaningful deltas without |
170 |
+ # a pre-existing state. |
171 |
+ return |
172 |
+ |
173 |
+ delta_node = { |
174 |
+ "event": event, |
175 |
+ "package": cpv.cp, |
176 |
+ "version": cpv.version, |
177 |
+ "slot": slot, |
178 |
+ "counter": "%s" % counter |
179 |
+ } |
180 |
+ |
181 |
+ deltas_obj["deltas"].append(delta_node) |
182 |
+ |
183 |
+ # Eliminate earlier nodes cancelled out by later nodes |
184 |
+ # that have identical package and slot attributes. |
185 |
+ filtered_list = [] |
186 |
+ slot_keys = set() |
187 |
+ version_keys = set() |
188 |
+ for delta_node in reversed(deltas_obj["deltas"]): |
189 |
+ slot_key = (delta_node["package"], |
190 |
+ delta_node["slot"]) |
191 |
+ version_key = (delta_node["package"], |
192 |
+ delta_node["version"]) |
193 |
+ if not (slot_key in slot_keys or \ |
194 |
+ version_key in version_keys): |
195 |
+ filtered_list.append(delta_node) |
196 |
+ slot_keys.add(slot_key) |
197 |
+ version_keys.add(version_key) |
198 |
+ |
199 |
+ filtered_list.reverse() |
200 |
+ deltas_obj["deltas"] = filtered_list |
201 |
+ |
202 |
+ f = atomic_ofstream(self._vardb._cache_delta_filename, |
203 |
+ mode='w', encoding=_encodings['repo.content']) |
204 |
+ json.dump(deltas_obj, f, ensure_ascii=False) |
205 |
+ f.close() |
206 |
+ |
207 |
+ finally: |
208 |
+ self._vardb.unlock() |
209 |
+ |
210 |
+ def applyDelta(self, data): |
211 |
+ packages = self._vardb._aux_cache["packages"] |
212 |
+ for delta in data["deltas"]: |
213 |
+ cpv = delta["package"] + "-" + delta["version"] |
214 |
+ event = delta["event"] |
215 |
+ if event == "add": |
216 |
+ # Use aux_get to populate the cache |
217 |
+ # for this cpv. |
218 |
+ if cpv not in packages: |
219 |
+ try: |
220 |
+ self._vardb.aux_get(cpv, ["DESCRIPTION"]) |
221 |
+ except KeyError: |
222 |
+ pass |
223 |
+ elif event == "remove": |
224 |
+ packages.pop(cpv, None) |
225 |
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py |
226 |
index 6ab4b92..e0cd5f1 100644 |
227 |
--- a/pym/portage/dbapi/vartree.py |
228 |
+++ b/pym/portage/dbapi/vartree.py |
229 |
@@ -63,6 +63,7 @@ from portage import _os_merge |
230 |
from portage import _selinux_merge |
231 |
from portage import _unicode_decode |
232 |
from portage import _unicode_encode |
233 |
+from .VdbMetadataDelta import VdbMetadataDelta |
234 |
|
235 |
from _emerge.EbuildBuildDir import EbuildBuildDir |
236 |
from _emerge.EbuildPhase import EbuildPhase |
237 |
@@ -177,6 +178,9 @@ class vardbapi(dbapi): |
238 |
self._aux_cache_obj = None |
239 |
self._aux_cache_filename = os.path.join(self._eroot, |
240 |
CACHE_PATH, "vdb_metadata.pickle") |
241 |
+ self._cache_delta_filename = os.path.join(self._eroot, |
242 |
+ CACHE_PATH, "vdb_metadata_delta.json") |
243 |
+ self._cache_delta = VdbMetadataDelta(self) |
244 |
self._counter_path = os.path.join(self._eroot, |
245 |
CACHE_PATH, "counter") |
246 |
|
247 |
@@ -556,22 +560,31 @@ class vardbapi(dbapi): |
248 |
long as at least part of the cache is still valid).""" |
249 |
if self._flush_cache_enabled and \ |
250 |
self._aux_cache is not None and \ |
251 |
- len(self._aux_cache["modified"]) >= self._aux_cache_threshold and \ |
252 |
- secpass >= 2: |
253 |
+ secpass >= 2 and \ |
254 |
+ (len(self._aux_cache["modified"]) >= self._aux_cache_threshold or |
255 |
+ not os.path.exists(self._cache_delta_filename)): |
256 |
+ |
257 |
+ ensure_dirs(os.path.dirname(self._aux_cache_filename)) |
258 |
+ |
259 |
self._owners.populate() # index any unindexed contents |
260 |
valid_nodes = set(self.cpv_all()) |
261 |
for cpv in list(self._aux_cache["packages"]): |
262 |
if cpv not in valid_nodes: |
263 |
del self._aux_cache["packages"][cpv] |
264 |
del self._aux_cache["modified"] |
265 |
- try: |
266 |
- f = atomic_ofstream(self._aux_cache_filename, 'wb') |
267 |
- pickle.dump(self._aux_cache, f, protocol=2) |
268 |
- f.close() |
269 |
- apply_secpass_permissions( |
270 |
- self._aux_cache_filename, gid=portage_gid, mode=0o644) |
271 |
- except (IOError, OSError) as e: |
272 |
- pass |
273 |
+ timestamp = time.time() |
274 |
+ self._aux_cache["timestamp"] = timestamp |
275 |
+ |
276 |
+ f = atomic_ofstream(self._aux_cache_filename, 'wb') |
277 |
+ pickle.dump(self._aux_cache, f, protocol=2) |
278 |
+ f.close() |
279 |
+ apply_secpass_permissions( |
280 |
+ self._aux_cache_filename, mode=0o644) |
281 |
+ |
282 |
+ self._cache_delta.initialize(timestamp) |
283 |
+ apply_secpass_permissions( |
284 |
+ self._cache_delta_filename, mode=0o644) |
285 |
+ |
286 |
self._aux_cache["modified"] = set() |
287 |
|
288 |
@property |
289 |
@@ -1590,6 +1603,13 @@ class dblink(object): |
290 |
self.dbdir, noiselevel=-1) |
291 |
return |
292 |
|
293 |
+ if self.dbdir is self.dbpkgdir: |
294 |
+ counter, = self.vartree.dbapi.aux_get( |
295 |
+ self.mycpv, ["COUNTER"]) |
296 |
+ self.vartree.dbapi._cache_delta.recordEvent( |
297 |
+ "remove", self.mycpv, |
298 |
+ self.settings["SLOT"].split("/")[0], counter) |
299 |
+ |
300 |
shutil.rmtree(self.dbdir) |
301 |
# If empty, remove parent category directory. |
302 |
try: |
303 |
@@ -4196,6 +4216,8 @@ class dblink(object): |
304 |
self.delete() |
305 |
_movefile(self.dbtmpdir, self.dbpkgdir, mysettings=self.settings) |
306 |
self._merged_path(self.dbpkgdir, os.lstat(self.dbpkgdir)) |
307 |
+ self.vartree.dbapi._cache_delta.recordEvent( |
308 |
+ "add", self.mycpv, slot, counter) |
309 |
finally: |
310 |
self.unlockdb() |
311 |
|
312 |
-- |
313 |
2.0.4 |