1 |
These changes are analogous to the PollScheduler changes in |
2 |
the previous commit. |
3 |
--- |
4 |
pym/portage/_emirrordist/MirrorDistTask.py | 39 +++++++++++++++++++++--------- |
5 |
1 file changed, 27 insertions(+), 12 deletions(-) |
6 |
|
7 |
diff --git a/pym/portage/_emirrordist/MirrorDistTask.py b/pym/portage/_emirrordist/MirrorDistTask.py |
8 |
index e23a11b..0702eb1 100644 |
9 |
--- a/pym/portage/_emirrordist/MirrorDistTask.py |
10 |
+++ b/pym/portage/_emirrordist/MirrorDistTask.py |
11 |
@@ -24,15 +24,16 @@ if sys.hexversion >= 0x3000000: |
12 |
|
13 |
class MirrorDistTask(CompositeTask): |
14 |
|
15 |
- __slots__ = ('_config', '_terminated', '_term_check_id') |
16 |
+ __slots__ = ('_config', '_fetch_iterator', '_term_rlock', |
17 |
+ '_terminated', '_term_callback_handle') |
18 |
|
19 |
def __init__(self, config): |
20 |
CompositeTask.__init__(self, scheduler=config.event_loop) |
21 |
self._config = config |
22 |
- self._terminated = threading.Event() |
23 |
+ self._term_rlock = threading.RLock() |
24 |
+ self._term_callback_handle = None |
25 |
|
26 |
def _start(self): |
27 |
- self._term_check_id = self.scheduler.idle_add(self._termination_check) |
28 |
fetch = TaskScheduler(iter(FetchIterator(self._config)), |
29 |
max_jobs=self._config.options.jobs, |
30 |
max_load=self._config.options.load_average, |
31 |
@@ -203,17 +204,31 @@ class MirrorDistTask(CompositeTask): |
32 |
logging.info("added %i files" % added_file_count) |
33 |
logging.info("added %i bytes total" % added_byte_count) |
34 |
|
35 |
+ def _cleanup(self): |
36 |
+ """ |
37 |
+ Clean up any callbacks that have been registered with the global |
38 |
+ event loop. |
39 |
+ """ |
40 |
+ # The self._term_callback_handle attribute requires locking |
41 |
+ # since it's modified by the thread-safe terminate method. |
42 |
+ with self._term_rlock: |
43 |
+ if self._term_callback_handle not in (None, False): |
44 |
+ self._term_callback_handle.cancel() |
45 |
+ # This prevents the terminate method from scheduling |
46 |
+ # any more callbacks (since _cleanup must eliminate all |
47 |
+ # callbacks in order to ensure complete cleanup). |
48 |
+ self._term_callback_handle = False |
49 |
+ |
50 |
def terminate(self): |
51 |
- self._terminated.set() |
52 |
+ with self._term_rlock: |
53 |
+ if self._term_callback_handle is None: |
54 |
+ self._term_callback_handle = self.scheduler.call_soon_threadsafe( |
55 |
+ self._term_callback) |
56 |
|
57 |
- def _termination_check(self): |
58 |
- if self._terminated.is_set(): |
59 |
- self.cancel() |
60 |
- self.wait() |
61 |
- return True |
62 |
+ def _term_callback(self): |
63 |
+ self.cancel() |
64 |
+ self.wait() |
65 |
|
66 |
def _wait(self): |
67 |
CompositeTask._wait(self) |
68 |
- if self._term_check_id is not None: |
69 |
- self.scheduler.source_remove(self._term_check_id) |
70 |
- self._term_check_id = None |
71 |
+ self._cleanup() |
72 |
-- |
73 |
2.10.2 |