diff --git a/entropy/audit/base.py b/entropy/audit/base.py index 07ee7e5..9400e74 100644 --- a/entropy/audit/base.py +++ b/entropy/audit/base.py @@ -14,6 +14,8 @@ import abc import logging +from kombu import Queue + LOG = logging.getLogger(__name__) @@ -21,9 +23,14 @@ LOG = logging.getLogger(__name__) class AuditBase(object): __metaclass__ = abc.ABCMeta + def __init__(self, **kwargs): + self.name = kwargs['name'] + self.exchange = kwargs['exchange'] + self.routing_key = kwargs['routing_key'] + self.message_queue = Queue(self.name, + self.exchange, + self.routing_key) + @abc.abstractmethod def send_message(self, **kwargs): pass - - def test(self): - pass diff --git a/entropy/audit/vmbooter.py b/entropy/audit/vmbooter.py index a84bae9..4612a5d 100644 --- a/entropy/audit/vmbooter.py +++ b/entropy/audit/vmbooter.py @@ -24,7 +24,6 @@ from novaclient.client import Client import paramiko import base -from entropy.queues import entropy_exchange LOG = logging.getLogger(__name__) @@ -127,9 +126,9 @@ class Audit(base.AuditBase): message = {'From': __name__, 'Date': str(datetime.datetime.now().isoformat())} with producers[connection].acquire(block=True) as producer: - maybe_declare(entropy_exchange, producer.channel) + maybe_declare(kwargs['exchange'], producer.channel) message['payload'] = self.boot_vm_with_cli(**kwargs) producer.publish(message, - exchange=entropy_exchange, - routing_key='vmboot', + exchange=self.exchange, + routing_key=self.routing_key, serializer='json') diff --git a/entropy/audit/vmbooter.template b/entropy/audit/vmbooter.template index 43eb283..ebf9eef 100644 --- a/entropy/audit/vmbooter.template +++ b/entropy/audit/vmbooter.template @@ -1,6 +1,7 @@ { "name" : "vmbooter", "hostname" : "localhost", + "routing_key": "vmboot", "schedule" :"*/1 * * * *", "mq_host": "localhost", "mq_port": "5672", diff --git a/entropy/engine.py b/entropy/engine.py index 42fac2f..c4f0c8e 100644 --- a/entropy/engine.py +++ b/entropy/engine.py @@ -21,6 +21,7 @@ import logging import os import croniter +from kombu import Exchange import pause from entropy import utils @@ -35,6 +36,7 @@ class Engine(object): self.max_workers = 8 self.audit_type = 'audit' self.repair_type = 'repair' + self.entropy_exchange = Exchange('entropy_exchage', type='fanout') # engine variables self.name = name self.audit_cfg = cfg_data['audit_cfg'] @@ -130,11 +132,10 @@ class Engine(object): self.running_audits.append(script['name']) # start a process for this audit script - future = self.executor.submit(Engine.start_audit, script) + future = self.executor.submit(self.start_audit, script) return future - @staticmethod - def start_audit(script): + def start_audit(self, script): LOG.info("Starting audit for %s", script['name']) data = dict(utils.load_yaml(script['conf']).next()) schedule = data['schedule'] @@ -144,11 +145,10 @@ class Engine(object): while True: LOG.info('Next call at %s', next_iteration) pause.until(next_iteration) - Engine.run_audit(script) + self.run_audit(script) next_iteration = cron.get_next(datetime.datetime) - @staticmethod - def run_audit(script): + def run_audit(self, script): # Read the conf file data = dict(utils.load_yaml(script['conf']).next()) # general stuff for the audit module @@ -159,18 +159,14 @@ class Engine(object): 'mq_password': data['mq_password']} kwargs = data kwargs['mq_args'] = mq_args + kwargs['exchange'] = self.entropy_exchange # Put a message on the mq #TODO(praneshp): this should be the path with register-audit - #TODO(praneshp): The whole logic in this function should be in - # try except blocks - available_modules = utils.find_module(kwargs['module'], ['audit']) - LOG.info('Found these modules: %s', available_modules) - if not available_modules: - LOG.error('No module to load') - else: + try: + available_modules = utils.find_module(kwargs['module'], ['audit']) + LOG.info('Found these modules: %s', available_modules) imported_module = utils.import_module(available_modules[0]) - audit_obj = imported_module.Audit() - try: - audit_obj.send_message(**kwargs) - except Exception: + audit_obj = imported_module.Audit(**kwargs) + audit_obj.send_message(**kwargs) + except Exception: LOG.exception('Could not run audit %s', kwargs['name']) diff --git a/entropy/queues.py b/entropy/queues.py deleted file mode 100644 index 420f3ef..0000000 --- a/entropy/queues.py +++ /dev/null @@ -1,28 +0,0 @@ -# -*- coding: utf-8 -*- -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from kombu import Exchange -from kombu import Queue - -PASS_KEY = 'pass' -FAIL_KEY = 'fail' - -entropy_exchange = Exchange('entropy_exchage', type='fanout') -pass_events = Queue('pass', entropy_exchange, routing_key=PASS_KEY) -fail_events = Queue('fail', entropy_exchange, routing_key=FAIL_KEY) -vm_count_events = Queue('vmcount', entropy_exchange, routing_key='vmcount') -vm_boot_events = Queue('vmboot', entropy_exchange, routing_key='vmboot') diff --git a/entropy/utils.py b/entropy/utils.py index 9e0c8e5..1cd01ac 100644 --- a/entropy/utils.py +++ b/entropy/utils.py @@ -51,6 +51,7 @@ def import_module(module_name): return sys.modules[module_name] +# TODO(praneshp): return exception isntead def find_module(base_name, search_paths, required_attrs=None): found_places = [] if not required_attrs: