Browse Source

Improve performance of sync under heavy load

This change includes 3 fixes:
1. Sync workers sleep for [0,1) seconds after every sync
2. Dependent resources are always synced in the same sync cycle
3. Sync lock is released on failure if at least one successful sync has occurred

Change-Id: I1ab75dcce69b68acf63c24d31a3e106ecc506fb3
(cherry picked from commit 84a118e718)
changes/76/683476/1 2019.1.1
Mitchell Jameson 2 years ago
committed by Nader Lahouti
parent
commit
ca4c18c3ba
  1. 18
      networking_arista/ml2/arista_sync.py
  2. 171
      networking_arista/ml2/mechanism_arista.py
  3. 19
      networking_arista/tests/unit/ml2/test_arista_sync.py

18
networking_arista/ml2/arista_sync.py

@ -14,6 +14,7 @@
# limitations under the License.
import os
import random
import time
from eventlet import event
@ -130,6 +131,15 @@ class AristaSyncWorker(worker.BaseWorker):
'pid': os.getpid()})
resource_class = self.get_resource_class(resource.resource_type)
resource_class.update_neutron_resource(resource.id, resource.action)
# Sync every dependent resource in the same cycle as the primary
# resource so CVX never sees a partially-applied change.
for resource_type, resource_id in resource.related_resources:
    # BUG FIX: the original format string used "%(rtype)" with no
    # conversion character, which raises ValueError when logged.
    LOG.info("%(pid)s %(action)s requisite %(rtype)s with id %(id)s",
             {'action': resource.action,
              'rtype': resource_type,
              'id': resource_id,
              'pid': os.getpid()})
    resource_class = self.get_resource_class(resource_type)
    resource_class.update_neutron_resource(resource_id,
                                           resource.action)
def force_full_sync(self):
for resource_type in reversed(self.sync_order):
@ -213,7 +223,6 @@ class AristaSyncWorker(worker.BaseWorker):
if not self._rpc.sync_start():
LOG.info("%(pid)s Failed to grab the sync lock",
{'pid': os.getpid()})
greenthread.sleep(1)
return
for resource in self._resources_to_update:
@ -250,10 +259,15 @@ class AristaSyncWorker(worker.BaseWorker):
except Exception:
LOG.exception("%(pid)s Arista Sync failed",
{'pid': os.getpid()})
# Release sync lock if held and at least one full sync was
# successful with the current cvx uuid
if (self._rpc.current_sync_name is not None and
self._cvx_uuid is not None):
self._rpc.sync_end()
self._cvx_uuid = None
self._synchronizing_uuid = None
# Yield to avoid starvation
greenthread.sleep(0)
greenthread.sleep(random.random())
self.done.send(True)

171
networking_arista/ml2/mechanism_arista.py

@ -46,10 +46,11 @@ def log_context(function, context):
class MechResource(object):
    """Container class for passing data to the sync worker.

    Carries one resource change (CREATE/DELETE/FULL_SYNC) from the
    mechanism driver to the sync worker, optionally together with
    dependent resources that must be synced in the same sync cycle.
    """

    def __init__(self, id, resource_type, action, related_resources=None):
        self.id = id
        self.resource_type = resource_type
        self.action = action
        # List of (resource_type, resource_id) tuples to sync in the
        # same cycle. Built per-instance to avoid the shared mutable
        # default-argument pitfall.
        self.related_resources = related_resources or list()

    def __str__(self):
        return "%s %s ID: %s" % (self.action, self.resource_type, self.id)
@ -79,46 +80,38 @@ class AristaDriver(driver_api.MechanismDriver):
def get_workers(self):
return [arista_sync.AristaSyncWorker(self.provision_queue)]
def create_tenant(self, tenant_id):
"""Enqueue tenant create"""
t_res = MechResource(tenant_id, a_const.TENANT_RESOURCE,
a_const.CREATE if tenant_id
else a_const.FULL_SYNC)
self.provision_queue.put(t_res)
def delete_tenant_if_removed(self, tenant_id):
"""Enqueue tenant delete if it's no longer in the db"""
if not db_lib.tenant_provisioned(tenant_id):
t_res = MechResource(tenant_id, a_const.TENANT_RESOURCE,
a_const.DELETE if tenant_id
else a_const.FULL_SYNC)
self.provision_queue.put(t_res)
def create_network(self, network, segments):
    """Enqueue a network create together with its dependent resources.

    :param network: neutron network dict (must contain 'id' and
        'project_id')
    :param segments: iterable of segment dicts bound to the network

    The tenant and all segments are attached as related resources so
    the sync worker provisions them in the same sync cycle as the
    network. If the network has no project_id we cannot build the
    dependency chain, so fall back to a full sync.
    """
    tenant_id = network['project_id']
    action = a_const.CREATE if tenant_id else a_const.FULL_SYNC
    n_res = MechResource(network['id'], a_const.NETWORK_RESOURCE, action)
    n_res.related_resources.append((a_const.TENANT_RESOURCE, tenant_id))
    for segment in segments:
        n_res.related_resources.append(
            (a_const.SEGMENT_RESOURCE, segment['id']))
    self.provision_queue.put(n_res)
def delete_network(self, network, segments):
    """Enqueue a network delete together with its dependent resources.

    :param network: neutron network dict (must contain 'id' and
        'project_id')
    :param segments: iterable of segment dicts bound to the network

    Segments are always deleted with the network; the tenant is only
    deleted when this network was its last provisioned resource. If
    the network has no project_id, fall back to a full sync.
    """
    tenant_id = network['project_id']
    action = a_const.DELETE if tenant_id else a_const.FULL_SYNC
    n_res = MechResource(network['id'], a_const.NETWORK_RESOURCE, action)
    # Delete tenant if this was the last tenant resource
    if not db_lib.tenant_provisioned(tenant_id):
        n_res.related_resources.append(
            (a_const.TENANT_RESOURCE, tenant_id))
    for segment in segments:
        n_res.related_resources.append(
            (a_const.SEGMENT_RESOURCE, segment['id']))
    self.provision_queue.put(n_res)
def delete_segment(self, segment):
    """Enqueue segment delete"""
    # Queue a standalone DELETE for one segment; the sync worker will
    # remove it from CVX on its next cycle.
    s_res = MechResource(segment['id'], a_const.SEGMENT_RESOURCE,
                         a_const.DELETE)
    self.provision_queue.put(s_res)
def get_instance_type(self, port):
"""Determine the port type based on device owner and vnic type"""
@ -137,43 +130,6 @@ class AristaDriver(driver_api.MechanismDriver):
return a_const.VM_RESOURCE
return None
def create_instance(self, port):
"""Enqueue instance create"""
instance_type = self.get_instance_type(port)
if not instance_type:
return
i_res = MechResource(port['device_id'], instance_type, a_const.CREATE)
self.provision_queue.put(i_res)
def delete_instance_if_removed(self, port):
"""Enqueue instance delete if it's no longer in the db"""
instance_type = self.get_instance_type(port)
if not instance_type:
return
if not db_lib.instance_provisioned(port['device_id']):
i_res = MechResource(port['device_id'], instance_type,
a_const.DELETE)
self.provision_queue.put(i_res)
def create_port(self, port):
"""Enqueue port create"""
instance_type = self.get_instance_type(port)
if not instance_type:
return
port_type = instance_type + a_const.PORT_SUFFIX
p_res = MechResource(port['id'], port_type, a_const.CREATE)
self.provision_queue.put(p_res)
def delete_port_if_removed(self, port):
"""Enqueue port delete"""
instance_type = self.get_instance_type(port)
if not instance_type:
return
port_type = instance_type + a_const.PORT_SUFFIX
if not db_lib.port_provisioned(port['id']):
p_res = MechResource(port['id'], port_type, a_const.DELETE)
self.provision_queue.put(p_res)
def _get_binding_keys(self, port, host):
"""Get binding keys from the port binding"""
binding_keys = list()
@ -190,20 +146,46 @@ class AristaDriver(driver_api.MechanismDriver):
def create_port_binding(self, port, host):
    """Enqueue port binding create(s) with their dependent resources.

    :param port: neutron port dict
    :param host: host the port is being bound to

    One MechResource is queued per binding key. The tenant, instance
    and port are attached as related resources so they are synced in
    the same cycle as the binding. Ports whose device owner/vnic type
    is unsupported are ignored. If the port has no project_id, fall
    back to a full sync.
    """
    tenant_id = port['project_id']
    instance_type = self.get_instance_type(port)
    if not instance_type:
        return
    port_type = instance_type + a_const.PORT_SUFFIX
    action = a_const.CREATE if tenant_id else a_const.FULL_SYNC
    related_resources = list()
    related_resources.append((a_const.TENANT_RESOURCE, tenant_id))
    related_resources.append((instance_type, port['device_id']))
    related_resources.append((port_type, port['id']))
    for pb_key in self._get_binding_keys(port, host):
        pb_res = MechResource(pb_key, a_const.PORT_BINDING_RESOURCE,
                              action, related_resources=related_resources)
        self.provision_queue.put(pb_res)
def delete_port_binding(self, port, host):
    """Enqueue port binding delete(s) with their dependent resources.

    :param port: neutron port dict
    :param host: host the port was bound to

    One MechResource is queued per binding key. The tenant, instance
    and port are each attached as related resources only when the db
    shows this binding was their last remaining use, so they are torn
    down in the same sync cycle. Unsupported ports are ignored. If the
    port has no project_id, fall back to a full sync.
    """
    tenant_id = port['project_id']
    instance_type = self.get_instance_type(port)
    if not instance_type:
        return
    port_type = instance_type + a_const.PORT_SUFFIX
    action = a_const.DELETE if tenant_id else a_const.FULL_SYNC
    related_resources = list()
    # Delete tenant if this was the last tenant resource
    if not db_lib.tenant_provisioned(tenant_id):
        related_resources.append((a_const.TENANT_RESOURCE, tenant_id))
    # Delete instance if this was the last instance port
    if not db_lib.instance_provisioned(port['device_id']):
        related_resources.append((instance_type, port['device_id']))
    # Delete port if this was the last port binding
    if not db_lib.port_provisioned(port['id']):
        related_resources.append((port_type, port['id']))
    for pb_key in self._get_binding_keys(port, host):
        pb_res = MechResource(pb_key, a_const.PORT_BINDING_RESOURCE,
                              action, related_resources=related_resources)
        self.provision_queue.put(pb_res)
def create_network_postcommit(self, context):
@ -213,17 +195,13 @@ class AristaDriver(driver_api.MechanismDriver):
log_context("create_network_postcommit: network", network)
segments = context.network_segments
tenant_id = network['project_id']
self.create_tenant(tenant_id)
self.create_network(network)
self.create_segments(segments)
self.create_network(network, segments)
def update_network_postcommit(self, context):
"""Send network updates to CVX:
- Update the network name
- Add new segments
- Delete stale segments
"""
network = context.current
orig_network = context.original
@ -233,10 +211,8 @@ class AristaDriver(driver_api.MechanismDriver):
segments = context.network_segments
self.create_network(network)
# New segments may have been added
self.create_segments(segments)
self.create_network(network, segments)
def delete_network_postcommit(self, context):
"""Delete the network from CVX"""
@ -245,18 +221,7 @@ class AristaDriver(driver_api.MechanismDriver):
log_context("delete_network_postcommit: network", network)
segments = context.network_segments
tenant_id = network['project_id']
self.delete_segments(segments)
self.delete_network(network)
self.delete_tenant_if_removed(tenant_id)
def _delete_port_resources(self, port, host):
tenant_id = port['project_id']
self.delete_port_binding(port, host)
self.delete_port_if_removed(port)
self.delete_instance_if_removed(port)
self.delete_tenant_if_removed(tenant_id)
self.delete_network(network, segments)
def update_port_postcommit(self, context):
"""Send port updates to CVX
@ -271,27 +236,21 @@ class AristaDriver(driver_api.MechanismDriver):
log_context("update_port_postcommit: port", port)
log_context("update_port_postcommit: orig", orig_port)
tenant_id = port['project_id']
# Device id can change without a port going DOWN, but the new device
# id may not be supported
if orig_port and port['device_id'] != orig_port['device_id']:
self._delete_port_resources(orig_port, context.original_host)
self.delete_port_binding(orig_port, context.original_host)
if context.status == n_const.PORT_STATUS_DOWN:
if (context.original_host and
context.status != context.original_status):
self._delete_port_resources(orig_port, context.original_host)
self.delete_port_binding(orig_port, context.original_host)
self._try_to_release_dynamic_segment(context, migration=True)
else:
self.create_tenant(tenant_id)
self.create_network(network)
if context.binding_levels:
segments = [
level['bound_segment'] for level in context.binding_levels]
self.create_segments(segments)
self.create_instance(port)
self.create_port(port)
self.create_network(network, segments)
self.create_port_binding(port, context.host)
def delete_port_postcommit(self, context):
@ -300,7 +259,7 @@ class AristaDriver(driver_api.MechanismDriver):
log_context("delete_port_postcommit: port", port)
self._delete_port_resources(port, context.host)
self.delete_port_binding(port, context.host)
self._try_to_release_dynamic_segment(context)
def _bind_baremetal_port(self, context, segment):
@ -425,7 +384,7 @@ class AristaDriver(driver_api.MechanismDriver):
continue
if not db_lib.segment_bound(segment_id):
context.release_dynamic_segment(segment_id)
self.delete_segments([bound_segment])
self.delete_segment(bound_segment)
LOG.debug("Released dynamic segment %(seg)s allocated "
"by %(drv)s", {'seg': segment_id,
'drv': allocating_driver})

19
networking_arista/tests/unit/ml2/test_arista_sync.py

@ -16,6 +16,7 @@
from eventlet import greenthread
from eventlet import queue
import mock
import time
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
@ -25,6 +26,7 @@ from oslo_utils import importutils
from neutron.tests.unit import testlib_api
from networking_arista.common import constants as a_const
from networking_arista.common import exceptions as arista_exc
from networking_arista.ml2 import arista_sync
from networking_arista.ml2.mechanism_arista import MechResource
from networking_arista.tests.unit import utils
@ -137,7 +139,7 @@ class SyncServiceTest(testlib_api.SqlTestCase):
def test_full_sync_required(self):
self.sync_service.initialize()
self.sync_service.cvx_uuid = 'old-id'
self.sync_service._cvx_uuid = 'old-id'
self.sync_service._rpc = mock.MagicMock()
self.sync_service._rpc.get_cvx_uuid.return_value = 'new-id'
with mock.patch.object(self.sync_service, 'force_full_sync') as ffs:
@ -171,3 +173,18 @@ class SyncServiceTest(testlib_api.SqlTestCase):
resource_type.delete_cvx_resources.assert_not_called()
resource_type.create_cvx_resources.assert_not_called()
self.sync_service._rpc.sync_end.assert_not_called()
def test_sync_lock_release_on_failure(self):
    """Sync lock is released when a sync fails after a prior success.

    Covers fix #3 of this change: on failure, sync_end() must be
    called if at least one full sync already succeeded with the
    current CVX uuid.
    """
    self.sync_service.initialize()
    self.sync_service._rpc = mock.MagicMock()
    # Force the per-resource update to fail so the worker takes its
    # exception path.
    self.sync_service.update_neutron_resource = mock.Mock(
        side_effect=arista_exc.AristaRpcError(msg='fail'))
    resource = MechResource('tid', a_const.TENANT_RESOURCE, a_const.CREATE)
    self.sync_service.start()
    # Pretend a full sync just completed with this CVX uuid; the
    # failure handler only releases the lock in that state.
    self.sync_service._last_sync_time = time.time()
    self.sync_service._cvx_uuid = 'cvx-id'
    self.mech_queue.put(resource)
    # Yield so the worker greenthread consumes the queued resource.
    greenthread.sleep(0)
    self.sync_service.stop()
    self.sync_service.wait()
    self.sync_service._rpc.sync_end.assert_called_once()

Loading…
Cancel
Save