# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import random
import time

from eventlet import event
from eventlet import greenthread
from six.moves.queue import Empty

from neutron_lib import worker
from oslo_config import cfg
from oslo_log import log as logging

from networking_arista.common import constants as a_const
from networking_arista.ml2 import arista_resources as resources
from networking_arista.ml2.rpc.arista_json import AristaRPCWrapperJSON

LOG = logging.getLogger(__name__)


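# Usage sketch (an assumption for illustration; not defined in this
# module): the ML2 mechanism driver is expected to share its provision
# queue with this worker and return the worker from its get_workers()
# hook, e.g.:
#
#     def get_workers(self):
#         return [AristaSyncWorker(self.provision_queue)]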
class AristaSyncWorker(worker.BaseWorker):
    def __init__(self, provision_queue):
        super(AristaSyncWorker, self).__init__(worker_process_count=1)
        self._rpc = AristaRPCWrapperJSON()
        self.provision_queue = provision_queue
        self._thread = None
        self._running = False
        self.done = event.Event()
        self._sync_interval = cfg.CONF.ml2_arista.sync_interval

    def initialize(self):
        self._last_sync_time = 0
        self._cvx_uuid = None
        self._synchronizing_uuid = None
        self._in_full_sync = False
        self._resources_to_update = list()

        self.tenants = resources.Tenants(self._rpc)
        self.networks = resources.Networks(self._rpc)
        self.segments = resources.Segments(self._rpc)
        self.dhcps = resources.Dhcps(self._rpc)
        self.routers = resources.Routers(self._rpc)
        self.vms = resources.Vms(self._rpc)
        self.baremetals = resources.Baremetals(self._rpc)
        self.dhcp_ports = resources.DhcpPorts(self._rpc)
        self.router_ports = resources.RouterPorts(self._rpc)
        self.vm_ports = resources.VmPorts(self._rpc)
        self.baremetal_ports = resources.BaremetalPorts(self._rpc)
        self.port_bindings = resources.PortBindings(self._rpc)

        # Sync order is important because of entity dependencies:
        # PortBinding -> Port -> Instance -> Tenant
        #                     -> Segment -> Network -> Tenant
        self.sync_order = [self.tenants,
                           self.networks,
                           self.segments,
                           self.dhcps,
                           self.routers,
                           self.vms,
                           self.baremetals,
                           self.dhcp_ports,
                           self.router_ports,
                           self.vm_ports,
                           self.baremetal_ports,
                           self.port_bindings]

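    # Linked to the sync greenthread in start(); clears the thread state
    # once sync_loop exits, whether it returned normally or was killed.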
    def _on_done(self, gt, *args, **kwargs):
        self._thread = None
        self._running = False

    def start(self):
        if self._thread is not None:
            LOG.warning('Arista sync loop has already been started')
            return

        LOG.info("Arista sync worker started")
        super(AristaSyncWorker, self).start()
        self.initialize()
        self._running = True
        LOG.info("Spawning Arista sync loop")
        self._thread = greenthread.spawn(self.sync_loop)
        self._thread.link(self._on_done)

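    # A graceful stop clears _running so that sync_loop exits on its next
    # iteration; a non-graceful stop kills the greenthread immediately.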
    def stop(self, graceful=True):
        if graceful:
            self._running = False
        else:
            self._thread.kill()

    def wait(self):
        return self.done.wait()

    def reset(self):
        self.stop()
        self.wait()
        self.start()

    def get_resource_class(self, resource_type):
        class_map = {a_const.TENANT_RESOURCE: self.tenants,
                     a_const.NETWORK_RESOURCE: self.networks,
                     a_const.SEGMENT_RESOURCE: self.segments,
                     a_const.DHCP_RESOURCE: self.dhcps,
                     a_const.ROUTER_RESOURCE: self.routers,
                     a_const.VM_RESOURCE: self.vms,
                     a_const.BAREMETAL_RESOURCE: self.baremetals,
                     a_const.DHCP_PORT_RESOURCE: self.dhcp_ports,
                     a_const.ROUTER_PORT_RESOURCE: self.router_ports,
                     a_const.VM_PORT_RESOURCE: self.vm_ports,
                     a_const.BAREMETAL_PORT_RESOURCE: self.baremetal_ports,
                     a_const.PORT_BINDING_RESOURCE: self.port_bindings}
        return class_map[resource_type]

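    # Applies a mech driver update to the local view of a resource, then
    # applies the same action to each related resource the update reports
    # (e.g. a port update may also name the instance it belongs to).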
    def update_neutron_resource(self, resource):
        LOG.debug("%(pid)s %(action)s %(rtype)s with id %(id)s",
                  {'action': resource.action,
                   'rtype': resource.resource_type,
                   'id': resource.id,
                   'pid': os.getpid()})
        resource_class = self.get_resource_class(resource.resource_type)
        resource_class.update_neutron_resource(resource.id, resource.action)
        for resource_type, resource_id in resource.related_resources:
            LOG.debug("%(pid)s %(action)s requisite %(rtype)s with id %(id)s",
                      {'action': resource.action,
                       'rtype': resource_type,
                       'id': resource_id,
                       'pid': os.getpid()})
            resource_class = self.get_resource_class(resource_type)
            resource_class.update_neutron_resource(resource_id,
                                                   resource.action)

    def force_full_sync(self):
        self._in_full_sync = True
        for resource_type in reversed(self.sync_order):
            resource_type.clear_all_data()

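    # A changed CVX uuid (e.g. after a CVX failover or reboot) means CVX
    # state can no longer be assumed to match the local view, so a full
    # sync is initiated against the new uuid.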
    def check_if_out_of_sync(self):
        cvx_uuid = self._rpc.get_cvx_uuid()
        out_of_sync = False
        if self._cvx_uuid != cvx_uuid:
            LOG.info("%(pid)s Initiating full sync - local uuid %(l_uuid)s"
                     " - cvx uuid %(c_uuid)s",
                     {'l_uuid': self._cvx_uuid,
                      'c_uuid': cvx_uuid,
                      'pid': os.getpid()})
            self.force_full_sync()
            self._synchronizing_uuid = cvx_uuid
            out_of_sync = True
        self._last_sync_time = time.time()
        return out_of_sync

    def wait_for_mech_driver_update(self, timeout):
        try:
            resource = self.provision_queue.get(timeout=timeout)
            LOG.info("%(pid)s Processing %(res)s", {'res': resource,
                                                    'pid': os.getpid()})
            self._resources_to_update.append(resource)
        except Empty:
            pass
        return len(self._resources_to_update) > 0

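    # Spends the remainder of the sync interval waiting on mech driver
    # updates; once the interval has elapsed, checks whether a full sync
    # with CVX is required instead.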
    def wait_for_sync_required(self):
        timeout = (self._sync_interval -
                   (time.time() - self._last_sync_time))
        LOG.info("%(pid)s Arista Sync time %(time)s last sync %(last_sync)s "
                 "timeout %(timeout)s", {'time': time.time(),
                                         'last_sync': self._last_sync_time,
                                         'timeout': timeout,
                                         'pid': os.getpid()})
        if timeout < 0:
            return self.check_if_out_of_sync()
        else:
            return self.wait_for_mech_driver_update(timeout)

    def synchronize_resources(self):
        """Synchronize worker with CVX

        All database queries must occur while the sync lock is held. This
        tightly couples reads with writes and ensures that an older read
        does not result in the last write. Eg:

        Worker 1 reads (P1 created)
        Worker 2 reads (P1 deleted)
        Worker 2 writes (Delete P1 from CVX)
        Worker 1 writes (Create P1 on CVX)

        By ensuring that all reads occur with the sync lock held, we ensure
        that Worker 1 completes its writes before Worker 2 is allowed to
        read. A failure to write results in a full resync and purges all
        reads from memory.

        It is also important that we compute resources to sync in reverse
        sync order, to avoid missing dependencies on creation. Eg:

        If we query in sync order:
        1. Query Instances -> I1 isn't there
        2. Query Port table -> Port P1 is there, connected to I1
        3. We send P1 to CVX without sending I1 -> Error raised

        But if we query P1 first:
        1. Query Ports -> P1 is not there
        2. Query Instances -> find I1
        3. We create I1, not P1 -> harmless, mech driver creates P1

        Missing dependencies on deletion will helpfully result in the
        dependent resource not being created:
        1. Query Ports -> P1 is found
        2. Query Instances -> I1 not found
        3. Creating P1 fails on CVX
        """
        # Grab the sync lock
        if not self._rpc.sync_start():
            LOG.info("%(pid)s Failed to grab the sync lock",
                     {'pid': os.getpid()})
            return

        for resource in self._resources_to_update:
            self.update_neutron_resource(resource)
        self._resources_to_update = list()

        # Sync any necessary resources.
        # We delete in reverse order and create in order to ensure that
        # dependent resources are deleted before the resources they depend
        # on and created after them
        for resource_type in reversed(self.sync_order):
            resource_type.delete_cvx_resources()
        for resource_type in self.sync_order:
            resource_type.create_cvx_resources()

        # Release the sync lock
        self._rpc.sync_end()

        # Update local uuid if this was a full sync
        if self._in_full_sync:
            LOG.info("%(pid)s Full sync for cvx uuid %(uuid)s complete",
                     {'uuid': self._synchronizing_uuid,
                      'pid': os.getpid()})
            self._cvx_uuid = self._synchronizing_uuid

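    # Main worker loop: wait until a sync is required, synchronize with
    # CVX, and recover from failures without killing the greenthread.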
    def sync_loop(self):
        while self._running:
            try:
                sync_required = self.wait_for_sync_required()

                if sync_required:
                    self.synchronize_resources()
            except Exception:
                LOG.exception("%(pid)s Arista Sync failed",
                              {'pid': os.getpid()})
                # Release sync lock if held and at least one full sync was
                # successful with the current cvx uuid
                if (self._rpc.current_sync_name is not None and
                        self._cvx_uuid is not None and
                        self._cvx_uuid == self._synchronizing_uuid):
                    self._rpc.sync_end()
                self._synchronizing_uuid = None
                self._in_full_sync = False

            # Yield to avoid starvation
            greenthread.sleep(random.random())

        self.done.send(True)