Reduce shell process 'communicate' method code complexity

- close stdin only when waiting for process termination,
  alsewhere flushes buffer
- wait for both closing streams and process termination before
  interrupting comunication

Change-Id: I8fc6c99213634336efde37c062c2e4ea106a6e4c
This commit is contained in:
Federico Ressi 2019-07-11 17:46:18 +02:00
parent b71bbff782
commit 378432cd6a
6 changed files with 132 additions and 42 deletions

View File

@ -19,6 +19,7 @@ from tobiko.shell.sh import _command
from tobiko.shell.sh import _exception
from tobiko.shell.sh import _execute
from tobiko.shell.sh import _local
from tobiko.shell.sh import _io
from tobiko.shell.sh import _process
from tobiko.shell.sh import _ssh
@ -36,6 +37,8 @@ execute = _execute.execute
execute_process = _execute.execute_process
ShellExecuteResult = _execute.ShellExecuteResult
join_chunks = _io.join_chunks
local_execute = _local.local_execute
local_process = _local.local_process
LocalShellProcessFixture = _local.LocalShellProcessFixture

View File

@ -80,12 +80,6 @@ class ShellIOBase(io.IOBase):
def closed(self):
return self.delegate.closed
def __bool__(self):
for chunk in self._data_chunks:
if chunk:
return True
return False
class ShellReadable(ShellIOBase):
@ -97,11 +91,13 @@ class ShellReadable(ShellIOBase):
try:
chunk = self.delegate.read(size)
except IOError:
LOG.exception('Error reading from %r', self)
chunk = None
try:
self.close()
except Exception:
pass
LOG.exception('Error closing %r', self)
raise
if chunk:
self._data_chunks.append(chunk)
@ -204,3 +200,17 @@ def is_writable_file(f):
def select_write_ready_files(files):
return {f for f in files if f.write_ready}
def join_chunks(chunks):
chunk_it = iter(chunks)
data = None
for chunk in chunk_it:
if chunk:
data = chunk
break
if data:
# Use a zero-length chunk to join other chunks
return data + data[:0].join(chunk for chunk in chunk_it if chunk)
else:
return None

View File

@ -227,59 +227,77 @@ class ShellProcessFixture(tobiko.SharedFixture):
self.comunicate(stdin=None, timeout=timeout, wait=True)
def comunicate(self, stdin=None, stdout=True, stderr=True, timeout=None,
wait=True):
wait=True, buffer_size=None):
timeout = ShellProcessTimeout(timeout=timeout)
# Avoid waiting for data in the first loop
poll_interval = 0.
poll_files = _io.select_opened_files([stdin and self.stdin,
stdout and self.stdout,
stderr and self.stderr])
buffer_size = self.parameters.buffer_size
while wait or stdin or poll_files:
while wait or stdin:
self.check_timeout(timeout=timeout)
if stdin:
self.check_is_running()
self.check_stdin_is_opened()
else:
wait = wait and self.is_running
wait = wait and (poll_files or self.is_running)
read_ready, write_ready = _io.select_files(files=poll_files,
timeout=poll_interval)
if read_ready or write_ready:
# Avoid waiting for data the next time
poll_interval = 0.
if self.stdin in write_ready:
# Write data to remote STDIN
stdin = self._write_to_stdin(stdin)
if not stdin:
if wait:
self.stdin.close()
else:
# Stop polling STDIN for write
self.stdin.flush()
poll_files.remove(self.stdin)
if self.stdout in read_ready:
# Read data from remote STDOUT
self._read_from_stdout(buffer_size=buffer_size)
if self.stderr in read_ready:
# Read data from remote STDERR
self._read_from_stderr(buffer_size=buffer_size)
else:
# Wait for data in the following loops
poll_interval = min(self.poll_interval,
self.check_timeout(timeout=timeout))
if self.stdin in write_ready:
# Write data to remote STDIN
sent_bytes = self.stdin.write(stdin)
if sent_bytes:
stdin = stdin[sent_bytes:]
if not stdin:
self.stdin.flush()
else:
LOG.debug("STDIN channel closed by peer on %r", self)
self.stdin.close()
if self.stdout in read_ready:
# Read data from remote STDOUT
chunk = self.stdout.read(buffer_size)
if not chunk:
LOG.debug("STDOUT channel closed by peer on %r", self)
self.stdout.close()
if self.stderr in read_ready:
# Read data from remote STDERR
chunk = self.stderr.read(buffer_size)
if not chunk:
LOG.debug("STDERR channel closed by peer on %r", self)
self.stderr.close()
# Remove closed streams
poll_files = _io.select_opened_files(poll_files)
def _write_to_stdin(self, data, check=True):
"""Write data to STDIN"""
if check:
self.check_stdin_is_opened()
sent_bytes = self.stdin.write(data)
if sent_bytes:
return data[sent_bytes:] or None
else:
LOG.debug("%r closed by peer on %r", self.stdin, self)
self.stdin.close()
def _read_from_stdout(self, buffer_size=None):
"""Read data from remote stream"""
# Read data from remote stream
chunk = self.stdout.read(buffer_size)
if chunk:
return chunk
else:
LOG.debug("%r closed by peer on %r", self.stdout, self)
self.stdout.close()
return None
def _read_from_stderr(self, buffer_size=None):
"""Read data from remote stream"""
# Read data from remote stream
chunk = self.stderr.read(buffer_size)
if chunk:
return chunk
else:
LOG.debug("%r closed by peer on %r", self.stderr, self)
self.stderr.close()
return None
def time_left(self, now=None, timeout=None):
now = now or time.time()
time_left = self.timeout.time_left(now=now)

View File

View File

@ -0,0 +1,59 @@
# Copyright (c) 2019 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from tobiko.shell import sh
from tobiko.tests import unit
class JoinChunksTest(unit.TobikoUnitTest):
def test_join_chunks(self, chunks=None, expected_result=None):
chunks = chunks or []
actual_result = sh.join_chunks(chunks)
self.assertIsInstance(actual_result, type(expected_result))
if len(chunks) > 1:
self.assertEqual(expected_result, actual_result)
else:
self.assertIs(expected_result, actual_result)
def test_join_chunks_with_bytes(self):
self.test_join_chunks([b'a', b'b', b'c'], b'abc')
def test_join_chunks_with_one_bytes(self):
self.test_join_chunks([b'abc'], b'abc')
def test_join_chunks_with_bytes_and_nones(self):
self.test_join_chunks([None, b'ab', None, b'cd'], b'abcd')
def test_join_chunks_with_strings(self):
self.test_join_chunks(['a', 'b', 'c'], 'abc')
def test_join_chunks_with_one_string(self):
self.test_join_chunks(['abc'], 'abc')
def test_join_chunks_with_strings_and_nones(self):
self.test_join_chunks([None, 'ab', None, 'cd'], 'abcd')
def test_join_chunks_with_unicodes(self):
self.test_join_chunks([u'a', u'b', u'c'], u'abc')
def test_join_chunks_with_one_unicode(self):
self.test_join_chunks([u'abc'], u'abc')
def test_join_chunks_with_unicodes_and_nones(self):
self.test_join_chunks([None, u'ab', None, u'cd'], u'abcd')