Add support for a "sudo" like mechanism in the API.

Adding an X-Moniker-Tenant-ID request header to API calls will allow admins to `sudo` to another tenant.

Fixes bug #1083597

Change-Id: I95f07cc7f017b0ecf9989bf0c4bae8962c039523
This commit is contained in:
Kiall Mac Innes 2013-01-14 14:42:31 +00:00
parent 49e9691d47
commit 86202458ca
12 changed files with 149 additions and 27 deletions

View File

@ -0,0 +1,20 @@
General
=======
Administrative Access
---------------------
Administrative users can "sudo" into another tenant by providing an additional HTTP header: 'X-Moniker-Tenant-ID'
.. http:get:: /url
Example HTTP Request using the X-Moniker-Tenant-ID header
**Example request**:
.. sourcecode:: http
GET /domains/09494b72b65b42979efb187f65a0553e HTTP/1.1
Host: example.com
Accept: application/json
X-Moniker-Tenant-ID: 12345

View File

@ -4,14 +4,14 @@ use = egg:Paste#urlmap
[composite:osapi_dns_api_v1] [composite:osapi_dns_api_v1]
use = call:moniker.api.auth:pipeline_factory use = call:moniker.api.auth:pipeline_factory
noauth = noauth osapi_dns_app_v1 noauth = noauthcontext osapi_dns_app_v1
keystone = authtoken keystonecontext osapi_dns_app_v1 keystone = authtoken keystonecontext osapi_dns_app_v1
[app:osapi_dns_app_v1] [app:osapi_dns_app_v1]
paste.app_factory = moniker.api.v1:factory paste.app_factory = moniker.api.v1:factory
[filter:noauth] [filter:noauthcontext]
paste.filter_factory = moniker.api.auth:NoAuthMiddleware.factory paste.filter_factory = moniker.api.auth:NoAuthContextMiddleware.factory
[filter:keystonecontext] [filter:keystonecontext]
paste.filter_factory = moniker.api.auth:KeystoneContextMiddleware.factory paste.filter_factory = moniker.api.auth:KeystoneContextMiddleware.factory

View File

@ -1,6 +1,7 @@
{ {
"admin": "role:admin or is_admin:True", "admin": "role:admin or is_admin:True",
"admin_or_owner": "rule:admin or tenant_id:%(tenant_id)s", "owner": "tenant_id:%(tenant_id)s or tenant_id:%(effective_tenant_id)s",
"admin_or_owner": "rule:admin or rule:owner",
"default": "rule:admin_or_owner", "default": "rule:admin_or_owner",
@ -22,5 +23,6 @@
"update_record": "rule:admin_or_owner", "update_record": "rule:admin_or_owner",
"delete_record": "rule:admin_or_owner", "delete_record": "rule:admin_or_owner",
"use_sudo": "rule:admin",
"use_reserved_domain_suffix": "rule:admin" "use_reserved_domain_suffix": "rule:admin"
} }

View File

@ -47,10 +47,17 @@ class KeystoneContextMiddleware(wsgi.Middleware):
user=headers.get('X-User-ID'), user=headers.get('X-User-ID'),
tenant=headers.get('X-Tenant-ID'), tenant=headers.get('X-Tenant-ID'),
roles=roles) roles=roles)
# Attempt to sudo, if requested.
sudo_tenant_id = headers.get('X-Moniker-Tenant-ID', None)
if sudo_tenant_id:
context.sudo(sudo_tenant_id)
request.environ['context'] = context request.environ['context'] = context
class NoAuthMiddleware(wsgi.Middleware): class NoAuthContextMiddleware(wsgi.Middleware):
def process_request(self, request): def process_request(self, request):
# NOTE(kiall): This makes the assumption that disabling authentication # NOTE(kiall): This makes the assumption that disabling authentication
# means you wish to allow full access to everyone. # means you wish to allow full access to everyone.

View File

@ -19,6 +19,7 @@ from moniker.openstack.common import wsgi
from moniker.openstack.common import cfg from moniker.openstack.common import cfg
from moniker import exceptions from moniker import exceptions
from moniker import utils from moniker import utils
from moniker import policy
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -36,6 +37,8 @@ class Service(wsgi.Service):
LOG.info('Using api-paste-config found at: %s' % config_paths[0]) LOG.info('Using api-paste-config found at: %s' % config_paths[0])
policy.init_policy()
application = deploy.loadapp("config:%s" % config_paths[0], application = deploy.loadapp("config:%s" % config_paths[0],
name='osapi_dns') name='osapi_dns')

View File

@ -44,7 +44,6 @@ def create_domain():
try: try:
domain_schema.validate(values) domain_schema.validate(values)
values['tenant_id'] = context.tenant_id
domain = central_api.create_domain(context, values) domain = central_api.create_domain(context, values)
except exceptions.Forbidden: except exceptions.Forbidden:
return flask.Response(status=401) return flask.Response(status=401)

View File

@ -195,8 +195,7 @@ class Service(rpc_service.Service):
# Domain Methods # Domain Methods
def create_domain(self, context, values): def create_domain(self, context, values):
if 'tenant_id' not in values: values['tenant_id'] = context.effective_tenant_id
values['tenant_id'] = None
target = {'tenant_id': values['tenant_id']} target = {'tenant_id': values['tenant_id']}
policy.check('create_domain', context, target) policy.check('create_domain', context, target)
@ -212,13 +211,13 @@ class Service(rpc_service.Service):
return domain return domain
def get_domains(self, context, criterion=None): def get_domains(self, context, criterion=None):
policy.check('get_domains', context, {'tenant_id': context.tenant_id}) target = {'tenant_id': context.effective_tenant_id}
policy.check('get_domains', context, target)
if criterion is None: if criterion is None:
criterion = {} criterion = {}
if context.tenant_id is not None: criterion['tenant_id'] = context.effective_tenant_id
criterion['tenant_id'] = context.tenant_id
return self.storage_conn.get_domains(context, criterion) return self.storage_conn.get_domains(context, criterion)
@ -236,6 +235,10 @@ class Service(rpc_service.Service):
target = {'domain_id': domain_id, 'tenant_id': domain['tenant_id']} target = {'domain_id': domain_id, 'tenant_id': domain['tenant_id']}
policy.check('update_domain', context, target) policy.check('update_domain', context, target)
if 'tenant_id' in values:
target = {'domain_id': domain_id, 'tenant_id': values['tenant_id']}
policy.check('create_domain', context, target)
if 'name' in values: if 'name' in values:
# Ensure the domain does not end with a reserved suffix. # Ensure the domain does not end with a reserved suffix.
self._check_reserved_domain_suffixes(context, values['name']) self._check_reserved_domain_suffixes(context, values['name'])

View File

@ -15,6 +15,10 @@
# under the License. # under the License.
import itertools import itertools
from moniker.openstack.common import context from moniker.openstack.common import context
from moniker.openstack.common import log as logging
from moniker import policy
LOG = logging.getLogger(__name__)
class MonikerContext(context.RequestContext): class MonikerContext(context.RequestContext):
@ -32,14 +36,31 @@ class MonikerContext(context.RequestContext):
self.user_id = user self.user_id = user
self.tenant_id = tenant self.tenant_id = tenant
self.effective_tenant_id = self.tenant_id
self.roles = roles self.roles = roles
def sudo(self, tenant_id):
# We use exc=None here since the context is built early in the request
# lifecycle, outside of our ordinary error handling.
# For now, we silently ignore failed sudo requests.
allowed_sudo = policy.check('use_sudo', self, {'tenant_id': tenant_id},
exc=None)
if allowed_sudo:
LOG.warn('Accepted sudo from user_id %s for tenant_id %s'
% (self.user_id, tenant_id))
self.effective_tenant_id = tenant_id
else:
LOG.warn('Rejected sudo from user_id %s for tenant_id %s'
% (self.user_id, tenant_id))
def to_dict(self): def to_dict(self):
d = super(MonikerContext, self).to_dict() d = super(MonikerContext, self).to_dict()
d.update({ d.update({
'user_id': self.user_id, 'user_id': self.user_id,
'tenant_id': self.tenant_id, 'tenant_id': self.tenant_id,
'effective_tenant_id': self.effective_tenant_id,
'roles': self.roles, 'roles': self.roles,
}) })
@ -47,7 +68,7 @@ class MonikerContext(context.RequestContext):
@classmethod @classmethod
def get_admin_context(cls): def get_admin_context(cls):
return cls(None, tenant=None, is_admin=True) return cls(None, tenant=None, is_admin=True, roles=['admin'])
@classmethod @classmethod
def get_context_from_function_and_args(cls, function, args, kwargs): def get_context_from_function_and_args(cls, function, args, kwargs):

View File

@ -4,14 +4,14 @@ use = egg:Paste#urlmap
[composite:osapi_dns_api_v1] [composite:osapi_dns_api_v1]
use = call:moniker.api.auth:pipeline_factory use = call:moniker.api.auth:pipeline_factory
noauth = noauth osapi_dns_app_v1 noauth = noauthcontext osapi_dns_app_v1
keystone = authtoken keystonecontext osapi_dns_app_v1 keystone = authtoken keystonecontext osapi_dns_app_v1
[app:osapi_dns_app_v1] [app:osapi_dns_app_v1]
paste.app_factory = moniker.api.v1:factory paste.app_factory = moniker.api.v1:factory
[filter:noauth] [filter:noauthcontext]
paste.filter_factory = moniker.api.auth:NoAuthMiddleware.factory paste.filter_factory = moniker.api.auth:NoAuthContextMiddleware.factory
[filter:keystonecontext] [filter:keystonecontext]
paste.filter_factory = moniker.api.auth:KeystoneContextMiddleware.factory paste.filter_factory = moniker.api.auth:KeystoneContextMiddleware.factory

View File

@ -17,14 +17,15 @@ from moniker.tests.test_api import ApiTestCase
from moniker.api import auth from moniker.api import auth
class FakeRequest(object):
headers = {}
environ = {}
class KeystoneContextMiddlewareTest(ApiTestCase): class KeystoneContextMiddlewareTest(ApiTestCase):
__test__ = True __test__ = True
def test_process_request(self): def test_process_request(self):
class FakeRequest(object):
headers = {}
environ = {}
app = {} app = {}
middleware = auth.KeystoneContextMiddleware(app) middleware = auth.KeystoneContextMiddleware(app)
@ -50,17 +51,44 @@ class KeystoneContextMiddlewareTest(ApiTestCase):
self.assertEqual('TenantID', context.tenant_id) self.assertEqual('TenantID', context.tenant_id)
self.assertEqual(['admin', 'Member'], context.roles) self.assertEqual(['admin', 'Member'], context.roles)
def test_process_request_sudo(self):
# Set the policy to accept the authz
self.policy({'use_sudo': '@'})
class NoAuthMiddlewareTest(ApiTestCase): app = {}
middleware = auth.KeystoneContextMiddleware(app)
request = FakeRequest()
request.headers = {
'X-Auth-Token': 'AuthToken',
'X-User-ID': 'UserID',
'X-Tenant-ID': 'TenantID',
'X-Roles': 'admin,Member',
'X-Moniker-Tenant-ID': 'SudoTenantID'
}
# Process the request
middleware.process_request(request)
self.assertIn('context', request.environ)
context = request.environ['context']
self.assertFalse(context.is_admin)
self.assertEqual('AuthToken', context.auth_tok)
self.assertEqual('UserID', context.user_id)
self.assertEqual('TenantID', context.tenant_id)
self.assertEqual('SudoTenantID', context.effective_tenant_id)
self.assertEqual(['admin', 'Member'], context.roles)
class NoAuthContextMiddlewareTest(ApiTestCase):
__test__ = True __test__ = True
def test_process_request(self): def test_process_request(self):
class FakeRequest(object):
headers = {}
environ = {}
app = {} app = {}
middleware = auth.NoAuthMiddleware(app) middleware = auth.NoAuthContextMiddleware(app)
request = FakeRequest() request = FakeRequest()

View File

@ -15,7 +15,7 @@
# under the License. # under the License.
from moniker.openstack.common import log as logging from moniker.openstack.common import log as logging
from moniker.api.v1 import factory from moniker.api.v1 import factory
from moniker.api.auth import NoAuthMiddleware from moniker.api.auth import NoAuthContextMiddleware
from moniker.tests.test_api import ApiTestCase from moniker.tests.test_api import ApiTestCase
@ -32,7 +32,7 @@ class ApiV1Test(ApiTestCase):
self.app = factory({}) self.app = factory({})
# Inject the NoAuth middleware # Inject the NoAuth middleware
self.app.wsgi_app = NoAuthMiddleware(self.app.wsgi_app) self.app.wsgi_app = NoAuthContextMiddleware(self.app.wsgi_app)
# Obtain a test client # Obtain a test client
self.client = self.app.test_client() self.client = self.app.test_client()

View File

@ -0,0 +1,39 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from moniker.tests import TestCase
from moniker import context
class TestMonikerContext(TestCase):
def test_sudo(self):
# Set the policy to accept the authz
self.policy({'use_sudo': '@'})
ctxt = context.MonikerContext(tenant='original')
ctxt.sudo('effective')
self.assertEqual('original', ctxt.tenant_id)
self.assertEqual('effective', ctxt.effective_tenant_id)
def test_sudo_fail(self):
# Set the policy to deny the authz
self.policy({'use_sudo': '!'})
ctxt = context.MonikerContext(tenant='original')
ctxt.sudo('effective')
self.assertEqual('original', ctxt.tenant_id)
self.assertEqual('original', ctxt.effective_tenant_id)