1 |
Author: zmedico |
2 |
Date: 2008-07-01 09:47:32 +0000 (Tue, 01 Jul 2008) |
3 |
New Revision: 10868 |
4 |
|
5 |
Modified: |
6 |
main/trunk/pym/_emerge/__init__.py |
7 |
Log: |
8 |
Add a new BinpkgFetcherAsync class and use it to implement parallel-fetch |
9 |
for --getbinpkg. |
10 |
|
11 |
|
12 |
Modified: main/trunk/pym/_emerge/__init__.py |
13 |
=================================================================== |
14 |
--- main/trunk/pym/_emerge/__init__.py 2008-07-01 08:43:10 UTC (rev 10867) |
15 |
+++ main/trunk/pym/_emerge/__init__.py 2008-07-01 09:47:32 UTC (rev 10868) |
16 |
@@ -21,7 +21,11 @@ |
17 |
sys.exit(1) |
18 |
|
19 |
import array |
20 |
+import fcntl |
21 |
import select |
22 |
+import shlex |
23 |
+import urlparse |
24 |
+import weakref |
25 |
import gc |
26 |
import os, stat |
27 |
import platform |
28 |
@@ -1991,6 +1995,195 @@ |
29 |
rval = 1 |
30 |
return rval |
31 |
|
32 |
+class BinpkgFetcherAsync(SlotObject): |
33 |
+ |
34 |
+ __slots__ = ("cancelled", "log_file", "fd_pipes", "pkg", |
35 |
+ "register", "unregister", |
36 |
+ "locked", "files", "pid", "pkg_path", "returncode", "_lock_obj") |
37 |
+ |
38 |
+ _file_names = ("fetcher", "out") |
39 |
+ _files_dict = slot_dict_class(_file_names) |
40 |
+ _bufsize = 4096 |
41 |
+ |
42 |
+ def __init__(self, **kwargs): |
43 |
+ SlotObject.__init__(self, **kwargs) |
44 |
+ pkg = self.pkg |
45 |
+ self.pkg_path = pkg.root_config.trees["bintree"].getname(pkg.cpv) |
46 |
+ |
47 |
+ def start(self): |
48 |
+ |
49 |
+ if self.cancelled: |
50 |
+ self.pid = -1 |
51 |
+ return |
52 |
+ |
53 |
+ fd_pipes = self.fd_pipes |
54 |
+ if fd_pipes is None: |
55 |
+ fd_pipes = { |
56 |
+ 0 : sys.stdin.fileno(), |
57 |
+ 1 : sys.stdout.fileno(), |
58 |
+ 2 : sys.stderr.fileno(), |
59 |
+ } |
60 |
+ |
61 |
+ log_file = self.log_file |
62 |
+ self.files = self._files_dict() |
63 |
+ files = self.files |
64 |
+ |
65 |
+ if log_file is not None: |
66 |
+ files["out"] = open(log_file, "a") |
67 |
+ portage.util.apply_secpass_permissions(log_file, |
68 |
+ uid=portage.portage_uid, gid=portage.portage_gid, |
69 |
+ mode=0660) |
70 |
+ else: |
71 |
+ # flush any pending output |
72 |
+ for fd in fd_pipes.itervalues(): |
73 |
+ if fd == sys.stdout.fileno(): |
74 |
+ sys.stdout.flush() |
75 |
+ if fd == sys.stderr.fileno(): |
76 |
+ sys.stderr.flush() |
77 |
+ |
78 |
+ files["out"] = os.fdopen(os.dup(fd_pipes[1]), 'w') |
79 |
+ |
80 |
+ master_fd, slave_fd = os.pipe() |
81 |
+ fcntl.fcntl(master_fd, fcntl.F_SETFL, |
82 |
+ fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK) |
83 |
+ |
84 |
+ fd_pipes.setdefault(0, sys.stdin.fileno()) |
85 |
+ fd_pipes_orig = fd_pipes.copy() |
86 |
+ fd_pipes[0] = fd_pipes_orig[0] |
87 |
+ fd_pipes[1] = slave_fd |
88 |
+ fd_pipes[2] = slave_fd |
89 |
+ |
90 |
+ pkg = self.pkg |
91 |
+ bintree = pkg.root_config.trees["bintree"] |
92 |
+ settings = bintree.settings |
93 |
+ use_locks = "distlocks" in settings.features |
94 |
+ pkg_path = self.pkg_path |
95 |
+ resume = os.path.exists(pkg_path) |
96 |
+ |
97 |
+ # urljoin doesn't work correctly with |
98 |
+ # unrecognized protocols like sftp |
99 |
+ if bintree._remote_has_index: |
100 |
+ rel_uri = bintree._remotepkgs[pkg.cpv].get("PATH") |
101 |
+ if not rel_uri: |
102 |
+ rel_uri = pkg.cpv + ".tbz2" |
103 |
+ uri = bintree._remote_base_uri.rstrip("/") + \ |
104 |
+ "/" + rel_uri.lstrip("/") |
105 |
+ else: |
106 |
+ uri = settings["PORTAGE_BINHOST"].rstrip("/") + \ |
107 |
+ "/" + pkg.pf + ".tbz2" |
108 |
+ |
109 |
+ protocol = urlparse.urlparse(uri)[0] |
110 |
+ fcmd_prefix = "FETCHCOMMAND" |
111 |
+ if resume: |
112 |
+ fcmd_prefix = "RESUMECOMMAND" |
113 |
+ fcmd = settings.get(fcmd_prefix + "_" + protocol.upper()) |
114 |
+ if not fcmd: |
115 |
+ fcmd = settings.get(fcmd_prefix) |
116 |
+ |
117 |
+ fcmd_vars = { |
118 |
+ "DISTDIR" : os.path.dirname(pkg_path), |
119 |
+ "URI" : uri, |
120 |
+ "FILE" : os.path.basename(pkg_path) |
121 |
+ } |
122 |
+ |
123 |
+ fetch_env = dict((k, settings[k]) for k in settings) |
124 |
+ fetch_args = [portage.util.varexpand(x, mydict=fcmd_vars) \ |
125 |
+ for x in shlex.split(fcmd)] |
126 |
+ |
127 |
+ portage.util.ensure_dirs(os.path.dirname(pkg_path)) |
128 |
+ if use_locks: |
129 |
+ self.lock() |
130 |
+ |
131 |
+ retval = portage.process.spawn(fetch_args, env=fetch_env, |
132 |
+ fd_pipes=fd_pipes, returnpid=True) |
133 |
+ |
134 |
+ self.pid = retval[0] |
135 |
+ |
136 |
+ os.close(slave_fd) |
137 |
+ files["fetcher"] = os.fdopen(master_fd, 'r') |
138 |
+ self.register(files["fetcher"].fileno(), |
139 |
+ select.POLLIN, self._output_handler) |
140 |
+ |
141 |
+ def _output_handler(self, fd, event): |
142 |
+ files = self.files |
143 |
+ buf = array.array('B') |
144 |
+ try: |
145 |
+ buf.fromfile(files["fetcher"], self._bufsize) |
146 |
+ except EOFError: |
147 |
+ pass |
148 |
+ if buf: |
149 |
+ buf.tofile(files["out"]) |
150 |
+ files["out"].flush() |
151 |
+ else: |
152 |
+ self.unregister(files["fetcher"].fileno()) |
153 |
+ for f in files.values(): |
154 |
+ f.close() |
155 |
+ if self.locked: |
156 |
+ self.unlock() |
157 |
+ |
158 |
+ def lock(self): |
159 |
+ """ |
160 |
+ This raises an AlreadyLocked exception if lock() is called |
161 |
+ while a lock is already held. In order to avoid this, call |
162 |
+ unlock() or check whether the "locked" attribute is True |
163 |
+ or False before calling lock(). |
164 |
+ """ |
165 |
+ if self._lock_obj is not None: |
166 |
+ raise self.AlreadyLocked((self._lock_obj,)) |
167 |
+ |
168 |
+ self._lock_obj = portage.locks.lockfile( |
169 |
+ self.pkg_path, wantnewlockfile=1) |
170 |
+ self.locked = True |
171 |
+ |
172 |
+ class AlreadyLocked(portage.exception.PortageException): |
173 |
+ pass |
174 |
+ |
175 |
+ def unlock(self): |
176 |
+ if self._lock_obj is None: |
177 |
+ return |
178 |
+ portage.locks.unlockfile(self._lock_obj) |
179 |
+ self._lock_obj = None |
180 |
+ self.locked = False |
181 |
+ |
182 |
+ def poll(self): |
183 |
+ if self.returncode is not None: |
184 |
+ return self.returncode |
185 |
+ retval = os.waitpid(self.pid, os.WNOHANG) |
186 |
+ if retval == (0, 0): |
187 |
+ return None |
188 |
+ self._set_returncode(retval) |
189 |
+ return self.returncode |
190 |
+ |
191 |
+ def cancel(self): |
192 |
+ if self.isAlive(): |
193 |
+ os.kill(self.pid, signal.SIGTERM) |
194 |
+ self.cancelled = True |
195 |
+ if self.pid is not None: |
196 |
+ self.wait() |
197 |
+ return self.returncode |
198 |
+ |
199 |
+ def isAlive(self): |
200 |
+ return self.pid is not None and \ |
201 |
+ self.returncode is None |
202 |
+ |
203 |
+ def wait(self): |
204 |
+ if self.returncode is not None: |
205 |
+ return self.returncode |
206 |
+ self._set_returncode(os.waitpid(self.pid, 0)) |
207 |
+ return self.returncode |
208 |
+ |
209 |
+ def _set_returncode(self, wait_retval): |
210 |
+ |
211 |
+ retval = wait_retval[1] |
212 |
+ portage.process.spawned_pids.remove(self.pid) |
213 |
+ if retval != os.EX_OK: |
214 |
+ if retval & 0xff: |
215 |
+ retval = (retval & 0xff) << 8 |
216 |
+ else: |
217 |
+ retval = retval >> 8 |
218 |
+ |
219 |
+ self.returncode = retval |
220 |
+ |
221 |
class BinpkgMerge(Task): |
222 |
|
223 |
__slots__ = ("find_blockers", "ldpath_mtimes", |
224 |
@@ -6573,9 +6766,7 @@ |
225 |
self._task_queue.clear() |
226 |
while running_tasks: |
227 |
task = running_tasks.pop() |
228 |
- if task.poll() is None: |
229 |
- os.kill(task.pid, signal.SIGTERM) |
230 |
- task.wait() |
231 |
+ task.cancel() |
232 |
|
233 |
if rval == os.EX_OK or not keep_going: |
234 |
break |
235 |
@@ -6694,8 +6885,10 @@ |
236 |
|
237 |
while task_queue and (len(running_tasks) < max_jobs): |
238 |
task = task_queue.popleft() |
239 |
- task.start() |
240 |
- running_tasks.add(task) |
241 |
+ cancelled = getattr(task, "cancelled", None) |
242 |
+ if not cancelled: |
243 |
+ task.start() |
244 |
+ running_tasks.add(task) |
245 |
state_changed = True |
246 |
|
247 |
return state_changed |
248 |
@@ -6726,16 +6919,28 @@ |
249 |
if isinstance(x, Package) and x.operation == "merge"] |
250 |
mtimedb.commit() |
251 |
|
252 |
+ prefetchers = weakref.WeakValueDictionary() |
253 |
+ getbinpkg = "--getbinpkg" in self.myopts |
254 |
+ |
255 |
if self._parallel_fetch: |
256 |
for pkg in mylist: |
257 |
- if not isinstance(pkg, Package) or \ |
258 |
- not pkg.type_name == "ebuild": |
259 |
+ if not isinstance(pkg, Package): |
260 |
continue |
261 |
+ if pkg.type_name == "ebuild": |
262 |
+ self._add_task(EbuildFetcherAsync( |
263 |
+ log_file=self._fetch_log, |
264 |
+ pkg=pkg, register=self._register, |
265 |
+ unregister=self._unregister)) |
266 |
+ elif pkg.type_name == "binary" and getbinpkg and \ |
267 |
+ pkg.root_config.trees["bintree"].isremote(pkg.cpv): |
268 |
+ prefetcher = BinpkgFetcherAsync( |
269 |
+ log_file=self._fetch_log, |
270 |
+ pkg=pkg, register=self._register, |
271 |
+ unregister=self._unregister) |
272 |
+ prefetchers[pkg] = prefetcher |
273 |
+ self._add_task(prefetcher) |
274 |
+ del prefetcher |
275 |
|
276 |
- self._add_task(EbuildFetcherAsync(log_file=self._fetch_log, |
277 |
- pkg=pkg, register=self._register, |
278 |
- unregister=self._unregister)) |
279 |
- |
280 |
# Verify all the manifests now so that the user is notified of failure |
281 |
# as soon as possible. |
282 |
if "--fetchonly" not in self.myopts and \ |
283 |
@@ -6805,14 +7010,15 @@ |
284 |
self._execute_task(bad_resume_opts, |
285 |
failed_fetches, |
286 |
mydbapi, mergecount, |
287 |
- myfeat, mymergelist, x, xterm_titles) |
288 |
+ myfeat, mymergelist, x, |
289 |
+ prefetchers, xterm_titles) |
290 |
except self._pkg_failure, e: |
291 |
return e.status |
292 |
return self._post_merge(mtimedb, xterm_titles, failed_fetches) |
293 |
|
294 |
def _execute_task(self, bad_resume_opts, |
295 |
failed_fetches, mydbapi, mergecount, myfeat, |
296 |
- mymergelist, pkg, xterm_titles): |
297 |
+ mymergelist, pkg, prefetchers, xterm_titles): |
298 |
favorites = self._favorites |
299 |
mtimedb = self._mtimedb |
300 |
from portage.elog import elog_process |
301 |
@@ -6963,8 +7169,27 @@ |
302 |
phasefilter=filter_mergephases) |
303 |
build_dir.unlock() |
304 |
|
305 |
- elif x[0]=="binary": |
306 |
- #merge the tbz2 |
307 |
+ elif x.type_name == "binary": |
308 |
+ # The prefetcher may have already completed or it |
309 |
+ # could be running now. If it's running now, |
310 |
+ # wait for it to complete since it holds |
311 |
+ # a lock on the file being fetched. The |
312 |
+ # portage.locks functions are only designed |
313 |
+ # to work between separate processes. Since |
314 |
+ # the lock is held by the current process, |
315 |
+ # use the scheduler and fetcher methods to |
316 |
+ # synchronize with the fetcher. |
317 |
+ prefetcher = prefetchers.get(pkg) |
318 |
+ if prefetcher is not None: |
319 |
+ if not prefetcher.isAlive(): |
320 |
+ prefetcher.cancel() |
321 |
+ else: |
322 |
+ retval = None |
323 |
+ while retval is None: |
324 |
+ self._schedule() |
325 |
+ retval = prefetcher.poll() |
326 |
+ del prefetcher |
327 |
+ |
328 |
fetcher = BinpkgFetcher(pkg=pkg, pretend=pretend, |
329 |
use_locks=("distlocks" in pkgsettings.features)) |
330 |
mytbz2 = fetcher.pkg_path |
331 |
|
332 |
-- |
333 |
gentoo-commits@l.g.o mailing list |