From 4a06e08c051f0ba05206295de6f9f795f461af4e Mon Sep 17 00:00:00 2001 From: Robert Myers Date: Tue, 9 Apr 2013 08:32:01 -0500 Subject: [PATCH] Adding the start of notifications * Adds a nofity mixin for the taskmanager models * Adding a usage verifier test * Adding tests for usage notifications * Adding extra parameters to match Nova notifications Implements: blueprint reddwarf-notifications Change-Id: I61193c238fac6e14d360f3e17baeb5302aa7d720 --- etc/reddwarf/reddwarf-taskmanager.conf.sample | 3 + etc/reddwarf/reddwarf.conf.test | 3 + reddwarf/common/cfg.py | 6 + reddwarf/taskmanager/models.py | 130 ++++++++++++++++-- reddwarf/tests/api/instances.py | 64 +++++++-- reddwarf/tests/api/instances_actions.py | 66 +++++++-- reddwarf/tests/config.py | 5 + reddwarf/tests/fakes/nova.py | 1 + reddwarf/tests/util/__init__.py | 11 ++ reddwarf/tests/util/usage.py | 83 +++++++++++ 10 files changed, 346 insertions(+), 26 deletions(-) create mode 100644 reddwarf/tests/util/usage.py diff --git a/etc/reddwarf/reddwarf-taskmanager.conf.sample b/etc/reddwarf/reddwarf-taskmanager.conf.sample index 1f73665780..b9d858915e 100644 --- a/etc/reddwarf/reddwarf-taskmanager.conf.sample +++ b/etc/reddwarf/reddwarf-taskmanager.conf.sample @@ -63,6 +63,9 @@ agent_call_high_timeout = 150 # Whether to use nova's contrib api for create server with volume use_nova_server_volume = False +# usage notifications +notification_driver = reddwarf.tests.util.usage + # ============ notifer queue kombu connection options ======================== notifier_queue_hostname = localhost diff --git a/etc/reddwarf/reddwarf.conf.test b/etc/reddwarf/reddwarf.conf.test index 3365a47c0d..d105c48d61 100644 --- a/etc/reddwarf/reddwarf.conf.test +++ b/etc/reddwarf/reddwarf.conf.test @@ -94,6 +94,9 @@ dns_time_out = 120 resize_time_out = 120 revert_time_out = 120 +# usage notifications +notification_driver = reddwarf.tests.util.usage + # ============ notifer queue kombu connection options ======================== notifier_queue_hostname = localhost diff --git a/reddwarf/common/cfg.py b/reddwarf/common/cfg.py index ec709b19e8..25a0752619 100644 --- a/reddwarf/common/cfg.py +++ b/reddwarf/common/cfg.py @@ -114,6 +114,12 @@ common_opts = [ cfg.BoolOpt('reddwarf_security_groups_support', default=True), cfg.StrOpt('reddwarf_security_group_rule_protocol', default='tcp'), cfg.IntOpt('reddwarf_security_group_rule_port', default=3306), + cfg.IntOpt('usage_sleep_time', default=1, + help="Time to sleep during the check active guest"), + cfg.IntOpt('usage_timeout', default=300, + help="Timeout to wait for an guest to become active"), + cfg.StrOpt('region', default='LOCAL_DEV', + help="The region this service is located.") ] diff --git a/reddwarf/taskmanager/models.py b/reddwarf/taskmanager/models.py index 3b374bbd72..43656c8951 100644 --- a/reddwarf/taskmanager/models.py +++ b/reddwarf/taskmanager/models.py @@ -43,6 +43,8 @@ from reddwarf.instance.tasks import InstanceTasks from reddwarf.instance.views import get_ip_address from reddwarf.openstack.common import log as logging from reddwarf.openstack.common.gettextutils import _ +from reddwarf.openstack.common.notifier import api as notifier +from reddwarf.openstack.common import timeutils LOG = logging.getLogger(__name__) @@ -51,11 +53,64 @@ VOLUME_TIME_OUT = CONF.volume_time_out # seconds. DNS_TIME_OUT = CONF.dns_time_out # seconds. RESIZE_TIME_OUT = CONF.resize_time_out # seconds. REVERT_TIME_OUT = CONF.revert_time_out # seconds. +USAGE_SLEEP_TIME = CONF.usage_sleep_time # seconds. +USAGE_TIMEOUT = CONF.usage_timeout # seconds. use_nova_server_volume = CONF.use_nova_server_volume -class FreshInstanceTasks(FreshInstance): +class NotifyMixin(object): + """Notification Mixin + + This adds the ability to send usage events to an Instance object. + """ + + def send_usage_event(self, event_type, **kwargs): + event_type = 'reddwarf.instance.%s' % event_type + publisher_id = CONF.host + + # Grab the instance size from the kwargs or from the nova client + instance_size = kwargs.pop('instance_size', None) + flavor = self.nova_client.flavors.get(self.flavor_id) + server = kwargs.pop('server', None) + if server is None: + server = self.nova_client.servers.get(self.server_id) + az = getattr(server, 'OS-EXT-AZ:availability_zone', None) + + # Default payload + created_time = timeutils.isotime(self.db_info.created) + payload = { + 'availability_zone': az, + 'created_at': created_time, + 'display_name': self.name, + 'instance_id': self.id, + 'instance_name': self.name, + 'instance_size': instance_size or flavor.ram, + 'instance_type': flavor.name, + 'instance_type_id': flavor.id, + 'launched_at': created_time, + 'nova_instance_id': self.server_id, + 'region': CONF.region, + 'state_description': self.status, + 'state': self.status, + 'tenant_id': self.tenant_id, + 'user_id': self.context.user, + } + + if CONF.reddwarf_volume_support: + payload.update({ + 'volume_size': self.volume_size, + 'nova_volume_id': self.volume_id + }) + + # Update payload with all other kwargs + payload.update(kwargs) + LOG.debug('Sending event: %s, %s' % (event_type, payload)) + notifier.notify(self.context, publisher_id, event_type, 'INFO', + payload) + + +class FreshInstanceTasks(FreshInstance, NotifyMixin): def create_instance(self, flavor_id, flavor_ram, image_id, databases, users, service_type, volume_size, @@ -88,6 +143,46 @@ class FreshInstanceTasks(FreshInstance): if not self.db_info.task_status.is_error: self.update_db(task_status=inst_models.InstanceTasks.NONE) + # Make sure the service becomes active before sending a usage + # record to avoid over billing a customer for an instance that + # fails to build properly. + try: + utils.poll_until(self._service_is_active, + sleep_time=USAGE_SLEEP_TIME, + time_out=USAGE_TIMEOUT) + self.send_usage_event('create', instance_size=flavor_ram) + except PollTimeOut: + LOG.error("Timeout for service changing to active. " + "No usage create-event sent.") + except Exception: + LOG.exception("Error during create-event call.") + + def _service_is_active(self): + """ + Check that the database guest is active. + + This function is meant to be called with poll_until to check that + the guest is alive before sending a 'create' message. This prevents + over billing a customer for a instance that they can never use. + + Returns: boolean if the service is active. + Raises: ReddwarfError if the service is in a failure state. + """ + service = InstanceServiceStatus.find_by(instance_id=self.id) + status = service.get_status() + if status == ServiceStatuses.RUNNING: + return True + elif status not in [ServiceStatuses.NEW, + ServiceStatuses.BUILDING]: + raise ReddwarfError("Service not active, status: %s" % status) + + c_id = self.db_info.compute_instance_id + nova_status = self.nova_client.servers.get(c_id).status + if nova_status in [InstanceStatus.ERROR, + InstanceStatus.FAILED]: + raise ReddwarfError("Server not active, status: %s" % nova_status) + return False + def _create_server_volume(self, flavor_id, image_id, security_groups, service_type, volume_size): server = None @@ -279,7 +374,7 @@ class FreshInstanceTasks(FreshInstance): (greenthread.getcurrent(), self.id)) -class BuiltInstanceTasks(BuiltInstance): +class BuiltInstanceTasks(BuiltInstance, NotifyMixin): """ Performs the various asynchronous instance related tasks. """ @@ -293,6 +388,8 @@ class BuiltInstanceTasks(BuiltInstance): return mountpoint def _delete_resources(self): + server_id = self.db_info.compute_instance_id + old_server = self.nova_client.servers.get(server_id) try: self.server.delete() except Exception as ex: @@ -313,7 +410,6 @@ class BuiltInstanceTasks(BuiltInstance): def server_is_finished(): try: - server_id = self.db_info.compute_instance_id server = self.nova_client.servers.get(server_id) if server.status not in ['SHUTDOWN', 'ACTIVE']: msg = "Server %s got into ERROR status during delete " \ @@ -325,11 +421,16 @@ class BuiltInstanceTasks(BuiltInstance): poll_until(server_is_finished, sleep_time=2, time_out=CONF.server_delete_time_out) + self.send_usage_event('delete', deleted_at=timeutils.isotime(), + server=old_server) def resize_volume(self, new_size): - LOG.debug("%s: Resizing volume for instance: %s to %r GB" - % (greenthread.getcurrent(), self.server.id, new_size)) - self.volume_client.volumes.resize(self.volume_id, int(new_size)) + old_volume_size = self.volume_size + new_size = int(new_size) + LOG.debug("%s: Resizing volume for instance: %s from %s to %r GB" + % (greenthread.getcurrent(), self.server.id, + old_volume_size, new_size)) + self.volume_client.volumes.resize(self.volume_id, new_size) try: utils.poll_until( lambda: self.volume_client.volumes.get(self.volume_id), @@ -340,6 +441,11 @@ class BuiltInstanceTasks(BuiltInstance): self.update_db(volume_size=volume.size) self.nova_client.volumes.rescan_server_volume(self.server, self.volume_id) + self.send_usage_event('modify_volume', + old_volume_size=old_volume_size, + launched_at=timeutils.isotime(), + modify_at=timeutils.isotime(), + volume_size=new_size) except PollTimeOut as pto: LOG.error("Timeout trying to rescan or resize the attached volume " "filesystem for volume: %s" % self.volume_id) @@ -353,7 +459,8 @@ class BuiltInstanceTasks(BuiltInstance): def resize_flavor(self, new_flavor_id, old_memory_size, new_memory_size): - action = ResizeAction(self, new_flavor_id, new_memory_size) + action = ResizeAction(self, new_flavor_id, + new_memory_size, old_memory_size) action.execute() def migrate(self): @@ -554,8 +661,10 @@ class ResizeActionBase(object): class ResizeAction(ResizeActionBase): - def __init__(self, instance, new_flavor_id=None, new_memory_size=None): + def __init__(self, instance, new_flavor_id=None, + new_memory_size=None, old_memory_size=None): self.instance = instance + self.old_memory_size = old_memory_size self.new_flavor_id = new_flavor_id self.new_memory_size = new_memory_size @@ -573,6 +682,11 @@ class ResizeAction(ResizeActionBase): LOG.debug("Updating instance %s to flavor_id %s." % (self.instance.id, self.new_flavor_id)) self.instance.update_db(flavor_id=self.new_flavor_id) + self.instance.send_usage_event('modify_flavor', + old_instance_size=self.old_memory_size, + instance_size=self.new_memory_size, + launched_at=timeutils.isotime(), + modify_at=timeutils.isotime()) def _start_mysql(self): self.instance.guest.start_db_with_conf_changes(self.new_memory_size) diff --git a/reddwarf/tests/api/instances.py b/reddwarf/tests/api/instances.py index e6ed957a6f..4ca34fb40b 100644 --- a/reddwarf/tests/api/instances.py +++ b/reddwarf/tests/api/instances.py @@ -34,9 +34,6 @@ GROUP_DATABASES = "dbaas.api.databases" GROUP_SECURITY_GROUPS = "dbaas.api.security_groups" from datetime import datetime -from nose.plugins.skip import SkipTest -from nose.tools import assert_true - from time import sleep from reddwarf.common import exception as rd_exceptions @@ -46,23 +43,22 @@ from proboscis.decorators import time_out from proboscis import before_class from proboscis import after_class from proboscis import test +from proboscis import SkipTest from proboscis.asserts import assert_equal from proboscis.asserts import assert_false from proboscis.asserts import assert_not_equal from proboscis.asserts import assert_raises -from proboscis.asserts import assert_is -from proboscis.asserts import assert_is_none from proboscis.asserts import assert_is_not_none -from proboscis.asserts import assert_is_not from proboscis.asserts import assert_true -from proboscis.asserts import Check from proboscis.asserts import fail +from reddwarf.openstack.common import timeutils from reddwarf import tests from reddwarf.tests.config import CONFIG -from reddwarf.tests.util import create_client from reddwarf.tests.util import create_dbaas_client from reddwarf.tests.util import create_nova_client +from reddwarf.tests.util.usage import create_usage_verifier +from reddwarf.tests.util import iso_time from reddwarf.tests.util import process from reddwarf.tests.util.users import Requirements from reddwarf.tests.util import skip_if_xml @@ -102,6 +98,7 @@ class InstanceTestInfo(object): self.host_info = None # Host Info before creating instances self.user_context = None # A regular user context self.users = None # The users created on the instance. + self.consumer = create_usage_verifier() def get_address(self): result = self.dbaas_admin.mgmt.instances.show(self.id) @@ -133,6 +130,11 @@ def create_new_instance(): return existing_instance() is None +@test(groups=['dbaas.usage', 'dbaas.usage.init']) +def clear_messages_off_queue(): + instance_info.consumer.clear_events() + + @test(groups=[GROUP, GROUP_START, GROUP_START_SIMPLE, 'dbaas.setup'], depends_on_groups=["services.initialize"]) class InstanceSetup(object): @@ -290,7 +292,6 @@ class CreateInstanceQuotaTest(unittest.TestCase): self.test_info.volume = {'size': volume_quota + 1} new_quotas = dbaas_admin.quota.update(self.test_info.user.tenant_id, quota_dict) - assert_equal(volume_quota, new_quotas['volumes']) self.test_info.name = "too_large_volume" @@ -876,6 +877,30 @@ class TestInstanceListing(object): check.volume_mgmt() +@test(depends_on_classes=[WaitForGuestInstallationToFinish], + groups=[GROUP, 'dbaas.usage']) +class TestCreateNotification(object): + """ + Test that the create notification has been sent correctly. + """ + + @test + def test_create_notification(self): + expected = { + 'instance_size': instance_info.dbaas_flavor.ram, + 'tenant_id': instance_info.user.tenant_id, + 'instance_id': instance_info.id, + 'instance_name': instance_info.name, + 'created_at': iso_time(instance_info.initial_result.created), + 'launched_at': iso_time(instance_info.initial_result.created), + 'region': 'LOCAL_DEV', + 'availability_zone': 'nova', + } + instance_info.consumer.check_message(instance_info.id, + 'reddwarf.instance.create', + **expected) + + @test(depends_on_groups=['dbaas.api.instances.actions'], groups=[GROUP, tests.INSTANCES, "dbaas.diagnostics"]) class CheckDiagnosticsAfterTests(object): @@ -916,6 +941,7 @@ class DeleteInstance(object): # Update the report so the logs inside the instance will be saved. CONFIG.get_report().update() dbaas.instances.delete(instance_info.id) + instance_info.deleted_at = timeutils.utcnow().isoformat() attempts = 0 try: @@ -950,6 +976,26 @@ class DeleteInstance(object): # entries are deleted. +@test(depends_on=[WaitForGuestInstallationToFinish], + runs_after=[DeleteInstance], + groups=[GROUP, GROUP_STOP, 'dbaas.usage']) +class AfterDeleteChecks(object): + @test + def test_instance_delete_event_sent(self): + expected = { + 'instance_size': instance_info.dbaas_flavor.ram, + 'tenant_id': instance_info.user.tenant_id, + 'instance_id': instance_info.id, + 'instance_name': instance_info.name, + 'created_at': iso_time(instance_info.initial_result.created), + 'launched_at': iso_time(instance_info.initial_result.created), + 'deleted_at': iso_time(instance_info.deleted_at), + } + instance_info.consumer.check_message(instance_info.id, + 'reddwarf.instance.delete', + **expected) + + @test(depends_on_classes=[CreateInstance, VerifyGuestStarted, WaitForGuestInstallationToFinish], groups=[GROUP, GROUP_START, GROUP_START_SIMPLE], diff --git a/reddwarf/tests/api/instances_actions.py b/reddwarf/tests/api/instances_actions.py index 4e71c55a08..fae9400539 100644 --- a/reddwarf/tests/api/instances_actions.py +++ b/reddwarf/tests/api/instances_actions.py @@ -35,6 +35,7 @@ from reddwarf.tests.util.server_connection import create_server_connection from reddwarf.tests.util import poll_until from reddwarf.tests.config import CONFIG from reddwarf.tests.util import LocalSqlClient +from reddwarf.tests.util import iso_time from sqlalchemy import create_engine from sqlalchemy import exc as sqlalchemy_exc from reddwarf.tests.util.check import TypeCheck @@ -439,8 +440,6 @@ class ResizeInstanceTest(ActionTestBase): def test_status_changed_to_resize(self): self.log_current_users() self.obtain_flavor_ids() - if CONFIG.simulate_events: - raise SkipTest("Cannot simulate this test.") self.dbaas.instances.resize_instance( self.instance_id, self.get_flavor_href(flavor_id=self.expected_new_flavor_id)) @@ -457,7 +456,27 @@ class ResizeInstanceTest(ActionTestBase): def test_instance_returns_to_active_after_resize(self): self.wait_for_resize() - @test(depends_on=[test_instance_returns_to_active_after_resize]) + @test(depends_on=[test_instance_returns_to_active_after_resize, + test_status_changed_to_resize], + groups=["dbaas.usage"]) + def test_resize_instance_usage_event_sent(self): + expected = { + 'old_instance_size': self.old_dbaas_flavor.ram, + 'instance_size': instance_info.dbaas_flavor.ram, + 'tenant_id': instance_info.user.tenant_id, + 'instance_id': instance_info.id, + 'instance_name': instance_info.name, + 'created_at': iso_time(instance_info.initial_result.created), + 'launched_at': iso_time(self.instance.updated), + 'modify_at': iso_time(self.instance.updated), + } + + instance_info.consumer.check_message(instance_info.id, + 'reddwarf.instance.modify_flavor', + **expected) + + @test(depends_on=[test_instance_returns_to_active_after_resize], + runs_after=[test_resize_instance_usage_event_sent]) def resize_should_not_delete_users(self): """Resize should not delete users.""" # Resize has an incredibly weird bug where users are deleted after @@ -477,15 +496,11 @@ class ResizeInstanceTest(ActionTestBase): @test(depends_on=[test_instance_returns_to_active_after_resize], runs_after=[resize_should_not_delete_users]) def test_make_sure_mysql_is_running_after_resize(self): - if CONFIG.simulate_events: - raise SkipTest("Cannot simulate this test.") self.ensure_mysql_is_running() @test(depends_on=[test_instance_returns_to_active_after_resize], runs_after=[test_make_sure_mysql_is_running_after_resize]) def test_instance_has_new_flavor_after_resize(self): - if CONFIG.simulate_events: - raise SkipTest("Cannot simulate this test.") actual = self.get_flavor_href(self.instance.flavor['id']) expected = self.get_flavor_href(flavor_id=self.expected_new_flavor_id) assert_equal(actual, expected) @@ -493,8 +508,6 @@ class ResizeInstanceTest(ActionTestBase): @test(depends_on=[test_instance_has_new_flavor_after_resize]) @time_out(TIME_OUT_TIME) def test_resize_down(self): - if CONFIG.simulate_events: - raise SkipTest("Cannot simulate this test.") expected_dbaas_flavor = self.expected_dbaas_flavor self.dbaas.instances.resize_instance( self.instance_id, @@ -506,6 +519,23 @@ class ResizeInstanceTest(ActionTestBase): assert_equal(str(self.instance.flavor['id']), str(self.expected_old_flavor_id)) + @test(depends_on=[test_resize_down], + groups=["dbaas.usage"]) + def test_resize_instance_down_usage_event_sent(self): + expected = { + 'old_instance_size': self.old_dbaas_flavor.ram, + 'instance_size': instance_info.dbaas_flavor.ram, + 'tenant_id': instance_info.user.tenant_id, + 'instance_id': instance_info.id, + 'instance_name': instance_info.name, + 'created_at': iso_time(instance_info.initial_result.created), + 'launched_at': iso_time(self.instance.updated), + 'modify_at': iso_time(self.instance.updated), + } + instance_info.consumer.check_message(instance_info.id, + 'reddwarf.instance.modify_flavor', + **expected) + @test(groups=[tests.INSTANCES, INSTANCE_GROUP, GROUP, GROUP + ".resize.instance"], @@ -557,6 +587,24 @@ class ResizeInstanceVolume(object): instance = instance_info.dbaas.instances.get(instance_info.id) assert_equal(instance.volume['size'], self.new_volume_size) + @test(depends_on=[test_volume_resize_success], groups=["dbaas.usage"]) + def test_resize_volume_usage_event_sent(self): + instance = instance_info.dbaas.instances.get(instance_info.id) + expected = { + 'old_volume_size': self.old_volume_size, + 'instance_size': instance_info.dbaas_flavor.ram, + 'tenant_id': instance_info.user.tenant_id, + 'instance_id': instance_info.id, + 'instance_name': instance_info.name, + 'created_at': iso_time(instance_info.initial_result.created), + 'launched_at': iso_time(instance.updated), + 'modify_at': iso_time(instance.updated), + 'volume_size': self.new_volume_size, + } + instance_info.consumer.check_message(instance_info.id, + 'reddwarf.instance.modify_volume', + **expected) + @test @time_out(300) def test_volume_resize_success_databases(self): diff --git a/reddwarf/tests/config.py b/reddwarf/tests/config.py index 190247e367..babcc90e23 100644 --- a/reddwarf/tests/config.py +++ b/reddwarf/tests/config.py @@ -47,6 +47,10 @@ class FrozenDict(Mapping): return self.original.__str__() +USAGE_ENDPOINT = os.environ.get("USAGE_ENDPOINT", + "reddwarf.tests.util.usage.UsageVerifier") + + class TestConfig(object): """ Holds test configuration values which can be accessed as attributes @@ -81,6 +85,7 @@ class TestConfig(object): "simulate_events": False, "reddwarf_volume_support": True, "reddwarf_max_volumes_per_user": 100, + "usage_endpoint": USAGE_ENDPOINT, } self._frozen_values = FrozenDict(self._values) self._users = None diff --git a/reddwarf/tests/fakes/nova.py b/reddwarf/tests/fakes/nova.py index c7dbd97c39..8329d1a3ca 100644 --- a/reddwarf/tests/fakes/nova.py +++ b/reddwarf/tests/fakes/nova.py @@ -118,6 +118,7 @@ class FakeServer(object): volume.set_attachment(id) self.host = FAKE_HOSTS[0] self.old_host = None + setattr(self, 'OS-EXT-AZ:availability_zone', 'nova') self._info = {'os:volumes': info_vols} diff --git a/reddwarf/tests/util/__init__.py b/reddwarf/tests/util/__init__.py index 3445335d17..e33bb1d438 100644 --- a/reddwarf/tests/util/__init__.py +++ b/reddwarf/tests/util/__init__.py @@ -221,6 +221,17 @@ def unquote_user_host(user_hostname): return user, host +def iso_time(time_string): + """Return a iso formated datetime: 2013-04-15T19:50:23Z""" + ts = time_string.replace(' ', 'T') + try: + micro = ts.rindex('.') + ts = ts[:micro] + except ValueError: + pass + return '%sZ' % ts + + if CONFIG.simulate_events: # Without event let, this just calls time.sleep. def poll_until(retriever, condition=lambda value: value, diff --git a/reddwarf/tests/util/usage.py b/reddwarf/tests/util/usage.py new file mode 100644 index 0000000000..e61aef1e46 --- /dev/null +++ b/reddwarf/tests/util/usage.py @@ -0,0 +1,83 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from collections import defaultdict +from proboscis.asserts import Check +from proboscis.asserts import assert_equal +from proboscis.asserts import assert_not_equal +import unittest + +from reddwarf.common import utils +from reddwarf.tests.config import CONFIG +from proboscis.dependencies import SkipTest + +MESSAGE_QUEUE = defaultdict(list) + + +def create_usage_verifier(): + return utils.import_object(CONFIG.usage_endpoint) + + +class UsageVerifier(object): + + def clear_events(self): + """Hook that is called to allow endpoints to clean up.""" + pass + + def check_message(self, resource_id, event_type, **attrs): + messages = self.get_messages(resource_id) + print messages, resource_id + found = None + for message in messages: + if message['event_type'] == event_type: + found = message + assert_not_equal(found, None) + with Check() as check: + for key, value in attrs.iteritems(): + check.equal(found[key], value) + + def get_messages(self, resource_id, expected_messages=None): + global MESSAGE_QUEUE + import pprint + pprint.pprint(MESSAGE_QUEUE.items()) + msgs = MESSAGE_QUEUE.get(resource_id, []) + if expected_messages is not None: + assert_equal(len(msgs), expected_messages) + return msgs + + +class FakeVerifier(object): + """This is the default handler in fake mode, it is basically a no-op.""" + + def clear_event(self): + pass + + def check_message(self, *args, **kwargs): + raise SkipTest("Notifications not available") + + def get_messages(self, *args, **kwargs): + pass + + +def notify(context, message): + """Simple test notify function which saves the messages to global list.""" + print('Received Usage Notification: %s' % message) + payload = message.get('payload', None) + payload['event_type'] = message['event_type'] + resource_id = payload['instance_id'] + global MESSAGE_QUEUE + MESSAGE_QUEUE[resource_id].append(payload)