From 7ae2805a5a601b47a90f70532fc045a00a3af7ca Mon Sep 17 00:00:00 2001 From: Jan Kubovy Date: Thu, 23 Apr 2020 12:05:34 +0200 Subject: [PATCH] Connect merger to Zookeeper Part of point 5 in https://etherpad.openstack.org/p/zuulv4 Connection is idle for now. Also update component documentation. Change-Id: I97a97f61940fab2a555c3651e78fa7a929e8ebfb --- doc/source/discussion/components.rst | 3 +++ tests/base.py | 3 ++- tests/unit/test_streaming.py | 11 +++++++--- zuul/cmd/fingergw.py | 32 ++++++++++++++++++++++------ zuul/cmd/merger.py | 29 ++++++++++++++++++++----- zuul/executor/server.py | 4 +--- zuul/lib/fingergw.py | 25 +++++++++++++++------- zuul/merger/merger.py | 23 ++++++++++++++++---- zuul/merger/server.py | 30 ++++++++++++++++++++------ 9 files changed, 123 insertions(+), 37 deletions(-) diff --git a/doc/source/discussion/components.rst b/doc/source/discussion/components.rst index 7cf715313a..44c5bddf98 100644 --- a/doc/source/discussion/components.rst +++ b/doc/source/discussion/components.rst @@ -49,6 +49,9 @@ which is described below. Scheduler -- Database; Scheduler -- Gerrit; Scheduler -- Zookeeper; + Zookeeper -- Executor; + Zookeeper -- Finger; + Zookeeper -- Merger Zookeeper -- Nodepool; Scheduler -- GitHub; Scheduler -- Statsd; diff --git a/tests/base.py b/tests/base.py index 32149d6b89..46d3878f2e 100644 --- a/tests/base.py +++ b/tests/base.py @@ -4166,7 +4166,8 @@ class ZuulTestCase(BaseTestCase): def _startMerger(self): self.merge_server = zuul.merger.server.MergeServer( - self.config, self.scheds.first.connections) + self.config, self.zk_client, self.scheds.first.connections + ) self.merge_server.start() def setUp(self): diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index 98a7d21d9c..a117b283ae 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -26,7 +26,8 @@ import time import zuul.web import zuul.lib.log_streamer -import zuul.lib.fingergw +from zuul.lib.fingergw import FingerGateway +from zuul.zk import ZooKeeperClient import tests.base from tests.base import iterate_timeout, ZuulWebFixture @@ -521,10 +522,14 @@ class TestStreaming(tests.base.AnsibleZuulTestCase): logfile = open(ansible_log, 'r') self.addCleanup(logfile.close) + zk_client = ZooKeeperClient() + zk_client.connect(self.zk_config, timeout=30.0) + self.addCleanup(zk_client.disconnect) + # Start the finger gateway daemon - gateway = zuul.lib.fingergw.FingerGateway( + gateway = FingerGateway( ('127.0.0.1', self.gearman_server.port, None, None, None), - (self.host, 0), + zk_client, (self.host, 0), user=None, command_socket=None, pid_file=None diff --git a/zuul/cmd/fingergw.py b/zuul/cmd/fingergw.py index 63a5401031..abc8c3bef4 100644 --- a/zuul/cmd/fingergw.py +++ b/zuul/cmd/fingergw.py @@ -15,11 +15,12 @@ import logging import signal import sys +from typing import Optional import zuul.cmd -import zuul.lib.fingergw - from zuul.lib.config import get_default +from zuul.lib.fingergw import COMMANDS, FingerGateway +from zuul.zk import ZooKeeperClient class FingerGatewayApp(zuul.cmd.ZuulDaemonApp): @@ -32,12 +33,12 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp): def __init__(self): super(FingerGatewayApp, self).__init__() - self.gateway = None + self.gateway: Optional[FingerGateway] = None def createParser(self): parser = super(FingerGatewayApp, self).createParser() parser.add_argument('command', - choices=zuul.lib.fingergw.COMMANDS, + choices=COMMANDS, nargs='?') return parser @@ -52,7 +53,7 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp): Called by the main() method of the parent class. ''' - if self.args.command in zuul.lib.fingergw.COMMANDS: + if self.args.command in COMMANDS: self.send_command(self.args.command) sys.exit(0) @@ -72,8 +73,26 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp): ssl_cert = get_default(self.config, 'gearman', 'ssl_cert') ssl_ca = get_default(self.config, 'gearman', 'ssl_ca') - self.gateway = zuul.lib.fingergw.FingerGateway( + zk_client = ZooKeeperClient() + zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None) + if not zookeeper_hosts: + raise Exception("The zookeeper hosts config value is required") + zookeeper_tls_key = get_default(self.config, 'zookeeper', 'tls_key') + zookeeper_tls_cert = get_default(self.config, 'zookeeper', 'tls_cert') + zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca') + zookeeper_timeout = float(get_default(self.config, 'zookeeper', + 'session_timeout', 10.0)) + zk_client.connect( + zookeeper_hosts, + timeout=zookeeper_timeout, + tls_cert=zookeeper_tls_cert, + tls_key=zookeeper_tls_key, + tls_ca=zookeeper_tls_ca, + ) + + self.gateway = FingerGateway( (gear_server, gear_port, ssl_key, ssl_cert, ssl_ca), + zk_client, (host, port), user, cmdsock, @@ -96,6 +115,7 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp): break else: self.gateway.wait() + zk_client.disconnect() self.log.info('Stopped Zuul finger gateway app') diff --git a/zuul/cmd/merger.py b/zuul/cmd/merger.py index 1d4b6fefff..e21da91b9c 100755 --- a/zuul/cmd/merger.py +++ b/zuul/cmd/merger.py @@ -18,7 +18,9 @@ import signal import sys import zuul.cmd -import zuul.merger.server +from zuul.merger.server import COMMANDS, MergeServer +from zuul.lib.config import get_default +from zuul.zk import ZooKeeperClient class Merger(zuul.cmd.ZuulDaemonApp): @@ -28,7 +30,7 @@ class Merger(zuul.cmd.ZuulDaemonApp): def createParser(self): parser = super(Merger, self).createParser() parser.add_argument('command', - choices=zuul.merger.server.COMMANDS, + choices=COMMANDS, nargs='?') return parser @@ -43,7 +45,7 @@ class Merger(zuul.cmd.ZuulDaemonApp): sys.exit(0) def run(self): - if self.args.command in zuul.merger.server.COMMANDS: + if self.args.command in COMMANDS: self.send_command(self.args.command) sys.exit(0) @@ -51,8 +53,24 @@ class Merger(zuul.cmd.ZuulDaemonApp): self.setup_logging('merger', 'log_config') - self.merger = zuul.merger.server.MergeServer(self.config, - self.connections) + zk_client = ZooKeeperClient() + zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None) + if not zookeeper_hosts: + raise Exception("The zookeeper hosts config value is required") + zookeeper_tls_key = get_default(self.config, 'zookeeper', 'tls_key') + zookeeper_tls_cert = get_default(self.config, 'zookeeper', 'tls_cert') + zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca') + zookeeper_timeout = float(get_default(self.config, 'zookeeper', + 'session_timeout', 10.0)) + zk_client.connect( + zookeeper_hosts, + timeout=zookeeper_timeout, + tls_cert=zookeeper_tls_cert, + tls_key=zookeeper_tls_key, + tls_ca=zookeeper_tls_ca, + ) + + self.merger = MergeServer(self.config, zk_client, self.connections) self.merger.start() if self.args.nodaemon: @@ -65,6 +83,7 @@ class Merger(zuul.cmd.ZuulDaemonApp): self.exit_handler(signal.SIGINT, None) else: self.merger.join() + zk_client.disconnect() def main(): diff --git a/zuul/executor/server.py b/zuul/executor/server.py index c6665d0bb7..766a6e04f0 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -2559,9 +2559,7 @@ class ExecutorServer(BaseMergeServer): log_streaming_port=DEFAULT_FINGER_PORT, log_console_port=DEFAULT_STREAM_PORT, ): - # TODO(jeblair): add zk client to merger and remove this assignment - self.zk_client = zk_client - super().__init__(config, 'executor', connections) + super().__init__(config, 'executor', zk_client, connections) self.keep_jobdir = keep_jobdir self.jobdir_root = jobdir_root diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py index 85efdc4c25..070fefed4f 100644 --- a/zuul/lib/fingergw.py +++ b/zuul/lib/fingergw.py @@ -16,10 +16,10 @@ import functools import logging import socket import threading - +from typing import Optional, Tuple import zuul.rpcclient - -from zuul.lib import commandsocket +from zuul.lib.commandsocket import CommandSocket +from zuul.zk import ZooKeeperClient from zuul.lib import streamer_utils @@ -101,7 +101,15 @@ class FingerGateway(object): log = logging.getLogger("zuul.fingergw") - def __init__(self, gearman, address, user, command_socket, pid_file): + def __init__( + self, + gearman: Tuple, + zk_client: ZooKeeperClient, + address: Tuple, + user: Optional[str], + command_socket: Optional[str], + pid_file: Optional[str], + ): ''' Initialize the finger gateway. @@ -118,6 +126,7 @@ class FingerGateway(object): self.gear_ssl_key = gearman[2] self.gear_ssl_cert = gearman[3] self.gear_ssl_ca = gearman[4] + self.zk_client = zk_client self.address = address self.user = user self.pid_file = pid_file @@ -128,7 +137,8 @@ class FingerGateway(object): self.command_thread = None self.command_running = False - self.command_socket = command_socket + self.command_socket_path = command_socket + self.command_socket = None self.command_map = dict( stop=self.stop, @@ -168,10 +178,9 @@ class FingerGateway(object): pid_file=self.pid_file) # Start the command processor after the server and privilege drop - if self.command_socket: + if self.command_socket_path: self.log.debug("Starting command processor") - self.command_socket = commandsocket.CommandSocket( - self.command_socket) + self.command_socket = CommandSocket(self.command_socket_path) self.command_socket.start() self.command_running = True self.command_thread = threading.Thread( diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py index 81114179d0..ffddb2ac24 100644 --- a/zuul/merger/merger.py +++ b/zuul/merger/merger.py @@ -14,6 +14,8 @@ # under the License. from contextlib import contextmanager +from logging import Logger +from typing import Optional from urllib.parse import urlsplit, urlunsplit, urlparse import hashlib import logging @@ -25,6 +27,7 @@ import time import git import gitdb import paramiko +from zuul.zk import ZooKeeperClient import zuul.model @@ -721,18 +724,30 @@ class Repo(object): class Merger(object): - def __init__(self, working_root, connections, email, username, - speed_limit, speed_time, cache_root=None, logger=None, - execution_context=False, git_timeout=300): + def __init__( + self, + working_root: str, + connections, + zk_client: ZooKeeperClient, + email: str, + username: str, + speed_limit: str, + speed_time: str, + cache_root: Optional[str] = None, + logger: Optional[Logger] = None, + execution_context: bool = False, + git_timeout: int = 300, + ): self.logger = logger if logger is None: self.log = logging.getLogger("zuul.Merger") else: self.log = logger - self.repos = {} + self.repos = {} # type: ignore self.working_root = working_root os.makedirs(working_root, exist_ok=True) self.connections = connections + self.zk_client = zk_client self.email = email self.username = username self.speed_limit = speed_limit diff --git a/zuul/merger/server.py b/zuul/merger/server.py index 7b2090c5ee..64fa250ad5 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -17,6 +17,9 @@ import logging import os import threading from abc import ABCMeta +from configparser import ConfigParser + +from zuul.zk import ZooKeeperClient from zuul.lib import commandsocket from zuul.lib.config import get_default @@ -49,8 +52,14 @@ class BaseMergeServer(metaclass=ABCMeta): _repo_locks_class = BaseRepoLocks - def __init__(self, config, component, connections=None): - self.connections = connections or {} + def __init__( + self, + config: ConfigParser, + component: str, + zk_client: ZooKeeperClient, + connections, + ): + self.connections = connections self.merge_email = get_default(config, 'merger', 'git_user_email', 'zuul.merger.default@example.com') self.merge_name = get_default(config, 'merger', 'git_user_name', @@ -63,6 +72,7 @@ class BaseMergeServer(metaclass=ABCMeta): self.merge_root = get_default(config, component, 'git_dir', '/var/lib/zuul/{}-git'.format(component)) + self.zk_client = zk_client # This merger and its git repos are used to maintain # up-to-date copies of all the repos that are used by jobs, as @@ -90,9 +100,10 @@ class BaseMergeServer(metaclass=ABCMeta): def _getMerger(self, root, cache_root, logger=None): return merger.Merger( - root, self.connections, self.merge_email, self.merge_name, - self.merge_speed_limit, self.merge_speed_time, cache_root, logger, - execution_context=True, git_timeout=self.git_timeout) + root, self.connections, self.zk_client, self.merge_email, + self.merge_name, self.merge_speed_limit, self.merge_speed_time, + cache_root, logger, execution_context=True, + git_timeout=self.git_timeout) def _repoLock(self, connection_name, project_name): # The merger does not need locking so return a null lock. @@ -228,8 +239,13 @@ class BaseMergeServer(metaclass=ABCMeta): class MergeServer(BaseMergeServer): log = logging.getLogger("zuul.MergeServer") - def __init__(self, config, connections=None): - super().__init__(config, 'merger', connections) + def __init__( + self, + config: ConfigParser, + zk_client: ZooKeeperClient, + connections, + ): + super().__init__(config, 'merger', zk_client, connections) self.command_map = dict( stop=self.stop,