Merge "Execute builds via ZooKeeper"

Zuul 2021-07-01 00:34:11 +00:00 committed by Gerrit Code Review
commit e1f0d49822
15 changed files with 689 additions and 707 deletions


@ -35,7 +35,6 @@ which is described below.
Web [href="#web-server"]
Merger -- Gearman
Executor -- Gearman
Executor -- Statsd
Web -- Database
Web -- Gearman


@ -68,7 +68,9 @@ import paramiko
import prometheus_client.exposition
from zuul.driver.sql.sqlconnection import DatabaseSession
from zuul.model import Change
from zuul.model import (
BuildRequest, Change, PRECEDENCE_NORMAL, WebInfo
)
from zuul.rpcclient import RPCClient
from zuul.driver.zuul import ZuulDriver
@ -89,9 +91,9 @@ from zuul.lib.collections import DefaultKeyDict
from zuul.lib.connections import ConnectionRegistry
from zuul.zk import ZooKeeperClient
from zuul.zk.event_queues import ConnectionEventQueue
from zuul.zk.executor import ExecutorApi
from psutil import Popen
import tests.fakegithub
import zuul.driver.gerrit.gerritsource as gerritsource
import zuul.driver.gerrit.gerritconnection as gerritconnection
import zuul.driver.git.gitwatcher as gitwatcher
@ -110,13 +112,14 @@ import zuul.lib.auth
import zuul.merger.client
import zuul.merger.merger
import zuul.merger.server
import zuul.model
import zuul.nodepool
import zuul.rpcclient
import zuul.configloader
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
import tests.fakegithub
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures')
KEEP_TEMPDIRS = bool(os.environ.get('KEEP_TEMPDIRS', False))
@ -2843,13 +2846,13 @@ class FakeStatsd(threading.Thread):
class FakeBuild(object):
log = logging.getLogger("zuul.test")
def __init__(self, executor_server, job):
def __init__(self, executor_server, build_request, params):
self.daemon = True
self.executor_server = executor_server
self.job = job
self.build_request = build_request
self.jobdir = None
self.uuid = job.unique
self.parameters = json.loads(job.arguments)
self.uuid = build_request.uuid
self.parameters = params
# TODOv3(jeblair): self.node is really "the label of the node
# assigned". We should rename it (self.node_label?) if we
# keep using it like this, or we may end up exposing more of
@ -3059,7 +3062,7 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
def recordResult(self, result):
self.executor_server.lock.acquire()
build = self.executor_server.job_builds.get(self.job.unique)
build = self.executor_server.job_builds.get(self.build_request.uuid)
if not build:
self.executor_server.lock.release()
# Already recorded
@ -3073,11 +3076,11 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
pipeline=build.parameters['zuul']['pipeline'])
)
self.executor_server.running_builds.remove(build)
del self.executor_server.job_builds[self.job.unique]
del self.executor_server.job_builds[self.build_request.uuid]
self.executor_server.lock.release()
def runPlaybooks(self, args):
build = self.executor_server.job_builds[self.job.unique]
build = self.executor_server.job_builds[self.build_request.uuid]
build.jobdir = self.jobdir
self.result = super(RecordingAnsibleJob, self).runPlaybooks(args)
@ -3093,7 +3096,7 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
def runAnsible(self, cmd, timeout, playbook, ansible_version,
wrapped=True, cleanup=False):
build = self.executor_server.job_builds[self.job.unique]
build = self.executor_server.job_builds[self.build_request.uuid]
if self.executor_server._run_ansible:
# Call run on the fake build omitting the result so we also can
@ -3121,12 +3124,12 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
return hosts
def pause(self):
build = self.executor_server.job_builds[self.job.unique]
build = self.executor_server.job_builds[self.build_request.uuid]
build.paused = True
super().pause()
def resume(self):
build = self.executor_server.job_builds.get(self.job.unique)
build = self.executor_server.job_builds.get(self.build_request.uuid)
if build:
build.paused = False
super().resume()
@ -3143,13 +3146,99 @@ class RecordingMergeClient(zuul.merger.client.MergeClient):
self.history = {}
def submitJob(self, name, data, build_set,
precedence=zuul.model.PRECEDENCE_NORMAL, event=None):
precedence=PRECEDENCE_NORMAL, event=None):
self.history.setdefault(name, [])
self.history[name].append((data, build_set))
return super().submitJob(
name, data, build_set, precedence, event=event)
class HoldableExecutorApi(ExecutorApi):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.hold_in_queue = False
@property
def initial_state(self):
if self.hold_in_queue:
return BuildRequest.HOLD
return BuildRequest.REQUESTED
class TestingExecutorApi(HoldableExecutorApi):
log = logging.getLogger("zuul.test.TestingExecutorApi")
def _test_getBuildsInState(self, *states):
# As this method is used for assertions in the tests, it
# should look up the build requests directly from ZooKeeper
# and not from a cache layer.
zones = []
if self.zone_filter:
zones = self.zone_filter
else:
try:
# Get all available zones from ZooKeeper
zones = self.kazoo_client.get_children(
'/'.join([self.BUILD_REQUEST_ROOT, 'zones']))
zones.append(None)
except kazoo.exceptions.NoNodeError:
zones = [None]
all_builds = []
for zone in zones:
try:
zone_path = self._getZoneRoot(zone)
builds = self.kazoo_client.get_children(zone_path)
except kazoo.exceptions.NoNodeError:
# Skip this zone as it doesn't have any builds
continue
for build_uuid in builds:
build = self.get("/".join([zone_path, build_uuid]))
if build and (not states or build.state in states):
all_builds.append(build)
all_builds.sort()
return all_builds
def release(self, what=None):
"""
Releases a build request which was previously put on hold for testing.
The what parameter specifies what to release. This can be a concrete
build request or a regular expression matching a job name.
"""
self.log.debug("Releasing builds matching %s", what)
if isinstance(what, BuildRequest):
self.log.debug("Releasing build %s", what)
what.state = BuildRequest.REQUESTED
self.update(what)
return
for build_request in self._test_getBuildsInState(
BuildRequest.HOLD):
# Either release all build requests in HOLD state or the ones
# matching the given job name pattern.
if what is None or (
build_request.params and
re.match(what, build_request.params["job"])):
self.log.debug("Releasing build %s", build_request)
build_request.state = BuildRequest.REQUESTED
self.update(build_request)
def queued(self):
return self._test_getBuildsInState(
BuildRequest.REQUESTED, BuildRequest.HOLD
)
def all(self):
return self._test_getBuildsInState()
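A minimal usage sketch of these test helpers, assuming the ZuulTestCase fixtures defined further down in this file (hold_jobs_in_queue, executor_api, waitUntilSettled, builds); the sequence below is illustrative and mirrors the updated tests rather than being part of the change itself:
self.hold_jobs_in_queue = True             # new build requests start in HOLD
# ... add a fake change and trigger its events here ...
self.waitUntilSettled()
queue = list(self.executor_api.queued())   # requests in HOLD or REQUESTED state
self.assertEqual(len(self.builds), 0)      # nothing is executing yet
self.executor_api.release('.*-merge')      # release by job-name regex
self.waitUntilSettled()
self.hold_jobs_in_queue = False
self.executor_api.release()                # release everything still held
self.waitUntilSettled()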
class HoldableExecutorClient(zuul.executor.client.ExecutorClient):
_executor_api_class = HoldableExecutorApi
class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
"""An Ansible executor to be used in tests.
@ -3243,25 +3332,22 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
self.log.debug("Done releasing builds %s (%s)" %
(regex, len(builds)))
def executeJob(self, job):
build = FakeBuild(self, job)
job.build = build
def executeJob(self, build_request, params):
build = FakeBuild(self, build_request, params)
self.running_builds.append(build)
self.job_builds[job.unique] = build
args = json.loads(job.arguments)
args['zuul']['_test'] = dict(test_root=self._test_root)
job.arguments = json.dumps(args)
super(RecordingExecutorServer, self).executeJob(job)
self.job_builds[build_request.uuid] = build
params['zuul']['_test'] = dict(test_root=self._test_root)
self.executor_api.update(build_request)
super(RecordingExecutorServer, self).executeJob(build_request, params)
def stopJob(self, job):
def stopJob(self, build_request: BuildRequest):
self.log.debug("handle stop")
parameters = json.loads(job.arguments)
uuid = parameters['uuid']
uuid = build_request.uuid
for build in self.running_builds:
if build.unique == uuid:
build.aborted = True
build.release()
super(RecordingExecutorServer, self).stopJob(job)
super(RecordingExecutorServer, self).stopJob(build_request)
def stop(self):
for build in self.running_builds:
@ -3271,6 +3357,7 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
class TestScheduler(zuul.scheduler.Scheduler):
_merger_client_class = RecordingMergeClient
_executor_client_class = HoldableExecutorClient
class FakeGearmanServer(gear.Server):
@ -3307,9 +3394,7 @@ class FakeGearmanServer(gear.Server):
for job in job_queue:
self.jobs_history.append(job)
if not hasattr(job, 'waiting'):
if job.name.startswith(b'executor:execute'):
job.waiting = self.hold_jobs_in_queue
elif job.name.startswith(b'merger:'):
if job.name.startswith(b'merger:'):
job.waiting = self.hold_merge_jobs_in_queue
else:
job.waiting = False
@ -3788,7 +3873,7 @@ class ZuulWebFixture(fixtures.Fixture):
rpcclient: RPCClient, poller_events, git_url_with_auth: bool,
add_cleanup: Callable[[Callable[[], None]], None],
test_root: str, fake_sql: bool = True,
info: Optional[zuul.model.WebInfo] = None):
info: Optional[WebInfo] = None):
super(ZuulWebFixture, self).__init__()
self.config = config
self.connections = TestConnectionRegistry(
@ -3803,7 +3888,7 @@ class ZuulWebFixture(fixtures.Fixture):
self.authenticators = zuul.lib.auth.AuthenticatorRegistry()
self.authenticators.configure(config)
if info is None:
self.info = zuul.model.WebInfo.fromConfig(config)
self.info = WebInfo.fromConfig(config)
else:
self.info = info
self.test_root = test_root
@ -4080,7 +4165,6 @@ class SchedulerTestApp:
def start(self, validate_tenants: list):
self.sched.start()
self.sched.executor.gearman.waitForServer()
self.sched.prime(self.config, validate_tenants=validate_tenants)
def fullReconfigure(self):
@ -4402,6 +4486,7 @@ class ZuulTestCase(BaseTestCase):
self.git_url_with_auth, self.addCleanup, True)
executor_connections.configure(self.config,
source_only=self.source_only)
self.executor_api = TestingExecutorApi(self.zk_client)
self.executor_server = RecordingExecutorServer(
self.config,
executor_connections,
@ -4865,26 +4950,23 @@ class ZuulTestCase(BaseTestCase):
return sorted(self.builds, key=lambda x: x.name)
def release(self, job):
if isinstance(job, FakeBuild):
job.release()
else:
job.waiting = False
self.log.debug("Queued job %s released" % job.unique)
self.gearman_server.wakeConnections()
job.release()
def getParameter(self, job, name):
if isinstance(job, FakeBuild):
return job.parameters[name]
else:
parameters = json.loads(job.arguments)
return parameters[name]
@property
def hold_jobs_in_queue(self):
return self.executor_api.hold_in_queue
@hold_jobs_in_queue.setter
def hold_jobs_in_queue(self, hold_jobs_in_queue: bool):
"""Helper method to set hold_in_queue on all involved BuildQueues"""
self.executor_api.hold_in_queue = hold_jobs_in_queue
for app in self.scheds:
app.sched.executor.executor_api.hold_in_queue = hold_jobs_in_queue
def __haveAllBuildsReported(self, matcher) -> bool:
for app in self.scheds.filter(matcher):
executor_client = app.sched.executor
# See if Zuul is waiting on a meta job to complete
if executor_client.meta_jobs:
return False
# Find out if every build that the worker has completed has been
# reported back to Zuul. If it hasn't then that means a Gearman
# event is still in transit and the system is not stable.
@ -4895,58 +4977,54 @@ class ZuulTestCase(BaseTestCase):
continue
# It hasn't been reported yet.
return False
# Make sure that none of the worker connections are in GRAB_WAIT
worker = self.executor_server.executor_gearworker.gearman
for connection in worker.active_connections:
if connection.state == 'GRAB_WAIT':
return False
return True
def __areAllBuildsWaiting(self, matcher) -> bool:
# TODO (felix): With all build requests stored in ZK, would it be
# sufficient to query ZK for all known builds and make assertions
# based on their state?
for app in self.scheds.filter(matcher):
executor_client = app.sched.executor
builds = executor_client.builds.values()
seen_builds = set()
for build in builds:
seen_builds.add(build.uuid)
client_job = None
for conn in executor_client.gearman.active_connections:
for j in conn.related_jobs.values():
if j.unique == build.uuid:
client_job = j
break
if not client_job:
self.log.debug("%s is not known to the gearman client" %
build)
return False
if not client_job.handle:
self.log.debug("%s has no handle" % client_job)
return False
server_job = self.gearman_server.jobs.get(client_job.handle)
if not server_job:
self.log.debug("%s is not known to the gearman server" %
client_job)
return False
if not hasattr(server_job, 'waiting'):
self.log.debug("%s is being enqueued" % server_job)
return False
if server_job.waiting:
# Noop jobs are now added to the local build list in the
# executor client, so they can be looked up in the scheduler
# when the build result events are processed.
# Most of the following checks don't make sense for those
# builds, and since they are by definition completed
# immediately, we can simply skip them.
if build.job.name == "noop":
continue
if build.url is None:
self.log.debug("%s has not reported start" % build)
if not build.build_request_ref:
self.log.debug("%s has not been submitted", build)
return False
# using internal ServerJob which offers no Text interface
worker_build = self.executor_server.job_builds.get(
server_job.unique.decode('utf8'))
build_request = self.executor_api.get(build.build_request_ref)
if not build_request:
self.log.debug("%s is not known in Zookeeper", build)
return False
if build_request.state == BuildRequest.HOLD:
continue
if build.url is None:
self.log.debug("%s has not reported start", build)
return False
# Check if the build is currently processed by the
# RecordingExecutorServer.
worker_build = self.executor_server.job_builds.get(build.uuid)
if worker_build:
if build.paused:
continue
if worker_build.isWaiting():
continue
self.log.debug("%s is running" % worker_build)
self.log.debug("%s is running", worker_build)
return False
else:
self.log.debug("%s is unassigned" % server_job)
self.log.debug("%s is unassigned", build)
return False
for (build_uuid, job_worker) in \
self.executor_server.job_workers.items():
@ -5075,6 +5153,8 @@ class ZuulTestCase(BaseTestCase):
lambda app: app.sched.run_handler_lock.release())
self.executor_server.lock.release()
self.scheds.execute(lambda app: app.sched.wake_event.wait(0.1))
# Let other threads work
time.sleep(0.1)
def _logQueueStatus(self, logger, matcher, all_zk_queues_empty,
all_merge_jobs_waiting, all_builds_reported,
@ -5278,10 +5358,10 @@ class ZuulTestCase(BaseTestCase):
getattr(self.builds[i], k), v,
"Element %i in builds does not match" % (i,))
except Exception:
if not self.builds:
self.log.error("No running builds")
for build in self.builds:
self.log.error("Running build: %s" % build)
else:
self.log.error("No running builds")
raise
def assertHistory(self, history, ordered=True):


@ -239,7 +239,7 @@ class TestGerritToGithubCRD(ZuulTestCase):
def test_crd_check(self):
"Test cross-repo dependencies in independent pipelines"
self.executor_server.hold_jobs_in_build = True
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('gerrit/project1', 'master', 'A')
B = self.fake_github.openFakePullRequest(
'github/project2', 'master', 'B')
@ -251,8 +251,8 @@ class TestGerritToGithubCRD(ZuulTestCase):
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.executor_server.release('.*-merge')
@ -334,7 +334,7 @@ class TestGerritToGithubCRD(ZuulTestCase):
def _test_crd_check_reconfiguration(self, project1, project2):
"Test cross-repo dependencies re-enqueued in independent pipelines"
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('gerrit/project1', 'master', 'A')
B = self.fake_github.openFakePullRequest(
'github/project2', 'master', 'B')
@ -359,8 +359,8 @@ class TestGerritToGithubCRD(ZuulTestCase):
self.assertFalse(first_item.live)
self.assertTrue(queue.queue[1].live)
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'NEW')
@ -694,7 +694,7 @@ class TestGithubToGerritCRD(ZuulTestCase):
def test_crd_check(self):
"Test cross-repo dependencies in independent pipelines"
self.executor_server.hold_jobs_in_build = True
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_github.openFakePullRequest('github/project2', 'master',
'A')
B = self.fake_gerrit.addFakeChange(
@ -706,8 +706,8 @@ class TestGithubToGerritCRD(ZuulTestCase):
self.fake_github.emitEvent(A.getPullRequestEditedEvent())
self.waitUntilSettled()
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.executor_server.release('.*-merge')
@ -787,7 +787,7 @@ class TestGithubToGerritCRD(ZuulTestCase):
def _test_crd_check_reconfiguration(self, project1, project2):
"Test cross-repo dependencies re-enqueued in independent pipelines"
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_github.openFakePullRequest('github/project2', 'master',
'A')
B = self.fake_gerrit.addFakeChange(
@ -812,8 +812,8 @@ class TestGithubToGerritCRD(ZuulTestCase):
self.assertFalse(first_item.live)
self.assertTrue(queue.queue[1].live)
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.assertFalse(A.is_merged)


@ -21,10 +21,6 @@ import os
import time
from unittest import mock
import zuul.executor.server
import zuul.model
import gear
from tests.base import (
BaseTestCase,
ZuulTestCase,
@ -36,7 +32,9 @@ from tests.base import (
from zuul.executor.sensors.startingbuilds import StartingBuildsSensor
from zuul.executor.sensors.ram import RAMSensor
from zuul.executor.server import AnsibleJob, squash_variables
from zuul.lib.ansible import AnsibleManager
from zuul.model import BuildRequest
class TestExecutorRepos(ZuulTestCase):
@ -437,10 +435,22 @@ class TestAnsibleJob(ZuulTestCase):
def setUp(self):
super(TestAnsibleJob, self).setUp()
ansible_version = AnsibleManager().default_version
args = '{"ansible_version": "%s"}' % ansible_version
job = gear.TextJob('executor:execute', args, unique='test')
self.test_job = zuul.executor.server.AnsibleJob(self.executor_server,
job)
params = {
"ansible_version": ansible_version,
"zuul_event_id": 0,
}
build_request = BuildRequest(
"test",
state=None,
precedence=200,
params=params,
zone=None,
tenant_name=None,
pipeline_name=None,
event_id='1',
)
self.test_job = AnsibleJob(self.executor_server, build_request, params)
def test_getHostList_host_keys(self):
# Test without connection_port set
@ -1001,7 +1011,7 @@ class TestVarSquash(BaseTestCase):
extravars = {
'extra': 'extravar_extra',
}
out = zuul.executor.server.squash_variables(
out = squash_variables(
nodes, groups, jobvars, groupvars, extravars)
expected = {


@ -395,7 +395,7 @@ class TestGerritCRD(ZuulTestCase):
"Test cross-repo dependencies in independent pipelines"
self.executor_server.hold_jobs_in_build = True
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
@ -406,8 +406,8 @@ class TestGerritCRD(ZuulTestCase):
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.executor_server.release('.*-merge')
@ -505,7 +505,7 @@ class TestGerritCRD(ZuulTestCase):
def _test_crd_check_reconfiguration(self, project1, project2):
"Test cross-repo dependencies re-enqueued in independent pipelines"
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange(project1, 'master', 'A')
B = self.fake_gerrit.addFakeChange(project2, 'master', 'B')
@ -529,8 +529,8 @@ class TestGerritCRD(ZuulTestCase):
self.assertFalse(first_item.live)
self.assertTrue(queue.queue[1].live)
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'NEW')
@ -556,7 +556,7 @@ class TestGerritCRD(ZuulTestCase):
def test_crd_check_ignore_dependencies(self):
"Test cross-repo dependencies can be ignored"
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
@ -580,8 +580,8 @@ class TestGerritCRD(ZuulTestCase):
for item in check_pipeline.getAllItems():
self.assertTrue(item.live)
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'NEW')
@ -736,7 +736,7 @@ class TestGerritCRDAltBaseUrl(ZuulTestCase):
"Test basic cross-repo dependencies with an alternate gerrit baseurl"
self.executor_server.hold_jobs_in_build = True
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
@ -748,8 +748,8 @@ class TestGerritCRDAltBaseUrl(ZuulTestCase):
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.executor_server.release('.*-merge')


@ -324,7 +324,7 @@ class TestGerritLegacyCRD(ZuulTestCase):
"Test cross-repo dependencies in independent pipelines"
self.executor_server.hold_jobs_in_build = True
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
@ -335,8 +335,8 @@ class TestGerritLegacyCRD(ZuulTestCase):
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.executor_server.release('.*-merge')
@ -434,7 +434,7 @@ class TestGerritLegacyCRD(ZuulTestCase):
def _test_crd_check_reconfiguration(self, project1, project2):
"Test cross-repo dependencies re-enqueued in independent pipelines"
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange(project1, 'master', 'A')
B = self.fake_gerrit.addFakeChange(project2, 'master', 'B')
@ -458,8 +458,8 @@ class TestGerritLegacyCRD(ZuulTestCase):
self.assertFalse(first_item.live)
self.assertTrue(queue.queue[1].live)
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'NEW')
@ -485,7 +485,7 @@ class TestGerritLegacyCRD(ZuulTestCase):
def test_crd_check_ignore_dependencies(self):
"Test cross-repo dependencies can be ignored"
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
@ -509,8 +509,8 @@ class TestGerritLegacyCRD(ZuulTestCase):
for item in check_pipeline.getAllItems():
self.assertTrue(item.live)
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'NEW')


@ -34,7 +34,7 @@ class TestInventoryBase(ZuulTestCase):
if shell_type:
self.fake_nodepool.shell_type = shell_type
self.executor_server.hold_jobs_in_build = True
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
if self.use_gerrit:
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@ -67,8 +67,8 @@ class TestInventoryBase(ZuulTestCase):
return yaml.ansible_unsafe_load(open(setup_inv_path, 'r'))
def runJob(self, name):
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release('^%s$' % name)
self.hold_jobs_in_queue = False
self.executor_api.release(f'^{name}$')
self.waitUntilSettled()
def cancelExecutorJobs(self):
@ -76,7 +76,7 @@ class TestInventoryBase(ZuulTestCase):
executor_client = app.sched.executor
builds = [b for b in executor_client.builds.values()]
for build in builds:
executor_client.cancelJobInQueue(build)
executor_client.cancel(build)
class TestInventoryGithub(TestInventoryBase):


@ -15,16 +15,15 @@
import configparser
import gc
import json
import textwrap
import os
import re
import shutil
import socket
import textwrap
import threading
import time
from collections import namedtuple
from unittest import mock
from unittest import skip
from unittest import mock, skip
from kazoo.exceptions import NoNodeError
import git
@ -126,19 +125,19 @@ class TestSchedulerZone(ZuulTestCase):
'zuul.executors.zone.test-provider_vpn.online',
value='1', kind='g')
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
queue = self.gearman_server.getQueue()
queue = list(self.executor_api.queued())
self.assertEqual(len(self.builds), 0)
self.assertEqual(len(queue), 1)
self.assertEqual(b'executor:execute:test-provider.vpn', queue[0].name)
self.assertEqual('test-provider.vpn', queue[0].zone)
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.assertEqual(self.getJobFromHistory('project-merge').result,
@ -163,6 +162,45 @@ class TestSchedulerZone(ZuulTestCase):
'zuul.executors.zone.test-provider_vpn.accepting',
value='1', kind='g')
@skip("Disabled until I0245b71f31aae9616d8e65d27c63b25b2c27f815")
def test_executor_disconnect(self):
"Test that jobs are completed after an executor disconnect"
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
# Forcibly disconnect the executor from ZK
self.executor_server.zk_client.client.stop()
self.executor_server.zk_client.client.start()
# Find the build in the scheduler so we can check its status
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
builds = items[0].current_build_set.getBuilds()
build = builds[0]
# Clean up the build
self.scheds.first.sched.executor.cleanupLostBuildRequests()
# Wait for the build to be reported as lost
for x in iterate_timeout(30, 'retry build'):
if build.result == 'RETRY':
break
# If we didn't timeout, then it worked; we're done
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='ABORTED', changes='1,1'),
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
], ordered=False)
class TestSchedulerZoneFallback(ZuulTestCase):
tenant_config_file = 'config/single-tenant/main.yaml'
@ -175,19 +213,19 @@ class TestSchedulerZoneFallback(ZuulTestCase):
def test_jobs_executed(self):
"Test that jobs are executed and a change is merged per zone"
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
queue = self.gearman_server.getQueue()
queue = list(self.executor_api.queued())
self.assertEqual(len(self.builds), 0)
self.assertEqual(len(queue), 1)
self.assertEqual(b'executor:execute', queue[0].name)
self.assertEqual(None, queue[0].zone)
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.assertEqual(self.getJobFromHistory('project-merge').result,
@ -945,7 +983,7 @@ class TestScheduler(ZuulTestCase):
def test_failed_change_at_head_with_queue(self):
"Test that if a change at the head fails, queued jobs are canceled"
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
@ -960,54 +998,42 @@ class TestScheduler(ZuulTestCase):
self.fake_gerrit.addEvent(C.addApproval('Approved', 1))
self.waitUntilSettled()
queue = self.gearman_server.getQueue()
queue = list(self.executor_api.queued())
self.assertEqual(len(self.builds), 0)
self.assertEqual(len(queue), 1)
self.assertEqual(queue[0].name, b'executor:execute')
job_args = json.loads(queue[0].arguments.decode('utf8'))
self.assertEqual(queue[0].zone, None)
job_args = queue[0].params
self.assertEqual(job_args['job'], 'project-merge')
self.assertEqual(job_args['items'][0]['number'], '%d' % A.number)
self.gearman_server.release('.*-merge')
self.executor_api.release('.*-merge')
self.waitUntilSettled()
self.gearman_server.release('.*-merge')
self.executor_api.release('.*-merge')
self.waitUntilSettled()
self.gearman_server.release('.*-merge')
self.executor_api.release('.*-merge')
self.waitUntilSettled()
queue = self.gearman_server.getQueue()
queue = list(self.executor_api.queued())
self.assertEqual(len(self.builds), 0)
self.assertEqual(len(queue), 6)
self.assertEqual(
json.loads(queue[0].arguments.decode('utf8'))['job'],
'project-test1')
self.assertEqual(
json.loads(queue[1].arguments.decode('utf8'))['job'],
'project-test2')
self.assertEqual(
json.loads(queue[2].arguments.decode('utf8'))['job'],
'project-test1')
self.assertEqual(
json.loads(queue[3].arguments.decode('utf8'))['job'],
'project-test2')
self.assertEqual(
json.loads(queue[4].arguments.decode('utf8'))['job'],
'project-test1')
self.assertEqual(
json.loads(queue[5].arguments.decode('utf8'))['job'],
'project-test2')
self.assertEqual(queue[0].params['job'], 'project-test1')
self.assertEqual(queue[1].params['job'], 'project-test2')
self.assertEqual(queue[2].params['job'], 'project-test1')
self.assertEqual(queue[3].params['job'], 'project-test2')
self.assertEqual(queue[4].params['job'], 'project-test1')
self.assertEqual(queue[5].params['job'], 'project-test2')
self.release(queue[0])
self.executor_api.release(queue[0])
self.waitUntilSettled()
self.assertEqual(len(self.builds), 0)
queue = self.gearman_server.getQueue()
queue = list(self.executor_api.queued())
self.assertEqual(len(queue), 2) # project-test2, project-merge for B
self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 0)
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.assertEqual(len(self.builds), 0)
@ -1374,7 +1400,7 @@ class TestScheduler(ZuulTestCase):
def test_project_merge_conflict(self):
"Test that gate merge conflicts are handled properly"
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project',
'master', 'A',
files={'conflict': 'foo'})
@ -1394,15 +1420,15 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(A.reported, 1)
self.assertEqual(C.reported, 1)
self.gearman_server.release('project-merge')
self.executor_api.release('project-merge')
self.waitUntilSettled()
self.gearman_server.release('project-merge')
self.executor_api.release('project-merge')
self.waitUntilSettled()
self.gearman_server.release('project-merge')
self.executor_api.release('project-merge')
self.waitUntilSettled()
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
@ -1424,11 +1450,11 @@ class TestScheduler(ZuulTestCase):
def test_delayed_merge_conflict(self):
"Test that delayed check merge conflicts are handled properly"
# Hold jobs in the gearman queue so that we can test whether
# Hold jobs in the ZooKeeper queue so that we can test whether
# the executor successfully merges a change based on an old
# repo state (frozen by the scheduler) which would otherwise
# conflict.
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project',
'master', 'A',
files={'conflict': 'foo'})
@ -1454,16 +1480,16 @@ class TestScheduler(ZuulTestCase):
# A merges while B and C are queued in check
# Release A project-merge
queue = self.gearman_server.getQueue()
self.release(queue[0])
queue = list(self.executor_api.queued())
self.executor_api.release(queue[0])
self.waitUntilSettled()
# Release A project-test*
# gate has higher precedence, so A's test jobs are added in
# front of the merge jobs for B and C
queue = self.gearman_server.getQueue()
self.release(queue[0])
self.release(queue[1])
queue = list(self.executor_api.queued())
self.executor_api.release(queue[0])
self.executor_api.release(queue[1])
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
@ -1480,13 +1506,13 @@ class TestScheduler(ZuulTestCase):
# B and C report merge conflicts
# Release B project-merge
queue = self.gearman_server.getQueue()
self.release(queue[0])
queue = list(self.executor_api.queued())
self.executor_api.release(queue[0])
self.waitUntilSettled()
# Release C
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
@ -2740,14 +2766,24 @@ class TestScheduler(ZuulTestCase):
if all(b.worker.name != "Unknown" for b in builds):
break
tevent = threading.Event()
def data_watch(data, stat, event):
if not any([data, stat, event]):
return
# Set the threading event as soon as the cancel node is present
tevent.set()
return False
builds = list(self.executor_api.all())
# Use a DataWatch to avoid a race condition between creating and
# immediately deleting the cancel node in ZooKeeper.
self.zk_client.client.DataWatch(f"{builds[0].path}/cancel", data_watch)
# Abandon change to cancel build
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
for _ in iterate_timeout(30, 'Wait for executor:stop request'):
stop_jobs = [x for x in self.gearman_server.jobs_history
if b'executor:stop' in x.name]
if stop_jobs:
break
self.assertTrue(tevent.wait(timeout=30))
self.executor_server.hold_jobs_in_start = False
self.waitUntilSettled()
@ -2907,7 +2943,8 @@ class TestScheduler(ZuulTestCase):
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
self.assertEqual(len(self.gearman_server.getQueue()), 0)
queue = list(self.executor_api.queued())
self.assertEqual(len(queue), 0)
self.assertTrue(self.scheds.first.sched._areAllBuildsComplete())
self.assertEqual(len(self.history), 0)
self.assertEqual(A.data['status'], 'MERGED')
@ -3387,7 +3424,7 @@ class TestScheduler(ZuulTestCase):
def test_queue_precedence(self):
"Test that queue precedence works"
self.gearman_server.hold_jobs_in_queue = True
self.hold_jobs_in_queue = True
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
@ -3395,8 +3432,8 @@ class TestScheduler(ZuulTestCase):
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.hold_jobs_in_queue = False
self.executor_api.release()
self.waitUntilSettled()
# Run one build at a time to ensure non-race order:
@ -5891,6 +5928,7 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(A.reported, 1)
self.assertIn('RETRY_LIMIT', A.messages[0])
@skip("Disabled until I0245b71f31aae9616d8e65d27c63b25b2c27f815")
def test_executor_disconnect(self):
"Test that jobs are completed after an executor disconnect"
@ -5900,22 +5938,21 @@ For CI problems and help debugging, contact ci@example.org"""
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
# Forcibly disconnect the executor from gearman
for connection in self.executor_server.executor_gearworker.\
gearman.active_connections:
connection.conn.shutdown(socket.SHUT_RDWR)
# Wake up the cleanup thread since it is on a 5 minute interval
self.scheds.first.sched.executor.cleanup_thread.wake_event.set()
# Forcibly disconnect the executor from ZK
self.executor_server.zk_client.client.stop()
self.executor_server.zk_client.client.start()
# Find the build in the scheduler so we can check its status
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
builds = items[0].current_build_set.getBuilds()
build = builds[0]
# Clean up the build
self.scheds.first.sched.executor.cleanupLostBuildRequests()
# Wait for the build to be reported as lost
for x in iterate_timeout(30, 'lost build'):
if build.result == 'LOST':
for x in iterate_timeout(30, 'retry build'):
if build.result == 'RETRY':
break
# If we didn't timeout, then it worked; we're done
@ -5923,16 +5960,14 @@ For CI problems and help debugging, contact ci@example.org"""
self.executor_server.release()
self.waitUntilSettled()
# LOST builds aren't recorded in the test history; instead the
# original build will be reported as success since it
# continued after our fake disconnect. However, the fact that
# it's the *only* build, and the other two jobs are not
# present is extra confirmation that the scheduler saw the
# build as lost.
self.assertHistory([
dict(name='project-merge', result='ABORTED', changes='1,1'),
dict(name='project-merge', result='SUCCESS', changes='1,1'),
])
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
], ordered=False)
@skip("Disabled until I0245b71f31aae9616d8e65d27c63b25b2c27f815")
def test_scheduler_disconnect(self):
"Test that jobs are completed after a scheduler disconnect"
@ -5942,29 +5977,18 @@ For CI problems and help debugging, contact ci@example.org"""
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
# Forcibly disconnect the scheduler from gearman
for connection in self.scheds.first.sched.executor.gearman.\
active_connections:
connection.conn.shutdown(socket.SHUT_RDWR)
# Forcibly disconnect the scheduler from ZK
self.scheds.execute(lambda app: app.sched.zk_client.client.stop())
self.scheds.execute(lambda app: app.sched.zk_client.client.start())
# Find the build in the scheduler so we can check its status
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
builds = items[0].current_build_set.getBuilds()
build = builds[0]
# Wait for the build to be reported as lost
for x in iterate_timeout(30, 'lost build'):
if build.result == 'RETRY':
break
# Clean up lost builds
self.scheds.first.sched.executor.cleanupLostBuildRequests()
# If we didn't timeout, then it worked; we're done
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
# There's an extra merge build due to the retry
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),


@ -4468,7 +4468,7 @@ class TestDataReturn(AnsibleZuulTestCase):
# Stop the job worker to simulate an executor restart
for job_worker in self.executor_server.job_workers.values():
if job_worker.job.unique == paused_job.uuid:
if job_worker.build_request.uuid == paused_job.uuid:
job_worker.stop()
self.waitUntilSettled("stop job worker")
@ -6131,7 +6131,7 @@ class TestJobPause(AnsibleZuulTestCase):
# Stop the job worker of compile1 to simulate an executor restart
for job_worker in self.executor_server.job_workers.values():
if job_worker.job.unique == compile1.unique:
if job_worker.build_request.uuid == compile1.unique:
job_worker.stop()
self.waitUntilSettled("Stop job")
@ -6368,7 +6368,7 @@ class TestJobPause(AnsibleZuulTestCase):
# Stop the job worker of test to simulate an executor restart
job_test = self.builds[1]
for job_worker in self.executor_server.job_workers.values():
if job_worker.job.unique == job_test.unique:
if job_worker.build_request.uuid == job_test.uuid:
job_worker.stop()
self.executor_server.hold_jobs_in_build = False


@ -34,7 +34,7 @@ from zuul.zk.components import (
BaseComponent, ComponentRegistry, ExecutorComponent
)
from tests.base import BaseTestCase, iterate_timeout
from tests.base import BaseTestCase, HoldableExecutorApi, iterate_timeout
class ZooKeeperBaseTestCase(BaseTestCase):
@ -311,17 +311,6 @@ class TestComponentRegistry(ZooKeeperBaseTestCase):
self.assertComponentState("executor", BaseComponent.RUNNING)
class HoldableExecutorApi(ExecutorApi):
hold_in_queue = False
@property
def initial_state(self):
# This supports holding build requests in tests
if self.hold_in_queue:
return BuildRequest.HOLD
return BuildRequest.REQUESTED
class TestExecutorApi(ZooKeeperBaseTestCase):
def _get_zk_tree(self, root):
items = []
@ -352,7 +341,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
build_event_callback=eq_put)
# Scheduler submits request
client.submit("A", "tenant", "pipeline", {}, None)
client.submit("A", "tenant", "pipeline", {}, None, '1')
request_queue.get(timeout=30)
# Executor receives request
@ -435,7 +424,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
build_event_callback=eq_put)
# Scheduler submits request
client.submit("A", "tenant", "pipeline", {}, None)
client.submit("A", "tenant", "pipeline", {}, None, '1')
request_queue.get(timeout=30)
# Executor receives request
@ -487,7 +476,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
build_event_callback=eq_put)
# Scheduler submits request
a_path = client.submit("A", "tenant", "pipeline", {}, None)
a_path = client.submit("A", "tenant", "pipeline", {}, None, '1')
request_queue.get(timeout=30)
# Executor receives nothing
@ -523,7 +512,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
client = ExecutorApi(self.zk_client)
# Scheduler submits request
a_path = client.submit("A", "tenant", "pipeline", {}, None)
a_path = client.submit("A", "tenant", "pipeline", {}, None, '1')
sched_a = client.get(a_path)
# Simulate the server side
@ -542,11 +531,15 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
# requests
executor_api = ExecutorApi(self.zk_client)
executor_api.submit("A", "tenant", "pipeline", {}, "zone")
path_b = executor_api.submit("B", "tenant", "pipeline", {}, None)
path_c = executor_api.submit("C", "tenant", "pipeline", {}, "zone")
path_d = executor_api.submit("D", "tenant", "pipeline", {}, "zone")
path_e = executor_api.submit("E", "tenant", "pipeline", {}, "zone")
executor_api.submit("A", "tenant", "pipeline", {}, "zone", '1')
path_b = executor_api.submit("B", "tenant", "pipeline", {},
None, '1')
path_c = executor_api.submit("C", "tenant", "pipeline", {},
"zone", '1')
path_d = executor_api.submit("D", "tenant", "pipeline", {},
"zone", '1')
path_e = executor_api.submit("E", "tenant", "pipeline", {},
"zone", '1')
b = executor_api.get(path_b)
c = executor_api.get(path_c)
@ -603,7 +596,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
# Simulate the client side
client = ExecutorApi(self.zk_client)
client.submit("A", "tenant", "pipeline", {}, None)
client.submit("A", "tenant", "pipeline", {}, None, '1')
# Simulate the server side
server = ExecutorApi(self.zk_client,


@ -12,111 +12,26 @@
# License for the specific language governing permissions and limitations
# under the License.
import gear
import json
import logging
import time
import threading
from uuid import uuid4
import zuul.executor.common
from zuul.lib.config import get_default
from zuul.lib.gear_utils import getGearmanFunctions
from zuul.lib.jsonutil import json_dumps
from zuul.lib.logutil import get_annotated_logger
from zuul.model import (
Build,
BuildCompletedEvent,
BuildRequest,
BuildStartedEvent,
PRECEDENCE_HIGH,
PRECEDENCE_LOW,
PRECEDENCE_NORMAL,
PRIORITY_MAP,
)
from zuul.zk.event_queues import PipelineResultEventQueue
class GearmanCleanup(threading.Thread):
""" A thread that checks to see if outstanding builds have
completed without reporting back. """
log = logging.getLogger("zuul.GearmanCleanup")
def __init__(self, gearman):
threading.Thread.__init__(self)
self.daemon = True
self.gearman = gearman
self.wake_event = threading.Event()
self._stopped = False
def stop(self):
self._stopped = True
self.wake_event.set()
def run(self):
while True:
self.wake_event.wait(300)
if self._stopped:
return
try:
self.gearman.lookForLostBuilds()
except Exception:
self.log.exception("Exception checking builds:")
def getJobData(job):
if not len(job.data):
return {}
d = job.data[-1]
if not d:
return {}
return json.loads(d)
class ZuulGearmanClient(gear.Client):
def __init__(self, zuul_gearman):
super(ZuulGearmanClient, self).__init__('Zuul Executor Client')
self.__zuul_gearman = zuul_gearman
def handleWorkComplete(self, packet):
job = super(ZuulGearmanClient, self).handleWorkComplete(packet)
self.__zuul_gearman.onBuildCompleted(job)
return job
def handleWorkFail(self, packet):
job = super(ZuulGearmanClient, self).handleWorkFail(packet)
self.__zuul_gearman.onBuildCompleted(job)
return job
def handleWorkException(self, packet):
job = super(ZuulGearmanClient, self).handleWorkException(packet)
self.__zuul_gearman.onBuildCompleted(job)
return job
def handleWorkStatus(self, packet):
job = super(ZuulGearmanClient, self).handleWorkStatus(packet)
self.__zuul_gearman.onWorkStatus(job)
return job
def handleWorkData(self, packet):
job = super(ZuulGearmanClient, self).handleWorkData(packet)
self.__zuul_gearman.onWorkStatus(job)
return job
def handleDisconnect(self, job):
job = super(ZuulGearmanClient, self).handleDisconnect(job)
self.__zuul_gearman.onDisconnect(job)
def handleStatusRes(self, packet):
try:
super(ZuulGearmanClient, self).handleStatusRes(packet)
except gear.UnknownJobError:
handle = packet.getArgument(0)
for build in self.__zuul_gearman.builds.values():
if build._gearman_job.handle == handle:
self.__zuul_gearman.onUnknownJob(build)
from zuul.zk.executor import ExecutorApi
class ExecutorClient(object):
log = logging.getLogger("zuul.ExecutorClient")
_executor_api_class = ExecutorApi
def __init__(self, config, sched):
self.config = config
@ -124,29 +39,13 @@ class ExecutorClient(object):
self.builds = {}
self.meta_jobs = {} # A list of meta-jobs like stop or describe
self.executor_api = self._executor_api_class(self.sched.zk_client)
self.result_events = PipelineResultEventQueue.createRegistry(
self.sched.zk_client
)
server = config.get('gearman', 'server')
port = get_default(self.config, 'gearman', 'port', 4730)
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.gearman = ZuulGearmanClient(self)
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
keepalive=True, tcp_keepidle=60,
tcp_keepintvl=30, tcp_keepcnt=5)
self.cleanup_thread = GearmanCleanup(self)
self.cleanup_thread.start()
def stop(self):
self.log.debug("Stopping")
self.cleanup_thread.stop()
self.cleanup_thread.join()
self.gearman.shutdown()
self.log.debug("Stopped")
def execute(self, job, item, pipeline, dependent_changes=[],
merger_items=[]):
@ -192,15 +91,13 @@ class ExecutorClient(object):
completed_event
)
return build
return
# Update zuul attempts after addBuild above to ensure build_set
# is up to date.
attempts = build.build_set.getTries(job.name)
params["zuul"]['attempts'] = attempts
functions = getGearmanFunctions(self.gearman)
function_name = 'executor:execute'
# Because all nodes belong to the same provider, region and
# availability zone we can get executor_zone from only the first
# node.
@ -209,50 +106,32 @@ class ExecutorClient(object):
executor_zone = params[
"nodes"][0]['attributes'].get('executor-zone')
zone_known = False
if executor_zone:
_fname = '%s:%s' % (
function_name,
executor_zone)
if _fname in functions:
function_name = _fname
else:
# Check the component registry for executors subscribed to this
# zone
for comp in self.sched.component_registry.all(kind="executor"):
if comp.zone == executor_zone:
zone_known = True
break
if not zone_known:
self.log.warning(
"Job requested '%s' zuul-executor zone, but no "
"zuul-executors found for this zone; ignoring zone "
"request" % executor_zone)
"request", executor_zone)
# Fall back to the default zone
executor_zone = None
gearman_job = gear.TextJob(
function_name, json_dumps(params), unique=uuid)
build._gearman_job = gearman_job
build.__gearman_worker = None
if pipeline.precedence == PRECEDENCE_NORMAL:
precedence = gear.PRECEDENCE_NORMAL
elif pipeline.precedence == PRECEDENCE_HIGH:
precedence = gear.PRECEDENCE_HIGH
elif pipeline.precedence == PRECEDENCE_LOW:
precedence = gear.PRECEDENCE_LOW
try:
self.gearman.submitJob(gearman_job, precedence=precedence,
timeout=300)
except Exception:
log.exception("Unable to submit job to Gearman")
# TODO (felix): Directly put the result event in the queue
self.onBuildCompleted(gearman_job, 'EXCEPTION')
return build
if not gearman_job.handle:
log.error("No job handle was received for %s after"
" 300 seconds; marking as lost.",
gearman_job)
# TODO (felix): Directly put the result event in the queue
self.onBuildCompleted(gearman_job, 'NO_HANDLE')
log.debug("Received handle %s for %s", gearman_job.handle, build)
return build
build.build_request_ref = self.executor_api.submit(
uuid=uuid,
tenant_name=build.build_set.item.pipeline.tenant.name,
pipeline_name=build.build_set.item.pipeline.name,
params=params,
zone=executor_zone,
event_id=item.event.zuul_event_id,
precedence=PRIORITY_MAP[pipeline.precedence],
)
def cancel(self, build):
log = get_annotated_logger(self.log, build.zuul_event_id,
@ -261,137 +140,77 @@ class ExecutorClient(object):
log.info("Cancel build %s for job %s", build, build.job)
build.canceled = True
try:
job = build._gearman_job # noqa
except AttributeError:
log.debug("Build has no associated gearman job")
if not build.build_request_ref:
log.debug("Build has not been submitted to ZooKeeper")
return False
if build.__gearman_worker is not None:
log.debug("Build has already started")
self.cancelRunningBuild(build)
log.debug("Canceled running build")
return True
else:
log.debug("Build has not started yet")
build_request = self.executor_api.get(build.build_request_ref)
if build_request:
log.debug("Canceling build request %s", build_request)
# If we can acquire the build request lock here, the build hasn't
# been picked up by any executor server yet. By holding the lock
# we prevent the executor server from picking up the build, so we
# can cancel it before it runs.
if self.executor_api.lock(build_request, blocking=False):
log.debug(
"Canceling build %s directly because it is not locked by "
"any executor",
build_request,
)
# Mark the build request as complete and forward the event to
# the scheduler, so the executor server doesn't pick up the
# request. The build will be deleted from the scheduler when it
# picks up the BuildCompletedEvent.
try:
build_request.state = BuildRequest.COMPLETED
self.executor_api.update(build_request)
log.debug("Looking for build in queue")
if self.cancelJobInQueue(build):
log.debug("Removed build from queue")
result = {"result": "CANCELED", "end_time": time.time()}
tenant_name = build.build_set.item.pipeline.tenant.name
pipeline_name = build.build_set.item.pipeline.name
event = BuildCompletedEvent(build_request.uuid, result)
self.result_events[tenant_name][pipeline_name].put(event)
finally:
self.executor_api.unlock(build_request)
else:
log.debug(
"Sending cancel request for build %s because it is locked",
build_request,
)
# If the build request is locked, schedule a cancel request in
# the executor server.
self.executor_api.requestCancel(build_request)
log.debug("Canceled build")
return True
return False
def resumeBuild(self, build: Build) -> bool:
log = get_annotated_logger(self.log, build.zuul_event_id)
if not build.build_request_ref:
log.debug("Build has not been submitted")
return False
time.sleep(1)
log.debug("Still unable to find build to cancel")
if build.__gearman_worker is not None:
log.debug("Build has just started")
self.cancelRunningBuild(build)
log.debug("Canceled running build")
return True
log.error("Unable to cancel build")
def onBuildCompleted(self, job, result=None):
if job.unique in self.meta_jobs:
del self.meta_jobs[job.unique]
return
# TODO (felix): Remove this once the builds are executed via ZooKeeper.
# It's currently necessary to set the correct private attribute on the
# build for the gearman worker.
def setWorkerInfo(self, build, data):
# Update information about worker
build.worker.updateFromData(data)
build.__gearman_worker = build.worker.name
def onWorkStatus(self, job):
data = getJobData(job)
self.log.debug("Build %s update %s" % (job, data))
def onDisconnect(self, job):
self.log.info("Gearman job %s lost due to disconnect" % job)
self.onBuildCompleted(job, 'DISCONNECT')
build = self.builds.get(job.unique)
if build:
tenant_name = build.build_set.item.pipeline.tenant.name
pipeline_name = build.build_set.item.pipeline.name
result = {"result": "DISCONNECT", "end_time": time.time()}
event = BuildCompletedEvent(build.uuid, result)
self.result_events[tenant_name][pipeline_name].put(event)
def onUnknownJob(self, build):
self.log.info("Gearman job for build %s lost "
"due to unknown handle" % build)
# We don't need to call onBuildCompleted, because by
# definition, we have no record of the job.
tenant_name = build.build_set.item.pipeline.tenant.name
pipeline_name = build.build_set.item.pipeline.name
result = {"result": "LOST", "end_time": time.time()}
event = BuildCompletedEvent(build.uuid, result)
self.result_events[tenant_name][pipeline_name].put(event)
def cancelJobInQueue(self, build):
log = get_annotated_logger(self.log, build.zuul_event_id,
build=build.uuid)
job = build._gearman_job
req = gear.CancelJobAdminRequest(job.handle)
job.connection.sendAdminRequest(req, timeout=300)
log.debug("Response to cancel build request: %s", req.response.strip())
if req.response.startswith(b"OK"):
# Since this isn't otherwise going to get a build complete
# event, send one to the scheduler so that it can unlock
# the nodes.
tenant_name = build.build_set.item.pipeline.tenant.name
pipeline_name = build.build_set.item.pipeline.name
result = {"result": "CANCELED", "end_time": time.time()}
event = BuildCompletedEvent(build.uuid, result)
self.result_events[tenant_name][pipeline_name].put(event)
build_request = self.executor_api.get(build.build_request_ref)
if build_request:
log.debug("Requesting resume for build %s", build)
self.executor_api.requestResume(build_request)
return True
return False
def cancelRunningBuild(self, build):
def removeBuild(self, build: Build) -> None:
log = get_annotated_logger(self.log, build.zuul_event_id)
if not build.__gearman_worker:
log.error("Build %s has no manager while canceling", build)
stop_uuid = str(uuid4().hex)
data = dict(uuid=build._gearman_job.unique,
zuul_event_id=build.zuul_event_id)
stop_job = gear.TextJob("executor:stop:%s" % build.__gearman_worker,
json_dumps(data), unique=stop_uuid)
self.meta_jobs[stop_uuid] = stop_job
log.debug("Submitting stop job: %s", stop_job)
self.gearman.submitJob(stop_job, precedence=gear.PRECEDENCE_HIGH,
timeout=300)
return True
log.debug("Removing build %s", build.uuid)
def resumeBuild(self, build):
log = get_annotated_logger(self.log, build.zuul_event_id)
if not build.__gearman_worker:
log.error("Build %s has no manager while resuming", build)
resume_uuid = str(uuid4().hex)
data = dict(uuid=build._gearman_job.unique,
zuul_event_id=build.zuul_event_id)
stop_job = gear.TextJob("executor:resume:%s" % build.__gearman_worker,
json_dumps(data), unique=resume_uuid)
self.meta_jobs[resume_uuid] = stop_job
log.debug("Submitting resume job: %s", stop_job)
self.gearman.submitJob(stop_job, precedence=gear.PRECEDENCE_HIGH,
timeout=300)
if not build.build_request_ref:
log.debug("Build has not been submitted to ZooKeeper")
return
def lookForLostBuilds(self):
self.log.debug("Looking for lost builds")
# Construct a list from the values iterator to protect from it changing
# out from underneath us.
for build in list(self.builds.values()):
if build.result:
# The build has finished, it will be removed
continue
job = build._gearman_job
if not job.handle:
# The build hasn't been enqueued yet
continue
p = gear.Packet(gear.constants.REQ, gear.constants.GET_STATUS,
job.handle)
job.connection.sendPacket(p)
build_request = self.executor_api.get(build.build_request_ref)
if build_request:
self.executor_api.remove(build_request)
del self.builds[build.uuid]
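For orientation, the scheduler-side cancel path now reduces to flagging the build request in ZooKeeper instead of sending Gearman admin requests. A minimal sketch, assuming ExecutorApi exposes a requestCancel() counterpart to the requestResume() call visible above (requestCancel is not shown in this hunk):

def cancel(self, build):
    log = get_annotated_logger(self.log, build.zuul_event_id,
                               build=build.uuid)
    if not build.build_request_ref:
        log.debug("Build has not been submitted to ZooKeeper")
        return False
    build_request = self.executor_api.get(build.build_request_ref)
    if build_request:
        log.debug("Requesting cancelation for build %s", build)
        # requestCancel is an assumed counterpart to requestResume above.
        self.executor_api.requestCancel(build_request)
        return True
    return False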

View File

@ -30,13 +30,12 @@ import threading
import time
import traceback
from concurrent.futures.process import ProcessPoolExecutor, BrokenProcessPool
from typing import Dict
import git
from kazoo.exceptions import BadVersionError
from urllib.parse import urlsplit
from zuul.lib.ansible import AnsibleManager
from zuul.lib.gearworker import ZuulGearWorker
from zuul.lib.result_data import get_warnings_from_result_data
from zuul.lib import yamlutil as yaml
from zuul.lib.config import get_default
@ -59,11 +58,17 @@ from zuul.executor.sensors.ram import RAMSensor
from zuul.lib import commandsocket
from zuul.merger.server import BaseMergeServer, RepoLocks
from zuul.model import (
BuildCompletedEvent, BuildPausedEvent, BuildStartedEvent, BuildStatusEvent
BuildCompletedEvent,
BuildPausedEvent,
BuildRequest,
BuildStartedEvent,
BuildStatusEvent,
)
import zuul.model
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.components import ExecutorComponent
from zuul.zk.exceptions import BuildRequestNotFound
from zuul.zk.executor import BuildRequestEvent, ExecutorApi
BUFFER_LINES_FOR_SYNTAX = 200
COMMANDS = ['stop', 'pause', 'unpause', 'graceful', 'verbose',
@ -872,10 +877,10 @@ class AnsibleJob(object):
RESULT_DISK_FULL: 'RESULT_DISK_FULL',
}
def __init__(self, executor_server, job):
def __init__(self, executor_server, build_request, arguments):
logger = logging.getLogger("zuul.AnsibleJob")
self.arguments = json.loads(job.arguments)
self.zuul_event_id = self.arguments.get('zuul_event_id')
self.arguments = arguments
self.zuul_event_id = self.arguments["zuul_event_id"]
# Record ansible version being used for the cleanup phase
self.ansible_version = self.arguments.get('ansible_version')
# TODO(corvus): Remove default setting after 4.3.0; this is to handle
@ -883,9 +888,10 @@ class AnsibleJob(object):
self.scheme = self.arguments.get('workspace_scheme',
zuul.model.SCHEME_GOLANG)
self.log = get_annotated_logger(
logger, self.zuul_event_id, build=job.unique)
logger, self.zuul_event_id, build=build_request.uuid
)
self.executor_server = executor_server
self.job = job
self.build_request = build_request
self.jobdir = None
self.proc = None
self.proc_lock = threading.Lock()
@ -917,7 +923,7 @@ class AnsibleJob(object):
'executor',
'winrm_read_timeout_sec')
self.ssh_agent = SshAgent(zuul_event_id=self.zuul_event_id,
build=self.job.unique)
build=self.build_request.uuid)
self.port_forwards = []
self.executor_variables_file = None
@ -954,7 +960,7 @@ class AnsibleJob(object):
def run(self):
self.running = True
self.thread = threading.Thread(target=self.execute,
name='build-%s' % self.job.unique)
name=f"build-{self.build_request.uuid}")
self.thread.start()
def stop(self, reason=None):
@ -982,7 +988,7 @@ class AnsibleJob(object):
data = {'paused': self.paused,
'data': result_data,
'secret_data': secret_result_data}
self.executor_server.pauseBuild(self.job, data)
self.executor_server.pauseBuild(self.build_request, data)
self._resume_event.wait()
def resume(self):
@ -1000,6 +1006,7 @@ class AnsibleJob(object):
"{now} |\n".format(now=datetime.datetime.now()))
self.paused = False
self.executor_server.resumeBuild(self.build_request)
self._resume_event.set()
def wait(self):
@ -1011,7 +1018,9 @@ class AnsibleJob(object):
self.time_starting_build = time.monotonic()
# report that job has been taken
self.executor_server.startBuild(self.job, self._base_job_data())
self.executor_server.startBuild(
self.build_request, self._base_job_data()
)
self.ssh_agent.start()
self.ssh_agent.add(self.private_key_file)
@ -1024,7 +1033,7 @@ class AnsibleJob(object):
self.ssh_agent.addData(name, private_ssh_key)
self.jobdir = JobDir(self.executor_server.jobdir_root,
self.executor_server.keep_jobdir,
str(self.job.unique))
str(self.build_request.uuid))
self._execute()
except BrokenProcessPool:
# The process pool got broken, re-initialize it and send
@ -1035,11 +1044,11 @@ class AnsibleJob(object):
except ExecutorError as e:
result_data = dict(result='ERROR', error_detail=e.args[0])
self.log.debug("Sending result: %s", result_data)
self.executor_server.completeBuild(self.job, result_data)
self.executor_server.completeBuild(self.build_request, result_data)
except Exception:
self.log.exception("Exception while executing job")
data = {"exception": traceback.format_exc()}
self.executor_server.completeBuild(self.job, data)
self.executor_server.completeBuild(self.build_request, data)
finally:
self.running = False
if self.jobdir:
@ -1058,7 +1067,7 @@ class AnsibleJob(object):
except Exception:
self.log.exception("Error stopping port forward:")
try:
self.executor_server.finishJob(self.job.unique)
self.executor_server.finishJob(self.build_request.uuid)
except Exception:
self.log.exception("Error finalizing job thread:")
self.log.info("Job execution took: %.3f seconds" % (
@ -1078,7 +1087,7 @@ class AnsibleJob(object):
def _send_aborted(self):
result = dict(result='ABORTED')
self.executor_server.completeBuild(self.job, result)
self.executor_server.completeBuild(self.build_request, result)
def _execute(self):
args = self.arguments
@ -1103,7 +1112,7 @@ class AnsibleJob(object):
project['connection'], project['name'],
repo_state=repo_state,
zuul_event_id=self.zuul_event_id,
build=self.job.unique))
build=self.build_request.uuid))
projects.add((project['connection'], project['name']))
# ...as well as all playbook and role projects.
@ -1122,7 +1131,7 @@ class AnsibleJob(object):
tasks.append(self.executor_server.update(
*key, repo_state=repo_state,
zuul_event_id=self.zuul_event_id,
build=self.job.unique))
build=self.build_request.uuid))
projects.add(key)
for task in tasks:
@ -1276,13 +1285,13 @@ class AnsibleJob(object):
data['url'] = "finger://{hostname}:{port}/{uuid}".format(
hostname=self.executor_server.hostname,
port=self.executor_server.log_streaming_port,
uuid=self.job.unique)
uuid=self.build_request.uuid)
else:
data['url'] = 'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=self.job.unique)
uuid=self.build_request.uuid)
self.executor_server.updateBuildStatus(self.job, data)
self.executor_server.updateBuildStatus(self.build_request, data)
result = self.runPlaybooks(args)
success = result == 'SUCCESS'
@ -1305,7 +1314,7 @@ class AnsibleJob(object):
secret_data=secret_data)
# TODO do we want to log the secret data here?
self.log.debug("Sending result: %s", result_data)
self.executor_server.completeBuild(self.job, result_data)
self.executor_server.completeBuild(self.build_request, result_data)
def getResultData(self):
data = {}
@ -1408,14 +1417,14 @@ class AnsibleJob(object):
# can't fetch them, it should resolve itself.
self.log.exception("Could not fetch refs to merge from remote")
result = dict(result='ABORTED')
self.executor_server.completeBuild(self.job, result)
self.executor_server.completeBuild(self.build_request, result)
return None
if not ret: # merge conflict
result = dict(result='MERGER_FAILURE')
if self.executor_server.statsd:
base_key = "zuul.executor.{hostname}.merger"
self.executor_server.statsd.incr(base_key + ".FAILURE")
self.executor_server.completeBuild(self.job, result)
self.executor_server.completeBuild(self.build_request, result)
return None
if self.executor_server.statsd:
@ -2231,7 +2240,7 @@ class AnsibleJob(object):
zuul_resources[node['name'][0]]['pod'] = data['pod']
fwd = KubeFwd(zuul_event_id=self.zuul_event_id,
build=self.job.unique,
build=self.build_request.uuid,
kubeconfig=self.jobdir.kubeconfig,
context=data['context_name'],
namespace=data['namespace'],
@ -2599,7 +2608,7 @@ class AnsibleJob(object):
try:
ansible_log = get_annotated_logger(
logging.getLogger("zuul.AnsibleJob.output"),
self.zuul_event_id, build=self.job.unique)
self.zuul_event_id, build=self.build_request.uuid)
# Use manual idx instead of enumerate so that RESULT lines
# don't count towards BUFFER_LINES_FOR_SYNTAX
@ -2933,21 +2942,6 @@ class ExecutorMergeWorker(gear.TextWorker):
super(ExecutorMergeWorker, self).handleNoop(packet)
class ExecutorExecuteWorker(gear.TextWorker):
def __init__(self, executor_server, *args, **kw):
self.zuul_executor_server = executor_server
super(ExecutorExecuteWorker, self).__init__(*args, **kw)
def handleNoop(self, packet):
# Delay our response to running a new job based on the number
# of jobs we're currently running, in an attempt to spread
# load evenly among executors.
workers = len(self.zuul_executor_server.job_workers)
delay = (workers ** 2) / 1000.0
time.sleep(delay)
return super(ExecutorExecuteWorker, self).handleNoop(packet)
class ExecutorServer(BaseMergeServer):
log = logging.getLogger("zuul.ExecutorServer")
_ansible_manager_class = AnsibleManager
@ -2967,11 +2961,11 @@ class ExecutorServer(BaseMergeServer):
self.keep_jobdir = keep_jobdir
self.jobdir_root = jobdir_root
self.keystore = ZooKeeperKeyStorage(
self.zk_client,
password=self._get_key_store_password())
self._running = False
self._command_running = False
# TODOv3(mordred): make the executor name more unique --
# perhaps hostname+pid.
self.hostname = get_default(self.config, 'executor', 'hostname',
@ -3113,26 +3107,26 @@ class ExecutorServer(BaseMergeServer):
self.component_info.process_merge_jobs = self.process_merge_jobs
self.result_events = PipelineResultEventQueue.createRegistry(
self.zk_client
self.zk_client)
self.build_worker = threading.Thread(
target=self.runBuildWorker,
name="ExecutorServerBuildWorkerThread",
)
self.executor_jobs = {
"executor:resume:%s" % self.hostname: self.resumeJob,
"executor:stop:%s" % self.hostname: self.stopJob,
}
for function_name in self._getExecuteFunctionNames():
self.executor_jobs[function_name] = self.executeJob
for function_name in self._getOnlineFunctionNames():
self.executor_jobs[function_name] = self.noop
self.build_loop_wake_event = threading.Event()
self.executor_gearworker = ZuulGearWorker(
'Zuul Executor Server',
'zuul.ExecutorServer.ExecuteWorker',
'executor',
self.config,
self.executor_jobs,
worker_class=ExecutorExecuteWorker,
worker_args=[self])
zone_filter = [self.zone]
if self.allow_unzoned:
# If we are allowed to execute unzoned jobs, make sure we are
# also subscribed to the default (unzoned) zone.
zone_filter.append(None)
self.executor_api = ExecutorApi(
self.zk_client,
zone_filter=zone_filter,
build_request_callback=self.build_loop_wake_event.set,
build_event_callback=self._handleBuildEvent,
)
# Used to offload expensive operations to different processes
self.process_worker = None
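The two callbacks wired into ExecutorApi above replace Gearman's job-assignment wakeups with a plain threading.Event. A self-contained sketch of that wake/drain pattern (the queue and names are illustrative, not Zuul code):

import queue
import threading

wake = threading.Event()
work = queue.Queue()

def consumer():
    # Mirrors runBuildWorker below: sleep until poked, then drain the
    # currently known work before going back to sleep.
    while True:
        wake.wait()
        wake.clear()
        while not work.empty():
            print("handling", work.get())

threading.Thread(target=consumer, daemon=True).start()
work.put("build-1")
wake.set()  # all the ZooKeeper watch callback has to do is set the event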
@ -3143,24 +3137,6 @@ class ExecutorServer(BaseMergeServer):
except KeyError:
raise RuntimeError("No key store password configured!")
def _getFunctionSuffixes(self):
suffixes = []
if self.zone:
suffixes.append(':' + self.zone)
if self.allow_unzoned:
suffixes.append('')
else:
suffixes.append('')
return suffixes
def _getExecuteFunctionNames(self):
base_name = 'executor:execute'
return [base_name + suffix for suffix in self._getFunctionSuffixes()]
def _getOnlineFunctionNames(self):
base_name = 'executor:online'
return [base_name + suffix for suffix in self._getFunctionSuffixes()]
def _repoLock(self, connection_name, project_name):
return self.repo_locks.getRepoLock(connection_name, project_name)
@ -3195,7 +3171,7 @@ class ExecutorServer(BaseMergeServer):
self.log.warning('Multiprocessing context has already been set')
self.process_worker = ProcessPoolExecutor()
self.executor_gearworker.start()
self.build_worker.start()
self.log.debug("Starting command processor")
self.command_socket.start()
@ -3223,23 +3199,14 @@ class ExecutorServer(BaseMergeServer):
def register_work(self):
if self._running:
self.accepting_work = True
for function in self._getExecuteFunctionNames():
self.executor_gearworker.gearman.registerFunction(function)
# TODO(jeblair): Update geard to send a noop after
# registering for a job which is in the queue, then remove
# this API violation.
self.executor_gearworker.gearman._sendGrabJobUniq()
self.build_loop_wake_event.set()
def unregister_work(self):
self.accepting_work = False
for function in self._getExecuteFunctionNames():
self.executor_gearworker.gearman.unRegisterFunction(function)
def stop(self):
self.log.debug("Stopping")
self.component_info.state = self.component_info.STOPPED
# Use the BaseMergeServer's stop method to disconnect from ZooKeeper.
super().stop()
self.connections.stop()
self.disk_accountant.stop()
# The governor can change function registration, so make sure
@ -3249,7 +3216,6 @@ class ExecutorServer(BaseMergeServer):
# Stop accepting new jobs
if self.merger_gearworker is not None:
self.merger_gearworker.gearman.setFunctions([])
self.executor_gearworker.gearman.setFunctions([])
# Tell the executor worker to abort any jobs it just accepted,
# and grab the list of currently running job workers.
with self.run_lock:
@ -3280,7 +3246,8 @@ class ExecutorServer(BaseMergeServer):
# All job results should have been sent by now, shutdown the
# gearman workers.
self.executor_gearworker.stop()
self.build_loop_wake_event.set()
self.build_worker.join()
if self.process_worker is not None:
self.process_worker.shutdown()
@ -3291,6 +3258,10 @@ class ExecutorServer(BaseMergeServer):
self.statsd.gauge(base_key + '.pct_used_ram', 0)
self.statsd.gauge(base_key + '.running_builds', 0)
# Use the BaseMergeServer's stop method to disconnect from
# ZooKeeper. We do this as one of the last steps to ensure
# that all ZK related components can be stopped first.
super().stop()
self.stop_repl()
self.log.debug("Stopped")
@ -3300,7 +3271,8 @@ class ExecutorServer(BaseMergeServer):
update_thread.join()
if self.process_merge_jobs:
super().join()
self.executor_gearworker.join()
self.build_loop_wake_event.set()
self.build_worker.join()
self.command_thread.join()
def pause(self):
@ -3444,20 +3416,110 @@ class ExecutorServer(BaseMergeServer):
log.error(msg)
raise Exception(msg)
def executeJob(self, job):
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
def executeJob(self, build_request, params):
zuul_event_id = params['zuul_event_id']
log = get_annotated_logger(self.log, zuul_event_id)
log.debug("Got %s job: %s", job.name, job.unique)
log.debug(
"Got %s job: %s",
params["zuul"]["job"],
build_request.uuid,
)
if self.statsd:
base_key = 'zuul.executor.{hostname}'
self.statsd.incr(base_key + '.builds')
self.job_workers[job.unique] = self._job_class(self, job)
self.job_workers[build_request.uuid] = self._job_class(
self, build_request, params
)
# Run manageLoad before starting the thread mostly for the
# benefit of the unit tests to make the calculation of the
# number of starting jobs more deterministic.
self.manageLoad()
self.job_workers[job.unique].run()
self.job_workers[build_request.uuid].run()
def _handleBuildEvent(self, build_request, build_event):
# TODO (felix): Would it harm to simply keep the cancel/resume node?
# If not, we could avoid this ZK update. The cancel request can only
# be fulfilled by the executor that runs the job anyway, so if that
# executor died, no other executor can pick up the request.
if build_event == BuildRequestEvent.CANCELED:
self.executor_api.fulfillCancel(build_request)
self.stopJob(build_request)
elif build_event == BuildRequestEvent.RESUMED:
self.executor_api.fulfillResume(build_request)
self.resumeJob(build_request)
elif build_event == BuildRequestEvent.DELETED:
self.stopJob(build_request)
def runBuildWorker(self):
while self._running:
self.build_loop_wake_event.wait()
self.build_loop_wake_event.clear()
for build_request in self.executor_api.next():
# Check the sensors again as they might have changed in the
# meantime. E.g. the last build started within the next()
# generator could have fulfilled the StartingBuildSensor.
if not self.accepting_work:
break
if not self._running:
break
try:
self._runBuildWorker(build_request)
except Exception:
log = get_annotated_logger(
self.log, event=None, build=build_request.uuid
)
log.exception("Exception while running job")
result = {
"result": "ERROR",
"exception": traceback.format_exc(),
}
self.completeBuild(build_request, result)
def _runBuildWorker(self, build_request: BuildRequest):
log = get_annotated_logger(
self.log, event=None, build=build_request.uuid
)
if not self.executor_api.lock(build_request, blocking=False):
return
build_request.state = BuildRequest.RUNNING
params = build_request.params
build_request.params = None
# Directly update the build in ZooKeeper, so we don't
# loop over and try to lock it again and again.
self.executor_api.update(build_request)
log.debug("Next executed job: %s", build_request)
self.executeJob(build_request, params)
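The non-blocking lock() call above is what lets several executors race for the same request without waiting on each other. A rough sketch of that pattern using the kazoo lock recipe directly (the znode path is hypothetical; the real layout is internal to ExecutorApi):

from kazoo.client import KazooClient

client = KazooClient(hosts="localhost:2181")
client.start()

# Hypothetical znode path; the real layout is internal to ExecutorApi.
lock = client.Lock("/zuul/build-requests/<uuid>/lock")
if lock.acquire(blocking=False):
    try:
        pass  # run the build here
    finally:
        lock.release()
else:
    pass  # another executor won the race; move on to the next request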
def runCleanupWorker(self):
while self._running:
try:
self.cleanup_election.run(self._runCleanupWorker)
except Exception:
self.log.exception("Exception in cleanup worker")
def _runCleanupWorker(self):
while self._running:
for build_request in self.executor_api.lostBuildRequests():
try:
result = {"result": "CANCELED"}
self.completeBuild(build_request, result)
except BadVersionError:
# There could be a race condition:
# The build is found by lostBuildRequests() in state RUNNING
# but gets completed/unlocked before the is_locked()
# check. Since we use the znode version, the update
# will fail in this case and we can simply ignore the
# exception.
pass
if not self._running:
return
# TODO (felix): It should be enough to execute the cleanup every
# 60 minutes. Find a proper way to do that. Maybe we could combine
# this with other cleanups in the scheduler and use APScheduler for
# proper scheduling.
time.sleep(5)
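The cleanup_election used above presumably wraps a ZooKeeper leader election so that only one executor sweeps lost build requests at a time. A minimal sketch of that pattern with kazoo (path and callback are made up):

from kazoo.client import KazooClient
from kazoo.recipe.election import Election

client = KazooClient(hosts="localhost:2181")
client.start()

def sweep():
    print("elected: sweeping lost build requests")

# run() blocks; only the current leader executes the callback, the others
# wait until the leader's session goes away.
Election(client, "/zuul/executor/cleanup-election").run(sweep)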
def run_governor(self):
while not self.governor_stop_event.wait(10):
@ -3503,32 +3565,28 @@ class ExecutorServer(BaseMergeServer):
def finishJob(self, unique):
del(self.job_workers[unique])
self.log.debug(
"Finishing Job: %s, queue(%d): %s",
unique,
len(self.job_workers),
self.job_workers,
)
def stopJobDiskFull(self, jobdir):
unique = os.path.basename(jobdir)
self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)
def resumeJob(self, job):
try:
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
log = get_annotated_logger(self.log, zuul_event_id)
log.debug("Resume job with arguments: %s", args)
unique = args['uuid']
self.resumeJobByUnique(unique, zuul_event_id=zuul_event_id)
finally:
job.sendWorkComplete()
def resumeJob(self, build_request):
log = get_annotated_logger(self.log, build_request.event_id)
log.debug("Resume job")
self.resumeJobByUnique(
build_request.uuid, build_request.event_id
)
def stopJob(self, job):
try:
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
log = get_annotated_logger(self.log, zuul_event_id)
log.debug("Stop job with arguments: %s", args)
unique = args['uuid']
self.stopJobByUnique(unique, zuul_event_id=zuul_event_id)
finally:
job.sendWorkComplete()
def stopJob(self, build_request):
log = get_annotated_logger(self.log, build_request.event_id)
log.debug("Stop job")
self.stopJobByUnique(build_request.uuid, build_request.event_id)
def resumeJobByUnique(self, unique, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
@ -3552,62 +3610,57 @@ class ExecutorServer(BaseMergeServer):
except Exception:
log.exception("Exception sending stop command to worker:")
def startBuild(self, job: gear.TextJob, data: Dict) -> None:
# Mark the gearman job as started, as we are still using it for the
# actual job execution. The data, however, will be passed to the
# scheduler via the event queues in ZooKeeper.
job.sendWorkData(json.dumps(data))
def startBuild(self, build_request, data):
# TODO (felix): Once the builds are stored in ZooKeeper, we can store
# the start_time directly on the build. But for now we have to use the
# data dict for that.
data["start_time"] = time.time()
params = json.loads(job.arguments)
tenant_name = params["zuul"]["tenant"]
pipeline_name = params["zuul"]["pipeline"]
event = BuildStartedEvent(build_request.uuid, data)
self.result_events[build_request.tenant_name][
build_request.pipeline_name].put(event)
event = BuildStartedEvent(job.unique, data)
self.result_events[tenant_name][pipeline_name].put(event)
def updateBuildStatus(self, build_request, data):
event = BuildStatusEvent(build_request.uuid, data)
self.result_events[build_request.tenant_name][
build_request.pipeline_name].put(event)
def updateBuildStatus(self, job: gear.TextJob, data: Dict) -> None:
job.sendWorkData(json.dumps(data))
def pauseBuild(self, build_request, data):
build_request.state = BuildRequest.PAUSED
try:
self.executor_api.update(build_request)
except BuildRequestNotFound as e:
self.log.warning("Could not pause build: %s", str(e))
return
params = json.loads(job.arguments)
tenant_name = params["zuul"]["tenant"]
pipeline_name = params["zuul"]["pipeline"]
event = BuildPausedEvent(build_request.uuid, data)
self.result_events[build_request.tenant_name][
build_request.pipeline_name].put(event)
event = BuildStatusEvent(job.unique, data)
self.result_events[tenant_name][pipeline_name].put(event)
def pauseBuild(self, job: gear.TextJob, data: Dict) -> None:
# Mark the gearman job as paused, as we are still using it for the
# actual job execution. The data, however, will be passed to the
# scheduler via the event queues in ZooKeeper.
job.sendWorkData(json.dumps(data))
params = json.loads(job.arguments)
tenant_name = params["zuul"]["tenant"]
pipeline_name = params["zuul"]["pipeline"]
event = BuildPausedEvent(job.unique, data)
self.result_events[tenant_name][pipeline_name].put(event)
def completeBuild(self, job: gear.TextJob, result: Dict) -> None:
# Mark the gearman job as complete, as we are still using it for the
# actual job execution. The result, however, will be passed to the
# scheduler via the event queues in ZooKeeper.
# TODO (felix): Remove the data from the complete() call
job.sendWorkComplete(json.dumps(result))
def resumeBuild(self, build_request):
build_request.state = BuildRequest.RUNNING
try:
self.executor_api.update(build_request)
except BuildRequestNotFound as e:
self.log.warning("Could not resume build: %s", str(e))
return
def completeBuild(self, build_request, result):
# TODO (felix): Once the builds are stored in ZooKeeper, we can store
# the end_time directly on the build. But for now we have to use the
# result dict for that.
result["end_time"] = time.time()
params = json.loads(job.arguments)
tenant_name = params["zuul"]["tenant"]
pipeline_name = params["zuul"]["pipeline"]
build_request.state = BuildRequest.COMPLETED
try:
self.executor_api.update(build_request)
except BuildRequestNotFound as e:
self.log.warning("Could not complete build: %s", str(e))
return
event = BuildCompletedEvent(job.unique, result)
self.result_events[tenant_name][pipeline_name].put(event)
# Unlock the build request
self.executor_api.unlock(build_request)
event = BuildCompletedEvent(build_request.uuid, result)
self.result_events[build_request.tenant_name][
build_request.pipeline_name].put(event)
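Taken together, startBuild/pauseBuild/resumeBuild/completeBuild above walk a build request through its states; a condensed summary of the lifecycle as it appears in this hunk (a reading aid, not additional code from the change):

# REQUESTED -> RUNNING    _runBuildWorker (lock + executeJob)
# RUNNING   -> PAUSED     pauseBuild
# PAUSED    -> RUNNING    resumeBuild
# RUNNING   -> COMPLETED  completeBuild, then unlock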

View File

@ -19,7 +19,6 @@ import copy
import json
import logging
import os
from itertools import chain
from functools import total_ordering
import re2
@ -1867,8 +1866,8 @@ class Job(ConfigObject):
project_canonical_names = set()
project_canonical_names.update(self.required_projects.keys())
project_canonical_names.update(self._projectsFromPlaybooks(
chain(self.pre_run, [self.run[0]], self.post_run,
self.cleanup_run), with_implicit=True))
itertools.chain(self.pre_run, [self.run[0]], self.post_run,
self.cleanup_run), with_implicit=True))
projects = list()
for project_canonical_name in project_canonical_names:
@ -2061,7 +2060,7 @@ class BuildRequest:
ALL_STATES = (REQUESTED, HOLD, RUNNING, PAUSED, COMPLETED)
def __init__(self, uuid, state, precedence, params, zone,
tenant_name, pipeline_name):
tenant_name, pipeline_name, event_id):
self.uuid = uuid
self.state = state
self.precedence = precedence
@ -2069,6 +2068,7 @@ class BuildRequest:
self.zone = zone
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.event_id = event_id
# ZK related data
self.path = None
@ -2084,6 +2084,7 @@ class BuildRequest:
"zone": self.zone,
"tenant_name": self.tenant_name,
"pipeline_name": self.pipeline_name,
"event_id": self.event_id,
}
@classmethod
@ -2096,6 +2097,7 @@ class BuildRequest:
data["zone"],
data["tenant_name"],
data["pipeline_name"],
data["event_id"],
)
return build_request
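A small sketch of what the new event_id field looks like once it passes through the serialization above (argument values are placeholders; the positional order follows the constructor signature in this hunk):

# Illustrative only; values are placeholders and only fields visible in
# this hunk are asserted on.
request = BuildRequest(
    "0123456789abcdef", BuildRequest.REQUESTED, 200, {},
    None, "example-tenant", "check", "deadbeef")
assert request.event_id == "deadbeef"
assert request.toDict()["event_id"] == "deadbeef"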

View File

@ -28,9 +28,7 @@ import urllib
from kazoo.exceptions import NotEmptyError
from zuul import configloader
from zuul import model
from zuul import exceptions
from zuul import configloader, exceptions
from zuul import version as zuul_version
from zuul import rpclistener
from zuul.lib import commandsocket
@ -53,6 +51,7 @@ from zuul.model import (
BuildPausedEvent,
BuildStartedEvent,
BuildStatusEvent,
Change,
ChangeManagementEvent,
DequeueEvent,
EnqueueEvent,
@ -64,6 +63,7 @@ from zuul.model import (
ReconfigureEvent,
SmartReconfigureEvent,
TenantReconfigureEvent,
TimeDataBase,
UnparsedAbideConfig,
)
from zuul.zk import ZooKeeperClient
@ -111,6 +111,7 @@ class Scheduler(threading.Thread):
_stats_interval = 30
_cleanup_interval = 60 * 60
_merger_client_class = MergeClient
_executor_client_class = ExecutorClient
# Number of seconds past node expiration a hold request will remain
EXPIRED_HOLD_REQUEST_TTL = 24 * 60 * 60
@ -189,7 +190,7 @@ class Scheduler(threading.Thread):
if not testonly:
time_dir = self._get_time_database_dir()
self.time_database = model.TimeDataBase(time_dir)
self.time_database = TimeDataBase(time_dir)
command_socket = get_default(
self.config, 'scheduler', 'command_socket',
@ -218,7 +219,7 @@ class Scheduler(threading.Thread):
default_version=default_ansible_version)
if not testonly:
self.executor = ExecutorClient(self.config, self)
self.executor = self._executor_client_class(self.config, self)
self.merger = self._merger_client_class(self.config, self)
self.nodepool = nodepool.Nodepool(
self.zk_client, self.hostname, self.statsd, self)
@ -1204,7 +1205,7 @@ class Scheduler(threading.Thread):
for item in shared_queue.queue:
if item.change.project != change.project:
continue
if (isinstance(item.change, model.Change) and
if (isinstance(item.change, Change) and
item.change.number == change.number and
item.change.patchset == change.patchset) or\
(item.change.ref == change.ref):
@ -1577,10 +1578,10 @@ class Scheduler(threading.Thread):
return
build.start_time = event.data["start_time"]
# TODO (felix): Remove this once the builds are executed via ZooKeeper.
# It's currently necessary to set the correct private attribute on the
# build for the gearman worker.
self.executor.setWorkerInfo(build, event.data)
# Update information about worker
if event.data:
# Noop builds don't provide any event data
build.worker.updateFromData(event.data)
log = get_annotated_logger(self.log, build.zuul_event_id)
if build.build_set is not build.build_set.item.current_build_set:
@ -1816,7 +1817,7 @@ class Scheduler(threading.Thread):
# If the build was canceled, we did actively cancel the job so
# don't overwrite the result and don't retry.
if build.canceled:
result = build.result
result = build.result or "CANCELED"
build.retry = False
build.end_time = event_result["end_time"]
@ -1845,7 +1846,7 @@ class Scheduler(threading.Thread):
# The test suite expects the build to be removed from the
# internal dict after it's added to the report queue.
del self.executor.builds[build.uuid]
self.executor.removeBuild(build)
if build.build_set is not build.build_set.item.current_build_set:
log.debug("Build %s is not in the current build set", build)

View File

@ -213,7 +213,7 @@ class ExecutorApi(ZooKeeperSimpleBase):
yield from self.inState(BuildRequest.REQUESTED)
def submit(self, uuid, tenant_name, pipeline_name, params, zone,
precedence=200):
event_id, precedence=200):
log = get_annotated_logger(self.log, event=None, build=uuid)
path = "/".join([self._getZoneRoot(zone), uuid])
@ -226,6 +226,7 @@ class ExecutorApi(ZooKeeperSimpleBase):
zone,
tenant_name,
pipeline_name,
event_id,
)
log.debug("Submitting build request to ZooKeeper %s", build_request)