Merge "Add smarter/better/faster impl. of ensure_atoms"

This commit is contained in:
Jenkins
2015-07-08 23:42:05 +00:00
committed by Gerrit Code Review

View File

@@ -186,14 +186,67 @@ class Storage(object):
with contextlib.closing(self._backend.get_connection()) as conn:
return functor(conn, *args, **kwargs)
def ensure_atoms(self, atoms_iter):
@staticmethod
def _create_atom_detail(atom_name, atom_detail_cls,
atom_version=None, atom_state=states.PENDING):
ad = atom_detail_cls(atom_name, uuidutils.generate_uuid())
ad.state = atom_state
if atom_version is not None:
ad.version = atom_version
return ad
@fasteners.write_locked
def ensure_atoms(self, atoms):
"""Ensure there is an atomdetail for **each** of the given atoms.
Returns list of atomdetail uuids for each atom processed.
"""
atom_ids = []
for atom in atoms_iter:
atom_ids.append(self.ensure_atom(atom))
missing_ads = []
for i, atom in enumerate(atoms):
match = misc.match_type(atom, self._ensure_matchers)
if not match:
raise TypeError("Unknown atom '%s' (%s) requested to ensure"
% (atom, type(atom)))
atom_detail_cls, kind = match
atom_name = atom.name
if not atom_name:
raise ValueError("%s name must be non-empty" % (kind))
try:
atom_id = self._atom_name_to_uuid[atom_name]
except KeyError:
missing_ads.append((i, atom, atom_detail_cls))
# This will be later replaced with the uuid that is created...
atom_ids.append(None)
else:
ad = self._flowdetail.find(atom_id)
if not isinstance(ad, atom_detail_cls):
raise exceptions.Duplicate(
"Atom detail '%s' already exists in flow"
" detail '%s'" % (atom_name, self._flowdetail.name))
else:
atom_ids.append(ad.uuid)
self._set_result_mapping(atom_name, atom.save_as)
if missing_ads:
needs_to_be_created_ads = []
for (i, atom, atom_detail_cls) in missing_ads:
ad = self._create_atom_detail(
atom.name, atom_detail_cls,
atom_version=misc.get_version_string(atom))
needs_to_be_created_ads.append((i, atom, ad))
# Add the atom detail(s) to a clone, which upon success will be
# updated into the contained flow detail; if it does not get saved
# then no update will happen.
source, clone = self._fetch_flowdetail(clone=True)
for (_i, _atom, ad) in needs_to_be_created_ads:
clone.add(ad)
self._with_connection(self._save_flow_detail, source, clone)
# Insert the needed data, and get outta here...
for (i, atom, ad) in needs_to_be_created_ads:
atom_name = atom.name
atom_ids[i] = ad.uuid
self._atom_name_to_uuid[atom_name] = ad.uuid
self._set_result_mapping(atom_name, atom.save_as)
return atom_ids
def ensure_atom(self, atom):
@@ -201,63 +254,7 @@ class Storage(object):
Returns the uuid for the atomdetail that corresponds to the given atom.
"""
match = misc.match_type(atom, self._ensure_matchers)
if not match:
raise TypeError("Unknown atom '%s' (%s) requested to ensure"
% (atom, type(atom)))
else:
detail_cls, kind = match
atom_id = self._ensure_atom_detail(kind, detail_cls, atom.name,
misc.get_version_string(atom),
atom.save_as)
return atom_id
def _ensure_atom_detail(self, kind, detail_cls,
atom_name, atom_version, result_mapping):
"""Ensures there is a atomdetail that corresponds to the given atom.
If atom does not exist, adds a record for it. Added atom will have
PENDING state. Sets result mapping for the atom from result_mapping
argument.
Returns uuid for the atomdetails corresponding to the atom with
given name.
"""
if not atom_name:
raise ValueError("%s name must be non-empty" % (kind))
with self._lock.write_lock():
try:
atom_id = self._atom_name_to_uuid[atom_name]
except KeyError:
atom_id = uuidutils.generate_uuid()
self._create_atom_detail(detail_cls, atom_name,
atom_id, atom_version=atom_version)
else:
ad = self._flowdetail.find(atom_id)
if not isinstance(ad, detail_cls):
raise exceptions.Duplicate(
"Atom detail '%s' already exists in flow"
" detail '%s'" % (atom_name, self._flowdetail.name))
self._set_result_mapping(atom_name, result_mapping)
return atom_id
def _create_atom_detail(self, detail_cls, name, uuid, atom_version=None):
"""Add the atom detail to flow detail.
Atom becomes known to storage by that name and uuid.
Atom state is set to PENDING.
"""
ad = detail_cls(name, uuid)
ad.state = states.PENDING
ad.version = atom_version
# Add the atom detail to the clone, which upon success will be
# updated into the contained flow detail; if it does not get saved
# then no update will happen.
source, clone = self._fetch_flowdetail(clone=True)
clone.add(ad)
self._with_connection(self._save_flow_detail, source, clone)
self._atom_name_to_uuid[ad.name] = ad.uuid
return ad
return self.ensure_atoms([atom])[0]
@property
def flow_name(self):
@@ -631,9 +628,18 @@ class Storage(object):
expected_type=logbook.TaskDetail,
clone=True)
except exceptions.NotFound:
source = self._create_atom_detail(logbook.TaskDetail,
self.injector_name,
uuidutils.generate_uuid())
# Ensure we have our special task detail...
#
# TODO(harlowja): get this removed when
# https://review.openstack.org/#/c/165645/ merges.
source = self._create_atom_detail(self.injector_name,
logbook.TaskDetail,
atom_state=None)
fd_source, fd_clone = self._fetch_flowdetail(clone=True)
fd_clone.add(source)
self._with_connection(self._save_flow_detail, fd_source,
fd_clone)
self._atom_name_to_uuid[source.name] = source.uuid
clone = source
clone.results = dict(pairs)
clone.state = states.SUCCESS