Merge "Connect executor to Zookeeper"
This commit is contained in:
commit
d9b8521ee9
|
@ -90,6 +90,7 @@ services:
|
|||
- "./playbooks/:/var/playbooks/:z"
|
||||
- "sshkey:/var/ssh:z"
|
||||
- "logs:/srv/static/logs:z"
|
||||
- "certs:/var/certs:z"
|
||||
node:
|
||||
build:
|
||||
dockerfile: node-Dockerfile
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
---
|
||||
upgrade:
|
||||
- |
|
||||
The :attr:`zookeeper` section in ``zuul.conf`` is required for all
|
||||
components, and all components must now be able to connect to
|
||||
ZooKeeper.
|
|
@ -4178,6 +4178,9 @@ class ZuulTestCase(BaseTestCase):
|
|||
self.zk_chroot_fixture.zookeeper_port,
|
||||
self.zk_chroot_fixture.zookeeper_chroot)
|
||||
|
||||
self.zk_client = ZooKeeperClient()
|
||||
self.zk_client.connect(hosts=self.zk_config)
|
||||
|
||||
if not KEEP_TEMPDIRS:
|
||||
tmp_root = self.useFixture(fixtures.TempDir(
|
||||
rootdir=os.environ.get("ZUUL_TEST_ROOT"))
|
||||
|
@ -4284,7 +4287,9 @@ class ZuulTestCase(BaseTestCase):
|
|||
executor_connections.configure(self.config,
|
||||
source_only=self.source_only)
|
||||
self.executor_server = RecordingExecutorServer(
|
||||
self.config, executor_connections,
|
||||
self.config,
|
||||
self.zk_client,
|
||||
executor_connections,
|
||||
jobdir_root=self.jobdir_root,
|
||||
_run_ansible=self.run_ansible,
|
||||
_test_root=self.test_root,
|
||||
|
@ -4628,6 +4633,7 @@ class ZuulTestCase(BaseTestCase):
|
|||
self.rpcclient.shutdown()
|
||||
self.gearman_server.shutdown()
|
||||
self.fake_nodepool.stop()
|
||||
self.zk_client.disconnect()
|
||||
self.scheds.execute(lambda app: app.sched.zk_client.disconnect())
|
||||
self.printHistory()
|
||||
# We whitelist watchdog threads as they have relatively long delays
|
||||
|
|
|
@ -22,6 +22,7 @@ import zuul.cmd
|
|||
import zuul.executor.server
|
||||
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
|
||||
class Executor(zuul.cmd.ZuulDaemonApp):
|
||||
|
@ -96,8 +97,26 @@ class Executor(zuul.cmd.ZuulDaemonApp):
|
|||
|
||||
self.start_log_streamer()
|
||||
|
||||
zk_client = ZooKeeperClient()
|
||||
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None)
|
||||
if not zookeeper_hosts:
|
||||
raise Exception("The zookeeper hosts config value is required")
|
||||
zookeeper_tls_key = get_default(self.config, 'zookeeper', 'tls_key')
|
||||
zookeeper_tls_cert = get_default(self.config, 'zookeeper', 'tls_cert')
|
||||
zookeeper_tls_ca = get_default(self.config, 'zookeeper', 'tls_ca')
|
||||
zookeeper_timeout = float(get_default(self.config, 'zookeeper',
|
||||
'session_timeout', 10.0))
|
||||
zk_client.connect(
|
||||
zookeeper_hosts,
|
||||
timeout=zookeeper_timeout,
|
||||
tls_cert=zookeeper_tls_cert,
|
||||
tls_key=zookeeper_tls_key,
|
||||
tls_ca=zookeeper_tls_ca
|
||||
)
|
||||
|
||||
ExecutorServer = zuul.executor.server.ExecutorServer
|
||||
self.executor = ExecutorServer(self.config, self.connections,
|
||||
self.executor = ExecutorServer(self.config, zk_client,
|
||||
self.connections,
|
||||
jobdir_root=self.job_dir,
|
||||
keep_jobdir=self.args.keep_jobdir,
|
||||
log_streaming_port=self.finger_port)
|
||||
|
@ -107,6 +126,7 @@ class Executor(zuul.cmd.ZuulDaemonApp):
|
|||
signal.signal(signal.SIGTERM, self.exit_handler)
|
||||
|
||||
self.executor.join()
|
||||
zk_client.disconnect()
|
||||
|
||||
|
||||
def main():
|
||||
|
|
|
@ -2549,9 +2549,18 @@ class ExecutorServer(BaseMergeServer):
|
|||
_job_class = AnsibleJob
|
||||
_repo_locks_class = RepoLocks
|
||||
|
||||
def __init__(self, config, connections=None, jobdir_root=None,
|
||||
keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT,
|
||||
log_console_port=DEFAULT_STREAM_PORT):
|
||||
def __init__(
|
||||
self,
|
||||
config,
|
||||
zk_client,
|
||||
connections=None,
|
||||
jobdir_root=None,
|
||||
keep_jobdir=False,
|
||||
log_streaming_port=DEFAULT_FINGER_PORT,
|
||||
log_console_port=DEFAULT_STREAM_PORT,
|
||||
):
|
||||
# TODO(jeblair): add zk client to merger and remove this assignment
|
||||
self.zk_client = zk_client
|
||||
super().__init__(config, 'executor', connections)
|
||||
|
||||
self.keep_jobdir = keep_jobdir
|
||||
|
|
Loading…
Reference in New Issue