Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/util/_eventloop/
Date: Thu, 27 Dec 2012 02:31:45
Message-Id: 1356575478.323bfee21f553b2c3b088bf6398b8d9e6b963aaf.zmedico@gentoo
1 commit: 323bfee21f553b2c3b088bf6398b8d9e6b963aaf
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Thu Dec 27 02:31:18 2012 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Thu Dec 27 02:31:18 2012 +0000
6 URL: http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=323bfee2
7
8 EventLoop: thread-safe idle_add and timeout_add
9
10 This may be useful for using threads to handle blocking IO with Jython,
11 since Jython lacks the fcntl module which is needed for non-blocking IO
12 (see http://bugs.jython.org/issue1074).
13
14 ---
15 pym/portage/util/_eventloop/EventLoop.py | 145 +++++++++++++++++------------
16 1 files changed, 85 insertions(+), 60 deletions(-)
17
18 diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
19 index 17a468f..6f464df 100644
20 --- a/pym/portage/util/_eventloop/EventLoop.py
21 +++ b/pym/portage/util/_eventloop/EventLoop.py
22 @@ -9,12 +9,23 @@ import select
23 import signal
24 import time
25
26 +try:
27 + import threading
28 +except ImportError:
29 + import dummy_threading as threading
30 +
31 from portage.util import writemsg_level
32 from ..SlotObject import SlotObject
33 from .PollConstants import PollConstants
34 from .PollSelectAdapter import PollSelectAdapter
35
36 class EventLoop(object):
37 + """
38 + An event loop, intended to be compatible with the GLib event loop.
39 + Call the iteration method in order to execute one iteration of the
40 + loop. The idle_add and timeout_add methods serve as thread-safe
41 + means to interact with the loop's thread.
42 + """
43
44 supports_multiprocessing = True
45
46 @@ -44,6 +55,7 @@ class EventLoop(object):
47 @type main: bool
48 """
49 self._use_signal = main
50 + self._thread_rlock = threading.RLock()
51 self._poll_event_queue = []
52 self._poll_event_handlers = {}
53 self._poll_event_handler_ids = {}
54 @@ -89,6 +101,14 @@ class EventLoop(object):
55 self._sigchld_src_id = None
56 self._pid = os.getpid()
57
58 + def _new_source_id(self):
59 + """
60 + Generate a new source id. This method is thread-safe.
61 + """
62 + with self._thread_rlock:
63 + self._event_handler_id += 1
64 + return self._event_handler_id
65 +
66 def _poll(self, timeout=None):
67 """
68 All poll() calls pass through here. The poll events
69 @@ -229,8 +249,7 @@ class EventLoop(object):
70 @rtype: int
71 @return: an integer ID
72 """
73 - self._event_handler_id += 1
74 - source_id = self._event_handler_id
75 + source_id = self._new_source_id()
76 self._child_handlers[source_id] = self._child_callback_class(
77 callback=callback, data=data, pid=pid, source_id=source_id)
78
79 @@ -304,20 +323,21 @@ class EventLoop(object):
80 """
81 Like glib.idle_add(), if callback returns False it is
82 automatically removed from the list of event sources and will
83 - not be called again.
84 + not be called again. This method is thread-safe.
85
86 @type callback: callable
87 @param callback: a function to call
88 @rtype: int
89 @return: an integer ID
90 """
91 - self._event_handler_id += 1
92 - source_id = self._event_handler_id
93 - self._idle_callbacks[source_id] = self._idle_callback_class(
94 - args=args, callback=callback, source_id=source_id)
95 + with self._thread_rlock:
96 + source_id = self._new_source_id()
97 + self._idle_callbacks[source_id] = self._idle_callback_class(
98 + args=args, callback=callback, source_id=source_id)
99 return source_id
100
101 def _run_idle_callbacks(self):
102 + # assumes caller has acquired self._thread_rlock
103 if not self._idle_callbacks:
104 return
105 # Iterate of our local list, since self._idle_callbacks can be
106 @@ -344,14 +364,15 @@ class EventLoop(object):
107 being called. Any additional positional arguments given here
108 are passed to your function when it's called.
109 """
110 - self._event_handler_id += 1
111 - source_id = self._event_handler_id
112 - self._timeout_handlers[source_id] = \
113 - self._timeout_handler_class(
114 - interval=interval, function=function, args=args,
115 - source_id=source_id, timestamp=time.time())
116 - if self._timeout_interval is None or self._timeout_interval > interval:
117 - self._timeout_interval = interval
118 + with self._thread_rlock:
119 + source_id = self._new_source_id()
120 + self._timeout_handlers[source_id] = \
121 + self._timeout_handler_class(
122 + interval=interval, function=function, args=args,
123 + source_id=source_id, timestamp=time.time())
124 + if self._timeout_interval is None or \
125 + self._timeout_interval > interval:
126 + self._timeout_interval = interval
127 return source_id
128
129 def _run_timeouts(self):
130 @@ -361,37 +382,39 @@ class EventLoop(object):
131 if self._poll_child_processes():
132 calls += 1
133
134 - self._run_idle_callbacks()
135 -
136 - if not self._timeout_handlers:
137 - return bool(calls)
138 -
139 - ready_timeouts = []
140 - current_time = time.time()
141 - for x in self._timeout_handlers.values():
142 - elapsed_seconds = current_time - x.timestamp
143 - # elapsed_seconds < 0 means the system clock has been adjusted
144 - if elapsed_seconds < 0 or \
145 - (x.interval - 1000 * elapsed_seconds) <= 0:
146 - ready_timeouts.append(x)
147 -
148 - # Iterate of our local list, since self._timeout_handlers can be
149 - # modified during the exection of these callbacks.
150 - for x in ready_timeouts:
151 - if x.source_id not in self._timeout_handlers:
152 - # it got cancelled while executing another timeout
153 - continue
154 - if x.calling:
155 - # don't call it recursively
156 - continue
157 - calls += 1
158 - x.calling = True
159 - try:
160 - x.timestamp = time.time()
161 - if not x.function(*x.args):
162 - self.source_remove(x.source_id)
163 - finally:
164 - x.calling = False
165 + with self._thread_rlock:
166 +
167 + self._run_idle_callbacks()
168 +
169 + if not self._timeout_handlers:
170 + return bool(calls)
171 +
172 + ready_timeouts = []
173 + current_time = time.time()
174 + for x in self._timeout_handlers.values():
175 + elapsed_seconds = current_time - x.timestamp
176 + # elapsed_seconds < 0 means the system clock has been adjusted
177 + if elapsed_seconds < 0 or \
178 + (x.interval - 1000 * elapsed_seconds) <= 0:
179 + ready_timeouts.append(x)
180 +
181 + # Iterate of our local list, since self._timeout_handlers can be
182 + # modified during the exection of these callbacks.
183 + for x in ready_timeouts:
184 + if x.source_id not in self._timeout_handlers:
185 + # it got cancelled while executing another timeout
186 + continue
187 + if x.calling:
188 + # don't call it recursively
189 + continue
190 + calls += 1
191 + x.calling = True
192 + try:
193 + x.timestamp = time.time()
194 + if not x.function(*x.args):
195 + self.source_remove(x.source_id)
196 + finally:
197 + x.calling = False
198
199 return bool(calls)
200
201 @@ -413,8 +436,7 @@ class EventLoop(object):
202 """
203 if f in self._poll_event_handlers:
204 raise AssertionError("fd %d is already registered" % f)
205 - self._event_handler_id += 1
206 - source_id = self._event_handler_id
207 + source_id = self._new_source_id()
208 self._poll_event_handler_ids[source_id] = f
209 self._poll_event_handlers[f] = self._io_handler_class(
210 args=args, callback=callback, f=f, source_id=source_id)
211 @@ -434,18 +456,21 @@ class EventLoop(object):
212 self.source_remove(self._sigchld_src_id)
213 self._sigchld_src_id = None
214 return True
215 - idle_callback = self._idle_callbacks.pop(reg_id, None)
216 - if idle_callback is not None:
217 - return True
218 - timeout_handler = self._timeout_handlers.pop(reg_id, None)
219 - if timeout_handler is not None:
220 - if timeout_handler.interval == self._timeout_interval:
221 - if self._timeout_handlers:
222 - self._timeout_interval = \
223 - min(x.interval for x in self._timeout_handlers.values())
224 - else:
225 - self._timeout_interval = None
226 - return True
227 +
228 + with self._thread_rlock:
229 + idle_callback = self._idle_callbacks.pop(reg_id, None)
230 + if idle_callback is not None:
231 + return True
232 + timeout_handler = self._timeout_handlers.pop(reg_id, None)
233 + if timeout_handler is not None:
234 + if timeout_handler.interval == self._timeout_interval:
235 + if self._timeout_handlers:
236 + self._timeout_interval = min(x.interval
237 + for x in self._timeout_handlers.values())
238 + else:
239 + self._timeout_interval = None
240 + return True
241 +
242 f = self._poll_event_handler_ids.pop(reg_id, None)
243 if f is None:
244 return False