Improve performance of sync under heavy load
This change includes 3 fixes: 1. Sync workers sleep for [0,1) seconds after every sync. 2. Dependent resources are always synced in the same sync cycle. 3. Sync lock is released on failure if at least one successful sync has occurred. Change-Id: I1ab75dcce69b68acf63c24d31a3e106ecc506fb3 changes/48/683248/2
parent
9c1cfc27a5
commit
84a118e718
|
@ -14,6 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
|
||||
from eventlet import event
|
||||
|
@ -130,6 +131,15 @@ class AristaSyncWorker(worker.BaseWorker):
|
|||
'pid': os.getpid()})
|
||||
resource_class = self.get_resource_class(resource.resource_type)
|
||||
resource_class.update_neutron_resource(resource.id, resource.action)
|
||||
for resource_type, resource_id in resource.related_resources:
|
||||
LOG.info("%(pid)s %(action)s requisite %(rtype) with id %(id)s",
|
||||
{'action': resource.action,
|
||||
'rtype': resource_type,
|
||||
'id': resource_id,
|
||||
'pid': os.getpid()})
|
||||
resource_class = self.get_resource_class(resource_type)
|
||||
resource_class.update_neutron_resource(resource_id,
|
||||
resource.action)
|
||||
|
||||
def force_full_sync(self):
|
||||
for resource_type in reversed(self.sync_order):
|
||||
|
@ -213,7 +223,6 @@ class AristaSyncWorker(worker.BaseWorker):
|
|||
if not self._rpc.sync_start():
|
||||
LOG.info("%(pid)s Failed to grab the sync lock",
|
||||
{'pid': os.getpid()})
|
||||
greenthread.sleep(1)
|
||||
return
|
||||
|
||||
for resource in self._resources_to_update:
|
||||
|
@ -250,10 +259,15 @@ class AristaSyncWorker(worker.BaseWorker):
|
|||
except Exception:
|
||||
LOG.exception("%(pid)s Arista Sync failed",
|
||||
{'pid': os.getpid()})
|
||||
# Release sync lock if held and at least one full sync was
|
||||
# successful with the current cvx uuid
|
||||
if (self._rpc.current_sync_name is not None and
|
||||
self._cvx_uuid is not None):
|
||||
self._rpc.sync_end()
|
||||
self._cvx_uuid = None
|
||||
self._synchronizing_uuid = None
|
||||
|
||||
# Yield to avoid starvation
|
||||
greenthread.sleep(0)
|
||||
greenthread.sleep(random.random())
|
||||
|
||||
self.done.send(True)
|
||||
|
|
|
@ -46,10 +46,11 @@ def log_context(function, context):
|
|||
class MechResource(object):
|
||||
"""Container class for passing data to sync worker"""
|
||||
|
||||
def __init__(self, id, resource_type, action):
|
||||
def __init__(self, id, resource_type, action, related_resources=None):
|
||||
self.id = id
|
||||
self.resource_type = resource_type
|
||||
self.action = action
|
||||
self.related_resources = related_resources or list()
|
||||
|
||||
def __str__(self):
|
||||
return "%s %s ID: %s" % (self.action, self.resource_type, self.id)
|
||||
|
@ -81,46 +82,38 @@ class AristaDriver(driver_api.MechanismDriver):
|
|||
def get_workers(self):
|
||||
return [arista_sync.AristaSyncWorker(self.provision_queue)]
|
||||
|
||||
def create_tenant(self, tenant_id):
|
||||
"""Enqueue tenant create"""
|
||||
t_res = MechResource(tenant_id, a_const.TENANT_RESOURCE,
|
||||
a_const.CREATE if tenant_id
|
||||
else a_const.FULL_SYNC)
|
||||
self.provision_queue.put(t_res)
|
||||
|
||||
def delete_tenant_if_removed(self, tenant_id):
|
||||
"""Enqueue tenant delete if it's no longer in the db"""
|
||||
if not db_lib.tenant_provisioned(tenant_id):
|
||||
t_res = MechResource(tenant_id, a_const.TENANT_RESOURCE,
|
||||
a_const.DELETE if tenant_id
|
||||
else a_const.FULL_SYNC)
|
||||
self.provision_queue.put(t_res)
|
||||
|
||||
def create_network(self, network):
|
||||
def create_network(self, network, segments):
|
||||
"""Enqueue network create"""
|
||||
n_res = MechResource(network['id'], a_const.NETWORK_RESOURCE,
|
||||
a_const.CREATE)
|
||||
tenant_id = network['project_id']
|
||||
action = a_const.CREATE if tenant_id else a_const.FULL_SYNC
|
||||
n_res = MechResource(network['id'], a_const.NETWORK_RESOURCE, action)
|
||||
n_res.related_resources.append((a_const.TENANT_RESOURCE, tenant_id))
|
||||
for segment in segments:
|
||||
n_res.related_resources.append(
|
||||
(a_const.SEGMENT_RESOURCE, segment['id']))
|
||||
self.provision_queue.put(n_res)
|
||||
|
||||
def delete_network(self, network):
|
||||
def delete_network(self, network, segments):
|
||||
"""Enqueue network delete"""
|
||||
n_res = MechResource(network['id'], a_const.NETWORK_RESOURCE,
|
||||
a_const.DELETE)
|
||||
tenant_id = network['project_id']
|
||||
action = a_const.DELETE if tenant_id else a_const.FULL_SYNC
|
||||
n_res = MechResource(network['id'], a_const.NETWORK_RESOURCE, action)
|
||||
|
||||
# Delete tenant if this was the last tenant resource
|
||||
if not db_lib.tenant_provisioned(tenant_id):
|
||||
n_res.related_resources.append(
|
||||
(a_const.TENANT_RESOURCE, tenant_id))
|
||||
|
||||
for segment in segments:
|
||||
n_res.related_resources.append(
|
||||
(a_const.SEGMENT_RESOURCE, segment['id']))
|
||||
self.provision_queue.put(n_res)
|
||||
|
||||
def create_segments(self, segments):
|
||||
"""Enqueue segment creates"""
|
||||
for segment in segments:
|
||||
s_res = MechResource(segment['id'], a_const.SEGMENT_RESOURCE,
|
||||
a_const.CREATE)
|
||||
self.provision_queue.put(s_res)
|
||||
|
||||
def delete_segments(self, segments):
|
||||
"""Enqueue segment deletes"""
|
||||
for segment in segments:
|
||||
s_res = MechResource(segment['id'], a_const.SEGMENT_RESOURCE,
|
||||
a_const.DELETE)
|
||||
self.provision_queue.put(s_res)
|
||||
def delete_segment(self, segment):
|
||||
"""Enqueue segment delete"""
|
||||
s_res = MechResource(segment['id'], a_const.SEGMENT_RESOURCE,
|
||||
a_const.DELETE)
|
||||
self.provision_queue.put(s_res)
|
||||
|
||||
def get_instance_type(self, port):
|
||||
"""Determine the port type based on device owner and vnic type"""
|
||||
|
@ -139,43 +132,6 @@ class AristaDriver(driver_api.MechanismDriver):
|
|||
return a_const.VM_RESOURCE
|
||||
return None
|
||||
|
||||
def create_instance(self, port):
|
||||
"""Enqueue instance create"""
|
||||
instance_type = self.get_instance_type(port)
|
||||
if not instance_type:
|
||||
return
|
||||
i_res = MechResource(port['device_id'], instance_type, a_const.CREATE)
|
||||
self.provision_queue.put(i_res)
|
||||
|
||||
def delete_instance_if_removed(self, port):
|
||||
"""Enqueue instance delete if it's no longer in the db"""
|
||||
instance_type = self.get_instance_type(port)
|
||||
if not instance_type:
|
||||
return
|
||||
if not db_lib.instance_provisioned(port['device_id']):
|
||||
i_res = MechResource(port['device_id'], instance_type,
|
||||
a_const.DELETE)
|
||||
self.provision_queue.put(i_res)
|
||||
|
||||
def create_port(self, port):
|
||||
"""Enqueue port create"""
|
||||
instance_type = self.get_instance_type(port)
|
||||
if not instance_type:
|
||||
return
|
||||
port_type = instance_type + a_const.PORT_SUFFIX
|
||||
p_res = MechResource(port['id'], port_type, a_const.CREATE)
|
||||
self.provision_queue.put(p_res)
|
||||
|
||||
def delete_port_if_removed(self, port):
|
||||
"""Enqueue port delete"""
|
||||
instance_type = self.get_instance_type(port)
|
||||
if not instance_type:
|
||||
return
|
||||
port_type = instance_type + a_const.PORT_SUFFIX
|
||||
if not db_lib.port_provisioned(port['id']):
|
||||
p_res = MechResource(port['id'], port_type, a_const.DELETE)
|
||||
self.provision_queue.put(p_res)
|
||||
|
||||
def _get_binding_keys(self, port, host):
|
||||
"""Get binding keys from the port binding"""
|
||||
binding_keys = list()
|
||||
|
@ -192,20 +148,46 @@ class AristaDriver(driver_api.MechanismDriver):
|
|||
|
||||
def create_port_binding(self, port, host):
|
||||
"""Enqueue port binding create"""
|
||||
if not self.get_instance_type(port):
|
||||
tenant_id = port['project_id']
|
||||
instance_type = self.get_instance_type(port)
|
||||
if not instance_type:
|
||||
return
|
||||
port_type = instance_type + a_const.PORT_SUFFIX
|
||||
action = a_const.CREATE if tenant_id else a_const.FULL_SYNC
|
||||
related_resources = list()
|
||||
related_resources.append((a_const.TENANT_RESOURCE, tenant_id))
|
||||
related_resources.append((instance_type, port['device_id']))
|
||||
related_resources.append((port_type, port['id']))
|
||||
for pb_key in self._get_binding_keys(port, host):
|
||||
pb_res = MechResource(pb_key, a_const.PORT_BINDING_RESOURCE,
|
||||
a_const.CREATE)
|
||||
action, related_resources=related_resources)
|
||||
self.provision_queue.put(pb_res)
|
||||
|
||||
def delete_port_binding(self, port, host):
|
||||
"""Enqueue port binding delete"""
|
||||
if not self.get_instance_type(port):
|
||||
tenant_id = port['project_id']
|
||||
instance_type = self.get_instance_type(port)
|
||||
if not instance_type:
|
||||
return
|
||||
port_type = instance_type + a_const.PORT_SUFFIX
|
||||
action = a_const.DELETE if tenant_id else a_const.FULL_SYNC
|
||||
related_resources = list()
|
||||
|
||||
# Delete tenant if this was the last tenant resource
|
||||
if not db_lib.tenant_provisioned(tenant_id):
|
||||
related_resources.append((a_const.TENANT_RESOURCE, tenant_id))
|
||||
|
||||
# Delete instance if this was the last instance port
|
||||
if not db_lib.instance_provisioned(port['device_id']):
|
||||
related_resources.append((instance_type, port['device_id']))
|
||||
|
||||
# Delete port if this was the last port binding
|
||||
if not db_lib.port_provisioned(port['id']):
|
||||
related_resources.append((port_type, port['id']))
|
||||
|
||||
for pb_key in self._get_binding_keys(port, host):
|
||||
pb_res = MechResource(pb_key, a_const.PORT_BINDING_RESOURCE,
|
||||
a_const.DELETE)
|
||||
action, related_resources=related_resources)
|
||||
self.provision_queue.put(pb_res)
|
||||
|
||||
def create_network_postcommit(self, context):
|
||||
|
@ -215,17 +197,13 @@ class AristaDriver(driver_api.MechanismDriver):
|
|||
log_context("create_network_postcommit: network", network)
|
||||
|
||||
segments = context.network_segments
|
||||
tenant_id = network['project_id']
|
||||
self.create_tenant(tenant_id)
|
||||
self.create_network(network)
|
||||
self.create_segments(segments)
|
||||
self.create_network(network, segments)
|
||||
|
||||
def update_network_postcommit(self, context):
|
||||
"""Send network updates to CVX:
|
||||
|
||||
- Update the network name
|
||||
- Add new segments
|
||||
- Delete stale segments
|
||||
"""
|
||||
network = context.current
|
||||
orig_network = context.original
|
||||
|
@ -235,10 +213,8 @@ class AristaDriver(driver_api.MechanismDriver):
|
|||
|
||||
segments = context.network_segments
|
||||
|
||||
self.create_network(network)
|
||||
|
||||
# New segments may have been added
|
||||
self.create_segments(segments)
|
||||
self.create_network(network, segments)
|
||||
|
||||
def delete_network_postcommit(self, context):
|
||||
"""Delete the network from CVX"""
|
||||
|
@ -247,18 +223,7 @@ class AristaDriver(driver_api.MechanismDriver):
|
|||
log_context("delete_network_postcommit: network", network)
|
||||
|
||||
segments = context.network_segments
|
||||
tenant_id = network['project_id']
|
||||
self.delete_segments(segments)
|
||||
self.delete_network(network)
|
||||
self.delete_tenant_if_removed(tenant_id)
|
||||
|
||||
def _delete_port_resources(self, port, host):
|
||||
tenant_id = port['project_id']
|
||||
|
||||
self.delete_port_binding(port, host)
|
||||
self.delete_port_if_removed(port)
|
||||
self.delete_instance_if_removed(port)
|
||||
self.delete_tenant_if_removed(tenant_id)
|
||||
self.delete_network(network, segments)
|
||||
|
||||
def update_port_postcommit(self, context):
|
||||
"""Send port updates to CVX
|
||||
|
@ -273,27 +238,21 @@ class AristaDriver(driver_api.MechanismDriver):
|
|||
log_context("update_port_postcommit: port", port)
|
||||
log_context("update_port_postcommit: orig", orig_port)
|
||||
|
||||
tenant_id = port['project_id']
|
||||
|
||||
# Device id can change without a port going DOWN, but the new device
|
||||
# id may not be supported
|
||||
if orig_port and port['device_id'] != orig_port['device_id']:
|
||||
self._delete_port_resources(orig_port, context.original_host)
|
||||
self.delete_port_binding(orig_port, context.original_host)
|
||||
|
||||
if context.status == n_const.PORT_STATUS_DOWN:
|
||||
if (context.original_host and
|
||||
context.status != context.original_status):
|
||||
self._delete_port_resources(orig_port, context.original_host)
|
||||
self.delete_port_binding(orig_port, context.original_host)
|
||||
self._try_to_release_dynamic_segment(context, migration=True)
|
||||
else:
|
||||
self.create_tenant(tenant_id)
|
||||
self.create_network(network)
|
||||
if context.binding_levels:
|
||||
segments = [
|
||||
level['bound_segment'] for level in context.binding_levels]
|
||||
self.create_segments(segments)
|
||||
self.create_instance(port)
|
||||
self.create_port(port)
|
||||
self.create_network(network, segments)
|
||||
self.create_port_binding(port, context.host)
|
||||
|
||||
def delete_port_postcommit(self, context):
|
||||
|
@ -302,7 +261,7 @@ class AristaDriver(driver_api.MechanismDriver):
|
|||
|
||||
log_context("delete_port_postcommit: port", port)
|
||||
|
||||
self._delete_port_resources(port, context.host)
|
||||
self.delete_port_binding(port, context.host)
|
||||
self._try_to_release_dynamic_segment(context)
|
||||
|
||||
def _bind_baremetal_port(self, context, segment):
|
||||
|
@ -427,7 +386,7 @@ class AristaDriver(driver_api.MechanismDriver):
|
|||
continue
|
||||
if not db_lib.segment_bound(segment_id):
|
||||
context.release_dynamic_segment(segment_id)
|
||||
self.delete_segments([bound_segment])
|
||||
self.delete_segment(bound_segment)
|
||||
LOG.debug("Released dynamic segment %(seg)s allocated "
|
||||
"by %(drv)s", {'seg': segment_id,
|
||||
'drv': allocating_driver})
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
from eventlet import greenthread
|
||||
from eventlet import queue
|
||||
import mock
|
||||
import time
|
||||
|
||||
from neutron_lib.plugins import constants as plugin_constants
|
||||
from neutron_lib.plugins import directory
|
||||
|
@ -25,6 +26,7 @@ from oslo_utils import importutils
|
|||
from neutron.tests.unit import testlib_api
|
||||
|
||||
from networking_arista.common import constants as a_const
|
||||
from networking_arista.common import exceptions as arista_exc
|
||||
from networking_arista.ml2 import arista_sync
|
||||
from networking_arista.ml2.mechanism_arista import MechResource
|
||||
from networking_arista.tests.unit import utils
|
||||
|
@ -137,7 +139,7 @@ class SyncServiceTest(testlib_api.SqlTestCase):
|
|||
|
||||
def test_full_sync_required(self):
|
||||
self.sync_service.initialize()
|
||||
self.sync_service.cvx_uuid = 'old-id'
|
||||
self.sync_service._cvx_uuid = 'old-id'
|
||||
self.sync_service._rpc = mock.MagicMock()
|
||||
self.sync_service._rpc.get_cvx_uuid.return_value = 'new-id'
|
||||
with mock.patch.object(self.sync_service, 'force_full_sync') as ffs:
|
||||
|
@ -171,3 +173,18 @@ class SyncServiceTest(testlib_api.SqlTestCase):
|
|||
resource_type.delete_cvx_resources.assert_not_called()
|
||||
resource_type.create_cvx_resources.assert_not_called()
|
||||
self.sync_service._rpc.sync_end.assert_not_called()
|
||||
|
||||
def test_sync_lock_release_on_failure(self):
|
||||
self.sync_service.initialize()
|
||||
self.sync_service._rpc = mock.MagicMock()
|
||||
self.sync_service.update_neutron_resource = mock.Mock(
|
||||
side_effect=arista_exc.AristaRpcError(msg='fail'))
|
||||
resource = MechResource('tid', a_const.TENANT_RESOURCE, a_const.CREATE)
|
||||
self.sync_service.start()
|
||||
self.sync_service._last_sync_time = time.time()
|
||||
self.sync_service._cvx_uuid = 'cvx-id'
|
||||
self.mech_queue.put(resource)
|
||||
greenthread.sleep(0)
|
||||
self.sync_service.stop()
|
||||
self.sync_service.wait()
|
||||
self.sync_service._rpc.sync_end.assert_called_once()
|
||||
|
|
Loading…
Reference in New Issue