Make WSGI routing support routing to WSGI apps or to controller+action

Support the beginnings of serialization format detection (aka did the request ask for JSON or XML)
This commit is contained in:
Michael Gundlach
2010-08-12 15:19:33 -07:00
committed by Monty Taylor
31 changed files with 1233 additions and 399 deletions

View File

@@ -203,7 +203,7 @@ class ProjectCommands(object):
arguments: project user"""
self.manager.remove_from_project(user, project)
def create_zip(self, project_id, user_id, filename='nova.zip'):
def zipfile(self, project_id, user_id, filename='nova.zip'):
"""Exports credentials for project to a zip file
arguments: project_id user_id [filename='nova.zip]"""
zip_file = self.manager.get_credentials(project_id, user_id)

View File

@@ -21,14 +21,15 @@
Daemon for the Rackspace API endpoint.
"""
import nova.endpoint
from nova import flags
from nova import utils
from nova import wsgi
from nova.endpoint import rackspace
FLAGS = flags.FLAGS
flags.DEFINE_integer('cc_port', 8773, 'cloud controller port')
if __name__ == '__main__':
utils.default_flagfile()
wsgi.run_server(rackspace.API(), FLAGS.cc_port)
wsgi.run_server(nova.endpoint.ApiVersionRouter(), FLAGS.cc_port)

View File

@@ -182,7 +182,8 @@ class LdapDriver(object):
for member_uid in member_uids:
if not self.__user_exists(member_uid):
raise exception.NotFound("Project can't be created "
"because user %s doesn't exist" % member_uid)
"because user %s doesn't exist"
% member_uid)
members.append(self.__uid_to_dn(member_uid))
# always add the manager as a member because members is required
if not manager_dn in members:
@@ -236,6 +237,26 @@ class LdapDriver(object):
role_dn = self.__role_to_dn(role, project_id)
return self.__remove_from_group(uid, role_dn)
def get_user_roles(self, uid, project_id=None):
"""Retrieve list of roles for user (or user and project)"""
if project_id is None:
# NOTE(vish): This is unneccesarily slow, but since we can't
# guarantee that the global roles are located
# together in the ldap tree, we're doing this version.
roles = []
for role in FLAGS.allowed_roles:
role_dn = self.__role_to_dn(role)
if self.__is_in_group(uid, role_dn):
roles.append(role)
return roles
else:
project_dn = 'cn=%s,%s' % (project_id, FLAGS.ldap_project_subtree)
roles = self.__find_objects(project_dn,
'(&(&(objectclass=groupOfNames)'
'(!(objectclass=novaProject)))'
'(member=%s))' % self.__uid_to_dn(uid))
return [role['cn'][0] for role in roles]
def delete_user(self, uid):
"""Delete a user"""
if not self.__user_exists(uid):
@@ -253,24 +274,24 @@ class LdapDriver(object):
self.conn.delete_s('cn=%s,uid=%s,%s' % (key_name, uid,
FLAGS.ldap_user_subtree))
def delete_project(self, name):
def delete_project(self, project_id):
"""Delete a project"""
project_dn = 'cn=%s,%s' % (name, FLAGS.ldap_project_subtree)
project_dn = 'cn=%s,%s' % (project_id, FLAGS.ldap_project_subtree)
self.__delete_roles(project_dn)
self.__delete_group(project_dn)
def __user_exists(self, name):
def __user_exists(self, uid):
"""Check if user exists"""
return self.get_user(name) != None
return self.get_user(uid) != None
def __key_pair_exists(self, uid, key_name):
"""Check if key pair exists"""
return self.get_user(uid) != None
return self.get_key_pair(uid, key_name) != None
def __project_exists(self, name):
def __project_exists(self, project_id):
"""Check if project exists"""
return self.get_project(name) != None
return self.get_project(project_id) != None
def __find_object(self, dn, query=None, scope=None):
"""Find an object by dn and query"""

View File

@@ -29,17 +29,19 @@ import uuid
import zipfile
from nova import crypto
from nova import datastore
from nova import exception
from nova import flags
from nova import objectstore # for flags
from nova import utils
from nova.auth import ldapdriver # for flags
from nova.auth import signer
from nova.network import vpn
FLAGS = flags.FLAGS
flags.DEFINE_list('allowed_roles',
['cloudadmin', 'itsec', 'sysadmin', 'netadmin', 'developer'],
'Allowed roles for project')
# NOTE(vish): a user with one of these roles will be a superuser and
# have access to all api commands
flags.DEFINE_list('superuser_roles', ['cloudadmin'],
@@ -99,6 +101,7 @@ class AuthBase(object):
class User(AuthBase):
"""Object representing a user"""
def __init__(self, id, name, access, secret, admin):
AuthBase.__init__(self)
self.id = id
self.name = name
self.access = access
@@ -159,6 +162,7 @@ class KeyPair(AuthBase):
fingerprint is stored. The user's private key is not saved.
"""
def __init__(self, id, name, owner_id, public_key, fingerprint):
AuthBase.__init__(self)
self.id = id
self.name = name
self.owner_id = owner_id
@@ -176,6 +180,7 @@ class KeyPair(AuthBase):
class Project(AuthBase):
"""Represents a Project returned from the datastore"""
def __init__(self, id, name, project_manager_id, description, member_ids):
AuthBase.__init__(self)
self.id = id
self.name = name
self.project_manager_id = project_manager_id
@@ -234,7 +239,7 @@ class AuthManager(object):
AuthManager also manages associated data related to Auth objects that
need to be more accessible, such as vpn ips and ports.
"""
_instance=None
_instance = None
def __new__(cls, *args, **kwargs):
"""Returns the AuthManager singleton"""
if not cls._instance:
@@ -248,7 +253,7 @@ class AuthManager(object):
reset the driver if it is not set or a new driver is specified.
"""
if driver or not getattr(self, 'driver', None):
self.driver = utils.import_class(driver or FLAGS.auth_driver)
self.driver = utils.import_class(driver or FLAGS.auth_driver)
def authenticate(self, access, signature, params, verb='GET',
server_string='127.0.0.1:8773', path='/',
@@ -431,6 +436,10 @@ class AuthManager(object):
@type project: Project or project_id
@param project: Project in which to add local role.
"""
if role not in FLAGS.allowed_roles:
raise exception.NotFound("The %s role can not be found" % role)
if project is not None and role in FLAGS.global_roles:
raise exception.NotFound("The %s role is global only" % role)
with self.driver() as drv:
drv.add_role(User.safe_id(user), role, Project.safe_id(project))
@@ -454,6 +463,19 @@ class AuthManager(object):
with self.driver() as drv:
drv.remove_role(User.safe_id(user), role, Project.safe_id(project))
def get_roles(self, project_roles=True):
"""Get list of allowed roles"""
if project_roles:
return list(set(FLAGS.allowed_roles) - set(FLAGS.global_roles))
else:
return FLAGS.allowed_roles
def get_user_roles(self, user, project=None):
"""Get user global or per-project roles"""
with self.driver() as drv:
return drv.get_user_roles(User.safe_id(user),
Project.safe_id(project))
def get_project(self, pid):
"""Get project object by id"""
with self.driver() as drv:

View File

@@ -170,6 +170,9 @@ class BasicModel(object):
def setdefault(self, item, default):
return self.state.setdefault(item, default)
def __contains__(self, item):
return item in self.state
def __getitem__(self, item):
return self.state[item]

View File

@@ -30,3 +30,22 @@
.. moduleauthor:: Manish Singh <yosh@gimp.org>
.. moduleauthor:: Andy Smith <andy@anarkystic.com>
"""
from nova import wsgi
import routes
from nova.endpoint import rackspace
from nova.endpoint import aws
class ApiVersionRouter(wsgi.Router):
"""Routes top-level requests to the appropriate API."""
def __init__(self):
mapper = routes.Mapper()
mapper.connect(None, "/v1.0/{path_info:.*}", controller="rs")
mapper.connect(None, "/ec2/{path_info:.*}", controller="ec2")
targets = {"rs": rackspace.Api(), "ec2": aws.Api()}
super(ApiVersionRouter, self).__init__(mapper, targets)

View File

@@ -0,0 +1,24 @@
import routes
import webob.dec
from nova import wsgi
# TODO(gundlach): temp
class Api(wsgi.Router):
"""WSGI entry point for all AWS API requests."""
def __init__(self):
mapper = routes.Mapper()
mapper.connect(None, "{all:.*}", controller="dummy")
targets = {"dummy": self.dummy }
super(Api, self).__init__(mapper, targets)
@webob.dec.wsgify
def dummy(self, req):
#TODO(gundlach)
msg = "dummy response -- please hook up __init__() to cloud.py instead"
return repr({ 'dummy': msg,
'kwargs': repr(req.environ['wsgiorg.routing_args'][1]) })

View File

@@ -1,183 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Rackspace API Endpoint
"""
import json
import time
import webob.dec
import webob.exc
from nova import flags
from nova import rpc
from nova import utils
from nova import wsgi
from nova.auth import manager
from nova.compute import model as compute
from nova.network import model as network
FLAGS = flags.FLAGS
flags.DEFINE_string('cloud_topic', 'cloud', 'the topic clouds listen on')
class API(wsgi.Middleware):
"""Entry point for all requests."""
def __init__(self):
super(API, self).__init__(Router(webob.exc.HTTPNotFound()))
def __call__(self, environ, start_response):
context = {}
if "HTTP_X_AUTH_TOKEN" in environ:
context['user'] = manager.AuthManager().get_user_from_access_key(
environ['HTTP_X_AUTH_TOKEN'])
if context['user']:
context['project'] = manager.AuthManager().get_project(
context['user'].name)
if "user" not in context:
return webob.exc.HTTPForbidden()(environ, start_response)
environ['nova.context'] = context
return self.application(environ, start_response)
class Router(wsgi.Router):
"""Route requests to the next WSGI application."""
def _build_map(self):
"""Build routing map for authentication and cloud."""
self._connect("/v1.0", controller=AuthenticationAPI())
cloud = CloudServerAPI()
self._connect("/servers", controller=cloud.launch_server,
conditions={"method": ["POST"]})
self._connect("/servers/{server_id}", controller=cloud.delete_server,
conditions={'method': ["DELETE"]})
self._connect("/servers", controller=cloud)
class AuthenticationAPI(wsgi.Application):
"""Handle all authorization requests through WSGI applications."""
@webob.dec.wsgify
def __call__(self, req): # pylint: disable-msg=W0221
# TODO(todd): make a actual session with a unique token
# just pass the auth key back through for now
res = webob.Response()
res.status = '204 No Content'
res.headers.add('X-Server-Management-Url', req.host_url)
res.headers.add('X-Storage-Url', req.host_url)
res.headers.add('X-CDN-Managment-Url', req.host_url)
res.headers.add('X-Auth-Token', req.headers['X-Auth-Key'])
return res
class CloudServerAPI(wsgi.Application):
"""Handle all server requests through WSGI applications."""
def __init__(self):
super(CloudServerAPI, self).__init__()
self.instdir = compute.InstanceDirectory()
self.network = network.PublicNetworkController()
@webob.dec.wsgify
def __call__(self, req): # pylint: disable-msg=W0221
value = {"servers": []}
for inst in self.instdir.all:
value["servers"].append(self.instance_details(inst))
return json.dumps(value)
def instance_details(self, inst): # pylint: disable-msg=R0201
"""Build the data structure to represent details for an instance."""
return {
"id": inst.get("instance_id", None),
"imageId": inst.get("image_id", None),
"flavorId": inst.get("instacne_type", None),
"hostId": inst.get("node_name", None),
"status": inst.get("state", "pending"),
"addresses": {
"public": [network.get_public_ip_for_instance(
inst.get("instance_id", None))],
"private": [inst.get("private_dns_name", None)]},
# implemented only by Rackspace, not AWS
"name": inst.get("name", "Not-Specified"),
# not supported
"progress": "Not-Supported",
"metadata": {
"Server Label": "Not-Supported",
"Image Version": "Not-Supported"}}
@webob.dec.wsgify
def launch_server(self, req):
"""Launch a new instance."""
data = json.loads(req.body)
inst = self.build_server_instance(data, req.environ['nova.context'])
rpc.cast(
FLAGS.compute_topic, {
"method": "run_instance",
"args": {"instance_id": inst.instance_id}})
return json.dumps({"server": self.instance_details(inst)})
def build_server_instance(self, env, context):
"""Build instance data structure and save it to the data store."""
reservation = utils.generate_uid('r')
ltime = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
inst = self.instdir.new()
inst['name'] = env['server']['name']
inst['image_id'] = env['server']['imageId']
inst['instance_type'] = env['server']['flavorId']
inst['user_id'] = context['user'].id
inst['project_id'] = context['project'].id
inst['reservation_id'] = reservation
inst['launch_time'] = ltime
inst['mac_address'] = utils.generate_mac()
address = self.network.allocate_ip(
inst['user_id'],
inst['project_id'],
mac=inst['mac_address'])
inst['private_dns_name'] = str(address)
inst['bridge_name'] = network.BridgedNetwork.get_network_for_project(
inst['user_id'],
inst['project_id'],
'default')['bridge_name']
# key_data, key_name, ami_launch_index
# TODO(todd): key data or root password
inst.save()
return inst
@webob.dec.wsgify
@wsgi.route_args
def delete_server(self, req, route_args): # pylint: disable-msg=R0201
"""Delete an instance."""
owner_hostname = None
instance = compute.Instance.lookup(route_args['server_id'])
if instance:
owner_hostname = instance["node_name"]
if not owner_hostname:
return webob.exc.HTTPNotFound("Did not find image, or it was "
"not in a running state.")
rpc_transport = "%s:%s" % (FLAGS.compute_topic, owner_hostname)
rpc.cast(rpc_transport,
{"method": "reboot_instance",
"args": {"instance_id": route_args['server_id']}})
req.status = "202 Accepted"

View File

View File

@@ -0,0 +1,5 @@
from nova.endpoint.rackspace.controllers.images import ImagesController
from nova.endpoint.rackspace.controllers.flavors import FlavorsController
from nova.endpoint.rackspace.controllers.servers import ServersController
from nova.endpoint.rackspace.controllers.sharedipgroups import \
SharedIpGroupsController

View File

@@ -0,0 +1,9 @@
from nova.wsgi import WSGIController
class BaseController(WSGIController):
@classmethod
def render(cls, instance):
if isinstance(instance, list):
return { cls.entity_name : cls.render(instance) }
else:
return { "TODO": "TODO" }

View File

@@ -0,0 +1 @@
class FlavorsController(object): pass

View File

@@ -0,0 +1 @@
class ImagesController(object): pass

View File

@@ -0,0 +1,63 @@
from nova import rpc
from nova.compute import model as compute
from nova.endpoint.rackspace.controllers.base import BaseController
class ServersController(BaseController):
entity_name = 'servers'
def index(cls):
return [instance_details(inst) for inst in compute.InstanceDirectory().all]
def show(self, **kwargs):
instance_id = kwargs['id']
return compute.InstanceDirectory().get(instance_id)
def delete(self, **kwargs):
instance_id = kwargs['id']
instance = compute.InstanceDirectory().get(instance_id)
if not instance:
raise ServerNotFound("The requested server was not found")
instance.destroy()
return True
def create(self, **kwargs):
inst = self.build_server_instance(kwargs['server'])
rpc.cast(
FLAGS.compute_topic, {
"method": "run_instance",
"args": {"instance_id": inst.instance_id}})
def update(self, **kwargs):
instance_id = kwargs['id']
instance = compute.InstanceDirectory().get(instance_id)
if not instance:
raise ServerNotFound("The requested server was not found")
instance.update(kwargs['server'])
instance.save()
def build_server_instance(self, env):
"""Build instance data structure and save it to the data store."""
reservation = utils.generate_uid('r')
ltime = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
inst = self.instdir.new()
inst['name'] = env['server']['name']
inst['image_id'] = env['server']['imageId']
inst['instance_type'] = env['server']['flavorId']
inst['user_id'] = env['user']['id']
inst['project_id'] = env['project']['id']
inst['reservation_id'] = reservation
inst['launch_time'] = ltime
inst['mac_address'] = utils.generate_mac()
address = self.network.allocate_ip(
inst['user_id'],
inst['project_id'],
mac=inst['mac_address'])
inst['private_dns_name'] = str(address)
inst['bridge_name'] = network.BridgedNetwork.get_network_for_project(
inst['user_id'],
inst['project_id'],
'default')['bridge_name']
# key_data, key_name, ami_launch_index
# TODO(todd): key data or root password
inst.save()
return inst

View File

@@ -0,0 +1 @@
class SharedIpGroupsController(object): pass

View File

@@ -0,0 +1,90 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Rackspace API Endpoint
"""
import json
import time
import webob.dec
import webob.exc
import routes
from nova import flags
from nova import wsgi
from nova.auth import manager
from nova.endpoint.rackspace import controllers
FLAGS = flags.FLAGS
flags.DEFINE_string('cloud_topic', 'cloud', 'the topic clouds listen on')
class Api(wsgi.Middleware):
"""WSGI entry point for all Rackspace API requests."""
def __init__(self):
app = AuthMiddleware(ApiRouter())
super(Api, self).__init__(app)
class AuthMiddleware(wsgi.Middleware):
"""Authorize the rackspace API request or return an HTTP Forbidden."""
#TODO(gundlach): isn't this the old Nova API's auth? Should it be replaced
#with correct RS API auth?
@webob.dec.wsgify
def __call__(self, req):
context = {}
if "HTTP_X_AUTH_TOKEN" in req.environ:
context['user'] = manager.AuthManager().get_user_from_access_key(
req.environ['HTTP_X_AUTH_TOKEN'])
if context['user']:
context['project'] = manager.AuthManager().get_project(
context['user'].name)
if "user" not in context:
return webob.exc.HTTPForbidden()
req.environ['nova.context'] = context
return self.application
class ApiRouter(wsgi.Router):
"""
Routes requests on the Rackspace API to the appropriate controller
and method.
"""
def __init__(self):
mapper = routes.Mapper()
mapper.resource("server", "servers")
mapper.resource("image", "images")
mapper.resource("flavor", "flavors")
mapper.resource("sharedipgroup", "sharedipgroups")
targets = {
'servers': controllers.ServersController(),
'images': controllers.ImagesController(),
'flavors': controllers.FlavorsController(),
'sharedipgroups': controllers.SharedIpGroupsController()
}
super(ApiRouter, self).__init__(mapper, targets)

View File

@@ -97,11 +97,11 @@ class Vlan(datastore.BasicModel):
def dict_by_vlan(cls):
"""a hash of vlan:project"""
set_name = cls._redis_set_name(cls.__name__)
rv = {}
h = datastore.Redis.instance().hgetall(set_name)
for v in h.keys():
rv[h[v]] = v
return rv
retvals = {}
hashset = datastore.Redis.instance().hgetall(set_name)
for val in hashset.keys():
retvals[hashset[val]] = val
return retvals
@classmethod
@datastore.absorb_connection_error
@@ -136,7 +136,8 @@ class Vlan(datastore.BasicModel):
# CLEANUP:
# TODO(ja): Save the IPs at the top of each subnet for cloudpipe vpn clients
# TODO(ja): does vlanpool "keeper" need to know the min/max - shouldn't FLAGS always win?
# TODO(ja): does vlanpool "keeper" need to know the min/max -
# shouldn't FLAGS always win?
# TODO(joshua): Save the IPs at the top of each subnet for cloudpipe vpn clients
class BaseNetwork(datastore.BasicModel):
@@ -217,7 +218,9 @@ class BaseNetwork(datastore.BasicModel):
def available(self):
# the .2 address is always CloudPipe
# and the top <n> are for vpn clients
for idx in range(self.num_static_ips, len(self.network)-(1 + FLAGS.cnt_vpn_clients)):
num_ips = self.num_static_ips
num_clients = FLAGS.cnt_vpn_clients
for idx in range(num_ips, len(self.network)-(1 + num_clients)):
address = str(self.network[idx])
if not address in self.hosts.keys():
yield address
@@ -338,8 +341,9 @@ class DHCPNetwork(BridgedNetwork):
private_ip = str(self.network[2])
linux_net.confirm_rule("FORWARD -d %s -p udp --dport 1194 -j ACCEPT"
% (private_ip, ))
linux_net.confirm_rule("PREROUTING -t nat -d %s -p udp --dport %s -j DNAT --to %s:1194"
% (self.project.vpn_ip, self.project.vpn_port, private_ip))
linux_net.confirm_rule(
"PREROUTING -t nat -d %s -p udp --dport %s -j DNAT --to %s:1194"
% (self.project.vpn_ip, self.project.vpn_port, private_ip))
def deexpress(self, address=None):
# if this is the last address, stop dns
@@ -374,13 +378,14 @@ class PublicAddress(datastore.BasicModel):
return addr
DEFAULT_PORTS = [("tcp",80), ("tcp",22), ("udp",1194), ("tcp",443)]
DEFAULT_PORTS = [("tcp", 80), ("tcp", 22), ("udp", 1194), ("tcp", 443)]
class PublicNetworkController(BaseNetwork):
override_type = 'network'
def __init__(self, *args, **kwargs):
network_id = "public:default"
super(PublicNetworkController, self).__init__(network_id, FLAGS.public_range)
super(PublicNetworkController, self).__init__(network_id,
FLAGS.public_range)
self['user_id'] = "public"
self['project_id'] = "public"
self["create_time"] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
@@ -415,7 +420,7 @@ class PublicNetworkController(BaseNetwork):
def deallocate_ip(self, ip_str):
# NOTE(vish): cleanup is now done on release by the parent class
self.release_ip(ip_str)
self.release_ip(ip_str)
def associate_address(self, public_ip, private_ip, instance_id):
if not public_ip in self.assigned:
@@ -461,8 +466,9 @@ class PublicNetworkController(BaseNetwork):
linux_net.confirm_rule("FORWARD -d %s -p icmp -j ACCEPT"
% (private_ip))
for (protocol, port) in DEFAULT_PORTS:
linux_net.confirm_rule("FORWARD -d %s -p %s --dport %s -j ACCEPT"
% (private_ip, protocol, port))
linux_net.confirm_rule(
"FORWARD -d %s -p %s --dport %s -j ACCEPT"
% (private_ip, protocol, port))
def deexpress(self, address=None):
addr = self.get_host(address)

View File

@@ -179,7 +179,21 @@ class AuthTestCase(test.BaseTestCase):
project.add_role('test1', 'sysadmin')
self.assertTrue(project.has_role('test1', 'sysadmin'))
def test_211_can_remove_project_role(self):
def test_211_can_list_project_roles(self):
project = self.manager.get_project('testproj')
user = self.manager.get_user('test1')
self.manager.add_role(user, 'netadmin', project)
roles = self.manager.get_user_roles(user)
self.assertTrue('sysadmin' in roles)
self.assertFalse('netadmin' in roles)
project_roles = self.manager.get_user_roles(user, project)
self.assertTrue('sysadmin' in project_roles)
self.assertTrue('netadmin' in project_roles)
# has role should be false because global role is missing
self.assertFalse(self.manager.has_role(user, 'netadmin', project))
def test_212_can_remove_project_role(self):
project = self.manager.get_project('testproj')
self.assertTrue(project.has_role('test1', 'sysadmin'))
project.remove_role('test1', 'sysadmin')

View File

@@ -17,6 +17,10 @@
# under the License.
import logging
import shutil
import tempfile
from twisted.internet import defer
from nova import compute
from nova import exception
@@ -34,10 +38,16 @@ class VolumeTestCase(test.TrialTestCase):
super(VolumeTestCase, self).setUp()
self.compute = compute.service.ComputeService()
self.volume = None
self.tempdir = tempfile.mkdtemp()
self.flags(connection_type='fake',
fake_storage=True)
fake_storage=True,
aoe_export_dir=self.tempdir)
self.volume = volume_service.VolumeService()
def tearDown(self):
shutil.rmtree(self.tempdir)
@defer.inlineCallbacks
def test_run_create_volume(self):
vol_size = '0'
user_id = 'fake'
@@ -48,34 +58,40 @@ class VolumeTestCase(test.TrialTestCase):
volume_service.get_volume(volume_id)['volume_id'])
rv = self.volume.delete_volume(volume_id)
self.assertFailure(volume_service.get_volume(volume_id),
exception.Error)
self.assertRaises(exception.Error, volume_service.get_volume, volume_id)
@defer.inlineCallbacks
def test_too_big_volume(self):
vol_size = '1001'
user_id = 'fake'
project_id = 'fake'
self.assertRaises(TypeError,
self.volume.create_volume,
vol_size, user_id, project_id)
try:
yield self.volume.create_volume(vol_size, user_id, project_id)
self.fail("Should have thrown TypeError")
except TypeError:
pass
@defer.inlineCallbacks
def test_too_many_volumes(self):
vol_size = '1'
user_id = 'fake'
project_id = 'fake'
num_shelves = FLAGS.last_shelf_id - FLAGS.first_shelf_id + 1
total_slots = FLAGS.slots_per_shelf * num_shelves
total_slots = FLAGS.blades_per_shelf * num_shelves
vols = []
from nova import datastore
redis = datastore.Redis.instance()
for i in xrange(total_slots):
vid = yield self.volume.create_volume(vol_size, user_id, project_id)
vols.append(vid)
self.assertFailure(self.volume.create_volume(vol_size,
user_id,
project_id),
volume_service.NoMoreVolumes)
volume_service.NoMoreBlades)
for id in vols:
yield self.volume.delete_volume(id)
@defer.inlineCallbacks
def test_run_attach_detach_volume(self):
# Create one volume and one compute to test with
instance_id = "storage-test"
@@ -84,22 +100,26 @@ class VolumeTestCase(test.TrialTestCase):
project_id = 'fake'
mountpoint = "/dev/sdf"
volume_id = yield self.volume.create_volume(vol_size, user_id, project_id)
volume_obj = volume_service.get_volume(volume_id)
volume_obj.start_attach(instance_id, mountpoint)
rv = yield self.compute.attach_volume(volume_id,
instance_id,
mountpoint)
if FLAGS.fake_tests:
volume_obj.finish_attach()
else:
rv = yield self.compute.attach_volume(instance_id,
volume_id,
mountpoint)
self.assertEqual(volume_obj['status'], "in-use")
self.assertEqual(volume_obj['attachStatus'], "attached")
self.assertEqual(volume_obj['attach_status'], "attached")
self.assertEqual(volume_obj['instance_id'], instance_id)
self.assertEqual(volume_obj['mountpoint'], mountpoint)
self.assertRaises(exception.Error,
self.volume.delete_volume,
volume_id)
rv = yield self.volume.detach_volume(volume_id)
self.assertFailure(self.volume.delete_volume(volume_id), exception.Error)
volume_obj.start_detach()
if FLAGS.fake_tests:
volume_obj.finish_detach()
else:
rv = yield self.volume.detach_volume(instance_id,
volume_id)
volume_obj = volume_service.get_volume(volume_id)
self.assertEqual(volume_obj['status'], "available")
@@ -108,6 +128,27 @@ class VolumeTestCase(test.TrialTestCase):
volume_service.get_volume,
volume_id)
@defer.inlineCallbacks
def test_multiple_volume_race_condition(self):
vol_size = "5"
user_id = "fake"
project_id = 'fake'
shelf_blades = []
def _check(volume_id):
vol = volume_service.get_volume(volume_id)
shelf_blade = '%s.%s' % (vol['shelf_id'], vol['blade_id'])
self.assert_(shelf_blade not in shelf_blades)
shelf_blades.append(shelf_blade)
logging.debug("got %s" % shelf_blade)
vol.destroy()
deferreds = []
for i in range(5):
d = self.volume.create_volume(vol_size, user_id, project_id)
d.addCallback(_check)
d.addErrback(self.fail)
deferreds.append(d)
yield defer.DeferredList(deferreds)
def test_multi_node(self):
# TODO(termie): Figure out how to test with two nodes,
# each of them having a different FLAG for storage_node

View File

@@ -23,6 +23,7 @@ Handling of VM disk images.
import os.path
import time
import urlparse
from nova import flags
from nova import process
@@ -43,7 +44,7 @@ def fetch(image, path, user, project):
return f(image, path, user, project)
def _fetch_s3_image(image, path, user, project):
url = _image_url('%s/image' % image)
url = image_url(image)
# This should probably move somewhere else, like e.g. a download_as
# method on User objects and at the same time get rewritten to use
@@ -51,11 +52,11 @@ def _fetch_s3_image(image, path, user, project):
headers = {}
headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
uri = '/' + url.partition('/')[2]
(_, _, url_path, _, _, _) = urlparse.urlparse(url)
access = manager.AuthManager().get_access_key(user, project)
signature = signer.Signer(user.secret.encode()).s3_authorization(headers,
'GET',
uri)
url_path)
headers['Authorization'] = 'AWS %s:%s' % (access, signature)
cmd = ['/usr/bin/curl', '--silent', url]
@@ -72,5 +73,6 @@ def _fetch_local_image(image, path, user, project):
def _image_path(path):
return os.path.join(FLAGS.images_path, path)
def _image_url(path):
return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path)
def image_url(image):
return "http://%s:%s/_images/%s/image" % (FLAGS.s3_host, FLAGS.s3_port,
image)

View File

@@ -114,7 +114,8 @@ class LibvirtConnection(object):
def _cleanup(self, instance):
target = os.path.abspath(instance.datamodel['basepath'])
logging.info("Deleting instance files at %s", target)
shutil.rmtree(target)
if os.path.exists(target):
shutil.rmtree(target)
@defer.inlineCallbacks

View File

@@ -19,6 +19,7 @@ A connection to XenServer or Xen Cloud Platform.
"""
import logging
import xmlrpclib
from twisted.internet import defer
from twisted.internet import task
@@ -26,7 +27,9 @@ from twisted.internet import task
from nova import exception
from nova import flags
from nova import process
from nova.auth.manager import AuthManager
from nova.compute import power_state
from nova.virt import images
XenAPI = None
@@ -71,10 +74,41 @@ class XenAPIConnection(object):
@defer.inlineCallbacks
@exception.wrap_exception
def spawn(self, instance):
vm = self.lookup(instance.name)
vm = yield self.lookup(instance.name)
if vm is not None:
raise Exception('Attempted to create non-unique name %s' %
instance.name)
if 'bridge_name' in instance.datamodel:
network_ref = \
yield self._find_network_with_bridge(
instance.datamodel['bridge_name'])
else:
network_ref = None
if 'mac_address' in instance.datamodel:
mac_address = instance.datamodel['mac_address']
else:
mac_address = ''
user = AuthManager().get_user(instance.datamodel['user_id'])
project = AuthManager().get_project(instance.datamodel['project_id'])
vdi_uuid = yield self.fetch_image(
instance.datamodel['image_id'], user, project, True)
kernel = yield self.fetch_image(
instance.datamodel['kernel_id'], user, project, False)
ramdisk = yield self.fetch_image(
instance.datamodel['ramdisk_id'], user, project, False)
vdi_ref = yield self._conn.xenapi.VDI.get_by_uuid(vdi_uuid)
vm_ref = yield self.create_vm(instance, kernel, ramdisk)
yield self.create_vbd(vm_ref, vdi_ref, 0, True)
if network_ref:
yield self._create_vif(vm_ref, network_ref, mac_address)
yield self._conn.xenapi.VM.start(vm_ref, False, False)
def create_vm(self, instance, kernel, ramdisk):
mem = str(long(instance.datamodel['memory_kb']) * 1024)
vcpus = str(instance.datamodel['vcpus'])
rec = {
@@ -92,9 +126,9 @@ class XenAPIConnection(object):
'actions_after_reboot': 'restart',
'actions_after_crash': 'destroy',
'PV_bootloader': '',
'PV_kernel': instance.datamodel['kernel_id'],
'PV_ramdisk': instance.datamodel['ramdisk_id'],
'PV_args': '',
'PV_kernel': kernel,
'PV_ramdisk': ramdisk,
'PV_args': 'root=/dev/xvda1',
'PV_bootloader_args': '',
'PV_legacy_args': '',
'HVM_boot_policy': '',
@@ -106,8 +140,78 @@ class XenAPIConnection(object):
'user_version': '0',
'other_config': {},
}
vm = yield self._conn.xenapi.VM.create(rec)
#yield self._conn.xenapi.VM.start(vm, False, False)
logging.debug('Created VM %s...', instance.name)
vm_ref = self._conn.xenapi.VM.create(rec)
logging.debug('Created VM %s as %s.', instance.name, vm_ref)
return vm_ref
def create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
vbd_rec = {}
vbd_rec['VM'] = vm_ref
vbd_rec['VDI'] = vdi_ref
vbd_rec['userdevice'] = str(userdevice)
vbd_rec['bootable'] = bootable
vbd_rec['mode'] = 'RW'
vbd_rec['type'] = 'disk'
vbd_rec['unpluggable'] = True
vbd_rec['empty'] = False
vbd_rec['other_config'] = {}
vbd_rec['qos_algorithm_type'] = ''
vbd_rec['qos_algorithm_params'] = {}
vbd_rec['qos_supported_algorithms'] = []
logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
vbd_ref = self._conn.xenapi.VBD.create(vbd_rec)
logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
vdi_ref)
return vbd_ref
def _create_vif(self, vm_ref, network_ref, mac_address):
vif_rec = {}
vif_rec['device'] = '0'
vif_rec['network']= network_ref
vif_rec['VM'] = vm_ref
vif_rec['MAC'] = mac_address
vif_rec['MTU'] = '1500'
vif_rec['other_config'] = {}
vif_rec['qos_algorithm_type'] = ''
vif_rec['qos_algorithm_params'] = {}
logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
network_ref)
vif_ref = self._conn.xenapi.VIF.create(vif_rec)
logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
vm_ref, network_ref)
return vif_ref
def _find_network_with_bridge(self, bridge):
expr = 'field "bridge" = "%s"' % bridge
networks = self._conn.xenapi.network.get_all_records_where(expr)
if len(networks) == 1:
return networks.keys()[0]
elif len(networks) > 1:
raise Exception('Found non-unique network for bridge %s' % bridge)
else:
raise Exception('Found no network for bridge %s' % bridge)
def fetch_image(self, image, user, project, use_sr):
"""use_sr: True to put the image as a VDI in an SR, False to place
it on dom0's filesystem. The former is for VM disks, the latter for
its kernel and ramdisk (if external kernels are being used)."""
url = images.image_url(image)
access = AuthManager().get_access_key(user, project)
logging.debug("Asking xapi to fetch %s as %s" % (url, access))
fn = use_sr and 'get_vdi' or 'get_kernel'
args = {}
args['src_url'] = url
args['username'] = access
args['password'] = user.secret
if use_sr:
args['add_partition'] = 'true'
return self._call_plugin('objectstore', fn, args)
def reboot(self, instance):
@@ -125,7 +229,7 @@ class XenAPIConnection(object):
def get_info(self, instance_id):
vm = self.lookup(instance_id)
if vm is None:
raise Exception('instance not present %s' % instance.name)
raise Exception('instance not present %s' % instance_id)
rec = self._conn.xenapi.VM.get_record(vm)
return {'state': power_state_from_xenapi[rec['power_state']],
'max_mem': long(rec['memory_static_max']) >> 10,
@@ -143,10 +247,42 @@ class XenAPIConnection(object):
else:
return vms[0]
power_state_from_xenapi = {
'Halted' : power_state.RUNNING, #FIXME
'Running' : power_state.RUNNING,
'Paused' : power_state.PAUSED,
'Suspended': power_state.SHUTDOWN, # FIXME
'Crashed' : power_state.CRASHED
}
def _call_plugin(self, plugin, fn, args):
return _unwrap_plugin_exceptions(
self._conn.xenapi.host.call_plugin,
self._get_xenapi_host(), plugin, fn, args)
def _get_xenapi_host(self):
return self._conn.xenapi.session.get_this_host(self._conn.handle)
power_state_from_xenapi = {
'Halted' : power_state.SHUTDOWN,
'Running' : power_state.RUNNING,
'Paused' : power_state.PAUSED,
'Suspended': power_state.SHUTDOWN, # FIXME
'Crashed' : power_state.CRASHED
}
def _unwrap_plugin_exceptions(func, *args, **kwargs):
try:
return func(*args, **kwargs)
except XenAPI.Failure, exn:
logging.debug("Got exception: %s", exn)
if (len(exn.details) == 4 and
exn.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
exn.details[2] == 'Failure'):
params = None
try:
params = eval(exn.details[3])
except:
raise exn
raise XenAPI.Failure(params)
else:
raise
except xmlrpclib.ProtocolError, exn:
logging.debug("Got exception: %s", exn)
raise

View File

@@ -22,12 +22,8 @@ destroying persistent storage volumes, ala EBS.
Currently uses Ata-over-Ethernet.
"""
import glob
import logging
import os
import shutil
import socket
import tempfile
from twisted.internet import defer
@@ -47,9 +43,6 @@ flags.DEFINE_string('volume_group', 'nova-volumes',
'Name for the VG that will contain exported volumes')
flags.DEFINE_string('aoe_eth_dev', 'eth0',
'Which device to export the volumes on')
flags.DEFINE_string('storage_name',
socket.gethostname(),
'name of this service')
flags.DEFINE_integer('first_shelf_id',
utils.last_octet(utils.get_my_ip()) * 10,
'AoE starting shelf_id for this service')
@@ -59,9 +52,9 @@ flags.DEFINE_integer('last_shelf_id',
flags.DEFINE_string('aoe_export_dir',
'/var/lib/vblade-persist/vblades',
'AoE directory where exports are created')
flags.DEFINE_integer('slots_per_shelf',
flags.DEFINE_integer('blades_per_shelf',
16,
'Number of AoE slots per shelf')
'Number of AoE blades per shelf')
flags.DEFINE_string('storage_availability_zone',
'nova',
'availability zone of this service')
@@ -69,7 +62,7 @@ flags.DEFINE_boolean('fake_storage', False,
'Should we make real storage volumes to attach?')
class NoMoreVolumes(exception.Error):
class NoMoreBlades(exception.Error):
pass
def get_volume(volume_id):
@@ -77,8 +70,9 @@ def get_volume(volume_id):
volume_class = Volume
if FLAGS.fake_storage:
volume_class = FakeVolume
if datastore.Redis.instance().sismember('volumes', volume_id):
return volume_class(volume_id=volume_id)
vol = volume_class.lookup(volume_id)
if vol:
return vol
raise exception.Error("Volume does not exist")
class VolumeService(service.Service):
@@ -91,18 +85,9 @@ class VolumeService(service.Service):
super(VolumeService, self).__init__()
self.volume_class = Volume
if FLAGS.fake_storage:
FLAGS.aoe_export_dir = tempfile.mkdtemp()
self.volume_class = FakeVolume
self._init_volume_group()
def __del__(self):
# TODO(josh): Get rid of this destructor, volumes destroy themselves
if FLAGS.fake_storage:
try:
shutil.rmtree(FLAGS.aoe_export_dir)
except Exception, err:
pass
@defer.inlineCallbacks
@validate.rangetest(size=(0, 1000))
def create_volume(self, size, user_id, project_id):
@@ -113,8 +98,6 @@ class VolumeService(service.Service):
"""
logging.debug("Creating volume of size: %s" % (size))
vol = yield self.volume_class.create(size, user_id, project_id)
datastore.Redis.instance().sadd('volumes', vol['volume_id'])
datastore.Redis.instance().sadd('volumes:%s' % (FLAGS.storage_name), vol['volume_id'])
logging.debug("restarting exports")
yield self._restart_exports()
defer.returnValue(vol['volume_id'])
@@ -134,21 +117,19 @@ class VolumeService(service.Service):
def delete_volume(self, volume_id):
logging.debug("Deleting volume with id of: %s" % (volume_id))
vol = get_volume(volume_id)
if vol['status'] == "attached":
if vol['attach_status'] == "attached":
raise exception.Error("Volume is still attached")
if vol['node_name'] != FLAGS.storage_name:
if vol['node_name'] != FLAGS.node_name:
raise exception.Error("Volume is not local to this node")
yield vol.destroy()
datastore.Redis.instance().srem('volumes', vol['volume_id'])
datastore.Redis.instance().srem('volumes:%s' % (FLAGS.storage_name), vol['volume_id'])
defer.returnValue(True)
@defer.inlineCallbacks
def _restart_exports(self):
if FLAGS.fake_storage:
return
yield process.simple_execute("sudo vblade-persist auto all")
# NOTE(vish): this command sometimes sends output to stderr for warnings
# NOTE(vish): these commands sometimes sends output to stderr for warnings
yield process.simple_execute("sudo vblade-persist auto all", error_ok=1)
yield process.simple_execute("sudo vblade-persist start all", error_ok=1)
@defer.inlineCallbacks
@@ -172,14 +153,15 @@ class Volume(datastore.BasicModel):
return self.volume_id
def default_state(self):
return {"volume_id": self.volume_id}
return {"volume_id": self.volume_id,
"node_name": "unassigned"}
@classmethod
@defer.inlineCallbacks
def create(cls, size, user_id, project_id):
volume_id = utils.generate_uid('vol')
vol = cls(volume_id)
vol['node_name'] = FLAGS.storage_name
vol['node_name'] = FLAGS.node_name
vol['size'] = size
vol['user_id'] = user_id
vol['project_id'] = project_id
@@ -225,14 +207,31 @@ class Volume(datastore.BasicModel):
self['attach_status'] = "detached"
self.save()
def save(self):
is_new = self.is_new_record()
super(Volume, self).save()
if is_new:
redis = datastore.Redis.instance()
key = self.__devices_key
# TODO(vish): these should be added by admin commands
more = redis.scard(self._redis_association_name("node",
self['node_name']))
if (not redis.exists(key) and not more):
for shelf_id in range(FLAGS.first_shelf_id,
FLAGS.last_shelf_id + 1):
for blade_id in range(FLAGS.blades_per_shelf):
redis.sadd(key, "%s.%s" % (shelf_id, blade_id))
self.associate_with("node", self['node_name'])
@defer.inlineCallbacks
def destroy(self):
try:
yield self._remove_export()
except Exception as ex:
logging.debug("Ingnoring failure to remove export %s" % ex)
pass
yield self._remove_export()
yield self._delete_lv()
self.unassociate_with("node", self['node_name'])
if self.get('shelf_id', None) and self.get('blade_id', None):
redis = datastore.Redis.instance()
key = self.__devices_key
redis.sadd(key, "%s.%s" % (self['shelf_id'], self['blade_id']))
super(Volume, self).destroy()
@defer.inlineCallbacks
@@ -244,66 +243,72 @@ class Volume(datastore.BasicModel):
yield process.simple_execute(
"sudo lvcreate -L %s -n %s %s" % (sizestr,
self['volume_id'],
FLAGS.volume_group))
FLAGS.volume_group),
error_ok=1)
@defer.inlineCallbacks
def _delete_lv(self):
yield process.simple_execute(
"sudo lvremove -f %s/%s" % (FLAGS.volume_group,
self['volume_id']))
self['volume_id']), error_ok=1)
@property
def __devices_key(self):
return 'volume_devices:%s' % FLAGS.node_name
@defer.inlineCallbacks
def _setup_export(self):
(shelf_id, blade_id) = get_next_aoe_numbers()
redis = datastore.Redis.instance()
key = self.__devices_key
device = redis.spop(key)
if not device:
raise NoMoreBlades()
(shelf_id, blade_id) = device.split('.')
self['aoe_device'] = "e%s.%s" % (shelf_id, blade_id)
self['shelf_id'] = shelf_id
self['blade_id'] = blade_id
self.save()
yield self._exec_export()
yield self._exec_setup_export()
@defer.inlineCallbacks
def _exec_export(self):
def _exec_setup_export(self):
yield process.simple_execute(
"sudo vblade-persist setup %s %s %s /dev/%s/%s" %
(self['shelf_id'],
self['blade_id'],
FLAGS.aoe_eth_dev,
FLAGS.volume_group,
self['volume_id']))
self['volume_id']), error_ok=1)
@defer.inlineCallbacks
def _remove_export(self):
if not self.get('shelf_id', None) or not self.get('blade_id', None):
defer.returnValue(False)
yield self._exec_remove_export()
defer.returnValue(True)
@defer.inlineCallbacks
def _exec_remove_export(self):
yield process.simple_execute(
"sudo vblade-persist stop %s %s" % (self['shelf_id'],
self['blade_id']))
self['blade_id']), error_ok=1)
yield process.simple_execute(
"sudo vblade-persist destroy %s %s" % (self['shelf_id'],
self['blade_id']))
self['blade_id']), error_ok=1)
class FakeVolume(Volume):
def _create_lv(self):
pass
def _exec_export(self):
def _exec_setup_export(self):
fname = os.path.join(FLAGS.aoe_export_dir, self['aoe_device'])
f = file(fname, "w")
f.close()
def _remove_export(self):
pass
def _exec_remove_export(self):
os.unlink(os.path.join(FLAGS.aoe_export_dir, self['aoe_device']))
def _delete_lv(self):
pass
def get_next_aoe_numbers():
for shelf_id in xrange(FLAGS.first_shelf_id, FLAGS.last_shelf_id + 1):
aoes = glob.glob("%s/e%s.*" % (FLAGS.aoe_export_dir, shelf_id))
if not aoes:
blade_id = 0
else:
blade_id = int(max([int(a.rpartition('.')[2]) for a in aoes])) + 1
if blade_id < FLAGS.slots_per_shelf:
logging.debug("Next shelf.blade is %s.%s", shelf_id, blade_id)
return (shelf_id, blade_id)
raise NoMoreVolumes()

View File

@@ -29,6 +29,8 @@ import eventlet.wsgi
eventlet.patcher.monkey_patch(all=False, socket=True)
import routes
import routes.middleware
import webob.dec
import webob.exc
logging.getLogger("routes.middleware").addHandler(logging.StreamHandler())
@@ -41,6 +43,8 @@ def run_server(application, port):
class Application(object):
# TODO(gundlach): I think we should toss this class, now that it has no
# purpose.
"""Base WSGI application wrapper. Subclasses need to implement __call__."""
def __call__(self, environ, start_response):
@@ -79,95 +83,158 @@ class Application(object):
raise NotImplementedError("You must implement __call__")
class Middleware(Application): # pylint: disable-msg=W0223
"""Base WSGI middleware wrapper. These classes require an
application to be initialized that will be called next."""
class Middleware(Application): # pylint: disable=W0223
"""
Base WSGI middleware wrapper. These classes require an application to be
initialized that will be called next. By default the middleware will
simply call its wrapped app, or you can override __call__ to customize its
behavior.
"""
def __init__(self, application): # pylint: disable-msg=W0231
def __init__(self, application): # pylint: disable=W0231
self.application = application
@webob.dec.wsgify
def __call__(self, req):
"""Override to implement middleware behavior."""
return self.application
class Debug(Middleware):
"""Helper class that can be insertd into any WSGI application chain
"""Helper class that can be inserted into any WSGI application chain
to get information about the request and response."""
def __call__(self, environ, start_response):
for key, value in environ.items():
@webob.dec.wsgify
def __call__(self, req):
print ("*" * 40) + " REQUEST ENVIRON"
for key, value in req.environ.items():
print key, "=", value
print
wrapper = debug_start_response(start_response)
return debug_print_body(self.application(environ, wrapper))
resp = req.get_response(self.application)
def debug_start_response(start_response):
"""Wrap the start_response to capture when called."""
def wrapper(status, headers, exc_info=None):
"""Print out all headers when start_response is called."""
print status
for (key, value) in headers:
print ("*" * 40) + " RESPONSE HEADERS"
for (key, value) in resp.headers:
print key, "=", value
print
start_response(status, headers, exc_info)
return wrapper
resp.app_iter = self.print_generator(resp.app_iter)
return resp
@staticmethod
def print_generator(app_iter):
"""
Iterator that prints the contents of a wrapper string iterator
when iterated.
"""
print ("*" * 40) + "BODY"
for part in app_iter:
sys.stdout.write(part)
sys.stdout.flush()
yield part
print
def debug_print_body(body):
"""Print the body of the response as it is sent back."""
class Router(object):
"""
WSGI middleware that maps incoming requests to WSGI apps.
"""
class Wrapper(object):
"""Iterate through all the body parts and print before returning."""
def __init__(self, mapper, targets):
"""
Create a router for the given routes.Mapper.
def __iter__(self):
for part in body:
sys.stdout.write(part)
sys.stdout.flush()
yield part
print
Each route in `mapper` must specify a 'controller' string, which is
a key into the 'targets' dictionary whose value is a WSGI app to
run. If routing to a WSGIController, you'll want to specify
'action' as well so the controller knows what method to call on
itself.
return Wrapper()
Examples:
mapper = routes.Mapper()
targets = { "servers": ServerController(), "blog": BlogWsgiApp() }
# Explicit mapping of one route to a controller+action
mapper.connect(None, "/svrlist", controller="servers", action="list")
# Controller string is implicitly equal to 2nd param here, and
# actions are all implicitly defined
mapper.resource("server", "servers")
# Pointing to an arbitrary WSGI app. You can specify the
# {path_info:.*} parameter so the target app can be handed just that
# section of the URL.
mapper.connect(None, "/v1.0/{path_info:.*}", controller="blog")
"""
self.map = mapper
self.targets = targets
self._router = routes.middleware.RoutesMiddleware(self._dispatch,
self.map)
@webob.dec.wsgify
def __call__(self, req):
"""
Route the incoming request to a controller based on self.map.
If no match, return a 404.
"""
return self._router
@webob.dec.wsgify
def _dispatch(self, req):
"""
Called by self._router after matching the incoming request to a route
and putting the information into req.environ. Either returns 404
or the routed WSGI app's response.
"""
if req.environ['routes.route'] is None:
return webob.exc.HTTPNotFound()
match = req.environ['wsgiorg.routing_args'][1]
app_name = match['controller']
app = self.targets[app_name]
return app
class ParsedRoutes(Middleware):
"""Processed parsed routes from routes.middleware.RoutesMiddleware
and call either the controller if found or the default application
otherwise."""
def __call__(self, environ, start_response):
if environ['routes.route'] is None:
return self.application(environ, start_response)
app = environ['wsgiorg.routing_args'][1]['controller']
return app(environ, start_response)
class WSGIController(object):
"""
WSGI app that reads routing information supplied by RoutesMiddleware
and calls the requested action method on itself.
"""
@webob.dec.wsgify
def __call__(self, req):
"""
Call the method on self specified in req.environ by RoutesMiddleware.
"""
routes_dict = req.environ['wsgiorg.routing_args'][1]
action = routes_dict['action']
method = getattr(self, action)
del routes_dict['controller']
del routes_dict['action']
return method(**routes_dict)
class Router(Middleware): # pylint: disable-msg=R0921
"""Wrapper to help setup routes.middleware.RoutesMiddleware."""
class Serializer(object):
"""
Serializes a dictionary to a Content Type specified by a WSGI environment.
"""
def __init__(self, application):
self.map = routes.Mapper()
self._build_map()
application = ParsedRoutes(application)
application = routes.middleware.RoutesMiddleware(application, self.map)
super(Router, self).__init__(application)
def __init__(self, environ):
"""Create a serializer based on the given WSGI environment."""
self.environ = environ
def __call__(self, environ, start_response):
return self.application(environ, start_response)
def _build_map(self):
"""Method to create new connections for the routing map."""
raise NotImplementedError("You must implement _build_map")
def _connect(self, *args, **kwargs):
"""Wrapper for the map.connect method."""
self.map.connect(*args, **kwargs)
def serialize(self, data):
"""
Serialize a dictionary into a string. The format of the string
will be decided based on the Content Type requested in self.environ:
by Accept: header, or by URL suffix.
"""
req = webob.Request(self.environ)
# TODO(gundlach): do XML correctly and be more robust
if req.accept and 'application/json' in req.accept:
import json
return json.dumps(data)
else:
return '<xmlified_yeah_baby>' + repr(data) + \
'</xmlified_yeah_baby>'
def route_args(application):
"""Decorator to make grabbing routing args more convenient."""
def wrapper(self, req):
"""Call application with req and parsed routing args from."""
return application(self, req, req.environ['wsgiorg.routing_args'][1])
return wrapper

2
plugins/xenapi/README Normal file
View File

@@ -0,0 +1,2 @@
This directory contains files that are required for the XenAPI support. They
should be installed in the XenServer / Xen Cloud Platform domain 0.

View File

@@ -0,0 +1,231 @@
#!/usr/bin/env python
# Copyright (c) 2010 Citrix Systems, Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# XenAPI plugin for fetching images from nova-objectstore.
#
import base64
import errno
import hmac
import os
import os.path
import sha
import time
import urlparse
import XenAPIPlugin
from pluginlib_nova import *
configure_logging('objectstore')
KERNEL_DIR = '/boot/guest'
DOWNLOAD_CHUNK_SIZE = 2 * 1024 * 1024
SECTOR_SIZE = 512
MBR_SIZE_SECTORS = 63
MBR_SIZE_BYTES = MBR_SIZE_SECTORS * SECTOR_SIZE
def get_vdi(session, args):
src_url = exists(args, 'src_url')
username = exists(args, 'username')
password = exists(args, 'password')
add_partition = validate_bool(args, 'add_partition', 'false')
(proto, netloc, url_path, _, _, _) = urlparse.urlparse(src_url)
sr = find_sr(session)
if sr is None:
raise Exception('Cannot find SR to write VDI to')
virtual_size = \
get_content_length(proto, netloc, url_path, username, password)
if virtual_size < 0:
raise Exception('Cannot get VDI size')
vdi_size = virtual_size
if add_partition:
# Make room for MBR.
vdi_size += MBR_SIZE_BYTES
vdi = create_vdi(session, sr, src_url, vdi_size, False)
with_vdi_in_dom0(session, vdi, False,
lambda dev: get_vdi_(proto, netloc, url_path,
username, password, add_partition,
virtual_size, '/dev/%s' % dev))
return session.xenapi.VDI.get_uuid(vdi)
def get_vdi_(proto, netloc, url_path, username, password, add_partition,
virtual_size, dest):
if add_partition:
write_partition(virtual_size, dest)
offset = add_partition and MBR_SIZE_BYTES or 0
get(proto, netloc, url_path, username, password, dest, offset)
def write_partition(virtual_size, dest):
mbr_last = MBR_SIZE_SECTORS - 1
primary_first = MBR_SIZE_SECTORS
primary_last = MBR_SIZE_SECTORS + (virtual_size / SECTOR_SIZE) - 1
logging.debug('Writing partition table %d %d to %s...',
primary_first, primary_last, dest)
result = os.system('parted --script %s mklabel msdos' % dest)
if result != 0:
raise Exception('Failed to mklabel')
result = os.system('parted --script %s mkpart primary %ds %ds' %
(dest, primary_first, primary_last))
if result != 0:
raise Exception('Failed to mkpart')
logging.debug('Writing partition table %s done.', dest)
def find_sr(session):
host = get_this_host(session)
srs = session.xenapi.SR.get_all()
for sr in srs:
sr_rec = session.xenapi.SR.get_record(sr)
if not ('i18n-key' in sr_rec['other_config'] and
sr_rec['other_config']['i18n-key'] == 'local-storage'):
continue
for pbd in sr_rec['PBDs']:
pbd_rec = session.xenapi.PBD.get_record(pbd)
if pbd_rec['host'] == host:
return sr
return None
def get_kernel(session, args):
src_url = exists(args, 'src_url')
username = exists(args, 'username')
password = exists(args, 'password')
(proto, netloc, url_path, _, _, _) = urlparse.urlparse(src_url)
dest = os.path.join(KERNEL_DIR, url_path[1:])
# Paranoid check against people using ../ to do rude things.
if os.path.commonprefix([KERNEL_DIR, dest]) != KERNEL_DIR:
raise Exception('Illegal destination %s %s', (url_path, dest))
dirname = os.path.dirname(dest)
try:
os.makedirs(dirname)
except os.error, e:
if e.errno != errno.EEXIST:
raise
if not os.path.isdir(dirname):
raise Exception('Cannot make directory %s', dirname)
try:
os.remove(dest)
except:
pass
get(proto, netloc, url_path, username, password, dest, 0)
return dest
def get_content_length(proto, netloc, url_path, username, password):
headers = make_headers('HEAD', url_path, username, password)
return with_http_connection(
proto, netloc,
lambda conn: get_content_length_(url_path, headers, conn))
def get_content_length_(url_path, headers, conn):
conn.request('HEAD', url_path, None, headers)
response = conn.getresponse()
if response.status != 200:
raise Exception('%d %s' % (response.status, response.reason))
return long(response.getheader('Content-Length', -1))
def get(proto, netloc, url_path, username, password, dest, offset):
headers = make_headers('GET', url_path, username, password)
download(proto, netloc, url_path, headers, dest, offset)
def make_headers(verb, url_path, username, password):
headers = {}
headers['Date'] = \
time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
headers['Authorization'] = \
'AWS %s:%s' % (username,
s3_authorization(verb, url_path, password, headers))
return headers
def s3_authorization(verb, path, password, headers):
sha1 = hmac.new(password, digestmod=sha)
sha1.update(plaintext(verb, path, headers))
return base64.encodestring(sha1.digest()).strip()
def plaintext(verb, path, headers):
return '%s\n\n\n%s\n%s' % (verb,
"\n".join([headers[h] for h in headers]),
path)
def download(proto, netloc, url_path, headers, dest, offset):
with_http_connection(
proto, netloc,
lambda conn: download_(url_path, dest, offset, headers, conn))
def download_(url_path, dest, offset, headers, conn):
conn.request('GET', url_path, None, headers)
response = conn.getresponse()
if response.status != 200:
raise Exception('%d %s' % (response.status, response.reason))
length = response.getheader('Content-Length', -1)
with_file(
dest, 'a',
lambda dest_file: download_all(response, length, dest_file, offset))
def download_all(response, length, dest_file, offset):
dest_file.seek(offset)
i = 0
while True:
buf = response.read(DOWNLOAD_CHUNK_SIZE)
if buf:
dest_file.write(buf)
else:
return
i += len(buf)
if length != -1 and i >= length:
return
if __name__ == '__main__':
XenAPIPlugin.dispatch({'get_vdi': get_vdi,
'get_kernel': get_kernel})

View File

@@ -0,0 +1,216 @@
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Helper functions for the Nova xapi plugins. In time, this will merge
# with the pluginlib.py shipped with xapi, but for now, that file is not
# very stable, so it's easiest just to have a copy of all the functions
# that we need.
#
import httplib
import logging
import logging.handlers
import re
import time
##### Logging setup
def configure_logging(name):
log = logging.getLogger()
log.setLevel(logging.DEBUG)
sysh = logging.handlers.SysLogHandler('/dev/log')
sysh.setLevel(logging.DEBUG)
formatter = logging.Formatter('%s: %%(levelname)-8s %%(message)s' % name)
sysh.setFormatter(formatter)
log.addHandler(sysh)
##### Exceptions
class PluginError(Exception):
"""Base Exception class for all plugin errors."""
def __init__(self, *args):
Exception.__init__(self, *args)
class ArgumentError(PluginError):
"""Raised when required arguments are missing, argument values are invalid,
or incompatible arguments are given.
"""
def __init__(self, *args):
PluginError.__init__(self, *args)
##### Helpers
def ignore_failure(func, *args, **kwargs):
try:
return func(*args, **kwargs)
except XenAPI.Failure, e:
logging.error('Ignoring XenAPI.Failure %s', e)
return None
##### Argument validation
ARGUMENT_PATTERN = re.compile(r'^[a-zA-Z0-9_:\.\-,]+$')
def validate_exists(args, key, default=None):
"""Validates that a string argument to a RPC method call is given, and
matches the shell-safe regex, with an optional default value in case it
does not exist.
Returns the string.
"""
if key in args:
if len(args[key]) == 0:
raise ArgumentError('Argument %r value %r is too short.' % (key, args[key]))
if not ARGUMENT_PATTERN.match(args[key]):
raise ArgumentError('Argument %r value %r contains invalid characters.' % (key, args[key]))
if args[key][0] == '-':
raise ArgumentError('Argument %r value %r starts with a hyphen.' % (key, args[key]))
return args[key]
elif default is not None:
return default
else:
raise ArgumentError('Argument %s is required.' % key)
def validate_bool(args, key, default=None):
"""Validates that a string argument to a RPC method call is a boolean string,
with an optional default value in case it does not exist.
Returns the python boolean value.
"""
value = validate_exists(args, key, default)
if value.lower() == 'true':
return True
elif value.lower() == 'false':
return False
else:
raise ArgumentError("Argument %s may not take value %r. Valid values are ['true', 'false']." % (key, value))
def exists(args, key):
"""Validates that a freeform string argument to a RPC method call is given.
Returns the string.
"""
if key in args:
return args[key]
else:
raise ArgumentError('Argument %s is required.' % key)
def optional(args, key):
"""If the given key is in args, return the corresponding value, otherwise
return None"""
return key in args and args[key] or None
def get_this_host(session):
return session.xenapi.session.get_this_host(session.handle)
def get_domain_0(session):
this_host_ref = get_this_host(session)
expr = 'field "is_control_domain" = "true" and field "resident_on" = "%s"' % this_host_ref
return session.xenapi.VM.get_all_records_where(expr).keys()[0]
def create_vdi(session, sr_ref, name_label, virtual_size, read_only):
vdi_ref = session.xenapi.VDI.create(
{ 'name_label': name_label,
'name_description': '',
'SR': sr_ref,
'virtual_size': str(virtual_size),
'type': 'User',
'sharable': False,
'read_only': read_only,
'xenstore_data': {},
'other_config': {},
'sm_config': {},
'tags': [] })
logging.debug('Created VDI %s (%s, %s, %s) on %s.', vdi_ref, name_label,
virtual_size, read_only, sr_ref)
return vdi_ref
def with_vdi_in_dom0(session, vdi, read_only, f):
dom0 = get_domain_0(session)
vbd_rec = {}
vbd_rec['VM'] = dom0
vbd_rec['VDI'] = vdi
vbd_rec['userdevice'] = 'autodetect'
vbd_rec['bootable'] = False
vbd_rec['mode'] = read_only and 'RO' or 'RW'
vbd_rec['type'] = 'disk'
vbd_rec['unpluggable'] = True
vbd_rec['empty'] = False
vbd_rec['other_config'] = {}
vbd_rec['qos_algorithm_type'] = ''
vbd_rec['qos_algorithm_params'] = {}
vbd_rec['qos_supported_algorithms'] = []
logging.debug('Creating VBD for VDI %s ... ', vdi)
vbd = session.xenapi.VBD.create(vbd_rec)
logging.debug('Creating VBD for VDI %s done.', vdi)
try:
logging.debug('Plugging VBD %s ... ', vbd)
session.xenapi.VBD.plug(vbd)
logging.debug('Plugging VBD %s done.', vbd)
return f(session.xenapi.VBD.get_device(vbd))
finally:
logging.debug('Destroying VBD for VDI %s ... ', vdi)
vbd_unplug_with_retry(session, vbd)
ignore_failure(session.xenapi.VBD.destroy, vbd)
logging.debug('Destroying VBD for VDI %s done.', vdi)
def vbd_unplug_with_retry(session, vbd):
"""Call VBD.unplug on the given VBD, with a retry if we get
DEVICE_DETACH_REJECTED. For reasons which I don't understand, we're
seeing the device still in use, even when all processes using the device
should be dead."""
while True:
try:
session.xenapi.VBD.unplug(vbd)
logging.debug('VBD.unplug successful first time.')
return
except XenAPI.Failure, e:
if (len(e.details) > 0 and
e.details[0] == 'DEVICE_DETACH_REJECTED'):
logging.debug('VBD.unplug rejected: retrying...')
time.sleep(1)
elif (len(e.details) > 0 and
e.details[0] == 'DEVICE_ALREADY_DETACHED'):
logging.debug('VBD.unplug successful eventually.')
return
else:
logging.error('Ignoring XenAPI.Failure in VBD.unplug: %s', e)
return
def with_http_connection(proto, netloc, f):
conn = (proto == 'https' and
httplib.HTTPSConnection(netloc) or
httplib.HTTPConnection(netloc))
try:
return f(conn)
finally:
conn.close()
def with_file(dest_path, mode, f):
dest = open(dest_path, mode)
try:
return f(dest)
finally:
dest.close()

View File

@@ -1,5 +1,9 @@
[Messages Control]
disable-msg=C0103
disable=C0103
# TODOs in code comments are fine...
disable=W0511
# *args and **kwargs are fine
disable=W0142
[Basic]
# Variables can be 1 to 31 characters long, with
@@ -10,10 +14,6 @@ variable-rgx=[a-z_][a-z0-9_]{0,30}$
# and be lowecased with underscores
method-rgx=[a-z_][a-z0-9_]{2,50}$
[MESSAGES CONTROL]
# TODOs in code comments are fine...
disable-msg=W0511
[Design]
max-public-methods=100
min-public-methods=0

View File

@@ -6,8 +6,7 @@ with_venv=tools/with_venv.sh
if [ -e ${venv} ]; then
${with_venv} python run_tests.py $@
else
echo "You need to install the Nova virtualenv before you can run this."
echo ""
echo "Please run tools/install_venv.py"
exit 1
echo "No virtual environment found...creating one"
python tools/install_venv.py
${with_venv} python run_tests.py $@
fi

View File

@@ -1,3 +1,23 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Copyright 2010 OpenStack, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Installation script for Nova's development virtualenv
"""
@@ -12,15 +32,15 @@ VENV = os.path.join(ROOT, '.nova-venv')
PIP_REQUIRES = os.path.join(ROOT, 'tools', 'pip-requires')
TWISTED_NOVA='http://nova.openstack.org/Twisted-10.0.0Nova.tar.gz'
def die(message, *args):
print >>sys.stderr, message % args
sys.exit(1)
def run_command(cmd, redirect_output=True, error_ok=False):
# Useful for debugging:
#print >>sys.stderr, ' '.join(cmd)
"""Runs a command in an out-of-process shell, returning the
output of that command
"""
if redirect_output:
stdout = subprocess.PIPE
else:
@@ -33,32 +53,44 @@ def run_command(cmd, redirect_output=True, error_ok=False):
return output
HAS_EASY_INSTALL = bool(run_command(['which', 'easy_install']).strip())
HAS_VIRTUALENV = bool(run_command(['which', 'virtualenv']).strip())
def check_dependencies():
"""Make sure pip and virtualenv are on the path."""
print 'Checking for pip...',
if not run_command(['which', 'pip']).strip():
die('ERROR: pip not found.\n\nNova development requires pip,'
' please install it using your favorite package management tool')
print 'done.'
"""Make sure virtualenv is in the path."""
print 'Checking for virtualenv...',
if not run_command(['which', 'virtualenv']).strip():
die('ERROR: virtualenv not found.\n\nNova development requires virtualenv,'
' please install it using your favorite package management tool')
if not HAS_VIRTUALENV:
print 'not found.'
# Try installing it via easy_install...
if HAS_EASY_INSTALL:
print 'Installing virtualenv via easy_install...',
if not run_command(['which', 'easy_install']):
die('ERROR: virtualenv not found.\n\nNova development requires virtualenv,'
' please install it using your favorite package management tool')
print 'done.'
print 'done.'
def create_virtualenv(venv=VENV):
"""Creates the virtual environment and installs PIP only into the
virtual environment
"""
print 'Creating venv...',
run_command(['virtualenv', '-q', '--no-site-packages', VENV])
print 'done.'
print 'Installing pip in virtualenv...',
if not run_command(['tools/with_venv.sh', 'easy_install', 'pip']).strip():
die("Failed to install pip.")
print 'done.'
def install_dependencies(venv=VENV):
print 'Installing dependencies with pip (this can take a while)...'
run_command(['pip', 'install', '-E', venv, '-r', PIP_REQUIRES],
run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv, '-r', PIP_REQUIRES],
redirect_output=False)
run_command(['pip', 'install', '-E', venv, TWISTED_NOVA],
run_command(['tools/with_venv.sh', 'pip', 'install', '-E', venv, TWISTED_NOVA],
redirect_output=False)

View File

@@ -1,14 +1,19 @@
pep8==0.5.0
pylint==0.21.1
IPy==0.70
M2Crypto==0.20.2
amqplib==0.6.1
anyjson==0.2.4
boto==2.0b1
carrot==0.10.5
eventlet==0.9.10
lockfile==0.8
python-daemon==1.5.5
python-gflags==1.3
redis==2.0.0
routes==1.12.3
tornado==1.0
webob==0.9.8
wsgiref==0.1.2
zope.interface==3.6.1
mox==0.5.0