Merge "Update node request during locking"
This commit is contained in:
commit
00a81d63dc
|
@ -200,8 +200,7 @@ class PoolWorker(threading.Thread):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Make sure the state didn't change on us after getting the lock
|
# Make sure the state didn't change on us after getting the lock
|
||||||
req2 = self.zk.getNodeRequest(req_id)
|
if req.state != zk.REQUESTED:
|
||||||
if req2 and req2.state != zk.REQUESTED:
|
|
||||||
self.zk.unlockNodeRequest(req)
|
self.zk.unlockNodeRequest(req)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
|
@ -475,13 +475,17 @@ class NodeRequest(BaseModel):
|
||||||
'''
|
'''
|
||||||
o = NodeRequest(o_id)
|
o = NodeRequest(o_id)
|
||||||
super(NodeRequest, o).fromDict(d)
|
super(NodeRequest, o).fromDict(d)
|
||||||
o.declined_by = d.get('declined_by', [])
|
o.updateFromDict(d)
|
||||||
o.node_types = d.get('node_types', [])
|
|
||||||
o.nodes = d.get('nodes', [])
|
|
||||||
o.reuse = d.get('reuse', True)
|
|
||||||
o.requestor = d.get('requestor')
|
|
||||||
return o
|
return o
|
||||||
|
|
||||||
|
def updateFromDict(self, d):
|
||||||
|
super().fromDict(d)
|
||||||
|
self.declined_by = d.get('declined_by', [])
|
||||||
|
self.node_types = d.get('node_types', [])
|
||||||
|
self.nodes = d.get('nodes', [])
|
||||||
|
self.reuse = d.get('reuse', True)
|
||||||
|
self.requestor = d.get('requestor')
|
||||||
|
|
||||||
|
|
||||||
class Node(BaseModel):
|
class Node(BaseModel):
|
||||||
'''
|
'''
|
||||||
|
@ -1586,6 +1590,24 @@ class ZooKeeper(object):
|
||||||
d.stat = stat
|
d.stat = stat
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
def updateNodeRequest(self, request):
|
||||||
|
'''
|
||||||
|
Update the data of a node request object in-place
|
||||||
|
|
||||||
|
:param request: the node request object to update
|
||||||
|
'''
|
||||||
|
|
||||||
|
path = self._requestPath(request.id)
|
||||||
|
data, stat = self.client.get(path)
|
||||||
|
|
||||||
|
if data:
|
||||||
|
d = self._bytesToDict(data)
|
||||||
|
else:
|
||||||
|
d = {}
|
||||||
|
|
||||||
|
request.updateFromDict(d)
|
||||||
|
request.stat = stat
|
||||||
|
|
||||||
def storeNodeRequest(self, request, priority="100"):
|
def storeNodeRequest(self, request, priority="100"):
|
||||||
'''
|
'''
|
||||||
Store a new or existing node request.
|
Store a new or existing node request.
|
||||||
|
@ -1632,7 +1654,9 @@ class ZooKeeper(object):
|
||||||
Lock a node request.
|
Lock a node request.
|
||||||
|
|
||||||
This will set the `lock` attribute of the request object when the
|
This will set the `lock` attribute of the request object when the
|
||||||
lock is successfully acquired.
|
lock is successfully acquired. Also this will update the node request
|
||||||
|
with the latest data after acquiring the lock in order to guarantee
|
||||||
|
that it has the latest state if locking was successful.
|
||||||
|
|
||||||
:param NodeRequest request: The request to lock.
|
:param NodeRequest request: The request to lock.
|
||||||
:param bool blocking: Whether or not to block on trying to
|
:param bool blocking: Whether or not to block on trying to
|
||||||
|
@ -1662,6 +1686,9 @@ class ZooKeeper(object):
|
||||||
|
|
||||||
request.lock = lock
|
request.lock = lock
|
||||||
|
|
||||||
|
# Do an in-place update of the node request so we have the latest data
|
||||||
|
self.updateNodeRequest(request)
|
||||||
|
|
||||||
def unlockNodeRequest(self, request):
|
def unlockNodeRequest(self, request):
|
||||||
'''
|
'''
|
||||||
Unlock a node request.
|
Unlock a node request.
|
||||||
|
|
Loading…
Reference in New Issue