Storage: restore injected data on resumption
In storage we now use single task for injector (we look it up by name) and we restore result mapping for that task in constructor if there is one in flow detail we are given. This has visible effect of injected data staying in storage after resumption. Closes-bug: #1240931 Change-Id: I2e8fb2c4b5cca769d36f578a934119db4a530f5c
This commit is contained in:
@@ -65,6 +65,12 @@ class Storage(object):
|
||||
self._backend = backend
|
||||
self._flowdetail = flow_detail
|
||||
|
||||
injector_td = self._flowdetail.find_by_name(self.injector_name)
|
||||
if injector_td is not None and injector_td.results is not None:
|
||||
names = six.iterkeys(injector_td.results)
|
||||
self.set_result_mapping(injector_td.uuid,
|
||||
dict((name, name) for name in names))
|
||||
|
||||
def _with_connection(self, functor, *args, **kwargs):
|
||||
# NOTE(harlowja): Activate the given function with a backend
|
||||
# connection, if a backend is provided in the first place, otherwise
|
||||
@@ -229,16 +235,22 @@ class Storage(object):
|
||||
def inject(self, pairs):
|
||||
"""Add values into storage
|
||||
|
||||
This method should be used by job in order to put flow parameters
|
||||
into storage and put it to action.
|
||||
This method should be used to put flow parameters (requirements that
|
||||
are not satisified by any task in the flow) into storage.
|
||||
"""
|
||||
pairs = dict(pairs)
|
||||
injector_uuid = uuidutils.generate_uuid()
|
||||
self.add_task(injector_uuid, self.injector_name)
|
||||
self.save(injector_uuid, pairs)
|
||||
injector_td = self._flowdetail.find_by_name(self.injector_name)
|
||||
if injector_td is None:
|
||||
injector_uuid = uuidutils.generate_uuid()
|
||||
self.add_task(injector_uuid, self.injector_name)
|
||||
results = dict(pairs)
|
||||
else:
|
||||
injector_uuid = injector_td.uuid
|
||||
results = injector_td.results.copy()
|
||||
results.update(pairs)
|
||||
self.save(injector_uuid, results)
|
||||
names = six.iterkeys(results)
|
||||
self.set_result_mapping(injector_uuid,
|
||||
dict((name, name)
|
||||
for name in six.iterkeys(pairs)))
|
||||
dict((name, name) for name in names))
|
||||
|
||||
def set_result_mapping(self, uuid, mapping):
|
||||
"""Set mapping for naming task results
|
||||
@@ -253,6 +265,18 @@ class Storage(object):
|
||||
self._result_mappings[uuid] = mapping
|
||||
for name, index in six.iteritems(mapping):
|
||||
entries = self._reverse_mapping.setdefault(name, [])
|
||||
|
||||
# NOTE(imelnikov): We support setting same result mapping for
|
||||
# the same task twice (e.g when we are injecting 'a' and then
|
||||
# injecting 'a' again), so we should not log warning below in
|
||||
# that case and we should have only one item for each pair
|
||||
# (uuid, index) in entries. It should be put to the end of
|
||||
# entries list because order matters on fetching.
|
||||
try:
|
||||
entries.remove((uuid, index))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
entries.append((uuid, index))
|
||||
if len(entries) > 1:
|
||||
LOG.warning("Multiple provider mappings being created for %r",
|
||||
|
||||
@@ -180,6 +180,33 @@ class StorageTest(test.TestCase):
|
||||
'spam': 'eggs',
|
||||
})
|
||||
|
||||
def test_inject_twice(self):
|
||||
s = self._get_storage()
|
||||
s.inject({'foo': 'bar'})
|
||||
self.assertEquals(s.fetch_all(), {'foo': 'bar'})
|
||||
s.inject({'spam': 'eggs'})
|
||||
self.assertEquals(s.fetch_all(), {
|
||||
'foo': 'bar',
|
||||
'spam': 'eggs',
|
||||
})
|
||||
|
||||
def test_inject_resumed(self):
|
||||
s = self._get_storage()
|
||||
s.inject({'foo': 'bar', 'spam': 'eggs'})
|
||||
# verify it's there
|
||||
self.assertEquals(s.fetch_all(), {
|
||||
'foo': 'bar',
|
||||
'spam': 'eggs',
|
||||
})
|
||||
# imagine we are resuming, so we need to make new
|
||||
# storage from same flow details
|
||||
s2 = storage.Storage(s._flowdetail, backend=self.backend)
|
||||
# injected data should still be there:
|
||||
self.assertEquals(s2.fetch_all(), {
|
||||
'foo': 'bar',
|
||||
'spam': 'eggs',
|
||||
})
|
||||
|
||||
def test_fetch_meapped_args(self):
|
||||
s = self._get_storage()
|
||||
s.inject({'foo': 'bar', 'spam': 'eggs'})
|
||||
|
||||
Reference in New Issue
Block a user