Add multiple reseller prefixes and composite tokens

This change is in support of Composite Tokens and Service Accounts
(see http://specs.openstack.org/openstack/swift-specs/specs/in_progress/
service_token.html)

During coding, minor changes were made compared to the original
specification. See https://review.openstack.org/138771 for these changes.

DocImpact

Change-Id: I6072b4efb3a479a8e0cc2d9c11ffda5764b55e30
This commit is contained in:
Donagh McCabe 2014-11-25 14:42:42 +00:00
parent e4b4e3e065
commit 89397c5b67
16 changed files with 2038 additions and 195 deletions

View File

@ -30,6 +30,10 @@ following actions occur:
The account owner can grant account and container access to users
through access control lists (ACLs).
In addition, it is possible to provide an additional token in the
''X-Service-Token'' header. More information about how this is used is in
:doc:`../overview_backing_store`.
The following list describes the authentication services that you can
use with Object Storage:

View File

@ -56,6 +56,7 @@ Overview and Concepts
overview_expiring_objects
cors
crossdomain
overview_backing_store
associated_projects
Developer Documentation

View File

@ -196,12 +196,87 @@ but in short:
for tempurl/formpost middleware, authtoken will need
to be configured with delay_auth_decision set to 1.
and you can finally add the keystoneauth configuration::
and you can finally add the keystoneauth configuration. Here is a simple
configuration::
[filter:keystoneauth]
use = egg:swift#keystoneauth
operator_roles = admin, swiftoperator
Use an appropriate list of roles in operator_roles. For example, in
some systems, the role ``_member_`` or ``Member`` is used to indicate
that the user is allowed to operate on project resources.
OpenStack Service Using Composite Tokens
----------------------------------------
Some Openstack services such as Cinder and Glance may use
a "service account". In this mode, you configure a separate account where
the service stores project data that it manages. This account is not used
directly by the end-user. Instead, all access is done through the service.
To access the "service" account, the service must present two tokens: one from
the end-user and another from its own service user. Only when both tokens are
present can the account be accessed. This section describes how to set the
configuration options to correctly control access to both the "normal" and
"service" accounts.
In this example, end users use the ``AUTH_`` prefix in account names,
whereas services use the ``SERVICE_`` prefix::
[filter:keystoneauth]
use = egg:swift#keystoneauth
reseller_prefix = AUTH, SERVICE
operator_roles = admin, swiftoperator
SERVICE_service_roles = service
The actual values for these variable will need to be set depending on your
situation as follows:
* The first item in the reseller_prefix list must match Keystone's endpoint
(see ``/etc/keystone/default_catalog.templates`` above). Normally
this is ``AUTH``.
* The second item in the reseller_prefix list is the prefix used by the
Openstack services(s). You must configure this value (``SERVICE`` in the
example) with whatever the other Openstack service(s) use.
* Set the operator_roles option to contain a role or roles that end-user's
have on project's they use.
* Set the SERVICE_service_roles value to a role or roles that only the
Openstack service user has. Do not use a role that is assigned to
"normal" end users. In this example, the role ``service`` is used.
The service user is granted this role to a *single* project only. You do
not need to make the service user a member of every project.
This configuration works as follows:
* The end-user presents a user token to an Openstack service. The service
then makes a Swift request to the account with the ``SERVICE`` prefix.
* The service forwards the original user token with the request. It also
adds it's own service token.
* Swift validates both tokens. When validated, the user token gives the
``admin`` or ``swiftoperator`` role(s). When validated, the service token
gives the ``service`` role.
* Swift interprets the above configuration as follows:
* Did the user token provide one of the roles listed in operator_roles?
* Did the service token have the ``service`` role as described by the
``SERVICE_service_roles`` options.
* If both conditions are met, the request is granted. Otherwise, Swift
rejects the request.
In the above example, all services share the same account. You can separate
each service into its own account. For example, the following provides a
dedicated account for each of the Glance and Cinder services. In addition,
you must assign the ``glance_service`` and ``cinder_service`` to the
appropriate service users::
[filter:keystoneauth]
use = egg:swift#keystoneauth
reseller_prefix = AUTH, IMAGE, VOLUME
operator_roles = admin, swiftoperator
IMAGE_service_roles = glance_service
VOLUME_service_roles = cinder_service
Access control using keystoneauth
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -0,0 +1,272 @@
=============================================
Using Swift as Backing Store for Service Data
=============================================
----------
Background
----------
This section provides guidance to OpenStack Service developers for how to
store your users' data in Swift. An example of this is that a user requests
that Nova save a snapshot of a VM. Nova passes the request to Glance,
Glance writes the image to a Swift container as a set of objects.
Throughout this section, the following terminology and concepts are used:
* User or end-user. This is a person making a request that will result in
an Openstack Service making a request to Swift.
* Project (also known as Tenant). This is the unit of resource ownership.
While data such as snapshot images or block volume backups may be
stored as a result of an end-user's request, the reality is that these
are project data.
* Service. This is a program or system used by end-users. Specifically, it
is any program or system that is capable of receiving end-user's tokens and
validating the token with the Keystone Service and has a need to store
data in Swift. Glance and Cinder are examples of such Services.
* Service User. This is a Keystone user that has been assigned to a Service.
This allows the Service to generate and use its own tokens so that it
can interact with other Services as itself.
* Service Project. This is a project (tenant) that is associated with a
Service. There may be a single project shared by many Services or there
may be a project dedicated to each Service. In this document, the
main purpose of the Service Project is to allow the system operator
to configure specific roles for each Service User.
-------------------------------
Alternate Backing Store Schemes
-------------------------------
There are three schemes described here:
* Dedicated Service Account (Single Tenant)
Your Service has a dedicated Service Project (hence a single dedicated
Swift account). Data for all users and projects are stored in this
account. Your Service must have a user assigned to it (the Service User).
When you have data to store on behalf of one of your users, you use the
Service User credentials to get a token for the Service Project and
request Swift to store the data in the Service Project.
With this scheme, data for all users is stored in a single account. This
is transparent to your users and since the credentials for the Service User
are typically not shared with anyone, your users' cannot access their
data by making a request directly to Swift. However, since data belonging
to all users is stored in one account, it presents a single point of
vulnerably to accidental deletion or a leak of the service-user
credentials.
* Multi Project (Multi Tenant)
Data belonging to a project is stored in the Swift account
associated with the project. Users make requests to your Service using
a token scoped to a project in the normal way. You can then use this
same token to store the user data in the project's Swift account.
The effect is that data is stored in multiple projects (aka tenants).
Hence this scheme has been known as the "multi tenant" scheme.
With this scheme, access is controlled by Keystone. The users must
have a role that allows them to perform the request to your Service. In
addition, they must have a role that also allows them to store data in
the Swift account. By default, the admin or swiftoperator roles are
used for this purpose (specific systems may use other role names). If the
user does not have the appropriate roles, when your Service attempts
to access Swift, the operation will fail.
Since you are using the user's token to access the data, it follows that
the user can use the same token to access Swift directly -- bypassing your
Service. When end-users are browsing containers, they will also see
your Service's containers and objects -- and may potentially delete
the data. Conversely, there is no single account where all data so leakage
of credentials will only affect a single project/tenant.
* Service Prefix Account
Data belonging to a project is stored in a Swift account associated
with the project. This is similar to the Multi Project scheme described
above. However, the Swift account is different than the account that
users access. Specifically, it has a different account prefix. For example,
for the project 1234, the user account is named AUTH_1234. Your Service uses
a different account, for example, SERVICE_1234.
To access the SERVICE_1234 account, you must present two tokens: the user's
token is put in the X-Auth-Token header. You present your Service's token
in the X-Service-Token header. Swift is configured such that only when both
tokens are presented will it allow access. Specifically, the user cannot
bypass your Service because they only have their own token. Conversely, your
Service can only access the data while it has a copy of the user's token --
the Service's token by itself will not grant access.
The data stored in the Service Prefix Account cannot be seen by end-users.
So they cannot delete this data -- they can only access the data if they
make a request through your Service. The data is also more secure. To make
an unauthorized access, someone would need to compromise both an end-user's
and your Service User credentials. Even then, this would only expose one
project -- not other projects.
The Service Prefix Account scheme combines features of the Dedicated Service
Account and Multi Project schemes. It has the private, dedicated,
characteristics of the Dedicated Service Account scheme but does not present
a single point of attack. Using the Service Prefix Account scheme is a little
more involved than the other schemes, so the rest of this document describes
it more detail.
-------------------------------
Service Prefix Account Overview
-------------------------------
The following diagram shows the flow through the system from the end-user,
to your Service and then onto Swift::
client
\
\ <request>: <path-specific-to-the-service>
\ x-auth-token: <user-token>
\
SERVICE
\
\ PUT: /v1/SERVICE_1234/<container>/<object>
\ x-auth-token: <user-token>
\ x-service-token: <service-token>
\
Swift
The sequence of events and actions are as follows:
* Request arrives at your Service
* The <user-token> is validated by the keystonemiddleware.auth_token
middleware. The user's role(s) are used to determine if the user
can perform the request. See :doc:`overview_auth` for technical
information on the authentication system.
* As part of this request, your Service needs to access Swift (either to
write or read a container or object). In this example, you want to perform
a PUT on <container>/<object>.
* In the wsgi environment, the auth_token module will have populated the
HTTP_X_SERVICE_CATALOG item. This lists the Swift endpoint and account.
This is something such as https://<netloc>/v1/AUTH_1234 where ``AUTH_``
is a prefix and ``1234`` is the project id.
* The ``AUTH_`` prefix is the default value. However, your system may use a
different prefix. To determine the actual prefix, search for the first
underscore ('_') character in the account name. If there is no underscore
character in the account name, this means there is no prefix.
* Your Service should have a configuration parameter that provides the
appropriate prefix to use for storing data in Swift. There is more
discussion of this below, but for now assume the prefix is ``SERVICE_``.
* Replace the prefix (``AUTH_`` in above examples) in the path with
``SERVICE_``, so the full URL to access the object becomes
https://<netloc>/v1/SERVICE_1234/<container>/<object>.
* Make the request to Swift, using this URL. In the X-Auth-Token header place
a copy of the <user-token>. In the X-Service-Token header, place your
Service's token. If you use python-swiftclient you can achieve this
by:
* Putting the URL in the ``preauthurl`` parameter
* Putting the <user-token> in ``preauthtoken`` paramater
* Adding the X-Service-Token to the ``headers`` parameter
Using the HTTP_X_SERVICE_CATALOG to get Swift Account Name
----------------------------------------------------------
The auth_token middleware populates the wsgi environment with information when
it validates the user's token. The HTTP_X_SERVICE_CATALOG item is a JSON
string containing details of the Openstack endpoints. For Swift, this also
contains the project's Swift account name. Here is an example of a catalog
entry for Swift::
"serviceCatalog": [
...
{
....
"type": "object-store",
"endpoints": [
...
{
...
"publicURL": "https://<netloc>/v1/AUTH_1234",
"region": "<region-name>"
...
}
...
...
}
}
To get the End-user's account:
* Look for an entry with ``type`` of ``object-store``
* If there are several regions, there will be several endpoints. Use the
appropriate region name and select the ``publicURL`` item.
* The Swift account name is the final item in the path ("AUTH_1234" in this
example).
Getting a Service Token
-----------------------
A Service Token is no different than any other token and is requested
from Keystone using user credentials and project in the usual way. The core
requirement is that your Service User has the appropriate role. In practice:
* Your Service must have a user assigned to it (the Service User).
* Your Service has a project assigned to it (the Service Project).
* The Service User must have a role on the Service Project. This role is
distinct from any of the normal end-user roles.
* The role used must the role configured in the /etc/swift/proxy-server.conf.
This is the ``<prefix>_service_roles`` option. In this example, the role
is the ``service`` role::
[keystoneauth]
reseller_prefix = AUTH_, SERVICE_
SERVICE_service_role = service
The ``service`` role should only be granted to Openstack Services. It should
not be granted to users.
Single or multiple Service Prefixes?
------------------------------------
Most of the examples used in this document used a single prefix. The
prefix, ``SERVICE`` was used. By using a single prefix, an operator is
allowing all Openstack Services to share the same account for data
associated with a given project. For test systems or deployments well protected
on private firewalled networks, this is appropriate.
However, if one Service is compromised, that Service can access
data created by another Service. To prevent this, multiple Service Prefixes may
be used. This also requires that the operator configure multiple service
roles. For example, in a system that has Glance and Cinder, the following
Swift configuration could be used:
[keystoneauth]
reseller_prefix = AUTH_, IMAGE_, BLOCK_
IMAGE_service_roles = image_service
BLOCK_service_roles = block_service
The Service User for Glance would be granted the ``image_service`` role on its
Service Project and the Cinder Service user is granted the ``block_service``
role on its project. In this scheme, if the Cinder Service was compromised,
it would not be able to access any Glance data.
Container Naming
----------------
Since a single Service Prefix is possible, container names should be prefixed
with a unique string to prevent name clashes. We suggest you use the service
type field (as used in the service catalog). For example, The Glance Service
would use "image" as a prefix.

View File

@ -218,8 +218,22 @@ use = egg:swift#tempauth
# attempting to validate it. Also, with authorization, only Swift storage
# accounts with this prefix will be authorized by this middleware. Useful if
# multiple auth systems are in use for one Swift cluster.
# The reseller_prefix may contain a comma separated list of items. The first
# item is used for the token as mentioned above. If second and subsequent
# items exist, the middleware will handle authorization for an account with
# that prefix. For example, for prefixes "AUTH, SERVICE", a path of
# /v1/SERVICE_account is handled the same as /v1/AUTH_account. If an empty
# (blank) reseller prefix is required, it must be first in the list. Two
# single quote characters indicates an empty (blank) reseller prefix.
# reseller_prefix = AUTH
#
# The require_group parameter names a group that must be presented by
# either X-Auth-Token or X-Service-Token. Usually this parameter is
# used only with multiple reseller prefixes (e.g., SERVICE_require_group=blah).
# By default, no group is needed. Do not use .admin.
# require_group =
# The auth prefix will cause requests beginning with this prefix to be routed
# to the auth subsystem, for granting tokens, etc.
# auth_prefix = /auth/
@ -255,6 +269,7 @@ user_admin_admin = admin .admin .reseller_admin
user_test_tester = testing .admin
user_test2_tester2 = testing2 .admin
user_test_tester3 = testing3
user_test5_tester5 = testing5 service
# To enable Keystone authentication you need to have the auth token
# middleware first to be configured. Here is an example below, please
@ -278,8 +293,27 @@ user_test_tester3 = testing3
#
# [filter:keystoneauth]
# use = egg:swift#keystoneauth
# Operator roles is the role which user would be allowed to manage a
# tenant and be able to create container or give ACL to others.
# The reseller_prefix option lists account namespaces that this middleware is
# responsible for. The prefix is placed before the Keystone project id.
# For example, for project 12345678, and prefix AUTH, the account is
# named AUTH_12345678 (i.e., path is /v1/AUTH_12345678/...).
# Several prefixes are allowed by specifying a comma-separated list
# as in: "reseller_prefix = AUTH, SERVICE". The empty string indicates a
# single blank/empty prefix. If an empty prefix is required in a list of
# prefixes, a value of '' (two single quote characters) indicates a
# blank/empty prefix. Except for the blank/empty prefix, an underscore ('_')
# character is appended to the value unless already present.
# reseller_prefix = AUTH
#
# The user must have at least one role named by operator_roles on a
# project in order to create, delete and modify containers and objects
# and to set and read privileged headers such as ACLs.
# If there are several reseller prefix items, you can prefix the
# parameter so it applies only to those accounts (for example
# the parameter SERVICE_operator_roles applies to the /v1/SERVICE_<project>
# path). If you omit the prefix, the option applies to all reseller
# prefix items. For the blank/empty prefix, prefix with '' (do not put
# underscore after the two single quote characters).
# operator_roles = admin, swiftoperator
#
# The reseller admin role has the ability to create and delete accounts
@ -297,12 +331,25 @@ user_test_tester3 = testing3
# compares names rather than UUIDs. This option is deprecated.
# is_admin = false
#
# If the service_roles parameter is present, an X-Service-Token must be
# present in the request that when validated, grants at least one role listed
# in the parameter. The X-Service-Token may be scoped to any project.
# If there are several reseller prefix items, you can prefix the
# parameter so it applies only to those accounts (for example
# the parameter SERVICE_service_roles applies to the /v1/SERVICE_<project>
# path). If you omit the prefix, the option applies to all reseller
# prefix items. For the blank/empty prefix, prefix with '' (do not put
# underscore after the two single quote characters).
# By default, no service_roles are required.
# service_roles =
#
# For backwards compatibility, keystoneauth will match names in cross-tenant
# access control lists (ACLs) when both the requesting user and the tenant
# are in the default domain i.e the domain to which existing tenants are
# migrated. The default_domain_id value configured here should be the same as
# the value used during migration of tenants to keystone domains.
# default_domain_id = default
#
# For a new installation, or an installation in which keystone projects may
# move between domains, you should disable backwards compatible name matching
# in ACLs by setting allow_names_in_acls to false:

View File

@ -17,7 +17,7 @@ from swift.common.http import is_success
from swift.common.middleware import acl as swift_acl
from swift.common.request_helpers import get_sys_meta_prefix
from swift.common.swob import HTTPNotFound, HTTPForbidden, HTTPUnauthorized
from swift.common.utils import register_swift_info
from swift.common.utils import config_read_reseller_options, list_from_csv
from swift.proxy.controllers.base import get_account_info
import functools
@ -65,13 +65,16 @@ class KeystoneAuth(object):
use = egg:swift#keystoneauth
operator_roles = admin, swiftoperator
This maps tenants to account in Swift.
The user whose able to give ACL / create Containers permissions
will be the one that are inside the ``operator_roles``
The user who is able to give ACL / create Containers permissions
will be the user with a role listed in the ``operator_roles``
setting which by default includes the admin and the swiftoperator
roles.
The keystoneauth middleware maps a Keystone project/tenant to an account
in Swift by adding a prefix (``AUTH_`` by default) to the tenant/project
id.. For example, if the project id is ``1234``, the path is
``/v1/AUTH_1234``.
If the ``is_admin`` option is ``true``, a user whose username is the same
as the project name and who has any role on the project will have access
rights elevated to be the same as if the user had one of the
@ -84,6 +87,48 @@ class KeystoneAuth(object):
reseller_prefix = NEWAUTH
Don't forget to also update the Keystone service endpoint configuration to
use NEWAUTH in the path.
It is possible to have several accounts associated with the same project.
This is done by listing several prefixes as shown in the following
example:
reseller_prefix = AUTH, SERVICE
This means that for project id '1234', the paths '/v1/AUTH_1234' and
'/v1/SERVICE_1234' are associated with the project and are authorized
using roles that a user has with that project. The core use of this feature
is that it is possible to provide different rules for each account
prefix. The following parameters may be prefixed with the appropriate
prefix:
operator_roles
service_roles
For backward compatibility, no prefix implies the parameter
applies to all reseller_prefixes. Here is an example, using two
prefixes::
reseller_prefix = AUTH, SERVICE
# The next three lines have identical effects (since the first applies
# to both prefixes).
operator_roles = admin, swiftoperator
AUTH_operator_roles = admin, swiftoperator
SERVICE_operator_roles = admin, swiftoperator
# The next line only applies to accounts with the SERVICE prefix
SERVICE_operator_roles = admin, some_other_role
X-Service-Token tokens are supported by the inclusion of the service_roles
configuration option. When present, this option requires that the
X-Service-Token header supply a token from a user who has a role listed
in service_roles. Here is an example configuration::
reseller_prefix = AUTH, SERVICE
AUTH_operator_roles = admin, swiftoperator
SERVICE_operator_roles = admin, swiftoperator
SERVICE_service_roles = service
The keystoneauth middleware supports cross-tenant access control using
the syntax ``<tenant>:<user>`` to specify a grantee in container Access
Control Lists (ACLs). For a request to be granted by an ACL, the grantee
@ -135,11 +180,11 @@ class KeystoneAuth(object):
self.app = app
self.conf = conf
self.logger = swift_utils.get_logger(conf, log_route='keystoneauth')
self.reseller_prefix = conf.get('reseller_prefix', 'AUTH_').strip()
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
self.reseller_prefix += '_'
self.operator_roles = conf.get('operator_roles',
'admin, swiftoperator').lower()
self.reseller_prefixes, self.account_rules = \
config_read_reseller_options(conf,
dict(operator_roles=['admin',
'swiftoperator'],
service_roles=[]))
self.reseller_admin_role = conf.get('reseller_admin_role',
'ResellerAdmin').lower()
config_is_admin = conf.get('is_admin', "false").lower()
@ -158,7 +203,7 @@ class KeystoneAuth(object):
# authentication
if (self.allow_overrides and
environ.get('swift.authorize_override', False)):
msg = 'Authorizing from an overriding middleware (i.e: tempurl)'
msg = 'Authorizing from an overriding middleware'
self.logger.debug(msg)
return self.app(environ, start_response)
@ -212,14 +257,14 @@ class KeystoneAuth(object):
"""Extract the identity from the Keystone auth component."""
if environ.get('HTTP_X_IDENTITY_STATUS') != 'Confirmed':
return
roles = []
if 'HTTP_X_ROLES' in environ:
roles = environ['HTTP_X_ROLES'].split(',')
roles = list_from_csv(environ.get('HTTP_X_ROLES', ''))
service_roles = list_from_csv(environ.get('HTTP_X_SERVICE_ROLES', ''))
identity = {'user': (environ.get('HTTP_X_USER_ID'),
environ.get('HTTP_X_USER_NAME')),
'tenant': (environ.get('HTTP_X_TENANT_ID'),
environ.get('HTTP_X_TENANT_NAME')),
'roles': roles}
'roles': roles,
'service_roles': service_roles}
token_info = environ.get('keystone.token_info', {})
auth_version = 0
user_domain = project_domain = (None, None)
@ -237,12 +282,25 @@ class KeystoneAuth(object):
identity['auth_version'] = auth_version
return identity
def _get_account_for_tenant(self, tenant_id):
return '%s%s' % (self.reseller_prefix, tenant_id)
def _get_account_name(self, prefix, tenant_id):
return '%s%s' % (prefix, tenant_id)
def _reseller_check(self, account, tenant_id):
"""Check reseller prefix."""
return account == self._get_account_for_tenant(tenant_id)
def _account_matches_tenant(self, account, tenant_id):
"""Check if account belongs to a project/tenant"""
for prefix in self.reseller_prefixes:
if self._get_account_name(prefix, tenant_id) == account:
return True
return False
def _get_account_prefix(self, account):
"""Get the prefix of an account"""
# Empty prefix matches everything, so try to match others first
for prefix in [pre for pre in self.reseller_prefixes if pre != '']:
if account.startswith(prefix):
return prefix
if '' in self.reseller_prefixes:
return ''
return None
def _get_project_domain_id(self, environ):
info = get_account_info(environ, self.app, 'KS')
@ -269,7 +327,7 @@ class KeystoneAuth(object):
tenant_id, tenant_name = env_identity['tenant']
exists, sysmeta_id = self._get_project_domain_id(req.environ)
req_has_id, req_id, new_id = False, None, None
if self._reseller_check(account, tenant_id):
if self._account_matches_tenant(account, tenant_id):
# domain id can be inferred from request (may be None)
req_has_id = True
req_id = env_identity['project_domain'][0]
@ -304,7 +362,7 @@ class KeystoneAuth(object):
# request user and scoped project are both in default domain
tenant_id, tenant_name = identity['tenant']
version, account, container, obj = path_parts
if self._reseller_check(account, tenant_id):
if self._account_matches_tenant(account, tenant_id):
# account == scoped project, so account is also in default domain
allow = True
else:
@ -365,6 +423,8 @@ class KeystoneAuth(object):
self._set_project_domain_id(req, part, env_identity)
user_roles = [r.lower() for r in env_identity.get('roles', [])]
user_service_roles = [r.lower() for r in env_identity.get(
'service_roles', [])]
# Give unconditional access to a user with the reseller_admin
# role.
@ -402,21 +462,35 @@ class KeystoneAuth(object):
# Check if a user tries to access an account that does not match their
# token
if not self._reseller_check(account, tenant_id):
if not self._account_matches_tenant(account, tenant_id):
log_msg = 'tenant mismatch: %s != %s'
self.logger.debug(log_msg, account, tenant_id)
return self.denied_response(req)
# Check the roles the user is belonging to. If the user is
# part of the role defined in the config variable
# operator_roles (like admin) then it will be
# promoted as an admin of the account/tenant.
for role in self.operator_roles.split(','):
role = role.strip()
if role in user_roles:
log_msg = 'allow user with role %s as account admin'
self.logger.debug(log_msg, role)
# Compare roles from tokens against the configuration options:
#
# X-Auth-Token role Has specified X-Service-Token role Grant
# in operator_roles? service_roles? in service_roles? swift_owner?
# ------------------ -------------- -------------------- ------------
# yes yes yes yes
# yes no don't care yes
# no don't care don't care no
# ------------------ -------------- -------------------- ------------
account_prefix = self._get_account_prefix(account)
operator_roles = self.account_rules[account_prefix]['operator_roles']
have_operator_role = set(operator_roles).intersection(
set(user_roles))
service_roles = self.account_rules[account_prefix]['service_roles']
have_service_role = set(service_roles).intersection(
set(user_service_roles))
if have_operator_role and (service_roles and have_service_role):
req.environ['swift_owner'] = True
elif have_operator_role and not service_roles:
req.environ['swift_owner'] = True
if req.environ.get('swift_owner'):
log_msg = 'allow user with role(s) %s as account admin'
self.logger.debug(log_msg, ','.join(have_operator_role.union(
have_service_role)))
return
# If user is of the same name of the tenant then make owner of it.
@ -457,7 +531,8 @@ class KeystoneAuth(object):
return
is_authoritative_authz = (account and
account.startswith(self.reseller_prefix))
(self._get_account_prefix(account) in
self.reseller_prefixes))
if not is_authoritative_authz:
return self.denied_response(req)
@ -508,7 +583,6 @@ def filter_factory(global_conf, **local_conf):
"""Returns a WSGI filter app for use with paste.deploy."""
conf = global_conf.copy()
conf.update(local_conf)
register_swift_info('keystoneauth')
def auth_filter(app):
return KeystoneAuth(app, conf)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2011 OpenStack Foundation
# Copyright (c) 2011-2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -32,7 +32,8 @@ from swift.common.request_helpers import get_sys_meta_prefix
from swift.common.middleware.acl import (
clean_acl, parse_acl, referrer_allowed, acls_from_account_info)
from swift.common.utils import cache_from_env, get_logger, \
split_path, config_true_value, register_swift_info
split_path, config_true_value
from swift.common.utils import config_read_reseller_options
from swift.proxy.controllers.base import get_account_info
@ -66,6 +67,53 @@ class TempAuth(object):
See the proxy-server.conf-sample for more information.
Multiple Reseller Prefix Items:
The reseller prefix specifies which parts of the account namespace this
middleware is responsible for managing authentication and authorization.
By default, the prefix is AUTH so accounts and tokens are prefixed
by AUTH_. When a request's token and/or path start with AUTH_, this
middleware knows it is responsible.
We allow the reseller prefix to be a list. In tempauth, the first item
in the list is used as the prefix for tokens and user groups. The
other prefixes provide alternate accounts that user's can access. For
example if the reseller prefix list is 'AUTH, OTHER', a user with
admin access to AUTH_account also has admin access to
OTHER_account.
Required Group:
The group .admin is normally needed to access an account (ACLs provide
an additional way to access an account). You can specify the
``require_group`` parameter. This means that you also need the named group
to access an account. If you have several reseller prefix items, prefix
the ``require_group`` parameter with the appropriate prefix.
X-Service-Token:
If an X-Service-Token is presented in the request headers, the groups
derived from the token are appended to the roles derived form
X-Auth-Token. If X-Auth-Token is missing or invalid, X-Service-Token
is not processed.
The X-Service-Token is useful when combined with multiple reseller prefix
items. In the following configuration, accounts prefixed SERVICE_
are only accessible if X-Auth-Token is form the end-user and
X-Service-Token is from the ``glance`` user::
[filter:tempauth]
use = egg:swift#tempauth
reseller_prefix = AUTH, SERVICE
SERVICE_require_group = .service
user_admin_admin = admin .admin .reseller_admin
user_joeacct_joe = joepw .admin
user_maryacct_mary = marypw .admin
user_glance_glance = glancepw .service
The name .service is an example. Unlike .admin and .reseller_admin
it is not a reserved name.
Account ACLs:
If a swift_owner issues a POST or PUT to the account, with the
X-Account-Access-Control header set in the request, then this may
@ -112,9 +160,9 @@ class TempAuth(object):
self.conf = conf
self.logger = get_logger(conf, log_route='tempauth')
self.log_headers = config_true_value(conf.get('log_headers', 'f'))
self.reseller_prefix = conf.get('reseller_prefix', 'AUTH').strip()
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
self.reseller_prefix += '_'
self.reseller_prefixes, self.account_rules = \
config_read_reseller_options(conf, dict(require_group=''))
self.reseller_prefix = self.reseller_prefixes[0]
self.logger.set_statsd_prefix('tempauth.%s' % (
self.reseller_prefix if self.reseller_prefix else 'NONE',))
self.auth_prefix = conf.get('auth_prefix', '/auth/')
@ -179,9 +227,14 @@ class TempAuth(object):
return self.handle(env, start_response)
s3 = env.get('HTTP_AUTHORIZATION')
token = env.get('HTTP_X_AUTH_TOKEN', env.get('HTTP_X_STORAGE_TOKEN'))
service_token = env.get('HTTP_X_SERVICE_TOKEN')
if s3 or (token and token.startswith(self.reseller_prefix)):
# Note: Empty reseller_prefix will match all tokens.
groups = self.get_groups(env, token)
if service_token:
service_groups = self.get_groups(env, service_token)
if groups and service_groups:
groups += ',' + service_groups
if groups:
user = groups and groups.split(',', 1)[0] or ''
trans_id = env.get('swift.trans_id')
@ -211,41 +264,101 @@ class TempAuth(object):
elif 'swift.authorize' not in env:
env['swift.authorize'] = self.denied_response
else:
if self.reseller_prefix:
# With a non-empty reseller_prefix, I would like to be called
# back for anonymous access to accounts I know I'm the
# definitive auth for.
try:
version, rest = split_path(env.get('PATH_INFO', ''),
1, 2, True)
except ValueError:
version, rest = None, None
self.logger.increment('errors')
if rest and rest.startswith(self.reseller_prefix):
if self._is_definitive_auth(env.get('PATH_INFO', '')):
# Handle anonymous access to accounts I'm the definitive
# auth for.
env['swift.authorize'] = self.authorize
env['swift.clean_acl'] = clean_acl
# Not my token, not my account, I can't authorize this request,
# deny all is a good idea if not already set...
elif 'swift.authorize' not in env:
env['swift.authorize'] = self.denied_response
# Because I'm not certain if I'm the definitive auth for empty
# reseller_prefixed accounts, I won't overwrite swift.authorize.
elif 'swift.authorize' not in env:
elif self.reseller_prefix == '':
# Because I'm not certain if I'm the definitive auth, I won't
# overwrite swift.authorize.
if 'swift.authorize' not in env:
env['swift.authorize'] = self.authorize
env['swift.clean_acl'] = clean_acl
else:
# Not my token, not my account, I can't authorize this request,
# deny all is a good idea if not already set...
if 'swift.authorize' not in env:
env['swift.authorize'] = self.denied_response
return self.app(env, start_response)
def _is_definitive_auth(self, path):
"""
Determine if we are the definitive auth
Determines if we are the definitive auth for a given path.
If the account name is prefixed with something matching one
of the reseller_prefix items, then we are the auth (return True)
Non-matching: we are not the auth.
However, one of the reseller_prefix items can be blank. If
so, we cannot always be definite so return False.
:param path: A path (e.g., /v1/AUTH_joesaccount/c/o)
:return:True if we are definitive auth
"""
try:
version, account, rest = split_path(path, 1, 3, True)
except ValueError:
return False
if account:
return bool(self._get_account_prefix(account))
return False
def _non_empty_reseller_prefixes(self):
return iter([pre for pre in self.reseller_prefixes if pre != ''])
def _get_account_prefix(self, account):
"""
Get the prefix of an account
Determines which reseller prefix matches the account and returns
that prefix. If account does not start with one of the known
reseller prefixes, returns None.
:param account: Account name (e.g., AUTH_joesaccount) or None
:return: The prefix string (examples: 'AUTH_', 'SERVICE_', '')
If we can't match the prefix of the account, return None
"""
if account is None:
return None
# Empty prefix matches everything, so try to match others first
for prefix in self._non_empty_reseller_prefixes():
if account.startswith(prefix):
return prefix
if '' in self.reseller_prefixes:
return ''
return None
def _dot_account(self, account):
"""
Detect if account starts with dot character after the prefix
:param account: account in path (e.g., AUTH_joesaccount)
:return:True if name starts with dot character
"""
prefix = self._get_account_prefix(account)
return prefix is not None and account[len(prefix)] == '.'
def _get_user_groups(self, account, account_user, account_id):
"""
:param account: example: test
:param account_user: example: test:tester
:param account_id: example: AUTH_test
:return: a comma separated string of group names. The group names are
as follows: account,account_user,groups...
If .admin is in the groups, this is replaced by all the
possible account ids. For example, for user joe, account acct
and resellers AUTH_, OTHER_, the returned string is as
follows: acct,acct:joe,AUTH_acct,OTHER_acct
"""
groups = [account, account_user]
groups.extend(self.users[account_user]['groups'])
if '.admin' in groups:
groups.remove('.admin')
for prefix in self._non_empty_reseller_prefixes():
groups.append('%s%s' % (prefix, account))
if account_id not in groups:
groups.append(account_id)
groups = ','.join(groups)
return groups
@ -256,7 +369,6 @@ class TempAuth(object):
:param env: The current WSGI environment dictionary.
:param token: Token to validate and return a group string for.
:returns: None if the token is invalid or a string containing a comma
separated list of groups the authenticated user is a member
of. The first group in the list is also considered a unique
@ -287,7 +399,7 @@ class TempAuth(object):
s = base64.encodestring(hmac.new(key, msg, sha1).digest()).strip()
if s != sign:
return None
groups = self._get_user_groups(account, account_user, account_id)
groups = self._get_user_groups(account, account_user)
return groups
@ -356,17 +468,16 @@ class TempAuth(object):
Returns None if the request is authorized to continue or a standard
WSGI response callable if not.
"""
try:
_junk, account, container, obj = req.split_path(1, 4, True)
except ValueError:
self.logger.increment('errors')
return HTTPNotFound(request=req)
if not account or not account.startswith(self.reseller_prefix):
if self._get_account_prefix(account) is None:
self.logger.debug("Account name: %s doesn't start with "
"reseller_prefix: %s."
% (account, self.reseller_prefix))
"reseller_prefix(s): %s."
% (account, ','.join(self.reseller_prefixes)))
return self.denied_response(req)
# At this point, TempAuth is convinced that it is authoritative.
@ -385,8 +496,8 @@ class TempAuth(object):
account_user = user_groups[1] if len(user_groups) > 1 else None
if '.reseller_admin' in user_groups and \
account != self.reseller_prefix and \
account[len(self.reseller_prefix)] != '.':
account not in self.reseller_prefixes and \
not self._dot_account(account):
req.environ['swift_owner'] = True
self.logger.debug("User %s has reseller admin authorizing."
% account_user)
@ -394,8 +505,18 @@ class TempAuth(object):
if account in user_groups and \
(req.method not in ('DELETE', 'PUT') or container):
# If the user is admin for the account and is not trying to do an
# account DELETE or PUT...
# The user is admin for the account and is not trying to do an
# account DELETE or PUT
account_prefix = self._get_account_prefix(account)
require_group = self.account_rules.get(account_prefix).get(
'require_group')
if require_group and require_group in user_groups:
req.environ['swift_owner'] = True
self.logger.debug("User %s has admin and %s group."
" Authorizing." % (account_user,
require_group))
return None
elif not require_group:
req.environ['swift_owner'] = True
self.logger.debug("User %s has admin authorizing."
% account_user)
@ -648,7 +769,6 @@ def filter_factory(global_conf, **local_conf):
"""Returns a WSGI filter app for use with paste.deploy."""
conf = global_conf.copy()
conf.update(local_conf)
register_swift_info('tempauth', account_acls=True)
def auth_filter(app):
return TempAuth(app, conf)

View File

@ -291,6 +291,72 @@ def config_auto_int_value(value, default):
return value
def append_underscore(prefix):
if prefix and prefix[-1] != '_':
prefix += '_'
return prefix
def config_read_reseller_options(conf, defaults):
"""
Read reseller_prefix option and associated options from configuration
Reads the reseller_prefix option, then reads options that may be
associated with a specific reseller prefix. Reads options such that an
option without a prefix applies to all reseller prefixes unless an option
has an explicit prefix.
:param conf: the configuration
:param defaults: a dict of default values. The key is the option
name. The value is either an array of strings or a string
:return: tuple of an array of reseller prefixes and a dict of option values
"""
reseller_prefix_opt = conf.get('reseller_prefix', 'AUTH').split(',')
reseller_prefixes = []
for prefix in [pre.strip() for pre in reseller_prefix_opt if pre.strip()]:
if prefix == "''":
prefix = ''
prefix = append_underscore(prefix)
if prefix not in reseller_prefixes:
reseller_prefixes.append(prefix)
if len(reseller_prefixes) == 0:
reseller_prefixes.append('')
# Get prefix-using config options
associated_options = {}
for prefix in reseller_prefixes:
associated_options[prefix] = dict(defaults)
associated_options[prefix].update(
config_read_prefixed_options(conf, '', defaults))
prefix_name = prefix if prefix != '' else "''"
associated_options[prefix].update(
config_read_prefixed_options(conf, prefix_name, defaults))
return reseller_prefixes, associated_options
def config_read_prefixed_options(conf, prefix_name, defaults):
"""
Read prefixed options from configuration
:param conf: the configuration
:param prefix_name: the prefix (including, if needed, an underscore)
:param defaults: a dict of default values. The dict supplies the
option name and type (string or comma separated string)
:return: a dict containing the options
"""
params = {}
for option_name in defaults.keys():
value = conf.get('%s%s' % (prefix_name, option_name))
if value:
if isinstance(defaults.get(option_name), list):
params[option_name] = []
for role in value.lower().split(','):
params[option_name].append(role.strip())
else:
params[option_name] = value.strip()
return params
def noop_libc_function(*args):
return 0

View File

@ -82,15 +82,15 @@ normalized_urls = None
# If no config was read, we will fall back to old school env vars
swift_test_auth_version = None
swift_test_auth = os.environ.get('SWIFT_TEST_AUTH')
swift_test_user = [os.environ.get('SWIFT_TEST_USER'), None, None, '']
swift_test_key = [os.environ.get('SWIFT_TEST_KEY'), None, None, '']
swift_test_tenant = ['', '', '', '']
swift_test_perm = ['', '', '', '']
swift_test_domain = ['', '', '', '']
swift_test_user_id = ['', '', '', '']
swift_test_tenant_id = ['', '', '', '']
swift_test_user = [os.environ.get('SWIFT_TEST_USER'), None, None, '', '']
swift_test_key = [os.environ.get('SWIFT_TEST_KEY'), None, None, '', '']
swift_test_tenant = ['', '', '', '', '']
swift_test_perm = ['', '', '', '', '']
swift_test_domain = ['', '', '', '', '']
swift_test_user_id = ['', '', '', '', '']
swift_test_tenant_id = ['', '', '', '', '']
skip, skip2, skip3 = False, False, False
skip, skip2, skip3, skip_service_tokens = False, False, False, False
orig_collate = ''
insecure = False
@ -207,11 +207,14 @@ def in_process_setup(the_object_server=object_server):
# User on same account as first, but without admin access
'username3': 'tester3',
'password3': 'testing3',
# For tempauth middleware
'user_admin_admin': 'admin .admin .reseller_admin',
'user_test_tester': 'testing .admin',
'user_test2_tester2': 'testing2 .admin',
'user_test_tester3': 'testing3'
# Service user and prefix (emulates glance, cinder, etc. user)
'account5': 'test5',
'username5': 'tester5',
'password5': 'testing5',
'service_prefix': 'SERVICE',
# For tempauth middleware. Update reseller_prefix
'reseller_prefix': 'AUTH, SERVICE',
'SERVICE_require_group': 'service'
})
acc1lis = eventlet.listen(('localhost', 0))
@ -415,6 +418,9 @@ def setup_package():
global swift_test_tenant
global swift_test_perm
global swift_test_domain
global swift_test_service_prefix
swift_test_service_prefix = None
if config:
swift_test_auth_version = str(config.get('auth_version', '1'))
@ -430,6 +436,10 @@ def setup_package():
except KeyError:
pass # skip
if 'service_prefix' in config:
swift_test_service_prefix = utils.append_underscore(
config['service_prefix'])
if swift_test_auth_version == "1":
swift_test_auth += 'v1.0'
@ -457,6 +467,13 @@ def setup_package():
swift_test_key[2] = config['password3']
except KeyError:
pass # old config, no third account tests can be run
try:
swift_test_user[4] = '%s%s' % (
'%s:' % config['account5'], config['username5'])
swift_test_key[4] = config['password5']
swift_test_tenant[4] = config['account5']
except KeyError:
pass # no service token tests can be run
for _ in range(3):
swift_test_perm[_] = swift_test_user[_]
@ -476,8 +493,12 @@ def setup_package():
swift_test_tenant[3] = config['account4']
swift_test_key[3] = config['password4']
swift_test_domain[3] = config['domain4']
if 'username5' in config:
swift_test_user[4] = config['username5']
swift_test_tenant[4] = config['account5']
swift_test_key[4] = config['password5']
for _ in range(4):
for _ in range(5):
swift_test_perm[_] = swift_test_tenant[_] + ':' \
+ swift_test_user[_]
@ -508,6 +529,14 @@ def setup_package():
print >>sys.stderr, \
'SKIPPING FUNCTIONAL TESTS SPECIFIC TO AUTH VERSION 3'
global skip_service_tokens
skip_service_tokens = not all([not skip, swift_test_user[4],
swift_test_key[4], swift_test_tenant[4],
swift_test_service_prefix])
if not skip and skip_service_tokens:
print >>sys.stderr, \
'SKIPPING FUNCTIONAL TESTS SPECIFIC TO SERVICE TOKENS'
get_cluster_info()
@ -546,10 +575,11 @@ class InternalServerError(Exception):
pass
url = [None, None, None, None]
token = [None, None, None, None]
parsed = [None, None, None, None]
conn = [None, None, None, None]
url = [None, None, None, None, None]
token = [None, None, None, None, None]
service_token = [None, None, None, None, None]
parsed = [None, None, None, None, None]
conn = [None, None, None, None, None]
def connection(url):
@ -558,6 +588,18 @@ def connection(url):
return http_connection(url)
def get_url_token(user_index, os_options):
authargs = dict(snet=False,
tenant_name=swift_test_tenant[user_index],
auth_version=swift_test_auth_version,
os_options=os_options,
insecure=insecure)
return get_auth(swift_test_auth,
swift_test_user[user_index],
swift_test_key[user_index],
**authargs)
def retry(func, *args, **kwargs):
"""
You can use the kwargs to override:
@ -566,13 +608,17 @@ def retry(func, *args, **kwargs):
'url_account' (default: matches 'use_account') - which user's storage URL
'resource' (default: url[url_account] - URL to connect to; retry()
will interpolate the variable :storage_url: if present
'service_user' - add a service token from this user (1 indexed)
"""
global url, token, parsed, conn
global url, token, service_token, parsed, conn
retries = kwargs.get('retries', 5)
attempts, backoff = 0, 1
# use account #1 by default; turn user's 1-indexed account into 0-indexed
use_account = kwargs.pop('use_account', 1) - 1
service_user = kwargs.pop('service_user', None)
if service_user:
service_user -= 1 # 0-index
# access our own account by default
url_account = kwargs.pop('url_account', use_account + 1) - 1
@ -582,13 +628,8 @@ def retry(func, *args, **kwargs):
attempts += 1
try:
if not url[use_account] or not token[use_account]:
url[use_account], token[use_account] = \
get_auth(swift_test_auth, swift_test_user[use_account],
swift_test_key[use_account],
snet=False,
tenant_name=swift_test_tenant[use_account],
auth_version=swift_test_auth_version,
os_options=os_options)
url[use_account], token[use_account] = get_url_token(
use_account, os_options)
parsed[use_account] = conn[use_account] = None
if not parsed[use_account] or not conn[use_account]:
parsed[use_account], conn[use_account] = \
@ -598,6 +639,11 @@ def retry(func, *args, **kwargs):
resource = kwargs.pop('resource', '%(storage_url)s')
template_vars = {'storage_url': url[url_account]}
parsed_result = urlparse(resource % template_vars)
if isinstance(service_user, int):
if not service_token[service_user]:
dummy, service_token[service_user] = get_url_token(
service_user, os_options)
kwargs['service_token'] = service_token[service_user]
return func(url[url_account], token[use_account],
parsed_result, conn[url_account],
*args, **kwargs)
@ -605,9 +651,12 @@ def retry(func, *args, **kwargs):
if attempts > retries:
raise
parsed[use_account] = conn[use_account] = None
if service_user:
service_token[service_user] = None
except AuthError:
url[use_account] = token[use_account] = None
continue
if service_user:
service_token[service_user] = None
except InternalServerError:
pass
if attempts <= retries:

View File

@ -26,8 +26,11 @@ import simplejson as json
from nose import SkipTest
from xml.dom import minidom
from swiftclient import get_auth
from swift.common.utils import config_true_value
from test import safe_repr
@ -109,6 +112,7 @@ class Connection(object):
self.auth_host = config['auth_host']
self.auth_port = int(config['auth_port'])
self.auth_ssl = config['auth_ssl'] in ('on', 'true', 'yes', '1')
self.insecure = config_true_value(config.get('insecure', 'false'))
self.auth_prefix = config.get('auth_prefix', '/')
self.auth_version = str(config.get('auth_version', '1'))
@ -147,10 +151,11 @@ class Connection(object):
auth_netloc = "%s:%d" % (self.auth_host, self.auth_port)
auth_url = auth_scheme + auth_netloc + auth_path
authargs = dict(snet=False, tenant_name=self.account,
auth_version=self.auth_version, os_options={},
insecure=self.insecure)
(storage_url, storage_token) = get_auth(
auth_url, auth_user, self.password, snet=False,
tenant_name=self.account, auth_version=self.auth_version,
os_options={})
auth_url, auth_user, self.password, **authargs)
if not (storage_url and storage_token):
raise AuthenticationFailed()

View File

@ -28,8 +28,10 @@ import uuid
from copy import deepcopy
import eventlet
from nose import SkipTest
from swift.common.http import is_success, is_client_error
from test.functional import normalized_urls, load_constraint, cluster_info
from test.functional import check_response, retry
import test.functional as tf
from test.functional.swift_test_client import Account, Connection, File, \
ResponseError
@ -2833,5 +2835,174 @@ class TestSloTempurlUTF8(Base2, TestSloTempurl):
set_up = False
class TestServiceToken(unittest.TestCase):
def setUp(self):
if tf.skip_service_tokens:
raise SkipTest
self.SET_TO_USERS_TOKEN = 1
self.SET_TO_SERVICE_TOKEN = 2
# keystoneauth and tempauth differ in allowing PUT account
# Even if keystoneauth allows it, the proxy-server uses
# allow_account_management to decide if accounts can be created
self.put_account_expect = is_client_error
if tf.swift_test_auth_version != '1':
if cluster_info.get('swift').get('allow_account_management'):
self.put_account_expect = is_success
def _scenario_generator(self):
paths = ((None, None), ('c', None), ('c', 'o'))
for path in paths:
for method in ('PUT', 'POST', 'HEAD', 'GET', 'OPTIONS'):
yield method, path[0], path[1]
for path in reversed(paths):
yield 'DELETE', path[0], path[1]
def _assert_is_authed_response(self, method, container, object, resp):
resp.read()
expect = is_success
if method == 'DELETE' and not container:
expect = is_client_error
if method == 'PUT' and not container:
expect = self.put_account_expect
self.assertTrue(expect(resp.status), 'Unexpected %s for %s %s %s'
% (resp.status, method, container, object))
def _assert_not_authed_response(self, method, container, object, resp):
resp.read()
expect = is_client_error
if method == 'OPTIONS':
expect = is_success
self.assertTrue(expect(resp.status), 'Unexpected %s for %s %s %s'
% (resp.status, method, container, object))
def prepare_request(self, method, use_service_account=False,
container=None, obj=None, body=None, headers=None,
x_auth_token=None,
x_service_token=None, dbg=False):
"""
Setup for making the request
When retry() calls the do_request() function, it calls it the
test user's token, the parsed path, a connection and (optionally)
a token from the test service user. We save options here so that
do_request() can make the appropriate request.
:param method: The operation (e.g'. 'HEAD')
:param use_service_account: Optional. Set True to change the path to
be the service account
:param container: Optional. Adds a container name to the path
:param obj: Optional. Adds an object name to the path
:param body: Optional. Adds a body (string) in the request
:param headers: Optional. Adds additional headers.
:param x_auth_token: Optional. Default is SET_TO_USERS_TOKEN. One of:
SET_TO_USERS_TOKEN Put the test user's token in
X-Auth-Token
SET_TO_SERVICE_TOKEN Put the service token in X-Auth-Token
:param x_service_token: Optional. Default is to not set X-Service-Token
to any value. If specified, is one of following:
SET_TO_USERS_TOKEN Put the test user's token in
X-Service-Token
SET_TO_SERVICE_TOKEN Put the service token in
X-Service-Token
:param dbg: Optional. Set true to check request arguments
"""
self.method = method
self.use_service_account = use_service_account
self.container = container
self.obj = obj
self.body = body
self.headers = headers
if x_auth_token:
self.x_auth_token = x_auth_token
else:
self.x_auth_token = self.SET_TO_USERS_TOKEN
self.x_service_token = x_service_token
self.dbg = dbg
def do_request(self, url, token, parsed, conn, service_token=''):
if self.use_service_account:
path = self._service_account(parsed.path)
else:
path = parsed.path
if self.container:
path += '/%s' % self.container
if self.obj:
path += '/%s' % self.obj
headers = {}
if self.body:
headers.update({'Content-Length': len(self.body)})
if self.headers:
headers.update(self.headers)
if self.x_auth_token == self.SET_TO_USERS_TOKEN:
headers.update({'X-Auth-Token': token})
elif self.x_auth_token == self.SET_TO_SERVICE_TOKEN:
headers.update({'X-Auth-Token': service_token})
if self.x_service_token == self.SET_TO_USERS_TOKEN:
headers.update({'X-Service-Token': token})
elif self.x_service_token == self.SET_TO_SERVICE_TOKEN:
headers.update({'X-Service-Token': service_token})
if self.dbg:
print('DEBUG: conn.request: method:%s path:%s'
' body:%s headers:%s' % (self.method, path, self.body,
headers))
conn.request(self.method, path, self.body, headers=headers)
return check_response(conn)
def _service_account(self, path):
parts = path.split('/', 3)
account = parts[2]
try:
project_id = account[account.index('_') + 1:]
except ValueError:
project_id = account
parts[2] = '%s%s' % (tf.swift_test_service_prefix, project_id)
return '/'.join(parts)
def test_user_access_own_auth_account(self):
# This covers ground tested elsewhere (tests a user doing HEAD
# on own account). However, if this fails, none of the remaining
# tests will work
self.prepare_request('HEAD')
resp = retry(self.do_request)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
def test_user_cannot_access_service_account(self):
for method, container, obj in self._scenario_generator():
self.prepare_request(method, use_service_account=True,
container=container, obj=obj)
resp = retry(self.do_request)
self._assert_not_authed_response(method, container, obj, resp)
def test_service_user_denied_with_x_auth_token(self):
for method, container, obj in self._scenario_generator():
self.prepare_request(method, use_service_account=True,
container=container, obj=obj,
x_auth_token=self.SET_TO_SERVICE_TOKEN)
resp = retry(self.do_request, service_user=5)
self._assert_not_authed_response(method, container, obj, resp)
def test_service_user_denied_with_x_service_token(self):
for method, container, obj in self._scenario_generator():
self.prepare_request(method, use_service_account=True,
container=container, obj=obj,
x_auth_token=self.SET_TO_SERVICE_TOKEN,
x_service_token=self.SET_TO_SERVICE_TOKEN)
resp = retry(self.do_request, service_user=5)
self._assert_not_authed_response(method, container, obj, resp)
def test_user_plus_service_can_access_service_account(self):
for method, container, obj in self._scenario_generator():
self.prepare_request(method, use_service_account=True,
container=container, obj=obj,
x_auth_token=self.SET_TO_USERS_TOKEN,
x_service_token=self.SET_TO_SERVICE_TOKEN)
resp = retry(self.do_request, service_user=5)
self._assert_is_authed_response(method, container, obj, resp)
if __name__ == '__main__':
unittest.main()

View File

@ -33,6 +33,31 @@ password3 = testing3
#password4 = testing4
#domain4 = test-domain
# Fifth user is required for service token-specific tests.
# The account must be different than the primary test account
# The user must not have a group (tempauth) or role (keystoneauth) on
# the primary test account. The user must have a group/role that is unique
# and not given to the primary tester and is specified in the options
# <prefix>_require_group (tempauth) or <prefix>_service_roles (keystoneauth).
#account5 = service
#username5 = tester5
#password5 = testing5
# The service_prefix option is used for service token-specific tests.
# If service_prefix or username5 above is not supplied, the tests are skipped.
# To set the value and enable the service token tests, look at the
# reseller_prefix option in /etc/swift/proxy-server.conf. There must be at
# least two prefixes. If not, add a prefix as follows (where we add SERVICE):
# reseller_prefix = AUTH, SERVICE
# The service_prefix must match the <prefix> used in <prefix>_require_group
# (tempauth) or <prefix>_service_roles (keystoneauth); for example:
# SERVICE_require_group = service
# SERVICE_service_roles = service
# Note: Do not enable service token tests if the first prefix in
# reseller_prefix is the empty prefix AND the primary functional test
# account contains an underscore.
#service_prefix = SERVICE
collate = C
# Only necessary if a pre-existing server uses self-signed certificate

View File

@ -18,6 +18,7 @@ import unittest
from swift.common.middleware import keystoneauth
from swift.common.swob import Request, Response
from swift.common.http import HTTP_FORBIDDEN
from swift.common.utils import split_path
from swift.proxy.controllers.base import _get_cache_key
from test.unit import FakeLogger
@ -31,6 +32,45 @@ def _fake_token_info(version='2'):
return {'token': 'fake_value'}
def operator_roles(test_auth):
# Return copy -- not a reference
return list(test_auth.account_rules[test_auth.reseller_prefixes[0]].get(
'operator_roles'))
def get_account_for_tenant(test_auth, tenant_id):
"""Convenience function reduces unit test churn"""
return '%s%s' % (test_auth.reseller_prefixes[0], tenant_id)
def get_identity_headers(status='Confirmed', tenant_id='1',
tenant_name='acct', project_domain_name='domA',
project_domain_id='99',
user_name='usr', user_id='42',
user_domain_name='domA', user_domain_id='99',
role='admin',
service_role=None):
if role is None:
role = []
if isinstance(role, list):
role = ','.join(role)
res = dict(X_IDENTITY_STATUS=status,
X_TENANT_ID=tenant_id,
X_TENANT_NAME=tenant_name,
X_PROJECT_ID=tenant_id,
X_PROJECT_NAME=tenant_name,
X_PROJECT_DOMAIN_ID=project_domain_id,
X_PROJECT_DOMAIN_NAME=project_domain_name,
X_ROLES=role,
X_USER_NAME=user_name,
X_USER_ID=user_id,
X_USER_DOMAIN_NAME=user_domain_name,
X_USER_DOMAIN_ID=user_domain_id)
if service_role:
res.update(X_SERVICE_ROLES=service_role)
return res
class FakeApp(object):
def __init__(self, status_headers_body_iter=None):
self.calls = 0
@ -61,35 +101,16 @@ class SwiftAuth(unittest.TestCase):
def _make_request(self, path=None, headers=None, **kwargs):
if not path:
path = '/v1/%s/c/o' % self.test_auth._get_account_for_tenant('foo')
path = '/v1/%s/c/o' % get_account_for_tenant(self.test_auth, 'foo')
return Request.blank(path, headers=headers, **kwargs)
def _get_identity_headers(self, status='Confirmed', tenant_id='1',
tenant_name='acct', project_domain_name='domA',
project_domain_id='99',
user_name='usr', user_id='42',
user_domain_name='domA', user_domain_id='99',
role='admin'):
return dict(X_IDENTITY_STATUS=status,
X_TENANT_ID=tenant_id,
X_TENANT_NAME=tenant_name,
X_PROJECT_ID=tenant_id,
X_PROJECT_NAME=tenant_name,
X_PROJECT_DOMAIN_ID=project_domain_id,
X_PROJECT_DOMAIN_NAME=project_domain_name,
X_ROLES=role,
X_USER_NAME=user_name,
X_USER_ID=user_id,
X_USER_DOMAIN_NAME=user_domain_name,
X_USER_DOMAIN_ID=user_domain_id)
def _get_successful_middleware(self):
response_iter = iter([('200 OK', {}, '')])
return keystoneauth.filter_factory({})(FakeApp(response_iter))
def test_invalid_request_authorized(self):
role = self.test_auth.reseller_admin_role
headers = self._get_identity_headers(role=role)
headers = get_identity_headers(role=role)
req = self._make_request('/', headers=headers)
resp = req.get_response(self._get_successful_middleware())
self.assertEqual(resp.status_int, 404)
@ -101,20 +122,20 @@ class SwiftAuth(unittest.TestCase):
def test_confirmed_identity_is_authorized(self):
role = self.test_auth.reseller_admin_role
headers = self._get_identity_headers(role=role)
headers = get_identity_headers(role=role)
req = self._make_request('/v1/AUTH_acct/c', headers)
resp = req.get_response(self._get_successful_middleware())
self.assertEqual(resp.status_int, 200)
def test_detect_reseller_request(self):
role = self.test_auth.reseller_admin_role
headers = self._get_identity_headers(role=role)
headers = get_identity_headers(role=role)
req = self._make_request('/v1/AUTH_acct/c', headers)
req.get_response(self._get_successful_middleware())
self.assertTrue(req.environ.get('reseller_request'))
def test_confirmed_identity_is_not_authorized(self):
headers = self._get_identity_headers()
headers = get_identity_headers()
req = self._make_request('/v1/AUTH_acct/c', headers)
resp = req.get_response(self.test_auth)
self.assertEqual(resp.status_int, 403)
@ -141,17 +162,17 @@ class SwiftAuth(unittest.TestCase):
conf = {'reseller_prefix': ''}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
account = tenant_id = 'foo'
self.assertTrue(test_auth._reseller_check(account, tenant_id))
self.assertTrue(test_auth._account_matches_tenant(account, tenant_id))
def test_reseller_prefix_added_underscore(self):
conf = {'reseller_prefix': 'AUTH'}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.assertEqual(test_auth.reseller_prefix, "AUTH_")
self.assertEqual(test_auth.reseller_prefixes[0], "AUTH_")
def test_reseller_prefix_not_added_double_underscores(self):
conf = {'reseller_prefix': 'AUTH_'}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.assertEqual(test_auth.reseller_prefix, "AUTH_")
self.assertEqual(test_auth.reseller_prefixes[0], "AUTH_")
def test_override_asked_for_but_not_allowed(self):
conf = {'allow_overrides': 'false'}
@ -182,10 +203,10 @@ class SwiftAuth(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
def test_identified_options_allowed(self):
headers = self._get_identity_headers()
headers = get_identity_headers()
headers['REQUEST_METHOD'] = 'OPTIONS'
req = self._make_request('/v1/AUTH_account',
headers=self._get_identity_headers(),
headers=get_identity_headers(),
environ={'REQUEST_METHOD': 'OPTIONS'})
resp = req.get_response(self._get_successful_middleware())
self.assertEqual(resp.status_int, 200)
@ -200,9 +221,9 @@ class SwiftAuth(unittest.TestCase):
def test_project_domain_id_sysmeta_set(self):
proj_id = '12345678'
proj_domain_id = '13'
headers = self._get_identity_headers(tenant_id=proj_id,
headers = get_identity_headers(tenant_id=proj_id,
project_domain_id=proj_domain_id)
account = self.test_auth._get_account_for_tenant(proj_id)
account = get_account_for_tenant(self.test_auth, proj_id)
path = '/v1/' + account
# fake cached account info
_, info_key = _get_cache_key(account, None)
@ -228,10 +249,10 @@ class SwiftAuth(unittest.TestCase):
def test_project_domain_id_sysmeta_set_to_unknown(self):
proj_id = '12345678'
# token scoped to a different project
headers = self._get_identity_headers(tenant_id='87654321',
headers = get_identity_headers(tenant_id='87654321',
project_domain_id='default',
role='reselleradmin')
account = self.test_auth._get_account_for_tenant(proj_id)
account = get_account_for_tenant(self.test_auth, proj_id)
path = '/v1/' + account
# fake cached account info
_, info_key = _get_cache_key(account, None)
@ -252,8 +273,8 @@ class SwiftAuth(unittest.TestCase):
def test_project_domain_id_sysmeta_not_set(self):
proj_id = '12345678'
headers = self._get_identity_headers(tenant_id=proj_id, role='admin')
account = self.test_auth._get_account_for_tenant(proj_id)
headers = get_identity_headers(tenant_id=proj_id, role='admin')
account = get_account_for_tenant(self.test_auth, proj_id)
path = '/v1/' + account
_, info_key = _get_cache_key(account, None)
# v2 token
@ -273,9 +294,9 @@ class SwiftAuth(unittest.TestCase):
def test_project_domain_id_sysmeta_set_unknown_with_v2(self):
proj_id = '12345678'
# token scoped to a different project
headers = self._get_identity_headers(tenant_id='87654321',
headers = get_identity_headers(tenant_id='87654321',
role='reselleradmin')
account = self.test_auth._get_account_for_tenant(proj_id)
account = get_account_for_tenant(self.test_auth, proj_id)
path = '/v1/' + account
_, info_key = _get_cache_key(account, None)
# v2 token
@ -295,6 +316,171 @@ class SwiftAuth(unittest.TestCase):
UNKNOWN_ID)
class SwiftAuthMultiple(SwiftAuth):
"""Runs same tests as SwiftAuth with multiple reseller prefixes
Runs SwiftAuth tests while a second reseller prefix item exists.
Validates that there is no regression against the original
single prefix configuration.
"""
def setUp(self):
self.test_auth = keystoneauth.filter_factory(
{'reseller_prefix': 'AUTH, PRE2'})(FakeApp())
self.test_auth.logger = FakeLogger()
class ServiceTokenFunctionality(unittest.TestCase):
def _make_authed_request(self, conf, project_id, path, method='GET',
user_role='admin', service_role=None):
"""Make a request with keystoneauth as auth
By default, acts as though the user had presented a token
containing the 'admin' role in X-Auth-Token scoped to the specified
project_id.
:param conf: configuration for keystoneauth
:param project_id: the project_id of the token
:param path: the path of the request
:param method: the method (defaults to GET)
:param user_role: the role of X-Auth-Token (defaults to 'admin')
:param service_role: the role in X-Service-Token (defaults to none)
:returns: response object
"""
headers = get_identity_headers(tenant_id=project_id,
role=user_role,
service_role=service_role)
(version, account, _junk, _junk) = split_path(path, 2, 4, True)
_, info_key = _get_cache_key(account, None)
env = {info_key: {'status': 0, 'sysmeta': {}},
'keystone.token_info': _fake_token_info(version='2')}
req = Request.blank(path, environ=env, headers=headers)
req.method = method
fake_app = FakeApp(iter([('200 OK', {}, '')]))
test_auth = keystoneauth.filter_factory(conf)(fake_app)
resp = req.get_response(test_auth)
return resp
def test_unknown_prefix(self):
resp = self._make_authed_request({}, '12345678', '/v1/BLAH_12345678')
self.assertEqual(resp.status_int, 403)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2'}, '12345678', '/v1/BLAH_12345678')
self.assertEqual(resp.status_int, 403)
def test_authed_for_path_single(self):
resp = self._make_authed_request({}, '12345678', '/v1/AUTH_12345678')
self.assertEqual(resp.status_int, 200)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_12345678')
self.assertEqual(resp.status_int, 200)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_12345678/c')
self.assertEqual(resp.status_int, 200)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_12345678',
user_role='ResellerAdmin')
self.assertEqual(resp.status_int, 200)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_anything',
user_role='ResellerAdmin')
self.assertEqual(resp.status_int, 200)
def test_denied_for_path_single(self):
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_789')
self.assertEqual(resp.status_int, 403)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_12345678',
user_role='something_else')
self.assertEqual(resp.status_int, 403)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'}, '12345678', '/v1/AUTH_12345678',
method='DELETE')
self.assertEqual(resp.status_int, 403)
def test_authed_for_primary_path_multiple(self):
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_service_roles': 'service'},
'12345678', '/v1/AUTH_12345678')
self.assertEqual(resp.status_int, 200)
def test_denied_for_second_path_with_only_operator_role(self):
# User only presents X-Auth-Token
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_service_roles': 'service'},
'12345678', '/v1/PRE2_12345678')
self.assertEqual(resp.status_int, 403)
# User puts token in X-Service-Token
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_service_roles': 'service'},
'12345678', '/v1/PRE2_12345678',
user_role='', service_role='admin')
self.assertEqual(resp.status_int, 403)
# User puts token in both X-Auth-Token and X-Service-Token
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_service_roles': 'service'},
'12345678', '/v1/PRE2_12345678',
user_role='admin', service_role='admin')
self.assertEqual(resp.status_int, 403)
def test_authed_for_second_path_with_operator_role_and_service(self):
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_service_roles': 'service'},
'12345678', '/v1/PRE2_12345678', service_role='service')
self.assertEqual(resp.status_int, 200)
def test_denied_for_second_path_with_only_service(self):
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_service_roles': 'service'},
'12345678', '/v1/PRE2_12345678', user_role='something_else',
service_role='service')
self.assertEqual(resp.status_int, 403)
def test_denied_for_second_path_for_service_user(self):
# User presents token with 'service' role in X-Auth-Token
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_service_roles': 'service'},
'12345678', '/v1/PRE2_12345678', user_role='service')
self.assertEqual(resp.status_int, 403)
# User presents token with 'service' role in X-Auth-Token
# and also in X-Service-Token
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_service_roles': 'service'},
'12345678', '/v1/PRE2_12345678', user_role='service',
service_role='service')
self.assertEqual(resp.status_int, 403)
def test_delete_denied_for_second_path(self):
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_service_roles': 'service'},
'12345678', '/v1/PRE2_12345678', service_role='service',
method='DELETE')
self.assertEqual(resp.status_int, 403)
def test_delete_of_second_path_by_reseller_admin(self):
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_service_roles': 'service'},
'12345678', '/v1/PRE2_12345678', user_role='ResellerAdmin',
method='DELETE')
self.assertEqual(resp.status_int, 200)
class BaseTestAuthorize(unittest.TestCase):
def setUp(self):
self.test_auth = keystoneauth.filter_factory({})(FakeApp())
@ -306,7 +492,7 @@ class BaseTestAuthorize(unittest.TestCase):
def _get_account(self, identity=None):
if not identity:
identity = self._get_identity()
return self.test_auth._get_account_for_tenant(
return get_account_for_tenant(self.test_auth,
identity['HTTP_X_TENANT_ID'])
def _get_identity(self, tenant_id='tenant_id', tenant_name='tenant_name',
@ -393,13 +579,13 @@ class TestAuthorize(BaseTestAuthorize):
self.assertTrue(req.environ.get('swift_owner'))
def test_authorize_succeeds_as_owner_for_operator_role(self):
roles = self.test_auth.operator_roles.split(',')
roles = operator_roles(self.test_auth)
identity = self._get_identity(roles=roles)
req = self._check_authenticate(identity=identity)
self.assertTrue(req.environ.get('swift_owner'))
def test_authorize_succeeds_as_owner_for_insensitive_operator_role(self):
roles = [r.upper() for r in self.test_auth.operator_roles.split(',')]
roles = [r.upper() for r in operator_roles(self.test_auth)]
identity = self._get_identity(roles=roles)
req = self._check_authenticate(identity=identity)
self.assertTrue(req.environ.get('swift_owner'))
@ -570,7 +756,7 @@ class TestAuthorize(BaseTestAuthorize):
'tenantID:userID')
def test_delete_own_account_not_allowed(self):
roles = self.test_auth.operator_roles.split(',')
roles = operator_roles(self.test_auth)
identity = self._get_identity(roles=roles)
account = self._get_account(identity)
self._check_authenticate(account=account,
@ -597,7 +783,7 @@ class TestAuthorize(BaseTestAuthorize):
self.test_auth(the_env, fake_start_response)
subreq = Request.blank(
'/v1/%s/c/o' % self.test_auth._get_account_for_tenant('test'))
'/v1/%s/c/o' % get_account_for_tenant(self.test_auth, 'test'))
subreq.environ.update(
self._get_identity(tenant_id='test', roles=['got_erased']))
@ -671,6 +857,7 @@ class TestAuthorize(BaseTestAuthorize):
def test_integral_keystone_identity(self):
user = ('U_ID', 'U_NAME')
roles = ('ROLE1', 'ROLE2')
service_roles = ('ROLE3', 'ROLE4')
project = ('P_ID', 'P_NAME')
user_domain = ('UD_ID', 'UD_NAME')
project_domain = ('PD_ID', 'PD_NAME')
@ -699,6 +886,7 @@ class TestAuthorize(BaseTestAuthorize):
expected = {'user': user,
'tenant': project,
'roles': list(roles),
'service_roles': [],
'user_domain': (None, None),
'project_domain': (None, None),
'auth_version': 0}
@ -710,6 +898,7 @@ class TestAuthorize(BaseTestAuthorize):
expected = {'user': user,
'tenant': project,
'roles': list(roles),
'service_roles': [],
'user_domain': (None, None),
'project_domain': (None, None),
'auth_version': 2}
@ -721,6 +910,19 @@ class TestAuthorize(BaseTestAuthorize):
expected = {'user': user,
'tenant': project,
'roles': list(roles),
'service_roles': [],
'user_domain': user_domain,
'project_domain': project_domain,
'auth_version': 3}
data = self.test_auth._integral_keystone_identity(req.environ)
self.assertEquals(expected, data)
# service token in environ
req.headers.update({'X-Service-Roles': '%s,%s' % service_roles})
expected = {'user': user,
'tenant': project,
'roles': list(roles),
'service_roles': list(service_roles),
'user_domain': user_domain,
'project_domain': project_domain,
'auth_version': 3}
@ -764,7 +966,7 @@ class TestIsNameAllowedInACL(BaseTestAuthorize):
scoped='account'):
project_name = 'foo'
account_id = '12345678'
account = self.test_auth._get_account_for_tenant(account_id)
account = get_account_for_tenant(self.test_auth, account_id)
parts = ('v1', account, None, None)
path = '/%s/%s' % parts[0:2]
@ -1175,5 +1377,106 @@ class TestSetProjectDomain(BaseTestAuthorize):
sysmeta_project_domain_id='test_id')
class ResellerInInfo(unittest.TestCase):
def setUp(self):
self.default_rules = {'operator_roles': ['admin', 'swiftoperator'],
'service_roles': []}
def test_defaults(self):
test_auth = keystoneauth.filter_factory({})(FakeApp())
self.assertEqual(test_auth.account_rules['AUTH_'], self.default_rules)
def test_multiple(self):
conf = {"reseller_prefix": "AUTH, '', PRE2"}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.assertEqual(test_auth.account_rules['AUTH_'], self.default_rules)
self.assertEqual(test_auth.account_rules[''], self.default_rules)
self.assertEqual(test_auth.account_rules['PRE2_'], self.default_rules)
class PrefixAccount(unittest.TestCase):
def test_default(self):
conf = {}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.assertEqual(get_account_for_tenant(test_auth,
'1234'), 'AUTH_1234')
self.assertEqual(test_auth._get_account_prefix(
'AUTH_1234'), 'AUTH_')
self.assertEqual(test_auth._get_account_prefix(
'JUNK_1234'), None)
self.assertTrue(test_auth._account_matches_tenant(
'AUTH_1234', '1234'))
self.assertFalse(test_auth._account_matches_tenant(
'AUTH_1234', '5678'))
self.assertFalse(test_auth._account_matches_tenant(
'JUNK_1234', '1234'))
def test_same_as_default(self):
conf = {'reseller_prefix': 'AUTH'}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.assertEqual(get_account_for_tenant(test_auth,
'1234'), 'AUTH_1234')
self.assertEqual(test_auth._get_account_prefix(
'AUTH_1234'), 'AUTH_')
self.assertEqual(test_auth._get_account_prefix(
'JUNK_1234'), None)
self.assertTrue(test_auth._account_matches_tenant(
'AUTH_1234', '1234'))
self.assertFalse(test_auth._account_matches_tenant(
'AUTH_1234', '5678'))
def test_blank_reseller(self):
conf = {'reseller_prefix': ''}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.assertEqual(get_account_for_tenant(test_auth,
'1234'), '1234')
self.assertEqual(test_auth._get_account_prefix(
'1234'), '')
self.assertEqual(test_auth._get_account_prefix(
'JUNK_1234'), '') # yes, it should return ''
self.assertTrue(test_auth._account_matches_tenant(
'1234', '1234'))
self.assertFalse(test_auth._account_matches_tenant(
'1234', '5678'))
self.assertFalse(test_auth._account_matches_tenant(
'JUNK_1234', '1234'))
def test_multiple_resellers(self):
conf = {'reseller_prefix': 'AUTH, PRE2'}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.assertEqual(get_account_for_tenant(test_auth,
'1234'), 'AUTH_1234')
self.assertEqual(test_auth._get_account_prefix(
'AUTH_1234'), 'AUTH_')
self.assertEqual(test_auth._get_account_prefix(
'JUNK_1234'), None)
self.assertTrue(test_auth._account_matches_tenant(
'AUTH_1234', '1234'))
self.assertTrue(test_auth._account_matches_tenant(
'PRE2_1234', '1234'))
self.assertFalse(test_auth._account_matches_tenant(
'AUTH_1234', '5678'))
self.assertFalse(test_auth._account_matches_tenant(
'PRE2_1234', '5678'))
def test_blank_plus_other_reseller(self):
conf = {'reseller_prefix': " '', PRE2"}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.assertEqual(get_account_for_tenant(test_auth,
'1234'), '1234')
self.assertEqual(test_auth._get_account_prefix(
'PRE2_1234'), 'PRE2_')
self.assertEqual(test_auth._get_account_prefix('JUNK_1234'), '')
self.assertTrue(test_auth._account_matches_tenant(
'1234', '1234'))
self.assertTrue(test_auth._account_matches_tenant(
'PRE2_1234', '1234'))
self.assertFalse(test_auth._account_matches_tenant(
'1234', '5678'))
self.assertFalse(test_auth._account_matches_tenant(
'PRE2_1234', '5678'))
if __name__ == '__main__':
unittest.main()

View File

@ -1,4 +1,4 @@
# Copyright (c) 2011 OpenStack Foundation
# Copyright (c) 2011-2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -119,10 +119,26 @@ class TestAuth(unittest.TestCase):
app = FakeApp()
ath = auth.filter_factory({})(app)
self.assertEquals(ath.reseller_prefix, 'AUTH_')
self.assertEquals(ath.reseller_prefixes, ['AUTH_'])
ath = auth.filter_factory({'reseller_prefix': 'TEST'})(app)
self.assertEquals(ath.reseller_prefix, 'TEST_')
self.assertEquals(ath.reseller_prefixes, ['TEST_'])
ath = auth.filter_factory({'reseller_prefix': 'TEST_'})(app)
self.assertEquals(ath.reseller_prefix, 'TEST_')
self.assertEquals(ath.reseller_prefixes, ['TEST_'])
ath = auth.filter_factory({'reseller_prefix': ''})(app)
self.assertEquals(ath.reseller_prefix, '')
self.assertEquals(ath.reseller_prefixes, [''])
ath = auth.filter_factory({'reseller_prefix': ' '})(app)
self.assertEquals(ath.reseller_prefix, '')
self.assertEquals(ath.reseller_prefixes, [''])
ath = auth.filter_factory({'reseller_prefix': ' '' '})(app)
self.assertEquals(ath.reseller_prefix, '')
self.assertEquals(ath.reseller_prefixes, [''])
ath = auth.filter_factory({'reseller_prefix': " '', TEST"})(app)
self.assertEquals(ath.reseller_prefix, '')
self.assertTrue('' in ath.reseller_prefixes)
self.assertTrue('TEST_' in ath.reseller_prefixes)
def test_auth_prefix_init(self):
app = FakeApp()
@ -264,8 +280,8 @@ class TestAuth(unittest.TestCase):
req = self._make_request('/v1/account', environ={'swift.authorize':
local_authorize})
resp = req.get_response(local_auth)
self.assertEquals(resp.status_int, 200)
self.assertEquals(req.environ['swift.authorize'], local_authorize)
self.assertEquals(resp.status_int, 200)
def test_auth_fail(self):
resp = self._make_request(
@ -791,6 +807,7 @@ class TestAuth(unittest.TestCase):
self.assertEquals(resp, None)
def test_get_user_group(self):
# More tests in TestGetUserGroups class
app = FakeApp()
ath = auth.filter_factory({})(app)
@ -812,6 +829,116 @@ class TestAuth(unittest.TestCase):
'Swift realm="BLAH_account"')
class TestAuthWithMultiplePrefixes(TestAuth):
"""
Repeats all tests in TestAuth except adds multiple
reseller_prefix items
"""
def setUp(self):
self.test_auth = auth.filter_factory(
{'reseller_prefix': 'AUTH_, SOMEOTHER_, YETANOTHER_'})(FakeApp())
class TestGetUserGroups(unittest.TestCase):
def test_custom_url_config(self):
app = FakeApp()
ath = auth.filter_factory({
'user_test_tester':
'testing .admin http://saio:8080/v1/AUTH_monkey'})(app)
groups = ath._get_user_groups('test', 'test:tester', 'AUTH_monkey')
self.assertEquals(groups, 'test,test:tester,AUTH_test,AUTH_monkey')
def test_no_prefix_reseller(self):
app = FakeApp()
ath = auth.filter_factory({'reseller_prefix': ''})(app)
ath.users = {'test:tester': {'groups': ['.admin']}}
groups = ath._get_user_groups('test', 'test:tester', 'test')
self.assertEquals(groups, 'test,test:tester')
ath.users = {'test:tester': {'groups': []}}
groups = ath._get_user_groups('test', 'test:tester', 'test')
self.assertEquals(groups, 'test,test:tester')
def test_single_reseller(self):
app = FakeApp()
ath = auth.filter_factory({})(app)
ath.users = {'test:tester': {'groups': ['.admin']}}
groups = ath._get_user_groups('test', 'test:tester', 'AUTH_test')
self.assertEquals(groups, 'test,test:tester,AUTH_test')
ath.users = {'test:tester': {'groups': []}}
groups = ath._get_user_groups('test', 'test:tester', 'AUTH_test')
self.assertEquals(groups, 'test,test:tester')
def test_multiple_reseller(self):
app = FakeApp()
ath = auth.filter_factory(
{'reseller_prefix': 'AUTH_, SOMEOTHER_, YETANOTHER_'})(app)
self.assertEquals(ath.reseller_prefixes, ['AUTH_', 'SOMEOTHER_',
'YETANOTHER_'])
ath.users = {'test:tester': {'groups': ['.admin']}}
groups = ath._get_user_groups('test', 'test:tester', 'AUTH_test')
self.assertEquals(groups,
'test,test:tester,AUTH_test,'
'SOMEOTHER_test,YETANOTHER_test')
ath.users = {'test:tester': {'groups': []}}
groups = ath._get_user_groups('test', 'test:tester', 'AUTH_test')
self.assertEquals(groups, 'test,test:tester')
class TestDefinitiveAuth(unittest.TestCase):
def setUp(self):
self.test_auth = auth.filter_factory(
{'reseller_prefix': 'AUTH_, SOMEOTHER_'})(FakeApp())
def test_noreseller_prefix(self):
ath = auth.filter_factory({'reseller_prefix': ''})(FakeApp())
result = ath._is_definitive_auth(path='/v1/test')
self.assertEquals(result, False)
result = ath._is_definitive_auth(path='/v1/AUTH_test')
self.assertEquals(result, False)
result = ath._is_definitive_auth(path='/v1/BLAH_test')
self.assertEquals(result, False)
def test_blank_prefix(self):
ath = auth.filter_factory({'reseller_prefix':
" '', SOMEOTHER"})(FakeApp())
result = ath._is_definitive_auth(path='/v1/test')
self.assertEquals(result, False)
result = ath._is_definitive_auth(path='/v1/SOMEOTHER_test')
self.assertEquals(result, True)
result = ath._is_definitive_auth(path='/v1/SOMEOTHERtest')
self.assertEquals(result, False)
def test_default_prefix(self):
ath = auth.filter_factory({})(FakeApp())
result = ath._is_definitive_auth(path='/v1/AUTH_test')
self.assertEquals(result, True)
result = ath._is_definitive_auth(path='/v1/BLAH_test')
self.assertEquals(result, False)
ath = auth.filter_factory({'reseller_prefix': 'AUTH'})(FakeApp())
result = ath._is_definitive_auth(path='/v1/AUTH_test')
self.assertEquals(result, True)
result = ath._is_definitive_auth(path='/v1/BLAH_test')
self.assertEquals(result, False)
def test_multiple_prefixes(self):
ath = auth.filter_factory({'reseller_prefix':
'AUTH, SOMEOTHER'})(FakeApp())
result = ath._is_definitive_auth(path='/v1/AUTH_test')
self.assertEquals(result, True)
result = ath._is_definitive_auth(path='/v1/SOMEOTHER_test')
self.assertEquals(result, True)
result = ath._is_definitive_auth(path='/v1/BLAH_test')
self.assertEquals(result, False)
class TestParseUserCreation(unittest.TestCase):
def test_parse_user_creation(self):
auth_filter = auth.filter_factory({
@ -869,6 +996,15 @@ class TestParseUserCreation(unittest.TestCase):
class TestAccountAcls(unittest.TestCase):
"""
These tests use a single reseller prefix (AUTH_) and the
target paths are /v1/AUTH_<blah>
"""
def setUp(self):
self.reseller_prefix = {}
self.accpre = 'AUTH'
def _make_request(self, path, **kwargs):
# Our TestAccountAcls default request will have a valid auth token
version, acct, _ = split_path(path, 1, 3, True)
@ -897,38 +1033,51 @@ class TestAccountAcls(unittest.TestCase):
return req
def _conf(self, moreconf):
conf = self.reseller_prefix
conf.update(moreconf)
return conf
def test_account_acl_success(self):
test_auth = auth.filter_factory({'user_admin_user': 'testing'})(
test_auth = auth.filter_factory(
self._conf({'user_admin_user': 'testing'}))(
FakeApp(iter(NO_CONTENT_RESP * 1)))
# admin (not a swift admin) wants to read from otheracct
req = self._make_request('/v1/AUTH_otheract', user_groups="AUTH_admin")
req = self._make_request('/v1/%s_otheract' % self.accpre,
user_groups="AUTH_admin")
# The request returned by _make_request should be allowed
resp = req.get_response(test_auth)
self.assertEquals(resp.status_int, 204)
def test_account_acl_failures(self):
test_auth = auth.filter_factory({'user_admin_user': 'testing'})(
test_auth = auth.filter_factory(
self._conf({'user_admin_user': 'testing'}))(
FakeApp())
# If I'm not authed as anyone on the ACLs, I shouldn't get in
req = self._make_request('/v1/AUTH_otheract', user_groups="AUTH_bob")
req = self._make_request('/v1/%s_otheract' % self.accpre,
user_groups="AUTH_bob")
resp = req.get_response(test_auth)
self.assertEquals(resp.status_int, 403)
# If the target account has no ACLs, a non-owner shouldn't get in
req = self._make_request('/v1/AUTH_otheract', user_groups="AUTH_admin",
req = self._make_request('/v1/%s_otheract' % self.accpre,
user_groups="AUTH_admin",
acls={})
resp = req.get_response(test_auth)
self.assertEquals(resp.status_int, 403)
def test_admin_privileges(self):
test_auth = auth.filter_factory({'user_admin_user': 'testing'})(
test_auth = auth.filter_factory(
self._conf({'user_admin_user': 'testing'}))(
FakeApp(iter(NO_CONTENT_RESP * 18)))
for target in ('/v1/AUTH_otheracct', '/v1/AUTH_otheracct/container',
'/v1/AUTH_otheracct/container/obj'):
for target in (
'/v1/%s_otheracct' % self.accpre,
'/v1/%s_otheracct/container' % self.accpre,
'/v1/%s_otheracct/container/obj' % self.accpre):
for method in ('GET', 'HEAD', 'OPTIONS', 'PUT', 'POST', 'DELETE'):
# Admin ACL user can do anything
req = self._make_request(target, user_groups="AUTH_admin",
@ -941,10 +1090,11 @@ class TestAccountAcls(unittest.TestCase):
self.assertTrue(req.environ.get('swift_owner'))
def test_readwrite_privileges(self):
test_auth = auth.filter_factory({'user_rw_user': 'testing'})(
test_auth = auth.filter_factory(
self._conf({'user_rw_user': 'testing'}))(
FakeApp(iter(NO_CONTENT_RESP * 15)))
for target in ('/v1/AUTH_otheracct',):
for target in ('/v1/%s_otheracct' % self.accpre,):
for method in ('GET', 'HEAD', 'OPTIONS'):
# Read-Write user can read account data
req = self._make_request(target, user_groups="AUTH_rw",
@ -964,7 +1114,8 @@ class TestAccountAcls(unittest.TestCase):
# RW user should be able to GET, PUT, POST, or DELETE to containers
# and objects
for target in ('/v1/AUTH_otheracct/c', '/v1/AUTH_otheracct/c/o'):
for target in ('/v1/%s_otheracct/c' % self.accpre,
'/v1/%s_otheracct/c/o' % self.accpre):
for method in ('GET', 'HEAD', 'OPTIONS', 'PUT', 'POST', 'DELETE'):
req = self._make_request(target, user_groups="AUTH_rw",
environ={'REQUEST_METHOD': method})
@ -972,13 +1123,15 @@ class TestAccountAcls(unittest.TestCase):
self.assertEquals(resp.status_int, 204)
def test_readonly_privileges(self):
test_auth = auth.filter_factory({'user_ro_user': 'testing'})(
test_auth = auth.filter_factory(
self._conf({'user_ro_user': 'testing'}))(
FakeApp(iter(NO_CONTENT_RESP * 9)))
# ReadOnly user should NOT be able to PUT, POST, or DELETE to account,
# container, or object
for target in ('/v1/AUTH_otheracct', '/v1/AUTH_otheracct/cont',
'/v1/AUTH_otheracct/cont/obj'):
for target in ('/v1/%s_otheracct' % self.accpre,
'/v1/%s_otheracct/cont' % self.accpre,
'/v1/%s_otheracct/cont/obj' % self.accpre):
for method in ('GET', 'HEAD', 'OPTIONS'):
req = self._make_request(target, user_groups="AUTH_ro",
environ={'REQUEST_METHOD': method})
@ -995,12 +1148,14 @@ class TestAccountAcls(unittest.TestCase):
self.assertFalse(req.environ.get('swift_owner'))
def test_user_gets_best_acl(self):
test_auth = auth.filter_factory({'user_acct_username': 'testing'})(
test_auth = auth.filter_factory(
self._conf({'user_acct_username': 'testing'}))(
FakeApp(iter(NO_CONTENT_RESP * 18)))
mygroups = "AUTH_acct,AUTH_ro,AUTH_something,AUTH_admin"
for target in ('/v1/AUTH_otheracct', '/v1/AUTH_otheracct/container',
'/v1/AUTH_otheracct/container/obj'):
for target in ('/v1/%s_otheracct' % self.accpre,
'/v1/%s_otheracct/container' % self.accpre,
'/v1/%s_otheracct/container/obj' % self.accpre):
for method in ('GET', 'HEAD', 'OPTIONS', 'PUT', 'POST', 'DELETE'):
# Admin ACL user can do anything
req = self._make_request(target, user_groups=mygroups,
@ -1015,9 +1170,11 @@ class TestAccountAcls(unittest.TestCase):
self.assertTrue(req.environ.get('swift_owner'))
def test_acl_syntax_verification(self):
test_auth = auth.filter_factory({'user_admin_user': 'testing'})(
test_auth = auth.filter_factory(
self._conf({'user_admin_user': 'testing .admin'}))(
FakeApp(iter(NO_CONTENT_RESP * 5)))
user_groups = test_auth._get_user_groups('admin', 'admin:user',
'AUTH_admin')
good_headers = {'X-Auth-Token': 'AUTH_t'}
good_acl = '{"read-only":["a","b"]}'
bad_acl = 'syntactically invalid acl -- this does not parse as JSON'
@ -1026,23 +1183,25 @@ class TestAccountAcls(unittest.TestCase):
not_dict_acl = '["read-only"]'
not_dict_acl2 = 1
empty_acls = ['{}', '', '{ }']
target = '/v1/AUTH_firstacct'
target = '/v1/%s_firstacct' % self.accpre
# no acls -- no problem!
req = self._make_request(target, headers=good_headers)
req = self._make_request(target, headers=good_headers,
user_groups=user_groups)
resp = req.get_response(test_auth)
self.assertEquals(resp.status_int, 204)
# syntactically valid acls should go through
update = {'x-account-access-control': good_acl}
req = self._make_request(target, headers=dict(good_headers, **update))
req = self._make_request(target, user_groups=user_groups,
headers=dict(good_headers, **update))
resp = req.get_response(test_auth)
self.assertEquals(resp.status_int, 204)
# syntactically valid empty acls should go through
for acl in empty_acls:
update = {'x-account-access-control': acl}
req = self._make_request(target,
req = self._make_request(target, user_groups=user_groups,
headers=dict(good_headers, **update))
resp = req.get_response(test_auth)
self.assertEquals(resp.status_int, 204)
@ -1125,6 +1284,299 @@ class TestAccountAcls(unittest.TestCase):
self.assertEquals(resp.status_int, 400)
class TestAuthMultiplePrefixes(TestAccountAcls):
"""
These tests repeat the same tests as TestAccountACLs,
but use multiple reseller prefix items (AUTH_ and SOMEOTHER_).
The target paths are /v1/SOMEOTHER_<blah>
"""
def setUp(self):
self.reseller_prefix = {'reseller_prefix': 'AUTH_, SOMEOTHER_'}
self.accpre = 'SOMEOTHER'
class PrefixAccount(unittest.TestCase):
def test_default(self):
conf = {}
test_auth = auth.filter_factory(conf)(FakeApp())
self.assertEquals(test_auth._get_account_prefix(
'AUTH_1234'), 'AUTH_')
self.assertEquals(test_auth._get_account_prefix(
'JUNK_1234'), None)
def test_same_as_default(self):
conf = {'reseller_prefix': 'AUTH'}
test_auth = auth.filter_factory(conf)(FakeApp())
self.assertEquals(test_auth._get_account_prefix(
'AUTH_1234'), 'AUTH_')
self.assertEquals(test_auth._get_account_prefix(
'JUNK_1234'), None)
def test_blank_reseller(self):
conf = {'reseller_prefix': ''}
test_auth = auth.filter_factory(conf)(FakeApp())
self.assertEquals(test_auth._get_account_prefix(
'1234'), '')
self.assertEquals(test_auth._get_account_prefix(
'JUNK_1234'), '') # yes, it should return ''
def test_multiple_resellers(self):
conf = {'reseller_prefix': 'AUTH, PRE2'}
test_auth = auth.filter_factory(conf)(FakeApp())
self.assertEquals(test_auth._get_account_prefix(
'AUTH_1234'), 'AUTH_')
self.assertEquals(test_auth._get_account_prefix(
'JUNK_1234'), None)
class ServiceTokenFunctionality(unittest.TestCase):
def _make_authed_request(self, conf, remote_user, path, method='GET'):
"""Make a request with tempauth as auth
Acts as though the user had presented a token
granting groups as described in remote_user.
If remote_user contains the .service group, it emulates presenting
X-Service-Token containing a .service group.
:param conf: configuration for tempauth
:param remote_user: the groups the user belongs to. Examples:
acct:joe,acct user joe, no .admin
acct:joe,acct,AUTH_joeacct user joe, jas .admin group
acct:joe,acct,AUTH_joeacct,.service adds .service group
:param path: the path of the request
:param method: the method (defaults to GET)
:returns: response object
"""
self.req = Request.blank(path)
self.req.method = method
self.req.remote_user = remote_user
fake_app = FakeApp(iter([('200 OK', {}, '')]))
test_auth = auth.filter_factory(conf)(fake_app)
resp = self.req.get_response(test_auth)
return resp
def test_authed_for_path_single(self):
resp = self._make_authed_request({}, 'acct:joe,acct,AUTH_acct',
'/v1/AUTH_acct')
self.assertEqual(resp.status_int, 200)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'}, 'acct:joe,acct,AUTH_acct',
'/v1/AUTH_acct/c', method='PUT')
self.assertEqual(resp.status_int, 200)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'},
'admin:mary,admin,AUTH_admin,.reseller_admin',
'/v1/AUTH_acct', method='GET')
self.assertEqual(resp.status_int, 200)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'},
'admin:mary,admin,AUTH_admin,.reseller_admin',
'/v1/AUTH_acct', method='DELETE')
self.assertEqual(resp.status_int, 200)
def test_denied_for_path_single(self):
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'},
'fredacc:fred,fredacct,AUTH_fredacc',
'/v1/AUTH_acct')
self.assertEqual(resp.status_int, 403)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'},
'acct:joe,acct',
'/v1/AUTH_acct',
method='PUT')
self.assertEqual(resp.status_int, 403)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH'},
'acct:joe,acct,AUTH_acct',
'/v1/AUTH_acct',
method='DELETE')
self.assertEqual(resp.status_int, 403)
def test_authed_for_primary_path_multiple(self):
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2'},
'acct:joe,acct,AUTH_acct,PRE2_acct',
'/v1/PRE2_acct')
self.assertEqual(resp.status_int, 200)
def test_denied_for_second_path_with_only_operator_role(self):
# User only presents a token in X-Auth-Token (or in X-Service-Token)
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'},
'acct:joe,acct,AUTH_acct,PRE2_acct',
'/v1/PRE2_acct')
self.assertEqual(resp.status_int, 403)
# User puts token in both X-Auth-Token and X-Service-Token
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'},
'acct:joe,acct,AUTH_acct,PRE2_acct,AUTH_acct,PRE2_acct',
'/v1/PRE2_acct')
self.assertEqual(resp.status_int, 403)
def test_authed_for_second_path_with_operator_role_and_service(self):
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'},
'acct:joe,acct,AUTH_acct,PRE2_acct,'
'admin:mary,admin,AUTH_admin,PRE2_admin,.service',
'/v1/PRE2_acct')
self.assertEqual(resp.status_int, 200)
def test_denied_for_second_path_with_only_service(self):
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'},
'admin:mary,admin,AUTH_admin,PRE2_admin,.service',
'/v1/PRE2_acct')
self.assertEqual(resp.status_int, 403)
def test_denied_for_second_path_for_service_user(self):
# User presents token with 'service' role in X-Auth-Token
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'},
'admin:mary,admin,AUTH_admin,PRE2_admin,.service',
'/v1/PRE2_acct')
self.assertEqual(resp.status_int, 403)
# User presents token with 'service' role in X-Auth-Token
# and also in X-Service-Token
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'},
'admin:mary,admin,AUTH_admin,PRE2_admin,.service,'
'admin:mary,admin,AUTH_admin,PRE2_admin,.service',
'/v1/PRE2_acct')
self.assertEqual(resp.status_int, 403)
def test_delete_denied_for_second_path(self):
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'},
'acct:joe,acct,AUTH_acct,PRE2_acct,'
'admin:mary,admin,AUTH_admin,PRE2_admin,.service',
'/v1/PRE2_acct',
method='DELETE')
self.assertEqual(resp.status_int, 403)
def test_delete_of_second_path_by_reseller_admin(self):
resp = self._make_authed_request(
{'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'},
'acct:joe,acct,AUTH_acct,PRE2_acct,'
'admin:mary,admin,AUTH_admin,PRE2_admin,.reseller_admin',
'/v1/PRE2_acct',
method='DELETE')
self.assertEqual(resp.status_int, 200)
class TestTokenHandling(unittest.TestCase):
def _make_request(self, conf, path, headers, method='GET'):
"""Make a request with tempauth as auth
It sets up AUTH_t and AUTH_s as tokens in memcache, where "joe"
has .admin role on /v1/AUTH_acct and user "glance" has .service
role on /v1/AUTH_admin.
:param conf: configuration for tempauth
:param path: the path of the request
:param headers: allows you to pass X-Auth-Token, etc.
:param method: the method (defaults to GET)
:returns: response object
"""
fake_app = FakeApp(iter([('200 OK', {}, '')]))
self.test_auth = auth.filter_factory(conf)(fake_app)
self.req = Request.blank(path, headers=headers)
self.req.method = method
self.req.environ['swift.cache'] = FakeMemcache()
self._setup_user_and_token('AUTH_t', 'acct', 'acct:joe',
'.admin')
self._setup_user_and_token('AUTH_s', 'admin', 'admin:glance',
'.service')
resp = self.req.get_response(self.test_auth)
return resp
def _setup_user_and_token(self, token_name, account, account_user,
groups):
"""Setup named token in memcache
:param token_name: name of token
:param account: example: acct
:param account_user: example: acct_joe
:param groups: example: .admin
"""
self.test_auth.users[account_user] = dict(groups=[groups])
account_id = 'AUTH_%s' % account
cache_key = 'AUTH_/token/%s' % token_name
cache_entry = (time() + 3600,
self.test_auth._get_user_groups(account,
account_user,
account_id))
self.req.environ['swift.cache'].set(cache_key, cache_entry)
def test_tokens_set_remote_user(self):
conf = {} # Default conf
resp = self._make_request(conf, '/v1/AUTH_acct',
{'x-auth-token': 'AUTH_t'})
self.assertEqual(self.req.environ['REMOTE_USER'],
'acct,acct:joe,AUTH_acct')
self.assertEqual(resp.status_int, 200)
# Add x-service-token
resp = self._make_request(conf, '/v1/AUTH_acct',
{'x-auth-token': 'AUTH_t',
'x-service-token': 'AUTH_s'})
self.assertEqual(self.req.environ['REMOTE_USER'],
'acct,acct:joe,AUTH_acct,admin,admin:glance,.service')
self.assertEqual(resp.status_int, 200)
# Put x-auth-token value into x-service-token
resp = self._make_request(conf, '/v1/AUTH_acct',
{'x-auth-token': 'AUTH_t',
'x-service-token': 'AUTH_t'})
self.assertEqual(self.req.environ['REMOTE_USER'],
'acct,acct:joe,AUTH_acct,acct,acct:joe,AUTH_acct')
self.assertEqual(resp.status_int, 200)
def test_service_token_given_and_needed(self):
conf = {'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'}
resp = self._make_request(conf, '/v1/PRE2_acct',
{'x-auth-token': 'AUTH_t',
'x-service-token': 'AUTH_s'})
self.assertEqual(resp.status_int, 200)
def test_service_token_omitted(self):
conf = {'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'}
resp = self._make_request(conf, '/v1/PRE2_acct',
{'x-auth-token': 'AUTH_t'})
self.assertEqual(resp.status_int, 403)
def test_invalid_tokens(self):
conf = {'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'}
resp = self._make_request(conf, '/v1/PRE2_acct',
{'x-auth-token': 'AUTH_junk'})
self.assertEqual(resp.status_int, 401)
resp = self._make_request(conf, '/v1/PRE2_acct',
{'x-auth-token': 'AUTH_t',
'x-service-token': 'AUTH_junk'})
self.assertEqual(resp.status_int, 403)
resp = self._make_request(conf, '/v1/PRE2_acct',
{'x-auth-token': 'AUTH_junk',
'x-service-token': 'AUTH_s'})
self.assertEqual(resp.status_int, 401)
class TestUtilityMethods(unittest.TestCase):
def test_account_acls_bad_path_raises_exception(self):
auth_inst = auth.filter_factory({})(FakeApp())

View File

@ -5,7 +5,7 @@
# Copyright (c) 2013 Alex Gaynor
# Copyright (c) 2013 Chuck Thier
# Copyright (c) 2013 David Goetz
# Copyright (c) 2013 Donagh McCabe
# Copyright (c) 2015 Donagh McCabe
# Copyright (c) 2013 Greg Lange
# Copyright (c) 2013 John Dickinson
# Copyright (c) 2013 Kun Huang
@ -66,8 +66,7 @@ class TestTempURL(unittest.TestCase):
def setUp(self):
self.app = FakeApp()
self.auth = tempauth.filter_factory({})(self.app)
self.auth.reseller_prefix = 'a'
self.auth = tempauth.filter_factory({'reseller_prefix': ''})(self.app)
self.tempurl = tempurl.filter_factory({})(self.auth)
def _make_request(self, path, environ=None, keys=(), **kwargs):

View File

@ -2756,6 +2756,186 @@ cluster_dfw1 = http://dfw1.host/v1/
self.assertEqual(0, len(logger.get_lines_for_level('error')))
class ResellerConfReader(unittest.TestCase):
def setUp(self):
self.default_rules = {'operator_roles': ['admin', 'swiftoperator'],
'service_roles': [],
'require_group': ''}
def test_defaults(self):
conf = {}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, ['AUTH_'])
self.assertEqual(options['AUTH_'], self.default_rules)
def test_same_as_default(self):
conf = {'reseller_prefix': 'AUTH',
'operator_roles': 'admin, swiftoperator'}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, ['AUTH_'])
self.assertEqual(options['AUTH_'], self.default_rules)
def test_single_blank_reseller(self):
conf = {'reseller_prefix': ''}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, [''])
self.assertEqual(options[''], self.default_rules)
def test_single_blank_reseller_with_conf(self):
conf = {'reseller_prefix': '',
"''operator_roles": 'role1, role2'}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, [''])
self.assertEqual(options[''].get('operator_roles'),
['role1', 'role2'])
self.assertEqual(options[''].get('service_roles'),
self.default_rules.get('service_roles'))
self.assertEqual(options[''].get('require_group'),
self.default_rules.get('require_group'))
def test_multiple_same_resellers(self):
conf = {'reseller_prefix': " '' , '' "}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, [''])
conf = {'reseller_prefix': '_, _'}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, ['_'])
conf = {'reseller_prefix': 'AUTH, PRE2, AUTH, PRE2'}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, ['AUTH_', 'PRE2_'])
def test_several_resellers_with_conf(self):
conf = {'reseller_prefix': 'PRE1, PRE2',
'PRE1_operator_roles': 'role1, role2',
'PRE1_service_roles': 'role3, role4',
'PRE2_operator_roles': 'role5',
'PRE2_service_roles': 'role6',
'PRE2_require_group': 'pre2_group'}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, ['PRE1_', 'PRE2_'])
self.assertEquals(set(['role1', 'role2']),
set(options['PRE1_'].get('operator_roles')))
self.assertEquals(['role5'],
options['PRE2_'].get('operator_roles'))
self.assertEquals(set(['role3', 'role4']),
set(options['PRE1_'].get('service_roles')))
self.assertEquals(['role6'], options['PRE2_'].get('service_roles'))
self.assertEquals('', options['PRE1_'].get('require_group'))
self.assertEquals('pre2_group', options['PRE2_'].get('require_group'))
def test_several_resellers_first_blank(self):
conf = {'reseller_prefix': " '' , PRE2",
"''operator_roles": 'role1, role2',
"''service_roles": 'role3, role4',
'PRE2_operator_roles': 'role5',
'PRE2_service_roles': 'role6',
'PRE2_require_group': 'pre2_group'}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, ['', 'PRE2_'])
self.assertEquals(set(['role1', 'role2']),
set(options[''].get('operator_roles')))
self.assertEquals(['role5'],
options['PRE2_'].get('operator_roles'))
self.assertEquals(set(['role3', 'role4']),
set(options[''].get('service_roles')))
self.assertEquals(['role6'], options['PRE2_'].get('service_roles'))
self.assertEquals('', options[''].get('require_group'))
self.assertEquals('pre2_group', options['PRE2_'].get('require_group'))
def test_several_resellers_with_blank_comma(self):
conf = {'reseller_prefix': "AUTH , '', PRE2",
"''operator_roles": 'role1, role2',
"''service_roles": 'role3, role4',
'PRE2_operator_roles': 'role5',
'PRE2_service_roles': 'role6',
'PRE2_require_group': 'pre2_group'}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, ['AUTH_', '', 'PRE2_'])
self.assertEquals(set(['admin', 'swiftoperator']),
set(options['AUTH_'].get('operator_roles')))
self.assertEquals(set(['role1', 'role2']),
set(options[''].get('operator_roles')))
self.assertEquals(['role5'],
options['PRE2_'].get('operator_roles'))
self.assertEquals([],
options['AUTH_'].get('service_roles'))
self.assertEquals(set(['role3', 'role4']),
set(options[''].get('service_roles')))
self.assertEquals(['role6'], options['PRE2_'].get('service_roles'))
self.assertEquals('', options['AUTH_'].get('require_group'))
self.assertEquals('', options[''].get('require_group'))
self.assertEquals('pre2_group', options['PRE2_'].get('require_group'))
def test_stray_comma(self):
conf = {'reseller_prefix': "AUTH ,, PRE2",
"''operator_roles": 'role1, role2',
"''service_roles": 'role3, role4',
'PRE2_operator_roles': 'role5',
'PRE2_service_roles': 'role6',
'PRE2_require_group': 'pre2_group'}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, ['AUTH_', 'PRE2_'])
self.assertEquals(set(['admin', 'swiftoperator']),
set(options['AUTH_'].get('operator_roles')))
self.assertEquals(['role5'],
options['PRE2_'].get('operator_roles'))
self.assertEquals([],
options['AUTH_'].get('service_roles'))
self.assertEquals(['role6'], options['PRE2_'].get('service_roles'))
self.assertEquals('', options['AUTH_'].get('require_group'))
self.assertEquals('pre2_group', options['PRE2_'].get('require_group'))
def test_multiple_stray_commas_resellers(self):
conf = {'reseller_prefix': ' , , ,'}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, [''])
self.assertEqual(options[''], self.default_rules)
def test_unprefixed_options(self):
conf = {'reseller_prefix': "AUTH , '', PRE2",
"operator_roles": 'role1, role2',
"service_roles": 'role3, role4',
'require_group': 'auth_blank_group',
'PRE2_operator_roles': 'role5',
'PRE2_service_roles': 'role6',
'PRE2_require_group': 'pre2_group'}
prefixes, options = utils.config_read_reseller_options(
conf, self.default_rules)
self.assertEqual(prefixes, ['AUTH_', '', 'PRE2_'])
self.assertEquals(set(['role1', 'role2']),
set(options['AUTH_'].get('operator_roles')))
self.assertEquals(set(['role1', 'role2']),
set(options[''].get('operator_roles')))
self.assertEquals(['role5'],
options['PRE2_'].get('operator_roles'))
self.assertEquals(set(['role3', 'role4']),
set(options['AUTH_'].get('service_roles')))
self.assertEquals(set(['role3', 'role4']),
set(options[''].get('service_roles')))
self.assertEquals(['role6'], options['PRE2_'].get('service_roles'))
self.assertEquals('auth_blank_group',
options['AUTH_'].get('require_group'))
self.assertEquals('auth_blank_group', options[''].get('require_group'))
self.assertEquals('pre2_group', options['PRE2_'].get('require_group'))
class TestSwiftInfo(unittest.TestCase):
def tearDown(self):