1 |
Use call_soon to schedule the _termination_check callback when needed. |
2 |
The previous idle_add usage was relatively inefficient, because it |
3 |
scheduled the _termination_check callback to be called in every |
4 |
iteration of the event loop. |
5 |
|
6 |
Add a _cleanup method to handle cleanup of callbacks registered with |
7 |
the global event loop. Since the terminate method is thread safe and it |
8 |
interacts with self._term_callback_handle, use this variable only while |
9 |
holding a lock. |
10 |
--- |
11 |
pym/_emerge/PollScheduler.py | 57 +++++++++++++++++++++++-------- |
12 |
pym/_emerge/Scheduler.py | 7 ++-- |
13 |
pym/portage/util/_async/AsyncScheduler.py | 16 ++++----- |
14 |
3 files changed, 54 insertions(+), 26 deletions(-) |
15 |
|
16 |
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py |
17 |
index b118ac1..569879b 100644 |
18 |
--- a/pym/_emerge/PollScheduler.py |
19 |
+++ b/pym/_emerge/PollScheduler.py |
20 |
@@ -25,8 +25,10 @@ class PollScheduler(object): |
21 |
a non-main thread) |
22 |
@type main: bool |
23 |
""" |
24 |
+ self._term_rlock = threading.RLock() |
25 |
self._terminated = threading.Event() |
26 |
self._terminated_tasks = False |
27 |
+ self._term_check_handle = None |
28 |
self._max_jobs = 1 |
29 |
self._max_load = None |
30 |
self._scheduling = False |
31 |
@@ -44,6 +46,21 @@ class PollScheduler(object): |
32 |
def _is_background(self): |
33 |
return self._background |
34 |
|
35 |
+ def _cleanup(self): |
36 |
+ """ |
37 |
+ Cleanup any callbacks that have been registered with the global |
38 |
+ event loop. |
39 |
+ """ |
40 |
+ # The self._term_check_handle attribute requires locking |
41 |
+ # since it's modified by the thread safe terminate method. |
42 |
+ with self._term_rlock: |
43 |
+ if self._term_check_handle not in (None, False): |
44 |
+ self._term_check_handle.cancel() |
45 |
+ # This prevents the terminate method from scheduling |
46 |
+ # any more callbacks (since _cleanup must eliminate all |
47 |
+ # callbacks in order to ensure complete cleanup). |
48 |
+ self._term_check_handle = False |
49 |
+ |
50 |
def terminate(self): |
51 |
""" |
52 |
Schedules asynchronous, graceful termination of the scheduler |
53 |
@@ -51,26 +68,36 @@ class PollScheduler(object): |
54 |
|
55 |
This method is thread-safe (and safe for signal handlers). |
56 |
""" |
57 |
- self._terminated.set() |
58 |
+ with self._term_rlock: |
59 |
+ if self._term_check_handle is None: |
60 |
+ self._terminated.set() |
61 |
+ self._term_check_handle = self._event_loop.call_soon_threadsafe( |
62 |
+ self._termination_check, True) |
63 |
|
64 |
- def _termination_check(self): |
65 |
+ def _termination_check(self, retry=False): |
66 |
""" |
67 |
Calls _terminate_tasks() if appropriate. It's guaranteed not to |
68 |
- call it while _schedule_tasks() is being called. The check should |
69 |
- be executed for each iteration of the event loop, for response to |
70 |
- termination signals at the earliest opportunity. It always returns |
71 |
- True, for continuous scheduling via idle_add. |
72 |
+ call it while _schedule_tasks() is being called. This method must |
73 |
+ only be called via the event loop thread. |
74 |
+ |
75 |
+ @param retry: If True then reschedule if scheduling state prevents |
76 |
+ immediate termination. |
77 |
+ @type retry: bool |
78 |
""" |
79 |
- if not self._scheduling and \ |
80 |
- self._terminated.is_set() and \ |
81 |
+ if self._terminated.is_set() and \ |
82 |
not self._terminated_tasks: |
83 |
- self._scheduling = True |
84 |
- try: |
85 |
- self._terminated_tasks = True |
86 |
- self._terminate_tasks() |
87 |
- finally: |
88 |
- self._scheduling = False |
89 |
- return True |
90 |
+ if not self._scheduling: |
91 |
+ self._scheduling = True |
92 |
+ try: |
93 |
+ self._terminated_tasks = True |
94 |
+ self._terminate_tasks() |
95 |
+ finally: |
96 |
+ self._scheduling = False |
97 |
+ |
98 |
+ elif retry: |
99 |
+ with self._term_rlock: |
100 |
+ self._term_check_handle = self._event_loop.call_soon( |
101 |
+ self._termination_check, True) |
102 |
|
103 |
def _terminate_tasks(self): |
104 |
""" |
105 |
diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py |
106 |
index 71fe75f..58ff971 100644 |
107 |
--- a/pym/_emerge/Scheduler.py |
108 |
+++ b/pym/_emerge/Scheduler.py |
109 |
@@ -1055,6 +1055,7 @@ class Scheduler(PollScheduler): |
110 |
else: |
111 |
signal.signal(signal.SIGCONT, signal.SIG_DFL) |
112 |
|
113 |
+ self._termination_check() |
114 |
if received_signal: |
115 |
sys.exit(received_signal[0]) |
116 |
|
117 |
@@ -1091,6 +1092,10 @@ class Scheduler(PollScheduler): |
118 |
if isinstance(x, Package) and x.operation == "merge"]) |
119 |
self._status_display.maxval = self._pkg_count.maxval |
120 |
|
121 |
+ # Cleanup any callbacks that have been registered with the global |
122 |
+ # event loop by calls to the terminate method. |
123 |
+ self._cleanup() |
124 |
+ |
125 |
self._logger.log(" *** Finished. Cleaning up...") |
126 |
|
127 |
if failed_pkgs: |
128 |
@@ -1393,7 +1398,6 @@ class Scheduler(PollScheduler): |
129 |
blocker_db.discardBlocker(pkg) |
130 |
|
131 |
def _main_loop(self): |
132 |
- term_check_id = self._event_loop.idle_add(self._termination_check) |
133 |
loadavg_check_id = None |
134 |
if self._max_load is not None and \ |
135 |
self._loadavg_latency is not None and \ |
136 |
@@ -1420,7 +1424,6 @@ class Scheduler(PollScheduler): |
137 |
while self._is_work_scheduled(): |
138 |
self._event_loop.iteration() |
139 |
finally: |
140 |
- self._event_loop.source_remove(term_check_id) |
141 |
if loadavg_check_id is not None: |
142 |
self._event_loop.source_remove(loadavg_check_id) |
143 |
|
144 |
diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py |
145 |
index 9b96c6f..3deb6cb 100644 |
146 |
--- a/pym/portage/util/_async/AsyncScheduler.py |
147 |
+++ b/pym/portage/util/_async/AsyncScheduler.py |
148 |
@@ -18,7 +18,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler): |
149 |
self._error_count = 0 |
150 |
self._running_tasks = set() |
151 |
self._remaining_tasks = True |
152 |
- self._term_check_id = None |
153 |
self._loadavg_check_id = None |
154 |
|
155 |
def _poll(self): |
156 |
@@ -65,7 +64,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler): |
157 |
self._schedule() |
158 |
|
159 |
def _start(self): |
160 |
- self._term_check_id = self._event_loop.idle_add(self._termination_check) |
161 |
if self._max_load is not None and \ |
162 |
self._loadavg_latency is not None and \ |
163 |
(self._max_jobs is True or self._max_jobs > 1): |
164 |
@@ -75,6 +73,12 @@ class AsyncScheduler(AsynchronousTask, PollScheduler): |
165 |
self._loadavg_latency, self._schedule) |
166 |
self._schedule() |
167 |
|
168 |
+ def _cleanup(self): |
169 |
+ super(AsyncScheduler, self)._cleanup() |
170 |
+ if self._loadavg_check_id is not None: |
171 |
+ self._event_loop.source_remove(self._loadavg_check_id) |
172 |
+ self._loadavg_check_id = None |
173 |
+ |
174 |
def _wait(self): |
175 |
# Loop while there are jobs to be scheduled. |
176 |
while self._keep_scheduling(): |
177 |
@@ -86,13 +90,7 @@ class AsyncScheduler(AsynchronousTask, PollScheduler): |
178 |
while self._is_work_scheduled(): |
179 |
self._event_loop.iteration() |
180 |
|
181 |
- if self._term_check_id is not None: |
182 |
- self._event_loop.source_remove(self._term_check_id) |
183 |
- self._term_check_id = None |
184 |
- |
185 |
- if self._loadavg_check_id is not None: |
186 |
- self._event_loop.source_remove(self._loadavg_check_id) |
187 |
- self._loadavg_check_id = None |
188 |
+ self._cleanup() |
189 |
|
190 |
if self._error_count > 0: |
191 |
self.returncode = 1 |
192 |
-- |
193 |
2.10.2 |