Stand up coordination during start
If coordination is stood up during the service constructor, we end up
with multiple processes using the same coordination ID.

Change-Id: I3287ec585522495be64bdca2ab0e6ed83ddbeeac
commit 9030d92ab1 (parent 5b41fab00f)
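The failure mode the message describes comes from the usual multi-process launch pattern: the service object is constructed once in the parent process (e.g. before oslo's ProcessLauncher forks its workers), so anything minted in __init__ -- including a uuid4-based coordination ID -- is copied into every child. A minimal sketch of the problem, using a hypothetical Service class rather than Designate's (POSIX fork only):

    import os
    import uuid

    class Service(object):
        def __init__(self):
            # minted at construction time, i.e. before fork: shared by all workers
            self.init_id = str(uuid.uuid4())

        def start(self):
            # minted at start time, i.e. after fork: unique per worker
            self.start_id = str(uuid.uuid4())

    svc = Service()            # parent builds the service object once
    for _ in range(2):         # the launcher then forks the workers
        if os.fork() == 0:
            svc.start()
            print(os.getpid(), svc.init_id, svc.start_id)
            os._exit(0)
    for _ in range(2):
        os.wait()

Every worker prints the same init_id but a distinct start_id, which is exactly why the ID (and the coordinator built from it) is now created in start().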
@@ -54,28 +54,28 @@ class CoordinationMixin(object):
     def __init__(self, *args, **kwargs):
         super(CoordinationMixin, self).__init__(*args, **kwargs)

-        self._coordination_id = ":".join([CONF.host, str(uuid.uuid4())])
         self._coordinator = None
-        if CONF.coordination.backend_url is not None:
-            self._init_coordination()
-        else:
-            msg = _LW("No coordination backend configured, distributed "
-                      "coordination functionality will be disabled."
-                      " Please configure a coordination backend.")
-            LOG.warn(msg)
-
-    def _init_coordination(self):
-        backend_url = cfg.CONF.coordination.backend_url
-        self._coordinator = tooz.coordination.get_coordinator(
-            backend_url, self._coordination_id)
-        self._coordination_started = False
-
-        self.tg.add_timer(cfg.CONF.coordination.heartbeat_interval,
-                          self._coordinator_heartbeat)
-        self.tg.add_timer(cfg.CONF.coordination.run_watchers_interval,
-                          self._coordinator_run_watchers)

     def start(self):
+        self._coordination_id = ":".join([CONF.host, str(uuid.uuid4())])
+
+        if CONF.coordination.backend_url is not None:
+            backend_url = cfg.CONF.coordination.backend_url
+            self._coordinator = tooz.coordination.get_coordinator(
+                backend_url, self._coordination_id)
+
+            self._coordination_started = False
+
+            self.tg.add_timer(cfg.CONF.coordination.heartbeat_interval,
+                              self._coordinator_heartbeat)
+            self.tg.add_timer(cfg.CONF.coordination.run_watchers_interval,
+                              self._coordinator_run_watchers)
+        else:
+            msg = _LW("No coordination backend configured, distributed "
+                      "coordination functionality will be disabled. "
+                      "Please configure a coordination backend.")
+            LOG.warn(msg)
+
         super(CoordinationMixin, self).start()

         if self._coordinator is not None:
@@ -95,6 +95,8 @@ class CoordinationMixin(object):

         super(CoordinationMixin, self).stop()

+        self._coordinator = None
+
     def _coordinator_heartbeat(self):
         if not self._coordination_started:
             return
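For context on what start() now stands up inline (formerly _init_coordination): tooz returns a coordinator bound to a member ID, and that membership only stays alive as long as something heartbeats it, which is what the two threadgroup timers are for. The same pattern outside the mixin, as a rough standalone sketch (the memcached URL is only an example backend; tooz also speaks zookeeper, redis, and others):

    import uuid

    import tooz.coordination

    # per-process member ID, minted at start time rather than in a constructor;
    # newer tooz releases expect member IDs as bytes
    member_id = ":".join(["myhost", str(uuid.uuid4())]).encode("utf-8")

    coordinator = tooz.coordination.get_coordinator(
        "memcached://127.0.0.1:11211", member_id)
    coordinator.start()

    # the mixin drives these from tg.add_timer() loops
    coordinator.heartbeat()      # keep our membership lease alive
    coordinator.run_watchers()   # fire any registered join/leave callbacks

    coordinator.stop()

Note the ordering the new start() preserves: the coordinator is created before super().start(), so by the time the rest of the service comes up, self._coordinator is either a live handle or None.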
@@ -29,19 +29,17 @@ NS = 'designate.periodic_tasks'


 class Service(coordination.CoordinationMixin, service.Service):
-    def __init__(self, threads=None):
-        super(Service, self).__init__(threads=threads)
+    @property
+    def service_name(self):
+        return 'zone_manager'

-        self._partitioner = coordination.Partitioner(
-            self._coordinator, self.service_name, self._coordination_id,
-            range(0, 4095))
-
-    def _rebalance(self, my_partitions, members, event):
-        LOG.info(_LI("Received rebalance event %s") % event)
-        self.partition_range = my_partitions
-
     def start(self):
         super(Service, self).start()
+
+        self._partitioner = coordination.Partitioner(
+            self._coordinator, self.service_name, self._coordination_id,
+            range(0, 4095))
+
         self._partitioner.start()
         self._partitioner.watch_partition_change(self._rebalance)

@@ -57,6 +55,6 @@ class Service(coordination.CoordinationMixin, service.Service):
         interval = CONF[task.get_canonical_name()].interval
         self.tg.add_timer(interval, task)

-    @property
-    def service_name(self):
-        return 'zone_manager'
+    def _rebalance(self, my_partitions, members, event):
+        LOG.info(_LI("Received rebalance event %s") % event)
+        self.partition_range = my_partitions
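The Partitioner consumes the coordinator and coordination ID that the mixin now only has ready after start(), which is why its construction had to move out of __init__ as well. Its job is to split a fixed partition range across the live group members and re-split on join/leave events, handing each member's share to the _rebalance callback. The dividing step can be sketched generically (an illustration of the idea, not Designate's actual implementation):

    def my_partitions(partitions, members, me):
        # sort so every member computes the same assignment independently
        members = sorted(members)
        chunk = -(-len(partitions) // len(members))  # ceiling division
        index = members.index(me)
        return partitions[index * chunk:(index + 1) * chunk]

    # the zone_manager hands Partitioner range(0, 4095)
    parts = list(range(0, 4095))
    print(len(my_partitions(parts, ["a", "b", "c"], "b")))  # -> 1365

With an assignment like this in place, each zone_manager process runs its periodic tasks only against the slice of the range stored in self.partition_range.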