Add TaskFlowDriver support to process notifications

Added base NotificationDriver and TaskFlowDriver which will implement
NotificationDriver for procesing the notification workflows such as
host-failure, instance-failure and process-failure.

Change-Id: I2d12681bd26b5b1fb2535732d9f3c08ec2409d7a
This commit is contained in:
dineshbhor 2016-10-14 14:12:32 +05:30
parent 49c1171625
commit c3a58e4988
10 changed files with 265 additions and 1 deletions

View File

@ -40,6 +40,16 @@ whenever an RPC call to the masakari engine is made.
]
driver_opts = [
cfg.StrOpt(
'notification_driver',
default='taskflow_driver',
help="""
Defines which driver to use for executing notification workflows.
"""),
]
notification_opts = [
cfg.IntOpt('duplicate_notification_detection_interval',
default=180,
@ -55,7 +65,7 @@ notification_opts = [
]
ALL_OPTS = (rpcapi_opts + notification_opts)
ALL_OPTS = (rpcapi_opts + notification_opts + driver_opts)
def register_opts(conf):

84
masakari/engine/driver.py Normal file
View File

@ -0,0 +1,84 @@
# Copyright 2016 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Driver base-class:
(Beginning of) the contract that masakari drivers must follow, and shared
types that support that contract
"""
import abc
import sys
from oslo_log import log as logging
import six
from stevedore import driver
import masakari.conf
from masakari.i18n import _LE, _LI
from masakari import utils
CONF = masakari.conf.CONF
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class NotificationDriver(object):
@abc.abstractmethod
def execute_host_failure(self, context, host_name, recovery_method,
notification_uuid):
pass
@abc.abstractmethod
def execute_instance_failure(self, context, instance_uuid,
notification_uuid):
pass
@abc.abstractmethod
def execute_process_failure(self, context, process_name, host_name,
notification_uuid):
pass
def load_masakari_driver(masakari_driver=None):
"""Load a masakari driver module.
Load the masakari driver module specified by the notification_driver
configuration option or, if supplied, the driver name supplied as an
argument.
:param masakari_driver: a masakari driver name to override the config opt
:returns: a NotificationDriver instance
"""
if not masakari_driver:
masakari_driver = CONF.notification_driver
if not masakari_driver:
LOG.error(_LE("Notification driver option required, but not"
"specified"))
sys.exit(1)
LOG.info(_LI("Loading masakari notification driver '%s'"), masakari_driver)
try:
notification_driver = driver.DriverManager('masakari.driver',
masakari_driver,
invoke_on_load=True).driver
return utils.check_isinstance(notification_driver, NotificationDriver)
except ImportError:
LOG.exception(_LE("Failed to load notification driver "
"'%s'."), masakari_driver)
sys.exit(1)

View File

@ -0,0 +1,16 @@
# Copyright 2016 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__import__('pkg_resources').declare_namespace(__name__)

View File

@ -0,0 +1,18 @@
# Copyright 2016 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from masakari.engine.drivers.taskflow import driver
TaskFlowDriver = driver.TaskFlowDriver

View File

@ -0,0 +1,82 @@
# Copyright 2016 NTT DATA
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_log import log as logging
# For more information please visit: https://wiki.openstack.org/wiki/TaskFlow
from taskflow.listeners import base
from taskflow.listeners import logging as logging_listener
from taskflow import task
from masakari import exception
LOG = logging.getLogger(__name__)
def _make_task_name(cls, addons=None):
"""Makes a pretty name for a task class."""
base_name = ".".join([cls.__module__, cls.__name__])
extra = ''
if addons:
extra = ';%s' % (", ".join([str(a) for a in addons]))
return base_name + extra
class MasakariTask(task.Task):
"""The root task class for all masakari tasks.
It automatically names the given task using the module and class that
implement the given task as the task name.
"""
def __init__(self, addons=None, **kwargs):
super(MasakariTask, self).__init__(self.make_name(addons), **kwargs)
@classmethod
def make_name(cls, addons=None):
return _make_task_name(cls, addons)
class DynamicLogListener(logging_listener.DynamicLoggingListener):
"""This is used to attach to taskflow engines while they are running.
It provides a bunch of useful features that expose the actions happening
inside a taskflow engine, which can be useful for developers for debugging,
for operations folks for monitoring and tracking of the resource actions
and more...
"""
#: Exception is an excepted case, don't include traceback in log if fails.
_NO_TRACE_EXCEPTIONS = (exception.InvalidInput)
def __init__(self, engine,
task_listen_for=base.DEFAULT_LISTEN_FOR,
flow_listen_for=base.DEFAULT_LISTEN_FOR,
retry_listen_for=base.DEFAULT_LISTEN_FOR,
logger=LOG):
super(DynamicLogListener, self).__init__(
engine,
task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for,
retry_listen_for=retry_listen_for,
log=logger)
def _format_failure(self, fail):
if fail.check(*self._NO_TRACE_EXCEPTIONS) is not None:
exc_info = None
exc_details = '%s%s' % (os.linesep, fail.pformat(traceback=False))
return (exc_info, exc_details)
else:
return super(DynamicLogListener, self)._format_failure(fail)

View File

@ -0,0 +1,40 @@
# Copyright 2016 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Driver TaskFlowDriver:
Execute notification workflows using taskflow.
"""
from masakari.engine import driver
class TaskFlowDriver(driver.NotificationDriver):
def __init__(self):
super(TaskFlowDriver, self).__init__()
def execute_host_failure(self, context, host_name, recovery_method,
notification_uuid):
raise NotImplementedError()
def execute_instance_failure(self, context, instance_uuid,
notification_uuid):
raise NotImplementedError()
def execute_process_failure(self, context, process_name, host_name,
notification_uuid):
raise NotImplementedError()

View File

@ -26,6 +26,7 @@ from oslo_log import log as logging
import oslo_messaging as messaging
import masakari.conf
from masakari.engine import driver
from masakari.i18n import _LI
from masakari import manager
from masakari.objects import fields
@ -47,6 +48,8 @@ class MasakariManager(manager.Manager):
super(MasakariManager, self).__init__(service_name="engine",
*args, **kwargs)
self.driver = driver.load_masakari_driver(masakari_driver)
def process_notification(self, context, notification=None):
"""Processes the notification"""
@utils.synchronized(notification.source_host_uuid)

View File

@ -60,6 +60,13 @@ def utf8(value):
return value.encode('utf-8')
def check_isinstance(obj, cls):
"""Checks that obj is of type cls, and lets PyLint infer types."""
if isinstance(obj, cls):
return obj
raise Exception(_('Expected object of type: %s') % (str(cls)))
def monkey_patch():
"""If the CONF.monkey_patch set as True,
this function patches a decorator

View File

@ -22,3 +22,4 @@ pbr>=1.6 # Apache-2.0
setuptools>=16.0 # PSF/ZPL
six>=1.9.0 # MIT
stevedore>=1.10.0 # Apache-2.0
taskflow>=1.26.0 # Apache-2.0

View File

@ -44,6 +44,9 @@ masakari.api.v1.extensions =
hosts = masakari.api.openstack.ha.hosts:Hosts
notifications = masakari.api.openstack.ha.notifications:Notifications
masakari.driver =
taskflow_driver = masakari.engine.drivers.taskflow:TaskFlowDriver
[build_sphinx]
source-dir = doc/source
build-dir = doc/build