1 |
On Thu, 18 Jun 2020 00:35:48 -0700 |
2 |
Zac Medico <zmedico@g.o> wrote: |
3 |
|
4 |
> In the coroutine finally clause, do not call remove_writer in cases |
5 |
> where fd has been closed and then re-allocated to a concurrent |
6 |
> coroutine as in bug 716636. |
7 |
> |
8 |
> Also, assume that the caller will put the file in non-blocking mode |
9 |
> and close the file when done, so that this function is suitable for |
10 |
> use within a loop. |
11 |
> |
12 |
> Bug: https://bugs.gentoo.org/728580 |
13 |
> Signed-off-by: Zac Medico <zmedico@g.o> |
14 |
> --- |
15 |
> lib/portage/util/futures/_asyncio/process.py | 11 ++++- |
16 |
> lib/portage/util/futures/_asyncio/streams.py | 50 |
17 |
> ++++++++++---------- 2 files changed, 33 insertions(+), 28 |
18 |
> deletions(-) |
19 |
> |
20 |
> diff --git a/lib/portage/util/futures/_asyncio/process.py |
21 |
> b/lib/portage/util/futures/_asyncio/process.py index |
22 |
> 020164c9b..2d3e9b0fd 100644 --- |
23 |
> a/lib/portage/util/futures/_asyncio/process.py +++ |
24 |
> b/lib/portage/util/futures/_asyncio/process.py @@ -1,9 +1,12 @@ |
25 |
> -# Copyright 2018 Gentoo Foundation |
26 |
> +# Copyright 2018-2020 Gentoo Authors |
27 |
> # Distributed under the terms of the GNU General Public License v2 |
28 |
> |
29 |
> +import os |
30 |
> + |
31 |
> import portage |
32 |
> portage.proxy.lazyimport.lazyimport(globals(), |
33 |
> 'portage.util.futures:asyncio', |
34 |
> + 'portage.util.futures.unix_events:_set_nonblocking', |
35 |
> ) |
36 |
> from portage.util.futures._asyncio.streams import _reader, _writer |
37 |
> from portage.util.futures.compat_coroutine import coroutine, |
38 |
> coroutine_return @@ -59,7 +62,11 @@ class _Process(object): |
39 |
> if input is not None: |
40 |
> if self._proc.stdin is None: |
41 |
> raise TypeError('communicate: |
42 |
> expected file or int, got {}'.format(type(self._proc.stdin))) |
43 |
> - writer = |
44 |
> asyncio.ensure_future(_writer(self._proc.stdin, input), |
45 |
> loop=self._loop) |
46 |
> + stdin = self._proc.stdin |
47 |
> + stdin = os.fdopen(stdin, 'wb', 0) if |
48 |
> isinstance(stdin, int) else stdin |
49 |
> + _set_nonblocking(stdin.fileno()) |
50 |
> + writer = |
51 |
> asyncio.ensure_future(_writer(stdin, input, loop=self._loop), |
52 |
> loop=self._loop) |
53 |
> + writer.add_done_callback(lambda writer: |
54 |
> stdin.close()) |
55 |
> try: |
56 |
> yield asyncio.wait(futures + [self.wait()], |
57 |
> loop=self._loop) diff --git |
58 |
> a/lib/portage/util/futures/_asyncio/streams.py |
59 |
> b/lib/portage/util/futures/_asyncio/streams.py index |
60 |
> 650a16491..870307e1e 100644 --- |
61 |
> a/lib/portage/util/futures/_asyncio/streams.py +++ |
62 |
> b/lib/portage/util/futures/_asyncio/streams.py @@ -1,4 +1,4 @@ -# |
63 |
> Copyright 2018 Gentoo Foundation +# Copyright 2018-2020 Gentoo Authors |
64 |
> # Distributed under the terms of the GNU General Public License v2 |
65 |
> |
66 |
> import errno |
67 |
> @@ -8,7 +8,6 @@ import portage |
68 |
> portage.proxy.lazyimport.lazyimport(globals(), |
69 |
> '_emerge.PipeReader:PipeReader', |
70 |
> 'portage.util.futures:asyncio', |
71 |
> - 'portage.util.futures.unix_events:_set_nonblocking', |
72 |
> ) |
73 |
> from portage.util.futures.compat_coroutine import coroutine |
74 |
> |
75 |
> @@ -59,38 +58,37 @@ class _Reader(object): |
76 |
> @coroutine |
77 |
> def _writer(output_file, content, loop=None): |
78 |
> """ |
79 |
> - Asynchronously write bytes to output file, and close it when |
80 |
> - done. If an EnvironmentError other than EAGAIN is |
81 |
> encountered, |
82 |
> - which typically indicates that the other end of the pipe has |
83 |
> - close, the error is raised. This function is a coroutine. |
84 |
> + Asynchronously write bytes to output file. The output file is |
85 |
> + assumed to be in non-blocking mode. If an EnvironmentError |
86 |
> + other than EAGAIN is encountered, which typically indicates |
87 |
> that |
88 |
> + the other end of the pipe has closed, the error is raised. |
89 |
> + This function is a coroutine. |
90 |
> |
91 |
> - @param output_file: output file descriptor |
92 |
> - @type output_file: file or int |
93 |
> + @param output_file: output file |
94 |
> + @type output_file: file object |
95 |
> @param content: content to write |
96 |
> @type content: bytes |
97 |
> @param loop: asyncio.AbstractEventLoop (or compatible) |
98 |
> @type loop: event loop |
99 |
> """ |
100 |
> - fd = output_file if isinstance(output_file, int) else |
101 |
> output_file.fileno() |
102 |
> - _set_nonblocking(fd) |
103 |
> loop = asyncio._wrap_loop(loop) |
104 |
> - try: |
105 |
> - while content: |
106 |
> + fd = output_file.fileno() |
107 |
> + while content: |
108 |
> + try: |
109 |
> + content = content[os.write(fd, content):] |
110 |
> + except EnvironmentError as e: |
111 |
> + if e.errno != errno.EAGAIN: |
112 |
> + raise |
113 |
> waiter = loop.create_future() |
114 |
> - loop.add_writer(fd, lambda: |
115 |
> waiter.set_result(None)) |
116 |
> + loop.add_writer(fd, lambda: waiter.done() or |
117 |
> waiter.set_result(None)) try: |
118 |
> yield waiter |
119 |
> - while content: |
120 |
> - try: |
121 |
> - content = |
122 |
> content[os.write(fd, content):] |
123 |
> - except EnvironmentError as e: |
124 |
> - if e.errno == |
125 |
> errno.EAGAIN: |
126 |
> - break |
127 |
> - else: |
128 |
> - raise |
129 |
> finally: |
130 |
> - loop.remove_writer(fd) |
131 |
> - except GeneratorExit: |
132 |
> - raise |
133 |
> - finally: |
134 |
> - os.close(output_file) if isinstance(output_file, |
135 |
> int) else output_file.close() |
136 |
> + # The loop and output file may have |
137 |
> been closed. |
138 |
> + if not loop.is_closed(): |
139 |
> + waiter.done() or |
140 |
> waiter.cancel() |
141 |
> + # Do not call remove_writer |
142 |
> in cases where fd has |
143 |
> + # been closed and then |
144 |
> re-allocated to a concurrent |
145 |
> + # coroutine as in bug 716636. |
146 |
> + if not output_file.closed: |
147 |
> + |
148 |
> loop.remove_writer(fd) |
149 |
|
150 |
|
151 |
looks fine to me |