diff --git a/.zuul.yaml b/.zuul.yaml index dce959552..07cdf9d4c 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -38,13 +38,9 @@ - release-notes-jobs-python3 check: jobs: - - openstack-tox-docs: - nodeset: ubuntu-jammy - taskflow-functional-redis - taskflow-functional-etcd gate: jobs: - - openstack-tox-docs: - nodeset: ubuntu-jammy - taskflow-functional-redis - taskflow-functional-etcd diff --git a/doc/source/user/engines.rst b/doc/source/user/engines.rst index ec939e5c4..c537edd66 100644 --- a/doc/source/user/engines.rst +++ b/doc/source/user/engines.rst @@ -449,7 +449,6 @@ Components .. automodule:: taskflow.engines.action_engine.completer .. automodule:: taskflow.engines.action_engine.deciders .. automodule:: taskflow.engines.action_engine.executor -.. automodule:: taskflow.engines.action_engine.process_executor .. automodule:: taskflow.engines.action_engine.runtime .. automodule:: taskflow.engines.action_engine.scheduler .. automodule:: taskflow.engines.action_engine.selector diff --git a/releasenotes/notes/remove-process_executor-f59d40a5dd287cd7.yaml b/releasenotes/notes/remove-process_executor-f59d40a5dd287cd7.yaml new file mode 100644 index 000000000..701d5097f --- /dev/null +++ b/releasenotes/notes/remove-process_executor-f59d40a5dd287cd7.yaml @@ -0,0 +1,4 @@ +--- +upgrade: + - | + Process executor was removed. diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index b45e02a20..07917b200 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -40,11 +40,6 @@ from taskflow import storage from taskflow.types import failure from taskflow.utils import misc -try: - from taskflow.engines.action_engine import process_executor -except ImportError: - process_executor = None - LOG = logging.getLogger(__name__) @@ -548,7 +543,6 @@ String (case insensitive) Executor used polling while a higher number will involve less polling but a slower time for an engine to notice a task has completed. - .. |pe| replace:: process_executor .. |cfp| replace:: concurrent.futures.process .. |cft| replace:: concurrent.futures.thread .. |cf| replace:: concurrent.futures @@ -563,16 +557,9 @@ String (case insensitive) Executor used _executor_cls_matchers = [ _ExecutorTypeMatch((futures.ThreadPoolExecutor,), executor.ParallelThreadTaskExecutor), - ] - if process_executor is not None: - _executor_cls_matchers.append( - _ExecutorTypeMatch((futures.ProcessPoolExecutor,), - process_executor.ParallelProcessTaskExecutor) - ) - _executor_cls_matchers.append( _ExecutorTypeMatch((futures.Executor,), executor.ParallelThreadTaskExecutor), - ) + ] # One of these should match when a string/text is provided for the # 'executor' option (a mixed case equivalent is allowed since the match @@ -584,11 +571,6 @@ String (case insensitive) Executor used 'greenthreaded']), executor.ParallelGreenThreadTaskExecutor), ] - if process_executor is not None: - _executor_str_matchers.append( - _ExecutorTextMatch(frozenset(['processes', 'process']), - process_executor.ParallelProcessTaskExecutor) - ) # Used when no executor is provided (either a string or object)... _default_executor_cls = executor.ParallelThreadTaskExecutor diff --git a/taskflow/engines/action_engine/process_executor.py b/taskflow/engines/action_engine/process_executor.py deleted file mode 100644 index 1d2dc37ec..000000000 --- a/taskflow/engines/action_engine/process_executor.py +++ /dev/null @@ -1,720 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import asyncore -import binascii -import collections -import errno -import functools -import hashlib -import hmac -import math -import os -import pickle -import socket -import struct -import time - -import futurist -from oslo_utils import excutils - -from taskflow.engines.action_engine import executor as base -from taskflow import logging -from taskflow import task as ta -from taskflow.types import notifier as nt -from taskflow.utils import iter_utils -from taskflow.utils import misc -from taskflow.utils import schema_utils as su -from taskflow.utils import threading_utils - -LOG = logging.getLogger(__name__) - -# Internal parent <-> child process protocol schema, message constants... -MAGIC_HEADER = 0xDECAF -CHALLENGE = 'identify_yourself' -CHALLENGE_RESPONSE = 'worker_reporting_in' -ACK = 'ack' -EVENT = 'event' -SCHEMAS = { - # Basic jsonschemas for verifying that the data we get back and - # forth from parent <-> child observes at least a basic expected - # format. - CHALLENGE: { - "type": "string", - "minLength": 1, - }, - ACK: { - "type": "string", - "minLength": 1, - }, - CHALLENGE_RESPONSE: { - "type": "string", - "minLength": 1, - }, - EVENT: { - "type": "object", - "properties": { - 'event_type': { - "type": "string", - }, - 'sent_on': { - "type": "number", - }, - }, - "required": ['event_type', 'sent_on'], - "additionalProperties": True, - }, -} - - -class UnknownSender(Exception): - """Exception raised when message from unknown sender is recvd.""" - - -class ChallengeIgnored(Exception): - """Exception raised when challenge has not been responded to.""" - - -class Reader(object): - """Reader machine that streams & parses messages that it then dispatches. - - TODO(harlowja): Use python-suitcase in the future when the following - are addressed/resolved and released: - - - https://github.com/digidotcom/python-suitcase/issues/28 - - https://github.com/digidotcom/python-suitcase/issues/29 - - Binary format format is the following (no newlines in actual format):: - - (4 bytes) - (4 bytes) - (1 or more variable bytes) - (4 bytes) - (1 or more variable bytes) - (4 bytes) - (1 or more variable bytes) - """ - - #: Per state memory initializers. - _INITIALIZERS = { - 'magic_header_left': 4, - 'mac_header_left': 4, - 'identity_header_left': 4, - 'msg_header_left': 4, - } - - #: Linear steps/transitions (order matters here). - _TRANSITIONS = tuple([ - 'magic_header_left', - 'mac_header_left', - 'mac_left', - 'identity_header_left', - 'identity_left', - 'msg_header_left', - 'msg_left', - ]) - - def __init__(self, auth_key, dispatch_func, msg_limit=-1): - if not callable(dispatch_func): - raise ValueError("Expected provided dispatch function" - " to be callable") - self.auth_key = auth_key - self.dispatch_func = dispatch_func - msg_limiter = iter_utils.iter_forever(msg_limit) - self.msg_count = next(msg_limiter) - self._msg_limiter = msg_limiter - self._buffer = misc.BytesIO() - self._state = None - # Local machine variables and such are stored in here. - self._memory = {} - self._transitions = collections.deque(self._TRANSITIONS) - # This is the per state callback handler set. The first entry reads - # the data and the second entry is called after reading is completed, - # typically to save that data into object memory, or to validate - # it. - self._handlers = { - 'magic_header_left': (self._read_field_data, - self._save_and_validate_magic), - 'mac_header_left': (self._read_field_data, - functools.partial(self._save_pos_integer, - 'mac_left')), - 'mac_left': (functools.partial(self._read_data, 'mac'), - functools.partial(self._save_data, 'mac')), - 'identity_header_left': (self._read_field_data, - functools.partial(self._save_pos_integer, - 'identity_left')), - 'identity_left': (functools.partial(self._read_data, 'identity'), - functools.partial(self._save_data, 'identity')), - 'msg_header_left': (self._read_field_data, - functools.partial(self._save_pos_integer, - 'msg_left')), - 'msg_left': (functools.partial(self._read_data, 'msg'), - self._dispatch_and_reset), - } - # Force transition into first state... - self._transition() - - def _save_pos_integer(self, key_name, data): - key_val = struct.unpack("!i", data)[0] - if key_val <= 0: - raise IOError("Invalid %s length received for key '%s', expected" - " greater than zero length" % (key_val, key_name)) - self._memory[key_name] = key_val - return True - - def _save_data(self, key_name, data): - self._memory[key_name] = data - return True - - def _dispatch_and_reset(self, data): - self.dispatch_func( - self._memory['identity'], - # Lazy evaluate so the message can be thrown out as needed - # (instead of the receiver discarding it after the fact)... - functools.partial(_decode_message, self.auth_key, data, - self._memory['mac'])) - self.msg_count = next(self._msg_limiter) - self._memory.clear() - - def _transition(self): - try: - self._state = self._transitions.popleft() - except IndexError: - self._transitions.extend(self._TRANSITIONS) - self._state = self._transitions.popleft() - try: - self._memory[self._state] = self._INITIALIZERS[self._state] - except KeyError: - pass - self._handle_func, self._post_handle_func = self._handlers[self._state] - - def _save_and_validate_magic(self, data): - magic_header = struct.unpack("!i", data)[0] - if magic_header != MAGIC_HEADER: - raise IOError("Invalid magic header received, expected 0x%x but" - " got 0x%x for message %s" % (MAGIC_HEADER, - magic_header, - self.msg_count + 1)) - self._memory['magic'] = magic_header - return True - - def _read_data(self, save_key_name, data): - data_len_left = self._memory[self._state] - self._buffer.write(data[0:data_len_left]) - if len(data) < data_len_left: - data_len_left -= len(data) - self._memory[self._state] = data_len_left - return '' - else: - self._memory[self._state] = 0 - buf_data = self._buffer.getvalue() - self._buffer.reset() - self._post_handle_func(buf_data) - self._transition() - return data[data_len_left:] - - def _read_field_data(self, data): - return self._read_data(self._state, data) - - @property - def bytes_needed(self): - return self._memory.get(self._state, 0) - - def feed(self, data): - while len(data): - data = self._handle_func(data) - - -class BadHmacValueError(ValueError): - """Value error raised when an invalid hmac is discovered.""" - - -def _create_random_string(desired_length): - if desired_length <= 0: - return b'' - data_length = int(math.ceil(desired_length / 2.0)) - data = os.urandom(data_length) - hex_data = binascii.hexlify(data) - return hex_data[0:desired_length] - - -def _calculate_hmac(auth_key, body): - mac = hmac.new(auth_key, body, hashlib.md5).hexdigest() - if isinstance(mac, str): - mac = mac.encode("ascii") - return mac - - -def _encode_message(auth_key, message, identity, reverse=False): - message = pickle.dumps(message, 2) - message_mac = _calculate_hmac(auth_key, message) - pieces = [ - struct.pack("!i", MAGIC_HEADER), - struct.pack("!i", len(message_mac)), - message_mac, - struct.pack("!i", len(identity)), - identity, - struct.pack("!i", len(message)), - message, - ] - if reverse: - pieces.reverse() - return tuple(pieces) - - -def _decode_message(auth_key, message, message_mac): - tmp_message_mac = _calculate_hmac(auth_key, message) - if tmp_message_mac != message_mac: - raise BadHmacValueError('Invalid message hmac') - return pickle.loads(message) - - -class Channel(object): - """Object that workers use to communicate back to their creator.""" - - def __init__(self, port, identity, auth_key): - self.identity = identity - self.port = port - self.auth_key = auth_key - self.dead = False - self._sent = self._received = 0 - self._socket = None - self._read_pipe = None - self._write_pipe = None - - def close(self): - if self._socket is not None: - self._socket.close() - self._socket = None - self._read_pipe = None - self._write_pipe = None - - def _ensure_connected(self): - if self._socket is None: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setblocking(1) - try: - s.connect(("", self.port)) - except socket.error as e: - with excutils.save_and_reraise_exception(): - s.close() - if e.errno in (errno.ECONNREFUSED, errno.ENOTCONN, - errno.ECONNRESET): - # Don't bother with further connections... - self.dead = True - read_pipe = s.makefile("rb", 0) - write_pipe = s.makefile("wb", 0) - try: - msg = self._do_recv(read_pipe=read_pipe) - su.schema_validate(msg, SCHEMAS[CHALLENGE]) - if msg != CHALLENGE: - raise IOError("Challenge expected not received") - else: - pieces = _encode_message(self.auth_key, - CHALLENGE_RESPONSE, - self.identity) - self._do_send_and_ack(pieces, write_pipe=write_pipe, - read_pipe=read_pipe) - except Exception: - with excutils.save_and_reraise_exception(): - s.close() - else: - self._socket = s - self._read_pipe = read_pipe - self._write_pipe = write_pipe - - def recv(self): - self._ensure_connected() - return self._do_recv() - - def _do_recv(self, read_pipe=None): - if read_pipe is None: - read_pipe = self._read_pipe - msg_capture = collections.deque(maxlen=1) - msg_capture_func = (lambda _from_who, msg_decoder_func: - msg_capture.append(msg_decoder_func())) - reader = Reader(self.auth_key, msg_capture_func, msg_limit=1) - try: - maybe_msg_num = self._received + 1 - bytes_needed = reader.bytes_needed - while True: - blob = read_pipe.read(bytes_needed) - if len(blob) != bytes_needed: - raise EOFError("Read pipe closed while reading %s" - " bytes for potential message %s" - % (bytes_needed, maybe_msg_num)) - reader.feed(blob) - bytes_needed = reader.bytes_needed - except StopIteration: - pass - msg = msg_capture[0] - self._received += 1 - return msg - - def _do_send(self, pieces, write_pipe=None): - if write_pipe is None: - write_pipe = self._write_pipe - for piece in pieces: - write_pipe.write(piece) - write_pipe.flush() - - def _do_send_and_ack(self, pieces, write_pipe=None, read_pipe=None): - self._do_send(pieces, write_pipe=write_pipe) - self._sent += 1 - msg = self._do_recv(read_pipe=read_pipe) - su.schema_validate(msg, SCHEMAS[ACK]) - if msg != ACK: - raise IOError("Failed receiving ack for sent" - " message %s" % self._metrics['sent']) - - def send(self, message): - self._ensure_connected() - self._do_send_and_ack(_encode_message(self.auth_key, message, - self.identity)) - - -class EventSender(object): - """Sends event information from a child worker process to its creator.""" - - def __init__(self, channel): - self._channel = channel - self._pid = None - - def __call__(self, event_type, details): - if not self._channel.dead: - if self._pid is None: - self._pid = os.getpid() - message = { - 'event_type': event_type, - 'details': details, - 'sent_on': time.time(), - } - LOG.trace("Sending %s (from child %s)", message, self._pid) - self._channel.send(message) - - -class DispatcherHandler(asyncore.dispatcher): - """Dispatches from a single connection into a target.""" - - #: Read/write chunk size. - CHUNK_SIZE = 8192 - - def __init__(self, sock, addr, dispatcher): - super(DispatcherHandler, self).__init__(map=dispatcher.map, - sock=sock) - self.blobs_to_write = list(dispatcher.challenge_pieces) - self.reader = Reader(dispatcher.auth_key, self._dispatch) - self.targets = dispatcher.targets - self.tied_to = None - self.challenge_responded = False - self.ack_pieces = _encode_message(dispatcher.auth_key, ACK, - dispatcher.identity, - reverse=True) - self.addr = addr - - def handle_close(self): - self.close() - - def writable(self): - return bool(self.blobs_to_write) - - def handle_write(self): - try: - blob = self.blobs_to_write.pop() - except IndexError: - pass - else: - sent = self.send(blob[0:self.CHUNK_SIZE]) - if sent < len(blob): - self.blobs_to_write.append(blob[sent:]) - - def _send_ack(self): - self.blobs_to_write.extend(self.ack_pieces) - - def _dispatch(self, from_who, msg_decoder_func): - if not self.challenge_responded: - msg = msg_decoder_func() - su.schema_validate(msg, SCHEMAS[CHALLENGE_RESPONSE]) - if msg != CHALLENGE_RESPONSE: - raise ChallengeIgnored("Discarding connection from %s" - " challenge was not responded to" - % self.addr) - else: - LOG.trace("Peer %s (%s) has passed challenge sequence", - self.addr, from_who) - self.challenge_responded = True - self.tied_to = from_who - self._send_ack() - else: - if self.tied_to != from_who: - raise UnknownSender("Sender %s previously identified as %s" - " changed there identity to %s after" - " challenge sequence" % (self.addr, - self.tied_to, - from_who)) - try: - task = self.targets[from_who] - except KeyError: - raise UnknownSender("Unknown message from %s (%s) not matched" - " to any known target" % (self.addr, - from_who)) - msg = msg_decoder_func() - su.schema_validate(msg, SCHEMAS[EVENT]) - if LOG.isEnabledFor(logging.TRACE): - msg_delay = max(0, time.time() - msg['sent_on']) - LOG.trace("Dispatching message from %s (%s) (it took %0.3f" - " seconds for it to arrive for processing after" - " being sent)", self.addr, from_who, msg_delay) - task.notifier.notify(msg['event_type'], msg.get('details')) - self._send_ack() - - def handle_read(self): - data = self.recv(self.CHUNK_SIZE) - if len(data) == 0: - self.handle_close() - else: - try: - self.reader.feed(data) - except (IOError, UnknownSender): - LOG.warning("Invalid received message", exc_info=True) - self.handle_close() - except (pickle.PickleError, TypeError): - LOG.warning("Badly formatted message", exc_info=True) - self.handle_close() - except (ValueError, su.ValidationError): - LOG.warning("Failed validating message", exc_info=True) - self.handle_close() - except ChallengeIgnored: - LOG.warning("Failed challenge sequence", exc_info=True) - self.handle_close() - - -class Dispatcher(asyncore.dispatcher): - """Accepts messages received from child worker processes.""" - - #: See https://docs.python.org/2/library/socket.html#socket.socket.listen - MAX_BACKLOG = 5 - - def __init__(self, map, auth_key, identity): - super(Dispatcher, self).__init__(map=map) - self.identity = identity - self.challenge_pieces = _encode_message(auth_key, CHALLENGE, - identity, reverse=True) - self.auth_key = auth_key - self.targets = {} - - @property - def port(self): - if self.socket is not None: - return self.socket.getsockname()[1] - else: - return None - - def setup(self): - self.targets.clear() - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.bind(("", 0)) - LOG.trace("Accepting dispatch requests on port %s", self.port) - self.listen(self.MAX_BACKLOG) - - def writable(self): - return False - - @property - def map(self): - return self._map - - def handle_close(self): - if self.socket is not None: - self.close() - - def handle_accept(self): - pair = self.accept() - if pair is not None: - sock, addr = pair - addr = "%s:%s" % (addr[0], addr[1]) - LOG.trace("Potentially accepted new connection from %s", addr) - DispatcherHandler(sock, addr, self) - - -class ParallelProcessTaskExecutor(base.ParallelTaskExecutor): - """Executes tasks in parallel using a process pool executor. - - NOTE(harlowja): this executor executes tasks in external processes, so that - implies that tasks that are sent to that external process are pickleable - since this is how the multiprocessing works (sending pickled objects back - and forth) and that the bound handlers (for progress updating in - particular) are proxied correctly from that external process to the one - that is alive in the parent process to ensure that callbacks registered in - the parent are executed on events in the child. - """ - - #: Default timeout used by asyncore io loop (and eventually select/poll). - WAIT_TIMEOUT = 0.01 - - constructor_options = [ - ('max_workers', lambda v: v if v is None else int(v)), - ('wait_timeout', lambda v: v if v is None else float(v)), - ] - """ - Optional constructor keyword arguments this executor supports. These will - typically be passed via engine options (by a engine user) and converted - into the correct type before being sent into this - classes ``__init__`` method. - """ - - def __init__(self, executor=None, - max_workers=None, wait_timeout=None): - super(ParallelProcessTaskExecutor, self).__init__( - executor=executor, max_workers=max_workers) - LOG.warning('Process task executor is deprecated. It is now disabled ' - 'in Python 3.12 or later and will be removed.') - self._auth_key = _create_random_string(32) - self._dispatcher = Dispatcher({}, self._auth_key, - _create_random_string(32)) - if wait_timeout is None: - self._wait_timeout = self.WAIT_TIMEOUT - else: - if wait_timeout <= 0: - raise ValueError("Provided wait timeout must be greater" - " than zero and not '%s'" % wait_timeout) - self._wait_timeout = wait_timeout - # Only created after starting... - self._worker = None - - def _create_executor(self, max_workers=None): - return futurist.ProcessPoolExecutor(max_workers=max_workers) - - def start(self): - if threading_utils.is_alive(self._worker): - raise RuntimeError("Worker thread must be stopped via stop()" - " before starting/restarting") - super(ParallelProcessTaskExecutor, self).start() - self._dispatcher.setup() - self._worker = threading_utils.daemon_thread( - asyncore.loop, map=self._dispatcher.map, - timeout=self._wait_timeout) - self._worker.start() - - def stop(self): - super(ParallelProcessTaskExecutor, self).stop() - self._dispatcher.close() - if threading_utils.is_alive(self._worker): - self._worker.join() - self._worker = None - - def _submit_task(self, func, task, *args, **kwargs): - """Submit a function to run the given task (with given args/kwargs). - - NOTE(harlowja): Adjust all events to be proxies instead since we want - those callbacks to be activated in this process, not in the child, - also since typically callbacks are functors (or callables) we can - not pickle those in the first place... - - To make sure people understand how this works, the following is a - lengthy description of what is going on here, read at will: - - So to ensure that we are proxying task triggered events that occur - in the executed subprocess (which will be created and used by the - thing using the multiprocessing based executor) we need to establish - a link between that process and this process that ensures that when a - event is triggered in that task in that process that a corresponding - event is triggered on the original task that was requested to be ran - in this process. - - To accomplish this we have to create a copy of the task (without - any listeners) and then reattach a new set of listeners that will - now instead of calling the desired listeners just place messages - for this process (a dispatcher thread that is created in this class) - to dispatch to the original task (using a common accepting socket and - per task sender socket that is used and associated to know - which task to proxy back too, since it is possible that there many - be *many* subprocess running at the same time). - - Once the subprocess task has finished execution, the executor will - then trigger a callback that will remove the task + target from the - dispatcher (which will stop any further proxying back to the original - task). - """ - progress_callback = kwargs.pop('progress_callback', None) - clone = task.copy(retain_listeners=False) - identity = _create_random_string(32) - channel = Channel(self._dispatcher.port, identity, self._auth_key) - - def rebind_task(): - # Creates and binds proxies for all events the task could receive - # so that when the clone runs in another process that this task - # can receive the same notifications (thus making it look like the - # the notifications are transparently happening in this process). - proxy_event_types = set() - for (event_type, listeners) in task.notifier.listeners_iter(): - if listeners: - proxy_event_types.add(event_type) - if progress_callback is not None: - proxy_event_types.add(ta.EVENT_UPDATE_PROGRESS) - if nt.Notifier.ANY in proxy_event_types: - # NOTE(harlowja): If ANY is present, just have it be - # the **only** event registered, as all other events will be - # sent if ANY is registered (due to the nature of ANY sending - # all the things); if we also include the other event types - # in this set if ANY is present we will receive duplicate - # messages in this process (the one where the local - # task callbacks are being triggered). For example the - # emissions of the tasks notifier (that is running out - # of process) will for specific events send messages for - # its ANY event type **and** the specific event - # type (2 messages, when we just want one) which will - # cause > 1 notify() call on the local tasks notifier, which - # causes more local callback triggering than we want - # to actually happen. - proxy_event_types = set([nt.Notifier.ANY]) - if proxy_event_types: - # This sender acts as our forwarding proxy target, it - # will be sent pickled to the process that will execute - # the needed task and it will do the work of using the - # channel object to send back messages to this process for - # dispatch into the local task. - sender = EventSender(channel) - for event_type in proxy_event_types: - clone.notifier.register(event_type, sender) - return bool(proxy_event_types) - - def register(): - if progress_callback is not None: - task.notifier.register(ta.EVENT_UPDATE_PROGRESS, - progress_callback) - self._dispatcher.targets[identity] = task - - def deregister(fut=None): - if progress_callback is not None: - task.notifier.deregister(ta.EVENT_UPDATE_PROGRESS, - progress_callback) - self._dispatcher.targets.pop(identity, None) - - should_register = rebind_task() - if should_register: - register() - try: - fut = self._executor.submit(func, clone, *args, **kwargs) - except RuntimeError: - with excutils.save_and_reraise_exception(): - if should_register: - deregister() - - fut.atom = task - if should_register: - fut.add_done_callback(deregister) - return fut diff --git a/taskflow/tests/unit/action_engine/test_creation.py b/taskflow/tests/unit/action_engine/test_creation.py index e9c1d24bc..a7d7f6898 100644 --- a/taskflow/tests/unit/action_engine/test_creation.py +++ b/taskflow/tests/unit/action_engine/test_creation.py @@ -26,11 +26,6 @@ from taskflow.tests import utils from taskflow.utils import eventlet_utils as eu from taskflow.utils import persistence_utils as pu -try: - from taskflow.engines.action_engine import process_executor as pe -except ImportError: - pe = None - class ParallelCreationTest(test.TestCase): @staticmethod @@ -48,26 +43,12 @@ class ParallelCreationTest(test.TestCase): self.assertIsInstance(eng._task_executor, executor.ParallelThreadTaskExecutor) - @testtools.skipIf(pe is None, 'process_executor is not available') - def test_process_string_creation(self): - for s in ['process', 'processes']: - eng = self._create_engine(executor=s) - self.assertIsInstance(eng._task_executor, - pe.ParallelProcessTaskExecutor) - def test_thread_executor_creation(self): with futurist.ThreadPoolExecutor(1) as e: eng = self._create_engine(executor=e) self.assertIsInstance(eng._task_executor, executor.ParallelThreadTaskExecutor) - @testtools.skipIf(pe is None, 'process_executor is not available') - def test_process_executor_creation(self): - with futurist.ProcessPoolExecutor(1) as e: - eng = self._create_engine(executor=e) - self.assertIsInstance(eng._task_executor, - pe.ParallelProcessTaskExecutor) - @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') def test_green_executor_creation(self): with futurist.GreenThreadPoolExecutor(1) as e: diff --git a/taskflow/tests/unit/action_engine/test_process_executor.py b/taskflow/tests/unit/action_engine/test_process_executor.py deleted file mode 100644 index f882ab938..000000000 --- a/taskflow/tests/unit/action_engine/test_process_executor.py +++ /dev/null @@ -1,106 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import errno -import socket -import threading - -import testtools - -from taskflow import task -from taskflow import test -from taskflow.test import mock -from taskflow.tests import utils as test_utils - -try: - import asyncore - from taskflow.engines.action_engine import process_executor as pe -except ImportError: - asyncore = None - pe = None - - -@testtools.skipIf(asyncore is None, 'process_executor is not available') -class ProcessExecutorHelpersTest(test.TestCase): - def test_reader(self): - capture_buf = [] - - def do_capture(identity, message_capture_func): - capture_buf.append(message_capture_func()) - - r = pe.Reader(b"secret", do_capture) - for data in pe._encode_message(b"secret", ['hi'], b'me'): - self.assertEqual(len(data), r.bytes_needed) - r.feed(data) - - self.assertEqual(1, len(capture_buf)) - self.assertEqual(['hi'], capture_buf[0]) - - def test_bad_hmac_reader(self): - r = pe.Reader(b"secret-2", lambda ident, capture_func: capture_func()) - in_data = b"".join(pe._encode_message(b"secret", ['hi'], b'me')) - self.assertRaises(pe.BadHmacValueError, r.feed, in_data) - - @mock.patch("socket.socket") - def test_no_connect_channel(self, mock_socket_factory): - mock_sock = mock.MagicMock() - mock_socket_factory.return_value = mock_sock - mock_sock.connect.side_effect = socket.error(errno.ECONNREFUSED, - 'broken') - c = pe.Channel(2222, b"me", b"secret") - self.assertRaises(socket.error, c.send, "hi") - self.assertTrue(c.dead) - self.assertTrue(mock_sock.close.called) - - def test_send_and_dispatch(self): - details_capture = [] - - t = test_utils.DummyTask("rcver") - t.notifier.register( - task.EVENT_UPDATE_PROGRESS, - lambda _event_type, details: details_capture.append(details)) - - d = pe.Dispatcher({}, b'secret', b'server-josh') - d.setup() - d.targets[b'child-josh'] = t - - s = threading.Thread(target=asyncore.loop, kwargs={'map': d.map}) - s.start() - self.addCleanup(s.join) - - c = pe.Channel(d.port, b'child-josh', b'secret') - self.addCleanup(c.close) - - send_what = [ - {'progress': 0.1}, - {'progress': 0.2}, - {'progress': 0.3}, - {'progress': 0.4}, - {'progress': 0.5}, - {'progress': 0.6}, - {'progress': 0.7}, - {'progress': 0.8}, - {'progress': 0.9}, - ] - e_s = pe.EventSender(c) - for details in send_what: - e_s(task.EVENT_UPDATE_PROGRESS, details) - - # This forces the thread to shutdown (since the asyncore loop - # will exit when no more sockets exist to process...) - d.close() - - self.assertEqual(len(send_what), len(details_capture)) - self.assertEqual(send_what, details_capture) diff --git a/taskflow/tests/unit/test_arguments_passing.py b/taskflow/tests/unit/test_arguments_passing.py index 19260131d..697408ab3 100644 --- a/taskflow/tests/unit/test_arguments_passing.py +++ b/taskflow/tests/unit/test_arguments_passing.py @@ -23,11 +23,6 @@ from taskflow import test from taskflow.tests import utils from taskflow.utils import eventlet_utils as eu -try: - from taskflow.engines.action_engine import process_executor as pe -except ImportError: - pe = None - class ArgumentsPassingTest(utils.EngineTestBase): @@ -224,18 +219,3 @@ class ParallelEngineWithEventletTest(ArgumentsPassingTest, test.TestCase): backend=self.backend, engine='parallel', executor=executor) - - -@testtools.skipIf(pe is None, 'process_executor is not available') -class ParallelEngineWithProcessTest(ArgumentsPassingTest, test.TestCase): - _EXECUTOR_WORKERS = 2 - - def _make_engine(self, flow, flow_detail=None, executor=None): - if executor is None: - executor = 'processes' - return taskflow.engines.load(flow, - flow_detail=flow_detail, - backend=self.backend, - engine='parallel', - executor=executor, - max_workers=self._EXECUTOR_WORKERS) diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index a8d877893..b4bb7d620 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -41,11 +41,6 @@ from taskflow.utils import eventlet_utils as eu from taskflow.utils import persistence_utils as p_utils from taskflow.utils import threading_utils as tu -try: - from taskflow.engines.action_engine import process_executor as pe -except ImportError: - pe = None - # Expected engine transitions when empty workflows are ran... _EMPTY_TRANSITIONS = [ @@ -1499,82 +1494,6 @@ class ParallelEngineWithEventletTest(EngineTaskTest, store=store, **kwargs) -@testtools.skipIf(pe is None, 'process_executor is not available') -class ParallelEngineWithProcessTest(EngineTaskTest, - EngineMultipleResultsTest, - EngineLinearFlowTest, - EngineParallelFlowTest, - EngineLinearAndUnorderedExceptionsTest, - EngineOptionalRequirementsTest, - EngineGraphFlowTest, - EngineResetTests, - EngineMissingDepsTest, - EngineGraphConditionalFlowTest, - EngineDeciderDepthTest, - EngineTaskNotificationsTest, - test.TestCase): - _EXECUTOR_WORKERS = 2 - - def test_correct_load(self): - engine = self._make_engine(utils.TaskNoRequiresNoReturns) - self.assertIsInstance(engine, eng.ParallelActionEngine) - - def _make_engine(self, flow, - flow_detail=None, executor=None, store=None, - **kwargs): - if executor is None: - executor = 'processes' - return taskflow.engines.load(flow, flow_detail=flow_detail, - backend=self.backend, - engine='parallel', - executor=executor, - store=store, - max_workers=self._EXECUTOR_WORKERS, - **kwargs) - - def test_update_progress_notifications_proxied(self): - captured = collections.defaultdict(list) - - def notify_me(event_type, details): - captured[event_type].append(details) - - a = utils.MultiProgressingTask('a') - a.notifier.register(a.notifier.ANY, notify_me) - progress_chunks = list(x / 10.0 for x in range(1, 10)) - e = self._make_engine(a, store={'progress_chunks': progress_chunks}) - e.run() - - self.assertEqual(11, len(captured[task.EVENT_UPDATE_PROGRESS])) - - def test_custom_notifications_proxied(self): - captured = collections.defaultdict(list) - - def notify_me(event_type, details): - captured[event_type].append(details) - - a = utils.EmittingTask('a') - a.notifier.register(a.notifier.ANY, notify_me) - e = self._make_engine(a) - e.run() - - self.assertEqual(1, len(captured['hi'])) - self.assertEqual(2, len(captured[task.EVENT_UPDATE_PROGRESS])) - - def test_just_custom_notifications_proxied(self): - captured = collections.defaultdict(list) - - def notify_me(event_type, details): - captured[event_type].append(details) - - a = utils.EmittingTask('a') - a.notifier.register('hi', notify_me) - e = self._make_engine(a) - e.run() - - self.assertEqual(1, len(captured['hi'])) - self.assertEqual(0, len(captured[task.EVENT_UPDATE_PROGRESS])) - - class WorkerBasedEngineTest(EngineTaskTest, EngineMultipleResultsTest, EngineLinearFlowTest, diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index a8e56a050..a8814a63c 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -30,11 +30,6 @@ from taskflow.tests import utils from taskflow.types import failure from taskflow.utils import eventlet_utils as eu -try: - from taskflow.engines.action_engine import process_executor as pe -except ImportError: - pe = None - class FailingRetry(retry.Retry): @@ -1368,20 +1363,3 @@ class ParallelEngineWithEventletTest(RetryTest, test.TestCase): engine='parallel', executor=executor, defer_reverts=defer_reverts) - - -@testtools.skipIf(pe is None, 'process_executor is not available') -class ParallelEngineWithProcessTest(RetryTest, test.TestCase): - _EXECUTOR_WORKERS = 2 - - def _make_engine(self, flow, defer_reverts=None, flow_detail=None, - executor=None): - if executor is None: - executor = 'processes' - return taskflow.engines.load(flow, - flow_detail=flow_detail, - engine='parallel', - backend=self.backend, - executor=executor, - max_workers=self._EXECUTOR_WORKERS, - defer_reverts=defer_reverts) diff --git a/taskflow/tests/unit/test_suspend.py b/taskflow/tests/unit/test_suspend.py index c095e3834..1c1c1d7ef 100644 --- a/taskflow/tests/unit/test_suspend.py +++ b/taskflow/tests/unit/test_suspend.py @@ -25,11 +25,6 @@ from taskflow import test from taskflow.tests import utils from taskflow.utils import eventlet_utils as eu -try: - from taskflow.engines.action_engine import process_executor as pe -except ImportError: - pe = None - class SuspendingListener(utils.CaptureListener): @@ -227,17 +222,3 @@ class ParallelEngineWithEventletTest(SuspendTest, test.TestCase): return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', executor=executor) - - -@testtools.skipIf(pe is None, 'process_executor is not available') -class ParallelEngineWithProcessTest(SuspendTest, test.TestCase): - _EXECUTOR_WORKERS = 2 - - def _make_engine(self, flow, flow_detail=None, executor=None): - if executor is None: - executor = 'processes' - return taskflow.engines.load(flow, flow_detail=flow_detail, - engine='parallel', - backend=self.backend, - executor=executor, - max_workers=self._EXECUTOR_WORKERS)