diff --git a/neutron/plugins/embrane/agent/dispatcher.py b/neutron/plugins/embrane/agent/dispatcher.py index 8038b93e02..31bc15511a 100644 --- a/neutron/plugins/embrane/agent/dispatcher.py +++ b/neutron/plugins/embrane/agent/dispatcher.py @@ -26,18 +26,10 @@ from neutron.openstack.common import log as logging from neutron.plugins.embrane.agent.operations import router_operations from neutron.plugins.embrane.common import constants as p_con from neutron.plugins.embrane.common import contexts as ctx -from neutron.plugins.embrane.common import exceptions as plugin_exc - LOG = logging.getLogger(__name__) -def _validate_operation(event, status, item_id): - if status and event not in p_con.operation_filter[status]: - raise plugin_exc.StateConstraintException(operation=event, - dva_id=item_id, state=status) - - class Dispatcher(object): def __init__(self, plugin, async=True): @@ -52,28 +44,29 @@ class Dispatcher(object): chain = d_context.chain item_id = item["id"] - # First round validation (Controller level) - _validate_operation(event, item["status"], item_id) - handlers = router_operations.handlers if event in handlers: for f in handlers[event]: first_run = False if item_id not in self.sync_items: - self.sync_items[item_id] = queue.Queue() + self.sync_items[item_id] = (queue.Queue(),) first_run = True - self.sync_items[item_id].put( + self.sync_items[item_id][0].put( ctx.OperationContext(event, q_context, item, chain, f, args, kwargs)) + t = None if first_run: t = greenthread.spawn(self._consume_l3, item_id, - self.sync_items[item_id], - self._plugin) + self.sync_items[item_id][0], + self._plugin, + self._async) + self.sync_items[item_id] += (t,) if not self._async: + t = self.sync_items[item_id][1] t.wait() - def _consume_l3(self, sync_item, sync_queue, plugin): + def _consume_l3(self, sync_item, sync_queue, plugin, a_sync): current_state = None while True: try: @@ -83,15 +76,13 @@ class Dispatcher(object): del self.sync_items[sync_item] return try: + # If synchronous op, empty the queue as fast as possible operation_context = sync_queue.get( + block=a_sync, timeout=p_con.QUEUE_TIMEOUT) except queue.Empty: del self.sync_items[sync_item] return - # Second round validation (enqueued level) - _validate_operation(operation_context.event, - current_state, - operation_context.item["id"]) # Execute the preliminary operations (operation_context.chain and operation_context.chain.execute_all()) @@ -134,12 +125,10 @@ class Dispatcher(object): operation_context.q_context, operation_context.item["id"]) # Error state cannot be reverted - elif current_state != p_con.Status.ERROR: + elif transient_state != p_con.Status.ERROR: current_state = plugin._update_neutron_state( operation_context.q_context, operation_context.item, transient_state) - except plugin_exc.StateConstraintException as e: - LOG.error(_("%s"), e.message) except Exception: LOG.exception(_("Unhandled exception occurred")) diff --git a/neutron/plugins/embrane/agent/operations/router_operations.py b/neutron/plugins/embrane/agent/operations/router_operations.py index d460bc47ef..032a5298f4 100644 --- a/neutron/plugins/embrane/agent/operations/router_operations.py +++ b/neutron/plugins/embrane/agent/operations/router_operations.py @@ -127,6 +127,8 @@ def _shrink_dva_iface(api, tenant_id, neutron_router, port_id): except h_exc.InterfaceNotFound: LOG.warning(_("Interface %s not found in the heleos back-end," "likely already deleted"), port_id) + return (p_con.Status.ACTIVE if neutron_router["admin_state_up"] else + p_con.Status.READY) except h_exc.PreliminaryOperationsFailed as ex: raise h_exc.BrokenInterface(err_msg=ex.message) state = api.extract_dva_state(dva) diff --git a/neutron/plugins/embrane/base_plugin.py b/neutron/plugins/embrane/base_plugin.py index 2db95e8778..33d2138886 100644 --- a/neutron/plugins/embrane/base_plugin.py +++ b/neutron/plugins/embrane/base_plugin.py @@ -21,6 +21,7 @@ from heleosapi import backend_operations as h_op from heleosapi import constants as h_con from heleosapi import exceptions as h_exc from oslo.config import cfg +from sqlalchemy.orm import exc from neutron.common import constants as l3_constants from neutron.common import exceptions as neutron_exc @@ -33,7 +34,6 @@ from neutron.plugins.embrane.agent import dispatcher from neutron.plugins.embrane.common import config # noqa from neutron.plugins.embrane.common import constants as p_con from neutron.plugins.embrane.common import contexts as embrane_ctx -from neutron.plugins.embrane.common import exceptions as c_exc from neutron.plugins.embrane.common import operation from neutron.plugins.embrane.common import utils @@ -111,7 +111,7 @@ class EmbranePlugin(object): def _retrieve_prefix_from_port(self, context, neutron_port): subnet_id = neutron_port["fixed_ips"][0]["subnet_id"] - subnet = self._get_subnet(context, subnet_id) + subnet = utils.retrieve_subnet(context, subnet_id) prefix = subnet["cidr"].split("/")[1] return prefix @@ -119,80 +119,47 @@ class EmbranePlugin(object): def create_router(self, context, router): r = router["router"] self._get_tenant_id_for_create(context, r) - with context.session.begin(subtransactions=True): - neutron_router = self._l3super.create_router(self, context, router) - network_id = None - gw_port = None - ext_gw_info = neutron_router.get(l3_db.EXTERNAL_GW_INFO) - if ext_gw_info: - network_id = ext_gw_info.get("network_id") - if network_id: - gw_ports = self.get_ports( - context, - {"device_id": [id], - "device_owner": ["network:router_gateway"]}) - if len(gw_ports) != 1: - raise c_exc.EmbranePluginException( - err_msg=_("There must be only one gateway port " - "per router at once")) - gw_port = gw_ports[0] - - # For now, only small flavor is used - utif_info = (self._plugin_support.retrieve_utif_info(context, - gw_port) - if network_id else None) - ip_allocation_info = (utils.retrieve_ip_allocation_info(self, - context, - gw_port) - if network_id else None) - neutron_router = self._l3super._get_router(self, context, - neutron_router["id"]) - neutron_router["status"] = p_con.Status.CREATING - self._dispatcher.dispatch_l3( - d_context=embrane_ctx.DispatcherContext( - p_con.Events.CREATE_ROUTER, neutron_router, context, None), - args=(h_con.Flavor.SMALL, utif_info, ip_allocation_info)) - return self._make_router_dict(neutron_router) + db_router = self._l3super.create_router(self, context, router) + neutron_router = self._get_router(context, db_router['id']) + gw_port = neutron_router.gw_port + # For now, only small flavor is used + utif_info = (self._plugin_support.retrieve_utif_info(context, + gw_port) + if gw_port else None) + ip_allocation_info = (utils.retrieve_ip_allocation_info(context, + gw_port) + if gw_port else None) + neutron_router = self._l3super._get_router(self, context, + neutron_router["id"]) + neutron_router["status"] = p_con.Status.CREATING + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.CREATE_ROUTER, neutron_router, context, None), + args=(h_con.Flavor.SMALL, utif_info, ip_allocation_info)) + return self._make_router_dict(neutron_router) def update_router(self, context, id, router): - with context.session.begin(subtransactions=True): - db_router = self._l3super.update_router(self, context, id, router) - gw_port = None - ext_gw_info = db_router.get(l3_db.EXTERNAL_GW_INFO) - if ext_gw_info: - ext_gw_info = db_router[l3_db.EXTERNAL_GW_INFO] - network_id = (ext_gw_info.get("network_id") - if ext_gw_info else None) - if network_id: - gw_ports = self.get_ports( - context, - {"device_id": [id], - "device_owner": ["network:router_gateway"]}) - if len(gw_ports) != 1: - raise c_exc.EmbranePluginException( - err_msg=_("There must be only one gateway port " - "per router at once")) - gw_port = gw_ports[0] + db_router = self._l3super.update_router(self, context, id, router) + neutron_router = self._get_router(context, db_router['id']) + gw_port = neutron_router.gw_port + utif_info = (self._plugin_support.retrieve_utif_info(context, + gw_port) + if gw_port else None) + ip_allocation_info = (utils.retrieve_ip_allocation_info(context, + gw_port) + if gw_port else None) - utif_info = (self._plugin_support.retrieve_utif_info(context, - gw_port) - if gw_port else None) - ip_allocation_info = (utils.retrieve_ip_allocation_info(self, - context, - gw_port) - if gw_port else None) + routes_info = router["router"].get("routes") - routes_info = router["router"].get("routes") - - neutron_router = self._l3super._get_router(self, context, id) - state_change = operation.Operation( - self._set_db_router_state, - args=(context, neutron_router, p_con.Status.UPDATING)) - self._dispatcher.dispatch_l3( - d_context=embrane_ctx.DispatcherContext( - p_con.Events.UPDATE_ROUTER, neutron_router, context, - state_change), - args=(utif_info, ip_allocation_info, routes_info)) + neutron_router = self._l3super._get_router(self, context, id) + state_change = operation.Operation( + self._set_db_router_state, + args=(context, neutron_router, p_con.Status.UPDATING)) + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.UPDATE_ROUTER, neutron_router, context, + state_change), + args=(utif_info, ip_allocation_info, routes_info)) return self._make_router_dict(neutron_router) def get_router(self, context, id, fields=None): @@ -241,58 +208,55 @@ class EmbranePlugin(object): """Deletes the DVA with the specific router id.""" # Copy of the parent validation code, shouldn't the base modules # provide functions for validating operations? - with context.session.begin(subtransactions=True): - DEVICE_OWNER_ROUTER_INTF = l3_constants.DEVICE_OWNER_ROUTER_INTF - fips = self.get_floatingips_count(context.elevated(), - filters={"router_id": [id]}) - if fips: - raise l3.RouterInUse(router_id=id) + device_owner_router_intf = l3_constants.DEVICE_OWNER_ROUTER_INTF + fips = self.get_floatingips_count(context.elevated(), + filters={"router_id": [id]}) + if fips: + raise l3.RouterInUse(router_id=id) - device_filter = {"device_id": [id], - "device_owner": [DEVICE_OWNER_ROUTER_INTF]} - ports = self.get_ports_count(context.elevated(), - filters=device_filter) - if ports: - raise l3.RouterInUse(router_id=id) - neutron_router = self._get_router(context, id) - state_change = operation.Operation(self._set_db_router_state, - args=(context, neutron_router, - p_con.Status.DELETING)) - self._dispatcher.dispatch_l3( - d_context=embrane_ctx.DispatcherContext( - p_con.Events.DELETE_ROUTER, neutron_router, context, - state_change), args=()) - LOG.debug(_("Deleting router=%s"), neutron_router) - return neutron_router + device_filter = {"device_id": [id], + "device_owner": [device_owner_router_intf]} + ports = self.get_ports_count(context.elevated(), + filters=device_filter) + if ports: + raise l3.RouterInUse(router_id=id) + neutron_router = self._get_router(context, id) + state_change = operation.Operation(self._set_db_router_state, + args=(context, neutron_router, + p_con.Status.DELETING)) + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.DELETE_ROUTER, neutron_router, context, + state_change), args=()) + LOG.debug(_("Deleting router=%s"), neutron_router) + return neutron_router def add_router_interface(self, context, router_id, interface_info): """Grows DVA interface in the specified subnet.""" - with context.session.begin(subtransactions=True): - neutron_router = self._get_router(context, router_id) - rport_qry = context.session.query(models_v2.Port) - ports = rport_qry.filter_by( - device_id=router_id).all() - if len(ports) >= p_con.UTIF_LIMIT: - raise neutron_exc.BadRequest( - resource=router_id, - msg=("this router doesn't support more than " - + str(p_con.UTIF_LIMIT) + " interfaces")) - neutron_router_iface = self._l3super.add_router_interface( - self, context, router_id, interface_info) - port = self._get_port(context, neutron_router_iface["port_id"]) - utif_info = self._plugin_support.retrieve_utif_info(context, port) - ip_allocation_info = utils.retrieve_ip_allocation_info(self, - context, - port) - state_change = operation.Operation(self._set_db_router_state, - args=(context, neutron_router, - p_con.Status.UPDATING)) - self._dispatcher.dispatch_l3( - d_context=embrane_ctx.DispatcherContext( - p_con.Events.GROW_ROUTER_IF, neutron_router, context, - state_change), - args=(utif_info, ip_allocation_info)) - return neutron_router_iface + neutron_router = self._get_router(context, router_id) + rport_qry = context.session.query(models_v2.Port) + ports = rport_qry.filter_by( + device_id=router_id).all() + if len(ports) >= p_con.UTIF_LIMIT: + raise neutron_exc.BadRequest( + resource=router_id, + msg=("this router doesn't support more than " + + str(p_con.UTIF_LIMIT) + " interfaces")) + neutron_router_iface = self._l3super.add_router_interface( + self, context, router_id, interface_info) + port = self._get_port(context, neutron_router_iface["port_id"]) + utif_info = self._plugin_support.retrieve_utif_info(context, port) + ip_allocation_info = utils.retrieve_ip_allocation_info(context, + port) + state_change = operation.Operation(self._set_db_router_state, + args=(context, neutron_router, + p_con.Status.UPDATING)) + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.GROW_ROUTER_IF, neutron_router, context, + state_change), + args=(utif_info, ip_allocation_info)) + return neutron_router_iface def remove_router_interface(self, context, router_id, interface_info): port_id = None @@ -300,7 +264,7 @@ class EmbranePlugin(object): port_id = interface_info["port_id"] elif "subnet_id" in interface_info: subnet_id = interface_info["subnet_id"] - subnet = self._get_subnet(context, subnet_id) + subnet = utils.retrieve_subnet(context, subnet_id) rport_qry = context.session.query(models_v2.Port) ports = rport_qry.filter_by( device_id=router_id, @@ -322,43 +286,90 @@ class EmbranePlugin(object): state_change), args=(port_id,)) - def update_floatingip(self, context, id, floatingip): - with context.session.begin(subtransactions=True): - db_fip = self._l3super.get_floatingip(self, context, id) - result = self._l3super.update_floatingip(self, context, id, - floatingip) + def create_floatingip(self, context, floatingip): + result = self._l3super.create_floatingip( + self, context, floatingip) - if db_fip["port_id"]: - neutron_router = self._get_router(context, db_fip["router_id"]) - fip_id = db_fip["id"] - state_change = operation.Operation( - self._set_db_router_state, - args=(context, neutron_router, p_con.Status.UPDATING)) + if result["port_id"]: + neutron_router = self._get_router(context, result["router_id"]) + db_fixed_port = self._get_port(context, result["port_id"]) + fixed_prefix = self._retrieve_prefix_from_port(context, + db_fixed_port) + db_floating_port = neutron_router["gw_port"] + floating_prefix = self._retrieve_prefix_from_port( + context, db_floating_port) + nat_info = utils.retrieve_nat_info(context, result, + fixed_prefix, + floating_prefix, + neutron_router) + state_change = operation.Operation( + self._set_db_router_state, + args=(context, neutron_router, p_con.Status.UPDATING)) - self._dispatcher.dispatch_l3( - d_context=embrane_ctx.DispatcherContext( - p_con.Events.RESET_NAT_RULE, neutron_router, context, - state_change), - args=(fip_id,)) - if floatingip["floatingip"]["port_id"]: - neutron_router = self._get_router(context, result["router_id"]) - db_fixed_port = self._get_port(context, result["port_id"]) - fixed_prefix = self._retrieve_prefix_from_port(context, - db_fixed_port) - db_floating_port = neutron_router["gw_port"] - floating_prefix = self._retrieve_prefix_from_port( - context, db_floating_port) - nat_info = utils.retrieve_nat_info(context, result, - fixed_prefix, - floating_prefix, - neutron_router) - state_change = operation.Operation( - self._set_db_router_state, - args=(context, neutron_router, p_con.Status.UPDATING)) - - self._dispatcher.dispatch_l3( - d_context=embrane_ctx.DispatcherContext( - p_con.Events.SET_NAT_RULE, neutron_router, context, - state_change), - args=(nat_info,)) + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.SET_NAT_RULE, neutron_router, context, + state_change), + args=(nat_info,)) return result + + def update_floatingip(self, context, id, floatingip): + db_fip = self._l3super.get_floatingip(self, context, id) + result = self._l3super.update_floatingip(self, context, id, + floatingip) + + if db_fip["port_id"] and db_fip["port_id"] != result["port_id"]: + neutron_router = self._get_router(context, db_fip["router_id"]) + fip_id = db_fip["id"] + state_change = operation.Operation( + self._set_db_router_state, + args=(context, neutron_router, p_con.Status.UPDATING)) + + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.RESET_NAT_RULE, neutron_router, context, + state_change), + args=(fip_id,)) + if result["port_id"]: + neutron_router = self._get_router(context, result["router_id"]) + db_fixed_port = self._get_port(context, result["port_id"]) + fixed_prefix = self._retrieve_prefix_from_port(context, + db_fixed_port) + db_floating_port = neutron_router["gw_port"] + floating_prefix = self._retrieve_prefix_from_port( + context, db_floating_port) + nat_info = utils.retrieve_nat_info(context, result, + fixed_prefix, + floating_prefix, + neutron_router) + state_change = operation.Operation( + self._set_db_router_state, + args=(context, neutron_router, p_con.Status.UPDATING)) + + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.SET_NAT_RULE, neutron_router, context, + state_change), + args=(nat_info,)) + return result + + def disassociate_floatingips(self, context, port_id): + try: + fip_qry = context.session.query(l3_db.FloatingIP) + floating_ip = fip_qry.filter_by(fixed_port_id=port_id).one() + router_id = floating_ip["router_id"] + except exc.NoResultFound: + return + self._l3super.disassociate_floatingips(self, context, port_id) + if router_id: + neutron_router = self._get_router(context, router_id) + fip_id = floating_ip["id"] + state_change = operation.Operation( + self._set_db_router_state, + args=(context, neutron_router, p_con.Status.UPDATING)) + + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.RESET_NAT_RULE, neutron_router, context, + state_change), + args=(fip_id,)) diff --git a/neutron/plugins/embrane/common/constants.py b/neutron/plugins/embrane/common/constants.py index ae75e89d2c..65f3818a28 100644 --- a/neutron/plugins/embrane/common/constants.py +++ b/neutron/plugins/embrane/common/constants.py @@ -48,18 +48,6 @@ class Events: SET_NAT_RULE = "set_nat_rule" RESET_NAT_RULE = "reset_nat_rule" -operation_filter = { - Status.ACTIVE: [Events.DELETE_ROUTER, Events.GROW_ROUTER_IF, - Events.SHRINK_ROUTER_IF, Events.UPDATE_ROUTER, - Events.SET_NAT_RULE, Events.RESET_NAT_RULE], - Status.READY: [Events.DELETE_ROUTER, Events.GROW_ROUTER_IF, - Events.SHRINK_ROUTER_IF, Events.UPDATE_ROUTER], - Status.ERROR: [Events.DELETE_ROUTER, Events.SHRINK_ROUTER_IF], - Status.UPDATING: [Events.DELETE_ROUTER, Events.SHRINK_ROUTER_IF, - Events.RESET_NAT_RULE], - Status.CREATING: [Events.DELETE_ROUTER, Events.CREATE_ROUTER], - Status.DELETING: [Events.DELETE_ROUTER]} - _DVA_PENDING_ERROR_MSG = _("Dva is pending for the following reason: %s") _DVA_NOT_FOUNT_ERROR_MSG = _("Dva can't be found to execute the operation, " "probably was cancelled through the heleos UI") diff --git a/neutron/plugins/embrane/common/exceptions.py b/neutron/plugins/embrane/common/exceptions.py index 763dabdad2..d2e2c1fdd9 100644 --- a/neutron/plugins/embrane/common/exceptions.py +++ b/neutron/plugins/embrane/common/exceptions.py @@ -22,14 +22,3 @@ from neutron.common import exceptions as neutron_exec class EmbranePluginException(neutron_exec.NeutronException): message = _("An unexpected error occurred:%(err_msg)s") - - -# Not permitted operation -class NonPermitted(neutron_exec.BadRequest): - pass - - -class StateConstraintException(NonPermitted): - message = _("Operation not permitted due to state constraint violation:" - "%(operation)s not allowed for DVA %(dva_id)s in state " - " %(state)s") diff --git a/neutron/plugins/embrane/common/utils.py b/neutron/plugins/embrane/common/utils.py index 78c3f8f730..8e4a92abab 100644 --- a/neutron/plugins/embrane/common/utils.py +++ b/neutron/plugins/embrane/common/utils.py @@ -19,6 +19,7 @@ from heleosapi import info as h_info +from neutron.db import models_v2 from neutron.openstack.common import log as logging LOG = logging.getLogger(__name__) @@ -31,7 +32,12 @@ def set_db_item_state(context, neutron_item, new_state): context.session.merge(neutron_item) -def retrieve_ip_allocation_info(l2_plugin, context, neutron_port): +def retrieve_subnet(context, subnet_id): + return (context.session.query( + models_v2.Subnet).filter(models_v2.Subnet.id == subnet_id).one()) + + +def retrieve_ip_allocation_info(context, neutron_port): """Retrieves ip allocation info for a specific port if any.""" try: @@ -39,7 +45,7 @@ def retrieve_ip_allocation_info(l2_plugin, context, neutron_port): except (KeyError, IndexError): LOG.info(_("No ip allocation set")) return - subnet = l2_plugin._get_subnet(context, subnet_id) + subnet = retrieve_subnet(context, subnet_id) allocated_ip = neutron_port["fixed_ips"][0]["ip_address"] is_gw_port = neutron_port["device_owner"] == "network:router_gateway" gateway_ip = subnet["gateway_ip"] diff --git a/neutron/plugins/embrane/plugins/embrane_ovs_plugin.py b/neutron/plugins/embrane/plugins/embrane_ovs_plugin.py index 1ad9368266..d4d5ac1806 100644 --- a/neutron/plugins/embrane/plugins/embrane_ovs_plugin.py +++ b/neutron/plugins/embrane/plugins/embrane_ovs_plugin.py @@ -33,5 +33,6 @@ class EmbraneOvsPlugin(base.EmbranePlugin, l2.OVSNeutronPluginV2): def __init__(self): '''First run plugin specific initialization, then Embrane's.''' + self._supported_extension_aliases.remove("l3_agent_scheduler") l2.OVSNeutronPluginV2.__init__(self) self._run_embrane_config()