Do not create queues statically

Queues should be created programatically by audit scripts. Engine
should provide an exchange, and audit scripts can create queues
using that exchange.

Changed code in engine.py to create an exchange in the constructor.
Changed the base audit object to include a constructor that creates
queues.
Added an associated option to audit template to demonstrate this
feature.

Moved some code in start_audit to try/except blocks. Was a TODO

Change-Id: Ibe1e2283d6b182cd74b33c684a1477e5afe68f67
This commit is contained in:
pran1990 2014-04-09 01:16:34 -07:00
parent eb02881170
commit 73c6e2ece8
6 changed files with 28 additions and 52 deletions

View File

@ -14,6 +14,8 @@
import abc
import logging
from kombu import Queue
LOG = logging.getLogger(__name__)
@ -21,9 +23,14 @@ LOG = logging.getLogger(__name__)
class AuditBase(object):
__metaclass__ = abc.ABCMeta
def __init__(self, **kwargs):
self.name = kwargs['name']
self.exchange = kwargs['exchange']
self.routing_key = kwargs['routing_key']
self.message_queue = Queue(self.name,
self.exchange,
self.routing_key)
@abc.abstractmethod
def send_message(self, **kwargs):
pass
def test(self):
pass

View File

@ -24,7 +24,6 @@ from novaclient.client import Client
import paramiko
import base
from entropy.queues import entropy_exchange
LOG = logging.getLogger(__name__)
@ -127,9 +126,9 @@ class Audit(base.AuditBase):
message = {'From': __name__,
'Date': str(datetime.datetime.now().isoformat())}
with producers[connection].acquire(block=True) as producer:
maybe_declare(entropy_exchange, producer.channel)
maybe_declare(kwargs['exchange'], producer.channel)
message['payload'] = self.boot_vm_with_cli(**kwargs)
producer.publish(message,
exchange=entropy_exchange,
routing_key='vmboot',
exchange=self.exchange,
routing_key=self.routing_key,
serializer='json')

View File

@ -1,6 +1,7 @@
{
"name" : "vmbooter",
"hostname" : "localhost",
"routing_key": "vmboot",
"schedule" :"*/1 * * * *",
"mq_host": "localhost",
"mq_port": "5672",

View File

@ -21,6 +21,7 @@ import logging
import os
import croniter
from kombu import Exchange
import pause
from entropy import utils
@ -35,6 +36,7 @@ class Engine(object):
self.max_workers = 8
self.audit_type = 'audit'
self.repair_type = 'repair'
self.entropy_exchange = Exchange('entropy_exchage', type='fanout')
# engine variables
self.name = name
self.audit_cfg = cfg_data['audit_cfg']
@ -130,11 +132,10 @@ class Engine(object):
self.running_audits.append(script['name'])
# start a process for this audit script
future = self.executor.submit(Engine.start_audit, script)
future = self.executor.submit(self.start_audit, script)
return future
@staticmethod
def start_audit(script):
def start_audit(self, script):
LOG.info("Starting audit for %s", script['name'])
data = dict(utils.load_yaml(script['conf']).next())
schedule = data['schedule']
@ -144,11 +145,10 @@ class Engine(object):
while True:
LOG.info('Next call at %s', next_iteration)
pause.until(next_iteration)
Engine.run_audit(script)
self.run_audit(script)
next_iteration = cron.get_next(datetime.datetime)
@staticmethod
def run_audit(script):
def run_audit(self, script):
# Read the conf file
data = dict(utils.load_yaml(script['conf']).next())
# general stuff for the audit module
@ -159,18 +159,14 @@ class Engine(object):
'mq_password': data['mq_password']}
kwargs = data
kwargs['mq_args'] = mq_args
kwargs['exchange'] = self.entropy_exchange
# Put a message on the mq
#TODO(praneshp): this should be the path with register-audit
#TODO(praneshp): The whole logic in this function should be in
# try except blocks
available_modules = utils.find_module(kwargs['module'], ['audit'])
LOG.info('Found these modules: %s', available_modules)
if not available_modules:
LOG.error('No module to load')
else:
try:
available_modules = utils.find_module(kwargs['module'], ['audit'])
LOG.info('Found these modules: %s', available_modules)
imported_module = utils.import_module(available_modules[0])
audit_obj = imported_module.Audit()
try:
audit_obj.send_message(**kwargs)
except Exception:
audit_obj = imported_module.Audit(**kwargs)
audit_obj.send_message(**kwargs)
except Exception:
LOG.exception('Could not run audit %s', kwargs['name'])

View File

@ -1,28 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kombu import Exchange
from kombu import Queue
PASS_KEY = 'pass'
FAIL_KEY = 'fail'
entropy_exchange = Exchange('entropy_exchage', type='fanout')
pass_events = Queue('pass', entropy_exchange, routing_key=PASS_KEY)
fail_events = Queue('fail', entropy_exchange, routing_key=FAIL_KEY)
vm_count_events = Queue('vmcount', entropy_exchange, routing_key='vmcount')
vm_boot_events = Queue('vmboot', entropy_exchange, routing_key='vmboot')

View File

@ -51,6 +51,7 @@ def import_module(module_name):
return sys.modules[module_name]
# TODO(praneshp): return exception isntead
def find_module(base_name, search_paths, required_attrs=None):
found_places = []
if not required_attrs: