From 73c6e2ece818f2fb55b4d26ba7ec41303c3c4567 Mon Sep 17 00:00:00 2001 From: pran1990 Date: Wed, 9 Apr 2014 01:16:34 -0700 Subject: [PATCH] Do not create queues statically Queues should be created programatically by audit scripts. Engine should provide an exchange, and audit scripts can create queues using that exchange. Changed code in engine.py to create an exchange in the constructor. Changed the base audit object to include a constructor that creates queues. Added an associated option to audit template to demonstrate this feature. Moved some code in start_audit to try/except blocks. Was a TODO Change-Id: Ibe1e2283d6b182cd74b33c684a1477e5afe68f67 --- entropy/audit/base.py | 13 ++++++++++--- entropy/audit/vmbooter.py | 7 +++---- entropy/audit/vmbooter.template | 1 + entropy/engine.py | 30 +++++++++++++----------------- entropy/queues.py | 28 ---------------------------- entropy/utils.py | 1 + 6 files changed, 28 insertions(+), 52 deletions(-) delete mode 100644 entropy/queues.py diff --git a/entropy/audit/base.py b/entropy/audit/base.py index 07ee7e5..9400e74 100644 --- a/entropy/audit/base.py +++ b/entropy/audit/base.py @@ -14,6 +14,8 @@ import abc import logging +from kombu import Queue + LOG = logging.getLogger(__name__) @@ -21,9 +23,14 @@ LOG = logging.getLogger(__name__) class AuditBase(object): __metaclass__ = abc.ABCMeta + def __init__(self, **kwargs): + self.name = kwargs['name'] + self.exchange = kwargs['exchange'] + self.routing_key = kwargs['routing_key'] + self.message_queue = Queue(self.name, + self.exchange, + self.routing_key) + @abc.abstractmethod def send_message(self, **kwargs): pass - - def test(self): - pass diff --git a/entropy/audit/vmbooter.py b/entropy/audit/vmbooter.py index a84bae9..4612a5d 100644 --- a/entropy/audit/vmbooter.py +++ b/entropy/audit/vmbooter.py @@ -24,7 +24,6 @@ from novaclient.client import Client import paramiko import base -from entropy.queues import entropy_exchange LOG = logging.getLogger(__name__) @@ -127,9 +126,9 @@ class Audit(base.AuditBase): message = {'From': __name__, 'Date': str(datetime.datetime.now().isoformat())} with producers[connection].acquire(block=True) as producer: - maybe_declare(entropy_exchange, producer.channel) + maybe_declare(kwargs['exchange'], producer.channel) message['payload'] = self.boot_vm_with_cli(**kwargs) producer.publish(message, - exchange=entropy_exchange, - routing_key='vmboot', + exchange=self.exchange, + routing_key=self.routing_key, serializer='json') diff --git a/entropy/audit/vmbooter.template b/entropy/audit/vmbooter.template index 43eb283..ebf9eef 100644 --- a/entropy/audit/vmbooter.template +++ b/entropy/audit/vmbooter.template @@ -1,6 +1,7 @@ { "name" : "vmbooter", "hostname" : "localhost", + "routing_key": "vmboot", "schedule" :"*/1 * * * *", "mq_host": "localhost", "mq_port": "5672", diff --git a/entropy/engine.py b/entropy/engine.py index 42fac2f..c4f0c8e 100644 --- a/entropy/engine.py +++ b/entropy/engine.py @@ -21,6 +21,7 @@ import logging import os import croniter +from kombu import Exchange import pause from entropy import utils @@ -35,6 +36,7 @@ class Engine(object): self.max_workers = 8 self.audit_type = 'audit' self.repair_type = 'repair' + self.entropy_exchange = Exchange('entropy_exchage', type='fanout') # engine variables self.name = name self.audit_cfg = cfg_data['audit_cfg'] @@ -130,11 +132,10 @@ class Engine(object): self.running_audits.append(script['name']) # start a process for this audit script - future = self.executor.submit(Engine.start_audit, script) + future = self.executor.submit(self.start_audit, script) return future - @staticmethod - def start_audit(script): + def start_audit(self, script): LOG.info("Starting audit for %s", script['name']) data = dict(utils.load_yaml(script['conf']).next()) schedule = data['schedule'] @@ -144,11 +145,10 @@ class Engine(object): while True: LOG.info('Next call at %s', next_iteration) pause.until(next_iteration) - Engine.run_audit(script) + self.run_audit(script) next_iteration = cron.get_next(datetime.datetime) - @staticmethod - def run_audit(script): + def run_audit(self, script): # Read the conf file data = dict(utils.load_yaml(script['conf']).next()) # general stuff for the audit module @@ -159,18 +159,14 @@ class Engine(object): 'mq_password': data['mq_password']} kwargs = data kwargs['mq_args'] = mq_args + kwargs['exchange'] = self.entropy_exchange # Put a message on the mq #TODO(praneshp): this should be the path with register-audit - #TODO(praneshp): The whole logic in this function should be in - # try except blocks - available_modules = utils.find_module(kwargs['module'], ['audit']) - LOG.info('Found these modules: %s', available_modules) - if not available_modules: - LOG.error('No module to load') - else: + try: + available_modules = utils.find_module(kwargs['module'], ['audit']) + LOG.info('Found these modules: %s', available_modules) imported_module = utils.import_module(available_modules[0]) - audit_obj = imported_module.Audit() - try: - audit_obj.send_message(**kwargs) - except Exception: + audit_obj = imported_module.Audit(**kwargs) + audit_obj.send_message(**kwargs) + except Exception: LOG.exception('Could not run audit %s', kwargs['name']) diff --git a/entropy/queues.py b/entropy/queues.py deleted file mode 100644 index 420f3ef..0000000 --- a/entropy/queues.py +++ /dev/null @@ -1,28 +0,0 @@ -# -*- coding: utf-8 -*- -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from kombu import Exchange -from kombu import Queue - -PASS_KEY = 'pass' -FAIL_KEY = 'fail' - -entropy_exchange = Exchange('entropy_exchage', type='fanout') -pass_events = Queue('pass', entropy_exchange, routing_key=PASS_KEY) -fail_events = Queue('fail', entropy_exchange, routing_key=FAIL_KEY) -vm_count_events = Queue('vmcount', entropy_exchange, routing_key='vmcount') -vm_boot_events = Queue('vmboot', entropy_exchange, routing_key='vmboot') diff --git a/entropy/utils.py b/entropy/utils.py index 9e0c8e5..1cd01ac 100644 --- a/entropy/utils.py +++ b/entropy/utils.py @@ -51,6 +51,7 @@ def import_module(module_name): return sys.modules[module_name] +# TODO(praneshp): return exception isntead def find_module(base_name, search_paths, required_attrs=None): found_places = [] if not required_attrs: