Merge "LBaaS: unify haproxy-on-host plugin driver and agent"
This commit is contained in:
commit
f4fd1a096c
@@ -23,9 +23,16 @@
 # Example of interface_driver option for LinuxBridge
 # interface_driver = neutron.agent.linux.interface.BridgeInterfaceDriver

-# The agent requires a driver to manage the loadbalancer. HAProxy is the
-# opensource version.
+# The agent requires drivers to manage the loadbalancer. HAProxy is the opensource version.
+# Multiple device drivers reflecting different service providers could be specified:
+# device_driver = path.to.provider1.driver.Driver
+# device_driver = path.to.provider2.driver.Driver
+# Default is:
+# device_driver = neutron.services.loadbalancer.drivers.haproxy.namespace_driver.HaproxyNSDriver
+
+[haproxy]
 # Location to store config and state files
 # loadbalancer_state_path = $state_path/lbaas

 # The user group
 # user_group = nogroup
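Read together with the MultiStrOpt change further down: `device_driver` becomes repeatable, and the haproxy-specific options move into their own section. As an illustration only (not text from this commit), an agent configured with two drivers could look like this; `path.to.provider2.driver.Driver` is the placeholder path from the comment above, not a real module:

    [DEFAULT]
    interface_driver = neutron.agent.linux.interface.OVSInterfaceDriver
    # repeat the key once per driver; a MultiStrOpt accumulates the values
    device_driver = neutron.services.loadbalancer.drivers.haproxy.namespace_driver.HaproxyNSDriver
    device_driver = path.to.provider2.driver.Driver

    [haproxy]
    loadbalancer_state_path = $state_path/lbaas
    user_group = nogroup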
@@ -79,11 +79,19 @@ class LbaasAgentSchedulerDbMixin(agentschedulers_db.AgentSchedulerDbMixin,
         else:
             return {'pools': []}

+    def get_lbaas_agent_candidates(self, device_driver, active_agents):
+        candidates = []
+        for agent in active_agents:
+            agent_conf = self.get_configuration_dict(agent)
+            if device_driver in agent_conf['device_drivers']:
+                candidates.append(agent)
+        return candidates
+

 class ChanceScheduler(object):
     """Allocate a loadbalancer agent for a vip in a random way."""

-    def schedule(self, plugin, context, pool):
+    def schedule(self, plugin, context, pool, device_driver):
         """Schedule the pool to an active loadbalancer agent if there
         is no enabled agent hosting it.
         """
@@ -97,11 +105,18 @@ class ChanceScheduler(object):
                       'agent_id': lbaas_agent['id']})
             return

-        candidates = plugin.get_lbaas_agents(context, active=True)
-        if not candidates:
+        active_agents = plugin.get_lbaas_agents(context, active=True)
+        if not active_agents:
             LOG.warn(_('No active lbaas agents for pool %s'), pool['id'])
             return

+        candidates = plugin.get_lbaas_agent_candidates(device_driver,
+                                                       active_agents)
+        if not candidates:
+            LOG.warn(_('No lbaas agent supporting device driver %s'),
+                     device_driver)
+            return
+
         chosen_agent = random.choice(candidates)
         binding = PoolLoadbalancerAgentBinding()
         binding.agent = chosen_agent
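The scheduling change is easiest to see in isolation. A standalone sketch of the candidate filter, with hand-rolled agent dictionaries standing in for the DB objects and `json` decoding standing in for `get_configuration_dict`:

    import json
    import random

    # Agents report their loaded device drivers in their configuration blob;
    # the plugin stores it as a JSON string.
    active_agents = [
        {'host': 'net-node-1',
         'configurations': json.dumps({'device_drivers': ['haproxy_ns']})},
        {'host': 'net-node-2',
         'configurations': json.dumps({'device_drivers': ['haproxy_ns',
                                                          'vendor_x']})},
    ]

    def get_lbaas_agent_candidates(device_driver, active_agents):
        # Mirrors the mixin method: keep only agents whose reported
        # device_drivers list contains the requested driver name.
        candidates = []
        for agent in active_agents:
            agent_conf = json.loads(agent['configurations'])
            if device_driver in agent_conf['device_drivers']:
                candidates.append(agent)
        return candidates

    candidates = get_lbaas_agent_candidates('vendor_x', active_agents)
    print([a['host'] for a in candidates])  # ['net-node-2']
    chosen_agent = random.choice(candidates)

The scheduler then binds the pool only to an agent that actually loaded the driver the provider requires, instead of to any active agent.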
@@ -107,10 +107,10 @@ class LoadBalancerAbstractDriver(object):
         pass

     @abc.abstractmethod
-    def update_health_monitor(self, context,
-                              old_health_monitor,
-                              health_monitor,
-                              pool_id):
+    def update_pool_health_monitor(self, context,
+                                   old_health_monitor,
+                                   health_monitor,
+                                   pool_id):
         pass

     @abc.abstractmethod
neutron/services/loadbalancer/drivers/agent_device_driver.py (new file, 98 lines)
@@ -0,0 +1,98 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack Foundation. All rights reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
+class AgentDeviceDriver(object):
+    """Abstract device driver that defines the API required by LBaaS agent."""
+
+    @abc.abstractmethod
+    def get_name(cls):
+        """Returns unique name across all LBaaS device drivers."""
+        pass
+
+    @abc.abstractmethod
+    def deploy_instance(self, logical_config):
+        """Fully deploys a loadbalancer instance from a given config."""
+        pass
+
+    @abc.abstractmethod
+    def undeploy_instance(self, pool_id):
+        """Fully undeploys the loadbalancer instance."""
+        pass
+
+    @abc.abstractmethod
+    def get_stats(self, pool_id):
+        pass
+
+    def remove_orphans(self, known_pool_ids):
+        # Not all drivers will support this
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def create_vip(self, vip):
+        pass
+
+    @abc.abstractmethod
+    def update_vip(self, old_vip, vip):
+        pass
+
+    @abc.abstractmethod
+    def delete_vip(self, vip):
+        pass
+
+    @abc.abstractmethod
+    def create_pool(self, pool):
+        pass
+
+    @abc.abstractmethod
+    def update_pool(self, old_pool, pool):
+        pass
+
+    @abc.abstractmethod
+    def delete_pool(self, pool):
+        pass
+
+    @abc.abstractmethod
+    def create_member(self, member):
+        pass
+
+    @abc.abstractmethod
+    def update_member(self, old_member, member):
+        pass
+
+    @abc.abstractmethod
+    def delete_member(self, context, member):
+        pass
+
+    @abc.abstractmethod
+    def create_pool_health_monitor(self, health_monitor, pool_id):
+        pass
+
+    @abc.abstractmethod
+    def update_pool_health_monitor(self,
+                                   old_health_monitor,
+                                   health_monitor,
+                                   pool_id):
+        pass
+
+    @abc.abstractmethod
+    def delete_pool_health_monitor(self, context, health_monitor, pool_id):
+        pass
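To make the contract concrete, here is a minimal, self-contained sketch of how a driver plugs into this interface (requires only the `six` package). `NoopDriver` is hypothetical and exists purely to show the pattern; real drivers, like the HaproxyNSDriver below, implement every abstract method:

    import abc

    import six


    @six.add_metaclass(abc.ABCMeta)
    class AgentDeviceDriver(object):
        # Trimmed copy of the ABC above: two methods suffice to show the idea.
        @abc.abstractmethod
        def get_name(cls):
            pass

        @abc.abstractmethod
        def deploy_instance(self, logical_config):
            pass

        def remove_orphans(self, known_pool_ids):
            raise NotImplementedError()  # optional capability


    class NoopDriver(AgentDeviceDriver):
        # Hypothetical driver used only for illustration.
        @classmethod
        def get_name(cls):
            return 'noop'

        def deploy_instance(self, logical_config):
            print('deploying pool %s' % logical_config['pool']['id'])


    driver = NoopDriver()
    driver.deploy_instance({'pool': {'id': 'pool-1'}})
    # Instantiating AgentDeviceDriver() itself raises TypeError because the
    # abstract methods get_name and deploy_instance are not implemented.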
@@ -22,7 +22,12 @@ from neutron.openstack.common.rpc import proxy
 class LbaasAgentApi(proxy.RpcProxy):
     """Agent side of the Agent to Plugin RPC API."""

-    API_VERSION = '1.0'
+    API_VERSION = '2.0'
+    # history
+    #   1.0 Initial version
+    #   2.0 Generic API for agent based drivers
+    #       - get_logical_device() handling changed on plugin side;
+    #       - pool_deployed() and update_status() methods added;

     def __init__(self, topic, context, host):
         super(LbaasAgentApi, self).__init__(topic, self.API_VERSION)
@@ -36,21 +41,35 @@ class LbaasAgentApi(proxy.RpcProxy):
             topic=self.topic
         )

+    def pool_destroyed(self, pool_id):
+        return self.call(
+            self.context,
+            self.make_msg('pool_destroyed', pool_id=pool_id),
+            topic=self.topic
+        )
+
+    def pool_deployed(self, pool_id):
+        return self.call(
+            self.context,
+            self.make_msg('pool_deployed', pool_id=pool_id),
+            topic=self.topic
+        )
+
     def get_logical_device(self, pool_id):
         return self.call(
             self.context,
             self.make_msg(
                 'get_logical_device',
-                pool_id=pool_id,
-                host=self.host
+                pool_id=pool_id
             ),
             topic=self.topic
         )

-    def pool_destroyed(self, pool_id):
+    def update_status(self, obj_type, obj_id, status):
         return self.call(
             self.context,
-            self.make_msg('pool_destroyed', pool_id=pool_id, host=self.host),
+            self.make_msg('update_status', obj_type=obj_type, obj_id=obj_id,
+                          status=status),
             topic=self.topic
         )
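The 2.0 agent-side API drops the `host` argument from calls the plugin can now answer for any agent, and adds a status-reporting channel. A standalone sketch of the new call flow from the agent's point of view, with a stub standing in for the RPC proxy (the transport plumbing is elided):

    class FakeLbaasAgentApi(object):
        def get_logical_device(self, pool_id):
            return {'pool': {'id': pool_id}, 'driver': 'haproxy_ns'}

        def pool_deployed(self, pool_id):
            print('plugin: pool %s deployed, flip statuses to ACTIVE' % pool_id)

        def update_status(self, obj_type, obj_id, status):
            print('plugin: set %s %s -> %s' % (obj_type, obj_id, status))


    api = FakeLbaasAgentApi()
    config = api.get_logical_device('pool-1')   # no host kwarg in 2.0
    api.pool_deployed('pool-1')                 # replaces plugin-side activation
    # health monitor status is keyed by the (pool, monitor) association:
    api.update_status('health_monitor',
                      {'pool_id': 'pool-1', 'monitor_id': 'mon-1'}, 'ACTIVE')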
@@ -16,152 +16,100 @@
 #
 # @author: Mark McClain, DreamHost

-import weakref
-
 from oslo.config import cfg

 from neutron.agent.common import config
 from neutron.agent import rpc as agent_rpc
-from neutron.common import constants
+from neutron.common import constants as n_const
+from neutron.common import exceptions as n_exc
 from neutron import context
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
 from neutron.openstack.common import periodic_task
+from neutron.plugins.common import constants
 from neutron.services.loadbalancer.drivers.haproxy import (
     agent_api,
     plugin_driver
 )

 LOG = logging.getLogger(__name__)
-NS_PREFIX = 'qlbaas-'

 OPTS = [
-    cfg.StrOpt(
+    cfg.MultiStrOpt(
         'device_driver',
-        default=('neutron.services.loadbalancer.drivers'
-                 '.haproxy.namespace_driver.HaproxyNSDriver'),
-        help=_('The driver used to manage the loadbalancing device'),
-    ),
-    cfg.StrOpt(
-        'loadbalancer_state_path',
-        default='$state_path/lbaas',
-        help=_('Location to store config and state files'),
+        default=['neutron.services.loadbalancer.drivers'
+                 '.haproxy.namespace_driver.HaproxyNSDriver'],
+        help=_('Drivers used to manage loadbalancing devices'),
     ),
     cfg.StrOpt(
         'interface_driver',
         help=_('The driver used to manage the virtual interface')
     ),
-    cfg.StrOpt(
-        'user_group',
-        default='nogroup',
-        help=_('The user group'),
-    ),
 ]


-class LogicalDeviceCache(object):
-    """Manage a cache of known devices."""
-
-    class Device(object):
-        """Inner classes used to hold values for weakref lookups."""
-        def __init__(self, port_id, pool_id):
-            self.port_id = port_id
-            self.pool_id = pool_id
-
-        def __eq__(self, other):
-            return self.__dict__ == other.__dict__
-
-        def __hash__(self):
-            return hash((self.port_id, self.pool_id))
-
-    def __init__(self):
-        self.devices = set()
-        self.port_lookup = weakref.WeakValueDictionary()
-        self.pool_lookup = weakref.WeakValueDictionary()
-
-    def put(self, device):
-        port_id = device['vip']['port_id']
-        pool_id = device['pool']['id']
-        d = self.Device(device['vip']['port_id'], device['pool']['id'])
-        if d not in self.devices:
-            self.devices.add(d)
-            self.port_lookup[port_id] = d
-            self.pool_lookup[pool_id] = d
-
-    def remove(self, device):
-        if not isinstance(device, self.Device):
-            device = self.Device(
-                device['vip']['port_id'], device['pool']['id']
-            )
-        if device in self.devices:
-            self.devices.remove(device)
-
-    def remove_by_pool_id(self, pool_id):
-        d = self.pool_lookup.get(pool_id)
-        if d:
-            self.devices.remove(d)
-
-    def get_by_pool_id(self, pool_id):
-        return self.pool_lookup.get(pool_id)
-
-    def get_by_port_id(self, port_id):
-        return self.port_lookup.get(port_id)
-
-    def get_pool_ids(self):
-        return self.pool_lookup.keys()
+class DeviceNotFoundOnAgent(n_exc.NotFound):
+    msg = _('Unknown device with pool_id %(pool_id)s')


 class LbaasAgentManager(periodic_task.PeriodicTasks):

+    RPC_API_VERSION = '2.0'
     # history
     #   1.0 Initial version
     #   1.1 Support agent_updated call
-    RPC_API_VERSION = '1.1'
+    #   2.0 Generic API for agent based drivers
+    #       - modify/reload/destroy_pool methods were removed;
+    #       - added methods to handle create/update/delete for every lbaas
+    #       object individually;

     def __init__(self, conf):
         self.conf = conf
-        try:
-            vif_driver = importutils.import_object(conf.interface_driver, conf)
-        except ImportError:
-            msg = _('Error importing interface driver: %s')
-            raise SystemExit(msg % conf.interface_driver)
-
-        try:
-            self.driver = importutils.import_object(
-                conf.device_driver,
-                config.get_root_helper(self.conf),
-                conf.loadbalancer_state_path,
-                vif_driver,
-                self._vip_plug_callback
-            )
-        except ImportError:
-            msg = _('Error importing loadbalancer device driver: %s')
-            raise SystemExit(msg % conf.device_driver)
+        self.context = context.get_admin_context_without_session()
+        self.plugin_rpc = agent_api.LbaasAgentApi(
+            plugin_driver.TOPIC_LOADBALANCER_PLUGIN,
+            self.context,
+            self.conf.host
+        )
+        self._load_drivers()

         self.agent_state = {
             'binary': 'neutron-lbaas-agent',
             'host': conf.host,
             'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT,
-            'configurations': {'device_driver': conf.device_driver,
-                               'interface_driver': conf.interface_driver},
-            'agent_type': constants.AGENT_TYPE_LOADBALANCER,
+            'configurations': {'device_drivers': self.device_drivers.keys()},
+            'agent_type': n_const.AGENT_TYPE_LOADBALANCER,
             'start_flag': True}
         self.admin_state_up = True

-        self.context = context.get_admin_context_without_session()
-        self._setup_rpc()
+        self._setup_state_rpc()
         self.needs_resync = False
-        self.cache = LogicalDeviceCache()
+        # pool_id->device_driver_name mapping used to store known instances
+        self.instance_mapping = {}

-    def _setup_rpc(self):
-        self.plugin_rpc = agent_api.LbaasAgentApi(
-            plugin_driver.TOPIC_PROCESS_ON_HOST,
-            self.context,
-            self.conf.host
-        )
+    def _load_drivers(self):
+        self.device_drivers = {}
+        for driver in self.conf.device_driver:
+            try:
+                driver_inst = importutils.import_object(
+                    driver,
+                    self.conf,
+                    self.plugin_rpc
+                )
+            except ImportError:
+                msg = _('Error importing loadbalancer device driver: %s')
+                raise SystemExit(msg % driver)
+
+            driver_name = driver_inst.get_name()
+            if driver_name not in self.device_drivers:
+                self.device_drivers[driver_name] = driver_inst
+            else:
+                msg = _('Multiple device drivers with the same name found: %s')
+                raise SystemExit(msg % driver_name)

     def _setup_state_rpc(self):
         self.state_rpc = agent_rpc.PluginReportStateAPI(
-            plugin_driver.TOPIC_PROCESS_ON_HOST)
+            plugin_driver.TOPIC_LOADBALANCER_PLUGIN)
         report_interval = self.conf.AGENT.report_interval
         if report_interval:
             heartbeat = loopingcall.FixedIntervalLoopingCall(
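The `_load_drivers()` loop is the heart of the unification: each configured driver path is imported, instantiated with `(conf, plugin_rpc)`, and registered under its self-reported name. A standalone approximation using stdlib `importlib` in place of neutron's `importutils`:

    import importlib


    def load_drivers(driver_paths, conf, plugin_rpc):
        """Import each dotted path, instantiate it, key it by get_name()."""
        device_drivers = {}
        for path in driver_paths:
            module_name, cls_name = path.rsplit('.', 1)
            try:
                cls = getattr(importlib.import_module(module_name), cls_name)
            except (ImportError, AttributeError):
                raise SystemExit('Error importing loadbalancer device driver: '
                                 '%s' % path)
            driver_inst = cls(conf, plugin_rpc)
            name = driver_inst.get_name()
            if name in device_drivers:
                # Two providers may not claim the same driver name.
                raise SystemExit('Multiple device drivers with the same name '
                                 'found: %s' % name)
            device_drivers[name] = driver_inst
        return device_drivers

The duplicate-name check matters because `instance_mapping` and the agent's reported `device_drivers` list both use the name, not the import path, as the key.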
@@ -170,8 +118,8 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):

     def _report_state(self):
         try:
-            device_count = len(self.cache.devices)
-            self.agent_state['configurations']['devices'] = device_count
+            instance_count = len(self.instance_mapping)
+            self.agent_state['configurations']['instances'] = instance_count
             self.state_rpc.report_state(self.context,
                                         self.agent_state)
             self.agent_state.pop('start_flag', None)
@@ -189,31 +137,26 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):

     @periodic_task.periodic_task(spacing=6)
     def collect_stats(self, context):
-        for pool_id in self.cache.get_pool_ids():
+        for pool_id, driver_name in self.instance_mapping.items():
+            driver = self.device_drivers[driver_name]
             try:
-                stats = self.driver.get_stats(pool_id)
+                stats = driver.get_stats(pool_id)
                 if stats:
                     self.plugin_rpc.update_pool_stats(pool_id, stats)
             except Exception:
                 LOG.exception(_('Error upating stats'))
                 self.needs_resync = True

-    def _vip_plug_callback(self, action, port):
-        if action == 'plug':
-            self.plugin_rpc.plug_vip_port(port['id'])
-        elif action == 'unplug':
-            self.plugin_rpc.unplug_vip_port(port['id'])
-
     def sync_state(self):
-        known_devices = set(self.cache.get_pool_ids())
+        known_instances = set(self.instance_mapping.keys())
         try:
-            ready_logical_devices = set(self.plugin_rpc.get_ready_devices())
+            ready_instances = set(self.plugin_rpc.get_ready_devices())

-            for deleted_id in known_devices - ready_logical_devices:
-                self.destroy_device(deleted_id)
+            for deleted_id in known_instances - ready_instances:
+                self._destroy_pool(deleted_id)

-            for pool_id in ready_logical_devices:
-                self.refresh_device(pool_id)
+            for pool_id in ready_instances:
+                self._reload_pool(pool_id)

         except Exception:
             LOG.exception(_('Unable to retrieve ready devices'))
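`sync_state` reconciles agent-local state against the plugin with plain set arithmetic: anything known locally but no longer ready is torn down, and everything ready is (re)deployed. The same logic in isolation:

    # Standalone illustration of the reconciliation step.
    instance_mapping = {'pool-1': 'haproxy_ns', 'pool-2': 'haproxy_ns'}

    def sync_state(ready_instances):
        known_instances = set(instance_mapping)
        for deleted_id in known_instances - ready_instances:
            print('undeploy stale instance %s' % deleted_id)
            instance_mapping.pop(deleted_id)
        for pool_id in ready_instances:
            print('deploy/refresh instance %s' % pool_id)

    # pool-2 disappeared on the plugin side, pool-3 is newly ready:
    sync_state({'pool-1', 'pool-3'})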
@@ -221,51 +164,168 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):

         self.remove_orphans()

-    def refresh_device(self, pool_id):
+    def _get_driver(self, pool_id):
+        if pool_id not in self.instance_mapping:
+            raise DeviceNotFoundOnAgent(pool_id=pool_id)
+
+        driver_name = self.instance_mapping[pool_id]
+        return self.device_drivers[driver_name]
+
+    def _reload_pool(self, pool_id):
         try:
             logical_config = self.plugin_rpc.get_logical_device(pool_id)
+            driver_name = logical_config['driver']
+            if driver_name not in self.device_drivers:
+                LOG.error(_('No device driver '
+                            'on agent: %s.'), driver_name)
+                self.plugin_rpc.update_status(
+                    'pool', pool_id, constants.ERROR)
+                return

-            if self.driver.exists(pool_id):
-                self.driver.update(logical_config)
-            else:
-                self.driver.create(logical_config)
-            self.cache.put(logical_config)
+            self.device_drivers[driver_name].deploy_instance(logical_config)
+            self.instance_mapping[pool_id] = driver_name
+            self.plugin_rpc.pool_deployed(pool_id)
         except Exception:
-            LOG.exception(_('Unable to refresh device for pool: %s'), pool_id)
+            LOG.exception(_('Unable to deploy instance for pool: %s'), pool_id)
             self.needs_resync = True

-    def destroy_device(self, pool_id):
-        device = self.cache.get_by_pool_id(pool_id)
-        if not device:
-            return
+    def _destroy_pool(self, pool_id):
+        driver = self._get_driver(pool_id)
         try:
-            self.driver.destroy(pool_id)
+            driver.undeploy_instance(pool_id)
+            del self.instance_mapping[pool_id]
             self.plugin_rpc.pool_destroyed(pool_id)
         except Exception:
             LOG.exception(_('Unable to destroy device for pool: %s'), pool_id)
             self.needs_resync = True
-        self.cache.remove(device)

     def remove_orphans(self):
-        try:
-            self.driver.remove_orphans(self.cache.get_pool_ids())
-        except NotImplementedError:
-            pass  # Not all drivers will support this
+        for driver_name in self.device_drivers:
+            pool_ids = [pool_id for pool_id in self.instance_mapping
+                        if self.instance_mapping[pool_id] == driver_name]
+            try:
+                self.device_drivers[driver_name].remove_orphans(pool_ids)
+            except NotImplementedError:
+                pass  # Not all drivers will support this
+
+    def _handle_failed_driver_call(self, operation, obj_type, obj_id, driver):
+        LOG.exception(_('%(operation)s %(obj)s %(id)s failed on device driver '
+                        '%(driver)s'),
+                      {'operation': operation.capitalize(), 'obj': obj_type,
+                       'id': obj_id, 'driver': driver})
+        self.plugin_rpc.update_status(obj_type, obj_id, constants.ERROR)
+
+    def create_vip(self, context, vip):
+        driver = self._get_driver(vip['pool_id'])
+        try:
+            driver.create_vip(vip)
+        except Exception:
+            self._handle_failed_driver_call('create', 'vip', vip['id'],
+                                            driver.get_name())
+        else:
+            self.plugin_rpc.update_status('vip', vip['id'], constants.ACTIVE)

-    def reload_pool(self, context, pool_id=None, host=None):
-        """Handle RPC cast from plugin to reload a pool."""
-        if pool_id:
-            self.refresh_device(pool_id)
+    def update_vip(self, context, old_vip, vip):
+        driver = self._get_driver(vip['pool_id'])
+        try:
+            driver.update_vip(old_vip, vip)
+        except Exception:
+            self._handle_failed_driver_call('update', 'vip', vip['id'],
+                                            driver.get_name())
+        else:
+            self.plugin_rpc.update_status('vip', vip['id'], constants.ACTIVE)

-    def modify_pool(self, context, pool_id=None, host=None):
-        """Handle RPC cast from plugin to modify a pool if known to agent."""
-        if self.cache.get_by_pool_id(pool_id):
-            self.refresh_device(pool_id)
+    def delete_vip(self, context, vip):
+        driver = self._get_driver(vip['pool_id'])
+        driver.delete_vip(vip)

-    def destroy_pool(self, context, pool_id=None, host=None):
-        """Handle RPC cast from plugin to destroy a pool if known to agent."""
-        if self.cache.get_by_pool_id(pool_id):
-            self.destroy_device(pool_id)
+    def create_pool(self, context, pool, driver_name):
+        if driver_name not in self.device_drivers:
+            LOG.error(_('No device driver on agent: %s.'), driver_name)
+            self.plugin_rpc.update_status('pool', pool['id'], constants.ERROR)
+            return
+
+        driver = self.device_drivers[driver_name]
+        try:
+            driver.create_pool(pool)
+        except Exception:
+            self._handle_failed_driver_call('create', 'pool', pool['id'],
+                                            driver.get_name())
+        else:
+            self.instance_mapping[pool['id']] = driver_name
+            self.plugin_rpc.update_status('pool', pool['id'], constants.ACTIVE)
+
+    def update_pool(self, context, old_pool, pool):
+        driver = self._get_driver(pool['id'])
+        try:
+            driver.update_pool(old_pool, pool)
+        except Exception:
+            self._handle_failed_driver_call('update', 'pool', pool['id'],
+                                            driver.get_name())
+        else:
+            self.plugin_rpc.update_status('pool', pool['id'], constants.ACTIVE)
+
+    def delete_pool(self, context, pool):
+        driver = self._get_driver(pool['id'])
+        driver.delete_pool(pool)
+        del self.instance_mapping[pool['id']]
+
+    def create_member(self, context, member):
+        driver = self._get_driver(member['pool_id'])
+        try:
+            driver.create_member(member)
+        except Exception:
+            self._handle_failed_driver_call('create', 'member', member['id'],
+                                            driver.get_name())
+        else:
+            self.plugin_rpc.update_status('member', member['id'],
+                                          constants.ACTIVE)
+
+    def update_member(self, context, old_member, member):
+        driver = self._get_driver(member['pool_id'])
+        try:
+            driver.update_member(old_member, member)
+        except Exception:
+            self._handle_failed_driver_call('update', 'member', member['id'],
+                                            driver.get_name())
+        else:
+            self.plugin_rpc.update_status('member', member['id'],
+                                          constants.ACTIVE)
+
+    def delete_member(self, context, member):
+        driver = self._get_driver(member['pool_id'])
+        driver.delete_member(member)
+
+    def create_pool_health_monitor(self, context, health_monitor, pool_id):
+        driver = self._get_driver(pool_id)
+        assoc_id = {'pool_id': pool_id, 'monitor_id': health_monitor['id']}
+        try:
+            driver.create_pool_health_monitor(health_monitor, pool_id)
+        except Exception:
+            self._handle_failed_driver_call(
+                'create', 'health_monitor', assoc_id, driver.get_name())
+        else:
+            self.plugin_rpc.update_status(
+                'health_monitor', assoc_id, constants.ACTIVE)
+
+    def update_pool_health_monitor(self, context, old_health_monitor,
+                                   health_monitor, pool_id):
+        driver = self._get_driver(pool_id)
+        assoc_id = {'pool_id': pool_id, 'monitor_id': health_monitor['id']}
+        try:
+            driver.update_pool_health_monitor(old_health_monitor,
+                                              health_monitor,
+                                              pool_id)
+        except Exception:
+            self._handle_failed_driver_call(
+                'update', 'health_monitor', assoc_id, driver.get_name())
+        else:
+            self.plugin_rpc.update_status(
+                'health_monitor', assoc_id, constants.ACTIVE)
+
+    def delete_pool_health_monitor(self, context, health_monitor, pool_id):
+        driver = self._get_driver(pool_id)
+        driver.delete_pool_health_monitor(health_monitor, pool_id)

     def agent_updated(self, context, payload):
         """Handle the agent_updated notification event."""
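Every per-object handler above follows the same shape: resolve the driver from `instance_mapping`, call it, and report ACTIVE or ERROR back to the plugin. A compact standalone rendering of that pattern (stub driver and stub status reporter; names are illustrative):

    instance_mapping = {'pool-1': 'haproxy_ns'}


    class DeviceNotFoundOnAgent(Exception):
        pass


    class StubDriver(object):
        @classmethod
        def get_name(cls):
            return 'haproxy_ns'

        def create_vip(self, vip):
            pass  # pretend the backend was reconfigured


    device_drivers = {'haproxy_ns': StubDriver()}


    def _get_driver(pool_id):
        if pool_id not in instance_mapping:
            raise DeviceNotFoundOnAgent(pool_id)
        return device_drivers[instance_mapping[pool_id]]


    def update_status(obj_type, obj_id, status):
        print('update_status: %s %s -> %s' % (obj_type, obj_id, status))


    def create_vip(vip):
        driver = _get_driver(vip['pool_id'])
        try:
            driver.create_vip(vip)
        except Exception:
            update_status('vip', vip['id'], 'ERROR')
        else:
            update_status('vip', vip['id'], 'ACTIVE')


    create_vip({'id': 'vip-1', 'pool_id': 'pool-1'})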
@@ -274,6 +334,8 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
         if self.admin_state_up:
             self.needs_resync = True
         else:
-            for pool_id in self.cache.get_pool_ids():
-                self.destroy_device(pool_id)
-        LOG.info(_("agent_updated by server side %s!"), payload)
+            for pool_id in self.instance_mapping.keys():
+                LOG.info(_("Destroying pool %s due to agent disabling"),
+                         pool_id)
+                self._destroy_pool(pool_id)
+        LOG.info(_("Agent_updated by server side %s!"), payload)
@@ -18,8 +18,6 @@

 import itertools

-from oslo.config import cfg
-
 from neutron.agent.linux import utils
 from neutron.plugins.common import constants as qconstants
 from neutron.services.loadbalancer import constants
@@ -53,21 +51,23 @@ ACTIVE = qconstants.ACTIVE
 INACTIVE = qconstants.INACTIVE


-def save_config(conf_path, logical_config, socket_path=None):
+def save_config(conf_path, logical_config, socket_path=None,
+                user_group='nogroup'):
     """Convert a logical configuration to the HAProxy version."""
     data = []
-    data.extend(_build_global(logical_config, socket_path=socket_path))
+    data.extend(_build_global(logical_config, socket_path=socket_path,
+                              user_group=user_group))
     data.extend(_build_defaults(logical_config))
     data.extend(_build_frontend(logical_config))
     data.extend(_build_backend(logical_config))
     utils.replace_file(conf_path, '\n'.join(data))


-def _build_global(config, socket_path=None):
+def _build_global(config, socket_path=None, user_group='nogroup'):
     opts = [
         'daemon',
         'user nobody',
-        'group %s' % cfg.CONF.user_group,
+        'group %s' % user_group,
         'log /dev/log local0',
         'log /dev/log local1 notice'
     ]
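Passing `user_group` explicitly decouples the config renderer from the global `cfg.CONF` object; the caller (the namespace driver) now owns the option. A minimal standalone sketch of the global-section assembly, under the assumption that the rest of the module joins these lines with tabs into the final haproxy config:

    def _build_global(socket_path=None, user_group='nogroup'):
        # Mirrors the haproxy 'global' section assembly from the diff.
        opts = [
            'daemon',
            'user nobody',
            'group %s' % user_group,
            'log /dev/log local0',
            'log /dev/log local1 notice'
        ]
        if socket_path:
            # the stats socket enables the driver's get_stats() queries
            opts.append('stats socket %s' % socket_path)
        return ['global'] + ['\t' + o for o in opts]

    print('\n'.join(_build_global('/var/run/ha.sock', user_group='haproxy')))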
@@ -20,27 +20,69 @@ import shutil
 import socket

 import netaddr
+from oslo.config import cfg

+from neutron.agent.common import config
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import utils
 from neutron.common import exceptions
+from neutron.openstack.common import importutils
+from neutron.openstack.common import log as logging
+from neutron.plugins.common import constants
+from neutron.services.loadbalancer import constants as lb_const
+from neutron.services.loadbalancer.drivers import agent_device_driver
 from neutron.services.loadbalancer.drivers.haproxy import cfg as hacfg

+LOG = logging.getLogger(__name__)
+NS_PREFIX = 'qlbaas-'
+DRIVER_NAME = 'haproxy_ns'
+
+ACTIVE_PENDING = (
+    constants.ACTIVE,
+    constants.PENDING_CREATE,
+    constants.PENDING_UPDATE
+)
+
+STATE_PATH_DEFAULT = '$state_path/lbaas'
+USER_GROUP_DEFAULT = 'nogroup'
+OPTS = [
+    cfg.StrOpt(
+        'loadbalancer_state_path',
+        default=STATE_PATH_DEFAULT,
+        help=_('Location to store config and state files'),
+        deprecated_opts=[cfg.DeprecatedOpt('loadbalancer_state_path')],
+    ),
+    cfg.StrOpt(
+        'user_group',
+        default=USER_GROUP_DEFAULT,
+        help=_('The user group'),
+        deprecated_opts=[cfg.DeprecatedOpt('user_group')],
+    )
+]
+cfg.CONF.register_opts(OPTS, 'haproxy')


-class HaproxyNSDriver(object):
-    def __init__(self, root_helper, state_path, vif_driver, vip_plug_callback):
-        self.root_helper = root_helper
-        self.state_path = state_path
+class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver):
+    def __init__(self, conf, plugin_rpc):
+        self.conf = conf
+        self.root_helper = config.get_root_helper(conf)
+        self.state_path = conf.haproxy.loadbalancer_state_path
+        try:
+            vif_driver = importutils.import_object(conf.interface_driver, conf)
+        except ImportError:
+            msg = (_('Error importing interface driver: %s')
+                   % conf.haproxy.interface_driver)
+            LOG.error(msg)
+            raise
+
         self.vif_driver = vif_driver
-        self.vip_plug_callback = vip_plug_callback
+        self.plugin_rpc = plugin_rpc
         self.pool_to_port_id = {}

+    @classmethod
+    def get_name(cls):
+        return DRIVER_NAME
+
     def create(self, logical_config):
         pool_id = logical_config['pool']['id']
         namespace = get_ns_name(pool_id)
@@ -62,8 +104,9 @@ class HaproxyNSDriver(object):
         conf_path = self._get_state_file_path(pool_id, 'conf')
         pid_path = self._get_state_file_path(pool_id, 'pid')
         sock_path = self._get_state_file_path(pool_id, 'sock')
+        user_group = self.conf.haproxy.user_group

-        hacfg.save_config(conf_path, logical_config, sock_path)
+        hacfg.save_config(conf_path, logical_config, sock_path, user_group)
         cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
         cmd.extend(extra_cmd_args)
@@ -73,7 +116,7 @@ class HaproxyNSDriver(object):
         # remember the pool<>port mapping
         self.pool_to_port_id[pool_id] = logical_config['vip']['port']['id']

-    def destroy(self, pool_id):
+    def undeploy_instance(self, pool_id):
         namespace = get_ns_name(pool_id)
         ns = ip_lib.IPWrapper(self.root_helper, namespace)
         pid_path = self._get_state_file_path(pool_id, 'pid')
@@ -176,9 +219,6 @@ class HaproxyNSDriver(object):

         return res_stats

-    def remove_orphans(self, known_pool_ids):
-        raise NotImplementedError()
-
     def _get_state_file_path(self, pool_id, kind, ensure_state_dir=True):
         """Returns the file name for a given kind of config file."""
         confs_dir = os.path.abspath(os.path.normpath(self.state_path))
@@ -189,7 +229,7 @@ class HaproxyNSDriver(object):
         return os.path.join(conf_dir, kind)

     def _plug(self, namespace, port, reuse_existing=True):
-        self.vip_plug_callback('plug', port)
+        self.plugin_rpc.plug_vip_port(port['id'])
         interface_name = self.vif_driver.get_device_name(Wrap(port))

         if ip_lib.device_exists(interface_name, self.root_helper, namespace):
@@ -222,10 +262,67 @@ class HaproxyNSDriver(object):

     def _unplug(self, namespace, port_id):
         port_stub = {'id': port_id}
-        self.vip_plug_callback('unplug', port_stub)
+        self.plugin_rpc.unplug_vip_port(port_id)
         interface_name = self.vif_driver.get_device_name(Wrap(port_stub))
         self.vif_driver.unplug(interface_name, namespace=namespace)

+    def deploy_instance(self, logical_config):
+        # do actual deploy only if vip is configured and active
+        if ('vip' not in logical_config or
+                logical_config['vip']['status'] not in ACTIVE_PENDING or
+                not logical_config['vip']['admin_state_up']):
+            return
+
+        if self.exists(logical_config['pool']['id']):
+            self.update(logical_config)
+        else:
+            self.create(logical_config)
+
+    def _refresh_device(self, pool_id):
+        logical_config = self.plugin_rpc.get_logical_device(pool_id)
+        self.deploy_instance(logical_config)
+
+    def create_vip(self, vip):
+        self._refresh_device(vip['pool_id'])
+
+    def update_vip(self, old_vip, vip):
+        self._refresh_device(vip['pool_id'])
+
+    def delete_vip(self, vip):
+        self.undeploy_instance(vip['pool_id'])
+
+    def create_pool(self, pool):
+        # nothing to do here because a pool needs a vip to be useful
+        pass
+
+    def update_pool(self, old_pool, pool):
+        self._refresh_device(pool['id'])
+
+    def delete_pool(self, pool):
+        # delete_pool may be called before vip deletion in case
+        # pool's admin state set to down
+        if self.exists(pool['id']):
+            self.undeploy_instance(pool['id'])
+
+    def create_member(self, member):
+        self._refresh_device(member['pool_id'])
+
+    def update_member(self, old_member, member):
+        self._refresh_device(member['pool_id'])
+
+    def delete_member(self, member):
+        self._refresh_device(member['pool_id'])
+
+    def create_pool_health_monitor(self, health_monitor, pool_id):
+        self._refresh_device(pool_id)
+
+    def update_pool_health_monitor(self, old_health_monitor, health_monitor,
+                                   pool_id):
+        self._refresh_device(pool_id)
+
+    def delete_pool_health_monitor(self, health_monitor, pool_id):
+        self._refresh_device(pool_id)
+

 # NOTE (markmcclain) For compliance with interface.py which expects objects
 class Wrap(object):
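Note how the namespace driver satisfies the wide AgentDeviceDriver surface with almost no new logic: nearly every member and health-monitor change funnels into `_refresh_device`, which re-fetches the logical config and re-renders the whole haproxy instance. Schematically:

    def get_logical_device(pool_id):
        # stand-in for the RPC round trip to the plugin
        return {'pool': {'id': pool_id}}

    def deploy_instance(logical_config):
        print('re-rendering haproxy for %s' % logical_config['pool']['id'])

    def _refresh_device(pool_id):
        deploy_instance(get_logical_device(pool_id))

    def update_member(old_member, member):
        # no incremental patching: any member change redeploys the instance
        _refresh_device(member['pool_id'])

    update_member({'id': 'm1', 'pool_id': 'pool-1'},
                  {'id': 'm1', 'pool_id': 'pool-1', 'weight': 2})

Correctness comes from re-rendering the full configuration rather than patching the running haproxy process incrementally.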
@@ -53,13 +53,23 @@ AGENT_SCHEDULER_OPTS = [
 cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)

 # topic name for this particular agent implementation
-TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host'
-TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent'
+TOPIC_LOADBALANCER_PLUGIN = 'n-lbaas-plugin'
+TOPIC_LOADBALANCER_AGENT = 'n-lbaas_agent'


+class DriverNotSpecified(q_exc.NeutronException):
+    message = _("Device driver for agent should be specified "
+                "in plugin driver.")
+
+
 class LoadBalancerCallbacks(object):

-    RPC_API_VERSION = '1.0'
+    RPC_API_VERSION = '2.0'
+    # history
+    #   1.0 Initial version
+    #   2.0 Generic API for agent based drivers
+    #       - get_logical_device() handling changed;
+    #       - pool_deployed() and update_status() methods added;

     def __init__(self, plugin):
         self.plugin = plugin
@@ -70,67 +80,47 @@ class LoadBalancerCallbacks(object):

     def get_ready_devices(self, context, host=None):
         with context.session.begin(subtransactions=True):
-            qry = (context.session.query(loadbalancer_db.Pool.id).
-                   join(loadbalancer_db.Vip))
-
-            qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
-            qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
-            up = True  # makes pep8 and sqlalchemy happy
-            qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
-            qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
+            agents = self.plugin.get_lbaas_agents(context,
+                                                  filters={'host': [host]})
+            if not agents:
+                return []
+            elif len(agents) > 1:
+                LOG.warning(_('Multiple lbaas agents found on host %s'), host)
+
+            pools = self.plugin.list_pools_on_lbaas_agent(context,
+                                                          agents[0].id)
+            pool_ids = [pool['id'] for pool in pools['pools']]
+
+            qry = context.session.query(loadbalancer_db.Pool.id)
+            qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
+            qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
+            up = True  # makes pep8 and sqlalchemy happy
+            qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
             return [id for id, in qry]

-    def get_logical_device(self, context, pool_id=None, activate=True,
-                           **kwargs):
+    def get_logical_device(self, context, pool_id=None):
         with context.session.begin(subtransactions=True):
             qry = context.session.query(loadbalancer_db.Pool)
             qry = qry.filter_by(id=pool_id)
             pool = qry.one()

-            if activate:
-                # set all resources to active
-                if pool.status in ACTIVE_PENDING:
-                    pool.status = constants.ACTIVE
-
-                if pool.vip.status in ACTIVE_PENDING:
-                    pool.vip.status = constants.ACTIVE
-
-                for m in pool.members:
-                    if m.status in ACTIVE_PENDING:
-                        m.status = constants.ACTIVE
-
-                for hm in pool.monitors:
-                    if hm.status in ACTIVE_PENDING:
-                        hm.status = constants.ACTIVE
-
-            if (pool.status != constants.ACTIVE
-                or pool.vip.status != constants.ACTIVE):
-                raise q_exc.Invalid(_('Expected active pool and vip'))
+            if pool.status != constants.ACTIVE:
+                raise q_exc.Invalid(_('Expected active pool'))

             retval = {}
             retval['pool'] = self.plugin._make_pool_dict(pool)
-            retval['vip'] = self.plugin._make_vip_dict(pool.vip)
-            retval['vip']['port'] = (
-                self.plugin._core_plugin._make_port_dict(pool.vip.port)
-            )
-            for fixed_ip in retval['vip']['port']['fixed_ips']:
-                fixed_ip['subnet'] = (
-                    self.plugin._core_plugin.get_subnet(
-                        context,
-                        fixed_ip['subnet_id']
-                    )
-                )
+
+            if pool.vip:
+                retval['vip'] = self.plugin._make_vip_dict(pool.vip)
+                retval['vip']['port'] = (
+                    self.plugin._core_plugin._make_port_dict(pool.vip.port)
+                )
+                for fixed_ip in retval['vip']['port']['fixed_ips']:
+                    fixed_ip['subnet'] = (
+                        self.plugin._core_plugin.get_subnet(
+                            context,
+                            fixed_ip['subnet_id']
+                        )
+                    )
             retval['members'] = [
                 self.plugin._make_member_dict(m)
                 for m in pool.members if m.status in (constants.ACTIVE,
@@ -141,10 +131,49 @@ class LoadBalancerCallbacks(object):
                 for hm in pool.monitors
                 if hm.status == constants.ACTIVE
             ]
+            retval['driver'] = (
+                self.plugin.drivers[pool.provider.provider_name].device_driver)

             return retval

-    def pool_destroyed(self, context, pool_id=None, host=None):
+    def pool_deployed(self, context, pool_id):
+        with context.session.begin(subtransactions=True):
+            qry = context.session.query(loadbalancer_db.Pool)
+            qry = qry.filter_by(id=pool_id)
+            pool = qry.one()
+
+            # set all resources to active
+            if pool.status in ACTIVE_PENDING:
+                pool.status = constants.ACTIVE
+
+            if pool.vip and pool.vip.status in ACTIVE_PENDING:
+                pool.vip.status = constants.ACTIVE
+
+            for m in pool.members:
+                if m.status in ACTIVE_PENDING:
+                    m.status = constants.ACTIVE
+
+            for hm in pool.monitors:
+                if hm.status in ACTIVE_PENDING:
+                    hm.status = constants.ACTIVE
+
+    def update_status(self, context, obj_type, obj_id, status):
+        model_mapping = {
+            'pool': loadbalancer_db.Pool,
+            'vip': loadbalancer_db.Vip,
+            'member': loadbalancer_db.Member,
+            'health_monitor': loadbalancer_db.PoolMonitorAssociation
+        }
+        if obj_type not in model_mapping:
+            raise q_exc.Invalid(_('Unknown object type: %s') % obj_type)
+        elif obj_type == 'health_monitor':
+            self.plugin.update_pool_health_monitor(
+                context, obj_id['monitor_id'], obj_id['pool_id'], status)
+        else:
+            self.plugin.update_status(
+                context, model_mapping[obj_type], obj_id, status)
+
+    def pool_destroyed(self, context, pool_id=None):
         """Agent confirmation hook that a pool has been destroyed.

         This method exists for subclasses to change the deletion
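`update_status` is a small table-driven dispatcher; the only wrinkle is that a pool/health-monitor association has no single primary key, so its `obj_id` arrives as a compound dict. The same dispatch in a standalone form (model names as plain strings, stub plugin):

    class StubPlugin(object):
        def update_pool_health_monitor(self, monitor_id, pool_id, status):
            print('monitor %s on pool %s -> %s' % (monitor_id, pool_id, status))

        def update_status(self, model, obj_id, status):
            print('%s %s -> %s' % (model, obj_id, status))


    def update_status(plugin, obj_type, obj_id, status):
        model_mapping = {'pool': 'Pool', 'vip': 'Vip', 'member': 'Member',
                         'health_monitor': 'PoolMonitorAssociation'}
        if obj_type not in model_mapping:
            raise ValueError('Unknown object type: %s' % obj_type)
        elif obj_type == 'health_monitor':
            # the association row is addressed by both of its ids
            plugin.update_pool_health_monitor(
                obj_id['monitor_id'], obj_id['pool_id'], status)
        else:
            plugin.update_status(model_mapping[obj_type], obj_id, status)


    update_status(StubPlugin(), 'health_monitor',
                  {'pool_id': 'pool-1', 'monitor_id': 'mon-1'}, 'ACTIVE')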
@@ -214,65 +243,116 @@ class LoadBalancerCallbacks(object):
 class LoadBalancerAgentApi(proxy.RpcProxy):
     """Plugin side of plugin to agent RPC API."""

-    BASE_RPC_API_VERSION = '1.0'
+    BASE_RPC_API_VERSION = '2.0'
     # history
     #   1.0 Initial version
     #   1.1 Support agent_updated call
+    #   2.0 Generic API for agent based drivers
+    #       - modify/reload/destroy_pool methods were removed;
+    #       - added methods to handle create/update/delete for every lbaas
+    #       object individually;

     def __init__(self, topic):
         super(LoadBalancerAgentApi, self).__init__(
             topic, default_version=self.BASE_RPC_API_VERSION)

-    def reload_pool(self, context, pool_id, host):
+    def _cast(self, context, method_name, method_args, host, version=None):
         return self.cast(
             context,
-            self.make_msg('reload_pool', pool_id=pool_id, host=host),
-            topic='%s.%s' % (self.topic, host)
+            self.make_msg(method_name, **method_args),
+            topic='%s.%s' % (self.topic, host),
+            version=version
         )

-    def destroy_pool(self, context, pool_id, host):
-        return self.cast(
-            context,
-            self.make_msg('destroy_pool', pool_id=pool_id, host=host),
-            topic='%s.%s' % (self.topic, host)
-        )
+    def create_vip(self, context, vip, host):
+        return self._cast(context, 'create_vip', {'vip': vip}, host)

-    def modify_pool(self, context, pool_id, host):
-        return self.cast(
-            context,
-            self.make_msg('modify_pool', pool_id=pool_id, host=host),
-            topic='%s.%s' % (self.topic, host)
-        )
+    def update_vip(self, context, old_vip, vip, host):
+        return self._cast(context, 'update_vip',
+                          {'old_vip': old_vip, 'vip': vip}, host)
+
+    def delete_vip(self, context, vip, host):
+        return self._cast(context, 'delete_vip', {'vip': vip}, host)
+
+    def create_pool(self, context, pool, host, driver_name):
+        return self._cast(context, 'create_pool',
+                          {'pool': pool, 'driver_name': driver_name}, host)
+
+    def update_pool(self, context, old_pool, pool, host):
+        return self._cast(context, 'update_pool',
+                          {'old_pool': old_pool, 'pool': pool}, host)
+
+    def delete_pool(self, context, pool, host):
+        return self._cast(context, 'delete_pool', {'pool': pool}, host)
+
+    def create_member(self, context, member, host):
+        return self._cast(context, 'create_member', {'member': member}, host)
+
+    def update_member(self, context, old_member, member, host):
+        return self._cast(context, 'update_member',
+                          {'old_member': old_member, 'member': member}, host)
+
+    def delete_member(self, context, member, host):
+        return self._cast(context, 'delete_member', {'member': member}, host)
+
+    def create_pool_health_monitor(self, context, health_monitor, pool_id,
+                                   host):
+        return self._cast(context, 'create_pool_health_monitor',
+                          {'health_monitor': health_monitor,
+                           'pool_id': pool_id}, host)
+
+    def update_pool_health_monitor(self, context, old_health_monitor,
+                                   health_monitor, pool_id, host):
+        return self._cast(context, 'update_pool_health_monitor',
+                          {'old_health_monitor': old_health_monitor,
+                           'health_monitor': health_monitor,
+                           'pool_id': pool_id}, host)
+
+    def delete_pool_health_monitor(self, context, health_monitor, pool_id,
+                                   host):
+        return self._cast(context, 'delete_pool_health_monitor',
+                          {'health_monitor': health_monitor,
+                           'pool_id': pool_id}, host)

     def agent_updated(self, context, admin_state_up, host):
-        return self.cast(
-            context,
-            self.make_msg('agent_updated',
-                          payload={'admin_state_up': admin_state_up}),
-            topic='%s.%s' % (self.topic, host),
-            version='1.1'
-        )
+        return self._cast(context, 'agent_updated',
+                          {'payload': {'admin_state_up': admin_state_up}},
+                          host)


-class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
+class AgentBasedPluginDriver(abstract_driver.LoadBalancerAbstractDriver):

+    # name of device driver that should be used by the agent;
+    # vendor specific plugin drivers must override it;
+    device_driver = None
+
     def __init__(self, plugin):
-        self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT)
-        self.callbacks = LoadBalancerCallbacks(plugin)
+        if not self.device_driver:
+            raise DriverNotSpecified()
+
+        self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT)

-        self.conn = rpc.create_connection(new=True)
-        self.conn.create_consumer(
-            TOPIC_PROCESS_ON_HOST,
-            self.callbacks.create_rpc_dispatcher(),
-            fanout=False)
-        self.conn.consume_in_thread()
         self.plugin = plugin
+        self._set_callbacks_on_plugin()
         self.plugin.agent_notifiers.update(
             {q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})

         self.pool_scheduler = importutils.import_object(
             cfg.CONF.loadbalancer_pool_scheduler_driver)

+    def _set_callbacks_on_plugin(self):
+        # other agent based plugin driver might already set callbacks on plugin
+        if hasattr(self.plugin, 'agent_callbacks'):
+            return
+
+        self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
+        self.plugin.conn = rpc.create_connection(new=True)
+        self.plugin.conn.create_consumer(
+            TOPIC_LOADBALANCER_PLUGIN,
+            self.plugin.agent_callbacks.create_rpc_dispatcher(),
+            fanout=False)
+        self.plugin.conn.consume_in_thread()
+
     def get_pool_agent(self, context, pool_id):
         agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
         if not agent:
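Collapsing a dozen near-identical cast methods into `_cast` is plain refactoring; a self-contained approximation of the pattern, with a stub in place of neutron's `proxy.RpcProxy`:

    class StubRpcProxy(object):
        # stands in for neutron's proxy.RpcProxy
        topic = 'n-lbaas_agent'

        def make_msg(self, method, **kwargs):
            return {'method': method, 'args': kwargs}

        def cast(self, context, msg, topic=None, version=None):
            print('cast %s to %s (version=%s)' % (msg, topic, version))


    class AgentNotifier(StubRpcProxy):
        def _cast(self, context, method_name, method_args, host, version=None):
            # every notification differs only in method name and arguments
            return self.cast(context,
                             self.make_msg(method_name, **method_args),
                             topic='%s.%s' % (self.topic, host),
                             version=version)

        def delete_member(self, context, member, host):
            return self._cast(context, 'delete_member', {'member': member}, host)


    AgentNotifier().delete_member(None, {'id': 'm-1'}, 'net-node-1')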
@@ -281,80 +361,95 @@ class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):

     def create_vip(self, context, vip):
         agent = self.get_pool_agent(context, vip['pool_id'])
-        self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host'])
+        self.agent_rpc.create_vip(context, vip, agent['host'])

     def update_vip(self, context, old_vip, vip):
         agent = self.get_pool_agent(context, vip['pool_id'])
         if vip['status'] in ACTIVE_PENDING:
-            self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host'])
+            self.agent_rpc.update_vip(context, old_vip, vip, agent['host'])
         else:
-            self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host'])
+            self.agent_rpc.delete_vip(context, vip, agent['host'])

     def delete_vip(self, context, vip):
         self.plugin._delete_db_vip(context, vip['id'])
         agent = self.get_pool_agent(context, vip['pool_id'])
-        self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host'])
+        self.agent_rpc.delete_vip(context, vip, agent['host'])

     def create_pool(self, context, pool):
-        if not self.pool_scheduler.schedule(self.plugin, context, pool):
+        agent = self.pool_scheduler.schedule(self.plugin, context, pool,
+                                             self.device_driver)
+        if not agent:
             raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
-        # don't notify here because a pool needs a vip to be useful
+        self.agent_rpc.create_pool(context, pool, agent['host'],
+                                   self.device_driver)

     def update_pool(self, context, old_pool, pool):
         agent = self.get_pool_agent(context, pool['id'])
         if pool['status'] in ACTIVE_PENDING:
-            if pool['vip_id'] is not None:
-                self.agent_rpc.reload_pool(context, pool['id'], agent['host'])
+            self.agent_rpc.update_pool(context, old_pool, pool,
+                                       agent['host'])
         else:
-            self.agent_rpc.destroy_pool(context, pool['id'], agent['host'])
+            self.agent_rpc.delete_pool(context, pool, agent['host'])

     def delete_pool(self, context, pool):
+        # get agent first to know host as binding will be deleted
+        # after pool is deleted from db
         agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
-        if agent:
-            self.agent_rpc.destroy_pool(context, pool['id'],
-                                        agent['agent']['host'])
         self.plugin._delete_db_pool(context, pool['id'])
+        if agent:
+            self.agent_rpc.delete_pool(context, pool, agent['agent']['host'])

     def create_member(self, context, member):
         agent = self.get_pool_agent(context, member['pool_id'])
-        self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
+        self.agent_rpc.create_member(context, member, agent['host'])

     def update_member(self, context, old_member, member):
         agent = self.get_pool_agent(context, member['pool_id'])
         # member may change pool id
         if member['pool_id'] != old_member['pool_id']:
-            agent = self.plugin.get_lbaas_agent_hosting_pool(
+            old_pool_agent = self.plugin.get_lbaas_agent_hosting_pool(
                 context, old_member['pool_id'])
-            if agent:
-                self.agent_rpc.modify_pool(context,
-                                           old_member['pool_id'],
-                                           agent['agent']['host'])
-            agent = self.get_pool_agent(context, member['pool_id'])
-        self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
+            if old_pool_agent:
+                self.agent_rpc.delete_member(context, old_member,
+                                             old_pool_agent['agent']['host'])
+            self.agent_rpc.create_member(context, member, agent['host'])
+        else:
+            self.agent_rpc.update_member(context, old_member, member,
+                                         agent['host'])

     def delete_member(self, context, member):
         self.plugin._delete_db_member(context, member['id'])
         agent = self.get_pool_agent(context, member['pool_id'])
-        self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
-
-    def update_health_monitor(self, context, old_health_monitor,
-                              health_monitor, pool_id):
-        # monitors are unused here because agent will fetch what is necessary
-        agent = self.get_pool_agent(context, pool_id)
-        self.agent_rpc.modify_pool(context, pool_id, agent['host'])
+        self.agent_rpc.delete_member(context, member, agent['host'])

     def create_pool_health_monitor(self, context, healthmon, pool_id):
-        # healthmon is not used here
         agent = self.get_pool_agent(context, pool_id)
-        self.agent_rpc.modify_pool(context, pool_id, agent['host'])
+        self.agent_rpc.create_pool_health_monitor(context, healthmon,
+                                                  pool_id, agent['host'])
+
+    def update_pool_health_monitor(self, context, old_health_monitor,
+                                   health_monitor, pool_id):
+        agent = self.get_pool_agent(context, pool_id)
+        self.agent_rpc.update_pool_health_monitor(context, old_health_monitor,
+                                                  health_monitor, pool_id,
+                                                  agent['host'])

     def delete_pool_health_monitor(self, context, health_monitor, pool_id):
         self.plugin._delete_db_pool_health_monitor(
             context, health_monitor['id'], pool_id
         )

# h
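(The listing is truncated here.) The payoff of the refactoring is that a vendor plugin driver now reduces to naming its device driver; the HAProxy plugin driver itself is expected to specialize `AgentBasedPluginDriver` the same way in the part of the change cut off above. A hypothetical example, assuming a neutron tree on PYTHONPATH (class and names are illustrative, not from this commit):

    from neutron.services.loadbalancer.drivers.haproxy import plugin_driver

    VENDOR_DRIVER_NAME = 'vendor_x'  # must equal the agent driver's get_name()


    class VendorXPluginDriver(plugin_driver.AgentBasedPluginDriver):
        # the scheduler's candidate filter and the create_pool cast
        # both key off this name
        device_driver = VENDOR_DRIVER_NAME

Leaving `device_driver` unset raises `DriverNotSpecified` at plugin start, which is exactly the guard added in `AgentBasedPluginDriver.__init__` above.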