From b5191bb1052f225973b41a282b0be88009a3a8d4 Mon Sep 17 00:00:00 2001
From: Monsyne Dragon
Date: Wed, 19 Aug 2015 06:46:29 +0000
Subject: [PATCH] Add AtomPub handler

Adds an AtomPub handler that sends events to an AtomPub server as a
feed. Also fixes the Usage handler to emit its new events directly into
the pipeline, and fixes a minor (but annoyingly hard to diagnose) bug in
the db layer where it did not recognize 'long' values.

Change-Id: I792e9f77accfea4583fd75805a9ff0d946827df8
---
 setup.cfg                      |   2 +-
 tests/test_atompub.py          | 151 +++++++++++++
 tests/test_usage_handler.py    |  16 +-
 winchester/db/interface.py     |   7 +-
 winchester/models.py           |   1 +
 winchester/pipeline_handler.py | 377 ++++++++++++++++++++++++++++++---
 winchester/trigger_manager.py  |  12 +-
 7 files changed, 519 insertions(+), 47 deletions(-)
 create mode 100644 tests/test_atompub.py

diff --git a/setup.cfg b/setup.cfg
index 2ba939e..60ea942 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,7 +1,7 @@
 [metadata]
 description-file = README.md
 name = winchester
-version = 0.57
+version = 0.62
 author = Monsyne Dragon
 author_email = mdragon@rackspace.com
 summary = An OpenStack notification event processing library.
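The new AtomPubHandler (added below in winchester/pipeline_handler.py) is
constructed with a destination URL plus optional event-type filters and
Rackspace-style auth settings. A minimal standalone sketch of constructing
it; the URL and credentials are illustrative placeholders, and a real
deployment would take these from its pipeline configuration:

    from winchester import pipeline_handler

    handler = pipeline_handler.AtomPubHandler(
        'https://atom.example.com/events',          # placeholder endpoint
        event_types=['compute.instance.*',          # include globs
                     '!compute.instance.exists'],   # '!' prefix excludes
        auth_user='demo',                           # placeholder creds
        auth_key='secret',
        auth_server='https://identity.example.com/v2.0/tokens')
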
diff --git a/tests/test_atompub.py b/tests/test_atompub.py
new file mode 100644
index 0000000..10888e7
--- /dev/null
+++ b/tests/test_atompub.py
@@ -0,0 +1,151 @@
+# Copyright (c) 2015 Rackspace
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import unittest2 as unittest
+
+import mock
+
+from winchester import pipeline_handler
+
+
+class TestException(Exception):
+    pass
+
+
+class TestAtomPubHandler(unittest.TestCase):
+
+    def test_constructor_event_types(self):
+        fakeurl = 'fake://'
+        h = pipeline_handler.AtomPubHandler(fakeurl)
+        self.assertEqual(h.included_types, ['*'])
+        self.assertEqual(h.excluded_types, [])
+
+        h = pipeline_handler.AtomPubHandler(fakeurl,
+                                            event_types='test.thing')
+        self.assertEqual(h.included_types, ['test.thing'])
+        self.assertEqual(h.excluded_types, [])
+
+        h = pipeline_handler.AtomPubHandler(fakeurl,
+                                            event_types=['test.thing'])
+        self.assertEqual(h.included_types, ['test.thing'])
+        self.assertEqual(h.excluded_types, [])
+
+        h = pipeline_handler.AtomPubHandler(fakeurl,
+                                            event_types=['!test.thing'])
+        self.assertEqual(h.included_types, ['*'])
+        self.assertEqual(h.excluded_types, ['test.thing'])
+
+    def test_match_type(self):
+        event_types = ["test.foo.bar", "!test.wakka.wakka"]
+        h = pipeline_handler.AtomPubHandler('fakeurl',
+                                            event_types=event_types)
+        self.assertTrue(h.match_type('test.foo.bar'))
+        self.assertFalse(h.match_type('test.wakka.wakka'))
+        self.assertFalse(h.match_type('test.foo.baz'))
+
+        event_types = ["test.foo.*", "!test.wakka.*"]
+        h = pipeline_handler.AtomPubHandler('fakeurl',
+                                            event_types=event_types)
+        self.assertTrue(h.match_type('test.foo.bar'))
+        self.assertTrue(h.match_type('test.foo.baz'))
+        self.assertFalse(h.match_type('test.wakka.wakka'))
+
+    def test_handle_events(self):
+        event_types = ["test.foo.*", "!test.wakka.*"]
+        h = pipeline_handler.AtomPubHandler('fakeurl',
+                                            event_types=event_types)
+        event1 = dict(event_type="test.foo.zazz")
+        event2 = dict(event_type="test.wakka.zazz")
+        event3 = dict(event_type="test.boingy")
+        events = [event1, event2, event3]
+        res = h.handle_events(events, dict())
+        self.assertEqual(events, res)
+        self.assertIn(event1, h.events)
+        self.assertNotIn(event2, h.events)
+        self.assertNotIn(event3, h.events)
+
+    def test_format_cuf_xml(self):
+        expected = ('<event xmlns="http://docs.rackspace.com/core/event" '
+                    'xmlns:nova="http://docs.rackspace.com/event/nova" '
+                    'version="1" id="1234-56789" resourceId="98765-4321" '
+                    'dataCenter="TST1" region="TST" tenantId="" '
+                    'startTime="2015-08-10T00:00:00Z" '
+                    'endTime="2015-08-11T00:00:00Z" type="USAGE">'
+                    '<nova:product version="1" '
+                    'serviceCode="CloudServersOpenStack" '
+                    'resourceType="SERVER" flavorId="" flavorName="" '
+                    'status="ACTIVE" osLicenseType="WINDOWS" '
+                    'bandwidthIn="" bandwidthOut=""/></event>')
+        d1 = datetime.datetime(2015, 8, 10, 0, 0, 0)
+        d2 = datetime.datetime(2015, 8, 11, 0, 0, 0)
+        d3 = datetime.datetime(2015, 8, 9, 15, 21, 0)
+        event = dict(message_id='1234-56789',
+                     event_type='test.thing',
+                     audit_period_beginning=d1,
+                     audit_period_ending=d2,
+                     launched_at=d3,
+                     instance_id='98765-4321',
+                     state='active',
+                     state_description='',
+                     rax_options='4')
+        extra = dict(data_center='TST1', region='TST')
+        h = pipeline_handler.AtomPubHandler('fakeurl',
+                                            extra_info=extra)
+        res, content_type = h.format_cuf_xml(event)
+        self.assertEqual(res, expected)
+        self.assertEqual(content_type, 'application/xml')
+
+    def test_generate_atom(self):
+        expected = ("""<entry xmlns="http://www.w3.org/2005/Atom">"""
+                    """<id>urn:uuid:12-34</id>"""
+                    """<category term="test.thing.bar"/>"""
+                    """<category term="original_message_id:56-78"/>"""
+                    """<title type="text">Server</title>"""
+                    """<content type="test/thing">TEST_CONTENT</content>"""
+                    """</entry>""")
+        event = dict(message_id='12-34',
+                     original_message_id='56-78',
+                     event_type='test.thing')
+        event_type = 'test.thing.bar'
+        ctype = 'test/thing'
+        content = 'TEST_CONTENT'
+        h = pipeline_handler.AtomPubHandler('fakeurl')
+        atom = h.generate_atom(event, event_type, content, ctype)
+        self.assertEqual(atom, expected)
+
+    @mock.patch.object(pipeline_handler.requests, 'post')
+    @mock.patch.object(pipeline_handler.AtomPubHandler, '_get_auth')
+    def test_send_event(self, auth, rpost):
+        test_headers = {'Content-Type': 'application/atom+xml',
+                        'X-Auth-Token': 'testtoken'}
+        auth.return_value = test_headers
+        test_response = mock.MagicMock('http response')
+        test_response.status_code = 200
+        rpost.return_value = test_response
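The include/exclude semantics exercised above boil down to fnmatch globs,
with a leading '!' stripped into a separate exclude list. The same logic
in isolation (names here are illustrative, not winchester code):

    import fnmatch

    included = ['test.foo.*']
    excluded = ['test.wakka.*']

    def matches(event_type):
        # Pass if any include glob hits and no exclude glob does,
        # mirroring AtomPubHandler.match_type().
        return (any(fnmatch.fnmatch(event_type, p) for p in included)
                and not any(fnmatch.fnmatch(event_type, p)
                            for p in excluded))

    assert matches('test.foo.bar')
    assert not matches('test.wakka.wakka')   # excluded
    assert not matches('test.boingy')        # no include glob matches
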
        h = pipeline_handler.AtomPubHandler('fakeurl', http_timeout=123,
+                                            wait_interval=10, max_wait=100)
+        test_atom = mock.MagicMock('atom content')
+
+        status = h._send_event(test_atom)
+
+        self.assertEqual(1, auth.call_count)
+        self.assertEqual(1, rpost.call_count)
+        rpost.assert_called_with('fakeurl',
+                                 data=test_atom,
+                                 headers=test_headers,
+                                 timeout=123)
+        self.assertEqual(status, 200)
diff --git a/tests/test_usage_handler.py b/tests/test_usage_handler.py
index cba3d0e..77def88 100644
--- a/tests/test_usage_handler.py
+++ b/tests/test_usage_handler.py
@@ -211,7 +211,7 @@ class TestUsageHandler(unittest.TestCase):
                          f['event_type'])
         self.assertEqual("now", f['timestamp'])
         self.assertEqual(123, f['stream_id'])
-        self.assertEqual("inst", f['payload']['instance_id'])
+        self.assertEqual("inst", f['instance_id'])
         self.assertEqual("None", f['error'])
         self.assertIsNone(f['error_code'])

@@ -228,7 +228,7 @@ class TestUsageHandler(unittest.TestCase):
                          f['event_type'])
         self.assertEqual("now", f['timestamp'])
         self.assertEqual(123, f['stream_id'])
-        self.assertEqual("inst", f['payload']['instance_id'])
+        self.assertEqual("inst", f['instance_id'])
         self.assertEqual("Error", f['error'])
         self.assertEqual("UX", f['error_code'])

@@ -301,11 +301,9 @@ class TestUsageHandler(unittest.TestCase):
         env = {'stream_id': 123}
         raw = [{'event_type': 'foo'}]
         events = self.handler.handle_events(raw, env)
-        self.assertEqual(1, len(events))
-        notifications = env['usage_notifications']
-        self.assertEqual(1, len(notifications))
+        self.assertEqual(2, len(events))
         self.assertEqual("compute.instance.exists.failed",
-                         notifications[0]['event_type'])
+                         events[-1]['event_type'])

     @mock.patch.object(pipeline_handler.UsageHandler, '_process_block')
     def test_handle_events_exists(self, pb):
@@ -325,9 +323,7 @@ class TestUsageHandler(unittest.TestCase):
             {'event_type': 'foo'},
         ]
         events = self.handler.handle_events(raw, env)
-        self.assertEqual(3, len(events))
-        notifications = env['usage_notifications']
-        self.assertEqual(1, len(notifications))
+        self.assertEqual(4, len(events))
         self.assertEqual("compute.instance.exists.failed",
-                         notifications[0]['event_type'])
+                         events[-1]['event_type'])
         self.assertTrue(pb.called)
diff --git a/winchester/db/interface.py b/winchester/db/interface.py
index 1f3064b..4220604 100644
--- a/winchester/db/interface.py
+++ b/winchester/db/interface.py
@@ -143,7 +143,12 @@ class DBInterface(object):
         event_type = self.get_event_type(event_type, session=session)
         e = models.Event(message_id, event_type, generated)
         for name in traits:
-            e[name] = traits[name]
+            try:
+                e[name] = traits[name]
+            except models.InvalidTraitType:
+                logger.error("Invalid trait for %s "
+                             "(%s) %s" % (name, traits[name],
+                                          type(traits[name])))
         session.add(e)

     @sessioned
diff --git a/winchester/models.py b/winchester/models.py
index 318641e..cf0de67 100644
--- a/winchester/models.py
+++ b/winchester/models.py
@@ -160,6 +160,7 @@ class PolymorphicVerticalProperty(object):
     ATTRIBUTE_MAP = {Datatype.none: None}
     PY_TYPE_MAP = {unicode: Datatype.string,
                    int: Datatype.int,
+                   long: Datatype.int,
                    float: Datatype.float,
                    datetime: Datatype.datetime,
                    DBTimeRange: Datatype.timerange}
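The models.py change above matters because on Python 2 an integer trait
larger than sys.maxint arrives as long, not int, so the PY_TYPE_MAP lookup
missed it and the trait was rejected as InvalidTraitType (bandwidth
counters are the usual culprits). A simplified Python 2 sketch of the
failure mode, independent of the winchester models:

    # Python 2 sketch: why PY_TYPE_MAP needs an entry for 'long'.
    PY_TYPE_MAP = {int: 'int'}        # simplified stand-in for the map

    bandwidth = 2 ** 70               # big counters silently become long
    print(type(bandwidth))            # <type 'long'> on Python 2
    print(type(bandwidth) in PY_TYPE_MAP)   # False -> trait rejected

    PY_TYPE_MAP[long] = 'int'         # the fix: map long to the int type
    print(type(bandwidth) in PY_TYPE_MAP)   # True
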
diff --git a/winchester/pipeline_handler.py b/winchester/pipeline_handler.py
index 93ff32e..a7ce7d3 100644
--- a/winchester/pipeline_handler.py
+++ b/winchester/pipeline_handler.py
@@ -15,12 +15,17 @@
 # limitations under the License.

 import abc
+import collections
 import datetime
+import fnmatch
+import json
 import logging
 import six
+import time
 import uuid

 from notabene import kombu_driver as driver
+import requests

 logger = logging.getLogger(__name__)

@@ -358,33 +363,54 @@ class UsageHandler(PipelineHandlerBase):
         self._confirm_delete(exists, deleted, delete_fields)

     def _base_notification(self, exists):
-        apb, ape = self._get_audit_period(exists)
-        return {
-            'payload': {
-                'audit_period_beginning': str(apb),
-                'audit_period_ending': str(ape),
-                'launched_at': str(exists.get('launched_at', '')),
-                'deleted_at': str(exists.get('deleted_at', '')),
-                'instance_id': exists.get('instance_id', ''),
-                'tenant_id': exists.get('tenant_id', ''),
-                'display_name': exists.get('display_name', ''),
-                'instance_type': exists.get('instance_flavor', ''),
-                'instance_flavor_id': exists.get('instance_flavor_id', ''),
-                'state': exists.get('state', ''),
-                'state_description': exists.get('state_description', ''),
-                'bandwidth': {'public': {
-                    'bw_in': exists.get('bandwidth_in', 0),
-                    'bw_out': exists.get('bandwidth_out', 0)}},
-                'image_meta': {
-                    'org.openstack__1__architecture': exists.get(
-                        'os_architecture', ''),
-                    'org.openstack__1__os_version': exists.get('os_version',
-                                                               ''),
-                    'org.openstack__1__os_distro': exists.get('os_distro', ''),
-                    'org.rackspace__1__options': exists.get('rax_options', '0')
-                }
-            },
-            'original_message_id': exists.get('message_id', '')}
+        basen = exists.copy()
+        if 'bandwidth_in' not in basen:
+            basen['bandwidth_in'] = 0
+        if 'bandwidth_out' not in basen:
+            basen['bandwidth_out'] = 0
+        if 'rax_options' not in basen:
+            basen['rax_options'] = '0'
+        basen['original_message_id'] = exists.get('message_id', '')
+        return basen
+#        apb, ape = self._get_audit_period(exists)
+#        return {
+#            'payload': {
+#                'audit_period_beginning': str(apb),
+#                'audit_period_ending': str(ape),
+#                'launched_at': str(exists.get('launched_at', '')),
+#                'deleted_at': str(exists.get('deleted_at', '')),
+#                'instance_id': exists.get('instance_id', ''),
+#                'tenant_id': exists.get('tenant_id', ''),
+#                'display_name': exists.get('display_name', ''),
+#                'instance_type': exists.get('instance_flavor', ''),
+#                'instance_flavor_id': exists.get('instance_flavor_id', ''),
+#                'state': exists.get('state', ''),
+#                'state_description': exists.get('state_description', ''),
+#                'bandwidth': {'public': {
+#                    'bw_in': exists.get('bandwidth_in', 0),
+#                    'bw_out': exists.get('bandwidth_out', 0)}},
+#                'image_meta': {
+#                    'org.openstack__1__architecture': exists.get(
+#                        'os_architecture', ''),
+#                    'org.openstack__1__os_version': exists.get('os_version',
+#                                                               ''),
+#                    'org.openstack__1__os_distro': exists.get('os_distro',
+#                                                              ''),
+#                    'org.rackspace__1__options': exists.get('rax_options',
+#                                                            '0')
+#                }
+#            },
+#            'original_message_id': exists.get('message_id', '')}
+
+    def _generate_new_id(self, original_message_id, event_type):
+        # Generate message_id for new events deterministically from
+        # the original message_id and event type using the uuid5 algo.
+        # This will allow any dups to be caught by message_id. (mdragon)
+        if original_message_id:
+            oid = uuid.UUID(original_message_id)
+            return str(uuid.uuid5(oid, event_type))
+        else:
+            logger.error("Generating %s, but original message missing"
+                         " original_message_id." % event_type)
+            return str(uuid.uuid4())

     def _process_block(self, block, exists):
         error = None
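# Sketch (editorial illustration, not part of this patch): the uuid5
# derivation above is deterministic, so replaying the same exists event
# produces the same new message_id, and duplicates can be caught by the
# db's unique message_id check. The UUID below is arbitrary.
import uuid

original = uuid.UUID('f0f47a5c-6575-4713-a12c-a72a1d5a6f25')
a = uuid.uuid5(original, 'compute.instance.exists.verified')
b = uuid.uuid5(original, 'compute.instance.exists.verified')
c = uuid.uuid5(original, 'compute.instance.exists.failed')
assert a == b   # same original id + event type -> same derived id
assert a != c   # different event type -> different id
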
@@ -422,12 +448,13 @@ class UsageHandler(PipelineHandlerBase):
                                           datetime.datetime.utcnow()),
                 'stream_id': int(self.stream_id),
                 'instance_id': instance_id,
-                'warnings': self.warnings}
+                'warnings': ', '.join(self.warnings)}
             events.append(warning_event)

         new_event = self._base_notification(exists)
+        new_event['message_id'] = self._generate_new_id(
+            new_event['original_message_id'], event_type)
         new_event.update({'event_type': event_type,
-                          'message_id': str(uuid.uuid4()),
                           'publisher_id': 'stv3',
                           'timestamp': exists.get('timestamp',
                                                   datetime.datetime.utcnow()),
@@ -466,11 +493,299 @@ class UsageHandler(PipelineHandlerBase):
             }
             new_events.append(new_event)

-        env['usage_notifications'] = new_events
-        return events
+        return events + new_events

     def commit(self):
         pass

     def rollback(self):
         pass
+
+
+class AtomPubException(Exception):
+    pass
+
+
+cuf_template = ("""<event xmlns="http://docs.rackspace.com/core/event" """
+                """xmlns:nova="http://docs.rackspace.com/event/nova" """
+                """version="1" id="%(message_id)s" """
+                """resourceId="%(instance_id)s" """
+                """dataCenter="%(data_center)s" region="%(region)s" """
+                """tenantId="%(tenant_id)s" """
+                """startTime="%(start_time)s" """
+                """endTime="%(end_time)s" type="USAGE">"""
+                """<nova:product version="1" """
+                """serviceCode="CloudServersOpenStack" """
+                """resourceType="SERVER" """
+                """flavorId="%(instance_flavor_id)s" """
+                """flavorName="%(instance_flavor)s" """
+                """status="%(status)s" %(options)s """
+                """bandwidthIn="%(bandwidth_in)s" """
+                """bandwidthOut="%(bandwidth_out)s"/></event>""")
+
+
+class AtomPubHandler(PipelineHandlerBase):
+    auth_token_cache = None
+
+    def __init__(self, url, event_types=None, extra_info=None,
+                 auth_user='', auth_key='', auth_server='',
+                 wait_interval=30, max_wait=600, http_timeout=120, **kw):
+        super(AtomPubHandler, self).__init__(**kw)
+        self.events = []
+        self.included_types = []
+        self.excluded_types = []
+        self.url = url
+        self.auth_user = auth_user
+        self.auth_key = auth_key
+        self.auth_server = auth_server
+        self.wait_interval = wait_interval
+        self.max_wait = max_wait
+        self.http_timeout = http_timeout
+        if extra_info:
+            self.extra_info = extra_info
+        else:
+            self.extra_info = {}
+
+        if event_types:
+            if isinstance(event_types, six.string_types):
+                event_types = [event_types]
+            for t in event_types:
+                if t.startswith('!'):
+                    self.excluded_types.append(t[1:])
+                else:
+                    self.included_types.append(t)
+        else:
+            self.included_types.append('*')
+        if self.excluded_types and not self.included_types:
+            self.included_types.append('*')
+
+    def _included_type(self, event_type):
+        return any(fnmatch.fnmatch(event_type, t)
+                   for t in self.included_types)
+
+    def _excluded_type(self, event_type):
+        return any(fnmatch.fnmatch(event_type, t)
+                   for t in self.excluded_types)
+
+    def match_type(self, event_type):
+        return (self._included_type(event_type)
+                and not self._excluded_type(event_type))
+
+    def handle_events(self, events, env):
+        for event in events:
+            event_type = event['event_type']
+            if self.match_type(event_type):
+                self.events.append(event)
+        logger.debug("Matched %s events." % len(self.events))
+        return events
+
+    def commit(self):
+        for event in self.events:
+            event_type = event.get('event_type', '')
+            message_id = event.get('message_id', '')
+            try:
+                status = self.publish_event(event)
+                logger.debug("Sent %s event %s. Status %s" % (event_type,
+                                                              message_id,
+                                                              status))
+            except Exception:
+                original_message_id = event.get('original_message_id', '')
+                logger.exception("Error publishing %s event %s "
+                                 "(original id: %s)!" % (event_type,
+                                                         message_id,
+                                                         original_message_id))
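# Sketch (editorial illustration, not part of this patch): a pipeline
# worker drives the handler in two phases -- handle_events() only buffers
# matching events, and commit() does the network I/O once the pipeline
# pass succeeds. The URL and event below are placeholders.
from winchester.pipeline_handler import AtomPubHandler

handler = AtomPubHandler('https://atom.example.com/events',
                         event_types=['compute.instance.exists.*'])
incoming = [{'event_type': 'compute.instance.exists.verified',
             'message_id': 'abc-123'}]
handler.handle_events(incoming, env={})   # buffers the matching event
handler.commit()   # POSTs each buffered event; failures are logged
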
+    def publish_event(self, event):
+        content, content_type = self.format_cuf_xml(event)
+        event_type = self.event_type_cuf_xml(event.get('event_type'))
+        atom = self.generate_atom(event, event_type, content, content_type)
+
+        logger.debug("Publishing event: %s" % atom)
+        return self._send_event(atom)
+
+    def generate_atom(self, event, event_type, content, content_type):
+        template = ("""<entry xmlns="http://www.w3.org/2005/Atom">"""
+                    """<id>urn:uuid:%(message_id)s</id>"""
+                    """<category term="%(event_type)s"/>"""
+                    """<category term="original_message_id:"""
+                    """%(original_message_id)s"/>"""
+                    """<title type="text">Server</title>"""
+                    """<content type="%(content_type)s">%(content)s"""
+                    """</content>"""
+                    """</entry>""")
+        info = dict(message_id=event.get('message_id'),
+                    original_message_id=event.get('original_message_id'),
+                    event=event,
+                    event_type=event_type,
+                    content=content,
+                    content_type=content_type)
+        return template % info
+
+    def event_type_cuf_xml(self, event_type):
+        return event_type + ".cuf"
+
+    def format_cuf_xml(self, event):
+        tvals = collections.defaultdict(lambda: '')
+        tvals.update(event)
+        tvals.update(self.extra_info)
+        start_time, end_time = self._get_times(event)
+        tvals['start_time'] = self._format_time(start_time)
+        tvals['end_time'] = self._format_time(end_time)
+        tvals['status'] = self._get_status(event)
+        tvals['options'] = self._get_options(event)
+        c = cuf_template % tvals
+        return (c, 'application/xml')
+
+    def _get_options(self, event):
+        opt = int(event.get('rax_options', 0))
+        flags = [bool(opt & (2**i)) for i in range(8)]
+        os = 'LINUX'
+        app = None
+        if flags[0]:
+            os = 'RHEL'
+        if flags[2]:
+            os = 'WINDOWS'
+        if flags[6]:
+            os = 'VYATTA'
+        if flags[3]:
+            app = 'MSSQL'
+        if flags[5]:
+            app = 'MSSQL_WEB'
+        if app is None:
+            return 'osLicenseType="%s"' % os
+        else:
+            return 'osLicenseType="%s" applicationLicense="%s"' % (os, app)
+
+    def _get_status(self, event):
+        state = event.get('state')
+        state_description = event.get('state_description')
+        status = 'UNKNOWN'
+        status_map = {
+            "building": 'BUILD',
+            "stopped": 'SHUTOFF',
+            "paused": 'PAUSED',
+            "suspended": 'SUSPENDED',
+            "rescued": 'RESCUE',
+            "error": 'ERROR',
+            "deleted": 'DELETED',
+            "soft-delete": 'SOFT_DELETED',
+            "shelved": 'SHELVED',
+            "shelved_offloaded": 'SHELVED_OFFLOADED',
+        }
+        if state in status_map:
+            status = status_map[state]
+        if state == 'resized':
+            if state_description == 'resize_reverting':
+                status = 'REVERT_RESIZE'
+            else:
+                status = 'VERIFY_RESIZE'
+        if state == 'active':
+            active_map = {
+                "rebooting": 'REBOOT',
+                "rebooting_hard": 'HARD_REBOOT',
+                "updating_password": 'PASSWORD',
+                "rebuilding": 'REBUILD',
+                "rebuild_block_device_mapping": 'REBUILD',
+                "rebuild_spawning": 'REBUILD',
+                "migrating": 'MIGRATING',
+                "resize_prep": 'RESIZE',
+                "resize_migrating": 'RESIZE',
+                "resize_migrated": 'RESIZE',
+                "resize_finish": 'RESIZE',
+            }
+            status = active_map.get(state_description, 'ACTIVE')
+        if status == 'UNKNOWN':
+            logger.error("Unknown status for event %s: state %s (%s)" % (
+                event.get('message_id'), state, state_description))
+        return status
+
+    def _get_times(self, event):
+        audit_period_beginning = event.get('audit_period_beginning')
+        audit_period_ending = event.get('audit_period_ending')
+        launched_at = event.get('launched_at')
+        terminated_at = event.get('terminated_at')
+        if not terminated_at:
+            terminated_at = event.get('deleted_at')
+
+        start_time = max(launched_at, audit_period_beginning)
+        if not terminated_at:
+            end_time = audit_period_ending
+        else:
+            end_time = min(terminated_at, audit_period_ending)
+        if start_time > end_time:
+            start_time = audit_period_beginning
+        return (start_time, end_time)
+
+    def _format_time(self, dt):
+        time_format = "%Y-%m-%dT%H:%M:%SZ"
+        if dt:
+            return datetime.datetime.strftime(dt, time_format)
+        else:
+            return ''
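# Sketch (editorial illustration, not part of this patch): rax_options is
# decoded as a bitmask in _get_options() above; each bit toggles an OS or
# application license.
opt = int('4')                              # e.g. rax_options='4'
flags = [bool(opt & (2 ** i)) for i in range(8)]
# bit 0 -> RHEL, bit 2 -> WINDOWS, bit 6 -> VYATTA,
# bit 3 -> MSSQL, bit 5 -> MSSQL_WEB
print(flags[2])                             # True -> osLicenseType="WINDOWS"
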
+    def _get_auth(self, force=False, headers=None):
+        if headers is None:
+            headers = {}
+        if force or not AtomPubHandler.auth_token_cache:
+            auth_body = {"auth": {
+                "RAX-KSKEY:apiKeyCredentials": {
+                    "username": self.auth_user,
+                    "apiKey": self.auth_key,
+                }}}
+            auth_headers = {"User-Agent": "Winchester",
+                            "Accept": "application/json",
+                            "Content-Type": "application/json"}
+            logger.debug("Contacting auth server %s" % self.auth_server)
+            res = requests.post(self.auth_server,
+                                data=json.dumps(auth_body),
+                                headers=auth_headers)
+            res.raise_for_status()
+            token = res.json()["access"]["token"]["id"]
+            logger.debug("Token received: %s" % token)
+            AtomPubHandler.auth_token_cache = token
+        headers["X-Auth-Token"] = AtomPubHandler.auth_token_cache
+        return headers
+
+    def _send_event(self, atom):
+        headers = {"Content-Type": "application/atom+xml"}
+        headers = self._get_auth(headers=headers)
+        attempts = 0
+        status = 0
+        while True:
+            try:
+                res = requests.post(self.url,
+                                    data=atom,
+                                    headers=headers,
+                                    timeout=self.http_timeout)
+                status = res.status_code
+                if status >= 200 and status < 300:
+                    break
+                if status == 401:
+                    logger.info("Auth expired, reauthorizing...")
+                    headers = self._get_auth(headers=headers, force=True)
+                    continue
+                if status == 409:
+                    # They already have this. No need to retry. (mdragon)
+                    logger.debug("Duplicate message: \n%s" % atom)
+                    break
+                if status == 400:
+                    # AtomPub server won't accept content.
+                    logger.error("Invalid Content: Server rejected content: "
+                                 "\n%s" % atom)
+                    break
+            except requests.exceptions.ConnectionError:
+                logger.exception("Connection error talking to %s" % self.url)
+            except requests.exceptions.Timeout:
+                logger.exception("HTTP timeout talking to %s" % self.url)
+            except requests.exceptions.HTTPError:
+                logger.exception("HTTP protocol error talking to "
+                                 "%s" % self.url)
+            except requests.exceptions.RequestException:
+                logger.exception("Unknown exception talking to %s" % self.url)
+            # If we got here, something went wrong.
+            attempts += 1
+            wait = min(attempts * self.wait_interval, self.max_wait)
+            logger.error("Message delivery failed, going to sleep, will "
+                         "try again in %s seconds" % str(wait))
+            time.sleep(wait)
+        return status
+
+    def rollback(self):
+        pass
diff --git a/winchester/trigger_manager.py b/winchester/trigger_manager.py
index 6ba0910..04ab9a3 100644
--- a/winchester/trigger_manager.py
+++ b/winchester/trigger_manager.py
@@ -193,14 +193,18 @@ class TriggerManager(object):
         return self.time_sync.current_time()

     def save_event(self, event):
-        traits = event.copy()
+        traits = {}
         try:
-            message_id = traits.pop('message_id')
-            timestamp = traits.pop('timestamp')
-            event_type = traits.pop('event_type')
+            message_id = event['message_id']
+            timestamp = event['timestamp']
+            event_type = event['event_type']
         except KeyError as e:
             logger.warning("Received invalid event: %s" % e)
             return False
+        for key, val in event.items():
+            if key not in ('message_id', 'timestamp', 'event_type'):
+                if val is not None:
+                    traits[key] = val
         try:
             self.db.create_event(message_id, event_type, timestamp,
                                  traits)
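For reference, the retry loop in _send_event backs off linearly and caps
at max_wait. With the defaults (wait_interval=30, max_wait=600) the sleep
schedule works out as follows:

    wait_interval, max_wait = 30, 600
    schedule = [min(attempts * wait_interval, max_wait)
                for attempts in range(1, 25)]
    print(schedule[:5])    # [30, 60, 90, 120, 150]
    print(schedule[-1])    # 600 -- capped from the 20th attempt onward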