From 2fb146a994fb797e643f55afb9c79326e85fbb59 Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Thu, 17 Oct 2013 15:29:51 +0400 Subject: [PATCH] Storage: restore injected data on resumption In storage we now use single task for injector (we look it up by name) and we restore result mapping for that task in constructor if there is one in flow detail we are given. This has visible effect of injected data staying in storage after resumption. Closes-bug: #1240931 Change-Id: I2e8fb2c4b5cca769d36f578a934119db4a530f5c --- taskflow/storage.py | 40 +++++++++++++++++++++++------ taskflow/tests/unit/test_storage.py | 27 +++++++++++++++++++ 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/taskflow/storage.py b/taskflow/storage.py index e6b338a1..bee11f52 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -65,6 +65,12 @@ class Storage(object): self._backend = backend self._flowdetail = flow_detail + injector_td = self._flowdetail.find_by_name(self.injector_name) + if injector_td is not None and injector_td.results is not None: + names = six.iterkeys(injector_td.results) + self.set_result_mapping(injector_td.uuid, + dict((name, name) for name in names)) + def _with_connection(self, functor, *args, **kwargs): # NOTE(harlowja): Activate the given function with a backend # connection, if a backend is provided in the first place, otherwise @@ -229,16 +235,22 @@ class Storage(object): def inject(self, pairs): """Add values into storage - This method should be used by job in order to put flow parameters - into storage and put it to action. + This method should be used to put flow parameters (requirements that + are not satisified by any task in the flow) into storage. """ - pairs = dict(pairs) - injector_uuid = uuidutils.generate_uuid() - self.add_task(injector_uuid, self.injector_name) - self.save(injector_uuid, pairs) + injector_td = self._flowdetail.find_by_name(self.injector_name) + if injector_td is None: + injector_uuid = uuidutils.generate_uuid() + self.add_task(injector_uuid, self.injector_name) + results = dict(pairs) + else: + injector_uuid = injector_td.uuid + results = injector_td.results.copy() + results.update(pairs) + self.save(injector_uuid, results) + names = six.iterkeys(results) self.set_result_mapping(injector_uuid, - dict((name, name) - for name in six.iterkeys(pairs))) + dict((name, name) for name in names)) def set_result_mapping(self, uuid, mapping): """Set mapping for naming task results @@ -253,6 +265,18 @@ class Storage(object): self._result_mappings[uuid] = mapping for name, index in six.iteritems(mapping): entries = self._reverse_mapping.setdefault(name, []) + + # NOTE(imelnikov): We support setting same result mapping for + # the same task twice (e.g when we are injecting 'a' and then + # injecting 'a' again), so we should not log warning below in + # that case and we should have only one item for each pair + # (uuid, index) in entries. It should be put to the end of + # entries list because order matters on fetching. + try: + entries.remove((uuid, index)) + except ValueError: + pass + entries.append((uuid, index)) if len(entries) > 1: LOG.warning("Multiple provider mappings being created for %r", diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index c2019ab1..ec629158 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -180,6 +180,33 @@ class StorageTest(test.TestCase): 'spam': 'eggs', }) + def test_inject_twice(self): + s = self._get_storage() + s.inject({'foo': 'bar'}) + self.assertEquals(s.fetch_all(), {'foo': 'bar'}) + s.inject({'spam': 'eggs'}) + self.assertEquals(s.fetch_all(), { + 'foo': 'bar', + 'spam': 'eggs', + }) + + def test_inject_resumed(self): + s = self._get_storage() + s.inject({'foo': 'bar', 'spam': 'eggs'}) + # verify it's there + self.assertEquals(s.fetch_all(), { + 'foo': 'bar', + 'spam': 'eggs', + }) + # imagine we are resuming, so we need to make new + # storage from same flow details + s2 = storage.Storage(s._flowdetail, backend=self.backend) + # injected data should still be there: + self.assertEquals(s2.fetch_all(), { + 'foo': 'bar', + 'spam': 'eggs', + }) + def test_fetch_meapped_args(self): s = self._get_storage() s.inject({'foo': 'bar', 'spam': 'eggs'})