Merge "Storage: add flow name and uuid properties"

This commit is contained in:
Jenkins
2013-10-14 18:42:56 +00:00
committed by Gerrit Code Review
2 changed files with 38 additions and 0 deletions

View File

@@ -88,6 +88,14 @@ class Storage(object):
self._with_connection(save_both)
@property
def flow_name(self):
return self._flowdetail.name
@property
def flow_uuid(self):
return self._flowdetail.uuid
def _save_flow_detail(self, conn):
# NOTE(harlowja): we need to update our contained flow detail if
# the result of the update actually added more (aka another process

View File

@@ -21,9 +21,11 @@ import mock
from taskflow import exceptions
from taskflow.persistence.backends import impl_memory
from taskflow.persistence import logbook
from taskflow import states
from taskflow import storage
from taskflow import test
from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
@@ -48,6 +50,12 @@ class StorageTest(test.TestCase):
s.add_task('42', 'my task')
self.assertEquals(s.get_uuid_by_name('my task'), '42')
def test_flow_name_and_uuid(self):
fd = logbook.FlowDetail(name='test-fd', uuid='aaaa')
s = storage.Storage(flow_detail=fd)
self.assertEquals(s.flow_name, 'test-fd')
self.assertEquals(s.flow_uuid, 'aaaa')
def test_add_task(self):
s = self._get_storage()
s.add_task('42', 'my task')
@@ -79,6 +87,14 @@ class StorageTest(test.TestCase):
self.assertEquals(s.get('42'), 5)
self.assertEquals(s.get_task_state('42'), states.FAILURE)
def test_save_and_get_failure(self):
fail = misc.Failure(exc_info=(RuntimeError, RuntimeError(), None))
s = self._get_storage()
s.add_task('42', 'my task')
s.save('42', fail, states.FAILURE)
self.assertEquals(s.get('42'), fail)
self.assertEquals(s.get_task_state('42'), states.FAILURE)
def test_get_non_existing_var(self):
s = self._get_storage()
s.add_task('42', 'my task')
@@ -114,6 +130,20 @@ class StorageTest(test.TestCase):
"^Name 'xxx' is not mapped"):
s.fetch('xxx')
def test_default_task_progress(self):
s = self._get_storage()
s.add_task('42', 'my task')
self.assertEquals(s.get_task_progress('42'), 0.0)
self.assertEquals(s.get_task_progress_details('42'), None)
def test_task_progress(self):
s = self._get_storage()
s.add_task('42', 'my task')
s.set_task_progress('42', 0.5, test_data=11)
self.assertEquals(s.get_task_progress('42'), 0.5)
self.assertEquals(s.get_task_progress_details('42'),
{'test_data': 11})
def test_fetch_result_not_ready(self):
s = self._get_storage()
s.add_task('42', 'my task')