1 |
commit: dc7919541712d846574e6b7d672a3bed0ca7ef1a |
2 |
Author: Zac Medico <zmedico <AT> gentoo <DOT> org> |
3 |
AuthorDate: Tue Aug 18 06:31:54 2020 +0000 |
4 |
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> |
5 |
CommitDate: Wed Aug 19 04:01:46 2020 +0000 |
6 |
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=dc791954 |
7 |
|
8 |
coroutine: use explicit loop parameter (bug 737698) |
9 |
|
10 |
In order to support local event loops within API functions |
11 |
like doebuild, use an explicit loop parameter when calling a |
12 |
coroutine. Internal code will now raise an AssertionError if |
13 |
the loop parameter is omitted for a coroutine, but API |
14 |
consumers may omit it. |
15 |
|
16 |
Bug: https://bugs.gentoo.org/737698 |
17 |
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> |
18 |
|
19 |
lib/_emerge/Binpkg.py | 8 ++-- |
20 |
lib/_emerge/EbuildPhase.py | 16 ++++---- |
21 |
lib/_emerge/Scheduler.py | 4 +- |
22 |
lib/_emerge/SequentialTaskQueue.py | 4 +- |
23 |
lib/_emerge/SpawnProcess.py | 5 ++- |
24 |
lib/portage/dbapi/bintree.py | 12 +++--- |
25 |
lib/portage/dbapi/vartree.py | 8 ++-- |
26 |
.../repository/storage/hardlink_quarantine.py | 26 ++++++------- |
27 |
lib/portage/repository/storage/hardlink_rcu.py | 34 ++++++++-------- |
28 |
lib/portage/repository/storage/inplace.py | 10 ++--- |
29 |
lib/portage/repository/storage/interface.py | 10 ++--- |
30 |
lib/portage/sync/syncbase.py | 2 +- |
31 |
lib/portage/tests/dbapi/test_auxdb.py | 9 +++-- |
32 |
lib/portage/tests/emerge/test_simple.py | 6 +-- |
33 |
lib/portage/tests/process/test_AsyncFunction.py | 4 +- |
34 |
lib/portage/tests/process/test_PipeLogger.py | 2 +- |
35 |
.../util/futures/asyncio/test_child_watcher.py | 4 +- |
36 |
.../tests/util/futures/test_compat_coroutine.py | 45 ++++++++++++---------- |
37 |
lib/portage/tests/util/test_socks5.py | 2 +- |
38 |
lib/portage/util/_async/BuildLogger.py | 4 +- |
39 |
lib/portage/util/_async/ForkProcess.py | 6 +-- |
40 |
lib/portage/util/_async/PipeLogger.py | 4 +- |
41 |
lib/portage/util/_async/SchedulerInterface.py | 4 +- |
42 |
lib/portage/util/futures/_asyncio/process.py | 16 ++++---- |
43 |
lib/portage/util/futures/_sync_decorator.py | 3 +- |
44 |
lib/portage/util/futures/compat_coroutine.py | 7 +++- |
45 |
lib/portage/util/socks5.py | 4 +- |
46 |
repoman/lib/repoman/modules/scan/depend/profile.py | 4 +- |
47 |
28 files changed, 138 insertions(+), 125 deletions(-) |
48 |
|
49 |
diff --git a/lib/_emerge/Binpkg.py b/lib/_emerge/Binpkg.py |
50 |
index b5a69f8e7..9d2909d42 100644 |
51 |
--- a/lib/_emerge/Binpkg.py |
52 |
+++ b/lib/_emerge/Binpkg.py |
53 |
@@ -250,11 +250,11 @@ class Binpkg(CompositeTask): |
54 |
return |
55 |
|
56 |
self._start_task( |
57 |
- AsyncTaskFuture(future=self._unpack_metadata()), |
58 |
+ AsyncTaskFuture(future=self._unpack_metadata(loop=self.scheduler)), |
59 |
self._unpack_metadata_exit) |
60 |
|
61 |
@coroutine |
62 |
- def _unpack_metadata(self): |
63 |
+ def _unpack_metadata(self, loop=None): |
64 |
|
65 |
dir_path = self.settings['PORTAGE_BUILDDIR'] |
66 |
|
67 |
@@ -271,7 +271,7 @@ class Binpkg(CompositeTask): |
68 |
portage.prepare_build_dirs(self.settings["ROOT"], self.settings, 1) |
69 |
self._writemsg_level(">>> Extracting info\n") |
70 |
|
71 |
- yield self._bintree.dbapi.unpack_metadata(self.settings, infloc) |
72 |
+ yield self._bintree.dbapi.unpack_metadata(self.settings, infloc, loop=self.scheduler) |
73 |
check_missing_metadata = ("CATEGORY", "PF") |
74 |
for k, v in zip(check_missing_metadata, |
75 |
self._bintree.dbapi.aux_get(self.pkg.cpv, check_missing_metadata)): |
76 |
@@ -333,7 +333,7 @@ class Binpkg(CompositeTask): |
77 |
self._start_task( |
78 |
AsyncTaskFuture(future=self._bintree.dbapi.unpack_contents( |
79 |
self.settings, |
80 |
- self._image_dir)), |
81 |
+ self._image_dir, loop=self.scheduler)), |
82 |
self._unpack_contents_exit) |
83 |
|
84 |
def _unpack_contents_exit(self, unpack_contents): |
85 |
|
86 |
diff --git a/lib/_emerge/EbuildPhase.py b/lib/_emerge/EbuildPhase.py |
87 |
index 4bc2749bd..e4c0428a6 100644 |
88 |
--- a/lib/_emerge/EbuildPhase.py |
89 |
+++ b/lib/_emerge/EbuildPhase.py |
90 |
@@ -70,11 +70,11 @@ class EbuildPhase(CompositeTask): |
91 |
_locked_phases = ("setup", "preinst", "postinst", "prerm", "postrm") |
92 |
|
93 |
def _start(self): |
94 |
- future = asyncio.ensure_future(self._async_start(), loop=self.scheduler) |
95 |
+ future = asyncio.ensure_future(self._async_start(loop=self.scheduler), loop=self.scheduler) |
96 |
self._start_task(AsyncTaskFuture(future=future), self._async_start_exit) |
97 |
|
98 |
@coroutine |
99 |
- def _async_start(self): |
100 |
+ def _async_start(self, loop=None): |
101 |
|
102 |
need_builddir = self.phase not in EbuildProcess._phases_without_builddir |
103 |
|
104 |
@@ -132,7 +132,7 @@ class EbuildPhase(CompositeTask): |
105 |
# Force background=True for this header since it's intended |
106 |
# for the log and it doesn't necessarily need to be visible |
107 |
# elsewhere. |
108 |
- yield self._elog('einfo', msg, background=True) |
109 |
+ yield self._elog('einfo', msg, background=True, loop=self.scheduler) |
110 |
|
111 |
if self.phase == 'package': |
112 |
if 'PORTAGE_BINPKG_TMPFILE' not in self.settings: |
113 |
@@ -403,7 +403,7 @@ class EbuildPhase(CompositeTask): |
114 |
self.wait() |
115 |
|
116 |
@coroutine |
117 |
- def _elog(self, elog_funcname, lines, background=None): |
118 |
+ def _elog(self, elog_funcname, lines, background=None, loop=None): |
119 |
if background is None: |
120 |
background = self.background |
121 |
out = io.StringIO() |
122 |
@@ -435,7 +435,7 @@ class EbuildPhase(CompositeTask): |
123 |
log_file = build_logger.stdin |
124 |
|
125 |
yield self.scheduler.async_output(msg, log_file=log_file, |
126 |
- background=background) |
127 |
+ background=background, loop=self.scheduler) |
128 |
|
129 |
if build_logger is not None: |
130 |
build_logger.stdin.close() |
131 |
@@ -487,7 +487,7 @@ class _PostPhaseCommands(CompositeTask): |
132 |
|
133 |
if 'qa-unresolved-soname-deps' in self.settings.features: |
134 |
# This operates on REQUIRES metadata generated by the above function call. |
135 |
- future = self._soname_deps_qa() |
136 |
+ future = asyncio.ensure_future(self._soname_deps_qa(loop=self.scheduler), loop=self.scheduler) |
137 |
# If an unexpected exception occurs, then this will raise it. |
138 |
future.add_done_callback(lambda future: future.cancelled() or future.result()) |
139 |
self._start_task(AsyncTaskFuture(future=future), self._default_final_exit) |
140 |
@@ -497,7 +497,7 @@ class _PostPhaseCommands(CompositeTask): |
141 |
self._default_final_exit(task) |
142 |
|
143 |
@coroutine |
144 |
- def _soname_deps_qa(self): |
145 |
+ def _soname_deps_qa(self, loop=None): |
146 |
|
147 |
vardb = QueryCommand.get_db()[self.settings['EROOT']]['vartree'].dbapi |
148 |
|
149 |
@@ -512,4 +512,4 @@ class _PostPhaseCommands(CompositeTask): |
150 |
qa_msg.extend("\t%s: %s" % (filename, " ".join(sorted(soname_deps))) |
151 |
for filename, soname_deps in unresolved) |
152 |
qa_msg.append("") |
153 |
- yield self.elog("eqawarn", qa_msg) |
154 |
+ yield self.elog("eqawarn", qa_msg, loop=self.scheduler) |
155 |
|
156 |
diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py |
157 |
index 2427d953c..a69421288 100644 |
158 |
--- a/lib/_emerge/Scheduler.py |
159 |
+++ b/lib/_emerge/Scheduler.py |
160 |
@@ -871,7 +871,7 @@ class Scheduler(PollScheduler): |
161 |
infloc = os.path.join(build_dir_path, "build-info") |
162 |
ensure_dirs(infloc) |
163 |
self._sched_iface.run_until_complete( |
164 |
- bintree.dbapi.unpack_metadata(settings, infloc)) |
165 |
+ bintree.dbapi.unpack_metadata(settings, infloc, loop=self._sched_iface)) |
166 |
ebuild_path = os.path.join(infloc, x.pf + ".ebuild") |
167 |
settings.configdict["pkg"]["EMERGE_FROM"] = "binary" |
168 |
settings.configdict["pkg"]["MERGE_TYPE"] = "binary" |
169 |
@@ -1621,7 +1621,7 @@ class Scheduler(PollScheduler): |
170 |
if (self._task_queues.merge and (self._schedule_merge_wakeup_task is None |
171 |
or self._schedule_merge_wakeup_task.done())): |
172 |
self._schedule_merge_wakeup_task = asyncio.ensure_future( |
173 |
- self._task_queues.merge.wait(), loop=self._event_loop) |
174 |
+ self._task_queues.merge.wait(loop=self._event_loop), loop=self._event_loop) |
175 |
self._schedule_merge_wakeup_task.add_done_callback( |
176 |
self._schedule_merge_wakeup) |
177 |
|
178 |
|
179 |
diff --git a/lib/_emerge/SequentialTaskQueue.py b/lib/_emerge/SequentialTaskQueue.py |
180 |
index 40590b76c..02fe19912 100644 |
181 |
--- a/lib/_emerge/SequentialTaskQueue.py |
182 |
+++ b/lib/_emerge/SequentialTaskQueue.py |
183 |
@@ -69,7 +69,7 @@ class SequentialTaskQueue(SlotObject): |
184 |
task.cancel() |
185 |
|
186 |
@coroutine |
187 |
- def wait(self): |
188 |
+ def wait(self, loop=None): |
189 |
""" |
190 |
Wait for the queue to become empty. This method is a coroutine. |
191 |
""" |
192 |
@@ -77,7 +77,7 @@ class SequentialTaskQueue(SlotObject): |
193 |
task = next(iter(self.running_tasks), None) |
194 |
if task is None: |
195 |
# Wait for self.running_tasks to populate. |
196 |
- yield asyncio.sleep(0) |
197 |
+ yield asyncio.sleep(0, loop=loop) |
198 |
else: |
199 |
yield task.async_wait() |
200 |
|
201 |
|
202 |
diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py |
203 |
index cea16df27..c43d60d3f 100644 |
204 |
--- a/lib/_emerge/SpawnProcess.py |
205 |
+++ b/lib/_emerge/SpawnProcess.py |
206 |
@@ -140,11 +140,12 @@ class SpawnProcess(SubProcess): |
207 |
|
208 |
self._registered = True |
209 |
self._main_task_cancel = functools.partial(self._main_cancel, build_logger, pipe_logger) |
210 |
- self._main_task = asyncio.ensure_future(self._main(build_logger, pipe_logger), loop=self.scheduler) |
211 |
+ self._main_task = asyncio.ensure_future( |
212 |
+ self._main(build_logger, pipe_logger, loop=self.scheduler), loop=self.scheduler) |
213 |
self._main_task.add_done_callback(self._main_exit) |
214 |
|
215 |
@coroutine |
216 |
- def _main(self, build_logger, pipe_logger): |
217 |
+ def _main(self, build_logger, pipe_logger, loop=None): |
218 |
try: |
219 |
if pipe_logger.poll() is None: |
220 |
yield pipe_logger.async_wait() |
221 |
|
222 |
diff --git a/lib/portage/dbapi/bintree.py b/lib/portage/dbapi/bintree.py |
223 |
index 59c265688..620865a79 100644 |
224 |
--- a/lib/portage/dbapi/bintree.py |
225 |
+++ b/lib/portage/dbapi/bintree.py |
226 |
@@ -217,7 +217,7 @@ class bindbapi(fakedbapi): |
227 |
|
228 |
|
229 |
@coroutine |
230 |
- def unpack_metadata(self, pkg, dest_dir): |
231 |
+ def unpack_metadata(self, pkg, dest_dir, loop=None): |
232 |
""" |
233 |
Unpack package metadata to a directory. This method is a coroutine. |
234 |
|
235 |
@@ -226,7 +226,7 @@ class bindbapi(fakedbapi): |
236 |
@param dest_dir: destination directory |
237 |
@type dest_dir: str |
238 |
""" |
239 |
- loop = asyncio._wrap_loop() |
240 |
+ loop = asyncio._wrap_loop(loop) |
241 |
if isinstance(pkg, _pkg_str): |
242 |
cpv = pkg |
243 |
else: |
244 |
@@ -234,14 +234,14 @@ class bindbapi(fakedbapi): |
245 |
key = self._instance_key(cpv) |
246 |
add_pkg = self.bintree._additional_pkgs.get(key) |
247 |
if add_pkg is not None: |
248 |
- yield add_pkg._db.unpack_metadata(pkg, dest_dir) |
249 |
+ yield add_pkg._db.unpack_metadata(pkg, dest_dir, loop=loop) |
250 |
else: |
251 |
tbz2_file = self.bintree.getname(cpv) |
252 |
yield loop.run_in_executor(ForkExecutor(loop=loop), |
253 |
portage.xpak.tbz2(tbz2_file).unpackinfo, dest_dir) |
254 |
|
255 |
@coroutine |
256 |
- def unpack_contents(self, pkg, dest_dir): |
257 |
+ def unpack_contents(self, pkg, dest_dir, loop=None): |
258 |
""" |
259 |
Unpack package contents to a directory. This method is a coroutine. |
260 |
|
261 |
@@ -250,7 +250,7 @@ class bindbapi(fakedbapi): |
262 |
@param dest_dir: destination directory |
263 |
@type dest_dir: str |
264 |
""" |
265 |
- loop = asyncio._wrap_loop() |
266 |
+ loop = asyncio._wrap_loop(loop) |
267 |
if isinstance(pkg, _pkg_str): |
268 |
settings = self.settings |
269 |
cpv = pkg |
270 |
@@ -280,7 +280,7 @@ class bindbapi(fakedbapi): |
271 |
add_pkg = self.bintree._additional_pkgs.get(instance_key) |
272 |
if add_pkg is None: |
273 |
raise portage.exception.PackageNotFound(cpv) |
274 |
- yield add_pkg._db.unpack_contents(pkg, dest_dir) |
275 |
+ yield add_pkg._db.unpack_contents(pkg, dest_dir, loop=loop) |
276 |
|
277 |
def cp_list(self, *pargs, **kwargs): |
278 |
if not self.bintree.populated: |
279 |
|
280 |
diff --git a/lib/portage/dbapi/vartree.py b/lib/portage/dbapi/vartree.py |
281 |
index 3eee025ad..1547d2f6d 100644 |
282 |
--- a/lib/portage/dbapi/vartree.py |
283 |
+++ b/lib/portage/dbapi/vartree.py |
284 |
@@ -931,7 +931,7 @@ class vardbapi(dbapi): |
285 |
self._bump_mtime(cpv) |
286 |
|
287 |
@coroutine |
288 |
- def unpack_metadata(self, pkg, dest_dir): |
289 |
+ def unpack_metadata(self, pkg, dest_dir, loop=None): |
290 |
""" |
291 |
Unpack package metadata to a directory. This method is a coroutine. |
292 |
|
293 |
@@ -940,7 +940,7 @@ class vardbapi(dbapi): |
294 |
@param dest_dir: destination directory |
295 |
@type dest_dir: str |
296 |
""" |
297 |
- loop = asyncio._wrap_loop() |
298 |
+ loop = asyncio._wrap_loop(loop) |
299 |
if not isinstance(pkg, portage.config): |
300 |
cpv = pkg |
301 |
else: |
302 |
@@ -956,7 +956,7 @@ class vardbapi(dbapi): |
303 |
|
304 |
@coroutine |
305 |
def unpack_contents(self, pkg, dest_dir, |
306 |
- include_config=None, include_unmodified_config=None): |
307 |
+ include_config=None, include_unmodified_config=None, loop=None): |
308 |
""" |
309 |
Unpack package contents to a directory. This method is a coroutine. |
310 |
|
311 |
@@ -982,7 +982,7 @@ class vardbapi(dbapi): |
312 |
by QUICKPKG_DEFAULT_OPTS). |
313 |
@type include_unmodified_config: bool |
314 |
""" |
315 |
- loop = asyncio._wrap_loop() |
316 |
+ loop = asyncio._wrap_loop(loop) |
317 |
if not isinstance(pkg, portage.config): |
318 |
settings = self.settings |
319 |
cpv = pkg |
320 |
|
321 |
diff --git a/lib/portage/repository/storage/hardlink_quarantine.py b/lib/portage/repository/storage/hardlink_quarantine.py |
322 |
index 3594cb1c9..165ab8324 100644 |
323 |
--- a/lib/portage/repository/storage/hardlink_quarantine.py |
324 |
+++ b/lib/portage/repository/storage/hardlink_quarantine.py |
325 |
@@ -39,59 +39,59 @@ class HardlinkQuarantineRepoStorage(RepoStorageInterface): |
326 |
self._current_update = None |
327 |
|
328 |
@coroutine |
329 |
- def _check_call(self, cmd): |
330 |
+ def _check_call(self, cmd, loop=None): |
331 |
""" |
332 |
Run cmd and raise RepoStorageException on failure. |
333 |
|
334 |
@param cmd: command to execute |
335 |
@type cmd: list |
336 |
""" |
337 |
- p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), **self._spawn_kwargs) |
338 |
+ p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(loop), **self._spawn_kwargs) |
339 |
p.start() |
340 |
if (yield p.async_wait()) != os.EX_OK: |
341 |
raise RepoStorageException('command exited with status {}: {}'.\ |
342 |
format(p.returncode, ' '.join(cmd))) |
343 |
|
344 |
@coroutine |
345 |
- def init_update(self): |
346 |
+ def init_update(self, loop=None): |
347 |
update_location = os.path.join(self._user_location, '.tmp-unverified-download-quarantine') |
348 |
- yield self._check_call(['rm', '-rf', update_location]) |
349 |
+ yield self._check_call(['rm', '-rf', update_location], loop=loop) |
350 |
|
351 |
# Use rsync --link-dest to hardlink files into self._update_location, |
352 |
# since cp -l is not portable. |
353 |
yield self._check_call(['rsync', '-a', '--link-dest', self._user_location, |
354 |
'--exclude=/distfiles', '--exclude=/local', '--exclude=/lost+found', '--exclude=/packages', |
355 |
'--exclude', '/{}'.format(os.path.basename(update_location)), |
356 |
- self._user_location + '/', update_location + '/']) |
357 |
+ self._user_location + '/', update_location + '/'], loop=loop) |
358 |
|
359 |
self._update_location = update_location |
360 |
|
361 |
coroutine_return(self._update_location) |
362 |
|
363 |
@property |
364 |
- def current_update(self): |
365 |
+ def current_update(self, loop=None): |
366 |
if self._update_location is None: |
367 |
raise RepoStorageException('current update does not exist') |
368 |
return self._update_location |
369 |
|
370 |
@coroutine |
371 |
- def commit_update(self): |
372 |
+ def commit_update(self, loop=None): |
373 |
update_location = self.current_update |
374 |
self._update_location = None |
375 |
yield self._check_call(['rsync', '-a', '--delete', |
376 |
'--exclude=/distfiles', '--exclude=/local', '--exclude=/lost+found', '--exclude=/packages', |
377 |
'--exclude', '/{}'.format(os.path.basename(update_location)), |
378 |
- update_location + '/', self._user_location + '/']) |
379 |
+ update_location + '/', self._user_location + '/'], loop=loop) |
380 |
|
381 |
- yield self._check_call(['rm', '-rf', update_location]) |
382 |
+ yield self._check_call(['rm', '-rf', update_location], loop=loop) |
383 |
|
384 |
@coroutine |
385 |
- def abort_update(self): |
386 |
+ def abort_update(self, loop=None): |
387 |
if self._update_location is not None: |
388 |
update_location = self._update_location |
389 |
self._update_location = None |
390 |
- yield self._check_call(['rm', '-rf', update_location]) |
391 |
+ yield self._check_call(['rm', '-rf', update_location], loop=loop) |
392 |
|
393 |
@coroutine |
394 |
- def garbage_collection(self): |
395 |
- yield self.abort_update() |
396 |
+ def garbage_collection(self, loop=None): |
397 |
+ yield self.abort_update(loop=loop) |
398 |
|
399 |
diff --git a/lib/portage/repository/storage/hardlink_rcu.py b/lib/portage/repository/storage/hardlink_rcu.py |
400 |
index bb2c8496b..68081494c 100644 |
401 |
--- a/lib/portage/repository/storage/hardlink_rcu.py |
402 |
+++ b/lib/portage/repository/storage/hardlink_rcu.py |
403 |
@@ -105,7 +105,7 @@ class HardlinkRcuRepoStorage(RepoStorageInterface): |
404 |
self._snapshots_dir = os.path.join(self._storage_location, 'snapshots') |
405 |
|
406 |
@coroutine |
407 |
- def _check_call(self, cmd, privileged=False): |
408 |
+ def _check_call(self, cmd, privileged=False, loop=None): |
409 |
""" |
410 |
Run cmd and raise RepoStorageException on failure. |
411 |
|
412 |
@@ -118,16 +118,16 @@ class HardlinkRcuRepoStorage(RepoStorageInterface): |
413 |
kwargs = dict(fd_pipes=self._spawn_kwargs.get('fd_pipes')) |
414 |
else: |
415 |
kwargs = self._spawn_kwargs |
416 |
- p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), **kwargs) |
417 |
+ p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(loop), **kwargs) |
418 |
p.start() |
419 |
if (yield p.async_wait()) != os.EX_OK: |
420 |
raise RepoStorageException('command exited with status {}: {}'.\ |
421 |
format(p.returncode, ' '.join(cmd))) |
422 |
|
423 |
@coroutine |
424 |
- def init_update(self): |
425 |
+ def init_update(self, loop=None): |
426 |
update_location = os.path.join(self._storage_location, 'update') |
427 |
- yield self._check_call(['rm', '-rf', update_location]) |
428 |
+ yield self._check_call(['rm', '-rf', update_location], loop=loop) |
429 |
|
430 |
# This assumes normal umask permissions if it doesn't exist yet. |
431 |
portage.util.ensure_dirs(self._storage_location) |
432 |
@@ -139,18 +139,18 @@ class HardlinkRcuRepoStorage(RepoStorageInterface): |
433 |
# Use rsync --link-dest to hardlink files into update_location, |
434 |
# since cp -l is not portable. |
435 |
yield self._check_call(['rsync', '-a', '--link-dest', self._latest_canonical, |
436 |
- self._latest_canonical + '/', update_location + '/']) |
437 |
+ self._latest_canonical + '/', update_location + '/'], loop=loop) |
438 |
|
439 |
elif not os.path.islink(self._user_location): |
440 |
- yield self._migrate(update_location) |
441 |
- update_location = (yield self.init_update()) |
442 |
+ yield self._migrate(update_location, loop=loop) |
443 |
+ update_location = (yield self.init_update(loop=loop)) |
444 |
|
445 |
self._update_location = update_location |
446 |
|
447 |
coroutine_return(self._update_location) |
448 |
|
449 |
@coroutine |
450 |
- def _migrate(self, update_location): |
451 |
+ def _migrate(self, update_location, loop=None): |
452 |
""" |
453 |
When repo.user_location is a normal directory, migrate it to |
454 |
storage so that it can be replaced with a symlink. After migration, |
455 |
@@ -164,26 +164,26 @@ class HardlinkRcuRepoStorage(RepoStorageInterface): |
456 |
os.stat(self._user_location)) |
457 |
# It's probably on a different device, so copy it. |
458 |
yield self._check_call(['rsync', '-a', |
459 |
- self._user_location + '/', update_location + '/']) |
460 |
+ self._user_location + '/', update_location + '/'], loop=loop) |
461 |
|
462 |
# Remove the old copy so that symlink can be created. Run with |
463 |
# maximum privileges, since removal requires write access to |
464 |
# the parent directory. |
465 |
- yield self._check_call(['rm', '-rf', user_location], privileged=True) |
466 |
+ yield self._check_call(['rm', '-rf', user_location], privileged=True, loop=loop) |
467 |
|
468 |
self._update_location = update_location |
469 |
|
470 |
# Make this copy the latest snapshot |
471 |
- yield self.commit_update() |
472 |
+ yield self.commit_update(loop=loop) |
473 |
|
474 |
@property |
475 |
- def current_update(self): |
476 |
+ def current_update(self, loop=None): |
477 |
if self._update_location is None: |
478 |
raise RepoStorageException('current update does not exist') |
479 |
return self._update_location |
480 |
|
481 |
@coroutine |
482 |
- def commit_update(self): |
483 |
+ def commit_update(self, loop=None): |
484 |
update_location = self.current_update |
485 |
self._update_location = None |
486 |
try: |
487 |
@@ -235,14 +235,14 @@ class HardlinkRcuRepoStorage(RepoStorageInterface): |
488 |
yield None |
489 |
|
490 |
@coroutine |
491 |
- def abort_update(self): |
492 |
+ def abort_update(self, loop=None): |
493 |
if self._update_location is not None: |
494 |
update_location = self._update_location |
495 |
self._update_location = None |
496 |
- yield self._check_call(['rm', '-rf', update_location]) |
497 |
+ yield self._check_call(['rm', '-rf', update_location], loop=loop) |
498 |
|
499 |
@coroutine |
500 |
- def garbage_collection(self): |
501 |
+ def garbage_collection(self, loop=None): |
502 |
snap_ttl = datetime.timedelta(days=self._ttl_days) |
503 |
snapshots = sorted(int(name) for name in os.listdir(self._snapshots_dir)) |
504 |
# always preserve the latest snapshot |
505 |
@@ -259,4 +259,4 @@ class HardlinkRcuRepoStorage(RepoStorageInterface): |
506 |
snap_timestamp = datetime.datetime.utcfromtimestamp(st.st_mtime) |
507 |
if (datetime.datetime.utcnow() - snap_timestamp) < snap_ttl: |
508 |
continue |
509 |
- yield self._check_call(['rm', '-rf', snap_path]) |
510 |
+ yield self._check_call(['rm', '-rf', snap_path], loop=loop) |
511 |
|
512 |
diff --git a/lib/portage/repository/storage/inplace.py b/lib/portage/repository/storage/inplace.py |
513 |
index f1117ad03..3dbcbd7ad 100644 |
514 |
--- a/lib/portage/repository/storage/inplace.py |
515 |
+++ b/lib/portage/repository/storage/inplace.py |
516 |
@@ -19,31 +19,31 @@ class InplaceRepoStorage(RepoStorageInterface): |
517 |
self._update_location = None |
518 |
|
519 |
@coroutine |
520 |
- def init_update(self): |
521 |
+ def init_update(self, loop=None): |
522 |
self._update_location = self._user_location |
523 |
coroutine_return(self._update_location) |
524 |
yield None |
525 |
|
526 |
@property |
527 |
- def current_update(self): |
528 |
+ def current_update(self, loop=None): |
529 |
if self._update_location is None: |
530 |
raise RepoStorageException('current update does not exist') |
531 |
return self._update_location |
532 |
|
533 |
@coroutine |
534 |
- def commit_update(self): |
535 |
+ def commit_update(self, loop=None): |
536 |
self.current_update |
537 |
self._update_location = None |
538 |
coroutine_return() |
539 |
yield None |
540 |
|
541 |
@coroutine |
542 |
- def abort_update(self): |
543 |
+ def abort_update(self, loop=None): |
544 |
self._update_location = None |
545 |
coroutine_return() |
546 |
yield None |
547 |
|
548 |
@coroutine |
549 |
- def garbage_collection(self): |
550 |
+ def garbage_collection(self, loop=None): |
551 |
coroutine_return() |
552 |
yield None |
553 |
|
554 |
diff --git a/lib/portage/repository/storage/interface.py b/lib/portage/repository/storage/interface.py |
555 |
index ce8a2a170..4f5be6dbc 100644 |
556 |
--- a/lib/portage/repository/storage/interface.py |
557 |
+++ b/lib/portage/repository/storage/interface.py |
558 |
@@ -33,7 +33,7 @@ class RepoStorageInterface: |
559 |
raise NotImplementedError |
560 |
|
561 |
@coroutine |
562 |
- def init_update(self): |
563 |
+ def init_update(self, loop=None): |
564 |
""" |
565 |
Create an update directory as a destination to sync updates to. |
566 |
The directory will be populated with files from the previous |
567 |
@@ -50,7 +50,7 @@ class RepoStorageInterface: |
568 |
raise NotImplementedError |
569 |
|
570 |
@property |
571 |
- def current_update(self): |
572 |
+ def current_update(self, loop=None): |
573 |
""" |
574 |
Get the current update directory which would have been returned |
575 |
from the most recent call to the init_update method. This raises |
576 |
@@ -63,7 +63,7 @@ class RepoStorageInterface: |
577 |
raise NotImplementedError |
578 |
|
579 |
@coroutine |
580 |
- def commit_update(self): |
581 |
+ def commit_update(self, loop=None): |
582 |
""" |
583 |
Commit the current update directory, so that it becomes the |
584 |
latest immutable snapshot. |
585 |
@@ -71,7 +71,7 @@ class RepoStorageInterface: |
586 |
raise NotImplementedError |
587 |
|
588 |
@coroutine |
589 |
- def abort_update(self): |
590 |
+ def abort_update(self, loop=None): |
591 |
""" |
592 |
Delete the current update directory. If there was not an update |
593 |
in progress, or it has already been committed, then this has |
594 |
@@ -80,7 +80,7 @@ class RepoStorageInterface: |
595 |
raise NotImplementedError |
596 |
|
597 |
@coroutine |
598 |
- def garbage_collection(self): |
599 |
+ def garbage_collection(self, loop=None): |
600 |
""" |
601 |
Remove expired snapshots. |
602 |
""" |
603 |
|
604 |
diff --git a/lib/portage/sync/syncbase.py b/lib/portage/sync/syncbase.py |
605 |
index 5f18e5ba3..8e83b94fb 100644 |
606 |
--- a/lib/portage/sync/syncbase.py |
607 |
+++ b/lib/portage/sync/syncbase.py |
608 |
@@ -108,7 +108,7 @@ class SyncBase: |
609 |
""" |
610 |
if self._repo_storage is None: |
611 |
storage_cls = portage.load_mod(self._select_storage_module()) |
612 |
- self._repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs)) |
613 |
+ self._repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs), loop=global_event_loop()) |
614 |
return self._repo_storage |
615 |
|
616 |
@property |
617 |
|
618 |
diff --git a/lib/portage/tests/dbapi/test_auxdb.py b/lib/portage/tests/dbapi/test_auxdb.py |
619 |
index 907c289fb..1029de70d 100644 |
620 |
--- a/lib/portage/tests/dbapi/test_auxdb.py |
621 |
+++ b/lib/portage/tests/dbapi/test_auxdb.py |
622 |
@@ -63,8 +63,9 @@ class AuxdbTestCase(TestCase): |
623 |
portdb = playground.trees[playground.eroot]["porttree"].dbapi |
624 |
|
625 |
def test_func(): |
626 |
- return asyncio._wrap_loop().run_until_complete(self._test_mod_async( |
627 |
- ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb)) |
628 |
+ loop = asyncio._wrap_loop() |
629 |
+ return loop.run_until_complete(self._test_mod_async( |
630 |
+ ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb, loop=loop)) |
631 |
|
632 |
self.assertTrue(test_func()) |
633 |
|
634 |
@@ -91,10 +92,10 @@ class AuxdbTestCase(TestCase): |
635 |
self.assertEqual(auxdb[cpv]['RESTRICT'], 'test') |
636 |
|
637 |
@coroutine |
638 |
- def _test_mod_async(self, ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb): |
639 |
+ def _test_mod_async(self, ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb, loop=None): |
640 |
|
641 |
for cpv, metadata in ebuilds.items(): |
642 |
- defined_phases, depend, eapi, inherited = yield portdb.async_aux_get(cpv, ['DEFINED_PHASES', 'DEPEND', 'EAPI', 'INHERITED']) |
643 |
+ defined_phases, depend, eapi, inherited = yield portdb.async_aux_get(cpv, ['DEFINED_PHASES', 'DEPEND', 'EAPI', 'INHERITED'], loop=loop) |
644 |
self.assertEqual(defined_phases, eclass_defined_phases) |
645 |
self.assertEqual(depend, eclass_depend) |
646 |
self.assertEqual(eapi, metadata['EAPI']) |
647 |
|
648 |
diff --git a/lib/portage/tests/emerge/test_simple.py b/lib/portage/tests/emerge/test_simple.py |
649 |
index 94b3076c1..c24f5c603 100644 |
650 |
--- a/lib/portage/tests/emerge/test_simple.py |
651 |
+++ b/lib/portage/tests/emerge/test_simple.py |
652 |
@@ -225,10 +225,10 @@ call_has_and_best_version() { |
653 |
|
654 |
loop = asyncio._wrap_loop() |
655 |
loop.run_until_complete(asyncio.ensure_future( |
656 |
- self._async_test_simple(loop, playground, metadata_xml_files), loop=loop)) |
657 |
+ self._async_test_simple(playground, metadata_xml_files, loop=loop), loop=loop)) |
658 |
|
659 |
@coroutine |
660 |
- def _async_test_simple(self, loop, playground, metadata_xml_files): |
661 |
+ def _async_test_simple(self, playground, metadata_xml_files, loop=None): |
662 |
|
663 |
debug = playground.debug |
664 |
settings = playground.settings |
665 |
@@ -540,7 +540,7 @@ move dev-util/git dev-vcs/git |
666 |
local_env = env |
667 |
|
668 |
proc = yield asyncio.create_subprocess_exec(*args, |
669 |
- env=local_env, stderr=None, stdout=stdout) |
670 |
+ env=local_env, stderr=None, stdout=stdout, loop=loop) |
671 |
|
672 |
if debug: |
673 |
yield proc.wait() |
674 |
|
675 |
diff --git a/lib/portage/tests/process/test_AsyncFunction.py b/lib/portage/tests/process/test_AsyncFunction.py |
676 |
index 3b360e02f..b3f80b8ac 100644 |
677 |
--- a/lib/portage/tests/process/test_AsyncFunction.py |
678 |
+++ b/lib/portage/tests/process/test_AsyncFunction.py |
679 |
@@ -21,7 +21,7 @@ class AsyncFunctionTestCase(TestCase): |
680 |
return ''.join(sys.stdin) |
681 |
|
682 |
@coroutine |
683 |
- def _testAsyncFunctionStdin(self, loop): |
684 |
+ def _testAsyncFunctionStdin(self, loop=None): |
685 |
test_string = '1\n2\n3\n' |
686 |
pr, pw = os.pipe() |
687 |
fd_pipes = {0:pr} |
688 |
@@ -36,7 +36,7 @@ class AsyncFunctionTestCase(TestCase): |
689 |
|
690 |
def testAsyncFunctionStdin(self): |
691 |
loop = asyncio._wrap_loop() |
692 |
- loop.run_until_complete(self._testAsyncFunctionStdin(loop)) |
693 |
+ loop.run_until_complete(self._testAsyncFunctionStdin(loop=loop)) |
694 |
|
695 |
def _test_getpid_fork(self): |
696 |
""" |
697 |
|
698 |
diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py |
699 |
index 2bd94cf39..acc3b8af9 100644 |
700 |
--- a/lib/portage/tests/process/test_PipeLogger.py |
701 |
+++ b/lib/portage/tests/process/test_PipeLogger.py |
702 |
@@ -37,7 +37,7 @@ class PipeLoggerTestCase(TestCase): |
703 |
|
704 |
# Before starting the reader, wait here for a moment, in order |
705 |
# to exercise PipeLogger's handling of EAGAIN during write. |
706 |
- yield asyncio.wait([writer], timeout=0.01) |
707 |
+ yield asyncio.wait([writer], timeout=0.01, loop=loop) |
708 |
|
709 |
reader = _reader(pr, loop=loop) |
710 |
yield writer |
711 |
|
712 |
diff --git a/lib/portage/tests/util/futures/asyncio/test_child_watcher.py b/lib/portage/tests/util/futures/asyncio/test_child_watcher.py |
713 |
index 8a8fb3d4f..cd547f008 100644 |
714 |
--- a/lib/portage/tests/util/futures/asyncio/test_child_watcher.py |
715 |
+++ b/lib/portage/tests/util/futures/asyncio/test_child_watcher.py |
716 |
@@ -38,7 +38,7 @@ class ChildWatcherTestCase(TestCase): |
717 |
future.set_result((pid, returncode, args)) |
718 |
|
719 |
@coroutine |
720 |
- def watch_pid(): |
721 |
+ def watch_pid(loop=None): |
722 |
|
723 |
with asyncio.get_child_watcher() as watcher: |
724 |
pids = spawn([true_binary], returnpid=True) |
725 |
@@ -47,7 +47,7 @@ class ChildWatcherTestCase(TestCase): |
726 |
(yield future), |
727 |
(pids[0], os.EX_OK, args_tuple)) |
728 |
|
729 |
- loop.run_until_complete(watch_pid()) |
730 |
+ loop.run_until_complete(watch_pid(loop=loop)) |
731 |
finally: |
732 |
asyncio.set_event_loop_policy(initial_policy) |
733 |
if loop not in (None, global_event_loop()): |
734 |
|
735 |
diff --git a/lib/portage/tests/util/futures/test_compat_coroutine.py b/lib/portage/tests/util/futures/test_compat_coroutine.py |
736 |
index 5a8230432..0fd459cbf 100644 |
737 |
--- a/lib/portage/tests/util/futures/test_compat_coroutine.py |
738 |
+++ b/lib/portage/tests/util/futures/test_compat_coroutine.py |
739 |
@@ -14,12 +14,13 @@ class CompatCoroutineTestCase(TestCase): |
740 |
|
741 |
def test_returning_coroutine(self): |
742 |
@coroutine |
743 |
- def returning_coroutine(): |
744 |
- yield asyncio.sleep(0) |
745 |
+ def returning_coroutine(loop=None): |
746 |
+ yield asyncio.sleep(0, loop=loop) |
747 |
coroutine_return('success') |
748 |
|
749 |
+ loop = asyncio.get_event_loop() |
750 |
self.assertEqual('success', |
751 |
- asyncio.get_event_loop().run_until_complete(returning_coroutine())) |
752 |
+ asyncio.get_event_loop().run_until_complete(returning_coroutine(loop=loop))) |
753 |
|
754 |
def test_raising_coroutine(self): |
755 |
|
756 |
@@ -27,12 +28,13 @@ class CompatCoroutineTestCase(TestCase): |
757 |
pass |
758 |
|
759 |
@coroutine |
760 |
- def raising_coroutine(): |
761 |
- yield asyncio.sleep(0) |
762 |
+ def raising_coroutine(loop=None): |
763 |
+ yield asyncio.sleep(0, loop=loop) |
764 |
raise TestException('exception') |
765 |
|
766 |
+ loop = asyncio.get_event_loop() |
767 |
self.assertRaises(TestException, |
768 |
- asyncio.get_event_loop().run_until_complete, raising_coroutine()) |
769 |
+ loop.run_until_complete, raising_coroutine(loop=loop)) |
770 |
|
771 |
def test_catching_coroutine(self): |
772 |
|
773 |
@@ -109,17 +111,18 @@ class CompatCoroutineTestCase(TestCase): |
774 |
yield future |
775 |
|
776 |
loop = asyncio.get_event_loop() |
777 |
- future = loop.run_until_complete(asyncio.wait([cancelled_future_coroutine()]))[0].pop() |
778 |
+ future = loop.run_until_complete(asyncio.wait([cancelled_future_coroutine(loop=loop)], loop=loop))[0].pop() |
779 |
self.assertTrue(future.cancelled()) |
780 |
|
781 |
def test_yield_expression_result(self): |
782 |
@coroutine |
783 |
- def yield_expression_coroutine(): |
784 |
+ def yield_expression_coroutine(loop=None): |
785 |
for i in range(3): |
786 |
- x = yield asyncio.sleep(0, result=i) |
787 |
+ x = yield asyncio.sleep(0, result=i, loop=loop) |
788 |
self.assertEqual(x, i) |
789 |
|
790 |
- asyncio.get_event_loop().run_until_complete(yield_expression_coroutine()) |
791 |
+ loop = asyncio.get_event_loop() |
792 |
+ loop.run_until_complete(yield_expression_coroutine(loop=loop)) |
793 |
|
794 |
def test_method_coroutine(self): |
795 |
|
796 |
@@ -144,7 +147,7 @@ class CompatCoroutineTestCase(TestCase): |
797 |
return waiter |
798 |
|
799 |
@coroutine |
800 |
- def read(self): |
801 |
+ def read(self, loop=None): |
802 |
while self._value is self._empty: |
803 |
yield self._wait() |
804 |
|
805 |
@@ -154,7 +157,7 @@ class CompatCoroutineTestCase(TestCase): |
806 |
coroutine_return(value) |
807 |
|
808 |
@coroutine |
809 |
- def write(self, value): |
810 |
+ def write(self, value, loop=None): |
811 |
while self._value is not self._empty: |
812 |
yield self._wait() |
813 |
|
814 |
@@ -162,16 +165,16 @@ class CompatCoroutineTestCase(TestCase): |
815 |
self._notify() |
816 |
|
817 |
@coroutine |
818 |
- def writer_coroutine(cubby, values, sentinel): |
819 |
+ def writer_coroutine(cubby, values, sentinel, loop=None): |
820 |
for value in values: |
821 |
- yield cubby.write(value) |
822 |
- yield cubby.write(sentinel) |
823 |
+ yield cubby.write(value, loop=loop) |
824 |
+ yield cubby.write(sentinel, loop=loop) |
825 |
|
826 |
@coroutine |
827 |
- def reader_coroutine(cubby, sentinel): |
828 |
+ def reader_coroutine(cubby, sentinel, loop=None): |
829 |
results = [] |
830 |
while True: |
831 |
- result = yield cubby.read() |
832 |
+ result = yield cubby.read(loop=loop) |
833 |
if result == sentinel: |
834 |
break |
835 |
results.append(result) |
836 |
@@ -180,9 +183,9 @@ class CompatCoroutineTestCase(TestCase): |
837 |
loop = asyncio.get_event_loop() |
838 |
cubby = Cubby(loop) |
839 |
values = list(range(3)) |
840 |
- writer = asyncio.ensure_future(writer_coroutine(cubby, values, None), loop=loop) |
841 |
- reader = asyncio.ensure_future(reader_coroutine(cubby, None), loop=loop) |
842 |
- loop.run_until_complete(asyncio.wait([writer, reader])) |
843 |
+ writer = asyncio.ensure_future(writer_coroutine(cubby, values, None, loop=loop), loop=loop) |
844 |
+ reader = asyncio.ensure_future(reader_coroutine(cubby, None, loop=loop), loop=loop) |
845 |
+ loop.run_until_complete(asyncio.wait([writer, reader], loop=loop)) |
846 |
|
847 |
self.assertEqual(reader.result(), values) |
848 |
|
849 |
@@ -191,7 +194,7 @@ class CompatCoroutineTestCase(TestCase): |
850 |
# blend with synchronous code. |
851 |
sync_cubby = _sync_methods(cubby, loop=loop) |
852 |
sync_reader = _sync_decorator(reader_coroutine, loop=loop) |
853 |
- writer = asyncio.ensure_future(writer_coroutine(cubby, values, None), loop=loop) |
854 |
+ writer = asyncio.ensure_future(writer_coroutine(cubby, values, None, loop=loop), loop=loop) |
855 |
self.assertEqual(sync_reader(cubby, None), values) |
856 |
self.assertTrue(writer.done()) |
857 |
|
858 |
|
859 |
diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py |
860 |
index ca32651a7..44d522013 100644 |
861 |
--- a/lib/portage/tests/util/test_socks5.py |
862 |
+++ b/lib/portage/tests/util/test_socks5.py |
863 |
@@ -185,7 +185,7 @@ class Socks5ServerTestCase(TestCase): |
864 |
} |
865 |
|
866 |
proxy = socks5.get_socks5_proxy(settings) |
867 |
- loop.run_until_complete(socks5.proxy.ready()) |
868 |
+ loop.run_until_complete(socks5.proxy.ready(loop=loop)) |
869 |
|
870 |
result = loop.run_until_complete(loop.run_in_executor(None, |
871 |
self._fetch_via_proxy, proxy, host, server.server_port, path)) |
872 |
|
873 |
diff --git a/lib/portage/util/_async/BuildLogger.py b/lib/portage/util/_async/BuildLogger.py |
874 |
index f25f70d5b..5a9c076b6 100644 |
875 |
--- a/lib/portage/util/_async/BuildLogger.py |
876 |
+++ b/lib/portage/util/_async/BuildLogger.py |
877 |
@@ -78,11 +78,11 @@ class BuildLogger(AsynchronousTask): |
878 |
pipe_logger.start() |
879 |
|
880 |
self._main_task_cancel = functools.partial(self._main_cancel, filter_proc, pipe_logger) |
881 |
- self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger), loop=self.scheduler) |
882 |
+ self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger, loop=self.scheduler), loop=self.scheduler) |
883 |
self._main_task.add_done_callback(self._main_exit) |
884 |
|
885 |
@coroutine |
886 |
- def _main(self, filter_proc, pipe_logger): |
887 |
+ def _main(self, filter_proc, pipe_logger, loop=None): |
888 |
try: |
889 |
if pipe_logger.poll() is None: |
890 |
yield pipe_logger.async_wait() |
891 |
|
892 |
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py |
893 |
index eb01a6232..3c9c6e22b 100644 |
894 |
--- a/lib/portage/util/_async/ForkProcess.py |
895 |
+++ b/lib/portage/util/_async/ForkProcess.py |
896 |
@@ -47,7 +47,7 @@ class ForkProcess(SpawnProcess): |
897 |
os.close(stdin_dup) |
898 |
|
899 |
self._proc_join_task = asyncio.ensure_future( |
900 |
- self._proc_join(self._proc)) |
901 |
+ self._proc_join(self._proc, loop=self.scheduler), loop=self.scheduler) |
902 |
self._proc_join_task.add_done_callback( |
903 |
functools.partial(self._proc_join_done, self._proc)) |
904 |
|
905 |
@@ -68,7 +68,7 @@ class ForkProcess(SpawnProcess): |
906 |
super(ForkProcess, self)._async_waitpid() |
907 |
|
908 |
@coroutine |
909 |
- def _proc_join(self, proc): |
910 |
+ def _proc_join(self, proc, loop=None): |
911 |
sentinel_reader = self.scheduler.create_future() |
912 |
self.scheduler.add_reader(proc.sentinel, |
913 |
lambda: sentinel_reader.done() or sentinel_reader.set_result(None)) |
914 |
@@ -93,7 +93,7 @@ class ForkProcess(SpawnProcess): |
915 |
proc.join(0) |
916 |
if proc.exitcode is not None: |
917 |
break |
918 |
- yield asyncio.sleep(self._proc_join_interval) |
919 |
+ yield asyncio.sleep(self._proc_join_interval, loop=loop) |
920 |
|
921 |
def _proc_join_done(self, proc, future): |
922 |
future.cancelled() or future.result() |
923 |
|
924 |
diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py |
925 |
index 2bbdd3ddb..e8203268c 100644 |
926 |
--- a/lib/portage/util/_async/PipeLogger.py |
927 |
+++ b/lib/portage/util/_async/PipeLogger.py |
928 |
@@ -53,7 +53,7 @@ class PipeLogger(AbstractPollTask): |
929 |
fcntl.fcntl(fd, fcntl.F_SETFL, |
930 |
fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK) |
931 |
|
932 |
- self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler) |
933 |
+ self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd, loop=self.scheduler), loop=self.scheduler) |
934 |
self._io_loop_task.add_done_callback(self._io_loop_done) |
935 |
self._registered = True |
936 |
|
937 |
@@ -63,7 +63,7 @@ class PipeLogger(AbstractPollTask): |
938 |
self.returncode = self._cancelled_returncode |
939 |
|
940 |
@coroutine |
941 |
- def _io_loop(self, input_file): |
942 |
+ def _io_loop(self, input_file, loop=None): |
943 |
background = self.background |
944 |
stdout_fd = self.stdout_fd |
945 |
log_file = self._log_file |
946 |
|
947 |
diff --git a/lib/portage/util/_async/SchedulerInterface.py b/lib/portage/util/_async/SchedulerInterface.py |
948 |
index 3ff250d1d..2865266eb 100644 |
949 |
--- a/lib/portage/util/_async/SchedulerInterface.py |
950 |
+++ b/lib/portage/util/_async/SchedulerInterface.py |
951 |
@@ -57,7 +57,7 @@ class SchedulerInterface(SlotObject): |
952 |
|
953 |
@coroutine |
954 |
def async_output(self, msg, log_file=None, background=None, |
955 |
- level=0, noiselevel=-1): |
956 |
+ level=0, noiselevel=-1, loop=None): |
957 |
""" |
958 |
Output a msg to stdio (if not in background) and to a log file |
959 |
if provided. |
960 |
@@ -81,7 +81,7 @@ class SchedulerInterface(SlotObject): |
961 |
writemsg_level(msg, level=level, noiselevel=noiselevel) |
962 |
|
963 |
if log_file is not None: |
964 |
- yield _writer(log_file, _unicode_encode(msg)) |
965 |
+ yield _writer(log_file, _unicode_encode(msg), loop=loop) |
966 |
|
967 |
def output(self, msg, log_path=None, background=None, |
968 |
level=0, noiselevel=-1): |
969 |
|
970 |
diff --git a/lib/portage/util/futures/_asyncio/process.py b/lib/portage/util/futures/_asyncio/process.py |
971 |
index 6ff156c9d..275c9031a 100644 |
972 |
--- a/lib/portage/util/futures/_asyncio/process.py |
973 |
+++ b/lib/portage/util/futures/_asyncio/process.py |
974 |
@@ -39,7 +39,7 @@ class _Process: |
975 |
return self._proc.returncode |
976 |
|
977 |
@coroutine |
978 |
- def communicate(self, input=None): # pylint: disable=redefined-builtin |
979 |
+ def communicate(self, input=None, loop=None): # pylint: disable=redefined-builtin |
980 |
""" |
981 |
Read data from stdout and stderr, until end-of-file is reached. |
982 |
Wait for process to terminate. |
983 |
@@ -49,13 +49,14 @@ class _Process: |
984 |
@return: tuple (stdout_data, stderr_data) |
985 |
@rtype: asyncio.Future (or compatible) |
986 |
""" |
987 |
+ loop = asyncio._wrap_loop(loop or self._loop) |
988 |
futures = [] |
989 |
for input_file in (self._proc.stdout, self._proc.stderr): |
990 |
if input_file is None: |
991 |
- future = self._loop.create_future() |
992 |
+ future = loop.create_future() |
993 |
future.set_result(None) |
994 |
else: |
995 |
- future = _reader(input_file, loop=self._loop) |
996 |
+ future = _reader(input_file, loop=loop) |
997 |
futures.append(future) |
998 |
|
999 |
writer = None |
1000 |
@@ -65,11 +66,11 @@ class _Process: |
1001 |
stdin = self._proc.stdin |
1002 |
stdin = os.fdopen(stdin, 'wb', 0) if isinstance(stdin, int) else stdin |
1003 |
_set_nonblocking(stdin.fileno()) |
1004 |
- writer = asyncio.ensure_future(_writer(stdin, input, loop=self._loop), loop=self._loop) |
1005 |
+ writer = asyncio.ensure_future(_writer(stdin, input, loop=loop), loop=loop) |
1006 |
writer.add_done_callback(lambda writer: stdin.close()) |
1007 |
|
1008 |
try: |
1009 |
- yield asyncio.wait(futures + [self.wait()], loop=self._loop) |
1010 |
+ yield asyncio.wait(futures + [self.wait(loop=loop)], loop=loop) |
1011 |
finally: |
1012 |
if writer is not None: |
1013 |
if writer.done(): |
1014 |
@@ -84,14 +85,15 @@ class _Process: |
1015 |
|
1016 |
coroutine_return(tuple(future.result() for future in futures)) |
1017 |
|
1018 |
- def wait(self): |
1019 |
+ def wait(self, loop=None): |
1020 |
""" |
1021 |
Wait for child process to terminate. Set and return returncode attribute. |
1022 |
|
1023 |
@return: returncode |
1024 |
@rtype: asyncio.Future (or compatible) |
1025 |
""" |
1026 |
- waiter = self._loop.create_future() |
1027 |
+ loop = asyncio._wrap_loop(loop or self._loop) |
1028 |
+ waiter = loop.create_future() |
1029 |
if self.returncode is None: |
1030 |
self._waiters.append(waiter) |
1031 |
waiter.add_done_callback(self._waiter_cancel) |
1032 |
|
1033 |
diff --git a/lib/portage/util/futures/_sync_decorator.py b/lib/portage/util/futures/_sync_decorator.py |
1034 |
index 02a0963a7..3da065789 100644 |
1035 |
--- a/lib/portage/util/futures/_sync_decorator.py |
1036 |
+++ b/lib/portage/util/futures/_sync_decorator.py |
1037 |
@@ -15,9 +15,10 @@ def _sync_decorator(func, loop=None): |
1038 |
function that returns a Future) with a wrapper that runs the function |
1039 |
synchronously. |
1040 |
""" |
1041 |
- loop = asyncio._wrap_loop(loop) |
1042 |
@functools.wraps(func) |
1043 |
def wrapper(*args, **kwargs): |
1044 |
+ nonlocal loop |
1045 |
+ loop = kwargs['loop'] = asyncio._wrap_loop(kwargs.get('loop') or loop) |
1046 |
return loop.run_until_complete(func(*args, **kwargs)) |
1047 |
return wrapper |
1048 |
|
1049 |
|
1050 |
diff --git a/lib/portage/util/futures/compat_coroutine.py b/lib/portage/util/futures/compat_coroutine.py |
1051 |
index 79bd0da68..9a0c5c1c8 100644 |
1052 |
--- a/lib/portage/util/futures/compat_coroutine.py |
1053 |
+++ b/lib/portage/util/futures/compat_coroutine.py |
1054 |
@@ -67,7 +67,12 @@ def _generator_future(generator_func, *args, **kwargs): |
1055 |
keyword argument named 'loop' is given, then it is used instead of |
1056 |
the default event loop. |
1057 |
""" |
1058 |
- loop = asyncio._wrap_loop(kwargs.get('loop')) |
1059 |
+ loop = kwargs.get('loop') |
1060 |
+ if loop is None and portage._internal_caller: |
1061 |
+ # Require an explicit loop parameter, in order to support |
1062 |
+ # local event loops (bug 737698). |
1063 |
+ raise AssertionError("Missing required argument 'loop'") |
1064 |
+ loop = asyncio._wrap_loop(loop) |
1065 |
result = loop.create_future() |
1066 |
_GeneratorTask(generator_func(*args, **kwargs), result, loop=loop) |
1067 |
return result |
1068 |
|
1069 |
diff --git a/lib/portage/util/socks5.py b/lib/portage/util/socks5.py |
1070 |
index 65d2400e8..9f22c1dbe 100644 |
1071 |
--- a/lib/portage/util/socks5.py |
1072 |
+++ b/lib/portage/util/socks5.py |
1073 |
@@ -76,7 +76,7 @@ class ProxyManager: |
1074 |
|
1075 |
|
1076 |
@coroutine |
1077 |
- def ready(self): |
1078 |
+ def ready(self, loop=None): |
1079 |
""" |
1080 |
Wait for the proxy socket to become ready. This method is a coroutine. |
1081 |
""" |
1082 |
@@ -98,7 +98,7 @@ class ProxyManager: |
1083 |
except EnvironmentError as e: |
1084 |
if e.errno != errno.ENOENT: |
1085 |
raise |
1086 |
- yield asyncio.sleep(0.2) |
1087 |
+ yield asyncio.sleep(0.2, loop=loop) |
1088 |
else: |
1089 |
break |
1090 |
finally: |
1091 |
|
1092 |
diff --git a/repoman/lib/repoman/modules/scan/depend/profile.py b/repoman/lib/repoman/modules/scan/depend/profile.py |
1093 |
index 1eb69422a..468bc55e2 100644 |
1094 |
--- a/repoman/lib/repoman/modules/scan/depend/profile.py |
1095 |
+++ b/repoman/lib/repoman/modules/scan/depend/profile.py |
1096 |
@@ -146,7 +146,7 @@ class ProfileDependsChecks(ScanBase): |
1097 |
% (ebuild.relative_path, mytype, ", ".join(sorted(atoms)))) |
1098 |
|
1099 |
@coroutine |
1100 |
- def _task(self, task): |
1101 |
+ def _task(self, task, loop=None): |
1102 |
yield task.future |
1103 |
coroutine_return((task, task.future.result())) |
1104 |
|
1105 |
@@ -222,7 +222,7 @@ class ProfileDependsChecks(ScanBase): |
1106 |
yield (task, target()) |
1107 |
else: |
1108 |
task.future = asyncio.ensure_future(loop.run_in_executor(executor, target), loop=loop) |
1109 |
- yield self._task(task) |
1110 |
+ yield self._task(task, loop=loop) |
1111 |
|
1112 |
|
1113 |
def _task_subprocess(self, task, pkg, dep_settings): |