diff --git a/neutron/common/topics.py b/neutron/common/topics.py index 058383eff..5e23bce25 100644 --- a/neutron/common/topics.py +++ b/neutron/common/topics.py @@ -29,10 +29,12 @@ L3PLUGIN = 'q-l3-plugin' DHCP = 'q-dhcp-notifer' FIREWALL_PLUGIN = 'q-firewall-plugin' METERING_PLUGIN = 'q-metering-plugin' +LOADBALANCER_PLUGIN = 'n-lbaas-plugin' L3_AGENT = 'l3_agent' DHCP_AGENT = 'dhcp_agent' METERING_AGENT = 'metering_agent' +LOADBALANCER_AGENT = 'n-lbaas_agent' def get_topic_name(prefix, table, operation, host=None): diff --git a/neutron/services/loadbalancer/agent/__init__.py b/neutron/services/loadbalancer/agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron/services/loadbalancer/drivers/haproxy/agent.py b/neutron/services/loadbalancer/agent/agent.py similarity index 92% rename from neutron/services/loadbalancer/drivers/haproxy/agent.py rename to neutron/services/loadbalancer/agent/agent.py index 71e123ff2..64f661c91 100644 --- a/neutron/services/loadbalancer/drivers/haproxy/agent.py +++ b/neutron/services/loadbalancer/agent/agent.py @@ -22,12 +22,10 @@ from oslo.config import cfg from neutron.agent.common import config from neutron.agent.linux import interface from neutron.common import legacy +from neutron.common import topics from neutron.openstack.common.rpc import service as rpc_service from neutron.openstack.common import service -from neutron.services.loadbalancer.drivers.haproxy import ( - agent_manager as manager, - plugin_driver -) +from neutron.services.loadbalancer.agent import agent_manager as manager OPTS = [ cfg.IntOpt( @@ -65,7 +63,7 @@ def main(): mgr = manager.LbaasAgentManager(cfg.CONF) svc = LbaasAgentService( host=cfg.CONF.host, - topic=plugin_driver.TOPIC_LOADBALANCER_AGENT, + topic=topics.LOADBALANCER_AGENT, manager=mgr ) service.launch(svc).wait() diff --git a/neutron/services/loadbalancer/drivers/haproxy/agent_api.py b/neutron/services/loadbalancer/agent/agent_api.py similarity index 100% rename from neutron/services/loadbalancer/drivers/haproxy/agent_api.py rename to neutron/services/loadbalancer/agent/agent_api.py diff --git a/neutron/services/loadbalancer/drivers/agent_device_driver.py b/neutron/services/loadbalancer/agent/agent_device_driver.py similarity index 100% rename from neutron/services/loadbalancer/drivers/agent_device_driver.py rename to neutron/services/loadbalancer/agent/agent_device_driver.py diff --git a/neutron/services/loadbalancer/drivers/haproxy/agent_manager.py b/neutron/services/loadbalancer/agent/agent_manager.py similarity index 98% rename from neutron/services/loadbalancer/drivers/haproxy/agent_manager.py rename to neutron/services/loadbalancer/agent/agent_manager.py index 7c6d20ab6..9517fae9b 100644 --- a/neutron/services/loadbalancer/drivers/haproxy/agent_manager.py +++ b/neutron/services/loadbalancer/agent/agent_manager.py @@ -21,16 +21,14 @@ from oslo.config import cfg from neutron.agent import rpc as agent_rpc from neutron.common import constants as n_const from neutron.common import exceptions as n_exc +from neutron.common import topics from neutron import context from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall from neutron.openstack.common import periodic_task from neutron.plugins.common import constants -from neutron.services.loadbalancer.drivers.haproxy import ( - agent_api, - plugin_driver -) +from neutron.services.loadbalancer.agent import agent_api LOG = logging.getLogger(__name__) @@ -67,7 +65,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): self.conf = conf self.context = context.get_admin_context_without_session() self.plugin_rpc = agent_api.LbaasAgentApi( - plugin_driver.TOPIC_LOADBALANCER_PLUGIN, + topics.LOADBALANCER_PLUGIN, self.context, self.conf.host ) @@ -76,7 +74,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): self.agent_state = { 'binary': 'neutron-lbaas-agent', 'host': conf.host, - 'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT, + 'topic': topics.LOADBALANCER_AGENT, 'configurations': {'device_drivers': self.device_drivers.keys()}, 'agent_type': n_const.AGENT_TYPE_LOADBALANCER, 'start_flag': True} @@ -109,7 +107,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): def _setup_state_rpc(self): self.state_rpc = agent_rpc.PluginReportStateAPI( - plugin_driver.TOPIC_LOADBALANCER_PLUGIN) + topics.LOADBALANCER_PLUGIN) report_interval = self.conf.AGENT.report_interval if report_interval: heartbeat = loopingcall.FixedIntervalLoopingCall( diff --git a/neutron/services/loadbalancer/drivers/common/__init__.py b/neutron/services/loadbalancer/drivers/common/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron/services/loadbalancer/drivers/common/agent_driver_base.py b/neutron/services/loadbalancer/drivers/common/agent_driver_base.py new file mode 100644 index 000000000..d1a0a6fd5 --- /dev/null +++ b/neutron/services/loadbalancer/drivers/common/agent_driver_base.py @@ -0,0 +1,449 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import uuid + +from oslo.config import cfg + +from neutron.common import constants as q_const +from neutron.common import exceptions as q_exc +from neutron.common import rpc as q_rpc +from neutron.common import topics +from neutron.db import agents_db +from neutron.db.loadbalancer import loadbalancer_db +from neutron.extensions import lbaas_agentscheduler +from neutron.extensions import portbindings +from neutron.openstack.common import importutils +from neutron.openstack.common import log as logging +from neutron.openstack.common import rpc +from neutron.openstack.common.rpc import proxy +from neutron.plugins.common import constants +from neutron.services.loadbalancer.drivers import abstract_driver + +LOG = logging.getLogger(__name__) + +AGENT_SCHEDULER_OPTS = [ + cfg.StrOpt('loadbalancer_pool_scheduler_driver', + default='neutron.services.loadbalancer.agent_scheduler' + '.ChanceScheduler', + help=_('Driver to use for scheduling ' + 'pool to a default loadbalancer agent')), +] + +cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS) + + +class DriverNotSpecified(q_exc.NeutronException): + message = _("Device driver for agent should be specified " + "in plugin driver.") + + +class LoadBalancerCallbacks(object): + + RPC_API_VERSION = '2.0' + # history + # 1.0 Initial version + # 2.0 Generic API for agent based drivers + # - get_logical_device() handling changed; + # - pool_deployed() and update_status() methods added; + + def __init__(self, plugin): + self.plugin = plugin + + def create_rpc_dispatcher(self): + return q_rpc.PluginRpcDispatcher( + [self, agents_db.AgentExtRpcCallback(self.plugin)]) + + def get_ready_devices(self, context, host=None): + with context.session.begin(subtransactions=True): + agents = self.plugin.get_lbaas_agents(context, + filters={'host': [host]}) + if not agents: + return [] + elif len(agents) > 1: + LOG.warning(_('Multiple lbaas agents found on host %s'), host) + pools = self.plugin.list_pools_on_lbaas_agent(context, + agents[0].id) + pool_ids = [pool['id'] for pool in pools['pools']] + + qry = context.session.query(loadbalancer_db.Pool.id) + qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids)) + qry = qry.filter( + loadbalancer_db.Pool.status.in_(constants.ACTIVE_PENDING)) + up = True # makes pep8 and sqlalchemy happy + qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up) + return [id for id, in qry] + + def get_logical_device(self, context, pool_id=None): + with context.session.begin(subtransactions=True): + qry = context.session.query(loadbalancer_db.Pool) + qry = qry.filter_by(id=pool_id) + pool = qry.one() + + if pool.status != constants.ACTIVE: + raise q_exc.Invalid(_('Expected active pool')) + + retval = {} + retval['pool'] = self.plugin._make_pool_dict(pool) + + if pool.vip: + retval['vip'] = self.plugin._make_vip_dict(pool.vip) + retval['vip']['port'] = ( + self.plugin._core_plugin._make_port_dict(pool.vip.port) + ) + for fixed_ip in retval['vip']['port']['fixed_ips']: + fixed_ip['subnet'] = ( + self.plugin._core_plugin.get_subnet( + context, + fixed_ip['subnet_id'] + ) + ) + retval['members'] = [ + self.plugin._make_member_dict(m) + for m in pool.members if ( + m.status in constants.ACTIVE_PENDING or + m.status == constants.INACTIVE) + ] + retval['healthmonitors'] = [ + self.plugin._make_health_monitor_dict(hm.healthmonitor) + for hm in pool.monitors + if hm.status in constants.ACTIVE_PENDING + ] + retval['driver'] = ( + self.plugin.drivers[pool.provider.provider_name].device_driver) + + return retval + + def pool_deployed(self, context, pool_id): + with context.session.begin(subtransactions=True): + qry = context.session.query(loadbalancer_db.Pool) + qry = qry.filter_by(id=pool_id) + pool = qry.one() + + # set all resources to active + if pool.status in constants.ACTIVE_PENDING: + pool.status = constants.ACTIVE + + if pool.vip and pool.vip.status in constants.ACTIVE_PENDING: + pool.vip.status = constants.ACTIVE + + for m in pool.members: + if m.status in constants.ACTIVE_PENDING: + m.status = constants.ACTIVE + + for hm in pool.monitors: + if hm.status in constants.ACTIVE_PENDING: + hm.status = constants.ACTIVE + + def update_status(self, context, obj_type, obj_id, status): + model_mapping = { + 'pool': loadbalancer_db.Pool, + 'vip': loadbalancer_db.Vip, + 'member': loadbalancer_db.Member, + 'health_monitor': loadbalancer_db.PoolMonitorAssociation + } + if obj_type not in model_mapping: + raise q_exc.Invalid(_('Unknown object type: %s') % obj_type) + try: + if obj_type == 'health_monitor': + self.plugin.update_pool_health_monitor( + context, obj_id['monitor_id'], obj_id['pool_id'], status) + else: + self.plugin.update_status( + context, model_mapping[obj_type], obj_id, status) + except q_exc.NotFound: + # update_status may come from agent on an object which was + # already deleted from db with other request + LOG.warning(_('Cannot update status: %(obj_type)s %(obj_id)s ' + 'not found in the DB, it was probably deleted ' + 'concurrently'), + {'obj_type': obj_type, 'obj_id': obj_id}) + + def pool_destroyed(self, context, pool_id=None): + """Agent confirmation hook that a pool has been destroyed. + + This method exists for subclasses to change the deletion + behavior. + """ + pass + + def plug_vip_port(self, context, port_id=None, host=None): + if not port_id: + return + + try: + port = self.plugin._core_plugin.get_port( + context, + port_id + ) + except q_exc.PortNotFound: + msg = _('Unable to find port %s to plug.') + LOG.debug(msg, port_id) + return + + port['admin_state_up'] = True + port['device_owner'] = 'neutron:' + constants.LOADBALANCER + port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host))) + port[portbindings.HOST_ID] = host + self.plugin._core_plugin.update_port( + context, + port_id, + {'port': port} + ) + + def unplug_vip_port(self, context, port_id=None, host=None): + if not port_id: + return + + try: + port = self.plugin._core_plugin.get_port( + context, + port_id + ) + except q_exc.PortNotFound: + msg = _('Unable to find port %s to unplug. This can occur when ' + 'the Vip has been deleted first.') + LOG.debug(msg, port_id) + return + + port['admin_state_up'] = False + port['device_owner'] = '' + port['device_id'] = '' + + try: + self.plugin._core_plugin.update_port( + context, + port_id, + {'port': port} + ) + + except q_exc.PortNotFound: + msg = _('Unable to find port %s to unplug. This can occur when ' + 'the Vip has been deleted first.') + LOG.debug(msg, port_id) + + def update_pool_stats(self, context, pool_id=None, stats=None, host=None): + self.plugin.update_pool_stats(context, pool_id, data=stats) + + +class LoadBalancerAgentApi(proxy.RpcProxy): + """Plugin side of plugin to agent RPC API.""" + + BASE_RPC_API_VERSION = '2.0' + # history + # 1.0 Initial version + # 1.1 Support agent_updated call + # 2.0 Generic API for agent based drivers + # - modify/reload/destroy_pool methods were removed; + # - added methods to handle create/update/delete for every lbaas + # object individually; + + def __init__(self, topic): + super(LoadBalancerAgentApi, self).__init__( + topic, default_version=self.BASE_RPC_API_VERSION) + + def _cast(self, context, method_name, method_args, host, version=None): + return self.cast( + context, + self.make_msg(method_name, **method_args), + topic='%s.%s' % (self.topic, host), + version=version + ) + + def create_vip(self, context, vip, host): + return self._cast(context, 'create_vip', {'vip': vip}, host) + + def update_vip(self, context, old_vip, vip, host): + return self._cast(context, 'update_vip', + {'old_vip': old_vip, 'vip': vip}, host) + + def delete_vip(self, context, vip, host): + return self._cast(context, 'delete_vip', {'vip': vip}, host) + + def create_pool(self, context, pool, host, driver_name): + return self._cast(context, 'create_pool', + {'pool': pool, 'driver_name': driver_name}, host) + + def update_pool(self, context, old_pool, pool, host): + return self._cast(context, 'update_pool', + {'old_pool': old_pool, 'pool': pool}, host) + + def delete_pool(self, context, pool, host): + return self._cast(context, 'delete_pool', {'pool': pool}, host) + + def create_member(self, context, member, host): + return self._cast(context, 'create_member', {'member': member}, host) + + def update_member(self, context, old_member, member, host): + return self._cast(context, 'update_member', + {'old_member': old_member, 'member': member}, host) + + def delete_member(self, context, member, host): + return self._cast(context, 'delete_member', {'member': member}, host) + + def create_pool_health_monitor(self, context, health_monitor, pool_id, + host): + return self._cast(context, 'create_pool_health_monitor', + {'health_monitor': health_monitor, + 'pool_id': pool_id}, host) + + def update_pool_health_monitor(self, context, old_health_monitor, + health_monitor, pool_id, host): + return self._cast(context, 'update_pool_health_monitor', + {'old_health_monitor': old_health_monitor, + 'health_monitor': health_monitor, + 'pool_id': pool_id}, host) + + def delete_pool_health_monitor(self, context, health_monitor, pool_id, + host): + return self._cast(context, 'delete_pool_health_monitor', + {'health_monitor': health_monitor, + 'pool_id': pool_id}, host) + + def agent_updated(self, context, admin_state_up, host): + return self._cast(context, 'agent_updated', + {'payload': {'admin_state_up': admin_state_up}}, + host) + + +class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver): + + # name of device driver that should be used by the agent; + # vendor specific plugin drivers must override it; + device_driver = None + + def __init__(self, plugin): + if not self.device_driver: + raise DriverNotSpecified() + + self.agent_rpc = LoadBalancerAgentApi(topics.LOADBALANCER_AGENT) + + self.plugin = plugin + self._set_callbacks_on_plugin() + self.plugin.agent_notifiers.update( + {q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc}) + + self.pool_scheduler = importutils.import_object( + cfg.CONF.loadbalancer_pool_scheduler_driver) + + def _set_callbacks_on_plugin(self): + # other agent based plugin driver might already set callbacks on plugin + if hasattr(self.plugin, 'agent_callbacks'): + return + + self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin) + self.plugin.conn = rpc.create_connection(new=True) + self.plugin.conn.create_consumer( + topics.LOADBALANCER_PLUGIN, + self.plugin.agent_callbacks.create_rpc_dispatcher(), + fanout=False) + self.plugin.conn.consume_in_thread() + + def get_pool_agent(self, context, pool_id): + agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id) + if not agent: + raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id) + return agent['agent'] + + def create_vip(self, context, vip): + agent = self.get_pool_agent(context, vip['pool_id']) + self.agent_rpc.create_vip(context, vip, agent['host']) + + def update_vip(self, context, old_vip, vip): + agent = self.get_pool_agent(context, vip['pool_id']) + if vip['status'] in constants.ACTIVE_PENDING: + self.agent_rpc.update_vip(context, old_vip, vip, agent['host']) + else: + self.agent_rpc.delete_vip(context, vip, agent['host']) + + def delete_vip(self, context, vip): + self.plugin._delete_db_vip(context, vip['id']) + agent = self.get_pool_agent(context, vip['pool_id']) + self.agent_rpc.delete_vip(context, vip, agent['host']) + + def create_pool(self, context, pool): + agent = self.pool_scheduler.schedule(self.plugin, context, pool, + self.device_driver) + if not agent: + raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id']) + self.agent_rpc.create_pool(context, pool, agent['host'], + self.device_driver) + + def update_pool(self, context, old_pool, pool): + agent = self.get_pool_agent(context, pool['id']) + if pool['status'] in constants.ACTIVE_PENDING: + self.agent_rpc.update_pool(context, old_pool, pool, + agent['host']) + else: + self.agent_rpc.delete_pool(context, pool, agent['host']) + + def delete_pool(self, context, pool): + # get agent first to know host as binding will be deleted + # after pool is deleted from db + agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id']) + self.plugin._delete_db_pool(context, pool['id']) + if agent: + self.agent_rpc.delete_pool(context, pool, agent['agent']['host']) + + def create_member(self, context, member): + agent = self.get_pool_agent(context, member['pool_id']) + self.agent_rpc.create_member(context, member, agent['host']) + + def update_member(self, context, old_member, member): + agent = self.get_pool_agent(context, member['pool_id']) + # member may change pool id + if member['pool_id'] != old_member['pool_id']: + old_pool_agent = self.plugin.get_lbaas_agent_hosting_pool( + context, old_member['pool_id']) + if old_pool_agent: + self.agent_rpc.delete_member(context, old_member, + old_pool_agent['agent']['host']) + self.agent_rpc.create_member(context, member, agent['host']) + else: + self.agent_rpc.update_member(context, old_member, member, + agent['host']) + + def delete_member(self, context, member): + self.plugin._delete_db_member(context, member['id']) + agent = self.get_pool_agent(context, member['pool_id']) + self.agent_rpc.delete_member(context, member, agent['host']) + + def create_pool_health_monitor(self, context, healthmon, pool_id): + # healthmon is not used here + agent = self.get_pool_agent(context, pool_id) + self.agent_rpc.create_pool_health_monitor(context, healthmon, + pool_id, agent['host']) + + def update_pool_health_monitor(self, context, old_health_monitor, + health_monitor, pool_id): + agent = self.get_pool_agent(context, pool_id) + self.agent_rpc.update_pool_health_monitor(context, old_health_monitor, + health_monitor, pool_id, + agent['host']) + + def delete_pool_health_monitor(self, context, health_monitor, pool_id): + self.plugin._delete_db_pool_health_monitor( + context, health_monitor['id'], pool_id + ) + + agent = self.get_pool_agent(context, pool_id) + self.agent_rpc.delete_pool_health_monitor(context, health_monitor, + pool_id, agent['host']) + + def stats(self, context, pool_id): + pass diff --git a/neutron/services/loadbalancer/drivers/haproxy/namespace_driver.py b/neutron/services/loadbalancer/drivers/haproxy/namespace_driver.py index c3aa7adfe..f815ea226 100644 --- a/neutron/services/loadbalancer/drivers/haproxy/namespace_driver.py +++ b/neutron/services/loadbalancer/drivers/haproxy/namespace_driver.py @@ -30,8 +30,8 @@ from neutron.common import utils as n_utils from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.plugins.common import constants +from neutron.services.loadbalancer.agent import agent_device_driver from neutron.services.loadbalancer import constants as lb_const -from neutron.services.loadbalancer.drivers import agent_device_driver from neutron.services.loadbalancer.drivers.haproxy import cfg as hacfg LOG = logging.getLogger(__name__) diff --git a/neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py b/neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py index 495014180..7dccaa3ac 100644 --- a/neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py +++ b/neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py @@ -1,6 +1,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright 2013 New Dream Network, LLC (DreamHost) + +# Copyright (c) 2013 OpenStack Foundation. +# All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -13,447 +14,10 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -# -# @author: Mark McClain, DreamHost -import uuid +from neutron.services.loadbalancer.drivers.common import agent_driver_base +from neutron.services.loadbalancer.drivers.haproxy import namespace_driver -from oslo.config import cfg -from neutron.common import constants as q_const -from neutron.common import exceptions as q_exc -from neutron.common import rpc as q_rpc -from neutron.db import agents_db -from neutron.db.loadbalancer import loadbalancer_db -from neutron.extensions import lbaas_agentscheduler -from neutron.extensions import portbindings -from neutron.openstack.common import importutils -from neutron.openstack.common import log as logging -from neutron.openstack.common import rpc -from neutron.openstack.common.rpc import proxy -from neutron.plugins.common import constants -from neutron.services.loadbalancer.drivers import abstract_driver - -LOG = logging.getLogger(__name__) - -AGENT_SCHEDULER_OPTS = [ - cfg.StrOpt('loadbalancer_pool_scheduler_driver', - default='neutron.services.loadbalancer.agent_scheduler' - '.ChanceScheduler', - help=_('Driver to use for scheduling ' - 'pool to a default loadbalancer agent')), -] - -cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS) - -# topic name for this particular agent implementation -TOPIC_LOADBALANCER_PLUGIN = 'n-lbaas-plugin' -TOPIC_LOADBALANCER_AGENT = 'n-lbaas_agent' - - -class DriverNotSpecified(q_exc.NeutronException): - message = _("Device driver for agent should be specified " - "in plugin driver.") - - -class LoadBalancerCallbacks(object): - - RPC_API_VERSION = '2.0' - # history - # 1.0 Initial version - # 2.0 Generic API for agent based drivers - # - get_logical_device() handling changed; - # - pool_deployed() and update_status() methods added; - - def __init__(self, plugin): - self.plugin = plugin - - def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher( - [self, agents_db.AgentExtRpcCallback(self.plugin)]) - - def get_ready_devices(self, context, host=None): - with context.session.begin(subtransactions=True): - agents = self.plugin.get_lbaas_agents(context, - filters={'host': [host]}) - if not agents: - return [] - elif len(agents) > 1: - LOG.warning(_('Multiple lbaas agents found on host %s'), host) - pools = self.plugin.list_pools_on_lbaas_agent(context, - agents[0].id) - pool_ids = [pool['id'] for pool in pools['pools']] - - qry = context.session.query(loadbalancer_db.Pool.id) - qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids)) - qry = qry.filter( - loadbalancer_db.Pool.status.in_(constants.ACTIVE_PENDING)) - up = True # makes pep8 and sqlalchemy happy - qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up) - return [id for id, in qry] - - def get_logical_device(self, context, pool_id=None): - with context.session.begin(subtransactions=True): - qry = context.session.query(loadbalancer_db.Pool) - qry = qry.filter_by(id=pool_id) - pool = qry.one() - - if pool.status != constants.ACTIVE: - raise q_exc.Invalid(_('Expected active pool')) - - retval = {} - retval['pool'] = self.plugin._make_pool_dict(pool) - - if pool.vip: - retval['vip'] = self.plugin._make_vip_dict(pool.vip) - retval['vip']['port'] = ( - self.plugin._core_plugin._make_port_dict(pool.vip.port) - ) - for fixed_ip in retval['vip']['port']['fixed_ips']: - fixed_ip['subnet'] = ( - self.plugin._core_plugin.get_subnet( - context, - fixed_ip['subnet_id'] - ) - ) - retval['members'] = [ - self.plugin._make_member_dict(m) - for m in pool.members if ( - m.status in constants.ACTIVE_PENDING or - m.status == constants.INACTIVE) - ] - retval['healthmonitors'] = [ - self.plugin._make_health_monitor_dict(hm.healthmonitor) - for hm in pool.monitors - if hm.status in constants.ACTIVE_PENDING - ] - retval['driver'] = ( - self.plugin.drivers[pool.provider.provider_name].device_driver) - - return retval - - def pool_deployed(self, context, pool_id): - with context.session.begin(subtransactions=True): - qry = context.session.query(loadbalancer_db.Pool) - qry = qry.filter_by(id=pool_id) - pool = qry.one() - - # set all resources to active - if pool.status in constants.ACTIVE_PENDING: - pool.status = constants.ACTIVE - - if pool.vip and pool.vip.status in constants.ACTIVE_PENDING: - pool.vip.status = constants.ACTIVE - - for m in pool.members: - if m.status in constants.ACTIVE_PENDING: - m.status = constants.ACTIVE - - for hm in pool.monitors: - if hm.status in constants.ACTIVE_PENDING: - hm.status = constants.ACTIVE - - def update_status(self, context, obj_type, obj_id, status): - model_mapping = { - 'pool': loadbalancer_db.Pool, - 'vip': loadbalancer_db.Vip, - 'member': loadbalancer_db.Member, - 'health_monitor': loadbalancer_db.PoolMonitorAssociation - } - if obj_type not in model_mapping: - raise q_exc.Invalid(_('Unknown object type: %s') % obj_type) - try: - if obj_type == 'health_monitor': - self.plugin.update_pool_health_monitor( - context, obj_id['monitor_id'], obj_id['pool_id'], status) - else: - self.plugin.update_status( - context, model_mapping[obj_type], obj_id, status) - except q_exc.NotFound: - # update_status may come from agent on an object which was - # already deleted from db with other request - LOG.warning(_('Cannot update status: %(obj_type)s %(obj_id)s ' - 'not found in the DB, it was probably deleted ' - 'concurrently'), - {'obj_type': obj_type, 'obj_id': obj_id}) - - def pool_destroyed(self, context, pool_id=None): - """Agent confirmation hook that a pool has been destroyed. - - This method exists for subclasses to change the deletion - behavior. - """ - pass - - def plug_vip_port(self, context, port_id=None, host=None): - if not port_id: - return - - try: - port = self.plugin._core_plugin.get_port( - context, - port_id - ) - except q_exc.PortNotFound: - msg = _('Unable to find port %s to plug.') - LOG.debug(msg, port_id) - return - - port['admin_state_up'] = True - port['device_owner'] = 'neutron:' + constants.LOADBALANCER - port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host))) - port[portbindings.HOST_ID] = host - self.plugin._core_plugin.update_port( - context, - port_id, - {'port': port} - ) - - def unplug_vip_port(self, context, port_id=None, host=None): - if not port_id: - return - - try: - port = self.plugin._core_plugin.get_port( - context, - port_id - ) - except q_exc.PortNotFound: - msg = _('Unable to find port %s to unplug. This can occur when ' - 'the Vip has been deleted first.') - LOG.debug(msg, port_id) - return - - port['admin_state_up'] = False - port['device_owner'] = '' - port['device_id'] = '' - - try: - self.plugin._core_plugin.update_port( - context, - port_id, - {'port': port} - ) - - except q_exc.PortNotFound: - msg = _('Unable to find port %s to unplug. This can occur when ' - 'the Vip has been deleted first.') - LOG.debug(msg, port_id) - - def update_pool_stats(self, context, pool_id=None, stats=None, host=None): - self.plugin.update_pool_stats(context, pool_id, data=stats) - - -class LoadBalancerAgentApi(proxy.RpcProxy): - """Plugin side of plugin to agent RPC API.""" - - BASE_RPC_API_VERSION = '2.0' - # history - # 1.0 Initial version - # 1.1 Support agent_updated call - # 2.0 Generic API for agent based drivers - # - modify/reload/destroy_pool methods were removed; - # - added methods to handle create/update/delete for every lbaas - # object individually; - - def __init__(self, topic): - super(LoadBalancerAgentApi, self).__init__( - topic, default_version=self.BASE_RPC_API_VERSION) - - def _cast(self, context, method_name, method_args, host, version=None): - return self.cast( - context, - self.make_msg(method_name, **method_args), - topic='%s.%s' % (self.topic, host), - version=version - ) - - def create_vip(self, context, vip, host): - return self._cast(context, 'create_vip', {'vip': vip}, host) - - def update_vip(self, context, old_vip, vip, host): - return self._cast(context, 'update_vip', - {'old_vip': old_vip, 'vip': vip}, host) - - def delete_vip(self, context, vip, host): - return self._cast(context, 'delete_vip', {'vip': vip}, host) - - def create_pool(self, context, pool, host, driver_name): - return self._cast(context, 'create_pool', - {'pool': pool, 'driver_name': driver_name}, host) - - def update_pool(self, context, old_pool, pool, host): - return self._cast(context, 'update_pool', - {'old_pool': old_pool, 'pool': pool}, host) - - def delete_pool(self, context, pool, host): - return self._cast(context, 'delete_pool', {'pool': pool}, host) - - def create_member(self, context, member, host): - return self._cast(context, 'create_member', {'member': member}, host) - - def update_member(self, context, old_member, member, host): - return self._cast(context, 'update_member', - {'old_member': old_member, 'member': member}, host) - - def delete_member(self, context, member, host): - return self._cast(context, 'delete_member', {'member': member}, host) - - def create_pool_health_monitor(self, context, health_monitor, pool_id, - host): - return self._cast(context, 'create_pool_health_monitor', - {'health_monitor': health_monitor, - 'pool_id': pool_id}, host) - - def update_pool_health_monitor(self, context, old_health_monitor, - health_monitor, pool_id, host): - return self._cast(context, 'update_pool_health_monitor', - {'old_health_monitor': old_health_monitor, - 'health_monitor': health_monitor, - 'pool_id': pool_id}, host) - - def delete_pool_health_monitor(self, context, health_monitor, pool_id, - host): - return self._cast(context, 'delete_pool_health_monitor', - {'health_monitor': health_monitor, - 'pool_id': pool_id}, host) - - def agent_updated(self, context, admin_state_up, host): - return self._cast(context, 'agent_updated', - {'payload': {'admin_state_up': admin_state_up}}, - host) - - -class AgentBasedPluginDriver(abstract_driver.LoadBalancerAbstractDriver): - - # name of device driver that should be used by the agent; - # vendor specific plugin drivers must override it; - device_driver = None - - def __init__(self, plugin): - if not self.device_driver: - raise DriverNotSpecified() - - self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT) - - self.plugin = plugin - self._set_callbacks_on_plugin() - self.plugin.agent_notifiers.update( - {q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc}) - - self.pool_scheduler = importutils.import_object( - cfg.CONF.loadbalancer_pool_scheduler_driver) - - def _set_callbacks_on_plugin(self): - # other agent based plugin driver might already set callbacks on plugin - if hasattr(self.plugin, 'agent_callbacks'): - return - - self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin) - self.plugin.conn = rpc.create_connection(new=True) - self.plugin.conn.create_consumer( - TOPIC_LOADBALANCER_PLUGIN, - self.plugin.agent_callbacks.create_rpc_dispatcher(), - fanout=False) - self.plugin.conn.consume_in_thread() - - def get_pool_agent(self, context, pool_id): - agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id) - if not agent: - raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id) - return agent['agent'] - - def create_vip(self, context, vip): - agent = self.get_pool_agent(context, vip['pool_id']) - self.agent_rpc.create_vip(context, vip, agent['host']) - - def update_vip(self, context, old_vip, vip): - agent = self.get_pool_agent(context, vip['pool_id']) - if vip['status'] in constants.ACTIVE_PENDING: - self.agent_rpc.update_vip(context, old_vip, vip, agent['host']) - else: - self.agent_rpc.delete_vip(context, vip, agent['host']) - - def delete_vip(self, context, vip): - self.plugin._delete_db_vip(context, vip['id']) - agent = self.get_pool_agent(context, vip['pool_id']) - self.agent_rpc.delete_vip(context, vip, agent['host']) - - def create_pool(self, context, pool): - agent = self.pool_scheduler.schedule(self.plugin, context, pool, - self.device_driver) - if not agent: - raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id']) - self.agent_rpc.create_pool(context, pool, agent['host'], - self.device_driver) - - def update_pool(self, context, old_pool, pool): - agent = self.get_pool_agent(context, pool['id']) - if pool['status'] in constants.ACTIVE_PENDING: - self.agent_rpc.update_pool(context, old_pool, pool, - agent['host']) - else: - self.agent_rpc.delete_pool(context, pool, agent['host']) - - def delete_pool(self, context, pool): - # get agent first to know host as binding will be deleted - # after pool is deleted from db - agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id']) - self.plugin._delete_db_pool(context, pool['id']) - if agent: - self.agent_rpc.delete_pool(context, pool, agent['agent']['host']) - - def create_member(self, context, member): - agent = self.get_pool_agent(context, member['pool_id']) - self.agent_rpc.create_member(context, member, agent['host']) - - def update_member(self, context, old_member, member): - agent = self.get_pool_agent(context, member['pool_id']) - # member may change pool id - if member['pool_id'] != old_member['pool_id']: - old_pool_agent = self.plugin.get_lbaas_agent_hosting_pool( - context, old_member['pool_id']) - if old_pool_agent: - self.agent_rpc.delete_member(context, old_member, - old_pool_agent['agent']['host']) - self.agent_rpc.create_member(context, member, agent['host']) - else: - self.agent_rpc.update_member(context, old_member, member, - agent['host']) - - def delete_member(self, context, member): - self.plugin._delete_db_member(context, member['id']) - agent = self.get_pool_agent(context, member['pool_id']) - self.agent_rpc.delete_member(context, member, agent['host']) - - def create_pool_health_monitor(self, context, healthmon, pool_id): - # healthmon is not used here - agent = self.get_pool_agent(context, pool_id) - self.agent_rpc.create_pool_health_monitor(context, healthmon, - pool_id, agent['host']) - - def update_pool_health_monitor(self, context, old_health_monitor, - health_monitor, pool_id): - agent = self.get_pool_agent(context, pool_id) - self.agent_rpc.update_pool_health_monitor(context, old_health_monitor, - health_monitor, pool_id, - agent['host']) - - def delete_pool_health_monitor(self, context, health_monitor, pool_id): - self.plugin._delete_db_pool_health_monitor( - context, health_monitor['id'], pool_id - ) - - agent = self.get_pool_agent(context, pool_id) - self.agent_rpc.delete_pool_health_monitor(context, health_monitor, - pool_id, agent['host']) - - def stats(self, context, pool_id): - pass - - -class HaproxyOnHostPluginDriver(AgentBasedPluginDriver): - #TODO(obondarev): change hardcoded driver name - # to namespace_driver.DRIVER_NAME after moving HaproxyOnHostPluginDriver - # to a separate file (follow-up patch) - device_driver = 'haproxy_ns' +class HaproxyOnHostPluginDriver(agent_driver_base.AgentDriverBase): + device_driver = namespace_driver.DRIVER_NAME diff --git a/neutron/tests/unit/services/loadbalancer/agent/__init__.py b/neutron/tests/unit/services/loadbalancer/agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_agent.py b/neutron/tests/unit/services/loadbalancer/agent/test_agent.py similarity index 96% rename from neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_agent.py rename to neutron/tests/unit/services/loadbalancer/agent/test_agent.py index 7d27d517e..2ff2e8953 100644 --- a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_agent.py +++ b/neutron/tests/unit/services/loadbalancer/agent/test_agent.py @@ -20,7 +20,7 @@ import contextlib import mock from oslo.config import cfg -from neutron.services.loadbalancer.drivers.haproxy import agent +from neutron.services.loadbalancer.agent import agent from neutron.tests import base diff --git a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_agent_manager.py b/neutron/tests/unit/services/loadbalancer/agent/test_agent_manager.py similarity index 98% rename from neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_agent_manager.py rename to neutron/tests/unit/services/loadbalancer/agent/test_agent_manager.py index 328dc22c9..fdff11b4d 100644 --- a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_agent_manager.py +++ b/neutron/tests/unit/services/loadbalancer/agent/test_agent_manager.py @@ -21,9 +21,7 @@ import contextlib import mock from neutron.plugins.common import constants -from neutron.services.loadbalancer.drivers.haproxy import ( - agent_manager as manager -) +from neutron.services.loadbalancer.agent import agent_manager as manager from neutron.tests import base @@ -38,8 +36,7 @@ class TestManager(base.BaseTestCase): self.mock_importer = mock.patch.object(manager, 'importutils').start() rpc_mock_cls = mock.patch( - 'neutron.services.loadbalancer.drivers' - '.haproxy.agent_api.LbaasAgentApi' + 'neutron.services.loadbalancer.agent.agent_api.LbaasAgentApi' ).start() self.mgr = manager.LbaasAgentManager(mock_conf) diff --git a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_api.py b/neutron/tests/unit/services/loadbalancer/agent/test_api.py similarity index 98% rename from neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_api.py rename to neutron/tests/unit/services/loadbalancer/agent/test_api.py index 0d9ce3a39..dcd6025bd 100644 --- a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_api.py +++ b/neutron/tests/unit/services/loadbalancer/agent/test_api.py @@ -18,9 +18,7 @@ import mock -from neutron.services.loadbalancer.drivers.haproxy import ( - agent_api as api -) +from neutron.services.loadbalancer.agent import agent_api as api from neutron.tests import base diff --git a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_plugin_driver.py b/neutron/tests/unit/services/loadbalancer/drivers/test_agent_driver_base.py similarity index 97% rename from neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_plugin_driver.py rename to neutron/tests/unit/services/loadbalancer/drivers/test_agent_driver_base.py index 2db78f34e..692ed0029 100644 --- a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_plugin_driver.py +++ b/neutron/tests/unit/services/loadbalancer/drivers/test_agent_driver_base.py @@ -30,9 +30,7 @@ from neutron.extensions import portbindings from neutron import manager from neutron.openstack.common import uuidutils from neutron.plugins.common import constants -from neutron.services.loadbalancer.drivers.haproxy import ( - plugin_driver -) +from neutron.services.loadbalancer.drivers.common import agent_driver_base from neutron.tests import base from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer from neutron.tests.unit import testlib_api @@ -43,20 +41,20 @@ class TestLoadBalancerPluginBase( def setUp(self): def reset_device_driver(): - plugin_driver.AgentBasedPluginDriver.device_driver = None + agent_driver_base.AgentDriverBase.device_driver = None self.addCleanup(reset_device_driver) self.mock_importer = mock.patch.object( - plugin_driver, 'importutils').start() + agent_driver_base, 'importutils').start() self.addCleanup(mock.patch.stopall) # needed to reload provider configuration st_db.ServiceTypeManager._instance = None - plugin_driver.AgentBasedPluginDriver.device_driver = 'dummy' + agent_driver_base.AgentDriverBase.device_driver = 'dummy' super(TestLoadBalancerPluginBase, self).setUp( lbaas_provider=('LOADBALANCER:lbaas:neutron.services.' - 'loadbalancer.drivers.haproxy.plugin_driver.' - 'AgentBasedPluginDriver:default')) + 'loadbalancer.drivers.common.agent_driver_base.' + 'AgentDriverBase:default')) # we need access to loaded plugins to modify models loaded_plugins = manager.NeutronManager().get_service_plugins() @@ -68,7 +66,7 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): def setUp(self): super(TestLoadBalancerCallbacks, self).setUp() - self.callbacks = plugin_driver.LoadBalancerCallbacks( + self.callbacks = agent_driver_base.LoadBalancerCallbacks( self.plugin_instance ) get_lbaas_agents_patcher = mock.patch( @@ -400,7 +398,7 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): self.assertEqual('ACTIVE', p['status']) def test_update_status_pool_deleted_already(self): - with mock.patch.object(plugin_driver, 'LOG') as mock_log: + with mock.patch.object(agent_driver_base, 'LOG') as mock_log: pool_id = 'deleted_pool' ctx = context.get_admin_context() self.assertRaises(loadbalancer.PoolNotFound, @@ -433,7 +431,7 @@ class TestLoadBalancerAgentApi(base.BaseTestCase): super(TestLoadBalancerAgentApi, self).setUp() self.addCleanup(mock.patch.stopall) - self.api = plugin_driver.LoadBalancerAgentApi('topic') + self.api = agent_driver_base.LoadBalancerAgentApi('topic') self.mock_cast = mock.patch.object(self.api, 'cast').start() self.mock_msg = mock.patch.object(self.api, 'make_msg').start() @@ -510,16 +508,16 @@ class TestLoadBalancerAgentApi(base.BaseTestCase): class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): def setUp(self): - self.log = mock.patch.object(plugin_driver, 'LOG') - api_cls = mock.patch.object(plugin_driver, + self.log = mock.patch.object(agent_driver_base, 'LOG') + api_cls = mock.patch.object(agent_driver_base, 'LoadBalancerAgentApi').start() super(TestLoadBalancerPluginNotificationWrapper, self).setUp() self.mock_api = api_cls.return_value self.mock_get_driver = mock.patch.object(self.plugin_instance, '_get_driver') - self.mock_get_driver.return_value = (plugin_driver. - AgentBasedPluginDriver( + self.mock_get_driver.return_value = (agent_driver_base. + AgentDriverBase( self.plugin_instance )) diff --git a/setup.cfg b/setup.cfg index d8c21da15..c31e07cb8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -85,7 +85,7 @@ console_scripts = neutron-dhcp-agent = neutron.agent.dhcp_agent:main neutron-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main neutron-l3-agent = neutron.agent.l3_agent:main - neutron-lbaas-agent = neutron.services.loadbalancer.drivers.haproxy.agent:main + neutron-lbaas-agent = neutron.services.loadbalancer.agent.agent:main neutron-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main neutron-metadata-agent = neutron.agent.metadata.agent:main neutron-mlnx-agent = neutron.plugins.mlnx.agent.eswitch_neutron_agent:main @@ -105,7 +105,7 @@ console_scripts = quantum-dhcp-agent = neutron.agent.dhcp_agent:main quantum-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main quantum-l3-agent = neutron.agent.l3_agent:main - quantum-lbaas-agent = neutron.services.loadbalancer.drivers.haproxy.agent:main + quantum-lbaas-agent = neutron.services.loadbalancer.agent.agent:main quantum-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main quantum-metadata-agent = neutron.agent.metadata.agent:main quantum-mlnx-agent = neutron.plugins.mlnx.agent.eswitch_neutron_agent:main