This commit is contained in:
Kevin Chen
2013-05-30 11:32:21 -05:00

View File

@@ -31,404 +31,453 @@ from taskflow.openstack.common import exception
db_api.configure() db_api.configure()
db_api.SQL_CONNECTION = 'sqlite:///test.db' db_api.SQL_CONNECTION = 'sqlite:///test.db'
def setUpModule(): def setUpModule():
if not path.isfile('test.db'): if not path.isfile('test.db'):
models.create_tables() models.create_tables()
def tearDownModule(): def tearDownModule():
os.remove('test.db') os.remove('test.db')
""" """
JobTest JobTest
""" """
class JobTest(unittest.TestCase): class JobTest(unittest.TestCase):
wf_ids = [] wf_ids = []
wf_names = [] wf_names = []
lb_ids = [] lb_ids = []
lb_names = [] lb_names = []
job_ids = [] job_ids = []
job_names = [] job_names = []
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
wf_fmt = u'workflow_{}' wf_fmt = u'workflow_{}'
lb_tmp = db_api.logbook_create('', u'logbook_1', 1) lb_tmp = db_api.logbook_create('', u'logbook_1', 1)
cls.lb_ids.append(1) cls.lb_ids.append(1)
cls.lb_names.append(u'logbook_1') cls.lb_names.append(u'logbook_1')
job_tmp = db_api.job_create('', u'job_1', 1) job_tmp = db_api.job_create('', u'job_1', 1)
cls.job_ids.append(1) cls.job_ids.append(1)
cls.job_names.append(u'job_1') cls.job_names.append(u'job_1')
for i in range(1, 10): for i in range(1, 10):
wf_tmp = db_api.workflow_create('', wf_fmt.format(i)) wf_tmp = db_api.workflow_create('', wf_fmt.format(i))
db_api.logbook_add_workflow('', 1, wf_fmt.format(i)) db_api.logbook_add_workflow('', 1, wf_fmt.format(i))
db_api.job_add_workflow('', 1, wf_fmt.format(i)) db_api.job_add_workflow('', 1, wf_fmt.format(i))
cls.wf_ids.append(i) cls.wf_ids.append(i)
cls.wf_names.append(wf_fmt.format(i)) cls.wf_names.append(wf_fmt.format(i))
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
for name in cls.wf_names: for name in cls.wf_names:
db_api.workflow_destroy('', name) db_api.workflow_destroy('', name)
for id in cls.lb_ids: for id in cls.lb_ids:
db_api.logbook_destroy('', id) db_api.logbook_destroy('', id)
for id in cls.job_ids: for id in cls.job_ids:
db_api.job_destroy('', id) db_api.job_destroy('', id)
cls.wf_ids = [] cls.wf_ids = []
cls.wf_names = [] cls.wf_names = []
cls.lb_ids = [] cls.lb_ids = []
cls.lb_names = [] cls.lb_names = []
cls.job_ids = [] cls.job_ids = []
cls.job_names = [] cls.job_names = []
def test_job_get(self): def test_job_get(self):
expected = self.job_names[0] print '\nTesting job_get...'
actual = db_api.job_get('', self.job_ids[0]).name expected = self.job_names[0]
actual = db_api.job_get('', self.job_ids[0]).name
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.job_get, '', 9001) self.assertRaises(exception.NotFound, db_api.job_get, '', 9001)
def test_job_update(self): def test_job_update(self):
db_api.job_update('', 1, dict(owner='OwnerTest', state=states.CLAIMED)) print '\nTesting job_update...'
job = db_api.job_get('', 1) db_api.job_update('', 1, dict(owner='OwnerTest', state=states.CLAIMED))
job = db_api.job_get('', 1)
expected = 'OwnerTest' expected = 'OwnerTest'
actual = job.owner actual = job.owner
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
expected = states.CLAIMED expected = states.CLAIMED
actual = job.state actual = job.state
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.job_update, '', 9001, dict(owner='OwnerTest', state=states.CLAIMED)) self.assertRaises(exception.NotFound, db_api.job_update, '', 9001,
dict(owner='OwnerTest', state=states.CLAIMED))
def test_job_add_workflow(self): def test_job_add_workflow(self):
db_api.workflow_create('', u'workflow_10') print '\nTesting job_add_workflow...'
self.wf_ids.append(10) db_api.workflow_create('', u'workflow_10')
self.wf_names.append(u'workflow_10') self.wf_ids.append(10)
self.wf_names.append(u'workflow_10')
expected = self.wf_ids expected = self.wf_ids
actual = [] actual = []
temp = db_api.job_add_workflow('', 1, u'workflow_10') temp = db_api.job_add_workflow('', 1, u'workflow_10')
for workflow in temp: for workflow in temp:
actual.append(workflow.id) actual.append(workflow.id)
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.job_add_workflow, '', 9001, u'workflow_10') self.assertRaises(exception.NotFound, db_api.job_add_workflow, '',
self.assertRaises(exception.NotFound, db_api.job_add_workflow, '', 1, u'workflow_9001') 9001, u'workflow_10')
self.assertRaises(exception.NotFound, db_api.job_add_workflow, '',
1, u'workflow_9001')
def test_job_get_owner(self): def test_job_get_owner(self):
actual = db_api.job_get_owner('', 1) print '\nTesting job_get_owner...'
actual = db_api.job_get_owner('', 1)
self.assertIsNone(actual) self.assertIsNone(actual)
self.assertRaises(exception.NotFound, db_api.job_get_owner, '', 9001) self.assertRaises(exception.NotFound, db_api.job_get_owner, '', 9001)
def test_job_get_state(self): def test_job_get_state(self):
expected = states.UNCLAIMED print '\nTesting job_get_state...'
actual = db_api.job_get_state('', 1) expected = states.UNCLAIMED
actual = db_api.job_get_state('', 1)
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.job_get_state, '', 9001) self.assertRaises(exception.NotFound, db_api.job_get_state, '', 9001)
def test_job_get_logbook(self): def test_job_get_logbook(self):
expected = self.lb_names[0] print '\nTesting job_get_logbook...'
actual = db_api.job_get_logbook('', 1).name expected = self.lb_names[0]
actual = db_api.job_get_logbook('', 1).name
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.job_get_logbook, '', 9001) self.assertRaises(exception.NotFound, db_api.job_get_logbook, '', 9001)
def test_job_create(self): def test_job_create(self):
id = 1 print '\nTesting job_create...'
while (self.job_ids.count(id) > 0): id = 1
id = id + 1 while (self.job_ids.count(id) > 0):
job_tmp = db_api.job_create('', u'job_{}'.format(id), id) id = id + 1
self.job_ids.append(id) job_tmp = db_api.job_create('', u'job_{}'.format(id), id)
self.job_names.append(u'job_{}'.format(id)) self.job_ids.append(id)
self.job_names.append(u'job_{}'.format(id))
actual = db_api.job_get('', id) actual = db_api.job_get('', id)
self.assertIsNotNone(actual) self.assertIsNotNone(actual)
def test_job_destroy(self): def test_job_destroy(self):
id = self.job_ids.pop() print '\nTesting job_destroy...'
db_api.job_destroy('', id) id = self.job_ids.pop()
self.job_names.pop() db_api.job_destroy('', id)
self.job_names.pop()
self.assertRaises(exception.NotFound, db_api.job_get, '', id) self.assertRaises(exception.NotFound, db_api.job_get, '', id)
""" """
LogBookTest LogBookTest
""" """
class LogBookTest(unittest.TestCase): class LogBookTest(unittest.TestCase):
wf_ids = [] wf_ids = []
wf_names = [] wf_names = []
lb_ids = [] lb_ids = []
lb_names = [] lb_names = []
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
wf_fmt = u'workflow_{}' wf_fmt = u'workflow_{}'
lb_tmp = db_api.logbook_create('', u'logbook_1', 1) lb_tmp = db_api.logbook_create('', u'logbook_1', 1)
cls.lb_ids.append(1) cls.lb_ids.append(1)
cls.lb_names.append(u'logbook_1') cls.lb_names.append(u'logbook_1')
for i in range(1, 10): for i in range(1, 10):
wf_tmp = db_api.workflow_create('', wf_fmt.format(i)) wf_tmp = db_api.workflow_create('', wf_fmt.format(i))
db_api.logbook_add_workflow('', 1, wf_fmt.format(i)) db_api.logbook_add_workflow('', 1, wf_fmt.format(i))
cls.wf_ids.append(i) cls.wf_ids.append(i)
cls.wf_names.append(wf_fmt.format(i)) cls.wf_names.append(wf_fmt.format(i))
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
for name in cls.wf_names: for name in cls.wf_names:
db_api.workflow_destroy('', name) db_api.workflow_destroy('', name)
for id in cls.lb_ids: for id in cls.lb_ids:
db_api.logbook_destroy('', id) db_api.logbook_destroy('', id)
cls.wf_ids = [] cls.wf_ids = []
cls.wf_names = [] cls.wf_names = []
cls.lb_ids = [] cls.lb_ids = []
cls.lb_names = [] cls.lb_names = []
def test_logbook_get(self): def test_logbook_get(self):
expected = self.lb_names[0] print '\nTesting logbook_get...'
actual = db_api.logbook_get('', self.lb_ids[0]).name expected = self.lb_names[0]
actual = db_api.logbook_get('', self.lb_ids[0]).name
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.logbook_get, '', 9001) self.assertRaises(exception.NotFound, db_api.logbook_get, '', 9001)
def test_logbook_get_by_name(self): def test_logbook_get_by_name(self):
expected = [self.lb_ids[0]] print '\nTesting logbook_get_by_name...'
actual = [] expected = [self.lb_ids[0]]
for logbook in db_api.logbook_get_by_name('', self.lb_names[0]): actual = []
actual.append(logbook.id) for logbook in db_api.logbook_get_by_name('', self.lb_names[0]):
actual.append(logbook.id)
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.logbook_get_by_name, '', u'logbook_9001') self.assertRaises(exception.NotFound, db_api.logbook_get_by_name, '',
u'logbook_9001')
def test_logbook_create(self): def test_logbook_create(self):
id = 1 print '\nTesting logbook_create...'
while (self.lb_ids.count(id) > 0): id = 1
id = id + 1 while (self.lb_ids.count(id) > 0):
lb_tmp = db_api.logbook_create('', u'logbook_{}'.format(id), id) id = id + 1
self.lb_ids.append(id) lb_tmp = db_api.logbook_create('', u'logbook_{}'.format(id), id)
self.lb_names.append(u'logbook_{}'.format(id)) self.lb_ids.append(id)
self.lb_names.append(u'logbook_{}'.format(id))
actual = db_api.logbook_get('', id) actual = db_api.logbook_get('', id)
self.assertIsNotNone(actual) self.assertIsNotNone(actual)
def test_logbook_get_workflows(self): def test_logbook_get_workflows(self):
expected = self.wf_ids print '\nTesting logbook_get_workflows...'
actual = [] expected = self.wf_ids
wfs = db_api.logbook_get_workflows('', self.lb_ids[0]) actual = []
wfs = db_api.logbook_get_workflows('', self.lb_ids[0])
for workflow in wfs: for workflow in wfs:
actual.append(workflow.id) actual.append(workflow.id)
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.logbook_get_workflows, '', 9001) self.assertRaises(exception.NotFound, db_api.logbook_get_workflows,
'', 9001)
def test_logbook_add_workflow(self): def test_logbook_add_workflow(self):
db_api.workflow_create('', u'workflow_10') print '\nTesting logbook_add_workflow...'
self.wf_ids.append(10) db_api.workflow_create('', u'workflow_10')
self.wf_names.append(u'workflow_10') self.wf_ids.append(10)
self.wf_names.append(u'workflow_10')
expected = self.wf_ids expected = self.wf_ids
actual = [] actual = []
temp = db_api.logbook_add_workflow('', 1, u'workflow_10') temp = db_api.logbook_add_workflow('', 1, u'workflow_10')
for workflow in temp: for workflow in temp:
actual.append(workflow.id) actual.append(workflow.id)
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.logbook_add_workflow, '', 9001, u'workflow_10') self.assertRaises(exception.NotFound, db_api.logbook_add_workflow, '',
self.assertRaises(exception.NotFound, db_api.logbook_add_workflow, '', 1, u'workflow_9001') 9001, u'workflow_10')
self.assertRaises(exception.NotFound, db_api.logbook_add_workflow, '',
1, u'workflow_9001')
def test_logbook_destroy(self): def test_logbook_destroy(self):
id = self.lb_ids.pop() print '\nTesting logbook_destroy...'
db_api.logbook_destroy('', id) id = self.lb_ids.pop()
self.lb_names.pop() db_api.logbook_destroy('', id)
self.lb_names.pop()
self.assertRaises(exception.NotFound, db_api.logbook_get, '', id) self.assertRaises(exception.NotFound, db_api.logbook_get, '', id)
""" """
WorkflowTest WorkflowTest
""" """
class WorkflowTest(unittest.TestCase): class WorkflowTest(unittest.TestCase):
tsk_ids = [] tsk_ids = []
tsk_names = [] tsk_names = []
wf_ids = [] wf_ids = []
wf_names = [] wf_names = []
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
wf_fmt = u'workflow_{}' wf_fmt = u'workflow_{}'
tsk_fmt = u'task_{}' tsk_fmt = u'task_{}'
for i in range(1, 10): for i in range(1, 10):
wf_tmp = db_api.workflow_create('', wf_fmt.format(i)) wf_tmp = db_api.workflow_create('', wf_fmt.format(i))
tsk_tmp = db_api.task_create('', tsk_fmt.format(i), i, i) tsk_tmp = db_api.task_create('', tsk_fmt.format(i), i, i)
db_api.workflow_add_task('', wf_fmt.format(i), i) db_api.workflow_add_task('', wf_fmt.format(i), i)
cls.tsk_ids.append(i) cls.tsk_ids.append(i)
cls.tsk_names.append(tsk_fmt.format(i)) cls.tsk_names.append(tsk_fmt.format(i))
cls.wf_ids.append(i) cls.wf_ids.append(i)
cls.wf_names.append(wf_fmt.format(i)) cls.wf_names.append(wf_fmt.format(i))
@classmethod @classmethod
def teardownClass(cls): def teardownClass(cls):
for id in cls.tsk_ids: for id in cls.tsk_ids:
db_api.task_destroy('', id) db_api.task_destroy('', id)
for name in cls.wf_names: for name in cls.wf_names:
db_api.workflow_destroy('', name) db_api.workflow_destroy('', name)
cls.tsk_ids = [] cls.tsk_ids = []
cls.tsk_names = [] cls.tsk_names = []
cls.wf_ids = [] cls.wf_ids = []
cls.wf_names = [] cls.wf_names = []
def test_workflow_get(self): def test_workflow_get(self):
expected = self.wf_ids[0] print '\nTesting workflow_get...'
actual = db_api.workflow_get('', self.wf_names[0]).id expected = self.wf_ids[0]
actual = db_api.workflow_get('', self.wf_names[0]).id
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.workflow_get, '', u'workflow_9001') self.assertRaises(exception.NotFound, db_api.workflow_get, '',
u'workflow_9001')
def test_workflow_get_all(self): def test_workflow_get_all(self):
expected = self.wf_ids print '\nTesting workflow_get_all...'
actual = [] expected = self.wf_ids
temp = db_api.workflow_get_all('') actual = []
temp = db_api.workflow_get_all('')
for workflow in temp: for workflow in temp:
actual.append(workflow.id) actual.append(workflow.id)
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
def test_workflow_get_names(self): def test_workflow_get_names(self):
expected = [] print '\nTesting workflow_get_names...'
for name in self.wf_names: expected = []
expected.append(name) for name in self.wf_names:
expected = tuple(expected) expected.append(name)
expected = [expected] expected = tuple(expected)
actual = db_api.workflow_get_names('') expected = [expected]
actual = db_api.workflow_get_names('')
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
def test_workflow_get_tasks(self): def test_workflow_get_tasks(self):
expected = [self.tsk_names[0], self.tsk_names[9]] print '\nTesting workflow_get_tasks...'
actual = [] expected = [self.tsk_names[0], self.tsk_names[9]]
temp = db_api.workflow_get_tasks('', u'workflow_1') actual = []
temp = db_api.workflow_get_tasks('', u'workflow_1')
for task in temp: for task in temp:
actual.append(task.name) actual.append(task.name)
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.workflow_get_tasks, '', u'workflow_9001') self.assertRaises(exception.NotFound, db_api.workflow_get_tasks, '',
u'workflow_9001')
def test_workflow_add_task(self): def test_workflow_add_task(self):
tsk_tmp = db_api.task_create('', u'task_10', 1, 10) print '\nTesting workflow_add_task...'
db_api.workflow_add_task('', u'workflow_1', 10) tsk_tmp = db_api.task_create('', u'task_10', 1, 10)
self.tsk_ids.append(10) db_api.workflow_add_task('', u'workflow_1', 10)
self.tsk_names.append('task_10') self.tsk_ids.append(10)
expected = [self.tsk_names[0], self.tsk_names[9]] self.tsk_names.append('task_10')
tsks = db_api.workflow_get_tasks('', u'workflow_1') expected = [self.tsk_names[0], self.tsk_names[9]]
actual = [tsks[0].name, tsks[1].name] tsks = db_api.workflow_get_tasks('', u'workflow_1')
actual = [tsks[0].name, tsks[1].name]
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.workflow_add_task, '', u'workflow_9001', 10) self.assertRaises(exception.NotFound, db_api.workflow_add_task, '',
self.assertRaises(exception.NotFound, db_api.workflow_add_task, '', u'workflow_1', 9001) u'workflow_9001', 10)
self.assertRaises(exception.NotFound, db_api.workflow_add_task, '',
u'workflow_1', 9001)
def test_workflow_create(self): def test_workflow_create(self):
id = 0 print '\nTesting workflow_create...'
while (self.wf_ids.count(id) > 0): id = 0
id = id + 1 while (self.wf_ids.count(id) > 0):
wf_tmp = db_api.workflow_create('', u'workflow_{}'.format(id)) id = id + 1
self.wf_ids.append(id) wf_tmp = db_api.workflow_create('', u'workflow_{}'.format(id))
self.wf_names.append(u'workflow_{}'.format(id)) self.wf_ids.append(id)
self.wf_names.append(u'workflow_{}'.format(id))
self.assertIsNotNone(db_api.workflow_get('', u'workflow_{}'.format(id))) self.assertIsNotNone(db_api.workflow_get('',
u'workflow_{}'.format(id)))
def test_workflow_destroy(self): def test_workflow_destroy(self):
name = self.wf_names.pop() print '\nTesting workflow_destroy...'
db_api.workflow_destroy('', name) name = self.wf_names.pop()
self.wf_ids.pop() db_api.workflow_destroy('', name)
self.wf_ids.pop()
self.assertRaises(exception.NotFound, db_api.workflow_get, '', name) self.assertRaises(exception.NotFound, db_api.workflow_get, '', name)
""" """
TaskTest TaskTest
""" """
class TaskTest(unittest.TestCase): class TaskTest(unittest.TestCase):
tsk_ids = [] tsk_ids = []
tsk_names = [] tsk_names = []
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
tsk_fmt = u'task_{}' tsk_fmt = u'task_{}'
for i in range(1, 10): for i in range(1, 10):
tsk_tmp = db_api.task_create('', tsk_fmt.format(i), i, i) tsk_tmp = db_api.task_create('', tsk_fmt.format(i), i, i)
cls.tsk_ids.append(i) cls.tsk_ids.append(i)
cls.tsk_names.append(tsk_fmt.format(i)) cls.tsk_names.append(tsk_fmt.format(i))
@classmethod @classmethod
def teardownClass(cls): def teardownClass(cls):
for id in cls.tsk_ids: for id in cls.tsk_ids:
db_api.task_destroy('', id) db_api.task_destroy('', id)
cls.tsk_ids = [] cls.tsk_ids = []
cls.tsk_names = [] cls.tsk_names = []
def test_task_get(self): def test_task_get(self):
expected = self.tsk_names[0] print '\nTesting task_get...'
actual = db_api.task_get('', self.tsk_ids[0]) expected = self.tsk_names[0]
actual = db_api.task_get('', self.tsk_ids[0])
self.assertEquals(expected, actual.name) self.assertEquals(expected, actual.name)
self.assertRaises(exception.NotFound, db_api.task_get, '', 9001) self.assertRaises(exception.NotFound, db_api.task_get, '', 9001)
def test_task_create(self): def test_task_create(self):
id = 1 print '\nTesting task_create...'
while (self.tsk_ids.count(id) > 0): id = 1
id = id + 1 while (self.tsk_ids.count(id) > 0):
tsk_tmp = db_api.task_create('', u'task_{}'.format(id), 1, id) id = id + 1
self.tsk_ids.append(id) tsk_tmp = db_api.task_create('', u'task_{}'.format(id), 1, id)
self.tsk_names.append(u'task_{}'.format(id)) self.tsk_ids.append(id)
self.tsk_names.append(u'task_{}'.format(id))
self.assertIsNotNone(db_api.task_get('', id)) self.assertIsNotNone(db_api.task_get('', id))
def test_task_update(self): def test_task_update(self):
db_api.task_update('', 1, dict(exception='ExceptionTest', stacktrace='StacktraceTest')) print '\nTesting task_update...'
task = db_api.task_get('', 1) db_api.task_update('', 1, dict(exception='ExceptionTest',
stacktrace='StacktraceTest'))
task = db_api.task_get('', 1)
expected = 'ExceptionTest' expected = 'ExceptionTest'
actual = task.exception actual = task.exception
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
expected = 'StacktraceTest' expected = 'StacktraceTest'
actual = task.stacktrace actual = task.stacktrace
self.assertEquals(expected, actual) self.assertEquals(expected, actual)
self.assertRaises(exception.NotFound, db_api.task_update, '', 9001, dict(exception='ExceptionTest', stacktrace='StacktraceTest')) self.assertRaises(exception.NotFound, db_api.task_update, '', 9001,
dict(exception='ExceptionTest', stacktrace='StacktraceTest'))
def test_task_destroy(self): def test_task_destroy(self):
id = self.tsk_ids.pop() print '\nTesting task_destroy...'
db_api.task_destroy('', id) id = self.tsk_ids.pop()
self.tsk_names.pop() db_api.task_destroy('', id)
self.tsk_names.pop()
self.assertRaises(exception.NotFound, db_api.task_get, '', id) self.assertRaises(exception.NotFound, db_api.task_get, '', id)