Implement _process_unfinished_notifications periodic tasks
Added _process_unfinished_notifications to process notifications which are in error or new state. This periodic task will execute at regular interval defined by new config option 'process_unfinished_notifications_interval' defaults to 120 seconds. The notifications which are in ‘new’ status will be picked up based on a new config option ‘retry_notification_new_status_interval’ defaults to 60 seconds. Implements: bp add-periodic-tasks Change-Id: I6e607d83f04618ad695a9614f84ad690b8804848
This commit is contained in:
parent
7645ab489d
commit
6db17bc33a
@ -113,6 +113,7 @@ function configure_masakari {
|
||||
|
||||
# Set os_privileged_user credentials (used for connecting nova service)
|
||||
iniset $MASAKARI_CONF DEFAULT os_privileged_user_name nova
|
||||
iniset $MASAKARI_CONF DEFAULT os_privileged_user_auth_url "${KEYSTONE_AUTH_PROTOCOL}://${KEYSTONE_AUTH_HOST}/identity_admin"
|
||||
iniset $MASAKARI_CONF DEFAULT os_privileged_user_password "$SERVICE_PASSWORD"
|
||||
iniset $MASAKARI_CONF DEFAULT os_privileged_user_tenant "$SERVICE_PROJECT_NAME"
|
||||
iniset $MASAKARI_CONF DEFAULT graceful_shutdown_timeout "$SERVICE_GRACEFUL_SHUTDOWN_TIMEOUT"
|
||||
|
@ -19,7 +19,6 @@ Handles all requests to Nova.
|
||||
import functools
|
||||
import sys
|
||||
|
||||
from keystoneauth1.access import service_catalog
|
||||
from keystoneauth1 import exceptions as keystone_exception
|
||||
import keystoneauth1.loading
|
||||
import keystoneauth1.session
|
||||
@ -88,17 +87,9 @@ def novaclient(context, timeout=None):
|
||||
@param timeout: Number of seconds to wait for an answer before raising a
|
||||
Timeout exception (None to disable)
|
||||
"""
|
||||
sc = context.service_catalog or []
|
||||
|
||||
nova_catalog_info = CONF.nova_catalog_admin_info
|
||||
service_type, service_name, endpoint_type = nova_catalog_info.split(':')
|
||||
|
||||
# Extract the region if set in configuration
|
||||
if CONF.os_region_name:
|
||||
region_filter = {'region_name': CONF.os_region_name}
|
||||
else:
|
||||
region_filter = {}
|
||||
|
||||
context = ctx.RequestContext(
|
||||
CONF.os_privileged_user_name, None,
|
||||
auth_token=CONF.os_privileged_user_password,
|
||||
@ -107,22 +98,7 @@ def novaclient(context, timeout=None):
|
||||
|
||||
# User needs to authenticate to Keystone before querying Nova, so we set
|
||||
# auth_url to the identity service endpoint
|
||||
if CONF.os_privileged_user_auth_url:
|
||||
url = CONF.os_privileged_user_auth_url
|
||||
else:
|
||||
# We then pass region_name, endpoint_type, etc. to the
|
||||
# Client() constructor so that the final endpoint is
|
||||
# chosen correctly.
|
||||
try:
|
||||
url = service_catalog.ServiceCatalogV2(sc).url_for(
|
||||
service_type='identity',
|
||||
interface=endpoint_type,
|
||||
**region_filter)
|
||||
except keystone_exception.EndpointNotFound:
|
||||
url = service_catalog.ServiceCatalogV3(sc).url_for(
|
||||
service_type='identity',
|
||||
interface=endpoint_type,
|
||||
**region_filter)
|
||||
url = CONF.os_privileged_user_auth_url
|
||||
|
||||
LOG.debug('Creating a Nova client using "%s" user',
|
||||
CONF.os_privileged_user_name)
|
||||
|
@ -77,6 +77,18 @@ notification_opts = [
|
||||
cfg.IntOpt('wait_period_after_power_on',
|
||||
default=60,
|
||||
help='Number of seconds to wait for instance to start'),
|
||||
cfg.IntOpt('process_unfinished_notifications_interval',
|
||||
default=120,
|
||||
help='Interval in seconds for processing notifications which '
|
||||
'are in error or new state.'),
|
||||
cfg.IntOpt('retry_notification_new_status_interval',
|
||||
default=60,
|
||||
help="Interval in seconds for identifying notifications which "
|
||||
"are in new state. If the notification is in new state "
|
||||
"till this config option value after it's "
|
||||
"generated_time, then it is considered that notification "
|
||||
"is ignored by the messaging queue and will be processed "
|
||||
"by 'process_unfinished_notifications' periodic task."),
|
||||
]
|
||||
|
||||
|
||||
|
@ -24,11 +24,13 @@ workflows.
|
||||
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from oslo_service import periodic_task
|
||||
from oslo_utils import timeutils
|
||||
|
||||
import masakari.conf
|
||||
from masakari.engine import driver
|
||||
from masakari import exception
|
||||
from masakari.i18n import _LI, _LW
|
||||
from masakari.i18n import _LE, _LI, _LW
|
||||
from masakari import manager
|
||||
from masakari import objects
|
||||
from masakari.objects import fields
|
||||
@ -152,8 +154,7 @@ class MasakariManager(manager.Manager):
|
||||
|
||||
return notification_status
|
||||
|
||||
def process_notification(self, context, notification=None):
|
||||
"""Processes the notification"""
|
||||
def _process_notification(self, context, notification):
|
||||
@utils.synchronized(notification.source_host_uuid)
|
||||
def do_process_notification(notification):
|
||||
LOG.info(_LI('Processing notification %(notification_uuid)s of '
|
||||
@ -191,3 +192,46 @@ class MasakariManager(manager.Manager):
|
||||
notification.save()
|
||||
|
||||
do_process_notification(notification)
|
||||
|
||||
def process_notification(self, context, notification=None):
|
||||
"""Processes the notification"""
|
||||
self._process_notification(context, notification)
|
||||
|
||||
@periodic_task.periodic_task(
|
||||
spacing=CONF.process_unfinished_notifications_interval)
|
||||
def _process_unfinished_notifications(self, context):
|
||||
filters = {
|
||||
'status': [fields.NotificationStatus.ERROR,
|
||||
fields.NotificationStatus.NEW]
|
||||
}
|
||||
notifications_list = objects.NotificationList.get_all(context,
|
||||
filters=filters)
|
||||
|
||||
for notification in notifications_list:
|
||||
if (notification.status == fields.NotificationStatus.ERROR or
|
||||
(notification.status == fields.NotificationStatus.NEW and
|
||||
timeutils.is_older_than(
|
||||
notification.generated_time,
|
||||
CONF.retry_notification_new_status_interval))):
|
||||
self._process_notification(context, notification)
|
||||
|
||||
# get updated notification from db after workflow execution
|
||||
notification_db = objects.Notification.get_by_uuid(
|
||||
context, notification.notification_uuid)
|
||||
|
||||
if notification_db.status == fields.NotificationStatus.ERROR:
|
||||
# update notification status as failed
|
||||
notification_status = fields.NotificationStatus.FAILED
|
||||
update_data = {
|
||||
'status': notification_status
|
||||
}
|
||||
|
||||
notification_db.update(update_data)
|
||||
notification_db.save()
|
||||
LOG.error(_LE(
|
||||
"Periodic task 'process_unfinished_notifications': "
|
||||
"Notification %(notification_uuid)s exits with "
|
||||
"status: %(status)s."), {
|
||||
'notification_uuid': notification.notification_uuid,
|
||||
'status': notification_status
|
||||
})
|
||||
|
@ -39,6 +39,8 @@ class NovaClientTestCase(test.TestCase):
|
||||
|
||||
self.override_config('os_privileged_user_name', 'adminuser')
|
||||
self.override_config('os_privileged_user_password', 'strongpassword')
|
||||
self.override_config('os_privileged_user_auth_url',
|
||||
'http://keystonehost/identity_admin')
|
||||
|
||||
@mock.patch('novaclient.api_versions.APIVersion')
|
||||
@mock.patch('novaclient.client.Client')
|
||||
@ -48,7 +50,7 @@ class NovaClientTestCase(test.TestCase):
|
||||
p_client, p_api_version):
|
||||
nova.novaclient(self.ctx)
|
||||
p_plugin_loader.return_value.load_from_options.assert_called_once_with(
|
||||
auth_url='http://keystonehost:5000/v2.0',
|
||||
auth_url='http://keystonehost/identity_admin',
|
||||
password='strongpassword', project_name=None, username='adminuser'
|
||||
)
|
||||
p_client.assert_called_once_with(
|
||||
@ -65,7 +67,7 @@ class NovaClientTestCase(test.TestCase):
|
||||
p_client, p_api_version):
|
||||
nova.novaclient(self.ctx)
|
||||
p_plugin_loader.return_value.load_from_options.assert_called_once_with(
|
||||
auth_url='http://keystonehost:5000/v2.0',
|
||||
auth_url='http://keystonehost/identity_admin',
|
||||
password='strongpassword', project_name=None, username='adminuser'
|
||||
)
|
||||
p_client.assert_called_once_with(
|
||||
@ -82,11 +84,9 @@ class NovaClientTestCase(test.TestCase):
|
||||
p_plugin_loader,
|
||||
p_client,
|
||||
p_api_version):
|
||||
self.override_config('os_privileged_user_auth_url',
|
||||
'http://privatekeystonehost:5000/v2.0')
|
||||
nova.novaclient(self.ctx)
|
||||
p_plugin_loader.return_value.load_from_options.assert_called_once_with(
|
||||
auth_url='http://privatekeystonehost:5000/v2.0',
|
||||
auth_url='http://keystonehost/identity_admin',
|
||||
password='strongpassword', project_name=None, username='adminuser'
|
||||
)
|
||||
p_client.assert_called_once_with(
|
||||
@ -97,19 +97,14 @@ class NovaClientTestCase(test.TestCase):
|
||||
|
||||
@mock.patch('novaclient.api_versions.APIVersion')
|
||||
@mock.patch('novaclient.client.Client')
|
||||
@mock.patch('keystoneauth1.access.service_catalog.ServiceCatalogV2.'
|
||||
'url_for')
|
||||
@mock.patch('keystoneauth1.access.service_catalog.ServiceCatalogV3.'
|
||||
'url_for')
|
||||
@mock.patch('keystoneauth1.loading.get_plugin_loader')
|
||||
@mock.patch('keystoneauth1.session.Session')
|
||||
def test_nova_client_custom_region(self, p_session, p_plugin_loader,
|
||||
p_catalogv3, p_catalogv2,
|
||||
p_client, p_api_version):
|
||||
self.override_config('os_region_name', 'farfaraway')
|
||||
nova.novaclient(self.ctx)
|
||||
p_plugin_loader.return_value.load_from_options.assert_called_once_with(
|
||||
auth_url=p_catalogv2() or p_catalogv3(),
|
||||
auth_url='http://keystonehost/identity_admin',
|
||||
password='strongpassword', project_name=None, username='adminuser'
|
||||
)
|
||||
p_client.assert_called_once_with(
|
||||
|
Loading…
Reference in New Issue
Block a user