From 78d39c49aab39fc823c86ef93bc696968e12f233 Mon Sep 17 00:00:00 2001 From: Cyril Roelandt Date: Thu, 18 Apr 2024 01:48:58 +0200 Subject: [PATCH] Python3.12: disable process_executor, which depends on asyncore The asyncore module has been deprecated in Python 3.6 and removed in Python 3.12. In the future, we should either port process_executor to asyncio or deprecate it, but for now we just disable it starting in Python 3.12. Closes-Bug: #2026183 Change-Id: I6c69593c5ce5a62721294564917d7a75531a2dac --- ..._executor-python-312-d1074c816bc8303e.yaml | 6 ++++ taskflow/engines/action_engine/engine.py | 24 ++++++++++---- .../engines/action_engine/process_executor.py | 2 ++ .../tests/unit/action_engine/test_creation.py | 12 +++++-- .../action_engine/test_process_executor.py | 31 ++++++++++++------- taskflow/tests/unit/test_arguments_passing.py | 6 ++++ taskflow/tests/unit/test_engines.py | 6 ++++ taskflow/tests/unit/test_retries.py | 6 ++++ taskflow/tests/unit/test_suspend.py | 6 ++++ 9 files changed, 78 insertions(+), 21 deletions(-) create mode 100644 releasenotes/notes/disable-process_executor-python-312-d1074c816bc8303e.yaml diff --git a/releasenotes/notes/disable-process_executor-python-312-d1074c816bc8303e.yaml b/releasenotes/notes/disable-process_executor-python-312-d1074c816bc8303e.yaml new file mode 100644 index 000000000..8731a7dbc --- /dev/null +++ b/releasenotes/notes/disable-process_executor-python-312-d1074c816bc8303e.yaml @@ -0,0 +1,6 @@ +--- +deprecations: + - | + The process_executor module has been deprecated, starting with Python 3.12. + It is still available in older versions of Python. There is no replacement + for it. diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 4895ac3a1..b45e02a20 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -31,7 +31,6 @@ from oslo_utils import timeutils from taskflow.engines.action_engine import builder from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import executor -from taskflow.engines.action_engine import process_executor from taskflow.engines.action_engine import runtime from taskflow.engines import base from taskflow import exceptions as exc @@ -41,6 +40,11 @@ from taskflow import storage from taskflow.types import failure from taskflow.utils import misc +try: + from taskflow.engines.action_engine import process_executor +except ImportError: + process_executor = None + LOG = logging.getLogger(__name__) @@ -559,24 +563,32 @@ String (case insensitive) Executor used _executor_cls_matchers = [ _ExecutorTypeMatch((futures.ThreadPoolExecutor,), executor.ParallelThreadTaskExecutor), - _ExecutorTypeMatch((futures.ProcessPoolExecutor,), - process_executor.ParallelProcessTaskExecutor), + ] + if process_executor is not None: + _executor_cls_matchers.append( + _ExecutorTypeMatch((futures.ProcessPoolExecutor,), + process_executor.ParallelProcessTaskExecutor) + ) + _executor_cls_matchers.append( _ExecutorTypeMatch((futures.Executor,), executor.ParallelThreadTaskExecutor), - ] + ) # One of these should match when a string/text is provided for the # 'executor' option (a mixed case equivalent is allowed since the match # will be lower-cased before checking). _executor_str_matchers = [ - _ExecutorTextMatch(frozenset(['processes', 'process']), - process_executor.ParallelProcessTaskExecutor), _ExecutorTextMatch(frozenset(['thread', 'threads', 'threaded']), executor.ParallelThreadTaskExecutor), _ExecutorTextMatch(frozenset(['greenthread', 'greenthreads', 'greenthreaded']), executor.ParallelGreenThreadTaskExecutor), ] + if process_executor is not None: + _executor_str_matchers.append( + _ExecutorTextMatch(frozenset(['processes', 'process']), + process_executor.ParallelProcessTaskExecutor) + ) # Used when no executor is provided (either a string or object)... _default_executor_cls = executor.ParallelThreadTaskExecutor diff --git a/taskflow/engines/action_engine/process_executor.py b/taskflow/engines/action_engine/process_executor.py index d20f7a3f6..1d2dc37ec 100644 --- a/taskflow/engines/action_engine/process_executor.py +++ b/taskflow/engines/action_engine/process_executor.py @@ -580,6 +580,8 @@ class ParallelProcessTaskExecutor(base.ParallelTaskExecutor): max_workers=None, wait_timeout=None): super(ParallelProcessTaskExecutor, self).__init__( executor=executor, max_workers=max_workers) + LOG.warning('Process task executor is deprecated. It is now disabled ' + 'in Python 3.12 or later and will be removed.') self._auth_key = _create_random_string(32) self._dispatcher = Dispatcher({}, self._auth_key, _create_random_string(32)) diff --git a/taskflow/tests/unit/action_engine/test_creation.py b/taskflow/tests/unit/action_engine/test_creation.py index 1568dfe13..e9c1d24bc 100644 --- a/taskflow/tests/unit/action_engine/test_creation.py +++ b/taskflow/tests/unit/action_engine/test_creation.py @@ -19,7 +19,6 @@ import testtools from taskflow.engines.action_engine import engine from taskflow.engines.action_engine import executor -from taskflow.engines.action_engine import process_executor from taskflow.patterns import linear_flow as lf from taskflow.persistence import backends from taskflow import test @@ -27,6 +26,11 @@ from taskflow.tests import utils from taskflow.utils import eventlet_utils as eu from taskflow.utils import persistence_utils as pu +try: + from taskflow.engines.action_engine import process_executor as pe +except ImportError: + pe = None + class ParallelCreationTest(test.TestCase): @staticmethod @@ -44,11 +48,12 @@ class ParallelCreationTest(test.TestCase): self.assertIsInstance(eng._task_executor, executor.ParallelThreadTaskExecutor) + @testtools.skipIf(pe is None, 'process_executor is not available') def test_process_string_creation(self): for s in ['process', 'processes']: eng = self._create_engine(executor=s) self.assertIsInstance(eng._task_executor, - process_executor.ParallelProcessTaskExecutor) + pe.ParallelProcessTaskExecutor) def test_thread_executor_creation(self): with futurist.ThreadPoolExecutor(1) as e: @@ -56,11 +61,12 @@ class ParallelCreationTest(test.TestCase): self.assertIsInstance(eng._task_executor, executor.ParallelThreadTaskExecutor) + @testtools.skipIf(pe is None, 'process_executor is not available') def test_process_executor_creation(self): with futurist.ProcessPoolExecutor(1) as e: eng = self._create_engine(executor=e) self.assertIsInstance(eng._task_executor, - process_executor.ParallelProcessTaskExecutor) + pe.ParallelProcessTaskExecutor) @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') def test_green_executor_creation(self): diff --git a/taskflow/tests/unit/action_engine/test_process_executor.py b/taskflow/tests/unit/action_engine/test_process_executor.py index 2bca18f3c..f882ab938 100644 --- a/taskflow/tests/unit/action_engine/test_process_executor.py +++ b/taskflow/tests/unit/action_engine/test_process_executor.py @@ -13,19 +13,26 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -import asyncore import errno import socket import threading -from taskflow.engines.action_engine import process_executor as pu +import testtools + from taskflow import task from taskflow import test from taskflow.test import mock from taskflow.tests import utils as test_utils +try: + import asyncore + from taskflow.engines.action_engine import process_executor as pe +except ImportError: + asyncore = None + pe = None + +@testtools.skipIf(asyncore is None, 'process_executor is not available') class ProcessExecutorHelpersTest(test.TestCase): def test_reader(self): capture_buf = [] @@ -33,8 +40,8 @@ class ProcessExecutorHelpersTest(test.TestCase): def do_capture(identity, message_capture_func): capture_buf.append(message_capture_func()) - r = pu.Reader(b"secret", do_capture) - for data in pu._encode_message(b"secret", ['hi'], b'me'): + r = pe.Reader(b"secret", do_capture) + for data in pe._encode_message(b"secret", ['hi'], b'me'): self.assertEqual(len(data), r.bytes_needed) r.feed(data) @@ -42,9 +49,9 @@ class ProcessExecutorHelpersTest(test.TestCase): self.assertEqual(['hi'], capture_buf[0]) def test_bad_hmac_reader(self): - r = pu.Reader(b"secret-2", lambda ident, capture_func: capture_func()) - in_data = b"".join(pu._encode_message(b"secret", ['hi'], b'me')) - self.assertRaises(pu.BadHmacValueError, r.feed, in_data) + r = pe.Reader(b"secret-2", lambda ident, capture_func: capture_func()) + in_data = b"".join(pe._encode_message(b"secret", ['hi'], b'me')) + self.assertRaises(pe.BadHmacValueError, r.feed, in_data) @mock.patch("socket.socket") def test_no_connect_channel(self, mock_socket_factory): @@ -52,7 +59,7 @@ class ProcessExecutorHelpersTest(test.TestCase): mock_socket_factory.return_value = mock_sock mock_sock.connect.side_effect = socket.error(errno.ECONNREFUSED, 'broken') - c = pu.Channel(2222, b"me", b"secret") + c = pe.Channel(2222, b"me", b"secret") self.assertRaises(socket.error, c.send, "hi") self.assertTrue(c.dead) self.assertTrue(mock_sock.close.called) @@ -65,7 +72,7 @@ class ProcessExecutorHelpersTest(test.TestCase): task.EVENT_UPDATE_PROGRESS, lambda _event_type, details: details_capture.append(details)) - d = pu.Dispatcher({}, b'secret', b'server-josh') + d = pe.Dispatcher({}, b'secret', b'server-josh') d.setup() d.targets[b'child-josh'] = t @@ -73,7 +80,7 @@ class ProcessExecutorHelpersTest(test.TestCase): s.start() self.addCleanup(s.join) - c = pu.Channel(d.port, b'child-josh', b'secret') + c = pe.Channel(d.port, b'child-josh', b'secret') self.addCleanup(c.close) send_what = [ @@ -87,7 +94,7 @@ class ProcessExecutorHelpersTest(test.TestCase): {'progress': 0.8}, {'progress': 0.9}, ] - e_s = pu.EventSender(c) + e_s = pe.EventSender(c) for details in send_what: e_s(task.EVENT_UPDATE_PROGRESS, details) diff --git a/taskflow/tests/unit/test_arguments_passing.py b/taskflow/tests/unit/test_arguments_passing.py index 0db930093..19260131d 100644 --- a/taskflow/tests/unit/test_arguments_passing.py +++ b/taskflow/tests/unit/test_arguments_passing.py @@ -23,6 +23,11 @@ from taskflow import test from taskflow.tests import utils from taskflow.utils import eventlet_utils as eu +try: + from taskflow.engines.action_engine import process_executor as pe +except ImportError: + pe = None + class ArgumentsPassingTest(utils.EngineTestBase): @@ -221,6 +226,7 @@ class ParallelEngineWithEventletTest(ArgumentsPassingTest, test.TestCase): executor=executor) +@testtools.skipIf(pe is None, 'process_executor is not available') class ParallelEngineWithProcessTest(ArgumentsPassingTest, test.TestCase): _EXECUTOR_WORKERS = 2 diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 445442948..a8d877893 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -41,6 +41,11 @@ from taskflow.utils import eventlet_utils as eu from taskflow.utils import persistence_utils as p_utils from taskflow.utils import threading_utils as tu +try: + from taskflow.engines.action_engine import process_executor as pe +except ImportError: + pe = None + # Expected engine transitions when empty workflows are ran... _EMPTY_TRANSITIONS = [ @@ -1494,6 +1499,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest, store=store, **kwargs) +@testtools.skipIf(pe is None, 'process_executor is not available') class ParallelEngineWithProcessTest(EngineTaskTest, EngineMultipleResultsTest, EngineLinearFlowTest, diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 1fb0303e3..177a49c07 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -28,6 +28,11 @@ from taskflow.tests import utils from taskflow.types import failure from taskflow.utils import eventlet_utils as eu +try: + from taskflow.engines.action_engine import process_executor as pe +except ImportError: + pe = None + class FailingRetry(retry.Retry): @@ -1313,6 +1318,7 @@ class ParallelEngineWithEventletTest(RetryTest, test.TestCase): defer_reverts=defer_reverts) +@testtools.skipIf(pe is None, 'process_executor is not available') class ParallelEngineWithProcessTest(RetryTest, test.TestCase): _EXECUTOR_WORKERS = 2 diff --git a/taskflow/tests/unit/test_suspend.py b/taskflow/tests/unit/test_suspend.py index 3a9a8ad1d..c095e3834 100644 --- a/taskflow/tests/unit/test_suspend.py +++ b/taskflow/tests/unit/test_suspend.py @@ -25,6 +25,11 @@ from taskflow import test from taskflow.tests import utils from taskflow.utils import eventlet_utils as eu +try: + from taskflow.engines.action_engine import process_executor as pe +except ImportError: + pe = None + class SuspendingListener(utils.CaptureListener): @@ -224,6 +229,7 @@ class ParallelEngineWithEventletTest(SuspendTest, test.TestCase): executor=executor) +@testtools.skipIf(pe is None, 'process_executor is not available') class ParallelEngineWithProcessTest(SuspendTest, test.TestCase): _EXECUTOR_WORKERS = 2