From 4d0200f92e7cb19591c13a1eafbe8eb7ab2360f9 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 11 Jun 2015 16:46:06 -0700 Subject: [PATCH] Add smarter/better/faster impl. of `ensure_atoms` Instead of ensuring one atom at a time, we can examine in bulk which atom details need to be created, and then only perform persistence operations if any atom details are determined to not exist. This saves on unneeded reads and writes and helps improve storage and prepare times. Part of blueprint make-things-speedy Change-Id: I2e7e3cc60cc97cbbe4e66c69e22c34ee31ebfeb3 --- taskflow/storage.py | 132 +++++++++++++++++++++++--------------------- 1 file changed, 69 insertions(+), 63 deletions(-) diff --git a/taskflow/storage.py b/taskflow/storage.py index 8eb19c09..cb1fbaad 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -186,14 +186,67 @@ class Storage(object): with contextlib.closing(self._backend.get_connection()) as conn: return functor(conn, *args, **kwargs) - def ensure_atoms(self, atoms_iter): + @staticmethod + def _create_atom_detail(atom_name, atom_detail_cls, + atom_version=None, atom_state=states.PENDING): + ad = atom_detail_cls(atom_name, uuidutils.generate_uuid()) + ad.state = atom_state + if atom_version is not None: + ad.version = atom_version + return ad + + @fasteners.write_locked + def ensure_atoms(self, atoms): """Ensure there is an atomdetail for **each** of the given atoms. Returns list of atomdetail uuids for each atom processed.
""" atom_ids = [] - for atom in atoms_iter: - atom_ids.append(self.ensure_atom(atom)) + missing_ads = [] + for i, atom in enumerate(atoms): + match = misc.match_type(atom, self._ensure_matchers) + if not match: + raise TypeError("Unknown atom '%s' (%s) requested to ensure" + % (atom, type(atom))) + atom_detail_cls, kind = match + atom_name = atom.name + if not atom_name: + raise ValueError("%s name must be non-empty" % (kind)) + try: + atom_id = self._atom_name_to_uuid[atom_name] + except KeyError: + missing_ads.append((i, atom, atom_detail_cls)) + # This will be later replaced with the uuid that is created... + atom_ids.append(None) + else: + ad = self._flowdetail.find(atom_id) + if not isinstance(ad, atom_detail_cls): + raise exceptions.Duplicate( + "Atom detail '%s' already exists in flow" + " detail '%s'" % (atom_name, self._flowdetail.name)) + else: + atom_ids.append(ad.uuid) + self._set_result_mapping(atom_name, atom.save_as) + if missing_ads: + needs_to_be_created_ads = [] + for (i, atom, atom_detail_cls) in missing_ads: + ad = self._create_atom_detail( + atom.name, atom_detail_cls, + atom_version=misc.get_version_string(atom)) + needs_to_be_created_ads.append((i, atom, ad)) + # Add the atom detail(s) to a clone, which upon success will be + # updated into the contained flow detail; if it does not get saved + # then no update will happen. + source, clone = self._fetch_flowdetail(clone=True) + for (_i, _atom, ad) in needs_to_be_created_ads: + clone.add(ad) + self._with_connection(self._save_flow_detail, source, clone) + # Insert the needed data, and get outta here... + for (i, atom, ad) in needs_to_be_created_ads: + atom_name = atom.name + atom_ids[i] = ad.uuid + self._atom_name_to_uuid[atom_name] = ad.uuid + self._set_result_mapping(atom_name, atom.save_as) return atom_ids def ensure_atom(self, atom): @@ -201,63 +254,7 @@ class Storage(object): Returns the uuid for the atomdetail that corresponds to the given atom. 
""" - match = misc.match_type(atom, self._ensure_matchers) - if not match: - raise TypeError("Unknown atom '%s' (%s) requested to ensure" - % (atom, type(atom))) - else: - detail_cls, kind = match - atom_id = self._ensure_atom_detail(kind, detail_cls, atom.name, - misc.get_version_string(atom), - atom.save_as) - return atom_id - - def _ensure_atom_detail(self, kind, detail_cls, - atom_name, atom_version, result_mapping): - """Ensures there is a atomdetail that corresponds to the given atom. - - If atom does not exist, adds a record for it. Added atom will have - PENDING state. Sets result mapping for the atom from result_mapping - argument. - - Returns uuid for the atomdetails corresponding to the atom with - given name. - """ - if not atom_name: - raise ValueError("%s name must be non-empty" % (kind)) - with self._lock.write_lock(): - try: - atom_id = self._atom_name_to_uuid[atom_name] - except KeyError: - atom_id = uuidutils.generate_uuid() - self._create_atom_detail(detail_cls, atom_name, - atom_id, atom_version=atom_version) - else: - ad = self._flowdetail.find(atom_id) - if not isinstance(ad, detail_cls): - raise exceptions.Duplicate( - "Atom detail '%s' already exists in flow" - " detail '%s'" % (atom_name, self._flowdetail.name)) - self._set_result_mapping(atom_name, result_mapping) - return atom_id - - def _create_atom_detail(self, detail_cls, name, uuid, atom_version=None): - """Add the atom detail to flow detail. - - Atom becomes known to storage by that name and uuid. - Atom state is set to PENDING. - """ - ad = detail_cls(name, uuid) - ad.state = states.PENDING - ad.version = atom_version - # Add the atom detail to the clone, which upon success will be - # updated into the contained flow detail; if it does not get saved - # then no update will happen. 
- source, clone = self._fetch_flowdetail(clone=True) - clone.add(ad) - self._with_connection(self._save_flow_detail, source, clone) - self._atom_name_to_uuid[ad.name] = ad.uuid - return ad + return self.ensure_atoms([atom])[0] @property def flow_name(self): @@ -631,9 +628,18 @@ class Storage(object): expected_type=logbook.TaskDetail, clone=True) except exceptions.NotFound: - source = self._create_atom_detail(logbook.TaskDetail, - self.injector_name, - uuidutils.generate_uuid()) + # Ensure we have our special task detail... + # + # TODO(harlowja): get this removed when + # https://review.openstack.org/#/c/165645/ merges. + source = self._create_atom_detail(self.injector_name, + logbook.TaskDetail, + atom_state=None) + fd_source, fd_clone = self._fetch_flowdetail(clone=True) + fd_clone.add(source) + self._with_connection(self._save_flow_detail, fd_source, + fd_clone) + self._atom_name_to_uuid[source.name] = source.uuid clone = source clone.results = dict(pairs) clone.state = states.SUCCESS