From 8011130e2b43b321b6780b1b065431002ee0b435 Mon Sep 17 00:00:00 2001 From: pran1990 Date: Sat, 14 Dec 2013 17:23:52 -0800 Subject: [PATCH] Added a message queue Added kombu, an interface to AMQP Added react.py, sample reaction script with a kombu consumer. Should be run separately. Just loops forever listening for messages on the PASS_EVENTS queue/ Added queues.py, define an exchange and two queues to use for the message queue Replaced runthis.sh with audit.py, changed runthis.json to audit.json. audit.py has send_message(), which sends a message to the PASS_EVENTS queue Configuration data for audit.py and react.py should be picked up from audit.json and react.json. Added H304, H302 to pep8 ignores Change-Id: I32d5c007d3b00ee2ec33635bca1567da4447124c --- entropy/audit.json | 11 ++++++ entropy/audit.py | 38 ++++++++++++++++++ entropy/{runthis.sh => audit.sh} | 0 entropy/entropy.py | 43 ++++++++++++++------- entropy/queues.py | 26 +++++++++++++ entropy/react.json | 10 +++++ entropy/react.py | 66 ++++++++++++++++++++++++++++++++ entropy/runthis.json | 7 ---- requirements.txt | 2 +- tox.ini | 2 +- 10 files changed, 183 insertions(+), 22 deletions(-) create mode 100644 entropy/audit.json create mode 100644 entropy/audit.py rename entropy/{runthis.sh => audit.sh} (100%) create mode 100644 entropy/queues.py create mode 100644 entropy/react.json create mode 100644 entropy/react.py delete mode 100644 entropy/runthis.json diff --git a/entropy/audit.json b/entropy/audit.json new file mode 100644 index 0000000..d36c912 --- /dev/null +++ b/entropy/audit.json @@ -0,0 +1,11 @@ +{ + "name" : "audit", + "hostname" : "localhost", + "cron-freq" :"*/5 * * * *", + "username" : "praneshp", + "ssh-key" : "id_rsa", + "mq_host": "localhost", + "mq_port": "5672", + "mq_user": "guest", + "mq_password": "guest" +} diff --git a/entropy/audit.py b/entropy/audit.py new file mode 100644 index 0000000..e9c0841 --- /dev/null +++ b/entropy/audit.py @@ -0,0 +1,38 @@ +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime + +from kombu import BrokerConnection +from kombu.common import maybe_declare +from kombu.pools import producers + +from queues import entropy_exchange +from queues import PASS_KEY + + +#TODO(praneshp) : this should be read from a conf file. + + +def send_message(**kwargs): + connection = BrokerConnection('amqp://%(mq_user)s:%(mq_password)s@' + '%(mq_host)s:%(mq_port)s//' % kwargs) + message = {'From': __file__, + 'Date': str(datetime.datetime.now())} + with producers[connection].acquire(block=True) as producer: + maybe_declare(entropy_exchange, producer.channel) + producer.publish(message, + exchange=entropy_exchange, + routing_key=PASS_KEY, + serializer='json') diff --git a/entropy/runthis.sh b/entropy/audit.sh similarity index 100% rename from entropy/runthis.sh rename to entropy/audit.sh diff --git a/entropy/entropy.py b/entropy/entropy.py index 3abef56..43d4895 100644 --- a/entropy/entropy.py +++ b/entropy/entropy.py @@ -26,7 +26,11 @@ import time import croniter -from entropy import utils +sys.path.insert(0, os.path.join(os.path.abspath(os.pardir))) +sys.path.insert(0, os.path.abspath(os.getcwd())) + +import audit +import utils GOOD_MOOD = 1 SCRIPT_REPO = os.path.dirname(__file__) @@ -34,18 +38,22 @@ LOG_REPO = os.path.join(os.getcwd(), 'logs') def validate_cfg(file): + #TODO(praneshp): can do better here if GOOD_MOOD == 1: return True return False -def do_something(): - with open(os.path.join(os.getcwd(), 'test'), "a") as op: - op.write('starting audit ' + str(datetime.datetime.now()) + '\n') +def do_something(**kwargs): + # Put a message on the mq + audit.send_message(**kwargs) def start_audit(**kwargs): - #TODO(praneshp): Start croniter job here + #TODO(praneshp): fix bug here, where thread wakes up 0.0003 seconds + #before it should, and then sleeps off and cannot wake up in time. + #We lose the message this way. + now = datetime.datetime.now() schedule = kwargs['schedule'] cron = croniter.croniter(schedule, now) @@ -54,7 +62,7 @@ def start_audit(**kwargs): now = datetime.datetime.now() logging.warning(str(now) + str(next_iteration)) if now > next_iteration: - do_something() + do_something(**kwargs['mq_args']) next_iteration = cron.get_next(datetime.datetime) else: sleep_time = (next_iteration - now).total_seconds() @@ -73,16 +81,22 @@ def register_audit(args): # Now validate cfg conf_file = os.path.join(SCRIPT_REPO, args.conf) validate_cfg(conf_file) + # Now pick out relevant info - kwargs = {} + # TODO(praneshp) eventually this must become a function call with open(conf_file, 'r') as json_data: data = json.load(json_data) - kwargs['username'] = data['username'] - # TODO(praneshp) eventually this must become a function call - # somewhere else - kwargs['sshkey'] = utils.get_key_path() - kwargs['name'] = data['name'] - kwargs['schedule'] = data['cron-freq'] + # stuff for the message queue + mq_args = {'mq_host': data['mq_host'], + 'mq_port': data['mq_port'], + 'mq_user': data['mq_user'], + 'mq_password': data['mq_password']} + + # general stuff for the audit module + kwargs = {'sshkey': utils.get_key_path(), + 'name': data['name'], + 'schedule': data['cron-freq'], + 'mq_args': mq_args} #Start a thread to run a cron job for this audit script t = threading.Thread(name=kwargs['name'], target=start_audit, @@ -99,6 +113,7 @@ def register_repair(args): def init(): logging.warning('Initializing') + #TODO(praneshp): come up with to start all registered reaction scripts def parse(): @@ -124,6 +139,8 @@ def parse(): if __name__ == '__main__': + #TODO(praneshp): AMQP, json->yaml, reaction scripts(after amqp) logging.basicConfig(filename=os.path.join( LOG_REPO, 'entropy-' + str(time.time()) + '.log')) + init() parse() diff --git a/entropy/queues.py b/entropy/queues.py new file mode 100644 index 0000000..2086af5 --- /dev/null +++ b/entropy/queues.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kombu import Exchange +from kombu import Queue + +PASS_KEY = 'pass' +FAIL_KEY = 'fail' + +entropy_exchange = Exchange('entropy_exchage', type='fanout') +pass_events = Queue('pass', entropy_exchange, routing_key=PASS_KEY) +fail_events = Queue('fail', entropy_exchange, routing_key=FAIL_KEY) diff --git a/entropy/react.json b/entropy/react.json new file mode 100644 index 0000000..da64510 --- /dev/null +++ b/entropy/react.json @@ -0,0 +1,10 @@ +{ + "name" : "react", + "hostname" : "localhost", + "username" : "praneshp", + "ssh-key" : "id_rsa", + "mq_host": "localhost", + "mq_port": "5672", + "mq_user": "guest", + "mq_password": "guest" +} \ No newline at end of file diff --git a/entropy/react.py b/entropy/react.py new file mode 100644 index 0000000..4421b86 --- /dev/null +++ b/entropy/react.py @@ -0,0 +1,66 @@ +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import json +import logging +import os + +from kombu import BrokerConnection +from kombu.mixins import ConsumerMixin + +from queues import pass_events + + +SCRIPT_REPO = os.path.dirname(__file__) +conf_file = os.path.join(SCRIPT_REPO, 'react.json') + + +class SomeConsumer(ConsumerMixin): + def __init__(self, connection): + self.connection = connection + return + + def get_consumers(self, Consumer, channel): + return [Consumer(pass_events, callbacks=[self.on_message])] + + def on_message(self, body, message): + logging.warning("Received message: %r" % body) + message.ack() + return + + +def recv_message(**kwargs): + connection = BrokerConnection('amqp://%(mq_user)s:%(mq_password)s@' + '%(mq_host)s:%(mq_port)s//' % kwargs) + with connection as conn: + try: + SomeConsumer(conn).run() + except KeyboardInterrupt: + logging.warning('Quitting %s' % __name__) + + +def parse_conf(): + with open(conf_file, 'r') as json_data: + data = json.load(json_data) + # stuff for the message queue + mq_args = {'mq_host': data['mq_host'], + 'mq_port': data['mq_port'], + 'mq_user': data['mq_user'], + 'mq_password': data['mq_password']} + return mq_args + + +if __name__ == '__main__': + logging.warning('starting react script %s' % __file__) + mq_args = parse_conf() + recv_message(**mq_args) diff --git a/entropy/runthis.json b/entropy/runthis.json deleted file mode 100644 index 5d3097e..0000000 --- a/entropy/runthis.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "name" : "runthis", - "hostname" : "localhost", - "cron-freq" :"*/5 * * * *", - "username" : "praneshp", - "ssh-key" : "id_rsa" -} diff --git a/requirements.txt b/requirements.txt index fff1acd..70a9e3b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,4 @@ Pbr>=0.5.21,<1.0 sphinx>=1.1.2,<1.2 croniter>=0.3.3 - +kombu==3.0.7 diff --git a/tox.ini b/tox.ini index 53a6de8..16bca70 100644 --- a/tox.ini +++ b/tox.ini @@ -22,6 +22,6 @@ commands = commands = {posargs} [flake8] -ignore = H233 +ignore = H233,H302,H304 builtins = _ exclude = .idea,.tox,docs,.git