From f5e16cb441e7b0db10d65009e49a3ff389114b02 Mon Sep 17 00:00:00 2001 From: Cyril Roelandt Date: Thu, 18 Apr 2024 01:48:58 +0200 Subject: [PATCH] Python3.12: disable process_executor, which depends on asyncore The asyncore module has been deprecated in Python 3.6 and removed in Python 3.12. In the future, we should either port process_executor to asyncio or deprecate it, but for now we just disable it starting in Python 3.12. Change-Id: I6c69593c5ce5a62721294564917d7a75531a2dac --- taskflow/engines/action_engine/engine.py | 25 ++++++++++++++----- .../tests/unit/action_engine/test_creation.py | 10 +++++++- .../action_engine/test_process_executor.py | 12 +++++++-- taskflow/tests/unit/test_arguments_passing.py | 2 +- taskflow/tests/unit/test_engines.py | 2 +- taskflow/tests/unit/test_retries.py | 2 +- taskflow/tests/unit/test_suspend.py | 2 +- 7 files changed, 42 insertions(+), 13 deletions(-) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 4895ac3a1..e84df62d3 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -31,7 +31,11 @@ from oslo_utils import timeutils from taskflow.engines.action_engine import builder from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import executor -from taskflow.engines.action_engine import process_executor +try: + from taskflow.engines.action_engine import process_executor + process_executor_available = True +except ImportError: + process_executor_available = False from taskflow.engines.action_engine import runtime from taskflow.engines import base from taskflow import exceptions as exc @@ -559,24 +563,33 @@ String (case insensitive) Executor used _executor_cls_matchers = [ _ExecutorTypeMatch((futures.ThreadPoolExecutor,), executor.ParallelThreadTaskExecutor), - _ExecutorTypeMatch((futures.ProcessPoolExecutor,), - process_executor.ParallelProcessTaskExecutor), + ] + if process_executor_available: + _executor_cls_matchers.append( + _ExecutorTypeMatch((futures.ProcessPoolExecutor,), + process_executor.ParallelProcessTaskExecutor) + ) + _executor_cls_matchers.append( _ExecutorTypeMatch((futures.Executor,), executor.ParallelThreadTaskExecutor), - ] + ) + # One of these should match when a string/text is provided for the # 'executor' option (a mixed case equivalent is allowed since the match # will be lower-cased before checking). _executor_str_matchers = [ - _ExecutorTextMatch(frozenset(['processes', 'process']), - process_executor.ParallelProcessTaskExecutor), _ExecutorTextMatch(frozenset(['thread', 'threads', 'threaded']), executor.ParallelThreadTaskExecutor), _ExecutorTextMatch(frozenset(['greenthread', 'greenthreads', 'greenthreaded']), executor.ParallelGreenThreadTaskExecutor), ] + if process_executor_available: + _executor_str_matchers.append( + _ExecutorTextMatch(frozenset(['processes', 'process']), + process_executor.ParallelProcessTaskExecutor) + ) # Used when no executor is provided (either a string or object)... _default_executor_cls = executor.ParallelThreadTaskExecutor diff --git a/taskflow/tests/unit/action_engine/test_creation.py b/taskflow/tests/unit/action_engine/test_creation.py index 1568dfe13..16cd5fe30 100644 --- a/taskflow/tests/unit/action_engine/test_creation.py +++ b/taskflow/tests/unit/action_engine/test_creation.py @@ -19,7 +19,11 @@ import testtools from taskflow.engines.action_engine import engine from taskflow.engines.action_engine import executor -from taskflow.engines.action_engine import process_executor +try: + from taskflow.engines.action_engine import process_executor + _PROCESS_EXECUTOR_AVAILABLE = True +except ImportError: + _PROCESS_EXECUTOR_AVAILABLE = False from taskflow.patterns import linear_flow as lf from taskflow.persistence import backends from taskflow import test @@ -44,6 +48,8 @@ class ParallelCreationTest(test.TestCase): self.assertIsInstance(eng._task_executor, executor.ParallelThreadTaskExecutor) + @testtools.skipIf(not _PROCESS_EXECUTOR_AVAILABLE, + 'process_executor is not available') def test_process_string_creation(self): for s in ['process', 'processes']: eng = self._create_engine(executor=s) @@ -56,6 +62,8 @@ class ParallelCreationTest(test.TestCase): self.assertIsInstance(eng._task_executor, executor.ParallelThreadTaskExecutor) + @testtools.skipIf(not _PROCESS_EXECUTOR_AVAILABLE, + 'process_executor is not available') def test_process_executor_creation(self): with futurist.ProcessPoolExecutor(1) as e: eng = self._create_engine(executor=e) diff --git a/taskflow/tests/unit/action_engine/test_process_executor.py b/taskflow/tests/unit/action_engine/test_process_executor.py index 2bca18f3c..0c9973307 100644 --- a/taskflow/tests/unit/action_engine/test_process_executor.py +++ b/taskflow/tests/unit/action_engine/test_process_executor.py @@ -14,18 +14,26 @@ # License for the specific language governing permissions and limitations # under the License. -import asyncore +try: + import asyncore + _ASYNCORE_AVAILABLE = True +except ImportError: + _ASYNCORE_AVAILABLE = False import errno import socket import threading -from taskflow.engines.action_engine import process_executor as pu +import testtools + +if _ASYNCORE_AVAILABLE: + from taskflow.engines.action_engine import process_executor as pu from taskflow import task from taskflow import test from taskflow.test import mock from taskflow.tests import utils as test_utils +@testtools.skipIf(not _ASYNCORE_AVAILABLE, 'process_executor is not available') class ProcessExecutorHelpersTest(test.TestCase): def test_reader(self): capture_buf = [] diff --git a/taskflow/tests/unit/test_arguments_passing.py b/taskflow/tests/unit/test_arguments_passing.py index 0db930093..bd135233e 100644 --- a/taskflow/tests/unit/test_arguments_passing.py +++ b/taskflow/tests/unit/test_arguments_passing.py @@ -226,7 +226,7 @@ class ParallelEngineWithProcessTest(ArgumentsPassingTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): if executor is None: - executor = 'processes' + executor = 'greenthread' return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 445442948..1a1518907 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -1517,7 +1517,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest, flow_detail=None, executor=None, store=None, **kwargs): if executor is None: - executor = 'processes' + executor = 'greenthread' return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, engine='parallel', diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 1fb0303e3..d6169336f 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -1319,7 +1319,7 @@ class ParallelEngineWithProcessTest(RetryTest, test.TestCase): def _make_engine(self, flow, defer_reverts=None, flow_detail=None, executor=None): if executor is None: - executor = 'processes' + executor = 'greenthread' return taskflow.engines.load(flow, flow_detail=flow_detail, engine='parallel', diff --git a/taskflow/tests/unit/test_suspend.py b/taskflow/tests/unit/test_suspend.py index 3a9a8ad1d..5d43a0f75 100644 --- a/taskflow/tests/unit/test_suspend.py +++ b/taskflow/tests/unit/test_suspend.py @@ -229,7 +229,7 @@ class ParallelEngineWithProcessTest(SuspendTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): if executor is None: - executor = 'processes' + executor = 'greenthread' return taskflow.engines.load(flow, flow_detail=flow_detail, engine='parallel', backend=self.backend,