diff --git a/neutron/agent/common/async_process.py b/neutron/agent/common/async_process.py new file mode 100644 index 00000000000..fc8284fe379 --- /dev/null +++ b/neutron/agent/common/async_process.py @@ -0,0 +1,272 @@ +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import signal + +import eventlet +import eventlet.event +import eventlet.queue +from neutron_lib.utils import helpers +from oslo_log import log as logging + +from neutron._i18n import _ +from neutron.agent.common import ip_lib +from neutron.agent.common import utils +from neutron.common import utils as common_utils + + +LOG = logging.getLogger(__name__) + + +class AsyncProcessException(Exception): + pass + + +class AsyncProcess(object): + """Manages an asynchronous process. + + This class spawns a new process via subprocess and uses + greenthreads to read stderr and stdout asynchronously into queues + that can be read via repeatedly calling iter_stdout() and + iter_stderr(). + + If respawn_interval is non-zero, any error in communicating with + the managed process will result in the process and greenthreads + being cleaned up and the process restarted after the specified + interval. + + Example usage: + + >>> import time + >>> proc = AsyncProcess(['ping']) + >>> proc.start() + >>> time.sleep(5) + >>> proc.stop() + >>> for line in proc.iter_stdout(): + ... print(line) + """ + + def __init__(self, cmd, run_as_root=False, respawn_interval=None, + namespace=None, log_output=False, die_on_error=False): + """Constructor. + + :param cmd: The list of command arguments to invoke. + :param run_as_root: The process should run with elevated privileges. + :param respawn_interval: Optional, the interval in seconds to wait + to respawn after unexpected process death. Respawn will + only be attempted if a value of 0 or greater is provided. + :param namespace: Optional, start the command in the specified + namespace. + :param log_output: Optional, also log received output. + :param die_on_error: Optional, kills the process on stderr output. + """ + self.cmd_without_namespace = cmd + self._cmd = ip_lib.add_namespace_to_cmd(cmd, namespace) + self.run_as_root = run_as_root + if respawn_interval is not None and respawn_interval < 0: + raise ValueError(_('respawn_interval must be >= 0 if provided.')) + self.respawn_interval = respawn_interval + self._process = None + self._pid = None + self._is_running = False + self._kill_event = None + self._reset_queues() + self._watchers = [] + self.log_output = log_output + self.die_on_error = die_on_error + + @property + def cmd(self): + return ' '.join(self._cmd) + + def _reset_queues(self): + self._stdout_lines = eventlet.queue.LightQueue() + self._stderr_lines = eventlet.queue.LightQueue() + + def is_active(self): + # If using sudo rootwrap as a root_helper, we have to wait until sudo + # spawns rootwrap and rootwrap spawns the process. self.pid will make + # sure to get the correct pid. + return utils.pid_invoked_with_cmdline( + self.pid, self.cmd_without_namespace) + + def start(self, block=False): + """Launch a process and monitor it asynchronously. + + :param block: Block until the process has started. + :raises utils.WaitTimeout if blocking is True and the process + did not start in time. + """ + LOG.debug('Launching async process [%s].', self.cmd) + if self._is_running: + raise AsyncProcessException(_('Process is already started')) + else: + self._spawn() + + if block: + common_utils.wait_until_true(self.is_active) + + def stop(self, block=False, kill_signal=None): + """Halt the process and watcher threads. + + :param block: Block until the process has stopped. + :param kill_signal: Number of signal that will be sent to the process + when terminating the process + :raises utils.WaitTimeout if blocking is True and the process + did not stop in time. + """ + kill_signal = kill_signal or getattr(signal, 'SIGKILL', signal.SIGTERM) + if self._is_running: + LOG.debug('Halting async process [%s].', self.cmd) + self._kill(kill_signal) + else: + raise AsyncProcessException(_('Process is not running.')) + + if block: + common_utils.wait_until_true(lambda: not self.is_active()) + + def _spawn(self): + """Spawn a process and its watchers.""" + self._is_running = True + self._pid = None + self._kill_event = eventlet.event.Event() + self._process, cmd = utils.create_process(self._cmd, + run_as_root=self.run_as_root) + self._watchers = [] + for reader in (self._read_stdout, self._read_stderr): + # Pass the stop event directly to the greenthread to + # ensure that assignment of a new event to the instance + # attribute does not prevent the greenthread from using + # the original event. + watcher = eventlet.spawn(self._watch_process, + reader, + self._kill_event) + self._watchers.append(watcher) + + @property + def pid(self): + if self._process: + if not self._pid: + self._pid = utils.get_root_helper_child_pid( + self._process.pid, + self.cmd_without_namespace, + run_as_root=self.run_as_root) + return self._pid + + def _kill(self, kill_signal): + """Kill the process and the associated watcher greenthreads.""" + pid = self.pid + if pid: + self._is_running = False + self._pid = None + self._kill_process(pid, kill_signal) + + # Halt the greenthreads if they weren't already. + if self._kill_event: + self._kill_event.send() + self._kill_event = None + + def _kill_process(self, pid, kill_signal): + try: + # A process started by a root helper will be running as + # root and need to be killed via the same helper. + utils.kill_process(pid, kill_signal, self.run_as_root) + except Exception: + LOG.exception('An error occurred while killing [%s].', + self.cmd) + return False + + if self._process: + self._process.wait() + return True + + def _handle_process_error(self): + """Kill the async process and respawn if necessary.""" + stdout = list(self.iter_stdout()) + stderr = list(self.iter_stderr()) + LOG.debug('Halting async process [%s] in response to an error. stdout:' + ' [%s] - stderr: [%s]', self.cmd, stdout, stderr) + self._kill(getattr(signal, 'SIGKILL', signal.SIGTERM)) + if self.respawn_interval is not None and self.respawn_interval >= 0: + eventlet.sleep(self.respawn_interval) + LOG.debug('Respawning async process [%s].', self.cmd) + try: + self.start() + except AsyncProcessException: + # Process was already respawned by someone else... + pass + + def _watch_process(self, callback, kill_event): + while not kill_event.ready(): + try: + output = callback() + if not output and output != "": + break + except Exception: + LOG.exception('An error occurred while communicating ' + 'with async process [%s].', self.cmd) + break + # Ensure that watching a process with lots of output does + # not block execution of other greenthreads. + eventlet.sleep() + # self._is_running being True indicates that the loop was + # broken out of due to an error in the watched process rather + # than the loop condition being satisfied. + if self._is_running: + self._is_running = False + self._handle_process_error() + + def _read(self, stream, queue): + data = stream.readline() + if data: + data = helpers.safe_decode_utf8(data.strip()) + queue.put(data) + return data + + def _read_stdout(self): + data = self._read(self._process.stdout, self._stdout_lines) + if self.log_output: + LOG.debug('Output received from [%(cmd)s]: %(data)s', + {'cmd': self.cmd, + 'data': data}) + return data + + def _read_stderr(self): + data = self._read(self._process.stderr, self._stderr_lines) + if self.log_output: + LOG.error('Error received from [%(cmd)s]: %(err)s', + {'cmd': self.cmd, + 'err': data}) + if self.die_on_error: + LOG.error("Process [%(cmd)s] dies due to the error: %(err)s", + {'cmd': self.cmd, + 'err': data}) + # the callback caller will use None to indicate the need to bail + # out of the thread + return None + + return data + + def _iter_queue(self, queue, block): + while True: + try: + yield queue.get(block=block) + except eventlet.queue.Empty: + break + + def iter_stdout(self, block=False): + return self._iter_queue(self._stdout_lines, block) + + def iter_stderr(self, block=False): + return self._iter_queue(self._stderr_lines, block) diff --git a/neutron/agent/linux/ovsdb_monitor.py b/neutron/agent/common/ovsdb_monitor.py similarity index 99% rename from neutron/agent/linux/ovsdb_monitor.py rename to neutron/agent/common/ovsdb_monitor.py index b066ebfc659..6b182f492f5 100644 --- a/neutron/agent/linux/ovsdb_monitor.py +++ b/neutron/agent/common/ovsdb_monitor.py @@ -19,7 +19,7 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils -from neutron.agent.linux import async_process +from neutron.agent.common import async_process from neutron.agent.ovsdb import api as ovsdb from neutron.agent.ovsdb.native import helpers from neutron.common import utils diff --git a/neutron/agent/linux/async_process.py b/neutron/agent/linux/async_process.py index fc8284fe379..aec24d52ebd 100644 --- a/neutron/agent/linux/async_process.py +++ b/neutron/agent/linux/async_process.py @@ -12,261 +12,17 @@ # License for the specific language governing permissions and limitations # under the License. -import signal +from debtcollector import moves -import eventlet -import eventlet.event -import eventlet.queue -from neutron_lib.utils import helpers -from oslo_log import log as logging - -from neutron._i18n import _ -from neutron.agent.common import ip_lib -from neutron.agent.common import utils -from neutron.common import utils as common_utils +from neutron.agent.common import async_process -LOG = logging.getLogger(__name__) +AsyncProcessException = moves.moved_class( + async_process.AsyncProcessException, + 'AsyncProcessException', + __name__) - -class AsyncProcessException(Exception): - pass - - -class AsyncProcess(object): - """Manages an asynchronous process. - - This class spawns a new process via subprocess and uses - greenthreads to read stderr and stdout asynchronously into queues - that can be read via repeatedly calling iter_stdout() and - iter_stderr(). - - If respawn_interval is non-zero, any error in communicating with - the managed process will result in the process and greenthreads - being cleaned up and the process restarted after the specified - interval. - - Example usage: - - >>> import time - >>> proc = AsyncProcess(['ping']) - >>> proc.start() - >>> time.sleep(5) - >>> proc.stop() - >>> for line in proc.iter_stdout(): - ... print(line) - """ - - def __init__(self, cmd, run_as_root=False, respawn_interval=None, - namespace=None, log_output=False, die_on_error=False): - """Constructor. - - :param cmd: The list of command arguments to invoke. - :param run_as_root: The process should run with elevated privileges. - :param respawn_interval: Optional, the interval in seconds to wait - to respawn after unexpected process death. Respawn will - only be attempted if a value of 0 or greater is provided. - :param namespace: Optional, start the command in the specified - namespace. - :param log_output: Optional, also log received output. - :param die_on_error: Optional, kills the process on stderr output. - """ - self.cmd_without_namespace = cmd - self._cmd = ip_lib.add_namespace_to_cmd(cmd, namespace) - self.run_as_root = run_as_root - if respawn_interval is not None and respawn_interval < 0: - raise ValueError(_('respawn_interval must be >= 0 if provided.')) - self.respawn_interval = respawn_interval - self._process = None - self._pid = None - self._is_running = False - self._kill_event = None - self._reset_queues() - self._watchers = [] - self.log_output = log_output - self.die_on_error = die_on_error - - @property - def cmd(self): - return ' '.join(self._cmd) - - def _reset_queues(self): - self._stdout_lines = eventlet.queue.LightQueue() - self._stderr_lines = eventlet.queue.LightQueue() - - def is_active(self): - # If using sudo rootwrap as a root_helper, we have to wait until sudo - # spawns rootwrap and rootwrap spawns the process. self.pid will make - # sure to get the correct pid. - return utils.pid_invoked_with_cmdline( - self.pid, self.cmd_without_namespace) - - def start(self, block=False): - """Launch a process and monitor it asynchronously. - - :param block: Block until the process has started. - :raises utils.WaitTimeout if blocking is True and the process - did not start in time. - """ - LOG.debug('Launching async process [%s].', self.cmd) - if self._is_running: - raise AsyncProcessException(_('Process is already started')) - else: - self._spawn() - - if block: - common_utils.wait_until_true(self.is_active) - - def stop(self, block=False, kill_signal=None): - """Halt the process and watcher threads. - - :param block: Block until the process has stopped. - :param kill_signal: Number of signal that will be sent to the process - when terminating the process - :raises utils.WaitTimeout if blocking is True and the process - did not stop in time. - """ - kill_signal = kill_signal or getattr(signal, 'SIGKILL', signal.SIGTERM) - if self._is_running: - LOG.debug('Halting async process [%s].', self.cmd) - self._kill(kill_signal) - else: - raise AsyncProcessException(_('Process is not running.')) - - if block: - common_utils.wait_until_true(lambda: not self.is_active()) - - def _spawn(self): - """Spawn a process and its watchers.""" - self._is_running = True - self._pid = None - self._kill_event = eventlet.event.Event() - self._process, cmd = utils.create_process(self._cmd, - run_as_root=self.run_as_root) - self._watchers = [] - for reader in (self._read_stdout, self._read_stderr): - # Pass the stop event directly to the greenthread to - # ensure that assignment of a new event to the instance - # attribute does not prevent the greenthread from using - # the original event. - watcher = eventlet.spawn(self._watch_process, - reader, - self._kill_event) - self._watchers.append(watcher) - - @property - def pid(self): - if self._process: - if not self._pid: - self._pid = utils.get_root_helper_child_pid( - self._process.pid, - self.cmd_without_namespace, - run_as_root=self.run_as_root) - return self._pid - - def _kill(self, kill_signal): - """Kill the process and the associated watcher greenthreads.""" - pid = self.pid - if pid: - self._is_running = False - self._pid = None - self._kill_process(pid, kill_signal) - - # Halt the greenthreads if they weren't already. - if self._kill_event: - self._kill_event.send() - self._kill_event = None - - def _kill_process(self, pid, kill_signal): - try: - # A process started by a root helper will be running as - # root and need to be killed via the same helper. - utils.kill_process(pid, kill_signal, self.run_as_root) - except Exception: - LOG.exception('An error occurred while killing [%s].', - self.cmd) - return False - - if self._process: - self._process.wait() - return True - - def _handle_process_error(self): - """Kill the async process and respawn if necessary.""" - stdout = list(self.iter_stdout()) - stderr = list(self.iter_stderr()) - LOG.debug('Halting async process [%s] in response to an error. stdout:' - ' [%s] - stderr: [%s]', self.cmd, stdout, stderr) - self._kill(getattr(signal, 'SIGKILL', signal.SIGTERM)) - if self.respawn_interval is not None and self.respawn_interval >= 0: - eventlet.sleep(self.respawn_interval) - LOG.debug('Respawning async process [%s].', self.cmd) - try: - self.start() - except AsyncProcessException: - # Process was already respawned by someone else... - pass - - def _watch_process(self, callback, kill_event): - while not kill_event.ready(): - try: - output = callback() - if not output and output != "": - break - except Exception: - LOG.exception('An error occurred while communicating ' - 'with async process [%s].', self.cmd) - break - # Ensure that watching a process with lots of output does - # not block execution of other greenthreads. - eventlet.sleep() - # self._is_running being True indicates that the loop was - # broken out of due to an error in the watched process rather - # than the loop condition being satisfied. - if self._is_running: - self._is_running = False - self._handle_process_error() - - def _read(self, stream, queue): - data = stream.readline() - if data: - data = helpers.safe_decode_utf8(data.strip()) - queue.put(data) - return data - - def _read_stdout(self): - data = self._read(self._process.stdout, self._stdout_lines) - if self.log_output: - LOG.debug('Output received from [%(cmd)s]: %(data)s', - {'cmd': self.cmd, - 'data': data}) - return data - - def _read_stderr(self): - data = self._read(self._process.stderr, self._stderr_lines) - if self.log_output: - LOG.error('Error received from [%(cmd)s]: %(err)s', - {'cmd': self.cmd, - 'err': data}) - if self.die_on_error: - LOG.error("Process [%(cmd)s] dies due to the error: %(err)s", - {'cmd': self.cmd, - 'err': data}) - # the callback caller will use None to indicate the need to bail - # out of the thread - return None - - return data - - def _iter_queue(self, queue, block): - while True: - try: - yield queue.get(block=block) - except eventlet.queue.Empty: - break - - def iter_stdout(self, block=False): - return self._iter_queue(self._stdout_lines, block) - - def iter_stderr(self, block=False): - return self._iter_queue(self._stderr_lines, block) +AsyncProcess = moves.moved_class( + async_process.AsyncProcess, + 'AsyncProcess', + __name__) diff --git a/neutron/agent/linux/ip_monitor.py b/neutron/agent/linux/ip_monitor.py index 33db43c9a61..2a386f06ae0 100644 --- a/neutron/agent/linux/ip_monitor.py +++ b/neutron/agent/linux/ip_monitor.py @@ -16,7 +16,7 @@ from oslo_log import log as logging from oslo_utils import excutils -from neutron.agent.linux import async_process +from neutron.agent.common import async_process from neutron.agent.linux import ip_lib LOG = logging.getLogger(__name__) diff --git a/neutron/agent/linux/polling.py b/neutron/agent/linux/polling.py index 695fd4bc02d..91475f899ca 100644 --- a/neutron/agent/linux/polling.py +++ b/neutron/agent/linux/polling.py @@ -18,9 +18,9 @@ import eventlet from oslo_config import cfg from oslo_log import log as logging +from neutron.agent.common import async_process from neutron.agent.common import base_polling -from neutron.agent.linux import async_process -from neutron.agent.linux import ovsdb_monitor +from neutron.agent.common import ovsdb_monitor from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants LOG = logging.getLogger(__name__) diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 988bf802266..af34d2f5900 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -44,10 +44,10 @@ from six import moves from neutron._i18n import _ from neutron.agent.common import ip_lib from neutron.agent.common import ovs_lib +from neutron.agent.common import ovsdb_monitor from neutron.agent.common import polling from neutron.agent.common import utils from neutron.agent.l2 import l2_agent_extensions_manager as ext_manager -from neutron.agent.linux import ovsdb_monitor from neutron.agent.linux import xenapi_root_helper from neutron.agent import rpc as agent_rpc from neutron.agent import securitygroups_rpc as agent_sg_rpc diff --git a/neutron/tests/common/agents/l2_extensions.py b/neutron/tests/common/agents/l2_extensions.py index 37edc51078b..a4161c39820 100644 --- a/neutron/tests/common/agents/l2_extensions.py +++ b/neutron/tests/common/agents/l2_extensions.py @@ -18,7 +18,7 @@ import signal from oslo_log import log as logging -from neutron.agent.linux import async_process +from neutron.agent.common import async_process from neutron.agent.linux import iptables_manager from neutron.common import utils as common_utils diff --git a/neutron/tests/fullstack/resources/machine.py b/neutron/tests/fullstack/resources/machine.py index bce26186281..74580cc39f7 100644 --- a/neutron/tests/fullstack/resources/machine.py +++ b/neutron/tests/fullstack/resources/machine.py @@ -20,7 +20,7 @@ import netaddr from neutron_lib.api.definitions import portbindings as pbs from neutron_lib import constants -from neutron.agent.linux import async_process +from neutron.agent.common import async_process from neutron.agent.linux import ip_lib from neutron.common import utils from neutron.tests.common import machine_fixtures diff --git a/neutron/tests/fullstack/resources/process.py b/neutron/tests/fullstack/resources/process.py index 0f9ff1f4b56..214a2feeed9 100644 --- a/neutron/tests/fullstack/resources/process.py +++ b/neutron/tests/fullstack/resources/process.py @@ -24,7 +24,7 @@ from neutronclient.v2_0 import client from oslo_log import log as logging from oslo_utils import fileutils -from neutron.agent.linux import async_process +from neutron.agent.common import async_process from neutron.agent.linux import ip_lib from neutron.agent.linux import utils from neutron.common import utils as common_utils diff --git a/neutron/tests/functional/agent/linux/test_async_process.py b/neutron/tests/functional/agent/linux/test_async_process.py index 14cbec29681..72bd2fa138b 100644 --- a/neutron/tests/functional/agent/linux/test_async_process.py +++ b/neutron/tests/functional/agent/linux/test_async_process.py @@ -16,7 +16,7 @@ import eventlet import six from neutron._i18n import _ -from neutron.agent.linux import async_process +from neutron.agent.common import async_process from neutron.agent.linux import utils from neutron.common import utils as common_utils from neutron.tests import base diff --git a/neutron/tests/functional/agent/linux/test_ip_monitor.py b/neutron/tests/functional/agent/linux/test_ip_monitor.py index f497a402bd2..ae1a0379ee3 100644 --- a/neutron/tests/functional/agent/linux/test_ip_monitor.py +++ b/neutron/tests/functional/agent/linux/test_ip_monitor.py @@ -13,7 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. -from neutron.agent.linux import async_process +from neutron.agent.common import async_process from neutron.agent.linux import ip_monitor from neutron.tests.functional.agent.linux import test_ip_lib diff --git a/neutron/tests/functional/agent/linux/test_ovsdb_monitor.py b/neutron/tests/functional/agent/linux/test_ovsdb_monitor.py index e644e68f2d2..e041bfd740e 100644 --- a/neutron/tests/functional/agent/linux/test_ovsdb_monitor.py +++ b/neutron/tests/functional/agent/linux/test_ovsdb_monitor.py @@ -25,7 +25,7 @@ Tests in this module will be skipped unless: from oslo_config import cfg from neutron.agent.common import ovs_lib -from neutron.agent.linux import ovsdb_monitor +from neutron.agent.common import ovsdb_monitor from neutron.common import utils from neutron.tests import base from neutron.tests.common import net_helpers diff --git a/neutron/tests/functional/agent/linux/test_utils.py b/neutron/tests/functional/agent/linux/test_utils.py index bfe4b9141e7..2b9d4bb7467 100644 --- a/neutron/tests/functional/agent/linux/test_utils.py +++ b/neutron/tests/functional/agent/linux/test_utils.py @@ -14,7 +14,7 @@ import functools -from neutron.agent.linux import async_process +from neutron.agent.common import async_process from neutron.agent.linux import utils from neutron.common import utils as common_utils from neutron.tests.functional.agent.linux import test_async_process diff --git a/neutron/tests/unit/agent/linux/test_async_process.py b/neutron/tests/unit/agent/common/test_async_process.py similarity index 99% rename from neutron/tests/unit/agent/linux/test_async_process.py rename to neutron/tests/unit/agent/common/test_async_process.py index e86375ff9fc..3a26ca6401b 100644 --- a/neutron/tests/unit/agent/linux/test_async_process.py +++ b/neutron/tests/unit/agent/common/test_async_process.py @@ -19,8 +19,8 @@ import eventlet.queue import mock import testtools +from neutron.agent.common import async_process from neutron.agent.common import utils -from neutron.agent.linux import async_process from neutron.tests import base from neutron.tests.unit.agent.linux import failing_process diff --git a/neutron/tests/unit/agent/linux/test_ovsdb_monitor.py b/neutron/tests/unit/agent/common/test_ovsdb_monitor.py similarity index 63% rename from neutron/tests/unit/agent/linux/test_ovsdb_monitor.py rename to neutron/tests/unit/agent/common/test_ovsdb_monitor.py index 28c6ae82f24..fc645ee61d9 100644 --- a/neutron/tests/unit/agent/linux/test_ovsdb_monitor.py +++ b/neutron/tests/unit/agent/common/test_ovsdb_monitor.py @@ -14,8 +14,9 @@ import mock +from neutron.agent.common import async_process from neutron.agent.common import ovs_lib -from neutron.agent.linux import ovsdb_monitor +from neutron.agent.common import ovsdb_monitor from neutron.agent.ovsdb.native import helpers from neutron.tests import base @@ -29,32 +30,31 @@ class TestOvsdbMonitor(base.BaseTestCase): def test___init__(self): ovsdb_monitor.OvsdbMonitor('Interface') - def test___init___with_columns(self): + @mock.patch.object(async_process.AsyncProcess, '__init__') + def test___init___with_columns(self, init): columns = ['col1', 'col2'] - with mock.patch( - 'neutron.agent.linux.async_process.AsyncProcess.__init__') as init: - ovsdb_monitor.OvsdbMonitor('Interface', columns=columns) - cmd = init.call_args_list[0][0][0] - self.assertEqual('col1,col2', cmd[-1]) - def test___init___with_format(self): - with mock.patch( - 'neutron.agent.linux.async_process.AsyncProcess.__init__') as init: - ovsdb_monitor.OvsdbMonitor('Interface', format='blob') - cmd = init.call_args_list[0][0][0] - self.assertEqual('--format=blob', cmd[-1]) + ovsdb_monitor.OvsdbMonitor('Interface', columns=columns) + cmd = init.call_args_list[0][0][0] + self.assertEqual('col1,col2', cmd[-1]) - def test__init__with_connection_columns(self): + @mock.patch.object(async_process.AsyncProcess, '__init__') + def test___init___with_format(self, init): + ovsdb_monitor.OvsdbMonitor('Interface', format='blob') + cmd = init.call_args_list[0][0][0] + self.assertEqual('--format=blob', cmd[-1]) + + @mock.patch.object(async_process.AsyncProcess, '__init__') + def test__init__with_connection_columns(self, init): conn_info = 'tcp:10.10.10.10:6640' columns = ['col1', 'col2'] - with mock.patch( - 'neutron.agent.linux.async_process.AsyncProcess.__init__') as init: - ovsdb_monitor.OvsdbMonitor('Interface', columns=columns, - ovsdb_connection=conn_info) - cmd_all = init.call_args_list[0][0][0] - cmd_expect = ['ovsdb-client', 'monitor', 'tcp:10.10.10.10:6640', - 'Interface', 'col1,col2'] - self.assertEqual(cmd_expect, cmd_all) + + ovsdb_monitor.OvsdbMonitor('Interface', columns=columns, + ovsdb_connection=conn_info) + cmd_all = init.call_args_list[0][0][0] + cmd_expect = ['ovsdb-client', 'monitor', 'tcp:10.10.10.10:6640', + 'Interface', 'col1,col2'] + self.assertEqual(cmd_expect, cmd_all) class TestSimpleInterfaceMonitor(base.BaseTestCase): @@ -64,9 +64,7 @@ class TestSimpleInterfaceMonitor(base.BaseTestCase): self.monitor = ovsdb_monitor.SimpleInterfaceMonitor() def test_has_updates_is_false_if_active_with_no_output(self): - target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor' - '.is_active') - with mock.patch(target, return_value=True): + with mock.patch.object(self.monitor, 'is_active', return_value=True): self.assertFalse(self.monitor.has_updates) def test_has_updates_after_calling_get_events_is_false(self): diff --git a/neutron/tests/unit/agent/linux/test_polling.py b/neutron/tests/unit/agent/linux/test_polling.py index b8471ae8ca2..ef9f44f12ae 100644 --- a/neutron/tests/unit/agent/linux/test_polling.py +++ b/neutron/tests/unit/agent/linux/test_polling.py @@ -59,7 +59,7 @@ class TestInterfacePollingMinimizer(base.BaseTestCase): mock_stop.assert_called_with() def mock_has_updates(self, return_value): - target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor' + target = ('neutron.agent.common.ovsdb_monitor.SimpleInterfaceMonitor' '.has_updates') return mock.patch( target, diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index f9f915b2cdd..a2aa163b187 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -24,9 +24,9 @@ import oslo_messaging import testtools from neutron._i18n import _ +from neutron.agent.common import async_process from neutron.agent.common import ovs_lib from neutron.agent.common import utils -from neutron.agent.linux import async_process from neutron.agent.linux import ip_lib from neutron.common import rpc as n_rpc from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc @@ -1681,7 +1681,7 @@ class TestOvsNeutronAgent(object): 'physnet1': ex_br_mocks[1]}, bm_mock = mock.Mock() with mock.patch( - 'neutron.agent.linux.ovsdb_monitor.get_bridges_monitor', + 'neutron.agent.common.ovsdb_monitor.get_bridges_monitor', return_value=bm_mock),\ mock.patch.object( self.agent, @@ -1712,7 +1712,7 @@ class TestOvsNeutronAgent(object): with mock.patch( 'neutron.agent.common.polling.get_polling_manager' ) as mock_get_pm, mock.patch( - 'neutron.agent.linux.ovsdb_monitor.get_bridges_monitor' + 'neutron.agent.common.ovsdb_monitor.get_bridges_monitor' ) as mock_get_bm, mock.patch.object( self.agent, 'rpc_loop' ) as mock_loop, mock.patch.dict(