Gentoo Archives: gentoo-portage-dev

From: Zac Medico <zmedico@g.o>
To: gentoo-portage-dev@l.g.o
Cc: Zac Medico <zmedico@g.o>
Subject: [gentoo-portage-dev] [PATCH 3/5] MirrorDistTask: terminate via call_soon for asyncio compat
Date: Fri, 24 Mar 2017 02:56:30
Message-Id: 20170324025500.19518-3-zmedico@gentoo.org
In Reply to: [gentoo-portage-dev] [PATCH 1/5] EventLoop: implement call_soon for asyncio compat (bug 591760) by Zac Medico
1 These changes are analogous to the PollScheduler changes in
2 the previous commit.
3 ---
4 pym/portage/_emirrordist/MirrorDistTask.py | 39 +++++++++++++++++++++---------
5 1 file changed, 27 insertions(+), 12 deletions(-)
6
7 diff --git a/pym/portage/_emirrordist/MirrorDistTask.py b/pym/portage/_emirrordist/MirrorDistTask.py
8 index e23a11b..0702eb1 100644
9 --- a/pym/portage/_emirrordist/MirrorDistTask.py
10 +++ b/pym/portage/_emirrordist/MirrorDistTask.py
11 @@ -24,15 +24,16 @@ if sys.hexversion >= 0x3000000:
12
13 class MirrorDistTask(CompositeTask):
14
15 - __slots__ = ('_config', '_terminated', '_term_check_id')
16 + __slots__ = ('_config', '_fetch_iterator', '_term_rlock',
17 + '_terminated', '_term_callback_handle')
18
19 def __init__(self, config):
20 CompositeTask.__init__(self, scheduler=config.event_loop)
21 self._config = config
22 - self._terminated = threading.Event()
23 + self._term_rlock = threading.RLock()
24 + self._term_callback_handle = None
25
26 def _start(self):
27 - self._term_check_id = self.scheduler.idle_add(self._termination_check)
28 fetch = TaskScheduler(iter(FetchIterator(self._config)),
29 max_jobs=self._config.options.jobs,
30 max_load=self._config.options.load_average,
31 @@ -203,17 +204,31 @@ class MirrorDistTask(CompositeTask):
32 logging.info("added %i files" % added_file_count)
33 logging.info("added %i bytes total" % added_byte_count)
34
35 + def _cleanup(self):
36 + """
37 + Cleanup any callbacks that have been registered with the global
38 + event loop.
39 + """
40 + # The self._term_callback_handle attribute requires locking
41 + # since it's modified by the thread safe terminate method.
42 + with self._term_rlock:
43 + if self._term_callback_handle not in (None, False):
44 + self._term_callback_handle.cancel()
45 + # This prevents the terminate method from scheduling
46 + # any more callbacks (since _cleanup must eliminate all
47 + # callbacks in order to ensure complete cleanup).
48 + self._term_callback_handle = False
49 +
50 def terminate(self):
51 - self._terminated.set()
52 + with self._term_rlock:
53 + if self._term_callback_handle is None:
54 + self._term_callback_handle = self.scheduler.call_soon_threadsafe(
55 + self._term_callback)
56
57 - def _termination_check(self):
58 - if self._terminated.is_set():
59 - self.cancel()
60 - self.wait()
61 - return True
62 + def _term_callback(self):
63 + self.cancel()
64 + self.wait()
65
66 def _wait(self):
67 CompositeTask._wait(self)
68 - if self._term_check_id is not None:
69 - self.scheduler.source_remove(self._term_check_id)
70 - self._term_check_id = None
71 + self._cleanup()
72 --
73 2.10.2