1 |
commit: c77afbc31fa687cc612a6f946b324bf4d74d8175 |
2 |
Author: Zac Medico <zmedico <AT> gentoo <DOT> org> |
3 |
AuthorDate: Mon Apr 30 01:49:18 2018 +0000 |
4 |
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> |
5 |
CommitDate: Mon Apr 30 02:14:41 2018 +0000 |
6 |
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=c77afbc3 |
7 |
|
8 |
EventLoop: call add_reader/writer callbacks after pipe is closed (bug 654382) |
9 |
|
10 |
Callbacks registered via add_reader/writer methods need to be called |
11 |
when the other end of a pipe is closed, which does not result in a |
12 |
normal read or write event. Therefore, respond to other event types |
13 |
as well, for compatibility with the asyncio event loop implementation. |
14 |
|
15 |
The included unit tests demonstrate asyncio compatible behavior for |
16 |
both reader and writer callbacks. |
17 |
|
18 |
Bug: https://bugs.gentoo.org/654382 |
19 |
|
20 |
.../tests/util/futures/asyncio/test_pipe_closed.py | 133 +++++++++++++++++++++ |
21 |
pym/portage/util/_eventloop/EventLoop.py | 7 +- |
22 |
2 files changed, 138 insertions(+), 2 deletions(-) |
23 |
|
24 |
diff --git a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py |
25 |
new file mode 100644 |
26 |
index 000000000..1ecddab78 |
27 |
--- /dev/null |
28 |
+++ b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py |
29 |
@@ -0,0 +1,133 @@ |
30 |
+# Copyright 2018 Gentoo Foundation |
31 |
+# Distributed under the terms of the GNU General Public License v2 |
32 |
+ |
33 |
+import errno |
34 |
+import os |
35 |
+import pty |
36 |
+import shutil |
37 |
+import socket |
38 |
+import sys |
39 |
+import tempfile |
40 |
+ |
41 |
+from portage.tests import TestCase |
42 |
+from portage.util.futures import asyncio |
43 |
+from portage.util.futures.unix_events import ( |
44 |
+ DefaultEventLoopPolicy, |
45 |
+ _set_nonblocking, |
46 |
+) |
47 |
+ |
48 |
+ |
49 |
+class _PipeClosedTestCase(object): |
50 |
+ |
51 |
+ def test_pipe(self): |
52 |
+ read_end, write_end = os.pipe() |
53 |
+ self._do_test(read_end, write_end) |
54 |
+ |
55 |
+ def test_pty_device(self): |
56 |
+ try: |
57 |
+ read_end, write_end = pty.openpty() |
58 |
+ except EnvironmentError: |
59 |
+ self.skipTest('pty not available') |
60 |
+ self._do_test(read_end, write_end) |
61 |
+ |
62 |
+ def test_domain_socket(self): |
63 |
+ if sys.version_info >= (3, 2): |
64 |
+ read_end, write_end = socket.socketpair() |
65 |
+ else: |
66 |
+ self.skipTest('socket detach not supported') |
67 |
+ self._do_test(read_end.detach(), write_end.detach()) |
68 |
+ |
69 |
+ def test_named_pipe(self): |
70 |
+ tempdir = tempfile.mkdtemp() |
71 |
+ try: |
72 |
+ fifo_path = os.path.join(tempdir, 'fifo') |
73 |
+ os.mkfifo(fifo_path) |
74 |
+ self._do_test(os.open(fifo_path, os.O_NONBLOCK|os.O_RDONLY), |
75 |
+ os.open(fifo_path, os.O_NONBLOCK|os.O_WRONLY)) |
76 |
+ finally: |
77 |
+ shutil.rmtree(tempdir) |
78 |
+ |
79 |
+ |
80 |
+class ReaderPipeClosedTestCase(_PipeClosedTestCase, TestCase): |
81 |
+ """ |
82 |
+ Test that a reader callback is called after the other end of |
83 |
+ the pipe has been closed. |
84 |
+ """ |
85 |
+ def _do_test(self, read_end, write_end): |
86 |
+ initial_policy = asyncio.get_event_loop_policy() |
87 |
+ if not isinstance(initial_policy, DefaultEventLoopPolicy): |
88 |
+ asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) |
89 |
+ |
90 |
+ loop = asyncio.get_event_loop() |
91 |
+ read_end = os.fdopen(read_end, 'rb', 0) |
92 |
+ write_end = os.fdopen(write_end, 'wb', 0) |
93 |
+ try: |
94 |
+ def reader_callback(): |
95 |
+ if not reader_callback.called.done(): |
96 |
+ reader_callback.called.set_result(None) |
97 |
+ |
98 |
+ reader_callback.called = loop.create_future() |
99 |
+ loop.add_reader(read_end.fileno(), reader_callback) |
100 |
+ |
101 |
+ # Allow the loop to check for IO events, and assert |
102 |
+ # that our future is still not done. |
103 |
+ loop.run_until_complete(asyncio.sleep(0, loop=loop)) |
104 |
+ self.assertFalse(reader_callback.called.done()) |
105 |
+ |
106 |
+ # Demonstrate that the callback is called after the |
107 |
+ # other end of the pipe has been closed. |
108 |
+ write_end.close() |
109 |
+ loop.run_until_complete(reader_callback.called) |
110 |
+ finally: |
111 |
+ loop.remove_reader(read_end.fileno()) |
112 |
+ write_end.close() |
113 |
+ read_end.close() |
114 |
+ asyncio.set_event_loop_policy(initial_policy) |
115 |
+ |
116 |
+ |
117 |
+class WriterPipeClosedTestCase(_PipeClosedTestCase, TestCase): |
118 |
+ """ |
119 |
+ Test that a writer callback is called after the other end of |
120 |
+ the pipe has been closed. |
121 |
+ """ |
122 |
+ def _do_test(self, read_end, write_end): |
123 |
+ initial_policy = asyncio.get_event_loop_policy() |
124 |
+ if not isinstance(initial_policy, DefaultEventLoopPolicy): |
125 |
+ asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) |
126 |
+ |
127 |
+ loop = asyncio.get_event_loop() |
128 |
+ read_end = os.fdopen(read_end, 'rb', 0) |
129 |
+ write_end = os.fdopen(write_end, 'wb', 0) |
130 |
+ try: |
131 |
+ def writer_callback(): |
132 |
+ if not writer_callback.called.done(): |
133 |
+ writer_callback.called.set_result(None) |
134 |
+ |
135 |
+ writer_callback.called = loop.create_future() |
136 |
+ _set_nonblocking(write_end.fileno()) |
137 |
+ loop.add_writer(write_end.fileno(), writer_callback) |
138 |
+ |
139 |
+ # Fill up the pipe, so that no writer callbacks should be |
140 |
+ # received until the state has changed. |
141 |
+ while True: |
142 |
+ try: |
143 |
+ os.write(write_end.fileno(), 512 * b'0') |
144 |
+ except EnvironmentError as e: |
145 |
+ if e.errno != errno.EAGAIN: |
146 |
+ raise |
147 |
+ break |
148 |
+ |
149 |
+ # Allow the loop to check for IO events, and assert |
150 |
+ # that our future is still not done. |
151 |
+ loop.run_until_complete(asyncio.sleep(0, loop=loop)) |
152 |
+ self.assertFalse(writer_callback.called.done()) |
153 |
+ |
154 |
+ # Demonstrate that the callback is called after the |
155 |
+ # other end of the pipe has been closed. |
156 |
+ read_end.close() |
157 |
+ loop.run_until_complete(writer_callback.called) |
158 |
+ finally: |
159 |
+ loop.remove_writer(write_end.fileno()) |
160 |
+ write_end.close() |
161 |
+ read_end.close() |
162 |
+ asyncio.set_event_loop_policy(initial_policy) |
163 |
|
164 |
diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py |
165 |
index 6a8b906ed..fc7380b03 100644 |
166 |
--- a/pym/portage/util/_eventloop/EventLoop.py |
167 |
+++ b/pym/portage/util/_eventloop/EventLoop.py |
168 |
@@ -192,8 +192,11 @@ class EventLoop(object): |
169 |
self.IO_OUT = PollConstants.POLLOUT |
170 |
self.IO_PRI = PollConstants.POLLPRI |
171 |
|
172 |
- self._EVENT_READ = self.IO_IN | self.IO_HUP |
173 |
- self._EVENT_WRITE = self.IO_OUT |
174 |
+ # These trigger both reader and writer callbacks. |
175 |
+ EVENT_SHARED = self.IO_HUP | self.IO_ERR | self.IO_NVAL |
176 |
+ |
177 |
+ self._EVENT_READ = self.IO_IN | EVENT_SHARED |
178 |
+ self._EVENT_WRITE = self.IO_OUT | EVENT_SHARED |
179 |
|
180 |
self._child_handlers = {} |
181 |
self._sigchld_read = None |