Gentoo Archives: gentoo-commits

From: "Fabian Groffen (grobian)" <grobian@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] portage r13707 - in main/branches/prefix/pym: _emerge portage/tests/process
Date: Sat, 27 Jun 2009 13:19:39
Message-Id: E1MKXoe-0007EK-Cx@stork.gentoo.org
1 Author: grobian
2 Date: 2009-06-27 13:19:35 +0000 (Sat, 27 Jun 2009)
3 New Revision: 13707
4
5 Added:
6 main/branches/prefix/pym/_emerge/PollScheduler.py
7 main/branches/prefix/pym/_emerge/QueueScheduler.py
8 main/branches/prefix/pym/_emerge/TaskScheduler.py
9 Modified:
10 main/branches/prefix/pym/_emerge/__init__.py
11 main/branches/prefix/pym/portage/tests/process/test_poll.py
12 Log:
13 Merged from trunk -r13667:13668
14
15 | 13668 | Bug #275047 - Split _emerge/__init__.py into smaller pieces |
16 | zmedico | (part 3). Thanks to Sebastian Mingramm (few) |
17 | | <s.mingramm@×××.de> for this patch. |
18
19
20 Copied: main/branches/prefix/pym/_emerge/PollScheduler.py (from rev 13668, main/trunk/pym/_emerge/PollScheduler.py)
21 ===================================================================
22 --- main/branches/prefix/pym/_emerge/PollScheduler.py (rev 0)
23 +++ main/branches/prefix/pym/_emerge/PollScheduler.py 2009-06-27 13:19:35 UTC (rev 13707)
24 @@ -0,0 +1,251 @@
25 +import logging
26 +import select
27 +
28 +from portage.util import writemsg_level
29 +
30 +from _emerge.SlotObject import SlotObject
31 +from _emerge.getloadavg import getloadavg
32 +from _emerge.PollConstants import PollConstants
33 +from _emerge.PollSelectAdapter import PollSelectAdapter
34 +
35 +class PollScheduler(object):
36 +
37 + class _sched_iface_class(SlotObject):
38 + __slots__ = ("register", "schedule", "unregister")
39 +
40 + def __init__(self):
41 + self._max_jobs = 1
42 + self._max_load = None
43 + self._jobs = 0
44 + self._poll_event_queue = []
45 + self._poll_event_handlers = {}
46 + self._poll_event_handler_ids = {}
47 + # Increment id for each new handler.
48 + self._event_handler_id = 0
49 + self._poll_obj = create_poll_instance()
50 + self._scheduling = False
51 +
52 + def _schedule(self):
53 + """
54 + Calls _schedule_tasks() and automatically returns early from
55 + any recursive calls to this method that the _schedule_tasks()
56 + call might trigger. This makes _schedule() safe to call from
57 + inside exit listeners.
58 + """
59 + if self._scheduling:
60 + return False
61 + self._scheduling = True
62 + try:
63 + return self._schedule_tasks()
64 + finally:
65 + self._scheduling = False
66 +
67 + def _running_job_count(self):
68 + return self._jobs
69 +
70 + def _can_add_job(self):
71 + max_jobs = self._max_jobs
72 + max_load = self._max_load
73 +
74 + if self._max_jobs is not True and \
75 + self._running_job_count() >= self._max_jobs:
76 + return False
77 +
78 + if max_load is not None and \
79 + (max_jobs is True or max_jobs > 1) and \
80 + self._running_job_count() >= 1:
81 + try:
82 + avg1, avg5, avg15 = getloadavg()
83 + except OSError:
84 + return False
85 +
86 + if avg1 >= max_load:
87 + return False
88 +
89 + return True
90 +
91 + def _poll(self, timeout=None):
92 + """
93 + All poll() calls pass through here. The poll events
94 + are added directly to self._poll_event_queue.
95 + In order to avoid endless blocking, this raises
96 + StopIteration if timeout is None and there are
97 + no file descriptors to poll.
98 + """
99 + if not self._poll_event_handlers:
100 + self._schedule()
101 + if timeout is None and \
102 + not self._poll_event_handlers:
103 + raise StopIteration(
104 + "timeout is None and there are no poll() event handlers")
105 +
106 + # The following error is known to occur with Linux kernel versions
107 + # less than 2.6.24:
108 + #
109 + # select.error: (4, 'Interrupted system call')
110 + #
111 + # This error has been observed after a SIGSTOP, followed by SIGCONT.
112 + # Treat it similar to EAGAIN if timeout is None, otherwise just return
113 + # without any events.
114 + while True:
115 + try:
116 + self._poll_event_queue.extend(self._poll_obj.poll(timeout))
117 + break
118 + except select.error, e:
119 + writemsg_level("\n!!! select error: %s\n" % (e,),
120 + level=logging.ERROR, noiselevel=-1)
121 + del e
122 + if timeout is not None:
123 + break
124 +
125 + def _next_poll_event(self, timeout=None):
126 + """
127 + Since the _schedule_wait() loop is called by event
128 + handlers from _poll_loop(), maintain a central event
129 + queue for both of them to share events from a single
130 + poll() call. In order to avoid endless blocking, this
131 + raises StopIteration if timeout is None and there are
132 + no file descriptors to poll.
133 + """
134 + if not self._poll_event_queue:
135 + self._poll(timeout)
136 + return self._poll_event_queue.pop()
137 +
138 + def _poll_loop(self):
139 +
140 + event_handlers = self._poll_event_handlers
141 + event_handled = False
142 +
143 + try:
144 + while event_handlers:
145 + f, event = self._next_poll_event()
146 + handler, reg_id = event_handlers[f]
147 + handler(f, event)
148 + event_handled = True
149 + except StopIteration:
150 + event_handled = True
151 +
152 + if not event_handled:
153 + raise AssertionError("tight loop")
154 +
155 + def _schedule_yield(self):
156 + """
157 + Schedule for a short period of time chosen by the scheduler based
158 + on internal state. Synchronous tasks should call this periodically
159 + in order to allow the scheduler to service pending poll events. The
160 + scheduler will call poll() exactly once, without blocking, and any
161 + resulting poll events will be serviced.
162 + """
163 + event_handlers = self._poll_event_handlers
164 + events_handled = 0
165 +
166 + if not event_handlers:
167 + return bool(events_handled)
168 +
169 + if not self._poll_event_queue:
170 + self._poll(0)
171 +
172 + try:
173 + while event_handlers and self._poll_event_queue:
174 + f, event = self._next_poll_event()
175 + handler, reg_id = event_handlers[f]
176 + handler(f, event)
177 + events_handled += 1
178 + except StopIteration:
179 + events_handled += 1
180 +
181 + return bool(events_handled)
182 +
183 + def _register(self, f, eventmask, handler):
184 + """
185 + @rtype: Integer
186 + @return: A unique registration id, for use in schedule() or
187 + unregister() calls.
188 + """
189 + if f in self._poll_event_handlers:
190 + raise AssertionError("fd %d is already registered" % f)
191 + self._event_handler_id += 1
192 + reg_id = self._event_handler_id
193 + self._poll_event_handler_ids[reg_id] = f
194 + self._poll_event_handlers[f] = (handler, reg_id)
195 + self._poll_obj.register(f, eventmask)
196 + return reg_id
197 +
198 + def _unregister(self, reg_id):
199 + f = self._poll_event_handler_ids[reg_id]
200 + self._poll_obj.unregister(f)
201 + del self._poll_event_handlers[f]
202 + del self._poll_event_handler_ids[reg_id]
203 +
204 + def _schedule_wait(self, wait_ids):
205 + """
206 + Schedule until none of wait_ids is registered any longer
207 + for poll() events.
208 + @type wait_ids: int or set of ints
209 + @param wait_ids: registration id(s) to wait for
210 + """
211 + event_handlers = self._poll_event_handlers
212 + handler_ids = self._poll_event_handler_ids
213 + event_handled = False
214 +
215 + if isinstance(wait_ids, int):
216 + wait_ids = frozenset([wait_ids])
217 +
218 + try:
219 + while wait_ids.intersection(handler_ids):
220 + f, event = self._next_poll_event()
221 + handler, reg_id = event_handlers[f]
222 + handler(f, event)
223 + event_handled = True
224 + except StopIteration:
225 + event_handled = True
226 +
227 + return event_handled
228 +
229 +
230 +_can_poll_device = None
231 +
232 +def can_poll_device():
233 + """
234 + Test if it's possible to use poll() on a device such as a pty. This
235 + is known to fail on Darwin.
236 + @rtype: bool
237 + @returns: True if poll() on a device succeeds, False otherwise.
238 + """
239 +
240 + global _can_poll_device
241 + if _can_poll_device is not None:
242 + return _can_poll_device
243 +
244 + if not hasattr(select, "poll"):
245 + _can_poll_device = False
246 + return _can_poll_device
247 +
248 + try:
249 + dev_null = open('/dev/null', 'rb')
250 + except IOError:
251 + _can_poll_device = False
252 + return _can_poll_device
253 +
254 + p = select.poll()
255 + p.register(dev_null.fileno(), PollConstants.POLLIN)
256 +
257 + invalid_request = False
258 + for f, event in p.poll():
259 + if event & PollConstants.POLLNVAL:
260 + invalid_request = True
261 + break
262 + dev_null.close()
263 +
264 + _can_poll_device = not invalid_request
265 + return _can_poll_device
266 +
267 +def create_poll_instance():
268 + """
269 + Create an instance of select.poll, or an instance of
270 + PollSelectAdapter if there is no poll() implementation or
271 + it is broken somehow.
272 + """
273 + if can_poll_device():
274 + return select.poll()
275 + return PollSelectAdapter()
276
277 Copied: main/branches/prefix/pym/_emerge/QueueScheduler.py (from rev 13668, main/trunk/pym/_emerge/QueueScheduler.py)
278 ===================================================================
279 --- main/branches/prefix/pym/_emerge/QueueScheduler.py (rev 0)
280 +++ main/branches/prefix/pym/_emerge/QueueScheduler.py 2009-06-27 13:19:35 UTC (rev 13707)
281 @@ -0,0 +1,77 @@
282 +from _emerge.PollScheduler import PollScheduler
283 +
284 +class QueueScheduler(PollScheduler):
285 +
286 + """
287 + Add instances of SequentialTaskQueue and then call run(). The
288 + run() method returns when no tasks remain.
289 + """
290 +
291 + def __init__(self, max_jobs=None, max_load=None):
292 + PollScheduler.__init__(self)
293 +
294 + if max_jobs is None:
295 + max_jobs = 1
296 +
297 + self._max_jobs = max_jobs
298 + self._max_load = max_load
299 + self.sched_iface = self._sched_iface_class(
300 + register=self._register,
301 + schedule=self._schedule_wait,
302 + unregister=self._unregister)
303 +
304 + self._queues = []
305 + self._schedule_listeners = []
306 +
307 + def add(self, q):
308 + self._queues.append(q)
309 +
310 + def remove(self, q):
311 + self._queues.remove(q)
312 +
313 + def run(self):
314 +
315 + while self._schedule():
316 + self._poll_loop()
317 +
318 + while self._running_job_count():
319 + self._poll_loop()
320 +
321 + def _schedule_tasks(self):
322 + """
323 + @rtype: bool
324 + @returns: True if there may be remaining tasks to schedule,
325 + False otherwise.
326 + """
327 + while self._can_add_job():
328 + n = self._max_jobs - self._running_job_count()
329 + if n < 1:
330 + break
331 +
332 + if not self._start_next_job(n):
333 + return False
334 +
335 + for q in self._queues:
336 + if q:
337 + return True
338 + return False
339 +
340 + def _running_job_count(self):
341 + job_count = 0
342 + for q in self._queues:
343 + job_count += len(q.running_tasks)
344 + self._jobs = job_count
345 + return job_count
346 +
347 + def _start_next_job(self, n=1):
348 + started_count = 0
349 + for q in self._queues:
350 + initial_job_count = len(q.running_tasks)
351 + q.schedule()
352 + final_job_count = len(q.running_tasks)
353 + if final_job_count > initial_job_count:
354 + started_count += (final_job_count - initial_job_count)
355 + if started_count >= n:
356 + break
357 + return started_count
358 +
359
360 Copied: main/branches/prefix/pym/_emerge/TaskScheduler.py (from rev 13668, main/trunk/pym/_emerge/TaskScheduler.py)
361 ===================================================================
362 --- main/branches/prefix/pym/_emerge/TaskScheduler.py (rev 0)
363 +++ main/branches/prefix/pym/_emerge/TaskScheduler.py 2009-06-27 13:19:35 UTC (rev 13707)
364 @@ -0,0 +1,21 @@
365 +from _emerge.QueueScheduler import QueueScheduler
366 +from _emerge.SequentialTaskQueue import SequentialTaskQueue
367 +
368 +class TaskScheduler(object):
369 +
370 + """
371 + A simple way to handle scheduling of AsynchronousTask instances. Simply
372 + add tasks and call run(). The run() method returns when no tasks remain.
373 + """
374 +
375 + def __init__(self, max_jobs=None, max_load=None):
376 + self._queue = SequentialTaskQueue(max_jobs=max_jobs)
377 + self._scheduler = QueueScheduler(
378 + max_jobs=max_jobs, max_load=max_load)
379 + self.sched_iface = self._scheduler.sched_iface
380 + self.run = self._scheduler.run
381 + self._scheduler.add(self._queue)
382 +
383 + def add(self, task):
384 + self._queue.add(task)
385 +
386
387 Modified: main/branches/prefix/pym/_emerge/__init__.py
388 ===================================================================
389 --- main/branches/prefix/pym/_emerge/__init__.py 2009-06-27 13:12:24 UTC (rev 13706)
390 +++ main/branches/prefix/pym/_emerge/__init__.py 2009-06-27 13:19:35 UTC (rev 13707)
391 @@ -3,10 +3,8 @@
392 # Distributed under the terms of the GNU General Public License v2
393 # $Id$
394
395 -import formatter
396 import logging
397 import pwd
398 -import select
399 import shlex
400 import signal
401 import sys
402 @@ -74,7 +72,6 @@
403 from _emerge.DepPrioritySatisfiedRange import DepPrioritySatisfiedRange
404 from _emerge.Task import Task
405 from _emerge.Blocker import Blocker
406 -from _emerge.PollConstants import PollConstants
407 from _emerge.AsynchronousTask import AsynchronousTask
408 from _emerge.CompositeTask import CompositeTask
409 from _emerge.EbuildFetcher import EbuildFetcher
410 @@ -93,13 +90,12 @@
411 from _emerge.PackageVirtualDbapi import PackageVirtualDbapi
412 from _emerge.RepoDisplay import RepoDisplay
413 from _emerge.UseFlagDisplay import UseFlagDisplay
414 -from _emerge.PollSelectAdapter import PollSelectAdapter
415 from _emerge.SequentialTaskQueue import SequentialTaskQueue
416 from _emerge.ProgressHandler import ProgressHandler
417 from _emerge.stdout_spinner import stdout_spinner
418 from _emerge.UninstallFailure import UninstallFailure
419 from _emerge.JobStatusDisplay import JobStatusDisplay
420 -from _emerge.getloadavg import getloadavg
421 +from _emerge.PollScheduler import PollScheduler
422
423 def userquery(prompt, responses=None, colours=None):
424 """Displays a prompt and a set of responses, then waits for a response
425 @@ -6476,341 +6472,6 @@
426 (self.blocks - self.blocks_satisfied))
427 return "".join(myoutput)
428
429 -
430 -_can_poll_device = None
431 -
432 -def can_poll_device():
433 - """
434 - Test if it's possible to use poll() on a device such as a pty. This
435 - is known to fail on Darwin.
436 - @rtype: bool
437 - @returns: True if poll() on a device succeeds, False otherwise.
438 - """
439 -
440 - global _can_poll_device
441 - if _can_poll_device is not None:
442 - return _can_poll_device
443 -
444 - if not hasattr(select, "poll"):
445 - _can_poll_device = False
446 - return _can_poll_device
447 -
448 - try:
449 - dev_null = open('/dev/null', 'rb')
450 - except IOError:
451 - _can_poll_device = False
452 - return _can_poll_device
453 -
454 - p = select.poll()
455 - p.register(dev_null.fileno(), PollConstants.POLLIN)
456 -
457 - invalid_request = False
458 - for f, event in p.poll():
459 - if event & PollConstants.POLLNVAL:
460 - invalid_request = True
461 - break
462 - dev_null.close()
463 -
464 - _can_poll_device = not invalid_request
465 - return _can_poll_device
466 -
467 -def create_poll_instance():
468 - """
469 - Create an instance of select.poll, or an instance of
470 - PollSelectAdapter there is no poll() implementation or
471 - it is broken somehow.
472 - """
473 - if can_poll_device():
474 - return select.poll()
475 - return PollSelectAdapter()
476 -
477 -class PollScheduler(object):
478 -
479 - class _sched_iface_class(SlotObject):
480 - __slots__ = ("register", "schedule", "unregister")
481 -
482 - def __init__(self):
483 - self._max_jobs = 1
484 - self._max_load = None
485 - self._jobs = 0
486 - self._poll_event_queue = []
487 - self._poll_event_handlers = {}
488 - self._poll_event_handler_ids = {}
489 - # Increment id for each new handler.
490 - self._event_handler_id = 0
491 - self._poll_obj = create_poll_instance()
492 - self._scheduling = False
493 -
494 - def _schedule(self):
495 - """
496 - Calls _schedule_tasks() and automatically returns early from
497 - any recursive calls to this method that the _schedule_tasks()
498 - call might trigger. This makes _schedule() safe to call from
499 - inside exit listeners.
500 - """
501 - if self._scheduling:
502 - return False
503 - self._scheduling = True
504 - try:
505 - return self._schedule_tasks()
506 - finally:
507 - self._scheduling = False
508 -
509 - def _running_job_count(self):
510 - return self._jobs
511 -
512 - def _can_add_job(self):
513 - max_jobs = self._max_jobs
514 - max_load = self._max_load
515 -
516 - if self._max_jobs is not True and \
517 - self._running_job_count() >= self._max_jobs:
518 - return False
519 -
520 - if max_load is not None and \
521 - (max_jobs is True or max_jobs > 1) and \
522 - self._running_job_count() >= 1:
523 - try:
524 - avg1, avg5, avg15 = getloadavg()
525 - except OSError:
526 - return False
527 -
528 - if avg1 >= max_load:
529 - return False
530 -
531 - return True
532 -
533 - def _poll(self, timeout=None):
534 - """
535 - All poll() calls pass through here. The poll events
536 - are added directly to self._poll_event_queue.
537 - In order to avoid endless blocking, this raises
538 - StopIteration if timeout is None and there are
539 - no file descriptors to poll.
540 - """
541 - if not self._poll_event_handlers:
542 - self._schedule()
543 - if timeout is None and \
544 - not self._poll_event_handlers:
545 - raise StopIteration(
546 - "timeout is None and there are no poll() event handlers")
547 -
548 - # The following error is known to occur with Linux kernel versions
549 - # less than 2.6.24:
550 - #
551 - # select.error: (4, 'Interrupted system call')
552 - #
553 - # This error has been observed after a SIGSTOP, followed by SIGCONT.
554 - # Treat it similar to EAGAIN if timeout is None, otherwise just return
555 - # without any events.
556 - while True:
557 - try:
558 - self._poll_event_queue.extend(self._poll_obj.poll(timeout))
559 - break
560 - except select.error, e:
561 - writemsg_level("\n!!! select error: %s\n" % (e,),
562 - level=logging.ERROR, noiselevel=-1)
563 - del e
564 - if timeout is not None:
565 - break
566 -
567 - def _next_poll_event(self, timeout=None):
568 - """
569 - Since the _schedule_wait() loop is called by event
570 - handlers from _poll_loop(), maintain a central event
571 - queue for both of them to share events from a single
572 - poll() call. In order to avoid endless blocking, this
573 - raises StopIteration if timeout is None and there are
574 - no file descriptors to poll.
575 - """
576 - if not self._poll_event_queue:
577 - self._poll(timeout)
578 - return self._poll_event_queue.pop()
579 -
580 - def _poll_loop(self):
581 -
582 - event_handlers = self._poll_event_handlers
583 - event_handled = False
584 -
585 - try:
586 - while event_handlers:
587 - f, event = self._next_poll_event()
588 - handler, reg_id = event_handlers[f]
589 - handler(f, event)
590 - event_handled = True
591 - except StopIteration:
592 - event_handled = True
593 -
594 - if not event_handled:
595 - raise AssertionError("tight loop")
596 -
597 - def _schedule_yield(self):
598 - """
599 - Schedule for a short period of time chosen by the scheduler based
600 - on internal state. Synchronous tasks should call this periodically
601 - in order to allow the scheduler to service pending poll events. The
602 - scheduler will call poll() exactly once, without blocking, and any
603 - resulting poll events will be serviced.
604 - """
605 - event_handlers = self._poll_event_handlers
606 - events_handled = 0
607 -
608 - if not event_handlers:
609 - return bool(events_handled)
610 -
611 - if not self._poll_event_queue:
612 - self._poll(0)
613 -
614 - try:
615 - while event_handlers and self._poll_event_queue:
616 - f, event = self._next_poll_event()
617 - handler, reg_id = event_handlers[f]
618 - handler(f, event)
619 - events_handled += 1
620 - except StopIteration:
621 - events_handled += 1
622 -
623 - return bool(events_handled)
624 -
625 - def _register(self, f, eventmask, handler):
626 - """
627 - @rtype: Integer
628 - @return: A unique registration id, for use in schedule() or
629 - unregister() calls.
630 - """
631 - if f in self._poll_event_handlers:
632 - raise AssertionError("fd %d is already registered" % f)
633 - self._event_handler_id += 1
634 - reg_id = self._event_handler_id
635 - self._poll_event_handler_ids[reg_id] = f
636 - self._poll_event_handlers[f] = (handler, reg_id)
637 - self._poll_obj.register(f, eventmask)
638 - return reg_id
639 -
640 - def _unregister(self, reg_id):
641 - f = self._poll_event_handler_ids[reg_id]
642 - self._poll_obj.unregister(f)
643 - del self._poll_event_handlers[f]
644 - del self._poll_event_handler_ids[reg_id]
645 -
646 - def _schedule_wait(self, wait_ids):
647 - """
648 - Schedule until wait_id is not longer registered
649 - for poll() events.
650 - @type wait_id: int
651 - @param wait_id: a task id to wait for
652 - """
653 - event_handlers = self._poll_event_handlers
654 - handler_ids = self._poll_event_handler_ids
655 - event_handled = False
656 -
657 - if isinstance(wait_ids, int):
658 - wait_ids = frozenset([wait_ids])
659 -
660 - try:
661 - while wait_ids.intersection(handler_ids):
662 - f, event = self._next_poll_event()
663 - handler, reg_id = event_handlers[f]
664 - handler(f, event)
665 - event_handled = True
666 - except StopIteration:
667 - event_handled = True
668 -
669 - return event_handled
670 -
671 -class QueueScheduler(PollScheduler):
672 -
673 - """
674 - Add instances of SequentialTaskQueue and then call run(). The
675 - run() method returns when no tasks remain.
676 - """
677 -
678 - def __init__(self, max_jobs=None, max_load=None):
679 - PollScheduler.__init__(self)
680 -
681 - if max_jobs is None:
682 - max_jobs = 1
683 -
684 - self._max_jobs = max_jobs
685 - self._max_load = max_load
686 - self.sched_iface = self._sched_iface_class(
687 - register=self._register,
688 - schedule=self._schedule_wait,
689 - unregister=self._unregister)
690 -
691 - self._queues = []
692 - self._schedule_listeners = []
693 -
694 - def add(self, q):
695 - self._queues.append(q)
696 -
697 - def remove(self, q):
698 - self._queues.remove(q)
699 -
700 - def run(self):
701 -
702 - while self._schedule():
703 - self._poll_loop()
704 -
705 - while self._running_job_count():
706 - self._poll_loop()
707 -
708 - def _schedule_tasks(self):
709 - """
710 - @rtype: bool
711 - @returns: True if there may be remaining tasks to schedule,
712 - False otherwise.
713 - """
714 - while self._can_add_job():
715 - n = self._max_jobs - self._running_job_count()
716 - if n < 1:
717 - break
718 -
719 - if not self._start_next_job(n):
720 - return False
721 -
722 - for q in self._queues:
723 - if q:
724 - return True
725 - return False
726 -
727 - def _running_job_count(self):
728 - job_count = 0
729 - for q in self._queues:
730 - job_count += len(q.running_tasks)
731 - self._jobs = job_count
732 - return job_count
733 -
734 - def _start_next_job(self, n=1):
735 - started_count = 0
736 - for q in self._queues:
737 - initial_job_count = len(q.running_tasks)
738 - q.schedule()
739 - final_job_count = len(q.running_tasks)
740 - if final_job_count > initial_job_count:
741 - started_count += (final_job_count - initial_job_count)
742 - if started_count >= n:
743 - break
744 - return started_count
745 -
746 -class TaskScheduler(object):
747 -
748 - """
749 - A simple way to handle scheduling of AsynchrousTask instances. Simply
750 - add tasks and call run(). The run() method returns when no tasks remain.
751 - """
752 -
753 - def __init__(self, max_jobs=None, max_load=None):
754 - self._queue = SequentialTaskQueue(max_jobs=max_jobs)
755 - self._scheduler = QueueScheduler(
756 - max_jobs=max_jobs, max_load=max_load)
757 - self.sched_iface = self._scheduler.sched_iface
758 - self.run = self._scheduler.run
759 - self._scheduler.add(self._queue)
760 -
761 - def add(self, task):
762 - self._queue.add(task)
763 -
764 class Scheduler(PollScheduler):
765
766 _opts_ignore_blockers = \
767
768 Modified: main/branches/prefix/pym/portage/tests/process/test_poll.py
769 ===================================================================
770 --- main/branches/prefix/pym/portage/tests/process/test_poll.py 2009-06-27 13:12:24 UTC (rev 13706)
771 +++ main/branches/prefix/pym/portage/tests/process/test_poll.py 2009-06-27 13:19:35 UTC (rev 13707)
772 @@ -8,7 +8,7 @@
773 import portage
774 from portage.output import get_term_size, set_term_size
775 from portage.tests import TestCase
776 -from _emerge import TaskScheduler
777 +from _emerge.TaskScheduler import TaskScheduler
778 from _emerge.PipeReader import PipeReader
779 from _emerge.SpawnProcess import SpawnProcess