diff --git a/taskflow/storage.py b/taskflow/storage.py index 8eb19c09..cb1fbaad 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -186,14 +186,67 @@ class Storage(object): with contextlib.closing(self._backend.get_connection()) as conn: return functor(conn, *args, **kwargs) - def ensure_atoms(self, atoms_iter): + @staticmethod + def _create_atom_detail(atom_name, atom_detail_cls, + atom_version=None, atom_state=states.PENDING): + ad = atom_detail_cls(atom_name, uuidutils.generate_uuid()) + ad.state = atom_state + if atom_version is not None: + ad.version = atom_version + return ad + + @fasteners.write_locked + def ensure_atoms(self, atoms): """Ensure there is an atomdetail for **each** of the given atoms. Returns list of atomdetail uuids for each atom processed. """ atom_ids = [] - for atom in atoms_iter: - atom_ids.append(self.ensure_atom(atom)) + missing_ads = [] + for i, atom in enumerate(atoms): + match = misc.match_type(atom, self._ensure_matchers) + if not match: + raise TypeError("Unknown atom '%s' (%s) requested to ensure" + % (atom, type(atom))) + atom_detail_cls, kind = match + atom_name = atom.name + if not atom_name: + raise ValueError("%s name must be non-empty" % (kind)) + try: + atom_id = self._atom_name_to_uuid[atom_name] + except KeyError: + missing_ads.append((i, atom, atom_detail_cls)) + # This will be later replaced with the uuid that is created... + atom_ids.append(None) + else: + ad = self._flowdetail.find(atom_id) + if not isinstance(ad, atom_detail_cls): + raise exceptions.Duplicate( + "Atom detail '%s' already exists in flow" + " detail '%s'" % (atom_name, self._flowdetail.name)) + else: + atom_ids.append(ad.uuid) + self._set_result_mapping(atom_name, atom.save_as) + if missing_ads: + needs_to_be_created_ads = [] + for (i, atom, atom_detail_cls) in missing_ads: + ad = self._create_atom_detail( + atom.name, atom_detail_cls, + atom_version=misc.get_version_string(atom)) + needs_to_be_created_ads.append((i, atom, ad)) + # Add the atom detail(s) to a clone, which upon success will be + # updated into the contained flow detail; if it does not get saved + # then no update will happen. + source, clone = self._fetch_flowdetail(clone=True) + for (_i, _atom, ad) in needs_to_be_created_ads: + clone.add(ad) + self._with_connection(self._save_flow_detail, source, clone) + # Insert the needed data, and get outta here... + for (i, atom, ad) in needs_to_be_created_ads: + atom_name = atom.name + atom_ids[i] = ad.uuid + self._atom_name_to_uuid[atom_name] = ad.uuid + self._set_result_mapping(atom_name, atom.save_as) return atom_ids def ensure_atom(self, atom): @@ -201,63 +254,7 @@ class Storage(object): Returns the uuid for the atomdetail that corresponds to the given atom. """ - match = misc.match_type(atom, self._ensure_matchers) - if not match: - raise TypeError("Unknown atom '%s' (%s) requested to ensure" - % (atom, type(atom))) - else: - detail_cls, kind = match - atom_id = self._ensure_atom_detail(kind, detail_cls, atom.name, - misc.get_version_string(atom), - atom.save_as) - return atom_id - - def _ensure_atom_detail(self, kind, detail_cls, - atom_name, atom_version, result_mapping): - """Ensures there is a atomdetail that corresponds to the given atom. - - If atom does not exist, adds a record for it. Added atom will have - PENDING state. Sets result mapping for the atom from result_mapping - argument. - - Returns uuid for the atomdetails corresponding to the atom with - given name. - """ - if not atom_name: - raise ValueError("%s name must be non-empty" % (kind)) - with self._lock.write_lock(): - try: - atom_id = self._atom_name_to_uuid[atom_name] - except KeyError: - atom_id = uuidutils.generate_uuid() - self._create_atom_detail(detail_cls, atom_name, - atom_id, atom_version=atom_version) - else: - ad = self._flowdetail.find(atom_id) - if not isinstance(ad, detail_cls): - raise exceptions.Duplicate( - "Atom detail '%s' already exists in flow" - " detail '%s'" % (atom_name, self._flowdetail.name)) - self._set_result_mapping(atom_name, result_mapping) - return atom_id - - def _create_atom_detail(self, detail_cls, name, uuid, atom_version=None): - """Add the atom detail to flow detail. - - Atom becomes known to storage by that name and uuid. - Atom state is set to PENDING. - """ - ad = detail_cls(name, uuid) - ad.state = states.PENDING - ad.version = atom_version - # Add the atom detail to the clone, which upon success will be - # updated into the contained flow detail; if it does not get saved - # then no update will happen. - source, clone = self._fetch_flowdetail(clone=True) - clone.add(ad) - self._with_connection(self._save_flow_detail, source, clone) - self._atom_name_to_uuid[ad.name] = ad.uuid - return ad + return self.ensure_atoms([atom])[0] @property def flow_name(self): @@ -631,9 +628,18 @@ class Storage(object): expected_type=logbook.TaskDetail, clone=True) except exceptions.NotFound: - source = self._create_atom_detail(logbook.TaskDetail, - self.injector_name, - uuidutils.generate_uuid()) + # Ensure we have our special task detail... + # + # TODO(harlowja): get this removed when + # https://review.openstack.org/#/c/165645/ merges. + source = self._create_atom_detail(self.injector_name, + logbook.TaskDetail, + atom_state=None) + fd_source, fd_clone = self._fetch_flowdetail(clone=True) + fd_clone.add(source) + self._with_connection(self._save_flow_detail, fd_source, + fd_clone) + self._atom_name_to_uuid[source.name] = source.uuid clone = source clone.results = dict(pairs) clone.state = states.SUCCESS