1 |
commit: 3f74dd21143044949e6344f3cbcc7308a6d75ef6 |
2 |
Author: Zac Medico <zmedico <AT> gentoo <DOT> org> |
3 |
AuthorDate: Wed Feb 8 00:36:32 2012 +0000 |
4 |
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> |
5 |
CommitDate: Wed Feb 8 00:36:32 2012 +0000 |
6 |
URL: http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=3f74dd21 |
7 |
|
8 |
PollScheduler: glib.io_add_watch() compatibility |
9 |
|
10 |
--- |
11 |
pym/_emerge/AsynchronousLock.py | 4 +++ |
12 |
pym/_emerge/EbuildIpcDaemon.py | 2 + |
13 |
pym/_emerge/EbuildMetadataPhase.py | 2 + |
14 |
pym/_emerge/PipeReader.py | 4 +++ |
15 |
pym/_emerge/PollScheduler.py | 52 ++++++++++++++++++++++++----------- |
16 |
pym/_emerge/Scheduler.py | 1 + |
17 |
pym/_emerge/SpawnProcess.py | 4 +++ |
18 |
pym/portage/dbapi/_MergeProcess.py | 2 + |
19 |
8 files changed, 54 insertions(+), 17 deletions(-) |
20 |
|
21 |
diff --git a/pym/_emerge/AsynchronousLock.py b/pym/_emerge/AsynchronousLock.py |
22 |
index 3593834..e166df3 100644 |
23 |
--- a/pym/_emerge/AsynchronousLock.py |
24 |
+++ b/pym/_emerge/AsynchronousLock.py |
25 |
@@ -143,6 +143,8 @@ class _LockThread(AbstractPollTask): |
26 |
self.returncode = os.EX_OK |
27 |
self.wait() |
28 |
|
29 |
+ return True |
30 |
+ |
31 |
def _cancel(self): |
32 |
# There's currently no way to force thread termination. |
33 |
pass |
34 |
@@ -280,6 +282,8 @@ class _LockProcess(AbstractPollTask): |
35 |
self.returncode = os.EX_OK |
36 |
self.wait() |
37 |
|
38 |
+ return True |
39 |
+ |
40 |
def _unregister(self): |
41 |
self._registered = False |
42 |
|
43 |
|
44 |
diff --git a/pym/_emerge/EbuildIpcDaemon.py b/pym/_emerge/EbuildIpcDaemon.py |
45 |
index 5dabe34..6a320cb 100644 |
46 |
--- a/pym/_emerge/EbuildIpcDaemon.py |
47 |
+++ b/pym/_emerge/EbuildIpcDaemon.py |
48 |
@@ -84,6 +84,8 @@ class EbuildIpcDaemon(FifoIpcDaemon): |
49 |
if reply_hook is not None: |
50 |
reply_hook() |
51 |
|
52 |
+ return True |
53 |
+ |
54 |
def _send_reply(self, reply): |
55 |
# File streams are in unbuffered mode since we do atomic |
56 |
# read and write of whole pickles. Use non-blocking mode so |
57 |
|
58 |
diff --git a/pym/_emerge/EbuildMetadataPhase.py b/pym/_emerge/EbuildMetadataPhase.py |
59 |
index d4f5bc0..f8da866 100644 |
60 |
--- a/pym/_emerge/EbuildMetadataPhase.py |
61 |
+++ b/pym/_emerge/EbuildMetadataPhase.py |
62 |
@@ -128,6 +128,8 @@ class EbuildMetadataPhase(SubProcess): |
63 |
|
64 |
self._unregister_if_appropriate(event) |
65 |
|
66 |
+ return True |
67 |
+ |
68 |
def _set_returncode(self, wait_retval): |
69 |
SubProcess._set_returncode(self, wait_retval) |
70 |
# self._raw_metadata is None when _start returns |
71 |
|
72 |
diff --git a/pym/_emerge/PipeReader.py b/pym/_emerge/PipeReader.py |
73 |
index 0f784cf..a85d794 100644 |
74 |
--- a/pym/_emerge/PipeReader.py |
75 |
+++ b/pym/_emerge/PipeReader.py |
76 |
@@ -74,6 +74,8 @@ class PipeReader(AbstractPollTask): |
77 |
|
78 |
self._unregister_if_appropriate(event) |
79 |
|
80 |
+ return True |
81 |
+ |
82 |
def _array_output_handler(self, fd, event): |
83 |
|
84 |
for f in self.input_files.values(): |
85 |
@@ -93,6 +95,8 @@ class PipeReader(AbstractPollTask): |
86 |
|
87 |
self._unregister_if_appropriate(event) |
88 |
|
89 |
+ return True |
90 |
+ |
91 |
def _unregister(self): |
92 |
""" |
93 |
Unregister from the scheduler and close open files. |
94 |
|
95 |
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py |
96 |
index 519a370..d6b971c 100644 |
97 |
--- a/pym/_emerge/PollScheduler.py |
98 |
+++ b/pym/_emerge/PollScheduler.py |
99 |
@@ -27,9 +27,12 @@ class PollScheduler(object): |
100 |
__slots__ = ("output", "register", "schedule", |
101 |
"source_remove", "timeout_add", "unregister") |
102 |
|
103 |
+ class _io_handler_class(SlotObject): |
104 |
+ __slots__ = ("args", "callback", "fd", "source_id") |
105 |
+ |
106 |
class _timeout_handler_class(SlotObject): |
107 |
- __slots__ = ("args", "function", "interval", "source_id", |
108 |
- "timestamp") |
109 |
+ __slots__ = ("args", "function", "interval", |
110 |
+ "io_add_watch", "source_id", "timestamp") |
111 |
|
112 |
def __init__(self): |
113 |
self._terminated = threading.Event() |
114 |
@@ -49,6 +52,7 @@ class PollScheduler(object): |
115 |
self._scheduling = False |
116 |
self._background = False |
117 |
self.sched_iface = self._sched_iface_class( |
118 |
+ io_add_watch=self._register, |
119 |
output=self._task_output, |
120 |
register=self._register, |
121 |
schedule=self._schedule_wait, |
122 |
@@ -248,8 +252,9 @@ class PollScheduler(object): |
123 |
try: |
124 |
while event_handlers: |
125 |
f, event = self._next_poll_event() |
126 |
- handler, reg_id = event_handlers[f] |
127 |
- handler(f, event) |
128 |
+ x = event_handlers[f] |
129 |
+ if not x.callback(f, event, *x.args): |
130 |
+ self._unregister(x.source_id) |
131 |
event_handled = True |
132 |
except StopIteration: |
133 |
event_handled = True |
134 |
@@ -277,8 +282,9 @@ class PollScheduler(object): |
135 |
try: |
136 |
while event_handlers and self._poll_event_queue: |
137 |
f, event = self._next_poll_event() |
138 |
- handler, reg_id = event_handlers[f] |
139 |
- handler(f, event) |
140 |
+ x = event_handlers[f] |
141 |
+ if not x.callback(f, event, *x.args): |
142 |
+ self._unregister(x.source_id) |
143 |
events_handled += 1 |
144 |
except StopIteration: |
145 |
events_handled += 1 |
146 |
@@ -332,20 +338,31 @@ class PollScheduler(object): |
147 |
|
148 |
return bool(ready_timeouts) |
149 |
|
150 |
- def _register(self, f, eventmask, handler): |
151 |
+ def _register(self, f, condition, callback, *args): |
152 |
""" |
153 |
- @rtype: Integer |
154 |
- @return: A unique registration id, for use in schedule() or |
155 |
- unregister() calls. |
156 |
+ Like glib.io_add_watch(), your function should return False to |
157 |
+ stop being called, or True to continue being called. Any |
158 |
+ additional positional arguments given here are passed to your |
159 |
+ function when it's called. |
160 |
+ |
161 |
+ @type f: int or object with fileno() method |
162 |
+ @param f: a file descriptor to monitor |
163 |
+ @type condition: int |
164 |
+ @param condition: a condition mask |
165 |
+ @type callback: callable |
166 |
+ @param callback: a function to call |
167 |
+ @rtype: int |
168 |
+ @return: an integer ID of the event source |
169 |
""" |
170 |
if f in self._poll_event_handlers: |
171 |
raise AssertionError("fd %d is already registered" % f) |
172 |
self._event_handler_id += 1 |
173 |
- reg_id = self._event_handler_id |
174 |
- self._poll_event_handler_ids[reg_id] = f |
175 |
- self._poll_event_handlers[f] = (handler, reg_id) |
176 |
- self._poll_obj.register(f, eventmask) |
177 |
- return reg_id |
178 |
+ source_id = self._event_handler_id |
179 |
+ self._poll_event_handler_ids[source_id] = f |
180 |
+ self._poll_event_handlers[f] = self._io_handler_class( |
181 |
+ args=args, callback=callback, f=f, source_id=source_id) |
182 |
+ self._poll_obj.register(f, condition) |
183 |
+ return source_id |
184 |
|
185 |
def _unregister(self, reg_id): |
186 |
""" |
187 |
@@ -409,8 +426,9 @@ class PollScheduler(object): |
188 |
while (wait_ids is None and event_handlers) or \ |
189 |
(wait_ids is not None and wait_ids.intersection(handler_ids)): |
190 |
f, event = self._next_poll_event(timeout=remaining_timeout) |
191 |
- handler, reg_id = event_handlers[f] |
192 |
- handler(f, event) |
193 |
+ x = event_handlers[f] |
194 |
+ if not x.callback(f, event, *x.args): |
195 |
+ self._unregister(x.source_id) |
196 |
event_handled = True |
197 |
if condition is not None and condition(): |
198 |
break |
199 |
|
200 |
diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py |
201 |
index e6f3e0e..55e327f 100644 |
202 |
--- a/pym/_emerge/Scheduler.py |
203 |
+++ b/pym/_emerge/Scheduler.py |
204 |
@@ -218,6 +218,7 @@ class Scheduler(PollScheduler): |
205 |
schedule=self._schedule_fetch) |
206 |
self._sched_iface = self._iface_class( |
207 |
fetch=fetch_iface, output=self._task_output, |
208 |
+ io_add_watch=self._register, |
209 |
register=self._register, |
210 |
schedule=self._schedule_wait, |
211 |
scheduleSetup=self._schedule_setup, |
212 |
|
213 |
diff --git a/pym/_emerge/SpawnProcess.py b/pym/_emerge/SpawnProcess.py |
214 |
index ec5bf7d..411e7c6 100644 |
215 |
--- a/pym/_emerge/SpawnProcess.py |
216 |
+++ b/pym/_emerge/SpawnProcess.py |
217 |
@@ -218,6 +218,8 @@ class SpawnProcess(SubProcess): |
218 |
|
219 |
self._unregister_if_appropriate(event) |
220 |
|
221 |
+ return True |
222 |
+ |
223 |
def _dummy_handler(self, fd, event): |
224 |
""" |
225 |
This method is mainly interested in detecting EOF, since |
226 |
@@ -240,6 +242,8 @@ class SpawnProcess(SubProcess): |
227 |
|
228 |
self._unregister_if_appropriate(event) |
229 |
|
230 |
+ return True |
231 |
+ |
232 |
def _unregister(self): |
233 |
super(SpawnProcess, self)._unregister() |
234 |
if self._log_file_real is not None: |
235 |
|
236 |
diff --git a/pym/portage/dbapi/_MergeProcess.py b/pym/portage/dbapi/_MergeProcess.py |
237 |
index df501be..eed7bd4 100644 |
238 |
--- a/pym/portage/dbapi/_MergeProcess.py |
239 |
+++ b/pym/portage/dbapi/_MergeProcess.py |
240 |
@@ -83,6 +83,8 @@ class MergeProcess(SpawnProcess): |
241 |
reporter = getattr(portage.elog.messages, funcname) |
242 |
reporter(msg, phase=phase, key=key, out=out) |
243 |
|
244 |
+ return True |
245 |
+ |
246 |
def _spawn(self, args, fd_pipes, **kwargs): |
247 |
""" |
248 |
Fork a subprocess, apply local settings, and call |