Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/ebuild/, ...
Date: Thu, 20 Feb 2020 09:42:16
Message-Id: 1582190152.8f47d3fe1190d4476ae9eebfafcebdfb1794fc05.zmedico@gentoo
1 commit: 8f47d3fe1190d4476ae9eebfafcebdfb1794fc05
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Tue Feb 18 07:43:12 2020 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Thu Feb 20 09:15:52 2020 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=8f47d3fe
7
8 AsyncScheduler: use async_start method
9
10 Convert AsyncScheduler to use the async_start method, since
11 eventually this method will need to be a coroutine in order to write
12 messages to the build log as discussed in bug 709746.
13
14 Also fix async_iter_completed to be compatible with callback
15 scheduling differences introduced by migration to the async_start
16 method.
17
18 Bug: https://bugs.gentoo.org/709746
19 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
20
21 lib/portage/tests/ebuild/test_doebuild_fd_pipes.py | 8 ++---
22 .../tests/util/futures/test_iter_completed.py | 2 ++
23 lib/portage/util/_async/AsyncScheduler.py | 20 ++++++++++--
24 lib/portage/util/futures/iter_completed.py | 38 +++++++++++++++++-----
25 4 files changed, 53 insertions(+), 15 deletions(-)
26
27 diff --git a/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py b/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
28 index 05ea24c4b..50fc5fe1c 100644
29 --- a/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
30 +++ b/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
31 @@ -109,18 +109,16 @@ class DoebuildFdPipesTestCase(TestCase):
32 output_fd: pw,
33 },
34 "prev_mtimes": {}})
35 + producer.addStartListener(lambda producer: os.close(pw))
36
37 + # PipeReader closes pr
38 consumer = PipeReader(
39 input_files={"producer" : pr})
40
41 task_scheduler = TaskScheduler(iter([producer, consumer]),
42 max_jobs=2)
43
44 - try:
45 - loop.run_until_complete(task_scheduler.async_start())
46 - finally:
47 - # PipeReader closes pr
48 - os.close(pw)
49 + loop.run_until_complete(task_scheduler.async_start())
50
51 task_scheduler.wait()
52 output = portage._unicode_decode(
53
54 diff --git a/lib/portage/tests/util/futures/test_iter_completed.py b/lib/portage/tests/util/futures/test_iter_completed.py
55 index aa24f5685..03ace915a 100644
56 --- a/lib/portage/tests/util/futures/test_iter_completed.py
57 +++ b/lib/portage/tests/util/futures/test_iter_completed.py
58 @@ -76,6 +76,8 @@ class IterCompletedTestCase(TestCase):
59
60 for future_done_set in async_iter_completed(future_generator(),
61 max_jobs=True, max_load=True, loop=loop):
62 + while not input_futures:
63 + loop.run_until_complete(asyncio.sleep(0, loop=loop))
64 future_done_set.cancel()
65 break
66
67
68 diff --git a/lib/portage/util/_async/AsyncScheduler.py b/lib/portage/util/_async/AsyncScheduler.py
69 index c6b523eaa..b9070061a 100644
70 --- a/lib/portage/util/_async/AsyncScheduler.py
71 +++ b/lib/portage/util/_async/AsyncScheduler.py
72 @@ -1,7 +1,11 @@
73 # Copyright 2012-2018 Gentoo Foundation
74 # Distributed under the terms of the GNU General Public License v2
75
76 +import functools
77 +
78 from portage import os
79 +from portage.util.futures import asyncio
80 +from portage.util.futures.compat_coroutine import coroutine
81 from _emerge.AsynchronousTask import AsynchronousTask
82 from _emerge.PollScheduler import PollScheduler
83
84 @@ -62,8 +66,8 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
85 else:
86 self._running_tasks.add(task)
87 task.scheduler = self._sched_iface
88 - task.addExitListener(self._task_exit)
89 - task.start()
90 + future = asyncio.ensure_future(self._task_coroutine(task), loop=self._sched_iface)
91 + future.add_done_callback(functools.partial(self._task_coroutine_done, task))
92
93 if self._loadavg_check_id is not None:
94 self._loadavg_check_id.cancel()
95 @@ -73,6 +77,18 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
96 # Triggers cleanup and exit listeners if there's nothing left to do.
97 self.poll()
98
99 + @coroutine
100 + def _task_coroutine(self, task):
101 + yield task.async_start()
102 + yield task.async_wait()
103 +
104 + def _task_coroutine_done(self, task, future):
105 + try:
106 + future.result()
107 + except asyncio.CancelledError:
108 + self.cancel()
109 + self._task_exit(task)
110 +
111 def _task_exit(self, task):
112 self._running_tasks.discard(task)
113 if task.returncode != os.EX_OK:
114
115 diff --git a/lib/portage/util/futures/iter_completed.py b/lib/portage/util/futures/iter_completed.py
116 index 9554b4338..1fb30eb70 100644
117 --- a/lib/portage/util/futures/iter_completed.py
118 +++ b/lib/portage/util/futures/iter_completed.py
119 @@ -6,6 +6,7 @@ import functools
120 from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
121 from portage.util._async.TaskScheduler import TaskScheduler
122 from portage.util.futures import asyncio
123 +from portage.util.futures.compat_coroutine import coroutine, coroutine_return
124 from portage.util.cpuinfo import get_cpu_count
125
126
127 @@ -90,21 +91,42 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
128 if future_done_set.cancelled() and not wait_result.done():
129 wait_result.cancel()
130
131 + @coroutine
132 + def fetch_wait_result(scheduler, first, loop=None):
133 + if first:
134 + yield scheduler.async_start()
135 +
136 + # If the current coroutine awakens just after a call to
137 + # done_callback but before scheduler has been notified of
138 + # corresponding done future(s), then wait here until scheduler
139 + # is notified (which will cause future_map to populate).
140 + while not future_map and scheduler.poll() is None:
141 + yield asyncio.sleep(0, loop=loop)
142 +
143 + if not future_map:
144 + if scheduler.poll() is not None:
145 + coroutine_return((set(), set()))
146 + else:
147 + raise AssertionError('expected non-empty future_map')
148 +
149 + wait_result = yield asyncio.wait(list(future_map.values()),
150 + return_when=asyncio.FIRST_COMPLETED, loop=loop)
151 +
152 + coroutine_return(wait_result)
153 +
154 + first = True
155 try:
156 - scheduler.start()
157 -
158 - # scheduler should ensure that future_map is non-empty until
159 - # task_generator is exhausted
160 - while future_map:
161 - wait_result = asyncio.ensure_future(
162 - asyncio.wait(list(future_map.values()),
163 - return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop)
164 + while True:
165 + wait_result = asyncio.ensure_future(fetch_wait_result(scheduler, first, loop=loop), loop=loop)
166 + first = False
167 future_done_set = loop.create_future()
168 future_done_set.add_done_callback(
169 functools.partial(cancel_callback, wait_result))
170 wait_result.add_done_callback(
171 functools.partial(done_callback, future_done_set))
172 yield future_done_set
173 + if not future_map and scheduler.poll() is not None:
174 + break
175 finally:
176 # cleanup in case of interruption by SIGINT, etc
177 scheduler.cancel()