LBaaS: unify haproxy-on-host plugin driver and agent

Unifies haproxy reference implementation to make common agent based plugin driver
which is suitable for all vendors who wants to use async mechanism.

 - Agent API as well as device driver API changed to handle
   loadbalancer objects individually;
 - Agent loads device drivers according to config;
 - LogicalDeviceCache class was removed from agent as it was used only
   as a list - to put and remove entries ant check whether entry is in or not.
   It was replaced with instance_mapping dict in agent to store known instances and
   corresponding device_drivers;
 - Agent reports which device drivers are supported (needs for scheduling on plugin side);
 - Agent-to-plugin API was extended to provide an ability for agent to update
   statuses of pools/vips/members/health_monitors;
 - Vendor should only implement device driver; plugin driver just needs
   to inherit AgentBasedPluginDriver and override device_driver member;
 - This patch doesn't move files to make review easier;
   all rename/replace will be done in a subsequent patch;

DocImpact

NOTE: Since the change in the agent RPC API is backward-incompatible
(major RPC version change), LBaaS server-agent communications will be
completely broken until both sides are upgraded so users will be unable to
create new or update existing HAProxy loadbalancer instances during upgrade

Implements blueprint lbaas-common-agent-driver

Change-Id: I9fd90a1321611d202ef838681273081fa6c1686a
This commit is contained in:
Oleg Bondarev 2013-08-06 12:52:34 +04:00
parent 2d7042a6b0
commit 2e9d1b173b
18 changed files with 1378 additions and 729 deletions

View File

@ -23,9 +23,16 @@
# Example of interface_driver option for LinuxBridge # Example of interface_driver option for LinuxBridge
# interface_driver = neutron.agent.linux.interface.BridgeInterfaceDriver # interface_driver = neutron.agent.linux.interface.BridgeInterfaceDriver
# The agent requires a driver to manage the loadbalancer. HAProxy is the # The agent requires drivers to manage the loadbalancer. HAProxy is the opensource version.
# opensource version. # Multiple device drivers reflecting different service providers could be specified:
# device_driver = path.to.provider1.driver.Driver
# device_driver = path.to.provider2.driver.Driver
# Default is:
# device_driver = neutron.services.loadbalancer.drivers.haproxy.namespace_driver.HaproxyNSDriver # device_driver = neutron.services.loadbalancer.drivers.haproxy.namespace_driver.HaproxyNSDriver
[haproxy]
# Location to store config and state files
# loadbalancer_state_path = $state_path/lbaas
# The user group # The user group
# user_group = nogroup # user_group = nogroup

View File

@ -79,11 +79,19 @@ class LbaasAgentSchedulerDbMixin(agentschedulers_db.AgentSchedulerDbMixin,
else: else:
return {'pools': []} return {'pools': []}
def get_lbaas_agent_candidates(self, device_driver, active_agents):
candidates = []
for agent in active_agents:
agent_conf = self.get_configuration_dict(agent)
if device_driver in agent_conf['device_drivers']:
candidates.append(agent)
return candidates
class ChanceScheduler(object): class ChanceScheduler(object):
"""Allocate a loadbalancer agent for a vip in a random way.""" """Allocate a loadbalancer agent for a vip in a random way."""
def schedule(self, plugin, context, pool): def schedule(self, plugin, context, pool, device_driver):
"""Schedule the pool to an active loadbalancer agent if there """Schedule the pool to an active loadbalancer agent if there
is no enabled agent hosting it. is no enabled agent hosting it.
""" """
@ -97,11 +105,18 @@ class ChanceScheduler(object):
'agent_id': lbaas_agent['id']}) 'agent_id': lbaas_agent['id']})
return return
candidates = plugin.get_lbaas_agents(context, active=True) active_agents = plugin.get_lbaas_agents(context, active=True)
if not candidates: if not active_agents:
LOG.warn(_('No active lbaas agents for pool %s'), pool['id']) LOG.warn(_('No active lbaas agents for pool %s'), pool['id'])
return return
candidates = plugin.get_lbaas_agent_candidates(device_driver,
active_agents)
if not candidates:
LOG.warn(_('No lbaas agent supporting device driver %s'),
device_driver)
return
chosen_agent = random.choice(candidates) chosen_agent = random.choice(candidates)
binding = PoolLoadbalancerAgentBinding() binding = PoolLoadbalancerAgentBinding()
binding.agent = chosen_agent binding.agent = chosen_agent

View File

@ -107,10 +107,10 @@ class LoadBalancerAbstractDriver(object):
pass pass
@abc.abstractmethod @abc.abstractmethod
def update_health_monitor(self, context, def update_pool_health_monitor(self, context,
old_health_monitor, old_health_monitor,
health_monitor, health_monitor,
pool_id): pool_id):
pass pass
@abc.abstractmethod @abc.abstractmethod

View File

@ -0,0 +1,98 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 OpenStack Foundation. All rights reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class AgentDeviceDriver(object):
"""Abstract device driver that defines the API required by LBaaS agent."""
@abc.abstractmethod
def get_name(cls):
"""Returns unique name across all LBaaS device drivers."""
pass
@abc.abstractmethod
def deploy_instance(self, logical_config):
"""Fully deploys a loadbalancer instance from a given config."""
pass
@abc.abstractmethod
def undeploy_instance(self, pool_id):
"""Fully undeploys the loadbalancer instance."""
pass
@abc.abstractmethod
def get_stats(self, pool_id):
pass
def remove_orphans(self, known_pool_ids):
# Not all drivers will support this
raise NotImplementedError()
@abc.abstractmethod
def create_vip(self, vip):
pass
@abc.abstractmethod
def update_vip(self, old_vip, vip):
pass
@abc.abstractmethod
def delete_vip(self, vip):
pass
@abc.abstractmethod
def create_pool(self, pool):
pass
@abc.abstractmethod
def update_pool(self, old_pool, pool):
pass
@abc.abstractmethod
def delete_pool(self, pool):
pass
@abc.abstractmethod
def create_member(self, member):
pass
@abc.abstractmethod
def update_member(self, old_member, member):
pass
@abc.abstractmethod
def delete_member(self, context, member):
pass
@abc.abstractmethod
def create_pool_health_monitor(self, health_monitor, pool_id):
pass
@abc.abstractmethod
def update_pool_health_monitor(self,
old_health_monitor,
health_monitor,
pool_id):
pass
@abc.abstractmethod
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
pass

View File

@ -22,7 +22,12 @@ from neutron.openstack.common.rpc import proxy
class LbaasAgentApi(proxy.RpcProxy): class LbaasAgentApi(proxy.RpcProxy):
"""Agent side of the Agent to Plugin RPC API.""" """Agent side of the Agent to Plugin RPC API."""
API_VERSION = '1.0' API_VERSION = '2.0'
# history
# 1.0 Initial version
# 2.0 Generic API for agent based drivers
# - get_logical_device() handling changed on plugin side;
# - pool_deployed() and update_status() methods added;
def __init__(self, topic, context, host): def __init__(self, topic, context, host):
super(LbaasAgentApi, self).__init__(topic, self.API_VERSION) super(LbaasAgentApi, self).__init__(topic, self.API_VERSION)
@ -36,21 +41,35 @@ class LbaasAgentApi(proxy.RpcProxy):
topic=self.topic topic=self.topic
) )
def pool_destroyed(self, pool_id):
return self.call(
self.context,
self.make_msg('pool_destroyed', pool_id=pool_id),
topic=self.topic
)
def pool_deployed(self, pool_id):
return self.call(
self.context,
self.make_msg('pool_deployed', pool_id=pool_id),
topic=self.topic
)
def get_logical_device(self, pool_id): def get_logical_device(self, pool_id):
return self.call( return self.call(
self.context, self.context,
self.make_msg( self.make_msg(
'get_logical_device', 'get_logical_device',
pool_id=pool_id, pool_id=pool_id
host=self.host
), ),
topic=self.topic topic=self.topic
) )
def pool_destroyed(self, pool_id): def update_status(self, obj_type, obj_id, status):
return self.call( return self.call(
self.context, self.context,
self.make_msg('pool_destroyed', pool_id=pool_id, host=self.host), self.make_msg('update_status', obj_type=obj_type, obj_id=obj_id,
status=status),
topic=self.topic topic=self.topic
) )

View File

@ -16,152 +16,100 @@
# #
# @author: Mark McClain, DreamHost # @author: Mark McClain, DreamHost
import weakref
from oslo.config import cfg from oslo.config import cfg
from neutron.agent.common import config
from neutron.agent import rpc as agent_rpc from neutron.agent import rpc as agent_rpc
from neutron.common import constants from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
from neutron import context from neutron import context
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task from neutron.openstack.common import periodic_task
from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers.haproxy import ( from neutron.services.loadbalancer.drivers.haproxy import (
agent_api, agent_api,
plugin_driver plugin_driver
) )
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
NS_PREFIX = 'qlbaas-'
OPTS = [ OPTS = [
cfg.StrOpt( cfg.MultiStrOpt(
'device_driver', 'device_driver',
default=('neutron.services.loadbalancer.drivers' default=['neutron.services.loadbalancer.drivers'
'.haproxy.namespace_driver.HaproxyNSDriver'), '.haproxy.namespace_driver.HaproxyNSDriver'],
help=_('The driver used to manage the loadbalancing device'), help=_('Drivers used to manage loadbalancing devices'),
),
cfg.StrOpt(
'loadbalancer_state_path',
default='$state_path/lbaas',
help=_('Location to store config and state files'),
), ),
cfg.StrOpt( cfg.StrOpt(
'interface_driver', 'interface_driver',
help=_('The driver used to manage the virtual interface') help=_('The driver used to manage the virtual interface')
), ),
cfg.StrOpt(
'user_group',
default='nogroup',
help=_('The user group'),
),
] ]
class LogicalDeviceCache(object): class DeviceNotFoundOnAgent(n_exc.NotFound):
"""Manage a cache of known devices.""" msg = _('Unknown device with pool_id %(pool_id)s')
class Device(object):
"""Inner classes used to hold values for weakref lookups."""
def __init__(self, port_id, pool_id):
self.port_id = port_id
self.pool_id = pool_id
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __hash__(self):
return hash((self.port_id, self.pool_id))
def __init__(self):
self.devices = set()
self.port_lookup = weakref.WeakValueDictionary()
self.pool_lookup = weakref.WeakValueDictionary()
def put(self, device):
port_id = device['vip']['port_id']
pool_id = device['pool']['id']
d = self.Device(device['vip']['port_id'], device['pool']['id'])
if d not in self.devices:
self.devices.add(d)
self.port_lookup[port_id] = d
self.pool_lookup[pool_id] = d
def remove(self, device):
if not isinstance(device, self.Device):
device = self.Device(
device['vip']['port_id'], device['pool']['id']
)
if device in self.devices:
self.devices.remove(device)
def remove_by_pool_id(self, pool_id):
d = self.pool_lookup.get(pool_id)
if d:
self.devices.remove(d)
def get_by_pool_id(self, pool_id):
return self.pool_lookup.get(pool_id)
def get_by_port_id(self, port_id):
return self.port_lookup.get(port_id)
def get_pool_ids(self):
return self.pool_lookup.keys()
class LbaasAgentManager(periodic_task.PeriodicTasks): class LbaasAgentManager(periodic_task.PeriodicTasks):
RPC_API_VERSION = '2.0'
# history # history
# 1.0 Initial version # 1.0 Initial version
# 1.1 Support agent_updated call # 1.1 Support agent_updated call
RPC_API_VERSION = '1.1' # 2.0 Generic API for agent based drivers
# - modify/reload/destroy_pool methods were removed;
# - added methods to handle create/update/delete for every lbaas
# object individually;
def __init__(self, conf): def __init__(self, conf):
self.conf = conf self.conf = conf
try: self.context = context.get_admin_context_without_session()
vif_driver = importutils.import_object(conf.interface_driver, conf) self.plugin_rpc = agent_api.LbaasAgentApi(
except ImportError: plugin_driver.TOPIC_LOADBALANCER_PLUGIN,
msg = _('Error importing interface driver: %s') self.context,
raise SystemExit(msg % conf.interface_driver) self.conf.host
)
try: self._load_drivers()
self.driver = importutils.import_object(
conf.device_driver,
config.get_root_helper(self.conf),
conf.loadbalancer_state_path,
vif_driver,
self._vip_plug_callback
)
except ImportError:
msg = _('Error importing loadbalancer device driver: %s')
raise SystemExit(msg % conf.device_driver)
self.agent_state = { self.agent_state = {
'binary': 'neutron-lbaas-agent', 'binary': 'neutron-lbaas-agent',
'host': conf.host, 'host': conf.host,
'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT, 'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT,
'configurations': {'device_driver': conf.device_driver, 'configurations': {'device_drivers': self.device_drivers.keys()},
'interface_driver': conf.interface_driver}, 'agent_type': n_const.AGENT_TYPE_LOADBALANCER,
'agent_type': constants.AGENT_TYPE_LOADBALANCER,
'start_flag': True} 'start_flag': True}
self.admin_state_up = True self.admin_state_up = True
self.context = context.get_admin_context_without_session() self._setup_state_rpc()
self._setup_rpc()
self.needs_resync = False self.needs_resync = False
self.cache = LogicalDeviceCache() # pool_id->device_driver_name mapping used to store known instances
self.instance_mapping = {}
def _setup_rpc(self): def _load_drivers(self):
self.plugin_rpc = agent_api.LbaasAgentApi( self.device_drivers = {}
plugin_driver.TOPIC_PROCESS_ON_HOST, for driver in self.conf.device_driver:
self.context, try:
self.conf.host driver_inst = importutils.import_object(
) driver,
self.conf,
self.plugin_rpc
)
except ImportError:
msg = _('Error importing loadbalancer device driver: %s')
raise SystemExit(msg % driver)
driver_name = driver_inst.get_name()
if driver_name not in self.device_drivers:
self.device_drivers[driver_name] = driver_inst
else:
msg = _('Multiple device drivers with the same name found: %s')
raise SystemExit(msg % driver_name)
def _setup_state_rpc(self):
self.state_rpc = agent_rpc.PluginReportStateAPI( self.state_rpc = agent_rpc.PluginReportStateAPI(
plugin_driver.TOPIC_PROCESS_ON_HOST) plugin_driver.TOPIC_LOADBALANCER_PLUGIN)
report_interval = self.conf.AGENT.report_interval report_interval = self.conf.AGENT.report_interval
if report_interval: if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall( heartbeat = loopingcall.FixedIntervalLoopingCall(
@ -170,8 +118,8 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
def _report_state(self): def _report_state(self):
try: try:
device_count = len(self.cache.devices) instance_count = len(self.instance_mapping)
self.agent_state['configurations']['devices'] = device_count self.agent_state['configurations']['instances'] = instance_count
self.state_rpc.report_state(self.context, self.state_rpc.report_state(self.context,
self.agent_state) self.agent_state)
self.agent_state.pop('start_flag', None) self.agent_state.pop('start_flag', None)
@ -189,31 +137,26 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
@periodic_task.periodic_task(spacing=6) @periodic_task.periodic_task(spacing=6)
def collect_stats(self, context): def collect_stats(self, context):
for pool_id in self.cache.get_pool_ids(): for pool_id, driver_name in self.instance_mapping.items():
driver = self.device_drivers[driver_name]
try: try:
stats = self.driver.get_stats(pool_id) stats = driver.get_stats(pool_id)
if stats: if stats:
self.plugin_rpc.update_pool_stats(pool_id, stats) self.plugin_rpc.update_pool_stats(pool_id, stats)
except Exception: except Exception:
LOG.exception(_('Error upating stats')) LOG.exception(_('Error upating stats'))
self.needs_resync = True self.needs_resync = True
def _vip_plug_callback(self, action, port):
if action == 'plug':
self.plugin_rpc.plug_vip_port(port['id'])
elif action == 'unplug':
self.plugin_rpc.unplug_vip_port(port['id'])
def sync_state(self): def sync_state(self):
known_devices = set(self.cache.get_pool_ids()) known_instances = set(self.instance_mapping.keys())
try: try:
ready_logical_devices = set(self.plugin_rpc.get_ready_devices()) ready_instances = set(self.plugin_rpc.get_ready_devices())
for deleted_id in known_devices - ready_logical_devices: for deleted_id in known_instances - ready_instances:
self.destroy_device(deleted_id) self._destroy_pool(deleted_id)
for pool_id in ready_logical_devices: for pool_id in ready_instances:
self.refresh_device(pool_id) self._reload_pool(pool_id)
except Exception: except Exception:
LOG.exception(_('Unable to retrieve ready devices')) LOG.exception(_('Unable to retrieve ready devices'))
@ -221,51 +164,168 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
self.remove_orphans() self.remove_orphans()
def refresh_device(self, pool_id): def _get_driver(self, pool_id):
if pool_id not in self.instance_mapping:
raise DeviceNotFoundOnAgent(pool_id=pool_id)
driver_name = self.instance_mapping[pool_id]
return self.device_drivers[driver_name]
def _reload_pool(self, pool_id):
try: try:
logical_config = self.plugin_rpc.get_logical_device(pool_id) logical_config = self.plugin_rpc.get_logical_device(pool_id)
driver_name = logical_config['driver']
if driver_name not in self.device_drivers:
LOG.error(_('No device driver '
'on agent: %s.'), driver_name)
self.plugin_rpc.update_status(
'pool', pool_id, constants.ERROR)
return
if self.driver.exists(pool_id): self.device_drivers[driver_name].deploy_instance(logical_config)
self.driver.update(logical_config) self.instance_mapping[pool_id] = driver_name
else: self.plugin_rpc.pool_deployed(pool_id)
self.driver.create(logical_config)
self.cache.put(logical_config)
except Exception: except Exception:
LOG.exception(_('Unable to refresh device for pool: %s'), pool_id) LOG.exception(_('Unable to deploy instance for pool: %s'), pool_id)
self.needs_resync = True self.needs_resync = True
def destroy_device(self, pool_id): def _destroy_pool(self, pool_id):
device = self.cache.get_by_pool_id(pool_id) driver = self._get_driver(pool_id)
if not device:
return
try: try:
self.driver.destroy(pool_id) driver.undeploy_instance(pool_id)
del self.instance_mapping[pool_id]
self.plugin_rpc.pool_destroyed(pool_id) self.plugin_rpc.pool_destroyed(pool_id)
except Exception: except Exception:
LOG.exception(_('Unable to destroy device for pool: %s'), pool_id) LOG.exception(_('Unable to destroy device for pool: %s'), pool_id)
self.needs_resync = True self.needs_resync = True
self.cache.remove(device)
def remove_orphans(self): def remove_orphans(self):
for driver_name in self.device_drivers:
pool_ids = [pool_id for pool_id in self.instance_mapping
if self.instance_mapping[pool_id] == driver_name]
try:
self.device_drivers[driver_name].remove_orphans(pool_ids)
except NotImplementedError:
pass # Not all drivers will support this
def _handle_failed_driver_call(self, operation, obj_type, obj_id, driver):
LOG.exception(_('%(operation)s %(obj)s %(id)s failed on device driver '
'%(driver)s'),
{'operation': operation.capitalize(), 'obj': obj_type,
'id': obj_id, 'driver': driver})
self.plugin_rpc.update_status(obj_type, obj_id, constants.ERROR)
def create_vip(self, context, vip):
driver = self._get_driver(vip['pool_id'])
try: try:
self.driver.remove_orphans(self.cache.get_pool_ids()) driver.create_vip(vip)
except NotImplementedError: except Exception:
pass # Not all drivers will support this self._handle_failed_driver_call('create', 'vip', vip['id'],
driver.get_name())
else:
self.plugin_rpc.update_status('vip', vip['id'], constants.ACTIVE)
def reload_pool(self, context, pool_id=None, host=None): def update_vip(self, context, old_vip, vip):
"""Handle RPC cast from plugin to reload a pool.""" driver = self._get_driver(vip['pool_id'])
if pool_id: try:
self.refresh_device(pool_id) driver.update_vip(old_vip, vip)
except Exception:
self._handle_failed_driver_call('update', 'vip', vip['id'],
driver.get_name())
else:
self.plugin_rpc.update_status('vip', vip['id'], constants.ACTIVE)
def modify_pool(self, context, pool_id=None, host=None): def delete_vip(self, context, vip):
"""Handle RPC cast from plugin to modify a pool if known to agent.""" driver = self._get_driver(vip['pool_id'])
if self.cache.get_by_pool_id(pool_id): driver.delete_vip(vip)
self.refresh_device(pool_id)
def destroy_pool(self, context, pool_id=None, host=None): def create_pool(self, context, pool, driver_name):
"""Handle RPC cast from plugin to destroy a pool if known to agent.""" if driver_name not in self.device_drivers:
if self.cache.get_by_pool_id(pool_id): LOG.error(_('No device driver on agent: %s.'), driver_name)
self.destroy_device(pool_id) self.plugin_rpc.update_status('pool', pool['id'], constants.ERROR)
return
driver = self.device_drivers[driver_name]
try:
driver.create_pool(pool)
except Exception:
self._handle_failed_driver_call('create', 'pool', pool['id'],
driver.get_name())
else:
self.instance_mapping[pool['id']] = driver_name
self.plugin_rpc.update_status('pool', pool['id'], constants.ACTIVE)
def update_pool(self, context, old_pool, pool):
driver = self._get_driver(pool['id'])
try:
driver.update_pool(old_pool, pool)
except Exception:
self._handle_failed_driver_call('update', 'pool', pool['id'],
driver.get_name())
else:
self.plugin_rpc.update_status('pool', pool['id'], constants.ACTIVE)
def delete_pool(self, context, pool):
driver = self._get_driver(pool['id'])
driver.delete_pool(pool)
del self.instance_mapping[pool['id']]
def create_member(self, context, member):
driver = self._get_driver(member['pool_id'])
try:
driver.create_member(member)
except Exception:
self._handle_failed_driver_call('create', 'member', member['id'],
driver.get_name())
else:
self.plugin_rpc.update_status('member', member['id'],
constants.ACTIVE)
def update_member(self, context, old_member, member):
driver = self._get_driver(member['pool_id'])
try:
driver.update_member(old_member, member)
except Exception:
self._handle_failed_driver_call('update', 'member', member['id'],
driver.get_name())
else:
self.plugin_rpc.update_status('member', member['id'],
constants.ACTIVE)
def delete_member(self, context, member):
driver = self._get_driver(member['pool_id'])
driver.delete_member(member)
def create_pool_health_monitor(self, context, health_monitor, pool_id):
driver = self._get_driver(pool_id)
assoc_id = {'pool_id': pool_id, 'monitor_id': health_monitor['id']}
try:
driver.create_pool_health_monitor(health_monitor, pool_id)
except Exception:
self._handle_failed_driver_call(
'create', 'health_monitor', assoc_id, driver.get_name())
else:
self.plugin_rpc.update_status(
'health_monitor', assoc_id, constants.ACTIVE)
def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id):
driver = self._get_driver(pool_id)
assoc_id = {'pool_id': pool_id, 'monitor_id': health_monitor['id']}
try:
driver.update_pool_health_monitor(old_health_monitor,
health_monitor,
pool_id)
except Exception:
self._handle_failed_driver_call(
'update', 'health_monitor', assoc_id, driver.get_name())
else:
self.plugin_rpc.update_status(
'health_monitor', assoc_id, constants.ACTIVE)
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
driver = self._get_driver(pool_id)
driver.delete_pool_health_monitor(health_monitor, pool_id)
def agent_updated(self, context, payload): def agent_updated(self, context, payload):
"""Handle the agent_updated notification event.""" """Handle the agent_updated notification event."""
@ -274,6 +334,8 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
if self.admin_state_up: if self.admin_state_up:
self.needs_resync = True self.needs_resync = True
else: else:
for pool_id in self.cache.get_pool_ids(): for pool_id in self.instance_mapping.keys():
self.destroy_device(pool_id) LOG.info(_("Destroying pool %s due to agent disabling"),
LOG.info(_("agent_updated by server side %s!"), payload) pool_id)
self._destroy_pool(pool_id)
LOG.info(_("Agent_updated by server side %s!"), payload)

View File

@ -18,8 +18,6 @@
import itertools import itertools
from oslo.config import cfg
from neutron.agent.linux import utils from neutron.agent.linux import utils
from neutron.plugins.common import constants as qconstants from neutron.plugins.common import constants as qconstants
from neutron.services.loadbalancer import constants from neutron.services.loadbalancer import constants
@ -53,21 +51,23 @@ ACTIVE = qconstants.ACTIVE
INACTIVE = qconstants.INACTIVE INACTIVE = qconstants.INACTIVE
def save_config(conf_path, logical_config, socket_path=None): def save_config(conf_path, logical_config, socket_path=None,
user_group='nogroup'):
"""Convert a logical configuration to the HAProxy version.""" """Convert a logical configuration to the HAProxy version."""
data = [] data = []
data.extend(_build_global(logical_config, socket_path=socket_path)) data.extend(_build_global(logical_config, socket_path=socket_path,
user_group=user_group))
data.extend(_build_defaults(logical_config)) data.extend(_build_defaults(logical_config))
data.extend(_build_frontend(logical_config)) data.extend(_build_frontend(logical_config))
data.extend(_build_backend(logical_config)) data.extend(_build_backend(logical_config))
utils.replace_file(conf_path, '\n'.join(data)) utils.replace_file(conf_path, '\n'.join(data))
def _build_global(config, socket_path=None): def _build_global(config, socket_path=None, user_group='nogroup'):
opts = [ opts = [
'daemon', 'daemon',
'user nobody', 'user nobody',
'group %s' % cfg.CONF.user_group, 'group %s' % user_group,
'log /dev/log local0', 'log /dev/log local0',
'log /dev/log local1 notice' 'log /dev/log local1 notice'
] ]

View File

@ -20,27 +20,69 @@ import shutil
import socket import socket
import netaddr import netaddr
from oslo.config import cfg
from neutron.agent.common import config
from neutron.agent.linux import ip_lib from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils from neutron.agent.linux import utils
from neutron.common import exceptions from neutron.common import exceptions
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.plugins.common import constants from neutron.plugins.common import constants
from neutron.services.loadbalancer import constants as lb_const from neutron.services.loadbalancer import constants as lb_const
from neutron.services.loadbalancer.drivers import agent_device_driver
from neutron.services.loadbalancer.drivers.haproxy import cfg as hacfg from neutron.services.loadbalancer.drivers.haproxy import cfg as hacfg
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
NS_PREFIX = 'qlbaas-' NS_PREFIX = 'qlbaas-'
DRIVER_NAME = 'haproxy_ns'
ACTIVE_PENDING = (
constants.ACTIVE,
constants.PENDING_CREATE,
constants.PENDING_UPDATE
)
STATE_PATH_DEFAULT = '$state_path/lbaas'
USER_GROUP_DEFAULT = 'nogroup'
OPTS = [
cfg.StrOpt(
'loadbalancer_state_path',
default=STATE_PATH_DEFAULT,
help=_('Location to store config and state files'),
deprecated_opts=[cfg.DeprecatedOpt('loadbalancer_state_path')],
),
cfg.StrOpt(
'user_group',
default=USER_GROUP_DEFAULT,
help=_('The user group'),
deprecated_opts=[cfg.DeprecatedOpt('user_group')],
)
]
cfg.CONF.register_opts(OPTS, 'haproxy')
class HaproxyNSDriver(object): class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver):
def __init__(self, root_helper, state_path, vif_driver, vip_plug_callback): def __init__(self, conf, plugin_rpc):
self.root_helper = root_helper self.conf = conf
self.state_path = state_path self.root_helper = config.get_root_helper(conf)
self.state_path = conf.haproxy.loadbalancer_state_path
try:
vif_driver = importutils.import_object(conf.interface_driver, conf)
except ImportError:
msg = (_('Error importing interface driver: %s')
% conf.haproxy.interface_driver)
LOG.error(msg)
raise
self.vif_driver = vif_driver self.vif_driver = vif_driver
self.vip_plug_callback = vip_plug_callback self.plugin_rpc = plugin_rpc
self.pool_to_port_id = {} self.pool_to_port_id = {}
@classmethod
def get_name(cls):
return DRIVER_NAME
def create(self, logical_config): def create(self, logical_config):
pool_id = logical_config['pool']['id'] pool_id = logical_config['pool']['id']
namespace = get_ns_name(pool_id) namespace = get_ns_name(pool_id)
@ -62,8 +104,9 @@ class HaproxyNSDriver(object):
conf_path = self._get_state_file_path(pool_id, 'conf') conf_path = self._get_state_file_path(pool_id, 'conf')
pid_path = self._get_state_file_path(pool_id, 'pid') pid_path = self._get_state_file_path(pool_id, 'pid')
sock_path = self._get_state_file_path(pool_id, 'sock') sock_path = self._get_state_file_path(pool_id, 'sock')
user_group = self.conf.haproxy.user_group
hacfg.save_config(conf_path, logical_config, sock_path) hacfg.save_config(conf_path, logical_config, sock_path, user_group)
cmd = ['haproxy', '-f', conf_path, '-p', pid_path] cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
cmd.extend(extra_cmd_args) cmd.extend(extra_cmd_args)
@ -73,7 +116,7 @@ class HaproxyNSDriver(object):
# remember the pool<>port mapping # remember the pool<>port mapping
self.pool_to_port_id[pool_id] = logical_config['vip']['port']['id'] self.pool_to_port_id[pool_id] = logical_config['vip']['port']['id']
def destroy(self, pool_id): def undeploy_instance(self, pool_id):
namespace = get_ns_name(pool_id) namespace = get_ns_name(pool_id)
ns = ip_lib.IPWrapper(self.root_helper, namespace) ns = ip_lib.IPWrapper(self.root_helper, namespace)
pid_path = self._get_state_file_path(pool_id, 'pid') pid_path = self._get_state_file_path(pool_id, 'pid')
@ -176,9 +219,6 @@ class HaproxyNSDriver(object):
return res_stats return res_stats
def remove_orphans(self, known_pool_ids):
raise NotImplementedError()
def _get_state_file_path(self, pool_id, kind, ensure_state_dir=True): def _get_state_file_path(self, pool_id, kind, ensure_state_dir=True):
"""Returns the file name for a given kind of config file.""" """Returns the file name for a given kind of config file."""
confs_dir = os.path.abspath(os.path.normpath(self.state_path)) confs_dir = os.path.abspath(os.path.normpath(self.state_path))
@ -189,7 +229,7 @@ class HaproxyNSDriver(object):
return os.path.join(conf_dir, kind) return os.path.join(conf_dir, kind)
def _plug(self, namespace, port, reuse_existing=True): def _plug(self, namespace, port, reuse_existing=True):
self.vip_plug_callback('plug', port) self.plugin_rpc.plug_vip_port(port['id'])
interface_name = self.vif_driver.get_device_name(Wrap(port)) interface_name = self.vif_driver.get_device_name(Wrap(port))
if ip_lib.device_exists(interface_name, self.root_helper, namespace): if ip_lib.device_exists(interface_name, self.root_helper, namespace):
@ -222,10 +262,67 @@ class HaproxyNSDriver(object):
def _unplug(self, namespace, port_id): def _unplug(self, namespace, port_id):
port_stub = {'id': port_id} port_stub = {'id': port_id}
self.vip_plug_callback('unplug', port_stub) self.plugin_rpc.unplug_vip_port(port_id)
interface_name = self.vif_driver.get_device_name(Wrap(port_stub)) interface_name = self.vif_driver.get_device_name(Wrap(port_stub))
self.vif_driver.unplug(interface_name, namespace=namespace) self.vif_driver.unplug(interface_name, namespace=namespace)
def deploy_instance(self, logical_config):
# do actual deploy only if vip is configured and active
if ('vip' not in logical_config or
logical_config['vip']['status'] not in ACTIVE_PENDING or
not logical_config['vip']['admin_state_up']):
return
if self.exists(logical_config['pool']['id']):
self.update(logical_config)
else:
self.create(logical_config)
def _refresh_device(self, pool_id):
logical_config = self.plugin_rpc.get_logical_device(pool_id)
self.deploy_instance(logical_config)
def create_vip(self, vip):
self._refresh_device(vip['pool_id'])
def update_vip(self, old_vip, vip):
self._refresh_device(vip['pool_id'])
def delete_vip(self, vip):
self.undeploy_instance(vip['pool_id'])
def create_pool(self, pool):
# nothing to do here because a pool needs a vip to be useful
pass
def update_pool(self, old_pool, pool):
self._refresh_device(pool['id'])
def delete_pool(self, pool):
# delete_pool may be called before vip deletion in case
# pool's admin state set to down
if self.exists(pool['id']):
self.undeploy_instance(pool['id'])
def create_member(self, member):
self._refresh_device(member['pool_id'])
def update_member(self, old_member, member):
self._refresh_device(member['pool_id'])
def delete_member(self, member):
self._refresh_device(member['pool_id'])
def create_pool_health_monitor(self, health_monitor, pool_id):
self._refresh_device(pool_id)
def update_pool_health_monitor(self, old_health_monitor, health_monitor,
pool_id):
self._refresh_device(pool_id)
def delete_pool_health_monitor(self, health_monitor, pool_id):
self._refresh_device(pool_id)
# NOTE (markmcclain) For compliance with interface.py which expects objects # NOTE (markmcclain) For compliance with interface.py which expects objects
class Wrap(object): class Wrap(object):

View File

@ -53,13 +53,23 @@ AGENT_SCHEDULER_OPTS = [
cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS) cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
# topic name for this particular agent implementation # topic name for this particular agent implementation
TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host' TOPIC_LOADBALANCER_PLUGIN = 'n-lbaas-plugin'
TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent' TOPIC_LOADBALANCER_AGENT = 'n-lbaas_agent'
class DriverNotSpecified(q_exc.NeutronException):
message = _("Device driver for agent should be specified "
"in plugin driver.")
class LoadBalancerCallbacks(object): class LoadBalancerCallbacks(object):
RPC_API_VERSION = '1.0' RPC_API_VERSION = '2.0'
# history
# 1.0 Initial version
# 2.0 Generic API for agent based drivers
# - get_logical_device() handling changed;
# - pool_deployed() and update_status() methods added;
def __init__(self, plugin): def __init__(self, plugin):
self.plugin = plugin self.plugin = plugin
@ -70,67 +80,47 @@ class LoadBalancerCallbacks(object):
def get_ready_devices(self, context, host=None): def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
qry = (context.session.query(loadbalancer_db.Pool.id).
join(loadbalancer_db.Vip))
qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
up = True # makes pep8 and sqlalchemy happy
qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
agents = self.plugin.get_lbaas_agents(context, agents = self.plugin.get_lbaas_agents(context,
filters={'host': [host]}) filters={'host': [host]})
if not agents: if not agents:
return [] return []
elif len(agents) > 1: elif len(agents) > 1:
LOG.warning(_('Multiple lbaas agents found on host %s'), host) LOG.warning(_('Multiple lbaas agents found on host %s'), host)
pools = self.plugin.list_pools_on_lbaas_agent(context, pools = self.plugin.list_pools_on_lbaas_agent(context,
agents[0].id) agents[0].id)
pool_ids = [pool['id'] for pool in pools['pools']] pool_ids = [pool['id'] for pool in pools['pools']]
qry = context.session.query(loadbalancer_db.Pool.id)
qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids)) qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
up = True # makes pep8 and sqlalchemy happy
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
return [id for id, in qry] return [id for id, in qry]
def get_logical_device(self, context, pool_id=None, activate=True, def get_logical_device(self, context, pool_id=None):
**kwargs):
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
qry = context.session.query(loadbalancer_db.Pool) qry = context.session.query(loadbalancer_db.Pool)
qry = qry.filter_by(id=pool_id) qry = qry.filter_by(id=pool_id)
pool = qry.one() pool = qry.one()
if activate: if pool.status != constants.ACTIVE:
# set all resources to active raise q_exc.Invalid(_('Expected active pool'))
if pool.status in ACTIVE_PENDING:
pool.status = constants.ACTIVE
if pool.vip.status in ACTIVE_PENDING:
pool.vip.status = constants.ACTIVE
for m in pool.members:
if m.status in ACTIVE_PENDING:
m.status = constants.ACTIVE
for hm in pool.monitors:
if hm.status in ACTIVE_PENDING:
hm.status = constants.ACTIVE
if (pool.status != constants.ACTIVE
or pool.vip.status != constants.ACTIVE):
raise q_exc.Invalid(_('Expected active pool and vip'))
retval = {} retval = {}
retval['pool'] = self.plugin._make_pool_dict(pool) retval['pool'] = self.plugin._make_pool_dict(pool)
retval['vip'] = self.plugin._make_vip_dict(pool.vip)
retval['vip']['port'] = ( if pool.vip:
self.plugin._core_plugin._make_port_dict(pool.vip.port) retval['vip'] = self.plugin._make_vip_dict(pool.vip)
) retval['vip']['port'] = (
for fixed_ip in retval['vip']['port']['fixed_ips']: self.plugin._core_plugin._make_port_dict(pool.vip.port)
fixed_ip['subnet'] = (
self.plugin._core_plugin.get_subnet(
context,
fixed_ip['subnet_id']
)
) )
for fixed_ip in retval['vip']['port']['fixed_ips']:
fixed_ip['subnet'] = (
self.plugin._core_plugin.get_subnet(
context,
fixed_ip['subnet_id']
)
)
retval['members'] = [ retval['members'] = [
self.plugin._make_member_dict(m) self.plugin._make_member_dict(m)
for m in pool.members if m.status in (constants.ACTIVE, for m in pool.members if m.status in (constants.ACTIVE,
@ -141,10 +131,49 @@ class LoadBalancerCallbacks(object):
for hm in pool.monitors for hm in pool.monitors
if hm.status == constants.ACTIVE if hm.status == constants.ACTIVE
] ]
retval['driver'] = (
self.plugin.drivers[pool.provider.provider_name].device_driver)
return retval return retval
def pool_destroyed(self, context, pool_id=None, host=None): def pool_deployed(self, context, pool_id):
with context.session.begin(subtransactions=True):
qry = context.session.query(loadbalancer_db.Pool)
qry = qry.filter_by(id=pool_id)
pool = qry.one()
# set all resources to active
if pool.status in ACTIVE_PENDING:
pool.status = constants.ACTIVE
if pool.vip and pool.vip.status in ACTIVE_PENDING:
pool.vip.status = constants.ACTIVE
for m in pool.members:
if m.status in ACTIVE_PENDING:
m.status = constants.ACTIVE
for hm in pool.monitors:
if hm.status in ACTIVE_PENDING:
hm.status = constants.ACTIVE
def update_status(self, context, obj_type, obj_id, status):
model_mapping = {
'pool': loadbalancer_db.Pool,
'vip': loadbalancer_db.Vip,
'member': loadbalancer_db.Member,
'health_monitor': loadbalancer_db.PoolMonitorAssociation
}
if obj_type not in model_mapping:
raise q_exc.Invalid(_('Unknown object type: %s') % obj_type)
elif obj_type == 'health_monitor':
self.plugin.update_pool_health_monitor(
context, obj_id['monitor_id'], obj_id['pool_id'], status)
else:
self.plugin.update_status(
context, model_mapping[obj_type], obj_id, status)
def pool_destroyed(self, context, pool_id=None):
"""Agent confirmation hook that a pool has been destroyed. """Agent confirmation hook that a pool has been destroyed.
This method exists for subclasses to change the deletion This method exists for subclasses to change the deletion
@ -214,65 +243,116 @@ class LoadBalancerCallbacks(object):
class LoadBalancerAgentApi(proxy.RpcProxy): class LoadBalancerAgentApi(proxy.RpcProxy):
"""Plugin side of plugin to agent RPC API.""" """Plugin side of plugin to agent RPC API."""
BASE_RPC_API_VERSION = '1.0' BASE_RPC_API_VERSION = '2.0'
# history # history
# 1.0 Initial version # 1.0 Initial version
# 1.1 Support agent_updated call # 1.1 Support agent_updated call
# 2.0 Generic API for agent based drivers
# - modify/reload/destroy_pool methods were removed;
# - added methods to handle create/update/delete for every lbaas
# object individually;
def __init__(self, topic): def __init__(self, topic):
super(LoadBalancerAgentApi, self).__init__( super(LoadBalancerAgentApi, self).__init__(
topic, default_version=self.BASE_RPC_API_VERSION) topic, default_version=self.BASE_RPC_API_VERSION)
def reload_pool(self, context, pool_id, host): def _cast(self, context, method_name, method_args, host, version=None):
return self.cast( return self.cast(
context, context,
self.make_msg('reload_pool', pool_id=pool_id, host=host), self.make_msg(method_name, **method_args),
topic='%s.%s' % (self.topic, host) topic='%s.%s' % (self.topic, host),
version=version
) )
def destroy_pool(self, context, pool_id, host): def create_vip(self, context, vip, host):
return self.cast( return self._cast(context, 'create_vip', {'vip': vip}, host)
context,
self.make_msg('destroy_pool', pool_id=pool_id, host=host),
topic='%s.%s' % (self.topic, host)
)
def modify_pool(self, context, pool_id, host): def update_vip(self, context, old_vip, vip, host):
return self.cast( return self._cast(context, 'update_vip',
context, {'old_vip': old_vip, 'vip': vip}, host)
self.make_msg('modify_pool', pool_id=pool_id, host=host),
topic='%s.%s' % (self.topic, host) def delete_vip(self, context, vip, host):
) return self._cast(context, 'delete_vip', {'vip': vip}, host)
def create_pool(self, context, pool, host, driver_name):
return self._cast(context, 'create_pool',
{'pool': pool, 'driver_name': driver_name}, host)
def update_pool(self, context, old_pool, pool, host):
return self._cast(context, 'update_pool',
{'old_pool': old_pool, 'pool': pool}, host)
def delete_pool(self, context, pool, host):
return self._cast(context, 'delete_pool', {'pool': pool}, host)
def create_member(self, context, member, host):
return self._cast(context, 'create_member', {'member': member}, host)
def update_member(self, context, old_member, member, host):
return self._cast(context, 'update_member',
{'old_member': old_member, 'member': member}, host)
def delete_member(self, context, member, host):
return self._cast(context, 'delete_member', {'member': member}, host)
def create_pool_health_monitor(self, context, health_monitor, pool_id,
host):
return self._cast(context, 'create_pool_health_monitor',
{'health_monitor': health_monitor,
'pool_id': pool_id}, host)
def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id, host):
return self._cast(context, 'update_pool_health_monitor',
{'old_health_monitor': old_health_monitor,
'health_monitor': health_monitor,
'pool_id': pool_id}, host)
def delete_pool_health_monitor(self, context, health_monitor, pool_id,
host):
return self._cast(context, 'delete_pool_health_monitor',
{'health_monitor': health_monitor,
'pool_id': pool_id}, host)
def agent_updated(self, context, admin_state_up, host): def agent_updated(self, context, admin_state_up, host):
return self.cast( return self._cast(context, 'agent_updated',
context, {'payload': {'admin_state_up': admin_state_up}},
self.make_msg('agent_updated', host)
payload={'admin_state_up': admin_state_up}),
topic='%s.%s' % (self.topic, host),
version='1.1'
)
class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver): class AgentBasedPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
# name of device driver that should be used by the agent;
# vendor specific plugin drivers must override it;
device_driver = None
def __init__(self, plugin): def __init__(self, plugin):
self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT) if not self.device_driver:
self.callbacks = LoadBalancerCallbacks(plugin) raise DriverNotSpecified()
self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT)
self.conn = rpc.create_connection(new=True)
self.conn.create_consumer(
TOPIC_PROCESS_ON_HOST,
self.callbacks.create_rpc_dispatcher(),
fanout=False)
self.conn.consume_in_thread()
self.plugin = plugin self.plugin = plugin
self._set_callbacks_on_plugin()
self.plugin.agent_notifiers.update( self.plugin.agent_notifiers.update(
{q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc}) {q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})
self.pool_scheduler = importutils.import_object( self.pool_scheduler = importutils.import_object(
cfg.CONF.loadbalancer_pool_scheduler_driver) cfg.CONF.loadbalancer_pool_scheduler_driver)
def _set_callbacks_on_plugin(self):
# other agent based plugin driver might already set callbacks on plugin
if hasattr(self.plugin, 'agent_callbacks'):
return
self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
self.plugin.conn = rpc.create_connection(new=True)
self.plugin.conn.create_consumer(
TOPIC_LOADBALANCER_PLUGIN,
self.plugin.agent_callbacks.create_rpc_dispatcher(),
fanout=False)
self.plugin.conn.consume_in_thread()
def get_pool_agent(self, context, pool_id): def get_pool_agent(self, context, pool_id):
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id) agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
if not agent: if not agent:
@ -281,80 +361,95 @@ class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
def create_vip(self, context, vip): def create_vip(self, context, vip):
agent = self.get_pool_agent(context, vip['pool_id']) agent = self.get_pool_agent(context, vip['pool_id'])
self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host']) self.agent_rpc.create_vip(context, vip, agent['host'])
def update_vip(self, context, old_vip, vip): def update_vip(self, context, old_vip, vip):
agent = self.get_pool_agent(context, vip['pool_id']) agent = self.get_pool_agent(context, vip['pool_id'])
if vip['status'] in ACTIVE_PENDING: if vip['status'] in ACTIVE_PENDING:
self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host']) self.agent_rpc.update_vip(context, old_vip, vip, agent['host'])
else: else:
self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host']) self.agent_rpc.delete_vip(context, vip, agent['host'])
def delete_vip(self, context, vip): def delete_vip(self, context, vip):
self.plugin._delete_db_vip(context, vip['id']) self.plugin._delete_db_vip(context, vip['id'])
agent = self.get_pool_agent(context, vip['pool_id']) agent = self.get_pool_agent(context, vip['pool_id'])
self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host']) self.agent_rpc.delete_vip(context, vip, agent['host'])
def create_pool(self, context, pool): def create_pool(self, context, pool):
if not self.pool_scheduler.schedule(self.plugin, context, pool): agent = self.pool_scheduler.schedule(self.plugin, context, pool,
self.device_driver)
if not agent:
raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id']) raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
# don't notify here because a pool needs a vip to be useful self.agent_rpc.create_pool(context, pool, agent['host'],
self.device_driver)
def update_pool(self, context, old_pool, pool): def update_pool(self, context, old_pool, pool):
agent = self.get_pool_agent(context, pool['id']) agent = self.get_pool_agent(context, pool['id'])
if pool['status'] in ACTIVE_PENDING: if pool['status'] in ACTIVE_PENDING:
if pool['vip_id'] is not None: self.agent_rpc.update_pool(context, old_pool, pool,
self.agent_rpc.reload_pool(context, pool['id'], agent['host']) agent['host'])
else: else:
self.agent_rpc.destroy_pool(context, pool['id'], agent['host']) self.agent_rpc.delete_pool(context, pool, agent['host'])
def delete_pool(self, context, pool): def delete_pool(self, context, pool):
# get agent first to know host as binding will be deleted
# after pool is deleted from db
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id']) agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
if agent:
self.agent_rpc.destroy_pool(context, pool['id'],
agent['agent']['host'])
self.plugin._delete_db_pool(context, pool['id']) self.plugin._delete_db_pool(context, pool['id'])
if agent:
self.agent_rpc.delete_pool(context, pool, agent['agent']['host'])
def create_member(self, context, member): def create_member(self, context, member):
agent = self.get_pool_agent(context, member['pool_id']) agent = self.get_pool_agent(context, member['pool_id'])
self.agent_rpc.modify_pool(context, member['pool_id'], agent['host']) self.agent_rpc.create_member(context, member, agent['host'])
def update_member(self, context, old_member, member): def update_member(self, context, old_member, member):
agent = self.get_pool_agent(context, member['pool_id'])
# member may change pool id # member may change pool id
if member['pool_id'] != old_member['pool_id']: if member['pool_id'] != old_member['pool_id']:
agent = self.plugin.get_lbaas_agent_hosting_pool( old_pool_agent = self.plugin.get_lbaas_agent_hosting_pool(
context, old_member['pool_id']) context, old_member['pool_id'])
if agent: if old_pool_agent:
self.agent_rpc.modify_pool(context, self.agent_rpc.delete_member(context, old_member,
old_member['pool_id'], old_pool_agent['agent']['host'])
agent['agent']['host']) self.agent_rpc.create_member(context, member, agent['host'])
agent = self.get_pool_agent(context, member['pool_id']) else:
self.agent_rpc.modify_pool(context, member['pool_id'], agent['host']) self.agent_rpc.update_member(context, old_member, member,
agent['host'])
def delete_member(self, context, member): def delete_member(self, context, member):
self.plugin._delete_db_member(context, member['id']) self.plugin._delete_db_member(context, member['id'])
agent = self.get_pool_agent(context, member['pool_id']) agent = self.get_pool_agent(context, member['pool_id'])
self.agent_rpc.modify_pool(context, member['pool_id'], agent['host']) self.agent_rpc.delete_member(context, member, agent['host'])
def update_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id):
# monitors are unused here because agent will fetch what is necessary
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.modify_pool(context, pool_id, agent['host'])
def create_pool_health_monitor(self, context, healthmon, pool_id): def create_pool_health_monitor(self, context, healthmon, pool_id):
# healthmon is not used here # healthmon is not used here
agent = self.get_pool_agent(context, pool_id) agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.modify_pool(context, pool_id, agent['host']) self.agent_rpc.create_pool_health_monitor(context, healthmon,
pool_id, agent['host'])
def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id):
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.update_pool_health_monitor(context, old_health_monitor,
health_monitor, pool_id,
agent['host'])
def delete_pool_health_monitor(self, context, health_monitor, pool_id): def delete_pool_health_monitor(self, context, health_monitor, pool_id):
self.plugin._delete_db_pool_health_monitor( self.plugin._delete_db_pool_health_monitor(
context, health_monitor['id'], pool_id context, health_monitor['id'], pool_id
) )
# healthmon_id is not used here
agent = self.get_pool_agent(context, pool_id) agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.modify_pool(context, pool_id, agent['host']) self.agent_rpc.delete_pool_health_monitor(context, health_monitor,
pool_id, agent['host'])
def stats(self, context, pool_id): def stats(self, context, pool_id):
pass pass
class HaproxyOnHostPluginDriver(AgentBasedPluginDriver):
#TODO(obondarev): change hardcoded driver name
# to namespace_driver.DRIVER_NAME after moving HaproxyOnHostPluginDriver
# to a separate file (follow-up patch)
device_driver = 'haproxy_ns'

View File

@ -285,9 +285,9 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
# Anything to do here? the hm is not connected to the graph yet # Anything to do here? the hm is not connected to the graph yet
pass pass
def update_health_monitor(self, context, old_health_monitor, def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, health_monitor,
pool_id): pool_id):
self._handle_pool_health_monitor(context, health_monitor, pool_id) self._handle_pool_health_monitor(context, health_monitor, pool_id)
def create_pool_health_monitor(self, context, def create_pool_health_monitor(self, context,

View File

@ -242,8 +242,8 @@ class LoadBalancerPlugin(ldb.LoadBalancerPluginDb,
).filter_by(monitor_id=hm['id']).join(ldb.Pool) ).filter_by(monitor_id=hm['id']).join(ldb.Pool)
for assoc in qry: for assoc in qry:
driver = self._get_driver_for_pool(context, assoc['pool_id']) driver = self._get_driver_for_pool(context, assoc['pool_id'])
driver.update_health_monitor(context, old_hm, driver.update_pool_health_monitor(context, old_hm,
hm, assoc['pool_id']) hm, assoc['pool_id'])
return hm return hm
def _delete_db_pool_health_monitor(self, context, hm_id, pool_id): def _delete_db_pool_health_monitor(self, context, hm_id, pool_id):

View File

@ -102,9 +102,9 @@ class NoopLbaaSDriver(abstract_driver.LoadBalancerAbstractDriver):
def delete_member(self, context, member): def delete_member(self, context, member):
self.plugin._delete_db_member(context, member["id"]) self.plugin._delete_db_member(context, member["id"])
def update_health_monitor(self, context, old_health_monitor, def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, health_monitor,
pool_association): pool_association):
pass pass
def create_pool_health_monitor(self, context, def create_pool_health_monitor(self, context,

View File

@ -20,129 +20,20 @@ import contextlib
import mock import mock
from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers.haproxy import ( from neutron.services.loadbalancer.drivers.haproxy import (
agent_manager as manager agent_manager as manager
) )
from neutron.tests import base from neutron.tests import base
class TestLogicalDeviceCache(base.BaseTestCase):
def setUp(self):
super(TestLogicalDeviceCache, self).setUp()
self.cache = manager.LogicalDeviceCache()
def test_put(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
self.assertEqual(len(self.cache.devices), 1)
self.assertEqual(len(self.cache.port_lookup), 1)
self.assertEqual(len(self.cache.pool_lookup), 1)
def test_double_put(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
self.cache.put(fake_device)
self.assertEqual(len(self.cache.devices), 1)
self.assertEqual(len(self.cache.port_lookup), 1)
self.assertEqual(len(self.cache.pool_lookup), 1)
def test_remove_in_cache(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
self.assertEqual(len(self.cache.devices), 1)
self.cache.remove(fake_device)
self.assertFalse(len(self.cache.devices))
self.assertFalse(self.cache.port_lookup)
self.assertFalse(self.cache.pool_lookup)
def test_remove_in_cache_same_object(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
self.assertEqual(len(self.cache.devices), 1)
self.cache.remove(set(self.cache.devices).pop())
self.assertFalse(len(self.cache.devices))
self.assertFalse(self.cache.port_lookup)
self.assertFalse(self.cache.pool_lookup)
def test_remove_by_pool_id(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
self.assertEqual(len(self.cache.devices), 1)
self.cache.remove_by_pool_id('pool_id')
self.assertFalse(len(self.cache.devices))
self.assertFalse(self.cache.port_lookup)
self.assertFalse(self.cache.pool_lookup)
def test_get_by_pool_id(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
dev = self.cache.get_by_pool_id('pool_id')
self.assertEqual(dev.pool_id, 'pool_id')
self.assertEqual(dev.port_id, 'port_id')
def test_get_by_port_id(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
dev = self.cache.get_by_port_id('port_id')
self.assertEqual(dev.pool_id, 'pool_id')
self.assertEqual(dev.port_id, 'port_id')
def test_get_pool_ids(self):
fake_device = {
'vip': {'port_id': 'port_id'},
'pool': {'id': 'pool_id'}
}
self.cache.put(fake_device)
self.assertEqual(self.cache.get_pool_ids(), ['pool_id'])
class TestManager(base.BaseTestCase): class TestManager(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestManager, self).setUp() super(TestManager, self).setUp()
self.addCleanup(mock.patch.stopall) self.addCleanup(mock.patch.stopall)
mock_conf = mock.Mock() mock_conf = mock.Mock()
mock_conf.interface_driver = 'intdriver' mock_conf.device_driver = ['devdriver']
mock_conf.device_driver = 'devdriver'
mock_conf.AGENT.root_helper = 'sudo'
mock_conf.loadbalancer_state_path = '/the/path'
self.mock_importer = mock.patch.object(manager, 'importutils').start() self.mock_importer = mock.patch.object(manager, 'importutils').start()
@ -154,6 +45,9 @@ class TestManager(base.BaseTestCase):
self.mgr = manager.LbaasAgentManager(mock_conf) self.mgr = manager.LbaasAgentManager(mock_conf)
self.rpc_mock = rpc_mock_cls.return_value self.rpc_mock = rpc_mock_cls.return_value
self.log = mock.patch.object(manager, 'LOG').start() self.log = mock.patch.object(manager, 'LOG').start()
self.driver_mock = mock.Mock()
self.mgr.device_drivers = {'devdriver': self.driver_mock}
self.mgr.instance_mapping = {'1': 'devdriver', '2': 'devdriver'}
self.mgr.needs_resync = False self.mgr.needs_resync = False
def test_initialize_service_hook(self): def test_initialize_service_hook(self):
@ -174,64 +68,51 @@ class TestManager(base.BaseTestCase):
self.assertFalse(sync.called) self.assertFalse(sync.called)
def test_collect_stats(self): def test_collect_stats(self):
with mock.patch.object(self.mgr, 'cache') as cache: self.mgr.collect_stats(mock.Mock())
cache.get_pool_ids.return_value = ['1', '2'] self.rpc_mock.update_pool_stats.assert_has_calls([
self.mgr.collect_stats(mock.Mock()) mock.call('1', mock.ANY),
self.rpc_mock.update_pool_stats.assert_has_calls([ mock.call('2', mock.ANY)
mock.call('1', mock.ANY), ])
mock.call('2', mock.ANY)
])
def test_collect_stats_exception(self): def test_collect_stats_exception(self):
with mock.patch.object(self.mgr, 'cache') as cache: self.driver_mock.get_stats.side_effect = Exception
cache.get_pool_ids.return_value = ['1', '2']
with mock.patch.object(self.mgr, 'driver') as driver:
driver.get_stats.side_effect = Exception
self.mgr.collect_stats(mock.Mock()) self.mgr.collect_stats(mock.Mock())
self.assertFalse(self.rpc_mock.called) self.assertFalse(self.rpc_mock.called)
self.assertTrue(self.mgr.needs_resync) self.assertTrue(self.mgr.needs_resync)
self.assertTrue(self.log.exception.called) self.assertTrue(self.log.exception.called)
def test_vip_plug_callback(self): def _sync_state_helper(self, ready, reloaded, destroyed):
self.mgr._vip_plug_callback('plug', {'id': 'id'})
self.rpc_mock.plug_vip_port.assert_called_once_with('id')
def test_vip_unplug_callback(self):
self.mgr._vip_plug_callback('unplug', {'id': 'id'})
self.rpc_mock.unplug_vip_port.assert_called_once_with('id')
def _sync_state_helper(self, cache, ready, refreshed, destroyed):
with contextlib.nested( with contextlib.nested(
mock.patch.object(self.mgr, 'cache'), mock.patch.object(self.mgr, '_reload_pool'),
mock.patch.object(self.mgr, 'refresh_device'), mock.patch.object(self.mgr, '_destroy_pool')
mock.patch.object(self.mgr, 'destroy_device') ) as (reload, destroy):
) as (mock_cache, refresh, destroy):
mock_cache.get_pool_ids.return_value = cache
self.rpc_mock.get_ready_devices.return_value = ready self.rpc_mock.get_ready_devices.return_value = ready
self.mgr.sync_state() self.mgr.sync_state()
self.assertEqual(len(refreshed), len(refresh.mock_calls)) self.assertEqual(len(reloaded), len(reload.mock_calls))
self.assertEqual(len(destroyed), len(destroy.mock_calls)) self.assertEqual(len(destroyed), len(destroy.mock_calls))
refresh.assert_has_calls([mock.call(i) for i in refreshed]) reload.assert_has_calls([mock.call(i) for i in reloaded])
destroy.assert_has_calls([mock.call(i) for i in destroyed]) destroy.assert_has_calls([mock.call(i) for i in destroyed])
self.assertFalse(self.mgr.needs_resync) self.assertFalse(self.mgr.needs_resync)
def test_sync_state_all_known(self): def test_sync_state_all_known(self):
self._sync_state_helper(['1', '2'], ['1', '2'], ['1', '2'], []) self._sync_state_helper(['1', '2'], ['1', '2'], [])
def test_sync_state_all_unknown(self): def test_sync_state_all_unknown(self):
self._sync_state_helper([], ['1', '2'], ['1', '2'], []) self.mgr.instance_mapping = {}
self._sync_state_helper(['1', '2'], ['1', '2'], [])
def test_sync_state_destroy_all(self): def test_sync_state_destroy_all(self):
self._sync_state_helper(['1', '2'], [], [], ['1', '2']) self._sync_state_helper([], [], ['1', '2'])
def test_sync_state_both(self): def test_sync_state_both(self):
self._sync_state_helper(['1'], ['2'], ['2'], ['1']) self.mgr.instance_mapping = {'1': 'devdriver'}
self._sync_state_helper(['2'], ['2'], ['1'])
def test_sync_state_exception(self): def test_sync_state_exception(self):
self.rpc_mock.get_ready_devices.side_effect = Exception self.rpc_mock.get_ready_devices.side_effect = Exception
@ -241,127 +122,251 @@ class TestManager(base.BaseTestCase):
self.assertTrue(self.log.exception.called) self.assertTrue(self.log.exception.called)
self.assertTrue(self.mgr.needs_resync) self.assertTrue(self.mgr.needs_resync)
def test_refresh_device_exists(self): def test_reload_pool(self):
config = self.rpc_mock.get_logical_device.return_value config = {'driver': 'devdriver'}
self.rpc_mock.get_logical_device.return_value = config
pool_id = 'new_id'
self.assertNotIn(pool_id, self.mgr.instance_mapping)
with mock.patch.object(self.mgr, 'driver') as driver: self.mgr._reload_pool(pool_id)
with mock.patch.object(self.mgr, 'cache') as cache:
driver.exists.return_value = True
self.mgr.refresh_device(config) self.driver_mock.deploy_instance.assert_called_once_with(config)
self.assertIn(pool_id, self.mgr.instance_mapping)
self.rpc_mock.pool_deployed.assert_called_once_with(pool_id)
driver.exists.assert_called_once_with(config) def test_reload_pool_driver_not_found(self):
driver.update.assert_called_once_with(config) config = {'driver': 'unknown_driver'}
cache.put.assert_called_once_with(config) self.rpc_mock.get_logical_device.return_value = config
self.assertFalse(self.mgr.needs_resync) pool_id = 'new_id'
self.assertNotIn(pool_id, self.mgr.instance_mapping)
def test_refresh_device_new(self): self.mgr._reload_pool(pool_id)
config = self.rpc_mock.get_logical_device.return_value
with mock.patch.object(self.mgr, 'driver') as driver: self.assertTrue(self.log.error.called)
with mock.patch.object(self.mgr, 'cache') as cache: self.assertFalse(self.driver_mock.deploy_instance.called)
driver.exists.return_value = False self.assertNotIn(pool_id, self.mgr.instance_mapping)
self.assertFalse(self.rpc_mock.pool_deployed.called)
self.mgr.refresh_device(config) def test_reload_pool_exception_on_driver(self):
config = {'driver': 'devdriver'}
self.rpc_mock.get_logical_device.return_value = config
self.driver_mock.deploy_instance.side_effect = Exception
pool_id = 'new_id'
self.assertNotIn(pool_id, self.mgr.instance_mapping)
driver.exists.assert_called_once_with(config) self.mgr._reload_pool(pool_id)
driver.create.assert_called_once_with(config)
cache.put.assert_called_once_with(config)
self.assertFalse(self.mgr.needs_resync)
def test_refresh_device_exception(self): self.driver_mock.deploy_instance.assert_called_once_with(config)
config = self.rpc_mock.get_logical_device.return_value self.assertNotIn(pool_id, self.mgr.instance_mapping)
self.assertFalse(self.rpc_mock.pool_deployed.called)
self.assertTrue(self.log.exception.called)
self.assertTrue(self.mgr.needs_resync)
with mock.patch.object(self.mgr, 'driver') as driver: def test_destroy_pool(self):
with mock.patch.object(self.mgr, 'cache') as cache: pool_id = '1'
driver.exists.side_effect = Exception self.assertIn(pool_id, self.mgr.instance_mapping)
self.mgr.refresh_device(config)
driver.exists.assert_called_once_with(config) self.mgr._destroy_pool(pool_id)
self.assertTrue(self.mgr.needs_resync)
self.assertTrue(self.log.exception.called)
self.assertFalse(cache.put.called)
def test_destroy_device_known(self): self.driver_mock.undeploy_instance.assert_called_once_with(pool_id)
with mock.patch.object(self.mgr, 'driver') as driver: self.assertNotIn(pool_id, self.mgr.instance_mapping)
with mock.patch.object(self.mgr, 'cache') as cache: self.rpc_mock.pool_destroyed.assert_called_once_with(pool_id)
cache.get_by_pool_id.return_value = True self.assertFalse(self.mgr.needs_resync)
self.mgr.destroy_device('pool_id') def test_destroy_pool_exception_on_driver(self):
cache.get_by_pool_id.assert_called_once_with('pool_id') pool_id = '1'
driver.destroy.assert_called_once_with('pool_id') self.assertIn(pool_id, self.mgr.instance_mapping)
self.rpc_mock.pool_destroyed.assert_called_once_with( self.driver_mock.undeploy_instance.side_effect = Exception
'pool_id'
)
cache.remove.assert_called_once_with(True)
self.assertFalse(self.mgr.needs_resync)
def test_destroy_device_unknown(self): self.mgr._destroy_pool(pool_id)
with mock.patch.object(self.mgr, 'driver') as driver:
with mock.patch.object(self.mgr, 'cache') as cache:
cache.get_by_pool_id.return_value = None
self.mgr.destroy_device('pool_id') self.driver_mock.undeploy_instance.assert_called_once_with(pool_id)
cache.get_by_pool_id.assert_called_once_with('pool_id') self.assertIn(pool_id, self.mgr.instance_mapping)
self.assertFalse(driver.destroy.called) self.assertFalse(self.rpc_mock.pool_destroyed.called)
self.assertTrue(self.log.exception.called)
self.assertTrue(self.mgr.needs_resync)
def test_destroy_device_exception(self): def test_get_driver_unknown_device(self):
with mock.patch.object(self.mgr, 'driver') as driver: self.assertRaises(manager.DeviceNotFoundOnAgent,
with mock.patch.object(self.mgr, 'cache') as cache: self.mgr._get_driver, 'unknown')
cache.get_by_pool_id.return_value = True
driver.destroy.side_effect = Exception
self.mgr.destroy_device('pool_id')
cache.get_by_pool_id.assert_called_once_with('pool_id')
self.assertTrue(self.log.exception.called)
self.assertTrue(self.mgr.needs_resync)
def test_remove_orphans(self): def test_remove_orphans(self):
with mock.patch.object(self.mgr, 'driver') as driver: self.mgr.remove_orphans()
with mock.patch.object(self.mgr, 'cache') as cache: self.driver_mock.remove_orphans.assert_called_once_with(['1', '2'])
cache.get_pool_ids.return_value = ['1', '2']
self.mgr.remove_orphans()
driver.remove_orphans.assert_called_once_with(['1', '2']) def test_create_vip(self):
vip = {'id': 'id1', 'pool_id': '1'}
self.mgr.create_vip(mock.Mock(), vip)
self.driver_mock.create_vip.assert_called_once_with(vip)
self.rpc_mock.update_status.assert_called_once_with('vip', vip['id'],
constants.ACTIVE)
def test_reload_pool(self): def test_create_vip_failed(self):
with mock.patch.object(self.mgr, 'refresh_device') as refresh: vip = {'id': 'id1', 'pool_id': '1'}
self.mgr.reload_pool(mock.Mock(), pool_id='pool_id') self.driver_mock.create_vip.side_effect = Exception
refresh.assert_called_once_with('pool_id') self.mgr.create_vip(mock.Mock(), vip)
self.driver_mock.create_vip.assert_called_once_with(vip)
self.rpc_mock.update_status.assert_called_once_with('vip', vip['id'],
constants.ERROR)
def test_modify_pool_known(self): def test_update_vip(self):
with mock.patch.object(self.mgr, 'refresh_device') as refresh: old_vip = {'id': 'id1'}
with mock.patch.object(self.mgr, 'cache') as cache: vip = {'id': 'id1', 'pool_id': '1'}
cache.get_by_pool_id.return_value = True self.mgr.update_vip(mock.Mock(), old_vip, vip)
self.driver_mock.update_vip.assert_called_once_with(old_vip, vip)
self.rpc_mock.update_status.assert_called_once_with('vip', vip['id'],
constants.ACTIVE)
self.mgr.reload_pool(mock.Mock(), pool_id='pool_id') def test_update_vip_failed(self):
old_vip = {'id': 'id1'}
vip = {'id': 'id1', 'pool_id': '1'}
self.driver_mock.update_vip.side_effect = Exception
self.mgr.update_vip(mock.Mock(), old_vip, vip)
self.driver_mock.update_vip.assert_called_once_with(old_vip, vip)
self.rpc_mock.update_status.assert_called_once_with('vip', vip['id'],
constants.ERROR)
refresh.assert_called_once_with('pool_id') def test_delete_vip(self):
vip = {'id': 'id1', 'pool_id': '1'}
self.mgr.delete_vip(mock.Mock(), vip)
self.driver_mock.delete_vip.assert_called_once_with(vip)
def test_modify_pool_unknown(self): def test_create_pool(self):
with mock.patch.object(self.mgr, 'refresh_device') as refresh: pool = {'id': 'id1'}
with mock.patch.object(self.mgr, 'cache') as cache: self.assertNotIn(pool['id'], self.mgr.instance_mapping)
cache.get_by_pool_id.return_value = False self.mgr.create_pool(mock.Mock(), pool, 'devdriver')
self.driver_mock.create_pool.assert_called_once_with(pool)
self.rpc_mock.update_status.assert_called_once_with('pool', pool['id'],
constants.ACTIVE)
self.assertIn(pool['id'], self.mgr.instance_mapping)
self.mgr.modify_pool(mock.Mock(), pool_id='pool_id') def test_create_pool_failed(self):
pool = {'id': 'id1'}
self.assertNotIn(pool['id'], self.mgr.instance_mapping)
self.driver_mock.create_pool.side_effect = Exception
self.mgr.create_pool(mock.Mock(), pool, 'devdriver')
self.driver_mock.create_pool.assert_called_once_with(pool)
self.rpc_mock.update_status.assert_called_once_with('pool', pool['id'],
constants.ERROR)
self.assertNotIn(pool['id'], self.mgr.instance_mapping)
self.assertFalse(refresh.called) def test_update_pool(self):
old_pool = {'id': '1'}
pool = {'id': '1'}
self.mgr.update_pool(mock.Mock(), old_pool, pool)
self.driver_mock.update_pool.assert_called_once_with(old_pool, pool)
self.rpc_mock.update_status.assert_called_once_with('pool', pool['id'],
constants.ACTIVE)
def test_destroy_pool_known(self): def test_update_pool_failed(self):
with mock.patch.object(self.mgr, 'destroy_device') as destroy: old_pool = {'id': '1'}
with mock.patch.object(self.mgr, 'cache') as cache: pool = {'id': '1'}
cache.get_by_pool_id.return_value = True self.driver_mock.update_pool.side_effect = Exception
self.mgr.update_pool(mock.Mock(), old_pool, pool)
self.driver_mock.update_pool.assert_called_once_with(old_pool, pool)
self.rpc_mock.update_status.assert_called_once_with('pool', pool['id'],
constants.ERROR)
self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id') def test_delete_pool(self):
pool = {'id': '1'}
self.assertIn(pool['id'], self.mgr.instance_mapping)
self.mgr.delete_pool(mock.Mock(), pool)
self.driver_mock.delete_pool.assert_called_once_with(pool)
self.assertNotIn(pool['id'], self.mgr.instance_mapping)
destroy.assert_called_once_with('pool_id') def test_create_member(self):
member = {'id': 'id1', 'pool_id': '1'}
self.mgr.create_member(mock.Mock(), member)
self.driver_mock.create_member.assert_called_once_with(member)
self.rpc_mock.update_status.assert_called_once_with('member',
member['id'],
constants.ACTIVE)
def test_destroy_pool_unknown(self): def test_create_member_failed(self):
with mock.patch.object(self.mgr, 'destroy_device') as destroy: member = {'id': 'id1', 'pool_id': '1'}
with mock.patch.object(self.mgr, 'cache') as cache: self.driver_mock.create_member.side_effect = Exception
cache.get_by_pool_id.return_value = False self.mgr.create_member(mock.Mock(), member)
self.driver_mock.create_member.assert_called_once_with(member)
self.rpc_mock.update_status.assert_called_once_with('member',
member['id'],
constants.ERROR)
self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id') def test_update_member(self):
old_member = {'id': 'id1'}
member = {'id': 'id1', 'pool_id': '1'}
self.mgr.update_member(mock.Mock(), old_member, member)
self.driver_mock.update_member.assert_called_once_with(old_member,
member)
self.rpc_mock.update_status.assert_called_once_with('member',
member['id'],
constants.ACTIVE)
self.assertFalse(destroy.called) def test_update_member_failed(self):
old_member = {'id': 'id1'}
member = {'id': 'id1', 'pool_id': '1'}
self.driver_mock.update_member.side_effect = Exception
self.mgr.update_member(mock.Mock(), old_member, member)
self.driver_mock.update_member.assert_called_once_with(old_member,
member)
self.rpc_mock.update_status.assert_called_once_with('member',
member['id'],
constants.ERROR)
def test_delete_member(self):
member = {'id': 'id1', 'pool_id': '1'}
self.mgr.delete_member(mock.Mock(), member)
self.driver_mock.delete_member.assert_called_once_with(member)
def test_create_monitor(self):
monitor = {'id': 'id1'}
assoc_id = {'monitor_id': monitor['id'], 'pool_id': '1'}
self.mgr.create_pool_health_monitor(mock.Mock(), monitor, '1')
self.driver_mock.create_pool_health_monitor.assert_called_once_with(
monitor, '1')
self.rpc_mock.update_status.assert_called_once_with('health_monitor',
assoc_id,
constants.ACTIVE)
def test_create_monitor_failed(self):
monitor = {'id': 'id1'}
assoc_id = {'monitor_id': monitor['id'], 'pool_id': '1'}
self.driver_mock.create_pool_health_monitor.side_effect = Exception
self.mgr.create_pool_health_monitor(mock.Mock(), monitor, '1')
self.driver_mock.create_pool_health_monitor.assert_called_once_with(
monitor, '1')
self.rpc_mock.update_status.assert_called_once_with('health_monitor',
assoc_id,
constants.ERROR)
def test_update_monitor(self):
monitor = {'id': 'id1'}
assoc_id = {'monitor_id': monitor['id'], 'pool_id': '1'}
self.mgr.update_pool_health_monitor(mock.Mock(), monitor, monitor, '1')
self.driver_mock.update_pool_health_monitor.assert_called_once_with(
monitor, monitor, '1')
self.rpc_mock.update_status.assert_called_once_with('health_monitor',
assoc_id,
constants.ACTIVE)
def test_update_monitor_failed(self):
monitor = {'id': 'id1'}
assoc_id = {'monitor_id': monitor['id'], 'pool_id': '1'}
self.driver_mock.update_pool_health_monitor.side_effect = Exception
self.mgr.update_pool_health_monitor(mock.Mock(), monitor, monitor, '1')
self.driver_mock.update_pool_health_monitor.assert_called_once_with(
monitor, monitor, '1')
self.rpc_mock.update_status.assert_called_once_with('health_monitor',
assoc_id,
constants.ERROR)
def test_delete_monitor(self):
monitor = {'id': 'id1'}
self.mgr.delete_pool_health_monitor(mock.Mock(), monitor, '1')
self.driver_mock.delete_pool_health_monitor.assert_called_once_with(
monitor, '1')
def test_agent_disabled(self):
payload = {'admin_state_up': False}
self.mgr.agent_updated(mock.Mock(), payload)
self.driver_mock.undeploy_instance.assert_has_calls(
[mock.call('1'), mock.call('2')])

View File

@ -58,8 +58,7 @@ class TestApiCache(base.BaseTestCase):
self.make_msg.assert_called_once_with( self.make_msg.assert_called_once_with(
'get_logical_device', 'get_logical_device',
pool_id='pool_id', pool_id='pool_id')
host='host')
self.mock_call.assert_called_once_with( self.mock_call.assert_called_once_with(
mock.sentinel.context, mock.sentinel.context,
@ -75,8 +74,41 @@ class TestApiCache(base.BaseTestCase):
self.make_msg.assert_called_once_with( self.make_msg.assert_called_once_with(
'pool_destroyed', 'pool_destroyed',
pool_id='pool_id', pool_id='pool_id')
host='host')
self.mock_call.assert_called_once_with(
mock.sentinel.context,
self.make_msg.return_value,
topic='topic'
)
def test_pool_deployed(self):
self.assertEqual(
self.api.pool_deployed('pool_id'),
self.mock_call.return_value
)
self.make_msg.assert_called_once_with(
'pool_deployed',
pool_id='pool_id')
self.mock_call.assert_called_once_with(
mock.sentinel.context,
self.make_msg.return_value,
topic='topic'
)
def test_update_status(self):
self.assertEqual(
self.api.update_status('pool', 'pool_id', 'ACTIVE'),
self.mock_call.return_value
)
self.make_msg.assert_called_once_with(
'update_status',
obj_type='pool',
obj_id='pool_id',
status='ACTIVE')
self.mock_call.assert_called_once_with( self.mock_call.assert_called_once_with(
mock.sentinel.context, mock.sentinel.context,

View File

@ -18,9 +18,8 @@
# @author: Oleg Bondarev (obondarev@mirantis.com) # @author: Oleg Bondarev (obondarev@mirantis.com)
import contextlib import contextlib
import mock
from oslo.config import cfg as config import mock
from neutron.services.loadbalancer.drivers.haproxy import cfg from neutron.services.loadbalancer.drivers.haproxy import cfg
from neutron.tests import base from neutron.tests import base
@ -50,9 +49,6 @@ class TestHaproxyCfg(base.BaseTestCase):
'\n'.join(test_config)) '\n'.join(test_config))
def test_build_global(self): def test_build_global(self):
if not hasattr(config.CONF, 'user_group'):
config.CONF.register_opt(config.StrOpt('user_group'))
config.CONF.set_override('user_group', 'test_group')
expected_opts = ['global', expected_opts = ['global',
'\tdaemon', '\tdaemon',
'\tuser nobody', '\tuser nobody',
@ -60,9 +56,8 @@ class TestHaproxyCfg(base.BaseTestCase):
'\tlog /dev/log local0', '\tlog /dev/log local0',
'\tlog /dev/log local1 notice', '\tlog /dev/log local1 notice',
'\tstats socket test_path mode 0666 level user'] '\tstats socket test_path mode 0666 level user']
opts = cfg._build_global(mock.Mock(), 'test_path') opts = cfg._build_global(mock.Mock(), 'test_path', 'test_group')
self.assertEqual(expected_opts, list(opts)) self.assertEqual(expected_opts, list(opts))
config.CONF.reset()
def test_build_defaults(self): def test_build_defaults(self):
expected_opts = ['defaults', expected_opts = ['defaults',
@ -74,7 +69,6 @@ class TestHaproxyCfg(base.BaseTestCase):
'\ttimeout server 50000'] '\ttimeout server 50000']
opts = cfg._build_defaults(mock.Mock()) opts = cfg._build_defaults(mock.Mock())
self.assertEqual(expected_opts, list(opts)) self.assertEqual(expected_opts, list(opts))
config.CONF.reset()
def test_build_frontend(self): def test_build_frontend(self):
test_config = {'vip': {'id': 'vip_id', test_config = {'vip': {'id': 'vip_id',

View File

@ -17,6 +17,7 @@
# @author: Mark McClain, DreamHost # @author: Mark McClain, DreamHost
import contextlib import contextlib
import mock import mock
from neutron.common import exceptions from neutron.common import exceptions
@ -29,22 +30,33 @@ from neutron.tests import base
class TestHaproxyNSDriver(base.BaseTestCase): class TestHaproxyNSDriver(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestHaproxyNSDriver, self).setUp() super(TestHaproxyNSDriver, self).setUp()
self.addCleanup(mock.patch.stopall)
self.vif_driver = mock.Mock() conf = mock.Mock()
self.vip_plug_callback = mock.Mock() conf.haproxy.loadbalancer_state_path = '/the/path'
conf.interface_driver = 'intdriver'
conf.haproxy.user_group = 'test_group'
conf.AGENT.root_helper = 'sudo_test'
self.mock_importer = mock.patch.object(namespace_driver,
'importutils').start()
self.rpc_mock = mock.Mock()
self.driver = namespace_driver.HaproxyNSDriver( self.driver = namespace_driver.HaproxyNSDriver(
'sudo', conf,
'/the/path', self.rpc_mock
self.vif_driver,
self.vip_plug_callback
) )
self.vif_driver = mock.Mock()
self.driver.vif_driver = self.vif_driver
self.fake_config = { self.fake_config = {
'pool': {'id': 'pool_id'}, 'pool': {'id': 'pool_id'},
'vip': {'id': 'vip_id', 'port': {'id': 'port_id'}} 'vip': {'id': 'vip_id', 'port': {'id': 'port_id'},
'status': 'ACTIVE', 'admin_state_up': True}
} }
def test_get_name(self):
self.assertEqual(self.driver.get_name(), namespace_driver.DRIVER_NAME)
def test_create(self): def test_create(self):
with mock.patch.object(self.driver, '_plug') as plug: with mock.patch.object(self.driver, '_plug') as plug:
with mock.patch.object(self.driver, '_spawn') as spawn: with mock.patch.object(self.driver, '_spawn') as spawn:
@ -78,14 +90,15 @@ class TestHaproxyNSDriver(base.BaseTestCase):
self.driver._spawn(self.fake_config) self.driver._spawn(self.fake_config)
mock_save.assert_called_once_with('conf', self.fake_config, 'sock') mock_save.assert_called_once_with('conf', self.fake_config,
'sock', 'test_group')
cmd = ['haproxy', '-f', 'conf', '-p', 'pid'] cmd = ['haproxy', '-f', 'conf', '-p', 'pid']
ip_wrap.assert_has_calls([ ip_wrap.assert_has_calls([
mock.call('sudo', 'qlbaas-pool_id'), mock.call('sudo_test', 'qlbaas-pool_id'),
mock.call().netns.execute(cmd) mock.call().netns.execute(cmd)
]) ])
def test_destroy(self): def test_undeploy_instance(self):
with contextlib.nested( with contextlib.nested(
mock.patch.object(self.driver, '_get_state_file_path'), mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch.object(namespace_driver, 'kill_pids_in_file'), mock.patch.object(namespace_driver, 'kill_pids_in_file'),
@ -99,14 +112,14 @@ class TestHaproxyNSDriver(base.BaseTestCase):
self.driver.pool_to_port_id['pool_id'] = 'port_id' self.driver.pool_to_port_id['pool_id'] = 'port_id'
isdir.return_value = True isdir.return_value = True
self.driver.destroy('pool_id') self.driver.undeploy_instance('pool_id')
kill.assert_called_once_with('sudo', '/pool/pid') kill.assert_called_once_with('sudo_test', '/pool/pid')
unplug.assert_called_once_with('qlbaas-pool_id', 'port_id') unplug.assert_called_once_with('qlbaas-pool_id', 'port_id')
isdir.called_once_with('/pool') isdir.assert_called_once_with('/pool')
rmtree.assert_called_once_with('/pool') rmtree.assert_called_once_with('/pool')
ip_wrap.assert_has_calls([ ip_wrap.assert_has_calls([
mock.call('sudo', 'qlbaas-pool_id'), mock.call('sudo_test', 'qlbaas-pool_id'),
mock.call().garbage_collect_namespace() mock.call().garbage_collect_namespace()
]) ])
@ -125,7 +138,7 @@ class TestHaproxyNSDriver(base.BaseTestCase):
self.driver.exists('pool_id') self.driver.exists('pool_id')
ip_wrap.assert_has_calls([ ip_wrap.assert_has_calls([
mock.call('sudo'), mock.call('sudo_test'),
mock.call().netns.exists('qlbaas-pool_id') mock.call().netns.exists('qlbaas-pool_id')
]) ])
@ -220,7 +233,8 @@ class TestHaproxyNSDriver(base.BaseTestCase):
ip_net.prefixlen = 24 ip_net.prefixlen = 24
self.driver._plug('test_ns', test_port) self.driver._plug('test_ns', test_port)
self.vip_plug_callback.assert_called_once_with('plug', test_port) self.rpc_mock.plug_vip_port.assert_called_once_with(
test_port['id'])
self.assertTrue(dev_exists.called) self.assertTrue(dev_exists.called)
self.vif_driver.plug.assert_called_once_with('net_id', 'port_id', self.vif_driver.plug.assert_called_once_with('net_id', 'port_id',
'test_interface', 'test_interface',
@ -232,7 +246,7 @@ class TestHaproxyNSDriver(base.BaseTestCase):
'test_ns') 'test_ns')
cmd = ['route', 'add', 'default', 'gw', '10.0.0.1'] cmd = ['route', 'add', 'default', 'gw', '10.0.0.1']
ip_wrap.assert_has_calls([ ip_wrap.assert_has_calls([
mock.call('sudo', namespace='test_ns'), mock.call('sudo_test', namespace='test_ns'),
mock.call().netns.execute(cmd, check_exit_code=False), mock.call().netns.execute(cmd, check_exit_code=False),
]) ])
@ -257,7 +271,8 @@ class TestHaproxyNSDriver(base.BaseTestCase):
ip_net.prefixlen = 24 ip_net.prefixlen = 24
self.driver._plug('test_ns', test_port) self.driver._plug('test_ns', test_port)
self.vip_plug_callback.assert_called_once_with('plug', test_port) self.rpc_mock.plug_vip_port.assert_called_once_with(
test_port['id'])
self.assertTrue(dev_exists.called) self.assertTrue(dev_exists.called)
self.vif_driver.plug.assert_called_once_with('net_id', 'port_id', self.vif_driver.plug.assert_called_once_with('net_id', 'port_id',
'test_interface', 'test_interface',
@ -276,8 +291,7 @@ class TestHaproxyNSDriver(base.BaseTestCase):
self.vif_driver.get_device_name.return_value = 'test_interface' self.vif_driver.get_device_name.return_value = 'test_interface'
self.driver._unplug('test_ns', 'port_id') self.driver._unplug('test_ns', 'port_id')
self.vip_plug_callback.assert_called_once_with('unplug', self.rpc_mock.unplug_vip_port.assert_called_once_with('port_id')
{'id': 'port_id'})
self.vif_driver.unplug('test_interface', namespace='test_ns') self.vif_driver.unplug('test_interface', namespace='test_ns')
def test_kill_pids_in_file(self): def test_kill_pids_in_file(self):
@ -293,20 +307,130 @@ class TestHaproxyNSDriver(base.BaseTestCase):
file_mock.__iter__.return_value = iter(['123']) file_mock.__iter__.return_value = iter(['123'])
path_exists.return_value = False path_exists.return_value = False
namespace_driver.kill_pids_in_file('sudo', 'test_path') namespace_driver.kill_pids_in_file('sudo_test', 'test_path')
path_exists.assert_called_once_with('test_path') path_exists.assert_called_once_with('test_path')
self.assertFalse(mock_open.called) self.assertFalse(mock_open.called)
self.assertFalse(mock_execute.called) self.assertFalse(mock_execute.called)
path_exists.return_value = True path_exists.return_value = True
mock_execute.side_effect = RuntimeError mock_execute.side_effect = RuntimeError
namespace_driver.kill_pids_in_file('sudo', 'test_path') namespace_driver.kill_pids_in_file('sudo_test', 'test_path')
self.assertTrue(mock_log.called) self.assertTrue(mock_log.called)
mock_execute.assert_called_once_with( mock_execute.assert_called_once_with(
['kill', '-9', '123'], 'sudo') ['kill', '-9', '123'], 'sudo_test')
def test_get_state_file_path(self): def test_get_state_file_path(self):
with mock.patch('os.makedirs') as mkdir: with mock.patch('os.makedirs') as mkdir:
path = self.driver._get_state_file_path('pool_id', 'conf') path = self.driver._get_state_file_path('pool_id', 'conf')
self.assertEqual('/the/path/pool_id/conf', path) self.assertEqual('/the/path/pool_id/conf', path)
mkdir.assert_called_once_with('/the/path/pool_id', 0o755) mkdir.assert_called_once_with('/the/path/pool_id', 0o755)
def test_deploy_instance(self):
with mock.patch.object(self.driver, 'exists') as exists:
with mock.patch.object(self.driver, 'update') as update:
self.driver.deploy_instance(self.fake_config)
exists.assert_called_once_with(self.fake_config['pool']['id'])
update.assert_called_once_with(self.fake_config)
def test_deploy_instance_non_existing(self):
with mock.patch.object(self.driver, 'exists') as exists:
with mock.patch.object(self.driver, 'create') as create:
exists.return_value = False
self.driver.deploy_instance(self.fake_config)
exists.assert_called_once_with(self.fake_config['pool']['id'])
create.assert_called_once_with(self.fake_config)
def test_deploy_instance_vip_status_non_active(self):
with mock.patch.object(self.driver, 'exists') as exists:
self.fake_config['vip']['status'] = 'NON_ACTIVE'
self.driver.deploy_instance(self.fake_config)
self.assertFalse(exists.called)
def test_deploy_instance_vip_admin_state_down(self):
with mock.patch.object(self.driver, 'exists') as exists:
self.fake_config['vip']['admin_state_up'] = False
self.driver.deploy_instance(self.fake_config)
self.assertFalse(exists.called)
def test_deploy_instance_no_vip(self):
with mock.patch.object(self.driver, 'exists') as exists:
del self.fake_config['vip']
self.driver.deploy_instance(self.fake_config)
self.assertFalse(exists.called)
def test_refresh_device(self):
with mock.patch.object(self.driver, 'deploy_instance') as deploy:
pool_id = 'pool_id1'
self.driver._refresh_device(pool_id)
self.rpc_mock.get_logical_device.assert_called_once_with(pool_id)
deploy.assert_called_once_with(
self.rpc_mock.get_logical_device.return_value)
def test_create_vip(self):
with mock.patch.object(self.driver, '_refresh_device') as refresh:
self.driver.create_vip({'pool_id': '1'})
refresh.assert_called_once_with('1')
def test_update_vip(self):
with mock.patch.object(self.driver, '_refresh_device') as refresh:
self.driver.update_vip({}, {'pool_id': '1'})
refresh.assert_called_once_with('1')
def test_delete_vip(self):
with mock.patch.object(self.driver, 'undeploy_instance') as undeploy:
self.driver.delete_vip({'pool_id': '1'})
undeploy.assert_called_once_with('1')
def test_create_pool(self):
with mock.patch.object(self.driver, '_refresh_device') as refresh:
self.driver.create_pool({'id': '1'})
self.assertFalse(refresh.called)
def test_update_pool(self):
with mock.patch.object(self.driver, '_refresh_device') as refresh:
self.driver.update_pool({}, {'id': '1'})
refresh.assert_called_once_with('1')
def test_delete_pool_existing(self):
with mock.patch.object(self.driver, 'undeploy_instance') as undeploy:
with mock.patch.object(self.driver, 'exists') as exists:
exists.return_value = True
self.driver.delete_pool({'id': '1'})
undeploy.assert_called_once_with('1')
def test_delete_pool_non_existing(self):
with mock.patch.object(self.driver, 'undeploy_instance') as undeploy:
with mock.patch.object(self.driver, 'exists') as exists:
exists.return_value = False
self.driver.delete_pool({'id': '1'})
self.assertFalse(undeploy.called)
def test_create_member(self):
with mock.patch.object(self.driver, '_refresh_device') as refresh:
self.driver.create_member({'pool_id': '1'})
refresh.assert_called_once_with('1')
def test_update_member(self):
with mock.patch.object(self.driver, '_refresh_device') as refresh:
self.driver.update_member({}, {'pool_id': '1'})
refresh.assert_called_once_with('1')
def test_delete_member(self):
with mock.patch.object(self.driver, '_refresh_device') as refresh:
self.driver.delete_member({'pool_id': '1'})
refresh.assert_called_once_with('1')
def test_create_pool_health_monitor(self):
with mock.patch.object(self.driver, '_refresh_device') as refresh:
self.driver.create_pool_health_monitor('', '1')
refresh.assert_called_once_with('1')
def test_update_pool_health_monitor(self):
with mock.patch.object(self.driver, '_refresh_device') as refresh:
self.driver.update_pool_health_monitor('', '', '1')
refresh.assert_called_once_with('1')
def test_delete_pool_health_monitor(self):
with mock.patch.object(self.driver, '_refresh_device') as refresh:
self.driver.delete_pool_health_monitor('', '1')
refresh.assert_called_once_with('1')

View File

@ -16,6 +16,8 @@
# #
# @author: Mark McClain, DreamHost # @author: Mark McClain, DreamHost
import contextlib
import mock import mock
from webob import exc from webob import exc
@ -39,14 +41,21 @@ class TestLoadBalancerPluginBase(
test_db_loadbalancer.LoadBalancerPluginDbTestCase): test_db_loadbalancer.LoadBalancerPluginDbTestCase):
def setUp(self): def setUp(self):
def reset_device_driver():
plugin_driver.AgentBasedPluginDriver.device_driver = None
self.addCleanup(reset_device_driver)
self.mock_importer = mock.patch.object(
plugin_driver, 'importutils').start()
self.addCleanup(mock.patch.stopall)
# needed to reload provider configuration # needed to reload provider configuration
st_db.ServiceTypeManager._instance = None st_db.ServiceTypeManager._instance = None
plugin_driver.AgentBasedPluginDriver.device_driver = 'dummy'
super(TestLoadBalancerPluginBase, self).setUp( super(TestLoadBalancerPluginBase, self).setUp(
lbaas_provider=('LOADBALANCER:lbaas:neutron.services.' lbaas_provider=('LOADBALANCER:lbaas:neutron.services.'
'loadbalancer.drivers.haproxy.plugin_driver.' 'loadbalancer.drivers.haproxy.plugin_driver.'
'HaproxyOnHostPluginDriver:default')) 'AgentBasedPluginDriver:default'))
# create another API instance to make testing easier
# pass a mock to our API instance
# we need access to loaded plugins to modify models # we need access to loaded plugins to modify models
loaded_plugins = manager.NeutronManager().get_service_plugins() loaded_plugins = manager.NeutronManager().get_service_plugins()
@ -66,12 +75,6 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
'.LbaasAgentSchedulerDbMixin.get_lbaas_agents') '.LbaasAgentSchedulerDbMixin.get_lbaas_agents')
get_lbaas_agents_patcher.start() get_lbaas_agents_patcher.start()
# mocking plugin_driver create_pool() as it does nothing more than
# pool scheduling which is beyond the scope of this test case
mock.patch('neutron.services.loadbalancer.drivers.haproxy'
'.plugin_driver.HaproxyOnHostPluginDriver'
'.create_pool').start()
self.addCleanup(mock.patch.stopall) self.addCleanup(mock.patch.stopall)
def test_get_ready_devices(self): def test_get_ready_devices(self):
@ -132,10 +135,10 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
{'id': pools[1].id}, {'id': pools[1].id},
{'id': pools[2].id}]} {'id': pools[2].id}]}
ready = self.callbacks.get_ready_devices(ctx) ready = self.callbacks.get_ready_devices(ctx)
self.assertEqual(len(ready), 2) self.assertEqual(len(ready), 3)
self.assertIn(pools[0].id, ready) self.assertIn(pools[0].id, ready)
self.assertIn(pools[1].id, ready) self.assertIn(pools[1].id, ready)
self.assertNotIn(pools[2].id, ready) self.assertIn(pools[2].id, ready)
# cleanup # cleanup
ctx.session.query(ldb.Pool).delete() ctx.session.query(ldb.Pool).delete()
ctx.session.query(ldb.Vip).delete() ctx.session.query(ldb.Vip).delete()
@ -158,7 +161,7 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
ready = self.callbacks.get_ready_devices( ready = self.callbacks.get_ready_devices(
context.get_admin_context(), context.get_admin_context(),
) )
self.assertFalse(ready) self.assertEqual([vip['vip']['pool_id']], ready)
def test_get_ready_devices_inactive_pool(self): def test_get_ready_devices_inactive_pool(self):
with self.vip() as vip: with self.vip() as vip:
@ -188,15 +191,20 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
exceptions.Invalid, exceptions.Invalid,
self.callbacks.get_logical_device, self.callbacks.get_logical_device,
context.get_admin_context(), context.get_admin_context(),
pool['pool']['id'], pool['pool']['id'])
activate=False
)
def test_get_logical_device_activate(self): def test_get_logical_device_active(self):
with self.pool() as pool: with self.pool() as pool:
with self.vip(pool=pool) as vip: with self.vip(pool=pool) as vip:
with self.member(pool_id=vip['vip']['pool_id']) as member: with self.member(pool_id=vip['vip']['pool_id']) as member:
ctx = context.get_admin_context() ctx = context.get_admin_context()
# activate objects
self.plugin_instance.update_status(
ctx, ldb.Pool, pool['pool']['id'], 'ACTIVE')
self.plugin_instance.update_status(
ctx, ldb.Member, member['member']['id'], 'ACTIVE')
self.plugin_instance.update_status(
ctx, ldb.Vip, vip['vip']['id'], 'ACTIVE')
# build the expected # build the expected
port = self.plugin_instance._core_plugin.get_port( port = self.plugin_instance._core_plugin.get_port(
@ -221,11 +229,12 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
'pool': pool, 'pool': pool,
'vip': vip['vip'], 'vip': vip['vip'],
'members': [member['member']], 'members': [member['member']],
'healthmonitors': [] 'healthmonitors': [],
'driver': 'dummy'
} }
logical_config = self.callbacks.get_logical_device( logical_config = self.callbacks.get_logical_device(
ctx, pool['id'], activate=True ctx, pool['id']
) )
self.assertEqual(logical_config, expected) self.assertEqual(logical_config, expected)
@ -246,7 +255,7 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
'INACTIVE') 'INACTIVE')
logical_config = self.callbacks.get_logical_device( logical_config = self.callbacks.get_logical_device(
ctx, pool['pool']['id'], activate=False) ctx, pool['pool']['id'])
member['member']['status'] = constants.INACTIVE member['member']['status'] = constants.INACTIVE
self.assertEqual([member['member']], self.assertEqual([member['member']],
@ -308,6 +317,58 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
host='host' host='host'
) )
def test_pool_deployed(self):
with self.pool() as pool:
with self.vip(pool=pool) as vip:
with self.member(pool_id=vip['vip']['pool_id']) as member:
ctx = context.get_admin_context()
p = self.plugin_instance.get_pool(ctx, pool['pool']['id'])
self.assertEqual('PENDING_CREATE', p['status'])
v = self.plugin_instance.get_vip(ctx, vip['vip']['id'])
self.assertEqual('PENDING_CREATE', v['status'])
m = self.plugin_instance.get_member(
ctx, member['member']['id'])
self.assertEqual('PENDING_CREATE', m['status'])
self.callbacks.pool_deployed(ctx, pool['pool']['id'])
p = self.plugin_instance.get_pool(ctx, pool['pool']['id'])
self.assertEqual('ACTIVE', p['status'])
v = self.plugin_instance.get_vip(ctx, vip['vip']['id'])
self.assertEqual('ACTIVE', v['status'])
m = self.plugin_instance.get_member(
ctx, member['member']['id'])
self.assertEqual('ACTIVE', m['status'])
def test_update_status_pool(self):
with self.pool() as pool:
pool_id = pool['pool']['id']
ctx = context.get_admin_context()
p = self.plugin_instance.get_pool(ctx, pool_id)
self.assertEqual('PENDING_CREATE', p['status'])
self.callbacks.update_status(ctx, 'pool', pool_id, 'ACTIVE')
p = self.plugin_instance.get_pool(ctx, pool_id)
self.assertEqual('ACTIVE', p['status'])
def test_update_status_health_monitor(self):
with contextlib.nested(
self.pool(),
self.health_monitor()
) as (pool, hm):
pool_id = pool['pool']['id']
ctx = context.get_admin_context()
self.plugin_instance.create_pool_health_monitor(ctx, hm, pool_id)
hm_id = hm['health_monitor']['id']
h = self.plugin_instance.get_pool_health_monitor(ctx, hm_id,
pool_id)
self.assertEqual('PENDING_CREATE', h['status'])
self.callbacks.update_status(
ctx, 'health_monitor',
{'monitor_id': hm_id, 'pool_id': pool_id}, 'ACTIVE')
h = self.plugin_instance.get_pool_health_monitor(ctx, hm_id,
pool_id)
self.assertEqual('ACTIVE', h['status'])
class TestLoadBalancerAgentApi(base.BaseTestCase): class TestLoadBalancerAgentApi(base.BaseTestCase):
def setUp(self): def setUp(self):
@ -321,46 +382,73 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
def test_init(self): def test_init(self):
self.assertEqual(self.api.topic, 'topic') self.assertEqual(self.api.topic, 'topic')
def _call_test_helper(self, method_name): def _call_test_helper(self, method_name, method_args):
rv = getattr(self.api, method_name)(mock.sentinel.context, 'test', rv = getattr(self.api, method_name)(mock.sentinel.context,
'host') host='host',
self.assertEqual(rv, self.mock_cast.return_value) **method_args)
self.mock_cast.assert_called_once_with(
mock.sentinel.context,
self.mock_msg.return_value,
topic='topic.host'
)
self.mock_msg.assert_called_once_with(
method_name,
pool_id='test',
host='host'
)
def test_reload_pool(self):
self._call_test_helper('reload_pool')
def test_destroy_pool(self):
self._call_test_helper('destroy_pool')
def test_modify_pool(self):
self._call_test_helper('modify_pool')
def test_agent_updated(self):
rv = self.api.agent_updated(mock.sentinel.context, True, 'host')
self.assertEqual(rv, self.mock_cast.return_value) self.assertEqual(rv, self.mock_cast.return_value)
self.mock_cast.assert_called_once_with( self.mock_cast.assert_called_once_with(
mock.sentinel.context, mock.sentinel.context,
self.mock_msg.return_value, self.mock_msg.return_value,
topic='topic.host', topic='topic.host',
version='1.1' version=None
) )
if method_name == 'agent_updated':
method_args = {'payload': method_args}
self.mock_msg.assert_called_once_with( self.mock_msg.assert_called_once_with(
'agent_updated', method_name,
payload={'admin_state_up': True} **method_args
) )
def test_agent_updated(self):
self._call_test_helper('agent_updated', {'admin_state_up': 'test'})
def test_create_pool(self):
self._call_test_helper('create_pool', {'pool': 'test',
'driver_name': 'dummy'})
def test_update_pool(self):
self._call_test_helper('update_pool', {'old_pool': 'test',
'pool': 'test'})
def test_delete_pool(self):
self._call_test_helper('delete_pool', {'pool': 'test'})
def test_create_vip(self):
self._call_test_helper('create_vip', {'vip': 'test'})
def test_update_vip(self):
self._call_test_helper('update_vip', {'old_vip': 'test',
'vip': 'test'})
def test_delete_vip(self):
self._call_test_helper('delete_vip', {'vip': 'test'})
def test_create_member(self):
self._call_test_helper('create_member', {'member': 'test'})
def test_update_member(self):
self._call_test_helper('update_member', {'old_member': 'test',
'member': 'test'})
def test_delete_member(self):
self._call_test_helper('delete_member', {'member': 'test'})
def test_create_monitor(self):
self._call_test_helper('create_pool_health_monitor',
{'health_monitor': 'test', 'pool_id': 'test'})
def test_update_monitor(self):
self._call_test_helper('update_pool_health_monitor',
{'old_health_monitor': 'test',
'health_monitor': 'test',
'pool_id': 'test'})
def test_delete_monitor(self):
self._call_test_helper('delete_pool_health_monitor',
{'health_monitor': 'test', 'pool_id': 'test'})
class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
def setUp(self): def setUp(self):
@ -370,16 +458,10 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
super(TestLoadBalancerPluginNotificationWrapper, self).setUp() super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
self.mock_api = api_cls.return_value self.mock_api = api_cls.return_value
# mocking plugin_driver create_pool() as it does nothing more than
# pool scheduling which is beyond the scope of this test case
mock.patch('neutron.services.loadbalancer.drivers.haproxy'
'.plugin_driver.HaproxyOnHostPluginDriver'
'.create_pool').start()
self.mock_get_driver = mock.patch.object(self.plugin_instance, self.mock_get_driver = mock.patch.object(self.plugin_instance,
'_get_driver') '_get_driver')
self.mock_get_driver.return_value = (plugin_driver. self.mock_get_driver.return_value = (plugin_driver.
HaproxyOnHostPluginDriver( AgentBasedPluginDriver(
self.plugin_instance self.plugin_instance
)) ))
@ -389,9 +471,9 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
with self.subnet() as subnet: with self.subnet() as subnet:
with self.pool(subnet=subnet) as pool: with self.pool(subnet=subnet) as pool:
with self.vip(pool=pool, subnet=subnet) as vip: with self.vip(pool=pool, subnet=subnet) as vip:
self.mock_api.reload_pool.assert_called_once_with( self.mock_api.create_vip.assert_called_once_with(
mock.ANY, mock.ANY,
vip['vip']['pool_id'], vip['vip'],
'host' 'host'
) )
@ -399,8 +481,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
with self.subnet() as subnet: with self.subnet() as subnet:
with self.pool(subnet=subnet) as pool: with self.pool(subnet=subnet) as pool:
with self.vip(pool=pool, subnet=subnet) as vip: with self.vip(pool=pool, subnet=subnet) as vip:
self.mock_api.reset_mock()
ctx = context.get_admin_context() ctx = context.get_admin_context()
old_vip = vip['vip'].copy()
vip['vip'].pop('status') vip['vip'].pop('status')
new_vip = self.plugin_instance.update_vip( new_vip = self.plugin_instance.update_vip(
ctx, ctx,
@ -408,9 +490,10 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
vip vip
) )
self.mock_api.reload_pool.assert_called_once_with( self.mock_api.update_vip.assert_called_once_with(
mock.ANY, mock.ANY,
vip['vip']['pool_id'], old_vip,
new_vip,
'host' 'host'
) )
@ -423,51 +506,55 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
with self.subnet() as subnet: with self.subnet() as subnet:
with self.pool(subnet=subnet) as pool: with self.pool(subnet=subnet) as pool:
with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip: with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip:
self.mock_api.reset_mock()
ctx = context.get_admin_context() ctx = context.get_admin_context()
self.plugin_instance.delete_vip(ctx, vip['vip']['id']) self.plugin_instance.delete_vip(ctx, vip['vip']['id'])
self.mock_api.destroy_pool.assert_called_once_with( vip['vip']['status'] = 'PENDING_DELETE'
self.mock_api.delete_vip.assert_called_once_with(
mock.ANY, mock.ANY,
vip['vip']['pool_id'], vip['vip'],
'host' 'host'
) )
def test_create_pool(self): def test_create_pool(self):
with self.pool(): with self.pool() as pool:
self.assertFalse(self.mock_api.reload_pool.called) self.mock_api.create_pool.assert_called_once_with(
self.assertFalse(self.mock_api.modify_pool.called) mock.ANY,
self.assertFalse(self.mock_api.destroy_pool.called) pool['pool'],
mock.ANY,
'dummy'
)
def test_update_pool_non_active(self): def test_update_pool_non_active(self):
with self.pool() as pool: with self.pool() as pool:
pool['pool']['status'] = 'INACTIVE' pool['pool']['status'] = 'INACTIVE'
ctx = context.get_admin_context() ctx = context.get_admin_context()
orig_pool = pool['pool'].copy()
del pool['pool']['provider'] del pool['pool']['provider']
self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool) self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool)
self.mock_api.destroy_pool.assert_called_once_with( self.mock_api.delete_pool.assert_called_once_with(
mock.ANY, pool['pool']['id'], 'host') mock.ANY, orig_pool, 'host')
self.assertFalse(self.mock_api.reload_pool.called)
self.assertFalse(self.mock_api.modify_pool.called)
def test_update_pool_no_vip_id(self): def test_update_pool_no_vip_id(self):
with self.pool() as pool: with self.pool() as pool:
ctx = context.get_admin_context() ctx = context.get_admin_context()
orig_pool = pool['pool'].copy()
del pool['pool']['provider'] del pool['pool']['provider']
self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool) updated = self.plugin_instance.update_pool(
self.assertFalse(self.mock_api.destroy_pool.called) ctx, pool['pool']['id'], pool)
self.assertFalse(self.mock_api.reload_pool.called) self.mock_api.update_pool.assert_called_once_with(
self.assertFalse(self.mock_api.modify_pool.called) mock.ANY, orig_pool, updated, 'host')
def test_update_pool_with_vip_id(self): def test_update_pool_with_vip_id(self):
with self.pool() as pool: with self.pool() as pool:
with self.vip(pool=pool): with self.vip(pool=pool) as vip:
ctx = context.get_admin_context() ctx = context.get_admin_context()
old_pool = pool['pool'].copy()
old_pool['vip_id'] = vip['vip']['id']
del pool['pool']['provider'] del pool['pool']['provider']
self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool) updated = self.plugin_instance.update_pool(
self.mock_api.reload_pool.assert_called_once_with( ctx, pool['pool']['id'], pool)
mock.ANY, pool['pool']['id'], 'host') self.mock_api.update_pool.assert_called_once_with(
self.assertFalse(self.mock_api.destroy_pool.called) mock.ANY, old_pool, updated, 'host')
self.assertFalse(self.mock_api.modify_pool.called)
def test_delete_pool(self): def test_delete_pool(self):
with self.pool(no_delete=True) as pool: with self.pool(no_delete=True) as pool:
@ -475,26 +562,26 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
pool['pool']['id']) pool['pool']['id'])
res = req.get_response(self.ext_api) res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, exc.HTTPNoContent.code) self.assertEqual(res.status_int, exc.HTTPNoContent.code)
self.mock_api.destroy_pool.assert_called_once_with( pool['pool']['status'] = 'PENDING_DELETE'
mock.ANY, pool['pool']['id'], 'host') self.mock_api.delete_pool.assert_called_once_with(
mock.ANY, pool['pool'], 'host')
def test_create_member(self): def test_create_member(self):
with self.pool() as pool: with self.pool() as pool:
pool_id = pool['pool']['id'] pool_id = pool['pool']['id']
with self.member(pool_id=pool_id): with self.member(pool_id=pool_id) as member:
self.mock_api.modify_pool.assert_called_once_with( self.mock_api.create_member.assert_called_once_with(
mock.ANY, pool_id, 'host') mock.ANY, member['member'], 'host')
def test_update_member(self): def test_update_member(self):
with self.pool() as pool: with self.pool() as pool:
pool_id = pool['pool']['id'] pool_id = pool['pool']['id']
with self.member(pool_id=pool_id) as member: with self.member(pool_id=pool_id) as member:
ctx = context.get_admin_context() ctx = context.get_admin_context()
self.mock_api.modify_pool.reset_mock() updated = self.plugin_instance.update_member(
self.plugin_instance.update_member(
ctx, member['member']['id'], member) ctx, member['member']['id'], member)
self.mock_api.modify_pool.assert_called_once_with( self.mock_api.update_member.assert_called_once_with(
mock.ANY, pool_id, 'host') mock.ANY, member['member'], updated, 'host')
def test_update_member_new_pool(self): def test_update_member_new_pool(self):
with self.pool() as pool1: with self.pool() as pool1:
@ -502,89 +589,105 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
with self.pool() as pool2: with self.pool() as pool2:
pool2_id = pool2['pool']['id'] pool2_id = pool2['pool']['id']
with self.member(pool_id=pool1_id) as member: with self.member(pool_id=pool1_id) as member:
self.mock_api.create_member.reset_mock()
ctx = context.get_admin_context() ctx = context.get_admin_context()
self.mock_api.modify_pool.reset_mock() old_member = member['member'].copy()
member['member']['pool_id'] = pool2_id member['member']['pool_id'] = pool2_id
self.plugin_instance.update_member(ctx, updated = self.plugin_instance.update_member(
member['member']['id'], ctx, member['member']['id'], member)
member) self.mock_api.delete_member.assert_called_once_with(
self.assertEqual(2, self.mock_api.modify_pool.call_count) mock.ANY, old_member, 'host')
self.mock_api.modify_pool.assert_has_calls( self.mock_api.create_member.assert_called_once_with(
[mock.call(mock.ANY, pool1_id, 'host'), mock.ANY, updated, 'host')
mock.call(mock.ANY, pool2_id, 'host')])
def test_delete_member(self): def test_delete_member(self):
with self.pool() as pool: with self.pool() as pool:
pool_id = pool['pool']['id'] pool_id = pool['pool']['id']
with self.member(pool_id=pool_id, with self.member(pool_id=pool_id,
no_delete=True) as member: no_delete=True) as member:
self.mock_api.modify_pool.reset_mock()
req = self.new_delete_request('members', req = self.new_delete_request('members',
member['member']['id']) member['member']['id'])
res = req.get_response(self.ext_api) res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, exc.HTTPNoContent.code) self.assertEqual(res.status_int, exc.HTTPNoContent.code)
self.mock_api.modify_pool.assert_called_once_with( member['member']['status'] = 'PENDING_DELETE'
mock.ANY, pool_id, 'host') self.mock_api.delete_member.assert_called_once_with(
mock.ANY, member['member'], 'host')
def test_create_pool_health_monitor(self): def test_create_pool_health_monitor(self):
with self.pool() as pool: with contextlib.nested(
self.pool(),
self.health_monitor()
) as (pool, hm):
pool_id = pool['pool']['id'] pool_id = pool['pool']['id']
with self.health_monitor() as hm: ctx = context.get_admin_context()
ctx = context.get_admin_context() self.plugin_instance.create_pool_health_monitor(ctx, hm, pool_id)
self.plugin_instance.create_pool_health_monitor(ctx, # hm now has a ref to the pool with which it is associated
hm, hm = self.plugin.get_health_monitor(
pool_id) ctx, hm['health_monitor']['id'])
self.mock_api.modify_pool.assert_called_once_with( self.mock_api.create_pool_health_monitor.assert_called_once_with(
mock.ANY, pool_id, 'host') mock.ANY, hm, pool_id, 'host')
def test_delete_pool_health_monitor(self): def test_delete_pool_health_monitor(self):
with self.pool() as pool: with contextlib.nested(
self.pool(),
self.health_monitor()
) as (pool, hm):
pool_id = pool['pool']['id'] pool_id = pool['pool']['id']
with self.health_monitor() as hm: ctx = context.get_admin_context()
ctx = context.get_admin_context() self.plugin_instance.create_pool_health_monitor(ctx, hm, pool_id)
self.plugin_instance.create_pool_health_monitor(ctx, # hm now has a ref to the pool with which it is associated
hm, hm = self.plugin.get_health_monitor(
pool_id) ctx, hm['health_monitor']['id'])
self.mock_api.modify_pool.reset_mock() hm['pools'][0]['status'] = 'PENDING_DELETE'
self.plugin_instance.delete_pool_health_monitor( self.plugin_instance.delete_pool_health_monitor(
ctx, hm['health_monitor']['id'], pool_id) ctx, hm['id'], pool_id)
self.mock_api.modify_pool.assert_called_once_with( self.mock_api.delete_pool_health_monitor.assert_called_once_with(
mock.ANY, pool_id, 'host') mock.ANY, hm, pool_id, 'host')
def test_update_health_monitor_associated_with_pool(self): def test_update_health_monitor_associated_with_pool(self):
with self.health_monitor(type='HTTP') as monitor: with contextlib.nested(
with self.pool() as pool: self.health_monitor(type='HTTP'),
data = { self.pool()
'health_monitor': { ) as (monitor, pool):
'id': monitor['health_monitor']['id'], data = {
'tenant_id': self._tenant_id 'health_monitor': {
} 'id': monitor['health_monitor']['id'],
'tenant_id': self._tenant_id
} }
req = self.new_create_request( }
'pools', req = self.new_create_request(
data, 'pools',
fmt=self.fmt, data,
id=pool['pool']['id'], fmt=self.fmt,
subresource='health_monitors') id=pool['pool']['id'],
res = req.get_response(self.ext_api) subresource='health_monitors')
self.assertEqual(res.status_int, exc.HTTPCreated.code) res = req.get_response(self.ext_api)
self.mock_api.modify_pool.assert_called_once_with( self.assertEqual(res.status_int, exc.HTTPCreated.code)
mock.ANY, # hm now has a ref to the pool with which it is associated
pool['pool']['id'], ctx = context.get_admin_context()
'host' hm = self.plugin.get_health_monitor(
) ctx, monitor['health_monitor']['id'])
self.mock_api.create_pool_health_monitor.assert_called_once_with(
mock.ANY,
hm,
pool['pool']['id'],
'host'
)
self.mock_api.reset_mock() self.mock_api.reset_mock()
data = {'health_monitor': {'delay': 20, data = {'health_monitor': {'delay': 20,
'timeout': 20, 'timeout': 20,
'max_retries': 2, 'max_retries': 2,
'admin_state_up': False}} 'admin_state_up': False}}
req = self.new_update_request("health_monitors", updated = hm.copy()
data, updated.update(data['health_monitor'])
monitor['health_monitor']['id']) req = self.new_update_request("health_monitors",
req.get_response(self.ext_api) data,
self.mock_api.modify_pool.assert_called_once_with( monitor['health_monitor']['id'])
mock.ANY, req.get_response(self.ext_api)
pool['pool']['id'], self.mock_api.update_pool_health_monitor.assert_called_once_with(
'host' mock.ANY,
) hm,
updated,
pool['pool']['id'],
'host')

View File

@ -21,7 +21,7 @@ from neutron.api import extensions
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import constants from neutron.common import constants
from neutron import context from neutron import context
from neutron.db import servicetype_db as sdb from neutron.db import servicetype_db as st_db
from neutron.extensions import agent from neutron.extensions import agent
from neutron.extensions import lbaas_agentscheduler from neutron.extensions import lbaas_agentscheduler
from neutron import manager from neutron import manager
@ -79,8 +79,8 @@ class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn,
'HaproxyOnHostPluginDriver:default')], 'HaproxyOnHostPluginDriver:default')],
'service_providers') 'service_providers')
#force service type manager to reload configuration: # need to reload provider configuration
sdb.ServiceTypeManager._instance = None st_db.ServiceTypeManager._instance = None
super(LBaaSAgentSchedulerTestCase, self).setUp( super(LBaaSAgentSchedulerTestCase, self).setUp(
self.plugin_str, service_plugins=service_plugins) self.plugin_str, service_plugins=service_plugins)
@ -122,8 +122,7 @@ class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn,
'binary': 'neutron-loadbalancer-agent', 'binary': 'neutron-loadbalancer-agent',
'host': LBAAS_HOSTA, 'host': LBAAS_HOSTA,
'topic': 'LOADBALANCER_AGENT', 'topic': 'LOADBALANCER_AGENT',
'configurations': {'device_driver': 'device_driver', 'configurations': {'device_drivers': ['haproxy_ns']},
'interface_driver': 'interface_driver'},
'agent_type': constants.AGENT_TYPE_LOADBALANCER} 'agent_type': constants.AGENT_TYPE_LOADBALANCER}
self._register_one_agent_state(lbaas_hosta) self._register_one_agent_state(lbaas_hosta)
with self.pool() as pool: with self.pool() as pool:
@ -150,8 +149,7 @@ class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn,
'binary': 'neutron-loadbalancer-agent', 'binary': 'neutron-loadbalancer-agent',
'host': LBAAS_HOSTA, 'host': LBAAS_HOSTA,
'topic': 'LOADBALANCER_AGENT', 'topic': 'LOADBALANCER_AGENT',
'configurations': {'device_driver': 'device_driver', 'configurations': {'device_drivers': ['haproxy_ns']},
'interface_driver': 'interface_driver'},
'agent_type': constants.AGENT_TYPE_LOADBALANCER} 'agent_type': constants.AGENT_TYPE_LOADBALANCER}
self._register_one_agent_state(lbaas_hosta) self._register_one_agent_state(lbaas_hosta)
is_agent_down_str = 'neutron.db.agents_db.AgentDbMixin.is_agent_down' is_agent_down_str = 'neutron.db.agents_db.AgentDbMixin.is_agent_down'