1 |
commit: 85ac23b7c0c58cef72d22281d66d086521c01e3e |
2 |
Author: Zac Medico <zmedico <AT> gentoo <DOT> org> |
3 |
AuthorDate: Sun May 6 11:05:03 2018 +0000 |
4 |
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> |
5 |
CommitDate: Sun May 6 11:41:45 2018 +0000 |
6 |
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=85ac23b7 |
7 |
|
8 |
asyncio: add _wrap_loop helper (bug 654390) |
9 |
|
10 |
In order to deal with asyncio event loop compatibility issues, add |
11 |
a _wrap_loop helper. For example, since python3.4 does not have the |
12 |
AbstractEventLoop.create_future() method, this helper function can |
13 |
be used to add a wrapper that implements the create_future method |
14 |
for python3.4. |
15 |
|
16 |
Bug: https://bugs.gentoo.org/654390 |
17 |
|
18 |
pym/portage/dbapi/porttree.py | 12 ++++++------ |
19 |
.../tests/util/futures/asyncio/test_child_watcher.py | 2 +- |
20 |
.../util/futures/asyncio/test_event_loop_in_fork.py | 8 ++++---- |
21 |
.../tests/util/futures/asyncio/test_pipe_closed.py | 4 ++-- |
22 |
.../util/futures/asyncio/test_run_until_complete.py | 2 +- |
23 |
.../util/futures/asyncio/test_subprocess_exec.py | 4 ++-- |
24 |
pym/portage/util/futures/_asyncio/__init__.py | 19 ++++++++++++++++++- |
25 |
pym/portage/util/futures/_asyncio/tasks.py | 7 +++++-- |
26 |
pym/portage/util/futures/executor/fork.py | 4 ++-- |
27 |
pym/portage/util/futures/iter_completed.py | 7 +++---- |
28 |
pym/portage/util/futures/retry.py | 3 +-- |
29 |
11 files changed, 45 insertions(+), 27 deletions(-) |
30 |
|
31 |
diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py |
32 |
index 801b5658a..3e36024ff 100644 |
33 |
--- a/pym/portage/dbapi/porttree.py |
34 |
+++ b/pym/portage/dbapi/porttree.py |
35 |
@@ -36,7 +36,7 @@ from portage import _encodings |
36 |
from portage import _unicode_encode |
37 |
from portage import OrderedDict |
38 |
from portage.util._eventloop.EventLoop import EventLoop |
39 |
-from portage.util._eventloop.global_event_loop import global_event_loop |
40 |
+from portage.util.futures import asyncio |
41 |
from portage.util.futures.iter_completed import iter_gather |
42 |
from _emerge.EbuildMetadataPhase import EbuildMetadataPhase |
43 |
|
44 |
@@ -325,8 +325,8 @@ class portdbapi(dbapi): |
45 |
@property |
46 |
def _event_loop(self): |
47 |
if portage._internal_caller: |
48 |
- # For internal portage usage, the global_event_loop is safe. |
49 |
- return global_event_loop() |
50 |
+ # For internal portage usage, asyncio._wrap_loop() is safe. |
51 |
+ return asyncio._wrap_loop() |
52 |
else: |
53 |
# For external API consumers, use a local EventLoop, since |
54 |
# we don't want to assume that it's safe to override the |
55 |
@@ -611,7 +611,7 @@ class portdbapi(dbapi): |
56 |
# to simultaneous instantiation of multiple event loops here. |
57 |
# Callers of this method certainly want the same event loop to |
58 |
# be used for all calls. |
59 |
- loop = loop or global_event_loop() |
60 |
+ loop = asyncio._wrap_loop(loop) |
61 |
future = loop.create_future() |
62 |
cache_me = False |
63 |
if myrepo is not None: |
64 |
@@ -751,7 +751,7 @@ class portdbapi(dbapi): |
65 |
a set of alternative URIs. |
66 |
@rtype: asyncio.Future (or compatible) |
67 |
""" |
68 |
- loop = loop or global_event_loop() |
69 |
+ loop = asyncio._wrap_loop(loop) |
70 |
result = loop.create_future() |
71 |
|
72 |
def aux_get_done(aux_get_future): |
73 |
@@ -1419,7 +1419,7 @@ def _async_manifest_fetchlist(portdb, repo_config, cp, cpv_list=None, |
74 |
@return: a Future resulting in a Mapping compatible with FetchlistDict |
75 |
@rtype: asyncio.Future (or compatible) |
76 |
""" |
77 |
- loop = loop or global_event_loop() |
78 |
+ loop = asyncio._wrap_loop(loop) |
79 |
result = loop.create_future() |
80 |
cpv_list = (portdb.cp_list(cp, mytree=repo_config.location) |
81 |
if cpv_list is None else cpv_list) |
82 |
|
83 |
diff --git a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py |
84 |
index dca01be56..8ef497544 100644 |
85 |
--- a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py |
86 |
+++ b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py |
87 |
@@ -28,7 +28,7 @@ class ChildWatcherTestCase(TestCase): |
88 |
|
89 |
args_tuple = ('hello', 'world') |
90 |
|
91 |
- loop = asyncio.get_event_loop() |
92 |
+ loop = asyncio._wrap_loop() |
93 |
future = loop.create_future() |
94 |
|
95 |
def callback(pid, returncode, *args): |
96 |
|
97 |
diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py |
98 |
index 7868d792a..19588bf3a 100644 |
99 |
--- a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py |
100 |
+++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py |
101 |
@@ -11,14 +11,14 @@ from portage.util.futures.unix_events import DefaultEventLoopPolicy |
102 |
|
103 |
def fork_main(parent_conn, child_conn): |
104 |
parent_conn.close() |
105 |
- loop = asyncio.get_event_loop() |
106 |
+ loop = asyncio._wrap_loop() |
107 |
# This fails with python's default event loop policy, |
108 |
# see https://bugs.python.org/issue22087. |
109 |
- loop.run_until_complete(asyncio.sleep(0.1)) |
110 |
+ loop.run_until_complete(asyncio.sleep(0.1, loop=loop)) |
111 |
|
112 |
|
113 |
def async_main(fork_exitcode, loop=None): |
114 |
- loop = loop or asyncio.get_event_loop() |
115 |
+ loop = asyncio._wrap_loop(loop) |
116 |
|
117 |
# Since python2.7 does not support Process.sentinel, use Pipe to |
118 |
# monitor for process exit. |
119 |
@@ -48,7 +48,7 @@ class EventLoopInForkTestCase(TestCase): |
120 |
if not isinstance(initial_policy, DefaultEventLoopPolicy): |
121 |
asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) |
122 |
try: |
123 |
- loop = asyncio.get_event_loop() |
124 |
+ loop = asyncio._wrap_loop() |
125 |
fork_exitcode = loop.create_future() |
126 |
# Make async_main fork while the loop is running, which would |
127 |
# trigger https://bugs.python.org/issue22087 with asyncio's |
128 |
|
129 |
diff --git a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py |
130 |
index e63829888..c2b468064 100644 |
131 |
--- a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py |
132 |
+++ b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py |
133 |
@@ -58,7 +58,7 @@ class ReaderPipeClosedTestCase(_PipeClosedTestCase, TestCase): |
134 |
if not isinstance(initial_policy, DefaultEventLoopPolicy): |
135 |
asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) |
136 |
|
137 |
- loop = asyncio.get_event_loop() |
138 |
+ loop = asyncio._wrap_loop() |
139 |
read_end = os.fdopen(read_end, 'rb', 0) |
140 |
write_end = os.fdopen(write_end, 'wb', 0) |
141 |
try: |
142 |
@@ -95,7 +95,7 @@ class WriterPipeClosedTestCase(_PipeClosedTestCase, TestCase): |
143 |
if not isinstance(initial_policy, DefaultEventLoopPolicy): |
144 |
asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) |
145 |
|
146 |
- loop = asyncio.get_event_loop() |
147 |
+ loop = asyncio._wrap_loop() |
148 |
read_end = os.fdopen(read_end, 'rb', 0) |
149 |
write_end = os.fdopen(write_end, 'wb', 0) |
150 |
try: |
151 |
|
152 |
diff --git a/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py b/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py |
153 |
index fc8f198ca..1a37e4922 100644 |
154 |
--- a/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py |
155 |
+++ b/pym/portage/tests/util/futures/asyncio/test_run_until_complete.py |
156 |
@@ -13,7 +13,7 @@ class RunUntilCompleteTestCase(TestCase): |
157 |
asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) |
158 |
|
159 |
try: |
160 |
- loop = asyncio.get_event_loop() |
161 |
+ loop = asyncio._wrap_loop() |
162 |
f1 = loop.create_future() |
163 |
f2 = loop.create_future() |
164 |
f1.add_done_callback(f2.set_result) |
165 |
|
166 |
diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
167 |
index 98983941d..8dc5fa7b9 100644 |
168 |
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
169 |
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py |
170 |
@@ -23,7 +23,7 @@ def reader(input_file, loop=None): |
171 |
@return: bytes |
172 |
@rtype: asyncio.Future (or compatible) |
173 |
""" |
174 |
- loop = loop or asyncio.get_event_loop() |
175 |
+ loop = asyncio._wrap_loop(loop) |
176 |
future = loop.create_future() |
177 |
_Reader(future, input_file, loop) |
178 |
return future |
179 |
@@ -61,7 +61,7 @@ class SubprocessExecTestCase(TestCase): |
180 |
asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) |
181 |
|
182 |
try: |
183 |
- test(asyncio.get_event_loop()) |
184 |
+ test(asyncio._wrap_loop()) |
185 |
finally: |
186 |
asyncio.set_event_loop_policy(initial_policy) |
187 |
|
188 |
|
189 |
diff --git a/pym/portage/util/futures/_asyncio/__init__.py b/pym/portage/util/futures/_asyncio/__init__.py |
190 |
index 9ae050874..e62de7a69 100644 |
191 |
--- a/pym/portage/util/futures/_asyncio/__init__.py |
192 |
+++ b/pym/portage/util/futures/_asyncio/__init__.py |
193 |
@@ -137,7 +137,7 @@ def sleep(delay, result=None, loop=None): |
194 |
@rtype: asyncio.Future (or compatible) |
195 |
@return: an instance of Future |
196 |
""" |
197 |
- loop = loop or get_event_loop() |
198 |
+ loop = _wrap_loop(loop) |
199 |
future = loop.create_future() |
200 |
handle = loop.call_later(delay, future.set_result, result) |
201 |
def cancel_callback(future): |
202 |
@@ -145,3 +145,20 @@ def sleep(delay, result=None, loop=None): |
203 |
handle.cancel() |
204 |
future.add_done_callback(cancel_callback) |
205 |
return future |
206 |
+ |
207 |
+ |
208 |
+def _wrap_loop(loop=None): |
209 |
+ """ |
210 |
+ In order to deal with asyncio event loop compatibility issues, |
211 |
+ use this function to wrap the loop parameter for functions |
212 |
+ that support it. For example, since python3.4 does not have the |
213 |
+ AbstractEventLoop.create_future() method, this helper function |
214 |
+ can be used to add a wrapper that implements the create_future |
215 |
+ method for python3.4. |
216 |
+ |
217 |
+ @type loop: asyncio.AbstractEventLoop (or compatible) |
218 |
+ @param loop: event loop |
219 |
+ @rtype: asyncio.AbstractEventLoop (or compatible) |
220 |
+ @return: event loop |
221 |
+ """ |
222 |
+ return loop or get_event_loop() |
223 |
|
224 |
diff --git a/pym/portage/util/futures/_asyncio/tasks.py b/pym/portage/util/futures/_asyncio/tasks.py |
225 |
index 5f10d3c7b..b20765b7a 100644 |
226 |
--- a/pym/portage/util/futures/_asyncio/tasks.py |
227 |
+++ b/pym/portage/util/futures/_asyncio/tasks.py |
228 |
@@ -15,7 +15,10 @@ except ImportError: |
229 |
FIRST_COMPLETED ='FIRST_COMPLETED' |
230 |
FIRST_EXCEPTION = 'FIRST_EXCEPTION' |
231 |
|
232 |
- |
233 |
+import portage |
234 |
+portage.proxy.lazyimport.lazyimport(globals(), |
235 |
+ 'portage.util.futures:asyncio', |
236 |
+) |
237 |
from portage.util._eventloop.global_event_loop import ( |
238 |
global_event_loop as _global_event_loop, |
239 |
) |
240 |
@@ -40,7 +43,7 @@ def wait(futures, loop=None, timeout=None, return_when=ALL_COMPLETED): |
241 |
@return: tuple of (done, pending). |
242 |
@rtype: asyncio.Future (or compatible) |
243 |
""" |
244 |
- loop = loop or _global_event_loop() |
245 |
+ loop = asyncio._wrap_loop(loop) |
246 |
result_future = loop.create_future() |
247 |
_Waiter(futures, timeout, return_when, result_future, loop) |
248 |
return result_future |
249 |
|
250 |
diff --git a/pym/portage/util/futures/executor/fork.py b/pym/portage/util/futures/executor/fork.py |
251 |
index 276ed54f1..72844403c 100644 |
252 |
--- a/pym/portage/util/futures/executor/fork.py |
253 |
+++ b/pym/portage/util/futures/executor/fork.py |
254 |
@@ -13,7 +13,7 @@ import sys |
255 |
import traceback |
256 |
|
257 |
from portage.util._async.AsyncFunction import AsyncFunction |
258 |
-from portage.util._eventloop.global_event_loop import global_event_loop |
259 |
+from portage.util.futures import asyncio |
260 |
|
261 |
|
262 |
class ForkExecutor(object): |
263 |
@@ -25,7 +25,7 @@ class ForkExecutor(object): |
264 |
""" |
265 |
def __init__(self, max_workers=None, loop=None): |
266 |
self._max_workers = max_workers or multiprocessing.cpu_count() |
267 |
- self._loop = loop or global_event_loop() |
268 |
+ self._loop = asyncio._wrap_loop(loop) |
269 |
self._submit_queue = collections.deque() |
270 |
self._running_tasks = {} |
271 |
self._shutdown = False |
272 |
|
273 |
diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py |
274 |
index 231b7e3ab..31b5e0c78 100644 |
275 |
--- a/pym/portage/util/futures/iter_completed.py |
276 |
+++ b/pym/portage/util/futures/iter_completed.py |
277 |
@@ -6,7 +6,6 @@ import multiprocessing |
278 |
|
279 |
from portage.util._async.AsyncTaskFuture import AsyncTaskFuture |
280 |
from portage.util._async.TaskScheduler import TaskScheduler |
281 |
-from portage.util._eventloop.global_event_loop import global_event_loop |
282 |
from portage.util.futures import asyncio |
283 |
|
284 |
|
285 |
@@ -30,7 +29,7 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None): |
286 |
@return: iterator of futures that are done |
287 |
@rtype: iterator |
288 |
""" |
289 |
- loop = loop or global_event_loop() |
290 |
+ loop = asyncio._wrap_loop(loop) |
291 |
|
292 |
for future_done_set in async_iter_completed(futures, |
293 |
max_jobs=max_jobs, max_load=max_load, loop=loop): |
294 |
@@ -60,7 +59,7 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None): |
295 |
input futures that are done |
296 |
@rtype: iterator |
297 |
""" |
298 |
- loop = loop or global_event_loop() |
299 |
+ loop = asyncio._wrap_loop(loop) |
300 |
|
301 |
max_jobs = max_jobs or multiprocessing.cpu_count() |
302 |
max_load = max_load or multiprocessing.cpu_count() |
303 |
@@ -133,7 +132,7 @@ def iter_gather(futures, max_jobs=None, max_load=None, loop=None): |
304 |
same order that they were yielded from the input iterator |
305 |
@rtype: asyncio.Future (or compatible) |
306 |
""" |
307 |
- loop = loop or global_event_loop() |
308 |
+ loop = asyncio._wrap_loop(loop) |
309 |
result = loop.create_future() |
310 |
futures_list = [] |
311 |
|
312 |
|
313 |
diff --git a/pym/portage/util/futures/retry.py b/pym/portage/util/futures/retry.py |
314 |
index 82012d2f3..8a51669ff 100644 |
315 |
--- a/pym/portage/util/futures/retry.py |
316 |
+++ b/pym/portage/util/futures/retry.py |
317 |
@@ -9,7 +9,6 @@ __all__ = ( |
318 |
import functools |
319 |
|
320 |
from portage.exception import PortageException |
321 |
-from portage.util._eventloop.global_event_loop import global_event_loop |
322 |
from portage.util.futures import asyncio |
323 |
|
324 |
|
325 |
@@ -67,7 +66,7 @@ def _retry(loop, try_max, try_timeout, overall_timeout, delay_func, |
326 |
@return: func return value |
327 |
@rtype: asyncio.Future (or compatible) |
328 |
""" |
329 |
- loop = loop or global_event_loop() |
330 |
+ loop = asyncio._wrap_loop(loop) |
331 |
future = loop.create_future() |
332 |
_Retry(future, loop, try_max, try_timeout, overall_timeout, delay_func, |
333 |
reraise, functools.partial(func, *args, **kwargs)) |