This adds support for generating a vdb_metadata_delta.json file
which tracks package merges / unmerges that occur between updates to
vdb_metadata.pickle. IndexedVardb can use the delta together with
vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg,
so that it can avoid expensive listdir calls in /var/db/pkg/*.
Note that vdb_metadata.pickle is only updated periodically, in
order to avoid excessive re-writes of a large file.

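For illustration, a delta file that records one unmerge and one
merge might look like this (package names, timestamp and counter
values are made up):

  {"version": "1", "timestamp": 1409720400.0,
   "deltas": [
    {"event": "remove", "package": "dev-libs/foo",
     "version": "1.0", "slot": "0", "counter": "1000"},
    {"event": "add", "package": "dev-libs/bar",
     "version": "2.1", "slot": "0", "counter": "1001"}
   ]}

The timestamp has to match the one recorded in vdb_metadata.pickle,
and a later event replaces any earlier event that has the same
package and slot (or package and version), so the list stays small
between pickle updates.
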
In order to test the performance gains from this patch, you need to
generate /var/cache/edb/vdb_metadata_delta.json first, which will
happen automatically if you run 'emerge -p anything' with root
privileges.
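
Since the delta file is plain json, its contents can be inspected
with something like:

  python -m json.tool /var/cache/edb/vdb_metadata_delta.json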
---
 pym/portage/dbapi/IndexedVardb.py |  35 ++++++++-
 pym/portage/dbapi/vartree.py      | 161 +++++++++++++++++++++++++++++++++++---
 2 files changed, 185 insertions(+), 11 deletions(-)

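For reviewers, the reconstruction that IndexedVardb performs with
the delta boils down to the following simplified sketch (the real
code goes through vardbapi.aux_get so that metadata gets cached,
and _cache_delta_load_race retries once on a timestamp mismatch):

  import json
  import pickle

  # Load the periodically updated primary cache and the delta.
  with open("/var/cache/edb/vdb_metadata.pickle", "rb") as f:
      aux_cache = pickle.load(f)
  with open("/var/cache/edb/vdb_metadata_delta.json") as f:
      cache_delta = json.load(f)

  # The delta is only valid against the exact pickle snapshot it
  # was recorded against, so compare timestamps first.
  if cache_delta.get("timestamp") == aux_cache.get("timestamp"):
      packages = set(aux_cache["packages"])
      for node in cache_delta.get("deltas", []):
          cpv = "%s-%s" % (node["package"], node["version"])
          if node["event"] == "add":
              packages.add(cpv)
          elif node["event"] == "remove":
              packages.discard(cpv)
      # packages now mirrors /var/db/pkg without any listdir calls.
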
diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py
index 424defc..e225ca1 100644
--- a/pym/portage/dbapi/IndexedVardb.py
+++ b/pym/portage/dbapi/IndexedVardb.py
@@ -3,6 +3,7 @@
 
 import portage
 from portage.dep import Atom
+from portage.exception import InvalidData
 from portage.versions import _pkg_str
 
 class IndexedVardb(object):
@@ -42,7 +43,39 @@ class IndexedVardb(object):
 		if self._cp_map is not None:
 			return iter(sorted(self._cp_map))
 
-		return self._iter_cp_all()
+		cache_delta = self._vardb._cache_delta_load_race()
+		if cache_delta is None:
+			return self._iter_cp_all()
+
+		packages = self._vardb._aux_cache["packages"]
+		for delta in cache_delta["deltas"]:
+			cpv = delta["package"] + "-" + delta["version"]
+			event = delta["event"]
+			if event == "add":
+				# Use aux_get to populate the cache
+				# for this cpv.
+				if cpv not in packages:
+					try:
+						self._vardb.aux_get(cpv, ["DESCRIPTION"])
+					except KeyError:
+						pass
+			elif event == "remove":
+				packages.pop(cpv, None)
+
+		self._cp_map = cp_map = {}
+		for cpv in packages:
+			try:
+				cpv = _pkg_str(cpv)
+			except InvalidData:
+				continue
+
+			cp_list = cp_map.get(cpv.cp)
+			if cp_list is None:
+				cp_list = []
+				cp_map[cpv.cp] = cp_list
+			cp_list.append(cpv)
+
+		return iter(sorted(self._cp_map))
 
 	def _iter_cp_all(self):
 		self._cp_map = cp_map = {}
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index 6ab4b92..fd4b099 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -76,6 +76,7 @@ import gc
 import grp
 import io
 from itertools import chain
+import json
 import logging
 import os as _os
 import platform
@@ -109,6 +110,7 @@ class vardbapi(dbapi):
 		"|".join(_excluded_dirs) + r')$')
 
 	_aux_cache_version = "1"
+	_aux_cache_delta_version = "1"
 	_owners_cache_version = "1"
 
 	# Number of uncached packages to trigger cache update, since
@@ -177,6 +179,8 @@ class vardbapi(dbapi):
 		self._aux_cache_obj = None
 		self._aux_cache_filename = os.path.join(self._eroot,
 			CACHE_PATH, "vdb_metadata.pickle")
+		self._cache_delta_filename = os.path.join(self._eroot,
+			CACHE_PATH, "vdb_metadata_delta.json")
 		self._counter_path = os.path.join(self._eroot,
 			CACHE_PATH, "counter")
 
@@ -511,6 +515,120 @@ class vardbapi(dbapi):
 		self.cpcache.pop(pkg_dblink.mysplit[0], None)
 		dircache.pop(pkg_dblink.dbcatdir, None)
 
+	def _cache_delta(self, event, cpv, slot, counter):
+
+		self.lock()
+		try:
+			deltas_obj = self._cache_delta_load()
+
+			if deltas_obj is None:
+				# We can't record meaningful deltas without
+				# a pre-existing state.
+				return
+
+			delta_node = {
+				"event": event,
+				"package": cpv.cp,
+				"version": cpv.version,
+				"slot": slot,
+				"counter": "%s" % counter
+			}
+
+			deltas_obj["deltas"].append(delta_node)
+
+			# Eliminate earlier nodes cancelled out by later nodes
+			# that have identical package and slot attributes.
+			filtered_list = []
+			slot_keys = set()
+			version_keys = set()
+			for delta_node in reversed(deltas_obj["deltas"]):
+				slot_key = (delta_node["package"],
+					delta_node["slot"])
+				version_key = (delta_node["package"],
+					delta_node["version"])
+				if not (slot_key in slot_keys or \
+					version_key in version_keys):
+					filtered_list.append(delta_node)
+					slot_keys.add(slot_key)
+					version_keys.add(version_key)
+
+			filtered_list.reverse()
+			deltas_obj["deltas"] = filtered_list
+
+			f = atomic_ofstream(self._cache_delta_filename,
+				mode='w', encoding=_encodings['repo.content'])
+			json.dump(deltas_obj, f, ensure_ascii=False)
+			f.close()
+
+		finally:
+			self.unlock()
+
+	def _cache_delta_load(self):
+
+		if not os.path.exists(self._aux_cache_filename):
+			# If the primary cache doesn't exist yet, then
+			# we can't record a delta against it.
+			return None
+
+		try:
+			with io.open(self._cache_delta_filename, 'r',
+				encoding=_encodings['repo.content'],
+				errors='strict') as f:
+				cache_obj = json.load(f)
+		except EnvironmentError as e:
+			if e.errno not in (errno.ENOENT, errno.ESTALE):
+				raise
+		except (SystemExit, KeyboardInterrupt):
+			raise
+		except Exception:
+			# Corrupt, or not json format.
+			pass
+		else:
+			try:
+				version = cache_obj["version"]
+			except KeyError:
+				pass
+			else:
+				# If the timestamp recorded in the deltas file
+				# doesn't match aux_cache_timestamp, then the
+				# deltas are not valid. This means that deltas
+				# cannot be recorded until after the next
+				# vdb_metadata.pickle update, in order to
+				# guarantee consistency.
+				if version == self._aux_cache_delta_version:
+					try:
+						deltas = cache_obj["deltas"]
+					except KeyError:
+						cache_obj["deltas"] = deltas = []
+
+					if isinstance(deltas, list):
+						return cache_obj
+
+		return None
+
+	def _cache_delta_load_race(self):
+		"""
+		This calls _cache_delta_load and validates the timestamp
+		against the currently loaded _aux_cache. If a concurrent
+		update causes the timestamps to be inconsistent, then
+		it reloads the caches and tries one more time before
+		it aborts. In practice, the race is very unlikely, so
+		this will usually succeed on the first try.
+		"""
+
+		tries = 2
+		while tries:
+			tries -= 1
+			cache_delta = self._cache_delta_load()
+			if cache_delta is not None and \
+				cache_delta.get("timestamp") != \
+				self._aux_cache.get("timestamp", False):
+				self._aux_cache_obj = None
+			else:
+				return cache_delta
+
+		return None
+
 	def match(self, origdep, use_cache=1):
 		"caching match function"
 		mydep = dep_expand(
@@ -556,22 +674,37 @@ class vardbapi(dbapi):
 		long as at least part of the cache is still valid)."""
 		if self._flush_cache_enabled and \
 			self._aux_cache is not None and \
-			len(self._aux_cache["modified"]) >= self._aux_cache_threshold and \
-			secpass >= 2:
+			secpass >= 2 and \
+			(len(self._aux_cache["modified"]) >= self._aux_cache_threshold or
+			not os.path.exists(self._cache_delta_filename)):
+
+			ensure_dirs(os.path.dirname(self._aux_cache_filename))
+
 			self._owners.populate() # index any unindexed contents
 			valid_nodes = set(self.cpv_all())
 			for cpv in list(self._aux_cache["packages"]):
 				if cpv not in valid_nodes:
 					del self._aux_cache["packages"][cpv]
 			del self._aux_cache["modified"]
-			try:
-				f = atomic_ofstream(self._aux_cache_filename, 'wb')
-				pickle.dump(self._aux_cache, f, protocol=2)
-				f.close()
-				apply_secpass_permissions(
-					self._aux_cache_filename, gid=portage_gid, mode=0o644)
-			except (IOError, OSError) as e:
-				pass
+			timestamp = time.time()
+			self._aux_cache["timestamp"] = timestamp
+
+			f = atomic_ofstream(self._aux_cache_filename, 'wb')
+			pickle.dump(self._aux_cache, f, protocol=2)
+			f.close()
+			apply_secpass_permissions(
+				self._aux_cache_filename, mode=0o644)
+
+			f = atomic_ofstream(self._cache_delta_filename, 'w',
+				encoding=_encodings['repo.content'], errors='strict')
+			json.dump({
+				"version": self._aux_cache_delta_version,
+				"timestamp": timestamp
+			}, f, ensure_ascii=False)
+			f.close()
+			apply_secpass_permissions(
+				self._cache_delta_filename, mode=0o644)
+
 			self._aux_cache["modified"] = set()
 
 	@property
@@ -1590,6 +1723,12 @@ class dblink(object):
 				self.dbdir, noiselevel=-1)
 			return
 
+		if self.dbdir is self.dbpkgdir:
+			counter, = self.vartree.dbapi.aux_get(
+				self.mycpv, ["COUNTER"])
+			self.vartree.dbapi._cache_delta("remove", self.mycpv,
+				self.settings["SLOT"].split("/")[0], counter)
+
 		shutil.rmtree(self.dbdir)
 		# If empty, remove parent category directory.
 		try:
@@ -4196,6 +4335,8 @@ class dblink(object):
 			self.delete()
 			_movefile(self.dbtmpdir, self.dbpkgdir, mysettings=self.settings)
 			self._merged_path(self.dbpkgdir, os.lstat(self.dbpkgdir))
+			self.vartree.dbapi._cache_delta("add",
+				self.mycpv, slot, counter)
 		finally:
 			self.unlockdb()
 
-- 
2.0.4 |