Merge "Add agent scheduling for LBaaS namespace agent"
This commit is contained in:
commit
5294b8a90c
@ -220,6 +220,8 @@ notification_topics = notifications
|
||||
# network_scheduler_driver = neutron.scheduler.dhcp_agent_scheduler.ChanceScheduler
|
||||
# Driver to use for scheduling router to a default L3 agent
|
||||
# router_scheduler_driver = neutron.scheduler.l3_agent_scheduler.ChanceScheduler
|
||||
# Driver to use for scheduling a loadbalancer pool to an lbaas agent
|
||||
# loadbalancer_pool_scheduler_driver = neutron.services.loadbalancer.agent_scheduler.ChanceScheduler
|
||||
|
||||
# Allow auto scheduling networks to DHCP agent. It will schedule non-hosted
|
||||
# networks to first DHCP agent which sends get_active_networks message to
|
||||
|
@ -79,6 +79,8 @@
|
||||
"get_l3-routers": "rule:admin_only",
|
||||
"get_dhcp-agents": "rule:admin_only",
|
||||
"get_l3-agents": "rule:admin_only",
|
||||
"get_loadbalancer-agent": "rule:admin_only",
|
||||
"get_loadbalancer-pools": "rule:admin_only",
|
||||
|
||||
"create_router": "rule:regular_user",
|
||||
"get_router": "rule:admin_or_owner",
|
||||
|
@ -67,6 +67,7 @@ AGENT_TYPE_OVS = 'Open vSwitch agent'
|
||||
AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent'
|
||||
AGENT_TYPE_NEC = 'NEC plugin agent'
|
||||
AGENT_TYPE_L3 = 'L3 agent'
|
||||
AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent'
|
||||
L2_AGENT_TOPIC = 'N/A'
|
||||
|
||||
PAGINATION_INFINITE = 'infinite'
|
||||
@ -76,3 +77,4 @@ SORT_DIRECTION_DESC = 'desc'
|
||||
|
||||
L3_AGENT_SCHEDULER_EXT_ALIAS = 'l3_agent_scheduler'
|
||||
DHCP_AGENT_SCHEDULER_EXT_ALIAS = 'dhcp_agent_scheduler'
|
||||
LBAAS_AGENT_SCHEDULER_EXT_ALIAS = 'lbaas_agent_scheduler'
|
||||
|
@ -162,6 +162,9 @@ class AgentExtRpcCallback(object):
|
||||
RPC_API_VERSION = '1.0'
|
||||
START_TIME = timeutils.utcnow()
|
||||
|
||||
def __init__(self, plugin=None):
|
||||
self.plugin = plugin
|
||||
|
||||
def report_state(self, context, **kwargs):
|
||||
"""Report state from agent to server."""
|
||||
time = kwargs['time']
|
||||
@ -170,5 +173,6 @@ class AgentExtRpcCallback(object):
|
||||
LOG.debug(_("Message with invalid timestamp received"))
|
||||
return
|
||||
agent_state = kwargs['agent_state']['agent_state']
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
plugin.create_or_update_agent(context, agent_state)
|
||||
if not self.plugin:
|
||||
self.plugin = manager.NeutronManager.get_plugin()
|
||||
self.plugin.create_or_update_agent(context, agent_state)
|
||||
|
@ -79,8 +79,13 @@ class RouterL3AgentBinding(model_base.BASEV2, models_v2.HasId):
|
||||
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
|
||||
"""Common class for agent scheduler mixins."""
|
||||
|
||||
dhcp_agent_notifier = None
|
||||
l3_agent_notifier = None
|
||||
# agent notifiers to handle agent update operations;
|
||||
# should be updated by plugins;
|
||||
agent_notifiers = {
|
||||
constants.AGENT_TYPE_DHCP: None,
|
||||
constants.AGENT_TYPE_L3: None,
|
||||
constants.AGENT_TYPE_LOADBALANCER: None,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def is_eligible_agent(active, agent):
|
||||
@ -100,18 +105,13 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
|
||||
result = super(AgentSchedulerDbMixin, self).update_agent(
|
||||
context, id, agent)
|
||||
agent_data = agent['agent']
|
||||
if ('admin_state_up' in agent_data and
|
||||
agent_notifier = self.agent_notifiers.get(original_agent['agent_type'])
|
||||
if (agent_notifier and
|
||||
'admin_state_up' in agent_data and
|
||||
original_agent['admin_state_up'] != agent_data['admin_state_up']):
|
||||
if (original_agent['agent_type'] == constants.AGENT_TYPE_DHCP and
|
||||
self.dhcp_agent_notifier):
|
||||
self.dhcp_agent_notifier.agent_updated(
|
||||
context, agent_data['admin_state_up'],
|
||||
original_agent['host'])
|
||||
elif (original_agent['agent_type'] == constants.AGENT_TYPE_L3 and
|
||||
self.l3_agent_notifier):
|
||||
self.l3_agent_notifier.agent_updated(
|
||||
context, agent_data['admin_state_up'],
|
||||
original_agent['host'])
|
||||
agent_notifier.agent_updated(context,
|
||||
agent_data['admin_state_up'],
|
||||
original_agent['host'])
|
||||
return result
|
||||
|
||||
|
||||
@ -148,8 +148,9 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
||||
raise l3agentscheduler.RouterSchedulingFailed(
|
||||
router_id=router_id, agent_id=id)
|
||||
|
||||
if self.l3_agent_notifier:
|
||||
self.l3_agent_notifier.router_added_to_agent(
|
||||
l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
|
||||
if l3_notifier:
|
||||
l3_notifier.router_added_to_agent(
|
||||
context, [router_id], agent_db.host)
|
||||
|
||||
def remove_router_from_l3_agent(self, context, id, router_id):
|
||||
@ -170,8 +171,9 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
||||
raise l3agentscheduler.RouterNotHostedByL3Agent(
|
||||
router_id=router_id, agent_id=id)
|
||||
context.session.delete(binding)
|
||||
if self.l3_agent_notifier:
|
||||
self.l3_agent_notifier.router_removed_from_agent(
|
||||
l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
|
||||
if l3_notifier:
|
||||
l3_notifier.router_removed_from_agent(
|
||||
context, router_id, agent.host)
|
||||
|
||||
def list_routers_on_l3_agent(self, context, id):
|
||||
@ -356,8 +358,9 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
|
||||
binding.dhcp_agent_id = id
|
||||
binding.network_id = network_id
|
||||
context.session.add(binding)
|
||||
if self.dhcp_agent_notifier:
|
||||
self.dhcp_agent_notifier.network_added_to_agent(
|
||||
dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
|
||||
if dhcp_notifier:
|
||||
dhcp_notifier.network_added_to_agent(
|
||||
context, network_id, agent_db.host)
|
||||
|
||||
def remove_network_from_dhcp_agent(self, context, id, network_id):
|
||||
@ -372,8 +375,9 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
|
||||
raise dhcpagentscheduler.NetworkNotHostedByDhcpAgent(
|
||||
network_id=network_id, agent_id=id)
|
||||
context.session.delete(binding)
|
||||
if self.dhcp_agent_notifier:
|
||||
self.dhcp_agent_notifier.network_removed_from_agent(
|
||||
dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
|
||||
if dhcp_notifier:
|
||||
dhcp_notifier.network_removed_from_agent(
|
||||
context, network_id, agent.host)
|
||||
|
||||
def list_networks_on_dhcp_agent(self, context, id):
|
||||
|
@ -0,0 +1,53 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
#
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
"""LBaaS Pool scheduler
|
||||
|
||||
Revision ID: 52c5e4a18807
|
||||
Revises: 2032abe8edac
|
||||
Create Date: 2013-06-14 03:23:47.815865
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '52c5e4a18807'
|
||||
down_revision = '2032abe8edac'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade(active_plugin=None, options=None):
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table(
|
||||
'poolloadbalanceragentbindings',
|
||||
sa.Column('pool_id', sa.String(length=36), nullable=False),
|
||||
sa.Column('loadbalancer_agent_id', sa.String(length=36),
|
||||
nullable=False),
|
||||
sa.ForeignKeyConstraint(['loadbalancer_agent_id'], ['agents.id'],
|
||||
ondelete='CASCADE'),
|
||||
sa.ForeignKeyConstraint(['pool_id'], ['pools.id'],
|
||||
ondelete='CASCADE'),
|
||||
sa.PrimaryKeyConstraint('pool_id')
|
||||
)
|
||||
### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade(active_plugin=None, options=None):
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_table('poolloadbalanceragentbindings')
|
||||
### end Alembic commands ###
|
138
neutron/extensions/lbaas_agentscheduler.py
Normal file
138
neutron/extensions/lbaas_agentscheduler.py
Normal file
@ -0,0 +1,138 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2013 OpenStack Foundation.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from abc import abstractmethod
|
||||
|
||||
from neutron.api import extensions
|
||||
from neutron.api.v2 import base
|
||||
from neutron.api.v2 import resource
|
||||
from neutron.common import constants
|
||||
from neutron.extensions import agent
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import constants as plugin_const
|
||||
from neutron import policy
|
||||
from neutron import wsgi
|
||||
|
||||
LOADBALANCER_POOL = 'loadbalancer-pool'
|
||||
LOADBALANCER_POOLS = LOADBALANCER_POOL + 's'
|
||||
LOADBALANCER_AGENT = 'loadbalancer-agent'
|
||||
|
||||
|
||||
class PoolSchedulerController(wsgi.Controller):
|
||||
def index(self, request, **kwargs):
|
||||
lbaas_plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
plugin_const.LOADBALANCER)
|
||||
if not lbaas_plugin:
|
||||
return {'pools': []}
|
||||
|
||||
policy.enforce(request.context,
|
||||
"get_%s" % LOADBALANCER_POOLS,
|
||||
{},
|
||||
plugin=lbaas_plugin)
|
||||
return lbaas_plugin.list_pools_on_lbaas_agent(
|
||||
request.context, kwargs['agent_id'])
|
||||
|
||||
|
||||
class LbaasAgentHostingPoolController(wsgi.Controller):
|
||||
def index(self, request, **kwargs):
|
||||
lbaas_plugin = manager.NeutronManager.get_service_plugins().get(
|
||||
plugin_const.LOADBALANCER)
|
||||
if not lbaas_plugin:
|
||||
return
|
||||
|
||||
policy.enforce(request.context,
|
||||
"get_%s" % LOADBALANCER_AGENT,
|
||||
{},
|
||||
plugin=lbaas_plugin)
|
||||
return lbaas_plugin.get_lbaas_agent_hosting_pool(
|
||||
request.context, kwargs['pool_id'])
|
||||
|
||||
|
||||
class Lbaas_agentscheduler(extensions.ExtensionDescriptor):
|
||||
"""Extension class supporting l3 agent scheduler.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def get_name(cls):
|
||||
return "Loadbalancer Agent Scheduler"
|
||||
|
||||
@classmethod
|
||||
def get_alias(cls):
|
||||
return constants.LBAAS_AGENT_SCHEDULER_EXT_ALIAS
|
||||
|
||||
@classmethod
|
||||
def get_description(cls):
|
||||
return "Schedule pools among lbaas agents"
|
||||
|
||||
@classmethod
|
||||
def get_namespace(cls):
|
||||
return "http://docs.openstack.org/ext/lbaas_agent_scheduler/api/v1.0"
|
||||
|
||||
@classmethod
|
||||
def get_updated(cls):
|
||||
return "2013-02-07T10:00:00-00:00"
|
||||
|
||||
@classmethod
|
||||
def get_resources(cls):
|
||||
"""Returns Ext Resources."""
|
||||
exts = []
|
||||
parent = dict(member_name="agent",
|
||||
collection_name="agents")
|
||||
|
||||
controller = resource.Resource(PoolSchedulerController(),
|
||||
base.FAULT_MAP)
|
||||
exts.append(extensions.ResourceExtension(
|
||||
LOADBALANCER_POOLS, controller, parent))
|
||||
|
||||
parent = dict(member_name="pool",
|
||||
collection_name="pools")
|
||||
|
||||
controller = resource.Resource(LbaasAgentHostingPoolController(),
|
||||
base.FAULT_MAP)
|
||||
exts.append(extensions.ResourceExtension(
|
||||
LOADBALANCER_AGENT, controller, parent,
|
||||
path_prefix=plugin_const.
|
||||
COMMON_PREFIXES[plugin_const.LOADBALANCER]))
|
||||
return exts
|
||||
|
||||
def get_extended_resources(self, version):
|
||||
return {}
|
||||
|
||||
|
||||
class NoEligibleLbaasAgent(agent.AgentNotFound):
|
||||
message = _("No eligible loadbalancer agent found "
|
||||
"for pool %(pool_id)s.")
|
||||
|
||||
|
||||
class NoActiveLbaasAgent(agent.AgentNotFound):
|
||||
message = _("No active loadbalancer agent found "
|
||||
"for pool %(pool_id)s.")
|
||||
|
||||
|
||||
class LbaasAgentSchedulerPluginBase(object):
|
||||
"""REST API to operate the lbaas agent scheduler.
|
||||
|
||||
All of method must be in an admin context.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def list_pools_on_lbaas_agent(self, context, id):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_lbaas_agent_hosting_pool(self, context, pool_id):
|
||||
pass
|
@ -177,6 +177,12 @@ class NeutronManager(object):
|
||||
|
||||
self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst
|
||||
|
||||
# search for possible agent notifiers declared in service plugin
|
||||
# (needed by agent management extension)
|
||||
if (hasattr(self.plugin, 'agent_notifiers') and
|
||||
hasattr(plugin_inst, 'agent_notifiers')):
|
||||
self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)
|
||||
|
||||
LOG.debug(_("Successfully loaded %(type)s plugin. "
|
||||
"Description: %(desc)s"),
|
||||
{"type": plugin_inst.get_plugin_type(),
|
||||
|
@ -30,6 +30,7 @@ from oslo.config import cfg
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import rpc as q_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.common import utils
|
||||
@ -254,8 +255,12 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
self.notifier = AgentNotifierApi(topics.AGENT)
|
||||
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
|
||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
)
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
||||
l3_rpc_agent_api.L3AgentNotify
|
||||
)
|
||||
|
||||
def create_network(self, context, network):
|
||||
"""Create network.
|
||||
|
@ -268,8 +268,12 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
self.notifier = AgentNotifierApi(topics.AGENT)
|
||||
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
|
||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
)
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
||||
l3_rpc_agent_api.L3AgentNotify
|
||||
)
|
||||
|
||||
def _parse_network_vlan_ranges(self):
|
||||
try:
|
||||
|
@ -107,8 +107,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
|
||||
def _setup_rpc(self):
|
||||
self.notifier = rpc.AgentNotifierApi(topics.AGENT)
|
||||
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
|
||||
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
|
||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
)
|
||||
self.agent_notifiers[const.AGENT_TYPE_L3] = (
|
||||
l3_rpc_agent_api.L3AgentNotify
|
||||
)
|
||||
self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
|
||||
self.topic = topics.PLUGIN
|
||||
self.conn = c_rpc.create_connection(new=True)
|
||||
|
@ -19,6 +19,7 @@
|
||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
|
||||
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import exceptions as q_exc
|
||||
from neutron.common import rpc as q_rpc
|
||||
from neutron.common import topics
|
||||
@ -119,8 +120,12 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.topic = topics.PLUGIN
|
||||
self.conn = rpc.create_connection(new=True)
|
||||
self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
|
||||
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
|
||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
)
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
||||
l3_rpc_agent_api.L3AgentNotify
|
||||
)
|
||||
|
||||
# NOTE: callback_sg is referred to from the sg unit test.
|
||||
self.callback_sg = SecurityGroupServerRpcCallback()
|
||||
|
@ -819,7 +819,8 @@ class NvpPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.dispatcher = NVPRpcCallbacks().create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
fanout=False)
|
||||
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
self.agent_notifiers[constants.AGENT_TYPE_DHCP] = (
|
||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
|
||||
|
@ -309,8 +309,12 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
self.topic = topics.PLUGIN
|
||||
self.conn = rpc.create_connection(new=True)
|
||||
self.notifier = AgentNotifierApi(topics.AGENT)
|
||||
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
|
||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
)
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
|
||||
l3_rpc_agent_api.L3AgentNotify
|
||||
)
|
||||
self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type)
|
||||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
|
114
neutron/services/loadbalancer/agent_scheduler.py
Normal file
114
neutron/services/loadbalancer/agent_scheduler.py
Normal file
@ -0,0 +1,114 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2013 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import random
|
||||
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import orm
|
||||
from sqlalchemy.orm import joinedload
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron.db import model_base
|
||||
from neutron.extensions import lbaas_agentscheduler
|
||||
from neutron.openstack.common import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PoolLoadbalancerAgentBinding(model_base.BASEV2):
|
||||
"""Represents binding between neutron loadbalancer pools and agents."""
|
||||
|
||||
pool_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey("pools.id", ondelete='CASCADE'),
|
||||
primary_key=True)
|
||||
agent = orm.relation(agents_db.Agent)
|
||||
agent_id = sa.Column(sa.String(36), sa.ForeignKey("agents.id",
|
||||
ondelete='CASCADE'))
|
||||
|
||||
|
||||
class LbaasAgentSchedulerDbMixin(agentschedulers_db.AgentSchedulerDbMixin,
|
||||
lbaas_agentscheduler
|
||||
.LbaasAgentSchedulerPluginBase):
|
||||
|
||||
def get_lbaas_agent_hosting_pool(self, context, pool_id, active=None):
|
||||
query = context.session.query(PoolLoadbalancerAgentBinding)
|
||||
query = query.options(joinedload('agent'))
|
||||
binding = query.get(pool_id)
|
||||
|
||||
if (binding and self.is_eligible_agent(
|
||||
active, binding.agent)):
|
||||
return {'agent': self._make_agent_dict(binding.agent)}
|
||||
|
||||
def get_lbaas_agents(self, context, active=None, filters=None):
|
||||
query = context.session.query(agents_db.Agent)
|
||||
query = query.filter_by(agent_type=constants.AGENT_TYPE_LOADBALANCER)
|
||||
if active is not None:
|
||||
query = query.filter_by(admin_state_up=active)
|
||||
if filters:
|
||||
for key, value in filters.iteritems():
|
||||
column = getattr(agents_db.Agent, key, None)
|
||||
if column:
|
||||
query = query.filter(column.in_(value))
|
||||
|
||||
return [agent
|
||||
for agent in query
|
||||
if self.is_eligible_agent(active, agent)]
|
||||
|
||||
def list_pools_on_lbaas_agent(self, context, id):
|
||||
query = context.session.query(PoolLoadbalancerAgentBinding.pool_id)
|
||||
query = query.filter_by(agent_id=id)
|
||||
pool_ids = [item[0] for item in query]
|
||||
if pool_ids:
|
||||
return {'pools': self.get_pools(context, filters={'id': pool_ids})}
|
||||
else:
|
||||
return {'pools': []}
|
||||
|
||||
|
||||
class ChanceScheduler(object):
|
||||
"""Allocate a loadbalancer agent for a vip in a random way."""
|
||||
|
||||
def schedule(self, plugin, context, pool):
|
||||
"""Schedule the pool to an active loadbalancer agent if there
|
||||
is no enabled agent hosting it.
|
||||
"""
|
||||
with context.session.begin(subtransactions=True):
|
||||
lbaas_agent = plugin.get_lbaas_agent_hosting_pool(
|
||||
context, pool['id'])
|
||||
if lbaas_agent:
|
||||
LOG.debug(_('Pool %(pool_id)s has already been hosted'
|
||||
' by lbaas agent %(agent_id)s'),
|
||||
{'pool_id': pool['id'],
|
||||
'agent_id': lbaas_agent['id']})
|
||||
return
|
||||
|
||||
candidates = plugin.get_lbaas_agents(context, active=True)
|
||||
if not candidates:
|
||||
LOG.warn(_('No active lbaas agents for pool %s') % pool['id'])
|
||||
return
|
||||
|
||||
chosen_agent = random.choice(candidates)
|
||||
binding = PoolLoadbalancerAgentBinding()
|
||||
binding.agent = chosen_agent
|
||||
binding.pool_id = pool['id']
|
||||
context.session.add(binding)
|
||||
LOG.debug(_('Pool %(pool_id)s is scheduled to '
|
||||
'lbaas agent %(agent_id)s'),
|
||||
{'pool_id': pool['id'],
|
||||
'agent_id': chosen_agent['id']})
|
||||
return chosen_agent
|
@ -55,6 +55,7 @@ def main():
|
||||
cfg.CONF.register_opts(manager.OPTS)
|
||||
# import interface options just in case the driver uses namespaces
|
||||
cfg.CONF.register_opts(interface.OPTS)
|
||||
config.register_agent_state_opts_helper(cfg.CONF)
|
||||
config.register_root_helper(cfg.CONF)
|
||||
|
||||
cfg.CONF(project='neutron')
|
||||
|
@ -21,9 +21,12 @@ import weakref
|
||||
from oslo.config import cfg
|
||||
|
||||
from neutron.agent.common import config
|
||||
from neutron.agent import rpc as agent_rpc
|
||||
from neutron.common import constants
|
||||
from neutron import context
|
||||
from neutron.openstack.common import importutils
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.openstack.common import loopingcall
|
||||
from neutron.openstack.common import periodic_task
|
||||
from neutron.services.loadbalancer.drivers.haproxy import (
|
||||
agent_api,
|
||||
@ -110,6 +113,12 @@ class LogicalDeviceCache(object):
|
||||
|
||||
|
||||
class LbaasAgentManager(periodic_task.PeriodicTasks):
|
||||
|
||||
# history
|
||||
# 1.0 Initial version
|
||||
# 1.1 Support agent_updated call
|
||||
RPC_API_VERSION = '1.1'
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
try:
|
||||
@ -131,15 +140,46 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
|
||||
except ImportError:
|
||||
msg = _('Error importing loadbalancer device driver: %s')
|
||||
raise SystemExit(msg % conf.device_driver)
|
||||
ctx = context.get_admin_context_without_session()
|
||||
self.plugin_rpc = agent_api.LbaasAgentApi(
|
||||
plugin_driver.TOPIC_PROCESS_ON_HOST,
|
||||
ctx,
|
||||
conf.host
|
||||
)
|
||||
|
||||
self.agent_state = {
|
||||
'binary': 'neutron-loadbalancer-agent',
|
||||
'host': conf.host,
|
||||
'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT,
|
||||
'configurations': {'device_driver': conf.device_driver,
|
||||
'interface_driver': conf.interface_driver},
|
||||
'agent_type': constants.AGENT_TYPE_LOADBALANCER,
|
||||
'start_flag': True}
|
||||
self.admin_state_up = True
|
||||
|
||||
self.context = context.get_admin_context_without_session()
|
||||
self._setup_rpc()
|
||||
self.needs_resync = False
|
||||
self.cache = LogicalDeviceCache()
|
||||
|
||||
def _setup_rpc(self):
|
||||
self.plugin_rpc = agent_api.LbaasAgentApi(
|
||||
plugin_driver.TOPIC_PROCESS_ON_HOST,
|
||||
self.context,
|
||||
self.conf.host
|
||||
)
|
||||
self.state_rpc = agent_rpc.PluginReportStateAPI(
|
||||
plugin_driver.TOPIC_PROCESS_ON_HOST)
|
||||
report_interval = self.conf.AGENT.report_interval
|
||||
if report_interval:
|
||||
heartbeat = loopingcall.FixedIntervalLoopingCall(
|
||||
self._report_state)
|
||||
heartbeat.start(interval=report_interval)
|
||||
|
||||
def _report_state(self):
|
||||
try:
|
||||
device_count = len(self.cache.devices)
|
||||
self.agent_state['configurations']['devices'] = device_count
|
||||
self.state_rpc.report_state(self.context,
|
||||
self.agent_state)
|
||||
self.agent_state.pop('start_flag', None)
|
||||
except Exception:
|
||||
LOG.exception("Failed reporting state!")
|
||||
|
||||
def initialize_service_hook(self, started_by):
|
||||
self.sync_state()
|
||||
|
||||
@ -228,3 +268,14 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
|
||||
"""Handle RPC cast from plugin to destroy a pool if known to agent."""
|
||||
if self.cache.get_by_pool_id(pool_id):
|
||||
self.destroy_device(pool_id)
|
||||
|
||||
def agent_updated(self, context, payload):
|
||||
"""Handle the agent_updated notification event."""
|
||||
if payload['admin_state_up'] != self.admin_state_up:
|
||||
self.admin_state_up = payload['admin_state_up']
|
||||
if self.admin_state_up:
|
||||
self.needs_resync = True
|
||||
else:
|
||||
for pool_id in self.cache.get_pool_ids():
|
||||
self.destroy_device(pool_id)
|
||||
LOG.info(_("agent_updated by server side %s!"), payload)
|
||||
|
@ -20,9 +20,13 @@ import uuid
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from neutron.common import constants as q_const
|
||||
from neutron.common import exceptions as q_exc
|
||||
from neutron.common import rpc as q_rpc
|
||||
from neutron.db import agents_db
|
||||
from neutron.db.loadbalancer import loadbalancer_db
|
||||
from neutron.extensions import lbaas_agentscheduler
|
||||
from neutron.openstack.common import importutils
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.openstack.common import rpc
|
||||
from neutron.openstack.common.rpc import proxy
|
||||
@ -37,19 +41,31 @@ ACTIVE_PENDING = (
|
||||
constants.PENDING_UPDATE
|
||||
)
|
||||
|
||||
AGENT_SCHEDULER_OPTS = [
|
||||
cfg.StrOpt('loadbalancer_pool_scheduler_driver',
|
||||
default='neutron.services.loadbalancer.agent_scheduler'
|
||||
'.ChanceScheduler',
|
||||
help=_('Driver to use for scheduling '
|
||||
'pool to a default loadbalancer agent')),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
|
||||
|
||||
# topic name for this particular agent implementation
|
||||
TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host'
|
||||
TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent'
|
||||
|
||||
|
||||
class LoadBalancerCallbacks(object):
|
||||
|
||||
RPC_API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, plugin):
|
||||
self.plugin = plugin
|
||||
|
||||
def create_rpc_dispatcher(self):
|
||||
return q_rpc.PluginRpcDispatcher([self])
|
||||
return q_rpc.PluginRpcDispatcher(
|
||||
[self, agents_db.AgentExtRpcCallback(self.plugin)])
|
||||
|
||||
def get_ready_devices(self, context, host=None):
|
||||
with context.session.begin(subtransactions=True):
|
||||
@ -61,6 +77,17 @@ class LoadBalancerCallbacks(object):
|
||||
up = True # makes pep8 and sqlalchemy happy
|
||||
qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
|
||||
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
|
||||
agents = self.plugin.get_lbaas_agents(context,
|
||||
filters={'host': [host]})
|
||||
if not agents:
|
||||
return []
|
||||
elif len(agents) > 1:
|
||||
LOG.warning(_('Multiple lbaas agents found on host %s') % host)
|
||||
|
||||
pools = self.plugin.list_pools_on_lbaas_agent(context,
|
||||
agents[0].id)
|
||||
pool_ids = [pool['id'] for pool in pools['pools']]
|
||||
qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
|
||||
return [id for id, in qry]
|
||||
|
||||
def get_logical_device(self, context, pool_id=None, activate=True,
|
||||
@ -185,40 +212,50 @@ class LoadBalancerCallbacks(object):
|
||||
class LoadBalancerAgentApi(proxy.RpcProxy):
|
||||
"""Plugin side of plugin to agent RPC API."""
|
||||
|
||||
API_VERSION = '1.0'
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
# history
|
||||
# 1.0 Initial version
|
||||
# 1.1 Support agent_updated call
|
||||
|
||||
def __init__(self, topic, host):
|
||||
super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
|
||||
self.host = host
|
||||
def __init__(self, topic):
|
||||
super(LoadBalancerAgentApi, self).__init__(
|
||||
topic, default_version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def reload_pool(self, context, pool_id):
|
||||
def reload_pool(self, context, pool_id, host):
|
||||
return self.cast(
|
||||
context,
|
||||
self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
|
||||
topic=self.topic
|
||||
self.make_msg('reload_pool', pool_id=pool_id, host=host),
|
||||
topic='%s.%s' % (self.topic, host)
|
||||
)
|
||||
|
||||
def destroy_pool(self, context, pool_id):
|
||||
def destroy_pool(self, context, pool_id, host):
|
||||
return self.cast(
|
||||
context,
|
||||
self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
|
||||
topic=self.topic
|
||||
self.make_msg('destroy_pool', pool_id=pool_id, host=host),
|
||||
topic='%s.%s' % (self.topic, host)
|
||||
)
|
||||
|
||||
def modify_pool(self, context, pool_id):
|
||||
def modify_pool(self, context, pool_id, host):
|
||||
return self.cast(
|
||||
context,
|
||||
self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
|
||||
topic=self.topic
|
||||
self.make_msg('modify_pool', pool_id=pool_id, host=host),
|
||||
topic='%s.%s' % (self.topic, host)
|
||||
)
|
||||
|
||||
def agent_updated(self, context, admin_state_up, host):
|
||||
return self.cast(
|
||||
context,
|
||||
self.make_msg('agent_updated',
|
||||
payload={'admin_state_up': admin_state_up}),
|
||||
topic='%s.%s' % (self.topic, host),
|
||||
version='1.1'
|
||||
)
|
||||
|
||||
|
||||
class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
|
||||
|
||||
def __init__(self, plugin):
|
||||
self.agent_rpc = LoadBalancerAgentApi(
|
||||
TOPIC_LOADBALANCER_AGENT,
|
||||
cfg.CONF.host
|
||||
)
|
||||
self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT)
|
||||
self.callbacks = LoadBalancerCallbacks(plugin)
|
||||
|
||||
self.conn = rpc.create_connection(new=True)
|
||||
@ -228,56 +265,85 @@ class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
|
||||
fanout=False)
|
||||
self.conn.consume_in_thread()
|
||||
self.plugin = plugin
|
||||
self.plugin.agent_notifiers.update(
|
||||
{q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})
|
||||
|
||||
self.pool_scheduler = importutils.import_object(
|
||||
cfg.CONF.loadbalancer_pool_scheduler_driver)
|
||||
|
||||
def get_pool_agent(self, context, pool_id):
|
||||
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
|
||||
if not agent:
|
||||
raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id)
|
||||
return agent['agent']
|
||||
|
||||
def create_vip(self, context, vip):
|
||||
self.agent_rpc.reload_pool(context, vip['pool_id'])
|
||||
agent = self.get_pool_agent(context, vip['pool_id'])
|
||||
self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host'])
|
||||
|
||||
def update_vip(self, context, old_vip, vip):
|
||||
agent = self.get_pool_agent(context, vip['pool_id'])
|
||||
if vip['status'] in ACTIVE_PENDING:
|
||||
self.agent_rpc.reload_pool(context, vip['pool_id'])
|
||||
self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host'])
|
||||
else:
|
||||
self.agent_rpc.destroy_pool(context, vip['pool_id'])
|
||||
self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host'])
|
||||
|
||||
def delete_vip(self, context, vip):
|
||||
self.plugin._delete_db_vip(context, vip['id'])
|
||||
self.agent_rpc.destroy_pool(context, vip['pool_id'])
|
||||
agent = self.get_pool_agent(context, vip['pool_id'])
|
||||
self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host'])
|
||||
|
||||
def create_pool(self, context, pool):
|
||||
if not self.pool_scheduler.schedule(self.plugin, context, pool):
|
||||
raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
|
||||
# don't notify here because a pool needs a vip to be useful
|
||||
pass
|
||||
|
||||
def update_pool(self, context, old_pool, pool):
|
||||
agent = self.get_pool_agent(context, pool['id'])
|
||||
if pool['status'] in ACTIVE_PENDING:
|
||||
if pool['vip_id'] is not None:
|
||||
self.agent_rpc.reload_pool(context, pool['id'])
|
||||
self.agent_rpc.reload_pool(context, pool['id'], agent['host'])
|
||||
else:
|
||||
self.agent_rpc.destroy_pool(context, pool['id'])
|
||||
self.agent_rpc.destroy_pool(context, pool['id'], agent['host'])
|
||||
|
||||
def delete_pool(self, context, pool):
|
||||
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
|
||||
if agent:
|
||||
self.agent_rpc.destroy_pool(context, pool['id'],
|
||||
agent['agent']['host'])
|
||||
self.plugin._delete_db_pool(context, pool['id'])
|
||||
self.agent_rpc.destroy_pool(context, pool['id'])
|
||||
|
||||
def create_member(self, context, member):
|
||||
self.agent_rpc.modify_pool(context, member['pool_id'])
|
||||
agent = self.get_pool_agent(context, member['pool_id'])
|
||||
self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
|
||||
|
||||
def update_member(self, context, old_member, member):
|
||||
# member may change pool id
|
||||
if member['pool_id'] != old_member['pool_id']:
|
||||
self.agent_rpc.modify_pool(context, old_member['pool_id'])
|
||||
self.agent_rpc.modify_pool(context, member['pool_id'])
|
||||
agent = self.plugin.get_lbaas_agent_hosting_pool(
|
||||
context, old_member['pool_id'])
|
||||
if agent:
|
||||
self.agent_rpc.modify_pool(context,
|
||||
old_member['pool_id'],
|
||||
agent['agent']['host'])
|
||||
agent = self.get_pool_agent(context, member['pool_id'])
|
||||
self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
|
||||
|
||||
def delete_member(self, context, member):
|
||||
self.plugin._delete_db_member(context, member['id'])
|
||||
self.agent_rpc.modify_pool(context, member['pool_id'])
|
||||
agent = self.get_pool_agent(context, member['pool_id'])
|
||||
self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
|
||||
|
||||
def update_health_monitor(self, context, old_health_monitor,
|
||||
health_monitor, pool_id):
|
||||
# monitors are unused here because agent will fetch what is necessary
|
||||
self.agent_rpc.modify_pool(context, pool_id)
|
||||
agent = self.get_pool_agent(context, pool_id)
|
||||
self.agent_rpc.modify_pool(context, pool_id, agent['host'])
|
||||
|
||||
def create_pool_health_monitor(self, context, healthmon, pool_id):
|
||||
# healthmon is not used here
|
||||
self.agent_rpc.modify_pool(context, pool_id)
|
||||
agent = self.get_pool_agent(context, pool_id)
|
||||
self.agent_rpc.modify_pool(context, pool_id, agent['host'])
|
||||
|
||||
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
|
||||
self.plugin._delete_db_pool_health_monitor(
|
||||
@ -285,7 +351,8 @@ class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
|
||||
)
|
||||
|
||||
# healthmon_id is not used here
|
||||
self.agent_rpc.modify_pool(context, pool_id)
|
||||
agent = self.get_pool_agent(context, pool_id)
|
||||
self.agent_rpc.modify_pool(context, pool_id, agent['host'])
|
||||
|
||||
def create_health_monitor(self, context, health_monitor):
|
||||
pass
|
||||
|
@ -58,7 +58,7 @@ class NoopLbaaSDriver(abstract_driver.LoadBalancerAbstractDriver):
|
||||
|
||||
@log.log
|
||||
def delete_pool(self, context, pool):
|
||||
pass
|
||||
self.plugin._delete_db_pool(context, pool["id"])
|
||||
|
||||
@log.log
|
||||
def stats(self, context, pool_id):
|
||||
|
@ -23,6 +23,7 @@ from neutron.db.loadbalancer import loadbalancer_db
|
||||
from neutron.openstack.common import importutils
|
||||
from neutron.openstack.common import log as logging
|
||||
from neutron.plugins.common import constants
|
||||
from neutron.services.loadbalancer import agent_scheduler
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -39,7 +40,8 @@ cfg.CONF.register_opts(lbaas_plugin_opts, "LBAAS")
|
||||
legacy.override_config(cfg.CONF, [('LBAAS', 'driver_fqn')])
|
||||
|
||||
|
||||
class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
|
||||
class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb,
|
||||
agent_scheduler.LbaasAgentSchedulerDbMixin):
|
||||
|
||||
"""Implementation of the Neutron Loadbalancer Service Plugin.
|
||||
|
||||
@ -47,7 +49,12 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
|
||||
Most DB related works are implemented in class
|
||||
loadbalancer_db.LoadBalancerPluginDb.
|
||||
"""
|
||||
supported_extension_aliases = ["lbaas"]
|
||||
supported_extension_aliases = ["lbaas", "lbaas_agent_scheduler"]
|
||||
|
||||
# lbaas agent notifiers to handle agent update operations;
|
||||
# can be updated by plugin drivers while loading;
|
||||
# will be extracted by neutron manager when loading service plugins;
|
||||
agent_notifiers = {}
|
||||
|
||||
def __init__(self):
|
||||
"""Initialization for the loadbalancer service plugin."""
|
||||
@ -213,7 +220,7 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
|
||||
# update the db and return the value from db
|
||||
# else - return what we have in db
|
||||
if stats_data:
|
||||
super(LoadBalancerPlugin, self)._update_pool_stats(
|
||||
super(LoadBalancerPlugin, self).update_pool_stats(
|
||||
context,
|
||||
pool_id,
|
||||
stats_data
|
||||
|
@ -16,14 +16,17 @@
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
import testtools
|
||||
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
import testtools
|
||||
import webob.exc
|
||||
|
||||
from neutron.api.extensions import ExtensionMiddleware
|
||||
from neutron.api.extensions import PluginAwareExtensionManager
|
||||
from neutron.common import config
|
||||
from neutron import context
|
||||
import neutron.db.l3_db # noqa
|
||||
from neutron.db.loadbalancer import loadbalancer_db as ldb
|
||||
import neutron.extensions
|
||||
from neutron.extensions import loadbalancer
|
||||
@ -46,34 +49,19 @@ ETCDIR = os.path.join(ROOTDIR, 'etc')
|
||||
|
||||
extensions_path = ':'.join(neutron.extensions.__path__)
|
||||
|
||||
_subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
|
||||
|
||||
|
||||
def etcdir(*p):
|
||||
return os.path.join(ETCDIR, *p)
|
||||
|
||||
|
||||
class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
||||
class LoadBalancerTestMixin(object):
|
||||
resource_prefix_map = dict(
|
||||
(k, constants.COMMON_PREFIXES[constants.LOADBALANCER])
|
||||
for k in loadbalancer.RESOURCE_ATTRIBUTE_MAP.keys()
|
||||
)
|
||||
|
||||
def setUp(self, core_plugin=None, lb_plugin=None):
|
||||
service_plugins = {'lb_plugin_name': DB_LB_PLUGIN_KLASS}
|
||||
|
||||
super(LoadBalancerPluginDbTestCase, self).setUp(
|
||||
service_plugins=service_plugins
|
||||
)
|
||||
|
||||
self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
|
||||
|
||||
self.plugin = loadbalancer_plugin.LoadBalancerPlugin()
|
||||
ext_mgr = PluginAwareExtensionManager(
|
||||
extensions_path,
|
||||
{constants.LOADBALANCER: self.plugin}
|
||||
)
|
||||
app = config.load_paste_app('extensions_test_app')
|
||||
self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
|
||||
|
||||
def _create_vip(self, fmt, name, pool_id, protocol, protocol_port,
|
||||
admin_state_up, expected_res_status=None, **kwargs):
|
||||
data = {'vip': {'name': name,
|
||||
@ -97,7 +85,7 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
||||
def _create_pool(self, fmt, name, lb_method, protocol, admin_state_up,
|
||||
expected_res_status=None, **kwargs):
|
||||
data = {'pool': {'name': name,
|
||||
'subnet_id': self._subnet_id,
|
||||
'subnet_id': _subnet_id,
|
||||
'lb_method': lb_method,
|
||||
'protocol': protocol,
|
||||
'admin_state_up': admin_state_up,
|
||||
@ -151,12 +139,6 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
||||
|
||||
return res
|
||||
|
||||
def _api_for_resource(self, resource):
|
||||
if resource in ['networks', 'subnets', 'ports']:
|
||||
return self.api
|
||||
else:
|
||||
return self.ext_api
|
||||
|
||||
@contextlib.contextmanager
|
||||
def vip(self, fmt=None, name='vip1', pool=None, subnet=None,
|
||||
protocol='HTTP', protocol_port=80, admin_state_up=True,
|
||||
@ -270,7 +252,43 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
||||
self._delete('health_monitors', the_health_monitor['id'])
|
||||
|
||||
|
||||
class LoadBalancerPluginDbTestCase(LoadBalancerTestMixin,
|
||||
test_db_plugin.NeutronDbPluginV2TestCase):
|
||||
def setUp(self, core_plugin=None, lb_plugin=None):
|
||||
service_plugins = {'lb_plugin_name': DB_LB_PLUGIN_KLASS}
|
||||
super(LoadBalancerPluginDbTestCase, self).setUp(
|
||||
service_plugins=service_plugins
|
||||
)
|
||||
|
||||
self._subnet_id = _subnet_id
|
||||
|
||||
self.plugin = loadbalancer_plugin.LoadBalancerPlugin()
|
||||
|
||||
get_lbaas_agent_patcher = mock.patch(
|
||||
'neutron.services.loadbalancer.agent_scheduler'
|
||||
'.LbaasAgentSchedulerDbMixin.get_lbaas_agent_hosting_pool')
|
||||
mock_lbaas_agent = mock.MagicMock()
|
||||
get_lbaas_agent_patcher.start().return_value = mock_lbaas_agent
|
||||
mock_lbaas_agent.__getitem__.return_value = {'host': 'host'}
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
ext_mgr = PluginAwareExtensionManager(
|
||||
extensions_path,
|
||||
{constants.LOADBALANCER: self.plugin}
|
||||
)
|
||||
app = config.load_paste_app('extensions_test_app')
|
||||
self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
|
||||
|
||||
|
||||
class TestLoadBalancer(LoadBalancerPluginDbTestCase):
|
||||
def setUp(self):
|
||||
cfg.CONF.set_override('driver_fqn',
|
||||
'neutron.services.loadbalancer.drivers.noop'
|
||||
'.noop_driver.NoopLbaaSDriver',
|
||||
group='LBAAS')
|
||||
self.addCleanup(cfg.CONF.reset)
|
||||
super(TestLoadBalancer, self).setUp()
|
||||
|
||||
def test_create_vip(self, **extras):
|
||||
expected = {
|
||||
'name': 'vip1',
|
||||
|
@ -95,6 +95,7 @@ class DummyServicePlugin(ServicePluginBase):
|
||||
"""
|
||||
|
||||
supported_extension_aliases = ['dummy', servicetype.EXT_ALIAS]
|
||||
agent_notifiers = {'dummy': 'dummy_agent_notifier'}
|
||||
|
||||
def __init__(self):
|
||||
self.svctype_mgr = servicetype_db.ServiceTypeManager.get_instance()
|
||||
|
@ -1106,7 +1106,8 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
|
||||
|
||||
def test_router_add_to_l3_agent_notification(self):
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
|
||||
l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3]
|
||||
with mock.patch.object(l3_notifier, 'cast') as mock_l3:
|
||||
with self.router() as router1:
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
@ -1116,14 +1117,15 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
|
||||
routers = [router1['router']['id']]
|
||||
mock_l3.assert_called_with(
|
||||
mock.ANY,
|
||||
plugin.l3_agent_notifier.make_msg(
|
||||
l3_notifier.make_msg(
|
||||
'router_added_to_agent',
|
||||
payload=routers),
|
||||
topic='l3_agent.hosta')
|
||||
|
||||
def test_router_remove_from_l3_agent_notification(self):
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
|
||||
l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3]
|
||||
with mock.patch.object(l3_notifier, 'cast') as mock_l3:
|
||||
with self.router() as router1:
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
@ -1133,22 +1135,22 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
|
||||
self._remove_router_from_l3_agent(hosta_id,
|
||||
router1['router']['id'])
|
||||
mock_l3.assert_called_with(
|
||||
mock.ANY, plugin.l3_agent_notifier.make_msg(
|
||||
mock.ANY, l3_notifier.make_msg(
|
||||
'router_removed_from_agent',
|
||||
payload={'router_id': router1['router']['id']}),
|
||||
topic='l3_agent.hosta')
|
||||
|
||||
def test_agent_updated_l3_agent_notification(self):
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
|
||||
l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3]
|
||||
with mock.patch.object(l3_notifier, 'cast') as mock_l3:
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
L3_HOSTA)
|
||||
self._disable_agent(hosta_id, admin_state_up=False)
|
||||
mock_l3.assert_called_with(
|
||||
mock.ANY, plugin.l3_agent_notifier.make_msg(
|
||||
'agent_updated',
|
||||
payload={'admin_state_up': False}),
|
||||
mock.ANY, l3_notifier.make_msg(
|
||||
'agent_updated', payload={'admin_state_up': False}),
|
||||
topic='l3_agent.hosta')
|
||||
|
||||
|
||||
|
@ -53,13 +53,30 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
|
||||
self.callbacks = plugin_driver.LoadBalancerCallbacks(
|
||||
self.plugin_instance
|
||||
)
|
||||
get_lbaas_agents_patcher = mock.patch(
|
||||
'neutron.services.loadbalancer.agent_scheduler'
|
||||
'.LbaasAgentSchedulerDbMixin.get_lbaas_agents')
|
||||
get_lbaas_agents_patcher.start()
|
||||
|
||||
# mocking plugin_driver create_pool() as it does nothing more than
|
||||
# pool scheduling which is beyond the scope of this test case
|
||||
mock.patch('neutron.services.loadbalancer.drivers.haproxy'
|
||||
'.plugin_driver.HaproxyOnHostPluginDriver'
|
||||
'.create_pool').start()
|
||||
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
def test_get_ready_devices(self):
|
||||
with self.vip() as vip:
|
||||
ready = self.callbacks.get_ready_devices(
|
||||
context.get_admin_context(),
|
||||
)
|
||||
self.assertEqual(ready, [vip['vip']['pool_id']])
|
||||
with mock.patch('neutron.services.loadbalancer.agent_scheduler'
|
||||
'.LbaasAgentSchedulerDbMixin.'
|
||||
'list_pools_on_lbaas_agent') as mock_agent_pools:
|
||||
mock_agent_pools.return_value = {
|
||||
'pools': [{'id': vip['vip']['pool_id']}]}
|
||||
ready = self.callbacks.get_ready_devices(
|
||||
context.get_admin_context(),
|
||||
)
|
||||
self.assertEqual(ready, [vip['vip']['pool_id']])
|
||||
|
||||
def test_get_ready_devices_multiple_vips_and_pools(self):
|
||||
ctx = context.get_admin_context()
|
||||
@ -100,11 +117,17 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
|
||||
|
||||
self.assertEqual(ctx.session.query(ldb.Pool).count(), 3)
|
||||
self.assertEqual(ctx.session.query(ldb.Vip).count(), 2)
|
||||
ready = self.callbacks.get_ready_devices(ctx)
|
||||
self.assertEqual(len(ready), 2)
|
||||
self.assertIn(pools[0].id, ready)
|
||||
self.assertIn(pools[1].id, ready)
|
||||
self.assertNotIn(pools[2].id, ready)
|
||||
with mock.patch('neutron.services.loadbalancer.agent_scheduler'
|
||||
'.LbaasAgentSchedulerDbMixin'
|
||||
'.list_pools_on_lbaas_agent') as mock_agent_pools:
|
||||
mock_agent_pools.return_value = {'pools': [{'id': pools[0].id},
|
||||
{'id': pools[1].id},
|
||||
{'id': pools[2].id}]}
|
||||
ready = self.callbacks.get_ready_devices(ctx)
|
||||
self.assertEqual(len(ready), 2)
|
||||
self.assertIn(pools[0].id, ready)
|
||||
self.assertIn(pools[1].id, ready)
|
||||
self.assertNotIn(pools[2].id, ready)
|
||||
# cleanup
|
||||
ctx.session.query(ldb.Pool).delete()
|
||||
ctx.session.query(ldb.Vip).delete()
|
||||
@ -119,11 +142,15 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
|
||||
vip['vip']['id'],
|
||||
{'vip': {'status': constants.INACTIVE}}
|
||||
)
|
||||
|
||||
ready = self.callbacks.get_ready_devices(
|
||||
context.get_admin_context(),
|
||||
)
|
||||
self.assertFalse(ready)
|
||||
with mock.patch('neutron.services.loadbalancer.agent_scheduler'
|
||||
'.LbaasAgentSchedulerDbMixin.'
|
||||
'list_pools_on_lbaas_agent') as mock_agent_pools:
|
||||
mock_agent_pools.return_value = {
|
||||
'pools': [{'id': vip['vip']['pool_id']}]}
|
||||
ready = self.callbacks.get_ready_devices(
|
||||
context.get_admin_context(),
|
||||
)
|
||||
self.assertFalse(ready)
|
||||
|
||||
def test_get_ready_devices_inactive_pool(self):
|
||||
with self.vip() as vip:
|
||||
@ -135,11 +162,15 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
|
||||
vip['vip']['pool_id'],
|
||||
{'pool': {'status': constants.INACTIVE}}
|
||||
)
|
||||
|
||||
ready = self.callbacks.get_ready_devices(
|
||||
context.get_admin_context(),
|
||||
)
|
||||
self.assertFalse(ready)
|
||||
with mock.patch('neutron.services.loadbalancer.agent_scheduler'
|
||||
'.LbaasAgentSchedulerDbMixin.'
|
||||
'list_pools_on_lbaas_agent') as mock_agent_pools:
|
||||
mock_agent_pools.return_value = {
|
||||
'pools': [{'id': vip['vip']['pool_id']}]}
|
||||
ready = self.callbacks.get_ready_devices(
|
||||
context.get_admin_context(),
|
||||
)
|
||||
self.assertFalse(ready)
|
||||
|
||||
def test_get_logical_device_inactive(self):
|
||||
with self.pool() as pool:
|
||||
@ -235,26 +266,26 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
|
||||
super(TestLoadBalancerAgentApi, self).setUp()
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
self.api = plugin_driver.LoadBalancerAgentApi('topic', 'host')
|
||||
self.api = plugin_driver.LoadBalancerAgentApi('topic')
|
||||
self.mock_cast = mock.patch.object(self.api, 'cast').start()
|
||||
self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
|
||||
|
||||
def test_init(self):
|
||||
self.assertEqual(self.api.topic, 'topic')
|
||||
self.assertEqual(self.api.host, 'host')
|
||||
|
||||
def _call_test_helper(self, method_name):
|
||||
rv = getattr(self.api, method_name)(mock.sentinel.context, 'the_id')
|
||||
rv = getattr(self.api, method_name)(mock.sentinel.context, 'test',
|
||||
'host')
|
||||
self.assertEqual(rv, self.mock_cast.return_value)
|
||||
self.mock_cast.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.mock_msg.return_value,
|
||||
topic='topic'
|
||||
topic='topic.host'
|
||||
)
|
||||
|
||||
self.mock_msg.assert_called_once_with(
|
||||
method_name,
|
||||
pool_id='the_id',
|
||||
pool_id='test',
|
||||
host='host'
|
||||
)
|
||||
|
||||
@ -267,6 +298,21 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
|
||||
def test_modify_pool(self):
|
||||
self._call_test_helper('modify_pool')
|
||||
|
||||
def test_agent_updated(self):
|
||||
rv = self.api.agent_updated(mock.sentinel.context, True, 'host')
|
||||
self.assertEqual(rv, self.mock_cast.return_value)
|
||||
self.mock_cast.assert_called_once_with(
|
||||
mock.sentinel.context,
|
||||
self.mock_msg.return_value,
|
||||
topic='topic.host',
|
||||
version='1.1'
|
||||
)
|
||||
|
||||
self.mock_msg.assert_called_once_with(
|
||||
'agent_updated',
|
||||
payload={'admin_state_up': True}
|
||||
)
|
||||
|
||||
|
||||
class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
def setUp(self):
|
||||
@ -276,6 +322,12 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
|
||||
self.mock_api = api_cls.return_value
|
||||
|
||||
# mocking plugin_driver create_pool() as it does nothing more than
|
||||
# pool scheduling which is beyond the scope of this test case
|
||||
mock.patch('neutron.services.loadbalancer.drivers.haproxy'
|
||||
'.plugin_driver.HaproxyOnHostPluginDriver'
|
||||
'.create_pool').start()
|
||||
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
def test_create_vip(self):
|
||||
@ -284,7 +336,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
with self.vip(pool=pool, subnet=subnet) as vip:
|
||||
self.mock_api.reload_pool.assert_called_once_with(
|
||||
mock.ANY,
|
||||
vip['vip']['pool_id']
|
||||
vip['vip']['pool_id'],
|
||||
'host'
|
||||
)
|
||||
|
||||
def test_update_vip(self):
|
||||
@ -302,7 +355,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
|
||||
self.mock_api.reload_pool.assert_called_once_with(
|
||||
mock.ANY,
|
||||
vip['vip']['pool_id']
|
||||
vip['vip']['pool_id'],
|
||||
'host'
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
@ -319,7 +373,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
self.plugin_instance.delete_vip(ctx, vip['vip']['id'])
|
||||
self.mock_api.destroy_pool.assert_called_once_with(
|
||||
mock.ANY,
|
||||
vip['vip']['pool_id']
|
||||
vip['vip']['pool_id'],
|
||||
'host'
|
||||
)
|
||||
|
||||
def test_create_pool(self):
|
||||
@ -334,7 +389,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
ctx = context.get_admin_context()
|
||||
self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool)
|
||||
self.mock_api.destroy_pool.assert_called_once_with(
|
||||
mock.ANY, pool['pool']['id'])
|
||||
mock.ANY, pool['pool']['id'], 'host')
|
||||
self.assertFalse(self.mock_api.reload_pool.called)
|
||||
self.assertFalse(self.mock_api.modify_pool.called)
|
||||
|
||||
@ -352,7 +407,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
ctx = context.get_admin_context()
|
||||
self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool)
|
||||
self.mock_api.reload_pool.assert_called_once_with(
|
||||
mock.ANY, pool['pool']['id'])
|
||||
mock.ANY, pool['pool']['id'], 'host')
|
||||
self.assertFalse(self.mock_api.destroy_pool.called)
|
||||
self.assertFalse(self.mock_api.modify_pool.called)
|
||||
|
||||
@ -363,14 +418,14 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, 204)
|
||||
self.mock_api.destroy_pool.assert_called_once_with(
|
||||
mock.ANY, pool['pool']['id'])
|
||||
mock.ANY, pool['pool']['id'], 'host')
|
||||
|
||||
def test_create_member(self):
|
||||
with self.pool() as pool:
|
||||
pool_id = pool['pool']['id']
|
||||
with self.member(pool_id=pool_id):
|
||||
self.mock_api.modify_pool.assert_called_once_with(
|
||||
mock.ANY, pool_id)
|
||||
mock.ANY, pool_id, 'host')
|
||||
|
||||
def test_update_member(self):
|
||||
with self.pool() as pool:
|
||||
@ -381,7 +436,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
self.plugin_instance.update_member(
|
||||
ctx, member['member']['id'], member)
|
||||
self.mock_api.modify_pool.assert_called_once_with(
|
||||
mock.ANY, pool_id)
|
||||
mock.ANY, pool_id, 'host')
|
||||
|
||||
def test_update_member_new_pool(self):
|
||||
with self.pool() as pool1:
|
||||
@ -397,8 +452,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
member)
|
||||
self.assertEqual(2, self.mock_api.modify_pool.call_count)
|
||||
self.mock_api.modify_pool.assert_has_calls(
|
||||
[mock.call(mock.ANY, pool1_id),
|
||||
mock.call(mock.ANY, pool2_id)])
|
||||
[mock.call(mock.ANY, pool1_id, 'host'),
|
||||
mock.call(mock.ANY, pool2_id, 'host')])
|
||||
|
||||
def test_delete_member(self):
|
||||
with self.pool() as pool:
|
||||
@ -411,7 +466,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, 204)
|
||||
self.mock_api.modify_pool.assert_called_once_with(
|
||||
mock.ANY, pool_id)
|
||||
mock.ANY, pool_id, 'host')
|
||||
|
||||
def test_create_pool_health_monitor(self):
|
||||
with self.pool() as pool:
|
||||
@ -422,7 +477,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
hm,
|
||||
pool_id)
|
||||
self.mock_api.modify_pool.assert_called_once_with(
|
||||
mock.ANY, pool_id)
|
||||
mock.ANY, pool_id, 'host')
|
||||
|
||||
def test_delete_pool_health_monitor(self):
|
||||
with self.pool() as pool:
|
||||
@ -436,7 +491,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
self.plugin_instance.delete_pool_health_monitor(
|
||||
ctx, hm['health_monitor']['id'], pool_id)
|
||||
self.mock_api.modify_pool.assert_called_once_with(
|
||||
mock.ANY, pool_id)
|
||||
mock.ANY, pool_id, 'host')
|
||||
|
||||
def test_update_health_monitor_associated_with_pool(self):
|
||||
with self.health_monitor(type='HTTP') as monitor:
|
||||
@ -457,7 +512,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
self.assertEqual(res.status_int, 201)
|
||||
self.mock_api.modify_pool.assert_called_once_with(
|
||||
mock.ANY,
|
||||
pool['pool']['id']
|
||||
pool['pool']['id'],
|
||||
'host'
|
||||
)
|
||||
|
||||
self.mock_api.reset_mock()
|
||||
@ -471,5 +527,6 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
|
||||
req.get_response(self.ext_api)
|
||||
self.mock_api.modify_pool.assert_called_once_with(
|
||||
mock.ANY,
|
||||
pool['pool']['id']
|
||||
pool['pool']['id'],
|
||||
'host'
|
||||
)
|
||||
|
200
neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py
Normal file
200
neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py
Normal file
@ -0,0 +1,200 @@
|
||||
# Copyright (c) 2013 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
from webob import exc
|
||||
|
||||
from neutron.api import extensions
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants
|
||||
from neutron import context
|
||||
from neutron.extensions import agent
|
||||
from neutron.extensions import lbaas_agentscheduler
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import constants as plugin_const
|
||||
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
|
||||
from neutron.tests.unit.openvswitch import test_agent_scheduler
|
||||
from neutron.tests.unit import test_agent_ext_plugin
|
||||
from neutron.tests.unit import test_db_plugin as test_plugin
|
||||
from neutron.tests.unit import test_extensions
|
||||
|
||||
LBAAS_HOSTA = 'hosta'
|
||||
|
||||
|
||||
class AgentSchedulerTestMixIn(test_agent_scheduler.AgentSchedulerTestMixIn):
|
||||
def _list_pools_hosted_by_lbaas_agent(self, agent_id,
|
||||
expected_code=exc.HTTPOk.code,
|
||||
admin_context=True):
|
||||
path = "/agents/%s/%s.%s" % (agent_id,
|
||||
lbaas_agentscheduler.LOADBALANCER_POOLS,
|
||||
self.fmt)
|
||||
return self._request_list(path, expected_code=expected_code,
|
||||
admin_context=admin_context)
|
||||
|
||||
def _get_lbaas_agent_hosting_pool(self, pool_id,
|
||||
expected_code=exc.HTTPOk.code,
|
||||
admin_context=True):
|
||||
path = "/lb/pools/%s/%s.%s" % (pool_id,
|
||||
lbaas_agentscheduler.LOADBALANCER_AGENT,
|
||||
self.fmt)
|
||||
return self._request_list(path, expected_code=expected_code,
|
||||
admin_context=admin_context)
|
||||
|
||||
|
||||
class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn,
|
||||
AgentSchedulerTestMixIn,
|
||||
test_db_loadbalancer.LoadBalancerTestMixin,
|
||||
test_plugin.NeutronDbPluginV2TestCase):
|
||||
fmt = 'json'
|
||||
plugin_str = ('neutron.plugins.openvswitch.'
|
||||
'ovs_neutron_plugin.OVSNeutronPluginV2')
|
||||
|
||||
def setUp(self):
|
||||
# Save the global RESOURCE_ATTRIBUTE_MAP
|
||||
self.saved_attr_map = {}
|
||||
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
|
||||
self.saved_attr_map[resource] = attrs.copy()
|
||||
service_plugins = {
|
||||
'lb_plugin_name': test_db_loadbalancer.DB_LB_PLUGIN_KLASS}
|
||||
super(LBaaSAgentSchedulerTestCase, self).setUp(
|
||||
self.plugin_str, service_plugins=service_plugins)
|
||||
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
|
||||
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
|
||||
self.adminContext = context.get_admin_context()
|
||||
# Add the resources to the global attribute map
|
||||
# This is done here as the setup process won't
|
||||
# initialize the main API router which extends
|
||||
# the global attribute map
|
||||
attributes.RESOURCE_ATTRIBUTE_MAP.update(
|
||||
agent.RESOURCE_ATTRIBUTE_MAP)
|
||||
self.addCleanup(self.restore_attribute_map)
|
||||
|
||||
def restore_attribute_map(self):
|
||||
# Restore the original RESOURCE_ATTRIBUTE_MAP
|
||||
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
|
||||
|
||||
def test_report_states(self):
|
||||
self._register_agent_states(lbaas_agents=True)
|
||||
agents = self._list_agents()
|
||||
self.assertEqual(6, len(agents['agents']))
|
||||
|
||||
def test_pool_scheduling_on_pool_creation(self):
|
||||
self._register_agent_states(lbaas_agents=True)
|
||||
with self.pool() as pool:
|
||||
lbaas_agent = self._get_lbaas_agent_hosting_pool(
|
||||
pool['pool']['id'])
|
||||
self.assertIsNotNone(lbaas_agent)
|
||||
self.assertEqual(lbaas_agent['agent']['agent_type'],
|
||||
constants.AGENT_TYPE_LOADBALANCER)
|
||||
pools = self._list_pools_hosted_by_lbaas_agent(
|
||||
lbaas_agent['agent']['id'])
|
||||
self.assertEqual(1, len(pools['pools']))
|
||||
self.assertEqual(pool['pool'], pools['pools'][0])
|
||||
|
||||
def test_schedule_poll_with_disabled_agent(self):
|
||||
lbaas_hosta = {
|
||||
'binary': 'neutron-loadbalancer-agent',
|
||||
'host': LBAAS_HOSTA,
|
||||
'topic': 'LOADBALANCER_AGENT',
|
||||
'configurations': {'device_driver': 'device_driver',
|
||||
'interface_driver': 'interface_driver'},
|
||||
'agent_type': constants.AGENT_TYPE_LOADBALANCER}
|
||||
self._register_one_agent_state(lbaas_hosta)
|
||||
with self.pool() as pool:
|
||||
lbaas_agent = self._get_lbaas_agent_hosting_pool(
|
||||
pool['pool']['id'])
|
||||
self.assertIsNotNone(lbaas_agent)
|
||||
|
||||
agents = self._list_agents()
|
||||
self._disable_agent(agents['agents'][0]['id'])
|
||||
pool = {'pool': {'name': 'test',
|
||||
'subnet_id': 'test',
|
||||
'lb_method': 'ROUND_ROBIN',
|
||||
'protocol': 'HTTP',
|
||||
'admin_state_up': True,
|
||||
'tenant_id': 'test',
|
||||
'description': 'test'}}
|
||||
lbaas_plugin = manager.NeutronManager.get_service_plugins()[
|
||||
plugin_const.LOADBALANCER]
|
||||
self.assertRaises(lbaas_agentscheduler.NoEligibleLbaasAgent,
|
||||
lbaas_plugin.create_pool, self.adminContext, pool)
|
||||
|
||||
def test_schedule_poll_with_down_agent(self):
|
||||
lbaas_hosta = {
|
||||
'binary': 'neutron-loadbalancer-agent',
|
||||
'host': LBAAS_HOSTA,
|
||||
'topic': 'LOADBALANCER_AGENT',
|
||||
'configurations': {'device_driver': 'device_driver',
|
||||
'interface_driver': 'interface_driver'},
|
||||
'agent_type': constants.AGENT_TYPE_LOADBALANCER}
|
||||
self._register_one_agent_state(lbaas_hosta)
|
||||
is_agent_down_str = 'neutron.db.agents_db.AgentDbMixin.is_agent_down'
|
||||
with mock.patch(is_agent_down_str) as mock_is_agent_down:
|
||||
mock_is_agent_down.return_value = False
|
||||
with self.pool() as pool:
|
||||
lbaas_agent = self._get_lbaas_agent_hosting_pool(
|
||||
pool['pool']['id'])
|
||||
self.assertIsNotNone(lbaas_agent)
|
||||
with mock.patch(is_agent_down_str) as mock_is_agent_down:
|
||||
mock_is_agent_down.return_value = True
|
||||
pool = {'pool': {'name': 'test',
|
||||
'subnet_id': 'test',
|
||||
'lb_method': 'ROUND_ROBIN',
|
||||
'protocol': 'HTTP',
|
||||
'admin_state_up': True,
|
||||
'tenant_id': 'test',
|
||||
'description': 'test'}}
|
||||
lbaas_plugin = manager.NeutronManager.get_service_plugins()[
|
||||
plugin_const.LOADBALANCER]
|
||||
self.assertRaises(lbaas_agentscheduler.NoEligibleLbaasAgent,
|
||||
lbaas_plugin.create_pool,
|
||||
self.adminContext, pool)
|
||||
|
||||
def test_pool_unscheduling_on_pool_deletion(self):
|
||||
self._register_agent_states(lbaas_agents=True)
|
||||
with self.pool(no_delete=True) as pool:
|
||||
lbaas_agent = self._get_lbaas_agent_hosting_pool(
|
||||
pool['pool']['id'])
|
||||
self.assertIsNotNone(lbaas_agent)
|
||||
self.assertEqual(lbaas_agent['agent']['agent_type'],
|
||||
constants.AGENT_TYPE_LOADBALANCER)
|
||||
pools = self._list_pools_hosted_by_lbaas_agent(
|
||||
lbaas_agent['agent']['id'])
|
||||
self.assertEqual(1, len(pools['pools']))
|
||||
self.assertEqual(pool['pool'], pools['pools'][0])
|
||||
|
||||
req = self.new_delete_request('pools',
|
||||
pool['pool']['id'])
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(res.status_int, 204)
|
||||
pools = self._list_pools_hosted_by_lbaas_agent(
|
||||
lbaas_agent['agent']['id'])
|
||||
self.assertEqual(0, len(pools['pools']))
|
||||
|
||||
def test_pool_scheduling_non_admin_access(self):
|
||||
self._register_agent_states(lbaas_agents=True)
|
||||
with self.pool() as pool:
|
||||
self._get_lbaas_agent_hosting_pool(
|
||||
pool['pool']['id'],
|
||||
expected_code=exc.HTTPForbidden.code,
|
||||
admin_context=False)
|
||||
self._list_pools_hosted_by_lbaas_agent(
|
||||
'fake_id',
|
||||
expected_code=exc.HTTPForbidden.code,
|
||||
admin_context=False)
|
||||
|
||||
|
||||
class LBaaSAgentSchedulerTestCaseXML(LBaaSAgentSchedulerTestCase):
|
||||
fmt = 'xml'
|
@ -45,6 +45,8 @@ DHCP_HOSTA = 'hosta'
|
||||
L3_HOSTB = 'hostb'
|
||||
DHCP_HOSTC = 'hostc'
|
||||
DHCP_HOST1 = 'host1'
|
||||
LBAAS_HOSTA = 'hosta'
|
||||
LBAAS_HOSTB = 'hostb'
|
||||
|
||||
|
||||
class AgentTestExtensionManager(object):
|
||||
@ -83,7 +85,7 @@ class AgentDBTestMixIn(object):
|
||||
self.assertEqual(agent_res.status_int, expected_res_status)
|
||||
return agent_res
|
||||
|
||||
def _register_agent_states(self):
|
||||
def _register_agent_states(self, lbaas_agents=False):
|
||||
"""Register two L3 agents and two DHCP agents."""
|
||||
l3_hosta = {
|
||||
'binary': 'neutron-l3-agent',
|
||||
@ -110,6 +112,16 @@ class AgentDBTestMixIn(object):
|
||||
'agent_type': constants.AGENT_TYPE_DHCP}
|
||||
dhcp_hostc = copy.deepcopy(dhcp_hosta)
|
||||
dhcp_hostc['host'] = DHCP_HOSTC
|
||||
lbaas_hosta = {
|
||||
'binary': 'neutron-loadbalancer-agent',
|
||||
'host': LBAAS_HOSTA,
|
||||
'topic': 'LOADBALANCER_AGENT',
|
||||
'configurations': {'device_driver': 'device_driver',
|
||||
'interface_driver': 'interface_driver',
|
||||
},
|
||||
'agent_type': constants.AGENT_TYPE_LOADBALANCER}
|
||||
lbaas_hostb = copy.deepcopy(lbaas_hosta)
|
||||
lbaas_hostb['host'] = LBAAS_HOSTB
|
||||
callback = agents_db.AgentExtRpcCallback()
|
||||
callback.report_state(self.adminContext,
|
||||
agent_state={'agent_state': l3_hosta},
|
||||
@ -123,7 +135,18 @@ class AgentDBTestMixIn(object):
|
||||
callback.report_state(self.adminContext,
|
||||
agent_state={'agent_state': dhcp_hostc},
|
||||
time=timeutils.strtime())
|
||||
return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
|
||||
|
||||
res = [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
|
||||
if lbaas_agents:
|
||||
callback.report_state(self.adminContext,
|
||||
agent_state={'agent_state': lbaas_hosta},
|
||||
time=timeutils.strtime())
|
||||
callback.report_state(self.adminContext,
|
||||
agent_state={'agent_state': lbaas_hostb},
|
||||
time=timeutils.strtime())
|
||||
res += [lbaas_hosta, lbaas_hostb]
|
||||
|
||||
return res
|
||||
|
||||
def _register_one_dhcp_agent(self):
|
||||
"""Register one DHCP agent."""
|
||||
|
@ -47,6 +47,11 @@ class MultiServiceCorePlugin(object):
|
||||
supported_extension_aliases = ['lbaas', 'dummy']
|
||||
|
||||
|
||||
class CorePluginWithAgentNotifiers(object):
|
||||
agent_notifiers = {'l3': 'l3_agent_notifier',
|
||||
'dhcp': 'dhcp_agent_notifier'}
|
||||
|
||||
|
||||
class NeutronManagerTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
@ -121,3 +126,16 @@ class NeutronManagerTestCase(base.BaseTestCase):
|
||||
self.assertIsNotNone(validate_pre_plugin_load())
|
||||
cfg.CONF.set_override('core_plugin', 'dummy.plugin')
|
||||
self.assertIsNone(validate_pre_plugin_load())
|
||||
|
||||
def test_manager_gathers_agent_notifiers_from_service_plugins(self):
|
||||
cfg.CONF.set_override("service_plugins",
|
||||
["neutron.tests.unit.dummy_plugin."
|
||||
"DummyServicePlugin"])
|
||||
cfg.CONF.set_override("core_plugin",
|
||||
"neutron.tests.unit.test_neutron_manager."
|
||||
"CorePluginWithAgentNotifiers")
|
||||
expected = {'l3': 'l3_agent_notifier',
|
||||
'dhcp': 'dhcp_agent_notifier',
|
||||
'dummy': 'dummy_agent_notifier'}
|
||||
core_plugin = NeutronManager.get_plugin()
|
||||
self.assertEqual(expected, core_plugin.agent_notifiers)
|
||||
|
Loading…
Reference in New Issue
Block a user