1 |
commit: 8a174d86bba8fa67d38935da2b7b7c51f53606fa |
2 |
Author: André Erdmann <dywi <AT> mailerd <DOT> de> |
3 |
AuthorDate: Mon Jul 2 16:48:31 2012 +0000 |
4 |
Commit: André Erdmann <dywi <AT> mailerd <DOT> de> |
5 |
CommitDate: Mon Jul 2 16:48:31 2012 +0000 |
6 |
URL: http://git.overlays.gentoo.org/gitweb/?p=proj/R_overlay.git;a=commit;h=8a174d86 |
7 |
|
8 |
threaded overlay writing, fix some threading issues |
9 |
|
10 |
* the overlay produced can now be written to fs using per-package dir threads |
11 |
* fixed some thread locking issues |
12 |
* added an error queue used by threads to pass exceptions to the main module |
13 |
|
14 |
modified: roverlay/depres/channels.py |
15 |
modified: roverlay/depres/depresolver.py |
16 |
modified: roverlay/ebuild/creation.py |
17 |
modified: roverlay/ebuild/depres.py |
18 |
modified: roverlay/overlay/__init__.py |
19 |
modified: roverlay/overlay/category.py |
20 |
modified: roverlay/overlay/creator.py |
21 |
modified: roverlay/overlay/package.py |
22 |
modified: roverlay/overlay/worker.py |
23 |
|
24 |
--- |
25 |
roverlay/depres/channels.py | 16 ++- |
26 |
roverlay/depres/depresolver.py | 296 +++++++++++++++++++++++++--------------- |
27 |
roverlay/ebuild/creation.py | 17 ++- |
28 |
roverlay/ebuild/depres.py | 9 +- |
29 |
roverlay/overlay/__init__.py | 18 ++- |
30 |
roverlay/overlay/category.py | 76 +++++++++-- |
31 |
roverlay/overlay/creator.py | 119 ++++++++++++---- |
32 |
roverlay/overlay/package.py | 165 +++++++++++------------ |
33 |
roverlay/overlay/worker.py | 81 ++++++++---- |
34 |
9 files changed, 510 insertions(+), 287 deletions(-) |
35 |
|
36 |
diff --git a/roverlay/depres/channels.py b/roverlay/depres/channels.py |
37 |
index 65d4d45..56491f4 100644 |
38 |
--- a/roverlay/depres/channels.py |
39 |
+++ b/roverlay/depres/channels.py |
40 |
@@ -19,7 +19,7 @@ class EbuildJobChannel ( DependencyResolverChannel ): |
41 |
add deps, then satisfy_request(): collect/lookup |
42 |
""" |
43 |
|
44 |
- def __init__ ( self, name=None, logger=None ): |
45 |
+ def __init__ ( self, err_queue, name=None, logger=None ): |
46 |
"""EbuildJobChannel |
47 |
|
48 |
arguments: |
49 |
@@ -32,6 +32,8 @@ class EbuildJobChannel ( DependencyResolverChannel ): |
50 |
# in the join()-method |
51 |
self._depdone = 0 |
52 |
|
53 |
+ self.err_queue = err_queue |
54 |
+ |
55 |
# set of portage packages (resolved deps) |
56 |
# this is None unless all deps have been successfully resolved |
57 |
self._collected_deps = None |
58 |
@@ -170,12 +172,15 @@ class EbuildJobChannel ( DependencyResolverChannel ): |
59 |
# DEPEND/RDEPEND/.. later, seewave requires sci-libs/fftw |
60 |
# in both DEPEND and RDEPEND for example |
61 |
dep_collected = set() |
62 |
- satisfiable = True |
63 |
+ satisfiable = self.err_queue.empty() |
64 |
|
65 |
|
66 |
def handle_queue_item ( dep_env ): |
67 |
self._depdone += 1 |
68 |
- if dep_env.is_resolved(): |
69 |
+ if dep_env is None: |
70 |
+ # could used to unblock the queue |
71 |
+ return self.err_queue.empty() |
72 |
+ elif dep_env.is_resolved(): |
73 |
### and dep_env in self.dep_env_list |
74 |
# successfully resolved |
75 |
dep_collected.add ( dep_env.get_result() [1] ) |
76 |
@@ -190,7 +195,8 @@ class EbuildJobChannel ( DependencyResolverChannel ): |
77 |
|
78 |
# loop until |
79 |
# (a) at least one dependency could not be resolved or |
80 |
- # (b) all deps processed |
81 |
+ # (b) all deps processed or |
82 |
+ # (c) error queue not empty |
83 |
while self._depdone < len ( self.dep_env_list ) and satisfiable: |
84 |
# tell the resolver to start |
85 |
self._depres_master.start() |
86 |
@@ -204,7 +210,7 @@ class EbuildJobChannel ( DependencyResolverChannel ): |
87 |
# --- end while |
88 |
|
89 |
if satisfiable: |
90 |
- self._collected_deps = dep_collected |
91 |
+ self._collected_deps = frozenset ( dep_collected ) |
92 |
return self._collected_deps |
93 |
else: |
94 |
if close_if_unresolvable: self.close() |
95 |
|
96 |
diff --git a/roverlay/depres/depresolver.py b/roverlay/depres/depresolver.py |
97 |
index 26040c0..7f91983 100644 |
98 |
--- a/roverlay/depres/depresolver.py |
99 |
+++ b/roverlay/depres/depresolver.py |
100 |
@@ -29,12 +29,11 @@ class DependencyResolver ( object ): |
101 |
"""Main object for dependency resolution.""" |
102 |
|
103 |
|
104 |
- NUMTHREADS = config.get ( "DEPRES.jobcount", 0 ) |
105 |
+ NUMTHREADS = config.get ( "DEPRES.jobcount", 15 ) |
106 |
|
107 |
def __init__ ( self ): |
108 |
"""Initializes a DependencyResolver.""" |
109 |
|
110 |
- # these loggers are temporary helpers |
111 |
self.logger = logging.getLogger ( self.__class__.__name__ ) |
112 |
self.logger_unresolvable = self.logger.getChild ( "UNRESOLVABLE" ) |
113 |
self.logger_resolved = self.logger.getChild ( "RESOLVED" ) |
114 |
@@ -46,6 +45,7 @@ class DependencyResolver ( object ): |
115 |
|
116 |
# this lock tells whether a dep res 'master' thread is running (locked) |
117 |
self.runlock = threading.Lock() |
118 |
+ self.startlock = threading.Lock() |
119 |
# the dep res main thread |
120 |
self._mainthread = None |
121 |
# the dep res worker threads |
122 |
@@ -86,6 +86,9 @@ class DependencyResolver ( object ): |
123 |
self.all_channel_ids = set() |
124 |
# --- end of __init__ (...) --- |
125 |
|
126 |
+ def set_exception_queue ( self, equeue ): |
127 |
+ self.err_queue = equeue |
128 |
+ |
129 |
def _sort ( self ): |
130 |
"""Sorts the rule pools of this resolver.""" |
131 |
for pool in self.static_rule_pools: pool.sort() |
132 |
@@ -188,7 +191,7 @@ class DependencyResolver ( object ): |
133 |
* channel -- channel to be registered |
134 |
automatically sets channel's resolver to self if it is None |
135 |
|
136 |
- raises: Exception if channels is already registered with this resolver |
137 |
+ raises: Exception if channel is already registered with this resolver |
138 |
|
139 |
returns: channel |
140 |
""" |
141 |
@@ -242,7 +245,6 @@ class DependencyResolver ( object ): |
142 |
except KeyError as expected: |
143 |
# ok |
144 |
pass |
145 |
- |
146 |
# --- end of channel_closed (...) --- |
147 |
|
148 |
def _queue_previously_failed ( self ): |
149 |
@@ -259,81 +261,138 @@ class DependencyResolver ( object ): |
150 |
# --- end of _queue_previously_failed (...) --- |
151 |
|
152 |
def start ( self ): |
153 |
- if not self.runlock.acquire ( False ): |
154 |
- # already running |
155 |
- return True |
156 |
- # -- |
157 |
+ # -- verify whether resolver has to be started |
158 |
+ if self._depqueue.empty(): |
159 |
+ # nothing to resolve |
160 |
+ return |
161 |
+ |
162 |
+ if not self.startlock.acquire ( False ): |
163 |
+ # another channel/.. is starting the resolver |
164 |
+ return |
165 |
+ elif self._depqueue.empty(): |
166 |
+ self.startlock.release() |
167 |
+ return |
168 |
+ |
169 |
+ # -- verify... |
170 |
+ |
171 |
+ # acquire the run lock (that locks _run_main) |
172 |
+ try: |
173 |
+ self.runlock.acquire() |
174 |
+ finally: |
175 |
+ self.startlock.release() |
176 |
+ |
177 |
+ if self._depqueue.empty(): |
178 |
+ self.runlock.release() |
179 |
+ return |
180 |
|
181 |
if DependencyResolver.NUMTHREADS > 0: |
182 |
# no need to wait for the old thread |
183 |
+ # FIXME: could remove the following block |
184 |
+ if self._mainthread is not None: |
185 |
+ self._mainthread.join() |
186 |
+ del self._mainthread |
187 |
+ |
188 |
self._mainthread = threading.Thread ( target=self._thread_run_main ) |
189 |
self._mainthread.start() |
190 |
+ |
191 |
else: |
192 |
self._thread_run_main() |
193 |
|
194 |
# self.runlock is released when _thread_run_main is done |
195 |
# --- end of start (...) --- |
196 |
|
197 |
+ def unblock_channels ( self ): |
198 |
+ # unblock all channels by processing all remaining deps as |
199 |
+ # unresolved |
200 |
+ ## other option: select channel, but this may interfere with |
201 |
+ ## channel_closed() |
202 |
+ channel_gone = set() |
203 |
+ while not self._depqueue.empty(): |
204 |
+ chan, dep_env = self._depqueue.get_nowait() |
205 |
+ dep_env.set_unresolvable() |
206 |
+ |
207 |
+ if chan not in channel_gone: |
208 |
+ try: |
209 |
+ self._depqueue_done [chan].put_nowait ( dep_env ) |
210 |
+ except KeyError: |
211 |
+ channel_gone.add ( chan ) |
212 |
+ # --- end of unblock_channels (...) --- |
213 |
+ |
214 |
def _thread_run_main ( self ): |
215 |
"""Tells the resolver to run.""" |
216 |
- |
217 |
jobcount = DependencyResolver.NUMTHREADS |
218 |
|
219 |
- if jobcount < 1: |
220 |
- ( self.logger.warning if jobcount < 0 else self.logger.debug ) ( |
221 |
- "Running in sequential mode." |
222 |
- ) |
223 |
- self._thread_run_resolve() |
224 |
- else: |
225 |
+ try: |
226 |
+ if jobcount < 1: |
227 |
+ ( self.logger.warning if jobcount < 0 else self.logger.debug ) ( |
228 |
+ "Running in sequential mode." |
229 |
+ ) |
230 |
+ self._thread_run_resolve() |
231 |
+ else: |
232 |
|
233 |
- # wait for old threads |
234 |
- if not self._threads is None: |
235 |
- self.logger.warning ( "Waiting for old threads..." ) |
236 |
+ # wait for old threads |
237 |
+ if not self._threads is None: |
238 |
+ self.logger.warning ( "Waiting for old threads..." ) |
239 |
+ for t in self._threads: t.join() |
240 |
+ |
241 |
+ self.logger.debug ( |
242 |
+ "Running in concurrent mode with %i jobs." % jobcount |
243 |
+ ) |
244 |
+ |
245 |
+ # create threads, |
246 |
+ self._threads = tuple ( |
247 |
+ threading.Thread ( target=self._thread_run_resolve ) |
248 |
+ for n in range (jobcount) |
249 |
+ ) |
250 |
+ # run them |
251 |
+ for t in self._threads: t.start() |
252 |
+ # and wait until done |
253 |
for t in self._threads: t.join() |
254 |
|
255 |
- self.logger.warning ( |
256 |
- "Running in concurrent mode with %i jobs." % jobcount |
257 |
- ) |
258 |
+ # finally remove them |
259 |
+ del self._threads |
260 |
+ self._threads = None |
261 |
|
262 |
- # create threads, |
263 |
- self._threads = [ |
264 |
- threading.Thread ( target=self._thread_run_resolve ) |
265 |
- for n in range (jobcount) |
266 |
- ] |
267 |
- # run them |
268 |
- for t in self._threads: t.start() |
269 |
- # and wait until done |
270 |
- for t in self._threads: t.join() |
271 |
|
272 |
- # finally remove them |
273 |
- del self._threads |
274 |
- self._threads = None |
275 |
+ # iterate over _depqueue_failed and report unresolved |
276 |
+ ## todo can thread this |
277 |
+ while not self._depqueue_failed.empty(): |
278 |
+ try: |
279 |
+ channel_id, dep_env = self._depqueue_failed.get_nowait() |
280 |
|
281 |
+ except queue.Empty: |
282 |
+ # race cond empty() <-> get_nowait() |
283 |
+ return |
284 |
|
285 |
- # iterate over _depqueue_failed and report unresolved |
286 |
- ## todo can thread this |
287 |
- while not self._depqueue_failed.empty(): |
288 |
- try: |
289 |
- channel_id, dep_env = self._depqueue_failed.get_nowait() |
290 |
|
291 |
dep_env.set_unresolvable() |
292 |
|
293 |
self._report_event ( 'UNRESOLVABLE', dep_env ) |
294 |
|
295 |
- if channel_id in self._depqueue_done: |
296 |
- ## todo/fixme/whatever: this 'if' can filter out channels |
297 |
- ## that have been added again |
298 |
- self._depqueue_done [channel_id].put ( dep_env ) |
299 |
|
300 |
- except queue.Empty: |
301 |
- # race cond empty() <-> get_nowait() |
302 |
- break |
303 |
- except KeyError: |
304 |
- # channel has been closed before calling put, ignore this err |
305 |
- pass |
306 |
+ try: |
307 |
+ if channel_id in self._depqueue_done: |
308 |
+ self._depqueue_done [channel_id].put_nowait ( dep_env ) |
309 |
+ except KeyError: |
310 |
+ # channel has been closed before calling put, ignore this |
311 |
+ pass |
312 |
+ |
313 |
+ if not self.err_queue.empty(): |
314 |
+ self.unblock_channels() |
315 |
+ |
316 |
+ except ( Exception, KeyboardInterrupt ) as e: |
317 |
+ |
318 |
+ self.unblock_channels() |
319 |
|
320 |
- # release the lock |
321 |
- self.runlock.release() |
322 |
+ if jobcount > 0 and hasattr ( self, 'err_queue' ): |
323 |
+ self.err_queue.put_nowait ( id ( self ), e ) |
324 |
+ return |
325 |
+ else: |
326 |
+ raise e |
327 |
+ |
328 |
+ finally: |
329 |
+ # release the lock |
330 |
+ self.runlock.release() |
331 |
|
332 |
# --- end of _thread_run_main (...) --- |
333 |
|
334 |
@@ -342,77 +401,90 @@ class DependencyResolver ( object ): |
335 |
|
336 |
returns: None (implicit) |
337 |
""" |
338 |
+ try: |
339 |
+ while self.err_queue.empty() and not self._depqueue.empty(): |
340 |
|
341 |
- while not self._depqueue.empty(): |
342 |
- |
343 |
- try: |
344 |
- to_resolve = self._depqueue.get_nowait() |
345 |
- except queue.Empty: |
346 |
- # this thread is done when the queue is empty, so this is |
347 |
- # no error, but just the result of the race condition between |
348 |
- # queue.empty() and queue.get(False) |
349 |
- return None |
350 |
- |
351 |
- channel_id, dep_env = to_resolve |
352 |
- |
353 |
- if channel_id in self._depqueue_done: |
354 |
- # else channel has been closed, drop dep |
355 |
- |
356 |
- self.logger.debug ( "Trying to resolve '%s'." % dep_env.dep_str ) |
357 |
- |
358 |
- #have_new_rule = False |
359 |
+ try: |
360 |
+ to_resolve = self._depqueue.get_nowait() |
361 |
+ except queue.Empty: |
362 |
+ # this thread is done when the queue is empty, so this is |
363 |
+ # no error, but just the result of the race condition between |
364 |
+ # queue.empty() and queue.get(False) |
365 |
+ return |
366 |
|
367 |
- resolved = None |
368 |
- # resolved can be None, so use a tri-state int for checking |
369 |
- # 0 -> unresolved, but resolvable |
370 |
- # 1 -> unresolved and (currently, new rules may change this) |
371 |
- # not resolvable |
372 |
- # 2 -> resolved |
373 |
- is_resolved = 0 |
374 |
+ channel_id, dep_env = to_resolve |
375 |
|
376 |
- # TODO: |
377 |
- # (threading: could search the pools in parallel) |
378 |
+ if channel_id in self._depqueue_done: |
379 |
+ # else channel has been closed, drop dep |
380 |
|
381 |
- if USING_DEPRES_CACHE: |
382 |
- if dep_env.dep_str_low in self._dep_unresolvable: |
383 |
- # cannot resolve |
384 |
- is_resolved = 1 |
385 |
+ self.logger.debug ( |
386 |
+ "Trying to resolve '%s'." % dep_env.dep_str |
387 |
+ ) |
388 |
|
389 |
- if is_resolved == 0: |
390 |
- # search for a match in the rule pools |
391 |
- for rulepool in self.static_rule_pools: |
392 |
- result = rulepool.matches ( dep_env ) |
393 |
- if not result is None and result [0] > 0: |
394 |
- resolved = result [1] |
395 |
- is_resolved = 2 |
396 |
- break |
397 |
+ #have_new_rule = False |
398 |
|
399 |
+ resolved = None |
400 |
+ # resolved can be None, so use a tri-state int for checking |
401 |
+ # 0 -> unresolved, but resolvable |
402 |
+ # 1 -> unresolved and (currently, new rules may change this) |
403 |
+ # not resolvable |
404 |
+ # 2 -> resolved |
405 |
+ is_resolved = 0 |
406 |
|
407 |
+ # TODO: |
408 |
+ # (threading: could search the pools in parallel) |
409 |
|
410 |
- if is_resolved == 2: |
411 |
- dep_env.set_resolved ( resolved, append=False ) |
412 |
- self._report_event ( 'RESOLVED', dep_env ) |
413 |
- self._depqueue_done [channel_id].put ( dep_env ) |
414 |
- else: |
415 |
- self._depqueue_failed.put ( to_resolve ) |
416 |
- |
417 |
- if USING_DEPRES_CACHE: |
418 |
- # does not work when adding new rules is possible |
419 |
- self._dep_unresolvable.add ( dep_env.dep_str_low ) |
420 |
- |
421 |
- """ |
422 |
- ## only useful if new rules can be created |
423 |
- # new rule found, requeue all previously failed dependency searches |
424 |
- if have_new_rule: |
425 |
- self._queue_previously_failed |
426 |
if USING_DEPRES_CACHE: |
427 |
- self._dep_unresolvable.clear() #? |
428 |
- """ |
429 |
- # --- end if channel_id in self._depqueue_done |
430 |
- |
431 |
- self._depqueue.task_done() |
432 |
- # --- end while |
433 |
- |
434 |
+ if dep_env.dep_str_low in self._dep_unresolvable: |
435 |
+ # cannot resolve |
436 |
+ is_resolved = 1 |
437 |
+ |
438 |
+ if is_resolved == 0: |
439 |
+ # search for a match in the rule pools |
440 |
+ for rulepool in self.static_rule_pools: |
441 |
+ result = rulepool.matches ( dep_env ) |
442 |
+ if not result is None and result [0] > 0: |
443 |
+ resolved = result [1] |
444 |
+ is_resolved = 2 |
445 |
+ break |
446 |
+ |
447 |
+ |
448 |
+ |
449 |
+ if is_resolved == 2: |
450 |
+ dep_env.set_resolved ( resolved, append=False ) |
451 |
+ self._report_event ( 'RESOLVED', dep_env ) |
452 |
+ try: |
453 |
+ self._depqueue_done [channel_id].put ( dep_env ) |
454 |
+ except KeyError: |
455 |
+ # channel gone while resolving |
456 |
+ pass |
457 |
+ else: |
458 |
+ self._depqueue_failed.put ( to_resolve ) |
459 |
+ |
460 |
+ if USING_DEPRES_CACHE: |
461 |
+ # does not work when adding new rules is possible |
462 |
+ self._dep_unresolvable.add ( dep_env.dep_str_low ) |
463 |
+ |
464 |
+ """ |
465 |
+ ## only useful if new rules can be created |
466 |
+ # new rule found, requeue all previously |
467 |
+ # failed dependency searches |
468 |
+ if have_new_rule: |
469 |
+ self._queue_previously_failed |
470 |
+ if USING_DEPRES_CACHE: |
471 |
+ self._dep_unresolvable.clear() #? |
472 |
+ """ |
473 |
+ # --- end if channel_id in self._depqueue_done |
474 |
+ |
475 |
+ self._depqueue.task_done() |
476 |
+ # --- end while |
477 |
+ |
478 |
+ except ( Exception, KeyboardInterrupt ) as e: |
479 |
+ if jobcount > 0 and hasattr ( self, 'err_queue' ): |
480 |
+ self.err_queue.put_nowait ( id ( self ), e ) |
481 |
+ return |
482 |
+ else: |
483 |
+ raise e |
484 |
|
485 |
|
486 |
# --- end of _thread_run_resolve (...) --- |
487 |
|
488 |
diff --git a/roverlay/ebuild/creation.py b/roverlay/ebuild/creation.py |
489 |
index 056b1a2..b606ae9 100644 |
490 |
--- a/roverlay/ebuild/creation.py |
491 |
+++ b/roverlay/ebuild/creation.py |
492 |
@@ -22,7 +22,7 @@ FALLBACK_DESCRIPTION = "<none>" |
493 |
class EbuildCreation ( object ): |
494 |
"""Used to create an ebuild using DESCRIPTION data.""" |
495 |
|
496 |
- def __init__ ( self, package_info, depres_channel_spawner=None ): |
497 |
+ def __init__ ( self, package_info, err_queue, depres_channel_spawner=None ): |
498 |
"""Initializes the creation of an ebuild. |
499 |
|
500 |
arguments: |
501 |
@@ -39,6 +39,8 @@ class EbuildCreation ( object ): |
502 |
|
503 |
self.depres_channel_spawner = depres_channel_spawner |
504 |
|
505 |
+ self.err_queue = err_queue |
506 |
+ |
507 |
self.package_info.set_readonly() |
508 |
# --- end of __init__ (...) --- |
509 |
|
510 |
@@ -64,10 +66,10 @@ class EbuildCreation ( object ): |
511 |
self.logger.info ( "Cannot create an ebuild for this package." ) |
512 |
self.status = -1 |
513 |
|
514 |
- except Exception as e: |
515 |
- # log this and set status to fail |
516 |
+ except ( Exception, KeyboardInterrupt ): |
517 |
+ # set status to fail |
518 |
self.status = -10 |
519 |
- self.logger.exception ( e ) |
520 |
+ raise |
521 |
# --- end of run (...) --- |
522 |
|
523 |
def _lazyimport_desc_data ( self ): |
524 |
@@ -82,8 +84,8 @@ class EbuildCreation ( object ): |
525 |
logger=self.logger, |
526 |
read_now=True |
527 |
) |
528 |
- self.package_info.set_writeable() |
529 |
- self.package_info.update ( |
530 |
+ |
531 |
+ self.package_info.update_now ( |
532 |
desc_data=reader.get_desc ( run_if_unset=False ) |
533 |
) |
534 |
del reader |
535 |
@@ -133,7 +135,8 @@ class EbuildCreation ( object ): |
536 |
_dep_resolution = depres.EbuildDepRes ( |
537 |
self.package_info, self.logger, |
538 |
create_iuse=True, run_now=True, |
539 |
- depres_channel_spawner=self.depres_channel_spawner |
540 |
+ depres_channel_spawner=self.depres_channel_spawner, |
541 |
+ err_queue=self.err_queue |
542 |
) |
543 |
if not _dep_resolution.success(): |
544 |
# log here? (FIXME) |
545 |
|
546 |
diff --git a/roverlay/ebuild/depres.py b/roverlay/ebuild/depres.py |
547 |
index 383831e..1c19a00 100644 |
548 |
--- a/roverlay/ebuild/depres.py |
549 |
+++ b/roverlay/ebuild/depres.py |
550 |
@@ -22,8 +22,8 @@ class EbuildDepRes ( object ): |
551 |
"""Handles dependency resolution for a single ebuild.""" |
552 |
|
553 |
def __init__ ( |
554 |
- self, package_info, logger, depres_channel_spawner, |
555 |
- create_iuse=True, run_now=True |
556 |
+ self, package_info, logger, depres_channel_spawner, err_queue, |
557 |
+ create_iuse=True, run_now=True, |
558 |
): |
559 |
"""Initializes an EbuildDepRes. |
560 |
|
561 |
@@ -45,6 +45,8 @@ class EbuildDepRes ( object ): |
562 |
self.has_suggests = None |
563 |
self.create_iuse = create_iuse |
564 |
|
565 |
+ self.err_queue = err_queue |
566 |
+ |
567 |
self._channels = None |
568 |
|
569 |
if run_now: |
570 |
@@ -90,7 +92,8 @@ class EbuildDepRes ( object ): |
571 |
if dependency_type not in self._channels: |
572 |
self._channels [dependency_type] = self.request_resolver ( |
573 |
name=dependency_type, |
574 |
- logger=self.logger |
575 |
+ logger=self.logger, |
576 |
+ err_queue=self.err_queue |
577 |
) |
578 |
return self._channels [dependency_type] |
579 |
# --- end of get_channel (...) --- |
580 |
|
581 |
diff --git a/roverlay/overlay/__init__.py b/roverlay/overlay/__init__.py |
582 |
index 9ca6abe..a8a939d 100644 |
583 |
--- a/roverlay/overlay/__init__.py |
584 |
+++ b/roverlay/overlay/__init__.py |
585 |
@@ -77,14 +77,16 @@ class Overlay ( object ): |
586 |
""" |
587 |
if not category in self._categories: |
588 |
self._catlock.acquire() |
589 |
- if not category in self._categories: |
590 |
- self._categories [category] = Category ( |
591 |
- category, |
592 |
- self.logger, |
593 |
- None if self.physical_location is None else \ |
594 |
- os.path.join ( self.physical_location, category ) |
595 |
- ) |
596 |
- self._catlock.release() |
597 |
+ try: |
598 |
+ if not category in self._categories: |
599 |
+ self._categories [category] = Category ( |
600 |
+ category, |
601 |
+ self.logger, |
602 |
+ None if self.physical_location is None else \ |
603 |
+ os.path.join ( self.physical_location, category ) |
604 |
+ ) |
605 |
+ finally: |
606 |
+ self._catlock.release() |
607 |
|
608 |
return self._categories [category] |
609 |
# --- end of _get_category (...) --- |
610 |
|
611 |
diff --git a/roverlay/overlay/category.py b/roverlay/overlay/category.py |
612 |
index 910f194..e938dbf 100644 |
613 |
--- a/roverlay/overlay/category.py |
614 |
+++ b/roverlay/overlay/category.py |
615 |
@@ -5,12 +5,19 @@ |
616 |
import threading |
617 |
import os.path |
618 |
|
619 |
+try: |
620 |
+ import queue |
621 |
+except ImportError: |
622 |
+ import Queue as queue |
623 |
+ |
624 |
from roverlay.overlay.package import PackageDir |
625 |
|
626 |
import roverlay.util |
627 |
|
628 |
class Category ( object ): |
629 |
|
630 |
+ WRITE_JOBCOUNT = 3 |
631 |
+ |
632 |
def __init__ ( self, name, logger, directory ): |
633 |
"""Initializes a overlay/portage category (such as 'app-text', 'sci-R'). |
634 |
|
635 |
@@ -45,14 +52,16 @@ class Category ( object ): |
636 |
|
637 |
if not pkg_name in self._subdirs: |
638 |
self._lock.acquire() |
639 |
- if not pkg_name in self._subdirs: |
640 |
- self._subdirs [pkg_name] = PackageDir ( |
641 |
- pkg_name, |
642 |
- self.logger, |
643 |
- None if self.physical_location is None else \ |
644 |
- os.path.join ( self.physical_location, pkg_name ) |
645 |
- ) |
646 |
- self._lock.release() |
647 |
+ try: |
648 |
+ if not pkg_name in self._subdirs: |
649 |
+ self._subdirs [pkg_name] = PackageDir ( |
650 |
+ pkg_name, |
651 |
+ self.logger, |
652 |
+ None if self.physical_location is None else \ |
653 |
+ os.path.join ( self.physical_location, pkg_name ) |
654 |
+ ) |
655 |
+ finally: |
656 |
+ self._lock.release() |
657 |
|
658 |
self._subdirs [pkg_name].add ( package_info ) |
659 |
# --- end of add (...) --- |
660 |
@@ -92,13 +101,56 @@ class Category ( object ): |
661 |
package.show ( **show_kw ) |
662 |
# --- end of show (...) --- |
663 |
|
664 |
+ def _run_write_queue ( self, q, write_kw ): |
665 |
+ try: |
666 |
+ while not q.empty(): |
667 |
+ pkg = q.get_nowait() |
668 |
+ pkg.write ( **write_kw ) |
669 |
+ |
670 |
+ except queue.Empty: |
671 |
+ pass |
672 |
+ except ( Exception, KeyboardInterrupt ) as e: |
673 |
+ self.RERAISE_EXCEPTION = e |
674 |
+ |
675 |
+ # --- end of _run_write_queue (...) --- |
676 |
+ |
677 |
def write ( self, **write_kw ): |
678 |
"""Writes this category to its filesystem location. |
679 |
|
680 |
returns: None (implicit) |
681 |
""" |
682 |
- for package in self._subdirs.values(): |
683 |
- if package.physical_location and not package.empty(): |
684 |
- roverlay.util.dodir ( package.physical_location ) |
685 |
- package.write ( **write_kw ) |
686 |
+ |
687 |
+ max_jobs = self.__class__.WRITE_JOBCOUNT |
688 |
+ |
689 |
+ # todo len.. > 3: what's an reasonable number of min package dirs to |
690 |
+ # start threaded writing? |
691 |
+ if max_jobs > 1 and len ( self._subdirs ) > 3: |
692 |
+ |
693 |
+ # writing 1..self.__class__.WRITE_JOBCOUNT package dirs at once |
694 |
+ |
695 |
+ write_queue = queue.Queue() |
696 |
+ for package in self._subdirs.values(): |
697 |
+ if package.physical_location and not package.empty(): |
698 |
+ roverlay.util.dodir ( package.physical_location ) |
699 |
+ write_queue.put_nowait ( package ) |
700 |
+ |
701 |
+ |
702 |
+ if not write_queue.empty(): |
703 |
+ workers = ( |
704 |
+ threading.Thread ( |
705 |
+ target=self._run_write_queue, |
706 |
+ args=( write_queue, write_kw ) |
707 |
+ ) for n in range ( max_jobs ) |
708 |
+ ) |
709 |
+ |
710 |
+ for w in workers: w.start() |
711 |
+ for w in workers: w.join() |
712 |
+ |
713 |
+ if hasattr ( self, 'RERAISE_EXCEPTION' ): |
714 |
+ raise self.RERAISE_EXCEPTION |
715 |
+ else: |
716 |
+ for package in self._subdirs.values(): |
717 |
+ if package.physical_location and not package.empty(): |
718 |
+ roverlay.util.dodir ( package.physical_location ) |
719 |
+ package.write ( **write_kw ) |
720 |
# --- end of write (...) --- |
721 |
|
722 |
diff --git a/roverlay/overlay/creator.py b/roverlay/overlay/creator.py |
723 |
index 608ba77..aa1a3a8 100644 |
724 |
--- a/roverlay/overlay/creator.py |
725 |
+++ b/roverlay/overlay/creator.py |
726 |
@@ -5,6 +5,9 @@ |
727 |
import time |
728 |
import logging |
729 |
import threading |
730 |
+import signal |
731 |
+import traceback |
732 |
+import sys |
733 |
|
734 |
try: |
735 |
import queue |
736 |
@@ -40,18 +43,26 @@ class OverlayCreator ( object ): |
737 |
|
738 |
self.depresolver = easyresolver.setup() |
739 |
|
740 |
- self.NUMTHREADS = config.get ( 'EBUILD.jobcount', 0 ) |
741 |
+ self.NUMTHREADS = config.get ( 'EBUILD.jobcount', 10 ) |
742 |
|
743 |
# -- |
744 |
- self._pkg_queue = queue.Queue() |
745 |
- self._workers = None |
746 |
- self._runlock = threading.RLock() |
747 |
+ self._pkg_queue = queue.Queue() |
748 |
+ |
749 |
+ # this queue is used to propagate exceptions from threads |
750 |
+ # it's |
751 |
+ self._err_queue = queue.Queue() |
752 |
+ |
753 |
+ |
754 |
+ self._workers = None |
755 |
+ self._runlock = threading.RLock() |
756 |
|
757 |
self.can_write_overlay = OVERLAY_WRITE_ALLOWED |
758 |
|
759 |
# this is a method that adds PackageInfo objects to the pkg queue |
760 |
self.add_package = self._pkg_queue.put |
761 |
|
762 |
+ self.depresolver.set_exception_queue ( self._err_queue ) |
763 |
+ |
764 |
# --- end of __init__ (...) --- |
765 |
|
766 |
def _timestamp ( self, description, start, stop=None ): |
767 |
@@ -109,17 +120,24 @@ class OverlayCreator ( object ): |
768 |
def run ( self ): |
769 |
"""Starts ebuild creation and waits until done.""" |
770 |
self._runlock.acquire() |
771 |
- self.start() |
772 |
- self.join() |
773 |
- self._runlock.release() |
774 |
+ try: |
775 |
+ self.start() |
776 |
+ self.join() |
777 |
+ finally: |
778 |
+ self._runlock.release() |
779 |
# --- end of run (...) --- |
780 |
|
781 |
def start ( self ): |
782 |
"""Starts ebuild creation.""" |
783 |
self._runlock.acquire() |
784 |
- self.join() |
785 |
- self._make_workers() |
786 |
- self._runlock.release() |
787 |
+ try: |
788 |
+ self.join() |
789 |
+ self._make_workers() |
790 |
+ except: |
791 |
+ self._err_queue.put_nowait ( ( -1, None ) ) |
792 |
+ raise |
793 |
+ finally: |
794 |
+ self._runlock.release() |
795 |
# --- end of start (...) --- |
796 |
|
797 |
def join ( self ): |
798 |
@@ -142,13 +160,14 @@ class OverlayCreator ( object ): |
799 |
if self.depresolver is None: return |
800 |
|
801 |
self._runlock.acquire() |
802 |
- if self.depresolver is None: return |
803 |
|
804 |
- self.depresolver.close() |
805 |
- del self.depresolver |
806 |
- self.depresolver = None |
807 |
- |
808 |
- self._runlock.release() |
809 |
+ try: |
810 |
+ if self.depresolver is not None: |
811 |
+ self.depresolver.close() |
812 |
+ del self.depresolver |
813 |
+ self.depresolver = None |
814 |
+ finally: |
815 |
+ self._runlock.release() |
816 |
# --- end of _close_resolver (...) --- |
817 |
|
818 |
def _waitfor_workers ( self, do_close ): |
819 |
@@ -157,26 +176,65 @@ class OverlayCreator ( object ): |
820 |
arguments: |
821 |
* do_close -- close (exit) workers if True, else wait until done. |
822 |
""" |
823 |
- if self._workers is None: return |
824 |
|
825 |
- self._runlock.acquire() |
826 |
if self._workers is None: return |
827 |
+ start = None |
828 |
+ self._runlock.acquire() |
829 |
|
830 |
- if self.NUMTHREADS > 0: start = time.time() |
831 |
+ try: |
832 |
+ if self._workers is not None: |
833 |
+ if self.NUMTHREADS > 0: start = time.time() |
834 |
+ |
835 |
+ if do_close: |
836 |
+ self._err_queue.put_nowait ( ( -1, None ) ) |
837 |
+ # fixme: remove enabled? |
838 |
+ for w in self._workers: w.enabled = False |
839 |
+ else: |
840 |
+ for w in self._workers: w.stop_when_empty() |
841 |
+ |
842 |
+ while True in ( w.active() for w in self._workers ): |
843 |
+ self._pkg_queue.put ( None ) |
844 |
+ |
845 |
+ del self._workers |
846 |
+ self._workers = None |
847 |
+ |
848 |
+ while not self._err_queue.empty(): |
849 |
+ e = self._err_queue.get_nowait() |
850 |
+ self._err_queue.put_nowait ( ( -2, None ) ) |
851 |
+ if isinstance ( e [1], ( Exception, KeyboardInterrupt ) ): |
852 |
+ self._err_queue.put ( e ) |
853 |
+ self.logger.warning ( "Reraising thread exception." ) |
854 |
+ raise e [1] |
855 |
+ |
856 |
+ |
857 |
+ |
858 |
+ except ( Exception, KeyboardInterrupt ) as err: |
859 |
+ # catch interrupt here: still wait until all workers have been closed |
860 |
+ # and reraise after that |
861 |
+# SIGINT_RESTORE = signal.signal ( |
862 |
+# signal.SIGINT, |
863 |
+# lambda sig, frame : sys.stderr.write ( "Please wait ...\n" ) |
864 |
+# ) |
865 |
+ |
866 |
+ try: |
867 |
+ self._err_queue.put_nowait ( ( -1, None ) ) |
868 |
+ self.depresolver.unblock_channels() |
869 |
+ while hasattr ( self, '_workers' ) and self._workers is not None: |
870 |
|
871 |
- if do_close: |
872 |
- for w in self._workers: w.enabled = False |
873 |
- else: |
874 |
- for w in self._workers: w.stop_when_empty() |
875 |
|
876 |
- while True in ( w.active() for w in self._workers ): |
877 |
- self._pkg_queue.put ( None ) |
878 |
+ if True in ( w.active() for w in self._workers ): |
879 |
+ self._pkg_queue.put_nowait ( None ) |
880 |
+ else: |
881 |
+ del self._workers |
882 |
+ self._workers = None |
883 |
+ finally: |
884 |
+# signal.signal ( signal.SIGINT, SIGINT_RESTORE ) |
885 |
+ raise |
886 |
|
887 |
- del self._workers |
888 |
- self._workers = None |
889 |
+ finally: |
890 |
+ self._runlock.release() |
891 |
|
892 |
- self._runlock.release() |
893 |
- if self.NUMTHREADS > 0: return start |
894 |
+ return start |
895 |
# --- end of _waitfor_workers (...) --- |
896 |
|
897 |
def _join_workers ( self ): |
898 |
@@ -221,7 +279,8 @@ class OverlayCreator ( object ): |
899 |
""" |
900 |
w = OverlayWorker ( |
901 |
self._pkg_queue, self.depresolver, self.logger, self._pkg_done, |
902 |
- use_threads=use_threads |
903 |
+ use_threads=use_threads, |
904 |
+ err_queue=self._err_queue |
905 |
) |
906 |
if start_now: w.start() |
907 |
return w |
908 |
|
909 |
diff --git a/roverlay/overlay/package.py b/roverlay/overlay/package.py |
910 |
index e77d4a2..77fcbee 100644 |
911 |
--- a/roverlay/overlay/package.py |
912 |
+++ b/roverlay/overlay/package.py |
913 |
@@ -69,64 +69,60 @@ class PackageDir ( object ): |
914 |
raise Exception ( "cannot write - no directory assigned!" ) |
915 |
|
916 |
self._lock.acquire() |
917 |
- self._regen_metadata() |
918 |
+ try: |
919 |
+ self._regen_metadata() |
920 |
|
921 |
- # mkdir not required here, overlay.Category does this |
922 |
+ # mkdir not required here, overlay.Category does this |
923 |
|
924 |
- # write ebuilds |
925 |
- for ver, p_info in self._packages.items(): |
926 |
- fh = None |
927 |
- try: |
928 |
- efile = self._get_ebuild_filepath ( ver ) |
929 |
+ # write ebuilds |
930 |
+ for ver, p_info in self._packages.items(): |
931 |
+ fh = None |
932 |
+ try: |
933 |
+ efile = self._get_ebuild_filepath ( ver ) |
934 |
|
935 |
- ebuild = p_info ['ebuild'] |
936 |
+ ebuild = p_info ['ebuild'] |
937 |
|
938 |
- fh = open ( efile, 'w' ) |
939 |
- if isinstance ( ebuild, str ): |
940 |
- if default_header is not None: |
941 |
- fh.write ( default_header ) |
942 |
- fh.write ( '\n\n' ) |
943 |
- fh.write ( ebuild ) |
944 |
+ fh = open ( efile, 'w' ) |
945 |
+ fh.write ( default_header ) |
946 |
+ fh.write ( '\n\n' ) |
947 |
+ fh.write ( str ( ebuild ) ) |
948 |
fh.write ( '\n' ) |
949 |
- else: |
950 |
- ebuild.write ( |
951 |
- fh, |
952 |
- header=default_header, header_is_fallback=True |
953 |
- ) |
954 |
- if fh: fh.close() |
955 |
|
956 |
- # adjust owner/perm? TODO |
957 |
- # chmod 0644 or 0444 |
958 |
- # chown 250.250 |
959 |
+ if fh: fh.close() |
960 |
|
961 |
- # this marks the package as 'written to fs' |
962 |
- p_info.set_writeable() |
963 |
- p_info ['ebuild_file'] = efile |
964 |
- p_info.set_readonly() |
965 |
+ # adjust owner/perm? TODO |
966 |
+ # chmod 0644 or 0444 |
967 |
+ # chown 250.250 |
968 |
|
969 |
- self.logger.info ( "Wrote ebuild %s." % efile ) |
970 |
- except IOError as e: |
971 |
- if fh: fh.close() |
972 |
- self.logger.error ( "Couldn't write ebuild %s." % efile ) |
973 |
- self.logger.exception ( e ) |
974 |
+ # this marks the package as 'written to fs' |
975 |
+ p_info.set_writeable() |
976 |
+ p_info ['ebuild_file'] = efile |
977 |
+ p_info.set_readonly() |
978 |
|
979 |
- # write metadata |
980 |
- fh = None |
981 |
- try: |
982 |
- mfile = self._get_metadata_filepath() |
983 |
+ self.logger.info ( "Wrote ebuild %s." % efile ) |
984 |
+ except IOError as e: |
985 |
+ if fh: fh.close() |
986 |
+ self.logger.error ( "Couldn't write ebuild %s." % efile ) |
987 |
+ self.logger.exception ( e ) |
988 |
|
989 |
- fh = open ( mfile, 'w' ) |
990 |
- self._metadata.write ( fh ) |
991 |
- if fh: fh.close() |
992 |
+ # write metadata |
993 |
+ fh = None |
994 |
+ try: |
995 |
+ mfile = self._get_metadata_filepath() |
996 |
|
997 |
- except IOError as e: |
998 |
- if fh: fh.close() |
999 |
- self.logger.error ( "Failed to write metadata at %s." % mfile ) |
1000 |
- self.logger.exception ( e ) |
1001 |
+ fh = open ( mfile, 'w' ) |
1002 |
+ self._metadata.write ( fh ) |
1003 |
+ if fh: fh.close() |
1004 |
|
1005 |
- self.generate_manifest() |
1006 |
+ except IOError as e: |
1007 |
+ if fh: fh.close() |
1008 |
+ self.logger.error ( "Failed to write metadata at %s." % mfile ) |
1009 |
+ self.logger.exception ( e ) |
1010 |
|
1011 |
- self._lock.release() |
1012 |
+ self.generate_manifest() |
1013 |
+ |
1014 |
+ finally: |
1015 |
+ self._lock.release() |
1016 |
# --- end of write (...) --- |
1017 |
|
1018 |
def show ( self, stream=sys.stderr, default_header=None ): |
1019 |
@@ -141,35 +137,31 @@ class PackageDir ( object ): |
1020 |
* IOError |
1021 |
""" |
1022 |
self._lock.acquire() |
1023 |
- self._regen_metadata() |
1024 |
+ try: |
1025 |
+ self._regen_metadata() |
1026 |
|
1027 |
|
1028 |
- for ver, p_info in self._packages.items(): |
1029 |
- efile = self._get_ebuild_filepath ( ver ) |
1030 |
- ebuild = p_info ['ebuild'] |
1031 |
+ for ver, p_info in self._packages.items(): |
1032 |
+ efile = self._get_ebuild_filepath ( ver ) |
1033 |
+ ebuild = p_info ['ebuild'] |
1034 |
|
1035 |
- stream.write ( "[BEGIN ebuild %s]\n" % efile ) |
1036 |
- if isinstance ( ebuild, str ): |
1037 |
- if default_header is not None: |
1038 |
- stream.write ( default_header ) |
1039 |
- stream.write ( '\n\n' ) |
1040 |
- stream.write ( ebuild ) |
1041 |
+ stream.write ( "[BEGIN ebuild %s]\n" % efile ) |
1042 |
+ |
1043 |
+ stream.write ( default_header ) |
1044 |
+ stream.write ( '\n\n' ) |
1045 |
+ stream.write ( str ( ebuild ) ) |
1046 |
stream.write ( '\n' ) |
1047 |
- else: |
1048 |
- ebuild.write ( |
1049 |
- stream, |
1050 |
- header=default_header, header_is_fallback=True |
1051 |
- ) |
1052 |
- stream.write ( "[END ebuild %s]\n" % efile ) |
1053 |
|
1054 |
- mfile = self._get_metadata_filepath() |
1055 |
+ stream.write ( "[END ebuild %s]\n" % efile ) |
1056 |
|
1057 |
- stream.write ( "[BEGIN %s]\n" % mfile ) |
1058 |
- self._metadata.write ( stream ) |
1059 |
- stream.write ( "[END %s]\n" % mfile ) |
1060 |
+ mfile = self._get_metadata_filepath() |
1061 |
|
1062 |
+ stream.write ( "[BEGIN %s]\n" % mfile ) |
1063 |
+ self._metadata.write ( stream ) |
1064 |
+ stream.write ( "[END %s]\n" % mfile ) |
1065 |
|
1066 |
- self._lock.release() |
1067 |
+ finally: |
1068 |
+ self._lock.release() |
1069 |
# --- end of show (...) --- |
1070 |
|
1071 |
def _latest_package ( self, pkg_filter=None, use_lock=False ): |
1072 |
@@ -186,15 +178,16 @@ class PackageDir ( object ): |
1073 |
retpkg = None |
1074 |
|
1075 |
if use_lock: self._lock.acquire() |
1076 |
- for p in self._packages.values(): |
1077 |
- if pkg_filter is None or pkg_filter ( p ): |
1078 |
- newver = p ['version'] |
1079 |
- if first or newver > retver: |
1080 |
- retver = newver |
1081 |
- retpkg = p |
1082 |
- first = False |
1083 |
- |
1084 |
- if use_lock: self._lock.release() |
1085 |
+ try: |
1086 |
+ for p in self._packages.values(): |
1087 |
+ if pkg_filter is None or pkg_filter ( p ): |
1088 |
+ newver = p ['version'] |
1089 |
+ if first or newver > retver: |
1090 |
+ retver = newver |
1091 |
+ retpkg = p |
1092 |
+ first = False |
1093 |
+ finally: |
1094 |
+ if use_lock: self._lock.release() |
1095 |
return retpkg |
1096 |
# --- end of _latest_package (...) --- |
1097 |
|
1098 |
@@ -219,7 +212,7 @@ class PackageDir ( object ): |
1099 |
self.name, shortver |
1100 |
) |
1101 |
if SUPPRESS_EXCEPTIONS: |
1102 |
- self.logger.warning ( msg ) |
1103 |
+ self.logger.info ( msg ) |
1104 |
else: |
1105 |
raise Exception ( msg ) |
1106 |
|
1107 |
@@ -265,18 +258,20 @@ class PackageDir ( object ): |
1108 |
return |
1109 |
|
1110 |
self._lock.acquire() |
1111 |
+ try: |
1112 |
|
1113 |
- if self._metadata is None or not use_old_metadata: |
1114 |
- del self._metadata |
1115 |
- self._metadata = MetadataJob ( self.logger ) |
1116 |
+ if self._metadata is None or not use_old_metadata: |
1117 |
+ del self._metadata |
1118 |
+ self._metadata = MetadataJob ( self.logger ) |
1119 |
|
1120 |
- if use_all_packages: |
1121 |
- for p_info in self._packages: |
1122 |
- self._metadata.update ( p_info ) |
1123 |
- else: |
1124 |
- self._metadata.update ( self._latest_package() ) |
1125 |
+ if use_all_packages: |
1126 |
+ for p_info in self._packages: |
1127 |
+ self._metadata.update ( p_info ) |
1128 |
+ else: |
1129 |
+ self._metadata.update ( self._latest_package() ) |
1130 |
|
1131 |
- self._lock.release() |
1132 |
+ finally: |
1133 |
+ self._lock.release() |
1134 |
# --- end of generate_metadata (...) --- |
1135 |
|
1136 |
def generate_manifest ( self ): |
1137 |
|
1138 |
diff --git a/roverlay/overlay/worker.py b/roverlay/overlay/worker.py |
1139 |
index 471cb94..541de7f 100644 |
1140 |
--- a/roverlay/overlay/worker.py |
1141 |
+++ b/roverlay/overlay/worker.py |
1142 |
@@ -3,16 +3,21 @@ |
1143 |
# Distributed under the terms of the GNU General Public License v2 |
1144 |
|
1145 |
#import time |
1146 |
+import sys |
1147 |
import threading |
1148 |
|
1149 |
from roverlay.depres.channels import EbuildJobChannel |
1150 |
from roverlay.ebuild.creation import EbuildCreation |
1151 |
|
1152 |
+# this controls whether debug messages from OverlayWorker.run() are printed |
1153 |
+# to stderr or suppressed |
1154 |
+DEBUG = True |
1155 |
+ |
1156 |
class OverlayWorker ( object ): |
1157 |
"""Overlay package queue worker.""" |
1158 |
|
1159 |
def __init__ ( self, |
1160 |
- pkg_queue, depresolver, logger, pkg_done, use_threads |
1161 |
+ pkg_queue, depresolver, logger, pkg_done, use_threads, err_queue |
1162 |
): |
1163 |
"""Initializes a worker. |
1164 |
|
1165 |
@@ -29,11 +34,12 @@ class OverlayWorker ( object ): |
1166 |
self.pkg_done = pkg_done |
1167 |
self.depresolver = depresolver |
1168 |
|
1169 |
+ self.err_queue = err_queue |
1170 |
+ |
1171 |
self._use_thread = use_threads |
1172 |
self._thread = None |
1173 |
- |
1174 |
- self.enabled = True |
1175 |
self.running = False |
1176 |
+ self.enabled = True |
1177 |
self.halting = False |
1178 |
# --- end of __init__ (...) --- |
1179 |
|
1180 |
@@ -81,7 +87,8 @@ class OverlayWorker ( object ): |
1181 |
""" |
1182 |
job = EbuildCreation ( |
1183 |
package_info, |
1184 |
- depres_channel_spawner=self._get_resolver_channel |
1185 |
+ depres_channel_spawner=self._get_resolver_channel, |
1186 |
+ err_queue=self.err_queue |
1187 |
) |
1188 |
job.run() |
1189 |
self.pkg_done ( package_info ) |
1190 |
@@ -89,42 +96,66 @@ class OverlayWorker ( object ): |
1191 |
|
1192 |
def _run ( self ): |
1193 |
"""Runs the worker (thread mode).""" |
1194 |
- self.running = True |
1195 |
- self.halting = False |
1196 |
- while self.enabled or ( |
1197 |
- self.halting and not self.pkg_queue.empty() |
1198 |
- ): |
1199 |
- if not self.running: |
1200 |
- # exit now |
1201 |
- break |
1202 |
- p = self.pkg_queue.get() |
1203 |
|
1204 |
- # drop empty requests that are used to unblock get() |
1205 |
- if p is not None: |
1206 |
- self._process ( p ) |
1207 |
- elif self.halting: |
1208 |
- # receiving an empty request while halting means 'stop now', |
1209 |
- self.enabled = False |
1210 |
- self.halting = False |
1211 |
+ def debug ( msg ): |
1212 |
+ if DEBUG: |
1213 |
+ sys.stderr.write ( |
1214 |
+ "%i WORKER: %s\n" % ( id ( self ), msg ) |
1215 |
+ ) |
1216 |
+ |
1217 |
+ try: |
1218 |
+ self.running = True |
1219 |
+ self.halting = False |
1220 |
+ while self.enabled or ( |
1221 |
+ self.halting and not self.pkg_queue.empty() |
1222 |
+ ): |
1223 |
+ if not self.err_queue.empty(): |
1224 |
+ # other workers died (or exit request sent) |
1225 |
+ debug ( "STOPPING #1" ) |
1226 |
+ break |
1227 |
+ |
1228 |
+ debug ( "WAITING" ) |
1229 |
+ p = self.pkg_queue.get() |
1230 |
+ debug ( "RECEIVED A TASK, " + str ( p ) ) |
1231 |
+ |
1232 |
+ if not self.err_queue.empty(): |
1233 |
+ debug ( "STOPPING #2" ) |
1234 |
+ break |
1235 |
+ |
1236 |
+ # drop empty requests that are used to unblock get() |
1237 |
+ if p is not None: |
1238 |
+ debug ( "ENTER PROC" ) |
1239 |
+ if self.err_queue.empty(): |
1240 |
+ debug ( "__ empty exception/error queue!" ) |
1241 |
+ self._process ( p ) |
1242 |
+ elif self.halting: |
1243 |
+ # receiving an empty request while halting means 'stop now', |
1244 |
+ self.enabled = False |
1245 |
+ self.halting = False |
1246 |
+ |
1247 |
+ self.pkg_queue.task_done() |
1248 |
+ |
1249 |
+ debug ( "STOPPING - DONE" ) |
1250 |
+ except ( Exception, KeyboardInterrupt ) as e: |
1251 |
+ self.logger.exception ( e ) |
1252 |
+ self.err_queue.put_nowait ( ( id ( self ), e ) ) |
1253 |
|
1254 |
- self.pkg_queue.task_done() |
1255 |
self.running = False |
1256 |
+ |
1257 |
# --- end of run (...) --- |
1258 |
|
1259 |
def _run_nothread ( self ): |
1260 |
"""Runs the worker (no-thread mode).""" |
1261 |
self.running = True |
1262 |
while self.enabled and not self.pkg_queue.empty(): |
1263 |
- if not self.running: |
1264 |
- # exit now |
1265 |
- break |
1266 |
|
1267 |
- p = self.pkg_queue.get() |
1268 |
+ p = self.pkg_queue.get_nowait() |
1269 |
|
1270 |
# drop empty requests that are used to unblock get() |
1271 |
if p is not None: |
1272 |
self._process ( p ) |
1273 |
|
1274 |
self.pkg_queue.task_done() |
1275 |
+ |
1276 |
self.running = False |
1277 |
# --- end of _run_nothread (...) --- |