1 |
lgtm. |
2 |
|
3 |
On Sat, Mar 31, 2018 at 10:46 PM, Zac Medico <zmedico@g.o> wrote: |
4 |
|
5 |
> This is useful for asynchronous operations that we might |
6 |
> need to cancel if they take too long, since |
7 |
> concurrent.futures.ProcessPoolExecutor tasks are not cancellable. |
8 |
> This ability to cancel tasks makes this executor useful |
9 |
> as an alternative to portage.exception.AlarmSignal. |
10 |
> |
11 |
> Also add an asyncio-compatible EventLoop.run_in_executor |
12 |
> method that uses ForkExecutor as the default executor, |
13 |
> which will later be used to implement the corresponding |
14 |
> asyncio.AbstractEventLoop run_in_executor method. |
15 |
> |
16 |
> Bug: https://bugs.gentoo.org/649588 |
17 |
> --- |
18 |
> pym/portage/util/_eventloop/EventLoop.py | 45 ++++++++- |
19 |
> pym/portage/util/futures/executor/__init__.py | 0 |
20 |
> pym/portage/util/futures/executor/fork.py | 130 |
21 |
> ++++++++++++++++++++++++++ |
22 |
> 3 files changed, 174 insertions(+), 1 deletion(-) |
23 |
> create mode 100644 pym/portage/util/futures/executor/__init__.py |
24 |
> create mode 100644 pym/portage/util/futures/executor/fork.py |
25 |
> |
26 |
> diff --git a/pym/portage/util/_eventloop/EventLoop.py |
27 |
> b/pym/portage/util/_eventloop/EventLoop.py |
28 |
> index f472a3dae..1574a6837 100644 |
29 |
> --- a/pym/portage/util/_eventloop/EventLoop.py |
30 |
> +++ b/pym/portage/util/_eventloop/EventLoop.py |
31 |
> @@ -24,6 +24,7 @@ except ImportError: |
32 |
> import portage |
33 |
> portage.proxy.lazyimport.lazyimport(globals(), |
34 |
> 'portage.util.futures.futures:_EventLoopFuture', |
35 |
> + 'portage.util.futures.executor.fork:ForkExecutor', |
36 |
> ) |
37 |
> |
38 |
> from portage import OrderedDict |
39 |
> @@ -122,6 +123,7 @@ class EventLoop(object): |
40 |
> self._idle_callbacks = OrderedDict() |
41 |
> self._timeout_handlers = {} |
42 |
> self._timeout_interval = None |
43 |
> + self._default_executor = None |
44 |
> |
45 |
> self._poll_obj = None |
46 |
> try: |
47 |
> @@ -721,6 +723,46 @@ class EventLoop(object): |
48 |
> return self._handle(self.timeout_add( |
49 |
> delay * 1000, self._call_soon_callback(callback, |
50 |
> args)), self) |
51 |
> |
52 |
> + def run_in_executor(self, executor, func, *args): |
53 |
> + """ |
54 |
> + Arrange for a func to be called in the specified executor. |
55 |
> + |
56 |
> + The executor argument should be an Executor instance. The |
57 |
> default |
58 |
> + executor is used if executor is None. |
59 |
> + |
60 |
> + Use functools.partial to pass keywords to the *func*. |
61 |
> + |
62 |
> + @param executor: executor |
63 |
> + @type executor: concurrent.futures.Executor or None |
64 |
> + @param func: a function to call |
65 |
> + @type func: callable |
66 |
> + @return: a Future |
67 |
> + @rtype: asyncio.Future (or compatible) |
68 |
> + """ |
69 |
> + if executor is None: |
70 |
> + executor = self._default_executor |
71 |
> + if executor is None: |
72 |
> + executor = ForkExecutor(loop=self) |
73 |
> + self._default_executor = executor |
74 |
> + return executor.submit(func, *args) |
75 |
> + |
76 |
> + def close(self): |
77 |
> + """Close the event loop. |
78 |
> + |
79 |
> + This clears the queues and shuts down the executor, |
80 |
> + and waits for it to finish. |
81 |
> + """ |
82 |
> + executor = self._default_executor |
83 |
> + if executor is not None: |
84 |
> + self._default_executor = None |
85 |
> + executor.shutdown(wait=True) |
86 |
> + |
87 |
> + if self._poll_obj is not None: |
88 |
> + close = getattr(self._poll_obj, 'close') |
89 |
> + if close is not None: |
90 |
> + close() |
91 |
> + self._poll_obj = None |
92 |
> + |
93 |
> |
94 |
> _can_poll_device = None |
95 |
> |
96 |
> @@ -782,10 +824,11 @@ class _epoll_adapter(object): |
97 |
> that is associated with an epoll instance will close automatically |
98 |
> when |
99 |
> it is garbage collected, so it's not necessary to close it |
100 |
> explicitly. |
101 |
> """ |
102 |
> - __slots__ = ('_epoll_obj',) |
103 |
> + __slots__ = ('_epoll_obj', 'close') |
104 |
> |
105 |
> def __init__(self, epoll_obj): |
106 |
> self._epoll_obj = epoll_obj |
107 |
> + self.close = epoll_obj.close |
108 |
> |
109 |
> def register(self, fd, *args): |
110 |
> self._epoll_obj.register(fd, *args) |
111 |
> diff --git a/pym/portage/util/futures/executor/__init__.py |
112 |
> b/pym/portage/util/futures/executor/__init__.py |
113 |
> new file mode 100644 |
114 |
> index 000000000..e69de29bb |
115 |
> diff --git a/pym/portage/util/futures/executor/fork.py |
116 |
> b/pym/portage/util/futures/executor/fork.py |
117 |
> new file mode 100644 |
118 |
> index 000000000..9cd1db2ca |
119 |
> --- /dev/null |
120 |
> +++ b/pym/portage/util/futures/executor/fork.py |
121 |
> @@ -0,0 +1,130 @@ |
122 |
> +# Copyright 2018 Gentoo Foundation |
123 |
> +# Distributed under the terms of the GNU General Public License v2 |
124 |
> + |
125 |
> +import collections |
126 |
> +import functools |
127 |
> +import multiprocessing |
128 |
> +import os |
129 |
> +import sys |
130 |
> +import traceback |
131 |
> + |
132 |
> +from portage.util._async.AsyncFunction import AsyncFunction |
133 |
> +from portage.util._eventloop.global_event_loop import global_event_loop |
134 |
> + |
135 |
> + |
136 |
> +class ForkExecutor(object): |
137 |
> + """ |
138 |
> + An implementation of concurrent.futures.Executor that forks a |
139 |
> + new process for each task, with support for cancellation of tasks. |
140 |
> + |
141 |
> + This is entirely driven by an event loop. |
142 |
> + """ |
143 |
> + def __init__(self, max_workers=None, loop=None): |
144 |
> + self._max_workers = max_workers or |
145 |
> multiprocessing.cpu_count() |
146 |
> + self._loop = loop or global_event_loop() |
147 |
> + self._submit_queue = collections.deque() |
148 |
> + self._running_tasks = {} |
149 |
> + self._shutdown = False |
150 |
> + self._shutdown_future = self._loop.create_future() |
151 |
> + |
152 |
> + def submit(self, fn, *args, **kwargs): |
153 |
> + """Submits a callable to be executed with the given |
154 |
> arguments. |
155 |
> + |
156 |
> + Schedules the callable to be executed as fn(*args, |
157 |
> **kwargs) and returns |
158 |
> + a Future instance representing the execution of the |
159 |
> callable. |
160 |
> + |
161 |
> + Returns: |
162 |
> + A Future representing the given call. |
163 |
> + """ |
164 |
> + future = self._loop.create_future() |
165 |
> + proc = AsyncFunction(target=functools.partial( |
166 |
> + self._guarded_fn_call, fn, args, kwargs)) |
167 |
> + self._submit_queue.append((future, proc)) |
168 |
> + self._schedule() |
169 |
> + return future |
170 |
> + |
171 |
> + def _schedule(self): |
172 |
> + while (not self._shutdown and self._submit_queue and |
173 |
> + len(self._running_tasks) < self._max_workers): |
174 |
> + future, proc = self._submit_queue.popleft() |
175 |
> + future.add_done_callback(functools.partial(self._cancel_cb, |
176 |
> proc)) |
177 |
> + proc.addExitListener(functools.partial(self._proc_exit, |
178 |
> future)) |
179 |
> + proc.scheduler = self._loop |
180 |
> + proc.start() |
181 |
> + self._running_tasks[id(proc)] = proc |
182 |
> + |
183 |
> + def _cancel_cb(self, proc, future): |
184 |
> + if future.cancelled(): |
185 |
> + # async, handle the rest in _proc_exit |
186 |
> + proc.cancel() |
187 |
> + |
188 |
> + @staticmethod |
189 |
> + def _guarded_fn_call(fn, args, kwargs): |
190 |
> + try: |
191 |
> + result = fn(*args, **kwargs) |
192 |
> + exception = None |
193 |
> + except Exception as e: |
194 |
> + result = None |
195 |
> + exception = _ExceptionWithTraceback(e) |
196 |
> + |
197 |
> + return result, exception |
198 |
> + |
199 |
> + def _proc_exit(self, future, proc): |
200 |
> + if not future.cancelled(): |
201 |
> + if proc.returncode == os.EX_OK: |
202 |
> + result, exception = proc.result |
203 |
> + if exception is not None: |
204 |
> + future.set_exception(exception) |
205 |
> + else: |
206 |
> + future.set_result(result) |
207 |
> + else: |
208 |
> + # TODO: add special exception class for |
209 |
> this, maybe |
210 |
> + # distinguish between kill and crash |
211 |
> + future.set_exception( |
212 |
> + Exception('pid {} crashed or |
213 |
> killed, exitcode {}'.\ |
214 |
> + format(proc.pid, |
215 |
> proc.returncode))) |
216 |
> + |
217 |
> + del self._running_tasks[id(proc)] |
218 |
> + self._schedule() |
219 |
> + if self._shutdown and not self._running_tasks: |
220 |
> + self._shutdown_future.set_result(None) |
221 |
> + |
222 |
> + def shutdown(self, wait=True): |
223 |
> + self._shutdown = True |
224 |
> + if wait: |
225 |
> + self._loop.run_until_complete( |
226 |
> self._shutdown_future) |
227 |
> + |
228 |
> + def __enter__(self): |
229 |
> + return self |
230 |
> + |
231 |
> + def __exit__(self, exc_type, exc_val, exc_tb): |
232 |
> + self.shutdown(wait=True) |
233 |
> + return False |
234 |
> + |
235 |
> + |
236 |
> +class _ExceptionWithTraceback: |
237 |
> + def __init__(self, exc): |
238 |
> + tb = traceback.format_exception(type(exc), exc, |
239 |
> exc.__traceback__) |
240 |
> + tb = ''.join(tb) |
241 |
> + self.exc = exc |
242 |
> + self.tb = '\n"""\n%s"""' % tb |
243 |
> + def __reduce__(self): |
244 |
> + return _rebuild_exc, (self.exc, self.tb) |
245 |
> + |
246 |
> + |
247 |
> +class _RemoteTraceback(Exception): |
248 |
> + def __init__(self, tb): |
249 |
> + self.tb = tb |
250 |
> + def __str__(self): |
251 |
> + return self.tb |
252 |
> + |
253 |
> + |
254 |
> +def _rebuild_exc(exc, tb): |
255 |
> + exc.__cause__ = _RemoteTraceback(tb) |
256 |
> + return exc |
257 |
> + |
258 |
> + |
259 |
> +if sys.version_info < (3,): |
260 |
> + # Python 2 does not support exception chaining, so |
261 |
> + # don't bother to preserve the traceback. |
262 |
> + _ExceptionWithTraceback = lambda exc: exc |
263 |
> -- |
264 |
> 2.13.6 |
265 |
> |
266 |
> |
267 |
> |