Connect merger to Zookeeper
Part of point 5 in https://etherpad.openstack.org/p/zuulv4 Connection is idle for now. Also update component documentation. Change-Id: I97a97f61940fab2a555c3651e78fa7a929e8ebfb
This commit is contained in:
parent
f14e998438
commit
7ae2805a5a
|
@ -49,6 +49,9 @@ which is described below.
|
|||
Scheduler -- Database;
|
||||
Scheduler -- Gerrit;
|
||||
Scheduler -- Zookeeper;
|
||||
Zookeeper -- Executor;
|
||||
Zookeeper -- Finger;
|
||||
Zookeeper -- Merger
|
||||
Zookeeper -- Nodepool;
|
||||
Scheduler -- GitHub;
|
||||
Scheduler -- Statsd;
|
||||
|
|
|
@ -4166,7 +4166,8 @@ class ZuulTestCase(BaseTestCase):
|
|||
|
||||
def _startMerger(self):
|
||||
self.merge_server = zuul.merger.server.MergeServer(
|
||||
self.config, self.scheds.first.connections)
|
||||
self.config, self.zk_client, self.scheds.first.connections
|
||||
)
|
||||
self.merge_server.start()
|
||||
|
||||
def setUp(self):
|
||||
|
|
|
@ -26,7 +26,8 @@ import time
|
|||
|
||||
import zuul.web
|
||||
import zuul.lib.log_streamer
|
||||
import zuul.lib.fingergw
|
||||
from zuul.lib.fingergw import FingerGateway
|
||||
from zuul.zk import ZooKeeperClient
|
||||
import tests.base
|
||||
from tests.base import iterate_timeout, ZuulWebFixture
|
||||
|
||||
|
@ -521,10 +522,14 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
|||
logfile = open(ansible_log, 'r')
|
||||
self.addCleanup(logfile.close)
|
||||
|
||||
zk_client = ZooKeeperClient()
|
||||
zk_client.connect(self.zk_config, timeout=30.0)
|
||||
self.addCleanup(zk_client.disconnect)
|
||||
|
||||
# Start the finger gateway daemon
|
||||
gateway = zuul.lib.fingergw.FingerGateway(
|
||||
gateway = FingerGateway(
|
||||
('127.0.0.1', self.gearman_server.port, None, None, None),
|
||||
(self.host, 0),
|
||||
zk_client, (self.host, 0),
|
||||
user=None,
|
||||
command_socket=None,
|
||||
pid_file=None
|
||||
|
|
|
@ -15,11 +15,12 @@
|
|||
import logging
|
||||
import signal
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
import zuul.cmd
|
||||
import zuul.lib.fingergw
|
||||
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.fingergw import COMMANDS, FingerGateway
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
|
||||
class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
|
||||
|
@ -32,12 +33,12 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
|
|||
|
||||
def __init__(self):
|
||||
super(FingerGatewayApp, self).__init__()
|
||||
self.gateway = None
|
||||
self.gateway: Optional[FingerGateway] = None
|
||||
|
||||
def createParser(self):
|
||||
parser = super(FingerGatewayApp, self).createParser()
|
||||
parser.add_argument('command',
|
||||
choices=zuul.lib.fingergw.COMMANDS,
|
||||
choices=COMMANDS,
|
||||
nargs='?')
|
||||
return parser
|
||||
|
||||
|
@ -52,7 +53,7 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
|
|||
|
||||
Called by the main() method of the parent class.
|
||||
'''
|
||||
if self.args.command in zuul.lib.fingergw.COMMANDS:
|
||||
if self.args.command in COMMANDS:
|
||||
self.send_command(self.args.command)
|
||||
sys.exit(0)
|
||||
|
||||
|
@ -72,8 +73,26 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
|
|||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
|
||||
self.gateway = zuul.lib.fingergw.FingerGateway(
|
||||
zk_client = ZooKeeperClient()
|
||||
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None)
|
||||
if not zookeeper_hosts:
|
||||
raise Exception("The zookeeper hosts config value is required")
|
||||
zookeeper_tls_key = get_default(self.config, 'zookeeper', 'tls_key')
|
||||
zookeeper_tls_cert = get_default(self.config, 'zookeeper', 'tls_cert')
|
||||
zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca')
|
||||
zookeeper_timeout = float(get_default(self.config, 'zookeeper',
|
||||
'session_timeout', 10.0))
|
||||
zk_client.connect(
|
||||
zookeeper_hosts,
|
||||
timeout=zookeeper_timeout,
|
||||
tls_cert=zookeeper_tls_cert,
|
||||
tls_key=zookeeper_tls_key,
|
||||
tls_ca=zookeeper_tls_ca,
|
||||
)
|
||||
|
||||
self.gateway = FingerGateway(
|
||||
(gear_server, gear_port, ssl_key, ssl_cert, ssl_ca),
|
||||
zk_client,
|
||||
(host, port),
|
||||
user,
|
||||
cmdsock,
|
||||
|
@ -96,6 +115,7 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
|
|||
break
|
||||
else:
|
||||
self.gateway.wait()
|
||||
zk_client.disconnect()
|
||||
|
||||
self.log.info('Stopped Zuul finger gateway app')
|
||||
|
||||
|
|
|
@ -18,7 +18,9 @@ import signal
|
|||
import sys
|
||||
|
||||
import zuul.cmd
|
||||
import zuul.merger.server
|
||||
from zuul.merger.server import COMMANDS, MergeServer
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
|
||||
class Merger(zuul.cmd.ZuulDaemonApp):
|
||||
|
@ -28,7 +30,7 @@ class Merger(zuul.cmd.ZuulDaemonApp):
|
|||
def createParser(self):
|
||||
parser = super(Merger, self).createParser()
|
||||
parser.add_argument('command',
|
||||
choices=zuul.merger.server.COMMANDS,
|
||||
choices=COMMANDS,
|
||||
nargs='?')
|
||||
return parser
|
||||
|
||||
|
@ -43,7 +45,7 @@ class Merger(zuul.cmd.ZuulDaemonApp):
|
|||
sys.exit(0)
|
||||
|
||||
def run(self):
|
||||
if self.args.command in zuul.merger.server.COMMANDS:
|
||||
if self.args.command in COMMANDS:
|
||||
self.send_command(self.args.command)
|
||||
sys.exit(0)
|
||||
|
||||
|
@ -51,8 +53,24 @@ class Merger(zuul.cmd.ZuulDaemonApp):
|
|||
|
||||
self.setup_logging('merger', 'log_config')
|
||||
|
||||
self.merger = zuul.merger.server.MergeServer(self.config,
|
||||
self.connections)
|
||||
zk_client = ZooKeeperClient()
|
||||
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None)
|
||||
if not zookeeper_hosts:
|
||||
raise Exception("The zookeeper hosts config value is required")
|
||||
zookeeper_tls_key = get_default(self.config, 'zookeeper', 'tls_key')
|
||||
zookeeper_tls_cert = get_default(self.config, 'zookeeper', 'tls_cert')
|
||||
zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca')
|
||||
zookeeper_timeout = float(get_default(self.config, 'zookeeper',
|
||||
'session_timeout', 10.0))
|
||||
zk_client.connect(
|
||||
zookeeper_hosts,
|
||||
timeout=zookeeper_timeout,
|
||||
tls_cert=zookeeper_tls_cert,
|
||||
tls_key=zookeeper_tls_key,
|
||||
tls_ca=zookeeper_tls_ca,
|
||||
)
|
||||
|
||||
self.merger = MergeServer(self.config, zk_client, self.connections)
|
||||
self.merger.start()
|
||||
|
||||
if self.args.nodaemon:
|
||||
|
@ -65,6 +83,7 @@ class Merger(zuul.cmd.ZuulDaemonApp):
|
|||
self.exit_handler(signal.SIGINT, None)
|
||||
else:
|
||||
self.merger.join()
|
||||
zk_client.disconnect()
|
||||
|
||||
|
||||
def main():
|
||||
|
|
|
@ -2559,9 +2559,7 @@ class ExecutorServer(BaseMergeServer):
|
|||
log_streaming_port=DEFAULT_FINGER_PORT,
|
||||
log_console_port=DEFAULT_STREAM_PORT,
|
||||
):
|
||||
# TODO(jeblair): add zk client to merger and remove this assignment
|
||||
self.zk_client = zk_client
|
||||
super().__init__(config, 'executor', connections)
|
||||
super().__init__(config, 'executor', zk_client, connections)
|
||||
|
||||
self.keep_jobdir = keep_jobdir
|
||||
self.jobdir_root = jobdir_root
|
||||
|
|
|
@ -16,10 +16,10 @@ import functools
|
|||
import logging
|
||||
import socket
|
||||
import threading
|
||||
|
||||
from typing import Optional, Tuple
|
||||
import zuul.rpcclient
|
||||
|
||||
from zuul.lib import commandsocket
|
||||
from zuul.lib.commandsocket import CommandSocket
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.lib import streamer_utils
|
||||
|
||||
|
||||
|
@ -101,7 +101,15 @@ class FingerGateway(object):
|
|||
|
||||
log = logging.getLogger("zuul.fingergw")
|
||||
|
||||
def __init__(self, gearman, address, user, command_socket, pid_file):
|
||||
def __init__(
|
||||
self,
|
||||
gearman: Tuple,
|
||||
zk_client: ZooKeeperClient,
|
||||
address: Tuple,
|
||||
user: Optional[str],
|
||||
command_socket: Optional[str],
|
||||
pid_file: Optional[str],
|
||||
):
|
||||
'''
|
||||
Initialize the finger gateway.
|
||||
|
||||
|
@ -118,6 +126,7 @@ class FingerGateway(object):
|
|||
self.gear_ssl_key = gearman[2]
|
||||
self.gear_ssl_cert = gearman[3]
|
||||
self.gear_ssl_ca = gearman[4]
|
||||
self.zk_client = zk_client
|
||||
self.address = address
|
||||
self.user = user
|
||||
self.pid_file = pid_file
|
||||
|
@ -128,7 +137,8 @@ class FingerGateway(object):
|
|||
|
||||
self.command_thread = None
|
||||
self.command_running = False
|
||||
self.command_socket = command_socket
|
||||
self.command_socket_path = command_socket
|
||||
self.command_socket = None
|
||||
|
||||
self.command_map = dict(
|
||||
stop=self.stop,
|
||||
|
@ -168,10 +178,9 @@ class FingerGateway(object):
|
|||
pid_file=self.pid_file)
|
||||
|
||||
# Start the command processor after the server and privilege drop
|
||||
if self.command_socket:
|
||||
if self.command_socket_path:
|
||||
self.log.debug("Starting command processor")
|
||||
self.command_socket = commandsocket.CommandSocket(
|
||||
self.command_socket)
|
||||
self.command_socket = CommandSocket(self.command_socket_path)
|
||||
self.command_socket.start()
|
||||
self.command_running = True
|
||||
self.command_thread = threading.Thread(
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
# under the License.
|
||||
|
||||
from contextlib import contextmanager
|
||||
from logging import Logger
|
||||
from typing import Optional
|
||||
from urllib.parse import urlsplit, urlunsplit, urlparse
|
||||
import hashlib
|
||||
import logging
|
||||
|
@ -25,6 +27,7 @@ import time
|
|||
import git
|
||||
import gitdb
|
||||
import paramiko
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
import zuul.model
|
||||
|
||||
|
@ -721,18 +724,30 @@ class Repo(object):
|
|||
|
||||
|
||||
class Merger(object):
|
||||
def __init__(self, working_root, connections, email, username,
|
||||
speed_limit, speed_time, cache_root=None, logger=None,
|
||||
execution_context=False, git_timeout=300):
|
||||
def __init__(
|
||||
self,
|
||||
working_root: str,
|
||||
connections,
|
||||
zk_client: ZooKeeperClient,
|
||||
email: str,
|
||||
username: str,
|
||||
speed_limit: str,
|
||||
speed_time: str,
|
||||
cache_root: Optional[str] = None,
|
||||
logger: Optional[Logger] = None,
|
||||
execution_context: bool = False,
|
||||
git_timeout: int = 300,
|
||||
):
|
||||
self.logger = logger
|
||||
if logger is None:
|
||||
self.log = logging.getLogger("zuul.Merger")
|
||||
else:
|
||||
self.log = logger
|
||||
self.repos = {}
|
||||
self.repos = {} # type: ignore
|
||||
self.working_root = working_root
|
||||
os.makedirs(working_root, exist_ok=True)
|
||||
self.connections = connections
|
||||
self.zk_client = zk_client
|
||||
self.email = email
|
||||
self.username = username
|
||||
self.speed_limit = speed_limit
|
||||
|
|
|
@ -17,6 +17,9 @@ import logging
|
|||
import os
|
||||
import threading
|
||||
from abc import ABCMeta
|
||||
from configparser import ConfigParser
|
||||
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
from zuul.lib import commandsocket
|
||||
from zuul.lib.config import get_default
|
||||
|
@ -49,8 +52,14 @@ class BaseMergeServer(metaclass=ABCMeta):
|
|||
|
||||
_repo_locks_class = BaseRepoLocks
|
||||
|
||||
def __init__(self, config, component, connections=None):
|
||||
self.connections = connections or {}
|
||||
def __init__(
|
||||
self,
|
||||
config: ConfigParser,
|
||||
component: str,
|
||||
zk_client: ZooKeeperClient,
|
||||
connections,
|
||||
):
|
||||
self.connections = connections
|
||||
self.merge_email = get_default(config, 'merger', 'git_user_email',
|
||||
'zuul.merger.default@example.com')
|
||||
self.merge_name = get_default(config, 'merger', 'git_user_name',
|
||||
|
@ -63,6 +72,7 @@ class BaseMergeServer(metaclass=ABCMeta):
|
|||
|
||||
self.merge_root = get_default(config, component, 'git_dir',
|
||||
'/var/lib/zuul/{}-git'.format(component))
|
||||
self.zk_client = zk_client
|
||||
|
||||
# This merger and its git repos are used to maintain
|
||||
# up-to-date copies of all the repos that are used by jobs, as
|
||||
|
@ -90,9 +100,10 @@ class BaseMergeServer(metaclass=ABCMeta):
|
|||
|
||||
def _getMerger(self, root, cache_root, logger=None):
|
||||
return merger.Merger(
|
||||
root, self.connections, self.merge_email, self.merge_name,
|
||||
self.merge_speed_limit, self.merge_speed_time, cache_root, logger,
|
||||
execution_context=True, git_timeout=self.git_timeout)
|
||||
root, self.connections, self.zk_client, self.merge_email,
|
||||
self.merge_name, self.merge_speed_limit, self.merge_speed_time,
|
||||
cache_root, logger, execution_context=True,
|
||||
git_timeout=self.git_timeout)
|
||||
|
||||
def _repoLock(self, connection_name, project_name):
|
||||
# The merger does not need locking so return a null lock.
|
||||
|
@ -228,8 +239,13 @@ class BaseMergeServer(metaclass=ABCMeta):
|
|||
class MergeServer(BaseMergeServer):
|
||||
log = logging.getLogger("zuul.MergeServer")
|
||||
|
||||
def __init__(self, config, connections=None):
|
||||
super().__init__(config, 'merger', connections)
|
||||
def __init__(
|
||||
self,
|
||||
config: ConfigParser,
|
||||
zk_client: ZooKeeperClient,
|
||||
connections,
|
||||
):
|
||||
super().__init__(config, 'merger', zk_client, connections)
|
||||
|
||||
self.command_map = dict(
|
||||
stop=self.stop,
|
||||
|
|
Loading…
Reference in New Issue