diff --git a/taskflow/storage.py b/taskflow/storage.py index e6b338a1..bee11f52 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -65,6 +65,12 @@ class Storage(object): self._backend = backend self._flowdetail = flow_detail + injector_td = self._flowdetail.find_by_name(self.injector_name) + if injector_td is not None and injector_td.results is not None: + names = six.iterkeys(injector_td.results) + self.set_result_mapping(injector_td.uuid, + dict((name, name) for name in names)) + def _with_connection(self, functor, *args, **kwargs): # NOTE(harlowja): Activate the given function with a backend # connection, if a backend is provided in the first place, otherwise @@ -229,16 +235,22 @@ class Storage(object): def inject(self, pairs): """Add values into storage - This method should be used by job in order to put flow parameters - into storage and put it to action. + This method should be used to put flow parameters (requirements that + are not satisified by any task in the flow) into storage. """ - pairs = dict(pairs) - injector_uuid = uuidutils.generate_uuid() - self.add_task(injector_uuid, self.injector_name) - self.save(injector_uuid, pairs) + injector_td = self._flowdetail.find_by_name(self.injector_name) + if injector_td is None: + injector_uuid = uuidutils.generate_uuid() + self.add_task(injector_uuid, self.injector_name) + results = dict(pairs) + else: + injector_uuid = injector_td.uuid + results = injector_td.results.copy() + results.update(pairs) + self.save(injector_uuid, results) + names = six.iterkeys(results) self.set_result_mapping(injector_uuid, - dict((name, name) - for name in six.iterkeys(pairs))) + dict((name, name) for name in names)) def set_result_mapping(self, uuid, mapping): """Set mapping for naming task results @@ -253,6 +265,18 @@ class Storage(object): self._result_mappings[uuid] = mapping for name, index in six.iteritems(mapping): entries = self._reverse_mapping.setdefault(name, []) + + # NOTE(imelnikov): We support setting same result mapping for + # the same task twice (e.g when we are injecting 'a' and then + # injecting 'a' again), so we should not log warning below in + # that case and we should have only one item for each pair + # (uuid, index) in entries. It should be put to the end of + # entries list because order matters on fetching. + try: + entries.remove((uuid, index)) + except ValueError: + pass + entries.append((uuid, index)) if len(entries) > 1: LOG.warning("Multiple provider mappings being created for %r", diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index c2019ab1..ec629158 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -180,6 +180,33 @@ class StorageTest(test.TestCase): 'spam': 'eggs', }) + def test_inject_twice(self): + s = self._get_storage() + s.inject({'foo': 'bar'}) + self.assertEquals(s.fetch_all(), {'foo': 'bar'}) + s.inject({'spam': 'eggs'}) + self.assertEquals(s.fetch_all(), { + 'foo': 'bar', + 'spam': 'eggs', + }) + + def test_inject_resumed(self): + s = self._get_storage() + s.inject({'foo': 'bar', 'spam': 'eggs'}) + # verify it's there + self.assertEquals(s.fetch_all(), { + 'foo': 'bar', + 'spam': 'eggs', + }) + # imagine we are resuming, so we need to make new + # storage from same flow details + s2 = storage.Storage(s._flowdetail, backend=self.backend) + # injected data should still be there: + self.assertEquals(s2.fetch_all(), { + 'foo': 'bar', + 'spam': 'eggs', + }) + def test_fetch_meapped_args(self): s = self._get_storage() s.inject({'foo': 'bar', 'spam': 'eggs'})