Merge "Allow global admins to operate sd resources from other projects"

This commit is contained in:
Zuul 2019-01-24 08:19:17 +00:00 committed by Gerrit Code Review
commit f63e726ad5
2 changed files with 39 additions and 11 deletions

View File

@ -1192,7 +1192,7 @@ def software_deployment_create(context, values):
def software_deployment_get(context, deployment_id): def software_deployment_get(context, deployment_id):
result = context.session.query( result = context.session.query(
models.SoftwareDeployment).get(deployment_id) models.SoftwareDeployment).get(deployment_id)
if (result is not None and context is not None and if (result is not None and context is not None and not context.is_admin and
context.tenant_id not in (result.tenant, context.tenant_id not in (result.tenant,
result.stack_user_project_id)): result.stack_user_project_id)):
result = None result = None
@ -1205,14 +1205,14 @@ def software_deployment_get(context, deployment_id):
def software_deployment_get_all(context, server_id=None): def software_deployment_get_all(context, server_id=None):
sd = models.SoftwareDeployment sd = models.SoftwareDeployment
query = context.session.query( query = context.session.query(sd).order_by(sd.created_at)
sd if not context.is_admin:
).filter(sqlalchemy.or_( query = query.filter(sqlalchemy.or_(
sd.tenant == context.tenant_id, sd.tenant == context.tenant_id,
sd.stack_user_project_id == context.tenant_id) sd.stack_user_project_id == context.tenant_id))
).order_by(sd.created_at)
if server_id: if server_id:
query = query.filter_by(server_id=server_id) query = query.filter_by(server_id=server_id)
return query.all() return query.all()

View File

@ -1191,6 +1191,12 @@ class SqlAlchemyTest(common.HeatTestCase):
self.assertIsNotNone(deployment) self.assertIsNotNone(deployment)
self.assertEqual(values['tenant'], deployment.tenant) self.assertEqual(values['tenant'], deployment.tenant)
# admin can get the deployments
admin_ctx = utils.dummy_context(is_admin=True,
tenant_id='admin_tenant')
deployment = db_api.software_deployment_get(admin_ctx, deployment_id)
self.assertIsNotNone(deployment)
def test_software_deployment_get_all(self): def test_software_deployment_get_all(self):
self.assertEqual([], db_api.software_deployment_get_all(self.ctx)) self.assertEqual([], db_api.software_deployment_get_all(self.ctx))
values = self._deployment_values() values = self._deployment_values()
@ -1206,6 +1212,11 @@ class SqlAlchemyTest(common.HeatTestCase):
deployments = db_api.software_deployment_get_all( deployments = db_api.software_deployment_get_all(
self.ctx, server_id=str(uuid.uuid4())) self.ctx, server_id=str(uuid.uuid4()))
self.assertEqual([], deployments) self.assertEqual([], deployments)
# admin can get the deployments of other tenants
admin_ctx = utils.dummy_context(is_admin=True,
tenant_id='admin_tenant')
deployments = db_api.software_deployment_get_all(admin_ctx)
self.assertEqual(1, len(deployments))
def test_software_deployment_update(self): def test_software_deployment_update(self):
deployment_id = str(uuid.uuid4()) deployment_id = str(uuid.uuid4())
@ -1221,8 +1232,15 @@ class SqlAlchemyTest(common.HeatTestCase):
self.ctx, deployment_id, values) self.ctx, deployment_id, values)
self.assertIsNotNone(deployment) self.assertIsNotNone(deployment)
self.assertEqual(values['status'], deployment.status) self.assertEqual(values['status'], deployment.status)
admin_ctx = utils.dummy_context(is_admin=True,
tenant_id='admin_tenant')
values = {'status': 'FAILED'}
deployment = db_api.software_deployment_update(
admin_ctx, deployment_id, values)
self.assertIsNotNone(deployment)
self.assertEqual(values['status'], deployment.status)
def test_software_deployment_delete(self): def _test_software_deployment_delete(self, test_ctx=None):
deployment_id = str(uuid.uuid4()) deployment_id = str(uuid.uuid4())
err = self.assertRaises(exception.NotFound, err = self.assertRaises(exception.NotFound,
db_api.software_deployment_delete, db_api.software_deployment_delete,
@ -1231,18 +1249,28 @@ class SqlAlchemyTest(common.HeatTestCase):
values = self._deployment_values() values = self._deployment_values()
deployment = db_api.software_deployment_create(self.ctx, values) deployment = db_api.software_deployment_create(self.ctx, values)
deployment_id = deployment.id deployment_id = deployment.id
deployment = db_api.software_deployment_get(self.ctx, deployment_id) test_ctx = test_ctx or self.ctx
deployment = db_api.software_deployment_get(test_ctx, deployment_id)
self.assertIsNotNone(deployment) self.assertIsNotNone(deployment)
db_api.software_deployment_delete(self.ctx, deployment_id) db_api.software_deployment_delete(test_ctx, deployment_id)
err = self.assertRaises( err = self.assertRaises(
exception.NotFound, exception.NotFound,
db_api.software_deployment_get, db_api.software_deployment_get,
self.ctx, test_ctx,
deployment_id) deployment_id)
self.assertIn(deployment_id, six.text_type(err)) self.assertIn(deployment_id, six.text_type(err))
def test_software_deployment_delete(self):
self._test_software_deployment_delete()
def test_software_deployment_delete_by_admin(self):
admin_ctx = utils.dummy_context(is_admin=True,
tenant_id='admin_tenant')
self._test_software_deployment_delete(test_ctx=admin_ctx)
def test_snapshot_create(self): def test_snapshot_create(self):
template = create_raw_template(self.ctx) template = create_raw_template(self.ctx)
user_creds = create_user_creds(self.ctx) user_creds = create_user_creds(self.ctx)