Merge "Refactor gateway auth layer for metadef APIs"

This commit is contained in:
Zuul
2021-08-10 17:48:40 +00:00
committed by Gerrit Code Review
2 changed files with 320 additions and 94 deletions

View File

@@ -161,107 +161,189 @@ class Gateway(object):
image_factory,
admin_repo=admin_repo)
def get_metadef_namespace_factory(self, context):
ns_factory = glance.domain.MetadefNamespaceFactory()
policy_ns_factory = policy.MetadefNamespaceFactoryProxy(
ns_factory, context, self.policy)
notifier_ns_factory = glance.notifier.MetadefNamespaceFactoryProxy(
policy_ns_factory, context, self.notifier)
authorized_ns_factory = authorization.MetadefNamespaceFactoryProxy(
notifier_ns_factory, context)
return authorized_ns_factory
def get_metadef_namespace_factory(self, context,
authorization_layer=True):
factory = glance.domain.MetadefNamespaceFactory()
if authorization_layer:
factory = policy.MetadefNamespaceFactoryProxy(
factory, context, self.policy)
factory = glance.notifier.MetadefNamespaceFactoryProxy(
factory, context, self.notifier)
if authorization_layer:
factory = authorization.MetadefNamespaceFactoryProxy(
factory, context)
return factory
def get_metadef_namespace_repo(self, context):
ns_repo = glance.db.MetadefNamespaceRepo(context, self.db_api)
policy_ns_repo = policy.MetadefNamespaceRepoProxy(
ns_repo, context, self.policy)
notifier_ns_repo = glance.notifier.MetadefNamespaceRepoProxy(
policy_ns_repo, context, self.notifier)
authorized_ns_repo = authorization.MetadefNamespaceRepoProxy(
notifier_ns_repo, context)
return authorized_ns_repo
def get_metadef_namespace_repo(self, context, authorization_layer=True):
"""Get the layered NamespaceRepo model.
def get_metadef_object_factory(self, context):
object_factory = glance.domain.MetadefObjectFactory()
policy_object_factory = policy.MetadefObjectFactoryProxy(
object_factory, context, self.policy)
notifier_object_factory = glance.notifier.MetadefObjectFactoryProxy(
policy_object_factory, context, self.notifier)
authorized_object_factory = authorization.MetadefObjectFactoryProxy(
notifier_object_factory, context)
return authorized_object_factory
This is where we construct the "the onion" by layering
NamespaceRepo models on top of each other, starting with the DB at
the bottom.
def get_metadef_object_repo(self, context):
object_repo = glance.db.MetadefObjectRepo(context, self.db_api)
policy_object_repo = policy.MetadefObjectRepoProxy(
object_repo, context, self.policy)
notifier_object_repo = glance.notifier.MetadefObjectRepoProxy(
policy_object_repo, context, self.notifier)
authorized_object_repo = authorization.MetadefObjectRepoProxy(
notifier_object_repo, context)
return authorized_object_repo
:param context: The RequestContext
:param authorization_layer: Controls whether or not we add the legacy
glance.authorization and glance.policy
layers.
:returns: An NamespaceRepo-like object
"""
repo = glance.db.MetadefNamespaceRepo(context, self.db_api)
if authorization_layer:
repo = policy.MetadefNamespaceRepoProxy(
repo, context, self.policy)
repo = glance.notifier.MetadefNamespaceRepoProxy(
repo, context, self.notifier)
if authorization_layer:
repo = authorization.MetadefNamespaceRepoProxy(
repo, context)
return repo
def get_metadef_resource_type_factory(self, context):
resource_type_factory = glance.domain.MetadefResourceTypeFactory()
policy_resource_type_factory = policy.MetadefResourceTypeFactoryProxy(
resource_type_factory, context, self.policy)
notifier_resource_type_factory = (
glance.notifier.MetadefResourceTypeFactoryProxy(
policy_resource_type_factory, context, self.notifier)
)
authorized_resource_type_factory = (
authorization.MetadefResourceTypeFactoryProxy(
notifier_resource_type_factory, context)
)
return authorized_resource_type_factory
def get_metadef_object_factory(self, context,
authorization_layer=True):
factory = glance.domain.MetadefObjectFactory()
if authorization_layer:
factory = policy.MetadefObjectFactoryProxy(
factory, context, self.policy)
factory = glance.notifier.MetadefObjectFactoryProxy(
factory, context, self.notifier)
if authorization_layer:
factory = authorization.MetadefObjectFactoryProxy(
factory, context)
return factory
def get_metadef_resource_type_repo(self, context):
resource_type_repo = glance.db.MetadefResourceTypeRepo(
def get_metadef_object_repo(self, context, authorization_layer=True):
"""Get the layered MetadefObjectRepo model.
This is where we construct the "the onion" by layering
MetadefObjectRepo models on top of each other, starting with the DB at
the bottom.
:param context: The RequestContext
:param authorization_layer: Controls whether or not we add the legacy
glance.authorization and glance.policy
layers.
:returns: An MetadefObjectRepo-like object
"""
repo = glance.db.MetadefObjectRepo(context, self.db_api)
if authorization_layer:
repo = policy.MetadefObjectRepoProxy(
repo, context, self.policy)
repo = glance.notifier.MetadefObjectRepoProxy(
repo, context, self.notifier)
if authorization_layer:
repo = authorization.MetadefObjectRepoProxy(
repo, context)
return repo
def get_metadef_resource_type_factory(self, context,
authorization_layer=True):
factory = glance.domain.MetadefResourceTypeFactory()
if authorization_layer:
factory = policy.MetadefResourceTypeFactoryProxy(
factory, context, self.policy)
factory = glance.notifier.MetadefResourceTypeFactoryProxy(
factory, context, self.notifier)
if authorization_layer:
factory = authorization.MetadefResourceTypeFactoryProxy(
factory, context)
return factory
def get_metadef_resource_type_repo(self, context,
authorization_layer=True):
"""Get the layered MetadefResourceTypeRepo model.
This is where we construct the "the onion" by layering
MetadefResourceTypeRepo models on top of each other, starting with
the DB at the bottom.
:param context: The RequestContext
:param authorization_layer: Controls whether or not we add the legacy
glance.authorization and glance.policy
layers.
:returns: An MetadefResourceTypeRepo-like object
"""
repo = glance.db.MetadefResourceTypeRepo(
context, self.db_api)
policy_object_repo = policy.MetadefResourceTypeRepoProxy(
resource_type_repo, context, self.policy)
notifier_object_repo = glance.notifier.MetadefResourceTypeRepoProxy(
policy_object_repo, context, self.notifier)
authorized_object_repo = authorization.MetadefResourceTypeRepoProxy(
notifier_object_repo, context)
return authorized_object_repo
if authorization_layer:
repo = policy.MetadefResourceTypeRepoProxy(
repo, context, self.policy)
repo = glance.notifier.MetadefResourceTypeRepoProxy(
repo, context, self.notifier)
if authorization_layer:
repo = authorization.MetadefResourceTypeRepoProxy(
repo, context)
return repo
def get_metadef_property_factory(self, context):
prop_factory = glance.domain.MetadefPropertyFactory()
policy_prop_factory = policy.MetadefPropertyFactoryProxy(
prop_factory, context, self.policy)
notifier_prop_factory = glance.notifier.MetadefPropertyFactoryProxy(
policy_prop_factory, context, self.notifier)
authorized_prop_factory = authorization.MetadefPropertyFactoryProxy(
notifier_prop_factory, context)
return authorized_prop_factory
def get_metadef_property_factory(self, context,
authorization_layer=True):
factory = glance.domain.MetadefPropertyFactory()
if authorization_layer:
factory = policy.MetadefPropertyFactoryProxy(
factory, context, self.policy)
factory = glance.notifier.MetadefPropertyFactoryProxy(
factory, context, self.notifier)
if authorization_layer:
factory = authorization.MetadefPropertyFactoryProxy(
factory, context)
return factory
def get_metadef_property_repo(self, context):
prop_repo = glance.db.MetadefPropertyRepo(context, self.db_api)
policy_prop_repo = policy.MetadefPropertyRepoProxy(
prop_repo, context, self.policy)
notifier_prop_repo = glance.notifier.MetadefPropertyRepoProxy(
policy_prop_repo, context, self.notifier)
authorized_prop_repo = authorization.MetadefPropertyRepoProxy(
notifier_prop_repo, context)
return authorized_prop_repo
def get_metadef_property_repo(self, context, authorization_layer=True):
"""Get the layered MetadefPropertyRepo model.
def get_metadef_tag_factory(self, context):
tag_factory = glance.domain.MetadefTagFactory()
policy_tag_factory = policy.MetadefTagFactoryProxy(
tag_factory, context, self.policy)
notifier_tag_factory = glance.notifier.MetadefTagFactoryProxy(
policy_tag_factory, context, self.notifier)
authorized_tag_factory = authorization.MetadefTagFactoryProxy(
notifier_tag_factory, context)
return authorized_tag_factory
This is where we construct the "the onion" by layering
MetadefPropertyRepo models on top of each other, starting with
the DB at the bottom.
def get_metadef_tag_repo(self, context):
tag_repo = glance.db.MetadefTagRepo(context, self.db_api)
policy_tag_repo = policy.MetadefTagRepoProxy(
tag_repo, context, self.policy)
notifier_tag_repo = glance.notifier.MetadefTagRepoProxy(
policy_tag_repo, context, self.notifier)
authorized_tag_repo = authorization.MetadefTagRepoProxy(
notifier_tag_repo, context)
return authorized_tag_repo
:param context: The RequestContext
:param authorization_layer: Controls whether or not we add the legacy
glance.authorization and glance.policy
layers.
:returns: An MetadefPropertyRepo-like object
"""
repo = glance.db.MetadefPropertyRepo(context, self.db_api)
if authorization_layer:
repo = policy.MetadefPropertyRepoProxy(
repo, context, self.policy)
repo = glance.notifier.MetadefPropertyRepoProxy(
repo, context, self.notifier)
if authorization_layer:
repo = authorization.MetadefPropertyRepoProxy(
repo, context)
return repo
def get_metadef_tag_factory(self, context,
authorization_layer=True):
factory = glance.domain.MetadefTagFactory()
if authorization_layer:
factory = policy.MetadefTagFactoryProxy(
factory, context, self.policy)
factory = glance.notifier.MetadefTagFactoryProxy(
factory, context, self.notifier)
if authorization_layer:
factory = authorization.MetadefTagFactoryProxy(
factory, context)
return factory
def get_metadef_tag_repo(self, context, authorization_layer=True):
"""Get the layered MetadefTagRepo model.
This is where we construct the "the onion" by layering
MetadefTagRepo models on top of each other, starting with
the DB at the bottom.
:param context: The RequestContext
:param authorization_layer: Controls whether or not we add the legacy
glance.authorization and glance.policy
layers.
:returns: An MetadefTagRepo-like object
"""
repo = glance.db.MetadefTagRepo(context, self.db_api)
if authorization_layer:
repo = policy.MetadefTagRepoProxy(
repo, context, self.policy)
repo = glance.notifier.MetadefTagRepoProxy(
repo, context, self.notifier)
if authorization_layer:
repo = authorization.MetadefTagRepoProxy(
repo, context)
return repo

View File

@@ -113,3 +113,147 @@ class TestGateway(test_utils.BaseTestCase):
image = repo.get(unit_test_utils.UUID1)
# We are a member, so member is our tenant id
self.assertEqual(unit_test_utils.TENANT2, image.member)
@mock.patch('glance.api.policy.MetadefNamespaceRepoProxy')
def test_get_namespace_repo(self, mock_proxy):
repo = self.gateway.get_metadef_namespace_repo(self.context)
self.assertIsInstance(repo, authorization.MetadefNamespaceRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefNamespaceFactoryProxy')
def test_get_namespace_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_namespace_repo(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefNamespaceRepoProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefNamespaceFactoryProxy')
def test_get_namespace_factory(self, mock_proxy):
repo = self.gateway.get_metadef_namespace_factory(self.context)
self.assertIsInstance(repo,
authorization.MetadefNamespaceFactoryProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefNamespaceFactoryProxy')
def test_get_namespace_factory_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_namespace_factory(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefNamespaceFactoryProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefObjectRepoProxy')
def test_get_object_repo(self, mock_proxy):
repo = self.gateway.get_metadef_object_repo(self.context)
self.assertIsInstance(repo, authorization.MetadefObjectRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefObjectRepoProxy')
def test_get_object_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_object_repo(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefObjectRepoProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefObjectFactoryProxy')
def test_get_object_factory(self, mock_proxy):
repo = self.gateway.get_metadef_object_factory(self.context)
self.assertIsInstance(repo,
authorization.MetadefObjectFactoryProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefObjectFactoryProxy')
def test_get_object_factory_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_object_factory(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefObjectFactoryProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefResourceTypeRepoProxy')
def test_get_resourcetype_repo(self, mock_proxy):
repo = self.gateway.get_metadef_resource_type_repo(self.context)
self.assertIsInstance(repo, authorization.MetadefResourceTypeRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefResourceTypeRepoProxy')
def test_get_resourcetype_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_resource_type_repo(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefResourceTypeRepoProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefResourceTypeFactoryProxy')
def test_get_resource_type_factory(self, mock_proxy):
repo = self.gateway.get_metadef_resource_type_factory(self.context)
self.assertIsInstance(repo,
authorization.MetadefResourceTypeFactoryProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefResourceTypeFactoryProxy')
def test_get_resource_type_factory_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_resource_type_factory(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefResourceTypeFactoryProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefPropertyRepoProxy')
def test_get_property_repo(self, mock_proxy):
repo = self.gateway.get_metadef_property_repo(self.context)
self.assertIsInstance(repo,
authorization.MetadefPropertyRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefPropertyRepoProxy')
def test_get_property_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_property_repo(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefPropertyRepoProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefPropertyFactoryProxy')
def test_get_property_factory(self, mock_proxy):
repo = self.gateway.get_metadef_property_factory(self.context)
self.assertIsInstance(repo, authorization.MetadefPropertyFactoryProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefPropertyFactoryProxy')
def test_get_property_factory_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_property_factory(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefPropertyFactoryProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefTagRepoProxy')
def test_get_tag_repo(self, mock_proxy):
repo = self.gateway.get_metadef_tag_repo(self.context)
self.assertIsInstance(repo, authorization.MetadefTagRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefTagRepoProxy')
def test_get_tag_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_tag_repo(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefTagRepoProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefTagFactoryProxy')
def test_get_tag_factory(self, mock_proxy):
repo = self.gateway.get_metadef_tag_factory(self.context)
self.assertIsInstance(repo, authorization.MetadefTagFactoryProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefTagFactoryProxy')
def test_get_tag_factory_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_tag_factory(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefTagFactoryProxy)
mock_proxy.assert_not_called()