From 10df93540fa49cf2134a46fa63440d1a905b3dfd Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 11 May 2022 16:56:17 -0700 Subject: [PATCH] Use Zuul-style ZooKeeper connections We have made many improvements to connection handling in Zuul. Bring those back to Nodepool by copying over the zuul/zk directory which has our base ZK connection classes. This will enable us to bring other Zuul classes over, such as the component registry. The existing connection-related code is removed and the remaining model-style code is moved to nodepool.zk.zookeeper. Almost every file imported the model as nodepool.zk, so import adjustments are made to compensate while keeping the code more or less as-is. Change-Id: I9f793d7bbad573cb881dfcfdf11e3013e0f8e4a3 --- nodepool/builder.py | 13 +- nodepool/cmd/nodepoolcmd.py | 14 +- nodepool/config.py | 73 +++++- nodepool/driver/__init__.py | 2 +- nodepool/driver/kubernetes/handler.py | 2 +- nodepool/driver/metastatic/adapter.py | 2 +- nodepool/driver/openshift/handler.py | 2 +- nodepool/driver/openshiftpods/handler.py | 2 +- nodepool/driver/openstack/handler.py | 2 +- nodepool/driver/openstack/provider.py | 2 +- nodepool/driver/simple.py | 2 +- nodepool/driver/statemachine.py | 2 +- nodepool/driver/static/handler.py | 2 +- nodepool/driver/static/provider.py | 2 +- nodepool/driver/test/handler.py | 2 +- nodepool/driver/utils.py | 2 +- nodepool/launcher.py | 25 +- nodepool/stats.py | 2 +- nodepool/tests/__init__.py | 19 +- nodepool/tests/unit/test_builder.py | 2 +- nodepool/tests/unit/test_commands.py | 2 +- nodepool/tests/unit/test_driver_aws.py | 2 +- nodepool/tests/unit/test_driver_azure.py | 2 +- nodepool/tests/unit/test_driver_gce.py | 2 +- nodepool/tests/unit/test_driver_ibmvpc.py | 2 +- nodepool/tests/unit/test_driver_kubernetes.py | 2 +- nodepool/tests/unit/test_driver_metastatic.py | 2 +- nodepool/tests/unit/test_driver_openshift.py | 2 +- .../tests/unit/test_driver_openshiftpods.py | 2 +- nodepool/tests/unit/test_driver_static.py | 2 +- nodepool/tests/unit/test_launcher.py | 17 +- nodepool/tests/unit/test_webapp.py | 2 +- nodepool/tests/unit/test_zk.py | 29 +-- nodepool/zk/__init__.py | 216 ++++++++++++++++++ nodepool/zk/exceptions.py | 27 +++ nodepool/zk/handler.py | 36 +++ nodepool/{zk.py => zk/zookeeper.py} | 146 ++---------- tools/print-zk.py | 14 +- 38 files changed, 467 insertions(+), 214 deletions(-) create mode 100644 nodepool/zk/__init__.py create mode 100644 nodepool/zk/exceptions.py create mode 100644 nodepool/zk/handler.py rename nodepool/{zk.py => zk/zookeeper.py} (94%) diff --git a/nodepool/builder.py b/nodepool/builder.py index d14008336..e1e57ae45 100644 --- a/nodepool/builder.py +++ b/nodepool/builder.py @@ -30,7 +30,8 @@ from nodepool import config as nodepool_config from nodepool import exceptions from nodepool import provider_manager from nodepool import stats -from nodepool import zk +from nodepool.zk import zookeeper as zk +from nodepool.zk import ZooKeeperClient MINS = 60 @@ -1362,7 +1363,7 @@ class NodePoolBuilder(object): config = nodepool_config.loadConfig(self._config_path) if self._secure_path: nodepool_config.loadSecureConfig(config, self._secure_path) - if not config.zookeeper_servers.values(): + if not config.zookeeper_servers: raise RuntimeError('No ZooKeeper servers specified in config.') if not config.images_dir: raise RuntimeError('No images-dir specified in config.') @@ -1392,15 +1393,15 @@ class NodePoolBuilder(object): builder_id = self._getBuilderID(builder_id_file) # All worker threads share a single ZooKeeper instance/connection. - self.zk = zk.ZooKeeper(enable_cache=False) - self.zk.connect( - list(self._config.zookeeper_servers.values()), + self.zk_client = ZooKeeperClient( + self._config.zookeeper_servers, tls_cert=self._config.zookeeper_tls_cert, tls_key=self._config.zookeeper_tls_key, tls_ca=self._config.zookeeper_tls_ca, timeout=self._config.zookeeper_timeout, ) - + self.zk_client.connect() + self.zk = zk.ZooKeeper(self.zk_client, enable_cache=False) self.log.debug('Starting listener for build jobs') # Create build and upload worker objects diff --git a/nodepool/cmd/nodepoolcmd.py b/nodepool/cmd/nodepoolcmd.py index 457725363..ec8629505 100644 --- a/nodepool/cmd/nodepoolcmd.py +++ b/nodepool/cmd/nodepoolcmd.py @@ -21,7 +21,8 @@ from prettytable import PrettyTable from nodepool import launcher from nodepool import provider_manager from nodepool import status -from nodepool import zk +from nodepool.zk import zookeeper as zk +from nodepool.zk import ZooKeeperClient from nodepool.cmd import NodepoolApp from nodepool.cmd.config_validator import ConfigValidator @@ -425,12 +426,15 @@ class NodePoolCmd(NodepoolApp): 'request-list', 'info', 'erase', 'image-pause', 'image-unpause', 'export-image-data', 'import-image-data'): - self.zk = zk.ZooKeeper(enable_cache=False) - self.zk.connect( - list(config.zookeeper_servers.values()), + self.zk_client = ZooKeeperClient( + config.zookeeper_servers, tls_cert=config.zookeeper_tls_cert, tls_key=config.zookeeper_tls_key, - tls_ca=config.zookeeper_tls_ca) + tls_ca=config.zookeeper_tls_ca, + timeout=config.zookeeper_timeout, + ) + self.zk_client.connect() + self.zk = zk.ZooKeeper(self.zk_client, enable_cache=False) self.pool.setConfig(config) self.args.func() diff --git a/nodepool/config.py b/nodepool/config.py index a49ffe64c..3d6ccafe5 100644 --- a/nodepool/config.py +++ b/nodepool/config.py @@ -15,16 +15,76 @@ # limitations under the License. import functools +import ipaddress import math import os import time import yaml -from nodepool import zk from nodepool.driver import ConfigValue from nodepool.driver import Drivers +class ZooKeeperConnectionConfig(object): + ''' + Represents the connection parameters for a ZooKeeper server. + ''' + + def __eq__(self, other): + if isinstance(other, ZooKeeperConnectionConfig): + if other.__dict__ == self.__dict__: + return True + return False + + def __init__(self, host, port=2181, chroot=None): + '''Initialize the ZooKeeperConnectionConfig object. + + :param str host: The hostname of the ZooKeeper server. + :param int port: The port on which ZooKeeper is listening. + Optional, default: 2181. + :param str chroot: A chroot for this connection. All + ZooKeeper nodes will be underneath this root path. + Optional, default: None. + + (one per server) defining the ZooKeeper cluster servers. Only + the 'host' attribute is required.'. + + ''' + self.host = host + self.port = port + self.chroot = chroot or '' + + def __repr__(self): + return "host=%s port=%s chroot=%s" % \ + (self.host, self.port, self.chroot) + + +def buildZooKeeperHosts(host_list): + ''' + Build the ZK cluster host list for client connections. + + :param list host_list: A list of + :py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects (one + per server) defining the ZooKeeper cluster servers. + ''' + if not isinstance(host_list, list): + raise Exception("'host_list' must be a list") + hosts = [] + for host_def in host_list: + h = host_def.host + # If this looks like a ipv6 literal address, make sure it's + # quoted in []'s + try: + addr = ipaddress.ip_address(host_def.host) + if addr.version == 6: + h = '[%s]' % addr + except ValueError: + pass + host = '%s:%s%s' % (h, host_def.port, host_def.chroot) + hosts.append(host) + return ",".join(hosts) + + class Config(ConfigValue): ''' Class representing the nodepool configuration. @@ -104,12 +164,13 @@ class Config(ConfigValue): if not zk_cfg: return + hosts = [] for server in zk_cfg: - z = zk.ZooKeeperConnectionConfig(server['host'], - server.get('port', 2281), - server.get('chroot', None)) - name = z.host + '_' + str(z.port) - self.zookeeper_servers[name] = z + z = ZooKeeperConnectionConfig(server['host'], + server.get('port', 2281), + server.get('chroot', None)) + hosts.append(z) + self.zookeeper_servers = buildZooKeeperHosts(hosts) def setZooKeeperTimeout(self, timeout): self.zookeeper_timeout = float(timeout) diff --git a/nodepool/driver/__init__.py b/nodepool/driver/__init__.py index 88e8b3bfe..37f0b34d6 100644 --- a/nodepool/driver/__init__.py +++ b/nodepool/driver/__init__.py @@ -24,7 +24,7 @@ import math import os import voluptuous as v -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool import exceptions from nodepool.logconfig import get_annotated_logger diff --git a/nodepool/driver/kubernetes/handler.py b/nodepool/driver/kubernetes/handler.py index f26c2381c..89855f13b 100644 --- a/nodepool/driver/kubernetes/handler.py +++ b/nodepool/driver/kubernetes/handler.py @@ -16,7 +16,7 @@ import logging from kazoo import exceptions as kze -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.driver.simple import SimpleTaskManagerHandler from nodepool.driver.utils import NodeLauncher diff --git a/nodepool/driver/metastatic/adapter.py b/nodepool/driver/metastatic/adapter.py index a24f0b661..a08654fb7 100644 --- a/nodepool/driver/metastatic/adapter.py +++ b/nodepool/driver/metastatic/adapter.py @@ -19,7 +19,7 @@ import time from nodepool.driver.utils import QuotaInformation, RateLimiter from nodepool.driver import statemachine -from nodepool import zk +from nodepool.zk import zookeeper as zk """ This driver behaves like a static driver execpt that the backing diff --git a/nodepool/driver/openshift/handler.py b/nodepool/driver/openshift/handler.py index e0e416bf9..1fb3efb2a 100644 --- a/nodepool/driver/openshift/handler.py +++ b/nodepool/driver/openshift/handler.py @@ -17,7 +17,7 @@ import logging from kazoo import exceptions as kze from nodepool import exceptions -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.driver.utils import NodeLauncher from nodepool.driver import NodeRequestHandler diff --git a/nodepool/driver/openshiftpods/handler.py b/nodepool/driver/openshiftpods/handler.py index 6f1beabdc..fb34e59af 100644 --- a/nodepool/driver/openshiftpods/handler.py +++ b/nodepool/driver/openshiftpods/handler.py @@ -14,7 +14,7 @@ import logging -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.driver.openshift.handler import OpenshiftLauncher from nodepool.driver.openshift.handler import OpenshiftNodeRequestHandler diff --git a/nodepool/driver/openstack/handler.py b/nodepool/driver/openstack/handler.py index 22ad10757..9ac8cf072 100644 --- a/nodepool/driver/openstack/handler.py +++ b/nodepool/driver/openstack/handler.py @@ -22,7 +22,7 @@ import openstack from nodepool import exceptions from nodepool import nodeutils as utils -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.driver.utils import NodeLauncher, QuotaInformation from nodepool.driver import NodeRequestHandler diff --git a/nodepool/driver/openstack/provider.py b/nodepool/driver/openstack/provider.py index cfe986363..3d64cef9c 100755 --- a/nodepool/driver/openstack/provider.py +++ b/nodepool/driver/openstack/provider.py @@ -28,7 +28,7 @@ from nodepool.driver.utils import QuotaInformation, QuotaSupport from nodepool.driver.utils import NodeDeleter from nodepool import stats from nodepool import version -from nodepool import zk +from nodepool.zk import zookeeper as zk # Import entire module to avoid partial-loading, circular import from nodepool.driver.openstack import handler diff --git a/nodepool/driver/simple.py b/nodepool/driver/simple.py index 20c8c3ecf..e4733748f 100644 --- a/nodepool/driver/simple.py +++ b/nodepool/driver/simple.py @@ -22,7 +22,7 @@ from nodepool.driver.utils import NodeLauncher, QuotaInformation, QuotaSupport from nodepool.driver.utils import NodeDeleter from nodepool.nodeutils import iterate_timeout, nodescan from nodepool import exceptions -from nodepool import zk +from nodepool.zk import zookeeper as zk # Private support classes diff --git a/nodepool/driver/statemachine.py b/nodepool/driver/statemachine.py index cd4bd92b4..6bb3fcdc3 100644 --- a/nodepool/driver/statemachine.py +++ b/nodepool/driver/statemachine.py @@ -26,7 +26,7 @@ from nodepool.nodeutils import nodescan from nodepool.logconfig import get_annotated_logger from nodepool import stats from nodepool import exceptions -from nodepool import zk +from nodepool.zk import zookeeper as zk from kazoo import exceptions as kze import cachetools diff --git a/nodepool/driver/static/handler.py b/nodepool/driver/static/handler.py index 6e63ae22b..15dfa5fdc 100644 --- a/nodepool/driver/static/handler.py +++ b/nodepool/driver/static/handler.py @@ -14,7 +14,7 @@ import logging -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.driver import NodeRequestHandler diff --git a/nodepool/driver/static/provider.py b/nodepool/driver/static/provider.py index 79d2e9e10..dec4c03c5 100644 --- a/nodepool/driver/static/provider.py +++ b/nodepool/driver/static/provider.py @@ -22,7 +22,7 @@ from collections import Counter, namedtuple from nodepool import exceptions from nodepool import nodeutils -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.driver import Provider from nodepool.driver.utils import NodeDeleter from nodepool.driver.utils import QuotaInformation, QuotaSupport diff --git a/nodepool/driver/test/handler.py b/nodepool/driver/test/handler.py index 98f87f018..8b60a1c99 100644 --- a/nodepool/driver/test/handler.py +++ b/nodepool/driver/test/handler.py @@ -14,7 +14,7 @@ import logging -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.driver import NodeRequestHandler diff --git a/nodepool/driver/utils.py b/nodepool/driver/utils.py index 7daae5709..36b3607a5 100644 --- a/nodepool/driver/utils.py +++ b/nodepool/driver/utils.py @@ -26,7 +26,7 @@ from kazoo import exceptions as kze from nodepool import exceptions from nodepool import stats -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.logconfig import get_annotated_logger diff --git a/nodepool/launcher.py b/nodepool/launcher.py index a993c9284..c5aa08de7 100644 --- a/nodepool/launcher.py +++ b/nodepool/launcher.py @@ -30,7 +30,8 @@ from nodepool import exceptions from nodepool import provider_manager from nodepool import stats from nodepool import config as nodepool_config -from nodepool import zk +from nodepool.zk import zookeeper as zk +from nodepool.zk import ZooKeeperClient from nodepool.driver.utils import QuotaInformation, QuotaSupport from nodepool.logconfig import get_annotated_logger from nodepool.version import version_info as npd_version_info @@ -962,26 +963,28 @@ class NodePool(threading.Thread): def reconfigureZooKeeper(self, config): if self.config: - running = list(self.config.zookeeper_servers.values()) + running = self.config.zookeeper_servers else: running = None - configured = list(config.zookeeper_servers.values()) + configured = config.zookeeper_servers if running == configured: return if not self.zk and configured: self.log.debug("Connecting to ZooKeeper servers") - self.zk = zk.ZooKeeper() - self.zk.connect(configured, - tls_cert=config.zookeeper_tls_cert, - tls_key=config.zookeeper_tls_key, - tls_ca=config.zookeeper_tls_ca, - timeout=config.zookeeper_timeout, - ) + self.zk_client = ZooKeeperClient( + configured, + tls_cert=config.zookeeper_tls_cert, + tls_key=config.zookeeper_tls_key, + tls_ca=config.zookeeper_tls_ca, + timeout=config.zookeeper_timeout, + ) + self.zk_client.connect() + self.zk = zk.ZooKeeper(self.zk_client) else: self.log.debug("Detected ZooKeeper server changes") - self.zk.resetHosts(configured) + self.zk_client.resetHosts(configured) def setConfig(self, config): self.config = config diff --git a/nodepool/stats.py b/nodepool/stats.py index 229a18d8b..f1d333e24 100644 --- a/nodepool/stats.py +++ b/nodepool/stats.py @@ -18,7 +18,7 @@ import os import logging import statsd -from nodepool import zk +from nodepool.zk import zookeeper as zk log = logging.getLogger("nodepool.stats") diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index 56edb03db..84bd8e132 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -36,7 +36,8 @@ import testtools from nodepool import builder from nodepool import launcher from nodepool import webapp -from nodepool import zk +from nodepool.zk import zookeeper as zk +from nodepool.zk import ZooKeeperClient from nodepool.cmd.config_validator import ConfigValidator from nodepool.nodeutils import iterate_timeout @@ -637,14 +638,16 @@ class DBTestCase(BaseTestCase): self.zookeeper_key, )) self.zookeeper_chroot = kz_fxtr.zookeeper_chroot - self.zk = zk.ZooKeeper(enable_cache=False) - host = zk.ZooKeeperConnectionConfig( - self.zookeeper_host, self.zookeeper_port, self.zookeeper_chroot, + host = (f'{self.zookeeper_host}:{self.zookeeper_port}' + f'{self.zookeeper_chroot}') + self.zk_client = ZooKeeperClient( + host, + tls_ca=self.zookeeper_ca, + tls_cert=self.zookeeper_cert, + tls_key=self.zookeeper_key ) - self.zk.connect([host], - tls_ca=self.zookeeper_ca, - tls_cert=self.zookeeper_cert, - tls_key=self.zookeeper_key) + self.zk_client.connect() + self.zk = zk.ZooKeeper(self.zk_client, enable_cache=False) self.addCleanup(self.zk.disconnect) def printZKTree(self, node): diff --git a/nodepool/tests/unit/test_builder.py b/nodepool/tests/unit/test_builder.py index 17819118a..d6159509d 100644 --- a/nodepool/tests/unit/test_builder.py +++ b/nodepool/tests/unit/test_builder.py @@ -21,7 +21,7 @@ import time from nodepool import builder, tests from nodepool.driver.fake import provider as fakeprovider -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.config import Config from nodepool.nodeutils import iterate_timeout diff --git a/nodepool/tests/unit/test_commands.py b/nodepool/tests/unit/test_commands.py index 3fb24a04d..6a8f772db 100644 --- a/nodepool/tests/unit/test_commands.py +++ b/nodepool/tests/unit/test_commands.py @@ -24,7 +24,7 @@ import testtools from nodepool.cmd import nodepoolcmd from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.nodeutils import iterate_timeout diff --git a/nodepool/tests/unit/test_driver_aws.py b/nodepool/tests/unit/test_driver_aws.py index 9be7c194f..53f46dfd6 100644 --- a/nodepool/tests/unit/test_driver_aws.py +++ b/nodepool/tests/unit/test_driver_aws.py @@ -26,7 +26,7 @@ import testtools from nodepool import config as nodepool_config from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.nodeutils import iterate_timeout import nodepool.driver.statemachine from nodepool.driver.statemachine import StateMachineProvider diff --git a/nodepool/tests/unit/test_driver_azure.py b/nodepool/tests/unit/test_driver_azure.py index 75236064b..aa99b49ff 100644 --- a/nodepool/tests/unit/test_driver_azure.py +++ b/nodepool/tests/unit/test_driver_azure.py @@ -17,7 +17,7 @@ import logging from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.driver.statemachine import StateMachineProvider from . import fake_azure diff --git a/nodepool/tests/unit/test_driver_gce.py b/nodepool/tests/unit/test_driver_gce.py index 290ef6046..234b8c226 100644 --- a/nodepool/tests/unit/test_driver_gce.py +++ b/nodepool/tests/unit/test_driver_gce.py @@ -26,7 +26,7 @@ import googleapiclient.discovery import googleapiclient.errors from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.nodeutils import iterate_timeout diff --git a/nodepool/tests/unit/test_driver_ibmvpc.py b/nodepool/tests/unit/test_driver_ibmvpc.py index d8e6aac2d..9aad05b7f 100644 --- a/nodepool/tests/unit/test_driver_ibmvpc.py +++ b/nodepool/tests/unit/test_driver_ibmvpc.py @@ -18,7 +18,7 @@ import os import logging from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.driver.statemachine import StateMachineProvider from nodepool.driver.ibmvpc.adapter import IBMVPCAdapter diff --git a/nodepool/tests/unit/test_driver_kubernetes.py b/nodepool/tests/unit/test_driver_kubernetes.py index 9b4c44526..80c01eb18 100644 --- a/nodepool/tests/unit/test_driver_kubernetes.py +++ b/nodepool/tests/unit/test_driver_kubernetes.py @@ -18,7 +18,7 @@ import logging import time from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk class FakeCoreClient(object): diff --git a/nodepool/tests/unit/test_driver_metastatic.py b/nodepool/tests/unit/test_driver_metastatic.py index 9e64ff1a3..5f7dec398 100644 --- a/nodepool/tests/unit/test_driver_metastatic.py +++ b/nodepool/tests/unit/test_driver_metastatic.py @@ -19,7 +19,7 @@ import logging import testtools from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.driver.statemachine import StateMachineProvider from nodepool.cmd.config_validator import ConfigValidator diff --git a/nodepool/tests/unit/test_driver_openshift.py b/nodepool/tests/unit/test_driver_openshift.py index a418a7b00..8efd0d624 100644 --- a/nodepool/tests/unit/test_driver_openshift.py +++ b/nodepool/tests/unit/test_driver_openshift.py @@ -17,7 +17,7 @@ import fixtures import logging from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk class FakeOpenshiftProjectsQuery: diff --git a/nodepool/tests/unit/test_driver_openshiftpods.py b/nodepool/tests/unit/test_driver_openshiftpods.py index 814b77f7e..1ca287e32 100644 --- a/nodepool/tests/unit/test_driver_openshiftpods.py +++ b/nodepool/tests/unit/test_driver_openshiftpods.py @@ -17,7 +17,7 @@ import fixtures import logging from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk class FakeCoreClient(object): diff --git a/nodepool/tests/unit/test_driver_static.py b/nodepool/tests/unit/test_driver_static.py index 715d3c97f..f2dc3e1fd 100644 --- a/nodepool/tests/unit/test_driver_static.py +++ b/nodepool/tests/unit/test_driver_static.py @@ -19,7 +19,7 @@ import os from nodepool import config as nodepool_config from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.cmd.config_validator import ConfigValidator diff --git a/nodepool/tests/unit/test_launcher.py b/nodepool/tests/unit/test_launcher.py index 9f8b1aa38..8285a6545 100644 --- a/nodepool/tests/unit/test_launcher.py +++ b/nodepool/tests/unit/test_launcher.py @@ -21,7 +21,7 @@ import mock import testtools from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.driver.fake import provider as fakeprovider from nodepool.nodeutils import iterate_timeout import nodepool.launcher @@ -1658,11 +1658,10 @@ class TestLauncher(tests.DBTestCase): self.assertEqual('secret', fake_image.env_vars['REG_PASSWORD']) zk_servers = pool.config.zookeeper_servers - self.assertEqual(1, len(zk_servers)) - key = list(zk_servers.keys())[0] - self.assertEqual(self.zookeeper_host, zk_servers[key].host) - self.assertEqual(self.zookeeper_port, zk_servers[key].port) - self.assertEqual(self.zookeeper_chroot, zk_servers[key].chroot) + self.assertTrue(len(zk_servers) > 0) + expected = (f'{self.zookeeper_host}:{self.zookeeper_port}' + f'{self.zookeeper_chroot}') + self.assertEqual(expected, zk_servers) image = self.waitForImage('fake-provider', 'fake-image') self.assertEqual(image.username, 'zuul') @@ -2430,8 +2429,8 @@ class TestLauncher(tests.DBTestCase): # We want the first call to deleteRawNode() to fail, but subsequent # ones to succeed, so we store a pointer to the actual method so we # can reset it at the point we want to really delete. - real_method = nodepool.zk.ZooKeeper.deleteRawNode - nodepool.zk.ZooKeeper.deleteRawNode = mock.Mock( + real_method = zk.ZooKeeper.deleteRawNode + zk.ZooKeeper.deleteRawNode = mock.Mock( side_effect=Exception('mock exception')) # This call should leave the node in the DELETED state @@ -2442,5 +2441,5 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(zk.DELETED, node.state) # Ready for the real delete now - nodepool.zk.ZooKeeper.deleteRawNode = real_method + zk.ZooKeeper.deleteRawNode = real_method self.waitForNodeDeletion(node) diff --git a/nodepool/tests/unit/test_webapp.py b/nodepool/tests/unit/test_webapp.py index 61101eec2..65dd2248c 100644 --- a/nodepool/tests/unit/test_webapp.py +++ b/nodepool/tests/unit/test_webapp.py @@ -20,7 +20,7 @@ from urllib import request from urllib.error import HTTPError from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk from nodepool.nodeutils import iterate_timeout diff --git a/nodepool/tests/unit/test_zk.py b/nodepool/tests/unit/test_zk.py index f6756f2fd..cf2addd71 100644 --- a/nodepool/tests/unit/test_zk.py +++ b/nodepool/tests/unit/test_zk.py @@ -17,7 +17,8 @@ import uuid from nodepool import exceptions as npe from nodepool import tests -from nodepool import zk +from nodepool.zk import zookeeper as zk +from nodepool.config import ZooKeeperConnectionConfig, buildZooKeeperHosts from nodepool.nodeutils import iterate_timeout @@ -28,38 +29,38 @@ class TestZooKeeper(tests.DBTestCase): def test_buildZooKeeperHosts_single(self): hosts = [ - zk.ZooKeeperConnectionConfig('127.0.0.1', port=2181, - chroot='/test1') + ZooKeeperConnectionConfig('127.0.0.1', port=2181, + chroot='/test1') ] self.assertEqual('127.0.0.1:2181/test1', - zk.buildZooKeeperHosts(hosts)) + buildZooKeeperHosts(hosts)) def test_buildZooKeeperHosts_multiple(self): hosts = [ - zk.ZooKeeperConnectionConfig('127.0.0.1', port=2181, - chroot='/test1'), - zk.ZooKeeperConnectionConfig('127.0.0.2', port=2182, - chroot='/test2') + ZooKeeperConnectionConfig('127.0.0.1', port=2181, + chroot='/test1'), + ZooKeeperConnectionConfig('127.0.0.2', port=2182, + chroot='/test2') ] self.assertEqual('127.0.0.1:2181/test1,127.0.0.2:2182/test2', - zk.buildZooKeeperHosts(hosts)) + buildZooKeeperHosts(hosts)) def test_buildZooKeeperHosts_ipv6(self): hosts = [ - zk.ZooKeeperConnectionConfig( + ZooKeeperConnectionConfig( '2001:4800:7817:103:be76:4eff:fe04:e359', port=2181, chroot='/test1'), - zk.ZooKeeperConnectionConfig( + ZooKeeperConnectionConfig( '[2002:4800:7817:103:be76:4eff:fe04:e359]', port=2181, chroot='/test2'), - zk.ZooKeeperConnectionConfig('127.0.0.2', port=2182, - chroot='/test3') + ZooKeeperConnectionConfig('127.0.0.2', port=2182, + chroot='/test3') ] self.assertEqual(( '[2001:4800:7817:103:be76:4eff:fe04:e359]:2181/test1,' '[2002:4800:7817:103:be76:4eff:fe04:e359]:2181/test2,' '127.0.0.2:2182/test3' - ), zk.buildZooKeeperHosts(hosts)) + ), buildZooKeeperHosts(hosts)) def test_imageBuildLock(self): path = self.zk._imageBuildLockPath("ubuntu-trusty") diff --git a/nodepool/zk/__init__.py b/nodepool/zk/__init__.py new file mode 100644 index 000000000..ad47e06d0 --- /dev/null +++ b/nodepool/zk/__init__.py @@ -0,0 +1,216 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import logging +import time +from abc import ABCMeta +from threading import Thread + +from kazoo.client import KazooClient +from kazoo.handlers.threading import KazooTimeoutError +from kazoo.protocol.states import KazooState + +from nodepool.zk.exceptions import NoClientException +from nodepool.zk.handler import PoolSequentialThreadingHandler + + +class ZooKeeperClient(object): + log = logging.getLogger("nodepool.zk.ZooKeeperClient") + + # Log zookeeper retry every 10 seconds + retry_log_rate = 10 + + def __init__( + self, + hosts, + read_only=False, + timeout=10.0, + tls_cert=None, + tls_key=None, + tls_ca=None, + ): + """ + Initialize the ZooKeeper base client object. + + :param str hosts: Comma-separated list of hosts to connect to (e.g. + 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183). + :param bool read_only: If True, establishes a read-only connection. + :param float timeout: The ZooKeeper session timeout, in + seconds (default: 10.0). + :param str tls_key: Path to TLS key + :param str tls_cert: Path to TLS cert + :param str tls_ca: Path to TLS CA cert + """ + self.hosts = hosts + self.read_only = read_only + self.timeout = timeout + self.tls_cert = tls_cert + self.tls_key = tls_key + self.tls_ca = tls_ca + self.was_lost = False + + self.client = None + + if not (tls_key and tls_cert and tls_ca): + raise Exception("A TLS ZooKeeper connection is required; " + "please supply the zookeeper-tls " + "config values.") + + # Verify that we can read the cert files (Kazoo doesn't + # provide useful error messages). + for fn in (tls_cert, tls_key, tls_ca): + if fn: + with open(fn): + pass + + self._last_retry_log = 0 + self.on_connect_listeners = [] + self.on_disconnect_listeners = [] + self.on_connection_lost_listeners = [] + self.on_reconnect_listeners = [] + + def _connectionListener(self, state): + """ + Listener method for Kazoo connection state changes. + + .. warning:: This method must not block. + """ + if state == KazooState.LOST: + self.log.debug("ZooKeeper connection: LOST") + self.was_lost = True + for listener in self.on_connection_lost_listeners: + try: + listener() + except Exception: + self.log.exception("Exception calling listener:") + elif state == KazooState.SUSPENDED: + self.log.debug("ZooKeeper connection: SUSPENDED") + else: + self.log.debug("ZooKeeper connection: CONNECTED") + # Create a throwaway thread since zk operations can't + # happen in this one. + if self.was_lost: + self.was_lost = False + for listener in self.on_reconnect_listeners: + t = Thread(target=listener) + t.daemon = True + t.start() + + @property + def connected(self): + return self.client and self.client.state == KazooState.CONNECTED + + @property + def suspended(self): + return self.client and self.client.state == KazooState.SUSPENDED + + @property + def lost(self): + return not self.client or self.client.state == KazooState.LOST + + def logConnectionRetryEvent(self): + now = time.monotonic() + if now - self._last_retry_log >= self.retry_log_rate: + self.log.warning("Retrying zookeeper connection") + self._last_retry_log = now + + def connect(self): + if self.client is None: + args = dict( + hosts=self.hosts, + read_only=self.read_only, + timeout=self.timeout, + handler=PoolSequentialThreadingHandler(), + ) + if self.tls_key: + args['use_ssl'] = True + args['keyfile'] = self.tls_key + args['certfile'] = self.tls_cert + args['ca'] = self.tls_ca + self.client = KazooClient(**args) + self.client.add_listener(self._connectionListener) + # Manually retry initial connection attempt + while True: + try: + self.client.start(1) + break + except KazooTimeoutError: + self.logConnectionRetryEvent() + + for listener in self.on_connect_listeners: + listener() + + def disconnect(self): + """ + Close the ZooKeeper cluster connection. + + You should call this method if you used connect() to establish a + cluster connection. + """ + for listener in self.on_disconnect_listeners: + listener() + + if self.client is not None and self.client.connected: + self.client.stop() + self.client.close() + self.client = None + + def resetHosts(self, hosts): + """ + Reset the ZooKeeper cluster connection host list. + + :param str hosts: Comma-separated list of hosts to connect to (e.g. + 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183). + """ + if self.client is not None: + self.client.set_hosts(hosts=hosts) + + def commitTransaction(self, tr): + results = tr.commit() + for res in results: + self.log.debug("Transaction response %s", repr(res)) + for res in results: + if isinstance(res, Exception): + raise res + return results + + +class ZooKeeperSimpleBase(metaclass=ABCMeta): + """Base class for stateless Zookeeper interaction.""" + + def __init__(self, client): + self.client = client + + @property + def kazoo_client(self): + if not self.client.client: + raise NoClientException() + return self.client.client + + +class ZooKeeperBase(ZooKeeperSimpleBase): + """Base class for registering state handling methods with ZooKeeper.""" + + def __init__(self, client): + super().__init__(client) + if client: + self.client.on_connect_listeners.append(self._onConnect) + self.client.on_disconnect_listeners.append(self._onDisconnect) + self.client.on_reconnect_listeners.append(self._onReconnect) + + def _onConnect(self): + pass + + def _onDisconnect(self): + pass + + def _onReconnect(self): + pass diff --git a/nodepool/zk/exceptions.py b/nodepool/zk/exceptions.py new file mode 100644 index 000000000..694fbc79b --- /dev/null +++ b/nodepool/zk/exceptions.py @@ -0,0 +1,27 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from kazoo.exceptions import KazooException + + +class NodepoolZooKeeperException(KazooException): + """Base exception class for all custom ZK exceptions""" + pass + + +class LockException(NodepoolZooKeeperException): + pass + + +class NoClientException(NodepoolZooKeeperException): + + def __init__(self): + super().__init__("No zookeeper client!") diff --git a/nodepool/zk/handler.py b/nodepool/zk/handler.py new file mode 100644 index 000000000..3cbce0b0e --- /dev/null +++ b/nodepool/zk/handler.py @@ -0,0 +1,36 @@ +# Copyright 2021 Acme Gating, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from concurrent.futures import ThreadPoolExecutor + +from kazoo.handlers.threading import SequentialThreadingHandler + + +class PoolSequentialThreadingHandler(SequentialThreadingHandler): + def __init__(self): + super().__init__() + self._pool_executor = None + + def start(self): + self._pool_executor = ThreadPoolExecutor(max_workers=10) + super().start() + + def stop(self): + super().stop() + if self._pool_executor: + self._pool_executor.shutdown() + self._pool_executor = None + + def short_spawn(self, func, *args, **kwargs): + self._pool_executor.submit(func, *args, **kwargs) diff --git a/nodepool/zk.py b/nodepool/zk/zookeeper.py similarity index 94% rename from nodepool/zk.py rename to nodepool/zk/zookeeper.py index b83b37167..135410de4 100644 --- a/nodepool/zk.py +++ b/nodepool/zk/zookeeper.py @@ -13,15 +13,13 @@ from contextlib import contextmanager from copy import copy import abc -import ipaddress import json import logging import time import uuid -from kazoo.client import KazooClient, KazooState +from kazoo.client import KazooState from kazoo import exceptions as kze -from kazoo.handlers.threading import KazooTimeoutError from kazoo.recipe.lock import Lock from kazoo.recipe.cache import TreeCache, TreeEvent from kazoo.recipe.election import Election @@ -73,66 +71,6 @@ def as_list(item): return [item] -class ZooKeeperConnectionConfig(object): - ''' - Represents the connection parameters for a ZooKeeper server. - ''' - - def __eq__(self, other): - if isinstance(other, ZooKeeperConnectionConfig): - if other.__dict__ == self.__dict__: - return True - return False - - def __init__(self, host, port=2181, chroot=None): - '''Initialize the ZooKeeperConnectionConfig object. - - :param str host: The hostname of the ZooKeeper server. - :param int port: The port on which ZooKeeper is listening. - Optional, default: 2181. - :param str chroot: A chroot for this connection. All - ZooKeeper nodes will be underneath this root path. - Optional, default: None. - - (one per server) defining the ZooKeeper cluster servers. Only - the 'host' attribute is required.'. - - ''' - self.host = host - self.port = port - self.chroot = chroot or '' - - def __repr__(self): - return "host=%s port=%s chroot=%s" % \ - (self.host, self.port, self.chroot) - - -def buildZooKeeperHosts(host_list): - ''' - Build the ZK cluster host list for client connections. - - :param list host_list: A list of - :py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects (one - per server) defining the ZooKeeper cluster servers. - ''' - if not isinstance(host_list, list): - raise Exception("'host_list' must be a list") - hosts = [] - for host_def in host_list: - h = host_def.host - # If this looks like a ipv6 literal address, make sure it's - # quoted in []'s - try: - addr = ipaddress.ip_address(host_def.host) - if addr.version == 6: - h = '[%s]' % addr - except ValueError: - pass - host = '%s:%s%s' % (h, host_def.port, host_def.chroot) - hosts.append(host) - return ",".join(hosts) - - class ZooKeeperWatchEvent(object): ''' Class representing a watch trigger event. @@ -192,6 +130,9 @@ class Launcher(Serializable): else: return False + def __hash__(self): + return hash(self.id) + @property def supported_labels(self): return self._supported_labels @@ -797,11 +738,12 @@ class ZooKeeper(object): # Log zookeeper retry every 10 seconds retry_log_rate = 10 - def __init__(self, enable_cache=True): + def __init__(self, zk_client, enable_cache=True): ''' Initialize the ZooKeeper object. ''' - self.client = None + self.zk_client = zk_client # nodepool.zk.ZooKeeperClient + self.client = zk_client.client # KazooClient self._became_lost = False self._last_retry_log = 0 self._node_cache = None @@ -809,9 +751,19 @@ class ZooKeeper(object): self._cached_nodes = {} self._cached_node_requests = {} self.enable_cache = enable_cache - self.node_stats_event = None + if self.enable_cache: + self._node_cache = TreeCache(self.client, self.NODE_ROOT) + self._node_cache.listen_fault(self.cacheFaultListener) + self._node_cache.listen(self.nodeCacheListener) + self._node_cache.start() + + self._request_cache = TreeCache(self.client, self.REQUEST_ROOT) + self._request_cache.listen_fault(self.cacheFaultListener) + self._request_cache.listen(self.requestCacheListener) + self._request_cache.start() + # ======================================================================= # Private Methods # ======================================================================= @@ -1011,61 +963,6 @@ class ZooKeeper(object): def resetLostFlag(self): self._became_lost = False - def connect(self, host_list, read_only=False, tls_cert=None, - tls_key=None, tls_ca=None, timeout=10.0): - ''' - Establish a connection with ZooKeeper cluster. - - Convenience method if a pre-existing ZooKeeper connection is not - supplied to the ZooKeeper object at instantiation time. - - :param list host_list: A list of - :py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects - (one per server) defining the ZooKeeper cluster servers. - :param bool read_only: If True, establishes a read-only connection. - :param str tls_key: Path to TLS key - :param str tls_cert: Path to TLS cert - :param str tls_ca: Path to TLS CA cert - - ''' - if self.client is None: - hosts = buildZooKeeperHosts(host_list) - args = dict( - hosts=hosts, - read_only=read_only, - timeout=timeout, - ) - - args['use_ssl'] = True - if not (tls_key and tls_cert and tls_ca): - raise Exception("A TLS ZooKeeper connection is required; " - "please supply the zookeeper-tls " - "config values.") - - args['keyfile'] = tls_key - args['certfile'] = tls_cert - args['ca'] = tls_ca - self.client = KazooClient(**args) - self.client.add_listener(self._connection_listener) - # Manually retry initial connection attempt - while True: - try: - self.client.start(1) - break - except KazooTimeoutError: - self.logConnectionRetryEvent() - - if self.enable_cache: - self._node_cache = TreeCache(self.client, self.NODE_ROOT) - self._node_cache.listen_fault(self.cacheFaultListener) - self._node_cache.listen(self.nodeCacheListener) - self._node_cache.start() - - self._request_cache = TreeCache(self.client, self.REQUEST_ROOT) - self._request_cache.listen_fault(self.cacheFaultListener) - self._request_cache.listen(self.requestCacheListener) - self._request_cache.start() - def disconnect(self): ''' Close the ZooKeeper cluster connection. @@ -1087,16 +984,13 @@ class ZooKeeper(object): self.client.close() self.client = None - def resetHosts(self, host_list): + def resetHosts(self, hosts): ''' Reset the ZooKeeper cluster connection host list. - :param list host_list: A list of - :py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects - (one per server) defining the ZooKeeper cluster servers. + :param str host_list: A ZK host list ''' if self.client is not None: - hosts = buildZooKeeperHosts(host_list) self.client.set_hosts(hosts=hosts) @contextmanager diff --git a/tools/print-zk.py b/tools/print-zk.py index 23aab5f67..c7d2d2c5b 100755 --- a/tools/print-zk.py +++ b/tools/print-zk.py @@ -16,7 +16,8 @@ import argparse import logging import nodepool.config -import nodepool.zk +from nodepool.zk import zookeeper as zk +from nodepool.zk import ZooKeeperClient # A script to print the zookeeper tree given a nodepool config file. @@ -30,8 +31,15 @@ args = parser.parse_args() config = nodepool.config.loadConfig(args.config) -zk = nodepool.zk.ZooKeeper(enable_cache=False) -zk.connect(list(config.zookeeper_servers.values())) +zk_client = ZooKeeperClient( + config.zookeeper_servers, + tls_cert=config.zookeeper_tls_cert, + tls_key=config.zookeeper_tls_key, + tls_ca=config.zookeeper_tls_ca, + timeout=config.zookeeper_timeout, +) +zk_client.connect() +zk = zk.ZooKeeper(zk_client, enable_cache=False) def join(a, b): if a.endswith('/'):