You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

256 lines
11 KiB

# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib import context as n_ctx
from neutron_lib import rpc as n_rpc
from oslo_log import log as logging
from neutron._i18n import _
from neutron.api.rpc.callbacks.consumer import registry as registry_rpc
from neutron.api.rpc.callbacks import events as events_rpc
from neutron.api.rpc.handlers import resources_rpc
from neutron import objects
LOG = logging.getLogger(__name__)
class RemoteResourceCache(object):
"""Retrieves and stashes logical resources in their OVO format.
This is currently only compatible with OVO objects that have an ID.
def __init__(self, resource_types):
self.resource_types = resource_types
self._cache_by_type_and_id = {rt: {} for rt in self.resource_types}
self._deleted_ids_by_type = {rt: set() for rt in self.resource_types}
# track everything we've asked the server so we don't ask again
self._satisfied_server_queries = set()
self._puller = resources_rpc.ResourcesPullRpcApi()
def _type_cache(self, rtype):
if rtype not in self.resource_types:
raise RuntimeError(_("Resource cache not tracking %s") % rtype)
return self._cache_by_type_and_id[rtype]
def start_watcher(self):
self._watcher = RemoteResourceWatcher(self)
def get_resource_by_id(self, rtype, obj_id):
"""Returns None if it doesn't exist."""
if obj_id in self._deleted_ids_by_type[rtype]:
return None
cached_item = self._type_cache(rtype).get(obj_id)
if cached_item:
return cached_item
# try server in case object existed before agent start
self._flood_cache_for_query(rtype, id=(obj_id, ))
return self._type_cache(rtype).get(obj_id)
def _flood_cache_for_query(self, rtype, **filter_kwargs):
"""Load info from server for first query.
Queries the server if this is the first time a given query for
rtype has been issued.
query_ids = self._get_query_ids(rtype, filter_kwargs)
if query_ids.issubset(self._satisfied_server_queries):
# we've already asked the server this question so we don't
# ask directly again because any updates will have been
# pushed to us
context = n_ctx.get_admin_context()
resources = self._puller.bulk_pull(context, rtype,
for resource in resources:
if self._is_stale(rtype, resource):
# if the server was slow enough to respond the object may have
# been updated already and pushed to us in another thread.
LOG.debug("Ignoring stale update for %s: %s", rtype, resource)
self.record_resource_update(context, rtype, resource)
LOG.debug("%s resources returned for queries %s", len(resources),
def _get_query_ids(self, rtype, filters):
"""Turns filters for a given rypte into a set of query IDs.
This can result in multiple queries due to the nature of the query
processing on the server side. Since multiple values are treated as
an OR condition, a query for {'id': ('1', '2')} is equivalent
to a query for {'id': ('1',)} and {'id': ('2')}. This method splits
the former into the latter to ensure we aren't asking the server
something we already know.
query_ids = set()
for k, values in tuple(sorted(filters.items())):
if len(values) > 1:
for v in values:
new_filters = filters.copy()
new_filters[k] = (v, )
query_ids.update(self._get_query_ids(rtype, new_filters))
# no multiple value filters left so add an ID
query_ids.add((rtype, ) + tuple(sorted(filters.items())))
return query_ids
def get_resources(self, rtype, filters):
"""Find resources that match key:values in filters dict.
If the attribute on the object is a list, each value is checked if it
is in the list.
The values in the dicionary for a single key are matched in an OR
self._flood_cache_for_query(rtype, **filters)
def match(obj):
for key, values in filters.items():
for value in values:
attr = getattr(obj, key)
if isinstance(attr, (list, tuple, set)):
# attribute is a list so we check if value is in
# list
if value in attr:
elif value == attr:
# no match found for this key
return False
return True
return self.match_resources_with_func(rtype, match)
def match_resources_with_func(self, rtype, matcher):
"""Returns a list of all resources satisfying func matcher."""
# TODO(kevinbenton): this is O(N), offer better lookup functions
return [r for r in self._type_cache(rtype).values()
if matcher(r)]
def _is_stale(self, rtype, resource):
"""Determines if a given resource update is safe to ignore.
It can be safe to ignore if it has already been deleted or if
we have a copy with a higher revision number.
if in self._deleted_ids_by_type[rtype]:
return True
existing = self._type_cache(rtype).get(
if existing and existing.revision_number > resource.revision_number:
# NOTE(kevinbenton): we could be strict and check for >=, but this
# makes us more tolerant of bugs on the server where we forget to
# bump the revision_number.
return True
return False
def record_resource_update(self, context, rtype, resource):
"""Takes in an OVO and generates an event on relevant changes.
A change is deemed to be relevant if it is not stale and if any
fields changed beyond the revision number and update time.
Both creates and updates are handled in this function.
if self._is_stale(rtype, resource):
LOG.debug("Ignoring stale update for %s: %s", rtype, resource)
existing = self._type_cache(rtype).get(
self._type_cache(rtype)[] = resource
changed_fields = self._get_changed_fields(existing, resource)
if not changed_fields:
LOG.debug("Received resource %s update without any changes: %s",
if existing:
LOG.debug("Resource %s %s updated (revision_number %s->%s). "
"Old fields: %s New fields: %s",
rtype,, existing.revision_number,
{f: existing.get(f) for f in changed_fields},
{f: resource.get(f) for f in changed_fields})
LOG.debug("Received new resource %s: %s", rtype, resource)
# local notification for agent internals to subscribe to
registry.notify(rtype, events.AFTER_UPDATE, self,
context=context, changed_fields=changed_fields,
existing=existing, updated=resource,
def record_resource_delete(self, context, rtype, resource_id):
# deletions are final, record them so we never
# accept new data for the same ID.
LOG.debug("Resource %s deleted: %s", rtype, resource_id)
# TODO(kevinbenton): we need a way to expire items from the set at
# some TTL so it doesn't grow indefinitely with churn
if resource_id in self._deleted_ids_by_type[rtype]:
LOG.debug("Skipped duplicate delete event for %s", resource_id)
existing = self._type_cache(rtype).pop(resource_id, None)
# local notification for agent internals to subscribe to
registry.notify(rtype, events.AFTER_DELETE, self, context=context,
existing=existing, resource_id=resource_id)
def _get_changed_fields(self, old, new):
"""Returns changed fields excluding update time and revision."""
new = new.to_dict()
changed = set(new)
if old:
for k, v in old.to_dict().items():
if v == new.get(k):
for ignore in ('revision_number', 'updated_at'):
return changed
class RemoteResourceWatcher(object):
"""Converts RPC callback notifications to local registry notifications.
This allows a constructor to listen for RPC callbacks for a given
dictionary of resources and fields desired.
This watcher will listen to the RPC callbacks as sent on the wire and
handle things like out-of-order message detection and throwing away
updates to fields the constructor doesn't care about.
All watched resources must be primary keyed on a field called 'id' and
have a standard attr revision number.
def __init__(self, remote_resource_cache):
self.rcache = remote_resource_cache
def _init_rpc_listeners(self):
endpoints = [resources_rpc.ResourcesPushRpcCallback()]
self._connection = n_rpc.Connection()
for rtype in self.rcache.resource_types:
registry_rpc.register(self.resource_change_handler, rtype)
topic = resources_rpc.resource_type_versioned_topic(rtype)
self._connection.create_consumer(topic, endpoints, fanout=True)
def resource_change_handler(self, context, rtype, resources, event_type):
for r in resources:
if event_type == events_rpc.DELETED:
self.rcache.record_resource_delete(context, rtype,
# creates and updates are treated equally
self.rcache.record_resource_update(context, rtype, r)