Add retry server and functional tests to DevStack

Add a retry scheduler server process to the DevStack start/stop
processes. This includes adding a PBR entry point and barbican.cmd
script for the retry scheduler process, as other projects such as
Glance and Nova are doing now. Eventually we'll want to move over all
our boot scripts to the entry point approach. Verify functional test
for generating a simple certificate order, which is the first of the
extended-workflow order types that utilize the retry processing logic.
Also add try/except handling around the retry processing because if we don't pass
back a retry interval to the Oslo periodic task framework, it stops
rescheduling tasks! Also added delays to the functional test order
status check as for SQLite I was noticing disk I/O concurrency errors
otherwise. Yes, I'd still like to support SQLite for local functional
testing.

Change-Id: Ib7b50ab7f7354fefebfdf654689427ae7bf59e58
This commit is contained in:
jfwood 2015-04-06 08:08:04 -05:00 committed by Steve Heyman
parent b34bbe7cbd
commit 02dc4cdb71
13 changed files with 184 additions and 31 deletions

View File

@ -27,8 +27,9 @@ from oslo_log import log
class DatabaseManager(object):
"""Builds and executes a CLI parser to manage the Barbican
"""Database Manager class.
Builds and executes a CLI parser to manage the Barbican database
This extends the Alembic commands.
"""

View File

@ -68,6 +68,5 @@ def main():
except RuntimeError as e:
fail(1, e)
if __name__ == '__main__':
main()

View File

@ -87,17 +87,13 @@ class PeriodicServer(service.Service):
:return: Return the number of seconds to wait before invoking this
method again.
"""
LOG.info(u._LI("Processing scheduled retry tasks:"))
# Retrieve tasks to retry.
entities, _, _, total = self.order_retry_repo.get_by_create_date(
only_at_or_before_this_date=datetime.datetime.utcnow(),
suppress_exception=True)
# Create RPC tasks for each retry task found.
if total > 0:
for task in entities:
self._enqueue_task(task)
total_tasks_processed = 0
try:
total_tasks_processed = self._process_retry_tasks()
except Exception:
LOG.exception(
u._LE("Problem seen processing scheduled retry tasks")
)
# Return the next delay before this method is invoked again.
check_again_in_seconds = _compute_next_periodic_interval()
@ -105,16 +101,45 @@ class PeriodicServer(service.Service):
u._LI("Done processing '%(total)s' tasks, will check again in "
"'%(next)s' seconds."),
{
'total': total,
'total': total_tasks_processed,
'next': check_again_in_seconds
}
)
return check_again_in_seconds
def _process_retry_tasks(self):
"""Scan for and then re-queue tasks that are ready to retry."""
LOG.info(u._LI("Processing scheduled retry tasks:"))
# Retrieve tasks to retry.
entities, total = self._retrieve_tasks()
# Create RPC tasks for each retry task found.
for task in entities:
self._enqueue_task(task)
return total
def _retrieve_tasks(self):
"""Retrieve a list of tasks to retry."""
repositories.start()
try:
entities, _, _, total = self.order_retry_repo.get_by_create_date(
only_at_or_before_this_date=datetime.datetime.utcnow(),
suppress_exception=True)
finally:
repositories.clear()
return entities, total
def _enqueue_task(self, task):
"""Re-enqueue the specified task."""
retry_task_name = 'N/A'
retry_args = 'N/A'
retry_kwargs = 'N/A'
# Start a new isolated database transaction just for this task.
repositories.start()
try:
# Invoke queue client to place retried RPC task on queue.
retry_task_name = task.retry_task

View File

@ -52,6 +52,11 @@ MAP_RETRY_TASKS = {
}
def find_function_name(func, if_no_name=None):
"""Returns pretty-formatted function name."""
return getattr(func, '__name__', if_no_name)
def retryable_order(fn):
"""Provides retry/scheduling support to Order-related tasks."""
@ -64,6 +69,10 @@ def retryable_order(fn):
LOG.info(
u._LI("Scheduled RPC method for retry: '%s'"),
retry_rpc_method)
else:
LOG.info(
u._LI("Task '%s' did not have to be retried"),
find_function_name(fn, if_no_name='???'))
return wrapper
@ -73,7 +82,7 @@ def transactional(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
fn_name = getattr(fn, '__name__', '????')
fn_name = find_function_name(fn, if_no_name='???')
if not queue.is_server_side():
# Non-server mode directly invokes tasks.
@ -84,7 +93,9 @@ def transactional(fn):
try:
fn(*args, **kwargs)
repositories.commit()
LOG.info(u._LI("Completed worker task: '%s'"), fn_name)
LOG.info(
u._LI("Completed worker task (post-commit): '%s'"),
fn_name)
except Exception:
"""NOTE: Wrapped functions must process with care!
@ -158,8 +169,7 @@ def schedule_order_retry_tasks(
elif common.RetryTasks.INVOKE_SAME_TASK == retry_result.retry_task:
if invoked_task:
retry_rpc_method = getattr(
invoked_task, '__name__', None)
retry_rpc_method = find_function_name(invoked_task)
else:
retry_rpc_method = MAP_RETRY_TASKS.get(retry_result.retry_task)

View File

@ -56,6 +56,13 @@ class BaseTask(object):
For Liberty, we might want to consider a workflow manager instead of
these process_xxxx() method as shown here:
https://gist.github.com/jfwood/a8130265b0db3c793ec8
:param args: List of arguments passed in from the client.
:param kwargs: Dict of arguments passed in from the client.
:return: Returns :class:`FollowOnProcessingStatusDTO` if follow-on
processing (such as retrying this or another task) is
required, otherwise a None return indicates that no
follow-on processing is required.
"""
try:
return self.process(*args, **kwargs)

View File

View File

@ -0,0 +1,52 @@
# Copyright (c) 2015 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from barbican.cmd import retry_scheduler
from barbican.tests import utils
class WhenInvokingRetryServiceCommand(utils.BaseTestCase):
"""Test the retry scheduler functionality."""
def setUp(self):
super(WhenInvokingRetryServiceCommand, self).setUp()
@mock.patch('barbican.common.config')
@mock.patch('barbican.queue.init')
@mock.patch('oslo_service.service.launch')
@mock.patch('barbican.queue.retry_scheduler.PeriodicServer')
def test_should_launch_service(
self,
mock_periodic_server,
mock_service_launch,
mock_queue_init,
mock_config):
retry_scheduler.main()
self.assertEqual(mock_queue_init.call_count, 1)
self.assertEqual(mock_service_launch.call_count, 1)
self.assertEqual(mock_periodic_server.call_count, 1)
@mock.patch('oslo_log.log.setup')
@mock.patch('sys.exit')
def test_should_fail_run_command(
self, mock_sys_exit, mock_log_setup):
mock_log_setup.side_effect = RuntimeError()
retry_scheduler.main()
self.assertEqual(mock_sys_exit.call_count, 1)

View File

@ -31,6 +31,10 @@ INITIAL_DELAY_SECONDS = 5.0
NEXT_RETRY_SECONDS = 5.0
def is_interval_in_expected_range(interval):
return NEXT_RETRY_SECONDS * .8 <= interval < NEXT_RETRY_SECONDS * 1.2
class WhenRunningPeriodicServerRetryLogic(database_utils.RepositoryTestCase):
"""Tests the retry logic invoked by the periodic task retry server.
@ -65,9 +69,7 @@ class WhenRunningPeriodicServerRetryLogic(database_utils.RepositoryTestCase):
def test_should_perform_retry_processing_no_tasks(self):
interval = self.periodic_server._check_retry_tasks()
self.assertEqual(
True,
NEXT_RETRY_SECONDS * .8 <= interval < NEXT_RETRY_SECONDS * 1.2)
self.assertTrue(is_interval_in_expected_range(interval))
def test_should_perform_retry_processing_one_task(self):
# Add one retry task.
@ -86,9 +88,7 @@ class WhenRunningPeriodicServerRetryLogic(database_utils.RepositoryTestCase):
suppress_exception=True)
self.assertEqual(0, total)
self.assertEqual(
True,
NEXT_RETRY_SECONDS * .8 <= interval < NEXT_RETRY_SECONDS * 1.2)
self.assertTrue(is_interval_in_expected_range(interval))
self.queue_client.test_task.assert_called_once_with(
*args, **kwargs
)
@ -113,6 +113,18 @@ class WhenRunningPeriodicServerRetryLogic(database_utils.RepositoryTestCase):
suppress_exception=True)
self.assertEqual(1, total)
@mock.patch('barbican.model.repositories.get_order_retry_tasks_repository')
def test_should_fail_process_retry(self, mock_get_repo):
mock_get_repo.return_value.get_by_create_date.side_effect = \
Exception()
periodic_server_with_mock_repo = retry_scheduler.PeriodicServer(
queue_resource=self.queue_client)
interval = periodic_server_with_mock_repo._check_retry_tasks()
self.assertTrue(is_interval_in_expected_range(interval))
def _create_retry_task(self):
# Add one retry task:
task = 'test_task'

View File

@ -2,7 +2,7 @@
# Install and start **Barbican** service
# To enable a minimal set of Barbican features, add the following to localrc:
# enable_service barbican
# enable_service barbican-svc barbican-retry
#
# Dependencies:
# - functions
@ -55,6 +55,31 @@ TEMPEST_SERVICES+=,barbican
# Functions
# ---------
# TODO(john-wood-w) These 'magic' functions are called by devstack to enable
# a given service (so the name between 'is_' and '_enabled'). Currently the
# Zuul infra gate configuration (at https://github.com/openstack-infra/project-config/blob/master/jenkins/jobs/barbican.yaml)
# only enables the 'barbican' service. So the two functions below, for the two
# services we wish to run, have to key off of that lone 'barbican' selection.
# Once the Zuul config is updated to add these two services properly, then
# these functions should be replaced by the single method below.
# !!!! Special thanks to rm_work for figuring this out !!!!
function is_barbican-retry_enabled {
[[ ,${ENABLED_SERVICES} =~ ,"barbican" ]] && return 0
}
function is_barbican-svc_enabled {
[[ ,${ENABLED_SERVICES} =~ ,"barbican" ]] && return 0
}
# TODO(john-wood-w) Replace the above two functions with the one below once
# Zuul is updated per the above.
## Test if any Barbican services are enabled
## is_barbican_enabled
#function is_barbican_enabled {
# [[ ,${ENABLED_SERVICES} =~ ,"barbican-" ]] && return 0
# return 1
#}
# cleanup_barbican - Remove residual data files, anything left over from previous
# runs that a clean run would need to clean up
function cleanup_barbican {
@ -175,7 +200,16 @@ function install_barbicanclient {
# start_barbican - Start running processes, including screen
function start_barbican {
screen_it barbican "uwsgi --master --emperor $BARBICAN_CONF_DIR/vassals"
# Start the Barbican service up.
run_process barbican-svc "uwsgi --master --emperor $BARBICAN_CONF_DIR/vassals"
# Pause while the barbican-svc populates the database, otherwise the retry
# service below might try to do this at the same time, leading to race
# conditions.
sleep 10
# Start the retry scheduler server up.
run_process barbican-retry "$BARBICAN_BIN_DIR/barbican-retry --config-file=$BARBICAN_CONF_DIR/barbican-api.conf"
}
# stop_barbican - Stop running processes
@ -187,7 +221,9 @@ function stop_barbican {
# This cleans up the PID file, but uses pkill so Barbican
# uWSGI emperor process doesn't actually stop
screen_stop barbican
stop_process barbican-svc
stop_process barbican-retry
}
function get_id {

View File

@ -1,6 +1,6 @@
[[local|localrc]]
disable_all_services
enable_service rabbit mysql key barbican
enable_service rabbit mysql key barbican-svc barbican-retry
# This is to keep the token small for testing
KEYSTONE_TOKEN_FORMAT=UUID

View File

@ -157,14 +157,17 @@ class CertificatesTestCase(base.TestCase):
self.behaviors.delete_all_created_orders()
super(CertificatesTestCase, self).tearDown()
def wait_for_order(self, order_ref):
def wait_for_order(
self, order_ref, delay_before_check_seconds=1, max_wait_seconds=4):
time.sleep(delay_before_check_seconds)
# Make sure we have an order in a terminal state
time_count = 1
order_resp = self.behaviors.get_order(order_ref)
while ((order_resp.model.status != "ACTIVE") and
(order_resp.model.status != "ERROR") and
time_count <= 4):
time_count <= max_wait_seconds):
time.sleep(1)
time_count += 1
order_resp = self.behaviors.get_order(order_ref)
@ -271,6 +274,7 @@ class CertificatesTestCase(base.TestCase):
self.assertEqual(message, resp_dict['description'])
@testtools.testcase.attr('positive')
@testtools.skipIf(dogtag_imports_ok, "not applicable with dogtag plugin")
def test_create_simple_cmc_order(self):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
@ -283,6 +287,13 @@ class CertificatesTestCase(base.TestCase):
order_resp = self.behaviors.get_order(order_ref)
self.verify_pending_waiting_for_ca(order_resp)
# Wait for retry processing to handle checking for status with the
# default certificate plugin (which takes about 10 seconds +- 20%).
order_resp = self.wait_for_order(
order_ref, delay_before_check_seconds=20, max_wait_seconds=25)
self.assertEqual('ACTIVE', order_resp.model.status)
@testtools.testcase.attr('positive')
def test_create_simple_cmc_order_without_requestor_info(self):
self.simple_cmc_data.pop("requestor_name", None)

View File

@ -26,8 +26,8 @@ console_scripts =
barbican-db-manage = barbican.cmd.db_manage:main
barbican-keystone-listener = barbican.cmd.keystone_listener:main
barbican-worker = barbican.cmd.worker:main
barbican-worker-retry-scheduler = barbican.cmd.worker_retry_scheduler:main
pkcs11-kek-rewrap = barbican.cmd.pkcs11_kek_rewrap:main
barbican-retry = barbican.cmd.retry_scheduler:main
barbican.secretstore.plugin =
store_crypto = barbican.plugin.store_crypto:StoreCryptoAdapterPlugin