Rework completed. Added test cases, changed helper method name, etc.

This commit is contained in:
Masanori Itoh
2011-04-22 21:35:54 +09:00
parent 377d84494b
commit 4683480df0
3 changed files with 40 additions and 80 deletions

View File

@@ -1,48 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 NTT DATA CORPORATION.
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Auth module specific utilities and helper functions.
"""
import netaddr
import string
def get_host_only_server_string(server_str):
"""
Returns host part only of the given server_string if it's a combination
of host part and port. Otherwise, return null string.
"""
# First of all, exclude pure IPv6 address (w/o port).
if netaddr.valid_ipv6(server_str):
return ''
# Next, check if this is IPv6 address with port number combination.
if server_str.find("]:") != -1:
[address, sep, port] = server_str.replace('[', '', 1).partition(']:')
return address
# Third, check if this is a combination of general address and port
if server_str.find(':') == -1:
return ''
# This must be a combination of host part and port
(address, port) = server_str.split(':')
return address

View File

@@ -35,7 +35,6 @@ from nova import flags
from nova import log as logging from nova import log as logging
from nova import utils from nova import utils
from nova.auth import signer from nova.auth import signer
from nova.auth import authutils
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
@@ -316,13 +315,12 @@ class AuthManager(object):
LOG.debug(_('expected_signature: %s'), expected_signature) LOG.debug(_('expected_signature: %s'), expected_signature)
LOG.debug(_('signature: %s'), signature) LOG.debug(_('signature: %s'), signature)
if signature != expected_signature: if signature != expected_signature:
host_only = authutils.get_host_only_server_string( (addr_str, port_str) = utils.parse_server_string(server_string)
server_string)
# If the given server_string contains port num, try without it. # If the given server_string contains port num, try without it.
if host_only != '': if port_str != '':
host_only_signature = signer.Signer( host_only_signature = signer.Signer(
user.secret.encode()).generate(params, verb, user.secret.encode()).generate(params, verb,
host_only, path) addr_str, path)
LOG.debug(_('host_only_signature: %s'), LOG.debug(_('host_only_signature: %s'),
host_only_signature) host_only_signature)
if signature == host_only_signature: if signature == host_only_signature:

View File

@@ -25,7 +25,6 @@ from nova import log as logging
from nova import test from nova import test
from nova.auth import manager from nova.auth import manager
from nova.api.ec2 import cloud from nova.api.ec2 import cloud
from nova.auth import authutils
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.tests.auth_unittest') LOG = logging.getLogger('nova.tests.auth_unittest')
@@ -102,9 +101,43 @@ class _AuthManagerBaseTestCase(test.TestCase):
self.assertEqual('private-party', u.access) self.assertEqual('private-party', u.access)
def test_004_signature_is_valid(self): def test_004_signature_is_valid(self):
#self.assertTrue(self.manager.authenticate(**boto.generate_url ...? )) with user_generator(self.manager, name='admin', secret='admin',
pass access='admin'):
#raise NotImplementedError with project_generator(self.manager, name="admin",
manager_user='admin'):
accesskey = 'admin:admin'
expected_result = (self.manager.get_user('admin'),
self.manager.get_project('admin'))
# captured sig and query string using boto 1.9b/euca2ools 1.2
sig = 'd67Wzd9Bwz8xid9QU+lzWXcF2Y3tRicYABPJgrqfrwM='
auth_params = {'AWSAccessKeyId': 'admin:admin',
'Action': 'DescribeAvailabilityZones',
'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2',
'Timestamp': '2011-04-22T11:29:29',
'Version': '2009-11-30'}
self.assertTrue(expected_result, self.manager.authenticate(
accesskey,
sig,
auth_params,
'GET',
'127.0.0.1:8773',
'/services/Cloud/'))
# captured sig and query string using RightAWS 1.10.0
sig = 'ECYLU6xdFG0ZqRVhQybPJQNJ5W4B9n8fGs6+/fuGD2c='
auth_params = {'AWSAccessKeyId': 'admin:admin',
'Action': 'DescribeAvailabilityZones',
'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2',
'Timestamp': '2011-04-22T11:29:49.000Z',
'Version': '2008-12-01'}
self.assertTrue(expected_result, self.manager.authenticate(
accesskey,
sig,
auth_params,
'GET',
'127.0.0.1',
'/services/Cloud'))
def test_005_can_get_credentials(self): def test_005_can_get_credentials(self):
return return
@@ -340,29 +373,6 @@ class AuthManagerDbTestCase(_AuthManagerBaseTestCase):
auth_driver = 'nova.auth.dbdriver.DbDriver' auth_driver = 'nova.auth.dbdriver.DbDriver'
class AuthManagerUtilTestCase(test.TestCase):
def test_get_host_only_server_string(self):
result = authutils.get_host_only_server_string('::1')
self.assertEqual('', result)
result = authutils.get_host_only_server_string('[::1]:8773')
self.assertEqual('::1', result)
result = authutils.get_host_only_server_string('2001:db8::192.168.1.1')
self.assertEqual('', result)
result = authutils.get_host_only_server_string(
'[2001:db8::192.168.1.1]:8773')
self.assertEqual('2001:db8::192.168.1.1', result)
result = authutils.get_host_only_server_string('192.168.1.1')
self.assertEqual('', result)
result = authutils.get_host_only_server_string('192.168.1.2:8773')
self.assertEqual('192.168.1.2', result)
result = authutils.get_host_only_server_string('192.168.1.3')
self.assertEqual('', result)
result = authutils.get_host_only_server_string('www.example.com:8443')
self.assertEqual('www.example.com', result)
result = authutils.get_host_only_server_string('www.example.com')
self.assertEqual('', result)
if __name__ == "__main__": if __name__ == "__main__":
# TODO: Implement use_fake as an option # TODO: Implement use_fake as an option
unittest.main() unittest.main()