Add smarter/better/faster impl. of ensure_atoms

Instead of ensuring an atom at a time we can examine
at bulk what atoms details need to be created, and then
only do any persistence operations if any atom details
are determined to not exist. This saves on unneeded
reads and writes and helps improve storage and prepare
times.

Part of blueprint make-things-speedy

Change-Id: I2e7e3cc60cc97cbbe4e66c69e22c34ee31ebfeb3
This commit is contained in:
Joshua Harlow
2015-06-11 16:46:06 -07:00
parent b7bb295fb0
commit 4d0200f92e

View File

@@ -186,14 +186,67 @@ class Storage(object):
with contextlib.closing(self._backend.get_connection()) as conn:
return functor(conn, *args, **kwargs)
def ensure_atoms(self, atoms_iter):
@staticmethod
def _create_atom_detail(atom_name, atom_detail_cls,
atom_version=None, atom_state=states.PENDING):
ad = atom_detail_cls(atom_name, uuidutils.generate_uuid())
ad.state = atom_state
if atom_version is not None:
ad.version = atom_version
return ad
@fasteners.write_locked
def ensure_atoms(self, atoms):
"""Ensure there is an atomdetail for **each** of the given atoms.
Returns list of atomdetail uuids for each atom processed.
"""
atom_ids = []
for atom in atoms_iter:
atom_ids.append(self.ensure_atom(atom))
missing_ads = []
for i, atom in enumerate(atoms):
match = misc.match_type(atom, self._ensure_matchers)
if not match:
raise TypeError("Unknown atom '%s' (%s) requested to ensure"
% (atom, type(atom)))
atom_detail_cls, kind = match
atom_name = atom.name
if not atom_name:
raise ValueError("%s name must be non-empty" % (kind))
try:
atom_id = self._atom_name_to_uuid[atom_name]
except KeyError:
missing_ads.append((i, atom, atom_detail_cls))
# This will be later replaced with the uuid that is created...
atom_ids.append(None)
else:
ad = self._flowdetail.find(atom_id)
if not isinstance(ad, atom_detail_cls):
raise exceptions.Duplicate(
"Atom detail '%s' already exists in flow"
" detail '%s'" % (atom_name, self._flowdetail.name))
else:
atom_ids.append(ad.uuid)
self._set_result_mapping(atom_name, atom.save_as)
if missing_ads:
needs_to_be_created_ads = []
for (i, atom, atom_detail_cls) in missing_ads:
ad = self._create_atom_detail(
atom.name, atom_detail_cls,
atom_version=misc.get_version_string(atom))
needs_to_be_created_ads.append((i, atom, ad))
# Add the atom detail(s) to a clone, which upon success will be
# updated into the contained flow detail; if it does not get saved
# then no update will happen.
source, clone = self._fetch_flowdetail(clone=True)
for (_i, _atom, ad) in needs_to_be_created_ads:
clone.add(ad)
self._with_connection(self._save_flow_detail, source, clone)
# Insert the needed data, and get outta here...
for (i, atom, ad) in needs_to_be_created_ads:
atom_name = atom.name
atom_ids[i] = ad.uuid
self._atom_name_to_uuid[atom_name] = ad.uuid
self._set_result_mapping(atom_name, atom.save_as)
return atom_ids
def ensure_atom(self, atom):
@@ -201,63 +254,7 @@ class Storage(object):
Returns the uuid for the atomdetail that corresponds to the given atom.
"""
match = misc.match_type(atom, self._ensure_matchers)
if not match:
raise TypeError("Unknown atom '%s' (%s) requested to ensure"
% (atom, type(atom)))
else:
detail_cls, kind = match
atom_id = self._ensure_atom_detail(kind, detail_cls, atom.name,
misc.get_version_string(atom),
atom.save_as)
return atom_id
def _ensure_atom_detail(self, kind, detail_cls,
atom_name, atom_version, result_mapping):
"""Ensures there is a atomdetail that corresponds to the given atom.
If atom does not exist, adds a record for it. Added atom will have
PENDING state. Sets result mapping for the atom from result_mapping
argument.
Returns uuid for the atomdetails corresponding to the atom with
given name.
"""
if not atom_name:
raise ValueError("%s name must be non-empty" % (kind))
with self._lock.write_lock():
try:
atom_id = self._atom_name_to_uuid[atom_name]
except KeyError:
atom_id = uuidutils.generate_uuid()
self._create_atom_detail(detail_cls, atom_name,
atom_id, atom_version=atom_version)
else:
ad = self._flowdetail.find(atom_id)
if not isinstance(ad, detail_cls):
raise exceptions.Duplicate(
"Atom detail '%s' already exists in flow"
" detail '%s'" % (atom_name, self._flowdetail.name))
self._set_result_mapping(atom_name, result_mapping)
return atom_id
def _create_atom_detail(self, detail_cls, name, uuid, atom_version=None):
"""Add the atom detail to flow detail.
Atom becomes known to storage by that name and uuid.
Atom state is set to PENDING.
"""
ad = detail_cls(name, uuid)
ad.state = states.PENDING
ad.version = atom_version
# Add the atom detail to the clone, which upon success will be
# updated into the contained flow detail; if it does not get saved
# then no update will happen.
source, clone = self._fetch_flowdetail(clone=True)
clone.add(ad)
self._with_connection(self._save_flow_detail, source, clone)
self._atom_name_to_uuid[ad.name] = ad.uuid
return ad
return self.ensure_atoms([atom])[0]
@property
def flow_name(self):
@@ -631,9 +628,18 @@ class Storage(object):
expected_type=logbook.TaskDetail,
clone=True)
except exceptions.NotFound:
source = self._create_atom_detail(logbook.TaskDetail,
self.injector_name,
uuidutils.generate_uuid())
# Ensure we have our special task detail...
#
# TODO(harlowja): get this removed when
# https://review.openstack.org/#/c/165645/ merges.
source = self._create_atom_detail(self.injector_name,
logbook.TaskDetail,
atom_state=None)
fd_source, fd_clone = self._fetch_flowdetail(clone=True)
fd_clone.add(source)
self._with_connection(self._save_flow_detail, fd_source,
fd_clone)
self._atom_name_to_uuid[source.name] = source.uuid
clone = source
clone.results = dict(pairs)
clone.state = states.SUCCESS