merge trunk

This commit is contained in:
Cory Wright 2011-05-31 09:40:01 -04:00
commit 6120fca7ff
24 changed files with 1008 additions and 196 deletions

View File

@ -30,6 +30,7 @@ Gabe Westmaas <gabe.westmaas@rackspace.com>
Hisaharu Ishii <ishii.hisaharu@lab.ntt.co.jp>
Hisaki Ohara <hisaki.ohara@intel.com>
Ilya Alekseyev <ialekseev@griddynamics.com>
Isaku Yamahata <yamahata@valinux.co.jp>
Jason Koelker <jason@koelker.net>
Jay Pipes <jaypipes@gmail.com>
Jesse Andrews <anotherjesse@gmail.com>

View File

@ -327,6 +327,12 @@ class Executor(wsgi.Application):
ec2_id = ec2utils.id_to_ec2_id(ex.volume_id, 'vol-%08x')
message = _('Volume %s not found') % ec2_id
return self._error(req, context, type(ex).__name__, message)
except exception.SnapshotNotFound as ex:
LOG.info(_('SnapshotNotFound raised: %s'), unicode(ex),
context=context)
ec2_id = ec2utils.id_to_ec2_id(ex.snapshot_id, 'snap-%08x')
message = _('Snapshot %s not found') % ec2_id
return self._error(req, context, type(ex).__name__, message)
except exception.NotFound as ex:
LOG.info(_('NotFound raised: %s'), unicode(ex), context=context)
return self._error(req, context, type(ex).__name__, unicode(ex))

View File

@ -283,14 +283,50 @@ class CloudController(object):
owner=None,
restorable_by=None,
**kwargs):
return {'snapshotSet': [{'snapshotId': 'fixme',
'volumeId': 'fixme',
'status': 'fixme',
'startTime': 'fixme',
'progress': 'fixme',
'ownerId': 'fixme',
'volumeSize': 0,
'description': 'fixme'}]}
if snapshot_id:
snapshots = []
for ec2_id in snapshot_id:
internal_id = ec2utils.ec2_id_to_id(ec2_id)
snapshot = self.volume_api.get_snapshot(
context,
snapshot_id=internal_id)
snapshots.append(snapshot)
else:
snapshots = self.volume_api.get_all_snapshots(context)
snapshots = [self._format_snapshot(context, s) for s in snapshots]
return {'snapshotSet': snapshots}
def _format_snapshot(self, context, snapshot):
s = {}
s['snapshotId'] = ec2utils.id_to_ec2_id(snapshot['id'], 'snap-%08x')
s['volumeId'] = ec2utils.id_to_ec2_id(snapshot['volume_id'],
'vol-%08x')
s['status'] = snapshot['status']
s['startTime'] = snapshot['created_at']
s['progress'] = snapshot['progress']
s['ownerId'] = snapshot['project_id']
s['volumeSize'] = snapshot['volume_size']
s['description'] = snapshot['display_description']
s['display_name'] = snapshot['display_name']
s['display_description'] = snapshot['display_description']
return s
def create_snapshot(self, context, volume_id, **kwargs):
LOG.audit(_("Create snapshot of volume %s"), volume_id,
context=context)
volume_id = ec2utils.ec2_id_to_id(volume_id)
snapshot = self.volume_api.create_snapshot(
context,
volume_id=volume_id,
name=kwargs.get('display_name'),
description=kwargs.get('display_description'))
return self._format_snapshot(context, snapshot)
def delete_snapshot(self, context, snapshot_id, **kwargs):
snapshot_id = ec2utils.ec2_id_to_id(snapshot_id)
self.volume_api.delete_snapshot(context, snapshot_id=snapshot_id)
return True
def describe_key_pairs(self, context, key_name=None, **kwargs):
key_pairs = db.key_pair_get_all_by_user(context, context.user_id)
@ -619,6 +655,11 @@ class CloudController(object):
'volumeId': v['volumeId']}]
else:
v['attachmentSet'] = [{}]
if volume.get('snapshot_id') != None:
v['snapshotId'] = ec2utils.id_to_ec2_id(volume['snapshot_id'],
'snap-%08x')
else:
v['snapshotId'] = None
v['display_name'] = volume['display_name']
v['display_description'] = volume['display_description']

View File

@ -1,4 +1,4 @@
NOVA_KEY_DIR=$(pushd $(dirname $BASH_SOURCE)>/dev/null; pwd; popd>/dev/null)
NOVA_KEY_DIR=$(dirname $(readlink -f ${BASH_SOURCE}))
export EC2_ACCESS_KEY="%(access)s:%(project)s"
export EC2_SECRET_KEY="%(secret)s"
export EC2_URL="%(ec2)s"

View File

@ -47,6 +47,8 @@ flags.DEFINE_string('instance_name_template', 'instance-%08x',
'Template string to be used to generate instance names')
flags.DEFINE_string('volume_name_template', 'volume-%08x',
'Template string to be used to generate instance names')
flags.DEFINE_string('snapshot_name_template', 'snapshot-%08x',
'Template string to be used to generate snapshot names')
IMPL = utils.LazyPluggable(FLAGS['db_backend'],
@ -881,6 +883,43 @@ def volume_update(context, volume_id, values):
####################
def snapshot_create(context, values):
"""Create a snapshot from the values dictionary."""
return IMPL.snapshot_create(context, values)
def snapshot_destroy(context, snapshot_id):
"""Destroy the snapshot or raise if it does not exist."""
return IMPL.snapshot_destroy(context, snapshot_id)
def snapshot_get(context, snapshot_id):
"""Get a snapshot or raise if it does not exist."""
return IMPL.snapshot_get(context, snapshot_id)
def snapshot_get_all(context):
"""Get all snapshots."""
return IMPL.snapshot_get_all(context)
def snapshot_get_all_by_project(context, project_id):
"""Get all snapshots belonging to a project."""
return IMPL.snapshot_get_all_by_project(context, project_id)
def snapshot_update(context, snapshot_id, values):
"""Set the given properties on an snapshot and update it.
Raises NotFound if snapshot does not exist.
"""
return IMPL.snapshot_update(context, snapshot_id, values)
####################
def security_group_get_all(context):
"""Get all security groups."""
return IMPL.security_group_get_all(context)

View File

@ -1789,6 +1789,82 @@ def volume_update(context, volume_id, values):
###################
@require_context
def snapshot_create(context, values):
snapshot_ref = models.Snapshot()
snapshot_ref.update(values)
session = get_session()
with session.begin():
snapshot_ref.save(session=session)
return snapshot_ref
@require_admin_context
def snapshot_destroy(context, snapshot_id):
session = get_session()
with session.begin():
session.query(models.Snapshot).\
filter_by(id=snapshot_id).\
update({'deleted': 1,
'deleted_at': datetime.datetime.utcnow(),
'updated_at': literal_column('updated_at')})
@require_context
def snapshot_get(context, snapshot_id, session=None):
if not session:
session = get_session()
result = None
if is_admin_context(context):
result = session.query(models.Snapshot).\
filter_by(id=snapshot_id).\
filter_by(deleted=can_read_deleted(context)).\
first()
elif is_user_context(context):
result = session.query(models.Snapshot).\
filter_by(project_id=context.project_id).\
filter_by(id=snapshot_id).\
filter_by(deleted=False).\
first()
if not result:
raise exception.SnapshotNotFound(snapshot_id=snapshot_id)
return result
@require_admin_context
def snapshot_get_all(context):
session = get_session()
return session.query(models.Snapshot).\
filter_by(deleted=can_read_deleted(context)).\
all()
@require_context
def snapshot_get_all_by_project(context, project_id):
authorize_project_context(context, project_id)
session = get_session()
return session.query(models.Snapshot).\
filter_by(project_id=project_id).\
filter_by(deleted=can_read_deleted(context)).\
all()
@require_context
def snapshot_update(context, snapshot_id, values):
session = get_session()
with session.begin():
snapshot_ref = snapshot_get(context, snapshot_id, session=session)
snapshot_ref.update(values)
snapshot_ref.save(session=session)
###################
@require_context
def security_group_get_all(context):
session = get_session()

View File

@ -0,0 +1,70 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 MORITA Kazutaka.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column, Table, MetaData
from sqlalchemy import Integer, DateTime, Boolean, String
from nova import log as logging
meta = MetaData()
snapshots = Table('snapshots', meta,
Column('created_at', DateTime(timezone=False)),
Column('updated_at', DateTime(timezone=False)),
Column('deleted_at', DateTime(timezone=False)),
Column('deleted', Boolean(create_constraint=True, name=None)),
Column('id', Integer(), primary_key=True, nullable=False),
Column('volume_id', Integer(), nullable=False),
Column('user_id',
String(length=255, convert_unicode=False, assert_unicode=None,
unicode_error=None, _warn_on_bytestring=False)),
Column('project_id',
String(length=255, convert_unicode=False, assert_unicode=None,
unicode_error=None, _warn_on_bytestring=False)),
Column('status',
String(length=255, convert_unicode=False, assert_unicode=None,
unicode_error=None, _warn_on_bytestring=False)),
Column('progress',
String(length=255, convert_unicode=False, assert_unicode=None,
unicode_error=None, _warn_on_bytestring=False)),
Column('volume_size', Integer()),
Column('scheduled_at', DateTime(timezone=False)),
Column('display_name',
String(length=255, convert_unicode=False, assert_unicode=None,
unicode_error=None, _warn_on_bytestring=False)),
Column('display_description',
String(length=255, convert_unicode=False, assert_unicode=None,
unicode_error=None, _warn_on_bytestring=False)))
def upgrade(migrate_engine):
# Upgrade operations go here. Don't create your own engine;
# bind migrate_engine to your metadata
meta.bind = migrate_engine
try:
snapshots.create()
except Exception:
logging.info(repr(snapshots))
logging.exception('Exception while creating table')
meta.drop_all(tables=[snapshots])
raise
def downgrade(migrate_engine):
# Operations to reverse the above upgrade go here.
snapshots.drop()

View File

@ -329,6 +329,31 @@ class Quota(BASE, NovaBase):
hard_limit = Column(Integer, nullable=True)
class Snapshot(BASE, NovaBase):
"""Represents a block storage device that can be attached to a vm."""
__tablename__ = 'snapshots'
id = Column(Integer, primary_key=True, autoincrement=True)
@property
def name(self):
return FLAGS.snapshot_name_template % self.id
@property
def volume_name(self):
return FLAGS.volume_name_template % self.volume_id
user_id = Column(String(255))
project_id = Column(String(255))
volume_id = Column(Integer)
status = Column(String(255))
progress = Column(String(255))
volume_size = Column(Integer)
display_name = Column(String(255))
display_description = Column(String(255))
class ExportDevice(BASE, NovaBase):
"""Represates a shelf and blade that a volume can be exported on."""
__tablename__ = 'export_devices'

View File

@ -271,6 +271,14 @@ class VolumeNotFoundForInstance(VolumeNotFound):
message = _("Volume not found for instance %(instance_id)s.")
class SnapshotNotFound(NotFound):
message = _("Snapshot %(snapshot_id)s could not be found.")
class VolumeIsBusy(Error):
message = _("deleting volume %(volume_name)s that has snapshot")
class ExportDeviceNotFoundForVolume(NotFound):
message = _("No export device found for volume %(volume_id)s.")

View File

@ -31,6 +31,7 @@ LOG = logging.getLogger("nova.fakerabbit")
EXCHANGES = {}
QUEUES = {}
CONSUMERS = {}
class Message(base.BaseMessage):
@ -96,17 +97,29 @@ class Backend(base.BaseBackend):
' key %(routing_key)s') % locals())
EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
def declare_consumer(self, queue, callback, *args, **kwargs):
self.current_queue = queue
self.current_callback = callback
def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
global CONSUMERS
LOG.debug("Adding consumer %s", consumer_tag)
CONSUMERS[consumer_tag] = (queue, callback)
def cancel(self, consumer_tag):
global CONSUMERS
LOG.debug("Removing consumer %s", consumer_tag)
del CONSUMERS[consumer_tag]
def consume(self, limit=None):
global CONSUMERS
num = 0
while True:
item = self.get(self.current_queue)
if item:
self.current_callback(item)
raise StopIteration()
greenthread.sleep(0)
for (queue, callback) in CONSUMERS.itervalues():
item = self.get(queue)
if item:
callback(item)
num += 1
yield
if limit and num == limit:
raise StopIteration()
greenthread.sleep(0.1)
def get(self, queue, no_ack=False):
global QUEUES
@ -134,5 +147,7 @@ class Backend(base.BaseBackend):
def reset_all():
global EXCHANGES
global QUEUES
global CONSUMERS
EXCHANGES = {}
QUEUES = {}
CONSUMERS = {}

View File

@ -28,12 +28,15 @@ import json
import sys
import time
import traceback
import types
import uuid
from carrot import connection as carrot_connection
from carrot import messaging
from eventlet import greenpool
from eventlet import greenthread
from eventlet import pools
from eventlet import queue
import greenlet
from nova import context
from nova import exception
@ -47,7 +50,10 @@ LOG = logging.getLogger('nova.rpc')
FLAGS = flags.FLAGS
flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool')
flags.DEFINE_integer('rpc_thread_pool_size', 1024,
'Size of RPC thread pool')
flags.DEFINE_integer('rpc_conn_pool_size', 30,
'Size of RPC connection pool')
class Connection(carrot_connection.BrokerConnection):
@ -90,6 +96,22 @@ class Connection(carrot_connection.BrokerConnection):
return cls.instance()
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
# TODO(comstud): Timeout connections not used in a while
def create(self):
LOG.debug('Creating new connection')
return Connection.instance(new=True)
# Create a ConnectionPool to use for RPC calls. We'll order the
# pool as a stack (LIFO), so that we can potentially loop through and
# timeout old unused connections at some point
ConnectionPool = Pool(
max_size=FLAGS.rpc_conn_pool_size,
order_as_stack=True)
class Consumer(messaging.Consumer):
"""Consumer base class.
@ -131,7 +153,9 @@ class Consumer(messaging.Consumer):
self.connection = Connection.recreate()
self.backend = self.connection.create_backend()
self.declare()
super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
return super(Consumer, self).fetch(no_ack,
auto_ack,
enable_callbacks)
if self.failed_connection:
LOG.error(_('Reconnected to queue'))
self.failed_connection = False
@ -159,13 +183,13 @@ class AdapterConsumer(Consumer):
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
super(AdapterConsumer, self).__init__(connection=connection,
topic=topic)
self.register_callback(self.process_data)
def receive(self, *args, **kwargs):
self.pool.spawn_n(self._receive, *args, **kwargs)
def process_data(self, message_data, message):
"""Consumer callback to call a method on a proxy object.
@exception.wrap_exception
def _receive(self, message_data, message):
"""Magically looks for a method on the proxy object and calls it.
Parses the message for validity and fires off a thread to call the
proxy object method.
Message data should be a dictionary with two keys:
method: string representing the method to call
@ -175,8 +199,8 @@ class AdapterConsumer(Consumer):
"""
LOG.debug(_('received %s') % message_data)
msg_id = message_data.pop('_msg_id', None)
# This will be popped off in _unpack_context
msg_id = message_data.get('_msg_id', None)
ctxt = _unpack_context(message_data)
method = message_data.get('method')
@ -188,8 +212,17 @@ class AdapterConsumer(Consumer):
# we just log the message and send an error string
# back to the caller
LOG.warn(_('no method for message: %s') % message_data)
msg_reply(msg_id, _('No method for message: %s') % message_data)
if msg_id:
msg_reply(msg_id,
_('No method for message: %s') % message_data)
return
self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
@exception.wrap_exception
def _process_data(self, msg_id, ctxt, method, args):
"""Thread that maigcally looks for a method on the proxy
object and calls it.
"""
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
@ -197,7 +230,18 @@ class AdapterConsumer(Consumer):
try:
rval = node_func(context=ctxt, **node_args)
if msg_id:
msg_reply(msg_id, rval, None)
# Check if the result was a generator
if isinstance(rval, types.GeneratorType):
for x in rval:
msg_reply(msg_id, x, None)
else:
msg_reply(msg_id, rval, None)
# This final None tells multicall that it is done.
msg_reply(msg_id, None, None)
elif isinstance(rval, types.GeneratorType):
# NOTE(vish): this iterates through the generator
list(rval)
except Exception as e:
logging.exception('Exception during message handling')
if msg_id:
@ -205,11 +249,6 @@ class AdapterConsumer(Consumer):
return
class Publisher(messaging.Publisher):
"""Publisher base class."""
pass
class TopicAdapterConsumer(AdapterConsumer):
"""Consumes messages on a specific topic."""
@ -242,6 +281,58 @@ class FanoutAdapterConsumer(AdapterConsumer):
topic=topic, proxy=proxy)
class ConsumerSet(object):
"""Groups consumers to listen on together on a single connection."""
def __init__(self, connection, consumer_list):
self.consumer_list = set(consumer_list)
self.consumer_set = None
self.enabled = True
self.init(connection)
def init(self, conn):
if not conn:
conn = Connection.instance(new=True)
if self.consumer_set:
self.consumer_set.close()
self.consumer_set = messaging.ConsumerSet(conn)
for consumer in self.consumer_list:
consumer.connection = conn
# consumer.backend is set for us
self.consumer_set.add_consumer(consumer)
def reconnect(self):
self.init(None)
def wait(self, limit=None):
running = True
while running:
it = self.consumer_set.iterconsume(limit=limit)
if not it:
break
while True:
try:
it.next()
except StopIteration:
return
except greenlet.GreenletExit:
running = False
break
except Exception as e:
LOG.exception(_("Exception while processing consumer"))
self.reconnect()
# Break to outer loop
break
def close(self):
self.consumer_set.close()
class Publisher(messaging.Publisher):
"""Publisher base class."""
pass
class TopicPublisher(Publisher):
"""Publishes messages on a specific topic."""
@ -306,16 +397,18 @@ def msg_reply(msg_id, reply=None, failure=None):
LOG.error(_("Returning exception %s to caller"), message)
LOG.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
conn = Connection.instance()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
except TypeError:
publisher.send(
{'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure})
publisher.close()
with ConnectionPool.item() as conn:
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
except TypeError:
publisher.send(
{'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure})
publisher.close()
class RemoteError(exception.Error):
@ -347,8 +440,9 @@ def _unpack_context(msg):
if key.startswith('_context_'):
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
LOG.debug(_('unpacked context: %s'), context_dict)
return context.RequestContext.from_dict(context_dict)
return RpcContext.from_dict(context_dict)
def _pack_context(msg, context):
@ -360,70 +454,112 @@ def _pack_context(msg, context):
for args at some point.
"""
context = dict([('_context_%s' % key, value)
for (key, value) in context.to_dict().iteritems()])
msg.update(context)
context_d = dict([('_context_%s' % key, value)
for (key, value) in context.to_dict().iteritems()])
msg.update(context_d)
def call(context, topic, msg):
"""Sends a message on a topic and wait for a response."""
class RpcContext(context.RequestContext):
def __init__(self, *args, **kwargs):
msg_id = kwargs.pop('msg_id', None)
self.msg_id = msg_id
super(RpcContext, self).__init__(*args, **kwargs)
def reply(self, *args, **kwargs):
msg_reply(self.msg_id, *args, **kwargs)
def multicall(context, topic, msg):
"""Make a call that returns multiple times."""
LOG.debug(_('Making asynchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
_pack_context(msg, context)
class WaitMessage(object):
def __call__(self, data, message):
"""Acks message and sets result."""
message.ack()
if data['failure']:
self.result = RemoteError(*data['failure'])
else:
self.result = data['result']
wait_msg = WaitMessage()
conn = Connection.instance()
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
con_conn = ConnectionPool.get()
consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
wait_msg = MulticallWaiter(consumer)
consumer.register_callback(wait_msg)
conn = Connection.instance()
publisher = TopicPublisher(connection=conn, topic=topic)
publisher = TopicPublisher(connection=con_conn, topic=topic)
publisher.send(msg)
publisher.close()
try:
consumer.wait(limit=1)
except StopIteration:
pass
consumer.close()
# NOTE(termie): this is a little bit of a change from the original
# non-eventlet code where returning a Failure
# instance from a deferred call is very similar to
# raising an exception
if isinstance(wait_msg.result, Exception):
raise wait_msg.result
return wait_msg.result
return wait_msg
class MulticallWaiter(object):
def __init__(self, consumer):
self._consumer = consumer
self._results = queue.Queue()
self._closed = False
def close(self):
self._closed = True
self._consumer.close()
ConnectionPool.put(self._consumer.connection)
def __call__(self, data, message):
"""Acks message and sets result."""
message.ack()
if data['failure']:
self._results.put(RemoteError(*data['failure']))
else:
self._results.put(data['result'])
def __iter__(self):
return self.wait()
def wait(self):
while True:
rv = None
while rv is None and not self._closed:
try:
rv = self._consumer.fetch(enable_callbacks=True)
except Exception:
self.close()
raise
time.sleep(0.01)
result = self._results.get()
if isinstance(result, Exception):
self.close()
raise result
if result == None:
self.close()
raise StopIteration
yield result
def call(context, topic, msg):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
return
return rv[-1]
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
_pack_context(msg, context)
conn = Connection.instance()
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
publisher.close()
with ConnectionPool.item() as conn:
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
publisher.close()
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
_pack_context(msg, context)
conn = Connection.instance()
publisher = FanoutPublisher(topic, connection=conn)
publisher.send(msg)
publisher.close()
with ConnectionPool.item() as conn:
publisher = FanoutPublisher(topic, connection=conn)
publisher.send(msg)
publisher.close()
def generic_response(message_data, message):
@ -459,6 +595,7 @@ def send_message(topic, message, wait=True):
if wait:
consumer.wait()
consumer.close()
if __name__ == '__main__':

View File

@ -19,14 +19,11 @@
"""Generic Node baseclass for all workers that run on hosts."""
import greenlet
import inspect
import os
import sys
import time
from eventlet import event
from eventlet import greenthread
from eventlet import greenpool
from nova import context
from nova import db
@ -91,27 +88,37 @@ class Service(object):
if 'nova-compute' == self.binary:
self.manager.update_available_resource(ctxt)
conn1 = rpc.Connection.instance(new=True)
conn2 = rpc.Connection.instance(new=True)
conn3 = rpc.Connection.instance(new=True)
self.conn = rpc.Connection.instance(new=True)
logging.debug("Creating Consumer connection for Service %s" %
self.topic)
# Share this same connection for these Consumers
consumer_all = rpc.TopicAdapterConsumer(
connection=self.conn,
topic=self.topic,
proxy=self)
consumer_node = rpc.TopicAdapterConsumer(
connection=self.conn,
topic='%s.%s' % (self.topic, self.host),
proxy=self)
fanout = rpc.FanoutAdapterConsumer(
connection=self.conn,
topic=self.topic,
proxy=self)
consumer_set = rpc.ConsumerSet(
connection=self.conn,
consumer_list=[consumer_all, consumer_node, fanout])
# Wait forever, processing these consumers
def _wait():
try:
consumer_set.wait()
finally:
consumer_set.close()
self.consumer_set_thread = greenthread.spawn(_wait)
if self.report_interval:
consumer_all = rpc.TopicAdapterConsumer(
connection=conn1,
topic=self.topic,
proxy=self)
consumer_node = rpc.TopicAdapterConsumer(
connection=conn2,
topic='%s.%s' % (self.topic, self.host),
proxy=self)
fanout = rpc.FanoutAdapterConsumer(
connection=conn3,
topic=self.topic,
proxy=self)
self.timers.append(consumer_all.attach_to_eventlet())
self.timers.append(consumer_node.attach_to_eventlet())
self.timers.append(fanout.attach_to_eventlet())
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)
self.timers.append(pulse)
@ -174,6 +181,11 @@ class Service(object):
logging.warn(_('Service killed that has no database entry'))
def stop(self):
self.consumer_set_thread.kill()
try:
self.consumer_set_thread.wait()
except greenlet.GreenletExit:
pass
for x in self.timers:
try:
x.stop()

View File

@ -31,17 +31,15 @@ import uuid
import unittest
import mox
import shutil
import stubout
from eventlet import greenthread
from nova import context
from nova import db
from nova import fakerabbit
from nova import flags
from nova import rpc
from nova import service
from nova import wsgi
from nova.virt import fake
FLAGS = flags.FLAGS
@ -85,6 +83,7 @@ class TestCase(unittest.TestCase):
self._monkey_patch_attach()
self._monkey_patch_wsgi()
self._original_flags = FLAGS.FlagValuesDict()
rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size)
def tearDown(self):
"""Runs after each test method to tear down test environment."""
@ -99,6 +98,10 @@ class TestCase(unittest.TestCase):
if FLAGS.fake_rabbit:
fakerabbit.reset_all()
if FLAGS.connection_type == 'fake':
if hasattr(fake.FakeConnection, '_instance'):
del fake.FakeConnection._instance
# Reset any overriden flags
self.reset_flags()

View File

@ -154,10 +154,7 @@ class _IntegratedTestBase(test.TestCase):
# set up services
self.start_service('compute')
self.start_service('volume')
# NOTE(justinsb): There's a bug here which is eluding me...
# If we start the network_service, all is good, but then subsequent
# tests fail: CloudTestCase.test_ajax_console in particular.
#self.start_service('network')
self.start_service('network')
self.start_service('scheduler')
self._start_api_service()

View File

@ -17,13 +17,9 @@
# under the License.
from base64 import b64decode
import json
from M2Crypto import BIO
from M2Crypto import RSA
import os
import shutil
import tempfile
import time
from eventlet import greenthread
@ -33,12 +29,10 @@ from nova import db
from nova import flags
from nova import log as logging
from nova import rpc
from nova import service
from nova import test
from nova import utils
from nova import exception
from nova.auth import manager
from nova.compute import power_state
from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.image import local
@ -79,14 +73,21 @@ class CloudTestCase(test.TestCase):
self.stubs.Set(local.LocalImageService, 'show', fake_show)
self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)
# NOTE(vish): set up a manual wait so rpc.cast has a chance to finish
rpc_cast = rpc.cast
def finish_cast(*args, **kwargs):
rpc_cast(*args, **kwargs)
greenthread.sleep(0.2)
self.stubs.Set(rpc, 'cast', finish_cast)
def tearDown(self):
network_ref = db.project_get_network(self.context,
self.project.id)
db.network_disassociate(self.context, network_ref['id'])
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
self.compute.kill()
self.network.kill()
super(CloudTestCase, self).tearDown()
def _create_key(self, name):
@ -113,7 +114,6 @@ class CloudTestCase(test.TestCase):
self.cloud.describe_addresses(self.context)
self.cloud.release_address(self.context,
public_ip=address)
greenthread.sleep(0.3)
db.floating_ip_destroy(self.context, address)
def test_associate_disassociate_address(self):
@ -129,12 +129,10 @@ class CloudTestCase(test.TestCase):
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
greenthread.sleep(0.3)
self.cloud.disassociate_address(self.context,
public_ip=address)
self.cloud.release_address(self.context,
public_ip=address)
greenthread.sleep(0.3)
self.network.deallocate_fixed_ip(self.context, fixed)
db.instance_destroy(self.context, inst['id'])
db.floating_ip_destroy(self.context, address)
@ -188,6 +186,52 @@ class CloudTestCase(test.TestCase):
db.service_destroy(self.context, service1['id'])
db.service_destroy(self.context, service2['id'])
def test_describe_snapshots(self):
"""Makes sure describe_snapshots works and filters results."""
vol = db.volume_create(self.context, {})
snap1 = db.snapshot_create(self.context, {'volume_id': vol['id']})
snap2 = db.snapshot_create(self.context, {'volume_id': vol['id']})
result = self.cloud.describe_snapshots(self.context)
self.assertEqual(len(result['snapshotSet']), 2)
snapshot_id = ec2utils.id_to_ec2_id(snap2['id'], 'snap-%08x')
result = self.cloud.describe_snapshots(self.context,
snapshot_id=[snapshot_id])
self.assertEqual(len(result['snapshotSet']), 1)
self.assertEqual(
ec2utils.ec2_id_to_id(result['snapshotSet'][0]['snapshotId']),
snap2['id'])
db.snapshot_destroy(self.context, snap1['id'])
db.snapshot_destroy(self.context, snap2['id'])
db.volume_destroy(self.context, vol['id'])
def test_create_snapshot(self):
"""Makes sure create_snapshot works."""
vol = db.volume_create(self.context, {'status': "available"})
volume_id = ec2utils.id_to_ec2_id(vol['id'], 'vol-%08x')
result = self.cloud.create_snapshot(self.context,
volume_id=volume_id)
snapshot_id = result['snapshotId']
result = self.cloud.describe_snapshots(self.context)
self.assertEqual(len(result['snapshotSet']), 1)
self.assertEqual(result['snapshotSet'][0]['snapshotId'], snapshot_id)
db.snapshot_destroy(self.context, ec2utils.ec2_id_to_id(snapshot_id))
db.volume_destroy(self.context, vol['id'])
def test_delete_snapshot(self):
"""Makes sure delete_snapshot works."""
vol = db.volume_create(self.context, {'status': "available"})
snap = db.snapshot_create(self.context, {'volume_id': vol['id'],
'status': "available"})
snapshot_id = ec2utils.id_to_ec2_id(snap['id'], 'snap-%08x')
result = self.cloud.delete_snapshot(self.context,
snapshot_id=snapshot_id)
self.assertTrue(result)
db.volume_destroy(self.context, vol['id'])
def test_describe_instances(self):
"""Makes sure describe_instances works and filters results."""
inst1 = db.instance_create(self.context, {'reservation_id': 'a',
@ -306,31 +350,25 @@ class CloudTestCase(test.TestCase):
'instance_type': instance_type,
'max_count': max_count}
rv = self.cloud.run_instances(self.context, **kwargs)
greenthread.sleep(0.3)
instance_id = rv['instancesSet'][0]['instanceId']
output = self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT')
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
greenthread.sleep(0.3)
rv = self.cloud.terminate_instances(self.context, [instance_id])
greenthread.sleep(0.3)
def test_ajax_console(self):
kwargs = {'image_id': 'ami-1'}
rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
greenthread.sleep(0.3)
output = self.cloud.get_ajax_console(context=self.context,
instance_id=[instance_id])
self.assertEquals(output['url'],
'%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url)
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
greenthread.sleep(0.3)
rv = self.cloud.terminate_instances(self.context, [instance_id])
greenthread.sleep(0.3)
def test_key_generation(self):
result = self._create_key('test')

View File

@ -31,7 +31,6 @@ LOG = logging.getLogger('nova.tests.rpc')
class RpcTestCase(test.TestCase):
"""Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
self.conn = rpc.Connection.instance(True)
@ -43,14 +42,55 @@ class RpcTestCase(test.TestCase):
self.context = context.get_admin_context()
def test_call_succeed(self):
"""Get a value through rpc call"""
value = 42
result = rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
def test_call_succeed_despite_multiple_returns(self):
value = 42
result = rpc.call(self.context, 'test', {"method": "echo_three_times",
"args": {"value": value}})
self.assertEqual(value + 2, result)
def test_call_succeed_despite_multiple_returns_yield(self):
value = 42
result = rpc.call(self.context, 'test',
{"method": "echo_three_times_yield",
"args": {"value": value}})
self.assertEqual(value + 2, result)
def test_multicall_succeed_once(self):
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo",
"args": {"value": value}})
for i, x in enumerate(result):
if i > 0:
self.fail('should only receive one response')
self.assertEqual(value + i, x)
def test_multicall_succeed_three_times(self):
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo_three_times",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(value + i, x)
def test_multicall_succeed_three_times_yield(self):
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo_three_times_yield",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(value + i, x)
def test_context_passed(self):
"""Makes sure a context is passed through rpc call"""
"""Makes sure a context is passed through rpc call."""
value = 42
result = rpc.call(self.context,
'test', {"method": "context",
@ -58,11 +98,12 @@ class RpcTestCase(test.TestCase):
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
"""Test that exception gets passed back properly
"""Test that exception gets passed back properly.
rpc.call returns a RemoteError object. The value of the
exception is converted to a string, so we convert it back
to an int in the test.
"""
value = 42
self.assertRaises(rpc.RemoteError,
@ -81,7 +122,7 @@ class RpcTestCase(test.TestCase):
self.assertEqual(int(exc.value), value)
def test_nested_calls(self):
"""Test that we can do an rpc.call inside another call"""
"""Test that we can do an rpc.call inside another call."""
class Nested(object):
@staticmethod
def echo(context, queue, value):
@ -108,25 +149,80 @@ class RpcTestCase(test.TestCase):
"value": value}})
self.assertEqual(value, result)
def test_connectionpool_single(self):
"""Test that ConnectionPool recycles a single connection."""
conn1 = rpc.ConnectionPool.get()
rpc.ConnectionPool.put(conn1)
conn2 = rpc.ConnectionPool.get()
rpc.ConnectionPool.put(conn2)
self.assertEqual(conn1, conn2)
def test_connectionpool_double(self):
"""Test that ConnectionPool returns and reuses separate connections.
When called consecutively we should get separate connections and upon
returning them those connections should be reused for future calls
before generating a new connection.
"""
conn1 = rpc.ConnectionPool.get()
conn2 = rpc.ConnectionPool.get()
self.assertNotEqual(conn1, conn2)
rpc.ConnectionPool.put(conn1)
rpc.ConnectionPool.put(conn2)
conn3 = rpc.ConnectionPool.get()
conn4 = rpc.ConnectionPool.get()
self.assertEqual(conn1, conn3)
self.assertEqual(conn2, conn4)
def test_connectionpool_limit(self):
"""Test connection pool limit and connection uniqueness."""
max_size = FLAGS.rpc_conn_pool_size
conns = []
for i in xrange(max_size):
conns.append(rpc.ConnectionPool.get())
self.assertFalse(rpc.ConnectionPool.free_items)
self.assertEqual(rpc.ConnectionPool.current_size,
rpc.ConnectionPool.max_size)
self.assertEqual(len(set(conns)), max_size)
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call
"""Simple Proxy class so the consumer has methods to call.
Uses static methods because we aren't actually storing any state"""
Uses static methods because we aren't actually storing any state.
"""
@staticmethod
def echo(context, value):
"""Simply returns whatever value is sent in"""
"""Simply returns whatever value is sent in."""
LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context"""
"""Returns dictionary version of context."""
LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
def echo_three_times(context, value):
context.reply(value)
context.reply(value + 1)
context.reply(value + 2)
@staticmethod
def echo_three_times_yield(context, value):
yield value
yield value + 1
yield value + 2
@staticmethod
def fail(context, value):
"""Raises an exception with the value sent in"""
"""Raises an exception with the value sent in."""
raise Exception(value)

View File

@ -106,7 +106,10 @@ class ServiceTestCase(test.TestCase):
# NOTE(vish): Create was moved out of mox replay to make sure that
# the looping calls are created in StartService.
app = service.Service.create(host=host, binary=binary)
app = service.Service.create(host=host, binary=binary, topic=topic)
self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
service.rpc.Connection.instance(new=mox.IgnoreArg())
self.mox.StubOutWithMock(rpc,
'TopicAdapterConsumer',
@ -114,6 +117,11 @@ class ServiceTestCase(test.TestCase):
self.mox.StubOutWithMock(rpc,
'FanoutAdapterConsumer',
use_mock_anything=True)
self.mox.StubOutWithMock(rpc,
'ConsumerSet',
use_mock_anything=True)
rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
@ -129,9 +137,14 @@ class ServiceTestCase(test.TestCase):
proxy=mox.IsA(service.Service)).AndReturn(
rpc.FanoutAdapterConsumer)
rpc.TopicAdapterConsumer.attach_to_eventlet()
rpc.TopicAdapterConsumer.attach_to_eventlet()
rpc.FanoutAdapterConsumer.attach_to_eventlet()
def wait_func(self, limit=None):
return None
mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
{'wait': wait_func})
rpc.ConsumerSet(connection=mox.IgnoreArg(),
consumer_list=mox.IsA(list)).AndReturn(mock_cset)
wait_func(mox.IgnoreArg())
service_create = {'host': host,
'binary': binary,
@ -287,8 +300,42 @@ class ServiceTestCase(test.TestCase):
# Creating mocks
self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
service.rpc.Connection.instance(new=mox.IgnoreArg())
service.rpc.Connection.instance(new=mox.IgnoreArg())
service.rpc.Connection.instance(new=mox.IgnoreArg())
self.mox.StubOutWithMock(rpc,
'TopicAdapterConsumer',
use_mock_anything=True)
self.mox.StubOutWithMock(rpc,
'FanoutAdapterConsumer',
use_mock_anything=True)
self.mox.StubOutWithMock(rpc,
'ConsumerSet',
use_mock_anything=True)
rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
rpc.TopicAdapterConsumer)
rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
topic='%s.%s' % (topic, host),
proxy=mox.IsA(service.Service)).AndReturn(
rpc.TopicAdapterConsumer)
rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
rpc.FanoutAdapterConsumer)
def wait_func(self, limit=None):
return None
mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
{'wait': wait_func})
rpc.ConsumerSet(connection=mox.IgnoreArg(),
consumer_list=mox.IsA(list)).AndReturn(mock_cset)
wait_func(mox.IgnoreArg())
self.mox.StubOutWithMock(serv.manager.driver,
'update_available_resource')
serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)

View File

@ -176,6 +176,34 @@ class VolumeTestCase(test.TestCase):
# This will allow us to test cross-node interactions
pass
@staticmethod
def _create_snapshot(volume_id, size='0'):
"""Create a snapshot object."""
snap = {}
snap['volume_size'] = size
snap['user_id'] = 'fake'
snap['project_id'] = 'fake'
snap['volume_id'] = volume_id
snap['status'] = "creating"
return db.snapshot_create(context.get_admin_context(), snap)['id']
def test_create_delete_snapshot(self):
"""Test snapshot can be created and deleted."""
volume_id = self._create_volume()
self.volume.create_volume(self.context, volume_id)
snapshot_id = self._create_snapshot(volume_id)
self.volume.create_snapshot(self.context, volume_id, snapshot_id)
self.assertEqual(snapshot_id,
db.snapshot_get(context.get_admin_context(),
snapshot_id).id)
self.volume.delete_snapshot(self.context, snapshot_id)
self.assertRaises(exception.NotFound,
db.snapshot_get,
self.context,
snapshot_id)
self.volume.delete_volume(self.context, volume_id)
class DriverTestCase(test.TestCase):
"""Base Test class for Drivers."""

View File

@ -592,11 +592,29 @@ class XenAPIDiffieHellmanTestCase(test.TestCase):
bob_shared = self.bob.compute_shared(alice_pub)
self.assertEquals(alice_shared, bob_shared)
def test_encryption(self):
msg = "This is a top-secret message"
enc = self.alice.encrypt(msg)
def _test_encryption(self, message):
enc = self.alice.encrypt(message)
self.assertFalse(enc.endswith('\n'))
dec = self.bob.decrypt(enc)
self.assertEquals(dec, msg)
self.assertEquals(dec, message)
def test_encrypt_simple_message(self):
self._test_encryption('This is a simple message.')
def test_encrypt_message_with_newlines_at_end(self):
self._test_encryption('This message has a newline at the end.\n')
def test_encrypt_many_newlines_at_end(self):
self._test_encryption('Message with lotsa newlines.\n\n\n')
def test_encrypt_newlines_inside_message(self):
self._test_encryption('Message\nwith\ninterior\nnewlines.')
def test_encrypt_with_leading_newlines(self):
self._test_encryption('\n\nMessage with leading newlines.')
def test_encrypt_really_long_message(self):
self._test_encryption(''.join(['abcd' for i in xrange(1024)]))
def tearDown(self):
super(XenAPIDiffieHellmanTestCase, self).tearDown()

View File

@ -1190,26 +1190,22 @@ class SimpleDH(object):
mpi = M2Crypto.m2.bn_to_mpi(bn)
return mpi
def _run_ssl(self, text, which):
base_cmd = ('openssl enc -aes-128-cbc -a -pass pass:%(shared)s '
'-nosalt %(dec_flag)s')
if which.lower()[0] == 'd':
dec_flag = ' -d'
else:
dec_flag = ''
shared = self._shared
cmd = base_cmd % locals()
proc = _runproc(cmd)
proc.stdin.write(text + '\n')
def _run_ssl(self, text, extra_args=None):
if not extra_args:
extra_args = ''
cmd = 'enc -aes-128-cbc -A -a -pass pass:%s -nosalt %s' % (
self._shared, extra_args)
proc = _runproc('openssl %s' % cmd)
proc.stdin.write(text)
proc.stdin.close()
proc.wait()
err = proc.stderr.read()
if err:
raise RuntimeError(_('OpenSSL error: %s') % err)
return proc.stdout.read().strip('\n')
return proc.stdout.read()
def encrypt(self, text):
return self._run_ssl(text, 'enc')
return self._run_ssl(text).strip('\n')
def decrypt(self, text):
return self._run_ssl(text, 'dec')
return self._run_ssl(text, '-d')

View File

@ -90,6 +90,15 @@ class API(base.Base):
return self.db.volume_get_all(context)
return self.db.volume_get_all_by_project(context, context.project_id)
def get_snapshot(self, context, snapshot_id):
rv = self.db.snapshot_get(context, snapshot_id)
return dict(rv.iteritems())
def get_all_snapshots(self, context):
if context.is_admin:
return self.db.snapshot_get_all(context)
return self.db.snapshot_get_all_by_project(context, context.project_id)
def check_attach(self, context, volume_id):
volume = self.get(context, volume_id)
# TODO(vish): abstract status checking?
@ -110,3 +119,38 @@ class API(base.Base):
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "remove_volume",
"args": {'volume_id': volume_id}})
def create_snapshot(self, context, volume_id, name, description):
volume = self.get(context, volume_id)
if volume['status'] != "available":
raise exception.ApiError(_("Volume status must be available"))
options = {
'volume_id': volume_id,
'user_id': context.user_id,
'project_id': context.project_id,
'status': "creating",
'progress': '0%',
'volume_size': volume['size'],
'display_name': name,
'display_description': description}
snapshot = self.db.snapshot_create(context, options)
rpc.cast(context,
FLAGS.scheduler_topic,
{"method": "create_snapshot",
"args": {"topic": FLAGS.volume_topic,
"volume_id": volume_id,
"snapshot_id": snapshot['id']}})
return snapshot
def delete_snapshot(self, context, snapshot_id):
snapshot = self.get_snapshot(context, snapshot_id)
if snapshot['status'] != "available":
raise exception.ApiError(_("Snapshot status must be available"))
self.db.snapshot_update(context, snapshot_id, {'status': 'deleting'})
rpc.cast(context,
FLAGS.scheduler_topic,
{"method": "delete_snapshot",
"args": {"topic": FLAGS.volume_topic,
"snapshot_id": snapshot_id}})

View File

@ -90,42 +90,91 @@ class VolumeDriver(object):
raise exception.Error(_("volume group %s doesn't exist")
% FLAGS.volume_group)
def _create_volume(self, volume_name, sizestr):
self._try_execute('sudo', 'lvcreate', '-L', sizestr, '-n',
volume_name, FLAGS.volume_group)
def _copy_volume(self, srcstr, deststr, size_in_g):
self._execute('sudo', 'dd', 'if=%s' % srcstr, 'of=%s' % deststr,
'count=%d' % (size_in_g * 1024), 'bs=1M')
def _volume_not_present(self, volume_name):
path_name = '%s/%s' % (FLAGS.volume_group, volume_name)
try:
self._try_execute('sudo', 'lvdisplay', path_name)
except Exception as e:
# If the volume isn't present
return True
return False
def _delete_volume(self, volume, size_in_g):
"""Deletes a logical volume."""
# zero out old volumes to prevent data leaking between users
# TODO(ja): reclaiming space should be done lazy and low priority
self._copy_volume('/dev/zero', self.local_path(volume), size_in_g)
self._try_execute('sudo', 'lvremove', '-f', "%s/%s" %
(FLAGS.volume_group,
self._escape_snapshot(volume['name'])))
def _sizestr(self, size_in_g):
if int(size_in_g) == 0:
return '100M'
return '%sG' % size_in_g
# Linux LVM reserves name that starts with snapshot, so that
# such volume name can't be created. Mangle it.
def _escape_snapshot(self, snapshot_name):
if not snapshot_name.startswith('snapshot'):
return snapshot_name
return '_' + snapshot_name
def create_volume(self, volume):
"""Creates a logical volume. Can optionally return a Dictionary of
changes to the volume object to be persisted."""
if int(volume['size']) == 0:
sizestr = '100M'
else:
sizestr = '%sG' % volume['size']
self._try_execute('sudo', 'lvcreate', '-L', sizestr, '-n',
volume['name'],
FLAGS.volume_group)
self._create_volume(volume['name'], self._sizestr(volume['size']))
def delete_volume(self, volume):
"""Deletes a logical volume."""
try:
self._try_execute('sudo', 'lvdisplay',
'%s/%s' %
(FLAGS.volume_group,
volume['name']))
except Exception as e:
if self._volume_not_present(volume['name']):
# If the volume isn't present, then don't attempt to delete
return True
# zero out old volumes to prevent data leaking between users
# TODO(ja): reclaiming space should be done lazy and low priority
self._execute('sudo', 'dd', 'if=/dev/zero',
'of=%s' % self.local_path(volume),
'count=%d' % (volume['size'] * 1024),
'bs=1M')
self._try_execute('sudo', 'lvremove', '-f', "%s/%s" %
(FLAGS.volume_group,
volume['name']))
# TODO(yamahata): lvm can't delete origin volume only without
# deleting derived snapshots. Can we do something fancy?
out, err = self._execute('sudo', 'lvdisplay', '--noheading',
'-C', '-o', 'Attr',
'%s/%s' % (FLAGS.volume_group,
volume['name']))
# fake_execute returns None resulting unit test error
if out:
out = out.strip()
if (out[0] == 'o') or (out[0] == 'O'):
raise exception.VolumeIsBusy(volume_name=volume['name'])
self._delete_volume(volume, volume['size'])
def create_snapshot(self, snapshot):
"""Creates a snapshot."""
orig_lv_name = "%s/%s" % (FLAGS.volume_group, snapshot['volume_name'])
self._try_execute('sudo', 'lvcreate', '-L',
self._sizestr(snapshot['volume_size']),
'--name', self._escape_snapshot(snapshot['name']),
'--snapshot', orig_lv_name)
def delete_snapshot(self, snapshot):
"""Deletes a snapshot."""
if self._volume_not_present(self._escape_snapshot(snapshot['name'])):
# If the snapshot isn't present, then don't attempt to delete
return True
# TODO(yamahata): zeroing out the whole snapshot triggers COW.
# it's quite slow.
self._delete_volume(snapshot, snapshot['volume_size'])
def local_path(self, volume):
# NOTE(vish): stops deprecation warning
escaped_group = FLAGS.volume_group.replace('-', '--')
escaped_name = volume['name'].replace('-', '--')
escaped_name = self._escape_snapshot(volume['name']).replace('-', '--')
return "/dev/mapper/%s-%s" % (escaped_group, escaped_name)
def ensure_export(self, context, volume):
@ -559,6 +608,18 @@ class RBDDriver(VolumeDriver):
self._try_execute('rbd', '--pool', FLAGS.rbd_pool,
'rm', volume['name'])
def create_snapshot(self, snapshot):
"""Creates an rbd snapshot"""
self._try_execute('rbd', '--pool', FLAGS.rbd_pool,
'snap', 'create', '--snap', snapshot['name'],
snapshot['volume_name'])
def delete_snapshot(self, snapshot):
"""Deletes an rbd snapshot"""
self._try_execute('rbd', '--pool', FLAGS.rbd_pool,
'snap', 'rm', '--snap', snapshot['name'],
snapshot['volume_name'])
def local_path(self, volume):
"""Returns the path of the rbd volume."""
# This is the same as the remote path
@ -600,18 +661,24 @@ class SheepdogDriver(VolumeDriver):
def create_volume(self, volume):
"""Creates a sheepdog volume"""
if int(volume['size']) == 0:
sizestr = '100M'
else:
sizestr = '%sG' % volume['size']
self._try_execute('qemu-img', 'create',
"sheepdog:%s" % volume['name'],
sizestr)
self._sizestr(volume['size']))
def delete_volume(self, volume):
"""Deletes a logical volume"""
self._try_execute('collie', 'vdi', 'delete', volume['name'])
def create_snapshot(self, snapshot):
"""Creates a sheepdog snapshot"""
self._try_execute('qemu-img', 'snapshot', '-c', snapshot['name'],
"sheepdog:%s" % snapshot['volume_name'])
def delete_snapshot(self, snapshot):
"""Deletes a sheepdog snapshot"""
self._try_execute('collie', 'vdi', 'delete', snapshot['volume_name'],
'-s', snapshot['name'])
def local_path(self, volume):
return "sheepdog:%s" % volume['name']

View File

@ -142,6 +142,12 @@ class VolumeManager(manager.SchedulerDependentManager):
self.driver.remove_export(context, volume_ref)
LOG.debug(_("volume %s: deleting"), volume_ref['name'])
self.driver.delete_volume(volume_ref)
except exception.VolumeIsBusy, e:
LOG.debug(_("volume %s: volume is busy"), volume_ref['name'])
self.driver.ensure_export(context, volume_ref)
self.db.volume_update(context, volume_ref['id'],
{'status': 'available'})
return True
except Exception:
self.db.volume_update(context,
volume_ref['id'],
@ -152,6 +158,49 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.debug(_("volume %s: deleted successfully"), volume_ref['name'])
return True
def create_snapshot(self, context, volume_id, snapshot_id):
"""Creates and exports the snapshot."""
context = context.elevated()
snapshot_ref = self.db.snapshot_get(context, snapshot_id)
LOG.info(_("snapshot %s: creating"), snapshot_ref['name'])
try:
snap_name = snapshot_ref['name']
LOG.debug(_("snapshot %(snap_name)s: creating") % locals())
model_update = self.driver.create_snapshot(snapshot_ref)
if model_update:
self.db.snapshot_update(context, snapshot_ref['id'],
model_update)
except Exception:
self.db.snapshot_update(context,
snapshot_ref['id'], {'status': 'error'})
raise
self.db.snapshot_update(context,
snapshot_ref['id'], {'status': 'available',
'progress': '100%'})
LOG.debug(_("snapshot %s: created successfully"), snapshot_ref['name'])
return snapshot_id
def delete_snapshot(self, context, snapshot_id):
"""Deletes and unexports snapshot."""
context = context.elevated()
snapshot_ref = self.db.snapshot_get(context, snapshot_id)
try:
LOG.debug(_("snapshot %s: deleting"), snapshot_ref['name'])
self.driver.delete_snapshot(snapshot_ref)
except Exception:
self.db.snapshot_update(context,
snapshot_ref['id'],
{'status': 'error_deleting'})
raise
self.db.snapshot_destroy(context, snapshot_id)
LOG.debug(_("snapshot %s: deleted successfully"), snapshot_ref['name'])
return True
def setup_compute_volume(self, context, volume_id):
"""Setup remote volume on compute host.

View File

@ -17,8 +17,7 @@ redis==2.0.0
routes==1.12.3
WebOb==0.9.8
wsgiref==0.1.2
mox==0.5.0
-f http://pymox.googlecode.com/files/mox-0.5.0.tar.gz
mox==0.5.3
greenlet==0.3.1
nose
bzr