Gentoo Archives: gentoo-commits

From: Zac Medico <zmedico@g.o>
To: gentoo-commits@l.g.o
Subject: [gentoo-commits] proj/portage:master commit in: lib/_emerge/, lib/portage/util/futures/executor/
Date: Mon, 24 Feb 2020 06:07:38
Message-Id: 1582511715.4df7a0a0c16c5ded65ad601d39840797b7704770.zmedico@gentoo
1 commit: 4df7a0a0c16c5ded65ad601d39840797b7704770
2 Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
3 AuthorDate: Sun Feb 23 21:44:58 2020 +0000
4 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
5 CommitDate: Mon Feb 24 02:35:15 2020 +0000
6 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=4df7a0a0
7
8 ForkExecutor: use async_start method
9
10 Also, fix AsynchronousTask.async_start to handle cancellation of the
11 _async_start coroutine, ensuring that start and exit listeners are still
12 notified when it is cancelled (otherwise RetryForkExecutorTestCase will hang).
13
14 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
15
16 lib/_emerge/AsynchronousTask.py | 15 +++++++++++++--
17 lib/portage/util/futures/executor/fork.py | 21 ++++++++++++++++++---
18 2 files changed, 31 insertions(+), 5 deletions(-)
19
20 diff --git a/lib/_emerge/AsynchronousTask.py b/lib/_emerge/AsynchronousTask.py
21 index d1e23cdf1..1e9e177cb 100644
22 --- a/lib/_emerge/AsynchronousTask.py
23 +++ b/lib/_emerge/AsynchronousTask.py
24 @@ -25,8 +25,19 @@ class AsynchronousTask(SlotObject):
25
26 @coroutine
27 def async_start(self):
28 - yield self._async_start()
29 - self._start_hook()
30 + try:
31 + if self._was_cancelled():
32 + raise asyncio.CancelledError
33 + yield self._async_start()
34 + if self._was_cancelled():
35 + raise asyncio.CancelledError
36 + except asyncio.CancelledError:
37 + self.cancel()
38 + self._was_cancelled()
39 + self._async_wait()
40 + raise
41 + finally:
42 + self._start_hook()
43
44 @coroutine
45 def _async_start(self):
46
47 diff --git a/lib/portage/util/futures/executor/fork.py b/lib/portage/util/futures/executor/fork.py
48 index add7b3c9e..3549fdb31 100644
49 --- a/lib/portage/util/futures/executor/fork.py
50 +++ b/lib/portage/util/futures/executor/fork.py
51 @@ -13,6 +13,7 @@ import traceback
52
53 from portage.util._async.AsyncFunction import AsyncFunction
54 from portage.util.futures import asyncio
55 +from portage.util.futures.compat_coroutine import coroutine
56 from portage.util.cpuinfo import get_cpu_count
57
58
59 @@ -51,11 +52,25 @@ class ForkExecutor(object):
60 while (not self._shutdown and self._submit_queue and
61 len(self._running_tasks) < self._max_workers):
62 future, proc = self._submit_queue.popleft()
63 - future.add_done_callback(functools.partial(self._cancel_cb, proc))
64 - proc.addExitListener(functools.partial(self._proc_exit, future))
65 proc.scheduler = self._loop
66 - proc.start()
67 self._running_tasks[id(proc)] = proc
68 + future.add_done_callback(functools.partial(self._cancel_cb, proc))
69 + proc_future = asyncio.ensure_future(self._proc_coroutine(proc), loop=self._loop)
70 + proc_future.add_done_callback(functools.partial(self._proc_coroutine_done, future, proc))
71 +
72 + @coroutine
73 + def _proc_coroutine(self, proc):
74 + yield proc.async_start()
75 + yield proc.async_wait()
76 +
77 + def _proc_coroutine_done(self, future, proc, proc_future):
78 + try:
79 + proc_future.result()
80 + except asyncio.CancelledError:
81 + future.done() or future.cancel()
82 + if proc.poll() is None:
83 + proc.cancel()
84 + self._proc_exit(future, proc)
85
86 def _cancel_cb(self, proc, future):
87 if future.cancelled():