Python3.12: disable process_executor, which depends on asyncore

The asyncore module has been deprecated in Python 3.6 and removed in
Python 3.12. In the future, we should either port process_executor to
asyncio or deprecate it, but for now we just disable it starting in
Python 3.12.

Change-Id: I6c69593c5ce5a62721294564917d7a75531a2dac
This commit is contained in:
Cyril Roelandt 2024-04-18 01:48:58 +02:00
parent 3ca2d4fdc8
commit f5e16cb441
7 changed files with 42 additions and 13 deletions

View File

@@ -31,7 +31,11 @@ from oslo_utils import timeutils
from taskflow.engines.action_engine import builder from taskflow.engines.action_engine import builder
from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import executor from taskflow.engines.action_engine import executor
from taskflow.engines.action_engine import process_executor try:
from taskflow.engines.action_engine import process_executor
process_executor_available = True
except ImportError:
process_executor_available = False
from taskflow.engines.action_engine import runtime from taskflow.engines.action_engine import runtime
from taskflow.engines import base from taskflow.engines import base
from taskflow import exceptions as exc from taskflow import exceptions as exc
@@ -559,24 +563,33 @@ String (case insensitive) Executor used
_executor_cls_matchers = [ _executor_cls_matchers = [
_ExecutorTypeMatch((futures.ThreadPoolExecutor,), _ExecutorTypeMatch((futures.ThreadPoolExecutor,),
executor.ParallelThreadTaskExecutor), executor.ParallelThreadTaskExecutor),
_ExecutorTypeMatch((futures.ProcessPoolExecutor,), ]
process_executor.ParallelProcessTaskExecutor), if process_executor_available:
_executor_cls_matchers.append(
_ExecutorTypeMatch((futures.ProcessPoolExecutor,),
process_executor.ParallelProcessTaskExecutor)
)
_executor_cls_matchers.append(
_ExecutorTypeMatch((futures.Executor,), _ExecutorTypeMatch((futures.Executor,),
executor.ParallelThreadTaskExecutor), executor.ParallelThreadTaskExecutor),
] )
# One of these should match when a string/text is provided for the # One of these should match when a string/text is provided for the
# 'executor' option (a mixed case equivalent is allowed since the match # 'executor' option (a mixed case equivalent is allowed since the match
# will be lower-cased before checking). # will be lower-cased before checking).
_executor_str_matchers = [ _executor_str_matchers = [
_ExecutorTextMatch(frozenset(['processes', 'process']),
process_executor.ParallelProcessTaskExecutor),
_ExecutorTextMatch(frozenset(['thread', 'threads', 'threaded']), _ExecutorTextMatch(frozenset(['thread', 'threads', 'threaded']),
executor.ParallelThreadTaskExecutor), executor.ParallelThreadTaskExecutor),
_ExecutorTextMatch(frozenset(['greenthread', 'greenthreads', _ExecutorTextMatch(frozenset(['greenthread', 'greenthreads',
'greenthreaded']), 'greenthreaded']),
executor.ParallelGreenThreadTaskExecutor), executor.ParallelGreenThreadTaskExecutor),
] ]
if process_executor_available:
_executor_str_matchers.append(
_ExecutorTextMatch(frozenset(['processes', 'process']),
process_executor.ParallelProcessTaskExecutor)
)
# Used when no executor is provided (either a string or object)... # Used when no executor is provided (either a string or object)...
_default_executor_cls = executor.ParallelThreadTaskExecutor _default_executor_cls = executor.ParallelThreadTaskExecutor

View File

@@ -19,7 +19,11 @@ import testtools
from taskflow.engines.action_engine import engine from taskflow.engines.action_engine import engine
from taskflow.engines.action_engine import executor from taskflow.engines.action_engine import executor
from taskflow.engines.action_engine import process_executor try:
from taskflow.engines.action_engine import process_executor
_PROCESS_EXECUTOR_AVAILABLE = True
except ImportError:
_PROCESS_EXECUTOR_AVAILABLE = False
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
from taskflow.persistence import backends from taskflow.persistence import backends
from taskflow import test from taskflow import test
@@ -44,6 +48,8 @@ class ParallelCreationTest(test.TestCase):
self.assertIsInstance(eng._task_executor, self.assertIsInstance(eng._task_executor,
executor.ParallelThreadTaskExecutor) executor.ParallelThreadTaskExecutor)
@testtools.skipIf(not _PROCESS_EXECUTOR_AVAILABLE,
'process_executor is not available')
def test_process_string_creation(self): def test_process_string_creation(self):
for s in ['process', 'processes']: for s in ['process', 'processes']:
eng = self._create_engine(executor=s) eng = self._create_engine(executor=s)
@@ -56,6 +62,8 @@ class ParallelCreationTest(test.TestCase):
self.assertIsInstance(eng._task_executor, self.assertIsInstance(eng._task_executor,
executor.ParallelThreadTaskExecutor) executor.ParallelThreadTaskExecutor)
@testtools.skipIf(not _PROCESS_EXECUTOR_AVAILABLE,
'process_executor is not available')
def test_process_executor_creation(self): def test_process_executor_creation(self):
with futurist.ProcessPoolExecutor(1) as e: with futurist.ProcessPoolExecutor(1) as e:
eng = self._create_engine(executor=e) eng = self._create_engine(executor=e)

View File

@@ -14,18 +14,26 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import asyncore try:
import asyncore
_ASYNCORE_AVAILABLE = True
except ImportError:
_ASYNCORE_AVAILABLE = False
import errno import errno
import socket import socket
import threading import threading
from taskflow.engines.action_engine import process_executor as pu import testtools
if _ASYNCORE_AVAILABLE:
from taskflow.engines.action_engine import process_executor as pu
from taskflow import task from taskflow import task
from taskflow import test from taskflow import test
from taskflow.test import mock from taskflow.test import mock
from taskflow.tests import utils as test_utils from taskflow.tests import utils as test_utils
@testtools.skipIf(not _ASYNCORE_AVAILABLE, 'process_executor is not available')
class ProcessExecutorHelpersTest(test.TestCase): class ProcessExecutorHelpersTest(test.TestCase):
def test_reader(self): def test_reader(self):
capture_buf = [] capture_buf = []

View File

@@ -226,7 +226,7 @@ class ParallelEngineWithProcessTest(ArgumentsPassingTest, test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None): def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None: if executor is None:
executor = 'processes' executor = 'greenthread'
return taskflow.engines.load(flow, return taskflow.engines.load(flow,
flow_detail=flow_detail, flow_detail=flow_detail,
backend=self.backend, backend=self.backend,

View File

@@ -1517,7 +1517,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
flow_detail=None, executor=None, store=None, flow_detail=None, executor=None, store=None,
**kwargs): **kwargs):
if executor is None: if executor is None:
executor = 'processes' executor = 'greenthread'
return taskflow.engines.load(flow, flow_detail=flow_detail, return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend, backend=self.backend,
engine='parallel', engine='parallel',

View File

@@ -1319,7 +1319,7 @@ class ParallelEngineWithProcessTest(RetryTest, test.TestCase):
def _make_engine(self, flow, defer_reverts=None, flow_detail=None, def _make_engine(self, flow, defer_reverts=None, flow_detail=None,
executor=None): executor=None):
if executor is None: if executor is None:
executor = 'processes' executor = 'greenthread'
return taskflow.engines.load(flow, return taskflow.engines.load(flow,
flow_detail=flow_detail, flow_detail=flow_detail,
engine='parallel', engine='parallel',

View File

@@ -229,7 +229,7 @@ class ParallelEngineWithProcessTest(SuspendTest, test.TestCase):
def _make_engine(self, flow, flow_detail=None, executor=None): def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None: if executor is None:
executor = 'processes' executor = 'greenthread'
return taskflow.engines.load(flow, flow_detail=flow_detail, return taskflow.engines.load(flow, flow_detail=flow_detail,
engine='parallel', engine='parallel',
backend=self.backend, backend=self.backend,