pump events from STv2 db
This requires the introduction of the time_sync.py service. A trivial HTTP service that allows the POST/GET of timestamps. This can used between the consumer and stream pipeline processor to synchronize time when replaying old events (so they don't expire too prematurely). Change-Id: Iba61b3b75f68a19869d06a0484d2676dd44fdc60
This commit is contained in:
61
bin/pump_from_stv2.py
Normal file
61
bin/pump_from_stv2.py
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
"""Pump simulated OpenStack notificationss into RabbitMQ.
|
||||||
|
|
||||||
|
You need to install rabbitqm-server and
|
||||||
|
pip install librabbitmq
|
||||||
|
pip install --pre notabene
|
||||||
|
pip install --pre notification_utils
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import mysql.connector
|
||||||
|
from notabene import kombu_driver as driver
|
||||||
|
import notification_utils
|
||||||
|
|
||||||
|
|
||||||
|
connection = driver.create_connection("localhost", 5672, 'guest', 'guest',
|
||||||
|
"librabbitmq", "/")
|
||||||
|
exchange = driver.create_exchange("monitor", "topic")
|
||||||
|
queue_name = "monitor.info"
|
||||||
|
queue = driver.create_queue(queue_name, exchange, queue_name,
|
||||||
|
channel=connection.channel())
|
||||||
|
queue.declare()
|
||||||
|
|
||||||
|
cnx = mysql.connector.connect(user='root', password='password',
|
||||||
|
host='127.0.0.1',
|
||||||
|
database='stacktach')
|
||||||
|
|
||||||
|
cursor = cnx.cursor()
|
||||||
|
|
||||||
|
query = ("SELECT stacktach_rawdata.when AS d, "
|
||||||
|
"stacktach_rawdata.json AS rawjson "
|
||||||
|
"FROM stacktach_rawdata "
|
||||||
|
"ORDER BY stacktach_rawdata.when LIMIT 100000")
|
||||||
|
cursor = cnx.cursor()
|
||||||
|
cursor.execute(query)
|
||||||
|
|
||||||
|
start = None
|
||||||
|
end = None
|
||||||
|
num = 0
|
||||||
|
for when, rawjson in cursor:
|
||||||
|
when = notification_utils.dt_from_decimal(when)
|
||||||
|
if not start:
|
||||||
|
start = when
|
||||||
|
end = start + datetime.timedelta(days=1)
|
||||||
|
if when > end:
|
||||||
|
break
|
||||||
|
queue, event = json.loads(rawjson)
|
||||||
|
# Skip the noise ...
|
||||||
|
if event['event_type'] in ['compute.instance.update', 'compute.instance.exists.verified']:
|
||||||
|
continue
|
||||||
|
print when, event['event_type']
|
||||||
|
driver.send_notification(event, queue_name, connection, exchange)
|
||||||
|
num+=1
|
||||||
|
|
||||||
|
print "Published %d events" % num
|
||||||
|
|
||||||
|
cursor.close()
|
||||||
|
cnx.close()
|
34
bin/time_sync.py
Normal file
34
bin/time_sync.py
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
import datetime
|
||||||
|
|
||||||
|
import falcon
|
||||||
|
|
||||||
|
|
||||||
|
# gunicorn --log-file=- 'time_sync:get_api()'
|
||||||
|
#
|
||||||
|
# To get the current time
|
||||||
|
# curl localhost:8000/time
|
||||||
|
#
|
||||||
|
# To set the current time
|
||||||
|
# curl --data "2014-10-09 22:55:33.111111" localhost:8000/time
|
||||||
|
|
||||||
|
class TimeResource(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.last_time = None
|
||||||
|
|
||||||
|
def on_get(self, req, resp):
|
||||||
|
resp.body = self.last_time
|
||||||
|
print "GET", self.last_time
|
||||||
|
|
||||||
|
def on_post(self, req, resp):
|
||||||
|
chunk = req.stream.read(4096)
|
||||||
|
self.last_time = chunk
|
||||||
|
print "POST", self.last_time
|
||||||
|
|
||||||
|
|
||||||
|
api = falcon.API()
|
||||||
|
time_resource = TimeResource()
|
||||||
|
api.add_route('/time', time_resource)
|
||||||
|
|
||||||
|
|
||||||
|
def get_api():
|
||||||
|
return api
|
Reference in New Issue
Block a user