OpenStack Orchestration (Heat)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

3939 lines
164 KiB

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
import fixtures
import json
import logging
import time
import uuid
import mock
from oslo_config import cfg
from oslo_db import exception as db_exception
from oslo_utils import timeutils
import six
from sqlalchemy.orm import exc
from sqlalchemy.orm import session
from heat.common import context
from heat.common import exception
from heat.common import template_format
from heat.db.sqlalchemy import api as db_api
from heat.db.sqlalchemy import models
from heat.engine.clients.os import glance
from heat.engine.clients.os import nova
from heat.engine import environment
from heat.engine import resource as rsrc
from heat.engine import stack as parser
from heat.engine import template as tmpl
from heat.engine import template_files
from heat.tests import common
from heat.tests.openstack.nova import fakes as fakes_nova
from heat.tests import utils
wp_template = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "WordPress",
"Parameters" : {
"KeyName" : {
"Description" : "KeyName",
"Type" : "String",
"Default" : "test"
}
},
"Resources" : {
"WebServer": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId" : "F17-x86_64-gold",
"InstanceType" : "m1.large",
"KeyName" : "test",
"UserData" : "wordpress"
}
}
}
}
'''
UUIDs = (UUID1, UUID2, UUID3) = sorted([str(uuid.uuid4())
for x in range(3)])
class SqlAlchemyTest(common.HeatTestCase):
def setUp(self):
super(SqlAlchemyTest, self).setUp()
self.fc = fakes_nova.FakeClient()
self.ctx = utils.dummy_context()
def _mock_get_image_id_success(self, imageId_input, imageId):
self.patchobject(glance.GlanceClientPlugin,
'find_image_by_name_or_id',
return_value=imageId)
def _setup_test_stack(self, stack_name, stack_id=None, owner_id=None,
stack_user_project_id=None, backup=False):
t = template_format.parse(wp_template)
template = tmpl.Template(
t, env=environment.Environment({'KeyName': 'test'}))
stack_id = stack_id or str(uuid.uuid4())
stack = parser.Stack(self.ctx, stack_name, template,
owner_id=owner_id,
stack_user_project_id=stack_user_project_id)
with utils.UUIDStub(stack_id):
stack.store(backup=backup)
return (template, stack)
def _mock_create(self):
self.patchobject(nova.NovaClientPlugin, 'client',
return_value=self.fc)
self._mock_get_image_id_success('F17-x86_64-gold', 744)
self.fc.servers.create = mock.Mock(
return_value=self.fc.servers.list()[4])
return self.fc
def _mock_delete(self):
self.patchobject(self.fc.servers, 'delete',
side_effect=fakes_nova.fake_exception())
@mock.patch.object(db_api, '_paginate_query')
def test_filter_and_page_query_paginates_query(self, mock_paginate_query):
query = mock.Mock()
db_api._filter_and_page_query(self.ctx, query)
self.assertTrue(mock_paginate_query.called)
@mock.patch.object(db_api, '_events_paginate_query')
def test_events_filter_and_page_query(self, mock_events_paginate_query):
query = mock.Mock()
db_api._events_filter_and_page_query(self.ctx, query)
self.assertTrue(mock_events_paginate_query.called)
@mock.patch.object(db_api.utils, 'paginate_query')
def test_events_filter_invalid_sort_key(self, mock_paginate_query):
query = mock.Mock()
class InvalidSortKey(db_api.utils.InvalidSortKey):
@property
def message(_):
self.fail("_events_paginate_query() should not have tried to "
"access .message attribute - it's deprecated in "
"oslo.db and removed from base Exception in Py3K.")
mock_paginate_query.side_effect = InvalidSortKey()
self.assertRaises(exception.Invalid,
db_api._events_filter_and_page_query,
self.ctx, query, sort_keys=['foo'])
@mock.patch.object(db_api.db_filters, 'exact_filter')
def test_filter_and_page_query_handles_no_filters(self, mock_db_filter):
query = mock.Mock()
db_api._filter_and_page_query(self.ctx, query)
mock_db_filter.assert_called_once_with(mock.ANY, mock.ANY, {})
@mock.patch.object(db_api.db_filters, 'exact_filter')
def test_events_filter_and_page_query_handles_no_filters(self,
mock_db_filter):
query = mock.Mock()
db_api._events_filter_and_page_query(self.ctx, query)
mock_db_filter.assert_called_once_with(mock.ANY, mock.ANY, {})
@mock.patch.object(db_api.db_filters, 'exact_filter')
def test_filter_and_page_query_applies_filters(self, mock_db_filter):
query = mock.Mock()
filters = {'foo': 'bar'}
db_api._filter_and_page_query(self.ctx, query, filters=filters)
self.assertTrue(mock_db_filter.called)
@mock.patch.object(db_api.db_filters, 'exact_filter')
def test_events_filter_and_page_query_applies_filters(self,
mock_db_filter):
query = mock.Mock()
filters = {'foo': 'bar'}
db_api._events_filter_and_page_query(self.ctx, query, filters=filters)
self.assertTrue(mock_db_filter.called)
@mock.patch.object(db_api, '_paginate_query')
def test_filter_and_page_query_whitelists_sort_keys(self,
mock_paginate_query):
query = mock.Mock()
sort_keys = ['stack_name', 'foo']
db_api._filter_and_page_query(self.ctx, query, sort_keys=sort_keys)
args, _ = mock_paginate_query.call_args
self.assertIn(['name'], args)
@mock.patch.object(db_api, '_events_paginate_query')
def test_events_filter_and_page_query_whitelists_sort_keys(
self, mock_paginate_query):
query = mock.Mock()
sort_keys = ['event_time', 'foo']
db_api._events_filter_and_page_query(self.ctx, query,
sort_keys=sort_keys)
args, _ = mock_paginate_query.call_args
self.assertIn(['created_at'], args)
@mock.patch.object(db_api.utils, 'paginate_query')
def test_paginate_query_default_sorts_by_created_at_and_id(
self, mock_paginate_query):
query = mock.Mock()
model = mock.Mock()
db_api._paginate_query(self.ctx, query, model, sort_keys=None)
args, _ = mock_paginate_query.call_args
self.assertIn(['created_at', 'id'], args)
@mock.patch.object(db_api.utils, 'paginate_query')
def test_paginate_query_default_sorts_dir_by_desc(self,
mock_paginate_query):
query = mock.Mock()
model = mock.Mock()
db_api._paginate_query(self.ctx, query, model, sort_dir=None)
args, _ = mock_paginate_query.call_args
self.assertIn('desc', args)
@mock.patch.object(db_api.utils, 'paginate_query')
def test_paginate_query_uses_given_sort_plus_id(self,
mock_paginate_query):
query = mock.Mock()
model = mock.Mock()
db_api._paginate_query(self.ctx, query, model, sort_keys=['name'])
args, _ = mock_paginate_query.call_args
self.assertIn(['name', 'id'], args)
@mock.patch.object(db_api.utils, 'paginate_query')
def test_paginate_query_gets_model_marker(self, mock_paginate_query):
query = mock.Mock()
model = mock.Mock()
marker = mock.Mock()
mock_query_object = mock.Mock()
mock_query_object.get.return_value = 'real_marker'
ctx = mock.MagicMock()
ctx.session.query.return_value = mock_query_object
db_api._paginate_query(ctx, query, model, marker=marker)
mock_query_object.get.assert_called_once_with(marker)
args, _ = mock_paginate_query.call_args
self.assertIn('real_marker', args)
@mock.patch.object(db_api.utils, 'paginate_query')
def test_paginate_query_raises_invalid_sort_key(self, mock_paginate_query):
query = mock.Mock()
model = mock.Mock()
class InvalidSortKey(db_api.utils.InvalidSortKey):
@property
def message(_):
self.fail("_paginate_query() should not have tried to access "
".message attribute - it's deprecated in oslo.db "
"and removed from base Exception class in Py3K.")
mock_paginate_query.side_effect = InvalidSortKey()
self.assertRaises(exception.Invalid, db_api._paginate_query,
self.ctx, query, model, sort_keys=['foo'])
def test_get_sort_keys_returns_empty_list_if_no_keys(self):
sort_keys = None
mapping = {}
filtered_keys = db_api._get_sort_keys(sort_keys, mapping)
self.assertEqual([], filtered_keys)
def test_get_sort_keys_whitelists_single_key(self):
sort_key = 'foo'
mapping = {'foo': 'Foo'}
filtered_keys = db_api._get_sort_keys(sort_key, mapping)
self.assertEqual(['Foo'], filtered_keys)
def test_get_sort_keys_whitelists_multiple_keys(self):
sort_keys = ['foo', 'bar', 'nope']
mapping = {'foo': 'Foo', 'bar': 'Bar'}
filtered_keys = db_api._get_sort_keys(sort_keys, mapping)
self.assertIn('Foo', filtered_keys)
self.assertIn('Bar', filtered_keys)
self.assertEqual(2, len(filtered_keys))
def test_encryption(self):
stack_name = 'test_encryption'
stack = self._setup_test_stack(stack_name)[1]
self._mock_create()
stack.create()
stack = parser.Stack.load(self.ctx, stack.id)
cs = stack['WebServer']
cs.data_set('my_secret', 'fake secret', True)
rs = db_api.resource_get_by_name_and_stack(self.ctx,
'WebServer',
stack.id)
encrypted_key = rs.data[0]['value']
self.assertNotEqual(encrypted_key, "fake secret")
# Test private_key property returns decrypted value
self.assertEqual("fake secret", db_api.resource_data_get(
self.ctx, cs.id, 'my_secret'))
# do this twice to verify that the orm does not commit the unencrypted
# value.
self.assertEqual("fake secret", db_api.resource_data_get(
self.ctx, cs.id, 'my_secret'))
self.fc.servers.create.assert_called_once_with(
image=744, flavor=3, key_name='test',
name=mock.ANY,
security_groups=None,
userdata=mock.ANY, scheduler_hints=None,
meta=None, nics=None,
availability_zone=None,
block_device_mapping=None
)
def test_resource_data_delete(self):
stack = self._setup_test_stack('stack', UUID1)[1]
self._mock_create()
stack.create()
stack = parser.Stack.load(self.ctx, stack.id)
resource = stack['WebServer']
resource.data_set('test', 'test_data')
self.assertEqual('test_data', db_api.resource_data_get(
self.ctx, resource.id, 'test'))
db_api.resource_data_delete(self.ctx, resource.id, 'test')
self.assertRaises(exception.NotFound,
db_api.resource_data_get, self.ctx,
resource.id, 'test')
self.fc.servers.create.assert_called_once_with(
image=744, flavor=3, key_name='test',
name=mock.ANY,
security_groups=None,
userdata=mock.ANY, scheduler_hints=None,
meta=None, nics=None,
availability_zone=None,
block_device_mapping=None
)
def test_stack_get_by_name(self):
stack = self._setup_test_stack('stack', UUID1,
stack_user_project_id=UUID2)[1]
st = db_api.stack_get_by_name(self.ctx, 'stack')
self.assertEqual(UUID1, st.id)
self.ctx.tenant = UUID3
st = db_api.stack_get_by_name(self.ctx, 'stack')
self.assertIsNone(st)
self.ctx.tenant = UUID2
st = db_api.stack_get_by_name(self.ctx, 'stack')
self.assertEqual(UUID1, st.id)
stack.delete()
st = db_api.stack_get_by_name(self.ctx, 'stack')
self.assertIsNone(st)
def test_nested_stack_get_by_name(self):
stack1 = self._setup_test_stack('stack1', UUID1)[1]
stack2 = self._setup_test_stack('stack2', UUID2,
owner_id=stack1.id)[1]
result = db_api.stack_get_by_name(self.ctx, 'stack2')
self.assertEqual(UUID2, result.id)
stack2.delete()
result = db_api.stack_get_by_name(self.ctx, 'stack2')
self.assertIsNone(result)
def test_stack_get_by_name_and_owner_id(self):
stack1 = self._setup_test_stack('stack1', UUID1,
stack_user_project_id=UUID3)[1]
stack2 = self._setup_test_stack('stack2', UUID2,
owner_id=stack1.id,
stack_user_project_id=UUID3)[1]
result = db_api.stack_get_by_name_and_owner_id(self.ctx, 'stack2',
None)
self.assertIsNone(result)
result = db_api.stack_get_by_name_and_owner_id(self.ctx, 'stack2',
stack1.id)
self.assertEqual(UUID2, result.id)
self.ctx.tenant = str(uuid.uuid4())
result = db_api.stack_get_by_name_and_owner_id(self.ctx, 'stack2',
None)
self.assertIsNone(result)
self.ctx.tenant = UUID3
result = db_api.stack_get_by_name_and_owner_id(self.ctx, 'stack2',
stack1.id)
self.assertEqual(UUID2, result.id)
stack2.delete()
result = db_api.stack_get_by_name_and_owner_id(self.ctx, 'stack2',
stack1.id)
self.assertIsNone(result)
def test_stack_get(self):
stack = self._setup_test_stack('stack', UUID1)[1]
st = db_api.stack_get(self.ctx, UUID1, show_deleted=False)
self.assertEqual(UUID1, st.id)
stack.delete()
st = db_api.stack_get(self.ctx, UUID1, show_deleted=False)
self.assertIsNone(st)
st = db_api.stack_get(self.ctx, UUID1, show_deleted=True)
self.assertEqual(UUID1, st.id)
def test_stack_get_status(self):
stack = self._setup_test_stack('stack', UUID1)[1]
st = db_api.stack_get_status(self.ctx, UUID1)
self.assertEqual(('CREATE', 'IN_PROGRESS', '', None), st)
stack.delete()
st = db_api.stack_get_status(self.ctx, UUID1)
self.assertEqual(
('DELETE', 'COMPLETE',
'Stack DELETE completed successfully', None),
st)
self.assertRaises(exception.NotFound,
db_api.stack_get_status, self.ctx, UUID2)
def test_stack_get_show_deleted_context(self):
stack = self._setup_test_stack('stack', UUID1)[1]
self.assertFalse(self.ctx.show_deleted)
st = db_api.stack_get(self.ctx, UUID1)
self.assertEqual(UUID1, st.id)
stack.delete()
st = db_api.stack_get(self.ctx, UUID1)
self.assertIsNone(st)
self.ctx.show_deleted = True
st = db_api.stack_get(self.ctx, UUID1)
self.assertEqual(UUID1, st.id)
def test_stack_get_all(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
st_db = db_api.stack_get_all(self.ctx)
self.assertEqual(3, len(st_db))
stacks[0].delete()
st_db = db_api.stack_get_all(self.ctx)
self.assertEqual(2, len(st_db))
stacks[1].delete()
st_db = db_api.stack_get_all(self.ctx)
self.assertEqual(1, len(st_db))
def test_stack_get_all_show_deleted(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
st_db = db_api.stack_get_all(self.ctx)
self.assertEqual(3, len(st_db))
stacks[0].delete()
st_db = db_api.stack_get_all(self.ctx)
self.assertEqual(2, len(st_db))
st_db = db_api.stack_get_all(self.ctx, show_deleted=True)
self.assertEqual(3, len(st_db))
def test_stack_get_all_show_nested(self):
stack1 = self._setup_test_stack('stack1', UUID1)[1]
stack2 = self._setup_test_stack('stack2', UUID2,
owner_id=stack1.id)[1]
# Backup stack should not be returned
stack3 = self._setup_test_stack('stack1*', UUID3,
owner_id=stack1.id,
backup=True)[1]
st_db = db_api.stack_get_all(self.ctx)
self.assertEqual(1, len(st_db))
self.assertEqual(stack1.id, st_db[0].id)
st_db = db_api.stack_get_all(self.ctx, show_nested=True)
self.assertEqual(2, len(st_db))
st_ids = [s.id for s in st_db]
self.assertNotIn(stack3.id, st_ids)
self.assertIn(stack1.id, st_ids)
self.assertIn(stack2.id, st_ids)
def test_stack_get_all_with_filters(self):
self._setup_test_stack('foo', UUID1)
self._setup_test_stack('bar', UUID2)
filters = {'name': 'foo'}
results = db_api.stack_get_all(self.ctx,
filters=filters)
self.assertEqual(1, len(results))
self.assertEqual('foo', results[0]['name'])
def test_stack_get_all_filter_matches_in_list(self):
self._setup_test_stack('foo', UUID1)
self._setup_test_stack('bar', UUID2)
filters = {'name': ['bar', 'quux']}
results = db_api.stack_get_all(self.ctx,
filters=filters)
self.assertEqual(1, len(results))
self.assertEqual('bar', results[0]['name'])
def test_stack_get_all_returns_all_if_no_filters(self):
self._setup_test_stack('foo', UUID1)
self._setup_test_stack('bar', UUID2)
filters = None
results = db_api.stack_get_all(self.ctx,
filters=filters)
self.assertEqual(2, len(results))
def test_stack_get_all_default_sort_keys_and_dir(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
st_db = db_api.stack_get_all(self.ctx)
self.assertEqual(3, len(st_db))
self.assertEqual(stacks[2].id, st_db[0].id)
self.assertEqual(stacks[1].id, st_db[1].id)
self.assertEqual(stacks[0].id, st_db[2].id)
def test_stack_get_all_default_sort_dir(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
st_db = db_api.stack_get_all(self.ctx, sort_dir='asc')
self.assertEqual(3, len(st_db))
self.assertEqual(stacks[0].id, st_db[0].id)
self.assertEqual(stacks[1].id, st_db[1].id)
self.assertEqual(stacks[2].id, st_db[2].id)
def test_stack_get_all_str_sort_keys(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
st_db = db_api.stack_get_all(self.ctx,
sort_keys='creation_time')
self.assertEqual(3, len(st_db))
self.assertEqual(stacks[0].id, st_db[0].id)
self.assertEqual(stacks[1].id, st_db[1].id)
self.assertEqual(stacks[2].id, st_db[2].id)
@mock.patch.object(db_api.utils, 'paginate_query')
def test_stack_get_all_filters_sort_keys(self, mock_paginate):
sort_keys = ['stack_name', 'stack_status', 'creation_time',
'updated_time', 'stack_owner']
db_api.stack_get_all(self.ctx, sort_keys=sort_keys)
args = mock_paginate.call_args[0]
used_sort_keys = set(args[3])
expected_keys = set(['name', 'status', 'created_at',
'updated_at', 'id'])
self.assertEqual(expected_keys, used_sort_keys)
def test_stack_get_all_marker(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
st_db = db_api.stack_get_all(self.ctx, marker=stacks[1].id)
self.assertEqual(1, len(st_db))
self.assertEqual(stacks[0].id, st_db[0].id)
def test_stack_get_all_non_existing_marker(self):
[self._setup_test_stack('stack', x)[1] for x in UUIDs]
uuid = 'this stack doesn\'t exist'
st_db = db_api.stack_get_all(self.ctx, marker=uuid)
self.assertEqual(3, len(st_db))
def test_stack_get_all_doesnt_mutate_sort_keys(self):
[self._setup_test_stack('stack', x)[1] for x in UUIDs]
sort_keys = ['id']
db_api.stack_get_all(self.ctx, sort_keys=sort_keys)
self.assertEqual(['id'], sort_keys)
def test_stack_get_all_hidden_tags(self):
cfg.CONF.set_override('hidden_stack_tags', ['hidden'])
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
stacks[0].tags = ['hidden']
stacks[0].store()
stacks[1].tags = ['random']
stacks[1].store()
st_db = db_api.stack_get_all(self.ctx, show_hidden=True)
self.assertEqual(3, len(st_db))
st_db_visible = db_api.stack_get_all(self.ctx, show_hidden=False)
self.assertEqual(2, len(st_db_visible))
# Make sure the hidden stack isn't in the stacks returned by
# stack_get_all_visible()
for stack in st_db_visible:
self.assertNotEqual(stacks[0].id, stack.id)
def test_stack_get_all_by_tags(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
stacks[0].tags = ['tag1']
stacks[0].store()
stacks[1].tags = ['tag1', 'tag2']
stacks[1].store()
stacks[2].tags = ['tag1', 'tag2', 'tag3']
stacks[2].store()
st_db = db_api.stack_get_all(self.ctx, tags=['tag2'])
self.assertEqual(2, len(st_db))
st_db = db_api.stack_get_all(self.ctx, tags=['tag1', 'tag2'])
self.assertEqual(2, len(st_db))
st_db = db_api.stack_get_all(self.ctx, tags=['tag1', 'tag2', 'tag3'])
self.assertEqual(1, len(st_db))
def test_stack_get_all_by_tags_any(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
stacks[0].tags = ['tag2']
stacks[0].store()
stacks[1].tags = ['tag1', 'tag2']
stacks[1].store()
stacks[2].tags = ['tag1', 'tag3']
stacks[2].store()
st_db = db_api.stack_get_all(self.ctx, tags_any=['tag1'])
self.assertEqual(2, len(st_db))
st_db = db_api.stack_get_all(self.ctx, tags_any=['tag1', 'tag2',
'tag3'])
self.assertEqual(3, len(st_db))
def test_stack_get_all_by_not_tags(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
stacks[0].tags = ['tag1']
stacks[0].store()
stacks[1].tags = ['tag1', 'tag2']
stacks[1].store()
stacks[2].tags = ['tag1', 'tag2', 'tag3']
stacks[2].store()
st_db = db_api.stack_get_all(self.ctx, not_tags=['tag2'])
self.assertEqual(1, len(st_db))
st_db = db_api.stack_get_all(self.ctx, not_tags=['tag1', 'tag2'])
self.assertEqual(1, len(st_db))
st_db = db_api.stack_get_all(self.ctx, not_tags=['tag1', 'tag2',
'tag3'])
self.assertEqual(2, len(st_db))
def test_stack_get_all_by_not_tags_any(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
stacks[0].tags = ['tag2']
stacks[0].store()
stacks[1].tags = ['tag1', 'tag2']
stacks[1].store()
stacks[2].tags = ['tag1', 'tag3']
stacks[2].store()
st_db = db_api.stack_get_all(self.ctx, not_tags_any=['tag1'])
self.assertEqual(1, len(st_db))
st_db = db_api.stack_get_all(self.ctx, not_tags_any=['tag1', 'tag2',
'tag3'])
self.assertEqual(0, len(st_db))
def test_stack_get_all_by_tag_with_pagination(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
stacks[0].tags = ['tag1']
stacks[0].store()
stacks[1].tags = ['tag2']
stacks[1].store()
stacks[2].tags = ['tag1']
stacks[2].store()
st_db = db_api.stack_get_all(self.ctx, tags=['tag1'])
self.assertEqual(2, len(st_db))
st_db = db_api.stack_get_all(self.ctx, tags=['tag1'], limit=1)
self.assertEqual(1, len(st_db))
self.assertEqual(stacks[2].id, st_db[0].id)
st_db = db_api.stack_get_all(self.ctx, tags=['tag1'], limit=1,
marker=stacks[2].id)
self.assertEqual(1, len(st_db))
self.assertEqual(stacks[0].id, st_db[0].id)
def test_stack_get_all_by_tag_with_show_hidden(self):
cfg.CONF.set_override('hidden_stack_tags', ['hidden'])
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
stacks[0].tags = ['tag1']
stacks[0].store()
stacks[1].tags = ['hidden', 'tag1']
stacks[1].store()
st_db = db_api.stack_get_all(self.ctx, tags=['tag1'],
show_hidden=True)
self.assertEqual(2, len(st_db))
st_db = db_api.stack_get_all(self.ctx, tags=['tag1'],
show_hidden=False)
self.assertEqual(1, len(st_db))
def test_stack_count_all(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
st_db = db_api.stack_count_all(self.ctx)
self.assertEqual(3, st_db)
stacks[0].delete()
st_db = db_api.stack_count_all(self.ctx)
self.assertEqual(2, st_db)
# show deleted
st_db = db_api.stack_count_all(self.ctx, show_deleted=True)
self.assertEqual(3, st_db)
stacks[1].delete()
st_db = db_api.stack_count_all(self.ctx)
self.assertEqual(1, st_db)
# show deleted
st_db = db_api.stack_count_all(self.ctx, show_deleted=True)
self.assertEqual(3, st_db)
def test_count_all_hidden_tags(self):
cfg.CONF.set_override('hidden_stack_tags', ['hidden'])
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
stacks[0].tags = ['hidden']
stacks[0].store()
stacks[1].tags = ['random']
stacks[1].store()
st_db = db_api.stack_count_all(self.ctx, show_hidden=True)
self.assertEqual(3, st_db)
st_db_visible = db_api.stack_count_all(self.ctx, show_hidden=False)
self.assertEqual(2, st_db_visible)
def test_count_all_by_tags(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
stacks[0].tags = ['tag1']
stacks[0].store()
stacks[1].tags = ['tag2']
stacks[1].store()
stacks[2].tags = ['tag2']
stacks[2].store()
st_db = db_api.stack_count_all(self.ctx, tags=['tag1'])
self.assertEqual(1, st_db)
st_db = db_api.stack_count_all(self.ctx, tags=['tag2'])
self.assertEqual(2, st_db)
def test_count_all_by_tag_with_show_hidden(self):
cfg.CONF.set_override('hidden_stack_tags', ['hidden'])
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
stacks[0].tags = ['tag1']
stacks[0].store()
stacks[1].tags = ['hidden', 'tag1']
stacks[1].store()
st_db = db_api.stack_count_all(self.ctx, tags=['tag1'],
show_hidden=True)
self.assertEqual(2, st_db)
st_db = db_api.stack_count_all(self.ctx, tags=['tag1'],
show_hidden=False)
self.assertEqual(1, st_db)
def test_stack_count_all_with_filters(self):
self._setup_test_stack('foo', UUID1)
self._setup_test_stack('bar', UUID2)
self._setup_test_stack('bar', UUID3)
filters = {'name': 'bar'}
st_db = db_api.stack_count_all(self.ctx, filters=filters)
self.assertEqual(2, st_db)
def test_stack_count_all_show_nested(self):
stack1 = self._setup_test_stack('stack1', UUID1)[1]
self._setup_test_stack('stack2', UUID2,
owner_id=stack1.id)
# Backup stack should not be counted
self._setup_test_stack('stack1*', UUID3,
owner_id=stack1.id,
backup=True)
st_db = db_api.stack_count_all(self.ctx)
self.assertEqual(1, st_db)
st_db = db_api.stack_count_all(self.ctx, show_nested=True)
self.assertEqual(2, st_db)
def test_event_get_all_by_stack(self):
stack = self._setup_test_stack('stack', UUID1)[1]
self._mock_create()
stack.create()
stack._persist_state()
events = db_api.event_get_all_by_stack(self.ctx, UUID1)
self.assertEqual(4, len(events))
# test filter by resource_status
filters = {'resource_status': 'COMPLETE'}
events = db_api.event_get_all_by_stack(self.ctx, UUID1,
filters=filters)
self.assertEqual(2, len(events))
self.assertEqual('COMPLETE', events[0].resource_status)
self.assertEqual('COMPLETE', events[1].resource_status)
# test filter by resource_action
filters = {'resource_action': 'CREATE'}
events = db_api.event_get_all_by_stack(self.ctx, UUID1,
filters=filters)
self.assertEqual(4, len(events))
self.assertEqual('CREATE', events[0].resource_action)
self.assertEqual('CREATE', events[1].resource_action)
self.assertEqual('CREATE', events[2].resource_action)
self.assertEqual('CREATE', events[3].resource_action)
# test filter by resource_type
filters = {'resource_type': 'AWS::EC2::Instance'}
events = db_api.event_get_all_by_stack(self.ctx, UUID1,
filters=filters)
self.assertEqual(2, len(events))
self.assertEqual('AWS::EC2::Instance', events[0].resource_type)
self.assertEqual('AWS::EC2::Instance', events[1].resource_type)
filters = {'resource_type': 'OS::Nova::Server'}
events = db_api.event_get_all_by_stack(self.ctx, UUID1,
filters=filters)
self.assertEqual(0, len(events))
# test limit and marker
events_all = db_api.event_get_all_by_stack(self.ctx, UUID1)
marker = events_all[0].uuid
expected = events_all[1].uuid
events = db_api.event_get_all_by_stack(self.ctx, UUID1,
limit=1, marker=marker)
self.assertEqual(1, len(events))
self.assertEqual(expected, events[0].uuid)
self._mock_delete()
stack.delete()
# test filter by resource_status
filters = {'resource_status': 'COMPLETE'}
events = db_api.event_get_all_by_stack(self.ctx, UUID1,
filters=filters)
self.assertEqual(4, len(events))
self.assertEqual('COMPLETE', events[0].resource_status)
self.assertEqual('COMPLETE', events[1].resource_status)
self.assertEqual('COMPLETE', events[2].resource_status)
self.assertEqual('COMPLETE', events[3].resource_status)
# test filter by resource_action
filters = {'resource_action': 'DELETE',
'resource_status': 'COMPLETE'}
events = db_api.event_get_all_by_stack(self.ctx, UUID1,
filters=filters)
self.assertEqual(2, len(events))
self.assertEqual('DELETE', events[0].resource_action)
self.assertEqual('COMPLETE', events[0].resource_status)
self.assertEqual('DELETE', events[1].resource_action)
self.assertEqual('COMPLETE', events[1].resource_status)
# test limit and marker
events_all = db_api.event_get_all_by_stack(self.ctx, UUID1)
self.assertEqual(8, len(events_all))
marker = events_all[1].uuid
events2_uuid = events_all[2].uuid
events3_uuid = events_all[3].uuid
events = db_api.event_get_all_by_stack(self.ctx, UUID1,
limit=1, marker=marker)
self.assertEqual(1, len(events))
self.assertEqual(events2_uuid, events[0].uuid)
events = db_api.event_get_all_by_stack(self.ctx, UUID1,
limit=2, marker=marker)
self.assertEqual(2, len(events))
self.assertEqual(events2_uuid, events[0].uuid)
self.assertEqual(events3_uuid, events[1].uuid)
self.fc.servers.create.assert_called_once_with(
image=744, flavor=3, key_name='test',
name=mock.ANY,
security_groups=None,
userdata=mock.ANY, scheduler_hints=None,
meta=None, nics=None,
availability_zone=None,
block_device_mapping=None
)
def test_event_count_all_by_stack(self):
stack = self._setup_test_stack('stack', UUID1)[1]
self._mock_create()
stack.create()
stack._persist_state()
num_events = db_api.event_count_all_by_stack(self.ctx, UUID1)
self.assertEqual(4, num_events)
self._mock_delete()
stack.delete()
num_events = db_api.event_count_all_by_stack(self.ctx, UUID1)
self.assertEqual(8, num_events)
self.fc.servers.create.assert_called_once_with(
image=744, flavor=3, key_name='test',
name=mock.ANY,
security_groups=None,
userdata=mock.ANY, scheduler_hints=None,
meta=None, nics=None,
availability_zone=None,
block_device_mapping=None
)
def test_event_get_all_by_tenant(self):
stacks = [self._setup_test_stack('stack', x)[1] for x in UUIDs]
self._mock_create()
[s.create() for s in stacks]
[s._persist_state() for s in stacks]
events = db_api.event_get_all_by_tenant(self.ctx)
self.assertEqual(12, len(events))
self._mock_delete()
[s.delete() for s in stacks]
events = db_api.event_get_all_by_tenant(self.ctx)
self.assertEqual(0, len(events))
self.fc.servers.create.assert_called_with(
image=744, flavor=3, key_name='test',
name=mock.ANY,
security_groups=None,
userdata=mock.ANY, scheduler_hints=None,
meta=None, nics=None,
availability_zone=None,
block_device_mapping=None
)
self.assertEqual(len(stacks), self.fc.servers.create.call_count)
def test_user_creds_password(self):
self.ctx.password = 'password'
self.ctx.trust_id = None
self.ctx.region_name = 'RegionOne'
db_creds = db_api.user_creds_create(self.ctx)
load_creds = db_api.user_creds_get(self.ctx, db_creds['id'])
self.assertEqual('test_username', load_creds.get('username'))
self.assertEqual('password', load_creds.get('password'))
self.assertEqual('test_tenant', load_creds.get('tenant'))
self.assertEqual('test_tenant_id', load_creds.get('tenant_id'))
self.assertEqual('RegionOne', load_creds.get('region_name'))
self.assertIsNotNone(load_creds.get('created_at'))
self.assertIsNone(load_creds.get('updated_at'))
self.assertEqual('http://server.test:5000/v2.0',
load_creds.get('auth_url'))
self.assertIsNone(load_creds.get('trust_id'))
self.assertIsNone(load_creds.get('trustor_user_id'))
def test_user_creds_password_too_long(self):
self.ctx.trust_id = None
self.ctx.password = 'O123456789O1234567' * 20
error = self.assertRaises(exception.Error,
db_api.user_creds_create,
self.ctx)
self.assertIn('Length of OS_PASSWORD after encryption exceeds '
'Heat limit (255 chars)', six.text_type(error))
def test_user_creds_trust(self):
self.ctx.username = None
self.ctx.password = None
self.ctx.trust_id = 'atrust123'
self.ctx.trustor_user_id = 'atrustor123'
self.ctx.tenant = 'atenant123'
self.ctx.project_name = 'atenant'
self.ctx.auth_url = 'anauthurl'
self.ctx.region_name = 'aregion'
db_creds = db_api.user_creds_create(self.ctx)
load_creds = db_api.user_creds_get(self.ctx, db_creds['id'])
self.assertIsNone(load_creds.get('username'))
self.assertIsNone(load_creds.get('password'))
self.assertIsNotNone(load_creds.get('created_at'))
self.assertIsNone(load_creds.get('updated_at'))
self.assertEqual('anauthurl', load_creds.get('auth_url'))
self.assertEqual('aregion', load_creds.get('region_name'))
self.assertEqual('atenant123', load_creds.get('tenant_id'))
self.assertEqual('atenant', load_creds.get('tenant'))
self.assertEqual('atrust123', load_creds.get('trust_id'))
self.assertEqual('atrustor123', load_creds.get('trustor_user_id'))
def test_user_creds_none(self):
self.ctx.username = None
self.ctx.password = None
self.ctx.trust_id = None
self.ctx.region_name = None
db_creds = db_api.user_creds_create(self.ctx)
load_creds = db_api.user_creds_get(self.ctx, db_creds['id'])
self.assertIsNone(load_creds.get('username'))
self.assertIsNone(load_creds.get('password'))
self.assertIsNone(load_creds.get('trust_id'))
self.assertIsNone(load_creds.get('region_name'))
def test_software_config_create(self):
tenant_id = self.ctx.tenant_id
config = db_api.software_config_create(
self.ctx, {'name': 'config_mysql',
'tenant': tenant_id})
self.assertIsNotNone(config)
self.assertEqual('config_mysql', config.name)
self.assertEqual(tenant_id, config.tenant)
def test_software_config_get(self):
self.assertRaises(
exception.NotFound,
db_api.software_config_get,
self.ctx,
str(uuid.uuid4()))
conf = ('#!/bin/bash\n'
'echo "$bar and $foo"\n')
config = {
'inputs': [{'name': 'foo'}, {'name': 'bar'}],
'outputs': [{'name': 'result'}],
'config': conf,
'options': {}
}
tenant_id = self.ctx.tenant_id
values = {'name': 'config_mysql',
'tenant': tenant_id,
'group': 'Heat::Shell',
'config': config}
config = db_api.software_config_create(
self.ctx, values)
config_id = config.id
config = db_api.software_config_get(self.ctx, config_id)
self.assertIsNotNone(config)
self.assertEqual('config_mysql', config.name)
self.assertEqual(tenant_id, config.tenant)
self.assertEqual('Heat::Shell', config.group)
self.assertEqual(conf, config.config['config'])
self.ctx.tenant = None
self.assertRaises(
exception.NotFound,
db_api.software_config_get,
self.ctx,
config_id)
# admin can get the software_config
admin_ctx = utils.dummy_context(is_admin=True,
tenant_id='admin_tenant')
config = db_api.software_config_get(admin_ctx, config_id)
self.assertIsNotNone(config)
def _create_software_config_record(self):
tenant_id = self.ctx.tenant_id
software_config = db_api.software_config_create(
self.ctx, {'name': 'config_mysql',
'tenant': tenant_id})
self.assertIsNotNone(software_config)
return software_config.id
def _test_software_config_get_all(self, get_ctx=None):
self.assertEqual([], db_api.software_config_get_all(self.ctx))
scf_id = self._create_software_config_record()
software_configs = db_api.software_config_get_all(get_ctx or self.ctx)
self.assertEqual(1, len(software_configs))
self.assertEqual(scf_id, software_configs[0].id)
def test_software_config_get_all(self):
self._test_software_config_get_all()
def test_software_config_get_all_by_admin(self):
admin_ctx = utils.dummy_context(is_admin=True,
tenant_id='admin_tenant')
self._test_software_config_get_all(get_ctx=admin_ctx)
def test_software_config_delete(self):
scf_id = self._create_software_config_record()
cfg = db_api.software_config_get(self.ctx, scf_id)
self.assertIsNotNone(cfg)
db_api.software_config_delete(self.ctx, scf_id)
err = self.assertRaises(
exception.NotFound,
db_api.software_config_get,
self.ctx,
scf_id)
self.assertIn(scf_id, six.text_type(err))
err = self.assertRaises(
exception.NotFound, db_api.software_config_delete,
self.ctx, scf_id)
self.assertIn(scf_id, six.text_type(err))
def test_software_config_delete_by_admin(self):
scf_id = self._create_software_config_record()
cfg = db_api.software_config_get(self.ctx, scf_id)
self.assertIsNotNone(cfg)
admin_ctx = utils.dummy_context(is_admin=True,
tenant_id='admin_tenant')
db_api.software_config_delete(admin_ctx, scf_id)
def test_software_config_delete_not_allowed(self):
tenant_id = self.ctx.tenant_id
config = db_api.software_config_create(
self.ctx, {'name': 'config_mysql',
'tenant': tenant_id})
config_id = config.id
values = {
'tenant': tenant_id,
'stack_user_project_id': str(uuid.uuid4()),
'config_id': config_id,
'server_id': str(uuid.uuid4()),
}
db_api.software_deployment_create(self.ctx, values)
err = self.assertRaises(
exception.InvalidRestrictedAction, db_api.software_config_delete,
self.ctx, config_id)
msg = ("Software config with id %s can not be deleted as it is "
"referenced" % config_id)
self.assertIn(msg, six.text_type(err))
def _deployment_values(self):
tenant_id = self.ctx.tenant_id
stack_user_project_id = str(uuid.uuid4())
config_id = db_api.software_config_create(
self.ctx, {'name': 'config_mysql', 'tenant': tenant_id}).id
server_id = str(uuid.uuid4())
input_values = {'foo': 'fooooo', 'bar': 'baaaaa'}
values = {
'tenant': tenant_id,
'stack_user_project_id': stack_user_project_id,
'config_id': config_id,
'server_id': server_id,
'input_values': input_values
}
return values
def test_software_deployment_create(self):
values = self._deployment_values()
deployment = db_api.software_deployment_create(self.ctx, values)
self.assertIsNotNone(deployment)
self.assertEqual(values['tenant'], deployment.tenant)
def test_software_deployment_get(self):
self.assertRaises(
exception.NotFound,
db_api.software_deployment_get,
self.ctx,
str(uuid.uuid4()))
values = self._deployment_values()
deployment = db_api.software_deployment_create(self.ctx, values)
self.assertIsNotNone(deployment)
deployment_id = deployment.id
deployment = db_api.software_deployment_get(self.ctx, deployment_id)
self.assertIsNotNone(deployment)
self.assertEqual(values['tenant'], deployment.tenant)
self.assertEqual(values['config_id'], deployment.config_id)
self.assertEqual(values['server_id'], deployment.server_id)
self.assertEqual(values['input_values'], deployment.input_values)
self.assertEqual(
values['stack_user_project_id'], deployment.stack_user_project_id)
# assert not found with invalid context tenant
self.ctx.tenant = str(uuid.uuid4())
self.assertRaises(
exception.NotFound,
db_api.software_deployment_get,
self.ctx,
deployment_id)
# assert found with stack_user_project_id context tenant
self.ctx.tenant = deployment.stack_user_project_id
deployment = db_api.software_deployment_get(self.ctx, deployment_id)
self.assertIsNotNone(deployment)
self.assertEqual(values['tenant'], deployment.tenant)
# admin can get the deployments
admin_ctx = utils.dummy_context(is_admin=True,
tenant_id='admin_tenant')
deployment = db_api.software_deployment_get(admin_ctx, deployment_id)
self.assertIsNotNone(deployment)
def test_software_deployment_get_all(self):
self.assertEqual([], db_api.software_deployment_get_all(self.ctx))
values = self._deployment_values()
deployment = db_api.software_deployment_create(self.ctx, values)
self.assertIsNotNone(deployment)
deployments = db_api.software_deployment_get_all(self.ctx)
self.assertEqual(1, len(deployments))
self.assertEqual(deployment.id, deployments[0].id)
deployments = db_api.software_deployment_get_all(
self.ctx, server_id=values['server_id'])
self.assertEqual(1, len(deployments))
self.assertEqual(deployment.id, deployments[0].id)
deployments = db_api.software_deployment_get_all(
self.ctx, server_id=str(uuid.uuid4()))
self.assertEqual([], deployments)
# admin can get the deployments of other tenants
admin_ctx = utils.dummy_context(is_admin=True,
tenant_id='admin_tenant')
deployments = db_api.software_deployment_get_all(admin_ctx)
self.assertEqual(1, len(deployments))
def test_software_deployment_update(self):
deployment_id = str(uuid.uuid4())
err = self.assertRaises(exception.NotFound,
db_api.software_deployment_update,
self.ctx, deployment_id, values={})
self.assertIn(deployment_id, six.text_type(err))
values = self._deployment_values()
deployment = db_api.software_deployment_create(self.ctx, values)
deployment_id = deployment.id
values = {'status': 'COMPLETED'}
deployment = db_api.software_deployment_update(
self.ctx, deployment_id, values)
self.assertIsNotNone(deployment)
self.assertEqual(values['status'], deployment.status)
admin_ctx = utils.dummy_context(is_admin=True,
tenant_id='admin_tenant')
values = {'status': 'FAILED'}
deployment = db_api.software_deployment_update(
admin_ctx, deployment_id, values)
self.assertIsNotNone(deployment)
self.assertEqual(values['status'], deployment.status)
def _test_software_deployment_delete(self, test_ctx=None):
deployment_id = str(uuid.uuid4())
err = self.assertRaises(exception.NotFound,
db_api.software_deployment_delete,
self.ctx, deployment_id)
self.assertIn(deployment_id, six.text_type(err))
values = self._deployment_values()
deployment = db_api.software_deployment_create(self.ctx, values)
deployment_id = deployment.id
test_ctx = test_ctx or self.ctx
deployment = db_api.software_deployment_get(test_ctx, deployment_id)
self.assertIsNotNone(deployment)
db_api.software_deployment_delete(test_ctx, deployment_id)
err = self.assertRaises(
exception.NotFound,
db_api.software_deployment_get,
test_ctx,
deployment_id)
self.assertIn(deployment_id, six.text_type(err))
def test_software_deployment_delete(self):
self._test_software_deployment_delete()
def test_software_deployment_delete_by_admin(self):
admin_ctx = utils.dummy_context(is_admin=True,
tenant_id='admin_tenant')
self._test_software_deployment_delete(test_ctx=admin_ctx)
def test_snapshot_create(self):
template = create_raw_template(self.ctx)
user_creds = create_user_creds(self.ctx)
stack = create_stack(self.ctx, template, user_creds)
values = {'tenant': self.ctx.tenant_id, 'status': 'IN_PROGRESS',
'stack_id': stack.id}
snapshot = db_api.snapshot_create(self.ctx, values)
self.assertIsNotNone(snapshot)
self.assertEqual(values['tenant'], snapshot.tenant)
def test_snapshot_create_with_name(self):
template = create_raw_template(self.ctx)
user_creds = create_user_creds(self.ctx)
stack = create_stack(self.ctx, template, user_creds)
values = {'tenant': self.ctx.tenant_id, 'status': 'IN_PROGRESS',
'stack_id': stack.id, 'name': 'snap1'}
snapshot = db_api.snapshot_create(self.ctx, values)
self.assertIsNotNone(snapshot)
self.assertEqual(values['tenant'], snapshot.tenant)
self.assertEqual('snap1', snapshot.name)
def test_snapshot_get_not_found(self):
self.assertRaises(
exception.NotFound,
db_api.snapshot_get,
self.ctx,
str(uuid.uuid4()))
def test_snapshot_get(self):
template = create_raw_template(self.ctx)
user_creds = create_user_creds(self.ctx)
stack = create_stack(self.ctx, template, user_creds)
values = {'tenant': self.ctx.tenant_id, 'status': 'IN_PROGRESS',
'stack_id': stack.id}
snapshot = db_api.snapshot_create(self.ctx, values)
self.assertIsNotNone(snapshot)
snapshot_id = snapshot.id
snapshot = db_api.snapshot_get(self.ctx, snapshot_id)
self.assertIsNotNone(snapshot)
self.assertEqual(values['tenant'], snapshot.tenant)
self.assertEqual(values['status'], snapshot.status)
self.assertIsNotNone(snapshot.created_at)
def test_snapshot_get_by_another_stack(self):
template = create_raw_template(self.ctx)
user_creds = create_user_creds(self.ctx)
stack = create_stack(self.ctx, template, user_creds)
stack1 = create_stack(self.ctx, template, user_creds)
values = {'tenant': self.ctx.tenant_id, 'status': 'IN_PROGRESS',
'stack_id': stack.id}
snapshot = db_api.snapshot_create(self.ctx, values)
self.assertIsNotNone(snapshot)
snapshot_id = snapshot.id
self.assertRaises(exception.SnapshotNotFound,
db_api.snapshot_get_by_stack,
self.ctx, snapshot_id, stack1)
def test_snapshot_get_not_found_invalid_tenant(self):
template = create_raw_template(self.ctx)
user_creds = create_user_creds(self.ctx)
stack = create_stack(self.ctx, template, user_creds)
values = {'tenant': self.ctx.tenant_id, 'status': 'IN_PROGRESS',
'stack_id': stack.id}
snapshot = db_api.snapshot_create(self.ctx, values)
self.ctx.tenant = str(uuid.uuid4())
self.assertRaises(
exception.NotFound,
db_api.snapshot_get,
self.ctx,
snapshot.id)
def test_snapshot_update_not_found(self):
snapshot_id = str(uuid.uuid4())
err = self.assertRaises(exception.NotFound,
db_api.snapshot_update,
self.ctx, snapshot_id, values={})
self.assertIn(snapshot_id, six.text_type(err))
def test_snapshot_update(self):
template = create_raw_template(self.ctx)
user_creds = create_user_creds(self.ctx)
stack = create_stack(self.ctx, template, user_creds)
values = {'tenant': self.ctx.tenant_id, 'status': 'IN_PROGRESS',
'stack_id': stack.id}
snapshot = db_api.snapshot_create(self.ctx, values)
snapshot_id = snapshot.id
values = {'status': 'COMPLETED'}
snapshot = db_api.snapshot_update(self.ctx, snapshot_id, values)
self.assertIsNotNone(snapshot)
self.assertEqual(values['status'], snapshot.status)
def test_snapshot_delete_not_found(self):
snapshot_id = str(uuid.uuid4())
err = self.assertRaises(exception.NotFound,
db_api.snapshot_delete,
self.ctx, snapshot_id)
self.assertIn(snapshot_id, six.text_type(err))
def test_snapshot_delete(self):
template = create_raw_template(self.ctx)
user_creds = create_user_creds(self.ctx)
stack = create_stack(self.ctx, template, user_creds)
values = {'tenant': self.ctx.tenant_id, 'status': 'IN_PROGRESS',
'stack_id': stack.id}
snapshot = db_api.snapshot_create(self.ctx, values)
snapshot_id = snapshot.id
snapshot = db_api.snapshot_get(self.ctx, snapshot_id)
self.assertIsNotNone(snapshot)
db_api.snapshot_delete(self.ctx, snapshot_id)
err = self.assertRaises(
exception.NotFound,
db_api.snapshot_get,
self.ctx,
snapshot_id)
self.assertIn(snapshot_id, six.text_type(err))
def test_snapshot_get_all(self):
template = create_raw_template(self.ctx)
user_creds = create_user_creds(self.ctx)
stack = create_stack(self.ctx, template, user_creds)
values = {'tenant': self.ctx.tenant_id, 'status': 'IN_PROGRESS',
'stack_id': stack.id}
snapshot = db_api.snapshot_create(self.ctx, values)
self.assertIsNotNone(snapshot)
[snapshot] = db_api.snapshot_get_all(self.ctx, stack.id)
self.assertIsNotNone(snapshot)
self.assertEqual(values['tenant'], snapshot.tenant)
self.assertEqual(values['status'], snapshot.status)
self.assertIsNotNone(snapshot.created_at)
def create_raw_template(context, **kwargs):
t = template_format.parse(wp_template)
template = {
'template': t,
}
if 'files' not in kwargs and 'files_id' not in kwargs:
# modern raw_templates have associated raw_template_files db obj
tf = template_files.TemplateFiles({'foo': 'bar'})
tf.store(context)
kwargs['files_id'] = tf.files_id
template.update(kwargs)
return db_api.raw_template_create(context, template)
def create_user_creds(ctx, **kwargs):
ctx_dict = ctx.to_dict()
ctx_dict.update(kwargs)
ctx = context.RequestContext.from_dict(ctx_dict)
return db_api.user_creds_create(ctx)
def create_stack(ctx, template, user_creds, **kwargs):
values = {
'name': 'db_test_stack_name',
'raw_template_id': template.id,
'username': ctx.username,
'tenant': ctx.tenant_id,
'action': 'create',
'status': 'complete',
'status_reason': 'create_complete',
'parameters': {},
'user_creds_id': user_creds['id'],
'owner_id': None,
'backup': False,
'timeout': '60',
'disable_rollback': 0,
'current_traversal': 'dummy-uuid',
'prev_raw_template': None
}
values.update(kwargs)
return db_api.stack_create(ctx, values)
def create_resource(ctx, stack, legacy_prop_data=False, **kwargs):
phy_res_id = UUID1
if 'phys_res_id' in kwargs:
phy_res_id = kwargs.pop('phys_res_id')
if not legacy_prop_data:
rpd = db_api.resource_prop_data_create(ctx, {'data': {'foo1': 'bar1'},
'encrypted': False})
values = {
'name': 'test_resource_name',
'physical_resource_id': phy_res_id,
'action': 'create',
'status': 'complete',
'status_reason': 'create_complete',
'rsrc_metadata': json.loads('{"foo": "123"}'),
'stack_id': stack.id,
'atomic_key': 1,
}
if not legacy_prop_data:
values['rsrc_prop_data'] = rpd
else:
values['properties_data'] = {'foo1': 'bar1'}
values.update(kwargs)
return db_api.resource_create(ctx, values)
def create_resource_data(ctx, resource, **kwargs):
values = {
'key': 'test_resource_key',
'value': 'test_value',
'redact': 0,
}
values.update(kwargs)
return db_api.resource_data_set(ctx, resource.id, **values)
def create_resource_prop_data(ctx, **kwargs):
values = {
'data': {'foo1': 'bar1'},
'encrypted': False
}
values.update(kwargs)
return db_api.resource_prop_data_create(ctx, **values)
def create_event(ctx, legacy_prop_data=False, **kwargs):
if not legacy_prop_data:
rpd = db_api.resource_prop_data_create(ctx,
{'data': {'foo2': 'ev_bar'},
'encrypted': False})
values = {
'stack_id': 'test_stack_id',
'resource_action': 'create',
'resource_status': 'complete',
'resource_name': 'res',
'physical_resource_id': UUID1,
'resource_status_reason': "create_complete",
}
if not legacy_prop_data:
values['rsrc_prop_data'] = rpd
else:
values['resource_properties'] = {'foo2': 'ev_bar'}
values.update(kwargs)
return db_api.event_create(ctx, values)
def create_service(ctx, **kwargs):
values = {
'id': '7079762f-c863-4954-ba61-9dccb68c57e2',
'engine_id': 'f9aff81e-bc1f-4119-941d-ad1ea7f31d19',
'host': 'engine-1',
'hostname': 'host1.devstack.org',
'binary': 'heat-engine',
'topic': 'engine',
'report_interval': 60}
values.update(kwargs)
return db_api.service_create(ctx, values)
def create_sync_point(ctx, **kwargs):
values = {'entity_id': '0782c463-064a-468d-98fd-442efb638e3a',
'is_update': True,
'traversal_id': '899ff81e-fc1f-41f9-f41d-ad1ea7f31d19',
'atomic_key': 0,
'stack_id': 'f6359498-764b-49e7-a515-ad31cbef885b',
'input_data': {}}
values.update(kwargs)
return db_api.sync_point_create(ctx, values)
class DBAPIRawTemplateTest(common.HeatTestCase):
def setUp(self):
super(DBAPIRawTemplateTest, self).setUp()
self.ctx = utils.dummy_context()
def test_raw_template_create(self):
t = template_format.parse(wp_template)
tp = create_raw_template(self.ctx, template=t)
self.assertIsNotNone(tp.id)
self.assertEqual(t, tp.template)
def test_raw_template_get(self):
t = template_format.parse(wp_template)
tp = create_raw_template(self.ctx, template=t)
template = db_api.raw_template_get(self.ctx, tp.id)
self.assertEqual(tp.id, template.id)
self.assertEqual(tp.template, template.template)
def test_raw_template_update(self):
another_wp_template = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Description" : "WordPress",
"Parameters" : {
"KeyName" : {
"Description" : "KeyName",
"Type" : "String",
"Default" : "test"
}
},
"Resources" : {
"WebServer": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId" : "fedora-20.x86_64.qcow2",
"InstanceType" : "m1.xlarge",
"KeyName" : "test",
"UserData" : "wordpress"
}
}
}
}
'''
new_t = template_format.parse(another_wp_template)
new_files = {
'foo': 'bar',
'myfile': 'file:///home/somefile'
}
new_values = {
'template': new_t,
'files': new_files
}
orig_tp = create_raw_template(self.ctx)
updated_tp = db_api.raw_template_update(self.ctx,
orig_tp.id, new_values)
self.assertEqual(orig_tp.id, updated_tp.id)
self.assertEqual(new_t, updated_tp.template)
self.assertEqual(new_files, updated_tp.files)
def test_raw_template_delete(self):
t = template_format.parse(wp_template)
tp = create_raw_template(self.ctx, template=t)
db_api.raw_template_delete(self.ctx, tp.id)
self.assertRaises(exception.NotFound, db_api.raw_template_get,
self.ctx, tp.id)
class DBAPIUserCredsTest(common.HeatTestCase):
def setUp(self):
super(DBAPIUserCredsTest, self).setUp()
self.ctx = utils.dummy_context()
def test_user_creds_create_trust(self):
user_creds = create_user_creds(self.ctx, trust_id='test_trust_id',
trustor_user_id='trustor_id')
self.assertIsNotNone(user_creds['id'])
self.assertEqual('test_trust_id', user_creds['trust_id'])
self.assertEqual('trustor_id', user_creds['trustor_user_id'])
self.assertIsNone(user_creds['username'])
self.assertIsNone(user_creds['password'])
self.assertEqual(self.ctx.project_name, user_creds['tenant'])
self.assertEqual(self.ctx.tenant_id, user_creds['tenant_id'])
def test_user_creds_create_password(self):
user_creds = create_user_creds(self.ctx)
self.assertIsNotNone(user_creds['id'])
self.assertEqual(self.ctx.password, user_creds['password'])
def test_user_creds_get(self):
user_creds = create_user_creds(self.ctx)
ret_user_creds = db_api.user_creds_get(self.ctx, user_creds['id'])
self.assertEqual(user_creds['password'],
ret_user_creds['password'])
def test_user_creds_get_noexist(self):
self.assertIsNone(db_api.user_creds_get(self.ctx, 123456))
def test_user_creds_delete(self):
user_creds = create_user_creds(self.ctx)
self.assertIsNotNone(user_creds['id'])
db_api.user_creds_delete(self.ctx, user_creds['id'])
creds = db_api.user_creds_get(self.ctx, user_creds['id'])
self.assertIsNone(creds)
mock_delete = self.patchobject(session.Session, 'delete')
err = self.assertRaises(
exception.NotFound, db_api.user_creds_delete,
self.ctx, user_creds['id'])
exp_msg = ('Attempt to delete user creds with id '
'%s that does not exist' % user_creds['id'])
self.assertIn(exp_msg, six.text_type(err))
self.assertEqual(0, mock_delete.call_count)
def test_user_creds_delete_retries(self):
mock_delete = self.patchobject(session.Session, 'delete')
# returns StaleDataErrors, so we try delete 3 times
mock_delete.side_effect = [exc.StaleDataError,
exc.StaleDataError,
None]
user_creds = create_user_creds(self.ctx)
self.assertIsNotNone(user_creds['id'])
self.assertIsNone(
db_api.user_creds_delete(self.ctx, user_creds['id']))
self.assertEqual(3, mock_delete.call_count)
# returns other errors, so we try delete once
mock_delete.side_effect = [exc.UnmappedError]
self.assertRaises(exc.UnmappedError, db_api.user_creds_delete,
self.ctx, user_creds['id'])
self.assertEqual(4, mock_delete.call_count)
class DBAPIStackTagTest(common.HeatTestCase):
def setUp(self):
super(DBAPIStackTagTest, self).setUp()
self.ctx = utils.dummy_context()
self.template = create_raw_template(self.ctx)
self.user_creds = create_user_creds(self.ctx)
self.stack = create_stack(self.ctx, self.template, self.user_creds)
def test_stack_tags_set(self):
tags = db_api.stack_tags_set(self.ctx, self.stack.id, ['tag1', 'tag2'])
self.assertEqual(self.stack.id, tags[0].stack_id)
self.assertEqual('tag1', tags[0].tag)
tags = db_api.stack_tags_set(self.ctx, self.stack.id, [])
self.assertIsNone(tags)
def test_stack_tags_get(self):
db_api.stack_tags_set(self.ctx, self.stack.id, ['tag1', 'tag2'])
tags = db_api.stack_tags_get(self.ctx, self.stack.id)
self.assertEqual(self.stack.id, tags[0].stack_id)
self.assertEqual('tag1', tags[0].tag)
tags = db_api.stack_tags_get(self.ctx, UUID1)
self.assertIsNone(tags)
def test_stack_tags_delete(self):
db_api.stack_tags_set(self.ctx, self.stack.id, ['tag1', 'tag2'])
db_api.stack_tags_delete(self.ctx, self.stack.id)
tags = db_api.stack_tags_get(self.ctx, self.stack.id)
self.assertIsNone(tags)
class DBAPIStackTest(common.HeatTestCase):
def setUp(self):
super(DBAPIStackTest, self).setUp()
self.ctx = utils.dummy_context()
self.template = create_raw_template(self.ctx)
self.user_creds = create_user_creds(self.ctx)
def test_stack_create(self):
stack = create_stack(self.ctx, self.template, self.user_creds)
self.assertIsNotNone(stack.id)
self.assertEqual('db_test_stack_name', stack.name)
self.assertEqual(self.template.id, stack.raw_template_id)
self.assertEqual(self.ctx.username, stack.username)
self.assertEqual(self.ctx.tenant_id, stack.tenant)
self.assertEqual('create', stack.action)
self.assertEqual('complete', stack.status)
self.assertEqual('create_complete', stack.status_reason)
self.assertEqual({}, stack.parameters)
self.assertEqual(self.user_creds['id'], stack.user_creds_id)
self.assertIsNone(stack.owner_id)
self.assertEqual('60', stack.timeout)
self.assertFalse(stack.disable_rollback)
def test_stack_delete(self):
stack = create_stack(self.ctx, self.template, self.user_creds)
stack_id = stack.id
resource = create_resource(self.ctx, stack)
db_api.stack_delete(self.ctx, stack_id)
self.assertIsNone(db_api.stack_get(self.ctx, stack_id,
show_deleted=False))
self.assertRaises(exception.NotFound, db_api.resource_get,
self.ctx, resource.id)
self.assertRaises(exception.NotFound, db_api.stack_delete,
self.ctx, stack_id)
# Testing soft delete
ret_stack = db_api.stack_get(self.ctx, stack_id, show_deleted=True)
self.assertIsNotNone(ret_stack)
self.assertEqual(stack_id, ret_stack.id)
self.assertEqual('db_test_stack_name', ret_stack.name)
# Testing child resources deletion
self.assertRaises(exception.NotFound, db_api.resource_get,
self.ctx, resource.id)
def test_stack_update(self):
stack = create_stack(self.ctx, self.template, self.user_creds)
values = {
'name': 'db_test_stack_name2',
'action': 'update',
'status': 'failed',
'status_reason': "update_failed",
'timeout': '90',
'current_traversal': 'another-dummy-uuid',
}
db_api.stack_update(self.ctx, stack.id, values)
stack = db_api.stack_get(self.ctx, stack.id)
self.assertEqual('db_test_stack_name2', stack.name)
self.assertEqual('update', stack.action)
self.assertEqual('failed', stack.status)
self.assertEqual('update_failed', stack.status_reason)
self.assertEqual(90, stack.timeout)
self.assertEqual('another-dummy-uuid', stack.current_traversal)
self.assertRaises(exception.NotFound, db_api.stack_update, self.ctx,
UUID2, values)
def test_stack_update_matches_traversal_id(self):
stack = create_stack(self.ctx, self.template, self.user_creds)
values = {
'current_traversal': 'another-dummy-uuid',
}
updated = db_api.stack_update(self.ctx, stack.id, values,
exp_trvsl='dummy-uuid')
self.assertTrue(updated)
stack = db_api.stack_get(self.ctx, stack.id)
self.assertEqual('another-dummy-uuid', stack.current_traversal)
# test update fails when expected traversal is not matched
matching_uuid = 'another-dummy-uuid'
updated = db_api.stack_update(self.ctx, stack.id, values,
exp_trvsl=matching_uuid)
self.assertTrue(updated)
diff_uuid = 'some-other-dummy-uuid'
updated = db_api.stack_update(self.ctx, stack.id, values,
exp_trvsl=diff_uuid)
self.assertFalse(updated)
@mock.patch.object(time, 'sleep')
def test_stack_update_retries_on_deadlock(self, sleep):
stack = create_stack(self.ctx, self.template, self.user_creds)
with mock.patch('sqlalchemy.orm.query.Query.update',
side_effect=db_exception.DBDeadlock) as mock_update:
self.assertRaises(db_exception.DBDeadlock,
db_api.stack_update, self.ctx, stack.id, {})
self.assertEqual(4, mock_update.call_count)
def test_stack_set_status_release_lock(self):
stack = create_stack(self.ctx, self.template, self.user_creds)
values = {
'name': 'db_test_stack_name2',
'action': 'update',
'status': 'failed',
'status_reason': "update_failed",
'timeout': '90',
'current_traversal': 'another-dummy-uuid',
}
db_api.stack_lock_create(self.ctx, stack.id, UUID1)
observed = db_api.persist_state_and_release_lock(self.ctx, stack.id,
UUID1, values)
self.assertIsNone(observed)
stack = db_api.stack_get(self.ctx, stack.id)
self.assertEqual('db_test_stack_name2', stack.name)
self.assertEqual('update', stack.action)
self.assertEqual('failed', stack.status)
self.assertEqual('update_failed', stack.status_reason)
self.assertEqual(90, stack.timeout)
self.assertEqual('another-dummy-uuid', stack.current_traversal)
def test_stack_set_status_release_lock_failed(self):
stack = create_stack(self.ctx, self.template, self.user_creds)
values = {
'name': 'db_test_stack_name2',
'action': 'update',
'status': 'failed',
'status_reason': "update_failed",
'timeout': '90',
'current_traversal': 'another-dummy-uuid',
}
db_api.stack_lock_create(self.ctx, stack.id, UUID2)
observed = db_api.persist_state_and_release_lock(self.ctx, stack.id,
UUID1, values)
self.assertTrue(observed)
def test_stack_set_status_failed_release_lock(self):
stack = create_stack(self.ctx, self.template, self.user_creds)
values = {
'name': 'db_test_stack_name2',
'action': 'update',
'status': 'failed',
'status_reason': "update_failed",
'timeout': '90',
'current_traversal': 'another-dummy-uuid',
}
db_api.stack_lock_create(self.ctx, stack.id, UUID1)
observed = db_api.persist_state_and_release_lock(self.ctx, UUID2,
UUID1, values)
self.assertTrue(observed)
def test_stack_get_returns_a_stack(self):
stack = create_stack(self.ctx, self.template, self.user_creds)
ret_stack = db_api.stack_get(self.ctx, stack.id, show_deleted=False)
self.assertIsNotNone(ret_stack)
self.assertEqual(stack.id, ret_stack.id)
self.assertEqual('db_test_stack_name', ret_stack.name)
def test_stack_get_returns_none_if_stack_does_not_exist(self):
stack = db_api.stack_get(self.ctx, UUID1, show_deleted=False)
self.assertIsNone(stack)
def test_stack_get_returns_none_if_tenant_id_does_not_match(self):
stack = create_stack(self.ctx, self.template, self.user_creds)
self.ctx.tenant = 'abc'
stack = db_api.stack_get(self.ctx, UUID1, show_deleted=False)
self.assertIsNone(stack)
def test_stack_get_tenant_is_stack_user_project_id(self):
stack = create_stack(self.ctx, self.template, self.user_creds,
stack_user_project_id='astackuserproject')
self.ctx.tenant = 'astackuserproject'
ret_stack = db_api.stack_get(self.ctx, stack.id, show_deleted=False)
self.assertIsNotNone(ret_stack)
self.assertEqual(stack.id, ret_stack.id)
self.assertEqual('db_test_stack_name', ret_stack.name)
def test_stack_get_can_return_a_stack_from_different_tenant(self):
# create a stack with the common tenant
stack = create_stack(self.ctx, self.template, self.user_creds)
# admin context can get the stack
admin_ctx = utils.dummy_context(user='admin_username',
tenant_id='admin_tenant',
is_admin=True)
ret_stack = db_api.stack_get(admin_ctx, stack.id,
show_deleted=False)
self.assertEqual(stack.id, ret_stack.id)
self.assertEqual('db_test_stack_name', ret_stack.name)
def test_stack_get_by_name(self):
stack = create_stack(self.ctx, self.template, self.user_creds)
ret_stack = db_api.stack_get_by_name(self.ctx, stack.name)
self.assertIsNotNone(ret_stack)
self.assertEqual(stack.id, ret_stack.id)
self.assertEqual('db_test_stack_name', ret_stack.name)
self.assertIsNone(db_api.stack_get_by_name(self.ctx, 'abc'))
self.ctx.tenant = 'abc'
self.assertIsNone(db_api.stack_get_by_name(self.ctx, 'abc'))
def test_stack_get_all(self):
values = [
{'name': 'stack1'},
{'name': 'stack2'},
{'name': 'stack3'},
{'name': 'stack4'}
]
[create_stack(self.ctx, self.template, self.user_creds,
**val) for val in values]
ret_stacks = db_api.stack_get_all(self.ctx)
self.assertEqual(4, len(ret_stacks))
names = [ret_stack.name for ret_stack in ret_stacks]
[self.assertIn(val['name'], names) for val in values]
def test_stack_get_all_by_owner_id(self):
parent_stack1 = create_stack(self.ctx, self.template, self.user_creds)
parent_stack2 = create_stack(self.ctx, self.template, self.user_creds)
values = [
{'owner_id': parent_stack1.id},
{'owner_id': parent_stack1.id},
{'owner_id': parent_stack2.id},
{'owner_id': parent_stack2.id},
]
[create_stack(self.ctx, self.template, self.user_creds,
**val) for val in values]
stack1_children = db_api.stack_get_all_by_owner_id(self.ctx,
parent_stack1.id)
self.assertEqual(2, len(stack1_children))
stack2_children = db_api.stack_get_all_by_owner_id(self.ctx,
parent_stack2.id)
self.assertEqual(2, len(stack2_children))
def test_stack_get_all_by_root_owner_id(self):
parent_stack1 = create_stack(self.ctx, self.template, self.user_creds)
parent_stack2 = create_stack(self.ctx, self.template, self.user_creds)
for i in range(3):
lvl1_st = create_stack(self.ctx, self.template, self.user_creds,
owner_id=parent_stack1.id)
for j in range(2):
create_stack(self.ctx, self.template, self.user_creds,
owner_id=lvl1_st.id)
for i in range(2):
lvl1_st = create_stack(self.ctx, self.template, self.user_creds,
owner_id=parent_stack2.id)
for j in range(4):
lvl2_st = create_stack(self.ctx, self.template,
self.user_creds, owner_id=lvl1_st.id)
for k in range(3):
create_stack(self.ctx, self.template,
self.user_creds, owner_id=lvl2_st.id)
stack1_children = db_api.stack_get_all_by_root_owner_id(
self.ctx,
parent_stack1.id)
# 3 stacks on the first level + 6 stack on the second
self.assertEqual(9, len(list(stack1_children)))
stack2_children = db_api.stack_get_all_by_root_owner_id(
self.ctx,
parent_stack2.id)
# 2 + 8 + 24
self.assertEqual(34, len(list(stack2_children)))
def test_stack_get_all_with_regular_tenant(self):
values = [
{'tenant': UUID1},
{'tenant': UUID1},
{'tenant': UUID2},
{'tenant': UUID2},
{'tenant': UUID2},
]
[create_stack(self.ctx, self.template, self.user_creds,
**val) for val in values]
self.ctx.tenant = UUID1
stacks = db_api.stack_get_all(self.ctx)
self.assertEqual(2, len(stacks))
self.ctx.tenant = UUID2
stacks = db_api.stack_get_all(self.ctx)
self.assertEqual(3, len(stacks))
self.ctx.tenant = UUID3
self.assertEqual([], db_api.stack_get_all(self.ctx))
def test_stack_get_all_with_admin_context(self):
values = [
{'tenant': UUID1},
{'tenant': UUID1},
{'tenant': UUID2},
{'tenant': UUID2},
{'tenant': UUID2},
]
[create_stack(self.ctx, self.template, self.user_creds,
**val) for val in values]
admin_ctx = utils.dummy_context(user='admin_user',
tenant_id='admin_tenant',
is_admin=True)
stacks = db_api.stack_get_all(admin_ctx)
self.assertEqual(5, len(stacks))
def test_stack_count_all_with_regular_tenant(self):
values = [
{'tenant': UUID1},
{'tenant': UUID1},
{'tenant': UUID2},
{'tenant': UUID2},
{'tenant': UUID2},
]
[create_stack(self.ctx, self.template, self.user_creds,
**val) for val in values]
self.ctx.tenant = UUID1
self.assertEqual(2, db_api.stack_count_all(self.ctx))
self.ctx.tenant = UUID2
self.assertEqual(3, db_api.stack_count_all(self.ctx))
def test_stack_count_all_with_admin_context(self):
values = [
{'tenant': UUID1},
{'tenant': UUID1},
{'tenant': UUID2},
{'tenant': UUID2},
{'tenant': UUID2},
]
[create_stack(self.ctx, self.template, self.user_creds,
**val) for val in values]
admin_ctx = utils.dummy_context(user='admin_user',
tenant_id='admin_tenant',
is_admin=True)
self.assertEqual(5, db_api.stack_count_all(admin_ctx))
def test_purge_deleted(self):
now = timeutils.utcnow()
delta = datetime.timedelta(seconds=3600 * 7)
deleted = [now - delta * i for i in range(1, 6)]
tmpl_files = [template_files.TemplateFiles(
{'foo': 'file contents %d' % i}) for i in range(5)]
[tmpl_file.store(self.ctx) for tmpl_file in tmpl_files]
templates = [create_raw_template(self.ctx,
files_id=tmpl_files[i].files_id
) for i in range(5)]
creds = [create_user_creds(self.ctx) for i in range(5)]
stacks = [create_stack(self.ctx, templates[i], creds[i],
deleted_at=deleted[i]) for i in range(5)]
resources = [create_resource(self.ctx, stacks[i]) for i in range(5)]
events = [create_event(self.ctx, stack_id=stacks[i].id)
for i in range(5)]
db_api.purge_deleted(age=1, granularity='days')
admin_ctx = utils.dummy_context(is_admin=True)
self._deleted_stack_existance(admin_ctx, stacks, resources,
events, tmpl_files, (0, 1, 2), (3, 4))
db_api.purge_deleted(age=22, granularity='hours')
self._deleted_stack_existance(admin_ctx, stacks, resources,
events, tmpl_files, (0, 1, 2), (3, 4))
db_api.purge_deleted(age=1100, granularity='minutes')
self._deleted_stack_existance(admin_ctx, stacks, resources,
events, tmpl_files, (0, 1), (2, 3, 4))
db_api.purge_deleted(age=3600, granularity='seconds')
self._deleted_stack_existance(admin_ctx, stacks, resources,
events, tmpl_files, (), (0, 1, 2, 3, 4))
# test wrong age
self.assertRaises(exception.Error, db_api.purge_deleted, -1, 'seconds')
def test_purge_project_deleted(self):
now = timeutils.utcnow()
delta = datetime.timedelta(seconds=3600 * 7)
deleted = [now - delta * i for i in range(1, 6)]
tmpl_files = [template_files.TemplateFiles(
{'foo': 'file contents %d' % i}) for i in range(5)]
[tmpl_file.store(self.ctx) for tmpl_file in tmpl_files]
templates = [create_raw_template(self.ctx,
files_id=tmpl_files[i].files_id
) for i in range(5)]
values = [
{'tenant': UUID1},
{'tenant': UUID1},
{'tenant': UUID1},
{'tenant': UUID2},
{'tenant': UUID2},
]
creds = [create_user_creds(self.ctx) for i in range(5)]
stacks = [create_stack(self.ctx, templates[i], creds[i],
deleted_at=deleted[i], **values[i]
) for i in range(5)]
resources = [create_resource(self.ctx, stacks[i]) for i in range(5)]
events = [create_event(self.ctx, stack_id=stacks[i].id)
for i in range(5)]
db_api.purge_deleted(age=1, granularity='days', project_id=UUID1)
admin_ctx = utils.dummy_context(is_admin=True)
self._deleted_stack_existance(admin_ctx, stacks, resources,
events, tmpl_files, (0, 1, 2, 3, 4), ())
db_api.purge_deleted(age=22, granularity='hours', project_id=UUID1)
self._deleted_stack_existance(admin_ctx, stacks, resources,
events, tmpl_files, (0, 1, 2, 3, 4), ())
db_api.purge_deleted(age=1100, granularity='minutes', project_id=UUID1)
self._deleted_stack_existance(admin_ctx, stacks, resources,
events, tmpl_files, (0, 1, 3, 4), (2,))