Fixup event transaction semantics for ML2 bulk ops

This splits out code in ML2 for ports, subnets, and networks
into clear before, during, and after functions corresponding
to the state of the transaction they are called within.
This allows the bulk creation to correctly call chunks of code
outside of DB transactions even when doing many creations at once.

Change-Id: Id6fe0f5066358be954a6ca14dd49c36755897e31
Closes-Bug: #1683550
This commit is contained in:
Kevin Benton 2017-04-17 18:46:49 -07:00
parent 52636bc024
commit 47fdee4436
2 changed files with 77 additions and 30 deletions

View File

@ -670,6 +670,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
objects = []
collection = "%ss" % resource
items = request_items[collection]
obj_before_create = getattr(self, '_before_create_%s' % resource)
for item in items:
obj_before_create(context, item)
with db_api.context_manager.writer.using(context):
obj_creator = getattr(self, '_create_%s_db' % resource)
for item in items:
@ -687,22 +690,23 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
"the %(resource)s:%(item)s"),
{'resource': resource, 'item': item})
postcommit_op = getattr(self.mechanism_manager,
'create_%s_postcommit' % resource)
postcommit_op = getattr(self, '_after_create_%s' % resource)
for obj in objects:
try:
postcommit_op(obj['mech_context'])
except ml2_exc.MechanismDriverError:
postcommit_op(context, obj['result'], obj['mech_context'])
except Exception:
with excutils.save_and_reraise_exception():
resource_ids = [res['result']['id'] for res in objects]
LOG.exception(_LE("mechanism_manager.create_%(res)s"
"_postcommit failed for %(res)s: "
LOG.exception(_LE("ML2 _after_create_%(res)s "
"failed for %(res)s: "
"'%(failed_id)s'. Deleting "
"%(res)ss %(resource_ids)s"),
{'res': resource,
'failed_id': obj['result']['id'],
'resource_ids': ', '.join(resource_ids)})
self._delete_objects(context, resource, objects)
# _after_handler will have deleted the object that threw
to_delete = [o for o in objects if o != obj]
self._delete_objects(context, resource, to_delete)
return objects
def _get_network_mtu(self, network):
@ -744,11 +748,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
mtus.append(mtu)
return min(mtus) if mtus else 0
def _before_create_network(self, context, network):
net_data = network[attributes.NETWORK]
registry.notify(resources.NETWORK, events.BEFORE_CREATE, self,
context=context, network=net_data)
def _create_network_db(self, context, network):
net_data = network[attributes.NETWORK]
tenant_id = net_data['tenant_id']
registry.notify(resources.NETWORK, events.BEFORE_CREATE, self,
context=context, network=net_data)
with db_api.context_manager.writer.using(context):
net_db = self.create_network_db(context, network)
result = self._make_network_dict(net_db, process_extensions=False,
@ -785,7 +792,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def create_network(self, context, network):
self._before_create_network(context, network)
result, mech_context = self._create_network_db(context, network)
return self._after_create_network(context, result, mech_context)
def _after_create_network(self, context, result, mech_context):
kwargs = {'context': context, 'network': result}
registry.notify(resources.NETWORK, events.AFTER_CREATE, self, **kwargs)
try:
@ -915,6 +926,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
" failed"))
self.notifier.network_delete(context, network['id'])
def _before_create_subnet(self, context, subnet):
# TODO(kevinbenton): BEFORE notification should be added here
pass
def _create_subnet_db(self, context, subnet):
with db_api.context_manager.writer.using(context):
result, net_db, ipam_sub = self._create_subnet_precommit(
@ -928,6 +943,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
result, network)
self.mechanism_manager.create_subnet_precommit(mech_context)
# TODO(kevinbenton): move this to '_after_subnet_create'
# db base plugin post commit ops
self._create_subnet_postcommit(context, result, net_db, ipam_sub)
@ -936,7 +952,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def create_subnet(self, context, subnet):
self._before_create_subnet(context, subnet)
result, mech_context = self._create_subnet_db(context, subnet)
return self._after_create_subnet(context, result, mech_context)
def _after_create_subnet(self, context, result, mech_context):
kwargs = {'context': context, 'subnet': result}
registry.notify(resources.SUBNET, events.AFTER_CREATE, self, **kwargs)
try:
@ -1050,7 +1070,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
context, port['id'], resources.PORT,
provisioning_blocks.DHCP_ENTITY)
def _create_port_db(self, context, port):
def _before_create_port(self, context, port):
attrs = port[attributes.PORT]
if not attrs.get('status'):
attrs['status'] = const.PORT_STATUS_DOWN
@ -1060,6 +1080,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# NOTE(kevinbenton): triggered outside of transaction since it
# emits 'AFTER' events if it creates.
self._ensure_default_security_group(context, attrs['tenant_id'])
def _create_port_db(self, context, port):
attrs = port[attributes.PORT]
with db_api.context_manager.writer.using(context):
dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, [])
port_db = self.create_port_db(context, port)
@ -1094,7 +1117,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def create_port(self, context, port):
self._before_create_port(context, port)
result, mech_context = self._create_port_db(context, port)
return self._after_create_port(context, result, mech_context)
def _after_create_port(self, context, result, mech_context):
# notify any plugin that is interested in port create events
kwargs = {'context': context, 'port': result}
registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)
@ -1120,26 +1147,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@db_api.retry_if_session_inactive()
def create_port_bulk(self, context, ports):
objects = self._create_bulk_ml2(attributes.PORT, context, ports)
for obj in objects:
attrs = obj['attributes']
if attrs and attrs.get(portbindings.HOST_ID):
kwargs = {'context': context, 'port': obj['result']}
registry.notify(
resources.PORT, events.AFTER_CREATE, self, **kwargs)
try:
for obj in objects:
obj['bound_context'] = self._bind_port_if_needed(
obj['mech_context'])
return [obj['bound_context'].current for obj in objects]
except ml2_exc.MechanismDriverError:
with excutils.save_and_reraise_exception():
resource_ids = [res['result']['id'] for res in objects]
LOG.error(_LE("_bind_port_if_needed failed. "
"Deleting all ports from create bulk '%s'"),
resource_ids)
self._delete_objects(context, attributes.PORT, objects)
return [obj['result'] for obj in objects]
# TODO(yalei) - will be simplified after security group and address pair be
# converted to ext driver too.

View File

@ -241,6 +241,24 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
self.assertEqual(n['network']['id'],
kwargs['network']['id'])
def test_bulk_network_before_and_after_events_outside_of_txn(self):
# capture session states during each before and after event
before = []
after = []
b_func = lambda *a, **k: before.append(k['context'].session.is_active)
a_func = lambda *a, **k: after.append(k['context'].session.is_active)
registry.subscribe(b_func, resources.NETWORK, events.BEFORE_CREATE)
registry.subscribe(a_func, resources.NETWORK, events.AFTER_CREATE)
data = [{'tenant_id': self._tenant_id}] * 4
self._create_bulk_from_list(
self.fmt, 'network', data, context=context.get_admin_context())
# ensure events captured
self.assertTrue(before)
self.assertTrue(after)
# ensure session was closed for all
self.assertFalse(any(before))
self.assertFalse(any(after))
def _create_and_verify_networks(self, networks):
for net_idx, net in enumerate(networks):
# create
@ -691,6 +709,27 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
self._delete('ports', p['port']['id'])
self.assertFalse(self.tx_open)
def test_bulk_ports_before_and_after_events_outside_of_txn(self):
with self.network() as n:
pass
# capture session states during each before and after event
before = []
after = []
b_func = lambda *a, **k: before.append(k['context'].session.is_active)
a_func = lambda *a, **k: after.append(k['context'].session.is_active)
registry.subscribe(b_func, resources.PORT, events.BEFORE_CREATE)
registry.subscribe(a_func, resources.PORT, events.AFTER_CREATE)
data = [{'tenant_id': self._tenant_id,
'network_id': n['network']['id']}] * 4
self._create_bulk_from_list(
self.fmt, 'port', data, context=context.get_admin_context())
# ensure events captured
self.assertTrue(before)
self.assertTrue(after)
# ensure session was closed for all
self.assertFalse(any(before))
self.assertFalse(any(after))
def test_create_router_port_and_fail_create_postcommit(self):
with mock.patch.object(managers.MechanismManager,