Merged trunk and resolved conflict in nova/db/sqlalchemy/api.py

This commit is contained in:
Brian Lamar
2011-03-24 13:53:54 -04:00
17 changed files with 453 additions and 68 deletions

View File

@@ -97,6 +97,7 @@ flags.DECLARE('vlan_start', 'nova.network.manager')
flags.DECLARE('vpn_start', 'nova.network.manager')
flags.DECLARE('fixed_range_v6', 'nova.network.manager')
flags.DECLARE('images_path', 'nova.image.local')
flags.DECLARE('libvirt_type', 'nova.virt.libvirt_conn')
flags.DEFINE_flag(flags.HelpFlag())
flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag())

View File

@@ -358,5 +358,6 @@ DEFINE_string('node_availability_zone', 'nova',
'availability zone of this node')
DEFINE_string('zone_name', 'nova', 'name of this zone')
DEFINE_string('zone_capabilities', 'kypervisor:xenserver;os:linux',
'Key/Value tags which represent capabilities of this zone')
DEFINE_list('zone_capabilities',
['hypervisor=xenserver;kvm', 'os=linux;windows'],
'Key/Multi-value list representng capabilities of this zone')

View File

@@ -53,11 +53,14 @@ This module provides Manager, a base class for managers.
from nova import utils
from nova import flags
from nova import log as logging
from nova.db import base
from nova.scheduler import api
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.manager')
class Manager(base.Base):
def __init__(self, host=None, db_driver=None):
@@ -74,3 +77,29 @@ class Manager(base.Base):
"""Do any initialization that needs to be run if this is a standalone
service. Child classes should override this method."""
pass
class SchedulerDependentManager(Manager):
"""Periodically send capability updates to the Scheduler services.
Services that need to update the Scheduler of their capabilities
should derive from this class. Otherwise they can derive from
manager.Manager directly. Updates are only sent after
update_service_capabilities is called with non-None values."""
def __init__(self, host=None, db_driver=None, service_name="undefined"):
self.last_capabilities = None
self.service_name = service_name
super(SchedulerDependentManager, self).__init__(host, db_driver)
def update_service_capabilities(self, capabilities):
"""Remember these capabilities to send on next periodic update."""
self.last_capabilities = capabilities
def periodic_tasks(self, context=None):
"""Pass data back to the scheduler at a periodic interval"""
if self.last_capabilities:
LOG.debug(_("Notifying Schedulers of capabilities ..."))
api.update_service_capabilities(context, self.service_name,
self.host, self.last_capabilities)
super(SchedulerDependentManager, self).periodic_tasks(context)

View File

@@ -137,24 +137,7 @@ class Consumer(messaging.Consumer):
return timer
class Publisher(messaging.Publisher):
"""Publisher base class"""
pass
class TopicConsumer(Consumer):
"""Consumes messages on a specific topic"""
exchange_type = "topic"
def __init__(self, connection=None, topic="broadcast"):
self.queue = topic
self.routing_key = topic
self.exchange = FLAGS.control_exchange
self.durable = False
super(TopicConsumer, self).__init__(connection=connection)
class AdapterConsumer(TopicConsumer):
class AdapterConsumer(Consumer):
"""Calls methods on a proxy object based on method and args"""
def __init__(self, connection=None, topic="broadcast", proxy=None):
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
@@ -207,6 +190,41 @@ class AdapterConsumer(TopicConsumer):
return
class Publisher(messaging.Publisher):
"""Publisher base class"""
pass
class TopicAdapterConsumer(AdapterConsumer):
"""Consumes messages on a specific topic"""
exchange_type = "topic"
def __init__(self, connection=None, topic="broadcast", proxy=None):
self.queue = topic
self.routing_key = topic
self.exchange = FLAGS.control_exchange
self.durable = False
super(TopicAdapterConsumer, self).__init__(connection=connection,
topic=topic, proxy=proxy)
class FanoutAdapterConsumer(AdapterConsumer):
"""Consumes messages from a fanout exchange"""
exchange_type = "fanout"
def __init__(self, connection=None, topic="broadcast", proxy=None):
self.exchange = "%s_fanout" % topic
self.routing_key = topic
unique = uuid.uuid4().hex
self.queue = "%s_fanout_%s" % (topic, unique)
self.durable = False
LOG.info(_("Created '%(exchange)s' fanout exchange "
"with '%(key)s' routing key"),
dict(exchange=self.exchange, key=self.routing_key))
super(FanoutAdapterConsumer, self).__init__(connection=connection,
topic=topic, proxy=proxy)
class TopicPublisher(Publisher):
"""Publishes messages on a specific topic"""
exchange_type = "topic"
@@ -218,6 +236,19 @@ class TopicPublisher(Publisher):
super(TopicPublisher, self).__init__(connection=connection)
class FanoutPublisher(Publisher):
"""Publishes messages to a fanout exchange."""
exchange_type = "fanout"
def __init__(self, topic, connection=None):
self.exchange = "%s_fanout" % topic
self.queue = "%s_fanout" % topic
self.durable = False
LOG.info(_("Creating '%(exchange)s' fanout exchange"),
dict(exchange=self.exchange))
super(FanoutPublisher, self).__init__(connection=connection)
class DirectConsumer(Consumer):
"""Consumes messages directly on a channel specified by msg_id"""
exchange_type = "direct"
@@ -360,6 +391,16 @@ def cast(context, topic, msg):
publisher.close()
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response"""
LOG.debug(_("Making asynchronous fanout cast..."))
_pack_context(msg, context)
conn = Connection.instance()
publisher = FanoutPublisher(topic, connection=conn)
publisher.send(msg)
publisher.close()
def generic_response(message_data, message):
"""Logs a result and exits"""
LOG.debug(_('response %s'), message_data)

View File

@@ -25,25 +25,40 @@ FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.scheduler.api')
class API(object):
"""API for interacting with the scheduler."""
def _call_scheduler(method, context, params=None):
"""Generic handler for RPC calls to the scheduler.
def _call_scheduler(self, method, context, params=None):
"""Generic handler for RPC calls to the scheduler.
:param params: Optional dictionary of arguments to be passed to the
scheduler worker
:param params: Optional dictionary of arguments to be passed to the
scheduler worker
:retval: Result returned by scheduler worker
"""
if not params:
params = {}
queue = FLAGS.scheduler_topic
kwargs = {'method': method, 'args': params}
return rpc.call(context, queue, kwargs)
:retval: Result returned by scheduler worker
"""
if not params:
params = {}
queue = FLAGS.scheduler_topic
kwargs = {'method': method, 'args': params}
return rpc.call(context, queue, kwargs)
def get_zone_list(self, context):
items = self._call_scheduler('get_zone_list', context)
for item in items:
item['api_url'] = item['api_url'].replace('\\/', '/')
return items
def get_zone_list(context):
"""Return a list of zones assoicated with this zone."""
items = _call_scheduler('get_zone_list', context)
for item in items:
item['api_url'] = item['api_url'].replace('\\/', '/')
return items
def get_zone_capabilities(context, service=None):
"""Returns a dict of key, value capabilities for this zone,
or for a particular class of services running in this zone."""
return _call_scheduler('get_zone_capabilities', context=context,
params=dict(service=service))
def update_service_capabilities(context, service_name, host, capabilities):
"""Send an update to all the scheduler services informing them
of the capabilities of this service."""
kwargs = dict(method='update_service_capabilities',
args=dict(service_name=service_name, host=host,
capabilities=capabilities))
return rpc.fanout_cast(context, 'scheduler', kwargs)

View File

@@ -49,6 +49,13 @@ class WillNotSchedule(exception.Error):
class Scheduler(object):
"""The base class that all Scheduler clases should inherit from."""
def __init__(self):
self.zone_manager = None
def set_zone_manager(self, zone_manager):
"""Called by the Scheduler Service to supply a ZoneManager."""
self.zone_manager = zone_manager
@staticmethod
def service_is_up(service):
"""Check whether a service is up based on last heartbeat."""

View File

@@ -41,10 +41,11 @@ flags.DEFINE_string('scheduler_driver',
class SchedulerManager(manager.Manager):
"""Chooses a host to run instances on."""
def __init__(self, scheduler_driver=None, *args, **kwargs):
self.zone_manager = zone_manager.ZoneManager()
if not scheduler_driver:
scheduler_driver = FLAGS.scheduler_driver
self.driver = utils.import_object(scheduler_driver)
self.zone_manager = zone_manager.ZoneManager()
self.driver.set_zone_manager(self.zone_manager)
super(SchedulerManager, self).__init__(*args, **kwargs)
def __getattr__(self, key):
@@ -59,6 +60,17 @@ class SchedulerManager(manager.Manager):
"""Get a list of zones from the ZoneManager."""
return self.zone_manager.get_zone_list()
def get_zone_capabilities(self, context=None, service=None):
"""Get the normalized set of capabilites for this zone,
or for a particular service."""
return self.zone_manager.get_zone_capabilities(context, service)
def update_service_capabilities(self, context=None, service_name=None,
host=None, capabilities={}):
"""Process a capability update from a service node."""
self.zone_manager.update_service_capabilities(service_name,
host, capabilities)
def _schedule(self, method, context, topic, *args, **kwargs):
"""Tries to call schedule_* method on the driver to retrieve host.

View File

@@ -105,12 +105,36 @@ class ZoneManager(object):
def __init__(self):
self.last_zone_db_check = datetime.min
self.zone_states = {}
self.service_states = {} # { <service> : { <host> : { cap k : v }}}
self.green_pool = greenpool.GreenPool()
def get_zone_list(self):
"""Return the list of zones we know about."""
return [zone.to_dict() for zone in self.zone_states.values()]
def get_zone_capabilities(self, context, service=None):
"""Roll up all the individual host info to generic 'service'
capabilities. Each capability is aggregated into
<cap>_min and <cap>_max values."""
service_dict = self.service_states
if service:
service_dict = {service: self.service_states.get(service, {})}
# TODO(sandy) - be smarter about fabricating this structure.
# But it's likely to change once we understand what the Best-Match
# code will need better.
combined = {} # { <service>_<cap> : (min, max), ... }
for service_name, host_dict in service_dict.iteritems():
for host, caps_dict in host_dict.iteritems():
for cap, value in caps_dict.iteritems():
key = "%s_%s" % (service_name, cap)
min_value, max_value = combined.get(key, (value, value))
min_value = min(min_value, value)
max_value = max(max_value, value)
combined[key] = (min_value, max_value)
return combined
def _refresh_from_db(self, context):
"""Make our zone state map match the db."""
# Add/update existing zones ...
@@ -141,3 +165,11 @@ class ZoneManager(object):
self.last_zone_db_check = datetime.now()
self._refresh_from_db(context)
self._poll_zones(context)
def update_service_capabilities(self, service_name, host, capabilities):
"""Update the per-service capabilities based on this notification."""
logging.debug(_("Received %(service_name)s service update from "
"%(host)s: %(capabilities)s") % locals())
service_caps = self.service_states.get(service_name, {})
service_caps[host] = capabilities
self.service_states[service_name] = service_caps

View File

@@ -97,18 +97,24 @@ class Service(object):
conn1 = rpc.Connection.instance(new=True)
conn2 = rpc.Connection.instance(new=True)
conn3 = rpc.Connection.instance(new=True)
if self.report_interval:
consumer_all = rpc.AdapterConsumer(
consumer_all = rpc.TopicAdapterConsumer(
connection=conn1,
topic=self.topic,
proxy=self)
consumer_node = rpc.AdapterConsumer(
consumer_node = rpc.TopicAdapterConsumer(
connection=conn2,
topic='%s.%s' % (self.topic, self.host),
proxy=self)
fanout = rpc.FanoutAdapterConsumer(
connection=conn3,
topic=self.topic,
proxy=self)
self.timers.append(consumer_all.attach_to_eventlet())
self.timers.append(consumer_node.attach_to_eventlet())
self.timers.append(fanout.attach_to_eventlet())
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)

View File

@@ -44,6 +44,14 @@ flags.DECLARE('stub_network', 'nova.compute.manager')
flags.DECLARE('live_migration_retry_count', 'nova.compute.manager')
class FakeTime(object):
def __init__(self):
self.counter = 0
def sleep(self, t):
self.counter += t
class ComputeTestCase(test.TestCase):
"""Test case for compute"""
def setUp(self):
@@ -82,6 +90,21 @@ class ComputeTestCase(test.TestCase):
inst.update(params)
return db.instance_create(self.context, inst)['id']
def _create_instance_type(self, params={}):
"""Create a test instance"""
context = self.context.elevated()
inst = {}
inst['name'] = 'm1.small'
inst['memory_mb'] = '1024'
inst['vcpus'] = '1'
inst['local_gb'] = '20'
inst['flavorid'] = '1'
inst['swap'] = '2048'
inst['rxtx_quota'] = 100
inst['rxtx_cap'] = 200
inst.update(params)
return db.instance_type_create(context, inst)['id']
def _create_group(self):
values = {'name': 'testgroup',
'description': 'testgroup',
@@ -299,15 +322,53 @@ class ComputeTestCase(test.TestCase):
"""Ensure instance can be migrated/resized"""
instance_id = self._create_instance()
context = self.context.elevated()
self.compute.run_instance(self.context, instance_id)
db.instance_update(self.context, instance_id, {'host': 'foo'})
self.compute.prep_resize(context, instance_id)
self.compute.prep_resize(context, instance_id, 1)
migration_ref = db.migration_get_by_instance_and_status(context,
instance_id, 'pre-migrating')
self.compute.resize_instance(context, instance_id,
migration_ref['id'])
self.compute.terminate_instance(context, instance_id)
def test_resize_invalid_flavor_fails(self):
"""Ensure invalid flavors raise"""
instance_id = self._create_instance()
context = self.context.elevated()
self.compute.run_instance(self.context, instance_id)
self.assertRaises(exception.NotFound, self.compute_api.resize,
context, instance_id, 200)
self.compute.terminate_instance(context, instance_id)
def test_resize_down_fails(self):
"""Ensure resizing down raises and fails"""
context = self.context.elevated()
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
db.instance_update(self.context, instance_id,
{'instance_type': 'm1.xlarge'})
self.assertRaises(exception.ApiError, self.compute_api.resize,
context, instance_id, 1)
self.compute.terminate_instance(context, instance_id)
def test_resize_same_size_fails(self):
"""Ensure invalid flavors raise"""
context = self.context.elevated()
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
self.assertRaises(exception.ApiError, self.compute_api.resize,
context, instance_id, 1)
self.compute.terminate_instance(context, instance_id)
def test_get_by_flavor_id(self):
type = instance_types.get_by_flavor_id(1)
self.assertEqual(type, 'm1.tiny')
@@ -318,10 +379,8 @@ class ComputeTestCase(test.TestCase):
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
self.assertRaises(exception.Error, self.compute.prep_resize,
self.context, instance_id)
self.context, instance_id, 1)
self.compute.terminate_instance(self.context, instance_id)
type = instance_types.get_by_flavor_id("1")
self.assertEqual(type, 'm1.tiny')
def _setup_other_managers(self):
self.volume_manager = utils.import_object(FLAGS.volume_manager)
@@ -342,7 +401,7 @@ class ComputeTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.NotFound,
self.compute.pre_live_migration,
c, instance_ref['id'])
c, instance_ref['id'], time=FakeTime())
def test_pre_live_migration_instance_has_volume(self):
"""Confirm setup_compute_volume is called when volume is mounted."""
@@ -395,7 +454,7 @@ class ComputeTestCase(test.TestCase):
self.compute.driver = drivermock
self.mox.ReplayAll()
ret = self.compute.pre_live_migration(c, i_ref['id'])
ret = self.compute.pre_live_migration(c, i_ref['id'], time=FakeTime())
self.assertEqual(ret, None)
def test_pre_live_migration_setup_compute_node_fail(self):
@@ -428,7 +487,7 @@ class ComputeTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.ProcessExecutionError,
self.compute.pre_live_migration,
c, i_ref['id'])
c, i_ref['id'], time=FakeTime())
def test_live_migration_works_correctly_with_volume(self):
"""Confirm check_for_export to confirm volume health check."""
@@ -575,3 +634,24 @@ class ComputeTestCase(test.TestCase):
db.instance_destroy(c, instance_id)
db.volume_destroy(c, v_ref['id'])
db.floating_ip_destroy(c, flo_addr)
def test_run_kill_vm(self):
"""Detect when a vm is terminated behind the scenes"""
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
LOG.info(_("Running instances: %s"), instances)
self.assertEqual(len(instances), 1)
instance_name = instances[0].name
self.compute.driver.test_remove_vm(instance_name)
# Force the compute manager to do its periodic poll
error_list = self.compute.periodic_tasks(context.get_admin_context())
self.assertFalse(error_list)
instances = db.instance_get_all(context.get_admin_context())
LOG.info(_("After force-killing instances: %s"), instances)
self.assertEqual(len(instances), 0)

View File

@@ -21,9 +21,10 @@ import sys
import unittest
import nova
from nova import test
class LocalizationTestCase(unittest.TestCase):
class LocalizationTestCase(test.TestCase):
def test_multiple_positional_format_placeholders(self):
pat = re.compile("\W_\(")
single_pat = re.compile("\W%\W")

View File

@@ -18,8 +18,12 @@ import errno
import os
import select
from eventlet import greenpool
from eventlet import greenthread
from nova import test
from nova.utils import parse_mailmap, str_dict_replace, synchronized
from nova import utils
from nova.utils import parse_mailmap, str_dict_replace
class ProjectTestCase(test.TestCase):
@@ -63,7 +67,7 @@ class ProjectTestCase(test.TestCase):
class LockTestCase(test.TestCase):
def test_synchronized_wrapped_function_metadata(self):
@synchronized('whatever')
@utils.synchronized('whatever')
def foo():
"""Bar"""
pass
@@ -72,11 +76,42 @@ class LockTestCase(test.TestCase):
self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
"got mangled")
def test_synchronized(self):
def test_synchronized_internally(self):
"""We can lock across multiple green threads"""
saved_sem_num = len(utils._semaphores)
seen_threads = list()
@utils.synchronized('testlock2', external=False)
def f(id):
for x in range(10):
seen_threads.append(id)
greenthread.sleep(0)
threads = []
pool = greenpool.GreenPool(10)
for i in range(10):
threads.append(pool.spawn(f, i))
for thread in threads:
thread.wait()
self.assertEquals(len(seen_threads), 100)
# Looking at the seen threads, split it into chunks of 10, and verify
# that the last 9 match the first in each chunk.
for i in range(10):
for j in range(9):
self.assertEquals(seen_threads[i * 10],
seen_threads[i * 10 + 1 + j])
self.assertEqual(saved_sem_num, len(utils._semaphores),
"Semaphore leak detected")
def test_synchronized_externally(self):
"""We can lock across multiple processes"""
rpipe1, wpipe1 = os.pipe()
rpipe2, wpipe2 = os.pipe()
@synchronized('testlock')
@utils.synchronized('testlock1', external=True)
def f(rpipe, wpipe):
try:
os.write(wpipe, "foo")

View File

@@ -36,7 +36,7 @@ class RpcTestCase(test.TestCase):
super(RpcTestCase, self).setUp()
self.conn = rpc.Connection.instance(True)
self.receiver = TestReceiver()
self.consumer = rpc.AdapterConsumer(connection=self.conn,
self.consumer = rpc.TopicAdapterConsumer(connection=self.conn,
topic='test',
proxy=self.receiver)
self.consumer.attach_to_eventlet()
@@ -97,7 +97,7 @@ class RpcTestCase(test.TestCase):
nested = Nested()
conn = rpc.Connection.instance(True)
consumer = rpc.AdapterConsumer(connection=conn,
consumer = rpc.TopicAdapterConsumer(connection=conn,
topic='nested',
proxy=nested)
consumer.attach_to_eventlet()

View File

@@ -77,13 +77,11 @@ class CacheConcurrencyTestCase(test.TestCase):
eventlet.sleep(0)
try:
self.assertFalse(done2.ready())
self.assertTrue('fname' in conn._image_sems)
finally:
wait1.send()
done1.wait()
eventlet.sleep(0)
self.assertTrue(done2.ready())
self.assertFalse('fname' in conn._image_sems)
def test_different_fname_concurrency(self):
"""Ensures that two different fname caches are concurrent"""
@@ -429,6 +427,15 @@ class LibvirtConnTestCase(test.TestCase):
def fake_raise(self):
raise libvirt.libvirtError('ERR')
class FakeTime(object):
def __init__(self):
self.counter = 0
def sleep(self, t):
self.counter += t
fake_timer = FakeTime()
self.create_fake_libvirt_mock(nwfilterLookupByName=fake_raise)
instance_ref = db.instance_create(self.context, self.test_instance)
@@ -438,11 +445,15 @@ class LibvirtConnTestCase(test.TestCase):
conn = libvirt_conn.LibvirtConnection(False)
conn.firewall_driver.setattr('setup_basic_filtering', fake_none)
conn.firewall_driver.setattr('prepare_instance_filter', fake_none)
conn.ensure_filtering_rules_for_instance(instance_ref)
conn.ensure_filtering_rules_for_instance(instance_ref,
time=fake_timer)
except exception.Error, e:
c1 = (0 <= e.message.find('Timeout migrating for'))
self.assertTrue(c1)
self.assertEqual(29, fake_timer.counter, "Didn't wait the expected "
"amount of time")
db.instance_destroy(self.context, instance_ref['id'])
def test_live_migration_raises_exception(self):

View File

@@ -76,6 +76,40 @@ class ZoneManagerTestCase(test.TestCase):
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[1].username, 'user1')
def test_service_capabilities(self):
zm = zone_manager.ZoneManager()
caps = zm.get_zone_capabilities(self, None)
self.assertEquals(caps, {})
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
caps = zm.get_zone_capabilities(self, None)
self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
zm.update_service_capabilities("svc1", "host1", dict(a=2, b=3))
caps = zm.get_zone_capabilities(self, None)
self.assertEquals(caps, dict(svc1_a=(2, 2), svc1_b=(3, 3)))
zm.update_service_capabilities("svc1", "host2", dict(a=20, b=30))
caps = zm.get_zone_capabilities(self, None)
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30)))
zm.update_service_capabilities("svc10", "host1", dict(a=99, b=99))
caps = zm.get_zone_capabilities(self, None)
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
svc10_a=(99, 99), svc10_b=(99, 99)))
zm.update_service_capabilities("svc1", "host3", dict(c=5))
caps = zm.get_zone_capabilities(self, None)
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
svc1_c=(5, 5), svc10_a=(99, 99),
svc10_b=(99, 99)))
caps = zm.get_zone_capabilities(self, 'svc1')
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
svc1_c=(5, 5)))
caps = zm.get_zone_capabilities(self, 'svc10')
self.assertEquals(caps, dict(svc10_a=(99, 99), svc10_b=(99, 99)))
def test_refresh_from_db_replace_existing(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()

View File

@@ -228,6 +228,9 @@ class FakeSessionForMigrationTests(fake.SessionBase):
def VDI_get_by_uuid(*args):
return 'hurr'
def VDI_resize_online(*args):
pass
def VM_start(self, _1, ref, _2, _3):
vm = fake.get_record('VM', ref)
if vm['power_state'] != 'Halted':
@@ -240,7 +243,7 @@ class FakeSessionForMigrationTests(fake.SessionBase):
def stub_out_migration_methods(stubs):
def fake_get_snapshot(self, instance):
return 'foo', 'bar'
return 'vm_ref', dict(image='foo', snap='bar')
@classmethod
def fake_get_vdi(cls, session, vm_ref):
@@ -249,7 +252,7 @@ def stub_out_migration_methods(stubs):
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
return vdi_ref, {'uuid': vdi_rec['uuid'], }
def fake_shutdown(self, inst, vm, method='clean'):
def fake_shutdown(self, inst, vm, hard=True):
pass
@classmethod

View File

@@ -41,6 +41,7 @@ from xml.sax import saxutils
from eventlet import event
from eventlet import greenthread
from eventlet import semaphore
from eventlet.green import subprocess
None
from nova import exception
@@ -334,6 +335,14 @@ def utcnow():
utcnow.override_time = None
def is_older_than(before, seconds):
"""Return True if before is older than 'seconds'"""
if utcnow() - before > datetime.timedelta(seconds=seconds):
return True
else:
return False
def utcnow_ts():
"""Timestamp version of our utcnow function."""
return time.mktime(utcnow().timetuple())
@@ -531,17 +540,76 @@ def loads(s):
return json.loads(s)
def synchronized(name):
_semaphores = {}
class _NoopContextManager(object):
def __enter__(self):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def synchronized(name, external=False):
"""Synchronization decorator
Decorating a method like so:
@synchronized('mylock')
def foo(self, *args):
...
ensures that only one thread will execute the bar method at a time.
Different methods can share the same lock:
@synchronized('mylock')
def foo(self, *args):
...
@synchronized('mylock')
def bar(self, *args):
...
This way only one of either foo or bar can be executing at a time.
The external keyword argument denotes whether this lock should work across
multiple processes. This means that if two different workers both run a
a method decorated with @synchronized('mylock', external=True), only one
of them will execute at a time.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
LOG.debug(_("Attempting to grab %(lock)s for method "
"%(method)s..." % {"lock": name,
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn\
# amically-allocating-and-destroying-mutexes
if name not in _semaphores:
_semaphores[name] = semaphore.Semaphore()
sem = _semaphores[name]
LOG.debug(_('Attempting to grab semaphore "%(lock)s" for method '
'"%(method)s"...' % {"lock": name,
"method": f.__name__}))
lock = lockfile.FileLock(os.path.join(FLAGS.lock_path,
'nova-%s.lock' % name))
with lock:
return f(*args, **kwargs)
with sem:
if external:
LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
'method "%(method)s"...' %
{"lock": name, "method": f.__name__}))
lock_file_path = os.path.join(FLAGS.lock_path,
'nova-%s.lock' % name)
lock = lockfile.FileLock(lock_file_path)
else:
lock = _NoopContextManager()
with lock:
retval = f(*args, **kwargs)
# If no-one else is waiting for it, delete it.
# See note about possible raciness above.
if not sem.balance < 1:
del _semaphores[name]
return retval
return inner
return wrap
@@ -593,3 +661,12 @@ def get_from_path(items, path):
return results
else:
return get_from_path(results, remainder)
def check_isinstance(obj, cls):
"""Checks that obj is of type cls, and lets PyLint infer types"""
if isinstance(obj, cls):
return obj
raise Exception(_("Expected object of type: %s") % (str(cls)))
# TODO(justinsb): Can we make this better??
return cls() # Ugly PyLint hack