diff --git a/networking_arista/ml2/arista_sync.py b/networking_arista/ml2/arista_sync.py
index 4468c539..0c6bf808 100644
--- a/networking_arista/ml2/arista_sync.py
+++ b/networking_arista/ml2/arista_sync.py
@@ -14,6 +14,7 @@
 # limitations under the License.

 import os
+import random
 import time

 from eventlet import event
@@ -130,6 +131,15 @@ class AristaSyncWorker(worker.BaseWorker):
                   'pid': os.getpid()})
         resource_class = self.get_resource_class(resource.resource_type)
         resource_class.update_neutron_resource(resource.id, resource.action)
+        for resource_type, resource_id in resource.related_resources:
+            LOG.info("%(pid)s %(action)s requisite %(rtype)s with id %(id)s",
+                     {'action': resource.action,
+                      'rtype': resource_type,
+                      'id': resource_id,
+                      'pid': os.getpid()})
+            resource_class = self.get_resource_class(resource_type)
+            resource_class.update_neutron_resource(resource_id,
+                                                   resource.action)

     def force_full_sync(self):
         for resource_type in reversed(self.sync_order):
@@ -213,7 +223,6 @@ class AristaSyncWorker(worker.BaseWorker):
             if not self._rpc.sync_start():
                 LOG.info("%(pid)s Failed to grab the sync lock",
                          {'pid': os.getpid()})
-                greenthread.sleep(1)
                 return

             for resource in self._resources_to_update:
@@ -250,10 +259,15 @@ class AristaSyncWorker(worker.BaseWorker):
             except Exception:
                 LOG.exception("%(pid)s Arista Sync failed",
                               {'pid': os.getpid()})
+                # Release sync lock if held and at least one full sync was
+                # successful with the current cvx uuid
+                if (self._rpc.current_sync_name is not None and
+                        self._cvx_uuid is not None):
+                    self._rpc.sync_end()
                 self._cvx_uuid = None
                 self._synchronizing_uuid = None
             # Yield to avoid starvation
-            greenthread.sleep(0)
+            greenthread.sleep(random.random())

         self.done.send(True)
diff --git a/networking_arista/ml2/mechanism_arista.py b/networking_arista/ml2/mechanism_arista.py
index e9c5b991..952b35ee 100644
--- a/networking_arista/ml2/mechanism_arista.py
+++ b/networking_arista/ml2/mechanism_arista.py
@@ -46,10 +46,11 @@ def log_context(function, context):
 class MechResource(object):
     """Container class for passing data to sync worker"""

-    def __init__(self, id, resource_type, action):
+    def __init__(self, id, resource_type, action, related_resources=None):
         self.id = id
         self.resource_type = resource_type
         self.action = action
+        self.related_resources = related_resources or list()

     def __str__(self):
         return "%s %s ID: %s" % (self.action, self.resource_type, self.id)
@@ -81,46 +82,38 @@ class AristaDriver(driver_api.MechanismDriver):
     def get_workers(self):
         return [arista_sync.AristaSyncWorker(self.provision_queue)]

-    def create_tenant(self, tenant_id):
-        """Enqueue tenant create"""
-        t_res = MechResource(tenant_id, a_const.TENANT_RESOURCE,
-                             a_const.CREATE if tenant_id
-                             else a_const.FULL_SYNC)
-        self.provision_queue.put(t_res)
-
-    def delete_tenant_if_removed(self, tenant_id):
-        """Enqueue tenant delete if it's no longer in the db"""
-        if not db_lib.tenant_provisioned(tenant_id):
-            t_res = MechResource(tenant_id, a_const.TENANT_RESOURCE,
-                                 a_const.DELETE if tenant_id
-                                 else a_const.FULL_SYNC)
-            self.provision_queue.put(t_res)
-
-    def create_network(self, network):
+    def create_network(self, network, segments):
         """Enqueue network create"""
-        n_res = MechResource(network['id'], a_const.NETWORK_RESOURCE,
-                             a_const.CREATE)
+        tenant_id = network['project_id']
+        action = a_const.CREATE if tenant_id else a_const.FULL_SYNC
+        n_res = MechResource(network['id'], a_const.NETWORK_RESOURCE, action)
+        n_res.related_resources.append((a_const.TENANT_RESOURCE, tenant_id))
+        for segment in segments:
+            n_res.related_resources.append(
+                (a_const.SEGMENT_RESOURCE, segment['id']))
         self.provision_queue.put(n_res)

-    def delete_network(self, network):
+    def delete_network(self, network, segments):
         """Enqueue network delete"""
-        n_res = MechResource(network['id'], a_const.NETWORK_RESOURCE,
-                             a_const.DELETE)
+        tenant_id = network['project_id']
+        action = a_const.DELETE if tenant_id else a_const.FULL_SYNC
+        n_res = MechResource(network['id'], a_const.NETWORK_RESOURCE, action)
+
+        # Delete tenant if this was the last tenant resource
+        if not db_lib.tenant_provisioned(tenant_id):
+            n_res.related_resources.append(
+                (a_const.TENANT_RESOURCE, tenant_id))
+
+        for segment in segments:
+            n_res.related_resources.append(
+                (a_const.SEGMENT_RESOURCE, segment['id']))
         self.provision_queue.put(n_res)

-    def create_segments(self, segments):
-        """Enqueue segment creates"""
-        for segment in segments:
-            s_res = MechResource(segment['id'], a_const.SEGMENT_RESOURCE,
-                                 a_const.CREATE)
-            self.provision_queue.put(s_res)
-
-    def delete_segments(self, segments):
-        """Enqueue segment deletes"""
-        for segment in segments:
-            s_res = MechResource(segment['id'], a_const.SEGMENT_RESOURCE,
-                                 a_const.DELETE)
-            self.provision_queue.put(s_res)
+    def delete_segment(self, segment):
+        """Enqueue segment delete"""
+        s_res = MechResource(segment['id'], a_const.SEGMENT_RESOURCE,
+                             a_const.DELETE)
+        self.provision_queue.put(s_res)

     def get_instance_type(self, port):
         """Determine the port type based on device owner and vnic type"""
@@ -139,43 +132,6 @@ class AristaDriver(driver_api.MechanismDriver):
             return a_const.VM_RESOURCE
         return None

-    def create_instance(self, port):
-        """Enqueue instance create"""
-        instance_type = self.get_instance_type(port)
-        if not instance_type:
-            return
-        i_res = MechResource(port['device_id'], instance_type, a_const.CREATE)
-        self.provision_queue.put(i_res)
-
-    def delete_instance_if_removed(self, port):
-        """Enqueue instance delete if it's no longer in the db"""
-        instance_type = self.get_instance_type(port)
-        if not instance_type:
-            return
-        if not db_lib.instance_provisioned(port['device_id']):
-            i_res = MechResource(port['device_id'], instance_type,
-                                 a_const.DELETE)
-            self.provision_queue.put(i_res)
-
-    def create_port(self, port):
-        """Enqueue port create"""
-        instance_type = self.get_instance_type(port)
-        if not instance_type:
-            return
-        port_type = instance_type + a_const.PORT_SUFFIX
-        p_res = MechResource(port['id'], port_type, a_const.CREATE)
-        self.provision_queue.put(p_res)
-
-    def delete_port_if_removed(self, port):
-        """Enqueue port delete"""
-        instance_type = self.get_instance_type(port)
-        if not instance_type:
-            return
-        port_type = instance_type + a_const.PORT_SUFFIX
-        if not db_lib.port_provisioned(port['id']):
-            p_res = MechResource(port['id'], port_type, a_const.DELETE)
-            self.provision_queue.put(p_res)
-
     def _get_binding_keys(self, port, host):
         """Get binding keys from the port binding"""
         binding_keys = list()
@@ -192,20 +148,46 @@ class AristaDriver(driver_api.MechanismDriver):

     def create_port_binding(self, port, host):
         """Enqueue port binding create"""
-        if not self.get_instance_type(port):
+        tenant_id = port['project_id']
+        instance_type = self.get_instance_type(port)
+        if not instance_type:
             return
+        port_type = instance_type + a_const.PORT_SUFFIX
+        action = a_const.CREATE if tenant_id else a_const.FULL_SYNC
+        related_resources = list()
+        related_resources.append((a_const.TENANT_RESOURCE, tenant_id))
+        related_resources.append((instance_type, port['device_id']))
+        related_resources.append((port_type, port['id']))
         for pb_key in self._get_binding_keys(port, host):
             pb_res = MechResource(pb_key, a_const.PORT_BINDING_RESOURCE,
-                                  a_const.CREATE)
+                                  action, related_resources=related_resources)
             self.provision_queue.put(pb_res)

     def delete_port_binding(self, port, host):
         """Enqueue port binding delete"""
-        if not self.get_instance_type(port):
+        tenant_id = port['project_id']
+        instance_type = self.get_instance_type(port)
+        if not instance_type:
             return
+        port_type = instance_type + a_const.PORT_SUFFIX
+        action = a_const.DELETE if tenant_id else a_const.FULL_SYNC
+        related_resources = list()
+
+        # Delete tenant if this was the last tenant resource
+        if not db_lib.tenant_provisioned(tenant_id):
+            related_resources.append((a_const.TENANT_RESOURCE, tenant_id))
+
+        # Delete instance if this was the last instance port
+        if not db_lib.instance_provisioned(port['device_id']):
+            related_resources.append((instance_type, port['device_id']))
+
+        # Delete port if this was the last port binding
+        if not db_lib.port_provisioned(port['id']):
+            related_resources.append((port_type, port['id']))
+
         for pb_key in self._get_binding_keys(port, host):
             pb_res = MechResource(pb_key, a_const.PORT_BINDING_RESOURCE,
-                                  a_const.DELETE)
+                                  action, related_resources=related_resources)
             self.provision_queue.put(pb_res)

     def create_network_postcommit(self, context):
@@ -215,17 +197,13 @@
         log_context("create_network_postcommit: network", network)

         segments = context.network_segments
-        tenant_id = network['project_id']
-        self.create_tenant(tenant_id)
-        self.create_network(network)
-        self.create_segments(segments)
+        self.create_network(network, segments)

     def update_network_postcommit(self, context):
         """Send network updates to CVX:

         - Update the network name
         - Add new segments
-        - Delete stale segments
         """
         network = context.current
         orig_network = context.original
@@ -235,10 +213,8 @@

         segments = context.network_segments

-        self.create_network(network)
-
         # New segments may have been added
-        self.create_segments(segments)
+        self.create_network(network, segments)

     def delete_network_postcommit(self, context):
         """Delete the network from CVX"""
@@ -247,18 +223,7 @@
         log_context("delete_network_postcommit: network", network)

         segments = context.network_segments
-        tenant_id = network['project_id']
-        self.delete_segments(segments)
-        self.delete_network(network)
-        self.delete_tenant_if_removed(tenant_id)
-
-    def _delete_port_resources(self, port, host):
-        tenant_id = port['project_id']
-
-        self.delete_port_binding(port, host)
-        self.delete_port_if_removed(port)
-        self.delete_instance_if_removed(port)
-        self.delete_tenant_if_removed(tenant_id)
+        self.delete_network(network, segments)

     def update_port_postcommit(self, context):
         """Send port updates to CVX
@@ -273,27 +238,21 @@
         log_context("update_port_postcommit: port", port)
         log_context("update_port_postcommit: orig", orig_port)

-        tenant_id = port['project_id']
-
         # Device id can change without a port going DOWN, but the new device
         # id may not be supported
         if orig_port and port['device_id'] != orig_port['device_id']:
-            self._delete_port_resources(orig_port, context.original_host)
+            self.delete_port_binding(orig_port, context.original_host)

         if context.status == n_const.PORT_STATUS_DOWN:
             if (context.original_host and
                     context.status != context.original_status):
-                self._delete_port_resources(orig_port, context.original_host)
+                self.delete_port_binding(orig_port, context.original_host)
                 self._try_to_release_dynamic_segment(context, migration=True)
         else:
-            self.create_tenant(tenant_id)
-            self.create_network(network)
             if context.binding_levels:
                 segments = [
                     level['bound_segment']
                     for level in context.binding_levels]
-                self.create_segments(segments)
-            self.create_instance(port)
-            self.create_port(port)
+                self.create_network(network, segments)
             self.create_port_binding(port, context.host)

     def delete_port_postcommit(self, context):
@@ -302,7 +261,7 @@
         """Delete the port from CVX"""
         port = context.current

         log_context("delete_port_postcommit: port", port)

-        self._delete_port_resources(port, context.host)
+        self.delete_port_binding(port, context.host)
         self._try_to_release_dynamic_segment(context)

     def _bind_baremetal_port(self, context, segment):
@@ -427,7 +386,7 @@
                 continue
             if not db_lib.segment_bound(segment_id):
                 context.release_dynamic_segment(segment_id)
-                self.delete_segments([bound_segment])
+                self.delete_segment(bound_segment)
                 LOG.debug("Released dynamic segment %(seg)s allocated "
                           "by %(drv)s", {'seg': segment_id,
                                          'drv': allocating_driver})
diff --git a/networking_arista/tests/unit/ml2/test_arista_sync.py b/networking_arista/tests/unit/ml2/test_arista_sync.py
index 3f6b9d28..e9cbbe0c 100644
--- a/networking_arista/tests/unit/ml2/test_arista_sync.py
+++ b/networking_arista/tests/unit/ml2/test_arista_sync.py
@@ -16,6 +16,7 @@
 from eventlet import greenthread
 from eventlet import queue
 import mock
+import time

 from neutron_lib.plugins import constants as plugin_constants
 from neutron_lib.plugins import directory
@@ -25,6 +26,7 @@
 from oslo_utils import importutils

 from neutron.tests.unit import testlib_api

 from networking_arista.common import constants as a_const
+from networking_arista.common import exceptions as arista_exc
 from networking_arista.ml2 import arista_sync
 from networking_arista.ml2.mechanism_arista import MechResource
 from networking_arista.tests.unit import utils
@@ -137,7 +139,7 @@ class SyncServiceTest(testlib_api.SqlTestCase):

     def test_full_sync_required(self):
         self.sync_service.initialize()
-        self.sync_service.cvx_uuid = 'old-id'
+        self.sync_service._cvx_uuid = 'old-id'
         self.sync_service._rpc = mock.MagicMock()
         self.sync_service._rpc.get_cvx_uuid.return_value = 'new-id'
         with mock.patch.object(self.sync_service, 'force_full_sync') as ffs:
@@ -171,3 +173,18 @@ class SyncServiceTest(testlib_api.SqlTestCase):
         resource_type.delete_cvx_resources.assert_not_called()
         resource_type.create_cvx_resources.assert_not_called()
         self.sync_service._rpc.sync_end.assert_not_called()
+
+    def test_sync_lock_release_on_failure(self):
+        self.sync_service.initialize()
+        self.sync_service._rpc = mock.MagicMock()
+        self.sync_service.update_neutron_resource = mock.Mock(
+            side_effect=arista_exc.AristaRpcError(msg='fail'))
+        resource = MechResource('tid', a_const.TENANT_RESOURCE, a_const.CREATE)
+        self.sync_service.start()
+        self.sync_service._last_sync_time = time.time()
+        self.sync_service._cvx_uuid = 'cvx-id'
+        self.mech_queue.put(resource)
+        greenthread.sleep(0)
+        self.sync_service.stop()
+        self.sync_service.wait()
+        self.sync_service._rpc.sync_end.assert_called_once()
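A minimal, self-contained sketch of the flow this patch sets up, for tracing it outside the tree: the mechanism driver enqueues one MechResource that carries its requisite resources, and the sync worker handles the primary resource first and then every related (resource_type, id) pair with the same action. The stub handler class, the process() helper, and the literal resource-type strings below are illustrative placeholders, not networking_arista code.

```python
# Illustrative sketch only -- stand-ins for the networking_arista classes.
class MechResource(object):
    """Mirrors the container class from mechanism_arista above."""

    def __init__(self, id, resource_type, action, related_resources=None):
        self.id = id
        self.resource_type = resource_type
        self.action = action
        self.related_resources = related_resources or list()


class StubResourceHandler(object):
    """Placeholder for a CVX resource handler (hypothetical)."""

    def update_neutron_resource(self, id, action):
        print('%s %s' % (action, id))


def process(resource, get_resource_class):
    """Mirror of AristaSyncWorker.update_neutron_resource after this patch."""
    # Handle the primary resource first ...
    handler = get_resource_class(resource.resource_type)
    handler.update_neutron_resource(resource.id, resource.action)
    # ... then every requisite resource with the same action.
    for resource_type, resource_id in resource.related_resources:
        get_resource_class(resource_type).update_neutron_resource(
            resource_id, resource.action)


# Example: a network create carries its tenant and segments as requisites.
net = MechResource('net-1', 'network', 'create',
                   related_resources=[('tenant', 't-1'), ('segment', 's-1')])
process(net, lambda rtype: StubResourceHandler())
```

Under this model the former create_tenant/create_segments/create_instance style helper calls collapse into the related_resources list, which is why the driver methods in the patch now build that list themselves before enqueueing a single resource.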