Merge "Refactor the ProcessMonitor API"
This commit is contained in:
commit
35cbd28a61
|
@ -361,7 +361,7 @@ class DhcpAgent(manager.Manager):
|
|||
|
||||
def disable_isolated_metadata_proxy(self, network):
|
||||
metadata_driver.MetadataDriver.destroy_monitored_metadata_proxy(
|
||||
self._process_monitor, network.id, network.namespace)
|
||||
self._process_monitor, network.id, network.namespace, self.conf)
|
||||
|
||||
|
||||
class DhcpPluginApi(object):
|
||||
|
|
|
@ -23,13 +23,14 @@ import netaddr
|
|||
from oslo_utils import importutils
|
||||
import six
|
||||
|
||||
from neutron.agent.linux import external_process
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.common import constants
|
||||
from neutron.common import exceptions
|
||||
from neutron.common import ipv6_utils
|
||||
from neutron.common import utils as commonutils
|
||||
from neutron.i18n import _LE, _LI
|
||||
from neutron.i18n import _LE, _LI, _LW
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.openstack.common import uuidutils
|
||||
|
||||
|
@ -206,31 +207,37 @@ class DhcpLocalProcess(DhcpBase):
|
|||
self.interface_name = interface_name
|
||||
self.spawn_process()
|
||||
|
||||
def _get_process_manager(self, cmd_callback=None):
|
||||
return external_process.ProcessManager(
|
||||
conf=self.conf,
|
||||
uuid=self.network.id,
|
||||
namespace=self.network.namespace,
|
||||
default_cmd_callback=cmd_callback,
|
||||
pid_file=self.get_conf_file_name('pid'))
|
||||
|
||||
def disable(self, retain_port=False):
|
||||
"""Disable DHCP for this network by killing the local process."""
|
||||
pid_filename = self.get_conf_file_name('pid')
|
||||
|
||||
pid = self.process_monitor.get_pid(uuid=self.network.id,
|
||||
service=DNSMASQ_SERVICE_NAME,
|
||||
pid_file=pid_filename)
|
||||
|
||||
self.process_monitor.disable(uuid=self.network.id,
|
||||
namespace=self.network.namespace,
|
||||
service=DNSMASQ_SERVICE_NAME,
|
||||
pid_file=pid_filename)
|
||||
if pid and not retain_port:
|
||||
self.device_manager.destroy(self.network, self.interface_name)
|
||||
self.process_monitor.unregister(self.network.id, DNSMASQ_SERVICE_NAME)
|
||||
self._get_process_manager().disable()
|
||||
|
||||
self._remove_config_files()
|
||||
|
||||
if not retain_port:
|
||||
if self.conf.dhcp_delete_namespaces and self.network.namespace:
|
||||
ns_ip = ip_lib.IPWrapper(namespace=self.network.namespace)
|
||||
try:
|
||||
ns_ip.netns.delete(self.network.namespace)
|
||||
except RuntimeError:
|
||||
LOG.exception(_LE('Failed trying to delete namespace: %s'),
|
||||
self.network.namespace)
|
||||
self._destroy_namespace_and_port()
|
||||
|
||||
def _destroy_namespace_and_port(self):
|
||||
try:
|
||||
self.device_manager.destroy(self.network, self.interface_name)
|
||||
except RuntimeError:
|
||||
LOG.warning(_LW('Failed trying to delete interface: %s'),
|
||||
self.interface_name)
|
||||
|
||||
if self.conf.dhcp_delete_namespaces and self.network.namespace:
|
||||
ns_ip = ip_lib.IPWrapper(namespace=self.network.namespace)
|
||||
try:
|
||||
ns_ip.netns.delete(self.network.namespace)
|
||||
except RuntimeError:
|
||||
LOG.warning(_LW('Failed trying to delete namespace: %s'),
|
||||
self.network.namespace)
|
||||
|
||||
def _get_value_from_conf_file(self, kind, converter=None):
|
||||
"""A helper function to read a value from one of the state files."""
|
||||
|
@ -260,10 +267,7 @@ class DhcpLocalProcess(DhcpBase):
|
|||
|
||||
@property
|
||||
def active(self):
|
||||
pid_filename = self.get_conf_file_name('pid')
|
||||
return self.process_monitor.is_active(self.network.id,
|
||||
DNSMASQ_SERVICE_NAME,
|
||||
pid_file=pid_filename)
|
||||
return self._get_process_manager().active
|
||||
|
||||
@abc.abstractmethod
|
||||
def spawn_process(self):
|
||||
|
@ -383,15 +387,14 @@ class Dnsmasq(DhcpLocalProcess):
|
|||
|
||||
self._output_config_files()
|
||||
|
||||
pid_filename = self.get_conf_file_name('pid')
|
||||
pm = self._get_process_manager(
|
||||
cmd_callback=self._build_cmdline_callback)
|
||||
|
||||
self.process_monitor.enable(
|
||||
uuid=self.network.id,
|
||||
cmd_callback=self._build_cmdline_callback,
|
||||
namespace=self.network.namespace,
|
||||
service=DNSMASQ_SERVICE_NAME,
|
||||
reload_cfg=reload_with_HUP,
|
||||
pid_file=pid_filename)
|
||||
pm.enable(reload_cfg=reload_with_HUP)
|
||||
|
||||
self.process_monitor.register(uuid=self.network.id,
|
||||
service_name=DNSMASQ_SERVICE_NAME,
|
||||
monitored_process=pm)
|
||||
|
||||
def _release_lease(self, mac_address, ip):
|
||||
"""Release a DHCP lease."""
|
||||
|
|
|
@ -12,8 +12,10 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import collections
|
||||
import os.path
|
||||
import six
|
||||
|
||||
import eventlet
|
||||
from oslo_concurrency import lockutils
|
||||
|
@ -40,7 +42,18 @@ cfg.CONF.register_opts(OPTS)
|
|||
agent_cfg.register_process_monitor_opts(cfg.CONF)
|
||||
|
||||
|
||||
class ProcessManager(object):
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class MonitoredProcess(object):
|
||||
@abc.abstractproperty
|
||||
def active(self):
|
||||
"""Boolean representing the running state of the process."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def enable(self):
|
||||
"""Enable the service, or respawn the process."""
|
||||
|
||||
|
||||
class ProcessManager(MonitoredProcess):
|
||||
"""An external process manager for Neutron spawned processes.
|
||||
|
||||
Note: The manager expects uuid to be in cmdline.
|
||||
|
@ -140,118 +153,57 @@ class ProcessMonitor(object):
|
|||
self._config = config
|
||||
self._resource_type = resource_type
|
||||
|
||||
self._process_managers = {}
|
||||
self._monitored_processes = {}
|
||||
|
||||
if self._config.AGENT.check_child_processes_interval:
|
||||
self._spawn_checking_thread()
|
||||
|
||||
def enable(self, uuid, cmd_callback, namespace=None, service=None,
|
||||
reload_cfg=False, cmd_addl_env=None, pid_file=None):
|
||||
"""Creates a process manager and ensures that it is monitored.
|
||||
def register(self, uuid, service_name, monitored_process):
|
||||
"""Start monitoring a process.
|
||||
|
||||
It will create a new ProcessManager and tie it to the uuid/service
|
||||
with the new settings, replacing the old one if it existed already.
|
||||
The given monitored_process will be tied to it's uuid+service_name
|
||||
replacing the old one if it existed already.
|
||||
|
||||
:param uuid: UUID of the resource this process will serve for.
|
||||
:param cmd_callback: Callback function that receives a pid_file
|
||||
location and returns a list with the command
|
||||
and arguments.
|
||||
:param namespace: network namespace to run the process in, if
|
||||
necessary.
|
||||
:param service: a logical name for the service this process provides,
|
||||
it will extend the pid file like pid.%(service)s.
|
||||
:param reload_cfg: if the process is active send a HUP signal
|
||||
for configuration reload, otherwise spawn it
|
||||
normally.
|
||||
:param cmd_addl_env: additional environment variables for the
|
||||
spawned process.
|
||||
:param pid_file: the pid file to store the pid of the external
|
||||
process. If not provided, a default will be used.
|
||||
The monitored_process should be enabled before registration,
|
||||
otherwise ProcessMonitor could try to enable the process itself,
|
||||
which could lead to double enable and if unlucky enough, two processes
|
||||
running, and also errors in the logs.
|
||||
|
||||
:param uuid: An ID of the resource for which the process is running.
|
||||
:param service_name: A logical service name for this process monitor,
|
||||
so the same uuid provided via process manager
|
||||
can reference several different services.
|
||||
:param monitored_process: MonitoredProcess we want to monitor.
|
||||
"""
|
||||
process_manager = self._create_process_manager(
|
||||
uuid=uuid,
|
||||
cmd_callback=cmd_callback,
|
||||
namespace=namespace,
|
||||
service=service,
|
||||
cmd_addl_env=cmd_addl_env,
|
||||
pid_file=pid_file)
|
||||
|
||||
process_manager.enable(reload_cfg=reload_cfg)
|
||||
service_id = ServiceId(uuid, service)
|
||||
service_id = ServiceId(uuid, service_name)
|
||||
self._monitored_processes[service_id] = monitored_process
|
||||
|
||||
# replace the old process manager with the new one
|
||||
self._process_managers[service_id] = process_manager
|
||||
def unregister(self, uuid, service_name):
|
||||
"""Stop monitoring a process.
|
||||
|
||||
def disable(self, uuid, namespace=None, service=None,
|
||||
pid_file=None):
|
||||
"""Disables the process and stops monitoring it."""
|
||||
service_id = ServiceId(uuid, service)
|
||||
The uuid+service_name will be removed from the monitored processes.
|
||||
|
||||
process_manager = self._ensure_process_manager(
|
||||
uuid=uuid,
|
||||
service=service,
|
||||
pid_file=pid_file,
|
||||
namespace=namespace)
|
||||
self._process_managers.pop(service_id, None)
|
||||
The service must be disabled **after** unregistering, otherwise if
|
||||
process monitor checks after you disable the process, and before
|
||||
you unregister it, the process will be respawned, and left orphaned
|
||||
into the system.
|
||||
|
||||
process_manager.disable()
|
||||
:param uuid: An ID of the resource for which the process is running.
|
||||
:param service_name: A logical service name for this process monitor,
|
||||
so the same uuid provided via process manager
|
||||
can reference several different services.
|
||||
"""
|
||||
|
||||
def disable_all(self):
|
||||
for service_id in self._process_managers.keys():
|
||||
self.disable(uuid=service_id.uuid, service=service_id.service)
|
||||
service_id = ServiceId(uuid, service_name)
|
||||
self._monitored_processes.pop(service_id, None)
|
||||
|
||||
def get_process_manager(self, uuid, service=None):
|
||||
"""Returns a process manager for manipulation"""
|
||||
service_id = ServiceId(uuid, service)
|
||||
return self._process_managers.get(service_id)
|
||||
|
||||
def is_active(self, uuid, service=None, pid_file=None):
|
||||
return self._ensure_process_manager(
|
||||
uuid=uuid,
|
||||
service=service,
|
||||
pid_file=pid_file).active
|
||||
|
||||
def get_pid(self, uuid, service=None, pid_file=None):
|
||||
return self._ensure_process_manager(
|
||||
uuid=uuid,
|
||||
service=service,
|
||||
pid_file=pid_file).pid
|
||||
|
||||
def get_pid_file_name(self, uuid, service=None):
|
||||
return self._ensure_process_manager(
|
||||
uuid=uuid,
|
||||
service=service).get_pid_file_name()
|
||||
|
||||
def _ensure_process_manager(self, uuid, cmd_callback=None,
|
||||
namespace=None, service=None,
|
||||
cmd_addl_env=None,
|
||||
pid_file=None,
|
||||
):
|
||||
|
||||
process_manager = self.get_process_manager(uuid, service)
|
||||
if not process_manager:
|
||||
# if the process existed in a different run of the agent
|
||||
# provide one, generally for pid / active evaluation
|
||||
process_manager = self._create_process_manager(
|
||||
uuid=uuid,
|
||||
cmd_callback=cmd_callback,
|
||||
namespace=namespace,
|
||||
service=service,
|
||||
cmd_addl_env=cmd_addl_env,
|
||||
pid_file=pid_file)
|
||||
return process_manager
|
||||
|
||||
def _create_process_manager(self, uuid, cmd_callback, namespace, service,
|
||||
cmd_addl_env, pid_file):
|
||||
return ProcessManager(conf=self._config,
|
||||
uuid=uuid,
|
||||
namespace=namespace,
|
||||
service=service,
|
||||
default_cmd_callback=cmd_callback,
|
||||
cmd_addl_env=cmd_addl_env,
|
||||
pid_file=pid_file)
|
||||
def stop(self):
|
||||
"""Stop the process monitoring. """
|
||||
self._monitor_processes = False
|
||||
|
||||
def _spawn_checking_thread(self):
|
||||
self._monitor_processes = True
|
||||
eventlet.spawn(self._periodic_checking_thread)
|
||||
|
||||
@lockutils.synchronized("_check_child_processes")
|
||||
|
@ -259,8 +211,8 @@ class ProcessMonitor(object):
|
|||
# we build the list of keys before iterating in the loop to cover
|
||||
# the case where other threads add or remove items from the
|
||||
# dictionary which otherwise will cause a RuntimeError
|
||||
for service_id in list(self._process_managers):
|
||||
pm = self._process_managers.get(service_id)
|
||||
for service_id in list(self._monitored_processes):
|
||||
pm = self._monitored_processes.get(service_id)
|
||||
|
||||
if pm and not pm.active:
|
||||
LOG.error(_LE("%(service)s for %(resource_type)s "
|
||||
|
@ -273,7 +225,7 @@ class ProcessMonitor(object):
|
|||
eventlet.sleep(0)
|
||||
|
||||
def _periodic_checking_thread(self):
|
||||
while True:
|
||||
while self._monitor_processes:
|
||||
eventlet.sleep(self._config.AGENT.check_child_processes_interval)
|
||||
eventlet.spawn(self._check_child_processes)
|
||||
|
||||
|
@ -286,7 +238,7 @@ class ProcessMonitor(object):
|
|||
LOG.error(_LE("respawning %(service)s for uuid %(uuid)s"),
|
||||
{'service': service_id.service,
|
||||
'uuid': service_id.uuid})
|
||||
self._process_managers[service_id].enable()
|
||||
self._monitored_processes[service_id].enable()
|
||||
|
||||
def _exit_action(self, service_id):
|
||||
LOG.error(_LE("Exiting agent as programmed in check_child_processes_"
|
||||
|
|
|
@ -18,6 +18,7 @@ import netaddr
|
|||
from oslo_config import cfg
|
||||
import six
|
||||
|
||||
from neutron.agent.linux import external_process
|
||||
from neutron.agent.linux import utils
|
||||
from neutron.common import constants
|
||||
from neutron.openstack.common import log as logging
|
||||
|
@ -90,6 +91,14 @@ class DaemonMonitor(object):
|
|||
utils.replace_file(radvd_conf, buf.getvalue())
|
||||
return radvd_conf
|
||||
|
||||
def _get_radvd_process_manager(self, callback=None):
|
||||
return external_process.ProcessManager(
|
||||
uuid=self._router_id,
|
||||
default_cmd_callback=callback,
|
||||
namespace=self._router_ns,
|
||||
service=RADVD_SERVICE_NAME,
|
||||
conf=cfg.CONF)
|
||||
|
||||
def _spawn_radvd(self, radvd_conf):
|
||||
def callback(pid_file):
|
||||
# we need to use -m syslog and f.e. not -m stderr (the default)
|
||||
|
@ -103,11 +112,11 @@ class DaemonMonitor(object):
|
|||
'-m', 'syslog']
|
||||
return radvd_cmd
|
||||
|
||||
self._process_monitor.enable(uuid=self._router_id,
|
||||
cmd_callback=callback,
|
||||
namespace=self._router_ns,
|
||||
service=RADVD_SERVICE_NAME,
|
||||
reload_cfg=True)
|
||||
pm = self._get_radvd_process_manager(callback)
|
||||
pm.enable(reload_cfg=True)
|
||||
self._process_monitor.register(uuid=self._router_id,
|
||||
service_name=RADVD_SERVICE_NAME,
|
||||
monitored_process=pm)
|
||||
LOG.debug("radvd enabled for router %s", self._router_id)
|
||||
|
||||
def enable(self, router_ports):
|
||||
|
@ -124,12 +133,13 @@ class DaemonMonitor(object):
|
|||
self._spawn_radvd(radvd_conf)
|
||||
|
||||
def disable(self):
|
||||
self._process_monitor.disable(self._router_id,
|
||||
service=RADVD_SERVICE_NAME)
|
||||
self._process_monitor.unregister(uuid=self._router_id,
|
||||
service_name=RADVD_SERVICE_NAME)
|
||||
pm = self._get_radvd_process_manager()
|
||||
pm.disable()
|
||||
utils.remove_conf_files(cfg.CONF.ra_confs, self._router_id)
|
||||
LOG.debug("radvd disabled for router %s", self._router_id)
|
||||
|
||||
@property
|
||||
def enabled(self):
|
||||
return self._process_monitor.is_active(self._router_id,
|
||||
RADVD_SERVICE_NAME)
|
||||
return self._get_radvd_process_manager().active
|
||||
|
|
|
@ -27,6 +27,7 @@ LOG = logging.getLogger(__name__)
|
|||
|
||||
# Access with redirection to metadata proxy iptables mark mask
|
||||
METADATA_ACCESS_MARK_MASK = '0xffffffff'
|
||||
METADATA_SERVICE_NAME = 'metadata-proxy'
|
||||
|
||||
|
||||
class MetadataDriver(advanced_service.AdvancedService):
|
||||
|
@ -83,7 +84,8 @@ class MetadataDriver(advanced_service.AdvancedService):
|
|||
|
||||
self.destroy_monitored_metadata_proxy(self.l3_agent.process_monitor,
|
||||
router.router['id'],
|
||||
router.ns_name)
|
||||
router.ns_name,
|
||||
self.l3_agent.conf)
|
||||
|
||||
@classmethod
|
||||
def metadata_filter_rules(cls, port, mark):
|
||||
|
@ -143,20 +145,25 @@ class MetadataDriver(advanced_service.AdvancedService):
|
|||
@classmethod
|
||||
def spawn_monitored_metadata_proxy(cls, monitor, ns_name, port, conf,
|
||||
network_id=None, router_id=None):
|
||||
uuid = network_id or router_id
|
||||
callback = cls._get_metadata_proxy_callback(
|
||||
port, conf, network_id=network_id, router_id=router_id)
|
||||
monitor.enable(network_id or router_id, callback, ns_name)
|
||||
pm = cls._get_metadata_proxy_process_manager(uuid, ns_name, conf,
|
||||
callback=callback)
|
||||
pm.enable()
|
||||
monitor.register(uuid, METADATA_SERVICE_NAME, pm)
|
||||
|
||||
@classmethod
|
||||
def destroy_monitored_metadata_proxy(cls, monitor, uuid, ns_name):
|
||||
monitor.disable(uuid, ns_name)
|
||||
def destroy_monitored_metadata_proxy(cls, monitor, uuid, ns_name, conf):
|
||||
monitor.unregister(uuid, METADATA_SERVICE_NAME)
|
||||
pm = cls._get_metadata_proxy_process_manager(uuid, ns_name, conf)
|
||||
pm.disable()
|
||||
|
||||
# TODO(mangelajo): remove the unmonitored _get_*_process_manager,
|
||||
# _spawn_* and _destroy* when keepalived stops
|
||||
# spawning and killing proxies on its own.
|
||||
@classmethod
|
||||
def _get_metadata_proxy_process_manager(cls, router_id, ns_name, conf):
|
||||
def _get_metadata_proxy_process_manager(cls, router_id, ns_name, conf,
|
||||
callback=None):
|
||||
return external_process.ProcessManager(
|
||||
conf,
|
||||
router_id,
|
||||
ns_name)
|
||||
conf=conf,
|
||||
uuid=router_id,
|
||||
namespace=ns_name,
|
||||
default_cmd_callback=callback)
|
||||
|
|
|
@ -23,7 +23,6 @@ from neutron.agent.common import config as agent_config
|
|||
from neutron.agent.dhcp import config as dhcp_config
|
||||
from neutron.agent.l3 import agent as l3_agent
|
||||
from neutron.agent.linux import dhcp
|
||||
from neutron.agent.linux import external_process
|
||||
from neutron.agent.linux import interface
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.agent.linux import ovs_lib
|
||||
|
@ -70,12 +69,6 @@ def setup_conf():
|
|||
return conf
|
||||
|
||||
|
||||
def _get_dhcp_process_monitor(config):
|
||||
return external_process.ProcessMonitor(
|
||||
config=config,
|
||||
resource_type='dhcp')
|
||||
|
||||
|
||||
def kill_dhcp(conf, namespace):
|
||||
"""Disable DHCP for a network if DHCP is still active."""
|
||||
network_id = namespace.replace(dhcp.NS_PREFIX, '')
|
||||
|
@ -83,7 +76,6 @@ def kill_dhcp(conf, namespace):
|
|||
dhcp_driver = importutils.import_object(
|
||||
conf.dhcp_driver,
|
||||
conf=conf,
|
||||
process_monitor=_get_dhcp_process_monitor(conf),
|
||||
network=dhcp.NetModel(conf.use_namespaces, {'id': network_id}),
|
||||
plugin=FakeDhcpPlugin())
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ from neutron.tests.functional.agent.linux import simple_daemon
|
|||
|
||||
|
||||
UUID_FORMAT = "test-uuid-%d"
|
||||
SERVICE_NAME = "service"
|
||||
|
||||
|
||||
class BaseTestProcessMonitor(base.BaseTestCase):
|
||||
|
@ -30,13 +31,13 @@ class BaseTestProcessMonitor(base.BaseTestCase):
|
|||
super(BaseTestProcessMonitor, self).setUp()
|
||||
cfg.CONF.set_override('check_child_processes_interval', 1, 'AGENT')
|
||||
self._child_processes = []
|
||||
self._ext_processes = None
|
||||
self._process_monitor = None
|
||||
self.create_child_processes_manager('respawn')
|
||||
self.addCleanup(self.cleanup_spawned_children)
|
||||
|
||||
def create_child_processes_manager(self, action):
|
||||
cfg.CONF.set_override('check_child_processes_action', action, 'AGENT')
|
||||
self._ext_processes = self.build_process_monitor()
|
||||
self._process_monitor = self.build_process_monitor()
|
||||
|
||||
def build_process_monitor(self):
|
||||
return external_process.ProcessMonitor(
|
||||
|
@ -56,11 +57,14 @@ class BaseTestProcessMonitor(base.BaseTestCase):
|
|||
for child_number in moves.xrange(n):
|
||||
uuid = self._child_uuid(child_number)
|
||||
_callback = self._make_cmdline_callback(uuid)
|
||||
self._ext_processes.enable(uuid=uuid,
|
||||
cmd_callback=_callback,
|
||||
service=service)
|
||||
pm = external_process.ProcessManager(
|
||||
conf=cfg.CONF,
|
||||
uuid=uuid,
|
||||
default_cmd_callback=_callback,
|
||||
service=service)
|
||||
pm.enable()
|
||||
self._process_monitor.register(uuid, SERVICE_NAME, pm)
|
||||
|
||||
pm = self._ext_processes.get_process_manager(uuid, service)
|
||||
self._child_processes.append(pm)
|
||||
|
||||
@staticmethod
|
||||
|
@ -70,16 +74,11 @@ class BaseTestProcessMonitor(base.BaseTestCase):
|
|||
def _kill_last_child(self):
|
||||
self._child_processes[-1].disable()
|
||||
|
||||
def spawn_child_processes_and_kill_last(self, service=None, number=2):
|
||||
self.spawn_n_children(number, service)
|
||||
self._kill_last_child()
|
||||
self.assertFalse(self._child_processes[-1].active)
|
||||
|
||||
def wait_for_all_childs_respawned(self):
|
||||
def all_childs_active():
|
||||
def wait_for_all_children_respawned(self):
|
||||
def all_children_active():
|
||||
return all(pm.active for pm in self._child_processes)
|
||||
|
||||
self._wait_for_condition(all_childs_active)
|
||||
self._wait_for_condition(all_children_active)
|
||||
|
||||
def _wait_for_condition(self, exit_condition, extra_time=5):
|
||||
# we need to allow extra_time for the check process to happen
|
||||
|
@ -92,24 +91,13 @@ class BaseTestProcessMonitor(base.BaseTestCase):
|
|||
eventlet.sleep(0.01)
|
||||
|
||||
def cleanup_spawned_children(self):
|
||||
if self._ext_processes:
|
||||
self._ext_processes.disable_all()
|
||||
for pm in self._child_processes:
|
||||
pm.disable()
|
||||
|
||||
|
||||
class TestProcessMonitor(BaseTestProcessMonitor):
|
||||
|
||||
def test_respawn_handler(self):
|
||||
self.spawn_child_processes_and_kill_last()
|
||||
self.wait_for_all_childs_respawned()
|
||||
|
||||
def test_new_process_monitor_finds_old_process(self):
|
||||
self.spawn_n_children(1)
|
||||
spawn_process = self._child_processes[-1]
|
||||
uuid = spawn_process.uuid
|
||||
|
||||
another_pm = self.build_process_monitor()
|
||||
self.assertTrue(another_pm.is_active(uuid))
|
||||
self.assertEqual(another_pm.get_pid(uuid), spawn_process.pid)
|
||||
|
||||
def test_tries_to_get_pid_for_unknown_uuid(self):
|
||||
self.assertIsNone(self._ext_processes.get_pid('bad-uuid'))
|
||||
self.spawn_n_children(2)
|
||||
self._kill_last_child()
|
||||
self.wait_for_all_children_respawned()
|
||||
|
|
|
@ -19,7 +19,7 @@ from neutron.agent.linux import external_process
|
|||
from neutron.tests import base
|
||||
|
||||
TEST_UUID = 'test-uuid'
|
||||
TEST_SERVICE1 = 'testsvc'
|
||||
TEST_SERVICE = 'testsvc'
|
||||
TEST_PID = 1234
|
||||
|
||||
|
||||
|
@ -27,10 +27,6 @@ class BaseTestProcessMonitor(base.BaseTestCase):
|
|||
|
||||
def setUp(self):
|
||||
super(BaseTestProcessMonitor, self).setUp()
|
||||
self.pm_patch = mock.patch("neutron.agent.linux.external_process."
|
||||
"ProcessManager", side_effect=mock.Mock)
|
||||
self.pmanager = self.pm_patch.start()
|
||||
|
||||
self.log_patch = mock.patch("neutron.agent.linux.external_process."
|
||||
"LOG.error")
|
||||
self.error_log = self.log_patch.start()
|
||||
|
@ -49,47 +45,51 @@ class BaseTestProcessMonitor(base.BaseTestCase):
|
|||
config=conf,
|
||||
resource_type='test')
|
||||
|
||||
def get_monitored_process_manager(self, uuid, service=None):
|
||||
self.pmonitor.enable(uuid=uuid, service=service, cmd_callback=None)
|
||||
return self.pmonitor.get_process_manager(uuid, service)
|
||||
def get_monitored_process(self, uuid, service=None):
|
||||
monitored_process = mock.Mock()
|
||||
self.pmonitor.register(uuid=uuid,
|
||||
service_name=service,
|
||||
monitored_process=monitored_process)
|
||||
return monitored_process
|
||||
|
||||
|
||||
class TestProcessMonitor(BaseTestProcessMonitor):
|
||||
|
||||
def test_error_logged(self):
|
||||
pm = self.get_monitored_process_manager(TEST_UUID)
|
||||
pm = self.get_monitored_process(TEST_UUID)
|
||||
pm.active = False
|
||||
self.pmonitor._check_child_processes()
|
||||
self.assertTrue(self.error_log.called)
|
||||
|
||||
def test_exit_handler(self):
|
||||
self.create_child_process_monitor('exit')
|
||||
pm = self.get_monitored_process_manager(TEST_UUID)
|
||||
pm = self.get_monitored_process(TEST_UUID)
|
||||
pm.active = False
|
||||
with mock.patch.object(external_process.ProcessMonitor,
|
||||
'_exit_handler') as exit_handler:
|
||||
self.pmonitor._check_child_processes()
|
||||
exit_handler.assert_called_once_with(TEST_UUID, None)
|
||||
|
||||
def test_different_service_types(self):
|
||||
pm_none = self.get_monitored_process_manager(TEST_UUID)
|
||||
pm_svc1 = self.get_monitored_process_manager(TEST_UUID, TEST_SERVICE1)
|
||||
self.assertNotEqual(pm_none, pm_svc1)
|
||||
def test_register(self):
|
||||
pm = self.get_monitored_process(TEST_UUID)
|
||||
self.assertEqual(len(self.pmonitor._monitored_processes), 1)
|
||||
self.assertIn(pm, self.pmonitor._monitored_processes.values())
|
||||
|
||||
def test_active_method(self, service=None):
|
||||
pm = self.get_monitored_process_manager(TEST_UUID, service)
|
||||
pm.active = False
|
||||
self.assertFalse(self.pmonitor.is_active(TEST_UUID, service))
|
||||
pm.active = True
|
||||
self.assertTrue(self.pmonitor.is_active(TEST_UUID, service))
|
||||
def test_register_same_service_twice(self):
|
||||
self.get_monitored_process(TEST_UUID)
|
||||
self.get_monitored_process(TEST_UUID)
|
||||
self.assertEqual(len(self.pmonitor._monitored_processes), 1)
|
||||
|
||||
def test_active_method_with_service(self):
|
||||
self.test_active_method(TEST_SERVICE1)
|
||||
def test_register_different_service_types(self):
|
||||
self.get_monitored_process(TEST_UUID)
|
||||
self.get_monitored_process(TEST_UUID, TEST_SERVICE)
|
||||
self.assertEqual(len(self.pmonitor._monitored_processes), 2)
|
||||
|
||||
def test_pid_method(self, service=None):
|
||||
pm = self.get_monitored_process_manager(TEST_UUID, service)
|
||||
pm.pid = TEST_PID
|
||||
self.assertEqual(TEST_PID, self.pmonitor.get_pid(TEST_UUID, service))
|
||||
def test_unregister(self):
|
||||
self.get_monitored_process(TEST_UUID)
|
||||
self.pmonitor.unregister(TEST_UUID, None)
|
||||
self.assertEqual(len(self.pmonitor._monitored_processes), 0)
|
||||
|
||||
def test_pid_method_with_service(self):
|
||||
self.test_pid_method(TEST_PID)
|
||||
def test_unregister_unknown_process(self):
|
||||
self.pmonitor.unregister(TEST_UUID, None)
|
||||
self.assertEqual(len(self.pmonitor._monitored_processes), 0)
|
||||
|
|
|
@ -557,15 +557,13 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
|
|||
|
||||
def _process_manager_constructor_call(self):
|
||||
return mock.call(conf=cfg.CONF,
|
||||
uuid=FAKE_NETWORK_UUID,
|
||||
namespace=FAKE_NETWORK_DHCP_NS,
|
||||
service=None,
|
||||
default_cmd_callback=mock.ANY,
|
||||
pid_file=None,
|
||||
cmd_addl_env=None)
|
||||
uuid=FAKE_NETWORK_UUID,
|
||||
namespace=FAKE_NETWORK_DHCP_NS,
|
||||
default_cmd_callback=mock.ANY)
|
||||
|
||||
def _enable_dhcp_helper(self, network, enable_isolated_metadata=False,
|
||||
is_isolated_network=False):
|
||||
self.dhcp._process_monitor = mock.Mock()
|
||||
if enable_isolated_metadata:
|
||||
cfg.CONF.set_override('enable_isolated_metadata', True)
|
||||
self.plugin.get_network_info.return_value = network
|
||||
|
@ -577,7 +575,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
|
|||
if is_isolated_network:
|
||||
self.external_process.assert_has_calls([
|
||||
self._process_manager_constructor_call(),
|
||||
mock.call().enable(reload_cfg=False)
|
||||
mock.call().enable()
|
||||
])
|
||||
else:
|
||||
self.assertFalse(self.external_process.call_count)
|
||||
|
@ -744,10 +742,11 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
|
|||
self._disable_dhcp_helper_driver_failure()
|
||||
|
||||
def test_enable_isolated_metadata_proxy(self):
|
||||
self.dhcp._process_monitor = mock.Mock()
|
||||
self.dhcp.enable_isolated_metadata_proxy(fake_network)
|
||||
self.external_process.assert_has_calls([
|
||||
self._process_manager_constructor_call(),
|
||||
mock.call().enable(reload_cfg=False)
|
||||
mock.call().enable()
|
||||
])
|
||||
|
||||
def test_disable_isolated_metadata_proxy(self):
|
||||
|
@ -757,7 +756,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
|
|||
self.dhcp.disable_isolated_metadata_proxy(fake_network)
|
||||
destroy.assert_called_once_with(self.dhcp._process_monitor,
|
||||
fake_network.id,
|
||||
fake_network.namespace)
|
||||
fake_network.namespace,
|
||||
cfg.CONF)
|
||||
|
||||
def _test_metadata_network(self, network):
|
||||
cfg.CONF.set_override('enable_metadata_network', True)
|
||||
|
|
|
@ -214,6 +214,8 @@ class BasicRouterOperationsFramework(base.BaseTestCase):
|
|||
self.external_process_p = mock.patch(
|
||||
'neutron.agent.linux.external_process.ProcessManager')
|
||||
self.external_process = self.external_process_p.start()
|
||||
self.process_monitor = mock.patch(
|
||||
'neutron.agent.linux.external_process.ProcessMonitor').start()
|
||||
|
||||
self.send_arp_p = mock.patch(
|
||||
'neutron.agent.linux.ip_lib.send_gratuitous_arp')
|
||||
|
@ -1070,32 +1072,24 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
self.assertFalse(nat_rules_delta)
|
||||
return ri
|
||||
|
||||
def _expected_call_lookup_ri_process_enabled(self, ri, process):
|
||||
def _expected_call_lookup_ri_process(self, ri, process):
|
||||
"""Expected call if a process is looked up in a router instance."""
|
||||
return [mock.call(uuid=ri.router['id'],
|
||||
service=process,
|
||||
default_cmd_callback=mock.ANY,
|
||||
namespace=ri.ns_name,
|
||||
conf=self.conf,
|
||||
pid_file=None,
|
||||
cmd_addl_env=None)]
|
||||
|
||||
def _expected_call_lookup_ri_process_disabled(self, ri, process):
|
||||
"""Expected call if a process is looked up in a router instance."""
|
||||
# The ProcessManager does already exist, and it's found via
|
||||
# ProcessMonitor lookup _ensure_process_manager
|
||||
return [mock.call().__nonzero__()]
|
||||
conf=mock.ANY)]
|
||||
|
||||
def _assert_ri_process_enabled(self, ri, process):
|
||||
"""Verify that process was enabled for a router instance."""
|
||||
expected_calls = self._expected_call_lookup_ri_process_enabled(
|
||||
expected_calls = self._expected_call_lookup_ri_process(
|
||||
ri, process)
|
||||
expected_calls.append(mock.call().enable(reload_cfg=True))
|
||||
self.assertEqual(expected_calls, self.external_process.mock_calls)
|
||||
|
||||
def _assert_ri_process_disabled(self, ri, process):
|
||||
"""Verify that process was disabled for a router instance."""
|
||||
expected_calls = self._expected_call_lookup_ri_process_disabled(
|
||||
expected_calls = self._expected_call_lookup_ri_process(
|
||||
ri, process)
|
||||
expected_calls.append(mock.call().disable())
|
||||
self.assertEqual(expected_calls, self.external_process.mock_calls)
|
||||
|
@ -1158,6 +1152,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
self._assert_ri_process_enabled(ri, 'radvd')
|
||||
# Reset the calls so we can check for disable radvd
|
||||
self.external_process.reset_mock()
|
||||
self.process_monitor.reset_mock()
|
||||
# Remove the IPv6 interface and reprocess
|
||||
del router[l3_constants.INTERFACE_KEY][1]
|
||||
self._process_router_instance_for_agent(agent, ri, router)
|
||||
|
@ -1485,6 +1480,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
if enableflag:
|
||||
destroy_proxy.assert_called_with(mock.ANY,
|
||||
router_id,
|
||||
mock.ANY,
|
||||
mock.ANY)
|
||||
else:
|
||||
self.assertFalse(destroy_proxy.call_count)
|
||||
|
|
|
@ -623,6 +623,9 @@ class TestBase(base.BaseTestCase):
|
|||
self.isdir = mock.patch('os.path.isdir').start()
|
||||
self.isdir.return_value = False
|
||||
|
||||
self.external_process = mock.patch(
|
||||
'neutron.agent.linux.external_process.ProcessManager').start()
|
||||
|
||||
|
||||
class TestDhcpBase(TestBase):
|
||||
|
||||
|
@ -711,6 +714,10 @@ class TestDhcpLocalProcess(TestBase):
|
|||
self.assertTrue(mocks['interface_name'].__set__.called)
|
||||
self.assertTrue(mocks['_ensure_network_conf_dir'].called)
|
||||
|
||||
def _assert_disabled(self, lp):
|
||||
self.assertTrue(lp.process_monitor.unregister.called)
|
||||
self.assertTrue(self.external_process().disable.called)
|
||||
|
||||
def test_disable_not_active(self):
|
||||
attrs_to_mock = dict([(a, mock.DEFAULT) for a in
|
||||
['active', 'interface_name']])
|
||||
|
@ -719,11 +726,11 @@ class TestDhcpLocalProcess(TestBase):
|
|||
mocks['interface_name'].__get__ = mock.Mock(return_value='tap0')
|
||||
network = FakeDualNetwork()
|
||||
lp = LocalChild(self.conf, network)
|
||||
lp.process_monitor.pid.return_value = 5
|
||||
lp.device_manager = mock.Mock()
|
||||
lp.disable()
|
||||
lp.device_manager.destroy.assert_called_once_with(
|
||||
network, 'tap0')
|
||||
self._assert_disabled(lp)
|
||||
|
||||
def test_disable_retain_port(self):
|
||||
attrs_to_mock = dict([(a, mock.DEFAULT) for a in
|
||||
|
@ -734,7 +741,7 @@ class TestDhcpLocalProcess(TestBase):
|
|||
mocks['interface_name'].__get__ = mock.Mock(return_value='tap0')
|
||||
lp = LocalChild(self.conf, network)
|
||||
lp.disable(retain_port=True)
|
||||
self.assertTrue(lp.process_monitor.disable.called)
|
||||
self._assert_disabled(lp)
|
||||
|
||||
def test_disable(self):
|
||||
attrs_to_mock = dict([(a, mock.DEFAULT) for a in
|
||||
|
@ -745,9 +752,10 @@ class TestDhcpLocalProcess(TestBase):
|
|||
mocks['interface_name'].__get__ = mock.Mock(return_value='tap0')
|
||||
lp = LocalChild(self.conf, network)
|
||||
with mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip:
|
||||
lp.process_monitor.pid.return_value = 5
|
||||
lp.disable()
|
||||
|
||||
self._assert_disabled(lp)
|
||||
|
||||
self.mock_mgr.assert_has_calls([mock.call(self.conf, None),
|
||||
mock.call().destroy(network, 'tap0')])
|
||||
|
||||
|
@ -761,9 +769,10 @@ class TestDhcpLocalProcess(TestBase):
|
|||
mocks['active'].__get__ = mock.Mock(return_value=False)
|
||||
lp = LocalChild(self.conf, FakeDualNetwork())
|
||||
with mock.patch('neutron.agent.linux.ip_lib.IPWrapper') as ip:
|
||||
lp.process_monitor.pid.return_value = 5
|
||||
lp.disable()
|
||||
|
||||
self._assert_disabled(lp)
|
||||
|
||||
ip.return_value.netns.delete.assert_called_with('qdhcp-ns')
|
||||
|
||||
def test_get_interface_name(self):
|
||||
|
@ -865,15 +874,11 @@ class TestDnsmasq(TestBase):
|
|||
dm.spawn_process()
|
||||
self.assertTrue(mocks['_output_opts_file'].called)
|
||||
|
||||
test_pm.enable.assert_called_once_with(
|
||||
uuid=network.id,
|
||||
service='dnsmasq',
|
||||
namespace='qdhcp-ns',
|
||||
cmd_callback=mock.ANY,
|
||||
reload_cfg=False,
|
||||
pid_file=expected_pid_file)
|
||||
call_kwargs = test_pm.method_calls[0][2]
|
||||
cmd_callback = call_kwargs['cmd_callback']
|
||||
self.assertTrue(test_pm.register.called)
|
||||
self.external_process().enable.assert_called_once_with(
|
||||
reload_cfg=False)
|
||||
call_kwargs = self.external_process.mock_calls[0][2]
|
||||
cmd_callback = call_kwargs['default_cmd_callback']
|
||||
|
||||
result_cmd = cmd_callback(expected_pid_file)
|
||||
|
||||
|
@ -1261,12 +1266,9 @@ class TestDnsmasq(TestBase):
|
|||
test_pm = mock.Mock()
|
||||
dm = self._get_dnsmasq(FakeDualNetwork(), test_pm)
|
||||
dm.reload_allocations()
|
||||
test_pm.enable.assert_has_calls([mock.call(uuid=mock.ANY,
|
||||
cmd_callback=mock.ANY,
|
||||
namespace=mock.ANY,
|
||||
service=mock.ANY,
|
||||
reload_cfg=True,
|
||||
pid_file=mock.ANY)])
|
||||
self.assertTrue(test_pm.register.called)
|
||||
self.external_process().enable.assert_called_once_with(
|
||||
reload_cfg=True)
|
||||
|
||||
self.safe.assert_has_calls([
|
||||
mock.call(exp_host_name, exp_host_data),
|
||||
|
|
|
@ -45,8 +45,7 @@ class TestNetnsCleanup(base.BaseTestCase):
|
|||
util.kill_dhcp(conf, 'ns')
|
||||
|
||||
expected_params = {'conf': conf, 'network': mock.ANY,
|
||||
'plugin': mock.ANY,
|
||||
'process_monitor': mock.ANY}
|
||||
'plugin': mock.ANY}
|
||||
import_object.assert_called_once_with('driver', **expected_params)
|
||||
|
||||
if dhcp_active:
|
||||
|
|
Loading…
Reference in New Issue