Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/executor/, pym/portage/util/_eventloop/
Date: Mon, 02 Apr 2018 17:11:54
Message-Id: 1522688003.4095be74985c5c2eead5fb480cf37baa11308d62.zmedico@gentoo
1 commit: 4095be74985c5c2eead5fb480cf37baa11308d62
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Wed Mar 14 08:01:26 2018 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Mon Apr 2 16:53:23 2018 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=4095be74
7
8 Add ForkExecutor (bug 649588)
9
10 This is useful for asynchronous operations that we might
11 need to cancel if they take too long (since
12 concurrent.futures.ProcessPoolExecutor tasks are not cancellable).
13 The ability to cancel tasks makes this executor useful
14 as an alternative to portage.exception.AlarmSignal.
15
16 Also add an asyncio-compatible EventLoop.run_in_executor
17 method that uses ForkExecutor as the default executor,
18 which will later be used to implement the corresponding
19 asyncio.AbstractEventLoop run_in_executor method.
20
21 Bug: https://bugs.gentoo.org/649588
22 Reviewed-by: Alec Warner <antarus <AT> gentoo.org>
23
24 pym/portage/util/_eventloop/EventLoop.py | 45 ++++++++-
25 pym/portage/util/futures/executor/__init__.py | 0
26 pym/portage/util/futures/executor/fork.py | 134 ++++++++++++++++++++++++++
27 3 files changed, 178 insertions(+), 1 deletion(-)
28
29 diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
30 index f472a3dae..1574a6837 100644
31 --- a/pym/portage/util/_eventloop/EventLoop.py
32 +++ b/pym/portage/util/_eventloop/EventLoop.py
33 @@ -24,6 +24,7 @@ except ImportError:
34 import portage
35 portage.proxy.lazyimport.lazyimport(globals(),
36 'portage.util.futures.futures:_EventLoopFuture',
37 + 'portage.util.futures.executor.fork:ForkExecutor',
38 )
39
40 from portage import OrderedDict
41 @@ -122,6 +123,7 @@ class EventLoop(object):
42 self._idle_callbacks = OrderedDict()
43 self._timeout_handlers = {}
44 self._timeout_interval = None
45 + self._default_executor = None
46
47 self._poll_obj = None
48 try:
49 @@ -721,6 +723,46 @@ class EventLoop(object):
50 return self._handle(self.timeout_add(
51 delay * 1000, self._call_soon_callback(callback, args)), self)
52
def run_in_executor(self, executor, func, *args):
    """
    Schedule *func*, called with the positional *args*, on an executor.

    When *executor* is None the loop's default executor is used. The
    default is a ForkExecutor bound to this loop; it is created on first
    use and cached in self._default_executor for later calls.

    Keyword arguments are not forwarded; wrap *func* with
    functools.partial to supply them.

    @param executor: executor to submit to, or None for the default
    @type executor: concurrent.futures.Executor or None
    @param func: a function to call
    @type func: callable
    @return: the future returned by the executor's submit()
    @rtype: asyncio.Future (or compatible)
    """
    target = executor
    if target is None:
        target = self._default_executor
        if target is None:
            # Lazily create the default executor and remember it.
            target = self._default_executor = ForkExecutor(loop=self)
    return target.submit(func, *args)
75 +
def close(self):
    """Close the event loop.

    Shuts down the default executor, if run_in_executor created one, and
    waits for its running tasks to finish. Then closes the underlying
    poll object if it provides a close() method, and drops the reference
    to it.
    """
    executor = self._default_executor
    if executor is not None:
        # Clear the reference first so the executor being shut down is
        # not handed out again by run_in_executor.
        self._default_executor = None
        executor.shutdown(wait=True)

    if self._poll_obj is not None:
        # Not every poll object has close() (select.poll objects lack
        # one), so the lookup needs a default; without it getattr raises
        # AttributeError and the None check below is unreachable.
        close = getattr(self._poll_obj, 'close', None)
        if close is not None:
            close()
        self._poll_obj = None
92 +
93
94 _can_poll_device = None
95
96 @@ -782,10 +824,11 @@ class _epoll_adapter(object):
97 that is associated with an epoll instance will close automatically when
98 it is garbage collected, so it's not necessary to close it explicitly.
99 """
100 - __slots__ = ('_epoll_obj',)
101 + __slots__ = ('_epoll_obj', 'close')
102
def __init__(self, epoll_obj):
    wrapped = epoll_obj
    self._epoll_obj = wrapped
    # Re-export the wrapped object's close() so the adapter can be closed
    # the same way as the epoll object itself.
    self.close = wrapped.close
106
107 def register(self, fd, *args):
108 self._epoll_obj.register(fd, *args)
109
110 diff --git a/pym/portage/util/futures/executor/__init__.py b/pym/portage/util/futures/executor/__init__.py
111 new file mode 100644
112 index 000000000..e69de29bb
113
114 diff --git a/pym/portage/util/futures/executor/fork.py b/pym/portage/util/futures/executor/fork.py
115 new file mode 100644
116 index 000000000..496b4e892
117 --- /dev/null
118 +++ b/pym/portage/util/futures/executor/fork.py
119 @@ -0,0 +1,134 @@
120 +# Copyright 2018 Gentoo Foundation
121 +# Distributed under the terms of the GNU General Public License v2
122 +
# Public API of this module.
__all__ = ('ForkExecutor',)
126 +
127 +import collections
128 +import functools
129 +import multiprocessing
130 +import os
131 +import sys
132 +import traceback
133 +
134 +from portage.util._async.AsyncFunction import AsyncFunction
135 +from portage.util._eventloop.global_event_loop import global_event_loop
136 +
137 +
class ForkExecutor(object):
    """
    An implementation of concurrent.futures.Executor that forks a
    new process for each task, with support for cancellation of tasks.

    This is entirely driven by an event loop. Tasks are started from
    submit() and from the exit listener of previously started tasks, and
    at most max_workers tasks run at any time.
    """
    def __init__(self, max_workers=None, loop=None):
        """
        @param max_workers: maximum number of concurrently running tasks
            (falls back to multiprocessing.cpu_count() when falsy)
        @type max_workers: int or None
        @param loop: event loop used to create futures and drive tasks
            (falls back to global_event_loop() when falsy)
        """
        self._max_workers = max_workers or multiprocessing.cpu_count()
        self._loop = loop or global_event_loop()
        # (future, proc) pairs waiting for a free worker slot.
        self._submit_queue = collections.deque()
        # id(proc) -> proc for every task that has been started.
        self._running_tasks = {}
        self._shutdown = False
        # Resolved once shutdown() was called and no task is running.
        self._shutdown_future = self._loop.create_future()

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.

        Raises:
            RuntimeError: if shutdown() has already been called. This matches
                concurrent.futures.Executor; previously such a task was queued
                but never started, so its future never resolved.
        """
        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        future = self._loop.create_future()
        proc = AsyncFunction(target=functools.partial(
            self._guarded_fn_call, fn, args, kwargs))
        self._submit_queue.append((future, proc))
        self._schedule()
        return future

    def _schedule(self):
        """Start queued tasks while worker slots are free (none after shutdown)."""
        while (not self._shutdown and self._submit_queue and
                len(self._running_tasks) < self._max_workers):
            future, proc = self._submit_queue.popleft()
            future.add_done_callback(functools.partial(self._cancel_cb, proc))
            proc.addExitListener(functools.partial(self._proc_exit, future))
            proc.scheduler = self._loop
            proc.start()
            self._running_tasks[id(proc)] = proc

    def _cancel_cb(self, proc, future):
        """Done-callback: cancel the task's process if its future was cancelled."""
        if future.cancelled():
            # async, handle the rest in _proc_exit
            proc.cancel()

    @staticmethod
    def _guarded_fn_call(fn, args, kwargs):
        """
        Run fn(*args, **kwargs) and return a (result, exception) pair.

        Any Exception is captured (wrapped so its traceback survives
        pickling) instead of propagating, so the parent can re-raise it
        through the future.
        """
        try:
            result = fn(*args, **kwargs)
            exception = None
        except Exception as e:
            result = None
            exception = _ExceptionWithTraceback(e)

        return result, exception

    def _proc_exit(self, future, proc):
        """
        Exit listener for a started task.

        Transfers the task's outcome to *future* unless the future was
        cancelled. Then frees the worker slot, starts more queued work and,
        after shutdown(), resolves the shutdown future once nothing runs.
        """
        if not future.cancelled():
            if proc.returncode == os.EX_OK:
                result, exception = proc.result
                if exception is not None:
                    future.set_exception(exception)
                else:
                    future.set_result(result)
            else:
                # TODO: add special exception class for this, maybe
                # distinguish between kill and crash
                future.set_exception(
                    Exception('pid {} crashed or killed, exitcode {}'.format(
                        proc.pid, proc.returncode)))

        del self._running_tasks[id(proc)]
        self._schedule()
        self._maybe_finish_shutdown()

    def _maybe_finish_shutdown(self):
        """Resolve the shutdown future (once) when shut down and idle."""
        if (self._shutdown and not self._running_tasks and
                not self._shutdown_future.done()):
            self._shutdown_future.set_result(None)

    def shutdown(self, wait=True):
        """
        Stop accepting tasks and let running tasks finish.

        Tasks still waiting in the queue are never started after shutdown.
        Their futures are therefore cancelled instead of being left pending
        forever.

        @param wait: if True, run the event loop until every running task
            has exited
        @type wait: bool
        """
        self._shutdown = True
        while self._submit_queue:
            future = self._submit_queue.popleft()[0]
            future.cancel()
        # With nothing running, no _proc_exit call will ever resolve the
        # shutdown future, so do it here; otherwise wait=True would hang.
        self._maybe_finish_shutdown()
        if wait:
            self._loop.run_until_complete(self._shutdown_future)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
226 +
227 +
class _ExceptionWithTraceback:
    """
    Picklable wrapper pairing an exception with its formatted traceback.

    Pickling goes through __reduce__, so unpickling calls _rebuild_exc.
    That yields the original exception with the traceback text chained on
    as its __cause__.
    """
    def __init__(self, exc):
        lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % ''.join(lines)

    def __reduce__(self):
        rebuild_args = (self.exc, self.tb)
        return _rebuild_exc, rebuild_args
236 +
237 +
class _RemoteTraceback(Exception):
    """Exception whose string form is a traceback text captured elsewhere."""

    def __init__(self, tb):
        # Exception.__init__ is not called; str() reads .tb rather than
        # the exception's args.
        self.tb = tb

    def __str__(self):
        text = self.tb
        return text
243 +
244 +
def _rebuild_exc(exc, tb):
    """
    Unpickling hook for _ExceptionWithTraceback.

    Returns *exc* itself, with a _RemoteTraceback holding the traceback
    text *tb* chained on as its __cause__.
    """
    cause = _RemoteTraceback(tb)
    exc.__cause__ = cause
    return exc
248 +
249 +
if sys.version_info[0] < 3:
    # Python 2 has no exception chaining (__cause__ / __traceback__), so
    # exceptions are passed through unwrapped and the traceback is not
    # preserved.
    def _ExceptionWithTraceback(exc):
        return exc