Use Zuul-style ZooKeeper connections

We have made many improvements to connection handling in Zuul.
Bring those back to Nodepool by copying over the zuul/zk directory
which has our base ZK connection classes.

This will enable us to bring other Zuul classes over, such as the
component registry.

The existing connection-related code is removed and the remaining
model-style code is moved to nodepool.zk.zookeeper.  Almost every
file imported the model as nodepool.zk, so import adjustments are
made to compensate while keeping the code more or less as-is.

Change-Id: I9f793d7bbad573cb881dfcfdf11e3013e0f8e4a3
changes/10/841510/3
James E. Blair 9 months ago
parent 325aaf5047
commit 10df93540f

@ -30,7 +30,8 @@ from nodepool import config as nodepool_config
from nodepool import exceptions
from nodepool import provider_manager
from nodepool import stats
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.zk import ZooKeeperClient
MINS = 60
@ -1362,7 +1363,7 @@ class NodePoolBuilder(object):
config = nodepool_config.loadConfig(self._config_path)
if self._secure_path:
nodepool_config.loadSecureConfig(config, self._secure_path)
if not config.zookeeper_servers.values():
if not config.zookeeper_servers:
raise RuntimeError('No ZooKeeper servers specified in config.')
if not config.images_dir:
raise RuntimeError('No images-dir specified in config.')
@ -1392,15 +1393,15 @@ class NodePoolBuilder(object):
builder_id = self._getBuilderID(builder_id_file)
# All worker threads share a single ZooKeeper instance/connection.
self.zk = zk.ZooKeeper(enable_cache=False)
self.zk.connect(
list(self._config.zookeeper_servers.values()),
self.zk_client = ZooKeeperClient(
self._config.zookeeper_servers,
tls_cert=self._config.zookeeper_tls_cert,
tls_key=self._config.zookeeper_tls_key,
tls_ca=self._config.zookeeper_tls_ca,
timeout=self._config.zookeeper_timeout,
)
self.zk_client.connect()
self.zk = zk.ZooKeeper(self.zk_client, enable_cache=False)
self.log.debug('Starting listener for build jobs')
# Create build and upload worker objects

@ -21,7 +21,8 @@ from prettytable import PrettyTable
from nodepool import launcher
from nodepool import provider_manager
from nodepool import status
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.zk import ZooKeeperClient
from nodepool.cmd import NodepoolApp
from nodepool.cmd.config_validator import ConfigValidator
@ -425,12 +426,15 @@ class NodePoolCmd(NodepoolApp):
'request-list', 'info', 'erase',
'image-pause', 'image-unpause',
'export-image-data', 'import-image-data'):
self.zk = zk.ZooKeeper(enable_cache=False)
self.zk.connect(
list(config.zookeeper_servers.values()),
self.zk_client = ZooKeeperClient(
config.zookeeper_servers,
tls_cert=config.zookeeper_tls_cert,
tls_key=config.zookeeper_tls_key,
tls_ca=config.zookeeper_tls_ca)
tls_ca=config.zookeeper_tls_ca,
timeout=config.zookeeper_timeout,
)
self.zk_client.connect()
self.zk = zk.ZooKeeper(self.zk_client, enable_cache=False)
self.pool.setConfig(config)
self.args.func()

@ -15,16 +15,76 @@
# limitations under the License.
import functools
import ipaddress
import math
import os
import time
import yaml
from nodepool import zk
from nodepool.driver import ConfigValue
from nodepool.driver import Drivers
class ZooKeeperConnectionConfig(object):
'''
Represents the connection parameters for a ZooKeeper server.
'''
def __eq__(self, other):
if isinstance(other, ZooKeeperConnectionConfig):
if other.__dict__ == self.__dict__:
return True
return False
def __init__(self, host, port=2181, chroot=None):
'''Initialize the ZooKeeperConnectionConfig object.
:param str host: The hostname of the ZooKeeper server.
:param int port: The port on which ZooKeeper is listening.
Optional, default: 2181.
:param str chroot: A chroot for this connection. All
ZooKeeper nodes will be underneath this root path.
Optional, default: None.
(one per server) defining the ZooKeeper cluster servers. Only
the 'host' attribute is required.'.
'''
self.host = host
self.port = port
self.chroot = chroot or ''
def __repr__(self):
return "host=%s port=%s chroot=%s" % \
(self.host, self.port, self.chroot)
def buildZooKeeperHosts(host_list):
'''
Build the ZK cluster host list for client connections.
:param list host_list: A list of
:py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects (one
per server) defining the ZooKeeper cluster servers.
'''
if not isinstance(host_list, list):
raise Exception("'host_list' must be a list")
hosts = []
for host_def in host_list:
h = host_def.host
# If this looks like a ipv6 literal address, make sure it's
# quoted in []'s
try:
addr = ipaddress.ip_address(host_def.host)
if addr.version == 6:
h = '[%s]' % addr
except ValueError:
pass
host = '%s:%s%s' % (h, host_def.port, host_def.chroot)
hosts.append(host)
return ",".join(hosts)
class Config(ConfigValue):
'''
Class representing the nodepool configuration.
@ -104,12 +164,13 @@ class Config(ConfigValue):
if not zk_cfg:
return
hosts = []
for server in zk_cfg:
z = zk.ZooKeeperConnectionConfig(server['host'],
server.get('port', 2281),
server.get('chroot', None))
name = z.host + '_' + str(z.port)
self.zookeeper_servers[name] = z
z = ZooKeeperConnectionConfig(server['host'],
server.get('port', 2281),
server.get('chroot', None))
hosts.append(z)
self.zookeeper_servers = buildZooKeeperHosts(hosts)
def setZooKeeperTimeout(self, timeout):
self.zookeeper_timeout = float(timeout)

@ -24,7 +24,7 @@ import math
import os
import voluptuous as v
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool import exceptions
from nodepool.logconfig import get_annotated_logger

@ -16,7 +16,7 @@ import logging
from kazoo import exceptions as kze
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.driver.simple import SimpleTaskManagerHandler
from nodepool.driver.utils import NodeLauncher

@ -19,7 +19,7 @@ import time
from nodepool.driver.utils import QuotaInformation, RateLimiter
from nodepool.driver import statemachine
from nodepool import zk
from nodepool.zk import zookeeper as zk
""" This driver behaves like a static driver execpt that the backing

@ -17,7 +17,7 @@ import logging
from kazoo import exceptions as kze
from nodepool import exceptions
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.driver.utils import NodeLauncher
from nodepool.driver import NodeRequestHandler

@ -14,7 +14,7 @@
import logging
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.driver.openshift.handler import OpenshiftLauncher
from nodepool.driver.openshift.handler import OpenshiftNodeRequestHandler

@ -22,7 +22,7 @@ import openstack
from nodepool import exceptions
from nodepool import nodeutils as utils
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.driver.utils import NodeLauncher, QuotaInformation
from nodepool.driver import NodeRequestHandler

@ -28,7 +28,7 @@ from nodepool.driver.utils import QuotaInformation, QuotaSupport
from nodepool.driver.utils import NodeDeleter
from nodepool import stats
from nodepool import version
from nodepool import zk
from nodepool.zk import zookeeper as zk
# Import entire module to avoid partial-loading, circular import
from nodepool.driver.openstack import handler

@ -22,7 +22,7 @@ from nodepool.driver.utils import NodeLauncher, QuotaInformation, QuotaSupport
from nodepool.driver.utils import NodeDeleter
from nodepool.nodeutils import iterate_timeout, nodescan
from nodepool import exceptions
from nodepool import zk
from nodepool.zk import zookeeper as zk
# Private support classes

@ -26,7 +26,7 @@ from nodepool.nodeutils import nodescan
from nodepool.logconfig import get_annotated_logger
from nodepool import stats
from nodepool import exceptions
from nodepool import zk
from nodepool.zk import zookeeper as zk
from kazoo import exceptions as kze
import cachetools

@ -14,7 +14,7 @@
import logging
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.driver import NodeRequestHandler

@ -22,7 +22,7 @@ from collections import Counter, namedtuple
from nodepool import exceptions
from nodepool import nodeutils
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.driver import Provider
from nodepool.driver.utils import NodeDeleter
from nodepool.driver.utils import QuotaInformation, QuotaSupport

@ -14,7 +14,7 @@
import logging
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.driver import NodeRequestHandler

@ -26,7 +26,7 @@ from kazoo import exceptions as kze
from nodepool import exceptions
from nodepool import stats
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.logconfig import get_annotated_logger

@ -30,7 +30,8 @@ from nodepool import exceptions
from nodepool import provider_manager
from nodepool import stats
from nodepool import config as nodepool_config
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.zk import ZooKeeperClient
from nodepool.driver.utils import QuotaInformation, QuotaSupport
from nodepool.logconfig import get_annotated_logger
from nodepool.version import version_info as npd_version_info
@ -962,26 +963,28 @@ class NodePool(threading.Thread):
def reconfigureZooKeeper(self, config):
if self.config:
running = list(self.config.zookeeper_servers.values())
running = self.config.zookeeper_servers
else:
running = None
configured = list(config.zookeeper_servers.values())
configured = config.zookeeper_servers
if running == configured:
return
if not self.zk and configured:
self.log.debug("Connecting to ZooKeeper servers")
self.zk = zk.ZooKeeper()
self.zk.connect(configured,
tls_cert=config.zookeeper_tls_cert,
tls_key=config.zookeeper_tls_key,
tls_ca=config.zookeeper_tls_ca,
timeout=config.zookeeper_timeout,
)
self.zk_client = ZooKeeperClient(
configured,
tls_cert=config.zookeeper_tls_cert,
tls_key=config.zookeeper_tls_key,
tls_ca=config.zookeeper_tls_ca,
timeout=config.zookeeper_timeout,
)
self.zk_client.connect()
self.zk = zk.ZooKeeper(self.zk_client)
else:
self.log.debug("Detected ZooKeeper server changes")
self.zk.resetHosts(configured)
self.zk_client.resetHosts(configured)
def setConfig(self, config):
self.config = config

@ -18,7 +18,7 @@ import os
import logging
import statsd
from nodepool import zk
from nodepool.zk import zookeeper as zk
log = logging.getLogger("nodepool.stats")

@ -36,7 +36,8 @@ import testtools
from nodepool import builder
from nodepool import launcher
from nodepool import webapp
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.zk import ZooKeeperClient
from nodepool.cmd.config_validator import ConfigValidator
from nodepool.nodeutils import iterate_timeout
@ -637,14 +638,16 @@ class DBTestCase(BaseTestCase):
self.zookeeper_key,
))
self.zookeeper_chroot = kz_fxtr.zookeeper_chroot
self.zk = zk.ZooKeeper(enable_cache=False)
host = zk.ZooKeeperConnectionConfig(
self.zookeeper_host, self.zookeeper_port, self.zookeeper_chroot,
host = (f'{self.zookeeper_host}:{self.zookeeper_port}'
f'{self.zookeeper_chroot}')
self.zk_client = ZooKeeperClient(
host,
tls_ca=self.zookeeper_ca,
tls_cert=self.zookeeper_cert,
tls_key=self.zookeeper_key
)
self.zk.connect([host],
tls_ca=self.zookeeper_ca,
tls_cert=self.zookeeper_cert,
tls_key=self.zookeeper_key)
self.zk_client.connect()
self.zk = zk.ZooKeeper(self.zk_client, enable_cache=False)
self.addCleanup(self.zk.disconnect)
def printZKTree(self, node):

@ -21,7 +21,7 @@ import time
from nodepool import builder, tests
from nodepool.driver.fake import provider as fakeprovider
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.config import Config
from nodepool.nodeutils import iterate_timeout

@ -24,7 +24,7 @@ import testtools
from nodepool.cmd import nodepoolcmd
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.nodeutils import iterate_timeout

@ -26,7 +26,7 @@ import testtools
from nodepool import config as nodepool_config
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.nodeutils import iterate_timeout
import nodepool.driver.statemachine
from nodepool.driver.statemachine import StateMachineProvider

@ -17,7 +17,7 @@
import logging
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.driver.statemachine import StateMachineProvider
from . import fake_azure

@ -26,7 +26,7 @@ import googleapiclient.discovery
import googleapiclient.errors
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.nodeutils import iterate_timeout

@ -18,7 +18,7 @@ import os
import logging
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.driver.statemachine import StateMachineProvider
from nodepool.driver.ibmvpc.adapter import IBMVPCAdapter

@ -18,7 +18,7 @@ import logging
import time
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
class FakeCoreClient(object):

@ -19,7 +19,7 @@ import logging
import testtools
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.driver.statemachine import StateMachineProvider
from nodepool.cmd.config_validator import ConfigValidator

@ -17,7 +17,7 @@ import fixtures
import logging
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
class FakeOpenshiftProjectsQuery:

@ -17,7 +17,7 @@ import fixtures
import logging
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
class FakeCoreClient(object):

@ -19,7 +19,7 @@ import os
from nodepool import config as nodepool_config
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.cmd.config_validator import ConfigValidator

@ -21,7 +21,7 @@ import mock
import testtools
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.driver.fake import provider as fakeprovider
from nodepool.nodeutils import iterate_timeout
import nodepool.launcher
@ -1658,11 +1658,10 @@ class TestLauncher(tests.DBTestCase):
self.assertEqual('secret', fake_image.env_vars['REG_PASSWORD'])
zk_servers = pool.config.zookeeper_servers
self.assertEqual(1, len(zk_servers))
key = list(zk_servers.keys())[0]
self.assertEqual(self.zookeeper_host, zk_servers[key].host)
self.assertEqual(self.zookeeper_port, zk_servers[key].port)
self.assertEqual(self.zookeeper_chroot, zk_servers[key].chroot)
self.assertTrue(len(zk_servers) > 0)
expected = (f'{self.zookeeper_host}:{self.zookeeper_port}'
f'{self.zookeeper_chroot}')
self.assertEqual(expected, zk_servers)
image = self.waitForImage('fake-provider', 'fake-image')
self.assertEqual(image.username, 'zuul')
@ -2430,8 +2429,8 @@ class TestLauncher(tests.DBTestCase):
# We want the first call to deleteRawNode() to fail, but subsequent
# ones to succeed, so we store a pointer to the actual method so we
# can reset it at the point we want to really delete.
real_method = nodepool.zk.ZooKeeper.deleteRawNode
nodepool.zk.ZooKeeper.deleteRawNode = mock.Mock(
real_method = zk.ZooKeeper.deleteRawNode
zk.ZooKeeper.deleteRawNode = mock.Mock(
side_effect=Exception('mock exception'))
# This call should leave the node in the DELETED state
@ -2442,5 +2441,5 @@ class TestLauncher(tests.DBTestCase):
self.assertEqual(zk.DELETED, node.state)
# Ready for the real delete now
nodepool.zk.ZooKeeper.deleteRawNode = real_method
zk.ZooKeeper.deleteRawNode = real_method
self.waitForNodeDeletion(node)

@ -20,7 +20,7 @@ from urllib import request
from urllib.error import HTTPError
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.nodeutils import iterate_timeout

@ -17,7 +17,8 @@ import uuid
from nodepool import exceptions as npe
from nodepool import tests
from nodepool import zk
from nodepool.zk import zookeeper as zk
from nodepool.config import ZooKeeperConnectionConfig, buildZooKeeperHosts
from nodepool.nodeutils import iterate_timeout
@ -28,38 +29,38 @@ class TestZooKeeper(tests.DBTestCase):
def test_buildZooKeeperHosts_single(self):
hosts = [
zk.ZooKeeperConnectionConfig('127.0.0.1', port=2181,
chroot='/test1')
ZooKeeperConnectionConfig('127.0.0.1', port=2181,
chroot='/test1')
]
self.assertEqual('127.0.0.1:2181/test1',
zk.buildZooKeeperHosts(hosts))
buildZooKeeperHosts(hosts))
def test_buildZooKeeperHosts_multiple(self):
hosts = [
zk.ZooKeeperConnectionConfig('127.0.0.1', port=2181,
chroot='/test1'),
zk.ZooKeeperConnectionConfig('127.0.0.2', port=2182,
chroot='/test2')
ZooKeeperConnectionConfig('127.0.0.1', port=2181,
chroot='/test1'),
ZooKeeperConnectionConfig('127.0.0.2', port=2182,
chroot='/test2')
]
self.assertEqual('127.0.0.1:2181/test1,127.0.0.2:2182/test2',
zk.buildZooKeeperHosts(hosts))
buildZooKeeperHosts(hosts))
def test_buildZooKeeperHosts_ipv6(self):
hosts = [
zk.ZooKeeperConnectionConfig(
ZooKeeperConnectionConfig(
'2001:4800:7817:103:be76:4eff:fe04:e359', port=2181,
chroot='/test1'),
zk.ZooKeeperConnectionConfig(
ZooKeeperConnectionConfig(
'[2002:4800:7817:103:be76:4eff:fe04:e359]', port=2181,
chroot='/test2'),
zk.ZooKeeperConnectionConfig('127.0.0.2', port=2182,
chroot='/test3')
ZooKeeperConnectionConfig('127.0.0.2', port=2182,
chroot='/test3')
]
self.assertEqual((
'[2001:4800:7817:103:be76:4eff:fe04:e359]:2181/test1,'
'[2002:4800:7817:103:be76:4eff:fe04:e359]:2181/test2,'
'127.0.0.2:2182/test3'
), zk.buildZooKeeperHosts(hosts))
), buildZooKeeperHosts(hosts))
def test_imageBuildLock(self):
path = self.zk._imageBuildLockPath("ubuntu-trusty")

@ -0,0 +1,216 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import time
from abc import ABCMeta
from threading import Thread
from kazoo.client import KazooClient
from kazoo.handlers.threading import KazooTimeoutError
from kazoo.protocol.states import KazooState
from nodepool.zk.exceptions import NoClientException
from nodepool.zk.handler import PoolSequentialThreadingHandler
class ZooKeeperClient(object):
log = logging.getLogger("nodepool.zk.ZooKeeperClient")
# Log zookeeper retry every 10 seconds
retry_log_rate = 10
def __init__(
self,
hosts,
read_only=False,
timeout=10.0,
tls_cert=None,
tls_key=None,
tls_ca=None,
):
"""
Initialize the ZooKeeper base client object.
:param str hosts: Comma-separated list of hosts to connect to (e.g.
127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
:param bool read_only: If True, establishes a read-only connection.
:param float timeout: The ZooKeeper session timeout, in
seconds (default: 10.0).
:param str tls_key: Path to TLS key
:param str tls_cert: Path to TLS cert
:param str tls_ca: Path to TLS CA cert
"""
self.hosts = hosts
self.read_only = read_only
self.timeout = timeout
self.tls_cert = tls_cert
self.tls_key = tls_key
self.tls_ca = tls_ca
self.was_lost = False
self.client = None
if not (tls_key and tls_cert and tls_ca):
raise Exception("A TLS ZooKeeper connection is required; "
"please supply the zookeeper-tls "
"config values.")
# Verify that we can read the cert files (Kazoo doesn't
# provide useful error messages).
for fn in (tls_cert, tls_key, tls_ca):
if fn:
with open(fn):
pass
self._last_retry_log = 0
self.on_connect_listeners = []
self.on_disconnect_listeners = []
self.on_connection_lost_listeners = []
self.on_reconnect_listeners = []
def _connectionListener(self, state):
"""
Listener method for Kazoo connection state changes.
.. warning:: This method must not block.
"""
if state == KazooState.LOST:
self.log.debug("ZooKeeper connection: LOST")
self.was_lost = True
for listener in self.on_connection_lost_listeners:
try:
listener()
except Exception:
self.log.exception("Exception calling listener:")
elif state == KazooState.SUSPENDED:
self.log.debug("ZooKeeper connection: SUSPENDED")
else:
self.log.debug("ZooKeeper connection: CONNECTED")
# Create a throwaway thread since zk operations can't
# happen in this one.
if self.was_lost:
self.was_lost = False
for listener in self.on_reconnect_listeners:
t = Thread(target=listener)
t.daemon = True
t.start()
@property
def connected(self):
return self.client and self.client.state == KazooState.CONNECTED
@property
def suspended(self):
return self.client and self.client.state == KazooState.SUSPENDED
@property
def lost(self):
return not self.client or self.client.state == KazooState.LOST
def logConnectionRetryEvent(self):
now = time.monotonic()
if now - self._last_retry_log >= self.retry_log_rate:
self.log.warning("Retrying zookeeper connection")
self._last_retry_log = now
def connect(self):
if self.client is None:
args = dict(
hosts=self.hosts,
read_only=self.read_only,
timeout=self.timeout,
handler=PoolSequentialThreadingHandler(),
)
if self.tls_key:
args['use_ssl'] = True
args['keyfile'] = self.tls_key
args['certfile'] = self.tls_cert
args['ca'] = self.tls_ca
self.client = KazooClient(**args)
self.client.add_listener(self._connectionListener)
# Manually retry initial connection attempt
while True:
try:
self.client.start(1)
break
except KazooTimeoutError:
self.logConnectionRetryEvent()
for listener in self.on_connect_listeners:
listener()
def disconnect(self):
"""
Close the ZooKeeper cluster connection.
You should call this method if you used connect() to establish a
cluster connection.
"""
for listener in self.on_disconnect_listeners:
listener()
if self.client is not None and self.client.connected:
self.client.stop()
self.client.close()
self.client = None
def resetHosts(self, hosts):
"""
Reset the ZooKeeper cluster connection host list.
:param str hosts: Comma-separated list of hosts to connect to (e.g.
127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
"""
if self.client is not None:
self.client.set_hosts(hosts=hosts)
def commitTransaction(self, tr):
results = tr.commit()
for res in results:
self.log.debug("Transaction response %s", repr(res))
for res in results:
if isinstance(res, Exception):
raise res
return results
class ZooKeeperSimpleBase(metaclass=ABCMeta):
"""Base class for stateless Zookeeper interaction."""
def __init__(self, client):
self.client = client
@property
def kazoo_client(self):
if not self.client.client:
raise NoClientException()
return self.client.client
class ZooKeeperBase(ZooKeeperSimpleBase):
"""Base class for registering state handling methods with ZooKeeper."""
def __init__(self, client):
super().__init__(client)
if client:
self.client.on_connect_listeners.append(self._onConnect)
self.client.on_disconnect_listeners.append(self._onDisconnect)
self.client.on_reconnect_listeners.append(self._onReconnect)
def _onConnect(self):
pass
def _onDisconnect(self):
pass
def _onReconnect(self):
pass

@ -0,0 +1,27 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kazoo.exceptions import KazooException
class NodepoolZooKeeperException(KazooException):
"""Base exception class for all custom ZK exceptions"""
pass
class LockException(NodepoolZooKeeperException):
pass
class NoClientException(NodepoolZooKeeperException):
def __init__(self):
super().__init__("No zookeeper client!")

@ -0,0 +1,36 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from concurrent.futures import ThreadPoolExecutor
from kazoo.handlers.threading import SequentialThreadingHandler
class PoolSequentialThreadingHandler(SequentialThreadingHandler):
def __init__(self):
super().__init__()
self._pool_executor = None
def start(self):
self._pool_executor = ThreadPoolExecutor(max_workers=10)
super().start()
def stop(self):
super().stop()
if self._pool_executor:
self._pool_executor.shutdown()
self._pool_executor = None
def short_spawn(self, func, *args, **kwargs):
self._pool_executor.submit(func, *args, **kwargs)

@ -13,15 +13,13 @@
from contextlib import contextmanager
from copy import copy
import abc
import ipaddress
import json
import logging
import time
import uuid
from kazoo.client import KazooClient, KazooState
from kazoo.client import KazooState
from kazoo import exceptions as kze
from kazoo.handlers.threading import KazooTimeoutError
from kazoo.recipe.lock import Lock
from kazoo.recipe.cache import TreeCache, TreeEvent
from kazoo.recipe.election import Election
@ -73,66 +71,6 @@ def as_list(item):
return [item]
class ZooKeeperConnectionConfig(object):
'''
Represents the connection parameters for a ZooKeeper server.
'''
def __eq__(self, other):
if isinstance(other, ZooKeeperConnectionConfig):
if other.__dict__ == self.__dict__:
return True
return False
def __init__(self, host, port=2181, chroot=None):
'''Initialize the ZooKeeperConnectionConfig object.
:param str host: The hostname of the ZooKeeper server.
:param int port: The port on which ZooKeeper is listening.
Optional, default: 2181.
:param str chroot: A chroot for this connection. All
ZooKeeper nodes will be underneath this root path.
Optional, default: None.
(one per server) defining the ZooKeeper cluster servers. Only
the 'host' attribute is required.'.
'''
self.host = host
self.port = port
self.chroot = chroot or ''
def __repr__(self):
return "host=%s port=%s chroot=%s" % \
(self.host, self.port, self.chroot)
def buildZooKeeperHosts(host_list):
'''
Build the ZK cluster host list for client connections.
:param list host_list: A list of
:py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects (one
per server) defining the ZooKeeper cluster servers.
'''
if not isinstance(host_list, list):
raise Exception("'host_list' must be a list")
hosts = []
for host_def in host_list:
h = host_def.host
# If this looks like a ipv6 literal address, make sure it's
# quoted in []'s
try:
addr = ipaddress.ip_address(host_def.host)
if addr.version == 6:
h = '[%s]' % addr
except ValueError:
pass
host = '%s:%s%s' % (h, host_def.port, host_def.chroot)
hosts.append(host)
return ",".join(hosts)
class ZooKeeperWatchEvent(object):
'''
Class representing a watch trigger event.
@ -192,6 +130,9 @@ class Launcher(Serializable):
else:
return False
def __hash__(self):
return hash(self.id)
@property
def supported_labels(self):
return self._supported_labels
@ -797,11 +738,12 @@ class ZooKeeper(object):
# Log zookeeper retry every 10 seconds
retry_log_rate = 10
def __init__(self, enable_cache=True):
def __init__(self, zk_client, enable_cache=True):
'''
Initialize the ZooKeeper object.
'''
self.client = None
self.zk_client = zk_client # nodepool.zk.ZooKeeperClient
self.client = zk_client.client # KazooClient
self._became_lost = False
self._last_retry_log = 0
self._node_cache = None
@ -809,9 +751,19 @@ class ZooKeeper(object):
self._cached_nodes = {}
self._cached_node_requests = {}
self.enable_cache = enable_cache
self.node_stats_event = None
if self.enable_cache:
self._node_cache = TreeCache(self.client, self.NODE_ROOT)
self._node_cache.listen_fault(self.cacheFaultListener)
self._node_cache.listen(self.nodeCacheListener)
self._node_cache.start()
self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
self._request_cache.listen_fault(self.cacheFaultListener)
self._request_cache.listen(self.requestCacheListener)
self._request_cache.start()
# =======================================================================
# Private Methods
# =======================================================================
@ -1011,61 +963,6 @@ class ZooKeeper(object):
def resetLostFlag(self):
self._became_lost = False
def connect(self, host_list, read_only=False, tls_cert=None,
tls_key=None, tls_ca=None, timeout=10.0):
'''
Establish a connection with ZooKeeper cluster.
Convenience method if a pre-existing ZooKeeper connection is not
supplied to the ZooKeeper object at instantiation time.
:param list host_list: A list of
:py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects
(one per server) defining the ZooKeeper cluster servers.
:param bool read_only: If True, establishes a read-only connection.
:param str tls_key: Path to TLS key
:param str tls_cert: Path to TLS cert
:param str tls_ca: Path to TLS CA cert
'''
if self.client is None:
hosts = buildZooKeeperHosts(host_list)
args = dict(
hosts=hosts,
read_only=read_only,
timeout=timeout,
)
args['use_ssl'] = True
if not (tls_key and tls_cert and tls_ca):
raise Exception("A TLS ZooKeeper connection is required; "
"please supply the zookeeper-tls "
"config values.")
args['keyfile'] = tls_key
args['certfile'] = tls_cert
args['ca'] = tls_ca
self.client = KazooClient(**args)
self.client.add_listener(self._connection_listener)
# Manually retry initial connection attempt
while True:
try:
self.client.start(1)
break
except KazooTimeoutError:
self.logConnectionRetryEvent()
if self.enable_cache:
self._node_cache = TreeCache(self.client, self.NODE_ROOT)
self._node_cache.listen_fault(self.cacheFaultListener)
self._node_cache.listen(self.nodeCacheListener)
self._node_cache.start()
self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
self._request_cache.listen_fault(self.cacheFaultListener)
self._request_cache.listen(self.requestCacheListener)
self._request_cache.start()
def disconnect(self):
'''
Close the ZooKeeper cluster connection.
@ -1087,16 +984,13 @@ class ZooKeeper(object):
self.client.close()
self.client = None
def resetHosts(self, host_list):
def resetHosts(self, hosts):
'''
Reset the ZooKeeper cluster connection host list.
:param list host_list: A list of
:py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects
(one per server) defining the ZooKeeper cluster servers.
:param str host_list: A ZK host list
'''
if self.client is not None:
hosts = buildZooKeeperHosts(host_list)
self.client.set_hosts(hosts=hosts)
@contextmanager

@ -16,7 +16,8 @@ import argparse
import logging
import nodepool.config
import nodepool.zk
from nodepool.zk import zookeeper as zk
from nodepool.zk import ZooKeeperClient
# A script to print the zookeeper tree given a nodepool config file.
@ -30,8 +31,15 @@ args = parser.parse_args()
config = nodepool.config.loadConfig(args.config)
zk = nodepool.zk.ZooKeeper(enable_cache=False)
zk.connect(list(config.zookeeper_servers.values()))
zk_client = ZooKeeperClient(
config.zookeeper_servers,
tls_cert=config.zookeeper_tls_cert,
tls_key=config.zookeeper_tls_key,
tls_ca=config.zookeeper_tls_ca,
timeout=config.zookeeper_timeout,
)
zk_client.connect()
zk = zk.ZooKeeper(zk_client, enable_cache=False)
def join(a, b):
if a.endswith('/'):

Loading…
Cancel
Save