Merge "Python3.12: disable process_executor, which depends on asyncore"

This commit is contained in:
Zuul 2024-05-17 09:53:29 +00:00 committed by Gerrit Code Review
commit 828e92409c
9 changed files with 78 additions and 21 deletions

View File

@ -0,0 +1,6 @@
---
deprecations:
- |
The process_executor module has been deprecated, starting with Python 3.12.
It is still available in older versions of Python. There is no replacement
for it.

View File

@ -31,7 +31,6 @@ from oslo_utils import timeutils
from taskflow.engines.action_engine import builder from taskflow.engines.action_engine import builder
from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import executor from taskflow.engines.action_engine import executor
from taskflow.engines.action_engine import process_executor
from taskflow.engines.action_engine import runtime from taskflow.engines.action_engine import runtime
from taskflow.engines import base from taskflow.engines import base
from taskflow import exceptions as exc from taskflow import exceptions as exc
@ -41,6 +40,11 @@ from taskflow import storage
from taskflow.types import failure from taskflow.types import failure
from taskflow.utils import misc from taskflow.utils import misc
try:
from taskflow.engines.action_engine import process_executor
except ImportError:
process_executor = None
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -559,24 +563,32 @@ String (case insensitive) Executor used
_executor_cls_matchers = [ _executor_cls_matchers = [
_ExecutorTypeMatch((futures.ThreadPoolExecutor,), _ExecutorTypeMatch((futures.ThreadPoolExecutor,),
executor.ParallelThreadTaskExecutor), executor.ParallelThreadTaskExecutor),
]
if process_executor is not None:
_executor_cls_matchers.append(
_ExecutorTypeMatch((futures.ProcessPoolExecutor,), _ExecutorTypeMatch((futures.ProcessPoolExecutor,),
process_executor.ParallelProcessTaskExecutor), process_executor.ParallelProcessTaskExecutor)
)
_executor_cls_matchers.append(
_ExecutorTypeMatch((futures.Executor,), _ExecutorTypeMatch((futures.Executor,),
executor.ParallelThreadTaskExecutor), executor.ParallelThreadTaskExecutor),
] )
# One of these should match when a string/text is provided for the # One of these should match when a string/text is provided for the
# 'executor' option (a mixed case equivalent is allowed since the match # 'executor' option (a mixed case equivalent is allowed since the match
# will be lower-cased before checking). # will be lower-cased before checking).
_executor_str_matchers = [ _executor_str_matchers = [
_ExecutorTextMatch(frozenset(['processes', 'process']),
process_executor.ParallelProcessTaskExecutor),
_ExecutorTextMatch(frozenset(['thread', 'threads', 'threaded']), _ExecutorTextMatch(frozenset(['thread', 'threads', 'threaded']),
executor.ParallelThreadTaskExecutor), executor.ParallelThreadTaskExecutor),
_ExecutorTextMatch(frozenset(['greenthread', 'greenthreads', _ExecutorTextMatch(frozenset(['greenthread', 'greenthreads',
'greenthreaded']), 'greenthreaded']),
executor.ParallelGreenThreadTaskExecutor), executor.ParallelGreenThreadTaskExecutor),
] ]
if process_executor is not None:
_executor_str_matchers.append(
_ExecutorTextMatch(frozenset(['processes', 'process']),
process_executor.ParallelProcessTaskExecutor)
)
# Used when no executor is provided (either a string or object)... # Used when no executor is provided (either a string or object)...
_default_executor_cls = executor.ParallelThreadTaskExecutor _default_executor_cls = executor.ParallelThreadTaskExecutor

View File

@ -580,6 +580,8 @@ class ParallelProcessTaskExecutor(base.ParallelTaskExecutor):
max_workers=None, wait_timeout=None): max_workers=None, wait_timeout=None):
super(ParallelProcessTaskExecutor, self).__init__( super(ParallelProcessTaskExecutor, self).__init__(
executor=executor, max_workers=max_workers) executor=executor, max_workers=max_workers)
LOG.warning('Process task executor is deprecated. It is now disabled '
'in Python 3.12 or later and will be removed.')
self._auth_key = _create_random_string(32) self._auth_key = _create_random_string(32)
self._dispatcher = Dispatcher({}, self._auth_key, self._dispatcher = Dispatcher({}, self._auth_key,
_create_random_string(32)) _create_random_string(32))

View File

@ -19,7 +19,6 @@ import testtools
from taskflow.engines.action_engine import engine from taskflow.engines.action_engine import engine
from taskflow.engines.action_engine import executor from taskflow.engines.action_engine import executor
from taskflow.engines.action_engine import process_executor
from taskflow.patterns import linear_flow as lf from taskflow.patterns import linear_flow as lf
from taskflow.persistence import backends from taskflow.persistence import backends
from taskflow import test from taskflow import test
@ -27,6 +26,11 @@ from taskflow.tests import utils
from taskflow.utils import eventlet_utils as eu from taskflow.utils import eventlet_utils as eu
from taskflow.utils import persistence_utils as pu from taskflow.utils import persistence_utils as pu
try:
from taskflow.engines.action_engine import process_executor as pe
except ImportError:
pe = None
class ParallelCreationTest(test.TestCase): class ParallelCreationTest(test.TestCase):
@staticmethod @staticmethod
@ -44,11 +48,12 @@ class ParallelCreationTest(test.TestCase):
self.assertIsInstance(eng._task_executor, self.assertIsInstance(eng._task_executor,
executor.ParallelThreadTaskExecutor) executor.ParallelThreadTaskExecutor)
@testtools.skipIf(pe is None, 'process_executor is not available')
def test_process_string_creation(self): def test_process_string_creation(self):
for s in ['process', 'processes']: for s in ['process', 'processes']:
eng = self._create_engine(executor=s) eng = self._create_engine(executor=s)
self.assertIsInstance(eng._task_executor, self.assertIsInstance(eng._task_executor,
process_executor.ParallelProcessTaskExecutor) pe.ParallelProcessTaskExecutor)
def test_thread_executor_creation(self): def test_thread_executor_creation(self):
with futurist.ThreadPoolExecutor(1) as e: with futurist.ThreadPoolExecutor(1) as e:
@ -56,11 +61,12 @@ class ParallelCreationTest(test.TestCase):
self.assertIsInstance(eng._task_executor, self.assertIsInstance(eng._task_executor,
executor.ParallelThreadTaskExecutor) executor.ParallelThreadTaskExecutor)
@testtools.skipIf(pe is None, 'process_executor is not available')
def test_process_executor_creation(self): def test_process_executor_creation(self):
with futurist.ProcessPoolExecutor(1) as e: with futurist.ProcessPoolExecutor(1) as e:
eng = self._create_engine(executor=e) eng = self._create_engine(executor=e)
self.assertIsInstance(eng._task_executor, self.assertIsInstance(eng._task_executor,
process_executor.ParallelProcessTaskExecutor) pe.ParallelProcessTaskExecutor)
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
def test_green_executor_creation(self): def test_green_executor_creation(self):

View File

@ -13,19 +13,26 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import asyncore
import errno import errno
import socket import socket
import threading import threading
from taskflow.engines.action_engine import process_executor as pu import testtools
from taskflow import task from taskflow import task
from taskflow import test from taskflow import test
from taskflow.test import mock from taskflow.test import mock
from taskflow.tests import utils as test_utils from taskflow.tests import utils as test_utils
try:
import asyncore
from taskflow.engines.action_engine import process_executor as pe
except ImportError:
asyncore = None
pe = None
@testtools.skipIf(asyncore is None, 'process_executor is not available')
class ProcessExecutorHelpersTest(test.TestCase): class ProcessExecutorHelpersTest(test.TestCase):
def test_reader(self): def test_reader(self):
capture_buf = [] capture_buf = []
@ -33,8 +40,8 @@ class ProcessExecutorHelpersTest(test.TestCase):
def do_capture(identity, message_capture_func): def do_capture(identity, message_capture_func):
capture_buf.append(message_capture_func()) capture_buf.append(message_capture_func())
r = pu.Reader(b"secret", do_capture) r = pe.Reader(b"secret", do_capture)
for data in pu._encode_message(b"secret", ['hi'], b'me'): for data in pe._encode_message(b"secret", ['hi'], b'me'):
self.assertEqual(len(data), r.bytes_needed) self.assertEqual(len(data), r.bytes_needed)
r.feed(data) r.feed(data)
@ -42,9 +49,9 @@ class ProcessExecutorHelpersTest(test.TestCase):
self.assertEqual(['hi'], capture_buf[0]) self.assertEqual(['hi'], capture_buf[0])
def test_bad_hmac_reader(self): def test_bad_hmac_reader(self):
r = pu.Reader(b"secret-2", lambda ident, capture_func: capture_func()) r = pe.Reader(b"secret-2", lambda ident, capture_func: capture_func())
in_data = b"".join(pu._encode_message(b"secret", ['hi'], b'me')) in_data = b"".join(pe._encode_message(b"secret", ['hi'], b'me'))
self.assertRaises(pu.BadHmacValueError, r.feed, in_data) self.assertRaises(pe.BadHmacValueError, r.feed, in_data)
@mock.patch("socket.socket") @mock.patch("socket.socket")
def test_no_connect_channel(self, mock_socket_factory): def test_no_connect_channel(self, mock_socket_factory):
@ -52,7 +59,7 @@ class ProcessExecutorHelpersTest(test.TestCase):
mock_socket_factory.return_value = mock_sock mock_socket_factory.return_value = mock_sock
mock_sock.connect.side_effect = socket.error(errno.ECONNREFUSED, mock_sock.connect.side_effect = socket.error(errno.ECONNREFUSED,
'broken') 'broken')
c = pu.Channel(2222, b"me", b"secret") c = pe.Channel(2222, b"me", b"secret")
self.assertRaises(socket.error, c.send, "hi") self.assertRaises(socket.error, c.send, "hi")
self.assertTrue(c.dead) self.assertTrue(c.dead)
self.assertTrue(mock_sock.close.called) self.assertTrue(mock_sock.close.called)
@ -65,7 +72,7 @@ class ProcessExecutorHelpersTest(test.TestCase):
task.EVENT_UPDATE_PROGRESS, task.EVENT_UPDATE_PROGRESS,
lambda _event_type, details: details_capture.append(details)) lambda _event_type, details: details_capture.append(details))
d = pu.Dispatcher({}, b'secret', b'server-josh') d = pe.Dispatcher({}, b'secret', b'server-josh')
d.setup() d.setup()
d.targets[b'child-josh'] = t d.targets[b'child-josh'] = t
@ -73,7 +80,7 @@ class ProcessExecutorHelpersTest(test.TestCase):
s.start() s.start()
self.addCleanup(s.join) self.addCleanup(s.join)
c = pu.Channel(d.port, b'child-josh', b'secret') c = pe.Channel(d.port, b'child-josh', b'secret')
self.addCleanup(c.close) self.addCleanup(c.close)
send_what = [ send_what = [
@ -87,7 +94,7 @@ class ProcessExecutorHelpersTest(test.TestCase):
{'progress': 0.8}, {'progress': 0.8},
{'progress': 0.9}, {'progress': 0.9},
] ]
e_s = pu.EventSender(c) e_s = pe.EventSender(c)
for details in send_what: for details in send_what:
e_s(task.EVENT_UPDATE_PROGRESS, details) e_s(task.EVENT_UPDATE_PROGRESS, details)

View File

@ -23,6 +23,11 @@ from taskflow import test
from taskflow.tests import utils from taskflow.tests import utils
from taskflow.utils import eventlet_utils as eu from taskflow.utils import eventlet_utils as eu
try:
from taskflow.engines.action_engine import process_executor as pe
except ImportError:
pe = None
class ArgumentsPassingTest(utils.EngineTestBase): class ArgumentsPassingTest(utils.EngineTestBase):
@ -221,6 +226,7 @@ class ParallelEngineWithEventletTest(ArgumentsPassingTest, test.TestCase):
executor=executor) executor=executor)
@testtools.skipIf(pe is None, 'process_executor is not available')
class ParallelEngineWithProcessTest(ArgumentsPassingTest, test.TestCase): class ParallelEngineWithProcessTest(ArgumentsPassingTest, test.TestCase):
_EXECUTOR_WORKERS = 2 _EXECUTOR_WORKERS = 2

View File

@ -41,6 +41,11 @@ from taskflow.utils import eventlet_utils as eu
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
from taskflow.utils import threading_utils as tu from taskflow.utils import threading_utils as tu
try:
from taskflow.engines.action_engine import process_executor as pe
except ImportError:
pe = None
# Expected engine transitions when empty workflows are ran... # Expected engine transitions when empty workflows are ran...
_EMPTY_TRANSITIONS = [ _EMPTY_TRANSITIONS = [
@ -1494,6 +1499,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
store=store, **kwargs) store=store, **kwargs)
@testtools.skipIf(pe is None, 'process_executor is not available')
class ParallelEngineWithProcessTest(EngineTaskTest, class ParallelEngineWithProcessTest(EngineTaskTest,
EngineMultipleResultsTest, EngineMultipleResultsTest,
EngineLinearFlowTest, EngineLinearFlowTest,

View File

@ -28,6 +28,11 @@ from taskflow.tests import utils
from taskflow.types import failure from taskflow.types import failure
from taskflow.utils import eventlet_utils as eu from taskflow.utils import eventlet_utils as eu
try:
from taskflow.engines.action_engine import process_executor as pe
except ImportError:
pe = None
class FailingRetry(retry.Retry): class FailingRetry(retry.Retry):
@ -1313,6 +1318,7 @@ class ParallelEngineWithEventletTest(RetryTest, test.TestCase):
defer_reverts=defer_reverts) defer_reverts=defer_reverts)
@testtools.skipIf(pe is None, 'process_executor is not available')
class ParallelEngineWithProcessTest(RetryTest, test.TestCase): class ParallelEngineWithProcessTest(RetryTest, test.TestCase):
_EXECUTOR_WORKERS = 2 _EXECUTOR_WORKERS = 2

View File

@ -25,6 +25,11 @@ from taskflow import test
from taskflow.tests import utils from taskflow.tests import utils
from taskflow.utils import eventlet_utils as eu from taskflow.utils import eventlet_utils as eu
try:
from taskflow.engines.action_engine import process_executor as pe
except ImportError:
pe = None
class SuspendingListener(utils.CaptureListener): class SuspendingListener(utils.CaptureListener):
@ -224,6 +229,7 @@ class ParallelEngineWithEventletTest(SuspendTest, test.TestCase):
executor=executor) executor=executor)
@testtools.skipIf(pe is None, 'process_executor is not available')
class ParallelEngineWithProcessTest(SuspendTest, test.TestCase): class ParallelEngineWithProcessTest(SuspendTest, test.TestCase):
_EXECUTOR_WORKERS = 2 _EXECUTOR_WORKERS = 2