
The neutron.common.rpc module has been in neutron-lib for awhile now and neutron is shimmed to use neutron-lib already. This patch removes neutron.common.rpc and switches the code over to use neutron-lib's implementation where needed. NeutronLibImpact Change-Id: I733f07a8c4a2af071b3467bd710290eee11a4f4c
256 lines
11 KiB
Python
256 lines
11 KiB
Python
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from neutron_lib.callbacks import events
|
|
from neutron_lib.callbacks import registry
|
|
from neutron_lib import context as n_ctx
|
|
from neutron_lib import rpc as n_rpc
|
|
from oslo_log import log as logging
|
|
|
|
from neutron._i18n import _
|
|
from neutron.api.rpc.callbacks.consumer import registry as registry_rpc
|
|
from neutron.api.rpc.callbacks import events as events_rpc
|
|
from neutron.api.rpc.handlers import resources_rpc
|
|
from neutron import objects
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
objects.register_objects()
|
|
|
|
|
|
class RemoteResourceCache(object):
|
|
"""Retrieves and stashes logical resources in their OVO format.
|
|
|
|
This is currently only compatible with OVO objects that have an ID.
|
|
"""
|
|
def __init__(self, resource_types):
|
|
self.resource_types = resource_types
|
|
self._cache_by_type_and_id = {rt: {} for rt in self.resource_types}
|
|
self._deleted_ids_by_type = {rt: set() for rt in self.resource_types}
|
|
# track everything we've asked the server so we don't ask again
|
|
self._satisfied_server_queries = set()
|
|
self._puller = resources_rpc.ResourcesPullRpcApi()
|
|
|
|
def _type_cache(self, rtype):
|
|
if rtype not in self.resource_types:
|
|
raise RuntimeError(_("Resource cache not tracking %s") % rtype)
|
|
return self._cache_by_type_and_id[rtype]
|
|
|
|
def start_watcher(self):
|
|
self._watcher = RemoteResourceWatcher(self)
|
|
|
|
def get_resource_by_id(self, rtype, obj_id):
|
|
"""Returns None if it doesn't exist."""
|
|
if obj_id in self._deleted_ids_by_type[rtype]:
|
|
return None
|
|
cached_item = self._type_cache(rtype).get(obj_id)
|
|
if cached_item:
|
|
return cached_item
|
|
# try server in case object existed before agent start
|
|
self._flood_cache_for_query(rtype, id=(obj_id, ))
|
|
return self._type_cache(rtype).get(obj_id)
|
|
|
|
def _flood_cache_for_query(self, rtype, **filter_kwargs):
|
|
"""Load info from server for first query.
|
|
|
|
Queries the server if this is the first time a given query for
|
|
rtype has been issued.
|
|
"""
|
|
query_ids = self._get_query_ids(rtype, filter_kwargs)
|
|
if query_ids.issubset(self._satisfied_server_queries):
|
|
# we've already asked the server this question so we don't
|
|
# ask directly again because any updates will have been
|
|
# pushed to us
|
|
return
|
|
context = n_ctx.get_admin_context()
|
|
resources = self._puller.bulk_pull(context, rtype,
|
|
filter_kwargs=filter_kwargs)
|
|
for resource in resources:
|
|
if self._is_stale(rtype, resource):
|
|
# if the server was slow enough to respond the object may have
|
|
# been updated already and pushed to us in another thread.
|
|
LOG.debug("Ignoring stale update for %s: %s", rtype, resource)
|
|
continue
|
|
self.record_resource_update(context, rtype, resource)
|
|
LOG.debug("%s resources returned for queries %s", len(resources),
|
|
query_ids)
|
|
self._satisfied_server_queries.update(query_ids)
|
|
|
|
def _get_query_ids(self, rtype, filters):
|
|
"""Turns filters for a given rypte into a set of query IDs.
|
|
|
|
This can result in multiple queries due to the nature of the query
|
|
processing on the server side. Since multiple values are treated as
|
|
an OR condition, a query for {'id': ('1', '2')} is equivalent
|
|
to a query for {'id': ('1',)} and {'id': ('2')}. This method splits
|
|
the former into the latter to ensure we aren't asking the server
|
|
something we already know.
|
|
"""
|
|
query_ids = set()
|
|
for k, values in tuple(sorted(filters.items())):
|
|
if len(values) > 1:
|
|
for v in values:
|
|
new_filters = filters.copy()
|
|
new_filters[k] = (v, )
|
|
query_ids.update(self._get_query_ids(rtype, new_filters))
|
|
break
|
|
else:
|
|
# no multiple value filters left so add an ID
|
|
query_ids.add((rtype, ) + tuple(sorted(filters.items())))
|
|
return query_ids
|
|
|
|
def get_resources(self, rtype, filters):
|
|
"""Find resources that match key:values in filters dict.
|
|
|
|
If the attribute on the object is a list, each value is checked if it
|
|
is in the list.
|
|
|
|
The values in the dicionary for a single key are matched in an OR
|
|
fashion.
|
|
"""
|
|
self._flood_cache_for_query(rtype, **filters)
|
|
|
|
def match(obj):
|
|
for key, values in filters.items():
|
|
for value in values:
|
|
attr = getattr(obj, key)
|
|
if isinstance(attr, (list, tuple, set)):
|
|
# attribute is a list so we check if value is in
|
|
# list
|
|
if value in attr:
|
|
break
|
|
elif value == attr:
|
|
break
|
|
else:
|
|
# no match found for this key
|
|
return False
|
|
return True
|
|
return self.match_resources_with_func(rtype, match)
|
|
|
|
def match_resources_with_func(self, rtype, matcher):
|
|
"""Returns a list of all resources satisfying func matcher."""
|
|
# TODO(kevinbenton): this is O(N), offer better lookup functions
|
|
return [r for r in self._type_cache(rtype).values()
|
|
if matcher(r)]
|
|
|
|
def _is_stale(self, rtype, resource):
|
|
"""Determines if a given resource update is safe to ignore.
|
|
|
|
It can be safe to ignore if it has already been deleted or if
|
|
we have a copy with a higher revision number.
|
|
"""
|
|
if resource.id in self._deleted_ids_by_type[rtype]:
|
|
return True
|
|
existing = self._type_cache(rtype).get(resource.id)
|
|
if existing and existing.revision_number > resource.revision_number:
|
|
# NOTE(kevinbenton): we could be strict and check for >=, but this
|
|
# makes us more tolerant of bugs on the server where we forget to
|
|
# bump the revision_number.
|
|
return True
|
|
return False
|
|
|
|
def record_resource_update(self, context, rtype, resource):
|
|
"""Takes in an OVO and generates an event on relevant changes.
|
|
|
|
A change is deemed to be relevant if it is not stale and if any
|
|
fields changed beyond the revision number and update time.
|
|
|
|
Both creates and updates are handled in this function.
|
|
"""
|
|
if self._is_stale(rtype, resource):
|
|
LOG.debug("Ignoring stale update for %s: %s", rtype, resource)
|
|
return
|
|
existing = self._type_cache(rtype).get(resource.id)
|
|
self._type_cache(rtype)[resource.id] = resource
|
|
changed_fields = self._get_changed_fields(existing, resource)
|
|
if not changed_fields:
|
|
LOG.debug("Received resource %s update without any changes: %s",
|
|
rtype, resource.id)
|
|
return
|
|
if existing:
|
|
LOG.debug("Resource %s %s updated (revision_number %s->%s). "
|
|
"Old fields: %s New fields: %s",
|
|
rtype, existing.id, existing.revision_number,
|
|
resource.revision_number,
|
|
{f: existing.get(f) for f in changed_fields},
|
|
{f: resource.get(f) for f in changed_fields})
|
|
else:
|
|
LOG.debug("Received new resource %s: %s", rtype, resource)
|
|
# local notification for agent internals to subscribe to
|
|
registry.notify(rtype, events.AFTER_UPDATE, self,
|
|
context=context, changed_fields=changed_fields,
|
|
existing=existing, updated=resource,
|
|
resource_id=resource.id)
|
|
|
|
def record_resource_delete(self, context, rtype, resource_id):
|
|
# deletions are final, record them so we never
|
|
# accept new data for the same ID.
|
|
LOG.debug("Resource %s deleted: %s", rtype, resource_id)
|
|
# TODO(kevinbenton): we need a way to expire items from the set at
|
|
# some TTL so it doesn't grow indefinitely with churn
|
|
if resource_id in self._deleted_ids_by_type[rtype]:
|
|
LOG.debug("Skipped duplicate delete event for %s", resource_id)
|
|
return
|
|
self._deleted_ids_by_type[rtype].add(resource_id)
|
|
existing = self._type_cache(rtype).pop(resource_id, None)
|
|
# local notification for agent internals to subscribe to
|
|
registry.notify(rtype, events.AFTER_DELETE, self, context=context,
|
|
existing=existing, resource_id=resource_id)
|
|
|
|
def _get_changed_fields(self, old, new):
|
|
"""Returns changed fields excluding update time and revision."""
|
|
new = new.to_dict()
|
|
changed = set(new)
|
|
if old:
|
|
for k, v in old.to_dict().items():
|
|
if v == new.get(k):
|
|
changed.discard(k)
|
|
for ignore in ('revision_number', 'updated_at'):
|
|
changed.discard(ignore)
|
|
return changed
|
|
|
|
|
|
class RemoteResourceWatcher(object):
|
|
"""Converts RPC callback notifications to local registry notifications.
|
|
|
|
This allows a constructor to listen for RPC callbacks for a given
|
|
dictionary of resources and fields desired.
|
|
This watcher will listen to the RPC callbacks as sent on the wire and
|
|
handle things like out-of-order message detection and throwing away
|
|
updates to fields the constructor doesn't care about.
|
|
|
|
All watched resources must be primary keyed on a field called 'id' and
|
|
have a standard attr revision number.
|
|
"""
|
|
|
|
def __init__(self, remote_resource_cache):
|
|
self.rcache = remote_resource_cache
|
|
self._init_rpc_listeners()
|
|
|
|
def _init_rpc_listeners(self):
|
|
endpoints = [resources_rpc.ResourcesPushRpcCallback()]
|
|
self._connection = n_rpc.Connection()
|
|
for rtype in self.rcache.resource_types:
|
|
registry_rpc.register(self.resource_change_handler, rtype)
|
|
topic = resources_rpc.resource_type_versioned_topic(rtype)
|
|
self._connection.create_consumer(topic, endpoints, fanout=True)
|
|
self._connection.consume_in_threads()
|
|
|
|
def resource_change_handler(self, context, rtype, resources, event_type):
|
|
for r in resources:
|
|
if event_type == events_rpc.DELETED:
|
|
self.rcache.record_resource_delete(context, rtype, r.id)
|
|
else:
|
|
# creates and updates are treated equally
|
|
self.rcache.record_resource_update(context, rtype, r)
|