Prepare Zookeeper for scale-out scheduler

This change is a common root for other
Zookeeper-related changes regarding the
scale-out scheduler. Zookeeper becoming
a central component requires increasing
"maxClientCnxns".

Since the ZooKeeper class is expected to grow
significantly (ZooKeeper is becoming a central part
of Zuul), the ZooKeeper class (zk.py) is split into
a zk module here to avoid the current god-class.

Also the zookeeper log is copied to the "zuul_output_dir".

Change-Id: I714c06052b5e17269a6964892ad53b48cf65db19
Story: 2007192
This commit is contained in:
Jan Kubovy 2020-10-15 10:48:40 +02:00 committed by Felix Edel
parent dddbb3dbfe
commit d518e56208
16 changed files with 706 additions and 623 deletions

View File

@ -236,6 +236,7 @@
tox_environment:
ZUUL_TEST_ROOT: /tmp/zuul-test
YARN_REGISTRY: "https://{{ zuul_site_mirror_fqdn }}:4443/registry.npmjs"
post-run: playbooks/common/post-system-logs.yaml
- tox-py38:
irrelevant-files:
- zuul/cmd/migrate.py
@ -243,6 +244,7 @@
timeout: 4800 # 80 minutes
nodeset: ubuntu-bionic
vars: *zuul_tox_vars
post-run: playbooks/common/post-system-logs.yaml
- zuul-build-dashboard-openstack-whitelabel
- zuul-build-dashboard-software-factory
- zuul-build-dashboard-opendev

View File

@ -5,7 +5,7 @@ initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
maxClientCnxns=1000
standaloneEnabled=true
admin.enableServer=true
server.1=examples_zk_1.examples_default:2888:3888

View File

@ -0,0 +1,5 @@
- hosts: all
tasks:
- name: Collect zookeeper logs
shell: "cp /var/log/zookeeper/zookeeper.log {{ zuul_output_dir }}/logs/zookeeper.log"

View File

@ -111,10 +111,10 @@ import zuul.merger.server
import zuul.model
import zuul.nodepool
import zuul.rpcclient
import zuul.zk
import zuul.configloader
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
from zuul.zk import ZooKeeperClient
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures')
@ -3629,13 +3629,14 @@ class ChrootedKazooFixture(fixtures.Fixture):
for x in range(8))
rand_test_path = '%s_%s_%s' % (random_bits, os.getpid(), self.test_id)
self.zookeeper_chroot = "/nodepool_test/%s" % rand_test_path
self.zookeeper_chroot = f"/test/{rand_test_path}"
self.addCleanup(self._cleanup)
# Ensure the chroot path exists and clean up any pre-existing znodes.
_tmp_client = kazoo.client.KazooClient(
hosts='%s:%s' % (self.zookeeper_host, self.zookeeper_port))
hosts=f'{self.zookeeper_host}:{self.zookeeper_port}', timeout=10
)
_tmp_client.start()
if _tmp_client.exists(self.zookeeper_chroot):
@ -3992,13 +3993,13 @@ class SchedulerTestApp:
self.config, self.sched)
merge_client = RecordingMergeClient(self.config, self.sched)
nodepool = zuul.nodepool.Nodepool(self.sched)
zk = zuul.zk.ZooKeeper(enable_cache=True)
zk.connect(self.zk_config, timeout=30.0)
zk_client = ZooKeeperClient()
zk_client.connect(self.zk_config, timeout=30.0)
self.sched.setExecutor(executor_client)
self.sched.setMerger(merge_client)
self.sched.setNodepool(nodepool)
self.sched.setZooKeeper(zk)
self.sched.setZooKeeper(zk_client)
self.sched.start()
executor_client.gearman.waitForServer()
@ -4626,7 +4627,7 @@ class ZuulTestCase(BaseTestCase):
self.rpcclient.shutdown()
self.gearman_server.shutdown()
self.fake_nodepool.stop()
self.scheds.execute(lambda app: app.sched.zk.disconnect())
self.scheds.execute(lambda app: app.sched.zk_client.disconnect())
self.printHistory()
# We whitelist watchdog threads as they have relatively long delays
# before noticing they should exit, but they should exit on their own.

View File

@ -31,9 +31,9 @@ class TestNodepoolIntegration(BaseTestCase):
super(TestNodepoolIntegration, self).setUp()
self.statsd = None
self.zk = zuul.zk.ZooKeeper(enable_cache=True)
self.addCleanup(self.zk.disconnect)
self.zk.connect('localhost:2181')
self.zk_client = zuul.zk.ZooKeeperClient()
self.addCleanup(self.zk_client.disconnect)
self.zk_client.connect('localhost:2181')
self.hostname = socket.gethostname()
self.provisioned_requests = []
@ -104,8 +104,8 @@ class TestNodepoolIntegration(BaseTestCase):
job.nodeset = nodeset
self.fake_nodepool.paused = True
request = self.nodepool.requestNodes(None, job, 0)
self.zk.client.stop()
self.zk.client.start()
self.zk_client.client.stop()
self.zk_client.client.start()
self.fake_nodepool.paused = False
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)

View File

@ -15,11 +15,12 @@
import time
import zuul.zk
import zuul.nodepool
from zuul import model
import zuul.nodepool
from tests.base import BaseTestCase, ChrootedKazooFixture, FakeNodepool
from zuul.zk import ZooKeeperClient
from zuul.zk.nodepool import ZooKeeperNodepool
class TestNodepool(BaseTestCase):
@ -37,9 +38,10 @@ class TestNodepool(BaseTestCase):
self.zk_chroot_fixture.zookeeper_port,
self.zk_chroot_fixture.zookeeper_chroot)
self.zk = zuul.zk.ZooKeeper(enable_cache=True)
self.addCleanup(self.zk.disconnect)
self.zk.connect(self.zk_config)
self.zk_client = ZooKeeperClient()
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
self.addCleanup(self.zk_client.disconnect)
self.zk_client.connect(self.zk_config)
self.hostname = 'nodepool-test-hostname'
self.provisioned_requests = []
@ -105,8 +107,8 @@ class TestNodepool(BaseTestCase):
job.nodeset = nodeset
self.fake_nodepool.pause()
request = self.nodepool.requestNodes(None, job, 0)
self.zk.client.stop()
self.zk.client.start()
self.zk_client.client.stop()
self.zk_client.client.start()
self.fake_nodepool.unpause()
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
@ -161,7 +163,7 @@ class TestNodepool(BaseTestCase):
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
self.zk.deleteNodeRequest(request)
self.zk_nodepool.deleteNodeRequest(request)
# Accept the nodes
self.nodepool.acceptNodes(request, request.id)

View File

@ -190,9 +190,10 @@ class TestSchedulerAutoholdHoldExpiration(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk.getHoldRequests()
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk.getHoldRequest(request_list[0])
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
self.assertEqual('tenant-one', request.tenant)
self.assertEqual('review.example.com/org/project', request.project)
@ -220,9 +221,10 @@ class TestSchedulerAutoholdHoldExpiration(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk.getHoldRequests()
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk.getHoldRequest(request_list[0])
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
self.assertEqual('tenant-one', request.tenant)
self.assertEqual('review.example.com/org/project', request.project)
@ -251,9 +253,10 @@ class TestSchedulerAutoholdHoldExpiration(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk.getHoldRequests()
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk.getHoldRequest(request_list[0])
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
self.assertEqual('tenant-one', request.tenant)
self.assertEqual('review.example.com/org/project', request.project)
@ -1766,9 +1769,10 @@ class TestScheduler(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk.getHoldRequests()
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk.getHoldRequest(request_list[0])
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
self.assertEqual('tenant-one', request.tenant)
self.assertEqual('review.example.com/org/project', request.project)
@ -1827,7 +1831,8 @@ class TestScheduler(ZuulTestCase):
# The hold request current_count should have incremented
# and we should have recorded the held node ID.
request2 = self.scheds.first.sched.zk.getHoldRequest(request.id)
request2 = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request.id)
self.assertEqual(request.current_count + 1, request2.current_count)
self.assertEqual(1, len(request2.nodes))
self.assertEqual(1, len(request2.nodes[0]["nodes"]))
@ -1849,11 +1854,12 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(held_nodes, 1)
# request current_count should not have changed
request3 = self.scheds.first.sched.zk.getHoldRequest(request2.id)
request3 = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request2.id)
self.assertEqual(request2.current_count, request3.current_count)
# Deleting hold request should set held nodes to used
self.scheds.first.sched.zk.deleteHoldRequest(request3)
self.scheds.first.sched.zk_nodepool.deleteHoldRequest(request3)
node_states = [n['state'] for n in self.fake_nodepool.getNodes()]
self.assertEqual(3, len(node_states))
self.assertEqual([zuul.model.STATE_USED] * 3, node_states)
@ -1873,9 +1879,10 @@ class TestScheduler(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk.getHoldRequests()
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk.getHoldRequest(request_list[0])
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
request = client.autohold_info(request.id)
@ -1897,14 +1904,15 @@ class TestScheduler(ZuulTestCase):
self.assertTrue(r)
# There should be a record in ZooKeeper
request_list = self.scheds.first.sched.zk.getHoldRequests()
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual(1, len(request_list))
request = self.scheds.first.sched.zk.getHoldRequest(request_list[0])
request = self.scheds.first.sched.zk_nodepool.getHoldRequest(
request_list[0])
self.assertIsNotNone(request)
# Delete and verify no more requests
self.assertTrue(client.autohold_delete(request.id))
request_list = self.scheds.first.sched.zk.getHoldRequests()
request_list = self.scheds.first.sched.zk_nodepool.getHoldRequests()
self.assertEqual([], request_list)
def _test_autohold_scoped(self, change_obj, change, ref):
@ -5783,8 +5791,8 @@ For CI problems and help debugging, contact ci@example.org"""
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
self.scheds.execute(lambda app: app.sched.zk.client.stop())
self.scheds.execute(lambda app: app.sched.zk.client.start())
self.scheds.execute(lambda app: app.sched.zk_client.client.stop())
self.scheds.execute(lambda app: app.sched.zk_client.client.start())
self.fake_nodepool.unpause()
self.waitUntilSettled()
@ -5819,8 +5827,8 @@ For CI problems and help debugging, contact ci@example.org"""
# The request is fulfilled, but the scheduler hasn't processed
# it yet. Reconnect ZK.
self.scheds.execute(lambda app: app.sched.zk.client.stop())
self.scheds.execute(lambda app: app.sched.zk.client.start())
self.scheds.execute(lambda app: app.sched.zk_client.client.stop())
self.scheds.execute(lambda app: app.sched.zk_client.client.start())
# Allow the scheduler to continue and process the (now
# out-of-date) notification that nodes are ready.

View File

@ -15,10 +15,12 @@
import testtools
import zuul.zk
from zuul import model
import zuul.zk.exceptions
from tests.base import BaseTestCase, ChrootedKazooFixture
from zuul.zk import ZooKeeperClient
from zuul.zk.nodepool import ZooKeeperNodepool
class TestZK(BaseTestCase):
@ -33,9 +35,10 @@ class TestZK(BaseTestCase):
self.zk_chroot_fixture.zookeeper_port,
self.zk_chroot_fixture.zookeeper_chroot)
self.zk = zuul.zk.ZooKeeper(enable_cache=True)
self.addCleanup(self.zk.disconnect)
self.zk.connect(self.zk_config)
self.zk_client = ZooKeeperClient()
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
self.addCleanup(self.zk_client.disconnect)
self.zk_client.connect(self.zk_config)
def _createRequest(self):
req = model.HoldRequest()
@ -46,37 +49,37 @@ class TestZK(BaseTestCase):
def test_hold_requests_api(self):
# Test no requests returns empty list
self.assertEqual([], self.zk.getHoldRequests())
self.assertEqual([], self.zk_nodepool.getHoldRequests())
# Test get on non-existent request is None
self.assertIsNone(self.zk.getHoldRequest('anything'))
self.assertIsNone(self.zk_nodepool.getHoldRequest('anything'))
# Test creating a new request
req1 = self._createRequest()
self.zk.storeHoldRequest(req1)
self.zk_nodepool.storeHoldRequest(req1)
self.assertIsNotNone(req1.id)
self.assertEqual(1, len(self.zk.getHoldRequests()))
self.assertEqual(1, len(self.zk_nodepool.getHoldRequests()))
# Test getting the request
req2 = self.zk.getHoldRequest(req1.id)
req2 = self.zk_nodepool.getHoldRequest(req1.id)
self.assertEqual(req1.toDict(), req2.toDict())
# Test updating the request
req2.reason = 'a new reason'
self.zk.storeHoldRequest(req2)
req2 = self.zk.getHoldRequest(req2.id)
self.zk_nodepool.storeHoldRequest(req2)
req2 = self.zk_nodepool.getHoldRequest(req2.id)
self.assertNotEqual(req1.reason, req2.reason)
# Test lock operations
self.zk.lockHoldRequest(req2, blocking=False)
self.zk_nodepool.lockHoldRequest(req2, blocking=False)
with testtools.ExpectedException(
zuul.zk.LockException,
zuul.zk.exceptions.LockException,
"Timeout trying to acquire lock .*"
):
self.zk.lockHoldRequest(req2, blocking=True, timeout=2)
self.zk.unlockHoldRequest(req2)
self.zk_nodepool.lockHoldRequest(req2, blocking=True, timeout=2)
self.zk_nodepool.unlockHoldRequest(req2)
self.assertIsNone(req2.lock)
# Test deleting the request
self.zk.deleteHoldRequest(req1)
self.assertEqual([], self.zk.getHoldRequests())
self.zk_nodepool.deleteHoldRequest(req1)
self.assertEqual([], self.zk_nodepool.getHoldRequests())

View File

@ -11,6 +11,8 @@ TOOLSDIR=$(dirname $0)
sudo service zookeeper stop
DATADIR=$(sed -n -e 's/^dataDir=//p' /etc/zookeeper/conf/zoo.cfg)
sudo mount -t tmpfs -o nodev,nosuid,size=500M none $DATADIR
echo "autopurge.purgeInterval=1" | sudo tee -a /etc/zookeeper/conf/zoo.cfg
echo "maxClientCnxns=1000" | sudo tee -a /etc/zookeeper/conf/zoo.cfg
# Prepare a tmpfs for Zuul test root
if [[ -n "${ZUUL_TEST_ROOT:-}" ]]; then

View File

@ -20,13 +20,12 @@ import signal
import zuul.cmd
import zuul.executor.client
from zuul.lib.config import get_default
from zuul.lib.statsd import get_statsd_config
import zuul.merger.client
import zuul.nodepool
import zuul.scheduler
import zuul.zk
from zuul.lib.config import get_default
from zuul.lib.statsd import get_statsd_config
from zuul.zk import ZooKeeperClient
class Scheduler(zuul.cmd.ZuulDaemonApp):
@ -144,7 +143,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
merger = zuul.merger.client.MergeClient(self.config, self.sched)
nodepool = zuul.nodepool.Nodepool(self.sched)
zookeeper = zuul.zk.ZooKeeper(enable_cache=True)
zk_client = ZooKeeperClient()
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None)
if not zookeeper_hosts:
raise Exception("The zookeeper hosts config value is required")
@ -153,7 +152,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca')
zookeeper_timeout = float(get_default(self.config, 'zookeeper',
'session_timeout', 10.0))
zookeeper.connect(
zk_client.connect(
zookeeper_hosts,
timeout=zookeeper_timeout,
tls_cert=zookeeper_tls_cert,
@ -164,7 +163,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.sched.setExecutor(gearman)
self.sched.setMerger(merger)
self.sched.setNodepool(nodepool)
self.sched.setZooKeeper(zookeeper)
self.sched.setZooKeeper(zk_client)
self.log.info('Starting scheduler')
try:
@ -191,6 +190,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.exit_handler(signal.SIGINT, None)
else:
self.sched.join()
zk_client.disconnect()
def main():

View File

@ -16,7 +16,7 @@ import time
from collections import defaultdict
from zuul import model
from zuul.lib.logutil import get_annotated_logger
from zuul.zk import LockException
from zuul.zk.exceptions import LockException
def add_resources(target, source):
@ -115,7 +115,8 @@ class Nodepool(object):
self.requests[req.uid] = req
if nodeset.nodes:
self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
self.sched.zk_nodepool.submitNodeRequest(req,
self._updateNodeRequest)
# Logged after submission so that we have the request id
log.info("Submitted node request %s", req)
self.emitStats(req)
@ -132,7 +133,7 @@ class Nodepool(object):
if request.uid in self.requests:
request.canceled = True
try:
self.sched.zk.deleteNodeRequest(request)
self.sched.zk_nodepool.deleteNodeRequest(request)
except Exception:
log.exception("Error deleting node request:")
@ -149,7 +150,7 @@ class Nodepool(object):
if relative_priority is None:
return
try:
self.sched.zk.lockNodeRequest(request, blocking=False)
self.sched.zk_nodepool.lockNodeRequest(request, blocking=False)
except LockException:
# It may be locked by nodepool, which is fine.
log.debug("Unable to revise locked node request %s", request)
@ -157,7 +158,7 @@ class Nodepool(object):
try:
old_priority = request.relative_priority
request.relative_priority = relative_priority
self.sched.zk.storeNodeRequest(request)
self.sched.zk_nodepool.storeNodeRequest(request)
log.debug("Revised relative priority of "
"node request %s from %s to %s",
request, old_priority, relative_priority)
@ -165,7 +166,7 @@ class Nodepool(object):
log.exception("Unable to update node request %s", request)
finally:
try:
self.sched.zk.unlockNodeRequest(request)
self.sched.zk_nodepool.unlockNodeRequest(request)
except Exception:
log.exception("Unable to unlock node request %s", request)
@ -190,7 +191,7 @@ class Nodepool(object):
node.comment = request.reason
if request.node_expiration:
node.hold_expiration = request.node_expiration
self.sched.zk.storeNode(node)
self.sched.zk_nodepool.storeNode(node)
request.nodes.append(dict(
build=build.uuid,
@ -205,10 +206,10 @@ class Nodepool(object):
# Give ourselves a few seconds to try to obtain the lock rather than
# immediately give up.
self.sched.zk.lockHoldRequest(request, timeout=5)
self.sched.zk_nodepool.lockHoldRequest(request, timeout=5)
try:
self.sched.zk.storeHoldRequest(request)
self.sched.zk_nodepool.storeHoldRequest(request)
except Exception:
# If we fail to update the request count, we won't consider it
# a real autohold error by passing the exception up. It will
@ -219,7 +220,7 @@ class Nodepool(object):
finally:
# Although any exceptions thrown here are handled higher up in
# _doBuildCompletedEvent, we always want to try to unlock it.
self.sched.zk.unlockHoldRequest(request)
self.sched.zk_nodepool.unlockHoldRequest(request)
def useNodeSet(self, nodeset, build_set=None, event=None):
self.log.info("Setting nodeset %s in use" % (nodeset,))
@ -228,7 +229,7 @@ class Nodepool(object):
if node.lock is None:
raise Exception("Node %s is not locked" % (node,))
node.state = model.STATE_IN_USE
self.sched.zk.storeNode(node)
self.sched.zk_nodepool.storeNode(node)
if node.resources:
add_resources(resources, node.resources)
if build_set and resources:
@ -275,7 +276,7 @@ class Nodepool(object):
if node.resources:
add_resources(resources, node.resources)
node.state = model.STATE_USED
self.sched.zk.storeNode(node)
self.sched.zk_nodepool.storeNode(node)
except Exception:
log.exception("Exception storing node %s "
"while unlocking:", node)
@ -303,7 +304,7 @@ class Nodepool(object):
def _unlockNodes(self, nodes):
for node in nodes:
try:
self.sched.zk.unlockNode(node)
self.sched.zk_nodepool.unlockNode(node)
except Exception:
self.log.exception("Error unlocking node:")
@ -321,7 +322,7 @@ class Nodepool(object):
raise Exception("Node %s allocated to %s, not %s" %
(node.id, node.allocated_to, request_id))
self.log.debug("Locking node %s" % (node,))
self.sched.zk.lockNode(node, timeout=30)
self.sched.zk_nodepool.lockNode(node, timeout=30)
locked_nodes.append(node)
except Exception:
self.log.exception("Error locking nodes:")
@ -347,8 +348,8 @@ class Nodepool(object):
if deleted:
log.debug("Resubmitting lost node request %s", request)
request.id = None
self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
self.sched.zk_nodepool.submitNodeRequest(request,
self._updateNodeRequest)
# Stop watching this request node
return False
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
@ -397,13 +398,13 @@ class Nodepool(object):
# processing it. Nodepool will automatically reallocate the assigned
# nodes in that situation.
try:
if not self.sched.zk.nodeRequestExists(request):
if not self.sched.zk_nodepool.nodeRequestExists(request):
log.info("Request %s no longer exists, resubmitting",
request.id)
request.id = None
request.state = model.STATE_REQUESTED
self.requests[request.uid] = request
self.sched.zk.submitNodeRequest(
self.sched.zk_nodepool.submitNodeRequest(
request, self._updateNodeRequest)
return False
except Exception:
@ -430,7 +431,7 @@ class Nodepool(object):
# succeeded, delete the request.
log.debug("Deleting node request %s", request)
try:
self.sched.zk.deleteNodeRequest(request)
self.sched.zk_nodepool.deleteNodeRequest(request)
except Exception:
log.exception("Error deleting node request:")
request.failed = True

View File

@ -42,6 +42,7 @@ from zuul.lib.statsd import get_statsd
import zuul.lib.queue
import zuul.lib.repl
from zuul.model import Build, HoldRequest, Tenant, TriggerEvent
from zuul.zk.nodepool import ZooKeeperNodepool
COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl']
@ -419,8 +420,9 @@ class Scheduler(threading.Thread):
def setNodepool(self, nodepool):
self.nodepool = nodepool
def setZooKeeper(self, zk):
self.zk = zk
def setZooKeeper(self, zk_client):
self.zk_client = zk_client
self.zk_nodepool = ZooKeeperNodepool(zk_client)
def runStats(self):
while not self.stats_stop.wait(self._stats_interval):
@ -652,15 +654,15 @@ class Scheduler(threading.Thread):
request.node_expiration = node_hold_expiration
# No need to lock it since we are creating a new one.
self.zk.storeHoldRequest(request)
self.zk_nodepool.storeHoldRequest(request)
def autohold_list(self):
'''
Return current hold requests as a list of dicts.
'''
data = []
for request_id in self.zk.getHoldRequests():
request = self.zk.getHoldRequest(request_id)
for request_id in self.zk_nodepool.getHoldRequests():
request = self.zk_nodepool.getHoldRequest(request_id)
if not request:
continue
data.append(request.toDict())
@ -673,7 +675,7 @@ class Scheduler(threading.Thread):
:param str hold_request_id: The unique ID of the request to delete.
'''
try:
hold_request = self.zk.getHoldRequest(hold_request_id)
hold_request = self.zk_nodepool.getHoldRequest(hold_request_id)
except Exception:
self.log.exception(
"Error retrieving autohold ID %s:", hold_request_id)
@ -689,8 +691,9 @@ class Scheduler(threading.Thread):
:param str hold_request_id: The unique ID of the request to delete.
'''
hold_request = None
try:
hold_request = self.zk.getHoldRequest(hold_request_id)
hold_request = self.zk_nodepool.getHoldRequest(hold_request_id)
except Exception:
self.log.exception(
"Error retrieving autohold ID %s:", hold_request_id)
@ -702,7 +705,7 @@ class Scheduler(threading.Thread):
self.log.debug("Removing autohold %s", hold_request)
try:
self.zk.deleteHoldRequest(hold_request)
self.zk_nodepool.deleteHoldRequest(hold_request)
except Exception:
self.log.exception(
"Error removing autohold request %s:", hold_request)
@ -1491,15 +1494,15 @@ class Scheduler(threading.Thread):
return True
try:
self.zk.lockHoldRequest(request)
self.zk_nodepool.lockHoldRequest(request)
self.log.info("Removing expired hold request %s", request)
self.zk.deleteHoldRequest(request)
self.zk_nodepool.deleteHoldRequest(request)
except Exception:
self.log.exception(
"Failed to delete expired hold request %s", request)
finally:
try:
self.zk.unlockHoldRequest(request)
self.zk_nodepool.unlockHoldRequest(request)
except Exception:
pass
@ -1537,8 +1540,8 @@ class Scheduler(threading.Thread):
autohold = None
scope = Scope.NONE
self.log.debug("Checking build autohold key %s", autohold_key_base)
for request_id in self.zk.getHoldRequests():
request = self.zk.getHoldRequest(request_id)
for request_id in self.zk_nodepool.getHoldRequests():
request = self.zk_nodepool.getHoldRequest(request_id)
if not request:
continue

View File

@ -28,13 +28,14 @@ import time
import select
import threading
from zuul import exceptions
import zuul.lib.repl
from zuul.lib import commandsocket
from zuul.lib.re2util import filter_allowed_disallowed
import zuul.model
from zuul import exceptions
import zuul.rpcclient
import zuul.zk
from zuul.lib import commandsocket
from zuul.zk import ZooKeeperClient
from zuul.zk.nodepool import ZooKeeperNodepool
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
cherrypy.tools.websocket = WebSocketTool()
@ -227,7 +228,8 @@ class ZuulWebAPI(object):
def __init__(self, zuulweb):
self.rpc = zuulweb.rpc
self.zk = zuulweb.zk
self.zk_client = zuulweb.zk_client
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
self.zuulweb = zuulweb
self.cache = {}
self.cache_time = {}
@ -853,7 +855,7 @@ class ZuulWebAPI(object):
allowed_labels = data['allowed_labels']
disallowed_labels = data['disallowed_labels']
labels = set()
for launcher in self.zk.getRegisteredLaunchers():
for launcher in self.zk_nodepool.getRegisteredLaunchers():
labels.update(filter_allowed_disallowed(
launcher.supported_labels,
allowed_labels, disallowed_labels))
@ -867,7 +869,7 @@ class ZuulWebAPI(object):
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
def nodes(self, tenant):
ret = []
for node in self.zk.nodeIterator():
for node in self.zk_nodepool.nodeIterator():
node_data = {}
for key in ("id", "type", "connection_type", "external_id",
"provider", "state", "state_time", "comment"):
@ -1221,9 +1223,9 @@ class ZuulWeb(object):
self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
ssl_key, ssl_cert, ssl_ca,
client_id='Zuul Web Server')
self.zk = zuul.zk.ZooKeeper(enable_cache=True)
self.zk_client = ZooKeeperClient()
if zk_hosts:
self.zk.connect(hosts=zk_hosts, read_only=True,
self.zk_client.connect(hosts=zk_hosts, read_only=True,
timeout=zk_timeout, tls_cert=zk_tls_cert,
tls_key=zk_tls_key, tls_ca=zk_tls_ca)
@ -1382,7 +1384,7 @@ class ZuulWeb(object):
cherrypy.server.httpserver = None
self.wsplugin.unsubscribe()
self.stream_manager.stop()
self.zk.disconnect()
self.zk_client.disconnect()
self.stop_repl()
self._command_running = False
self.command_socket.stop()

153
zuul/zk/__init__.py Normal file
View File

@ -0,0 +1,153 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import time
from abc import ABCMeta
from typing import Optional, List, Callable
from kazoo.client import KazooClient
from kazoo.handlers.threading import KazooTimeoutError
from kazoo.protocol.states import KazooState
from zuul.zk.exceptions import NoClientException
class ZooKeeperClient(object):
    """Wrapper owning a single kazoo ``KazooClient`` connection.

    Manages the connection lifecycle (connect/disconnect/reset) and lets
    other components register callbacks that are invoked after a
    connection is established and before it is torn down.
    """

    log = logging.getLogger("zuul.zk.base.ZooKeeperClient")

    # Rate-limit connection-retry warnings to one every 10 seconds.
    retry_log_rate = 10

    def __init__(self):
        """
        Initialize the ZooKeeper base client object.
        """
        # Underlying kazoo client; stays None until connect() is called.
        self.client: Optional[KazooClient] = None
        self._last_retry_log: int = 0
        # Callbacks fired on connect / disconnect (see ZooKeeperBase).
        self.on_connect_listeners: List[Callable[[], None]] = []
        self.on_disconnect_listeners: List[Callable[[], None]] = []

    def _connectionListener(self, state):
        """
        Listener method for Kazoo connection state changes.

        .. warning:: This method must not block.
        """
        if state == KazooState.LOST:
            message = "ZooKeeper connection: LOST"
        elif state == KazooState.SUSPENDED:
            message = "ZooKeeper connection: SUSPENDED"
        else:
            message = "ZooKeeper connection: CONNECTED"
        self.log.debug(message)

    @property
    def connected(self):
        """Truthy when the kazoo client exists and is CONNECTED."""
        client = self.client
        return client and client.state == KazooState.CONNECTED

    @property
    def suspended(self):
        """Truthy when the kazoo client exists and is SUSPENDED."""
        client = self.client
        return client and client.state == KazooState.SUSPENDED

    @property
    def lost(self):
        """True when there is no client or its state is LOST."""
        client = self.client
        return not client or client.state == KazooState.LOST

    def logConnectionRetryEvent(self):
        """Emit a rate-limited warning about a connection retry."""
        current = time.monotonic()
        if current - self._last_retry_log >= self.retry_log_rate:
            self.log.warning("Retrying zookeeper connection")
            self._last_retry_log = current

    def connect(self, hosts: str, read_only: bool = False,
                timeout: float = 10.0, tls_cert: Optional[str] = None,
                tls_key: Optional[str] = None,
                tls_ca: Optional[str] = None):
        """
        Establish a connection with ZooKeeper cluster.

        Convenience method if a pre-existing ZooKeeper connection is not
        supplied to the ZooKeeper object at instantiation time.

        :param str hosts: Comma-separated list of hosts to connect to (e.g.
            127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
        :param bool read_only: If True, establishes a read-only connection.
        :param float timeout: The ZooKeeper session timeout, in
            seconds (default: 10.0).
        :param str tls_cert: Path to TLS cert
        :param str tls_key: Path to TLS key
        :param str tls_ca: Path to TLS CA cert
        """
        if self.client is None:
            kwargs = {
                'hosts': hosts,
                'read_only': read_only,
                'timeout': timeout,
            }
            # TLS is enabled whenever a key is supplied.
            if tls_key:
                kwargs['use_ssl'] = True
                kwargs['keyfile'] = tls_key
                kwargs['certfile'] = tls_cert
                kwargs['ca'] = tls_ca
            client = KazooClient(**kwargs)
            self.client = client
            client.add_listener(self._connectionListener)
            # Retry the initial connection attempt ourselves (rather than
            # letting kazoo block indefinitely) so we can log progress.
            started = False
            while not started:
                try:
                    client.start(1)
                    started = True
                except KazooTimeoutError:
                    self.logConnectionRetryEvent()

        for listener in self.on_connect_listeners:
            listener()

    def disconnect(self):
        """
        Close the ZooKeeper cluster connection.

        You should call this method if you used connect() to establish a
        cluster connection.
        """
        # Notify listeners first so they can clean up while the
        # connection is still usable.
        for listener in self.on_disconnect_listeners:
            listener()

        client = self.client
        if client is not None and client.connected:
            client.stop()
            client.close()
            self.client = None

    def resetHosts(self, hosts):
        """
        Reset the ZooKeeper cluster connection host list.

        :param str hosts: Comma-separated list of hosts to connect to (e.g.
            127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
        """
        if self.client is None:
            return
        self.client.set_hosts(hosts=hosts)
class ZooKeeperBase(metaclass=ABCMeta):
    """Base class for components that need to interact with Zookeeper.

    Subclasses share a single ``ZooKeeperClient`` and may override the
    connect/disconnect hooks to react to connection lifecycle events.
    """

    def __init__(self, client: ZooKeeperClient):
        self.client = client
        # Register our lifecycle hooks with the shared client so
        # subclasses are notified on connect/disconnect.
        registrations = (
            (self.client.on_connect_listeners, self._onConnect),
            (self.client.on_disconnect_listeners, self._onDisconnect),
        )
        for listener_list, hook in registrations:
            listener_list.append(hook)

    @property
    def kazoo_client(self) -> KazooClient:
        """Return the underlying kazoo client.

        :raises NoClientException: if no connection has been established.
        """
        kazoo_client = self.client.client
        if not kazoo_client:
            raise NoClientException()
        return kazoo_client

    def _onConnect(self):
        """Hook invoked after the client connects; override as needed."""
        pass

    def _onDisconnect(self):
        """Hook invoked before the client disconnects; override as needed."""
        pass

27
zuul/zk/exceptions.py Normal file
View File

@ -0,0 +1,27 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kazoo.exceptions import KazooException
class ZuulZooKeeperException(KazooException):
    """Base exception class for all custom ZK exceptions"""
    # Inherits from kazoo's KazooException so callers may catch either
    # kazoo errors or Zuul-specific ZK errors with one except clause.
    pass
class LockException(ZuulZooKeeperException):
    """Raised when a ZooKeeper lock could not be acquired."""
    pass
class NoClientException(ZuulZooKeeperException):
    """Raised when an operation needs a ZooKeeper connection but no
    client has been established yet."""

    def __init__(self):
        super().__init__("No zookeeper client!")

File diff suppressed because it is too large Load Diff