Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/_asyncio/, pym/portage/tests/util/futures/asyncio/, ...
Date: Sun, 06 May 2018 11:48:54
Message-Id: 1525606905.85ac23b7c0c58cef72d22281d66d086521c01e3e.zmedico@gentoo
1 commit: 85ac23b7c0c58cef72d22281d66d086521c01e3e
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Sun May 6 11:05:03 2018 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Sun May 6 11:41:45 2018 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=85ac23b7
7
8 asyncio: add _wrap_loop helper (bug 654390)
9
10 In order to deal with asyncio event loop compatibility issues, add
11 a _wrap_loop helper. For example, since python3.4 does not have the
12 AbstractEventLoop.create_future() method, this helper function can
13 be used to add a wrapper that implements the create_future method
14 for python3.4.
15
16 Bug: https://bugs.gentoo.org/654390
17
18 pym/portage/dbapi/porttree.py | 12 ++++++------
19 .../tests/util/futures/asyncio/test_child_watcher.py | 2 +-
20 .../util/futures/asyncio/test_event_loop_in_fork.py | 8 ++++----
21 .../tests/util/futures/asyncio/test_pipe_closed.py | 4 ++--
22 .../util/futures/asyncio/test_run_until_complete.py | 2 +-
23 .../util/futures/asyncio/test_subprocess_exec.py | 4 ++--
24 pym/portage/util/futures/_asyncio/__init__.py | 19 ++++++++++++++++++-
25 pym/portage/util/futures/_asyncio/tasks.py | 7 +++++--
26 pym/portage/util/futures/executor/fork.py | 4 ++--
27 pym/portage/util/futures/iter_completed.py | 7 +++----
28 pym/portage/util/futures/retry.py | 3 +--
29 11 files changed, 45 insertions(+), 27 deletions(-)
30
31 diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py
32 index 801b5658a..3e36024ff 100644
33 --- a/pym/portage/dbapi/porttree.py
34 +++ b/pym/portage/dbapi/porttree.py
35 @@ -36,7 +36,7 @@ from portage import _encodings
36 from portage import _unicode_encode
37 from portage import OrderedDict
38 from portage.util._eventloop.EventLoop import EventLoop
39 -from portage.util._eventloop.global_event_loop import global_event_loop
40 +from portage.util.futures import asyncio
41 from portage.util.futures.iter_completed import iter_gather
42 from _emerge.EbuildMetadataPhase import EbuildMetadataPhase
43
44 @@ -325,8 +325,8 @@ class portdbapi(dbapi):
45 @property
46 def _event_loop(self):
47 if portage._internal_caller:
48 - # For internal portage usage, the global_event_loop is safe.
49 - return global_event_loop()
50 + # For internal portage usage, asyncio._wrap_loop() is safe.
51 + return asyncio._wrap_loop()
52 else:
53 # For external API consumers, use a local EventLoop, since
54 # we don't want to assume that it's safe to override the
55 @@ -611,7 +611,7 @@ class portdbapi(dbapi):
56 # to simultaneous instantiation of multiple event loops here.
57 # Callers of this method certainly want the same event loop to
58 # be used for all calls.
59 - loop = loop or global_event_loop()
60 + loop = asyncio._wrap_loop(loop)
61 future = loop.create_future()
62 cache_me = False
63 if myrepo is not None:
64 @@ -751,7 +751,7 @@ class portdbapi(dbapi):
65 a set of alternative URIs.
66 @rtype: asyncio.Future (or compatible)
67 """
68 - loop = loop or global_event_loop()
69 + loop = asyncio._wrap_loop(loop)
70 result = loop.create_future()
71
72 def aux_get_done(aux_get_future):
73 @@ -1419,7 +1419,7 @@ def _async_manifest_fetchlist(portdb, repo_config, cp, cpv_list=None,
74 @return: a Future resulting in a Mapping compatible with FetchlistDict
75 @rtype: asyncio.Future (or compatible)
76 """
77 - loop = loop or global_event_loop()
78 + loop = asyncio._wrap_loop(loop)
79 result = loop.create_future()
80 cpv_list = (portdb.cp_list(cp, mytree=repo_config.location)
81 if cpv_list is None else cpv_list)
82
83 diff --git a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
84 index dca01be56..8ef497544 100644
85 --- a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
86 +++ b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
87 @@ -28,7 +28,7 @@ class ChildWatcherTestCase(TestCase):
88
89 args_tuple = ('hello', 'world')
90
91 - loop = asyncio.get_event_loop()
92 + loop = asyncio._wrap_loop()
93 future = loop.create_future()
94
95 def callback(pid, returncode, *args):
96
97 diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
98 index 7868d792a..19588bf3a 100644
99 --- a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
100 +++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
101 @@ -11,14 +11,14 @@ from portage.util.futures.unix_events import DefaultEventLoopPolicy
102
103 def fork_main(parent_conn, child_conn):
104 parent_conn.close()
105 - loop = asyncio.get_event_loop()
106 + loop = asyncio._wrap_loop()
107 # This fails with python's default event loop policy,
108 # see https://bugs.python.org/issue22087.
109 - loop.run_until_complete(asyncio.sleep(0.1))
110 + loop.run_until_complete(asyncio.sleep(0.1, loop=loop))
111
112
113 def async_main(fork_exitcode, loop=None):
114 - loop = loop or asyncio.get_event_loop()
115 + loop = asyncio._wrap_loop(loop)
116
117 # Since python2.7 does not support Process.sentinel, use Pipe to
118 # monitor for process exit.
119 @@ -48,7 +48,7 @@ class EventLoopInForkTestCase(TestCase):
120 if not isinstance(initial_policy, DefaultEventLoopPolicy):
121 asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
122 try:
123 - loop = asyncio.get_event_loop()
124 + loop = asyncio._wrap_loop()
125 fork_exitcode = loop.create_future()
126 # Make async_main fork while the loop is running, which would
127 # trigger https://bugs.python.org/issue22087 with asyncio's
128
129 diff --git a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
130 index e63829888..c2b468064 100644
131 --- a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
132 +++ b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
133 @@ -58,7 +58,7 @@ class ReaderPipeClosedTestCase(_PipeClosedTestCase, TestCase):
134 if not isinstance(initial_policy, DefaultEventLoopPolicy):
135 asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
136
137 - loop = asyncio.get_event_loop()
138 + loop = asyncio._wrap_loop()
139 read_end = os.fdopen(read_end, 'rb', 0)
140 write_end = os.fdopen(write_end, 'wb', 0)
141 try:
142 @@ -95,7 +95,7 @@ class WriterPipeClosedTestCase(_PipeClosedTestCase, TestCase):
143 if not isinstance(initial_policy, DefaultEventLoopPolicy):
144 asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
145
146 - loop = asyncio.get_event_loop()
147 + loop = asyncio._wrap_loop()
148 read_end = os.fdopen(read_end, 'rb', 0)
149 write_end = os.fdopen(write_end, 'wb', 0)
150 try:
151
152 diff --git a/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py b/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py
153 index fc8f198ca..1a37e4922 100644
154 --- a/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py
155 +++ b/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py
156 @@ -13,7 +13,7 @@ class RunUntilCompleteTestCase(TestCase):
157 asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
158
159 try:
160 - loop = asyncio.get_event_loop()
161 + loop = asyncio._wrap_loop()
162 f1 = loop.create_future()
163 f2 = loop.create_future()
164 f1.add_done_callback(f2.set_result)
165
166 diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
167 index 98983941d..8dc5fa7b9 100644
168 --- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
169 +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
170 @@ -23,7 +23,7 @@ def reader(input_file, loop=None):
171 @return: bytes
172 @rtype: asyncio.Future (or compatible)
173 """
174 - loop = loop or asyncio.get_event_loop()
175 + loop = asyncio._wrap_loop(loop)
176 future = loop.create_future()
177 _Reader(future, input_file, loop)
178 return future
179 @@ -61,7 +61,7 @@ class SubprocessExecTestCase(TestCase):
180 asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
181
182 try:
183 - test(asyncio.get_event_loop())
184 + test(asyncio._wrap_loop())
185 finally:
186 asyncio.set_event_loop_policy(initial_policy)
187
188
189 diff --git a/pym/portage/util/futures/_asyncio/__init__.py b/pym/portage/util/futures/_asyncio/__init__.py
190 index 9ae050874..e62de7a69 100644
191 --- a/pym/portage/util/futures/_asyncio/__init__.py
192 +++ b/pym/portage/util/futures/_asyncio/__init__.py
193 @@ -137,7 +137,7 @@ def sleep(delay, result=None, loop=None):
194 @rtype: asyncio.Future (or compatible)
195 @return: an instance of Future
196 """
197 - loop = loop or get_event_loop()
198 + loop = _wrap_loop(loop)
199 future = loop.create_future()
200 handle = loop.call_later(delay, future.set_result, result)
201 def cancel_callback(future):
202 @@ -145,3 +145,20 @@ def sleep(delay, result=None, loop=None):
203 handle.cancel()
204 future.add_done_callback(cancel_callback)
205 return future
206 +
207 +
208 +def _wrap_loop(loop=None):
209 + """
210 + In order to deal with asyncio event loop compatibility issues,
211 + use this function to wrap the loop parameter for functions
212 + that support it. For example, since python3.4 does not have the
213 + AbstractEventLoop.create_future() method, this helper function
214 + can be used to add a wrapper that implements the create_future
215 + method for python3.4.
216 +
217 + @type loop: asyncio.AbstractEventLoop (or compatible)
218 + @param loop: event loop
219 + @rtype: asyncio.AbstractEventLoop (or compatible)
220 + @return: event loop
221 + """
222 + return loop or get_event_loop()
223
224 diff --git a/pym/portage/util/futures/_asyncio/tasks.py b/pym/portage/util/futures/_asyncio/tasks.py
225 index 5f10d3c7b..b20765b7a 100644
226 --- a/pym/portage/util/futures/_asyncio/tasks.py
227 +++ b/pym/portage/util/futures/_asyncio/tasks.py
228 @@ -15,7 +15,10 @@ except ImportError:
229 FIRST_COMPLETED ='FIRST_COMPLETED'
230 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
231
232 -
233 +import portage
234 +portage.proxy.lazyimport.lazyimport(globals(),
235 + 'portage.util.futures:asyncio',
236 +)
237 from portage.util._eventloop.global_event_loop import (
238 global_event_loop as _global_event_loop,
239 )
240 @@ -40,7 +43,7 @@ def wait(futures, loop=None, timeout=None, return_when=ALL_COMPLETED):
241 @return: tuple of (done, pending).
242 @rtype: asyncio.Future (or compatible)
243 """
244 - loop = loop or _global_event_loop()
245 + loop = asyncio._wrap_loop(loop)
246 result_future = loop.create_future()
247 _Waiter(futures, timeout, return_when, result_future, loop)
248 return result_future
249
250 diff --git a/pym/portage/util/futures/executor/fork.py b/pym/portage/util/futures/executor/fork.py
251 index 276ed54f1..72844403c 100644
252 --- a/pym/portage/util/futures/executor/fork.py
253 +++ b/pym/portage/util/futures/executor/fork.py
254 @@ -13,7 +13,7 @@ import sys
255 import traceback
256
257 from portage.util._async.AsyncFunction import AsyncFunction
258 -from portage.util._eventloop.global_event_loop import global_event_loop
259 +from portage.util.futures import asyncio
260
261
262 class ForkExecutor(object):
263 @@ -25,7 +25,7 @@ class ForkExecutor(object):
264 """
265 def __init__(self, max_workers=None, loop=None):
266 self._max_workers = max_workers or multiprocessing.cpu_count()
267 - self._loop = loop or global_event_loop()
268 + self._loop = asyncio._wrap_loop(loop)
269 self._submit_queue = collections.deque()
270 self._running_tasks = {}
271 self._shutdown = False
272
273 diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
274 index 231b7e3ab..31b5e0c78 100644
275 --- a/pym/portage/util/futures/iter_completed.py
276 +++ b/pym/portage/util/futures/iter_completed.py
277 @@ -6,7 +6,6 @@ import multiprocessing
278
279 from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
280 from portage.util._async.TaskScheduler import TaskScheduler
281 -from portage.util._eventloop.global_event_loop import global_event_loop
282 from portage.util.futures import asyncio
283
284
285 @@ -30,7 +29,7 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
286 @return: iterator of futures that are done
287 @rtype: iterator
288 """
289 - loop = loop or global_event_loop()
290 + loop = asyncio._wrap_loop(loop)
291
292 for future_done_set in async_iter_completed(futures,
293 max_jobs=max_jobs, max_load=max_load, loop=loop):
294 @@ -60,7 +59,7 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
295 input futures that are done
296 @rtype: iterator
297 """
298 - loop = loop or global_event_loop()
299 + loop = asyncio._wrap_loop(loop)
300
301 max_jobs = max_jobs or multiprocessing.cpu_count()
302 max_load = max_load or multiprocessing.cpu_count()
303 @@ -133,7 +132,7 @@ def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
304 same order that they were yielded from the input iterator
305 @rtype: asyncio.Future (or compatible)
306 """
307 - loop = loop or global_event_loop()
308 + loop = asyncio._wrap_loop(loop)
309 result = loop.create_future()
310 futures_list = []
311
312
313 diff --git a/pym/portage/util/futures/retry.py b/pym/portage/util/futures/retry.py
314 index 82012d2f3..8a51669ff 100644
315 --- a/pym/portage/util/futures/retry.py
316 +++ b/pym/portage/util/futures/retry.py
317 @@ -9,7 +9,6 @@ __all__ = (
318 import functools
319
320 from portage.exception import PortageException
321 -from portage.util._eventloop.global_event_loop import global_event_loop
322 from portage.util.futures import asyncio
323
324
325 @@ -67,7 +66,7 @@ def _retry(loop, try_max, try_timeout, overall_timeout, delay_func,
326 @return: func return value
327 @rtype: asyncio.Future (or compatible)
328 """
329 - loop = loop or global_event_loop()
330 + loop = asyncio._wrap_loop(loop)
331 future = loop.create_future()
332 _Retry(future, loop, try_max, try_timeout, overall_timeout, delay_func,
333 reraise, functools.partial(func, *args, **kwargs))