worker now daemonized and supports multiple nova deploys from one worker
This commit is contained in:
74
worker.py
Normal file → Executable file
74
worker.py
Normal file → Executable file
@@ -16,6 +16,7 @@
|
|||||||
# This is the worker you run in your OpenStack environment. You need
|
# This is the worker you run in your OpenStack environment. You need
|
||||||
# to set TENANT_ID and URL to point to your StackTach web server.
|
# to set TENANT_ID and URL to point to your StackTach web server.
|
||||||
|
|
||||||
|
import daemon
|
||||||
import json
|
import json
|
||||||
import kombu
|
import kombu
|
||||||
import kombu.connection
|
import kombu.connection
|
||||||
@@ -26,13 +27,16 @@ import urllib
|
|||||||
import urllib2
|
import urllib2
|
||||||
|
|
||||||
# CHANGE THESE FOR YOUR INSTALLATION ...
|
# CHANGE THESE FOR YOUR INSTALLATION ...
|
||||||
TENANT_ID = 1
|
DEPLOYMENTS = [
|
||||||
URL = 'http://darksecretsoftware.com/stacktach/%d/data/' % TENANT_ID
|
dict(
|
||||||
RABBIT_HOST = "localhost"
|
tenant_id=1,
|
||||||
RABBIT_PORT = 5672
|
url='http://example.com',
|
||||||
RABBIT_USERID = "guest"
|
rabbit_host="localhost",
|
||||||
RABBIT_PASSWORD = "guest"
|
rabbit_port=5672,
|
||||||
RABBIT_VIRTUAL_HOST = "/"
|
rabbit_userid="guest",
|
||||||
|
rabbit_password="guest",
|
||||||
|
rabbit_virtual_host="/"),
|
||||||
|
]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from worker_conf import *
|
from worker_conf import *
|
||||||
@@ -63,8 +67,9 @@ nova_queues = [
|
|||||||
|
|
||||||
|
|
||||||
class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
|
class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
|
||||||
def __init__(self, connection):
|
def __init__(self, connection, url):
|
||||||
self.connection = connection
|
self.connection = connection
|
||||||
|
self.url = url
|
||||||
|
|
||||||
def get_consumers(self, Consumer, channel):
|
def get_consumers(self, Consumer, channel):
|
||||||
return [Consumer(queues=scheduler_queues,
|
return [Consumer(queues=scheduler_queues,
|
||||||
@@ -79,13 +84,13 @@ class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
|
|||||||
try:
|
try:
|
||||||
raw_data = dict(args=jvalues)
|
raw_data = dict(args=jvalues)
|
||||||
cooked_data = urllib.urlencode(raw_data)
|
cooked_data = urllib.urlencode(raw_data)
|
||||||
req = urllib2.Request(URL, cooked_data)
|
req = urllib2.Request(self.url, cooked_data)
|
||||||
response = urllib2.urlopen(req)
|
response = urllib2.urlopen(req)
|
||||||
page = response.read()
|
page = response.read()
|
||||||
print page
|
print page
|
||||||
except urllib2.HTTPError, e:
|
except urllib2.HTTPError, e:
|
||||||
if e.code == 401:
|
if e.code == 401:
|
||||||
print "Unauthorized. Correct tenant id of %d?" % TENANT_ID
|
print "Unauthorized. Correct URL?", self.url
|
||||||
print e
|
print e
|
||||||
page = e.read()
|
page = e.read()
|
||||||
print page
|
print page
|
||||||
@@ -101,20 +106,41 @@ class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
|
|||||||
message.ack()
|
message.ack()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
class Monitor(threading.Thread):
|
||||||
print "StackTach", URL
|
def __init__(self, deployment):
|
||||||
print "Rabbit", RABBIT_HOST, RABBIT_PORT, RABBIT_USERID, RABBIT_VIRTUAL_HOST
|
super(Monitor, self).__init__()
|
||||||
|
self.deployment = deployment
|
||||||
|
|
||||||
params = dict(hostname=RABBIT_HOST,
|
def run(self):
|
||||||
port=RABBIT_PORT,
|
tenant_id = self.deployment.get('tenant_id', 1)
|
||||||
userid=RABBIT_USERID,
|
url = self.deployment.get('url', 'http://www.example.com')
|
||||||
password=RABBIT_PASSWORD,
|
url = "%s/%d/data/" % (url, tenant_id)
|
||||||
virtual_host=RABBIT_VIRTUAL_HOST)
|
host = self.deployment.get('rabbit_host', 'localhost')
|
||||||
|
port = self.deployment.get('rabbit_port', 5672)
|
||||||
|
user_id = self.deployment.get('rabbit_userid', 'rabbit')
|
||||||
|
password = self.deployment.get('rabbit_password', 'rabbit')
|
||||||
|
virtual_host = self.deployment.get('rabbit_virtual_host', '/')
|
||||||
|
|
||||||
with kombu.connection.BrokerConnection(**params) as conn:
|
print "StackTach", url
|
||||||
consumer = SchedulerFanoutConsumer(conn)
|
print "Rabbit:", host, port, user_id, virtual_host
|
||||||
try:
|
|
||||||
print "Listening"
|
params = dict(hostname=host,
|
||||||
|
port=port,
|
||||||
|
userid=user_id,
|
||||||
|
password=password,
|
||||||
|
virtual_host=virtual_host)
|
||||||
|
|
||||||
|
with kombu.connection.BrokerConnection(**params) as conn:
|
||||||
|
consumer = SchedulerFanoutConsumer(conn, url)
|
||||||
consumer.run()
|
consumer.run()
|
||||||
except KeyboardInterrupt:
|
|
||||||
print("bye bye")
|
|
||||||
|
with daemon.DaemonContext():
|
||||||
|
workers = []
|
||||||
|
for deployment in DEPLOYMENTS:
|
||||||
|
monitor = Monitor(deployment)
|
||||||
|
workers.append(monitor)
|
||||||
|
monitor.start()
|
||||||
|
|
||||||
|
for worker in workers:
|
||||||
|
worker.join()
|
||||||
|
Reference in New Issue
Block a user