neutron/neutron/api/rpc/callbacks/version_manager.py

264 lines
11 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import copy
import pprint
import time
from neutron_lib.plugins import directory
from oslo_log import log as logging
from oslo_utils import importutils
from neutron.api.rpc.callbacks import exceptions
LOG = logging.getLogger(__name__)
VERSIONS_TTL = 60
# NOTE(mangelajo): if we import this globally we end up with a (very
# long) circular dependency, this can be fixed if we
# stop importing all exposed classes in
# neutron.api.rpc.callbacks.resources and provide
# a decorator to expose classes
def _import_resources():
return importutils.import_module('neutron.api.rpc.callbacks.resources')
def _import_agents_db():
return importutils.import_module('neutron.db.agents_db')
AgentConsumer = collections.namedtuple('AgentConsumer', ['agent_type',
'host'])
AgentConsumer.__repr__ = lambda self: '%s@%s' % self
class ResourceConsumerTracker(object):
"""Class passed down to collect consumer's resource versions.
This class is responsible for fetching the local versions of
resources, and letting the called function register every consumer's
resource version.
This class is passed down to the plugin get_agents_resource_versions
currently, as the only expected consumers are agents so far.
Later on, this class can also be used to recalculate, for each
resource type, the collection of versions that are local or
known by one or more consumers.
"""
def __init__(self):
# Initialize with the local (server) versions, as we always want
# to send those. Agents, as they upgrade, will need the latest version,
# and there is a corner case we'd not be covering otherwise:
# 1) one or several neutron-servers get disconnected from rpc (while
# running)
# 2) a new agent comes up, with the latest version and it reports
# 2 ways:
# a) via status report (which will be stored in the database)
# b) via fanout call to all neutron servers, this way, all of them
# get their version set updated right away without the need to
# re-fetch anything from the database.
# 3) the neutron-servers get back online to the rpc bus, but they
# lost the fanout message.
#
# TODO(mangelajo) To cover this case we may need a callback from oslo
# messaging to get notified about disconnections/reconnections to the
# rpc bus, invalidating the consumer version cache when we receive such
# callback.
self._versions = self._get_local_resource_versions()
self._versions_by_consumer = collections.defaultdict(dict)
self._needs_recalculation = False
self.last_report = None
def _get_local_resource_versions(self):
resources = _import_resources()
local_resource_versions = collections.defaultdict(set)
for resource_type, version in (
resources.LOCAL_RESOURCE_VERSIONS.items()):
local_resource_versions[resource_type].add(version)
return local_resource_versions
# TODO(mangelajo): add locking with _recalculate_versions if we ever
# move out of green threads.
def _set_version(self, consumer, resource_type, version):
"""Set or update a consumer resource type version."""
self._versions[resource_type].add(version)
consumer_versions = self._versions_by_consumer[consumer]
prev_version = consumer_versions.get(resource_type, None)
if version:
consumer_versions[resource_type] = version
else:
consumer_versions.pop(resource_type, None)
if prev_version != version:
# If a version got updated/changed in a consumer, we need to
# recalculate the main dictionary of versions based on the
# new _versions_by_consumer.
# We defer the recalculation until every consumer version has
# been set for all of its resource types.
self._needs_recalculation = True
LOG.debug("Version for resource type %(resource_type)s changed "
"%(prev_version)s to %(version)s on "
"consumer %(consumer)s",
{'resource_type': resource_type,
'version': version,
'prev_version': prev_version,
'consumer': consumer})
def set_versions(self, consumer, versions):
"""Set or update an specific consumer resource types.
:param consumer: should be an AgentConsumer object, with agent_type
and host set. This acts as the unique ID for the
agent.
:param versions: should be a dictionary in the following format:
{'QosPolicy': '1.1',
'SecurityGroup': '1.0',
'Port': '1.0'}
"""
for resource_type, resource_version in versions.items():
self._set_version(consumer, resource_type,
resource_version)
if versions:
self._cleanup_removed_versions(consumer, versions)
else:
self._handle_no_set_versions(consumer)
def _cleanup_removed_versions(self, consumer, versions):
"""Check if any version report has been removed, and cleanup."""
prev_resource_types = set(
self._versions_by_consumer[consumer].keys())
cur_resource_types = set(versions.keys())
removed_resource_types = prev_resource_types - cur_resource_types
if removed_resource_types:
LOG.debug("Removing stale tracked versions: %s",
removed_resource_types)
for resource_type in removed_resource_types:
self._set_version(consumer, resource_type, None)
def _handle_no_set_versions(self, consumer):
"""Handle consumers reporting no versions."""
if self._versions_by_consumer[consumer]:
self._needs_recalculation = True
LOG.debug("Clearing versions for consumer %s", consumer)
self._versions_by_consumer[consumer] = {}
def get_resource_versions(self, resource_type):
"""Fetch the versions necessary to notify all consumers."""
if self._needs_recalculation:
self._recalculate_versions()
self._needs_recalculation = False
return copy.copy(self._versions[resource_type])
def report(self):
"""Output debug information about the consumer versions."""
def _format(versions):
return pprint.pformat(dict(versions), indent=4)
debug_dict = {'pushed_versions': _format(self._versions),
'consumer_versions': _format(self._versions_by_consumer)}
if self.last_report != debug_dict:
self.last_report = debug_dict
LOG.debug('Tracked resource versions report:\n'
'pushed versions:\n%(pushed_versions)s\n\n'
'consumer versions:\n%(consumer_versions)s\n',
debug_dict)
# TODO(mangelajo): Add locking if we ever move out of greenthreads.
def _recalculate_versions(self):
"""Recalculate the _versions set.
Re-fetch the local (server) versions and expand with consumers'
versions.
"""
versions = self._get_local_resource_versions()
for versions_dict in self._versions_by_consumer.values():
for res_type, res_version in versions_dict.items():
versions[res_type].add(res_version)
self._versions = versions
class CachedResourceConsumerTracker(object):
"""This class takes care of the caching logic of versions."""
def __init__(self):
# This is TTL expiration time, 0 means it will be expired at start
self._expires_at = 0
self._versions = ResourceConsumerTracker()
def _update_consumer_versions(self):
new_tracker = ResourceConsumerTracker()
neutron_plugin = directory.get_plugin()
agents_db = _import_agents_db()
# If you use RPC callbacks, your plugin needs to implement
# AgentsDbMixin so that we know which resource versions your
# agents consume via RPC, please note that rpc_callbacks are
# only designed to work with agents currently.
if isinstance(neutron_plugin, agents_db.AgentDbMixin):
neutron_plugin.get_agents_resource_versions(new_tracker)
else:
raise exceptions.NoAgentDbMixinImplemented()
# preserve last report state so we don't duplicate logs on refresh
new_tracker.last_report = self._versions.last_report
self._versions = new_tracker
self._versions.report()
def _check_expiration(self):
if time.time() > self._expires_at:
self._update_consumer_versions()
self._expires_at = time.time() + VERSIONS_TTL
def get_resource_versions(self, resource_type):
self._check_expiration()
return self._versions.get_resource_versions(resource_type)
def update_versions(self, consumer, resource_versions):
self._versions.set_versions(consumer, resource_versions)
def report(self):
self._check_expiration()
self._versions.report()
_cached_version_tracker = None
# NOTE(ajo): add locking if we ever stop using greenthreads
def _get_cached_tracker():
global _cached_version_tracker
if not _cached_version_tracker:
_cached_version_tracker = CachedResourceConsumerTracker()
return _cached_version_tracker
def get_resource_versions(resource_type):
"""Return the set of versions expected by the consumers of a resource."""
return _get_cached_tracker().get_resource_versions(resource_type)
def update_versions(consumer, resource_versions):
"""Update the resources' versions for a consumer id."""
_get_cached_tracker().update_versions(consumer, resource_versions)
def report():
"""Report resource versions in debug logs."""
_get_cached_tracker().report()