Add retry periodic task and worker-client logic

Added a new retry scheduler Oslo Server that executes the Oslo
periodic task process. This process invokes a retry method on a
configured periodic schedule. This process can in turn enqueue RPC
tasks via the same client interface that the API nodes use. Follow-on
CRs will utilize this periodic process to enqueue RPC tasks.

Implements: blueprint add-worker-retry-update-support
Change-Id: I8ba6dc57b015be16c78eef3ba949f336c60817f3
This commit is contained in:
jfwood 2015-03-17 00:36:08 -05:00
parent 87bb9141a2
commit b2fbda189f
7 changed files with 553 additions and 3 deletions

View File

@ -17,14 +17,14 @@ See http://docs.openstack.org/developer/oslo.i18n/usage.html
"""
try:
import oslo.i18n
import oslo_i18n
# NOTE(dhellmann): This reference to o-s-l-o will be replaced by the
# application name when this module is synced into the separate
# repository. It is OK to have more than one translation function
# using the same domain, since there will still only be one message
# catalog.
_translators = oslo.i18n.TranslatorFactory(domain='barbican')
_translators = oslo_i18n.TranslatorFactory(domain='barbican')
# The primary translation function using the well-known name "_"
_ = _translators.primary

View File

@ -0,0 +1,232 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import logging
import random
import time
from oslo_config import cfg
import six
from barbican.openstack.common._i18n import _, _LE, _LI
periodic_opts = [
cfg.BoolOpt('run_external_periodic_tasks',
default=True,
help='Some periodic tasks can be run in a separate process. '
'Should we run them here?'),
]
CONF = cfg.CONF
CONF.register_opts(periodic_opts)
LOG = logging.getLogger(__name__)
DEFAULT_INTERVAL = 60.0
def list_opts():
"""Entry point for oslo-config-generator."""
return [(None, copy.deepcopy(periodic_opts))]
class InvalidPeriodicTaskArg(Exception):
message = _("Unexpected argument for periodic task creation: %(arg)s.")
def periodic_task(*args, **kwargs):
"""Decorator to indicate that a method is a periodic task.
This decorator can be used in two ways:
1. Without arguments '@periodic_task', this will be run on the default
interval of 60 seconds.
2. With arguments:
@periodic_task(spacing=N [, run_immediately=[True|False]]
[, name=[None|"string"])
this will be run on approximately every N seconds. If this number is
negative the periodic task will be disabled. If the run_immediately
argument is provided and has a value of 'True', the first run of the
task will be shortly after task scheduler starts. If
run_immediately is omitted or set to 'False', the first time the
task runs will be approximately N seconds after the task scheduler
starts. If name is not provided, __name__ of function is used.
"""
def decorator(f):
# Test for old style invocation
if 'ticks_between_runs' in kwargs:
raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
# Control if run at all
f._periodic_task = True
f._periodic_external_ok = kwargs.pop('external_process_ok', False)
if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
f._periodic_enabled = False
else:
f._periodic_enabled = kwargs.pop('enabled', True)
f._periodic_name = kwargs.pop('name', f.__name__)
# Control frequency
f._periodic_spacing = kwargs.pop('spacing', 0)
f._periodic_immediate = kwargs.pop('run_immediately', False)
if f._periodic_immediate:
f._periodic_last_run = None
else:
f._periodic_last_run = time.time()
return f
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
# and without parenthesis.
#
# In the 'with-parenthesis' case (with kwargs present), this function needs
# to return a decorator function since the interpreter will invoke it like:
#
# periodic_task(*args, **kwargs)(f)
#
# In the 'without-parenthesis' case, the original function will be passed
# in as the first argument, like:
#
# periodic_task(f)
if kwargs:
return decorator
else:
return decorator(args[0])
class _PeriodicTasksMeta(type):
def _add_periodic_task(cls, task):
"""Add a periodic task to the list of periodic tasks.
The task should already be decorated by @periodic_task.
:return: whether task was actually enabled
"""
name = task._periodic_name
if task._periodic_spacing < 0:
LOG.info(_LI('Skipping periodic task %(task)s because '
'its interval is negative'),
{'task': name})
return False
if not task._periodic_enabled:
LOG.info(_LI('Skipping periodic task %(task)s because '
'it is disabled'),
{'task': name})
return False
# A periodic spacing of zero indicates that this task should
# be run on the default interval to avoid running too
# frequently.
if task._periodic_spacing == 0:
task._periodic_spacing = DEFAULT_INTERVAL
cls._periodic_tasks.append((name, task))
cls._periodic_spacing[name] = task._periodic_spacing
return True
def __init__(cls, names, bases, dict_):
"""Metaclass that allows us to collect decorated periodic tasks."""
super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
# NOTE(sirp): if the attribute is not present then we must be the base
# class, so, go ahead an initialize it. If the attribute is present,
# then we're a subclass so make a copy of it so we don't step on our
# parent's toes.
try:
cls._periodic_tasks = cls._periodic_tasks[:]
except AttributeError:
cls._periodic_tasks = []
try:
cls._periodic_spacing = cls._periodic_spacing.copy()
except AttributeError:
cls._periodic_spacing = {}
for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):
cls._add_periodic_task(value)
def _nearest_boundary(last_run, spacing):
"""Find nearest boundary which is in the past, which is a multiple of the
spacing with the last run as an offset.
Eg if last run was 10 and spacing was 7, the new last run could be: 17, 24,
31, 38...
0% to 5% of the spacing value will be added to this value to ensure tasks
do not synchronize. This jitter is rounded to the nearest second, this
means that spacings smaller than 20 seconds will not have jitter.
"""
current_time = time.time()
if last_run is None:
return current_time
delta = current_time - last_run
offset = delta % spacing
# Add up to 5% jitter
jitter = int(spacing * (random.random() / 20))
return current_time - offset + jitter
@six.add_metaclass(_PeriodicTasksMeta)
class PeriodicTasks(object):
def __init__(self):
super(PeriodicTasks, self).__init__()
self._periodic_last_run = {}
for name, task in self._periodic_tasks:
self._periodic_last_run[name] = task._periodic_last_run
def add_periodic_task(self, task):
"""Add a periodic task to the list of periodic tasks.
The task should already be decorated by @periodic_task.
"""
if self.__class__._add_periodic_task(task):
self._periodic_last_run[task._periodic_name] = (
task._periodic_last_run)
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
idle_for = DEFAULT_INTERVAL
for task_name, task in self._periodic_tasks:
full_task_name = '.'.join([self.__class__.__name__, task_name])
spacing = self._periodic_spacing[task_name]
last_run = self._periodic_last_run[task_name]
# Check if due, if not skip
idle_for = min(idle_for, spacing)
if last_run is not None:
delta = last_run + spacing - time.time()
if delta > 0:
idle_for = min(idle_for, delta)
continue
LOG.debug("Running periodic task %(full_task_name)s",
{"full_task_name": full_task_name})
self._periodic_last_run[task_name] = _nearest_boundary(
last_run, spacing)
try:
task(self, context)
except Exception as e:
if raise_on_error:
raise
LOG.exception(_LE("Error during %(full_task_name)s: %(e)s"),
{"full_task_name": full_task_name, "e": e})
time.sleep(0)
return idle_for

View File

@ -0,0 +1,89 @@
# Copyright (c) 2015 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Retry/scheduler classes and logic.
"""
from oslo_config import cfg
from barbican.common import utils
from barbican import i18n as u
from barbican.model import repositories
from barbican.openstack.common import periodic_task
from barbican.openstack.common import service
from barbican.queue import client as async_client
LOG = utils.getLogger(__name__)
retry_opt_group = cfg.OptGroup(name='retry_scheduler',
title='Retry/Scheduler Options')
retry_opts = [
cfg.FloatOpt(
'task_retry_tg_initial_delay', default=10.0,
help=u._('Seconds (float) to wait before starting retry scheduler')),
cfg.FloatOpt(
'task_retry_tg_periodic_interval_max', default=10.0,
help=u._('Seconds (float) to wait between periodic schedule events')),
]
CONF = cfg.CONF
CONF.register_group(retry_opt_group)
CONF.register_opts(retry_opts, group=retry_opt_group)
class PeriodicServer(service.Service):
"""Server to process retry and scheduled tasks.
This server is an Oslo periodic-task service (see
http://docs.openstack.org/developer/oslo-incubator/api/openstack.common
.periodic_task.html). On a periodic basis, this server checks for tasks
that need to be retried, and then sends them up to the RPC queue for later
processing by a worker node.
"""
def __init__(self, queue_resource=None):
super(PeriodicServer, self).__init__()
# Setting up db engine to avoid lazy initialization
repositories.setup_database_engine_and_factory()
# Connect to the worker queue, to send retry RPC tasks to it later.
self.queue = queue_resource or async_client.TaskClient()
# Start the task retry periodic scheduler process up.
periodic_interval = (
CONF.retry_scheduler.task_retry_tg_periodic_interval_max
)
self.tg.add_dynamic_timer(
self._check_retry_tasks,
initial_delay=CONF.retry_scheduler.task_retry_tg_initial_delay,
periodic_interval_max=periodic_interval)
def start(self):
LOG.info("Starting the PeriodicServer")
super(PeriodicServer, self).start()
def stop(self, graceful=True):
LOG.info("Halting the PeriodicServer")
super(PeriodicServer, self).stop(graceful=graceful)
@periodic_task.periodic_task
def _check_retry_tasks(self):
"""Periodically check to see if tasks need to be scheduled."""
LOG.debug("Processing scheduled retry tasks")
# Return the next delay before this method is invoked again
# TODO(john-wood-w) A future CR will fill in the blanks here.
return 60.0

View File

@ -0,0 +1,148 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import eventlet
import mock
import oslotest.base as oslotest
from barbican.queue import retry_scheduler
# Oslo messaging RPC server uses eventlet.
eventlet.monkey_patch()
INITIAL_DELAY_SECONDS = 5.0
NEXT_RETRY_SECONDS = 5.0
class WhenRunningPeriodicServerRetryLogic(oslotest.BaseTestCase):
"""Tests the retry logic invoked by the periodic task retry server.
These tests are only concerned with the logic of the invoked periodic
task method. Testing of whether or not the periodic tasks are
actually invoked per configured schedule configuration is deferred to the
tests in :class:`WhenRunningPeriodicServer`.
"""
def setUp(self):
super(WhenRunningPeriodicServerRetryLogic, self).setUp()
retry_scheduler.CONF.set_override(
"task_retry_tg_initial_delay",
2 * INITIAL_DELAY_SECONDS,
group='retry_scheduler')
self.database_patcher = _DatabasePatcherHelper()
self.database_patcher.start()
self.periodic_server = retry_scheduler.PeriodicServer(
queue_resource=None)
def tearDown(self):
super(WhenRunningPeriodicServerRetryLogic, self).tearDown()
self.periodic_server.stop()
self.database_patcher.stop()
def test_should_perform_retry_processing(self):
next_interval = self.periodic_server._check_retry_tasks()
# TODO(john-wood-w) Will be updated by future CR with actual retry
# logic unit tests.
self.assertEqual(60, next_interval)
class WhenRunningPeriodicServer(oslotest.BaseTestCase):
"""Tests the timing-related functionality of the periodic task retry server.
These tests are only concerned with whether or not periodic tasks are
actually invoked per configured schedule configuration. The logic of the
invoked periodic task method itself is deferred to the tests in
:class:`WhenRunningPeriodicServerRetryLogic`.
"""
def setUp(self):
super(WhenRunningPeriodicServer, self).setUp()
retry_scheduler.CONF.set_override(
"task_retry_tg_initial_delay",
INITIAL_DELAY_SECONDS,
group='retry_scheduler')
self.database_patcher = _DatabasePatcherHelper()
self.database_patcher.start()
self.periodic_server = _PeriodicServerStub(queue_resource=None)
self.periodic_server.start()
def tearDown(self):
super(WhenRunningPeriodicServer, self).tearDown()
self.periodic_server.stop()
self.database_patcher.stop()
def test_should_have_invoked_periodic_task_after_initial_delay(self):
# Wait a bit longer than the initial delay.
time.sleep(3 * INITIAL_DELAY_SECONDS / 2)
self.assertEqual(1, self.periodic_server.invoke_count)
def test_should_have_invoked_periodic_task_twice(self):
# Wait a bit longer than the initial delay plus retry interval.
time.sleep(INITIAL_DELAY_SECONDS + 2 * NEXT_RETRY_SECONDS)
self.assertEqual(2, self.periodic_server.invoke_count)
def test_should_have_not_invoked_periodic_task_yet(self):
# Wait a short time, before the initial delay expires.
time.sleep(1)
self.assertEqual(0, self.periodic_server.invoke_count)
class _PeriodicServerStub(retry_scheduler.PeriodicServer):
"""Periodic server testing stub class.
This class overrides the periodic retry task so that we can track how
many times it has been invoked by the Oslo periodic task process.
"""
def __init__(self, queue_resource=None):
super(_PeriodicServerStub, self).__init__()
self.invoke_count = 0
def _check_retry_tasks(self):
"""Override the periodic method, indicating we have called it."""
self.invoke_count += 1
return NEXT_RETRY_SECONDS
class _DatabasePatcherHelper(object):
"""This test suite does not test database interactions, so just stub it."""
def __init__(self):
super(_DatabasePatcherHelper, self).__init__()
database_config = {
'return_value': None
}
self.database_patcher = mock.patch(
'barbican.model.repositories.setup_database_engine_and_factory',
**database_config
)
def start(self):
self.database_patcher.start()
def stop(self):
self.database_patcher.stop()

View File

@ -0,0 +1,70 @@
#!/usr/bin/env python
# Copyright (c) 2015 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Barbican worker server, running a periodic retry/scheduler process.
"""
import eventlet
import os
import sys
# Oslo messaging RPC server uses eventlet.
eventlet.monkey_patch()
# 'Borrowed' from the Glance project:
# If ../barbican/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'barbican', '__init__.py')):
sys.path.insert(0, possible_topdir)
from barbican.common import config
from barbican.openstack.common import service
from barbican import queue
from barbican.queue import retry_scheduler
from oslo_config import cfg
from oslo_log import log
def fail(returncode, e):
sys.stderr.write("ERROR: {0}\n".format(e))
sys.exit(returncode)
if __name__ == '__main__':
try:
config.parse_args()
CONF = cfg.CONF
# Import and configure logging.
log.setup(CONF, 'barbican-retry-scheduler')
LOG = log.getLogger(__name__)
LOG.debug("Booting up Barbican worker retry/scheduler node...")
# Queuing initialization (as a client only).
queue.init(CONF, is_server_side=False)
service.launch(
retry_scheduler.PeriodicServer()
).wait()
except RuntimeError as e:
fail(1, e)

View File

@ -151,6 +151,17 @@ version = '1.1'
# Server name for RPC service
server_name = 'barbican.queue'
# ================= Retry/Scheduler Options ==========================
[retry_scheduler]
# Seconds (float) to wait between starting retry scheduler
task_retry_tg_initial_delay = 10.0
# Seconds (float) to wait between starting retry scheduler
task_retry_tg_periodic_interval_max = 10.0
# ================= Keystone Notification Options - Application ===============
[keystone_notifications]

View File

@ -1,7 +1,7 @@
[DEFAULT]
# The list of modules to copy from openstack-common
modules=local,policy,eventlet_backdoor,service
modules=local,policy,eventlet_backdoor,service,periodic_task
# The base module to hold the copy of openstack.common
base=barbican