ovsdb monitor: get rid of custom _read_stdout/_read_stderr methods

Those methods do the same thing as AsyncProcess counterparts, just
with logging the received output. It's better to move the logging into
AsyncProcess and control it with __init__ arguments.

This allows us to get rid of some duplicate tests for ovsdb monitor.

Change-Id: Ic20ded27ba09afdd73e4d96c47469c2d7b4d4db5
Related-Bug: #1495937
This commit is contained in:
Ihar Hrachyshka 2015-09-17 14:57:43 +02:00
parent 7ab3f53777
commit bdcf8e6079
4 changed files with 98 additions and 47 deletions

View File

@ -56,7 +56,7 @@ class AsyncProcess(object):
"""
def __init__(self, cmd, run_as_root=False, respawn_interval=None,
namespace=None):
namespace=None, log_output=False, die_on_error=False):
"""Constructor.
:param cmd: The list of command arguments to invoke.
@ -66,9 +66,11 @@ class AsyncProcess(object):
only be attempted if a value of 0 or greater is provided.
:param namespace: Optional, start the command in the specified
namespace.
:param log_output: Optional, also log received output.
:param die_on_error: Optional, kills the process on stderr output.
"""
self.cmd_without_namespace = cmd
self.cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
self._cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
self.run_as_root = run_as_root
if respawn_interval is not None and respawn_interval < 0:
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
@ -77,6 +79,12 @@ class AsyncProcess(object):
self._kill_event = None
self._reset_queues()
self._watchers = []
self.log_output = log_output
self.die_on_error = die_on_error
@property
def cmd(self):
return ' '.join(self._cmd)
def _reset_queues(self):
self._stdout_lines = eventlet.queue.LightQueue()
@ -126,7 +134,7 @@ class AsyncProcess(object):
def _spawn(self):
"""Spawn a process and its watchers."""
self._kill_event = eventlet.event.Event()
self._process, cmd = utils.create_process(self.cmd,
self._process, cmd = utils.create_process(self._cmd,
run_as_root=self.run_as_root)
self._watchers = []
for reader in (self._read_stdout, self._read_stderr):
@ -223,10 +231,28 @@ class AsyncProcess(object):
return data
def _read_stdout(self):
return self._read(self._process.stdout, self._stdout_lines)
data = self._read(self._process.stdout, self._stdout_lines)
if self.log_output:
LOG.debug('Output received from [%(cmd)s]: %(data)s',
{'cmd': self.cmd,
'data': data})
return data
def _read_stderr(self):
return self._read(self._process.stderr, self._stderr_lines)
data = self._read(self._process.stderr, self._stderr_lines)
if self.log_output:
LOG.error(_LE('Error received from [%(cmd)s]: %(err)s'),
{'cmd': self.cmd,
'err': data})
if self.die_on_error:
LOG.error(_LE("Process [%(cmd)s] dies due to the error: %(err)s"),
{'cmd': self.cmd,
'err': data})
# the callback caller will use None to indicate the need to bail
# out of the thread
return None
return data
def _iter_queue(self, queue, block):
while True:

View File

@ -40,22 +40,9 @@ class OvsdbMonitor(async_process.AsyncProcess):
if format:
cmd.append('--format=%s' % format)
super(OvsdbMonitor, self).__init__(cmd, run_as_root=True,
respawn_interval=respawn_interval)
def _read_stdout(self):
data = self._process.stdout.readline()
if not data:
return
self._stdout_lines.put(data)
LOG.debug('Output received from ovsdb monitor: %s', data)
return data
def _read_stderr(self):
data = super(OvsdbMonitor, self)._read_stderr()
if data:
LOG.error(_LE('Error received from ovsdb monitor: %s'), data)
# Do not return value to ensure that stderr output will
# stop the monitor.
respawn_interval=respawn_interval,
log_output=True,
die_on_error=True)
class SimpleInterfaceMonitor(OvsdbMonitor):

View File

@ -84,7 +84,7 @@ class TestAsyncProcess(base.BaseTestCase):
func.assert_called_once_with()
def test__watch_process_exits_on_callback_failure(self):
self._test__watch_process(lambda: False, eventlet.event.Event())
self._test__watch_process(lambda: None, eventlet.event.Event())
def test__watch_process_exits_on_exception(self):
def foo():
@ -219,3 +219,51 @@ class TestAsyncProcess(base.BaseTestCase):
def test_stop_raises_exception_if_already_started(self):
with testtools.ExpectedException(async_process.AsyncProcessException):
self.proc.stop()
def test_cmd(self):
for expected, cmd in (('ls -l file', ['ls', '-l', 'file']),
('fake', ['fake'])):
proc = async_process.AsyncProcess(cmd)
self.assertEqual(expected, proc.cmd)
class TestAsyncProcessLogging(base.BaseTestCase):
def setUp(self):
super(TestAsyncProcessLogging, self).setUp()
self.log_mock = mock.patch.object(async_process, 'LOG').start()
def _test__read_stdout_logging(self, enable):
proc = async_process.AsyncProcess(['fakecmd'], log_output=enable)
with mock.patch.object(proc, '_read', return_value='fakedata'),\
mock.patch.object(proc, '_process'):
proc._read_stdout()
self.assertEqual(enable, self.log_mock.debug.called)
def _test__read_stderr_logging(self, enable):
proc = async_process.AsyncProcess(['fake'], log_output=enable)
with mock.patch.object(proc, '_read', return_value='fakedata'),\
mock.patch.object(proc, '_process'):
proc._read_stderr()
self.assertEqual(enable, self.log_mock.error.called)
def test__read_stdout_logging_enabled(self):
self._test__read_stdout_logging(enable=True)
def test__read_stdout_logging_disabled(self):
self._test__read_stdout_logging(enable=False)
def test__read_stderr_logging_enabled(self):
self._test__read_stderr_logging(enable=True)
def test__read_stderr_logging_disabled(self):
self._test__read_stderr_logging(enable=False)
class TestAsyncProcessDieOnError(base.BaseTestCase):
def test__read_stderr_returns_none_on_error(self):
proc = async_process.AsyncProcess(['fakecmd'], die_on_error=True)
with mock.patch.object(proc, '_read', return_value='fakedata'),\
mock.patch.object(proc, '_process'):
self.assertIsNone(proc._read_stderr())

View File

@ -21,33 +21,23 @@ from neutron.tests import base
class TestOvsdbMonitor(base.BaseTestCase):
def setUp(self):
super(TestOvsdbMonitor, self).setUp()
self.monitor = ovsdb_monitor.OvsdbMonitor('Interface')
def test___init__(self):
ovsdb_monitor.OvsdbMonitor('Interface')
def read_output_queues_and_returns_result(self, output_type, output):
with mock.patch.object(self.monitor, '_process') as mock_process:
with mock.patch.object(mock_process, output_type) as mock_file:
with mock.patch.object(mock_file, 'readline') as mock_readline:
mock_readline.return_value = output
func = getattr(self.monitor,
'_read_%s' % output_type,
None)
return func()
def test___init___with_columns(self):
columns = ['col1', 'col2']
with mock.patch(
'neutron.agent.linux.async_process.AsyncProcess.__init__') as init:
ovsdb_monitor.OvsdbMonitor('Interface', columns=columns)
cmd = init.call_args_list[0][0][0]
self.assertEqual('col1,col2', cmd[-1])
def test__read_stdout_returns_none_for_empty_read(self):
result = self.read_output_queues_and_returns_result('stdout', '')
self.assertIsNone(result)
def test__read_stdout_queues_normal_output_to_stdout_queue(self):
output = 'foo'
result = self.read_output_queues_and_returns_result('stdout', output)
self.assertEqual(result, output)
self.assertEqual(self.monitor._stdout_lines.get_nowait(), output)
def test__read_stderr_returns_none(self):
result = self.read_output_queues_and_returns_result('stderr', '')
self.assertIsNone(result)
def test___init___with_format(self):
with mock.patch(
'neutron.agent.linux.async_process.AsyncProcess.__init__') as init:
ovsdb_monitor.OvsdbMonitor('Interface', format='blob')
cmd = init.call_args_list[0][0][0]
self.assertEqual('--format=blob', cmd[-1])
class TestSimpleInterfaceMonitor(base.BaseTestCase):