Gentoo Archives: gentoo-portage-dev

From: Alec Warner <antarus@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: Re: [gentoo-portage-dev] [PATCH 1/4] Add ForkExecutor (bug 649588)
Date: Sun, 01 Apr 2018 14:27:06
Message-Id: CAAr7Pr_L0o4_Ynwc+g2hbr+ds_7NMrOKdxEWrZuayaT2Eomg5A@mail.gmail.com
In Reply to: [gentoo-portage-dev] [PATCH 1/4] Add ForkExecutor (bug 649588) by Zac Medico
1 lgtm.
2
3 On Sat, Mar 31, 2018 at 10:46 PM, Zac Medico <zmedico@g.o> wrote:
4
5 > This is useful for asynchronous operations that we might
6 > need to cancel if they take too long, since
7 > concurrent.futures.ProcessPoolExecutor tasks are not cancellable.
8 > This ability to cancel tasks makes this executor useful
9 > as an alternative to portage.exception.AlarmSignal.
10 >
11 > Also add an asyncio-compatible EventLoop.run_in_executor
12 > method that uses ForkExecutor as the default executor,
13 > which will later be used to implement the corresponding
14 > asyncio.AbstractEventLoop run_in_executor method.
15 >
16 > Bug: https://bugs.gentoo.org/649588
17 > ---
18 > pym/portage/util/_eventloop/EventLoop.py | 45 ++++++++-
19 > pym/portage/util/futures/executor/__init__.py | 0
20 > pym/portage/util/futures/executor/fork.py | 130
21 > ++++++++++++++++++++++++++
22 > 3 files changed, 174 insertions(+), 1 deletion(-)
23 > create mode 100644 pym/portage/util/futures/executor/__init__.py
24 > create mode 100644 pym/portage/util/futures/executor/fork.py
25 >
26 > diff --git a/pym/portage/util/_eventloop/EventLoop.py
27 > b/pym/portage/util/_eventloop/EventLoop.py
28 > index f472a3dae..1574a6837 100644
29 > --- a/pym/portage/util/_eventloop/EventLoop.py
30 > +++ b/pym/portage/util/_eventloop/EventLoop.py
31 > @@ -24,6 +24,7 @@ except ImportError:
32 > import portage
33 > portage.proxy.lazyimport.lazyimport(globals(),
34 > 'portage.util.futures.futures:_EventLoopFuture',
35 > + 'portage.util.futures.executor.fork:ForkExecutor',
36 > )
37 >
38 > from portage import OrderedDict
39 > @@ -122,6 +123,7 @@ class EventLoop(object):
40 > self._idle_callbacks = OrderedDict()
41 > self._timeout_handlers = {}
42 > self._timeout_interval = None
43 > + self._default_executor = None
44 >
45 > self._poll_obj = None
46 > try:
47 > @@ -721,6 +723,46 @@ class EventLoop(object):
48 > return self._handle(self.timeout_add(
49 > delay * 1000, self._call_soon_callback(callback,
50 > args)), self)
51 >
52 > + def run_in_executor(self, executor, func, *args):
53 > + """
54 > + Arrange for a func to be called in the specified executor.
55 > +
56 > + The executor argument should be an Executor instance. The
57 > default
58 > + executor is used if executor is None.
59 > +
60 > + Use functools.partial to pass keywords to the *func*.
61 > +
62 > + @param executor: executor
63 > + @type executor: concurrent.futures.Executor or None
64 > + @param func: a function to call
65 > + @type func: callable
66 > + @return: a Future
67 > + @rtype: asyncio.Future (or compatible)
68 > + """
69 > + if executor is None:
70 > + executor = self._default_executor
71 > + if executor is None:
72 > + executor = ForkExecutor(loop=self)
73 > + self._default_executor = executor
74 > + return executor.submit(func, *args)
75 > +
76 > + def close(self):
77 > + """Close the event loop.
78 > +
79 > + This clears the queues and shuts down the executor,
80 > + and waits for it to finish.
81 > + """
82 > + executor = self._default_executor
83 > + if executor is not None:
84 > + self._default_executor = None
85 > + executor.shutdown(wait=True)
86 > +
87 > + if self._poll_obj is not None:
88 > + close = getattr(self._poll_obj, 'close')
89 > + if close is not None:
90 > + close()
91 > + self._poll_obj = None
92 > +
93 >
94 > _can_poll_device = None
95 >
96 > @@ -782,10 +824,11 @@ class _epoll_adapter(object):
97 > that is associated with an epoll instance will close automatically
98 > when
99 > it is garbage collected, so it's not necessary to close it
100 > explicitly.
101 > """
102 > - __slots__ = ('_epoll_obj',)
103 > + __slots__ = ('_epoll_obj', 'close')
104 >
105 > def __init__(self, epoll_obj):
106 > self._epoll_obj = epoll_obj
107 > + self.close = epoll_obj.close
108 >
109 > def register(self, fd, *args):
110 > self._epoll_obj.register(fd, *args)
111 > diff --git a/pym/portage/util/futures/executor/__init__.py
112 > b/pym/portage/util/futures/executor/__init__.py
113 > new file mode 100644
114 > index 000000000..e69de29bb
115 > diff --git a/pym/portage/util/futures/executor/fork.py
116 > b/pym/portage/util/futures/executor/fork.py
117 > new file mode 100644
118 > index 000000000..9cd1db2ca
119 > --- /dev/null
120 > +++ b/pym/portage/util/futures/executor/fork.py
121 > @@ -0,0 +1,130 @@
122 > +# Copyright 2018 Gentoo Foundation
123 > +# Distributed under the terms of the GNU General Public License v2
124 > +
125 > +import collections
126 > +import functools
127 > +import multiprocessing
128 > +import os
129 > +import sys
130 > +import traceback
131 > +
132 > +from portage.util._async.AsyncFunction import AsyncFunction
133 > +from portage.util._eventloop.global_event_loop import global_event_loop
134 > +
135 > +
136 > +class ForkExecutor(object):
137 > + """
138 > + An implementation of concurrent.futures.Executor that forks a
139 > + new process for each task, with support for cancellation of tasks.
140 > +
141 > + This is entirely driven by an event loop.
142 > + """
143 > + def __init__(self, max_workers=None, loop=None):
144 > + self._max_workers = max_workers or
145 > multiprocessing.cpu_count()
146 > + self._loop = loop or global_event_loop()
147 > + self._submit_queue = collections.deque()
148 > + self._running_tasks = {}
149 > + self._shutdown = False
150 > + self._shutdown_future = self._loop.create_future()
151 > +
152 > + def submit(self, fn, *args, **kwargs):
153 > + """Submits a callable to be executed with the given
154 > arguments.
155 > +
156 > + Schedules the callable to be executed as fn(*args,
157 > **kwargs) and returns
158 > + a Future instance representing the execution of the
159 > callable.
160 > +
161 > + Returns:
162 > + A Future representing the given call.
163 > + """
164 > + future = self._loop.create_future()
165 > + proc = AsyncFunction(target=functools.partial(
166 > + self._guarded_fn_call, fn, args, kwargs))
167 > + self._submit_queue.append((future, proc))
168 > + self._schedule()
169 > + return future
170 > +
171 > + def _schedule(self):
172 > + while (not self._shutdown and self._submit_queue and
173 > + len(self._running_tasks) < self._max_workers):
174 > + future, proc = self._submit_queue.popleft()
175 > + future.add_done_callback(functools.partial(self._cancel_cb,
176 > proc))
177 > + proc.addExitListener(functools.partial(self._proc_exit,
178 > future))
179 > + proc.scheduler = self._loop
180 > + proc.start()
181 > + self._running_tasks[id(proc)] = proc
182 > +
183 > + def _cancel_cb(self, proc, future):
184 > + if future.cancelled():
185 > + # async, handle the rest in _proc_exit
186 > + proc.cancel()
187 > +
188 > + @staticmethod
189 > + def _guarded_fn_call(fn, args, kwargs):
190 > + try:
191 > + result = fn(*args, **kwargs)
192 > + exception = None
193 > + except Exception as e:
194 > + result = None
195 > + exception = _ExceptionWithTraceback(e)
196 > +
197 > + return result, exception
198 > +
199 > + def _proc_exit(self, future, proc):
200 > + if not future.cancelled():
201 > + if proc.returncode == os.EX_OK:
202 > + result, exception = proc.result
203 > + if exception is not None:
204 > + future.set_exception(exception)
205 > + else:
206 > + future.set_result(result)
207 > + else:
208 > + # TODO: add special exception class for
209 > this, maybe
210 > + # distinguish between kill and crash
211 > + future.set_exception(
212 > + Exception('pid {} crashed or
213 > killed, exitcode {}'.\
214 > + format(proc.pid,
215 > proc.returncode)))
216 > +
217 > + del self._running_tasks[id(proc)]
218 > + self._schedule()
219 > + if self._shutdown and not self._running_tasks:
220 > + self._shutdown_future.set_result(None)
221 > +
222 > + def shutdown(self, wait=True):
223 > + self._shutdown = True
224 > + if wait:
225 > + self._loop.run_until_complete(
226 > self._shutdown_future)
227 > +
228 > + def __enter__(self):
229 > + return self
230 > +
231 > + def __exit__(self, exc_type, exc_val, exc_tb):
232 > + self.shutdown(wait=True)
233 > + return False
234 > +
235 > +
236 > +class _ExceptionWithTraceback:
237 > + def __init__(self, exc):
238 > + tb = traceback.format_exception(type(exc), exc,
239 > exc.__traceback__)
240 > + tb = ''.join(tb)
241 > + self.exc = exc
242 > + self.tb = '\n"""\n%s"""' % tb
243 > + def __reduce__(self):
244 > + return _rebuild_exc, (self.exc, self.tb)
245 > +
246 > +
247 > +class _RemoteTraceback(Exception):
248 > + def __init__(self, tb):
249 > + self.tb = tb
250 > + def __str__(self):
251 > + return self.tb
252 > +
253 > +
254 > +def _rebuild_exc(exc, tb):
255 > + exc.__cause__ = _RemoteTraceback(tb)
256 > + return exc
257 > +
258 > +
259 > +if sys.version_info < (3,):
260 > + # Python 2 does not support exception chaining, so
261 > + # don't bother to preserve the traceback.
262 > + _ExceptionWithTraceback = lambda exc: exc
263 > --
264 > 2.13.6
265 >
266 >
267 >