Remove keystoneclient.middleware
The code has been moved to the new keystonemiddleware project and keystone.middleware was deprecated since Juno. It's time to drop it in Liberty. Remove the directory keystoneclient/middleware/. Remove test_auth_token_middleware.py, test_memcache_crypt.py and test_s3_token_middleware.py in keystoneclient/tests/unit/. Remove the create_middleware_cert shell function from examples/pki/gen_pki.sh. And remove the call from examples/pki/run_all.sh. Remove netaddr, pycrypto and WebOb test dependencies, only needed to test the removed middleware. DocImpact: The keystoneclient.middleware module has been removed Closes-Bug: #1449066 Change-Id: I88ddfdb674db1ec9c0fd4f9a62ae8347785ea10c
This commit is contained in:
		@@ -191,11 +191,6 @@ function issue_certs {
 | 
			
		||||
  check_error $?
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
function create_middleware_cert {
 | 
			
		||||
  cp $CERTS_DIR/ssl_cert.pem $CERTS_DIR/middleware.pem
 | 
			
		||||
  cat $PRIVATE_DIR/ssl_key.pem >> $CERTS_DIR/middleware.pem
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
function check_openssl {
 | 
			
		||||
  echo 'Checking openssl availability ...'
 | 
			
		||||
  which openssl
 | 
			
		||||
 
 | 
			
		||||
@@ -26,6 +26,5 @@ generate_ca
 | 
			
		||||
ssl_cert_req
 | 
			
		||||
cms_signing_cert_req
 | 
			
		||||
issue_certs
 | 
			
		||||
create_middleware_cert
 | 
			
		||||
gen_sample_cms
 | 
			
		||||
cleanup
 | 
			
		||||
 
 | 
			
		||||
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							@@ -1,209 +0,0 @@
 | 
			
		||||
# Copyright 2010-2013 OpenStack Foundation
 | 
			
		||||
#
 | 
			
		||||
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
# you may not use this file except in compliance with the License.
 | 
			
		||||
# You may obtain a copy of the License at
 | 
			
		||||
#
 | 
			
		||||
#    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
#
 | 
			
		||||
# Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 | 
			
		||||
# implied.
 | 
			
		||||
# See the License for the specific language governing permissions and
 | 
			
		||||
# limitations under the License.
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
Utilities for memcache encryption and integrity check.
 | 
			
		||||
 | 
			
		||||
Data should be serialized before entering these functions. Encryption
 | 
			
		||||
has a dependency on the pycrypto. If pycrypto is not available,
 | 
			
		||||
CryptoUnavailableError will be raised.
 | 
			
		||||
 | 
			
		||||
This module will not be called unless signing or encryption is enabled
 | 
			
		||||
in the config. It will always validate signatures, and will decrypt
 | 
			
		||||
data if encryption is enabled. It is not valid to mix protection
 | 
			
		||||
modes.
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
import base64
 | 
			
		||||
import functools
 | 
			
		||||
import hashlib
 | 
			
		||||
import hmac
 | 
			
		||||
import math
 | 
			
		||||
import os
 | 
			
		||||
import sys
 | 
			
		||||
 | 
			
		||||
import six
 | 
			
		||||
 | 
			
		||||
# make sure pycrypto is available
 | 
			
		||||
try:
 | 
			
		||||
    from Crypto.Cipher import AES
 | 
			
		||||
except ImportError:
 | 
			
		||||
    AES = None
 | 
			
		||||
 | 
			
		||||
HASH_FUNCTION = hashlib.sha384
 | 
			
		||||
DIGEST_LENGTH = HASH_FUNCTION().digest_size
 | 
			
		||||
DIGEST_SPLIT = DIGEST_LENGTH // 3
 | 
			
		||||
DIGEST_LENGTH_B64 = 4 * int(math.ceil(DIGEST_LENGTH / 3.0))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InvalidMacError(Exception):
 | 
			
		||||
    """raise when unable to verify MACed data.
 | 
			
		||||
 | 
			
		||||
    This usually indicates that data had been expectedly modified in memcache.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class DecryptError(Exception):
 | 
			
		||||
    """raise when unable to decrypt encrypted data.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class CryptoUnavailableError(Exception):
 | 
			
		||||
    """raise when Python Crypto module is not available.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def assert_crypto_availability(f):
 | 
			
		||||
    """Ensure Crypto module is available."""
 | 
			
		||||
 | 
			
		||||
    @functools.wraps(f)
 | 
			
		||||
    def wrapper(*args, **kwds):
 | 
			
		||||
        if AES is None:
 | 
			
		||||
            raise CryptoUnavailableError()
 | 
			
		||||
        return f(*args, **kwds)
 | 
			
		||||
    return wrapper
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if sys.version_info >= (3, 3):
 | 
			
		||||
    constant_time_compare = hmac.compare_digest
 | 
			
		||||
else:
 | 
			
		||||
    def constant_time_compare(first, second):
 | 
			
		||||
        """Returns True if both string inputs are equal, otherwise False.
 | 
			
		||||
 | 
			
		||||
        This function should take a constant amount of time regardless of
 | 
			
		||||
        how many characters in the strings match.
 | 
			
		||||
 | 
			
		||||
        """
 | 
			
		||||
        if len(first) != len(second):
 | 
			
		||||
            return False
 | 
			
		||||
        result = 0
 | 
			
		||||
        if six.PY3 and isinstance(first, bytes) and isinstance(second, bytes):
 | 
			
		||||
            for x, y in zip(first, second):
 | 
			
		||||
                result |= x ^ y
 | 
			
		||||
        else:
 | 
			
		||||
            for x, y in zip(first, second):
 | 
			
		||||
                result |= ord(x) ^ ord(y)
 | 
			
		||||
        return result == 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def derive_keys(token, secret, strategy):
 | 
			
		||||
    """Derives keys for MAC and ENCRYPTION from the user-provided
 | 
			
		||||
    secret. The resulting keys should be passed to the protect and
 | 
			
		||||
    unprotect functions.
 | 
			
		||||
 | 
			
		||||
    As suggested by NIST Special Publication 800-108, this uses the
 | 
			
		||||
    first 128 bits from the sha384 KDF for the obscured cache key
 | 
			
		||||
    value, the second 128 bits for the message authentication key and
 | 
			
		||||
    the remaining 128 bits for the encryption key.
 | 
			
		||||
 | 
			
		||||
    This approach is faster than computing a separate hmac as the KDF
 | 
			
		||||
    for each desired key.
 | 
			
		||||
    """
 | 
			
		||||
    digest = hmac.new(secret, token + strategy, HASH_FUNCTION).digest()
 | 
			
		||||
    return {'CACHE_KEY': digest[:DIGEST_SPLIT],
 | 
			
		||||
            'MAC': digest[DIGEST_SPLIT: 2 * DIGEST_SPLIT],
 | 
			
		||||
            'ENCRYPTION': digest[2 * DIGEST_SPLIT:],
 | 
			
		||||
            'strategy': strategy}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def sign_data(key, data):
 | 
			
		||||
    """Sign the data using the defined function and the derived key."""
 | 
			
		||||
    mac = hmac.new(key, data, HASH_FUNCTION).digest()
 | 
			
		||||
    return base64.b64encode(mac)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@assert_crypto_availability
 | 
			
		||||
def encrypt_data(key, data):
 | 
			
		||||
    """Encrypt the data with the given secret key.
 | 
			
		||||
 | 
			
		||||
    Padding is n bytes of the value n, where 1 <= n <= blocksize.
 | 
			
		||||
    """
 | 
			
		||||
    iv = os.urandom(16)
 | 
			
		||||
    cipher = AES.new(key, AES.MODE_CBC, iv)
 | 
			
		||||
    padding = 16 - len(data) % 16
 | 
			
		||||
    return iv + cipher.encrypt(data + six.int2byte(padding) * padding)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@assert_crypto_availability
 | 
			
		||||
def decrypt_data(key, data):
 | 
			
		||||
    """Decrypt the data with the given secret key."""
 | 
			
		||||
    iv = data[:16]
 | 
			
		||||
    cipher = AES.new(key, AES.MODE_CBC, iv)
 | 
			
		||||
    try:
 | 
			
		||||
        result = cipher.decrypt(data[16:])
 | 
			
		||||
    except Exception:
 | 
			
		||||
        raise DecryptError('Encrypted data appears to be corrupted.')
 | 
			
		||||
 | 
			
		||||
    # Strip the last n padding bytes where n is the last value in
 | 
			
		||||
    # the plaintext
 | 
			
		||||
    return result[:-1 * six.byte2int([result[-1]])]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def protect_data(keys, data):
 | 
			
		||||
    """Given keys and serialized data, returns an appropriately
 | 
			
		||||
    protected string suitable for storage in the cache.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    if keys['strategy'] == b'ENCRYPT':
 | 
			
		||||
        data = encrypt_data(keys['ENCRYPTION'], data)
 | 
			
		||||
 | 
			
		||||
    encoded_data = base64.b64encode(data)
 | 
			
		||||
 | 
			
		||||
    signature = sign_data(keys['MAC'], encoded_data)
 | 
			
		||||
    return signature + encoded_data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def unprotect_data(keys, signed_data):
 | 
			
		||||
    """Given keys and cached string data, verifies the signature,
 | 
			
		||||
    decrypts if necessary, and returns the original serialized data.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    # cache backends return None when no data is found. We don't mind
 | 
			
		||||
    # that this particular special value is unsigned.
 | 
			
		||||
    if signed_data is None:
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    # First we calculate the signature
 | 
			
		||||
    provided_mac = signed_data[:DIGEST_LENGTH_B64]
 | 
			
		||||
    calculated_mac = sign_data(
 | 
			
		||||
        keys['MAC'],
 | 
			
		||||
        signed_data[DIGEST_LENGTH_B64:])
 | 
			
		||||
 | 
			
		||||
    # Then verify that it matches the provided value
 | 
			
		||||
    if not constant_time_compare(provided_mac, calculated_mac):
 | 
			
		||||
        raise InvalidMacError('Invalid MAC; data appears to be corrupted.')
 | 
			
		||||
 | 
			
		||||
    data = base64.b64decode(signed_data[DIGEST_LENGTH_B64:])
 | 
			
		||||
 | 
			
		||||
    # then if necessary decrypt the data
 | 
			
		||||
    if keys['strategy'] == b'ENCRYPT':
 | 
			
		||||
        data = decrypt_data(keys['ENCRYPTION'], data)
 | 
			
		||||
 | 
			
		||||
    return data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_cache_key(keys):
 | 
			
		||||
    """Given keys generated by derive_keys(), returns a base64
 | 
			
		||||
    encoded value suitable for use as a cache key in memcached.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    return base64.b64encode(keys['CACHE_KEY'])
 | 
			
		||||
@@ -1,268 +0,0 @@
 | 
			
		||||
# Copyright 2012 OpenStack Foundation
 | 
			
		||||
# Copyright 2010 United States Government as represented by the
 | 
			
		||||
# Administrator of the National Aeronautics and Space Administration.
 | 
			
		||||
# Copyright 2011,2012 Akira YOSHIYAMA <akirayoshiyama@gmail.com>
 | 
			
		||||
# All Rights Reserved.
 | 
			
		||||
#
 | 
			
		||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
			
		||||
# not use this file except in compliance with the License. You may obtain
 | 
			
		||||
# a copy of the License at
 | 
			
		||||
#
 | 
			
		||||
#      http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
#
 | 
			
		||||
# Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
			
		||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
			
		||||
# License for the specific language governing permissions and limitations
 | 
			
		||||
# under the License.
 | 
			
		||||
 | 
			
		||||
# This source code is based ./auth_token.py and ./ec2_token.py.
 | 
			
		||||
# See them for their copyright.
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
S3 TOKEN MIDDLEWARE
 | 
			
		||||
 | 
			
		||||
This WSGI component:
 | 
			
		||||
 | 
			
		||||
* Get a request from the swift3 middleware with an S3 Authorization
 | 
			
		||||
  access key.
 | 
			
		||||
* Validate s3 token in Keystone.
 | 
			
		||||
* Transform the account name to AUTH_%(tenant_name).
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
from oslo_serialization import jsonutils
 | 
			
		||||
from oslo_utils import strutils
 | 
			
		||||
import requests
 | 
			
		||||
import six
 | 
			
		||||
from six.moves import urllib
 | 
			
		||||
import webob
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
PROTOCOL_NAME = 'S3 Token Authentication'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO(kun): remove it after oslo merge this.
 | 
			
		||||
def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False):
 | 
			
		||||
    """Validate and split the given HTTP request path.
 | 
			
		||||
 | 
			
		||||
    **Examples**::
 | 
			
		||||
 | 
			
		||||
        ['a'] = split_path('/a')
 | 
			
		||||
        ['a', None] = split_path('/a', 1, 2)
 | 
			
		||||
        ['a', 'c'] = split_path('/a/c', 1, 2)
 | 
			
		||||
        ['a', 'c', 'o/r'] = split_path('/a/c/o/r', 1, 3, True)
 | 
			
		||||
 | 
			
		||||
    :param path: HTTP Request path to be split
 | 
			
		||||
    :param minsegs: Minimum number of segments to be extracted
 | 
			
		||||
    :param maxsegs: Maximum number of segments to be extracted
 | 
			
		||||
    :param rest_with_last: If True, trailing data will be returned as part
 | 
			
		||||
                           of last segment.  If False, and there is
 | 
			
		||||
                           trailing data, raises ValueError.
 | 
			
		||||
    :returns: list of segments with a length of maxsegs (non-existent
 | 
			
		||||
              segments will return as None)
 | 
			
		||||
    :raises: ValueError if given an invalid path
 | 
			
		||||
    """
 | 
			
		||||
    if not maxsegs:
 | 
			
		||||
        maxsegs = minsegs
 | 
			
		||||
    if minsegs > maxsegs:
 | 
			
		||||
        raise ValueError('minsegs > maxsegs: %d > %d' % (minsegs, maxsegs))
 | 
			
		||||
    if rest_with_last:
 | 
			
		||||
        segs = path.split('/', maxsegs)
 | 
			
		||||
        minsegs += 1
 | 
			
		||||
        maxsegs += 1
 | 
			
		||||
        count = len(segs)
 | 
			
		||||
        if (segs[0] or count < minsegs or count > maxsegs or
 | 
			
		||||
                '' in segs[1:minsegs]):
 | 
			
		||||
            raise ValueError('Invalid path: %s' % urllib.parse.quote(path))
 | 
			
		||||
    else:
 | 
			
		||||
        minsegs += 1
 | 
			
		||||
        maxsegs += 1
 | 
			
		||||
        segs = path.split('/', maxsegs)
 | 
			
		||||
        count = len(segs)
 | 
			
		||||
        if (segs[0] or count < minsegs or count > maxsegs + 1 or
 | 
			
		||||
                '' in segs[1:minsegs] or
 | 
			
		||||
                (count == maxsegs + 1 and segs[maxsegs])):
 | 
			
		||||
            raise ValueError('Invalid path: %s' % urllib.parse.quote(path))
 | 
			
		||||
    segs = segs[1:maxsegs]
 | 
			
		||||
    segs.extend([None] * (maxsegs - 1 - len(segs)))
 | 
			
		||||
    return segs
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ServiceError(Exception):
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class S3Token(object):
 | 
			
		||||
    """Auth Middleware that handles S3 authenticating client calls."""
 | 
			
		||||
 | 
			
		||||
    def __init__(self, app, conf):
 | 
			
		||||
        """Common initialization code."""
 | 
			
		||||
        self.app = app
 | 
			
		||||
        self.logger = logging.getLogger(conf.get('log_name', __name__))
 | 
			
		||||
        self.logger.debug('Starting the %s component', PROTOCOL_NAME)
 | 
			
		||||
        self.logger.warning(
 | 
			
		||||
            'This middleware module is deprecated as of v0.11.0 in favor of '
 | 
			
		||||
            'keystonemiddleware.s3_token - please update your WSGI pipeline '
 | 
			
		||||
            'to reference the new middleware package.')
 | 
			
		||||
        self.reseller_prefix = conf.get('reseller_prefix', 'AUTH_')
 | 
			
		||||
        # where to find the auth service (we use this to validate tokens)
 | 
			
		||||
 | 
			
		||||
        auth_host = conf.get('auth_host')
 | 
			
		||||
        auth_port = int(conf.get('auth_port', 35357))
 | 
			
		||||
        auth_protocol = conf.get('auth_protocol', 'https')
 | 
			
		||||
 | 
			
		||||
        self.request_uri = '%s://%s:%s' % (auth_protocol, auth_host, auth_port)
 | 
			
		||||
 | 
			
		||||
        # SSL
 | 
			
		||||
        insecure = strutils.bool_from_string(conf.get('insecure', False))
 | 
			
		||||
        cert_file = conf.get('certfile')
 | 
			
		||||
        key_file = conf.get('keyfile')
 | 
			
		||||
 | 
			
		||||
        if insecure:
 | 
			
		||||
            self.verify = False
 | 
			
		||||
        elif cert_file and key_file:
 | 
			
		||||
            self.verify = (cert_file, key_file)
 | 
			
		||||
        elif cert_file:
 | 
			
		||||
            self.verify = cert_file
 | 
			
		||||
        else:
 | 
			
		||||
            self.verify = None
 | 
			
		||||
 | 
			
		||||
    def deny_request(self, code):
 | 
			
		||||
        error_table = {
 | 
			
		||||
            'AccessDenied': (401, 'Access denied'),
 | 
			
		||||
            'InvalidURI': (400, 'Could not parse the specified URI'),
 | 
			
		||||
        }
 | 
			
		||||
        resp = webob.Response(content_type='text/xml')
 | 
			
		||||
        resp.status = error_table[code][0]
 | 
			
		||||
        error_msg = ('<?xml version="1.0" encoding="UTF-8"?>\r\n'
 | 
			
		||||
                     '<Error>\r\n  <Code>%s</Code>\r\n  '
 | 
			
		||||
                     '<Message>%s</Message>\r\n</Error>\r\n' %
 | 
			
		||||
                     (code, error_table[code][1]))
 | 
			
		||||
        if six.PY3:
 | 
			
		||||
            error_msg = error_msg.encode()
 | 
			
		||||
        resp.body = error_msg
 | 
			
		||||
        return resp
 | 
			
		||||
 | 
			
		||||
    def _json_request(self, creds_json):
 | 
			
		||||
        headers = {'Content-Type': 'application/json'}
 | 
			
		||||
        try:
 | 
			
		||||
            response = requests.post('%s/v2.0/s3tokens' % self.request_uri,
 | 
			
		||||
                                     headers=headers, data=creds_json,
 | 
			
		||||
                                     verify=self.verify)
 | 
			
		||||
        except requests.exceptions.RequestException as e:
 | 
			
		||||
            self.logger.info('HTTP connection exception: %s', e)
 | 
			
		||||
            resp = self.deny_request('InvalidURI')
 | 
			
		||||
            raise ServiceError(resp)
 | 
			
		||||
 | 
			
		||||
        if response.status_code < 200 or response.status_code >= 300:
 | 
			
		||||
            self.logger.debug('Keystone reply error: status=%s reason=%s',
 | 
			
		||||
                              response.status_code, response.reason)
 | 
			
		||||
            resp = self.deny_request('AccessDenied')
 | 
			
		||||
            raise ServiceError(resp)
 | 
			
		||||
 | 
			
		||||
        return response
 | 
			
		||||
 | 
			
		||||
    def __call__(self, environ, start_response):
 | 
			
		||||
        """Handle incoming request. authenticate and send downstream."""
 | 
			
		||||
        req = webob.Request(environ)
 | 
			
		||||
        self.logger.debug('Calling S3Token middleware.')
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            parts = split_path(req.path, 1, 4, True)
 | 
			
		||||
            version, account, container, obj = parts
 | 
			
		||||
        except ValueError:
 | 
			
		||||
            msg = 'Not a path query, skipping.'
 | 
			
		||||
            self.logger.debug(msg)
 | 
			
		||||
            return self.app(environ, start_response)
 | 
			
		||||
 | 
			
		||||
        # Read request signature and access id.
 | 
			
		||||
        if 'Authorization' not in req.headers:
 | 
			
		||||
            msg = 'No Authorization header. skipping.'
 | 
			
		||||
            self.logger.debug(msg)
 | 
			
		||||
            return self.app(environ, start_response)
 | 
			
		||||
 | 
			
		||||
        token = req.headers.get('X-Auth-Token',
 | 
			
		||||
                                req.headers.get('X-Storage-Token'))
 | 
			
		||||
        if not token:
 | 
			
		||||
            msg = 'You did not specify an auth or a storage token. skipping.'
 | 
			
		||||
            self.logger.debug(msg)
 | 
			
		||||
            return self.app(environ, start_response)
 | 
			
		||||
 | 
			
		||||
        auth_header = req.headers['Authorization']
 | 
			
		||||
        try:
 | 
			
		||||
            access, signature = auth_header.split(' ')[-1].rsplit(':', 1)
 | 
			
		||||
        except ValueError:
 | 
			
		||||
            msg = 'You have an invalid Authorization header: %s'
 | 
			
		||||
            self.logger.debug(msg, auth_header)
 | 
			
		||||
            return self.deny_request('InvalidURI')(environ, start_response)
 | 
			
		||||
 | 
			
		||||
        # NOTE(chmou): This is to handle the special case with nova
 | 
			
		||||
        # when we have the option s3_affix_tenant. We will force it to
 | 
			
		||||
        # connect to another account than the one
 | 
			
		||||
        # authenticated. Before people start getting worried about
 | 
			
		||||
        # security, I should point that we are connecting with
 | 
			
		||||
        # username/token specified by the user but instead of
 | 
			
		||||
        # connecting to its own account we will force it to go to an
 | 
			
		||||
        # another account. In a normal scenario if that user don't
 | 
			
		||||
        # have the reseller right it will just fail but since the
 | 
			
		||||
        # reseller account can connect to every account it is allowed
 | 
			
		||||
        # by the swift_auth middleware.
 | 
			
		||||
        force_tenant = None
 | 
			
		||||
        if ':' in access:
 | 
			
		||||
            access, force_tenant = access.split(':')
 | 
			
		||||
 | 
			
		||||
        # Authenticate request.
 | 
			
		||||
        creds = {'credentials': {'access': access,
 | 
			
		||||
                                 'token': token,
 | 
			
		||||
                                 'signature': signature}}
 | 
			
		||||
        creds_json = jsonutils.dumps(creds)
 | 
			
		||||
        self.logger.debug('Connecting to Keystone sending this JSON: %s',
 | 
			
		||||
                          creds_json)
 | 
			
		||||
        # NOTE(vish): We could save a call to keystone by having
 | 
			
		||||
        #             keystone return token, tenant, user, and roles
 | 
			
		||||
        #             from this call.
 | 
			
		||||
        #
 | 
			
		||||
        # NOTE(chmou): We still have the same problem we would need to
 | 
			
		||||
        #              change token_auth to detect if we already
 | 
			
		||||
        #              identified and not doing a second query and just
 | 
			
		||||
        #              pass it through to swiftauth in this case.
 | 
			
		||||
        try:
 | 
			
		||||
            resp = self._json_request(creds_json)
 | 
			
		||||
        except ServiceError as e:
 | 
			
		||||
            resp = e.args[0]
 | 
			
		||||
            msg = 'Received error, exiting middleware with error: %s'
 | 
			
		||||
            self.logger.debug(msg, resp.status_code)
 | 
			
		||||
            return resp(environ, start_response)
 | 
			
		||||
 | 
			
		||||
        self.logger.debug('Keystone Reply: Status: %d, Output: %s',
 | 
			
		||||
                          resp.status_code, resp.content)
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            identity_info = resp.json()
 | 
			
		||||
            token_id = str(identity_info['access']['token']['id'])
 | 
			
		||||
            tenant = identity_info['access']['token']['tenant']
 | 
			
		||||
        except (ValueError, KeyError):
 | 
			
		||||
            error = 'Error on keystone reply: %d %s'
 | 
			
		||||
            self.logger.debug(error, resp.status_code, resp.content)
 | 
			
		||||
            return self.deny_request('InvalidURI')(environ, start_response)
 | 
			
		||||
 | 
			
		||||
        req.headers['X-Auth-Token'] = token_id
 | 
			
		||||
        tenant_to_connect = force_tenant or tenant['id']
 | 
			
		||||
        self.logger.debug('Connecting with tenant: %s', tenant_to_connect)
 | 
			
		||||
        new_tenant_name = '%s%s' % (self.reseller_prefix, tenant_to_connect)
 | 
			
		||||
        environ['PATH_INFO'] = environ['PATH_INFO'].replace(account,
 | 
			
		||||
                                                            new_tenant_name)
 | 
			
		||||
        return self.app(environ, start_response)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def filter_factory(global_conf, **local_conf):
 | 
			
		||||
    """Returns a WSGI filter app for use with paste.deploy."""
 | 
			
		||||
    conf = global_conf.copy()
 | 
			
		||||
    conf.update(local_conf)
 | 
			
		||||
 | 
			
		||||
    def auth_filter(app):
 | 
			
		||||
        return S3Token(app, conf)
 | 
			
		||||
    return auth_filter
 | 
			
		||||
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							@@ -1,97 +0,0 @@
 | 
			
		||||
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
			
		||||
#    not use this file except in compliance with the License. You may obtain
 | 
			
		||||
#    a copy of the License at
 | 
			
		||||
#
 | 
			
		||||
#         http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
#
 | 
			
		||||
#    Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
			
		||||
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
			
		||||
#    License for the specific language governing permissions and limitations
 | 
			
		||||
#    under the License.
 | 
			
		||||
 | 
			
		||||
import six
 | 
			
		||||
import testtools
 | 
			
		||||
 | 
			
		||||
from keystoneclient.middleware import memcache_crypt
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MemcacheCryptPositiveTests(testtools.TestCase):
 | 
			
		||||
    def _setup_keys(self, strategy):
 | 
			
		||||
        return memcache_crypt.derive_keys(b'token', b'secret', strategy)
 | 
			
		||||
 | 
			
		||||
    def test_constant_time_compare(self):
 | 
			
		||||
        # make sure it works as a compare, the "constant time" aspect
 | 
			
		||||
        # isn't appropriate to test in unittests
 | 
			
		||||
        ctc = memcache_crypt.constant_time_compare
 | 
			
		||||
        self.assertTrue(ctc('abcd', 'abcd'))
 | 
			
		||||
        self.assertTrue(ctc('', ''))
 | 
			
		||||
        self.assertFalse(ctc('abcd', 'efgh'))
 | 
			
		||||
        self.assertFalse(ctc('abc', 'abcd'))
 | 
			
		||||
        self.assertFalse(ctc('abc', 'abc\x00'))
 | 
			
		||||
        self.assertFalse(ctc('', 'abc'))
 | 
			
		||||
 | 
			
		||||
        # For Python 3, we want to test these functions with both str and bytes
 | 
			
		||||
        # as input.
 | 
			
		||||
        if six.PY3:
 | 
			
		||||
            self.assertTrue(ctc(b'abcd', b'abcd'))
 | 
			
		||||
            self.assertTrue(ctc(b'', b''))
 | 
			
		||||
            self.assertFalse(ctc(b'abcd', b'efgh'))
 | 
			
		||||
            self.assertFalse(ctc(b'abc', b'abcd'))
 | 
			
		||||
            self.assertFalse(ctc(b'abc', b'abc\x00'))
 | 
			
		||||
            self.assertFalse(ctc(b'', b'abc'))
 | 
			
		||||
 | 
			
		||||
    def test_derive_keys(self):
 | 
			
		||||
        keys = self._setup_keys(b'strategy')
 | 
			
		||||
        self.assertEqual(len(keys['ENCRYPTION']),
 | 
			
		||||
                         len(keys['CACHE_KEY']))
 | 
			
		||||
        self.assertEqual(len(keys['CACHE_KEY']),
 | 
			
		||||
                         len(keys['MAC']))
 | 
			
		||||
        self.assertNotEqual(keys['ENCRYPTION'],
 | 
			
		||||
                            keys['MAC'])
 | 
			
		||||
        self.assertIn('strategy', keys.keys())
 | 
			
		||||
 | 
			
		||||
    def test_key_strategy_diff(self):
 | 
			
		||||
        k1 = self._setup_keys(b'MAC')
 | 
			
		||||
        k2 = self._setup_keys(b'ENCRYPT')
 | 
			
		||||
        self.assertNotEqual(k1, k2)
 | 
			
		||||
 | 
			
		||||
    def test_sign_data(self):
 | 
			
		||||
        keys = self._setup_keys(b'MAC')
 | 
			
		||||
        sig = memcache_crypt.sign_data(keys['MAC'], b'data')
 | 
			
		||||
        self.assertEqual(len(sig), memcache_crypt.DIGEST_LENGTH_B64)
 | 
			
		||||
 | 
			
		||||
    def test_encryption(self):
 | 
			
		||||
        keys = self._setup_keys(b'ENCRYPT')
 | 
			
		||||
        # what you put in is what you get out
 | 
			
		||||
        for data in [b'data', b'1234567890123456', b'\x00\xFF' * 13
 | 
			
		||||
                     ] + [six.int2byte(x % 256) * x for x in range(768)]:
 | 
			
		||||
            crypt = memcache_crypt.encrypt_data(keys['ENCRYPTION'], data)
 | 
			
		||||
            decrypt = memcache_crypt.decrypt_data(keys['ENCRYPTION'], crypt)
 | 
			
		||||
            self.assertEqual(data, decrypt)
 | 
			
		||||
            self.assertRaises(memcache_crypt.DecryptError,
 | 
			
		||||
                              memcache_crypt.decrypt_data,
 | 
			
		||||
                              keys['ENCRYPTION'], crypt[:-1])
 | 
			
		||||
 | 
			
		||||
    def test_protect_wrappers(self):
 | 
			
		||||
        data = b'My Pretty Little Data'
 | 
			
		||||
        for strategy in [b'MAC', b'ENCRYPT']:
 | 
			
		||||
            keys = self._setup_keys(strategy)
 | 
			
		||||
            protected = memcache_crypt.protect_data(keys, data)
 | 
			
		||||
            self.assertNotEqual(protected, data)
 | 
			
		||||
            if strategy == b'ENCRYPT':
 | 
			
		||||
                self.assertNotIn(data, protected)
 | 
			
		||||
            unprotected = memcache_crypt.unprotect_data(keys, protected)
 | 
			
		||||
            self.assertEqual(data, unprotected)
 | 
			
		||||
            self.assertRaises(memcache_crypt.InvalidMacError,
 | 
			
		||||
                              memcache_crypt.unprotect_data,
 | 
			
		||||
                              keys, protected[:-1])
 | 
			
		||||
            self.assertIsNone(memcache_crypt.unprotect_data(keys, None))
 | 
			
		||||
 | 
			
		||||
    def test_no_pycrypt(self):
 | 
			
		||||
        aes = memcache_crypt.AES
 | 
			
		||||
        memcache_crypt.AES = None
 | 
			
		||||
        self.assertRaises(memcache_crypt.CryptoUnavailableError,
 | 
			
		||||
                          memcache_crypt.encrypt_data, 'token', 'secret',
 | 
			
		||||
                          'data')
 | 
			
		||||
        memcache_crypt.AES = aes
 | 
			
		||||
@@ -1,259 +0,0 @@
 | 
			
		||||
# Copyright 2012 OpenStack Foundation
 | 
			
		||||
#
 | 
			
		||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
			
		||||
# not use this file except in compliance with the License. You may obtain
 | 
			
		||||
# a copy of the License at
 | 
			
		||||
#
 | 
			
		||||
#      http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
#
 | 
			
		||||
# Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
			
		||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
			
		||||
# License for the specific language governing permissions and limitations
 | 
			
		||||
# under the License.
 | 
			
		||||
 | 
			
		||||
import mock
 | 
			
		||||
from oslo_serialization import jsonutils
 | 
			
		||||
import requests
 | 
			
		||||
import six
 | 
			
		||||
import testtools
 | 
			
		||||
import webob
 | 
			
		||||
 | 
			
		||||
from keystoneclient.middleware import s3_token
 | 
			
		||||
from keystoneclient.tests.unit import utils
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
GOOD_RESPONSE = {'access': {'token': {'id': 'TOKEN_ID',
 | 
			
		||||
                                      'tenant': {'id': 'TENANT_ID'}}}}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FakeApp(object):
 | 
			
		||||
    """This represents a WSGI app protected by the auth_token middleware."""
 | 
			
		||||
    def __call__(self, env, start_response):
 | 
			
		||||
        resp = webob.Response()
 | 
			
		||||
        resp.environ = env
 | 
			
		||||
        return resp(env, start_response)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class S3TokenMiddlewareTestBase(utils.TestCase):
 | 
			
		||||
 | 
			
		||||
    TEST_PROTOCOL = 'https'
 | 
			
		||||
    TEST_HOST = 'fakehost'
 | 
			
		||||
    TEST_PORT = 35357
 | 
			
		||||
    TEST_URL = '%s://%s:%d/v2.0/s3tokens' % (TEST_PROTOCOL,
 | 
			
		||||
                                             TEST_HOST,
 | 
			
		||||
                                             TEST_PORT)
 | 
			
		||||
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super(S3TokenMiddlewareTestBase, self).setUp()
 | 
			
		||||
 | 
			
		||||
        self.conf = {
 | 
			
		||||
            'auth_host': self.TEST_HOST,
 | 
			
		||||
            'auth_port': self.TEST_PORT,
 | 
			
		||||
            'auth_protocol': self.TEST_PROTOCOL,
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    def start_fake_response(self, status, headers):
 | 
			
		||||
        self.response_status = int(status.split(' ', 1)[0])
 | 
			
		||||
        self.response_headers = dict(headers)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
 | 
			
		||||
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super(S3TokenMiddlewareTestGood, self).setUp()
 | 
			
		||||
        self.middleware = s3_token.S3Token(FakeApp(), self.conf)
 | 
			
		||||
 | 
			
		||||
        self.requests_mock.post(self.TEST_URL,
 | 
			
		||||
                                status_code=201,
 | 
			
		||||
                                json=GOOD_RESPONSE)
 | 
			
		||||
 | 
			
		||||
    # Ignore the request and pass to the next middleware in the
 | 
			
		||||
    # pipeline if no path has been specified.
 | 
			
		||||
    def test_no_path_request(self):
 | 
			
		||||
        req = webob.Request.blank('/')
 | 
			
		||||
        self.middleware(req.environ, self.start_fake_response)
 | 
			
		||||
        self.assertEqual(self.response_status, 200)
 | 
			
		||||
 | 
			
		||||
    # Ignore the request and pass to the next middleware in the
 | 
			
		||||
    # pipeline if no Authorization header has been specified
 | 
			
		||||
    def test_without_authorization(self):
 | 
			
		||||
        req = webob.Request.blank('/v1/AUTH_cfa/c/o')
 | 
			
		||||
        self.middleware(req.environ, self.start_fake_response)
 | 
			
		||||
        self.assertEqual(self.response_status, 200)
 | 
			
		||||
 | 
			
		||||
    def test_without_auth_storage_token(self):
 | 
			
		||||
        req = webob.Request.blank('/v1/AUTH_cfa/c/o')
 | 
			
		||||
        req.headers['Authorization'] = 'badboy'
 | 
			
		||||
        self.middleware(req.environ, self.start_fake_response)
 | 
			
		||||
        self.assertEqual(self.response_status, 200)
 | 
			
		||||
 | 
			
		||||
    def test_authorized(self):
 | 
			
		||||
        req = webob.Request.blank('/v1/AUTH_cfa/c/o')
 | 
			
		||||
        req.headers['Authorization'] = 'access:signature'
 | 
			
		||||
        req.headers['X-Storage-Token'] = 'token'
 | 
			
		||||
        req.get_response(self.middleware)
 | 
			
		||||
        self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
 | 
			
		||||
        self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
 | 
			
		||||
 | 
			
		||||
    def test_authorized_http(self):
 | 
			
		||||
        TEST_URL = 'http://%s:%d/v2.0/s3tokens' % (self.TEST_HOST,
 | 
			
		||||
                                                   self.TEST_PORT)
 | 
			
		||||
 | 
			
		||||
        self.requests_mock.post(TEST_URL, status_code=201, json=GOOD_RESPONSE)
 | 
			
		||||
 | 
			
		||||
        self.middleware = (
 | 
			
		||||
            s3_token.filter_factory({'auth_protocol': 'http',
 | 
			
		||||
                                     'auth_host': self.TEST_HOST,
 | 
			
		||||
                                     'auth_port': self.TEST_PORT})(FakeApp()))
 | 
			
		||||
        req = webob.Request.blank('/v1/AUTH_cfa/c/o')
 | 
			
		||||
        req.headers['Authorization'] = 'access:signature'
 | 
			
		||||
        req.headers['X-Storage-Token'] = 'token'
 | 
			
		||||
        req.get_response(self.middleware)
 | 
			
		||||
        self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
 | 
			
		||||
        self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
 | 
			
		||||
 | 
			
		||||
    def test_authorization_nova_toconnect(self):
 | 
			
		||||
        req = webob.Request.blank('/v1/AUTH_swiftint/c/o')
 | 
			
		||||
        req.headers['Authorization'] = 'access:FORCED_TENANT_ID:signature'
 | 
			
		||||
        req.headers['X-Storage-Token'] = 'token'
 | 
			
		||||
        req.get_response(self.middleware)
 | 
			
		||||
        path = req.environ['PATH_INFO']
 | 
			
		||||
        self.assertTrue(path.startswith('/v1/AUTH_FORCED_TENANT_ID'))
 | 
			
		||||
 | 
			
		||||
    @mock.patch.object(requests, 'post')
 | 
			
		||||
    def test_insecure(self, MOCK_REQUEST):
 | 
			
		||||
        self.middleware = (
 | 
			
		||||
            s3_token.filter_factory({'insecure': 'True'})(FakeApp()))
 | 
			
		||||
 | 
			
		||||
        text_return_value = jsonutils.dumps(GOOD_RESPONSE)
 | 
			
		||||
        if six.PY3:
 | 
			
		||||
            text_return_value = text_return_value.encode()
 | 
			
		||||
        MOCK_REQUEST.return_value = utils.TestResponse({
 | 
			
		||||
            'status_code': 201,
 | 
			
		||||
            'text': text_return_value})
 | 
			
		||||
 | 
			
		||||
        req = webob.Request.blank('/v1/AUTH_cfa/c/o')
 | 
			
		||||
        req.headers['Authorization'] = 'access:signature'
 | 
			
		||||
        req.headers['X-Storage-Token'] = 'token'
 | 
			
		||||
        req.get_response(self.middleware)
 | 
			
		||||
 | 
			
		||||
        self.assertTrue(MOCK_REQUEST.called)
 | 
			
		||||
        mock_args, mock_kwargs = MOCK_REQUEST.call_args
 | 
			
		||||
        self.assertIs(mock_kwargs['verify'], False)
 | 
			
		||||
 | 
			
		||||
    def test_insecure_option(self):
 | 
			
		||||
        # insecure is passed as a string.
 | 
			
		||||
 | 
			
		||||
        # Some non-secure values.
 | 
			
		||||
        true_values = ['true', 'True', '1', 'yes']
 | 
			
		||||
        for val in true_values:
 | 
			
		||||
            config = {'insecure': val, 'certfile': 'false_ind'}
 | 
			
		||||
            middleware = s3_token.filter_factory(config)(FakeApp())
 | 
			
		||||
            self.assertIs(False, middleware.verify)
 | 
			
		||||
 | 
			
		||||
        # Some "secure" values, including unexpected value.
 | 
			
		||||
        false_values = ['false', 'False', '0', 'no', 'someweirdvalue']
 | 
			
		||||
        for val in false_values:
 | 
			
		||||
            config = {'insecure': val, 'certfile': 'false_ind'}
 | 
			
		||||
            middleware = s3_token.filter_factory(config)(FakeApp())
 | 
			
		||||
            self.assertEqual('false_ind', middleware.verify)
 | 
			
		||||
 | 
			
		||||
        # Default is secure.
 | 
			
		||||
        config = {'certfile': 'false_ind'}
 | 
			
		||||
        middleware = s3_token.filter_factory(config)(FakeApp())
 | 
			
		||||
        self.assertIs('false_ind', middleware.verify)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super(S3TokenMiddlewareTestBad, self).setUp()
 | 
			
		||||
        self.middleware = s3_token.S3Token(FakeApp(), self.conf)
 | 
			
		||||
 | 
			
		||||
    def test_unauthorized_token(self):
 | 
			
		||||
        ret = {"error":
 | 
			
		||||
               {"message": "EC2 access key not found.",
 | 
			
		||||
                "code": 401,
 | 
			
		||||
                "title": "Unauthorized"}}
 | 
			
		||||
        self.requests_mock.post(self.TEST_URL, status_code=403, json=ret)
 | 
			
		||||
        req = webob.Request.blank('/v1/AUTH_cfa/c/o')
 | 
			
		||||
        req.headers['Authorization'] = 'access:signature'
 | 
			
		||||
        req.headers['X-Storage-Token'] = 'token'
 | 
			
		||||
        resp = req.get_response(self.middleware)
 | 
			
		||||
        s3_denied_req = self.middleware.deny_request('AccessDenied')
 | 
			
		||||
        self.assertEqual(resp.body, s3_denied_req.body)
 | 
			
		||||
        self.assertEqual(resp.status_int, s3_denied_req.status_int)
 | 
			
		||||
 | 
			
		||||
    def test_bogus_authorization(self):
 | 
			
		||||
        req = webob.Request.blank('/v1/AUTH_cfa/c/o')
 | 
			
		||||
        req.headers['Authorization'] = 'badboy'
 | 
			
		||||
        req.headers['X-Storage-Token'] = 'token'
 | 
			
		||||
        resp = req.get_response(self.middleware)
 | 
			
		||||
        self.assertEqual(resp.status_int, 400)
 | 
			
		||||
        s3_invalid_req = self.middleware.deny_request('InvalidURI')
 | 
			
		||||
        self.assertEqual(resp.body, s3_invalid_req.body)
 | 
			
		||||
        self.assertEqual(resp.status_int, s3_invalid_req.status_int)
 | 
			
		||||
 | 
			
		||||
    def test_fail_to_connect_to_keystone(self):
 | 
			
		||||
        with mock.patch.object(self.middleware, '_json_request') as o:
 | 
			
		||||
            s3_invalid_req = self.middleware.deny_request('InvalidURI')
 | 
			
		||||
            o.side_effect = s3_token.ServiceError(s3_invalid_req)
 | 
			
		||||
 | 
			
		||||
            req = webob.Request.blank('/v1/AUTH_cfa/c/o')
 | 
			
		||||
            req.headers['Authorization'] = 'access:signature'
 | 
			
		||||
            req.headers['X-Storage-Token'] = 'token'
 | 
			
		||||
            resp = req.get_response(self.middleware)
 | 
			
		||||
            self.assertEqual(resp.body, s3_invalid_req.body)
 | 
			
		||||
            self.assertEqual(resp.status_int, s3_invalid_req.status_int)
 | 
			
		||||
 | 
			
		||||
    def test_bad_reply(self):
 | 
			
		||||
        self.requests_mock.post(self.TEST_URL,
 | 
			
		||||
                                status_code=201,
 | 
			
		||||
                                text="<badreply>")
 | 
			
		||||
 | 
			
		||||
        req = webob.Request.blank('/v1/AUTH_cfa/c/o')
 | 
			
		||||
        req.headers['Authorization'] = 'access:signature'
 | 
			
		||||
        req.headers['X-Storage-Token'] = 'token'
 | 
			
		||||
        resp = req.get_response(self.middleware)
 | 
			
		||||
        s3_invalid_req = self.middleware.deny_request('InvalidURI')
 | 
			
		||||
        self.assertEqual(resp.body, s3_invalid_req.body)
 | 
			
		||||
        self.assertEqual(resp.status_int, s3_invalid_req.status_int)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class S3TokenMiddlewareTestUtil(testtools.TestCase):
 | 
			
		||||
    def test_split_path_failed(self):
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '')
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '/')
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '//')
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '//a')
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '/a/c')
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '//c')
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '/a/c/')
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '/a//')
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '/a', 2)
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '/a', 2, 3)
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '/a', 2, 3, True)
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '/a/c/o/r', 3, 3)
 | 
			
		||||
        self.assertRaises(ValueError, s3_token.split_path, '/a', 5, 4)
 | 
			
		||||
 | 
			
		||||
    def test_split_path_success(self):
 | 
			
		||||
        self.assertEqual(s3_token.split_path('/a'), ['a'])
 | 
			
		||||
        self.assertEqual(s3_token.split_path('/a/'), ['a'])
 | 
			
		||||
        self.assertEqual(s3_token.split_path('/a/c', 2), ['a', 'c'])
 | 
			
		||||
        self.assertEqual(s3_token.split_path('/a/c/o', 3), ['a', 'c', 'o'])
 | 
			
		||||
        self.assertEqual(s3_token.split_path('/a/c/o/r', 3, 3, True),
 | 
			
		||||
                         ['a', 'c', 'o/r'])
 | 
			
		||||
        self.assertEqual(s3_token.split_path('/a/c', 2, 3, True),
 | 
			
		||||
                         ['a', 'c', None])
 | 
			
		||||
        self.assertEqual(s3_token.split_path('/a/c/', 2), ['a', 'c'])
 | 
			
		||||
        self.assertEqual(s3_token.split_path('/a/c/', 2, 3), ['a', 'c', ''])
 | 
			
		||||
 | 
			
		||||
    def test_split_path_invalid_path(self):
 | 
			
		||||
        try:
 | 
			
		||||
            s3_token.split_path('o\nn e', 2)
 | 
			
		||||
        except ValueError as err:
 | 
			
		||||
            self.assertEqual(str(err), 'Invalid path: o%0An%20e')
 | 
			
		||||
        try:
 | 
			
		||||
            s3_token.split_path('o\nn e', 2, 3, True)
 | 
			
		||||
        except ValueError as err:
 | 
			
		||||
            self.assertEqual(str(err), 'Invalid path: o%0An%20e')
 | 
			
		||||
@@ -7,7 +7,6 @@ pbr>=0.6,!=0.7,<1.0
 | 
			
		||||
argparse
 | 
			
		||||
Babel>=1.3
 | 
			
		||||
iso8601>=0.1.9
 | 
			
		||||
netaddr>=0.7.12
 | 
			
		||||
oslo.config>=1.9.3  # Apache-2.0
 | 
			
		||||
oslo.i18n>=1.5.0  # Apache-2.0
 | 
			
		||||
oslo.serialization>=1.4.0               # Apache-2.0
 | 
			
		||||
 
 | 
			
		||||
@@ -14,11 +14,9 @@ mox3>=0.7.0
 | 
			
		||||
oauthlib>=0.6
 | 
			
		||||
oslosphinx>=2.5.0  # Apache-2.0
 | 
			
		||||
oslotest>=1.5.1  # Apache-2.0
 | 
			
		||||
pycrypto>=2.6
 | 
			
		||||
requests-mock>=0.6.0  # Apache-2.0
 | 
			
		||||
sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3
 | 
			
		||||
tempest-lib>=0.4.0
 | 
			
		||||
testrepository>=0.0.18
 | 
			
		||||
testresources>=0.2.4
 | 
			
		||||
testtools>=0.9.36,!=1.2.0
 | 
			
		||||
WebOb>=1.2.3
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user