Gentoo Archives: gentoo-commits

From: "Zac Medico (zmedico)" <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] portage r10868 - main/trunk/pym/_emerge
Date: Tue, 01 Jul 2008 09:47:39
Message-Id: E1KDcST-0003OT-7O@stork.gentoo.org
1 Author: zmedico
2 Date: 2008-07-01 09:47:32 +0000 (Tue, 01 Jul 2008)
3 New Revision: 10868
4
5 Modified:
6 main/trunk/pym/_emerge/__init__.py
7 Log:
8 Add a new BinpkgFetcherAsync class and use it to implement parallel-fetch
9 for --getbinpkg.
10
11
12 Modified: main/trunk/pym/_emerge/__init__.py
13 ===================================================================
14 --- main/trunk/pym/_emerge/__init__.py 2008-07-01 08:43:10 UTC (rev 10867)
15 +++ main/trunk/pym/_emerge/__init__.py 2008-07-01 09:47:32 UTC (rev 10868)
16 @@ -21,7 +21,11 @@
17 sys.exit(1)
18
19 import array
20 +import fcntl
21 import select
22 +import shlex
23 +import urlparse
24 +import weakref
25 import gc
26 import os, stat
27 import platform
28 @@ -1991,6 +1995,195 @@
29 rval = 1
30 return rval
31
32 +class BinpkgFetcherAsync(SlotObject):
33 +
34 + __slots__ = ("cancelled", "log_file", "fd_pipes", "pkg",
35 + "register", "unregister",
36 + "locked", "files", "pid", "pkg_path", "returncode", "_lock_obj")
37 +
38 + _file_names = ("fetcher", "out")
39 + _files_dict = slot_dict_class(_file_names)
40 + _bufsize = 4096
41 +
42 + def __init__(self, **kwargs):
43 + SlotObject.__init__(self, **kwargs)
44 + pkg = self.pkg
45 + self.pkg_path = pkg.root_config.trees["bintree"].getname(pkg.cpv)
46 +
47 + def start(self):
48 +
49 + if self.cancelled:
50 + self.pid = -1
51 + return
52 +
53 + fd_pipes = self.fd_pipes
54 + if fd_pipes is None:
55 + fd_pipes = {
56 + 0 : sys.stdin.fileno(),
57 + 1 : sys.stdout.fileno(),
58 + 2 : sys.stderr.fileno(),
59 + }
60 +
61 + log_file = self.log_file
62 + self.files = self._files_dict()
63 + files = self.files
64 +
65 + if log_file is not None:
66 + files["out"] = open(log_file, "a")
67 + portage.util.apply_secpass_permissions(log_file,
68 + uid=portage.portage_uid, gid=portage.portage_gid,
69 + mode=0660)
70 + else:
71 + # flush any pending output
72 + for fd in fd_pipes.itervalues():
73 + if fd == sys.stdout.fileno():
74 + sys.stdout.flush()
75 + if fd == sys.stderr.fileno():
76 + sys.stderr.flush()
77 +
78 + files["out"] = os.fdopen(os.dup(fd_pipes[1]), 'w')
79 +
80 + master_fd, slave_fd = os.pipe()
81 + fcntl.fcntl(master_fd, fcntl.F_SETFL,
82 + fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
83 +
84 + fd_pipes.setdefault(0, sys.stdin.fileno())
85 + fd_pipes_orig = fd_pipes.copy()
86 + fd_pipes[0] = fd_pipes_orig[0]
87 + fd_pipes[1] = slave_fd
88 + fd_pipes[2] = slave_fd
89 +
90 + pkg = self.pkg
91 + bintree = pkg.root_config.trees["bintree"]
92 + settings = bintree.settings
93 + use_locks = "distlocks" in settings.features
94 + pkg_path = self.pkg_path
95 + resume = os.path.exists(pkg_path)
96 +
97 + # urljoin doesn't work correctly with
98 + # unrecognized protocols like sftp
99 + if bintree._remote_has_index:
100 + rel_uri = bintree._remotepkgs[pkg.cpv].get("PATH")
101 + if not rel_uri:
102 + rel_uri = pkg.cpv + ".tbz2"
103 + uri = bintree._remote_base_uri.rstrip("/") + \
104 + "/" + rel_uri.lstrip("/")
105 + else:
106 + uri = settings["PORTAGE_BINHOST"].rstrip("/") + \
107 + "/" + pkg.pf + ".tbz2"
108 +
109 + protocol = urlparse.urlparse(uri)[0]
110 + fcmd_prefix = "FETCHCOMMAND"
111 + if resume:
112 + fcmd_prefix = "RESUMECOMMAND"
113 + fcmd = settings.get(fcmd_prefix + "_" + protocol.upper())
114 + if not fcmd:
115 + fcmd = settings.get(fcmd_prefix)
116 +
117 + fcmd_vars = {
118 + "DISTDIR" : os.path.dirname(pkg_path),
119 + "URI" : uri,
120 + "FILE" : os.path.basename(pkg_path)
121 + }
122 +
123 + fetch_env = dict((k, settings[k]) for k in settings)
124 + fetch_args = [portage.util.varexpand(x, mydict=fcmd_vars) \
125 + for x in shlex.split(fcmd)]
126 +
127 + portage.util.ensure_dirs(os.path.dirname(pkg_path))
128 + if use_locks:
129 + self.lock()
130 +
131 + retval = portage.process.spawn(fetch_args, env=fetch_env,
132 + fd_pipes=fd_pipes, returnpid=True)
133 +
134 + self.pid = retval[0]
135 +
136 + os.close(slave_fd)
137 + files["fetcher"] = os.fdopen(master_fd, 'r')
138 + self.register(files["fetcher"].fileno(),
139 + select.POLLIN, self._output_handler)
140 +
141 + def _output_handler(self, fd, event):
142 + files = self.files
143 + buf = array.array('B')
144 + try:
145 + buf.fromfile(files["fetcher"], self._bufsize)
146 + except EOFError:
147 + pass
148 + if buf:
149 + buf.tofile(files["out"])
150 + files["out"].flush()
151 + else:
152 + self.unregister(files["fetcher"].fileno())
153 + for f in files.values():
154 + f.close()
155 + if self.locked:
156 + self.unlock()
157 +
158 + def lock(self):
159 + """
160 + This raises an AlreadyLocked exception if lock() is called
161 + while a lock is already held. In order to avoid this, call
162 + unlock() or check whether the "locked" attribute is True
163 + or False before calling lock().
164 + """
165 + if self._lock_obj is not None:
166 + raise self.AlreadyLocked((self._lock_obj,))
167 +
168 + self._lock_obj = portage.locks.lockfile(
169 + self.pkg_path, wantnewlockfile=1)
170 + self.locked = True
171 +
172 + class AlreadyLocked(portage.exception.PortageException):
173 + pass
174 +
175 + def unlock(self):
176 + if self._lock_obj is None:
177 + return
178 + portage.locks.unlockfile(self._lock_obj)
179 + self._lock_obj = None
180 + self.locked = False
181 +
182 + def poll(self):
183 + if self.returncode is not None:
184 + return self.returncode
185 + retval = os.waitpid(self.pid, os.WNOHANG)
186 + if retval == (0, 0):
187 + return None
188 + self._set_returncode(retval)
189 + return self.returncode
190 +
191 + def cancel(self):
192 + if self.isAlive():
193 + os.kill(self.pid, signal.SIGTERM)
194 + self.cancelled = True
195 + if self.pid is not None:
196 + self.wait()
197 + return self.returncode
198 +
199 + def isAlive(self):
200 + return self.pid is not None and \
201 + self.returncode is None
202 +
203 + def wait(self):
204 + if self.returncode is not None:
205 + return self.returncode
206 + self._set_returncode(os.waitpid(self.pid, 0))
207 + return self.returncode
208 +
209 + def _set_returncode(self, wait_retval):
210 +
211 + retval = wait_retval[1]
212 + portage.process.spawned_pids.remove(self.pid)
213 + if retval != os.EX_OK:
214 + if retval & 0xff:
215 + retval = (retval & 0xff) << 8
216 + else:
217 + retval = retval >> 8
218 +
219 + self.returncode = retval
220 +
221 class BinpkgMerge(Task):
222
223 __slots__ = ("find_blockers", "ldpath_mtimes",
224 @@ -6573,9 +6766,7 @@
225 self._task_queue.clear()
226 while running_tasks:
227 task = running_tasks.pop()
228 - if task.poll() is None:
229 - os.kill(task.pid, signal.SIGTERM)
230 - task.wait()
231 + task.cancel()
232
233 if rval == os.EX_OK or not keep_going:
234 break
235 @@ -6694,8 +6885,10 @@
236
237 while task_queue and (len(running_tasks) < max_jobs):
238 task = task_queue.popleft()
239 - task.start()
240 - running_tasks.add(task)
241 + cancelled = getattr(task, "cancelled", None)
242 + if not cancelled:
243 + task.start()
244 + running_tasks.add(task)
245 state_changed = True
246
247 return state_changed
248 @@ -6726,16 +6919,28 @@
249 if isinstance(x, Package) and x.operation == "merge"]
250 mtimedb.commit()
251
252 + prefetchers = weakref.WeakValueDictionary()
253 + getbinpkg = "--getbinpkg" in self.myopts
254 +
255 if self._parallel_fetch:
256 for pkg in mylist:
257 - if not isinstance(pkg, Package) or \
258 - not pkg.type_name == "ebuild":
259 + if not isinstance(pkg, Package):
260 continue
261 + if pkg.type_name == "ebuild":
262 + self._add_task(EbuildFetcherAsync(
263 + log_file=self._fetch_log,
264 + pkg=pkg, register=self._register,
265 + unregister=self._unregister))
266 + elif pkg.type_name == "binary" and getbinpkg and \
267 + pkg.root_config.trees["bintree"].isremote(pkg.cpv):
268 + prefetcher = BinpkgFetcherAsync(
269 + log_file=self._fetch_log,
270 + pkg=pkg, register=self._register,
271 + unregister=self._unregister)
272 + prefetchers[pkg] = prefetcher
273 + self._add_task(prefetcher)
274 + del prefetcher
275
276 - self._add_task(EbuildFetcherAsync(log_file=self._fetch_log,
277 - pkg=pkg, register=self._register,
278 - unregister=self._unregister))
279 -
280 # Verify all the manifests now so that the user is notified of failure
281 # as soon as possible.
282 if "--fetchonly" not in self.myopts and \
283 @@ -6805,14 +7010,15 @@
284 self._execute_task(bad_resume_opts,
285 failed_fetches,
286 mydbapi, mergecount,
287 - myfeat, mymergelist, x, xterm_titles)
288 + myfeat, mymergelist, x,
289 + prefetchers, xterm_titles)
290 except self._pkg_failure, e:
291 return e.status
292 return self._post_merge(mtimedb, xterm_titles, failed_fetches)
293
294 def _execute_task(self, bad_resume_opts,
295 failed_fetches, mydbapi, mergecount, myfeat,
296 - mymergelist, pkg, xterm_titles):
297 + mymergelist, pkg, prefetchers, xterm_titles):
298 favorites = self._favorites
299 mtimedb = self._mtimedb
300 from portage.elog import elog_process
301 @@ -6963,8 +7169,27 @@
302 phasefilter=filter_mergephases)
303 build_dir.unlock()
304
305 - elif x[0]=="binary":
306 - #merge the tbz2
307 + elif x.type_name == "binary":
308 + # The prefetcher may have already completed or it
309 + # could be running now. If it's running now,
310 + # wait for it to complete since it holds
311 + # a lock on the file being fetched. The
312 + # portage.locks functions are only designed
313 + # to work between separate processes. Since
314 + # the lock is held by the current process,
315 + # use the scheduler and fetcher methods to
316 + # synchronize with the fetcher.
317 + prefetcher = prefetchers.get(pkg)
318 + if prefetcher is not None:
319 + if not prefetcher.isAlive():
320 + prefetcher.cancel()
321 + else:
322 + retval = None
323 + while retval is None:
324 + self._schedule()
325 + retval = prefetcher.poll()
326 + del prefetcher
327 +
328 fetcher = BinpkgFetcher(pkg=pkg, pretend=pretend,
329 use_locks=("distlocks" in pkgsettings.features))
330 mytbz2 = fetcher.pkg_path
331
332 --
333 gentoo-commits@l.g.o mailing list