Merge "Fixed backups GET and DELETE to restrict access to the owner of backup"

This commit is contained in:
Jenkins 2013-06-28 14:46:16 +00:00 committed by Gerrit Code Review
commit eafc62f61b
9 changed files with 48 additions and 18 deletions

View File

@ -99,7 +99,7 @@ class Backup(object):
return query.first() return query.first()
@classmethod @classmethod
def get_by_id(cls, backup_id, deleted=False): def get_by_id(cls, context, backup_id, deleted=False):
""" """
get the backup for that id get the backup for that id
:param cls: :param cls:
@ -108,7 +108,9 @@ class Backup(object):
:return: :return:
""" """
try: try:
db_info = DBBackup.find_by(id=backup_id, deleted=deleted) db_info = DBBackup.find_by(context=context,
id=backup_id,
deleted=deleted)
return db_info return db_info
except exception.NotFound: except exception.NotFound:
raise exception.NotFound(uuid=backup_id) raise exception.NotFound(uuid=backup_id)
@ -148,7 +150,7 @@ class Backup(object):
""" """
def _delete_resources(): def _delete_resources():
backup = cls.get_by_id(backup_id) backup = cls.get_by_id(context, backup_id)
if backup.is_running: if backup.is_running:
msg = ("Backup %s cannot be delete because it is running." % msg = ("Backup %s cannot be delete because it is running." %
backup_id) backup_id)

View File

@ -45,7 +45,8 @@ class BackupController(wsgi.Controller):
"""Return a single backup.""" """Return a single backup."""
LOG.info(_("Showing a backup for tenant '%s'") % tenant_id) LOG.info(_("Showing a backup for tenant '%s'") % tenant_id)
LOG.info(_("id : '%s'\n\n") % id) LOG.info(_("id : '%s'\n\n") % id)
backup = Backup.get_by_id(id) context = req.environ[wsgi.CONTEXT_KEY]
backup = Backup.get_by_id(context, id)
return wsgi.Result(views.BackupView(backup).data(), 200) return wsgi.Result(views.BackupView(backup).data(), 200)
def create(self, req, body, tenant_id): def create(self, req, body, tenant_id):

View File

@ -89,11 +89,20 @@ class DatabaseModelBase(models.ModelBase):
self[k] = v self[k] = v
@classmethod @classmethod
def find_by(cls, **conditions): def find_by(cls, context=None, **conditions):
model = cls.get_by(**conditions) model = cls.get_by(**conditions)
if model is None: if model is None:
raise exception.ModelNotFoundError(_("%s Not Found") % raise exception.ModelNotFoundError(_("%s Not Found") %
cls.__name__) cls.__name__)
if ((context and not context.is_admin and hasattr(model, 'tenant_id')
and model.tenant_id != context.tenant)):
LOG.error("Tenant %s tried to access %s, owned by %s."
% (context.tenant, cls.__name__, model.tenant_id))
raise exception.ModelNotFoundError(_("%s Not Found") %
cls.__name__)
return model return model
@classmethod @classmethod

View File

@ -247,13 +247,9 @@ def get_db_info(context, id):
elif id is None: elif id is None:
raise TypeError("Argument id not defined.") raise TypeError("Argument id not defined.")
try: try:
db_info = DBInstance.find_by(id=id, deleted=False) db_info = DBInstance.find_by(context=context, id=id, deleted=False)
except exception.NotFound: except exception.NotFound:
raise exception.NotFound(uuid=id) raise exception.NotFound(uuid=id)
if not context.is_admin and db_info.tenant_id != context.tenant:
LOG.error("Tenant %s tried to access instance %s, owned by %s."
% (context.tenant, id, db_info.tenant_id))
raise exception.NotFound(uuid=id)
return db_info return db_info
@ -438,7 +434,7 @@ class Instance(BuiltInstance):
security_groups = None security_groups = None
if backup_id is not None: if backup_id is not None:
backup_info = Backup.get_by_id(backup_id) backup_info = Backup.get_by_id(context, backup_id)
if backup_info.is_running: if backup_info.is_running:
raise exception.BackupNotCompleteError(backup_id=backup_id) raise exception.BackupNotCompleteError(backup_id=backup_id)

View File

@ -550,7 +550,7 @@ class BackupTasks(object):
@classmethod @classmethod
def delete_backup(cls, context, backup_id): def delete_backup(cls, context, backup_id):
#delete backup from swift #delete backup from swift
backup = trove.backup.models.Backup.get_by_id(backup_id) backup = trove.backup.models.Backup.get_by_id(context, backup_id)
try: try:
filename = backup.filename filename = backup.filename
if filename: if filename:

View File

@ -15,17 +15,18 @@
from proboscis.asserts import assert_equal from proboscis.asserts import assert_equal
from proboscis.asserts import assert_not_equal from proboscis.asserts import assert_not_equal
from proboscis.asserts import assert_raises from proboscis.asserts import assert_raises
from proboscis.asserts import fail
from proboscis import test from proboscis import test
from proboscis import SkipTest from proboscis import SkipTest
from proboscis.decorators import time_out from proboscis.decorators import time_out
from trove.tests.util import poll_until from trove.tests.util import poll_until
from trove.tests.util import test_config from trove.tests.util import test_config
from trove.tests.util import create_dbaas_client
from trove.tests.util.users import Requirements
from trove.tests.config import CONFIG
from troveclient import exceptions from troveclient import exceptions
from trove.tests.api.instances import WaitForGuestInstallationToFinish from trove.tests.api.instances import WaitForGuestInstallationToFinish
from trove.tests.api.instances import instance_info from trove.tests.api.instances import instance_info
from trove.tests.api.instances import assert_unprocessable from trove.tests.api.instances import assert_unprocessable
import time
GROUP = "dbaas.api.backups" GROUP = "dbaas.api.backups"
BACKUP_NAME = 'backup_test' BACKUP_NAME = 'backup_test'
@ -145,6 +146,16 @@ class ListBackups(object):
assert_equal(instance_info.id, backup.instance_id) assert_equal(instance_info.id, backup.instance_id)
assert_equal('COMPLETED', backup.status) assert_equal('COMPLETED', backup.status)
# Test to make sure that user in other tenant is not able
# to GET this backup
reqs = Requirements(is_admin=False)
other_user = CONFIG.users.find_user(
reqs,
black_list=[instance_info.user.auth_user])
other_client = create_dbaas_client(other_user)
assert_raises(exceptions.NotFound, other_client.backups.get,
backup_info.id)
@test(runs_after=[ListBackups], @test(runs_after=[ListBackups],
groups=[GROUP]) groups=[GROUP])
@ -230,6 +241,17 @@ class DeleteBackups(object):
@time_out(60 * 2) @time_out(60 * 2)
def test_backup_delete(self): def test_backup_delete(self):
"""test delete""" """test delete"""
# Test to make sure that user in other tenant is not able
# to DELETE this backup
reqs = Requirements(is_admin=False)
other_user = CONFIG.users.find_user(
reqs,
black_list=[instance_info.user.auth_user])
other_client = create_dbaas_client(other_user)
assert_raises(exceptions.NotFound, other_client.backups.delete,
backup_info.id)
instance_info.dbaas.backups.delete(backup_info.id) instance_info.dbaas.backups.delete(backup_info.id)
assert_equal(202, instance_info.dbaas.last_http_code) assert_equal(202, instance_info.dbaas.last_http_code)

View File

@ -273,7 +273,7 @@ class FakeGuest(object):
def create_backup(self, backup_id): def create_backup(self, backup_id):
from trove.backup.models import Backup, BackupState from trove.backup.models import Backup, BackupState
backup = Backup.get_by_id(backup_id) backup = Backup.get_by_id(context=None, backup_id=backup_id)
def finish_create_backup(): def finish_create_backup():
backup.state = BackupState.COMPLETED backup.state = BackupState.COMPLETED

View File

@ -116,14 +116,14 @@ class BackupDeleteTest(testtools.TestCase):
def test_delete_backup_is_running(self): def test_delete_backup_is_running(self):
backup = mock() backup = mock()
backup.is_running = True backup.is_running = True
when(models.Backup).get_by_id(any()).thenReturn(backup) when(models.Backup).get_by_id(any(), any()).thenReturn(backup)
self.assertRaises(exception.UnprocessableEntity, self.assertRaises(exception.UnprocessableEntity,
models.Backup.delete, self.context, 'backup_id') models.Backup.delete, self.context, 'backup_id')
def test_delete_backup_swift_token_invalid(self): def test_delete_backup_swift_token_invalid(self):
backup = mock() backup = mock()
backup.is_running = False backup.is_running = False
when(models.Backup).get_by_id(any()).thenReturn(backup) when(models.Backup).get_by_id(any(), any()).thenReturn(backup)
when(models.Backup).verify_swift_auth_token(any()).thenRaise( when(models.Backup).verify_swift_auth_token(any()).thenRaise(
exception.SwiftAuthError) exception.SwiftAuthError)
self.assertRaises(exception.SwiftAuthError, models.Backup.delete, self.assertRaises(exception.SwiftAuthError, models.Backup.delete,

View File

@ -38,7 +38,7 @@ class BackupTasksTest(testtools.TestCase):
{'name': 'third'}]) {'name': 'third'}])
when(backup_models.Backup).delete(any()).thenReturn(None) when(backup_models.Backup).delete(any()).thenReturn(None)
when(backup_models.Backup).get_by_id( when(backup_models.Backup).get_by_id(
self.backup.id).thenReturn(self.backup) any(), self.backup.id).thenReturn(self.backup)
when(self.backup).delete(any()).thenReturn(None) when(self.backup).delete(any()).thenReturn(None)
self.swift_client = mock() self.swift_client = mock()
when(remote).create_swift_client( when(remote).create_swift_client(