From 0070b452f1c00817dd4a47deb57bfbe6b79c4152 Mon Sep 17 00:00:00 2001 From: gongysh Date: Fri, 22 Feb 2013 23:34:57 +0800 Subject: [PATCH] Add scheduling feature basing on agent management extension 3rd part of blueprint quantum-scheduler 1. Allow networks to be hosted by certain dhcp agents. Network to dhcp agent is a many to many relationship. Provide a simple scheduler to schedule a network randomly to an active dhcp agent when a network or port is created. 2. Allow admin user to (de)schedule network to a certain dhcp agent manually. 3. Allow routers to be hosted by a certain l3 agent. Router to l3 agent is a many to one relationship. Provide a simple scheduler to schedule a router to l3 agent if the router is not scheduled when the router is updated. 4. Auto schedule networks and routers to agents when agents start. 5. Only support ovs plugin at this point Change-Id: Iddec3ea9d4c0fe2d51a59f7db47145722fc5a1cd --- etc/policy.json | 11 +- etc/quantum.conf | 26 +- quantum/agent/dhcp_agent.py | 7 +- quantum/agent/l3_agent.py | 44 +- .../rpc/agentnotifiers/dhcp_rpc_agent_api.py | 60 +- .../rpc/agentnotifiers/l3_rpc_agent_api.py | 120 +++ quantum/common/constants.py | 2 + quantum/common/utils.py | 5 + quantum/db/agents_db.py | 32 +- quantum/db/agentschedulers_db.py | 363 ++++++++ quantum/db/dhcp_rpc_base.py | 25 +- quantum/db/extraroute_db.py | 5 +- quantum/db/l3_db.py | 51 +- quantum/db/l3_rpc_agent_api.py | 50 -- quantum/db/l3_rpc_base.py | 15 +- .../versions/4692d074d587_agent_scheduler.py | 79 ++ quantum/extensions/agentscheduler.py | 252 ++++++ quantum/plugins/openvswitch/common/config.py | 2 + .../plugins/openvswitch/ovs_quantum_plugin.py | 37 +- quantum/policy.py | 1 + quantum/scheduler/__init__.py | 34 + quantum/scheduler/dhcp_agent_scheduler.py | 108 +++ quantum/scheduler/l3_agent_scheduler.py | 149 ++++ .../unit/openvswitch/test_agent_scheduler.py | 803 ++++++++++++++++++ quantum/tests/unit/test_agent_ext_plugin.py | 38 +- quantum/tests/unit/test_db_rpc_base.py | 2 +- quantum/tests/unit/test_l3_agent.py | 14 +- quantum/tests/unit/test_l3_plugin.py | 46 +- tox.ini | 6 +- 29 files changed, 2230 insertions(+), 157 deletions(-) create mode 100644 quantum/api/rpc/agentnotifiers/l3_rpc_agent_api.py create mode 100644 quantum/db/agentschedulers_db.py delete mode 100644 quantum/db/l3_rpc_agent_api.py create mode 100644 quantum/db/migration/alembic_migrations/versions/4692d074d587_agent_scheduler.py create mode 100644 quantum/extensions/agentscheduler.py create mode 100644 quantum/scheduler/__init__.py create mode 100644 quantum/scheduler/dhcp_agent_scheduler.py create mode 100644 quantum/scheduler/l3_agent_scheduler.py create mode 100644 quantum/tests/unit/openvswitch/test_agent_scheduler.py diff --git a/etc/policy.json b/etc/policy.json index a427507f5..aa18ba6e1 100644 --- a/etc/policy.json +++ b/etc/policy.json @@ -58,5 +58,14 @@ "update_agent": "rule:admin_only", "delete_agent": "rule:admin_only", "get_agent": "rule:admin_only", - "get_agents": "rule:admin_only" + "get_agents": "rule:admin_only", + + "create_dhcp-network": "rule:admin_only", + "delete_dhcp-network": "rule:admin_only", + "get_dhcp-networks": "rule:admin_only", + "create_l3-router": "rule:admin_only", + "delete_l3-router": "rule:admin_only", + "get_l3-routers": "rule:admin_only", + "get_dhcp-agents": "rule:admin_only", + "get_l3-agents": "rule:admin_only" } diff --git a/etc/quantum.conf b/etc/quantum.conf index d14aac9ff..89005b636 100644 --- a/etc/quantum.conf +++ b/etc/quantum.conf @@ -198,6 +198,27 @@ notification_topics = notifications # Maximum number of fixed ips per port # max_fixed_ips_per_port = 5 +# =========== items for agent management extension ============= +# Seconds to regard the agent as down. +# agent_down_time = 5 +# =========== end of items for agent management extension ===== + +# =========== items for agent scheduler extension ============= +# Driver to use for scheduling network to DHCP agent +# network_scheduler_driver = quantum.scheduler.dhcp_agent_scheduler.ChanceScheduler +# Driver to use for scheduling router to a default L3 agent +# router_scheduler_driver = quantum.scheduler.l3_agent_scheduler.ChanceScheduler + +# Allow auto scheduling networks to DHCP agent. It will schedule non-hosted +# networks to first DHCP agent which sends get_active_networks message to +# quantum server +# network_auto_schedule = True + +# Allow auto scheduling routers to L3 agent. It will schedule non-hosted +# routers to first L3 agent which sends sync_routers message to quantum server +# router_auto_schedule = True +# =========== end of items for agent scheduler extension ===== + [QUOTAS] # resource name(s) that are supported in quota features # quota_items = network,subnet,port @@ -217,11 +238,6 @@ notification_topics = notifications # default driver to use for quota checks # quota_driver = quantum.quota.ConfDriver -# =========== items for agent management extension ============= -# Seconds to regard the agent as down. -# agent_down_time = 5 -# =========== end of items for agent management extension ===== - [DEFAULT_SERVICETYPE] # Description of the default service type (optional) # description = "default service type" diff --git a/quantum/agent/dhcp_agent.py b/quantum/agent/dhcp_agent.py index 532647b9e..917626468 100644 --- a/quantum/agent/dhcp_agent.py +++ b/quantum/agent/dhcp_agent.py @@ -320,7 +320,7 @@ class DhcpPluginApi(proxy.RpcProxy): super(DhcpPluginApi, self).__init__( topic=topic, default_version=self.BASE_RPC_API_VERSION) self.context = context - self.host = socket.gethostname() + self.host = cfg.CONF.host def get_active_networks(self): """Make a remote process call to retrieve the active networks.""" @@ -685,6 +685,11 @@ class DhcpAgentWithStateReport(DhcpAgent): if self.agent_state.pop('start_flag', None): self.run() + def agent_updated(self, context, payload): + """Handle the agent_updated notification event.""" + self.needs_resync = True + LOG.info(_("agent_updated by server side %s!"), payload) + def after_start(self): LOG.info(_("DHCP agent started")) diff --git a/quantum/agent/l3_agent.py b/quantum/agent/l3_agent.py index 06076db28..c67ebe6a1 100644 --- a/quantum/agent/l3_agent.py +++ b/quantum/agent/l3_agent.py @@ -93,7 +93,7 @@ class L3PluginApi(proxy.RpcProxy): class RouterInfo(object): - def __init__(self, router_id, root_helper, use_namespaces, router=None): + def __init__(self, router_id, root_helper, use_namespaces, router): self.router_id = router_id self.ex_gw_port = None self.internal_ports = [] @@ -227,7 +227,7 @@ class L3NATAgent(manager.Manager): else: raise - def _router_added(self, router_id, router=None): + def _router_added(self, router_id, router): ri = RouterInfo(router_id, self.root_helper, self.conf.use_namespaces, router) self.router_info[router_id] = ri @@ -242,6 +242,10 @@ class L3NATAgent(manager.Manager): def _router_removed(self, router_id): ri = self.router_info[router_id] + ri.router['gw_port'] = None + ri.router[l3_constants.INTERFACE_KEY] = [] + ri.router[l3_constants.FLOATINGIP_KEY] = [] + self.process_router(ri) for c, r in self.metadata_filter_rules(): ri.iptables_manager.ipv4['filter'].remove_rule(c, r) for c, r in self.metadata_nat_rules(): @@ -568,7 +572,13 @@ class L3NATAgent(manager.Manager): LOG.debug(msg) self.fullsync = True - def _process_routers(self, routers): + def router_removed_from_agent(self, context, payload): + self.router_deleted(context, payload['router_id']) + + def router_added_to_agent(self, context, payload): + self.routers_updated(context, payload) + + def _process_routers(self, routers, all_routers=False): if (self.conf.external_network_bridge and not ip_lib.device_exists(self.conf.external_network_bridge)): LOG.error(_("The external network bridge '%s' does not exist"), @@ -576,7 +586,17 @@ class L3NATAgent(manager.Manager): return target_ex_net_id = self._fetch_external_net_id() - + # if routers are all the routers we have (They are from router sync on + # starting or when error occurs during running), we seek the + # routers which should be removed. + # If routers are from server side notification, we seek them + # from subset of incoming routers and ones we have now. + if all_routers: + prev_router_ids = set(self.router_info) + else: + prev_router_ids = set(self.router_info) & set( + [router['id'] for router in routers]) + cur_router_ids = set() for r in routers: if not r['admin_state_up']: continue @@ -593,13 +613,15 @@ class L3NATAgent(manager.Manager): if ex_net_id and ex_net_id != target_ex_net_id: continue - + cur_router_ids.add(r['id']) if r['id'] not in self.router_info: - self._router_added(r['id']) - + self._router_added(r['id'], r) ri = self.router_info[r['id']] ri.router = r self.process_router(ri) + # identify and remove routers that no longer exist + for router_id in prev_router_ids - cur_router_ids: + self._router_removed(router_id) @periodic_task.periodic_task def _sync_routers_task(self, context): @@ -613,8 +635,7 @@ class L3NATAgent(manager.Manager): router_id = None routers = self.plugin_rpc.get_routers( context, router_id) - self.router_info = {} - self._process_routers(routers) + self._process_routers(routers, all_routers=True) self.fullsync = False except Exception: LOG.exception(_("Failed synchronizing routers")) @@ -704,6 +725,11 @@ class L3NATAgentWithStateReport(L3NATAgent): except Exception: LOG.exception(_("Failed reporting state!")) + def agent_updated(self, context, payload): + """Handle the agent_updated notification event.""" + self.fullsync = True + LOG.info(_("agent_updated by server side %s!"), payload) + def main(): eventlet.monkey_patch() diff --git a/quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py index 5cf03694f..2a01ad18e 100644 --- a/quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py +++ b/quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py @@ -13,7 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from quantum.common import constants from quantum.common import topics +from quantum.common import utils +from quantum import manager from quantum.openstack.common import log as logging from quantum.openstack.common.rpc import proxy @@ -40,11 +43,35 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy): super(DhcpAgentNotifyAPI, self).__init__( topic=topic, default_version=self.BASE_RPC_API_VERSION) - def _notification(self, context, method, payload): + def _get_dhcp_agents(self, context, network_id): + plugin = manager.QuantumManager.get_plugin() + dhcp_agents = plugin.get_dhcp_agents_hosting_networks( + context, [network_id], active=True) + return [(dhcp_agent.host, dhcp_agent.topic) for + dhcp_agent in dhcp_agents] + + def _notification_host(self, context, method, payload, host): + """Notify the agent on host""" + self.cast( + context, self.make_msg(method, + payload=payload), + topic='%s.%s' % (topics.DHCP_AGENT, host)) + + def _notification(self, context, method, payload, network_id): """Notify all the agents that are hosting the network""" - # By now, we have no scheduling feature, so we fanout - # to all of the DHCP agents - self._notification_fanout(context, method, payload) + plugin = manager.QuantumManager.get_plugin() + if (method != 'network_delete_end' and utils.is_extension_supported( + plugin, constants.AGENT_SCHEDULER_EXT_ALIAS)): + for (host, topic) in self._get_dhcp_agents(context, network_id): + self.cast( + context, self.make_msg(method, + payload=payload), + topic='%s.%s' % (topic, host)) + else: + # besides the non-agentscheduler plugin, + # There is no way to query who is hosting the network + # when the network is deleted, so we need to fanout + self._notification_fanout(context, method, payload) def _notification_fanout(self, context, method, payload): """Fanout the payload to all dhcp agents""" @@ -53,6 +80,19 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy): payload=payload), topic=topics.DHCP_AGENT) + def network_removed_from_agent(self, context, network_id, host): + self._notification_host(context, 'network_delete_end', + {'network_id': network_id}, host) + + def network_added_to_agent(self, context, network_id, host): + self._notification_host(context, 'network_create_end', + {'network': {'id': network_id}}, host) + + def agent_updated(self, context, admin_state_up, host): + self._notification_host(context, 'agent_updated', + {'admin_state_up': admin_state_up}, + host) + def notify(self, context, data, methodname): # data is {'key' : 'value'} with only one key if methodname not in self.VALID_METHOD_NAMES: @@ -61,10 +101,18 @@ class DhcpAgentNotifyAPI(proxy.RpcProxy): if obj_type not in self.VALID_RESOURCES: return obj_value = data[obj_type] + network_id = None + if obj_type == 'network' and 'id' in obj_value: + network_id = obj_value['id'] + elif obj_type in ['port', 'subnet'] and 'network_id' in obj_value: + network_id = obj_value['network_id'] + if not network_id: + return methodname = methodname.replace(".", "_") if methodname.endswith("_delete_end"): if 'id' in obj_value: self._notification(context, methodname, - {obj_type + '_id': obj_value['id']}) + {obj_type + '_id': obj_value['id']}, + network_id) else: - self._notification(context, methodname, data) + self._notification(context, methodname, data, network_id) diff --git a/quantum/api/rpc/agentnotifiers/l3_rpc_agent_api.py b/quantum/api/rpc/agentnotifiers/l3_rpc_agent_api.py new file mode 100644 index 000000000..c5085cb61 --- /dev/null +++ b/quantum/api/rpc/agentnotifiers/l3_rpc_agent_api.py @@ -0,0 +1,120 @@ +# Copyright (c) 2013 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from quantum.common import constants +from quantum.common import topics +from quantum.common import utils +from quantum import manager +from quantum.openstack.common import log as logging +from quantum.openstack.common.rpc import proxy + + +LOG = logging.getLogger(__name__) + + +class L3AgentNotifyAPI(proxy.RpcProxy): + """API for plugin to notify L3 agent.""" + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, topic=topics.L3_AGENT): + super(L3AgentNotifyAPI, self).__init__( + topic=topic, default_version=self.BASE_RPC_API_VERSION) + + def _notification_host(self, context, method, payload, host): + """Notify the agent that is hosting the router""" + LOG.debug(_('Nofity agent at %(host)s the message ' + '%(method)s'), {'host': host, + 'method': method}) + self.cast( + context, self.make_msg(method, + payload=payload), + topic='%s.%s' % (topics.L3_AGENT, host)) + + def _agent_notification(self, context, method, routers, + operation, data): + """Notify changed routers to hosting l3 agents. + + Adjust routers according to l3 agents' role and + related dhcp agents. + Notify dhcp agent to get right subnet's gateway ips. + """ + adminContext = context.is_admin and context or context.elevated() + plugin = manager.QuantumManager.get_plugin() + for router in routers: + l3_agents = plugin.get_l3_agents_hosting_routers( + adminContext, [router['id']], + admin_state_up=True, + active=True) + for l3_agent in l3_agents: + LOG.debug(_('Notify agent at %(topic)s.%(host)s the message ' + '%(method)s'), + {'topic': l3_agent.topic, + 'host': l3_agent.host, + 'method': method}) + self.cast( + context, self.make_msg(method, + routers=[router]), + topic='%s.%s' % (l3_agent.topic, l3_agent.host)) + + def _notification(self, context, method, routers, operation, data): + """Notify all the agents that are hosting the routers""" + plugin = manager.QuantumManager.get_plugin() + if utils.is_extension_supported( + plugin, constants.AGENT_SCHEDULER_EXT_ALIAS): + adminContext = (context.is_admin and + context or context.elevated()) + plugin.schedule_routers(adminContext, routers) + self._agent_notification( + context, method, routers, operation, data) + else: + self.fanout_cast( + context, self.make_msg(method, + routers=routers), + topic=topics.L3_AGENT) + + def _notification_fanout(self, context, method, router_id): + """Fanout the deleted router to all L3 agents""" + LOG.debug(_('Fanout notify agent at %(topic)s the message ' + '%(method)s on router %(router_id)s'), + {'topic': topics.DHCP_AGENT, + 'method': method, + 'router_id': router_id}) + self.fanout_cast( + context, self.make_msg(method, + router_id=router_id), + topic=topics.L3_AGENT) + + def agent_updated(self, context, admin_state_up, host): + self._notification_host(context, 'agent_updated', + {'admin_state_up': admin_state_up}, + host) + + def router_deleted(self, context, router_id): + self._notification_fanout(context, 'router_deleted', router_id) + + def routers_updated(self, context, routers, operation=None, data=None): + if routers: + self._notification(context, 'routers_updated', routers, + operation, data) + + def router_removed_from_agent(self, context, router_id, host): + self._notification_host(context, 'router_removed_from_agent', + {'router_id': router_id}, host) + + def router_added_to_agent(self, context, routers, host): + self._notification_host(context, 'router_added_to_agent', + routers, host) + +L3AgentNotify = L3AgentNotifyAPI() diff --git a/quantum/common/constants.py b/quantum/common/constants.py index 3e27fdf14..e1bc25f18 100644 --- a/quantum/common/constants.py +++ b/quantum/common/constants.py @@ -66,3 +66,5 @@ PAGINATION_INFINITE = 'infinite' SORT_DIRECTION_ASC = 'asc' SORT_DIRECTION_DESC = 'desc' + +AGENT_SCHEDULER_EXT_ALIAS = 'agent_scheduler' diff --git a/quantum/common/utils.py b/quantum/common/utils.py index d6b7c6e25..5522e9a7f 100644 --- a/quantum/common/utils.py +++ b/quantum/common/utils.py @@ -183,3 +183,8 @@ def diff_list_of_dict(old_list, new_list): added = new_set - old_set removed = old_set - new_set return [str2dict(a) for a in added], [str2dict(r) for r in removed] + + +def is_extension_supported(plugin, ext_alias): + return ext_alias in getattr( + plugin, "supported_extension_aliases", []) diff --git a/quantum/db/agents_db.py b/quantum/db/agents_db.py index 70c56780e..0f9a236ad 100644 --- a/quantum/db/agents_db.py +++ b/quantum/db/agents_db.py @@ -67,24 +67,30 @@ class AgentDbMixin(ext_agent.AgentPluginBase): raise ext_agent.AgentNotFound(id=id) return agent - def _is_agent_down(self, heart_beat_time_str): - return timeutils.is_older_than(heart_beat_time_str, + @classmethod + def is_agent_down(cls, heart_beat_time): + return timeutils.is_older_than(heart_beat_time, cfg.CONF.agent_down_time) + def get_configuration_dict(self, agent_db): + try: + conf = jsonutils.loads(agent_db.configurations) + except Exception: + msg = _('Configuration for agent %(agent_type)s on host %(host)s' + ' is invalid.') + LOG.warn(msg, {'agent_type': agent_db.agent_type, + 'host': agent_db.host}) + conf = {} + return conf + def _make_agent_dict(self, agent, fields=None): attr = ext_agent.RESOURCE_ATTRIBUTE_MAP.get( ext_agent.RESOURCE_NAME + 's') res = dict((k, agent[k]) for k in attr if k not in ['alive', 'configurations']) - res['alive'] = not self._is_agent_down(res['heartbeat_timestamp']) - try: - res['configurations'] = jsonutils.loads(agent['configurations']) - except Exception: - msg = _('Configurations for agent %(agent_type)s on host %(host)s' - ' are invalid.') - LOG.warn(msg, {'agent_type': res['agent_type'], - 'host': res['host']}) - res['configurations'] = {} + res['alive'] = not AgentDbMixin.is_agent_down( + res['heartbeat_timestamp']) + res['configurations'] = self.get_configuration_dict(agent) return self._fields(res, fields) def delete_agent(self, context, id): @@ -99,6 +105,10 @@ class AgentDbMixin(ext_agent.AgentPluginBase): agent.update(agent_data) return self._make_agent_dict(agent) + def get_agents_db(self, context, filters=None): + query = self._get_collection_query(context, Agent, filters=filters) + return query.all() + def get_agents(self, context, filters=None, fields=None): return self._get_collection(context, Agent, self._make_agent_dict, diff --git a/quantum/db/agentschedulers_db.py b/quantum/db/agentschedulers_db.py new file mode 100644 index 000000000..5137820f8 --- /dev/null +++ b/quantum/db/agentschedulers_db.py @@ -0,0 +1,363 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy + +import sqlalchemy as sa +from sqlalchemy import orm +from sqlalchemy.orm import exc +from sqlalchemy.orm import joinedload + +from quantum.api.v2 import attributes +from quantum.common import constants +from quantum.db import agents_db +from quantum.db import model_base +from quantum.db import models_v2 +from quantum.extensions import agentscheduler +from quantum.openstack.common import log as logging +from quantum.openstack.common import uuidutils + + +LOG = logging.getLogger(__name__) + + +class NetworkDhcpAgentBinding(model_base.BASEV2): + """Represents binding between quantum networks and DHCP agents""" + network_id = sa.Column(sa.String(36), + sa.ForeignKey("networks.id", ondelete='CASCADE'), + primary_key=True) + dhcp_agent = orm.relation(agents_db.Agent) + dhcp_agent_id = sa.Column(sa.String(36), + sa.ForeignKey("agents.id", + ondelete='CASCADE'), + primary_key=True) + + +class RouterL3AgentBinding(model_base.BASEV2, models_v2.HasId): + """Represents binding between quantum routers and L3 agents""" + router_id = sa.Column(sa.String(36), + sa.ForeignKey("routers.id", ondelete='CASCADE')) + l3_agent = orm.relation(agents_db.Agent) + l3_agent_id = sa.Column(sa.String(36), + sa.ForeignKey("agents.id", + ondelete='CASCADE')) + + +class AgentSchedulerDbMixin(agentscheduler.AgentSchedulerPluginBase): + """Mixin class to add agent scheduler extension to db_plugin_base_v2.""" + + dhcp_agent_notifier = None + l3_agent_notifier = None + network_scheduler = None + router_scheduler = None + + def get_dhcp_agents_hosting_networks( + self, context, network_ids, active=None): + if not network_ids: + return [] + query = context.session.query(NetworkDhcpAgentBinding) + query = query.options(joinedload('dhcp_agent')) + if len(network_ids) == 1: + query = query.filter( + NetworkDhcpAgentBinding.network_id == network_ids[0]) + elif network_ids: + query = query.filter( + NetworkDhcpAgentBinding.network_id in network_ids) + if active is not None: + query = (query.filter(agents_db.Agent.admin_state_up == active)) + dhcp_agents = [binding.dhcp_agent for binding in query.all()] + if active is not None: + dhcp_agents = [dhcp_agent for dhcp_agent in + dhcp_agents if not + agents_db.AgentDbMixin.is_agent_down( + dhcp_agent['heartbeat_timestamp'])] + return dhcp_agents + + def add_network_to_dhcp_agent(self, context, id, network_id): + self._get_network(context, network_id) + with context.session.begin(subtransactions=True): + agent_db = self._get_agent(context, id) + if (agent_db['agent_type'] != constants.AGENT_TYPE_DHCP or + not agent_db['admin_state_up']): + raise agentscheduler.InvalidDHCPAgent(id=id) + dhcp_agents = self.get_dhcp_agents_hosting_networks( + context, [network_id]) + for dhcp_agent in dhcp_agents: + if id == dhcp_agent.id: + raise agentscheduler.NetworkHostedByDHCPAgent( + network_id=network_id, agent_id=id) + binding = NetworkDhcpAgentBinding() + binding.dhcp_agent_id = id + binding.network_id = network_id + context.session.add(binding) + if self.dhcp_agent_notifier: + self.dhcp_agent_notifier.network_added_to_agent( + context, network_id, agent_db.host) + + def remove_network_from_dhcp_agent(self, context, id, network_id): + agent = self._get_agent(context, id) + with context.session.begin(subtransactions=True): + try: + query = context.session.query(NetworkDhcpAgentBinding) + binding = query.filter( + NetworkDhcpAgentBinding.network_id == network_id, + NetworkDhcpAgentBinding.dhcp_agent_id == id).one() + except exc.NoResultFound: + raise agentscheduler.NetworkNotHostedByDhcpAgent( + network_id=network_id, agent_id=id) + context.session.delete(binding) + if self.dhcp_agent_notifier: + self.dhcp_agent_notifier.network_removed_from_agent( + context, network_id, agent.host) + + def list_networks_on_dhcp_agent(self, context, id): + query = context.session.query(NetworkDhcpAgentBinding.network_id) + net_ids = query.filter( + NetworkDhcpAgentBinding.dhcp_agent_id == id).all() + if net_ids: + _ids = [item[0] for item in net_ids] + return {'networks': + self.get_networks(context, filters={'id': _ids})} + else: + return {'networks': []} + + def list_active_networks_on_active_dhcp_agent(self, context, host): + agent = self._get_agent_by_type_and_host( + context, constants.AGENT_TYPE_DHCP, host) + if not agent.admin_state_up: + return [] + query = context.session.query(NetworkDhcpAgentBinding.network_id) + net_ids = query.filter( + NetworkDhcpAgentBinding.dhcp_agent_id == agent.id).all() + if net_ids: + _ids = [item[0] for item in net_ids] + return self.get_networks( + context, filters={'id': _ids, 'admin_state_up': [True]}) + else: + return [] + + def list_dhcp_agents_hosting_network(self, context, network_id): + dhcp_agents = self.get_dhcp_agents_hosting_networks( + context, [network_id]) + agent_ids = [dhcp_agent.id for dhcp_agent in dhcp_agents] + if agent_ids: + return { + 'agents': + self.get_agents(context, filters={'id': agent_ids})} + else: + return {'agents': []} + + def add_router_to_l3_agent(self, context, id, router_id): + """Add a l3 agent to host a router. + """ + router = self.get_router(context, router_id) + with context.session.begin(subtransactions=True): + agent_db = self._get_agent(context, id) + if (agent_db['agent_type'] != constants.AGENT_TYPE_L3 or + not agent_db['admin_state_up'] or + not self.get_l3_agent_candidates(router, [agent_db])): + raise agentscheduler.InvalidL3Agent(id=id) + query = context.session.query(RouterL3AgentBinding) + try: + binding = query.filter( + RouterL3AgentBinding.l3_agent_id == agent_db.id, + RouterL3AgentBinding.router_id == router_id).one() + if binding: + raise agentscheduler.RouterHostedByL3Agent( + router_id=router_id, agent_id=id) + except exc.NoResultFound: + pass + + result = self.auto_schedule_routers(context, + agent_db.host, + router_id) + if not result: + raise agentscheduler.RouterSchedulingFailed( + router_id=router_id, agent_id=id) + + if self.l3_agent_notifier: + routers = self.get_sync_data(context, [router_id]) + self.l3_agent_notifier.router_added_to_agent( + context, routers, agent_db.host) + + def remove_router_from_l3_agent(self, context, id, router_id): + """Remove the router from l3 agent. After it, the router + will be non-hosted until there is update which + lead to re schedule or be added to another agent manually.""" + agent = self._get_agent(context, id) + with context.session.begin(subtransactions=True): + query = context.session.query(RouterL3AgentBinding) + query = query.filter( + RouterL3AgentBinding.router_id == router_id, + RouterL3AgentBinding.l3_agent_id == id) + try: + binding = query.one() + except exc.NoResultFound: + raise agentscheduler.RouterNotHostedByL3Agent( + router_id=router_id, agent_id=id) + context.session.delete(binding) + if self.l3_agent_notifier: + self.l3_agent_notifier.router_removed_from_agent( + context, router_id, agent.host) + + def list_routers_on_l3_agent(self, context, id): + query = context.session.query(RouterL3AgentBinding.router_id) + router_ids = query.filter( + RouterL3AgentBinding.l3_agent_id == id).all() + if router_ids: + _ids = [item[0] for item in router_ids] + return {'routers': + self.get_routers(context, filters={'id': _ids})} + else: + return {'routers': []} + + def list_active_sync_routers_on_active_l3_agent( + self, context, host, router_id): + agent = self._get_agent_by_type_and_host( + context, constants.AGENT_TYPE_L3, host) + if not agent.admin_state_up: + return [] + query = context.session.query(RouterL3AgentBinding.router_id) + query = query.filter( + RouterL3AgentBinding.l3_agent_id == agent.id) + if router_id: + query = query.filter(RouterL3AgentBinding.router_id == router_id) + router_ids = query.all() + if router_ids: + _ids = [item[0] for item in router_ids] + routers = self.get_sync_data(context, router_ids=_ids, + active=True) + return routers + return [] + + def get_l3_agents_hosting_routers(self, context, router_ids, + admin_state_up=None, + active=None): + if not router_ids: + return [] + query = context.session.query(RouterL3AgentBinding) + if len(router_ids) > 1: + query = query.options(joinedload('l3_agent')).filter( + RouterL3AgentBinding.router_id.in_(router_ids)) + else: + query = query.options(joinedload('l3_agent')).filter( + RouterL3AgentBinding.router_id == router_ids[0]) + if admin_state_up is not None: + query = (query.filter(agents_db.Agent.admin_state_up == + admin_state_up)) + l3_agents = [binding.l3_agent for binding in query.all()] + if active is not None: + l3_agents = [l3_agent for l3_agent in + l3_agents if not + agents_db.AgentDbMixin.is_agent_down( + l3_agent['heartbeat_timestamp'])] + return l3_agents + + def _get_l3_bindings_hosting_routers(self, context, router_ids): + if not router_ids: + return [] + query = context.session.query(RouterL3AgentBinding) + if len(router_ids) > 1: + query = query.options(joinedload('l3_agent')).filter( + RouterL3AgentBinding.router_id.in_(router_ids)) + else: + query = query.options(joinedload('l3_agent')).filter( + RouterL3AgentBinding.router_id == router_ids[0]) + return query.all() + + def list_l3_agents_hosting_router(self, context, router_id): + with context.session.begin(subtransactions=True): + bindings = self._get_l3_bindings_hosting_routers( + context, [router_id]) + results = [] + for binding in bindings: + l3_agent_dict = self._make_agent_dict(binding.l3_agent) + results.append(l3_agent_dict) + if results: + return {'agents': results} + else: + return {'agents': []} + + def schedule_network(self, context, request_network, created_network): + if self.network_scheduler: + result = self.network_scheduler.schedule( + self, context, request_network, created_network) + if not result: + LOG.warn(_('Fail scheduling network %s'), created_network) + + def auto_schedule_networks(self, context, host): + if self.network_scheduler: + self.network_scheduler.auto_schedule_networks(self, context, host) + + def get_l3_agents(self, context, active=None, filters=None): + query = context.session.query(agents_db.Agent) + query = query.filter( + agents_db.Agent.agent_type == constants.AGENT_TYPE_L3) + if active is not None: + query = (query.filter(agents_db.Agent.admin_state_up == active)) + if filters: + for key, value in filters.iteritems(): + column = getattr(agents_db.Agent, key, None) + if column: + query = query.filter(column.in_(value)) + l3_agents = query.all() + if active is not None: + l3_agents = [l3_agent for l3_agent in + l3_agents if not + agents_db.AgentDbMixin.is_agent_down( + l3_agent['heartbeat_timestamp'])] + return l3_agents + + def get_l3_agent_candidates(self, sync_router, l3_agents): + """Get the valid l3 agents for the router from a list of l3_agents""" + candidates = [] + for l3_agent in l3_agents: + if not l3_agent.admin_state_up: + continue + agent_conf = self.get_configuration_dict(l3_agent) + router_id = agent_conf.get('router_id', None) + use_namespaces = agent_conf.get('use_namespaces', True) + handle_internal_only_routers = agent_conf.get( + 'handle_internal_only_routers', True) + gateway_external_network_id = agent_conf.get( + 'gateway_external_network_id', None) + if not use_namespaces and router_id != sync_router['id']: + continue + ex_net_id = (sync_router['external_gateway_info'] or {}).get( + 'network_id') + if ((not ex_net_id and not handle_internal_only_routers) or + (ex_net_id and gateway_external_network_id and + ex_net_id != gateway_external_network_id)): + continue + candidates.append(l3_agent) + return candidates + + def auto_schedule_routers(self, context, host, router_id): + if self.router_scheduler: + return self.router_scheduler.auto_schedule_routers( + self, context, host, router_id) + + def schedule_router(self, context, router): + if self.router_scheduler: + return self.router_scheduler.schedule( + self, context, router) + + def schedule_routers(self, context, routers): + """Schedule the routers to l3 agents. + """ + for router in routers: + self.schedule_router(context, router) diff --git a/quantum/db/dhcp_rpc_base.py b/quantum/db/dhcp_rpc_base.py index 78a327d56..958147251 100644 --- a/quantum/db/dhcp_rpc_base.py +++ b/quantum/db/dhcp_rpc_base.py @@ -13,9 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo.config import cfg from sqlalchemy.orm import exc from quantum.api.v2 import attributes +from quantum.common import constants +from quantum.common import utils from quantum import manager from quantum.openstack.common import log as logging @@ -31,14 +34,24 @@ class DhcpRpcCallbackMixin(object): host = kwargs.get('host') LOG.debug(_('Network list requested from %s'), host) plugin = manager.QuantumManager.get_plugin() - filters = dict(admin_state_up=[True]) - - return [net['id'] for net in - plugin.get_networks(context, filters=filters)] + if utils.is_extension_supported( + plugin, constants.AGENT_SCHEDULER_EXT_ALIAS): + if cfg.CONF.network_auto_schedule: + plugin.auto_schedule_networks(context, host) + nets = plugin.list_active_networks_on_active_dhcp_agent( + context, host) + else: + filters = dict(admin_state_up=[True]) + nets = plugin.get_networks(context, filters=filters) + return [net['id'] for net in nets] def get_network_info(self, context, **kwargs): """Retrieve and return a extended information about a network.""" network_id = kwargs.get('network_id') + host = kwargs.get('host') + LOG.debug(_('Network %(network_id)s requested from ' + '%(host)s'), {'network_id': network_id, + 'host': host}) plugin = manager.QuantumManager.get_plugin() network = plugin.get_network(context, network_id) @@ -62,7 +75,9 @@ class DhcpRpcCallbackMixin(object): # a device id that combines host and network ids LOG.debug(_('Port %(device_id)s for %(network_id)s requested from ' - '%(host)s'), locals()) + '%(host)s'), {'device_id': device_id, + 'network_id': network_id, + 'host': host}) plugin = manager.QuantumManager.get_plugin() retval = None diff --git a/quantum/db/extraroute_db.py b/quantum/db/extraroute_db.py index 77e426560..82425278d 100644 --- a/quantum/db/extraroute_db.py +++ b/quantum/db/extraroute_db.py @@ -154,11 +154,12 @@ class ExtraRoute_db_mixin(l3_db.L3_NAT_db_mixin): context, router['id']) return routers - def get_sync_data(self, context, router_ids=None): + def get_sync_data(self, context, router_ids=None, active=None): """Query routers and their related floating_ips, interfaces.""" with context.session.begin(subtransactions=True): routers = super(ExtraRoute_db_mixin, - self).get_sync_data(context, router_ids) + self).get_sync_data(context, router_ids, + active=active) for router in routers: router['routes'] = self._get_extra_routes_by_router_id( context, router['id']) diff --git a/quantum/db/l3_db.py b/quantum/db/l3_db.py index dd51d1104..943ed993b 100644 --- a/quantum/db/l3_db.py +++ b/quantum/db/l3_db.py @@ -23,11 +23,11 @@ from sqlalchemy import orm from sqlalchemy.orm import exc from sqlalchemy.sql import expression as expr +from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api from quantum.api.v2 import attributes from quantum.common import constants as l3_constants from quantum.common import exceptions as q_exc from quantum.db import db_base_plugin_v2 -from quantum.db import l3_rpc_agent_api from quantum.db import model_base from quantum.db import models_v2 from quantum.extensions import l3 @@ -328,7 +328,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): if len(fixed_ips) != 1: msg = _('Router port must have exactly one fixed IP') raise q_exc.BadRequest(resource='router', msg=msg) - subnet = self._get_subnet(context, fixed_ips[0]['subnet_id']) + subnet_id = fixed_ips[0]['subnet_id'] + subnet = self._get_subnet(context, subnet_id) self._check_for_dup_router_subnet(context, router_id, port['network_id'], subnet['id'], @@ -360,7 +361,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): 'name': ''}}) routers = self.get_sync_data(context.elevated(), [router_id]) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) + l3_rpc_agent_api.L3AgentNotify.routers_updated( + context, routers, 'add_router_interface', + {'network_id': port['network_id'], + 'subnet_id': subnet_id}) info = {'port_id': port['id'], 'subnet_id': port['fixed_ips'][0]['subnet_id']} notifier_api.notify(context, @@ -409,6 +413,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): subnet_id = port_db['fixed_ips'][0]['subnet_id'] self._confirm_router_interface_not_in_use( context, router_id, subnet_id) + _network_id = port_db['network_id'] self.delete_port(context, port_db['id'], l3_port_check=False) elif 'subnet_id' in interface_info: subnet_id = interface_info['subnet_id'] @@ -428,6 +433,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): for p in ports: if p['fixed_ips'][0]['subnet_id'] == subnet_id: port_id = p['id'] + _network_id = p['network_id'] self.delete_port(context, p['id'], l3_port_check=False) found = True break @@ -438,7 +444,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): raise l3.RouterInterfaceNotFoundForSubnet(router_id=router_id, subnet_id=subnet_id) routers = self.get_sync_data(context.elevated(), [router_id]) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) + l3_rpc_agent_api.L3AgentNotify.routers_updated( + context, routers, 'remove_router_interface', + {'network_id': _network_id, + 'subnet_id': subnet_id}) notifier_api.notify(context, notifier_api.publisher_id('network'), 'router.interface.delete', @@ -649,7 +658,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): router_id = floatingip_db['router_id'] if router_id: routers = self.get_sync_data(context.elevated(), [router_id]) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) + l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers, + 'create_floatingip') return self._make_floatingip_dict(floatingip_db) def update_floatingip(self, context, id, floatingip): @@ -671,7 +681,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): router_ids.append(router_id) if router_ids: routers = self.get_sync_data(context.elevated(), router_ids) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) + l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers, + 'update_floatingip') return self._make_floatingip_dict(floatingip_db) def delete_floatingip(self, context, id): @@ -684,7 +695,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): l3_port_check=False) if router_id: routers = self.get_sync_data(context.elevated(), [router_id]) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) + l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers, + 'delete_floatingip') def get_floatingip(self, context, id, fields=None): floatingip = self._get_floatingip(context, id) @@ -816,7 +828,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): else: return [n for n in nets if n['id'] not in ext_nets] - def _get_sync_routers(self, context, router_ids=None): + def _get_sync_routers(self, context, router_ids=None, active=None): """Query routers and their gw ports for l3 agent. Query routers with the router_ids. The gateway ports, if any, @@ -831,7 +843,12 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): """ router_query = context.session.query(Router) if router_ids: - router_query = router_query.filter(Router.id.in_(router_ids)) + if 1 == len(router_ids): + router_query = router_query.filter(Router.id == router_ids[0]) + else: + router_query = router_query.filter(Router.id.in_(router_ids)) + if active is not None: + router_query = router_query.filter(Router.admin_state_up == active) routers = router_query.all() gw_port_ids = [] if not routers: @@ -842,7 +859,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): gw_port_ids.append(gw_port_id) gw_ports = [] if gw_port_ids: - gw_ports = self._get_sync_gw_ports(context, gw_port_ids) + gw_ports = self.get_sync_gw_ports(context, gw_port_ids) gw_port_id_gw_port_dict = {} for gw_port in gw_ports: gw_port_id_gw_port_dict[gw_port['id']] = gw_port @@ -862,7 +879,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): return [] return self.get_floatingips(context, {'router_id': router_ids}) - def _get_sync_gw_ports(self, context, gw_port_ids): + def get_sync_gw_ports(self, context, gw_port_ids): if not gw_port_ids: return [] filters = {'id': gw_port_ids} @@ -871,12 +888,13 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): self._populate_subnet_for_ports(context, gw_ports) return gw_ports - def _get_sync_interfaces(self, context, router_ids): + def get_sync_interfaces(self, context, router_ids, + device_owner=DEVICE_OWNER_ROUTER_INTF): """Query router interfaces that relate to list of router_ids.""" if not router_ids: return [] filters = {'device_id': router_ids, - 'device_owner': [DEVICE_OWNER_ROUTER_INTF]} + 'device_owner': [device_owner]} interfaces = self.get_ports(context, filters) if interfaces: self._populate_subnet_for_ports(context, interfaces) @@ -934,14 +952,15 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): router[l3_constants.INTERFACE_KEY] = router_interfaces return routers_dict.values() - def get_sync_data(self, context, router_ids=None): + def get_sync_data(self, context, router_ids=None, active=None): """Query routers and their related floating_ips, interfaces.""" with context.session.begin(subtransactions=True): routers = self._get_sync_routers(context, - router_ids) + router_ids=router_ids, + active=active) router_ids = [router['id'] for router in routers] floating_ips = self._get_sync_floating_ips(context, router_ids) - interfaces = self._get_sync_interfaces(context, router_ids) + interfaces = self.get_sync_interfaces(context, router_ids) return self._process_sync_data(routers, interfaces, floating_ips) def get_external_network_id(self, context): diff --git a/quantum/db/l3_rpc_agent_api.py b/quantum/db/l3_rpc_agent_api.py deleted file mode 100644 index 718d9c48e..000000000 --- a/quantum/db/l3_rpc_agent_api.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright (c) 2012 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from quantum.common import topics -from quantum.openstack.common import jsonutils -from quantum.openstack.common import log as logging -from quantum.openstack.common.rpc import proxy - - -LOG = logging.getLogger(__name__) - - -class L3AgentNotifyAPI(proxy.RpcProxy): - """API for plugin to notify L3 agent.""" - BASE_RPC_API_VERSION = '1.0' - - def __init__(self, topic=topics.L3_AGENT): - super(L3AgentNotifyAPI, self).__init__( - topic=topic, default_version=self.BASE_RPC_API_VERSION) - - def router_deleted(self, context, router_id): - LOG.debug(_('Notify agent the router %s is deleted'), router_id) - self.cast(context, - self.make_msg('router_deleted', - router_id=router_id), - topic=self.topic) - - def routers_updated(self, context, routers): - if routers: - LOG.debug(_('Notify agent routers were updated:\n %s'), - jsonutils.dumps(routers, indent=5)) - self.cast(context, - self.make_msg('routers_updated', - routers=routers), - topic=self.topic) - - -L3AgentNotify = L3AgentNotifyAPI() diff --git a/quantum/db/l3_rpc_base.py b/quantum/db/l3_rpc_base.py index 2a11b701e..4a8635679 100644 --- a/quantum/db/l3_rpc_base.py +++ b/quantum/db/l3_rpc_base.py @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo.config import cfg + +from quantum.common import constants +from quantum.common import utils from quantum import context as quantum_context from quantum import manager from quantum.openstack.common import jsonutils @@ -34,10 +38,17 @@ class L3RpcCallbackMixin(object): with their interfaces and floating_ips """ router_id = kwargs.get('router_id') - # TODO(gongysh) we will use host in kwargs for multi host BP + host = kwargs.get('host') context = quantum_context.get_admin_context() plugin = manager.QuantumManager.get_plugin() - routers = plugin.get_sync_data(context, router_id) + if utils.is_extension_supported( + plugin, constants.AGENT_SCHEDULER_EXT_ALIAS): + if cfg.CONF.router_auto_schedule: + plugin.auto_schedule_routers(context, host, router_id) + routers = plugin.list_active_sync_routers_on_active_l3_agent( + context, host, router_id) + else: + routers = plugin.get_sync_data(context, router_id) LOG.debug(_("Routers returned to l3 agent:\n %s"), jsonutils.dumps(routers, indent=5)) return routers diff --git a/quantum/db/migration/alembic_migrations/versions/4692d074d587_agent_scheduler.py b/quantum/db/migration/alembic_migrations/versions/4692d074d587_agent_scheduler.py new file mode 100644 index 000000000..2d28fff2c --- /dev/null +++ b/quantum/db/migration/alembic_migrations/versions/4692d074d587_agent_scheduler.py @@ -0,0 +1,79 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""agent scheduler + +Revision ID: 4692d074d587 +Revises: 3b54bf9e29f7 +Create Date: 2013-02-21 23:01:50.370306 + +""" + +# revision identifiers, used by Alembic. +revision = '4692d074d587' +down_revision = '3b54bf9e29f7' + +# Change to ['*'] if this migration applies to all plugins + +migration_for_plugins = [ + 'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2' +] + +from alembic import op +import sqlalchemy as sa + + +from quantum.db import migration + + +def upgrade(active_plugin=None, options=None): + if not migration.should_run(active_plugin, migration_for_plugins): + return + + ### commands auto generated by Alembic - please adjust! ### + op.create_table( + 'networkdhcpagentbindings', + sa.Column('network_id', sa.String(length=36), nullable=False), + sa.Column('dhcp_agent_id', sa.String(length=36), nullable=False), + sa.ForeignKeyConstraint(['dhcp_agent_id'], ['agents.id'], + ondelete='CASCADE'), + sa.ForeignKeyConstraint(['network_id'], ['networks.id'], + ondelete='CASCADE'), + sa.PrimaryKeyConstraint('network_id', 'dhcp_agent_id') + ) + op.create_table( + 'routerl3agentbindings', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('router_id', sa.String(length=36), nullable=True), + sa.Column('l3_agent_id', sa.String(length=36), nullable=True), + sa.ForeignKeyConstraint(['l3_agent_id'], ['agents.id'], + ondelete='CASCADE'), + sa.ForeignKeyConstraint(['router_id'], ['routers.id'], + ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + ### end Alembic commands ### + + +def downgrade(active_plugin=None, options=None): + if not migration.should_run(active_plugin, migration_for_plugins): + return + + ### commands auto generated by Alembic - please adjust! ### + op.drop_table('routerl3agentbindings') + op.drop_table('networkdhcpagentbindings') + ### end Alembic commands ### diff --git a/quantum/extensions/agentscheduler.py b/quantum/extensions/agentscheduler.py new file mode 100644 index 000000000..73ae370e2 --- /dev/null +++ b/quantum/extensions/agentscheduler.py @@ -0,0 +1,252 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack, LLC. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from abc import abstractmethod + +from quantum.api import extensions +from quantum.api.v2 import base +from quantum.api.v2 import resource +from quantum.common import constants +from quantum.common import exceptions +from quantum.extensions import agent +from quantum import manager +from quantum import policy +from quantum import wsgi + +DHCP_NET = 'dhcp-network' +DHCP_NETS = DHCP_NET + 's' +DHCP_AGENT = 'dhcp-agent' +DHCP_AGENTS = DHCP_AGENT + 's' +L3_ROUTER = 'l3-router' +L3_ROUTERS = L3_ROUTER + 's' +L3_AGENT = 'l3-agent' +L3_AGENTS = L3_AGENT + 's' + + +class NetworkSchedulerController(wsgi.Controller): + def index(self, request, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "get_%s" % DHCP_NETS, + {}, + plugin=plugin) + return plugin.list_networks_on_dhcp_agent( + request.context, kwargs['agent_id']) + + def create(self, request, body, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "create_%s" % DHCP_NET, + {}, + plugin=plugin) + return plugin.add_network_to_dhcp_agent( + request.context, kwargs['agent_id'], body['network_id']) + + def delete(self, request, id, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "delete_%s" % DHCP_NET, + {}, + plugin=plugin) + return plugin.remove_network_from_dhcp_agent( + request.context, kwargs['agent_id'], id) + + +class RouterSchedulerController(wsgi.Controller): + def index(self, request, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "get_%s" % L3_ROUTERS, + {}, + plugin=plugin) + return plugin.list_routers_on_l3_agent( + request.context, kwargs['agent_id']) + + def create(self, request, body, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "create_%s" % L3_ROUTER, + {}, + plugin=plugin) + return plugin.add_router_to_l3_agent( + request.context, + kwargs['agent_id'], + body['router_id']) + + def delete(self, request, id, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "delete_%s" % L3_ROUTER, + {}, + plugin=plugin) + return plugin.remove_router_from_l3_agent( + request.context, kwargs['agent_id'], id) + + +class DhcpAgentsHostingNetworkController(wsgi.Controller): + def index(self, request, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "get_%s" % DHCP_AGENTS, + {}, + plugin=plugin) + return plugin.list_dhcp_agents_hosting_network( + request.context, kwargs['network_id']) + + +class L3AgentsHostingRouterController(wsgi.Controller): + def index(self, request, **kwargs): + plugin = manager.QuantumManager.get_plugin() + policy.enforce(request.context, + "get_%s" % L3_AGENTS, + {}, + plugin=plugin) + return plugin.list_l3_agents_hosting_router( + request.context, kwargs['router_id']) + + +class Agentscheduler(extensions.ExtensionDescriptor): + """Extension class supporting agent scheduler. + """ + + @classmethod + def get_name(cls): + return "Agent Schedulers" + + @classmethod + def get_alias(cls): + return constants.AGENT_SCHEDULER_EXT_ALIAS + + @classmethod + def get_description(cls): + return "Schedule resources among agents" + + @classmethod + def get_namespace(cls): + return "http://docs.openstack.org/ext/agent_scheduler/api/v1.0" + + @classmethod + def get_updated(cls): + return "2013-02-03T10:00:00-00:00" + + @classmethod + def get_resources(cls): + """Returns Ext Resources """ + exts = [] + parent = dict(member_name="agent", + collection_name="agents") + controller = resource.Resource(NetworkSchedulerController(), + base.FAULT_MAP) + exts.append(extensions.ResourceExtension( + DHCP_NETS, controller, parent)) + + controller = resource.Resource(RouterSchedulerController(), + base.FAULT_MAP) + exts.append(extensions.ResourceExtension( + L3_ROUTERS, controller, parent)) + + parent = dict(member_name="network", + collection_name="networks") + + controller = resource.Resource(DhcpAgentsHostingNetworkController(), + base.FAULT_MAP) + exts.append(extensions.ResourceExtension( + DHCP_AGENTS, controller, parent)) + + parent = dict(member_name="router", + collection_name="routers") + + controller = resource.Resource(L3AgentsHostingRouterController(), + base.FAULT_MAP) + exts.append(extensions.ResourceExtension( + L3_AGENTS, controller, parent)) + return exts + + def get_extended_resources(self, version): + return {} + + +class InvalidDHCPAgent(agent.AgentNotFound): + message = _("Agent %(id)s is not a valid DHCP Agent or has been disabled") + + +class NetworkHostedByDHCPAgent(exceptions.Conflict): + message = _("The network %(network_id)s has been already hosted" + " by the DHCP Agent %(agent_id)s.") + + +class NetworkNotHostedByDhcpAgent(exceptions.Conflict): + message = _("The network %(network_id)s is not hosted" + " by the DHCP agent %(agent_id)s.") + + +class InvalidL3Agent(agent.AgentNotFound): + message = _("Agent %(id)s is not a L3 Agent or has been disabled") + + +class RouterHostedByL3Agent(exceptions.Conflict): + message = _("The router %(router_id)s has been already hosted" + " by the L3 Agent %(agent_id)s.") + + +class RouterSchedulingFailed(exceptions.Conflict): + message = _("Failed scheduling router %(router_id)s to" + " the L3 Agent %(agent_id)s.") + + +class RouterNotHostedByL3Agent(exceptions.Conflict): + message = _("The router %(router_id)s is not hosted" + " by L3 agent %(agent_id)s.") + + +class AgentSchedulerPluginBase(object): + """ REST API to operate the agent scheduler. + + All of method must be in an admin context. + """ + + @abstractmethod + def add_network_to_dhcp_agent(self, context, id, network_id): + pass + + @abstractmethod + def remove_network_from_dhcp_agent(self, context, id, network_id): + pass + + @abstractmethod + def list_networks_on_dhcp_agent(self, context, id): + pass + + @abstractmethod + def list_dhcp_agents_hosting_network(self, context, network_id): + pass + + @abstractmethod + def add_router_to_l3_agent(self, context, id, router_id): + pass + + @abstractmethod + def remove_router_from_l3_agent(self, context, id, router_id): + pass + + @abstractmethod + def list_routers_on_l3_agent(self, context, id): + pass + + @abstractmethod + def list_l3_agents_hosting_router(self, context, router_id): + pass diff --git a/quantum/plugins/openvswitch/common/config.py b/quantum/plugins/openvswitch/common/config.py index 6f16e3cdb..4886974dd 100644 --- a/quantum/plugins/openvswitch/common/config.py +++ b/quantum/plugins/openvswitch/common/config.py @@ -17,6 +17,7 @@ from oslo.config import cfg from quantum.agent.common import config +from quantum import scheduler DEFAULT_BRIDGE_MAPPINGS = [] @@ -64,3 +65,4 @@ cfg.CONF.register_opts(ovs_opts, "OVS") cfg.CONF.register_opts(agent_opts, "AGENT") config.register_agent_state_opts_helper(cfg.CONF) config.register_root_helper(cfg.CONF) +cfg.CONF.register_opts(scheduler.AGENTS_SCHEDULER_OPTS) diff --git a/quantum/plugins/openvswitch/ovs_quantum_plugin.py b/quantum/plugins/openvswitch/ovs_quantum_plugin.py index ce22d9edb..86354fbeb 100644 --- a/quantum/plugins/openvswitch/ovs_quantum_plugin.py +++ b/quantum/plugins/openvswitch/ovs_quantum_plugin.py @@ -25,12 +25,15 @@ import sys from oslo.config import cfg from quantum.agent import securitygroups_rpc as sg_rpc +from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api +from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api from quantum.api.v2 import attributes from quantum.common import constants as q_const from quantum.common import exceptions as q_exc from quantum.common import rpc as q_rpc from quantum.common import topics from quantum.db import agents_db +from quantum.db import agentschedulers_db from quantum.db import db_base_plugin_v2 from quantum.db import dhcp_rpc_base from quantum.db import extraroute_db @@ -41,6 +44,7 @@ from quantum.db import securitygroups_rpc_base as sg_db_rpc from quantum.extensions import portbindings from quantum.extensions import providernet as provider from quantum.extensions import securitygroup as ext_sg +from quantum.openstack.common import importutils from quantum.openstack.common import log as logging from quantum.openstack.common import rpc from quantum.openstack.common.rpc import proxy @@ -211,7 +215,9 @@ class AgentNotifierApi(proxy.RpcProxy, class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, extraroute_db.ExtraRoute_db_mixin, sg_db_rpc.SecurityGroupServerRpcMixin, - agents_db.AgentDbMixin): + agents_db.AgentDbMixin, + agentschedulers_db.AgentSchedulerDbMixin): + """Implement the Quantum abstractions using Open vSwitch. Depending on whether tunneling is enabled, either a GRE tunnel or @@ -238,8 +244,7 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, supported_extension_aliases = ["provider", "router", "binding", "quotas", "security-group", - "agent", - "extraroute"] + "agent", "extraroute", "agent_scheduler"] network_view = "extension:provider_network:view" network_set = "extension:provider_network:set" @@ -269,12 +274,18 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, "Agent terminated!")) sys.exit(1) self.setup_rpc() + self.network_scheduler = importutils.import_object( + cfg.CONF.network_scheduler_driver) + self.router_scheduler = importutils.import_object( + cfg.CONF.router_scheduler_driver) def setup_rpc(self): # RPC support self.topic = topics.PLUGIN self.conn = rpc.create_connection(new=True) self.notifier = AgentNotifierApi(topics.AGENT) + self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI() + self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify self.callbacks = OVSRpcCallbacks(self.notifier) self.dispatcher = self.callbacks.create_rpc_dispatcher() self.conn.create_consumer(self.topic, self.dispatcher, @@ -486,6 +497,7 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, self._extend_network_dict_l3(context, net) # note - exception will rollback entire transaction LOG.debug(_("Created network: %s"), net['id']) + self.schedule_network(context, network['network'], net) return net def update_network(self, context, id, network): @@ -569,6 +581,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, else: self.notifier.security_groups_member_updated( context, port.get(ext_sg.SECURITYGROUPS)) + net = self.get_network(context, port['network_id']) + self.schedule_network(context, None, net) return self._extend_port_dict_binding(context, port) def get_port(self, context, id, fields=None): @@ -636,3 +650,20 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, self.notifier.security_groups_member_updated( context, port.get(ext_sg.SECURITYGROUPS)) + + def update_agent(self, context, id, agent): + original_agent = self.get_agent(context, id) + result = super(OVSQuantumPluginV2, self).update_agent( + context, id, agent) + agent_data = agent['agent'] + if ('admin_state_up' in agent_data and + original_agent['admin_state_up'] != agent_data['admin_state_up']): + if original_agent['agent_type'] == q_const.AGENT_TYPE_DHCP: + self.dhcp_agent_notifier.agent_updated( + context, agent_data['admin_state_up'], + original_agent['host']) + elif original_agent['agent_type'] == q_const.AGENT_TYPE_L3: + self.l3_agent_notifier.agent_updated( + context, agent_data['admin_state_up'], + original_agent['host']) + return result diff --git a/quantum/policy.py b/quantum/policy.py index f9dc76df2..d8e31456e 100644 --- a/quantum/policy.py +++ b/quantum/policy.py @@ -51,6 +51,7 @@ def init(): raise exceptions.PolicyNotFound(path=cfg.CONF.policy_file) # pass _set_brain to read_cached_file so that the policy brain # is reset only if the file has changed + LOG.debug(_("loading policy file at %s"), _POLICY_PATH) utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE, reload_func=_set_rules) diff --git a/quantum/scheduler/__init__.py b/quantum/scheduler/__init__.py new file mode 100644 index 000000000..082601856 --- /dev/null +++ b/quantum/scheduler/__init__.py @@ -0,0 +1,34 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo.config import cfg + + +AGENTS_SCHEDULER_OPTS = [ + cfg.StrOpt('network_scheduler_driver', + default='quantum.scheduler.' + 'dhcp_agent_scheduler.ChanceScheduler', + help=_('Driver to use for scheduling network to DHCP agent')), + cfg.StrOpt('router_scheduler_driver', + default='quantum.scheduler.l3_agent_scheduler.ChanceScheduler', + help=_('Driver to use for scheduling ' + 'router to a default L3 agent')), + cfg.BoolOpt('network_auto_schedule', default=True, + help=_('Allow auto scheduling networks to DHCP agent.')), + cfg.BoolOpt('router_auto_schedule', default=True, + help=_('Allow auto scheduling routers to L3 agent.')), +] diff --git a/quantum/scheduler/dhcp_agent_scheduler.py b/quantum/scheduler/dhcp_agent_scheduler.py new file mode 100644 index 000000000..62929889c --- /dev/null +++ b/quantum/scheduler/dhcp_agent_scheduler.py @@ -0,0 +1,108 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import random + +from sqlalchemy.orm import exc +from sqlalchemy.sql import exists + +from quantum.common import constants +from quantum.db import models_v2 +from quantum.db import agents_db +from quantum.db import agentschedulers_db +from quantum.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class ChanceScheduler(object): + """Allocate a DHCP agent for a network in a random way. + More sophisticated scheduler (similar to filter scheduler in nova?) + can be introduced later.""" + + def schedule(self, plugin, context, request_network, network): + """Schedule the network to an active DHCP agent if there + is no active DHCP agent hosting it. + """ + #TODO(gongysh) don't schedule the networks with only + # subnets whose enable_dhcp is false + with context.session.begin(subtransactions=True): + dhcp_agents = plugin.get_dhcp_agents_hosting_networks( + context, [network['id']], active=True) + if dhcp_agents: + LOG.debug(_('Network %s is hosted already'), + network['id']) + return False + enabled_dhcp_agents = plugin.get_agents_db( + context, filters={ + 'agent_type': [constants.AGENT_TYPE_DHCP], + 'admin_state_up': [True]}) + if not enabled_dhcp_agents: + LOG.warn(_('No enabled DHCP agents')) + return False + active_dhcp_agents = [enabled_dhcp_agent for enabled_dhcp_agent in + enabled_dhcp_agents if not + agents_db.AgentDbMixin.is_agent_down( + enabled_dhcp_agent['heartbeat_timestamp'])] + if not active_dhcp_agents: + LOG.warn(_('No active DHCP agents')) + return False + chosen_agent = random.choice(active_dhcp_agents) + binding = agentschedulers_db.NetworkDhcpAgentBinding() + binding.dhcp_agent = chosen_agent + binding.network_id = network['id'] + context.session.add(binding) + LOG.debug(_('Network %(network_id)s is scheduled to be hosted by ' + 'DHCP agent %(agent_id)s'), + {'network_id': network['id'], + 'agent_id': chosen_agent['id']}) + return True + + def auto_schedule_networks(self, plugin, context, host): + """Schedule non-hosted networks to the DHCP agent on + the specified host.""" + with context.session.begin(subtransactions=True): + query = context.session.query(agents_db.Agent) + query = query.filter(agents_db.Agent.agent_type == + constants.AGENT_TYPE_DHCP, + agents_db.Agent.host == host, + agents_db.Agent.admin_state_up == True) + try: + dhcp_agent = query.one() + except (exc.MultipleResultsFound, exc.NoResultFound): + LOG.warn(_('No enabled DHCP agent on host %s'), + host) + return False + if agents_db.AgentDbMixin.is_agent_down( + dhcp_agent.heartbeat_timestamp): + LOG.warn(_('DHCP agent %s is not active'), dhcp_agent.id) + #TODO(gongysh) consider the disabled agent's network + net_stmt = ~exists().where( + models_v2.Network.id == + agentschedulers_db.NetworkDhcpAgentBinding.network_id) + net_ids = context.session.query( + models_v2.Network.id).filter(net_stmt).all() + if not net_ids: + LOG.debug(_('No non-hosted networks')) + return False + for net_id in net_ids: + binding = agentschedulers_db.NetworkDhcpAgentBinding() + binding.dhcp_agent = dhcp_agent + binding.network_id = net_id[0] + context.session.add(binding) + return True diff --git a/quantum/scheduler/l3_agent_scheduler.py b/quantum/scheduler/l3_agent_scheduler.py new file mode 100644 index 000000000..0d3b1efbb --- /dev/null +++ b/quantum/scheduler/l3_agent_scheduler.py @@ -0,0 +1,149 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import random + +from sqlalchemy.orm import exc +from sqlalchemy.sql import exists + +from quantum.common import constants +from quantum.db import l3_db +from quantum.db import agents_db +from quantum.db import agentschedulers_db +from quantum.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class ChanceScheduler(object): + """Allocate a L3 agent for a router in a random way. + More sophisticated scheduler (similar to filter scheduler in nova?) + can be introduced later.""" + + def auto_schedule_routers(self, plugin, context, host, router_id): + """Schedule non-hosted routers to L3 Agent running on host. + If router_id is given, only this router is scheduled + if it is not hosted yet. + Don't schedule the routers which are hosted already + by active l3 agents. + """ + with context.session.begin(subtransactions=True): + # query if we have valid l3 agent on the host + query = context.session.query(agents_db.Agent) + query = query.filter(agents_db.Agent.agent_type == + constants.AGENT_TYPE_L3, + agents_db.Agent.host == host, + agents_db.Agent.admin_state_up == True) + try: + l3_agent = query.one() + except (exc.MultipleResultsFound, exc.NoResultFound): + LOG.debug(_('No enabled L3 agent on host %s'), + host) + return False + if agents_db.AgentDbMixin.is_agent_down( + l3_agent.heartbeat_timestamp): + LOG.warn(_('L3 agent %s is not active'), l3_agent.id) + # check if the specified router is hosted + if router_id: + l3_agents = plugin.get_l3_agents_hosting_routers( + context, [router_id], admin_state_up=True) + if l3_agents: + LOG.debug(_('Router %(router_id)s has already been hosted' + ' by L3 agent %(agent_id)s'), + {'router_id': router_id, + 'agent_id': l3_agents[0]['id']}) + return False + + # get the router ids + if router_id: + router_ids = [(router_id,)] + else: + # get all routers that are not hosted + #TODO(gongysh) consider the disabled agent's router + stmt = ~exists().where( + l3_db.Router.id == + agentschedulers_db.RouterL3AgentBinding.router_id) + router_ids = context.session.query( + l3_db.Router.id).filter(stmt).all() + if not router_ids: + LOG.debug(_('No non-hosted routers')) + return False + + # check if the configuration of l3 agent is compatible + # with the router + router_ids = [router_id[0] for router_id in router_ids] + routers = plugin.get_routers(context, filters={'id': router_ids}) + to_removed_ids = [] + for router in routers: + candidates = plugin.get_l3_agent_candidates(router, [l3_agent]) + if not candidates: + to_removed_ids.append(router['id']) + router_ids = list(set(router_ids) - set(to_removed_ids)) + if not router_ids: + LOG.warn(_('No routers compatible with L3 agent configuration' + ' on host %s', host)) + return False + + # binding + for router_id in router_ids: + binding = agentschedulers_db.RouterL3AgentBinding() + binding.l3_agent = l3_agent + binding.router_id = router_id + binding.default = True + context.session.add(binding) + return True + + def schedule(self, plugin, context, sync_router): + """Schedule the router to an active L3 agent if there + is no enable L3 agent hosting it. + """ + with context.session.begin(subtransactions=True): + # allow one router is hosted by just + # one enabled l3 agent hosting since active is just a + # timing problem. Non-active l3 agent can return to + # active any time + l3_agents = plugin.get_l3_agents_hosting_routers( + context, [sync_router['id']], admin_state_up=True) + if l3_agents: + LOG.debug(_('Router %(router_id)s has already been hosted' + ' by L3 agent %(agent_id)s'), + {'router_id': sync_router['id'], + 'agent_id': l3_agents[0]['id']}) + return False + + active_l3_agents = plugin.get_l3_agents(context, active=True) + if not active_l3_agents: + LOG.warn(_('No active L3 agents')) + return False + candidates = plugin.get_l3_agent_candidates(sync_router, + active_l3_agents) + if not candidates: + LOG.warn(_('No L3 agents can host the router %s'), + sync_router['id']) + return False + + chosen_agent = random.choice(candidates) + binding = agentschedulers_db.RouterL3AgentBinding() + binding.l3_agent = chosen_agent + binding.router_id = sync_router['id'] + context.session.add(binding) + LOG.debug(_('Router %(router_id)s is scheduled to ' + 'L3 agent %(agent_id)s'), + {'router_id': sync_router['id'], + 'agent_id': chosen_agent['id']}) + return True diff --git a/quantum/tests/unit/openvswitch/test_agent_scheduler.py b/quantum/tests/unit/openvswitch/test_agent_scheduler.py new file mode 100644 index 000000000..38c74ca9b --- /dev/null +++ b/quantum/tests/unit/openvswitch/test_agent_scheduler.py @@ -0,0 +1,803 @@ +# Copyright (c) 2013 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import copy + +import mock +from webob import exc + +from quantum.api import extensions +from quantum.common import constants +from quantum import context +from quantum.db import agents_db +from quantum.db import dhcp_rpc_base +from quantum.db import l3_rpc_base +from quantum.extensions import agentscheduler +from quantum import manager +from quantum.openstack.common import uuidutils +from quantum.plugins.openvswitch.ovs_quantum_plugin import OVSQuantumPluginV2 +from quantum.tests.unit import test_agent_ext_plugin +from quantum.tests.unit.testlib_api import create_request +from quantum.tests.unit import test_db_plugin as test_plugin +from quantum.tests.unit import test_extensions +from quantum.tests.unit import test_l3_plugin +from quantum.wsgi import Serializer + +L3_HOSTA = 'hosta' +DHCP_HOSTA = 'hosta' +L3_HOSTB = 'hostb' +DHCP_HOSTC = 'hostc' + + +class AgentSchedulerTestMixIn(object): + + def _request_list(self, path, admin_context=True, + expected_code=exc.HTTPOk.code): + req = self._path_req(path, admin_context=admin_context) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, expected_code) + return self.deserialize(self.fmt, res) + + def _path_req(self, path, method='GET', data=None, + query_string=None, + admin_context=True): + content_type = 'application/%s' % self.fmt + body = None + if data is not None: # empty dict is valid + body = Serializer().serialize(data, content_type) + if admin_context: + return create_request( + path, body, content_type, method, query_string=query_string) + else: + return create_request( + path, body, content_type, method, query_string=query_string, + context=context.Context('', 'tenant_id')) + + def _path_create_request(self, path, data, admin_context=True): + return self._path_req(path, method='POST', data=data, + admin_context=admin_context) + + def _path_show_request(self, path, admin_context=True): + return self._path_req(path, admin_context=admin_context) + + def _path_delete_request(self, path, admin_context=True): + return self._path_req(path, method='DELETE', + admin_context=admin_context) + + def _path_update_request(self, path, data, admin_context=True): + return self._path_req(path, method='PUT', data=data, + admin_context=admin_context) + + def _list_routers_hosted_by_l3_agent(self, agent_id, + expected_code=exc.HTTPOk.code, + admin_context=True): + path = "/agents/%s/%s.%s" % (agent_id, + agentscheduler.L3_ROUTERS, + self.fmt) + return self._request_list(path, expected_code=expected_code, + admin_context=admin_context) + + def _list_networks_hosted_by_dhcp_agent(self, agent_id, + expected_code=exc.HTTPOk.code, + admin_context=True): + path = "/agents/%s/%s.%s" % (agent_id, + agentscheduler.DHCP_NETS, + self.fmt) + return self._request_list(path, expected_code=expected_code, + admin_context=admin_context) + + def _list_l3_agents_hosting_router(self, router_id, + expected_code=exc.HTTPOk.code, + admin_context=True): + path = "/routers/%s/%s.%s" % (router_id, + agentscheduler.L3_AGENTS, + self.fmt) + return self._request_list(path, expected_code=expected_code, + admin_context=admin_context) + + def _list_dhcp_agents_hosting_network(self, network_id, + expected_code=exc.HTTPOk.code, + admin_context=True): + path = "/networks/%s/%s.%s" % (network_id, + agentscheduler.DHCP_AGENTS, + self.fmt) + return self._request_list(path, expected_code=expected_code, + admin_context=admin_context) + + def _add_router_to_l3_agent(self, id, router_id, + expected_code=exc.HTTPCreated.code, + admin_context=True): + path = "/agents/%s/%s.%s" % (id, + agentscheduler.L3_ROUTERS, + self.fmt) + req = self._path_create_request(path, + {'router_id': router_id}, + admin_context=admin_context) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, expected_code) + + def _add_network_to_dhcp_agent(self, id, network_id, + expected_code=exc.HTTPCreated.code, + admin_context=True): + path = "/agents/%s/%s.%s" % (id, + agentscheduler.DHCP_NETS, + self.fmt) + req = self._path_create_request(path, + {'network_id': network_id}, + admin_context=admin_context) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, expected_code) + + def _remove_network_from_dhcp_agent(self, id, network_id, + expected_code=exc.HTTPNoContent.code, + admin_context=True): + path = "/agents/%s/%s/%s.%s" % (id, + agentscheduler.DHCP_NETS, + network_id, + self.fmt) + req = self._path_delete_request(path, + admin_context=admin_context) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, expected_code) + + def _remove_router_from_l3_agent(self, id, router_id, + expected_code=exc.HTTPNoContent.code, + admin_context=True): + path = "/agents/%s/%s/%s.%s" % (id, + agentscheduler.L3_ROUTERS, + router_id, + self.fmt) + req = self._path_delete_request(path, admin_context=admin_context) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, expected_code) + + def _register_one_agent_state(self, agent_state): + callback = agents_db.AgentExtRpcCallback() + callback.report_state(self.adminContext, + agent_state={'agent_state': agent_state}) + + def _disable_agent(self, agent_id, admin_state_up=False): + new_agent = {} + new_agent['agent'] = {} + new_agent['agent']['admin_state_up'] = admin_state_up + self._update('agents', agent_id, new_agent) + + def _get_agent_id(self, agent_type, host): + agents = self._list_agents() + for agent in agents['agents']: + if (agent['agent_type'] == agent_type and + agent['host'] == host): + return agent['id'] + + +class AgentSchedulerTestCase(test_l3_plugin.L3NatTestCaseMixin, + test_agent_ext_plugin.AgentDBTestMixIn, + AgentSchedulerTestMixIn, + test_plugin.QuantumDbPluginV2TestCase): + fmt = 'json' + + def setUp(self): + plugin = ('quantum.plugins.openvswitch.' + 'ovs_quantum_plugin.OVSQuantumPluginV2') + self.dhcp_notifier_cls_p = mock.patch( + 'quantum.api.rpc.agentnotifiers.dhcp_rpc_agent_api.' + 'DhcpAgentNotifyAPI') + self.dhcp_notifier = mock.Mock(name='dhcp_notifier') + self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start() + self.dhcp_notifier_cls.return_value = self.dhcp_notifier + super(AgentSchedulerTestCase, self).setUp(plugin) + ext_mgr = extensions.PluginAwareExtensionManager.get_instance() + self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr) + self.adminContext = context.get_admin_context() + self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin() + self.addCleanup(self.dhcp_notifier_cls_p.stop) + + def test_report_states(self): + self._register_agent_states() + agents = self._list_agents() + self.assertEqual(4, len(agents['agents'])) + + def test_network_scheduling_on_network_creation(self): + self._register_agent_states() + with self.network() as net: + dhcp_agents = self._list_dhcp_agents_hosting_network( + net['network']['id']) + self.assertEqual(1, len(dhcp_agents['agents'])) + + def test_network_auto_schedule_with_disabled(self): + with contextlib.nested(self.network(), + self.network()): + dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin() + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTA) + hostc_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTC) + self._disable_agent(hosta_id) + dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA) + # second agent will host all the networks since first is disabled. + dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTC) + networks = self._list_networks_hosted_by_dhcp_agent(hostc_id) + num_hostc_nets = len(networks['networks']) + networks = self._list_networks_hosted_by_dhcp_agent(hosta_id) + num_hosta_nets = len(networks['networks']) + self.assertEqual(0, num_hosta_nets) + self.assertEqual(2, num_hostc_nets) + + def test_network_auto_schedule_with_hosted(self): + # one agent hosts all the networks, other hosts none + with contextlib.nested(self.network(), + self.network()) as (net1, net2): + dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin() + self._register_agent_states() + dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA) + # second agent will not host the network since first has got it. + dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTC) + dhcp_agents = self._list_dhcp_agents_hosting_network( + net1['network']['id']) + hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTA) + hostc_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTC) + hosta_nets = self._list_networks_hosted_by_dhcp_agent(hosta_id) + num_hosta_nets = len(hosta_nets['networks']) + hostc_nets = self._list_networks_hosted_by_dhcp_agent(hostc_id) + num_hostc_nets = len(hostc_nets['networks']) + + self.assertEqual(2, num_hosta_nets) + self.assertEqual(0, num_hostc_nets) + self.assertEqual(1, len(dhcp_agents['agents'])) + self.assertEqual(DHCP_HOSTA, dhcp_agents['agents'][0]['host']) + + def test_network_auto_schedule_with_hosted_2(self): + # one agent hosts one network + dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin() + dhcp_hosta = { + 'binary': 'quantum-dhcp-agent', + 'host': DHCP_HOSTA, + 'topic': 'DHCP_AGENT', + 'configurations': {'dhcp_driver': 'dhcp_driver', + 'use_namespaces': True, + }, + 'agent_type': constants.AGENT_TYPE_DHCP} + dhcp_hostc = copy.deepcopy(dhcp_hosta) + dhcp_hostc['host'] = DHCP_HOSTC + with self.network() as net1: + self._register_one_agent_state(dhcp_hosta) + dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA) + hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTA) + self._disable_agent(hosta_id, admin_state_up=False) + with self.network() as net2: + self._register_one_agent_state(dhcp_hostc) + dhcp_rpc.get_active_networks(self.adminContext, + host=DHCP_HOSTC) + dhcp_agents_1 = self._list_dhcp_agents_hosting_network( + net1['network']['id']) + dhcp_agents_2 = self._list_dhcp_agents_hosting_network( + net2['network']['id']) + hosta_nets = self._list_networks_hosted_by_dhcp_agent(hosta_id) + num_hosta_nets = len(hosta_nets['networks']) + hostc_id = self._get_agent_id( + constants.AGENT_TYPE_DHCP, + DHCP_HOSTC) + hostc_nets = self._list_networks_hosted_by_dhcp_agent(hostc_id) + num_hostc_nets = len(hostc_nets['networks']) + + self.assertEqual(1, num_hosta_nets) + self.assertEqual(1, num_hostc_nets) + self.assertEqual(1, len(dhcp_agents_1['agents'])) + self.assertEqual(1, len(dhcp_agents_2['agents'])) + self.assertEqual(DHCP_HOSTA, dhcp_agents_1['agents'][0]['host']) + self.assertEqual(DHCP_HOSTC, dhcp_agents_2['agents'][0]['host']) + + def test_network_scheduling_on_port_creation(self): + with self.subnet() as subnet: + dhcp_agents = self._list_dhcp_agents_hosting_network( + subnet['subnet']['network_id']) + result0 = len(dhcp_agents['agents']) + self._register_agent_states() + with self.port(subnet=subnet, + device_owner="compute:test:" + DHCP_HOSTA) as port: + dhcp_agents = self._list_dhcp_agents_hosting_network( + port['port']['network_id']) + result1 = len(dhcp_agents['agents']) + self.assertEqual(0, result0) + self.assertEqual(1, result1) + + def test_network_scheduler_with_disabled_agent(self): + dhcp_hosta = { + 'binary': 'quantum-dhcp-agent', + 'host': DHCP_HOSTA, + 'topic': 'DHCP_AGENT', + 'configurations': {'dhcp_driver': 'dhcp_driver', + 'use_namespaces': True, + }, + 'agent_type': constants.AGENT_TYPE_DHCP} + self._register_one_agent_state(dhcp_hosta) + with self.network() as net1: + dhcp_agents = self._list_dhcp_agents_hosting_network( + net1['network']['id']) + self.assertEqual(1, len(dhcp_agents['agents'])) + agents = self._list_agents() + self._disable_agent(agents['agents'][0]['id']) + with self.network() as net2: + dhcp_agents = self._list_dhcp_agents_hosting_network( + net2['network']['id']) + self.assertEqual(0, len(dhcp_agents['agents'])) + + def test_network_scheduler_with_down_agent(self): + dhcp_hosta = { + 'binary': 'quantum-dhcp-agent', + 'host': DHCP_HOSTA, + 'topic': 'DHCP_AGENT', + 'configurations': {'dhcp_driver': 'dhcp_driver', + 'use_namespaces': True, + }, + 'agent_type': constants.AGENT_TYPE_DHCP} + self._register_one_agent_state(dhcp_hosta) + is_agent_down_str = 'quantum.db.agents_db.AgentDbMixin.is_agent_down' + with mock.patch(is_agent_down_str) as mock_is_agent_down: + mock_is_agent_down.return_value = False + with self.network() as net: + dhcp_agents = self._list_dhcp_agents_hosting_network( + net['network']['id']) + self.assertEqual(1, len(dhcp_agents['agents'])) + with mock.patch(is_agent_down_str) as mock_is_agent_down: + mock_is_agent_down.return_value = True + with self.network() as net: + dhcp_agents = self._list_dhcp_agents_hosting_network( + net['network']['id']) + self.assertEqual(0, len(dhcp_agents['agents'])) + + def test_network_scheduler_with_hosted_network(self): + dhcp_hosta = { + 'binary': 'quantum-dhcp-agent', + 'host': DHCP_HOSTA, + 'topic': 'DHCP_AGENT', + 'configurations': {'dhcp_driver': 'dhcp_driver', + 'use_namespaces': True, + }, + 'agent_type': constants.AGENT_TYPE_DHCP} + self._register_one_agent_state(dhcp_hosta) + agents = self._list_agents() + with self.network() as net1: + dhcp_agents = self._list_dhcp_agents_hosting_network( + net1['network']['id']) + self.assertEqual(1, len(dhcp_agents['agents'])) + with mock.patch.object(OVSQuantumPluginV2, + 'get_dhcp_agents_hosting_networks', + autospec=True) as mock_hosting_agents: + + mock_hosting_agents.return_value = agents['agents'] + with self.network(do_delete=False) as net2: + pass + dhcp_agents = self._list_dhcp_agents_hosting_network( + net2['network']['id']) + self.assertEqual(0, len(dhcp_agents['agents'])) + + def test_network_policy(self): + with self.network() as net1: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTA) + self._list_networks_hosted_by_dhcp_agent( + hosta_id, expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._add_network_to_dhcp_agent( + hosta_id, net1['network']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._add_network_to_dhcp_agent(hosta_id, + net1['network']['id']) + self._remove_network_from_dhcp_agent( + hosta_id, net1['network']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._list_dhcp_agents_hosting_network( + net1['network']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + + def test_network_add_to_dhcp_agent(self): + with self.network() as net1: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTA) + num_before_add = len( + self._list_networks_hosted_by_dhcp_agent( + hosta_id)['networks']) + self._add_network_to_dhcp_agent(hosta_id, + net1['network']['id']) + num_after_add = len( + self._list_networks_hosted_by_dhcp_agent( + hosta_id)['networks']) + self.assertEqual(0, num_before_add) + self.assertEqual(1, num_after_add) + + def test_network_remove_from_dhcp_agent(self): + dhcp_hosta = { + 'binary': 'quantum-dhcp-agent', + 'host': DHCP_HOSTA, + 'topic': 'DHCP_AGENT', + 'configurations': {'dhcp_driver': 'dhcp_driver', + 'use_namespaces': True, + }, + 'agent_type': constants.AGENT_TYPE_DHCP} + self._register_one_agent_state(dhcp_hosta) + hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP, + DHCP_HOSTA) + with self.network() as net1: + num_before_remove = len( + self._list_networks_hosted_by_dhcp_agent( + hosta_id)['networks']) + self._remove_network_from_dhcp_agent(hosta_id, + net1['network']['id']) + num_after_remove = len( + self._list_networks_hosted_by_dhcp_agent( + hosta_id)['networks']) + self.assertEqual(1, num_before_remove) + self.assertEqual(0, num_after_remove) + + def test_router_auto_schedule_with_hosted(self): + with self.router() as router: + l3_rpc = l3_rpc_base.L3RpcCallbackMixin() + self._register_agent_states() + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA) + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB) + l3_agents = self._list_l3_agents_hosting_router( + router['router']['id']) + self.assertEqual(1, len(l3_agents['agents'])) + self.assertEqual(L3_HOSTA, l3_agents['agents'][0]['host']) + + def test_router_auto_schedule_with_hosted_2(self): + # one agent hosts one router + l3_rpc = l3_rpc_base.L3RpcCallbackMixin() + l3_hosta = { + 'binary': 'quantum-l3-agent', + 'host': L3_HOSTA, + 'topic': 'L3_AGENT', + 'configurations': {'use_namespaces': True, + 'router_id': None, + 'handle_internal_only_routers': + True, + 'gateway_external_network_id': + None, + 'interface_driver': 'interface_driver', + }, + 'agent_type': constants.AGENT_TYPE_L3} + l3_hostb = copy.deepcopy(l3_hosta) + l3_hostb['host'] = L3_HOSTB + with self.router() as router1: + self._register_one_agent_state(l3_hosta) + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA) + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._disable_agent(hosta_id, admin_state_up=False) + with self.router() as router2: + self._register_one_agent_state(l3_hostb) + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB) + l3_agents_1 = self._list_l3_agents_hosting_router( + router1['router']['id']) + l3_agents_2 = self._list_l3_agents_hosting_router( + router2['router']['id']) + hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id) + num_hosta_routers = len(hosta_routers['routers']) + hostb_id = self._get_agent_id( + constants.AGENT_TYPE_L3, + L3_HOSTB) + hostb_routers = self._list_routers_hosted_by_l3_agent(hostb_id) + num_hostc_routers = len(hostb_routers['routers']) + + self.assertEqual(1, num_hosta_routers) + self.assertEqual(1, num_hostc_routers) + self.assertEqual(1, len(l3_agents_1['agents'])) + self.assertEqual(1, len(l3_agents_2['agents'])) + self.assertEqual(L3_HOSTA, l3_agents_1['agents'][0]['host']) + self.assertEqual(L3_HOSTB, l3_agents_2['agents'][0]['host']) + + def test_router_auto_schedule_with_disabled(self): + with contextlib.nested(self.router(), + self.router()): + l3_rpc = l3_rpc_base.L3RpcCallbackMixin() + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + hostb_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTB) + self._disable_agent(hosta_id) + # first agent will not host router since it is disabled + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA) + # second agent will host all the routers since first is disabled. + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB) + hostb_routers = self._list_routers_hosted_by_l3_agent(hostb_id) + num_hostb_routers = len(hostb_routers['routers']) + hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id) + num_hosta_routers = len(hosta_routers['routers']) + self.assertEqual(2, num_hostb_routers) + self.assertEqual(0, num_hosta_routers) + + def test_router_auto_schedule_with_candidates(self): + l3_hosta = { + 'binary': 'quantum-l3-agent', + 'host': L3_HOSTA, + 'topic': 'L3_AGENT', + 'configurations': {'use_namespaces': False, + 'router_id': None, + 'handle_internal_only_routers': + True, + 'gateway_external_network_id': + None, + 'interface_driver': 'interface_driver', + }, + 'agent_type': constants.AGENT_TYPE_L3} + with contextlib.nested(self.router(), + self.router()) as (router1, router2): + l3_rpc = l3_rpc_base.L3RpcCallbackMixin() + l3_hosta['configurations']['router_id'] = router1['router']['id'] + self._register_one_agent_state(l3_hosta) + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA) + hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id) + num_hosta_routers = len(hosta_routers['routers']) + l3_agents_1 = self._list_l3_agents_hosting_router( + router1['router']['id']) + l3_agents_2 = self._list_l3_agents_hosting_router( + router2['router']['id']) + # L3 agent will host only the compatible router. + self.assertEqual(1, num_hosta_routers) + self.assertEqual(1, len(l3_agents_1['agents'])) + self.assertEqual(0, len(l3_agents_2['agents'])) + + def test_router_schedule_with_candidates(self): + l3_hosta = { + 'binary': 'quantum-l3-agent', + 'host': L3_HOSTA, + 'topic': 'L3_AGENT', + 'configurations': {'use_namespaces': False, + 'router_id': None, + 'handle_internal_only_routers': + True, + 'gateway_external_network_id': + None, + 'interface_driver': 'interface_driver', + }, + 'agent_type': constants.AGENT_TYPE_L3} + with contextlib.nested(self.router(), + self.router(), + self.subnet(), + self.subnet(cidr='10.0.3.0/24')) as (router1, + router2, + subnet1, + subnet2): + l3_hosta['configurations']['router_id'] = router1['router']['id'] + self._register_one_agent_state(l3_hosta) + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._router_interface_action('add', + router1['router']['id'], + subnet1['subnet']['id'], + None) + self._router_interface_action('add', + router2['router']['id'], + subnet2['subnet']['id'], + None) + hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id) + num_hosta_routers = len(hosta_routers['routers']) + l3_agents_1 = self._list_l3_agents_hosting_router( + router1['router']['id']) + l3_agents_2 = self._list_l3_agents_hosting_router( + router2['router']['id']) + # L3 agent will host only the compatible router. + self.assertEqual(1, num_hosta_routers) + self.assertEqual(1, len(l3_agents_1['agents'])) + self.assertEqual(0, len(l3_agents_2['agents'])) + + def test_router_without_l3_agents(self): + with self.subnet() as s: + self._set_net_external(s['subnet']['network_id']) + data = {'router': {'tenant_id': uuidutils.generate_uuid()}} + data['router']['name'] = 'router1' + data['router']['external_gateway_info'] = { + 'network_id': s['subnet']['network_id']} + router_req = self.new_create_request('routers', data, self.fmt) + res = router_req.get_response(self.ext_api) + router = self.deserialize(self.fmt, res) + l3agents = ( + self.agentscheduler_dbMinxin.get_l3_agents_hosting_routers( + self.adminContext, [router['router']['id']])) + self._delete('routers', router['router']['id']) + self.assertEqual(0, len(l3agents)) + + def test_router_sync_data(self): + with contextlib.nested(self.subnet(), + self.subnet(cidr='10.0.2.0/24'), + self.subnet(cidr='10.0.3.0/24')) as ( + s1, s2, s3): + self._register_agent_states() + self._set_net_external(s1['subnet']['network_id']) + data = {'router': {'tenant_id': uuidutils.generate_uuid()}} + data['router']['name'] = 'router1' + data['router']['external_gateway_info'] = { + 'network_id': s1['subnet']['network_id']} + router_req = self.new_create_request('routers', data, self.fmt) + res = router_req.get_response(self.ext_api) + router = self.deserialize(self.fmt, res) + self._router_interface_action('add', + router['router']['id'], + s2['subnet']['id'], + None) + self._router_interface_action('add', + router['router']['id'], + s3['subnet']['id'], + None) + l3agents = self._list_l3_agents_hosting_router( + router['router']['id']) + self.assertEqual(1, len(l3agents['agents'])) + agents = self._list_agents() + another_l3_agent_id = None + another_l3_agent_host = None + default = l3agents['agents'][0]['id'] + for com in agents['agents']: + if (com['id'] != default and + com['agent_type'] == constants.AGENT_TYPE_L3): + another_l3_agent_id = com['id'] + another_l3_agent_host = com['host'] + break + self.assertTrue(another_l3_agent_id is not None) + self._add_router_to_l3_agent(another_l3_agent_id, + router['router']['id'], + expected_code=exc.HTTPConflict.code) + self._remove_router_from_l3_agent(default, + router['router']['id']) + self._add_router_to_l3_agent(another_l3_agent_id, + router['router']['id']) + l3agents = self._list_l3_agents_hosting_router( + router['router']['id']) + self.assertEqual(another_l3_agent_host, + l3agents['agents'][0]['host']) + self._remove_router_from_l3_agent(another_l3_agent_id, + router['router']['id']) + self._router_interface_action('remove', + router['router']['id'], + s2['subnet']['id'], + None) + l3agents = self._list_l3_agents_hosting_router( + router['router']['id']) + self.assertEqual(1, + len(l3agents['agents'])) + self._router_interface_action('remove', + router['router']['id'], + s3['subnet']['id'], + None) + self._delete('routers', router['router']['id']) + + def test_router_add_to_l3_agent(self): + with self.router() as router1: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + num_before_add = len( + self._list_routers_hosted_by_l3_agent( + hosta_id)['routers']) + self._add_router_to_l3_agent(hosta_id, + router1['router']['id']) + hostb_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTB) + self._add_router_to_l3_agent(hostb_id, + router1['router']['id'], + expected_code=exc.HTTPConflict.code) + num_after_add = len( + self._list_routers_hosted_by_l3_agent( + hosta_id)['routers']) + self.assertEqual(0, num_before_add) + self.assertEqual(1, num_after_add) + + def test_router_policy(self): + with self.router() as router1: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._list_routers_hosted_by_l3_agent( + hosta_id, expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._add_router_to_l3_agent( + hosta_id, router1['router']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._add_router_to_l3_agent( + hosta_id, router1['router']['id']) + self._remove_router_from_l3_agent( + hosta_id, router1['router']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + self._list_l3_agents_hosting_router( + router1['router']['id'], + expected_code=exc.HTTPForbidden.code, + admin_context=False) + + +class L3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin, + test_agent_ext_plugin.AgentDBTestMixIn, + AgentSchedulerTestMixIn, + test_plugin.QuantumDbPluginV2TestCase): + def setUp(self): + plugin = ('quantum.plugins.openvswitch.' + 'ovs_quantum_plugin.OVSQuantumPluginV2') + self.dhcp_notifier_cls_p = mock.patch( + 'quantum.api.rpc.agentnotifiers.dhcp_rpc_agent_api.' + 'DhcpAgentNotifyAPI') + self.dhcp_notifier = mock.Mock(name='dhcp_notifier') + self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start() + self.dhcp_notifier_cls.return_value = self.dhcp_notifier + super(L3AgentNotifierTestCase, self).setUp(plugin) + ext_mgr = extensions.PluginAwareExtensionManager.get_instance() + self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr) + self.adminContext = context.get_admin_context() + self.addCleanup(self.dhcp_notifier_cls_p.stop) + + def test_router_add_to_l3_agent_notification(self): + plugin = manager.QuantumManager.get_plugin() + with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3: + with self.router() as router1: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._add_router_to_l3_agent(hosta_id, + router1['router']['id']) + routers = plugin.get_sync_data(self.adminContext, + [router1['router']['id']]) + mock_l3.assert_called_with( + mock.ANY, + plugin.l3_agent_notifier.make_msg( + 'router_added_to_agent', + payload=routers), + topic='l3_agent.hosta') + + def test_router_remove_from_l3_agent_notification(self): + plugin = manager.QuantumManager.get_plugin() + with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3: + with self.router() as router1: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._add_router_to_l3_agent(hosta_id, + router1['router']['id']) + self._remove_router_from_l3_agent(hosta_id, + router1['router']['id']) + mock_l3.assert_called_with( + mock.ANY, plugin.l3_agent_notifier.make_msg( + 'router_removed_from_agent', + payload={'router_id': router1['router']['id']}), + topic='l3_agent.hosta') + + def test_agent_updated_l3_agent_notification(self): + plugin = manager.QuantumManager.get_plugin() + with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3: + self._register_agent_states() + hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, + L3_HOSTA) + self._disable_agent(hosta_id, admin_state_up=False) + mock_l3.assert_called_with( + mock.ANY, plugin.l3_agent_notifier.make_msg( + 'agent_updated', + payload={'admin_state_up': False}), + topic='l3_agent.hosta') + + +class AgentSchedulerTestCaseXML(AgentSchedulerTestCase): + fmt = 'xml' diff --git a/quantum/tests/unit/test_agent_ext_plugin.py b/quantum/tests/unit/test_agent_ext_plugin.py index f38b72c48..24acc4ea1 100644 --- a/quantum/tests/unit/test_agent_ext_plugin.py +++ b/quantum/tests/unit/test_agent_ext_plugin.py @@ -62,28 +62,17 @@ class TestAgentPlugin(db_base_plugin_v2.QuantumDbPluginV2, supported_extension_aliases = ["agent"] -class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase): - fmt = 'json' - - def setUp(self): - self.adminContext = context.get_admin_context() - test_config['plugin_name_v2'] = ( - 'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin') - # for these tests we need to enable overlapping ips - cfg.CONF.set_default('allow_overlapping_ips', True) - ext_mgr = AgentTestExtensionManager() - test_config['extension_manager'] = ext_mgr - super(AgentDBTestCase, self).setUp() +class AgentDBTestMixIn(object): def _list_agents(self, expected_res_status=None, quantum_context=None, query_string=None): - comp_res = self._list('agents', - quantum_context=quantum_context, - query_params=query_string) + agent_res = self._list('agents', + quantum_context=quantum_context, + query_params=query_string) if expected_res_status: - self.assertEqual(comp_res.status_int, expected_res_status) - return comp_res + self.assertEqual(agent_res.status_int, expected_res_status) + return agent_res def _register_agent_states(self): """Register two L3 agents and two DHCP agents.""" @@ -123,6 +112,21 @@ class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase): agent_state={'agent_state': dhcp_hostc}) return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc] + +class AgentDBTestCase(AgentDBTestMixIn, + test_db_plugin.QuantumDbPluginV2TestCase): + fmt = 'json' + + def setUp(self): + self.adminContext = context.get_admin_context() + test_config['plugin_name_v2'] = ( + 'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin') + # for these tests we need to enable overlapping ips + cfg.CONF.set_default('allow_overlapping_ips', True) + ext_mgr = AgentTestExtensionManager() + test_config['extension_manager'] = ext_mgr + super(AgentDBTestCase, self).setUp() + def test_create_agent(self): data = {'agent': {}} _req = self.new_create_request('agents', data, self.fmt) diff --git a/quantum/tests/unit/test_db_rpc_base.py b/quantum/tests/unit/test_db_rpc_base.py index 994e3c82b..136973e37 100644 --- a/quantum/tests/unit/test_db_rpc_base.py +++ b/quantum/tests/unit/test_db_rpc_base.py @@ -25,7 +25,7 @@ class TestDhcpRpcCallackMixin(testtools.TestCase): super(TestDhcpRpcCallackMixin, self).setUp() self.plugin_p = mock.patch('quantum.manager.QuantumManager.get_plugin') get_plugin = self.plugin_p.start() - self.plugin = mock.Mock() + self.plugin = mock.MagicMock() get_plugin.return_value = self.plugin self.callbacks = dhcp_rpc_base.DhcpRpcCallbackMixin() self.log_p = mock.patch('quantum.db.dhcp_rpc_base.LOG') diff --git a/quantum/tests/unit/test_l3_agent.py b/quantum/tests/unit/test_l3_agent.py index 38bd901a0..576b10f61 100644 --- a/quantum/tests/unit/test_l3_agent.py +++ b/quantum/tests/unit/test_l3_agent.py @@ -88,7 +88,7 @@ class TestBasicRouterOperations(testtools.TestCase): def testRouterInfoCreate(self): id = _uuid() ri = l3_agent.RouterInfo(id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, None) self.assertTrue(ri.ns_name().endswith(id)) @@ -100,7 +100,7 @@ class TestBasicRouterOperations(testtools.TestCase): router_id = _uuid() network_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, None) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) cidr = '99.0.1.9/24' mac = 'ca:fe:de:ad:be:ef' @@ -128,7 +128,7 @@ class TestBasicRouterOperations(testtools.TestCase): def _test_external_gateway_action(self, action): router_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, None) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) internal_cidrs = ['100.0.1.0/24', '200.74.0.0/16'] ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30', @@ -172,7 +172,7 @@ class TestBasicRouterOperations(testtools.TestCase): def _test_floating_ip_action(self, action): router_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, None) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) floating_ip = '20.0.0.100' fixed_ip = '10.0.0.23' @@ -227,7 +227,8 @@ class TestBasicRouterOperations(testtools.TestCase): router_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, + None) agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) fake_route1 = {'destination': '135.207.0.0/16', @@ -274,7 +275,8 @@ class TestBasicRouterOperations(testtools.TestCase): router_id = _uuid() ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, - self.conf.use_namespaces) + self.conf.use_namespaces, + None) ri.router = {} fake_old_routes = [] diff --git a/quantum/tests/unit/test_l3_plugin.py b/quantum/tests/unit/test_l3_plugin.py index 3e0e7b0d5..0eea65304 100644 --- a/quantum/tests/unit/test_l3_plugin.py +++ b/quantum/tests/unit/test_l3_plugin.py @@ -29,6 +29,7 @@ from webob import exc import webtest from quantum.api import extensions +from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api from quantum.api.v2 import attributes from quantum.common import config from quantum.common import constants as l3_constants @@ -37,7 +38,6 @@ from quantum.common.test_lib import test_config from quantum import context from quantum.db import db_base_plugin_v2 from quantum.db import l3_db -from quantum.db import l3_rpc_agent_api from quantum.db import models_v2 from quantum.extensions import l3 from quantum.manager import QuantumManager @@ -307,24 +307,7 @@ class TestL3NatPlugin(db_base_plugin_v2.QuantumDbPluginV2, return super(TestL3NatPlugin, self).delete_port(context, id) -class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase): - - def setUp(self): - test_config['plugin_name_v2'] = ( - 'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin') - # for these tests we need to enable overlapping ips - cfg.CONF.set_default('allow_overlapping_ips', True) - ext_mgr = L3TestExtensionManager() - test_config['extension_manager'] = ext_mgr - super(L3NatTestCaseBase, self).setUp() - - # Set to None to reload the drivers - notifier_api._drivers = None - cfg.CONF.set_override("notification_driver", [test_notifier.__name__]) - - def tearDown(self): - test_notifier.NOTIFICATIONS = [] - super(L3NatTestCaseBase, self).tearDown() +class L3NatTestCaseMixin(object): def _create_network(self, fmt, name, admin_state_up, **kwargs): """ Override the routine for allowing the router:external attribute """ @@ -334,7 +317,7 @@ class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase): kwargs), kwargs.values())) arg_list = new_args.pop('arg_list', ()) + (l3.EXTERNAL,) - return super(L3NatTestCaseBase, self)._create_network( + return super(L3NatTestCaseMixin, self)._create_network( fmt, name, admin_state_up, arg_list=arg_list, **new_args) def _create_router(self, fmt, tenant_id, name=None, @@ -505,6 +488,27 @@ class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase): public_sub['subnet']['network_id']) +class L3NatTestCaseBase(L3NatTestCaseMixin, + test_db_plugin.QuantumDbPluginV2TestCase): + + def setUp(self): + test_config['plugin_name_v2'] = ( + 'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin') + # for these tests we need to enable overlapping ips + cfg.CONF.set_default('allow_overlapping_ips', True) + ext_mgr = L3TestExtensionManager() + test_config['extension_manager'] = ext_mgr + super(L3NatTestCaseBase, self).setUp() + + # Set to None to reload the drivers + notifier_api._drivers = None + cfg.CONF.set_override("notification_driver", [test_notifier.__name__]) + + def tearDown(self): + test_notifier.NOTIFICATIONS = [] + super(L3NatTestCaseBase, self).tearDown() + + class L3NatDBTestCase(L3NatTestCaseBase): def test_router_create(self): @@ -1459,7 +1463,7 @@ class L3NatDBTestCase(L3NatTestCaseBase): def _test_notify_op_agent(self, target_func, *args): l3_rpc_agent_api_str = ( - 'quantum.db.l3_rpc_agent_api.L3AgentNotifyAPI') + 'quantum.api.rpc.agentnotifiers.l3_rpc_agent_api.L3AgentNotifyAPI') oldNotify = l3_rpc_agent_api.L3AgentNotify try: with mock.patch(l3_rpc_agent_api_str) as notifyApi: diff --git a/tox.ini b/tox.ini index 45261b2f0..3b4fadeaf 100644 --- a/tox.ini +++ b/tox.ini @@ -19,8 +19,12 @@ sitepackages = True downloadcache = ~/cache/pip [testenv:pep8] +# E712 comparison to False should be 'if cond is False:' or 'if not cond:' +# query = query.filter(Component.disabled == False) +# E125 continuation line does not distinguish itself from next logical line + commands = - pep8 --repeat --show-source --ignore=E125 --exclude=.venv,.tox,dist,doc,openstack,*egg . + pep8 --repeat --show-source --ignore=E125,E712 --exclude=.venv,.tox,dist,doc,openstack,*egg . pep8 --repeat --show-source --ignore=E125 --filename=quantum* bin [testenv:i18n]