magnum/magnum/conductor/bay_lock.py

131 lines
5.2 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_utils import excutils
from magnum.common import exception
from magnum.conductor.api import ListenerAPI
from magnum.i18n import _LI
from magnum.i18n import _LW
from magnum import objects
from magnum.openstack.common import log as logging
cfg.CONF.import_opt('topic', 'magnum.conductor.config',
group='conductor')
cfg.CONF.import_opt('conductor_life_check_timeout', 'magnum.conductor.config',
group='conductor')
LOG = logging.getLogger(__name__)
class BayLock(object):
def __init__(self, context, bay, conductor_id):
self.context = context
self.bay = bay
self.conductor_id = conductor_id
@staticmethod
def conductor_alive(context, conductor_id):
topic = cfg.CONF.conductor.topic
timeout = cfg.CONF.conductor.conductor_life_check_timeout
listener_api = ListenerAPI(context=context, topic=topic,
server=conductor_id, timeout=timeout)
try:
return listener_api.ping_conductor()
except messaging.MessagingTimeout:
return False
def acquire(self, retry=True):
"""Acquire a lock on the bay.
:param retry: When True, retry if lock was released while stealing.
"""
lock_conductor_id = objects.BayLock.create(self.bay.uuid,
self.conductor_id)
if lock_conductor_id is None:
LOG.debug("Conductor %(conductor)s acquired lock on bay "
"%(bay)s" % {'conductor': self.conductor_id,
'bay': self.bay.uuid})
return
if (lock_conductor_id == self.conductor_id or
self.conductor_alive(self.context, lock_conductor_id)):
LOG.debug("Lock on bay %(bay)s is owned by conductor "
"%(conductor)s" % {'bay': self.bay.uuid,
'conductor': lock_conductor_id})
raise exception.OperationInProgress(bay_name=self.bay.name)
else:
LOG.info(_LI("Stale lock detected on bay %(bay)s. Conductor "
"%(conductor)s will attempt to steal the lock"),
{'bay': self.bay.uuid, 'conductor': self.conductor_id})
result = objects.BayLock.steal(self.bay.uuid,
lock_conductor_id,
self.conductor_id)
if result is None:
LOG.info(_LI("Conductor %(conductor)s successfully stole the "
"lock on bay %(bay)s"),
{'conductor': self.conductor_id,
'bay': self.bay.uuid})
return
elif result is True:
if retry:
LOG.info(_LI("The lock on bay %(bay)s was released while "
"conductor %(conductor)s was stealing it. "
"Trying again"),
{'bay': self.bay.uuid,
'conductor': self.conductor_id})
return self.acquire(retry=False)
else:
new_lock_conductor_id = result
LOG.info(_LI("Failed to steal lock on bay %(bay)s. "
"Conductor %(conductor)s stole the lock first"),
{'bay': self.bay.uuid,
'conductor': new_lock_conductor_id})
raise exception.OperationInProgress(bay_name=self.bay.name)
def release(self, bay_uuid):
"""Release a bay lock."""
# Only the conductor that owns the lock will be releasing it.
result = objects.BayLock.release(bay_uuid, self.conductor_id)
if result is True:
LOG.warn(_LW("Lock was already released on bay %s!"), bay_uuid)
else:
LOG.debug("Conductor %(conductor)s released lock on bay "
"%(bay)s" % {'conductor': self.conductor_id,
'bay': bay_uuid})
@contextlib.contextmanager
def thread_lock(self, bay_uuid):
"""Acquire a lock and release it only if there is an exception.
The release method still needs to be scheduled to be run at the
end of the thread using the Thread.link method.
"""
try:
self.acquire()
yield
except exception.OperationInProgress:
raise
except: # noqa
with excutils.save_and_reraise_exception():
self.release(bay_uuid)