diff --git a/keystone/tests/unit/test_contrib_ec2.py b/keystone/tests/unit/test_contrib_ec2.py deleted file mode 100644 index b3f5471c1a..0000000000 --- a/keystone/tests/unit/test_contrib_ec2.py +++ /dev/null @@ -1,193 +0,0 @@ -# Copyright 2015 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -from keystoneclient.contrib.ec2 import utils as ec2_utils - -from keystone.contrib.ec2 import controllers -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit import default_fixtures -from keystone.tests.unit.ksfixtures import database - - -class TestCredentialEc2(unit.TestCase): - # TODO(davechen): more testcases for ec2 credential are expected here and - # the file name would be renamed to "test_credential" to correspond with - # "test_v3_credential.py". - def setUp(self): - super(TestCredentialEc2, self).setUp() - self.useFixture(database.Database()) - self.load_backends() - self.load_fixtures(default_fixtures) - self.user_id = self.user_foo['id'] - self.project_id = self.tenant_bar['id'] - self.controller = controllers.Ec2Controller() - self.blob, tmp_ref = unit.new_ec2_credential( - user_id=self.user_id, - project_id=self.project_id) - - self.creds_ref = (controllers.Ec2Controller - ._convert_v3_to_ec2_credential(tmp_ref)) - - def test_signature_validate_no_host_port(self): - """Test signature validation with the access/secret provided.""" - access = self.blob['access'] - secret = self.blob['secret'] - signer = ec2_utils.Ec2Signer(secret) - params = {'SignatureMethod': 'HmacSHA256', - 'SignatureVersion': '2', - 'AWSAccessKeyId': access} - request = {'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - signature = signer.generate(request) - - sig_ref = {'access': access, - 'signature': signature, - 'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - - # Now validate the signature based on the dummy request - self.assertTrue(self.controller.check_signature(self.creds_ref, - sig_ref)) - - def test_signature_validate_with_host_port(self): - """Test signature validation when host is bound with port. - - Host is bound with a port, generally, the port here is not the - standard port for the protocol, like '80' for HTTP and port 443 - for HTTPS, the port is not omitted by the client library. - """ - access = self.blob['access'] - secret = self.blob['secret'] - signer = ec2_utils.Ec2Signer(secret) - params = {'SignatureMethod': 'HmacSHA256', - 'SignatureVersion': '2', - 'AWSAccessKeyId': access} - request = {'host': 'foo:8181', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - signature = signer.generate(request) - - sig_ref = {'access': access, - 'signature': signature, - 'host': 'foo:8181', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - - # Now validate the signature based on the dummy request - self.assertTrue(self.controller.check_signature(self.creds_ref, - sig_ref)) - - def test_signature_validate_with_missed_host_port(self): - """Test signature validation when host is bound with well-known port. - - Host is bound with a port, but the port is well-know port like '80' - for HTTP and port 443 for HTTPS, sometimes, client library omit - the port but then make the request with the port. - see (How to create the string to sign): 'http://docs.aws.amazon.com/ - general/latest/gr/signature-version-2.html'. - - Since "credentials['host']" is not set by client library but is - taken from "req.host", so caused the differences. - """ - access = self.blob['access'] - secret = self.blob['secret'] - signer = ec2_utils.Ec2Signer(secret) - params = {'SignatureMethod': 'HmacSHA256', - 'SignatureVersion': '2', - 'AWSAccessKeyId': access} - # Omit the port to generate the signature. - cnt_req = {'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - signature = signer.generate(cnt_req) - - sig_ref = {'access': access, - 'signature': signature, - 'host': 'foo:8080', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - - # Now validate the signature based on the dummy request - # Check the signature again after omitting the port. - self.assertTrue(self.controller.check_signature(self.creds_ref, - sig_ref)) - - def test_signature_validate_no_signature(self): - """Signature is not presented in signature reference data.""" - access = self.blob['access'] - params = {'SignatureMethod': 'HmacSHA256', - 'SignatureVersion': '2', - 'AWSAccessKeyId': access} - - sig_ref = {'access': access, - 'signature': None, - 'host': 'foo:8080', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - - # Now validate the signature based on the dummy request - self.assertRaises(exception.Unauthorized, - self.controller.check_signature, - self.creds_ref, sig_ref) - - def test_signature_validate_invalid_signature(self): - """Signature is not signed on the correct data.""" - access = self.blob['access'] - secret = self.blob['secret'] - signer = ec2_utils.Ec2Signer(secret) - params = {'SignatureMethod': 'HmacSHA256', - 'SignatureVersion': '2', - 'AWSAccessKeyId': access} - request = {'host': 'bar', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - signature = signer.generate(request) - - sig_ref = {'access': access, - 'signature': signature, - 'host': 'foo:8080', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - - # Now validate the signature based on the dummy request - self.assertRaises(exception.Unauthorized, - self.controller.check_signature, - self.creds_ref, sig_ref) - - def test_check_non_admin_user(self): - """Checking if user is admin causes uncaught error. - - When checking if a user is an admin, keystone.exception.Unauthorized - is raised but not caught if the user is not an admin. - """ - # make a non-admin user - context = {'is_admin': False, 'token_id': uuid.uuid4().hex} - - # check if user is admin - # no exceptions should be raised - self.controller._is_admin(context) diff --git a/keystone/tests/unit/test_credential.py b/keystone/tests/unit/test_credential.py index 2b8c7190e3..e917ef7188 100644 --- a/keystone/tests/unit/test_credential.py +++ b/keystone/tests/unit/test_credential.py @@ -14,19 +14,23 @@ import uuid +from keystoneclient.contrib.ec2 import utils as ec2_utils from six.moves import http_client from keystone.common import utils from keystone.contrib.ec2 import controllers +from keystone import exception from keystone.tests import unit +from keystone.tests.unit import default_fixtures +from keystone.tests.unit.ksfixtures import database from keystone.tests.unit import rest CRED_TYPE_EC2 = controllers.CRED_TYPE_EC2 -class TestCredentialEc2(rest.RestfulTestCase): +class V2CredentialEc2TestCase(rest.RestfulTestCase): def setUp(self): - super(TestCredentialEc2, self).setUp() + super(V2CredentialEc2TestCase, self).setUp() self.user_id = self.user_foo['id'] self.project_id = self.tenant_bar['id'] @@ -92,3 +96,170 @@ class TestCredentialEc2(rest.RestfulTestCase): # still one element because non-EC2 credentials are not returned. self.assertEqual(1, len(cred_list_2)) self.assertEqual(cred_list[0], cred_list_2[0]) + + +class V2CredentialEc2Controller(unit.TestCase): + def setUp(self): + super(V2CredentialEc2Controller, self).setUp() + self.useFixture(database.Database()) + self.load_backends() + self.load_fixtures(default_fixtures) + self.user_id = self.user_foo['id'] + self.project_id = self.tenant_bar['id'] + self.controller = controllers.Ec2Controller() + self.blob, tmp_ref = unit.new_ec2_credential( + user_id=self.user_id, + project_id=self.project_id) + + self.creds_ref = (controllers.Ec2Controller + ._convert_v3_to_ec2_credential(tmp_ref)) + + def test_signature_validate_no_host_port(self): + """Test signature validation with the access/secret provided.""" + access = self.blob['access'] + secret = self.blob['secret'] + signer = ec2_utils.Ec2Signer(secret) + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + request = {'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + signature = signer.generate(request) + + sig_ref = {'access': access, + 'signature': signature, + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + + # Now validate the signature based on the dummy request + self.assertTrue(self.controller.check_signature(self.creds_ref, + sig_ref)) + + def test_signature_validate_with_host_port(self): + """Test signature validation when host is bound with port. + + Host is bound with a port, generally, the port here is not the + standard port for the protocol, like '80' for HTTP and port 443 + for HTTPS, the port is not omitted by the client library. + """ + access = self.blob['access'] + secret = self.blob['secret'] + signer = ec2_utils.Ec2Signer(secret) + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + request = {'host': 'foo:8181', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + signature = signer.generate(request) + + sig_ref = {'access': access, + 'signature': signature, + 'host': 'foo:8181', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + + # Now validate the signature based on the dummy request + self.assertTrue(self.controller.check_signature(self.creds_ref, + sig_ref)) + + def test_signature_validate_with_missed_host_port(self): + """Test signature validation when host is bound with well-known port. + + Host is bound with a port, but the port is well-know port like '80' + for HTTP and port 443 for HTTPS, sometimes, client library omit + the port but then make the request with the port. + see (How to create the string to sign): 'http://docs.aws.amazon.com/ + general/latest/gr/signature-version-2.html'. + + Since "credentials['host']" is not set by client library but is + taken from "req.host", so caused the differences. + """ + access = self.blob['access'] + secret = self.blob['secret'] + signer = ec2_utils.Ec2Signer(secret) + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + # Omit the port to generate the signature. + cnt_req = {'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + signature = signer.generate(cnt_req) + + sig_ref = {'access': access, + 'signature': signature, + 'host': 'foo:8080', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + + # Now validate the signature based on the dummy request + # Check the signature again after omitting the port. + self.assertTrue(self.controller.check_signature(self.creds_ref, + sig_ref)) + + def test_signature_validate_no_signature(self): + """Signature is not presented in signature reference data.""" + access = self.blob['access'] + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + + sig_ref = {'access': access, + 'signature': None, + 'host': 'foo:8080', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + + # Now validate the signature based on the dummy request + self.assertRaises(exception.Unauthorized, + self.controller.check_signature, + self.creds_ref, sig_ref) + + def test_signature_validate_invalid_signature(self): + """Signature is not signed on the correct data.""" + access = self.blob['access'] + secret = self.blob['secret'] + signer = ec2_utils.Ec2Signer(secret) + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + request = {'host': 'bar', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + signature = signer.generate(request) + + sig_ref = {'access': access, + 'signature': signature, + 'host': 'foo:8080', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + + # Now validate the signature based on the dummy request + self.assertRaises(exception.Unauthorized, + self.controller.check_signature, + self.creds_ref, sig_ref) + + def test_check_non_admin_user(self): + """Checking if user is admin causes uncaught error. + + When checking if a user is an admin, keystone.exception.Unauthorized + is raised but not caught if the user is not an admin. + """ + # make a non-admin user + context = {'is_admin': False, 'token_id': uuid.uuid4().hex} + + # check if user is admin + # no exceptions should be raised + self.controller._is_admin(context) diff --git a/tox.ini b/tox.ini index ddcc4111b7..fe1f9edf46 100644 --- a/tox.ini +++ b/tox.ini @@ -42,7 +42,6 @@ commands = keystone/tests/unit/test_backend_sql.py \ keystone/tests/unit/test_cli.py \ keystone/tests/unit/test_config.py \ - keystone/tests/unit/test_contrib_ec2.py \ keystone/tests/unit/test_contrib_s3_core.py \ keystone/tests/unit/test_driver_hints.py \ keystone/tests/unit/test_exception.py \