Rename zuul-launcher to zuul-executor
To avoid confusion with nodepool-launcher, we've decided to rename
zuul-launcher to zuul-executor.

Change-Id: I7d03cf0f0093400f4ba2e4beb1c92694224a3e8c
Signed-off-by: Paul Belanger <pabelanger@redhat.com>

Ref: changes/94/445594/2
Parent: df8b742356
Commit: 174a8274d0

@@ -118,7 +118,7 @@ the following:
 Construct a test to fully simulate the series of events you want to
 see, then run it in the foreground. For example::

-  .tox/py27/bin/python -m testtools.run tests.test_scheduler.TestScheduler.test_jobs_launched
+  .tox/py27/bin/python -m testtools.run tests.test_scheduler.TestScheduler.test_jobs_executed

 See TESTING.rst for more information.

@@ -64,12 +64,12 @@ To run individual tests with tox::

 For example, to *run the basic Zuul test*::

-  tox -e py27 -- tests.unit.test_scheduler.TestScheduler.test_jobs_launched
+  tox -e py27 -- tests.unit.test_scheduler.TestScheduler.test_jobs_executed

 To *run one test in the foreground* (after previously having run tox
 to set up the virtualenv)::

-  .tox/py27/bin/python -m testtools.run tests.unit.test_scheduler.TestScheduler.test_jobs_launched
+  .tox/py27/bin/python -m testtools.run tests.unit.test_scheduler.TestScheduler.test_jobs_executed

 List Failing Tests
 ------------------

@@ -1,4 +1,4 @@
-:title: Launchers
+:title: Executors

 .. _Gearman: http://gearman.org/

@@ -11,27 +11,27 @@
 .. _`Turbo-Hipster Documentation`:
    http://turbo-hipster.rtfd.org/

-.. _launchers:
+.. _executors:

-Launchers
+Executors
 =========

-Zuul has a modular architecture for launching jobs. Currently, the
+Zuul has a modular architecture for executing jobs. Currently, the
 only supported module interfaces with Gearman_. This design allows
 any system to run jobs for Zuul simply by interfacing with a Gearman
 server. The recommended way of integrating a new job-runner with Zuul
 is via this method.

-If Gearman is unsuitable, Zuul may be extended with a new launcher
+If Gearman is unsuitable, Zuul may be extended with a new executor
 module. Zuul makes very few assumptions about the interface to a
-launcher -- if it can trigger jobs, cancel them, and receive success
+executor -- if it can trigger jobs, cancel them, and receive success
 or failure reports, it should be able to be used with Zuul. Patches
 to this effect are welcome.

 Zuul Parameters
 ---------------

-Zuul will pass some parameters with every job it launches. These are
+Zuul will pass some parameters with every job it executes. These are
 for workers to be able to get the repositories into the state they are
 intended to be tested in. Builds can be triggered either by an action
 on a change or by a reference update. Both events share a common set
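The Gearman interface described in this hunk is what the renamed
``executor:execute`` function (seen in later hunks) plugs into. As a
minimal, hypothetical sketch of an external job-runner, using only
``gear`` calls that appear elsewhere in this diff; the server host/port
and worker name are illustrative assumptions, not values from the
change::

    import gear

    # Register as a worker for Zuul's execute function; values are examples.
    worker = gear.Worker('example-job-runner')
    worker.addServer('127.0.0.1', 4730)
    worker.waitForServer()
    worker.registerFunction('executor:execute')

    while True:
        job = worker.getJob()      # blocks until Zuul submits a job
        # ... run the job described by the JSON in job.arguments ...
        job.sendWorkComplete()     # report success back to Zuul
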
@@ -21,7 +21,7 @@ Contents:
    zuul
    merger
    cloner
-   launchers
+   executors
    statsd
    client
    developer

@@ -30,7 +30,7 @@ cherry-picking changes as required and identifies the result with a
 Git reference of the form ``refs/zuul/<branch>/Z<random sha1>``.
 Preparing the workspace is then a simple matter of fetching that ref
 and checking it out. The parameters that provide this information are
-described in :ref:`launchers`.
+described in :ref:`executors`.

 These references need to be made available via a Git repository that
 is available to workers (such as Jenkins). This is accomplished by
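The fetch-and-checkout step mentioned in the hunk above can be sketched
with GitPython; the workspace path, URL, and ref below are illustrative
placeholders, not values from this change::

    import git

    # Fetch the Zuul-prepared ref into an existing workspace clone,
    # then check it out; all values here are example placeholders.
    repo = git.Repo('/path/to/workspace')
    repo.git.fetch('http://zuul.example.com/p/org/project',
                   'refs/zuul/master/Z1234abcd')
    repo.git.checkout('FETCH_HEAD')
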
@@ -19,7 +19,7 @@ Zuul Components
 Zuul provides the following components:

 - **zuul-server**: scheduler daemon which communicates with Gerrit and
-  Gearman. Handles receiving events, launching jobs, collecting results
+  Gearman. Handles receiving events, executing jobs, collecting results
   and posting reports.
 - **zuul-merger**: speculative-merger which communicates with Gearman.
   Prepares Git repositories for jobs to test against. This additionally

@@ -17,7 +17,7 @@ the statsd python module, so an existing Zuul installation may be missing it.
 The configuration is done via environment variables STATSD_HOST and
 STATSD_PORT. They are interpreted by the statsd module directly and there is no
 such parameter in zuul.conf yet. Your init script will have to initialize both
-of them before launching Zuul.
+of them before executing Zuul.

 Your init script most probably loads a configuration file named
 ``/etc/default/zuul`` which would contain the environment variables::
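The literal block that follows in the source document is not reproduced
here; as a hedged illustration, such a file would set the two variables
named above (the values are examples)::

    STATSD_HOST=127.0.0.1
    STATSD_PORT=8125
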
@@ -61,7 +61,7 @@ The metrics are emitted by the Zuul scheduler (`zuul/scheduler.py`):
 #. **job.<jobname>** subtree detailing per job statistics:

    #. **wait_time** counter and timer of the wait time, with the
-      difference of the job start time and the launch time, in
+      difference of the job start time and the execute time, in
       milliseconds.

 **zuul.pipeline.**

@@ -88,7 +88,7 @@ The metrics are emitted by the Zuul scheduler (`zuul/scheduler.py`):
 #. **total_changes** counter of the number of change proceeding since
    Zuul started.
 #. **wait_time** counter and timer of the wait time, with the difference
-   of the job start time and the launch time, in milliseconds.
+   of the job start time and the execute time, in milliseconds.

 Additionally, the `zuul.pipeline.<pipeline name>` hierarchy contains
 `current_changes` (gauge), `resident_time` (timing) and `total_changes`

@@ -635,9 +635,9 @@ each job as it builds a list from the project specification.
 **hold-following-changes (optional)**
   This is a boolean that indicates that changes that follow this
   change in a dependent change pipeline should wait until this job
-  succeeds before launching. If this is applied to a very short job
+  succeeds before executing. If this is applied to a very short job
   that can predict whether longer jobs will fail early, this can be
-  used to reduce the number of jobs that Zuul will launch and
+  used to reduce the number of jobs that Zuul will execute and
   ultimately have to cancel. In that case, a small amount of
   parallelization of jobs is traded for more efficient use of testing
   resources. On the other hand, to apply this to a long running job

@@ -709,7 +709,7 @@ each job as it builds a list from the project specification.
   a job is voting or not. Default: ``true``.

 **attempts (optional)**
-  Number of attempts zuul will launch a job. Once reached, zuul will report
+  Number of attempts zuul will execute a job. Once reached, zuul will report
   RETRY_LIMIT as the job result.
   Defaults to 3.

@@ -2,12 +2,12 @@
   tasks:
     - name: Collect console log.
       synchronize:
-        dest: "{{ zuul.launcher.log_root }}"
+        dest: "{{ zuul.executor.log_root }}"
         mode: pull
         src: "/tmp/console.log"

     - name: Publish logs.
       copy:
         dest: "/opt/zuul-logs/{{ zuul.uuid}}"
-        src: "{{ zuul.launcher.log_root }}/"
+        src: "{{ zuul.executor.log_root }}/"
       delegate_to: 127.0.0.1

@@ -18,7 +18,7 @@
     - name: Synchronize src repos to workspace directory.
       synchronize:
         dest: "{{ prepare_workspace_root }}"
-        src: "{{ zuul.launcher.src_root }}"
+        src: "{{ zuul.executor.src_root }}"

     - name: Run configure_mirror.sh
       shell: /opt/nodepool-scripts/configure_mirror.sh

@@ -11,7 +11,7 @@

     - name: Collect tox logs.
       synchronize:
-        dest: "{{ zuul.launcher.log_root }}/tox"
+        dest: "{{ zuul.executor.log_root }}/tox"
         mode: pull
         src: "{{ item.path }}/log/"
       with_items: "{{ result.files }}"

@@ -25,7 +25,7 @@ console_scripts =
     zuul-merger = zuul.cmd.merger:main
     zuul = zuul.cmd.client:main
     zuul-cloner = zuul.cmd.cloner:main
-    zuul-launcher = zuul.cmd.launcher:main
+    zuul-executor = zuul.cmd.executor:main

 [build_sphinx]
 source-dir = doc/source

tests/base.py

@@ -57,8 +57,8 @@ import zuul.connection.sql
 import zuul.scheduler
 import zuul.webapp
 import zuul.rpclistener
-import zuul.launcher.server
-import zuul.launcher.client
+import zuul.executor.server
+import zuul.executor.client
 import zuul.lib.connections
 import zuul.merger.client
 import zuul.merger.merger

@@ -570,9 +570,9 @@ class FakeStatsd(threading.Thread):
 class FakeBuild(object):
     log = logging.getLogger("zuul.test")

-    def __init__(self, launch_server, job):
+    def __init__(self, executor_server, job):
         self.daemon = True
-        self.launch_server = launch_server
+        self.executor_server = executor_server
         self.job = job
         self.jobdir = None
         self.uuid = job.unique

@@ -638,7 +638,7 @@ class FakeBuild(object):
     def run(self):
         self.log.debug('Running build %s' % self.unique)

-        if self.launch_server.hold_jobs_in_build:
+        if self.executor_server.hold_jobs_in_build:
             self.log.debug('Holding build %s' % self.unique)
             self._wait()
         self.log.debug("Build %s continuing" % self.unique)

@@ -654,7 +654,7 @@ class FakeBuild(object):
         return result

     def shouldFail(self):
-        changes = self.launch_server.fail_tests.get(self.name, [])
+        changes = self.executor_server.fail_tests.get(self.name, [])
         for change in changes:
             if self.hasChanges(change):
                 return True

@@ -691,21 +691,21 @@ class FakeBuild(object):
         return True


-class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
-    """An Ansible launcher to be used in tests.
+class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
+    """An Ansible executor to be used in tests.

-    :ivar bool hold_jobs_in_build: If true, when jobs are launched
+    :ivar bool hold_jobs_in_build: If true, when jobs are executed
         they will report that they have started but then pause until
         released before reporting completion. This attribute may be
         changed at any time and will take effect for subsequently
-        launched builds, but previously held builds will still need to
+        executed builds, but previously held builds will still need to
         be explicitly released.

     """
     def __init__(self, *args, **kw):
         self._run_ansible = kw.pop('_run_ansible', False)
         self._test_root = kw.pop('_test_root', False)
-        super(RecordingLaunchServer, self).__init__(*args, **kw)
+        super(RecordingExecutorServer, self).__init__(*args, **kw)
         self.hold_jobs_in_build = False
         self.lock = threading.Lock()
         self.running_builds = []
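The hold/release behavior documented in this docstring is exercised by
the test hunks later in this diff; condensed into a sketch taken from
those patterns::

    # Inside a ZuulTestCase test method:
    self.executor_server.hold_jobs_in_build = True
    A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
    self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
    self.waitUntilSettled()            # builds start, then pause

    self.executor_server.hold_jobs_in_build = False
    self.executor_server.release()     # let the held builds finish
    self.waitUntilSettled()
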
@@ -714,7 +714,7 @@ class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
         self.job_builds = {}

     def failJob(self, name, change):
-        """Instruct the launcher to report matching builds as failures.
+        """Instruct the executor to report matching builds as failures.

         :arg str name: The name of the job to fail.
         :arg Change change: The :py:class:`~tests.base.FakeChange`

@@ -748,7 +748,7 @@ class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
         self.log.debug("Done releasing builds %s (%s)" %
                        (regex, len(self.running_builds)))

-    def launchJob(self, job):
+    def executeJob(self, job):
         build = FakeBuild(self, job)
         job.build = build
         self.running_builds.append(build)

@@ -767,32 +767,32 @@ class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
             if build.unique == uuid:
                 build.aborted = True
                 build.release()
-        super(RecordingLaunchServer, self).stopJob(job)
+        super(RecordingExecutorServer, self).stopJob(job)


-class RecordingAnsibleJob(zuul.launcher.server.AnsibleJob):
+class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
     def runPlaybooks(self, args):
-        build = self.launcher_server.job_builds[self.job.unique]
+        build = self.executor_server.job_builds[self.job.unique]
         build.jobdir = self.jobdir

         result = super(RecordingAnsibleJob, self).runPlaybooks(args)

-        self.launcher_server.lock.acquire()
-        self.launcher_server.build_history.append(
+        self.executor_server.lock.acquire()
+        self.executor_server.build_history.append(
             BuildHistory(name=build.name, result=result, changes=build.changes,
                          node=build.node, uuid=build.unique,
                          parameters=build.parameters, jobdir=build.jobdir,
                          pipeline=build.parameters['ZUUL_PIPELINE'])
         )
-        self.launcher_server.running_builds.remove(build)
-        del self.launcher_server.job_builds[self.job.unique]
-        self.launcher_server.lock.release()
+        self.executor_server.running_builds.remove(build)
+        del self.executor_server.job_builds[self.job.unique]
+        self.executor_server.lock.release()
         return result

     def runAnsible(self, cmd, timeout, trusted=False):
-        build = self.launcher_server.job_builds[self.job.unique]
+        build = self.executor_server.job_builds[self.job.unique]

-        if self.launcher_server._run_ansible:
+        if self.executor_server._run_ansible:
             result = super(RecordingAnsibleJob, self).runAnsible(
                 cmd, timeout, trusted=trusted)
         else:

@@ -828,7 +828,7 @@ class FakeGearmanServer(gear.Server):
         for queue in [self.high_queue, self.normal_queue, self.low_queue]:
             for job in queue:
                 if not hasattr(job, 'waiting'):
-                    if job.name.startswith('launcher:launch'):
+                    if job.name.startswith('executor:execute'):
                         job.waiting = self.hold_jobs_in_queue
                     else:
                         job.waiting = False

@@ -855,7 +855,7 @@ class FakeGearmanServer(gear.Server):
                              len(self.low_queue))
         self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
         for job in self.getQueue():
-            if job.name != 'launcher:launch':
+            if job.name != 'executor:execute':
                 continue
             parameters = json.loads(job.arguments)
             if not regex or re.match(regex, parameters.get('job')):

@@ -991,7 +991,7 @@ class FakeNodepool(object):
             created_time=now,
             updated_time=now,
             image_id=None,
-            launcher='fake-nodepool')
+            executor='fake-nodepool')
         data = json.dumps(data)
         path = self.client.create(path, data,
                                   makepath=True,

@@ -1223,13 +1223,13 @@ class ZuulTestCase(BaseTestCase):
         server that all of the Zuul components in this test use to
         communicate with each other.

-    :ivar RecordingLaunchServer launch_server: An instance of
-        :py:class:`~tests.base.RecordingLaunchServer` which is the
-        Ansible launch server used to run jobs for this test.
+    :ivar RecordingExecutorServer executor_server: An instance of
+        :py:class:`~tests.base.RecordingExecutorServer` which is the
+        Ansible execute server used to run jobs for this test.

     :ivar list builds: A list of :py:class:`~tests.base.FakeBuild` objects
         representing currently running builds. They are appended to
-        the list in the order they are launched, and removed from this
+        the list in the order they are executed, and removed from this
         list upon completion.

     :ivar list history: A list of :py:class:`~tests.base.BuildHistory`

@@ -1261,7 +1261,7 @@ class ZuulTestCase(BaseTestCase):
         self.test_root = os.path.join(tmp_root, "zuul-test")
         self.upstream_root = os.path.join(self.test_root, "upstream")
         self.merger_src_root = os.path.join(self.test_root, "merger-git")
-        self.launcher_src_root = os.path.join(self.test_root, "launcher-git")
+        self.executor_src_root = os.path.join(self.test_root, "executor-git")
         self.state_root = os.path.join(self.test_root, "lib")

         if os.path.exists(self.test_root):

@@ -1276,7 +1276,7 @@ class ZuulTestCase(BaseTestCase):
             os.path.join(FIXTURE_DIR,
                          self.config.get('zuul', 'tenant_config')))
         self.config.set('merger', 'git_dir', self.merger_src_root)
-        self.config.set('launcher', 'git_dir', self.launcher_src_root)
+        self.config.set('executor', 'git_dir', self.executor_src_root)
         self.config.set('zuul', 'state_dir', self.state_root)

         # For each project in config:

@@ -1337,17 +1337,17 @@ class ZuulTestCase(BaseTestCase):

         self._startMerger()

-        self.launch_server = RecordingLaunchServer(
+        self.executor_server = RecordingExecutorServer(
             self.config, self.connections,
             jobdir_root=self.test_root,
             _run_ansible=self.run_ansible,
             _test_root=self.test_root,
             keep_jobdir=KEEP_TEMPDIRS)
-        self.launch_server.start()
-        self.history = self.launch_server.build_history
-        self.builds = self.launch_server.running_builds
+        self.executor_server.start()
+        self.history = self.executor_server.build_history
+        self.builds = self.executor_server.running_builds

-        self.launch_client = zuul.launcher.client.LaunchClient(
+        self.executor_client = zuul.executor.client.ExecutorClient(
             self.config, self.sched)
         self.merge_client = zuul.merger.client.MergeClient(
             self.config, self.sched)

@@ -1360,7 +1360,7 @@ class ZuulTestCase(BaseTestCase):
             self.zk_chroot_fixture.zookeeper_port,
             self.zk_chroot_fixture.zookeeper_chroot)

-        self.sched.setLauncher(self.launch_client)
+        self.sched.setExecutor(self.executor_client)
         self.sched.setMerger(self.merge_client)
         self.sched.setNodepool(self.nodepool)
         self.sched.setZooKeeper(self.zk)

@@ -1372,7 +1372,7 @@ class ZuulTestCase(BaseTestCase):
         self.sched.start()
         self.webapp.start()
         self.rpc.start()
-        self.launch_client.gearman.waitForServer()
+        self.executor_client.gearman.waitForServer()
         self.addCleanup(self.shutdown)

         self.sched.reconfigure(self.config)

@@ -1488,11 +1488,11 @@ class ZuulTestCase(BaseTestCase):

     def shutdown(self):
         self.log.debug("Shutting down after tests")
-        self.launch_client.stop()
+        self.executor_client.stop()
         self.merge_server.stop()
         self.merge_server.join()
         self.merge_client.stop()
-        self.launch_server.stop()
+        self.executor_server.stop()
         self.sched.stop()
         self.sched.join()
         self.statsd.stop()

@@ -1579,29 +1579,29 @@ class ZuulTestCase(BaseTestCase):

     def haveAllBuildsReported(self):
         # See if Zuul is waiting on a meta job to complete
-        if self.launch_client.meta_jobs:
+        if self.executor_client.meta_jobs:
             return False
         # Find out if every build that the worker has completed has been
         # reported back to Zuul. If it hasn't then that means a Gearman
         # event is still in transit and the system is not stable.
         for build in self.history:
-            zbuild = self.launch_client.builds.get(build.uuid)
+            zbuild = self.executor_client.builds.get(build.uuid)
             if not zbuild:
                 # It has already been reported
                 continue
             # It hasn't been reported yet.
             return False
         # Make sure that none of the worker connections are in GRAB_WAIT
-        for connection in self.launch_server.worker.active_connections:
+        for connection in self.executor_server.worker.active_connections:
             if connection.state == 'GRAB_WAIT':
                 return False
         return True

     def areAllBuildsWaiting(self):
-        builds = self.launch_client.builds.values()
+        builds = self.executor_client.builds.values()
         for build in builds:
             client_job = None
-            for conn in self.launch_client.gearman.active_connections:
+            for conn in self.executor_client.gearman.active_connections:
                 for j in conn.related_jobs.values():
                     if j.unique == build.uuid:
                         client_job = j

@@ -1626,7 +1626,8 @@ class ZuulTestCase(BaseTestCase):
             if build.url is None:
                 self.log.debug("%s has not reported start" % build)
                 return False
-            worker_build = self.launch_server.job_builds.get(server_job.unique)
+            worker_build = self.executor_server.job_builds.get(
+                server_job.unique)
             if worker_build:
                 if worker_build.isWaiting():
                     continue

@@ -1673,7 +1674,7 @@ class ZuulTestCase(BaseTestCase):
             raise Exception("Timeout waiting for Zuul to settle")
         # Make sure no new events show up while we're checking

-        self.launch_server.lock.acquire()
+        self.executor_server.lock.acquire()
         # have all build states propogated to zuul?
         if self.haveAllBuildsReported():
             # Join ensures that the queue is empty _and_ events have been

@@ -1691,11 +1692,11 @@ class ZuulTestCase(BaseTestCase):
                 # components were stable, we don't erroneously
                 # report that we are settled.
                 self.sched.run_handler_lock.release()
-                self.launch_server.lock.release()
+                self.executor_server.lock.release()
                 self.log.debug("...settled.")
                 return
             self.sched.run_handler_lock.release()
-            self.launch_server.lock.release()
+            self.executor_server.lock.release()
             self.sched.wake_event.wait(0.1)

     def countJobResults(self, jobs, result):

@@ -1912,7 +1913,7 @@ class ZuulTestCase(BaseTestCase):


 class AnsibleZuulTestCase(ZuulTestCase):
-    """ZuulTestCase but with an actual ansible launcher running"""
+    """ZuulTestCase but with an actual ansible executor running"""
     run_ansible = True

@@ -12,8 +12,8 @@ git_user_email=zuul@example.com
 git_user_name=zuul
 zuul_url=http://zuul.example.com/p

-[launcher]
-git_dir=/tmp/zuul-test/launcher-git
+[executor]
+git_dir=/tmp/zuul-test/executor-git

 [connection review_gerrit]
 driver=gerrit

@@ -12,8 +12,8 @@ git_user_email=zuul@example.com
 git_user_name=zuul
 zuul_url=http://zuul.example.com/p

-[launcher]
-git_dir=/tmp/zuul-test/launcher-git
+[executor]
+git_dir=/tmp/zuul-test/executor-git

 [connection review_gerrit]
 driver=gerrit

@@ -12,8 +12,8 @@ git_user_email=zuul@example.com
 git_user_name=zuul
 zuul_url=http://zuul.example.com/p

-[launcher]
-git_dir=/tmp/zuul-test/launcher-git
+[executor]
+git_dir=/tmp/zuul-test/executor-git

 [swift]
 authurl=https://identity.api.example.org/v2.0/

@@ -12,8 +12,8 @@ git_user_email=zuul@example.com
 git_user_name=zuul
 zuul_url=http://zuul.example.com/p

-[launcher]
-git_dir=/tmp/zuul-test/launcher-git
+[executor]
+git_dir=/tmp/zuul-test/executor-git

 [swift]
 authurl=https://identity.api.example.org/v2.0/

@@ -610,7 +610,7 @@ class TestCloner(ZuulTestCase):

         # Start a periodic job
         self.worker.hold_jobs_in_build = True
-        self.launcher.negative_function_cache_ttl = 0
+        self.executor.negative_function_cache_ttl = 0
         self.config.set('zuul', 'layout_config',
                         'tests/fixtures/layout-timer.yaml')
         self.sched.reconfigure(self.config)

@@ -46,7 +46,7 @@ class TestConnections(ZuulTestCase):
                          'jenkins')

         B = self.fake_review_gerrit.addFakeChange('org/project', 'master', 'B')
-        self.launch_server.failJob('project-test2', B)
+        self.executor_server.failJob('project-test2', B)
         self.addEvent('review_gerrit', B.getPatchsetCreatedEvent(1))

         self.waitUntilSettled()

@@ -239,7 +239,7 @@ class TestMultipleGerrits(ZuulTestCase):
     tenant_config_file = 'config/zuul-connections-multiple-gerrits/main.yaml'

     def test_multiple_project_separate_gerrits(self):
-        self.launch_server.hold_jobs_in_build = True
+        self.executor_server.hold_jobs_in_build = True

         A = self.fake_another_gerrit.addFakeChange(
             'org/project1', 'master', 'A')

|
|||
pipeline='review_check'),
|
||||
])
|
||||
|
||||
self.launch_server.hold_jobs_in_build = False
|
||||
self.launch_server.release()
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
|
|
@@ -58,7 +58,7 @@ class TestOpenStack(AnsibleZuulTestCase):
                          'ubuntu-trusty')

     def test_dsvm_keystone_repo(self):
-        self.launch_server.keep_jobdir = True
+        self.executor_server.keep_jobdir = True
         A = self.fake_gerrit.addFakeChange('openstack/nova', 'master', 'A')
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()

@@ -68,9 +68,9 @@ class TestOpenStack(AnsibleZuulTestCase):
         build = self.getJobFromHistory('dsvm')

         # Check that a change to nova triggered a keystone clone
-        launcher_git_dir = os.path.join(self.launcher_src_root,
+        executor_git_dir = os.path.join(self.executor_src_root,
                                         'openstack', 'keystone', '.git')
-        self.assertTrue(os.path.exists(launcher_git_dir),
+        self.assertTrue(os.path.exists(executor_git_dir),
                         msg='openstack/keystone should be cloned.')

         jobdir_git_dir = os.path.join(build.jobdir.src_root,

@@ -79,7 +79,7 @@ class TestOpenStack(AnsibleZuulTestCase):
                         msg='openstack/keystone should be cloned.')

     def test_dsvm_nova_repo(self):
-        self.launch_server.keep_jobdir = True
+        self.executor_server.keep_jobdir = True
         A = self.fake_gerrit.addFakeChange('openstack/keystone', 'master', 'A')
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()

@@ -89,9 +89,9 @@ class TestOpenStack(AnsibleZuulTestCase):
         build = self.getJobFromHistory('dsvm')

         # Check that a change to keystone triggered a nova clone
-        launcher_git_dir = os.path.join(self.launcher_src_root,
+        executor_git_dir = os.path.join(self.executor_src_root,
                                         'openstack', 'nova', '.git')
-        self.assertTrue(os.path.exists(launcher_git_dir),
+        self.assertTrue(os.path.exists(executor_git_dir),
                         msg='openstack/nova should be cloned.')

         jobdir_git_dir = os.path.join(build.jobdir.src_root,

(File diff suppressed because it is too large.)

@@ -27,7 +27,7 @@ class TestWebapp(ZuulTestCase):

     def setUp(self):
         super(TestWebapp, self).setUp()
-        self.launch_server.hold_jobs_in_build = True
+        self.executor_server.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('code-review', 2)
         self.fake_gerrit.addEvent(A.addApproval('approved', 1))

@@ -38,8 +38,8 @@ class TestWebapp(ZuulTestCase):
         self.port = self.webapp.server.socket.getsockname()[1]

     def tearDown(self):
-        self.launch_server.hold_jobs_in_build = False
-        self.launch_server.release()
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
         self.waitUntilSettled()
         super(TestWebapp, self).tearDown()

@@ -27,7 +27,7 @@ class TestZuulTriggerParentChangeEnqueued(ZuulTestCase):
         # When A is enqueued in the gate, B1 and B2 should both attempt
         # to be enqueued in both pipelines.  B1 should end up in check
         # and B2 in gate because of differing pipeline requirements.
-        self.launch_server.hold_jobs_in_build = True
+        self.executor_server.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B1 = self.fake_gerrit.addFakeChange('org/project', 'master', 'B1')
         B2 = self.fake_gerrit.addFakeChange('org/project', 'master', 'B2')

@@ -46,8 +46,8 @@ class TestZuulTriggerParentChangeEnqueued(ZuulTestCase):
         # to enqueue behind 1,1 so that the test is more
         # deterministic.
         self.waitUntilSettled()
-        self.launch_server.hold_jobs_in_build = False
-        self.launch_server.release()
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
         self.waitUntilSettled()

         self.assertEqual(len(self.history), 3)

@@ -239,8 +239,8 @@ class Client(zuul.cmd.ZuulApp):
             'uuid': {
                 'title': 'UUID'
             },
-            'launch_time': {
-                'title': 'Launch Time',
+            'execute_time': {
+                'title': 'Execute Time',
                 'transform': self._epoch_to_relative_time,
                 'append': ' ago'
             },

@@ -29,7 +29,7 @@ import sys
 import signal

 import zuul.cmd
-import zuul.launcher.server
+import zuul.executor.server

 # No zuul imports that pull in paramiko here; it must not be
 # imported until after the daemonization.

@@ -37,10 +37,10 @@ import zuul.launcher.server
 # Similar situation with gear and statsd.


-class Launcher(zuul.cmd.ZuulApp):
+class Executor(zuul.cmd.ZuulApp):

     def parse_arguments(self):
-        parser = argparse.ArgumentParser(description='Zuul launch worker.')
+        parser = argparse.ArgumentParser(description='Zuul executor.')
         parser.add_argument('-c', dest='config',
                             help='specify the config file')
         parser.add_argument('-d', dest='nodaemon', action='store_true',

@@ -52,7 +52,7 @@ class Launcher(zuul.cmd.ZuulApp):
                             action='store_true',
                             help='keep local jobdirs after run completes')
         parser.add_argument('command',
-                            choices=zuul.launcher.server.COMMANDS,
+                            choices=zuul.executor.server.COMMANDS,
                             nargs='?')

         self.args = parser.parse_args()

@@ -63,55 +63,55 @@ class Launcher(zuul.cmd.ZuulApp):
                 self.config.get('zuul', 'state_dir'))
         else:
             state_dir = '/var/lib/zuul'
-        path = os.path.join(state_dir, 'launcher.socket')
+        path = os.path.join(state_dir, 'executor.socket')
         s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
         s.connect(path)
         s.sendall('%s\n' % cmd)

     def exit_handler(self):
-        self.launcher.stop()
-        self.launcher.join()
+        self.executor.stop()
+        self.executor.join()

     def main(self, daemon=True):
         # See comment at top of file about zuul imports

-        self.setup_logging('launcher', 'log_config')
+        self.setup_logging('executor', 'log_config')

-        self.log = logging.getLogger("zuul.Launcher")
+        self.log = logging.getLogger("zuul.Executor")

-        LaunchServer = zuul.launcher.server.LaunchServer
-        self.launcher = LaunchServer(self.config, self.connections,
+        ExecutorServer = zuul.executor.server.ExecutorServer
+        self.executor = ExecutorServer(self.config, self.connections,
                                        keep_jobdir=self.args.keep_jobdir)
-        self.launcher.start()
+        self.executor.start()

         signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
         if daemon:
-            self.launcher.join()
+            self.executor.join()
         else:
             while True:
                 try:
                     signal.pause()
                 except KeyboardInterrupt:
-                    print("Ctrl + C: asking launcher to exit nicely...\n")
+                    print("Ctrl + C: asking executor to exit nicely...\n")
                     self.exit_handler()
                     sys.exit(0)


 def main():
-    server = Launcher()
+    server = Executor()
     server.parse_arguments()
     server.read_config()

-    if server.args.command in zuul.launcher.server.COMMANDS:
+    if server.args.command in zuul.executor.server.COMMANDS:
         server.send_command(server.args.command)
         sys.exit(0)

     server.configure_connections()

-    if server.config.has_option('launcher', 'pidfile'):
-        pid_fn = os.path.expanduser(server.config.get('launcher', 'pidfile'))
+    if server.config.has_option('executor', 'pidfile'):
+        pid_fn = os.path.expanduser(server.config.get('executor', 'pidfile'))
     else:
-        pid_fn = '/var/run/zuul-launcher/zuul-launcher.pid'
+        pid_fn = '/var/run/zuul-executor/zuul-executor.pid'
     pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)

     if server.args.nodaemon:
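The command-socket protocol used by ``send_command`` in this hunk is
just a newline-terminated command written to a UNIX socket; a minimal
sketch, assuming the default state dir of ``/var/lib/zuul`` shown
above::

    import socket

    # Send a command (e.g. one of zuul.executor.server.COMMANDS) to the
    # executor's command socket; the path assumes the default state_dir.
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.connect('/var/lib/zuul/executor.socket')
    s.sendall(b'stop\n')
    s.close()
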
@@ -78,7 +78,7 @@ class Scheduler(zuul.cmd.ZuulApp):
     def test_config(self):
         # See comment at top of file about zuul imports
         import zuul.scheduler
-        import zuul.launcher.client
+        import zuul.executor.client

         logging.basicConfig(level=logging.DEBUG)
         try:

@@ -124,7 +124,7 @@ class Scheduler(zuul.cmd.ZuulApp):
     def main(self):
         # See comment at top of file about zuul imports
         import zuul.scheduler
-        import zuul.launcher.client
+        import zuul.executor.client
         import zuul.merger.client
         import zuul.nodepool
         import zuul.webapp

@@ -141,7 +141,7 @@ class Scheduler(zuul.cmd.ZuulApp):

         self.sched = zuul.scheduler.Scheduler(self.config)

-        gearman = zuul.launcher.client.LaunchClient(self.config, self.sched)
+        gearman = zuul.executor.client.ExecutorClient(self.config, self.sched)
         merger = zuul.merger.client.MergeClient(self.config, self.sched)
         nodepool = zuul.nodepool.Nodepool(self.sched)

@@ -174,7 +174,7 @@ class Scheduler(zuul.cmd.ZuulApp):
         rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

         self.configure_connections()
-        self.sched.setLauncher(gearman)
+        self.sched.setExecutor(gearman)
         self.sched.setMerger(merger)
         self.sched.setNodepool(nodepool)
         self.sched.setZooKeeper(zookeeper)

@@ -782,7 +782,7 @@ class TenantParser(object):

         for job in jobs:
             # Note: this is an ordered list -- we wait for cat jobs to
-            # complete in the order they were launched which is the
+            # complete in the order they were executed which is the
             # same order they were defined in the main config file.
             # This is important for correct inheritance.
             TenantParser.log.debug("Waiting for cat job %s" % (job,))

@@ -102,7 +102,7 @@ def getJobData(job):

 class ZuulGearmanClient(gear.Client):
     def __init__(self, zuul_gearman):
-        super(ZuulGearmanClient, self).__init__('Zuul Launch Client')
+        super(ZuulGearmanClient, self).__init__('Zuul Executor Client')
         self.__zuul_gearman = zuul_gearman

     def handleWorkComplete(self, packet):

@@ -144,8 +144,8 @@ class ZuulGearmanClient(gear.Client):
         self.__zuul_gearman.onUnknownJob(job)


-class LaunchClient(object):
-    log = logging.getLogger("zuul.LaunchClient")
+class ExecutorClient(object):
+    log = logging.getLogger("zuul.ExecutorClient")
     negative_function_cache_ttl = 5

     def __init__(self, config, sched):

@@ -209,10 +209,10 @@ class LaunchClient(object):
         self.log.debug("Function %s is not registered" % name)
         return False

-    def launch(self, job, item, pipeline, dependent_items=[]):
+    def execute(self, job, item, pipeline, dependent_items=[]):
         uuid = str(uuid4().hex)
         self.log.info(
-            "Launch job %s (uuid: %s) on nodes %s for change %s "
+            "Execute job %s (uuid: %s) on nodes %s for change %s "
             "with dependent changes %s" % (
                 job, uuid,
                 item.current_build_set.getJobNodeSet(job.name),

@@ -339,7 +339,7 @@ class LaunchClient(object):
             self.sched.onBuildCompleted(build, 'SUCCESS')
             return build

-        gearman_job = gear.Job('launcher:launch', json.dumps(params),
+        gearman_job = gear.Job('executor:execute', json.dumps(params),
                                unique=uuid)
         build.__gearman_job = gearman_job
         build.__gearman_manager = None

|
|||
# internal dict after it's added to the report queue.
|
||||
del self.builds[job.unique]
|
||||
else:
|
||||
if not job.name.startswith("launcher:stop:"):
|
||||
if not job.name.startswith("executor:stop:"):
|
||||
self.log.error("Unable to find build %s" % job.unique)
|
||||
|
||||
def onWorkStatus(self, job):
|
||||
|
@@ -483,7 +483,7 @@ class LaunchClient(object):
                            (build,))
         stop_uuid = str(uuid4().hex)
         data = dict(uuid=build.__gearman_job.unique)
-        stop_job = gear.Job("launcher:stop:%s" % build.__gearman_manager,
+        stop_job = gear.Job("executor:stop:%s" % build.__gearman_manager,
                             json.dumps(data), unique=stop_uuid)
         self.meta_jobs[stop_uuid] = stop_job
         self.log.debug("Submitting stop job: %s", stop_job)
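The submission side of the ``executor:execute`` protocol shown in these
hunks can also be exercised with a bare ``gear`` client; a minimal,
hypothetical sketch (the server address and payload are illustrative,
not values from this change)::

    import json
    import uuid

    import gear

    # Submit an execute job the way the client code above does;
    # all concrete values here are examples.
    client = gear.Client('example-client')
    client.addServer('127.0.0.1', 4730)
    client.waitForServer()

    job = gear.Job('executor:execute',
                   json.dumps({'job': 'example-job'}),
                   unique=uuid.uuid4().hex)
    client.submitJob(job)
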
@@ -211,15 +211,15 @@ class DeduplicateQueue(object):
         self.condition.release()


-class LaunchServer(object):
-    log = logging.getLogger("zuul.LaunchServer")
+class ExecutorServer(object):
+    log = logging.getLogger("zuul.ExecutorServer")

     def __init__(self, config, connections={}, jobdir_root=None,
                  keep_jobdir=False):
         self.config = config
         self.keep_jobdir = keep_jobdir
         self.jobdir_root = jobdir_root
-        # TODOv3(mordred): make the launcher name more unique --
+        # TODOv3(mordred): make the executor name more unique --
         # perhaps hostname+pid.
         self.hostname = socket.gethostname()
         self.zuul_url = config.get('merger', 'zuul_url')

@@ -232,10 +232,10 @@ class LaunchServer(object):
             unverbose=self.verboseOff,
         )

-        if self.config.has_option('launcher', 'git_dir'):
-            self.merge_root = self.config.get('launcher', 'git_dir')
+        if self.config.has_option('executor', 'git_dir'):
+            self.merge_root = self.config.get('executor', 'git_dir')
         else:
-            self.merge_root = '/var/lib/zuul/launcher-git'
+            self.merge_root = '/var/lib/zuul/executor-git'

         if self.config.has_option('merger', 'git_user_email'):
             self.merge_email = self.config.get('merger', 'git_user_email')

@@ -260,7 +260,7 @@ class LaunchServer(object):
             self.config.get('zuul', 'state_dir'))
         else:
             state_dir = '/var/lib/zuul'
-        path = os.path.join(state_dir, 'launcher.socket')
+        path = os.path.join(state_dir, 'executor.socket')
         self.command_socket = commandsocket.CommandSocket(path)
         ansible_dir = os.path.join(state_dir, 'ansible')
         self.library_dir = os.path.join(ansible_dir, 'library')

@@ -303,7 +303,7 @@ class LaunchServer(object):
             port = self.config.get('gearman', 'port')
         else:
             port = 4730
-        self.worker = gear.Worker('Zuul Launch Server')
+        self.worker = gear.Worker('Zuul Executor Server')
         self.worker.addServer(server, port)
         self.log.debug("Waiting for server")
         self.worker.waitForServer()

@@ -325,8 +325,8 @@ class LaunchServer(object):
         self.thread.start()

     def register(self):
-        self.worker.registerFunction("launcher:launch")
-        self.worker.registerFunction("launcher:stop:%s" % self.hostname)
+        self.worker.registerFunction("executor:execute")
+        self.worker.registerFunction("executor:stop:%s" % self.hostname)
         self.worker.registerFunction("merger:merge")
         self.worker.registerFunction("merger:cat")

@@ -398,15 +398,15 @@ class LaunchServer(object):
         return task

     def run(self):
-        self.log.debug("Starting launch listener")
+        self.log.debug("Starting executor listener")
         while self._running:
             try:
                 job = self.worker.getJob()
                 try:
-                    if job.name == 'launcher:launch':
-                        self.log.debug("Got launch job: %s" % job.unique)
-                        self.launchJob(job)
-                    elif job.name.startswith('launcher:stop'):
+                    if job.name == 'executor:execute':
+                        self.log.debug("Got execute job: %s" % job.unique)
+                        self.executeJob(job)
+                    elif job.name.startswith('executor:stop'):
                         self.log.debug("Got stop job: %s" % job.unique)
                         self.stopJob(job)
                     elif job.name == 'merger:cat':

@@ -426,7 +426,7 @@ class LaunchServer(object):
         except Exception:
             self.log.exception("Exception while getting job")

-    def launchJob(self, job):
+    def executeJob(self, job):
         self.job_workers[job.unique] = AnsibleJob(self, job)
         self.job_workers[job.unique].run()

@@ -481,8 +481,8 @@ class AnsibleJob(object):
     RESULT_UNREACHABLE = 3
     RESULT_ABORTED = 4

-    def __init__(self, launcher_server, job):
-        self.launcher_server = launcher_server
+    def __init__(self, executor_server, job):
+        self.executor_server = executor_server
         self.job = job
         self.jobdir = None
         self.proc = None

@@ -490,16 +490,16 @@ class AnsibleJob(object):
         self.running = False
         self.aborted = False

-        if self.launcher_server.config.has_option(
-            'launcher', 'private_key_file'):
-            self.private_key_file = self.launcher_server.config.get(
-                'launcher', 'private_key_file')
+        if self.executor_server.config.has_option(
+            'executor', 'private_key_file'):
+            self.private_key_file = self.executor_server.config.get(
+                'executor', 'private_key_file')
         else:
             self.private_key_file = '~/.ssh/id_rsa'

     def run(self):
         self.running = True
-        self.thread = threading.Thread(target=self.launch)
+        self.thread = threading.Thread(target=self.execute)
         self.thread.start()

     def stop(self):

@@ -507,13 +507,13 @@ class AnsibleJob(object):
         self.abortRunningProc()
         self.thread.join()

-    def launch(self):
+    def execute(self):
         try:
-            self.jobdir = JobDir(root=self.launcher_server.jobdir_root,
-                                 keep=self.launcher_server.keep_jobdir)
-            self._launch()
+            self.jobdir = JobDir(root=self.executor_server.jobdir_root,
+                                 keep=self.executor_server.keep_jobdir)
+            self._execute()
         except Exception:
-            self.log.exception("Exception while launching job")
+            self.log.exception("Exception while executing job")
             self.job.sendWorkException(traceback.format_exc())
         finally:
             self.running = False

@@ -522,11 +522,11 @@ class AnsibleJob(object):
         except Exception:
             self.log.exception("Error cleaning up jobdir:")
         try:
-            self.launcher_server.finishJob(self.job.unique)
+            self.executor_server.finishJob(self.job.unique)
         except Exception:
             self.log.exception("Error finalizing job thread:")

-    def _launch(self):
+    def _execute(self):
         self.log.debug("Job %s: beginning" % (self.job.unique,))
         self.log.debug("Job %s: args: %s" % (self.job.unique,
                                              self.job.arguments,))

@@ -537,7 +537,7 @@ class AnsibleJob(object):
         for project in args['projects']:
             self.log.debug("Job %s: updating project %s" %
                            (self.job.unique, project['name']))
-            tasks.append(self.launcher_server.update(
+            tasks.append(self.executor_server.update(
                 project['name'], project['url']))
         for task in tasks:
             task.wait()

@@ -546,14 +546,14 @@ class AnsibleJob(object):
         for project in args['projects']:
             self.log.debug("Cloning %s" % (project['name'],))
             repo = git.Repo.clone_from(
-                os.path.join(self.launcher_server.merge_root,
+                os.path.join(self.executor_server.merge_root,
                              project['name']),
                 os.path.join(self.jobdir.src_root,
                              project['name']))
             repo.remotes.origin.config_writer.set('url', project['url'])

         # Get a merger in order to update the repos involved in this job.
-        merger = self.launcher_server._getMerger(self.jobdir.src_root)
+        merger = self.executor_server._getMerger(self.jobdir.src_root)
         merge_items = [i for i in args['items'] if i.get('refspec')]
         if merge_items:
             commit = merger.mergeChanges(merge_items)  # noqa

@@ -569,14 +569,14 @@ class AnsibleJob(object):
         self.prepareAnsibleFiles(args)

         data = {
-            'manager': self.launcher_server.hostname,
+            'manager': self.executor_server.hostname,
             'url': 'https://server/job/{}/0/'.format(args['job']),
             'worker_name': 'My Worker',
         }

         # TODOv3:
         # 'name': self.name,
-        # 'manager': self.launch_server.hostname,
+        # 'manager': self.executor_server.hostname,
         # 'worker_name': 'My Worker',
         # 'worker_hostname': 'localhost',
         # 'worker_ips': ['127.0.0.1', '192.168.1.1'],

@@ -696,7 +696,7 @@ class AnsibleJob(object):
         # Check out the playbook repo if needed and set the path to
         # the playbook that should be run.
         jobdir_playbook.trusted = playbook['trusted']
-        source = self.launcher_server.connections.getSource(
+        source = self.executor_server.connections.getSource(
             playbook['connection'])
         project = source.getProject(playbook['project'])
         # TODO(jeblair): construct the url in the merger itself

@@ -721,7 +721,7 @@ class AnsibleJob(object):
         # the stack of changes we are testing, so check out the branch
         # tip into a dedicated space.