Merge "Shell execute command refactoring"

This commit is contained in:
Jenkins 2014-02-26 19:43:41 +00:00 committed by Gerrit Code Review
commit 626bf5fb12
13 changed files with 416 additions and 206 deletions

View File

@ -84,29 +84,17 @@ class DependencyException(AnvilException):
class ProcessExecutionError(IOError):
def __init__(self, stdout=None, stderr=None,
exit_code=None, cmd=None,
def __init__(self, cmd, stdout='', stderr='', exit_code=None,
description=None):
self.exit_code = exit_code
self.stderr = stderr
self.stdout = stdout
self.cmd = cmd
self.description = description
if not self.cmd:
self.cmd = '-'
if not self.description:
self.description = 'Unexpected error while running command.'
if not isinstance(self.exit_code, (long, int)):
self.exit_code = '-'
if not self.stderr:
self.stderr = ''
if not self.stdout:
self.stdout = ''
message = ('%s\nCommand: %s\n'
'Exit code: %s\nStdout: %r\n'
'Stderr: %r' % (self.description, self.cmd,
self.exit_code, self.stdout,
self.stderr))
if not isinstance(exit_code, (long, int)):
exit_code = '-'
if not description:
description = 'Unexpected error while running command.'
message = ('%s\n' % description +
'Command: %s\n' % cmd +
'Exit code: %s\n' % exit_code +
'Stdout: %s\n' % stdout +
'Stderr: %s' % stderr)
IOError.__init__(self, message)

View File

@ -263,7 +263,7 @@ class DependencyHandler(object):
cmdline.extend(sorted([str(p) for p in pips_to_download]))
out_filename = sh.joinpths(self.log_dir,
"pip-download-attempt-%s.log" % (attempt))
sh.execute_save_output(cmdline, out_filename=out_filename)
sh.execute_save_output(cmdline, out_filename)
def _examine_download_dir(self, pips_to_download, pip_download_dir):
pip_names = set([p.key for p in pips_to_download])

View File

@ -65,7 +65,7 @@ class Helper(object):
def _execute_make(self, filename, marks_dir, jobs):
cmdline = ["make", "-f", filename, "-j", str(jobs)]
out_filename = sh.joinpths(self._log_dir, "%s.log" % sh.basename(filename))
sh.execute_save_output(cmdline, cwd=marks_dir, out_filename=out_filename)
sh.execute_save_output(cmdline, out_filename, cwd=marks_dir)
def _convert_names_to_rpm(self, python_names, only_name):
if not python_names:
@ -150,7 +150,7 @@ class Helper(object):
cmdline.extend(["--", source])
out_filename = sh.joinpths(self._log_dir,
"py2rpm-build-%s.log" % log_filename)
sh.execute_save_output(cmdline, cwd=source, out_filename=out_filename)
sh.execute_save_output(cmdline, out_filename, cwd=source)
def build_all_binaries(self, repo_name, src_repo_dir, rpmbuild_flags,
tracewriter, jobs):

View File

@ -431,7 +431,7 @@ class YumDependencyHandler(base.DependencyHandler):
spec_filename,
]
out_filename = sh.joinpths(self.log_dir, "rpmbuild-%s.log" % instance.name)
sh.execute_save_output(cmdline, out_filename=out_filename)
sh.execute_save_output(cmdline, out_filename)
def _write_git_tarball(self, instance, pkg_dir, spec_filename):
cmdline = [
@ -453,7 +453,7 @@ class YumDependencyHandler(base.DependencyHandler):
"HEAD",
]
out_filename = sh.joinpths(self.log_dir, "git-tar-%s.log" % instance.name)
sh.execute_save_output(cmdline, cwd=pkg_dir, out_filename=out_filename)
sh.execute_save_output(cmdline, out_filename, cwd=pkg_dir)
sh.gzip(output_filename)
sh.unlink(output_filename)
@ -476,7 +476,7 @@ class YumDependencyHandler(base.DependencyHandler):
"--dist-dir", self.rpm_sources_dir,
]
out_filename = sh.joinpths(self.log_dir, "sdist-%s.log" % (instance.name))
sh.execute_save_output(cmdline, cwd=pkg_dir, out_filename=out_filename)
sh.execute_save_output(cmdline, out_filename, cwd=pkg_dir)
archive_name = sh.joinpths(self.rpm_sources_dir, "%s.tar" % (base_name))
if ensure_exists:
with contextlib.closing(tarfile.open(archive_name, 'r')) as tfh:

View File

@ -81,167 +81,116 @@ def is_dry_run():
return bool(IS_DRYRUN)
# Originally borrowed from nova computes execute...
# Originally borrowed from nova compute execute.
def execute(cmd,
process_input=None,
check_exit_code=True,
cwd=None,
shell=False,
env_overrides=None,
stdout_fh=None,
stderr_fh=None,
stdout_fn=None,
stderr_fn=None,
trace_writer=None):
"""Helper method to execute command.
stdout_fh=subprocess.PIPE,
stderr_fh=subprocess.PIPE):
"""Helper method to execute a command through subprocess.
:param cmd: Passed to subprocess.Popen
:param process_input: Send to opened process
:param check_exit_code: Single `bool`, `int`, or `list` of allowed exit
codes. By default, only 0 exit code is allowed.
Raise :class:`exceptions.ProcessExecutionError`
unless program exits with one of these code
:returns: a tuple, (stdout, stderr) from the spawned process, or None if
the command fails
:param cmd: Command passed to subprocess.Popen.
:param process_input: Input send to opened process.
:param check_exit_code: Specifies whether to check process return code.
If return code is other then `0` - exception will
be raised.
:param cwd: The child's current directory will be changed to
`cwd` before it is executed.
:param shell: Specifies whether to use the shell as the program
to execute.
:param env_overrides: Process environment parameters to override.
:param stdout_fh: Stdout file handler.
:param stderr_fh: Stderr file handler.
:returns: A tuple, (stdout, stderr) from the spawned process.
:raises: :class:`exceptions.ProcessExecutionError` when
process ends with other then `0` return code.
"""
if isinstance(check_exit_code, (bool)):
ignore_exit_code = not check_exit_code
check_exit_code = [0]
elif isinstance(check_exit_code, (int)):
check_exit_code = [check_exit_code]
# Ensure all string args (ie for those that send ints and such...)
execute_cmd = [str(c) for c in cmd]
# From the docs it seems a shell command must be a string??
# TODO(harlowja) this might not really be needed?
str_cmd = " ".join(shellquote(word) for word in cmd)
# Ensure all string args (i.e. for those that send ints, etc.).
cmd = map(str, cmd)
# NOTE(skudriashev): If shell is True, it is recommended to pass args as a
# string rather than as a sequence.
str_cmd = subprocess.list2cmdline(cmd)
if shell:
execute_cmd = str_cmd
if not shell:
LOG.debug('Running cmd: %r' % (execute_cmd))
cmd = str_cmd
LOG.debug('Running shell cmd: %r' % cmd)
else:
LOG.debug('Running shell cmd: %r' % (execute_cmd))
LOG.debug('Running cmd: %r' % cmd)
if process_input is not None:
LOG.debug('With stdin: %s' % (process_input))
process_input = str(process_input)
LOG.debug('Process input: %s' % process_input)
if cwd:
LOG.debug("In working directory: %r" % (cwd))
if stdout_fn is not None and stdout_fh is not None:
LOG.warn("Stdout file handles and stdout file names can not be used simultaneously!")
if stderr_fn is not None and stderr_fh is not None:
LOG.warn("Stderr file handles and stderr file names can not be used simultaneously!")
LOG.debug('Process working directory: %r' % cwd)
# Override process environment in needed.
process_env = None
if env_overrides and len(env_overrides):
process_env = env.get()
for (k, v) in env_overrides.items():
for k, v in env_overrides.items():
process_env[k] = str(v)
rc = None
# Run command process.
result = ("", "")
if is_dry_run():
rc = 0
else:
stdin_fh = subprocess.PIPE
if stdout_fn or (stdout_fh is None):
stdout_fh = subprocess.PIPE
if stderr_fn or (stderr_fh is None):
stderr_fh = subprocess.PIPE
try:
obj = subprocess.Popen(execute_cmd, stdin=stdin_fh, stdout=stdout_fh, stderr=stderr_fh,
close_fds=True, cwd=cwd, shell=shell, env=process_env)
if process_input is not None:
result = obj.communicate(str(process_input))
else:
result = obj.communicate()
obj = subprocess.Popen(cmd, stdin=subprocess.PIPE,
stdout=stdout_fh, stderr=stderr_fh,
close_fds=True, shell=shell, cwd=cwd,
env=process_env)
result = obj.communicate(process_input)
except OSError as e:
raise excp.ProcessExecutionError(description="%s: [%s, %s]" % (e, e.errno, e.strerror),
cmd=str_cmd)
rc = obj.returncode
if stdout_fh != subprocess.PIPE:
stdout = "<redirected to %s>" % (stdout_fn or stdout_fh)
else:
stdout = result[0] or ""
if stderr_fh != subprocess.PIPE:
stderr = "<redirected to %s>" % (stderr_fn or stderr_fh)
else:
stderr = result[1] or ""
if (not ignore_exit_code) and (rc not in check_exit_code):
raise excp.ProcessExecutionError(exit_code=rc, stdout=stdout,
stderr=stderr, cmd=str_cmd)
else:
# Log it anyway
if rc not in check_exit_code:
LOG.debug("A failure may have just happened when running command %r [%s] (%s, %s)",
str_cmd, rc, stdout, stderr)
# See if a requested storage place was given for stderr/stdout
for name, handle in ((stdout_fn, stdout), (stderr_fn, stderr)):
if name:
write_file(name, handle)
if trace_writer:
trace_writer.file_touched(name)
return (stdout, stderr)
def execute_save_output2(cmd, **kwargs):
kwargs = kwargs.copy()
watch_open = []
save_stdout = False
stdout_filename = kwargs.pop('stdout_filename', None)
if stdout_filename:
save_stdout = True
mkdirslist(dirname(stdout_filename))
watch_open.append(stdout_filename)
is_same = False
save_stderr = False
stderr_filename = kwargs.pop('stderr_filename', None)
if stderr_filename:
save_stderr = True
if stdout_filename == stderr_filename and save_stdout:
is_same = True
raise excp.ProcessExecutionError(
cmd=str_cmd,
description="%s: [%s, %s]" % (e, e.errno, e.strerror)
)
else:
mkdirslist(dirname(stderr_filename))
watch_open.append(stderr_filename)
rc = obj.returncode
was_opened = []
try:
if save_stdout:
out = open(stdout_filename, "wb")
was_opened.append(out)
kwargs["stdout_fh"] = out
if save_stderr:
if is_same:
kwargs["stderr_fh"] = kwargs["stdout_fh"]
else:
out = open(stderr_filename, "wb")
was_opened.append(out)
kwargs["stderr_fh"] = out
if watch_open:
LOG.info("You can watch progress in another terminal with:")
for p in watch_open:
LOG.info(" tail -f %s", p)
return execute(cmd, **kwargs)
finally:
for fh in was_opened:
try:
fh.close()
except IOError:
pass
# Handle process exit code.
stdout = result[0] or ""
stderr = result[1] or ""
if rc != 0 and check_exit_code:
# Raise exception if return code is not `0`.
raise excp.ProcessExecutionError(cmd=str_cmd,
stdout=_redirected(stdout, stdout_fh),
stderr=_redirected(stderr, stderr_fh),
exit_code=rc)
return stdout, stderr
def execute_save_output(cmd, out_filename, **kwargs):
def _redirected(output, stream, count=5):
"""Prepare stream output before it can be passed to exception to be raised.
Add information of where output was redirected (if was).
"""
if stream == subprocess.PIPE:
lines = output.splitlines(True)
if len(lines) > count:
LOG.debug(output)
output_tail = ''.join(lines[-count:])
output = "<redirected to debug log>\n...\n%s" % output_tail
else:
output = "<redirected to %s>" % stream.name
return output
def execute_save_output(cmd, file_name, **kwargs):
"""Helper method to execute a command through subprocess and save stdout
and stderr into a file.
"""
kwargs = kwargs.copy()
kwargs['stderr_filename'] = out_filename
kwargs['stdout_filename'] = out_filename
execute_save_output2(cmd, **kwargs)
mkdirslist(dirname(file_name))
with open(file_name, 'wb') as fh:
LOG.info("You can watch progress in another terminal with:")
LOG.info(" tail -f %s", file_name)
return execute(cmd, stdout_fh=fh, stderr_fh=fh, **kwargs)
@contextlib.contextmanager

75
anvil/test.py Normal file
View File

@ -0,0 +1,75 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from testtools import compat
from testtools import matchers
from testtools import testcase
class TestCase(testcase.TestCase):
"""Base test case class for all anvil unit tests."""
def assertRaisesRegexp(self, expected_exception, expected_regexp,
callable_obj=None, *args, **kwargs):
# TODO(harlowja): submit a pull/review request to testtools to add
# this method to there codebase instead of having it exist in ours
# since it really doesn't belong here.
class ReRaiseOtherTypes(object):
def match(self, matchee):
if not issubclass(matchee[0], expected_exception):
compat.reraise(*matchee)
matcher = matchers.Raises(matchers.MatchesAll(ReRaiseOtherTypes(),
matchers.MatchesException(expected_exception,
expected_regexp)))
our_callable = testcase.Nullary(callable_obj, *args, **kwargs)
self.assertThat(our_callable, matcher)
class MockTestCase(TestCase):
"""Base test case class for all anvil mocking unit tests."""
def setUp(self):
super(MockTestCase, self).setUp()
self.master_mock = mock.MagicMock(name='master_mock')
def _patch_class(self, module, name, autospec=True, attach_as=None):
"""Patch class, create class instance mock and attach them to
the master mock.
"""
if autospec:
instance_mock = mock.MagicMock(spec=getattr(module, name))
else:
instance_mock = mock.MagicMock()
patcher = mock.patch.object(module, name, autospec=autospec)
class_mock = patcher.start()
self.addCleanup(patcher.stop)
class_mock.return_value = instance_mock
if attach_as is None:
attach_class_as = name
attach_instance_as = name.lower()
else:
attach_class_as = attach_as + '_class'
attach_instance_as = attach_as
self.master_mock.attach_mock(class_mock, attach_class_as)
self.master_mock.attach_mock(instance_mock, attach_instance_as)
return class_mock, instance_mock

View File

@ -18,15 +18,15 @@ import mock
import os
import shutil
import tempfile
import unittest
from anvil import cfg
from anvil import exceptions
from anvil import shell
from anvil import test
from anvil import utils
class TestYamlRefLoader(unittest.TestCase):
class TestYamlRefLoader(test.TestCase):
def setUp(self):
super(TestYamlRefLoader, self).setUp()
@ -567,7 +567,7 @@ class TestYamlRefLoader(unittest.TestCase):
self.assertEqual(processed['stable'], 12)
class TestYamlMergeLoader(unittest.TestCase):
class TestYamlMergeLoader(test.TestCase):
def setUp(self):
super(TestYamlMergeLoader, self).setUp()

View File

@ -14,15 +14,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from anvil import downloader
from anvil import exceptions
from anvil import test
class TestGitDownloader(unittest.TestCase):
class TestGitDownloader(test.TestCase):
def setUp(self):
super(TestGitDownloader, self).setUp()
self._uri = 'https://github.com/stackforge/anvil.git'
self._dst = '/root/anvil'
self._sha1 = '0a4d55a8d778e5022fab701977c5d840bbc486d0'

View File

@ -14,47 +14,102 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from anvil import exceptions
from anvil import exceptions as exc
from anvil import test
class TestYamlException(unittest.TestCase):
class TestProcessExecutionError(test.TestCase):
def test_YamlException(self):
self.assertTrue(issubclass(exceptions.YamlException,
exceptions.ConfigException))
def assertExceptionMessage(self, err, cmd, stdout='', stderr='',
exit_code='-', description=None):
if description is None:
description = 'Unexpected error while running command.'
message = ('%s\nCommand: %s\nExit code: %s\nStdout: %s\nStderr: %s' %
(description, cmd, exit_code, stdout, stderr))
self.assertEqual(err.message, message)
def test_YamlOptionNotFoundException(self):
self.assertTrue(issubclass(exceptions.YamlOptionNotFoundException,
exceptions.YamlException))
def setUp(self):
super(TestProcessExecutionError, self).setUp()
self.cmd = 'test-command'
self.stdout = 'test-stdout'
self.stderr = 'test-stderr'
exc = str(exceptions.YamlOptionNotFoundException(
def test_default(self):
err = exc.ProcessExecutionError(self.cmd)
self.assertExceptionMessage(err, cmd=self.cmd)
def test_stdout(self):
err = exc.ProcessExecutionError(self.cmd, stdout=self.stdout)
self.assertExceptionMessage(err, cmd=self.cmd, stdout=self.stdout)
def test_stdout_empty(self):
err = exc.ProcessExecutionError(self.cmd, stdout='')
self.assertExceptionMessage(err, cmd=self.cmd, stdout='')
def test_stdout_none(self):
err = exc.ProcessExecutionError(self.cmd, stdout=None)
self.assertExceptionMessage(err, cmd=self.cmd, stdout=None)
def test_stderr(self):
err = exc.ProcessExecutionError(self.cmd, stderr=self.stderr)
self.assertExceptionMessage(err, cmd=self.cmd, stderr=self.stderr)
def test_stderr_none(self):
err = exc.ProcessExecutionError(self.cmd, stderr=None)
self.assertExceptionMessage(err, cmd=self.cmd, stderr=None)
def test_exit_code_int(self):
err = exc.ProcessExecutionError(self.cmd, exit_code=0)
self.assertExceptionMessage(err, self.cmd, exit_code=0)
def test_exit_code_long(self):
err = exc.ProcessExecutionError(self.cmd, exit_code=0L)
self.assertExceptionMessage(err, self.cmd, exit_code=0L)
def test_exit_code_not_valid(self):
err = exc.ProcessExecutionError(self.cmd, exit_code='code')
self.assertExceptionMessage(err, self.cmd, exit_code='-')
err = exc.ProcessExecutionError(self.cmd, exit_code=0.0)
self.assertExceptionMessage(err, self.cmd, exit_code='-')
def test_description(self):
description = 'custom description'
err = exc.ProcessExecutionError(self.cmd, description=description)
self.assertExceptionMessage(err, self.cmd, description=description)
class TestYamlException(test.TestCase):
def test_yaml_exception(self):
self.assertTrue(issubclass(exc.YamlException,
exc.ConfigException))
def test_yaml_option_not_found_exception(self):
self.assertTrue(issubclass(exc.YamlOptionNotFoundException,
exc.YamlException))
exc_str = str(exc.YamlOptionNotFoundException(
'conf-sample', 'opt-sample', 'ref-conf', 'ref-opt'
))
self.assertTrue("`conf-sample`" in exc)
self.assertTrue("`ref-opt`" in exc)
self.assertTrue("opt-sample" in exc)
self.assertTrue("ref-conf:ref-opt" in exc)
self.assertTrue("`conf-sample`" in exc_str)
self.assertTrue("`ref-opt`" in exc_str)
self.assertTrue("opt-sample" in exc_str)
self.assertTrue("ref-conf:ref-opt" in exc_str)
def test_YamlConfigNotFoundException(self):
self.assertTrue(issubclass(exceptions.YamlConfigNotFoundException,
exceptions.YamlException))
def test_yaml_config_not_found_exception(self):
self.assertTrue(issubclass(exc.YamlConfigNotFoundException,
exc.YamlException))
exc = str(exceptions.YamlConfigNotFoundException(
"no/such//path/to/yaml"
))
self.assertTrue("no/such//path/to/yaml" in exc)
exc_str = str(exc.YamlConfigNotFoundException("no/such//path/to/yaml"))
self.assertTrue("no/such//path/to/yaml" in exc_str)
def test_YamlLoopException(self):
self.assertTrue(issubclass(exceptions.YamlLoopException,
exceptions.YamlException))
def test_yaml_loop_exception(self):
self.assertTrue(issubclass(exc.YamlLoopException, exc.YamlException))
exc = str(exceptions.YamlLoopException('conf-sample', 'opt-sample',
[('s1', 'r1'), ('s2', 'r2')]))
self.assertTrue("`conf-sample`" in exc)
self.assertTrue("`opt-sample`" in exc)
self.assertTrue("loop found" in exc)
self.assertTrue("`s1`=>`r1`" in exc)
self.assertTrue("`s2`=>`r2`" in exc)
exc_str = str(exc.YamlLoopException('conf-sample', 'opt-sample',
[('s1', 'r1'), ('s2', 'r2')]))
self.assertTrue("`conf-sample`" in exc_str)
self.assertTrue("`opt-sample`" in exc_str)
self.assertTrue("loop found" in exc_str)
self.assertTrue("`s1`=>`r1`" in exc_str)
self.assertTrue("`s2`=>`r2`" in exc_str)

View File

@ -15,20 +15,20 @@
# under the License.
import StringIO
import unittest
from anvil import ini_parser
from anvil import test
class TestAnvilConfigParser(unittest.TestCase):
class TestAnvilConfigParser(test.TestCase):
def setUp(self):
super(TestAnvilConfigParser, self).setUp()
self.config_parser = ini_parser.AnvilConfigParser()
def _read_ini(self, ini):
steam = StringIO.StringIO(ini)
self.config_parser.readfp(steam)
stream = StringIO.StringIO(ini)
self.config_parser.readfp(stream)
def test_commented_option_regexp_simple(self):
regexp = self.config_parser.option_regex

View File

@ -16,12 +16,12 @@
import os
import tempfile
import unittest
from anvil import log
from anvil import test
class TestLog(unittest.TestCase):
class TestLog(test.TestCase):
def setUp(self):
super(TestLog, self).setUp()
self.test_logger = log.getLogger().logger

143
anvil/tests/test_shell.py Normal file
View File

@ -0,0 +1,143 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import subprocess
from anvil import exceptions as exc
from anvil import shell as sh
from anvil import test
class TestShell(test.MockTestCase):
def setUp(self):
super(TestShell, self).setUp()
self.cmd = ['test', 'command']
self.str_cmd = ' '.join(self.cmd)
self.result = ('stdout', 'stderr')
# patch subprocess.Popen
self.popen_mock, self.popen_inst_mock = self._patch_class(
sh.subprocess, 'Popen')
self.popen_inst_mock.returncode = 0
self.popen_inst_mock.communicate.return_value = self.result
def test_execute_dry_run(self):
sh.IS_DRYRUN = True
self.assertEqual(sh.execute(self.cmd), ('', ''))
self.assertEqual(self.master_mock.mock_calls, [])
sh.IS_DRYRUN = False
def test_execute_default_params(self):
result = sh.execute(self.cmd)
master_mock_calls = [
mock.call.Popen(self.cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True,
shell=False,
cwd=None,
env=None),
mock.call.popen.communicate(None)
]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
self.assertEqual(result, self.result)
@mock.patch.object(sh.env, 'get')
def test_execute_custom_params(self, mocked_env_get):
mocked_env_get.return_value = {'a': 'a'}
env = {'b': 'b'}
sh.execute(self.cmd,
process_input='input',
cwd='cwd',
shell=True,
env_overrides=env,
stdout_fh='stdout_fh',
stderr_fh='stderr_fh')
env.update({'a': 'a'})
self.assertEqual(self.master_mock.mock_calls, [
mock.call.Popen(self.str_cmd,
stdin=subprocess.PIPE,
stdout='stdout_fh',
stderr='stderr_fh',
close_fds=True,
shell=True,
cwd='cwd',
env=env),
mock.call.popen.communicate('input')
])
def test_execute_with_result_none(self):
self.popen_inst_mock.communicate.return_value = (None, None)
self.assertEqual(sh.execute(self.cmd), ('', ''))
def test_execute_popen_raises(self):
self.popen_mock.side_effect = OSError('Woot!')
self.assertRaises(exc.ProcessExecutionError, sh.execute, self.cmd)
def test_execute_communicate_raises(self):
self.popen_inst_mock.communicate.side_effect = OSError('Woot!')
self.assertRaises(exc.ProcessExecutionError, sh.execute, self.cmd)
def test_execute_bad_return_code_no_check(self):
self.popen_inst_mock.returncode = 1
self.assertEqual(sh.execute(self.cmd, check_exit_code=False),
self.result)
def test_execute_bad_return_code_with_check(self):
self.popen_inst_mock.returncode = 1
self.assertRaisesRegexp(exc.ProcessExecutionError,
"Unexpected error while running command.\n"
"Command: %s\n"
"Exit code: 1\n"
"Stdout: stdout\n"
"Stderr: stderr" % self.str_cmd,
sh.execute, self.cmd)
def test_execute_bad_return_code_with_tail(self):
self.popen_inst_mock.returncode = 1
self.popen_inst_mock.communicate.return_value = (
'1\n2\n3\n4\n5\n6\n7\n8\n', '')
stdout = '<redirected to debug log>\n...\n4\n5\n6\n7\n8\n'
self.assertRaisesRegexp(exc.ProcessExecutionError,
"Unexpected error while running command.\n"
"Command: %s\n"
"Exit code: 1\n"
"Stdout: %s\n"
"Stderr: " % (self.str_cmd, stdout),
sh.execute, self.cmd)
@mock.patch.object(sh, 'mkdirslist')
def test_execute_save_output(self, mocked_mkdirslist):
self.popen_inst_mock.returncode = 1
file_name = 'output.txt'
with mock.patch.object(sh, 'open', mock.mock_open(),
create=True) as fh_mock:
fh_mock.return_value.name = file_name
self.assertRaisesRegexp(
exc.ProcessExecutionError,
"Unexpected error while running command.\n"
"Command: %s\n"
"Exit code: 1\n"
"Stdout: <redirected to %s>\n"
"Stderr: <redirected to %s>" % (self.str_cmd, file_name,
file_name),
sh.execute_save_output, self.cmd, file_name
)

View File

@ -14,12 +14,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from anvil import test
from anvil import utils
class TestUtils(unittest.TestCase):
class TestUtils(test.TestCase):
def test_expand(self):
text = "blah $v"
text = utils.expand_template(text, {