From 7ace25e25fafef7809789dc3b799c1943a45301d Mon Sep 17 00:00:00 2001 From: Dave Chen Date: Wed, 24 Dec 2014 20:41:20 +0800 Subject: [PATCH] Fix errors in ec2 signature logic checking - Check for colon in credentials['host'] instead of credentials['signature']. - Fix the syntax issue when trying to get the signature from the dict. - credentials['signature'] is not guaranteed to existed, so check it explicitly. - Need to reinitialize 'signer' to avoid contaminated status of signature. Closes-Bug: #1289115 Closes-Bug: #1434599 Change-Id: Idb5d97c30a20872fdaafea786bcea2631d70858c --- keystone/contrib/ec2/controllers.py | 34 +++-- keystone/tests/unit/test_contrib_ec2.py | 195 ++++++++++++++++++++++++ 2 files changed, 219 insertions(+), 10 deletions(-) create mode 100644 keystone/tests/unit/test_contrib_ec2.py diff --git a/keystone/contrib/ec2/controllers.py b/keystone/contrib/ec2/controllers.py index 25957d33f4..c2999b3e97 100644 --- a/keystone/contrib/ec2/controllers.py +++ b/keystone/contrib/ec2/controllers.py @@ -57,16 +57,30 @@ class Ec2ControllerCommon(object): def check_signature(self, creds_ref, credentials): signer = ec2_utils.Ec2Signer(creds_ref['secret']) signature = signer.generate(credentials) - if utils.auth_str_equal(credentials['signature'], signature): - return - # NOTE(vish): Some libraries don't use the port when signing - # requests, so try again without port. - elif ':' in credentials['signature']: - hostname, _port = credentials['host'].split(':') - credentials['host'] = hostname - signature = signer.generate(credentials) - if not utils.auth_str_equal(credentials.signature, signature): - raise exception.Unauthorized(message='Invalid EC2 signature.') + # NOTE(davechen): credentials.get('signature') is not guaranteed to + # exist, we need check it explicitly. + if credentials.get('signature'): + if utils.auth_str_equal(credentials['signature'], signature): + return True + # NOTE(vish): Some client libraries don't use the port when signing + # requests, so try again without port. + elif ':' in credentials['host']: + hostname, _port = credentials['host'].split(':') + credentials['host'] = hostname + # NOTE(davechen): we need reinitialize 'signer' to avoid + # contaminated status of signature, this is similar with + # other programming language libraries, JAVA for example. + signer = ec2_utils.Ec2Signer(creds_ref['secret']) + signature = signer.generate(credentials) + if utils.auth_str_equal(credentials['signature'], + signature): + return True + raise exception.Unauthorized( + message='Invalid EC2 signature.') + else: + raise exception.Unauthorized( + message='EC2 signature not supplied.') + # Raise the exception when credentials.get('signature') is None else: raise exception.Unauthorized(message='EC2 signature not supplied.') diff --git a/keystone/tests/unit/test_contrib_ec2.py b/keystone/tests/unit/test_contrib_ec2.py new file mode 100644 index 0000000000..dbb742d4d7 --- /dev/null +++ b/keystone/tests/unit/test_contrib_ec2.py @@ -0,0 +1,195 @@ +# Copyright 2015 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystoneclient.contrib.ec2 import utils as ec2_utils + +from keystone.contrib.ec2 import controllers +from keystone import exception +from keystone.tests import unit as tests +from keystone.tests.unit import default_fixtures +from keystone.tests.unit.ksfixtures import database + + +class TestCredentialEc2(tests.TestCase): + # TODO(davechen): more testcases for ec2 credential are expected here and + # the file name would be renamed to "test_credential" to correspond with + # "test_v3_credential.py". + def setUp(self): + super(TestCredentialEc2, self).setUp() + self.useFixture(database.Database()) + self.load_backends() + self.load_fixtures(default_fixtures) + self.user_id = self.user_foo['id'] + self.project_id = self.tenant_bar['id'] + self.blob = {'access': uuid.uuid4().hex, + 'secret': uuid.uuid4().hex} + self.controller = controllers.Ec2Controller() + self.creds_ref = {'user_id': self.user_id, + 'tenant_id': self.project_id, + 'access': self.blob['access'], + 'secret': self.blob['secret'], + 'trust_id': None} + + def test_signature_validate_no_host_port(self): + """Test signature validation with the access/secret provided.""" + access = self.blob['access'] + secret = self.blob['secret'] + signer = ec2_utils.Ec2Signer(secret) + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + request = {'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + signature = signer.generate(request) + + sig_ref = {'access': access, + 'signature': signature, + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + + # Now validate the signature based on the dummy request + self.assertTrue(self.controller.check_signature(self.creds_ref, + sig_ref)) + + def test_signature_validate_with_host_port(self): + """Test signature validation when host is bound with port. + + Host is bound with a port, generally, the port here is not the + standard port for the protocol, like '80' for HTTP and port 443 + for HTTPS, the port is not omitted by the client library. + """ + access = self.blob['access'] + secret = self.blob['secret'] + signer = ec2_utils.Ec2Signer(secret) + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + request = {'host': 'foo:8181', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + signature = signer.generate(request) + + sig_ref = {'access': access, + 'signature': signature, + 'host': 'foo:8181', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + + # Now validate the signature based on the dummy request + self.assertTrue(self.controller.check_signature(self.creds_ref, + sig_ref)) + + def test_signature_validate_with_missed_host_port(self): + """Test signature validation when host is bound with well-known port. + + Host is bound with a port, but the port is well-know port like '80' + for HTTP and port 443 for HTTPS, sometimes, client library omit + the port but then make the request with the port. + see (How to create the string to sign): 'http://docs.aws.amazon.com/ + general/latest/gr/signature-version-2.html'. + + Since "credentials['host']" is not set by client library but is + taken from "req.host", so caused the differences. + """ + access = self.blob['access'] + secret = self.blob['secret'] + signer = ec2_utils.Ec2Signer(secret) + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + # Omit the port to generate the signature. + cnt_req = {'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + signature = signer.generate(cnt_req) + + sig_ref = {'access': access, + 'signature': signature, + 'host': 'foo:8080', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + + # Now validate the signature based on the dummy request + # Check the signature again after omitting the port. + self.assertTrue(self.controller.check_signature(self.creds_ref, + sig_ref)) + + def test_signature_validate_no_signature(self): + """Signature is not presented in signature reference data.""" + access = self.blob['access'] + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + + sig_ref = {'access': access, + 'signature': None, + 'host': 'foo:8080', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + + creds_ref = {'user_id': self.user_id, + 'tenant_id': self.project_id, + 'access': self.blob['access'], + 'secret': self.blob['secret'], + 'trust_id': None + } + + # Now validate the signature based on the dummy request + self.assertRaises(exception.Unauthorized, + self.controller.check_signature, + creds_ref, sig_ref) + + def test_signature_validate_invalid_signature(self): + """Signature is not signed on the correct data.""" + access = self.blob['access'] + secret = self.blob['secret'] + signer = ec2_utils.Ec2Signer(secret) + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + request = {'host': 'bar', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + signature = signer.generate(request) + + sig_ref = {'access': access, + 'signature': signature, + 'host': 'foo:8080', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + + creds_ref = {'user_id': self.user_id, + 'tenant_id': self.project_id, + 'access': self.blob['access'], + 'secret': self.blob['secret'], + 'trust_id': None + } + + # Now validate the signature based on the dummy request + self.assertRaises(exception.Unauthorized, + self.controller.check_signature, + creds_ref, sig_ref)