 148723b0fe
			
		
	
	148723b0fe
	
	
	
		
			
			To ensure we are consistent with our usage of the warnings module always use it through the deprecation utility instead of accessing the warnings module directly. This adds on a new deprecation utility module decorator that can be used to notify users about function kwarg renaming (of which the 'engine_conf' -> 'engine' rename/change is one example of) and adjusts the messaging formatting functions to accomodate for its introduction. Change-Id: Ieb987ef9e1e4515caa676a007ad00590d354132e
		
			
				
	
	
		
			296 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			296 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| #    Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
 | |
| #
 | |
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | |
| #    not use this file except in compliance with the License. You may obtain
 | |
| #    a copy of the License at
 | |
| #
 | |
| #         http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| #    Unless required by applicable law or agreed to in writing, software
 | |
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | |
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | |
| #    License for the specific language governing permissions and limitations
 | |
| #    under the License.
 | |
| 
 | |
| import contextlib
 | |
| 
 | |
| from oslo.utils import importutils
 | |
| import six
 | |
| import stevedore.driver
 | |
| 
 | |
| from taskflow import exceptions as exc
 | |
| from taskflow.persistence import backends as p_backends
 | |
| from taskflow.utils import deprecation
 | |
| from taskflow.utils import misc
 | |
| from taskflow.utils import persistence_utils as p_utils
 | |
| from taskflow.utils import reflection
 | |
| 
 | |
| 
 | |
| # NOTE(imelnikov): this is the entrypoint namespace, not the module namespace.
 | |
| ENGINES_NAMESPACE = 'taskflow.engines'
 | |
| 
 | |
| # The default entrypoint engine type looked for when it is not provided.
 | |
| ENGINE_DEFAULT = 'default'
 | |
| 
 | |
| 
 | |
| @deprecation.renamed_kwarg('engine_conf', 'engine',
 | |
|                            version="0.6", removal_version="?",
 | |
|                            # This is set to none since this function is called
 | |
|                            # from 2 other functions in this module, both of
 | |
|                            # which have different stack levels, possibly we
 | |
|                            # can fix this in the future...
 | |
|                            stacklevel=None)
 | |
| def _extract_engine(**kwargs):
 | |
|     """Extracts the engine kind and any associated options."""
 | |
|     options = {}
 | |
|     kind = kwargs.pop('engine', None)
 | |
|     engine_conf = kwargs.pop('engine_conf', None)
 | |
|     if engine_conf is not None:
 | |
|         if isinstance(engine_conf, six.string_types):
 | |
|             kind = engine_conf
 | |
|         else:
 | |
|             options.update(engine_conf)
 | |
|             kind = options.pop('engine', None)
 | |
|     if not kind:
 | |
|         kind = ENGINE_DEFAULT
 | |
|     # See if it's a URI and if so, extract any further options...
 | |
|     try:
 | |
|         uri = misc.parse_uri(kind)
 | |
|     except (TypeError, ValueError):
 | |
|         pass
 | |
|     else:
 | |
|         kind = uri.scheme
 | |
|         options = misc.merge_uri(uri, options.copy())
 | |
|     # Merge in any leftover **kwargs into the options, this makes it so that
 | |
|     # the provided **kwargs override any URI or engine_conf specific options.
 | |
|     options.update(kwargs)
 | |
|     return (kind, options)
 | |
| 
 | |
| 
 | |
| def _fetch_factory(factory_name):
 | |
|     try:
 | |
|         return importutils.import_class(factory_name)
 | |
|     except (ImportError, ValueError) as e:
 | |
|         raise ImportError("Could not import factory %r: %s"
 | |
|                           % (factory_name, e))
 | |
| 
 | |
| 
 | |
| def _fetch_validate_factory(flow_factory):
 | |
|     if isinstance(flow_factory, six.string_types):
 | |
|         factory_fun = _fetch_factory(flow_factory)
 | |
|         factory_name = flow_factory
 | |
|     else:
 | |
|         factory_fun = flow_factory
 | |
|         factory_name = reflection.get_callable_name(flow_factory)
 | |
|         try:
 | |
|             reimported = _fetch_factory(factory_name)
 | |
|             assert reimported == factory_fun
 | |
|         except (ImportError, AssertionError):
 | |
|             raise ValueError('Flow factory %r is not reimportable by name %s'
 | |
|                              % (factory_fun, factory_name))
 | |
|     return (factory_name, factory_fun)
 | |
| 
 | |
| 
 | |
| def load(flow, store=None, flow_detail=None, book=None,
 | |
|          engine_conf=None, backend=None,
 | |
|          namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT, **kwargs):
 | |
|     """Load a flow into an engine.
 | |
| 
 | |
|     This function creates and prepares an engine to run the provided flow. All
 | |
|     that is left after this returns is to run the engine with the
 | |
|     engines ``run()`` method.
 | |
| 
 | |
|     Which engine to load is specified via the ``engine`` parameter. It
 | |
|     can be a string that names the engine type to use, or a string that
 | |
|     is a URI with a scheme that names the engine type to use and further
 | |
|     options contained in the URI's host, port, and query parameters...
 | |
| 
 | |
|     Which storage backend to use is defined by the backend parameter. It
 | |
|     can be backend itself, or a dictionary that is passed to
 | |
|     ``taskflow.persistence.backends.fetch()`` to obtain a viable backend.
 | |
| 
 | |
|     :param flow: flow to load
 | |
|     :param store: dict -- data to put to storage to satisfy flow requirements
 | |
|     :param flow_detail: FlowDetail that holds the state of the flow (if one is
 | |
|         not provided then one will be created for you in the provided backend)
 | |
|     :param book: LogBook to create flow detail in if flow_detail is None
 | |
|     :param engine_conf: engine type or URI and options (**deprecated**)
 | |
|     :param backend: storage backend to use or configuration that defines it
 | |
|     :param namespace: driver namespace for stevedore (or empty for default)
 | |
|     :param engine: string engine type or URI string with scheme that contains
 | |
|                    the engine type and any URI specific components that will
 | |
|                    become part of the engine options.
 | |
|     :param kwargs: arbitrary keyword arguments passed as options (merged with
 | |
|                    any extracted ``engine`` and ``engine_conf`` options),
 | |
|                    typically used for any engine specific options that do not
 | |
|                    fit as any of the existing arguments.
 | |
|     :returns: engine
 | |
|     """
 | |
| 
 | |
|     kind, options = _extract_engine(engine_conf=engine_conf,
 | |
|                                     engine=engine, **kwargs)
 | |
| 
 | |
|     if isinstance(backend, dict):
 | |
|         backend = p_backends.fetch(backend)
 | |
| 
 | |
|     if flow_detail is None:
 | |
|         flow_detail = p_utils.create_flow_detail(flow, book=book,
 | |
|                                                  backend=backend)
 | |
| 
 | |
|     try:
 | |
|         mgr = stevedore.driver.DriverManager(
 | |
|             namespace, kind,
 | |
|             invoke_on_load=True,
 | |
|             invoke_args=(flow, flow_detail, backend, options))
 | |
|         engine = mgr.driver
 | |
|     except RuntimeError as e:
 | |
|         raise exc.NotFound("Could not find engine '%s'" % (kind), e)
 | |
|     else:
 | |
|         if store:
 | |
|             engine.storage.inject(store)
 | |
|         return engine
 | |
| 
 | |
| 
 | |
| def run(flow, store=None, flow_detail=None, book=None,
 | |
|         engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE,
 | |
|         engine=ENGINE_DEFAULT, **kwargs):
 | |
|     """Run the flow.
 | |
| 
 | |
|     This function loads the flow into an engine (with the :func:`load() <load>`
 | |
|     function) and runs the engine.
 | |
| 
 | |
|     The arguments are interpreted as for :func:`load() <load>`.
 | |
| 
 | |
|     :returns: dictionary of all named results (see ``storage.fetch_all()``)
 | |
|     """
 | |
|     engine = load(flow, store=store, flow_detail=flow_detail, book=book,
 | |
|                   engine_conf=engine_conf, backend=backend,
 | |
|                   namespace=namespace, engine=engine, **kwargs)
 | |
|     engine.run()
 | |
|     return engine.storage.fetch_all()
 | |
| 
 | |
| 
 | |
| def save_factory_details(flow_detail,
 | |
|                          flow_factory, factory_args, factory_kwargs,
 | |
|                          backend=None):
 | |
|     """Saves the given factories reimportable attributes into the flow detail.
 | |
| 
 | |
|     This function saves the factory name, arguments, and keyword arguments
 | |
|     into the given flow details object  and if a backend is provided it will
 | |
|     also ensure that the backend saves the flow details after being updated.
 | |
| 
 | |
|     :param flow_detail: FlowDetail that holds state of the flow to load
 | |
|     :param flow_factory: function or string: function that creates the flow
 | |
|     :param factory_args: list or tuple of factory positional arguments
 | |
|     :param factory_kwargs: dict of factory keyword arguments
 | |
|     :param backend: storage backend to use or configuration
 | |
|     """
 | |
|     if not factory_args:
 | |
|         factory_args = []
 | |
|     if not factory_kwargs:
 | |
|         factory_kwargs = {}
 | |
|     factory_name, _factory_fun = _fetch_validate_factory(flow_factory)
 | |
|     factory_data = {
 | |
|         'factory': {
 | |
|             'name': factory_name,
 | |
|             'args': factory_args,
 | |
|             'kwargs': factory_kwargs,
 | |
|         },
 | |
|     }
 | |
|     if not flow_detail.meta:
 | |
|         flow_detail.meta = factory_data
 | |
|     else:
 | |
|         flow_detail.meta.update(factory_data)
 | |
|     if backend is not None:
 | |
|         if isinstance(backend, dict):
 | |
|             backend = p_backends.fetch(backend)
 | |
|         with contextlib.closing(backend.get_connection()) as conn:
 | |
|             conn.update_flow_details(flow_detail)
 | |
| 
 | |
| 
 | |
| def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None,
 | |
|                       store=None, book=None, engine_conf=None, backend=None,
 | |
|                       namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT,
 | |
|                       **kwargs):
 | |
|     """Loads a flow from a factory function into an engine.
 | |
| 
 | |
|     Gets flow factory function (or name of it) and creates flow with
 | |
|     it. Then, the flow is loaded into an engine with the :func:`load() <load>`
 | |
|     function, and the factory function fully qualified name is saved to flow
 | |
|     metadata so that it can be later resumed.
 | |
| 
 | |
|     :param flow_factory: function or string: function that creates the flow
 | |
|     :param factory_args: list or tuple of factory positional arguments
 | |
|     :param factory_kwargs: dict of factory keyword arguments
 | |
| 
 | |
|     Further arguments are interpreted as for :func:`load() <load>`.
 | |
| 
 | |
|     :returns: engine
 | |
|     """
 | |
| 
 | |
|     _factory_name, factory_fun = _fetch_validate_factory(flow_factory)
 | |
|     if not factory_args:
 | |
|         factory_args = []
 | |
|     if not factory_kwargs:
 | |
|         factory_kwargs = {}
 | |
|     flow = factory_fun(*factory_args, **factory_kwargs)
 | |
|     if isinstance(backend, dict):
 | |
|         backend = p_backends.fetch(backend)
 | |
|     flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend)
 | |
|     save_factory_details(flow_detail,
 | |
|                          flow_factory, factory_args, factory_kwargs,
 | |
|                          backend=backend)
 | |
|     return load(flow=flow, store=store, flow_detail=flow_detail, book=book,
 | |
|                 engine_conf=engine_conf, backend=backend, namespace=namespace,
 | |
|                 engine=engine, **kwargs)
 | |
| 
 | |
| 
 | |
| def flow_from_detail(flow_detail):
 | |
|     """Reloads a flow previously saved.
 | |
| 
 | |
|     Gets the flow factories name and any arguments and keyword arguments from
 | |
|     the flow details metadata, and then calls that factory to recreate the
 | |
|     flow.
 | |
| 
 | |
|     :param flow_detail: FlowDetail that holds state of the flow to load
 | |
|     """
 | |
|     try:
 | |
|         factory_data = flow_detail.meta['factory']
 | |
|     except (KeyError, AttributeError, TypeError):
 | |
|         raise ValueError('Cannot reconstruct flow %s %s: '
 | |
|                          'no factory information saved.'
 | |
|                          % (flow_detail.name, flow_detail.uuid))
 | |
| 
 | |
|     try:
 | |
|         factory_fun = _fetch_factory(factory_data['name'])
 | |
|     except (KeyError, ImportError):
 | |
|         raise ImportError('Could not import factory for flow %s %s'
 | |
|                           % (flow_detail.name, flow_detail.uuid))
 | |
| 
 | |
|     args = factory_data.get('args', ())
 | |
|     kwargs = factory_data.get('kwargs', {})
 | |
|     return factory_fun(*args, **kwargs)
 | |
| 
 | |
| 
 | |
| def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None,
 | |
|                      namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT,
 | |
|                      **kwargs):
 | |
|     """Reloads an engine previously saved.
 | |
| 
 | |
|     This reloads the flow using the
 | |
|     :func:`flow_from_detail() <flow_from_detail>` function and then calls
 | |
|     into the :func:`load() <load>` function to create an engine from that flow.
 | |
| 
 | |
|     :param flow_detail: FlowDetail that holds state of the flow to load
 | |
| 
 | |
|     Further arguments are interpreted as for :func:`load() <load>`.
 | |
| 
 | |
|     :returns: engine
 | |
|     """
 | |
|     flow = flow_from_detail(flow_detail)
 | |
|     return load(flow, flow_detail=flow_detail,
 | |
|                 store=store, engine_conf=engine_conf, backend=backend,
 | |
|                 namespace=namespace, engine=engine, **kwargs)
 |