Merge "Simplify ZooKeeper client initialization"
This commit is contained in:
commit
14b540060d
|
@ -3994,8 +3994,8 @@ class SchedulerTestApp:
|
|||
self.config, self.sched)
|
||||
merge_client = RecordingMergeClient(self.config, self.sched)
|
||||
nodepool = zuul.nodepool.Nodepool(self.sched)
|
||||
zk_client = ZooKeeperClient()
|
||||
zk_client.connect(self.zk_config, timeout=30.0)
|
||||
zk_client = ZooKeeperClient(self.zk_config, timeout=30.0)
|
||||
zk_client.connect()
|
||||
|
||||
self.sched.setExecutor(executor_client)
|
||||
self.sched.setMerger(merge_client)
|
||||
|
@ -4179,8 +4179,8 @@ class ZuulTestCase(BaseTestCase):
|
|||
self.zk_chroot_fixture.zookeeper_port,
|
||||
self.zk_chroot_fixture.zookeeper_chroot)
|
||||
|
||||
self.zk_client = ZooKeeperClient()
|
||||
self.zk_client.connect(hosts=self.zk_config)
|
||||
self.zk_client = ZooKeeperClient(hosts=self.zk_config)
|
||||
self.zk_client.connect()
|
||||
|
||||
if not KEEP_TEMPDIRS:
|
||||
tmp_root = self.useFixture(fixtures.TempDir(
|
||||
|
|
|
@ -31,9 +31,9 @@ class TestNodepoolIntegration(BaseTestCase):
|
|||
super(TestNodepoolIntegration, self).setUp()
|
||||
|
||||
self.statsd = None
|
||||
self.zk_client = zuul.zk.ZooKeeperClient()
|
||||
self.zk_client = zuul.zk.ZooKeeperClient('localhost:2181')
|
||||
self.addCleanup(self.zk_client.disconnect)
|
||||
self.zk_client.connect('localhost:2181')
|
||||
self.zk_client.connect()
|
||||
self.hostname = socket.gethostname()
|
||||
|
||||
self.provisioned_requests = []
|
||||
|
|
|
@ -38,10 +38,10 @@ class TestNodepool(BaseTestCase):
|
|||
self.zk_chroot_fixture.zookeeper_port,
|
||||
self.zk_chroot_fixture.zookeeper_chroot)
|
||||
|
||||
self.zk_client = ZooKeeperClient()
|
||||
self.zk_client = ZooKeeperClient(self.zk_config)
|
||||
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
|
||||
self.addCleanup(self.zk_client.disconnect)
|
||||
self.zk_client.connect(self.zk_config)
|
||||
self.zk_client.connect()
|
||||
self.hostname = 'nodepool-test-hostname'
|
||||
|
||||
self.provisioned_requests = []
|
||||
|
|
|
@ -522,8 +522,8 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
|||
logfile = open(ansible_log, 'r')
|
||||
self.addCleanup(logfile.close)
|
||||
|
||||
zk_client = ZooKeeperClient()
|
||||
zk_client.connect(self.zk_config, timeout=30.0)
|
||||
zk_client = ZooKeeperClient(self.zk_config, timeout=30.0)
|
||||
zk_client.connect()
|
||||
self.addCleanup(zk_client.disconnect)
|
||||
|
||||
# Start the finger gateway daemon
|
||||
|
|
|
@ -35,10 +35,10 @@ class TestZK(BaseTestCase):
|
|||
self.zk_chroot_fixture.zookeeper_port,
|
||||
self.zk_chroot_fixture.zookeeper_chroot)
|
||||
|
||||
self.zk_client = ZooKeeperClient()
|
||||
self.zk_client = ZooKeeperClient(self.zk_config)
|
||||
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
|
||||
self.addCleanup(self.zk_client.disconnect)
|
||||
self.zk_client.connect(self.zk_config)
|
||||
self.zk_client.connect()
|
||||
|
||||
def _createRequest(self):
|
||||
req = model.HoldRequest()
|
||||
|
|
|
@ -97,25 +97,8 @@ class Executor(zuul.cmd.ZuulDaemonApp):
|
|||
|
||||
self.start_log_streamer()
|
||||
|
||||
zk_client = ZooKeeperClient()
|
||||
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts')
|
||||
if not zookeeper_hosts:
|
||||
raise Exception("The zookeeper hosts config value is required")
|
||||
zookeeper_tls_key = get_default(self.config, 'zookeeper', 'tls_key')
|
||||
zookeeper_tls_cert = get_default(self.config, 'zookeeper', 'tls_cert')
|
||||
zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca')
|
||||
if not (zookeeper_tls_key and zookeeper_tls_cert and zookeeper_tls_ca):
|
||||
raise Exception("A TLS ZooKeeper connection is required; "
|
||||
"please supply the tls_* zookeeper config values.")
|
||||
zookeeper_timeout = float(get_default(self.config, 'zookeeper',
|
||||
'session_timeout', 10.0))
|
||||
zk_client.connect(
|
||||
zookeeper_hosts,
|
||||
timeout=zookeeper_timeout,
|
||||
tls_cert=zookeeper_tls_cert,
|
||||
tls_key=zookeeper_tls_key,
|
||||
tls_ca=zookeeper_tls_ca
|
||||
)
|
||||
zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
zk_client.connect()
|
||||
|
||||
ExecutorServer = zuul.executor.server.ExecutorServer
|
||||
self.executor = ExecutorServer(self.config, zk_client,
|
||||
|
|
|
@ -73,25 +73,8 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
|
|||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
|
||||
zk_client = ZooKeeperClient()
|
||||
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts')
|
||||
if not zookeeper_hosts:
|
||||
raise Exception("The zookeeper hosts config value is required")
|
||||
zookeeper_tls_key = get_default(self.config, 'zookeeper', 'tls_key')
|
||||
zookeeper_tls_cert = get_default(self.config, 'zookeeper', 'tls_cert')
|
||||
zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca')
|
||||
if not (zookeeper_tls_key and zookeeper_tls_cert and zookeeper_tls_ca):
|
||||
raise Exception("A TLS ZooKeeper connection is required; "
|
||||
"please supply the tls_* zookeeper config values.")
|
||||
zookeeper_timeout = float(get_default(self.config, 'zookeeper',
|
||||
'session_timeout', 10.0))
|
||||
zk_client.connect(
|
||||
zookeeper_hosts,
|
||||
timeout=zookeeper_timeout,
|
||||
tls_cert=zookeeper_tls_cert,
|
||||
tls_key=zookeeper_tls_key,
|
||||
tls_ca=zookeeper_tls_ca,
|
||||
)
|
||||
zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
zk_client.connect()
|
||||
|
||||
self.gateway = FingerGateway(
|
||||
(gear_server, gear_port, ssl_key, ssl_cert, ssl_ca),
|
||||
|
|
|
@ -19,7 +19,6 @@ import sys
|
|||
|
||||
import zuul.cmd
|
||||
from zuul.merger.server import COMMANDS, MergeServer
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
|
||||
|
@ -53,25 +52,8 @@ class Merger(zuul.cmd.ZuulDaemonApp):
|
|||
|
||||
self.setup_logging('merger', 'log_config')
|
||||
|
||||
zk_client = ZooKeeperClient()
|
||||
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts')
|
||||
if not zookeeper_hosts:
|
||||
raise Exception("The zookeeper hosts config value is required")
|
||||
zookeeper_tls_key = get_default(self.config, 'zookeeper', 'tls_key')
|
||||
zookeeper_tls_cert = get_default(self.config, 'zookeeper', 'tls_cert')
|
||||
zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca')
|
||||
if not (zookeeper_tls_key and zookeeper_tls_cert and zookeeper_tls_ca):
|
||||
raise Exception("A TLS ZooKeeper connection is required; "
|
||||
"please supply the tls_* zookeeper config values.")
|
||||
zookeeper_timeout = float(get_default(self.config, 'zookeeper',
|
||||
'session_timeout', 10.0))
|
||||
zk_client.connect(
|
||||
zookeeper_hosts,
|
||||
timeout=zookeeper_timeout,
|
||||
tls_cert=zookeeper_tls_cert,
|
||||
tls_key=zookeeper_tls_key,
|
||||
tls_ca=zookeeper_tls_ca,
|
||||
)
|
||||
zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
zk_client.connect()
|
||||
|
||||
self.merger = MergeServer(self.config, zk_client, self.connections)
|
||||
self.merger.start()
|
||||
|
|
|
@ -138,24 +138,8 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
|
|||
merger = zuul.merger.client.MergeClient(self.config, self.sched)
|
||||
nodepool = zuul.nodepool.Nodepool(self.sched)
|
||||
|
||||
zk_client = ZooKeeperClient()
|
||||
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts')
|
||||
if not zookeeper_hosts:
|
||||
raise Exception("The zookeeper hosts config value is required")
|
||||
zookeeper_tls_key = get_default(self.config, 'zookeeper', 'tls_key')
|
||||
zookeeper_tls_cert = get_default(self.config, 'zookeeper', 'tls_cert')
|
||||
zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca')
|
||||
if not (zookeeper_tls_key and zookeeper_tls_cert and zookeeper_tls_ca):
|
||||
raise Exception("A TLS ZooKeeper connection is required; "
|
||||
"please supply the tls_* zookeeper config values.")
|
||||
zookeeper_timeout = float(get_default(self.config, 'zookeeper',
|
||||
'session_timeout', 10.0))
|
||||
zk_client.connect(
|
||||
zookeeper_hosts,
|
||||
timeout=zookeeper_timeout,
|
||||
tls_cert=zookeeper_tls_cert,
|
||||
tls_key=zookeeper_tls_key,
|
||||
tls_ca=zookeeper_tls_ca)
|
||||
zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
zk_client.connect()
|
||||
|
||||
self.configure_connections(require_sql=True)
|
||||
self.sched.setExecutor(gearman)
|
||||
|
|
|
@ -1231,10 +1231,15 @@ class ZuulWeb(object):
|
|||
self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
|
||||
ssl_key, ssl_cert, ssl_ca,
|
||||
client_id='Zuul Web Server')
|
||||
self.zk_client = ZooKeeperClient()
|
||||
self.zk_client.connect(hosts=zk_hosts, read_only=True,
|
||||
timeout=zk_timeout, tls_cert=zk_tls_cert,
|
||||
tls_key=zk_tls_key, tls_ca=zk_tls_ca)
|
||||
self.zk_client = ZooKeeperClient(
|
||||
hosts=zk_hosts,
|
||||
read_only=True,
|
||||
timeout=zk_timeout,
|
||||
tls_cert=zk_tls_cert,
|
||||
tls_key=zk_tls_key,
|
||||
tls_ca=zk_tls_ca,
|
||||
)
|
||||
self.zk_client.connect()
|
||||
|
||||
self.connections = connections
|
||||
self.authenticators = authenticators
|
||||
|
|
|
@ -12,12 +12,14 @@
|
|||
import logging
|
||||
import time
|
||||
from abc import ABCMeta
|
||||
from configparser import ConfigParser
|
||||
from typing import Optional, List, Callable
|
||||
|
||||
from kazoo.client import KazooClient
|
||||
from kazoo.handlers.threading import KazooTimeoutError
|
||||
from kazoo.protocol.states import KazooState
|
||||
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.zk.exceptions import NoClientException
|
||||
|
||||
|
||||
|
@ -27,10 +29,34 @@ class ZooKeeperClient(object):
|
|||
# Log zookeeper retry every 10 seconds
|
||||
retry_log_rate = 10
|
||||
|
||||
def __init__(self):
|
||||
def __init__(
|
||||
self,
|
||||
hosts: str,
|
||||
read_only: bool = False,
|
||||
timeout: float = 10.0,
|
||||
tls_cert: Optional[str] = None,
|
||||
tls_key: Optional[str] = None,
|
||||
tls_ca: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
Initialize the ZooKeeper base client object.
|
||||
|
||||
:param str hosts: Comma-separated list of hosts to connect to (e.g.
|
||||
127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
|
||||
:param bool read_only: If True, establishes a read-only connection.
|
||||
:param float timeout: The ZooKeeper session timeout, in
|
||||
seconds (default: 10.0).
|
||||
:param str tls_key: Path to TLS key
|
||||
:param str tls_cert: Path to TLS cert
|
||||
:param str tls_ca: Path to TLS CA cert
|
||||
"""
|
||||
self.hosts = hosts
|
||||
self.read_only = read_only
|
||||
self.timeout = timeout
|
||||
self.tls_cert = tls_cert
|
||||
self.tls_key = tls_key
|
||||
self.tls_ca = tls_ca
|
||||
|
||||
self.client: Optional[KazooClient] = None
|
||||
self._last_retry_log: int = 0
|
||||
self.on_connect_listeners: List[Callable[[], None]] = []
|
||||
|
@ -67,32 +93,18 @@ class ZooKeeperClient(object):
|
|||
self.log.warning("Retrying zookeeper connection")
|
||||
self._last_retry_log = now
|
||||
|
||||
def connect(self, hosts: str, read_only: bool = False,
|
||||
timeout: float = 10.0, tls_cert: Optional[str] = None,
|
||||
tls_key: Optional[str] = None,
|
||||
tls_ca: Optional[str] = None):
|
||||
"""
|
||||
Establish a connection with ZooKeeper cluster.
|
||||
|
||||
Convenience method if a pre-existing ZooKeeper connection is not
|
||||
supplied to the ZooKeeper object at instantiation time.
|
||||
|
||||
:param str hosts: Comma-separated list of hosts to connect to (e.g.
|
||||
127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
|
||||
:param bool read_only: If True, establishes a read-only connection.
|
||||
:param float timeout: The ZooKeeper session timeout, in
|
||||
seconds (default: 10.0).
|
||||
:param str tls_key: Path to TLS key
|
||||
:param str tls_cert: Path to TLS cert
|
||||
:param str tls_ca: Path to TLS CA cert
|
||||
"""
|
||||
def connect(self):
|
||||
if self.client is None:
|
||||
args = dict(hosts=hosts, read_only=read_only, timeout=timeout)
|
||||
if tls_key:
|
||||
args = dict(
|
||||
hosts=self.hosts,
|
||||
read_only=self.read_only,
|
||||
timeout=self.timeout,
|
||||
)
|
||||
if self.tls_key:
|
||||
args['use_ssl'] = True
|
||||
args['keyfile'] = tls_key
|
||||
args['certfile'] = tls_cert
|
||||
args['ca'] = tls_ca
|
||||
args['keyfile'] = self.tls_key
|
||||
args['certfile'] = self.tls_cert
|
||||
args['ca'] = self.tls_ca
|
||||
self.client = KazooClient(**args)
|
||||
self.client.add_listener(self._connectionListener)
|
||||
# Manually retry initial connection attempt
|
||||
|
@ -131,6 +143,31 @@ class ZooKeeperClient(object):
|
|||
if self.client is not None:
|
||||
self.client.set_hosts(hosts=hosts)
|
||||
|
||||
@classmethod
|
||||
def fromConfig(cls, config: ConfigParser) -> "ZooKeeperClient":
|
||||
hosts = get_default(config, "zookeeper", "hosts")
|
||||
if not hosts:
|
||||
raise Exception("The zookeeper hosts config value is required")
|
||||
tls_key = get_default(config, "zookeeper", "tls_key")
|
||||
tls_cert = get_default(config, "zookeeper", "tls_cert")
|
||||
tls_ca = get_default(config, "zookeeper", "tls_ca")
|
||||
if not all([tls_key, tls_cert, tls_ca]):
|
||||
raise Exception(
|
||||
"A TLS ZooKeeper connection is required; please supply the "
|
||||
"tls_* zookeeper config values."
|
||||
)
|
||||
timeout = float(
|
||||
get_default(config, "zookeeper", "session_timeout", 120.0)
|
||||
)
|
||||
|
||||
return cls(
|
||||
hosts=hosts,
|
||||
timeout=timeout,
|
||||
tls_key=tls_key,
|
||||
tls_cert=tls_cert,
|
||||
tls_ca=tls_ca,
|
||||
)
|
||||
|
||||
|
||||
class ZooKeeperBase(metaclass=ABCMeta):
|
||||
"""Base class for components that need to interact with Zookeeper."""
|
||||
|
|
Loading…
Reference in New Issue