merge trunk

This commit is contained in:
Salvatore Orlando
2011-03-11 11:44:48 +00:00
27 changed files with 1228 additions and 190 deletions

View File

@@ -36,51 +36,15 @@ gettext.install('nova', unicode=1)
from nova import flags from nova import flags
from nova import log as logging from nova import log as logging
from nova import service
from nova import utils from nova import utils
from nova import version from nova import version
from nova import wsgi from nova import wsgi
LOG = logging.getLogger('nova.api') LOG = logging.getLogger('nova.api')
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
flags.DEFINE_string('paste_config', "api-paste.ini",
'File name for the paste.deploy config for nova-api')
flags.DEFINE_string('ec2_listen', "0.0.0.0",
'IP address for EC2 API to listen')
flags.DEFINE_integer('ec2_listen_port', 8773, 'port for ec2 api to listen')
flags.DEFINE_string('osapi_listen', "0.0.0.0",
'IP address for OpenStack API to listen')
flags.DEFINE_integer('osapi_listen_port', 8774, 'port for os api to listen')
flags.DEFINE_flag(flags.HelpFlag())
flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag())
API_ENDPOINTS = ['ec2', 'osapi']
def run_app(paste_config_file):
LOG.debug(_("Using paste.deploy config at: %s"), paste_config_file)
apps = []
for api in API_ENDPOINTS:
config = wsgi.load_paste_configuration(paste_config_file, api)
if config is None:
LOG.debug(_("No paste configuration for app: %s"), api)
continue
LOG.debug(_("App Config: %(api)s\n%(config)r") % locals())
LOG.info(_("Running %s API"), api)
app = wsgi.load_paste_app(paste_config_file, api)
apps.append((app, getattr(FLAGS, "%s_listen_port" % api),
getattr(FLAGS, "%s_listen" % api)))
if len(apps) == 0:
LOG.error(_("No known API applications configured in %s."),
paste_config_file)
return
server = wsgi.Server()
for app in apps:
server.start(*app)
server.wait()
if __name__ == '__main__': if __name__ == '__main__':
utils.default_flagfile() utils.default_flagfile()
@@ -92,9 +56,6 @@ if __name__ == '__main__':
for flag in FLAGS: for flag in FLAGS:
flag_get = FLAGS.get(flag, None) flag_get = FLAGS.get(flag, None)
LOG.debug("%(flag)s : %(flag_get)s" % locals()) LOG.debug("%(flag)s : %(flag_get)s" % locals())
conf = wsgi.paste_config_file(FLAGS.paste_config)
if conf: service = service.serve_wsgi(service.ApiService)
run_app(conf) service.wait()
else:
LOG.error(_("No paste configuration found for: %s"),
FLAGS.paste_config)

View File

@@ -55,6 +55,8 @@
import datetime import datetime
import gettext import gettext
import glob
import json
import os import os
import re import re
import sys import sys
@@ -81,7 +83,7 @@ from nova import log as logging
from nova import quota from nova import quota
from nova import rpc from nova import rpc
from nova import utils from nova import utils
from nova.api.ec2.cloud import ec2_id_to_id from nova.api.ec2 import ec2utils
from nova.auth import manager from nova.auth import manager
from nova.cloudpipe import pipelib from nova.cloudpipe import pipelib
from nova.compute import instance_types from nova.compute import instance_types
@@ -94,6 +96,7 @@ flags.DECLARE('network_size', 'nova.network.manager')
flags.DECLARE('vlan_start', 'nova.network.manager') flags.DECLARE('vlan_start', 'nova.network.manager')
flags.DECLARE('vpn_start', 'nova.network.manager') flags.DECLARE('vpn_start', 'nova.network.manager')
flags.DECLARE('fixed_range_v6', 'nova.network.manager') flags.DECLARE('fixed_range_v6', 'nova.network.manager')
flags.DECLARE('images_path', 'nova.image.local')
flags.DEFINE_flag(flags.HelpFlag()) flags.DEFINE_flag(flags.HelpFlag())
flags.DEFINE_flag(flags.HelpshortFlag()) flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag()) flags.DEFINE_flag(flags.HelpXMLFlag())
@@ -104,7 +107,7 @@ def param2id(object_id):
args: [object_id], e.g. 'vol-0000000a' or 'volume-0000000a' or '10' args: [object_id], e.g. 'vol-0000000a' or 'volume-0000000a' or '10'
""" """
if '-' in object_id: if '-' in object_id:
return ec2_id_to_id(object_id) return ec2utils.ec2_id_to_id(object_id)
else: else:
return int(object_id) return int(object_id)
@@ -545,6 +548,15 @@ class NetworkCommands(object):
network.dhcp_start, network.dhcp_start,
network.dns) network.dns)
def delete(self, fixed_range):
"""Deletes a network"""
network = db.network_get_by_cidr(context.get_admin_context(), \
fixed_range)
if network.project_id is not None:
raise ValueError(_('Network must be disassociated from project %s'
' before delete' % network.project_id))
db.network_delete_safe(context.get_admin_context(), network.id)
class ServiceCommands(object): class ServiceCommands(object):
"""Enable and disable running services""" """Enable and disable running services"""
@@ -735,6 +747,155 @@ class InstanceTypeCommands(object):
self._print_instance_types(name, inst_types) self._print_instance_types(name, inst_types)
class ImageCommands(object):
"""Methods for dealing with a cloud in an odd state"""
def __init__(self, *args, **kwargs):
self.image_service = utils.import_object(FLAGS.image_service)
def _register(self, image_type, disk_format, container_format,
path, owner, name=None, is_public='T',
architecture='x86_64', kernel_id=None, ramdisk_id=None):
meta = {'is_public': True,
'name': name,
'disk_format': disk_format,
'container_format': container_format,
'properties': {'image_state': 'available',
'owner': owner,
'type': image_type,
'architecture': architecture,
'image_location': 'local',
'is_public': (is_public == 'T')}}
print image_type, meta
if kernel_id:
meta['properties']['kernel_id'] = int(kernel_id)
if ramdisk_id:
meta['properties']['ramdisk_id'] = int(ramdisk_id)
elevated = context.get_admin_context()
try:
with open(path) as ifile:
image = self.image_service.create(elevated, meta, ifile)
new = image['id']
print _("Image registered to %(new)s (%(new)08x).") % locals()
return new
except Exception as exc:
print _("Failed to register %(path)s: %(exc)s") % locals()
def all_register(self, image, kernel, ramdisk, owner, name=None,
is_public='T', architecture='x86_64'):
"""Uploads an image, kernel, and ramdisk into the image_service
arguments: image kernel ramdisk owner [name] [is_public='T']
[architecture='x86_64']"""
kernel_id = self.kernel_register(kernel, owner, None,
is_public, architecture)
ramdisk_id = self.ramdisk_register(ramdisk, owner, None,
is_public, architecture)
self.image_register(image, owner, name, is_public,
architecture, kernel_id, ramdisk_id)
def image_register(self, path, owner, name=None, is_public='T',
architecture='x86_64', kernel_id=None, ramdisk_id=None,
disk_format='ami', container_format='ami'):
"""Uploads an image into the image_service
arguments: path owner [name] [is_public='T'] [architecture='x86_64']
[kernel_id=None] [ramdisk_id=None]
[disk_format='ami'] [container_format='ami']"""
return self._register('machine', disk_format, container_format, path,
owner, name, is_public, architecture,
kernel_id, ramdisk_id)
def kernel_register(self, path, owner, name=None, is_public='T',
architecture='x86_64'):
"""Uploads a kernel into the image_service
arguments: path owner [name] [is_public='T'] [architecture='x86_64']
"""
return self._register('kernel', 'aki', 'aki', path, owner, name,
is_public, architecture)
def ramdisk_register(self, path, owner, name=None, is_public='T',
architecture='x86_64'):
"""Uploads a ramdisk into the image_service
arguments: path owner [name] [is_public='T'] [architecture='x86_64']
"""
return self._register('ramdisk', 'ari', 'ari', path, owner, name,
is_public, architecture)
def _lookup(self, old_image_id):
try:
internal_id = ec2utils.ec2_id_to_id(old_image_id)
image = self.image_service.show(context, internal_id)
except exception.NotFound:
image = self.image_service.show_by_name(context, old_image_id)
return image['id']
def _old_to_new(self, old):
mapping = {'machine': 'ami',
'kernel': 'aki',
'ramdisk': 'ari'}
container_format = mapping[old['type']]
disk_format = container_format
new = {'disk_format': disk_format,
'container_format': container_format,
'is_public': True,
'name': old['imageId'],
'properties': {'image_state': old['imageState'],
'owner': old['imageOwnerId'],
'architecture': old['architecture'],
'type': old['type'],
'image_location': old['imageLocation'],
'is_public': old['isPublic']}}
if old.get('kernelId'):
new['properties']['kernel_id'] = self._lookup(old['kernelId'])
if old.get('ramdiskId'):
new['properties']['ramdisk_id'] = self._lookup(old['ramdiskId'])
return new
def _convert_images(self, images):
elevated = context.get_admin_context()
for image_path, image_metadata in images.iteritems():
meta = self._old_to_new(image_metadata)
old = meta['name']
try:
with open(image_path) as ifile:
image = self.image_service.create(elevated, meta, ifile)
new = image['id']
print _("Image %(old)s converted to " \
"%(new)s (%(new)08x).") % locals()
except Exception as exc:
print _("Failed to convert %(old)s: %(exc)s") % locals()
def convert(self, directory):
"""Uploads old objectstore images in directory to new service
arguments: directory"""
machine_images = {}
other_images = {}
directory = os.path.abspath(directory)
# NOTE(vish): If we're importing from the images path dir, attempt
# to move the files out of the way before importing
# so we aren't writing to the same directory. This
# may fail if the dir was a mointpoint.
if (FLAGS.image_service == 'nova.image.local.LocalImageService'
and directory == os.path.abspath(FLAGS.images_path)):
new_dir = "%s_bak" % directory
os.move(directory, new_dir)
os.mkdir(directory)
directory = new_dir
for fn in glob.glob("%s/*/info.json" % directory):
try:
image_path = os.path.join(fn.rpartition('/')[0], 'image')
with open(fn) as metadata_file:
image_metadata = json.load(metadata_file)
if image_metadata['type'] == 'machine':
machine_images[image_path] = image_metadata
else:
other_images[image_path] = image_metadata
except Exception as exc:
print _("Failed to load %(fn)s.") % locals()
# NOTE(vish): do kernels and ramdisks first so images
self._convert_images(other_images)
self._convert_images(machine_images)
CATEGORIES = [ CATEGORIES = [
('user', UserCommands), ('user', UserCommands),
('project', ProjectCommands), ('project', ProjectCommands),
@@ -749,6 +910,7 @@ CATEGORIES = [
('db', DbCommands), ('db', DbCommands),
('volume', VolumeCommands), ('volume', VolumeCommands),
('instance_type', InstanceTypeCommands), ('instance_type', InstanceTypeCommands),
('image', ImageCommands),
('flavor', InstanceTypeCommands)] ('flavor', InstanceTypeCommands)]

View File

@@ -8,5 +8,6 @@ from nova import utils
def setup(app): def setup(app):
rootdir = os.path.abspath(app.srcdir + '/..') rootdir = os.path.abspath(app.srcdir + '/..')
print "**Autodocumenting from %s" % rootdir print "**Autodocumenting from %s" % rootdir
rv = utils.execute('cd %s && ./generate_autodoc_index.sh' % rootdir) os.chdir(rootdir)
rv = utils.execute('./generate_autodoc_index.sh')
print rv[0] print rv[0]

View File

@@ -173,7 +173,10 @@ Nova Floating IPs
``nova-manage floating create <host> <ip_range>`` ``nova-manage floating create <host> <ip_range>``
Creates floating IP addresses for the named host by the given range. Creates floating IP addresses for the named host by the given range.
floating delete <ip_range> Deletes floating IP addresses in the range given.
``nova-manage floating delete <ip_range>``
Deletes floating IP addresses in the range given.
``nova-manage floating list`` ``nova-manage floating list``
@@ -193,7 +196,7 @@ Nova Flavor
``nova-manage flavor create <name> <memory> <vCPU> <local_storage> <flavorID> <(optional) swap> <(optional) RXTX Quota> <(optional) RXTX Cap>`` ``nova-manage flavor create <name> <memory> <vCPU> <local_storage> <flavorID> <(optional) swap> <(optional) RXTX Quota> <(optional) RXTX Cap>``
creates a flavor with the following positional arguments: creates a flavor with the following positional arguments:
* memory (expressed in megabytes) * memory (expressed in megabytes)
* vcpu(s) (integer) * vcpu(s) (integer)
* local storage (expressed in gigabytes) * local storage (expressed in gigabytes)
* flavorid (unique integer) * flavorid (unique integer)
@@ -209,12 +212,33 @@ Nova Flavor
Purges the flavor with the name <name>. This removes this flavor from the database. Purges the flavor with the name <name>. This removes this flavor from the database.
Nova Instance_type Nova Instance_type
~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~
The instance_type command is provided as an alias for the flavor command. All the same subcommands and arguments from nova-manage flavor can be used. The instance_type command is provided as an alias for the flavor command. All the same subcommands and arguments from nova-manage flavor can be used.
Nova Images
~~~~~~~~~~~
``nova-manage image image_register <path> <owner>``
Registers an image with the image service.
``nova-manage image kernel_register <path> <owner>``
Registers a kernel with the image service.
``nova-manage image ramdisk_register <path> <owner>``
Registers a ramdisk with the image service.
``nova-manage image all_register <image_path> <kernel_path> <ramdisk_path> <owner>``
Registers an image kernel and ramdisk with the image service.
``nova-manage image convert <directory>``
Converts all images in directory from the old (Bexar) format to the new format.
FILES FILES
======== ========

View File

@@ -182,6 +182,29 @@ Nova Floating IPs
Displays a list of all floating IP addresses. Displays a list of all floating IP addresses.
Nova Images
~~~~~~~~~~~
``nova-manage image image_register <path> <owner>``
Registers an image with the image service.
``nova-manage image kernel_register <path> <owner>``
Registers a kernel with the image service.
``nova-manage image ramdisk_register <path> <owner>``
Registers a ramdisk with the image service.
``nova-manage image all_register <image_path> <kernel_path> <ramdisk_path> <owner>``
Registers an image kernel and ramdisk with the image service.
``nova-manage image convert <directory>``
Converts all images in directory from the old (Bexar) format to the new format.
Concept: Flags Concept: Flags
-------------- --------------

View File

@@ -88,6 +88,10 @@ class InvalidInputException(Error):
pass pass
class InvalidContentType(Error):
pass
class TimeoutException(Error): class TimeoutException(Error):
pass pass

View File

@@ -48,7 +48,6 @@ class Exchange(object):
nm = self.name nm = self.name
LOG.debug(_('(%(nm)s) publish (key: %(routing_key)s)' LOG.debug(_('(%(nm)s) publish (key: %(routing_key)s)'
' %(message)s') % locals()) ' %(message)s') % locals())
routing_key = routing_key.split('.')[0]
if routing_key in self._routes: if routing_key in self._routes:
for f in self._routes[routing_key]: for f in self._routes[routing_key]:
LOG.debug(_('Publishing to route %s'), f) LOG.debug(_('Publishing to route %s'), f)

View File

@@ -321,6 +321,8 @@ DEFINE_integer('auth_token_ttl', 3600, 'Seconds for auth tokens to linger')
DEFINE_string('state_path', os.path.join(os.path.dirname(__file__), '../'), DEFINE_string('state_path', os.path.join(os.path.dirname(__file__), '../'),
"Top-level directory for maintaining nova's state") "Top-level directory for maintaining nova's state")
DEFINE_string('lock_path', os.path.join(os.path.dirname(__file__), '../'),
"Directory for lock files")
DEFINE_string('logdir', None, 'output to a per-service log file in named ' DEFINE_string('logdir', None, 'output to a per-service log file in named '
'directory') 'directory')
@@ -346,7 +348,7 @@ DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager',
'Manager for scheduler') 'Manager for scheduler')
# The service to use for image search and retrieval # The service to use for image search and retrieval
DEFINE_string('image_service', 'nova.image.s3.S3ImageService', DEFINE_string('image_service', 'nova.image.local.LocalImageService',
'The service to use for retrieving and searching for images.') 'The service to use for retrieving and searching for images.')
DEFINE_string('host', socket.gethostname(), DEFINE_string('host', socket.gethostname(),
@@ -354,3 +356,7 @@ DEFINE_string('host', socket.gethostname(),
DEFINE_string('node_availability_zone', 'nova', DEFINE_string('node_availability_zone', 'nova',
'availability zone of this node') 'availability zone of this node')
DEFINE_string('zone_name', 'nova', 'name of this zone')
DEFINE_string('zone_capabilities', 'kypervisor:xenserver;os:linux',
'Key/Value tags which represent capabilities of this zone')

49
nova/scheduler/api.py Normal file
View File

@@ -0,0 +1,49 @@
# Copyright (c) 2011 Openstack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all requests relating to schedulers.
"""
from nova import flags
from nova import log as logging
from nova import rpc
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.scheduler.api')
class API(object):
"""API for interacting with the scheduler."""
def _call_scheduler(self, method, context, params=None):
"""Generic handler for RPC calls to the scheduler.
:param params: Optional dictionary of arguments to be passed to the
scheduler worker
:retval: Result returned by scheduler worker
"""
if not params:
params = {}
queue = FLAGS.scheduler_topic
kwargs = {'method': method, 'args': params}
return rpc.call(context, queue, kwargs)
def get_zone_list(self, context):
items = self._call_scheduler('get_zone_list', context)
for item in items:
item['api_url'] = item['api_url'].replace('\\/', '/')
return items

View File

@@ -29,6 +29,7 @@ from nova import log as logging
from nova import manager from nova import manager
from nova import rpc from nova import rpc
from nova import utils from nova import utils
from nova.scheduler import zone_manager
LOG = logging.getLogger('nova.scheduler.manager') LOG = logging.getLogger('nova.scheduler.manager')
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
@@ -43,12 +44,21 @@ class SchedulerManager(manager.Manager):
if not scheduler_driver: if not scheduler_driver:
scheduler_driver = FLAGS.scheduler_driver scheduler_driver = FLAGS.scheduler_driver
self.driver = utils.import_object(scheduler_driver) self.driver = utils.import_object(scheduler_driver)
self.zone_manager = zone_manager.ZoneManager()
super(SchedulerManager, self).__init__(*args, **kwargs) super(SchedulerManager, self).__init__(*args, **kwargs)
def __getattr__(self, key): def __getattr__(self, key):
"""Converts all method calls to use the schedule method""" """Converts all method calls to use the schedule method"""
return functools.partial(self._schedule, key) return functools.partial(self._schedule, key)
def periodic_tasks(self, context=None):
"""Poll child zones periodically to get status."""
self.zone_manager.ping(context)
def get_zone_list(self, context=None):
"""Get a list of zones from the ZoneManager."""
return self.zone_manager.get_zone_list()
def _schedule(self, method, context, topic, *args, **kwargs): def _schedule(self, method, context, topic, *args, **kwargs):
"""Tries to call schedule_* method on the driver to retrieve host. """Tries to call schedule_* method on the driver to retrieve host.

View File

@@ -0,0 +1,143 @@
# Copyright (c) 2011 Openstack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
ZoneManager oversees all communications with child Zones.
"""
import novaclient
import thread
import traceback
from datetime import datetime
from eventlet import greenpool
from nova import db
from nova import flags
from nova import log as logging
FLAGS = flags.FLAGS
flags.DEFINE_integer('zone_db_check_interval', 60,
'Seconds between getting fresh zone info from db.')
flags.DEFINE_integer('zone_failures_to_offline', 3,
'Number of consecutive errors before marking zone offline')
class ZoneState(object):
"""Holds the state of all connected child zones."""
def __init__(self):
self.is_active = True
self.name = None
self.capabilities = None
self.attempt = 0
self.last_seen = datetime.min
self.last_exception = None
self.last_exception_time = None
def update_credentials(self, zone):
"""Update zone credentials from db"""
self.zone_id = zone.id
self.api_url = zone.api_url
self.username = zone.username
self.password = zone.password
def update_metadata(self, zone_metadata):
"""Update zone metadata after successful communications with
child zone."""
self.last_seen = datetime.now()
self.attempt = 0
self.name = zone_metadata["name"]
self.capabilities = zone_metadata["capabilities"]
self.is_active = True
def to_dict(self):
return dict(name=self.name, capabilities=self.capabilities,
is_active=self.is_active, api_url=self.api_url,
id=self.zone_id)
def log_error(self, exception):
"""Something went wrong. Check to see if zone should be
marked as offline."""
self.last_exception = exception
self.last_exception_time = datetime.now()
api_url = self.api_url
logging.warning(_("'%(exception)s' error talking to "
"zone %(api_url)s") % locals())
max_errors = FLAGS.zone_failures_to_offline
self.attempt += 1
if self.attempt >= max_errors:
self.is_active = False
logging.error(_("No answer from zone %(api_url)s "
"after %(max_errors)d "
"attempts. Marking inactive.") % locals())
def _call_novaclient(zone):
"""Call novaclient. Broken out for testing purposes."""
client = novaclient.OpenStack(zone.username, zone.password, zone.api_url)
return client.zones.info()._info
def _poll_zone(zone):
"""Eventlet worker to poll a zone."""
logging.debug(_("Polling zone: %s") % zone.api_url)
try:
zone.update_metadata(_call_novaclient(zone))
except Exception, e:
zone.log_error(traceback.format_exc())
class ZoneManager(object):
"""Keeps the zone states updated."""
def __init__(self):
self.last_zone_db_check = datetime.min
self.zone_states = {}
self.green_pool = greenpool.GreenPool()
def get_zone_list(self):
"""Return the list of zones we know about."""
return [zone.to_dict() for zone in self.zone_states.values()]
def _refresh_from_db(self, context):
"""Make our zone state map match the db."""
# Add/update existing zones ...
zones = db.zone_get_all(context)
existing = self.zone_states.keys()
db_keys = []
for zone in zones:
db_keys.append(zone.id)
if zone.id not in existing:
self.zone_states[zone.id] = ZoneState()
self.zone_states[zone.id].update_credentials(zone)
# Cleanup zones removed from db ...
keys = self.zone_states.keys() # since we're deleting
for zone_id in keys:
if zone_id not in db_keys:
del self.zone_states[zone_id]
def _poll_zones(self, context):
"""Try to connect to each child zone and get update."""
self.green_pool.imap(_poll_zone, self.zone_states.values())
def ping(self, context=None):
"""Ping should be called periodically to update zone status."""
diff = datetime.now() - self.last_zone_db_check
if diff.seconds >= FLAGS.zone_db_check_interval:
logging.debug(_("Updating zone cache from db."))
self.last_zone_db_check = datetime.now()
self._refresh_from_db(context)
self._poll_zones(context)

View File

@@ -2,6 +2,7 @@
# Copyright 2010 United States Government as represented by the # Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration. # Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -39,6 +40,7 @@ from nova import flags
from nova import rpc from nova import rpc
from nova import utils from nova import utils
from nova import version from nova import version
from nova import wsgi
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
@@ -48,6 +50,14 @@ flags.DEFINE_integer('report_interval', 10,
flags.DEFINE_integer('periodic_interval', 60, flags.DEFINE_integer('periodic_interval', 60,
'seconds between running periodic tasks', 'seconds between running periodic tasks',
lower_bound=1) lower_bound=1)
flags.DEFINE_string('ec2_listen', "0.0.0.0",
'IP address for EC2 API to listen')
flags.DEFINE_integer('ec2_listen_port', 8773, 'port for ec2 api to listen')
flags.DEFINE_string('osapi_listen', "0.0.0.0",
'IP address for OpenStack API to listen')
flags.DEFINE_integer('osapi_listen_port', 8774, 'port for os api to listen')
flags.DEFINE_string('api_paste_config', "api-paste.ini",
'File name for the paste.deploy config for nova-api')
class Service(object): class Service(object):
@@ -210,6 +220,41 @@ class Service(object):
logging.exception(_("model server went away")) logging.exception(_("model server went away"))
class WsgiService(object):
"""Base class for WSGI based services.
For each api you define, you must also define these flags:
:<api>_listen: The address on which to listen
:<api>_listen_port: The port on which to listen
"""
def __init__(self, conf, apis):
self.conf = conf
self.apis = apis
self.wsgi_app = None
def start(self):
self.wsgi_app = _run_wsgi(self.conf, self.apis)
def wait(self):
self.wsgi_app.wait()
class ApiService(WsgiService):
"""Class for our nova-api service"""
@classmethod
def create(cls, conf=None):
if not conf:
conf = wsgi.paste_config_file(FLAGS.api_paste_config)
if not conf:
message = (_("No paste configuration found for: %s"),
FLAGS.api_paste_config)
raise exception.Error(message)
api_endpoints = ['ec2', 'osapi']
service = cls(conf, api_endpoints)
return service
def serve(*services): def serve(*services):
try: try:
if not services: if not services:
@@ -239,3 +284,46 @@ def serve(*services):
def wait(): def wait():
while True: while True:
greenthread.sleep(5) greenthread.sleep(5)
def serve_wsgi(cls, conf=None):
try:
service = cls.create(conf)
except Exception:
logging.exception('in WsgiService.create()')
raise
finally:
# After we've loaded up all our dynamic bits, check
# whether we should print help
flags.DEFINE_flag(flags.HelpFlag())
flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag())
FLAGS.ParseNewFlags()
service.start()
return service
def _run_wsgi(paste_config_file, apis):
logging.debug(_("Using paste.deploy config at: %s"), paste_config_file)
apps = []
for api in apis:
config = wsgi.load_paste_configuration(paste_config_file, api)
if config is None:
logging.debug(_("No paste configuration for app: %s"), api)
continue
logging.debug(_("App Config: %(api)s\n%(config)r") % locals())
logging.info(_("Running %s API"), api)
app = wsgi.load_paste_app(paste_config_file, api)
apps.append((app, getattr(FLAGS, "%s_listen_port" % api),
getattr(FLAGS, "%s_listen" % api)))
if len(apps) == 0:
logging.error(_("No known API applications configured in %s."),
paste_config_file)
return
server = wsgi.Server()
for app in apps:
server.start(*app)
return server

View File

@@ -32,6 +32,7 @@ flags.DECLARE('fake_network', 'nova.network.manager')
FLAGS.network_size = 8 FLAGS.network_size = 8
FLAGS.num_networks = 2 FLAGS.num_networks = 2
FLAGS.fake_network = True FLAGS.fake_network = True
FLAGS.image_service = 'nova.image.local.LocalImageService'
flags.DECLARE('num_shelves', 'nova.volume.driver') flags.DECLARE('num_shelves', 'nova.volume.driver')
flags.DECLARE('blades_per_shelf', 'nova.volume.driver') flags.DECLARE('blades_per_shelf', 'nova.volume.driver')
flags.DECLARE('iscsi_num_targets', 'nova.volume.driver') flags.DECLARE('iscsi_num_targets', 'nova.volume.driver')

View File

@@ -38,6 +38,8 @@ from nova import test
from nova.auth import manager from nova.auth import manager
from nova.compute import power_state from nova.compute import power_state
from nova.api.ec2 import cloud from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.image import local
from nova.objectstore import image from nova.objectstore import image
@@ -76,6 +78,12 @@ class CloudTestCase(test.TestCase):
project=self.project) project=self.project)
host = self.network.get_network_host(self.context.elevated()) host = self.network.get_network_host(self.context.elevated())
def fake_show(meh, context, id):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1}}
self.stubs.Set(local.LocalImageService, 'show', fake_show)
self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)
def tearDown(self): def tearDown(self):
network_ref = db.project_get_network(self.context, network_ref = db.project_get_network(self.context,
self.project.id) self.project.id)
@@ -122,7 +130,7 @@ class CloudTestCase(test.TestCase):
self.cloud.allocate_address(self.context) self.cloud.allocate_address(self.context)
inst = db.instance_create(self.context, {'host': self.compute.host}) inst = db.instance_create(self.context, {'host': self.compute.host})
fixed = self.network.allocate_fixed_ip(self.context, inst['id']) fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
ec2_id = cloud.id_to_ec2_id(inst['id']) ec2_id = ec2utils.id_to_ec2_id(inst['id'])
self.cloud.associate_address(self.context, self.cloud.associate_address(self.context,
instance_id=ec2_id, instance_id=ec2_id,
public_ip=address) public_ip=address)
@@ -158,12 +166,12 @@ class CloudTestCase(test.TestCase):
vol2 = db.volume_create(self.context, {}) vol2 = db.volume_create(self.context, {})
result = self.cloud.describe_volumes(self.context) result = self.cloud.describe_volumes(self.context)
self.assertEqual(len(result['volumeSet']), 2) self.assertEqual(len(result['volumeSet']), 2)
volume_id = cloud.id_to_ec2_id(vol2['id'], 'vol-%08x') volume_id = ec2utils.id_to_ec2_id(vol2['id'], 'vol-%08x')
result = self.cloud.describe_volumes(self.context, result = self.cloud.describe_volumes(self.context,
volume_id=[volume_id]) volume_id=[volume_id])
self.assertEqual(len(result['volumeSet']), 1) self.assertEqual(len(result['volumeSet']), 1)
self.assertEqual( self.assertEqual(
cloud.ec2_id_to_id(result['volumeSet'][0]['volumeId']), ec2utils.ec2_id_to_id(result['volumeSet'][0]['volumeId']),
vol2['id']) vol2['id'])
db.volume_destroy(self.context, vol1['id']) db.volume_destroy(self.context, vol1['id'])
db.volume_destroy(self.context, vol2['id']) db.volume_destroy(self.context, vol2['id'])
@@ -188,8 +196,10 @@ class CloudTestCase(test.TestCase):
def test_describe_instances(self): def test_describe_instances(self):
"""Makes sure describe_instances works and filters results.""" """Makes sure describe_instances works and filters results."""
inst1 = db.instance_create(self.context, {'reservation_id': 'a', inst1 = db.instance_create(self.context, {'reservation_id': 'a',
'image_id': 1,
'host': 'host1'}) 'host': 'host1'})
inst2 = db.instance_create(self.context, {'reservation_id': 'a', inst2 = db.instance_create(self.context, {'reservation_id': 'a',
'image_id': 1,
'host': 'host2'}) 'host': 'host2'})
comp1 = db.service_create(self.context, {'host': 'host1', comp1 = db.service_create(self.context, {'host': 'host1',
'availability_zone': 'zone1', 'availability_zone': 'zone1',
@@ -200,7 +210,7 @@ class CloudTestCase(test.TestCase):
result = self.cloud.describe_instances(self.context) result = self.cloud.describe_instances(self.context)
result = result['reservationSet'][0] result = result['reservationSet'][0]
self.assertEqual(len(result['instancesSet']), 2) self.assertEqual(len(result['instancesSet']), 2)
instance_id = cloud.id_to_ec2_id(inst2['id']) instance_id = ec2utils.id_to_ec2_id(inst2['id'])
result = self.cloud.describe_instances(self.context, result = self.cloud.describe_instances(self.context,
instance_id=[instance_id]) instance_id=[instance_id])
result = result['reservationSet'][0] result = result['reservationSet'][0]
@@ -215,10 +225,9 @@ class CloudTestCase(test.TestCase):
db.service_destroy(self.context, comp2['id']) db.service_destroy(self.context, comp2['id'])
def test_console_output(self): def test_console_output(self):
image_id = FLAGS.default_image
instance_type = FLAGS.default_instance_type instance_type = FLAGS.default_instance_type
max_count = 1 max_count = 1
kwargs = {'image_id': image_id, kwargs = {'image_id': 'ami-1',
'instance_type': instance_type, 'instance_type': instance_type,
'max_count': max_count} 'max_count': max_count}
rv = self.cloud.run_instances(self.context, **kwargs) rv = self.cloud.run_instances(self.context, **kwargs)
@@ -234,8 +243,7 @@ class CloudTestCase(test.TestCase):
greenthread.sleep(0.3) greenthread.sleep(0.3)
def test_ajax_console(self): def test_ajax_console(self):
image_id = FLAGS.default_image kwargs = {'image_id': 'ami-1'}
kwargs = {'image_id': image_id}
rv = self.cloud.run_instances(self.context, **kwargs) rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId'] instance_id = rv['instancesSet'][0]['instanceId']
greenthread.sleep(0.3) greenthread.sleep(0.3)
@@ -347,7 +355,7 @@ class CloudTestCase(test.TestCase):
def test_update_of_instance_display_fields(self): def test_update_of_instance_display_fields(self):
inst = db.instance_create(self.context, {}) inst = db.instance_create(self.context, {})
ec2_id = cloud.id_to_ec2_id(inst['id']) ec2_id = ec2utils.id_to_ec2_id(inst['id'])
self.cloud.update_instance(self.context, ec2_id, self.cloud.update_instance(self.context, ec2_id,
display_name='c00l 1m4g3') display_name='c00l 1m4g3')
inst = db.instance_get(self.context, inst['id']) inst = db.instance_get(self.context, inst['id'])
@@ -365,7 +373,7 @@ class CloudTestCase(test.TestCase):
def test_update_of_volume_display_fields(self): def test_update_of_volume_display_fields(self):
vol = db.volume_create(self.context, {}) vol = db.volume_create(self.context, {})
self.cloud.update_volume(self.context, self.cloud.update_volume(self.context,
cloud.id_to_ec2_id(vol['id'], 'vol-%08x'), ec2utils.id_to_ec2_id(vol['id'], 'vol-%08x'),
display_name='c00l v0lum3') display_name='c00l v0lum3')
vol = db.volume_get(self.context, vol['id']) vol = db.volume_get(self.context, vol['id'])
self.assertEqual('c00l v0lum3', vol['display_name']) self.assertEqual('c00l v0lum3', vol['display_name'])
@@ -374,7 +382,7 @@ class CloudTestCase(test.TestCase):
def test_update_of_volume_wont_update_private_fields(self): def test_update_of_volume_wont_update_private_fields(self):
vol = db.volume_create(self.context, {}) vol = db.volume_create(self.context, {})
self.cloud.update_volume(self.context, self.cloud.update_volume(self.context,
cloud.id_to_ec2_id(vol['id'], 'vol-%08x'), ec2utils.id_to_ec2_id(vol['id'], 'vol-%08x'),
mountpoint='/not/here') mountpoint='/not/here')
vol = db.volume_get(self.context, vol['id']) vol = db.volume_get(self.context, vol['id'])
self.assertEqual(None, vol['mountpoint']) self.assertEqual(None, vol['mountpoint'])

View File

@@ -31,7 +31,7 @@ from nova import test
from nova import utils from nova import utils
from nova.auth import manager from nova.auth import manager
from nova.compute import instance_types from nova.compute import instance_types
from nova.image import local
LOG = logging.getLogger('nova.tests.compute') LOG = logging.getLogger('nova.tests.compute')
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
@@ -52,6 +52,11 @@ class ComputeTestCase(test.TestCase):
self.project = self.manager.create_project('fake', 'fake', 'fake') self.project = self.manager.create_project('fake', 'fake', 'fake')
self.context = context.RequestContext('fake', 'fake', False) self.context = context.RequestContext('fake', 'fake', False)
def fake_show(meh, context, id):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1}}
self.stubs.Set(local.LocalImageService, 'show', fake_show)
def tearDown(self): def tearDown(self):
self.manager.delete_user(self.user) self.manager.delete_user(self.user)
self.manager.delete_project(self.project) self.manager.delete_project(self.project)
@@ -60,7 +65,7 @@ class ComputeTestCase(test.TestCase):
def _create_instance(self, params={}): def _create_instance(self, params={}):
"""Create a test instance""" """Create a test instance"""
inst = {} inst = {}
inst['image_id'] = 'ami-test' inst['image_id'] = 1
inst['reservation_id'] = 'r-fakeres' inst['reservation_id'] = 'r-fakeres'
inst['launch_time'] = '10' inst['launch_time'] = '10'
inst['user_id'] = self.user.id inst['user_id'] = self.user.id

View File

@@ -57,7 +57,7 @@ class ConsoleTestCase(test.TestCase):
inst = {} inst = {}
#inst['host'] = self.host #inst['host'] = self.host
#inst['name'] = 'instance-1234' #inst['name'] = 'instance-1234'
inst['image_id'] = 'ami-test' inst['image_id'] = 1
inst['reservation_id'] = 'r-fakeres' inst['reservation_id'] = 'r-fakeres'
inst['launch_time'] = '10' inst['launch_time'] = '10'
inst['user_id'] = self.user.id inst['user_id'] = self.user.id

View File

@@ -59,6 +59,7 @@ class DirectTestCase(test.TestCase):
req.headers['X-OpenStack-User'] = 'user1' req.headers['X-OpenStack-User'] = 'user1'
req.headers['X-OpenStack-Project'] = 'proj1' req.headers['X-OpenStack-Project'] = 'proj1'
resp = req.get_response(self.auth_router) resp = req.get_response(self.auth_router)
self.assertEqual(resp.status_int, 200)
data = json.loads(resp.body) data = json.loads(resp.body)
self.assertEqual(data['user'], 'user1') self.assertEqual(data['user'], 'user1')
self.assertEqual(data['project'], 'proj1') self.assertEqual(data['project'], 'proj1')
@@ -69,6 +70,7 @@ class DirectTestCase(test.TestCase):
req.method = 'POST' req.method = 'POST'
req.body = 'json=%s' % json.dumps({'data': 'foo'}) req.body = 'json=%s' % json.dumps({'data': 'foo'})
resp = req.get_response(self.router) resp = req.get_response(self.router)
self.assertEqual(resp.status_int, 200)
resp_parsed = json.loads(resp.body) resp_parsed = json.loads(resp.body)
self.assertEqual(resp_parsed['data'], 'foo') self.assertEqual(resp_parsed['data'], 'foo')
@@ -78,6 +80,7 @@ class DirectTestCase(test.TestCase):
req.method = 'POST' req.method = 'POST'
req.body = 'data=foo' req.body = 'data=foo'
resp = req.get_response(self.router) resp = req.get_response(self.router)
self.assertEqual(resp.status_int, 200)
resp_parsed = json.loads(resp.body) resp_parsed = json.loads(resp.body)
self.assertEqual(resp_parsed['data'], 'foo') self.assertEqual(resp_parsed['data'], 'foo')
@@ -90,8 +93,7 @@ class DirectTestCase(test.TestCase):
class DirectCloudTestCase(test_cloud.CloudTestCase): class DirectCloudTestCase(test_cloud.CloudTestCase):
def setUp(self): def setUp(self):
super(DirectCloudTestCase, self).setUp() super(DirectCloudTestCase, self).setUp()
compute_handle = compute.API(image_service=self.cloud.image_service, compute_handle = compute.API(network_api=self.cloud.network_api,
network_api=self.cloud.network_api,
volume_api=self.cloud.volume_api) volume_api=self.cloud.volume_api)
direct.register_service('compute', compute_handle) direct.register_service('compute', compute_handle)
self.router = direct.JsonParamsMiddleware(direct.Router()) self.router = direct.JsonParamsMiddleware(direct.Router())

View File

@@ -14,10 +14,12 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import errno
import os import os
import select
from nova import test from nova import test
from nova.utils import parse_mailmap, str_dict_replace from nova.utils import parse_mailmap, str_dict_replace, synchronized
class ProjectTestCase(test.TestCase): class ProjectTestCase(test.TestCase):
@@ -55,3 +57,47 @@ class ProjectTestCase(test.TestCase):
'%r not listed in Authors' % missing) '%r not listed in Authors' % missing)
finally: finally:
tree.unlock() tree.unlock()
class LockTestCase(test.TestCase):
def test_synchronized_wrapped_function_metadata(self):
@synchronized('whatever')
def foo():
"""Bar"""
pass
self.assertEquals(foo.__doc__, 'Bar', "Wrapped function's docstring "
"got lost")
self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
"got mangled")
def test_synchronized(self):
rpipe1, wpipe1 = os.pipe()
rpipe2, wpipe2 = os.pipe()
@synchronized('testlock')
def f(rpipe, wpipe):
try:
os.write(wpipe, "foo")
except OSError, e:
self.assertEquals(e.errno, errno.EPIPE)
return
rfds, _, __ = select.select([rpipe], [], [], 1)
self.assertEquals(len(rfds), 0, "The other process, which was"
" supposed to be locked, "
"wrote on its end of the "
"pipe")
os.close(rpipe)
pid = os.fork()
if pid > 0:
os.close(wpipe1)
os.close(rpipe2)
f(rpipe1, wpipe2)
else:
os.close(rpipe1)
os.close(wpipe2)
f(rpipe2, wpipe1)
os._exit(0)

View File

@@ -29,11 +29,153 @@ from nova import log as logging
from nova import test from nova import test
from nova import utils from nova import utils
from nova.auth import manager from nova.auth import manager
from nova.network import linux_net
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.tests.network') LOG = logging.getLogger('nova.tests.network')
class IptablesManagerTestCase(test.TestCase):
sample_filter = ['#Generated by iptables-save on Fri Feb 18 15:17:05 2011',
'*filter',
':INPUT ACCEPT [2223527:305688874]',
':FORWARD ACCEPT [0:0]',
':OUTPUT ACCEPT [2172501:140856656]',
':nova-compute-FORWARD - [0:0]',
':nova-compute-INPUT - [0:0]',
':nova-compute-local - [0:0]',
':nova-compute-OUTPUT - [0:0]',
':nova-filter-top - [0:0]',
'-A FORWARD -j nova-filter-top ',
'-A OUTPUT -j nova-filter-top ',
'-A nova-filter-top -j nova-compute-local ',
'-A INPUT -j nova-compute-INPUT ',
'-A OUTPUT -j nova-compute-OUTPUT ',
'-A FORWARD -j nova-compute-FORWARD ',
'-A INPUT -i virbr0 -p udp -m udp --dport 53 -j ACCEPT ',
'-A INPUT -i virbr0 -p tcp -m tcp --dport 53 -j ACCEPT ',
'-A INPUT -i virbr0 -p udp -m udp --dport 67 -j ACCEPT ',
'-A INPUT -i virbr0 -p tcp -m tcp --dport 67 -j ACCEPT ',
'-A FORWARD -s 192.168.122.0/24 -i virbr0 -j ACCEPT ',
'-A FORWARD -i virbr0 -o virbr0 -j ACCEPT ',
'-A FORWARD -o virbr0 -j REJECT --reject-with '
'icmp-port-unreachable ',
'-A FORWARD -i virbr0 -j REJECT --reject-with '
'icmp-port-unreachable ',
'COMMIT',
'# Completed on Fri Feb 18 15:17:05 2011']
sample_nat = ['# Generated by iptables-save on Fri Feb 18 15:17:05 2011',
'*nat',
':PREROUTING ACCEPT [3936:762355]',
':INPUT ACCEPT [2447:225266]',
':OUTPUT ACCEPT [63491:4191863]',
':POSTROUTING ACCEPT [63112:4108641]',
':nova-compute-OUTPUT - [0:0]',
':nova-compute-floating-ip-snat - [0:0]',
':nova-compute-SNATTING - [0:0]',
':nova-compute-PREROUTING - [0:0]',
':nova-compute-POSTROUTING - [0:0]',
':nova-postrouting-bottom - [0:0]',
'-A PREROUTING -j nova-compute-PREROUTING ',
'-A OUTPUT -j nova-compute-OUTPUT ',
'-A POSTROUTING -j nova-compute-POSTROUTING ',
'-A POSTROUTING -j nova-postrouting-bottom ',
'-A nova-postrouting-bottom -j nova-compute-SNATTING ',
'-A nova-compute-SNATTING -j nova-compute-floating-ip-snat ',
'COMMIT',
'# Completed on Fri Feb 18 15:17:05 2011']
def setUp(self):
super(IptablesManagerTestCase, self).setUp()
self.manager = linux_net.IptablesManager()
def test_filter_rules_are_wrapped(self):
current_lines = self.sample_filter
table = self.manager.ipv4['filter']
table.add_rule('FORWARD', '-s 1.2.3.4/5 -j DROP')
new_lines = self.manager._modify_rules(current_lines, table)
self.assertTrue('-A run_tests.py-FORWARD '
'-s 1.2.3.4/5 -j DROP' in new_lines)
table.remove_rule('FORWARD', '-s 1.2.3.4/5 -j DROP')
new_lines = self.manager._modify_rules(current_lines, table)
self.assertTrue('-A run_tests.py-FORWARD '
'-s 1.2.3.4/5 -j DROP' not in new_lines)
def test_nat_rules(self):
current_lines = self.sample_nat
new_lines = self.manager._modify_rules(current_lines,
self.manager.ipv4['nat'])
for line in [':nova-compute-OUTPUT - [0:0]',
':nova-compute-floating-ip-snat - [0:0]',
':nova-compute-SNATTING - [0:0]',
':nova-compute-PREROUTING - [0:0]',
':nova-compute-POSTROUTING - [0:0]']:
self.assertTrue(line in new_lines, "One of nova-compute's chains "
"went missing.")
seen_lines = set()
for line in new_lines:
line = line.strip()
self.assertTrue(line not in seen_lines,
"Duplicate line: %s" % line)
seen_lines.add(line)
last_postrouting_line = ''
for line in new_lines:
if line.startswith('-A POSTROUTING'):
last_postrouting_line = line
self.assertTrue('-j nova-postrouting-bottom' in last_postrouting_line,
"Last POSTROUTING rule does not jump to "
"nova-postouting-bottom: %s" % last_postrouting_line)
for chain in ['POSTROUTING', 'PREROUTING', 'OUTPUT']:
self.assertTrue('-A %s -j run_tests.py-%s' \
% (chain, chain) in new_lines,
"Built-in chain %s not wrapped" % (chain,))
def test_filter_rules(self):
current_lines = self.sample_filter
new_lines = self.manager._modify_rules(current_lines,
self.manager.ipv4['filter'])
for line in [':nova-compute-FORWARD - [0:0]',
':nova-compute-INPUT - [0:0]',
':nova-compute-local - [0:0]',
':nova-compute-OUTPUT - [0:0]']:
self.assertTrue(line in new_lines, "One of nova-compute's chains"
" went missing.")
seen_lines = set()
for line in new_lines:
line = line.strip()
self.assertTrue(line not in seen_lines,
"Duplicate line: %s" % line)
seen_lines.add(line)
for chain in ['FORWARD', 'OUTPUT']:
for line in new_lines:
if line.startswith('-A %s' % chain):
self.assertTrue('-j nova-filter-top' in line,
"First %s rule does not "
"jump to nova-filter-top" % chain)
break
self.assertTrue('-A nova-filter-top '
'-j run_tests.py-local' in new_lines,
"nova-filter-top does not jump to wrapped local chain")
for chain in ['INPUT', 'OUTPUT', 'FORWARD']:
self.assertTrue('-A %s -j run_tests.py-%s' \
% (chain, chain) in new_lines,
"Built-in chain %s not wrapped" % (chain,))
class NetworkTestCase(test.TestCase): class NetworkTestCase(test.TestCase):
"""Test cases for network code""" """Test cases for network code"""
def setUp(self): def setUp(self):
@@ -343,13 +485,13 @@ def lease_ip(private_ip):
private_ip) private_ip)
instance_ref = db.fixed_ip_get_instance(context.get_admin_context(), instance_ref = db.fixed_ip_get_instance(context.get_admin_context(),
private_ip) private_ip)
cmd = "%s add %s %s fake" % (binpath('nova-dhcpbridge'), cmd = (binpath('nova-dhcpbridge'), 'add',
instance_ref['mac_address'], instance_ref['mac_address'],
private_ip) private_ip, 'fake')
env = {'DNSMASQ_INTERFACE': network_ref['bridge'], env = {'DNSMASQ_INTERFACE': network_ref['bridge'],
'TESTING': '1', 'TESTING': '1',
'FLAGFILE': FLAGS.dhcpbridge_flagfile} 'FLAGFILE': FLAGS.dhcpbridge_flagfile}
(out, err) = utils.execute(cmd, addl_env=env) (out, err) = utils.execute(*cmd, addl_env=env)
LOG.debug("ISSUE_IP: %s, %s ", out, err) LOG.debug("ISSUE_IP: %s, %s ", out, err)
@@ -359,11 +501,11 @@ def release_ip(private_ip):
private_ip) private_ip)
instance_ref = db.fixed_ip_get_instance(context.get_admin_context(), instance_ref = db.fixed_ip_get_instance(context.get_admin_context(),
private_ip) private_ip)
cmd = "%s del %s %s fake" % (binpath('nova-dhcpbridge'), cmd = (binpath('nova-dhcpbridge'), 'del',
instance_ref['mac_address'], instance_ref['mac_address'],
private_ip) private_ip, 'fake')
env = {'DNSMASQ_INTERFACE': network_ref['bridge'], env = {'DNSMASQ_INTERFACE': network_ref['bridge'],
'TESTING': '1', 'TESTING': '1',
'FLAGFILE': FLAGS.dhcpbridge_flagfile} 'FLAGFILE': FLAGS.dhcpbridge_flagfile}
(out, err) = utils.execute(cmd, addl_env=env) (out, err) = utils.execute(*cmd, addl_env=env)
LOG.debug("RELEASE_IP: %s, %s ", out, err) LOG.debug("RELEASE_IP: %s, %s ", out, err)

View File

@@ -155,7 +155,7 @@ class SimpleDriverTestCase(test.TestCase):
def _create_instance(self, **kwargs): def _create_instance(self, **kwargs):
"""Create a test instance""" """Create a test instance"""
inst = {} inst = {}
inst['image_id'] = 'ami-test' inst['image_id'] = 1
inst['reservation_id'] = 'r-fakeres' inst['reservation_id'] = 'r-fakeres'
inst['user_id'] = self.user.id inst['user_id'] = self.user.id
inst['project_id'] = self.project.id inst['project_id'] = self.project.id
@@ -169,8 +169,6 @@ class SimpleDriverTestCase(test.TestCase):
def _create_volume(self): def _create_volume(self):
"""Create a test volume""" """Create a test volume"""
vol = {} vol = {}
vol['image_id'] = 'ami-test'
vol['reservation_id'] = 'r-fakeres'
vol['size'] = 1 vol['size'] = 1
vol['availability_zone'] = 'test' vol['availability_zone'] = 'test'
return db.volume_create(self.context, vol)['id'] return db.volume_create(self.context, vol)['id']

View File

@@ -14,6 +14,10 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import re
import os
import eventlet
from xml.etree.ElementTree import fromstring as xml_to_tree from xml.etree.ElementTree import fromstring as xml_to_tree
from xml.dom.minidom import parseString as xml_to_dom from xml.dom.minidom import parseString as xml_to_dom
@@ -30,6 +34,70 @@ FLAGS = flags.FLAGS
flags.DECLARE('instances_path', 'nova.compute.manager') flags.DECLARE('instances_path', 'nova.compute.manager')
def _concurrency(wait, done, target):
wait.wait()
done.send()
class CacheConcurrencyTestCase(test.TestCase):
def setUp(self):
super(CacheConcurrencyTestCase, self).setUp()
def fake_exists(fname):
basedir = os.path.join(FLAGS.instances_path, '_base')
if fname == basedir:
return True
return False
def fake_execute(*args, **kwargs):
pass
self.stubs.Set(os.path, 'exists', fake_exists)
self.stubs.Set(utils, 'execute', fake_execute)
def test_same_fname_concurrency(self):
"""Ensures that the same fname cache runs at a sequentially"""
conn = libvirt_conn.LibvirtConnection
wait1 = eventlet.event.Event()
done1 = eventlet.event.Event()
eventlet.spawn(conn._cache_image, _concurrency,
'target', 'fname', False, wait1, done1)
wait2 = eventlet.event.Event()
done2 = eventlet.event.Event()
eventlet.spawn(conn._cache_image, _concurrency,
'target', 'fname', False, wait2, done2)
wait2.send()
eventlet.sleep(0)
try:
self.assertFalse(done2.ready())
self.assertTrue('fname' in conn._image_sems)
finally:
wait1.send()
done1.wait()
eventlet.sleep(0)
self.assertTrue(done2.ready())
self.assertFalse('fname' in conn._image_sems)
def test_different_fname_concurrency(self):
"""Ensures that two different fname caches are concurrent"""
conn = libvirt_conn.LibvirtConnection
wait1 = eventlet.event.Event()
done1 = eventlet.event.Event()
eventlet.spawn(conn._cache_image, _concurrency,
'target', 'fname2', False, wait1, done1)
wait2 = eventlet.event.Event()
done2 = eventlet.event.Event()
eventlet.spawn(conn._cache_image, _concurrency,
'target', 'fname1', False, wait2, done2)
wait2.send()
eventlet.sleep(0)
try:
self.assertTrue(done2.ready())
finally:
wait1.send()
eventlet.sleep(0)
class LibvirtConnTestCase(test.TestCase): class LibvirtConnTestCase(test.TestCase):
def setUp(self): def setUp(self):
super(LibvirtConnTestCase, self).setUp() super(LibvirtConnTestCase, self).setUp()
@@ -234,16 +302,22 @@ class IptablesFirewallTestCase(test.TestCase):
self.manager.delete_user(self.user) self.manager.delete_user(self.user)
super(IptablesFirewallTestCase, self).tearDown() super(IptablesFirewallTestCase, self).tearDown()
in_rules = [ in_nat_rules = [
'# Generated by iptables-save v1.4.10 on Sat Feb 19 00:03:19 2011',
'*nat',
':PREROUTING ACCEPT [1170:189210]',
':INPUT ACCEPT [844:71028]',
':OUTPUT ACCEPT [5149:405186]',
':POSTROUTING ACCEPT [5063:386098]'
]
in_filter_rules = [
'# Generated by iptables-save v1.4.4 on Mon Dec 6 11:54:13 2010', '# Generated by iptables-save v1.4.4 on Mon Dec 6 11:54:13 2010',
'*filter', '*filter',
':INPUT ACCEPT [969615:281627771]', ':INPUT ACCEPT [969615:281627771]',
':FORWARD ACCEPT [0:0]', ':FORWARD ACCEPT [0:0]',
':OUTPUT ACCEPT [915599:63811649]', ':OUTPUT ACCEPT [915599:63811649]',
':nova-block-ipv4 - [0:0]', ':nova-block-ipv4 - [0:0]',
'-A INPUT -i virbr0 -p udp -m udp --dport 53 -j ACCEPT ',
'-A INPUT -i virbr0 -p tcp -m tcp --dport 53 -j ACCEPT ',
'-A INPUT -i virbr0 -p udp -m udp --dport 67 -j ACCEPT ',
'-A INPUT -i virbr0 -p tcp -m tcp --dport 67 -j ACCEPT ', '-A INPUT -i virbr0 -p tcp -m tcp --dport 67 -j ACCEPT ',
'-A FORWARD -d 192.168.122.0/24 -o virbr0 -m state --state RELATED' '-A FORWARD -d 192.168.122.0/24 -o virbr0 -m state --state RELATED'
',ESTABLISHED -j ACCEPT ', ',ESTABLISHED -j ACCEPT ',
@@ -255,7 +329,7 @@ class IptablesFirewallTestCase(test.TestCase):
'# Completed on Mon Dec 6 11:54:13 2010', '# Completed on Mon Dec 6 11:54:13 2010',
] ]
in6_rules = [ in6_filter_rules = [
'# Generated by ip6tables-save v1.4.4 on Tue Jan 18 23:47:56 2011', '# Generated by ip6tables-save v1.4.4 on Tue Jan 18 23:47:56 2011',
'*filter', '*filter',
':INPUT ACCEPT [349155:75810423]', ':INPUT ACCEPT [349155:75810423]',
@@ -315,23 +389,34 @@ class IptablesFirewallTestCase(test.TestCase):
instance_ref = db.instance_get(admin_ctxt, instance_ref['id']) instance_ref = db.instance_get(admin_ctxt, instance_ref['id'])
# self.fw.add_instance(instance_ref) # self.fw.add_instance(instance_ref)
def fake_iptables_execute(cmd, process_input=None): def fake_iptables_execute(*cmd, **kwargs):
if cmd == 'sudo ip6tables-save -t filter': process_input = kwargs.get('process_input', None)
return '\n'.join(self.in6_rules), None if cmd == ('sudo', 'ip6tables-save', '-t', 'filter'):
if cmd == 'sudo iptables-save -t filter': return '\n'.join(self.in6_filter_rules), None
return '\n'.join(self.in_rules), None if cmd == ('sudo', 'iptables-save', '-t', 'filter'):
if cmd == 'sudo iptables-restore': return '\n'.join(self.in_filter_rules), None
self.out_rules = process_input.split('\n') if cmd == ('sudo', 'iptables-save', '-t', 'nat'):
return '\n'.join(self.in_nat_rules), None
if cmd == ('sudo', 'iptables-restore'):
lines = process_input.split('\n')
if '*filter' in lines:
self.out_rules = lines
return '', '' return '', ''
if cmd == 'sudo ip6tables-restore': if cmd == ('sudo', 'ip6tables-restore'):
self.out6_rules = process_input.split('\n') lines = process_input.split('\n')
if '*filter' in lines:
self.out6_rules = lines
return '', '' return '', ''
self.fw.execute = fake_iptables_execute print cmd, kwargs
from nova.network import linux_net
linux_net.iptables_manager.execute = fake_iptables_execute
self.fw.prepare_instance_filter(instance_ref) self.fw.prepare_instance_filter(instance_ref)
self.fw.apply_instance_filter(instance_ref) self.fw.apply_instance_filter(instance_ref)
in_rules = filter(lambda l: not l.startswith('#'), self.in_rules) in_rules = filter(lambda l: not l.startswith('#'),
self.in_filter_rules)
for rule in in_rules: for rule in in_rules:
if not 'nova' in rule: if not 'nova' in rule:
self.assertTrue(rule in self.out_rules, self.assertTrue(rule in self.out_rules,
@@ -354,17 +439,18 @@ class IptablesFirewallTestCase(test.TestCase):
self.assertTrue(security_group_chain, self.assertTrue(security_group_chain,
"The security group chain wasn't added") "The security group chain wasn't added")
self.assertTrue('-A %s -p icmp -s 192.168.11.0/24 -j ACCEPT' % \ regex = re.compile('-A .* -p icmp -s 192.168.11.0/24 -j ACCEPT')
security_group_chain in self.out_rules, self.assertTrue(len(filter(regex.match, self.out_rules)) > 0,
"ICMP acceptance rule wasn't added") "ICMP acceptance rule wasn't added")
self.assertTrue('-A %s -p icmp -s 192.168.11.0/24 -m icmp --icmp-type ' regex = re.compile('-A .* -p icmp -s 192.168.11.0/24 -m icmp '
'8 -j ACCEPT' % security_group_chain in self.out_rules, '--icmp-type 8 -j ACCEPT')
self.assertTrue(len(filter(regex.match, self.out_rules)) > 0,
"ICMP Echo Request acceptance rule wasn't added") "ICMP Echo Request acceptance rule wasn't added")
self.assertTrue('-A %s -p tcp -s 192.168.10.0/24 -m multiport ' regex = re.compile('-A .* -p tcp -s 192.168.10.0/24 -m multiport '
'--dports 80:81 -j ACCEPT' % security_group_chain \ '--dports 80:81 -j ACCEPT')
in self.out_rules, self.assertTrue(len(filter(regex.match, self.out_rules)) > 0,
"TCP port 80/81 acceptance rule wasn't added") "TCP port 80/81 acceptance rule wasn't added")
db.instance_destroy(admin_ctxt, instance_ref['id']) db.instance_destroy(admin_ctxt, instance_ref['id'])

View File

@@ -99,7 +99,7 @@ class VolumeTestCase(test.TestCase):
def test_run_attach_detach_volume(self): def test_run_attach_detach_volume(self):
"""Make sure volume can be attached and detached from instance.""" """Make sure volume can be attached and detached from instance."""
inst = {} inst = {}
inst['image_id'] = 'ami-test' inst['image_id'] = 1
inst['reservation_id'] = 'r-fakeres' inst['reservation_id'] = 'r-fakeres'
inst['launch_time'] = '10' inst['launch_time'] = '10'
inst['user_id'] = 'fake' inst['user_id'] = 'fake'

View File

@@ -372,16 +372,28 @@ class XenAPIMigrateInstance(test.TestCase):
db_fakes.stub_out_db_instance_api(self.stubs) db_fakes.stub_out_db_instance_api(self.stubs)
stubs.stub_out_get_target(self.stubs) stubs.stub_out_get_target(self.stubs)
xenapi_fake.reset() xenapi_fake.reset()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
self.project = self.manager.create_project('fake', 'fake', 'fake')
self.values = {'name': 1, 'id': 1, self.values = {'name': 1, 'id': 1,
'project_id': 'fake', 'project_id': self.project.id,
'user_id': 'fake', 'user_id': self.user.id,
'image_id': 1, 'image_id': 1,
'kernel_id': 2, 'kernel_id': None,
'ramdisk_id': 3, 'ramdisk_id': None,
'instance_type': 'm1.large', 'instance_type': 'm1.large',
'mac_address': 'aa:bb:cc:dd:ee:ff', 'mac_address': 'aa:bb:cc:dd:ee:ff',
} }
stubs.stub_out_migration_methods(self.stubs) stubs.stub_out_migration_methods(self.stubs)
glance_stubs.stubout_glance_client(self.stubs,
glance_stubs.FakeGlance)
def tearDown(self):
super(XenAPIMigrateInstance, self).tearDown()
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
self.stubs.UnsetAll()
def test_migrate_disk_and_power_off(self): def test_migrate_disk_and_power_off(self):
instance = db.instance_create(self.values) instance = db.instance_create(self.values)
@@ -389,11 +401,11 @@ class XenAPIMigrateInstance(test.TestCase):
conn = xenapi_conn.get_connection(False) conn = xenapi_conn.get_connection(False)
conn.migrate_disk_and_power_off(instance, '127.0.0.1') conn.migrate_disk_and_power_off(instance, '127.0.0.1')
def test_attach_disk(self): def test_finish_resize(self):
instance = db.instance_create(self.values) instance = db.instance_create(self.values)
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests) stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
conn = xenapi_conn.get_connection(False) conn = xenapi_conn.get_connection(False)
conn.attach_disk(instance, {'base_copy': 'hurr', 'cow': 'durr'}) conn.finish_resize(instance, dict(base_copy='hurr', cow='durr'))
class XenAPIDetermineDiskImageTestCase(test.TestCase): class XenAPIDetermineDiskImageTestCase(test.TestCase):

172
nova/tests/test_zones.py Normal file
View File

@@ -0,0 +1,172 @@
# Copyright 2010 United States Government as represented by the
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For ZoneManager
"""
import datetime
import mox
import novaclient
from nova import context
from nova import db
from nova import flags
from nova import service
from nova import test
from nova import rpc
from nova import utils
from nova.auth import manager as auth_manager
from nova.scheduler import zone_manager
FLAGS = flags.FLAGS
class FakeZone:
"""Represents a fake zone from the db"""
def __init__(self, *args, **kwargs):
for k, v in kwargs.iteritems():
setattr(self, k, v)
def exploding_novaclient(zone):
"""Used when we want to simulate a novaclient call failing."""
raise Exception("kaboom")
class ZoneManagerTestCase(test.TestCase):
"""Test case for zone manager"""
def test_ping(self):
zm = zone_manager.ZoneManager()
self.mox.StubOutWithMock(zm, '_refresh_from_db')
self.mox.StubOutWithMock(zm, '_poll_zones')
zm._refresh_from_db(mox.IgnoreArg())
zm._poll_zones(mox.IgnoreArg())
self.mox.ReplayAll()
zm.ping(None)
self.mox.VerifyAll()
def test_refresh_from_db_new(self):
zm = zone_manager.ZoneManager()
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([
FakeZone(id=1, api_url='http://foo.com', username='user1',
password='pass1'),
])
self.assertEquals(len(zm.zone_states), 0)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[1].username, 'user1')
def test_refresh_from_db_replace_existing(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=1, api_url='http://foo.com',
username='user1', password='pass1'))
zm.zone_states[1] = zone_state
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([
FakeZone(id=1, api_url='http://foo.com', username='user2',
password='pass2'),
])
self.assertEquals(len(zm.zone_states), 1)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[1].username, 'user2')
def test_refresh_from_db_missing(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=1, api_url='http://foo.com',
username='user1', password='pass1'))
zm.zone_states[1] = zone_state
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([])
self.assertEquals(len(zm.zone_states), 1)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 0)
def test_refresh_from_db_add_and_delete(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=1, api_url='http://foo.com',
username='user1', password='pass1'))
zm.zone_states[1] = zone_state
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([
FakeZone(id=2, api_url='http://foo.com', username='user2',
password='pass2'),
])
self.assertEquals(len(zm.zone_states), 1)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[2].username, 'user2')
def test_poll_zone(self):
self.mox.StubOutWithMock(zone_manager, '_call_novaclient')
zone_manager._call_novaclient(mox.IgnoreArg()).AndReturn(
dict(name='zohan', capabilities='hairdresser'))
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=2,
api_url='http://foo.com', username='user2',
password='pass2'))
zone_state.attempt = 1
self.mox.ReplayAll()
zone_manager._poll_zone(zone_state)
self.mox.VerifyAll()
self.assertEquals(zone_state.attempt, 0)
self.assertEquals(zone_state.name, 'zohan')
def test_poll_zone_fails(self):
    """A failing poll increments the failure counter; once it reaches
    FLAGS.zone_failures_to_offline the zone is marked inactive and its
    cached name is cleared."""
    self.stubs.Set(zone_manager, "_call_novaclient", exploding_novaclient)
    zone_state = zone_manager.ZoneState()
    zone_state.update_credentials(FakeZone(id=2,
                   api_url='http://foo.com', username='user2',
                   password='pass2'))
    # Start one failure short of the offline threshold ...
    zone_state.attempt = FLAGS.zone_failures_to_offline - 1

    self.mox.ReplayAll()
    zone_manager._poll_zone(zone_state)
    self.mox.VerifyAll()
    # ... so this single failed poll pushes the count to the threshold.
    # Assert against the flag rather than the magic number 3, which
    # only held for the flag's default value and would silently break
    # if that default ever changed.
    self.assertEquals(zone_state.attempt,
                      FLAGS.zone_failures_to_offline)
    self.assertFalse(zone_state.is_active)
    self.assertEquals(zone_state.name, None)

View File

@@ -225,6 +225,18 @@ class FakeSessionForMigrationTests(fake.SessionBase):
def __init__(self, uri): def __init__(self, uri):
super(FakeSessionForMigrationTests, self).__init__(uri) super(FakeSessionForMigrationTests, self).__init__(uri)
def VDI_get_by_uuid(*args):
return 'hurr'
def VM_start(self, _1, ref, _2, _3):
vm = fake.get_record('VM', ref)
if vm['power_state'] != 'Halted':
raise fake.Failure(['VM_BAD_POWER_STATE', ref, 'Halted',
vm['power_state']])
vm['power_state'] = 'Running'
vm['is_a_template'] = False
vm['is_control_domain'] = False
def stub_out_migration_methods(stubs): def stub_out_migration_methods(stubs):
def fake_get_snapshot(self, instance): def fake_get_snapshot(self, instance):
@@ -251,6 +263,9 @@ def stub_out_migration_methods(stubs):
def fake_destroy(*args, **kwargs): def fake_destroy(*args, **kwargs):
pass pass
def fake_reset_network(*args, **kwargs):
pass
stubs.Set(vmops.VMOps, '_destroy', fake_destroy) stubs.Set(vmops.VMOps, '_destroy', fake_destroy)
stubs.Set(vm_utils.VMHelper, 'scan_default_sr', fake_sr) stubs.Set(vm_utils.VMHelper, 'scan_default_sr', fake_sr)
stubs.Set(vm_utils.VMHelper, 'scan_sr', fake_sr) stubs.Set(vm_utils.VMHelper, 'scan_sr', fake_sr)
@@ -258,4 +273,5 @@ def stub_out_migration_methods(stubs):
stubs.Set(vm_utils.VMHelper, 'get_vdi_for_vm_safely', fake_get_vdi) stubs.Set(vm_utils.VMHelper, 'get_vdi_for_vm_safely', fake_get_vdi)
stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task', lambda x, y, z: None) stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task', lambda x, y, z: None)
stubs.Set(vm_utils.VMHelper, 'get_sr_path', fake_get_sr_path) stubs.Set(vm_utils.VMHelper, 'get_sr_path', fake_get_sr_path)
stubs.Set(vmops.VMOps, 'reset_network', fake_reset_network)
stubs.Set(vmops.VMOps, '_shutdown', fake_shutdown) stubs.Set(vmops.VMOps, '_shutdown', fake_shutdown)

View File

@@ -23,10 +23,14 @@ System-level utilities and helper functions.
import base64 import base64
import datetime import datetime
import functools
import inspect import inspect
import json import json
import lockfile
import netaddr
import os import os
import random import random
import re
import socket import socket
import string import string
import struct import struct
@@ -34,20 +38,20 @@ import sys
import time import time
import types import types
from xml.sax import saxutils from xml.sax import saxutils
import re
import netaddr
from eventlet import event from eventlet import event
from eventlet import greenthread from eventlet import greenthread
from eventlet.green import subprocess from eventlet.green import subprocess
None
from nova import exception from nova import exception
from nova.exception import ProcessExecutionError from nova.exception import ProcessExecutionError
from nova import flags
from nova import log as logging from nova import log as logging
LOG = logging.getLogger("nova.utils") LOG = logging.getLogger("nova.utils")
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
FLAGS = flags.FLAGS
def import_class(import_str): def import_class(import_str):
@@ -125,40 +129,59 @@ def fetchfile(url, target):
# c.perform() # c.perform()
# c.close() # c.close()
# fp.close() # fp.close()
execute("curl --fail %s -o %s" % (url, target)) execute("curl", "--fail", url, "-o", target)
def execute(cmd, process_input=None, addl_env=None, check_exit_code=True): def execute(*cmd, **kwargs):
LOG.debug(_("Running cmd (subprocess): %s"), cmd) process_input = kwargs.get('process_input', None)
env = os.environ.copy() addl_env = kwargs.get('addl_env', None)
if addl_env: check_exit_code = kwargs.get('check_exit_code', 0)
env.update(addl_env) stdin = kwargs.get('stdin', subprocess.PIPE)
obj = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout = kwargs.get('stdout', subprocess.PIPE)
stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) stderr = kwargs.get('stderr', subprocess.PIPE)
result = None attempts = kwargs.get('attempts', 1)
if process_input != None: cmd = map(str, cmd)
result = obj.communicate(process_input)
else: while attempts > 0:
result = obj.communicate() attempts -= 1
obj.stdin.close() try:
if obj.returncode: LOG.debug(_("Running cmd (subprocess): %s"), ' '.join(cmd))
LOG.debug(_("Result was %s") % obj.returncode) env = os.environ.copy()
if check_exit_code and obj.returncode != 0: if addl_env:
(stdout, stderr) = result env.update(addl_env)
raise ProcessExecutionError(exit_code=obj.returncode, obj = subprocess.Popen(cmd, stdin=stdin,
stdout=stdout, stdout=stdout, stderr=stderr, env=env)
stderr=stderr, result = None
cmd=cmd) if process_input != None:
# NOTE(termie): this appears to be necessary to let the subprocess call result = obj.communicate(process_input)
# clean something up in between calls, without it two else:
# execute calls in a row hangs the second one result = obj.communicate()
greenthread.sleep(0) obj.stdin.close()
return result if obj.returncode:
LOG.debug(_("Result was %s") % obj.returncode)
if type(check_exit_code) == types.IntType \
and obj.returncode != check_exit_code:
(stdout, stderr) = result
raise ProcessExecutionError(exit_code=obj.returncode,
stdout=stdout,
stderr=stderr,
cmd=' '.join(cmd))
# NOTE(termie): this appears to be necessary to let the subprocess
# call clean something up in between calls, without
# it two execute calls in a row hangs the second one
greenthread.sleep(0)
return result
except ProcessExecutionError:
if not attempts:
raise
else:
LOG.debug(_("%r failed. Retrying."), cmd)
greenthread.sleep(random.randint(20, 200) / 100.0)
def ssh_execute(ssh, cmd, process_input=None, def ssh_execute(ssh, cmd, process_input=None,
addl_env=None, check_exit_code=True): addl_env=None, check_exit_code=True):
LOG.debug(_("Running cmd (SSH): %s"), cmd) LOG.debug(_("Running cmd (SSH): %s"), ' '.join(cmd))
if addl_env: if addl_env:
raise exception.Error("Environment not supported over SSH") raise exception.Error("Environment not supported over SSH")
@@ -187,7 +210,7 @@ def ssh_execute(ssh, cmd, process_input=None,
raise exception.ProcessExecutionError(exit_code=exit_status, raise exception.ProcessExecutionError(exit_code=exit_status,
stdout=stdout, stdout=stdout,
stderr=stderr, stderr=stderr,
cmd=cmd) cmd=' '.join(cmd))
return (stdout, stderr) return (stdout, stderr)
@@ -220,9 +243,9 @@ def debug(arg):
return arg return arg
def runthis(prompt, cmd, check_exit_code=True): def runthis(prompt, *cmd, **kwargs):
LOG.debug(_("Running %s"), (cmd)) LOG.debug(_("Running %s"), (" ".join(cmd)))
rv, err = execute(cmd, check_exit_code=check_exit_code) rv, err = execute(*cmd, **kwargs)
def generate_uid(topic, size=8): def generate_uid(topic, size=8):
@@ -254,7 +277,7 @@ def last_octet(address):
def get_my_linklocal(interface): def get_my_linklocal(interface):
try: try:
if_str = execute("ip -f inet6 -o addr show %s" % interface) if_str = execute("ip", "-f", "inet6", "-o", "addr", "show", interface)
condition = "\s+inet6\s+([0-9a-f:]+)/\d+\s+scope\s+link" condition = "\s+inet6\s+([0-9a-f:]+)/\d+\s+scope\s+link"
links = [re.search(condition, x) for x in if_str[0].split('\n')] links = [re.search(condition, x) for x in if_str[0].split('\n')]
address = [w.group(1) for w in links if w is not None] address = [w.group(1) for w in links if w is not None]
@@ -491,6 +514,18 @@ def loads(s):
return json.loads(s) return json.loads(s)
def synchronized(name):
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
lock = lockfile.FileLock(os.path.join(FLAGS.lock_path,
'nova-%s.lock' % name))
with lock:
return f(*args, **kwargs)
return inner
return wrap
def ensure_b64_encoding(val): def ensure_b64_encoding(val):
"""Safety method to ensure that values expected to be base64-encoded """Safety method to ensure that values expected to be base64-encoded
actually are. If they are, the value is returned unchanged. Otherwise, actually are. If they are, the value is returned unchanged. Otherwise,

View File

@@ -36,6 +36,7 @@ import webob.exc
from paste import deploy from paste import deploy
from nova import exception
from nova import flags from nova import flags
from nova import log as logging from nova import log as logging
from nova import utils from nova import utils
@@ -82,6 +83,35 @@ class Server(object):
log=WritableLogger(logger)) log=WritableLogger(logger))
class Request(webob.Request):
def best_match_content_type(self):
"""
Determine the most acceptable content-type based on the
query extension then the Accept header
"""
parts = self.path.rsplit(".", 1)
if len(parts) > 1:
format = parts[1]
if format in ["json", "xml"]:
return "application/{0}".format(parts[1])
ctypes = ["application/json", "application/xml"]
bm = self.accept.best_match(ctypes)
return bm or "application/json"
def get_content_type(self):
try:
ct = self.headers["Content-Type"]
assert ct in ("application/xml", "application/json")
return ct
except Exception:
raise webob.exc.HTTPBadRequest("Invalid content type")
class Application(object): class Application(object):
"""Base WSGI application wrapper. Subclasses need to implement __call__.""" """Base WSGI application wrapper. Subclasses need to implement __call__."""
@@ -113,7 +143,7 @@ class Application(object):
def __call__(self, environ, start_response): def __call__(self, environ, start_response):
r"""Subclasses will probably want to implement __call__ like this: r"""Subclasses will probably want to implement __call__ like this:
@webob.dec.wsgify @webob.dec.wsgify(RequestClass=Request)
def __call__(self, req): def __call__(self, req):
# Any of the following objects work as responses: # Any of the following objects work as responses:
@@ -199,7 +229,7 @@ class Middleware(Application):
"""Do whatever you'd like to the response.""" """Do whatever you'd like to the response."""
return response return response
@webob.dec.wsgify @webob.dec.wsgify(RequestClass=Request)
def __call__(self, req): def __call__(self, req):
response = self.process_request(req) response = self.process_request(req)
if response: if response:
@@ -212,7 +242,7 @@ class Debug(Middleware):
"""Helper class that can be inserted into any WSGI application chain """Helper class that can be inserted into any WSGI application chain
to get information about the request and response.""" to get information about the request and response."""
@webob.dec.wsgify @webob.dec.wsgify(RequestClass=Request)
def __call__(self, req): def __call__(self, req):
print ("*" * 40) + " REQUEST ENVIRON" print ("*" * 40) + " REQUEST ENVIRON"
for key, value in req.environ.items(): for key, value in req.environ.items():
@@ -276,7 +306,7 @@ class Router(object):
self._router = routes.middleware.RoutesMiddleware(self._dispatch, self._router = routes.middleware.RoutesMiddleware(self._dispatch,
self.map) self.map)
@webob.dec.wsgify @webob.dec.wsgify(RequestClass=Request)
def __call__(self, req): def __call__(self, req):
""" """
Route the incoming request to a controller based on self.map. Route the incoming request to a controller based on self.map.
@@ -285,7 +315,7 @@ class Router(object):
return self._router return self._router
@staticmethod @staticmethod
@webob.dec.wsgify @webob.dec.wsgify(RequestClass=Request)
def _dispatch(req): def _dispatch(req):
""" """
Called by self._router after matching the incoming request to a route Called by self._router after matching the incoming request to a route
@@ -304,11 +334,11 @@ class Controller(object):
WSGI app that reads routing information supplied by RoutesMiddleware WSGI app that reads routing information supplied by RoutesMiddleware
and calls the requested action method upon itself. All action methods and calls the requested action method upon itself. All action methods
must, in addition to their normal parameters, accept a 'req' argument must, in addition to their normal parameters, accept a 'req' argument
which is the incoming webob.Request. They raise a webob.exc exception, which is the incoming wsgi.Request. They raise a webob.exc exception,
or return a dict which will be serialized by requested content type. or return a dict which will be serialized by requested content type.
""" """
@webob.dec.wsgify @webob.dec.wsgify(RequestClass=Request)
def __call__(self, req): def __call__(self, req):
""" """
Call the method specified in req.environ by RoutesMiddleware. Call the method specified in req.environ by RoutesMiddleware.
@@ -318,32 +348,45 @@ class Controller(object):
method = getattr(self, action) method = getattr(self, action)
del arg_dict['controller'] del arg_dict['controller']
del arg_dict['action'] del arg_dict['action']
if 'format' in arg_dict:
del arg_dict['format']
arg_dict['req'] = req arg_dict['req'] = req
result = method(**arg_dict) result = method(**arg_dict)
if type(result) is dict: if type(result) is dict:
return self._serialize(result, req) content_type = req.best_match_content_type()
body = self._serialize(result, content_type)
response = webob.Response()
response.headers["Content-Type"] = content_type
response.body = body
return response
else: else:
return result return result
def _serialize(self, data, request): def _serialize(self, data, content_type):
""" """
Serialize the given dict to the response type requested in request. Serialize the given dict to the provided content_type.
Uses self._serialization_metadata if it exists, which is a dict mapping Uses self._serialization_metadata if it exists, which is a dict mapping
MIME types to information needed to serialize to that type. MIME types to information needed to serialize to that type.
""" """
_metadata = getattr(type(self), "_serialization_metadata", {}) _metadata = getattr(type(self), "_serialization_metadata", {})
serializer = Serializer(request.environ, _metadata) serializer = Serializer(_metadata)
return serializer.to_content_type(data) try:
return serializer.serialize(data, content_type)
except exception.InvalidContentType:
raise webob.exc.HTTPNotAcceptable()
def _deserialize(self, data, request): def _deserialize(self, data, content_type):
""" """
Deserialize the request body to the response type requested in request. Deserialize the request body to the specefied content type.
Uses self._serialization_metadata if it exists, which is a dict mapping Uses self._serialization_metadata if it exists, which is a dict mapping
MIME types to information needed to serialize to that type. MIME types to information needed to serialize to that type.
""" """
_metadata = getattr(type(self), "_serialization_metadata", {}) _metadata = getattr(type(self), "_serialization_metadata", {})
serializer = Serializer(request.environ, _metadata) serializer = Serializer(_metadata)
return serializer.deserialize(data) return serializer.deserialize(data, content_type)
class Serializer(object): class Serializer(object):
@@ -351,50 +394,52 @@ class Serializer(object):
Serializes and deserializes dictionaries to certain MIME types. Serializes and deserializes dictionaries to certain MIME types.
""" """
def __init__(self, environ, metadata=None): def __init__(self, metadata=None):
""" """
Create a serializer based on the given WSGI environment. Create a serializer based on the given WSGI environment.
'metadata' is an optional dict mapping MIME types to information 'metadata' is an optional dict mapping MIME types to information
needed to serialize a dictionary to that type. needed to serialize a dictionary to that type.
""" """
self.metadata = metadata or {} self.metadata = metadata or {}
req = webob.Request.blank('', environ)
suffix = req.path_info.split('.')[-1].lower()
if suffix == 'json':
self.handler = self._to_json
elif suffix == 'xml':
self.handler = self._to_xml
elif 'application/json' in req.accept:
self.handler = self._to_json
elif 'application/xml' in req.accept:
self.handler = self._to_xml
else:
# This is the default
self.handler = self._to_json
def to_content_type(self, data): def _get_serialize_handler(self, content_type):
handlers = {
"application/json": self._to_json,
"application/xml": self._to_xml,
}
try:
return handlers[content_type]
except Exception:
raise exception.InvalidContentType()
def serialize(self, data, content_type):
""" """
Serialize a dictionary into a string. Serialize a dictionary into a string of the specified content type.
The format of the string will be decided based on the Content Type
requested in self.environ: by Accept: header, or by URL suffix.
""" """
return self.handler(data) return self._get_serialize_handler(content_type)(data)
def deserialize(self, datastring): def deserialize(self, datastring, content_type):
""" """
Deserialize a string to a dictionary. Deserialize a string to a dictionary.
The string must be in the format of a supported MIME type. The string must be in the format of a supported MIME type.
""" """
datastring = datastring.strip() return self.get_deserialize_handler(content_type)(datastring)
def get_deserialize_handler(self, content_type):
handlers = {
"application/json": self._from_json,
"application/xml": self._from_xml,
}
try: try:
is_xml = (datastring[0] == '<') return handlers[content_type]
if not is_xml: except Exception:
return utils.loads(datastring) raise exception.InvalidContentType()
return self._from_xml(datastring)
except: def _from_json(self, datastring):
return None return utils.loads(datastring)
def _from_xml(self, datastring): def _from_xml(self, datastring):
xmldata = self.metadata.get('application/xml', {}) xmldata = self.metadata.get('application/xml', {})