Merge "check expired notifications"
This commit is contained in:
@@ -97,6 +97,13 @@ notification_opts = [
|
||||
"generated_time, then it is considered that notification "
|
||||
"is ignored by the messaging queue and will be processed "
|
||||
"by 'process_unfinished_notifications' periodic task."),
|
||||
cfg.IntOpt('check_expired_notifications_interval',
|
||||
default=600,
|
||||
help='Interval in seconds for checking running notifications.'),
|
||||
cfg.IntOpt('notifications_expired_interval',
|
||||
default=86400,
|
||||
help='Interval in seconds for identifying running '
|
||||
'notifications expired.'),
|
||||
cfg.IntOpt('host_failure_recovery_threads',
|
||||
default=3,
|
||||
min=1,
|
||||
|
||||
@@ -368,6 +368,34 @@ class MasakariManager(manager.Manager):
|
||||
{'notification_uuid': notification.notification_uuid,
|
||||
'status': notification_status})
|
||||
|
||||
@periodic_task.periodic_task(
|
||||
spacing=CONF.check_expired_notifications_interval)
|
||||
def _check_expired_notifications(self, context):
|
||||
filters = {
|
||||
'status': [fields.NotificationStatus.RUNNING,
|
||||
fields.NotificationStatus.ERROR,
|
||||
fields.NotificationStatus.NEW]
|
||||
}
|
||||
notifications_list = objects.NotificationList.get_all(context,
|
||||
filters=filters)
|
||||
|
||||
for notification in notifications_list:
|
||||
if timeutils.is_older_than(
|
||||
notification.generated_time,
|
||||
CONF.notifications_expired_interval):
|
||||
# update running expired notification status as failed
|
||||
notification_status = fields.NotificationStatus.FAILED
|
||||
update_data = {
|
||||
'status': notification_status
|
||||
}
|
||||
|
||||
notification.update(update_data)
|
||||
notification.save()
|
||||
LOG.error(
|
||||
"Periodic task 'check_expired_notifications': "
|
||||
"Notification %(notification_uuid)s is expired.",
|
||||
{'notification_uuid': notification.notification_uuid})
|
||||
|
||||
def get_notification_recovery_workflow_details(self, context,
|
||||
notification):
|
||||
"""Retrieve recovery workflow details of the notification"""
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
from unittest import mock
|
||||
|
||||
from oslo_utils import importutils
|
||||
@@ -34,6 +36,8 @@ from masakari.tests import uuidsentinel
|
||||
CONF = masakari.conf.CONF
|
||||
|
||||
NOW = timeutils.utcnow().replace(microsecond=0)
|
||||
EXPIRED_TIME = timeutils.utcnow().replace(microsecond=0) \
|
||||
- datetime.timedelta(seconds=CONF.notifications_expired_interval)
|
||||
|
||||
|
||||
def _get_vm_type_notification(status="new"):
|
||||
@@ -69,14 +73,15 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
|
||||
generated_time=NOW, status="new",
|
||||
notification_uuid=uuidsentinel.fake_notification)
|
||||
|
||||
def _get_compute_host_type_notification(self):
|
||||
def _get_compute_host_type_notification(self, expired=False):
|
||||
return fakes.create_fake_notification(
|
||||
type="COMPUTE_HOST", id=1, payload={
|
||||
'event': 'stopped', 'host_status': 'NORMAL',
|
||||
'cluster_status': 'ONLINE'
|
||||
},
|
||||
source_host_uuid=uuidsentinel.fake_host,
|
||||
generated_time=NOW, status="new",
|
||||
generated_time=EXPIRED_TIME if expired else NOW,
|
||||
status="new",
|
||||
notification_uuid=uuidsentinel.fake_notification)
|
||||
|
||||
@mock.patch("masakari.engine.drivers.taskflow."
|
||||
@@ -1147,3 +1152,12 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
|
||||
|
||||
mock_progress_details.assert_called_once_with(
|
||||
self.context, notification)
|
||||
|
||||
@mock.patch.object(notification_obj.Notification, "save")
|
||||
@mock.patch.object(notification_obj.NotificationList, "get_all")
|
||||
def test_check_expired_notifications(self, mock_get_all, mock_save,
|
||||
mock_notification_get):
|
||||
notification = self._get_compute_host_type_notification(expired=True)
|
||||
mock_get_all.return_value = [notification]
|
||||
self.engine._check_expired_notifications(self.context)
|
||||
self.assertEqual("failed", notification.status)
|
||||
|
||||
Reference in New Issue
Block a user