git checkpoint commit post-wsgi

This commit is contained in:
Todd Willey
2010-06-20 20:18:56 -04:00
parent 2f70e6af26
commit 62e69c356b
3 changed files with 83 additions and 224 deletions

View File

@@ -19,10 +19,9 @@
""" """
import logging import logging
from wsgiref import simple_server
from nova import vendor from nova import vendor
from tornado import httpserver
import tornado.web
from tornado import ioloop from tornado import ioloop
from nova import flags from nova import flags
@@ -31,34 +30,28 @@ from nova import server
from nova import utils from nova import utils
from nova.auth import users from nova.auth import users
from nova.endpoint import rackspace from nova.endpoint import rackspace
from nova.endpoint import wsgi_wrapper
FLAGS = flags.FLAGS
flags.DEFINE_integer('cc_port', 8773, 'api server port')
def application(_args): FLAGS = flags.FLAGS
""" Wrap WSGI application as tornado """ flags.DEFINE_integer('cc_port', 8773, 'cloud controller port')
wsgi_wrapper.Wrap(rackspace.Api)
def main(_argv): def main(_argv):
user_manager = users.UserManager() user_manager = users.UserManager()
controllers = { api_instance = rackspace.Api(user_manager)
'Servers': rackspace.ServersController()
}
_app = rackspace.RackspaceAPIServerApplication(user_manager, controllers)
conn = rpc.Connection.instance() conn = rpc.Connection.instance()
consumer = rpc.AdapterConsumer(connection=conn, rpc_consumer = rpc.AdapterConsumer(connection=conn,
topic=FLAGS.cloud_topic, topic=FLAGS.cloud_topic,
proxy=controllers['Servers']) proxy=api_instance)
io_inst = ioloop.IOLoop.instance() # TODO: fire rpc response listener (without attach to tornado)
_injected = consumer.attach_to_tornado(io_inst) # io_inst = ioloop.IOLoop.instance()
# _injected = consumer.attach_to_tornado(io_inst)
http_server = httpserver.HTTPServer(_app)
http_server.listen(FLAGS.cc_port)
logging.debug('Started HTTP server on %s', FLAGS.cc_port)
io_inst.start()
http_server = simple_server.WSGIServer(('0.0.0.0', FLAGS.cc_port), simple_server.WSGIRequestHandler)
http_server.set_app(api_instance.handler)
logging.debug('Started HTTP server on port %i' % FLAGS.cc_port)
while True:
http_server.handle_request()
# io_inst.start()
if __name__ == '__main__': if __name__ == '__main__':
utils.default_flagfile() utils.default_flagfile()

View File

@@ -36,6 +36,7 @@ from nova import exception
from nova.auth import users from nova.auth import users
from nova.compute import model from nova.compute import model
from nova.compute import network from nova.compute import network
from nova.endpoint import wsgi
from nova.endpoint import images from nova.endpoint import images
from nova.volume import storage from nova.volume import storage
@@ -53,216 +54,58 @@ def _gen_key(user_id, key_name):
return {'private_key': private_key, 'fingerprint': fingerprint} return {'private_key': private_key, 'fingerprint': fingerprint}
class ServersController(object): class Api(object):
""" ServersController provides the critical dispatch between
inbound API calls through the endpoint and messages
sent to the other nodes.
"""
def __init__(self):
self.instdir = model.InstanceDirectory()
self.network = network.PublicNetworkController()
self.setup()
@property def __init__(self, rpc_mechanism):
def instances(self): self.controllers = {
""" All instances in the system, as dicts """ "v1.0": RackspaceAuthenticationApi(),
for instance in self.instdir.all: "server": RackspaceCloudServerApi()
yield {instance['instance_id']: instance}
@property
def volumes(self):
""" returns a list of all volumes """
for volume_id in datastore.Redis.instance().smembers("volumes"):
volume = storage.Volume(volume_id=volume_id)
yield volume
def __str__(self):
return 'ServersController'
def setup(self):
""" Ensure the keychains and folders exist. """
# Create keys folder, if it doesn't exist
if not os.path.exists(FLAGS.keys_path):
os.makedirs(os.path.abspath(FLAGS.keys_path))
# Gen root CA, if we don't have one
root_ca_path = os.path.join(FLAGS.ca_path, FLAGS.ca_file)
if not os.path.exists(root_ca_path):
start = os.getcwd()
os.chdir(FLAGS.ca_path)
utils.runthis("Generating root CA: %s", "sh genrootca.sh")
os.chdir(start)
# TODO: Do this with M2Crypto instead
def get_instance_by_ip(self, ip):
return self.instdir.by_ip(ip)
def get_metadata(self, ip):
i = self.get_instance_by_ip(ip)
if i is None:
return None
if i['key_name']:
keys = {
'0': {
'_name': i['key_name'],
'openssh-key': i['key_data']
}
}
else:
keys = ''
data = {
'user-data': base64.b64decode(i['user_data']),
'meta-data': {
'ami-id': i['image_id'],
'ami-launch-index': i['ami_launch_index'],
'ami-manifest-path': 'FIXME', # image property
'block-device-mapping': { # TODO: replace with real data
'ami': 'sda1',
'ephemeral0': 'sda2',
'root': '/dev/sda1',
'swap': 'sda3'
},
'hostname': i['private_dns_name'], # is this public sometimes?
'instance-action': 'none',
'instance-id': i['instance_id'],
'instance-type': i.get('instance_type', ''),
'local-hostname': i['private_dns_name'],
'local-ipv4': i['private_dns_name'], # TODO: switch to IP
'kernel-id': i.get('kernel_id', ''),
'placement': {
'availaibility-zone': i.get('availability_zone', 'nova'),
},
'public-hostname': i.get('dns_name', ''),
'public-ipv4': i.get('dns_name', ''), # TODO: switch to IP
'public-keys' : keys,
'ramdisk-id': i.get('ramdisk_id', ''),
'reservation-id': i['reservation_id'],
'security-groups': i.get('groups', '')
}
} }
if False: # TODO: store ancestor ids self.rpc_mechanism = rpc_mechanism
data['ancestor-ami-ids'] = []
if i.get('product_codes', None):
data['product-codes'] = i['product_codes']
return data
def update_state(self, topic, value): def handler(self, environ, responder):
""" accepts status reports from the queue and consolidates them """ logging.error("*** %s" % environ)
# TODO(jmc): if an instance has disappeared from controller, path = wsgi.Util.route(environ['PATH_INFO'], self.controllers)
# the node, call instance_death if not controller:
if topic == "instances": raise Exception("Missing Controller")
return defer.succeed(True) rv = controller.process(path, environ)
aggregate_state = getattr(self, topic) if type(rv) is tuple:
node_name = value.keys()[0] responder(rv[0], rv[1])
items = value[node_name] rv = rv[2]
logging.debug("Updating %s state for %s" % (topic, node_name)) else:
for item_id in items.keys(): responder("200 OK", [])
if (aggregate_state.has_key('pending') and return rv
aggregate_state['pending'].has_key(item_id)):
del aggregate_state['pending'][item_id]
aggregate_state[node_name] = items
return defer.succeed(True)
class RackspaceAPIServerApplication(tornado.web.Application): class RackspaceApiEndpoint(object):
def __init__(self, user_manager, controllers): def process(self, path, env):
tornado.web.Application.__init__(self, [ if len(path) == 0:
(r'/servers/?(.*)', RackspaceServerRequestHandler), return self.index(env)
], pool=multiprocessing.Pool(4))
self.user_manager = user_manager
self.controllers = controllers
class RackspaceServerRequestHandler(tornado.web.RequestHandler): action = path.pop(0)
if hasattr(self, action):
method = getattr(self, action)
return method(path, env)
else:
raise Exception("Missing method %s" % path[0])
def get(self, controller_name):
self.execute(controller_name)
@tornado.web.asynchronous class RackspaceAuthenticationApi(RackspaceApiEndpoint):
def execute(self, controller_name):
controller = self.application.controllers['Servers']
# Obtain the appropriate controller for this request. def index(self, env):
# try: response = '204 No Content'
# controller = self.application.controllers[controller_name] headers = [
# except KeyError: ('X-Server-Management-Url', 'http://localhost:8773/server'),
# self._error('unhandled', 'no controller named %s' % controller_name) ('X-Storage-Url', 'http://localhost:8773/server'),
# return ('X-CDN-Managment-Url', 'http://localhost:8773/server'),
# ]
args = self.request.arguments body = ""
logging.error("ARGS: %s" % args) return (response, headers, body)
# Read request signature. class RackspaceCloudServerApi(object):
try:
signature = args.pop('Signature')[0]
except:
raise tornado.web.HTTPError(400)
# Make a copy of args for authentication and signature verification. def index(self):
auth_params = {} return "IDX"
for key, value in args.items():
auth_params[key] = value[0]
# Get requested action and remove authentication args for final request.
try:
action = args.pop('Action')[0]
access = args.pop('AWSAccessKeyId')[0]
args.pop('SignatureMethod')
args.pop('SignatureVersion')
args.pop('Version')
args.pop('Timestamp')
except:
raise tornado.web.HTTPError(400)
# Authenticate the request.
try:
(user, project) = users.UserManager.instance().authenticate(
access,
signature,
auth_params,
self.request.method,
self.request.host,
self.request.path
)
except exception.Error, ex:
logging.debug("Authentication Failure: %s" % ex)
raise tornado.web.HTTPError(403)
_log.debug('action: %s' % action)
for key, value in args.items():
_log.debug('arg: %s\t\tval: %s' % (key, value))
request = APIRequest(controller, action)
context = APIRequestContext(self, user, project)
d = request.send(context, **args)
# d.addCallback(utils.debug)
# TODO: Wrap response in AWS XML format
d.addCallbacks(self._write_callback, self._error_callback)
def _write_callback(self, data):
self.set_header('Content-Type', 'text/xml')
self.write(data)
self.finish()
def _error_callback(self, failure):
try:
failure.raiseException()
except exception.ApiError as ex:
self._error(type(ex).__name__ + "." + ex.code, ex.message)
# TODO(vish): do something more useful with unknown exceptions
except Exception as ex:
self._error(type(ex).__name__, str(ex))
raise
def post(self, controller_name):
self.execute(controller_name)
def _error(self, code, message):
self._status_code = 400
self.set_header('Content-Type', 'text/xml')
self.write('<?xml version="1.0"?>\n')
self.write('<Response><Errors><Error><Code>%s</Code>'
'<Message>%s</Message></Error></Errors>'
'<RequestID>?</RequestID></Response>' % (code, message))
self.finish()
def list(self, args):
return "%s" % args

23
nova/endpoint/wsgi.py Normal file
View File

@@ -0,0 +1,23 @@
'''
Utility methods for working with WSGI servers
'''
class Util(object):
@staticmethod
def route(reqstr, controllers):
if len(reqstr) == 0:
return Util.select_root_controller(controllers), []
parts = [x for x in reqstr.split("/") if len(x) > 0]
if len(parts) == 0:
return Util.select_root_controller(controllers), []
return controllers[parts[0]], parts[1:]
@staticmethod
def select_root_controller(controllers):
if '' in controllers:
return controllers['']
else:
return None