Make Actor class generic type

This allow to specify a protocol for an actor as below:

  class Protocol:
     "Class containing abstact methods to be implemented"

  class MyActor(Actor[MyProtocol]):
      "Class containing methods implementations"

Move tobiko.common._proxy module to tobilo.actors
Mova tobiko.actor._actor module to tobiko.actors

Change-Id: Ic9a7440a870c48db9449ccec907599eaaab88cfd
This commit is contained in:
Federico Ressi 2022-02-25 16:33:26 +01:00
parent e66b09527f
commit 7ab82b6512
9 changed files with 249 additions and 237 deletions

3
mypy.ini Normal file
View File

@ -0,0 +1,3 @@
[mypy]
show_error_codes = True
# disable_error_code = type-arg

View File

@ -26,7 +26,6 @@ from tobiko.common import _logging
from tobiko.common.managers import loader as loader_manager from tobiko.common.managers import loader as loader_manager
from tobiko.common import _operation from tobiko.common import _operation
from tobiko.common import _os from tobiko.common import _os
from tobiko.common import _proxy
from tobiko.common import _retry from tobiko.common import _retry
from tobiko.common import _select from tobiko.common import _select
from tobiko.common import _skip from tobiko.common import _skip
@ -101,12 +100,6 @@ get_operation = _operation.get_operation
get_operation_name = _operation.get_operation_name get_operation_name = _operation.get_operation_name
operation_config = _operation.operation_config operation_config = _operation.operation_config
call_proxy = _proxy.call_proxy
call_proxy_class = _proxy.call_proxy_class
list_protocols = _proxy.list_protocols
protocol = _proxy.protocol
CallHandler = _proxy.CallHandler
retry = _retry.retry retry = _retry.retry
retry_attempt = _retry.retry_attempt retry_attempt = _retry.retry_attempt
retry_on_exception = _retry.retry_on_exception retry_on_exception = _retry.retry_on_exception

View File

@ -15,7 +15,13 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
from tobiko.actor import _actor from tobiko.actors import _actor
from tobiko.actors import _proxy
call_proxy = _proxy.create_call_proxy
call_proxy_class = _proxy.create_call_proxy_class
CallProxy = _proxy.CallProxy
CallProxyBase = _proxy.CallProxyBase
create_actor = _actor.create_actor create_actor = _actor.create_actor
actor_method = _actor.actor_method actor_method = _actor.actor_method

View File

@ -15,6 +15,7 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
import abc
import asyncio import asyncio
import inspect import inspect
import logging import logging
@ -23,13 +24,14 @@ import typing
from oslo_log import log from oslo_log import log
import tobiko import tobiko
from tobiko.actor import _request from tobiko.actors import _proxy
from tobiko.actors import _request
T = typing.TypeVar('T') P = typing.TypeVar('P', bound=abc.ABC)
class ActorRef(tobiko.CallHandler): class ActorRef(_proxy.CallProxyBase, typing.Generic[P]):
def __init__(self, actor_id: str, requests: _request.ActorRequestQueue): def __init__(self, actor_id: str, requests: _request.ActorRequestQueue):
super().__init__() super().__init__()
@ -65,73 +67,59 @@ def actor_method(obj):
return obj return obj
class Actor(tobiko.SharedFixture): class _DummyActorProtocol(abc.ABC):
pass
class Actor(tobiko.SharedFixture, typing.Generic[P],
metaclass=_proxy.GenericMeta):
max_queue_size: int = 0 max_queue_size: int = 0
log: logging.LoggerAdapter log: logging.LoggerAdapter
loop: asyncio.AbstractEventLoop event_loop: asyncio.AbstractEventLoop
actor_ref: P
base_ref_class = ActorRef _actor_protocol = _DummyActorProtocol
_actor_request_queue: _request.ActorRequestQueue
_actor_methods: typing.Dict[str, typing.Callable]
_ref_class: type
_ref: ActorRef
_requests: _request.ActorRequestQueue
_run_actor_task: asyncio.Task _run_actor_task: asyncio.Task
def __init_subclass__(cls,
*args,
**kwargs):
super().__init_subclass__(*args, **kwargs)
cls._actor_methods = dict(inspect.getmembers(cls, is_actor_method))
cls._actor_ref_class = ActorRef[cls._actor_protocol]
def __class_getitem__(cls, item: typing.Type[P]):
tobiko.check_valid_type(item, type)
return type(cls.__name__, (cls, item), dict(_actor_protocol=item))
@property @property
def actor_id(self) -> str: def actor_id(self) -> str:
return self.fixture_name return self.fixture_name
def setup_fixture(self): def setup_fixture(self):
self.loop = self.get_loop() self.log = self._setup_log()
self.log = self.create_log() self.event_loop = self._setup_event_loop()
self._requests = self.create_request_queue() self._actor_request_queue = self._setup_actor_request_queue()
self.actor_ref = self._setup_actor_ref()
self._run_actor_task = self.loop.create_task( self._run_actor_task = self.event_loop.create_task(
self._run_actor()) self._run_actor())
@classmethod def _setup_log(self):
def ref_class(cls) -> type:
try:
return cls._ref_class
except AttributeError:
pass
name = cls.__name__ + 'Ref'
bases = cls.base_ref_class,
namespace = {'__module__': cls.__module__,
'protocol_class': cls}
return type(name, bases, namespace)
@property
def ref(self) -> ActorRef:
try:
return self._ref
except AttributeError:
pass
ref_class = self.ref_class()
self._ref = ref = ref_class(actor_id=self.actor_id,
requests=self._requests)
return ref
def get_loop(self) -> asyncio.AbstractEventLoop:
return asyncio.get_event_loop()
@classmethod
def _get_actor_methods(cls) -> typing.Dict[str, typing.Callable]:
try:
return cls._actor_methods
except AttributeError:
pass
cls._actor_methods = dict(inspect.getmembers(cls, is_actor_method))
return cls._actor_methods
def create_log(self):
return log.getLogger(self.actor_id) return log.getLogger(self.actor_id)
def create_request_queue(self) -> _request.ActorRequestQueue: @staticmethod
def _setup_event_loop() -> asyncio.AbstractEventLoop:
return asyncio.get_event_loop()
def _setup_actor_request_queue(self) -> _request.ActorRequestQueue:
return _request.create_request_queue(max_size=self.max_queue_size, return _request.create_request_queue(max_size=self.max_queue_size,
loop=self.loop) loop=self.event_loop)
def _setup_actor_ref(self) -> P:
return self._actor_ref_class(actor_id=self.actor_id,
requests=self._actor_request_queue)
async def setup_actor(self): async def setup_actor(self):
pass pass
@ -159,7 +147,8 @@ class Actor(tobiko.SharedFixture):
while True: while True:
request = None request = None
try: try:
request = await self._requests.receive_request() request = await (
self._actor_request_queue.receive_request())
if not isinstance(request, _request.ActorRequest): if not isinstance(request, _request.ActorRequest):
raise TypeError( raise TypeError(
f"Invalid actor request type: {request}") f"Invalid actor request type: {request}")
@ -174,8 +163,7 @@ class Actor(tobiko.SharedFixture):
await self.on_cleanup_error() await self.on_cleanup_error()
def _get_actor_method(self, name: str) -> typing.Callable: def _get_actor_method(self, name: str) -> typing.Callable:
methods = self._get_actor_methods() method = self._actor_methods.get(name)
method = methods.get(name)
if method is None: if method is None:
raise ValueError(f"Invalid request method name: {name}") raise ValueError(f"Invalid request method name: {name}")
return method return method
@ -190,10 +178,10 @@ class Actor(tobiko.SharedFixture):
request.future.set_result(result) request.future.set_result(result)
def create_actor(obj: typing.Union[str, Actor, typing.Type[Actor]], def create_actor(obj: typing.Type[P],
fixture_id: typing.Optional[str] = None, fixture_id: typing.Optional[str] = None,
manager=None) -> ActorRef: manager=None) -> P:
actor = tobiko.setup_fixture(obj, actor = tobiko.setup_fixture(obj,
fixture_id=fixture_id, fixture_id=fixture_id,
manager=manager) manager=manager)
return actor.ref return actor.actor_ref

151
tobiko/actors/_proxy.py Normal file
View File

@ -0,0 +1,151 @@
# Copyright 2021 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import abc
import inspect
import sys
import types
import typing
import decorator
import tobiko
P = typing.TypeVar('P', bound=abc.ABC)
GenericMetaBase = abc.ABCMeta
if hasattr(typing, 'GenericMeta'):
class GenericMetaBase( # type: ignore[no-redef]
typing.GenericMeta, # type: ignore[name-defined]
abc.ABCMeta):
# pylint: disable=function-redefined,no-member
pass
class GenericMeta(GenericMetaBase):
def __getitem__(self, item):
getitem = getattr(super(), '__getitem__', None)
if callable(getitem):
cls = getitem(item) # pylint: disable=not-callable
if hasattr(cls, '__class_getitem__'):
cls = cls.__class_getitem__(cls, item)
return cls
elif hasattr(self, '__class_getitem__'):
return self.__class_getitem__(item)
else:
return self
def is_public_function(obj):
return (inspect.isfunction(obj) and
getattr(obj, '__name__', '_')[0] != '_')
class CallHandler(abc.ABC):
@abc.abstractmethod
def _handle_call(self, method: typing.Callable, *args, **kwargs):
raise NotImplementedError
class CallProxyBase(CallHandler, typing.Generic[P], abc.ABC,
metaclass=GenericMeta):
def __class_getitem__(cls, item: typing.Type[P]):
tobiko.check_valid_type(item, type)
return create_call_proxy_class(protocols=(item,),
class_name=cls.__name__,
bases=(cls,))
def __init_subclass__(cls,
*args,
**kwargs):
super().__init_subclass__(*args, **kwargs)
# On python < 3.8 must ensure __class_getitem__ is there
if sys.version_info < (3, 8):
cls.__class_getitem__ = CallProxyBase.__class_getitem__
class CallProxy(CallProxyBase, typing.Generic[P]):
def __init__(self, handle_call: typing.Callable):
assert callable(handle_call)
self._handle_call = handle_call # type: ignore
def _handle_call(self, method: typing.Callable, *args, **kwargs):
raise NotImplementedError
def create_call_proxy_class(
protocols: typing.Tuple[typing.Type[P], ...],
class_name: str,
bases: typing.Tuple[typing.Type, ...] = None,
namespace: dict = None) -> typing.Type[P]:
if bases is None:
bases = tuple()
def exec_body(ns: typing.Dict[str, typing.Any]):
if namespace is not None:
ns.update(namespace)
for cls in protocols:
for member_name, member in list_abstract_methods(cls):
if member_name not in ns and is_public_function(member):
method = create_call_proxy_method(member)
ns[member_name] = method
proxy_class = types.new_class(name=class_name,
bases=bases + protocols,
exec_body=exec_body)
return typing.cast(typing.Type[P], proxy_class)
def create_call_proxy(handle_call: typing.Callable,
*protocols: typing.Type[P]) -> P:
cls = create_call_proxy_class(protocols=protocols,
class_name='CallProxy',
bases=(CallProxy,))
return cls(handle_call) # type: ignore[call-arg]
def list_abstract_classes(cls: typing.Type) \
-> typing.Tuple[typing.Type[P], ...]:
subclasses = inspect.getmro(cls)
protocols = tuple(cls
for cls in subclasses
if inspect.isabstract(cls))
return typing.cast(typing.Tuple[typing.Type[P], ...], protocols)
def list_abstract_methods(cls: typing.Type) \
-> typing.List[typing.Tuple[str, typing.Callable]]:
methods: typing.List[typing.Tuple[str, typing.Callable]] = []
if inspect.isabstract(cls):
for name, member in inspect.getmembers(cls, inspect.isfunction):
if getattr(member, "__isabstractmethod__", False):
methods.append((name, member))
return methods
def create_call_proxy_method(func: typing.Callable) -> typing.Callable:
method = decorator.decorate(func, _call_proxy_method)
assert method is not func
setattr(method, "__isabstractmethod__", False)
return method
def _call_proxy_method(func, self: CallProxy, *args, **kwargs):
# pylint: disable=protected-access
return self._handle_call(func, *args, **kwargs)

View File

@ -1,124 +0,0 @@
# Copyright 2021 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import functools
import inspect
import typing
import decorator
def protocol(cls: typing.Type) -> typing.Type:
name = cls.__name__
bases = inspect.getmro(cls)[1:]
namespace = dict(cls.__dict__,
_is_protocol=True,
__module__=cls.__module__)
return type(name, bases, namespace)
def is_protocol_class(obj):
return inspect.isclass(obj) and obj.__dict__.get('_is_protocol', False)
def is_public_function(obj):
return (inspect.isfunction(obj) and
getattr(obj, '__name__', '_')[0] != '_')
T = typing.TypeVar('T')
class CallHandlerMeta(type):
def __new__(mcls, name, bases, namespace, **kwargs):
protocol_class = namespace.get('protocol_class')
if protocol_class is not None:
proxy_class = call_proxy_class(protocol_class)
bases += proxy_class,
return super().__new__(mcls, name, bases, namespace, **kwargs)
class CallHandler(metaclass=CallHandlerMeta):
protocol_class: typing.Type
def __init__(self,
handle_call: typing.Optional[typing.Callable] = None):
if handle_call is not None:
assert callable(handle_call)
setattr(self, '_handle_call', handle_call)
def _handle_call(self, method: typing.Callable, *args, **kwargs):
pass
def use_as(self, cls: typing.Type[T]) -> T:
assert isinstance(self, cls)
return typing.cast(T, self)
def call_proxy_class(
cls: typing.Type,
*bases: typing.Type,
class_name: str = None,
namespace: dict = None) \
-> typing.Type:
if not inspect.isclass(cls):
raise TypeError(f"Object {cls} is not a class")
if class_name is None:
class_name = cls.__name__ + 'Proxy'
protocol_classes = list_protocols(typing.cast(typing.Hashable, cls))
if not protocol_classes:
raise TypeError(f"Class {cls} doesn't implement any protocol")
if namespace is None:
namespace = {}
for protocol_class in reversed(protocol_classes):
for name, member in protocol_class.__dict__.items():
if is_public_function(member):
method = call_proxy_method(member)
namespace[name] = method
# Skip empty protocols
if not namespace:
raise TypeError(f"Class {cls} has any protocol specification")
namespace['__module__'] = cls.__module__
proxy_class = type(class_name, bases + protocol_classes, namespace)
assert not is_protocol_class(proxy_class)
assert not is_protocol_class(proxy_class)
return proxy_class
def call_proxy(cls: typing.Type, handle_call: typing.Callable) -> CallHandler:
proxy_class = call_proxy_class(cls, CallHandler)
return proxy_class(handle_call)
@functools.lru_cache()
def list_protocols(cls: typing.Type) -> typing.Tuple[typing.Type, ...]:
subclasses = inspect.getmro(cls)
protocols = tuple(cls
for cls in subclasses
if is_protocol_class(cls))
return tuple(protocols)
def call_proxy_method(func: typing.Callable) -> typing.Callable:
method = decorator.decorate(func, _call_proxy_method)
assert method is not func
return method
def _call_proxy_method(func, self: CallHandler, *args, **kwargs):
# pylint: disable=protected-access
return self._handle_call(func, *args, **kwargs)

View File

@ -15,36 +15,34 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
from unittest import mock import abc
import typing
import tobiko
from tobiko.tests import unit from tobiko.tests import unit
from tobiko import actor from tobiko import actors
@tobiko.protocol class Greeter(abc.ABC):
class Greeter:
@abc.abstractmethod
async def greet(self, whom: str, greeted: 'Greeted'): async def greet(self, whom: str, greeted: 'Greeted'):
raise NotImplementedError raise NotImplementedError
@tobiko.protocol
class Greeted: class Greeted:
def greeted(self, whom: str, greeter: Greeter): greeter: typing.Optional[Greeter] = None
raise NotImplementedError whom: typing.Optional[str] = None
async def greeted(self, whom: str, greeter: Greeter):
self.greeter = greeter
self.whom = whom
class GreeterRef(actor.ActorRef): class GreeterActor(actors.Actor[Greeter]):
pass
class GreeterActor(Greeter, actor.Actor):
setup_called = False setup_called = False
cleanup_called = False cleanup_called = False
base_ref_class = GreeterRef
async def setup_actor(self): async def setup_actor(self):
self.setup_called = True self.setup_called = True
@ -52,41 +50,35 @@ class GreeterActor(Greeter, actor.Actor):
async def cleanup_actor(self): async def cleanup_actor(self):
self.cleanup_called = True self.cleanup_called = True
@actor.actor_method @actors.actor_method
async def greet(self, whom: str, greeted: Greeted): async def greet(self, whom: str, greeted: Greeted):
assert isinstance(self, Greeter)
assert isinstance(self, GreeterActor)
assert self.setup_called assert self.setup_called
assert not self.cleanup_called assert not self.cleanup_called
if not whom: if not whom:
raise ValueError("'whom' parameter can't be empty") raise ValueError("'whom' parameter can't be empty")
self.log.info(f"Hello {whom}!") self.log.info(f"Hello {whom}!")
greeted.greeted(whom=whom, greeter=self.ref.use_as(Greeter)) await greeted.greeted(whom=whom, greeter=self.actor_ref)
class ActorTest(unit.TobikoUnitTest): class ActorTest(unit.TobikoUnitTest):
def test_greeter_ref_class(self):
ref_class = GreeterActor.ref_class()
self.assertTrue(issubclass(ref_class, actor.ActorRef))
self.assertTrue(issubclass(ref_class, GreeterRef))
self.assertTrue(issubclass(ref_class, Greeter))
async def test_async_request(self): async def test_async_request(self):
greeter = actor.create_actor(GreeterActor).use_as(Greeter) greeter = actors.create_actor(GreeterActor)
self.assertIsInstance(greeter, actor.ActorRef) self.assertIsInstance(greeter, actors.ActorRef)
self.assertIsInstance(greeter, GreeterRef)
self.assertIsInstance(greeter, Greeter) self.assertIsInstance(greeter, Greeter)
greeted = mock.MagicMock(spec=Greeted) greeted = Greeted()
await greeter.greet(whom=self.id(), greeted=greeted) await greeter.greet(whom=self.id(), greeted=greeted)
greeted.greeted.assert_called_with(whom=self.id(), self.assertEqual(self.id(), greeted.whom)
greeter=greeter) self.assertIs(greeter, greeted.greeter)
async def test_async_request_failure(self): async def test_async_request_failure(self):
greeter = actor.create_actor(GreeterActor).use_as(Greeter) greeter = actors.create_actor(GreeterActor)
self.assertIsInstance(greeter, actor.ActorRef) self.assertIsInstance(greeter, actors.ActorRef)
self.assertIsInstance(greeter, Greeter) self.assertIsInstance(greeter, Greeter)
greeted = mock.MagicMock(spec=Greeted) greeted = Greeted()
try: try:
await greeter.greet(whom="", greeted=greeted) await greeter.greet(whom="", greeted=greeted)
@ -94,4 +86,5 @@ class ActorTest(unit.TobikoUnitTest):
self.assertEqual("'whom' parameter can't be empty", str(ex)) self.assertEqual("'whom' parameter can't be empty", str(ex))
else: else:
self.fail("Exception not raised") self.fail("Exception not raised")
greeted.greeted.assert_not_called() self.assertIsNone(greeted.whom)
self.assertIsNone(greeted.greeter)

View File

@ -13,31 +13,32 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
import abc
import inspect import inspect
import typing import typing
from unittest import mock from unittest import mock
import tobiko from tobiko import actors
from tobiko.tests import unit from tobiko.tests import unit
@tobiko.protocol class MyProto(abc.ABC):
class MyProto:
# pylint: disable=unused-argument
@abc.abstractmethod
def call_one(self, arg='a') -> int: def call_one(self, arg='a') -> int:
return 42 raise NotImplementedError
@abc.abstractmethod
def call_two(self, *args) -> int: def call_two(self, *args) -> int:
return 42 raise NotImplementedError
@abc.abstractmethod
def call_three(self, **kwargs) -> int: def call_three(self, **kwargs) -> int:
return 42 raise NotImplementedError
class MyProtoHandler(tobiko.CallHandler): class MyProxy(actors.CallProxy[MyProto]):
pass
protocol_class = MyProto
class ProxyTest(unit.TobikoUnitTest): class ProxyTest(unit.TobikoUnitTest):
@ -52,7 +53,7 @@ class ProxyTest(unit.TobikoUnitTest):
def test_call_handler(self): def test_call_handler(self):
# pylint: disable=no-member # pylint: disable=no-member
handler = self.mock_handler() handler = self.mock_handler()
proxy: MyProto = MyProtoHandler(handler).use_as(MyProto) proxy = MyProxy(handler)
self.assertIsInstance(proxy, MyProto) self.assertIsInstance(proxy, MyProto)
self.assertTrue(callable(proxy.call_one)) self.assertTrue(callable(proxy.call_one))
self.assertEqual(inspect.signature(MyProto.call_one), self.assertEqual(inspect.signature(MyProto.call_one),
@ -63,8 +64,9 @@ class ProxyTest(unit.TobikoUnitTest):
handler.assert_called_with(MyProto.call_one, 'a') handler.assert_called_with(MyProto.call_one, 'a')
def test_call_proxy(self): def test_call_proxy(self):
# pylint: disable=no-member
handler = self.mock_handler() handler = self.mock_handler()
proxy = tobiko.call_proxy(MyProto, handler).use_as(MyProto) proxy = actors.call_proxy(handler, MyProto)
self.assertIsInstance(proxy, MyProto) self.assertIsInstance(proxy, MyProto)
self.assertTrue(callable(proxy.call_one)) self.assertTrue(callable(proxy.call_one))
self.assertEqual(inspect.signature(MyProto.call_one), self.assertEqual(inspect.signature(MyProto.call_one),