diff --git a/barbican/openstack/common/_i18n.py b/barbican/openstack/common/_i18n.py
index fb49186ac..7ec759750 100644
--- a/barbican/openstack/common/_i18n.py
+++ b/barbican/openstack/common/_i18n.py
@@ -17,14 +17,14 @@ See http://docs.openstack.org/developer/oslo.i18n/usage.html
 """
 
 try:
-    import oslo.i18n
+    import oslo_i18n
 
     # NOTE(dhellmann): This reference to o-s-l-o will be replaced by the
     # application name when this module is synced into the separate
     # repository. It is OK to have more than one translation function
     # using the same domain, since there will still only be one message
     # catalog.
-    _translators = oslo.i18n.TranslatorFactory(domain='barbican')
+    _translators = oslo_i18n.TranslatorFactory(domain='barbican')
 
     # The primary translation function using the well-known name "_"
     _ = _translators.primary
diff --git a/barbican/openstack/common/periodic_task.py b/barbican/openstack/common/periodic_task.py
new file mode 100644
index 000000000..526875847
--- /dev/null
+++ b/barbican/openstack/common/periodic_task.py
@@ -0,0 +1,232 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import logging
+import random
+import time
+
+from oslo_config import cfg
+import six
+
+from barbican.openstack.common._i18n import _, _LE, _LI
+
+
+periodic_opts = [
+    cfg.BoolOpt('run_external_periodic_tasks',
+                default=True,
+                help='Some periodic tasks can be run in a separate process. '
+                     'Should we run them here?'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(periodic_opts)
+
+LOG = logging.getLogger(__name__)
+
+DEFAULT_INTERVAL = 60.0
+
+
+def list_opts():
+    """Entry point for oslo-config-generator."""
+    return [(None, copy.deepcopy(periodic_opts))]
+
+
+class InvalidPeriodicTaskArg(Exception):
+    message = _("Unexpected argument for periodic task creation: %(arg)s.")
+
+
+def periodic_task(*args, **kwargs):
+    """Decorator to indicate that a method is a periodic task.
+
+    This decorator can be used in two ways:
+
+        1. Without arguments ('@periodic_task'), the task will be run on the
+           default interval of 60 seconds.
+
+        2. With arguments:
+           @periodic_task(spacing=N [, run_immediately=[True|False]]
+           [, name=[None|"string"]])
+           the task will be run approximately every N seconds. If this number
+           is negative, the periodic task will be disabled. If the
+           run_immediately argument is provided and has a value of 'True',
+           the first run of the task will be shortly after the task scheduler
+           starts. If run_immediately is omitted or set to 'False', the first
+           run will be approximately N seconds after the task scheduler
+           starts. If name is not provided, the __name__ of the function is
+           used.
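+
+    A minimal usage sketch (the Manager class and method names here are
+    hypothetical, shown only to illustrate the two decorator forms):
+
+        class Manager(PeriodicTasks):
+
+            @periodic_task
+            def audit(self, context):
+                pass  # runs roughly every DEFAULT_INTERVAL (60) seconds
+
+            @periodic_task(spacing=10, run_immediately=True)
+            def poll(self, context):
+                pass  # first run shortly after startup, then ~10s apart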
+ """ + def decorator(f): + # Test for old style invocation + if 'ticks_between_runs' in kwargs: + raise InvalidPeriodicTaskArg(arg='ticks_between_runs') + + # Control if run at all + f._periodic_task = True + f._periodic_external_ok = kwargs.pop('external_process_ok', False) + if f._periodic_external_ok and not CONF.run_external_periodic_tasks: + f._periodic_enabled = False + else: + f._periodic_enabled = kwargs.pop('enabled', True) + f._periodic_name = kwargs.pop('name', f.__name__) + + # Control frequency + f._periodic_spacing = kwargs.pop('spacing', 0) + f._periodic_immediate = kwargs.pop('run_immediately', False) + if f._periodic_immediate: + f._periodic_last_run = None + else: + f._periodic_last_run = time.time() + return f + + # NOTE(sirp): The `if` is necessary to allow the decorator to be used with + # and without parenthesis. + # + # In the 'with-parenthesis' case (with kwargs present), this function needs + # to return a decorator function since the interpreter will invoke it like: + # + # periodic_task(*args, **kwargs)(f) + # + # In the 'without-parenthesis' case, the original function will be passed + # in as the first argument, like: + # + # periodic_task(f) + if kwargs: + return decorator + else: + return decorator(args[0]) + + +class _PeriodicTasksMeta(type): + def _add_periodic_task(cls, task): + """Add a periodic task to the list of periodic tasks. + + The task should already be decorated by @periodic_task. + + :return: whether task was actually enabled + """ + name = task._periodic_name + + if task._periodic_spacing < 0: + LOG.info(_LI('Skipping periodic task %(task)s because ' + 'its interval is negative'), + {'task': name}) + return False + if not task._periodic_enabled: + LOG.info(_LI('Skipping periodic task %(task)s because ' + 'it is disabled'), + {'task': name}) + return False + + # A periodic spacing of zero indicates that this task should + # be run on the default interval to avoid running too + # frequently. + if task._periodic_spacing == 0: + task._periodic_spacing = DEFAULT_INTERVAL + + cls._periodic_tasks.append((name, task)) + cls._periodic_spacing[name] = task._periodic_spacing + return True + + def __init__(cls, names, bases, dict_): + """Metaclass that allows us to collect decorated periodic tasks.""" + super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_) + + # NOTE(sirp): if the attribute is not present then we must be the base + # class, so, go ahead an initialize it. If the attribute is present, + # then we're a subclass so make a copy of it so we don't step on our + # parent's toes. + try: + cls._periodic_tasks = cls._periodic_tasks[:] + except AttributeError: + cls._periodic_tasks = [] + + try: + cls._periodic_spacing = cls._periodic_spacing.copy() + except AttributeError: + cls._periodic_spacing = {} + + for value in cls.__dict__.values(): + if getattr(value, '_periodic_task', False): + cls._add_periodic_task(value) + + +def _nearest_boundary(last_run, spacing): + """Find nearest boundary which is in the past, which is a multiple of the + spacing with the last run as an offset. + + Eg if last run was 10 and spacing was 7, the new last run could be: 17, 24, + 31, 38... + + 0% to 5% of the spacing value will be added to this value to ensure tasks + do not synchronize. This jitter is rounded to the nearest second, this + means that spacings smaller than 20 seconds will not have jitter. 
+ """ + current_time = time.time() + if last_run is None: + return current_time + delta = current_time - last_run + offset = delta % spacing + # Add up to 5% jitter + jitter = int(spacing * (random.random() / 20)) + return current_time - offset + jitter + + +@six.add_metaclass(_PeriodicTasksMeta) +class PeriodicTasks(object): + def __init__(self): + super(PeriodicTasks, self).__init__() + self._periodic_last_run = {} + for name, task in self._periodic_tasks: + self._periodic_last_run[name] = task._periodic_last_run + + def add_periodic_task(self, task): + """Add a periodic task to the list of periodic tasks. + + The task should already be decorated by @periodic_task. + """ + if self.__class__._add_periodic_task(task): + self._periodic_last_run[task._periodic_name] = ( + task._periodic_last_run) + + def run_periodic_tasks(self, context, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + idle_for = DEFAULT_INTERVAL + for task_name, task in self._periodic_tasks: + full_task_name = '.'.join([self.__class__.__name__, task_name]) + + spacing = self._periodic_spacing[task_name] + last_run = self._periodic_last_run[task_name] + + # Check if due, if not skip + idle_for = min(idle_for, spacing) + if last_run is not None: + delta = last_run + spacing - time.time() + if delta > 0: + idle_for = min(idle_for, delta) + continue + + LOG.debug("Running periodic task %(full_task_name)s", + {"full_task_name": full_task_name}) + self._periodic_last_run[task_name] = _nearest_boundary( + last_run, spacing) + + try: + task(self, context) + except Exception as e: + if raise_on_error: + raise + LOG.exception(_LE("Error during %(full_task_name)s: %(e)s"), + {"full_task_name": full_task_name, "e": e}) + time.sleep(0) + + return idle_for diff --git a/barbican/queue/retry_scheduler.py b/barbican/queue/retry_scheduler.py new file mode 100644 index 000000000..7d1b5f021 --- /dev/null +++ b/barbican/queue/retry_scheduler.py @@ -0,0 +1,89 @@ +# Copyright (c) 2015 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Retry/scheduler classes and logic. +""" +from oslo_config import cfg + +from barbican.common import utils +from barbican import i18n as u +from barbican.model import repositories +from barbican.openstack.common import periodic_task +from barbican.openstack.common import service +from barbican.queue import client as async_client + +LOG = utils.getLogger(__name__) + +retry_opt_group = cfg.OptGroup(name='retry_scheduler', + title='Retry/Scheduler Options') + +retry_opts = [ + cfg.FloatOpt( + 'task_retry_tg_initial_delay', default=10.0, + help=u._('Seconds (float) to wait before starting retry scheduler')), + cfg.FloatOpt( + 'task_retry_tg_periodic_interval_max', default=10.0, + help=u._('Seconds (float) to wait between periodic schedule events')), +] + +CONF = cfg.CONF +CONF.register_group(retry_opt_group) +CONF.register_opts(retry_opts, group=retry_opt_group) + + +class PeriodicServer(service.Service): + """Server to process retry and scheduled tasks. 
+
+    This server is an Oslo periodic-task service (see
+    http://docs.openstack.org/developer/oslo-incubator/api/openstack.common
+    .periodic_task.html). On a periodic basis, this server checks for tasks
+    that need to be retried, and then sends them up to the RPC queue for
+    later processing by a worker node.
+    """
+    def __init__(self, queue_resource=None):
+        super(PeriodicServer, self).__init__()
+
+        # Set up the database engine now, to avoid lazy initialization.
+        repositories.setup_database_engine_and_factory()
+
+        # Connect to the worker queue, to send retry RPC tasks to it later.
+        self.queue = queue_resource or async_client.TaskClient()
+
+        # Start up the periodic task retry scheduler.
+        periodic_interval = (
+            CONF.retry_scheduler.task_retry_tg_periodic_interval_max
+        )
+        self.tg.add_dynamic_timer(
+            self._check_retry_tasks,
+            initial_delay=CONF.retry_scheduler.task_retry_tg_initial_delay,
+            periodic_interval_max=periodic_interval)
+
+    def start(self):
+        LOG.info("Starting the PeriodicServer")
+        super(PeriodicServer, self).start()
+
+    def stop(self, graceful=True):
+        LOG.info("Halting the PeriodicServer")
+        super(PeriodicServer, self).stop(graceful=graceful)
+
+    @periodic_task.periodic_task
+    def _check_retry_tasks(self):
+        """Periodically check to see if tasks need to be scheduled."""
+        LOG.debug("Processing scheduled retry tasks")
+
+        # Return the next delay before this method is invoked again.
+        # TODO(john-wood-w) A future CR will fill in the blanks here.
+        return 60.0
diff --git a/barbican/tests/tasks/test_retry_scheduler.py b/barbican/tests/tasks/test_retry_scheduler.py
new file mode 100644
index 000000000..2dc4fb18c
--- /dev/null
+++ b/barbican/tests/tasks/test_retry_scheduler.py
@@ -0,0 +1,148 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+import eventlet
+import mock
+import oslotest.base as oslotest
+
+from barbican.queue import retry_scheduler
+
+# Oslo messaging RPC server uses eventlet.
+eventlet.monkey_patch()
+
+
+INITIAL_DELAY_SECONDS = 5.0
+NEXT_RETRY_SECONDS = 5.0
+
+
+class WhenRunningPeriodicServerRetryLogic(oslotest.BaseTestCase):
+    """Tests the retry logic invoked by the periodic task retry server.
+
+    These tests are only concerned with the logic of the invoked periodic
+    task method. Testing whether the periodic tasks are actually invoked
+    per the configured schedule is deferred to the tests in
+    :class:`WhenRunningPeriodicServer`.
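+
+    In outline, the behaviour exercised here is simply the following (a
+    sketch mirroring the test below, not additional test code):
+
+        # with the database setup patched out, as in setUp()
+        server = retry_scheduler.PeriodicServer(queue_resource=None)
+        next_interval = server._check_retry_tasks()  # currently 60.0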
+ """ + + def setUp(self): + super(WhenRunningPeriodicServerRetryLogic, self).setUp() + + retry_scheduler.CONF.set_override( + "task_retry_tg_initial_delay", + 2 * INITIAL_DELAY_SECONDS, + group='retry_scheduler') + + self.database_patcher = _DatabasePatcherHelper() + self.database_patcher.start() + + self.periodic_server = retry_scheduler.PeriodicServer( + queue_resource=None) + + def tearDown(self): + super(WhenRunningPeriodicServerRetryLogic, self).tearDown() + self.periodic_server.stop() + self.database_patcher.stop() + + def test_should_perform_retry_processing(self): + next_interval = self.periodic_server._check_retry_tasks() + + # TODO(john-wood-w) Will be updated by future CR with actual retry + # logic unit tests. + self.assertEqual(60, next_interval) + + +class WhenRunningPeriodicServer(oslotest.BaseTestCase): + """Tests the timing-related functionality of the periodic task retry server. + + These tests are only concerned with whether or not periodic tasks are + actually invoked per configured schedule configuration. The logic of the + invoked periodic task method itself is deferred to the tests in + :class:`WhenRunningPeriodicServerRetryLogic`. + """ + + def setUp(self): + super(WhenRunningPeriodicServer, self).setUp() + + retry_scheduler.CONF.set_override( + "task_retry_tg_initial_delay", + INITIAL_DELAY_SECONDS, + group='retry_scheduler') + + self.database_patcher = _DatabasePatcherHelper() + self.database_patcher.start() + + self.periodic_server = _PeriodicServerStub(queue_resource=None) + self.periodic_server.start() + + def tearDown(self): + super(WhenRunningPeriodicServer, self).tearDown() + self.periodic_server.stop() + self.database_patcher.stop() + + def test_should_have_invoked_periodic_task_after_initial_delay(self): + # Wait a bit longer than the initial delay. + time.sleep(3 * INITIAL_DELAY_SECONDS / 2) + + self.assertEqual(1, self.periodic_server.invoke_count) + + def test_should_have_invoked_periodic_task_twice(self): + # Wait a bit longer than the initial delay plus retry interval. + time.sleep(INITIAL_DELAY_SECONDS + 2 * NEXT_RETRY_SECONDS) + + self.assertEqual(2, self.periodic_server.invoke_count) + + def test_should_have_not_invoked_periodic_task_yet(self): + # Wait a short time, before the initial delay expires. + time.sleep(1) + + self.assertEqual(0, self.periodic_server.invoke_count) + + +class _PeriodicServerStub(retry_scheduler.PeriodicServer): + """Periodic server testing stub class. + + This class overrides the periodic retry task so that we can track how + many times it has been invoked by the Oslo periodic task process. 
+ """ + def __init__(self, queue_resource=None): + super(_PeriodicServerStub, self).__init__() + + self.invoke_count = 0 + + def _check_retry_tasks(self): + """Override the periodic method, indicating we have called it.""" + self.invoke_count += 1 + + return NEXT_RETRY_SECONDS + + +class _DatabasePatcherHelper(object): + """This test suite does not test database interactions, so just stub it.""" + def __init__(self): + super(_DatabasePatcherHelper, self).__init__() + + database_config = { + 'return_value': None + } + self.database_patcher = mock.patch( + 'barbican.model.repositories.setup_database_engine_and_factory', + **database_config + ) + + def start(self): + self.database_patcher.start() + + def stop(self): + self.database_patcher.stop() diff --git a/bin/barbican-worker-retry-scheduler.py b/bin/barbican-worker-retry-scheduler.py new file mode 100755 index 000000000..caebd0d87 --- /dev/null +++ b/bin/barbican-worker-retry-scheduler.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python + +# Copyright (c) 2015 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Barbican worker server, running a periodic retry/scheduler process. +""" + +import eventlet +import os +import sys + +# Oslo messaging RPC server uses eventlet. +eventlet.monkey_patch() + +# 'Borrowed' from the Glance project: +# If ../barbican/__init__.py exists, add ../ to Python search path, so that +# it will override what happens to be installed in /usr/(local/)lib/python... +possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), + os.pardir, + os.pardir)) +if os.path.exists(os.path.join(possible_topdir, 'barbican', '__init__.py')): + sys.path.insert(0, possible_topdir) + + +from barbican.common import config +from barbican.openstack.common import service +from barbican import queue +from barbican.queue import retry_scheduler + +from oslo_config import cfg +from oslo_log import log + + +def fail(returncode, e): + sys.stderr.write("ERROR: {0}\n".format(e)) + sys.exit(returncode) + + +if __name__ == '__main__': + try: + config.parse_args() + CONF = cfg.CONF + + # Import and configure logging. + log.setup(CONF, 'barbican-retry-scheduler') + LOG = log.getLogger(__name__) + LOG.debug("Booting up Barbican worker retry/scheduler node...") + + # Queuing initialization (as a client only). 
+        queue.init(CONF, is_server_side=False)
+
+        service.launch(
+            retry_scheduler.PeriodicServer()
+        ).wait()
+    except RuntimeError as e:
+        fail(1, e)
diff --git a/etc/barbican/barbican-api.conf b/etc/barbican/barbican-api.conf
index 1187208aa..e6aecce36 100644
--- a/etc/barbican/barbican-api.conf
+++ b/etc/barbican/barbican-api.conf
@@ -151,6 +151,17 @@ version = '1.1'
 # Server name for RPC service
 server_name = 'barbican.queue'
 
+
+# ================= Retry/Scheduler Options ==========================
+
+[retry_scheduler]
+# Seconds (float) to wait before starting retry scheduler
+task_retry_tg_initial_delay = 10.0
+
+# Seconds (float) to wait between periodic schedule events
+task_retry_tg_periodic_interval_max = 10.0
+
+
 # ================= Keystone Notification Options - Application ===============
 
 [keystone_notifications]
diff --git a/openstack-common.conf b/openstack-common.conf
index 6ea9003d6..8008dddce 100644
--- a/openstack-common.conf
+++ b/openstack-common.conf
@@ -1,7 +1,7 @@
 [DEFAULT]
 
 # The list of modules to copy from openstack-common
-modules=local,policy,eventlet_backdoor,service
+modules=local,policy,eventlet_backdoor,service,periodic_task
 
 # The base module to hold the copy of openstack.common
 base=barbican