LBaaS: move agent based driver files into a separate dir

Replace agent based driver files from haproxy driver dir
Same for unit tests files

Finalizes blueprint lbaas-common-agent-driver

Change-Id: Ibff85a9c2e1f1c59d72616836d56a697dce72c59
This commit is contained in:
Oleg Bondarev 2013-12-10 19:10:38 +04:00
parent e72929af4d
commit 6b913e9d3e
16 changed files with 486 additions and 482 deletions

View File

@ -29,10 +29,12 @@ L3PLUGIN = 'q-l3-plugin'
DHCP = 'q-dhcp-notifer' DHCP = 'q-dhcp-notifer'
FIREWALL_PLUGIN = 'q-firewall-plugin' FIREWALL_PLUGIN = 'q-firewall-plugin'
METERING_PLUGIN = 'q-metering-plugin' METERING_PLUGIN = 'q-metering-plugin'
LOADBALANCER_PLUGIN = 'n-lbaas-plugin'
L3_AGENT = 'l3_agent' L3_AGENT = 'l3_agent'
DHCP_AGENT = 'dhcp_agent' DHCP_AGENT = 'dhcp_agent'
METERING_AGENT = 'metering_agent' METERING_AGENT = 'metering_agent'
LOADBALANCER_AGENT = 'n-lbaas_agent'
def get_topic_name(prefix, table, operation, host=None): def get_topic_name(prefix, table, operation, host=None):

View File

@ -22,12 +22,10 @@ from oslo.config import cfg
from neutron.agent.common import config from neutron.agent.common import config
from neutron.agent.linux import interface from neutron.agent.linux import interface
from neutron.common import legacy from neutron.common import legacy
from neutron.common import topics
from neutron.openstack.common.rpc import service as rpc_service from neutron.openstack.common.rpc import service as rpc_service
from neutron.openstack.common import service from neutron.openstack.common import service
from neutron.services.loadbalancer.drivers.haproxy import ( from neutron.services.loadbalancer.agent import agent_manager as manager
agent_manager as manager,
plugin_driver
)
OPTS = [ OPTS = [
cfg.IntOpt( cfg.IntOpt(
@ -65,7 +63,7 @@ def main():
mgr = manager.LbaasAgentManager(cfg.CONF) mgr = manager.LbaasAgentManager(cfg.CONF)
svc = LbaasAgentService( svc = LbaasAgentService(
host=cfg.CONF.host, host=cfg.CONF.host,
topic=plugin_driver.TOPIC_LOADBALANCER_AGENT, topic=topics.LOADBALANCER_AGENT,
manager=mgr manager=mgr
) )
service.launch(svc).wait() service.launch(svc).wait()

View File

@ -21,16 +21,14 @@ from oslo.config import cfg
from neutron.agent import rpc as agent_rpc from neutron.agent import rpc as agent_rpc
from neutron.common import constants as n_const from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc from neutron.common import exceptions as n_exc
from neutron.common import topics
from neutron import context from neutron import context
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task from neutron.openstack.common import periodic_task
from neutron.plugins.common import constants from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers.haproxy import ( from neutron.services.loadbalancer.agent import agent_api
agent_api,
plugin_driver
)
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -67,7 +65,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
self.conf = conf self.conf = conf
self.context = context.get_admin_context_without_session() self.context = context.get_admin_context_without_session()
self.plugin_rpc = agent_api.LbaasAgentApi( self.plugin_rpc = agent_api.LbaasAgentApi(
plugin_driver.TOPIC_LOADBALANCER_PLUGIN, topics.LOADBALANCER_PLUGIN,
self.context, self.context,
self.conf.host self.conf.host
) )
@ -76,7 +74,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
self.agent_state = { self.agent_state = {
'binary': 'neutron-lbaas-agent', 'binary': 'neutron-lbaas-agent',
'host': conf.host, 'host': conf.host,
'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT, 'topic': topics.LOADBALANCER_AGENT,
'configurations': {'device_drivers': self.device_drivers.keys()}, 'configurations': {'device_drivers': self.device_drivers.keys()},
'agent_type': n_const.AGENT_TYPE_LOADBALANCER, 'agent_type': n_const.AGENT_TYPE_LOADBALANCER,
'start_flag': True} 'start_flag': True}
@ -109,7 +107,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
def _setup_state_rpc(self): def _setup_state_rpc(self):
self.state_rpc = agent_rpc.PluginReportStateAPI( self.state_rpc = agent_rpc.PluginReportStateAPI(
plugin_driver.TOPIC_LOADBALANCER_PLUGIN) topics.LOADBALANCER_PLUGIN)
report_interval = self.conf.AGENT.report_interval report_interval = self.conf.AGENT.report_interval
if report_interval: if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall( heartbeat = loopingcall.FixedIntervalLoopingCall(

View File

@ -0,0 +1,449 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Mark McClain, DreamHost
import uuid
from oslo.config import cfg
from neutron.common import constants as q_const
from neutron.common import exceptions as q_exc
from neutron.common import rpc as q_rpc
from neutron.common import topics
from neutron.db import agents_db
from neutron.db.loadbalancer import loadbalancer_db
from neutron.extensions import lbaas_agentscheduler
from neutron.extensions import portbindings
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import proxy
from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers import abstract_driver
LOG = logging.getLogger(__name__)
AGENT_SCHEDULER_OPTS = [
cfg.StrOpt('loadbalancer_pool_scheduler_driver',
default='neutron.services.loadbalancer.agent_scheduler'
'.ChanceScheduler',
help=_('Driver to use for scheduling '
'pool to a default loadbalancer agent')),
]
cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
class DriverNotSpecified(q_exc.NeutronException):
message = _("Device driver for agent should be specified "
"in plugin driver.")
class LoadBalancerCallbacks(object):
RPC_API_VERSION = '2.0'
# history
# 1.0 Initial version
# 2.0 Generic API for agent based drivers
# - get_logical_device() handling changed;
# - pool_deployed() and update_status() methods added;
def __init__(self, plugin):
self.plugin = plugin
def create_rpc_dispatcher(self):
return q_rpc.PluginRpcDispatcher(
[self, agents_db.AgentExtRpcCallback(self.plugin)])
def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True):
agents = self.plugin.get_lbaas_agents(context,
filters={'host': [host]})
if not agents:
return []
elif len(agents) > 1:
LOG.warning(_('Multiple lbaas agents found on host %s'), host)
pools = self.plugin.list_pools_on_lbaas_agent(context,
agents[0].id)
pool_ids = [pool['id'] for pool in pools['pools']]
qry = context.session.query(loadbalancer_db.Pool.id)
qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
qry = qry.filter(
loadbalancer_db.Pool.status.in_(constants.ACTIVE_PENDING))
up = True # makes pep8 and sqlalchemy happy
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
return [id for id, in qry]
def get_logical_device(self, context, pool_id=None):
with context.session.begin(subtransactions=True):
qry = context.session.query(loadbalancer_db.Pool)
qry = qry.filter_by(id=pool_id)
pool = qry.one()
if pool.status != constants.ACTIVE:
raise q_exc.Invalid(_('Expected active pool'))
retval = {}
retval['pool'] = self.plugin._make_pool_dict(pool)
if pool.vip:
retval['vip'] = self.plugin._make_vip_dict(pool.vip)
retval['vip']['port'] = (
self.plugin._core_plugin._make_port_dict(pool.vip.port)
)
for fixed_ip in retval['vip']['port']['fixed_ips']:
fixed_ip['subnet'] = (
self.plugin._core_plugin.get_subnet(
context,
fixed_ip['subnet_id']
)
)
retval['members'] = [
self.plugin._make_member_dict(m)
for m in pool.members if (
m.status in constants.ACTIVE_PENDING or
m.status == constants.INACTIVE)
]
retval['healthmonitors'] = [
self.plugin._make_health_monitor_dict(hm.healthmonitor)
for hm in pool.monitors
if hm.status in constants.ACTIVE_PENDING
]
retval['driver'] = (
self.plugin.drivers[pool.provider.provider_name].device_driver)
return retval
def pool_deployed(self, context, pool_id):
with context.session.begin(subtransactions=True):
qry = context.session.query(loadbalancer_db.Pool)
qry = qry.filter_by(id=pool_id)
pool = qry.one()
# set all resources to active
if pool.status in constants.ACTIVE_PENDING:
pool.status = constants.ACTIVE
if pool.vip and pool.vip.status in constants.ACTIVE_PENDING:
pool.vip.status = constants.ACTIVE
for m in pool.members:
if m.status in constants.ACTIVE_PENDING:
m.status = constants.ACTIVE
for hm in pool.monitors:
if hm.status in constants.ACTIVE_PENDING:
hm.status = constants.ACTIVE
def update_status(self, context, obj_type, obj_id, status):
model_mapping = {
'pool': loadbalancer_db.Pool,
'vip': loadbalancer_db.Vip,
'member': loadbalancer_db.Member,
'health_monitor': loadbalancer_db.PoolMonitorAssociation
}
if obj_type not in model_mapping:
raise q_exc.Invalid(_('Unknown object type: %s') % obj_type)
try:
if obj_type == 'health_monitor':
self.plugin.update_pool_health_monitor(
context, obj_id['monitor_id'], obj_id['pool_id'], status)
else:
self.plugin.update_status(
context, model_mapping[obj_type], obj_id, status)
except q_exc.NotFound:
# update_status may come from agent on an object which was
# already deleted from db with other request
LOG.warning(_('Cannot update status: %(obj_type)s %(obj_id)s '
'not found in the DB, it was probably deleted '
'concurrently'),
{'obj_type': obj_type, 'obj_id': obj_id})
def pool_destroyed(self, context, pool_id=None):
"""Agent confirmation hook that a pool has been destroyed.
This method exists for subclasses to change the deletion
behavior.
"""
pass
def plug_vip_port(self, context, port_id=None, host=None):
if not port_id:
return
try:
port = self.plugin._core_plugin.get_port(
context,
port_id
)
except q_exc.PortNotFound:
msg = _('Unable to find port %s to plug.')
LOG.debug(msg, port_id)
return
port['admin_state_up'] = True
port['device_owner'] = 'neutron:' + constants.LOADBALANCER
port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
port[portbindings.HOST_ID] = host
self.plugin._core_plugin.update_port(
context,
port_id,
{'port': port}
)
def unplug_vip_port(self, context, port_id=None, host=None):
if not port_id:
return
try:
port = self.plugin._core_plugin.get_port(
context,
port_id
)
except q_exc.PortNotFound:
msg = _('Unable to find port %s to unplug. This can occur when '
'the Vip has been deleted first.')
LOG.debug(msg, port_id)
return
port['admin_state_up'] = False
port['device_owner'] = ''
port['device_id'] = ''
try:
self.plugin._core_plugin.update_port(
context,
port_id,
{'port': port}
)
except q_exc.PortNotFound:
msg = _('Unable to find port %s to unplug. This can occur when '
'the Vip has been deleted first.')
LOG.debug(msg, port_id)
def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
self.plugin.update_pool_stats(context, pool_id, data=stats)
class LoadBalancerAgentApi(proxy.RpcProxy):
"""Plugin side of plugin to agent RPC API."""
BASE_RPC_API_VERSION = '2.0'
# history
# 1.0 Initial version
# 1.1 Support agent_updated call
# 2.0 Generic API for agent based drivers
# - modify/reload/destroy_pool methods were removed;
# - added methods to handle create/update/delete for every lbaas
# object individually;
def __init__(self, topic):
super(LoadBalancerAgentApi, self).__init__(
topic, default_version=self.BASE_RPC_API_VERSION)
def _cast(self, context, method_name, method_args, host, version=None):
return self.cast(
context,
self.make_msg(method_name, **method_args),
topic='%s.%s' % (self.topic, host),
version=version
)
def create_vip(self, context, vip, host):
return self._cast(context, 'create_vip', {'vip': vip}, host)
def update_vip(self, context, old_vip, vip, host):
return self._cast(context, 'update_vip',
{'old_vip': old_vip, 'vip': vip}, host)
def delete_vip(self, context, vip, host):
return self._cast(context, 'delete_vip', {'vip': vip}, host)
def create_pool(self, context, pool, host, driver_name):
return self._cast(context, 'create_pool',
{'pool': pool, 'driver_name': driver_name}, host)
def update_pool(self, context, old_pool, pool, host):
return self._cast(context, 'update_pool',
{'old_pool': old_pool, 'pool': pool}, host)
def delete_pool(self, context, pool, host):
return self._cast(context, 'delete_pool', {'pool': pool}, host)
def create_member(self, context, member, host):
return self._cast(context, 'create_member', {'member': member}, host)
def update_member(self, context, old_member, member, host):
return self._cast(context, 'update_member',
{'old_member': old_member, 'member': member}, host)
def delete_member(self, context, member, host):
return self._cast(context, 'delete_member', {'member': member}, host)
def create_pool_health_monitor(self, context, health_monitor, pool_id,
host):
return self._cast(context, 'create_pool_health_monitor',
{'health_monitor': health_monitor,
'pool_id': pool_id}, host)
def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id, host):
return self._cast(context, 'update_pool_health_monitor',
{'old_health_monitor': old_health_monitor,
'health_monitor': health_monitor,
'pool_id': pool_id}, host)
def delete_pool_health_monitor(self, context, health_monitor, pool_id,
host):
return self._cast(context, 'delete_pool_health_monitor',
{'health_monitor': health_monitor,
'pool_id': pool_id}, host)
def agent_updated(self, context, admin_state_up, host):
return self._cast(context, 'agent_updated',
{'payload': {'admin_state_up': admin_state_up}},
host)
class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
# name of device driver that should be used by the agent;
# vendor specific plugin drivers must override it;
device_driver = None
def __init__(self, plugin):
if not self.device_driver:
raise DriverNotSpecified()
self.agent_rpc = LoadBalancerAgentApi(topics.LOADBALANCER_AGENT)
self.plugin = plugin
self._set_callbacks_on_plugin()
self.plugin.agent_notifiers.update(
{q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})
self.pool_scheduler = importutils.import_object(
cfg.CONF.loadbalancer_pool_scheduler_driver)
def _set_callbacks_on_plugin(self):
# other agent based plugin driver might already set callbacks on plugin
if hasattr(self.plugin, 'agent_callbacks'):
return
self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
self.plugin.conn = rpc.create_connection(new=True)
self.plugin.conn.create_consumer(
topics.LOADBALANCER_PLUGIN,
self.plugin.agent_callbacks.create_rpc_dispatcher(),
fanout=False)
self.plugin.conn.consume_in_thread()
def get_pool_agent(self, context, pool_id):
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
if not agent:
raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id)
return agent['agent']
def create_vip(self, context, vip):
agent = self.get_pool_agent(context, vip['pool_id'])
self.agent_rpc.create_vip(context, vip, agent['host'])
def update_vip(self, context, old_vip, vip):
agent = self.get_pool_agent(context, vip['pool_id'])
if vip['status'] in constants.ACTIVE_PENDING:
self.agent_rpc.update_vip(context, old_vip, vip, agent['host'])
else:
self.agent_rpc.delete_vip(context, vip, agent['host'])
def delete_vip(self, context, vip):
self.plugin._delete_db_vip(context, vip['id'])
agent = self.get_pool_agent(context, vip['pool_id'])
self.agent_rpc.delete_vip(context, vip, agent['host'])
def create_pool(self, context, pool):
agent = self.pool_scheduler.schedule(self.plugin, context, pool,
self.device_driver)
if not agent:
raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
self.agent_rpc.create_pool(context, pool, agent['host'],
self.device_driver)
def update_pool(self, context, old_pool, pool):
agent = self.get_pool_agent(context, pool['id'])
if pool['status'] in constants.ACTIVE_PENDING:
self.agent_rpc.update_pool(context, old_pool, pool,
agent['host'])
else:
self.agent_rpc.delete_pool(context, pool, agent['host'])
def delete_pool(self, context, pool):
# get agent first to know host as binding will be deleted
# after pool is deleted from db
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
self.plugin._delete_db_pool(context, pool['id'])
if agent:
self.agent_rpc.delete_pool(context, pool, agent['agent']['host'])
def create_member(self, context, member):
agent = self.get_pool_agent(context, member['pool_id'])
self.agent_rpc.create_member(context, member, agent['host'])
def update_member(self, context, old_member, member):
agent = self.get_pool_agent(context, member['pool_id'])
# member may change pool id
if member['pool_id'] != old_member['pool_id']:
old_pool_agent = self.plugin.get_lbaas_agent_hosting_pool(
context, old_member['pool_id'])
if old_pool_agent:
self.agent_rpc.delete_member(context, old_member,
old_pool_agent['agent']['host'])
self.agent_rpc.create_member(context, member, agent['host'])
else:
self.agent_rpc.update_member(context, old_member, member,
agent['host'])
def delete_member(self, context, member):
self.plugin._delete_db_member(context, member['id'])
agent = self.get_pool_agent(context, member['pool_id'])
self.agent_rpc.delete_member(context, member, agent['host'])
def create_pool_health_monitor(self, context, healthmon, pool_id):
# healthmon is not used here
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.create_pool_health_monitor(context, healthmon,
pool_id, agent['host'])
def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id):
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.update_pool_health_monitor(context, old_health_monitor,
health_monitor, pool_id,
agent['host'])
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
self.plugin._delete_db_pool_health_monitor(
context, health_monitor['id'], pool_id
)
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.delete_pool_health_monitor(context, health_monitor,
pool_id, agent['host'])
def stats(self, context, pool_id):
pass

View File

@ -30,8 +30,8 @@ from neutron.common import utils as n_utils
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.plugins.common import constants from neutron.plugins.common import constants
from neutron.services.loadbalancer.agent import agent_device_driver
from neutron.services.loadbalancer import constants as lb_const from neutron.services.loadbalancer import constants as lb_const
from neutron.services.loadbalancer.drivers import agent_device_driver
from neutron.services.loadbalancer.drivers.haproxy import cfg as hacfg from neutron.services.loadbalancer.drivers.haproxy import cfg as hacfg
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)

View File

@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4 # vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 New Dream Network, LLC (DreamHost) # Copyright (c) 2013 OpenStack Foundation.
# All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -13,447 +14,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
#
# @author: Mark McClain, DreamHost
import uuid from neutron.services.loadbalancer.drivers.common import agent_driver_base
from neutron.services.loadbalancer.drivers.haproxy import namespace_driver
from oslo.config import cfg
from neutron.common import constants as q_const class HaproxyOnHostPluginDriver(agent_driver_base.AgentDriverBase):
from neutron.common import exceptions as q_exc device_driver = namespace_driver.DRIVER_NAME
from neutron.common import rpc as q_rpc
from neutron.db import agents_db
from neutron.db.loadbalancer import loadbalancer_db
from neutron.extensions import lbaas_agentscheduler
from neutron.extensions import portbindings
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import proxy
from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers import abstract_driver
LOG = logging.getLogger(__name__)
AGENT_SCHEDULER_OPTS = [
cfg.StrOpt('loadbalancer_pool_scheduler_driver',
default='neutron.services.loadbalancer.agent_scheduler'
'.ChanceScheduler',
help=_('Driver to use for scheduling '
'pool to a default loadbalancer agent')),
]
cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
# topic name for this particular agent implementation
TOPIC_LOADBALANCER_PLUGIN = 'n-lbaas-plugin'
TOPIC_LOADBALANCER_AGENT = 'n-lbaas_agent'
class DriverNotSpecified(q_exc.NeutronException):
message = _("Device driver for agent should be specified "
"in plugin driver.")
class LoadBalancerCallbacks(object):
RPC_API_VERSION = '2.0'
# history
# 1.0 Initial version
# 2.0 Generic API for agent based drivers
# - get_logical_device() handling changed;
# - pool_deployed() and update_status() methods added;
def __init__(self, plugin):
self.plugin = plugin
def create_rpc_dispatcher(self):
return q_rpc.PluginRpcDispatcher(
[self, agents_db.AgentExtRpcCallback(self.plugin)])
def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True):
agents = self.plugin.get_lbaas_agents(context,
filters={'host': [host]})
if not agents:
return []
elif len(agents) > 1:
LOG.warning(_('Multiple lbaas agents found on host %s'), host)
pools = self.plugin.list_pools_on_lbaas_agent(context,
agents[0].id)
pool_ids = [pool['id'] for pool in pools['pools']]
qry = context.session.query(loadbalancer_db.Pool.id)
qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
qry = qry.filter(
loadbalancer_db.Pool.status.in_(constants.ACTIVE_PENDING))
up = True # makes pep8 and sqlalchemy happy
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
return [id for id, in qry]
def get_logical_device(self, context, pool_id=None):
with context.session.begin(subtransactions=True):
qry = context.session.query(loadbalancer_db.Pool)
qry = qry.filter_by(id=pool_id)
pool = qry.one()
if pool.status != constants.ACTIVE:
raise q_exc.Invalid(_('Expected active pool'))
retval = {}
retval['pool'] = self.plugin._make_pool_dict(pool)
if pool.vip:
retval['vip'] = self.plugin._make_vip_dict(pool.vip)
retval['vip']['port'] = (
self.plugin._core_plugin._make_port_dict(pool.vip.port)
)
for fixed_ip in retval['vip']['port']['fixed_ips']:
fixed_ip['subnet'] = (
self.plugin._core_plugin.get_subnet(
context,
fixed_ip['subnet_id']
)
)
retval['members'] = [
self.plugin._make_member_dict(m)
for m in pool.members if (
m.status in constants.ACTIVE_PENDING or
m.status == constants.INACTIVE)
]
retval['healthmonitors'] = [
self.plugin._make_health_monitor_dict(hm.healthmonitor)
for hm in pool.monitors
if hm.status in constants.ACTIVE_PENDING
]
retval['driver'] = (
self.plugin.drivers[pool.provider.provider_name].device_driver)
return retval
def pool_deployed(self, context, pool_id):
with context.session.begin(subtransactions=True):
qry = context.session.query(loadbalancer_db.Pool)
qry = qry.filter_by(id=pool_id)
pool = qry.one()
# set all resources to active
if pool.status in constants.ACTIVE_PENDING:
pool.status = constants.ACTIVE
if pool.vip and pool.vip.status in constants.ACTIVE_PENDING:
pool.vip.status = constants.ACTIVE
for m in pool.members:
if m.status in constants.ACTIVE_PENDING:
m.status = constants.ACTIVE
for hm in pool.monitors:
if hm.status in constants.ACTIVE_PENDING:
hm.status = constants.ACTIVE
def update_status(self, context, obj_type, obj_id, status):
model_mapping = {
'pool': loadbalancer_db.Pool,
'vip': loadbalancer_db.Vip,
'member': loadbalancer_db.Member,
'health_monitor': loadbalancer_db.PoolMonitorAssociation
}
if obj_type not in model_mapping:
raise q_exc.Invalid(_('Unknown object type: %s') % obj_type)
try:
if obj_type == 'health_monitor':
self.plugin.update_pool_health_monitor(
context, obj_id['monitor_id'], obj_id['pool_id'], status)
else:
self.plugin.update_status(
context, model_mapping[obj_type], obj_id, status)
except q_exc.NotFound:
# update_status may come from agent on an object which was
# already deleted from db with other request
LOG.warning(_('Cannot update status: %(obj_type)s %(obj_id)s '
'not found in the DB, it was probably deleted '
'concurrently'),
{'obj_type': obj_type, 'obj_id': obj_id})
def pool_destroyed(self, context, pool_id=None):
"""Agent confirmation hook that a pool has been destroyed.
This method exists for subclasses to change the deletion
behavior.
"""
pass
def plug_vip_port(self, context, port_id=None, host=None):
if not port_id:
return
try:
port = self.plugin._core_plugin.get_port(
context,
port_id
)
except q_exc.PortNotFound:
msg = _('Unable to find port %s to plug.')
LOG.debug(msg, port_id)
return
port['admin_state_up'] = True
port['device_owner'] = 'neutron:' + constants.LOADBALANCER
port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
port[portbindings.HOST_ID] = host
self.plugin._core_plugin.update_port(
context,
port_id,
{'port': port}
)
def unplug_vip_port(self, context, port_id=None, host=None):
if not port_id:
return
try:
port = self.plugin._core_plugin.get_port(
context,
port_id
)
except q_exc.PortNotFound:
msg = _('Unable to find port %s to unplug. This can occur when '
'the Vip has been deleted first.')
LOG.debug(msg, port_id)
return
port['admin_state_up'] = False
port['device_owner'] = ''
port['device_id'] = ''
try:
self.plugin._core_plugin.update_port(
context,
port_id,
{'port': port}
)
except q_exc.PortNotFound:
msg = _('Unable to find port %s to unplug. This can occur when '
'the Vip has been deleted first.')
LOG.debug(msg, port_id)
def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
self.plugin.update_pool_stats(context, pool_id, data=stats)
class LoadBalancerAgentApi(proxy.RpcProxy):
"""Plugin side of plugin to agent RPC API."""
BASE_RPC_API_VERSION = '2.0'
# history
# 1.0 Initial version
# 1.1 Support agent_updated call
# 2.0 Generic API for agent based drivers
# - modify/reload/destroy_pool methods were removed;
# - added methods to handle create/update/delete for every lbaas
# object individually;
def __init__(self, topic):
super(LoadBalancerAgentApi, self).__init__(
topic, default_version=self.BASE_RPC_API_VERSION)
def _cast(self, context, method_name, method_args, host, version=None):
return self.cast(
context,
self.make_msg(method_name, **method_args),
topic='%s.%s' % (self.topic, host),
version=version
)
def create_vip(self, context, vip, host):
return self._cast(context, 'create_vip', {'vip': vip}, host)
def update_vip(self, context, old_vip, vip, host):
return self._cast(context, 'update_vip',
{'old_vip': old_vip, 'vip': vip}, host)
def delete_vip(self, context, vip, host):
return self._cast(context, 'delete_vip', {'vip': vip}, host)
def create_pool(self, context, pool, host, driver_name):
return self._cast(context, 'create_pool',
{'pool': pool, 'driver_name': driver_name}, host)
def update_pool(self, context, old_pool, pool, host):
return self._cast(context, 'update_pool',
{'old_pool': old_pool, 'pool': pool}, host)
def delete_pool(self, context, pool, host):
return self._cast(context, 'delete_pool', {'pool': pool}, host)
def create_member(self, context, member, host):
return self._cast(context, 'create_member', {'member': member}, host)
def update_member(self, context, old_member, member, host):
return self._cast(context, 'update_member',
{'old_member': old_member, 'member': member}, host)
def delete_member(self, context, member, host):
return self._cast(context, 'delete_member', {'member': member}, host)
def create_pool_health_monitor(self, context, health_monitor, pool_id,
host):
return self._cast(context, 'create_pool_health_monitor',
{'health_monitor': health_monitor,
'pool_id': pool_id}, host)
def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id, host):
return self._cast(context, 'update_pool_health_monitor',
{'old_health_monitor': old_health_monitor,
'health_monitor': health_monitor,
'pool_id': pool_id}, host)
def delete_pool_health_monitor(self, context, health_monitor, pool_id,
host):
return self._cast(context, 'delete_pool_health_monitor',
{'health_monitor': health_monitor,
'pool_id': pool_id}, host)
def agent_updated(self, context, admin_state_up, host):
return self._cast(context, 'agent_updated',
{'payload': {'admin_state_up': admin_state_up}},
host)
class AgentBasedPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
# name of device driver that should be used by the agent;
# vendor specific plugin drivers must override it;
device_driver = None
def __init__(self, plugin):
if not self.device_driver:
raise DriverNotSpecified()
self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT)
self.plugin = plugin
self._set_callbacks_on_plugin()
self.plugin.agent_notifiers.update(
{q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})
self.pool_scheduler = importutils.import_object(
cfg.CONF.loadbalancer_pool_scheduler_driver)
def _set_callbacks_on_plugin(self):
# other agent based plugin driver might already set callbacks on plugin
if hasattr(self.plugin, 'agent_callbacks'):
return
self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
self.plugin.conn = rpc.create_connection(new=True)
self.plugin.conn.create_consumer(
TOPIC_LOADBALANCER_PLUGIN,
self.plugin.agent_callbacks.create_rpc_dispatcher(),
fanout=False)
self.plugin.conn.consume_in_thread()
def get_pool_agent(self, context, pool_id):
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
if not agent:
raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id)
return agent['agent']
def create_vip(self, context, vip):
agent = self.get_pool_agent(context, vip['pool_id'])
self.agent_rpc.create_vip(context, vip, agent['host'])
def update_vip(self, context, old_vip, vip):
agent = self.get_pool_agent(context, vip['pool_id'])
if vip['status'] in constants.ACTIVE_PENDING:
self.agent_rpc.update_vip(context, old_vip, vip, agent['host'])
else:
self.agent_rpc.delete_vip(context, vip, agent['host'])
def delete_vip(self, context, vip):
self.plugin._delete_db_vip(context, vip['id'])
agent = self.get_pool_agent(context, vip['pool_id'])
self.agent_rpc.delete_vip(context, vip, agent['host'])
def create_pool(self, context, pool):
agent = self.pool_scheduler.schedule(self.plugin, context, pool,
self.device_driver)
if not agent:
raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
self.agent_rpc.create_pool(context, pool, agent['host'],
self.device_driver)
def update_pool(self, context, old_pool, pool):
agent = self.get_pool_agent(context, pool['id'])
if pool['status'] in constants.ACTIVE_PENDING:
self.agent_rpc.update_pool(context, old_pool, pool,
agent['host'])
else:
self.agent_rpc.delete_pool(context, pool, agent['host'])
def delete_pool(self, context, pool):
# get agent first to know host as binding will be deleted
# after pool is deleted from db
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
self.plugin._delete_db_pool(context, pool['id'])
if agent:
self.agent_rpc.delete_pool(context, pool, agent['agent']['host'])
def create_member(self, context, member):
agent = self.get_pool_agent(context, member['pool_id'])
self.agent_rpc.create_member(context, member, agent['host'])
def update_member(self, context, old_member, member):
agent = self.get_pool_agent(context, member['pool_id'])
# member may change pool id
if member['pool_id'] != old_member['pool_id']:
old_pool_agent = self.plugin.get_lbaas_agent_hosting_pool(
context, old_member['pool_id'])
if old_pool_agent:
self.agent_rpc.delete_member(context, old_member,
old_pool_agent['agent']['host'])
self.agent_rpc.create_member(context, member, agent['host'])
else:
self.agent_rpc.update_member(context, old_member, member,
agent['host'])
def delete_member(self, context, member):
self.plugin._delete_db_member(context, member['id'])
agent = self.get_pool_agent(context, member['pool_id'])
self.agent_rpc.delete_member(context, member, agent['host'])
def create_pool_health_monitor(self, context, healthmon, pool_id):
# healthmon is not used here
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.create_pool_health_monitor(context, healthmon,
pool_id, agent['host'])
def update_pool_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id):
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.update_pool_health_monitor(context, old_health_monitor,
health_monitor, pool_id,
agent['host'])
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
self.plugin._delete_db_pool_health_monitor(
context, health_monitor['id'], pool_id
)
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.delete_pool_health_monitor(context, health_monitor,
pool_id, agent['host'])
def stats(self, context, pool_id):
pass
class HaproxyOnHostPluginDriver(AgentBasedPluginDriver):
#TODO(obondarev): change hardcoded driver name
# to namespace_driver.DRIVER_NAME after moving HaproxyOnHostPluginDriver
# to a separate file (follow-up patch)
device_driver = 'haproxy_ns'

View File

@ -20,7 +20,7 @@ import contextlib
import mock import mock
from oslo.config import cfg from oslo.config import cfg
from neutron.services.loadbalancer.drivers.haproxy import agent from neutron.services.loadbalancer.agent import agent
from neutron.tests import base from neutron.tests import base

View File

@ -21,9 +21,7 @@ import contextlib
import mock import mock
from neutron.plugins.common import constants from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers.haproxy import ( from neutron.services.loadbalancer.agent import agent_manager as manager
agent_manager as manager
)
from neutron.tests import base from neutron.tests import base
@ -38,8 +36,7 @@ class TestManager(base.BaseTestCase):
self.mock_importer = mock.patch.object(manager, 'importutils').start() self.mock_importer = mock.patch.object(manager, 'importutils').start()
rpc_mock_cls = mock.patch( rpc_mock_cls = mock.patch(
'neutron.services.loadbalancer.drivers' 'neutron.services.loadbalancer.agent.agent_api.LbaasAgentApi'
'.haproxy.agent_api.LbaasAgentApi'
).start() ).start()
self.mgr = manager.LbaasAgentManager(mock_conf) self.mgr = manager.LbaasAgentManager(mock_conf)

View File

@ -18,9 +18,7 @@
import mock import mock
from neutron.services.loadbalancer.drivers.haproxy import ( from neutron.services.loadbalancer.agent import agent_api as api
agent_api as api
)
from neutron.tests import base from neutron.tests import base

View File

@ -30,9 +30,7 @@ from neutron.extensions import portbindings
from neutron import manager from neutron import manager
from neutron.openstack.common import uuidutils from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers.haproxy import ( from neutron.services.loadbalancer.drivers.common import agent_driver_base
plugin_driver
)
from neutron.tests import base from neutron.tests import base
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
from neutron.tests.unit import testlib_api from neutron.tests.unit import testlib_api
@ -43,20 +41,20 @@ class TestLoadBalancerPluginBase(
def setUp(self): def setUp(self):
def reset_device_driver(): def reset_device_driver():
plugin_driver.AgentBasedPluginDriver.device_driver = None agent_driver_base.AgentDriverBase.device_driver = None
self.addCleanup(reset_device_driver) self.addCleanup(reset_device_driver)
self.mock_importer = mock.patch.object( self.mock_importer = mock.patch.object(
plugin_driver, 'importutils').start() agent_driver_base, 'importutils').start()
self.addCleanup(mock.patch.stopall) self.addCleanup(mock.patch.stopall)
# needed to reload provider configuration # needed to reload provider configuration
st_db.ServiceTypeManager._instance = None st_db.ServiceTypeManager._instance = None
plugin_driver.AgentBasedPluginDriver.device_driver = 'dummy' agent_driver_base.AgentDriverBase.device_driver = 'dummy'
super(TestLoadBalancerPluginBase, self).setUp( super(TestLoadBalancerPluginBase, self).setUp(
lbaas_provider=('LOADBALANCER:lbaas:neutron.services.' lbaas_provider=('LOADBALANCER:lbaas:neutron.services.'
'loadbalancer.drivers.haproxy.plugin_driver.' 'loadbalancer.drivers.common.agent_driver_base.'
'AgentBasedPluginDriver:default')) 'AgentDriverBase:default'))
# we need access to loaded plugins to modify models # we need access to loaded plugins to modify models
loaded_plugins = manager.NeutronManager().get_service_plugins() loaded_plugins = manager.NeutronManager().get_service_plugins()
@ -68,7 +66,7 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
def setUp(self): def setUp(self):
super(TestLoadBalancerCallbacks, self).setUp() super(TestLoadBalancerCallbacks, self).setUp()
self.callbacks = plugin_driver.LoadBalancerCallbacks( self.callbacks = agent_driver_base.LoadBalancerCallbacks(
self.plugin_instance self.plugin_instance
) )
get_lbaas_agents_patcher = mock.patch( get_lbaas_agents_patcher = mock.patch(
@ -400,7 +398,7 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
self.assertEqual('ACTIVE', p['status']) self.assertEqual('ACTIVE', p['status'])
def test_update_status_pool_deleted_already(self): def test_update_status_pool_deleted_already(self):
with mock.patch.object(plugin_driver, 'LOG') as mock_log: with mock.patch.object(agent_driver_base, 'LOG') as mock_log:
pool_id = 'deleted_pool' pool_id = 'deleted_pool'
ctx = context.get_admin_context() ctx = context.get_admin_context()
self.assertRaises(loadbalancer.PoolNotFound, self.assertRaises(loadbalancer.PoolNotFound,
@ -433,7 +431,7 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
super(TestLoadBalancerAgentApi, self).setUp() super(TestLoadBalancerAgentApi, self).setUp()
self.addCleanup(mock.patch.stopall) self.addCleanup(mock.patch.stopall)
self.api = plugin_driver.LoadBalancerAgentApi('topic') self.api = agent_driver_base.LoadBalancerAgentApi('topic')
self.mock_cast = mock.patch.object(self.api, 'cast').start() self.mock_cast = mock.patch.object(self.api, 'cast').start()
self.mock_msg = mock.patch.object(self.api, 'make_msg').start() self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
@ -510,16 +508,16 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
def setUp(self): def setUp(self):
self.log = mock.patch.object(plugin_driver, 'LOG') self.log = mock.patch.object(agent_driver_base, 'LOG')
api_cls = mock.patch.object(plugin_driver, api_cls = mock.patch.object(agent_driver_base,
'LoadBalancerAgentApi').start() 'LoadBalancerAgentApi').start()
super(TestLoadBalancerPluginNotificationWrapper, self).setUp() super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
self.mock_api = api_cls.return_value self.mock_api = api_cls.return_value
self.mock_get_driver = mock.patch.object(self.plugin_instance, self.mock_get_driver = mock.patch.object(self.plugin_instance,
'_get_driver') '_get_driver')
self.mock_get_driver.return_value = (plugin_driver. self.mock_get_driver.return_value = (agent_driver_base.
AgentBasedPluginDriver( AgentDriverBase(
self.plugin_instance self.plugin_instance
)) ))

View File

@ -85,7 +85,7 @@ console_scripts =
neutron-dhcp-agent = neutron.agent.dhcp_agent:main neutron-dhcp-agent = neutron.agent.dhcp_agent:main
neutron-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main neutron-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main
neutron-l3-agent = neutron.agent.l3_agent:main neutron-l3-agent = neutron.agent.l3_agent:main
neutron-lbaas-agent = neutron.services.loadbalancer.drivers.haproxy.agent:main neutron-lbaas-agent = neutron.services.loadbalancer.agent.agent:main
neutron-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main neutron-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main
neutron-metadata-agent = neutron.agent.metadata.agent:main neutron-metadata-agent = neutron.agent.metadata.agent:main
neutron-mlnx-agent = neutron.plugins.mlnx.agent.eswitch_neutron_agent:main neutron-mlnx-agent = neutron.plugins.mlnx.agent.eswitch_neutron_agent:main
@ -105,7 +105,7 @@ console_scripts =
quantum-dhcp-agent = neutron.agent.dhcp_agent:main quantum-dhcp-agent = neutron.agent.dhcp_agent:main
quantum-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main quantum-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main
quantum-l3-agent = neutron.agent.l3_agent:main quantum-l3-agent = neutron.agent.l3_agent:main
quantum-lbaas-agent = neutron.services.loadbalancer.drivers.haproxy.agent:main quantum-lbaas-agent = neutron.services.loadbalancer.agent.agent:main
quantum-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main quantum-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main
quantum-metadata-agent = neutron.agent.metadata.agent:main quantum-metadata-agent = neutron.agent.metadata.agent:main
quantum-mlnx-agent = neutron.plugins.mlnx.agent.eswitch_neutron_agent:main quantum-mlnx-agent = neutron.plugins.mlnx.agent.eswitch_neutron_agent:main