diff --git a/doc/source/devref/index.rst b/doc/source/devref/index.rst index 457c6efec62..8996a70bef2 100644 --- a/doc/source/devref/index.rst +++ b/doc/source/devref/index.rst @@ -76,6 +76,7 @@ Neutron Internals openvswitch_firewall network_ip_availability tag + provisioning_blocks Testing ------- diff --git a/doc/source/devref/provisioning_blocks.rst b/doc/source/devref/provisioning_blocks.rst new file mode 100644 index 00000000000..e5f764c05d1 --- /dev/null +++ b/doc/source/devref/provisioning_blocks.rst @@ -0,0 +1,159 @@ +.. + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + + Convention for heading levels in Neutron devref: + ======= Heading 0 (reserved for the title in a document) + ------- Heading 1 + ~~~~~~~ Heading 2 + +++++++ Heading 3 + ''''''' Heading 4 + (Avoid deeper levels because they do not render well.) + + +Composite Object Status via Provisioning Blocks +=============================================== + +We use the STATUS field on objects to indicate when a resource is ready +by setting it to ACTIVE so external systems know when it's safe to use +that resource. Knowing when to set the status to ACTIVE is simple when +there is only one entity responsible for provisioning a given object. +When that entity has finishing provisioning, we just update the STATUS +directly to active. However, there are resources in Neutron that require +provisioning by multiple asynchronous entities before they are ready to +be used so managing the transition to the ACTIVE status becomes more +complex. To handle these cases, Neutron has the provisioning_blocks +module to track the entities that are still provisioning a resource. + +The main example of this is with ML2, the L2 agents and the DHCP agents. +When a port is created and bound to a host, it's placed in the DOWN +status. The L2 agent now has to setup flows, security group rules, etc +for the port and the DHCP agent has to setup a DHCP reservation for +the port's IP and MAC. Before the transition to ACTIVE, both agents +must complete their work or the port user (e.g. Nova) may attempt to +use the port and not have connectivity. To solve this, the +provisioning_blocks module is used to track the provisioning state +of each agent and the status is only updated when both complete. + + +High Level View +--------------- + +To make use of the provisioning_blocks module, provisioning components +should be added whenever there is work to be done by another entity +before an object's status can transition to ACTIVE. This is +accomplished by calling the add_provisioning_component method for +each entity. Then as each entity finishes provisioning the object, +the provisioning_complete must be called to lift the provisioning +block. + +When the last provisioning block is removed, the provisioning_blocks +module will trigger a callback notification containing the object ID +for the object's resource type with the event PROVISIONING_COMPLETE. +A subscriber to this event can now update the status of this object +to ACTIVE or perform any other necessary actions. + +A normal state transition will look something like the following: + +1. Request comes in to create an object +2. Logic on the Neutron server determines which entities are required + to provision the object and adds a provisioning component for each + entity for that object. +3. A notification is emitted to the entities so they start their work. +4. Object is returned to the API caller in the DOWN (or BUILD) state. +5. Each entity tells the server when it has finished provisioning the + object. The server calls provisioning_complete for each entity that + finishes. +6. When provisioning_complete is called on the last remaining entity, + the provisioning_blocks module will emit an event indicating that + provisioning has completed for that object. +7. A subscriber to this event on the server will then update the status + of the object to ACTIVE to indicate that it is fully provisioned. + +For a more concrete example, see the section below. + + +ML2, L2 agents, and DHCP agents +------------------------------- + +ML2 makes use of the provisioning_blocks module to prevent the status +of ports from being transitioned to ACTIVE until both the L2 agent and +the DHCP agent have finished wiring a port. + +When a port is created or updated, the following happens to register +the DHCP agent's provisioning blocks: + +1. The subnet_ids are extracted from the fixed_ips field of the port + and then ML2 checks to see if DHCP is enabled on any of the subnets. +2. The configuration for the DHCP agents hosting the network are looked + up to ensure that at least one of them is new enough to report back + that it has finished setting up the port reservation. +3. If either of the preconditions above fail, a provisioning block for + the DHCP agent is not added and any existing DHCP agent blocks for + that port are cleared to ensure the port isn't blocked waiting for an + event that will never happen. +4. If the preconditions pass, a provisioning block is added for the port + under the 'DHCP' entity. + +When a port is created or updated, the following happens to register the +L2 agent's provisioning blocks: + +1. If the port is not bound, nothing happens because we don't know yet + if an L2 agent is involved so we have to wait until a port update that + binds it. +2. Once the port is bound, the agent based mechanism drivers will check + if they have an agent on the bound host and if the VNIC type belongs + to the mechanism driver, a provisioning block is added for the port + under the 'L2 Agent' entity. + + +Once the DHCP agent has finished setting up the reservation, it calls +dhcp_ready_on_ports via the RPC API with the port ID. The DHCP RPC +handler receives this and calls 'provisioning_complete' in the +provisioning module with the port ID and the 'DHCP' entity to remove +the provisioning block. + +Once the L2 agent has finished setting up the reservation, it calls +the normal update_device_list (or update_device_up) via the RPC API. +The RPC callbacks handler calls 'provisioning_complete' with the +port ID and the 'L2 Agent' entity to remove the provisioning block. + +On the 'provisioning_complete' call that removes the last record, +the provisioning_blocks module emits a callback PROVISIONING_COMPLETE +event with the port ID. A function subscribed to this in ML2 then calls +update_port_status to set the port to ACTIVE. + +At this point the normal notification is emitted to Nova allowing the +VM to be unpaused. + +In the event that the DHCP or L2 agent is down, the port will not +transition to the ACTIVE status (as is the case now if the L2 agent +is down). Agents must account for this by telling the server that +wiring has been completed after configuring everything during +startup. This ensures that ports created on offline agents (or agents +that crash and restart) eventually become active. + +To account for server instability, the notifications about port wiring +be complete must use RPC calls so the agent gets a positive +acknowledgement from the server and it must keep retrying until either +the port is deleted or it is successful. + +If an ML2 driver immediately places a bound port in the ACTIVE state +(e.g. after calling a backend in update_port_postcommit), this patch +will not have any impact on that process. + + +References +---------- + +.. [#] Provisioning Blocks Module: http://git.openstack.org/cgit/openstack/neutron/tree/neutron/db/provisioning_blocks.py diff --git a/neutron/agent/dhcp/agent.py b/neutron/agent/dhcp/agent.py index b2e436b4369..1e3505a46fd 100644 --- a/neutron/agent/dhcp/agent.py +++ b/neutron/agent/dhcp/agent.py @@ -54,6 +54,7 @@ class DhcpAgent(manager.Manager): def __init__(self, host=None, conf=None): super(DhcpAgent, self).__init__(host=host) self.needs_resync_reasons = collections.defaultdict(list) + self.dhcp_ready_ports = set() self.conf = conf or cfg.CONF self.cache = NetworkCache() self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver) @@ -97,6 +98,7 @@ class DhcpAgent(manager.Manager): """Activate the DHCP agent.""" self.sync_state() self.periodic_resync() + self.start_ready_ports_loop() def call_driver(self, action, network, **action_kwargs): """Invoke an action on a DHCP driver instance.""" @@ -169,6 +171,9 @@ class DhcpAgent(manager.Manager): network.id in only_nets): # specific network to sync pool.spawn(self.safe_configure_dhcp_for_network, network) pool.waitall() + # we notify all ports in case some were created while the agent + # was down + self.dhcp_ready_ports |= set(self.cache.get_port_ids()) LOG.info(_LI('Synchronizing state complete')) except Exception as e: @@ -179,6 +184,37 @@ class DhcpAgent(manager.Manager): self.schedule_resync(e) LOG.exception(_LE('Unable to sync network state.')) + def _dhcp_ready_ports_loop(self): + """Notifies the server of any ports that had reservations setup.""" + while True: + # this is just watching a set so we can do it really frequently + eventlet.sleep(0.1) + if self.dhcp_ready_ports: + ports_to_send = self.dhcp_ready_ports + self.dhcp_ready_ports = set() + try: + self.plugin_rpc.dhcp_ready_on_ports(ports_to_send) + continue + except oslo_messaging.MessagingTimeout: + LOG.error(_LE("Timeout notifying server of ports ready. " + "Retrying...")) + except Exception as e: + if (isinstance(e, oslo_messaging.RemoteError) + and e.exc_type == 'NoSuchMethod'): + LOG.info(_LI("Server does not support port ready " + "notifications. Waiting for 5 minutes " + "before retrying.")) + eventlet.sleep(300) + continue + LOG.exception(_LE("Failure notifying DHCP server of " + "ready DHCP ports. Will retry on next " + "iteration.")) + self.dhcp_ready_ports |= ports_to_send + + def start_ready_ports_loop(self): + """Spawn a thread to push changed ports to server.""" + eventlet.spawn(self._dhcp_ready_ports_loop) + @utils.exception_logger() def _periodic_resync_helper(self): """Resync the dhcp state at the configured interval.""" @@ -348,6 +384,7 @@ class DhcpAgent(manager.Manager): driver_action = 'restart' self.cache.put_port(updated_port) self.call_driver(driver_action, network) + self.dhcp_ready_ports.add(updated_port.id) def _is_port_on_this_agent(self, port): thishost = utils.get_dhcp_agent_device_id( @@ -421,6 +458,7 @@ class DhcpPluginApi(object): 1.0 - Initial version. 1.1 - Added get_active_networks_info, create_dhcp_port, and update_dhcp_port methods. + 1.5 - Added dhcp_ready_on_ports """ @@ -471,6 +509,12 @@ class DhcpPluginApi(object): network_id=network_id, device_id=device_id, host=self.host) + def dhcp_ready_on_ports(self, port_ids): + """Notify the server that DHCP is configured for the port.""" + cctxt = self.client.prepare(version='1.5') + return cctxt.call(self.context, 'dhcp_ready_on_ports', + port_ids=port_ids) + class NetworkCache(object): """Agent cache of the current network state.""" @@ -479,6 +523,9 @@ class NetworkCache(object): self.subnet_lookup = {} self.port_lookup = {} + def get_port_ids(self): + return self.port_lookup.keys() + def get_network_ids(self): return self.cache.keys() @@ -563,6 +610,7 @@ class DhcpAgentWithStateReport(DhcpAgent): 'availability_zone': self.conf.AGENT.availability_zone, 'topic': topics.DHCP_AGENT, 'configurations': { + 'notifies_port_ready': True, 'dhcp_driver': self.conf.dhcp_driver, 'dhcp_lease_duration': self.conf.dhcp_lease_duration, 'log_agent_heartbeats': self.conf.AGENT.log_agent_heartbeats}, diff --git a/neutron/api/rpc/handlers/dhcp_rpc.py b/neutron/api/rpc/handlers/dhcp_rpc.py index 8722187452e..cffe7eae8ea 100644 --- a/neutron/api/rpc/handlers/dhcp_rpc.py +++ b/neutron/api/rpc/handlers/dhcp_rpc.py @@ -26,10 +26,12 @@ import oslo_messaging from oslo_utils import excutils from neutron._i18n import _, _LW +from neutron.callbacks import resources from neutron.common import constants as n_const from neutron.common import exceptions as n_exc from neutron.common import utils from neutron.db import api as db_api +from neutron.db import provisioning_blocks from neutron.extensions import portbindings from neutron import manager from neutron.plugins.common import utils as p_utils @@ -64,9 +66,10 @@ class DhcpRpcCallback(object): # 1.4 - Removed update_lease_expiration. It's not used by reference # DHCP agent since Juno, so similar rationale for not bumping the # major version as above applies here too. + # 1.5 - Added dhcp_ready_on_ports. target = oslo_messaging.Target( namespace=n_const.RPC_NAMESPACE_DHCP_PLUGIN, - version='1.4') + version='1.5') def _get_active_networks(self, context, **kwargs): """Retrieve and return a list of the active networks.""" @@ -225,3 +228,9 @@ class DhcpRpcCallback(object): {'port': port, 'host': host}) return self._port_action(plugin, context, port, 'update_port') + + def dhcp_ready_on_ports(self, context, port_ids): + for port_id in port_ids: + provisioning_blocks.provisioning_complete( + context, port_id, resources.PORT, + provisioning_blocks.DHCP_ENTITY) diff --git a/neutron/db/migration/alembic_migrations/versions/EXPAND_HEAD b/neutron/db/migration/alembic_migrations/versions/EXPAND_HEAD index 95db9b9d117..cf26eb1f33e 100644 --- a/neutron/db/migration/alembic_migrations/versions/EXPAND_HEAD +++ b/neutron/db/migration/alembic_migrations/versions/EXPAND_HEAD @@ -1 +1 @@ -d3435b514502 +30107ab6a3ee diff --git a/neutron/db/migration/alembic_migrations/versions/newton/expand/30107ab6a3ee_provisioning_blocks.py b/neutron/db/migration/alembic_migrations/versions/newton/expand/30107ab6a3ee_provisioning_blocks.py new file mode 100644 index 00000000000..01d56f39f6c --- /dev/null +++ b/neutron/db/migration/alembic_migrations/versions/newton/expand/30107ab6a3ee_provisioning_blocks.py @@ -0,0 +1,39 @@ +# Copyright 2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""provisioning_blocks.py + +Revision ID: 30107ab6a3ee +Revises: d3435b514502 +Create Date: 2016-04-15 05:59:59.000001 + +""" + +# revision identifiers, used by Alembic. +revision = '30107ab6a3ee' +down_revision = 'd3435b514502' +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table( + 'provisioningblocks', + sa.Column('standard_attr_id', sa.BigInteger(), + sa.ForeignKey('standardattributes.id', ondelete='CASCADE'), + nullable=False, primary_key=True), + sa.Column('entity', sa.String(length=255), nullable=False, + primary_key=True), + ) diff --git a/neutron/db/migration/models/head.py b/neutron/db/migration/models/head.py index 1626e6833e4..6112dbf68ab 100644 --- a/neutron/db/migration/models/head.py +++ b/neutron/db/migration/models/head.py @@ -42,6 +42,7 @@ from neutron.db import model_base from neutron.db import models_v2 # noqa from neutron.db import portbindings_db # noqa from neutron.db import portsecurity_db # noqa +from neutron.db import provisioning_blocks # noqa from neutron.db.qos import models as qos_models # noqa from neutron.db.quota import models # noqa from neutron.db import rbac_db_models # noqa diff --git a/neutron/db/provisioning_blocks.py b/neutron/db/provisioning_blocks.py new file mode 100644 index 00000000000..d55870b7946 --- /dev/null +++ b/neutron/db/provisioning_blocks.py @@ -0,0 +1,168 @@ +# Copyright 2016 Mirantis, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from oslo_db import exception as db_exc +from oslo_log import log as logging +import sqlalchemy as sa + +from neutron._i18n import _LE +from neutron.callbacks import registry +from neutron.callbacks import resources +from neutron.db import api as db_api +from neutron.db import model_base +from neutron.db import models_v2 + +LOG = logging.getLogger(__name__) +PROVISIONING_COMPLETE = 'provisioning_complete' +# identifiers for the various entities that participate in provisioning +DHCP_ENTITY = 'DHCP' +L2_AGENT_ENTITY = 'L2' +_RESOURCE_TO_MODEL_MAP = {resources.PORT: models_v2.Port} + + +class ProvisioningBlock(model_base.BASEV2): + # the standard attr id of the thing we want to block + standard_attr_id = ( + sa.Column(sa.BigInteger().with_variant(sa.Integer(), 'sqlite'), + sa.ForeignKey(model_base.StandardAttribute.id, + ondelete="CASCADE"), + primary_key=True)) + # the entity that wants to block the status change (e.g. L2 Agent) + entity = sa.Column(sa.String(255), nullable=False, primary_key=True) + + +def add_model_for_resource(resource, model): + """Adds a mapping between a callback resource and a DB model.""" + _RESOURCE_TO_MODEL_MAP[resource] = model + + +def add_provisioning_component(context, object_id, object_type, entity): + """Adds a provisioning block by an entity to a given object. + + Adds a provisioning block to the DB for object_id with an identifier + of the entity that is doing the provisioning. While an object has these + provisioning blocks present, this module will not emit any callback events + indicating that provisioning has completed. Any logic that depends on + multiple disjoint components use these blocks and subscribe to the + PROVISIONING_COMPLETE event to know when all components have completed. + + :param context: neutron api request context + :param object_id: ID of object that has been provisioned + :param object_type: callback resource type of the object + :param entity: The entity that has provisioned the object + """ + log_dict = {'entity': entity, 'oid': object_id, 'otype': object_type} + # we get an object's ID, so we need to convert that into a standard attr id + standard_attr_id = _get_standard_attr_id(context, object_id, object_type) + if not standard_attr_id: + return + try: + with db_api.autonested_transaction(context.session): + record = ProvisioningBlock(standard_attr_id=standard_attr_id, + entity=entity) + context.session.add(record) + except db_exc.DBDuplicateEntry: + # an entry could be leftover from a previous transition that hasn't + # yet been provisioned. (e.g. multiple updates in a short period) + LOG.debug("Ignored duplicate provisioning block setup for %(otype)s " + "%(oid)s by entity %(entity)s.", log_dict) + return + LOG.debug("Transition to ACTIVE for %(otype)s object %(oid)s " + "will not be triggered until provisioned by entity %(entity)s.", + log_dict) + + +def remove_provisioning_component(context, object_id, object_type, entity, + standard_attr_id=None): + """Removes a provisioning block for an object with triggering a callback. + + Removes a provisioning block without triggering a callback. A user of this + module should call this when a block is no longer correct. If the block has + been satisfied, the 'provisioning_complete' method should be called. + + :param context: neutron api request context + :param object_id: ID of object that has been provisioned + :param object_type: callback resource type of the object + :param entity: The entity that has provisioned the object + :param standard_attr_id: Optional ID to pass to the function to avoid the + extra DB lookup to translate the object_id into + the standard_attr_id. + :return: boolean indicating whether or not a record was deleted + """ + with context.session.begin(subtransactions=True): + standard_attr_id = standard_attr_id or _get_standard_attr_id( + context, object_id, object_type) + if not standard_attr_id: + return False + record = context.session.query(ProvisioningBlock).filter_by( + standard_attr_id=standard_attr_id, entity=entity).first() + if record: + context.session.delete(record) + return True + return False + + +def provisioning_complete(context, object_id, object_type, entity): + """Mark that the provisioning for object_id has been completed by entity. + + Marks that an entity has finished provisioning an object. If there are + no remaining provisioning components, a callback will be triggered + indicating that provisioning has been completed for the object. Subscribers + to this callback must be idempotent because it may be called multiple + times in high availability deployments. + + :param context: neutron api request context + :param object_id: ID of object that has been provisioned + :param object_type: callback resource type of the object + :param entity: The entity that has provisioned the object + """ + log_dict = {'oid': object_id, 'entity': entity, 'otype': object_type} + # this can't be called in a transaction to avoid REPEATABLE READ + # tricking us into thinking there are remaining provisioning components + if context.session.is_active: + raise RuntimeError(_LE("Must not be called in a transaction")) + standard_attr_id = _get_standard_attr_id(context, object_id, + object_type) + if not standard_attr_id: + return + if remove_provisioning_component(context, object_id, object_type, entity, + standard_attr_id): + LOG.debug("Provisioning for %(otype)s %(oid)s completed by entity " + "%(entity)s.", log_dict) + # now with that committed, check if any records are left. if None, emit + # an event that provisioning is complete. + records = context.session.query(ProvisioningBlock).filter_by( + standard_attr_id=standard_attr_id).count() + if not records: + LOG.debug("Provisioning complete for %(otype)s %(oid)s", log_dict) + registry.notify(object_type, PROVISIONING_COMPLETE, + 'neutron.db.provisioning_blocks', + context=context, object_id=object_id) + + +def _get_standard_attr_id(context, object_id, object_type): + model = _RESOURCE_TO_MODEL_MAP.get(object_type) + if not model: + raise RuntimeError(_LE("Could not find model for %s. If you are " + "adding provisioning blocks for a new resource " + "you must call add_model_for_resource during " + "initialization for your type.") % object_type) + obj = (context.session.query(model).enable_eagerloads(False). + filter_by(id=object_id).first()) + if not obj: + # concurrent delete + LOG.debug("Could not find standard attr ID for object %s.", object_id) + return + return obj.standard_attr_id diff --git a/neutron/plugins/ml2/db.py b/neutron/plugins/ml2/db.py index d3043e822c7..c6eb1b9fb97 100644 --- a/neutron/plugins/ml2/db.py +++ b/neutron/plugins/ml2/db.py @@ -283,3 +283,11 @@ def get_dvr_port_bindings(session, port_id): if not bindings: LOG.debug("No bindings for DVR port %s", port_id) return bindings + + +def is_dhcp_active_on_any_subnet(context, subnet_ids): + if not subnet_ids: + return False + return bool(context.session.query(models_v2.Subnet). + enable_eagerloads(False).filter_by(enable_dhcp=True). + filter(models_v2.Subnet.id.in_(subnet_ids)).count()) diff --git a/neutron/plugins/ml2/drivers/mech_agent.py b/neutron/plugins/ml2/drivers/mech_agent.py index 9658f673f93..af78bd7d7a9 100644 --- a/neutron/plugins/ml2/drivers/mech_agent.py +++ b/neutron/plugins/ml2/drivers/mech_agent.py @@ -19,6 +19,9 @@ from oslo_log import log import six from neutron._i18n import _LW +from neutron.callbacks import resources +from neutron.common import constants +from neutron.db import provisioning_blocks from neutron.extensions import portbindings from neutron.plugins.common import constants as p_constants from neutron.plugins.ml2 import driver_api as api @@ -53,6 +56,32 @@ class AgentMechanismDriverBase(api.MechanismDriver): def initialize(self): pass + def create_port_precommit(self, context): + self._insert_provisioning_block(context) + + def update_port_precommit(self, context): + if context.host == context.original_host: + return + self._insert_provisioning_block(context) + + def _insert_provisioning_block(self, context): + # we insert a status barrier to prevent the port from transitioning + # to active until the agent reports back that the wiring is done + port = context.current + if not context.host or port['status'] == constants.PORT_STATUS_ACTIVE: + # no point in putting in a block if the status is already ACTIVE + return + vnic_type = context.current.get(portbindings.VNIC_TYPE, + portbindings.VNIC_NORMAL) + if vnic_type not in self.supported_vnic_types: + # we check the VNIC type because there could be multiple agents + # on a single host with different VNIC types + return + if context.host_agents(self.agent_type): + provisioning_blocks.add_provisioning_component( + context._plugin_context, port['id'], resources.PORT, + provisioning_blocks.L2_AGENT_ENTITY) + def bind_port(self, context): LOG.debug("Attempting to bind port %(port)s on " "network %(network)s", diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index c361475c5ab..b40eaa22db0 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -59,6 +59,7 @@ from neutron.db import external_net_db from neutron.db import extradhcpopt_db from neutron.db import models_v2 from neutron.db import netmtu_db +from neutron.db import provisioning_blocks from neutron.db.quota import driver # noqa from neutron.db import securitygroups_db from neutron.db import securitygroups_rpc_base as sg_db_rpc @@ -159,6 +160,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self.type_manager.initialize() self.extension_manager.initialize() self.mechanism_manager.initialize() + registry.subscribe(self._port_provisioned, resources.PORT, + provisioning_blocks.PROVISIONING_COMPLETE) self._setup_dhcp() self._start_rpc_notifiers() self.add_agent_status_check(self.agent_health_check) @@ -195,6 +198,23 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, driver=extension_driver, service_plugin=service_plugin ) + def _port_provisioned(self, rtype, event, trigger, context, object_id, + **kwargs): + port_id = object_id + port = db.get_port(context.session, port_id) + if not port: + LOG.debug("Port %s was deleted so its status cannot be updated.", + port_id) + return + if port.port_binding.vif_type in (portbindings.VIF_TYPE_BINDING_FAILED, + portbindings.VIF_TYPE_UNBOUND): + # NOTE(kevinbenton): we hit here when a port is created without + # a host ID and the dhcp agent notifies that its wiring is done + LOG.debug("Port %s cannot update to ACTIVE because it " + "is not bound.", port_id) + return + self.update_port_status(context, port_id, const.PORT_STATUS_ACTIVE) + @property def supported_qos_rule_types(self): return self.mechanism_manager.supported_qos_rule_types @@ -1056,6 +1076,24 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, elif self._check_update_has_security_groups(port): raise psec.PortSecurityAndIPRequiredForSecurityGroups() + def _setup_dhcp_agent_provisioning_component(self, context, port): + subnet_ids = [f['subnet_id'] for f in port['fixed_ips']] + if (db.is_dhcp_active_on_any_subnet(context, subnet_ids) and + any(self.get_configuration_dict(a).get('notifies_port_ready') + for a in self.get_dhcp_agents_hosting_networks( + context, [port['network_id']]))): + # at least one of the agents will tell us when the dhcp config + # is ready so we setup a provisioning component to prevent the + # port from going ACTIVE until a dhcp_ready_on_port + # notification is received. + provisioning_blocks.add_provisioning_component( + context, port['id'], resources.PORT, + provisioning_blocks.DHCP_ENTITY) + else: + provisioning_blocks.remove_provisioning_component( + context, port['id'], resources.PORT, + provisioning_blocks.DHCP_ENTITY) + def _create_port_db(self, context, port): attrs = port[attributes.PORT] if not attrs.get('status'): @@ -1086,6 +1124,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self._process_port_create_extra_dhcp_opts(context, result, dhcp_opts) self.mechanism_manager.create_port_precommit(mech_context) + self._setup_dhcp_agent_provisioning_component(context, result) self._apply_dict_extend_functions('ports', result, port_db) return result, mech_context @@ -1271,6 +1310,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, bound_mech_contexts.append(dvr_mech_context) else: self.mechanism_manager.update_port_precommit(mech_context) + self._setup_dhcp_agent_provisioning_component( + context, updated_port) bound_mech_contexts.append(mech_context) # Notifications must be sent after the above transaction is complete @@ -1572,6 +1613,16 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, if updated: self.mechanism_manager.update_port_postcommit(mech_context) + kwargs = {'context': context, 'port': mech_context.current, + 'original_port': original_port} + if status == const.PORT_STATUS_ACTIVE: + # NOTE(kevinbenton): this kwarg was carried over from + # the RPC handler that used to call this. it's not clear + # who uses it so maybe it can be removed. added in commit + # 3f3874717c07e2b469ea6c6fd52bcb4da7b380c7 + kwargs['update_device_up'] = True + registry.notify(resources.PORT, events.AFTER_UPDATE, self, + **kwargs) if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE: db.delete_dvr_port_binding_if_stale(session, binding) @@ -1579,20 +1630,22 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, return port['id'] def port_bound_to_host(self, context, port_id, host): + if not host: + return port = db.get_port(context.session, port_id) if not port: LOG.debug("No Port match for: %s", port_id) - return False + return if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE: bindings = db.get_dvr_port_bindings(context.session, port_id) for b in bindings: if b.host == host: - return True + return port LOG.debug("No binding found for DVR port %s", port['id']) - return False + return else: port_host = db.get_port_binding_host(context.session, port_id) - return (port_host == host) + return port if (port_host == host) else None def get_ports_from_devices(self, context, devices): port_ids_to_devices = dict( diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 57992ff0681..f8c1a07ca73 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -14,7 +14,6 @@ # under the License. from neutron_lib import constants as n_const -from neutron_lib import exceptions from oslo_log import log import oslo_messaging from sqlalchemy.orm import exc @@ -22,14 +21,14 @@ from sqlalchemy.orm import exc from neutron._i18n import _LE, _LW from neutron.api.rpc.handlers import dvr_rpc from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc -from neutron.callbacks import events -from neutron.callbacks import registry from neutron.callbacks import resources from neutron.common import rpc as n_rpc from neutron.common import topics +from neutron.db import provisioning_blocks from neutron.extensions import portbindings from neutron.extensions import portsecurity as psec from neutron import manager +from neutron.plugins.ml2 import db as ml2_db from neutron.plugins.ml2 import driver_api as api from neutron.plugins.ml2.drivers import type_tunnel from neutron.services.qos import qos_consts @@ -205,31 +204,30 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): {'device': device, 'agent_id': agent_id}) plugin = manager.NeutronManager.get_plugin() port_id = plugin._device_to_port_id(rpc_context, device) - if (host and not plugin.port_bound_to_host(rpc_context, - port_id, host)): + port = plugin.port_bound_to_host(rpc_context, port_id, host) + if host and not port: LOG.debug("Device %(device)s not bound to the" " agent host %(host)s", {'device': device, 'host': host}) return - - port_id = plugin.update_port_status(rpc_context, port_id, - n_const.PORT_STATUS_ACTIVE, - host) - try: - # NOTE(armax): it's best to remove all objects from the - # session, before we try to retrieve the new port object - rpc_context.session.expunge_all() - port = plugin._get_port(rpc_context, port_id) - except exceptions.PortNotFound: - LOG.debug('Port %s not found during update', port_id) + if port and port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE: + # NOTE(kevinbenton): we have to special case DVR ports because of + # the special multi-binding status update logic they have that + # depends on the host + plugin.update_port_status(rpc_context, port_id, + n_const.PORT_STATUS_ACTIVE, host) else: - kwargs = { - 'context': rpc_context, - 'port': port, - 'update_device_up': True - } - registry.notify( - resources.PORT, events.AFTER_UPDATE, plugin, **kwargs) + # _device_to_port_id may have returned a truncated UUID if the + # agent did not provide a full one (e.g. Linux Bridge case). We + # need to look up the full one before calling provisioning_complete + if not port: + port = ml2_db.get_port(rpc_context.session, port_id) + if not port: + # port doesn't exist, no need to add a provisioning block + return + provisioning_blocks.provisioning_complete( + rpc_context, port['id'], resources.PORT, + provisioning_blocks.L2_AGENT_ENTITY) def update_device_list(self, rpc_context, **kwargs): devices_up = [] diff --git a/neutron/tests/unit/agent/dhcp/test_agent.py b/neutron/tests/unit/agent/dhcp/test_agent.py index c9ba5478cec..fde08bde6d7 100644 --- a/neutron/tests/unit/agent/dhcp/test_agent.py +++ b/neutron/tests/unit/agent/dhcp/test_agent.py @@ -245,6 +245,9 @@ class TestDhcpAgent(base.BaseTestCase): state_rpc_str = 'neutron.agent.rpc.PluginReportStateAPI' # sync_state is needed for this test cfg.CONF.set_override('report_interval', 1, 'AGENT') + mock_start_ready = mock.patch.object( + dhcp_agent.DhcpAgentWithStateReport, 'start_ready_ports_loop', + autospec=True).start() with mock.patch.object(dhcp_agent.DhcpAgentWithStateReport, 'sync_state', autospec=True) as mock_sync_state: @@ -267,6 +270,7 @@ class TestDhcpAgent(base.BaseTestCase): agent_mgr.after_start() mock_sync_state.assert_called_once_with(agent_mgr) mock_periodic_resync.assert_called_once_with(agent_mgr) + mock_start_ready.assert_called_once_with(agent_mgr) state_rpc.assert_has_calls( [mock.call(mock.ANY), mock.call().report_state(mock.ANY, mock.ANY, @@ -277,11 +281,13 @@ class TestDhcpAgent(base.BaseTestCase): dhcp = dhcp_agent.DhcpAgent(HOSTNAME) attrs_to_mock = dict( [(a, mock.DEFAULT) for a in - ['sync_state', 'periodic_resync']]) + ['sync_state', 'periodic_resync', + 'start_ready_ports_loop']]) with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks: dhcp.run() mocks['sync_state'].assert_called_once_with() mocks['periodic_resync'].assert_called_once_with() + mocks['start_ready_ports_loop'].assert_called_once_with() def test_call_driver(self): network = mock.Mock() @@ -350,12 +356,14 @@ class TestDhcpAgent(base.BaseTestCase): with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks: mocks['cache'].get_network_ids.return_value = known_net_ids + mocks['cache'].get_port_ids.return_value = range(4) dhcp.sync_state() diff = set(known_net_ids) - set(active_net_ids) exp_disable = [mock.call(net_id) for net_id in diff] mocks['cache'].assert_has_calls([mock.call.get_network_ids()]) mocks['disable_dhcp_helper'].assert_has_calls(exp_disable) + self.assertEqual(set(range(4)), dhcp.dhcp_ready_ports) def test_sync_state_initial(self): self._test_sync_state_helper([], ['a']) @@ -410,6 +418,55 @@ class TestDhcpAgent(base.BaseTestCase): dhcp.periodic_resync() spawn.assert_called_once_with(dhcp._periodic_resync_helper) + def test_start_ready_ports_loop(self): + dhcp = dhcp_agent.DhcpAgent(HOSTNAME) + with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn: + dhcp.start_ready_ports_loop() + spawn.assert_called_once_with(dhcp._dhcp_ready_ports_loop) + + def test__dhcp_ready_ports_doesnt_log_exception_on_timeout(self): + dhcp = dhcp_agent.DhcpAgent(HOSTNAME) + dhcp.dhcp_ready_ports = set(range(4)) + + with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports', + side_effect=oslo_messaging.MessagingTimeout): + # exit after 2 iterations + with mock.patch.object(dhcp_agent.eventlet, 'sleep', + side_effect=[0, 0, RuntimeError]): + with mock.patch.object(dhcp_agent.LOG, 'exception') as lex: + with testtools.ExpectedException(RuntimeError): + dhcp._dhcp_ready_ports_loop() + self.assertFalse(lex.called) + + def test__dhcp_ready_ports_disables_on_incompatible_server(self): + dhcp = dhcp_agent.DhcpAgent(HOSTNAME) + dhcp.agent_state = dict(configurations=dict(notifies_port_ready=True)) + dhcp.dhcp_ready_ports = set(range(4)) + + side_effect = oslo_messaging.RemoteError(exc_type='NoSuchMethod') + with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports', + side_effect=side_effect): + with mock.patch.object(dhcp_agent.eventlet, 'sleep', + side_effect=[None, RuntimeError]) as sleep: + with testtools.ExpectedException(RuntimeError): + dhcp._dhcp_ready_ports_loop() + # should have slept for 5 minutes + sleep.assert_called_with(300) + + def test__dhcp_ready_ports_loop(self): + dhcp = dhcp_agent.DhcpAgent(HOSTNAME) + dhcp.dhcp_ready_ports = set(range(4)) + + with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports', + side_effect=[RuntimeError, 0]) as ready: + # exit after 2 iterations + with mock.patch.object(dhcp_agent.eventlet, 'sleep', + side_effect=[0, 0, RuntimeError]): + with testtools.ExpectedException(RuntimeError): + dhcp._dhcp_ready_ports_loop() + # should have been called with all ports again after the failure + ready.assert_has_calls([mock.call(set(range(4)))] * 2) + def test_report_state_revival_logic(self): dhcp = dhcp_agent.DhcpAgentWithStateReport(HOSTNAME) with mock.patch.object(dhcp.state_rpc, @@ -1123,6 +1180,18 @@ class TestNetworkCache(base.BaseTestCase): self.assertEqual(nc.get_network_by_port_id(fake_port1.id), fake_network) + def test_get_port_ids(self): + fake_net = dhcp.NetModel( + dict(id='12345678-1234-5678-1234567890ab', + tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa', + subnets=[fake_subnet1], + ports=[fake_port1])) + nc = dhcp_agent.NetworkCache() + nc.put(fake_net) + nc.put_port(fake_port2) + self.assertEqual(set([fake_port1['id'], fake_port2['id']]), + set(nc.get_port_ids())) + def test_put_port(self): fake_net = dhcp.NetModel( dict(id='12345678-1234-5678-1234567890ab', diff --git a/neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py b/neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py index 17ef1d8c130..f8f406cb3c1 100644 --- a/neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py +++ b/neutron/tests/unit/api/rpc/handlers/test_dhcp_rpc.py @@ -19,9 +19,11 @@ from neutron_lib import exceptions as n_exc from oslo_db import exception as db_exc from neutron.api.rpc.handlers import dhcp_rpc +from neutron.callbacks import resources from neutron.common import constants as n_const from neutron.common import exceptions from neutron.common import utils +from neutron.db import provisioning_blocks from neutron.extensions import portbindings from neutron.tests import base @@ -251,3 +253,14 @@ class TestDhcpRpcCallback(base.BaseTestCase): self.plugin.assert_has_calls([ mock.call.delete_ports_by_device_id(mock.ANY, 'devid', 'netid')]) + + def test_dhcp_ready_on_ports(self): + context = mock.Mock() + port_ids = range(10) + with mock.patch.object(provisioning_blocks, + 'provisioning_complete') as pc: + self.callbacks.dhcp_ready_on_ports(context, port_ids) + calls = [mock.call(context, port_id, resources.PORT, + provisioning_blocks.DHCP_ENTITY) + for port_id in port_ids] + pc.assert_has_calls(calls) diff --git a/neutron/tests/unit/db/test_provisioning_blocks.py b/neutron/tests/unit/db/test_provisioning_blocks.py new file mode 100644 index 00000000000..333dd1d9973 --- /dev/null +++ b/neutron/tests/unit/db/test_provisioning_blocks.py @@ -0,0 +1,130 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import testtools + +from neutron.callbacks import registry +from neutron.callbacks import resources +from neutron import context as n_ctx +from neutron.db import models_v2 +from neutron.db import provisioning_blocks as pb +from neutron.tests.unit import testlib_api + + +class TestStatusBarriers(testlib_api.SqlTestCase): + + def setUp(self): + super(TestStatusBarriers, self).setUp() + self.ctx = n_ctx.get_admin_context() + self.provisioned = mock.Mock() + self.port = self._make_port() + registry.subscribe(self.provisioned, resources.PORT, + pb.PROVISIONING_COMPLETE) + + def _make_net(self): + with self.ctx.session.begin(): + net = models_v2.Network(name='net_net', status='ACTIVE', + tenant_id='1', admin_state_up=True) + self.ctx.session.add(net) + return net + + def _make_port(self): + net = self._make_net() + with self.ctx.session.begin(): + port = models_v2.Port(networks=net, mac_address='1', tenant_id='1', + admin_state_up=True, status='DOWN', + device_id='2', device_owner='3') + self.ctx.session.add(port) + return port + + def test_no_callback_on_missing_object(self): + pb.provisioning_complete(self.ctx, 'someid', resources.PORT, 'entity') + self.assertFalse(self.provisioned.called) + + def test_provisioned_with_no_components(self): + pb.provisioning_complete(self.ctx, self.port.id, resources.PORT, + 'entity') + self.assertTrue(self.provisioned.called) + + def test_provisioned_after_component_finishes(self): + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'entity') + pb.provisioning_complete(self.ctx, self.port.id, resources.PORT, + 'entity') + self.assertTrue(self.provisioned.called) + + def test_not_provisioned_until_final_component_complete(self): + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'entity1') + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'entity2') + pb.provisioning_complete(self.ctx, self.port.id, resources.PORT, + 'entity1') + self.assertFalse(self.provisioned.called) + pb.provisioning_complete(self.ctx, self.port.id, resources.PORT, + 'entity2') + self.assertTrue(self.provisioned.called) + + def test_provisioning_of_correct_item(self): + port2 = self._make_port() + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'entity1') + pb.provisioning_complete(self.ctx, port2.id, + resources.PORT, 'entity1') + self.provisioned.assert_called_once_with( + resources.PORT, pb.PROVISIONING_COMPLETE, mock.ANY, + context=self.ctx, object_id=port2.id) + + def test_not_provisioned_when_wrong_component_reports(self): + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'entity1') + pb.provisioning_complete(self.ctx, self.port.id, + resources.PORT, 'entity2') + self.assertFalse(self.provisioned.called) + + def test_remove_provisioning_component(self): + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'e1') + pb.add_provisioning_component(self.ctx, self.port.id, resources.PORT, + 'e2') + self.assertTrue(pb.remove_provisioning_component( + self.ctx, self.port.id, resources.PORT, 'e1')) + self.assertFalse(self.provisioned.called) + pb.provisioning_complete(self.ctx, self.port.id, + resources.PORT, 'other') + self.assertFalse(self.provisioned.called) + pb.provisioning_complete(self.ctx, self.port.id, + resources.PORT, 'e2') + self.assertTrue(self.provisioned.called) + + def test_adding_component_idempotent(self): + for i in range(5): + pb.add_provisioning_component(self.ctx, self.port.id, + resources.PORT, 'entity1') + pb.provisioning_complete(self.ctx, self.port.id, + resources.PORT, 'entity1') + self.assertTrue(self.provisioned.called) + + def test_adding_component_for_new_resource_type(self): + provisioned = mock.Mock() + registry.subscribe(provisioned, 'NETWORK', pb.PROVISIONING_COMPLETE) + net = self._make_net() + # expect failed because the model was not registered for the type + with testtools.ExpectedException(RuntimeError): + pb.add_provisioning_component(self.ctx, net.id, 'NETWORK', 'ent') + pb.add_model_for_resource('NETWORK', models_v2.Network) + pb.add_provisioning_component(self.ctx, net.id, 'NETWORK', 'ent') + pb.provisioning_complete(self.ctx, net.id, 'NETWORK', 'ent') + self.assertTrue(provisioned.called) diff --git a/neutron/tests/unit/plugins/ml2/test_plugin.py b/neutron/tests/unit/plugins/ml2/test_plugin.py index 6ed3678f660..84d2964eb2e 100644 --- a/neutron/tests/unit/plugins/ml2/test_plugin.py +++ b/neutron/tests/unit/plugins/ml2/test_plugin.py @@ -39,6 +39,7 @@ from neutron.db import api as db_api from neutron.db import db_base_plugin_v2 as base_plugin from neutron.db import l3_db from neutron.db import models_v2 +from neutron.db import provisioning_blocks from neutron.extensions import availability_zone as az_ext from neutron.extensions import external_net from neutron.extensions import multiprovidernet as mpnet @@ -556,6 +557,45 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase): plugin.update_port_status(ctx, short_id, 'UP') mock_gbl.assert_called_once_with(mock.ANY, port_id, mock.ANY) + def _add_fake_dhcp_agent(self): + agent = mock.Mock(configurations='{"notifies_port_ready": true}') + plugin = manager.NeutronManager.get_plugin() + self.get_dhcp_mock = mock.patch.object( + plugin, 'get_dhcp_agents_hosting_networks', + return_value=[agent]).start() + + def test_dhcp_provisioning_blocks_inserted_on_create_with_agents(self): + self._add_fake_dhcp_agent() + with mock.patch.object(provisioning_blocks, + 'add_provisioning_component') as ap: + with self.port(): + self.assertTrue(ap.called) + + def test_dhcp_provisioning_blocks_skipped_on_create_with_no_dhcp(self): + self._add_fake_dhcp_agent() + with self.subnet(enable_dhcp=False) as subnet: + with mock.patch.object(provisioning_blocks, + 'add_provisioning_component') as ap: + with self.port(subnet=subnet): + self.assertFalse(ap.called) + + def test_dhcp_provisioning_blocks_inserted_on_update(self): + ctx = context.get_admin_context() + plugin = manager.NeutronManager.get_plugin() + self._add_fake_dhcp_agent() + with self.port() as port: + with mock.patch.object(provisioning_blocks, + 'add_provisioning_component') as ap: + port['port']['binding:host_id'] = 'newhost' + plugin.update_port(ctx, port['port']['id'], port) + self.assertTrue(ap.called) + + def test_dhcp_provisioning_blocks_removed_without_dhcp_agents(self): + with mock.patch.object(provisioning_blocks, + 'remove_provisioning_component') as cp: + with self.port(): + self.assertTrue(cp.called) + def test_update_port_fixed_ip_changed(self): ctx = context.get_admin_context() plugin = manager.NeutronManager.get_plugin() diff --git a/neutron/tests/unit/plugins/ml2/test_rpc.py b/neutron/tests/unit/plugins/ml2/test_rpc.py index 8dc7f9c700f..79e8301d947 100644 --- a/neutron/tests/unit/plugins/ml2/test_rpc.py +++ b/neutron/tests/unit/plugins/ml2/test_rpc.py @@ -21,14 +21,15 @@ import collections import mock from neutron_lib import constants -from neutron_lib import exceptions from oslo_config import cfg from oslo_context import context as oslo_context import oslo_messaging from sqlalchemy.orm import exc from neutron.agent import rpc as agent_rpc +from neutron.callbacks import resources from neutron.common import topics +from neutron.db import provisioning_blocks from neutron.plugins.ml2.drivers import type_tunnel from neutron.plugins.ml2 import managers from neutron.plugins.ml2 import rpc as plugin_rpc @@ -51,29 +52,27 @@ class RpcCallbacksTestCase(base.BaseTestCase): plugin_rpc.manager, 'NeutronManager').start() self.plugin = self.manager.get_plugin() - def _test_update_device_up(self): + def _test_update_device_up(self, host=None): kwargs = { 'agent_id': 'foo_agent', - 'device': 'foo_device' + 'device': 'foo_device', + 'host': host } with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin' '._device_to_port_id'): - with mock.patch('neutron.callbacks.registry.notify') as notify: + with mock.patch('neutron.db.provisioning_blocks.' + 'provisioning_complete') as pc: self.callbacks.update_device_up(mock.Mock(), **kwargs) - return notify + return pc def test_update_device_up_notify(self): notify = self._test_update_device_up() - kwargs = { - 'context': mock.ANY, 'port': mock.ANY, 'update_device_up': True - } - notify.assert_called_once_with( - 'port', 'after_update', self.plugin, **kwargs) + notify.assert_called_once_with(mock.ANY, mock.ANY, resources.PORT, + provisioning_blocks.L2_AGENT_ENTITY) def test_update_device_up_notify_not_sent_with_port_not_found(self): - self.plugin._get_port.side_effect = ( - exceptions.PortNotFound(port_id='foo_port_id')) - notify = self._test_update_device_up() + self.plugin.port_bound_to_host.return_value = False + notify = self._test_update_device_up('host') self.assertFalse(notify.call_count) def test_get_device_details_without_port_context(self): @@ -93,7 +92,7 @@ class RpcCallbacksTestCase(base.BaseTestCase): def test_get_device_details_port_status_equal_new_status(self): port = collections.defaultdict(lambda: 'fake') self.plugin.get_bound_port_context().current = port - self.plugin.port_bound_to_host = mock.MagicMock(return_value=True) + self.plugin.port_bound_to_host = port for admin_state_up in (True, False): new_status = (constants.PORT_STATUS_BUILD if admin_state_up else constants.PORT_STATUS_DOWN)