Merge "Connect merger to Zookeeper"

This commit is contained in:
Zuul 2021-02-15 18:07:12 +00:00 committed by Gerrit Code Review
commit 284dc65a25
9 changed files with 123 additions and 37 deletions

View File

@ -49,6 +49,9 @@ which is described below.
Scheduler -- Database;
Scheduler -- Gerrit;
Scheduler -- Zookeeper;
Zookeeper -- Executor;
Zookeeper -- Finger;
Zookeeper -- Merger
Zookeeper -- Nodepool;
Scheduler -- GitHub;
Scheduler -- Statsd;

View File

@ -4166,7 +4166,8 @@ class ZuulTestCase(BaseTestCase):
def _startMerger(self):
self.merge_server = zuul.merger.server.MergeServer(
self.config, self.scheds.first.connections)
self.config, self.zk_client, self.scheds.first.connections
)
self.merge_server.start()
def setUp(self):

View File

@ -26,7 +26,8 @@ import time
import zuul.web
import zuul.lib.log_streamer
import zuul.lib.fingergw
from zuul.lib.fingergw import FingerGateway
from zuul.zk import ZooKeeperClient
import tests.base
from tests.base import iterate_timeout, ZuulWebFixture
@ -521,10 +522,14 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
logfile = open(ansible_log, 'r')
self.addCleanup(logfile.close)
zk_client = ZooKeeperClient()
zk_client.connect(self.zk_config, timeout=30.0)
self.addCleanup(zk_client.disconnect)
# Start the finger gateway daemon
gateway = zuul.lib.fingergw.FingerGateway(
gateway = FingerGateway(
('127.0.0.1', self.gearman_server.port, None, None, None),
(self.host, 0),
zk_client, (self.host, 0),
user=None,
command_socket=None,
pid_file=None

View File

@ -15,11 +15,12 @@
import logging
import signal
import sys
from typing import Optional
import zuul.cmd
import zuul.lib.fingergw
from zuul.lib.config import get_default
from zuul.lib.fingergw import COMMANDS, FingerGateway
from zuul.zk import ZooKeeperClient
class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
@ -32,12 +33,12 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
def __init__(self):
super(FingerGatewayApp, self).__init__()
self.gateway = None
self.gateway: Optional[FingerGateway] = None
def createParser(self):
parser = super(FingerGatewayApp, self).createParser()
parser.add_argument('command',
choices=zuul.lib.fingergw.COMMANDS,
choices=COMMANDS,
nargs='?')
return parser
@ -52,7 +53,7 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
Called by the main() method of the parent class.
'''
if self.args.command in zuul.lib.fingergw.COMMANDS:
if self.args.command in COMMANDS:
self.send_command(self.args.command)
sys.exit(0)
@ -72,8 +73,26 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.gateway = zuul.lib.fingergw.FingerGateway(
zk_client = ZooKeeperClient()
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None)
if not zookeeper_hosts:
raise Exception("The zookeeper hosts config value is required")
zookeeper_tls_key = get_default(self.config, 'zookeeper', 'tls_key')
zookeeper_tls_cert = get_default(self.config, 'zookeeper', 'tls_cert')
zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca')
zookeeper_timeout = float(get_default(self.config, 'zookeeper',
'session_timeout', 10.0))
zk_client.connect(
zookeeper_hosts,
timeout=zookeeper_timeout,
tls_cert=zookeeper_tls_cert,
tls_key=zookeeper_tls_key,
tls_ca=zookeeper_tls_ca,
)
self.gateway = FingerGateway(
(gear_server, gear_port, ssl_key, ssl_cert, ssl_ca),
zk_client,
(host, port),
user,
cmdsock,
@ -96,6 +115,7 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
break
else:
self.gateway.wait()
zk_client.disconnect()
self.log.info('Stopped Zuul finger gateway app')

View File

@ -18,7 +18,9 @@ import signal
import sys
import zuul.cmd
import zuul.merger.server
from zuul.merger.server import COMMANDS, MergeServer
from zuul.lib.config import get_default
from zuul.zk import ZooKeeperClient
class Merger(zuul.cmd.ZuulDaemonApp):
@ -28,7 +30,7 @@ class Merger(zuul.cmd.ZuulDaemonApp):
def createParser(self):
parser = super(Merger, self).createParser()
parser.add_argument('command',
choices=zuul.merger.server.COMMANDS,
choices=COMMANDS,
nargs='?')
return parser
@ -43,7 +45,7 @@ class Merger(zuul.cmd.ZuulDaemonApp):
sys.exit(0)
def run(self):
if self.args.command in zuul.merger.server.COMMANDS:
if self.args.command in COMMANDS:
self.send_command(self.args.command)
sys.exit(0)
@ -51,8 +53,24 @@ class Merger(zuul.cmd.ZuulDaemonApp):
self.setup_logging('merger', 'log_config')
self.merger = zuul.merger.server.MergeServer(self.config,
self.connections)
zk_client = ZooKeeperClient()
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None)
if not zookeeper_hosts:
raise Exception("The zookeeper hosts config value is required")
zookeeper_tls_key = get_default(self.config, 'zookeeper', 'tls_key')
zookeeper_tls_cert = get_default(self.config, 'zookeeper', 'tls_cert')
zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca')
zookeeper_timeout = float(get_default(self.config, 'zookeeper',
'session_timeout', 10.0))
zk_client.connect(
zookeeper_hosts,
timeout=zookeeper_timeout,
tls_cert=zookeeper_tls_cert,
tls_key=zookeeper_tls_key,
tls_ca=zookeeper_tls_ca,
)
self.merger = MergeServer(self.config, zk_client, self.connections)
self.merger.start()
if self.args.nodaemon:
@ -65,6 +83,7 @@ class Merger(zuul.cmd.ZuulDaemonApp):
self.exit_handler(signal.SIGINT, None)
else:
self.merger.join()
zk_client.disconnect()
def main():

View File

@ -2559,9 +2559,7 @@ class ExecutorServer(BaseMergeServer):
log_streaming_port=DEFAULT_FINGER_PORT,
log_console_port=DEFAULT_STREAM_PORT,
):
# TODO(jeblair): add zk client to merger and remove this assignment
self.zk_client = zk_client
super().__init__(config, 'executor', connections)
super().__init__(config, 'executor', zk_client, connections)
self.keep_jobdir = keep_jobdir
self.jobdir_root = jobdir_root

View File

@ -16,10 +16,10 @@ import functools
import logging
import socket
import threading
from typing import Optional, Tuple
import zuul.rpcclient
from zuul.lib import commandsocket
from zuul.lib.commandsocket import CommandSocket
from zuul.zk import ZooKeeperClient
from zuul.lib import streamer_utils
@ -101,7 +101,15 @@ class FingerGateway(object):
log = logging.getLogger("zuul.fingergw")
def __init__(self, gearman, address, user, command_socket, pid_file):
def __init__(
self,
gearman: Tuple,
zk_client: ZooKeeperClient,
address: Tuple,
user: Optional[str],
command_socket: Optional[str],
pid_file: Optional[str],
):
'''
Initialize the finger gateway.
@ -118,6 +126,7 @@ class FingerGateway(object):
self.gear_ssl_key = gearman[2]
self.gear_ssl_cert = gearman[3]
self.gear_ssl_ca = gearman[4]
self.zk_client = zk_client
self.address = address
self.user = user
self.pid_file = pid_file
@ -128,7 +137,8 @@ class FingerGateway(object):
self.command_thread = None
self.command_running = False
self.command_socket = command_socket
self.command_socket_path = command_socket
self.command_socket = None
self.command_map = dict(
stop=self.stop,
@ -168,10 +178,9 @@ class FingerGateway(object):
pid_file=self.pid_file)
# Start the command processor after the server and privilege drop
if self.command_socket:
if self.command_socket_path:
self.log.debug("Starting command processor")
self.command_socket = commandsocket.CommandSocket(
self.command_socket)
self.command_socket = CommandSocket(self.command_socket_path)
self.command_socket.start()
self.command_running = True
self.command_thread = threading.Thread(

View File

@ -14,6 +14,8 @@
# under the License.
from contextlib import contextmanager
from logging import Logger
from typing import Optional
from urllib.parse import urlsplit, urlunsplit, urlparse
import hashlib
import logging
@ -25,6 +27,7 @@ import time
import git
import gitdb
import paramiko
from zuul.zk import ZooKeeperClient
import zuul.model
@ -721,18 +724,30 @@ class Repo(object):
class Merger(object):
def __init__(self, working_root, connections, email, username,
speed_limit, speed_time, cache_root=None, logger=None,
execution_context=False, git_timeout=300):
def __init__(
self,
working_root: str,
connections,
zk_client: ZooKeeperClient,
email: str,
username: str,
speed_limit: str,
speed_time: str,
cache_root: Optional[str] = None,
logger: Optional[Logger] = None,
execution_context: bool = False,
git_timeout: int = 300,
):
self.logger = logger
if logger is None:
self.log = logging.getLogger("zuul.Merger")
else:
self.log = logger
self.repos = {}
self.repos = {} # type: ignore
self.working_root = working_root
os.makedirs(working_root, exist_ok=True)
self.connections = connections
self.zk_client = zk_client
self.email = email
self.username = username
self.speed_limit = speed_limit

View File

@ -17,6 +17,9 @@ import logging
import os
import threading
from abc import ABCMeta
from configparser import ConfigParser
from zuul.zk import ZooKeeperClient
from zuul.lib import commandsocket
from zuul.lib.config import get_default
@ -49,8 +52,14 @@ class BaseMergeServer(metaclass=ABCMeta):
_repo_locks_class = BaseRepoLocks
def __init__(self, config, component, connections=None):
self.connections = connections or {}
def __init__(
self,
config: ConfigParser,
component: str,
zk_client: ZooKeeperClient,
connections,
):
self.connections = connections
self.merge_email = get_default(config, 'merger', 'git_user_email',
'zuul.merger.default@example.com')
self.merge_name = get_default(config, 'merger', 'git_user_name',
@ -63,6 +72,7 @@ class BaseMergeServer(metaclass=ABCMeta):
self.merge_root = get_default(config, component, 'git_dir',
'/var/lib/zuul/{}-git'.format(component))
self.zk_client = zk_client
# This merger and its git repos are used to maintain
# up-to-date copies of all the repos that are used by jobs, as
@ -90,9 +100,10 @@ class BaseMergeServer(metaclass=ABCMeta):
def _getMerger(self, root, cache_root, logger=None):
return merger.Merger(
root, self.connections, self.merge_email, self.merge_name,
self.merge_speed_limit, self.merge_speed_time, cache_root, logger,
execution_context=True, git_timeout=self.git_timeout)
root, self.connections, self.zk_client, self.merge_email,
self.merge_name, self.merge_speed_limit, self.merge_speed_time,
cache_root, logger, execution_context=True,
git_timeout=self.git_timeout)
def _repoLock(self, connection_name, project_name):
# The merger does not need locking so return a null lock.
@ -228,8 +239,13 @@ class BaseMergeServer(metaclass=ABCMeta):
class MergeServer(BaseMergeServer):
log = logging.getLogger("zuul.MergeServer")
def __init__(self, config, connections=None):
super().__init__(config, 'merger', connections)
def __init__(
self,
config: ConfigParser,
zk_client: ZooKeeperClient,
connections,
):
super().__init__(config, 'merger', zk_client, connections)
self.command_map = dict(
stop=self.stop,