Refine interface for starting/stopping actors

- Have actor_id as a UUID string and use fixture name as actor_name
- Add actor_id to ActorRequest
- Add ping_actor method
- Create ActorRef during Actor class initialisation
- Add support for type variables in generic actor classes

Change-Id: Ib6708980e61bc32b778757cb027eb7c41ae5c46f
This commit is contained in:
Federico Ressi
2022-03-01 15:31:17 +01:00
parent 71da996a33
commit 5e7ab1f6cf
5 changed files with 287 additions and 88 deletions
+4 -1
View File
@@ -23,7 +23,10 @@ call_proxy_class = _proxy.create_call_proxy_class
CallProxy = _proxy.CallProxy
CallProxyBase = _proxy.CallProxyBase
create_actor = _actor.create_actor
actor_method = _actor.actor_method
cleanup_actor = _actor.cleanup_actor
setup_actor = _actor.setup_actor
start_actor = _actor.start_actor
stop_actor = _actor.stop_actor
Actor = _actor.Actor
ActorRef = _actor.ActorRef
+170 -60
View File
@@ -20,6 +20,7 @@ import asyncio
import inspect
import logging
import typing
import uuid
from oslo_log import log
@@ -31,22 +32,29 @@ from tobiko.actors import _request
P = typing.TypeVar('P', bound=abc.ABC)
class ActorRef(_proxy.CallProxyBase, typing.Generic[P]):
class ActorRef(_proxy.CallProxyBase, typing.Generic[P], abc.ABC):
def __init__(self, actor_id: str, requests: _request.ActorRequestQueue):
def __init__(self, actor_id: str,
requests: _request.ActorRequestQueue):
super().__init__()
self.actor_id = actor_id
self._requests = requests
def send_request(self, method: str, **arguments):
return self._requests.send_request(method=method, arguments=arguments)
return self._requests.send_request(actor_id=self.actor_id,
method=method,
arguments=arguments)
def _handle_call(self, method: typing.Callable, *args, **kwargs) -> None:
def _handle_call(self, method: typing.Callable, *args, **kwargs) \
-> asyncio.Future:
arguments = inspect.signature(method).bind(
None, *args, **kwargs).arguments
arguments.pop('self', None)
return self.send_request(method.__name__, **arguments)
def ping_actor(self, data: typing.Any = None) -> typing.Any:
return self.send_request(method='ping_actor', data=data)
def is_actor_method(obj):
return getattr(obj, '__tobiko_actor_method__', False)
@@ -75,51 +83,73 @@ class Actor(tobiko.SharedFixture, typing.Generic[P],
metaclass=_proxy.GenericMeta):
max_queue_size: int = 0
log: logging.LoggerAdapter
event_loop: asyncio.AbstractEventLoop
actor_ref: P
_actor_protocol = _DummyActorProtocol
_actor_request_queue: _request.ActorRequestQueue
_cancel_actor = False
_run_actor_task: asyncio.Task
# Class methods ----------------------------------------------------------
def __init_subclass__(cls,
*args,
**kwargs):
super().__init_subclass__(*args, **kwargs)
cls._actor_methods = dict(inspect.getmembers(cls, is_actor_method))
cls._actor_ref_class = ActorRef[cls._actor_protocol]
cls._actor_ref_class: typing.Type[ActorRef[P]] = (
ActorRef[cls._actor_protocol])
def __class_getitem__(cls, item: typing.Type[P]):
tobiko.check_valid_type(item, type)
return type(cls.__name__, (cls, item), dict(_actor_protocol=item))
if isinstance(item, type):
return type(cls.__name__, (cls, item), dict(_actor_protocol=item))
else:
return cls
# Public instance methods ------------------------------------------------
def __init__(self,
actor_id: str = None,
event_loop: asyncio.AbstractEventLoop = None,
log: logging.LoggerAdapter = None,
requests: _request.ActorRequestQueue = None,
ref: ActorRef[P] = None):
# pylint: disable=redefined-outer-name
super().__init__()
if actor_id is None:
actor_id = self._init_actor_id()
self.actor_id = actor_id
if event_loop is None:
event_loop = self._init_event_loop()
self.event_loop = event_loop
if log is None:
log = self._init_log()
self.log = log
if requests is None:
requests = self._init_actor_request_queue()
self.requests = requests
if ref is None:
ref = self._init_actor_ref()
self.ref = typing.cast(P, ref)
self.setup_future = event_loop.create_future()
self.cleanup_future = event_loop.create_future()
@property
def actor_id(self) -> str:
return self.fixture_name
def actor_name(self) -> str:
return tobiko.get_fixture_name(self)
def setup_fixture(self):
self.log = self._setup_log()
self.event_loop = self._setup_event_loop()
self._actor_request_queue = self._setup_actor_request_queue()
self.actor_ref = self._setup_actor_ref()
self.setup_future.cancel()
self.setup_future = self.event_loop.create_future()
self._run_actor_task = self.event_loop.create_task(
self._run_actor())
def _setup_log(self):
return log.getLogger(self.actor_id)
@staticmethod
def _setup_event_loop() -> asyncio.AbstractEventLoop:
return asyncio.get_event_loop()
def _setup_actor_request_queue(self) -> _request.ActorRequestQueue:
return _request.create_request_queue(max_size=self.max_queue_size,
loop=self.event_loop)
def _setup_actor_ref(self) -> P:
return self._actor_ref_class(actor_id=self.actor_id,
requests=self._actor_request_queue)
def cleanup_fixture(self):
self.cleanup_future.cancel()
self.cleanup_future = self.event_loop.create_future()
self._cancel_actor = True
self.ref.ping_actor('cleanup') # must weak up the actor with a message
async def setup_actor(self):
pass
@@ -127,48 +157,88 @@ class Actor(tobiko.SharedFixture, typing.Generic[P],
async def cleanup_actor(self):
pass
async def on_setup_error(self):
self.log.exception("Actor setup error")
async def on_request_error(
self, request: typing.Optional[_request.ActorRequest]):
self.log.exception(f"Actor request error: {request}")
pass
async def on_cleanup_error(self):
self.log.exception("Actor cleanup error")
pass
async def ping_actor(self, data: typing.Any = None) -> typing.Any:
return data
ping_actor.__tobiko_actor_method__ = True # type: ignore[attr-defined]
# Private instance methods -----------------------------------------------
@staticmethod
def _init_actor_id() -> str:
return str(uuid.uuid4())
def _init_log(self):
return log.getLogger(self.actor_name)
@staticmethod
def _init_event_loop() -> asyncio.AbstractEventLoop:
return asyncio.get_event_loop()
def _init_actor_request_queue(self) -> _request.ActorRequestQueue:
return _request.create_request_queue(max_size=self.max_queue_size,
loop=self.event_loop)
def _init_actor_ref(self) -> ActorRef[P]:
return self._actor_ref_class(actor_id=self.actor_id,
requests=self.requests)
async def _run_actor(self):
try:
await self.setup_actor()
except Exception:
await self.on_setup_error()
await self._cleanup_actor()
else:
while True:
await self._setup_actor()
self._cancel_actor = False
while not self._cancel_actor:
request = None
try:
request = await (
self._actor_request_queue.receive_request())
if not isinstance(request, _request.ActorRequest):
raise TypeError(
f"Invalid actor request type: {request}")
request = await self.requests.receive_request()
await self._receive_request(request)
except Exception:
await self.on_request_error(request=request)
finally:
with tobiko.exc_info(reraise=True):
await self._cleanup_actor()
async def _setup_actor(self):
try:
self.log.debug(f'Actor setup started {self.actor_name}')
await self.setup_actor()
except Exception as ex:
self.log.exception(
f'Failed Setting up actor: {self.actor_name} '
f'({self.actor_id})')
self.setup_future.set_exception(ex)
else:
self.setup_future.set_result(self.ref)
self.log.debug(f'Actor setup succeeded: {self.actor_name} '
f'({self.actor_id}).')
async def _cleanup_actor(self):
try:
self.log.debug(f'Actor cleanup started: {self.actor_name} '
f'({self.actor_id}).')
await self.cleanup_actor()
except Exception:
await self.on_cleanup_error()
def _get_actor_method(self, name: str) -> typing.Callable:
method = self._actor_methods.get(name)
if method is None:
raise ValueError(f"Invalid request method name: {name}")
return method
except Exception as ex:
self.cleanup_future.set_exception(ex)
self.log.exception(
f'Actor cleanup failed: {self.actor_name} '
f'({self.actor_id}).')
else:
self.cleanup_future.set_result(self.ref)
self.log.debug(f'Actor cleanup succeeded: {self.actor_name} '
f'({self.actor_id}).')
finally:
with tobiko.exc_info(reraise=True):
await self.requests.cancel_requests(actor_id=self.actor_id)
async def _receive_request(self, request: _request.ActorRequest):
tobiko.check_valid_type(request, _request.ActorRequest)
if request.actor_id != self.actor_id:
raise ValueError(f"Invalid request actor_id: {request.actor_id}")
method = self._get_actor_method(request.method)
try:
result = await method(self, **request.arguments)
@@ -177,11 +247,51 @@ class Actor(tobiko.SharedFixture, typing.Generic[P],
else:
request.future.set_result(result)
def _get_actor_method(self, name: str) -> typing.Callable:
method = self._actor_methods.get(name)
if method is None:
raise ValueError(f"Invalid request method name: {name}")
return method
def create_actor(obj: typing.Type[P],
fixture_id: typing.Optional[str] = None,
manager=None) -> P:
ActorType = typing.Union[Actor[P], typing.Type[Actor[P]]]
def start_actor(obj: ActorType,
fixture_id: typing.Optional[str] = None,
manager=None) -> ActorRef[P]:
return tobiko.setup_fixture(obj,
fixture_id=fixture_id,
manager=manager).ref
async def setup_actor(obj: ActorType,
fixture_id: typing.Optional[str] = None,
manager=None,
timeout: tobiko.Seconds = None) -> ActorRef[P]:
actor = tobiko.setup_fixture(obj,
fixture_id=fixture_id,
manager=manager)
return actor.actor_ref
await asyncio.wait_for(actor.setup_future,
timeout=timeout)
return actor.ref
async def stop_actor(obj: ActorType,
fixture_id: typing.Optional[str] = None,
manager=None) -> ActorRef[P]:
return tobiko.cleanup_fixture(obj,
fixture_id=fixture_id,
manager=manager).ref
async def cleanup_actor(obj: ActorType,
fixture_id: typing.Optional[str] = None,
manager=None,
timeout: tobiko.Seconds = None) -> ActorRef[P]:
actor = tobiko.cleanup_fixture(obj,
fixture_id=fixture_id,
manager=manager)
await asyncio.wait_for(actor.cleanup_future,
timeout=timeout)
return actor.ref
+18 -15
View File
@@ -21,8 +21,6 @@ import typing
import decorator
import tobiko
P = typing.TypeVar('P', bound=abc.ABC)
@@ -37,16 +35,18 @@ if hasattr(typing, 'GenericMeta'):
class GenericMeta(GenericMetaBase):
def __getitem__(self, item):
# pylint: disable=not-callable
cls = self
getitem = getattr(super(), '__getitem__', None)
if callable(getitem):
cls = getitem(item) # pylint: disable=not-callable
if hasattr(cls, '__class_getitem__'):
cls = cls.__class_getitem__(cls, item)
return cls
elif hasattr(self, '__class_getitem__'):
return self.__class_getitem__(item)
else:
return self
cls = getitem(item)
class_getitem = getattr(cls, '__class_getitem__', None)
if callable(class_getitem):
if inspect.ismethod(class_getitem):
cls = class_getitem(item)
else:
cls = class_getitem(cls, item)
return cls
def is_public_function(obj):
@@ -57,7 +57,8 @@ def is_public_function(obj):
class CallHandler(abc.ABC):
@abc.abstractmethod
def _handle_call(self, method: typing.Callable, *args, **kwargs):
def _handle_call(self, method: typing.Callable, *args, **kwargs) \
-> typing.Any:
raise NotImplementedError
@@ -65,10 +66,12 @@ class CallProxyBase(CallHandler, typing.Generic[P], abc.ABC,
metaclass=GenericMeta):
def __class_getitem__(cls, item: typing.Type[P]):
tobiko.check_valid_type(item, type)
return create_call_proxy_class(protocols=(item,),
class_name=cls.__name__,
bases=(cls,))
if isinstance(item, type):
return create_call_proxy_class(protocols=(item,),
class_name=cls.__name__,
bases=(cls,))
else:
return cls
def __init_subclass__(cls,
*args,
+68 -8
View File
@@ -15,36 +15,96 @@
# under the License.
from __future__ import absolute_import
import abc
import asyncio
import typing
class ActorRequest(typing.NamedTuple):
future: asyncio.Future
actor_id: str
method: str
arguments: typing.Dict[str, typing.Any]
class ActorRequestQueue(object):
class ActorRequestQueue(abc.ABC):
def __init__(self, loop: asyncio.AbstractEventLoop, max_size=0):
self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_size)
@abc.abstractmethod
def send_request(self,
actor_id: str,
method: str,
arguments: typing.Dict[str, typing.Any]) \
-> asyncio.Future:
raise NotImplementedError
async def cancel_requests(self,
actor_id: str = None) \
-> typing.List[ActorRequest]:
return await self.drain_requests(actor_id=actor_id,
cancel=True)
@abc.abstractmethod
async def drain_requests(self,
actor_id: str = None,
cancel=False) -> typing.List[ActorRequest]:
raise NotImplementedError
@abc.abstractmethod
async def receive_request(self) -> ActorRequest:
raise NotImplementedError
class AsyncioActorRequestQueue(ActorRequestQueue):
def __init__(self,
loop: asyncio.AbstractEventLoop,
max_size=0):
self.max_size = max_size
self._loop = loop
self._queue: asyncio.Queue = self._init_queue()
def _init_queue(self) -> asyncio.Queue:
return asyncio.Queue(maxsize=self.max_size)
def send_request(self,
actor_id: str,
method: str,
arguments: typing.Dict[str, typing.Any]):
arguments: typing.Dict[str, typing.Any]) \
-> asyncio.Future:
future = self._loop.create_future()
request = ActorRequest(future=future,
actor_id=actor_id,
method=method,
arguments=arguments)
self._queue.put_nowait(request)
return future
async def receive_request(self):
async def drain_requests(self, actor_id: str = None,
cancel=False) \
-> typing.List[ActorRequest]:
old_queue = self._queue
self._queue = self._init_queue()
keep_requests = []
drained_requests = []
while True:
try:
request: ActorRequest = old_queue.get_nowait()
except asyncio.QueueEmpty:
break
if actor_id in [None, request.actor_id]:
drained_requests.append(request)
if cancel:
request.future.cancel()
else:
keep_requests.append(request)
for request in keep_requests:
await self._queue.put(request)
return drained_requests
async def receive_request(self) -> ActorRequest:
return await self._queue.get()
def create_request_queue(loop: asyncio.AbstractEventLoop, max_size=0) \
-> ActorRequestQueue:
return ActorRequestQueue(loop=loop, max_size=max_size)
def create_request_queue(loop: asyncio.AbstractEventLoop,
max_size=0) -> ActorRequestQueue:
return AsyncioActorRequestQueue(loop=loop, max_size=max_size)
+27 -4
View File
@@ -18,6 +18,7 @@ from __future__ import absolute_import
import abc
import typing
import tobiko
from tobiko.tests import unit
from tobiko import actors
@@ -60,22 +61,44 @@ class GreeterActor(actors.Actor[Greeter]):
raise ValueError("'whom' parameter can't be empty")
self.log.info(f"Hello {whom}!")
await greeted.greeted(whom=whom, greeter=self.actor_ref)
await greeted.greeted(whom=whom, greeter=self)
class ActorTest(unit.TobikoUnitTest):
actor = tobiko.required_fixture(GreeterActor, setup=False)
async def test_setup_actor(self):
self.assertFalse(self.actor.setup_called)
self.assertFalse(self.actor.cleanup_called)
await actors.setup_actor(self.actor)
self.assertTrue(self.actor.setup_called)
self.assertFalse(self.actor.cleanup_called)
async def test_cleanup_actor(self):
await actors.setup_actor(self.actor)
self.assertTrue(self.actor.setup_called)
self.assertFalse(self.actor.cleanup_called)
await actors.cleanup_actor(self.actor)
self.assertTrue(self.actor.setup_called)
self.assertTrue(self.actor.cleanup_called)
async def test_ping_actor(self):
ref = actors.start_actor(self.actor)
result = await ref.ping_actor(self.id())
self.assertEqual(self.id(), result)
async def test_async_request(self):
greeter = actors.create_actor(GreeterActor)
greeter = actors.start_actor(self.actor)
self.assertIsInstance(greeter, actors.ActorRef)
self.assertIsInstance(greeter, Greeter)
greeted = Greeted()
await greeter.greet(whom=self.id(), greeted=greeted)
self.assertEqual(self.id(), greeted.whom)
self.assertIs(greeter, greeted.greeter)
self.assertIs(self.actor, greeted.greeter)
async def test_async_request_failure(self):
greeter = actors.create_actor(GreeterActor)
greeter = actors.start_actor(self.actor)
self.assertIsInstance(greeter, actors.ActorRef)
self.assertIsInstance(greeter, Greeter)
greeted = Greeted()