Merge "Trivial: Move platform independent modules to common dir"
This commit is contained in:
commit
a11087ec43
|
@ -0,0 +1,272 @@
|
|||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import signal
|
||||
|
||||
import eventlet
|
||||
import eventlet.event
|
||||
import eventlet.queue
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron.agent.common import ip_lib
|
||||
from neutron.agent.common import utils
|
||||
from neutron.common import utils as common_utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AsyncProcessException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AsyncProcess(object):
|
||||
"""Manages an asynchronous process.
|
||||
|
||||
This class spawns a new process via subprocess and uses
|
||||
greenthreads to read stderr and stdout asynchronously into queues
|
||||
that can be read via repeatedly calling iter_stdout() and
|
||||
iter_stderr().
|
||||
|
||||
If respawn_interval is non-zero, any error in communicating with
|
||||
the managed process will result in the process and greenthreads
|
||||
being cleaned up and the process restarted after the specified
|
||||
interval.
|
||||
|
||||
Example usage:
|
||||
|
||||
>>> import time
|
||||
>>> proc = AsyncProcess(['ping'])
|
||||
>>> proc.start()
|
||||
>>> time.sleep(5)
|
||||
>>> proc.stop()
|
||||
>>> for line in proc.iter_stdout():
|
||||
... print(line)
|
||||
"""
|
||||
|
||||
def __init__(self, cmd, run_as_root=False, respawn_interval=None,
|
||||
namespace=None, log_output=False, die_on_error=False):
|
||||
"""Constructor.
|
||||
|
||||
:param cmd: The list of command arguments to invoke.
|
||||
:param run_as_root: The process should run with elevated privileges.
|
||||
:param respawn_interval: Optional, the interval in seconds to wait
|
||||
to respawn after unexpected process death. Respawn will
|
||||
only be attempted if a value of 0 or greater is provided.
|
||||
:param namespace: Optional, start the command in the specified
|
||||
namespace.
|
||||
:param log_output: Optional, also log received output.
|
||||
:param die_on_error: Optional, kills the process on stderr output.
|
||||
"""
|
||||
self.cmd_without_namespace = cmd
|
||||
self._cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
|
||||
self.run_as_root = run_as_root
|
||||
if respawn_interval is not None and respawn_interval < 0:
|
||||
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
|
||||
self.respawn_interval = respawn_interval
|
||||
self._process = None
|
||||
self._pid = None
|
||||
self._is_running = False
|
||||
self._kill_event = None
|
||||
self._reset_queues()
|
||||
self._watchers = []
|
||||
self.log_output = log_output
|
||||
self.die_on_error = die_on_error
|
||||
|
||||
@property
|
||||
def cmd(self):
|
||||
return ' '.join(self._cmd)
|
||||
|
||||
def _reset_queues(self):
|
||||
self._stdout_lines = eventlet.queue.LightQueue()
|
||||
self._stderr_lines = eventlet.queue.LightQueue()
|
||||
|
||||
def is_active(self):
|
||||
# If using sudo rootwrap as a root_helper, we have to wait until sudo
|
||||
# spawns rootwrap and rootwrap spawns the process. self.pid will make
|
||||
# sure to get the correct pid.
|
||||
return utils.pid_invoked_with_cmdline(
|
||||
self.pid, self.cmd_without_namespace)
|
||||
|
||||
def start(self, block=False):
|
||||
"""Launch a process and monitor it asynchronously.
|
||||
|
||||
:param block: Block until the process has started.
|
||||
:raises utils.WaitTimeout if blocking is True and the process
|
||||
did not start in time.
|
||||
"""
|
||||
LOG.debug('Launching async process [%s].', self.cmd)
|
||||
if self._is_running:
|
||||
raise AsyncProcessException(_('Process is already started'))
|
||||
else:
|
||||
self._spawn()
|
||||
|
||||
if block:
|
||||
common_utils.wait_until_true(self.is_active)
|
||||
|
||||
def stop(self, block=False, kill_signal=None):
|
||||
"""Halt the process and watcher threads.
|
||||
|
||||
:param block: Block until the process has stopped.
|
||||
:param kill_signal: Number of signal that will be sent to the process
|
||||
when terminating the process
|
||||
:raises utils.WaitTimeout if blocking is True and the process
|
||||
did not stop in time.
|
||||
"""
|
||||
kill_signal = kill_signal or getattr(signal, 'SIGKILL', signal.SIGTERM)
|
||||
if self._is_running:
|
||||
LOG.debug('Halting async process [%s].', self.cmd)
|
||||
self._kill(kill_signal)
|
||||
else:
|
||||
raise AsyncProcessException(_('Process is not running.'))
|
||||
|
||||
if block:
|
||||
common_utils.wait_until_true(lambda: not self.is_active())
|
||||
|
||||
def _spawn(self):
|
||||
"""Spawn a process and its watchers."""
|
||||
self._is_running = True
|
||||
self._pid = None
|
||||
self._kill_event = eventlet.event.Event()
|
||||
self._process, cmd = utils.create_process(self._cmd,
|
||||
run_as_root=self.run_as_root)
|
||||
self._watchers = []
|
||||
for reader in (self._read_stdout, self._read_stderr):
|
||||
# Pass the stop event directly to the greenthread to
|
||||
# ensure that assignment of a new event to the instance
|
||||
# attribute does not prevent the greenthread from using
|
||||
# the original event.
|
||||
watcher = eventlet.spawn(self._watch_process,
|
||||
reader,
|
||||
self._kill_event)
|
||||
self._watchers.append(watcher)
|
||||
|
||||
@property
|
||||
def pid(self):
|
||||
if self._process:
|
||||
if not self._pid:
|
||||
self._pid = utils.get_root_helper_child_pid(
|
||||
self._process.pid,
|
||||
self.cmd_without_namespace,
|
||||
run_as_root=self.run_as_root)
|
||||
return self._pid
|
||||
|
||||
def _kill(self, kill_signal):
|
||||
"""Kill the process and the associated watcher greenthreads."""
|
||||
pid = self.pid
|
||||
if pid:
|
||||
self._is_running = False
|
||||
self._pid = None
|
||||
self._kill_process(pid, kill_signal)
|
||||
|
||||
# Halt the greenthreads if they weren't already.
|
||||
if self._kill_event:
|
||||
self._kill_event.send()
|
||||
self._kill_event = None
|
||||
|
||||
def _kill_process(self, pid, kill_signal):
|
||||
try:
|
||||
# A process started by a root helper will be running as
|
||||
# root and need to be killed via the same helper.
|
||||
utils.kill_process(pid, kill_signal, self.run_as_root)
|
||||
except Exception:
|
||||
LOG.exception('An error occurred while killing [%s].',
|
||||
self.cmd)
|
||||
return False
|
||||
|
||||
if self._process:
|
||||
self._process.wait()
|
||||
return True
|
||||
|
||||
def _handle_process_error(self):
|
||||
"""Kill the async process and respawn if necessary."""
|
||||
stdout = list(self.iter_stdout())
|
||||
stderr = list(self.iter_stderr())
|
||||
LOG.debug('Halting async process [%s] in response to an error. stdout:'
|
||||
' [%s] - stderr: [%s]', self.cmd, stdout, stderr)
|
||||
self._kill(getattr(signal, 'SIGKILL', signal.SIGTERM))
|
||||
if self.respawn_interval is not None and self.respawn_interval >= 0:
|
||||
eventlet.sleep(self.respawn_interval)
|
||||
LOG.debug('Respawning async process [%s].', self.cmd)
|
||||
try:
|
||||
self.start()
|
||||
except AsyncProcessException:
|
||||
# Process was already respawned by someone else...
|
||||
pass
|
||||
|
||||
def _watch_process(self, callback, kill_event):
|
||||
while not kill_event.ready():
|
||||
try:
|
||||
output = callback()
|
||||
if not output and output != "":
|
||||
break
|
||||
except Exception:
|
||||
LOG.exception('An error occurred while communicating '
|
||||
'with async process [%s].', self.cmd)
|
||||
break
|
||||
# Ensure that watching a process with lots of output does
|
||||
# not block execution of other greenthreads.
|
||||
eventlet.sleep()
|
||||
# self._is_running being True indicates that the loop was
|
||||
# broken out of due to an error in the watched process rather
|
||||
# than the loop condition being satisfied.
|
||||
if self._is_running:
|
||||
self._is_running = False
|
||||
self._handle_process_error()
|
||||
|
||||
def _read(self, stream, queue):
|
||||
data = stream.readline()
|
||||
if data:
|
||||
data = helpers.safe_decode_utf8(data.strip())
|
||||
queue.put(data)
|
||||
return data
|
||||
|
||||
def _read_stdout(self):
|
||||
data = self._read(self._process.stdout, self._stdout_lines)
|
||||
if self.log_output:
|
||||
LOG.debug('Output received from [%(cmd)s]: %(data)s',
|
||||
{'cmd': self.cmd,
|
||||
'data': data})
|
||||
return data
|
||||
|
||||
def _read_stderr(self):
|
||||
data = self._read(self._process.stderr, self._stderr_lines)
|
||||
if self.log_output:
|
||||
LOG.error('Error received from [%(cmd)s]: %(err)s',
|
||||
{'cmd': self.cmd,
|
||||
'err': data})
|
||||
if self.die_on_error:
|
||||
LOG.error("Process [%(cmd)s] dies due to the error: %(err)s",
|
||||
{'cmd': self.cmd,
|
||||
'err': data})
|
||||
# the callback caller will use None to indicate the need to bail
|
||||
# out of the thread
|
||||
return None
|
||||
|
||||
return data
|
||||
|
||||
def _iter_queue(self, queue, block):
|
||||
while True:
|
||||
try:
|
||||
yield queue.get(block=block)
|
||||
except eventlet.queue.Empty:
|
||||
break
|
||||
|
||||
def iter_stdout(self, block=False):
|
||||
return self._iter_queue(self._stdout_lines, block)
|
||||
|
||||
def iter_stderr(self, block=False):
|
||||
return self._iter_queue(self._stderr_lines, block)
|
|
@ -19,7 +19,7 @@ from oslo_config import cfg
|
|||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.common import async_process
|
||||
from neutron.agent.ovsdb import api as ovsdb
|
||||
from neutron.agent.ovsdb.native import helpers
|
||||
from neutron.common import utils
|
|
@ -12,261 +12,17 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import signal
|
||||
from debtcollector import moves
|
||||
|
||||
import eventlet
|
||||
import eventlet.event
|
||||
import eventlet.queue
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron.agent.common import ip_lib
|
||||
from neutron.agent.common import utils
|
||||
from neutron.common import utils as common_utils
|
||||
from neutron.agent.common import async_process
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
AsyncProcessException = moves.moved_class(
|
||||
async_process.AsyncProcessException,
|
||||
'AsyncProcessException',
|
||||
__name__)
|
||||
|
||||
|
||||
class AsyncProcessException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AsyncProcess(object):
|
||||
"""Manages an asynchronous process.
|
||||
|
||||
This class spawns a new process via subprocess and uses
|
||||
greenthreads to read stderr and stdout asynchronously into queues
|
||||
that can be read via repeatedly calling iter_stdout() and
|
||||
iter_stderr().
|
||||
|
||||
If respawn_interval is non-zero, any error in communicating with
|
||||
the managed process will result in the process and greenthreads
|
||||
being cleaned up and the process restarted after the specified
|
||||
interval.
|
||||
|
||||
Example usage:
|
||||
|
||||
>>> import time
|
||||
>>> proc = AsyncProcess(['ping'])
|
||||
>>> proc.start()
|
||||
>>> time.sleep(5)
|
||||
>>> proc.stop()
|
||||
>>> for line in proc.iter_stdout():
|
||||
... print(line)
|
||||
"""
|
||||
|
||||
def __init__(self, cmd, run_as_root=False, respawn_interval=None,
|
||||
namespace=None, log_output=False, die_on_error=False):
|
||||
"""Constructor.
|
||||
|
||||
:param cmd: The list of command arguments to invoke.
|
||||
:param run_as_root: The process should run with elevated privileges.
|
||||
:param respawn_interval: Optional, the interval in seconds to wait
|
||||
to respawn after unexpected process death. Respawn will
|
||||
only be attempted if a value of 0 or greater is provided.
|
||||
:param namespace: Optional, start the command in the specified
|
||||
namespace.
|
||||
:param log_output: Optional, also log received output.
|
||||
:param die_on_error: Optional, kills the process on stderr output.
|
||||
"""
|
||||
self.cmd_without_namespace = cmd
|
||||
self._cmd = ip_lib.add_namespace_to_cmd(cmd, namespace)
|
||||
self.run_as_root = run_as_root
|
||||
if respawn_interval is not None and respawn_interval < 0:
|
||||
raise ValueError(_('respawn_interval must be >= 0 if provided.'))
|
||||
self.respawn_interval = respawn_interval
|
||||
self._process = None
|
||||
self._pid = None
|
||||
self._is_running = False
|
||||
self._kill_event = None
|
||||
self._reset_queues()
|
||||
self._watchers = []
|
||||
self.log_output = log_output
|
||||
self.die_on_error = die_on_error
|
||||
|
||||
@property
|
||||
def cmd(self):
|
||||
return ' '.join(self._cmd)
|
||||
|
||||
def _reset_queues(self):
|
||||
self._stdout_lines = eventlet.queue.LightQueue()
|
||||
self._stderr_lines = eventlet.queue.LightQueue()
|
||||
|
||||
def is_active(self):
|
||||
# If using sudo rootwrap as a root_helper, we have to wait until sudo
|
||||
# spawns rootwrap and rootwrap spawns the process. self.pid will make
|
||||
# sure to get the correct pid.
|
||||
return utils.pid_invoked_with_cmdline(
|
||||
self.pid, self.cmd_without_namespace)
|
||||
|
||||
def start(self, block=False):
|
||||
"""Launch a process and monitor it asynchronously.
|
||||
|
||||
:param block: Block until the process has started.
|
||||
:raises utils.WaitTimeout if blocking is True and the process
|
||||
did not start in time.
|
||||
"""
|
||||
LOG.debug('Launching async process [%s].', self.cmd)
|
||||
if self._is_running:
|
||||
raise AsyncProcessException(_('Process is already started'))
|
||||
else:
|
||||
self._spawn()
|
||||
|
||||
if block:
|
||||
common_utils.wait_until_true(self.is_active)
|
||||
|
||||
def stop(self, block=False, kill_signal=None):
|
||||
"""Halt the process and watcher threads.
|
||||
|
||||
:param block: Block until the process has stopped.
|
||||
:param kill_signal: Number of signal that will be sent to the process
|
||||
when terminating the process
|
||||
:raises utils.WaitTimeout if blocking is True and the process
|
||||
did not stop in time.
|
||||
"""
|
||||
kill_signal = kill_signal or getattr(signal, 'SIGKILL', signal.SIGTERM)
|
||||
if self._is_running:
|
||||
LOG.debug('Halting async process [%s].', self.cmd)
|
||||
self._kill(kill_signal)
|
||||
else:
|
||||
raise AsyncProcessException(_('Process is not running.'))
|
||||
|
||||
if block:
|
||||
common_utils.wait_until_true(lambda: not self.is_active())
|
||||
|
||||
def _spawn(self):
|
||||
"""Spawn a process and its watchers."""
|
||||
self._is_running = True
|
||||
self._pid = None
|
||||
self._kill_event = eventlet.event.Event()
|
||||
self._process, cmd = utils.create_process(self._cmd,
|
||||
run_as_root=self.run_as_root)
|
||||
self._watchers = []
|
||||
for reader in (self._read_stdout, self._read_stderr):
|
||||
# Pass the stop event directly to the greenthread to
|
||||
# ensure that assignment of a new event to the instance
|
||||
# attribute does not prevent the greenthread from using
|
||||
# the original event.
|
||||
watcher = eventlet.spawn(self._watch_process,
|
||||
reader,
|
||||
self._kill_event)
|
||||
self._watchers.append(watcher)
|
||||
|
||||
@property
|
||||
def pid(self):
|
||||
if self._process:
|
||||
if not self._pid:
|
||||
self._pid = utils.get_root_helper_child_pid(
|
||||
self._process.pid,
|
||||
self.cmd_without_namespace,
|
||||
run_as_root=self.run_as_root)
|
||||
return self._pid
|
||||
|
||||
def _kill(self, kill_signal):
|
||||
"""Kill the process and the associated watcher greenthreads."""
|
||||
pid = self.pid
|
||||
if pid:
|
||||
self._is_running = False
|
||||
self._pid = None
|
||||
self._kill_process(pid, kill_signal)
|
||||
|
||||
# Halt the greenthreads if they weren't already.
|
||||
if self._kill_event:
|
||||
self._kill_event.send()
|
||||
self._kill_event = None
|
||||
|
||||
def _kill_process(self, pid, kill_signal):
|
||||
try:
|
||||
# A process started by a root helper will be running as
|
||||
# root and need to be killed via the same helper.
|
||||
utils.kill_process(pid, kill_signal, self.run_as_root)
|
||||
except Exception:
|
||||
LOG.exception('An error occurred while killing [%s].',
|
||||
self.cmd)
|
||||
return False
|
||||
|
||||
if self._process:
|
||||
self._process.wait()
|
||||
return True
|
||||
|
||||
def _handle_process_error(self):
|
||||
"""Kill the async process and respawn if necessary."""
|
||||
stdout = list(self.iter_stdout())
|
||||
stderr = list(self.iter_stderr())
|
||||
LOG.debug('Halting async process [%s] in response to an error. stdout:'
|
||||
' [%s] - stderr: [%s]', self.cmd, stdout, stderr)
|
||||
self._kill(getattr(signal, 'SIGKILL', signal.SIGTERM))
|
||||
if self.respawn_interval is not None and self.respawn_interval >= 0:
|
||||
eventlet.sleep(self.respawn_interval)
|
||||
LOG.debug('Respawning async process [%s].', self.cmd)
|
||||
try:
|
||||
self.start()
|
||||
except AsyncProcessException:
|
||||
# Process was already respawned by someone else...
|
||||
pass
|
||||
|
||||
def _watch_process(self, callback, kill_event):
|
||||
while not kill_event.ready():
|
||||
try:
|
||||
output = callback()
|
||||
if not output and output != "":
|
||||
break
|
||||
except Exception:
|
||||
LOG.exception('An error occurred while communicating '
|
||||
'with async process [%s].', self.cmd)
|
||||
break
|
||||
# Ensure that watching a process with lots of output does
|
||||
# not block execution of other greenthreads.
|
||||
eventlet.sleep()
|
||||
# self._is_running being True indicates that the loop was
|
||||
# broken out of due to an error in the watched process rather
|
||||
# than the loop condition being satisfied.
|
||||
if self._is_running:
|
||||
self._is_running = False
|
||||
self._handle_process_error()
|
||||
|
||||
def _read(self, stream, queue):
|
||||
data = stream.readline()
|
||||
if data:
|
||||
data = helpers.safe_decode_utf8(data.strip())
|
||||
queue.put(data)
|
||||
return data
|
||||
|
||||
def _read_stdout(self):
|
||||
data = self._read(self._process.stdout, self._stdout_lines)
|
||||
if self.log_output:
|
||||
LOG.debug('Output received from [%(cmd)s]: %(data)s',
|
||||
{'cmd': self.cmd,
|
||||
'data': data})
|
||||
return data
|
||||
|
||||
def _read_stderr(self):
|
||||
data = self._read(self._process.stderr, self._stderr_lines)
|
||||
if self.log_output:
|
||||
LOG.error('Error received from [%(cmd)s]: %(err)s',
|
||||
{'cmd': self.cmd,
|
||||
'err': data})
|
||||
if self.die_on_error:
|
||||
LOG.error("Process [%(cmd)s] dies due to the error: %(err)s",
|
||||
{'cmd': self.cmd,
|
||||
'err': data})
|
||||
# the callback caller will use None to indicate the need to bail
|
||||
# out of the thread
|
||||
return None
|
||||
|
||||
return data
|
||||
|
||||
def _iter_queue(self, queue, block):
|
||||
while True:
|
||||
try:
|
||||
yield queue.get(block=block)
|
||||
except eventlet.queue.Empty:
|
||||
break
|
||||
|
||||
def iter_stdout(self, block=False):
|
||||
return self._iter_queue(self._stdout_lines, block)
|
||||
|
||||
def iter_stderr(self, block=False):
|
||||
return self._iter_queue(self._stderr_lines, block)
|
||||
AsyncProcess = moves.moved_class(
|
||||
async_process.AsyncProcess,
|
||||
'AsyncProcess',
|
||||
__name__)
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.common import async_process
|
||||
from neutron.agent.linux import ip_lib
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
|
|
@ -18,9 +18,9 @@ import eventlet
|
|||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron.agent.common import async_process
|
||||
from neutron.agent.common import base_polling
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.linux import ovsdb_monitor
|
||||
from neutron.agent.common import ovsdb_monitor
|
||||
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
|
|
@ -44,10 +44,10 @@ from six import moves
|
|||
from neutron._i18n import _
|
||||
from neutron.agent.common import ip_lib
|
||||
from neutron.agent.common import ovs_lib
|
||||
from neutron.agent.common import ovsdb_monitor
|
||||
from neutron.agent.common import polling
|
||||
from neutron.agent.common import utils
|
||||
from neutron.agent.l2 import l2_agent_extensions_manager as ext_manager
|
||||
from neutron.agent.linux import ovsdb_monitor
|
||||
from neutron.agent.linux import xenapi_root_helper
|
||||
from neutron.agent import rpc as agent_rpc
|
||||
from neutron.agent import securitygroups_rpc as agent_sg_rpc
|
||||
|
|
|
@ -18,7 +18,7 @@ import signal
|
|||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.common import async_process
|
||||
from neutron.agent.linux import iptables_manager
|
||||
from neutron.common import utils as common_utils
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ import netaddr
|
|||
from neutron_lib.api.definitions import portbindings as pbs
|
||||
from neutron_lib import constants
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.common import async_process
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.common import utils
|
||||
from neutron.tests.common import machine_fixtures
|
||||
|
|
|
@ -24,7 +24,7 @@ from neutronclient.v2_0 import client
|
|||
from oslo_log import log as logging
|
||||
from oslo_utils import fileutils
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.common import async_process
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.common import utils as common_utils
|
||||
|
|
|
@ -16,7 +16,7 @@ import eventlet
|
|||
import six
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.common import async_process
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.common import utils as common_utils
|
||||
from neutron.tests import base
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.common import async_process
|
||||
from neutron.agent.linux import ip_monitor
|
||||
from neutron.tests.functional.agent.linux import test_ip_lib
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ Tests in this module will be skipped unless:
|
|||
from oslo_config import cfg
|
||||
|
||||
from neutron.agent.common import ovs_lib
|
||||
from neutron.agent.linux import ovsdb_monitor
|
||||
from neutron.agent.common import ovsdb_monitor
|
||||
from neutron.common import utils
|
||||
from neutron.tests import base
|
||||
from neutron.tests.common import net_helpers
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
|
||||
import functools
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.common import async_process
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.common import utils as common_utils
|
||||
from neutron.tests.functional.agent.linux import test_async_process
|
||||
|
|
|
@ -19,8 +19,8 @@ import eventlet.queue
|
|||
import mock
|
||||
import testtools
|
||||
|
||||
from neutron.agent.common import async_process
|
||||
from neutron.agent.common import utils
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.tests import base
|
||||
from neutron.tests.unit.agent.linux import failing_process
|
||||
|
|
@ -14,8 +14,9 @@
|
|||
|
||||
import mock
|
||||
|
||||
from neutron.agent.common import async_process
|
||||
from neutron.agent.common import ovs_lib
|
||||
from neutron.agent.linux import ovsdb_monitor
|
||||
from neutron.agent.common import ovsdb_monitor
|
||||
from neutron.agent.ovsdb.native import helpers
|
||||
from neutron.tests import base
|
||||
|
||||
|
@ -29,32 +30,31 @@ class TestOvsdbMonitor(base.BaseTestCase):
|
|||
def test___init__(self):
|
||||
ovsdb_monitor.OvsdbMonitor('Interface')
|
||||
|
||||
def test___init___with_columns(self):
|
||||
@mock.patch.object(async_process.AsyncProcess, '__init__')
|
||||
def test___init___with_columns(self, init):
|
||||
columns = ['col1', 'col2']
|
||||
with mock.patch(
|
||||
'neutron.agent.linux.async_process.AsyncProcess.__init__') as init:
|
||||
ovsdb_monitor.OvsdbMonitor('Interface', columns=columns)
|
||||
cmd = init.call_args_list[0][0][0]
|
||||
self.assertEqual('col1,col2', cmd[-1])
|
||||
|
||||
def test___init___with_format(self):
|
||||
with mock.patch(
|
||||
'neutron.agent.linux.async_process.AsyncProcess.__init__') as init:
|
||||
ovsdb_monitor.OvsdbMonitor('Interface', format='blob')
|
||||
cmd = init.call_args_list[0][0][0]
|
||||
self.assertEqual('--format=blob', cmd[-1])
|
||||
ovsdb_monitor.OvsdbMonitor('Interface', columns=columns)
|
||||
cmd = init.call_args_list[0][0][0]
|
||||
self.assertEqual('col1,col2', cmd[-1])
|
||||
|
||||
def test__init__with_connection_columns(self):
|
||||
@mock.patch.object(async_process.AsyncProcess, '__init__')
|
||||
def test___init___with_format(self, init):
|
||||
ovsdb_monitor.OvsdbMonitor('Interface', format='blob')
|
||||
cmd = init.call_args_list[0][0][0]
|
||||
self.assertEqual('--format=blob', cmd[-1])
|
||||
|
||||
@mock.patch.object(async_process.AsyncProcess, '__init__')
|
||||
def test__init__with_connection_columns(self, init):
|
||||
conn_info = 'tcp:10.10.10.10:6640'
|
||||
columns = ['col1', 'col2']
|
||||
with mock.patch(
|
||||
'neutron.agent.linux.async_process.AsyncProcess.__init__') as init:
|
||||
ovsdb_monitor.OvsdbMonitor('Interface', columns=columns,
|
||||
ovsdb_connection=conn_info)
|
||||
cmd_all = init.call_args_list[0][0][0]
|
||||
cmd_expect = ['ovsdb-client', 'monitor', 'tcp:10.10.10.10:6640',
|
||||
'Interface', 'col1,col2']
|
||||
self.assertEqual(cmd_expect, cmd_all)
|
||||
|
||||
ovsdb_monitor.OvsdbMonitor('Interface', columns=columns,
|
||||
ovsdb_connection=conn_info)
|
||||
cmd_all = init.call_args_list[0][0][0]
|
||||
cmd_expect = ['ovsdb-client', 'monitor', 'tcp:10.10.10.10:6640',
|
||||
'Interface', 'col1,col2']
|
||||
self.assertEqual(cmd_expect, cmd_all)
|
||||
|
||||
|
||||
class TestSimpleInterfaceMonitor(base.BaseTestCase):
|
||||
|
@ -64,9 +64,7 @@ class TestSimpleInterfaceMonitor(base.BaseTestCase):
|
|||
self.monitor = ovsdb_monitor.SimpleInterfaceMonitor()
|
||||
|
||||
def test_has_updates_is_false_if_active_with_no_output(self):
|
||||
target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor'
|
||||
'.is_active')
|
||||
with mock.patch(target, return_value=True):
|
||||
with mock.patch.object(self.monitor, 'is_active', return_value=True):
|
||||
self.assertFalse(self.monitor.has_updates)
|
||||
|
||||
def test_has_updates_after_calling_get_events_is_false(self):
|
|
@ -59,7 +59,7 @@ class TestInterfacePollingMinimizer(base.BaseTestCase):
|
|||
mock_stop.assert_called_with()
|
||||
|
||||
def mock_has_updates(self, return_value):
|
||||
target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor'
|
||||
target = ('neutron.agent.common.ovsdb_monitor.SimpleInterfaceMonitor'
|
||||
'.has_updates')
|
||||
return mock.patch(
|
||||
target,
|
||||
|
|
|
@ -24,9 +24,9 @@ import oslo_messaging
|
|||
import testtools
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron.agent.common import async_process
|
||||
from neutron.agent.common import ovs_lib
|
||||
from neutron.agent.common import utils
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.common import constants as c_const
|
||||
from neutron.common import rpc as n_rpc
|
||||
|
@ -1770,7 +1770,7 @@ class TestOvsNeutronAgent(object):
|
|||
'physnet1': ex_br_mocks[1]},
|
||||
bm_mock = mock.Mock()
|
||||
with mock.patch(
|
||||
'neutron.agent.linux.ovsdb_monitor.get_bridges_monitor',
|
||||
'neutron.agent.common.ovsdb_monitor.get_bridges_monitor',
|
||||
return_value=bm_mock),\
|
||||
mock.patch.object(
|
||||
self.agent,
|
||||
|
@ -1801,7 +1801,7 @@ class TestOvsNeutronAgent(object):
|
|||
with mock.patch(
|
||||
'neutron.agent.common.polling.get_polling_manager'
|
||||
) as mock_get_pm, mock.patch(
|
||||
'neutron.agent.linux.ovsdb_monitor.get_bridges_monitor'
|
||||
'neutron.agent.common.ovsdb_monitor.get_bridges_monitor'
|
||||
) as mock_get_bm, mock.patch.object(
|
||||
self.agent, 'rpc_loop'
|
||||
) as mock_loop, mock.patch.dict(
|
||||
|
|
Loading…
Reference in New Issue