DLM to fix host notification race.
Two concurrent "host is down" events might be accepted by different instances of the API, which triggers two evacuation workflows for the same instances. This patch introduces a distributed lock around the creation of host-type notifications, which solves the issue. Closes-Bug: 1961110 Change-Id: Ie8f10b14f29a8548181560cd8a26b4dc79afc3dc
This commit is contained in:
parent
c3fc0c2f9d
commit
9fef8807cf
|
@ -54,4 +54,10 @@ In the ``[keystone_authtoken]`` sections, configure Identity service access:
|
|||
|
||||
Replace ``MASAKARI_PASS`` with the password you chose for the ``masakari`` user in the Identity service.
|
||||
|
||||
In the ``[coordination]`` section, set ``backend_url`` if you use coordination for the Masakari-api service.
|
||||
|
||||
.. note::
|
||||
Additional packages may be required depending on the tooz backend used in
|
||||
the installation. For example, ``etcd3gw`` is required if the backend driver
|
||||
is configured to use ``etcd3+http://``. Supported drivers are listed at
|
||||
`Tooz drivers <https://docs.openstack.org/tooz/latest/user/drivers.html>`_.
|
||||
|
|
|
@ -26,6 +26,7 @@ from paste import deploy
|
|||
from masakari.common import config
|
||||
import masakari.conf
|
||||
from masakari import config as api_config
|
||||
from masakari import coordination
|
||||
from masakari import exception
|
||||
from masakari import objects
|
||||
from masakari import rpc
|
||||
|
@ -53,7 +54,9 @@ def main():
|
|||
|
||||
launcher = service.process_launcher()
|
||||
try:
|
||||
server = service.WSGIService("masakari_api", use_ssl=CONF.use_ssl)
|
||||
use_coordination = bool(CONF.coordination.backend_url)
|
||||
server = service.WSGIService("masakari_api", use_ssl=CONF.use_ssl,
|
||||
coordination=use_coordination)
|
||||
launcher.launch_service(server, workers=server.workers or 1)
|
||||
except exception.PasteAppNotFound as ex:
|
||||
log.error("Failed to start ``masakari_api`` service. Error: %s",
|
||||
|
@ -80,6 +83,10 @@ def initialize_application():
|
|||
CONF.log_opt_values(logging.getLogger(__name__), logging.DEBUG)
|
||||
|
||||
config.set_middleware_defaults()
|
||||
|
||||
if CONF.coordination.backend_url:
|
||||
coordination.COORDINATOR.start()
|
||||
|
||||
rpc.init(CONF)
|
||||
conf = conf_files[0]
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ from oslo_config import cfg
|
|||
|
||||
from masakari.conf import api
|
||||
from masakari.conf import base
|
||||
from masakari.conf import coordination
|
||||
from masakari.conf import database
|
||||
from masakari.conf import engine
|
||||
from masakari.conf import engine_driver
|
||||
|
@ -32,6 +33,7 @@ CONF = cfg.CONF
|
|||
|
||||
api.register_opts(CONF)
|
||||
base.register_opts(CONF)
|
||||
coordination.register_opts(CONF)
|
||||
database.register_opts(CONF)
|
||||
engine.register_opts(CONF)
|
||||
engine_driver.register_opts(CONF)
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
|
||||
# Options of the ``[coordination]`` configuration group.
coordination_opts = [
    cfg.StrOpt('backend_url',
               default=None,
               # Adjacent literals are concatenated verbatim, so every
               # sentence must carry its own trailing space.
               help="The backend URL to use for distributed coordination. "
                    "By default it's None which means that coordination is "
                    "disabled. The coordination is implemented for "
                    "distributed lock management and was tested with etcd. "
                    "Coordination doesn't work for file driver because lock "
                    "files aren't removed after lock releasing."),
]
|
||||
|
||||
|
||||
def register_opts(conf):
    """Register the coordination options on a configuration object.

    :param conf: configuration the options are registered on, under
        the ``coordination`` group
    """
    group_name = "coordination"
    conf.register_opts(coordination_opts, group=group_name)
|
||||
|
||||
|
||||
def list_opts():
    """Expose the coordination options keyed by their group name.

    :return: dict mapping the ``coordination`` group to its options
    """
    opts_by_group = {"coordination": coordination_opts}
    return opts_by_group
|
|
@ -0,0 +1,155 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Coordination and locking utilities."""
|
||||
|
||||
import inspect
|
||||
import uuid
|
||||
|
||||
import decorator
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
from tooz import coordination
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Coordinator(object):
    """Tooz coordination wrapper.

    Coordination member id is created from concatenated
    `prefix` and `agent_id` parameters.

    :param str agent_id: Agent identifier. A random UUID4 string is
                         generated when it is not given.
    :param str prefix: Used to provide member identifier with a
                       meaningful prefix. It is also prepended to
                       every lock name.
    """

    def __init__(self, agent_id=None, prefix=''):
        self.coordinator = None
        self.agent_id = agent_id or str(uuid.uuid4())
        self.started = False
        self.prefix = prefix

    def start(self):
        """Connect to the coordination backend and start the heartbeat.

        Does nothing when the coordinator is already started.

        :return: None
        """
        if self.started:
            return

        # Tooz expects member_id as a byte string.
        member_id = (self.prefix + self.agent_id).encode('ascii')
        backend = coordination.get_coordinator(
            cfg.CONF.coordination.backend_url, member_id)
        backend.start(start_heart=True)
        # Publish the backend only once it is running, so a failed start
        # never leaves a half-initialised coordinator behind.
        self.coordinator = backend
        self.started = True

    def stop(self):
        """Disconnect from coordination backend and stop heartbeat.

        Does nothing when the coordinator is not started. An error raised
        by the backend is propagated, but the wrapper is reset first so
        that a later ``start()`` creates a fresh backend connection.
        """
        if not self.started:
            return
        try:
            self.coordinator.stop()
        finally:
            self.coordinator = None
            self.started = False

    def get_lock(self, name):
        """Return a Tooz backend lock.

        :param str name: The lock name that is used to identify it
                         across all nodes.
        :return: the backend lock, or None when the
                 ``[coordination]backend_url`` option is not set.
        """
        # Tooz expects lock name as a byte string.
        lock_name = (self.prefix + name).encode('ascii')
        if cfg.CONF.coordination.backend_url:
            return self.coordinator.get_lock(lock_name)
        LOG.debug("Cannot get lock %s, because coordination is not configured",
                  lock_name)
        return None
|
||||
|
||||
|
||||
# Module-level coordinator instance; the member id and every lock name it
# creates are prefixed with 'masakari-'.
COORDINATOR = Coordinator(prefix='masakari-')
|
||||
|
||||
|
||||
def synchronized(lock_name, blocking=True, coordinator=COORDINATOR):
    """Synchronization decorator.

    :param str lock_name: Lock name.
    :param blocking: If True, blocks until the lock is acquired.
            If False, raises exception when not acquired. Otherwise,
            the value is used as a timeout value and if lock is not acquired
            after this number of seconds exception is raised.
    :param coordinator: Coordinator class to use when creating lock.
        Defaults to the global coordinator.
    :raises tooz.coordination.LockAcquireFailed: if lock is not acquired

    Decorating a method like so::

        @synchronized('my_lock')
        def foo(self, *args):
           ...

    ensures that only one process will execute the foo method at a time.

    Different methods can share the same lock::

        @synchronized('my_lock')
        def foo(self, *args):
           ...

        @synchronized('my_lock')
        def bar(self, *args):
           ...

    This way only one of either foo or bar can be executing at a time.

    Lock name can be formatted using Python format string syntax::

        @synchronized('{function_name}-{vol.id}-{snap[name]}')
        def foo(self, vol, snap):
           ...

    Available field names are: decorated function parameters and
    `function_name` as a decorated function name.

    When the coordinator hands back no lock, the decorated function is
    called without any locking.
    """

    @decorator.decorator
    def _synchronized(f, *a, **k):
        # Fields usable in the lock name template: the call arguments of
        # the decorated function plus its name.
        format_fields = inspect.getcallargs(f, *a, **k)
        format_fields['function_name'] = f.__name__
        lock = coordinator.get_lock(lock_name.format(**format_fields))

        if not lock:
            # No lock was provided, so run the function unprotected.
            return f(*a, **k)

        requested_at = timeutils.now()
        acquired_at = None
        try:
            with lock(blocking):
                acquired_at = timeutils.now()
                LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
                          'waited %(wait_secs)0.3fs',
                          {'name': lock.name,
                           'function': f.__name__,
                           'wait_secs': (acquired_at - requested_at)})
                return f(*a, **k)
        finally:
            released_at = timeutils.now()
            # acquired_at stays None when the lock was never obtained.
            held_secs = ("N/A" if acquired_at is None
                         else "%0.3fs" % (released_at - acquired_at))
            LOG.debug('Lock "%(name)s" released by "%(function)s" :: held '
                      '%(held_secs)s',
                      {'name': lock.name,
                       'function': f.__name__,
                       'held_secs': held_secs})

    return _synchronized
|
|
@ -23,6 +23,7 @@ from oslo_utils import uuidutils
|
|||
from masakari.api import utils as api_utils
|
||||
from masakari.compute import nova
|
||||
import masakari.conf
|
||||
from masakari.coordination import synchronized
|
||||
from masakari.engine import rpcapi as engine_rpcapi
|
||||
from masakari import exception
|
||||
from masakari.i18n import _
|
||||
|
@ -291,9 +292,7 @@ class NotificationAPI(object):
|
|||
|
||||
return False
|
||||
|
||||
def create_notification(self, context, notification_data):
|
||||
"""Create notification"""
|
||||
|
||||
def _create_notification(self, context, notification_data):
|
||||
# Check whether host from which the notification came is already
|
||||
# present in failover segment or not
|
||||
host_name = notification_data.get('hostname')
|
||||
|
@ -320,7 +319,7 @@ class NotificationAPI(object):
|
|||
|
||||
if self._is_duplicate_notification(context, notification):
|
||||
message = (_("Notification received from host %(host)s of "
|
||||
" type '%(type)s' is duplicate.") %
|
||||
"type %(type)s is duplicate.") %
|
||||
{'host': host_name, 'type': notification.type})
|
||||
raise exception.DuplicateNotification(message=message)
|
||||
|
||||
|
@ -330,12 +329,26 @@ class NotificationAPI(object):
|
|||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
tb = traceback.format_exc()
|
||||
api_utils.notify_about_notification_api(context, notification,
|
||||
api_utils.notify_about_notification_api(
|
||||
context, notification,
|
||||
action=fields.EventNotificationAction.NOTIFICATION_CREATE,
|
||||
phase=fields.EventNotificationPhase.ERROR, exception=e,
|
||||
tb=tb)
|
||||
return notification
|
||||
|
||||
@synchronized('create_host_notification-{notification_data[hostname]}')
def _create_host_type_notification(self, context, notification_data):
    """Create a notification under a lock named after the host.

    The lock name is formatted from the ``hostname`` entry of
    ``notification_data``; the actual work is delegated to
    ``_create_notification``.
    """
    notification = self._create_notification(context, notification_data)
    return notification
|
||||
|
||||
def create_notification(self, context, notification_data):
    """Create notification

    Notifications whose ``type`` is COMPUTE_HOST go through
    ``_create_host_type_notification``; every other type is handed
    straight to ``_create_notification``.
    """
    is_host_type = (notification_data.get('type') ==
                    fields.NotificationType.COMPUTE_HOST)
    if is_host_type:
        create = self._create_host_type_notification
    else:
        create = self._create_notification
    return create(context, notification_data)
|
||||
|
||||
def get_all(self, context, filters=None, sort_keys=None,
|
||||
sort_dirs=None, limit=None, marker=None):
|
||||
"""Get all notifications filtered by one of the given parameters.
|
||||
|
|
|
@ -27,6 +27,7 @@ from oslo_utils import importutils
|
|||
|
||||
import masakari.conf
|
||||
from masakari import context
|
||||
from masakari import coordination as masakari_coordination
|
||||
from masakari import exception
|
||||
from masakari.i18n import _
|
||||
from masakari.objects import base as objects_base
|
||||
|
@ -188,7 +189,8 @@ class Service(service.Service):
|
|||
class WSGIService(service.Service):
|
||||
"""Provides ability to launch API from a 'paste' configuration."""
|
||||
|
||||
def __init__(self, name, loader=None, use_ssl=False, max_url_len=None):
|
||||
def __init__(self, name, loader=None, use_ssl=False, max_url_len=None,
|
||||
coordination=False):
|
||||
"""Initialize, but do not start the WSGI server.
|
||||
|
||||
:param name: The name of the WSGI server given to the loader.
|
||||
|
@ -221,6 +223,7 @@ class WSGIService(service.Service):
|
|||
port=self.port,
|
||||
use_ssl=self.use_ssl,
|
||||
max_url_len=max_url_len)
|
||||
self.coordination = coordination
|
||||
|
||||
def reset(self):
|
||||
"""Reset server greenpool size to default.
|
||||
|
@ -239,6 +242,8 @@ class WSGIService(service.Service):
|
|||
:returns: None
|
||||
|
||||
"""
|
||||
if self.coordination:
|
||||
masakari_coordination.COORDINATOR.start()
|
||||
self.server.start()
|
||||
|
||||
def stop(self):
|
||||
|
@ -247,6 +252,12 @@ class WSGIService(service.Service):
|
|||
:returns: None
|
||||
|
||||
"""
|
||||
if self.coordination:
|
||||
try:
|
||||
masakari_coordination.COORDINATOR.stop()
|
||||
except Exception as error:
|
||||
LOG.warning('Error occurred during masakari coordination was '
|
||||
'stopped: %s', error)
|
||||
self.server.stop()
|
||||
|
||||
def wait(self):
|
||||
|
|
|
@ -34,7 +34,7 @@ class TestMasakariAPI(test.NoDBTestCase):
|
|||
mock_service.WSGIService.side_effect = fake_service
|
||||
api.main()
|
||||
mock_service.WSGIService.assert_has_calls([
|
||||
mock.call('masakari_api', use_ssl=False),
|
||||
mock.call('masakari_api', use_ssl=False, coordination=False)
|
||||
])
|
||||
launcher = mock_service.process_launcher.return_value
|
||||
launcher.launch_service.assert_called_once_with(
|
||||
|
|
|
@ -22,6 +22,7 @@ from oslo_utils import timeutils
|
|||
|
||||
from masakari.api import utils as api_utils
|
||||
from masakari.compute import nova as nova_obj
|
||||
from masakari import coordination
|
||||
from masakari.engine import rpcapi as engine_rpcapi
|
||||
from masakari import exception
|
||||
from masakari.ha import api as ha_api
|
||||
|
@ -720,6 +721,7 @@ class NotificationAPITestCase(test.NoDBTestCase):
|
|||
)
|
||||
self.exception_duplicate = exception.DuplicateNotification(
|
||||
host='host_1', type='COMPUTE_HOST')
|
||||
coordination.Coordinator.get_lock = mock.MagicMock()
|
||||
|
||||
def _assert_notification_data(self, expected, actual):
|
||||
self.assertTrue(obj_base.obj_equal_prims(expected, actual),
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
---
|
||||
fixes:
|
||||
- |
|
||||
Fixes an issue which triggers two recovery workflows for the same
|
||||
host failure. `LP#1961110
|
||||
<https://bugs.launchpad.net/masakari/+bug/1961110>`__
|
||||
|
||||
It introduces a distributed lock for Masakari-api services when handling
|
||||
the concurrent notifications for the same host failure from multiple
|
||||
Masakari-hostmonitor services. To enable coordination, the user needs
|
||||
to set the new configuration option ``[coordination]backend_url``,
|
||||
which specifies the backend.
|
|
@ -18,7 +18,7 @@ oslo.middleware>=3.31.0 # Apache-2.0
|
|||
oslo.policy>=3.6.0 # Apache-2.0
|
||||
oslo.service!=1.28.1,>=1.24.0 # Apache-2.0
|
||||
oslo.upgradecheck>=1.3.0 # Apache-2.0
|
||||
oslo.utils>=3.33.0 # Apache-2.0
|
||||
oslo.utils>=4.7.0 # Apache-2.0
|
||||
oslo.versionedobjects>=1.31.2 # Apache-2.0
|
||||
pbr!=2.1.0,>=2.0.0 # Apache-2.0
|
||||
python-novaclient>=9.1.0 # Apache-2.0
|
||||
|
@ -26,3 +26,4 @@ stevedore>=1.20.0 # Apache-2.0
|
|||
SQLAlchemy>=1.2.19 # MIT
|
||||
SQLAlchemy-Utils>=0.33.10 # Apache-2.0
|
||||
taskflow>=2.16.0 # Apache-2.0
|
||||
tooz>=2.10.1 # Apache-2.0
|
||||
|
|
Loading…
Reference in New Issue