Add FakeNodepool test fixture

Add a fake nodepool that immediately successfully fulfills all
requests, but actually uses the Nodepool ZooKeeper API.

Update the Zuul Nodepool facade to use the Nodepool ZooKeeper API.

Change-Id: If7859f0c6531439c3be38cc6ca6b699b3b5eade2
This commit is contained in:
James E. Blair 2016-12-20 16:45:32 -08:00
parent 498059ba28
commit dce6ceac8e
5 changed files with 465 additions and 31 deletions

View File

@ -41,6 +41,7 @@ import git
import gear
import fixtures
import kazoo.client
import kazoo.exceptions
import statsd
import testtools
from git.exc import NoSuchPathError
@ -64,6 +65,7 @@ import zuul.source.gerrit
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger
import zuul.zk
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
'fixtures')
@ -871,6 +873,63 @@ class FakeSwiftClientConnection(swiftclient.client.Connection):
return endpoint, ''
class FakeNodepool(object):
    """A stand-in for Nodepool that fulfills every node request.

    A background thread polls the request znodes under ``REQUEST_ROOT``
    through a real kazoo client and rewrites each request that is not
    yet fulfilled with ``state`` set to ``'fulfilled'``.
    """

    REQUEST_ROOT = '/nodepool/requests'

    log = logging.getLogger("zuul.test.FakeNodepool")

    def __init__(self, host, port, chroot):
        """Connect to ZooKeeper and start the polling thread.

        :param str host: ZooKeeper host name.
        :param port: ZooKeeper port.
        :param str chroot: Chroot path appended to the ``host:port``
            connection string (may be empty).
        """
        self.client = kazoo.client.KazooClient(
            hosts='%s:%s%s' % (host, port, chroot))
        self.client.start()
        self._running = True
        self.thread = threading.Thread(target=self.run)
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        """Stop the polling thread, then stop and close the client."""
        self._running = False
        # Join first so the thread is not using the client while it is
        # being shut down.
        self.thread.join()
        self.client.stop()
        self.client.close()

    def run(self):
        """Thread body: poll for requests until stop() is called."""
        while self._running:
            self._run()
            time.sleep(0.1)

    def _run(self):
        """Perform one polling pass over all current requests."""
        for req in self.getNodeRequests():
            self.fulfillRequest(req)

    def getNodeRequests(self):
        """Return the current node requests, sorted by znode name.

        :returns: A list of dicts decoded from each request znode, each
            with an extra ``'_oid'`` key holding the znode name.  The
            list is empty when the request root does not exist.
        """
        try:
            reqids = self.client.get_children(self.REQUEST_ROOT)
        except kazoo.exceptions.NoNodeError:
            return []
        reqs = []
        for oid in sorted(reqids):
            path = self.REQUEST_ROOT + '/' + oid
            try:
                data, stat = self.client.get(path)
            except kazoo.exceptions.NoNodeError:
                # The request was removed between listing the children
                # and reading it; there is nothing left to fulfill.
                continue
            if isinstance(data, bytes):
                # kazoo hands back bytes; decode explicitly so that
                # json.loads works on every Python version.
                data = data.decode('utf8')
            data = json.loads(data)
            data['_oid'] = oid
            reqs.append(data)
        return reqs

    def fulfillRequest(self, request):
        """Mark a single request as fulfilled in ZooKeeper.

        Requests already in the ``'fulfilled'`` state are left alone.
        The passed-in dict is not modified.

        :param dict request: A request as returned by getNodeRequests().
        """
        if request['state'] == 'fulfilled':
            return
        request = request.copy()
        request['state'] = 'fulfilled'
        request['state_time'] = time.time()
        oid = request['_oid']
        del request['_oid']
        path = self.REQUEST_ROOT + '/' + oid
        data = json.dumps(request)
        self.log.debug("Fulfilling node request: %s %s" % (oid, data))
        try:
            # znode values must be byte strings.
            self.client.set(path, data.encode('utf8'))
        except kazoo.exceptions.NoNodeError:
            # The request disappeared after it was read; without this
            # guard the exception would end the polling thread.
            self.log.debug("Node request %s %s disappeared" % (oid, data))
class ChrootedKazooFixture(fixtures.Fixture):
def __init__(self):
super(ChrootedKazooFixture, self).__init__()
@ -962,27 +1021,29 @@ class BaseTestCase(testtools.TestCase):
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))
# NOTE(notmorgan): Extract logging overrides for specific libraries
# from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
# each. This is used to limit the output during test runs from
# libraries that zuul depends on such as gear.
log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
# NOTE(notmorgan): Extract logging overrides for specific libraries
# from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
# each. This is used to limit the output during test runs from
# libraries that zuul depends on such as gear.
log_defaults_from_env = os.environ.get(
'OS_LOG_DEFAULTS',
'git.cmd=INFO,kazoo.client=INFO')
if log_defaults_from_env:
for default in log_defaults_from_env.split(','):
try:
name, level_str = default.split('=', 1)
level = getattr(logging, level_str, logging.DEBUG)
self.useFixture(fixtures.FakeLogger(
name=name,
level=level,
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))
except ValueError:
# NOTE(notmorgan): Invalid format of the log default,
# skip and don't try and apply a logger for the
# specified module
pass
if log_defaults_from_env:
for default in log_defaults_from_env.split(','):
try:
name, level_str = default.split('=', 1)
level = getattr(logging, level_str, logging.DEBUG)
self.useFixture(fixtures.FakeLogger(
name=name,
level=level,
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))
except ValueError:
# NOTE(notmorgan): Invalid format of the log default,
# skip and don't try and apply a logger for the
# specified module
pass
class ZuulTestCase(BaseTestCase):
@ -1140,10 +1201,17 @@ class ZuulTestCase(BaseTestCase):
self.merge_client = zuul.merger.client.MergeClient(
self.config, self.sched)
self.nodepool = zuul.nodepool.Nodepool(self.sched)
self.zk = zuul.zk.ZooKeeper()
self.zk.connect([self.zk_config])
self.fake_nodepool = FakeNodepool(self.zk_config.host,
self.zk_config.port,
self.zk_config.chroot)
self.sched.setLauncher(self.launch_client)
self.sched.setMerger(self.merge_client)
self.sched.setNodepool(self.nodepool)
self.sched.setZooKeeper(self.zk)
self.webapp = zuul.webapp.WebApp(
self.sched, port=0, listen_address='127.0.0.1')
@ -1244,9 +1312,10 @@ class ZuulTestCase(BaseTestCase):
def setupZK(self):
    """Install a chrooted ZooKeeper fixture and record how to reach it.

    Stores the fixture itself, its host/port/chroot as individual
    attributes, and a ``zuul.zk.ZooKeeperConnectionConfig`` built from
    the same three values as ``self.zk_config``.
    """
    fixture = self.useFixture(ChrootedKazooFixture())
    self.zk_chroot_fixture = fixture

    self.zookeeper_host = fixture.zookeeper_host
    self.zookeeper_port = fixture.zookeeper_port
    self.zookeeper_chroot = fixture.zookeeper_chroot

    config = zuul.zk.ZooKeeperConnectionConfig(
        fixture.zookeeper_host,
        fixture.zookeeper_port,
        fixture.zookeeper_chroot)
    self.zk_config = config
def copyDirToRepo(self, project, source_path):
self.init_repo(project)
@ -1295,6 +1364,8 @@ class ZuulTestCase(BaseTestCase):
self.rpc.stop()
self.rpc.join()
self.gearman_server.shutdown()
self.fake_nodepool.stop()
self.zk.disconnect()
threads = threading.enumerate()
if len(threads) > 1:
self.log.error("More than one thread is running: %s" % threads)
@ -1428,6 +1499,11 @@ class ZuulTestCase(BaseTestCase):
return False
return True
def areAllNodeRequestsComplete(self):
    """Return True when the scheduler's nodepool tracks no requests.

    :returns: False while ``self.sched.nodepool.requests`` is non-empty,
        True otherwise.
    """
    outstanding = self.sched.nodepool.requests
    return not outstanding
def eventQueuesEmpty(self):
for queue in self.event_queues:
yield queue.empty()
@ -1461,7 +1537,8 @@ class ZuulTestCase(BaseTestCase):
if (not self.merge_client.jobs and
all(self.eventQueuesEmpty()) and
self.haveAllBuildsReported() and
self.areAllBuildsWaiting()):
self.areAllBuildsWaiting() and
self.areAllNodeRequestsComplete()):
self.sched.run_handler_lock.release()
self.launch_server.lock.release()
self.log.debug("...settled.")

View File

@ -395,11 +395,39 @@ class NodeRequest(object):
self.build_set = build_set
self.job = job
self.nodeset = nodeset
self.id = uuid4().hex
self._state = 'requested'
self.state_time = time.time()
self.stat = None
self.uid = uuid4().hex
@property
def state(self):
    """The request's current state string."""
    return self._state

@state.setter
def state(self, value):
    """Store a new state and stamp ``state_time`` with the current time."""
    # TODOv3(jeblair): reinstate
    # if value not in STATES:
    #     raise TypeError("'%s' is not a valid state" % value)
    self._state, self.state_time = value, time.time()
def __repr__(self):
    """Return a short debug string that names the requested nodeset."""
    nodeset = self.nodeset
    return '<NodeRequest %s>' % (nodeset,)
def toDict(self):
    """Serialize this request to a plain dict.

    :returns: A dict with the image of every node in the nodeset under
        ``'node_types'``, the fixed requestor name, and the current
        ``state`` / ``state_time``.
    """
    return {
        'node_types': [node.image for node in self.nodeset.getNodes()],
        'requestor': 'zuul',  # TODOv3(jeblair): better descriptor
        'state': self.state,
        'state_time': self.state_time,
    }
def updateFromDict(self, data):
    """Overwrite the state fields from a dict.

    ``_state`` is assigned directly, so ``state_time`` ends up as the
    value carried in ``data`` rather than the time of this call.

    :param dict data: Must contain ``'state'`` and ``'state_time'``.
    """
    for attr, key in (('_state', 'state'), ('state_time', 'state_time')):
        setattr(self, attr, data[key])
class Job(object):
"""A Job represents the defintion of actions to perform."""

View File

@ -10,18 +10,26 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
from zuul.model import NodeRequest
class Nodepool(object):
log = logging.getLogger('zuul.nodepool')
def __init__(self, scheduler):
    """Create a facade with no tracked requests.

    :param scheduler: The scheduler stored as ``self.sched``.
    """
    self.sched = scheduler
    # Tracked node requests; starts out empty.
    self.requests = {}
def requestNodes(self, build_set, job):
# Build a NodeRequest for the job's nodeset, record it in self.requests,
# hand it to the scheduler's ZooKeeper facade, and return the request.
# NOTE(review): this text is a rendered diff.  The two statements that use
# `req.id` / `_requestComplete` look like the pre-change versions of the
# statements that follow them -- confirm which lines are live before
# relying on this body.
req = NodeRequest(build_set, job, job.nodeset)
self.requests[req.id] = req
self._requestComplete(req.id)
# Track the request under its uid.
self.requests[req.uid] = req
self.log.debug("Submitting node request: %s" % (req,))
self.sched.zk.submitNodeRequest(req)
# Read the request back and register a watch for later changes.
self._updateNodeRequest(req)
return req
def cancelRequest(self, request):
@ -31,7 +39,16 @@ class Nodepool(object):
def returnNodes(self, nodes, used=True):
    """Placeholder: accepts the arguments and does nothing.

    :param nodes: Ignored.
    :param used: Ignored.
    """
    return None
def _requestComplete(self, id):
    """Stop tracking a request and report it to the scheduler.

    :param id: Key of the request in ``self.requests``.
    :raises KeyError: If no request is tracked under ``id``.
    """
    finished = self.requests.pop(id)
    self.sched.onNodesProvisioned(finished)
def _updateNodeRequest(self, request):
    """Refresh a request from ZooKeeper and report it once fulfilled.

    Each call re-reads the request through the scheduler's ZooKeeper
    facade, passing a watcher that calls this method again.

    :param request: The request to refresh; it must expose ``uid`` and
        ``state``.
    """
    self.log.debug("Updating node request: %s" % (request,))

    def on_change(event):
        # The watch event itself is not inspected; just re-read.
        self._updateNodeRequest(request)

    self.sched.zk.getNodeRequest(request, on_change)

    still_tracked = request.uid in self.requests
    if still_tracked and request.state == 'fulfilled':
        # NOTE(review): the scheduler is notified before the request is
        # dropped from self.requests; presumably code that polls
        # `requests` depends on that order -- confirm before changing.
        self.sched.onNodesProvisioned(request)
        del self.requests[request.uid]

View File

@ -285,6 +285,9 @@ class Scheduler(threading.Thread):
def setNodepool(self, nodepool):
    """Store the given object as ``self.nodepool``."""
    self.nodepool = nodepool
def setZooKeeper(self, zk):
    """Store the given object as ``self.zk``."""
    self.zk = zk
def addEvent(self, event):
self.log.debug("Adding trigger event: %s" % event)
try:

309
zuul/zk.py Normal file
View File

@ -0,0 +1,309 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import six
import time
from kazoo.client import KazooClient, KazooState
from kazoo import exceptions as kze
# Valid model states.

# The node is still being built and cannot be used yet.
BUILDING = 'building'
# The node is available for use.
READY = 'ready'
# The node is to be deleted.
DELETING = 'deleting'

# Every state accepted by the BaseModel.state setter.
STATES = {BUILDING, READY, DELETING}
class ZooKeeperConnectionConfig(object):
    '''
    Represents the connection parameters for a ZooKeeper server.
    '''

    def __eq__(self, other):
        '''Two configs are equal when all of their attributes match.'''
        if isinstance(other, ZooKeeperConnectionConfig):
            if other.__dict__ == self.__dict__:
                return True
        return False

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__; without this, two
        # configs that compare equal would also compare "not equal".
        return not self.__eq__(other)

    def __init__(self, host, port=2181, chroot=None):
        '''Initialize the ZooKeeperConnectionConfig object.

        :param str host: The hostname of the ZooKeeper server.
        :param int port: The port on which ZooKeeper is listening.
            Optional, default: 2181.
        :param str chroot: A chroot for this connection.  All
            ZooKeeper nodes will be underneath this root path.
            Optional, default: None (stored as an empty string).
        '''
        self.host = host
        self.port = port
        # Normalize "no chroot" to '' so it can be appended to host:port.
        self.chroot = chroot or ''
def buildZooKeeperHosts(host_list):
    '''
    Build the comma-separated host string used for client connections.

    :param list host_list: Objects exposing ``host``, ``port`` and
        ``chroot`` attributes (such as ZooKeeperConnectionConfig), one
        per server in the cluster.

    :returns: A string of ``host:port<chroot>`` entries joined by commas.
    :raises Exception: If ``host_list`` is not a list.
    '''
    if not isinstance(host_list, list):
        raise Exception("'host_list' must be a list")
    return ",".join(
        '%s:%s%s' % (cfg.host, cfg.port, cfg.chroot) for cfg in host_list)
class BaseModel(object):
    '''
    Common base for objects carrying an id, a state and a state time.
    '''

    def __init__(self, o_id):
        '''
        :param str o_id: The object ID, or a false value to leave the
            ID unset (``id`` is then None).
        '''
        # Always define the backing attribute so reading ``id`` on an
        # object created without one yields None instead of raising
        # AttributeError.
        self._id = None
        if o_id:
            self.id = o_id
        self._state = None
        self.state_time = None
        self.stat = None

    @property
    def id(self):
        return self._id

    @id.setter
    def id(self, value):
        if not isinstance(value, six.string_types):
            raise TypeError("'id' attribute must be a string type")
        self._id = value

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        '''
        Set the state and stamp ``state_time`` with the current time.

        :raises TypeError: If ``value`` is not one of STATES.
        '''
        if value not in STATES:
            raise TypeError("'%s' is not a valid state" % value)
        self._state = value
        self.state_time = time.time()

    def toDict(self):
        '''
        Convert a BaseModel object's attributes to a dictionary.
        '''
        d = {}
        d['state'] = self.state
        d['state_time'] = self.state_time
        return d

    def fromDict(self, d):
        '''
        Set base attributes based on the given dict.

        Unlike the derived classes, this should NOT return an object as it
        assumes self has already been instantiated.
        '''
        if 'state' in d:
            self.state = d['state']
        # Applied after the state so that a supplied state_time replaces
        # the timestamp the state setter just recorded.
        if 'state_time' in d:
            self.state_time = d['state_time']
class NodeRequest(BaseModel):
    '''
    Class representing a node request.
    '''

    def __init__(self, id=None):
        '''
        :param str id: The request ID, if already known.
        '''
        super(NodeRequest, self).__init__(id)

    def __repr__(self):
        d = self.toDict()
        # BaseModel only assigns an id when one was supplied, so fall
        # back to None rather than letting repr() raise AttributeError.
        d['id'] = getattr(self, 'id', None)
        d['stat'] = self.stat
        return '<NodeRequest %s>' % d

    def toDict(self):
        '''
        Convert a NodeRequest object's attributes to a dictionary.
        '''
        d = super(NodeRequest, self).toDict()
        return d

    @staticmethod
    def fromDict(d, o_id=None):
        '''
        Create a NodeRequest object from a dictionary.

        :param dict d: The dictionary.
        :param str o_id: The object ID.

        :returns: An initialized NodeRequest object.
        '''
        o = NodeRequest(o_id)
        # Invoke the base-class instance method explicitly: the name
        # ``fromDict`` on this class is the static factory itself.
        super(NodeRequest, o).fromDict(d)
        return o
class ZooKeeper(object):
    '''
    Class implementing the ZooKeeper interface.

    This class uses the facade design pattern to keep common interaction
    with the ZooKeeper API simple and consistent for the caller, and
    limits coupling between objects. It allows for more complex interactions
    by providing direct access to the client connection when needed (though
    that is discouraged). It also provides for a convenient entry point for
    testing only ZooKeeper interactions.
    '''

    log = logging.getLogger("zuul.zk.ZooKeeper")

    REQUEST_ROOT = '/nodepool/requests'

    def __init__(self):
        '''
        Initialize the ZooKeeper object.

        No connection is made until connect() is called.
        '''
        self.client = None
        self._became_lost = False

    def _dictToStr(self, data):
        '''
        Serialize a dict to UTF-8 encoded JSON.

        The result is a byte string because znode values must be bytes.
        '''
        return json.dumps(data).encode('utf8')

    def _strToDict(self, data):
        '''
        Parse JSON read from a znode (bytes or text) into a dict.
        '''
        if isinstance(data, bytes):
            data = data.decode('utf8')
        return json.loads(data)

    def _connection_listener(self, state):
        '''
        Listener method for Kazoo connection state changes.

        .. warning:: This method must not block.
        '''
        if state == KazooState.LOST:
            self.log.debug("ZooKeeper connection: LOST")
            # Remembered until resetLostFlag() is called.
            self._became_lost = True
        elif state == KazooState.SUSPENDED:
            self.log.debug("ZooKeeper connection: SUSPENDED")
        else:
            self.log.debug("ZooKeeper connection: CONNECTED")

    @property
    def connected(self):
        return self.client.state == KazooState.CONNECTED

    @property
    def suspended(self):
        return self.client.state == KazooState.SUSPENDED

    @property
    def lost(self):
        return self.client.state == KazooState.LOST

    @property
    def didLoseConnection(self):
        '''True if the connection was LOST since the flag was last reset.'''
        return self._became_lost

    def resetLostFlag(self):
        '''Clear the flag reported by didLoseConnection.'''
        self._became_lost = False

    def connect(self, host_list, read_only=False):
        '''
        Establish a connection with ZooKeeper cluster.

        Does nothing if a client already exists.

        :param list host_list: A list of
            :py:class:`~zuul.zk.ZooKeeperConnectionConfig` objects
            (one per server) defining the ZooKeeper cluster servers.
        :param bool read_only: If True, establishes a read-only connection.
        '''
        if self.client is None:
            hosts = buildZooKeeperHosts(host_list)
            self.client = KazooClient(hosts=hosts, read_only=read_only)
            self.client.add_listener(self._connection_listener)
            self.client.start()

    def disconnect(self):
        '''
        Close the ZooKeeper cluster connection.

        You should call this method if you used connect() to establish a
        cluster connection.
        '''
        # Tear the client down whatever its current connection state.
        # Skipping this for a suspended or lost client would leave it
        # (and its resources) alive, and a later connect() would see a
        # non-None client and do nothing.
        if self.client is not None:
            self.client.stop()
            self.client.close()
            self.client = None

    def resetHosts(self, host_list):
        '''
        Reset the ZooKeeper cluster connection host list.

        Does nothing if there is no client.

        :param list host_list: A list of
            :py:class:`~zuul.zk.ZooKeeperConnectionConfig` objects
            (one per server) defining the ZooKeeper cluster servers.
        '''
        if self.client is not None:
            hosts = buildZooKeeperHosts(host_list)
            self.client.set_hosts(hosts=hosts)

    def submitNodeRequest(self, node_request):
        '''
        Submit a request for nodes to Nodepool.

        The request is written as an ephemeral, sequential znode under
        REQUEST_ROOT and the resulting znode name is stored on the
        request as its ``id``.

        :param NodeRequest node_request: A NodeRequest with the
            contents of the request.
        '''
        priority = 100  # TODO(jeblair): integrate into nodereq

        data = node_request.toDict()
        data['created_time'] = time.time()

        path = '%s/%s-' % (self.REQUEST_ROOT, priority)
        self.log.debug(data)
        path = self.client.create(path, self._dictToStr(data),
                                  makepath=True,
                                  sequence=True, ephemeral=True)
        # The last path component is the generated znode name.
        reqid = path.split("/")[-1]
        node_request.id = reqid

    def getNodeRequest(self, node_request, watcher):
        '''
        Read the specified node request and update its values.

        If the request znode no longer exists, the request is left
        unchanged.

        :param NodeRequest node_request: A NodeRequest to be read.  It
            will be updated with the results of the read.
        :param callable watcher: A watch function to be called when the
            node request is updated.
        '''
        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
        try:
            data, stat = self.client.get(path, watch=watcher)
        except kze.NoNodeError:
            return
        data = self._strToDict(data)
        node_request.updateFromDict(data)

    # TODOv3(jeblair): re-register watches on disconnect