diff --git a/tobiko/__init__.py b/tobiko/__init__.py index d199054d7..7db9295df 100644 --- a/tobiko/__init__.py +++ b/tobiko/__init__.py @@ -58,6 +58,7 @@ required_setup_fixture = _fixture.required_setup_fixture get_fixture_name = _fixture.get_fixture_name get_fixture_class = _fixture.get_fixture_class get_fixture_dir = _fixture.get_fixture_dir +get_object_name = _fixture.get_object_name remove_fixture = _fixture.remove_fixture reset_fixture = _fixture.reset_fixture setup_fixture = _fixture.setup_fixture @@ -85,6 +86,7 @@ get_operation_name = _operation.get_operation_name operation_config = _operation.operation_config Protocol = _proxy.Protocol +list_protocols = _proxy.list_protocols call_proxy = _proxy.call_proxy call_proxy_class = _proxy.call_proxy_class CallHandler = _proxy.CallHandler diff --git a/tobiko/actor/__init__.py b/tobiko/actor/__init__.py new file mode 100644 index 000000000..cdeaabe0f --- /dev/null +++ b/tobiko/actor/__init__.py @@ -0,0 +1,23 @@ +# Copyright (c) 2021 Red Hat, Inc. +# +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from __future__ import absolute_import + +from tobiko.actor import _actor + +create_actor = _actor.create_actor +actor_method = _actor.actor_method +Actor = _actor.Actor +ActorRef = _actor.ActorRef diff --git a/tobiko/actor/_actor.py b/tobiko/actor/_actor.py new file mode 100644 index 000000000..5fb599141 --- /dev/null +++ b/tobiko/actor/_actor.py @@ -0,0 +1,195 @@ +# Copyright (c) 2021 Red Hat, Inc. +# +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from __future__ import absolute_import + +import asyncio +import inspect +import logging +import typing + +from oslo_log import log + +import tobiko +from tobiko.actor import _request + + +class ActorRef(tobiko.CallHandler): + + def __init__(self, + actor_id: str, + requests: _request.ActorRequestQueue, + protocols: typing.Iterable[typing.Type[tobiko.Protocol]]): + self.actor_id = actor_id + self._requests = requests + self._interfaces: typing.Dict[ + typing.Type[tobiko.Protocol], + typing.Any] = {protocol: None + for protocol in protocols} + + def send_request(self, method: str, **arguments): + return self._requests.send_request(method=method, arguments=arguments) + + def _handle_call(self, method: typing.Callable, *args, **kwargs) -> None: + arguments = inspect.signature(method).bind( + None, *args, **kwargs).arguments + arguments.pop('self', None) + return self.send_request(method.__name__, **arguments) + + def get_interface(self, protocol: typing.Type[tobiko.Protocol]): + try: + interface = self._interfaces[protocol] + except KeyError as ex: + raise TypeError( + f"Protocol '{protocol}' is not supported by actor " + f"'{self.actor_id}") from ex + + if interface is None: + self._interfaces[protocol] = interface = tobiko.call_proxy( + protocol, self._handle_call) + return interface + + +def is_actor_method(obj): + return getattr(obj, '__tobiko_actor_method__', False) + + +def actor_method(obj): + if not callable(obj): + raise TypeError(f"Actor method {obj} is not callable") + + if not inspect.iscoroutinefunction(obj): + raise TypeError(f"Actor method {obj} is not async") + + name = getattr(obj, '__name__', None) + if name is None or hasattr(ActorRef, name) or hasattr(Actor, name): + raise TypeError(f"Invalid method name: '{name}'") + + obj.__tobiko_actor_method__ = True + return obj + + +class Actor(tobiko.SharedFixture): + max_queue_size: int = 0 + + log: logging.LoggerAdapter + loop: asyncio.AbstractEventLoop + + ref: ActorRef + + _requests: _request.ActorRequestQueue + _run_actor_task: asyncio.Task + + _protocols: typing.Sequence[typing.Type[tobiko.Protocol]] + _actor_methods: typing.Dict[str, typing.Callable] + + @property + def actor_id(self) -> str: + return self.fixture_name + + def setup_fixture(self): + self.loop = self.get_loop() + self.log = self.create_log() + self._requests = self.create_request_queue() + self._protocols = tobiko.list_protocols(type(self)) + self.ref = ActorRef(actor_id=self.actor_id, + requests=self._requests, + protocols=self._protocols) + self._run_actor_task = self.loop.create_task( + self._run_actor()) + + def get_loop(self) -> asyncio.AbstractEventLoop: + return asyncio.get_event_loop() + + @classmethod + def _get_actor_methods(cls) -> typing.Dict[str, typing.Callable]: + try: + return cls._actor_methods + except AttributeError: + pass + cls._actor_methods = dict(inspect.getmembers(cls, is_actor_method)) + return cls._actor_methods + + def create_log(self): + return log.getLogger(self.actor_id) + + def create_request_queue(self) -> _request.ActorRequestQueue: + return _request.create_request_queue(max_size=self.max_queue_size, + loop=self.loop) + + async def setup_actor(self): + pass + + async def cleanup_actor(self): + pass + + async def on_setup_error(self): + self.log.exception("Actor setup error") + + async def on_request_error( + self, request: typing.Optional[_request.ActorRequest]): + self.log.exception(f"Actor request error: {request}") + + async def on_cleanup_error(self): + self.log.exception("Actor cleanup error") + + async def _run_actor(self): + try: + await self.setup_actor() + except Exception: + await self.on_setup_error() + await self._cleanup_actor() + else: + while True: + request = None + try: + request = await self._requests.receive_request() + if not isinstance(request, _request.ActorRequest): + raise TypeError( + f"Invalid actor request type: {request}") + await self._receive_request(request) + except Exception: + await self.on_request_error(request=request) + + async def _cleanup_actor(self): + try: + await self.cleanup_actor() + except Exception: + await self.on_cleanup_error() + + def _get_actor_method(self, name: str) -> typing.Callable: + methods = self._get_actor_methods() + method = methods.get(name) + if method is None: + raise ValueError(f"Invalid request method name: {name}") + return method + + async def _receive_request(self, request: _request.ActorRequest): + method = self._get_actor_method(request.method) + try: + result = await method(self, **request.arguments) + except Exception as ex: + request.future.set_exception(ex) + else: + request.future.set_result(result) + + +def create_actor(obj: typing.Union[str, Actor, typing.Type[Actor]], + fixture_id: typing.Optional[str] = None, + manager=None) -> ActorRef: + actor = tobiko.setup_fixture(obj, + fixture_id=fixture_id, + manager=manager) + return actor.ref diff --git a/tobiko/actor/_request.py b/tobiko/actor/_request.py new file mode 100644 index 000000000..4ad3e0839 --- /dev/null +++ b/tobiko/actor/_request.py @@ -0,0 +1,50 @@ +# Copyright (c) 2021 Red Hat, Inc. +# +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from __future__ import absolute_import + +import asyncio +import typing + + +class ActorRequest(typing.NamedTuple): + future: asyncio.Future + method: str + arguments: typing.Dict[str, typing.Any] + + +class ActorRequestQueue(object): + + def __init__(self, loop: asyncio.AbstractEventLoop, max_size=0): + self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_size) + self._loop = loop + + def send_request(self, + method: str, + arguments: typing.Dict[str, typing.Any]): + future = self._loop.create_future() + request = ActorRequest(future=future, + method=method, + arguments=arguments) + self._queue.put_nowait(request) + return future + + async def receive_request(self): + return await self._queue.get() + + +def create_request_queue(loop: asyncio.AbstractEventLoop, max_size=0) \ + -> ActorRequestQueue: + return ActorRequestQueue(loop=loop, max_size=max_size) diff --git a/tobiko/tests/unit/_case.py b/tobiko/tests/unit/_case.py index 60f44655e..de9314121 100644 --- a/tobiko/tests/unit/_case.py +++ b/tobiko/tests/unit/_case.py @@ -13,6 +13,8 @@ # under the License. from __future__ import absolute_import +import asyncio +import functools import inspect import shutil import os @@ -65,6 +67,20 @@ class TobikoUnitTest(_patch.PatchMixin, testtools.TestCase): 'no_proxy': '127.0.0.1' } + def _get_test_method(self): + method = super(TobikoUnitTest, self)._get_test_method() + if inspect.iscoroutinefunction(method): + + @functools.wraps(method) + def wrapped_test(*args, **kwargs): + loop = asyncio.get_event_loop() + task = loop.create_task(method(*args, **kwargs)) + loop.run_until_complete(task) + + return wrapped_test + else: + return method + def setUp(self): super(TobikoUnitTest, self).setUp() # Protect from mis-configuring logging diff --git a/tobiko/tests/unit/actor/test_actor.py b/tobiko/tests/unit/actor/test_actor.py new file mode 100644 index 000000000..d4a4b4f0a --- /dev/null +++ b/tobiko/tests/unit/actor/test_actor.py @@ -0,0 +1,85 @@ +# Copyright (c) 2021 Red Hat, Inc. +# +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from __future__ import absolute_import + +from unittest import mock + +import tobiko +from tobiko.tests import unit +from tobiko import actor + + +class Greeter(tobiko.Protocol): + + async def greet(self, whom: str, greeted: 'Greeted'): + raise NotImplementedError + + +class Greeted(tobiko.Protocol): + + def greeted(self, whom: str, greeter: Greeter): + raise NotImplementedError + + +class GreeterActor(Greeter, actor.Actor): + + setup_called = False + cleanup_called = False + + async def setup_actor(self): + self.setup_called = True + + async def cleanup_actor(self): + self.cleanup_called = True + + @actor.actor_method + async def greet(self, whom: str, greeted: Greeted): + assert self.setup_called + assert not self.cleanup_called + if not whom: + raise ValueError("'whom' parameter can't be empty") + + self.log.info(f"Hello {whom}!") + greeted.greeted(whom=whom, greeter=self.ref.get_interface(Greeter)) + + +class ActorTest(unit.TobikoUnitTest): + + async def test_async_request(self): + actor_ref = actor.create_actor(GreeterActor) + self.assertIsInstance(actor_ref, actor.ActorRef) + greeter = actor_ref.get_interface(Greeter) + self.assertIsInstance(greeter, Greeter) + greeted = mock.MagicMock(spec=Greeted) + + await greeter.greet(whom=self.id(), greeted=greeted) + greeted.greeted.assert_called_with(whom=self.id(), + greeter=greeter) + + async def test_async_request_failure(self): + actor_ref = actor.create_actor(GreeterActor) + self.assertIsInstance(actor_ref, actor.ActorRef) + greeter = actor_ref.get_interface(Greeter) + self.assertIsInstance(greeter, Greeter) + greeted = mock.MagicMock(spec=Greeted) + + try: + await greeter.greet(whom="", greeted=greeted) + except ValueError as ex: + self.assertEqual("'whom' parameter can't be empty", str(ex)) + else: + self.fail("Exception not raised") + greeted.greeted.assert_not_called()