diff --git a/etc/neutron.conf b/etc/neutron.conf index 8968f52e8..a5d286fc5 100644 --- a/etc/neutron.conf +++ b/etc/neutron.conf @@ -220,6 +220,8 @@ notification_topics = notifications # network_scheduler_driver = neutron.scheduler.dhcp_agent_scheduler.ChanceScheduler # Driver to use for scheduling router to a default L3 agent # router_scheduler_driver = neutron.scheduler.l3_agent_scheduler.ChanceScheduler +# Driver to use for scheduling a loadbalancer pool to an lbaas agent +# loadbalancer_pool_scheduler_driver = neutron.services.loadbalancer.agent_scheduler.ChanceScheduler # Allow auto scheduling networks to DHCP agent. It will schedule non-hosted # networks to first DHCP agent which sends get_active_networks message to diff --git a/etc/policy.json b/etc/policy.json index b85384e99..2bf5aa6b9 100644 --- a/etc/policy.json +++ b/etc/policy.json @@ -79,6 +79,8 @@ "get_l3-routers": "rule:admin_only", "get_dhcp-agents": "rule:admin_only", "get_l3-agents": "rule:admin_only", + "get_loadbalancer-agent": "rule:admin_only", + "get_loadbalancer-pools": "rule:admin_only", "create_router": "rule:regular_user", "get_router": "rule:admin_or_owner", diff --git a/neutron/common/constants.py b/neutron/common/constants.py index a644022d4..3909044aa 100644 --- a/neutron/common/constants.py +++ b/neutron/common/constants.py @@ -67,6 +67,7 @@ AGENT_TYPE_OVS = 'Open vSwitch agent' AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent' AGENT_TYPE_NEC = 'NEC plugin agent' AGENT_TYPE_L3 = 'L3 agent' +AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent' L2_AGENT_TOPIC = 'N/A' PAGINATION_INFINITE = 'infinite' @@ -76,3 +77,4 @@ SORT_DIRECTION_DESC = 'desc' L3_AGENT_SCHEDULER_EXT_ALIAS = 'l3_agent_scheduler' DHCP_AGENT_SCHEDULER_EXT_ALIAS = 'dhcp_agent_scheduler' +LBAAS_AGENT_SCHEDULER_EXT_ALIAS = 'lbaas_agent_scheduler' diff --git a/neutron/db/agents_db.py b/neutron/db/agents_db.py index b4a8b2165..3e4ce5ea2 100644 --- a/neutron/db/agents_db.py +++ b/neutron/db/agents_db.py @@ -162,6 +162,9 @@ class AgentExtRpcCallback(object): RPC_API_VERSION = '1.0' START_TIME = timeutils.utcnow() + def __init__(self, plugin=None): + self.plugin = plugin + def report_state(self, context, **kwargs): """Report state from agent to server.""" time = kwargs['time'] @@ -170,5 +173,6 @@ class AgentExtRpcCallback(object): LOG.debug(_("Message with invalid timestamp received")) return agent_state = kwargs['agent_state']['agent_state'] - plugin = manager.NeutronManager.get_plugin() - plugin.create_or_update_agent(context, agent_state) + if not self.plugin: + self.plugin = manager.NeutronManager.get_plugin() + self.plugin.create_or_update_agent(context, agent_state) diff --git a/neutron/db/agentschedulers_db.py b/neutron/db/agentschedulers_db.py index 49d6f3832..d44b4b509 100644 --- a/neutron/db/agentschedulers_db.py +++ b/neutron/db/agentschedulers_db.py @@ -79,8 +79,13 @@ class RouterL3AgentBinding(model_base.BASEV2, models_v2.HasId): class AgentSchedulerDbMixin(agents_db.AgentDbMixin): """Common class for agent scheduler mixins.""" - dhcp_agent_notifier = None - l3_agent_notifier = None + # agent notifiers to handle agent update operations; + # should be updated by plugins; + agent_notifiers = { + constants.AGENT_TYPE_DHCP: None, + constants.AGENT_TYPE_L3: None, + constants.AGENT_TYPE_LOADBALANCER: None, + } @staticmethod def is_eligible_agent(active, agent): @@ -100,18 +105,13 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin): result = super(AgentSchedulerDbMixin, self).update_agent( context, id, agent) agent_data = agent['agent'] - if ('admin_state_up' in agent_data and + agent_notifier = self.agent_notifiers.get(original_agent['agent_type']) + if (agent_notifier and + 'admin_state_up' in agent_data and original_agent['admin_state_up'] != agent_data['admin_state_up']): - if (original_agent['agent_type'] == constants.AGENT_TYPE_DHCP and - self.dhcp_agent_notifier): - self.dhcp_agent_notifier.agent_updated( - context, agent_data['admin_state_up'], - original_agent['host']) - elif (original_agent['agent_type'] == constants.AGENT_TYPE_L3 and - self.l3_agent_notifier): - self.l3_agent_notifier.agent_updated( - context, agent_data['admin_state_up'], - original_agent['host']) + agent_notifier.agent_updated(context, + agent_data['admin_state_up'], + original_agent['host']) return result @@ -148,8 +148,9 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase, raise l3agentscheduler.RouterSchedulingFailed( router_id=router_id, agent_id=id) - if self.l3_agent_notifier: - self.l3_agent_notifier.router_added_to_agent( + l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3) + if l3_notifier: + l3_notifier.router_added_to_agent( context, [router_id], agent_db.host) def remove_router_from_l3_agent(self, context, id, router_id): @@ -170,8 +171,9 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase, raise l3agentscheduler.RouterNotHostedByL3Agent( router_id=router_id, agent_id=id) context.session.delete(binding) - if self.l3_agent_notifier: - self.l3_agent_notifier.router_removed_from_agent( + l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3) + if l3_notifier: + l3_notifier.router_removed_from_agent( context, router_id, agent.host) def list_routers_on_l3_agent(self, context, id): @@ -356,8 +358,9 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler binding.dhcp_agent_id = id binding.network_id = network_id context.session.add(binding) - if self.dhcp_agent_notifier: - self.dhcp_agent_notifier.network_added_to_agent( + dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP) + if dhcp_notifier: + dhcp_notifier.network_added_to_agent( context, network_id, agent_db.host) def remove_network_from_dhcp_agent(self, context, id, network_id): @@ -372,8 +375,9 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler raise dhcpagentscheduler.NetworkNotHostedByDhcpAgent( network_id=network_id, agent_id=id) context.session.delete(binding) - if self.dhcp_agent_notifier: - self.dhcp_agent_notifier.network_removed_from_agent( + dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP) + if dhcp_notifier: + dhcp_notifier.network_removed_from_agent( context, network_id, agent.host) def list_networks_on_dhcp_agent(self, context, id): diff --git a/neutron/db/migration/alembic_migrations/versions/52c5e4a18807_lbaas_pool_scheduler.py b/neutron/db/migration/alembic_migrations/versions/52c5e4a18807_lbaas_pool_scheduler.py new file mode 100644 index 000000000..b0f12e9ef --- /dev/null +++ b/neutron/db/migration/alembic_migrations/versions/52c5e4a18807_lbaas_pool_scheduler.py @@ -0,0 +1,53 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""LBaaS Pool scheduler + +Revision ID: 52c5e4a18807 +Revises: 2032abe8edac +Create Date: 2013-06-14 03:23:47.815865 + +""" + +# revision identifiers, used by Alembic. +revision = '52c5e4a18807' +down_revision = '2032abe8edac' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(active_plugin=None, options=None): + ### commands auto generated by Alembic - please adjust! ### + op.create_table( + 'poolloadbalanceragentbindings', + sa.Column('pool_id', sa.String(length=36), nullable=False), + sa.Column('loadbalancer_agent_id', sa.String(length=36), + nullable=False), + sa.ForeignKeyConstraint(['loadbalancer_agent_id'], ['agents.id'], + ondelete='CASCADE'), + sa.ForeignKeyConstraint(['pool_id'], ['pools.id'], + ondelete='CASCADE'), + sa.PrimaryKeyConstraint('pool_id') + ) + ### end Alembic commands ### + + +def downgrade(active_plugin=None, options=None): + ### commands auto generated by Alembic - please adjust! ### + op.drop_table('poolloadbalanceragentbindings') + ### end Alembic commands ### diff --git a/neutron/extensions/lbaas_agentscheduler.py b/neutron/extensions/lbaas_agentscheduler.py new file mode 100644 index 000000000..5d2e64c48 --- /dev/null +++ b/neutron/extensions/lbaas_agentscheduler.py @@ -0,0 +1,138 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack Foundation. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from abc import abstractmethod + +from neutron.api import extensions +from neutron.api.v2 import base +from neutron.api.v2 import resource +from neutron.common import constants +from neutron.extensions import agent +from neutron import manager +from neutron.plugins.common import constants as plugin_const +from neutron import policy +from neutron import wsgi + +LOADBALANCER_POOL = 'loadbalancer-pool' +LOADBALANCER_POOLS = LOADBALANCER_POOL + 's' +LOADBALANCER_AGENT = 'loadbalancer-agent' + + +class PoolSchedulerController(wsgi.Controller): + def index(self, request, **kwargs): + lbaas_plugin = manager.NeutronManager.get_service_plugins().get( + plugin_const.LOADBALANCER) + if not lbaas_plugin: + return {'pools': []} + + policy.enforce(request.context, + "get_%s" % LOADBALANCER_POOLS, + {}, + plugin=lbaas_plugin) + return lbaas_plugin.list_pools_on_lbaas_agent( + request.context, kwargs['agent_id']) + + +class LbaasAgentHostingPoolController(wsgi.Controller): + def index(self, request, **kwargs): + lbaas_plugin = manager.NeutronManager.get_service_plugins().get( + plugin_const.LOADBALANCER) + if not lbaas_plugin: + return + + policy.enforce(request.context, + "get_%s" % LOADBALANCER_AGENT, + {}, + plugin=lbaas_plugin) + return lbaas_plugin.get_lbaas_agent_hosting_pool( + request.context, kwargs['pool_id']) + + +class Lbaas_agentscheduler(extensions.ExtensionDescriptor): + """Extension class supporting l3 agent scheduler. + """ + + @classmethod + def get_name(cls): + return "Loadbalancer Agent Scheduler" + + @classmethod + def get_alias(cls): + return constants.LBAAS_AGENT_SCHEDULER_EXT_ALIAS + + @classmethod + def get_description(cls): + return "Schedule pools among lbaas agents" + + @classmethod + def get_namespace(cls): + return "http://docs.openstack.org/ext/lbaas_agent_scheduler/api/v1.0" + + @classmethod + def get_updated(cls): + return "2013-02-07T10:00:00-00:00" + + @classmethod + def get_resources(cls): + """Returns Ext Resources.""" + exts = [] + parent = dict(member_name="agent", + collection_name="agents") + + controller = resource.Resource(PoolSchedulerController(), + base.FAULT_MAP) + exts.append(extensions.ResourceExtension( + LOADBALANCER_POOLS, controller, parent)) + + parent = dict(member_name="pool", + collection_name="pools") + + controller = resource.Resource(LbaasAgentHostingPoolController(), + base.FAULT_MAP) + exts.append(extensions.ResourceExtension( + LOADBALANCER_AGENT, controller, parent, + path_prefix=plugin_const. + COMMON_PREFIXES[plugin_const.LOADBALANCER])) + return exts + + def get_extended_resources(self, version): + return {} + + +class NoEligibleLbaasAgent(agent.AgentNotFound): + message = _("No eligible loadbalancer agent found " + "for pool %(pool_id)s.") + + +class NoActiveLbaasAgent(agent.AgentNotFound): + message = _("No active loadbalancer agent found " + "for pool %(pool_id)s.") + + +class LbaasAgentSchedulerPluginBase(object): + """REST API to operate the lbaas agent scheduler. + + All of method must be in an admin context. + """ + + @abstractmethod + def list_pools_on_lbaas_agent(self, context, id): + pass + + @abstractmethod + def get_lbaas_agent_hosting_pool(self, context, pool_id): + pass diff --git a/neutron/manager.py b/neutron/manager.py index bf5471633..bc7146419 100644 --- a/neutron/manager.py +++ b/neutron/manager.py @@ -177,6 +177,12 @@ class NeutronManager(object): self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst + # search for possible agent notifiers declared in service plugin + # (needed by agent management extension) + if (hasattr(self.plugin, 'agent_notifiers') and + hasattr(plugin_inst, 'agent_notifiers')): + self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers) + LOG.debug(_("Successfully loaded %(type)s plugin. " "Description: %(desc)s"), {"type": plugin_inst.get_plugin_type(), diff --git a/neutron/plugins/brocade/NeutronPlugin.py b/neutron/plugins/brocade/NeutronPlugin.py index f809a5132..2a676e918 100644 --- a/neutron/plugins/brocade/NeutronPlugin.py +++ b/neutron/plugins/brocade/NeutronPlugin.py @@ -30,6 +30,7 @@ from oslo.config import cfg from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api +from neutron.common import constants as q_const from neutron.common import rpc as q_rpc from neutron.common import topics from neutron.common import utils @@ -254,8 +255,12 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2, # Consume from all consumers in a thread self.conn.consume_in_thread() self.notifier = AgentNotifierApi(topics.AGENT) - self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI() - self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify + self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = ( + dhcp_rpc_agent_api.DhcpAgentNotifyAPI() + ) + self.agent_notifiers[q_const.AGENT_TYPE_L3] = ( + l3_rpc_agent_api.L3AgentNotify + ) def create_network(self, context, network): """Create network. diff --git a/neutron/plugins/linuxbridge/lb_neutron_plugin.py b/neutron/plugins/linuxbridge/lb_neutron_plugin.py index 1f6383fc3..367d5d45d 100644 --- a/neutron/plugins/linuxbridge/lb_neutron_plugin.py +++ b/neutron/plugins/linuxbridge/lb_neutron_plugin.py @@ -268,8 +268,12 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2, # Consume from all consumers in a thread self.conn.consume_in_thread() self.notifier = AgentNotifierApi(topics.AGENT) - self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI() - self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify + self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = ( + dhcp_rpc_agent_api.DhcpAgentNotifyAPI() + ) + self.agent_notifiers[q_const.AGENT_TYPE_L3] = ( + l3_rpc_agent_api.L3AgentNotify + ) def _parse_network_vlan_ranges(self): try: diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 45236b80f..891386481 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -107,8 +107,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, def _setup_rpc(self): self.notifier = rpc.AgentNotifierApi(topics.AGENT) - self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI() - self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify + self.agent_notifiers[const.AGENT_TYPE_DHCP] = ( + dhcp_rpc_agent_api.DhcpAgentNotifyAPI() + ) + self.agent_notifiers[const.AGENT_TYPE_L3] = ( + l3_rpc_agent_api.L3AgentNotify + ) self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager) self.topic = topics.PLUGIN self.conn = c_rpc.create_connection(new=True) diff --git a/neutron/plugins/nec/nec_plugin.py b/neutron/plugins/nec/nec_plugin.py index 08b74403e..17c0e62a8 100644 --- a/neutron/plugins/nec/nec_plugin.py +++ b/neutron/plugins/nec/nec_plugin.py @@ -19,6 +19,7 @@ from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api +from neutron.common import constants as q_const from neutron.common import exceptions as q_exc from neutron.common import rpc as q_rpc from neutron.common import topics @@ -119,8 +120,12 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.topic = topics.PLUGIN self.conn = rpc.create_connection(new=True) self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT) - self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI() - self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify + self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = ( + dhcp_rpc_agent_api.DhcpAgentNotifyAPI() + ) + self.agent_notifiers[q_const.AGENT_TYPE_L3] = ( + l3_rpc_agent_api.L3AgentNotify + ) # NOTE: callback_sg is referred to from the sg unit test. self.callback_sg = SecurityGroupServerRpcCallback() diff --git a/neutron/plugins/nicira/NeutronPlugin.py b/neutron/plugins/nicira/NeutronPlugin.py index 0647a16fa..b5f50c3d2 100644 --- a/neutron/plugins/nicira/NeutronPlugin.py +++ b/neutron/plugins/nicira/NeutronPlugin.py @@ -819,7 +819,8 @@ class NvpPluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.dispatcher = NVPRpcCallbacks().create_rpc_dispatcher() self.conn.create_consumer(self.topic, self.dispatcher, fanout=False) - self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI() + self.agent_notifiers[constants.AGENT_TYPE_DHCP] = ( + dhcp_rpc_agent_api.DhcpAgentNotifyAPI()) # Consume from all consumers in a thread self.conn.consume_in_thread() diff --git a/neutron/plugins/openvswitch/ovs_neutron_plugin.py b/neutron/plugins/openvswitch/ovs_neutron_plugin.py index 8a50b2ff9..bb29c9114 100644 --- a/neutron/plugins/openvswitch/ovs_neutron_plugin.py +++ b/neutron/plugins/openvswitch/ovs_neutron_plugin.py @@ -309,8 +309,12 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.topic = topics.PLUGIN self.conn = rpc.create_connection(new=True) self.notifier = AgentNotifierApi(topics.AGENT) - self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI() - self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify + self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = ( + dhcp_rpc_agent_api.DhcpAgentNotifyAPI() + ) + self.agent_notifiers[q_const.AGENT_TYPE_L3] = ( + l3_rpc_agent_api.L3AgentNotify + ) self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type) self.dispatcher = self.callbacks.create_rpc_dispatcher() self.conn.create_consumer(self.topic, self.dispatcher, diff --git a/neutron/services/loadbalancer/agent_scheduler.py b/neutron/services/loadbalancer/agent_scheduler.py new file mode 100644 index 000000000..084496e6c --- /dev/null +++ b/neutron/services/loadbalancer/agent_scheduler.py @@ -0,0 +1,114 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import random + +import sqlalchemy as sa +from sqlalchemy import orm +from sqlalchemy.orm import joinedload + +from neutron.common import constants +from neutron.db import agents_db +from neutron.db import agentschedulers_db +from neutron.db import model_base +from neutron.extensions import lbaas_agentscheduler +from neutron.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +class PoolLoadbalancerAgentBinding(model_base.BASEV2): + """Represents binding between neutron loadbalancer pools and agents.""" + + pool_id = sa.Column(sa.String(36), + sa.ForeignKey("pools.id", ondelete='CASCADE'), + primary_key=True) + agent = orm.relation(agents_db.Agent) + agent_id = sa.Column(sa.String(36), sa.ForeignKey("agents.id", + ondelete='CASCADE')) + + +class LbaasAgentSchedulerDbMixin(agentschedulers_db.AgentSchedulerDbMixin, + lbaas_agentscheduler + .LbaasAgentSchedulerPluginBase): + + def get_lbaas_agent_hosting_pool(self, context, pool_id, active=None): + query = context.session.query(PoolLoadbalancerAgentBinding) + query = query.options(joinedload('agent')) + binding = query.get(pool_id) + + if (binding and self.is_eligible_agent( + active, binding.agent)): + return {'agent': self._make_agent_dict(binding.agent)} + + def get_lbaas_agents(self, context, active=None, filters=None): + query = context.session.query(agents_db.Agent) + query = query.filter_by(agent_type=constants.AGENT_TYPE_LOADBALANCER) + if active is not None: + query = query.filter_by(admin_state_up=active) + if filters: + for key, value in filters.iteritems(): + column = getattr(agents_db.Agent, key, None) + if column: + query = query.filter(column.in_(value)) + + return [agent + for agent in query + if self.is_eligible_agent(active, agent)] + + def list_pools_on_lbaas_agent(self, context, id): + query = context.session.query(PoolLoadbalancerAgentBinding.pool_id) + query = query.filter_by(agent_id=id) + pool_ids = [item[0] for item in query] + if pool_ids: + return {'pools': self.get_pools(context, filters={'id': pool_ids})} + else: + return {'pools': []} + + +class ChanceScheduler(object): + """Allocate a loadbalancer agent for a vip in a random way.""" + + def schedule(self, plugin, context, pool): + """Schedule the pool to an active loadbalancer agent if there + is no enabled agent hosting it. + """ + with context.session.begin(subtransactions=True): + lbaas_agent = plugin.get_lbaas_agent_hosting_pool( + context, pool['id']) + if lbaas_agent: + LOG.debug(_('Pool %(pool_id)s has already been hosted' + ' by lbaas agent %(agent_id)s'), + {'pool_id': pool['id'], + 'agent_id': lbaas_agent['id']}) + return + + candidates = plugin.get_lbaas_agents(context, active=True) + if not candidates: + LOG.warn(_('No active lbaas agents for pool %s') % pool['id']) + return + + chosen_agent = random.choice(candidates) + binding = PoolLoadbalancerAgentBinding() + binding.agent = chosen_agent + binding.pool_id = pool['id'] + context.session.add(binding) + LOG.debug(_('Pool %(pool_id)s is scheduled to ' + 'lbaas agent %(agent_id)s'), + {'pool_id': pool['id'], + 'agent_id': chosen_agent['id']}) + return chosen_agent diff --git a/neutron/services/loadbalancer/drivers/haproxy/agent.py b/neutron/services/loadbalancer/drivers/haproxy/agent.py index 0aa183a99..71e123ff2 100644 --- a/neutron/services/loadbalancer/drivers/haproxy/agent.py +++ b/neutron/services/loadbalancer/drivers/haproxy/agent.py @@ -55,6 +55,7 @@ def main(): cfg.CONF.register_opts(manager.OPTS) # import interface options just in case the driver uses namespaces cfg.CONF.register_opts(interface.OPTS) + config.register_agent_state_opts_helper(cfg.CONF) config.register_root_helper(cfg.CONF) cfg.CONF(project='neutron') diff --git a/neutron/services/loadbalancer/drivers/haproxy/agent_manager.py b/neutron/services/loadbalancer/drivers/haproxy/agent_manager.py index bea874dab..41da44ae4 100644 --- a/neutron/services/loadbalancer/drivers/haproxy/agent_manager.py +++ b/neutron/services/loadbalancer/drivers/haproxy/agent_manager.py @@ -21,9 +21,12 @@ import weakref from oslo.config import cfg from neutron.agent.common import config +from neutron.agent import rpc as agent_rpc +from neutron.common import constants from neutron import context from neutron.openstack.common import importutils from neutron.openstack.common import log as logging +from neutron.openstack.common import loopingcall from neutron.openstack.common import periodic_task from neutron.services.loadbalancer.drivers.haproxy import ( agent_api, @@ -110,6 +113,12 @@ class LogicalDeviceCache(object): class LbaasAgentManager(periodic_task.PeriodicTasks): + + # history + # 1.0 Initial version + # 1.1 Support agent_updated call + RPC_API_VERSION = '1.1' + def __init__(self, conf): self.conf = conf try: @@ -131,15 +140,46 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): except ImportError: msg = _('Error importing loadbalancer device driver: %s') raise SystemExit(msg % conf.device_driver) - ctx = context.get_admin_context_without_session() - self.plugin_rpc = agent_api.LbaasAgentApi( - plugin_driver.TOPIC_PROCESS_ON_HOST, - ctx, - conf.host - ) + + self.agent_state = { + 'binary': 'neutron-loadbalancer-agent', + 'host': conf.host, + 'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT, + 'configurations': {'device_driver': conf.device_driver, + 'interface_driver': conf.interface_driver}, + 'agent_type': constants.AGENT_TYPE_LOADBALANCER, + 'start_flag': True} + self.admin_state_up = True + + self.context = context.get_admin_context_without_session() + self._setup_rpc() self.needs_resync = False self.cache = LogicalDeviceCache() + def _setup_rpc(self): + self.plugin_rpc = agent_api.LbaasAgentApi( + plugin_driver.TOPIC_PROCESS_ON_HOST, + self.context, + self.conf.host + ) + self.state_rpc = agent_rpc.PluginReportStateAPI( + plugin_driver.TOPIC_PROCESS_ON_HOST) + report_interval = self.conf.AGENT.report_interval + if report_interval: + heartbeat = loopingcall.FixedIntervalLoopingCall( + self._report_state) + heartbeat.start(interval=report_interval) + + def _report_state(self): + try: + device_count = len(self.cache.devices) + self.agent_state['configurations']['devices'] = device_count + self.state_rpc.report_state(self.context, + self.agent_state) + self.agent_state.pop('start_flag', None) + except Exception: + LOG.exception("Failed reporting state!") + def initialize_service_hook(self, started_by): self.sync_state() @@ -228,3 +268,14 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): """Handle RPC cast from plugin to destroy a pool if known to agent.""" if self.cache.get_by_pool_id(pool_id): self.destroy_device(pool_id) + + def agent_updated(self, context, payload): + """Handle the agent_updated notification event.""" + if payload['admin_state_up'] != self.admin_state_up: + self.admin_state_up = payload['admin_state_up'] + if self.admin_state_up: + self.needs_resync = True + else: + for pool_id in self.cache.get_pool_ids(): + self.destroy_device(pool_id) + LOG.info(_("agent_updated by server side %s!"), payload) diff --git a/neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py b/neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py index 09e8f0da0..85fb9de05 100644 --- a/neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py +++ b/neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py @@ -20,9 +20,13 @@ import uuid from oslo.config import cfg +from neutron.common import constants as q_const from neutron.common import exceptions as q_exc from neutron.common import rpc as q_rpc +from neutron.db import agents_db from neutron.db.loadbalancer import loadbalancer_db +from neutron.extensions import lbaas_agentscheduler +from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.openstack.common import rpc from neutron.openstack.common.rpc import proxy @@ -37,19 +41,31 @@ ACTIVE_PENDING = ( constants.PENDING_UPDATE ) +AGENT_SCHEDULER_OPTS = [ + cfg.StrOpt('loadbalancer_pool_scheduler_driver', + default='neutron.services.loadbalancer.agent_scheduler' + '.ChanceScheduler', + help=_('Driver to use for scheduling ' + 'pool to a default loadbalancer agent')), +] + +cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS) + # topic name for this particular agent implementation TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host' TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent' class LoadBalancerCallbacks(object): + RPC_API_VERSION = '1.0' def __init__(self, plugin): self.plugin = plugin def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self]) + return q_rpc.PluginRpcDispatcher( + [self, agents_db.AgentExtRpcCallback(self.plugin)]) def get_ready_devices(self, context, host=None): with context.session.begin(subtransactions=True): @@ -61,6 +77,17 @@ class LoadBalancerCallbacks(object): up = True # makes pep8 and sqlalchemy happy qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up) qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up) + agents = self.plugin.get_lbaas_agents(context, + filters={'host': [host]}) + if not agents: + return [] + elif len(agents) > 1: + LOG.warning(_('Multiple lbaas agents found on host %s') % host) + + pools = self.plugin.list_pools_on_lbaas_agent(context, + agents[0].id) + pool_ids = [pool['id'] for pool in pools['pools']] + qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids)) return [id for id, in qry] def get_logical_device(self, context, pool_id=None, activate=True, @@ -185,40 +212,50 @@ class LoadBalancerCallbacks(object): class LoadBalancerAgentApi(proxy.RpcProxy): """Plugin side of plugin to agent RPC API.""" - API_VERSION = '1.0' + BASE_RPC_API_VERSION = '1.0' + # history + # 1.0 Initial version + # 1.1 Support agent_updated call - def __init__(self, topic, host): - super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION) - self.host = host + def __init__(self, topic): + super(LoadBalancerAgentApi, self).__init__( + topic, default_version=self.BASE_RPC_API_VERSION) - def reload_pool(self, context, pool_id): + def reload_pool(self, context, pool_id, host): return self.cast( context, - self.make_msg('reload_pool', pool_id=pool_id, host=self.host), - topic=self.topic + self.make_msg('reload_pool', pool_id=pool_id, host=host), + topic='%s.%s' % (self.topic, host) ) - def destroy_pool(self, context, pool_id): + def destroy_pool(self, context, pool_id, host): return self.cast( context, - self.make_msg('destroy_pool', pool_id=pool_id, host=self.host), - topic=self.topic + self.make_msg('destroy_pool', pool_id=pool_id, host=host), + topic='%s.%s' % (self.topic, host) ) - def modify_pool(self, context, pool_id): + def modify_pool(self, context, pool_id, host): return self.cast( context, - self.make_msg('modify_pool', pool_id=pool_id, host=self.host), - topic=self.topic + self.make_msg('modify_pool', pool_id=pool_id, host=host), + topic='%s.%s' % (self.topic, host) + ) + + def agent_updated(self, context, admin_state_up, host): + return self.cast( + context, + self.make_msg('agent_updated', + payload={'admin_state_up': admin_state_up}), + topic='%s.%s' % (self.topic, host), + version='1.1' ) class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver): + def __init__(self, plugin): - self.agent_rpc = LoadBalancerAgentApi( - TOPIC_LOADBALANCER_AGENT, - cfg.CONF.host - ) + self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT) self.callbacks = LoadBalancerCallbacks(plugin) self.conn = rpc.create_connection(new=True) @@ -228,56 +265,85 @@ class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver): fanout=False) self.conn.consume_in_thread() self.plugin = plugin + self.plugin.agent_notifiers.update( + {q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc}) + + self.pool_scheduler = importutils.import_object( + cfg.CONF.loadbalancer_pool_scheduler_driver) + + def get_pool_agent(self, context, pool_id): + agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id) + if not agent: + raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id) + return agent['agent'] def create_vip(self, context, vip): - self.agent_rpc.reload_pool(context, vip['pool_id']) + agent = self.get_pool_agent(context, vip['pool_id']) + self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host']) def update_vip(self, context, old_vip, vip): + agent = self.get_pool_agent(context, vip['pool_id']) if vip['status'] in ACTIVE_PENDING: - self.agent_rpc.reload_pool(context, vip['pool_id']) + self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host']) else: - self.agent_rpc.destroy_pool(context, vip['pool_id']) + self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host']) def delete_vip(self, context, vip): self.plugin._delete_db_vip(context, vip['id']) - self.agent_rpc.destroy_pool(context, vip['pool_id']) + agent = self.get_pool_agent(context, vip['pool_id']) + self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host']) def create_pool(self, context, pool): + if not self.pool_scheduler.schedule(self.plugin, context, pool): + raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id']) # don't notify here because a pool needs a vip to be useful - pass def update_pool(self, context, old_pool, pool): + agent = self.get_pool_agent(context, pool['id']) if pool['status'] in ACTIVE_PENDING: if pool['vip_id'] is not None: - self.agent_rpc.reload_pool(context, pool['id']) + self.agent_rpc.reload_pool(context, pool['id'], agent['host']) else: - self.agent_rpc.destroy_pool(context, pool['id']) + self.agent_rpc.destroy_pool(context, pool['id'], agent['host']) def delete_pool(self, context, pool): + agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id']) + if agent: + self.agent_rpc.destroy_pool(context, pool['id'], + agent['agent']['host']) self.plugin._delete_db_pool(context, pool['id']) - self.agent_rpc.destroy_pool(context, pool['id']) def create_member(self, context, member): - self.agent_rpc.modify_pool(context, member['pool_id']) + agent = self.get_pool_agent(context, member['pool_id']) + self.agent_rpc.modify_pool(context, member['pool_id'], agent['host']) def update_member(self, context, old_member, member): # member may change pool id if member['pool_id'] != old_member['pool_id']: - self.agent_rpc.modify_pool(context, old_member['pool_id']) - self.agent_rpc.modify_pool(context, member['pool_id']) + agent = self.plugin.get_lbaas_agent_hosting_pool( + context, old_member['pool_id']) + if agent: + self.agent_rpc.modify_pool(context, + old_member['pool_id'], + agent['agent']['host']) + agent = self.get_pool_agent(context, member['pool_id']) + self.agent_rpc.modify_pool(context, member['pool_id'], agent['host']) def delete_member(self, context, member): self.plugin._delete_db_member(context, member['id']) - self.agent_rpc.modify_pool(context, member['pool_id']) + agent = self.get_pool_agent(context, member['pool_id']) + self.agent_rpc.modify_pool(context, member['pool_id'], agent['host']) def update_health_monitor(self, context, old_health_monitor, health_monitor, pool_id): # monitors are unused here because agent will fetch what is necessary - self.agent_rpc.modify_pool(context, pool_id) + agent = self.get_pool_agent(context, pool_id) + self.agent_rpc.modify_pool(context, pool_id, agent['host']) def create_pool_health_monitor(self, context, healthmon, pool_id): # healthmon is not used here - self.agent_rpc.modify_pool(context, pool_id) + agent = self.get_pool_agent(context, pool_id) + self.agent_rpc.modify_pool(context, pool_id, agent['host']) def delete_pool_health_monitor(self, context, health_monitor, pool_id): self.plugin._delete_db_pool_health_monitor( @@ -285,7 +351,8 @@ class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver): ) # healthmon_id is not used here - self.agent_rpc.modify_pool(context, pool_id) + agent = self.get_pool_agent(context, pool_id) + self.agent_rpc.modify_pool(context, pool_id, agent['host']) def create_health_monitor(self, context, health_monitor): pass diff --git a/neutron/services/loadbalancer/drivers/noop/noop_driver.py b/neutron/services/loadbalancer/drivers/noop/noop_driver.py index 73612c075..3183c448a 100644 --- a/neutron/services/loadbalancer/drivers/noop/noop_driver.py +++ b/neutron/services/loadbalancer/drivers/noop/noop_driver.py @@ -58,7 +58,7 @@ class NoopLbaaSDriver(abstract_driver.LoadBalancerAbstractDriver): @log.log def delete_pool(self, context, pool): - pass + self.plugin._delete_db_pool(context, pool["id"]) @log.log def stats(self, context, pool_id): diff --git a/neutron/services/loadbalancer/plugin.py b/neutron/services/loadbalancer/plugin.py index 642b14dfa..9457ac3c1 100644 --- a/neutron/services/loadbalancer/plugin.py +++ b/neutron/services/loadbalancer/plugin.py @@ -23,6 +23,7 @@ from neutron.db.loadbalancer import loadbalancer_db from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.plugins.common import constants +from neutron.services.loadbalancer import agent_scheduler LOG = logging.getLogger(__name__) @@ -39,7 +40,8 @@ cfg.CONF.register_opts(lbaas_plugin_opts, "LBAAS") legacy.override_config(cfg.CONF, [('LBAAS', 'driver_fqn')]) -class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): +class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb, + agent_scheduler.LbaasAgentSchedulerDbMixin): """Implementation of the Neutron Loadbalancer Service Plugin. @@ -47,7 +49,12 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): Most DB related works are implemented in class loadbalancer_db.LoadBalancerPluginDb. """ - supported_extension_aliases = ["lbaas"] + supported_extension_aliases = ["lbaas", "lbaas_agent_scheduler"] + + # lbaas agent notifiers to handle agent update operations; + # can be updated by plugin drivers while loading; + # will be extracted by neutron manager when loading service plugins; + agent_notifiers = {} def __init__(self): """Initialization for the loadbalancer service plugin.""" @@ -213,7 +220,7 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): # update the db and return the value from db # else - return what we have in db if stats_data: - super(LoadBalancerPlugin, self)._update_pool_stats( + super(LoadBalancerPlugin, self).update_pool_stats( context, pool_id, stats_data diff --git a/neutron/tests/unit/db/loadbalancer/test_db_loadbalancer.py b/neutron/tests/unit/db/loadbalancer/test_db_loadbalancer.py index 470d7e4e5..584569741 100644 --- a/neutron/tests/unit/db/loadbalancer/test_db_loadbalancer.py +++ b/neutron/tests/unit/db/loadbalancer/test_db_loadbalancer.py @@ -16,14 +16,17 @@ import contextlib import logging import os -import testtools +import mock +from oslo.config import cfg +import testtools import webob.exc from neutron.api.extensions import ExtensionMiddleware from neutron.api.extensions import PluginAwareExtensionManager from neutron.common import config from neutron import context +import neutron.db.l3_db # noqa from neutron.db.loadbalancer import loadbalancer_db as ldb import neutron.extensions from neutron.extensions import loadbalancer @@ -46,34 +49,19 @@ ETCDIR = os.path.join(ROOTDIR, 'etc') extensions_path = ':'.join(neutron.extensions.__path__) +_subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14" + def etcdir(*p): return os.path.join(ETCDIR, *p) -class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase): +class LoadBalancerTestMixin(object): resource_prefix_map = dict( (k, constants.COMMON_PREFIXES[constants.LOADBALANCER]) for k in loadbalancer.RESOURCE_ATTRIBUTE_MAP.keys() ) - def setUp(self, core_plugin=None, lb_plugin=None): - service_plugins = {'lb_plugin_name': DB_LB_PLUGIN_KLASS} - - super(LoadBalancerPluginDbTestCase, self).setUp( - service_plugins=service_plugins - ) - - self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14" - - self.plugin = loadbalancer_plugin.LoadBalancerPlugin() - ext_mgr = PluginAwareExtensionManager( - extensions_path, - {constants.LOADBALANCER: self.plugin} - ) - app = config.load_paste_app('extensions_test_app') - self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr) - def _create_vip(self, fmt, name, pool_id, protocol, protocol_port, admin_state_up, expected_res_status=None, **kwargs): data = {'vip': {'name': name, @@ -97,7 +85,7 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase): def _create_pool(self, fmt, name, lb_method, protocol, admin_state_up, expected_res_status=None, **kwargs): data = {'pool': {'name': name, - 'subnet_id': self._subnet_id, + 'subnet_id': _subnet_id, 'lb_method': lb_method, 'protocol': protocol, 'admin_state_up': admin_state_up, @@ -151,12 +139,6 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase): return res - def _api_for_resource(self, resource): - if resource in ['networks', 'subnets', 'ports']: - return self.api - else: - return self.ext_api - @contextlib.contextmanager def vip(self, fmt=None, name='vip1', pool=None, subnet=None, protocol='HTTP', protocol_port=80, admin_state_up=True, @@ -270,7 +252,43 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase): self._delete('health_monitors', the_health_monitor['id']) +class LoadBalancerPluginDbTestCase(LoadBalancerTestMixin, + test_db_plugin.NeutronDbPluginV2TestCase): + def setUp(self, core_plugin=None, lb_plugin=None): + service_plugins = {'lb_plugin_name': DB_LB_PLUGIN_KLASS} + super(LoadBalancerPluginDbTestCase, self).setUp( + service_plugins=service_plugins + ) + + self._subnet_id = _subnet_id + + self.plugin = loadbalancer_plugin.LoadBalancerPlugin() + + get_lbaas_agent_patcher = mock.patch( + 'neutron.services.loadbalancer.agent_scheduler' + '.LbaasAgentSchedulerDbMixin.get_lbaas_agent_hosting_pool') + mock_lbaas_agent = mock.MagicMock() + get_lbaas_agent_patcher.start().return_value = mock_lbaas_agent + mock_lbaas_agent.__getitem__.return_value = {'host': 'host'} + self.addCleanup(mock.patch.stopall) + + ext_mgr = PluginAwareExtensionManager( + extensions_path, + {constants.LOADBALANCER: self.plugin} + ) + app = config.load_paste_app('extensions_test_app') + self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr) + + class TestLoadBalancer(LoadBalancerPluginDbTestCase): + def setUp(self): + cfg.CONF.set_override('driver_fqn', + 'neutron.services.loadbalancer.drivers.noop' + '.noop_driver.NoopLbaaSDriver', + group='LBAAS') + self.addCleanup(cfg.CONF.reset) + super(TestLoadBalancer, self).setUp() + def test_create_vip(self, **extras): expected = { 'name': 'vip1', diff --git a/neutron/tests/unit/dummy_plugin.py b/neutron/tests/unit/dummy_plugin.py index 8a1f4a757..435064291 100644 --- a/neutron/tests/unit/dummy_plugin.py +++ b/neutron/tests/unit/dummy_plugin.py @@ -95,6 +95,7 @@ class DummyServicePlugin(ServicePluginBase): """ supported_extension_aliases = ['dummy', servicetype.EXT_ALIAS] + agent_notifiers = {'dummy': 'dummy_agent_notifier'} def __init__(self): self.svctype_mgr = servicetype_db.ServiceTypeManager.get_instance() diff --git a/neutron/tests/unit/openvswitch/test_agent_scheduler.py b/neutron/tests/unit/openvswitch/test_agent_scheduler.py index c4ab9124b..bc206662f 100644 --- a/neutron/tests/unit/openvswitch/test_agent_scheduler.py +++ b/neutron/tests/unit/openvswitch/test_agent_scheduler.py @@ -1106,7 +1106,8 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin, def test_router_add_to_l3_agent_notification(self): plugin = manager.NeutronManager.get_plugin() - with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3: + l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3] + with mock.patch.object(l3_notifier, 'cast') as mock_l3: with self.router() as router1: self._register_agent_states() hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, @@ -1116,14 +1117,15 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin, routers = [router1['router']['id']] mock_l3.assert_called_with( mock.ANY, - plugin.l3_agent_notifier.make_msg( + l3_notifier.make_msg( 'router_added_to_agent', payload=routers), topic='l3_agent.hosta') def test_router_remove_from_l3_agent_notification(self): plugin = manager.NeutronManager.get_plugin() - with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3: + l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3] + with mock.patch.object(l3_notifier, 'cast') as mock_l3: with self.router() as router1: self._register_agent_states() hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, @@ -1133,22 +1135,22 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin, self._remove_router_from_l3_agent(hosta_id, router1['router']['id']) mock_l3.assert_called_with( - mock.ANY, plugin.l3_agent_notifier.make_msg( + mock.ANY, l3_notifier.make_msg( 'router_removed_from_agent', payload={'router_id': router1['router']['id']}), topic='l3_agent.hosta') def test_agent_updated_l3_agent_notification(self): plugin = manager.NeutronManager.get_plugin() - with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3: + l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3] + with mock.patch.object(l3_notifier, 'cast') as mock_l3: self._register_agent_states() hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, L3_HOSTA) self._disable_agent(hosta_id, admin_state_up=False) mock_l3.assert_called_with( - mock.ANY, plugin.l3_agent_notifier.make_msg( - 'agent_updated', - payload={'admin_state_up': False}), + mock.ANY, l3_notifier.make_msg( + 'agent_updated', payload={'admin_state_up': False}), topic='l3_agent.hosta') diff --git a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_plugin_driver.py b/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_plugin_driver.py index ff6d0341b..6cc0cc6c7 100644 --- a/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_plugin_driver.py +++ b/neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_plugin_driver.py @@ -53,13 +53,30 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): self.callbacks = plugin_driver.LoadBalancerCallbacks( self.plugin_instance ) + get_lbaas_agents_patcher = mock.patch( + 'neutron.services.loadbalancer.agent_scheduler' + '.LbaasAgentSchedulerDbMixin.get_lbaas_agents') + get_lbaas_agents_patcher.start() + + # mocking plugin_driver create_pool() as it does nothing more than + # pool scheduling which is beyond the scope of this test case + mock.patch('neutron.services.loadbalancer.drivers.haproxy' + '.plugin_driver.HaproxyOnHostPluginDriver' + '.create_pool').start() + + self.addCleanup(mock.patch.stopall) def test_get_ready_devices(self): with self.vip() as vip: - ready = self.callbacks.get_ready_devices( - context.get_admin_context(), - ) - self.assertEqual(ready, [vip['vip']['pool_id']]) + with mock.patch('neutron.services.loadbalancer.agent_scheduler' + '.LbaasAgentSchedulerDbMixin.' + 'list_pools_on_lbaas_agent') as mock_agent_pools: + mock_agent_pools.return_value = { + 'pools': [{'id': vip['vip']['pool_id']}]} + ready = self.callbacks.get_ready_devices( + context.get_admin_context(), + ) + self.assertEqual(ready, [vip['vip']['pool_id']]) def test_get_ready_devices_multiple_vips_and_pools(self): ctx = context.get_admin_context() @@ -100,11 +117,17 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): self.assertEqual(ctx.session.query(ldb.Pool).count(), 3) self.assertEqual(ctx.session.query(ldb.Vip).count(), 2) - ready = self.callbacks.get_ready_devices(ctx) - self.assertEqual(len(ready), 2) - self.assertIn(pools[0].id, ready) - self.assertIn(pools[1].id, ready) - self.assertNotIn(pools[2].id, ready) + with mock.patch('neutron.services.loadbalancer.agent_scheduler' + '.LbaasAgentSchedulerDbMixin' + '.list_pools_on_lbaas_agent') as mock_agent_pools: + mock_agent_pools.return_value = {'pools': [{'id': pools[0].id}, + {'id': pools[1].id}, + {'id': pools[2].id}]} + ready = self.callbacks.get_ready_devices(ctx) + self.assertEqual(len(ready), 2) + self.assertIn(pools[0].id, ready) + self.assertIn(pools[1].id, ready) + self.assertNotIn(pools[2].id, ready) # cleanup ctx.session.query(ldb.Pool).delete() ctx.session.query(ldb.Vip).delete() @@ -119,11 +142,15 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): vip['vip']['id'], {'vip': {'status': constants.INACTIVE}} ) - - ready = self.callbacks.get_ready_devices( - context.get_admin_context(), - ) - self.assertFalse(ready) + with mock.patch('neutron.services.loadbalancer.agent_scheduler' + '.LbaasAgentSchedulerDbMixin.' + 'list_pools_on_lbaas_agent') as mock_agent_pools: + mock_agent_pools.return_value = { + 'pools': [{'id': vip['vip']['pool_id']}]} + ready = self.callbacks.get_ready_devices( + context.get_admin_context(), + ) + self.assertFalse(ready) def test_get_ready_devices_inactive_pool(self): with self.vip() as vip: @@ -135,11 +162,15 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): vip['vip']['pool_id'], {'pool': {'status': constants.INACTIVE}} ) - - ready = self.callbacks.get_ready_devices( - context.get_admin_context(), - ) - self.assertFalse(ready) + with mock.patch('neutron.services.loadbalancer.agent_scheduler' + '.LbaasAgentSchedulerDbMixin.' + 'list_pools_on_lbaas_agent') as mock_agent_pools: + mock_agent_pools.return_value = { + 'pools': [{'id': vip['vip']['pool_id']}]} + ready = self.callbacks.get_ready_devices( + context.get_admin_context(), + ) + self.assertFalse(ready) def test_get_logical_device_inactive(self): with self.pool() as pool: @@ -235,26 +266,26 @@ class TestLoadBalancerAgentApi(base.BaseTestCase): super(TestLoadBalancerAgentApi, self).setUp() self.addCleanup(mock.patch.stopall) - self.api = plugin_driver.LoadBalancerAgentApi('topic', 'host') + self.api = plugin_driver.LoadBalancerAgentApi('topic') self.mock_cast = mock.patch.object(self.api, 'cast').start() self.mock_msg = mock.patch.object(self.api, 'make_msg').start() def test_init(self): self.assertEqual(self.api.topic, 'topic') - self.assertEqual(self.api.host, 'host') def _call_test_helper(self, method_name): - rv = getattr(self.api, method_name)(mock.sentinel.context, 'the_id') + rv = getattr(self.api, method_name)(mock.sentinel.context, 'test', + 'host') self.assertEqual(rv, self.mock_cast.return_value) self.mock_cast.assert_called_once_with( mock.sentinel.context, self.mock_msg.return_value, - topic='topic' + topic='topic.host' ) self.mock_msg.assert_called_once_with( method_name, - pool_id='the_id', + pool_id='test', host='host' ) @@ -267,6 +298,21 @@ class TestLoadBalancerAgentApi(base.BaseTestCase): def test_modify_pool(self): self._call_test_helper('modify_pool') + def test_agent_updated(self): + rv = self.api.agent_updated(mock.sentinel.context, True, 'host') + self.assertEqual(rv, self.mock_cast.return_value) + self.mock_cast.assert_called_once_with( + mock.sentinel.context, + self.mock_msg.return_value, + topic='topic.host', + version='1.1' + ) + + self.mock_msg.assert_called_once_with( + 'agent_updated', + payload={'admin_state_up': True} + ) + class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): def setUp(self): @@ -276,6 +322,12 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): super(TestLoadBalancerPluginNotificationWrapper, self).setUp() self.mock_api = api_cls.return_value + # mocking plugin_driver create_pool() as it does nothing more than + # pool scheduling which is beyond the scope of this test case + mock.patch('neutron.services.loadbalancer.drivers.haproxy' + '.plugin_driver.HaproxyOnHostPluginDriver' + '.create_pool').start() + self.addCleanup(mock.patch.stopall) def test_create_vip(self): @@ -284,7 +336,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): with self.vip(pool=pool, subnet=subnet) as vip: self.mock_api.reload_pool.assert_called_once_with( mock.ANY, - vip['vip']['pool_id'] + vip['vip']['pool_id'], + 'host' ) def test_update_vip(self): @@ -302,7 +355,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): self.mock_api.reload_pool.assert_called_once_with( mock.ANY, - vip['vip']['pool_id'] + vip['vip']['pool_id'], + 'host' ) self.assertEqual( @@ -319,7 +373,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): self.plugin_instance.delete_vip(ctx, vip['vip']['id']) self.mock_api.destroy_pool.assert_called_once_with( mock.ANY, - vip['vip']['pool_id'] + vip['vip']['pool_id'], + 'host' ) def test_create_pool(self): @@ -334,7 +389,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): ctx = context.get_admin_context() self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool) self.mock_api.destroy_pool.assert_called_once_with( - mock.ANY, pool['pool']['id']) + mock.ANY, pool['pool']['id'], 'host') self.assertFalse(self.mock_api.reload_pool.called) self.assertFalse(self.mock_api.modify_pool.called) @@ -352,7 +407,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): ctx = context.get_admin_context() self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool) self.mock_api.reload_pool.assert_called_once_with( - mock.ANY, pool['pool']['id']) + mock.ANY, pool['pool']['id'], 'host') self.assertFalse(self.mock_api.destroy_pool.called) self.assertFalse(self.mock_api.modify_pool.called) @@ -363,14 +418,14 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): res = req.get_response(self.ext_api) self.assertEqual(res.status_int, 204) self.mock_api.destroy_pool.assert_called_once_with( - mock.ANY, pool['pool']['id']) + mock.ANY, pool['pool']['id'], 'host') def test_create_member(self): with self.pool() as pool: pool_id = pool['pool']['id'] with self.member(pool_id=pool_id): self.mock_api.modify_pool.assert_called_once_with( - mock.ANY, pool_id) + mock.ANY, pool_id, 'host') def test_update_member(self): with self.pool() as pool: @@ -381,7 +436,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): self.plugin_instance.update_member( ctx, member['member']['id'], member) self.mock_api.modify_pool.assert_called_once_with( - mock.ANY, pool_id) + mock.ANY, pool_id, 'host') def test_update_member_new_pool(self): with self.pool() as pool1: @@ -397,8 +452,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): member) self.assertEqual(2, self.mock_api.modify_pool.call_count) self.mock_api.modify_pool.assert_has_calls( - [mock.call(mock.ANY, pool1_id), - mock.call(mock.ANY, pool2_id)]) + [mock.call(mock.ANY, pool1_id, 'host'), + mock.call(mock.ANY, pool2_id, 'host')]) def test_delete_member(self): with self.pool() as pool: @@ -411,7 +466,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): res = req.get_response(self.ext_api) self.assertEqual(res.status_int, 204) self.mock_api.modify_pool.assert_called_once_with( - mock.ANY, pool_id) + mock.ANY, pool_id, 'host') def test_create_pool_health_monitor(self): with self.pool() as pool: @@ -422,7 +477,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): hm, pool_id) self.mock_api.modify_pool.assert_called_once_with( - mock.ANY, pool_id) + mock.ANY, pool_id, 'host') def test_delete_pool_health_monitor(self): with self.pool() as pool: @@ -436,7 +491,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): self.plugin_instance.delete_pool_health_monitor( ctx, hm['health_monitor']['id'], pool_id) self.mock_api.modify_pool.assert_called_once_with( - mock.ANY, pool_id) + mock.ANY, pool_id, 'host') def test_update_health_monitor_associated_with_pool(self): with self.health_monitor(type='HTTP') as monitor: @@ -457,7 +512,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): self.assertEqual(res.status_int, 201) self.mock_api.modify_pool.assert_called_once_with( mock.ANY, - pool['pool']['id'] + pool['pool']['id'], + 'host' ) self.mock_api.reset_mock() @@ -471,5 +527,6 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): req.get_response(self.ext_api) self.mock_api.modify_pool.assert_called_once_with( mock.ANY, - pool['pool']['id'] + pool['pool']['id'], + 'host' ) diff --git a/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py b/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py new file mode 100644 index 000000000..e3d3df0f8 --- /dev/null +++ b/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py @@ -0,0 +1,200 @@ +# Copyright (c) 2013 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +from webob import exc + +from neutron.api import extensions +from neutron.api.v2 import attributes +from neutron.common import constants +from neutron import context +from neutron.extensions import agent +from neutron.extensions import lbaas_agentscheduler +from neutron import manager +from neutron.plugins.common import constants as plugin_const +from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer +from neutron.tests.unit.openvswitch import test_agent_scheduler +from neutron.tests.unit import test_agent_ext_plugin +from neutron.tests.unit import test_db_plugin as test_plugin +from neutron.tests.unit import test_extensions + +LBAAS_HOSTA = 'hosta' + + +class AgentSchedulerTestMixIn(test_agent_scheduler.AgentSchedulerTestMixIn): + def _list_pools_hosted_by_lbaas_agent(self, agent_id, + expected_code=exc.HTTPOk.code, + admin_context=True): + path = "/agents/%s/%s.%s" % (agent_id, + lbaas_agentscheduler.LOADBALANCER_POOLS, + self.fmt) + return self._request_list(path, expected_code=expected_code, + admin_context=admin_context) + + def _get_lbaas_agent_hosting_pool(self, pool_id, + expected_code=exc.HTTPOk.code, + admin_context=True): + path = "/lb/pools/%s/%s.%s" % (pool_id, + lbaas_agentscheduler.LOADBALANCER_AGENT, + self.fmt) + return self._request_list(path, expected_code=expected_code, + admin_context=admin_context) + + +class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn, + AgentSchedulerTestMixIn, + test_db_loadbalancer.LoadBalancerTestMixin, + test_plugin.NeutronDbPluginV2TestCase): + fmt = 'json' + plugin_str = ('neutron.plugins.openvswitch.' + 'ovs_neutron_plugin.OVSNeutronPluginV2') + + def setUp(self): + # Save the global RESOURCE_ATTRIBUTE_MAP + self.saved_attr_map = {} + for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems(): + self.saved_attr_map[resource] = attrs.copy() + service_plugins = { + 'lb_plugin_name': test_db_loadbalancer.DB_LB_PLUGIN_KLASS} + super(LBaaSAgentSchedulerTestCase, self).setUp( + self.plugin_str, service_plugins=service_plugins) + ext_mgr = extensions.PluginAwareExtensionManager.get_instance() + self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr) + self.adminContext = context.get_admin_context() + # Add the resources to the global attribute map + # This is done here as the setup process won't + # initialize the main API router which extends + # the global attribute map + attributes.RESOURCE_ATTRIBUTE_MAP.update( + agent.RESOURCE_ATTRIBUTE_MAP) + self.addCleanup(self.restore_attribute_map) + + def restore_attribute_map(self): + # Restore the original RESOURCE_ATTRIBUTE_MAP + attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map + + def test_report_states(self): + self._register_agent_states(lbaas_agents=True) + agents = self._list_agents() + self.assertEqual(6, len(agents['agents'])) + + def test_pool_scheduling_on_pool_creation(self): + self._register_agent_states(lbaas_agents=True) + with self.pool() as pool: + lbaas_agent = self._get_lbaas_agent_hosting_pool( + pool['pool']['id']) + self.assertIsNotNone(lbaas_agent) + self.assertEqual(lbaas_agent['agent']['agent_type'], + constants.AGENT_TYPE_LOADBALANCER) + pools = self._list_pools_hosted_by_lbaas_agent( + lbaas_agent['agent']['id']) + self.assertEqual(1, len(pools['pools'])) + self.assertEqual(pool['pool'], pools['pools'][0]) + + def test_schedule_poll_with_disabled_agent(self): + lbaas_hosta = { + 'binary': 'neutron-loadbalancer-agent', + 'host': LBAAS_HOSTA, + 'topic': 'LOADBALANCER_AGENT', + 'configurations': {'device_driver': 'device_driver', + 'interface_driver': 'interface_driver'}, + 'agent_type': constants.AGENT_TYPE_LOADBALANCER} + self._register_one_agent_state(lbaas_hosta) + with self.pool() as pool: + lbaas_agent = self._get_lbaas_agent_hosting_pool( + pool['pool']['id']) + self.assertIsNotNone(lbaas_agent) + + agents = self._list_agents() + self._disable_agent(agents['agents'][0]['id']) + pool = {'pool': {'name': 'test', + 'subnet_id': 'test', + 'lb_method': 'ROUND_ROBIN', + 'protocol': 'HTTP', + 'admin_state_up': True, + 'tenant_id': 'test', + 'description': 'test'}} + lbaas_plugin = manager.NeutronManager.get_service_plugins()[ + plugin_const.LOADBALANCER] + self.assertRaises(lbaas_agentscheduler.NoEligibleLbaasAgent, + lbaas_plugin.create_pool, self.adminContext, pool) + + def test_schedule_poll_with_down_agent(self): + lbaas_hosta = { + 'binary': 'neutron-loadbalancer-agent', + 'host': LBAAS_HOSTA, + 'topic': 'LOADBALANCER_AGENT', + 'configurations': {'device_driver': 'device_driver', + 'interface_driver': 'interface_driver'}, + 'agent_type': constants.AGENT_TYPE_LOADBALANCER} + self._register_one_agent_state(lbaas_hosta) + is_agent_down_str = 'neutron.db.agents_db.AgentDbMixin.is_agent_down' + with mock.patch(is_agent_down_str) as mock_is_agent_down: + mock_is_agent_down.return_value = False + with self.pool() as pool: + lbaas_agent = self._get_lbaas_agent_hosting_pool( + pool['pool']['id']) + self.assertIsNotNone(lbaas_agent) + with mock.patch(is_agent_down_str) as mock_is_agent_down: + mock_is_agent_down.return_value = True + pool = {'pool': {'name': 'test', + 'subnet_id': 'test', + 'lb_method': 'ROUND_ROBIN', + 'protocol': 'HTTP', + 'admin_state_up': True, + 'tenant_id': 'test', + 'description': 'test'}} + lbaas_plugin = manager.NeutronManager.get_service_plugins()[ + plugin_const.LOADBALANCER] + self.assertRaises(lbaas_agentscheduler.NoEligibleLbaasAgent, + lbaas_plugin.create_pool, + self.adminContext, pool) + + def test_pool_unscheduling_on_pool_deletion(self): + self._register_agent_states(lbaas_agents=True) + with self.pool(no_delete=True) as pool: + lbaas_agent = self._get_lbaas_agent_hosting_pool( + pool['pool']['id']) + self.assertIsNotNone(lbaas_agent) + self.assertEqual(lbaas_agent['agent']['agent_type'], + constants.AGENT_TYPE_LOADBALANCER) + pools = self._list_pools_hosted_by_lbaas_agent( + lbaas_agent['agent']['id']) + self.assertEqual(1, len(pools['pools'])) + self.assertEqual(pool['pool'], pools['pools'][0]) + + req = self.new_delete_request('pools', + pool['pool']['id']) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 204) + pools = self._list_pools_hosted_by_lbaas_agent( + lbaas_agent['agent']['id']) + self.assertEqual(0, len(pools['pools'])) + + def test_pool_scheduling_non_admin_access(self): + self._register_agent_states(lbaas_agents=True) + with self.pool() as pool: + self._get_lbaas_agent_hosting_pool( + pool['pool']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._list_pools_hosted_by_lbaas_agent( + 'fake_id', + expected_code=exc.HTTPForbidden.code, + admin_context=False) + + +class LBaaSAgentSchedulerTestCaseXML(LBaaSAgentSchedulerTestCase): + fmt = 'xml' diff --git a/neutron/tests/unit/test_agent_ext_plugin.py b/neutron/tests/unit/test_agent_ext_plugin.py index 10a1ecf0d..eb65d79bb 100644 --- a/neutron/tests/unit/test_agent_ext_plugin.py +++ b/neutron/tests/unit/test_agent_ext_plugin.py @@ -45,6 +45,8 @@ DHCP_HOSTA = 'hosta' L3_HOSTB = 'hostb' DHCP_HOSTC = 'hostc' DHCP_HOST1 = 'host1' +LBAAS_HOSTA = 'hosta' +LBAAS_HOSTB = 'hostb' class AgentTestExtensionManager(object): @@ -83,7 +85,7 @@ class AgentDBTestMixIn(object): self.assertEqual(agent_res.status_int, expected_res_status) return agent_res - def _register_agent_states(self): + def _register_agent_states(self, lbaas_agents=False): """Register two L3 agents and two DHCP agents.""" l3_hosta = { 'binary': 'neutron-l3-agent', @@ -110,6 +112,16 @@ class AgentDBTestMixIn(object): 'agent_type': constants.AGENT_TYPE_DHCP} dhcp_hostc = copy.deepcopy(dhcp_hosta) dhcp_hostc['host'] = DHCP_HOSTC + lbaas_hosta = { + 'binary': 'neutron-loadbalancer-agent', + 'host': LBAAS_HOSTA, + 'topic': 'LOADBALANCER_AGENT', + 'configurations': {'device_driver': 'device_driver', + 'interface_driver': 'interface_driver', + }, + 'agent_type': constants.AGENT_TYPE_LOADBALANCER} + lbaas_hostb = copy.deepcopy(lbaas_hosta) + lbaas_hostb['host'] = LBAAS_HOSTB callback = agents_db.AgentExtRpcCallback() callback.report_state(self.adminContext, agent_state={'agent_state': l3_hosta}, @@ -123,7 +135,18 @@ class AgentDBTestMixIn(object): callback.report_state(self.adminContext, agent_state={'agent_state': dhcp_hostc}, time=timeutils.strtime()) - return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc] + + res = [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc] + if lbaas_agents: + callback.report_state(self.adminContext, + agent_state={'agent_state': lbaas_hosta}, + time=timeutils.strtime()) + callback.report_state(self.adminContext, + agent_state={'agent_state': lbaas_hostb}, + time=timeutils.strtime()) + res += [lbaas_hosta, lbaas_hostb] + + return res def _register_one_dhcp_agent(self): """Register one DHCP agent.""" diff --git a/neutron/tests/unit/test_neutron_manager.py b/neutron/tests/unit/test_neutron_manager.py index 16033e3ca..6c6a3eec7 100644 --- a/neutron/tests/unit/test_neutron_manager.py +++ b/neutron/tests/unit/test_neutron_manager.py @@ -47,6 +47,11 @@ class MultiServiceCorePlugin(object): supported_extension_aliases = ['lbaas', 'dummy'] +class CorePluginWithAgentNotifiers(object): + agent_notifiers = {'l3': 'l3_agent_notifier', + 'dhcp': 'dhcp_agent_notifier'} + + class NeutronManagerTestCase(base.BaseTestCase): def setUp(self): @@ -121,3 +126,16 @@ class NeutronManagerTestCase(base.BaseTestCase): self.assertIsNotNone(validate_pre_plugin_load()) cfg.CONF.set_override('core_plugin', 'dummy.plugin') self.assertIsNone(validate_pre_plugin_load()) + + def test_manager_gathers_agent_notifiers_from_service_plugins(self): + cfg.CONF.set_override("service_plugins", + ["neutron.tests.unit.dummy_plugin." + "DummyServicePlugin"]) + cfg.CONF.set_override("core_plugin", + "neutron.tests.unit.test_neutron_manager." + "CorePluginWithAgentNotifiers") + expected = {'l3': 'l3_agent_notifier', + 'dhcp': 'dhcp_agent_notifier', + 'dummy': 'dummy_agent_notifier'} + core_plugin = NeutronManager.get_plugin() + self.assertEqual(expected, core_plugin.agent_notifiers)