Merge trunk (naïvely).

This commit is contained in:
Soren Hansen 2010-12-17 13:07:43 +01:00
commit 2be7a90e54
54 changed files with 785 additions and 1326 deletions

View File

@ -6,6 +6,7 @@ Chris Behrens <cbehrens@codestud.com>
Chmouel Boudjnah <chmouel@chmouel.com>
Dean Troyer <dtroyer@gmail.com>
Devin Carlen <devin.carlen@gmail.com>
Eldar Nugaev <enugaev@griddynamics.com>
Eric Day <eday@oddments.org>
Ewan Mellor <ewan.mellor@citrix.com>
Hisaki Ohara <hisaki.ohara@intel.com>

View File

@ -17,10 +17,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Nova API daemon.
"""
"""Starter script for Nova API."""
import gettext
import os
import sys
@ -32,9 +32,13 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
gettext.install('nova', unicode=1)
from nova import api
from nova import flags
from nova import utils
from nova import server
from nova import wsgi
FLAGS = flags.FLAGS
flags.DEFINE_integer('osapi_port', 8774, 'OpenStack API port')
@ -43,15 +47,10 @@ flags.DEFINE_integer('ec2api_port', 8773, 'EC2 API port')
flags.DEFINE_string('ec2api_host', '0.0.0.0', 'EC2 API host')
def main(_args):
from nova import api
from nova import wsgi
if __name__ == '__main__':
utils.default_flagfile()
FLAGS(sys.argv)
server = wsgi.Server()
server.start(api.API('os'), FLAGS.osapi_port, host=FLAGS.osapi_host)
server.start(api.API('ec2'), FLAGS.ec2api_port, host=FLAGS.ec2api_host)
server.wait()
if __name__ == '__main__':
utils.default_flagfile()
server.serve('nova-api', main)

65
bin/nova-combined Executable file
View File

@ -0,0 +1,65 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Combined starter script for Nova services."""
import eventlet
eventlet.monkey_patch()
import os
import sys
# If ../nova/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
from nova import api
from nova import flags
from nova import service
from nova import utils
from nova import wsgi
FLAGS = flags.FLAGS
flags.DEFINE_integer('osapi_port', 8774, 'OpenStack API port')
flags.DEFINE_string('osapi_host', '0.0.0.0', 'OpenStack API host')
flags.DEFINE_integer('ec2api_port', 8773, 'EC2 API port')
flags.DEFINE_string('ec2api_host', '0.0.0.0', 'EC2 API host')
if __name__ == '__main__':
utils.default_flagfile()
FLAGS(sys.argv)
compute = service.Service.create(binary='nova-compute')
network = service.Service.create(binary='nova-network')
volume = service.Service.create(binary='nova-volume')
scheduler = service.Service.create(binary='nova-scheduler')
#objectstore = service.Service.create(binary='nova-objectstore')
service.serve(compute, network, volume, scheduler)
server = wsgi.Server()
server.start(api.API('os'), FLAGS.osapi_port, host=FLAGS.osapi_host)
server.start(api.API('ec2'), FLAGS.ec2api_port, host=FLAGS.ec2api_host)
server.wait()

View File

@ -17,10 +17,12 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
Twistd daemon for the nova compute nodes.
"""
"""Starter script for Nova Compute."""
import eventlet
eventlet.monkey_patch()
import gettext
import os
import sys
@ -32,14 +34,12 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
from nova import service
from nova import twistd
from nova import utils
gettext.install('nova', unicode=1)
from nova import service
from nova import utils
if __name__ == '__main__':
utils.default_flagfile()
twistd.serve(__file__)
if __name__ == '__builtin__':
application = service.Service.create() # pylint: disable=C0103
service.serve()
service.wait()

View File

@ -21,6 +21,7 @@
Handle lease database updates from DHCP servers.
"""
import gettext
import logging
import os
import sys
@ -33,6 +34,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
gettext.install('nova', unicode=1)
from nova import context
from nova import db
from nova import flags

View File

@ -21,6 +21,7 @@
Download images from Canonical Image Store
"""
import gettext
import json
import os
import tempfile
@ -37,6 +38,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
gettext.install('nova', unicode=1)
from nova import flags
from nova import utils
from nova.objectstore import image

View File

@ -21,6 +21,7 @@
Daemon for Nova RRD based instance resource monitoring.
"""
import gettext
import os
import logging
import sys
@ -34,6 +35,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
gettext.install('nova', unicode=1)
from nova import utils
from nova import twistd
from nova.compute import monitor

View File

@ -53,6 +53,7 @@
CLI interface for nova management.
"""
import gettext
import logging
import os
import sys
@ -68,6 +69,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
gettext.install('nova', unicode=1)
from nova import context
from nova import db
from nova import exception

View File

@ -17,10 +17,12 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
Twistd daemon for the nova network nodes.
"""
"""Starter script for Nova Network."""
import eventlet
eventlet.monkey_patch()
import gettext
import os
import sys
@ -32,14 +34,12 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
from nova import service
from nova import twistd
from nova import utils
gettext.install('nova', unicode=1)
from nova import service
from nova import utils
if __name__ == '__main__':
utils.default_flagfile()
twistd.serve(__file__)
if __name__ == '__builtin__':
application = service.Service.create() # pylint: disable-msg=C0103
service.serve()
service.wait()

View File

@ -21,6 +21,7 @@
Twisted daemon for nova objectstore. Supports S3 API.
"""
import gettext
import os
import sys
@ -32,6 +33,8 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
gettext.install('nova', unicode=1)
from nova import flags
from nova import utils
from nova import twistd

View File

@ -17,10 +17,12 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
Twistd daemon for the nova scheduler nodes.
"""
"""Starter script for Nova Scheduler."""
import eventlet
eventlet.monkey_patch()
import gettext
import os
import sys
@ -32,14 +34,12 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
from nova import service
from nova import twistd
from nova import utils
gettext.install('nova', unicode=1)
from nova import service
from nova import utils
if __name__ == '__main__':
utils.default_flagfile()
twistd.serve(__file__)
if __name__ == '__builtin__':
application = service.Service.create()
service.serve()
service.wait()

View File

@ -17,10 +17,12 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
Twistd daemon for the nova volume nodes.
"""
"""Starter script for Nova Volume."""
import eventlet
eventlet.monkey_patch()
import gettext
import os
import sys
@ -32,14 +34,12 @@ possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
from nova import service
from nova import twistd
from nova import utils
gettext.install('nova', unicode=1)
from nova import service
from nova import utils
if __name__ == '__main__':
utils.default_flagfile()
twistd.serve(__file__)
if __name__ == '__builtin__':
application = service.Service.create() # pylint: disable-msg=C0103
service.serve()
service.wait()

View File

@ -72,7 +72,7 @@ fi
# You should only have to run this once
if [ "$CMD" == "install" ]; then
sudo apt-get install -y python-software-properties
sudo add-apt-repository ppa:nova-core/ppa
sudo add-apt-repository ppa:nova-core/trunk
sudo apt-get update
sudo apt-get install -y dnsmasq kpartx kvm gawk iptables ebtables
sudo apt-get install -y user-mode-linux kvm libvirt-bin

View File

@ -26,8 +26,6 @@ import logging
import os
import tempfile
from twisted.internet import defer
from nova import exception
from nova import flags
@ -39,7 +37,6 @@ flags.DEFINE_integer('block_size', 1024 * 1024 * 256,
'block_size to use for dd')
@defer.inlineCallbacks
def partition(infile, outfile, local_bytes=0, resize=True,
local_type='ext2', execute=None):
"""
@ -64,10 +61,10 @@ def partition(infile, outfile, local_bytes=0, resize=True,
file_size = os.path.getsize(infile)
if resize and file_size < FLAGS.minimum_root_size:
last_sector = FLAGS.minimum_root_size / sector_size - 1
yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
% (infile, last_sector, sector_size))
yield execute('e2fsck -fp %s' % infile, check_exit_code=False)
yield execute('resize2fs %s' % infile)
execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
% (infile, last_sector, sector_size))
execute('e2fsck -fp %s' % infile, check_exit_code=False)
execute('resize2fs %s' % infile)
file_size = FLAGS.minimum_root_size
elif file_size % sector_size != 0:
logging.warn("Input partition size not evenly divisible by"
@ -86,37 +83,34 @@ def partition(infile, outfile, local_bytes=0, resize=True,
last_sector = local_last # e
# create an empty file
yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
% (outfile, mbr_last, sector_size))
execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
% (outfile, mbr_last, sector_size))
# make mbr partition
yield execute('parted --script %s mklabel msdos' % outfile)
execute('parted --script %s mklabel msdos' % outfile)
# append primary file
yield execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append'
% (infile, outfile, FLAGS.block_size))
execute('dd if=%s of=%s bs=%s conv=notrunc,fsync oflag=append'
% (infile, outfile, FLAGS.block_size))
# make primary partition
yield execute('parted --script %s mkpart primary %ds %ds'
% (outfile, primary_first, primary_last))
execute('parted --script %s mkpart primary %ds %ds'
% (outfile, primary_first, primary_last))
if local_bytes > 0:
# make the file bigger
yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
% (outfile, last_sector, sector_size))
execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d'
% (outfile, last_sector, sector_size))
# make and format local partition
yield execute('parted --script %s mkpartfs primary %s %ds %ds'
% (outfile, local_type, local_first, local_last))
execute('parted --script %s mkpartfs primary %s %ds %ds'
% (outfile, local_type, local_first, local_last))
@defer.inlineCallbacks
def extend(image, size, execute):
file_size = os.path.getsize(image)
if file_size >= size:
return
yield execute('truncate -s size %s' % (image,))
return execute('truncate -s size %s' % (image,))
@defer.inlineCallbacks
def inject_data(image, key=None, net=None, partition=None, execute=None):
"""Injects a ssh key and optionally net data into a disk image.
@ -126,14 +120,14 @@ def inject_data(image, key=None, net=None, partition=None, execute=None):
If partition is not specified it mounts the image as a single partition.
"""
out, err = yield execute('sudo losetup --find --show %s' % image)
out, err = execute('sudo losetup --find --show %s' % image)
if err:
raise exception.Error('Could not attach image to loopback: %s' % err)
device = out.strip()
try:
if not partition is None:
# create partition
out, err = yield execute('sudo kpartx -a %s' % device)
out, err = execute('sudo kpartx -a %s' % device)
if err:
raise exception.Error('Failed to load partition: %s' % err)
mapped_device = '/dev/mapper/%sp%s' % (device.split('/')[-1],
@ -149,12 +143,12 @@ def inject_data(image, key=None, net=None, partition=None, execute=None):
mapped_device)
# Configure ext2fs so that it doesn't auto-check every N boots
out, err = yield execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device)
out, err = execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device)
tmpdir = tempfile.mkdtemp()
try:
# mount loopback to dir
out, err = yield execute(
out, err = execute(
'sudo mount %s %s' % (mapped_device, tmpdir))
if err:
raise exception.Error('Failed to mount filesystem: %s' % err)
@ -162,24 +156,23 @@ def inject_data(image, key=None, net=None, partition=None, execute=None):
try:
if key:
# inject key file
yield _inject_key_into_fs(key, tmpdir, execute=execute)
_inject_key_into_fs(key, tmpdir, execute=execute)
if net:
yield _inject_net_into_fs(net, tmpdir, execute=execute)
_inject_net_into_fs(net, tmpdir, execute=execute)
finally:
# unmount device
yield execute('sudo umount %s' % mapped_device)
execute('sudo umount %s' % mapped_device)
finally:
# remove temporary directory
yield execute('rmdir %s' % tmpdir)
execute('rmdir %s' % tmpdir)
if not partition is None:
# remove partitions
yield execute('sudo kpartx -d %s' % device)
execute('sudo kpartx -d %s' % device)
finally:
# remove loopback
yield execute('sudo losetup --detach %s' % device)
execute('sudo losetup --detach %s' % device)
@defer.inlineCallbacks
def _inject_key_into_fs(key, fs, execute=None):
"""Add the given public ssh key to root's authorized_keys.
@ -187,22 +180,21 @@ def _inject_key_into_fs(key, fs, execute=None):
fs is the path to the base of the filesystem into which to inject the key.
"""
sshdir = os.path.join(fs, 'root', '.ssh')
yield execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter
yield execute('sudo chown root %s' % sshdir)
yield execute('sudo chmod 700 %s' % sshdir)
execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter
execute('sudo chown root %s' % sshdir)
execute('sudo chmod 700 %s' % sshdir)
keyfile = os.path.join(sshdir, 'authorized_keys')
yield execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n')
execute('sudo tee -a %s' % keyfile, '\n' + key.strip() + '\n')
@defer.inlineCallbacks
def _inject_net_into_fs(net, fs, execute=None):
"""Inject /etc/network/interfaces into the filesystem rooted at fs.
net is the contents of /etc/network/interfaces.
"""
netdir = os.path.join(os.path.join(fs, 'etc'), 'network')
yield execute('sudo mkdir -p %s' % netdir) # existing dir doesn't matter
yield execute('sudo chown root:root %s' % netdir)
yield execute('sudo chmod 755 %s' % netdir)
execute('sudo mkdir -p %s' % netdir) # existing dir doesn't matter
execute('sudo chown root:root %s' % netdir)
execute('sudo chmod 755 %s' % netdir)
netfile = os.path.join(netdir, 'interfaces')
yield execute('sudo tee %s' % netfile, net)
execute('sudo tee %s' % netfile, net)

View File

@ -37,8 +37,6 @@ terminating it.
import datetime
import logging
from twisted.internet import defer
from nova import exception
from nova import flags
from nova import manager
@ -78,13 +76,11 @@ class ComputeManager(manager.Manager):
state = power_state.NOSTATE
self.db.instance_set_state(context, instance_id, state)
@defer.inlineCallbacks
@exception.wrap_exception
def refresh_security_group(self, context, security_group_id, **_kwargs):
"""This call passes stright through to the virtualization driver."""
yield self.driver.refresh_security_group(security_group_id)
self.driver.refresh_security_group(security_group_id)
@defer.inlineCallbacks
@exception.wrap_exception
def run_instance(self, context, instance_id, **_kwargs):
"""Launch a new instance with specified options."""
@ -105,7 +101,7 @@ class ComputeManager(manager.Manager):
'spawning')
try:
yield self.driver.spawn(instance_ref)
self.driver.spawn(instance_ref)
now = datetime.datetime.utcnow()
self.db.instance_update(context,
instance_id,
@ -119,7 +115,6 @@ class ComputeManager(manager.Manager):
self._update_state(context, instance_id)
@defer.inlineCallbacks
@exception.wrap_exception
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this machine."""
@ -134,12 +129,11 @@ class ComputeManager(manager.Manager):
self.db.instance_destroy(context, instance_id)
raise exception.Error('trying to destroy already destroyed'
' instance: %s' % instance_id)
yield self.driver.destroy(instance_ref)
self.driver.destroy(instance_ref)
# TODO(ja): should we keep it in a terminated state for a bit?
self.db.instance_destroy(context, instance_id)
@defer.inlineCallbacks
@exception.wrap_exception
def reboot_instance(self, context, instance_id):
"""Reboot an instance on this server."""
@ -159,10 +153,9 @@ class ComputeManager(manager.Manager):
instance_id,
power_state.NOSTATE,
'rebooting')
yield self.driver.reboot(instance_ref)
self.driver.reboot(instance_ref)
self._update_state(context, instance_id)
@defer.inlineCallbacks
@exception.wrap_exception
def rescue_instance(self, context, instance_id):
"""Rescue an instance on this server."""
@ -175,10 +168,9 @@ class ComputeManager(manager.Manager):
instance_id,
power_state.NOSTATE,
'rescuing')
yield self.driver.rescue(instance_ref)
self.driver.rescue(instance_ref)
self._update_state(context, instance_id)
@defer.inlineCallbacks
@exception.wrap_exception
def unrescue_instance(self, context, instance_id):
"""Rescue an instance on this server."""
@ -191,7 +183,7 @@ class ComputeManager(manager.Manager):
instance_id,
power_state.NOSTATE,
'unrescuing')
yield self.driver.unrescue(instance_ref)
self.driver.unrescue(instance_ref)
self._update_state(context, instance_id)
@exception.wrap_exception
@ -203,7 +195,6 @@ class ComputeManager(manager.Manager):
return self.driver.get_console_output(instance_ref)
@defer.inlineCallbacks
@exception.wrap_exception
def attach_volume(self, context, instance_id, volume_id, mountpoint):
"""Attach a volume to an instance."""
@ -211,12 +202,12 @@ class ComputeManager(manager.Manager):
logging.debug("instance %s: attaching volume %s to %s", instance_id,
volume_id, mountpoint)
instance_ref = self.db.instance_get(context, instance_id)
dev_path = yield self.volume_manager.setup_compute_volume(context,
volume_id)
dev_path = self.volume_manager.setup_compute_volume(context,
volume_id)
try:
yield self.driver.attach_volume(instance_ref['name'],
dev_path,
mountpoint)
self.driver.attach_volume(instance_ref['name'],
dev_path,
mountpoint)
self.db.volume_attached(context,
volume_id,
instance_id,
@ -227,12 +218,12 @@ class ComputeManager(manager.Manager):
# ecxception below.
logging.exception("instance %s: attach failed %s, removing",
instance_id, mountpoint)
yield self.volume_manager.remove_compute_volume(context,
volume_id)
self.volume_manager.remove_compute_volume(context,
volume_id)
raise exc
defer.returnValue(True)
@defer.inlineCallbacks
return True
@exception.wrap_exception
def detach_volume(self, context, instance_id, volume_id):
"""Detach a volume from an instance."""
@ -246,8 +237,8 @@ class ComputeManager(manager.Manager):
logging.warn("Detaching volume from unknown instance %s",
instance_ref['name'])
else:
yield self.driver.detach_volume(instance_ref['name'],
volume_ref['mountpoint'])
yield self.volume_manager.remove_compute_volume(context, volume_id)
self.driver.detach_volume(instance_ref['name'],
volume_ref['mountpoint'])
self.volume_manager.remove_compute_volume(context, volume_id)
self.db.volume_detached(context, volume_id)
defer.returnValue(True)
return True

View File

@ -159,6 +159,7 @@ class StrWrapper(object):
return str(val)
raise KeyError(name)
FLAGS = FlagValues()
gflags.FLAGS = FLAGS
gflags.DEFINE_flag(gflags.HelpFlag(), FLAGS)
@ -183,6 +184,12 @@ DEFINE_list = _wrapper(gflags.DEFINE_list)
DEFINE_spaceseplist = _wrapper(gflags.DEFINE_spaceseplist)
DEFINE_multistring = _wrapper(gflags.DEFINE_multistring)
DEFINE_multi_int = _wrapper(gflags.DEFINE_multi_int)
DEFINE_flag = _wrapper(gflags.DEFINE_flag)
HelpFlag = gflags.HelpFlag
HelpshortFlag = gflags.HelpshortFlag
HelpXMLFlag = gflags.HelpXMLFlag
def DECLARE(name, module_string, flag_values=FLAGS):

View File

@ -55,7 +55,6 @@ from nova import utils
from nova import flags
from nova.db import base
from twisted.internet import defer
FLAGS = flags.FLAGS
@ -67,10 +66,9 @@ class Manager(base.Base):
self.host = host
super(Manager, self).__init__(db_driver)
@defer.inlineCallbacks
def periodic_tasks(self, context=None):
"""Tasks to be run at a periodic interval"""
yield
pass
def init_host(self):
"""Do any initialization that needs to be run if this is a standalone

View File

@ -49,7 +49,6 @@ import logging
import math
import IPy
from twisted.internet import defer
from nova import context
from nova import db
@ -399,10 +398,9 @@ class VlanManager(NetworkManager):
instances in its subnet.
"""
@defer.inlineCallbacks
def periodic_tasks(self, context=None):
"""Tasks to be run at a periodic interval."""
yield super(VlanManager, self).periodic_tasks(context)
super(VlanManager, self).periodic_tasks(context)
now = datetime.datetime.utcnow()
timeout = FLAGS.fixed_ip_disassociate_timeout
time = now - datetime.timedelta(seconds=timeout)

View File

@ -21,7 +21,6 @@ Take uploaded bucket contents and register them as disk images (AMIs).
Requires decryption using keys in the manifest.
"""
# TODO(jesse): Got these from Euca2ools, will need to revisit them
import binascii
import glob
@ -29,7 +28,6 @@ import json
import os
import shutil
import tarfile
import tempfile
from xml.etree import ElementTree
from nova import exception
@ -199,12 +197,17 @@ class Image(object):
except:
ramdisk_id = None
try:
arch = manifest.find("machine_configuration/architecture").text
except:
arch = 'x86_64'
info = {
'imageId': image_id,
'imageLocation': image_location,
'imageOwnerId': context.project_id,
'isPublic': False, # FIXME: grab public from manifest
'architecture': 'x86_64', # FIXME: grab architecture from manifest
'architecture': arch,
'imageType': image_type}
if kernel_id:
@ -264,6 +267,7 @@ class Image(object):
if err:
raise exception.Error("Failed to decrypt initialization "
"vector: %s" % err)
_out, err = utils.execute(
'openssl enc -d -aes-128-cbc -in %s -K %s -iv %s -out %s'
% (encrypted_filename, key, iv, decrypted_filename),

View File

@ -1,209 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2010 FathomDB Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Process pool using twisted threading
"""
import logging
import StringIO
from twisted.internet import defer
from twisted.internet import error
from twisted.internet import protocol
from twisted.internet import reactor
from nova import flags
from nova.exception import ProcessExecutionError
FLAGS = flags.FLAGS
flags.DEFINE_integer('process_pool_size', 4,
'Number of processes to use in the process pool')
# This is based on _BackRelay from twister.internal.utils, but modified to
# capture both stdout and stderr, without odd stderr handling, and also to
# handle stdin
class BackRelayWithInput(protocol.ProcessProtocol):
"""
Trivial protocol for communicating with a process and turning its output
into the result of a L{Deferred}.
@ivar deferred: A L{Deferred} which will be called back with all of stdout
and all of stderr as well (as a tuple). C{terminate_on_stderr} is true
and any bytes are received over stderr, this will fire with an
L{_ProcessExecutionError} instance and the attribute will be set to
C{None}.
@ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are
received over stderr, this attribute will refer to a L{Deferred} which
will be called back when the process ends. This C{Deferred} is also
associated with the L{_ProcessExecutionError} which C{deferred} fires
with earlier in this case so that users can determine when the process
has actually ended, in addition to knowing when bytes have been
received via stderr.
"""
def __init__(self, deferred, cmd, started_deferred=None,
terminate_on_stderr=False, check_exit_code=True,
process_input=None):
self.deferred = deferred
self.cmd = cmd
self.stdout = StringIO.StringIO()
self.stderr = StringIO.StringIO()
self.started_deferred = started_deferred
self.terminate_on_stderr = terminate_on_stderr
self.check_exit_code = check_exit_code
self.process_input = process_input
self.on_process_ended = None
def _build_execution_error(self, exit_code=None):
return ProcessExecutionError(cmd=self.cmd,
exit_code=exit_code,
stdout=self.stdout.getvalue(),
stderr=self.stderr.getvalue())
def errReceived(self, text):
self.stderr.write(text)
if self.terminate_on_stderr and (self.deferred is not None):
self.on_process_ended = defer.Deferred()
self.deferred.errback(self._build_execution_error())
self.deferred = None
self.transport.loseConnection()
def outReceived(self, text):
self.stdout.write(text)
def processEnded(self, reason):
if self.deferred is not None:
stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue()
exit_code = reason.value.exitCode
if self.check_exit_code and exit_code != 0:
self.deferred.errback(self._build_execution_error(exit_code))
else:
try:
if self.check_exit_code:
reason.trap(error.ProcessDone)
self.deferred.callback((stdout, stderr))
except:
# NOTE(justinsb): This logic is a little suspicious to me.
# If the callback throws an exception, then errback will
# be called also. However, this is what the unit tests
# test for.
exec_error = self._build_execution_error(exit_code)
self.deferred.errback(exec_error)
elif self.on_process_ended is not None:
self.on_process_ended.errback(reason)
def connectionMade(self):
if self.started_deferred:
self.started_deferred.callback(self)
if self.process_input:
self.transport.write(str(self.process_input))
self.transport.closeStdin()
def get_process_output(executable, args=None, env=None, path=None,
process_reactor=None, check_exit_code=True,
process_input=None, started_deferred=None,
terminate_on_stderr=False):
if process_reactor is None:
process_reactor = reactor
args = args and args or ()
env = env and env and {}
deferred = defer.Deferred()
cmd = executable
if args:
cmd = " ".join([cmd] + args)
logging.debug("Running cmd: %s", cmd)
process_handler = BackRelayWithInput(
deferred,
cmd,
started_deferred=started_deferred,
check_exit_code=check_exit_code,
process_input=process_input,
terminate_on_stderr=terminate_on_stderr)
# NOTE(vish): commands come in as unicode, but self.executes needs
# strings or process.spawn raises a deprecation warning
executable = str(executable)
if not args is None:
args = [str(x) for x in args]
process_reactor.spawnProcess(process_handler, executable,
(executable,) + tuple(args), env, path)
return deferred
class ProcessPool(object):
""" A simple process pool implementation using Twisted's Process bits.
This is pretty basic right now, but hopefully the API will be the correct
one so that it can be optimized later.
"""
def __init__(self, size=None):
self.size = size and size or FLAGS.process_pool_size
self._pool = defer.DeferredSemaphore(self.size)
def simple_execute(self, cmd, **kw):
""" Weak emulation of the old utils.execute() function.
This only exists as a way to quickly move old execute methods to
this new style of code.
NOTE(termie): This will break on args with spaces in them.
"""
parsed = cmd.split(' ')
executable, args = parsed[0], parsed[1:]
return self.execute(executable, args, **kw)
def execute(self, *args, **kw):
deferred = self._pool.acquire()
def _associate_process(proto):
deferred.process = proto.transport
return proto.transport
started = defer.Deferred()
started.addCallback(_associate_process)
kw.setdefault('started_deferred', started)
deferred.process = None
deferred.started = started
deferred.addCallback(lambda _: get_process_output(*args, **kw))
deferred.addBoth(self._release)
return deferred
def _release(self, retval=None):
self._pool.release()
return retval
class SharedPool(object):
_instance = None
def __init__(self):
if SharedPool._instance is None:
self.__class__._instance = ProcessPool()
def __getattr__(self, key):
return getattr(self._instance, key)
def simple_execute(cmd, **kwargs):
return SharedPool().simple_execute(cmd, **kwargs)

View File

@ -25,18 +25,18 @@ import json
import logging
import sys
import time
import traceback
import uuid
from carrot import connection as carrot_connection
from carrot import messaging
from eventlet import greenthread
from twisted.internet import defer
from twisted.internet import task
from nova import context
from nova import exception
from nova import fakerabbit
from nova import flags
from nova import context
from nova import utils
FLAGS = flags.FLAGS
@ -128,17 +128,9 @@ class Consumer(messaging.Consumer):
def attach_to_eventlet(self):
"""Only needed for unit tests!"""
def fetch_repeatedly():
while True:
self.fetch(enable_callbacks=True)
greenthread.sleep(0.1)
greenthread.spawn(fetch_repeatedly)
def attach_to_twisted(self):
"""Attach a callback to twisted that fires 10 times a second"""
loop = task.LoopingCall(self.fetch, enable_callbacks=True)
loop.start(interval=0.1)
return loop
timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
timer.start(0.1)
return timer
class Publisher(messaging.Publisher):
@ -196,11 +188,13 @@ class AdapterConsumer(TopicConsumer):
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
# NOTE(vish): magic is fun!
# pylint: disable-msg=W0142
d = defer.maybeDeferred(node_func, context=ctxt, **node_args)
if msg_id:
d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
d.addErrback(lambda e: msg_reply(msg_id, None, e))
try:
rval = node_func(context=ctxt, **node_args)
if msg_id:
msg_reply(msg_id, rval, None)
except Exception as e:
if msg_id:
msg_reply(msg_id, None, sys.exc_info())
return
@ -242,13 +236,15 @@ class DirectPublisher(Publisher):
def msg_reply(msg_id, reply=None, failure=None):
"""Sends a reply or an error on the channel signified by msg_id
failure should be a twisted failure object"""
failure should be a sys.exc_info() tuple.
"""
if failure:
message = failure.getErrorMessage()
traceback = failure.getTraceback()
message = str(failure[1])
tb = traceback.format_exception(*failure)
logging.error("Returning exception %s to caller", message)
logging.error(traceback)
failure = (failure.type.__name__, str(failure.value), traceback)
logging.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
conn = Connection.instance()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
@ -313,7 +309,6 @@ def call(context, topic, msg):
_pack_context(msg, context)
class WaitMessage(object):
def __call__(self, data, message):
"""Acks message and sets result."""
message.ack()
@ -337,41 +332,15 @@ def call(context, topic, msg):
except StopIteration:
pass
consumer.close()
# NOTE(termie): this is a little bit of a change from the original
# non-eventlet code where returning a Failure
# instance from a deferred call is very similar to
# raising an exception
if isinstance(wait_msg.result, Exception):
raise wait_msg.result
return wait_msg.result
def call_twisted(context, topic, msg):
"""Sends a message on a topic and wait for a response"""
LOG.debug("Making asynchronous call...")
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug("MSG_ID is %s" % (msg_id))
_pack_context(msg, context)
conn = Connection.instance()
d = defer.Deferred()
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
def deferred_receive(data, message):
"""Acks message and callbacks or errbacks"""
message.ack()
if data['failure']:
return d.errback(RemoteError(*data['failure']))
else:
return d.callback(data['result'])
consumer.register_callback(deferred_receive)
injected = consumer.attach_to_twisted()
# clean up after the injected listened and return x
d.addCallback(lambda x: injected.stop() and x or x)
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
publisher.close()
return d
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response"""
LOG.debug("Making asynchronous cast...")

View File

@ -1,151 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Base functionality for nova daemons - gradually being replaced with twistd.py.
"""
import daemon
from daemon import pidlockfile
import logging
import logging.handlers
import os
import signal
import sys
import time
from nova import flags
FLAGS = flags.FLAGS
flags.DEFINE_bool('daemonize', False, 'daemonize this process')
# NOTE(termie): right now I am defaulting to using syslog when we daemonize
# it may be better to do something else -shrug-
# NOTE(Devin): I think we should let each process have its own log file
# and put it in /var/logs/nova/(appname).log
# This makes debugging much easier and cuts down on sys log
# clutter.
flags.DEFINE_bool('use_syslog', True, 'output to syslog when daemonizing')
flags.DEFINE_string('logfile', None, 'log file to output to')
flags.DEFINE_string('logdir', None, 'directory to keep log files in '
'(will be prepended to $logfile)')
flags.DEFINE_string('pidfile', None, 'pid file to output to')
flags.DEFINE_string('working_directory', './', 'working directory...')
flags.DEFINE_integer('uid', os.getuid(), 'uid under which to run')
flags.DEFINE_integer('gid', os.getgid(), 'gid under which to run')
def stop(pidfile):
"""
Stop the daemon
"""
# Get the pid from the pidfile
try:
pid = int(open(pidfile, 'r').read().strip())
except IOError:
message = "pidfile %s does not exist. Daemon not running?\n"
sys.stderr.write(message % pidfile)
return
# Try killing the daemon process
try:
while 1:
os.kill(pid, signal.SIGTERM)
time.sleep(0.1)
except OSError, err:
err = str(err)
if err.find("No such process") > 0:
if os.path.exists(pidfile):
os.remove(pidfile)
else:
print str(err)
sys.exit(1)
def serve(name, main):
"""Controller for server"""
argv = FLAGS(sys.argv)
if not FLAGS.pidfile:
FLAGS.pidfile = '%s.pid' % name
logging.debug("Full set of FLAGS: \n\n\n")
for flag in FLAGS:
logging.debug("%s : %s", flag, FLAGS.get(flag, None))
action = 'start'
if len(argv) > 1:
action = argv.pop()
if action == 'stop':
stop(FLAGS.pidfile)
sys.exit()
elif action == 'restart':
stop(FLAGS.pidfile)
elif action == 'start':
pass
else:
print 'usage: %s [options] [start|stop|restart]' % argv[0]
sys.exit(1)
daemonize(argv, name, main)
def daemonize(args, name, main):
"""Does the work of daemonizing the process"""
logging.getLogger('amqplib').setLevel(logging.WARN)
files_to_keep = []
if FLAGS.daemonize:
logger = logging.getLogger()
formatter = logging.Formatter(
name + '(%(name)s): %(levelname)s %(message)s')
if FLAGS.use_syslog and not FLAGS.logfile:
syslog = logging.handlers.SysLogHandler(address='/dev/log')
syslog.setFormatter(formatter)
logger.addHandler(syslog)
files_to_keep.append(syslog.socket)
else:
if not FLAGS.logfile:
FLAGS.logfile = '%s.log' % name
if FLAGS.logdir:
FLAGS.logfile = os.path.join(FLAGS.logdir, FLAGS.logfile)
logfile = logging.FileHandler(FLAGS.logfile)
logfile.setFormatter(formatter)
logger.addHandler(logfile)
files_to_keep.append(logfile.stream)
stdin, stdout, stderr = None, None, None
else:
stdin, stdout, stderr = sys.stdin, sys.stdout, sys.stderr
if FLAGS.verbose:
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.WARNING)
with daemon.DaemonContext(
detach_process=FLAGS.daemonize,
working_directory=FLAGS.working_directory,
pidfile=pidlockfile.TimeoutPIDLockFile(FLAGS.pidfile,
acquire_timeout=1,
threaded=False),
stdin=stdin,
stdout=stdout,
stderr=stderr,
uid=FLAGS.uid,
gid=FLAGS.gid,
files_preserve=files_to_keep):
main(args)

View File

@ -17,21 +17,17 @@
# under the License.
"""
A service is a very thin wrapper around a Manager object. It exposes the
manager's public methods to other components of the system via rpc. It will
report state periodically to the database and is responsible for initiating
any periodic tasts that need to be executed on a given host.
This module contains Service, a generic baseclass for all workers.
Generic Node baseclass for all workers that run on hosts
"""
import inspect
import logging
import os
import sys
from twisted.internet import defer
from twisted.internet import task
from twisted.application import service
from eventlet import event
from eventlet import greenthread
from eventlet import greenpool
from nova import context
from nova import db
@ -50,8 +46,16 @@ flags.DEFINE_integer('periodic_interval', 60,
'seconds between running periodic tasks',
lower_bound=1)
flags.DEFINE_string('pidfile', None,
'pidfile to use for this service')
class Service(object, service.Service):
flags.DEFINE_flag(flags.HelpFlag())
flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag())
class Service(object):
"""Base class for workers that run on hosts."""
def __init__(self, host, binary, topic, manager, report_interval=None,
@ -64,8 +68,9 @@ class Service(object, service.Service):
self.periodic_interval = periodic_interval
super(Service, self).__init__(*args, **kwargs)
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
def startService(self): # pylint: disable-msg C0103
def start(self):
manager_class = utils.import_class(self.manager_class_name)
self.manager = manager_class(host=self.host, *self.saved_args,
**self.saved_kwargs)
@ -80,26 +85,29 @@ class Service(object, service.Service):
except exception.NotFound:
self._create_service_ref(ctxt)
conn = rpc.Connection.instance()
conn1 = rpc.Connection.instance(new=True)
conn2 = rpc.Connection.instance(new=True)
if self.report_interval:
consumer_all = rpc.AdapterConsumer(
connection=conn,
connection=conn1,
topic=self.topic,
proxy=self)
consumer_node = rpc.AdapterConsumer(
connection=conn,
connection=conn2,
topic='%s.%s' % (self.topic, self.host),
proxy=self)
consumer_all.attach_to_twisted()
consumer_node.attach_to_twisted()
self.timers.append(consumer_all.attach_to_eventlet())
self.timers.append(consumer_node.attach_to_eventlet())
pulse = task.LoopingCall(self.report_state)
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)
self.timers.append(pulse)
if self.periodic_interval:
pulse = task.LoopingCall(self.periodic_tasks)
pulse.start(interval=self.periodic_interval, now=False)
periodic = utils.LoopingCall(self.periodic_tasks)
periodic.start(interval=self.periodic_interval, now=False)
self.timers.append(periodic)
def _create_service_ref(self, context):
service_ref = db.service_create(context,
@ -147,25 +155,28 @@ class Service(object, service.Service):
service_obj = cls(host, binary, topic, manager,
report_interval, periodic_interval)
# This is the parent service that twistd will be looking for when it
# parses this file, return it so that we can get it into globals.
application = service.Application(binary)
service_obj.setServiceParent(application)
return application
return service_obj
def kill(self):
"""Destroy the service object in the datastore"""
self.stop()
try:
db.service_destroy(context.get_admin_context(), self.service_id)
except exception.NotFound:
logging.warn("Service killed that has no database entry")
@defer.inlineCallbacks
def stop(self):
for x in self.timers:
try:
x.stop()
except Exception:
pass
self.timers = []
def periodic_tasks(self):
"""Tasks to be run at a periodic interval"""
yield self.manager.periodic_tasks(context.get_admin_context())
self.manager.periodic_tasks(context.get_admin_context())
@defer.inlineCallbacks
def report_state(self):
"""Update the state of this service in the datastore."""
ctxt = context.get_admin_context()
@ -192,4 +203,32 @@ class Service(object, service.Service):
if not getattr(self, "model_disconnected", False):
self.model_disconnected = True
logging.exception("model server went away")
yield
def serve(*services):
argv = FLAGS(sys.argv)
if not services:
services = [Service.create()]
name = '_'.join(x.binary for x in services)
logging.debug("Serving %s" % name)
logging.getLogger('amqplib').setLevel(logging.WARN)
if FLAGS.verbose:
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.WARNING)
logging.debug("Full set of FLAGS:")
for flag in FLAGS:
logging.debug("%s : %s" % (flag, FLAGS.get(flag, None)))
for x in services:
x.start()
def wait():
while True:
greenthread.sleep(5)

View File

@ -25,11 +25,12 @@ and some black magic for inline callbacks.
import datetime
import sys
import time
import unittest
import mox
import stubout
from twisted.internet import defer
from twisted.trial import unittest
from twisted.trial import unittest as trial_unittest
from nova import context
from nova import db
@ -55,11 +56,11 @@ def skip_if_fake(func):
return _skipper
class TrialTestCase(unittest.TestCase):
class TestCase(unittest.TestCase):
"""Test case base class for all unit tests"""
def setUp(self):
"""Run before each test method to initialize test environment"""
super(TrialTestCase, self).setUp()
super(TestCase, self).setUp()
# NOTE(vish): We need a better method for creating fixtures for tests
# now that we have some required db setup for the system
# to work properly.
@ -94,7 +95,87 @@ class TrialTestCase(unittest.TestCase):
db.fixed_ip_disassociate_all_by_timeout(ctxt, FLAGS.host,
self.start)
db.network_disassociate_all(ctxt)
rpc.Consumer.attach_to_twisted = self.originalAttach
rpc.Consumer.attach_to_eventlet = self.originalAttach
for x in self.injected:
try:
x.stop()
except AssertionError:
pass
if FLAGS.fake_rabbit:
fakerabbit.reset_all()
db.security_group_destroy_all(ctxt)
super(TestCase, self).tearDown()
finally:
self.reset_flags()
def flags(self, **kw):
"""Override flag variables for a test"""
for k, v in kw.iteritems():
if k in self.flag_overrides:
self.reset_flags()
raise Exception(
'trying to override already overriden flag: %s' % k)
self.flag_overrides[k] = getattr(FLAGS, k)
setattr(FLAGS, k, v)
def reset_flags(self):
"""Resets all flag variables for the test. Runs after each test"""
FLAGS.Reset()
for k, v in self._original_flags.iteritems():
setattr(FLAGS, k, v)
def _monkey_patch_attach(self):
self.originalAttach = rpc.Consumer.attach_to_eventlet
def _wrapped(innerSelf):
rv = self.originalAttach(innerSelf)
self.injected.append(rv)
return rv
_wrapped.func_name = self.originalAttach.func_name
rpc.Consumer.attach_to_eventlet = _wrapped
class TrialTestCase(trial_unittest.TestCase):
"""Test case base class for all unit tests"""
def setUp(self):
"""Run before each test method to initialize test environment"""
super(TrialTestCase, self).setUp()
# NOTE(vish): We need a better method for creating fixtures for tests
# now that we have some required db setup for the system
# to work properly.
self.start = datetime.datetime.utcnow()
ctxt = context.get_admin_context()
if db.network_count(ctxt) != 5:
network_manager.VlanManager().create_networks(ctxt,
FLAGS.fixed_range,
5, 16,
FLAGS.vlan_start,
FLAGS.vpn_start)
# emulate some of the mox stuff, we can't use the metaclass
# because it screws with our generators
self.mox = mox.Mox()
self.stubs = stubout.StubOutForTesting()
self.flag_overrides = {}
self.injected = []
self._original_flags = FLAGS.FlagValuesDict()
def tearDown(self):
"""Runs after each test method to finalize/tear down test
environment."""
try:
self.mox.UnsetStubs()
self.stubs.UnsetAll()
self.stubs.SmartUnsetAll()
self.mox.VerifyAll()
# NOTE(vish): Clean up any ips associated during the test.
ctxt = context.get_admin_context()
db.fixed_ip_disassociate_all_by_timeout(ctxt, FLAGS.host,
self.start)
db.network_disassociate_all(ctxt)
for x in self.injected:
try:
x.stop()
@ -147,14 +228,3 @@ class TrialTestCase(unittest.TestCase):
return d
_wrapped.func_name = func.func_name
return _wrapped
def _monkey_patch_attach(self):
self.originalAttach = rpc.Consumer.attach_to_twisted
def _wrapped(innerSelf):
rv = self.originalAttach(innerSelf)
self.injected.append(rv)
return rv
_wrapped.func_name = self.originalAttach.func_name
rpc.Consumer.attach_to_twisted = _wrapped

View File

@ -29,3 +29,8 @@
.. moduleauthor:: Manish Singh <yosh@gimp.org>
.. moduleauthor:: Andy Smith <andy@anarkystic.com>
"""
# See http://code.google.com/p/python-nose/issues/detail?id=373
# The code below enables nosetests to work with i18n _() blocks
import __builtin__
setattr(__builtin__, '_', lambda x: x)

View File

@ -35,7 +35,7 @@ class Context(object):
pass
class AccessTestCase(test.TrialTestCase):
class AccessTestCase(test.TestCase):
def setUp(self):
super(AccessTestCase, self).setUp()
um = manager.AuthManager()

View File

@ -326,12 +326,12 @@ class AuthManagerTestCase(object):
self.assertTrue(user.is_admin())
class AuthManagerLdapTestCase(AuthManagerTestCase, test.TrialTestCase):
class AuthManagerLdapTestCase(AuthManagerTestCase, test.TestCase):
auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver'
def __init__(self, *args, **kwargs):
AuthManagerTestCase.__init__(self)
test.TrialTestCase.__init__(self, *args, **kwargs)
test.TestCase.__init__(self, *args, **kwargs)
import nova.auth.fakeldap as fakeldap
FLAGS.redis_db = 8
if FLAGS.flush_db:
@ -343,7 +343,7 @@ class AuthManagerLdapTestCase(AuthManagerTestCase, test.TrialTestCase):
self.skip = True
class AuthManagerDbTestCase(AuthManagerTestCase, test.TrialTestCase):
class AuthManagerDbTestCase(AuthManagerTestCase, test.TestCase):
auth_driver = 'nova.auth.dbdriver.DbDriver'

View File

@ -27,8 +27,6 @@ import tempfile
import time
from eventlet import greenthread
from twisted.internet import defer
import unittest
from xml.etree import ElementTree
from nova import context
@ -53,7 +51,7 @@ IMAGES_PATH = os.path.join(OSS_TEMPDIR, 'images')
os.makedirs(IMAGES_PATH)
class CloudTestCase(test.TrialTestCase):
class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
self.flags(connection_type='fake', images_path=IMAGES_PATH)
@ -199,7 +197,7 @@ class CloudTestCase(test.TrialTestCase):
logging.debug("Need to watch instance %s until it's running..." %
instance['instance_id'])
while True:
rv = yield defer.succeed(time.sleep(1))
greenthread.sleep(1)
info = self.cloud._get_instance(instance['instance_id'])
logging.debug(info['state'])
if info['state'] == power_state.RUNNING:

View File

@ -22,8 +22,6 @@ Tests For Compute
import datetime
import logging
from twisted.internet import defer
from nova import context
from nova import db
from nova import exception
@ -33,10 +31,11 @@ from nova import utils
from nova.auth import manager
from nova.compute import api as compute_api
FLAGS = flags.FLAGS
class ComputeTestCase(test.TrialTestCase):
class ComputeTestCase(test.TestCase):
"""Test case for compute"""
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
@ -94,24 +93,22 @@ class ComputeTestCase(test.TrialTestCase):
db.security_group_destroy(self.context, group['id'])
db.instance_destroy(self.context, ref[0]['id'])
@defer.inlineCallbacks
def test_run_terminate(self):
"""Make sure it is possible to run and terminate instance"""
instance_id = self._create_instance()
yield self.compute.run_instance(self.context, instance_id)
self.compute.run_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
logging.info("Running instances: %s", instances)
self.assertEqual(len(instances), 1)
yield self.compute.terminate_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
logging.info("After terminating instances: %s", instances)
self.assertEqual(len(instances), 0)
@defer.inlineCallbacks
def test_run_terminate_timestamps(self):
"""Make sure timestamps are set for launched and destroyed"""
instance_id = self._create_instance()
@ -119,42 +116,40 @@ class ComputeTestCase(test.TrialTestCase):
self.assertEqual(instance_ref['launched_at'], None)
self.assertEqual(instance_ref['deleted_at'], None)
launch = datetime.datetime.utcnow()
yield self.compute.run_instance(self.context, instance_id)
self.compute.run_instance(self.context, instance_id)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] > launch)
self.assertEqual(instance_ref['deleted_at'], None)
terminate = datetime.datetime.utcnow()
yield self.compute.terminate_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
self.context = self.context.elevated(True)
instance_ref = db.instance_get(self.context, instance_id)
self.assert_(instance_ref['launched_at'] < terminate)
self.assert_(instance_ref['deleted_at'] > terminate)
@defer.inlineCallbacks
def test_reboot(self):
"""Ensure instance can be rebooted"""
instance_id = self._create_instance()
yield self.compute.run_instance(self.context, instance_id)
yield self.compute.reboot_instance(self.context, instance_id)
yield self.compute.terminate_instance(self.context, instance_id)
self.compute.run_instance(self.context, instance_id)
self.compute.reboot_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
@defer.inlineCallbacks
def test_console_output(self):
"""Make sure we can get console output from instance"""
instance_id = self._create_instance()
yield self.compute.run_instance(self.context, instance_id)
self.compute.run_instance(self.context, instance_id)
console = yield self.compute.get_console_output(self.context,
console = self.compute.get_console_output(self.context,
instance_id)
self.assert_(console)
yield self.compute.terminate_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
@defer.inlineCallbacks
def test_run_instance_existing(self):
"""Ensure failure when running an instance that already exists"""
instance_id = self._create_instance()
yield self.compute.run_instance(self.context, instance_id)
self.assertFailure(self.compute.run_instance(self.context,
instance_id),
exception.Error)
yield self.compute.terminate_instance(self.context, instance_id)
self.compute.run_instance(self.context, instance_id)
self.assertRaises(exception.Error,
self.compute.run_instance,
self.context,
instance_id)
self.compute.terminate_instance(self.context, instance_id)

View File

@ -24,7 +24,7 @@ FLAGS = flags.FLAGS
flags.DEFINE_string('flags_unittest', 'foo', 'for testing purposes only')
class FlagsTestCase(test.TrialTestCase):
class FlagsTestCase(test.TestCase):
def setUp(self):
super(FlagsTestCase, self).setUp()

View File

@ -20,7 +20,7 @@ from nova import test
from nova.utils import parse_mailmap, str_dict_replace
class ProjectTestCase(test.TrialTestCase):
class ProjectTestCase(test.TestCase):
def test_authors_up_to_date(self):
if os.path.exists('../.bzr'):
contributors = set()
@ -30,23 +30,26 @@ class ProjectTestCase(test.TrialTestCase):
import bzrlib.workingtree
tree = bzrlib.workingtree.WorkingTree.open('..')
tree.lock_read()
parents = tree.get_parent_ids()
g = tree.branch.repository.get_graph()
for p in parents[1:]:
rev_ids = [r for r, _ in g.iter_ancestry(parents)
if r != "null:"]
revs = tree.branch.repository.get_revisions(rev_ids)
for r in revs:
for author in r.get_apparent_authors():
email = author.split(' ')[-1]
contributors.add(str_dict_replace(email, mailmap))
try:
parents = tree.get_parent_ids()
g = tree.branch.repository.get_graph()
for p in parents[1:]:
rev_ids = [r for r, _ in g.iter_ancestry(parents)
if r != "null:"]
revs = tree.branch.repository.get_revisions(rev_ids)
for r in revs:
for author in r.get_apparent_authors():
email = author.split(' ')[-1]
contributors.add(str_dict_replace(email, mailmap))
authors_file = open('../Authors', 'r').read()
authors_file = open('../Authors', 'r').read()
missing = set()
for contributor in contributors:
if not contributor in authors_file:
missing.add(contributor)
missing = set()
for contributor in contributors:
if not contributor in authors_file:
missing.add(contributor)
self.assertTrue(len(missing) == 0,
'%r not listed in Authors' % missing)
self.assertTrue(len(missing) == 0,
'%r not listed in Authors' % missing)
finally:
tree.unlock()

View File

@ -33,7 +33,7 @@ from nova.auth import manager
FLAGS = flags.FLAGS
class NetworkTestCase(test.TrialTestCase):
class NetworkTestCase(test.TestCase):
"""Test cases for network code"""
def setUp(self):
super(NetworkTestCase, self).setUp()

View File

@ -54,7 +54,7 @@ os.makedirs(os.path.join(OSS_TEMPDIR, 'images'))
os.makedirs(os.path.join(OSS_TEMPDIR, 'buckets'))
class ObjectStoreTestCase(test.TrialTestCase):
class ObjectStoreTestCase(test.TestCase):
"""Test objectstore API directly."""
def setUp(self):
@ -191,7 +191,7 @@ class TestSite(server.Site):
protocol = TestHTTPChannel
class S3APITestCase(test.TrialTestCase):
class S3APITestCase(test.TestCase):
"""Test objectstore through S3 API."""
def setUp(self):

View File

@ -1,132 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from twisted.internet import defer
from twisted.internet import reactor
from xml.etree import ElementTree
from nova import exception
from nova import flags
from nova import process
from nova import test
from nova import utils
FLAGS = flags.FLAGS
class ProcessTestCase(test.TrialTestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(ProcessTestCase, self).setUp()
def test_execute_stdout(self):
pool = process.ProcessPool(2)
d = pool.simple_execute('echo test')
def _check(rv):
self.assertEqual(rv[0], 'test\n')
self.assertEqual(rv[1], '')
d.addCallback(_check)
d.addErrback(self.fail)
return d
def test_execute_stderr(self):
pool = process.ProcessPool(2)
d = pool.simple_execute('cat BAD_FILE', check_exit_code=False)
def _check(rv):
self.assertEqual(rv[0], '')
self.assert_('No such file' in rv[1])
d.addCallback(_check)
d.addErrback(self.fail)
return d
def test_execute_unexpected_stderr(self):
pool = process.ProcessPool(2)
d = pool.simple_execute('cat BAD_FILE')
d.addCallback(lambda x: self.fail('should have raised an error'))
d.addErrback(lambda failure: failure.trap(IOError))
return d
def test_max_processes(self):
pool = process.ProcessPool(2)
d1 = pool.simple_execute('sleep 0.01')
d2 = pool.simple_execute('sleep 0.01')
d3 = pool.simple_execute('sleep 0.005')
d4 = pool.simple_execute('sleep 0.005')
called = []
def _called(rv, name):
called.append(name)
d1.addCallback(_called, 'd1')
d2.addCallback(_called, 'd2')
d3.addCallback(_called, 'd3')
d4.addCallback(_called, 'd4')
# Make sure that d3 and d4 had to wait on the other two and were called
# in order
# NOTE(termie): there may be a race condition in this test if for some
# reason one of the sleeps takes longer to complete
# than it should
d4.addCallback(lambda x: self.assertEqual(called[2], 'd3'))
d4.addCallback(lambda x: self.assertEqual(called[3], 'd4'))
d4.addErrback(self.fail)
return d4
def test_kill_long_process(self):
pool = process.ProcessPool(2)
d1 = pool.simple_execute('sleep 1')
d2 = pool.simple_execute('sleep 0.005')
timeout = reactor.callLater(0.1, self.fail, 'should have been killed')
# kill d1 and wait on it to end then cancel the timeout
d2.addCallback(lambda _: d1.process.signalProcess('KILL'))
d2.addCallback(lambda _: d1)
d2.addBoth(lambda _: timeout.active() and timeout.cancel())
d2.addErrback(self.fail)
return d2
def test_process_exit_is_contained(self):
pool = process.ProcessPool(2)
d1 = pool.simple_execute('sleep 1')
d1.addCallback(lambda x: self.fail('should have errbacked'))
d1.addErrback(lambda fail: fail.trap(IOError))
reactor.callLater(0.05, d1.process.signalProcess, 'KILL')
return d1
def test_shared_pool_is_singleton(self):
pool1 = process.SharedPool()
pool2 = process.SharedPool()
self.assertEqual(id(pool1._instance), id(pool2._instance))
def test_shared_pool_works_as_singleton(self):
d1 = process.simple_execute('sleep 1')
d2 = process.simple_execute('sleep 0.005')
# lp609749: would have failed with
# exceptions.AssertionError: Someone released me too many times:
# too many tokens!
return d1

View File

@ -32,7 +32,7 @@ from nova.api.ec2 import cloud
FLAGS = flags.FLAGS
class QuotaTestCase(test.TrialTestCase):
class QuotaTestCase(test.TestCase):
def setUp(self):
logging.getLogger().setLevel(logging.DEBUG)
super(QuotaTestCase, self).setUp()

View File

@ -20,8 +20,6 @@ Unit Tests for remote procedure calls using queue
"""
import logging
from twisted.internet import defer
from nova import context
from nova import flags
from nova import rpc
@ -31,7 +29,7 @@ from nova import test
FLAGS = flags.FLAGS
class RpcTestCase(test.TrialTestCase):
class RpcTestCase(test.TestCase):
"""Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
@ -40,23 +38,22 @@ class RpcTestCase(test.TrialTestCase):
self.consumer = rpc.AdapterConsumer(connection=self.conn,
topic='test',
proxy=self.receiver)
self.consumer.attach_to_twisted()
self.consumer.attach_to_eventlet()
self.context = context.get_admin_context()
def test_call_succeed(self):
"""Get a value through rpc call"""
value = 42
result = yield rpc.call_twisted(self.context,
'test', {"method": "echo",
result = rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
def test_context_passed(self):
"""Makes sure a context is passed through rpc call"""
value = 42
result = yield rpc.call_twisted(self.context,
'test', {"method": "context",
"args": {"value": value}})
result = rpc.call(self.context,
'test', {"method": "context",
"args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
@ -67,14 +64,17 @@ class RpcTestCase(test.TrialTestCase):
to an int in the test.
"""
value = 42
self.assertFailure(rpc.call_twisted(self.context, 'test',
{"method": "fail",
"args": {"value": value}}),
rpc.RemoteError)
self.assertRaises(rpc.RemoteError,
rpc.call,
self.context,
'test',
{"method": "fail",
"args": {"value": value}})
try:
yield rpc.call_twisted(self.context,
'test', {"method": "fail",
"args": {"value": value}})
rpc.call(self.context,
'test',
{"method": "fail",
"args": {"value": value}})
self.fail("should have thrown rpc.RemoteError")
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
@ -89,13 +89,13 @@ class TestReceiver(object):
def echo(context, value):
"""Simply returns whatever value is sent in"""
logging.debug("Received %s", value)
return defer.succeed(value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context"""
logging.debug("Received %s", context)
return defer.succeed(context.to_dict())
return context.to_dict()
@staticmethod
def fail(context, value):

View File

@ -44,7 +44,7 @@ class TestDriver(driver.Scheduler):
return 'named_host'
class SchedulerTestCase(test.TrialTestCase):
class SchedulerTestCase(test.TestCase):
"""Test case for scheduler"""
def setUp(self):
super(SchedulerTestCase, self).setUp()
@ -73,7 +73,7 @@ class SchedulerTestCase(test.TrialTestCase):
scheduler.named_method(ctxt, 'topic', num=7)
class SimpleDriverTestCase(test.TrialTestCase):
class SimpleDriverTestCase(test.TestCase):
"""Test case for simple driver"""
def setUp(self):
super(SimpleDriverTestCase, self).setUp()
@ -122,12 +122,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
compute1.startService()
compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute2.startService()
compute2.start()
hosts = self.scheduler.driver.hosts_up(self.context, 'compute')
self.assertEqual(len(hosts), 2)
compute1.kill()
@ -139,12 +139,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
compute1.startService()
compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute2.startService()
compute2.start()
instance_id1 = self._create_instance()
compute1.run_instance(self.context, instance_id1)
instance_id2 = self._create_instance()
@ -162,12 +162,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-compute',
'compute',
FLAGS.compute_manager)
compute1.startService()
compute1.start()
compute2 = service.Service('host2',
'nova-compute',
'compute',
FLAGS.compute_manager)
compute2.startService()
compute2.start()
instance_ids1 = []
instance_ids2 = []
for index in xrange(FLAGS.max_cores):
@ -195,12 +195,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-volume',
'volume',
FLAGS.volume_manager)
volume1.startService()
volume1.start()
volume2 = service.Service('host2',
'nova-volume',
'volume',
FLAGS.volume_manager)
volume2.startService()
volume2.start()
volume_id1 = self._create_volume()
volume1.create_volume(self.context, volume_id1)
volume_id2 = self._create_volume()
@ -218,12 +218,12 @@ class SimpleDriverTestCase(test.TrialTestCase):
'nova-volume',
'volume',
FLAGS.volume_manager)
volume1.startService()
volume1.start()
volume2 = service.Service('host2',
'nova-volume',
'volume',
FLAGS.volume_manager)
volume2.startService()
volume2.start()
volume_ids1 = []
volume_ids2 = []
for index in xrange(FLAGS.max_gigabytes):

View File

@ -22,9 +22,6 @@ Unit Tests for remote procedure calls using queue
import mox
from twisted.application.app import startApplication
from twisted.internet import defer
from nova import exception
from nova import flags
from nova import rpc
@ -48,7 +45,7 @@ class ExtendedService(service.Service):
return 'service'
class ServiceManagerTestCase(test.TrialTestCase):
class ServiceManagerTestCase(test.TestCase):
"""Test cases for Services"""
def test_attribute_error_for_no_manager(self):
@ -63,7 +60,7 @@ class ServiceManagerTestCase(test.TrialTestCase):
'test',
'test',
'nova.tests.service_unittest.FakeManager')
serv.startService()
serv.start()
self.assertEqual(serv.test_method(), 'manager')
def test_override_manager_method(self):
@ -71,11 +68,11 @@ class ServiceManagerTestCase(test.TrialTestCase):
'test',
'test',
'nova.tests.service_unittest.FakeManager')
serv.startService()
serv.start()
self.assertEqual(serv.test_method(), 'service')
class ServiceTestCase(test.TrialTestCase):
class ServiceTestCase(test.TestCase):
"""Test cases for Services"""
def setUp(self):
@ -94,8 +91,6 @@ class ServiceTestCase(test.TrialTestCase):
self.mox.StubOutWithMock(rpc,
'AdapterConsumer',
use_mock_anything=True)
self.mox.StubOutWithMock(
service.task, 'LoopingCall', use_mock_anything=True)
rpc.AdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
@ -106,19 +101,8 @@ class ServiceTestCase(test.TrialTestCase):
proxy=mox.IsA(service.Service)).AndReturn(
rpc.AdapterConsumer)
rpc.AdapterConsumer.attach_to_twisted()
rpc.AdapterConsumer.attach_to_twisted()
# Stub out looping call a bit needlessly since we don't have an easy
# way to cancel it (yet) when the tests finishes
service.task.LoopingCall(mox.IgnoreArg()).AndReturn(
service.task.LoopingCall)
service.task.LoopingCall.start(interval=mox.IgnoreArg(),
now=mox.IgnoreArg())
service.task.LoopingCall(mox.IgnoreArg()).AndReturn(
service.task.LoopingCall)
service.task.LoopingCall.start(interval=mox.IgnoreArg(),
now=mox.IgnoreArg())
rpc.AdapterConsumer.attach_to_eventlet()
rpc.AdapterConsumer.attach_to_eventlet()
service_create = {'host': host,
'binary': binary,
@ -136,14 +120,14 @@ class ServiceTestCase(test.TrialTestCase):
service_create).AndReturn(service_ref)
self.mox.ReplayAll()
startApplication(app, False)
app.start()
app.stop()
self.assert_(app)
# We're testing sort of weird behavior in how report_state decides
# whether it is disconnected, it looks for a variable on itself called
# 'model_disconnected' and report_state doesn't really do much so this
# these are mostly just for coverage
@defer.inlineCallbacks
def test_report_state_no_service(self):
host = 'foo'
binary = 'bar'
@ -173,10 +157,9 @@ class ServiceTestCase(test.TrialTestCase):
binary,
topic,
'nova.tests.service_unittest.FakeManager')
serv.startService()
yield serv.report_state()
serv.start()
serv.report_state()
@defer.inlineCallbacks
def test_report_state_newly_disconnected(self):
host = 'foo'
binary = 'bar'
@ -204,11 +187,10 @@ class ServiceTestCase(test.TrialTestCase):
binary,
topic,
'nova.tests.service_unittest.FakeManager')
serv.startService()
yield serv.report_state()
serv.start()
serv.report_state()
self.assert_(serv.model_disconnected)
@defer.inlineCallbacks
def test_report_state_newly_connected(self):
host = 'foo'
binary = 'bar'
@ -238,8 +220,8 @@ class ServiceTestCase(test.TrialTestCase):
binary,
topic,
'nova.tests.service_unittest.FakeManager')
serv.startService()
serv.start()
serv.model_disconnected = True
yield serv.report_state()
serv.report_state()
self.assert_(not serv.model_disconnected)

View File

@ -1,42 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import unittest
from nova import flags
from nova import test
from nova import validate
class ValidationTestCase(test.TrialTestCase):
def setUp(self):
super(ValidationTestCase, self).setUp()
def tearDown(self):
super(ValidationTestCase, self).tearDown()
def test_type_validation(self):
self.assertTrue(type_case("foo", 5, 1))
self.assertRaises(TypeError, type_case, "bar", "5", 1)
self.assertRaises(TypeError, type_case, None, 5, 1)
@validate.typetest(instanceid=str, size=int, number_of_instances=int)
def type_case(instanceid, size, number_of_instances):
return True

View File

@ -30,7 +30,7 @@ FLAGS = flags.FLAGS
flags.DECLARE('instances_path', 'nova.compute.manager')
class LibvirtConnTestCase(test.TrialTestCase):
class LibvirtConnTestCase(test.TestCase):
def setUp(self):
super(LibvirtConnTestCase, self).setUp()
self.manager = manager.AuthManager()
@ -207,7 +207,7 @@ class LibvirtConnTestCase(test.TrialTestCase):
self.manager.delete_user(self.user)
class NWFilterTestCase(test.TrialTestCase):
class NWFilterTestCase(test.TestCase):
def setUp(self):
super(NWFilterTestCase, self).setUp()
@ -319,7 +319,7 @@ class NWFilterTestCase(test.TrialTestCase):
'project_id': 'fake'})
inst_id = instance_ref['id']
def _ensure_all_called(_):
def _ensure_all_called():
instance_filter = 'nova-instance-%s' % instance_ref['name']
secgroup_filter = 'nova-secgroup-%s' % self.security_group['id']
for required in [secgroup_filter, 'allow-dhcp-server',
@ -337,7 +337,6 @@ class NWFilterTestCase(test.TrialTestCase):
instance = db.instance_get(self.context, inst_id)
d = self.fw.setup_nwfilters_for_instance(instance)
d.addCallback(_ensure_all_called)
d.addCallback(lambda _: self.teardown_security_group())
_ensure_all_called()
self.teardown_security_group()
return d

View File

@ -21,8 +21,6 @@ Tests for Volume Code.
"""
import logging
from twisted.internet import defer
from nova import context
from nova import exception
from nova import db
@ -33,7 +31,7 @@ from nova import utils
FLAGS = flags.FLAGS
class VolumeTestCase(test.TrialTestCase):
class VolumeTestCase(test.TestCase):
"""Test Case for volumes."""
def setUp(self):
@ -56,51 +54,48 @@ class VolumeTestCase(test.TrialTestCase):
vol['attach_status'] = "detached"
return db.volume_create(context.get_admin_context(), vol)['id']
@defer.inlineCallbacks
def test_create_delete_volume(self):
"""Test volume can be created and deleted."""
volume_id = self._create_volume()
yield self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume_id)
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
volume_id).id)
yield self.volume.delete_volume(self.context, volume_id)
self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.NotFound,
db.volume_get,
self.context,
volume_id)
@defer.inlineCallbacks
def test_too_big_volume(self):
"""Ensure failure if a too large of a volume is requested."""
# FIXME(vish): validation needs to move into the data layer in
# volume_create
defer.returnValue(True)
return True
try:
volume_id = self._create_volume('1001')
yield self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume_id)
self.fail("Should have thrown TypeError")
except TypeError:
pass
@defer.inlineCallbacks
def test_too_many_volumes(self):
"""Ensure that NoMoreTargets is raised when we run out of volumes."""
vols = []
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
yield self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume_id)
vols.append(volume_id)
volume_id = self._create_volume()
self.assertFailure(self.volume.create_volume(self.context,
volume_id),
db.NoMoreTargets)
self.assertRaises(db.NoMoreTargets,
self.volume.create_volume,
self.context,
volume_id)
db.volume_destroy(context.get_admin_context(), volume_id)
for volume_id in vols:
yield self.volume.delete_volume(self.context, volume_id)
self.volume.delete_volume(self.context, volume_id)
@defer.inlineCallbacks
def test_run_attach_detach_volume(self):
"""Make sure volume can be attached and detached from instance."""
inst = {}
@ -115,15 +110,15 @@ class VolumeTestCase(test.TrialTestCase):
instance_id = db.instance_create(self.context, inst)['id']
mountpoint = "/dev/sdf"
volume_id = self._create_volume()
yield self.volume.create_volume(self.context, volume_id)
self.volume.create_volume(self.context, volume_id)
if FLAGS.fake_tests:
db.volume_attached(self.context, volume_id, instance_id,
mountpoint)
else:
yield self.compute.attach_volume(self.context,
instance_id,
volume_id,
mountpoint)
self.compute.attach_volume(self.context,
instance_id,
volume_id,
mountpoint)
vol = db.volume_get(context.get_admin_context(), volume_id)
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
@ -131,25 +126,26 @@ class VolumeTestCase(test.TrialTestCase):
instance_ref = db.volume_get_instance(self.context, volume_id)
self.assertEqual(instance_ref['id'], instance_id)
self.assertFailure(self.volume.delete_volume(self.context, volume_id),
exception.Error)
self.assertRaises(exception.Error,
self.volume.delete_volume,
self.context,
volume_id)
if FLAGS.fake_tests:
db.volume_detached(self.context, volume_id)
else:
yield self.compute.detach_volume(self.context,
instance_id,
volume_id)
self.compute.detach_volume(self.context,
instance_id,
volume_id)
vol = db.volume_get(self.context, volume_id)
self.assertEqual(vol['status'], "available")
yield self.volume.delete_volume(self.context, volume_id)
self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.Error,
db.volume_get,
self.context,
volume_id)
db.instance_destroy(self.context, instance_id)
@defer.inlineCallbacks
def test_concurrent_volumes_get_different_targets(self):
"""Ensure multiple concurrent volumes get different targets."""
volume_ids = []
@ -164,15 +160,11 @@ class VolumeTestCase(test.TrialTestCase):
self.assert_(iscsi_target not in targets)
targets.append(iscsi_target)
logging.debug("Target %s allocated", iscsi_target)
deferreds = []
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
volume_id = self._create_volume()
d = self.volume.create_volume(self.context, volume_id)
d.addCallback(_check)
d.addErrback(self.fail)
deferreds.append(d)
yield defer.DeferredList(deferreds)
_check(d)
for volume_id in volume_ids:
self.volume.delete_volume(self.context, volume_id)

View File

@ -31,7 +31,8 @@ import socket
import sys
from xml.sax import saxutils
from twisted.internet.threads import deferToThread
from eventlet import event
from eventlet import greenthread
from nova import exception
from nova import flags
@ -75,7 +76,7 @@ def fetchfile(url, target):
def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
logging.debug("Running cmd: %s", cmd)
logging.debug("Running cmd (subprocess): %s", cmd)
env = os.environ.copy()
if addl_env:
env.update(addl_env)
@ -95,6 +96,10 @@ def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
stdout=stdout,
stderr=stderr,
cmd=cmd)
# NOTE(termie): this appears to be necessary to let the subprocess call
# clean something up in between calls, without it two
# execute calls in a row hangs the second one
greenthread.sleep(0)
return result
@ -123,13 +128,7 @@ def debug(arg):
def runthis(prompt, cmd, check_exit_code=True):
logging.debug("Running %s" % (cmd))
exit_code = subprocess.call(cmd.split(" "))
logging.debug(prompt % (exit_code))
if check_exit_code and exit_code != 0:
raise ProcessExecutionError(exit_code=exit_code,
stdout=None,
stderr=None,
cmd=cmd)
rv, err = execute(cmd, check_exit_code=check_exit_code)
def generate_uid(topic, size=8):
@ -224,10 +223,41 @@ class LazyPluggable(object):
return getattr(backend, key)
def deferredToThread(f):
def g(*args, **kwargs):
return deferToThread(f, *args, **kwargs)
return g
class LoopingCall(object):
def __init__(self, f=None, *args, **kw):
self.args = args
self.kw = kw
self.f = f
self._running = False
def start(self, interval, now=True):
self._running = True
done = event.Event()
def _inner():
if not now:
greenthread.sleep(interval)
try:
while self._running:
self.f(*self.args, **self.kw)
greenthread.sleep(interval)
except Exception:
logging.exception('in looping call')
done.send_exception(*sys.exc_info())
return
done.send(True)
self.done = done
greenthread.spawn(_inner)
return self.done
def stop(self):
self._running = False
def wait(self):
return self.done.wait()
def xhtml_escape(value):

View File

@ -1,94 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Decorators for argument validation, courtesy of
http://rmi.net/~lutz/rangetest.html"""
def rangetest(**argchecks):
"""Validate ranges for both + defaults"""
def onDecorator(func):
"""onCall remembers func and argchecks"""
import sys
code = func.__code__ if sys.version_info[0] == 3 else func.func_code
allargs = code.co_varnames[:code.co_argcount]
funcname = func.__name__
def onCall(*pargs, **kargs):
# all pargs match first N args by position
# the rest must be in kargs or omitted defaults
positionals = list(allargs)
positionals = positionals[:len(pargs)]
for (argname, (low, high)) in argchecks.items():
# for all args to be checked
if argname in kargs:
# was passed by name
if float(kargs[argname]) < low or \
float(kargs[argname]) > high:
errmsg = '{0} argument "{1}" not in {2}..{3}'
errmsg = errmsg.format(funcname, argname, low, high)
raise TypeError(errmsg)
elif argname in positionals:
# was passed by position
position = positionals.index(argname)
if float(pargs[position]) < low or \
float(pargs[position]) > high:
errmsg = '{0} argument "{1}" with value of {4} ' \
'not in {2}..{3}'
errmsg = errmsg.format(funcname, argname, low, high,
pargs[position])
raise TypeError(errmsg)
else:
pass
return func(*pargs, **kargs) # okay: run original call
return onCall
return onDecorator
def typetest(**argchecks):
def onDecorator(func):
import sys
code = func.__code__ if sys.version_info[0] == 3 else func.func_code
allargs = code.co_varnames[:code.co_argcount]
funcname = func.__name__
def onCall(*pargs, **kargs):
positionals = list(allargs)[:len(pargs)]
for (argname, typeof) in argchecks.items():
if argname in kargs:
if not isinstance(kargs[argname], typeof):
errmsg = '{0} argument "{1}" not of type {2}'
errmsg = errmsg.format(funcname, argname, typeof)
raise TypeError(errmsg)
elif argname in positionals:
position = positionals.index(argname)
if not isinstance(pargs[position], typeof):
errmsg = '{0} argument "{1}" with value of {2} ' \
'not of type {3}'
errmsg = errmsg.format(funcname, argname,
pargs[position], typeof)
raise TypeError(errmsg)
else:
pass
return func(*pargs, **kargs)
return onCall
return onDecorator

View File

@ -25,8 +25,6 @@ semantics of real hypervisor connections.
"""
from twisted.internet import defer
from nova import exception
from nova.compute import power_state
@ -107,7 +105,6 @@ class FakeConnection(object):
fake_instance = FakeInstance()
self.instances[instance.name] = fake_instance
fake_instance._state = power_state.RUNNING
return defer.succeed(None)
def reboot(self, instance):
"""
@ -119,19 +116,19 @@ class FakeConnection(object):
The work will be done asynchronously. This function returns a
Deferred that allows the caller to detect when it is complete.
"""
return defer.succeed(None)
pass
def rescue(self, instance):
"""
Rescue the specified instance.
"""
return defer.succeed(None)
pass
def unrescue(self, instance):
"""
Unrescue the specified instance.
"""
return defer.succeed(None)
pass
def destroy(self, instance):
"""
@ -144,7 +141,6 @@ class FakeConnection(object):
Deferred that allows the caller to detect when it is complete.
"""
del self.instances[instance.name]
return defer.succeed(None)
def attach_volume(self, instance_name, device_path, mountpoint):
"""Attach the disk at device_path to the instance at mountpoint"""

View File

@ -26,7 +26,7 @@ import time
import urlparse
from nova import flags
from nova import process
from nova import utils
from nova.auth import manager
from nova.auth import signer
from nova.objectstore import image
@ -50,7 +50,7 @@ def _fetch_s3_image(image, path, user, project):
# This should probably move somewhere else, like e.g. a download_as
# method on User objects and at the same time get rewritten to use
# twisted web client.
# a web client.
headers = {}
headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
@ -63,15 +63,16 @@ def _fetch_s3_image(image, path, user, project):
cmd = ['/usr/bin/curl', '--fail', '--silent', url]
for (k, v) in headers.iteritems():
cmd += ['-H', '%s: %s' % (k, v)]
cmd += ['-H', '"%s: %s"' % (k, v)]
cmd += ['-o', path]
return process.SharedPool().execute(executable=cmd[0], args=cmd[1:])
cmd_out = ' '.join(cmd)
return utils.execute(cmd_out)
def _fetch_local_image(image, path, user, project):
source = _image_path('%s/image' % image)
return process.simple_execute('cp %s %s' % (source, path))
return utils.execute('cp %s %s' % (source, path))
def _image_path(path):

View File

@ -40,16 +40,15 @@ import logging
import os
import shutil
from eventlet import event
from eventlet import tpool
import IPy
from twisted.internet import defer
from twisted.internet import task
from twisted.internet import threads
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import process
from nova import utils
#from nova.api import context
from nova.auth import manager
@ -154,14 +153,12 @@ class LibvirtConnection(object):
except Exception as _err:
pass
# If the instance is already terminated, we're still happy
d = defer.Deferred()
if cleanup:
d.addCallback(lambda _: self._cleanup(instance))
# FIXME: What does this comment mean?
# TODO(termie): short-circuit me for tests
# WE'LL save this for when we do shutdown,
done = event.Event()
# We'll save this for when we do shutdown,
# instead of destroy - but destroy returns immediately
timer = task.LoopingCall(f=None)
timer = utils.LoopingCall(f=None)
def _wait_for_shutdown():
try:
@ -170,17 +167,26 @@ class LibvirtConnection(object):
instance['id'], state)
if state == power_state.SHUTDOWN:
timer.stop()
d.callback(None)
except Exception:
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
timer.stop()
d.callback(None)
timer.f = _wait_for_shutdown
timer.start(interval=0.5, now=True)
return d
timer_done = timer.start(interval=0.5, now=True)
# NOTE(termie): this is strictly superfluous (we could put the
# cleanup code in the timer), but this emulates the
# previous model so I am keeping it around until
# everything has been vetted a bit
def _wait_for_timer():
timer_done.wait()
self._cleanup(instance)
done.send()
greenthread.spawn(_wait_for_timer)
return done
def _cleanup(self, instance):
target = os.path.join(FLAGS.instances_path, instance['name'])
@ -189,7 +195,6 @@ class LibvirtConnection(object):
if os.path.exists(target):
shutil.rmtree(target)
@defer.inlineCallbacks
@exception.wrap_exception
def attach_volume(self, instance_name, device_path, mountpoint):
virt_dom = self._conn.lookupByName(instance_name)
@ -200,7 +205,6 @@ class LibvirtConnection(object):
<target dev='%s' bus='virtio'/>
</disk>""" % (device_path, mount_device)
virt_dom.attachDevice(xml)
yield
def _get_disk_xml(self, xml, device):
"""Returns the xml for the disk mounted at device"""
@ -222,7 +226,6 @@ class LibvirtConnection(object):
if doc != None:
doc.freeDoc()
@defer.inlineCallbacks
@exception.wrap_exception
def detach_volume(self, instance_name, mountpoint):
virt_dom = self._conn.lookupByName(instance_name)
@ -231,17 +234,13 @@ class LibvirtConnection(object):
if not xml:
raise exception.NotFound("No disk at %s" % mount_device)
virt_dom.detachDevice(xml)
yield
@defer.inlineCallbacks
@exception.wrap_exception
def reboot(self, instance):
yield self.destroy(instance, False)
self.destroy(instance, False)
xml = self.to_xml(instance)
yield self._conn.createXML(xml, 0)
d = defer.Deferred()
timer = task.LoopingCall(f=None)
self._conn.createXML(xml, 0)
timer = utils.LoopingCall(f=None)
def _wait_for_reboot():
try:
@ -251,33 +250,28 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug('instance %s: rebooted', instance['name'])
timer.stop()
d.callback(None)
except Exception, exn:
logging.error('_wait_for_reboot failed: %s', exn)
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
timer.stop()
d.callback(None)
timer.f = _wait_for_reboot
timer.start(interval=0.5, now=True)
yield d
return timer.start(interval=0.5, now=True)
@defer.inlineCallbacks
@exception.wrap_exception
def rescue(self, instance):
yield self.destroy(instance, False)
self.destroy(instance, False)
xml = self.to_xml(instance, rescue=True)
rescue_images = {'image_id': FLAGS.rescue_image_id,
'kernel_id': FLAGS.rescue_kernel_id,
'ramdisk_id': FLAGS.rescue_ramdisk_id}
yield self._create_image(instance, xml, 'rescue-', rescue_images)
yield self._conn.createXML(xml, 0)
self._create_image(instance, xml, 'rescue-', rescue_images)
self._conn.createXML(xml, 0)
d = defer.Deferred()
timer = task.LoopingCall(f=None)
timer = utils.LoopingCall(f=None)
def _wait_for_rescue():
try:
@ -286,27 +280,22 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug('instance %s: rescued', instance['name'])
timer.stop()
d.callback(None)
except Exception, exn:
logging.error('_wait_for_rescue failed: %s', exn)
db.instance_set_state(None,
instance['id'],
power_state.SHUTDOWN)
timer.stop()
d.callback(None)
timer.f = _wait_for_rescue
timer.start(interval=0.5, now=True)
yield d
return timer.start(interval=0.5, now=True)
@defer.inlineCallbacks
@exception.wrap_exception
def unrescue(self, instance):
# NOTE(vish): Because reboot destroys and recreates an instance using
# the normal xml file, we can just call reboot here
yield self.reboot(instance)
self.reboot(instance)
@defer.inlineCallbacks
@exception.wrap_exception
def spawn(self, instance):
xml = self.to_xml(instance)
@ -314,14 +303,12 @@ class LibvirtConnection(object):
instance['id'],
power_state.NOSTATE,
'launching')
yield NWFilterFirewall(self._conn).\
setup_nwfilters_for_instance(instance)
yield self._create_image(instance, xml)
yield self._conn.createXML(xml, 0)
NWFilterFirewall(self._conn).setup_nwfilters_for_instance(instance)
self._create_image(instance, xml)
self._conn.createXML(xml, 0)
logging.debug("instance %s: is running", instance['name'])
local_d = defer.Deferred()
timer = task.LoopingCall(f=None)
timer = utils.LoopingCall(f=None)
def _wait_for_boot():
try:
@ -331,7 +318,6 @@ class LibvirtConnection(object):
if state == power_state.RUNNING:
logging.debug('instance %s: booted', instance['name'])
timer.stop()
local_d.callback(None)
except:
logging.exception('instance %s: failed to boot',
instance['name'])
@ -339,10 +325,9 @@ class LibvirtConnection(object):
instance['id'],
power_state.SHUTDOWN)
timer.stop()
local_d.callback(None)
timer.f = _wait_for_boot
timer.start(interval=0.5, now=True)
yield local_d
return timer.start(interval=0.5, now=True)
def _flush_xen_console(self, virsh_output):
logging.info('virsh said: %r' % (virsh_output,))
@ -350,10 +335,9 @@ class LibvirtConnection(object):
if virsh_output.startswith('/dev/'):
logging.info('cool, it\'s a device')
d = process.simple_execute("sudo dd if=%s iflag=nonblock" %
virsh_output, check_exit_code=False)
d.addCallback(lambda r: r[0])
return d
out, err = utils.execute("sudo dd if=%s iflag=nonblock" %
virsh_output, check_exit_code=False)
return out
else:
return ''
@ -373,21 +357,20 @@ class LibvirtConnection(object):
def get_console_output(self, instance):
console_log = os.path.join(FLAGS.instances_path, instance['name'],
'console.log')
d = process.simple_execute('sudo chown %d %s' % (os.getuid(),
console_log))
if FLAGS.libvirt_type == 'xen':
# Xen is spethial
d.addCallback(lambda _:
process.simple_execute("virsh ttyconsole %s" %
instance['name']))
d.addCallback(self._flush_xen_console)
d.addCallback(self._append_to_file, console_log)
else:
d.addCallback(lambda _: defer.succeed(console_log))
d.addCallback(self._dump_file)
return d
@defer.inlineCallbacks
utils.execute('sudo chown %d %s' % (os.getuid(), console_log))
if FLAGS.libvirt_type == 'xen':
# Xen is special
virsh_output = utils.execute("virsh ttyconsole %s" %
instance['name'])
data = self._flush_xen_console(virsh_output)
fpath = self._append_to_file(data, console_log)
else:
fpath = console_log
return self._dump_file(fpath)
def _create_image(self, inst, libvirt_xml, prefix='', disk_images=None):
# syntactic nicety
basepath = lambda fname = '', prefix = prefix: os.path.join(
@ -396,8 +379,8 @@ class LibvirtConnection(object):
prefix + fname)
# ensure directories exist and are writable
yield process.simple_execute('mkdir -p %s' % basepath(prefix=''))
yield process.simple_execute('chmod 0777 %s' % basepath(prefix=''))
utils.execute('mkdir -p %s' % basepath(prefix=''))
utils.execute('chmod 0777 %s' % basepath(prefix=''))
# TODO(termie): these are blocking calls, it would be great
# if they weren't.
@ -418,22 +401,22 @@ class LibvirtConnection(object):
'kernel_id': inst['kernel_id'],
'ramdisk_id': inst['ramdisk_id']}
if not os.path.exists(basepath('disk')):
yield images.fetch(inst.image_id, basepath('disk-raw'), user,
project)
images.fetch(inst.image_id, basepath('disk-raw'), user,
project)
if inst['kernel_id']:
if not os.path.exists(basepath('kernel')):
yield images.fetch(inst['kernel_id'], basepath('kernel'),
user, project)
images.fetch(inst['kernel_id'], basepath('kernel'),
user, project)
if inst['ramdisk_id']:
if not os.path.exists(basepath('ramdisk')):
yield images.fetch(inst['ramdisk_id'], basepath('ramdisk'),
user, project)
images.fetch(inst['ramdisk_id'], basepath('ramdisk'),
user, project)
execute = lambda cmd, process_input = None, check_exit_code = True: \
process.simple_execute(cmd=cmd,
process_input=process_input,
check_exit_code=check_exit_code)
def execute(cmd, process_input=None, check_exit_code=True):
return utils.execute(cmd=cmd,
process_input=process_input,
check_exit_code=check_exit_code)
# For now, we assume that if we're not using a kernel, we're using a
# partitioned disk image where the target partition is the first
@ -463,9 +446,9 @@ class LibvirtConnection(object):
logging.info('instance %s: injecting net into image %s',
inst['name'], inst.image_id)
try:
yield disk.inject_data(basepath('disk-raw'), key, net,
partition=target_partition,
execute=execute)
disk.inject_data(basepath('disk-raw'), key, net,
partition=target_partition,
execute=execute)
except Exception as e:
# This could be a windows image, or a vmdk format disk
logging.warn('instance %s: ignoring error injecting data'
@ -474,7 +457,7 @@ class LibvirtConnection(object):
if inst['kernel_id']:
if os.path.exists(basepath('disk')):
yield process.simple_execute('rm -f %s' % basepath('disk'))
utils.execute('rm -f %s' % basepath('disk'))
local_bytes = (instance_types.INSTANCE_TYPES[inst.instance_type]
['local_gb']
@ -485,15 +468,14 @@ class LibvirtConnection(object):
resize = False
if inst['kernel_id']:
yield disk.partition(basepath('disk-raw'), basepath('disk'),
local_bytes, resize, execute=execute)
disk.partition(basepath('disk-raw'), basepath('disk'),
local_bytes, resize, execute=execute)
else:
os.rename(basepath('disk-raw'), basepath('disk'))
yield disk.extend(basepath('disk'), local_bytes, execute=execute)
disk.extend(basepath('disk'), local_bytes, execute=execute)
if FLAGS.libvirt_type == 'uml':
yield process.simple_execute('sudo chown root %s' %
basepath('disk'))
utils.execute('sudo chown root %s' % basepath('disk'))
def to_xml(self, instance, rescue=False):
# TODO(termie): cache?
@ -758,15 +740,15 @@ class NWFilterFirewall(object):
def _define_filter(self, xml):
if callable(xml):
xml = xml()
d = threads.deferToThread(self._conn.nwfilterDefineXML, xml)
return d
# execute in a native thread and block current greenthread until done
tpool.execute(self._conn.nwfilterDefineXML, xml)
@staticmethod
def _get_net_and_mask(cidr):
net = IPy.IP(cidr)
return str(net.net()), str(net.netmask())
@defer.inlineCallbacks
def setup_nwfilters_for_instance(self, instance):
"""
Creates an NWFilter for the given instance. In the process,
@ -774,10 +756,10 @@ class NWFilterFirewall(object):
the base filter are all in place.
"""
yield self._define_filter(self.nova_base_ipv4_filter)
yield self._define_filter(self.nova_base_ipv6_filter)
yield self._define_filter(self.nova_dhcp_filter)
yield self._define_filter(self.nova_base_filter)
self._define_filter(self.nova_base_ipv4_filter)
self._define_filter(self.nova_base_ipv6_filter)
self._define_filter(self.nova_dhcp_filter)
self._define_filter(self.nova_base_filter)
nwfilter_xml = "<filter name='nova-instance-%s' chain='root'>\n" \
" <filterref filter='nova-base' />\n" % \
@ -789,20 +771,19 @@ class NWFilterFirewall(object):
net, mask = self._get_net_and_mask(network_ref['cidr'])
project_filter = self.nova_project_filter(instance['project_id'],
net, mask)
yield self._define_filter(project_filter)
self._define_filter(project_filter)
nwfilter_xml += " <filterref filter='nova-project-%s' />\n" % \
instance['project_id']
for security_group in instance.security_groups:
yield self.ensure_security_group_filter(security_group['id'])
self.ensure_security_group_filter(security_group['id'])
nwfilter_xml += " <filterref filter='nova-secgroup-%d' />\n" % \
security_group['id']
nwfilter_xml += "</filter>"
yield self._define_filter(nwfilter_xml)
return
self._define_filter(nwfilter_xml)
def ensure_security_group_filter(self, security_group_id):
return self._define_filter(

View File

@ -20,8 +20,6 @@ records and their attributes like bridges, PIFs, QoS, as well as
their lookup functions.
"""
from twisted.internet import defer
class NetworkHelper():
"""
@ -31,14 +29,12 @@ class NetworkHelper():
return
@classmethod
@defer.inlineCallbacks
def find_network_with_bridge(cls, session, bridge):
""" Return the network on which the bridge is attached, if found """
""" Return the network on which the bridge is attached, if found."""
expr = 'field "bridge" = "%s"' % bridge
networks = yield session.call_xenapi('network.get_all_records_where',
expr)
networks = session.call_xenapi('network.get_all_records_where', expr)
if len(networks) == 1:
defer.returnValue(networks.keys()[0])
return networks.keys()[0]
elif len(networks) > 1:
raise Exception('Found non-unique network for bridge %s' % bridge)
else:

View File

@ -21,18 +21,16 @@ their attributes like VDIs, VIFs, as well as their lookup functions.
import logging
import urllib
from twisted.internet import defer
from xml.dom import minidom
from nova import flags
from nova import utils
from nova.auth.manager import AuthManager
from nova.compute import instance_types
from nova.compute import power_state
from nova.virt import images
FLAGS = flags.FLAGS
XENAPI_POWER_STATE = {
@ -64,7 +62,6 @@ class VMHelper():
XenAPI = __import__('XenAPI')
@classmethod
@defer.inlineCallbacks
def create_vm(cls, session, instance, kernel, ramdisk):
"""Create a VM record. Returns a Deferred that gives the new
VM reference."""
@ -102,12 +99,11 @@ class VMHelper():
'other_config': {},
}
logging.debug('Created VM %s...', instance.name)
vm_ref = yield session.call_xenapi('VM.create', rec)
vm_ref = session.call_xenapi('VM.create', rec)
logging.debug('Created VM %s as %s.', instance.name, vm_ref)
defer.returnValue(vm_ref)
return vm_ref
@classmethod
@defer.inlineCallbacks
def create_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable):
"""Create a VBD record. Returns a Deferred that gives the new
VBD reference."""
@ -126,13 +122,12 @@ class VMHelper():
vbd_rec['qos_algorithm_params'] = {}
vbd_rec['qos_supported_algorithms'] = []
logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
vbd_ref = yield session.call_xenapi('VBD.create', vbd_rec)
vbd_ref = session.call_xenapi('VBD.create', vbd_rec)
logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
vdi_ref)
defer.returnValue(vbd_ref)
return vbd_ref
@classmethod
@defer.inlineCallbacks
def create_vif(cls, session, vm_ref, network_ref, mac_address):
"""Create a VIF record. Returns a Deferred that gives the new
VIF reference."""
@ -148,13 +143,12 @@ class VMHelper():
vif_rec['qos_algorithm_params'] = {}
logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
network_ref)
vif_ref = yield session.call_xenapi('VIF.create', vif_rec)
vif_ref = session.call_xenapi('VIF.create', vif_rec)
logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
vm_ref, network_ref)
defer.returnValue(vif_ref)
return vif_ref
@classmethod
@defer.inlineCallbacks
def fetch_image(cls, session, image, user, project, use_sr):
"""use_sr: True to put the image as a VDI in an SR, False to place
it on dom0's filesystem. The former is for VM disks, the latter for
@ -171,12 +165,11 @@ class VMHelper():
args['password'] = user.secret
if use_sr:
args['add_partition'] = 'true'
task = yield session.async_call_plugin('objectstore', fn, args)
uuid = yield session.wait_for_task(task)
defer.returnValue(uuid)
task = session.async_call_plugin('objectstore', fn, args)
uuid = session.wait_for_task(task)
return uuid
@classmethod
@utils.deferredToThread
def lookup(cls, session, i):
""" Look the instance i up, and returns it if available """
return VMHelper.lookup_blocking(session, i)
@ -194,7 +187,6 @@ class VMHelper():
return vms[0]
@classmethod
@utils.deferredToThread
def lookup_vm_vdis(cls, session, vm):
""" Look for the VDIs that are attached to the VM """
return VMHelper.lookup_vm_vdis_blocking(session, vm)

View File

@ -20,8 +20,6 @@ Management class for VM-related functions (spawn, reboot, etc).
import logging
from twisted.internet import defer
from nova import db
from nova import context
@ -49,10 +47,9 @@ class VMOps(object):
return [self._session.get_xenapi().VM.get_name_label(vm) \
for vm in self._session.get_xenapi().VM.get_all()]
@defer.inlineCallbacks
def spawn(self, instance):
""" Create VM instance """
vm = yield VMHelper.lookup(self._session, instance.name)
vm = VMHelper.lookup(self._session, instance.name)
if vm is not None:
raise Exception('Attempted to create non-unique name %s' %
instance.name)
@ -60,66 +57,63 @@ class VMOps(object):
bridge = db.project_get_network(context.get_admin_context(),
instance.project_id).bridge
network_ref = \
yield NetworkHelper.find_network_with_bridge(self._session, bridge)
NetworkHelper.find_network_with_bridge(self._session, bridge)
user = AuthManager().get_user(instance.user_id)
project = AuthManager().get_project(instance.project_id)
vdi_uuid = yield VMHelper.fetch_image(self._session,
instance.image_id, user, project, True)
kernel = yield VMHelper.fetch_image(self._session,
instance.kernel_id, user, project, False)
ramdisk = yield VMHelper.fetch_image(self._session,
instance.ramdisk_id, user, project, False)
vdi_ref = yield self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
vm_ref = yield VMHelper.create_vm(self._session,
instance, kernel, ramdisk)
yield VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
vdi_uuid = VMHelper.fetch_image(
self._session, instance.image_id, user, project, True)
kernel = VMHelper.fetch_image(
self._session, instance.kernel_id, user, project, False)
ramdisk = VMHelper.fetch_image(
self._session, instance.ramdisk_id, user, project, False)
vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
vm_ref = VMHelper.create_vm(
self._session, instance, kernel, ramdisk)
VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
if network_ref:
yield VMHelper.create_vif(self._session, vm_ref,
network_ref, instance.mac_address)
VMHelper.create_vif(self._session, vm_ref,
network_ref, instance.mac_address)
logging.debug('Starting VM %s...', vm_ref)
yield self._session.call_xenapi('VM.start', vm_ref, False, False)
self._session.call_xenapi('VM.start', vm_ref, False, False)
logging.info('Spawning VM %s created %s.', instance.name,
vm_ref)
@defer.inlineCallbacks
def reboot(self, instance):
""" Reboot VM instance """
instance_name = instance.name
vm = yield VMHelper.lookup(self._session, instance_name)
vm = VMHelper.lookup(self._session, instance_name)
if vm is None:
raise Exception('instance not present %s' % instance_name)
task = yield self._session.call_xenapi('Async.VM.clean_reboot', vm)
yield self._session.wait_for_task(task)
task = self._session.call_xenapi('Async.VM.clean_reboot', vm)
self._session.wait_for_task(task)
@defer.inlineCallbacks
def destroy(self, instance):
""" Destroy VM instance """
vm = yield VMHelper.lookup(self._session, instance.name)
vm = VMHelper.lookup(self._session, instance.name)
if vm is None:
# Don't complain, just return. This lets us clean up instances
# that have already disappeared from the underlying platform.
defer.returnValue(None)
return
# Get the VDIs related to the VM
vdis = yield VMHelper.lookup_vm_vdis(self._session, vm)
vdis = VMHelper.lookup_vm_vdis(self._session, vm)
try:
task = yield self._session.call_xenapi('Async.VM.hard_shutdown',
task = self._session.call_xenapi('Async.VM.hard_shutdown',
vm)
yield self._session.wait_for_task(task)
self._session.wait_for_task(task)
except XenAPI.Failure, exc:
logging.warn(exc)
# Disk clean-up
if vdis:
for vdi in vdis:
try:
task = yield self._session.call_xenapi('Async.VDI.destroy',
vdi)
yield self._session.wait_for_task(task)
task = self._session.call_xenapi('Async.VDI.destroy', vdi)
self._session.wait_for_task(task)
except XenAPI.Failure, exc:
logging.warn(exc)
try:
task = yield self._session.call_xenapi('Async.VM.destroy', vm)
yield self._session.wait_for_task(task)
task = self._session.call_xenapi('Async.VM.destroy', vm)
self._session.wait_for_task(task)
except XenAPI.Failure, exc:
logging.warn(exc)
@ -131,14 +125,13 @@ class VMOps(object):
rec = self._session.get_xenapi().VM.get_record(vm)
return VMHelper.compile_info(rec)
@defer.inlineCallbacks
def get_diagnostics(self, instance_id):
"""Return data about VM diagnostics"""
vm = yield VMHelper.lookup(self._session, instance_id)
vm = VMHelper.lookup(self._session, instance_id)
if vm is None:
raise Exception("instance not present %s" % instance_id)
rec = yield self._session.get_xenapi().VM.get_record(vm)
defer.returnValue(VMHelper.compile_diagnostics(self._session, rec))
rec = self._session.get_xenapi().VM.get_record(vm)
return VMHelper.compile_diagnostics(self._session, rec)
def get_console_output(self, instance):
""" Return snapshot of console """

View File

@ -48,10 +48,11 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
"""
import logging
import sys
import xmlrpclib
from twisted.internet import defer
from twisted.internet import reactor
from eventlet import event
from eventlet import tpool
from nova import utils
from nova import flags
@ -159,53 +160,51 @@ class XenAPISession(object):
""" Return the xenapi host """
return self._session.xenapi.session.get_this_host(self._session.handle)
@utils.deferredToThread
def call_xenapi(self, method, *args):
"""Call the specified XenAPI method on a background thread. Returns
a Deferred for the result."""
"""Call the specified XenAPI method on a background thread."""
f = self._session.xenapi
for m in method.split('.'):
f = f.__getattr__(m)
return f(*args)
return tpool.execute(f, *args)
@utils.deferredToThread
def async_call_plugin(self, plugin, fn, args):
"""Call Async.host.call_plugin on a background thread. Returns a
Deferred with the task reference."""
return _unwrap_plugin_exceptions(
self._session.xenapi.Async.host.call_plugin,
self.get_xenapi_host(), plugin, fn, args)
"""Call Async.host.call_plugin on a background thread."""
return tpool.execute(_unwrap_plugin_exceptions,
self._session.xenapi.Async.host.call_plugin,
self.get_xenapi_host(), plugin, fn, args)
def wait_for_task(self, task):
"""Return a Deferred that will give the result of the given task.
The task is polled until it completes."""
d = defer.Deferred()
reactor.callLater(0, self._poll_task, task, d)
return d
@utils.deferredToThread
def _poll_task(self, task, deferred):
done = event.Event()
loop = utils.LoopingCall(self._poll_task, task, done)
loop.start(FLAGS.xenapi_task_poll_interval, now=True)
rv = done.wait()
loop.stop()
return rv
def _poll_task(self, task, done):
"""Poll the given XenAPI task, and fire the given Deferred if we
get a result."""
try:
#logging.debug('Polling task %s...', task)
status = self._session.xenapi.task.get_status(task)
if status == 'pending':
reactor.callLater(FLAGS.xenapi_task_poll_interval,
self._poll_task, task, deferred)
return
elif status == 'success':
result = self._session.xenapi.task.get_result(task)
logging.info('Task %s status: success. %s', task, result)
deferred.callback(_parse_xmlrpc_value(result))
done.send(_parse_xmlrpc_value(result))
else:
error_info = self._session.xenapi.task.get_error_info(task)
logging.warn('Task %s status: %s. %s', task, status,
error_info)
deferred.errback(XenAPI.Failure(error_info))
#logging.debug('Polling task %s done.', task)
done.send_exception(XenAPI.Failure(error_info))
#logging.debug('Polling task %s done.', task)
except XenAPI.Failure, exc:
logging.warn(exc)
deferred.errback(exc)
done.send_exception(*sys.exc_info())
def _unwrap_plugin_exceptions(func, *args, **kwargs):

View File

@ -22,12 +22,10 @@ Drivers for volumes.
import logging
import os
from twisted.internet import defer
import time
from nova import exception
from nova import flags
from nova import process
from nova import utils
@ -55,14 +53,13 @@ flags.DEFINE_string('iscsi_ip_prefix', '127.0',
class VolumeDriver(object):
"""Executes commands relating to Volumes."""
def __init__(self, execute=process.simple_execute,
def __init__(self, execute=utils.execute,
sync_exec=utils.execute, *args, **kwargs):
# NOTE(vish): db is set by Manager
self.db = None
self._execute = execute
self._sync_exec = sync_exec
@defer.inlineCallbacks
def _try_execute(self, command):
# NOTE(vish): Volume commands can partially fail due to timing, but
# running them a second time on failure will usually
@ -70,15 +67,15 @@ class VolumeDriver(object):
tries = 0
while True:
try:
yield self._execute(command)
defer.returnValue(True)
self._execute(command)
return True
except exception.ProcessExecutionError:
tries = tries + 1
if tries >= FLAGS.num_shell_tries:
raise
logging.exception("Recovering from a failed execute."
"Try number %s", tries)
yield self._execute("sleep %s" % tries ** 2)
time.sleep(tries ** 2)
def check_for_setup_error(self):
"""Returns an error if prerequisites aren't met"""
@ -86,53 +83,45 @@ class VolumeDriver(object):
raise exception.Error("volume group %s doesn't exist"
% FLAGS.volume_group)
@defer.inlineCallbacks
def create_volume(self, volume):
"""Creates a logical volume."""
if int(volume['size']) == 0:
sizestr = '100M'
else:
sizestr = '%sG' % volume['size']
yield self._try_execute("sudo lvcreate -L %s -n %s %s" %
(sizestr,
volume['name'],
FLAGS.volume_group))
self._try_execute("sudo lvcreate -L %s -n %s %s" %
(sizestr,
volume['name'],
FLAGS.volume_group))
@defer.inlineCallbacks
def delete_volume(self, volume):
"""Deletes a logical volume."""
yield self._try_execute("sudo lvremove -f %s/%s" %
(FLAGS.volume_group,
volume['name']))
self._try_execute("sudo lvremove -f %s/%s" %
(FLAGS.volume_group,
volume['name']))
@defer.inlineCallbacks
def local_path(self, volume):
yield # NOTE(vish): stops deprecation warning
# NOTE(vish): stops deprecation warning
escaped_group = FLAGS.volume_group.replace('-', '--')
escaped_name = volume['name'].replace('-', '--')
defer.returnValue("/dev/mapper/%s-%s" % (escaped_group,
escaped_name))
return "/dev/mapper/%s-%s" % (escaped_group, escaped_name)
def ensure_export(self, context, volume):
"""Synchronously recreates an export for a logical volume."""
raise NotImplementedError()
@defer.inlineCallbacks
def create_export(self, context, volume):
"""Exports the volume."""
raise NotImplementedError()
@defer.inlineCallbacks
def remove_export(self, context, volume):
"""Removes an export for a logical volume."""
raise NotImplementedError()
@defer.inlineCallbacks
def discover_volume(self, volume):
"""Discover volume on a remote host."""
raise NotImplementedError()
@defer.inlineCallbacks
def undiscover_volume(self, volume):
"""Undiscover volume on a remote host."""
raise NotImplementedError()
@ -155,14 +144,13 @@ class AOEDriver(VolumeDriver):
dev = {'shelf_id': shelf_id, 'blade_id': blade_id}
self.db.export_device_create_safe(context, dev)
@defer.inlineCallbacks
def create_export(self, context, volume):
"""Creates an export for a logical volume."""
self._ensure_blades(context)
(shelf_id,
blade_id) = self.db.volume_allocate_shelf_and_blade(context,
volume['id'])
yield self._try_execute(
self._try_execute(
"sudo vblade-persist setup %s %s %s /dev/%s/%s" %
(shelf_id,
blade_id,
@ -176,33 +164,30 @@ class AOEDriver(VolumeDriver):
# still works for the other volumes, so we
# just wait a bit for the current volume to
# be ready and ignore any errors.
yield self._execute("sleep 2")
yield self._execute("sudo vblade-persist auto all",
check_exit_code=False)
yield self._execute("sudo vblade-persist start all",
check_exit_code=False)
time.sleep(2)
self._execute("sudo vblade-persist auto all",
check_exit_code=False)
self._execute("sudo vblade-persist start all",
check_exit_code=False)
@defer.inlineCallbacks
def remove_export(self, context, volume):
"""Removes an export for a logical volume."""
(shelf_id,
blade_id) = self.db.volume_get_shelf_and_blade(context,
volume['id'])
yield self._try_execute("sudo vblade-persist stop %s %s" %
(shelf_id, blade_id))
yield self._try_execute("sudo vblade-persist destroy %s %s" %
(shelf_id, blade_id))
self._try_execute("sudo vblade-persist stop %s %s" %
(shelf_id, blade_id))
self._try_execute("sudo vblade-persist destroy %s %s" %
(shelf_id, blade_id))
@defer.inlineCallbacks
def discover_volume(self, _volume):
"""Discover volume on a remote host."""
yield self._execute("sudo aoe-discover")
yield self._execute("sudo aoe-stat", check_exit_code=False)
self._execute("sudo aoe-discover")
self._execute("sudo aoe-stat", check_exit_code=False)
@defer.inlineCallbacks
def undiscover_volume(self, _volume):
"""Undiscover volume on a remote host."""
yield
pass
class FakeAOEDriver(AOEDriver):
@ -252,7 +237,6 @@ class ISCSIDriver(VolumeDriver):
target = {'host': host, 'target_num': target_num}
self.db.iscsi_target_create_safe(context, target)
@defer.inlineCallbacks
def create_export(self, context, volume):
"""Creates an export for a logical volume."""
self._ensure_iscsi_targets(context, volume['host'])
@ -261,61 +245,55 @@ class ISCSIDriver(VolumeDriver):
volume['host'])
iscsi_name = "%s%s" % (FLAGS.iscsi_target_prefix, volume['name'])
volume_path = "/dev/%s/%s" % (FLAGS.volume_group, volume['name'])
yield self._execute("sudo ietadm --op new "
"--tid=%s --params Name=%s" %
(iscsi_target, iscsi_name))
yield self._execute("sudo ietadm --op new --tid=%s "
"--lun=0 --params Path=%s,Type=fileio" %
(iscsi_target, volume_path))
self._execute("sudo ietadm --op new "
"--tid=%s --params Name=%s" %
(iscsi_target, iscsi_name))
self._execute("sudo ietadm --op new --tid=%s "
"--lun=0 --params Path=%s,Type=fileio" %
(iscsi_target, volume_path))
@defer.inlineCallbacks
def remove_export(self, context, volume):
"""Removes an export for a logical volume."""
iscsi_target = self.db.volume_get_iscsi_target_num(context,
volume['id'])
yield self._execute("sudo ietadm --op delete --tid=%s "
"--lun=0" % iscsi_target)
yield self._execute("sudo ietadm --op delete --tid=%s" %
iscsi_target)
self._execute("sudo ietadm --op delete --tid=%s "
"--lun=0" % iscsi_target)
self._execute("sudo ietadm --op delete --tid=%s" %
iscsi_target)
@defer.inlineCallbacks
def _get_name_and_portal(self, volume_name, host):
"""Gets iscsi name and portal from volume name and host."""
(out, _err) = yield self._execute("sudo iscsiadm -m discovery -t "
"sendtargets -p %s" % host)
(out, _err) = self._execute("sudo iscsiadm -m discovery -t "
"sendtargets -p %s" % host)
for target in out.splitlines():
if FLAGS.iscsi_ip_prefix in target and volume_name in target:
(location, _sep, iscsi_name) = target.partition(" ")
break
iscsi_portal = location.split(",")[0]
defer.returnValue((iscsi_name, iscsi_portal))
return (iscsi_name, iscsi_portal)
@defer.inlineCallbacks
def discover_volume(self, volume):
"""Discover volume on a remote host."""
(iscsi_name,
iscsi_portal) = yield self._get_name_and_portal(volume['name'],
volume['host'])
yield self._execute("sudo iscsiadm -m node -T %s -p %s --login" %
(iscsi_name, iscsi_portal))
yield self._execute("sudo iscsiadm -m node -T %s -p %s --op update "
"-n node.startup -v automatic" %
(iscsi_name, iscsi_portal))
defer.returnValue("/dev/iscsi/%s" % volume['name'])
iscsi_name, iscsi_portal = self._get_name_and_portal(volume['name'],
volume['host'])
self._execute("sudo iscsiadm -m node -T %s -p %s --login" %
(iscsi_name, iscsi_portal))
self._execute("sudo iscsiadm -m node -T %s -p %s --op update "
"-n node.startup -v automatic" %
(iscsi_name, iscsi_portal))
return "/dev/iscsi/%s" % volume['name']
@defer.inlineCallbacks
def undiscover_volume(self, volume):
"""Undiscover volume on a remote host."""
(iscsi_name,
iscsi_portal) = yield self._get_name_and_portal(volume['name'],
volume['host'])
yield self._execute("sudo iscsiadm -m node -T %s -p %s --op update "
"-n node.startup -v manual" %
(iscsi_name, iscsi_portal))
yield self._execute("sudo iscsiadm -m node -T %s -p %s --logout " %
(iscsi_name, iscsi_portal))
yield self._execute("sudo iscsiadm -m node --op delete "
"--targetname %s" % iscsi_name)
iscsi_name, iscsi_portal = self._get_name_and_portal(volume['name'],
volume['host'])
self._execute("sudo iscsiadm -m node -T %s -p %s --op update "
"-n node.startup -v manual" %
(iscsi_name, iscsi_portal))
self._execute("sudo iscsiadm -m node -T %s -p %s --logout " %
(iscsi_name, iscsi_portal))
self._execute("sudo iscsiadm -m node --op delete "
"--targetname %s" % iscsi_name)
class FakeISCSIDriver(ISCSIDriver):

View File

@ -45,7 +45,6 @@ intact.
import logging
import datetime
from twisted.internet import defer
from nova import context
from nova import exception
@ -86,7 +85,6 @@ class VolumeManager(manager.Manager):
for volume in volumes:
self.driver.ensure_export(ctxt, volume)
@defer.inlineCallbacks
def create_volume(self, context, volume_id):
"""Creates and exports the volume."""
context = context.elevated()
@ -102,19 +100,18 @@ class VolumeManager(manager.Manager):
logging.debug("volume %s: creating lv of size %sG",
volume_ref['name'], volume_ref['size'])
yield self.driver.create_volume(volume_ref)
self.driver.create_volume(volume_ref)
logging.debug("volume %s: creating export", volume_ref['name'])
yield self.driver.create_export(context, volume_ref)
self.driver.create_export(context, volume_ref)
now = datetime.datetime.utcnow()
self.db.volume_update(context,
volume_ref['id'], {'status': 'available',
'launched_at': now})
logging.debug("volume %s: created successfully", volume_ref['name'])
defer.returnValue(volume_id)
return volume_id
@defer.inlineCallbacks
def delete_volume(self, context, volume_id):
"""Deletes and unexports volume."""
context = context.elevated()
@ -124,14 +121,13 @@ class VolumeManager(manager.Manager):
if volume_ref['host'] != self.host:
raise exception.Error("Volume is not local to this node")
logging.debug("volume %s: removing export", volume_ref['name'])
yield self.driver.remove_export(context, volume_ref)
self.driver.remove_export(context, volume_ref)
logging.debug("volume %s: deleting", volume_ref['name'])
yield self.driver.delete_volume(volume_ref)
self.driver.delete_volume(volume_ref)
self.db.volume_destroy(context, volume_id)
logging.debug("volume %s: deleted successfully", volume_ref['name'])
defer.returnValue(True)
return True
@defer.inlineCallbacks
def setup_compute_volume(self, context, volume_id):
"""Setup remote volume on compute host.
@ -139,17 +135,16 @@ class VolumeManager(manager.Manager):
context = context.elevated()
volume_ref = self.db.volume_get(context, volume_id)
if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
path = yield self.driver.local_path(volume_ref)
path = self.driver.local_path(volume_ref)
else:
path = yield self.driver.discover_volume(volume_ref)
defer.returnValue(path)
path = self.driver.discover_volume(volume_ref)
return path
@defer.inlineCallbacks
def remove_compute_volume(self, context, volume_id):
"""Remove remote volume on compute host."""
context = context.elevated()
volume_ref = self.db.volume_get(context, volume_id)
if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
defer.returnValue(True)
return True
else:
yield self.driver.undiscover_volume(volume_ref)
self.driver.undiscover_volume(volume_ref)

View File

@ -39,10 +39,16 @@ Due to our use of multiprocessing it we frequently get some ignorable
"""
import eventlet
eventlet.monkey_patch()
import __main__
import gettext
import os
import sys
gettext.install('nova', unicode=1)
from twisted.scripts import trial as trial_script
from nova import flags
@ -56,15 +62,12 @@ from nova.tests.compute_unittest import *
from nova.tests.flags_unittest import *
from nova.tests.misc_unittest import *
from nova.tests.network_unittest import *
from nova.tests.objectstore_unittest import *
from nova.tests.process_unittest import *
#from nova.tests.objectstore_unittest import *
from nova.tests.quota_unittest import *
from nova.tests.rpc_unittest import *
from nova.tests.scheduler_unittest import *
from nova.tests.service_unittest import *
from nova.tests.twistd_unittest import *
from nova.tests.validator_unittest import *
from nova.tests.virt_unittest import *
from nova.tests.virt_unittest import *
from nova.tests.volume_unittest import *

View File

@ -22,3 +22,4 @@ mox==0.5.0
greenlet==0.3.1
nose
bzr
Twisted>=10.1.0