From 6bf4192b0295baf330415c960e34da13792746f3 Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Thu, 20 Aug 2020 18:47:52 +0200 Subject: [PATCH] Create timer tool based on SIGALARM signal Change-Id: Ic3eecb78531c836a70f679c24490eb15a18e3262 --- tobiko/__init__.py | 5 ++ tobiko/common/_itimer.py | 109 +++++++++++++++++++++++++ tobiko/tests/functional/test_itimer.py | 71 ++++++++++++++++ 3 files changed, 185 insertions(+) create mode 100644 tobiko/common/_itimer.py create mode 100644 tobiko/tests/functional/test_itimer.py diff --git a/tobiko/__init__.py b/tobiko/__init__.py index 50ed87f21..794011832 100644 --- a/tobiko/__init__.py +++ b/tobiko/__init__.py @@ -19,6 +19,7 @@ from tobiko.common import _config from tobiko.common import _detail from tobiko.common import _exception from tobiko.common import _fixture +from tobiko.common import _itimer from tobiko.common import _logging from tobiko.common.managers import loader as loader_manager from tobiko.common import _operation @@ -65,6 +66,10 @@ list_required_fixtures = _fixture.list_required_fixtures SharedFixture = _fixture.SharedFixture FixtureManager = _fixture.FixtureManager +itimer = _itimer.itimer +ITimer = _itimer.ITimer +ITimerExpired = _itimer.ITimerExpired + CaptureLogFixture = _logging.CaptureLogFixture load_object = loader_manager.load_object diff --git a/tobiko/common/_itimer.py b/tobiko/common/_itimer.py new file mode 100644 index 000000000..cc42bd4e9 --- /dev/null +++ b/tobiko/common/_itimer.py @@ -0,0 +1,109 @@ +# Copyright 2020 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from __future__ import absolute_import + +import signal +import traceback +import typing # noqa + +from tobiko.common import _exception +from tobiko.common import _fixture +from tobiko.common import _time + + +ITIMER_SIGNALS = { + signal.SIGALRM: signal.ITIMER_REAL, + signal.SIGVTALRM: signal.ITIMER_VIRTUAL, + signal.SIGPROF: signal.ITIMER_PROF +} + + +class ITimerExpired(_exception.TobikoException): + message = ("ITimer expired (signal_number={signal_number}):\n" + "{stack}") + + +ITimerHandler = typing.Callable[[int, typing.Any], typing.Any] + + +class ITimer(_fixture.SharedFixture): + + signal_number: int = signal.SIGALRM + delay: float = 0. + interval: float = 0. + + original_delay: _time.Seconds = None + original_interval: _time.Seconds = None + original_handler: typing.Union[typing.Callable, int, None] = None + + def __init__(self, + signal_number: typing.Optional[int] = None, + delay: _time.Seconds = None, + interval: _time.Seconds = None, + on_timeout: typing.Optional[ITimerHandler] = None): + super(ITimer, self).__init__() + if signal_number is not None: + self.signal_number = signal_number + if delay is not None: + self.delay = delay + if interval is not None: + self.interval = interval + if on_timeout: + setattr(self, 'on_timeout', on_timeout) + + def setup_fixture(self): + self.setup_handler() + self.setup_timer() + + def setup_handler(self): + self.original_handler = signal.getsignal(self.signal_number) + signal.signal(self.signal_number, self.on_timeout) + self.addCleanup(self.cleanup_handler) + + def cleanup_handler(self): + if self.original_handler is not None: + signal.signal(self.signal_number, self.original_handler) + del self.original_handler + + def setup_timer(self): + self.original_delay, self.original_interval = signal.setitimer( + self.which, self.delay, self.interval) + self.addCleanup(self.cleanup_timer) + + def cleanup_timer(self): + if (self.original_delay is not None and + self.original_interval is not None): + signal.setitimer(self.which, self.original_delay, + self.original_interval) + del self.original_delay + del self.original_interval + + @property + def which(self): + return ITIMER_SIGNALS[self.signal_number] + + def on_timeout(self, signal_number: int, frame: typing.Any): + assert self.signal_number == signal_number + stack = ''.join(traceback.format_stack(frame)) + raise ITimerExpired(signal_number=signal_number, stack=stack) + + +def itimer(signal_number: typing.Optional[int] = None, + delay: _time.Seconds = None, + interval: _time.Seconds = None, + on_timeout: typing.Optional[ITimerHandler] = None): + return ITimer(signal_number=signal_number, + delay=delay, + interval=interval, + on_timeout=on_timeout) diff --git a/tobiko/tests/functional/test_itimer.py b/tobiko/tests/functional/test_itimer.py new file mode 100644 index 000000000..a80f4d757 --- /dev/null +++ b/tobiko/tests/functional/test_itimer.py @@ -0,0 +1,71 @@ +# Copyright (c) 2020 Red Hat, Inc. +# +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from __future__ import absolute_import + +import itertools +import signal +import time +import traceback + +import testtools + +import tobiko + + +class ITimerTest(testtools.TestCase): + + do_wait = True + + def test_itimer(self, signal_number=None, delay=0.01, + exception=tobiko.ITimerExpired, on_timeout=None): + with tobiko.itimer(delay=delay, signal_number=signal_number, + on_timeout=on_timeout): + if exception is None: + self._wait_for_timeout() + else: + ex = self.assertRaises(exception, self._wait_for_timeout) + self.assertEqual(signal_number or signal.SIGALRM, + ex.signal_number) + self.assertIn('_wait_for_timeout', ex.stack) + + def test_itimer_with_sigalrm(self): + self.test_itimer(signal_number=signal.SIGALRM) + + def test_itimer_with_sigvtalrm(self): + self.test_itimer(signal_number=signal.SIGVTALRM) + + def test_itimer_with_sigprof(self): + self.test_itimer(signal_number=signal.SIGPROF) + + def test_itimer_with_on_timeout(self): + + counter = itertools.count() + + def on_timeout(signal_number, frame): + self.do_wait = False + next(counter) + self.assertEqual(signal.SIGALRM, signal_number) + stack = traceback.extract_stack(frame, 1) + self.assertEqual('_wait_for_timeout', stack[0].name) + + self.test_itimer(signal_number=signal.SIGALRM, + on_timeout=on_timeout, + exception=None) + self.assertEqual(1, next(counter)) + + def _wait_for_timeout(self): + while self.do_wait: + time.sleep(0.)