Cells: Add the main code.

This introduces *EXPERIMENTAL* compute cells functionality as a way to
scale nova in a more distributed fashion without having to use complicated
technologies like DB and message queue clustering.

Cells are configured as a tree and the top level cell should contain
nova-api without any nova-computes while child cells contain everything
except nova-api.  One can think of a cell as a normal nova deployment in
that each cell has its own DB server and message queue broker.

The top level cell keeps a subset of data about ALL instances in all
cells in its DB.  Child cells send messages to the top level cell when
instances change state.  Data in 1 child cell is not shared with another
child cell.

A new service, nova-cells, is introduced that handles communication
between cells and picking of a cell for new instances.  This service is
required for every cell.  Communication between cells is pluggable with
the only option currently implemented being communnication via RPC.

Cells scheduling is separate from host scheduling.  nova-cells first picks
a cell (currently randomly -- future patches add filtering/weighing
functionality and decisions can be based on broadcasts of
capacity/capabilities).  Once a cell has been selected and the new build
request has reached its nova-cells service, it'll be sent over to the host
scheduler in that cell and the build proceeds as it does without cells.

New config options are introduced for enabling and configuring the cells
code.  Cells is disabled by default.  All of the config options below go
under a '[cells]' section in nova.conf.  These are the options that one
may want to tweak:

enable -- Turn on cells code (default is False)
name -- Name of the current cell.
capabilities -- List of arbitrary key=value pairs defining capabilities
                of the current cell.  These are sent to parent cells,
                but aren't used in scheduling until later filter/weight
                support is added.
call_timeout -- How long to wait for replies from a calls between cells

When using cells, the compute API class must be changed in the API cell,
so that requests can be proxied via nova-cells down to the correct cell
properly.  Thus, config requirements for API cell:

--
[DEFAULT]
compute_api_class=nova.compute.cells_api.ComputeCellsAPI.
[cells]
enable=True
name=api-cell
--

Config requirements for child cell:

--
[cells]
enable=True
name=child-cell1
--

Another requirement is populating the 'cells' DB table in each cell.
Each cell needs to know about its parent and children and how to
communicate with them (message broker location, credentials, etc).

Implements blueprint nova-compute-cells

DocImpact

Change-Id: I1b52788ea9d7753365d175abf39bdbc22ba822fe
This commit is contained in:
Chris Behrens 2012-04-13 05:54:48 +00:00
parent 48487f1a4b
commit f9a868e86c
24 changed files with 4696 additions and 13 deletions

53
bin/nova-cells Executable file
View File

@ -0,0 +1,53 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Starter script for Nova Cells Service."""
import eventlet
eventlet.monkey_patch()
import os
import sys
# If ../nova/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
from nova import config
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova import service
from nova import utils
CONF = cfg.CONF
CONF.import_opt('topic', 'nova.cells.opts', group='cells')
CONF.import_opt('manager', 'nova.cells.opts', group='cells')
if __name__ == '__main__':
config.parse_args(sys.argv)
logging.setup('nova')
utils.monkey_patch()
server = service.Service.create(binary='nova-cells',
topic=CONF.cells.topic,
manager=CONF.cells.manager)
service.serve(server)
service.wait()

19
nova/cells/__init__.py Normal file
View File

@ -0,0 +1,19 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Rackspace Hosting
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Cells
"""

41
nova/cells/driver.py Normal file
View File

@ -0,0 +1,41 @@
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Base Cells Communication Driver
"""
class BaseCellsDriver(object):
"""The base class for cells communication.
One instance of this class will be created for every neighbor cell
that we find in the DB and it will be associated with the cell in
its CellState.
One instance is also created by the cells manager for setting up
the consumers.
"""
def start_consumers(self, msg_runner):
"""Start any consumers the driver may need."""
raise NotImplementedError()
def stop_consumers(self):
"""Stop consuming messages."""
raise NotImplementedError()
def send_message_to_cell(self, cell_state, message):
"""Send a message to a cell."""
raise NotImplementedError()

136
nova/cells/manager.py Normal file
View File

@ -0,0 +1,136 @@
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Cells Service Manager
"""
from nova.cells import messaging
from nova.cells import state as cells_state
from nova import context
from nova import manager
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
cell_manager_opts = [
cfg.StrOpt('driver',
default='nova.cells.rpc_driver.CellsRPCDriver',
help='Cells communication driver to use'),
]
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.register_opts(cell_manager_opts, group='cells')
class CellsManager(manager.Manager):
"""The nova-cells manager class. This class defines RPC
methods that the local cell may call. This class is NOT used for
messages coming from other cells. That communication is
driver-specific.
Communication to other cells happens via the messaging module. The
MessageRunner from that module will handle routing the message to
the correct cell via the communications driver. Most methods below
create 'targeted' (where we want to route a message to a specific cell)
or 'broadcast' (where we want a message to go to multiple cells)
messages.
Scheduling requests get passed to the scheduler class.
"""
RPC_API_VERSION = '1.0'
def __init__(self, *args, **kwargs):
# Mostly for tests.
cell_state_manager = kwargs.pop('cell_state_manager', None)
super(CellsManager, self).__init__(*args, **kwargs)
if cell_state_manager is None:
cell_state_manager = cells_state.CellStateManager
self.state_manager = cell_state_manager()
self.msg_runner = messaging.MessageRunner(self.state_manager)
cells_driver_cls = importutils.import_class(
CONF.cells.driver)
self.driver = cells_driver_cls()
def post_start_hook(self):
"""Have the driver start its consumers for inter-cell communication.
Also ask our child cells for their capacities and capabilities so
we get them more quickly than just waiting for the next periodic
update. Receiving the updates from the children will cause us to
update our parents. If we don't have any children, just update
our parents immediately.
"""
# FIXME(comstud): There's currently no hooks when services are
# stopping, so we have no way to stop consumers cleanly.
self.driver.start_consumers(self.msg_runner)
ctxt = context.get_admin_context()
if self.state_manager.get_child_cells():
self.msg_runner.ask_children_for_capabilities(ctxt)
self.msg_runner.ask_children_for_capacities(ctxt)
else:
self._update_our_parents(ctxt)
@manager.periodic_task
def _update_our_parents(self, ctxt):
"""Update our parent cells with our capabilities and capacity
if we're at the bottom of the tree.
"""
self.msg_runner.tell_parents_our_capabilities(ctxt)
self.msg_runner.tell_parents_our_capacities(ctxt)
def schedule_run_instance(self, ctxt, host_sched_kwargs):
"""Pick a cell (possibly ourselves) to build new instance(s)
and forward the request accordingly.
"""
# Target is ourselves first.
our_cell = self.state_manager.get_my_state()
self.msg_runner.schedule_run_instance(ctxt, our_cell,
host_sched_kwargs)
def run_compute_api_method(self, ctxt, cell_name, method_info, call):
"""Call a compute API method in a specific cell."""
response = self.msg_runner.run_compute_api_method(ctxt,
cell_name,
method_info,
call)
if call:
return response.value_or_raise()
def instance_update_at_top(self, ctxt, instance):
"""Update an instance at the top level cell."""
self.msg_runner.instance_update_at_top(ctxt, instance)
def instance_destroy_at_top(self, ctxt, instance):
"""Destroy an instance at the top level cell."""
self.msg_runner.instance_destroy_at_top(ctxt, instance)
def instance_delete_everywhere(self, ctxt, instance, delete_type):
"""This is used by API cell when it didn't know what cell
an instance was in, but the instance was requested to be
deleted or soft_deleted. So, we'll broadcast this everywhere.
"""
self.msg_runner.instance_delete_everywhere(ctxt, instance,
delete_type)
def instance_fault_create_at_top(self, ctxt, instance_fault):
"""Create an instance fault at the top level cell."""
self.msg_runner.instance_fault_create_at_top(ctxt, instance_fault)
def bw_usage_update_at_top(self, ctxt, bw_update_info):
"""Update bandwidth usage at top level cell."""
self.msg_runner.bw_usage_update_at_top(ctxt, bw_update_info)

1047
nova/cells/messaging.py Normal file

File diff suppressed because it is too large Load Diff

44
nova/cells/opts.py Normal file
View File

@ -0,0 +1,44 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Rackspace Hosting
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Global cells config options
"""
from nova.openstack.common import cfg
cells_opts = [
cfg.BoolOpt('enable',
default=False,
help='Enable cell functionality'),
cfg.StrOpt('topic',
default='cells',
help='the topic cells nodes listen on'),
cfg.StrOpt('manager',
default='nova.cells.manager.CellsManager',
help='Manager for cells'),
cfg.StrOpt('name',
default='nova',
help='name of this cell'),
cfg.ListOpt('capabilities',
default=['hypervisor=xenserver;kvm', 'os=linux;windows'],
help='Key/Multi-value list with the capabilities of the cell'),
cfg.IntOpt('call_timeout',
default=60,
help='Seconds to wait for response from a call to a cell.'),
]
cfg.CONF.register_opts(cells_opts, group='cells')

165
nova/cells/rpc_driver.py Normal file
View File

@ -0,0 +1,165 @@
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Cells RPC Communication Driver
"""
from nova.cells import driver
from nova.openstack.common import cfg
from nova.openstack.common import rpc
from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
from nova.openstack.common.rpc import proxy as rpc_proxy
cell_rpc_driver_opts = [
cfg.StrOpt('rpc_driver_queue_base',
default='cells.intercell',
help="Base queue name to use when communicating between "
"cells. Various topics by message type will be "
"appended to this.")]
CONF = cfg.CONF
CONF.register_opts(cell_rpc_driver_opts, group='cells')
CONF.import_opt('call_timeout', 'nova.cells.opts', group='cells')
_CELL_TO_CELL_RPC_API_VERSION = '1.0'
class CellsRPCDriver(driver.BaseCellsDriver):
"""Driver for cell<->cell communication via RPC. This is used to
setup the RPC consumers as well as to send a message to another cell.
One instance of this class will be created for every neighbor cell
that we find in the DB and it will be associated with the cell in
its CellState.
One instance is also created by the cells manager for setting up
the consumers.
"""
BASE_RPC_API_VERSION = _CELL_TO_CELL_RPC_API_VERSION
def __init__(self, *args, **kwargs):
super(CellsRPCDriver, self).__init__(*args, **kwargs)
self.rpc_connections = []
self.intercell_rpcapi = InterCellRPCAPI(
self.BASE_RPC_API_VERSION)
def _start_consumer(self, dispatcher, topic):
"""Start an RPC consumer."""
conn = rpc.create_connection(new=True)
conn.create_consumer(topic, dispatcher, fanout=False)
conn.create_consumer(topic, dispatcher, fanout=True)
self.rpc_connections.append(conn)
conn.consume_in_thread()
return conn
def start_consumers(self, msg_runner):
"""Start RPC consumers.
Start up 2 separate consumers for handling inter-cell
communication via RPC. Both handle the same types of
messages, but requests/replies are separated to solve
potential deadlocks. (If we used the same queue for both,
it's possible to exhaust the RPC thread pool while we wait
for replies.. such that we'd never consume a reply.)
"""
topic_base = CONF.cells.rpc_driver_queue_base
proxy_manager = InterCellRPCDispatcher(msg_runner)
dispatcher = rpc_dispatcher.RpcDispatcher([proxy_manager])
for msg_type in msg_runner.get_message_types():
topic = '%s.%s' % (topic_base, msg_type)
self._start_consumer(dispatcher, topic)
def stop_consumers(self):
"""Stop RPC consumers.
NOTE: Currently there's no hooks when stopping services
to have managers cleanup, so this is not currently called.
"""
for conn in self.rpc_connections:
conn.close()
def send_message_to_cell(self, cell_state, message):
"""Use the IntercellRPCAPI to send a message to a cell."""
self.intercell_rpcapi.send_message_to_cell(cell_state, message)
class InterCellRPCAPI(rpc_proxy.RpcProxy):
"""Client side of the Cell<->Cell RPC API.
The CellsRPCDriver uses this to make calls to another cell.
API version history:
1.0 - Initial version.
"""
def __init__(self, default_version):
super(InterCellRPCAPI, self).__init__(None, default_version)
@staticmethod
def _get_server_params_for_cell(next_hop):
"""Turn the DB information for a cell into the parameters
needed for the RPC call.
"""
param_map = {'username': 'username',
'password': 'password',
'rpc_host': 'hostname',
'rpc_port': 'port',
'rpc_virtual_host': 'virtual_host'}
server_params = {}
for source, target in param_map.items():
if next_hop.db_info[source]:
server_params[target] = next_hop.db_info[source]
return server_params
def send_message_to_cell(self, cell_state, message):
"""Send a message to another cell by JSON-ifying the message and
making an RPC cast to 'process_message'. If the message says to
fanout, do it. The topic that is used will be
'CONF.rpc_driver_queue_base.<message_type>'.
"""
ctxt = message.ctxt
json_message = message.to_json()
rpc_message = self.make_msg('process_message', message=json_message)
topic_base = CONF.cells.rpc_driver_queue_base
topic = '%s.%s' % (topic_base, message.message_type)
server_params = self._get_server_params_for_cell(cell_state)
if message.fanout:
self.fanout_cast_to_server(ctxt, server_params,
rpc_message, topic=topic)
else:
self.cast_to_server(ctxt, server_params,
rpc_message, topic=topic)
class InterCellRPCDispatcher(object):
"""RPC Dispatcher to handle messages received from other cells.
All messages received here have come from a sibling cell. Depending
on the ultimate target and type of message, we may process the message
in this cell, relay the message to another sibling cell, or both. This
logic is defined by the message class in the messaging module.
"""
BASE_RPC_API_VERSION = _CELL_TO_CELL_RPC_API_VERSION
def __init__(self, msg_runner):
"""Init the Intercell RPC Dispatcher."""
self.msg_runner = msg_runner
def process_message(self, _ctxt, message):
"""We received a message from another cell. Use the MessageRunner
to turn this from JSON back into an instance of the correct
Message class. Then process it!
"""
message = self.msg_runner.message_from_json(message)
message.process()

138
nova/cells/rpcapi.py Normal file
View File

@ -0,0 +1,138 @@
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Client side of nova-cells RPC API (for talking to the nova-cells service
within a cell).
This is different than communication between child and parent nova-cells
services. That communication is handled by the cells driver via the
messging module.
"""
from nova.openstack.common import cfg
from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
from nova.openstack.common.rpc import proxy as rpc_proxy
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('enable', 'nova.cells.opts', group='cells')
CONF.import_opt('topic', 'nova.cells.opts', group='cells')
class CellsAPI(rpc_proxy.RpcProxy):
'''Cells client-side RPC API
API version history:
1.0 - Initial version.
'''
BASE_RPC_API_VERSION = '1.0'
def __init__(self):
super(CellsAPI, self).__init__(topic=CONF.cells.topic,
default_version=self.BASE_RPC_API_VERSION)
def cast_compute_api_method(self, ctxt, cell_name, method,
*args, **kwargs):
"""Make a cast to a compute API method in a certain cell."""
method_info = {'method': method,
'method_args': args,
'method_kwargs': kwargs}
self.cast(ctxt, self.make_msg('run_compute_api_method',
cell_name=cell_name,
method_info=method_info,
call=False))
def call_compute_api_method(self, ctxt, cell_name, method,
*args, **kwargs):
"""Make a call to a compute API method in a certain cell."""
method_info = {'method': method,
'method_args': args,
'method_kwargs': kwargs}
return self.call(ctxt, self.make_msg('run_compute_api_method',
cell_name=cell_name,
method_info=method_info,
call=True))
def schedule_run_instance(self, ctxt, **kwargs):
"""Schedule a new instance for creation."""
self.cast(ctxt, self.make_msg('schedule_run_instance',
host_sched_kwargs=kwargs))
def instance_update_at_top(self, ctxt, instance):
"""Update instance at API level."""
if not CONF.cells.enable:
return
# Make sure we have a dict, not a SQLAlchemy model
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('instance_update_at_top',
instance=instance_p))
def instance_destroy_at_top(self, ctxt, instance):
"""Destroy instance at API level."""
if not CONF.cells.enable:
return
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('instance_destroy_at_top',
instance=instance_p))
def instance_delete_everywhere(self, ctxt, instance, delete_type):
"""Delete instance everywhere. delete_type may be 'soft'
or 'hard'. This is generally only used to resolve races
when API cell doesn't know to what cell an instance belongs.
"""
if not CONF.cells.enable:
return
instance_p = jsonutils.to_primitive(instance)
self.cast(ctxt, self.make_msg('instance_delete_everywhere',
instance=instance_p,
delete_type=delete_type))
def instance_fault_create_at_top(self, ctxt, instance_fault):
"""Create an instance fault at the top."""
if not CONF.cells.enable:
return
instance_fault_p = jsonutils.to_primitive(instance_fault)
self.cast(ctxt, self.make_msg('instance_fault_create_at_top',
instance_fault=instance_fault_p))
def bw_usage_update_at_top(self, ctxt, uuid, mac, start_period,
bw_in, bw_out, last_ctr_in, last_ctr_out, last_refreshed=None):
"""Broadcast upwards that bw_usage was updated."""
if not CONF.cells.enable:
return
bw_update_info = {'uuid': uuid,
'mac': mac,
'start_period': start_period,
'bw_in': bw_in,
'bw_out': bw_out,
'last_ctr_in': last_ctr_in,
'last_ctr_out': last_ctr_out,
'last_refreshed': last_refreshed}
self.cast(ctxt, self.make_msg('bw_usage_update_at_top',
bw_update_info=bw_update_info))
def instance_info_cache_update_at_top(self, ctxt, instance_info_cache):
"""Broadcast up that an instance's info_cache has changed."""
if not CONF.cells.enable:
return
iicache = jsonutils.to_primitive(instance_info_cache)
instance = {'uuid': iicache['instance_uuid'],
'info_cache': iicache}
self.cast(ctxt, self.make_msg('instance_update_at_top',
instance=instance))

136
nova/cells/scheduler.py Normal file
View File

@ -0,0 +1,136 @@
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Cells Scheduler
"""
import random
import time
from nova import compute
from nova.compute import vm_states
from nova.db import base
from nova import exception
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova.scheduler import rpcapi as scheduler_rpcapi
cell_scheduler_opts = [
cfg.IntOpt('scheduler_retries',
default=10,
help='How many retries when no cells are available.'),
cfg.IntOpt('scheduler_retry_delay',
default=2,
help='How often to retry in seconds when no cells are '
'available.')
]
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.register_opts(cell_scheduler_opts, group='cells')
class CellsScheduler(base.Base):
"""The cells scheduler."""
def __init__(self, msg_runner):
super(CellsScheduler, self).__init__()
self.msg_runner = msg_runner
self.state_manager = msg_runner.state_manager
self.compute_api = compute.API()
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
def _create_instances_here(self, ctxt, request_spec):
instance_values = request_spec['instance_properties']
for instance_uuid in request_spec['instance_uuids']:
instance_values['uuid'] = instance_uuid
instance = self.compute_api.create_db_entry_for_new_instance(
ctxt,
request_spec['instance_type'],
request_spec['image'],
instance_values,
request_spec['security_group'],
request_spec['block_device_mapping'])
self.msg_runner.instance_update_at_top(ctxt, instance)
def _get_possible_cells(self):
cells = set(self.state_manager.get_child_cells())
our_cell = self.state_manager.get_my_state()
# Include our cell in the list, if we have any capacity info
if not cells or our_cell.capacities:
cells.add(our_cell)
return cells
def _run_instance(self, message, host_sched_kwargs):
"""Attempt to schedule instance(s). If we have no cells
to try, raise exception.NoCellsAvailable
"""
ctxt = message.ctxt
request_spec = host_sched_kwargs['request_spec']
# The message we might forward to a child cell
cells = self._get_possible_cells()
if not cells:
raise exception.NoCellsAvailable()
cells = list(cells)
# Random selection for now
random.shuffle(cells)
target_cell = cells[0]
LOG.debug(_("Scheduling with routing_path=%(routing_path)s"),
locals())
if target_cell.is_me:
# Need to create instance DB entries as the host scheduler
# expects that the instance(s) already exists.
self._create_instances_here(ctxt, request_spec)
self.scheduler_rpcapi.run_instance(ctxt,
**host_sched_kwargs)
return
self.msg_runner.schedule_run_instance(ctxt, target_cell,
host_sched_kwargs)
def run_instance(self, message, host_sched_kwargs):
"""Pick a cell where we should create a new instance."""
try:
for i in xrange(max(0, CONF.cells.scheduler_retries) + 1):
try:
return self._run_instance(message, host_sched_kwargs)
except exception.NoCellsAvailable:
if i == max(0, CONF.cells.scheduler_retries):
raise
sleep_time = max(1, CONF.cells.scheduler_retry_delay)
LOG.info(_("No cells available when scheduling. Will "
"retry in %(sleep_time)s second(s)"), locals())
time.sleep(sleep_time)
continue
except Exception:
request_spec = host_sched_kwargs['request_spec']
instance_uuids = request_spec['instance_uuids']
LOG.exception(_("Error scheduling instances %(instance_uuids)s"),
locals())
ctxt = message.ctxt
for instance_uuid in instance_uuids:
self.msg_runner.instance_update_at_top(ctxt,
{'uuid': instance_uuid,
'vm_state': vm_states.ERROR})
try:
self.db.instance_update(ctxt,
instance_uuid,
{'vm_state': vm_states.ERROR})
except Exception:
pass

346
nova/cells/state.py Normal file
View File

@ -0,0 +1,346 @@
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
CellState Manager
"""
import copy
import datetime
import functools
from nova.cells import rpc_driver
from nova import context
from nova.db import base
from nova.openstack.common import cfg
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova.openstack.common import timeutils
cell_state_manager_opts = [
cfg.IntOpt('db_check_interval',
default=60,
help='Seconds between getting fresh cell info from db.'),
]
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('host', 'nova.config')
CONF.import_opt('name', 'nova.cells.opts', group='cells')
#CONF.import_opt('capabilities', 'nova.cells.opts', group='cells')
CONF.register_opts(cell_state_manager_opts, group='cells')
class CellState(object):
"""Holds information for a particular cell."""
def __init__(self, cell_name, is_me=False):
self.name = cell_name
self.is_me = is_me
self.last_seen = datetime.datetime.min
self.capabilities = {}
self.capacities = {}
self.db_info = {}
# TODO(comstud): The DB will specify the driver to use to talk
# to this cell, but there's no column for this yet. The only
# available driver is the rpc driver.
self.driver = rpc_driver.CellsRPCDriver()
def update_db_info(self, cell_db_info):
"""Update cell credentials from db"""
self.db_info = dict(
[(k, v) for k, v in cell_db_info.iteritems()
if k != 'name'])
def update_capabilities(self, cell_metadata):
"""Update cell capabilities for a cell."""
self.last_seen = timeutils.utcnow()
self.capabilities = cell_metadata
def update_capacities(self, capacities):
"""Update capacity information for a cell."""
self.last_seen = timeutils.utcnow()
self.capacities = capacities
def get_cell_info(self):
"""Return subset of cell information for OS API use."""
db_fields_to_return = ['id', 'is_parent', 'weight_scale',
'weight_offset', 'username', 'rpc_host', 'rpc_port']
cell_info = dict(name=self.name, capabilities=self.capabilities)
if self.db_info:
for field in db_fields_to_return:
cell_info[field] = self.db_info[field]
return cell_info
def send_message(self, message):
"""Send a message to a cell. Just forward this to the driver,
passing ourselves and the message as arguments.
"""
self.driver.send_message_to_cell(self, message)
def __repr__(self):
me = "me" if self.is_me else "not_me"
return "Cell '%s' (%s)" % (self.name, me)
def sync_from_db(f):
"""Use as a decorator to wrap methods that use cell information to
make sure they sync the latest information from the DB periodically.
"""
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
if self._time_to_sync():
self._cell_db_sync()
return f(self, *args, **kwargs)
return wrapper
class CellStateManager(base.Base):
def __init__(self, cell_state_cls=None):
super(CellStateManager, self).__init__()
if not cell_state_cls:
cell_state_cls = CellState
self.cell_state_cls = cell_state_cls
self.my_cell_state = cell_state_cls(CONF.cells.name, is_me=True)
self.parent_cells = {}
self.child_cells = {}
self.last_cell_db_check = datetime.datetime.min
self._cell_db_sync()
my_cell_capabs = {}
for cap in CONF.cells.capabilities:
name, value = cap.split('=', 1)
if ';' in value:
values = set(value.split(';'))
else:
values = set([value])
my_cell_capabs[name] = values
self.my_cell_state.update_capabilities(my_cell_capabs)
def _refresh_cells_from_db(self, ctxt):
"""Make our cell info map match the db."""
# Add/update existing cells ...
db_cells = self.db.cell_get_all(ctxt)
db_cells_dict = dict([(cell['name'], cell) for cell in db_cells])
# Update current cells. Delete ones that disappeared
for cells_dict in (self.parent_cells, self.child_cells):
for cell_name, cell_info in cells_dict.items():
is_parent = cell_info.db_info['is_parent']
db_dict = db_cells_dict.get(cell_name)
if db_dict and is_parent == db_dict['is_parent']:
cell_info.update_db_info(db_dict)
else:
del cells_dict[cell_name]
# Add new cells
for cell_name, db_info in db_cells_dict.items():
if db_info['is_parent']:
cells_dict = self.parent_cells
else:
cells_dict = self.child_cells
if cell_name not in cells_dict:
cells_dict[cell_name] = self.cell_state_cls(cell_name)
cells_dict[cell_name].update_db_info(db_info)
def _time_to_sync(self):
"""Is it time to sync the DB against our memory cache?"""
diff = timeutils.utcnow() - self.last_cell_db_check
return diff.seconds >= CONF.cells.db_check_interval
def _update_our_capacity(self, context):
"""Update our capacity in the self.my_cell_state CellState.
This will add/update 2 entries in our CellState.capacities,
'ram_free' and 'disk_free'.
The values of these are both dictionaries with the following
format:
{'total_mb': <total_memory_free_in_the_cell>,
'units_by_mb: <units_dictionary>}
<units_dictionary> contains the number of units that we can
build for every instance_type that we have. This number is
computed by looking at room available on every compute_node.
Take the following instance_types as an example:
[{'memory_mb': 1024, 'root_gb': 10, 'ephemeral_gb': 100},
{'memory_mb': 2048, 'root_gb': 20, 'ephemeral_gb': 200}]
capacities['ram_free']['units_by_mb'] would contain the following:
{'1024': <number_of_instances_that_will_fit>,
'2048': <number_of_instances_that_will_fit>}
capacities['disk_free']['units_by_mb'] would contain the following:
{'122880': <number_of_instances_that_will_fit>,
'225280': <number_of_instances_that_will_fit>}
Units are in MB, so 122880 = (10 + 100) * 1024.
NOTE(comstud): Perhaps we should only report a single number
available per instance_type.
"""
compute_hosts = {}
def _get_compute_hosts():
compute_nodes = self.db.compute_node_get_all(context)
for compute in compute_nodes:
service = compute['service']
if not service or service['disabled']:
continue
host = service['host']
compute_hosts[host] = {
'free_ram_mb': compute['free_ram_mb'],
'free_disk_mb': compute['free_disk_gb'] * 1024}
_get_compute_hosts()
if not compute_hosts:
self.my_cell_state.update_capacities({})
return
ram_mb_free_units = {}
disk_mb_free_units = {}
total_ram_mb_free = 0
total_disk_mb_free = 0
def _free_units(tot, per_inst):
if per_inst:
return max(0, int(tot / per_inst))
else:
return 0
def _update_from_values(values, instance_type):
memory_mb = instance_type['memory_mb']
disk_mb = (instance_type['root_gb'] +
instance_type['ephemeral_gb']) * 1024
ram_mb_free_units.setdefault(str(memory_mb), 0)
disk_mb_free_units.setdefault(str(disk_mb), 0)
ram_free_units = _free_units(compute_values['free_ram_mb'],
memory_mb)
disk_free_units = _free_units(compute_values['free_disk_mb'],
disk_mb)
ram_mb_free_units[str(memory_mb)] += ram_free_units
disk_mb_free_units[str(disk_mb)] += disk_free_units
instance_types = self.db.instance_type_get_all(context)
for compute_values in compute_hosts.values():
total_ram_mb_free += compute_values['free_ram_mb']
total_disk_mb_free += compute_values['free_disk_mb']
for instance_type in instance_types:
_update_from_values(compute_values, instance_type)
capacities = {'ram_free': {'total_mb': total_ram_mb_free,
'units_by_mb': ram_mb_free_units},
'disk_free': {'total_mb': total_disk_mb_free,
'units_by_mb': disk_mb_free_units}}
self.my_cell_state.update_capacities(capacities)
@lockutils.synchronized('cell-db-sync', 'nova-')
def _cell_db_sync(self):
"""Update status for all cells if it's time. Most calls to
this are from the check_for_update() decorator that checks
the time, but it checks outside of a lock. The duplicate
check here is to prevent multiple threads from pulling the
information simultaneously.
"""
if self._time_to_sync():
LOG.debug(_("Updating cell cache from db."))
self.last_cell_db_check = timeutils.utcnow()
ctxt = context.get_admin_context()
self._refresh_cells_from_db(ctxt)
self._update_our_capacity(ctxt)
@sync_from_db
def get_my_state(self):
"""Return information for my (this) cell."""
return self.my_cell_state
@sync_from_db
def get_child_cells(self):
"""Return list of child cell_infos."""
return self.child_cells.values()
@sync_from_db
def get_parent_cells(self):
"""Return list of parent cell_infos."""
return self.parent_cells.values()
@sync_from_db
def get_parent_cell(self, cell_name):
return self.parent_cells.get(cell_name)
@sync_from_db
def get_child_cell(self, cell_name):
return self.child_cells.get(cell_name)
@sync_from_db
def update_cell_capabilities(self, cell_name, capabilities):
"""Update capabilities for a cell."""
cell = self.child_cells.get(cell_name)
if not cell:
cell = self.parent_cells.get(cell_name)
if not cell:
LOG.error(_("Unknown cell '%(cell_name)s' when trying to "
"update capabilities"), locals())
return
# Make sure capabilities are sets.
for capab_name, values in capabilities.items():
capabilities[capab_name] = set(values)
cell.update_capabilities(capabilities)
@sync_from_db
def update_cell_capacities(self, cell_name, capacities):
"""Update capacities for a cell."""
cell = self.child_cells.get(cell_name)
if not cell:
cell = self.parent_cells.get(cell_name)
if not cell:
LOG.error(_("Unknown cell '%(cell_name)s' when trying to "
"update capacities"), locals())
return
cell.update_capacities(capacities)
@sync_from_db
def get_our_capabilities(self, include_children=True):
capabs = copy.deepcopy(self.my_cell_state.capabilities)
if include_children:
for cell in self.child_cells.values():
for capab_name, values in cell.capabilities.items():
if capab_name not in capabs:
capabs[capab_name] = set([])
capabs[capab_name] |= values
return capabs
def _add_to_dict(self, target, src):
for key, value in src.items():
if isinstance(value, dict):
target.setdefault(key, {})
self._add_to_dict(target[key], value)
continue
target.setdefault(key, 0)
target[key] += value
@sync_from_db
def get_our_capacities(self, include_children=True):
capacities = copy.deepcopy(self.my_cell_state.capacities)
if include_children:
for cell in self.child_cells.values():
self._add_to_dict(capacities, cell.capacities)
return capacities

View File

@ -1954,6 +1954,14 @@ class API(base.Base):
return {'url': connect_info['access_url']}
def get_vnc_connect_info(self, context, instance, console_type):
"""Used in a child cell to get console info."""
if not instance['host']:
raise exception.InstanceNotReady(instance_id=instance['uuid'])
connect_info = self.compute_rpcapi.get_vnc_console(context,
instance=instance, console_type=console_type)
return connect_info
@wrap_check_policy
def get_console_output(self, context, instance, tail_length=None):
"""Get console output for an instance."""

471
nova/compute/cells_api.py Normal file
View File

@ -0,0 +1,471 @@
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Compute API that proxies via Cells Service"""
from nova import block_device
from nova.cells import rpcapi as cells_rpcapi
from nova.compute import api as compute_api
from nova.compute import task_states
from nova.compute import vm_states
from nova import exception
from nova.openstack.common import excutils
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
check_instance_state = compute_api.check_instance_state
wrap_check_policy = compute_api.wrap_check_policy
check_policy = compute_api.check_policy
check_instance_lock = compute_api.check_instance_lock
def validate_cell(fn):
def _wrapped(self, context, instance, *args, **kwargs):
self._validate_cell(instance, fn.__name__)
return fn(self, context, instance, *args, **kwargs)
_wrapped.__name__ = fn.__name__
return _wrapped
class ComputeRPCAPINoOp(object):
def __getattr__(self, key):
def _noop_rpc_wrapper(*args, **kwargs):
return None
return _noop_rpc_wrapper
class SchedulerRPCAPIRedirect(object):
def __init__(self, cells_rpcapi_obj):
self.cells_rpcapi = cells_rpcapi_obj
def __getattr__(self, key):
def _noop_rpc_wrapper(*args, **kwargs):
return None
return _noop_rpc_wrapper
def run_instance(self, context, **kwargs):
self.cells_rpcapi.schedule_run_instance(context, **kwargs)
class ComputeCellsAPI(compute_api.API):
def __init__(self, *args, **kwargs):
super(ComputeCellsAPI, self).__init__(*args, **kwargs)
self.cells_rpcapi = cells_rpcapi.CellsAPI()
# Avoid casts/calls directly to compute
self.compute_rpcapi = ComputeRPCAPINoOp()
# Redirect scheduler run_instance to cells.
self.scheduler_rpcapi = SchedulerRPCAPIRedirect(self.cells_rpcapi)
def _cell_read_only(self, cell_name):
"""Is the target cell in a read-only mode?"""
# FIXME(comstud): Add support for this.
return False
def _validate_cell(self, instance, method):
cell_name = instance['cell_name']
if not cell_name:
raise exception.InstanceUnknownCell(
instance_uuid=instance['uuid'])
if self._cell_read_only(cell_name):
raise exception.InstanceInvalidState(
attr="vm_state",
instance_uuid=instance['uuid'],
state="temporary_readonly",
method=method)
def _cast_to_cells(self, context, instance, method, *args, **kwargs):
instance_uuid = instance['uuid']
cell_name = instance['cell_name']
if not cell_name:
raise exception.InstanceUnknownCell(instance_uuid=instance_uuid)
self.cells_rpcapi.cast_compute_api_method(context, cell_name,
method, instance_uuid, *args, **kwargs)
def _call_to_cells(self, context, instance, method, *args, **kwargs):
instance_uuid = instance['uuid']
cell_name = instance['cell_name']
if not cell_name:
raise exception.InstanceUnknownCell(instance_uuid=instance_uuid)
return self.cells_rpcapi.call_compute_api_method(context, cell_name,
method, instance_uuid, *args, **kwargs)
def _check_requested_networks(self, context, requested_networks):
"""Override compute API's checking of this. It'll happen in
child cell
"""
return
def _validate_image_href(self, context, image_href):
"""Override compute API's checking of this. It'll happen in
child cell
"""
return
def _create_image(self, context, instance, name, image_type,
backup_type=None, rotation=None, extra_properties=None):
if backup_type:
return self._call_to_cells(context, instance, 'backup',
name, backup_type, rotation,
extra_properties=extra_properties)
else:
return self._call_to_cells(context, instance, 'snapshot',
name, extra_properties=extra_properties)
def create(self, *args, **kwargs):
"""We can use the base functionality, but I left this here just
for completeness.
"""
return super(ComputeCellsAPI, self).create(*args, **kwargs)
@validate_cell
def update(self, context, instance, **kwargs):
"""Update an instance."""
rv = super(ComputeCellsAPI, self).update(context,
instance, **kwargs)
# We need to skip vm_state/task_state updates... those will
# happen when via a a _cast_to_cells for running a different
# compute api method
kwargs_copy = kwargs.copy()
kwargs_copy.pop('vm_state', None)
kwargs_copy.pop('task_state', None)
if kwargs_copy:
try:
self._cast_to_cells(context, instance, 'update',
**kwargs_copy)
except exception.InstanceUnknownCell:
pass
return rv
def _local_delete(self, context, instance, bdms):
# This will get called for every delete in the API cell
# because _delete() in compute/api.py will not find a
# service when checking if it's up.
# We need to only take action if there's no cell_name. Our
# overrides of delete() and soft_delete() will take care of
# the rest.
cell_name = instance['cell_name']
if not cell_name:
return super(ComputeCellsAPI, self)._local_delete(context,
instance, bdms)
def soft_delete(self, context, instance):
self._handle_cell_delete(context, instance,
super(ComputeCellsAPI, self).soft_delete, 'soft_delete')
def delete(self, context, instance):
self._handle_cell_delete(context, instance,
super(ComputeCellsAPI, self).delete, 'delete')
def _handle_cell_delete(self, context, instance, method, method_name):
"""Terminate an instance."""
# We can't use the decorator because we have special logic in the
# case we don't know the cell_name...
cell_name = instance['cell_name']
if cell_name and self._cell_read_only(cell_name):
raise exception.InstanceInvalidState(
attr="vm_state",
instance_uuid=instance['uuid'],
state="temporary_readonly",
method=method_name)
method(context, instance)
try:
self._cast_to_cells(context, instance, method_name)
except exception.InstanceUnknownCell:
# If there's no cell, there's also no host... which means
# the instance was destroyed from the DB here. Let's just
# broadcast a message down to all cells and hope this ends
# up resolving itself... Worse case.. the instance will
# show back up again here.
delete_type = method == 'soft_delete' and 'soft' or 'hard'
self.cells_rpcapi.instance_delete_everywhere(context,
instance['uuid'], delete_type)
@validate_cell
def restore(self, context, instance):
"""Restore a previously deleted (but not reclaimed) instance."""
super(ComputeCellsAPI, self).restore(context, instance)
self._cast_to_cells(context, instance, 'restore')
@validate_cell
def force_delete(self, context, instance):
"""Force delete a previously deleted (but not reclaimed) instance."""
super(ComputeCellsAPI, self).force_delete(context, instance)
self._cast_to_cells(context, instance, 'force_delete')
@validate_cell
def stop(self, context, instance, do_cast=True):
"""Stop an instance."""
super(ComputeCellsAPI, self).stop(context, instance)
if do_cast:
self._cast_to_cells(context, instance, 'stop', do_cast=True)
else:
return self._call_to_cells(context, instance, 'stop',
do_cast=False)
@validate_cell
def start(self, context, instance):
"""Start an instance."""
super(ComputeCellsAPI, self).start(context, instance)
self._cast_to_cells(context, instance, 'start')
@validate_cell
def reboot(self, context, instance, *args, **kwargs):
"""Reboot the given instance."""
super(ComputeCellsAPI, self).reboot(context, instance,
*args, **kwargs)
self._cast_to_cells(context, instance, 'reboot', *args,
**kwargs)
@validate_cell
def rebuild(self, context, instance, *args, **kwargs):
"""Rebuild the given instance with the provided attributes."""
super(ComputeCellsAPI, self).rebuild(context, instance, *args,
**kwargs)
self._cast_to_cells(context, instance, 'rebuild', *args, **kwargs)
@check_instance_state(vm_state=[vm_states.RESIZED])
@validate_cell
def revert_resize(self, context, instance):
"""Reverts a resize, deleting the 'new' instance in the process."""
# NOTE(markwash): regular api manipulates the migration here, but we
# don't have access to it. So to preserve the interface just update the
# vm and task state.
self.update(context, instance,
task_state=task_states.RESIZE_REVERTING)
self._cast_to_cells(context, instance, 'revert_resize')
@check_instance_state(vm_state=[vm_states.RESIZED])
@validate_cell
def confirm_resize(self, context, instance):
"""Confirms a migration/resize and deletes the 'old' instance."""
# NOTE(markwash): regular api manipulates migration here, but we don't
# have the migration in the api database. So to preserve the interface
# just update the vm and task state without calling super()
self.update(context, instance, task_state=None,
vm_state=vm_states.ACTIVE)
self._cast_to_cells(context, instance, 'confirm_resize')
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED],
task_state=[None])
@validate_cell
def resize(self, context, instance, *args, **kwargs):
"""Resize (ie, migrate) a running instance.
If flavor_id is None, the process is considered a migration, keeping
the original flavor_id. If flavor_id is not None, the instance should
be migrated to a new host and resized to the new flavor_id.
"""
super(ComputeCellsAPI, self).resize(context, instance, *args,
**kwargs)
# FIXME(comstud): pass new instance_type object down to a method
# that'll unfold it
self._cast_to_cells(context, instance, 'resize', *args, **kwargs)
@validate_cell
def add_fixed_ip(self, context, instance, *args, **kwargs):
"""Add fixed_ip from specified network to given instance."""
super(ComputeCellsAPI, self).add_fixed_ip(context, instance,
*args, **kwargs)
self._cast_to_cells(context, instance, 'add_fixed_ip',
*args, **kwargs)
@validate_cell
def remove_fixed_ip(self, context, instance, *args, **kwargs):
"""Remove fixed_ip from specified network to given instance."""
super(ComputeCellsAPI, self).remove_fixed_ip(context, instance,
*args, **kwargs)
self._cast_to_cells(context, instance, 'remove_fixed_ip',
*args, **kwargs)
@validate_cell
def pause(self, context, instance):
"""Pause the given instance."""
super(ComputeCellsAPI, self).pause(context, instance)
self._cast_to_cells(context, instance, 'pause')
@validate_cell
def unpause(self, context, instance):
"""Unpause the given instance."""
super(ComputeCellsAPI, self).unpause(context, instance)
self._cast_to_cells(context, instance, 'unpause')
def set_host_enabled(self, context, host, enabled):
"""Sets the specified host's ability to accept new instances."""
# FIXME(comstud): Since there's no instance here, we have no
# idea which cell should be the target.
pass
def host_power_action(self, context, host, action):
"""Reboots, shuts down or powers up the host."""
# FIXME(comstud): Since there's no instance here, we have no
# idea which cell should be the target.
pass
def get_diagnostics(self, context, instance):
"""Retrieve diagnostics for the given instance."""
# FIXME(comstud): Cache this?
# Also: only calling super() to get state/policy checking
super(ComputeCellsAPI, self).get_diagnostics(context, instance)
return self._call_to_cells(context, instance, 'get_diagnostics')
@validate_cell
def suspend(self, context, instance):
"""Suspend the given instance."""
super(ComputeCellsAPI, self).suspend(context, instance)
self._cast_to_cells(context, instance, 'suspend')
@validate_cell
def resume(self, context, instance):
"""Resume the given instance."""
super(ComputeCellsAPI, self).resume(context, instance)
self._cast_to_cells(context, instance, 'resume')
@validate_cell
def rescue(self, context, instance, rescue_password=None):
"""Rescue the given instance."""
super(ComputeCellsAPI, self).rescue(context, instance,
rescue_password=rescue_password)
self._cast_to_cells(context, instance, 'rescue',
rescue_password=rescue_password)
@validate_cell
def unrescue(self, context, instance):
"""Unrescue the given instance."""
super(ComputeCellsAPI, self).unrescue(context, instance)
self._cast_to_cells(context, instance, 'unrescue')
@validate_cell
def set_admin_password(self, context, instance, password=None):
"""Set the root/admin password for the given instance."""
super(ComputeCellsAPI, self).set_admin_password(context, instance,
password=password)
self._cast_to_cells(context, instance, 'set_admin_password',
password=password)
@validate_cell
def inject_file(self, context, instance, *args, **kwargs):
"""Write a file to the given instance."""
super(ComputeCellsAPI, self).inject_file(context, instance, *args,
**kwargs)
self._cast_to_cells(context, instance, 'inject_file', *args, **kwargs)
@wrap_check_policy
@validate_cell
def get_vnc_console(self, context, instance, console_type):
"""Get a url to a VNC Console."""
if not instance['host']:
raise exception.InstanceNotReady(instance_id=instance['uuid'])
connect_info = self._call_to_cells(context, instance,
'get_vnc_connect_info', console_type)
self.consoleauth_rpcapi.authorize_console(context,
connect_info['token'], console_type, connect_info['host'],
connect_info['port'], connect_info['internal_access_path'])
return {'url': connect_info['access_url']}
@validate_cell
def get_console_output(self, context, instance, *args, **kwargs):
"""Get console output for an an instance."""
# NOTE(comstud): Calling super() just to get policy check
super(ComputeCellsAPI, self).get_console_output(context, instance,
*args, **kwargs)
return self._call_to_cells(context, instance, 'get_console_output',
*args, **kwargs)
def lock(self, context, instance):
"""Lock the given instance."""
super(ComputeCellsAPI, self).lock(context, instance)
self._cast_to_cells(context, instance, 'lock')
def unlock(self, context, instance):
"""Unlock the given instance."""
super(ComputeCellsAPI, self).lock(context, instance)
self._cast_to_cells(context, instance, 'unlock')
@validate_cell
def reset_network(self, context, instance):
"""Reset networking on the instance."""
super(ComputeCellsAPI, self).reset_network(context, instance)
self._cast_to_cells(context, instance, 'reset_network')
@validate_cell
def inject_network_info(self, context, instance):
"""Inject network info for the instance."""
super(ComputeCellsAPI, self).inject_network_info(context, instance)
self._cast_to_cells(context, instance, 'inject_network_info')
@wrap_check_policy
@validate_cell
def attach_volume(self, context, instance, volume_id, device=None):
"""Attach an existing volume to an existing instance."""
if device and not block_device.match_device(device):
raise exception.InvalidDevicePath(path=device)
device = self.compute_rpcapi.reserve_block_device_name(
context, device=device, instance=instance, volume_id=volume_id)
try:
volume = self.volume_api.get(context, volume_id)
self.volume_api.check_attach(context, volume)
except Exception:
with excutils.save_and_reraise_exception():
self.db.block_device_mapping_destroy_by_instance_and_device(
context, instance['uuid'], device)
self._cast_to_cells(context, instance, 'attach_volume',
volume_id, device)
@check_instance_lock
@validate_cell
def _detach_volume(self, context, instance, volume_id):
"""Detach a volume from an instance."""
check_policy(context, 'detach_volume', instance)
volume = self.volume_api.get(context, volume_id)
self.volume_api.check_detach(context, volume)
self._cast_to_cells(context, instance, 'detach_volume',
volume_id)
@wrap_check_policy
@validate_cell
def associate_floating_ip(self, context, instance, address):
"""Makes calls to network_api to associate_floating_ip.
:param address: is a string floating ip address
"""
self._cast_to_cells(context, instance, 'associate_floating_ip',
address)
@validate_cell
def delete_instance_metadata(self, context, instance, key):
"""Delete the given metadata item from an instance."""
super(ComputeCellsAPI, self).delete_instance_metadata(context,
instance, key)
self._cast_to_cells(context, instance, 'delete_instance_metadata',
key)
@wrap_check_policy
@validate_cell
def update_instance_metadata(self, context, instance,
metadata, delete=False):
rv = super(ComputeCellsAPI, self).update_instance_metadata(context,
instance, metadata, delete=delete)
try:
self._cast_to_cells(context, instance,
'update_instance_metadata',
metadata, delete=delete)
except exception.InstanceUnknownCell:
pass
return rv

View File

@ -43,8 +43,10 @@ these objects be simple dictionaries.
"""
from nova.cells import rpcapi as cells_rpcapi
from nova import exception
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova import utils
@ -68,6 +70,7 @@ CONF.register_opts(db_opts)
IMPL = utils.LazyPluggable('db_backend',
sqlalchemy='nova.db.sqlalchemy.api')
LOG = logging.getLogger(__name__)
class NoMoreNetworks(exception.NovaException):
@ -566,9 +569,16 @@ def instance_data_get_for_project(context, project_id, session=None):
session=session)
def instance_destroy(context, instance_uuid, constraint=None):
def instance_destroy(context, instance_uuid, constraint=None,
update_cells=True):
"""Destroy the instance or raise if it does not exist."""
return IMPL.instance_destroy(context, instance_uuid, constraint)
rv = IMPL.instance_destroy(context, instance_uuid, constraint)
if update_cells:
try:
cells_rpcapi.CellsAPI().instance_destroy_at_top(context, rv)
except Exception:
LOG.exception(_("Failed to notify cells of instance destroy"))
return rv
def instance_get_by_uuid(context, uuid):
@ -665,13 +675,19 @@ def instance_test_and_set(context, instance_uuid, attr, ok_states,
ok_states, new_state)
def instance_update(context, instance_uuid, values):
def instance_update(context, instance_uuid, values, update_cells=True):
"""Set the given properties on an instance and update it.
Raises NotFound if instance does not exist.
"""
return IMPL.instance_update(context, instance_uuid, values)
rv = IMPL.instance_update(context, instance_uuid, values)
if update_cells:
try:
cells_rpcapi.CellsAPI().instance_update_at_top(context, rv)
except Exception:
LOG.exception(_("Failed to notify cells of instance update"))
return rv
def instance_update_and_get_original(context, instance_uuid, values):
@ -687,8 +703,12 @@ def instance_update_and_get_original(context, instance_uuid, values):
Raises NotFound if instance does not exist.
"""
return IMPL.instance_update_and_get_original(context, instance_uuid,
values)
rv = IMPL.instance_update_and_get_original(context, instance_uuid, values)
try:
cells_rpcapi.CellsAPI().instance_update_at_top(context, rv[1])
except Exception:
LOG.exception(_("Failed to notify cells of instance update"))
return rv
def instance_add_security_group(context, instance_id, security_group_id):
@ -714,13 +734,21 @@ def instance_info_cache_get(context, instance_uuid):
return IMPL.instance_info_cache_get(context, instance_uuid)
def instance_info_cache_update(context, instance_uuid, values):
def instance_info_cache_update(context, instance_uuid, values,
update_cells=True):
"""Update an instance info cache record in the table.
:param instance_uuid: = uuid of info cache's instance
:param values: = dict containing column values to update
"""
return IMPL.instance_info_cache_update(context, instance_uuid, values)
rv = IMPL.instance_info_cache_update(context, instance_uuid, values)
try:
cells_rpcapi.CellsAPI().instance_info_cache_update_at_top(context,
rv)
except Exception:
LOG.exception(_("Failed to notify cells of instance info cache "
"update"))
return rv
def instance_info_cache_delete(context, instance_uuid):
@ -1354,7 +1382,7 @@ def instance_metadata_delete(context, instance_uuid, key):
def instance_metadata_update(context, instance_uuid, metadata, delete):
"""Update metadata if it exists, otherwise create it."""
return IMPL.instance_metadata_update(context, instance_uuid,
metadata, delete)
metadata, delete)
####################
@ -1414,12 +1442,21 @@ def bw_usage_get_by_uuids(context, uuids, start_period):
def bw_usage_update(context, uuid, mac, start_period, bw_in, bw_out,
last_ctr_in, last_ctr_out, last_refreshed=None):
last_ctr_in, last_ctr_out, last_refreshed=None,
update_cells=True):
"""Update cached bandwidth usage for an instance's network based on mac
address. Creates new record if needed.
"""
return IMPL.bw_usage_update(context, uuid, mac, start_period, bw_in,
rv = IMPL.bw_usage_update(context, uuid, mac, start_period, bw_in,
bw_out, last_ctr_in, last_ctr_out, last_refreshed=last_refreshed)
if update_cells:
try:
cells_rpcapi.CellsAPI().bw_usage_update_at_top(context,
uuid, mac, start_period, bw_in, bw_out,
last_ctr_in, last_ctr_out, last_refreshed)
except Exception:
LOG.exception(_("Failed to notify cells of bw_usage update"))
return rv
####################
@ -1555,9 +1592,15 @@ def aggregate_host_delete(context, aggregate_id, host):
####################
def instance_fault_create(context, values):
def instance_fault_create(context, values, update_cells=True):
"""Create a new Instance Fault."""
return IMPL.instance_fault_create(context, values)
rv = IMPL.instance_fault_create(context, values)
if update_cells:
try:
cells_rpcapi.CellsAPI().instance_fault_create_at_top(context, rv)
except Exception:
LOG.exception(_("Failed to notify cells of instance fault"))
return rv
def instance_fault_get_by_instance_uuids(context, instance_uuids):

View File

@ -769,6 +769,34 @@ class CellNotFound(NotFound):
message = _("Cell %(cell_id)s could not be found.")
class CellRoutingInconsistency(NovaException):
message = _("Inconsistency in cell routing: %(reason)s")
class CellServiceAPIMethodNotFound(NotFound):
message = _("Service API method not found: %(detail)s")
class CellTimeout(NotFound):
message = _("Timeout waiting for response from cell")
class CellMaxHopCountReached(NovaException):
message = _("Cell message has reached maximum hop count: %(hop_count)s")
class NoCellsAvailable(NovaException):
message = _("No cells available matching scheduling criteria.")
class CellError(NovaException):
message = _("Exception received during cell processing: %(exc_name)s.")
class InstanceUnknownCell(NotFound):
message = _("Cell is not known for instance %(instance_uuid)s")
class SchedulerHostFilterNotFound(NotFound):
message = _("Scheduler Host Filter %(filter_name)s could not be found.")

View File

@ -0,0 +1,19 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work
from nova.tests import *

191
nova/tests/cells/fakes.py Normal file
View File

@ -0,0 +1,191 @@
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Fakes For Cells tests.
"""
from nova.cells import driver
from nova.cells import manager as cells_manager
from nova.cells import messaging
from nova.cells import state as cells_state
import nova.db
from nova.db import base
from nova.openstack.common import cfg
CONF = cfg.CONF
CONF.import_opt('name', 'nova.cells.opts', group='cells')
# Fake Cell Hierarchy
FAKE_TOP_LEVEL_CELL_NAME = 'api-cell'
FAKE_CELL_LAYOUT = [{'child-cell1': []},
{'child-cell2': [{'grandchild-cell1': []}]},
{'child-cell3': [{'grandchild-cell2': []},
{'grandchild-cell3': []}]},
{'child-cell4': []}]
# build_cell_stub_infos() below will take the above layout and create
# a fake view of the DB from the perspective of each of the cells.
# For each cell, a CellStubInfo will be created with this info.
CELL_NAME_TO_STUB_INFO = {}
class FakeDBApi(object):
def __init__(self, cell_db_entries):
self.cell_db_entries = cell_db_entries
def __getattr__(self, key):
return getattr(nova.db, key)
def cell_get_all(self, ctxt):
return self.cell_db_entries
def compute_node_get_all(self, ctxt):
return []
class FakeCellsDriver(driver.BaseCellsDriver):
pass
class FakeCellState(cells_state.CellState):
def send_message(self, message):
message_runner = get_message_runner(self.name)
orig_ctxt = message.ctxt
json_message = message.to_json()
message = message_runner.message_from_json(json_message)
# Restore this so we can use mox and verify same context
message.ctxt = orig_ctxt
message.process()
class FakeCellStateManager(cells_state.CellStateManager):
def __init__(self, *args, **kwargs):
super(FakeCellStateManager, self).__init__(*args,
cell_state_cls=FakeCellState, **kwargs)
class FakeCellsManager(cells_manager.CellsManager):
def __init__(self, *args, **kwargs):
super(FakeCellsManager, self).__init__(*args,
cell_state_manager=FakeCellStateManager,
**kwargs)
class CellStubInfo(object):
def __init__(self, test_case, cell_name, db_entries):
self.test_case = test_case
self.cell_name = cell_name
self.db_entries = db_entries
def fake_base_init(_self, *args, **kwargs):
_self.db = FakeDBApi(db_entries)
test_case.stubs.Set(base.Base, '__init__', fake_base_init)
self.cells_manager = FakeCellsManager()
# Fix the cell name, as it normally uses CONF.cells.name
msg_runner = self.cells_manager.msg_runner
msg_runner.our_name = self.cell_name
self.cells_manager.state_manager.my_cell_state.name = self.cell_name
def _build_cell_stub_info(test_case, our_name, parent_path, children):
cell_db_entries = []
cur_db_id = 1
sep_char = messaging._PATH_CELL_SEP
if parent_path:
cell_db_entries.append(
dict(id=cur_db_id,
name=parent_path.split(sep_char)[-1],
is_parent=True,
username='username%s' % cur_db_id,
password='password%s' % cur_db_id,
rpc_host='rpc_host%s' % cur_db_id,
rpc_port='rpc_port%s' % cur_db_id,
rpc_virtual_host='rpc_vhost%s' % cur_db_id))
cur_db_id += 1
our_path = parent_path + sep_char + our_name
else:
our_path = our_name
for child in children:
for child_name, grandchildren in child.items():
_build_cell_stub_info(test_case, child_name, our_path,
grandchildren)
cell_entry = dict(id=cur_db_id,
name=child_name,
username='username%s' % cur_db_id,
password='password%s' % cur_db_id,
rpc_host='rpc_host%s' % cur_db_id,
rpc_port='rpc_port%s' % cur_db_id,
rpc_virtual_host='rpc_vhost%s' % cur_db_id,
is_parent=False)
cell_db_entries.append(cell_entry)
cur_db_id += 1
stub_info = CellStubInfo(test_case, our_name, cell_db_entries)
CELL_NAME_TO_STUB_INFO[our_name] = stub_info
def _build_cell_stub_infos(test_case):
_build_cell_stub_info(test_case, FAKE_TOP_LEVEL_CELL_NAME, '',
FAKE_CELL_LAYOUT)
def init(test_case):
global CELL_NAME_TO_STUB_INFO
test_case.flags(driver='nova.tests.cells.fakes.FakeCellsDriver',
group='cells')
CELL_NAME_TO_STUB_INFO = {}
_build_cell_stub_infos(test_case)
def _get_cell_stub_info(cell_name):
return CELL_NAME_TO_STUB_INFO[cell_name]
def get_state_manager(cell_name):
return _get_cell_stub_info(cell_name).cells_manager.state_manager
def get_cell_state(cur_cell_name, tgt_cell_name):
state_manager = get_state_manager(cur_cell_name)
cell = state_manager.child_cells.get(tgt_cell_name)
if cell is None:
cell = state_manager.parent_cells.get(tgt_cell_name)
return cell
def get_cells_manager(cell_name):
return _get_cell_stub_info(cell_name).cells_manager
def get_message_runner(cell_name):
return _get_cell_stub_info(cell_name).cells_manager.msg_runner
def stub_tgt_method(test_case, cell_name, method_name, method):
msg_runner = get_message_runner(cell_name)
tgt_msg_methods = msg_runner.methods_by_type['targeted']
setattr(tgt_msg_methods, method_name, method)
def stub_bcast_method(test_case, cell_name, method_name, method):
msg_runner = get_message_runner(cell_name)
tgt_msg_methods = msg_runner.methods_by_type['broadcast']
setattr(tgt_msg_methods, method_name, method)
def stub_bcast_methods(test_case, method_name, method):
for cell_name in CELL_NAME_TO_STUB_INFO.keys():
stub_bcast_method(test_case, cell_name, method_name, method)

View File

@ -0,0 +1,151 @@
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For CellsManager
"""
from nova.cells import messaging
from nova import context
from nova import test
from nova.tests.cells import fakes
class CellsManagerClassTestCase(test.TestCase):
"""Test case for CellsManager class"""
def setUp(self):
super(CellsManagerClassTestCase, self).setUp()
fakes.init(self)
# pick a child cell to use for tests.
self.our_cell = 'grandchild-cell1'
self.cells_manager = fakes.get_cells_manager(self.our_cell)
self.msg_runner = self.cells_manager.msg_runner
self.driver = self.cells_manager.driver
self.ctxt = 'fake_context'
def test_post_start_hook_child_cell(self):
self.mox.StubOutWithMock(self.driver, 'start_consumers')
self.mox.StubOutWithMock(context, 'get_admin_context')
self.mox.StubOutWithMock(self.cells_manager, '_update_our_parents')
self.driver.start_consumers(self.msg_runner)
context.get_admin_context().AndReturn(self.ctxt)
self.cells_manager._update_our_parents(self.ctxt)
self.mox.ReplayAll()
self.cells_manager.post_start_hook()
def test_post_start_hook_middle_cell(self):
cells_manager = fakes.get_cells_manager('child-cell2')
msg_runner = cells_manager.msg_runner
driver = cells_manager.driver
self.mox.StubOutWithMock(driver, 'start_consumers')
self.mox.StubOutWithMock(context, 'get_admin_context')
self.mox.StubOutWithMock(msg_runner,
'ask_children_for_capabilities')
self.mox.StubOutWithMock(msg_runner,
'ask_children_for_capacities')
driver.start_consumers(msg_runner)
context.get_admin_context().AndReturn(self.ctxt)
msg_runner.ask_children_for_capabilities(self.ctxt)
msg_runner.ask_children_for_capacities(self.ctxt)
self.mox.ReplayAll()
cells_manager.post_start_hook()
def test_update_our_parents(self):
self.mox.StubOutWithMock(self.msg_runner,
'tell_parents_our_capabilities')
self.mox.StubOutWithMock(self.msg_runner,
'tell_parents_our_capacities')
self.msg_runner.tell_parents_our_capabilities(self.ctxt)
self.msg_runner.tell_parents_our_capacities(self.ctxt)
self.mox.ReplayAll()
self.cells_manager._update_our_parents(self.ctxt)
def test_schedule_run_instance(self):
host_sched_kwargs = 'fake_host_sched_kwargs_silently_passed'
self.mox.StubOutWithMock(self.msg_runner, 'schedule_run_instance')
our_cell = self.msg_runner.state_manager.get_my_state()
self.msg_runner.schedule_run_instance(self.ctxt, our_cell,
host_sched_kwargs)
self.mox.ReplayAll()
self.cells_manager.schedule_run_instance(self.ctxt,
host_sched_kwargs=host_sched_kwargs)
def test_run_compute_api_method(self):
# Args should just be silently passed through
cell_name = 'fake-cell-name'
method_info = 'fake-method-info'
fake_response = messaging.Response('fake', 'fake', False)
self.mox.StubOutWithMock(self.msg_runner,
'run_compute_api_method')
self.mox.StubOutWithMock(fake_response,
'value_or_raise')
self.msg_runner.run_compute_api_method(self.ctxt,
cell_name,
method_info,
True).AndReturn(fake_response)
fake_response.value_or_raise().AndReturn('fake-response')
self.mox.ReplayAll()
response = self.cells_manager.run_compute_api_method(
self.ctxt, cell_name=cell_name, method_info=method_info,
call=True)
self.assertEqual('fake-response', response)
def test_instance_update_at_top(self):
self.mox.StubOutWithMock(self.msg_runner, 'instance_update_at_top')
self.msg_runner.instance_update_at_top(self.ctxt, 'fake-instance')
self.mox.ReplayAll()
self.cells_manager.instance_update_at_top(self.ctxt,
instance='fake-instance')
def test_instance_destroy_at_top(self):
self.mox.StubOutWithMock(self.msg_runner, 'instance_destroy_at_top')
self.msg_runner.instance_destroy_at_top(self.ctxt, 'fake-instance')
self.mox.ReplayAll()
self.cells_manager.instance_destroy_at_top(self.ctxt,
instance='fake-instance')
def test_instance_delete_everywhere(self):
self.mox.StubOutWithMock(self.msg_runner,
'instance_delete_everywhere')
self.msg_runner.instance_delete_everywhere(self.ctxt,
'fake-instance',
'fake-type')
self.mox.ReplayAll()
self.cells_manager.instance_delete_everywhere(
self.ctxt, instance='fake-instance',
delete_type='fake-type')
def test_instance_fault_create_at_top(self):
self.mox.StubOutWithMock(self.msg_runner,
'instance_fault_create_at_top')
self.msg_runner.instance_fault_create_at_top(self.ctxt,
'fake-fault')
self.mox.ReplayAll()
self.cells_manager.instance_fault_create_at_top(
self.ctxt, instance_fault='fake-fault')
def test_bw_usage_update_at_top(self):
self.mox.StubOutWithMock(self.msg_runner,
'bw_usage_update_at_top')
self.msg_runner.bw_usage_update_at_top(self.ctxt,
'fake-bw-info')
self.mox.ReplayAll()
self.cells_manager.bw_usage_update_at_top(
self.ctxt, bw_update_info='fake-bw-info')

View File

@ -0,0 +1,913 @@
# Copyright (c) 2012 Rackspace Hosting # All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Cells Messaging module
"""
from nova.cells import messaging
from nova import context
from nova import exception
from nova.openstack.common import cfg
from nova import test
from nova.tests.cells import fakes
CONF = cfg.CONF
CONF.import_opt('host', 'nova.config')
CONF.import_opt('name', 'nova.cells.opts', group='cells')
CONF.import_opt('allowed_rpc_exception_modules',
'nova.openstack.common.rpc')
class CellsMessageClassesTestCase(test.TestCase):
"""Test case for the main Cells Message classes."""
def setUp(self):
super(CellsMessageClassesTestCase, self).setUp()
fakes.init(self)
self.ctxt = context.RequestContext('fake', 'fake')
# Need to be able to deserialize test.TestingException.
allowed_modules = CONF.allowed_rpc_exception_modules
allowed_modules.append('nova.test')
self.flags(allowed_rpc_exception_modules=allowed_modules)
self.our_name = 'api-cell'
self.msg_runner = fakes.get_message_runner(self.our_name)
self.state_manager = self.msg_runner.state_manager
def test_reverse_path(self):
path = 'a!b!c!d'
expected = 'd!c!b!a'
rev_path = messaging._reverse_path(path)
self.assertEqual(rev_path, expected)
def test_response_cell_name_from_path(self):
# test array with tuples of inputs/expected outputs
test_paths = [('cell1', 'cell1'),
('cell1!cell2', 'cell2!cell1'),
('cell1!cell2!cell3', 'cell3!cell2!cell1')]
for test_input, expected_output in test_paths:
self.assertEqual(expected_output,
messaging._response_cell_name_from_path(test_input))
def test_response_cell_name_from_path_neighbor_only(self):
# test array with tuples of inputs/expected outputs
test_paths = [('cell1', 'cell1'),
('cell1!cell2', 'cell2!cell1'),
('cell1!cell2!cell3', 'cell3!cell2')]
for test_input, expected_output in test_paths:
self.assertEqual(expected_output,
messaging._response_cell_name_from_path(test_input,
neighbor_only=True))
def test_targeted_message(self):
self.flags(max_hop_count=99, group='cells')
target_cell = 'api-cell!child-cell2!grandchild-cell1'
method = 'fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
tgt_message = messaging._TargetedMessage(self.msg_runner,
self.ctxt, method,
method_kwargs, direction,
target_cell)
self.assertEqual(self.ctxt, tgt_message.ctxt)
self.assertEqual(method, tgt_message.method_name)
self.assertEqual(method_kwargs, tgt_message.method_kwargs)
self.assertEqual(direction, tgt_message.direction)
self.assertEqual(target_cell, target_cell)
self.assertFalse(tgt_message.fanout)
self.assertFalse(tgt_message.need_response)
self.assertEqual(self.our_name, tgt_message.routing_path)
self.assertEqual(1, tgt_message.hop_count)
self.assertEqual(99, tgt_message.max_hop_count)
self.assertFalse(tgt_message.is_broadcast)
# Correct next hop?
next_hop = tgt_message._get_next_hop()
child_cell = self.state_manager.get_child_cell('child-cell2')
self.assertEqual(child_cell, next_hop)
def test_create_targeted_message_with_response(self):
self.flags(max_hop_count=99, group='cells')
our_name = 'child-cell1'
target_cell = 'child-cell1!api-cell'
msg_runner = fakes.get_message_runner(our_name)
method = 'fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'up'
tgt_message = messaging._TargetedMessage(msg_runner,
self.ctxt, method,
method_kwargs, direction,
target_cell,
need_response=True)
self.assertEqual(self.ctxt, tgt_message.ctxt)
self.assertEqual(method, tgt_message.method_name)
self.assertEqual(method_kwargs, tgt_message.method_kwargs)
self.assertEqual(direction, tgt_message.direction)
self.assertEqual(target_cell, target_cell)
self.assertFalse(tgt_message.fanout)
self.assertTrue(tgt_message.need_response)
self.assertEqual(our_name, tgt_message.routing_path)
self.assertEqual(1, tgt_message.hop_count)
self.assertEqual(99, tgt_message.max_hop_count)
self.assertFalse(tgt_message.is_broadcast)
# Correct next hop?
next_hop = tgt_message._get_next_hop()
parent_cell = msg_runner.state_manager.get_parent_cell('api-cell')
self.assertEqual(parent_cell, next_hop)
def test_targeted_message_when_target_is_cell_state(self):
method = 'fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
target_cell = self.state_manager.get_child_cell('child-cell2')
tgt_message = messaging._TargetedMessage(self.msg_runner,
self.ctxt, method,
method_kwargs, direction,
target_cell)
self.assertEqual('api-cell!child-cell2', tgt_message.target_cell)
# Correct next hop?
next_hop = tgt_message._get_next_hop()
self.assertEqual(target_cell, next_hop)
def test_targeted_message_when_target_cell_state_is_me(self):
method = 'fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
target_cell = self.state_manager.get_my_state()
tgt_message = messaging._TargetedMessage(self.msg_runner,
self.ctxt, method,
method_kwargs, direction,
target_cell)
self.assertEqual('api-cell', tgt_message.target_cell)
# Correct next hop?
next_hop = tgt_message._get_next_hop()
self.assertEqual(target_cell, next_hop)
def test_create_broadcast_message(self):
self.flags(max_hop_count=99, group='cells')
self.flags(name='api-cell', max_hop_count=99, group='cells')
method = 'fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
bcast_message = messaging._BroadcastMessage(self.msg_runner,
self.ctxt, method,
method_kwargs, direction)
self.assertEqual(self.ctxt, bcast_message.ctxt)
self.assertEqual(method, bcast_message.method_name)
self.assertEqual(method_kwargs, bcast_message.method_kwargs)
self.assertEqual(direction, bcast_message.direction)
self.assertFalse(bcast_message.fanout)
self.assertFalse(bcast_message.need_response)
self.assertEqual(self.our_name, bcast_message.routing_path)
self.assertEqual(1, bcast_message.hop_count)
self.assertEqual(99, bcast_message.max_hop_count)
self.assertTrue(bcast_message.is_broadcast)
# Correct next hops?
next_hops = bcast_message._get_next_hops()
child_cells = self.state_manager.get_child_cells()
self.assertEqual(child_cells, next_hops)
def test_create_broadcast_message_with_response(self):
self.flags(max_hop_count=99, group='cells')
our_name = 'child-cell1'
msg_runner = fakes.get_message_runner(our_name)
method = 'fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'up'
bcast_message = messaging._BroadcastMessage(msg_runner, self.ctxt,
method, method_kwargs, direction, need_response=True)
self.assertEqual(self.ctxt, bcast_message.ctxt)
self.assertEqual(method, bcast_message.method_name)
self.assertEqual(method_kwargs, bcast_message.method_kwargs)
self.assertEqual(direction, bcast_message.direction)
self.assertFalse(bcast_message.fanout)
self.assertTrue(bcast_message.need_response)
self.assertEqual(our_name, bcast_message.routing_path)
self.assertEqual(1, bcast_message.hop_count)
self.assertEqual(99, bcast_message.max_hop_count)
self.assertTrue(bcast_message.is_broadcast)
# Correct next hops?
next_hops = bcast_message._get_next_hops()
parent_cells = msg_runner.state_manager.get_parent_cells()
self.assertEqual(parent_cells, next_hops)
def test_self_targeted_message(self):
target_cell = 'api-cell'
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
call_info = {}
def our_fake_method(message, **kwargs):
call_info['context'] = message.ctxt
call_info['routing_path'] = message.routing_path
call_info['kwargs'] = kwargs
fakes.stub_tgt_method(self, 'api-cell', 'our_fake_method',
our_fake_method)
tgt_message = messaging._TargetedMessage(self.msg_runner,
self.ctxt, method,
method_kwargs, direction,
target_cell)
tgt_message.process()
self.assertEqual(self.ctxt, call_info['context'])
self.assertEqual(method_kwargs, call_info['kwargs'])
self.assertEqual(target_cell, call_info['routing_path'])
def test_child_targeted_message(self):
target_cell = 'api-cell!child-cell1'
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
call_info = {}
def our_fake_method(message, **kwargs):
call_info['context'] = message.ctxt
call_info['routing_path'] = message.routing_path
call_info['kwargs'] = kwargs
fakes.stub_tgt_method(self, 'child-cell1', 'our_fake_method',
our_fake_method)
tgt_message = messaging._TargetedMessage(self.msg_runner,
self.ctxt, method,
method_kwargs, direction,
target_cell)
tgt_message.process()
self.assertEqual(self.ctxt, call_info['context'])
self.assertEqual(method_kwargs, call_info['kwargs'])
self.assertEqual(target_cell, call_info['routing_path'])
def test_grandchild_targeted_message(self):
target_cell = 'api-cell!child-cell2!grandchild-cell1'
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
call_info = {}
def our_fake_method(message, **kwargs):
call_info['context'] = message.ctxt
call_info['routing_path'] = message.routing_path
call_info['kwargs'] = kwargs
fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method',
our_fake_method)
tgt_message = messaging._TargetedMessage(self.msg_runner,
self.ctxt, method,
method_kwargs, direction,
target_cell)
tgt_message.process()
self.assertEqual(self.ctxt, call_info['context'])
self.assertEqual(method_kwargs, call_info['kwargs'])
self.assertEqual(target_cell, call_info['routing_path'])
def test_grandchild_targeted_message_with_response(self):
target_cell = 'api-cell!child-cell2!grandchild-cell1'
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
call_info = {}
def our_fake_method(message, **kwargs):
call_info['context'] = message.ctxt
call_info['routing_path'] = message.routing_path
call_info['kwargs'] = kwargs
return 'our_fake_response'
fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method',
our_fake_method)
tgt_message = messaging._TargetedMessage(self.msg_runner,
self.ctxt, method,
method_kwargs, direction,
target_cell,
need_response=True)
response = tgt_message.process()
self.assertEqual(self.ctxt, call_info['context'])
self.assertEqual(method_kwargs, call_info['kwargs'])
self.assertEqual(target_cell, call_info['routing_path'])
self.assertFalse(response.failure)
self.assertTrue(response.value_or_raise(), 'our_fake_response')
def test_grandchild_targeted_message_with_error(self):
target_cell = 'api-cell!child-cell2!grandchild-cell1'
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
def our_fake_method(message, **kwargs):
raise test.TestingException('this should be returned')
fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method',
our_fake_method)
tgt_message = messaging._TargetedMessage(self.msg_runner,
self.ctxt, method,
method_kwargs, direction,
target_cell,
need_response=True)
response = tgt_message.process()
self.assertTrue(response.failure)
self.assertRaises(test.TestingException, response.value_or_raise)
def test_grandchild_targeted_message_max_hops(self):
self.flags(max_hop_count=2, group='cells')
target_cell = 'api-cell!child-cell2!grandchild-cell1'
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
def our_fake_method(message, **kwargs):
raise test.TestingException('should not be reached')
fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method',
our_fake_method)
tgt_message = messaging._TargetedMessage(self.msg_runner,
self.ctxt, method,
method_kwargs, direction,
target_cell,
need_response=True)
response = tgt_message.process()
self.assertTrue(response.failure)
self.assertRaises(exception.CellMaxHopCountReached,
response.value_or_raise)
def test_targeted_message_invalid_cell(self):
target_cell = 'api-cell!child-cell2!grandchild-cell4'
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
tgt_message = messaging._TargetedMessage(self.msg_runner,
self.ctxt, method,
method_kwargs, direction,
target_cell,
need_response=True)
response = tgt_message.process()
self.assertTrue(response.failure)
self.assertRaises(exception.CellRoutingInconsistency,
response.value_or_raise)
def test_targeted_message_invalid_cell2(self):
target_cell = 'unknown-cell!child-cell2'
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
tgt_message = messaging._TargetedMessage(self.msg_runner,
self.ctxt, method,
method_kwargs, direction,
target_cell,
need_response=True)
response = tgt_message.process()
self.assertTrue(response.failure)
self.assertRaises(exception.CellRoutingInconsistency,
response.value_or_raise)
def test_broadcast_routing(self):
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
cells = set()
def our_fake_method(message, **kwargs):
cells.add(message.routing_path)
fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
bcast_message = messaging._BroadcastMessage(self.msg_runner,
self.ctxt, method,
method_kwargs,
direction,
run_locally=True)
bcast_message.process()
# fakes creates 8 cells (including ourself).
self.assertEqual(len(cells), 8)
def test_broadcast_routing_up(self):
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'up'
msg_runner = fakes.get_message_runner('grandchild-cell3')
cells = set()
def our_fake_method(message, **kwargs):
cells.add(message.routing_path)
fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
bcast_message = messaging._BroadcastMessage(msg_runner, self.ctxt,
method, method_kwargs,
direction,
run_locally=True)
bcast_message.process()
# Paths are reversed, since going 'up'
expected = set(['grandchild-cell3', 'grandchild-cell3!child-cell3',
'grandchild-cell3!child-cell3!api-cell'])
self.assertEqual(expected, cells)
def test_broadcast_routing_without_ourselves(self):
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
cells = set()
def our_fake_method(message, **kwargs):
cells.add(message.routing_path)
fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
bcast_message = messaging._BroadcastMessage(self.msg_runner,
self.ctxt, method,
method_kwargs,
direction,
run_locally=False)
bcast_message.process()
# fakes creates 8 cells (including ourself). So we should see
# only 7 here.
self.assertEqual(len(cells), 7)
def test_broadcast_routing_with_response(self):
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
def our_fake_method(message, **kwargs):
return 'response-%s' % message.routing_path
fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
bcast_message = messaging._BroadcastMessage(self.msg_runner,
self.ctxt, method,
method_kwargs,
direction,
run_locally=True,
need_response=True)
responses = bcast_message.process()
self.assertEqual(len(responses), 8)
for response in responses:
self.assertFalse(response.failure)
self.assertEqual('response-%s' % response.cell_name,
response.value_or_raise())
def test_broadcast_routing_with_response_max_hops(self):
self.flags(max_hop_count=2, group='cells')
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
def our_fake_method(message, **kwargs):
return 'response-%s' % message.routing_path
fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
bcast_message = messaging._BroadcastMessage(self.msg_runner,
self.ctxt, method,
method_kwargs,
direction,
run_locally=True,
need_response=True)
responses = bcast_message.process()
# Should only get responses from our immediate children (and
# ourselves)
self.assertEqual(len(responses), 5)
for response in responses:
self.assertFalse(response.failure)
self.assertEqual('response-%s' % response.cell_name,
response.value_or_raise())
def test_broadcast_routing_with_all_erroring(self):
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
def our_fake_method(message, **kwargs):
raise test.TestingException('fake failure')
fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
bcast_message = messaging._BroadcastMessage(self.msg_runner,
self.ctxt, method,
method_kwargs,
direction,
run_locally=True,
need_response=True)
responses = bcast_message.process()
self.assertEqual(len(responses), 8)
for response in responses:
self.assertTrue(response.failure)
self.assertRaises(test.TestingException, response.value_or_raise)
def test_broadcast_routing_with_two_erroring(self):
method = 'our_fake_method'
method_kwargs = dict(arg1=1, arg2=2)
direction = 'down'
def our_fake_method_failing(message, **kwargs):
raise test.TestingException('fake failure')
def our_fake_method(message, **kwargs):
return 'response-%s' % message.routing_path
fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
fakes.stub_bcast_method(self, 'child-cell2', 'our_fake_method',
our_fake_method_failing)
fakes.stub_bcast_method(self, 'grandchild-cell3', 'our_fake_method',
our_fake_method_failing)
bcast_message = messaging._BroadcastMessage(self.msg_runner,
self.ctxt, method,
method_kwargs,
direction,
run_locally=True,
need_response=True)
responses = bcast_message.process()
self.assertEqual(len(responses), 8)
failure_responses = [resp for resp in responses if resp.failure]
success_responses = [resp for resp in responses if not resp.failure]
self.assertEqual(len(failure_responses), 2)
self.assertEqual(len(success_responses), 6)
for response in success_responses:
self.assertFalse(response.failure)
self.assertEqual('response-%s' % response.cell_name,
response.value_or_raise())
for response in failure_responses:
self.assertIn(response.cell_name, ['api-cell!child-cell2',
'api-cell!child-cell3!grandchild-cell3'])
self.assertTrue(response.failure)
self.assertRaises(test.TestingException, response.value_or_raise)
class CellsTargetedMethodsTestCase(test.TestCase):
"""Test case for _TargetedMessageMethods class. Most of these
tests actually test the full path from the MessageRunner through
to the functionality of the message method. Hits 2 birds with 1
stone, even though it's a little more than a unit test.
"""
def setUp(self):
super(CellsTargetedMethodsTestCase, self).setUp()
fakes.init(self)
self.ctxt = context.RequestContext('fake', 'fake')
self._setup_attrs('api-cell', 'api-cell!child-cell2')
def _setup_attrs(self, source_cell, target_cell):
self.tgt_cell_name = target_cell
self.src_msg_runner = fakes.get_message_runner(source_cell)
self.src_state_manager = self.src_msg_runner.state_manager
tgt_shortname = target_cell.split('!')[-1]
self.tgt_cell_mgr = fakes.get_cells_manager(tgt_shortname)
self.tgt_msg_runner = self.tgt_cell_mgr.msg_runner
self.tgt_scheduler = self.tgt_msg_runner.scheduler
self.tgt_state_manager = self.tgt_msg_runner.state_manager
methods_cls = self.tgt_msg_runner.methods_by_type['targeted']
self.tgt_methods_cls = methods_cls
self.tgt_compute_api = methods_cls.compute_api
self.tgt_db_inst = methods_cls.db
def test_schedule_run_instance(self):
host_sched_kwargs = {'filter_properties': {},
'key1': 'value1',
'key2': 'value2'}
self.mox.StubOutWithMock(self.tgt_scheduler, 'run_instance')
self.tgt_scheduler.run_instance(self.ctxt, host_sched_kwargs)
self.mox.ReplayAll()
self.src_msg_runner.schedule_run_instance(self.ctxt,
self.tgt_cell_name,
host_sched_kwargs)
def test_call_compute_api_method(self):
instance_uuid = 'fake_instance_uuid'
method_info = {'method': 'reboot',
'method_args': (instance_uuid, 2, 3),
'method_kwargs': {'arg1': 'val1', 'arg2': 'val2'}}
self.mox.StubOutWithMock(self.tgt_compute_api, 'reboot')
self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_get_by_uuid')
self.tgt_db_inst.instance_get_by_uuid(self.ctxt,
instance_uuid).AndReturn(
'fake_instance')
self.tgt_compute_api.reboot(self.ctxt, 'fake_instance', 2, 3,
arg1='val1', arg2='val2').AndReturn('fake_result')
self.mox.ReplayAll()
response = self.src_msg_runner.run_compute_api_method(
self.ctxt,
self.tgt_cell_name,
method_info,
True)
result = response.value_or_raise()
self.assertEqual('fake_result', result)
def test_call_compute_api_method_unknown_instance(self):
# Unknown instance should send a broadcast up that instance
# is gone.
instance_uuid = 'fake_instance_uuid'
instance = {'uuid': instance_uuid}
method_info = {'method': 'reboot',
'method_args': (instance_uuid, 2, 3),
'method_kwargs': {'arg1': 'val1', 'arg2': 'val2'}}
self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_get_by_uuid')
self.mox.StubOutWithMock(self.tgt_msg_runner,
'instance_destroy_at_top')
self.tgt_db_inst.instance_get_by_uuid(self.ctxt,
'fake_instance_uuid').AndRaise(
exception.InstanceNotFound(instance_id=instance_uuid))
self.tgt_msg_runner.instance_destroy_at_top(self.ctxt, instance)
self.mox.ReplayAll()
response = self.src_msg_runner.run_compute_api_method(
self.ctxt,
self.tgt_cell_name,
method_info,
True)
self.assertRaises(exception.InstanceNotFound,
response.value_or_raise)
def test_update_capabilities(self):
# Route up to API
self._setup_attrs('child-cell2', 'child-cell2!api-cell')
capabs = {'cap1': set(['val1', 'val2']),
'cap2': set(['val3'])}
# The list(set([])) seems silly, but we can't assume the order
# of the list... This behavior should match the code we're
# testing... which is check that a set was converted to a list.
expected_capabs = {'cap1': list(set(['val1', 'val2'])),
'cap2': ['val3']}
self.mox.StubOutWithMock(self.src_state_manager,
'get_our_capabilities')
self.mox.StubOutWithMock(self.tgt_state_manager,
'update_cell_capabilities')
self.mox.StubOutWithMock(self.tgt_msg_runner,
'tell_parents_our_capabilities')
self.src_state_manager.get_our_capabilities().AndReturn(capabs)
self.tgt_state_manager.update_cell_capabilities('child-cell2',
expected_capabs)
self.tgt_msg_runner.tell_parents_our_capabilities(self.ctxt)
self.mox.ReplayAll()
self.src_msg_runner.tell_parents_our_capabilities(self.ctxt)
def test_update_capacities(self):
self._setup_attrs('child-cell2', 'child-cell2!api-cell')
capacs = 'fake_capacs'
self.mox.StubOutWithMock(self.src_state_manager,
'get_our_capacities')
self.mox.StubOutWithMock(self.tgt_state_manager,
'update_cell_capacities')
self.mox.StubOutWithMock(self.tgt_msg_runner,
'tell_parents_our_capacities')
self.src_state_manager.get_our_capacities().AndReturn(capacs)
self.tgt_state_manager.update_cell_capacities('child-cell2',
capacs)
self.tgt_msg_runner.tell_parents_our_capacities(self.ctxt)
self.mox.ReplayAll()
self.src_msg_runner.tell_parents_our_capacities(self.ctxt)
def test_announce_capabilities(self):
self._setup_attrs('api-cell', 'api-cell!child-cell1')
# To make this easier to test, make us only have 1 child cell.
cell_state = self.src_state_manager.child_cells['child-cell1']
self.src_state_manager.child_cells = {'child-cell1': cell_state}
self.mox.StubOutWithMock(self.tgt_msg_runner,
'tell_parents_our_capabilities')
self.tgt_msg_runner.tell_parents_our_capabilities(self.ctxt)
self.mox.ReplayAll()
self.src_msg_runner.ask_children_for_capabilities(self.ctxt)
def test_announce_capacities(self):
self._setup_attrs('api-cell', 'api-cell!child-cell1')
# To make this easier to test, make us only have 1 child cell.
cell_state = self.src_state_manager.child_cells['child-cell1']
self.src_state_manager.child_cells = {'child-cell1': cell_state}
self.mox.StubOutWithMock(self.tgt_msg_runner,
'tell_parents_our_capacities')
self.tgt_msg_runner.tell_parents_our_capacities(self.ctxt)
self.mox.ReplayAll()
self.src_msg_runner.ask_children_for_capacities(self.ctxt)
class CellsBroadcastMethodsTestCase(test.TestCase):
"""Test case for _BroadcastMessageMethods class. Most of these
tests actually test the full path from the MessageRunner through
to the functionality of the message method. Hits 2 birds with 1
stone, even though it's a little more than a unit test.
"""
def setUp(self):
super(CellsBroadcastMethodsTestCase, self).setUp()
fakes.init(self)
self.ctxt = context.RequestContext('fake', 'fake')
self._setup_attrs()
def _setup_attrs(self, up=True):
mid_cell = 'child-cell2'
if up:
src_cell = 'grandchild-cell1'
tgt_cell = 'api-cell'
else:
src_cell = 'api-cell'
tgt_cell = 'grandchild-cell1'
self.src_msg_runner = fakes.get_message_runner(src_cell)
methods_cls = self.src_msg_runner.methods_by_type['broadcast']
self.src_methods_cls = methods_cls
self.src_db_inst = methods_cls.db
self.src_compute_api = methods_cls.compute_api
self.mid_msg_runner = fakes.get_message_runner(mid_cell)
methods_cls = self.mid_msg_runner.methods_by_type['broadcast']
self.mid_methods_cls = methods_cls
self.mid_db_inst = methods_cls.db
self.mid_compute_api = methods_cls.compute_api
self.tgt_msg_runner = fakes.get_message_runner(tgt_cell)
methods_cls = self.tgt_msg_runner.methods_by_type['broadcast']
self.tgt_methods_cls = methods_cls
self.tgt_db_inst = methods_cls.db
self.tgt_compute_api = methods_cls.compute_api
def test_at_the_top(self):
self.assertTrue(self.tgt_methods_cls._at_the_top())
self.assertFalse(self.mid_methods_cls._at_the_top())
self.assertFalse(self.src_methods_cls._at_the_top())
def test_instance_update_at_top(self):
fake_info_cache = {'id': 1,
'instance': 'fake_instance',
'other': 'moo'}
fake_sys_metadata = [{'id': 1,
'key': 'key1',
'value': 'value1'},
{'id': 2,
'key': 'key2',
'value': 'value2'}]
fake_instance = {'id': 2,
'uuid': 'fake_uuid',
'security_groups': 'fake',
'instance_type': 'fake',
'volumes': 'fake',
'cell_name': 'fake',
'name': 'fake',
'metadata': 'fake',
'info_cache': fake_info_cache,
'system_metadata': fake_sys_metadata,
'other': 'meow'}
expected_sys_metadata = {'key1': 'value1',
'key2': 'value2'}
expected_info_cache = {'other': 'moo'}
expected_instance = {'system_metadata': expected_sys_metadata,
'other': 'meow',
'uuid': 'fake_uuid'}
# To show these should not be called in src/mid-level cell
self.mox.StubOutWithMock(self.src_db_inst, 'instance_update')
self.mox.StubOutWithMock(self.src_db_inst,
'instance_info_cache_update')
self.mox.StubOutWithMock(self.mid_db_inst, 'instance_update')
self.mox.StubOutWithMock(self.mid_db_inst,
'instance_info_cache_update')
self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_update')
self.mox.StubOutWithMock(self.tgt_db_inst,
'instance_info_cache_update')
self.tgt_db_inst.instance_update(self.ctxt, 'fake_uuid',
expected_instance,
update_cells=False)
self.tgt_db_inst.instance_info_cache_update(self.ctxt, 'fake_uuid',
expected_info_cache,
update_cells=False)
self.mox.ReplayAll()
self.src_msg_runner.instance_update_at_top(self.ctxt, fake_instance)
def test_instance_destroy_at_top(self):
fake_instance = {'uuid': 'fake_uuid'}
# To show these should not be called in src/mid-level cell
self.mox.StubOutWithMock(self.src_db_inst, 'instance_destroy')
self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_destroy')
self.tgt_db_inst.instance_destroy(self.ctxt, 'fake_uuid',
update_cells=False)
self.mox.ReplayAll()
self.src_msg_runner.instance_destroy_at_top(self.ctxt, fake_instance)
def test_instance_hard_delete_everywhere(self):
# Reset this, as this is a broadcast down.
self._setup_attrs(up=False)
instance = {'uuid': 'meow'}
# Should not be called in src (API cell)
self.mox.StubOutWithMock(self.src_compute_api, 'delete')
self.mox.StubOutWithMock(self.mid_compute_api, 'delete')
self.mox.StubOutWithMock(self.tgt_compute_api, 'delete')
self.mid_compute_api.delete(self.ctxt, instance)
self.tgt_compute_api.delete(self.ctxt, instance)
self.mox.ReplayAll()
self.src_msg_runner.instance_delete_everywhere(self.ctxt,
instance, 'hard')
def test_instance_soft_delete_everywhere(self):
# Reset this, as this is a broadcast down.
self._setup_attrs(up=False)
instance = {'uuid': 'meow'}
# Should not be called in src (API cell)
self.mox.StubOutWithMock(self.src_compute_api, 'soft_delete')
self.mox.StubOutWithMock(self.mid_compute_api, 'soft_delete')
self.mox.StubOutWithMock(self.tgt_compute_api, 'soft_delete')
self.mid_compute_api.soft_delete(self.ctxt, instance)
self.tgt_compute_api.soft_delete(self.ctxt, instance)
self.mox.ReplayAll()
self.src_msg_runner.instance_delete_everywhere(self.ctxt,
instance, 'soft')
def test_instance_fault_create_at_top(self):
fake_instance_fault = {'id': 1,
'other stuff': 2,
'more stuff': 3}
expected_instance_fault = {'other stuff': 2,
'more stuff': 3}
# Shouldn't be called for these 2 cells
self.mox.StubOutWithMock(self.src_db_inst, 'instance_fault_create')
self.mox.StubOutWithMock(self.mid_db_inst, 'instance_fault_create')
self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_fault_create')
self.tgt_db_inst.instance_fault_create(self.ctxt,
expected_instance_fault)
self.mox.ReplayAll()
self.src_msg_runner.instance_fault_create_at_top(self.ctxt,
fake_instance_fault)
def test_bw_usage_update_at_top(self):
fake_bw_update_info = {'uuid': 'fake_uuid',
'mac': 'fake_mac',
'start_period': 'fake_start_period',
'bw_in': 'fake_bw_in',
'bw_out': 'fake_bw_out',
'last_ctr_in': 'fake_last_ctr_in',
'last_ctr_out': 'fake_last_ctr_out',
'last_refreshed': 'fake_last_refreshed'}
# Shouldn't be called for these 2 cells
self.mox.StubOutWithMock(self.src_db_inst, 'bw_usage_update')
self.mox.StubOutWithMock(self.mid_db_inst, 'bw_usage_update')
self.mox.StubOutWithMock(self.tgt_db_inst, 'bw_usage_update')
self.tgt_db_inst.bw_usage_update(self.ctxt, **fake_bw_update_info)
self.mox.ReplayAll()
self.src_msg_runner.bw_usage_update_at_top(self.ctxt,
fake_bw_update_info)

View File

@ -0,0 +1,218 @@
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Cells RPC Communication Driver
"""
from nova.cells import messaging
from nova.cells import rpc_driver
from nova import context
from nova.openstack.common import cfg
from nova.openstack.common import rpc
from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
from nova import test
from nova.tests.cells import fakes
CONF = cfg.CONF
CONF.import_opt('rpc_driver_queue_base', 'nova.cells.rpc_driver',
group='cells')
class CellsRPCDriverTestCase(test.TestCase):
"""Test case for Cells communication via RPC."""
def setUp(self):
super(CellsRPCDriverTestCase, self).setUp()
fakes.init(self)
self.ctxt = context.RequestContext('fake', 'fake')
self.driver = rpc_driver.CellsRPCDriver()
def test_start_consumers(self):
self.flags(rpc_driver_queue_base='cells.intercell42', group='cells')
rpc_consumers = []
rpc_conns = []
fake_msg_runner = fakes.get_message_runner('api-cell')
call_info = {}
class FakeInterCellRPCDispatcher(object):
def __init__(_self, msg_runner):
self.assertEqual(fake_msg_runner, msg_runner)
call_info['intercell_dispatcher'] = _self
class FakeRPCDispatcher(object):
def __init__(_self, proxy_objs):
self.assertEqual([call_info['intercell_dispatcher']],
proxy_objs)
call_info['rpc_dispatcher'] = _self
class FakeRPCConn(object):
def create_consumer(_self, topic, proxy_obj, **kwargs):
self.assertEqual(call_info['rpc_dispatcher'], proxy_obj)
rpc_consumers.append((topic, kwargs))
def consume_in_thread(_self):
pass
def _fake_create_connection(new):
self.assertTrue(new)
fake_conn = FakeRPCConn()
rpc_conns.append(fake_conn)
return fake_conn
self.stubs.Set(rpc, 'create_connection', _fake_create_connection)
self.stubs.Set(rpc_driver, 'InterCellRPCDispatcher',
FakeInterCellRPCDispatcher)
self.stubs.Set(rpc_dispatcher, 'RpcDispatcher', FakeRPCDispatcher)
self.driver.start_consumers(fake_msg_runner)
for message_type in ['broadcast', 'response', 'targeted']:
topic = 'cells.intercell42.' + message_type
self.assertIn((topic, {'fanout': True}), rpc_consumers)
self.assertIn((topic, {'fanout': False}), rpc_consumers)
self.assertEqual(rpc_conns, self.driver.rpc_connections)
def test_stop_consumers(self):
call_info = {'closed': []}
class FakeRPCConn(object):
def close(self):
call_info['closed'].append(self)
fake_conns = [FakeRPCConn() for x in xrange(5)]
self.driver.rpc_connections = fake_conns
self.driver.stop_consumers()
self.assertEqual(fake_conns, call_info['closed'])
def test_send_message_to_cell_cast(self):
msg_runner = fakes.get_message_runner('api-cell')
cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
message = messaging._TargetedMessage(msg_runner,
self.ctxt, 'fake', 'fake', 'down', cell_state, fanout=False)
call_info = {}
def _fake_make_msg(method, **kwargs):
call_info['rpc_method'] = method
call_info['rpc_kwargs'] = kwargs
return 'fake-message'
def _fake_cast_to_server(*args, **kwargs):
call_info['cast_args'] = args
call_info['cast_kwargs'] = kwargs
self.stubs.Set(rpc, 'cast_to_server', _fake_cast_to_server)
self.stubs.Set(self.driver.intercell_rpcapi, 'make_msg',
_fake_make_msg)
self.stubs.Set(self.driver.intercell_rpcapi, 'cast_to_server',
_fake_cast_to_server)
self.driver.send_message_to_cell(cell_state, message)
expected_server_params = {'hostname': 'rpc_host2',
'password': 'password2',
'port': 'rpc_port2',
'username': 'username2',
'virtual_host': 'rpc_vhost2'}
expected_cast_args = (self.ctxt, expected_server_params,
'fake-message')
expected_cast_kwargs = {'topic': 'cells.intercell.targeted'}
expected_rpc_kwargs = {'message': message.to_json()}
self.assertEqual(expected_cast_args, call_info['cast_args'])
self.assertEqual(expected_cast_kwargs, call_info['cast_kwargs'])
self.assertEqual('process_message', call_info['rpc_method'])
self.assertEqual(expected_rpc_kwargs, call_info['rpc_kwargs'])
def test_send_message_to_cell_fanout_cast(self):
msg_runner = fakes.get_message_runner('api-cell')
cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
message = messaging._TargetedMessage(msg_runner,
self.ctxt, 'fake', 'fake', 'down', cell_state, fanout=True)
call_info = {}
def _fake_make_msg(method, **kwargs):
call_info['rpc_method'] = method
call_info['rpc_kwargs'] = kwargs
return 'fake-message'
def _fake_fanout_cast_to_server(*args, **kwargs):
call_info['cast_args'] = args
call_info['cast_kwargs'] = kwargs
self.stubs.Set(rpc, 'fanout_cast_to_server',
_fake_fanout_cast_to_server)
self.stubs.Set(self.driver.intercell_rpcapi, 'make_msg',
_fake_make_msg)
self.stubs.Set(self.driver.intercell_rpcapi,
'fanout_cast_to_server', _fake_fanout_cast_to_server)
self.driver.send_message_to_cell(cell_state, message)
expected_server_params = {'hostname': 'rpc_host2',
'password': 'password2',
'port': 'rpc_port2',
'username': 'username2',
'virtual_host': 'rpc_vhost2'}
expected_cast_args = (self.ctxt, expected_server_params,
'fake-message')
expected_cast_kwargs = {'topic': 'cells.intercell.targeted'}
expected_rpc_kwargs = {'message': message.to_json()}
self.assertEqual(expected_cast_args, call_info['cast_args'])
self.assertEqual(expected_cast_kwargs, call_info['cast_kwargs'])
self.assertEqual('process_message', call_info['rpc_method'])
self.assertEqual(expected_rpc_kwargs, call_info['rpc_kwargs'])
def test_rpc_topic_uses_message_type(self):
self.flags(rpc_driver_queue_base='cells.intercell42', group='cells')
msg_runner = fakes.get_message_runner('api-cell')
cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
message = messaging._BroadcastMessage(msg_runner,
self.ctxt, 'fake', 'fake', 'down', fanout=True)
message.message_type = 'fake-message-type'
call_info = {}
def _fake_fanout_cast_to_server(*args, **kwargs):
call_info['topic'] = kwargs.get('topic')
self.stubs.Set(self.driver.intercell_rpcapi,
'fanout_cast_to_server', _fake_fanout_cast_to_server)
self.driver.send_message_to_cell(cell_state, message)
self.assertEqual('cells.intercell42.fake-message-type',
call_info['topic'])
def test_process_message(self):
msg_runner = fakes.get_message_runner('api-cell')
dispatcher = rpc_driver.InterCellRPCDispatcher(msg_runner)
message = messaging._BroadcastMessage(msg_runner,
self.ctxt, 'fake', 'fake', 'down', fanout=True)
call_info = {}
def _fake_message_from_json(json_message):
call_info['json_message'] = json_message
self.assertEqual(message.to_json(), json_message)
return message
def _fake_process():
call_info['process_called'] = True
self.stubs.Set(msg_runner, 'message_from_json',
_fake_message_from_json)
self.stubs.Set(message, 'process', _fake_process)
dispatcher.process_message(self.ctxt, message.to_json())
self.assertEqual(message.to_json(), call_info['json_message'])
self.assertTrue(call_info['process_called'])

View File

@ -0,0 +1,206 @@
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Cells RPCAPI
"""
from nova.cells import rpcapi as cells_rpcapi
from nova.openstack.common import cfg
from nova.openstack.common import rpc
from nova import test
CONF = cfg.CONF
CONF.import_opt('topic', 'nova.cells.opts', group='cells')
class CellsAPITestCase(test.TestCase):
"""Test case for cells.api interfaces."""
def setUp(self):
super(CellsAPITestCase, self).setUp()
self.fake_topic = 'fake_topic'
self.fake_context = 'fake_context'
self.flags(topic=self.fake_topic, enable=True, group='cells')
self.cells_rpcapi = cells_rpcapi.CellsAPI()
def _stub_rpc_method(self, rpc_method, result):
call_info = {}
def fake_rpc_method(ctxt, topic, msg, *args, **kwargs):
call_info['context'] = ctxt
call_info['topic'] = topic
call_info['msg'] = msg
return result
self.stubs.Set(rpc, rpc_method, fake_rpc_method)
return call_info
def _check_result(self, call_info, method, args, version=None):
if version is None:
version = self.cells_rpcapi.BASE_RPC_API_VERSION
self.assertEqual(self.fake_context, call_info['context'])
self.assertEqual(self.fake_topic, call_info['topic'])
self.assertEqual(method, call_info['msg']['method'])
self.assertEqual(version, call_info['msg']['version'])
self.assertEqual(args, call_info['msg']['args'])
def test_cast_compute_api_method(self):
fake_cell_name = 'fake_cell_name'
fake_method = 'fake_method'
fake_method_args = (1, 2)
fake_method_kwargs = {'kwarg1': 10, 'kwarg2': 20}
expected_method_info = {'method': fake_method,
'method_args': fake_method_args,
'method_kwargs': fake_method_kwargs}
expected_args = {'method_info': expected_method_info,
'cell_name': fake_cell_name,
'call': False}
call_info = self._stub_rpc_method('cast', None)
self.cells_rpcapi.cast_compute_api_method(self.fake_context,
fake_cell_name, fake_method,
*fake_method_args, **fake_method_kwargs)
self._check_result(call_info, 'run_compute_api_method',
expected_args)
def test_call_compute_api_method(self):
fake_cell_name = 'fake_cell_name'
fake_method = 'fake_method'
fake_method_args = (1, 2)
fake_method_kwargs = {'kwarg1': 10, 'kwarg2': 20}
fake_response = 'fake_response'
expected_method_info = {'method': fake_method,
'method_args': fake_method_args,
'method_kwargs': fake_method_kwargs}
expected_args = {'method_info': expected_method_info,
'cell_name': fake_cell_name,
'call': True}
call_info = self._stub_rpc_method('call', fake_response)
result = self.cells_rpcapi.call_compute_api_method(self.fake_context,
fake_cell_name, fake_method,
*fake_method_args, **fake_method_kwargs)
self._check_result(call_info, 'run_compute_api_method',
expected_args)
self.assertEqual(fake_response, result)
def test_schedule_run_instance(self):
call_info = self._stub_rpc_method('cast', None)
self.cells_rpcapi.schedule_run_instance(
self.fake_context, arg1=1, arg2=2, arg3=3)
expected_args = {'host_sched_kwargs': {'arg1': 1,
'arg2': 2,
'arg3': 3}}
self._check_result(call_info, 'schedule_run_instance',
expected_args)
def test_instance_update_at_top(self):
fake_info_cache = {'id': 1,
'instance': 'fake_instance',
'other': 'moo'}
fake_sys_metadata = [{'id': 1,
'key': 'key1',
'value': 'value1'},
{'id': 2,
'key': 'key2',
'value': 'value2'}]
fake_instance = {'id': 2,
'security_groups': 'fake',
'instance_type': 'fake',
'volumes': 'fake',
'cell_name': 'fake',
'name': 'fake',
'metadata': 'fake',
'info_cache': fake_info_cache,
'system_metadata': fake_sys_metadata,
'other': 'meow'}
call_info = self._stub_rpc_method('cast', None)
self.cells_rpcapi.instance_update_at_top(
self.fake_context, fake_instance)
expected_args = {'instance': fake_instance}
self._check_result(call_info, 'instance_update_at_top',
expected_args)
def test_instance_destroy_at_top(self):
fake_instance = {'uuid': 'fake-uuid'}
call_info = self._stub_rpc_method('cast', None)
self.cells_rpcapi.instance_destroy_at_top(
self.fake_context, fake_instance)
expected_args = {'instance': fake_instance}
self._check_result(call_info, 'instance_destroy_at_top',
expected_args)
def test_instance_delete_everywhere(self):
fake_instance = {'uuid': 'fake-uuid'}
call_info = self._stub_rpc_method('cast', None)
self.cells_rpcapi.instance_delete_everywhere(
self.fake_context, fake_instance,
'fake-type')
expected_args = {'instance': fake_instance,
'delete_type': 'fake-type'}
self._check_result(call_info, 'instance_delete_everywhere',
expected_args)
def test_instance_fault_create_at_top(self):
fake_instance_fault = {'id': 2,
'other': 'meow'}
call_info = self._stub_rpc_method('cast', None)
self.cells_rpcapi.instance_fault_create_at_top(
self.fake_context, fake_instance_fault)
expected_args = {'instance_fault': fake_instance_fault}
self._check_result(call_info, 'instance_fault_create_at_top',
expected_args)
def test_bw_usage_update_at_top(self):
update_args = ('fake_uuid', 'fake_mac', 'fake_start_period',
'fake_bw_in', 'fake_bw_out', 'fake_ctr_in',
'fake_ctr_out')
update_kwargs = {'last_refreshed': 'fake_refreshed'}
call_info = self._stub_rpc_method('cast', None)
self.cells_rpcapi.bw_usage_update_at_top(
self.fake_context, *update_args, **update_kwargs)
bw_update_info = {'uuid': 'fake_uuid',
'mac': 'fake_mac',
'start_period': 'fake_start_period',
'bw_in': 'fake_bw_in',
'bw_out': 'fake_bw_out',
'last_ctr_in': 'fake_ctr_in',
'last_ctr_out': 'fake_ctr_out',
'last_refreshed': 'fake_refreshed'}
expected_args = {'bw_update_info': bw_update_info}
self._check_result(call_info, 'bw_usage_update_at_top',
expected_args)

View File

@ -0,0 +1,206 @@
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For CellsScheduler
"""
import time
from nova.compute import vm_states
from nova import context
from nova import db
from nova import exception
from nova.openstack.common import cfg
from nova.openstack.common import uuidutils
from nova import test
from nova.tests.cells import fakes
CONF = cfg.CONF
CONF.import_opt('scheduler_retries', 'nova.cells.scheduler', group='cells')
class CellsSchedulerTestCase(test.TestCase):
"""Test case for CellsScheduler class"""
def setUp(self):
super(CellsSchedulerTestCase, self).setUp()
fakes.init(self)
self.msg_runner = fakes.get_message_runner('api-cell')
self.scheduler = self.msg_runner.scheduler
self.state_manager = self.msg_runner.state_manager
self.my_cell_state = self.state_manager.get_my_state()
self.ctxt = context.RequestContext('fake', 'fake')
instance_uuids = []
for x in xrange(3):
instance_uuids.append(uuidutils.generate_uuid())
self.instance_uuids = instance_uuids
self.request_spec = {'instance_uuids': instance_uuids,
'other': 'stuff'}
def test_create_instances_here(self):
# Just grab the first instance type
inst_type = db.instance_type_get(self.ctxt, 1)
image = {'properties': {}}
instance_props = {'hostname': 'meow',
'display_name': 'moo',
'image_ref': 'fake_image_ref',
'user_id': self.ctxt.user_id,
'project_id': self.ctxt.project_id}
request_spec = {'instance_type': inst_type,
'image': image,
'security_group': ['default'],
'block_device_mapping': [],
'instance_properties': instance_props,
'instance_uuids': self.instance_uuids}
call_info = {'uuids': []}
def _fake_instance_update_at_top(_ctxt, instance):
call_info['uuids'].append(instance['uuid'])
self.stubs.Set(self.msg_runner, 'instance_update_at_top',
_fake_instance_update_at_top)
self.scheduler._create_instances_here(self.ctxt, request_spec)
self.assertEqual(self.instance_uuids, call_info['uuids'])
for instance_uuid in self.instance_uuids:
instance = db.instance_get_by_uuid(self.ctxt, instance_uuid)
self.assertEqual('meow', instance['hostname'])
self.assertEqual('moo', instance['display_name'])
self.assertEqual('fake_image_ref', instance['image_ref'])
def test_run_instance_selects_child_cell(self):
# Make sure there's no capacity info so we're sure to
# select a child cell
our_cell_info = self.state_manager.get_my_state()
our_cell_info.capacities = {}
call_info = {'times': 0}
orig_fn = self.msg_runner.schedule_run_instance
def msg_runner_schedule_run_instance(ctxt, target_cell,
host_sched_kwargs):
# This gets called twice. Once for our running it
# in this cell.. and then it'll get called when the
# child cell is picked. So, first time.. just run it
# like normal.
if not call_info['times']:
call_info['times'] += 1
return orig_fn(ctxt, target_cell, host_sched_kwargs)
call_info['ctxt'] = ctxt
call_info['target_cell'] = target_cell
call_info['host_sched_kwargs'] = host_sched_kwargs
self.stubs.Set(self.msg_runner, 'schedule_run_instance',
msg_runner_schedule_run_instance)
host_sched_kwargs = {'request_spec': self.request_spec}
self.msg_runner.schedule_run_instance(self.ctxt,
self.my_cell_state, host_sched_kwargs)
self.assertEqual(self.ctxt, call_info['ctxt'])
self.assertEqual(host_sched_kwargs, call_info['host_sched_kwargs'])
child_cells = self.state_manager.get_child_cells()
self.assertIn(call_info['target_cell'], child_cells)
def test_run_instance_selects_current_cell(self):
# Make sure there's no child cells so that we will be
# selected
self.state_manager.child_cells = {}
call_info = {}
def fake_create_instances_here(ctxt, request_spec):
call_info['ctxt'] = ctxt
call_info['request_spec'] = request_spec
def fake_rpc_run_instance(ctxt, **host_sched_kwargs):
call_info['host_sched_kwargs'] = host_sched_kwargs
self.stubs.Set(self.scheduler, '_create_instances_here',
fake_create_instances_here)
self.stubs.Set(self.scheduler.scheduler_rpcapi,
'run_instance', fake_rpc_run_instance)
host_sched_kwargs = {'request_spec': self.request_spec,
'other': 'stuff'}
self.msg_runner.schedule_run_instance(self.ctxt,
self.my_cell_state, host_sched_kwargs)
self.assertEqual(self.ctxt, call_info['ctxt'])
self.assertEqual(self.request_spec, call_info['request_spec'])
self.assertEqual(host_sched_kwargs, call_info['host_sched_kwargs'])
def test_run_instance_retries_when_no_cells_avail(self):
self.flags(scheduler_retries=7, group='cells')
host_sched_kwargs = {'request_spec': self.request_spec}
call_info = {'num_tries': 0, 'errored_uuids': []}
def fake_run_instance(message, host_sched_kwargs):
call_info['num_tries'] += 1
raise exception.NoCellsAvailable()
def fake_sleep(_secs):
return
def fake_instance_update(ctxt, instance_uuid, values):
self.assertEqual(vm_states.ERROR, values['vm_state'])
call_info['errored_uuids'].append(instance_uuid)
self.stubs.Set(self.scheduler, '_run_instance', fake_run_instance)
self.stubs.Set(time, 'sleep', fake_sleep)
self.stubs.Set(db, 'instance_update', fake_instance_update)
self.msg_runner.schedule_run_instance(self.ctxt,
self.my_cell_state, host_sched_kwargs)
self.assertEqual(8, call_info['num_tries'])
self.assertEqual(self.instance_uuids, call_info['errored_uuids'])
def test_run_instance_on_random_exception(self):
self.flags(scheduler_retries=7, group='cells')
host_sched_kwargs = {'request_spec': self.request_spec}
call_info = {'num_tries': 0,
'errored_uuids1': [],
'errored_uuids2': []}
def fake_run_instance(message, host_sched_kwargs):
call_info['num_tries'] += 1
raise test.TestingException()
def fake_instance_update(ctxt, instance_uuid, values):
self.assertEqual(vm_states.ERROR, values['vm_state'])
call_info['errored_uuids1'].append(instance_uuid)
def fake_instance_update_at_top(ctxt, instance):
self.assertEqual(vm_states.ERROR, instance['vm_state'])
call_info['errored_uuids2'].append(instance['uuid'])
self.stubs.Set(self.scheduler, '_run_instance', fake_run_instance)
self.stubs.Set(db, 'instance_update', fake_instance_update)
self.stubs.Set(self.msg_runner, 'instance_update_at_top',
fake_instance_update_at_top)
self.msg_runner.schedule_run_instance(self.ctxt,
self.my_cell_state, host_sched_kwargs)
# Shouldn't retry
self.assertEqual(1, call_info['num_tries'])
self.assertEqual(self.instance_uuids, call_info['errored_uuids1'])
self.assertEqual(self.instance_uuids, call_info['errored_uuids2'])

View File

@ -3696,8 +3696,12 @@ class ComputeAPITestCase(BaseTestCase):
{'vm_state': vm_states.SOFT_DELETED,
'task_state': None})
# Ensure quotas are committed
self.mox.StubOutWithMock(nova.quota.QUOTAS, 'commit')
nova.quota.QUOTAS.commit(mox.IgnoreArg(), mox.IgnoreArg())
if self.__class__.__name__ == 'CellsComputeAPITestCase':
# Called a 2nd time (for the child cell) when testing cells
nova.quota.QUOTAS.commit(mox.IgnoreArg(), mox.IgnoreArg())
self.mox.ReplayAll()
self.compute_api.restore(self.context, instance)

View File

@ -0,0 +1,99 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Compute w/ Cells
"""
from nova.compute import cells_api as compute_cells_api
from nova.openstack.common import log as logging
from nova.tests.compute import test_compute
LOG = logging.getLogger('nova.tests.test_compute_cells')
ORIG_COMPUTE_API = None
def stub_call_to_cells(context, instance, method, *args, **kwargs):
fn = getattr(ORIG_COMPUTE_API, method)
return fn(context, instance, *args, **kwargs)
def stub_cast_to_cells(context, instance, method, *args, **kwargs):
fn = getattr(ORIG_COMPUTE_API, method)
fn(context, instance, *args, **kwargs)
def deploy_stubs(stubs, api):
stubs.Set(api, '_call_to_cells', stub_call_to_cells)
stubs.Set(api, '_cast_to_cells', stub_cast_to_cells)
class CellsComputeAPITestCase(test_compute.ComputeAPITestCase):
def setUp(self):
super(CellsComputeAPITestCase, self).setUp()
global ORIG_COMPUTE_API
ORIG_COMPUTE_API = self.compute_api
def _fake_cell_read_only(*args, **kwargs):
return False
def _fake_validate_cell(*args, **kwargs):
return
def _nop_update(context, instance, **kwargs):
return instance
self.compute_api = compute_cells_api.ComputeCellsAPI()
self.stubs.Set(self.compute_api, '_cell_read_only',
_fake_cell_read_only)
self.stubs.Set(self.compute_api, '_validate_cell',
_fake_validate_cell)
# NOTE(belliott) Don't update the instance state
# for the tests at the API layer. Let it happen after
# the stub cast to cells so that expected_task_states
# match.
self.stubs.Set(self.compute_api, 'update', _nop_update)
deploy_stubs(self.stubs, self.compute_api)
def tearDown(self):
global ORIG_COMPUTE_API
self.compute_api = ORIG_COMPUTE_API
super(CellsComputeAPITestCase, self).tearDown()
def test_instance_metadata(self):
self.skipTest("Test is incompatible with cells.")
def test_live_migrate(self):
self.skipTest("Test is incompatible with cells.")
def test_get_backdoor_port(self):
self.skipTest("Test is incompatible with cells.")
class CellsComputePolicyTestCase(test_compute.ComputePolicyTestCase):
def setUp(self):
super(CellsComputePolicyTestCase, self).setUp()
global ORIG_COMPUTE_API
ORIG_COMPUTE_API = self.compute_api
self.compute_api = compute_cells_api.ComputeCellsAPI()
deploy_stubs(self.stubs, self.compute_api)
def tearDown(self):
global ORIG_COMPUTE_API
self.compute_api = ORIG_COMPUTE_API
super(CellsComputePolicyTestCase, self).tearDown()

View File

@ -50,6 +50,7 @@ setuptools.setup(name='nova',
'bin/nova-api-metadata',
'bin/nova-api-os-compute',
'bin/nova-rpc-zmq-receiver',
'bin/nova-cells',
'bin/nova-cert',
'bin/nova-clear-rabbit-queues',
'bin/nova-compute',