Isolate unit tests from integration tests data

Reasons:
 - unit tests are depending on integration tests data, such as:
  - datastore and its versions;
 - distribution package maintainers are not able to package Trove since ice house RC1.

Gate behaviour:
 - gate tests are not failing because unit tests are running right after
   fake-mode tests, so sqlite still exist.

Changes:
 - fixing those unit tests that are failing while building Trove package
   (more info see at bug-report).

Change-Id: Ib9a41d68481d32cdd81fc4d7c015a63c4da33bb3
Closes-Bug: #1302784
This commit is contained in:
Denis M 2014-09-25 00:07:05 +03:00 committed by Denis Makogon
parent 2acb6ecd0d
commit 2de5c0f66d
2 changed files with 234 additions and 232 deletions

View File

@ -13,12 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import uuid
from testtools import TestCase
from trove.common.instance import ServiceStatuses
from trove.datastore import models
from trove.instance.models import InstanceStatus
from trove.instance.models import InstanceServiceStatus
from trove.instance.models import SimpleInstance
from trove.tests.util import test_config
from trove.tests.unittests.util import util
class FakeInstanceTask(object):
@ -31,108 +35,105 @@ class FakeInstanceTask(object):
class FakeDBInstance(object):
def __init__(self):
self.id = None
self.id = str(uuid.uuid4())
self.deleted = False
self.datastore_version_id = test_config.dbaas_datastore_version_id
self.datastore_version_id = str(uuid.uuid4())
self.server_status = "ACTIVE"
self.task_status = FakeInstanceTask()
class InstanceStatusTest(TestCase):
class BaseInstanceStatusTestCase(TestCase):
def setUp(self):
super(InstanceStatusTest, self).setUp()
util.init_db()
self.db_info = FakeDBInstance()
self.status = InstanceServiceStatus(
ServiceStatuses.RUNNING)
self.datastore = models.DBDatastore.create(
id=str(uuid.uuid4()),
name='mysql',
default_version_id=self.db_info.datastore_version_id
)
self.version = models.DBDatastoreVersion.create(
id=self.db_info.datastore_version_id,
datastore_id=self.datastore.id,
name='5.5',
manager='mysql',
image_id=str(uuid.uuid4()),
active=1,
packages="mysql-server-5.5"
)
super(BaseInstanceStatusTestCase, self).setUp()
def tearDown(self):
super(InstanceStatusTest, self).tearDown()
self.datastore.delete()
self.version.delete()
super(BaseInstanceStatusTestCase, self).tearDown()
class InstanceStatusTest(BaseInstanceStatusTestCase):
def test_task_status_error_reports_error(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_db_info.task_status.is_error = True
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.db_info.task_status.is_error = True
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.ERROR, instance.status)
def test_task_status_action_building_reports_build(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_db_info.task_status.action = "BUILDING"
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.db_info.task_status.action = "BUILDING"
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.BUILD, instance.status)
def test_task_status_action_rebooting_reports_reboot(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_db_info.task_status.action = "REBOOTING"
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.db_info.task_status.action = "REBOOTING"
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.REBOOT, instance.status)
def test_task_status_action_resizing_reports_resize(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_db_info.task_status.action = "RESIZING"
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.db_info.task_status.action = "RESIZING"
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.RESIZE, instance.status)
def test_task_status_action_deleting_reports_shutdown(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_db_info.task_status.action = "DELETING"
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.db_info.task_status.action = "DELETING"
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.SHUTDOWN, instance.status)
def test_nova_server_build_reports_build(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_db_info.server_status = "BUILD"
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.db_info.server_status = "BUILD"
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.BUILD, instance.status)
def test_nova_server_error_reports_error(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_db_info.server_status = "ERROR"
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.db_info.server_status = "ERROR"
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.ERROR, instance.status)
def test_nova_server_reboot_reports_reboot(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_db_info.server_status = "REBOOT"
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.db_info.server_status = "REBOOT"
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.REBOOT, instance.status)
def test_nova_server_resize_reports_resize(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_db_info.server_status = "RESIZE"
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.db_info.server_status = "RESIZE"
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.RESIZE, instance.status)
def test_nova_server_verify_resize_reports_resize(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_db_info.server_status = "VERIFY_RESIZE"
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.db_info.server_status = "VERIFY_RESIZE"
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.RESIZE, instance.status)
def test_service_status_paused_reports_reboot(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_status.set_status(ServiceStatuses.PAUSED)
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.status.set_status(ServiceStatuses.PAUSED)
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.REBOOT, instance.status)
def test_service_status_new_reports_build(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_status.set_status(ServiceStatuses.NEW)
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.status.set_status(ServiceStatuses.NEW)
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.BUILD, instance.status)
def test_service_status_running_reports_active(self):
fake_db_info = FakeDBInstance()
fake_status = InstanceServiceStatus(ServiceStatuses.RUNNING)
fake_status.set_status(ServiceStatuses.RUNNING)
instance = SimpleInstance('dummy context', fake_db_info, fake_status)
self.status.set_status(ServiceStatuses.RUNNING)
instance = SimpleInstance('dummy context', self.db_info, self.status)
self.assertEqual(InstanceStatus.ACTIVE, instance.status)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import uuid
from mock import MagicMock, patch, ANY
from testtools import TestCase
from testtools.matchers import Equals, Is, Not
@ -25,21 +26,40 @@ from trove.backup.models import Backup
from trove.common.context import TroveContext
from trove.common import instance as rd_instance
from trove.datastore import models as datastore_models
from trove.db.models import DatabaseModelBase
from trove.instance.models import DBInstance
from trove.instance.models import InstanceServiceStatus
from trove.instance.tasks import InstanceTasks
import trove.extensions.mgmt.instances.models as mgmtmodels
from trove.openstack.common.notifier import api as notifier
from trove.common import remote
from trove.tests.util import test_config
from trove.tests.unittests.util import util
CONF = cfg.CONF
class MockMgmtInstanceTest(TestCase):
@classmethod
def setUpClass(cls):
util.init_db()
cls.version_id = str(uuid.uuid4())
cls.datastore = datastore_models.DBDatastore.create(
id=str(uuid.uuid4()),
name='mysql',
default_version_id=cls.version_id
)
cls.version = datastore_models.DBDatastoreVersion.create(
id=cls.version_id,
datastore_id=cls.datastore.id,
name='5.5',
manager='mysql',
image_id=str(uuid.uuid4()),
active=1,
packages="mysql-server-5.5"
)
super(MockMgmtInstanceTest, cls).setUpClass()
def setUp(self):
super(MockMgmtInstanceTest, self).setUp()
self.context = TroveContext()
self.context.auth_token = 'some_secret_password'
self.client = MagicMock(spec=Client)
@ -53,63 +73,59 @@ class MockMgmtInstanceTest(TestCase):
CONF.set_override('report_interval', 20)
CONF.set_override('notification_service_id', {'mysql': '123'})
def tearDown(self):
super(MockMgmtInstanceTest, self).tearDown()
super(MockMgmtInstanceTest, self).setUp()
@staticmethod
def build_db_instance(status, task_status=InstanceTasks.DELETING):
return DBInstance(task_status,
created='xyz',
name='test_name',
id='1',
flavor_id='flavor_1',
datastore_version_id=
test_config.dbaas_datastore_version_id,
compute_instance_id='compute_id_1',
server_id='server_id_1',
tenant_id='tenant_id_1',
server_status=status)
def do_cleanup(self, instance, status):
instance.delete()
status.delete()
def build_db_instance(self, status, task_status=InstanceTasks.NONE):
version = datastore_models.DBDatastoreVersion.get_by(name='5.5')
instance = DBInstance(InstanceTasks.NONE,
name='test_name',
id=str(uuid.uuid4()),
flavor_id='flavor_1',
datastore_version_id=
version.id,
compute_instance_id='compute_id_1',
server_id='server_id_1',
tenant_id='tenant_id_1',
server_status=
rd_instance.ServiceStatuses.
BUILDING.api_status,
deleted=False)
instance.save()
service_status = InstanceServiceStatus(
rd_instance.ServiceStatuses.RUNNING,
id=str(uuid.uuid4()),
instance_id=instance.id,
)
service_status.save()
instance.set_task_status(task_status)
instance.server_status = status
instance.save()
return instance, service_status
class TestNotificationTransformer(MockMgmtInstanceTest):
@classmethod
def setUpClass(cls):
super(TestNotificationTransformer, cls).setUpClass()
def test_tranformer(self):
transformer = mgmtmodels.NotificationTransformer(context=self.context)
status = rd_instance.ServiceStatuses.BUILDING.api_status
db_instance = MockMgmtInstanceTest.build_db_instance(
instance, service_status = self.build_db_instance(
status, InstanceTasks.BUILDING)
with patch.object(DatabaseModelBase, 'find_all',
return_value=[db_instance]):
stub_dsv_db_info = MagicMock(
spec=datastore_models.DBDatastoreVersion)
stub_dsv_db_info.id = "test_datastore_version"
stub_dsv_db_info.datastore_id = "mysql_test_version"
stub_dsv_db_info.name = "test_datastore_name"
stub_dsv_db_info.image_id = "test_datastore_image_id"
stub_dsv_db_info.packages = "test_datastore_pacakges"
stub_dsv_db_info.active = 1
stub_dsv_db_info.manager = "mysql"
stub_datastore_version = datastore_models.DatastoreVersion(
stub_dsv_db_info)
def side_effect_func(*args, **kwargs):
if 'instance_id' in kwargs:
return InstanceServiceStatus(
rd_instance.ServiceStatuses.BUILDING)
else:
return stub_datastore_version
with patch.object(DatabaseModelBase, 'find_by',
side_effect=side_effect_func):
payloads = transformer()
self.assertIsNotNone(payloads)
self.assertThat(len(payloads), Equals(1))
payload = payloads[0]
self.assertThat(payload['audit_period_beginning'],
Not(Is(None)))
self.assertThat(payload['audit_period_ending'], Not(Is(None)))
self.assertThat(payload['state'], Equals(status.lower()))
payloads = mgmtmodels.NotificationTransformer(
context=self.context)()
self.assertIsNotNone(payloads)
payload = payloads[0]
self.assertThat(payload['audit_period_beginning'],
Not(Is(None)))
self.assertThat(payload['audit_period_ending'], Not(Is(None)))
self.assertTrue(status.lower() in [db['state'] for db in payloads])
self.addCleanup(self.do_cleanup, instance, service_status)
def test_get_service_id(self):
id_map = {
@ -131,6 +147,11 @@ class TestNotificationTransformer(MockMgmtInstanceTest):
class TestNovaNotificationTransformer(MockMgmtInstanceTest):
@classmethod
def setUpClass(cls):
super(TestNovaNotificationTransformer, cls).setUpClass()
def test_transformer_cache(self):
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
@ -145,9 +166,9 @@ class TestNovaNotificationTransformer(MockMgmtInstanceTest):
def test_lookup_flavor(self):
flavor = MagicMock(spec=Flavor)
flavor.name = 'flav_1'
transformer = mgmtmodels.NovaNotificationTransformer(
context=self.context)
with patch.object(self.flavor_mgr, 'get', side_effect=[flavor, None]):
transformer = mgmtmodels.NovaNotificationTransformer(
context=self.context)
self.assertThat(transformer._lookup_flavor('1'),
Equals(flavor.name))
self.assertThat(transformer._lookup_flavor('2'),
@ -155,188 +176,160 @@ class TestNovaNotificationTransformer(MockMgmtInstanceTest):
def test_tranformer(self):
status = rd_instance.ServiceStatuses.BUILDING.api_status
db_instance = MockMgmtInstanceTest.build_db_instance(
status, task_status=InstanceTasks.BUILDING)
stub_dsv_db_info = MagicMock(spec=datastore_models.DBDatastoreVersion)
stub_dsv_db_info.id = "test_datastore_version"
stub_dsv_db_info.datastore_id = "mysql_test_version"
stub_dsv_db_info.name = "test_datastore_name"
stub_dsv_db_info.image_id = "test_datastore_image_id"
stub_dsv_db_info.packages = "test_datastore_pacakges"
stub_dsv_db_info.active = 1
stub_dsv_db_info.manager = "mysql"
stub_datastore_version = datastore_models.DatastoreVersion(
stub_dsv_db_info)
instance, service_status = self.build_db_instance(
status, InstanceTasks.BUILDING)
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
transformer = mgmtmodels.NovaNotificationTransformer(
context=self.context)
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
db_instance,
instance,
server,
None)
service_status)
with patch.object(DatabaseModelBase, 'find_by',
return_value=stub_datastore_version):
with patch.object(mgmtmodels, 'load_mgmt_instances',
return_value=[mgmt_instance]):
with patch.object(self.flavor_mgr, 'get', return_value=flavor):
with patch.object(mgmtmodels, 'load_mgmt_instances',
return_value=[mgmt_instance]):
payloads = transformer()
with patch.object(self.flavor_mgr, 'get', return_value=flavor):
# invocation
transformer = mgmtmodels.NovaNotificationTransformer(
context=self.context)
payloads = transformer()
# assertions
self.assertIsNotNone(payloads)
self.assertThat(len(payloads), Equals(1))
payload = payloads[0]
self.assertThat(payload['audit_period_beginning'],
Not(Is(None)))
self.assertThat(payload['audit_period_ending'],
Not(Is(None)))
self.assertThat(payload['state'], Equals(status.lower()))
self.assertThat(payload['instance_type'],
Equals('db.small'))
self.assertThat(payload['instance_type_id'],
Equals('flavor_1'))
self.assertThat(payload['user_id'], Equals('test_user_id'))
self.assertThat(payload['service_id'], Equals('123'))
self.assertIsNotNone(payloads)
payload = payloads[0]
self.assertThat(payload['audit_period_beginning'],
Not(Is(None)))
self.assertThat(payload['audit_period_ending'],
Not(Is(None)))
self.assertThat(payload['state'], Not(Is(None)))
self.assertThat(payload['instance_type'],
Equals('db.small'))
self.assertThat(payload['instance_type_id'],
Equals('flavor_1'))
self.assertThat(payload['user_id'], Equals('test_user_id'))
self.assertThat(payload['service_id'], Equals('123'))
self.addCleanup(self.do_cleanup, instance, service_status)
def test_tranformer_invalid_datastore_manager(self):
status = rd_instance.ServiceStatuses.BUILDING.api_status
db_instance = MockMgmtInstanceTest.build_db_instance(
status, task_status=InstanceTasks.BUILDING)
instance, service_status = self.build_db_instance(
status, InstanceTasks.BUILDING)
version = datastore_models.DBDatastoreVersion.get_by(
id=instance.datastore_version_id)
version.update(manager='something invalid')
server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
stub_datastore_version = MagicMock()
stub_datastore_version.id = "stub_datastore_version"
stub_datastore_version.manager = "m0ng0"
stub_datastore = MagicMock()
stub_datastore.default_datastore_version = "stub_datastore_version"
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
with patch.object(datastore_models.DatastoreVersion, 'load',
return_value=stub_datastore_version):
with patch.object(datastore_models.DatastoreVersion,
'load_by_uuid',
return_value=stub_datastore_version):
with patch.object(datastore_models.Datastore, 'load',
return_value=stub_datastore):
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
db_instance,
server,
None)
with patch.object(mgmtmodels, 'load_mgmt_instances',
return_value=[mgmt_instance]):
with patch.object(self.flavor_mgr,
'get', return_value=flavor):
# invocation
transformer = (
mgmtmodels.NovaNotificationTransformer(
context=self.context)
)
payloads = transformer()
# assertions
self.assertIsNotNone(payloads)
self.assertThat(len(payloads), Equals(1))
payload = payloads[0]
self.assertThat(payload['audit_period_beginning'],
Not(Is(None)))
self.assertThat(payload['audit_period_ending'],
Not(Is(None)))
self.assertThat(payload['state'],
Equals(status.lower()))
self.assertThat(payload['instance_type'],
Equals('db.small'))
self.assertThat(payload['instance_type_id'],
Equals('flavor_1'))
self.assertThat(payload['user_id'],
Equals('test_user_id'))
self.assertThat(payload['service_id'],
Equals('unknown-service-id-error'))
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
instance,
server,
service_status)
transformer = mgmtmodels.NovaNotificationTransformer(
context=self.context)
with patch.object(mgmtmodels, 'load_mgmt_instances',
return_value=[mgmt_instance]):
with patch.object(self.flavor_mgr,
'get', return_value=flavor):
payloads = transformer()
# assertions
self.assertIsNotNone(payloads)
payload = payloads[0]
self.assertThat(payload['audit_period_beginning'],
Not(Is(None)))
self.assertThat(payload['audit_period_ending'],
Not(Is(None)))
self.assertIn(status.lower(),
[db['state']
for db in payloads])
self.assertThat(payload['instance_type'],
Equals('db.small'))
self.assertThat(payload['instance_type_id'],
Equals('flavor_1'))
self.assertThat(payload['user_id'],
Equals('test_user_id'))
self.assertThat(payload['service_id'],
Equals('unknown-service-id-error'))
version.update(manager='mysql')
self.addCleanup(self.do_cleanup, instance, service_status)
def test_tranformer_shutdown_instance(self):
status = rd_instance.ServiceStatuses.SHUTDOWN.api_status
db_instance = self.build_db_instance(status)
instance, service_status = self.build_db_instance(status)
service_status.set_status(rd_instance.ServiceStatuses.SHUTDOWN)
server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
db_instance,
instance,
server,
None)
service_status)
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
transformer = mgmtmodels.NovaNotificationTransformer(
context=self.context)
with patch.object(Backup, 'running', return_value=None):
self.assertThat(mgmt_instance.status, Equals('SHUTDOWN'))
with patch.object(mgmtmodels, 'load_mgmt_instances',
return_value=[mgmt_instance]):
with patch.object(self.flavor_mgr, 'get', return_value=flavor):
# invocation
transformer = mgmtmodels.NovaNotificationTransformer(
context=self.context)
payloads = transformer()
# assertion that SHUTDOWN instances are not reported
self.assertIsNotNone(payloads)
self.assertThat(len(payloads), Equals(0))
self.assertNotIn(status.lower(),
[db['status']
for db in payloads])
self.addCleanup(self.do_cleanup, instance, service_status)
def test_tranformer_no_nova_instance(self):
status = rd_instance.ServiceStatuses.SHUTDOWN.api_status
db_instance = MockMgmtInstanceTest.build_db_instance(status)
instance, service_status = self.build_db_instance(status)
service_status.set_status(rd_instance.ServiceStatuses.SHUTDOWN)
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
db_instance,
instance,
None,
None)
service_status)
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
transformer = mgmtmodels.NovaNotificationTransformer(
context=self.context)
with patch.object(Backup, 'running', return_value=None):
self.assertThat(mgmt_instance.status, Equals('SHUTDOWN'))
with patch.object(mgmtmodels, 'load_mgmt_instances',
return_value=[mgmt_instance]):
with patch.object(self.flavor_mgr, 'get', return_value=flavor):
# invocation
transformer = mgmtmodels.NovaNotificationTransformer(
context=self.context)
payloads = transformer()
# assertion that SHUTDOWN instances are not reported
self.assertIsNotNone(payloads)
self.assertThat(len(payloads), Equals(0))
self.assertNotIn(status.lower(),
[db['status']
for db in payloads])
self.addCleanup(self.do_cleanup, instance, service_status)
def test_tranformer_flavor_cache(self):
status = rd_instance.ServiceStatuses.BUILDING.api_status
db_instance = MockMgmtInstanceTest.build_db_instance(
instance, service_status = self.build_db_instance(
status, InstanceTasks.BUILDING)
server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
db_instance,
instance,
server,
None)
service_status)
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
transformer = mgmtmodels.NovaNotificationTransformer(
context=self.context)
with patch.object(mgmtmodels, 'load_mgmt_instances',
return_value=[mgmt_instance]):
with patch.object(self.flavor_mgr, 'get', return_value=flavor):
transformer = mgmtmodels.NovaNotificationTransformer(
context=self.context)
transformer()
# call twice ensure client.flavor invoked once
payloads = transformer()
self.assertIsNotNone(payloads)
self.assertThat(len(payloads), Equals(1))
@ -344,27 +337,34 @@ class TestNovaNotificationTransformer(MockMgmtInstanceTest):
self.assertThat(payload['audit_period_beginning'],
Not(Is(None)))
self.assertThat(payload['audit_period_ending'], Not(Is(None)))
self.assertThat(payload['state'], Equals(status.lower()))
self.assertIn(status.lower(),
[db['state']
for db in payloads])
self.assertThat(payload['instance_type'], Equals('db.small'))
self.assertThat(payload['instance_type_id'],
Equals('flavor_1'))
self.assertThat(payload['user_id'], Equals('test_user_id'))
# ensure cache was used to get flavor second time
self.flavor_mgr.get.assert_any_call('flavor_1')
self.addCleanup(self.do_cleanup, instance, service_status)
class TestMgmtInstanceTasks(MockMgmtInstanceTest):
@classmethod
def setUpClass(cls):
super(TestMgmtInstanceTasks, cls).setUpClass()
def test_public_exists_events(self):
status = rd_instance.ServiceStatuses.BUILDING.api_status
db_instance = MockMgmtInstanceTest.build_db_instance(
instance, service_status = self.build_db_instance(
status, task_status=InstanceTasks.BUILDING)
server = MagicMock(spec=Server)
server.user_id = 'test_user_id'
mgmt_instance = mgmtmodels.SimpleMgmtInstance(self.context,
db_instance,
instance,
server,
None)
service_status)
flavor = MagicMock(spec=Flavor)
flavor.name = 'db.small'
@ -387,3 +387,4 @@ class TestMgmtInstanceTasks(MockMgmtInstanceTest):
'INFO',
ANY)
self.assertThat(self.context.auth_token, Is(None))
self.addCleanup(self.do_cleanup, instance, service_status)