Handle a failure on communicate()
After much debugging of the subject bug, we've concluded that a failure and an exception of OSError with EAGAIN from communicate() indicates some monkey business. In the specific case that we're seeing in Trove, it is because monkey patching didn't quite get done the way it should. In any event, retrying isn't the answer. This change reverts an earlier change that retried the communicate() call and instead hurls the OSError exception back to the caller, if we are out of retries. A test case to fake the communicate() failure has been amended to ensure that we handle this in what we believe now to be the right way. Originally-Submitted-In: I2450d2dc425a3d29eaba4d5ff9badc4a992a0ec8 Change-Id: Ie36d77eebb2c20124a61ff6029c3b97bfffb9427 Closes-Bug: #1347337 Related-Bug: #1365736
This commit is contained in:
parent
4e7a576227
commit
ef75b609aa
@ -17,7 +17,6 @@
|
||||
System-level utilities and helper functions.
|
||||
"""
|
||||
|
||||
import errno
|
||||
import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
@ -136,7 +135,7 @@ def execute(*cmd, **kwargs):
|
||||
:raises: :class:`UnknownArgumentError` on
|
||||
receiving unknown arguments
|
||||
:raises: :class:`ProcessExecutionError`
|
||||
|
||||
:raises: :class:`OSError`
|
||||
"""
|
||||
|
||||
process_input = kwargs.pop('process_input', None)
|
||||
@ -195,20 +194,9 @@ def execute(*cmd, **kwargs):
|
||||
preexec_fn=preexec_fn,
|
||||
shell=shell,
|
||||
env=env_variables)
|
||||
result = None
|
||||
for _i in six.moves.range(20):
|
||||
# NOTE(russellb) 20 is an arbitrary number of retries to
|
||||
# prevent any chance of looping forever here.
|
||||
try:
|
||||
if process_input is not None:
|
||||
result = obj.communicate(process_input)
|
||||
else:
|
||||
result = obj.communicate()
|
||||
except OSError as e:
|
||||
if e.errno in (errno.EAGAIN, errno.EINTR):
|
||||
continue
|
||||
raise
|
||||
break
|
||||
|
||||
result = obj.communicate(process_input)
|
||||
|
||||
obj.stdin.close() # pylint: disable=E1101
|
||||
_returncode = obj.returncode # pylint: disable=E1101
|
||||
LOG.log(loglevel, 'Result was %s' % _returncode)
|
||||
@ -221,25 +209,34 @@ def execute(*cmd, **kwargs):
|
||||
stderr=sanitized_stderr,
|
||||
cmd=sanitized_cmd)
|
||||
return result
|
||||
except ProcessExecutionError as err:
|
||||
|
||||
except (ProcessExecutionError, OSError) as err:
|
||||
# if we want to always log the errors or if this is
|
||||
# the final attempt that failed and we want to log that.
|
||||
if log_errors == LOG_ALL_ERRORS or (
|
||||
log_errors == LOG_FINAL_ERROR and not attempts):
|
||||
format = _('%(desc)r\ncommand: %(cmd)r\n'
|
||||
'exit code: %(code)r\nstdout: %(stdout)r\n'
|
||||
'stderr: %(stderr)r')
|
||||
LOG.log(loglevel, format, {"desc": err.description,
|
||||
"cmd": err.cmd,
|
||||
"code": err.exit_code,
|
||||
"stdout": err.stdout,
|
||||
"stderr": err.stderr})
|
||||
if isinstance(err, ProcessExecutionError):
|
||||
format = _('%(desc)r\ncommand: %(cmd)r\n'
|
||||
'exit code: %(code)r\nstdout: %(stdout)r\n'
|
||||
'stderr: %(stderr)r')
|
||||
LOG.log(loglevel, format, {"desc": err.description,
|
||||
"cmd": err.cmd,
|
||||
"code": err.exit_code,
|
||||
"stdout": err.stdout,
|
||||
"stderr": err.stderr})
|
||||
else:
|
||||
format = _('Got an OSError\ncommand: %(cmd)r\n'
|
||||
'errno: %(errno)r')
|
||||
LOG.log(loglevel, format, {"cmd": sanitized_cmd,
|
||||
"errno": err.errno})
|
||||
|
||||
if not attempts:
|
||||
LOG.log(loglevel, _('%r failed. Not Retrying.'), err.cmd)
|
||||
LOG.log(loglevel, _('%r failed. Not Retrying.'),
|
||||
sanitized_cmd)
|
||||
raise
|
||||
else:
|
||||
LOG.log(loglevel, _('%r failed. Retrying.'), err.cmd)
|
||||
LOG.log(loglevel, _('%r failed. Retrying.'),
|
||||
sanitized_cmd)
|
||||
if delay_on_retry:
|
||||
greenthread.sleep(random.randint(20, 200) / 100.0)
|
||||
finally:
|
||||
|
@ -27,6 +27,7 @@ import mock
|
||||
from oslotest import base as test_base
|
||||
import six
|
||||
|
||||
from oslo.concurrency.openstack.common.fixture import mockpatch
|
||||
from oslo.concurrency import processutils
|
||||
PROCESS_EXECUTION_ERROR_LOGGING_TEST = """#!/bin/bash
|
||||
exit 41"""
|
||||
@ -204,23 +205,90 @@ grep foo
|
||||
os.unlink(tmpfilename)
|
||||
os.unlink(tmpfilename2)
|
||||
|
||||
# This test and the one below ensures that when communicate raises
|
||||
# an OSError, we do the right thing(s)
|
||||
def test_exception_on_communicate_error(self):
|
||||
mock = self.useFixture(mockpatch.Patch(
|
||||
'subprocess.Popen.communicate',
|
||||
side_effect=OSError(errno.EAGAIN, 'fake-test')))
|
||||
|
||||
self.assertRaises(OSError,
|
||||
processutils.execute,
|
||||
'/usr/bin/env',
|
||||
'false',
|
||||
check_exit_code=False)
|
||||
|
||||
self.assertEqual(1, mock.mock.call_count)
|
||||
|
||||
def test_retry_on_communicate_error(self):
|
||||
self.called = False
|
||||
mock = self.useFixture(mockpatch.Patch(
|
||||
'subprocess.Popen.communicate',
|
||||
side_effect=OSError(errno.EAGAIN, 'fake-test')))
|
||||
|
||||
def fake_communicate(*args, **kwargs):
|
||||
if self.called:
|
||||
return ('', '')
|
||||
self.called = True
|
||||
e = OSError('foo')
|
||||
e.errno = errno.EAGAIN
|
||||
raise e
|
||||
self.assertRaises(OSError,
|
||||
processutils.execute,
|
||||
'/usr/bin/env',
|
||||
'false',
|
||||
check_exit_code=False,
|
||||
attempts=5)
|
||||
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'subprocess.Popen.communicate', fake_communicate))
|
||||
self.assertEqual(5, mock.mock.call_count)
|
||||
|
||||
processutils.execute('/usr/bin/env', 'true', check_exit_code=False)
|
||||
def _test_and_check_logging_communicate_errors(self, log_errors=None,
|
||||
attempts=None):
|
||||
mock = self.useFixture(mockpatch.Patch(
|
||||
'subprocess.Popen.communicate',
|
||||
side_effect=OSError(errno.EAGAIN, 'fake-test')))
|
||||
|
||||
self.assertTrue(self.called)
|
||||
fixture = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
|
||||
kwargs = {}
|
||||
|
||||
if log_errors:
|
||||
kwargs.update({"log_errors": log_errors})
|
||||
|
||||
if attempts:
|
||||
kwargs.update({"attempts": attempts})
|
||||
|
||||
self.assertRaises(OSError,
|
||||
processutils.execute,
|
||||
'/usr/bin/env',
|
||||
'false',
|
||||
**kwargs)
|
||||
|
||||
self.assertEqual(attempts if attempts else 1, mock.mock.call_count)
|
||||
self.assertIn('Got an OSError', fixture.output)
|
||||
self.assertIn('errno: 11', fixture.output)
|
||||
self.assertIn("'/usr/bin/env false'", fixture.output)
|
||||
|
||||
def test_logging_on_communicate_error_1(self):
|
||||
self._test_and_check_logging_communicate_errors(
|
||||
log_errors=processutils.LOG_FINAL_ERROR,
|
||||
attempts=None)
|
||||
|
||||
def test_logging_on_communicate_error_2(self):
|
||||
self._test_and_check_logging_communicate_errors(
|
||||
log_errors=processutils.LOG_FINAL_ERROR,
|
||||
attempts=1)
|
||||
|
||||
def test_logging_on_communicate_error_3(self):
|
||||
self._test_and_check_logging_communicate_errors(
|
||||
log_errors=processutils.LOG_FINAL_ERROR,
|
||||
attempts=5)
|
||||
|
||||
def test_logging_on_communicate_error_4(self):
|
||||
self._test_and_check_logging_communicate_errors(
|
||||
log_errors=processutils.LOG_ALL_ERRORS,
|
||||
attempts=None)
|
||||
|
||||
def test_logging_on_communicate_error_5(self):
|
||||
self._test_and_check_logging_communicate_errors(
|
||||
log_errors=processutils.LOG_ALL_ERRORS,
|
||||
attempts=1)
|
||||
|
||||
def test_logging_on_communicate_error_6(self):
|
||||
self._test_and_check_logging_communicate_errors(
|
||||
log_errors=processutils.LOG_ALL_ERRORS,
|
||||
attempts=5)
|
||||
|
||||
def test_with_env_variables(self):
|
||||
env_vars = {'SUPER_UNIQUE_VAR': 'The answer is 42'}
|
||||
|
Loading…
x
Reference in New Issue
Block a user