Add support for EBS volumes to the live migration feature.
Currently, only AoE is supported.
This commit is contained in:
1
Authors
1
Authors
@@ -26,6 +26,7 @@ Kei Masumoto <masumotok@nttdata.co.jp>
|
||||
Matt Dietz <matt.dietz@rackspace.com>
|
||||
Michael Gundlach <michael.gundlach@rackspace.com>
|
||||
Monty Taylor <mordred@inaugust.com>
|
||||
Muneyuki Noguchi <noguchimn@nttdata.co.jp>
|
||||
Paul Voccio <paul@openstack.org>
|
||||
Rick Clark <rick@openstack.org>
|
||||
Rick Harris <rconradharris@gmail.com>
|
||||
|
@@ -83,8 +83,6 @@ from nova import rpc
|
||||
from nova.cloudpipe import pipelib
|
||||
from nova.api.ec2 import cloud
|
||||
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DECLARE('fixed_range', 'nova.network.manager')
|
||||
flags.DECLARE('num_networks', 'nova.network.manager')
|
||||
@@ -462,6 +460,10 @@ class InstanceCommands(object):
|
||||
def live_migration(self, ec2_id, dest):
|
||||
"""live_migration"""
|
||||
|
||||
if FLAGS.volume_driver != 'nova.volume.driver.AOEDriver':
|
||||
raise exception.Error('Only AOEDriver is supported for now. '
|
||||
'Sorry.')
|
||||
|
||||
logging.basicConfig()
|
||||
ctxt = context.get_admin_context()
|
||||
|
||||
@@ -491,7 +493,6 @@ class InstanceCommands(object):
|
||||
class HostCommands(object):
|
||||
"""Class for mangaging host(physical nodes)."""
|
||||
|
||||
|
||||
def list(self):
|
||||
"""describe host list."""
|
||||
|
||||
@@ -502,7 +503,6 @@ class HostCommands(object):
|
||||
for host_ref in host_refs:
|
||||
print host_ref['name']
|
||||
|
||||
|
||||
def show(self, host):
|
||||
"""describe cpu/memory/hdd info for host."""
|
||||
|
||||
@@ -546,6 +546,7 @@ CATEGORIES = [
|
||||
('instance', InstanceCommands),
|
||||
('host', HostCommands)]
|
||||
|
||||
|
||||
def lazy_match(name, key_value_tuples):
|
||||
"""Finds all objects that have a key that case insensitively contains
|
||||
[name] key_value_tuples is a list of tuples of the form (key, value)
|
||||
|
@@ -170,7 +170,8 @@ class ComputeTestFunctions(unittest.TestCase):
|
||||
# mocks for pre_live_migration
|
||||
self.ctxt = context.get_admin_context()
|
||||
db.instance_get = Mock(return_value=self.instance1)
|
||||
db.volume_get_by_ec2_id = Mock(return_value=[self.vol1, self.vol2])
|
||||
db.volume_get_all_by_instance \
|
||||
= Mock(return_value=[self.vol1, self.vol2])
|
||||
db.volume_get_shelf_and_blade = Mock(return_value=(3, 4))
|
||||
db.instance_get_fixed_address = Mock(return_value=self.fixed_ip1)
|
||||
db.security_group_get_by_instance \
|
||||
@@ -199,7 +200,7 @@ class ComputeTestFunctions(unittest.TestCase):
|
||||
def test02(self):
|
||||
"""02: NotAuthrized occurs on finding volume on DB. """
|
||||
|
||||
db.volume_get_by_ec2_id \
|
||||
db.volume_get_all_by_instance \
|
||||
= Mock(side_effect=exception.NotAuthorized('ERR'))
|
||||
|
||||
self.assertRaises(exception.NotAuthorized,
|
||||
@@ -211,7 +212,7 @@ class ComputeTestFunctions(unittest.TestCase):
|
||||
def test03(self):
|
||||
"""03: Unexpected exception occurs on finding volume on DB. """
|
||||
|
||||
db.volume_get_by_ec2_id = Mock(side_effect=TypeError('ERR'))
|
||||
db.volume_get_all_by_instance = Mock(side_effect=TypeError('ERR'))
|
||||
|
||||
self.assertRaises(TypeError,
|
||||
self.manager.pre_live_migration,
|
||||
@@ -222,7 +223,6 @@ class ComputeTestFunctions(unittest.TestCase):
|
||||
def test04(self):
|
||||
"""04: no volume and fixed ip found on DB, """
|
||||
|
||||
db.volume_get_by_ec2_id = Mock(side_effect=exception.NotFound('ERR'))
|
||||
db.instance_get_fixed_address = Mock(return_value=None)
|
||||
|
||||
self.assertRaises(rpc.RemoteError,
|
||||
@@ -230,10 +230,6 @@ class ComputeTestFunctions(unittest.TestCase):
|
||||
self.ctxt,
|
||||
'dummy_ec2_id',
|
||||
'host2')
|
||||
|
||||
c1 = (0 <= sys.stderr.buffer.find('has no volume'))
|
||||
|
||||
self.assertEqual(c1, True)
|
||||
|
||||
def test05(self):
|
||||
"""05: volume found and no fixed_ip found on DB. """
|
||||
|
@@ -97,6 +97,9 @@ class NovaManageTestFunctions(unittest.TestCase):
|
||||
# prepare test data
|
||||
self.setTestData()
|
||||
|
||||
# only AoE is supported for now
|
||||
FLAGS.volume_driver = 'nova.volume.driver.AOEDriver'
|
||||
|
||||
|
||||
def setTestData(self):
|
||||
import bin.novamanagetest
|
||||
|
@@ -1,222 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
A service is a very thin wrapper around a Manager object. It exposes the
|
||||
manager's public methods to other components of the system via rpc. It will
|
||||
report state periodically to the database and is responsible for initiating
|
||||
any periodic tasts that need to be executed on a given host.
|
||||
|
||||
This module contains Service, a generic baseclass for all workers.
|
||||
"""
|
||||
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import task
|
||||
from twisted.application import service
|
||||
|
||||
from nova import context
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova import flags
|
||||
from nova import rpc
|
||||
from nova import utils
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DEFINE_integer('report_interval', 10,
|
||||
'seconds between nodes reporting state to datastore',
|
||||
lower_bound=1)
|
||||
|
||||
flags.DEFINE_integer('periodic_interval', 60,
|
||||
'seconds between running periodic tasks',
|
||||
lower_bound=1)
|
||||
|
||||
|
||||
class Service(object, service.Service):
|
||||
"""Base class for workers that run on hosts."""
|
||||
|
||||
def __init__(self, host, binary, topic, manager, report_interval=None,
|
||||
periodic_interval=None, *args, **kwargs):
|
||||
self.host = host
|
||||
self.binary = binary
|
||||
self.topic = topic
|
||||
self.manager_class_name = manager
|
||||
self.report_interval = report_interval
|
||||
self.periodic_interval = periodic_interval
|
||||
super(Service, self).__init__(*args, **kwargs)
|
||||
self.saved_args, self.saved_kwargs = args, kwargs
|
||||
|
||||
def startService(self): # pylint: disable-msg C0103
|
||||
manager_class = utils.import_class(self.manager_class_name)
|
||||
self.manager = manager_class(host=self.host, *self.saved_args,
|
||||
**self.saved_kwargs)
|
||||
self.manager.init_host()
|
||||
self.model_disconnected = False
|
||||
ctxt = context.get_admin_context()
|
||||
|
||||
try:
|
||||
host_ref = db.host_get_by_name(ctxt, self.host)
|
||||
except exception.NotFound:
|
||||
host_ref = db.host_create(ctxt, {'name': self.host})
|
||||
host_ref = self._update_host_ref(ctxt, host_ref)
|
||||
|
||||
try:
|
||||
service_ref = db.service_get_by_args(ctxt,
|
||||
self.host,
|
||||
self.binary)
|
||||
self.service_id = service_ref['id']
|
||||
except exception.NotFound:
|
||||
self._create_service_ref(ctxt)
|
||||
|
||||
conn = rpc.Connection.instance()
|
||||
if self.report_interval:
|
||||
consumer_all = rpc.AdapterConsumer(
|
||||
connection=conn,
|
||||
topic=self.topic,
|
||||
proxy=self)
|
||||
consumer_node = rpc.AdapterConsumer(
|
||||
connection=conn,
|
||||
topic='%s.%s' % (self.topic, self.host),
|
||||
proxy=self)
|
||||
|
||||
consumer_all.attach_to_twisted()
|
||||
consumer_node.attach_to_twisted()
|
||||
|
||||
pulse = task.LoopingCall(self.report_state)
|
||||
pulse.start(interval=self.report_interval, now=False)
|
||||
|
||||
if self.periodic_interval:
|
||||
pulse = task.LoopingCall(self.periodic_tasks)
|
||||
pulse.start(interval=self.periodic_interval, now=False)
|
||||
|
||||
def _create_service_ref(self, context):
|
||||
service_ref = db.service_create(context,
|
||||
{'host': self.host,
|
||||
'binary': self.binary,
|
||||
'topic': self.topic,
|
||||
'report_count': 0})
|
||||
self.service_id = service_ref['id']
|
||||
|
||||
def _update_host_ref(self, context, host_ref):
|
||||
|
||||
if 0 <= self.manager_class_name.find('ComputeManager'):
|
||||
vcpu = self.manager.driver.get_vcpu_number()
|
||||
memory_mb = self.manager.get_memory_mb()
|
||||
local_gb = self.manager.get_local_gb()
|
||||
hypervisor = self.manager.driver.get_hypervisor_type()
|
||||
version = self.manager.driver.get_hypervisor_version()
|
||||
cpu_xml = self.manager.driver.get_cpu_xml()
|
||||
|
||||
db.host_update(context,
|
||||
host_ref['id'],
|
||||
{'vcpus': vcpu,
|
||||
'memory_mb': memory_mb,
|
||||
'local_gb': local_gb,
|
||||
'hypervisor_type': hypervisor,
|
||||
'hypervisor_version': version,
|
||||
'cpu_info':cpu_xml })
|
||||
return host_ref
|
||||
|
||||
def __getattr__(self, key):
|
||||
manager = self.__dict__.get('manager', None)
|
||||
return getattr(manager, key)
|
||||
|
||||
@classmethod
|
||||
def create(cls,
|
||||
host=None,
|
||||
binary=None,
|
||||
topic=None,
|
||||
manager=None,
|
||||
report_interval=None,
|
||||
periodic_interval=None):
|
||||
"""Instantiates class and passes back application object.
|
||||
|
||||
Args:
|
||||
host, defaults to FLAGS.host
|
||||
binary, defaults to basename of executable
|
||||
topic, defaults to bin_name - "nova-" part
|
||||
manager, defaults to FLAGS.<topic>_manager
|
||||
report_interval, defaults to FLAGS.report_interval
|
||||
periodic_interval, defaults to FLAGS.periodic_interval
|
||||
"""
|
||||
if not host:
|
||||
host = FLAGS.host
|
||||
if not binary:
|
||||
binary = os.path.basename(inspect.stack()[-1][1])
|
||||
if not topic:
|
||||
topic = binary.rpartition("nova-")[2]
|
||||
if not manager:
|
||||
manager = FLAGS.get('%s_manager' % topic, None)
|
||||
if not report_interval:
|
||||
report_interval = FLAGS.report_interval
|
||||
if not periodic_interval:
|
||||
periodic_interval = FLAGS.periodic_interval
|
||||
logging.warn("Starting %s node", topic)
|
||||
service_obj = cls(host, binary, topic, manager,
|
||||
report_interval, periodic_interval)
|
||||
|
||||
# This is the parent service that twistd will be looking for when it
|
||||
# parses this file, return it so that we can get it into globals.
|
||||
application = service.Application(binary)
|
||||
service_obj.setServiceParent(application)
|
||||
return application
|
||||
|
||||
def kill(self):
|
||||
"""Destroy the service object in the datastore"""
|
||||
try:
|
||||
db.service_destroy(context.get_admin_context(), self.service_id)
|
||||
except exception.NotFound:
|
||||
logging.warn("Service killed that has no database entry")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def periodic_tasks(self):
|
||||
"""Tasks to be run at a periodic interval"""
|
||||
yield self.manager.periodic_tasks(context.get_admin_context())
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def report_state(self):
|
||||
"""Update the state of this service in the datastore."""
|
||||
ctxt = context.get_admin_context()
|
||||
try:
|
||||
try:
|
||||
service_ref = db.service_get(ctxt, self.service_id)
|
||||
except exception.NotFound:
|
||||
logging.debug("The service database object disappeared, "
|
||||
"Recreating it.")
|
||||
self._create_service_ref(ctxt)
|
||||
service_ref = db.service_get(ctxt, self.service_id)
|
||||
|
||||
db.service_update(ctxt,
|
||||
self.service_id,
|
||||
{'report_count': service_ref['report_count'] + 1})
|
||||
|
||||
# TODO(termie): make this pattern be more elegant.
|
||||
if getattr(self, "model_disconnected", False):
|
||||
self.model_disconnected = False
|
||||
logging.error("Recovered model server connection!")
|
||||
|
||||
# TODO(vish): this should probably only catch connection errors
|
||||
except Exception: # pylint: disable-msg=W0702
|
||||
if not getattr(self, "model_disconnected", False):
|
||||
self.model_disconnected = True
|
||||
logging.exception("model server went away")
|
||||
yield
|
Reference in New Issue
Block a user