Cells: Add cells support to instance_usage_audit_log api extension

Adds task_log_get_all() call to HostAPI() so that we can use the local
DB when cells is disabled or proxy via cells if cells is enabled.

Adds task_log_get_all() call to cells.

Change-Id: I9e78c17fcf70f98903d0a26c1de3e2581b8977ad
This commit is contained in:
Chris Behrens
2013-01-16 18:49:45 +00:00
parent 24018cabb4
commit be992f7b9a
12 changed files with 303 additions and 21 deletions

View File

@@ -21,7 +21,7 @@ import datetime
import webob.exc
from nova.api.openstack import extensions
from nova import db
from nova import compute
from nova.openstack.common import cfg
from nova import utils
@@ -34,6 +34,8 @@ authorize = extensions.extension_authorizer('compute',
class InstanceUsageAuditLogController(object):
def __init__(self):
self.host_api = compute.HostAPI()
def index(self, req):
context = req.environ['nova.context']
@@ -78,12 +80,13 @@ class InstanceUsageAuditLogController(object):
begin = defbegin
if end is None:
end = defend
task_logs = db.task_log_get_all(context, "instance_usage_audit",
begin, end)
task_logs = self.host_api.task_log_get_all(context,
"instance_usage_audit",
begin, end)
# We do this this way to include disabled compute services,
# which can have instances on them. (mdragon)
services = [svc for svc in db.service_get_all(context)
if svc['topic'] == CONF.compute_topic]
filters = {'topic': CONF.compute_topic}
services = self.host_api.service_get_all(context, filters=filters)
hosts = set(serv['host'] for serv in services)
seen_hosts = set()
done_hosts = set()

View File

@@ -65,7 +65,7 @@ class CellsManager(manager.Manager):
Scheduling requests get passed to the scheduler class.
"""
RPC_API_VERSION = '1.2'
RPC_API_VERSION = '1.3'
def __init__(self, *args, **kwargs):
# Mostly for tests.
@@ -260,3 +260,38 @@ class CellsManager(manager.Manager):
response = self.msg_runner.proxy_rpc_to_manager(ctxt, cell_name,
host_name, topic, rpc_message, call, timeout)
return response.value_or_raise()
def task_log_get_all(self, ctxt, task_name, period_beginning,
period_ending, host=None, state=None):
"""Get task logs from the DB from all cells or a particular
cell.
If 'host' is not None, host will be of the format 'cell!name@host',
with '@host' being optional. The query will be directed to the
appropriate cell and return all task logs, or task logs matching
the host if specified.
'state' also may be None. If it's not, filter by the state as well.
"""
if host is None:
cell_name = None
else:
result = cells_utils.split_cell_and_item(host)
cell_name = result[0]
if len(result) > 1:
host = result[1]
else:
host = None
responses = self.msg_runner.task_log_get_all(ctxt, cell_name,
task_name, period_beginning, period_ending,
host=host, state=state)
# 1 response per cell. Each response is a list of task log
# entries.
ret_task_logs = []
for response in responses:
task_logs = response.value_or_raise()
for task_log in task_logs:
cells_utils.add_cell_to_task_log(task_log,
response.cell_name)
ret_task_logs.append(task_log)
return ret_task_logs

View File

@@ -600,6 +600,22 @@ class _BaseMessageMethods(base.Base):
self.state_manager = msg_runner.state_manager
self.compute_api = compute.API()
def task_log_get_all(self, message, task_name, period_beginning,
period_ending, host, state):
"""Get task logs from the DB. The message could have
directly targeted this cell, or it could have been a broadcast
message.
If 'host' is not None, filter by host.
If 'state' is not None, filter by state.
"""
task_logs = self.db.task_log_get_all(message.ctxt, task_name,
period_beginning,
period_ending,
host=host,
state=state)
return jsonutils.to_primitive(task_logs)
class _ResponseMessageMethods(_BaseMessageMethods):
"""Methods that are called from a ResponseMessage. There's only
@@ -1097,6 +1113,33 @@ class MessageRunner(object):
need_response=call)
return message.process()
def task_log_get_all(self, ctxt, cell_name, task_name,
period_beginning, period_ending,
host=None, state=None):
"""Get task logs from the DB from all cells or a particular
cell.
If 'cell_name' is None or '', get responses from all cells.
If 'host' is not None, filter by host.
If 'state' is not None, filter by state.
Return a list of Response objects.
"""
method_kwargs = dict(task_name=task_name,
period_beginning=period_beginning,
period_ending=period_ending,
host=host, state=state)
if cell_name:
message = _TargetedMessage(self, ctxt, 'task_log_get_all',
method_kwargs, 'down',
cell_name, need_response=True)
# Caller should get a list of Responses.
return [message.process()]
message = _BroadcastMessage(self, ctxt, 'task_log_get_all',
method_kwargs, 'down',
run_locally=True, need_response=True)
return message.process()
@staticmethod
def get_message_types():
return _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS.keys()

View File

@@ -43,6 +43,7 @@ class CellsAPI(rpc_proxy.RpcProxy):
1.1 - Adds get_cell_info_for_neighbors() and sync_instances()
1.2 - Adds service_get_all(), service_get_by_compute_host(),
and proxy_rpc_to_compute_manager()
1.3 - Adds task_log_get_all()
'''
BASE_RPC_API_VERSION = '1.0'
@@ -185,3 +186,13 @@ class CellsAPI(rpc_proxy.RpcProxy):
timeout=timeout),
timeout=timeout,
version='1.2')
def task_log_get_all(self, ctxt, task_name, period_beginning,
period_ending, host=None, state=None):
"""Get the task logs from the DB in child cells."""
return self.call(ctxt, self.make_msg('task_log_get_all',
task_name=task_name,
period_beginning=period_beginning,
period_ending=period_ending,
host=host, state=state),
version='1.3')

View File

@@ -89,3 +89,11 @@ def add_cell_to_service(service, cell_name):
compute_node = service.get('compute_node')
if compute_node:
add_cell_to_compute_node(compute_node[0], cell_name)
def add_cell_to_task_log(task_log, cell_name):
"""Fix task_log attributes that should be unique. In particular,
the 'id' and 'host' fields should be prepended with cell name.
"""
task_log['id'] = cell_with_item(cell_name, task_log['id'])
task_log['host'] = cell_with_item(cell_name, task_log['host'])

View File

@@ -2471,6 +2471,17 @@ class HostAPI(base.Base):
"""Return all instances on the given host."""
return self.db.instance_get_all_by_host(context, host_name)
def task_log_get_all(self, context, task_name, period_beginning,
period_ending, host=None, state=None):
"""Return the task logs within a given range, optionally
filtering by host and/or state.
"""
return self.db.task_log_get_all(context, task_name,
period_beginning,
period_ending,
host=host,
state=state)
class AggregateAPI(base.Base):
"""Sub-set of the Compute Manager API for managing host aggregates."""

View File

@@ -635,3 +635,17 @@ class HostAPI(compute_api.HostAPI):
instances = [i for i in instances
if i['cell_name'] == cell_name]
return instances
def task_log_get_all(self, context, task_name, beginning, ending,
host=None, state=None):
"""Return the task logs within a given range from cells,
optionally filtering by the host and/or state. For cells, the
host should be a path like 'path!to!cell@host'. If no @host
is given, only task logs from a particular cell will be returned.
"""
return self.cells_rpcapi.task_log_get_all(context,
task_name,
beginning,
ending,
host=host,
state=state)

View File

@@ -80,11 +80,8 @@ TEST_LOGS3 = [
]
def fake_service_get_all(context):
return TEST_COMPUTE_SERVICES
def fake_task_log_get_all(context, task_name, begin, end):
def fake_task_log_get_all(context, task_name, begin, end,
host=None, state=None):
assert task_name == "instance_usage_audit"
if begin == begin1 and end == end1:
@@ -114,13 +111,18 @@ class InstanceUsageAuditLogTest(test.TestCase):
self.context = context.get_admin_context()
timeutils.set_time_override(datetime.datetime(2012, 7, 5, 10, 0, 0))
self.controller = ial.InstanceUsageAuditLogController()
self.host_api = self.controller.host_api
def fake_service_get_all(context, disabled):
self.assertTrue(disabled is None)
return TEST_COMPUTE_SERVICES
self.stubs.Set(utils, 'last_completed_audit_period',
fake_last_completed_audit_period)
self.stubs.Set(db, 'service_get_all',
fake_service_get_all)
fake_service_get_all)
self.stubs.Set(db, 'task_log_get_all',
fake_task_log_get_all)
fake_task_log_get_all)
def tearDown(self):
super(InstanceUsageAuditLogTest, self).tearDown()

View File

@@ -37,6 +37,8 @@ FAKE_SERVICES = [dict(id=1, host='host1',
dict(id=2, host='host2',
compute_node=[FAKE_COMPUTE_NODES[1]]),
dict(id=3, host='host3', compute_node=[])]
FAKE_TASK_LOGS = [dict(id=1, host='host1'),
dict(id=2, host='host2')]
class CellsManagerClassTestCase(test.TestCase):
@@ -52,14 +54,6 @@ class CellsManagerClassTestCase(test.TestCase):
self.driver = self.cells_manager.driver
self.ctxt = 'fake_context'
def _get_fake_responses(self):
responses = []
expected_responses = []
for x in xrange(1, 4):
responses.append(messaging.Response('cell%s' % x, x, False))
expected_responses.append(('cell%s' % x, x))
return expected_responses, responses
def _get_fake_response(self, raw_response=None, exc=False):
if exc:
return messaging.Response('fake', test.TestingException(),
@@ -313,3 +307,66 @@ class CellsManagerClassTestCase(test.TestCase):
topic=topic, rpc_message='fake-rpc-msg', call=True,
timeout=-1)
self.assertEqual('fake-response', response)
def _build_task_log_responses(self, num):
responses = []
expected_response = []
# 3 cells... so 3 responses. Each response is a list of task log
# entries. Manager should turn these into a single list of
# task log entries.
for i in xrange(num):
cell_name = 'path!to!cell%i' % i
task_logs = []
for task_log in FAKE_TASK_LOGS:
task_logs.append(copy.deepcopy(task_log))
expected_task_log = copy.deepcopy(task_log)
cells_utils.add_cell_to_task_log(expected_task_log,
cell_name)
expected_response.append(expected_task_log)
response = messaging.Response(cell_name, task_logs, False)
responses.append(response)
return expected_response, responses
def test_task_log_get_all(self):
expected_response, responses = self._build_task_log_responses(3)
self.mox.StubOutWithMock(self.msg_runner,
'task_log_get_all')
self.msg_runner.task_log_get_all(self.ctxt, None,
'fake-name', 'fake-begin',
'fake-end', host=None, state=None).AndReturn(responses)
self.mox.ReplayAll()
response = self.cells_manager.task_log_get_all(self.ctxt,
task_name='fake-name',
period_beginning='fake-begin', period_ending='fake-end')
self.assertEqual(expected_response, response)
def test_task_log_get_all_with_filters(self):
expected_response, responses = self._build_task_log_responses(1)
cell_and_host = cells_utils.cell_with_item('fake-cell', 'fake-host')
self.mox.StubOutWithMock(self.msg_runner,
'task_log_get_all')
self.msg_runner.task_log_get_all(self.ctxt, 'fake-cell',
'fake-name', 'fake-begin', 'fake-end', host='fake-host',
state='fake-state').AndReturn(responses)
self.mox.ReplayAll()
response = self.cells_manager.task_log_get_all(self.ctxt,
task_name='fake-name',
period_beginning='fake-begin', period_ending='fake-end',
host=cell_and_host, state='fake-state')
self.assertEqual(expected_response, response)
def test_task_log_get_all_with_cell_but_no_host_filters(self):
expected_response, responses = self._build_task_log_responses(1)
# Host filter only has cell name.
cell_and_host = 'fake-cell'
self.mox.StubOutWithMock(self.msg_runner,
'task_log_get_all')
self.msg_runner.task_log_get_all(self.ctxt, 'fake-cell',
'fake-name', 'fake-begin', 'fake-end', host=None,
state='fake-state').AndReturn(responses)
self.mox.ReplayAll()
response = self.cells_manager.task_log_get_all(self.ctxt,
task_name='fake-name',
period_beginning='fake-begin', period_ending='fake-end',
host=cell_and_host, state='fake-state')
self.assertEqual(expected_response, response)

View File

@@ -789,6 +789,28 @@ class CellsTargetedMethodsTestCase(test.TestCase):
fake_topic,
fake_rpc_message, False, timeout=None)
def test_task_log_get_all_targetted(self):
task_name = 'fake_task_name'
begin = 'fake_begin'
end = 'fake_end'
host = 'fake_host'
state = 'fake_state'
self.mox.StubOutWithMock(self.tgt_db_inst, 'task_log_get_all')
self.tgt_db_inst.task_log_get_all(self.ctxt, task_name,
begin, end, host=host,
state=state).AndReturn(['fake_result'])
self.mox.ReplayAll()
response = self.src_msg_runner.task_log_get_all(self.ctxt,
self.tgt_cell_name, task_name, begin, end, host=host,
state=state)
self.assertTrue(isinstance(response, list))
self.assertEqual(1, len(response))
result = response[0].value_or_raise()
self.assertEqual(['fake_result'], result)
class CellsBroadcastMethodsTestCase(test.TestCase):
"""Test case for _BroadcastMessageMethods class. Most of these
@@ -1085,3 +1107,36 @@ class CellsBroadcastMethodsTestCase(test.TestCase):
('api-cell!child-cell2', [3]),
('api-cell', [1, 2])]
self.assertEqual(expected, response_values)
def test_task_log_get_all_broadcast(self):
# Reset this, as this is a broadcast down.
self._setup_attrs(up=False)
task_name = 'fake_task_name'
begin = 'fake_begin'
end = 'fake_end'
host = 'fake_host'
state = 'fake_state'
ctxt = self.ctxt.elevated()
self.mox.StubOutWithMock(self.src_db_inst, 'task_log_get_all')
self.mox.StubOutWithMock(self.mid_db_inst, 'task_log_get_all')
self.mox.StubOutWithMock(self.tgt_db_inst, 'task_log_get_all')
self.src_db_inst.task_log_get_all(ctxt, task_name,
begin, end, host=host, state=state).AndReturn([1, 2])
self.mid_db_inst.task_log_get_all(ctxt, task_name,
begin, end, host=host, state=state).AndReturn([3])
self.tgt_db_inst.task_log_get_all(ctxt, task_name,
begin, end, host=host, state=state).AndReturn([4, 5])
self.mox.ReplayAll()
responses = self.src_msg_runner.task_log_get_all(ctxt, None,
task_name, begin, end, host=host, state=state)
response_values = [(resp.cell_name, resp.value_or_raise())
for resp in responses]
expected = [('api-cell!child-cell2!grandchild-cell1', [4, 5]),
('api-cell!child-cell2', [3]),
('api-cell', [1, 2])]
self.assertEqual(expected, response_values)

View File

@@ -259,3 +259,21 @@ class CellsAPITestCase(test.TestCase):
expected_args,
version='1.2')
self.assertEqual(result, 'fake_response')
def test_task_log_get_all(self):
call_info = self._stub_rpc_method('call', 'fake_response')
result = self.cells_rpcapi.task_log_get_all(self.fake_context,
task_name='fake_name',
period_beginning='fake_begin',
period_ending='fake_end',
host='fake_host',
state='fake_state')
expected_args = {'task_name': 'fake_name',
'period_beginning': 'fake_begin',
'period_ending': 'fake_end',
'host': 'fake_host',
'state': 'fake_state'}
self._check_result(call_info, 'task_log_get_all', expected_args,
version='1.3')
self.assertEqual(result, 'fake_response')

View File

@@ -197,6 +197,18 @@ class ComputeHostAPITestCase(test.TestCase):
'fake-host')
self.assertEqual(['fake-responses'], result)
def test_task_log_get_all(self):
self.mox.StubOutWithMock(self.host_api.db, 'task_log_get_all')
self.host_api.db.task_log_get_all(self.ctxt,
'fake-name', 'fake-begin', 'fake-end', host='fake-host',
state='fake-state').AndReturn('fake-response')
self.mox.ReplayAll()
result = self.host_api.task_log_get_all(self.ctxt, 'fake-name',
'fake-begin', 'fake-end', host='fake-host',
state='fake-state')
self.assertEqual('fake-response', result)
class ComputeHostAPICellsTestCase(ComputeHostAPITestCase):
def setUp(self):
@@ -296,3 +308,16 @@ class ComputeHostAPICellsTestCase(ComputeHostAPITestCase):
result = self.host_api.instance_get_all_by_host(self.ctxt,
cell_and_host)
self.assertEqual(expected_result, result)
def test_task_log_get_all(self):
self.mox.StubOutWithMock(self.host_api.cells_rpcapi,
'task_log_get_all')
self.host_api.cells_rpcapi.task_log_get_all(self.ctxt,
'fake-name', 'fake-begin', 'fake-end', host='fake-host',
state='fake-state').AndReturn('fake-response')
self.mox.ReplayAll()
result = self.host_api.task_log_get_all(self.ctxt, 'fake-name',
'fake-begin', 'fake-end', host='fake-host',
state='fake-state')
self.assertEqual('fake-response', result)