aws/ec2 api validation

Adds middleware to validate user-input to the aws/ec2 api.
This patch is a port to gerrit of this launchpad merge request:
    https://code.launchpad.net/~u-matt-h/nova/aws-api-validation/+merge/71962

blueprint aws-api-validation
bug 813685

Code started by Matthew Hooker, fixes by Joe Gordon

Change-Id: I9346ecd5e5051cb0126c13f7c771173bc23959b9
This commit is contained in:
Joe Gordon 2011-12-21 20:52:13 -05:00
parent 7265a71d99
commit 64341eedf9
4 changed files with 306 additions and 4 deletions

@ -36,9 +36,9 @@ use = egg:Paste#urlmap
/services/Admin: ec2admin
[pipeline:ec2cloud]
pipeline = ec2faultwrap logrequest ec2noauth cloudrequest authorizer ec2executor
pipeline = ec2faultwrap logrequest ec2noauth cloudrequest authorizer validator ec2executor
# NOTE(vish): use the following pipeline for deprecated auth
#pipeline = ec2faultwrap logrequest authenticate cloudrequest authorizer ec2executor
#pipeline = ec2faultwrap logrequest authenticate cloudrequest authorizer validator ec2executor
[pipeline:ec2admin]
pipeline = ec2faultwrap logrequest ec2noauth adminrequest authorizer ec2executor
@ -71,6 +71,9 @@ paste.filter_factory = nova.api.ec2:Requestify.factory
[filter:authorizer]
paste.filter_factory = nova.api.ec2:Authorizer.factory
[filter:validator]
paste.filter_factory = nova.api.ec2:Validator.factory
[app:ec2executor]
paste.app_factory = nova.api.ec2:Executor.factory

@ -28,6 +28,7 @@ import webob.exc
from nova.api.ec2 import apirequest
from nova.api.ec2 import ec2utils
from nova.api.ec2 import faults
from nova.api import validator
from nova.auth import manager
from nova import context
from nova import exception
@ -340,12 +341,52 @@ class Authorizer(wsgi.Middleware):
return any(role in context.roles for role in roles)
class Validator(wsgi.Middleware):
def validate_ec2_id(val):
if not validator.validate_str()(val):
return False
try:
ec2utils.ec2_id_to_id(val)
except exception.InvalidEc2Id:
return False
return True
validator.validate_ec2_id = validate_ec2_id
validator.DEFAULT_VALIDATOR = {
'instance_id': validator.validate_ec2_id,
'volume_id': validator.validate_ec2_id,
'image_id': validator.validate_ec2_id,
'attribute': validator.validate_str(),
'image_location': validator.validate_image_path,
'public_ip': validator.validate_ipv4,
'region_name': validator.validate_str(),
'group_name': validator.validate_str(max_length=255),
'group_description': validator.validate_str(max_length=255),
'size': validator.validate_int(),
'user_data': validator.validate_user_data
}
def __init__(self, application):
super(Validator, self).__init__(application)
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
if validator.validate(req.environ['ec2.request'].args,
validator.DEFAULT_VALIDATOR):
return self.application
else:
raise webob.exc.HTTPBadRequest()
class Executor(wsgi.Application):
"""Execute an EC2 API request.
Executes 'ec2.request', passing 'nova.context' (both variables in WSGI
environ.) Returns an XML response, or a 400 upon failure.
Executes 'ec2.action' upon 'ec2.controller', passing 'nova.context' and
'ec2.action_args' (all variables in WSGI environ.) Returns an XML
response, or a 400 upon failure.
"""
@webob.dec.wsgify(RequestClass=wsgi.Request)

144
nova/api/validator.py Normal file

@ -0,0 +1,144 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Cloudscaling, Inc.
# Author: Matthew Hooker <matt@cloudscaling.com>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import logging
import re
import socket
from nova import exception
LOG = logging.getLogger("nova.api.validator")
def _get_path_validator_regex():
# rfc3986 path validator regex from
# http://jmrware.com/articles/2009/uri_regexp/URI_regex.html
pchar = "([A-Za-z0-9\-._~!$&'()*+,;=:@]|%[0-9A-Fa-f]{2})"
path = "((/{pchar}*)*|"
path += "/({pchar}+(/{pchar}*)*)?|"
path += "{pchar}+(/{pchar}*)*|"
path += "{pchar}+(/{pchar}*)*|)"
path = path.format(pchar=pchar)
return re.compile(path)
VALIDATE_PATH_RE = _get_path_validator_regex()
def validate_str(max_length=None):
def _do(val):
if not isinstance(val, basestring):
return False
if max_length and len(val) > max_length:
return False
return True
return _do
def validate_int(max_value=None):
def _do(val):
if not isinstance(val, int):
return False
if max_value and val > max_value:
return False
return True
return _do
def validate_url_path(val):
"""True if val is matched by the path component grammar in rfc3986."""
if not validate_str()(val):
return False
return VALIDATE_PATH_RE.match(val).end() == len(val)
def validate_image_path(val):
if not validate_str()(val):
return False
bucket_name = val.split('/')[0]
manifest_path = val[len(bucket_name) + 1:]
if not len(bucket_name) or not len(manifest_path):
return False
if val[0] == '/':
return False
# make sure the image path if rfc3986 compliant
# prepend '/' to make input validate
if not validate_url_path('/' + val):
return False
return True
def validate_ipv4(addr):
try:
socket.inet_aton(addr)
except (socket.error, TypeError):
return False
return True
def validate_user_data(user_data):
"""Check if the user_data is encoded properly"""
try:
user_data = base64.b64decode(user_data)
except TypeError:
return False
return True
def validate(args, validator):
"""Validate values of args against validators in validator.
args Dict of values to be validated.
validator A dict where the keys map to keys in args
and the values are validators.
Applies each validator to args[key]
A validator should be a callable which accepts 1 argument and which
returns True if the argument passes validation. False otherwise.
A validator should not raise an exception to indicate validity of the
argument.
Only validates keys which show up in both args and validator.
returns True if validation succeeds. Otherwise False.
"""
for key in validator:
if key not in args:
continue
f = validator[key]
assert callable(f)
if not f(args[key]):
msg = "%s with value %s failed validator %s" % (
key, args[key], f.__name__)
LOG.debug(_(msg))
return False
return True

@ -0,0 +1,114 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Cloudscaling, Inc.
# Author: Matthew Hooker <matt@cloudscaling.com>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
from nova import test
from nova.api import validator
class ValidatorTestCase(test.TestCase):
def test_validate(self):
fixture = {
'foo': lambda val: val == True
}
self.assertTrue(
validator.validate({'foo': True}, fixture))
self.assertFalse(
validator.validate({'foo': False}, fixture))
def test_only_tests_intersect(self):
"""Test that validator.validate only tests the intersect of keys
from args and validator.
"""
fixture = {
'foo': lambda val: True,
'bar': lambda val: True
}
self.assertTrue(
validator.validate({'foo': True}, fixture))
self.assertTrue(
validator.validate({'foo': True, 'bar': True}, fixture))
self.assertTrue(
validator.validate({'foo': True, 'bar': True, 'baz': True},
fixture))
def test_validate_str(self):
self.assertTrue(validator.validate_str()('foo'))
self.assertFalse(validator.validate_str()(1))
self.assertTrue(validator.validate_str(4)('foo'))
self.assertFalse(validator.validate_str(2)('foo'))
self.assertFalse(validator.validate_str()(None))
self.assertTrue(validator.validate_str()(u'foo'))
def test_validate_int(self):
self.assertTrue(validator.validate_int()(1))
self.assertFalse(validator.validate_int()('foo'))
self.assertTrue(validator.validate_int(100)(1))
self.assertFalse(validator.validate_int(4)(5))
self.assertFalse(validator.validate_int()(None))
def test_validate_ec2_id(self):
self.assertFalse(validator.validate_ec2_id('foobar'))
self.assertFalse(validator.validate_ec2_id(''))
self.assertFalse(validator.validate_ec2_id(1234))
self.assertTrue(validator.validate_ec2_id('i-284f3a41'))
def test_validate_ipv4(self):
self.assertTrue(validator.validate_ipv4('4.2.2.4'))
self.assertFalse(validator.validate_ipv4('foobar'))
self.assertFalse(
validator.validate_ipv4('2001:5a8:4:68e0:e6ce:8fff:fe27:d116'))
self.assertFalse(validator.validate_ipv4(123))
self.assertFalse(validator.validate_ipv4(''))
def test_validate_url_path(self):
self.assertTrue(validator.validate_url_path('/path/to/file'))
self.assertFalse(validator.validate_url_path('path/to/file'))
self.assertFalse(
validator.validate_url_path('#this is not a path!@#$%^&*()')
)
self.assertFalse(validator.validate_url_path(None))
self.assertFalse(validator.validate_url_path(123))
def test_validate_image_path(self):
self.assertTrue(validator.validate_image_path('path/to/file'))
self.assertFalse(validator.validate_image_path('/path/to/file'))
self.assertFalse(validator.validate_image_path('path'))
def test_validate_user_data(self):
fixture = base64.b64encode('foo')
self.assertTrue(validator.validate_user_data(fixture))
self.assertFalse(validator.validate_user_data(False))
self.assertFalse(validator.validate_user_data('hello, world!'))
def test_default_validator(self):
expect_pass = {
'attribute': 'foobar'
}
self.assertTrue(validator.validate(expect_pass,
validator.DEFAULT_VALIDATOR))
expect_fail = {
'attribute': 0
}
self.assertFalse(validator.validate(expect_fail,
validator.DEFAULT_VALIDATOR))