From ef75b609aad35dd911540a0d21e781276bf7318f Mon Sep 17 00:00:00 2001 From: Amrith Kumar Date: Thu, 14 Aug 2014 01:21:47 -0400 Subject: [PATCH] Handle a failure on communicate() After much debugging of the subject bug, we've concluded that a failure and an exception of OSError with EAGAIN from communicate() indicates some monkey business. In the specific case that we're seeing in Trove, it is because monkey patching didn't quite get done the way it should. In any event, retrying isn't the answer. This change reverts an earlier change that retried the communicate() call and instead hurls the OSError exception back to the caller, if we are out of retries. A test case to fake the communicate() failure has been amended to ensure that we handle this in what we believe now to be the right way. Originally-Submitted-In: I2450d2dc425a3d29eaba4d5ff9badc4a992a0ec8 Change-Id: Ie36d77eebb2c20124a61ff6029c3b97bfffb9427 Closes-Bug: #1347337 Related-Bug: #1365736 --- oslo/concurrency/processutils.py | 51 +++++++++--------- tests/unit/test_processutils.py | 92 +++++++++++++++++++++++++++----- 2 files changed, 104 insertions(+), 39 deletions(-) diff --git a/oslo/concurrency/processutils.py b/oslo/concurrency/processutils.py index 43108d7..f8b8c68 100644 --- a/oslo/concurrency/processutils.py +++ b/oslo/concurrency/processutils.py @@ -17,7 +17,6 @@ System-level utilities and helper functions. """ -import errno import logging import multiprocessing import os @@ -136,7 +135,7 @@ def execute(*cmd, **kwargs): :raises: :class:`UnknownArgumentError` on receiving unknown arguments :raises: :class:`ProcessExecutionError` - + :raises: :class:`OSError` """ process_input = kwargs.pop('process_input', None) @@ -195,20 +194,9 @@ def execute(*cmd, **kwargs): preexec_fn=preexec_fn, shell=shell, env=env_variables) - result = None - for _i in six.moves.range(20): - # NOTE(russellb) 20 is an arbitrary number of retries to - # prevent any chance of looping forever here. - try: - if process_input is not None: - result = obj.communicate(process_input) - else: - result = obj.communicate() - except OSError as e: - if e.errno in (errno.EAGAIN, errno.EINTR): - continue - raise - break + + result = obj.communicate(process_input) + obj.stdin.close() # pylint: disable=E1101 _returncode = obj.returncode # pylint: disable=E1101 LOG.log(loglevel, 'Result was %s' % _returncode) @@ -221,25 +209,34 @@ def execute(*cmd, **kwargs): stderr=sanitized_stderr, cmd=sanitized_cmd) return result - except ProcessExecutionError as err: + + except (ProcessExecutionError, OSError) as err: # if we want to always log the errors or if this is # the final attempt that failed and we want to log that. if log_errors == LOG_ALL_ERRORS or ( log_errors == LOG_FINAL_ERROR and not attempts): - format = _('%(desc)r\ncommand: %(cmd)r\n' - 'exit code: %(code)r\nstdout: %(stdout)r\n' - 'stderr: %(stderr)r') - LOG.log(loglevel, format, {"desc": err.description, - "cmd": err.cmd, - "code": err.exit_code, - "stdout": err.stdout, - "stderr": err.stderr}) + if isinstance(err, ProcessExecutionError): + format = _('%(desc)r\ncommand: %(cmd)r\n' + 'exit code: %(code)r\nstdout: %(stdout)r\n' + 'stderr: %(stderr)r') + LOG.log(loglevel, format, {"desc": err.description, + "cmd": err.cmd, + "code": err.exit_code, + "stdout": err.stdout, + "stderr": err.stderr}) + else: + format = _('Got an OSError\ncommand: %(cmd)r\n' + 'errno: %(errno)r') + LOG.log(loglevel, format, {"cmd": sanitized_cmd, + "errno": err.errno}) if not attempts: - LOG.log(loglevel, _('%r failed. Not Retrying.'), err.cmd) + LOG.log(loglevel, _('%r failed. Not Retrying.'), + sanitized_cmd) raise else: - LOG.log(loglevel, _('%r failed. Retrying.'), err.cmd) + LOG.log(loglevel, _('%r failed. Retrying.'), + sanitized_cmd) if delay_on_retry: greenthread.sleep(random.randint(20, 200) / 100.0) finally: diff --git a/tests/unit/test_processutils.py b/tests/unit/test_processutils.py index 2bdd525..43c5cbe 100644 --- a/tests/unit/test_processutils.py +++ b/tests/unit/test_processutils.py @@ -27,6 +27,7 @@ import mock from oslotest import base as test_base import six +from oslo.concurrency.openstack.common.fixture import mockpatch from oslo.concurrency import processutils PROCESS_EXECUTION_ERROR_LOGGING_TEST = """#!/bin/bash exit 41""" @@ -204,23 +205,90 @@ grep foo os.unlink(tmpfilename) os.unlink(tmpfilename2) + # This test and the one below ensures that when communicate raises + # an OSError, we do the right thing(s) + def test_exception_on_communicate_error(self): + mock = self.useFixture(mockpatch.Patch( + 'subprocess.Popen.communicate', + side_effect=OSError(errno.EAGAIN, 'fake-test'))) + + self.assertRaises(OSError, + processutils.execute, + '/usr/bin/env', + 'false', + check_exit_code=False) + + self.assertEqual(1, mock.mock.call_count) + def test_retry_on_communicate_error(self): - self.called = False + mock = self.useFixture(mockpatch.Patch( + 'subprocess.Popen.communicate', + side_effect=OSError(errno.EAGAIN, 'fake-test'))) - def fake_communicate(*args, **kwargs): - if self.called: - return ('', '') - self.called = True - e = OSError('foo') - e.errno = errno.EAGAIN - raise e + self.assertRaises(OSError, + processutils.execute, + '/usr/bin/env', + 'false', + check_exit_code=False, + attempts=5) - self.useFixture(fixtures.MonkeyPatch( - 'subprocess.Popen.communicate', fake_communicate)) + self.assertEqual(5, mock.mock.call_count) - processutils.execute('/usr/bin/env', 'true', check_exit_code=False) + def _test_and_check_logging_communicate_errors(self, log_errors=None, + attempts=None): + mock = self.useFixture(mockpatch.Patch( + 'subprocess.Popen.communicate', + side_effect=OSError(errno.EAGAIN, 'fake-test'))) - self.assertTrue(self.called) + fixture = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) + kwargs = {} + + if log_errors: + kwargs.update({"log_errors": log_errors}) + + if attempts: + kwargs.update({"attempts": attempts}) + + self.assertRaises(OSError, + processutils.execute, + '/usr/bin/env', + 'false', + **kwargs) + + self.assertEqual(attempts if attempts else 1, mock.mock.call_count) + self.assertIn('Got an OSError', fixture.output) + self.assertIn('errno: 11', fixture.output) + self.assertIn("'/usr/bin/env false'", fixture.output) + + def test_logging_on_communicate_error_1(self): + self._test_and_check_logging_communicate_errors( + log_errors=processutils.LOG_FINAL_ERROR, + attempts=None) + + def test_logging_on_communicate_error_2(self): + self._test_and_check_logging_communicate_errors( + log_errors=processutils.LOG_FINAL_ERROR, + attempts=1) + + def test_logging_on_communicate_error_3(self): + self._test_and_check_logging_communicate_errors( + log_errors=processutils.LOG_FINAL_ERROR, + attempts=5) + + def test_logging_on_communicate_error_4(self): + self._test_and_check_logging_communicate_errors( + log_errors=processutils.LOG_ALL_ERRORS, + attempts=None) + + def test_logging_on_communicate_error_5(self): + self._test_and_check_logging_communicate_errors( + log_errors=processutils.LOG_ALL_ERRORS, + attempts=1) + + def test_logging_on_communicate_error_6(self): + self._test_and_check_logging_communicate_errors( + log_errors=processutils.LOG_ALL_ERRORS, + attempts=5) def test_with_env_variables(self): env_vars = {'SUPER_UNIQUE_VAR': 'The answer is 42'}