Gentoo Archives: gentoo-commits

From: "André Erdmann" <dywi@×××××××.de>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/R_overlay:master commit in: roverlay/ebuild/, roverlay/depres/, roverlay/overlay/
Date: Mon, 02 Jul 2012 16:52:49
Message-Id: 1341247711.8a174d86bba8fa67d38935da2b7b7c51f53606fa.dywi@gentoo
1 commit: 8a174d86bba8fa67d38935da2b7b7c51f53606fa
2 Author: André Erdmann <dywi <AT> mailerd <DOT> de>
3 AuthorDate: Mon Jul 2 16:48:31 2012 +0000
4 Commit: André Erdmann <dywi <AT> mailerd <DOT> de>
5 CommitDate: Mon Jul 2 16:48:31 2012 +0000
6 URL: http://git.overlays.gentoo.org/gitweb/?p=proj/R_overlay.git;a=commit;h=8a174d86
7
8 threaded overlay writing fix some threading issues
9
10 * the overlay produced can now be written to fs using per-pacackage dir threads
11 * fixed some thread locking issues
12 * added an error queue used by threads to pass exceptions to the main module
13
14 modified: roverlay/depres/channels.py
15 modified: roverlay/depres/depresolver.py
16 modified: roverlay/ebuild/creation.py
17 modified: roverlay/ebuild/depres.py
18 modified: roverlay/overlay/__init__.py
19 modified: roverlay/overlay/category.py
20 modified: roverlay/overlay/creator.py
21 modified: roverlay/overlay/package.py
22 modified: roverlay/overlay/worker.py
23
24 ---
25 roverlay/depres/channels.py | 16 ++-
26 roverlay/depres/depresolver.py | 296 +++++++++++++++++++++++++---------------
27 roverlay/ebuild/creation.py | 17 ++-
28 roverlay/ebuild/depres.py | 9 +-
29 roverlay/overlay/__init__.py | 18 ++-
30 roverlay/overlay/category.py | 76 +++++++++--
31 roverlay/overlay/creator.py | 119 ++++++++++++----
32 roverlay/overlay/package.py | 165 +++++++++++------------
33 roverlay/overlay/worker.py | 81 ++++++++----
34 9 files changed, 510 insertions(+), 287 deletions(-)
35
36 diff --git a/roverlay/depres/channels.py b/roverlay/depres/channels.py
37 index 65d4d45..56491f4 100644
38 --- a/roverlay/depres/channels.py
39 +++ b/roverlay/depres/channels.py
40 @@ -19,7 +19,7 @@ class EbuildJobChannel ( DependencyResolverChannel ):
41 add deps, then satisfy_request(): collect/lookup
42 """
43
44 - def __init__ ( self, name=None, logger=None ):
45 + def __init__ ( self, err_queue, name=None, logger=None ):
46 """EbuildJobChannel
47
48 arguments:
49 @@ -32,6 +32,8 @@ class EbuildJobChannel ( DependencyResolverChannel ):
50 # in the join()-method
51 self._depdone = 0
52
53 + self.err_queue = err_queue
54 +
55 # set of portage packages (resolved deps)
56 # this is None unless all deps have been successfully resolved
57 self._collected_deps = None
58 @@ -170,12 +172,15 @@ class EbuildJobChannel ( DependencyResolverChannel ):
59 # DEPEND/RDEPEND/.. later, seewave requires sci-libs/fftw
60 # in both DEPEND and RDEPEND for example
61 dep_collected = set()
62 - satisfiable = True
63 + satisfiable = self.err_queue.empty()
64
65
66 def handle_queue_item ( dep_env ):
67 self._depdone += 1
68 - if dep_env.is_resolved():
69 + if dep_env is None:
70 + # could used to unblock the queue
71 + return self.err_queue.empty()
72 + elif dep_env.is_resolved():
73 ### and dep_env in self.dep_env_list
74 # successfully resolved
75 dep_collected.add ( dep_env.get_result() [1] )
76 @@ -190,7 +195,8 @@ class EbuildJobChannel ( DependencyResolverChannel ):
77
78 # loop until
79 # (a) at least one dependency could not be resolved or
80 - # (b) all deps processed
81 + # (b) all deps processed or
82 + # (c) error queue not empty
83 while self._depdone < len ( self.dep_env_list ) and satisfiable:
84 # tell the resolver to start
85 self._depres_master.start()
86 @@ -204,7 +210,7 @@ class EbuildJobChannel ( DependencyResolverChannel ):
87 # --- end while
88
89 if satisfiable:
90 - self._collected_deps = dep_collected
91 + self._collected_deps = frozenset ( dep_collected )
92 return self._collected_deps
93 else:
94 if close_if_unresolvable: self.close()
95
96 diff --git a/roverlay/depres/depresolver.py b/roverlay/depres/depresolver.py
97 index 26040c0..7f91983 100644
98 --- a/roverlay/depres/depresolver.py
99 +++ b/roverlay/depres/depresolver.py
100 @@ -29,12 +29,11 @@ class DependencyResolver ( object ):
101 """Main object for dependency resolution."""
102
103
104 - NUMTHREADS = config.get ( "DEPRES.jobcount", 0 )
105 + NUMTHREADS = config.get ( "DEPRES.jobcount", 15 )
106
107 def __init__ ( self ):
108 """Initializes a DependencyResolver."""
109
110 - # these loggers are temporary helpers
111 self.logger = logging.getLogger ( self.__class__.__name__ )
112 self.logger_unresolvable = self.logger.getChild ( "UNRESOLVABLE" )
113 self.logger_resolved = self.logger.getChild ( "RESOLVED" )
114 @@ -46,6 +45,7 @@ class DependencyResolver ( object ):
115
116 # this lock tells whether a dep res 'master' thread is running (locked)
117 self.runlock = threading.Lock()
118 + self.startlock = threading.Lock()
119 # the dep res main thread
120 self._mainthread = None
121 # the dep res worker threads
122 @@ -86,6 +86,9 @@ class DependencyResolver ( object ):
123 self.all_channel_ids = set()
124 # --- end of __init__ (...) ---
125
126 + def set_exception_queue ( self, equeue ):
127 + self.err_queue = equeue
128 +
129 def _sort ( self ):
130 """Sorts the rule pools of this resolver."""
131 for pool in self.static_rule_pools: pool.sort()
132 @@ -188,7 +191,7 @@ class DependencyResolver ( object ):
133 * channel -- channel to be registered
134 automatically sets channel's resolver to self if it is None
135
136 - raises: Exception if channels is already registered with this resolver
137 + raises: Exception if channel is already registered with this resolver
138
139 returns: channel
140 """
141 @@ -242,7 +245,6 @@ class DependencyResolver ( object ):
142 except KeyError as expected:
143 # ok
144 pass
145 -
146 # --- end of channel_closed (...) ---
147
148 def _queue_previously_failed ( self ):
149 @@ -259,81 +261,138 @@ class DependencyResolver ( object ):
150 # --- end of _queue_previously_failed (...) ---
151
152 def start ( self ):
153 - if not self.runlock.acquire ( False ):
154 - # already running
155 - return True
156 - # --
157 + # -- verify whether resolver has to be started
158 + if self._depqueue.empty():
159 + # nothing to resolve
160 + return
161 +
162 + if not self.startlock.acquire ( False ):
163 + # another channel/.. is starting the resolver
164 + return
165 + elif self._depqueue.empty():
166 + self.startlock.release()
167 + return
168 +
169 + # -- verify...
170 +
171 + # acquire the run lock (that locks _run_main)
172 + try:
173 + self.runlock.acquire()
174 + finally:
175 + self.startlock.release()
176 +
177 + if self._depqueue.empty():
178 + self.runlock.release()
179 + return
180
181 if DependencyResolver.NUMTHREADS > 0:
182 # no need to wait for the old thread
183 + # FIXME: could remove the following block
184 + if self._mainthread is not None:
185 + self._mainthread.join()
186 + del self._mainthread
187 +
188 self._mainthread = threading.Thread ( target=self._thread_run_main )
189 self._mainthread.start()
190 +
191 else:
192 self._thread_run_main()
193
194 # self.runlock is released when _thread_run_main is done
195 # --- end of start (...) ---
196
197 + def unblock_channels ( self ):
198 + # unblock all channels by processing all remaining deps as
199 + # unresolved
200 + ## other option: select channel, but this may interfere with
201 + ## channel_closed()
202 + channel_gone = set()
203 + while not self._depqueue.empty():
204 + chan, dep_env = self._depqueue.get_nowait()
205 + dep_env.set_unresolvable()
206 +
207 + if chan not in channel_gone:
208 + try:
209 + self._depqueue_done [chan].put_nowait ( dep_env )
210 + except KeyError:
211 + channel_gone.add ( chan )
212 + # --- end of unblock_channels (...) ---
213 +
214 def _thread_run_main ( self ):
215 """Tells the resolver to run."""
216 -
217 jobcount = DependencyResolver.NUMTHREADS
218
219 - if jobcount < 1:
220 - ( self.logger.warning if jobcount < 0 else self.logger.debug ) (
221 - "Running in sequential mode."
222 - )
223 - self._thread_run_resolve()
224 - else:
225 + try:
226 + if jobcount < 1:
227 + ( self.logger.warning if jobcount < 0 else self.logger.debug ) (
228 + "Running in sequential mode."
229 + )
230 + self._thread_run_resolve()
231 + else:
232
233 - # wait for old threads
234 - if not self._threads is None:
235 - self.logger.warning ( "Waiting for old threads..." )
236 + # wait for old threads
237 + if not self._threads is None:
238 + self.logger.warning ( "Waiting for old threads..." )
239 + for t in self._threads: t.join()
240 +
241 + self.logger.debug (
242 + "Running in concurrent mode with %i jobs." % jobcount
243 + )
244 +
245 + # create threads,
246 + self._threads = tuple (
247 + threading.Thread ( target=self._thread_run_resolve )
248 + for n in range (jobcount)
249 + )
250 + # run them
251 + for t in self._threads: t.start()
252 + # and wait until done
253 for t in self._threads: t.join()
254
255 - self.logger.warning (
256 - "Running in concurrent mode with %i jobs." % jobcount
257 - )
258 + # finally remove them
259 + del self._threads
260 + self._threads = None
261
262 - # create threads,
263 - self._threads = [
264 - threading.Thread ( target=self._thread_run_resolve )
265 - for n in range (jobcount)
266 - ]
267 - # run them
268 - for t in self._threads: t.start()
269 - # and wait until done
270 - for t in self._threads: t.join()
271
272 - # finally remove them
273 - del self._threads
274 - self._threads = None
275 + # iterate over _depqueue_failed and report unresolved
276 + ## todo can thread this
277 + while not self._depqueue_failed.empty():
278 + try:
279 + channel_id, dep_env = self._depqueue_failed.get_nowait()
280
281 + except queue.Empty:
282 + # race cond empty() <-> get_nowait()
283 + return
284
285 - # iterate over _depqueue_failed and report unresolved
286 - ## todo can thread this
287 - while not self._depqueue_failed.empty():
288 - try:
289 - channel_id, dep_env = self._depqueue_failed.get_nowait()
290
291 dep_env.set_unresolvable()
292
293 self._report_event ( 'UNRESOLVABLE', dep_env )
294
295 - if channel_id in self._depqueue_done:
296 - ## todo/fixme/whatever: this 'if' can filter out channels
297 - ## that have been added again
298 - self._depqueue_done [channel_id].put ( dep_env )
299
300 - except queue.Empty:
301 - # race cond empty() <-> get_nowait()
302 - break
303 - except KeyError:
304 - # channel has been closed before calling put, ignore this err
305 - pass
306 + try:
307 + if channel_id in self._depqueue_done:
308 + self._depqueue_done [channel_id].put_nowait ( dep_env )
309 + except KeyError:
310 + # channel has been closed before calling put, ignore this
311 + pass
312 +
313 + if not self.err_queue.empty():
314 + self.unblock_channels()
315 +
316 + except ( Exception, KeyboardInterrupt ) as e:
317 +
318 + self.unblock_channels()
319
320 - # release the lock
321 - self.runlock.release()
322 + if jobcount > 0 and hasattr ( self, 'err_queue' ):
323 + self.err_queue.put_nowait ( id ( self ), e )
324 + return
325 + else:
326 + raise e
327 +
328 + finally:
329 + # release the lock
330 + self.runlock.release()
331
332 # --- end of _thread_run_main (...) ---
333
334 @@ -342,77 +401,90 @@ class DependencyResolver ( object ):
335
336 returns: None (implicit)
337 """
338 + try:
339 + while self.err_queue.empty() and not self._depqueue.empty():
340
341 - while not self._depqueue.empty():
342 -
343 - try:
344 - to_resolve = self._depqueue.get_nowait()
345 - except queue.Empty:
346 - # this thread is done when the queue is empty, so this is
347 - # no error, but just the result of the race condition between
348 - # queue.empty() and queue.get(False)
349 - return None
350 -
351 - channel_id, dep_env = to_resolve
352 -
353 - if channel_id in self._depqueue_done:
354 - # else channel has been closed, drop dep
355 -
356 - self.logger.debug ( "Trying to resolve '%s'." % dep_env.dep_str )
357 -
358 - #have_new_rule = False
359 + try:
360 + to_resolve = self._depqueue.get_nowait()
361 + except queue.Empty:
362 + # this thread is done when the queue is empty, so this is
363 + # no error, but just the result of the race condition between
364 + # queue.empty() and queue.get(False)
365 + return
366
367 - resolved = None
368 - # resolved can be None, so use a tri-state int for checking
369 - # 0 -> unresolved, but resolvable
370 - # 1 -> unresolved and (currently, new rules may change this)
371 - # not resolvable
372 - # 2 -> resolved
373 - is_resolved = 0
374 + channel_id, dep_env = to_resolve
375
376 - # TODO:
377 - # (threading: could search the pools in parallel)
378 + if channel_id in self._depqueue_done:
379 + # else channel has been closed, drop dep
380
381 - if USING_DEPRES_CACHE:
382 - if dep_env.dep_str_low in self._dep_unresolvable:
383 - # cannot resolve
384 - is_resolved = 1
385 + self.logger.debug (
386 + "Trying to resolve '%s'." % dep_env.dep_str
387 + )
388
389 - if is_resolved == 0:
390 - # search for a match in the rule pools
391 - for rulepool in self.static_rule_pools:
392 - result = rulepool.matches ( dep_env )
393 - if not result is None and result [0] > 0:
394 - resolved = result [1]
395 - is_resolved = 2
396 - break
397 + #have_new_rule = False
398
399 + resolved = None
400 + # resolved can be None, so use a tri-state int for checking
401 + # 0 -> unresolved, but resolvable
402 + # 1 -> unresolved and (currently, new rules may change this)
403 + # not resolvable
404 + # 2 -> resolved
405 + is_resolved = 0
406
407 + # TODO:
408 + # (threading: could search the pools in parallel)
409
410 - if is_resolved == 2:
411 - dep_env.set_resolved ( resolved, append=False )
412 - self._report_event ( 'RESOLVED', dep_env )
413 - self._depqueue_done [channel_id].put ( dep_env )
414 - else:
415 - self._depqueue_failed.put ( to_resolve )
416 -
417 - if USING_DEPRES_CACHE:
418 - # does not work when adding new rules is possible
419 - self._dep_unresolvable.add ( dep_env.dep_str_low )
420 -
421 - """
422 - ## only useful if new rules can be created
423 - # new rule found, requeue all previously failed dependency searches
424 - if have_new_rule:
425 - self._queue_previously_failed
426 if USING_DEPRES_CACHE:
427 - self._dep_unresolvable.clear() #?
428 - """
429 - # --- end if channel_id in self._depqueue_done
430 -
431 - self._depqueue.task_done()
432 - # --- end while
433 -
434 + if dep_env.dep_str_low in self._dep_unresolvable:
435 + # cannot resolve
436 + is_resolved = 1
437 +
438 + if is_resolved == 0:
439 + # search for a match in the rule pools
440 + for rulepool in self.static_rule_pools:
441 + result = rulepool.matches ( dep_env )
442 + if not result is None and result [0] > 0:
443 + resolved = result [1]
444 + is_resolved = 2
445 + break
446 +
447 +
448 +
449 + if is_resolved == 2:
450 + dep_env.set_resolved ( resolved, append=False )
451 + self._report_event ( 'RESOLVED', dep_env )
452 + try:
453 + self._depqueue_done [channel_id].put ( dep_env )
454 + except KeyError:
455 + # channel gone while resolving
456 + pass
457 + else:
458 + self._depqueue_failed.put ( to_resolve )
459 +
460 + if USING_DEPRES_CACHE:
461 + # does not work when adding new rules is possible
462 + self._dep_unresolvable.add ( dep_env.dep_str_low )
463 +
464 + """
465 + ## only useful if new rules can be created
466 + # new rule found, requeue all previously
467 + # failed dependency searches
468 + if have_new_rule:
469 + self._queue_previously_failed
470 + if USING_DEPRES_CACHE:
471 + self._dep_unresolvable.clear() #?
472 + """
473 + # --- end if channel_id in self._depqueue_done
474 +
475 + self._depqueue.task_done()
476 + # --- end while
477 +
478 + except ( Exception, KeyboardInterrupt ) as e:
479 + if jobcount > 0 and hasattr ( self, 'err_queue' ):
480 + self.err_queue.put_nowait ( id ( self ), e )
481 + return
482 + else:
483 + raise e
484
485
486 # --- end of _thread_run_resolve (...) ---
487
488 diff --git a/roverlay/ebuild/creation.py b/roverlay/ebuild/creation.py
489 index 056b1a2..b606ae9 100644
490 --- a/roverlay/ebuild/creation.py
491 +++ b/roverlay/ebuild/creation.py
492 @@ -22,7 +22,7 @@ FALLBACK_DESCRIPTION = "<none>"
493 class EbuildCreation ( object ):
494 """Used to create an ebuild using DESCRIPTION data."""
495
496 - def __init__ ( self, package_info, depres_channel_spawner=None ):
497 + def __init__ ( self, package_info, err_queue, depres_channel_spawner=None ):
498 """Initializes the creation of an ebuild.
499
500 arguments:
501 @@ -39,6 +39,8 @@ class EbuildCreation ( object ):
502
503 self.depres_channel_spawner = depres_channel_spawner
504
505 + self.err_queue = err_queue
506 +
507 self.package_info.set_readonly()
508 # --- end of __init__ (...) ---
509
510 @@ -64,10 +66,10 @@ class EbuildCreation ( object ):
511 self.logger.info ( "Cannot create an ebuild for this package." )
512 self.status = -1
513
514 - except Exception as e:
515 - # log this and set status to fail
516 + except ( Exception, KeyboardInterrupt ):
517 + # set status to fail
518 self.status = -10
519 - self.logger.exception ( e )
520 + raise
521 # --- end of run (...) ---
522
523 def _lazyimport_desc_data ( self ):
524 @@ -82,8 +84,8 @@ class EbuildCreation ( object ):
525 logger=self.logger,
526 read_now=True
527 )
528 - self.package_info.set_writeable()
529 - self.package_info.update (
530 +
531 + self.package_info.update_now (
532 desc_data=reader.get_desc ( run_if_unset=False )
533 )
534 del reader
535 @@ -133,7 +135,8 @@ class EbuildCreation ( object ):
536 _dep_resolution = depres.EbuildDepRes (
537 self.package_info, self.logger,
538 create_iuse=True, run_now=True,
539 - depres_channel_spawner=self.depres_channel_spawner
540 + depres_channel_spawner=self.depres_channel_spawner,
541 + err_queue=self.err_queue
542 )
543 if not _dep_resolution.success():
544 # log here? (FIXME)
545
546 diff --git a/roverlay/ebuild/depres.py b/roverlay/ebuild/depres.py
547 index 383831e..1c19a00 100644
548 --- a/roverlay/ebuild/depres.py
549 +++ b/roverlay/ebuild/depres.py
550 @@ -22,8 +22,8 @@ class EbuildDepRes ( object ):
551 """Handles dependency resolution for a single ebuild."""
552
553 def __init__ (
554 - self, package_info, logger, depres_channel_spawner,
555 - create_iuse=True, run_now=True
556 + self, package_info, logger, depres_channel_spawner, err_queue,
557 + create_iuse=True, run_now=True,
558 ):
559 """Initializes an EbuildDepRes.
560
561 @@ -45,6 +45,8 @@ class EbuildDepRes ( object ):
562 self.has_suggests = None
563 self.create_iuse = create_iuse
564
565 + self.err_queue = err_queue
566 +
567 self._channels = None
568
569 if run_now:
570 @@ -90,7 +92,8 @@ class EbuildDepRes ( object ):
571 if dependency_type not in self._channels:
572 self._channels [dependency_type] = self.request_resolver (
573 name=dependency_type,
574 - logger=self.logger
575 + logger=self.logger,
576 + err_queue=self.err_queue
577 )
578 return self._channels [dependency_type]
579 # --- end of get_channel (...) ---
580
581 diff --git a/roverlay/overlay/__init__.py b/roverlay/overlay/__init__.py
582 index 9ca6abe..a8a939d 100644
583 --- a/roverlay/overlay/__init__.py
584 +++ b/roverlay/overlay/__init__.py
585 @@ -77,14 +77,16 @@ class Overlay ( object ):
586 """
587 if not category in self._categories:
588 self._catlock.acquire()
589 - if not category in self._categories:
590 - self._categories [category] = Category (
591 - category,
592 - self.logger,
593 - None if self.physical_location is None else \
594 - os.path.join ( self.physical_location, category )
595 - )
596 - self._catlock.release()
597 + try:
598 + if not category in self._categories:
599 + self._categories [category] = Category (
600 + category,
601 + self.logger,
602 + None if self.physical_location is None else \
603 + os.path.join ( self.physical_location, category )
604 + )
605 + finally:
606 + self._catlock.release()
607
608 return self._categories [category]
609 # --- end of _get_category (...) ---
610
611 diff --git a/roverlay/overlay/category.py b/roverlay/overlay/category.py
612 index 910f194..e938dbf 100644
613 --- a/roverlay/overlay/category.py
614 +++ b/roverlay/overlay/category.py
615 @@ -5,12 +5,19 @@
616 import threading
617 import os.path
618
619 +try:
620 + import queue
621 +except ImportError:
622 + import Queue as queue
623 +
624 from roverlay.overlay.package import PackageDir
625
626 import roverlay.util
627
628 class Category ( object ):
629
630 + WRITE_JOBCOUNT = 3
631 +
632 def __init__ ( self, name, logger, directory ):
633 """Initializes a overlay/portage category (such as 'app-text', 'sci-R').
634
635 @@ -45,14 +52,16 @@ class Category ( object ):
636
637 if not pkg_name in self._subdirs:
638 self._lock.acquire()
639 - if not pkg_name in self._subdirs:
640 - self._subdirs [pkg_name] = PackageDir (
641 - pkg_name,
642 - self.logger,
643 - None if self.physical_location is None else \
644 - os.path.join ( self.physical_location, pkg_name )
645 - )
646 - self._lock.release()
647 + try:
648 + if not pkg_name in self._subdirs:
649 + self._subdirs [pkg_name] = PackageDir (
650 + pkg_name,
651 + self.logger,
652 + None if self.physical_location is None else \
653 + os.path.join ( self.physical_location, pkg_name )
654 + )
655 + finally:
656 + self._lock.release()
657
658 self._subdirs [pkg_name].add ( package_info )
659 # --- end of add (...) ---
660 @@ -92,13 +101,56 @@ class Category ( object ):
661 package.show ( **show_kw )
662 # --- end of show (...) ---
663
664 + def _run_write_queue ( self, q, write_kw ):
665 + try:
666 + while not q.empty():
667 + pkg = q.get_nowait()
668 + pkg.write ( **write_kw )
669 +
670 + except queue.Empty:
671 + pass
672 + except ( Exception, KeyboardInterrupt ) as e:
673 + self.RERAISE_EXCEPTION = e
674 +
675 + # --- end of _run_write_queue (...) ---
676 +
677 def write ( self, **write_kw ):
678 """Writes this category to its filesystem location.
679
680 returns: None (implicit)
681 """
682 - for package in self._subdirs.values():
683 - if package.physical_location and not package.empty():
684 - roverlay.util.dodir ( package.physical_location )
685 - package.write ( **write_kw )
686 +
687 + max_jobs = self.__class__.WRITE_JOBCOUNT
688 +
689 + # todo len.. > 3: what's an reasonable number of min package dirs to
690 + # start threaded writing?
691 + if max_jobs > 1 and len ( self._subdirs ) > 3:
692 +
693 + # writing 1..self.__class__.WRITE_JOBCOUNT package dirs at once
694 +
695 + write_queue = queue.Queue()
696 + for package in self._subdirs.values():
697 + if package.physical_location and not package.empty():
698 + roverlay.util.dodir ( package.physical_location )
699 + write_queue.put_nowait ( package )
700 +
701 +
702 + if not write_queue.empty():
703 + workers = (
704 + threading.Thread (
705 + target=self._run_write_queue,
706 + args=( write_queue, write_kw )
707 + ) for n in range ( max_jobs )
708 + )
709 +
710 + for w in workers: w.start()
711 + for w in workers: w.join()
712 +
713 + if hasattr ( self, 'RERAISE_EXCEPTION' ):
714 + raise self.RERAISE_EXCEPTION
715 + else:
716 + for package in self._subdirs.values():
717 + if package.physical_location and not package.empty():
718 + roverlay.util.dodir ( package.physical_location )
719 + package.write ( **write_kw )
720 # --- end of write (...) ---
721
722 diff --git a/roverlay/overlay/creator.py b/roverlay/overlay/creator.py
723 index 608ba77..aa1a3a8 100644
724 --- a/roverlay/overlay/creator.py
725 +++ b/roverlay/overlay/creator.py
726 @@ -5,6 +5,9 @@
727 import time
728 import logging
729 import threading
730 +import signal
731 +import traceback
732 +import sys
733
734 try:
735 import queue
736 @@ -40,18 +43,26 @@ class OverlayCreator ( object ):
737
738 self.depresolver = easyresolver.setup()
739
740 - self.NUMTHREADS = config.get ( 'EBUILD.jobcount', 0 )
741 + self.NUMTHREADS = config.get ( 'EBUILD.jobcount', 10 )
742
743 # --
744 - self._pkg_queue = queue.Queue()
745 - self._workers = None
746 - self._runlock = threading.RLock()
747 + self._pkg_queue = queue.Queue()
748 +
749 + # this queue is used to propagate exceptions from threads
750 + # it's
751 + self._err_queue = queue.Queue()
752 +
753 +
754 + self._workers = None
755 + self._runlock = threading.RLock()
756
757 self.can_write_overlay = OVERLAY_WRITE_ALLOWED
758
759 # this is a method that adds PackageInfo objects to the pkg queue
760 self.add_package = self._pkg_queue.put
761
762 + self.depresolver.set_exception_queue ( self._err_queue )
763 +
764 # --- end of __init__ (...) ---
765
766 def _timestamp ( self, description, start, stop=None ):
767 @@ -109,17 +120,24 @@ class OverlayCreator ( object ):
768 def run ( self ):
769 """Starts ebuild creation and waits until done."""
770 self._runlock.acquire()
771 - self.start()
772 - self.join()
773 - self._runlock.release()
774 + try:
775 + self.start()
776 + self.join()
777 + finally:
778 + self._runlock.release()
779 # --- end of run (...) ---
780
781 def start ( self ):
782 """Starts ebuild creation."""
783 self._runlock.acquire()
784 - self.join()
785 - self._make_workers()
786 - self._runlock.release()
787 + try:
788 + self.join()
789 + self._make_workers()
790 + except:
791 + self._err_queue.put_nowait ( ( -1, None ) )
792 + raise
793 + finally:
794 + self._runlock.release()
795 # --- end of start (...) ---
796
797 def join ( self ):
798 @@ -142,13 +160,14 @@ class OverlayCreator ( object ):
799 if self.depresolver is None: return
800
801 self._runlock.acquire()
802 - if self.depresolver is None: return
803
804 - self.depresolver.close()
805 - del self.depresolver
806 - self.depresolver = None
807 -
808 - self._runlock.release()
809 + try:
810 + if self.depresolver is not None:
811 + self.depresolver.close()
812 + del self.depresolver
813 + self.depresolver = None
814 + finally:
815 + self._runlock.release()
816 # --- end of _close_resolver (...) ---
817
818 def _waitfor_workers ( self, do_close ):
819 @@ -157,26 +176,65 @@ class OverlayCreator ( object ):
820 arguments:
821 * do_close -- close (exit) workers if True, else wait until done.
822 """
823 - if self._workers is None: return
824
825 - self._runlock.acquire()
826 if self._workers is None: return
827 + start = None
828 + self._runlock.acquire()
829
830 - if self.NUMTHREADS > 0: start = time.time()
831 + try:
832 + if self._workers is not None:
833 + if self.NUMTHREADS > 0: start = time.time()
834 +
835 + if do_close:
836 + self._err_queue.put_nowait ( ( -1, None ) )
837 + # fixme: remove enabled?
838 + for w in self._workers: w.enabled = False
839 + else:
840 + for w in self._workers: w.stop_when_empty()
841 +
842 + while True in ( w.active() for w in self._workers ):
843 + self._pkg_queue.put ( None )
844 +
845 + del self._workers
846 + self._workers = None
847 +
848 + while not self._err_queue.empty():
849 + e = self._err_queue.get_nowait()
850 + self._err_queue.put_nowait ( ( -2, None ) )
851 + if isinstance ( e [1], ( Exception, KeyboardInterrupt ) ):
852 + self._err_queue.put ( e )
853 + self.logger.warning ( "Reraising thread exception." )
854 + raise e [1]
855 +
856 +
857 +
858 + except ( Exception, KeyboardInterrupt ) as err:
859 + # catch interrupt here: still wait until all workers have been closed
860 + # and reraise after that
861 +# SIGINT_RESTORE = signal.signal (
862 +# signal.SIGINT,
863 +# lambda sig, frame : sys.stderr.write ( "Please wait ...\n" )
864 +# )
865 +
866 + try:
867 + self._err_queue.put_nowait ( ( -1, None ) )
868 + self.depresolver.unblock_channels()
869 + while hasattr ( self, '_workers' ) and self._workers is not None:
870
871 - if do_close:
872 - for w in self._workers: w.enabled = False
873 - else:
874 - for w in self._workers: w.stop_when_empty()
875
876 - while True in ( w.active() for w in self._workers ):
877 - self._pkg_queue.put ( None )
878 + if True in ( w.active() for w in self._workers ):
879 + self._pkg_queue.put_nowait ( None )
880 + else:
881 + del self._workers
882 + self._workers = None
883 + finally:
884 +# signal.signal ( signal.SIGINT, SIGINT_RESTORE )
885 + raise
886
887 - del self._workers
888 - self._workers = None
889 + finally:
890 + self._runlock.release()
891
892 - self._runlock.release()
893 - if self.NUMTHREADS > 0: return start
894 + return start
895 # --- end of _waitfor_workers (...) ---
896
897 def _join_workers ( self ):
898 @@ -221,7 +279,8 @@ class OverlayCreator ( object ):
899 """
900 w = OverlayWorker (
901 self._pkg_queue, self.depresolver, self.logger, self._pkg_done,
902 - use_threads=use_threads
903 + use_threads=use_threads,
904 + err_queue=self._err_queue
905 )
906 if start_now: w.start()
907 return w
908
909 diff --git a/roverlay/overlay/package.py b/roverlay/overlay/package.py
910 index e77d4a2..77fcbee 100644
911 --- a/roverlay/overlay/package.py
912 +++ b/roverlay/overlay/package.py
913 @@ -69,64 +69,60 @@ class PackageDir ( object ):
914 raise Exception ( "cannot write - no directory assigned!" )
915
916 self._lock.acquire()
917 - self._regen_metadata()
918 + try:
919 + self._regen_metadata()
920
921 - # mkdir not required here, overlay.Category does this
922 + # mkdir not required here, overlay.Category does this
923
924 - # write ebuilds
925 - for ver, p_info in self._packages.items():
926 - fh = None
927 - try:
928 - efile = self._get_ebuild_filepath ( ver )
929 + # write ebuilds
930 + for ver, p_info in self._packages.items():
931 + fh = None
932 + try:
933 + efile = self._get_ebuild_filepath ( ver )
934
935 - ebuild = p_info ['ebuild']
936 + ebuild = p_info ['ebuild']
937
938 - fh = open ( efile, 'w' )
939 - if isinstance ( ebuild, str ):
940 - if default_header is not None:
941 - fh.write ( default_header )
942 - fh.write ( '\n\n' )
943 - fh.write ( ebuild )
944 + fh = open ( efile, 'w' )
945 + fh.write ( default_header )
946 + fh.write ( '\n\n' )
947 + fh.write ( str ( ebuild ) )
948 fh.write ( '\n' )
949 - else:
950 - ebuild.write (
951 - fh,
952 - header=default_header, header_is_fallback=True
953 - )
954 - if fh: fh.close()
955
956 - # adjust owner/perm? TODO
957 - # chmod 0644 or 0444
958 - # chown 250.250
959 + if fh: fh.close()
960
961 - # this marks the package as 'written to fs'
962 - p_info.set_writeable()
963 - p_info ['ebuild_file'] = efile
964 - p_info.set_readonly()
965 + # adjust owner/perm? TODO
966 + # chmod 0644 or 0444
967 + # chown 250.250
968
969 - self.logger.info ( "Wrote ebuild %s." % efile )
970 - except IOError as e:
971 - if fh: fh.close()
972 - self.logger.error ( "Couldn't write ebuild %s." % efile )
973 - self.logger.exception ( e )
974 + # this marks the package as 'written to fs'
975 + p_info.set_writeable()
976 + p_info ['ebuild_file'] = efile
977 + p_info.set_readonly()
978
979 - # write metadata
980 - fh = None
981 - try:
982 - mfile = self._get_metadata_filepath()
983 + self.logger.info ( "Wrote ebuild %s." % efile )
984 + except IOError as e:
985 + if fh: fh.close()
986 + self.logger.error ( "Couldn't write ebuild %s." % efile )
987 + self.logger.exception ( e )
988
989 - fh = open ( mfile, 'w' )
990 - self._metadata.write ( fh )
991 - if fh: fh.close()
992 + # write metadata
993 + fh = None
994 + try:
995 + mfile = self._get_metadata_filepath()
996
997 - except IOError as e:
998 - if fh: fh.close()
999 - self.logger.error ( "Failed to write metadata at %s." % mfile )
1000 - self.logger.exception ( e )
1001 + fh = open ( mfile, 'w' )
1002 + self._metadata.write ( fh )
1003 + if fh: fh.close()
1004
1005 - self.generate_manifest()
1006 + except IOError as e:
1007 + if fh: fh.close()
1008 + self.logger.error ( "Failed to write metadata at %s." % mfile )
1009 + self.logger.exception ( e )
1010
1011 - self._lock.release()
1012 + self.generate_manifest()
1013 +
1014 + finally:
1015 + self._lock.release()
1016 # --- end of write (...) ---
1017
1018 def show ( self, stream=sys.stderr, default_header=None ):
1019 @@ -141,35 +137,31 @@ class PackageDir ( object ):
1020 * IOError
1021 """
1022 self._lock.acquire()
1023 - self._regen_metadata()
1024 + try:
1025 + self._regen_metadata()
1026
1027
1028 - for ver, p_info in self._packages.items():
1029 - efile = self._get_ebuild_filepath ( ver )
1030 - ebuild = p_info ['ebuild']
1031 + for ver, p_info in self._packages.items():
1032 + efile = self._get_ebuild_filepath ( ver )
1033 + ebuild = p_info ['ebuild']
1034
1035 - stream.write ( "[BEGIN ebuild %s]\n" % efile )
1036 - if isinstance ( ebuild, str ):
1037 - if default_header is not None:
1038 - stream.write ( default_header )
1039 - stream.write ( '\n\n' )
1040 - stream.write ( ebuild )
1041 + stream.write ( "[BEGIN ebuild %s]\n" % efile )
1042 +
1043 + stream.write ( default_header )
1044 + stream.write ( '\n\n' )
1045 + stream.write ( str ( ebuild ) )
1046 stream.write ( '\n' )
1047 - else:
1048 - ebuild.write (
1049 - stream,
1050 - header=default_header, header_is_fallback=True
1051 - )
1052 - stream.write ( "[END ebuild %s]\n" % efile )
1053
1054 - mfile = self._get_metadata_filepath()
1055 + stream.write ( "[END ebuild %s]\n" % efile )
1056
1057 - stream.write ( "[BEGIN %s]\n" % mfile )
1058 - self._metadata.write ( stream )
1059 - stream.write ( "[END %s]\n" % mfile )
1060 + mfile = self._get_metadata_filepath()
1061
1062 + stream.write ( "[BEGIN %s]\n" % mfile )
1063 + self._metadata.write ( stream )
1064 + stream.write ( "[END %s]\n" % mfile )
1065
1066 - self._lock.release()
1067 + finally:
1068 + self._lock.release()
1069 # --- end of show (...) ---
1070
1071 def _latest_package ( self, pkg_filter=None, use_lock=False ):
1072 @@ -186,15 +178,16 @@ class PackageDir ( object ):
1073 retpkg = None
1074
1075 if use_lock: self._lock.acquire()
1076 - for p in self._packages.values():
1077 - if pkg_filter is None or pkg_filter ( p ):
1078 - newver = p ['version']
1079 - if first or newver > retver:
1080 - retver = newver
1081 - retpkg = p
1082 - first = False
1083 -
1084 - if use_lock: self._lock.release()
1085 + try:
1086 + for p in self._packages.values():
1087 + if pkg_filter is None or pkg_filter ( p ):
1088 + newver = p ['version']
1089 + if first or newver > retver:
1090 + retver = newver
1091 + retpkg = p
1092 + first = False
1093 + finally:
1094 + if use_lock: self._lock.release()
1095 return retpkg
1096 # --- end of _latest_package (...) ---
1097
1098 @@ -219,7 +212,7 @@ class PackageDir ( object ):
1099 self.name, shortver
1100 )
1101 if SUPPRESS_EXCEPTIONS:
1102 - self.logger.warning ( msg )
1103 + self.logger.info ( msg )
1104 else:
1105 raise Exception ( msg )
1106
1107 @@ -265,18 +258,20 @@ class PackageDir ( object ):
1108 return
1109
1110 self._lock.acquire()
1111 + try:
1112
1113 - if self._metadata is None or not use_old_metadata:
1114 - del self._metadata
1115 - self._metadata = MetadataJob ( self.logger )
1116 + if self._metadata is None or not use_old_metadata:
1117 + del self._metadata
1118 + self._metadata = MetadataJob ( self.logger )
1119
1120 - if use_all_packages:
1121 - for p_info in self._packages:
1122 - self._metadata.update ( p_info )
1123 - else:
1124 - self._metadata.update ( self._latest_package() )
1125 + if use_all_packages:
1126 + for p_info in self._packages:
1127 + self._metadata.update ( p_info )
1128 + else:
1129 + self._metadata.update ( self._latest_package() )
1130
1131 - self._lock.release()
1132 + finally:
1133 + self._lock.release()
1134 # --- end of generate_metadata (...) ---
1135
1136 def generate_manifest ( self ):
1137
1138 diff --git a/roverlay/overlay/worker.py b/roverlay/overlay/worker.py
1139 index 471cb94..541de7f 100644
1140 --- a/roverlay/overlay/worker.py
1141 +++ b/roverlay/overlay/worker.py
1142 @@ -3,16 +3,21 @@
1143 # Distributed under the terms of the GNU General Public License v2
1144
1145 #import time
1146 +import sys
1147 import threading
1148
1149 from roverlay.depres.channels import EbuildJobChannel
1150 from roverlay.ebuild.creation import EbuildCreation
1151
1152 +# this controls whether debug message from OverlayWorker.run() are printed
1153 +# to stderr or suppressed
1154 +DEBUG = True
1155 +
1156 class OverlayWorker ( object ):
1157 """Overlay package queue worker."""
1158
1159 def __init__ ( self,
1160 - pkg_queue, depresolver, logger, pkg_done, use_threads
1161 + pkg_queue, depresolver, logger, pkg_done, use_threads, err_queue
1162 ):
1163 """Initializes a worker.
1164
1165 @@ -29,11 +34,12 @@ class OverlayWorker ( object ):
1166 self.pkg_done = pkg_done
1167 self.depresolver = depresolver
1168
1169 + self.err_queue = err_queue
1170 +
1171 self._use_thread = use_threads
1172 self._thread = None
1173 -
1174 - self.enabled = True
1175 self.running = False
1176 + self.enabled = True
1177 self.halting = False
1178 # --- end of __init__ (...) ---
1179
1180 @@ -81,7 +87,8 @@ class OverlayWorker ( object ):
1181 """
1182 job = EbuildCreation (
1183 package_info,
1184 - depres_channel_spawner=self._get_resolver_channel
1185 + depres_channel_spawner=self._get_resolver_channel,
1186 + err_queue=self.err_queue
1187 )
1188 job.run()
1189 self.pkg_done ( package_info )
1190 @@ -89,42 +96,66 @@ class OverlayWorker ( object ):
1191
1192 def _run ( self ):
1193 """Runs the worker (thread mode)."""
1194 - self.running = True
1195 - self.halting = False
1196 - while self.enabled or (
1197 - self.halting and not self.pkg_queue.empty()
1198 - ):
1199 - if not self.running:
1200 - # exit now
1201 - break
1202 - p = self.pkg_queue.get()
1203
1204 - # drop empty requests that are used to unblock get()
1205 - if p is not None:
1206 - self._process ( p )
1207 - elif self.halting:
1208 - # receiving an empty request while halting means 'stop now',
1209 - self.enabled = False
1210 - self.halting = False
1211 + def debug ( msg ):
1212 + if DEBUG:
1213 + sys.stderr.write (
1214 + "%i WORKER: %s\n" % ( id ( self ), msg )
1215 + )
1216 +
1217 + try:
1218 + self.running = True
1219 + self.halting = False
1220 + while self.enabled or (
1221 + self.halting and not self.pkg_queue.empty()
1222 + ):
1223 + if not self.err_queue.empty():
1224 + # other workers died (or exit request sent)
1225 + debug ( "STOPPING #1" )
1226 + break
1227 +
1228 + debug ( "WAITING" )
1229 + p = self.pkg_queue.get()
1230 + debug ( "RECEIVED A TASK, " + str ( p ) )
1231 +
1232 + if not self.err_queue.empty():
1233 + debug ( "STOPPING #2" )
1234 + break
1235 +
1236 + # drop empty requests that are used to unblock get()
1237 + if p is not None:
1238 + debug ( "ENTER PROC" )
1239 + if self.err_queue.empty():
1240 + debug ( "__ empty exception/error queue!" )
1241 + self._process ( p )
1242 + elif self.halting:
1243 + # receiving an empty request while halting means 'stop now',
1244 + self.enabled = False
1245 + self.halting = False
1246 +
1247 + self.pkg_queue.task_done()
1248 +
1249 + debug ( "STOPPING - DONE" )
1250 + except ( Exception, KeyboardInterrupt ) as e:
1251 + self.logger.exception ( e )
1252 + self.err_queue.put_nowait ( ( id ( self ), e ) )
1253
1254 - self.pkg_queue.task_done()
1255 self.running = False
1256 +
1257 # --- end of run (...) ---
1258
1259 def _run_nothread ( self ):
1260 """Runs the worker (no-thread mode)."""
1261 self.running = True
1262 while self.enabled and not self.pkg_queue.empty():
1263 - if not self.running:
1264 - # exit now
1265 - break
1266
1267 - p = self.pkg_queue.get()
1268 + p = self.pkg_queue.get_nowait()
1269
1270 # drop empty requests that are used to unblock get()
1271 if p is not None:
1272 self._process ( p )
1273
1274 self.pkg_queue.task_done()
1275 +
1276 self.running = False
1277 # --- end of _run_nothread (...) ---