Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH 1/4] Add ForkExecutor (bug 649588)
Date: Sun, 01 Apr 2018 02:51:57
Message-Id: 20180401024631.31017-2-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH 0/4] rsync: add key refresh retry (bug 649276) by Zac Medico
1 This is useful for asynchronous operations that we might
2 need to cancel if they take too long, since running
3 concurrent.futures.ProcessPoolExecutor tasks cannot be cancelled.
4 This ability to cancel tasks makes this executor useful
5 as an alternative to portage.exception.AlarmSignal.
6
7 Also add an asyncio-compatible EventLoop.run_in_executor
8 method that uses ForkExecutor as the default executor,
9 which will later be used to implement the corresponding
10 asyncio.AbstractEventLoop run_in_executor method.
11
12 Bug: https://bugs.gentoo.org/649588
13 ---
14 pym/portage/util/_eventloop/EventLoop.py | 45 ++++++++-
15 pym/portage/util/futures/executor/__init__.py | 0
16 pym/portage/util/futures/executor/fork.py | 130 ++++++++++++++++++++++++++
17 3 files changed, 174 insertions(+), 1 deletion(-)
18 create mode 100644 pym/portage/util/futures/executor/__init__.py
19 create mode 100644 pym/portage/util/futures/executor/fork.py
20
21 diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
22 index f472a3dae..1574a6837 100644
23 --- a/pym/portage/util/_eventloop/EventLoop.py
24 +++ b/pym/portage/util/_eventloop/EventLoop.py
25 @@ -24,6 +24,7 @@ except ImportError:
26 import portage
27 portage.proxy.lazyimport.lazyimport(globals(),
28 'portage.util.futures.futures:_EventLoopFuture',
29 + 'portage.util.futures.executor.fork:ForkExecutor',
30 )
31
32 from portage import OrderedDict
33 @@ -122,6 +123,7 @@ class EventLoop(object):
34 self._idle_callbacks = OrderedDict()
35 self._timeout_handlers = {}
36 self._timeout_interval = None
37 + self._default_executor = None
38
39 self._poll_obj = None
40 try:
41 @@ -721,6 +723,46 @@ class EventLoop(object):
42 return self._handle(self.timeout_add(
43 delay * 1000, self._call_soon_callback(callback, args)), self)
44
def run_in_executor(self, executor, func, *args):
	"""
	Arrange for func to be called in the specified executor.

	If executor is None, the loop's default executor is used; it is a
	ForkExecutor bound to this loop, created on first use and reused
	by later calls.

	Use functools.partial to pass keyword arguments to func.

	@param executor: executor to run func in, or None for the default
	@type executor: concurrent.futures.Executor or None
	@param func: a function to call
	@type func: callable
	@return: a Future for the result of func(*args)
	@rtype: asyncio.Future (or compatible)
	"""
	if executor is None:
		if self._default_executor is None:
			# Lazily create the default executor and remember it.
			self._default_executor = ForkExecutor(loop=self)
		executor = self._default_executor
	return executor.submit(func, *args)
67 +
def close(self):
	"""Close the event loop.

	Shut down the default executor (if one was created), waiting for
	its tasks to finish, and then close the poll object if it provides
	a close method.
	"""
	executor = self._default_executor
	if executor is not None:
		# Drop the reference first so that a later run_in_executor call
		# creates a fresh executor rather than reusing a shut-down one.
		self._default_executor = None
		executor.shutdown(wait=True)

	if self._poll_obj is not None:
		# Not every poll object has a close method (select.poll objects
		# do not), so look it up with a None default instead of letting
		# getattr raise AttributeError.
		poll_close = getattr(self._poll_obj, 'close', None)
		if poll_close is not None:
			poll_close()
		self._poll_obj = None
84 +
85
86 _can_poll_device = None
87
88 @@ -782,10 +824,11 @@ class _epoll_adapter(object):
89 that is associated with an epoll instance will close automatically when
90 it is garbage collected, so it's not necessary to close it explicitly.
91 """
92 - __slots__ = ('_epoll_obj',)
93 + __slots__ = ('_epoll_obj', 'close')
94
def __init__(self, epoll_obj):
	"""
	Wrap epoll_obj.

	@param epoll_obj: the epoll instance to adapt
	"""
	self._epoll_obj = epoll_obj
	# Re-export the wrapped object's close method as an instance
	# attribute (listed in __slots__), so that code which looks up
	# 'close' on the poll object finds it.
	self.close = epoll_obj.close
98
99 def register(self, fd, *args):
100 self._epoll_obj.register(fd, *args)
101 diff --git a/pym/portage/util/futures/executor/__init__.py b/pym/portage/util/futures/executor/__init__.py
102 new file mode 100644
103 index 000000000..e69de29bb
104 diff --git a/pym/portage/util/futures/executor/fork.py b/pym/portage/util/futures/executor/fork.py
105 new file mode 100644
106 index 000000000..9cd1db2ca
107 --- /dev/null
108 +++ b/pym/portage/util/futures/executor/fork.py
109 @@ -0,0 +1,130 @@
110 +# Copyright 2018 Gentoo Foundation
111 +# Distributed under the terms of the GNU General Public License v2
112 +
113 +import collections
114 +import functools
115 +import multiprocessing
116 +import os
117 +import sys
118 +import traceback
119 +
120 +from portage.util._async.AsyncFunction import AsyncFunction
121 +from portage.util._eventloop.global_event_loop import global_event_loop
122 +
123 +
class ForkExecutor(object):
	"""
	An implementation of concurrent.futures.Executor that forks a
	new process for each task, with support for cancellation of tasks.

	This is entirely driven by an event loop.
	"""
	def __init__(self, max_workers=None, loop=None):
		"""
		@param max_workers: maximum number of concurrently running
			processes (defaults to multiprocessing.cpu_count())
		@type max_workers: int or None
		@param loop: event loop that drives the child processes
			(defaults to global_event_loop())
		"""
		self._max_workers = max_workers or multiprocessing.cpu_count()
		self._loop = loop or global_event_loop()
		# (future, proc) pairs waiting for a free worker slot.
		self._submit_queue = collections.deque()
		# id(proc) -> proc for each started process that has not exited.
		self._running_tasks = {}
		self._shutdown = False
		# Completed once shutdown() has been called and all work drained.
		self._shutdown_future = self._loop.create_future()

	def submit(self, fn, *args, **kwargs):
		"""Submits a callable to be executed with the given arguments.

		Schedules the callable to be executed as fn(*args, **kwargs) and returns
		a Future instance representing the execution of the callable.

		Returns:
			A Future representing the given call.

		Raises:
			RuntimeError: if shutdown() has already been called.
		"""
		if self._shutdown:
			# Same contract as concurrent.futures.Executor.submit; accepting
			# the task here would return a future that can never complete.
			raise RuntimeError('cannot schedule new futures after shutdown')
		future = self._loop.create_future()
		proc = AsyncFunction(target=functools.partial(
			self._guarded_fn_call, fn, args, kwargs))
		self._submit_queue.append((future, proc))
		self._schedule()
		return future

	def _schedule(self):
		"""Start queued tasks while worker slots are available.

		Queued tasks are still started after shutdown(), so that every
		submitted future eventually completes.
		"""
		while (self._submit_queue and
			len(self._running_tasks) < self._max_workers):
			future, proc = self._submit_queue.popleft()
			if future.cancelled():
				# Cancelled while still queued: don't fork a process for it.
				continue
			future.add_done_callback(functools.partial(self._cancel_cb, proc))
			proc.addExitListener(functools.partial(self._proc_exit, future))
			proc.scheduler = self._loop
			proc.start()
			self._running_tasks[id(proc)] = proc

	def _cancel_cb(self, proc, future):
		"""Done-callback of a task's future: cancel proc if the future
		was cancelled."""
		if future.cancelled():
			# async, handle the rest in _proc_exit
			proc.cancel()

	@staticmethod
	def _guarded_fn_call(fn, args, kwargs):
		"""
		Call fn(*args, **kwargs) and return a (result, exception) pair,
		capturing any Exception instead of letting it propagate.
		"""
		try:
			result = fn(*args, **kwargs)
			exception = None
		except Exception as e:
			result = None
			exception = _ExceptionWithTraceback(e)

		return result, exception

	def _proc_exit(self, future, proc):
		"""
		Exit listener for proc: transfer its outcome to future (unless
		the future was cancelled), free its worker slot, start more
		queued work, and complete shutdown if everything has drained.
		"""
		if not future.cancelled():
			if proc.returncode == os.EX_OK:
				result, exception = proc.result
				if exception is not None:
					future.set_exception(exception)
				else:
					future.set_result(result)
			else:
				# TODO: add special exception class for this, maybe
				# distinguish between kill and crash
				future.set_exception(
					Exception('pid {} crashed or killed, exitcode {}'.format(
						proc.pid, proc.returncode)))

		del self._running_tasks[id(proc)]
		self._schedule()
		self._check_shutdown()

	def _check_shutdown(self):
		"""Complete the shutdown future once shutdown() has been called
		and no task is running or queued."""
		if (self._shutdown and not self._running_tasks and
			not self._submit_queue and not self._shutdown_future.done()):
			self._shutdown_future.set_result(None)

	def shutdown(self, wait=True):
		"""
		Stop accepting new tasks. Tasks that were already submitted (and
		not cancelled) still run to completion.

		@param wait: if True, run the event loop until all submitted
			tasks have finished
		@type wait: bool
		"""
		self._shutdown = True
		# If nothing is running, no exit listener will ever complete the
		# shutdown future, so complete it here; otherwise wait=True would
		# block forever.
		self._check_shutdown()
		if wait:
			self._loop.run_until_complete(self._shutdown_future)

	def __enter__(self):
		return self

	def __exit__(self, exc_type, exc_val, exc_tb):
		self.shutdown(wait=True)
		return False
212 +
213 +
class _ExceptionWithTraceback:
	"""
	Wrapper pairing an exception with its formatted traceback text.

	Its __reduce__ makes a pickled wrapper rebuild as the original
	exception, with the traceback text attached by _rebuild_exc.
	"""
	def __init__(self, exc):
		lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
		self.exc = exc
		# Wrap the traceback in triple quotes so it stands out when shown
		# as the cause of the rebuilt exception.
		self.tb = '\n"""\n%s"""' % ''.join(lines)

	def __reduce__(self):
		return _rebuild_exc, (self.exc, self.tb)
222 +
223 +
class _RemoteTraceback(Exception):
	"""Exception whose string form is a traceback text supplied by
	the caller (captured in another process)."""
	def __init__(self, tb):
		# Base __init__ is intentionally not called; only tb is used.
		self.tb = tb

	def __str__(self):
		text = self.tb
		return text
229 +
230 +
def _rebuild_exc(exc, tb):
	"""
	Attach the traceback text tb to exc as its __cause__ and return exc.

	This is the reconstruction function named by
	_ExceptionWithTraceback.__reduce__.
	"""
	cause = _RemoteTraceback(tb)
	exc.__cause__ = cause
	return exc
234 +
235 +
if sys.version_info[0] < 3:
	# Python 2 does not support exception chaining, so don't bother to
	# preserve the traceback: pass the exception through unchanged.
	def _ExceptionWithTraceback(exc):
		return exc
240 --
241 2.13.6

Replies