Merge "Ensure only one pool-manager performs periodic tasks"

This commit is contained in:
Jenkins 2015-07-09 17:07:07 +00:00 committed by Gerrit Code Review
commit f900c75c26
1 changed files with 17 additions and 9 deletions

View File

@ -22,6 +22,7 @@ from oslo_log import log as logging
from oslo_concurrency import lockutils from oslo_concurrency import lockutils
from designate import backend from designate import backend
from designate import coordination
from designate import exceptions from designate import exceptions
from designate import objects from designate import objects
from designate import utils from designate import utils
@ -60,7 +61,8 @@ def wrap_backend_call():
raise exceptions.Backend('Unknown backend failure: %r' % e) raise exceptions.Backend('Unknown backend failure: %r' % e)
class Service(service.RPCService, service.Service): class Service(service.RPCService, coordination.CoordinationMixin,
service.Service):
""" """
Service side of the Pool Manager RPC API. Service side of the Pool Manager RPC API.
@ -128,6 +130,12 @@ class Service(service.RPCService, service.Service):
super(Service, self).start() super(Service, self).start()
# Setup a Leader Election, use for ensuring certain tasks are executed
# on exactly one pool-manager instance at a time]
self._pool_election = coordination.LeaderElection(
self._coordinator, '%s:%s' % (self.service_name, self.pool.id))
self._pool_election.start()
if CONF['service:pool_manager'].enable_recovery_timer: if CONF['service:pool_manager'].enable_recovery_timer:
LOG.info(_LI('Starting periodic recovery timer')) LOG.info(_LI('Starting periodic recovery timer'))
self.tg.add_timer( self.tg.add_timer(
@ -143,11 +151,13 @@ class Service(service.RPCService, service.Service):
CONF['service:pool_manager'].periodic_sync_interval) CONF['service:pool_manager'].periodic_sync_interval)
def stop(self): def stop(self):
for target in self.pool.targets: self._pool_election.stop()
self.target_backends[target.id].stop()
super(Service, self).stop() super(Service, self).stop()
for target in self.pool.targets:
self.target_backends[target.id].stop()
@property @property
def central_api(self): def central_api(self):
return central_api.CentralAPI.get_instance() return central_api.CentralAPI.get_instance()
@ -161,9 +171,8 @@ class Service(service.RPCService, service.Service):
""" """
:return: None :return: None
""" """
# TODO(kiall): Replace this inter-process-lock with a distributed # NOTE(kiall): Only run this periodic task on the pool leader
# lock, likely using the tooz library - see bug 1445127. if self._pool_election.is_leader:
with lockutils.lock('periodic_recovery', external=True, delay=30):
context = DesignateContext.get_admin_context(all_tenants=True) context = DesignateContext.get_admin_context(all_tenants=True)
LOG.debug("Starting Periodic Recovery") LOG.debug("Starting Periodic Recovery")
@ -195,9 +204,8 @@ class Service(service.RPCService, service.Service):
""" """
:return: None :return: None
""" """
# TODO(kiall): Replace this inter-process-lock with a distributed # NOTE(kiall): Only run this periodic task on the pool leader
# lock, likely using the tooz library - see bug 1445127. if self._pool_election.is_leader:
with lockutils.lock('periodic_sync', external=True, delay=30):
context = DesignateContext.get_admin_context(all_tenants=True) context = DesignateContext.get_admin_context(all_tenants=True)
LOG.debug("Starting Periodic Synchronization") LOG.debug("Starting Periodic Synchronization")