Merge "Storage: restore injected data on resumption"
This commit is contained in:
@@ -65,6 +65,12 @@ class Storage(object):
|
||||
self._backend = backend
|
||||
self._flowdetail = flow_detail
|
||||
|
||||
injector_td = self._flowdetail.find_by_name(self.injector_name)
|
||||
if injector_td is not None and injector_td.results is not None:
|
||||
names = six.iterkeys(injector_td.results)
|
||||
self.set_result_mapping(injector_td.uuid,
|
||||
dict((name, name) for name in names))
|
||||
|
||||
def _with_connection(self, functor, *args, **kwargs):
|
||||
# NOTE(harlowja): Activate the given function with a backend
|
||||
# connection, if a backend is provided in the first place, otherwise
|
||||
@@ -229,16 +235,22 @@ class Storage(object):
|
||||
def inject(self, pairs):
|
||||
"""Add values into storage
|
||||
|
||||
This method should be used by job in order to put flow parameters
|
||||
into storage and put it to action.
|
||||
This method should be used to put flow parameters (requirements that
|
||||
are not satisified by any task in the flow) into storage.
|
||||
"""
|
||||
pairs = dict(pairs)
|
||||
injector_uuid = uuidutils.generate_uuid()
|
||||
self.add_task(injector_uuid, self.injector_name)
|
||||
self.save(injector_uuid, pairs)
|
||||
injector_td = self._flowdetail.find_by_name(self.injector_name)
|
||||
if injector_td is None:
|
||||
injector_uuid = uuidutils.generate_uuid()
|
||||
self.add_task(injector_uuid, self.injector_name)
|
||||
results = dict(pairs)
|
||||
else:
|
||||
injector_uuid = injector_td.uuid
|
||||
results = injector_td.results.copy()
|
||||
results.update(pairs)
|
||||
self.save(injector_uuid, results)
|
||||
names = six.iterkeys(results)
|
||||
self.set_result_mapping(injector_uuid,
|
||||
dict((name, name)
|
||||
for name in six.iterkeys(pairs)))
|
||||
dict((name, name) for name in names))
|
||||
|
||||
def set_result_mapping(self, uuid, mapping):
|
||||
"""Set mapping for naming task results
|
||||
@@ -253,6 +265,18 @@ class Storage(object):
|
||||
self._result_mappings[uuid] = mapping
|
||||
for name, index in six.iteritems(mapping):
|
||||
entries = self._reverse_mapping.setdefault(name, [])
|
||||
|
||||
# NOTE(imelnikov): We support setting same result mapping for
|
||||
# the same task twice (e.g when we are injecting 'a' and then
|
||||
# injecting 'a' again), so we should not log warning below in
|
||||
# that case and we should have only one item for each pair
|
||||
# (uuid, index) in entries. It should be put to the end of
|
||||
# entries list because order matters on fetching.
|
||||
try:
|
||||
entries.remove((uuid, index))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
entries.append((uuid, index))
|
||||
if len(entries) > 1:
|
||||
LOG.warning("Multiple provider mappings being created for %r",
|
||||
|
||||
@@ -180,6 +180,33 @@ class StorageTest(test.TestCase):
|
||||
'spam': 'eggs',
|
||||
})
|
||||
|
||||
def test_inject_twice(self):
|
||||
s = self._get_storage()
|
||||
s.inject({'foo': 'bar'})
|
||||
self.assertEquals(s.fetch_all(), {'foo': 'bar'})
|
||||
s.inject({'spam': 'eggs'})
|
||||
self.assertEquals(s.fetch_all(), {
|
||||
'foo': 'bar',
|
||||
'spam': 'eggs',
|
||||
})
|
||||
|
||||
def test_inject_resumed(self):
|
||||
s = self._get_storage()
|
||||
s.inject({'foo': 'bar', 'spam': 'eggs'})
|
||||
# verify it's there
|
||||
self.assertEquals(s.fetch_all(), {
|
||||
'foo': 'bar',
|
||||
'spam': 'eggs',
|
||||
})
|
||||
# imagine we are resuming, so we need to make new
|
||||
# storage from same flow details
|
||||
s2 = storage.Storage(s._flowdetail, backend=self.backend)
|
||||
# injected data should still be there:
|
||||
self.assertEquals(s2.fetch_all(), {
|
||||
'foo': 'bar',
|
||||
'spam': 'eggs',
|
||||
})
|
||||
|
||||
def test_fetch_meapped_args(self):
|
||||
s = self._get_storage()
|
||||
s.inject({'foo': 'bar', 'spam': 'eggs'})
|
||||
|
||||
Reference in New Issue
Block a user