Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH 2/5] PollScheduler: terminate via call_soon for asyncio compat
Date: Fri, 24 Mar 2017 02:55:51
Message-Id: 20170324025500.19518-2-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH 1/5] EventLoop: implement call_soon for asyncio compat (bug 591760) by Zac Medico
1 Use call_soon to schedule the _termination_check callback when needed.
2 The previous idle_add usage was relatively inefficient, because it
3 scheduled the _termination_check callback to be called in every
4 iteration of the event loop.
5
6 Add a _cleanup method to handle cleanup of callbacks registered with
7 the global event loop. Since the terminate method is thread-safe and it
8 interacts with self._term_check_handle, use this attribute only while
9 holding the self._term_rlock lock.
10 ---
11 pym/_emerge/PollScheduler.py | 57 +++++++++++++++++++++++--------
12 pym/_emerge/Scheduler.py | 7 ++--
13 pym/portage/util/_async/AsyncScheduler.py | 16 ++++-----
14 3 files changed, 54 insertions(+), 26 deletions(-)
15
16 diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
17 index b118ac1..569879b 100644
18 --- a/pym/_emerge/PollScheduler.py
19 +++ b/pym/_emerge/PollScheduler.py
20 @@ -25,8 +25,10 @@ class PollScheduler(object):
21 a non-main thread)
22 @type main: bool
23 """
24 + self._term_rlock = threading.RLock()
25 self._terminated = threading.Event()
26 self._terminated_tasks = False
27 + self._term_check_handle = None
28 self._max_jobs = 1
29 self._max_load = None
30 self._scheduling = False
31 @@ -44,6 +46,21 @@ class PollScheduler(object):
32 def _is_background(self):
33 return self._background
34
35 + def _cleanup(self):
36 + """
37 + Cleanup any callbacks that have been registered with the global
38 + event loop.
39 + """
40 + # The self._term_check_handle attribute requires locking
41 + # since it's modified by the thread safe terminate method.
42 + with self._term_rlock:
43 + if self._term_check_handle not in (None, False):
44 + self._term_check_handle.cancel()
45 + # This prevents the terminate method from scheduling
46 + # any more callbacks (since _cleanup must eliminate all
47 + # callbacks in order to ensure complete cleanup).
48 + self._term_check_handle = False
49 +
50 def terminate(self):
51 """
52 Schedules asynchronous, graceful termination of the scheduler
53 @@ -51,26 +68,36 @@ class PollScheduler(object):
54
55 This method is thread-safe (and safe for signal handlers).
56 """
57 - self._terminated.set()
58 + with self._term_rlock:
59 + if self._term_check_handle is None:
60 + self._terminated.set()
61 + self._term_check_handle = self._event_loop.call_soon_threadsafe(
62 + self._termination_check, True)
63
64 - def _termination_check(self):
65 + def _termination_check(self, retry=False):
66 """
67 Calls _terminate_tasks() if appropriate. It's guaranteed not to
68 - call it while _schedule_tasks() is being called. The check should
69 - be executed for each iteration of the event loop, for response to
70 - termination signals at the earliest opportunity. It always returns
71 - True, for continuous scheduling via idle_add.
72 + call it while _schedule_tasks() is being called. This method must
73 + only be called via the event loop thread.
74 +
75 + @param retry: If True then reschedule if scheduling state prevents
76 + immediate termination.
77 + @type retry: bool
78 """
79 - if not self._scheduling and \
80 - self._terminated.is_set() and \
81 + if self._terminated.is_set() and \
82 not self._terminated_tasks:
83 - self._scheduling = True
84 - try:
85 - self._terminated_tasks = True
86 - self._terminate_tasks()
87 - finally:
88 - self._scheduling = False
89 - return True
90 + if not self._scheduling:
91 + self._scheduling = True
92 + try:
93 + self._terminated_tasks = True
94 + self._terminate_tasks()
95 + finally:
96 + self._scheduling = False
97 +
98 + elif retry:
99 + with self._term_rlock:
100 + self._term_check_handle = self._event_loop.call_soon(
101 + self._termination_check, True)
102
103 def _terminate_tasks(self):
104 """
105 diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
106 index 71fe75f..58ff971 100644
107 --- a/pym/_emerge/Scheduler.py
108 +++ b/pym/_emerge/Scheduler.py
109 @@ -1055,6 +1055,7 @@ class Scheduler(PollScheduler):
110 else:
111 signal.signal(signal.SIGCONT, signal.SIG_DFL)
112
113 + self._termination_check()
114 if received_signal:
115 sys.exit(received_signal[0])
116
117 @@ -1091,6 +1092,10 @@ class Scheduler(PollScheduler):
118 if isinstance(x, Package) and x.operation == "merge"])
119 self._status_display.maxval = self._pkg_count.maxval
120
121 + # Cleanup any callbacks that have been registered with the global
122 + # event loop by calls to the terminate method.
123 + self._cleanup()
124 +
125 self._logger.log(" *** Finished. Cleaning up...")
126
127 if failed_pkgs:
128 @@ -1393,7 +1398,6 @@ class Scheduler(PollScheduler):
129 blocker_db.discardBlocker(pkg)
130
131 def _main_loop(self):
132 - term_check_id = self._event_loop.idle_add(self._termination_check)
133 loadavg_check_id = None
134 if self._max_load is not None and \
135 self._loadavg_latency is not None and \
136 @@ -1420,7 +1424,6 @@ class Scheduler(PollScheduler):
137 while self._is_work_scheduled():
138 self._event_loop.iteration()
139 finally:
140 - self._event_loop.source_remove(term_check_id)
141 if loadavg_check_id is not None:
142 self._event_loop.source_remove(loadavg_check_id)
143
144 diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py
145 index 9b96c6f..3deb6cb 100644
146 --- a/pym/portage/util/_async/AsyncScheduler.py
147 +++ b/pym/portage/util/_async/AsyncScheduler.py
148 @@ -18,7 +18,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
149 self._error_count = 0
150 self._running_tasks = set()
151 self._remaining_tasks = True
152 - self._term_check_id = None
153 self._loadavg_check_id = None
154
155 def _poll(self):
156 @@ -65,7 +64,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
157 self._schedule()
158
159 def _start(self):
160 - self._term_check_id = self._event_loop.idle_add(self._termination_check)
161 if self._max_load is not None and \
162 self._loadavg_latency is not None and \
163 (self._max_jobs is True or self._max_jobs > 1):
164 @@ -75,6 +73,12 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
165 self._loadavg_latency, self._schedule)
166 self._schedule()
167
168 + def _cleanup(self):
169 + super(AsyncScheduler, self)._cleanup()
170 + if self._loadavg_check_id is not None:
171 + self._event_loop.source_remove(self._loadavg_check_id)
172 + self._loadavg_check_id = None
173 +
174 def _wait(self):
175 # Loop while there are jobs to be scheduled.
176 while self._keep_scheduling():
177 @@ -86,13 +90,7 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
178 while self._is_work_scheduled():
179 self._event_loop.iteration()
180
181 - if self._term_check_id is not None:
182 - self._event_loop.source_remove(self._term_check_id)
183 - self._term_check_id = None
184 -
185 - if self._loadavg_check_id is not None:
186 - self._event_loop.source_remove(self._loadavg_check_id)
187 - self._loadavg_check_id = None
188 + self._cleanup()
189
190 if self._error_count > 0:
191 self.returncode = 1
192 --
193 2.10.2