diff --git a/taskflow/patterns/distributed_flow.py b/taskflow/patterns/distributed_flow.py index 5b4268b6..e6699a2c 100644 --- a/taskflow/patterns/distributed_flow.py +++ b/taskflow/patterns/distributed_flow.py @@ -20,8 +20,6 @@ import celery import logging -from taskflow import logbook - LOG = logging.getLogger(__name__) @@ -40,7 +38,6 @@ class Flow(object): self.name = name self.root = None self._tasks = [] - logbook.add_workflow(name) def chain_listeners(self, context, initial_task, callback_task): """Register one listener for a task.""" diff --git a/update.py b/update.py old mode 100644 new mode 100755 index c790ef72..7d78185d --- a/update.py +++ b/update.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2012 Red Hat, Inc. @@ -70,13 +72,14 @@ import sys from oslo.config import cfg -opts = [ +BASE_MOD = 'taskflow' +OPTS = [ cfg.ListOpt('primitives', default=[], - help='The list of primitives to copy from taskflow'), + help='The list of primitives to copy from %s' % BASE_MOD), cfg.StrOpt('base', default=None, - help='The base module to hold the copy of taskflow'), + help='The base module to hold the copy of %s' % BASE_MOD), cfg.StrOpt('dest-dir', default=None, help='Destination project directory'), @@ -85,17 +88,21 @@ opts = [ help='A config file or destination project directory', positional=True), ] -allowed_primitives = ['flow', 'task', 'decorators'] +ALLOWED_PRIMITIVES = ('flow', 'task', 'decorators') +IMPORT_FROM = re.compile(r"^\s*from\s+" + BASE_MOD + r"\s*(.*)$") +BASE_CONF = '%s.conf' % (BASE_MOD) +MACHINE_GENERATED = ('# DO NOT EDIT THIS FILE BY HAND -- YOUR CHANGES WILL BE ' + 'OVERWRITTEN', '') def _parse_args(argv): conf = cfg.ConfigOpts() - conf.register_cli_opts(opts) + conf.register_cli_opts(OPTS) conf(argv, usage='Usage: %(prog)s [config-file|dest-dir]') if conf.configfile_or_destdir: def def_config_file(dest_dir): - return os.path.join(dest_dir, 'taskflow.conf') + return os.path.join(dest_dir, BASE_CONF) config_file = None if os.path.isfile(conf.configfile_or_destdir): @@ -112,17 +119,14 @@ def _parse_args(argv): def _explode_path(path): dirs = [] - comps = [] dirs.append(path) (head, tail) = os.path.split(path) while tail: dirs.append(head) - comps.append(tail) path = head (head, tail) = os.path.split(path) dirs.sort() - comps.reverse() - return (dirs, comps) + return dirs def _mod_to_path(mod): @@ -133,7 +137,13 @@ def _dest_path(path, base, dest_dir): return os.path.join(dest_dir, _mod_to_path(base), path) -def _replace(path, pattern, replacement): +def _drop_init(path): + with open(path, 'wb') as fh: + for line in MACHINE_GENERATED: + fh.write(line + '\n') + + +def _bulk_replace(path, pattern, replacement): with open(path, "rb+") as f: lines = f.readlines() f.seek(0) @@ -142,116 +152,136 @@ def _replace(path, pattern, replacement): f.write(re.sub(pattern, replacement, line)) -def _drop_init(path): - with open(path, 'wb') as fh: - contents = ''' -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -''' - fh.write(contents.strip()) - fh.write("\n") - - def _make_dirs(path): dir_name = os.path.dirname(path) - for d in _explode_path(dir_name)[0]: + dirs_needed = [] + for d in _explode_path(dir_name): if not os.path.isdir(d): - print(" '%s/' (new)" % (d)) + dirs_needed.append(d) + if dirs_needed: + print("Creating directories for '%s'" % (dir_name)) + for d in dirs_needed: + print(" '%s'" % (d)) os.mkdir(d) init_path = os.path.join(d, '__init__.py') if not os.path.exists(init_path): - print(" '%s' (new)" % (init_path)) + print(" '%s'" % (init_path)) _drop_init(init_path) -def _join_prefix_postfix(prefix, postfix): - joined = str(prefix) - if postfix: - if joined: - joined += '.' - joined += str(postfix) - return joined +def _join_mod(*pieces): + return ".".join([str(p) for p in pieces if p]) -def _copy_file(path, dest, base, common_already=None): - _make_dirs(dest) +def _reform_import(mod, postfix, alias, comment): + assert mod, 'Module required' + import_line = '' + if mod and not postfix: + import_line = 'import %s' % (mod) + else: + import_line = 'from %s import %s' % (mod, postfix) + if alias: + import_line += ' as %s' % (alias) + if comment: + import_line += ' #' + str(comment) + return import_line - print(" '%s' -> '%s'" % (path, dest)) - shutil.copy2(path, dest) - def import_replace(path): +def _copy_file(path, dest, base, root_mods=None, common_already=None): + + def _copy_it(): + _make_dirs(dest) + print("Copying '%s'" % (path)) + print(" '%s' -> '%s'" % (path, dest)) + shutil.copy2(path, dest) + + def _form_mod(prefix, postfix): + importing = _join_mod(prefix, postfix) + if importing not in common_already: + new_mod = [base, BASE_MOD, prefix] + else: + new_mod = [base, 'openstack', 'common'] + # If the import is something like 'openstack.common.a.b.c.d' + # ensure that we take the part after the first two + # segments to ensure that we include it correctly. + prefix_pieces = _split_mod(prefix) + for p in prefix_pieces[2:]: + new_mod.append(p) + return _join_mod(*new_mod) + + def _import_replace(path): with open(path, "rb+") as f: lines = f.readlines() f.seek(0) f.truncate() - for (i, line) in enumerate(lines): + new_lines = [] + for line in MACHINE_GENERATED: + new_lines.append(line + "\n") + new_lines.extend(lines) + for (i, line) in enumerate(new_lines): segments = _parse_import_line(line, i + 1, path) if segments: - old_line = line + original_line = line (comment, prefix, postfix, alias) = segments - line = 'from %s import %s' - importing = _join_prefix_postfix(prefix, postfix) - if common_already and importing in common_already: - # Use the existing openstack common - mod = '%s.openstack.common' % (base) - else: - mod = '%s.taskflow' % (base) - if prefix: - mod += ".%s" % (prefix) - line = line % (mod, postfix) - if alias: - line += ' as %s' % (alias) - if comment: - line += ' #' + str(comment) - line += "\n" - if old_line != line: - print(" '%s' -> '%s'" % (old_line.strip(), - line.strip())) + line = "%s\n" % _reform_import(_form_mod(prefix, postfix), + postfix, alias, comment) + if original_line != line: + print(" '%s' -> '%s'; line %s" + % (original_line.strip(), line.strip(), i + 1)) f.write(line) - print("Fixing up %s" % (dest)) - import_replace(dest) - _replace(dest, - 'possible_topdir, "taskflow",$', - 'possible_topdir, "' + base + '",') + # Only bother making it if we already didn't make it... + if not os.path.exists(dest): + _copy_it() + print("Fixing up '%s'" % (dest)) + _import_replace(dest) + _bulk_replace(dest, + 'possible_topdir, "%s",$' % (BASE_MOD), + 'possible_topdir, "' + base + '",') -def _is_mod_path(segments): +def _get_mod_path(segments, base): if not segments: - return False - mod = ".".join(segments) - mod_path = _mod_to_path("taskflow.%s" % (mod)) + ".py" - if os.path.exists(mod_path): - return True - return False + return (False, None) + mod_path = _mod_to_path(_join_mod(base, *segments)) + '.py' + if os.path.isfile(mod_path): + return (True, mod_path) + return (False, mod_path) -def _split_import(text): - pieces = [] - for piece in text.split("."): - piece = piece.strip() - if piece: - pieces.append(piece) - return pieces +def _split_mod(text): + pieces = text.split('.') + return [p.strip() for p in pieces if p.strip()] -def _copy_pyfile(path, base, dest_dir, common_already=None): - _copy_file(path, _dest_path(path, base, dest_dir), base, common_already) +def _copy_pyfile(path, base, dest_dir, root_mods=None, common_already=None): + _copy_file(path, _dest_path(path, base, dest_dir), base, + common_already=common_already, root_mods=root_mods) -def _copy_mod(mod, base, dest_dir, common_already=None): - full_mod = base + '.taskflow.%s' % mod - base_mod = 'taskflow.%s' % mod - print("Copying module '%s' -> '%s'" % (base_mod, full_mod)) - +def _copy_mod(mod, base, dest_dir, common_already=None, root_mods=None): + if not root_mods: + root_mods = {} + if not common_already: + common_already = set() copy_pyfile = functools.partial(_copy_pyfile, base=base, dest_dir=dest_dir, - common_already=common_already) - - mod_file = _mod_to_path(base_mod) + ".py" - if os.path.isfile(mod_file): + common_already=common_already, + root_mods=root_mods) + # Ensure that the module has a root module if it has a mapping to one so + # that its __init__.py file will exist. + root_existed = False + if mod in root_mods: + root_existed = True + copy_pyfile(root_mods[mod]) + exists, mod_file = _get_mod_path([mod], base=BASE_MOD) + if exists: + print("Creating module '%s'" % (_join_mod(base, BASE_MOD, mod))) copy_pyfile(mod_file) else: - raise IOError("Can not find module file: %s" % (mod_file)) + if not root_existed: + raise IOError("Can not find module: %s" % (_join_mod(BASE_MOD, + mod))) def _parse_import_line(line, linenum=-1, filename=None): @@ -261,20 +291,10 @@ def _parse_import_line(line, linenum=-1, filename=None): if linenum > 0: msg += "; line %s" % (linenum) if filename: - msg += " from file (%s)" % (filename) + msg += " from file '%s'" % (filename) raise IOError(msg) - def split_import(text): - pieces = [] - for piece in text.split("."): - piece = piece.strip() - if piece: - pieces.append(piece) - else: - blowup() - return pieces - - result = re.match(r"\s*from\s+taskflow\s*(.*)$", line) + result = IMPORT_FROM.match(line) if not result: return None rest = result.group(1).split("#", 1) @@ -289,7 +309,7 @@ def _parse_import_line(line, linenum=-1, filename=None): # Figure out the contents of a line like: # - # from taskflow.xyz import blah as blah2 + # from abc.xyz import blah as blah2 # First looking at the '.xyz' part (if it exists) prefix = '' @@ -310,52 +330,64 @@ def _parse_import_line(line, linenum=-1, filename=None): # Figure out if this is being aliased and keep the alias. importing = result.group(1).strip() - alias_match = re.search(r"(.*?)\s+as\s+(.*)$", importing) + result = re.match(r"(.*?)\s+as\s+(.*)$", importing) alias = '' - if not alias_match: + if not result: postfix = importing else: - alias = alias_match.group(2).strip() - postfix = alias_match.group(1).strip() + alias = result.group(2).strip() + postfix = result.group(1).strip() return (comment, prefix, postfix, alias) def _find_import_modules(srcfile): with open(srcfile, 'rb') as f: - for (i, line) in enumerate(f): - segments = _parse_import_line(line, i + 1, srcfile) - if segments: - (comment, prefix, postfix, alias) = segments - importing = _join_prefix_postfix(prefix, postfix) - import_segments = _split_import(importing) - while len(import_segments): - if _is_mod_path(import_segments): - break - else: - import_segments.pop() - import_what = ".".join(import_segments) - if import_what: - yield import_what + lines = f.readlines() + for (i, line) in enumerate(lines): + segments = _parse_import_line(line, i + 1, srcfile) + if not segments: + continue + (comment, prefix, postfix, alias) = segments + importing = _join_mod(prefix, postfix) + import_segments = _split_mod(importing) + prefix_segments = _split_mod(prefix) + while len(import_segments): + # Attempt to locate where the module is by popping import + # segments until we find one that actually exists. + exists, _mod_path = _get_mod_path(import_segments, base=BASE_MOD) + if exists: + break + else: + import_segments.pop() + if not import_segments or len(import_segments) < len(prefix_segments): + raise IOError("Unable to find import '%s'; line %s from file" + " '%s'" % (importing, i + 1, srcfile)) + yield _join_mod(*import_segments) def _build_dependency_tree(): dep_tree = {} - base_path = 'taskflow' - for dirpath, _, filenames in os.walk(base_path): + root_mods = {} + for dirpath, _tmp, filenames in os.walk(BASE_MOD): for filename in [x for x in filenames if x.endswith('.py')]: - if dirpath == base_path: + if dirpath == BASE_MOD: mod_name = filename.split('.')[0] + root_mods[mod_name] = os.path.join(dirpath, '__init__.py') else: - mod_name = dirpath.split(os.sep)[1:] - mod_name = ".".join(mod_name) - mod_name += '.' + filename.split('.')[0] - if mod_name.endswith('__init__'): - continue - filepath = os.path.join(dirpath, filename) - dep_list = dep_tree.setdefault(mod_name, []) - dep_list.extend([x for x in _find_import_modules(filepath) - if x != mod_name and x not in dep_list]) - return dep_tree + mod_pieces = dirpath.split(os.sep)[1:] + mod_pieces.append(filename.split('.')[0]) + mod_name = _join_mod(*mod_pieces) + if mod_name.endswith('__init__') or filename == '__init__.py': + segments = _split_mod(mod_name)[0:-1] + if segments: + mod_name = _join_mod(*segments) + root_mods[mod_name] = os.path.join(dirpath, filename) + else: + filepath = os.path.join(dirpath, filename) + dep_list = dep_tree.setdefault(mod_name, []) + dep_list.extend([x for x in _find_import_modules(filepath) + if x != mod_name and x not in dep_list]) + return (dep_tree, root_mods) def _dfs_dependency_tree(dep_tree, mod_name, mod_list=[]): @@ -369,9 +401,9 @@ def _dfs_dependency_tree(dep_tree, mod_name, mod_list=[]): def _complete_flow_list(flows): def check_fetch_mod(flow): - mod = 'patterns.%s' % (flow) - mod_path = _mod_to_path("taskflow.%s" % (mod)) + ".py" - if not os.path.isfile(mod_path): + mod = _join_mod('patterns', flow) + exists, mod_path = _get_mod_path([mod], base=BASE_MOD) + if not exists: raise IOError("Flow %s file not found at: %s" % (flow, mod_path)) return mod @@ -381,28 +413,39 @@ def _complete_flow_list(flows): if not f: continue flow_mods.append(check_fetch_mod(f)) + return flow_mods - return _complete_module_list(flow_mods) + +def _is_prefix_of(prefix_text, haystack): + for t in haystack: + if t.startswith(prefix_text): + return True + return False def _complete_module_list(base): - dep_tree = _build_dependency_tree() + dep_tree, root_mods = _build_dependency_tree() mod_list = [] for mod in base: for x in _dfs_dependency_tree(dep_tree, mod, []): if x not in mod_list and x not in base: mod_list.append(x) mod_list.extend(base) - return mod_list + # Ensure that we connect the roots of the mods to the mods themselves + # and include them in the list of mods to be completed so they are included + # also. + for m in root_mods.keys(): + if _is_prefix_of(m, base) and m not in mod_list: + mod_list.append(m) + return (mod_list, root_mods) -def _find_existing_common(mod, base, dest_dir): - existing_mod = ".".join([base, mod]) - existing_path = _mod_to_path(existing_mod) - existing_path = os.path.join(dest_dir, existing_path) + ".py" - if not os.path.isfile(existing_path): - return None - return existing_mod +def _find_existing(mod, base, dest_dir): + mod = _join_mod(base, mod) + mod_path = os.path.join(dest_dir, _mod_to_path(mod)) + '.py' + if os.path.isfile(mod_path): + return mod + return None def _uniq_itr(itr): @@ -414,6 +457,19 @@ def _uniq_itr(itr): yield i +def _rm_tree(base): + dirpaths = [] + for dirpath, _tmp, filenames in os.walk(base): + print(" '%s' (X)" % (dirpath)) + for filename in filenames: + filepath = os.path.join(dirpath, filename) + print(" '%s' (X)" % (filepath)) + os.unlink(filepath) + dirpaths.append(dirpath) + for d in reversed(dirpaths): + shutil.rmtree(d) + + def main(argv): conf = _parse_args(argv) @@ -441,19 +497,19 @@ def main(argv): # TODO(harlowja): for now these are the only primitives we are allowing to # be copied over. Later add more as needed. prims = 0 - for k in allowed_primitives: + for k in ALLOWED_PRIMITIVES: prims += len(primitive_types[k]) if prims <= 0: - allowed = ", ".join(sorted(allowed_primitives)) + allowed = ", ".join(sorted(ALLOWED_PRIMITIVES)) print("A list of primitives to copy is required " "(%s is allowed)" % (allowed), file=sys.stderr) sys.exit(1) unknown_prims = [] for k in primitive_types.keys(): - if k not in allowed_primitives: + if k not in ALLOWED_PRIMITIVES: unknown_prims.append(k) if unknown_prims: - allowed = ", ".join(sorted(allowed_primitives)) + allowed = ", ".join(sorted(ALLOWED_PRIMITIVES)) unknown = ", ".join(sorted(unknown_prims)) print("Unknown primitives (%s) are being copied " "(%s is allowed)" % (unknown, allowed), file=sys.stderr) @@ -463,22 +519,53 @@ def main(argv): print("A destination base module is required", file=sys.stderr) sys.exit(1) - def copy_mods(mod_list): - common_already = [] - for mod in sorted(mod_list): + def copy_mods(mod_list, root_mods): + common_already = {} + missing_common = set() + for mod in list(sorted(mod_list)): + # NOTE(harlowja): attempt to use the modules being copied to common + # folder as much as possible for modules that are needed for + # taskflow as this avoids duplicating openstack.common in the + # contained project as well as in the taskflow subfolder. if mod.startswith("openstack.common"): - existing = _find_existing_common(mod, conf.base, dest_dir) - if existing: + existing_mod = _find_existing(mod, conf.base, dest_dir) + if existing_mod: + common_already[mod] = existing_mod mod_list.remove(mod) - common_already.append(mod) + else: + missing_common.add(mod) + there_common_mod = _join_mod(conf.base, 'openstack', 'common') + if common_already: + print("The following modules will be used from the containing" + " projects '%s'" % (there_common_mod)) + for mod in sorted(common_already.keys()): + target_mod = common_already[mod] + print(" '%s' -> '%s'" % (mod, target_mod)) + if missing_common: + print("The following modules will *not* be used from the" + " containing projects '%s'" % (there_common_mod)) + for mod in sorted(missing_common): + print(" '%s'" % (mod)) for mod in _uniq_itr(sorted(mod_list)): - _copy_mod(mod, conf.base, dest_dir, common_already) + _copy_mod(mod, conf.base, dest_dir, + common_already=common_already, + root_mods=root_mods) - copy_what = [] - copy_what.extend(_complete_flow_list(primitive_types.pop('flow', []))) - for k in primitive_types.keys(): - copy_what.extend(_complete_module_list([k])) - copy_mods(copy_what) + def clean_old(): + old_base = os.path.join(dest_dir, conf.base, BASE_MOD) + if os.path.isdir(old_base): + print("Removing old %s tree found at '%s'" % (BASE_MOD, old_base)) + _rm_tree(old_base) + + find_what = _complete_flow_list(primitive_types.pop('flow', [])) + find_what.extend(primitive_types.keys()) + find_what = [f for f in _uniq_itr(find_what)] + copy_what, root_mods = _complete_module_list(find_what) + if copy_what: + clean_old() + copy_mods([m for m in _uniq_itr(copy_what)], root_mods) + else: + print("Nothing to copy.") if __name__ == "__main__":