Embrane Tempest Compliance

This changeset tracks the changes needed by the Embrane's Neutron Plugin
in order to consistently  pass tempest tests.

Changes:

- Some db transactions were too long and were causing lock timeout
exception. Removed useless transactions (waiting on non-db tasks to complete)
to fix the problem.

- The operation filter was useless, and breaking the tests. Most of the
logic which guarantees the appliance correct state when an operation in executed
is now in the internal library used for the heleos APIs.
The filter was therefore removed (as well as the corresponding exception).

- Fixed "sync" mode. The behavior was incorrect due to the queue timeout.
Furthermore, parallel requests were not waiting on the correct thread.

- Added missing methods for floating IPs (not all the scenarios were covered).

- Minor bug fixes caught during the tests.

Change-Id: If081b50b4629158016ba421b94612a4cfac82257
Closes-Bug:1269098
This commit is contained in:
Ivar Lazzaro 2014-01-14 11:17:05 -08:00
parent ab1f474ace
commit a23b1e6de0
7 changed files with 192 additions and 206 deletions

View File

@ -26,18 +26,10 @@ from neutron.openstack.common import log as logging
from neutron.plugins.embrane.agent.operations import router_operations
from neutron.plugins.embrane.common import constants as p_con
from neutron.plugins.embrane.common import contexts as ctx
from neutron.plugins.embrane.common import exceptions as plugin_exc
LOG = logging.getLogger(__name__)
def _validate_operation(event, status, item_id):
if status and event not in p_con.operation_filter[status]:
raise plugin_exc.StateConstraintException(operation=event,
dva_id=item_id, state=status)
class Dispatcher(object):
def __init__(self, plugin, async=True):
@ -52,28 +44,29 @@ class Dispatcher(object):
chain = d_context.chain
item_id = item["id"]
# First round validation (Controller level)
_validate_operation(event, item["status"], item_id)
handlers = router_operations.handlers
if event in handlers:
for f in handlers[event]:
first_run = False
if item_id not in self.sync_items:
self.sync_items[item_id] = queue.Queue()
self.sync_items[item_id] = (queue.Queue(),)
first_run = True
self.sync_items[item_id].put(
self.sync_items[item_id][0].put(
ctx.OperationContext(event, q_context, item, chain, f,
args, kwargs))
t = None
if first_run:
t = greenthread.spawn(self._consume_l3,
item_id,
self.sync_items[item_id],
self._plugin)
self.sync_items[item_id][0],
self._plugin,
self._async)
self.sync_items[item_id] += (t,)
if not self._async:
t = self.sync_items[item_id][1]
t.wait()
def _consume_l3(self, sync_item, sync_queue, plugin):
def _consume_l3(self, sync_item, sync_queue, plugin, a_sync):
current_state = None
while True:
try:
@ -83,15 +76,13 @@ class Dispatcher(object):
del self.sync_items[sync_item]
return
try:
# If synchronous op, empty the queue as fast as possible
operation_context = sync_queue.get(
block=a_sync,
timeout=p_con.QUEUE_TIMEOUT)
except queue.Empty:
del self.sync_items[sync_item]
return
# Second round validation (enqueued level)
_validate_operation(operation_context.event,
current_state,
operation_context.item["id"])
# Execute the preliminary operations
(operation_context.chain and
operation_context.chain.execute_all())
@ -134,12 +125,10 @@ class Dispatcher(object):
operation_context.q_context,
operation_context.item["id"])
# Error state cannot be reverted
elif current_state != p_con.Status.ERROR:
elif transient_state != p_con.Status.ERROR:
current_state = plugin._update_neutron_state(
operation_context.q_context,
operation_context.item,
transient_state)
except plugin_exc.StateConstraintException as e:
LOG.error(_("%s"), e.message)
except Exception:
LOG.exception(_("Unhandled exception occurred"))

View File

@ -127,6 +127,8 @@ def _shrink_dva_iface(api, tenant_id, neutron_router, port_id):
except h_exc.InterfaceNotFound:
LOG.warning(_("Interface %s not found in the heleos back-end,"
"likely already deleted"), port_id)
return (p_con.Status.ACTIVE if neutron_router["admin_state_up"] else
p_con.Status.READY)
except h_exc.PreliminaryOperationsFailed as ex:
raise h_exc.BrokenInterface(err_msg=ex.message)
state = api.extract_dva_state(dva)

View File

@ -21,6 +21,7 @@ from heleosapi import backend_operations as h_op
from heleosapi import constants as h_con
from heleosapi import exceptions as h_exc
from oslo.config import cfg
from sqlalchemy.orm import exc
from neutron.common import constants as l3_constants
from neutron.common import exceptions as neutron_exc
@ -33,7 +34,6 @@ from neutron.plugins.embrane.agent import dispatcher
from neutron.plugins.embrane.common import config # noqa
from neutron.plugins.embrane.common import constants as p_con
from neutron.plugins.embrane.common import contexts as embrane_ctx
from neutron.plugins.embrane.common import exceptions as c_exc
from neutron.plugins.embrane.common import operation
from neutron.plugins.embrane.common import utils
@ -111,7 +111,7 @@ class EmbranePlugin(object):
def _retrieve_prefix_from_port(self, context, neutron_port):
subnet_id = neutron_port["fixed_ips"][0]["subnet_id"]
subnet = self._get_subnet(context, subnet_id)
subnet = utils.retrieve_subnet(context, subnet_id)
prefix = subnet["cidr"].split("/")[1]
return prefix
@ -119,80 +119,47 @@ class EmbranePlugin(object):
def create_router(self, context, router):
r = router["router"]
self._get_tenant_id_for_create(context, r)
with context.session.begin(subtransactions=True):
neutron_router = self._l3super.create_router(self, context, router)
network_id = None
gw_port = None
ext_gw_info = neutron_router.get(l3_db.EXTERNAL_GW_INFO)
if ext_gw_info:
network_id = ext_gw_info.get("network_id")
if network_id:
gw_ports = self.get_ports(
context,
{"device_id": [id],
"device_owner": ["network:router_gateway"]})
if len(gw_ports) != 1:
raise c_exc.EmbranePluginException(
err_msg=_("There must be only one gateway port "
"per router at once"))
gw_port = gw_ports[0]
# For now, only small flavor is used
utif_info = (self._plugin_support.retrieve_utif_info(context,
gw_port)
if network_id else None)
ip_allocation_info = (utils.retrieve_ip_allocation_info(self,
context,
gw_port)
if network_id else None)
neutron_router = self._l3super._get_router(self, context,
neutron_router["id"])
neutron_router["status"] = p_con.Status.CREATING
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.CREATE_ROUTER, neutron_router, context, None),
args=(h_con.Flavor.SMALL, utif_info, ip_allocation_info))
return self._make_router_dict(neutron_router)
db_router = self._l3super.create_router(self, context, router)
neutron_router = self._get_router(context, db_router['id'])
gw_port = neutron_router.gw_port
# For now, only small flavor is used
utif_info = (self._plugin_support.retrieve_utif_info(context,
gw_port)
if gw_port else None)
ip_allocation_info = (utils.retrieve_ip_allocation_info(context,
gw_port)
if gw_port else None)
neutron_router = self._l3super._get_router(self, context,
neutron_router["id"])
neutron_router["status"] = p_con.Status.CREATING
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.CREATE_ROUTER, neutron_router, context, None),
args=(h_con.Flavor.SMALL, utif_info, ip_allocation_info))
return self._make_router_dict(neutron_router)
def update_router(self, context, id, router):
with context.session.begin(subtransactions=True):
db_router = self._l3super.update_router(self, context, id, router)
gw_port = None
ext_gw_info = db_router.get(l3_db.EXTERNAL_GW_INFO)
if ext_gw_info:
ext_gw_info = db_router[l3_db.EXTERNAL_GW_INFO]
network_id = (ext_gw_info.get("network_id")
if ext_gw_info else None)
if network_id:
gw_ports = self.get_ports(
context,
{"device_id": [id],
"device_owner": ["network:router_gateway"]})
if len(gw_ports) != 1:
raise c_exc.EmbranePluginException(
err_msg=_("There must be only one gateway port "
"per router at once"))
gw_port = gw_ports[0]
db_router = self._l3super.update_router(self, context, id, router)
neutron_router = self._get_router(context, db_router['id'])
gw_port = neutron_router.gw_port
utif_info = (self._plugin_support.retrieve_utif_info(context,
gw_port)
if gw_port else None)
ip_allocation_info = (utils.retrieve_ip_allocation_info(context,
gw_port)
if gw_port else None)
utif_info = (self._plugin_support.retrieve_utif_info(context,
gw_port)
if gw_port else None)
ip_allocation_info = (utils.retrieve_ip_allocation_info(self,
context,
gw_port)
if gw_port else None)
routes_info = router["router"].get("routes")
routes_info = router["router"].get("routes")
neutron_router = self._l3super._get_router(self, context, id)
state_change = operation.Operation(
self._set_db_router_state,
args=(context, neutron_router, p_con.Status.UPDATING))
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.UPDATE_ROUTER, neutron_router, context,
state_change),
args=(utif_info, ip_allocation_info, routes_info))
neutron_router = self._l3super._get_router(self, context, id)
state_change = operation.Operation(
self._set_db_router_state,
args=(context, neutron_router, p_con.Status.UPDATING))
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.UPDATE_ROUTER, neutron_router, context,
state_change),
args=(utif_info, ip_allocation_info, routes_info))
return self._make_router_dict(neutron_router)
def get_router(self, context, id, fields=None):
@ -241,58 +208,55 @@ class EmbranePlugin(object):
"""Deletes the DVA with the specific router id."""
# Copy of the parent validation code, shouldn't the base modules
# provide functions for validating operations?
with context.session.begin(subtransactions=True):
DEVICE_OWNER_ROUTER_INTF = l3_constants.DEVICE_OWNER_ROUTER_INTF
fips = self.get_floatingips_count(context.elevated(),
filters={"router_id": [id]})
if fips:
raise l3.RouterInUse(router_id=id)
device_owner_router_intf = l3_constants.DEVICE_OWNER_ROUTER_INTF
fips = self.get_floatingips_count(context.elevated(),
filters={"router_id": [id]})
if fips:
raise l3.RouterInUse(router_id=id)
device_filter = {"device_id": [id],
"device_owner": [DEVICE_OWNER_ROUTER_INTF]}
ports = self.get_ports_count(context.elevated(),
filters=device_filter)
if ports:
raise l3.RouterInUse(router_id=id)
neutron_router = self._get_router(context, id)
state_change = operation.Operation(self._set_db_router_state,
args=(context, neutron_router,
p_con.Status.DELETING))
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.DELETE_ROUTER, neutron_router, context,
state_change), args=())
LOG.debug(_("Deleting router=%s"), neutron_router)
return neutron_router
device_filter = {"device_id": [id],
"device_owner": [device_owner_router_intf]}
ports = self.get_ports_count(context.elevated(),
filters=device_filter)
if ports:
raise l3.RouterInUse(router_id=id)
neutron_router = self._get_router(context, id)
state_change = operation.Operation(self._set_db_router_state,
args=(context, neutron_router,
p_con.Status.DELETING))
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.DELETE_ROUTER, neutron_router, context,
state_change), args=())
LOG.debug(_("Deleting router=%s"), neutron_router)
return neutron_router
def add_router_interface(self, context, router_id, interface_info):
"""Grows DVA interface in the specified subnet."""
with context.session.begin(subtransactions=True):
neutron_router = self._get_router(context, router_id)
rport_qry = context.session.query(models_v2.Port)
ports = rport_qry.filter_by(
device_id=router_id).all()
if len(ports) >= p_con.UTIF_LIMIT:
raise neutron_exc.BadRequest(
resource=router_id,
msg=("this router doesn't support more than "
+ str(p_con.UTIF_LIMIT) + " interfaces"))
neutron_router_iface = self._l3super.add_router_interface(
self, context, router_id, interface_info)
port = self._get_port(context, neutron_router_iface["port_id"])
utif_info = self._plugin_support.retrieve_utif_info(context, port)
ip_allocation_info = utils.retrieve_ip_allocation_info(self,
context,
port)
state_change = operation.Operation(self._set_db_router_state,
args=(context, neutron_router,
p_con.Status.UPDATING))
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.GROW_ROUTER_IF, neutron_router, context,
state_change),
args=(utif_info, ip_allocation_info))
return neutron_router_iface
neutron_router = self._get_router(context, router_id)
rport_qry = context.session.query(models_v2.Port)
ports = rport_qry.filter_by(
device_id=router_id).all()
if len(ports) >= p_con.UTIF_LIMIT:
raise neutron_exc.BadRequest(
resource=router_id,
msg=("this router doesn't support more than "
+ str(p_con.UTIF_LIMIT) + " interfaces"))
neutron_router_iface = self._l3super.add_router_interface(
self, context, router_id, interface_info)
port = self._get_port(context, neutron_router_iface["port_id"])
utif_info = self._plugin_support.retrieve_utif_info(context, port)
ip_allocation_info = utils.retrieve_ip_allocation_info(context,
port)
state_change = operation.Operation(self._set_db_router_state,
args=(context, neutron_router,
p_con.Status.UPDATING))
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.GROW_ROUTER_IF, neutron_router, context,
state_change),
args=(utif_info, ip_allocation_info))
return neutron_router_iface
def remove_router_interface(self, context, router_id, interface_info):
port_id = None
@ -300,7 +264,7 @@ class EmbranePlugin(object):
port_id = interface_info["port_id"]
elif "subnet_id" in interface_info:
subnet_id = interface_info["subnet_id"]
subnet = self._get_subnet(context, subnet_id)
subnet = utils.retrieve_subnet(context, subnet_id)
rport_qry = context.session.query(models_v2.Port)
ports = rport_qry.filter_by(
device_id=router_id,
@ -322,43 +286,90 @@ class EmbranePlugin(object):
state_change),
args=(port_id,))
def update_floatingip(self, context, id, floatingip):
with context.session.begin(subtransactions=True):
db_fip = self._l3super.get_floatingip(self, context, id)
result = self._l3super.update_floatingip(self, context, id,
floatingip)
def create_floatingip(self, context, floatingip):
result = self._l3super.create_floatingip(
self, context, floatingip)
if db_fip["port_id"]:
neutron_router = self._get_router(context, db_fip["router_id"])
fip_id = db_fip["id"]
state_change = operation.Operation(
self._set_db_router_state,
args=(context, neutron_router, p_con.Status.UPDATING))
if result["port_id"]:
neutron_router = self._get_router(context, result["router_id"])
db_fixed_port = self._get_port(context, result["port_id"])
fixed_prefix = self._retrieve_prefix_from_port(context,
db_fixed_port)
db_floating_port = neutron_router["gw_port"]
floating_prefix = self._retrieve_prefix_from_port(
context, db_floating_port)
nat_info = utils.retrieve_nat_info(context, result,
fixed_prefix,
floating_prefix,
neutron_router)
state_change = operation.Operation(
self._set_db_router_state,
args=(context, neutron_router, p_con.Status.UPDATING))
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.RESET_NAT_RULE, neutron_router, context,
state_change),
args=(fip_id,))
if floatingip["floatingip"]["port_id"]:
neutron_router = self._get_router(context, result["router_id"])
db_fixed_port = self._get_port(context, result["port_id"])
fixed_prefix = self._retrieve_prefix_from_port(context,
db_fixed_port)
db_floating_port = neutron_router["gw_port"]
floating_prefix = self._retrieve_prefix_from_port(
context, db_floating_port)
nat_info = utils.retrieve_nat_info(context, result,
fixed_prefix,
floating_prefix,
neutron_router)
state_change = operation.Operation(
self._set_db_router_state,
args=(context, neutron_router, p_con.Status.UPDATING))
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.SET_NAT_RULE, neutron_router, context,
state_change),
args=(nat_info,))
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.SET_NAT_RULE, neutron_router, context,
state_change),
args=(nat_info,))
return result
def update_floatingip(self, context, id, floatingip):
db_fip = self._l3super.get_floatingip(self, context, id)
result = self._l3super.update_floatingip(self, context, id,
floatingip)
if db_fip["port_id"] and db_fip["port_id"] != result["port_id"]:
neutron_router = self._get_router(context, db_fip["router_id"])
fip_id = db_fip["id"]
state_change = operation.Operation(
self._set_db_router_state,
args=(context, neutron_router, p_con.Status.UPDATING))
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.RESET_NAT_RULE, neutron_router, context,
state_change),
args=(fip_id,))
if result["port_id"]:
neutron_router = self._get_router(context, result["router_id"])
db_fixed_port = self._get_port(context, result["port_id"])
fixed_prefix = self._retrieve_prefix_from_port(context,
db_fixed_port)
db_floating_port = neutron_router["gw_port"]
floating_prefix = self._retrieve_prefix_from_port(
context, db_floating_port)
nat_info = utils.retrieve_nat_info(context, result,
fixed_prefix,
floating_prefix,
neutron_router)
state_change = operation.Operation(
self._set_db_router_state,
args=(context, neutron_router, p_con.Status.UPDATING))
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.SET_NAT_RULE, neutron_router, context,
state_change),
args=(nat_info,))
return result
def disassociate_floatingips(self, context, port_id):
try:
fip_qry = context.session.query(l3_db.FloatingIP)
floating_ip = fip_qry.filter_by(fixed_port_id=port_id).one()
router_id = floating_ip["router_id"]
except exc.NoResultFound:
return
self._l3super.disassociate_floatingips(self, context, port_id)
if router_id:
neutron_router = self._get_router(context, router_id)
fip_id = floating_ip["id"]
state_change = operation.Operation(
self._set_db_router_state,
args=(context, neutron_router, p_con.Status.UPDATING))
self._dispatcher.dispatch_l3(
d_context=embrane_ctx.DispatcherContext(
p_con.Events.RESET_NAT_RULE, neutron_router, context,
state_change),
args=(fip_id,))

View File

@ -48,18 +48,6 @@ class Events:
SET_NAT_RULE = "set_nat_rule"
RESET_NAT_RULE = "reset_nat_rule"
operation_filter = {
Status.ACTIVE: [Events.DELETE_ROUTER, Events.GROW_ROUTER_IF,
Events.SHRINK_ROUTER_IF, Events.UPDATE_ROUTER,
Events.SET_NAT_RULE, Events.RESET_NAT_RULE],
Status.READY: [Events.DELETE_ROUTER, Events.GROW_ROUTER_IF,
Events.SHRINK_ROUTER_IF, Events.UPDATE_ROUTER],
Status.ERROR: [Events.DELETE_ROUTER, Events.SHRINK_ROUTER_IF],
Status.UPDATING: [Events.DELETE_ROUTER, Events.SHRINK_ROUTER_IF,
Events.RESET_NAT_RULE],
Status.CREATING: [Events.DELETE_ROUTER, Events.CREATE_ROUTER],
Status.DELETING: [Events.DELETE_ROUTER]}
_DVA_PENDING_ERROR_MSG = _("Dva is pending for the following reason: %s")
_DVA_NOT_FOUNT_ERROR_MSG = _("Dva can't be found to execute the operation, "
"probably was cancelled through the heleos UI")

View File

@ -22,14 +22,3 @@ from neutron.common import exceptions as neutron_exec
class EmbranePluginException(neutron_exec.NeutronException):
message = _("An unexpected error occurred:%(err_msg)s")
# Not permitted operation
class NonPermitted(neutron_exec.BadRequest):
pass
class StateConstraintException(NonPermitted):
message = _("Operation not permitted due to state constraint violation:"
"%(operation)s not allowed for DVA %(dva_id)s in state "
" %(state)s")

View File

@ -19,6 +19,7 @@
from heleosapi import info as h_info
from neutron.db import models_v2
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
@ -31,7 +32,12 @@ def set_db_item_state(context, neutron_item, new_state):
context.session.merge(neutron_item)
def retrieve_ip_allocation_info(l2_plugin, context, neutron_port):
def retrieve_subnet(context, subnet_id):
return (context.session.query(
models_v2.Subnet).filter(models_v2.Subnet.id == subnet_id).one())
def retrieve_ip_allocation_info(context, neutron_port):
"""Retrieves ip allocation info for a specific port if any."""
try:
@ -39,7 +45,7 @@ def retrieve_ip_allocation_info(l2_plugin, context, neutron_port):
except (KeyError, IndexError):
LOG.info(_("No ip allocation set"))
return
subnet = l2_plugin._get_subnet(context, subnet_id)
subnet = retrieve_subnet(context, subnet_id)
allocated_ip = neutron_port["fixed_ips"][0]["ip_address"]
is_gw_port = neutron_port["device_owner"] == "network:router_gateway"
gateway_ip = subnet["gateway_ip"]

View File

@ -33,5 +33,6 @@ class EmbraneOvsPlugin(base.EmbranePlugin, l2.OVSNeutronPluginV2):
def __init__(self):
'''First run plugin specific initialization, then Embrane's.'''
self._supported_extension_aliases.remove("l3_agent_scheduler")
l2.OVSNeutronPluginV2.__init__(self)
self._run_embrane_config()