In this commit we split the utils module into several parts: - flow_utils, with code used in running flows; - threading_utils, with code that helps when working with threads; - reflection, with code that inspects Python object metadata; - misc, with all the other code that used to live in utils.py. We also move graph_utils into the taskflow.utils package. This commit just moves code around. It should not change any logic (with the exception of the complex_graph example). Change-Id: Iebfe45395f0ff502bc00fc7ae14829130b2c6abe
138 lines
5.8 KiB
Python
138 lines
5.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import logging
|
|
|
|
from taskflow import states
|
|
from taskflow.utils import misc as utils
|
|
|
|
# Module-level logger, named after this module.
LOG = logging.getLogger(name=__name__)
|
|
|
|
|
|
class Resumption(object):
    """Splits a flow's runner ordering into already-completed and pending parts.

    Progress is recorded into a logbook by listeners registered via
    :meth:`record_for`; calling the instance later consults that logbook so
    runners whose recorded states include SUCCESS or FAILURE can be skipped.
    """

    # NOTE(harlowja): This allows for resumption by skipping tasks which
    # have already occurred, aka fast-forwarding through a workflow to
    # the last point it stopped (if possible).

    def __init__(self, logbook):
        # NOTE(review): the logbook is used below as a mapping of
        # flow uuid -> flow details (``in`` / ``[]``) that also provides
        # ``add_flow``; confirm against the logbook implementation.
        self._logbook = logbook

    @staticmethod
    def _warn_if_incompatible(recorded_version, immediate_version, message):
        """Log ``message`` when a recorded version is incompatible.

        Nothing is checked or logged when no version was recorded (``None``).

        :param recorded_version: version stored in the task metadata.
        :param immediate_version: version of the runner currently in hand.
        :param message: log format string with two ``%s`` placeholders,
                        filled with the recorded and immediate versions.
        """
        if recorded_version is None:
            return
        if not utils.is_version_compatible(recorded_version,
                                           immediate_version):
            LOG.warning(message, recorded_version, immediate_version)

    def record_for(self, flow):
        """Register listeners on ``flow`` that record progress in the logbook.

        A wildcard (``'*'``) listener is registered on ``flow.task_notifier``
        (records per-task states/results) and on ``flow.notifier`` (ensures a
        logbook entry exists for the flow).

        :param flow: the flow to observe.
        """

        def _task_listener(state, details):
            """Store the result of the task under the given flow in the log
            book so that it can be retrieved later.
            """
            runner = details['runner']
            task_flow = details['flow']
            LOG.debug("Recording %s of %s has finished state %s",
                      runner, task_flow, state)
            metadata = {}
            flow_details = self._logbook[task_flow.uuid]
            if state in (states.SUCCESS, states.FAILURE):
                # Only finished runners have a meaningful result to store.
                metadata['result'] = runner.result
            if runner.uuid not in flow_details:
                # First event for this task: create its entry.
                metadata['states'] = [state]
                metadata['version'] = runner.version
                flow_details.add_task(runner.uuid, metadata)
                return
            task_details = flow_details[runner.uuid]
            self._warn_if_incompatible(
                task_details.metadata.get('version'), runner.version,
                "Updating a task with a different version"
                " than the one being listened to (%s != %s)")
            past_states = task_details.metadata.get('states', [])
            if state not in past_states:
                past_states.append(state)
            task_details.metadata['states'] = past_states
            if metadata:
                task_details.metadata.update(metadata)

        def _workflow_listener(state, details):
            """Ensure that when we receive an event from said workflow that we
            make sure a logbook entry exists for that flow.
            """
            wf = details['flow']
            old_state = details['old_state']
            LOG.debug("%s has transitioned from %s to %s", wf, old_state,
                      state)
            if wf.uuid not in self._logbook:
                self._logbook.add_flow(wf.uuid)

        flow.task_notifier.register('*', _task_listener)
        flow.notifier.register('*', _workflow_listener)

    def _reconcile_versions(self, desired_version, task_details):
        """Return ``task_details`` adapted to ``desired_version``.

        Currently a no-op that returns ``task_details`` unchanged.
        """
        # For now don't do anything to reconcile the desired version
        # from the actual version present in the task details, but in the
        # future we could try to alter the task details to be in the older
        # format (or more complicated logic...)
        return task_details

    def _get_details(self, flow_details, runner):
        """Look up whether ``runner`` already completed.

        :param flow_details: recorded details of the flow, keyed by runner
                             uuid.
        :param runner: runner to look up (uses ``uuid`` and ``version``).
        :returns: ``(True, task_details)`` when the runner has a record whose
                  states include SUCCESS or FAILURE, else ``(False, None)``.
        """
        if runner.uuid not in flow_details:
            return (False, None)
        details = flow_details[runner.uuid]
        completed_states = (states.SUCCESS, states.FAILURE)
        recorded_states = details.metadata.get('states', [])
        if not any(s in completed_states for s in recorded_states):
            return (False, None)
        immediate_version = runner.version
        self._warn_if_incompatible(
            details.metadata.get('version'), immediate_version,
            "Fetching runner metadata from a task with"
            " a different version from the one being"
            " processed (%s != %s)")
        details = self._reconcile_versions(immediate_version, details)
        return (True, details)

    def __call__(self, flow, ordering):
        """Splits the initial ordering into two segments, the first which
        has already completed (or errored) and the second which has not
        completed or errored.

        :param flow: flow whose ``uuid`` is looked up in the logbook.
        :param ordering: iterable of runners in execution order.
        :returns: ``(ran_already, remaining)``; ``ran_already`` is a list of
                  ``(runner, metadata)`` pairs and ``remaining`` iterates the
                  runners still to run. If the flow has no logbook record,
                  ``ordering`` itself is returned as ``remaining``.
        """
        flow_id = flow.uuid
        if flow_id not in self._logbook:
            LOG.debug("No record of %s", flow)
            return ([], ordering)
        flow_details = self._logbook[flow_id]
        # Consume through one explicit iterator so that, when we stop early,
        # the rest contains exactly the runners not yet examined. Passing a
        # re-iterable (e.g. a list) straight to LastFedIter would replay it
        # from the start and duplicate already-completed runners. For an
        # iterator, iter() returns the same object, so behavior is unchanged.
        runners = iter(ordering)
        ran_already = []
        for r in runners:
            LOG.debug("Checking if ran %s of %s", r, flow)
            (has_ran, details) = self._get_details(flow_details, r)
            if not has_ran:
                LOG.debug("Not yet ran %s of %s", r, flow)
                # We need to put back the last task we took out since it did
                # not run and therefore needs to, that's why we have this
                # different iterator (which can do this).
                return (ran_already, utils.LastFedIter(r, runners))
            LOG.debug("Already ran %s", r)
            ran_already.append((r, details.metadata))
        return (ran_already, iter([]))
|