From 5fdf5149b315dad9577d44d98842369b67c8adc5 Mon Sep 17 00:00:00 2001 From: Kevin Chen Date: Wed, 29 May 2013 14:55:34 -0500 Subject: [PATCH] Fixed pep8 formatting... Finally. --- taskflow/tests/unit/test_sql_db_api.py | 631 +++++++++++++------------ 1 file changed, 340 insertions(+), 291 deletions(-) diff --git a/taskflow/tests/unit/test_sql_db_api.py b/taskflow/tests/unit/test_sql_db_api.py index 458c1637..21dbd84e 100644 --- a/taskflow/tests/unit/test_sql_db_api.py +++ b/taskflow/tests/unit/test_sql_db_api.py @@ -31,404 +31,453 @@ from taskflow.openstack.common import exception db_api.configure() db_api.SQL_CONNECTION = 'sqlite:///test.db' + def setUpModule(): - if not path.isfile('test.db'): - models.create_tables() + if not path.isfile('test.db'): + models.create_tables() + def tearDownModule(): - os.remove('test.db') + os.remove('test.db') """ JobTest """ + + class JobTest(unittest.TestCase): - wf_ids = [] - wf_names = [] - lb_ids = [] - lb_names = [] - job_ids = [] - job_names = [] + wf_ids = [] + wf_names = [] + lb_ids = [] + lb_names = [] + job_ids = [] + job_names = [] - @classmethod - def setUpClass(cls): - wf_fmt = u'workflow_{}' - lb_tmp = db_api.logbook_create('', u'logbook_1', 1) - cls.lb_ids.append(1) - cls.lb_names.append(u'logbook_1') - job_tmp = db_api.job_create('', u'job_1', 1) - cls.job_ids.append(1) - cls.job_names.append(u'job_1') - for i in range(1, 10): - wf_tmp = db_api.workflow_create('', wf_fmt.format(i)) + @classmethod + def setUpClass(cls): + wf_fmt = u'workflow_{}' + lb_tmp = db_api.logbook_create('', u'logbook_1', 1) + cls.lb_ids.append(1) + cls.lb_names.append(u'logbook_1') + job_tmp = db_api.job_create('', u'job_1', 1) + cls.job_ids.append(1) + cls.job_names.append(u'job_1') + for i in range(1, 10): + wf_tmp = db_api.workflow_create('', wf_fmt.format(i)) - db_api.logbook_add_workflow('', 1, wf_fmt.format(i)) - db_api.job_add_workflow('', 1, wf_fmt.format(i)) + db_api.logbook_add_workflow('', 1, wf_fmt.format(i)) + db_api.job_add_workflow('', 1, wf_fmt.format(i)) - cls.wf_ids.append(i) - cls.wf_names.append(wf_fmt.format(i)) + cls.wf_ids.append(i) + cls.wf_names.append(wf_fmt.format(i)) - @classmethod - def tearDownClass(cls): - for name in cls.wf_names: - db_api.workflow_destroy('', name) - for id in cls.lb_ids: - db_api.logbook_destroy('', id) - for id in cls.job_ids: - db_api.job_destroy('', id) - cls.wf_ids = [] - cls.wf_names = [] - cls.lb_ids = [] - cls.lb_names = [] - cls.job_ids = [] - cls.job_names = [] + @classmethod + def tearDownClass(cls): + for name in cls.wf_names: + db_api.workflow_destroy('', name) + for id in cls.lb_ids: + db_api.logbook_destroy('', id) + for id in cls.job_ids: + db_api.job_destroy('', id) + cls.wf_ids = [] + cls.wf_names = [] + cls.lb_ids = [] + cls.lb_names = [] + cls.job_ids = [] + cls.job_names = [] - def test_job_get(self): - expected = self.job_names[0] - actual = db_api.job_get('', self.job_ids[0]).name + def test_job_get(self): + print '\nTesting job_get...' + expected = self.job_names[0] + actual = db_api.job_get('', self.job_ids[0]).name - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - self.assertRaises(exception.NotFound, db_api.job_get, '', 9001) + self.assertRaises(exception.NotFound, db_api.job_get, '', 9001) - def test_job_update(self): - db_api.job_update('', 1, dict(owner='OwnerTest', state=states.CLAIMED)) - job = db_api.job_get('', 1) + def test_job_update(self): + print '\nTesting job_update...' + db_api.job_update('', 1, dict(owner='OwnerTest', state=states.CLAIMED)) + job = db_api.job_get('', 1) - expected = 'OwnerTest' - actual = job.owner + expected = 'OwnerTest' + actual = job.owner - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - expected = states.CLAIMED - actual = job.state + expected = states.CLAIMED + actual = job.state - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - self.assertRaises(exception.NotFound, db_api.job_update, '', 9001, dict(owner='OwnerTest', state=states.CLAIMED)) + self.assertRaises(exception.NotFound, db_api.job_update, '', 9001, + dict(owner='OwnerTest', state=states.CLAIMED)) - def test_job_add_workflow(self): - db_api.workflow_create('', u'workflow_10') - self.wf_ids.append(10) - self.wf_names.append(u'workflow_10') + def test_job_add_workflow(self): + print '\nTesting job_add_workflow...' + db_api.workflow_create('', u'workflow_10') + self.wf_ids.append(10) + self.wf_names.append(u'workflow_10') - expected = self.wf_ids - actual = [] - temp = db_api.job_add_workflow('', 1, u'workflow_10') + expected = self.wf_ids + actual = [] + temp = db_api.job_add_workflow('', 1, u'workflow_10') - for workflow in temp: - actual.append(workflow.id) + for workflow in temp: + actual.append(workflow.id) - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - self.assertRaises(exception.NotFound, db_api.job_add_workflow, '', 9001, u'workflow_10') - self.assertRaises(exception.NotFound, db_api.job_add_workflow, '', 1, u'workflow_9001') + self.assertRaises(exception.NotFound, db_api.job_add_workflow, '', + 9001, u'workflow_10') + self.assertRaises(exception.NotFound, db_api.job_add_workflow, '', + 1, u'workflow_9001') - def test_job_get_owner(self): - actual = db_api.job_get_owner('', 1) + def test_job_get_owner(self): + print '\nTesting job_get_owner...' + actual = db_api.job_get_owner('', 1) - self.assertIsNone(actual) + self.assertIsNone(actual) - self.assertRaises(exception.NotFound, db_api.job_get_owner, '', 9001) + self.assertRaises(exception.NotFound, db_api.job_get_owner, '', 9001) - def test_job_get_state(self): - expected = states.UNCLAIMED - actual = db_api.job_get_state('', 1) + def test_job_get_state(self): + print '\nTesting job_get_state...' + expected = states.UNCLAIMED + actual = db_api.job_get_state('', 1) - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - self.assertRaises(exception.NotFound, db_api.job_get_state, '', 9001) + self.assertRaises(exception.NotFound, db_api.job_get_state, '', 9001) - def test_job_get_logbook(self): - expected = self.lb_names[0] - actual = db_api.job_get_logbook('', 1).name + def test_job_get_logbook(self): + print '\nTesting job_get_logbook...' + expected = self.lb_names[0] + actual = db_api.job_get_logbook('', 1).name - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - self.assertRaises(exception.NotFound, db_api.job_get_logbook, '', 9001) + self.assertRaises(exception.NotFound, db_api.job_get_logbook, '', 9001) - def test_job_create(self): - id = 1 - while (self.job_ids.count(id) > 0): - id = id + 1 - job_tmp = db_api.job_create('', u'job_{}'.format(id), id) - self.job_ids.append(id) - self.job_names.append(u'job_{}'.format(id)) + def test_job_create(self): + print '\nTesting job_create...' + id = 1 + while (self.job_ids.count(id) > 0): + id = id + 1 + job_tmp = db_api.job_create('', u'job_{}'.format(id), id) + self.job_ids.append(id) + self.job_names.append(u'job_{}'.format(id)) - actual = db_api.job_get('', id) - self.assertIsNotNone(actual) + actual = db_api.job_get('', id) + self.assertIsNotNone(actual) - def test_job_destroy(self): - id = self.job_ids.pop() - db_api.job_destroy('', id) - self.job_names.pop() + def test_job_destroy(self): + print '\nTesting job_destroy...' + id = self.job_ids.pop() + db_api.job_destroy('', id) + self.job_names.pop() - self.assertRaises(exception.NotFound, db_api.job_get, '', id) + self.assertRaises(exception.NotFound, db_api.job_get, '', id) """ LogBookTest """ + + class LogBookTest(unittest.TestCase): - wf_ids = [] - wf_names = [] - lb_ids = [] - lb_names = [] + wf_ids = [] + wf_names = [] + lb_ids = [] + lb_names = [] - @classmethod - def setUpClass(cls): - wf_fmt = u'workflow_{}' - lb_tmp = db_api.logbook_create('', u'logbook_1', 1) - cls.lb_ids.append(1) - cls.lb_names.append(u'logbook_1') - for i in range(1, 10): - wf_tmp = db_api.workflow_create('', wf_fmt.format(i)) + @classmethod + def setUpClass(cls): + wf_fmt = u'workflow_{}' + lb_tmp = db_api.logbook_create('', u'logbook_1', 1) + cls.lb_ids.append(1) + cls.lb_names.append(u'logbook_1') + for i in range(1, 10): + wf_tmp = db_api.workflow_create('', wf_fmt.format(i)) - db_api.logbook_add_workflow('', 1, wf_fmt.format(i)) + db_api.logbook_add_workflow('', 1, wf_fmt.format(i)) - cls.wf_ids.append(i) - cls.wf_names.append(wf_fmt.format(i)) + cls.wf_ids.append(i) + cls.wf_names.append(wf_fmt.format(i)) - @classmethod - def tearDownClass(cls): - for name in cls.wf_names: - db_api.workflow_destroy('', name) - for id in cls.lb_ids: - db_api.logbook_destroy('', id) - cls.wf_ids = [] - cls.wf_names = [] - cls.lb_ids = [] - cls.lb_names = [] + @classmethod + def tearDownClass(cls): + for name in cls.wf_names: + db_api.workflow_destroy('', name) + for id in cls.lb_ids: + db_api.logbook_destroy('', id) + cls.wf_ids = [] + cls.wf_names = [] + cls.lb_ids = [] + cls.lb_names = [] - def test_logbook_get(self): - expected = self.lb_names[0] - actual = db_api.logbook_get('', self.lb_ids[0]).name + def test_logbook_get(self): + print '\nTesting logbook_get...' + expected = self.lb_names[0] + actual = db_api.logbook_get('', self.lb_ids[0]).name - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - self.assertRaises(exception.NotFound, db_api.logbook_get, '', 9001) + self.assertRaises(exception.NotFound, db_api.logbook_get, '', 9001) - def test_logbook_get_by_name(self): - expected = [self.lb_ids[0]] - actual = [] - for logbook in db_api.logbook_get_by_name('', self.lb_names[0]): - actual.append(logbook.id) + def test_logbook_get_by_name(self): + print '\nTesting logbook_get_by_name...' + expected = [self.lb_ids[0]] + actual = [] + for logbook in db_api.logbook_get_by_name('', self.lb_names[0]): + actual.append(logbook.id) - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - self.assertRaises(exception.NotFound, db_api.logbook_get_by_name, '', u'logbook_9001') + self.assertRaises(exception.NotFound, db_api.logbook_get_by_name, '', + u'logbook_9001') - def test_logbook_create(self): - id = 1 - while (self.lb_ids.count(id) > 0): - id = id + 1 - lb_tmp = db_api.logbook_create('', u'logbook_{}'.format(id), id) - self.lb_ids.append(id) - self.lb_names.append(u'logbook_{}'.format(id)) + def test_logbook_create(self): + print '\nTesting logbook_create...' + id = 1 + while (self.lb_ids.count(id) > 0): + id = id + 1 + lb_tmp = db_api.logbook_create('', u'logbook_{}'.format(id), id) + self.lb_ids.append(id) + self.lb_names.append(u'logbook_{}'.format(id)) - actual = db_api.logbook_get('', id) + actual = db_api.logbook_get('', id) - self.assertIsNotNone(actual) + self.assertIsNotNone(actual) - def test_logbook_get_workflows(self): - expected = self.wf_ids - actual = [] - wfs = db_api.logbook_get_workflows('', self.lb_ids[0]) + def test_logbook_get_workflows(self): + print '\nTesting logbook_get_workflows...' + expected = self.wf_ids + actual = [] + wfs = db_api.logbook_get_workflows('', self.lb_ids[0]) - for workflow in wfs: - actual.append(workflow.id) + for workflow in wfs: + actual.append(workflow.id) - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - self.assertRaises(exception.NotFound, db_api.logbook_get_workflows, '', 9001) + self.assertRaises(exception.NotFound, db_api.logbook_get_workflows, + '', 9001) - def test_logbook_add_workflow(self): - db_api.workflow_create('', u'workflow_10') - self.wf_ids.append(10) - self.wf_names.append(u'workflow_10') + def test_logbook_add_workflow(self): + print '\nTesting logbook_add_workflow...' + db_api.workflow_create('', u'workflow_10') + self.wf_ids.append(10) + self.wf_names.append(u'workflow_10') - expected = self.wf_ids - actual = [] - temp = db_api.logbook_add_workflow('', 1, u'workflow_10') + expected = self.wf_ids + actual = [] + temp = db_api.logbook_add_workflow('', 1, u'workflow_10') - for workflow in temp: - actual.append(workflow.id) + for workflow in temp: + actual.append(workflow.id) - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - self.assertRaises(exception.NotFound, db_api.logbook_add_workflow, '', 9001, u'workflow_10') - self.assertRaises(exception.NotFound, db_api.logbook_add_workflow, '', 1, u'workflow_9001') + self.assertRaises(exception.NotFound, db_api.logbook_add_workflow, '', + 9001, u'workflow_10') + self.assertRaises(exception.NotFound, db_api.logbook_add_workflow, '', + 1, u'workflow_9001') - def test_logbook_destroy(self): - id = self.lb_ids.pop() - db_api.logbook_destroy('', id) - self.lb_names.pop() + def test_logbook_destroy(self): + print '\nTesting logbook_destroy...' + id = self.lb_ids.pop() + db_api.logbook_destroy('', id) + self.lb_names.pop() + + self.assertRaises(exception.NotFound, db_api.logbook_get, '', id) - self.assertRaises(exception.NotFound, db_api.logbook_get, '', id) - """ WorkflowTest """ + + class WorkflowTest(unittest.TestCase): - tsk_ids = [] - tsk_names = [] - wf_ids = [] - wf_names = [] + tsk_ids = [] + tsk_names = [] + wf_ids = [] + wf_names = [] - @classmethod - def setUpClass(cls): - wf_fmt = u'workflow_{}' - tsk_fmt = u'task_{}' - for i in range(1, 10): - wf_tmp = db_api.workflow_create('', wf_fmt.format(i)) - tsk_tmp = db_api.task_create('', tsk_fmt.format(i), i, i) + @classmethod + def setUpClass(cls): + wf_fmt = u'workflow_{}' + tsk_fmt = u'task_{}' + for i in range(1, 10): + wf_tmp = db_api.workflow_create('', wf_fmt.format(i)) + tsk_tmp = db_api.task_create('', tsk_fmt.format(i), i, i) - db_api.workflow_add_task('', wf_fmt.format(i), i) - - cls.tsk_ids.append(i) - cls.tsk_names.append(tsk_fmt.format(i)) - cls.wf_ids.append(i) - cls.wf_names.append(wf_fmt.format(i)) - - @classmethod - def teardownClass(cls): - for id in cls.tsk_ids: - db_api.task_destroy('', id) - for name in cls.wf_names: - db_api.workflow_destroy('', name) - cls.tsk_ids = [] - cls.tsk_names = [] - cls.wf_ids = [] - cls.wf_names = [] - - def test_workflow_get(self): - expected = self.wf_ids[0] - actual = db_api.workflow_get('', self.wf_names[0]).id + db_api.workflow_add_task('', wf_fmt.format(i), i) - self.assertEquals(expected, actual) + cls.tsk_ids.append(i) + cls.tsk_names.append(tsk_fmt.format(i)) + cls.wf_ids.append(i) + cls.wf_names.append(wf_fmt.format(i)) - self.assertRaises(exception.NotFound, db_api.workflow_get, '', u'workflow_9001') + @classmethod + def teardownClass(cls): + for id in cls.tsk_ids: + db_api.task_destroy('', id) + for name in cls.wf_names: + db_api.workflow_destroy('', name) + cls.tsk_ids = [] + cls.tsk_names = [] + cls.wf_ids = [] + cls.wf_names = [] - def test_workflow_get_all(self): - expected = self.wf_ids - actual = [] - temp = db_api.workflow_get_all('') + def test_workflow_get(self): + print '\nTesting workflow_get...' + expected = self.wf_ids[0] + actual = db_api.workflow_get('', self.wf_names[0]).id - for workflow in temp: - actual.append(workflow.id) - - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - def test_workflow_get_names(self): - expected = [] - for name in self.wf_names: - expected.append(name) - expected = tuple(expected) - expected = [expected] - actual = db_api.workflow_get_names('') + self.assertRaises(exception.NotFound, db_api.workflow_get, '', + u'workflow_9001') - self.assertEquals(expected, actual) + def test_workflow_get_all(self): + print '\nTesting workflow_get_all...' + expected = self.wf_ids + actual = [] + temp = db_api.workflow_get_all('') - def test_workflow_get_tasks(self): - expected = [self.tsk_names[0], self.tsk_names[9]] - actual = [] - temp = db_api.workflow_get_tasks('', u'workflow_1') + for workflow in temp: + actual.append(workflow.id) - for task in temp: - actual.append(task.name) + self.assertEquals(expected, actual) - self.assertEquals(expected, actual) + def test_workflow_get_names(self): + print '\nTesting workflow_get_names...' + expected = [] + for name in self.wf_names: + expected.append(name) + expected = tuple(expected) + expected = [expected] + actual = db_api.workflow_get_names('') - self.assertRaises(exception.NotFound, db_api.workflow_get_tasks, '', u'workflow_9001') + self.assertEquals(expected, actual) - def test_workflow_add_task(self): - tsk_tmp = db_api.task_create('', u'task_10', 1, 10) - db_api.workflow_add_task('', u'workflow_1', 10) - self.tsk_ids.append(10) - self.tsk_names.append('task_10') - expected = [self.tsk_names[0], self.tsk_names[9]] - tsks = db_api.workflow_get_tasks('', u'workflow_1') - actual = [tsks[0].name, tsks[1].name] + def test_workflow_get_tasks(self): + print '\nTesting workflow_get_tasks...' + expected = [self.tsk_names[0], self.tsk_names[9]] + actual = [] + temp = db_api.workflow_get_tasks('', u'workflow_1') - self.assertEquals(expected, actual) + for task in temp: + actual.append(task.name) - self.assertRaises(exception.NotFound, db_api.workflow_add_task, '', u'workflow_9001', 10) - self.assertRaises(exception.NotFound, db_api.workflow_add_task, '', u'workflow_1', 9001) + self.assertEquals(expected, actual) - def test_workflow_create(self): - id = 0 - while (self.wf_ids.count(id) > 0): - id = id + 1 - wf_tmp = db_api.workflow_create('', u'workflow_{}'.format(id)) - self.wf_ids.append(id) - self.wf_names.append(u'workflow_{}'.format(id)) - - self.assertIsNotNone(db_api.workflow_get('', u'workflow_{}'.format(id))) + self.assertRaises(exception.NotFound, db_api.workflow_get_tasks, '', + u'workflow_9001') - def test_workflow_destroy(self): - name = self.wf_names.pop() - db_api.workflow_destroy('', name) - self.wf_ids.pop() + def test_workflow_add_task(self): + print '\nTesting workflow_add_task...' + tsk_tmp = db_api.task_create('', u'task_10', 1, 10) + db_api.workflow_add_task('', u'workflow_1', 10) + self.tsk_ids.append(10) + self.tsk_names.append('task_10') + expected = [self.tsk_names[0], self.tsk_names[9]] + tsks = db_api.workflow_get_tasks('', u'workflow_1') + actual = [tsks[0].name, tsks[1].name] - self.assertRaises(exception.NotFound, db_api.workflow_get, '', name) + self.assertEquals(expected, actual) + + self.assertRaises(exception.NotFound, db_api.workflow_add_task, '', + u'workflow_9001', 10) + self.assertRaises(exception.NotFound, db_api.workflow_add_task, '', + u'workflow_1', 9001) + + def test_workflow_create(self): + print '\nTesting workflow_create...' + id = 0 + while (self.wf_ids.count(id) > 0): + id = id + 1 + wf_tmp = db_api.workflow_create('', u'workflow_{}'.format(id)) + self.wf_ids.append(id) + self.wf_names.append(u'workflow_{}'.format(id)) + + self.assertIsNotNone(db_api.workflow_get('', + u'workflow_{}'.format(id))) + + def test_workflow_destroy(self): + print '\nTesting workflow_destroy...' + name = self.wf_names.pop() + db_api.workflow_destroy('', name) + self.wf_ids.pop() + + self.assertRaises(exception.NotFound, db_api.workflow_get, '', name) """ TaskTest """ + + class TaskTest(unittest.TestCase): - tsk_ids = [] - tsk_names = [] + tsk_ids = [] + tsk_names = [] - @classmethod - def setUpClass(cls): - tsk_fmt = u'task_{}' - for i in range(1, 10): - tsk_tmp = db_api.task_create('', tsk_fmt.format(i), i, i) - cls.tsk_ids.append(i) - cls.tsk_names.append(tsk_fmt.format(i)) + @classmethod + def setUpClass(cls): + tsk_fmt = u'task_{}' + for i in range(1, 10): + tsk_tmp = db_api.task_create('', tsk_fmt.format(i), i, i) + cls.tsk_ids.append(i) + cls.tsk_names.append(tsk_fmt.format(i)) - @classmethod - def teardownClass(cls): - for id in cls.tsk_ids: - db_api.task_destroy('', id) - cls.tsk_ids = [] - cls.tsk_names = [] + @classmethod + def teardownClass(cls): + for id in cls.tsk_ids: + db_api.task_destroy('', id) + cls.tsk_ids = [] + cls.tsk_names = [] - def test_task_get(self): - expected = self.tsk_names[0] - actual = db_api.task_get('', self.tsk_ids[0]) + def test_task_get(self): + print '\nTesting task_get...' + expected = self.tsk_names[0] + actual = db_api.task_get('', self.tsk_ids[0]) - self.assertEquals(expected, actual.name) + self.assertEquals(expected, actual.name) - self.assertRaises(exception.NotFound, db_api.task_get, '', 9001) + self.assertRaises(exception.NotFound, db_api.task_get, '', 9001) - def test_task_create(self): - id = 1 - while (self.tsk_ids.count(id) > 0): - id = id + 1 - tsk_tmp = db_api.task_create('', u'task_{}'.format(id), 1, id) - self.tsk_ids.append(id) - self.tsk_names.append(u'task_{}'.format(id)) + def test_task_create(self): + print '\nTesting task_create...' + id = 1 + while (self.tsk_ids.count(id) > 0): + id = id + 1 + tsk_tmp = db_api.task_create('', u'task_{}'.format(id), 1, id) + self.tsk_ids.append(id) + self.tsk_names.append(u'task_{}'.format(id)) - self.assertIsNotNone(db_api.task_get('', id)) + self.assertIsNotNone(db_api.task_get('', id)) - def test_task_update(self): - db_api.task_update('', 1, dict(exception='ExceptionTest', stacktrace='StacktraceTest')) - task = db_api.task_get('', 1) + def test_task_update(self): + print '\nTesting task_update...' + db_api.task_update('', 1, dict(exception='ExceptionTest', + stacktrace='StacktraceTest')) + task = db_api.task_get('', 1) - expected = 'ExceptionTest' - actual = task.exception + expected = 'ExceptionTest' + actual = task.exception - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - expected = 'StacktraceTest' - actual = task.stacktrace + expected = 'StacktraceTest' + actual = task.stacktrace - self.assertEquals(expected, actual) + self.assertEquals(expected, actual) - self.assertRaises(exception.NotFound, db_api.task_update, '', 9001, dict(exception='ExceptionTest', stacktrace='StacktraceTest')) + self.assertRaises(exception.NotFound, db_api.task_update, '', 9001, + dict(exception='ExceptionTest', stacktrace='StacktraceTest')) - def test_task_destroy(self): - id = self.tsk_ids.pop() - db_api.task_destroy('', id) - self.tsk_names.pop() + def test_task_destroy(self): + print '\nTesting task_destroy...' + id = self.tsk_ids.pop() + db_api.task_destroy('', id) + self.tsk_names.pop() - self.assertRaises(exception.NotFound, db_api.task_get, '', id) + self.assertRaises(exception.NotFound, db_api.task_get, '', id)