Update node during lockNode

After locking a node, we usually update the node to double-check
whether its state changed on us. This is important to avoid operating on
stale data, as the node data is only guaranteed to be current if we
update it after getting the lock (regardless of whether we operate on
cached data or not). However, it is cumbersome to do this after every
lockNode call, so doing it centrally reduces the risk of forgetting it.

This does not introduce any behavior change.

Change-Id: I06001e487041a3b67d969070b43f491c4ed3dce0
This commit is contained in:
Tobias Henkel 2018-11-08 09:09:49 +01:00
parent 3cb6e7f0c9
commit b6a3e319e7
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
4 changed files with 87 additions and 61 deletions

View File

@ -173,15 +173,14 @@ class StaticNodeProvider(Provider):
if original_attrs == new_attrs: if original_attrs == new_attrs:
continue continue
node.type = static_node["labels"]
node.username = static_node["username"]
node.connection_port = static_node["connection-port"]
node.connection_type = static_node["connection-type"]
nodeutils.set_node_ip(node)
node.host_keys = host_keys
try: try:
self.zk.lockNode(node, blocking=False) self.zk.lockNode(node, blocking=False)
node.type = static_node["labels"]
node.username = static_node["username"]
node.connection_port = static_node["connection-port"]
node.connection_type = static_node["connection-type"]
nodeutils.set_node_ip(node)
node.host_keys = host_keys
except exceptions.ZKLockException: except exceptions.ZKLockException:
self.log.warning("Unable to lock node %s for update", node.id) self.log.warning("Unable to lock node %s for update", node.id)
continue continue

View File

@ -510,10 +510,8 @@ class CleanupWorker(BaseCleanupWorker):
continue continue
# Double check the state now that we have a lock since it # Double check the state now that we have a lock since it
# may have changed on us. We keep using the original node # may have changed on us.
# since it's holding the lock. if node.state != zk.READY:
_node = zk_conn.getNode(node.id)
if _node.state != zk.READY:
zk_conn.unlockNode(node) zk_conn.unlockNode(node)
continue continue
@ -564,10 +562,8 @@ class CleanupWorker(BaseCleanupWorker):
continue continue
# Double check the state now that we have a lock since it # Double check the state now that we have a lock since it
# may have changed on us. We keep using the original node # may have changed on us.
# since it's holding the lock. if node.state != zk.HOLD:
_node = zk_conn.getNode(node.id)
if _node.state != zk.HOLD:
zk_conn.unlockNode(node) zk_conn.unlockNode(node)
continue continue
@ -677,10 +673,8 @@ class DeletedNodeWorker(BaseCleanupWorker):
continue continue
# Double check the state now that we have a lock since it # Double check the state now that we have a lock since it
# may have changed on us. We keep using the original node # may have changed on us.
# since it's holding the lock. if node.state not in cleanup_states:
_node = zk_conn.getNode(node.id)
if _node.state not in cleanup_states:
zk_conn.unlockNode(node) zk_conn.unlockNode(node)
continue continue

View File

@ -860,9 +860,9 @@ class TestLauncher(tests.DBTestCase):
node = nodes[0] node = nodes[0]
self.log.debug("Holding node %s..." % node.id) self.log.debug("Holding node %s..." % node.id)
# hold the node # hold the node
self.zk.lockNode(node, blocking=False)
node.state = zk.HOLD node.state = zk.HOLD
node.comment = 'testing' node.comment = 'testing'
self.zk.lockNode(node, blocking=False)
self.zk.storeNode(node) self.zk.storeNode(node)
self.zk.unlockNode(node) self.zk.unlockNode(node)
znode = self.zk.getNode(node.id) znode = self.zk.getNode(node.id)
@ -885,10 +885,10 @@ class TestLauncher(tests.DBTestCase):
node = nodes[0] node = nodes[0]
self.log.debug("Holding node %s..." % node.id) self.log.debug("Holding node %s..." % node.id)
# hold the node # hold the node
self.zk.lockNode(node, blocking=False)
node.state = zk.HOLD node.state = zk.HOLD
node.comment = 'testing' node.comment = 'testing'
node.hold_expiration = 1 node.hold_expiration = 1
self.zk.lockNode(node, blocking=False)
self.zk.storeNode(node) self.zk.storeNode(node)
self.zk.unlockNode(node) self.zk.unlockNode(node)
znode = self.zk.getNode(node.id) znode = self.zk.getNode(node.id)
@ -911,10 +911,10 @@ class TestLauncher(tests.DBTestCase):
node = nodes[0] node = nodes[0]
self.log.debug("Holding node %s..." % node.id) self.log.debug("Holding node %s..." % node.id)
# hold the node # hold the node
self.zk.lockNode(node, blocking=False)
node.state = zk.HOLD node.state = zk.HOLD
node.comment = 'testing' node.comment = 'testing'
node.hold_expiration = '1' node.hold_expiration = '1'
self.zk.lockNode(node, blocking=False)
self.zk.storeNode(node) self.zk.storeNode(node)
self.zk.unlockNode(node) self.zk.unlockNode(node)
znode = self.zk.getNode(node.id) znode = self.zk.getNode(node.id)
@ -936,10 +936,10 @@ class TestLauncher(tests.DBTestCase):
node = nodes[0] node = nodes[0]
self.log.debug("Holding node %s..." % node.id) self.log.debug("Holding node %s..." % node.id)
# hold the node # hold the node
self.zk.lockNode(node, blocking=False)
node.state = zk.HOLD node.state = zk.HOLD
node.comment = 'testing' node.comment = 'testing'
node.hold_expiration = 'notanumber' node.hold_expiration = 'notanumber'
self.zk.lockNode(node, blocking=False)
self.zk.storeNode(node) self.zk.storeNode(node)
self.zk.unlockNode(node) self.zk.unlockNode(node)
znode = self.zk.getNode(node.id) znode = self.zk.getNode(node.id)
@ -965,15 +965,15 @@ class TestLauncher(tests.DBTestCase):
self.log.debug("Holding node %s...(%s seconds)" % (node_custom.id, self.log.debug("Holding node %s...(%s seconds)" % (node_custom.id,
hold_expiration)) hold_expiration))
# hold the nodes # hold the nodes
self.zk.lockNode(node, blocking=False)
node.state = zk.HOLD node.state = zk.HOLD
node.comment = 'testing' node.comment = 'testing'
node_custom.state = zk.HOLD
node_custom.comment = 'testing hold_expiration'
node_custom.hold_expiration = hold_expiration
self.zk.lockNode(node, blocking=False)
self.zk.storeNode(node) self.zk.storeNode(node)
self.zk.unlockNode(node) self.zk.unlockNode(node)
self.zk.lockNode(node_custom, blocking=False) self.zk.lockNode(node_custom, blocking=False)
node_custom.state = zk.HOLD
node_custom.comment = 'testing hold_expiration'
node_custom.hold_expiration = hold_expiration
self.zk.storeNode(node_custom) self.zk.storeNode(node_custom)
self.zk.unlockNode(node_custom) self.zk.unlockNode(node_custom)
znode = self.zk.getNode(node.id) znode = self.zk.getNode(node.id)
@ -1010,15 +1010,15 @@ class TestLauncher(tests.DBTestCase):
self.log.debug("Holding node %s...(%s seconds)" % (node_custom.id, self.log.debug("Holding node %s...(%s seconds)" % (node_custom.id,
hold_expiration)) hold_expiration))
# hold the nodes # hold the nodes
self.zk.lockNode(node, blocking=False)
node.state = zk.HOLD node.state = zk.HOLD
node.comment = 'testing' node.comment = 'testing'
node_custom.state = zk.HOLD
node_custom.comment = 'testing hold_expiration'
node_custom.hold_expiration = hold_expiration
self.zk.lockNode(node, blocking=False)
self.zk.storeNode(node) self.zk.storeNode(node)
self.zk.unlockNode(node) self.zk.unlockNode(node)
self.zk.lockNode(node_custom, blocking=False) self.zk.lockNode(node_custom, blocking=False)
node_custom.state = zk.HOLD
node_custom.comment = 'testing hold_expiration'
node_custom.hold_expiration = hold_expiration
self.zk.storeNode(node_custom) self.zk.storeNode(node_custom)
self.zk.unlockNode(node_custom) self.zk.unlockNode(node_custom)
znode = self.zk.getNode(node.id) znode = self.zk.getNode(node.id)

View File

@ -613,43 +613,53 @@ class Node(BaseModel):
''' '''
o = Node(o_id) o = Node(o_id)
super(Node, o).fromDict(d) super(Node, o).fromDict(d)
o.cloud = d.get('cloud')
o.provider = d.get('provider') o.updateFromDict(d)
o.pool = d.get('pool') return o
o.type = d.get('type')
o.allocated_to = d.get('allocated_to') def updateFromDict(self, d):
o.az = d.get('az') '''
o.region = d.get('region') Updates the Node object from a dictionary
o.public_ipv4 = d.get('public_ipv4')
o.private_ipv4 = d.get('private_ipv4') :param dict d: The dictionary
o.public_ipv6 = d.get('public_ipv6') '''
o.interface_ip = d.get('interface_ip') super().fromDict(d)
o.connection_port = d.get('connection_port', d.get('ssh_port', 22)) self.cloud = d.get('cloud')
o.image_id = d.get('image_id') self.provider = d.get('provider')
o.launcher = d.get('launcher') self.pool = d.get('pool')
o.created_time = d.get('created_time') self.type = d.get('type')
o.external_id = d.get('external_id') self.allocated_to = d.get('allocated_to')
o.hostname = d.get('hostname') self.az = d.get('az')
o.comment = d.get('comment') self.region = d.get('region')
o.hold_job = d.get('hold_job') self.public_ipv4 = d.get('public_ipv4')
o.username = d.get('username', 'zuul') self.private_ipv4 = d.get('private_ipv4')
o.connection_type = d.get('connection_type') self.public_ipv6 = d.get('public_ipv6')
o.host_keys = d.get('host_keys', []) self.interface_ip = d.get('interface_ip')
self.connection_port = d.get('connection_port', d.get('ssh_port', 22))
self.image_id = d.get('image_id')
self.launcher = d.get('launcher')
self.created_time = d.get('created_time')
self.external_id = d.get('external_id')
self.hostname = d.get('hostname')
self.comment = d.get('comment')
self.hold_job = d.get('hold_job')
self.username = d.get('username', 'zuul')
self.connection_type = d.get('connection_type')
self.host_keys = d.get('host_keys', [])
hold_expiration = d.get('hold_expiration') hold_expiration = d.get('hold_expiration')
if hold_expiration is not None: if hold_expiration is not None:
try: try:
# We try to force this to an integer value because we do # We try to force this to an integer value because we do
# relative second based age comparisons using this value # relative second based age comparisons using this value
# and those need to be a number type. # and those need to be a number type.
o.hold_expiration = int(hold_expiration) self.hold_expiration = int(hold_expiration)
except ValueError: except ValueError:
# Coercion to int failed, just use default of 0, # Coercion to int failed, just use default of 0,
# which means no expiration # which means no expiration
o.hold_expiration = 0 self.hold_expiration = 0
else: else:
o.hold_expiration = hold_expiration self.hold_expiration = hold_expiration
o.resources = d.get('resources') self.resources = d.get('resources')
return o
class ZooKeeper(object): class ZooKeeper(object):
@ -1650,7 +1660,9 @@ class ZooKeeper(object):
Lock a node. Lock a node.
This will set the `lock` attribute of the Node object when the This will set the `lock` attribute of the Node object when the
lock is successfully acquired. lock is successfully acquired. Also this will update the node with the
latest data after acquiring the lock in order to guarantee that it has
the latest state if locking was successful.
:param Node node: The node to lock. :param Node node: The node to lock.
:param bool blocking: Whether or not to block on trying to :param bool blocking: Whether or not to block on trying to
@ -1680,6 +1692,9 @@ class ZooKeeper(object):
node.lock = lock node.lock = lock
# Do an in-place update of the node so we have the latest data.
self.updateNode(node)
def unlockNode(self, node): def unlockNode(self, node):
''' '''
Unlock a node. Unlock a node.
@ -1743,6 +1758,25 @@ class ZooKeeper(object):
d.stat = stat d.stat = stat
return d return d
def updateNode(self, node):
'''
Update the data of a node object in-place
:param node: The node object
'''
path = self._nodePath(node.id)
data, stat = self.client.get(path)
if data:
d = self._bytesToDict(data)
else:
# The node exists but has no data so use empty dict.
d = {}
node.updateFromDict(d)
node.stat = stat
def storeNode(self, node): def storeNode(self, node):
''' '''
Store an new or existing node. Store an new or existing node.
@ -1855,8 +1889,7 @@ class ZooKeeper(object):
continue continue
# Make sure the state didn't change on us # Make sure the state didn't change on us
n = self.getNode(node.id) if node.state != READY:
if n.state != READY:
self.unlockNode(node) self.unlockNode(node)
continue continue