It is not necessary to return the atom that was executed from the future's result() method, since we can just as easily set an attribute on the future and reference it from there when using it later. This is also required for a process-based executor, since it is not typically possible to send back a raw task object (and it is not desirable to require this); even if it were possible, the task would be pickled and unpickled multiple times, so it cannot be guaranteed to be the same object (in fact it is not). Part of blueprint process-executor Change-Id: I4a05ea5dcdef97218312e3a88ed4a1dfdf1b1edf
51 lines
1.8 KiB
Python
51 lines
1.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from taskflow.engines.action_engine import executor
|
|
from taskflow.utils import reflection
|
|
|
|
|
|
class Endpoint(object):
    """Represents a single task with execute/revert methods.

    An endpoint is bound to one task class. Every execute/revert request
    builds a fresh instance of that class and hands it to the endpoint's
    task executor, returning only the result part of what the executor's
    future resolves to.
    """

    def __init__(self, task_cls):
        """Bind this endpoint to a task class.

        :param task_cls: class used to create a new task object for each
                         execute/revert request
        """
        self._task_cls = task_cls
        self._task_cls_name = reflection.get_class_name(task_cls)
        self._executor = executor.SerialTaskExecutor()

    def __str__(self):
        return self._task_cls_name

    @property
    def name(self):
        """Name of the task class this endpoint was created for."""
        return self._task_cls_name

    def _get_task(self, name=None):
        """Create a new task object, passing ``name`` to its constructor."""
        # NOTE(skudriashev): Note that task is created here with the `name`
        # argument passed to its constructor. This will be a problem when
        # task's constructor requires any other arguments.
        return self._task_cls(name=name)

    def execute(self, task_name, **kwargs):
        """Run the execute side of a freshly created task.

        :param task_name: name given to the newly created task
        :param kwargs: keyword arguments forwarded to the executor's
                       ``execute_task`` call
        :returns: the result element of the pair the future resolves to
        """
        task = self._get_task(task_name)
        # The future resolves to an (event, result) pair; the event is not
        # needed by callers of this method, so it is discarded.
        _, result = self._executor.execute_task(task, **kwargs).result()
        return result

    def revert(self, task_name, **kwargs):
        """Run the revert side of a freshly created task.

        :param task_name: name given to the newly created task
        :param kwargs: keyword arguments forwarded to the executor's
                       ``revert_task`` call
        :returns: the result element of the pair the future resolves to
        """
        task = self._get_task(task_name)
        # The future resolves to an (event, result) pair; the event is not
        # needed by callers of this method, so it is discarded.
        _, result = self._executor.revert_task(task, **kwargs).result()
        return result
|