1 |
Author: grobian |
2 |
Date: 2009-06-27 13:19:35 +0000 (Sat, 27 Jun 2009) |
3 |
New Revision: 13707 |
4 |
|
5 |
Added: |
6 |
main/branches/prefix/pym/_emerge/PollScheduler.py |
7 |
main/branches/prefix/pym/_emerge/QueueScheduler.py |
8 |
main/branches/prefix/pym/_emerge/TaskScheduler.py |
9 |
Modified: |
10 |
main/branches/prefix/pym/_emerge/__init__.py |
11 |
main/branches/prefix/pym/portage/tests/process/test_poll.py |
12 |
Log: |
13 |
Merged from trunk -r13667:13668 |
14 |
|
15 |
| 13668 | Bug #275047 - Split _emerge/__init__.py into smaller pieces | |
16 |
| zmedico | (part 3). Thanks to Sebastian Mingramm (few) | |
17 |
| | <s.mingramm@×××.de> for this patch. | |
18 |
|
19 |
|
20 |
Copied: main/branches/prefix/pym/_emerge/PollScheduler.py (from rev 13668, main/trunk/pym/_emerge/PollScheduler.py) |
21 |
=================================================================== |
22 |
--- main/branches/prefix/pym/_emerge/PollScheduler.py (rev 0) |
23 |
+++ main/branches/prefix/pym/_emerge/PollScheduler.py 2009-06-27 13:19:35 UTC (rev 13707) |
24 |
@@ -0,0 +1,251 @@ |
25 |
+import logging |
26 |
+import select |
27 |
+ |
28 |
+from portage.util import writemsg_level |
29 |
+ |
30 |
+from _emerge.SlotObject import SlotObject |
31 |
+from _emerge.getloadavg import getloadavg |
32 |
+from _emerge.PollConstants import PollConstants |
33 |
+from _emerge.PollSelectAdapter import PollSelectAdapter |
34 |
+ |
35 |
+class PollScheduler(object): |
36 |
+ |
37 |
+ class _sched_iface_class(SlotObject): |
38 |
+ __slots__ = ("register", "schedule", "unregister") |
39 |
+ |
40 |
+ def __init__(self): |
41 |
+ self._max_jobs = 1 |
42 |
+ self._max_load = None |
43 |
+ self._jobs = 0 |
44 |
+ self._poll_event_queue = [] |
45 |
+ self._poll_event_handlers = {} |
46 |
+ self._poll_event_handler_ids = {} |
47 |
+ # Increment id for each new handler. |
48 |
+ self._event_handler_id = 0 |
49 |
+ self._poll_obj = create_poll_instance() |
50 |
+ self._scheduling = False |
51 |
+ |
52 |
+ def _schedule(self): |
53 |
+ """ |
54 |
+ Calls _schedule_tasks() and automatically returns early from |
55 |
+ any recursive calls to this method that the _schedule_tasks() |
56 |
+ call might trigger. This makes _schedule() safe to call from |
57 |
+ inside exit listeners. |
58 |
+ """ |
59 |
+ if self._scheduling: |
60 |
+ return False |
61 |
+ self._scheduling = True |
62 |
+ try: |
63 |
+ return self._schedule_tasks() |
64 |
+ finally: |
65 |
+ self._scheduling = False |
66 |
+ |
67 |
+ def _running_job_count(self): |
68 |
+ return self._jobs |
69 |
+ |
70 |
+ def _can_add_job(self): |
71 |
+ max_jobs = self._max_jobs |
72 |
+ max_load = self._max_load |
73 |
+ |
74 |
+ if self._max_jobs is not True and \ |
75 |
+ self._running_job_count() >= self._max_jobs: |
76 |
+ return False |
77 |
+ |
78 |
+ if max_load is not None and \ |
79 |
+ (max_jobs is True or max_jobs > 1) and \ |
80 |
+ self._running_job_count() >= 1: |
81 |
+ try: |
82 |
+ avg1, avg5, avg15 = getloadavg() |
83 |
+ except OSError: |
84 |
+ return False |
85 |
+ |
86 |
+ if avg1 >= max_load: |
87 |
+ return False |
88 |
+ |
89 |
+ return True |
90 |
+ |
91 |
+ def _poll(self, timeout=None): |
92 |
+ """ |
93 |
+ All poll() calls pass through here. The poll events |
94 |
+ are added directly to self._poll_event_queue. |
95 |
+ In order to avoid endless blocking, this raises |
96 |
+ StopIteration if timeout is None and there are |
97 |
+ no file descriptors to poll. |
98 |
+ """ |
99 |
+ if not self._poll_event_handlers: |
100 |
+ self._schedule() |
101 |
+ if timeout is None and \ |
102 |
+ not self._poll_event_handlers: |
103 |
+ raise StopIteration( |
104 |
+ "timeout is None and there are no poll() event handlers") |
105 |
+ |
106 |
+ # The following error is known to occur with Linux kernel versions |
107 |
+ # less than 2.6.24: |
108 |
+ # |
109 |
+ # select.error: (4, 'Interrupted system call') |
110 |
+ # |
111 |
+ # This error has been observed after a SIGSTOP, followed by SIGCONT. |
112 |
+ # Treat it similar to EAGAIN if timeout is None, otherwise just return |
113 |
+ # without any events. |
114 |
+ while True: |
115 |
+ try: |
116 |
+ self._poll_event_queue.extend(self._poll_obj.poll(timeout)) |
117 |
+ break |
118 |
+ except select.error, e: |
119 |
+ writemsg_level("\n!!! select error: %s\n" % (e,), |
120 |
+ level=logging.ERROR, noiselevel=-1) |
121 |
+ del e |
122 |
+ if timeout is not None: |
123 |
+ break |
124 |
+ |
125 |
+ def _next_poll_event(self, timeout=None): |
126 |
+ """ |
127 |
+ Since the _schedule_wait() loop is called by event |
128 |
+ handlers from _poll_loop(), maintain a central event |
129 |
+ queue for both of them to share events from a single |
130 |
+ poll() call. In order to avoid endless blocking, this |
131 |
+ raises StopIteration if timeout is None and there are |
132 |
+ no file descriptors to poll. |
133 |
+ """ |
134 |
+ if not self._poll_event_queue: |
135 |
+ self._poll(timeout) |
136 |
+ return self._poll_event_queue.pop() |
137 |
+ |
138 |
+ def _poll_loop(self): |
139 |
+ |
140 |
+ event_handlers = self._poll_event_handlers |
141 |
+ event_handled = False |
142 |
+ |
143 |
+ try: |
144 |
+ while event_handlers: |
145 |
+ f, event = self._next_poll_event() |
146 |
+ handler, reg_id = event_handlers[f] |
147 |
+ handler(f, event) |
148 |
+ event_handled = True |
149 |
+ except StopIteration: |
150 |
+ event_handled = True |
151 |
+ |
152 |
+ if not event_handled: |
153 |
+ raise AssertionError("tight loop") |
154 |
+ |
155 |
+ def _schedule_yield(self): |
156 |
+ """ |
157 |
+ Schedule for a short period of time chosen by the scheduler based |
158 |
+ on internal state. Synchronous tasks should call this periodically |
159 |
+ in order to allow the scheduler to service pending poll events. The |
160 |
+ scheduler will call poll() exactly once, without blocking, and any |
161 |
+ resulting poll events will be serviced. |
162 |
+ """ |
163 |
+ event_handlers = self._poll_event_handlers |
164 |
+ events_handled = 0 |
165 |
+ |
166 |
+ if not event_handlers: |
167 |
+ return bool(events_handled) |
168 |
+ |
169 |
+ if not self._poll_event_queue: |
170 |
+ self._poll(0) |
171 |
+ |
172 |
+ try: |
173 |
+ while event_handlers and self._poll_event_queue: |
174 |
+ f, event = self._next_poll_event() |
175 |
+ handler, reg_id = event_handlers[f] |
176 |
+ handler(f, event) |
177 |
+ events_handled += 1 |
178 |
+ except StopIteration: |
179 |
+ events_handled += 1 |
180 |
+ |
181 |
+ return bool(events_handled) |
182 |
+ |
183 |
+ def _register(self, f, eventmask, handler): |
184 |
+ """ |
185 |
+ @rtype: Integer |
186 |
+ @return: A unique registration id, for use in schedule() or |
187 |
+ unregister() calls. |
188 |
+ """ |
189 |
+ if f in self._poll_event_handlers: |
190 |
+ raise AssertionError("fd %d is already registered" % f) |
191 |
+ self._event_handler_id += 1 |
192 |
+ reg_id = self._event_handler_id |
193 |
+ self._poll_event_handler_ids[reg_id] = f |
194 |
+ self._poll_event_handlers[f] = (handler, reg_id) |
195 |
+ self._poll_obj.register(f, eventmask) |
196 |
+ return reg_id |
197 |
+ |
198 |
+ def _unregister(self, reg_id): |
199 |
+ f = self._poll_event_handler_ids[reg_id] |
200 |
+ self._poll_obj.unregister(f) |
201 |
+ del self._poll_event_handlers[f] |
202 |
+ del self._poll_event_handler_ids[reg_id] |
203 |
+ |
204 |
+ def _schedule_wait(self, wait_ids): |
205 |
+ """ |
206 |
+ Schedule until wait_id is not longer registered |
207 |
+ for poll() events. |
208 |
+ @type wait_id: int |
209 |
+ @param wait_id: a task id to wait for |
210 |
+ """ |
211 |
+ event_handlers = self._poll_event_handlers |
212 |
+ handler_ids = self._poll_event_handler_ids |
213 |
+ event_handled = False |
214 |
+ |
215 |
+ if isinstance(wait_ids, int): |
216 |
+ wait_ids = frozenset([wait_ids]) |
217 |
+ |
218 |
+ try: |
219 |
+ while wait_ids.intersection(handler_ids): |
220 |
+ f, event = self._next_poll_event() |
221 |
+ handler, reg_id = event_handlers[f] |
222 |
+ handler(f, event) |
223 |
+ event_handled = True |
224 |
+ except StopIteration: |
225 |
+ event_handled = True |
226 |
+ |
227 |
+ return event_handled |
228 |
+ |
229 |
+ |
230 |
+_can_poll_device = None |
231 |
+ |
232 |
+def can_poll_device(): |
233 |
+ """ |
234 |
+ Test if it's possible to use poll() on a device such as a pty. This |
235 |
+ is known to fail on Darwin. |
236 |
+ @rtype: bool |
237 |
+ @returns: True if poll() on a device succeeds, False otherwise. |
238 |
+ """ |
239 |
+ |
240 |
+ global _can_poll_device |
241 |
+ if _can_poll_device is not None: |
242 |
+ return _can_poll_device |
243 |
+ |
244 |
+ if not hasattr(select, "poll"): |
245 |
+ _can_poll_device = False |
246 |
+ return _can_poll_device |
247 |
+ |
248 |
+ try: |
249 |
+ dev_null = open('/dev/null', 'rb') |
250 |
+ except IOError: |
251 |
+ _can_poll_device = False |
252 |
+ return _can_poll_device |
253 |
+ |
254 |
+ p = select.poll() |
255 |
+ p.register(dev_null.fileno(), PollConstants.POLLIN) |
256 |
+ |
257 |
+ invalid_request = False |
258 |
+ for f, event in p.poll(): |
259 |
+ if event & PollConstants.POLLNVAL: |
260 |
+ invalid_request = True |
261 |
+ break |
262 |
+ dev_null.close() |
263 |
+ |
264 |
+ _can_poll_device = not invalid_request |
265 |
+ return _can_poll_device |
266 |
+ |
267 |
+def create_poll_instance(): |
268 |
+ """ |
269 |
+ Create an instance of select.poll, or an instance of |
270 |
+ PollSelectAdapter there is no poll() implementation or |
271 |
+ it is broken somehow. |
272 |
+ """ |
273 |
+ if can_poll_device(): |
274 |
+ return select.poll() |
275 |
+ return PollSelectAdapter() |
276 |
|
277 |
Copied: main/branches/prefix/pym/_emerge/QueueScheduler.py (from rev 13668, main/trunk/pym/_emerge/QueueScheduler.py) |
278 |
=================================================================== |
279 |
--- main/branches/prefix/pym/_emerge/QueueScheduler.py (rev 0) |
280 |
+++ main/branches/prefix/pym/_emerge/QueueScheduler.py 2009-06-27 13:19:35 UTC (rev 13707) |
281 |
@@ -0,0 +1,77 @@ |
282 |
+from _emerge.PollScheduler import PollScheduler |
283 |
+ |
284 |
+class QueueScheduler(PollScheduler): |
285 |
+ |
286 |
+ """ |
287 |
+ Add instances of SequentialTaskQueue and then call run(). The |
288 |
+ run() method returns when no tasks remain. |
289 |
+ """ |
290 |
+ |
291 |
+ def __init__(self, max_jobs=None, max_load=None): |
292 |
+ PollScheduler.__init__(self) |
293 |
+ |
294 |
+ if max_jobs is None: |
295 |
+ max_jobs = 1 |
296 |
+ |
297 |
+ self._max_jobs = max_jobs |
298 |
+ self._max_load = max_load |
299 |
+ self.sched_iface = self._sched_iface_class( |
300 |
+ register=self._register, |
301 |
+ schedule=self._schedule_wait, |
302 |
+ unregister=self._unregister) |
303 |
+ |
304 |
+ self._queues = [] |
305 |
+ self._schedule_listeners = [] |
306 |
+ |
307 |
+ def add(self, q): |
308 |
+ self._queues.append(q) |
309 |
+ |
310 |
+ def remove(self, q): |
311 |
+ self._queues.remove(q) |
312 |
+ |
313 |
+ def run(self): |
314 |
+ |
315 |
+ while self._schedule(): |
316 |
+ self._poll_loop() |
317 |
+ |
318 |
+ while self._running_job_count(): |
319 |
+ self._poll_loop() |
320 |
+ |
321 |
+ def _schedule_tasks(self): |
322 |
+ """ |
323 |
+ @rtype: bool |
324 |
+ @returns: True if there may be remaining tasks to schedule, |
325 |
+ False otherwise. |
326 |
+ """ |
327 |
+ while self._can_add_job(): |
328 |
+ n = self._max_jobs - self._running_job_count() |
329 |
+ if n < 1: |
330 |
+ break |
331 |
+ |
332 |
+ if not self._start_next_job(n): |
333 |
+ return False |
334 |
+ |
335 |
+ for q in self._queues: |
336 |
+ if q: |
337 |
+ return True |
338 |
+ return False |
339 |
+ |
340 |
+ def _running_job_count(self): |
341 |
+ job_count = 0 |
342 |
+ for q in self._queues: |
343 |
+ job_count += len(q.running_tasks) |
344 |
+ self._jobs = job_count |
345 |
+ return job_count |
346 |
+ |
347 |
+ def _start_next_job(self, n=1): |
348 |
+ started_count = 0 |
349 |
+ for q in self._queues: |
350 |
+ initial_job_count = len(q.running_tasks) |
351 |
+ q.schedule() |
352 |
+ final_job_count = len(q.running_tasks) |
353 |
+ if final_job_count > initial_job_count: |
354 |
+ started_count += (final_job_count - initial_job_count) |
355 |
+ if started_count >= n: |
356 |
+ break |
357 |
+ return started_count |
358 |
+ |
359 |
|
360 |
Copied: main/branches/prefix/pym/_emerge/TaskScheduler.py (from rev 13668, main/trunk/pym/_emerge/TaskScheduler.py) |
361 |
=================================================================== |
362 |
--- main/branches/prefix/pym/_emerge/TaskScheduler.py (rev 0) |
363 |
+++ main/branches/prefix/pym/_emerge/TaskScheduler.py 2009-06-27 13:19:35 UTC (rev 13707) |
364 |
@@ -0,0 +1,21 @@ |
365 |
+from _emerge.QueueScheduler import QueueScheduler |
366 |
+from _emerge.SequentialTaskQueue import SequentialTaskQueue |
367 |
+ |
368 |
+class TaskScheduler(object): |
369 |
+ |
370 |
+ """ |
371 |
+ A simple way to handle scheduling of AsynchrousTask instances. Simply |
372 |
+ add tasks and call run(). The run() method returns when no tasks remain. |
373 |
+ """ |
374 |
+ |
375 |
+ def __init__(self, max_jobs=None, max_load=None): |
376 |
+ self._queue = SequentialTaskQueue(max_jobs=max_jobs) |
377 |
+ self._scheduler = QueueScheduler( |
378 |
+ max_jobs=max_jobs, max_load=max_load) |
379 |
+ self.sched_iface = self._scheduler.sched_iface |
380 |
+ self.run = self._scheduler.run |
381 |
+ self._scheduler.add(self._queue) |
382 |
+ |
383 |
+ def add(self, task): |
384 |
+ self._queue.add(task) |
385 |
+ |
386 |
|
387 |
Modified: main/branches/prefix/pym/_emerge/__init__.py |
388 |
=================================================================== |
389 |
--- main/branches/prefix/pym/_emerge/__init__.py 2009-06-27 13:12:24 UTC (rev 13706) |
390 |
+++ main/branches/prefix/pym/_emerge/__init__.py 2009-06-27 13:19:35 UTC (rev 13707) |
391 |
@@ -3,10 +3,8 @@ |
392 |
# Distributed under the terms of the GNU General Public License v2 |
393 |
# $Id$ |
394 |
|
395 |
-import formatter |
396 |
import logging |
397 |
import pwd |
398 |
-import select |
399 |
import shlex |
400 |
import signal |
401 |
import sys |
402 |
@@ -74,7 +72,6 @@ |
403 |
from _emerge.DepPrioritySatisfiedRange import DepPrioritySatisfiedRange |
404 |
from _emerge.Task import Task |
405 |
from _emerge.Blocker import Blocker |
406 |
-from _emerge.PollConstants import PollConstants |
407 |
from _emerge.AsynchronousTask import AsynchronousTask |
408 |
from _emerge.CompositeTask import CompositeTask |
409 |
from _emerge.EbuildFetcher import EbuildFetcher |
410 |
@@ -93,13 +90,12 @@ |
411 |
from _emerge.PackageVirtualDbapi import PackageVirtualDbapi |
412 |
from _emerge.RepoDisplay import RepoDisplay |
413 |
from _emerge.UseFlagDisplay import UseFlagDisplay |
414 |
-from _emerge.PollSelectAdapter import PollSelectAdapter |
415 |
from _emerge.SequentialTaskQueue import SequentialTaskQueue |
416 |
from _emerge.ProgressHandler import ProgressHandler |
417 |
from _emerge.stdout_spinner import stdout_spinner |
418 |
from _emerge.UninstallFailure import UninstallFailure |
419 |
from _emerge.JobStatusDisplay import JobStatusDisplay |
420 |
-from _emerge.getloadavg import getloadavg |
421 |
+from _emerge.PollScheduler import PollScheduler |
422 |
|
423 |
def userquery(prompt, responses=None, colours=None): |
424 |
"""Displays a prompt and a set of responses, then waits for a response |
425 |
@@ -6476,341 +6472,6 @@ |
426 |
(self.blocks - self.blocks_satisfied)) |
427 |
return "".join(myoutput) |
428 |
|
429 |
- |
430 |
-_can_poll_device = None |
431 |
- |
432 |
-def can_poll_device(): |
433 |
- """ |
434 |
- Test if it's possible to use poll() on a device such as a pty. This |
435 |
- is known to fail on Darwin. |
436 |
- @rtype: bool |
437 |
- @returns: True if poll() on a device succeeds, False otherwise. |
438 |
- """ |
439 |
- |
440 |
- global _can_poll_device |
441 |
- if _can_poll_device is not None: |
442 |
- return _can_poll_device |
443 |
- |
444 |
- if not hasattr(select, "poll"): |
445 |
- _can_poll_device = False |
446 |
- return _can_poll_device |
447 |
- |
448 |
- try: |
449 |
- dev_null = open('/dev/null', 'rb') |
450 |
- except IOError: |
451 |
- _can_poll_device = False |
452 |
- return _can_poll_device |
453 |
- |
454 |
- p = select.poll() |
455 |
- p.register(dev_null.fileno(), PollConstants.POLLIN) |
456 |
- |
457 |
- invalid_request = False |
458 |
- for f, event in p.poll(): |
459 |
- if event & PollConstants.POLLNVAL: |
460 |
- invalid_request = True |
461 |
- break |
462 |
- dev_null.close() |
463 |
- |
464 |
- _can_poll_device = not invalid_request |
465 |
- return _can_poll_device |
466 |
- |
467 |
-def create_poll_instance(): |
468 |
- """ |
469 |
- Create an instance of select.poll, or an instance of |
470 |
- PollSelectAdapter there is no poll() implementation or |
471 |
- it is broken somehow. |
472 |
- """ |
473 |
- if can_poll_device(): |
474 |
- return select.poll() |
475 |
- return PollSelectAdapter() |
476 |
- |
477 |
-class PollScheduler(object): |
478 |
- |
479 |
- class _sched_iface_class(SlotObject): |
480 |
- __slots__ = ("register", "schedule", "unregister") |
481 |
- |
482 |
- def __init__(self): |
483 |
- self._max_jobs = 1 |
484 |
- self._max_load = None |
485 |
- self._jobs = 0 |
486 |
- self._poll_event_queue = [] |
487 |
- self._poll_event_handlers = {} |
488 |
- self._poll_event_handler_ids = {} |
489 |
- # Increment id for each new handler. |
490 |
- self._event_handler_id = 0 |
491 |
- self._poll_obj = create_poll_instance() |
492 |
- self._scheduling = False |
493 |
- |
494 |
- def _schedule(self): |
495 |
- """ |
496 |
- Calls _schedule_tasks() and automatically returns early from |
497 |
- any recursive calls to this method that the _schedule_tasks() |
498 |
- call might trigger. This makes _schedule() safe to call from |
499 |
- inside exit listeners. |
500 |
- """ |
501 |
- if self._scheduling: |
502 |
- return False |
503 |
- self._scheduling = True |
504 |
- try: |
505 |
- return self._schedule_tasks() |
506 |
- finally: |
507 |
- self._scheduling = False |
508 |
- |
509 |
- def _running_job_count(self): |
510 |
- return self._jobs |
511 |
- |
512 |
- def _can_add_job(self): |
513 |
- max_jobs = self._max_jobs |
514 |
- max_load = self._max_load |
515 |
- |
516 |
- if self._max_jobs is not True and \ |
517 |
- self._running_job_count() >= self._max_jobs: |
518 |
- return False |
519 |
- |
520 |
- if max_load is not None and \ |
521 |
- (max_jobs is True or max_jobs > 1) and \ |
522 |
- self._running_job_count() >= 1: |
523 |
- try: |
524 |
- avg1, avg5, avg15 = getloadavg() |
525 |
- except OSError: |
526 |
- return False |
527 |
- |
528 |
- if avg1 >= max_load: |
529 |
- return False |
530 |
- |
531 |
- return True |
532 |
- |
533 |
- def _poll(self, timeout=None): |
534 |
- """ |
535 |
- All poll() calls pass through here. The poll events |
536 |
- are added directly to self._poll_event_queue. |
537 |
- In order to avoid endless blocking, this raises |
538 |
- StopIteration if timeout is None and there are |
539 |
- no file descriptors to poll. |
540 |
- """ |
541 |
- if not self._poll_event_handlers: |
542 |
- self._schedule() |
543 |
- if timeout is None and \ |
544 |
- not self._poll_event_handlers: |
545 |
- raise StopIteration( |
546 |
- "timeout is None and there are no poll() event handlers") |
547 |
- |
548 |
- # The following error is known to occur with Linux kernel versions |
549 |
- # less than 2.6.24: |
550 |
- # |
551 |
- # select.error: (4, 'Interrupted system call') |
552 |
- # |
553 |
- # This error has been observed after a SIGSTOP, followed by SIGCONT. |
554 |
- # Treat it similar to EAGAIN if timeout is None, otherwise just return |
555 |
- # without any events. |
556 |
- while True: |
557 |
- try: |
558 |
- self._poll_event_queue.extend(self._poll_obj.poll(timeout)) |
559 |
- break |
560 |
- except select.error, e: |
561 |
- writemsg_level("\n!!! select error: %s\n" % (e,), |
562 |
- level=logging.ERROR, noiselevel=-1) |
563 |
- del e |
564 |
- if timeout is not None: |
565 |
- break |
566 |
- |
567 |
- def _next_poll_event(self, timeout=None): |
568 |
- """ |
569 |
- Since the _schedule_wait() loop is called by event |
570 |
- handlers from _poll_loop(), maintain a central event |
571 |
- queue for both of them to share events from a single |
572 |
- poll() call. In order to avoid endless blocking, this |
573 |
- raises StopIteration if timeout is None and there are |
574 |
- no file descriptors to poll. |
575 |
- """ |
576 |
- if not self._poll_event_queue: |
577 |
- self._poll(timeout) |
578 |
- return self._poll_event_queue.pop() |
579 |
- |
580 |
- def _poll_loop(self): |
581 |
- |
582 |
- event_handlers = self._poll_event_handlers |
583 |
- event_handled = False |
584 |
- |
585 |
- try: |
586 |
- while event_handlers: |
587 |
- f, event = self._next_poll_event() |
588 |
- handler, reg_id = event_handlers[f] |
589 |
- handler(f, event) |
590 |
- event_handled = True |
591 |
- except StopIteration: |
592 |
- event_handled = True |
593 |
- |
594 |
- if not event_handled: |
595 |
- raise AssertionError("tight loop") |
596 |
- |
597 |
- def _schedule_yield(self): |
598 |
- """ |
599 |
- Schedule for a short period of time chosen by the scheduler based |
600 |
- on internal state. Synchronous tasks should call this periodically |
601 |
- in order to allow the scheduler to service pending poll events. The |
602 |
- scheduler will call poll() exactly once, without blocking, and any |
603 |
- resulting poll events will be serviced. |
604 |
- """ |
605 |
- event_handlers = self._poll_event_handlers |
606 |
- events_handled = 0 |
607 |
- |
608 |
- if not event_handlers: |
609 |
- return bool(events_handled) |
610 |
- |
611 |
- if not self._poll_event_queue: |
612 |
- self._poll(0) |
613 |
- |
614 |
- try: |
615 |
- while event_handlers and self._poll_event_queue: |
616 |
- f, event = self._next_poll_event() |
617 |
- handler, reg_id = event_handlers[f] |
618 |
- handler(f, event) |
619 |
- events_handled += 1 |
620 |
- except StopIteration: |
621 |
- events_handled += 1 |
622 |
- |
623 |
- return bool(events_handled) |
624 |
- |
625 |
- def _register(self, f, eventmask, handler): |
626 |
- """ |
627 |
- @rtype: Integer |
628 |
- @return: A unique registration id, for use in schedule() or |
629 |
- unregister() calls. |
630 |
- """ |
631 |
- if f in self._poll_event_handlers: |
632 |
- raise AssertionError("fd %d is already registered" % f) |
633 |
- self._event_handler_id += 1 |
634 |
- reg_id = self._event_handler_id |
635 |
- self._poll_event_handler_ids[reg_id] = f |
636 |
- self._poll_event_handlers[f] = (handler, reg_id) |
637 |
- self._poll_obj.register(f, eventmask) |
638 |
- return reg_id |
639 |
- |
640 |
- def _unregister(self, reg_id): |
641 |
- f = self._poll_event_handler_ids[reg_id] |
642 |
- self._poll_obj.unregister(f) |
643 |
- del self._poll_event_handlers[f] |
644 |
- del self._poll_event_handler_ids[reg_id] |
645 |
- |
646 |
- def _schedule_wait(self, wait_ids): |
647 |
- """ |
648 |
- Schedule until wait_id is not longer registered |
649 |
- for poll() events. |
650 |
- @type wait_id: int |
651 |
- @param wait_id: a task id to wait for |
652 |
- """ |
653 |
- event_handlers = self._poll_event_handlers |
654 |
- handler_ids = self._poll_event_handler_ids |
655 |
- event_handled = False |
656 |
- |
657 |
- if isinstance(wait_ids, int): |
658 |
- wait_ids = frozenset([wait_ids]) |
659 |
- |
660 |
- try: |
661 |
- while wait_ids.intersection(handler_ids): |
662 |
- f, event = self._next_poll_event() |
663 |
- handler, reg_id = event_handlers[f] |
664 |
- handler(f, event) |
665 |
- event_handled = True |
666 |
- except StopIteration: |
667 |
- event_handled = True |
668 |
- |
669 |
- return event_handled |
670 |
- |
671 |
-class QueueScheduler(PollScheduler): |
672 |
- |
673 |
- """ |
674 |
- Add instances of SequentialTaskQueue and then call run(). The |
675 |
- run() method returns when no tasks remain. |
676 |
- """ |
677 |
- |
678 |
- def __init__(self, max_jobs=None, max_load=None): |
679 |
- PollScheduler.__init__(self) |
680 |
- |
681 |
- if max_jobs is None: |
682 |
- max_jobs = 1 |
683 |
- |
684 |
- self._max_jobs = max_jobs |
685 |
- self._max_load = max_load |
686 |
- self.sched_iface = self._sched_iface_class( |
687 |
- register=self._register, |
688 |
- schedule=self._schedule_wait, |
689 |
- unregister=self._unregister) |
690 |
- |
691 |
- self._queues = [] |
692 |
- self._schedule_listeners = [] |
693 |
- |
694 |
- def add(self, q): |
695 |
- self._queues.append(q) |
696 |
- |
697 |
- def remove(self, q): |
698 |
- self._queues.remove(q) |
699 |
- |
700 |
- def run(self): |
701 |
- |
702 |
- while self._schedule(): |
703 |
- self._poll_loop() |
704 |
- |
705 |
- while self._running_job_count(): |
706 |
- self._poll_loop() |
707 |
- |
708 |
- def _schedule_tasks(self): |
709 |
- """ |
710 |
- @rtype: bool |
711 |
- @returns: True if there may be remaining tasks to schedule, |
712 |
- False otherwise. |
713 |
- """ |
714 |
- while self._can_add_job(): |
715 |
- n = self._max_jobs - self._running_job_count() |
716 |
- if n < 1: |
717 |
- break |
718 |
- |
719 |
- if not self._start_next_job(n): |
720 |
- return False |
721 |
- |
722 |
- for q in self._queues: |
723 |
- if q: |
724 |
- return True |
725 |
- return False |
726 |
- |
727 |
- def _running_job_count(self): |
728 |
- job_count = 0 |
729 |
- for q in self._queues: |
730 |
- job_count += len(q.running_tasks) |
731 |
- self._jobs = job_count |
732 |
- return job_count |
733 |
- |
734 |
- def _start_next_job(self, n=1): |
735 |
- started_count = 0 |
736 |
- for q in self._queues: |
737 |
- initial_job_count = len(q.running_tasks) |
738 |
- q.schedule() |
739 |
- final_job_count = len(q.running_tasks) |
740 |
- if final_job_count > initial_job_count: |
741 |
- started_count += (final_job_count - initial_job_count) |
742 |
- if started_count >= n: |
743 |
- break |
744 |
- return started_count |
745 |
- |
746 |
-class TaskScheduler(object): |
747 |
- |
748 |
- """ |
749 |
- A simple way to handle scheduling of AsynchrousTask instances. Simply |
750 |
- add tasks and call run(). The run() method returns when no tasks remain. |
751 |
- """ |
752 |
- |
753 |
- def __init__(self, max_jobs=None, max_load=None): |
754 |
- self._queue = SequentialTaskQueue(max_jobs=max_jobs) |
755 |
- self._scheduler = QueueScheduler( |
756 |
- max_jobs=max_jobs, max_load=max_load) |
757 |
- self.sched_iface = self._scheduler.sched_iface |
758 |
- self.run = self._scheduler.run |
759 |
- self._scheduler.add(self._queue) |
760 |
- |
761 |
- def add(self, task): |
762 |
- self._queue.add(task) |
763 |
- |
764 |
class Scheduler(PollScheduler): |
765 |
|
766 |
_opts_ignore_blockers = \ |
767 |
|
768 |
Modified: main/branches/prefix/pym/portage/tests/process/test_poll.py |
769 |
=================================================================== |
770 |
--- main/branches/prefix/pym/portage/tests/process/test_poll.py 2009-06-27 13:12:24 UTC (rev 13706) |
771 |
+++ main/branches/prefix/pym/portage/tests/process/test_poll.py 2009-06-27 13:19:35 UTC (rev 13707) |
772 |
@@ -8,7 +8,7 @@ |
773 |
import portage |
774 |
from portage.output import get_term_size, set_term_size |
775 |
from portage.tests import TestCase |
776 |
-from _emerge import TaskScheduler |
777 |
+from _emerge.TaskScheduler import TaskScheduler |
778 |
from _emerge.PipeReader import PipeReader |
779 |
from _emerge.SpawnProcess import SpawnProcess |