Merge "Fix race in component re-registration"
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
from collections import defaultdict
|
||||
|
||||
from kazoo.exceptions import NoNodeError
|
||||
@@ -62,6 +63,7 @@ class BaseComponent(ZooKeeperBase):
|
||||
|
||||
self.path = None
|
||||
self._zstat = None
|
||||
self.register_lock = threading.Lock()
|
||||
|
||||
def __getattr__(self, name):
|
||||
try:
|
||||
@@ -78,36 +80,38 @@ class BaseComponent(ZooKeeperBase):
|
||||
# Set the value in the local content dict
|
||||
self.content[name] = value
|
||||
|
||||
if not self.path:
|
||||
self.log.error(
|
||||
"Path is not set on this component, did you forget to call "
|
||||
"register()?"
|
||||
)
|
||||
return
|
||||
with self.register_lock:
|
||||
if not self.path:
|
||||
self.log.error(
|
||||
"Path is not set on this component, did you forget "
|
||||
"to call register()?"
|
||||
)
|
||||
return
|
||||
|
||||
# Update the ZooKeeper node
|
||||
content = json.dumps(self.content).encode("utf-8")
|
||||
try:
|
||||
zstat = self.kazoo_client.set(
|
||||
self.path, content, version=self._zstat.version
|
||||
)
|
||||
self._zstat = zstat
|
||||
except NoNodeError:
|
||||
self.log.error("Could not update %s in ZooKeeper", self)
|
||||
# Update the ZooKeeper node
|
||||
content = json.dumps(self.content).encode("utf-8")
|
||||
try:
|
||||
zstat = self.kazoo_client.set(
|
||||
self.path, content, version=self._zstat.version
|
||||
)
|
||||
self._zstat = zstat
|
||||
except NoNodeError:
|
||||
self.log.error("Could not update %s in ZooKeeper", self)
|
||||
|
||||
def register(self):
|
||||
path = "/".join([COMPONENTS_ROOT, self.kind, self.hostname])
|
||||
self.log.info("Registering component in ZooKeeper %s", path)
|
||||
self.path, self._zstat = self.kazoo_client.create(
|
||||
path,
|
||||
json.dumps(self.content).encode("utf-8"),
|
||||
makepath=True,
|
||||
ephemeral=True,
|
||||
sequence=True,
|
||||
# Also return the zstat, which is necessary to successfully update
|
||||
# the component.
|
||||
include_data=True,
|
||||
)
|
||||
with self.register_lock:
|
||||
path = "/".join([COMPONENTS_ROOT, self.kind, self.hostname])
|
||||
self.log.info("Registering component in ZooKeeper %s", path)
|
||||
self.path, self._zstat = self.kazoo_client.create(
|
||||
path,
|
||||
json.dumps(self.content).encode("utf-8"),
|
||||
makepath=True,
|
||||
ephemeral=True,
|
||||
sequence=True,
|
||||
# Also return the zstat, which is necessary to successfully
|
||||
# update the component.
|
||||
include_data=True,
|
||||
)
|
||||
|
||||
def _onReconnect(self):
|
||||
self.register()
|
||||
|
||||
Reference in New Issue
Block a user