Cells: Add cells support to hypervisors extension

This removes the DB calls from the extension and moves them into
HostAPI().  The cells version of HostAPI() proxies them to the
appropriate cell or cells.

Change-Id: I17ff10e62d2f10b85a557edcc1509ca3b0d15a29
This commit is contained in:
Chris Behrens
2013-01-16 18:49:56 +00:00
parent be992f7b9a
commit a019a9007d
10 changed files with 337 additions and 20 deletions

View File

@@ -20,8 +20,7 @@ import webob.exc
from nova.api.openstack import extensions
from nova.api.openstack import wsgi
from nova.api.openstack import xmlutil
from nova.compute import api as compute_api
from nova import db
from nova import compute
from nova import exception
from nova.openstack.common import log as logging
@@ -128,7 +127,7 @@ class HypervisorsController(object):
"""The Hypervisors API controller for the OpenStack API."""
def __init__(self):
self.api = compute_api.HostAPI()
self.host_api = compute.HostAPI()
super(HypervisorsController, self).__init__()
def _view_hypervisor(self, hypervisor, detail, servers=None, **kwargs):
@@ -164,22 +163,24 @@ class HypervisorsController(object):
def index(self, req):
context = req.environ['nova.context']
authorize(context)
compute_nodes = self.host_api.compute_node_get_all(context)
return dict(hypervisors=[self._view_hypervisor(hyp, False)
for hyp in db.compute_node_get_all(context)])
for hyp in compute_nodes])
@wsgi.serializers(xml=HypervisorDetailTemplate)
def detail(self, req):
context = req.environ['nova.context']
authorize(context)
compute_nodes = self.host_api.compute_node_get_all(context)
return dict(hypervisors=[self._view_hypervisor(hyp, True)
for hyp in db.compute_node_get_all(context)])
for hyp in compute_nodes])
@wsgi.serializers(xml=HypervisorTemplate)
def show(self, req, id):
context = req.environ['nova.context']
authorize(context)
try:
hyp = db.compute_node_get(context, int(id))
hyp = self.host_api.compute_node_get(context, id)
except (ValueError, exception.ComputeHostNotFound):
msg = _("Hypervisor with ID '%s' could not be found.") % id
raise webob.exc.HTTPNotFound(explanation=msg)
@@ -190,7 +191,7 @@ class HypervisorsController(object):
context = req.environ['nova.context']
authorize(context)
try:
hyp = db.compute_node_get(context, int(id))
hyp = self.host_api.compute_node_get(context, id)
except (ValueError, exception.ComputeHostNotFound):
msg = _("Hypervisor with ID '%s' could not be found.") % id
raise webob.exc.HTTPNotFound(explanation=msg)
@@ -198,7 +199,7 @@ class HypervisorsController(object):
# Get the uptime
try:
host = hyp['service']['host']
uptime = self.api.get_host_uptime(context, host)
uptime = self.host_api.get_host_uptime(context, host)
except NotImplementedError:
msg = _("Virt driver does not implement uptime function.")
raise webob.exc.HTTPNotImplemented(explanation=msg)
@@ -210,7 +211,8 @@ class HypervisorsController(object):
def search(self, req, id):
context = req.environ['nova.context']
authorize(context)
hypervisors = db.compute_node_search_by_hypervisor(context, id)
hypervisors = self.host_api.compute_node_search_by_hypervisor(
context, id)
if hypervisors:
return dict(hypervisors=[self._view_hypervisor(hyp, False)
for hyp in hypervisors])
@@ -222,21 +224,24 @@ class HypervisorsController(object):
def servers(self, req, id):
context = req.environ['nova.context']
authorize(context)
hypervisors = db.compute_node_search_by_hypervisor(context, id)
if hypervisors:
return dict(hypervisors=[self._view_hypervisor(hyp, False,
db.instance_get_all_by_host(context,
hyp['service']['host']))
for hyp in hypervisors])
else:
compute_nodes = self.host_api.compute_node_search_by_hypervisor(
context, id)
if not compute_nodes:
msg = _("No hypervisor matching '%s' could be found.") % id
raise webob.exc.HTTPNotFound(explanation=msg)
hypervisors = []
for compute_node in compute_nodes:
instances = self.host_api.instance_get_all_by_host(context,
compute_node['service']['host'])
hyp = self._view_hypervisor(compute_node, False, instances)
hypervisors.append(hyp)
return dict(hypervisors=hypervisors)
@wsgi.serializers(xml=HypervisorStatisticsTemplate)
def statistics(self, req):
context = req.environ['nova.context']
authorize(context)
stats = db.compute_node_statistics(context)
stats = self.host_api.compute_node_statistics(context)
return dict(hypervisor_statistics=stats)

View File

@@ -65,7 +65,7 @@ class CellsManager(manager.Manager):
Scheduling requests get passed to the scheduler class.
"""
RPC_API_VERSION = '1.3'
RPC_API_VERSION = '1.4'
def __init__(self, *args, **kwargs):
# Mostly for tests.
@@ -295,3 +295,39 @@ class CellsManager(manager.Manager):
response.cell_name)
ret_task_logs.append(task_log)
return ret_task_logs
def compute_node_get(self, ctxt, compute_id):
"""Get a compute node by ID in a specific cell."""
cell_name, compute_id = cells_utils.split_cell_and_item(
compute_id)
response = self.msg_runner.compute_node_get(ctxt, cell_name,
compute_id)
node = response.value_or_raise()
cells_utils.add_cell_to_compute_node(node, cell_name)
return node
def compute_node_get_all(self, ctxt, hypervisor_match=None):
"""Return list of compute nodes in all cells."""
responses = self.msg_runner.compute_node_get_all(ctxt,
hypervisor_match=hypervisor_match)
# 1 response per cell. Each response is a list of compute_node
# entries.
ret_nodes = []
for response in responses:
nodes = response.value_or_raise()
for node in nodes:
cells_utils.add_cell_to_compute_node(node,
response.cell_name)
ret_nodes.append(node)
return ret_nodes
def compute_node_stats(self, ctxt):
"""Return compute node stats totals from all cells."""
responses = self.msg_runner.compute_node_stats(ctxt)
totals = {}
for response in responses:
data = response.value_or_raise()
for key, val in data.iteritems():
totals.setdefault(key, 0)
totals[key] += val
return totals

View File

@@ -711,6 +711,12 @@ class _TargetedMessageMethods(_BaseMessageMethods):
timeout=timeout)
rpc.cast(message.ctxt, topic, rpc_message)
def compute_node_get(self, message, compute_id):
"""Get compute node by ID."""
compute_node = self.db.compute_node_get(message.ctxt,
compute_id)
return jsonutils.to_primitive(compute_node)
class _BroadcastMessageMethods(_BaseMessageMethods):
"""These are the methods that can be called as a part of a broadcast
@@ -848,6 +854,19 @@ class _BroadcastMessageMethods(_BaseMessageMethods):
ret_services.append(service)
return ret_services
def compute_node_get_all(self, message, hypervisor_match):
"""Return compute nodes in this cell."""
if hypervisor_match is not None:
nodes = self.db.compute_node_search_by_hypervisor(message.ctxt,
hypervisor_match)
else:
nodes = self.db.compute_node_get_all(message.ctxt)
return jsonutils.to_primitive(nodes)
def compute_node_stats(self, message):
"""Return compute node stats from this cell."""
return self.db.compute_node_statistics(message.ctxt)
_CELL_MESSAGE_TYPE_TO_MESSAGE_CLS = {'targeted': _TargetedMessage,
'broadcast': _BroadcastMessage,
@@ -1140,6 +1159,30 @@ class MessageRunner(object):
run_locally=True, need_response=True)
return message.process()
def compute_node_get_all(self, ctxt, hypervisor_match=None):
"""Return list of compute nodes in all child cells."""
method_kwargs = dict(hypervisor_match=hypervisor_match)
message = _BroadcastMessage(self, ctxt, 'compute_node_get_all',
method_kwargs, 'down',
run_locally=True, need_response=True)
return message.process()
def compute_node_stats(self, ctxt):
"""Return compute node stats from all child cells."""
method_kwargs = dict()
message = _BroadcastMessage(self, ctxt, 'compute_node_stats',
method_kwargs, 'down',
run_locally=True, need_response=True)
return message.process()
def compute_node_get(self, ctxt, cell_name, compute_id):
"""Return compute node entry from a specific cell by ID."""
method_kwargs = dict(compute_id=compute_id)
message = _TargetedMessage(self, ctxt, 'compute_node_get',
method_kwargs, 'down',
cell_name, need_response=True)
return message.process()
@staticmethod
def get_message_types():
return _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS.keys()

View File

@@ -44,6 +44,8 @@ class CellsAPI(rpc_proxy.RpcProxy):
1.2 - Adds service_get_all(), service_get_by_compute_host(),
and proxy_rpc_to_compute_manager()
1.3 - Adds task_log_get_all()
1.4 - Adds compute_node_get(), compute_node_get_all(), and
compute_node_stats()
'''
BASE_RPC_API_VERSION = '1.0'
@@ -196,3 +198,23 @@ class CellsAPI(rpc_proxy.RpcProxy):
period_ending=period_ending,
host=host, state=state),
version='1.3')
def compute_node_get(self, ctxt, compute_id):
"""Get a compute node by ID in a specific cell."""
return self.call(ctxt, self.make_msg('compute_node_get',
compute_id=compute_id),
version='1.4')
def compute_node_get_all(self, ctxt, hypervisor_match=None):
"""Return list of compute nodes in all cells, optionally
filtering by hypervisor host.
"""
return self.call(ctxt,
self.make_msg('compute_node_get_all',
hypervisor_match=hypervisor_match),
version='1.4')
def compute_node_stats(self, ctxt):
"""Return compute node stats from all cells."""
return self.call(ctxt, self.make_msg('compute_node_stats'),
version='1.4')

View File

@@ -2482,6 +2482,20 @@ class HostAPI(base.Base):
host=host,
state=state)
def compute_node_get(self, context, compute_id):
"""Return compute node entry for particular integer ID."""
return self.db.compute_node_get(context, int(compute_id))
def compute_node_get_all(self, context):
return self.db.compute_node_get_all(context)
def compute_node_search_by_hypervisor(self, context, hypervisor_match):
return self.db.compute_node_search_by_hypervisor(context,
hypervisor_match)
def compute_node_statistics(self, context):
return self.db.compute_node_statistics(context)
class AggregateAPI(base.Base):
"""Sub-set of the Compute Manager API for managing host aggregates."""

View File

@@ -649,3 +649,19 @@ class HostAPI(compute_api.HostAPI):
ending,
host=host,
state=state)
def compute_node_get(self, context, compute_id):
"""Get a compute node from a particular cell by its integer ID.
compute_id should be in the format of 'path!to!cell@ID'.
"""
return self.cells_rpcapi.compute_node_get(context, compute_id)
def compute_node_get_all(self, context):
return self.cells_rpcapi.compute_node_get_all(context)
def compute_node_search_by_hypervisor(self, context, hypervisor_match):
return self.cells_rpcapi.compute_node_get_all(context,
hypervisor_match=hypervisor_match)
def compute_node_statistics(self, context):
return self.cells_rpcapi.compute_node_stats(context)

View File

@@ -267,7 +267,7 @@ class HypervisorsTest(test.TestCase):
def fake_get_host_uptime(context, hyp):
raise exc.HTTPNotImplemented()
self.stubs.Set(self.controller.api, 'get_host_uptime',
self.stubs.Set(self.controller.host_api, 'get_host_uptime',
fake_get_host_uptime)
req = fakes.HTTPRequest.blank('/v2/fake/os-hypervisors/1')
@@ -278,7 +278,7 @@ class HypervisorsTest(test.TestCase):
def fake_get_host_uptime(context, hyp):
return "fake uptime"
self.stubs.Set(self.controller.api, 'get_host_uptime',
self.stubs.Set(self.controller.host_api, 'get_host_uptime',
fake_get_host_uptime)
req = fakes.HTTPRequest.blank('/v2/fake/os-hypervisors/1')

View File

@@ -370,3 +370,61 @@ class CellsManagerClassTestCase(test.TestCase):
period_beginning='fake-begin', period_ending='fake-end',
host=cell_and_host, state='fake-state')
self.assertEqual(expected_response, response)
def test_compute_node_get_all(self):
responses = []
expected_response = []
# 3 cells... so 3 responses. Each response is a list of computes.
# Manager should turn these into a single list of responses.
for i in xrange(3):
cell_name = 'path!to!cell%i' % i
compute_nodes = []
for compute_node in FAKE_COMPUTE_NODES:
compute_nodes.append(copy.deepcopy(compute_node))
expected_compute_node = copy.deepcopy(compute_node)
cells_utils.add_cell_to_compute_node(expected_compute_node,
cell_name)
expected_response.append(expected_compute_node)
response = messaging.Response(cell_name, compute_nodes, False)
responses.append(response)
self.mox.StubOutWithMock(self.msg_runner,
'compute_node_get_all')
self.msg_runner.compute_node_get_all(self.ctxt,
hypervisor_match='fake-match').AndReturn(responses)
self.mox.ReplayAll()
response = self.cells_manager.compute_node_get_all(self.ctxt,
hypervisor_match='fake-match')
self.assertEqual(expected_response, response)
def test_compute_node_stats(self):
raw_resp1 = {'key1': 1, 'key2': 2}
raw_resp2 = {'key2': 1, 'key3': 2}
raw_resp3 = {'key3': 1, 'key4': 2}
responses = [messaging.Response('cell1', raw_resp1, False),
messaging.Response('cell2', raw_resp2, False),
messaging.Response('cell2', raw_resp3, False)]
expected_resp = {'key1': 1, 'key2': 3, 'key3': 3, 'key4': 2}
self.mox.StubOutWithMock(self.msg_runner,
'compute_node_stats')
self.msg_runner.compute_node_stats(self.ctxt).AndReturn(responses)
self.mox.ReplayAll()
response = self.cells_manager.compute_node_stats(self.ctxt)
self.assertEqual(expected_resp, response)
def test_compute_node_get(self):
fake_cell = 'fake-cell'
fake_response = messaging.Response(fake_cell,
FAKE_COMPUTE_NODES[0],
False)
expected_response = copy.deepcopy(FAKE_COMPUTE_NODES[0])
cells_utils.add_cell_to_compute_node(expected_response, fake_cell)
cell_and_id = cells_utils.cell_with_item(fake_cell, 'fake-id')
self.mox.StubOutWithMock(self.msg_runner,
'compute_node_get')
self.msg_runner.compute_node_get(self.ctxt,
'fake-cell', 'fake-id').AndReturn(fake_response)
self.mox.ReplayAll()
response = self.cells_manager.compute_node_get(self.ctxt,
compute_id=cell_and_id)
self.assertEqual(expected_response, response)

View File

@@ -811,6 +811,19 @@ class CellsTargetedMethodsTestCase(test.TestCase):
result = response[0].value_or_raise()
self.assertEqual(['fake_result'], result)
def test_compute_node_get(self):
compute_id = 'fake-id'
self.mox.StubOutWithMock(self.tgt_db_inst, 'compute_node_get')
self.tgt_db_inst.compute_node_get(self.ctxt,
compute_id).AndReturn('fake_result')
self.mox.ReplayAll()
response = self.src_msg_runner.compute_node_get(self.ctxt,
self.tgt_cell_name, compute_id)
result = response.value_or_raise()
self.assertEqual('fake_result', result)
class CellsBroadcastMethodsTestCase(test.TestCase):
"""Test case for _BroadcastMessageMethods class. Most of these
@@ -1140,3 +1153,86 @@ class CellsBroadcastMethodsTestCase(test.TestCase):
('api-cell!child-cell2', [3]),
('api-cell', [1, 2])]
self.assertEqual(expected, response_values)
def test_compute_node_get_all(self):
# Reset this, as this is a broadcast down.
self._setup_attrs(up=False)
ctxt = self.ctxt.elevated()
self.mox.StubOutWithMock(self.src_db_inst, 'compute_node_get_all')
self.mox.StubOutWithMock(self.mid_db_inst, 'compute_node_get_all')
self.mox.StubOutWithMock(self.tgt_db_inst, 'compute_node_get_all')
self.src_db_inst.compute_node_get_all(ctxt).AndReturn([1, 2])
self.mid_db_inst.compute_node_get_all(ctxt).AndReturn([3])
self.tgt_db_inst.compute_node_get_all(ctxt).AndReturn([4, 5])
self.mox.ReplayAll()
responses = self.src_msg_runner.compute_node_get_all(ctxt)
response_values = [(resp.cell_name, resp.value_or_raise())
for resp in responses]
expected = [('api-cell!child-cell2!grandchild-cell1', [4, 5]),
('api-cell!child-cell2', [3]),
('api-cell', [1, 2])]
self.assertEqual(expected, response_values)
def test_compute_node_get_all_with_hyp_match(self):
# Reset this, as this is a broadcast down.
self._setup_attrs(up=False)
hypervisor_match = 'meow'
ctxt = self.ctxt.elevated()
self.mox.StubOutWithMock(self.src_db_inst,
'compute_node_search_by_hypervisor')
self.mox.StubOutWithMock(self.mid_db_inst,
'compute_node_search_by_hypervisor')
self.mox.StubOutWithMock(self.tgt_db_inst,
'compute_node_search_by_hypervisor')
self.src_db_inst.compute_node_search_by_hypervisor(ctxt,
hypervisor_match).AndReturn([1, 2])
self.mid_db_inst.compute_node_search_by_hypervisor(ctxt,
hypervisor_match).AndReturn([3])
self.tgt_db_inst.compute_node_search_by_hypervisor(ctxt,
hypervisor_match).AndReturn([4, 5])
self.mox.ReplayAll()
responses = self.src_msg_runner.compute_node_get_all(ctxt,
hypervisor_match=hypervisor_match)
response_values = [(resp.cell_name, resp.value_or_raise())
for resp in responses]
expected = [('api-cell!child-cell2!grandchild-cell1', [4, 5]),
('api-cell!child-cell2', [3]),
('api-cell', [1, 2])]
self.assertEqual(expected, response_values)
def test_compute_node_stats(self):
# Reset this, as this is a broadcast down.
self._setup_attrs(up=False)
ctxt = self.ctxt.elevated()
self.mox.StubOutWithMock(self.src_db_inst,
'compute_node_statistics')
self.mox.StubOutWithMock(self.mid_db_inst,
'compute_node_statistics')
self.mox.StubOutWithMock(self.tgt_db_inst,
'compute_node_statistics')
self.src_db_inst.compute_node_statistics(ctxt).AndReturn([1, 2])
self.mid_db_inst.compute_node_statistics(ctxt).AndReturn([3])
self.tgt_db_inst.compute_node_statistics(ctxt).AndReturn([4, 5])
self.mox.ReplayAll()
responses = self.src_msg_runner.compute_node_stats(ctxt)
response_values = [(resp.cell_name, resp.value_or_raise())
for resp in responses]
expected = [('api-cell!child-cell2!grandchild-cell1', [4, 5]),
('api-cell!child-cell2', [3]),
('api-cell', [1, 2])]
self.assertEqual(expected, response_values)

View File

@@ -277,3 +277,30 @@ class CellsAPITestCase(test.TestCase):
self._check_result(call_info, 'task_log_get_all', expected_args,
version='1.3')
self.assertEqual(result, 'fake_response')
def test_compute_node_get_all(self):
call_info = self._stub_rpc_method('call', 'fake_response')
result = self.cells_rpcapi.compute_node_get_all(self.fake_context,
hypervisor_match='fake-match')
expected_args = {'hypervisor_match': 'fake-match'}
self._check_result(call_info, 'compute_node_get_all', expected_args,
version='1.4')
self.assertEqual(result, 'fake_response')
def test_compute_node_stats(self):
call_info = self._stub_rpc_method('call', 'fake_response')
result = self.cells_rpcapi.compute_node_stats(self.fake_context)
expected_args = {}
self._check_result(call_info, 'compute_node_stats',
expected_args, version='1.4')
self.assertEqual(result, 'fake_response')
def test_compute_node_get(self):
call_info = self._stub_rpc_method('call', 'fake_response')
result = self.cells_rpcapi.compute_node_get(self.fake_context,
'fake_compute_id')
expected_args = {'compute_id': 'fake_compute_id'}
self._check_result(call_info, 'compute_node_get',
expected_args, version='1.4')
self.assertEqual(result, 'fake_response')