Execute builds via ZooKeeper
This is the second part of I5de26afdf6774944b35472e2054b93d12fe21793:
builds are now submitted to the executors through the ZooKeeper
executor API instead of Gearman. Three tests are disabled until the
next change.

Change-Id: Ie08fa9dfb4bb3adb9a02e0a2e8b11309e1ec27cd
parent fb6b8fb439
commit 6ac14615a0
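The bulk of the diff follows one pattern: tests stop driving the fake
Gearman server (gearman_server.hold_jobs_in_queue,
gearman_server.release()) and instead drive build requests through the
ZooKeeper-backed executor API (self.hold_jobs_in_queue,
self.executor_api.release()). The mechanism is the HoldableExecutorApi
added to tests/base.py below: while hold_in_queue is set, new build
requests are submitted in state HOLD rather than REQUESTED, and
release() flips matching requests back to REQUESTED so executors pick
them up. The following minimal, in-memory sketch illustrates that state
machine; it is a simplified stand-in only (the real classes live in
tests/base.py, store the requests as ZooKeeper znodes, and persist
state changes via update()):

    import re

    class BuildRequest:
        # Subset of the zuul.model.BuildRequest states used by the
        # hold mechanism (illustrative values)
        HOLD = "hold"
        REQUESTED = "requested"

        def __init__(self, uuid, params):
            self.uuid = uuid
            self.params = params
            self.state = BuildRequest.REQUESTED

    class HoldableExecutorApi:
        """In-memory stand-in for the ZooKeeper-backed ExecutorApi."""

        def __init__(self):
            # Toggled by the ZuulTestCase.hold_jobs_in_queue property
            self.hold_in_queue = False
            self._requests = {}

        @property
        def initial_state(self):
            # While holding, new build requests are parked in HOLD so
            # that no executor picks them up.
            if self.hold_in_queue:
                return BuildRequest.HOLD
            return BuildRequest.REQUESTED

        def submit(self, uuid, params):
            request = BuildRequest(uuid, params)
            request.state = self.initial_state
            self._requests[uuid] = request
            return request

        def release(self, what=None):
            # Release all held requests, or only those whose job name
            # matches the given regular expression.
            for request in self._requests.values():
                if request.state != BuildRequest.HOLD:
                    continue
                if what is None or re.match(what, request.params["job"]):
                    request.state = BuildRequest.REQUESTED

    api = HoldableExecutorApi()
    api.hold_in_queue = True
    api.submit("1", {"job": "project-merge"})
    api.submit("2", {"job": "project-test1"})
    api.release(".*-merge")  # only the merge job goes back to REQUESTED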
@@ -35,7 +35,6 @@ which is described below.
 Web [href="#web-server"]

 Merger -- Gearman
-Executor -- Gearman
 Executor -- Statsd
 Web -- Database
 Web -- Gearman
tests/base.py (246 changed lines)
@@ -68,7 +68,9 @@ import paramiko
 import prometheus_client.exposition

 from zuul.driver.sql.sqlconnection import DatabaseSession
-from zuul.model import Change
+from zuul.model import (
+    BuildRequest, Change, PRECEDENCE_NORMAL, WebInfo
+)
 from zuul.rpcclient import RPCClient

 from zuul.driver.zuul import ZuulDriver
@@ -89,9 +91,9 @@ from zuul.lib.collections import DefaultKeyDict
 from zuul.lib.connections import ConnectionRegistry
 from zuul.zk import ZooKeeperClient
 from zuul.zk.event_queues import ConnectionEventQueue
+from zuul.zk.executor import ExecutorApi
 from psutil import Popen

-import tests.fakegithub
 import zuul.driver.gerrit.gerritsource as gerritsource
 import zuul.driver.gerrit.gerritconnection as gerritconnection
 import zuul.driver.git.gitwatcher as gitwatcher
@@ -110,13 +112,14 @@ import zuul.lib.auth
 import zuul.merger.client
 import zuul.merger.merger
 import zuul.merger.server
 import zuul.model
 import zuul.nodepool
 import zuul.rpcclient
 import zuul.configloader
 from zuul.lib.config import get_default
 from zuul.lib.logutil import get_annotated_logger

+import tests.fakegithub

 FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures')

 KEEP_TEMPDIRS = bool(os.environ.get('KEEP_TEMPDIRS', False))
@@ -2843,13 +2846,13 @@ class FakeStatsd(threading.Thread):
 class FakeBuild(object):
     log = logging.getLogger("zuul.test")

-    def __init__(self, executor_server, job):
+    def __init__(self, executor_server, build_request, params):
         self.daemon = True
         self.executor_server = executor_server
-        self.job = job
+        self.build_request = build_request
         self.jobdir = None
-        self.uuid = job.unique
-        self.parameters = json.loads(job.arguments)
+        self.uuid = build_request.uuid
+        self.parameters = params
         # TODOv3(jeblair): self.node is really "the label of the node
         # assigned".  We should rename it (self.node_label?) if we
         # keep using it like this, or we may end up exposing more of
@@ -3059,7 +3062,7 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):

     def recordResult(self, result):
         self.executor_server.lock.acquire()
-        build = self.executor_server.job_builds.get(self.job.unique)
+        build = self.executor_server.job_builds.get(self.build_request.uuid)
         if not build:
             self.executor_server.lock.release()
             # Already recorded
@@ -3073,11 +3076,11 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
                 pipeline=build.parameters['zuul']['pipeline'])
             )
             self.executor_server.running_builds.remove(build)
-            del self.executor_server.job_builds[self.job.unique]
+            del self.executor_server.job_builds[self.build_request.uuid]
         self.executor_server.lock.release()

     def runPlaybooks(self, args):
-        build = self.executor_server.job_builds[self.job.unique]
+        build = self.executor_server.job_builds[self.build_request.uuid]
         build.jobdir = self.jobdir

         self.result = super(RecordingAnsibleJob, self).runPlaybooks(args)
@@ -3093,7 +3096,7 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):

     def runAnsible(self, cmd, timeout, playbook, ansible_version,
                    wrapped=True, cleanup=False):
-        build = self.executor_server.job_builds[self.job.unique]
+        build = self.executor_server.job_builds[self.build_request.uuid]

         if self.executor_server._run_ansible:
             # Call run on the fake build omitting the result so we also can
@@ -3121,12 +3124,12 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
         return hosts

     def pause(self):
-        build = self.executor_server.job_builds[self.job.unique]
+        build = self.executor_server.job_builds[self.build_request.uuid]
         build.paused = True
         super().pause()

     def resume(self):
-        build = self.executor_server.job_builds.get(self.job.unique)
+        build = self.executor_server.job_builds.get(self.build_request.uuid)
         if build:
             build.paused = False
         super().resume()
@@ -3143,13 +3146,99 @@ class RecordingMergeClient(zuul.merger.client.MergeClient):
         self.history = {}

     def submitJob(self, name, data, build_set,
-                  precedence=zuul.model.PRECEDENCE_NORMAL, event=None):
+                  precedence=PRECEDENCE_NORMAL, event=None):
         self.history.setdefault(name, [])
         self.history[name].append((data, build_set))
         return super().submitJob(
             name, data, build_set, precedence, event=event)


+class HoldableExecutorApi(ExecutorApi):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.hold_in_queue = False
+
+    @property
+    def initial_state(self):
+        if self.hold_in_queue:
+            return BuildRequest.HOLD
+        return BuildRequest.REQUESTED
+
+
+class TestingExecutorApi(HoldableExecutorApi):
+    log = logging.getLogger("zuul.test.TestingExecutorApi")
+
+    def _test_getBuildsInState(self, *states):
+        # As this method is used for assertions in the tests, it
+        # should look up the build requests directly from ZooKeeper
+        # and not from a cache layer.
+        zones = []
+        if self.zone_filter:
+            zones = self.zone_filter
+        else:
+            try:
+                # Get all available zones from ZooKeeper
+                zones = self.kazoo_client.get_children(
+                    '/'.join([self.BUILD_REQUEST_ROOT, 'zones']))
+                zones.append(None)
+            except kazoo.exceptions.NoNodeError:
+                zones = [None]
+
+        all_builds = []
+        for zone in zones:
+            try:
+                zone_path = self._getZoneRoot(zone)
+                builds = self.kazoo_client.get_children(zone_path)
+            except kazoo.exceptions.NoNodeError:
+                # Skip this zone as it doesn't have any builds
+                continue
+
+            for build_uuid in builds:
+                build = self.get("/".join([zone_path, build_uuid]))
+                if build and (not states or build.state in states):
+                    all_builds.append(build)
+
+        all_builds.sort()
+        return all_builds
+
+    def release(self, what=None):
+        """
+        Releases a build request which was previously put on hold for testing.
+
+        The what parameter specifies what to release. This can be a concrete
+        build request or a regular expression matching a job name.
+        """
+        self.log.debug("Releasing builds matching %s", what)
+        if isinstance(what, BuildRequest):
+            self.log.debug("Releasing build %s", what)
+            what.state = BuildRequest.REQUESTED
+            self.update(what)
+            return
+
+        for build_request in self._test_getBuildsInState(
+                BuildRequest.HOLD):
+            # Either release all build requests in HOLD state or the ones
+            # matching the given job name pattern.
+            if what is None or (
+                    build_request.params and
+                    re.match(what, build_request.params["job"])):
+                self.log.debug("Releasing build %s", build_request)
+                build_request.state = BuildRequest.REQUESTED
+                self.update(build_request)
+
+    def queued(self):
+        return self._test_getBuildsInState(
+            BuildRequest.REQUESTED, BuildRequest.HOLD
+        )
+
+    def all(self):
+        return self._test_getBuildsInState()
+
+
+class HoldableExecutorClient(zuul.executor.client.ExecutorClient):
+    _executor_api_class = HoldableExecutorApi
+
+
 class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
     """An Ansible executor to be used in tests.

@@ -3243,25 +3332,22 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
         self.log.debug("Done releasing builds %s (%s)" %
                        (regex, len(builds)))

-    def executeJob(self, job):
-        build = FakeBuild(self, job)
-        job.build = build
+    def executeJob(self, build_request, params):
+        build = FakeBuild(self, build_request, params)
         self.running_builds.append(build)
-        self.job_builds[job.unique] = build
-        args = json.loads(job.arguments)
-        args['zuul']['_test'] = dict(test_root=self._test_root)
-        job.arguments = json.dumps(args)
-        super(RecordingExecutorServer, self).executeJob(job)
+        self.job_builds[build_request.uuid] = build
+        params['zuul']['_test'] = dict(test_root=self._test_root)
+        self.executor_api.update(build_request)
+        super(RecordingExecutorServer, self).executeJob(build_request, params)

-    def stopJob(self, job):
+    def stopJob(self, build_request: BuildRequest):
         self.log.debug("handle stop")
-        parameters = json.loads(job.arguments)
-        uuid = parameters['uuid']
+        uuid = build_request.uuid
         for build in self.running_builds:
             if build.unique == uuid:
                 build.aborted = True
                 build.release()
-        super(RecordingExecutorServer, self).stopJob(job)
+        super(RecordingExecutorServer, self).stopJob(build_request)

     def stop(self):
         for build in self.running_builds:
@@ -3271,6 +3357,7 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):

 class TestScheduler(zuul.scheduler.Scheduler):
     _merger_client_class = RecordingMergeClient
+    _executor_client_class = HoldableExecutorClient


 class FakeGearmanServer(gear.Server):
@@ -3307,9 +3394,7 @@ class FakeGearmanServer(gear.Server):
             for job in job_queue:
                 self.jobs_history.append(job)
                 if not hasattr(job, 'waiting'):
-                    if job.name.startswith(b'executor:execute'):
-                        job.waiting = self.hold_jobs_in_queue
-                    elif job.name.startswith(b'merger:'):
+                    if job.name.startswith(b'merger:'):
                         job.waiting = self.hold_merge_jobs_in_queue
                     else:
                         job.waiting = False
@@ -3788,7 +3873,7 @@ class ZuulWebFixture(fixtures.Fixture):
                 rpcclient: RPCClient, poller_events, git_url_with_auth: bool,
                 add_cleanup: Callable[[Callable[[], None]], None],
                 test_root: str, fake_sql: bool = True,
-                info: Optional[zuul.model.WebInfo] = None):
+                info: Optional[WebInfo] = None):
        super(ZuulWebFixture, self).__init__()
        self.config = config
        self.connections = TestConnectionRegistry(
@@ -3803,7 +3888,7 @@ class ZuulWebFixture(fixtures.Fixture):
         self.authenticators = zuul.lib.auth.AuthenticatorRegistry()
         self.authenticators.configure(config)
         if info is None:
-            self.info = zuul.model.WebInfo.fromConfig(config)
+            self.info = WebInfo.fromConfig(config)
         else:
             self.info = info
         self.test_root = test_root
@@ -4080,7 +4165,6 @@ class SchedulerTestApp:

     def start(self, validate_tenants: list):
         self.sched.start()
-        self.sched.executor.gearman.waitForServer()
         self.sched.prime(self.config, validate_tenants=validate_tenants)

     def fullReconfigure(self):
@@ -4386,6 +4470,7 @@ class ZuulTestCase(BaseTestCase):
             self.git_url_with_auth, self.addCleanup, True)
         executor_connections.configure(self.config,
                                        source_only=self.source_only)
+        self.executor_api = TestingExecutorApi(self.zk_client)
         self.executor_server = RecordingExecutorServer(
             self.config,
             executor_connections,
@@ -4849,26 +4934,23 @@ class ZuulTestCase(BaseTestCase):
         return sorted(self.builds, key=lambda x: x.name)

     def release(self, job):
-        if isinstance(job, FakeBuild):
-            job.release()
-        else:
-            job.waiting = False
-            self.log.debug("Queued job %s released" % job.unique)
-            self.gearman_server.wakeConnections()
+        job.release()

-    def getParameter(self, job, name):
-        if isinstance(job, FakeBuild):
-            return job.parameters[name]
-        else:
-            parameters = json.loads(job.arguments)
-            return parameters[name]
+    @property
+    def hold_jobs_in_queue(self):
+        return self.executor_api.hold_in_queue
+
+    @hold_jobs_in_queue.setter
+    def hold_jobs_in_queue(self, hold_jobs_in_queue: bool):
+        """Helper method to set hold_in_queue on all involved BuildQueues"""
+
+        self.executor_api.hold_in_queue = hold_jobs_in_queue
+        for app in self.scheds:
+            app.sched.executor.executor_api.hold_in_queue = hold_jobs_in_queue

     def __haveAllBuildsReported(self, matcher) -> bool:
         for app in self.scheds.filter(matcher):
             executor_client = app.sched.executor
             # See if Zuul is waiting on a meta job to complete
             if executor_client.meta_jobs:
                 return False
             # Find out if every build that the worker has completed has been
             # reported back to Zuul.  If it hasn't then that means a Gearman
             # event is still in transit and the system is not stable.
@@ -4879,58 +4961,54 @@ class ZuulTestCase(BaseTestCase):
                 continue
             # It hasn't been reported yet.
             return False
-        # Make sure that none of the worker connections are in GRAB_WAIT
-        worker = self.executor_server.executor_gearworker.gearman
-        for connection in worker.active_connections:
-            if connection.state == 'GRAB_WAIT':
-                return False
         return True

     def __areAllBuildsWaiting(self, matcher) -> bool:
+        # TODO (felix): With all build requests stored in ZK would it be
+        # sufficient to query ZK for all known builds and make assertions
+        # based on their state?
         for app in self.scheds.filter(matcher):
             executor_client = app.sched.executor
             builds = executor_client.builds.values()
             seen_builds = set()
             for build in builds:
                 seen_builds.add(build.uuid)
-                client_job = None
-                for conn in executor_client.gearman.active_connections:
-                    for j in conn.related_jobs.values():
-                        if j.unique == build.uuid:
-                            client_job = j
-                            break
-                if not client_job:
-                    self.log.debug("%s is not known to the gearman client" %
-                                   build)
-                    return False
-                if not client_job.handle:
-                    self.log.debug("%s has no handle" % client_job)
-                    return False
-                server_job = self.gearman_server.jobs.get(client_job.handle)
-                if not server_job:
-                    self.log.debug("%s is not known to the gearman server" %
-                                   client_job)
-                    return False
-                if not hasattr(server_job, 'waiting'):
-                    self.log.debug("%s is being enqueued" % server_job)
-                    return False
-                if server_job.waiting:
+                # Noop jobs are now added to the local build list in the
+                # executor client, so they can be looked up in the scheduler
+                # when the build result events are processed.
+                # As most of the following tests don't make much sense for
+                # those builds and they are - per definition - completed
+                # immediately, we can simply skip them.
+                if build.job.name == "noop":
                     continue
-                if build.url is None:
-                    self.log.debug("%s has not reported start" % build)
+                if not build.build_request_ref:
+                    self.log.debug("%s has not been submitted", build)
                     return False
-                # using internal ServerJob which offers no Text interface
-                worker_build = self.executor_server.job_builds.get(
-                    server_job.unique.decode('utf8'))
+                build_request = self.executor_api.get(build.build_request_ref)
+                if not build_request:
+                    self.log.debug("%s is not known in Zookeeper", build)
+                    return False
+
+                if build_request.state == BuildRequest.HOLD:
+                    continue
+
+                if build.url is None:
+                    self.log.debug("%s has not reported start", build)
+                    return False
+                # Check if the build is currently processed by the
+                # RecordingExecutorServer.
+                worker_build = self.executor_server.job_builds.get(build.uuid)
                 if worker_build:
                     if build.paused:
                         continue
                     if worker_build.isWaiting():
                         continue
-                    self.log.debug("%s is running" % worker_build)
+                    self.log.debug("%s is running", worker_build)
                     return False
                 else:
-                    self.log.debug("%s is unassigned" % server_job)
+                    self.log.debug("%s is unassigned", build)
                     return False
         for (build_uuid, job_worker) in \
                 self.executor_server.job_workers.items():
@@ -5059,6 +5137,8 @@ class ZuulTestCase(BaseTestCase):
             lambda app: app.sched.run_handler_lock.release())
         self.executor_server.lock.release()
         self.scheds.execute(lambda app: app.sched.wake_event.wait(0.1))
+        # Let other threads work
+        time.sleep(0.1)

     def _logQueueStatus(self, logger, matcher, all_zk_queues_empty,
                         all_merge_jobs_waiting, all_builds_reported,
@@ -5262,10 +5342,10 @@ class ZuulTestCase(BaseTestCase):
                     getattr(self.builds[i], k), v,
                     "Element %i in builds does not match" % (i,))
         except Exception:
-            if not self.builds:
-                self.log.error("No running builds")
             for build in self.builds:
                 self.log.error("Running build: %s" % build)
+            else:
+                self.log.error("No running builds")
             raise

     def assertHistory(self, history, ordered=True):

@@ -239,7 +239,7 @@ class TestGerritToGithubCRD(ZuulTestCase):
     def test_crd_check(self):
         "Test cross-repo dependencies in independent pipelines"
         self.executor_server.hold_jobs_in_build = True
-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('gerrit/project1', 'master', 'A')
         B = self.fake_github.openFakePullRequest(
             'github/project2', 'master', 'B')
@@ -251,8 +251,8 @@ class TestGerritToGithubCRD(ZuulTestCase):
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.executor_server.release('.*-merge')
@@ -334,7 +334,7 @@ class TestGerritToGithubCRD(ZuulTestCase):
     def _test_crd_check_reconfiguration(self, project1, project2):
         "Test cross-repo dependencies re-enqueued in independent pipelines"

-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('gerrit/project1', 'master', 'A')
         B = self.fake_github.openFakePullRequest(
             'github/project2', 'master', 'B')
@@ -359,8 +359,8 @@ class TestGerritToGithubCRD(ZuulTestCase):
         self.assertFalse(first_item.live)
         self.assertTrue(queue.queue[1].live)

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.assertEqual(A.data['status'], 'NEW')
@@ -694,7 +694,7 @@ class TestGithubToGerritCRD(ZuulTestCase):
     def test_crd_check(self):
         "Test cross-repo dependencies in independent pipelines"
         self.executor_server.hold_jobs_in_build = True
-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_github.openFakePullRequest('github/project2', 'master',
                                                  'A')
         B = self.fake_gerrit.addFakeChange(
@@ -706,8 +706,8 @@ class TestGithubToGerritCRD(ZuulTestCase):
         self.fake_github.emitEvent(A.getPullRequestEditedEvent())
         self.waitUntilSettled()

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.executor_server.release('.*-merge')
@@ -787,7 +787,7 @@ class TestGithubToGerritCRD(ZuulTestCase):
     def _test_crd_check_reconfiguration(self, project1, project2):
         "Test cross-repo dependencies re-enqueued in independent pipelines"

-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_github.openFakePullRequest('github/project2', 'master',
                                                  'A')
         B = self.fake_gerrit.addFakeChange(
@@ -812,8 +812,8 @@ class TestGithubToGerritCRD(ZuulTestCase):
         self.assertFalse(first_item.live)
         self.assertTrue(queue.queue[1].live)

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.assertFalse(A.is_merged)

@@ -21,10 +21,6 @@ import os
 import time
 from unittest import mock

-import zuul.executor.server
-import zuul.model
-import gear
-
 from tests.base import (
     BaseTestCase,
     ZuulTestCase,
@@ -36,7 +32,9 @@ from tests.base import (

 from zuul.executor.sensors.startingbuilds import StartingBuildsSensor
 from zuul.executor.sensors.ram import RAMSensor
+from zuul.executor.server import AnsibleJob, squash_variables
 from zuul.lib.ansible import AnsibleManager
+from zuul.model import BuildRequest


 class TestExecutorRepos(ZuulTestCase):
@@ -437,10 +435,22 @@ class TestAnsibleJob(ZuulTestCase):
     def setUp(self):
         super(TestAnsibleJob, self).setUp()
         ansible_version = AnsibleManager().default_version
-        args = '{"ansible_version": "%s"}' % ansible_version
-        job = gear.TextJob('executor:execute', args, unique='test')
-        self.test_job = zuul.executor.server.AnsibleJob(self.executor_server,
-                                                        job)
+        params = {
+            "ansible_version": ansible_version,
+            "zuul_event_id": 0,
+        }
+        build_request = BuildRequest(
+            "test",
+            state=None,
+            precedence=200,
+            params=params,
+            zone=None,
+            tenant_name=None,
+            pipeline_name=None,
+            event_id='1',
+        )
+
+        self.test_job = AnsibleJob(self.executor_server, build_request, params)

     def test_getHostList_host_keys(self):
         # Test without connection_port set
@@ -1001,7 +1011,7 @@ class TestVarSquash(BaseTestCase):
         extravars = {
             'extra': 'extravar_extra',
         }
-        out = zuul.executor.server.squash_variables(
+        out = squash_variables(
             nodes, groups, jobvars, groupvars, extravars)

         expected = {

@@ -395,7 +395,7 @@ class TestGerritCRD(ZuulTestCase):
         "Test cross-repo dependencies in independent pipelines"

         self.executor_server.hold_jobs_in_build = True
-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')

@@ -406,8 +406,8 @@ class TestGerritCRD(ZuulTestCase):
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.executor_server.release('.*-merge')
@@ -505,7 +505,7 @@ class TestGerritCRD(ZuulTestCase):
     def _test_crd_check_reconfiguration(self, project1, project2):
         "Test cross-repo dependencies re-enqueued in independent pipelines"

-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange(project1, 'master', 'A')
         B = self.fake_gerrit.addFakeChange(project2, 'master', 'B')

@@ -529,8 +529,8 @@ class TestGerritCRD(ZuulTestCase):
         self.assertFalse(first_item.live)
         self.assertTrue(queue.queue[1].live)

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.assertEqual(A.data['status'], 'NEW')
@@ -556,7 +556,7 @@ class TestGerritCRD(ZuulTestCase):
     def test_crd_check_ignore_dependencies(self):
         "Test cross-repo dependencies can be ignored"

-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
@@ -580,8 +580,8 @@ class TestGerritCRD(ZuulTestCase):
         for item in check_pipeline.getAllItems():
             self.assertTrue(item.live)

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.assertEqual(A.data['status'], 'NEW')
@@ -736,7 +736,7 @@ class TestGerritCRDAltBaseUrl(ZuulTestCase):
         "Test basic cross-repo dependencies with an alternate gerrit baseurl"

         self.executor_server.hold_jobs_in_build = True
-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')

@@ -748,8 +748,8 @@ class TestGerritCRDAltBaseUrl(ZuulTestCase):
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.executor_server.release('.*-merge')

@@ -324,7 +324,7 @@ class TestGerritLegacyCRD(ZuulTestCase):
         "Test cross-repo dependencies in independent pipelines"

         self.executor_server.hold_jobs_in_build = True
-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')

@@ -335,8 +335,8 @@ class TestGerritLegacyCRD(ZuulTestCase):
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.executor_server.release('.*-merge')
@@ -434,7 +434,7 @@ class TestGerritLegacyCRD(ZuulTestCase):
     def _test_crd_check_reconfiguration(self, project1, project2):
         "Test cross-repo dependencies re-enqueued in independent pipelines"

-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange(project1, 'master', 'A')
         B = self.fake_gerrit.addFakeChange(project2, 'master', 'B')

@@ -458,8 +458,8 @@ class TestGerritLegacyCRD(ZuulTestCase):
         self.assertFalse(first_item.live)
         self.assertTrue(queue.queue[1].live)

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.assertEqual(A.data['status'], 'NEW')
@@ -485,7 +485,7 @@ class TestGerritLegacyCRD(ZuulTestCase):
     def test_crd_check_ignore_dependencies(self):
         "Test cross-repo dependencies can be ignored"

-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
@@ -509,8 +509,8 @@ class TestGerritLegacyCRD(ZuulTestCase):
         for item in check_pipeline.getAllItems():
             self.assertTrue(item.live)

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.assertEqual(A.data['status'], 'NEW')

@@ -34,7 +34,7 @@ class TestInventoryBase(ZuulTestCase):
         if shell_type:
             self.fake_nodepool.shell_type = shell_type
         self.executor_server.hold_jobs_in_build = True
-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True

         if self.use_gerrit:
             A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -74,8 +74,8 @@ class TestInventoryBase(ZuulTestCase):
         return yaml.ansible_unsafe_load(open(setup_inv_path, 'r'))

     def runJob(self, name):
-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release('^%s$' % name)
+        self.hold_jobs_in_queue = False
+        self.executor_api.release(f'^{name}$')
         self.waitUntilSettled()

     def cancelExecutorJobs(self):
@@ -83,7 +83,7 @@ class TestInventoryBase(ZuulTestCase):
             executor_client = app.sched.executor
             builds = [b for b in executor_client.builds.values()]
             for build in builds:
-                executor_client.cancelJobInQueue(build)
+                executor_client.cancel(build)


 class TestInventoryGithub(TestInventoryBase):

@@ -15,16 +15,15 @@
 import configparser
 import gc
 import json
-import textwrap
-
 import os
 import re
 import shutil
 import socket
+import textwrap
 import threading
 import time
 from collections import namedtuple
-from unittest import mock
-from unittest import skip
+from unittest import mock, skip
+from kazoo.exceptions import NoNodeError

 import git
@@ -126,19 +125,19 @@ class TestSchedulerZone(ZuulTestCase):
             'zuul.executors.zone.test-provider_vpn.online',
             value='1', kind='g')

-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
         self.waitUntilSettled()

-        queue = self.gearman_server.getQueue()
+        queue = list(self.executor_api.queued())
         self.assertEqual(len(self.builds), 0)
         self.assertEqual(len(queue), 1)
-        self.assertEqual(b'executor:execute:test-provider.vpn', queue[0].name)
+        self.assertEqual('test-provider.vpn', queue[0].zone)

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.assertEqual(self.getJobFromHistory('project-merge').result,
@@ -163,6 +162,45 @@ class TestSchedulerZone(ZuulTestCase):
             'zuul.executors.zone.test-provider_vpn.accepting',
             value='1', kind='g')

+    @skip("Disabled until I0245b71f31aae9616d8e65d27c63b25b2c27f815")
+    def test_executor_disconnect(self):
+        "Test that jobs are completed after an executor disconnect"
+
+        self.executor_server.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        # Forcibly disconnect the executor from ZK
+        self.executor_server.zk_client.client.stop()
+        self.executor_server.zk_client.client.start()
+
+        # Find the build in the scheduler so we can check its status
+        tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+        items = tenant.layout.pipelines['gate'].getAllItems()
+        builds = items[0].current_build_set.getBuilds()
+        build = builds[0]
+
+        # Clean up the build
+        self.scheds.first.sched.executor.cleanupLostBuildRequests()
+        # Wait for the build to be reported as lost
+        for x in iterate_timeout(30, 'retry build'):
+            if build.result == 'RETRY':
+                break
+
+        # If we didn't timeout, then it worked; we're done
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        self.assertHistory([
+            dict(name='project-merge', result='ABORTED', changes='1,1'),
+            dict(name='project-merge', result='SUCCESS', changes='1,1'),
+            dict(name='project-test1', result='SUCCESS', changes='1,1'),
+            dict(name='project-test2', result='SUCCESS', changes='1,1'),
+        ], ordered=False)
+

 class TestSchedulerZoneFallback(ZuulTestCase):
     tenant_config_file = 'config/single-tenant/main.yaml'
@@ -175,19 +213,19 @@ class TestSchedulerZoneFallback(ZuulTestCase):

     def test_jobs_executed(self):
         "Test that jobs are executed and a change is merged per zone"
-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
         self.waitUntilSettled()

-        queue = self.gearman_server.getQueue()
+        queue = list(self.executor_api.queued())
         self.assertEqual(len(self.builds), 0)
         self.assertEqual(len(queue), 1)
-        self.assertEqual(b'executor:execute', queue[0].name)
+        self.assertEqual(None, queue[0].zone)

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.assertEqual(self.getJobFromHistory('project-merge').result,
@@ -945,7 +983,7 @@ class TestScheduler(ZuulTestCase):
     def test_failed_change_at_head_with_queue(self):
         "Test that if a change at the head fails, queued jobs are canceled"

-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
         C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
@@ -960,54 +998,42 @@ class TestScheduler(ZuulTestCase):
         self.fake_gerrit.addEvent(C.addApproval('Approved', 1))

         self.waitUntilSettled()
-        queue = self.gearman_server.getQueue()
+        queue = list(self.executor_api.queued())
         self.assertEqual(len(self.builds), 0)
         self.assertEqual(len(queue), 1)
-        self.assertEqual(queue[0].name, b'executor:execute')
-        job_args = json.loads(queue[0].arguments.decode('utf8'))
+        self.assertEqual(queue[0].zone, None)
+        job_args = queue[0].params
         self.assertEqual(job_args['job'], 'project-merge')
         self.assertEqual(job_args['items'][0]['number'], '%d' % A.number)

-        self.gearman_server.release('.*-merge')
+        self.executor_api.release('.*-merge')
         self.waitUntilSettled()
-        self.gearman_server.release('.*-merge')
+        self.executor_api.release('.*-merge')
         self.waitUntilSettled()
-        self.gearman_server.release('.*-merge')
+        self.executor_api.release('.*-merge')
         self.waitUntilSettled()
-        queue = self.gearman_server.getQueue()
+        queue = list(self.executor_api.queued())

         self.assertEqual(len(self.builds), 0)
         self.assertEqual(len(queue), 6)

-        self.assertEqual(
-            json.loads(queue[0].arguments.decode('utf8'))['job'],
-            'project-test1')
-        self.assertEqual(
-            json.loads(queue[1].arguments.decode('utf8'))['job'],
-            'project-test2')
-        self.assertEqual(
-            json.loads(queue[2].arguments.decode('utf8'))['job'],
-            'project-test1')
-        self.assertEqual(
-            json.loads(queue[3].arguments.decode('utf8'))['job'],
-            'project-test2')
-        self.assertEqual(
-            json.loads(queue[4].arguments.decode('utf8'))['job'],
-            'project-test1')
-        self.assertEqual(
-            json.loads(queue[5].arguments.decode('utf8'))['job'],
-            'project-test2')
+        self.assertEqual(queue[0].params['job'], 'project-test1')
+        self.assertEqual(queue[1].params['job'], 'project-test2')
+        self.assertEqual(queue[2].params['job'], 'project-test1')
+        self.assertEqual(queue[3].params['job'], 'project-test2')
+        self.assertEqual(queue[4].params['job'], 'project-test1')
+        self.assertEqual(queue[5].params['job'], 'project-test2')

-        self.release(queue[0])
+        self.executor_api.release(queue[0])
         self.waitUntilSettled()

         self.assertEqual(len(self.builds), 0)
-        queue = self.gearman_server.getQueue()
+        queue = list(self.executor_api.queued())
         self.assertEqual(len(queue), 2)  # project-test2, project-merge for B
         self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 0)

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.assertEqual(len(self.builds), 0)
@@ -1374,7 +1400,7 @@ class TestScheduler(ZuulTestCase):
     def test_project_merge_conflict(self):
         "Test that gate merge conflicts are handled properly"

-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project',
                                            'master', 'A',
                                            files={'conflict': 'foo'})
@@ -1394,15 +1420,15 @@ class TestScheduler(ZuulTestCase):
         self.assertEqual(A.reported, 1)
         self.assertEqual(C.reported, 1)

-        self.gearman_server.release('project-merge')
+        self.executor_api.release('project-merge')
         self.waitUntilSettled()
-        self.gearman_server.release('project-merge')
+        self.executor_api.release('project-merge')
         self.waitUntilSettled()
-        self.gearman_server.release('project-merge')
+        self.executor_api.release('project-merge')
         self.waitUntilSettled()

-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.assertEqual(A.data['status'], 'MERGED')
@@ -1424,11 +1450,11 @@ class TestScheduler(ZuulTestCase):
     def test_delayed_merge_conflict(self):
         "Test that delayed check merge conflicts are handled properly"

-        # Hold jobs in the gearman queue so that we can test whether
+        # Hold jobs in the ZooKeeper queue so that we can test whether
         # the executor successfully merges a change based on an old
         # repo state (frozen by the scheduler) which would otherwise
         # conflict.
-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         A = self.fake_gerrit.addFakeChange('org/project',
                                            'master', 'A',
                                            files={'conflict': 'foo'})
@@ -1454,16 +1480,16 @@ class TestScheduler(ZuulTestCase):

         # A merges while B and C are queued in check
         # Release A project-merge
-        queue = self.gearman_server.getQueue()
-        self.release(queue[0])
+        queue = list(self.executor_api.queued())
+        self.executor_api.release(queue[0])
         self.waitUntilSettled()

         # Release A project-test*
         # gate has higher precedence, so A's test jobs are added in
         # front of the merge jobs for B and C
-        queue = self.gearman_server.getQueue()
-        self.release(queue[0])
-        self.release(queue[1])
+        queue = list(self.executor_api.queued())
+        self.executor_api.release(queue[0])
+        self.executor_api.release(queue[1])
         self.waitUntilSettled()

         self.assertEqual(A.data['status'], 'MERGED')
@@ -1480,13 +1506,13 @@ class TestScheduler(ZuulTestCase):

         # B and C report merge conflicts
         # Release B project-merge
-        queue = self.gearman_server.getQueue()
-        self.release(queue[0])
+        queue = list(self.executor_api.queued())
+        self.executor_api.release(queue[0])
         self.waitUntilSettled()

         # Release C
-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         self.assertEqual(A.data['status'], 'MERGED')
@@ -2740,14 +2766,24 @@ class TestScheduler(ZuulTestCase):
             if all(b.worker.name != "Unknown" for b in builds):
                 break

+        tevent = threading.Event()
+
+        def data_watch(data, stat, event):
+            if not any([data, stat, event]):
+                return
+            # Set the threading event as soon as the cancel node is present
+            tevent.set()
+            return False
+
+        builds = list(self.executor_api.all())
+        # Use a DataWatch to avoid a race condition between creating and
+        # immediately deleting the cancel node in ZooKeeper.
+        self.zk_client.client.DataWatch(f"{builds[0].path}/cancel", data_watch)
+
         # Abandon change to cancel build
         self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())

-        for _ in iterate_timeout(30, 'Wait for executor:stop request'):
-            stop_jobs = [x for x in self.gearman_server.jobs_history
-                         if b'executor:stop' in x.name]
-            if stop_jobs:
-                break
+        self.assertTrue(tevent.wait(timeout=30))

         self.executor_server.hold_jobs_in_start = False
         self.waitUntilSettled()
@@ -2907,7 +2943,8 @@ class TestScheduler(ZuulTestCase):
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
         self.waitUntilSettled()

-        self.assertEqual(len(self.gearman_server.getQueue()), 0)
+        queue = list(self.executor_api.queued())
+        self.assertEqual(len(queue), 0)
         self.assertTrue(self.scheds.first.sched._areAllBuildsComplete())
         self.assertEqual(len(self.history), 0)
         self.assertEqual(A.data['status'], 'MERGED')
@@ -3387,7 +3424,7 @@ class TestScheduler(ZuulTestCase):
     def test_queue_precedence(self):
         "Test that queue precedence works"

-        self.gearman_server.hold_jobs_in_queue = True
+        self.hold_jobs_in_queue = True
         self.executor_server.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
@@ -3395,8 +3432,8 @@ class TestScheduler(ZuulTestCase):
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))

         self.waitUntilSettled()
-        self.gearman_server.hold_jobs_in_queue = False
-        self.gearman_server.release()
+        self.hold_jobs_in_queue = False
+        self.executor_api.release()
         self.waitUntilSettled()

         # Run one build at a time to ensure non-race order:
@@ -5890,6 +5927,7 @@ For CI problems and help debugging, contact ci@example.org"""
         self.assertEqual(A.reported, 1)
         self.assertIn('RETRY_LIMIT', A.messages[0])

+    @skip("Disabled until I0245b71f31aae9616d8e65d27c63b25b2c27f815")
     def test_executor_disconnect(self):
         "Test that jobs are completed after an executor disconnect"

@@ -5899,22 +5937,21 @@ For CI problems and help debugging, contact ci@example.org"""
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
         self.waitUntilSettled()

-        # Forcibly disconnect the executor from gearman
-        for connection in self.executor_server.executor_gearworker.\
-                gearman.active_connections:
-            connection.conn.shutdown(socket.SHUT_RDWR)
-
-        # Wake up the cleanup thread since it is on a 5 minute interval
-        self.scheds.first.sched.executor.cleanup_thread.wake_event.set()
+        # Forcibly disconnect the executor from ZK
+        self.executor_server.zk_client.client.stop()
+        self.executor_server.zk_client.client.start()

         # Find the build in the scheduler so we can check its status
         tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
         items = tenant.layout.pipelines['gate'].getAllItems()
         builds = items[0].current_build_set.getBuilds()
         build = builds[0]

+        # Clean up the build
+        self.scheds.first.sched.executor.cleanupLostBuildRequests()
         # Wait for the build to be reported as lost
-        for x in iterate_timeout(30, 'lost build'):
-            if build.result == 'LOST':
+        for x in iterate_timeout(30, 'retry build'):
+            if build.result == 'RETRY':
                 break

         # If we didn't timeout, then it worked; we're done
@@ -5922,16 +5959,14 @@ For CI problems and help debugging, contact ci@example.org"""
         self.executor_server.release()
         self.waitUntilSettled()

-        # LOST builds aren't recorded in the test history; instead the
-        # original build will be reported as success since it
-        # continued after our fake disconnect.  However, the fact that
-        # it's the *only* build, and the other two jobs are not
-        # present is extra confirmation that the scheduler saw the
-        # build as lost.
         self.assertHistory([
+            dict(name='project-merge', result='ABORTED', changes='1,1'),
             dict(name='project-merge', result='SUCCESS', changes='1,1'),
-        ])
+            dict(name='project-test1', result='SUCCESS', changes='1,1'),
+            dict(name='project-test2', result='SUCCESS', changes='1,1'),
+        ], ordered=False)

+    @skip("Disabled until I0245b71f31aae9616d8e65d27c63b25b2c27f815")
     def test_scheduler_disconnect(self):
         "Test that jobs are completed after a scheduler disconnect"

@@ -5941,29 +5976,18 @@ For CI problems and help debugging, contact ci@example.org"""
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
         self.waitUntilSettled()

-        # Forcibly disconnect the scheduler from gearman
-        for connection in self.scheds.first.sched.executor.gearman.\
-                active_connections:
-            connection.conn.shutdown(socket.SHUT_RDWR)
+        # Forcibly disconnect the scheduler from ZK
+        self.scheds.execute(lambda app: app.sched.zk_client.client.stop())
+        self.scheds.execute(lambda app: app.sched.zk_client.client.start())

-        # Find the build in the scheduler so we can check its status
-        tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
-        items = tenant.layout.pipelines['gate'].getAllItems()
-        builds = items[0].current_build_set.getBuilds()
-        build = builds[0]
-        # Wait for the build to be reported as lost
-        for x in iterate_timeout(30, 'lost build'):
-            if build.result == 'RETRY':
-                break
+        # Clean up lost builds
+        self.scheds.first.sched.executor.cleanupLostBuildRequests()

         # If we didn't timeout, then it worked; we're done
         self.executor_server.hold_jobs_in_build = False
         self.executor_server.release()
         self.waitUntilSettled()

+        # There's an extra merge build due to the retry
         self.assertHistory([
             dict(name='project-merge', result='SUCCESS', changes='1,1'),
+            dict(name='project-merge', result='SUCCESS', changes='1,1'),
             dict(name='project-test1', result='SUCCESS', changes='1,1'),
             dict(name='project-test2', result='SUCCESS', changes='1,1'),

@@ -4468,7 +4468,7 @@ class TestDataReturn(AnsibleZuulTestCase):

         # Stop the job worker to simulate an executor restart
         for job_worker in self.executor_server.job_workers.values():
-            if job_worker.job.unique == paused_job.uuid:
+            if job_worker.build_request.uuid == paused_job.uuid:
                 job_worker.stop()
         self.waitUntilSettled("stop job worker")
@@ -6131,7 +6131,7 @@ class TestJobPause(AnsibleZuulTestCase):

         # Stop the job worker of compile1 to simulate an executor restart
         for job_worker in self.executor_server.job_workers.values():
-            if job_worker.job.unique == compile1.unique:
+            if job_worker.build_request.uuid == compile1.unique:
                 job_worker.stop()
         self.waitUntilSettled("Stop job")
@@ -6368,7 +6368,7 @@ class TestJobPause(AnsibleZuulTestCase):
         # Stop the job worker of test to simulate an executor restart
         job_test = self.builds[1]
         for job_worker in self.executor_server.job_workers.values():
-            if job_worker.job.unique == job_test.unique:
+            if job_worker.build_request.uuid == job_test.uuid:
                 job_worker.stop()

         self.executor_server.hold_jobs_in_build = False

@@ -34,7 +34,7 @@ from zuul.zk.components import (
     BaseComponent, ComponentRegistry, ExecutorComponent
 )

-from tests.base import BaseTestCase, iterate_timeout
+from tests.base import BaseTestCase, HoldableExecutorApi, iterate_timeout


 class ZooKeeperBaseTestCase(BaseTestCase):
@@ -311,17 +311,6 @@ class TestComponentRegistry(ZooKeeperBaseTestCase):
         self.assertComponentState("executor", BaseComponent.RUNNING)


-class HoldableExecutorApi(ExecutorApi):
-    hold_in_queue = False
-
-    @property
-    def initial_state(self):
-        # This supports holding build requests in tests
-        if self.hold_in_queue:
-            return BuildRequest.HOLD
-        return BuildRequest.REQUESTED
-
-
 class TestExecutorApi(ZooKeeperBaseTestCase):
     def _get_zk_tree(self, root):
         items = []
@@ -352,7 +341,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
             build_event_callback=eq_put)

         # Scheduler submits request
-        client.submit("A", "tenant", "pipeline", {}, None)
+        client.submit("A", "tenant", "pipeline", {}, None, '1')
         request_queue.get(timeout=30)

         # Executor receives request
@@ -435,7 +424,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
             build_event_callback=eq_put)

         # Scheduler submits request
-        client.submit("A", "tenant", "pipeline", {}, None)
+        client.submit("A", "tenant", "pipeline", {}, None, '1')
         request_queue.get(timeout=30)

         # Executor receives request
@@ -487,7 +476,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
             build_event_callback=eq_put)

         # Scheduler submits request
-        a_path = client.submit("A", "tenant", "pipeline", {}, None)
+        a_path = client.submit("A", "tenant", "pipeline", {}, None, '1')
         request_queue.get(timeout=30)

         # Executor receives nothing
@@ -523,7 +512,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
         client = ExecutorApi(self.zk_client)

         # Scheduler submits request
-        a_path = client.submit("A", "tenant", "pipeline", {}, None)
+        a_path = client.submit("A", "tenant", "pipeline", {}, None, '1')
         sched_a = client.get(a_path)

         # Simulate the server side
@@ -542,11 +531,15 @@ class TestExecutorApi(ZooKeeperBaseTestCase):
         # requests
         executor_api = ExecutorApi(self.zk_client)

-        executor_api.submit("A", "tenant", "pipeline", {}, "zone")
-        path_b = executor_api.submit("B", "tenant", "pipeline", {}, None)
-        path_c = executor_api.submit("C", "tenant", "pipeline", {}, "zone")
-        path_d = executor_api.submit("D", "tenant", "pipeline", {}, "zone")
-        path_e = executor_api.submit("E", "tenant", "pipeline", {}, "zone")
+        executor_api.submit("A", "tenant", "pipeline", {}, "zone", '1')
+        path_b = executor_api.submit("B", "tenant", "pipeline", {},
+                                     None, '1')
+        path_c = executor_api.submit("C", "tenant", "pipeline", {},
+                                     "zone", '1')
+        path_d = executor_api.submit("D", "tenant", "pipeline", {},
+                                     "zone", '1')
+        path_e = executor_api.submit("E", "tenant", "pipeline", {},
+                                     "zone", '1')

         b = executor_api.get(path_b)
         c = executor_api.get(path_c)
@@ -603,7 +596,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase):

         # Simulate the client side
         client = ExecutorApi(self.zk_client)
-        client.submit("A", "tenant", "pipeline", {}, None)
+        client.submit("A", "tenant", "pipeline", {}, None, '1')

         # Simulate the server side
         server = ExecutorApi(self.zk_client,

@@ -12,111 +12,26 @@
 # License for the specific language governing permissions and limitations
 # under the License.

-import gear
 import json
 import logging
 import time
 import threading
 from uuid import uuid4

 import zuul.executor.common
 from zuul.lib.config import get_default
-from zuul.lib.gear_utils import getGearmanFunctions
 from zuul.lib.jsonutil import json_dumps
 from zuul.lib.logutil import get_annotated_logger
 from zuul.model import (
     Build,
     BuildCompletedEvent,
     BuildRequest,
     BuildStartedEvent,
     PRECEDENCE_HIGH,
     PRECEDENCE_LOW,
     PRECEDENCE_NORMAL,
     PRIORITY_MAP,
 )
 from zuul.zk.event_queues import PipelineResultEventQueue
-
-
-class GearmanCleanup(threading.Thread):
-    """ A thread that checks to see if outstanding builds have
-    completed without reporting back. """
-    log = logging.getLogger("zuul.GearmanCleanup")
-
-    def __init__(self, gearman):
-        threading.Thread.__init__(self)
-        self.daemon = True
-        self.gearman = gearman
-        self.wake_event = threading.Event()
-        self._stopped = False
-
-    def stop(self):
-        self._stopped = True
-        self.wake_event.set()
-
-    def run(self):
-        while True:
-            self.wake_event.wait(300)
-            if self._stopped:
-                return
-            try:
-                self.gearman.lookForLostBuilds()
-            except Exception:
-                self.log.exception("Exception checking builds:")
-
-
-def getJobData(job):
-    if not len(job.data):
-        return {}
-    d = job.data[-1]
-    if not d:
-        return {}
-    return json.loads(d)
-
-
-class ZuulGearmanClient(gear.Client):
-    def __init__(self, zuul_gearman):
-        super(ZuulGearmanClient, self).__init__('Zuul Executor Client')
-        self.__zuul_gearman = zuul_gearman
-
-    def handleWorkComplete(self, packet):
-        job = super(ZuulGearmanClient, self).handleWorkComplete(packet)
-        self.__zuul_gearman.onBuildCompleted(job)
-        return job
-
-    def handleWorkFail(self, packet):
-        job = super(ZuulGearmanClient, self).handleWorkFail(packet)
-        self.__zuul_gearman.onBuildCompleted(job)
-        return job
-
-    def handleWorkException(self, packet):
-        job = super(ZuulGearmanClient, self).handleWorkException(packet)
-        self.__zuul_gearman.onBuildCompleted(job)
-        return job
-
-    def handleWorkStatus(self, packet):
-        job = super(ZuulGearmanClient, self).handleWorkStatus(packet)
-        self.__zuul_gearman.onWorkStatus(job)
-        return job
-
-    def handleWorkData(self, packet):
-        job = super(ZuulGearmanClient, self).handleWorkData(packet)
-        self.__zuul_gearman.onWorkStatus(job)
-        return job
-
-    def handleDisconnect(self, job):
-        job = super(ZuulGearmanClient, self).handleDisconnect(job)
-        self.__zuul_gearman.onDisconnect(job)
-
-    def handleStatusRes(self, packet):
-        try:
-            super(ZuulGearmanClient, self).handleStatusRes(packet)
-        except gear.UnknownJobError:
-            handle = packet.getArgument(0)
-            for build in self.__zuul_gearman.builds.values():
-                if build._gearman_job.handle == handle:
-                    self.__zuul_gearman.onUnknownJob(build)
+from zuul.zk.executor import ExecutorApi


 class ExecutorClient(object):
     log = logging.getLogger("zuul.ExecutorClient")
+    _executor_api_class = ExecutorApi

     def __init__(self, config, sched):
         self.config = config
@@ -124,29 +39,13 @@ class ExecutorClient(object):
        self.builds = {}
        self.meta_jobs = {}  # A list of meta-jobs like stop or describe

        self.executor_api = self._executor_api_class(self.sched.zk_client)
        self.result_events = PipelineResultEventQueue.createRegistry(
            self.sched.zk_client
        )

        server = config.get('gearman', 'server')
        port = get_default(self.config, 'gearman', 'port', 4730)
        ssl_key = get_default(self.config, 'gearman', 'ssl_key')
        ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
        ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
        self.gearman = ZuulGearmanClient(self)
        self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
                               keepalive=True, tcp_keepidle=60,
                               tcp_keepintvl=30, tcp_keepcnt=5)

        self.cleanup_thread = GearmanCleanup(self)
        self.cleanup_thread.start()

    def stop(self):
        self.log.debug("Stopping")
        self.cleanup_thread.stop()
        self.cleanup_thread.join()
        self.gearman.shutdown()
        self.log.debug("Stopped")

    def execute(self, job, item, pipeline, dependent_changes=[],
                merger_items=[]):
@@ -192,15 +91,13 @@ class ExecutorClient(object):
                completed_event
            )

            return build
            return

        # Update zuul attempts after addBuild above to ensure build_set
        # is up to date.
        attempts = build.build_set.getTries(job.name)
        params["zuul"]['attempts'] = attempts

        functions = getGearmanFunctions(self.gearman)
        function_name = 'executor:execute'
        # Because all nodes belong to the same provider, region and
        # availability zone we can get executor_zone from only the first
        # node.
@@ -209,50 +106,32 @@ class ExecutorClient(object):
            executor_zone = params[
                "nodes"][0]['attributes'].get('executor-zone')

        zone_known = False
        if executor_zone:
            _fname = '%s:%s' % (
                function_name,
                executor_zone)
            if _fname in functions:
                function_name = _fname
            else:
            # Check the component registry for executors subscribed to this
            # zone
            for comp in self.sched.component_registry.all(kind="executor"):
                if comp.zone == executor_zone:
                    zone_known = True
                    break

            if not zone_known:
                self.log.warning(
                    "Job requested '%s' zuul-executor zone, but no "
                    "zuul-executors found for this zone; ignoring zone "
                    "request" % executor_zone)
                    "request", executor_zone)
                # Fall back to the default zone
                executor_zone = None

        gearman_job = gear.TextJob(
            function_name, json_dumps(params), unique=uuid)

        build._gearman_job = gearman_job
        build.__gearman_worker = None

        if pipeline.precedence == PRECEDENCE_NORMAL:
            precedence = gear.PRECEDENCE_NORMAL
        elif pipeline.precedence == PRECEDENCE_HIGH:
            precedence = gear.PRECEDENCE_HIGH
        elif pipeline.precedence == PRECEDENCE_LOW:
            precedence = gear.PRECEDENCE_LOW

        try:
            self.gearman.submitJob(gearman_job, precedence=precedence,
                                   timeout=300)
        except Exception:
            log.exception("Unable to submit job to Gearman")
            # TODO (felix): Directly put the result event in the queue
            self.onBuildCompleted(gearman_job, 'EXCEPTION')
            return build

        if not gearman_job.handle:
            log.error("No job handle was received for %s after"
                      " 300 seconds; marking as lost.",
                      gearman_job)
            # TODO (felix): Directly put the result event in the queue
            self.onBuildCompleted(gearman_job, 'NO_HANDLE')

        log.debug("Received handle %s for %s", gearman_job.handle, build)

        return build
        build.build_request_ref = self.executor_api.submit(
            uuid=uuid,
            tenant_name=build.build_set.item.pipeline.tenant.name,
            pipeline_name=build.build_set.item.pipeline.name,
            params=params,
            zone=executor_zone,
            event_id=item.event.zuul_event_id,
            precedence=PRIORITY_MAP[pipeline.precedence],
        )

    def cancel(self, build):
        log = get_annotated_logger(self.log, build.zuul_event_id,
@@ -261,137 +140,77 @@ class ExecutorClient(object):
        log.info("Cancel build %s for job %s", build, build.job)

        build.canceled = True
        try:
            job = build._gearman_job  # noqa
        except AttributeError:
            log.debug("Build has no associated gearman job")

        if not build.build_request_ref:
            log.debug("Build has not been submitted to ZooKeeper")
            return False

        if build.__gearman_worker is not None:
            log.debug("Build has already started")
            self.cancelRunningBuild(build)
            log.debug("Canceled running build")
            return True
        else:
            log.debug("Build has not started yet")
        build_request = self.executor_api.get(build.build_request_ref)
        if build_request:
            log.debug("Canceling build request %s", build_request)
            # If we can acquire the build request lock here, the build wasn't
            # picked up by any executor server yet. With acquiring the lock
            # we prevent the executor server from picking up the build so we
            # can cancel it before it will run.
            if self.executor_api.lock(build_request, blocking=False):
                log.debug(
                    "Canceling build %s directly because it is not locked by "
                    "any executor",
                    build_request,
                )
                # Mark the build request as complete and forward the event to
                # the scheduler, so the executor server doesn't pick up the
                # request. The build will be deleted from the scheduler when it
                # picks up the BuildCompletedEvent.
                try:
                    build_request.state = BuildRequest.COMPLETED
                    self.executor_api.update(build_request)

        log.debug("Looking for build in queue")
        if self.cancelJobInQueue(build):
            log.debug("Removed build from queue")
                    result = {"result": "CANCELED", "end_time": time.time()}
                    tenant_name = build.build_set.item.pipeline.tenant.name
                    pipeline_name = build.build_set.item.pipeline.name
                    event = BuildCompletedEvent(build_request.uuid, result)
                    self.result_events[tenant_name][pipeline_name].put(event)
                finally:
                    self.executor_api.unlock(build_request)
            else:
                log.debug(
                    "Sending cancel request for build %s because it is locked",
                    build_request,
                )
                # If the build request is locked, schedule a cancel request in
                # the executor server.
                self.executor_api.requestCancel(build_request)

            log.debug("Canceled build")
            return True

        return False
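Condensed, the new client-side cancel path above is a two-way decision keyed on the ZooKeeper build-request lock; a minimal sketch of just that decision (not part of the change itself, surrounding plumbing elided):

    def cancel_sketch(executor_api, build_request):
        # Sketch of the logic above, using the names from this change.
        if executor_api.lock(build_request, blocking=False):
            # Not yet picked up by an executor: holding the lock keeps any
            # executor from starting it, so we can complete it ourselves
            # and report CANCELED to the scheduler.
            try:
                build_request.state = BuildRequest.COMPLETED
                executor_api.update(build_request)
            finally:
                executor_api.unlock(build_request)
        else:
            # Already locked by an executor: ask that executor to stop it.
            executor_api.requestCancel(build_request)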
    def resumeBuild(self, build: Build) -> bool:
        log = get_annotated_logger(self.log, build.zuul_event_id)

        if not build.build_request_ref:
            log.debug("Build has not been submitted")
            return False

        time.sleep(1)

        log.debug("Still unable to find build to cancel")
        if build.__gearman_worker is not None:
            log.debug("Build has just started")
            self.cancelRunningBuild(build)
            log.debug("Canceled running build")
            return True
        log.error("Unable to cancel build")

    def onBuildCompleted(self, job, result=None):
        if job.unique in self.meta_jobs:
            del self.meta_jobs[job.unique]
            return

    # TODO (felix): Remove this once the builds are executed via ZooKeeper.
    # It's currently necessary to set the correct private attribute on the
    # build for the gearman worker.
    def setWorkerInfo(self, build, data):
        # Update information about worker
        build.worker.updateFromData(data)
        build.__gearman_worker = build.worker.name

    def onWorkStatus(self, job):
        data = getJobData(job)
        self.log.debug("Build %s update %s" % (job, data))

    def onDisconnect(self, job):
        self.log.info("Gearman job %s lost due to disconnect" % job)
        self.onBuildCompleted(job, 'DISCONNECT')

        build = self.builds.get(job.unique)
        if build:
            tenant_name = build.build_set.item.pipeline.tenant.name
            pipeline_name = build.build_set.item.pipeline.name
            result = {"result": "DISCONNECT", "end_time": time.time()}
            event = BuildCompletedEvent(build.uuid, result)
            self.result_events[tenant_name][pipeline_name].put(event)

    def onUnknownJob(self, build):
        self.log.info("Gearman job for build %s lost "
                      "due to unknown handle" % build)
        # We don't need to call onBuildCompleted, because by
        # definition, we have no record of the job.
        tenant_name = build.build_set.item.pipeline.tenant.name
        pipeline_name = build.build_set.item.pipeline.name
        result = {"result": "LOST", "end_time": time.time()}
        event = BuildCompletedEvent(build.uuid, result)
        self.result_events[tenant_name][pipeline_name].put(event)
    def cancelJobInQueue(self, build):
        log = get_annotated_logger(self.log, build.zuul_event_id,
                                   build=build.uuid)
        job = build._gearman_job

        req = gear.CancelJobAdminRequest(job.handle)
        job.connection.sendAdminRequest(req, timeout=300)
        log.debug("Response to cancel build request: %s", req.response.strip())
        if req.response.startswith(b"OK"):
            # Since this isn't otherwise going to get a build complete
            # event, send one to the scheduler so that it can unlock
            # the nodes.
            tenant_name = build.build_set.item.pipeline.tenant.name
            pipeline_name = build.build_set.item.pipeline.name

            result = {"result": "CANCELED", "end_time": time.time()}
            event = BuildCompletedEvent(build.uuid, result)
            self.result_events[tenant_name][pipeline_name].put(event)
        build_request = self.executor_api.get(build.build_request_ref)
        if build_request:
            log.debug("Requesting resume for build %s", build)
            self.executor_api.requestResume(build_request)
            return True
        return False

    def cancelRunningBuild(self, build):
    def removeBuild(self, build: Build) -> None:
        log = get_annotated_logger(self.log, build.zuul_event_id)
        if not build.__gearman_worker:
            log.error("Build %s has no manager while canceling", build)
        stop_uuid = str(uuid4().hex)
        data = dict(uuid=build._gearman_job.unique,
                    zuul_event_id=build.zuul_event_id)
        stop_job = gear.TextJob("executor:stop:%s" % build.__gearman_worker,
                                json_dumps(data), unique=stop_uuid)
        self.meta_jobs[stop_uuid] = stop_job
        log.debug("Submitting stop job: %s", stop_job)
        self.gearman.submitJob(stop_job, precedence=gear.PRECEDENCE_HIGH,
                               timeout=300)
        return True
        log.debug("Removing build %s", build.uuid)

    def resumeBuild(self, build):
        log = get_annotated_logger(self.log, build.zuul_event_id)
        if not build.__gearman_worker:
            log.error("Build %s has no manager while resuming", build)
        resume_uuid = str(uuid4().hex)
        data = dict(uuid=build._gearman_job.unique,
                    zuul_event_id=build.zuul_event_id)
        stop_job = gear.TextJob("executor:resume:%s" % build.__gearman_worker,
                                json_dumps(data), unique=resume_uuid)
        self.meta_jobs[resume_uuid] = stop_job
        log.debug("Submitting resume job: %s", stop_job)
        self.gearman.submitJob(stop_job, precedence=gear.PRECEDENCE_HIGH,
                               timeout=300)
        if not build.build_request_ref:
            log.debug("Build has not been submitted to ZooKeeper")
            return

    def lookForLostBuilds(self):
        self.log.debug("Looking for lost builds")
        # Construct a list from the values iterator to protect from it changing
        # out from underneath us.
        for build in list(self.builds.values()):
            if build.result:
                # The build has finished, it will be removed
                continue
            job = build._gearman_job
            if not job.handle:
                # The build hasn't been enqueued yet
                continue
            p = gear.Packet(gear.constants.REQ, gear.constants.GET_STATUS,
                            job.handle)
            job.connection.sendPacket(p)
        build_request = self.executor_api.get(build.build_request_ref)
        if build_request:
            self.executor_api.remove(build_request)

        del self.builds[build.uuid]
@@ -30,13 +30,12 @@ import threading
import time
import traceback
from concurrent.futures.process import ProcessPoolExecutor, BrokenProcessPool
from typing import Dict

import git
from kazoo.exceptions import BadVersionError
from urllib.parse import urlsplit

from zuul.lib.ansible import AnsibleManager
from zuul.lib.gearworker import ZuulGearWorker
from zuul.lib.result_data import get_warnings_from_result_data
from zuul.lib import yamlutil as yaml
from zuul.lib.config import get_default
@@ -59,11 +58,17 @@ from zuul.executor.sensors.ram import RAMSensor
from zuul.lib import commandsocket
from zuul.merger.server import BaseMergeServer, RepoLocks
from zuul.model import (
    BuildCompletedEvent, BuildPausedEvent, BuildStartedEvent, BuildStatusEvent
    BuildCompletedEvent,
    BuildPausedEvent,
    BuildRequest,
    BuildStartedEvent,
    BuildStatusEvent,
)
import zuul.model
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.components import ExecutorComponent
from zuul.zk.exceptions import BuildRequestNotFound
from zuul.zk.executor import BuildRequestEvent, ExecutorApi

BUFFER_LINES_FOR_SYNTAX = 200
COMMANDS = ['stop', 'pause', 'unpause', 'graceful', 'verbose',
@@ -872,10 +877,10 @@ class AnsibleJob(object):
        RESULT_DISK_FULL: 'RESULT_DISK_FULL',
    }

    def __init__(self, executor_server, job):
    def __init__(self, executor_server, build_request, arguments):
        logger = logging.getLogger("zuul.AnsibleJob")
        self.arguments = json.loads(job.arguments)
        self.zuul_event_id = self.arguments.get('zuul_event_id')
        self.arguments = arguments
        self.zuul_event_id = self.arguments["zuul_event_id"]
        # Record ansible version being used for the cleanup phase
        self.ansible_version = self.arguments.get('ansible_version')
        # TODO(corvus): Remove default setting after 4.3.0; this is to handle
@@ -883,9 +888,10 @@ class AnsibleJob(object):
        self.scheme = self.arguments.get('workspace_scheme',
                                         zuul.model.SCHEME_GOLANG)
        self.log = get_annotated_logger(
            logger, self.zuul_event_id, build=job.unique)
            logger, self.zuul_event_id, build=build_request.uuid
        )
        self.executor_server = executor_server
        self.job = job
        self.build_request = build_request
        self.jobdir = None
        self.proc = None
        self.proc_lock = threading.Lock()
@@ -917,7 +923,7 @@ class AnsibleJob(object):
            'executor',
            'winrm_read_timeout_sec')
        self.ssh_agent = SshAgent(zuul_event_id=self.zuul_event_id,
                                  build=self.job.unique)
                                  build=self.build_request.uuid)
        self.port_forwards = []
        self.executor_variables_file = None
@@ -952,7 +958,7 @@ class AnsibleJob(object):
    def run(self):
        self.running = True
        self.thread = threading.Thread(target=self.execute,
                                       name='build-%s' % self.job.unique)
                                       name=f"build-{self.build_request.uuid}")
        self.thread.start()

    def stop(self, reason=None):
@@ -980,7 +986,7 @@ class AnsibleJob(object):
            data = {'paused': self.paused,
                    'data': result_data,
                    'secret_data': secret_result_data}
            self.executor_server.pauseBuild(self.job, data)
            self.executor_server.pauseBuild(self.build_request, data)
            self._resume_event.wait()

    def resume(self):
@@ -998,6 +1004,7 @@ class AnsibleJob(object):
                "{now} |\n".format(now=datetime.datetime.now()))

        self.paused = False
        self.executor_server.resumeBuild(self.build_request)
        self._resume_event.set()

    def wait(self):
@@ -1009,7 +1016,9 @@ class AnsibleJob(object):
        self.time_starting_build = time.monotonic()

        # report that job has been taken
        self.executor_server.startBuild(self.job, self._base_job_data())
        self.executor_server.startBuild(
            self.build_request, self._base_job_data()
        )

        self.ssh_agent.start()
        self.ssh_agent.add(self.private_key_file)
@@ -1022,7 +1031,7 @@ class AnsibleJob(object):
                self.ssh_agent.addData(name, private_ssh_key)
            self.jobdir = JobDir(self.executor_server.jobdir_root,
                                 self.executor_server.keep_jobdir,
                                 str(self.job.unique))
                                 str(self.build_request.uuid))
            self._execute()
        except BrokenProcessPool:
            # The process pool got broken, re-initialize it and send
@@ -1033,11 +1042,11 @@ class AnsibleJob(object):
        except ExecutorError as e:
            result_data = dict(result='ERROR', error_detail=e.args[0])
            self.log.debug("Sending result: %s", result_data)
            self.executor_server.completeBuild(self.job, result_data)
            self.executor_server.completeBuild(self.build_request, result_data)
        except Exception:
            self.log.exception("Exception while executing job")
            data = {"exception": traceback.format_exc()}
            self.executor_server.completeBuild(self.job, data)
            self.executor_server.completeBuild(self.build_request, data)
        finally:
            self.running = False
            if self.jobdir:
@@ -1056,7 +1065,7 @@ class AnsibleJob(object):
                except Exception:
                    self.log.exception("Error stopping port forward:")
            try:
                self.executor_server.finishJob(self.job.unique)
                self.executor_server.finishJob(self.build_request.uuid)
            except Exception:
                self.log.exception("Error finalizing job thread:")
            self.log.info("Job execution took: %.3f seconds" % (
@@ -1076,7 +1085,7 @@ class AnsibleJob(object):

    def _send_aborted(self):
        result = dict(result='ABORTED')
        self.executor_server.completeBuild(self.job, result)
        self.executor_server.completeBuild(self.build_request, result)

    def _execute(self):
        args = self.arguments
@@ -1101,7 +1110,7 @@ class AnsibleJob(object):
                project['connection'], project['name'],
                repo_state=repo_state,
                zuul_event_id=self.zuul_event_id,
                build=self.job.unique))
                build=self.build_request.uuid))
            projects.add((project['connection'], project['name']))

        # ...as well as all playbook and role projects.
@@ -1120,7 +1129,7 @@ class AnsibleJob(object):
                tasks.append(self.executor_server.update(
                    *key, repo_state=repo_state,
                    zuul_event_id=self.zuul_event_id,
                    build=self.job.unique))
                    build=self.build_request.uuid))
                projects.add(key)

        for task in tasks:
@@ -1274,13 +1283,13 @@ class AnsibleJob(object):
            data['url'] = "finger://{hostname}:{port}/{uuid}".format(
                hostname=self.executor_server.hostname,
                port=self.executor_server.log_streaming_port,
                uuid=self.job.unique)
                uuid=self.build_request.uuid)
        else:
            data['url'] = 'finger://{hostname}/{uuid}'.format(
                hostname=self.executor_server.hostname,
                uuid=self.job.unique)
                uuid=self.build_request.uuid)

        self.executor_server.updateBuildStatus(self.job, data)
        self.executor_server.updateBuildStatus(self.build_request, data)

        result = self.runPlaybooks(args)
        success = result == 'SUCCESS'
@@ -1303,7 +1312,7 @@ class AnsibleJob(object):
                               secret_data=secret_data)
        # TODO do we want to log the secret data here?
        self.log.debug("Sending result: %s", result_data)
        self.executor_server.completeBuild(self.job, result_data)
        self.executor_server.completeBuild(self.build_request, result_data)

    def getResultData(self):
        data = {}
@@ -1406,14 +1415,14 @@ class AnsibleJob(object):
            # can't fetch them, it should resolve itself.
            self.log.exception("Could not fetch refs to merge from remote")
            result = dict(result='ABORTED')
            self.executor_server.completeBuild(self.job, result)
            self.executor_server.completeBuild(self.build_request, result)
            return None
        if not ret:  # merge conflict
            result = dict(result='MERGER_FAILURE')
            if self.executor_server.statsd:
                base_key = "zuul.executor.{hostname}.merger"
                self.executor_server.statsd.incr(base_key + ".FAILURE")
            self.executor_server.completeBuild(self.job, result)
            self.executor_server.completeBuild(self.build_request, result)
            return None

        if self.executor_server.statsd:
@@ -2229,7 +2238,7 @@ class AnsibleJob(object):
                    zuul_resources[node['name'][0]]['pod'] = data['pod']

                    fwd = KubeFwd(zuul_event_id=self.zuul_event_id,
                                  build=self.job.unique,
                                  build=self.build_request.uuid,
                                  kubeconfig=self.jobdir.kubeconfig,
                                  context=data['context_name'],
                                  namespace=data['namespace'],
@@ -2595,7 +2604,7 @@ class AnsibleJob(object):
        try:
            ansible_log = get_annotated_logger(
                logging.getLogger("zuul.AnsibleJob.output"),
                self.zuul_event_id, build=self.job.unique)
                self.zuul_event_id, build=self.build_request.uuid)

            # Use manual idx instead of enumerate so that RESULT lines
            # don't count towards BUFFER_LINES_FOR_SYNTAX
@@ -2929,21 +2938,6 @@ class ExecutorMergeWorker(gear.TextWorker):
        super(ExecutorMergeWorker, self).handleNoop(packet)


class ExecutorExecuteWorker(gear.TextWorker):
    def __init__(self, executor_server, *args, **kw):
        self.zuul_executor_server = executor_server
        super(ExecutorExecuteWorker, self).__init__(*args, **kw)

    def handleNoop(self, packet):
        # Delay our response to running a new job based on the number
        # of jobs we're currently running, in an attempt to spread
        # load evenly among executors.
        workers = len(self.zuul_executor_server.job_workers)
        delay = (workers ** 2) / 1000.0
        time.sleep(delay)
        return super(ExecutorExecuteWorker, self).handleNoop(packet)


class ExecutorServer(BaseMergeServer):
    log = logging.getLogger("zuul.ExecutorServer")
    _ansible_manager_class = AnsibleManager
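As an aside on the quadratic back-off removed with ExecutorExecuteWorker above: the delay grew with the square of the running job count, so lightly loaded executors answered Gearman almost immediately while busy ones held back. A quick illustration of the removed formula:

    # delay = workers ** 2 / 1000.0, per the removed handleNoop heuristic
    for workers in (1, 10, 30):
        print(workers, (workers ** 2) / 1000.0)  # -> 0.001 s, 0.1 s, 0.9 s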
@@ -2963,11 +2957,11 @@ class ExecutorServer(BaseMergeServer):

        self.keep_jobdir = keep_jobdir
        self.jobdir_root = jobdir_root

        self.keystore = ZooKeeperKeyStorage(
            self.zk_client,
            password=self._get_key_store_password())

        self._running = False
        self._command_running = False
        # TODOv3(mordred): make the executor name more unique --
        # perhaps hostname+pid.
        self.hostname = get_default(self.config, 'executor', 'hostname',
@@ -3109,26 +3103,26 @@ class ExecutorServer(BaseMergeServer):
        self.component_info.process_merge_jobs = self.process_merge_jobs

        self.result_events = PipelineResultEventQueue.createRegistry(
            self.zk_client
            self.zk_client)
        self.build_worker = threading.Thread(
            target=self.runBuildWorker,
            name="ExecutorServerBuildWorkerThread",
        )

        self.executor_jobs = {
            "executor:resume:%s" % self.hostname: self.resumeJob,
            "executor:stop:%s" % self.hostname: self.stopJob,
        }
        for function_name in self._getExecuteFunctionNames():
            self.executor_jobs[function_name] = self.executeJob
        for function_name in self._getOnlineFunctionNames():
            self.executor_jobs[function_name] = self.noop
        self.build_loop_wake_event = threading.Event()

        self.executor_gearworker = ZuulGearWorker(
            'Zuul Executor Server',
            'zuul.ExecutorServer.ExecuteWorker',
            'executor',
            self.config,
            self.executor_jobs,
            worker_class=ExecutorExecuteWorker,
            worker_args=[self])
        zone_filter = [self.zone]
        if self.allow_unzoned:
            # In case we are allowed to execute unzoned jobs, make sure we are
            # subscribed to the default zone.
            zone_filter.append(None)

        self.executor_api = ExecutorApi(
            self.zk_client,
            zone_filter=zone_filter,
            build_request_callback=self.build_loop_wake_event.set,
            build_event_callback=self._handleBuildEvent,
        )

        # Used to offload expensive operations to different processes
        self.process_worker = None
@@ -3139,24 +3133,6 @@ class ExecutorServer(BaseMergeServer):
        except KeyError:
            raise RuntimeError("No key store password configured!")

    def _getFunctionSuffixes(self):
        suffixes = []
        if self.zone:
            suffixes.append(':' + self.zone)
            if self.allow_unzoned:
                suffixes.append('')
        else:
            suffixes.append('')
        return suffixes

    def _getExecuteFunctionNames(self):
        base_name = 'executor:execute'
        return [base_name + suffix for suffix in self._getFunctionSuffixes()]

    def _getOnlineFunctionNames(self):
        base_name = 'executor:online'
        return [base_name + suffix for suffix in self._getFunctionSuffixes()]

    def _repoLock(self, connection_name, project_name):
        return self.repo_locks.getRepoLock(connection_name, project_name)
@@ -3191,7 +3167,7 @@ class ExecutorServer(BaseMergeServer):
            self.log.warning('Multiprocessing context has already been set')
        self.process_worker = ProcessPoolExecutor()

        self.executor_gearworker.start()
        self.build_worker.start()

        self.log.debug("Starting command processor")
        self.command_socket.start()
@@ -3219,23 +3195,14 @@ class ExecutorServer(BaseMergeServer):
    def register_work(self):
        if self._running:
            self.accepting_work = True
            for function in self._getExecuteFunctionNames():
                self.executor_gearworker.gearman.registerFunction(function)
            # TODO(jeblair): Update geard to send a noop after
            # registering for a job which is in the queue, then remove
            # this API violation.
            self.executor_gearworker.gearman._sendGrabJobUniq()
            self.build_loop_wake_event.set()

    def unregister_work(self):
        self.accepting_work = False
        for function in self._getExecuteFunctionNames():
            self.executor_gearworker.gearman.unRegisterFunction(function)

    def stop(self):
        self.log.debug("Stopping")
        self.component_info.state = self.component_info.STOPPED
        # Use the BaseMergeServer's stop method to disconnect from ZooKeeper.
        super().stop()
        self.connections.stop()
        self.disk_accountant.stop()
        # The governor can change function registration, so make sure
@@ -3245,7 +3212,6 @@ class ExecutorServer(BaseMergeServer):
        # Stop accepting new jobs
        if self.merger_gearworker is not None:
            self.merger_gearworker.gearman.setFunctions([])
        self.executor_gearworker.gearman.setFunctions([])
        # Tell the executor worker to abort any jobs it just accepted,
        # and grab the list of currently running job workers.
        with self.run_lock:
@@ -3276,7 +3242,8 @@ class ExecutorServer(BaseMergeServer):

        # All job results should have been sent by now, shutdown the
        # gearman workers.
        self.executor_gearworker.stop()
        self.build_loop_wake_event.set()
        self.build_worker.join()

        if self.process_worker is not None:
            self.process_worker.shutdown()
@@ -3287,6 +3254,10 @@ class ExecutorServer(BaseMergeServer):
            self.statsd.gauge(base_key + '.pct_used_ram', 0)
            self.statsd.gauge(base_key + '.running_builds', 0)

        # Use the BaseMergeServer's stop method to disconnect from
        # ZooKeeper. We do this as one of the last steps to ensure
        # that all ZK related components can be stopped first.
        super().stop()
        self.stop_repl()
        self.log.debug("Stopped")
@@ -3296,7 +3267,8 @@ class ExecutorServer(BaseMergeServer):
        update_thread.join()
        if self.process_merge_jobs:
            super().join()
        self.executor_gearworker.join()
        self.build_loop_wake_event.set()
        self.build_worker.join()
        self.command_thread.join()

    def pause(self):
@@ -3440,20 +3412,110 @@ class ExecutorServer(BaseMergeServer):
            log.error(msg)
            raise Exception(msg)

    def executeJob(self, job):
        args = json.loads(job.arguments)
        zuul_event_id = args.get('zuul_event_id')
    def executeJob(self, build_request, params):
        zuul_event_id = params['zuul_event_id']
        log = get_annotated_logger(self.log, zuul_event_id)
        log.debug("Got %s job: %s", job.name, job.unique)
        log.debug(
            "Got %s job: %s",
            params["zuul"]["job"],
            build_request.uuid,
        )
        if self.statsd:
            base_key = 'zuul.executor.{hostname}'
            self.statsd.incr(base_key + '.builds')
        self.job_workers[job.unique] = self._job_class(self, job)
        self.job_workers[build_request.uuid] = self._job_class(
            self, build_request, params
        )
        # Run manageLoad before starting the thread mostly for the
        # benefit of the unit tests to make the calculation of the
        # number of starting jobs more deterministic.
        self.manageLoad()
        self.job_workers[job.unique].run()
        self.job_workers[build_request.uuid].run()
    def _handleBuildEvent(self, build_request, build_event):
        # TODO (felix): Would it harm to simply keep the cancel/resume node? If
        # not we could avoid this ZK update. The cancel request can anyway
        # only be fulfilled by the executor that executes the job. So, if
        # that executor died, no other can pick up the request.
        if build_event == BuildRequestEvent.CANCELED:
            self.executor_api.fulfillCancel(build_request)
            self.stopJob(build_request)
        elif build_event == BuildRequestEvent.RESUMED:
            self.executor_api.fulfillResume(build_request)
            self.resumeJob(build_request)
        elif build_event == BuildRequestEvent.DELETED:
            self.stopJob(build_request)
    def runBuildWorker(self):
        while self._running:
            self.build_loop_wake_event.wait()
            self.build_loop_wake_event.clear()
            for build_request in self.executor_api.next():
                # Check the sensors again as they might have changed in the
                # meantime. E.g. the last build started within the next()
                # generator could have fulfilled the StartingBuildSensor.
                if not self.accepting_work:
                    break
                if not self._running:
                    break

                try:
                    self._runBuildWorker(build_request)
                except Exception:
                    log = get_annotated_logger(
                        self.log, event=None, build=build_request.uuid
                    )
                    log.exception("Exception while running job")
                    result = {
                        "result": "ERROR",
                        "exception": traceback.format_exc(),
                    }
                    self.completeBuild(build_request, result)
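The build loop above follows the usual wake-event pattern: the ZooKeeper callback registered in the constructor only sets an event, and this worker thread drains pending requests when woken. A generic, self-contained sketch of the pattern (names illustrative, not from this change):

    import threading

    wake_event = threading.Event()
    running = True

    def on_new_build_request():
        # Invoked from the watch/callback thread; it must stay cheap,
        # so it only wakes the worker instead of doing work itself.
        wake_event.set()

    def build_worker(drain):
        # drain() yields pending requests, analogous to ExecutorApi.next().
        while running:
            wake_event.wait()
            wake_event.clear()
            for request in drain():
                print("would process", request)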
    def _runBuildWorker(self, build_request: BuildRequest):
        log = get_annotated_logger(
            self.log, event=None, build=build_request.uuid
        )
        if not self.executor_api.lock(build_request, blocking=False):
            return

        build_request.state = BuildRequest.RUNNING
        params = build_request.params
        build_request.params = None
        # Directly update the build in ZooKeeper, so we don't
        # loop over and try to lock it again and again.
        self.executor_api.update(build_request)
        log.debug("Next executed job: %s", build_request)
        self.executeJob(build_request, params)
    def runCleanupWorker(self):
        while self._running:
            try:
                self.cleanup_election.run(self._runCleanupWorker)
            except Exception:
                self.log.exception("Exception in cleanup worker")

    def _runCleanupWorker(self):
        while self._running:
            for build_request in self.executor_api.lostBuildRequests():
                try:
                    result = {"result": "CANCELED"}
                    self.completeBuild(build_request, result)
                except BadVersionError:
                    # There could be a race condition:
                    # The build is found by lost_builds in state RUNNING
                    # but gets completed/unlocked before the is_locked()
                    # check. Since we use the znode version, the update
                    # will fail in this case and we can simply ignore the
                    # exception.
                    pass
            if not self._running:
                return
            # TODO (felix): It should be enough to execute the cleanup every
            # 60 minutes. Find a proper way to do that. Maybe we could combine
            # this with other cleanups in the scheduler and use APScheduler for
            # proper scheduling.
            time.sleep(5)

    def run_governor(self):
        while not self.governor_stop_event.wait(10):
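The BadVersionError handling above leans on ZooKeeper's conditional set: an update only succeeds if the znode version still matches the one we read. A minimal standalone sketch of that guard using kazoo (connection string and path here are illustrative, not from this change):

    from kazoo.client import KazooClient
    from kazoo.exceptions import BadVersionError

    zk = KazooClient(hosts="localhost:2181")  # hypothetical connection
    zk.start()
    data, stat = zk.get("/some/build/request")  # payload plus znode version
    try:
        # Only succeeds if nobody updated the znode since we read it.
        zk.set("/some/build/request", b"new payload", version=stat.version)
    except BadVersionError:
        # Someone else completed/unlocked the request first; ignore.
        pass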
@@ -3499,32 +3561,28 @@ class ExecutorServer(BaseMergeServer):

    def finishJob(self, unique):
        del(self.job_workers[unique])
        self.log.debug(
            "Finishing Job: %s, queue(%d): %s",
            unique,
            len(self.job_workers),
            self.job_workers,
        )

    def stopJobDiskFull(self, jobdir):
        unique = os.path.basename(jobdir)
        self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)

    def resumeJob(self, job):
        try:
            args = json.loads(job.arguments)
            zuul_event_id = args.get('zuul_event_id')
            log = get_annotated_logger(self.log, zuul_event_id)
            log.debug("Resume job with arguments: %s", args)
            unique = args['uuid']
            self.resumeJobByUnique(unique, zuul_event_id=zuul_event_id)
        finally:
            job.sendWorkComplete()
    def resumeJob(self, build_request):
        log = get_annotated_logger(self.log, build_request.event_id)
        log.debug("Resume job")
        self.resumeJobByUnique(
            build_request.uuid, build_request.event_id
        )

    def stopJob(self, job):
        try:
            args = json.loads(job.arguments)
            zuul_event_id = args.get('zuul_event_id')
            log = get_annotated_logger(self.log, zuul_event_id)
            log.debug("Stop job with arguments: %s", args)
            unique = args['uuid']
            self.stopJobByUnique(unique, zuul_event_id=zuul_event_id)
        finally:
            job.sendWorkComplete()
    def stopJob(self, build_request):
        log = get_annotated_logger(self.log, build_request.event_id)
        log.debug("Stop job")
        self.stopJobByUnique(build_request.uuid, build_request.event_id)

    def resumeJobByUnique(self, unique, zuul_event_id=None):
        log = get_annotated_logger(self.log, zuul_event_id)
@@ -3548,62 +3606,57 @@ class ExecutorServer(BaseMergeServer):
        except Exception:
            log.exception("Exception sending stop command to worker:")

    def startBuild(self, job: gear.TextJob, data: Dict) -> None:
        # Mark the gearman job as started, as we are still using it for the
        # actual job execution. The data, however, will be passed to the
        # scheduler via the event queues in ZooKeeper.
        job.sendWorkData(json.dumps(data))

    def startBuild(self, build_request, data):
        # TODO (felix): Once the builds are stored in ZooKeeper, we can store
        # the start_time directly on the build. But for now we have to use the
        # data dict for that.
        data["start_time"] = time.time()

        params = json.loads(job.arguments)
        tenant_name = params["zuul"]["tenant"]
        pipeline_name = params["zuul"]["pipeline"]
        event = BuildStartedEvent(build_request.uuid, data)
        self.result_events[build_request.tenant_name][
            build_request.pipeline_name].put(event)

        event = BuildStartedEvent(job.unique, data)
        self.result_events[tenant_name][pipeline_name].put(event)
    def updateBuildStatus(self, build_request, data):
        event = BuildStatusEvent(build_request.uuid, data)
        self.result_events[build_request.tenant_name][
            build_request.pipeline_name].put(event)

    def updateBuildStatus(self, job: gear.TextJob, data: Dict) -> None:
        job.sendWorkData(json.dumps(data))
    def pauseBuild(self, build_request, data):
        build_request.state = BuildRequest.PAUSED
        try:
            self.executor_api.update(build_request)
        except BuildRequestNotFound as e:
            self.log.warning("Could not pause build: %s", str(e))
            return

        params = json.loads(job.arguments)
        tenant_name = params["zuul"]["tenant"]
        pipeline_name = params["zuul"]["pipeline"]
        event = BuildPausedEvent(build_request.uuid, data)
        self.result_events[build_request.tenant_name][
            build_request.pipeline_name].put(event)

        event = BuildStatusEvent(job.unique, data)
        self.result_events[tenant_name][pipeline_name].put(event)
    def pauseBuild(self, job: gear.TextJob, data: Dict) -> None:
        # Mark the gearman job as paused, as we are still using it for the
        # actual job execution. The data, however, will be passed to the
        # scheduler via the event queues in ZooKeeper.
        job.sendWorkData(json.dumps(data))

        params = json.loads(job.arguments)
        tenant_name = params["zuul"]["tenant"]
        pipeline_name = params["zuul"]["pipeline"]

        event = BuildPausedEvent(job.unique, data)
        self.result_events[tenant_name][pipeline_name].put(event)

    def completeBuild(self, job: gear.TextJob, result: Dict) -> None:
        # Mark the gearman job as complete, as we are still using it for the
        # actual job execution. The result, however, will be passed to the
        # scheduler via the event queues in ZooKeeper.
        # TODO (felix): Remove the data from the complete() call
        job.sendWorkComplete(json.dumps(result))
    def resumeBuild(self, build_request):
        build_request.state = BuildRequest.RUNNING
        try:
            self.executor_api.update(build_request)
        except BuildRequestNotFound as e:
            self.log.warning("Could not resume build: %s", str(e))
            return

    def completeBuild(self, build_request, result):
        # TODO (felix): Once the builds are stored in ZooKeeper, we can store
        # the end_time directly on the build. But for now we have to use the
        # result dict for that.
        result["end_time"] = time.time()

        params = json.loads(job.arguments)
        tenant_name = params["zuul"]["tenant"]
        pipeline_name = params["zuul"]["pipeline"]
        build_request.state = BuildRequest.COMPLETED
        try:
            self.executor_api.update(build_request)
        except BuildRequestNotFound as e:
            self.log.warning("Could not complete build: %s", str(e))
            return

        event = BuildCompletedEvent(job.unique, result)
        self.result_events[tenant_name][pipeline_name].put(event)
        # Unlock the build request
        self.executor_api.unlock(build_request)

        event = BuildCompletedEvent(build_request.uuid, result)
        self.result_events[build_request.tenant_name][
            build_request.pipeline_name].put(event)
@@ -19,7 +19,6 @@ import copy
import json
import logging
import os
from itertools import chain
from functools import total_ordering

import re2
@@ -1867,8 +1866,8 @@ class Job(ConfigObject):
        project_canonical_names = set()
        project_canonical_names.update(self.required_projects.keys())
        project_canonical_names.update(self._projectsFromPlaybooks(
            chain(self.pre_run, [self.run[0]], self.post_run,
                  self.cleanup_run), with_implicit=True))
            itertools.chain(self.pre_run, [self.run[0]], self.post_run,
                            self.cleanup_run), with_implicit=True))

        projects = list()
        for project_canonical_name in project_canonical_names:
@@ -2061,7 +2060,7 @@ class BuildRequest:
    ALL_STATES = (REQUESTED, HOLD, RUNNING, PAUSED, COMPLETED)

    def __init__(self, uuid, state, precedence, params, zone,
                 tenant_name, pipeline_name):
                 tenant_name, pipeline_name, event_id):
        self.uuid = uuid
        self.state = state
        self.precedence = precedence
@@ -2069,6 +2068,7 @@ class BuildRequest:
        self.zone = zone
        self.tenant_name = tenant_name
        self.pipeline_name = pipeline_name
        self.event_id = event_id

        # ZK related data
        self.path = None
@@ -2084,6 +2084,7 @@ class BuildRequest:
            "zone": self.zone,
            "tenant_name": self.tenant_name,
            "pipeline_name": self.pipeline_name,
            "event_id": self.event_id,
        }

    @classmethod
@@ -2096,6 +2097,7 @@ class BuildRequest:
            data["zone"],
            data["tenant_name"],
            data["pipeline_name"],
            data["event_id"],
        )

        return build_request
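With event_id carried in the dict form, a build request can be round-tripped through serialization. A small sketch; it assumes toDict/fromDict also carry the uuid, state, precedence and params fields that this hunk does not show, and that the dict is JSON-encoded for the znode payload:

    import json

    request = BuildRequest(
        uuid="abc123", state=BuildRequest.REQUESTED, precedence=200,
        params={}, zone=None, tenant_name="tenant", pipeline_name="check",
        event_id="ev-1",
    )
    payload = json.dumps(request.toDict()).encode("utf-8")  # stored in ZK
    restored = BuildRequest.fromDict(json.loads(payload))
    assert restored.event_id == "ev-1"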
@@ -28,9 +28,7 @@ import urllib

from kazoo.exceptions import NotEmptyError

from zuul import configloader
from zuul import model
from zuul import exceptions
from zuul import configloader, exceptions
from zuul import version as zuul_version
from zuul import rpclistener
from zuul.lib import commandsocket
@@ -53,6 +51,7 @@ from zuul.model import (
    BuildPausedEvent,
    BuildStartedEvent,
    BuildStatusEvent,
    Change,
    ChangeManagementEvent,
    DequeueEvent,
    EnqueueEvent,
@@ -65,6 +64,7 @@ from zuul.model import (
    SmartReconfigureEvent,
    Tenant,
    TenantReconfigureEvent,
    TimeDataBase,
    UnparsedAbideConfig,
)
from zuul.zk import ZooKeeperClient
@@ -112,6 +112,7 @@ class Scheduler(threading.Thread):
    _stats_interval = 30
    _cleanup_interval = 60 * 60
    _merger_client_class = MergeClient
    _executor_client_class = ExecutorClient

    # Number of seconds past node expiration a hold request will remain
    EXPIRED_HOLD_REQUEST_TTL = 24 * 60 * 60
@@ -190,7 +191,7 @@ class Scheduler(threading.Thread):

        if not testonly:
            time_dir = self._get_time_database_dir()
            self.time_database = model.TimeDataBase(time_dir)
            self.time_database = TimeDataBase(time_dir)

        command_socket = get_default(
            self.config, 'scheduler', 'command_socket',
@@ -219,7 +220,7 @@ class Scheduler(threading.Thread):
            default_version=default_ansible_version)

        if not testonly:
            self.executor = ExecutorClient(self.config, self)
            self.executor = self._executor_client_class(self.config, self)
            self.merger = self._merger_client_class(self.config, self)
            self.nodepool = nodepool.Nodepool(
                self.zk_client, self.hostname, self.statsd, self)
@@ -1180,7 +1181,7 @@ class Scheduler(threading.Thread):
            for item in shared_queue.queue:
                if item.change.project != change.project:
                    continue
                if (isinstance(item.change, model.Change) and
                if (isinstance(item.change, Change) and
                        item.change.number == change.number and
                        item.change.patchset == change.patchset) or\
                        (item.change.ref == change.ref):
@@ -1553,10 +1554,10 @@ class Scheduler(threading.Thread):
            return

        build.start_time = event.data["start_time"]
        # TODO (felix): Remove this once the builds are executed via ZooKeeper.
        # It's currently necessary to set the correct private attribute on the
        # build for the gearman worker.
        self.executor.setWorkerInfo(build, event.data)
        # Update information about worker
        if event.data:
            # Noop builds don't provide any event data
            build.worker.updateFromData(event.data)

        log = get_annotated_logger(self.log, build.zuul_event_id)
        if build.build_set is not build.build_set.item.current_build_set:
@@ -1792,7 +1793,7 @@ class Scheduler(threading.Thread):
        # If the build was canceled, we did actively cancel the job so
        # don't overwrite the result and don't retry.
        if build.canceled:
            result = build.result
            result = build.result or "CANCELED"
            build.retry = False

        build.end_time = event_result["end_time"]
@@ -1821,7 +1822,7 @@ class Scheduler(threading.Thread):

        # The test suite expects the build to be removed from the
        # internal dict after it's added to the report queue.
        del self.executor.builds[build.uuid]
        self.executor.removeBuild(build)

        if build.build_set is not build.build_set.item.current_build_set:
            log.debug("Build %s is not in the current build set", build)
@@ -213,7 +213,7 @@ class ExecutorApi(ZooKeeperSimpleBase):
        yield from self.inState(BuildRequest.REQUESTED)

    def submit(self, uuid, tenant_name, pipeline_name, params, zone,
               precedence=200):
               event_id, precedence=200):
        log = get_annotated_logger(self.log, event=None, build=uuid)

        path = "/".join([self._getZoneRoot(zone), uuid])
@@ -226,6 +226,7 @@ class ExecutorApi(ZooKeeperSimpleBase):
            zone,
            tenant_name,
            pipeline_name,
            event_id,
        )

        log.debug("Submitting build request to ZooKeeper %s", build_request)
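Callers of ExecutorApi.submit now pass the event id positionally after the zone, as the updated tests at the top of this change do; for example:

    executor_api = ExecutorApi(zk_client)
    path = executor_api.submit("A", "tenant", "pipeline", {}, None, '1')
    build_request = executor_api.get(path)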