Adding the start of notifications

* Adds a nofity mixin for the taskmanager models
* Adding a usage verifier test
* Adding tests for usage notifications
* Adding extra parameters to match Nova notifications

Implements: blueprint reddwarf-notifications
Change-Id: I61193c238fac6e14d360f3e17baeb5302aa7d720
This commit is contained in:
Robert Myers 2013-04-09 08:32:01 -05:00
parent c6b2d78e82
commit 4a06e08c05
10 changed files with 346 additions and 26 deletions

View File

@ -63,6 +63,9 @@ agent_call_high_timeout = 150
# Whether to use nova's contrib api for create server with volume
use_nova_server_volume = False
# usage notifications
notification_driver = reddwarf.tests.util.usage
# ============ notifer queue kombu connection options ========================
notifier_queue_hostname = localhost

View File

@ -94,6 +94,9 @@ dns_time_out = 120
resize_time_out = 120
revert_time_out = 120
# usage notifications
notification_driver = reddwarf.tests.util.usage
# ============ notifer queue kombu connection options ========================
notifier_queue_hostname = localhost

View File

@ -114,6 +114,12 @@ common_opts = [
cfg.BoolOpt('reddwarf_security_groups_support', default=True),
cfg.StrOpt('reddwarf_security_group_rule_protocol', default='tcp'),
cfg.IntOpt('reddwarf_security_group_rule_port', default=3306),
cfg.IntOpt('usage_sleep_time', default=1,
help="Time to sleep during the check active guest"),
cfg.IntOpt('usage_timeout', default=300,
help="Timeout to wait for an guest to become active"),
cfg.StrOpt('region', default='LOCAL_DEV',
help="The region this service is located.")
]

View File

@ -43,6 +43,8 @@ from reddwarf.instance.tasks import InstanceTasks
from reddwarf.instance.views import get_ip_address
from reddwarf.openstack.common import log as logging
from reddwarf.openstack.common.gettextutils import _
from reddwarf.openstack.common.notifier import api as notifier
from reddwarf.openstack.common import timeutils
LOG = logging.getLogger(__name__)
@ -51,11 +53,64 @@ VOLUME_TIME_OUT = CONF.volume_time_out # seconds.
DNS_TIME_OUT = CONF.dns_time_out # seconds.
RESIZE_TIME_OUT = CONF.resize_time_out # seconds.
REVERT_TIME_OUT = CONF.revert_time_out # seconds.
USAGE_SLEEP_TIME = CONF.usage_sleep_time # seconds.
USAGE_TIMEOUT = CONF.usage_timeout # seconds.
use_nova_server_volume = CONF.use_nova_server_volume
class FreshInstanceTasks(FreshInstance):
class NotifyMixin(object):
"""Notification Mixin
This adds the ability to send usage events to an Instance object.
"""
def send_usage_event(self, event_type, **kwargs):
event_type = 'reddwarf.instance.%s' % event_type
publisher_id = CONF.host
# Grab the instance size from the kwargs or from the nova client
instance_size = kwargs.pop('instance_size', None)
flavor = self.nova_client.flavors.get(self.flavor_id)
server = kwargs.pop('server', None)
if server is None:
server = self.nova_client.servers.get(self.server_id)
az = getattr(server, 'OS-EXT-AZ:availability_zone', None)
# Default payload
created_time = timeutils.isotime(self.db_info.created)
payload = {
'availability_zone': az,
'created_at': created_time,
'display_name': self.name,
'instance_id': self.id,
'instance_name': self.name,
'instance_size': instance_size or flavor.ram,
'instance_type': flavor.name,
'instance_type_id': flavor.id,
'launched_at': created_time,
'nova_instance_id': self.server_id,
'region': CONF.region,
'state_description': self.status,
'state': self.status,
'tenant_id': self.tenant_id,
'user_id': self.context.user,
}
if CONF.reddwarf_volume_support:
payload.update({
'volume_size': self.volume_size,
'nova_volume_id': self.volume_id
})
# Update payload with all other kwargs
payload.update(kwargs)
LOG.debug('Sending event: %s, %s' % (event_type, payload))
notifier.notify(self.context, publisher_id, event_type, 'INFO',
payload)
class FreshInstanceTasks(FreshInstance, NotifyMixin):
def create_instance(self, flavor_id, flavor_ram, image_id,
databases, users, service_type, volume_size,
@ -88,6 +143,46 @@ class FreshInstanceTasks(FreshInstance):
if not self.db_info.task_status.is_error:
self.update_db(task_status=inst_models.InstanceTasks.NONE)
# Make sure the service becomes active before sending a usage
# record to avoid over billing a customer for an instance that
# fails to build properly.
try:
utils.poll_until(self._service_is_active,
sleep_time=USAGE_SLEEP_TIME,
time_out=USAGE_TIMEOUT)
self.send_usage_event('create', instance_size=flavor_ram)
except PollTimeOut:
LOG.error("Timeout for service changing to active. "
"No usage create-event sent.")
except Exception:
LOG.exception("Error during create-event call.")
def _service_is_active(self):
"""
Check that the database guest is active.
This function is meant to be called with poll_until to check that
the guest is alive before sending a 'create' message. This prevents
over billing a customer for a instance that they can never use.
Returns: boolean if the service is active.
Raises: ReddwarfError if the service is in a failure state.
"""
service = InstanceServiceStatus.find_by(instance_id=self.id)
status = service.get_status()
if status == ServiceStatuses.RUNNING:
return True
elif status not in [ServiceStatuses.NEW,
ServiceStatuses.BUILDING]:
raise ReddwarfError("Service not active, status: %s" % status)
c_id = self.db_info.compute_instance_id
nova_status = self.nova_client.servers.get(c_id).status
if nova_status in [InstanceStatus.ERROR,
InstanceStatus.FAILED]:
raise ReddwarfError("Server not active, status: %s" % nova_status)
return False
def _create_server_volume(self, flavor_id, image_id, security_groups,
service_type, volume_size):
server = None
@ -279,7 +374,7 @@ class FreshInstanceTasks(FreshInstance):
(greenthread.getcurrent(), self.id))
class BuiltInstanceTasks(BuiltInstance):
class BuiltInstanceTasks(BuiltInstance, NotifyMixin):
"""
Performs the various asynchronous instance related tasks.
"""
@ -293,6 +388,8 @@ class BuiltInstanceTasks(BuiltInstance):
return mountpoint
def _delete_resources(self):
server_id = self.db_info.compute_instance_id
old_server = self.nova_client.servers.get(server_id)
try:
self.server.delete()
except Exception as ex:
@ -313,7 +410,6 @@ class BuiltInstanceTasks(BuiltInstance):
def server_is_finished():
try:
server_id = self.db_info.compute_instance_id
server = self.nova_client.servers.get(server_id)
if server.status not in ['SHUTDOWN', 'ACTIVE']:
msg = "Server %s got into ERROR status during delete " \
@ -325,11 +421,16 @@ class BuiltInstanceTasks(BuiltInstance):
poll_until(server_is_finished, sleep_time=2,
time_out=CONF.server_delete_time_out)
self.send_usage_event('delete', deleted_at=timeutils.isotime(),
server=old_server)
def resize_volume(self, new_size):
LOG.debug("%s: Resizing volume for instance: %s to %r GB"
% (greenthread.getcurrent(), self.server.id, new_size))
self.volume_client.volumes.resize(self.volume_id, int(new_size))
old_volume_size = self.volume_size
new_size = int(new_size)
LOG.debug("%s: Resizing volume for instance: %s from %s to %r GB"
% (greenthread.getcurrent(), self.server.id,
old_volume_size, new_size))
self.volume_client.volumes.resize(self.volume_id, new_size)
try:
utils.poll_until(
lambda: self.volume_client.volumes.get(self.volume_id),
@ -340,6 +441,11 @@ class BuiltInstanceTasks(BuiltInstance):
self.update_db(volume_size=volume.size)
self.nova_client.volumes.rescan_server_volume(self.server,
self.volume_id)
self.send_usage_event('modify_volume',
old_volume_size=old_volume_size,
launched_at=timeutils.isotime(),
modify_at=timeutils.isotime(),
volume_size=new_size)
except PollTimeOut as pto:
LOG.error("Timeout trying to rescan or resize the attached volume "
"filesystem for volume: %s" % self.volume_id)
@ -353,7 +459,8 @@ class BuiltInstanceTasks(BuiltInstance):
def resize_flavor(self, new_flavor_id, old_memory_size,
new_memory_size):
action = ResizeAction(self, new_flavor_id, new_memory_size)
action = ResizeAction(self, new_flavor_id,
new_memory_size, old_memory_size)
action.execute()
def migrate(self):
@ -554,8 +661,10 @@ class ResizeActionBase(object):
class ResizeAction(ResizeActionBase):
def __init__(self, instance, new_flavor_id=None, new_memory_size=None):
def __init__(self, instance, new_flavor_id=None,
new_memory_size=None, old_memory_size=None):
self.instance = instance
self.old_memory_size = old_memory_size
self.new_flavor_id = new_flavor_id
self.new_memory_size = new_memory_size
@ -573,6 +682,11 @@ class ResizeAction(ResizeActionBase):
LOG.debug("Updating instance %s to flavor_id %s."
% (self.instance.id, self.new_flavor_id))
self.instance.update_db(flavor_id=self.new_flavor_id)
self.instance.send_usage_event('modify_flavor',
old_instance_size=self.old_memory_size,
instance_size=self.new_memory_size,
launched_at=timeutils.isotime(),
modify_at=timeutils.isotime())
def _start_mysql(self):
self.instance.guest.start_db_with_conf_changes(self.new_memory_size)

View File

@ -34,9 +34,6 @@ GROUP_DATABASES = "dbaas.api.databases"
GROUP_SECURITY_GROUPS = "dbaas.api.security_groups"
from datetime import datetime
from nose.plugins.skip import SkipTest
from nose.tools import assert_true
from time import sleep
from reddwarf.common import exception as rd_exceptions
@ -46,23 +43,22 @@ from proboscis.decorators import time_out
from proboscis import before_class
from proboscis import after_class
from proboscis import test
from proboscis import SkipTest
from proboscis.asserts import assert_equal
from proboscis.asserts import assert_false
from proboscis.asserts import assert_not_equal
from proboscis.asserts import assert_raises
from proboscis.asserts import assert_is
from proboscis.asserts import assert_is_none
from proboscis.asserts import assert_is_not_none
from proboscis.asserts import assert_is_not
from proboscis.asserts import assert_true
from proboscis.asserts import Check
from proboscis.asserts import fail
from reddwarf.openstack.common import timeutils
from reddwarf import tests
from reddwarf.tests.config import CONFIG
from reddwarf.tests.util import create_client
from reddwarf.tests.util import create_dbaas_client
from reddwarf.tests.util import create_nova_client
from reddwarf.tests.util.usage import create_usage_verifier
from reddwarf.tests.util import iso_time
from reddwarf.tests.util import process
from reddwarf.tests.util.users import Requirements
from reddwarf.tests.util import skip_if_xml
@ -102,6 +98,7 @@ class InstanceTestInfo(object):
self.host_info = None # Host Info before creating instances
self.user_context = None # A regular user context
self.users = None # The users created on the instance.
self.consumer = create_usage_verifier()
def get_address(self):
result = self.dbaas_admin.mgmt.instances.show(self.id)
@ -133,6 +130,11 @@ def create_new_instance():
return existing_instance() is None
@test(groups=['dbaas.usage', 'dbaas.usage.init'])
def clear_messages_off_queue():
instance_info.consumer.clear_events()
@test(groups=[GROUP, GROUP_START, GROUP_START_SIMPLE, 'dbaas.setup'],
depends_on_groups=["services.initialize"])
class InstanceSetup(object):
@ -290,7 +292,6 @@ class CreateInstanceQuotaTest(unittest.TestCase):
self.test_info.volume = {'size': volume_quota + 1}
new_quotas = dbaas_admin.quota.update(self.test_info.user.tenant_id,
quota_dict)
assert_equal(volume_quota, new_quotas['volumes'])
self.test_info.name = "too_large_volume"
@ -876,6 +877,30 @@ class TestInstanceListing(object):
check.volume_mgmt()
@test(depends_on_classes=[WaitForGuestInstallationToFinish],
groups=[GROUP, 'dbaas.usage'])
class TestCreateNotification(object):
"""
Test that the create notification has been sent correctly.
"""
@test
def test_create_notification(self):
expected = {
'instance_size': instance_info.dbaas_flavor.ram,
'tenant_id': instance_info.user.tenant_id,
'instance_id': instance_info.id,
'instance_name': instance_info.name,
'created_at': iso_time(instance_info.initial_result.created),
'launched_at': iso_time(instance_info.initial_result.created),
'region': 'LOCAL_DEV',
'availability_zone': 'nova',
}
instance_info.consumer.check_message(instance_info.id,
'reddwarf.instance.create',
**expected)
@test(depends_on_groups=['dbaas.api.instances.actions'],
groups=[GROUP, tests.INSTANCES, "dbaas.diagnostics"])
class CheckDiagnosticsAfterTests(object):
@ -916,6 +941,7 @@ class DeleteInstance(object):
# Update the report so the logs inside the instance will be saved.
CONFIG.get_report().update()
dbaas.instances.delete(instance_info.id)
instance_info.deleted_at = timeutils.utcnow().isoformat()
attempts = 0
try:
@ -950,6 +976,26 @@ class DeleteInstance(object):
# entries are deleted.
@test(depends_on=[WaitForGuestInstallationToFinish],
runs_after=[DeleteInstance],
groups=[GROUP, GROUP_STOP, 'dbaas.usage'])
class AfterDeleteChecks(object):
@test
def test_instance_delete_event_sent(self):
expected = {
'instance_size': instance_info.dbaas_flavor.ram,
'tenant_id': instance_info.user.tenant_id,
'instance_id': instance_info.id,
'instance_name': instance_info.name,
'created_at': iso_time(instance_info.initial_result.created),
'launched_at': iso_time(instance_info.initial_result.created),
'deleted_at': iso_time(instance_info.deleted_at),
}
instance_info.consumer.check_message(instance_info.id,
'reddwarf.instance.delete',
**expected)
@test(depends_on_classes=[CreateInstance, VerifyGuestStarted,
WaitForGuestInstallationToFinish],
groups=[GROUP, GROUP_START, GROUP_START_SIMPLE],

View File

@ -35,6 +35,7 @@ from reddwarf.tests.util.server_connection import create_server_connection
from reddwarf.tests.util import poll_until
from reddwarf.tests.config import CONFIG
from reddwarf.tests.util import LocalSqlClient
from reddwarf.tests.util import iso_time
from sqlalchemy import create_engine
from sqlalchemy import exc as sqlalchemy_exc
from reddwarf.tests.util.check import TypeCheck
@ -439,8 +440,6 @@ class ResizeInstanceTest(ActionTestBase):
def test_status_changed_to_resize(self):
self.log_current_users()
self.obtain_flavor_ids()
if CONFIG.simulate_events:
raise SkipTest("Cannot simulate this test.")
self.dbaas.instances.resize_instance(
self.instance_id,
self.get_flavor_href(flavor_id=self.expected_new_flavor_id))
@ -457,7 +456,27 @@ class ResizeInstanceTest(ActionTestBase):
def test_instance_returns_to_active_after_resize(self):
self.wait_for_resize()
@test(depends_on=[test_instance_returns_to_active_after_resize])
@test(depends_on=[test_instance_returns_to_active_after_resize,
test_status_changed_to_resize],
groups=["dbaas.usage"])
def test_resize_instance_usage_event_sent(self):
expected = {
'old_instance_size': self.old_dbaas_flavor.ram,
'instance_size': instance_info.dbaas_flavor.ram,
'tenant_id': instance_info.user.tenant_id,
'instance_id': instance_info.id,
'instance_name': instance_info.name,
'created_at': iso_time(instance_info.initial_result.created),
'launched_at': iso_time(self.instance.updated),
'modify_at': iso_time(self.instance.updated),
}
instance_info.consumer.check_message(instance_info.id,
'reddwarf.instance.modify_flavor',
**expected)
@test(depends_on=[test_instance_returns_to_active_after_resize],
runs_after=[test_resize_instance_usage_event_sent])
def resize_should_not_delete_users(self):
"""Resize should not delete users."""
# Resize has an incredibly weird bug where users are deleted after
@ -477,15 +496,11 @@ class ResizeInstanceTest(ActionTestBase):
@test(depends_on=[test_instance_returns_to_active_after_resize],
runs_after=[resize_should_not_delete_users])
def test_make_sure_mysql_is_running_after_resize(self):
if CONFIG.simulate_events:
raise SkipTest("Cannot simulate this test.")
self.ensure_mysql_is_running()
@test(depends_on=[test_instance_returns_to_active_after_resize],
runs_after=[test_make_sure_mysql_is_running_after_resize])
def test_instance_has_new_flavor_after_resize(self):
if CONFIG.simulate_events:
raise SkipTest("Cannot simulate this test.")
actual = self.get_flavor_href(self.instance.flavor['id'])
expected = self.get_flavor_href(flavor_id=self.expected_new_flavor_id)
assert_equal(actual, expected)
@ -493,8 +508,6 @@ class ResizeInstanceTest(ActionTestBase):
@test(depends_on=[test_instance_has_new_flavor_after_resize])
@time_out(TIME_OUT_TIME)
def test_resize_down(self):
if CONFIG.simulate_events:
raise SkipTest("Cannot simulate this test.")
expected_dbaas_flavor = self.expected_dbaas_flavor
self.dbaas.instances.resize_instance(
self.instance_id,
@ -506,6 +519,23 @@ class ResizeInstanceTest(ActionTestBase):
assert_equal(str(self.instance.flavor['id']),
str(self.expected_old_flavor_id))
@test(depends_on=[test_resize_down],
groups=["dbaas.usage"])
def test_resize_instance_down_usage_event_sent(self):
expected = {
'old_instance_size': self.old_dbaas_flavor.ram,
'instance_size': instance_info.dbaas_flavor.ram,
'tenant_id': instance_info.user.tenant_id,
'instance_id': instance_info.id,
'instance_name': instance_info.name,
'created_at': iso_time(instance_info.initial_result.created),
'launched_at': iso_time(self.instance.updated),
'modify_at': iso_time(self.instance.updated),
}
instance_info.consumer.check_message(instance_info.id,
'reddwarf.instance.modify_flavor',
**expected)
@test(groups=[tests.INSTANCES, INSTANCE_GROUP, GROUP,
GROUP + ".resize.instance"],
@ -557,6 +587,24 @@ class ResizeInstanceVolume(object):
instance = instance_info.dbaas.instances.get(instance_info.id)
assert_equal(instance.volume['size'], self.new_volume_size)
@test(depends_on=[test_volume_resize_success], groups=["dbaas.usage"])
def test_resize_volume_usage_event_sent(self):
instance = instance_info.dbaas.instances.get(instance_info.id)
expected = {
'old_volume_size': self.old_volume_size,
'instance_size': instance_info.dbaas_flavor.ram,
'tenant_id': instance_info.user.tenant_id,
'instance_id': instance_info.id,
'instance_name': instance_info.name,
'created_at': iso_time(instance_info.initial_result.created),
'launched_at': iso_time(instance.updated),
'modify_at': iso_time(instance.updated),
'volume_size': self.new_volume_size,
}
instance_info.consumer.check_message(instance_info.id,
'reddwarf.instance.modify_volume',
**expected)
@test
@time_out(300)
def test_volume_resize_success_databases(self):

View File

@ -47,6 +47,10 @@ class FrozenDict(Mapping):
return self.original.__str__()
USAGE_ENDPOINT = os.environ.get("USAGE_ENDPOINT",
"reddwarf.tests.util.usage.UsageVerifier")
class TestConfig(object):
"""
Holds test configuration values which can be accessed as attributes
@ -81,6 +85,7 @@ class TestConfig(object):
"simulate_events": False,
"reddwarf_volume_support": True,
"reddwarf_max_volumes_per_user": 100,
"usage_endpoint": USAGE_ENDPOINT,
}
self._frozen_values = FrozenDict(self._values)
self._users = None

View File

@ -118,6 +118,7 @@ class FakeServer(object):
volume.set_attachment(id)
self.host = FAKE_HOSTS[0]
self.old_host = None
setattr(self, 'OS-EXT-AZ:availability_zone', 'nova')
self._info = {'os:volumes': info_vols}

View File

@ -221,6 +221,17 @@ def unquote_user_host(user_hostname):
return user, host
def iso_time(time_string):
"""Return a iso formated datetime: 2013-04-15T19:50:23Z"""
ts = time_string.replace(' ', 'T')
try:
micro = ts.rindex('.')
ts = ts[:micro]
except ValueError:
pass
return '%sZ' % ts
if CONFIG.simulate_events:
# Without event let, this just calls time.sleep.
def poll_until(retriever, condition=lambda value: value,

View File

@ -0,0 +1,83 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import defaultdict
from proboscis.asserts import Check
from proboscis.asserts import assert_equal
from proboscis.asserts import assert_not_equal
import unittest
from reddwarf.common import utils
from reddwarf.tests.config import CONFIG
from proboscis.dependencies import SkipTest
MESSAGE_QUEUE = defaultdict(list)
def create_usage_verifier():
return utils.import_object(CONFIG.usage_endpoint)
class UsageVerifier(object):
def clear_events(self):
"""Hook that is called to allow endpoints to clean up."""
pass
def check_message(self, resource_id, event_type, **attrs):
messages = self.get_messages(resource_id)
print messages, resource_id
found = None
for message in messages:
if message['event_type'] == event_type:
found = message
assert_not_equal(found, None)
with Check() as check:
for key, value in attrs.iteritems():
check.equal(found[key], value)
def get_messages(self, resource_id, expected_messages=None):
global MESSAGE_QUEUE
import pprint
pprint.pprint(MESSAGE_QUEUE.items())
msgs = MESSAGE_QUEUE.get(resource_id, [])
if expected_messages is not None:
assert_equal(len(msgs), expected_messages)
return msgs
class FakeVerifier(object):
"""This is the default handler in fake mode, it is basically a no-op."""
def clear_event(self):
pass
def check_message(self, *args, **kwargs):
raise SkipTest("Notifications not available")
def get_messages(self, *args, **kwargs):
pass
def notify(context, message):
"""Simple test notify function which saves the messages to global list."""
print('Received Usage Notification: %s' % message)
payload = message.get('payload', None)
payload['event_type'] = message['event_type']
resource_id = payload['instance_id']
global MESSAGE_QUEUE
MESSAGE_QUEUE[resource_id].append(payload)