Fixes task_log_get and task_log_get_all signatures

Fixes signatures for the above methods in nova/db/api.py
based on the recent TaskLog refactor on sqlalchemy/api.py.
Also adds unit tests for TaskLog.

Fixes Bug 1102652.

Change-Id: Id7ce74b8941d9eb6a50dfbfaa62e0fe05dd467c2
This commit is contained in:
Tiago Mello
2013-01-21 20:01:40 -02:00
parent 97a5274f5b
commit 6859f5e843
3 changed files with 56 additions and 16 deletions

View File

@@ -1705,16 +1705,14 @@ def task_log_end_task(context, task_name,
period_ending,
host,
errors,
message=None,
session=None):
message=None):
"""Mark a task as complete for a given host/time period."""
return IMPL.task_log_end_task(context, task_name,
period_beginning,
period_ending,
host,
errors,
message,
session)
message)
def task_log_begin_task(context, task_name,
@@ -1722,25 +1720,23 @@ def task_log_begin_task(context, task_name,
period_ending,
host,
task_items=None,
message=None,
session=None):
message=None):
"""Mark a task as started for a given host/time period."""
return IMPL.task_log_begin_task(context, task_name,
period_beginning,
period_ending,
host,
task_items,
message,
session)
message)
def task_log_get_all(context, task_name, period_beginning,
period_ending, host=None, state=None, session=None):
period_ending, host=None, state=None):
return IMPL.task_log_get_all(context, task_name, period_beginning,
period_ending, host, state, session)
period_ending, host, state)
def task_log_get(context, task_name, period_beginning,
period_ending, host, state=None, session=None):
period_ending, host, state=None):
return IMPL.task_log_get(context, task_name, period_beginning,
period_ending, host, state, session)
period_ending, host, state)

View File

@@ -4770,15 +4770,15 @@ def _task_log_get_query(context, task_name, period_beginning,
@require_admin_context
def task_log_get(context, task_name, period_beginning, period_ending, host,
state=None):
return _task_log_get_query(task_name, period_beginning, period_ending,
host, state).first()
return _task_log_get_query(context, task_name, period_beginning,
period_ending, host, state).first()
@require_admin_context
def task_log_get_all(context, task_name, period_beginning, period_ending,
host=None, state=None):
return _task_log_get_query(task_name, period_beginning, period_ending,
host, state).all()
return _task_log_get_query(context, task_name, period_beginning,
period_ending, host, state).all()
@require_admin_context

View File

@@ -1671,3 +1671,47 @@ class VolumeUsageDBApiTestCase(test.TestCase):
for key, value in expected_vol_usages.items():
self.assertEqual(vol_usages[0][key], value)
timeutils.clear_time_override()
class TaskLogTestCase(test.TestCase):
def setUp(self):
super(TaskLogTestCase, self).setUp()
self.context = context.get_admin_context()
now = timeutils.utcnow()
self.begin = now - datetime.timedelta(seconds=10)
self.end = now - datetime.timedelta(seconds=5)
self.task_name = 'fake-task-name'
self.host = 'fake-host'
self.message = 'Fake task message'
db.task_log_begin_task(self.context, self.task_name, self.begin,
self.end, self.host, message=self.message)
def test_task_log_get(self):
result = db.task_log_get(self.context, self.task_name, self.begin,
self.end, self.host)
self.assertEqual(result['task_name'], self.task_name)
self.assertEqual(result['period_beginning'], self.begin)
self.assertEqual(result['period_ending'], self.end)
self.assertEqual(result['host'], self.host)
self.assertEqual(result['message'], self.message)
def test_task_log_get_all(self):
result = db.task_log_get_all(self.context, self.task_name, self.begin,
self.end, host=self.host)
self.assertEqual(len(result), 1)
def test_task_log_begin_task(self):
db.task_log_begin_task(self.context, 'fake', self.begin,
self.end, self.host, message=self.message)
result = db.task_log_get(self.context, 'fake', self.begin,
self.end, self.host)
self.assertEqual(result['task_name'], 'fake')
def test_task_log_end_task(self):
errors = 1
db.task_log_end_task(self.context, self.task_name, self.begin,
self.end, self.host, errors, message=self.message)
result = db.task_log_get(self.context, self.task_name, self.begin,
self.end, self.host)
self.assertEqual(result['errors'], 1)