Merge "Lock nodes when nodepool request is fulfilled" into feature/zuulv3
This commit is contained in:
commit
379ac33d2f
|
@ -919,14 +919,45 @@ class FakeNodepool(object):
|
||||||
reqs.append(data)
|
reqs.append(data)
|
||||||
return reqs
|
return reqs
|
||||||
|
|
||||||
|
def makeNode(self, request_id, node_type):
|
||||||
|
now = time.time()
|
||||||
|
path = '/nodepool/nodes/'
|
||||||
|
data = dict(type=node_type,
|
||||||
|
provider='test-provider',
|
||||||
|
region='test-region',
|
||||||
|
az=None,
|
||||||
|
public_ipv4='127.0.0.1',
|
||||||
|
private_ipv4=None,
|
||||||
|
public_ipv6=None,
|
||||||
|
allocated_to=request_id,
|
||||||
|
state='ready',
|
||||||
|
state_time=now,
|
||||||
|
created_time=now,
|
||||||
|
updated_time=now,
|
||||||
|
image_id=None,
|
||||||
|
launcher='fake-nodepool')
|
||||||
|
data = json.dumps(data)
|
||||||
|
path = self.client.create(path, data,
|
||||||
|
makepath=True,
|
||||||
|
sequence=True)
|
||||||
|
nodeid = path.split("/")[-1]
|
||||||
|
return nodeid
|
||||||
|
|
||||||
def fulfillRequest(self, request):
|
def fulfillRequest(self, request):
|
||||||
if request['state'] == 'fulfilled':
|
if request['state'] == 'fulfilled':
|
||||||
return
|
return
|
||||||
request = request.copy()
|
request = request.copy()
|
||||||
request['state'] = 'fulfilled'
|
|
||||||
request['state_time'] = time.time()
|
|
||||||
oid = request['_oid']
|
oid = request['_oid']
|
||||||
del request['_oid']
|
del request['_oid']
|
||||||
|
|
||||||
|
nodes = []
|
||||||
|
for node in request['node_types']:
|
||||||
|
nodeid = self.makeNode(oid, node)
|
||||||
|
nodes.append(nodeid)
|
||||||
|
|
||||||
|
request['state'] = 'fulfilled'
|
||||||
|
request['state_time'] = time.time()
|
||||||
|
request['nodes'] = nodes
|
||||||
path = self.REQUEST_ROOT + '/' + oid
|
path = self.REQUEST_ROOT + '/' + oid
|
||||||
data = json.dumps(request)
|
data = json.dumps(request)
|
||||||
self.log.debug("Fulfilling node request: %s %s" % (oid, data))
|
self.log.debug("Fulfilling node request: %s %s" % (oid, data))
|
||||||
|
|
|
@ -70,6 +70,14 @@ class TestNodepool(BaseTestCase):
|
||||||
self.assertEqual(len(self.provisioned_requests), 1)
|
self.assertEqual(len(self.provisioned_requests), 1)
|
||||||
self.assertEqual(request.state, 'fulfilled')
|
self.assertEqual(request.state, 'fulfilled')
|
||||||
|
|
||||||
|
# Accept the nodes
|
||||||
|
self.nodepool.acceptNodes(request)
|
||||||
|
nodeset = request.nodeset
|
||||||
|
|
||||||
|
for node in nodeset.getNodes():
|
||||||
|
self.assertIsNotNone(node.lock)
|
||||||
|
self.assertEqual(node.state, 'ready')
|
||||||
|
|
||||||
def test_node_request_disconnect(self):
|
def test_node_request_disconnect(self):
|
||||||
# Test that node requests are re-submitted after disconnect
|
# Test that node requests are re-submitted after disconnect
|
||||||
|
|
||||||
|
|
|
@ -615,6 +615,7 @@ class PipelineManager(object):
|
||||||
item.setUnableToMerge()
|
item.setUnableToMerge()
|
||||||
|
|
||||||
def onNodesProvisioned(self, event):
|
def onNodesProvisioned(self, event):
|
||||||
|
# TODOv3(jeblair): handle provisioning failure here
|
||||||
request = event.request
|
request = event.request
|
||||||
build_set = request.build_set
|
build_set = request.build_set
|
||||||
build_set.jobNodeRequestComplete(request.job.name, request,
|
build_set.jobNodeRequestComplete(request.job.name, request,
|
||||||
|
|
|
@ -354,10 +354,36 @@ class Node(object):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.image = image
|
self.image = image
|
||||||
self.id = None
|
self.id = None
|
||||||
|
self.lock = None
|
||||||
|
# Attributes from Nodepool
|
||||||
|
self._state = 'unknown'
|
||||||
|
self.state_time = time.time()
|
||||||
|
self.public_ipv4 = None
|
||||||
|
self.private_ipv4 = None
|
||||||
|
self.public_ipv6 = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def state(self):
|
||||||
|
return self._state
|
||||||
|
|
||||||
|
@state.setter
|
||||||
|
def state(self, value):
|
||||||
|
# TODOv3(jeblair): reinstate
|
||||||
|
# if value not in STATES:
|
||||||
|
# raise TypeError("'%s' is not a valid state" % value)
|
||||||
|
self._state = value
|
||||||
|
self.state_time = time.time()
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<Node %s %s:%s>' % (self.id, self.name, self.image)
|
return '<Node %s %s:%s>' % (self.id, self.name, self.image)
|
||||||
|
|
||||||
|
def updateFromDict(self, data):
|
||||||
|
self._state = data['state']
|
||||||
|
self.state_time = data['state_time']
|
||||||
|
self.public_ipv4 = data.get('public_ipv4')
|
||||||
|
self.private_ipv4 = data.get('private_ipv4')
|
||||||
|
self.public_ipv6 = data.get('public_ipv6')
|
||||||
|
|
||||||
|
|
||||||
class NodeSet(object):
|
class NodeSet(object):
|
||||||
"""A set of nodes.
|
"""A set of nodes.
|
||||||
|
@ -407,6 +433,9 @@ class NodeRequest(object):
|
||||||
self.stat = None
|
self.stat = None
|
||||||
self.uid = uuid4().hex
|
self.uid = uuid4().hex
|
||||||
self.id = None
|
self.id = None
|
||||||
|
# Zuul internal failure flag (not stored in ZK so it's not
|
||||||
|
# overwritten).
|
||||||
|
self.failed = False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state(self):
|
def state(self):
|
||||||
|
|
|
@ -41,6 +41,34 @@ class Nodepool(object):
|
||||||
def returnNodes(self, nodes, used=True):
|
def returnNodes(self, nodes, used=True):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def unlockNodeset(self, nodeset):
|
||||||
|
self._unlockNodes(nodeset.getNodes())
|
||||||
|
|
||||||
|
def _unlockNodes(self, nodes):
|
||||||
|
for node in nodes:
|
||||||
|
try:
|
||||||
|
self.sched.zk.unlockNode(node)
|
||||||
|
except Exception:
|
||||||
|
self.log.exception("Error unlocking node:")
|
||||||
|
|
||||||
|
def lockNodeset(self, nodeset):
|
||||||
|
self._lockNodes(nodeset.getNodes())
|
||||||
|
|
||||||
|
def _lockNodes(self, nodes):
|
||||||
|
# Try to lock all of the supplied nodes. If any lock fails,
|
||||||
|
# try to unlock any which have already been locked before
|
||||||
|
# re-raising the error.
|
||||||
|
locked_nodes = []
|
||||||
|
try:
|
||||||
|
for node in nodes:
|
||||||
|
self.log.debug("Locking node: %s" % (node,))
|
||||||
|
self.sched.zk.lockNode(node)
|
||||||
|
locked_nodes.append(node)
|
||||||
|
except Exception:
|
||||||
|
self.log.exception("Error locking nodes:")
|
||||||
|
self._unlockNodes(locked_nodes)
|
||||||
|
raise
|
||||||
|
|
||||||
def _updateNodeRequest(self, request, deleted):
|
def _updateNodeRequest(self, request, deleted):
|
||||||
# Return False to indicate that we should stop watching the
|
# Return False to indicate that we should stop watching the
|
||||||
# node.
|
# node.
|
||||||
|
@ -50,10 +78,45 @@ class Nodepool(object):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if request.state == 'fulfilled':
|
if request.state == 'fulfilled':
|
||||||
|
self.log.info("Node request %s fulfilled" % (request,))
|
||||||
|
|
||||||
|
# Give our results to the scheduler.
|
||||||
self.sched.onNodesProvisioned(request)
|
self.sched.onNodesProvisioned(request)
|
||||||
del self.requests[request.uid]
|
del self.requests[request.uid]
|
||||||
|
|
||||||
|
# Stop watching this request node.
|
||||||
return False
|
return False
|
||||||
|
# TODOv3(jeblair): handle allocation failure
|
||||||
elif deleted:
|
elif deleted:
|
||||||
self.log.debug("Resubmitting lost node request %s" % (request,))
|
self.log.debug("Resubmitting lost node request %s" % (request,))
|
||||||
self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
|
self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def acceptNodes(self, request):
|
||||||
|
# Called by the scheduler when it wants to accept and lock
|
||||||
|
# nodes for (potential) use.
|
||||||
|
|
||||||
|
self.log.debug("Accepting node request: %s" % (request,))
|
||||||
|
|
||||||
|
# First, try to lock the nodes.
|
||||||
|
locked = False
|
||||||
|
try:
|
||||||
|
self.lockNodeset(request.nodeset)
|
||||||
|
locked = True
|
||||||
|
except Exception:
|
||||||
|
self.log.exception("Error locking nodes:")
|
||||||
|
request.failed = True
|
||||||
|
|
||||||
|
# Regardless of whether locking succeeded, delete the
|
||||||
|
# request.
|
||||||
|
self.log.debug("Deleting node request: %s" % (request,))
|
||||||
|
try:
|
||||||
|
self.sched.zk.deleteNodeRequest(request)
|
||||||
|
except Exception:
|
||||||
|
self.log.exception("Error deleting node request:")
|
||||||
|
request.failed = True
|
||||||
|
# If deleting the request failed, and we did lock the
|
||||||
|
# nodes, unlock the nodes since we're not going to use
|
||||||
|
# them.
|
||||||
|
if locked:
|
||||||
|
self.unlockNodeset(request.nodeset)
|
||||||
|
|
|
@ -800,6 +800,14 @@ class Scheduler(threading.Thread):
|
||||||
def _doNodesProvisionedEvent(self, event):
|
def _doNodesProvisionedEvent(self, event):
|
||||||
request = event.request
|
request = event.request
|
||||||
build_set = request.build_set
|
build_set = request.build_set
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.nodepool.acceptNodes(request)
|
||||||
|
except Exception:
|
||||||
|
self.log.exception("Unable to accept nodes from request %s:"
|
||||||
|
% (request,))
|
||||||
|
return
|
||||||
|
|
||||||
if build_set is not build_set.item.current_build_set:
|
if build_set is not build_set.item.current_build_set:
|
||||||
self.log.warning("Build set %s is not current" % (build_set,))
|
self.log.warning("Build set %s is not current" % (build_set,))
|
||||||
self.nodepool.returnNodes(request.nodes, used=False)
|
self.nodepool.returnNodes(request.nodes, used=False)
|
||||||
|
|
69
zuul/zk.py
69
zuul/zk.py
|
@ -17,6 +17,8 @@ import logging
|
||||||
import six
|
import six
|
||||||
import time
|
import time
|
||||||
from kazoo.client import KazooClient, KazooState
|
from kazoo.client import KazooClient, KazooState
|
||||||
|
from kazoo import exceptions as kze
|
||||||
|
from kazoo.recipe.lock import Lock
|
||||||
|
|
||||||
# States:
|
# States:
|
||||||
# We are building this node but it is not ready for use.
|
# We are building this node but it is not ready for use.
|
||||||
|
@ -29,6 +31,10 @@ DELETING = 'deleting'
|
||||||
STATES = set([BUILDING, READY, DELETING])
|
STATES = set([BUILDING, READY, DELETING])
|
||||||
|
|
||||||
|
|
||||||
|
class LockException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class ZooKeeperConnectionConfig(object):
|
class ZooKeeperConnectionConfig(object):
|
||||||
'''
|
'''
|
||||||
Represents the connection parameters for a ZooKeeper server.
|
Represents the connection parameters for a ZooKeeper server.
|
||||||
|
@ -178,6 +184,7 @@ class ZooKeeper(object):
|
||||||
log = logging.getLogger("zuul.zk.ZooKeeper")
|
log = logging.getLogger("zuul.zk.ZooKeeper")
|
||||||
|
|
||||||
REQUEST_ROOT = '/nodepool/requests'
|
REQUEST_ROOT = '/nodepool/requests'
|
||||||
|
NODE_ROOT = '/nodepool/nodes'
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
'''
|
'''
|
||||||
|
@ -300,7 +307,69 @@ class ZooKeeper(object):
|
||||||
if data:
|
if data:
|
||||||
data = self._strToDict(data)
|
data = self._strToDict(data)
|
||||||
node_request.updateFromDict(data)
|
node_request.updateFromDict(data)
|
||||||
|
request_nodes = node_request.nodeset.getNodes()
|
||||||
|
for i, nodeid in enumerate(data.get('nodes', [])):
|
||||||
|
node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
|
||||||
|
node_data, node_stat = self.client.get(node_path)
|
||||||
|
node_data = self._strToDict(node_data)
|
||||||
|
request_nodes[i].id = nodeid
|
||||||
|
request_nodes[i].updateFromDict(node_data)
|
||||||
deleted = (data is None) # data *are* none
|
deleted = (data is None) # data *are* none
|
||||||
return watcher(node_request, deleted)
|
return watcher(node_request, deleted)
|
||||||
|
|
||||||
self.client.DataWatch(path, callback)
|
self.client.DataWatch(path, callback)
|
||||||
|
|
||||||
|
def deleteNodeRequest(self, node_request):
|
||||||
|
'''
|
||||||
|
Delete a request for nodes.
|
||||||
|
|
||||||
|
:param NodeRequest node_request: A NodeRequest with the
|
||||||
|
contents of the request.
|
||||||
|
'''
|
||||||
|
|
||||||
|
path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
|
||||||
|
try:
|
||||||
|
self.client.delete(path)
|
||||||
|
except kze.NoNodeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def lockNode(self, node, blocking=True, timeout=None):
|
||||||
|
'''
|
||||||
|
Lock a node.
|
||||||
|
|
||||||
|
This should be called as soon as a request is fulfilled and
|
||||||
|
the lock held for as long as the node is in-use. It can be
|
||||||
|
used by nodepool to detect if Zuul has gone offline and the
|
||||||
|
node should be reclaimed.
|
||||||
|
|
||||||
|
:param Node node: The node which should be locked.
|
||||||
|
'''
|
||||||
|
|
||||||
|
lock_path = '%s/%s/lock' % (self.NODE_ROOT, node.id)
|
||||||
|
try:
|
||||||
|
lock = Lock(self.client, lock_path)
|
||||||
|
have_lock = lock.acquire(blocking, timeout)
|
||||||
|
except kze.LockTimeout:
|
||||||
|
raise LockException(
|
||||||
|
"Timeout trying to acquire lock %s" % lock_path)
|
||||||
|
|
||||||
|
# If we aren't blocking, it's possible we didn't get the lock
|
||||||
|
# because someone else has it.
|
||||||
|
if not have_lock:
|
||||||
|
raise LockException("Did not get lock on %s" % lock_path)
|
||||||
|
|
||||||
|
node.lock = lock
|
||||||
|
|
||||||
|
def unlockNode(self, node):
|
||||||
|
'''
|
||||||
|
Unlock a node.
|
||||||
|
|
||||||
|
The node must already have been locked.
|
||||||
|
|
||||||
|
:param Node node: The node which should be unlocked.
|
||||||
|
'''
|
||||||
|
|
||||||
|
if node.lock is None:
|
||||||
|
raise LockException("Node %s does not hold a lock" % (node,))
|
||||||
|
node.lock.release()
|
||||||
|
node.lock = None
|
||||||
|
|
Loading…
Reference in New Issue