Files
deb-python-taskflow/taskflow/tests/unit/test_db_api.py
2013-05-28 16:11:46 -05:00

444 lines
12 KiB
Python

""" INSERT HEADER HERE """
"""Import required libraries"""
import os
import unittest
from os import path
from oslo.config import cfg
from taskflow import states
from taskflow.db import api as db_api
from taskflow.db.sqlalchemy import models
from taskflow.db.sqlalchemy.session import get_session
from taskflow.openstack.common import exception
# Initialise the DB API, then point it at a throwaway SQLite file
# ('test.db', a relative path, so it lives in the current working directory).
# NOTE(review): SQL_CONNECTION is assigned *after* configure() has already
# run -- confirm that configure() does not need the connection string.
db_api.configure()
db_api.SQL_CONNECTION = 'sqlite:///test.db'
def setUpModule():
    """Create the database tables unless 'test.db' is already on disk."""
    database_present = path.isfile('test.db')
    if database_present:
        # An existing file is reused as-is; no tables are (re)created.
        return
    models.create_tables()
def tearDownModule():
    """Delete the 'test.db' SQLite file once the module's tests are done.

    The file is only removed when it is actually present, mirroring the
    ``path.isfile`` guard in ``setUpModule``.  A missing file therefore no
    longer turns module cleanup into an ``OSError``; any other failure of
    ``os.remove`` (e.g. a permission problem) still propagates.
    """
    if path.isfile('test.db'):
        os.remove('test.db')
"""
JobTest
"""
class JobTest(unittest.TestCase):
    """Tests for the ``job_*`` functions of ``taskflow.db.api``.

    The class-level lists track the rows this class has created so that
    ``tearDownClass`` can destroy them again.  The tests append to and pop
    from these shared lists and modify the shared rows, so they are not
    independent: e.g. ``test_job_get_owner`` expects no owner while
    ``test_job_update`` sets one, which only holds in unittest's default
    (alphabetical) method order.
    """

    # NOTE(review): every db_api call receives '' as its first argument,
    # presumably a context placeholder -- confirm against taskflow.db.api.

    # Ids / names of the workflows, logbooks and jobs currently owned by
    # this class (shared, mutated by the tests).
    wf_ids = []
    wf_names = []
    lb_ids = []
    lb_names = []
    job_ids = []
    job_names = []

    @classmethod
    def setUpClass(cls):
        """Create logbook_1, job_1 and workflow_1..workflow_9.

        Each workflow is added to logbook 1 and to job 1.  The workflows
        are recorded with ids 1-9 (assumes the backend numbers them in
        creation order starting at 1 -- the tests compare against these).
        """
        db_api.logbook_create('', u'logbook_1', 1)
        cls.lb_ids.append(1)
        cls.lb_names.append(u'logbook_1')

        db_api.job_create('', u'job_1', 1)
        cls.job_ids.append(1)
        cls.job_names.append(u'job_1')

        for index in range(1, 10):
            wf_name = u'workflow_{}'.format(index)
            db_api.workflow_create('', wf_name)
            db_api.logbook_add_workflow('', 1, wf_name)
            db_api.job_add_workflow('', 1, wf_name)
            cls.wf_ids.append(index)
            cls.wf_names.append(wf_name)

    @classmethod
    def tearDownClass(cls):
        """Destroy every tracked workflow, logbook and job; reset the lists."""
        for wf_name in cls.wf_names:
            db_api.workflow_destroy('', wf_name)
        for lb_id in cls.lb_ids:
            db_api.logbook_destroy('', lb_id)
        for job_id in cls.job_ids:
            db_api.job_destroy('', job_id)

        cls.wf_ids = []
        cls.wf_names = []
        cls.lb_ids = []
        cls.lb_names = []
        cls.job_ids = []
        cls.job_names = []

    def test_job_get(self):
        """job_get returns the first tracked job; unknown id -> NotFound."""
        print('\nTesting job_get...')
        job = db_api.job_get('', self.job_ids[0])
        self.assertEqual(self.job_names[0], job.name)

        self.assertRaises(exception.NotFound, db_api.job_get, '', 9001)

    def test_job_update(self):
        """job_update stores owner and state; unknown id -> NotFound."""
        print('\nTesting job_update...')
        db_api.job_update('', 1,
                          dict(owner='OwnerTest', state=states.CLAIMED))
        job = db_api.job_get('', 1)
        self.assertEqual('OwnerTest', job.owner)
        self.assertEqual(states.CLAIMED, job.state)

        self.assertRaises(exception.NotFound, db_api.job_update, '', 9001,
                          dict(owner='OwnerTest', state=states.CLAIMED))

    def test_job_add_workflow(self):
        """job_add_workflow returns all workflows now attached to job 1."""
        print('\nTesting job_add_workflow...')
        db_api.workflow_create('', u'workflow_10')
        # Track the new workflow so tearDownClass destroys it as well.
        self.wf_ids.append(10)
        self.wf_names.append(u'workflow_10')

        attached = db_api.job_add_workflow('', 1, u'workflow_10')
        self.assertEqual(self.wf_ids,
                         [workflow.id for workflow in attached])

        # Both an unknown job id and an unknown workflow name must fail.
        self.assertRaises(exception.NotFound, db_api.job_add_workflow, '',
                          9001, u'workflow_10')
        self.assertRaises(exception.NotFound, db_api.job_add_workflow, '',
                          1, u'workflow_9001')

    def test_job_get_owner(self):
        """job_get_owner is None for job 1; unknown id -> NotFound."""
        print('\nTesting job_get_owner...')
        self.assertIsNone(db_api.job_get_owner('', 1))

        self.assertRaises(exception.NotFound, db_api.job_get_owner, '', 9001)

    def test_job_get_state(self):
        """job_get_state is UNCLAIMED for job 1; unknown id -> NotFound."""
        print('\nTesting job_get_state...')
        self.assertEqual(states.UNCLAIMED, db_api.job_get_state('', 1))

        self.assertRaises(exception.NotFound, db_api.job_get_state, '', 9001)

    def test_job_get_logbook(self):
        """job_get_logbook returns logbook_1 for job 1."""
        print('\nTesting job_get_logbook...')
        logbook = db_api.job_get_logbook('', 1)
        self.assertEqual(self.lb_names[0], logbook.name)

        self.assertRaises(exception.NotFound, db_api.job_get_logbook, '',
                          9001)

    def test_job_create(self):
        """job_create stores a job under the lowest untracked id."""
        print('\nTesting job_create...')
        new_id = 1
        while new_id in self.job_ids:
            new_id += 1

        job_name = u'job_{}'.format(new_id)
        db_api.job_create('', job_name, new_id)
        self.job_ids.append(new_id)
        self.job_names.append(job_name)

        self.assertIsNotNone(db_api.job_get('', new_id))

    def test_job_destroy(self):
        """job_destroy removes the most recently tracked job."""
        print('\nTesting job_destroy...')
        doomed_id = self.job_ids.pop()
        db_api.job_destroy('', doomed_id)
        # Keep the name list in step with the id list.
        self.job_names.pop()

        self.assertRaises(exception.NotFound, db_api.job_get, '', doomed_id)
"""
LogBookTest
"""
class LogBookTest(unittest.TestCase):
    """Tests for the ``logbook_*`` functions of ``taskflow.db.api``.

    The class-level lists track the rows this class has created so that
    ``tearDownClass`` can destroy them again.  The tests mutate these
    shared lists (and the shared rows), so they depend on one another and
    on unittest's default (alphabetical) method order.
    """

    # NOTE(review): every db_api call receives '' as its first argument,
    # presumably a context placeholder -- confirm against taskflow.db.api.

    # Ids / names of the workflows and logbooks currently owned by this
    # class (shared, mutated by the tests).
    wf_ids = []
    wf_names = []
    lb_ids = []
    lb_names = []

    @classmethod
    def setUpClass(cls):
        """Create logbook_1 and workflow_1..workflow_9 added to logbook 1.

        The workflows are recorded with ids 1-9 (assumes the backend
        numbers them in creation order starting at 1 -- the tests compare
        against these).
        """
        db_api.logbook_create('', u'logbook_1', 1)
        cls.lb_ids.append(1)
        cls.lb_names.append(u'logbook_1')

        for index in range(1, 10):
            wf_name = u'workflow_{}'.format(index)
            db_api.workflow_create('', wf_name)
            db_api.logbook_add_workflow('', 1, wf_name)
            cls.wf_ids.append(index)
            cls.wf_names.append(wf_name)

    @classmethod
    def tearDownClass(cls):
        """Destroy every tracked workflow and logbook; reset the lists."""
        for wf_name in cls.wf_names:
            db_api.workflow_destroy('', wf_name)
        for lb_id in cls.lb_ids:
            db_api.logbook_destroy('', lb_id)

        cls.wf_ids = []
        cls.wf_names = []
        cls.lb_ids = []
        cls.lb_names = []

    def test_logbook_get(self):
        """logbook_get returns the first tracked logbook."""
        print('\nTesting logbook_get...')
        logbook = db_api.logbook_get('', self.lb_ids[0])
        self.assertEqual(self.lb_names[0], logbook.name)

        self.assertRaises(exception.NotFound, db_api.logbook_get, '', 9001)

    def test_logbook_get_by_name(self):
        """logbook_get_by_name yields exactly the first tracked logbook."""
        print('\nTesting logbook_get_by_name...')
        matches = db_api.logbook_get_by_name('', self.lb_names[0])
        self.assertEqual([self.lb_ids[0]],
                         [logbook.id for logbook in matches])

        self.assertRaises(exception.NotFound, db_api.logbook_get_by_name, '',
                          u'logbook_9001')

    def test_logbook_create(self):
        """logbook_create stores a logbook under the lowest untracked id."""
        print('\nTesting logbook_create...')
        new_id = 1
        while new_id in self.lb_ids:
            new_id += 1

        lb_name = u'logbook_{}'.format(new_id)
        db_api.logbook_create('', lb_name, new_id)
        self.lb_ids.append(new_id)
        self.lb_names.append(lb_name)

        self.assertIsNotNone(db_api.logbook_get('', new_id))

    def test_logbook_get_workflows(self):
        """logbook_get_workflows returns every tracked workflow, in order."""
        print('\nTesting logbook_get_workflows...')
        workflows = db_api.logbook_get_workflows('', self.lb_ids[0])
        self.assertEqual(self.wf_ids,
                         [workflow.id for workflow in workflows])

        self.assertRaises(exception.NotFound, db_api.logbook_get_workflows,
                          '', 9001)

    def test_logbook_add_workflow(self):
        """logbook_add_workflow returns all workflows now in logbook 1."""
        print('\nTesting logbook_add_workflow...')
        db_api.workflow_create('', u'workflow_10')
        # Track the new workflow so tearDownClass destroys it as well.
        self.wf_ids.append(10)
        self.wf_names.append(u'workflow_10')

        attached = db_api.logbook_add_workflow('', 1, u'workflow_10')
        self.assertEqual(self.wf_ids,
                         [workflow.id for workflow in attached])

        # Both an unknown logbook id and an unknown workflow name must fail.
        self.assertRaises(exception.NotFound, db_api.logbook_add_workflow,
                          '', 9001, u'workflow_10')
        self.assertRaises(exception.NotFound, db_api.logbook_add_workflow,
                          '', 1, u'workflow_9001')

    def test_logbook_destroy(self):
        """logbook_destroy removes the most recently tracked logbook."""
        print('\nTesting logbook_destroy...')
        doomed_id = self.lb_ids.pop()
        db_api.logbook_destroy('', doomed_id)
        # Keep the name list in step with the id list.
        self.lb_names.pop()

        self.assertRaises(exception.NotFound, db_api.logbook_get, '',
                          doomed_id)
"""
WorkflowTest
"""
class WorkflowTest(unittest.TestCase):
    """Tests for the ``workflow_*`` functions of ``taskflow.db.api``.

    The class-level lists track the tasks and workflows this class has
    created so that ``tearDownClass`` can destroy them again.  The tests
    mutate these shared lists (and the shared rows), so they depend on one
    another and on unittest's default (alphabetical) method order.
    """

    # NOTE(review): every db_api call receives '' as its first argument,
    # presumably a context placeholder -- confirm against taskflow.db.api.

    # Ids / names of the tasks and workflows currently owned by this class
    # (shared, mutated by the tests).
    tsk_ids = []
    tsk_names = []
    wf_ids = []
    wf_names = []

    @classmethod
    def setUpClass(cls):
        """Create workflow_i and task_i for i in 1..9; add task i to it.

        The workflows are recorded with ids 1-9 (assumes the backend
        numbers them in creation order starting at 1 -- the tests compare
        against these).
        """
        for index in range(1, 10):
            wf_name = u'workflow_{}'.format(index)
            tsk_name = u'task_{}'.format(index)
            db_api.workflow_create('', wf_name)
            db_api.task_create('', tsk_name, index, index)
            db_api.workflow_add_task('', wf_name, index)
            cls.tsk_ids.append(index)
            cls.tsk_names.append(tsk_name)
            cls.wf_ids.append(index)
            cls.wf_names.append(wf_name)

    @classmethod
    def tearDownClass(cls):
        """Destroy every tracked task and workflow; reset the lists.

        This fixture used to be spelled ``teardownClass`` (which unittest
        never invokes) and read the lists without the ``cls.`` prefix, so
        the rows created here were never cleaned up.
        """
        for task_id in cls.tsk_ids:
            db_api.task_destroy('', task_id)
        for wf_name in cls.wf_names:
            db_api.workflow_destroy('', wf_name)

        cls.tsk_ids = []
        cls.tsk_names = []
        cls.wf_ids = []
        cls.wf_names = []

    def test_workflow_get(self):
        """workflow_get returns the first tracked workflow by name."""
        print('\nTesting workflow_get...')
        workflow = db_api.workflow_get('', self.wf_names[0])
        self.assertEqual(self.wf_ids[0], workflow.id)

        self.assertRaises(exception.NotFound, db_api.workflow_get, '',
                          u'workflow_9001')

    def test_workflow_get_all(self):
        """workflow_get_all returns exactly the tracked workflows."""
        print('\nTesting workflow_get_all...')
        workflows = db_api.workflow_get_all('')
        self.assertEqual(self.wf_ids,
                         [workflow.id for workflow in workflows])

    def test_workflow_get_names(self):
        """workflow_get_names returns one tuple holding all tracked names."""
        print('\nTesting workflow_get_names...')
        expected = [tuple(self.wf_names)]
        self.assertEqual(expected, db_api.workflow_get_names(''))

    def test_workflow_get_tasks(self):
        """workflow_get_tasks lists the tasks attached to workflow_1."""
        print('\nTesting workflow_get_tasks...')
        # Index 9 only exists once test_workflow_add_task has appended
        # 'task_10'; this test relies on that test having run first.
        expected = [self.tsk_names[0], self.tsk_names[9]]
        tasks = db_api.workflow_get_tasks('', u'workflow_1')
        self.assertEqual(expected, [task.name for task in tasks])

        self.assertRaises(exception.NotFound, db_api.workflow_get_tasks, '',
                          u'workflow_9001')

    def test_workflow_add_task(self):
        """workflow_add_task attaches a second task to workflow_1."""
        print('\nTesting workflow_add_task...')
        db_api.task_create('', u'task_10', 1, 10)
        db_api.workflow_add_task('', u'workflow_1', 10)
        # Track the new task so tearDownClass destroys it as well.
        self.tsk_ids.append(10)
        self.tsk_names.append('task_10')

        expected = [self.tsk_names[0], self.tsk_names[9]]
        tasks = db_api.workflow_get_tasks('', u'workflow_1')
        self.assertEqual(expected, [tasks[0].name, tasks[1].name])

        # Both an unknown workflow name and an unknown task id must fail.
        self.assertRaises(exception.NotFound, db_api.workflow_add_task, '',
                          u'workflow_9001', 10)
        self.assertRaises(exception.NotFound, db_api.workflow_add_task, '',
                          u'workflow_1', 9001)

    def test_workflow_create(self):
        """workflow_create stores a workflow named after an untracked id."""
        print('\nTesting workflow_create...')
        # The search deliberately starts at 0, as in the original test.
        new_id = 0
        while new_id in self.wf_ids:
            new_id += 1

        wf_name = u'workflow_{}'.format(new_id)
        db_api.workflow_create('', wf_name)
        self.wf_ids.append(new_id)
        self.wf_names.append(wf_name)

        self.assertIsNotNone(db_api.workflow_get('', wf_name))

    def test_workflow_destroy(self):
        """workflow_destroy removes the most recently tracked workflow."""
        print('\nTesting workflow_destroy...')
        doomed_name = self.wf_names.pop()
        db_api.workflow_destroy('', doomed_name)
        # Keep the id list in step with the name list.
        self.wf_ids.pop()

        self.assertRaises(exception.NotFound, db_api.workflow_get, '',
                          doomed_name)
"""
TaskTest
"""
class TaskTest(unittest.TestCase):
    """Tests for the ``task_*`` functions of ``taskflow.db.api``.

    The class-level lists track the tasks this class has created so that
    ``tearDownClass`` can destroy them again.  The tests mutate these
    shared lists (and the shared rows), so they depend on one another and
    on unittest's default (alphabetical) method order.
    """

    # NOTE(review): every db_api call receives '' as its first argument,
    # presumably a context placeholder -- confirm against taskflow.db.api.

    # Ids / names of the tasks currently owned by this class (shared,
    # mutated by the tests).
    tsk_ids = []
    tsk_names = []

    @classmethod
    def setUpClass(cls):
        """Create task_1..task_9 and record their ids and names."""
        for index in range(1, 10):
            tsk_name = u'task_{}'.format(index)
            db_api.task_create('', tsk_name, index, index)
            cls.tsk_ids.append(index)
            cls.tsk_names.append(tsk_name)

    @classmethod
    def tearDownClass(cls):
        """Destroy every tracked task and reset the tracking lists.

        This fixture used to be spelled ``teardownClass`` (which unittest
        never invokes) and read ``tsk_ids`` without the ``cls.`` prefix, so
        the tasks created here were never cleaned up.
        """
        for task_id in cls.tsk_ids:
            db_api.task_destroy('', task_id)

        cls.tsk_ids = []
        cls.tsk_names = []

    def test_task_get(self):
        """task_get returns the first tracked task; unknown id -> NotFound."""
        print('\nTesting task_get...')
        task = db_api.task_get('', self.tsk_ids[0])
        self.assertEqual(self.tsk_names[0], task.name)

        self.assertRaises(exception.NotFound, db_api.task_get, '', 9001)

    def test_task_create(self):
        """task_create stores a task under the lowest untracked id."""
        print('\nTesting task_create...')
        new_id = 1
        while new_id in self.tsk_ids:
            new_id += 1

        tsk_name = u'task_{}'.format(new_id)
        db_api.task_create('', tsk_name, 1, new_id)
        self.tsk_ids.append(new_id)
        self.tsk_names.append(tsk_name)

        self.assertIsNotNone(db_api.task_get('', new_id))

    def test_task_update(self):
        """task_update stores exception and stacktrace on task 1."""
        print('\nTesting task_update...')
        db_api.task_update('', 1, dict(exception='ExceptionTest',
                                       stacktrace='StacktraceTest'))
        task = db_api.task_get('', 1)
        self.assertEqual('ExceptionTest', task.exception)
        self.assertEqual('StacktraceTest', task.stacktrace)

        self.assertRaises(exception.NotFound, db_api.task_update, '', 9001,
                          dict(exception='ExceptionTest',
                               stacktrace='StacktraceTest'))

    def test_task_destroy(self):
        """task_destroy removes the most recently tracked task."""
        print('\nTesting task_destroy...')
        doomed_id = self.tsk_ids.pop()
        db_api.task_destroy('', doomed_id)
        # Keep the name list in step with the id list.
        self.tsk_names.pop()

        self.assertRaises(exception.NotFound, db_api.task_get, '', doomed_id)