Merge "Fix race in metastatic allocation"
commit 2edef78a87
@@ -270,6 +270,10 @@ class MetastaticAdapter(statemachine.Adapter):
         # be well after this point.
         self.performed_init = False
         self.init_lock = threading.Lock()
+        # Allocation of new nodes to backing nodes happens in one
+        # thread while cleanup of unused backing nodes happens in
+        # another. Use a lock to serialize updates between the two.
+        self.allocation_lock = threading.Lock()
 
     @property
     def zk(self):
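
For context on the race this lock addresses: the allocation path appends to the per-label record lists while the periodic cleanup path iterates those lists and removes entries, each from its own thread. The following is a minimal illustrative sketch, not the nodepool code itself (the records, allocate, and cleanup names are invented); it shows the pattern of serializing both paths with one threading.Lock so their read-modify-write steps on the shared dict of lists cannot interleave.

    import threading

    # Hypothetical stand-in for self.backing_node_records: label -> list of records.
    records = {}
    allocation_lock = threading.Lock()

    def allocate(label, node_id):
        # Allocation thread: reuse a record with a free slot, or append a new one.
        with allocation_lock:
            nodelist = records.setdefault(label, [])
            for rec in nodelist:
                if len(rec["nodes"]) < rec["slots"]:
                    rec["nodes"].append(node_id)
                    return rec
            rec = {"slots": 2, "nodes": [node_id]}
            nodelist.append(rec)
            return rec

    def cleanup():
        # Cleanup thread: drop records that are empty. Without the lock this
        # iteration can interleave with allocate(); with it, each pass sees a
        # consistent snapshot of the shared structure.
        with allocation_lock:
            for label, nodelist in records.items():
                for rec in nodelist[:]:
                    if not rec["nodes"]:
                        nodelist.remove(rec)

    if __name__ == "__main__":
        threads = [threading.Thread(target=allocate, args=("small", i))
                   for i in range(10)]
        threads.append(threading.Thread(target=cleanup))
        for t in threads:
            t.start()
        for t in threads:
            t.join()
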
@@ -290,26 +294,27 @@ class MetastaticAdapter(statemachine.Adapter):
         # Since this is called periodically, this is a good place to
         # see about deleting unused backing nodes.
         now = time.time()
-        for label_name, backing_node_records in \
-                self.backing_node_records.items():
-            for bnr in backing_node_records[:]:
-                label_config = self.provider._getLabel(bnr.label_name)
-                if label_config:
-                    grace_time = label_config.grace_time
-                else:
-                    # The label doesn't exist in our config any more,
-                    # it must have been removed.
-                    grace_time = 0
-                if (bnr.isEmpty() and
-                        now - bnr.last_used > grace_time):
-                    self.log.info("Backing node %s has been idle for "
-                                  "%s seconds, releasing",
-                                  bnr.node_id, now - bnr.last_used)
-                    node = self._getNode(bnr.node_id)
-                    node.state = zk.USED
-                    self.zk.storeNode(node)
-                    self.zk.forceUnlockNode(node)
-                    backing_node_records.remove(bnr)
+        with self.allocation_lock:
+            for label_name, backing_node_records in \
+                    self.backing_node_records.items():
+                for bnr in backing_node_records[:]:
+                    label_config = self.provider._getLabel(bnr.label_name)
+                    if label_config:
+                        grace_time = label_config.grace_time
+                    else:
+                        # The label doesn't exist in our config any more,
+                        # it must have been removed.
+                        grace_time = 0
+                    if (bnr.isEmpty() and
+                            now - bnr.last_used > grace_time):
+                        self.log.info("Backing node %s has been idle for "
+                                      "%s seconds, releasing",
+                                      bnr.node_id, now - bnr.last_used)
+                        node = self._getNode(bnr.node_id)
+                        node.state = zk.USED
+                        self.zk.storeNode(node)
+                        self.zk.forceUnlockNode(node)
+                        backing_node_records.remove(bnr)
         return []
 
     def deleteResource(self, resource):
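
A side note on the backing_node_records[:] copy in the loop above: removing items from a list while iterating over that same list skips elements, so the cleanup iterates over a shallow copy and removes from the original. A minimal generic illustration (not nodepool code):

    items = [1, 2, 3, 4]

    # Iterating the list directly while removing would skip the element after
    # each removal; iterating a [:] copy lets every element be examined.
    for item in items[:]:
        if item % 2 == 0:
            items.remove(item)

    assert items == [1, 3]
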
@@ -383,82 +388,88 @@ class MetastaticAdapter(statemachine.Adapter):
         self._init()
         # if we have room for the label, allocate and return existing slot
         # otherwise, make a new backing node
-        backing_node_record = None
-        for bnr in self.backing_node_records.get(label.name, []):
-            if bnr.hasAvailableSlot():
-                backing_node_record = bnr
-                break
-        if backing_node_record is None:
-            req = zk.NodeRequest()
-            req.node_types = [label.backing_label]
-            req.state = zk.REQUESTED
-            req.requestor = self.my_id
-            self.zk.storeNodeRequest(req, priority='100')
-            backing_node_record = BackingNodeRecord(
-                label.name, label.max_parallel_jobs)
-            backing_node_record.request_id = req.id
-            self._addBackingNode(label.name, backing_node_record)
-        backing_node_log = (backing_node_record.node_id or
-                            f'request {backing_node_record.request_id}')
-        slot = backing_node_record.allocateSlot(node_id)
-        self.log.info("Assigned node %s to backing node %s slot %s",
-                      node_id, backing_node_log, slot)
-        return backing_node_record, slot
+        with self.allocation_lock:
+            backing_node_record = None
+            for bnr in self.backing_node_records.get(label.name, []):
+                if bnr.hasAvailableSlot():
+                    backing_node_record = bnr
+                    break
+            if backing_node_record is None:
+                req = zk.NodeRequest()
+                req.node_types = [label.backing_label]
+                req.state = zk.REQUESTED
+                req.requestor = self.my_id
+                self.zk.storeNodeRequest(req, priority='100')
+                backing_node_record = BackingNodeRecord(
+                    label.name, label.max_parallel_jobs)
+                backing_node_record.request_id = req.id
+                self._addBackingNode(label.name, backing_node_record)
+            backing_node_log = (backing_node_record.node_id or
+                                f'request {backing_node_record.request_id}')
+            slot = backing_node_record.allocateSlot(node_id)
+            self.log.info("Assigned node %s to backing node %s slot %s",
+                          node_id, backing_node_log, slot)
+            return backing_node_record, slot
 
     def _addBackingNode(self, label_name, backing_node_record):
+        # We hold the allocation lock already
         nodelist = self.backing_node_records.setdefault(label_name, [])
         nodelist.append(backing_node_record)
 
     def _deallocateBackingNode(self, node_id):
         self._init()
-        for label_name, backing_node_records in \
-                self.backing_node_records.items():
-            for bn in backing_node_records:
-                if bn.backsNode(node_id):
-                    slot = bn.deallocateSlot(node_id)
-                    self.log.info(
-                        "Unassigned node %s from backing node %s slot %s",
-                        node_id, bn.node_id, slot)
-                    return
+        with self.allocation_lock:
+            for label_name, backing_node_records in \
+                    self.backing_node_records.items():
+                for bn in backing_node_records:
+                    if bn.backsNode(node_id):
+                        slot = bn.deallocateSlot(node_id)
+                        self.log.info(
+                            "Unassigned node %s from backing node %s slot %s",
+                            node_id, bn.node_id, slot)
+                        return
 
     def _checkBackingNodeRequests(self):
         self._init()
-        waiting_requests = {}
-        for label_name, backing_node_records in \
-                self.backing_node_records.items():
-            for bnr in backing_node_records:
-                if bnr.request_id:
-                    waiting_requests[bnr.request_id] = bnr
-        if not waiting_requests:
-            return
-        for request in self.zk.nodeRequestIterator():
-            if request.id not in waiting_requests:
-                continue
-            if request.state == zk.FAILED:
-                self.log.error("Backing request %s failed", request.id)
-                for label_name, records in self.backing_node_records.items():
-                    for bnr in records[:]:
-                        if bnr.request_id == request.id:
-                            bnr.failed = True
-                            records.remove(bnr)
-            if request.state == zk.FULFILLED:
-                bnr = waiting_requests[request.id]
-                node_id = request.nodes[0]
-                self.log.info("Backing request %s fulfilled with node id %s",
-                              request.id, node_id)
-                node = self._getNode(node_id)
-                self.zk.lockNode(node, blocking=True, timeout=30,
-                                 ephemeral=False, identifier=self.my_id)
-                node.user_data = json.dumps({
-                    'owner': self.my_id,
-                    'label': bnr.label_name,
-                    'slots': bnr.slot_count,
-                })
-                node.state = zk.IN_USE
-                self.zk.storeNode(node)
-                self.zk.deleteNodeRequest(request)
-                bnr.request_id = None
-                bnr.node_id = node_id
+        with self.allocation_lock:
+            waiting_requests = {}
+            for label_name, backing_node_records in \
+                    self.backing_node_records.items():
+                for bnr in backing_node_records:
+                    if bnr.request_id:
+                        waiting_requests[bnr.request_id] = bnr
+            if not waiting_requests:
+                return
+            for request in self.zk.nodeRequestIterator():
+                if request.id not in waiting_requests:
+                    continue
+                if request.state == zk.FAILED:
+                    self.log.error("Backing request %s failed", request.id)
+                    for label_name, records in \
+                            self.backing_node_records.items():
+                        for bnr in records[:]:
+                            if bnr.request_id == request.id:
+                                bnr.failed = True
+                                records.remove(bnr)
+                if request.state == zk.FULFILLED:
+                    bnr = waiting_requests[request.id]
+                    node_id = request.nodes[0]
+                    self.log.info(
+                        "Backing request %s fulfilled with node id %s",
+                        request.id, node_id)
+                    node = self._getNode(node_id)
+                    self.zk.lockNode(node, blocking=True, timeout=30,
+                                     ephemeral=False, identifier=self.my_id)
+                    node.user_data = json.dumps({
+                        'owner': self.my_id,
+                        'label': bnr.label_name,
+                        'slots': bnr.slot_count,
+                    })
+                    node.state = zk.IN_USE
+                    self.zk.storeNode(node)
+                    self.zk.deleteNodeRequest(request)
+                    bnr.request_id = None
+                    bnr.node_id = node_id
 
     def _getNode(self, node_id):
         return self.zk.getNode(node_id)
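
The "We hold the allocation lock already" comment added to _addBackingNode matters because threading.Lock is not reentrant: _addBackingNode is called from inside the new "with self.allocation_lock:" block in the allocation path, so acquiring the lock again there would deadlock the thread on itself. A minimal generic illustration of the distinction (not nodepool code; the helper name is invented):

    import threading

    plain = threading.Lock()
    reentrant = threading.RLock()

    def helper(lock):
        # If the caller already holds a plain Lock, acquiring it again here
        # would block forever; an RLock allows nested acquisition by the
        # same thread. The timeout keeps the demonstration from hanging.
        acquired = lock.acquire(timeout=1)
        if acquired:
            lock.release()
        return acquired

    with reentrant:
        assert helper(reentrant) is True   # RLock: same thread may re-acquire.

    with plain:
        assert helper(plain) is False      # Lock: second acquire times out.
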