Initialize ZooKeeper connection in server rather than in cmd classes
Currently, the ZooKeeper connection is initialized directly in the cmd classes like zuul.cmd.scheduler or zuul.cmd.merger and then passed to the server instance. Although this makes it easy to reuse a single ZooKeeper connection for multiple components in the tests it's not very realistic. A better approach would be to initialize the connection directly in the server classes so that each component has its own connection to ZooKeeper. Those classes already get all necessary parameters, so we could get rid of the additional "zk_client" parameter. Furthermore it would allow us to use a dedicated ZooKeeper connection for each component in the tests which is more realistic than sharing a single connection between all components. Change-Id: I12260d43be0897321cf47ef0c722ccd74599d43d
This commit is contained in:
parent
71a990d42a
commit
2dfb34a818
|
@ -114,7 +114,6 @@ import zuul.rpcclient
|
|||
import zuul.configloader
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures')
|
||||
|
||||
|
@ -3736,7 +3735,7 @@ class WebProxyFixture(fixtures.Fixture):
|
|||
|
||||
|
||||
class ZuulWebFixture(fixtures.Fixture):
|
||||
def __init__(self, gearman_server_port,
|
||||
def __init__(self,
|
||||
changes: Dict[str, Dict[str, Change]], config: ConfigParser,
|
||||
additional_event_queues, upstream_root: str,
|
||||
rpcclient: RPCClient, poller_events, git_url_with_auth: bool,
|
||||
|
@ -3744,7 +3743,7 @@ class ZuulWebFixture(fixtures.Fixture):
|
|||
test_root: str, fake_sql: bool = True,
|
||||
info: Optional[zuul.model.WebInfo] = None):
|
||||
super(ZuulWebFixture, self).__init__()
|
||||
self.gearman_server_port = gearman_server_port
|
||||
self.config = config
|
||||
self.connections = TestConnectionRegistry(
|
||||
changes, config, additional_event_queues, upstream_root, rpcclient,
|
||||
poller_events, git_url_with_auth, add_cleanup, fake_sql)
|
||||
|
@ -3760,19 +3759,14 @@ class ZuulWebFixture(fixtures.Fixture):
|
|||
self.info = zuul.model.WebInfo.fromConfig(config)
|
||||
else:
|
||||
self.info = info
|
||||
self.zk_client = ZooKeeperClient.fromConfig(config)
|
||||
self.zk_client.connect()
|
||||
self.test_root = test_root
|
||||
|
||||
def _setUp(self):
|
||||
# Start the web server
|
||||
self.web = zuul.web.ZuulWeb(
|
||||
listen_address='::', listen_port=0,
|
||||
gear_server='127.0.0.1', gear_port=self.gearman_server_port,
|
||||
config=self.config,
|
||||
info=self.info,
|
||||
connections=self.connections,
|
||||
zk_client=self.zk_client,
|
||||
command_socket=os.path.join(self.test_root, 'web.socket'),
|
||||
authenticators=self.authenticators)
|
||||
self.web.start()
|
||||
self.addCleanup(self.stop)
|
||||
|
@ -4037,13 +4031,10 @@ class SchedulerTestApp:
|
|||
self.config, self.sched)
|
||||
merge_client = RecordingMergeClient(self.config, self.sched)
|
||||
nodepool = zuul.nodepool.Nodepool(self.sched)
|
||||
zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
zk_client.connect()
|
||||
|
||||
self.sched.setExecutor(executor_client)
|
||||
self.sched.setMerger(merge_client)
|
||||
self.sched.setNodepool(nodepool)
|
||||
self.sched.setZooKeeper(zk_client)
|
||||
|
||||
def start(self, validate_tenants: list):
|
||||
self.sched.start()
|
||||
|
@ -4212,7 +4203,7 @@ class ZuulTestCase(BaseTestCase):
|
|||
|
||||
def _startMerger(self):
|
||||
self.merge_server = zuul.merger.server.MergeServer(
|
||||
self.config, self.zk_client, self.scheds.first.connections
|
||||
self.config, self.scheds.first.connections
|
||||
)
|
||||
self.merge_server.start()
|
||||
|
||||
|
@ -4279,6 +4270,11 @@ class ZuulTestCase(BaseTestCase):
|
|||
self.config.set(
|
||||
'merger', 'command_socket',
|
||||
os.path.join(self.test_root, 'merger.socket'))
|
||||
self.config.set('web', 'listen_address', '::')
|
||||
self.config.set('web', 'port', '0')
|
||||
self.config.set(
|
||||
'web', 'command_socket',
|
||||
os.path.join(self.test_root, 'web.socket'))
|
||||
|
||||
self.statsd = FakeStatsd()
|
||||
if self.config.has_section('statsd'):
|
||||
|
@ -4311,9 +4307,6 @@ class ZuulTestCase(BaseTestCase):
|
|||
self.config.set('zookeeper', 'tls_ca',
|
||||
self.zk_chroot_fixture.zookeeper_ca)
|
||||
|
||||
self.zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
self.zk_client.connect()
|
||||
|
||||
self.rpcclient = zuul.rpcclient.RPCClient(
|
||||
self.config.get('gearman', 'server'),
|
||||
self.gearman_server.port,
|
||||
|
@ -4341,7 +4334,6 @@ class ZuulTestCase(BaseTestCase):
|
|||
source_only=self.source_only)
|
||||
self.executor_server = RecordingExecutorServer(
|
||||
self.config,
|
||||
self.zk_client,
|
||||
executor_connections,
|
||||
jobdir_root=self.jobdir_root,
|
||||
_run_ansible=self.run_ansible,
|
||||
|
@ -4422,7 +4414,9 @@ class ZuulTestCase(BaseTestCase):
|
|||
config = configparser.ConfigParser()
|
||||
config.read(os.path.join(FIXTURE_DIR, config_file))
|
||||
|
||||
sections = ['zuul', 'scheduler', 'executor', 'merger', 'zookeeper']
|
||||
sections = [
|
||||
'zuul', 'scheduler', 'executor', 'merger', 'web', 'zookeeper'
|
||||
]
|
||||
for section in sections:
|
||||
if not config.has_section(section):
|
||||
config.add_section(section)
|
||||
|
@ -4681,8 +4675,6 @@ class ZuulTestCase(BaseTestCase):
|
|||
self.rpcclient.shutdown()
|
||||
self.gearman_server.shutdown()
|
||||
self.fake_nodepool.stop()
|
||||
self.zk_client.disconnect()
|
||||
self.scheds.execute(lambda app: app.sched.zk_client.disconnect())
|
||||
self.printHistory()
|
||||
# We whitelist watchdog threads as they have relatively long delays
|
||||
# before noticing they should exit, but they should exit on their own.
|
||||
|
|
|
@ -35,6 +35,16 @@ class BaseClientTestCase(BaseTestCase):
|
|||
rootdir=os.environ.get("ZUUL_TEST_ROOT"))).path
|
||||
self.config = configparser.ConfigParser()
|
||||
self.config.read(os.path.join(FIXTURE_DIR, self.config_file))
|
||||
self.setupZK()
|
||||
self.config.add_section('zookeeper')
|
||||
self.config.set('zookeeper', 'hosts', self.zk_chroot_fixture.zk_hosts)
|
||||
self.config.set('zookeeper', 'session_timeout', '30')
|
||||
self.config.set('zookeeper', 'tls_cert',
|
||||
self.zk_chroot_fixture.zookeeper_cert)
|
||||
self.config.set('zookeeper', 'tls_key',
|
||||
self.zk_chroot_fixture.zookeeper_key)
|
||||
self.config.set('zookeeper', 'tls_ca',
|
||||
self.zk_chroot_fixture.zookeeper_ca)
|
||||
|
||||
|
||||
class TestTenantValidationClient(BaseClientTestCase):
|
||||
|
|
|
@ -1612,7 +1612,7 @@ class TestGithubWebhook(ZuulTestCase):
|
|||
|
||||
# Start the web server
|
||||
self.web = self.useFixture(
|
||||
ZuulWebFixture(self.gearman_server.port, self.changes, self.config,
|
||||
ZuulWebFixture(self.changes, self.config,
|
||||
self.additional_event_queues, self.upstream_root,
|
||||
self.rpcclient, self.poller_events,
|
||||
self.git_url_with_auth, self.addCleanup,
|
||||
|
|
|
@ -34,7 +34,7 @@ class TestGitlabWebhook(ZuulTestCase):
|
|||
|
||||
# Start the web server
|
||||
self.web = self.useFixture(
|
||||
ZuulWebFixture(self.gearman_server.port, self.changes, self.config,
|
||||
ZuulWebFixture(self.changes, self.config,
|
||||
self.additional_event_queues, self.upstream_root,
|
||||
self.rpcclient, self.poller_events,
|
||||
self.git_url_with_auth, self.addCleanup,
|
||||
|
|
|
@ -1044,7 +1044,7 @@ class TestPagureWebhook(ZuulTestCase):
|
|||
super(TestPagureWebhook, self).setUp()
|
||||
# Start the web server
|
||||
self.web = self.useFixture(
|
||||
ZuulWebFixture(self.gearman_server.port, self.changes, self.config,
|
||||
ZuulWebFixture(self.changes, self.config,
|
||||
self.additional_event_queues, self.upstream_root,
|
||||
self.rpcclient, self.poller_events,
|
||||
self.git_url_with_auth, self.addCleanup,
|
||||
|
@ -1092,7 +1092,7 @@ class TestPagureWebhookWhitelist(ZuulTestCase):
|
|||
super(TestPagureWebhookWhitelist, self).setUp()
|
||||
# Start the web server
|
||||
self.web = self.useFixture(
|
||||
ZuulWebFixture(self.gearman_server.port, self.changes, self.config,
|
||||
ZuulWebFixture(self.changes, self.config,
|
||||
self.additional_event_queues, self.upstream_root,
|
||||
self.rpcclient, self.poller_events,
|
||||
self.git_url_with_auth, self.addCleanup,
|
||||
|
|
|
@ -89,7 +89,7 @@ class TestSchedulerZone(ZuulTestCase):
|
|||
executor_connections.configure(self.config,
|
||||
source_only=self.source_only)
|
||||
self.executor_server_unzoned = RecordingExecutorServer(
|
||||
config, self.zk_client,
|
||||
config,
|
||||
connections=executor_connections,
|
||||
jobdir_root=self.jobdir_root,
|
||||
_run_ansible=self.run_ansible,
|
||||
|
|
|
@ -27,7 +27,6 @@ import time
|
|||
import zuul.web
|
||||
import zuul.lib.log_streamer
|
||||
from zuul.lib.fingergw import FingerGateway
|
||||
from zuul.zk import ZooKeeperClient
|
||||
import tests.base
|
||||
from tests.base import iterate_timeout, ZuulWebFixture
|
||||
|
||||
|
@ -251,7 +250,7 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
|||
'''
|
||||
# Start the web server
|
||||
web = self.useFixture(
|
||||
ZuulWebFixture(self.gearman_server.port, self.changes, self.config,
|
||||
ZuulWebFixture(self.changes, self.config,
|
||||
self.additional_event_queues, self.upstream_root,
|
||||
self.rpcclient, self.poller_events,
|
||||
self.git_url_with_auth, self.addCleanup,
|
||||
|
@ -330,7 +329,7 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
|||
def test_websocket_streaming(self):
|
||||
# Start the web server
|
||||
web = self.useFixture(
|
||||
ZuulWebFixture(self.gearman_server.port, self.changes, self.config,
|
||||
ZuulWebFixture(self.changes, self.config,
|
||||
self.additional_event_queues, self.upstream_root,
|
||||
self.rpcclient, self.poller_events,
|
||||
self.git_url_with_auth, self.addCleanup,
|
||||
|
@ -406,7 +405,7 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
|||
def test_websocket_hangup(self):
|
||||
# Start the web server
|
||||
web = self.useFixture(
|
||||
ZuulWebFixture(self.gearman_server.port, self.changes, self.config,
|
||||
ZuulWebFixture(self.changes, self.config,
|
||||
self.additional_event_queues, self.upstream_root,
|
||||
self.rpcclient, self.poller_events,
|
||||
self.git_url_with_auth, self.addCleanup,
|
||||
|
@ -522,14 +521,11 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
|||
logfile = open(ansible_log, 'r')
|
||||
self.addCleanup(logfile.close)
|
||||
|
||||
zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
zk_client.connect()
|
||||
self.addCleanup(zk_client.disconnect)
|
||||
|
||||
# Start the finger gateway daemon
|
||||
gateway = FingerGateway(
|
||||
self.config,
|
||||
('127.0.0.1', self.gearman_server.port, None, None, None),
|
||||
zk_client, (self.host, 0),
|
||||
(self.host, 0),
|
||||
user=None,
|
||||
command_socket=None,
|
||||
pid_file=None
|
||||
|
|
|
@ -54,7 +54,7 @@ class BaseTestWeb(ZuulTestCase):
|
|||
self.zuul_ini_config = FakeConfig(self.config_ini_data)
|
||||
# Start the web server
|
||||
self.web = self.useFixture(
|
||||
ZuulWebFixture(self.gearman_server.port, self.changes, self.config,
|
||||
ZuulWebFixture(self.changes, self.config,
|
||||
self.additional_event_queues, self.upstream_root,
|
||||
self.rpcclient, self.poller_events,
|
||||
self.git_url_with_auth, self.addCleanup,
|
||||
|
|
|
@ -27,7 +27,7 @@ class TestWebURLs(ZuulTestCase):
|
|||
def setUp(self):
|
||||
super(TestWebURLs, self).setUp()
|
||||
self.web = self.useFixture(
|
||||
ZuulWebFixture(self.gearman_server.port, self.changes, self.config,
|
||||
ZuulWebFixture(self.changes, self.config,
|
||||
self.additional_event_queues, self.upstream_root,
|
||||
self.rpcclient, self.poller_events,
|
||||
self.git_url_with_auth, self.addCleanup,
|
||||
|
|
|
@ -22,7 +22,6 @@ import zuul.cmd
|
|||
import zuul.executor.server
|
||||
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
|
||||
class Executor(zuul.cmd.ZuulDaemonApp):
|
||||
|
@ -97,11 +96,8 @@ class Executor(zuul.cmd.ZuulDaemonApp):
|
|||
|
||||
self.start_log_streamer()
|
||||
|
||||
zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
zk_client.connect()
|
||||
|
||||
ExecutorServer = zuul.executor.server.ExecutorServer
|
||||
self.executor = ExecutorServer(self.config, zk_client,
|
||||
self.executor = ExecutorServer(self.config,
|
||||
self.connections,
|
||||
jobdir_root=self.job_dir,
|
||||
keep_jobdir=self.args.keep_jobdir,
|
||||
|
@ -112,7 +108,6 @@ class Executor(zuul.cmd.ZuulDaemonApp):
|
|||
signal.signal(signal.SIGTERM, self.exit_handler)
|
||||
|
||||
self.executor.join()
|
||||
zk_client.disconnect()
|
||||
|
||||
|
||||
def main():
|
||||
|
|
|
@ -20,7 +20,6 @@ from typing import Optional
|
|||
import zuul.cmd
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.fingergw import COMMANDS, FingerGateway
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
|
||||
class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
|
||||
|
@ -73,12 +72,9 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
|
|||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
|
||||
zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
zk_client.connect()
|
||||
|
||||
self.gateway = FingerGateway(
|
||||
self.config,
|
||||
(gear_server, gear_port, ssl_key, ssl_cert, ssl_ca),
|
||||
zk_client,
|
||||
(host, port),
|
||||
user,
|
||||
cmdsock,
|
||||
|
@ -101,7 +97,6 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
|
|||
break
|
||||
else:
|
||||
self.gateway.wait()
|
||||
zk_client.disconnect()
|
||||
|
||||
self.log.info('Stopped Zuul finger gateway app')
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@ import sys
|
|||
|
||||
import zuul.cmd
|
||||
from zuul.merger.server import COMMANDS, MergeServer
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
|
||||
class Merger(zuul.cmd.ZuulDaemonApp):
|
||||
|
@ -52,10 +51,7 @@ class Merger(zuul.cmd.ZuulDaemonApp):
|
|||
|
||||
self.setup_logging('merger', 'log_config')
|
||||
|
||||
zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
zk_client.connect()
|
||||
|
||||
self.merger = MergeServer(self.config, zk_client, self.connections)
|
||||
self.merger = MergeServer(self.config, self.connections)
|
||||
self.merger.start()
|
||||
|
||||
if self.args.nodaemon:
|
||||
|
@ -68,7 +64,6 @@ class Merger(zuul.cmd.ZuulDaemonApp):
|
|||
self.exit_handler(signal.SIGINT, None)
|
||||
else:
|
||||
self.merger.join()
|
||||
zk_client.disconnect()
|
||||
|
||||
|
||||
def main():
|
||||
|
|
|
@ -25,7 +25,6 @@ from zuul.lib.statsd import get_statsd_config
|
|||
import zuul.merger.client
|
||||
import zuul.nodepool
|
||||
import zuul.scheduler
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
|
||||
class Scheduler(zuul.cmd.ZuulDaemonApp):
|
||||
|
@ -147,7 +146,6 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
|
|||
self.configure_connections(require_sql=True)
|
||||
self.sched.setMerger(merger)
|
||||
|
||||
zk_client = None
|
||||
if self.args.validate_tenants is None:
|
||||
|
||||
# Only needed in full mode
|
||||
|
@ -155,12 +153,8 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
|
|||
self.sched)
|
||||
nodepool = zuul.nodepool.Nodepool(self.sched)
|
||||
|
||||
zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
zk_client.connect()
|
||||
|
||||
self.sched.setExecutor(gearman)
|
||||
self.sched.setNodepool(nodepool)
|
||||
self.sched.setZooKeeper(zk_client)
|
||||
|
||||
self.log.info('Starting scheduler')
|
||||
try:
|
||||
|
@ -190,7 +184,6 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
|
|||
self.exit_handler(signal.SIGINT, None)
|
||||
else:
|
||||
self.sched.join()
|
||||
zk_client.disconnect()
|
||||
|
||||
|
||||
def main():
|
||||
|
|
|
@ -23,9 +23,6 @@ import zuul.driver.sql
|
|||
import zuul.driver.github
|
||||
import zuul.lib.auth
|
||||
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
|
||||
class WebServer(zuul.cmd.ZuulDaemonApp):
|
||||
app_name = 'web'
|
||||
|
@ -49,31 +46,6 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
|
|||
def _run(self):
|
||||
info = zuul.model.WebInfo.fromConfig(self.config)
|
||||
|
||||
params = dict()
|
||||
|
||||
params['info'] = info
|
||||
params['listen_address'] = get_default(self.config,
|
||||
'web', 'listen_address',
|
||||
'127.0.0.1')
|
||||
params['listen_port'] = get_default(self.config, 'web', 'port', 9000)
|
||||
params['static_cache_expiry'] = get_default(self.config, 'web',
|
||||
'static_cache_expiry',
|
||||
3600)
|
||||
params['static_path'] = get_default(self.config,
|
||||
'web', 'static_path',
|
||||
None)
|
||||
params['gear_server'] = get_default(self.config, 'gearman', 'server')
|
||||
params['gear_port'] = get_default(self.config, 'gearman', 'port', 4730)
|
||||
params['ssl_key'] = get_default(self.config, 'gearman', 'ssl_key')
|
||||
params['ssl_cert'] = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
params['ssl_ca'] = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
|
||||
params['command_socket'] = get_default(
|
||||
self.config, 'web', 'command_socket',
|
||||
'/var/lib/zuul/web.socket')
|
||||
|
||||
params['connections'] = self.connections
|
||||
params['authenticators'] = self.authenticators
|
||||
# Validate config here before we spin up the ZuulWeb object
|
||||
for conn_name, connection in self.connections.connections.items():
|
||||
try:
|
||||
|
@ -82,12 +54,13 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
|
|||
self.log.exception("Error validating config")
|
||||
sys.exit(1)
|
||||
|
||||
zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
zk_client.connect()
|
||||
params["zk_client"] = zk_client
|
||||
|
||||
try:
|
||||
self.web = zuul.web.ZuulWeb(**params)
|
||||
self.web = zuul.web.ZuulWeb(
|
||||
config=self.config,
|
||||
info=info,
|
||||
connections=self.connections,
|
||||
authenticators=self.authenticators,
|
||||
)
|
||||
except Exception:
|
||||
self.log.exception("Error creating ZuulWeb:")
|
||||
sys.exit(1)
|
||||
|
|
|
@ -2554,14 +2554,13 @@ class ExecutorServer(BaseMergeServer):
|
|||
def __init__(
|
||||
self,
|
||||
config,
|
||||
zk_client,
|
||||
connections=None,
|
||||
jobdir_root=None,
|
||||
keep_jobdir=False,
|
||||
log_streaming_port=DEFAULT_FINGER_PORT,
|
||||
log_console_port=DEFAULT_STREAM_PORT,
|
||||
):
|
||||
super().__init__(config, 'executor', zk_client, connections)
|
||||
super().__init__(config, 'executor', connections)
|
||||
|
||||
self.keep_jobdir = keep_jobdir
|
||||
self.jobdir_root = jobdir_root
|
||||
|
@ -2801,6 +2800,8 @@ class ExecutorServer(BaseMergeServer):
|
|||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
# Use the BaseMergeServer's stop method to disconnect from ZooKeeper.
|
||||
super().stop()
|
||||
self.connections.stop()
|
||||
self.disk_accountant.stop()
|
||||
# The governor can change function registration, so make sure
|
||||
|
@ -2841,8 +2842,6 @@ class ExecutorServer(BaseMergeServer):
|
|||
|
||||
# All job results should have been sent by now, shutdown the
|
||||
# gearman workers.
|
||||
if self.process_merge_jobs:
|
||||
super().stop()
|
||||
self.executor_gearworker.stop()
|
||||
|
||||
if self.process_worker is not None:
|
||||
|
|
|
@ -16,11 +16,13 @@ import functools
|
|||
import logging
|
||||
import socket
|
||||
import threading
|
||||
from configparser import ConfigParser
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import zuul.rpcclient
|
||||
from zuul.lib import streamer_utils
|
||||
from zuul.lib.commandsocket import CommandSocket
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.lib import streamer_utils
|
||||
|
||||
|
||||
COMMANDS = ['stop']
|
||||
|
@ -103,8 +105,8 @@ class FingerGateway(object):
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
config: ConfigParser,
|
||||
gearman: Tuple,
|
||||
zk_client: ZooKeeperClient,
|
||||
address: Tuple,
|
||||
user: Optional[str],
|
||||
command_socket: Optional[str],
|
||||
|
@ -113,6 +115,7 @@ class FingerGateway(object):
|
|||
'''
|
||||
Initialize the finger gateway.
|
||||
|
||||
:param config: The parsed Zuul configuration.
|
||||
:param tuple gearman: Gearman connection information. This should
|
||||
include the server, port, SSL key, SSL cert, and SSL CA.
|
||||
:param tuple address: The address and port to bind to for our gateway.
|
||||
|
@ -126,7 +129,6 @@ class FingerGateway(object):
|
|||
self.gear_ssl_key = gearman[2]
|
||||
self.gear_ssl_cert = gearman[3]
|
||||
self.gear_ssl_ca = gearman[4]
|
||||
self.zk_client = zk_client
|
||||
self.address = address
|
||||
self.user = user
|
||||
self.pid_file = pid_file
|
||||
|
@ -144,6 +146,9 @@ class FingerGateway(object):
|
|||
stop=self.stop,
|
||||
)
|
||||
|
||||
self.zk_client = ZooKeeperClient.fromConfig(config)
|
||||
self.zk_client.connect()
|
||||
|
||||
def _runCommand(self):
|
||||
while self.command_running:
|
||||
try:
|
||||
|
@ -219,6 +224,8 @@ class FingerGateway(object):
|
|||
except Exception:
|
||||
self.log.exception("Error stopping command socket:")
|
||||
|
||||
self.zk_client.disconnect()
|
||||
|
||||
self.log.info("Finger gateway is stopped")
|
||||
|
||||
def wait(self):
|
||||
|
|
|
@ -56,7 +56,6 @@ class BaseMergeServer(metaclass=ABCMeta):
|
|||
self,
|
||||
config: ConfigParser,
|
||||
component: str,
|
||||
zk_client: ZooKeeperClient,
|
||||
connections,
|
||||
):
|
||||
self.connections = connections
|
||||
|
@ -72,7 +71,11 @@ class BaseMergeServer(metaclass=ABCMeta):
|
|||
|
||||
self.merge_root = get_default(config, component, 'git_dir',
|
||||
'/var/lib/zuul/{}-git'.format(component))
|
||||
self.zk_client = zk_client
|
||||
|
||||
self.config = config
|
||||
|
||||
self.zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
self.zk_client.connect()
|
||||
|
||||
# This merger and its git repos are used to maintain
|
||||
# up-to-date copies of all the repos that are used by jobs, as
|
||||
|
@ -80,8 +83,6 @@ class BaseMergeServer(metaclass=ABCMeta):
|
|||
# configuration information to Zuul when it starts.
|
||||
self.merger = self._getMerger(self.merge_root, None)
|
||||
|
||||
self.config = config
|
||||
|
||||
# Repo locking is needed on the executor
|
||||
self.repo_locks = self._repo_locks_class()
|
||||
|
||||
|
@ -100,10 +101,18 @@ class BaseMergeServer(metaclass=ABCMeta):
|
|||
|
||||
def _getMerger(self, root, cache_root, logger=None):
|
||||
return merger.Merger(
|
||||
root, self.connections, self.zk_client, self.merge_email,
|
||||
self.merge_name, self.merge_speed_limit, self.merge_speed_time,
|
||||
cache_root, logger, execution_context=True,
|
||||
git_timeout=self.git_timeout)
|
||||
root,
|
||||
self.connections,
|
||||
self.zk_client,
|
||||
self.merge_email,
|
||||
self.merge_name,
|
||||
self.merge_speed_limit,
|
||||
self.merge_speed_time,
|
||||
cache_root,
|
||||
logger,
|
||||
execution_context=True,
|
||||
git_timeout=self.git_timeout,
|
||||
)
|
||||
|
||||
def _repoLock(self, connection_name, project_name):
|
||||
# The merger does not need locking so return a null lock.
|
||||
|
@ -138,6 +147,7 @@ class BaseMergeServer(metaclass=ABCMeta):
|
|||
def stop(self):
|
||||
self.log.debug('Stopping merger worker')
|
||||
self.merger_gearworker.stop()
|
||||
self.zk_client.disconnect()
|
||||
|
||||
def join(self):
|
||||
self.merger_gearworker.join()
|
||||
|
@ -239,10 +249,9 @@ class MergeServer(BaseMergeServer):
|
|||
def __init__(
|
||||
self,
|
||||
config: ConfigParser,
|
||||
zk_client: ZooKeeperClient,
|
||||
connections,
|
||||
):
|
||||
super().__init__(config, 'merger', zk_client, connections)
|
||||
super().__init__(config, 'merger', connections)
|
||||
|
||||
self.command_map = dict(
|
||||
stop=self.stop,
|
||||
|
|
|
@ -42,6 +42,7 @@ from zuul.lib.statsd import get_statsd, normalize_statsd_name
|
|||
import zuul.lib.queue
|
||||
import zuul.lib.repl
|
||||
from zuul.model import Build, HoldRequest, Tenant, TriggerEvent
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.nodepool import ZooKeeperNodepool
|
||||
|
||||
COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl']
|
||||
|
@ -326,6 +327,10 @@ class Scheduler(threading.Thread):
|
|||
self.triggers = dict()
|
||||
self.config = config
|
||||
|
||||
self.zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
self.zk_client.connect()
|
||||
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
|
||||
|
||||
self.trigger_event_queue = queue.Queue()
|
||||
self.result_event_queue = queue.Queue()
|
||||
self.management_event_queue = zuul.lib.queue.MergedQueue()
|
||||
|
@ -377,6 +382,7 @@ class Scheduler(threading.Thread):
|
|||
self.stats_thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.zk_client.disconnect()
|
||||
self._stopped = True
|
||||
self.stats_stop.set()
|
||||
self.stopConnections()
|
||||
|
|
|
@ -20,7 +20,6 @@ from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
|
|||
from ws4py.websocket import WebSocket
|
||||
import codecs
|
||||
import copy
|
||||
from typing import Optional
|
||||
from datetime import datetime
|
||||
import json
|
||||
import logging
|
||||
|
@ -38,6 +37,7 @@ import zuul.rpcclient
|
|||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.nodepool import ZooKeeperNodepool
|
||||
from zuul.lib.auth import AuthenticatorRegistry
|
||||
from zuul.lib.config import get_default
|
||||
|
||||
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
|
||||
cherrypy.tools.websocket = WebSocketTool()
|
||||
|
@ -1208,35 +1208,46 @@ class StreamManager(object):
|
|||
class ZuulWeb(object):
|
||||
log = logging.getLogger("zuul.web.ZuulWeb")
|
||||
|
||||
# There is an import loop with ConnectionRegistry
|
||||
def __init__(self,
|
||||
listen_address: str, listen_port: int,
|
||||
gear_server: str, gear_port: int,
|
||||
connections, # ConnectionRegistry,
|
||||
config,
|
||||
connections,
|
||||
authenticators: AuthenticatorRegistry,
|
||||
zk_client: ZooKeeperClient,
|
||||
ssl_key: str = None, ssl_cert: str = None, ssl_ca: str = None,
|
||||
static_cache_expiry: int = 3600,
|
||||
info: Optional[zuul.model.WebInfo] = None,
|
||||
static_path: Optional[str] = None,
|
||||
command_socket: Optional[str] = None):
|
||||
info: zuul.model.WebInfo = None):
|
||||
self.start_time = time.time()
|
||||
self.listen_address = listen_address
|
||||
self.listen_port = listen_port
|
||||
self.config = config
|
||||
self.listen_address = get_default(self.config,
|
||||
'web', 'listen_address',
|
||||
'127.0.0.1')
|
||||
self.listen_port = get_default(self.config, 'web', 'port', 9000)
|
||||
self.server = None
|
||||
self.static_cache_expiry = static_cache_expiry
|
||||
self.static_cache_expiry = get_default(self.config, 'web',
|
||||
'static_cache_expiry',
|
||||
3600)
|
||||
self.info = info
|
||||
self.static_path = os.path.abspath(static_path or STATIC_DIR)
|
||||
self.static_path = os.path.abspath(
|
||||
get_default(self.config, 'web', 'static_path', STATIC_DIR)
|
||||
)
|
||||
gear_server = get_default(self.config, 'gearman', 'server')
|
||||
gear_port = get_default(self.config, 'gearman', 'port', 4730)
|
||||
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
|
||||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
|
||||
# instanciate handlers
|
||||
self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
|
||||
ssl_key, ssl_cert, ssl_ca,
|
||||
client_id='Zuul Web Server')
|
||||
self.zk_client = zk_client
|
||||
self.zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
|
||||
self.connections = connections
|
||||
self.authenticators = authenticators
|
||||
self.stream_manager = StreamManager()
|
||||
|
||||
command_socket = get_default(
|
||||
self.config, 'web', 'command_socket',
|
||||
'/var/lib/zuul/web.socket'
|
||||
)
|
||||
|
||||
self.command_socket = commandsocket.CommandSocket(command_socket)
|
||||
|
||||
self.repl = None
|
||||
|
@ -1352,8 +1363,8 @@ class ZuulWeb(object):
|
|||
cherrypy.config.update({
|
||||
'global': {
|
||||
'environment': 'production',
|
||||
'server.socket_host': listen_address,
|
||||
'server.socket_port': int(listen_port),
|
||||
'server.socket_host': self.listen_address,
|
||||
'server.socket_port': int(self.listen_port),
|
||||
},
|
||||
})
|
||||
|
||||
|
@ -1365,6 +1376,7 @@ class ZuulWeb(object):
|
|||
|
||||
def start(self):
|
||||
self.log.debug("ZuulWeb starting")
|
||||
self.zk_client.connect()
|
||||
self.stream_manager.start()
|
||||
self.wsplugin = WebSocketPlugin(cherrypy.engine)
|
||||
self.wsplugin.subscribe()
|
||||
|
|
Loading…
Reference in New Issue