Added a message queue

Added kombu, an interface to AMQP

Added react.py, sample reaction script with a kombu consumer.
Should be run separately. Just loops forever listening for
messages on the PASS_EVENTS queue/

Added queues.py, define an exchange and two queues to use
for the message queue

Replaced runthis.sh with audit.py, changed runthis.json to
audit.json. audit.py has send_message(), which sends a message
to the PASS_EVENTS queue

Configuration data for audit.py and react.py should be picked up
from audit.json and react.json.

Added H304, H302 to pep8 ignores

Change-Id: I32d5c007d3b00ee2ec33635bca1567da4447124c
This commit is contained in:
pran1990 2013-12-14 17:23:52 -08:00
parent 48b83513fb
commit 8011130e2b
10 changed files with 183 additions and 22 deletions

11
entropy/audit.json Normal file
View File

@ -0,0 +1,11 @@
{
"name" : "audit",
"hostname" : "localhost",
"cron-freq" :"*/5 * * * *",
"username" : "praneshp",
"ssh-key" : "id_rsa",
"mq_host": "localhost",
"mq_port": "5672",
"mq_user": "guest",
"mq_password": "guest"
}

38
entropy/audit.py Normal file
View File

@ -0,0 +1,38 @@
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from kombu import BrokerConnection
from kombu.common import maybe_declare
from kombu.pools import producers
from queues import entropy_exchange
from queues import PASS_KEY
#TODO(praneshp) : this should be read from a conf file.
def send_message(**kwargs):
connection = BrokerConnection('amqp://%(mq_user)s:%(mq_password)s@'
'%(mq_host)s:%(mq_port)s//' % kwargs)
message = {'From': __file__,
'Date': str(datetime.datetime.now())}
with producers[connection].acquire(block=True) as producer:
maybe_declare(entropy_exchange, producer.channel)
producer.publish(message,
exchange=entropy_exchange,
routing_key=PASS_KEY,
serializer='json')

View File

@ -26,7 +26,11 @@ import time
import croniter
from entropy import utils
sys.path.insert(0, os.path.join(os.path.abspath(os.pardir)))
sys.path.insert(0, os.path.abspath(os.getcwd()))
import audit
import utils
GOOD_MOOD = 1
SCRIPT_REPO = os.path.dirname(__file__)
@ -34,18 +38,22 @@ LOG_REPO = os.path.join(os.getcwd(), 'logs')
def validate_cfg(file):
#TODO(praneshp): can do better here
if GOOD_MOOD == 1:
return True
return False
def do_something():
with open(os.path.join(os.getcwd(), 'test'), "a") as op:
op.write('starting audit ' + str(datetime.datetime.now()) + '\n')
def do_something(**kwargs):
# Put a message on the mq
audit.send_message(**kwargs)
def start_audit(**kwargs):
#TODO(praneshp): Start croniter job here
#TODO(praneshp): fix bug here, where thread wakes up 0.0003 seconds
#before it should, and then sleeps off and cannot wake up in time.
#We lose the message this way.
now = datetime.datetime.now()
schedule = kwargs['schedule']
cron = croniter.croniter(schedule, now)
@ -54,7 +62,7 @@ def start_audit(**kwargs):
now = datetime.datetime.now()
logging.warning(str(now) + str(next_iteration))
if now > next_iteration:
do_something()
do_something(**kwargs['mq_args'])
next_iteration = cron.get_next(datetime.datetime)
else:
sleep_time = (next_iteration - now).total_seconds()
@ -73,16 +81,22 @@ def register_audit(args):
# Now validate cfg
conf_file = os.path.join(SCRIPT_REPO, args.conf)
validate_cfg(conf_file)
# Now pick out relevant info
kwargs = {}
# TODO(praneshp) eventually this must become a function call
with open(conf_file, 'r') as json_data:
data = json.load(json_data)
kwargs['username'] = data['username']
# TODO(praneshp) eventually this must become a function call
# somewhere else
kwargs['sshkey'] = utils.get_key_path()
kwargs['name'] = data['name']
kwargs['schedule'] = data['cron-freq']
# stuff for the message queue
mq_args = {'mq_host': data['mq_host'],
'mq_port': data['mq_port'],
'mq_user': data['mq_user'],
'mq_password': data['mq_password']}
# general stuff for the audit module
kwargs = {'sshkey': utils.get_key_path(),
'name': data['name'],
'schedule': data['cron-freq'],
'mq_args': mq_args}
#Start a thread to run a cron job for this audit script
t = threading.Thread(name=kwargs['name'], target=start_audit,
@ -99,6 +113,7 @@ def register_repair(args):
def init():
logging.warning('Initializing')
#TODO(praneshp): come up with to start all registered reaction scripts
def parse():
@ -124,6 +139,8 @@ def parse():
if __name__ == '__main__':
#TODO(praneshp): AMQP, json->yaml, reaction scripts(after amqp)
logging.basicConfig(filename=os.path.join(
LOG_REPO, 'entropy-' + str(time.time()) + '.log'))
init()
parse()

26
entropy/queues.py Normal file
View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kombu import Exchange
from kombu import Queue
PASS_KEY = 'pass'
FAIL_KEY = 'fail'
entropy_exchange = Exchange('entropy_exchage', type='fanout')
pass_events = Queue('pass', entropy_exchange, routing_key=PASS_KEY)
fail_events = Queue('fail', entropy_exchange, routing_key=FAIL_KEY)

10
entropy/react.json Normal file
View File

@ -0,0 +1,10 @@
{
"name" : "react",
"hostname" : "localhost",
"username" : "praneshp",
"ssh-key" : "id_rsa",
"mq_host": "localhost",
"mq_port": "5672",
"mq_user": "guest",
"mq_password": "guest"
}

66
entropy/react.py Normal file
View File

@ -0,0 +1,66 @@
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import os
from kombu import BrokerConnection
from kombu.mixins import ConsumerMixin
from queues import pass_events
SCRIPT_REPO = os.path.dirname(__file__)
conf_file = os.path.join(SCRIPT_REPO, 'react.json')
class SomeConsumer(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
return
def get_consumers(self, Consumer, channel):
return [Consumer(pass_events, callbacks=[self.on_message])]
def on_message(self, body, message):
logging.warning("Received message: %r" % body)
message.ack()
return
def recv_message(**kwargs):
connection = BrokerConnection('amqp://%(mq_user)s:%(mq_password)s@'
'%(mq_host)s:%(mq_port)s//' % kwargs)
with connection as conn:
try:
SomeConsumer(conn).run()
except KeyboardInterrupt:
logging.warning('Quitting %s' % __name__)
def parse_conf():
with open(conf_file, 'r') as json_data:
data = json.load(json_data)
# stuff for the message queue
mq_args = {'mq_host': data['mq_host'],
'mq_port': data['mq_port'],
'mq_user': data['mq_user'],
'mq_password': data['mq_password']}
return mq_args
if __name__ == '__main__':
logging.warning('starting react script %s' % __file__)
mq_args = parse_conf()
recv_message(**mq_args)

View File

@ -1,7 +0,0 @@
{
"name" : "runthis",
"hostname" : "localhost",
"cron-freq" :"*/5 * * * *",
"username" : "praneshp",
"ssh-key" : "id_rsa"
}

View File

@ -2,4 +2,4 @@
Pbr>=0.5.21,<1.0
sphinx>=1.1.2,<1.2
croniter>=0.3.3
kombu==3.0.7

View File

@ -22,6 +22,6 @@ commands =
commands = {posargs}
[flake8]
ignore = H233
ignore = H233,H302,H304
builtins = _
exclude = .idea,.tox,docs,.git