Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/util/_eventloop/
Date: Thu, 27 Dec 2012 02:39:44
Message-Id: 1356575960.5c9bd5c9893e6f852b8dc38c2463f3f7f43122e4.zmedico@gentoo
1 commit: 5c9bd5c9893e6f852b8dc38c2463f3f7f43122e4
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Thu Dec 27 02:31:18 2012 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Thu Dec 27 02:39:20 2012 +0000
6 URL: http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=5c9bd5c9
7
8 EventLoop: thread-safe idle_add and timeout_add
9
10 This may be useful for using threads to handle blocking IO with Jython,
11 since Jython lacks the fcntl module which is needed for non-blocking IO
12 (see http://bugs.jython.org/issue1074).
13
14 ---
15 pym/portage/util/_eventloop/EventLoop.py | 165 +++++++++++++++++------------
16 1 files changed, 97 insertions(+), 68 deletions(-)
17
18 diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
19 index 17a468f..37e6007 100644
20 --- a/pym/portage/util/_eventloop/EventLoop.py
21 +++ b/pym/portage/util/_eventloop/EventLoop.py
22 @@ -9,12 +9,23 @@ import select
23 import signal
24 import time
25
26 +try:
27 + import threading
28 +except ImportError:
29 + import dummy_threading as threading
30 +
31 from portage.util import writemsg_level
32 from ..SlotObject import SlotObject
33 from .PollConstants import PollConstants
34 from .PollSelectAdapter import PollSelectAdapter
35
36 class EventLoop(object):
37 + """
38 + An event loop, intended to be compatible with the GLib event loop.
39 + Call the iteration method in order to execute one iteration of the
40 + loop. The idle_add and timeout_add methods serve as thread-safe
41 + means to interact with the loop's thread.
42 + """
43
44 supports_multiprocessing = True
45
46 @@ -44,6 +55,7 @@ class EventLoop(object):
47 @type main: bool
48 """
49 self._use_signal = main
50 + self._thread_rlock = threading.RLock()
51 self._poll_event_queue = []
52 self._poll_event_handlers = {}
53 self._poll_event_handler_ids = {}
54 @@ -89,6 +101,14 @@ class EventLoop(object):
55 self._sigchld_src_id = None
56 self._pid = os.getpid()
57
58 + def _new_source_id(self):
59 + """
60 + Generate a new source id. This method is thread-safe.
61 + """
62 + with self._thread_rlock:
63 + self._event_handler_id += 1
64 + return self._event_handler_id
65 +
66 def _poll(self, timeout=None):
67 """
68 All poll() calls pass through here. The poll events
69 @@ -199,14 +219,17 @@ class EventLoop(object):
70 return bool(events_handled)
71
72 def _get_poll_timeout(self):
73 - if self._child_handlers:
74 - if self._timeout_interval is None:
75 - timeout = self._sigchld_interval
76 +
77 + with self._thread_rlock:
78 + if self._child_handlers:
79 + if self._timeout_interval is None:
80 + timeout = self._sigchld_interval
81 + else:
82 + timeout = min(self._sigchld_interval,
83 + self._timeout_interval)
84 else:
85 - timeout = min(self._sigchld_interval,
86 - self._timeout_interval)
87 - else:
88 - timeout = self._timeout_interval
89 + timeout = self._timeout_interval
90 +
91 return timeout
92
93 def child_watch_add(self, pid, callback, data=None):
94 @@ -229,8 +252,7 @@ class EventLoop(object):
95 @rtype: int
96 @return: an integer ID
97 """
98 - self._event_handler_id += 1
99 - source_id = self._event_handler_id
100 + source_id = self._new_source_id()
101 self._child_handlers[source_id] = self._child_callback_class(
102 callback=callback, data=data, pid=pid, source_id=source_id)
103
104 @@ -304,20 +326,21 @@ class EventLoop(object):
105 """
106 Like glib.idle_add(), if callback returns False it is
107 automatically removed from the list of event sources and will
108 - not be called again.
109 + not be called again. This method is thread-safe.
110
111 @type callback: callable
112 @param callback: a function to call
113 @rtype: int
114 @return: an integer ID
115 """
116 - self._event_handler_id += 1
117 - source_id = self._event_handler_id
118 - self._idle_callbacks[source_id] = self._idle_callback_class(
119 - args=args, callback=callback, source_id=source_id)
120 + with self._thread_rlock:
121 + source_id = self._new_source_id()
122 + self._idle_callbacks[source_id] = self._idle_callback_class(
123 + args=args, callback=callback, source_id=source_id)
124 return source_id
125
126 def _run_idle_callbacks(self):
127 + # assumes caller has acquired self._thread_rlock
128 if not self._idle_callbacks:
129 return
130 # Iterate of our local list, since self._idle_callbacks can be
131 @@ -342,16 +365,18 @@ class EventLoop(object):
132 milliseconds between calls to your function, and your function
133 should return False to stop being called, or True to continue
134 being called. Any additional positional arguments given here
135 - are passed to your function when it's called.
136 + are passed to your function when it's called. This method is
137 + thread-safe.
138 """
139 - self._event_handler_id += 1
140 - source_id = self._event_handler_id
141 - self._timeout_handlers[source_id] = \
142 - self._timeout_handler_class(
143 - interval=interval, function=function, args=args,
144 - source_id=source_id, timestamp=time.time())
145 - if self._timeout_interval is None or self._timeout_interval > interval:
146 - self._timeout_interval = interval
147 + with self._thread_rlock:
148 + source_id = self._new_source_id()
149 + self._timeout_handlers[source_id] = \
150 + self._timeout_handler_class(
151 + interval=interval, function=function, args=args,
152 + source_id=source_id, timestamp=time.time())
153 + if self._timeout_interval is None or \
154 + self._timeout_interval > interval:
155 + self._timeout_interval = interval
156 return source_id
157
158 def _run_timeouts(self):
159 @@ -361,37 +386,39 @@ class EventLoop(object):
160 if self._poll_child_processes():
161 calls += 1
162
163 - self._run_idle_callbacks()
164 -
165 - if not self._timeout_handlers:
166 - return bool(calls)
167 -
168 - ready_timeouts = []
169 - current_time = time.time()
170 - for x in self._timeout_handlers.values():
171 - elapsed_seconds = current_time - x.timestamp
172 - # elapsed_seconds < 0 means the system clock has been adjusted
173 - if elapsed_seconds < 0 or \
174 - (x.interval - 1000 * elapsed_seconds) <= 0:
175 - ready_timeouts.append(x)
176 -
177 - # Iterate of our local list, since self._timeout_handlers can be
178 - # modified during the exection of these callbacks.
179 - for x in ready_timeouts:
180 - if x.source_id not in self._timeout_handlers:
181 - # it got cancelled while executing another timeout
182 - continue
183 - if x.calling:
184 - # don't call it recursively
185 - continue
186 - calls += 1
187 - x.calling = True
188 - try:
189 - x.timestamp = time.time()
190 - if not x.function(*x.args):
191 - self.source_remove(x.source_id)
192 - finally:
193 - x.calling = False
194 + with self._thread_rlock:
195 +
196 + self._run_idle_callbacks()
197 +
198 + if not self._timeout_handlers:
199 + return bool(calls)
200 +
201 + ready_timeouts = []
202 + current_time = time.time()
203 + for x in self._timeout_handlers.values():
204 + elapsed_seconds = current_time - x.timestamp
205 + # elapsed_seconds < 0 means the system clock has been adjusted
206 + if elapsed_seconds < 0 or \
207 + (x.interval - 1000 * elapsed_seconds) <= 0:
208 + ready_timeouts.append(x)
209 +
210 + # Iterate of our local list, since self._timeout_handlers can be
211 + # modified during the exection of these callbacks.
212 + for x in ready_timeouts:
213 + if x.source_id not in self._timeout_handlers:
214 + # it got cancelled while executing another timeout
215 + continue
216 + if x.calling:
217 + # don't call it recursively
218 + continue
219 + calls += 1
220 + x.calling = True
221 + try:
222 + x.timestamp = time.time()
223 + if not x.function(*x.args):
224 + self.source_remove(x.source_id)
225 + finally:
226 + x.calling = False
227
228 return bool(calls)
229
230 @@ -413,8 +440,7 @@ class EventLoop(object):
231 """
232 if f in self._poll_event_handlers:
233 raise AssertionError("fd %d is already registered" % f)
234 - self._event_handler_id += 1
235 - source_id = self._event_handler_id
236 + source_id = self._new_source_id()
237 self._poll_event_handler_ids[source_id] = f
238 self._poll_event_handlers[f] = self._io_handler_class(
239 args=args, callback=callback, f=f, source_id=source_id)
240 @@ -434,18 +460,21 @@ class EventLoop(object):
241 self.source_remove(self._sigchld_src_id)
242 self._sigchld_src_id = None
243 return True
244 - idle_callback = self._idle_callbacks.pop(reg_id, None)
245 - if idle_callback is not None:
246 - return True
247 - timeout_handler = self._timeout_handlers.pop(reg_id, None)
248 - if timeout_handler is not None:
249 - if timeout_handler.interval == self._timeout_interval:
250 - if self._timeout_handlers:
251 - self._timeout_interval = \
252 - min(x.interval for x in self._timeout_handlers.values())
253 - else:
254 - self._timeout_interval = None
255 - return True
256 +
257 + with self._thread_rlock:
258 + idle_callback = self._idle_callbacks.pop(reg_id, None)
259 + if idle_callback is not None:
260 + return True
261 + timeout_handler = self._timeout_handlers.pop(reg_id, None)
262 + if timeout_handler is not None:
263 + if timeout_handler.interval == self._timeout_interval:
264 + if self._timeout_handlers:
265 + self._timeout_interval = min(x.interval
266 + for x in self._timeout_handlers.values())
267 + else:
268 + self._timeout_interval = None
269 + return True
270 +
271 f = self._poll_event_handler_ids.pop(reg_id, None)
272 if f is None:
273 return False