From b672c26cb42ad3d9a17ed049b506b5622601e891 Mon Sep 17 00:00:00 2001 From: Kevin Benton Date: Fri, 15 Apr 2016 06:05:56 -0700 Subject: [PATCH] Add provisioning blocks to status ACTIVE transition Sometimes an object requires multiple disjoint actors to complete a set of tasks before the status of the object should be transitioned to ACTIVE. The main example of this is when a port is being created. The L2 agent has to do its business to wire up the VIF, but at the same time the DHCP agent has to setup the DHCP reservation. This led to Nova booting the VM when the L2 agent was done even though the DHCP agent may have been nowhere near ready. This patch introduces a provisioning blocks mechansim that allows the entities to be tracked that need to be involved to make a transition to ACTIVE happen. See the devref in the dependent patch for a high-level view of how this works. The ML2 code is updated to use this new mechanism to prevent updating the port status to ACTIVE without both the DHCP agent and L2 agent reporting that the port is ready. The DHCP RPC API required a version bump to allow the port ready notification. This also adds a devref doc for the provisioning_blocks module with a high-level overview of how it works in addition to a detailed description of how it is used specifically with ML2, the L2 agents, and the DHCP agents. Closes-Bug: #1453350 Change-Id: Id85ff6de1a14a550ab50baf4f79d3130af3680c8 --- doc/source/devref/index.rst | 1 + doc/source/devref/provisioning_blocks.rst | 159 +++++++++++++++++ neutron/agent/dhcp/agent.py | 48 +++++ neutron/api/rpc/handlers/dhcp_rpc.py | 11 +- .../alembic_migrations/versions/EXPAND_HEAD | 2 +- .../30107ab6a3ee_provisioning_blocks.py | 39 ++++ neutron/db/migration/models/head.py | 1 + neutron/db/provisioning_blocks.py | 168 ++++++++++++++++++ neutron/plugins/ml2/db.py | 8 + neutron/plugins/ml2/drivers/mech_agent.py | 29 +++ neutron/plugins/ml2/plugin.py | 61 ++++++- neutron/plugins/ml2/rpc.py | 44 +++-- neutron/tests/unit/agent/dhcp/test_agent.py | 71 +++++++- .../unit/api/rpc/handlers/test_dhcp_rpc.py | 13 ++ .../tests/unit/db/test_provisioning_blocks.py | 130 ++++++++++++++ neutron/tests/unit/plugins/ml2/test_plugin.py | 40 +++++ neutron/tests/unit/plugins/ml2/test_rpc.py | 27 ++- 17 files changed, 808 insertions(+), 44 deletions(-) create mode 100644 doc/source/devref/provisioning_blocks.rst create mode 100644 neutron/db/migration/alembic_migrations/versions/newton/expand/30107ab6a3ee_provisioning_blocks.py create mode 100644 neutron/db/provisioning_blocks.py create mode 100644 neutron/tests/unit/db/test_provisioning_blocks.py diff --git a/doc/source/devref/index.rst b/doc/source/devref/index.rst index b172d048a91..30987c8d212 100644 --- a/doc/source/devref/index.rst +++ b/doc/source/devref/index.rst @@ -77,6 +77,7 @@ Neutron Internals openvswitch_firewall network_ip_availability tag + provisioning_blocks Testing ------- diff --git a/doc/source/devref/provisioning_blocks.rst b/doc/source/devref/provisioning_blocks.rst new file mode 100644 index 00000000000..e5f764c05d1 --- /dev/null +++ b/doc/source/devref/provisioning_blocks.rst @@ -0,0 +1,159 @@ +.. + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + + Convention for heading levels in Neutron devref: + ======= Heading 0 (reserved for the title in a document) + ------- Heading 1 + ~~~~~~~ Heading 2 + +++++++ Heading 3 + ''''''' Heading 4 + (Avoid deeper levels because they do not render well.) + + +Composite Object Status via Provisioning Blocks +=============================================== + +We use the STATUS field on objects to indicate when a resource is ready +by setting it to ACTIVE so external systems know when it's safe to use +that resource. Knowing when to set the status to ACTIVE is simple when +there is only one entity responsible for provisioning a given object. +When that entity has finishing provisioning, we just update the STATUS +directly to active. However, there are resources in Neutron that require +provisioning by multiple asynchronous entities before they are ready to +be used so managing the transition to the ACTIVE status becomes more +complex. To handle these cases, Neutron has the provisioning_blocks +module to track the entities that are still provisioning a resource. + +The main example of this is with ML2, the L2 agents and the DHCP agents. +When a port is created and bound to a host, it's placed in the DOWN +status. The L2 agent now has to setup flows, security group rules, etc +for the port and the DHCP agent has to setup a DHCP reservation for +the port's IP and MAC. Before the transition to ACTIVE, both agents +must complete their work or the port user (e.g. Nova) may attempt to +use the port and not have connectivity. To solve this, the +provisioning_blocks module is used to track the provisioning state +of each agent and the status is only updated when both complete. + + +High Level View +--------------- + +To make use of the provisioning_blocks module, provisioning components +should be added whenever there is work to be done by another entity +before an object's status can transition to ACTIVE. This is +accomplished by calling the add_provisioning_component method for +each entity. Then as each entity finishes provisioning the object, +the provisioning_complete must be called to lift the provisioning +block. + +When the last provisioning block is removed, the provisioning_blocks +module will trigger a callback notification containing the object ID +for the object's resource type with the event PROVISIONING_COMPLETE. +A subscriber to this event can now update the status of this object +to ACTIVE or perform any other necessary actions. + +A normal state transition will look something like the following: + +1. Request comes in to create an object +2. Logic on the Neutron server determines which entities are required + to provision the object and adds a provisioning component for each + entity for that object. +3. A notification is emitted to the entities so they start their work. +4. Object is returned to the API caller in the DOWN (or BUILD) state. +5. Each entity tells the server when it has finished provisioning the + object. The server calls provisioning_complete for each entity that + finishes. +6. When provisioning_complete is called on the last remaining entity, + the provisioning_blocks module will emit an event indicating that + provisioning has completed for that object. +7. A subscriber to this event on the server will then update the status + of the object to ACTIVE to indicate that it is fully provisioned. + +For a more concrete example, see the section below. + + +ML2, L2 agents, and DHCP agents +------------------------------- + +ML2 makes use of the provisioning_blocks module to prevent the status +of ports from being transitioned to ACTIVE until both the L2 agent and +the DHCP agent have finished wiring a port. + +When a port is created or updated, the following happens to register +the DHCP agent's provisioning blocks: + +1. The subnet_ids are extracted from the fixed_ips field of the port + and then ML2 checks to see if DHCP is enabled on any of the subnets. +2. The configuration for the DHCP agents hosting the network are looked + up to ensure that at least one of them is new enough to report back + that it has finished setting up the port reservation. +3. If either of the preconditions above fail, a provisioning block for + the DHCP agent is not added and any existing DHCP agent blocks for + that port are cleared to ensure the port isn't blocked waiting for an + event that will never happen. +4. If the preconditions pass, a provisioning block is added for the port + under the 'DHCP' entity. + +When a port is created or updated, the following happens to register the +L2 agent's provisioning blocks: + +1. If the port is not bound, nothing happens because we don't know yet + if an L2 agent is involved so we have to wait until a port update that + binds it. +2. Once the port is bound, the agent based mechanism drivers will check + if they have an agent on the bound host and if the VNIC type belongs + to the mechanism driver, a provisioning block is added for the port + under the 'L2 Agent' entity. + + +Once the DHCP agent has finished setting up the reservation, it calls +dhcp_ready_on_ports via the RPC API with the port ID. The DHCP RPC +handler receives this and calls 'provisioning_complete' in the +provisioning module with the port ID and the 'DHCP' entity to remove +the provisioning block. + +Once the L2 agent has finished setting up the reservation, it calls +the normal update_device_list (or update_device_up) via the RPC API. +The RPC callbacks handler calls 'provisioning_complete' with the +port ID and the 'L2 Agent' entity to remove the provisioning block. + +On the 'provisioning_complete' call that removes the last record, +the provisioning_blocks module emits a callback PROVISIONING_COMPLETE +event with the port ID. A function subscribed to this in ML2 then calls +update_port_status to set the port to ACTIVE. + +At this point the normal notification is emitted to Nova allowing the +VM to be unpaused. + +In the event that the DHCP or L2 agent is down, the port will not +transition to the ACTIVE status (as is the case now if the L2 agent +is down). Agents must account for this by telling the server that +wiring has been completed after configuring everything during +startup. This ensures that ports created on offline agents (or agents +that crash and restart) eventually become active. + +To account for server instability, the notifications about port wiring +be complete must use RPC calls so the agent gets a positive +acknowledgement from the server and it must keep retrying until either +the port is deleted or it is successful. + +If an ML2 driver immediately places a bound port in the ACTIVE state +(e.g. after calling a backend in update_port_postcommit), this patch +will not have any impact on that process. + + +References +---------- + +.. [#] Provisioning Blocks Module: http://git.openstack.org/cgit/openstack/neutron/tree/neutron/db/provisioning_blocks.py diff --git a/neutron/agent/dhcp/agent.py b/neutron/agent/dhcp/agent.py index b2e436b4369..1e3505a46fd 100644 --- a/neutron/agent/dhcp/agent.py +++ b/neutron/agent/dhcp/agent.py @@ -54,6 +54,7 @@ class DhcpAgent(manager.Manager): def __init__(self, host=None, conf=None): super(DhcpAgent, self).__init__(host=host) self.needs_resync_reasons = collections.defaultdict(list) + self.dhcp_ready_ports = set() self.conf = conf or cfg.CONF self.cache = NetworkCache() self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver) @@ -97,6 +98,7 @@ class DhcpAgent(manager.Manager): """Activate the DHCP agent.""" self.sync_state() self.periodic_resync() + self.start_ready_ports_loop() def call_driver(self, action, network, **action_kwargs): """Invoke an action on a DHCP driver instance.""" @@ -169,6 +171,9 @@ class DhcpAgent(manager.Manager): network.id in only_nets): # specific network to sync pool.spawn(self.safe_configure_dhcp_for_network, network) pool.waitall() + # we notify all ports in case some were created while the agent + # was down + self.dhcp_ready_ports |= set(self.cache.get_port_ids()) LOG.info(_LI('Synchronizing state complete')) except Exception as e: @@ -179,6 +184,37 @@ class DhcpAgent(manager.Manager): self.schedule_resync(e) LOG.exception(_LE('Unable to sync network state.')) + def _dhcp_ready_ports_loop(self): + """Notifies the server of any ports that had reservations setup.""" + while True: + # this is just watching a set so we can do it really frequently + eventlet.sleep(0.1) + if self.dhcp_ready_ports: + ports_to_send = self.dhcp_ready_ports + self.dhcp_ready_ports = set() + try: + self.plugin_rpc.dhcp_ready_on_ports(ports_to_send) + continue + except oslo_messaging.MessagingTimeout: + LOG.error(_LE("Timeout notifying server of ports ready. " + "Retrying...")) + except Exception as e: + if (isinstance(e, oslo_messaging.RemoteError) + and e.exc_type == 'NoSuchMethod'): + LOG.info(_LI("Server does not support port ready " + "notifications. Waiting for 5 minutes " + "before retrying.")) + eventlet.sleep(300) + continue + LOG.exception(_LE("Failure notifying DHCP server of " + "ready DHCP ports. Will retry on next " + "iteration.")) + self.dhcp_ready_ports |= ports_to_send + + def start_ready_ports_loop(self): + """Spawn a thread to push changed ports to server.""" + eventlet.spawn(self._dhcp_ready_ports_loop) + @utils.exception_logger() def _periodic_resync_helper(self): """Resync the dhcp state at the configured interval.""" @@ -348,6 +384,7 @@ class DhcpAgent(manager.Manager): driver_action = 'restart' self.cache.put_port(updated_port) self.call_driver(driver_action, network) + self.dhcp_ready_ports.add(updated_port.id) def _is_port_on_this_agent(self, port): thishost = utils.get_dhcp_agent_device_id( @@ -421,6 +458,7 @@ class DhcpPluginApi(object): 1.0 - Initial version. 1.1 - Added get_active_networks_info, create_dhcp_port, and update_dhcp_port methods. + 1.5 - Added dhcp_ready_on_ports """ @@ -471,6 +509,12 @@ class DhcpPluginApi(object): network_id=network_id, device_id=device_id, host=self.host) + def dhcp_ready_on_ports(self, port_ids): + """Notify the server that DHCP is configured for the port.""" + cctxt = self.client.prepare(version='1.5') + return cctxt.call(self.context, 'dhcp_ready_on_ports', + port_ids=port_ids) + class NetworkCache(object): """Agent cache of the current network state.""" @@ -479,6 +523,9 @@ class NetworkCache(object): self.subnet_lookup = {} self.port_lookup = {} + def get_port_ids(self): + return self.port_lookup.keys() + def get_network_ids(self): return self.cache.keys() @@ -563,6 +610,7 @@ class DhcpAgentWithStateReport(DhcpAgent): 'availability_zone': self.conf.AGENT.availability_zone, 'topic': topics.DHCP_AGENT, 'configurations': { + 'notifies_port_ready': True, 'dhcp_driver': self.conf.dhcp_driver, 'dhcp_lease_duration': self.conf.dhcp_lease_duration, 'log_agent_heartbeats': self.conf.AGENT.log_agent_heartbeats}, diff --git a/neutron/api/rpc/handlers/dhcp_rpc.py b/neutron/api/rpc/handlers/dhcp_rpc.py index 8722187452e..cffe7eae8ea 100644 --- a/neutron/api/rpc/handlers/dhcp_rpc.py +++ b/neutron/api/rpc/handlers/dhcp_rpc.py @@ -26,10 +26,12 @@ import oslo_messaging from oslo_utils import excutils from neutron._i18n import _, _LW +from neutron.callbacks import resources from neutron.common import constants as n_const from neutron.common import exceptions as n_exc from neutron.common import utils from neutron.db import api as db_api +from neutron.db import provisioning_blocks from neutron.extensions import portbindings from neutron import manager from neutron.plugins.common import utils as p_utils @@ -64,9 +66,10 @@ class DhcpRpcCallback(object): # 1.4 - Removed update_lease_expiration. It's not used by reference # DHCP agent since Juno, so similar rationale for not bumping the # major version as above applies here too. + # 1.5 - Added dhcp_ready_on_ports. target = oslo_messaging.Target( namespace=n_const.RPC_NAMESPACE_DHCP_PLUGIN, - version='1.4') + version='1.5') def _get_active_networks(self, context, **kwargs): """Retrieve and return a list of the active networks.""" @@ -225,3 +228,9 @@ class DhcpRpcCallback(object): {'port': port, 'host': host}) return self._port_action(plugin, context, port, 'update_port') + + def dhcp_ready_on_ports(self, context, port_ids): + for port_id in port_ids: + provisioning_blocks.provisioning_complete( + context, port_id, resources.PORT, + provisioning_blocks.DHCP_ENTITY) diff --git a/neutron/db/migration/alembic_migrations/versions/EXPAND_HEAD b/neutron/db/migration/alembic_migrations/versions/EXPAND_HEAD index 95db9b9d117..cf26eb1f33e 100644 --- a/neutron/db/migration/alembic_migrations/versions/EXPAND_HEAD +++ b/neutron/db/migration/alembic_migrations/versions/EXPAND_HEAD @@ -1 +1 @@ -d3435b514502 +30107ab6a3ee diff --git a/neutron/db/migration/alembic_migrations/versions/newton/expand/30107ab6a3ee_provisioning_blocks.py b/neutron/db/migration/alembic_migrations/versions/newton/expand/30107ab6a3ee_provisioning_blocks.py new file mode 100644 index 00000000000..01d56f39f6c --- /dev/null +++ b/neutron/db/migration/alembic_migrations/versions/newton/expand/30107ab6a3ee_provisioning_blocks.py @@ -0,0 +1,39 @@ +# Copyright 2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""provisioning_blocks.py + +Revision ID: 30107ab6a3ee +Revises: d3435b514502 +Create Date: 2016-04-15 05:59:59.000001 + +""" + +# revision identifiers, used by Alembic. +revision = '30107ab6a3ee' +down_revision = 'd3435b514502' +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table( + 'provisioningblocks', + sa.Column('standard_attr_id', sa.BigInteger(), + sa.ForeignKey('standardattributes.id', ondelete='CASCADE'), + nullable=False, primary_key=True), + sa.Column('entity', sa.String(length=255), nullable=False, + primary_key=True), + ) diff --git a/neutron/db/migration/models/head.py b/neutron/db/migration/models/head.py index 1626e6833e4..6112dbf68ab 100644 --- a/neutron/db/migration/models/head.py +++ b/neutron/db/migration/models/head.py @@ -42,6 +42,7 @@ from neutron.db import model_base from neutron.db import models_v2 # noqa from neutron.db import portbindings_db # noqa from neutron.db import portsecurity_db # noqa +from neutron.db import provisioning_blocks # noqa from neutron.db.qos import models as qos_models # noqa from neutron.db.quota import models # noqa from neutron.db import rbac_db_models # noqa diff --git a/neutron/db/provisioning_blocks.py b/neutron/db/provisioning_blocks.py new file mode 100644 index 00000000000..d55870b7946 --- /dev/null +++ b/neutron/db/provisioning_blocks.py @@ -0,0 +1,168 @@ +# Copyright 2016 Mirantis, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from oslo_db import exception as db_exc +from oslo_log import log as logging +import sqlalchemy as sa + +from neutron._i18n import _LE +from neutron.callbacks import registry +from neutron.callbacks import resources +from neutron.db import api as db_api +from neutron.db import model_base +from neutron.db import models_v2 + +LOG = logging.getLogger(__name__) +PROVISIONING_COMPLETE = 'provisioning_complete' +# identifiers for the various entities that participate in provisioning +DHCP_ENTITY = 'DHCP' +L2_AGENT_ENTITY = 'L2' +_RESOURCE_TO_MODEL_MAP = {resources.PORT: models_v2.Port} + + +class ProvisioningBlock(model_base.BASEV2): + # the standard attr id of the thing we want to block + standard_attr_id = ( + sa.Column(sa.BigInteger().with_variant(sa.Integer(), 'sqlite'), + sa.ForeignKey(model_base.StandardAttribute.id, + ondelete="CASCADE"), + primary_key=True)) + # the entity that wants to block the status change (e.g. L2 Agent) + entity = sa.Column(sa.String(255), nullable=False, primary_key=True) + + +def add_model_for_resource(resource, model): + """Adds a mapping between a callback resource and a DB model.""" + _RESOURCE_TO_MODEL_MAP[resource] = model + + +def add_provisioning_component(context, object_id, object_type, entity): + """Adds a provisioning block by an entity to a given object. + + Adds a provisioning block to the DB for object_id with an identifier + of the entity that is doing the provisioning. While an object has these + provisioning blocks present, this module will not emit any callback events + indicating that provisioning has completed. Any logic that depends on + multiple disjoint components use these blocks and subscribe to the + PROVISIONING_COMPLETE event to know when all components have completed. + + :param context: neutron api request context + :param object_id: ID of object that has been provisioned + :param object_type: callback resource type of the object + :param entity: The entity that has provisioned the object + """ + log_dict = {'entity': entity, 'oid': object_id, 'otype': object_type} + # we get an object's ID, so we need to convert that into a standard attr id + standard_attr_id = _get_standard_attr_id(context, object_id, object_type) + if not standard_attr_id: + return + try: + with db_api.autonested_transaction(context.session): + record = ProvisioningBlock(standard_attr_id=standard_attr_id, + entity=entity) + context.session.add(record) + except db_exc.DBDuplicateEntry: + # an entry could be leftover from a previous transition that hasn't + # yet been provisioned. (e.g. multiple updates in a short period) + LOG.debug("Ignored duplicate provisioning block setup for %(otype)s " + "%(oid)s by entity %(entity)s.", log_dict) + return + LOG.debug("Transition to ACTIVE for %(otype)s object %(oid)s " + "will not be triggered until provisioned by entity %(entity)s.", + log_dict) + + +def remove_provisioning_component(context, object_id, object_type, entity, + standard_attr_id=None): + """Removes a provisioning block for an object with triggering a callback. + + Removes a provisioning block without triggering a callback. A user of this + module should call this when a block is no longer correct. If the block has + been satisfied, the 'provisioning_complete' method should be called. + + :param context: neutron api request context + :param object_id: ID of object that has been provisioned + :param object_type: callback resource type of the object + :param entity: The entity that has provisioned the object + :param standard_attr_id: Optional ID to pass to the function to avoid the + extra DB lookup to translate the object_id into + the standard_attr_id. + :return: boolean indicating whether or not a record was deleted + """ + with context.session.begin(subtransactions=True): + standard_attr_id = standard_attr_id or _get_standard_attr_id( + context, object_id, object_type) + if not standard_attr_id: + return False + record = context.session.query(ProvisioningBlock).filter_by( + standard_attr_id=standard_attr_id, entity=entity).first() + if record: + context.session.delete(record) + return True + return False + + +def provisioning_complete(context, object_id, object_type, entity): + """Mark that the provisioning for object_id has been completed by entity. + + Marks that an entity has finished provisioning an object. If there are + no remaining provisioning components, a callback will be triggered + indicating that provisioning has been completed for the object. Subscribers + to this callback must be idempotent because it may be called multiple + times in high availability deployments. + + :param context: neutron api request context + :param object_id: ID of object that has been provisioned + :param object_type: callback resource type of the object + :param entity: The entity that has provisioned the object + """ + log_dict = {'oid': object_id, 'entity': entity, 'otype': object_type} + # this can't be called in a transaction to avoid REPEATABLE READ + # tricking us into thinking there are remaining provisioning components + if context.session.is_active: + raise RuntimeError(_LE("Must not be called in a transaction")) + standard_attr_id = _get_standard_attr_id(context, object_id, + object_type) + if not standard_attr_id: + return + if remove_provisioning_component(context, object_id, object_type, entity, + standard_attr_id): + LOG.debug("Provisioning for %(otype)s %(oid)s completed by entity " + "%(entity)s.", log_dict) + # now with that committed, check if any records are left. if None, emit + # an event that provisioning is complete. + records = context.session.query(ProvisioningBlock).filter_by( + standard_attr_id=standard_attr_id).count() + if not records: + LOG.debug("Provisioning complete for %(otype)s %(oid)s", log_dict) + registry.notify(object_type, PROVISIONING_COMPLETE, + 'neutron.db.provisioning_blocks', + context=context, object_id=object_id) + + +def _get_standard_attr_id(context, object_id, object_type): + model = _RESOURCE_TO_MODEL_MAP.get(object_type) + if not model: + raise RuntimeError(_LE("Could not find model for %s. If you are " + "adding provisioning blocks for a new resource " + "you must call add_model_for_resource during " + "initialization for your type.") % object_type) + obj = (context.session.query(model).enable_eagerloads(False). + filter_by(id=object_id).first()) + if not obj: + # concurrent delete + LOG.debug("Could not find standard attr ID for object %s.", object_id) + return + return obj.standard_attr_id diff --git a/neutron/plugins/ml2/db.py b/neutron/plugins/ml2/db.py index d3043e822c7..c6eb1b9fb97 100644 --- a/neutron/plugins/ml2/db.py +++ b/neutron/plugins/ml2/db.py @@ -283,3 +283,11 @@ def get_dvr_port_bindings(session, port_id): if not bindings: LOG.debug("No bindings for DVR port %s", port_id) return bindings + + +def is_dhcp_active_on_any_subnet(context, subnet_ids): + if not subnet_ids: + return False + return bool(context.session.query(models_v2.Subnet). + enable_eagerloads(False).filter_by(enable_dhcp=True). + filter(models_v2.Subnet.id.in_(subnet_ids)).count()) diff --git a/neutron/plugins/ml2/drivers/mech_agent.py b/neutron/plugins/ml2/drivers/mech_agent.py index 9658f673f93..af78bd7d7a9 100644 --- a/neutron/plugins/ml2/drivers/mech_agent.py +++ b/neutron/plugins/ml2/drivers/mech_agent.py @@ -19,6 +19,9 @@ from oslo_log import log import six from neutron._i18n import _LW +from neutron.callbacks import resources +from neutron.common import constants +from neutron.db import provisioning_blocks from neutron.extensions import portbindings from neutron.plugins.common import constants as p_constants from neutron.plugins.ml2 import driver_api as api @@ -53,6 +56,32 @@ class AgentMechanismDriverBase(api.MechanismDriver): def initialize(self): pass + def create_port_precommit(self, context): + self._insert_provisioning_block(context) + + def update_port_precommit(self, context): + if context.host == context.original_host: + return + self._insert_provisioning_block(context) + + def _insert_provisioning_block(self, context): + # we insert a status barrier to prevent the port from transitioning + # to active until the agent reports back that the wiring is done + port = context.current + if not context.host or port['status'] == constants.PORT_STATUS_ACTIVE: + # no point in putting in a block if the status is already ACTIVE + return + vnic_type = context.current.get(portbindings.VNIC_TYPE, + portbindings.VNIC_NORMAL) + if vnic_type not in self.supported_vnic_types: + # we check the VNIC type because there could be multiple agents + # on a single host with different VNIC types + return + if context.host_agents(self.agent_type): + provisioning_blocks.add_provisioning_component( + context._plugin_context, port['id'], resources.PORT, + provisioning_blocks.L2_AGENT_ENTITY) + def bind_port(self, context): LOG.debug("Attempting to bind port %(port)s on " "network %(network)s", diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index c361475c5ab..b40eaa22db0 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -59,6 +59,7 @@ from neutron.db import external_net_db from neutron.db import extradhcpopt_db from neutron.db import models_v2 from neutron.db import netmtu_db +from neutron.db import provisioning_blocks from neutron.db.quota import driver # noqa from neutron.db import securitygroups_db from neutron.db import securitygroups_rpc_base as sg_db_rpc @@ -159,6 +160,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.type_manager.initialize() self.extension_manager.initialize() self.mechanism_manager.initialize() + registry.subscribe(self._port_provisioned, resources.PORT, + provisioning_blocks.PROVISIONING_COMPLETE) self._setup_dhcp() self._start_rpc_notifiers() self.add_agent_status_check(self.agent_health_check) @@ -195,6 +198,23 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, driver=extension_driver, service_plugin=service_plugin ) + def _port_provisioned(self, rtype, event, trigger, context, object_id, + **kwargs): + port_id = object_id + port = db.get_port(context.session, port_id) + if not port: + LOG.debug("Port %s was deleted so its status cannot be updated.", + port_id) + return + if port.port_binding.vif_type in (portbindings.VIF_TYPE_BINDING_FAILED, + portbindings.VIF_TYPE_UNBOUND): + # NOTE(kevinbenton): we hit here when a port is created without + # a host ID and the dhcp agent notifies that its wiring is done + LOG.debug("Port %s cannot update to ACTIVE because it " + "is not bound.", port_id) + return + self.update_port_status(context, port_id, const.PORT_STATUS_ACTIVE) + @property def supported_qos_rule_types(self): return self.mechanism_manager.supported_qos_rule_types @@ -1056,6 +1076,24 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, elif self._check_update_has_security_groups(port): raise psec.PortSecurityAndIPRequiredForSecurityGroups() + def _setup_dhcp_agent_provisioning_component(self, context, port): + subnet_ids = [f['subnet_id'] for f in port['fixed_ips']] + if (db.is_dhcp_active_on_any_subnet(context, subnet_ids) and + any(self.get_configuration_dict(a).get('notifies_port_ready') + for a in self.get_dhcp_agents_hosting_networks( + context, [port['network_id']]))): + # at least one of the agents will tell us when the dhcp config + # is ready so we setup a provisioning component to prevent the + # port from going ACTIVE until a dhcp_ready_on_port + # notification is received. + provisioning_blocks.add_provisioning_component( + context, port['id'], resources.PORT, + provisioning_blocks.DHCP_ENTITY) + else: + provisioning_blocks.remove_provisioning_component( + context, port['id'], resources.PORT, + provisioning_blocks.DHCP_ENTITY) + def _create_port_db(self, context, port): attrs = port[attributes.PORT] if not attrs.get('status'): @@ -1086,6 +1124,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self._process_port_create_extra_dhcp_opts(context, result, dhcp_opts) self.mechanism_manager.create_port_precommit(mech_context) + self._setup_dhcp_agent_provisioning_component(context, result) self._apply_dict_extend_functions('ports', result, port_db) return result, mech_context @@ -1271,6 +1310,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, bound_mech_contexts.append(dvr_mech_context) else: self.mechanism_manager.update_port_precommit(mech_context) + self._setup_dhcp_agent_provisioning_component( + context, updated_port) bound_mech_contexts.append(mech_context) # Notifications must be sent after the above transaction is complete @@ -1572,6 +1613,16 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, if updated: self.mechanism_manager.update_port_postcommit(mech_context) + kwargs = {'context': context, 'port': mech_context.current, + 'original_port': original_port} + if status == const.PORT_STATUS_ACTIVE: + # NOTE(kevinbenton): this kwarg was carried over from + # the RPC handler that used to call this. it's not clear + # who uses it so maybe it can be removed. added in commit + # 3f3874717c07e2b469ea6c6fd52bcb4da7b380c7 + kwargs['update_device_up'] = True + registry.notify(resources.PORT, events.AFTER_UPDATE, self, + **kwargs) if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE: db.delete_dvr_port_binding_if_stale(session, binding) @@ -1579,20 +1630,22 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, return port['id'] def port_bound_to_host(self, context, port_id, host): + if not host: + return port = db.get_port(context.session, port_id) if not port: LOG.debug("No Port match for: %s", port_id) - return False + return if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE: bindings = db.get_dvr_port_bindings(context.session, port_id) for b in bindings: if b.host == host: - return True + return port LOG.debug("No binding found for DVR port %s", port['id']) - return False + return else: port_host = db.get_port_binding_host(context.session, port_id) - return (port_host == host) + return port if (port_host == host) else None def get_ports_from_devices(self, context, devices): port_ids_to_devices = dict( diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 57992ff0681..f8c1a07ca73 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -14,7 +14,6 @@ # under the License. from neutron_lib import constants as n_const -from neutron_lib import exceptions from oslo_log import log import oslo_messaging from sqlalchemy.orm import exc @@ -22,14 +21,14 @@ from sqlalchemy.orm import exc from neutron._i18n import _LE, _LW from neutron.api.rpc.handlers import dvr_rpc from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc -from neutron.callbacks import events -from neutron.callbacks import registry from neutron.callbacks import resources from neutron.common import rpc as n_rpc from neutron.common import topics +from neutron.db import provisioning_blocks from neutron.extensions import portbindings from neutron.extensions import portsecurity as psec from neutron import manager +from neutron.plugins.ml2 import db as ml2_db from neutron.plugins.ml2 import driver_api as api from neutron.plugins.ml2.drivers import type_tunnel from neutron.services.qos import qos_consts @@ -205,31 +204,30 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): {'device': device, 'agent_id': agent_id}) plugin = manager.NeutronManager.get_plugin() port_id = plugin._device_to_port_id(rpc_context, device) - if (host and not plugin.port_bound_to_host(rpc_context, - port_id, host)): + port = plugin.port_bound_to_host(rpc_context, port_id, host) + if host and not port: LOG.debug("Device %(device)s not bound to the" " agent host %(host)s", {'device': device, 'host': host}) return - - port_id = plugin.update_port_status(rpc_context, port_id, - n_const.PORT_STATUS_ACTIVE, - host) - try: - # NOTE(armax): it's best to remove all objects from the - # session, before we try to retrieve the new port object - rpc_context.session.expunge_all() - port = plugin._get_port(rpc_context, port_id) - except exceptions.PortNotFound: - LOG.debug('Port %s not found during update', port_id) + if port and port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE: + # NOTE(kevinbenton): we have to special case DVR ports because of + # the special multi-binding status update logic they have that + # depends on the host + plugin.update_port_status(rpc_context, port_id, + n_const.PORT_STATUS_ACTIVE, host) else: - kwargs = { - 'context': rpc_context, - 'port': port, - 'update_device_up': True - } - registry.notify( - resources.PORT, events.AFTER_UPDATE, plugin, **kwargs) + # _device_to_port_id may have returned a truncated UUID if the + # agent did not provide a full one (e.g. Linux Bridge case). We + # need to look up the full one before calling provisioning_complete + if not port: + port = ml2_db.get_port(rpc_context.session, port_id) + if not port: + # port doesn't exist, no need to add a provisioning block + return + provisioning_blocks.provisioning_complete( + rpc_context, port['id'], resources.PORT, + provisioning_blocks.L2_AGENT_ENTITY) def update_device_list(self, rpc_context, **kwargs): devices_up = [] diff --git a/neutron/tests/unit/agent/dhcp/test_agent.py b/neutron/tests/unit/agent/dhcp/test_agent.py index ef83ac13507..65b7e5db766 100644 --- a/neutron/tests/unit/agent/dhcp/test_agent.py +++ b/neutron/tests/unit/agent/dhcp/test_agent.py @@ -245,6 +245,9 @@ class TestDhcpAgent(base.BaseTestCase): state_rpc_str = 'neutron.agent.rpc.PluginReportStateAPI' # sync_state is needed for this test cfg.CONF.set_override('report_interval', 1, 'AGENT') + mock_start_ready = mock.patch.object( + dhcp_agent.DhcpAgentWithStateReport, 'start_ready_ports_loop', + autospec=True).start() with mock.patch.object(dhcp_agent.DhcpAgentWithStateReport, 'sync_state', autospec=True) as mock_sync_state: @@ -267,6 +270,7 @@ class TestDhcpAgent(base.BaseTestCase): agent_mgr.after_start() mock_sync_state.assert_called_once_with(agent_mgr) mock_periodic_resync.assert_called_once_with(agent_mgr) + mock_start_ready.assert_called_once_with(agent_mgr) state_rpc.assert_has_calls( [mock.call(mock.ANY), mock.call().report_state(mock.ANY, mock.ANY, @@ -291,11 +295,13 @@ class TestDhcpAgent(base.BaseTestCase): dhcp = dhcp_agent.DhcpAgent(HOSTNAME) attrs_to_mock = dict( [(a, mock.DEFAULT) for a in - ['sync_state', 'periodic_resync']]) + ['sync_state', 'periodic_resync', + 'start_ready_ports_loop']]) with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks: dhcp.run() mocks['sync_state'].assert_called_once_with() mocks['periodic_resync'].assert_called_once_with() + mocks['start_ready_ports_loop'].assert_called_once_with() def test_call_driver(self): network = mock.Mock() @@ -364,12 +370,14 @@ class TestDhcpAgent(base.BaseTestCase): with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks: mocks['cache'].get_network_ids.return_value = known_net_ids + mocks['cache'].get_port_ids.return_value = range(4) dhcp.sync_state() diff = set(known_net_ids) - set(active_net_ids) exp_disable = [mock.call(net_id) for net_id in diff] mocks['cache'].assert_has_calls([mock.call.get_network_ids()]) mocks['disable_dhcp_helper'].assert_has_calls(exp_disable) + self.assertEqual(set(range(4)), dhcp.dhcp_ready_ports) def test_sync_state_initial(self): self._test_sync_state_helper([], ['a']) @@ -424,6 +432,55 @@ class TestDhcpAgent(base.BaseTestCase): dhcp.periodic_resync() spawn.assert_called_once_with(dhcp._periodic_resync_helper) + def test_start_ready_ports_loop(self): + dhcp = dhcp_agent.DhcpAgent(HOSTNAME) + with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn: + dhcp.start_ready_ports_loop() + spawn.assert_called_once_with(dhcp._dhcp_ready_ports_loop) + + def test__dhcp_ready_ports_doesnt_log_exception_on_timeout(self): + dhcp = dhcp_agent.DhcpAgent(HOSTNAME) + dhcp.dhcp_ready_ports = set(range(4)) + + with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports', + side_effect=oslo_messaging.MessagingTimeout): + # exit after 2 iterations + with mock.patch.object(dhcp_agent.eventlet, 'sleep', + side_effect=[0, 0, RuntimeError]): + with mock.patch.object(dhcp_agent.LOG, 'exception') as lex: + with testtools.ExpectedException(RuntimeError): + dhcp._dhcp_ready_ports_loop() + self.assertFalse(lex.called) + + def test__dhcp_ready_ports_disables_on_incompatible_server(self): + dhcp = dhcp_agent.DhcpAgent(HOSTNAME) + dhcp.agent_state = dict(configurations=dict(notifies_port_ready=True)) + dhcp.dhcp_ready_ports = set(range(4)) + + side_effect = oslo_messaging.RemoteError(exc_type='NoSuchMethod') + with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports', + side_effect=side_effect): + with mock.patch.object(dhcp_agent.eventlet, 'sleep', + side_effect=[None, RuntimeError]) as sleep: + with testtools.ExpectedException(RuntimeError): + dhcp._dhcp_ready_ports_loop() + # should have slept for 5 minutes + sleep.assert_called_with(300) + + def test__dhcp_ready_ports_loop(self): + dhcp = dhcp_agent.DhcpAgent(HOSTNAME) + dhcp.dhcp_ready_ports = set(range(4)) + + with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports', + side_effect=[RuntimeError, 0]) as ready: + # exit after 2 iterations + with mock.patch.object(dhcp_agent.eventlet, 'sleep', + side_effect=[0, 0, RuntimeError]): + with testtools.ExpectedException(RuntimeError): + dhcp._dhcp_ready_ports_loop() + # should have been called with all ports again after the failure + ready.assert_has_calls([mock.call(set(range(4)))] * 2) + def test_report_state_revival_logic(self): dhcp = dhcp_agent.DhcpAgentWithStateReport(HOSTNAME) with mock.patch.object(dhcp.state_rpc, @@ -1137,6 +1194,18 @@ class TestNetworkCache(base.BaseTestCase): self.assertEqual(nc.get_network_by_port_id(fake_port1.id), fake_network) + def test_get_port_ids(self): + fake_net = dhcp.NetModel( + dict(id='12345678-1234-5678-1234567890ab', + tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa', + subnets=[fake_subnet1], + ports=[fake_port1])) + nc = dhcp_agent.NetworkCache() + nc.put(fake_net) + nc.put_port(fake_port2) + self.assertEqual(set([fake_port1['id'], fake_port2['id']]), + set(nc.get_port_ids())) + def test_put_port(self): fake_net = dhcp.NetModel( dict(id='12345678-1234-5678-1234567890ab', diff --git a/neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py b/neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py index 17ef1d8c130..f8f406cb3c1 100644 --- a/neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py +++ b/neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py @@ -19,9 +19,11 @@ from neutron_lib import exceptions as n_exc from oslo_db import exception as db_exc from neutron.api.rpc.handlers import dhcp_rpc +from neutron.callbacks import resources from neutron.common import constants as n_const from neutron.common import exceptions from neutron.common import utils +from neutron.db import provisioning_blocks from neutron.extensions import portbindings from neutron.tests import base @@ -251,3 +253,14 @@ class TestDhcpRpcCallback(base.BaseTestCase): self.plugin.assert_has_calls([ mock.call.delete_ports_by_device_id(mock.ANY, 'devid', 'netid')]) + + def test_dhcp_ready_on_ports(self): + context = mock.Mock() + port_ids = range(10) + with mock.patch.object(provisioning_blocks, + 'provisioning_complete') as pc: + self.callbacks.dhcp_ready_on_ports(context, port_ids) + calls = [mock.call(context, port_id, resources.PORT, + provisioning_blocks.DHCP_ENTITY) + for port_id in port_ids] + pc.assert_has_calls(calls) diff --git a/neutron/tests/unit/db/test_provisioning_blocks.py b/neutron/tests/unit/db/test_provisioning_blocks.py new file mode 100644 index 00000000000..333dd1d9973 --- /dev/null +++ b/neutron/tests/unit/db/test_provisioning_blocks.py @@ -0,0 +1,130 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import testtools + +from neutron.callbacks import registry +from neutron.callbacks import resources +from neutron import context as n_ctx +from neutron.db import models_v2 +from neutron.db import provisioning_blocks as pb +from neutron.tests.unit import testlib_api + + +class TestStatusBarriers(testlib_api.SqlTestCase): + + def setUp(self): + super(TestStatusBarriers, self).setUp() + self.ctx = n_ctx.get_admin_context() + self.provisioned = mock.Mock() + self.port = self._make_port() + registry.subscribe(self.provisioned, resources.PORT, + pb.PROVISIONING_COMPLETE) + + def _make_net(self): + with self.ctx.session.begin(): + net = models_v2.Network(name='net_net', status='ACTIVE', + tenant_id='1', admin_state_up=True) + self.ctx.session.add(net) + return net + + def _make_port(self): + net = self._make_net() + with self.ctx.session.begin(): + port = models_v2.Port(networks=net, mac_address='1', tenant_id='1', + admin_state_up=True, status='DOWN', + device_id='2', device_owner='3') + self.ctx.session.add(port) + return port + + def test_no_callback_on_missing_object(self): + pb.provisioning_complete(self.ctx, 'someid', resources.PORT, 'entity') + self.assertFalse(self.provisioned.called) + + def test_provisioned_with_no_components(self): + pb.provisioning_complete(self.ctx, self.port.id, resources.PORT, + 'entity') + self.assertTrue(self.provisioned.called) + + def test_provisioned_after_component_finishes(self): + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'entity') + pb.provisioning_complete(self.ctx, self.port.id, resources.PORT, + 'entity') + self.assertTrue(self.provisioned.called) + + def test_not_provisioned_until_final_component_complete(self): + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'entity1') + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'entity2') + pb.provisioning_complete(self.ctx, self.port.id, resources.PORT, + 'entity1') + self.assertFalse(self.provisioned.called) + pb.provisioning_complete(self.ctx, self.port.id, resources.PORT, + 'entity2') + self.assertTrue(self.provisioned.called) + + def test_provisioning_of_correct_item(self): + port2 = self._make_port() + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'entity1') + pb.provisioning_complete(self.ctx, port2.id, + resources.PORT, 'entity1') + self.provisioned.assert_called_once_with( + resources.PORT, pb.PROVISIONING_COMPLETE, mock.ANY, + context=self.ctx, object_id=port2.id) + + def test_not_provisioned_when_wrong_component_reports(self): + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'entity1') + pb.provisioning_complete(self.ctx, self.port.id, + resources.PORT, 'entity2') + self.assertFalse(self.provisioned.called) + + def test_remove_provisioning_component(self): + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'e1') + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'e2') + self.assertTrue(pb.remove_provisioning_component( + self.ctx, self.port.id, resources.PORT, 'e1')) + self.assertFalse(self.provisioned.called) + pb.provisioning_complete(self.ctx, self.port.id, + resources.PORT, 'other') + self.assertFalse(self.provisioned.called) + pb.provisioning_complete(self.ctx, self.port.id, + resources.PORT, 'e2') + self.assertTrue(self.provisioned.called) + + def test_adding_component_idempotent(self): + for i in range(5): + pb.add_provisioning_component(self.ctx, self.port.id, + resources.PORT, 'entity1') + pb.provisioning_complete(self.ctx, self.port.id, + resources.PORT, 'entity1') + self.assertTrue(self.provisioned.called) + + def test_adding_component_for_new_resource_type(self): + provisioned = mock.Mock() + registry.subscribe(provisioned, 'NETWORK', pb.PROVISIONING_COMPLETE) + net = self._make_net() + # expect failed because the model was not registered for the type + with testtools.ExpectedException(RuntimeError): + pb.add_provisioning_component(self.ctx, net.id, 'NETWORK', 'ent') + pb.add_model_for_resource('NETWORK', models_v2.Network) + pb.add_provisioning_component(self.ctx, net.id, 'NETWORK', 'ent') + pb.provisioning_complete(self.ctx, net.id, 'NETWORK', 'ent') + self.assertTrue(provisioned.called) diff --git a/neutron/tests/unit/plugins/ml2/test_plugin.py b/neutron/tests/unit/plugins/ml2/test_plugin.py index 6ed3678f660..84d2964eb2e 100644 --- a/neutron/tests/unit/plugins/ml2/test_plugin.py +++ b/neutron/tests/unit/plugins/ml2/test_plugin.py @@ -39,6 +39,7 @@ from neutron.db import api as db_api from neutron.db import db_base_plugin_v2 as base_plugin from neutron.db import l3_db from neutron.db import models_v2 +from neutron.db import provisioning_blocks from neutron.extensions import availability_zone as az_ext from neutron.extensions import external_net from neutron.extensions import multiprovidernet as mpnet @@ -556,6 +557,45 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase): plugin.update_port_status(ctx, short_id, 'UP') mock_gbl.assert_called_once_with(mock.ANY, port_id, mock.ANY) + def _add_fake_dhcp_agent(self): + agent = mock.Mock(configurations='{"notifies_port_ready": true}') + plugin = manager.NeutronManager.get_plugin() + self.get_dhcp_mock = mock.patch.object( + plugin, 'get_dhcp_agents_hosting_networks', + return_value=[agent]).start() + + def test_dhcp_provisioning_blocks_inserted_on_create_with_agents(self): + self._add_fake_dhcp_agent() + with mock.patch.object(provisioning_blocks, + 'add_provisioning_component') as ap: + with self.port(): + self.assertTrue(ap.called) + + def test_dhcp_provisioning_blocks_skipped_on_create_with_no_dhcp(self): + self._add_fake_dhcp_agent() + with self.subnet(enable_dhcp=False) as subnet: + with mock.patch.object(provisioning_blocks, + 'add_provisioning_component') as ap: + with self.port(subnet=subnet): + self.assertFalse(ap.called) + + def test_dhcp_provisioning_blocks_inserted_on_update(self): + ctx = context.get_admin_context() + plugin = manager.NeutronManager.get_plugin() + self._add_fake_dhcp_agent() + with self.port() as port: + with mock.patch.object(provisioning_blocks, + 'add_provisioning_component') as ap: + port['port']['binding:host_id'] = 'newhost' + plugin.update_port(ctx, port['port']['id'], port) + self.assertTrue(ap.called) + + def test_dhcp_provisioning_blocks_removed_without_dhcp_agents(self): + with mock.patch.object(provisioning_blocks, + 'remove_provisioning_component') as cp: + with self.port(): + self.assertTrue(cp.called) + def test_update_port_fixed_ip_changed(self): ctx = context.get_admin_context() plugin = manager.NeutronManager.get_plugin() diff --git a/neutron/tests/unit/plugins/ml2/test_rpc.py b/neutron/tests/unit/plugins/ml2/test_rpc.py index 8dc7f9c700f..79e8301d947 100644 --- a/neutron/tests/unit/plugins/ml2/test_rpc.py +++ b/neutron/tests/unit/plugins/ml2/test_rpc.py @@ -21,14 +21,15 @@ import collections import mock from neutron_lib import constants -from neutron_lib import exceptions from oslo_config import cfg from oslo_context import context as oslo_context import oslo_messaging from sqlalchemy.orm import exc from neutron.agent import rpc as agent_rpc +from neutron.callbacks import resources from neutron.common import topics +from neutron.db import provisioning_blocks from neutron.plugins.ml2.drivers import type_tunnel from neutron.plugins.ml2 import managers from neutron.plugins.ml2 import rpc as plugin_rpc @@ -51,29 +52,27 @@ class RpcCallbacksTestCase(base.BaseTestCase): plugin_rpc.manager, 'NeutronManager').start() self.plugin = self.manager.get_plugin() - def _test_update_device_up(self): + def _test_update_device_up(self, host=None): kwargs = { 'agent_id': 'foo_agent', - 'device': 'foo_device' + 'device': 'foo_device', + 'host': host } with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin' '._device_to_port_id'): - with mock.patch('neutron.callbacks.registry.notify') as notify: + with mock.patch('neutron.db.provisioning_blocks.' + 'provisioning_complete') as pc: self.callbacks.update_device_up(mock.Mock(), **kwargs) - return notify + return pc def test_update_device_up_notify(self): notify = self._test_update_device_up() - kwargs = { - 'context': mock.ANY, 'port': mock.ANY, 'update_device_up': True - } - notify.assert_called_once_with( - 'port', 'after_update', self.plugin, **kwargs) + notify.assert_called_once_with(mock.ANY, mock.ANY, resources.PORT, + provisioning_blocks.L2_AGENT_ENTITY) def test_update_device_up_notify_not_sent_with_port_not_found(self): - self.plugin._get_port.side_effect = ( - exceptions.PortNotFound(port_id='foo_port_id')) - notify = self._test_update_device_up() + self.plugin.port_bound_to_host.return_value = False + notify = self._test_update_device_up('host') self.assertFalse(notify.call_count) def test_get_device_details_without_port_context(self): @@ -93,7 +92,7 @@ class RpcCallbacksTestCase(base.BaseTestCase): def test_get_device_details_port_status_equal_new_status(self): port = collections.defaultdict(lambda: 'fake') self.plugin.get_bound_port_context().current = port - self.plugin.port_bound_to_host = mock.MagicMock(return_value=True) + self.plugin.port_bound_to_host = port for admin_state_up in (True, False): new_status = (constants.PORT_STATUS_BUILD if admin_state_up else constants.PORT_STATUS_DOWN)