1 |
commit: 4df7a0a0c16c5ded65ad601d39840797b7704770 |
2 |
Author: Zac Medico <zmedico <AT> gentoo <DOT> org> |
3 |
AuthorDate: Sun Feb 23 21:44:58 2020 +0000 |
4 |
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> |
5 |
CommitDate: Mon Feb 24 02:35:15 2020 +0000 |
6 |
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=4df7a0a0 |
7 |
|
8 |
ForkExecutor: use async_start method |
9 |
|
10 |
Also, fix AsynchronousTask.async_start to handle cancellation of the |
11 |
_async_start coroutine, ensuring that start and exit listeners are |
12 |
notified in this case (otherwise RetryForkExecutorTestCase will hang). |
13 |
|
14 |
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> |
15 |
|
16 |
lib/_emerge/AsynchronousTask.py | 15 +++++++++++++-- |
17 |
lib/portage/util/futures/executor/fork.py | 21 ++++++++++++++++++--- |
18 |
2 files changed, 31 insertions(+), 5 deletions(-) |
19 |
|
20 |
diff --git a/lib/_emerge/AsynchronousTask.py b/lib/_emerge/AsynchronousTask.py |
21 |
index d1e23cdf1..1e9e177cb 100644 |
22 |
--- a/lib/_emerge/AsynchronousTask.py |
23 |
+++ b/lib/_emerge/AsynchronousTask.py |
24 |
@@ -25,8 +25,19 @@ class AsynchronousTask(SlotObject): |
25 |
|
26 |
@coroutine |
27 |
def async_start(self): |
28 |
- yield self._async_start() |
29 |
- self._start_hook() |
30 |
+ try: |
31 |
+ if self._was_cancelled(): |
32 |
+ raise asyncio.CancelledError |
33 |
+ yield self._async_start() |
34 |
+ if self._was_cancelled(): |
35 |
+ raise asyncio.CancelledError |
36 |
+ except asyncio.CancelledError: |
37 |
+ self.cancel() |
38 |
+ self._was_cancelled() |
39 |
+ self._async_wait() |
40 |
+ raise |
41 |
+ finally: |
42 |
+ self._start_hook() |
43 |
|
44 |
@coroutine |
45 |
def _async_start(self): |
46 |
|
47 |
diff --git a/lib/portage/util/futures/executor/fork.py b/lib/portage/util/futures/executor/fork.py |
48 |
index add7b3c9e..3549fdb31 100644 |
49 |
--- a/lib/portage/util/futures/executor/fork.py |
50 |
+++ b/lib/portage/util/futures/executor/fork.py |
51 |
@@ -13,6 +13,7 @@ import traceback |
52 |
|
53 |
from portage.util._async.AsyncFunction import AsyncFunction |
54 |
from portage.util.futures import asyncio |
55 |
+from portage.util.futures.compat_coroutine import coroutine |
56 |
from portage.util.cpuinfo import get_cpu_count |
57 |
|
58 |
|
59 |
@@ -51,11 +52,25 @@ class ForkExecutor(object): |
60 |
while (not self._shutdown and self._submit_queue and |
61 |
len(self._running_tasks) < self._max_workers): |
62 |
future, proc = self._submit_queue.popleft() |
63 |
- future.add_done_callback(functools.partial(self._cancel_cb, proc)) |
64 |
- proc.addExitListener(functools.partial(self._proc_exit, future)) |
65 |
proc.scheduler = self._loop |
66 |
- proc.start() |
67 |
self._running_tasks[id(proc)] = proc |
68 |
+ future.add_done_callback(functools.partial(self._cancel_cb, proc)) |
69 |
+ proc_future = asyncio.ensure_future(self._proc_coroutine(proc), loop=self._loop) |
70 |
+ proc_future.add_done_callback(functools.partial(self._proc_coroutine_done, future, proc)) |
71 |
+ |
72 |
+ @coroutine |
73 |
+ def _proc_coroutine(self, proc): |
74 |
+ yield proc.async_start() |
75 |
+ yield proc.async_wait() |
76 |
+ |
77 |
+ def _proc_coroutine_done(self, future, proc, proc_future): |
78 |
+ try: |
79 |
+ proc_future.result() |
80 |
+ except asyncio.CancelledError: |
81 |
+ future.done() or future.cancel() |
82 |
+ if proc.poll() is None: |
83 |
+ proc.cancel() |
84 |
+ self._proc_exit(future, proc) |
85 |
|
86 |
def _cancel_cb(self, proc, future): |
87 |
if future.cancelled(): |