From 3f6542f049f7046bb3c10015221a8c4266836e96 Mon Sep 17 00:00:00 2001 From: Manali Latkar Date: Mon, 24 Jun 2013 08:52:04 +0530 Subject: [PATCH] - Added logic to populate generic rawdata and glance rawdata - Moved the responsibilty to save rawdata to the notification classes - Notification are now created based on exchange instead of routing_key since routing_keys may not be unique across services - Separate consumers are now created for every exchange specified in the config - Each consumer is started in a separate process - Introduced notification factory and the config module --- etc/sample_stacktach_worker_config.json | 12 +- settings.py | 2 + stacktach/db.py | 19 +- ...eexists_and_instanceusages_from_rawdata.py | 8 +- ...create_glancerawdata_and_genericrawdata.py | 211 +++++++++++++ stacktach/models.py | 92 +++++- stacktach/notification.py | 176 ++++++++--- stacktach/tests.py | 91 +++++- stacktach/views.py | 38 +-- tests/unit/test_models.py | 33 ++ tests/unit/test_notification.py | 298 +++++++++--------- tests/unit/test_stacktach.py | 56 ++-- tests/unit/test_worker.py | 82 +++-- tests/unit/utils.py | 1 + worker/config.py | 43 +++ worker/start_workers.py | 27 +- worker/worker.py | 62 ++-- 17 files changed, 899 insertions(+), 352 deletions(-) create mode 100644 stacktach/migrations/0004_create_glancerawdata_and_genericrawdata.py create mode 100644 tests/unit/test_models.py create mode 100644 worker/config.py diff --git a/etc/sample_stacktach_worker_config.json b/etc/sample_stacktach_worker_config.json index e33ad9e..f15892e 100644 --- a/etc/sample_stacktach_worker_config.json +++ b/etc/sample_stacktach_worker_config.json @@ -7,7 +7,11 @@ "rabbit_userid": "rabbit", "rabbit_password": "rabbit", "rabbit_virtual_host": "/", - "exit_on_exception": true + "exit_on_exception": true, + "topics": { + "nova": ["monitor.info", "monitor.error"], + "glance": ["monitor_glance.info", "monitor_glance.error"] + } }, { "name": "east_coast.prod.cell1", @@ -17,6 +21,10 @@ "rabbit_userid": "rabbit", "rabbit_password": "rabbit", "rabbit_virtual_host": "/", - "exit_on_exception": false + "exit_on_exception": false, + "topics": { + "nova": ["monitor.info", "monitor.error"], + "glance": ["monitor_glance.info", "monitor_glance.error"] + } }] } diff --git a/settings.py b/settings.py index ea859b1..c9106be 100644 --- a/settings.py +++ b/settings.py @@ -140,6 +140,8 @@ INSTALLED_APPS = ( 'south' ) +SOUTH_TESTS_MIGRATE = False + ALLOWED_HOSTS = ['*'] # A sample logging configuration. The only tangible logging diff --git a/stacktach/db.py b/stacktach/db.py index ae3672d..0043ec7 100644 --- a/stacktach/db.py +++ b/stacktach/db.py @@ -20,7 +20,7 @@ def get_or_create_deployment(name): return models.Deployment.objects.get_or_create(name=name) -def create_rawdata(**kwargs): +def create_nova_rawdata(**kwargs): imagemeta_fields = ['os_architecture', 'os_version', 'os_distro', 'rax_options'] imagemeta_kwargs = \ @@ -35,6 +35,7 @@ def create_rawdata(**kwargs): return rawdata + def create_lifecycle(**kwargs): return models.Lifecycle(**kwargs) @@ -88,4 +89,18 @@ def create_instance_exists(**kwargs): def save(obj): - obj.save() \ No newline at end of file + obj.save() + + +def create_glance_rawdata(**kwargs): + rawdata = models.GlanceRawData(**kwargs) + rawdata.save() + + return rawdata + + +def create_generic_rawdata(**kwargs): + rawdata = models.GenericRawData(**kwargs) + rawdata.save() + + return rawdata \ No newline at end of file diff --git a/stacktach/migrations/0003_populate_usage_related_fields_in_rawdataimagemeta_instanceexists_and_instanceusages_from_rawdata.py b/stacktach/migrations/0003_populate_usage_related_fields_in_rawdataimagemeta_instanceexists_and_instanceusages_from_rawdata.py index 67edfa4..86ae331 100644 --- a/stacktach/migrations/0003_populate_usage_related_fields_in_rawdataimagemeta_instanceexists_and_instanceusages_from_rawdata.py +++ b/stacktach/migrations/0003_populate_usage_related_fields_in_rawdataimagemeta_instanceexists_and_instanceusages_from_rawdata.py @@ -1,8 +1,7 @@ # -*- coding: utf-8 -*- import copy from south.v2 import DataMigration -from stacktach.notification import Notification -from stacktach.views import NOTIFICATIONS +from stacktach.notification import notification_factory try: import ujson as json @@ -43,8 +42,7 @@ class Migration(DataMigration): json_dict = json.loads(json_message) routing_key = json_dict[0] body = json_dict[1] - notification = NOTIFICATIONS[routing_key](body) - return notification + return notification_factory(body, None, routing_key, json_message, None) def forwards(self, orm): # Note: Don't use "from appname.models import ModelName". @@ -79,7 +77,7 @@ class Migration(DataMigration): exists_update_count += 1 print "Updated %s records in InstanceExists" % exists_update_count - print "\nStarted updating records in InstacnceUsages" + print "\nStarted updating records in InstanceUsages" usages = orm.InstanceUsage.objects.all().values('request_id') usages_update_count = 0 for usage in usages: diff --git a/stacktach/migrations/0004_create_glancerawdata_and_genericrawdata.py b/stacktach/migrations/0004_create_glancerawdata_and_genericrawdata.py new file mode 100644 index 0000000..277c5ca --- /dev/null +++ b/stacktach/migrations/0004_create_glancerawdata_and_genericrawdata.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding model 'GlanceRawData' + db.create_table(u'stacktach_glancerawdata', ( + (u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('deployment', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['stacktach.Deployment'])), + ('owner', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)), + ('json', self.gf('django.db.models.fields.TextField')()), + ('routing_key', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)), + ('when', self.gf('django.db.models.fields.DecimalField')(max_digits=20, decimal_places=6, db_index=True)), + ('publisher', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=100, null=True, blank=True)), + ('event', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)), + ('service', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)), + ('host', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=100, null=True, blank=True)), + ('instance', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)), + ('request_id', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)), + ('uuid', self.gf('django.db.models.fields.CharField')(max_length=50)), + ('status', self.gf('django.db.models.fields.CharField')(default='queued', max_length=50, db_index=True)), + ('image_type', self.gf('django.db.models.fields.IntegerField')(default=0, null=True, db_index=True)), + )) + db.send_create_signal(u'stacktach', ['GlanceRawData']) + + # Adding model 'GenericRawData' + db.create_table(u'stacktach_genericrawdata', ( + (u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('deployment', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['stacktach.Deployment'])), + ('tenant', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)), + ('json', self.gf('django.db.models.fields.TextField')()), + ('routing_key', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)), + ('image_type', self.gf('django.db.models.fields.IntegerField')(default=0, null=True, db_index=True)), + ('when', self.gf('django.db.models.fields.DecimalField')(max_digits=20, decimal_places=6, db_index=True)), + ('publisher', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=100, null=True, blank=True)), + ('event', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)), + ('service', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)), + ('host', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=100, null=True, blank=True)), + ('instance', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)), + ('request_id', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)), + )) + db.send_create_signal(u'stacktach', ['GenericRawData']) + + + def backwards(self, orm): + # Deleting model 'GlanceRawData' + db.delete_table(u'stacktach_glancerawdata') + + # Deleting model 'GenericRawData' + db.delete_table(u'stacktach_genericrawdata') + + + models = { + u'stacktach.deployment': { + 'Meta': {'object_name': 'Deployment'}, + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '50'}) + }, + u'stacktach.genericrawdata': { + 'Meta': {'object_name': 'GenericRawData'}, + 'deployment': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.Deployment']"}), + 'event': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'host': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '100', 'null': 'True', 'blank': 'True'}), + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'image_type': ('django.db.models.fields.IntegerField', [], {'default': '0', 'null': 'True', 'db_index': 'True'}), + 'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'json': ('django.db.models.fields.TextField', [], {}), + 'publisher': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '100', 'null': 'True', 'blank': 'True'}), + 'request_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'routing_key': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'service': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'tenant': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'when': ('django.db.models.fields.DecimalField', [], {'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}) + }, + u'stacktach.glancerawdata': { + 'Meta': {'object_name': 'GlanceRawData'}, + 'deployment': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.Deployment']"}), + 'event': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'host': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '100', 'null': 'True', 'blank': 'True'}), + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'image_type': ('django.db.models.fields.IntegerField', [], {'default': '0', 'null': 'True', 'db_index': 'True'}), + 'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'json': ('django.db.models.fields.TextField', [], {}), + 'owner': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'publisher': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '100', 'null': 'True', 'blank': 'True'}), + 'request_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'routing_key': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'service': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'status': ('django.db.models.fields.CharField', [], {'default': "'queued'", 'max_length': '50', 'db_index': 'True'}), + 'uuid': ('django.db.models.fields.CharField', [], {'max_length': '50'}), + 'when': ('django.db.models.fields.DecimalField', [], {'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}) + }, + u'stacktach.instancedeletes': { + 'Meta': {'object_name': 'InstanceDeletes'}, + 'deleted_at': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}), + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'launched_at': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}), + 'raw': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.RawData']", 'null': 'True'}) + }, + u'stacktach.instanceexists': { + 'Meta': {'object_name': 'InstanceExists'}, + 'audit_period_beginning': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}), + 'audit_period_ending': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}), + 'delete': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'to': u"orm['stacktach.InstanceDeletes']"}), + 'deleted_at': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}), + 'fail_reason': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '300', 'null': 'True', 'blank': 'True'}), + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'instance_type_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'launched_at': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}), + 'message_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'os_architecture': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'os_distro': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'os_version': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'raw': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'to': u"orm['stacktach.RawData']"}), + 'rax_options': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'send_status': ('django.db.models.fields.IntegerField', [], {'default': '0', 'null': 'True', 'db_index': 'True'}), + 'status': ('django.db.models.fields.CharField', [], {'default': "'pending'", 'max_length': '50', 'db_index': 'True'}), + 'tenant': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'usage': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'to': u"orm['stacktach.InstanceUsage']"}) + }, + u'stacktach.instanceusage': { + 'Meta': {'object_name': 'InstanceUsage'}, + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'instance_type_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'launched_at': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}), + 'os_architecture': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'os_distro': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'os_version': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'rax_options': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'request_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'tenant': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}) + }, + u'stacktach.jsonreport': { + 'Meta': {'object_name': 'JsonReport'}, + 'created': ('django.db.models.fields.DecimalField', [], {'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}), + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'json': ('django.db.models.fields.TextField', [], {}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}), + 'period_end': ('django.db.models.fields.DateTimeField', [], {'db_index': 'True'}), + 'period_start': ('django.db.models.fields.DateTimeField', [], {'db_index': 'True'}), + 'version': ('django.db.models.fields.IntegerField', [], {'default': '1'}) + }, + u'stacktach.lifecycle': { + 'Meta': {'object_name': 'Lifecycle'}, + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'last_raw': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.RawData']", 'null': 'True'}), + 'last_state': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'last_task_state': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}) + }, + u'stacktach.rawdata': { + 'Meta': {'object_name': 'RawData'}, + 'deployment': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.Deployment']"}), + 'event': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'host': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '100', 'null': 'True', 'blank': 'True'}), + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'image_type': ('django.db.models.fields.IntegerField', [], {'default': '0', 'null': 'True', 'db_index': 'True'}), + 'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'json': ('django.db.models.fields.TextField', [], {}), + 'old_state': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '20', 'null': 'True', 'blank': 'True'}), + 'old_task': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '30', 'null': 'True', 'blank': 'True'}), + 'publisher': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '100', 'null': 'True', 'blank': 'True'}), + 'request_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'routing_key': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'service': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '20', 'null': 'True', 'blank': 'True'}), + 'task': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '30', 'null': 'True', 'blank': 'True'}), + 'tenant': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}), + 'when': ('django.db.models.fields.DecimalField', [], {'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}) + }, + u'stacktach.rawdataimagemeta': { + 'Meta': {'object_name': 'RawDataImageMeta'}, + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'os_architecture': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'os_distro': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'os_version': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}), + 'raw': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.RawData']"}), + 'rax_options': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}) + }, + u'stacktach.requesttracker': { + 'Meta': {'object_name': 'RequestTracker'}, + 'completed': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'db_index': 'True'}), + 'duration': ('django.db.models.fields.DecimalField', [], {'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}), + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'last_timing': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.Timing']", 'null': 'True'}), + 'lifecycle': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.Lifecycle']"}), + 'request_id': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}), + 'start': ('django.db.models.fields.DecimalField', [], {'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}) + }, + u'stacktach.timing': { + 'Meta': {'object_name': 'Timing'}, + 'diff': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}), + 'end_raw': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'to': u"orm['stacktach.RawData']"}), + 'end_when': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6'}), + u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'lifecycle': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.Lifecycle']"}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}), + 'start_raw': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'to': u"orm['stacktach.RawData']"}), + 'start_when': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6'}) + } + } + + complete_apps = ['stacktach'] \ No newline at end of file diff --git a/stacktach/models.py b/stacktach/models.py index f76fed9..6980ee4 100644 --- a/stacktach/models.py +++ b/stacktach/models.py @@ -1,18 +1,3 @@ -# Copyright 2012 - Dark Secret Software Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - from django import forms from django.db import models @@ -24,6 +9,34 @@ class Deployment(models.Model): return self.name +class GenericRawData(models.Model): + deployment = models.ForeignKey(Deployment) + tenant = models.CharField(max_length=50, null=True, blank=True, + db_index=True) + json = models.TextField() + routing_key = models.CharField(max_length=50, null=True, + blank=True, db_index=True) + image_type = models.IntegerField(null=True, default=0, db_index=True) + when = models.DecimalField(max_digits=20, decimal_places=6, + db_index=True) + publisher = models.CharField(max_length=100, null=True, + blank=True, db_index=True) + event = models.CharField(max_length=50, null=True, + blank=True, db_index=True) + service = models.CharField(max_length=50, null=True, + blank=True, db_index=True) + host = models.CharField(max_length=100, null=True, + blank=True, db_index=True) + instance = models.CharField(max_length=50, null=True, + blank=True, db_index=True) + request_id = models.CharField(max_length=50, null=True, + blank=True, db_index=True) + + @staticmethod + def get_name(): + return GenericRawData.__name__ + + class RawData(models.Model): deployment = models.ForeignKey(Deployment) tenant = models.CharField(max_length=50, null=True, blank=True, @@ -58,6 +71,10 @@ class RawData(models.Model): def __repr__(self): return "%s %s %s" % (self.event, self.instance, self.state) + @staticmethod + def get_name(): + return RawData.__name__ + class RawDataImageMeta(models.Model): raw = models.ForeignKey(RawData, null=False) @@ -158,6 +175,7 @@ class InstanceExists(models.Model): (RECONCILED, 'Passed Verification After Reconciliation'), (FAILED, 'Failed Verification'), ] + instance = models.CharField(max_length=50, null=True, blank=True, db_index=True) launched_at = models.DecimalField(null=True, max_digits=20, @@ -238,5 +256,49 @@ class JsonReport(models.Model): json = models.TextField() +class GlanceRawData(models.Model): + ACTIVE = 'active' + DELETED = 'deleted' + KILLED = 'killed' + PENDING_DELETE = 'pending_delete' + QUEUED = 'queued' + SAVING = 'saving' + STATUS_CHOICES = [ + (ACTIVE, 'Active'), + (DELETED, 'Deleted'), + (KILLED, 'Killed'), + (PENDING_DELETE, 'Pending delete'), + (QUEUED, 'Queued'), + (SAVING, 'Saving'), + ] + + deployment = models.ForeignKey(Deployment) + owner = models.CharField(max_length=50, null=True, blank=True, + db_index=True) + json = models.TextField() + routing_key = models.CharField(max_length=50, null=True, blank=True, + db_index=True) + when = models.DecimalField(max_digits=20, decimal_places=6, db_index=True) + publisher = models.CharField(max_length=100, null=True, + blank=True, db_index=True) + event = models.CharField(max_length=50, null=True, blank=True, + db_index=True) + service = models.CharField(max_length=50, null=True, blank=True, + db_index=True) + host = models.CharField(max_length=100, null=True, blank=True, + db_index=True) + instance = models.CharField(max_length=50, null=True, blank=True, + db_index=True) + request_id = models.CharField(max_length=50, null=True, blank=True, + db_index=True) + uuid = models.CharField(max_length=50) + status = models.CharField(max_length=50, db_index=True, + choices=STATUS_CHOICES, default=QUEUED) + image_type = models.IntegerField(null=True, default=0, db_index=True) + + @staticmethod + def get_name(): + return GlanceRawData.__name__ + def get_model_fields(model): return model._meta.fields diff --git a/stacktach/notification.py b/stacktach/notification.py index edf367e..b69f76a 100644 --- a/stacktach/notification.py +++ b/stacktach/notification.py @@ -1,25 +1,37 @@ +# Copyright (c) 2013 - Rackspace Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. from stacktach import utils from stacktach import image_type +from stacktach import db class Notification(object): - def __init__(self, body): + def __init__(self, body, deployment, routing_key, json): self.body = body self.request_id = body.get('_context_request_id', "") + self.deployment = deployment + self.routing_key = routing_key + self.json = json self.payload = body.get('payload', {}) - self.state = self.payload.get('state', "") - self.old_state = self.payload.get('old_state', "") - self.old_task = self.payload.get('old_task_state', "") - self.task = self.payload.get('new_task_state', "") - self.image_type = image_type.get_numeric_code(self.payload) self.publisher = self.body['publisher_id'] self.event = self.body['event_type'] - image_meta = self.payload.get('image_meta', {}) - self.os_architecture = image_meta.get('org.openstack__1__architecture', - '') - self.os_distro = image_meta.get('org.openstack__1__os_distro', '') - self.os_version = image_meta.get('org.openstack__1__os_version', '') - self.rax_options = image_meta.get('com.rackspace__1__options', '') @property def when(self): @@ -29,30 +41,24 @@ class Notification(object): when = utils.str_time_to_unix(when) return when - def rawdata_kwargs(self, deployment, routing_key, json): - return { - 'deployment': deployment, - 'routing_key': routing_key, - 'event': self.event, - 'publisher': self.publisher, - 'json': json, - 'state': self.state, - 'old_state': self.old_state, - 'task': self.task, - 'old_task': self.old_task, - 'image_type': self.image_type, - 'when': self.when, - 'publisher': self.publisher, - 'service': self.service, - 'host': self.host, - 'instance': self.instance, - 'request_id': self.request_id, - 'tenant': self.tenant, - 'os_architecture': self.os_architecture, - 'os_distro': self.os_distro, - 'os_version': self.os_version, - 'rax_options': self.rax_options - } + @property + def service(self): + parts = self.publisher.split('.') + return parts[0] + + @property + def host(self): + host = None + parts = self.publisher.split('.') + if len(parts) > 1: + host = ".".join(parts[1:]) + return host + + @property + def tenant(self): + tenant = self.body.get('_context_project_id', None) + tenant = self.payload.get('tenant_id', tenant) + return tenant @property def instance(self): @@ -65,6 +71,71 @@ class Notification(object): instance = self.payload.get('instance', {}).get('uuid') return instance + def save(self): + return db.create_generic_rawdata(deployment=self.deployment, + routing_key=self.routing_key, + tenant=self.tenant, + json=self.json, + when=self.when, + publisher=self.publisher, + event=self.event, + service=self.service, + host=self.host, + instance=self.instance, + request_id=self.request_id) + + +class GlanceNotification(Notification): + def __init__(self, body, deployment, routing_key, json): + super(GlanceNotification, self).__init__(body, deployment, + routing_key, json) + self.properties = self.payload.get('properties', {}) + self.image_type = image_type.get_numeric_code(self.payload) + self.status = self.payload.get('status', None) + self.uuid = self.payload.get('id', None) + + + @property + def owner(self): + return self.payload.get('owner', None) + + @property + def instance(self): + return self.properties.get('instance_uuid', None) + + def save(self): + db.create_glance_rawdata(deployment=self.deployment, + routing_key=self.routing_key, + owner=self.owner, + json=self.json, + when=self.when, + publisher=self.publisher, + event=self.event, + service=self.service, + host=self.host, + instance=self.instance, + request_id=self.request_id, + image_type=self.image_type, + status=self.status, + uuid=self.uuid) + + +class NovaNotification(Notification): + def __init__(self, body, deployment, routing_key, json): + super(NovaNotification, self).__init__(body, deployment, routing_key, + json) + self.state = self.payload.get('state', "") + self.old_state = self.payload.get('old_state', "") + self.old_task = self.payload.get('old_task_state', "") + self.task = self.payload.get('new_task_state', "") + self.image_type = image_type.get_numeric_code(self.payload) + image_meta = self.payload.get('image_meta', {}) + self.os_architecture = image_meta.get('org.openstack__1__architecture', + '') + self.os_distro = image_meta.get('org.openstack__1__os_distro', '') + self.os_version = image_meta.get('org.openstack__1__os_version', '') + self.rax_options = image_meta.get('com.rackspace__1__options', '') + @property def host(self): host = None @@ -78,8 +149,31 @@ class Notification(object): parts = self.publisher.split('.') return parts[0] - @property - def tenant(self): - tenant = self.body.get('_context_project_id', None) - tenant = self.payload.get('tenant_id', tenant) - return tenant + def save(self): + return db.create_nova_rawdata(deployment=self.deployment, + routing_key=self.routing_key, + tenant=self.tenant, + json=self.json, + when=self.when, + publisher=self.publisher, + event=self.event, + service=self.service, + host=self.host, + instance=self.instance, + request_id=self.request_id, + state=self.state, + old_state=self.old_state, + task=self.task, + old_task=self.old_task, + os_architecture=self.os_architecture, + os_distro=self.os_distro, + os_version=self.os_version, + rax_options=self.rax_options) + + +def notification_factory(body, deployment, routing_key, json, exchange): + if exchange == 'nova': + return NovaNotification(body, deployment, routing_key, json) + if exchange == "glance": + return GlanceNotification(body, deployment, routing_key, json) + return Notification(body, deployment, routing_key, json) diff --git a/stacktach/tests.py b/stacktach/tests.py index 504c345..80ec3ad 100644 --- a/stacktach/tests.py +++ b/stacktach/tests.py @@ -19,39 +19,106 @@ # IN THE SOFTWARE. from datetime import datetime -import unittest +from django.test import TransactionTestCase import db from stacktach.datetime_to_decimal import dt_to_decimal from stacktach.models import RawDataImageMeta +from stacktach.models import GenericRawData +from stacktach.models import GlanceRawData from stacktach.models import RawData from stacktach.models import get_model_fields -class RawDataImageMetaDbTestCase(unittest.TestCase): +class RawDataImageMetaDbTestCase(TransactionTestCase): def test_create_raw_data_should_populate_rawdata_and_rawdata_imagemeta(self): deployment = db.get_or_create_deployment('deployment1')[0] kwargs = { 'deployment': deployment, 'when': dt_to_decimal(datetime.utcnow()), - 'tenant': '1', 'json': '{}', 'routing_key': 'monitor.info', - 'state': 'verifying', 'old_state': 'pending', - 'old_task': '', 'task': '', 'image_type': 1, - 'publisher': '', 'event': 'compute.instance.exists', - 'service': '', 'host': '', 'instance': '1234-5678-9012-3456', - 'request_id': '1234', 'os_architecture': 'x86', 'os_version': '1', - 'os_distro': 'windows', 'rax_options': '2'} + 'tenant': '1', + 'json': '{}', + 'routing_key': 'monitor.info', + 'state': 'verifying', + 'old_state': 'pending', + 'old_task': 'building', + 'task': 'saving', + 'image_type': 1, + 'publisher': 'publisher', + 'event': 'compute.instance.exists', + 'service': 'compute', + 'host': 'host', + 'instance': '1234-5678-9012-3456', + 'request_id': '1234', + 'os_architecture': 'x86', + 'os_version': '1', + 'os_distro': 'windows', + 'rax_options': '2'} - rawdata = db.create_rawdata(**kwargs) + rawdata = db.create_nova_rawdata(**kwargs) for field in get_model_fields(RawData): if field.name != 'id': self.assertEquals(getattr(rawdata, field.name), kwargs[field.name]) - raw_image_meta = RawDataImageMeta.objects.all()[0] - self.assertEquals(raw_image_meta.raw, rawdata) + raw_image_meta = RawDataImageMeta.objects.filter(raw_id=rawdata.id)[0] self.assertEquals(raw_image_meta.os_architecture, kwargs['os_architecture']) self.assertEquals(raw_image_meta.os_version, kwargs['os_version']) self.assertEquals(raw_image_meta.os_distro, kwargs['os_distro']) self.assertEquals(raw_image_meta.rax_options, kwargs['rax_options']) + + +class GlanceRawDataTestCase(TransactionTestCase): + def test_create_rawdata_should_populate_glance_rawdata(self): + deployment = db.get_or_create_deployment('deployment1')[0] + kwargs = { + 'deployment': deployment, + 'when': dt_to_decimal(datetime.utcnow()), + 'owner': '1234567', + 'json': '{}', + 'routing_key': 'glance_monitor.info', + 'image_type': 1, + 'publisher': 'publisher', + 'event': 'event', + 'service': 'service', + 'host': 'host', + 'instance': '1234-5678-9012-3456', + 'request_id': '1234', + 'uuid': '1234-5678-0912-3456', + 'status': 'active', + } + + db.create_glance_rawdata(**kwargs) + rawdata = GlanceRawData.objects.all().order_by('-id')[0] + + for field in get_model_fields(GlanceRawData): + if field.name != 'id': + self.assertEquals(getattr(rawdata, field.name), + kwargs[field.name]) + + +class GenericRawDataTestCase(TransactionTestCase): + def test_create_generic_rawdata_should_populate_generic_rawdata(self): + deployment = db.get_or_create_deployment('deployment1')[0] + kwargs = { + 'deployment': deployment, + 'when': dt_to_decimal(datetime.utcnow()), + 'tenant': '1234567', + 'json': '{}', + 'routing_key': 'monitor.info', + 'image_type': 1, + 'publisher': 'publisher', + 'event': 'event', + 'service': 'service', + 'host': 'host', + 'instance': '1234-5678-9012-3456', + 'request_id': '1234'} + + db.create_generic_rawdata(**kwargs) + rawdata = GenericRawData.objects.all()[0] + + for field in get_model_fields(GenericRawData): + if field.name != 'id': + self.assertEquals(getattr(rawdata, field.name), + kwargs[field.name]) diff --git a/stacktach/views.py b/stacktach/views.py index d639fe5..15da309 100644 --- a/stacktach/views.py +++ b/stacktach/views.py @@ -12,7 +12,7 @@ from stacktach import db as stackdb from stacktach import models from stacktach import stacklog from stacktach import utils -from stacktach.notification import Notification +from stacktach import notification STACKDB = stackdb @@ -25,13 +25,6 @@ def log_warn(msg): LOG.warn(msg) -# routing_key : handler - -NOTIFICATIONS = { - 'monitor.info': Notification, - 'monitor.error': Notification} - - def start_kpi_tracking(lifecycle, raw): """Start the clock for kpi timings when we see an instance.update coming in from an api node.""" @@ -279,10 +272,8 @@ def _process_exists(raw, body): values['tenant'] = payload['tenant_id'] image_meta = payload.get('image_meta', {}) values['rax_options'] = image_meta.get('com.rackspace__1__options', '') - os_arch = image_meta.get('org.openstack__1__architecture', '') - values['os_architecture'] = os_arch - os_version = image_meta.get('org.openstack__1__os_version', '') - values['os_version'] = os_version + values['os_architecture'] = image_meta.get('org.openstack__1__architecture', '') + values['os_version'] = image_meta.get('org.openstack__1__os_version', '') values['os_distro'] = image_meta.get('org.openstack__1__os_distro', '') deleted_at = payload.get('deleted_at') @@ -327,26 +318,29 @@ def aggregate_usage(raw, body): USAGE_PROCESS_MAPPING[raw.event](raw, body) -def process_raw_data(deployment, args, json_args): +def process_raw_data(deployment, args, json_args, exchange): """This is called directly by the worker to add the event to the db.""" db.reset_queries() routing_key, body = args - record = None - notification = NOTIFICATIONS[routing_key](body) - if notification: - values = notification.rawdata_kwargs(deployment, routing_key, json_args) - if not values: - return record - record = STACKDB.create_rawdata(**values) - return record + notif = notification.notification_factory(body, deployment, routing_key, + json_args, exchange) + return notif.save() -def post_process(raw, body): +def post_process_rawdata(raw, body): aggregate_lifecycle(raw) aggregate_usage(raw, body) +def post_process_glancerawdata(raw, body): + pass + + +def post_process_genericrawdata(raw, body): + pass + + def _post_process_raw_data(rows, highlight=None): for row in rows: if "error" in row.routing_key: diff --git a/tests/unit/test_models.py b/tests/unit/test_models.py new file mode 100644 index 0000000..9501e68 --- /dev/null +++ b/tests/unit/test_models.py @@ -0,0 +1,33 @@ +# Copyright (c) 2013 - Rackspace Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +import unittest +from stacktach.models import RawData, GlanceRawData, GenericRawData + + +class ModelsTestCase(unittest.TestCase): + def test_get_name_for_rawdata(self): + self.assertEquals(RawData.get_name(), 'RawData') + + def test_get_name_for_glancerawdata(self): + self.assertEquals(GlanceRawData.get_name(), 'GlanceRawData') + + def test_get_name_for_genericrawdata(self): + self.assertEquals(GenericRawData.get_name(), 'GenericRawData') \ No newline at end of file diff --git a/tests/unit/test_notification.py b/tests/unit/test_notification.py index feeb696..b1dffe3 100644 --- a/tests/unit/test_notification.py +++ b/tests/unit/test_notification.py @@ -18,163 +18,167 @@ # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS # IN THE SOFTWARE. -from decimal import Decimal import unittest + +import mox + +from stacktach import notification, utils + from stacktach.notification import Notification -from tests.unit.utils import REQUEST_ID_1, TENANT_ID_1, INSTANCE_ID_1 +from stacktach.notification import GlanceNotification +from stacktach import db +from tests.unit.utils import REQUEST_ID_1 +from tests.unit.utils import TIMESTAMP_1 +from tests.unit.utils import TENANT_ID_1 +from tests.unit.utils import INSTANCE_ID_1 + + +class NovaNotificationTestCase(unittest.TestCase): + + def setUp(self): + self.mox = mox.Mox() + + def tearDown(self): + self.mox.UnsetStubs() + + def test_factory_should_return_nova_notification_for_nova_exchange( + self): + body = {} + deployment = "1" + json = "{}" + routing_key = "monitor.info" + self.mox.StubOutWithMock(notification, 'NovaNotification') + notification.NovaNotification(body, deployment, routing_key, json) + + self.mox.ReplayAll() + notification.notification_factory(body, deployment, routing_key, json, + 'nova') + self.mox.VerifyAll() + + def test_factory_should_return_glance_notification_for_glance_exchange( + self): + body = {} + deployment = "1" + json = "{}" + routing_key = "monitor_glance.info" + + self.mox.StubOutWithMock(notification, 'GlanceNotification') + notification.GlanceNotification(body, deployment, routing_key, json) + + self.mox.ReplayAll() + notification.notification_factory(body, deployment, routing_key, json, + 'glance') + self.mox.VerifyAll() + + def test_factory_should_return_notification_for_unknown_exchange( + self): + body = {} + deployment = "1" + json = "{}" + routing_key = "unknown.info" + + self.mox.StubOutWithMock(notification, 'Notification') + notification.Notification(body, deployment, routing_key, json) + + self.mox.ReplayAll() + notification.notification_factory(body, deployment, routing_key, json, + 'unknown_exchange') + self.mox.VerifyAll() + + +class GlanceNotificationTestCase(unittest.TestCase): + def setUp(self): + self.mox = mox.Mox() + + def tearDown(self): + self.mox.UnsetStubs() + + def test_save_should_persist_glance_rawdata_to_database(self): + body = { + "event_type": "image.upload", + "timestamp": "2013-06-20 17:31:57.939614", + "publisher_id": "glance-api01-r2961.global.preprod-ord.ohthree.com", + "payload": { + "status": "saving", + "properties": { + "image_type": "snapshot", + "instance_uuid": INSTANCE_ID_1, + }, + "owner": TENANT_ID_1, + "id": "2df2ccf6-bc1b-4853-aab0-25fda346b3bb", + } + } + deployment = "1" + routing_key = "glance_monitor.info" + json = '{["routing_key", {%s}]}' % body + self.mox.StubOutWithMock(db, 'create_glance_rawdata') + db.create_glance_rawdata( + deployment="1", + owner=TENANT_ID_1, + json=json, + routing_key=routing_key, + when=utils.str_time_to_unix("2013-06-20 17:31:57.939614"), + publisher="glance-api01-r2961.global.preprod-ord.ohthree.com", + event="image.upload", + service="glance-api01-r2961", + host="global.preprod-ord.ohthree.com", + instance=INSTANCE_ID_1, + request_id=None, + image_type=0, + status="saving", + uuid="2df2ccf6-bc1b-4853-aab0-25fda346b3bb") + + self.mox.ReplayAll() + + notification = GlanceNotification(body, deployment, routing_key, + json) + notification.save() + self.mox.VerifyAll() class NotificationTestCase(unittest.TestCase): + def setUp(self): + self.mox = mox.Mox() - def test_rawdata_kwargs(self): - message = { - 'event_type': 'compute.instance.create.start', - 'publisher_id': 'compute.cpu1-n01.example.com', + def tearDown(self): + self.mox.UnsetStubs() + + def test_save_should_persist_generic_rawdata_to_database(self): + body = { + "event_type": "image.upload", '_context_request_id': REQUEST_ID_1, '_context_project_id': TENANT_ID_1, - 'timestamp': '2013-06-12 06:30:52.790476', - 'payload': { + "timestamp": TIMESTAMP_1, + "publisher_id": "glance-api01-r2961.global.preprod-ord.ohthree.com", + "payload": { 'instance_id': INSTANCE_ID_1, - 'state': 'active', - 'old_state': 'building', - 'old_task_state': 'build', - "new_task_state": 'rebuild_spawning', - 'image_meta': { - 'image_type': 'base', - 'org.openstack__1__architecture': 'x64', - 'org.openstack__1__os_distro': 'com.microsoft.server', - 'org.openstack__1__os_version': '2008.2', - 'com.rackspace__1__options': '36' - } + "status": "saving", + "container_format": "ovf", + "properties": { + "image_type": "snapshot", + }, + "tenant": "5877054", } } - kwargs = Notification(message).rawdata_kwargs('1', 'monitor.info', 'json') + deployment = "1" + routing_key = "generic_monitor.info" + json = '{["routing_key", {%s}]}' % body + self.mox.StubOutWithMock(db, 'create_generic_rawdata') + db.create_generic_rawdata( + deployment="1", + tenant=TENANT_ID_1, + json=json, + routing_key=routing_key, + when=utils.str_time_to_unix(TIMESTAMP_1), + publisher="glance-api01-r2961.global.preprod-ord.ohthree.com", + event="image.upload", + service="glance-api01-r2961", + host="global.preprod-ord.ohthree.com", + instance=INSTANCE_ID_1, + request_id=REQUEST_ID_1) - self.assertEquals(kwargs['host'], 'cpu1-n01.example.com') - self.assertEquals(kwargs['deployment'], '1') - self.assertEquals(kwargs['routing_key'], 'monitor.info') - self.assertEquals(kwargs['tenant'], TENANT_ID_1) - self.assertEquals(kwargs['json'], 'json') - self.assertEquals(kwargs['state'], 'active') - self.assertEquals(kwargs['old_state'], 'building') - self.assertEquals(kwargs['old_task'], 'build') - self.assertEquals(kwargs['task'], 'rebuild_spawning') - self.assertEquals(kwargs['image_type'], 1) - self.assertEquals(kwargs['when'], Decimal('1371018652.790476')) - self.assertEquals(kwargs['publisher'], 'compute.cpu1-n01.example.com') - self.assertEquals(kwargs['event'], 'compute.instance.create.start') - self.assertEquals(kwargs['request_id'], REQUEST_ID_1) + self.mox.ReplayAll() - def test_rawdata_kwargs_missing_image_meta(self): - message = { - 'event_type': 'compute.instance.create.start', - 'publisher_id': 'compute.cpu1-n01.example.com', - '_context_request_id': REQUEST_ID_1, - '_context_project_id': TENANT_ID_1, - 'timestamp': '2013-06-12 06:30:52.790476', - 'payload': { - 'instance_id': INSTANCE_ID_1, - 'state': 'active', - 'old_state': 'building', - 'old_task_state': 'build', - "new_task_state": 'rebuild_spawning', - 'image_meta': { - 'image_type': 'base', - } - } - } - kwargs = Notification(message).rawdata_kwargs('1', 'monitor.info', 'json') - - self.assertEquals(kwargs['host'], 'cpu1-n01.example.com') - self.assertEquals(kwargs['deployment'], '1') - self.assertEquals(kwargs['routing_key'], 'monitor.info') - self.assertEquals(kwargs['tenant'], TENANT_ID_1) - self.assertEquals(kwargs['json'], 'json') - self.assertEquals(kwargs['state'], 'active') - self.assertEquals(kwargs['old_state'], 'building') - self.assertEquals(kwargs['old_task'], 'build') - self.assertEquals(kwargs['task'], 'rebuild_spawning') - self.assertEquals(kwargs['image_type'], 1) - self.assertEquals(kwargs['when'], Decimal('1371018652.790476')) - self.assertEquals(kwargs['publisher'], 'compute.cpu1-n01.example.com') - self.assertEquals(kwargs['event'], 'compute.instance.create.start') - self.assertEquals(kwargs['request_id'], REQUEST_ID_1) - - def test_rawdata_kwargs_for_message_with_no_host(self): - message = { - 'event_type': 'compute.instance.create.start', - 'publisher_id': 'compute', - '_context_request_id': REQUEST_ID_1, - '_context_project_id': TENANT_ID_1, - 'timestamp': '2013-06-12 06:30:52.790476', - 'payload': { - 'instance_id': INSTANCE_ID_1, - 'state': 'active', - 'old_state': 'building', - 'old_task_state': 'build', - "new_task_state": 'rebuild_spawning', - 'image_meta': { - 'image_type': 'base', - 'org.openstack__1__architecture': 'x64', - 'org.openstack__1__os_distro': 'com.microsoft.server', - 'org.openstack__1__os_version': '2008.2', - 'com.rackspace__1__options': '36' - } - } - } - kwargs = Notification(message).rawdata_kwargs('1', 'monitor.info', 'json') - self.assertEquals(kwargs['host'], None) - - self.assertEquals(kwargs['deployment'], '1') - self.assertEquals(kwargs['routing_key'], 'monitor.info') - self.assertEquals(kwargs['tenant'], TENANT_ID_1) - self.assertEquals(kwargs['json'], 'json') - self.assertEquals(kwargs['state'], 'active') - self.assertEquals(kwargs['old_state'], 'building') - self.assertEquals(kwargs['old_task'], 'build') - self.assertEquals(kwargs['task'], 'rebuild_spawning') - self.assertEquals(kwargs['image_type'], 1) - self.assertEquals(kwargs['when'], Decimal('1371018652.790476')) - self.assertEquals(kwargs['publisher'], 'compute') - self.assertEquals(kwargs['event'], 'compute.instance.create.start') - self.assertEquals(kwargs['request_id'], REQUEST_ID_1) - - def test_rawdata_kwargs_for_message_with_exception(self): - message = { - 'event_type': 'compute.instance.create.start', - 'publisher_id': 'compute.cpu1-n01.example.com', - '_context_request_id': REQUEST_ID_1, - '_context_project_id': TENANT_ID_1, - 'timestamp': '2013-06-12 06:30:52.790476', - 'payload': { - 'exception': {'kwargs':{'uuid': INSTANCE_ID_1}}, - 'instance_id': INSTANCE_ID_1, - 'state': 'active', - 'old_state': 'building', - 'old_task_state': 'build', - "new_task_state": 'rebuild_spawning', - 'image_meta': { - 'image_type': 'base', - 'org.openstack__1__architecture': 'x64', - 'org.openstack__1__os_distro': 'com.microsoft.server', - 'org.openstack__1__os_version': '2008.2', - 'com.rackspace__1__options': '36' - } - } - } - kwargs = Notification(message).rawdata_kwargs('1', 'monitor.info', 'json') - - self.assertEquals(kwargs['host'], 'cpu1-n01.example.com') - self.assertEquals(kwargs['deployment'], '1') - self.assertEquals(kwargs['routing_key'], 'monitor.info') - self.assertEquals(kwargs['tenant'], TENANT_ID_1) - self.assertEquals(kwargs['json'], 'json') - self.assertEquals(kwargs['state'], 'active') - self.assertEquals(kwargs['old_state'], 'building') - self.assertEquals(kwargs['old_task'], 'build') - self.assertEquals(kwargs['task'], 'rebuild_spawning') - self.assertEquals(kwargs['image_type'], 1) - self.assertEquals(kwargs['when'], Decimal('1371018652.790476')) - self.assertEquals(kwargs['publisher'], 'compute.cpu1-n01.example.com') - self.assertEquals(kwargs['event'], 'compute.instance.create.start') - self.assertEquals(kwargs['request_id'], REQUEST_ID_1) + notification = Notification(body, deployment, routing_key, json) + notification.save() + self.mox.VerifyAll() diff --git a/tests/unit/test_stacktach.py b/tests/unit/test_stacktach.py index d3e0853..4138997 100644 --- a/tests/unit/test_stacktach.py +++ b/tests/unit/test_stacktach.py @@ -37,6 +37,7 @@ from utils import INSTANCE_TYPE_ID_1 from utils import DUMMY_TIME from utils import INSTANCE_TYPE_ID_2 from stacktach import stacklog +from stacktach import notification from stacktach import views @@ -59,54 +60,45 @@ class StacktachRawParsingTestCase(unittest.TestCase): dict = { 'timestamp': when, } - args = ('monitor.info', dict) + routing_key = 'monitor.info' + args = (routing_key, dict) json_args = json.dumps(args) - raw_values = { - 'deployment': deployment, - 'when': utils.decimal_utc(datetime.datetime.strptime(when, '%Y-%m-%d %H:%M:%S.%f')), - 'host': 'api', - 'routing_key': 'monitor.info', - 'json': json_args - } - - old_info_handler = views.NOTIFICATIONS['monitor.info'] + mock_record = self.mox.CreateMockAnything() mock_notification = self.mox.CreateMockAnything() - mock_notification.rawdata_kwargs(deployment, 'monitor.info', json_args).AndReturn(raw_values) - views.NOTIFICATIONS['monitor.info'] = lambda message_body: mock_notification - - views.STACKDB.create_rawdata(**raw_values) + mock_notification.save().AndReturn(mock_record) + self.mox.StubOutWithMock(notification, 'notification_factory') + exchange = 'nova' + notification.notification_factory(dict, deployment, routing_key, + json_args, exchange).AndReturn( + mock_notification) self.mox.ReplayAll() - views.process_raw_data(deployment, args, json_args) - self.mox.VerifyAll() - views.NOTIFICATIONS['monitor.info'] = old_info_handler + self.assertEquals( + views.process_raw_data(deployment, args, json_args, exchange), + mock_record) + self.mox.VerifyAll() def test_process_raw_data_old_timestamp(self): deployment = self.mox.CreateMockAnything() when = '2013-1-25T13:38:23.123' dict = { '_context_timestamp': when, - } + } + routing_key = 'monitor.info' args = ('monitor.info', dict) json_args = json.dumps(args[1]) - raw_values = { - 'deployment': deployment, - 'when': utils.decimal_utc(datetime.datetime.strptime(when, '%Y-%m-%dT%H:%M:%S.%f')), - 'host': 'api', - 'routing_key': 'monitor.info', - 'json': json_args - } - old_info_handler = views.NOTIFICATIONS['monitor.info'] - mock_notification = self.mox.CreateMockAnything() - mock_notification.rawdata_kwargs(deployment, 'monitor.info', json_args).AndReturn(raw_values) - views.NOTIFICATIONS['monitor.info'] = lambda message_body: mock_notification - views.STACKDB.create_rawdata(**raw_values) + mock_notification = self.mox.CreateMockAnything() + mock_notification.save() + self.mox.StubOutWithMock(notification, 'notification_factory') + exchange = 'nova' + notification.notification_factory(dict, deployment, routing_key, + json_args, exchange).AndReturn(mock_notification) self.mox.ReplayAll() - views.process_raw_data(deployment, args, json_args) + + views.process_raw_data(deployment, args, json_args, exchange) self.mox.VerifyAll() - views.NOTIFICATIONS['monitor.info'] = old_info_handler class StacktachLifecycleTestCase(unittest.TestCase): def setUp(self): diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index 442b967..f21c227 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -22,15 +22,14 @@ import json import unittest import kombu -import kombu.entity -import kombu.connection import mox -from stacktach import db, views +from stacktach import db +from stacktach import views import worker.worker as worker -class NovaConsumerTestCase(unittest.TestCase): +class ConsumerTestCase(unittest.TestCase): def setUp(self): self.mox = mox.Mox() @@ -47,9 +46,10 @@ class NovaConsumerTestCase(unittest.TestCase): consumer = self.mox.CreateMockAnything() created_consumers.append(consumer) return consumer - self.mox.StubOutWithMock(worker.NovaConsumer, '_create_exchange') - self.mox.StubOutWithMock(worker.NovaConsumer, '_create_queue') - consumer = worker.NovaConsumer('test', None, None, True, {}) + self.mox.StubOutWithMock(worker.Consumer, '_create_exchange') + self.mox.StubOutWithMock(worker.Consumer, '_create_queue') + consumer = worker.Consumer('test', None, None, True, {}, "nova", + ["monitor.info", "monitor.error"]) exchange = self.mox.CreateMockAnything() consumer._create_exchange('nova', 'topic').AndReturn(exchange) info_queue = self.mox.CreateMockAnything() @@ -71,7 +71,8 @@ class NovaConsumerTestCase(unittest.TestCase): def test_create_exchange(self): args = {'key': 'value'} - consumer = worker.NovaConsumer('test', None, None, True, args) + consumer = worker.Consumer('test', None, None, True, args, 'nova', + ["monitor.info", "monitor.error"]) self.mox.StubOutClassWithMocks(kombu.entity, 'Exchange') exchange = kombu.entity.Exchange('nova', type='topic', exclusive=False, @@ -87,7 +88,8 @@ class NovaConsumerTestCase(unittest.TestCase): queue = kombu.Queue('name', exchange, auto_delete=False, durable=True, exclusive=False, routing_key='routing.key', queue_arguments={}) - consumer = worker.NovaConsumer('test', None, None, True, {}) + consumer = worker.Consumer('test', None, None, True, {}, 'nova', + ["monitor.info", "monitor.error"]) self.mox.ReplayAll() actual_queue = consumer._create_queue('name', exchange, 'routing.key', exclusive=False, @@ -103,7 +105,8 @@ class NovaConsumerTestCase(unittest.TestCase): queue = kombu.Queue('name', exchange, auto_delete=False, durable=True, exclusive=False, routing_key='routing.key', queue_arguments=queue_args) - consumer = worker.NovaConsumer('test', None, None, True, queue_args) + consumer = worker.Consumer('test', None, None, True, queue_args, + 'nova', ["monitor.info", "monitor.error"]) self.mox.ReplayAll() actual_queue = consumer._create_queue('name', exchange, 'routing.key', exclusive=False, @@ -114,21 +117,29 @@ class NovaConsumerTestCase(unittest.TestCase): def test_process(self): deployment = self.mox.CreateMockAnything() raw = self.mox.CreateMockAnything() + raw.get_name().AndReturn('RawData') message = self.mox.CreateMockAnything() - consumer = worker.NovaConsumer('test', None, deployment, True, {}) + exchange = 'nova' + consumer = worker.Consumer('test', None, deployment, True, {}, + exchange, ["monitor.info", "monitor.error"]) routing_key = 'monitor.info' message.delivery_info = {'routing_key': routing_key} body_dict = {u'key': u'value'} message.body = json.dumps(body_dict) + + mock_post_process_method = self.mox.CreateMockAnything() + mock_post_process_method(raw, body_dict) + old_handler = worker.POST_PROCESS_METHODS + worker.POST_PROCESS_METHODS["RawData"] = mock_post_process_method + self.mox.StubOutWithMock(views, 'process_raw_data', use_mock_anything=True) args = (routing_key, body_dict) - views.process_raw_data(deployment, args, json.dumps(args))\ - .AndReturn(raw) + views.process_raw_data(deployment, args, json.dumps(args), exchange) \ + .AndReturn(raw) message.ack() - self.mox.StubOutWithMock(views, 'post_process') - views.post_process(raw, body_dict) + self.mox.StubOutWithMock(consumer, '_check_memory', use_mock_anything=True) consumer._check_memory() @@ -136,13 +147,16 @@ class NovaConsumerTestCase(unittest.TestCase): consumer._process(message) self.assertEqual(consumer.processed, 1) self.mox.VerifyAll() + worker.POST_PROCESS_METHODS["RawData"] = old_handler def test_process_no_raw_dont_ack(self): deployment = self.mox.CreateMockAnything() raw = self.mox.CreateMockAnything() message = self.mox.CreateMockAnything() - consumer = worker.NovaConsumer('test', None, deployment, True, {}) + exchange = 'nova' + consumer = worker.Consumer('test', None, deployment, True, {}, + exchange, ["monitor.info", "monitor.error"]) routing_key = 'monitor.info' message.delivery_info = {'routing_key': routing_key} body_dict = {u'key': u'value'} @@ -150,8 +164,8 @@ class NovaConsumerTestCase(unittest.TestCase): self.mox.StubOutWithMock(views, 'process_raw_data', use_mock_anything=True) args = (routing_key, body_dict) - views.process_raw_data(deployment, args, json.dumps(args))\ - .AndReturn(None) + views.process_raw_data(deployment, args, json.dumps(args), exchange) \ + .AndReturn(None) self.mox.StubOutWithMock(consumer, '_check_memory', use_mock_anything=True) consumer._check_memory() @@ -168,7 +182,9 @@ class NovaConsumerTestCase(unittest.TestCase): 'rabbit_port': 5672, 'rabbit_userid': 'rabbit', 'rabbit_password': 'rabbit', - 'rabbit_virtual_host': '/' + 'rabbit_virtual_host': '/', + "services": ["nova"], + "topics": {"nova": ["monitor.info", "monitor.error"]} } self.mox.StubOutWithMock(db, 'get_or_create_deployment') deployment = self.mox.CreateMockAnything() @@ -187,13 +203,15 @@ class NovaConsumerTestCase(unittest.TestCase): kombu.connection.BrokerConnection(**params).AndReturn(conn) conn.__enter__().AndReturn(conn) conn.__exit__(None, None, None).AndReturn(None) - self.mox.StubOutClassWithMocks(worker, 'NovaConsumer') - consumer = worker.NovaConsumer(config['name'], conn, deployment, - config['durable_queue'], {}) + self.mox.StubOutClassWithMocks(worker, 'Consumer') + exchange = 'nova' + consumer = worker.Consumer(config['name'], conn, deployment, + config['durable_queue'], {}, exchange, + ["monitor.info", "monitor.error"]) consumer.run() worker.continue_running().AndReturn(False) self.mox.ReplayAll() - worker.run(config) + worker.run(config, exchange) self.mox.VerifyAll() def test_run_queue_args(self): @@ -205,7 +223,9 @@ class NovaConsumerTestCase(unittest.TestCase): 'rabbit_userid': 'rabbit', 'rabbit_password': 'rabbit', 'rabbit_virtual_host': '/', - 'queue_arguments': {'x-ha-policy': 'all'} + 'queue_arguments': {'x-ha-policy': 'all'}, + "services": ["nova"], + "topics": {"nova": ["monitor.info", "monitor.error"]} } self.mox.StubOutWithMock(db, 'get_or_create_deployment') deployment = self.mox.CreateMockAnything() @@ -224,12 +244,14 @@ class NovaConsumerTestCase(unittest.TestCase): kombu.connection.BrokerConnection(**params).AndReturn(conn) conn.__enter__().AndReturn(conn) conn.__exit__(None, None, None).AndReturn(None) - self.mox.StubOutClassWithMocks(worker, 'NovaConsumer') - consumer = worker.NovaConsumer(config['name'], conn, deployment, - config['durable_queue'], - config['queue_arguments']) + self.mox.StubOutClassWithMocks(worker, 'Consumer') + exchange = 'nova' + consumer = worker.Consumer(config['name'], conn, deployment, + config['durable_queue'], + config['queue_arguments'], exchange, + ["monitor.info", "monitor.error"]) consumer.run() worker.continue_running().AndReturn(False) self.mox.ReplayAll() - worker.run(config) - self.mox.VerifyAll() \ No newline at end of file + worker.run(config, exchange) + self.mox.VerifyAll() diff --git a/tests/unit/utils.py b/tests/unit/utils.py index 961f3ce..437e169 100644 --- a/tests/unit/utils.py +++ b/tests/unit/utils.py @@ -52,6 +52,7 @@ OS_ARCH_2 = "x64" OS_VERSION_1 = "1" OS_VERSION_2 = "2" +TIMESTAMP_1 = "2013-06-20 17:31:57.939614" def decimal_utc(t = datetime.datetime.utcnow()): return dt.dt_to_decimal(t) diff --git a/worker/config.py b/worker/config.py new file mode 100644 index 0000000..28c8b5e --- /dev/null +++ b/worker/config.py @@ -0,0 +1,43 @@ +# Copyright (c) 2013 - Rackspace Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +import json +import os + +config_filename = os.environ.get('STACKTACH_DEPLOYMENTS_FILE', + 'stacktach_worker_config.json') +try: + from local_settings import * + config_filename = STACKTACH_DEPLOYMENTS_FILE +except ImportError: + pass + +config = None +with open(config_filename, "r") as f: + config = json.load(f) + + +def deployments(): + return config['deployments'] + + +def topics(): + return config['topics'] + + diff --git a/worker/start_workers.py b/worker/start_workers.py index 19a63d9..e7fc3d5 100644 --- a/worker/start_workers.py +++ b/worker/start_workers.py @@ -1,9 +1,9 @@ -import json import os import signal import sys from multiprocessing import Process +from worker import config POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), os.pardir, os.pardir)) @@ -12,14 +12,6 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')): import worker.worker as worker -config_filename = os.environ.get('STACKTACH_DEPLOYMENTS_FILE', - 'stacktach_worker_config.json') -try: - from local_settings import * - config_filename = STACKTACH_DEPLOYMENTS_FILE -except ImportError: - pass - processes = [] @@ -35,18 +27,15 @@ def kill_time(signal, frame): if __name__ == '__main__': - config = None - with open(config_filename, "r") as f: - config = json.load(f) - deployments = config['deployments'] - - for deployment in deployments: + for deployment in config.deployments(): if deployment.get('enabled', True): - process = Process(target=worker.run, args=(deployment,)) - process.daemon = True - process.start() - processes.append(process) + for exchange in deployment.get('topics').keys(): + process = Process(target=worker.run, args=(deployment, + exchange,)) + process.daemon = True + process.start() + processes.append(process) signal.signal(signal.SIGINT, kill_time) signal.signal(signal.SIGTERM, kill_time) signal.pause() diff --git a/worker/worker.py b/worker/worker.py index a9e8080..c3b584e 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -17,12 +17,13 @@ # to set TENANT_ID and URL to point to your StackTach web server. import datetime -import kombu -import kombu.entity -import kombu.mixins import sys import time +import kombu +import kombu.mixins + + try: import ujson as json except ImportError: @@ -41,8 +42,9 @@ stacklog.set_default_logger_name('worker') LOG = stacklog.get_logger() -class NovaConsumer(kombu.mixins.ConsumerMixin): - def __init__(self, name, connection, deployment, durable, queue_arguments): +class Consumer(kombu.mixins.ConsumerMixin): + def __init__(self, name, connection, deployment, durable, queue_arguments, + exchange, topics): self.connection = connection self.deployment = deployment self.durable = durable @@ -52,6 +54,8 @@ class NovaConsumer(kombu.mixins.ConsumerMixin): self.pmi = None self.processed = 0 self.total_processed = 0 + self.topics = topics + self.exchange = exchange def _create_exchange(self, name, type, exclusive=False, auto_delete=False): return kombu.entity.Exchange(name, type=type, exclusive=exclusive, @@ -66,14 +70,12 @@ class NovaConsumer(kombu.mixins.ConsumerMixin): routing_key=routing_key) def get_consumers(self, Consumer, channel): - nova_exchange = self._create_exchange("nova", "topic") + exchange = self._create_exchange(self.exchange, "topic") - nova_queues = [ - self._create_queue('monitor.info', nova_exchange, 'monitor.info'), - self._create_queue('monitor.error', nova_exchange, 'monitor.error') - ] + queues = [self._create_queue(topic, exchange, topic) + for topic in self.topics] - return [Consumer(queues=nova_queues, callbacks=[self.on_nova])] + return [Consumer(queues=queues, callbacks=[self.on_nova])] def _process(self, message): routing_key = message.delivery_info['routing_key'] @@ -81,14 +83,13 @@ class NovaConsumer(kombu.mixins.ConsumerMixin): body = str(message.body) args = (routing_key, json.loads(body)) asJson = json.dumps(args) - # save raw and ack the message - raw = views.process_raw_data(self.deployment, args, asJson) + raw = views.process_raw_data(self.deployment, args, asJson, self.exchange) if raw: self.processed += 1 message.ack() - views.post_process(raw, args[1]) + POST_PROCESS_METHODS[raw.get_name()](raw, args[1]) self._check_memory() @@ -140,7 +141,7 @@ def exit_or_sleep(exit=False): time.sleep(5) -def run(deployment_config): +def run(deployment_config, exchange): name = deployment_config['name'] host = deployment_config.get('rabbit_host', 'localhost') port = deployment_config.get('rabbit_port', 5672) @@ -150,11 +151,13 @@ def run(deployment_config): durable = deployment_config.get('durable_queue', True) queue_arguments = deployment_config.get('queue_arguments', {}) exit_on_exception = deployment_config.get('exit_on_exception', False) + topics = deployment_config.get('topics', {}) deployment, new = db.get_or_create_deployment(name) - print "Starting worker for '%s'" % name - LOG.info("%s: %s %s %s %s" % (name, host, port, user_id, virtual_host)) + print "Starting worker for '%s %s'" % (name, exchange) + LOG.info("%s: %s %s %s %s %s" % (name, exchange, host, port, user_id, + virtual_host)) params = dict(hostname=host, port=port, @@ -166,21 +169,30 @@ def run(deployment_config): # continue_running() is used for testing while continue_running(): try: - LOG.debug("Processing on '%s'" % name) + LOG.debug("Processing on '%s %s'" % (name, exchange)) with kombu.connection.BrokerConnection(**params) as conn: try: - consumer = NovaConsumer(name, conn, deployment, durable, - queue_arguments) + consumer = Consumer(name, conn, deployment, durable, + queue_arguments, exchange, + topics[exchange]) consumer.run() except Exception as e: LOG.error("!!!!Exception!!!!") - LOG.exception("name=%s, exception=%s. Reconnecting in 5s" % - (name, e)) + LOG.exception("name=%s, exchange=%s, exception=%s. " + "Reconnecting in 5s" % + (name, exchange, e)) exit_or_sleep(exit_on_exception) - LOG.debug("Completed processing on '%s'" % name) + LOG.debug("Completed processing on '%s %s'" % (name, exchange)) except: LOG.error("!!!!Exception!!!!") e = sys.exc_info()[0] - msg = "Uncaught exception: deployment=%s, exception=%s. Retrying in 5s" - LOG.exception(msg % (name, e)) + msg = "Uncaught exception: deployment=%s, exchange=%s, " \ + "exception=%s. Retrying in 5s" + LOG.exception(msg % (name, exchange, e)) exit_or_sleep(exit_on_exception) + +POST_PROCESS_METHODS = { + 'RawData': views.post_process_rawdata, + 'GlanceRawData': views.post_process_glancerawdata, + 'GenericRawData': views.post_process_genericrawdata +} \ No newline at end of file