Merge "[etcd3gw] create new lease if expired."
This commit is contained in:
commit
a10db5d089
@ -0,0 +1,7 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
[etcd3gw] Membership lease can be already expired to the moment of lease
|
||||||
|
refreshing. In this case heartbeat will try to refresh expired lease
|
||||||
|
forever without success. The patch checks if lease is expired and creates
|
||||||
|
new one.
|
@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
import base64
|
import base64
|
||||||
import functools
|
import functools
|
||||||
|
import logging
|
||||||
import threading
|
import threading
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
@ -27,6 +28,8 @@ from tooz import coordination
|
|||||||
from tooz import locking
|
from tooz import locking
|
||||||
from tooz import utils
|
from tooz import utils
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def _encode(data):
|
def _encode(data):
|
||||||
"""Safely encode data for consumption of the gateway."""
|
"""Safely encode data for consumption of the gateway."""
|
||||||
@ -218,6 +221,7 @@ class Etcd3Driver(coordination.CoordinationDriverWithExecutor):
|
|||||||
self.membership_timeout = int(options.get(
|
self.membership_timeout = int(options.get(
|
||||||
'membership_timeout', timeout))
|
'membership_timeout', timeout))
|
||||||
self._acquired_locks = set()
|
self._acquired_locks = set()
|
||||||
|
self._membership_lease = None
|
||||||
|
|
||||||
def _start(self):
|
def _start(self):
|
||||||
super(Etcd3Driver, self)._start()
|
super(Etcd3Driver, self)._start()
|
||||||
@ -227,11 +231,15 @@ class Etcd3Driver(coordination.CoordinationDriverWithExecutor):
|
|||||||
return Etcd3Lock(self, name, self.lock_timeout)
|
return Etcd3Lock(self, name, self.lock_timeout)
|
||||||
|
|
||||||
def heartbeat(self):
|
def heartbeat(self):
|
||||||
|
# TODO(kaifeng) use the same lease for locks?
|
||||||
|
if self._membership_lease.refresh() == -1:
|
||||||
|
expired_lease = self._membership_lease.id
|
||||||
|
self._membership_lease = self.client.lease(self.membership_timeout)
|
||||||
|
LOG.debug('Created new lease %s after previous lease %s expired.',
|
||||||
|
self._membership_lease.id, expired_lease)
|
||||||
# NOTE(jaypipes): Copying because set can mutate during iteration
|
# NOTE(jaypipes): Copying because set can mutate during iteration
|
||||||
for lock in self._acquired_locks.copy():
|
for lock in self._acquired_locks.copy():
|
||||||
lock.heartbeat()
|
lock.heartbeat()
|
||||||
# TODO(kaifeng) use the same lease for locks?
|
|
||||||
self._membership_lease.refresh()
|
|
||||||
return min(self.lock_timeout, self.membership_timeout)
|
return min(self.lock_timeout, self.membership_timeout)
|
||||||
|
|
||||||
def watch_join_group(self, group_id, callback):
|
def watch_join_group(self, group_id, callback):
|
||||||
|
Loading…
Reference in New Issue
Block a user