neutron/neutron/agent/resource_cache.py
Sławek Kapłoński b22a9d5ad5 Fix race condition with enabling SG on many ports at once
When there are many calls to enable security groups on ports, there
can sometimes be a race condition between the refresh of the
resource_cache with data fetched by a "pull" call to the neutron
server and data received with a "push" RPC message from the neutron
server.
In such a case, when the "push" message arrives with information about
an updated port (with port_security enabled), the port is already
updated in the local cache, so the local AFTER_UPDATE call is not
invoked for it and its firewall rules are not updated.

This happened quite often in the fullstack security groups test
because that test creates 4 ports and updates all 4 of them, one by
one, to apply an SG to each.
Here is what happens then, in detail:
1. port 1 is updated on the neutron-server side, so the server sends a
   push notification to the L2 agent to update security groups,
2. port 1's info is saved in the resource cache on the L2 agent's side
   and the agent starts configuring security groups for this port,
3. as one of the steps, the L2 agent calls the
   SecurityGroupServerAPIShim._select_ips_for_remote_group() method;
   that method calls RemoteResourceCache.get_resources(), which asks
   the neutron-server for details about the ports in the given
   security_group,
4. in the meantime, the neutron-server receives a port update call for
   the second port (with the same security group), so it sends the L2
   agent information about 2 ports (as a reply to the request sent by
   the L2 agent in step 3),
5. the resource cache updates the information about the two ports in
   the local cache, returns its data to
   SecurityGroupServerAPIShim._select_ips_for_remote_group(), and all
   looks fine,
6. but now the L2 agent receives the push notification with the info
   that port 2 was updated (security groups changed), so it checks the
   info about this port in the local cache,
7. in the local cache, the info about port 2 is already WITH the
   updated security group, so RemoteResourceCache does not trigger a
   local AFTER_UPDATE notification for the port and the L2 agent never
   learns that the security groups for this port should be changed.

This patch fixes it by changing the way items are updated in
the resource_cache.
Updates are now done with the record_resource_update() method instead
of writing new values directly to the resource_cache._type_cache dict.
Thanks to that, if a resource is updated during a "pull" call to the
neutron server, the local AFTER_UPDATE event will still be triggered
for such a resource, as sketched below.
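
In _flood_cache_for_query() the change is roughly the following
(an illustrative sketch, not the exact diff):

    # before: new values were written directly to the cache,
    # bypassing the change notification:
    self._type_cache(rtype)[resource.id] = resource

    # after: every pulled resource goes through the same path as
    # pushed updates, so AFTER_UPDATE is emitted on real changes:
    self.record_resource_update(context, rtype, resource)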

Change-Id: I5a62cc5731c5ba571506a3aa26303a1b0290d37b
Closes-Bug: #1742401
(cherry picked from commit 725df3e038)
2018-06-20 18:16:07 +00:00


# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from neutron_lib import context as n_ctx
from oslo_log import log as logging

from neutron.api.rpc.callbacks.consumer import registry as registry_rpc
from neutron.api.rpc.callbacks import events as events_rpc
from neutron.api.rpc.handlers import resources_rpc
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.common import rpc as n_rpc
from neutron import objects

LOG = logging.getLogger(__name__)

objects.register_objects()


class RemoteResourceCache(object):
    """Retrieves and stashes logical resources in their OVO format.

    This is currently only compatible with OVO objects that have an ID.
    """
    def __init__(self, resource_types):
        self.resource_types = resource_types
        self._cache_by_type_and_id = {rt: {} for rt in self.resource_types}
        self._deleted_ids_by_type = {rt: set() for rt in self.resource_types}
        # track everything we've asked the server so we don't ask again
        self._satisfied_server_queries = set()
        self._puller = resources_rpc.ResourcesPullRpcApi()

    def _type_cache(self, rtype):
        if rtype not in self.resource_types:
            raise RuntimeError("Resource cache not tracking %s" % rtype)
        return self._cache_by_type_and_id[rtype]

    def start_watcher(self):
        self._watcher = RemoteResourceWatcher(self)

    def get_resource_by_id(self, rtype, obj_id):
        """Returns None if it doesn't exist."""
        if obj_id in self._deleted_ids_by_type[rtype]:
            return None
        cached_item = self._type_cache(rtype).get(obj_id)
        if cached_item:
            return cached_item
        # try server in case object existed before agent start
        self._flood_cache_for_query(rtype, id=(obj_id, ))
        return self._type_cache(rtype).get(obj_id)

    def _flood_cache_for_query(self, rtype, **filter_kwargs):
        """Load info from server for first query.

        Queries the server if this is the first time a given query for
        rtype has been issued.
        """
        query_ids = self._get_query_ids(rtype, filter_kwargs)
        if query_ids.issubset(self._satisfied_server_queries):
            # we've already asked the server this question so we don't
            # ask directly again because any updates will have been
            # pushed to us
            return
        context = n_ctx.get_admin_context()
        resources = self._puller.bulk_pull(context, rtype,
                                           filter_kwargs=filter_kwargs)
        for resource in resources:
            if self._is_stale(rtype, resource):
                # if the server was slow enough to respond the object may have
                # been updated already and pushed to us in another thread.
                LOG.debug("Ignoring stale update for %s: %s", rtype, resource)
                continue
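            # NOTE: write through record_resource_update() rather than into
            # _type_cache directly, so a local AFTER_UPDATE event still fires
            # if this resource was changed while the "pull" was in flight
            # (the race described in the commit message, bug 1742401).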
            self.record_resource_update(context, rtype, resource)
        LOG.debug("%s resources returned for queries %s", len(resources),
                  query_ids)
        self._satisfied_server_queries.update(query_ids)

    def _get_query_ids(self, rtype, filters):
        """Turns filters for a given rtype into a set of query IDs.

        This can result in multiple queries due to the nature of the query
        processing on the server side. Since multiple values are treated as
        an OR condition, a query for {'id': ('1', '2')} is equivalent
        to a query for {'id': ('1',)} and {'id': ('2',)}. This method splits
        the former into the latter to ensure we aren't asking the server
        something we already know.
        """
        query_ids = set()
        for k, values in tuple(sorted(filters.items())):
            if len(values) > 1:
                for v in values:
                    new_filters = filters.copy()
                    new_filters[k] = (v, )
                    query_ids.update(self._get_query_ids(rtype, new_filters))
                break
        else:
            # no multiple value filters left so add an ID
            query_ids.add((rtype, ) + tuple(sorted(filters.items())))
        return query_ids

    def get_resources(self, rtype, filters):
        """Find resources that match key:values in filters dict.

        If the attribute on the object is a list, each value is checked if it
        is in the list.

        The values in the dictionary for a single key are matched in an OR
        fashion.
        """
        self._flood_cache_for_query(rtype, **filters)

        def match(obj):
            for key, values in filters.items():
                for value in values:
                    attr = getattr(obj, key)
                    if isinstance(attr, (list, tuple, set)):
                        # attribute is a list so we check if value is in
                        # list
                        if value in attr:
                            break
                    elif value == attr:
                        break
                else:
                    # no match found for this key
                    return False
            return True

        return self.match_resources_with_func(rtype, match)

    def match_resources_with_func(self, rtype, matcher):
        """Returns a list of all resources satisfying func matcher."""
        # TODO(kevinbenton): this is O(N), offer better lookup functions
        return [r for r in self._type_cache(rtype).values()
                if matcher(r)]

    def _is_stale(self, rtype, resource):
        """Determines if a given resource update is safe to ignore.

        It can be safe to ignore if it has already been deleted or if
        we have a copy with a higher revision number.
        """
        if resource.id in self._deleted_ids_by_type[rtype]:
            return True
        existing = self._type_cache(rtype).get(resource.id)
        if existing and existing.revision_number > resource.revision_number:
            # NOTE(kevinbenton): we could be strict and check for >=, but this
            # makes us more tolerant of bugs on the server where we forget to
            # bump the revision_number.
            return True
        return False

    def record_resource_update(self, context, rtype, resource):
        """Takes in an OVO and generates an event on relevant changes.

        A change is deemed to be relevant if it is not stale and if any
        fields changed beyond the revision number and update time.

        Both creates and updates are handled in this function.
        """
        if self._is_stale(rtype, resource):
            LOG.debug("Ignoring stale update for %s: %s", rtype, resource)
            return
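        # NOTE: all cache writes (pushed and pulled data alike) funnel
        # through this method, so any relevant change also emits the
        # AFTER_UPDATE notification below.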
        existing = self._type_cache(rtype).get(resource.id)
        self._type_cache(rtype)[resource.id] = resource
        changed_fields = self._get_changed_fields(existing, resource)
        if not changed_fields:
            LOG.debug("Received resource %s update without any changes: %s",
                      rtype, resource.id)
            return
        if existing:
            LOG.debug("Resource %s %s updated (revision_number %s->%s). "
                      "Old fields: %s New fields: %s",
                      rtype, existing.id, existing.revision_number,
                      resource.revision_number,
                      {f: existing.get(f) for f in changed_fields},
                      {f: resource.get(f) for f in changed_fields})
        else:
            LOG.debug("Received new resource %s: %s", rtype, resource)
        # local notification for agent internals to subscribe to
        registry.notify(rtype, events.AFTER_UPDATE, self,
                        context=context, changed_fields=changed_fields,
                        existing=existing, updated=resource,
                        resource_id=resource.id)

    def record_resource_delete(self, context, rtype, resource_id):
        # deletions are final, record them so we never
        # accept new data for the same ID.
        LOG.debug("Resource %s deleted: %s", rtype, resource_id)
        # TODO(kevinbenton): we need a way to expire items from the set at
        # some TTL so it doesn't grow indefinitely with churn
        if resource_id in self._deleted_ids_by_type[rtype]:
            LOG.debug("Skipped duplicate delete event for %s", resource_id)
            return
        self._deleted_ids_by_type[rtype].add(resource_id)
        existing = self._type_cache(rtype).pop(resource_id, None)
        # local notification for agent internals to subscribe to
        registry.notify(rtype, events.AFTER_DELETE, self, context=context,
                        existing=existing, resource_id=resource_id)

    def _get_changed_fields(self, old, new):
        """Returns changed fields excluding update time and revision."""
        new = new.to_dict()
        changed = set(new)
        if old:
            for k, v in old.to_dict().items():
                if v == new.get(k):
                    changed.discard(k)
        for ignore in ('revision_number', 'updated_at'):
            changed.discard(ignore)
        return changed


class RemoteResourceWatcher(object):
    """Converts RPC callback notifications to local registry notifications.

    This allows a constructor to listen for RPC callbacks for a given
    dictionary of resources and fields desired.

    This watcher will listen to the RPC callbacks as sent on the wire and
    handle things like out-of-order message detection and throwing away
    updates to fields the constructor doesn't care about.

    All watched resources must be primary keyed on a field called 'id' and
    have a standard attr revision number.
    """
    def __init__(self, remote_resource_cache):
        self.rcache = remote_resource_cache
        self._init_rpc_listeners()

    def _init_rpc_listeners(self):
        endpoints = [resources_rpc.ResourcesPushRpcCallback()]
        self._connection = n_rpc.create_connection()
        for rtype in self.rcache.resource_types:
            registry_rpc.register(self.resource_change_handler, rtype)
            topic = resources_rpc.resource_type_versioned_topic(rtype)
            self._connection.create_consumer(topic, endpoints, fanout=True)
        self._connection.consume_in_threads()

    def resource_change_handler(self, context, rtype, resources, event_type):
        for r in resources:
            if event_type == events_rpc.DELETED:
                self.rcache.record_resource_delete(context, rtype, r.id)
            else:
                # creates and updates are treated equally
                self.rcache.record_resource_update(context, rtype, r)