Implement base actor class
Change-Id: I0861957f7c8c4e4ea1153dac4f44fbb2e0f1c6dc
This commit is contained in:
parent
c1fcde26b0
commit
2de97f4826
@ -58,6 +58,7 @@ required_setup_fixture = _fixture.required_setup_fixture
|
|||||||
get_fixture_name = _fixture.get_fixture_name
|
get_fixture_name = _fixture.get_fixture_name
|
||||||
get_fixture_class = _fixture.get_fixture_class
|
get_fixture_class = _fixture.get_fixture_class
|
||||||
get_fixture_dir = _fixture.get_fixture_dir
|
get_fixture_dir = _fixture.get_fixture_dir
|
||||||
|
get_object_name = _fixture.get_object_name
|
||||||
remove_fixture = _fixture.remove_fixture
|
remove_fixture = _fixture.remove_fixture
|
||||||
reset_fixture = _fixture.reset_fixture
|
reset_fixture = _fixture.reset_fixture
|
||||||
setup_fixture = _fixture.setup_fixture
|
setup_fixture = _fixture.setup_fixture
|
||||||
@ -85,6 +86,7 @@ get_operation_name = _operation.get_operation_name
|
|||||||
operation_config = _operation.operation_config
|
operation_config = _operation.operation_config
|
||||||
|
|
||||||
Protocol = _proxy.Protocol
|
Protocol = _proxy.Protocol
|
||||||
|
list_protocols = _proxy.list_protocols
|
||||||
call_proxy = _proxy.call_proxy
|
call_proxy = _proxy.call_proxy
|
||||||
call_proxy_class = _proxy.call_proxy_class
|
call_proxy_class = _proxy.call_proxy_class
|
||||||
CallHandler = _proxy.CallHandler
|
CallHandler = _proxy.CallHandler
|
||||||
|
23
tobiko/actor/__init__.py
Normal file
23
tobiko/actor/__init__.py
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
# Copyright (c) 2021 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
from tobiko.actor import _actor
|
||||||
|
|
||||||
|
create_actor = _actor.create_actor
|
||||||
|
actor_method = _actor.actor_method
|
||||||
|
Actor = _actor.Actor
|
||||||
|
ActorRef = _actor.ActorRef
|
195
tobiko/actor/_actor.py
Normal file
195
tobiko/actor/_actor.py
Normal file
@ -0,0 +1,195 @@
|
|||||||
|
# Copyright (c) 2021 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import inspect
|
||||||
|
import logging
|
||||||
|
import typing
|
||||||
|
|
||||||
|
from oslo_log import log
|
||||||
|
|
||||||
|
import tobiko
|
||||||
|
from tobiko.actor import _request
|
||||||
|
|
||||||
|
|
||||||
|
class ActorRef(tobiko.CallHandler):
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
actor_id: str,
|
||||||
|
requests: _request.ActorRequestQueue,
|
||||||
|
protocols: typing.Iterable[typing.Type[tobiko.Protocol]]):
|
||||||
|
self.actor_id = actor_id
|
||||||
|
self._requests = requests
|
||||||
|
self._interfaces: typing.Dict[
|
||||||
|
typing.Type[tobiko.Protocol],
|
||||||
|
typing.Any] = {protocol: None
|
||||||
|
for protocol in protocols}
|
||||||
|
|
||||||
|
def send_request(self, method: str, **arguments):
|
||||||
|
return self._requests.send_request(method=method, arguments=arguments)
|
||||||
|
|
||||||
|
def _handle_call(self, method: typing.Callable, *args, **kwargs) -> None:
|
||||||
|
arguments = inspect.signature(method).bind(
|
||||||
|
None, *args, **kwargs).arguments
|
||||||
|
arguments.pop('self', None)
|
||||||
|
return self.send_request(method.__name__, **arguments)
|
||||||
|
|
||||||
|
def get_interface(self, protocol: typing.Type[tobiko.Protocol]):
|
||||||
|
try:
|
||||||
|
interface = self._interfaces[protocol]
|
||||||
|
except KeyError as ex:
|
||||||
|
raise TypeError(
|
||||||
|
f"Protocol '{protocol}' is not supported by actor "
|
||||||
|
f"'{self.actor_id}") from ex
|
||||||
|
|
||||||
|
if interface is None:
|
||||||
|
self._interfaces[protocol] = interface = tobiko.call_proxy(
|
||||||
|
protocol, self._handle_call)
|
||||||
|
return interface
|
||||||
|
|
||||||
|
|
||||||
|
def is_actor_method(obj):
|
||||||
|
return getattr(obj, '__tobiko_actor_method__', False)
|
||||||
|
|
||||||
|
|
||||||
|
def actor_method(obj):
|
||||||
|
if not callable(obj):
|
||||||
|
raise TypeError(f"Actor method {obj} is not callable")
|
||||||
|
|
||||||
|
if not inspect.iscoroutinefunction(obj):
|
||||||
|
raise TypeError(f"Actor method {obj} is not async")
|
||||||
|
|
||||||
|
name = getattr(obj, '__name__', None)
|
||||||
|
if name is None or hasattr(ActorRef, name) or hasattr(Actor, name):
|
||||||
|
raise TypeError(f"Invalid method name: '{name}'")
|
||||||
|
|
||||||
|
obj.__tobiko_actor_method__ = True
|
||||||
|
return obj
|
||||||
|
|
||||||
|
|
||||||
|
class Actor(tobiko.SharedFixture):
|
||||||
|
max_queue_size: int = 0
|
||||||
|
|
||||||
|
log: logging.LoggerAdapter
|
||||||
|
loop: asyncio.AbstractEventLoop
|
||||||
|
|
||||||
|
ref: ActorRef
|
||||||
|
|
||||||
|
_requests: _request.ActorRequestQueue
|
||||||
|
_run_actor_task: asyncio.Task
|
||||||
|
|
||||||
|
_protocols: typing.Sequence[typing.Type[tobiko.Protocol]]
|
||||||
|
_actor_methods: typing.Dict[str, typing.Callable]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def actor_id(self) -> str:
|
||||||
|
return self.fixture_name
|
||||||
|
|
||||||
|
def setup_fixture(self):
|
||||||
|
self.loop = self.get_loop()
|
||||||
|
self.log = self.create_log()
|
||||||
|
self._requests = self.create_request_queue()
|
||||||
|
self._protocols = tobiko.list_protocols(type(self))
|
||||||
|
self.ref = ActorRef(actor_id=self.actor_id,
|
||||||
|
requests=self._requests,
|
||||||
|
protocols=self._protocols)
|
||||||
|
self._run_actor_task = self.loop.create_task(
|
||||||
|
self._run_actor())
|
||||||
|
|
||||||
|
def get_loop(self) -> asyncio.AbstractEventLoop:
|
||||||
|
return asyncio.get_event_loop()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _get_actor_methods(cls) -> typing.Dict[str, typing.Callable]:
|
||||||
|
try:
|
||||||
|
return cls._actor_methods
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
cls._actor_methods = dict(inspect.getmembers(cls, is_actor_method))
|
||||||
|
return cls._actor_methods
|
||||||
|
|
||||||
|
def create_log(self):
|
||||||
|
return log.getLogger(self.actor_id)
|
||||||
|
|
||||||
|
def create_request_queue(self) -> _request.ActorRequestQueue:
|
||||||
|
return _request.create_request_queue(max_size=self.max_queue_size,
|
||||||
|
loop=self.loop)
|
||||||
|
|
||||||
|
async def setup_actor(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def cleanup_actor(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def on_setup_error(self):
|
||||||
|
self.log.exception("Actor setup error")
|
||||||
|
|
||||||
|
async def on_request_error(
|
||||||
|
self, request: typing.Optional[_request.ActorRequest]):
|
||||||
|
self.log.exception(f"Actor request error: {request}")
|
||||||
|
|
||||||
|
async def on_cleanup_error(self):
|
||||||
|
self.log.exception("Actor cleanup error")
|
||||||
|
|
||||||
|
async def _run_actor(self):
|
||||||
|
try:
|
||||||
|
await self.setup_actor()
|
||||||
|
except Exception:
|
||||||
|
await self.on_setup_error()
|
||||||
|
await self._cleanup_actor()
|
||||||
|
else:
|
||||||
|
while True:
|
||||||
|
request = None
|
||||||
|
try:
|
||||||
|
request = await self._requests.receive_request()
|
||||||
|
if not isinstance(request, _request.ActorRequest):
|
||||||
|
raise TypeError(
|
||||||
|
f"Invalid actor request type: {request}")
|
||||||
|
await self._receive_request(request)
|
||||||
|
except Exception:
|
||||||
|
await self.on_request_error(request=request)
|
||||||
|
|
||||||
|
async def _cleanup_actor(self):
|
||||||
|
try:
|
||||||
|
await self.cleanup_actor()
|
||||||
|
except Exception:
|
||||||
|
await self.on_cleanup_error()
|
||||||
|
|
||||||
|
def _get_actor_method(self, name: str) -> typing.Callable:
|
||||||
|
methods = self._get_actor_methods()
|
||||||
|
method = methods.get(name)
|
||||||
|
if method is None:
|
||||||
|
raise ValueError(f"Invalid request method name: {name}")
|
||||||
|
return method
|
||||||
|
|
||||||
|
async def _receive_request(self, request: _request.ActorRequest):
|
||||||
|
method = self._get_actor_method(request.method)
|
||||||
|
try:
|
||||||
|
result = await method(self, **request.arguments)
|
||||||
|
except Exception as ex:
|
||||||
|
request.future.set_exception(ex)
|
||||||
|
else:
|
||||||
|
request.future.set_result(result)
|
||||||
|
|
||||||
|
|
||||||
|
def create_actor(obj: typing.Union[str, Actor, typing.Type[Actor]],
|
||||||
|
fixture_id: typing.Optional[str] = None,
|
||||||
|
manager=None) -> ActorRef:
|
||||||
|
actor = tobiko.setup_fixture(obj,
|
||||||
|
fixture_id=fixture_id,
|
||||||
|
manager=manager)
|
||||||
|
return actor.ref
|
50
tobiko/actor/_request.py
Normal file
50
tobiko/actor/_request.py
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
# Copyright (c) 2021 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import typing
|
||||||
|
|
||||||
|
|
||||||
|
class ActorRequest(typing.NamedTuple):
|
||||||
|
future: asyncio.Future
|
||||||
|
method: str
|
||||||
|
arguments: typing.Dict[str, typing.Any]
|
||||||
|
|
||||||
|
|
||||||
|
class ActorRequestQueue(object):
|
||||||
|
|
||||||
|
def __init__(self, loop: asyncio.AbstractEventLoop, max_size=0):
|
||||||
|
self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_size)
|
||||||
|
self._loop = loop
|
||||||
|
|
||||||
|
def send_request(self,
|
||||||
|
method: str,
|
||||||
|
arguments: typing.Dict[str, typing.Any]):
|
||||||
|
future = self._loop.create_future()
|
||||||
|
request = ActorRequest(future=future,
|
||||||
|
method=method,
|
||||||
|
arguments=arguments)
|
||||||
|
self._queue.put_nowait(request)
|
||||||
|
return future
|
||||||
|
|
||||||
|
async def receive_request(self):
|
||||||
|
return await self._queue.get()
|
||||||
|
|
||||||
|
|
||||||
|
def create_request_queue(loop: asyncio.AbstractEventLoop, max_size=0) \
|
||||||
|
-> ActorRequestQueue:
|
||||||
|
return ActorRequestQueue(loop=loop, max_size=max_size)
|
@ -13,6 +13,8 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import functools
|
||||||
import inspect
|
import inspect
|
||||||
import shutil
|
import shutil
|
||||||
import os
|
import os
|
||||||
@ -65,6 +67,20 @@ class TobikoUnitTest(_patch.PatchMixin, testtools.TestCase):
|
|||||||
'no_proxy': '127.0.0.1'
|
'no_proxy': '127.0.0.1'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def _get_test_method(self):
|
||||||
|
method = super(TobikoUnitTest, self)._get_test_method()
|
||||||
|
if inspect.iscoroutinefunction(method):
|
||||||
|
|
||||||
|
@functools.wraps(method)
|
||||||
|
def wrapped_test(*args, **kwargs):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
task = loop.create_task(method(*args, **kwargs))
|
||||||
|
loop.run_until_complete(task)
|
||||||
|
|
||||||
|
return wrapped_test
|
||||||
|
else:
|
||||||
|
return method
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TobikoUnitTest, self).setUp()
|
super(TobikoUnitTest, self).setUp()
|
||||||
# Protect from mis-configuring logging
|
# Protect from mis-configuring logging
|
||||||
|
85
tobiko/tests/unit/actor/test_actor.py
Normal file
85
tobiko/tests/unit/actor/test_actor.py
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
# Copyright (c) 2021 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
import tobiko
|
||||||
|
from tobiko.tests import unit
|
||||||
|
from tobiko import actor
|
||||||
|
|
||||||
|
|
||||||
|
class Greeter(tobiko.Protocol):
|
||||||
|
|
||||||
|
async def greet(self, whom: str, greeted: 'Greeted'):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class Greeted(tobiko.Protocol):
|
||||||
|
|
||||||
|
def greeted(self, whom: str, greeter: Greeter):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class GreeterActor(Greeter, actor.Actor):
|
||||||
|
|
||||||
|
setup_called = False
|
||||||
|
cleanup_called = False
|
||||||
|
|
||||||
|
async def setup_actor(self):
|
||||||
|
self.setup_called = True
|
||||||
|
|
||||||
|
async def cleanup_actor(self):
|
||||||
|
self.cleanup_called = True
|
||||||
|
|
||||||
|
@actor.actor_method
|
||||||
|
async def greet(self, whom: str, greeted: Greeted):
|
||||||
|
assert self.setup_called
|
||||||
|
assert not self.cleanup_called
|
||||||
|
if not whom:
|
||||||
|
raise ValueError("'whom' parameter can't be empty")
|
||||||
|
|
||||||
|
self.log.info(f"Hello {whom}!")
|
||||||
|
greeted.greeted(whom=whom, greeter=self.ref.get_interface(Greeter))
|
||||||
|
|
||||||
|
|
||||||
|
class ActorTest(unit.TobikoUnitTest):
|
||||||
|
|
||||||
|
async def test_async_request(self):
|
||||||
|
actor_ref = actor.create_actor(GreeterActor)
|
||||||
|
self.assertIsInstance(actor_ref, actor.ActorRef)
|
||||||
|
greeter = actor_ref.get_interface(Greeter)
|
||||||
|
self.assertIsInstance(greeter, Greeter)
|
||||||
|
greeted = mock.MagicMock(spec=Greeted)
|
||||||
|
|
||||||
|
await greeter.greet(whom=self.id(), greeted=greeted)
|
||||||
|
greeted.greeted.assert_called_with(whom=self.id(),
|
||||||
|
greeter=greeter)
|
||||||
|
|
||||||
|
async def test_async_request_failure(self):
|
||||||
|
actor_ref = actor.create_actor(GreeterActor)
|
||||||
|
self.assertIsInstance(actor_ref, actor.ActorRef)
|
||||||
|
greeter = actor_ref.get_interface(Greeter)
|
||||||
|
self.assertIsInstance(greeter, Greeter)
|
||||||
|
greeted = mock.MagicMock(spec=Greeted)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await greeter.greet(whom="", greeted=greeted)
|
||||||
|
except ValueError as ex:
|
||||||
|
self.assertEqual("'whom' parameter can't be empty", str(ex))
|
||||||
|
else:
|
||||||
|
self.fail("Exception not raised")
|
||||||
|
greeted.greeted.assert_not_called()
|
Loading…
Reference in New Issue
Block a user