Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: lib/portage/tests/process/, lib/portage/util/futures/_asyncio/, ...
Date: Wed, 19 Aug 2020 04:47:06
Message-Id: 1597809706.dc7919541712d846574e6b7d672a3bed0ca7ef1a.zmedico@gentoo
1 commit: dc7919541712d846574e6b7d672a3bed0ca7ef1a
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Tue Aug 18 06:31:54 2020 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Wed Aug 19 04:01:46 2020 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=dc791954
7
8 coroutine: use explicit loop parameter (bug 737698)
9
10 In order to support local event loops within API functions
11 like doebuild, use an explicit loop parameter when calling a
12 coroutine. Internal code will now raise an AssertionError if
13 the loop parameter is omitted for a coroutine, but API
14 consumers may omit it.
15
16 Bug: https://bugs.gentoo.org/737698
17 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
18
19 lib/_emerge/Binpkg.py | 8 ++--
20 lib/_emerge/EbuildPhase.py | 16 ++++----
21 lib/_emerge/Scheduler.py | 4 +-
22 lib/_emerge/SequentialTaskQueue.py | 4 +-
23 lib/_emerge/SpawnProcess.py | 5 ++-
24 lib/portage/dbapi/bintree.py | 12 +++---
25 lib/portage/dbapi/vartree.py | 8 ++--
26 .../repository/storage/hardlink_quarantine.py | 26 ++++++-------
27 lib/portage/repository/storage/hardlink_rcu.py | 34 ++++++++--------
28 lib/portage/repository/storage/inplace.py | 10 ++---
29 lib/portage/repository/storage/interface.py | 10 ++---
30 lib/portage/sync/syncbase.py | 2 +-
31 lib/portage/tests/dbapi/test_auxdb.py | 9 +++--
32 lib/portage/tests/emerge/test_simple.py | 6 +--
33 lib/portage/tests/process/test_AsyncFunction.py | 4 +-
34 lib/portage/tests/process/test_PipeLogger.py | 2 +-
35 .../util/futures/asyncio/test_child_watcher.py | 4 +-
36 .../tests/util/futures/test_compat_coroutine.py | 45 ++++++++++++----------
37 lib/portage/tests/util/test_socks5.py | 2 +-
38 lib/portage/util/_async/BuildLogger.py | 4 +-
39 lib/portage/util/_async/ForkProcess.py | 6 +--
40 lib/portage/util/_async/PipeLogger.py | 4 +-
41 lib/portage/util/_async/SchedulerInterface.py | 4 +-
42 lib/portage/util/futures/_asyncio/process.py | 16 ++++----
43 lib/portage/util/futures/_sync_decorator.py | 3 +-
44 lib/portage/util/futures/compat_coroutine.py | 7 +++-
45 lib/portage/util/socks5.py | 4 +-
46 repoman/lib/repoman/modules/scan/depend/profile.py | 4 +-
47 28 files changed, 138 insertions(+), 125 deletions(-)
48
49 diff --git a/lib/_emerge/Binpkg.py b/lib/_emerge/Binpkg.py
50 index b5a69f8e7..9d2909d42 100644
51 --- a/lib/_emerge/Binpkg.py
52 +++ b/lib/_emerge/Binpkg.py
53 @@ -250,11 +250,11 @@ class Binpkg(CompositeTask):
54 return
55
56 self._start_task(
57 - AsyncTaskFuture(future=self._unpack_metadata()),
58 + AsyncTaskFuture(future=self._unpack_metadata(loop=self.scheduler)),
59 self._unpack_metadata_exit)
60
61 @coroutine
62 - def _unpack_metadata(self):
63 + def _unpack_metadata(self, loop=None):
64
65 dir_path = self.settings['PORTAGE_BUILDDIR']
66
67 @@ -271,7 +271,7 @@ class Binpkg(CompositeTask):
68 portage.prepare_build_dirs(self.settings["ROOT"], self.settings, 1)
69 self._writemsg_level(">>> Extracting info\n")
70
71 - yield self._bintree.dbapi.unpack_metadata(self.settings, infloc)
72 + yield self._bintree.dbapi.unpack_metadata(self.settings, infloc, loop=self.scheduler)
73 check_missing_metadata = ("CATEGORY", "PF")
74 for k, v in zip(check_missing_metadata,
75 self._bintree.dbapi.aux_get(self.pkg.cpv, check_missing_metadata)):
76 @@ -333,7 +333,7 @@ class Binpkg(CompositeTask):
77 self._start_task(
78 AsyncTaskFuture(future=self._bintree.dbapi.unpack_contents(
79 self.settings,
80 - self._image_dir)),
81 + self._image_dir, loop=self.scheduler)),
82 self._unpack_contents_exit)
83
84 def _unpack_contents_exit(self, unpack_contents):
85
86 diff --git a/lib/_emerge/EbuildPhase.py b/lib/_emerge/EbuildPhase.py
87 index 4bc2749bd..e4c0428a6 100644
88 --- a/lib/_emerge/EbuildPhase.py
89 +++ b/lib/_emerge/EbuildPhase.py
90 @@ -70,11 +70,11 @@ class EbuildPhase(CompositeTask):
91 _locked_phases = ("setup", "preinst", "postinst", "prerm", "postrm")
92
93 def _start(self):
94 - future = asyncio.ensure_future(self._async_start(), loop=self.scheduler)
95 + future = asyncio.ensure_future(self._async_start(loop=self.scheduler), loop=self.scheduler)
96 self._start_task(AsyncTaskFuture(future=future), self._async_start_exit)
97
98 @coroutine
99 - def _async_start(self):
100 + def _async_start(self, loop=None):
101
102 need_builddir = self.phase not in EbuildProcess._phases_without_builddir
103
104 @@ -132,7 +132,7 @@ class EbuildPhase(CompositeTask):
105 # Force background=True for this header since it's intended
106 # for the log and it doesn't necessarily need to be visible
107 # elsewhere.
108 - yield self._elog('einfo', msg, background=True)
109 + yield self._elog('einfo', msg, background=True, loop=self.scheduler)
110
111 if self.phase == 'package':
112 if 'PORTAGE_BINPKG_TMPFILE' not in self.settings:
113 @@ -403,7 +403,7 @@ class EbuildPhase(CompositeTask):
114 self.wait()
115
116 @coroutine
117 - def _elog(self, elog_funcname, lines, background=None):
118 + def _elog(self, elog_funcname, lines, background=None, loop=None):
119 if background is None:
120 background = self.background
121 out = io.StringIO()
122 @@ -435,7 +435,7 @@ class EbuildPhase(CompositeTask):
123 log_file = build_logger.stdin
124
125 yield self.scheduler.async_output(msg, log_file=log_file,
126 - background=background)
127 + background=background, loop=self.scheduler)
128
129 if build_logger is not None:
130 build_logger.stdin.close()
131 @@ -487,7 +487,7 @@ class _PostPhaseCommands(CompositeTask):
132
133 if 'qa-unresolved-soname-deps' in self.settings.features:
134 # This operates on REQUIRES metadata generated by the above function call.
135 - future = self._soname_deps_qa()
136 + future = asyncio.ensure_future(self._soname_deps_qa(loop=self.scheduler), loop=self.scheduler)
137 # If an unexpected exception occurs, then this will raise it.
138 future.add_done_callback(lambda future: future.cancelled() or future.result())
139 self._start_task(AsyncTaskFuture(future=future), self._default_final_exit)
140 @@ -497,7 +497,7 @@ class _PostPhaseCommands(CompositeTask):
141 self._default_final_exit(task)
142
143 @coroutine
144 - def _soname_deps_qa(self):
145 + def _soname_deps_qa(self, loop=None):
146
147 vardb = QueryCommand.get_db()[self.settings['EROOT']]['vartree'].dbapi
148
149 @@ -512,4 +512,4 @@ class _PostPhaseCommands(CompositeTask):
150 qa_msg.extend("\t%s: %s" % (filename, " ".join(sorted(soname_deps)))
151 for filename, soname_deps in unresolved)
152 qa_msg.append("")
153 - yield self.elog("eqawarn", qa_msg)
154 + yield self.elog("eqawarn", qa_msg, loop=self.scheduler)
155
156 diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py
157 index 2427d953c..a69421288 100644
158 --- a/lib/_emerge/Scheduler.py
159 +++ b/lib/_emerge/Scheduler.py
160 @@ -871,7 +871,7 @@ class Scheduler(PollScheduler):
161 infloc = os.path.join(build_dir_path, "build-info")
162 ensure_dirs(infloc)
163 self._sched_iface.run_until_complete(
164 - bintree.dbapi.unpack_metadata(settings, infloc))
165 + bintree.dbapi.unpack_metadata(settings, infloc, loop=self._sched_iface))
166 ebuild_path = os.path.join(infloc, x.pf + ".ebuild")
167 settings.configdict["pkg"]["EMERGE_FROM"] = "binary"
168 settings.configdict["pkg"]["MERGE_TYPE"] = "binary"
169 @@ -1621,7 +1621,7 @@ class Scheduler(PollScheduler):
170 if (self._task_queues.merge and (self._schedule_merge_wakeup_task is None
171 or self._schedule_merge_wakeup_task.done())):
172 self._schedule_merge_wakeup_task = asyncio.ensure_future(
173 - self._task_queues.merge.wait(), loop=self._event_loop)
174 + self._task_queues.merge.wait(loop=self._event_loop), loop=self._event_loop)
175 self._schedule_merge_wakeup_task.add_done_callback(
176 self._schedule_merge_wakeup)
177
178
179 diff --git a/lib/_emerge/SequentialTaskQueue.py b/lib/_emerge/SequentialTaskQueue.py
180 index 40590b76c..02fe19912 100644
181 --- a/lib/_emerge/SequentialTaskQueue.py
182 +++ b/lib/_emerge/SequentialTaskQueue.py
183 @@ -69,7 +69,7 @@ class SequentialTaskQueue(SlotObject):
184 task.cancel()
185
186 @coroutine
187 - def wait(self):
188 + def wait(self, loop=None):
189 """
190 Wait for the queue to become empty. This method is a coroutine.
191 """
192 @@ -77,7 +77,7 @@ class SequentialTaskQueue(SlotObject):
193 task = next(iter(self.running_tasks), None)
194 if task is None:
195 # Wait for self.running_tasks to populate.
196 - yield asyncio.sleep(0)
197 + yield asyncio.sleep(0, loop=loop)
198 else:
199 yield task.async_wait()
200
201
202 diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py
203 index cea16df27..c43d60d3f 100644
204 --- a/lib/_emerge/SpawnProcess.py
205 +++ b/lib/_emerge/SpawnProcess.py
206 @@ -140,11 +140,12 @@ class SpawnProcess(SubProcess):
207
208 self._registered = True
209 self._main_task_cancel = functools.partial(self._main_cancel, build_logger, pipe_logger)
210 - self._main_task = asyncio.ensure_future(self._main(build_logger, pipe_logger), loop=self.scheduler)
211 + self._main_task = asyncio.ensure_future(
212 + self._main(build_logger, pipe_logger, loop=self.scheduler), loop=self.scheduler)
213 self._main_task.add_done_callback(self._main_exit)
214
215 @coroutine
216 - def _main(self, build_logger, pipe_logger):
217 + def _main(self, build_logger, pipe_logger, loop=None):
218 try:
219 if pipe_logger.poll() is None:
220 yield pipe_logger.async_wait()
221
222 diff --git a/lib/portage/dbapi/bintree.py b/lib/portage/dbapi/bintree.py
223 index 59c265688..620865a79 100644
224 --- a/lib/portage/dbapi/bintree.py
225 +++ b/lib/portage/dbapi/bintree.py
226 @@ -217,7 +217,7 @@ class bindbapi(fakedbapi):
227
228
229 @coroutine
230 - def unpack_metadata(self, pkg, dest_dir):
231 + def unpack_metadata(self, pkg, dest_dir, loop=None):
232 """
233 Unpack package metadata to a directory. This method is a coroutine.
234
235 @@ -226,7 +226,7 @@ class bindbapi(fakedbapi):
236 @param dest_dir: destination directory
237 @type dest_dir: str
238 """
239 - loop = asyncio._wrap_loop()
240 + loop = asyncio._wrap_loop(loop)
241 if isinstance(pkg, _pkg_str):
242 cpv = pkg
243 else:
244 @@ -234,14 +234,14 @@ class bindbapi(fakedbapi):
245 key = self._instance_key(cpv)
246 add_pkg = self.bintree._additional_pkgs.get(key)
247 if add_pkg is not None:
248 - yield add_pkg._db.unpack_metadata(pkg, dest_dir)
249 + yield add_pkg._db.unpack_metadata(pkg, dest_dir, loop=loop)
250 else:
251 tbz2_file = self.bintree.getname(cpv)
252 yield loop.run_in_executor(ForkExecutor(loop=loop),
253 portage.xpak.tbz2(tbz2_file).unpackinfo, dest_dir)
254
255 @coroutine
256 - def unpack_contents(self, pkg, dest_dir):
257 + def unpack_contents(self, pkg, dest_dir, loop=None):
258 """
259 Unpack package contents to a directory. This method is a coroutine.
260
261 @@ -250,7 +250,7 @@ class bindbapi(fakedbapi):
262 @param dest_dir: destination directory
263 @type dest_dir: str
264 """
265 - loop = asyncio._wrap_loop()
266 + loop = asyncio._wrap_loop(loop)
267 if isinstance(pkg, _pkg_str):
268 settings = self.settings
269 cpv = pkg
270 @@ -280,7 +280,7 @@ class bindbapi(fakedbapi):
271 add_pkg = self.bintree._additional_pkgs.get(instance_key)
272 if add_pkg is None:
273 raise portage.exception.PackageNotFound(cpv)
274 - yield add_pkg._db.unpack_contents(pkg, dest_dir)
275 + yield add_pkg._db.unpack_contents(pkg, dest_dir, loop=loop)
276
277 def cp_list(self, *pargs, **kwargs):
278 if not self.bintree.populated:
279
280 diff --git a/lib/portage/dbapi/vartree.py b/lib/portage/dbapi/vartree.py
281 index 3eee025ad..1547d2f6d 100644
282 --- a/lib/portage/dbapi/vartree.py
283 +++ b/lib/portage/dbapi/vartree.py
284 @@ -931,7 +931,7 @@ class vardbapi(dbapi):
285 self._bump_mtime(cpv)
286
287 @coroutine
288 - def unpack_metadata(self, pkg, dest_dir):
289 + def unpack_metadata(self, pkg, dest_dir, loop=None):
290 """
291 Unpack package metadata to a directory. This method is a coroutine.
292
293 @@ -940,7 +940,7 @@ class vardbapi(dbapi):
294 @param dest_dir: destination directory
295 @type dest_dir: str
296 """
297 - loop = asyncio._wrap_loop()
298 + loop = asyncio._wrap_loop(loop)
299 if not isinstance(pkg, portage.config):
300 cpv = pkg
301 else:
302 @@ -956,7 +956,7 @@ class vardbapi(dbapi):
303
304 @coroutine
305 def unpack_contents(self, pkg, dest_dir,
306 - include_config=None, include_unmodified_config=None):
307 + include_config=None, include_unmodified_config=None, loop=None):
308 """
309 Unpack package contents to a directory. This method is a coroutine.
310
311 @@ -982,7 +982,7 @@ class vardbapi(dbapi):
312 by QUICKPKG_DEFAULT_OPTS).
313 @type include_unmodified_config: bool
314 """
315 - loop = asyncio._wrap_loop()
316 + loop = asyncio._wrap_loop(loop)
317 if not isinstance(pkg, portage.config):
318 settings = self.settings
319 cpv = pkg
320
321 diff --git a/lib/portage/repository/storage/hardlink_quarantine.py b/lib/portage/repository/storage/hardlink_quarantine.py
322 index 3594cb1c9..165ab8324 100644
323 --- a/lib/portage/repository/storage/hardlink_quarantine.py
324 +++ b/lib/portage/repository/storage/hardlink_quarantine.py
325 @@ -39,59 +39,59 @@ class HardlinkQuarantineRepoStorage(RepoStorageInterface):
326 self._current_update = None
327
328 @coroutine
329 - def _check_call(self, cmd):
330 + def _check_call(self, cmd, loop=None):
331 """
332 Run cmd and raise RepoStorageException on failure.
333
334 @param cmd: command to executre
335 @type cmd: list
336 """
337 - p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), **self._spawn_kwargs)
338 + p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(loop), **self._spawn_kwargs)
339 p.start()
340 if (yield p.async_wait()) != os.EX_OK:
341 raise RepoStorageException('command exited with status {}: {}'.\
342 format(p.returncode, ' '.join(cmd)))
343
344 @coroutine
345 - def init_update(self):
346 + def init_update(self, loop=None):
347 update_location = os.path.join(self._user_location, '.tmp-unverified-download-quarantine')
348 - yield self._check_call(['rm', '-rf', update_location])
349 + yield self._check_call(['rm', '-rf', update_location], loop=loop)
350
351 # Use rsync --link-dest to hardlink a files into self._update_location,
352 # since cp -l is not portable.
353 yield self._check_call(['rsync', '-a', '--link-dest', self._user_location,
354 '--exclude=/distfiles', '--exclude=/local', '--exclude=/lost+found', '--exclude=/packages',
355 '--exclude', '/{}'.format(os.path.basename(update_location)),
356 - self._user_location + '/', update_location + '/'])
357 + self._user_location + '/', update_location + '/'], loop=loop)
358
359 self._update_location = update_location
360
361 coroutine_return(self._update_location)
362
363 @property
364 - def current_update(self):
365 + def current_update(self, loop=None):
366 if self._update_location is None:
367 raise RepoStorageException('current update does not exist')
368 return self._update_location
369
370 @coroutine
371 - def commit_update(self):
372 + def commit_update(self, loop=None):
373 update_location = self.current_update
374 self._update_location = None
375 yield self._check_call(['rsync', '-a', '--delete',
376 '--exclude=/distfiles', '--exclude=/local', '--exclude=/lost+found', '--exclude=/packages',
377 '--exclude', '/{}'.format(os.path.basename(update_location)),
378 - update_location + '/', self._user_location + '/'])
379 + update_location + '/', self._user_location + '/'], loop=loop)
380
381 - yield self._check_call(['rm', '-rf', update_location])
382 + yield self._check_call(['rm', '-rf', update_location], loop=loop)
383
384 @coroutine
385 - def abort_update(self):
386 + def abort_update(self, loop=None):
387 if self._update_location is not None:
388 update_location = self._update_location
389 self._update_location = None
390 - yield self._check_call(['rm', '-rf', update_location])
391 + yield self._check_call(['rm', '-rf', update_location], loop=loop)
392
393 @coroutine
394 - def garbage_collection(self):
395 - yield self.abort_update()
396 + def garbage_collection(self, loop=None):
397 + yield self.abort_update(loop=loop)
398
399 diff --git a/lib/portage/repository/storage/hardlink_rcu.py b/lib/portage/repository/storage/hardlink_rcu.py
400 index bb2c8496b..68081494c 100644
401 --- a/lib/portage/repository/storage/hardlink_rcu.py
402 +++ b/lib/portage/repository/storage/hardlink_rcu.py
403 @@ -105,7 +105,7 @@ class HardlinkRcuRepoStorage(RepoStorageInterface):
404 self._snapshots_dir = os.path.join(self._storage_location, 'snapshots')
405
406 @coroutine
407 - def _check_call(self, cmd, privileged=False):
408 + def _check_call(self, cmd, privileged=False, loop=None):
409 """
410 Run cmd and raise RepoStorageException on failure.
411
412 @@ -118,16 +118,16 @@ class HardlinkRcuRepoStorage(RepoStorageInterface):
413 kwargs = dict(fd_pipes=self._spawn_kwargs.get('fd_pipes'))
414 else:
415 kwargs = self._spawn_kwargs
416 - p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), **kwargs)
417 + p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(loop), **kwargs)
418 p.start()
419 if (yield p.async_wait()) != os.EX_OK:
420 raise RepoStorageException('command exited with status {}: {}'.\
421 format(p.returncode, ' '.join(cmd)))
422
423 @coroutine
424 - def init_update(self):
425 + def init_update(self, loop=None):
426 update_location = os.path.join(self._storage_location, 'update')
427 - yield self._check_call(['rm', '-rf', update_location])
428 + yield self._check_call(['rm', '-rf', update_location], loop=loop)
429
430 # This assumes normal umask permissions if it doesn't exist yet.
431 portage.util.ensure_dirs(self._storage_location)
432 @@ -139,18 +139,18 @@ class HardlinkRcuRepoStorage(RepoStorageInterface):
433 # Use rsync --link-dest to hardlink a files into update_location,
434 # since cp -l is not portable.
435 yield self._check_call(['rsync', '-a', '--link-dest', self._latest_canonical,
436 - self._latest_canonical + '/', update_location + '/'])
437 + self._latest_canonical + '/', update_location + '/'], loop=loop)
438
439 elif not os.path.islink(self._user_location):
440 - yield self._migrate(update_location)
441 - update_location = (yield self.init_update())
442 + yield self._migrate(update_location, loop=loop)
443 + update_location = (yield self.init_update(loop=loop))
444
445 self._update_location = update_location
446
447 coroutine_return(self._update_location)
448
449 @coroutine
450 - def _migrate(self, update_location):
451 + def _migrate(self, update_location, loop=None):
452 """
453 When repo.user_location is a normal directory, migrate it to
454 storage so that it can be replaced with a symlink. After migration,
455 @@ -164,26 +164,26 @@ class HardlinkRcuRepoStorage(RepoStorageInterface):
456 os.stat(self._user_location))
457 # It's probably on a different device, so copy it.
458 yield self._check_call(['rsync', '-a',
459 - self._user_location + '/', update_location + '/'])
460 + self._user_location + '/', update_location + '/'], loop=loop)
461
462 # Remove the old copy so that symlink can be created. Run with
463 # maximum privileges, since removal requires write access to
464 # the parent directory.
465 - yield self._check_call(['rm', '-rf', user_location], privileged=True)
466 + yield self._check_call(['rm', '-rf', user_location], privileged=True, loop=loop)
467
468 self._update_location = update_location
469
470 # Make this copy the latest snapshot
471 - yield self.commit_update()
472 + yield self.commit_update(loop=loop)
473
474 @property
475 - def current_update(self):
476 + def current_update(self, loop=None):
477 if self._update_location is None:
478 raise RepoStorageException('current update does not exist')
479 return self._update_location
480
481 @coroutine
482 - def commit_update(self):
483 + def commit_update(self, loop=None):
484 update_location = self.current_update
485 self._update_location = None
486 try:
487 @@ -235,14 +235,14 @@ class HardlinkRcuRepoStorage(RepoStorageInterface):
488 yield None
489
490 @coroutine
491 - def abort_update(self):
492 + def abort_update(self, loop=None):
493 if self._update_location is not None:
494 update_location = self._update_location
495 self._update_location = None
496 - yield self._check_call(['rm', '-rf', update_location])
497 + yield self._check_call(['rm', '-rf', update_location], loop=loop)
498
499 @coroutine
500 - def garbage_collection(self):
501 + def garbage_collection(self, loop=None):
502 snap_ttl = datetime.timedelta(days=self._ttl_days)
503 snapshots = sorted(int(name) for name in os.listdir(self._snapshots_dir))
504 # always preserve the latest snapshot
505 @@ -259,4 +259,4 @@ class HardlinkRcuRepoStorage(RepoStorageInterface):
506 snap_timestamp = datetime.datetime.utcfromtimestamp(st.st_mtime)
507 if (datetime.datetime.utcnow() - snap_timestamp) < snap_ttl:
508 continue
509 - yield self._check_call(['rm', '-rf', snap_path])
510 + yield self._check_call(['rm', '-rf', snap_path], loop=loop)
511
512 diff --git a/lib/portage/repository/storage/inplace.py b/lib/portage/repository/storage/inplace.py
513 index f1117ad03..3dbcbd7ad 100644
514 --- a/lib/portage/repository/storage/inplace.py
515 +++ b/lib/portage/repository/storage/inplace.py
516 @@ -19,31 +19,31 @@ class InplaceRepoStorage(RepoStorageInterface):
517 self._update_location = None
518
519 @coroutine
520 - def init_update(self):
521 + def init_update(self, loop=None):
522 self._update_location = self._user_location
523 coroutine_return(self._update_location)
524 yield None
525
526 @property
527 - def current_update(self):
528 + def current_update(self, loop=None):
529 if self._update_location is None:
530 raise RepoStorageException('current update does not exist')
531 return self._update_location
532
533 @coroutine
534 - def commit_update(self):
535 + def commit_update(self, loop=None):
536 self.current_update
537 self._update_location = None
538 coroutine_return()
539 yield None
540
541 @coroutine
542 - def abort_update(self):
543 + def abort_update(self, loop=None):
544 self._update_location = None
545 coroutine_return()
546 yield None
547
548 @coroutine
549 - def garbage_collection(self):
550 + def garbage_collection(self, loop=None):
551 coroutine_return()
552 yield None
553
554 diff --git a/lib/portage/repository/storage/interface.py b/lib/portage/repository/storage/interface.py
555 index ce8a2a170..4f5be6dbc 100644
556 --- a/lib/portage/repository/storage/interface.py
557 +++ b/lib/portage/repository/storage/interface.py
558 @@ -33,7 +33,7 @@ class RepoStorageInterface:
559 raise NotImplementedError
560
561 @coroutine
562 - def init_update(self):
563 + def init_update(self, loop=None):
564 """
565 Create an update directory as a destination to sync updates to.
566 The directory will be populated with files from the previous
567 @@ -50,7 +50,7 @@ class RepoStorageInterface:
568 raise NotImplementedError
569
570 @property
571 - def current_update(self):
572 + def current_update(self, loop=None):
573 """
574 Get the current update directory which would have been returned
575 from the most recent call to the init_update method. This raises
576 @@ -63,7 +63,7 @@ class RepoStorageInterface:
577 raise NotImplementedError
578
579 @coroutine
580 - def commit_update(self):
581 + def commit_update(self, loop=None):
582 """
583 Commit the current update directory, so that is becomes the
584 latest immutable snapshot.
585 @@ -71,7 +71,7 @@ class RepoStorageInterface:
586 raise NotImplementedError
587
588 @coroutine
589 - def abort_update(self):
590 + def abort_update(self, loop=None):
591 """
592 Delete the current update directory. If there was not an update
593 in progress, or it has already been committed, then this has
594 @@ -80,7 +80,7 @@ class RepoStorageInterface:
595 raise NotImplementedError
596
597 @coroutine
598 - def garbage_collection(self):
599 + def garbage_collection(self, loop=None):
600 """
601 Remove expired snapshots.
602 """
603
604 diff --git a/lib/portage/sync/syncbase.py b/lib/portage/sync/syncbase.py
605 index 5f18e5ba3..8e83b94fb 100644
606 --- a/lib/portage/sync/syncbase.py
607 +++ b/lib/portage/sync/syncbase.py
608 @@ -108,7 +108,7 @@ class SyncBase:
609 """
610 if self._repo_storage is None:
611 storage_cls = portage.load_mod(self._select_storage_module())
612 - self._repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs))
613 + self._repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs), loop=global_event_loop())
614 return self._repo_storage
615
616 @property
617
618 diff --git a/lib/portage/tests/dbapi/test_auxdb.py b/lib/portage/tests/dbapi/test_auxdb.py
619 index 907c289fb..1029de70d 100644
620 --- a/lib/portage/tests/dbapi/test_auxdb.py
621 +++ b/lib/portage/tests/dbapi/test_auxdb.py
622 @@ -63,8 +63,9 @@ class AuxdbTestCase(TestCase):
623 portdb = playground.trees[playground.eroot]["porttree"].dbapi
624
625 def test_func():
626 - return asyncio._wrap_loop().run_until_complete(self._test_mod_async(
627 - ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb))
628 + loop = asyncio._wrap_loop()
629 + return loop.run_until_complete(self._test_mod_async(
630 + ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb, loop=loop))
631
632 self.assertTrue(test_func())
633
634 @@ -91,10 +92,10 @@ class AuxdbTestCase(TestCase):
635 self.assertEqual(auxdb[cpv]['RESTRICT'], 'test')
636
637 @coroutine
638 - def _test_mod_async(self, ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb):
639 + def _test_mod_async(self, ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb, loop=None):
640
641 for cpv, metadata in ebuilds.items():
642 - defined_phases, depend, eapi, inherited = yield portdb.async_aux_get(cpv, ['DEFINED_PHASES', 'DEPEND', 'EAPI', 'INHERITED'])
643 + defined_phases, depend, eapi, inherited = yield portdb.async_aux_get(cpv, ['DEFINED_PHASES', 'DEPEND', 'EAPI', 'INHERITED'], loop=loop)
644 self.assertEqual(defined_phases, eclass_defined_phases)
645 self.assertEqual(depend, eclass_depend)
646 self.assertEqual(eapi, metadata['EAPI'])
647
648 diff --git a/lib/portage/tests/emerge/test_simple.py b/lib/portage/tests/emerge/test_simple.py
649 index 94b3076c1..c24f5c603 100644
650 --- a/lib/portage/tests/emerge/test_simple.py
651 +++ b/lib/portage/tests/emerge/test_simple.py
652 @@ -225,10 +225,10 @@ call_has_and_best_version() {
653
654 loop = asyncio._wrap_loop()
655 loop.run_until_complete(asyncio.ensure_future(
656 - self._async_test_simple(loop, playground, metadata_xml_files), loop=loop))
657 + self._async_test_simple(playground, metadata_xml_files, loop=loop), loop=loop))
658
659 @coroutine
660 - def _async_test_simple(self, loop, playground, metadata_xml_files):
661 + def _async_test_simple(self, playground, metadata_xml_files, loop=None):
662
663 debug = playground.debug
664 settings = playground.settings
665 @@ -540,7 +540,7 @@ move dev-util/git dev-vcs/git
666 local_env = env
667
668 proc = yield asyncio.create_subprocess_exec(*args,
669 - env=local_env, stderr=None, stdout=stdout)
670 + env=local_env, stderr=None, stdout=stdout, loop=loop)
671
672 if debug:
673 yield proc.wait()
674
675 diff --git a/lib/portage/tests/process/test_AsyncFunction.py b/lib/portage/tests/process/test_AsyncFunction.py
676 index 3b360e02f..b3f80b8ac 100644
677 --- a/lib/portage/tests/process/test_AsyncFunction.py
678 +++ b/lib/portage/tests/process/test_AsyncFunction.py
679 @@ -21,7 +21,7 @@ class AsyncFunctionTestCase(TestCase):
680 return ''.join(sys.stdin)
681
682 @coroutine
683 - def _testAsyncFunctionStdin(self, loop):
684 + def _testAsyncFunctionStdin(self, loop=None):
685 test_string = '1\n2\n3\n'
686 pr, pw = os.pipe()
687 fd_pipes = {0:pr}
688 @@ -36,7 +36,7 @@ class AsyncFunctionTestCase(TestCase):
689
690 def testAsyncFunctionStdin(self):
691 loop = asyncio._wrap_loop()
692 - loop.run_until_complete(self._testAsyncFunctionStdin(loop))
693 + loop.run_until_complete(self._testAsyncFunctionStdin(loop=loop))
694
695 def _test_getpid_fork(self):
696 """
697
698 diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py
699 index 2bd94cf39..acc3b8af9 100644
700 --- a/lib/portage/tests/process/test_PipeLogger.py
701 +++ b/lib/portage/tests/process/test_PipeLogger.py
702 @@ -37,7 +37,7 @@ class PipeLoggerTestCase(TestCase):
703
704 # Before starting the reader, wait here for a moment, in order
705 # to exercise PipeLogger's handling of EAGAIN during write.
706 - yield asyncio.wait([writer], timeout=0.01)
707 + yield asyncio.wait([writer], timeout=0.01, loop=loop)
708
709 reader = _reader(pr, loop=loop)
710 yield writer
711
712 diff --git a/lib/portage/tests/util/futures/asyncio/test_child_watcher.py b/lib/portage/tests/util/futures/asyncio/test_child_watcher.py
713 index 8a8fb3d4f..cd547f008 100644
714 --- a/lib/portage/tests/util/futures/asyncio/test_child_watcher.py
715 +++ b/lib/portage/tests/util/futures/asyncio/test_child_watcher.py
716 @@ -38,7 +38,7 @@ class ChildWatcherTestCase(TestCase):
717 future.set_result((pid, returncode, args))
718
719 @coroutine
720 - def watch_pid():
721 + def watch_pid(loop=None):
722
723 with asyncio.get_child_watcher() as watcher:
724 pids = spawn([true_binary], returnpid=True)
725 @@ -47,7 +47,7 @@ class ChildWatcherTestCase(TestCase):
726 (yield future),
727 (pids[0], os.EX_OK, args_tuple))
728
729 - loop.run_until_complete(watch_pid())
730 + loop.run_until_complete(watch_pid(loop=loop))
731 finally:
732 asyncio.set_event_loop_policy(initial_policy)
733 if loop not in (None, global_event_loop()):
734
735 diff --git a/lib/portage/tests/util/futures/test_compat_coroutine.py b/lib/portage/tests/util/futures/test_compat_coroutine.py
736 index 5a8230432..0fd459cbf 100644
737 --- a/lib/portage/tests/util/futures/test_compat_coroutine.py
738 +++ b/lib/portage/tests/util/futures/test_compat_coroutine.py
739 @@ -14,12 +14,13 @@ class CompatCoroutineTestCase(TestCase):
740
741 def test_returning_coroutine(self):
742 @coroutine
743 - def returning_coroutine():
744 - yield asyncio.sleep(0)
745 + def returning_coroutine(loop=None):
746 + yield asyncio.sleep(0, loop=loop)
747 coroutine_return('success')
748
749 + loop = asyncio.get_event_loop()
750 self.assertEqual('success',
751 - asyncio.get_event_loop().run_until_complete(returning_coroutine()))
752 + asyncio.get_event_loop().run_until_complete(returning_coroutine(loop=loop)))
753
754 def test_raising_coroutine(self):
755
756 @@ -27,12 +28,13 @@ class CompatCoroutineTestCase(TestCase):
757 pass
758
759 @coroutine
760 - def raising_coroutine():
761 - yield asyncio.sleep(0)
762 + def raising_coroutine(loop=None):
763 + yield asyncio.sleep(0, loop=loop)
764 raise TestException('exception')
765
766 + loop = asyncio.get_event_loop()
767 self.assertRaises(TestException,
768 - asyncio.get_event_loop().run_until_complete, raising_coroutine())
769 + loop.run_until_complete, raising_coroutine(loop=loop))
770
771 def test_catching_coroutine(self):
772
773 @@ -109,17 +111,18 @@ class CompatCoroutineTestCase(TestCase):
774 yield future
775
776 loop = asyncio.get_event_loop()
777 - future = loop.run_until_complete(asyncio.wait([cancelled_future_coroutine()]))[0].pop()
778 + future = loop.run_until_complete(asyncio.wait([cancelled_future_coroutine(loop=loop)], loop=loop))[0].pop()
779 self.assertTrue(future.cancelled())
780
781 def test_yield_expression_result(self):
782 @coroutine
783 - def yield_expression_coroutine():
784 + def yield_expression_coroutine(loop=None):
785 for i in range(3):
786 - x = yield asyncio.sleep(0, result=i)
787 + x = yield asyncio.sleep(0, result=i, loop=loop)
788 self.assertEqual(x, i)
789
790 - asyncio.get_event_loop().run_until_complete(yield_expression_coroutine())
791 + loop = asyncio.get_event_loop()
792 + loop.run_until_complete(yield_expression_coroutine(loop=loop))
793
794 def test_method_coroutine(self):
795
796 @@ -144,7 +147,7 @@ class CompatCoroutineTestCase(TestCase):
797 return waiter
798
799 @coroutine
800 - def read(self):
801 + def read(self, loop=None):
802 while self._value is self._empty:
803 yield self._wait()
804
805 @@ -154,7 +157,7 @@ class CompatCoroutineTestCase(TestCase):
806 coroutine_return(value)
807
808 @coroutine
809 - def write(self, value):
810 + def write(self, value, loop=None):
811 while self._value is not self._empty:
812 yield self._wait()
813
814 @@ -162,16 +165,16 @@ class CompatCoroutineTestCase(TestCase):
815 self._notify()
816
817 @coroutine
818 - def writer_coroutine(cubby, values, sentinel):
819 + def writer_coroutine(cubby, values, sentinel, loop=None):
820 for value in values:
821 - yield cubby.write(value)
822 - yield cubby.write(sentinel)
823 + yield cubby.write(value, loop=loop)
824 + yield cubby.write(sentinel, loop=loop)
825
826 @coroutine
827 - def reader_coroutine(cubby, sentinel):
828 + def reader_coroutine(cubby, sentinel, loop=None):
829 results = []
830 while True:
831 - result = yield cubby.read()
832 + result = yield cubby.read(loop=loop)
833 if result == sentinel:
834 break
835 results.append(result)
836 @@ -180,9 +183,9 @@ class CompatCoroutineTestCase(TestCase):
837 loop = asyncio.get_event_loop()
838 cubby = Cubby(loop)
839 values = list(range(3))
840 - writer = asyncio.ensure_future(writer_coroutine(cubby, values, None), loop=loop)
841 - reader = asyncio.ensure_future(reader_coroutine(cubby, None), loop=loop)
842 - loop.run_until_complete(asyncio.wait([writer, reader]))
843 + writer = asyncio.ensure_future(writer_coroutine(cubby, values, None, loop=loop), loop=loop)
844 + reader = asyncio.ensure_future(reader_coroutine(cubby, None, loop=loop), loop=loop)
845 + loop.run_until_complete(asyncio.wait([writer, reader], loop=loop))
846
847 self.assertEqual(reader.result(), values)
848
849 @@ -191,7 +194,7 @@ class CompatCoroutineTestCase(TestCase):
850 # blend with synchronous code.
851 sync_cubby = _sync_methods(cubby, loop=loop)
852 sync_reader = _sync_decorator(reader_coroutine, loop=loop)
853 - writer = asyncio.ensure_future(writer_coroutine(cubby, values, None), loop=loop)
854 + writer = asyncio.ensure_future(writer_coroutine(cubby, values, None, loop=loop), loop=loop)
855 self.assertEqual(sync_reader(cubby, None), values)
856 self.assertTrue(writer.done())
857
858
859 diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py
860 index ca32651a7..44d522013 100644
861 --- a/lib/portage/tests/util/test_socks5.py
862 +++ b/lib/portage/tests/util/test_socks5.py
863 @@ -185,7 +185,7 @@ class Socks5ServerTestCase(TestCase):
864 }
865
866 proxy = socks5.get_socks5_proxy(settings)
867 - loop.run_until_complete(socks5.proxy.ready())
868 + loop.run_until_complete(socks5.proxy.ready(loop=loop))
869
870 result = loop.run_until_complete(loop.run_in_executor(None,
871 self._fetch_via_proxy, proxy, host, server.server_port, path))
872
873 diff --git a/lib/portage/util/_async/BuildLogger.py b/lib/portage/util/_async/BuildLogger.py
874 index f25f70d5b..5a9c076b6 100644
875 --- a/lib/portage/util/_async/BuildLogger.py
876 +++ b/lib/portage/util/_async/BuildLogger.py
877 @@ -78,11 +78,11 @@ class BuildLogger(AsynchronousTask):
878 pipe_logger.start()
879
880 self._main_task_cancel = functools.partial(self._main_cancel, filter_proc, pipe_logger)
881 - self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger), loop=self.scheduler)
882 + self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger, loop=self.scheduler), loop=self.scheduler)
883 self._main_task.add_done_callback(self._main_exit)
884
885 @coroutine
886 - def _main(self, filter_proc, pipe_logger):
887 + def _main(self, filter_proc, pipe_logger, loop=None):
888 try:
889 if pipe_logger.poll() is None:
890 yield pipe_logger.async_wait()
891
892 diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
893 index eb01a6232..3c9c6e22b 100644
894 --- a/lib/portage/util/_async/ForkProcess.py
895 +++ b/lib/portage/util/_async/ForkProcess.py
896 @@ -47,7 +47,7 @@ class ForkProcess(SpawnProcess):
897 os.close(stdin_dup)
898
899 self._proc_join_task = asyncio.ensure_future(
900 - self._proc_join(self._proc))
901 + self._proc_join(self._proc, loop=self.scheduler), loop=self.scheduler)
902 self._proc_join_task.add_done_callback(
903 functools.partial(self._proc_join_done, self._proc))
904
905 @@ -68,7 +68,7 @@ class ForkProcess(SpawnProcess):
906 super(ForkProcess, self)._async_waitpid()
907
908 @coroutine
909 - def _proc_join(self, proc):
910 + def _proc_join(self, proc, loop=None):
911 sentinel_reader = self.scheduler.create_future()
912 self.scheduler.add_reader(proc.sentinel,
913 lambda: sentinel_reader.done() or sentinel_reader.set_result(None))
914 @@ -93,7 +93,7 @@ class ForkProcess(SpawnProcess):
915 proc.join(0)
916 if proc.exitcode is not None:
917 break
918 - yield asyncio.sleep(self._proc_join_interval)
919 + yield asyncio.sleep(self._proc_join_interval, loop=loop)
920
921 def _proc_join_done(self, proc, future):
922 future.cancelled() or future.result()
923
924 diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
925 index 2bbdd3ddb..e8203268c 100644
926 --- a/lib/portage/util/_async/PipeLogger.py
927 +++ b/lib/portage/util/_async/PipeLogger.py
928 @@ -53,7 +53,7 @@ class PipeLogger(AbstractPollTask):
929 fcntl.fcntl(fd, fcntl.F_SETFL,
930 fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
931
932 - self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler)
933 + self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd, loop=self.scheduler), loop=self.scheduler)
934 self._io_loop_task.add_done_callback(self._io_loop_done)
935 self._registered = True
936
937 @@ -63,7 +63,7 @@ class PipeLogger(AbstractPollTask):
938 self.returncode = self._cancelled_returncode
939
940 @coroutine
941 - def _io_loop(self, input_file):
942 + def _io_loop(self, input_file, loop=None):
943 background = self.background
944 stdout_fd = self.stdout_fd
945 log_file = self._log_file
946
947 diff --git a/lib/portage/util/_async/SchedulerInterface.py b/lib/portage/util/_async/SchedulerInterface.py
948 index 3ff250d1d..2865266eb 100644
949 --- a/lib/portage/util/_async/SchedulerInterface.py
950 +++ b/lib/portage/util/_async/SchedulerInterface.py
951 @@ -57,7 +57,7 @@ class SchedulerInterface(SlotObject):
952
953 @coroutine
954 def async_output(self, msg, log_file=None, background=None,
955 - level=0, noiselevel=-1):
956 + level=0, noiselevel=-1, loop=None):
957 """
958 Output a msg to stdio (if not in background) and to a log file
959 if provided.
960 @@ -81,7 +81,7 @@ class SchedulerInterface(SlotObject):
961 writemsg_level(msg, level=level, noiselevel=noiselevel)
962
963 if log_file is not None:
964 - yield _writer(log_file, _unicode_encode(msg))
965 + yield _writer(log_file, _unicode_encode(msg), loop=loop)
966
967 def output(self, msg, log_path=None, background=None,
968 level=0, noiselevel=-1):
969
970 diff --git a/lib/portage/util/futures/_asyncio/process.py b/lib/portage/util/futures/_asyncio/process.py
971 index 6ff156c9d..275c9031a 100644
972 --- a/lib/portage/util/futures/_asyncio/process.py
973 +++ b/lib/portage/util/futures/_asyncio/process.py
974 @@ -39,7 +39,7 @@ class _Process:
975 return self._proc.returncode
976
977 @coroutine
978 - def communicate(self, input=None): # pylint: disable=redefined-builtin
979 + def communicate(self, input=None, loop=None): # pylint: disable=redefined-builtin
980 """
981 Read data from stdout and stderr, until end-of-file is reached.
982 Wait for process to terminate.
983 @@ -49,13 +49,14 @@ class _Process:
984 @return: tuple (stdout_data, stderr_data)
985 @rtype: asyncio.Future (or compatible)
986 """
987 + loop = asyncio._wrap_loop(loop or self._loop)
988 futures = []
989 for input_file in (self._proc.stdout, self._proc.stderr):
990 if input_file is None:
991 - future = self._loop.create_future()
992 + future = loop.create_future()
993 future.set_result(None)
994 else:
995 - future = _reader(input_file, loop=self._loop)
996 + future = _reader(input_file, loop=loop)
997 futures.append(future)
998
999 writer = None
1000 @@ -65,11 +66,11 @@ class _Process:
1001 stdin = self._proc.stdin
1002 stdin = os.fdopen(stdin, 'wb', 0) if isinstance(stdin, int) else stdin
1003 _set_nonblocking(stdin.fileno())
1004 - writer = asyncio.ensure_future(_writer(stdin, input, loop=self._loop), loop=self._loop)
1005 + writer = asyncio.ensure_future(_writer(stdin, input, loop=loop), loop=loop)
1006 writer.add_done_callback(lambda writer: stdin.close())
1007
1008 try:
1009 - yield asyncio.wait(futures + [self.wait()], loop=self._loop)
1010 + yield asyncio.wait(futures + [self.wait(loop=loop)], loop=loop)
1011 finally:
1012 if writer is not None:
1013 if writer.done():
1014 @@ -84,14 +85,15 @@ class _Process:
1015
1016 coroutine_return(tuple(future.result() for future in futures))
1017
1018 - def wait(self):
1019 + def wait(self, loop=None):
1020 """
1021 Wait for child process to terminate. Set and return returncode attribute.
1022
1023 @return: returncode
1024 @rtype: asyncio.Future (or compatible)
1025 """
1026 - waiter = self._loop.create_future()
1027 + loop = asyncio._wrap_loop(loop or self._loop)
1028 + waiter = loop.create_future()
1029 if self.returncode is None:
1030 self._waiters.append(waiter)
1031 waiter.add_done_callback(self._waiter_cancel)
1032
1033 diff --git a/lib/portage/util/futures/_sync_decorator.py b/lib/portage/util/futures/_sync_decorator.py
1034 index 02a0963a7..3da065789 100644
1035 --- a/lib/portage/util/futures/_sync_decorator.py
1036 +++ b/lib/portage/util/futures/_sync_decorator.py
1037 @@ -15,9 +15,10 @@ def _sync_decorator(func, loop=None):
1038 function that returns a Future) with a wrapper that runs the function
1039 synchronously.
1040 """
1041 - loop = asyncio._wrap_loop(loop)
1042 @functools.wraps(func)
1043 def wrapper(*args, **kwargs):
1044 + nonlocal loop
1045 + loop = kwargs['loop'] = asyncio._wrap_loop(kwargs.get('loop') or loop)
1046 return loop.run_until_complete(func(*args, **kwargs))
1047 return wrapper
1048
1049
1050 diff --git a/lib/portage/util/futures/compat_coroutine.py b/lib/portage/util/futures/compat_coroutine.py
1051 index 79bd0da68..9a0c5c1c8 100644
1052 --- a/lib/portage/util/futures/compat_coroutine.py
1053 +++ b/lib/portage/util/futures/compat_coroutine.py
1054 @@ -67,7 +67,12 @@ def _generator_future(generator_func, *args, **kwargs):
1055 keyword argument named 'loop' is given, then it is used instead of
1056 the default event loop.
1057 """
1058 - loop = asyncio._wrap_loop(kwargs.get('loop'))
1059 + loop = kwargs.get('loop')
1060 + if loop is None and portage._internal_caller:
1061 + # Require an explicit loop parameter, in order to support
1062 + # local event loops (bug 737698).
1063 + raise AssertionError("Missing required argument 'loop'")
1064 + loop = asyncio._wrap_loop(loop)
1065 result = loop.create_future()
1066 _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop)
1067 return result
1068
1069 diff --git a/lib/portage/util/socks5.py b/lib/portage/util/socks5.py
1070 index 65d2400e8..9f22c1dbe 100644
1071 --- a/lib/portage/util/socks5.py
1072 +++ b/lib/portage/util/socks5.py
1073 @@ -76,7 +76,7 @@ class ProxyManager:
1074
1075
1076 @coroutine
1077 - def ready(self):
1078 + def ready(self, loop=None):
1079 """
1080 Wait for the proxy socket to become ready. This method is a coroutine.
1081 """
1082 @@ -98,7 +98,7 @@ class ProxyManager:
1083 except EnvironmentError as e:
1084 if e.errno != errno.ENOENT:
1085 raise
1086 - yield asyncio.sleep(0.2)
1087 + yield asyncio.sleep(0.2, loop=loop)
1088 else:
1089 break
1090 finally:
1091
1092 diff --git a/repoman/lib/repoman/modules/scan/depend/profile.py b/repoman/lib/repoman/modules/scan/depend/profile.py
1093 index 1eb69422a..468bc55e2 100644
1094 --- a/repoman/lib/repoman/modules/scan/depend/profile.py
1095 +++ b/repoman/lib/repoman/modules/scan/depend/profile.py
1096 @@ -146,7 +146,7 @@ class ProfileDependsChecks(ScanBase):
1097 % (ebuild.relative_path, mytype, ", ".join(sorted(atoms))))
1098
1099 @coroutine
1100 - def _task(self, task):
1101 + def _task(self, task, loop=None):
1102 yield task.future
1103 coroutine_return((task, task.future.result()))
1104
1105 @@ -222,7 +222,7 @@ class ProfileDependsChecks(ScanBase):
1106 yield (task, target())
1107 else:
1108 task.future = asyncio.ensure_future(loop.run_in_executor(executor, target), loop=loop)
1109 - yield self._task(task)
1110 + yield self._task(task, loop=loop)
1111
1112
1113 def _task_subprocess(self, task, pkg, dep_settings):