Merge "API compare-and-swap updates based on revision_number"

This commit is contained in:
Jenkins 2017-07-09 18:17:59 +00:00 committed by Gerrit Code Review
commit 59006183e1
12 changed files with 332 additions and 13 deletions

View File

@ -27,6 +27,7 @@ from six.moves.urllib import parse
from webob import exc from webob import exc
from neutron._i18n import _, _LW from neutron._i18n import _, _LW
from neutron.api import extensions
from neutron.common import constants from neutron.common import constants
from neutron import wsgi from neutron import wsgi
@ -34,6 +35,34 @@ from neutron import wsgi
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def ensure_if_match_supported():
"""Raises exception if 'if-match' revision matching unsupported."""
if 'revision-if-match' in (extensions.PluginAwareExtensionManager.
get_instance().extensions):
return
msg = _("This server does not support constraining operations based on "
"revision numbers")
raise exceptions.BadRequest(resource='if-match', msg=msg)
def check_request_for_revision_constraint(request):
"""Parses, verifies, and returns a constraint from a request."""
revision_number = None
for e in getattr(request.if_match, 'etags', []):
if e.startswith('revision_number='):
if revision_number is not None:
msg = _("Multiple revision_number etags are not supported.")
raise exceptions.BadRequest(resource='if-match', msg=msg)
ensure_if_match_supported()
try:
revision_number = int(e.split('revision_number=')[1])
except ValueError:
msg = _("Revision number etag must be in the format of "
"revision_number=<int>")
raise exceptions.BadRequest(resource='if-match', msg=msg)
return revision_number
def get_filters(request, attr_info, skips=None): def get_filters(request, attr_info, skips=None):
return get_filters_from_dict(request.GET.dict_of_lists(), return get_filters_from_dict(request.GET.dict_of_lists(),
attr_info, attr_info,

View File

@ -89,6 +89,15 @@ def Resource(controller, faults=None, deserializers=None, serializers=None,
if fmt is not None and fmt not in format_types: if fmt is not None and fmt not in format_types:
args['id'] = '.'.join([args['id'], fmt]) args['id'] = '.'.join([args['id'], fmt])
revision_number = api_common.check_request_for_revision_constraint(
request)
if revision_number is not None:
constraint = {'if_revision_match': revision_number,
'resource': controller._collection,
'resource_id': args['id']}
# TODO(kevinbenton): add an interface to context to do this
setattr(request.context, '_CONSTRAINT', constraint)
method = getattr(controller, action) method = getattr(controller, action)
result = method(request=request, **args) result = method(request=request, **args)
except Exception as e: except Exception as e:

View File

@ -0,0 +1,42 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from neutron_lib.api import extensions as api_extensions
class Revisionifmatch(api_extensions.ExtensionDescriptor):
"""Indicate that If-Match constraints on revision_number are supported."""
@classmethod
def get_name(cls):
return "If-Match constraints based on revision_number"
@classmethod
def get_alias(cls):
return 'revision-if-match'
@classmethod
def get_description(cls):
return ("Extension indicating that If-Match based on revision_number "
"is supported.")
@classmethod
def get_updated(cls):
return "2016-12-11T00:00:00-00:00"
@classmethod
def get_resources(cls):
return []
def get_extended_resources(self, version):
return {}

View File

@ -88,6 +88,7 @@ class QueryParametersHook(hooks.PecanHook):
priority = policy_enforcement.PolicyHook.priority - 1 priority = policy_enforcement.PolicyHook.priority - 1
def before(self, state): def before(self, state):
self._process_if_match_headers(state)
state.request.context['query_params'] = {} state.request.context['query_params'] = {}
if state.request.method != 'GET': if state.request.method != 'GET':
return return
@ -108,6 +109,23 @@ class QueryParametersHook(hooks.PecanHook):
added_fields) added_fields)
state.request.context['query_params'] = query_params state.request.context['query_params'] = query_params
def _process_if_match_headers(self, state):
collection = state.request.context.get('collection')
if not collection:
return
# add in if-match criterion to the context if present
revision_number = api_common.check_request_for_revision_constraint(
state.request)
if revision_number is None:
return
constraint = {
'if_revision_match': revision_number,
'resource': collection,
'resource_id': state.request.context['resource_id']}
# TODO(kevinbenton): add an interface to context to do this
setattr(state.request.context['neutron_context'],
'_CONSTRAINT', constraint)
def after(self, state): def after(self, state):
resource = state.request.context.get('resource') resource = state.request.context.get('resource')
collection = state.request.context.get('collection') collection = state.request.context.get('collection')

View File

@ -16,6 +16,7 @@ from oslo_log import log as logging
import sqlalchemy import sqlalchemy
from sqlalchemy.orm import exc from sqlalchemy.orm import exc
from sqlalchemy.orm import session as se from sqlalchemy.orm import session as se
import webob.exc
from neutron._i18n import _, _LW from neutron._i18n import _, _LW
from neutron.db import _resource_extend as resource_extend from neutron.db import _resource_extend as resource_extend
@ -29,7 +30,8 @@ LOG = logging.getLogger(__name__)
class RevisionPlugin(service_base.ServicePluginBase): class RevisionPlugin(service_base.ServicePluginBase):
"""Plugin to populate revision numbers into standard attr resources.""" """Plugin to populate revision numbers into standard attr resources."""
supported_extension_aliases = ['standard-attr-revisions'] supported_extension_aliases = ['standard-attr-revisions',
'revision-if-match']
def __init__(self): def __init__(self):
super(RevisionPlugin, self).__init__() super(RevisionPlugin, self).__init__()
@ -40,6 +42,7 @@ class RevisionPlugin(service_base.ServicePluginBase):
self._clear_rev_bumped_flags) self._clear_rev_bumped_flags)
def bump_revisions(self, session, context, instances): def bump_revisions(self, session, context, instances):
self._enforce_if_match_constraints(session)
# bump revision number for any updated objects in the session # bump revision number for any updated objects in the session
for obj in session.dirty: for obj in session.dirty:
if isinstance(obj, standard_attr.HasStandardAttributes): if isinstance(obj, standard_attr.HasStandardAttributes):
@ -117,5 +120,73 @@ class RevisionPlugin(service_base.ServicePluginBase):
if getattr(obj, '_rev_bumped', False): if getattr(obj, '_rev_bumped', False):
# we've already bumped the revision of this object in this txn # we've already bumped the revision of this object in this txn
return return
instance, match = self._get_constrained_instance_match(session)
if instance and instance == obj:
# one last check before bumping revision
self._enforce_if_match_constraints(session)
obj.bump_revision() obj.bump_revision()
setattr(obj, '_rev_bumped', True) setattr(obj, '_rev_bumped', True)
def _find_instance_by_column_value(self, session, model, column, value):
"""Lookup object in session or from DB based on a column's value."""
for session_obj in session:
if not isinstance(session_obj, model):
continue
if getattr(session_obj, column) == value:
return session_obj
# object isn't in session so we have to query for it
related_obj = (session.query(model).filter_by(**{column: value}).
first())
return related_obj
def _get_constrained_instance_match(self, session):
"""Returns instance and constraint of if-match criterion if present.
Checks the context associated with the session for compare-and-swap
update revision number constraints. If one is found, this returns the
instance that is constrained as well as the requested revision number
to match.
"""
criteria = getattr(session.info.get('using_context'),
'_CONSTRAINT', None)
if not criteria:
return None, None
match = criteria['if_revision_match']
mmap = standard_attr.get_standard_attr_resource_model_map()
model = mmap.get(criteria['resource'])
if not model:
msg = _("Revision matching not supported for this resource")
raise exc.BadRequest(resource=criteria['resource'], msg=msg)
instance = self._find_instance_by_column_value(
session, model, 'id', criteria['resource_id'])
return instance, match
def _enforce_if_match_constraints(self, session):
"""Check for if-match constraints and raise exception if violated.
We determine the collection being modified and look for any
objects of the collection type in the dirty/deleted items in
the session. If they don't match the revision_number constraint
supplied, we throw an exception.
We are protected from a concurrent update because if we match
revision number here and another update commits to the database
first, the compare and swap of revision_number will fail and a
StaleDataError (or deadlock in galera multi-writer) will be raised,
at which point this will be retried and fail to match.
"""
instance, match = self._get_constrained_instance_match(session)
if not instance or getattr(instance, '_rev_bumped', False):
# no constraints present or constrain satisfied in this transaction
return
if instance.revision_number != match:
raise RevisionNumberConstraintFailed(match,
instance.revision_number)
class RevisionNumberConstraintFailed(webob.exc.HTTPPreconditionFailed):
def __init__(self, expected, current):
detail = (_("Constrained to %(exp)s, but current revision is %(cur)s")
% {'exp': expected, 'cur': current})
super(RevisionNumberConstraintFailed, self).__init__(detail=detail)

View File

@ -18,6 +18,7 @@ from neutron_lib.callbacks import events
from neutron_lib import context from neutron_lib import context
from neutron_lib.db import constants as db_const from neutron_lib.db import constants as db_const
from neutron_lib.plugins import directory from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_policy import policy as oslo_policy from oslo_policy import policy as oslo_policy
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
@ -45,6 +46,51 @@ class TestOwnershipHook(test_functional.PecanFunctionalTest):
self.assertEqual(201, port_response.status_int) self.assertEqual(201, port_response.status_int)
class TestQueryParamatersHook(test_functional.PecanFunctionalTest):
def test_if_match_on_update(self):
net_response = jsonutils.loads(self.app.post_json(
'/v2.0/networks.json',
params={'network': {'name': 'meh'}},
headers={'X-Project-Id': 'tenid'}).body)
network_id = net_response['network']['id']
response = self.app.put_json('/v2.0/networks/%s.json' % network_id,
params={'network': {'name': 'cat'}},
headers={'X-Project-Id': 'tenid',
'If-Match': 'revision_number=0'},
expect_errors=True)
# revision plugin not supported by default, so badrequest
self.assertEqual(400, response.status_int)
class TestQueryParamatersHookWithRevision(test_functional.PecanFunctionalTest):
def setUp(self):
cfg.CONF.set_override('service_plugins', ['revisions'])
super(TestQueryParamatersHookWithRevision, self).setUp()
def test_if_match_on_update(self):
net_response = jsonutils.loads(self.app.post_json(
'/v2.0/networks.json',
params={'network': {'name': 'meh'}},
headers={'X-Project-Id': 'tenid'}).body)
network_id = net_response['network']['id']
rev = net_response['network']['revision_number']
stale = rev - 1
response = self.app.put_json(
'/v2.0/networks/%s.json' % network_id,
params={'network': {'name': 'cat'}},
headers={'X-Project-Id': 'tenid',
'If-Match': 'revision_number=%s' % stale},
expect_errors=True)
self.assertEqual(412, response.status_int)
self.app.put_json('/v2.0/networks/%s.json' % network_id,
params={'network': {'name': 'cat'}},
headers={'X-Project-Id': 'tenid',
'If-Match': 'revision_number=%s' % rev})
class TestQuotaEnforcementHook(test_functional.PecanFunctionalTest): class TestQuotaEnforcementHook(test_functional.PecanFunctionalTest):
def test_quota_enforcement_single(self): def test_quota_enforcement_single(self):

View File

@ -13,6 +13,7 @@
import netaddr import netaddr
from tempest.lib import decorators from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import test from tempest import test
from neutron.tests.tempest.api import base from neutron.tests.tempest.api import base
@ -33,6 +34,35 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
self.assertGreater(updated['network']['revision_number'], self.assertGreater(updated['network']['revision_number'],
net['revision_number']) net['revision_number'])
@decorators.idempotent_id('4a26a4be-9c53-483c-bc50-b11111113333')
def test_update_network_constrained_by_revision(self):
net = self.create_network()
current = net['revision_number']
stale = current - 1
# using a stale number should fail
self.assertRaises(
exceptions.PreconditionFailed,
self.client.update_network,
net['id'], name='newnet',
headers={'If-Match': 'revision_number=%s' % stale}
)
# using current should pass. in case something is updating the network
# on the server at the same time, we have to re-read and update to be
# safe
for i in range(100):
current = (self.client.show_network(net['id'])
['network']['revision_number'])
try:
self.client.update_network(
net['id'], name='newnet',
headers={'If-Match': 'revision_number=%s' % current})
except exceptions.UnexpectedResponseCode:
continue
break
else:
self.fail("Failed to update network after 100 tries.")
@decorators.idempotent_id('cac7ecde-12d5-4331-9a03-420899dea077') @decorators.idempotent_id('cac7ecde-12d5-4331-9a03-420899dea077')
def test_update_port_bumps_revision(self): def test_update_port_bumps_revision(self):
net = self.create_network() net = self.create_network()

View File

@ -148,10 +148,11 @@ class NetworkClientJSON(service_client.RestClient):
def _updater(self, resource_name): def _updater(self, resource_name):
def _update(res_id, **kwargs): def _update(res_id, **kwargs):
headers = kwargs.pop('headers', {})
plural = self.pluralize(resource_name) plural = self.pluralize(resource_name)
uri = '%s/%s' % (self.get_uri(plural), res_id) uri = '%s/%s' % (self.get_uri(plural), res_id)
post_data = self.serialize({resource_name: kwargs}) post_data = self.serialize({resource_name: kwargs})
resp, body = self.put(uri, post_data) resp, body = self.put(uri, post_data, headers=headers)
body = self.deserialize_single(body) body = self.deserialize_single(body)
self.expected_success(200, resp.status) self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body) return service_client.ResponseBody(resp, body)

View File

@ -178,7 +178,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
super(NeutronDbPluginV2TestCase, self).setup_config(args=args) super(NeutronDbPluginV2TestCase, self).setup_config(args=args)
def _req(self, method, resource, data=None, fmt=None, id=None, params=None, def _req(self, method, resource, data=None, fmt=None, id=None, params=None,
action=None, subresource=None, sub_id=None, context=None): action=None, subresource=None, sub_id=None, context=None,
headers=None):
fmt = fmt or self.fmt fmt = fmt or self.fmt
path = '/%s.%s' % ( path = '/%s.%s' % (
@ -196,7 +197,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
if data is not None: # empty dict is valid if data is not None: # empty dict is valid
body = self.serialize(data) body = self.serialize(data)
return testlib_api.create_request(path, body, content_type, method, return testlib_api.create_request(path, body, content_type, method,
query_string=params, context=context) query_string=params, context=context,
headers=headers)
def new_create_request(self, resource, data, fmt=None, id=None, def new_create_request(self, resource, data, fmt=None, id=None,
subresource=None, context=None): subresource=None, context=None):
@ -219,7 +221,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
params=params, subresource=subresource, sub_id=sub_id) params=params, subresource=subresource, sub_id=sub_id)
def new_delete_request(self, resource, id, fmt=None, subresource=None, def new_delete_request(self, resource, id, fmt=None, subresource=None,
sub_id=None, data=None): sub_id=None, data=None, headers=None):
return self._req( return self._req(
'DELETE', 'DELETE',
resource, resource,
@ -227,14 +229,16 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
fmt, fmt,
id=id, id=id,
subresource=subresource, subresource=subresource,
sub_id=sub_id sub_id=sub_id,
headers=headers
) )
def new_update_request(self, resource, data, id, fmt=None, def new_update_request(self, resource, data, id, fmt=None,
subresource=None, context=None, sub_id=None): subresource=None, context=None, sub_id=None,
headers=None):
return self._req( return self._req(
'PUT', resource, data, fmt, id=id, subresource=subresource, 'PUT', resource, data, fmt, id=id, subresource=subresource,
sub_id=sub_id, context=context sub_id=sub_id, context=context, headers=headers
) )
def new_action_request(self, resource, data, id, action, fmt=None, def new_action_request(self, resource, data, id, action, fmt=None,
@ -520,8 +524,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _delete(self, collection, id, def _delete(self, collection, id,
expected_code=webob.exc.HTTPNoContent.code, expected_code=webob.exc.HTTPNoContent.code,
neutron_context=None): neutron_context=None, headers=None):
req = self.new_delete_request(collection, id) req = self.new_delete_request(collection, id, headers=headers)
if neutron_context: if neutron_context:
# create a specific auth context for this request # create a specific auth context for this request
req.environ['neutron.context'] = neutron_context req.environ['neutron.context'] = neutron_context
@ -545,8 +549,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _update(self, resource, id, new_data, def _update(self, resource, id, new_data,
expected_code=webob.exc.HTTPOk.code, expected_code=webob.exc.HTTPOk.code,
neutron_context=None): neutron_context=None, headers=None):
req = self.new_update_request(resource, new_data, id) req = self.new_update_request(resource, new_data, id, headers=headers)
if neutron_context: if neutron_context:
# create a specific auth context for this request # create a specific auth context for this request
req.environ['neutron.context'] = neutron_context req.environ['neutron.context'] = neutron_context

View File

@ -17,8 +17,12 @@ import netaddr
from neutron_lib import context as nctx from neutron_lib import context as nctx
from neutron_lib.plugins import constants from neutron_lib.plugins import constants
from neutron_lib.plugins import directory from neutron_lib.plugins import directory
from oslo_db import exception as db_exc
from oslo_utils import uuidutils from oslo_utils import uuidutils
from sqlalchemy.orm import session as se
from webob import exc
from neutron.db import api as db_api
from neutron.db import models_v2 from neutron.db import models_v2
from neutron.plugins.ml2 import config from neutron.plugins.ml2 import config
from neutron.tests.unit.plugins.ml2 import test_plugin from neutron.tests.unit.plugins.ml2 import test_plugin
@ -83,6 +87,55 @@ class TestRevisionPlugin(test_plugin.Ml2PluginV2TestCase):
new_rev = response['port']['revision_number'] new_rev = response['port']['revision_number']
self.assertGreater(new_rev, rev) self.assertGreater(new_rev, rev)
def test_constrained_port_update(self):
with self.port() as port:
rev = port['port']['revision_number']
new = {'port': {'name': 'nigiri'}}
for val in (rev - 1, rev + 1):
# make sure off-by ones are rejected
self._update('ports', port['port']['id'], new,
headers={'If-Match': 'revision_number=%s' % val},
expected_code=exc.HTTPPreconditionFailed.code)
after_attempt = self._show('ports', port['port']['id'])
self.assertEqual(rev, after_attempt['port']['revision_number'])
self.assertEqual(port['port']['name'],
after_attempt['port']['name'])
# correct revision should work
self._update('ports', port['port']['id'], new,
headers={'If-Match': 'revision_number=%s' % rev})
def test_constrained_port_delete(self):
with self.port() as port:
rev = port['port']['revision_number']
for val in (rev - 1, rev + 1):
# make sure off-by ones are rejected
self._delete('ports', port['port']['id'],
headers={'If-Match': 'revision_number=%s' % val},
expected_code=exc.HTTPPreconditionFailed.code)
# correct revision should work
self._delete('ports', port['port']['id'],
headers={'If-Match': 'revision_number=%s' % rev})
def test_constrained_port_update_handles_db_retries(self):
# here we ensure all of the constraint handling logic persists
# on retriable failures to commit caused by races with another
# update
with self.port() as port:
rev = port['port']['revision_number']
new = {'port': {'name': 'nigiri'}}
def concurrent_increment(s):
db_api.sqla_remove(se.Session, 'before_commit',
concurrent_increment)
# slip in a concurrent update that will bump the revision
self._update('ports', port['port']['id'], new)
raise db_exc.DBDeadlock()
db_api.sqla_listen(se.Session, 'before_commit',
concurrent_increment)
self._update('ports', port['port']['id'], new,
headers={'If-Match': 'revision_number=%s' % rev},
expected_code=exc.HTTPPreconditionFailed.code)
def test_port_ip_update_revises(self): def test_port_ip_update_revises(self):
with self.port() as port: with self.port() as port:
rev = port['port']['revision_number'] rev = port['port']['revision_number']

View File

@ -48,7 +48,8 @@ class ExpectedException(testtools.ExpectedException):
def create_request(path, body, content_type, method='GET', def create_request(path, body, content_type, method='GET',
query_string=None, context=None): query_string=None, context=None, headers=None):
headers = headers or {}
if query_string: if query_string:
url = "%s?%s" % (path, query_string) url = "%s?%s" % (path, query_string)
else: else:
@ -57,6 +58,7 @@ def create_request(path, body, content_type, method='GET',
req.method = method req.method = method
req.headers = {} req.headers = {}
req.headers['Accept'] = content_type req.headers['Accept'] = content_type
req.headers.update(headers)
if isinstance(body, six.text_type): if isinstance(body, six.text_type):
req.body = body.encode() req.body = body.encode()
else: else:

View File

@ -0,0 +1,14 @@
---
prelude: >
The Neutron API now supports conditional updates to resources with the
'revision_number' attribute by setting the desired revision number in
an HTTP If-Match header. This allows clients to ensure that a resource
hasn't been modified since it was retrieved by the client.
features:
- |
The Neutron API now supports conditional updates to resources with the
'revision_number' attribute by setting the desired revision number in
an HTTP If-Match header. This allows clients to ensure that a resource
hasn't been modified since it was retrieved by the client. Support for
conditional updates on the server can be checked for by looking for the
'revision-if-match' extension in the supported extensions.