Execute merge jobs via ZooKeeper

This is the second part of I767c0b4c5473b2948487c3ae5bbc612c25a2a24a.
It uses the MergerApi.

Note: since we no longer have a central gearman server where we can
record all of the merge jobs, some tests now consult the merger API
to get the list of merge jobs that were submitted by a given scheduler.
This should generally be equivalent, but it is something to keep in mind
as we add multiple schedulers.

Change-Id: I1c694bcdc967283f1b1a4821df7700d93240690a
Felix Edel authored 2021-07-21 14:23:43 +02:00; committed by James E. Blair
parent 97152216b2
commit 8038f9f75c
15 changed files with 490 additions and 331 deletions
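
As a rough sketch of the pattern described in the note above (illustrative
only, not part of this change): without a central gearman server, a test
aggregates the merge jobs recorded by each scheduler's merger API and filters
them by job type. The helper below assumes merger_api.history maps request
uuids to MergeRequest objects, as the TestingMergerApi used by the test
helpers in this diff does.

def count_merge_jobs(scheds, job_type):
    # Collect the per-scheduler merger API histories into one dict,
    # mirroring the merge_job_history helper added to the test base class
    # below.
    history = {}
    for app in scheds:
        history.update(app.sched.merger.merger_api.history)
    # Count the recorded requests of the given type, e.g.
    # count_merge_jobs(self.scheds, MergeRequest.CAT) inside a ZuulTestCase.
    return sum(1 for job in history.values() if job.job_type == job_type)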

View File

@ -3218,7 +3218,7 @@ class TestingMergerApi(HoldableMergerApi):
self.update(merge_request)
def queued(self):
return self.inState(
return self._test_getMergeJobsInState(
MergeRequest.REQUESTED, MergeRequest.HOLD
)
@ -3228,16 +3228,25 @@ class TestingMergerApi(HoldableMergerApi):
class RecordingMergeClient(zuul.merger.client.MergeClient):
_merger_api_class = HoldableMergerApi
def __init__(self, config, sched):
super().__init__(config, sched)
self.history = {}
def submitJob(self, name, data, build_set,
precedence=PRECEDENCE_NORMAL, event=None):
self.history.setdefault(name, [])
self.history[name].append((data, build_set))
def submitJob(
self,
job_type,
data,
build_set,
precedence=PRECEDENCE_NORMAL,
needs_result=False,
event=None,
):
self.history.setdefault(job_type, [])
self.history[job_type].append((data, build_set))
return super().submitJob(
name, data, build_set, precedence, event=event)
job_type, data, build_set, precedence, needs_result, event=event)
class HoldableExecutorApi(ExecutorApi):
@ -4582,6 +4591,7 @@ class ZuulTestCase(BaseTestCase):
executor_connections.configure(self.config,
source_only=True)
self.executor_api = TestingExecutorApi(self.zk_client)
self.merger_api = TestingMergerApi(self.zk_client)
self.executor_server = RecordingExecutorServer(
self.config,
executor_connections,
@ -4961,7 +4971,6 @@ class ZuulTestCase(BaseTestCase):
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.scheds.execute(lambda app: app.sched.executor.stop())
self.scheds.execute(lambda app: app.sched.merger.stop())
if self.merge_server:
self.merge_server.stop()
self.merge_server.join()
@ -5102,6 +5111,30 @@ class ZuulTestCase(BaseTestCase):
for app in self.scheds:
app.sched.executor.executor_api.hold_in_queue = hold_jobs_in_queue
@property
def hold_merge_jobs_in_queue(self):
return self.merger_api.hold_in_queue
@hold_merge_jobs_in_queue.setter
def hold_merge_jobs_in_queue(self, hold_in_queue):
"""Helper method to set hold_in_queue on all involved Merger APIs"""
self.merger_api.hold_in_queue = hold_in_queue
for app in self.scheds:
app.sched.merger.merger_api.hold_in_queue = hold_in_queue
@property
def merge_job_history(self):
history = {}
for app in self.scheds:
history.update(app.sched.merger.merger_api.history)
return history
@merge_job_history.deleter
def merge_job_history(self):
for app in self.scheds:
app.sched.merger.merger_api.history.clear()
def __haveAllBuildsReported(self, matcher) -> bool:
for app in self.scheds.filter(matcher):
executor_client = app.sched.executor
@ -5183,24 +5216,11 @@ class ZuulTestCase(BaseTestCase):
return True
def __areAllMergeJobsWaiting(self, matcher) -> bool:
for app in self.scheds.filter(matcher):
merge_client = app.sched.merger
for client_job in list(merge_client.jobs):
if not client_job.handle:
self.log.debug("%s has no handle" % client_job)
return False
server_job = self.gearman_server.jobs.get(client_job.handle)
if not server_job:
self.log.debug("%s is not known to the gearman server" %
client_job)
return False
if not hasattr(server_job, 'waiting'):
self.log.debug("%s is being enqueued" % server_job)
return False
if server_job.waiting:
self.log.debug("%s is waiting" % server_job)
continue
self.log.debug("%s is not waiting" % server_job)
# Look up the queued merge jobs directly from ZooKeeper
queued_merge_jobs = list(self.merger_api.all())
# Always ignore merge jobs which are on hold
for job in queued_merge_jobs:
if job.state != MergeRequest.HOLD:
return False
return True
@ -5308,11 +5328,6 @@ class ZuulTestCase(BaseTestCase):
logger("All builds waiting: %s", all_builds_waiting)
logger("All requests completed: %s", all_node_requests_completed)
logger("All event queues empty: %s", all_event_queues_empty)
for app in self.scheds.filter(matcher):
logger(
"[Sched: %s] Merge client jobs: %s",
app.sched, app.sched.merger.jobs
)
def waitForPoll(self, poller, timeout=30):
self.log.debug("Wait for poll on %s", poller)

View File

@ -901,7 +901,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(A.patchsets[-1]["approvals"][0]["value"], "1")
def test_cycle_merge_conflict(self):
self.gearman_server.hold_merge_jobs_in_queue = True
self.hold_merge_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
@ -927,8 +927,8 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.waitUntilSettled()
self.gearman_server.hold_merge_jobs_in_queue = False
self.gearman_server.release()
self.hold_merge_jobs_in_queue = False
self.merger_api.release()
self.waitUntilSettled()
self.assertEqual(A.reported, 0)

View File

@ -22,7 +22,7 @@ from zuul import model
from zuul.configloader import AuthorizationRuleParser, safe_load_yaml
from tests.base import ZuulTestCase
from zuul.model import SourceContext
from zuul.model import MergeRequest, SourceContext
class TenantParserTestCase(ZuulTestCase):
@ -326,8 +326,8 @@ class TestTenantGroups4(TenantParserTestCase):
tpc.load_classes)
# Check that only one merger:cat job was requested
# org/project1 and org/project2 have an empty load_classes
cat_jobs = [job for job in self.gearman_server.jobs_history
if job.name == b'merger:cat']
cat_jobs = [job for job in self.merge_job_history.values()
if job.job_type == MergeRequest.CAT]
self.assertEqual(1, len(cat_jobs))
old_layout = tenant.layout
@ -361,8 +361,8 @@ class TestTenantGroups5(TenantParserTestCase):
tpc.load_classes)
# Check that only one merger:cat job was requested
# org/project1 and org/project2 have an empty load_classes
cat_jobs = [job for job in self.gearman_server.jobs_history
if job.name == b'merger:cat']
cat_jobs = [job for job in self.merge_job_history.values()
if job.job_type == MergeRequest.CAT]
self.assertEqual(1, len(cat_jobs))
@ -628,7 +628,7 @@ class TestUnparsedConfigCache(ZuulTestCase):
# Clear the unparsed branch cache so all projects (except for
# org/project2) are retrieved from the cache in Zookeeper.
sched.abide.unparsed_project_branch_cache.clear()
self.gearman_server.jobs_history.clear()
del self.merge_job_history
# Create a tenant reconfiguration event with a known ltime that is
# smaller than the ltime of the items in the cache.
@ -640,8 +640,8 @@ class TestUnparsedConfigCache(ZuulTestCase):
# As the cache should be valid (cache ltime of org/project2 newer than
# event ltime) we don't expect any cat jobs.
cat_jobs = [job for job in self.gearman_server.jobs_history
if job.name == b"merger:cat"]
cat_jobs = [job for job in self.merge_job_history.values()
if job.job_type == MergeRequest.CAT]
self.assertEqual(len(cat_jobs), 0)
# Set canary value so we can detect if the configloader used
@ -649,7 +649,7 @@ class TestUnparsedConfigCache(ZuulTestCase):
common_cache = cache.getFilesCache("review.example.com/common-config",
"master")
common_cache.setValidFor({"CANARY"}, set(), common_cache.ltime)
self.gearman_server.jobs_history.clear()
del self.merge_job_history
# Create a tenant reconfiguration event with a known ltime that is
# smaller than the ltime of the items in the cache.
@ -666,8 +666,8 @@ class TestUnparsedConfigCache(ZuulTestCase):
# As the cache should be valid (cache ltime of org/project2 newer than
# event ltime) we don't expect any cat jobs.
cat_jobs = [job for job in self.gearman_server.jobs_history
if job.name == b"merger:cat"]
cat_jobs = [job for job in self.merge_job_history.values()
if job.job_type == MergeRequest.CAT]
self.assertEqual(len(cat_jobs), 0)
sched.apsched.start()

View File

@ -30,6 +30,7 @@ from zuul.driver.github.githubconnection import GithubShaCache
from zuul.zk.layout import LayoutState
import zuul.rpcclient
from zuul.lib import strings
from zuul.model import MergeRequest
from tests.base import (AnsibleZuulTestCase, BaseTestCase,
ZuulGithubAppTestCase, ZuulTestCase,
@ -976,9 +977,9 @@ class TestGithubDriver(ZuulTestCase):
self.waitUntilSettled()
if expected_cat_jobs is not None:
# clear the gearman jobs history so we can count the cat jobs
# clear the merge jobs history so we can count the cat jobs
# issued during reconfiguration
self.gearman_server.jobs_history.clear()
del self.merge_job_history
self.fake_github.emitEvent(pevent)
self.waitUntilSettled()
@ -995,8 +996,8 @@ class TestGithubDriver(ZuulTestCase):
if expected_cat_jobs is not None:
# Check the expected number of cat jobs here as the (empty) config
# of org/project should be cached.
cat_jobs = set([job for job in self.gearman_server.jobs_history
if job.name == b'merger:cat'])
cat_jobs = [job for job in self.merge_job_history.values()
if job.job_type == MergeRequest.CAT]
self.assertEqual(expected_cat_jobs, len(cat_jobs), cat_jobs)
@simple_layout('layouts/basic-github.yaml', driver='github')
@ -1587,9 +1588,9 @@ class TestGithubUnprotectedBranches(ZuulTestCase):
self.waitUntilSettled()
if expected_cat_jobs is not None:
# clear the gearman jobs history so we can count the cat jobs
# clear the merge jobs history so we can count the cat jobs
# issued during reconfiguration
self.gearman_server.jobs_history.clear()
del self.merge_job_history
self.fake_github.emitEvent(pevent)
self.waitUntilSettled()
@ -1606,8 +1607,8 @@ class TestGithubUnprotectedBranches(ZuulTestCase):
if expected_cat_jobs is not None:
# Check the expected number of cat jobs here as the (empty) config
# of org/project should be cached.
cat_jobs = set([job for job in self.gearman_server.jobs_history
if job.name == b'merger:cat'])
cat_jobs = [job for job in self.merge_job_history.values()
if job.job_type == MergeRequest.CAT]
self.assertEqual(expected_cat_jobs, len(cat_jobs), cat_jobs)
def test_push_event_reconfigure_complex_branch(self):

View File

@ -21,8 +21,12 @@ import git
import testtools
from zuul.merger.merger import MergerTree, Repo
from zuul.zk.merger import MergerApi
import zuul.model
from tests.base import BaseTestCase, ZuulTestCase, FIXTURE_DIR, simple_layout
from zuul.model import MergeRequest
from tests.base import (
BaseTestCase, ZuulTestCase, FIXTURE_DIR, simple_layout, iterate_timeout
)
class TestMergerRepo(ZuulTestCase):
@ -896,9 +900,10 @@ class TestMerger(ZuulTestCase):
def test_stale_index_lock_cleanup(self):
# Stop the running executor's merger. We needed it running to merge
# things during test bootstrapping but now it is just in the way.
self.executor_server.merger_gearworker.stop()
self.executor_server.merger_gearworker.join()
# Start the merger and do a merge to populate the repo on disk
self.executor_server._merger_running = False
self.executor_server.merger_loop_wake_event.set()
self.executor_server.merger_thread.join()
# Start a dedicated merger and do a merge to populate the repo on disk
self._startMerger()
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
@ -1004,6 +1009,62 @@ class TestMerger(ZuulTestCase):
self.assertEqual(upstream_repo.commit(change_ref).hexsha,
zuul_repo.commit('HEAD').hexsha)
def test_lost_merge_requests(self):
# Test the cleanupLostMergeRequests method of the merger
# client. This is normally called by apsched in the
# scheduler. To exercise it, we need to produce a fake lost
# merge request and then invoke it ourselves.
# Stop the actual merger which will see this as garbage:
self.executor_server._merger_running = False
self.executor_server.merger_loop_wake_event.set()
self.executor_server.merger_thread.join()
# Create a fake lost merge request. This is based on
# test_lost_merge_requests in test_zk.
merger_api = MergerApi(self.zk_client)
payload = {'merge': 'test'}
merger_api.submit(
uuid='B',
job_type=MergeRequest.MERGE,
build_set_uuid='BB',
tenant_name='tenant',
pipeline_name='check',
params=payload,
event_id='1',
)
b = merger_api.get(f"{merger_api.MERGE_REQUEST_ROOT}/B")
b.state = MergeRequest.RUNNING
merger_api.update(b)
# Wait until the latest state transition is reflected in the Merger
# API's cache. Using a DataWatch for this purpose could lead to race
# conditions depending on which DataWatch is executed first. The
# DataWatch might be triggered for the correct event, but the cache
# might still be outdated as the DataWatch that updates the cache
# itself wasn't triggered yet.
cache = merger_api._cached_merge_requests
for _ in iterate_timeout(30, "cache to be up-to-date"):
if (cache and cache[b.path].state == MergeRequest.RUNNING):
break
# The lostMergeRequests method should only return merges which are running
# but not locked by any merger, in this case merge request b
lost_merge_requests = list(merger_api.lostMergeRequests())
self.assertEqual(1, len(lost_merge_requests))
self.assertEqual(b.path, lost_merge_requests[0].path)
# Exercise the cleanup code
merger_client = self.scheds.first.sched.merger
merger_client.cleanupLostMergeRequests()
lost_merge_requests = list(merger_api.lostMergeRequests())
self.assertEqual(0, len(lost_merge_requests))
class TestMergerTree(BaseTestCase):

View File

@ -550,12 +550,13 @@ class TestScheduler(ZuulTestCase):
@simple_layout('layouts/branch-deletion.yaml')
def test_branch_deletion(self):
"Test the correct variant of a job runs on a branch"
# Start a secondary merger so this test exercises branch
# deletion on both a merger and a separate executor.
self.executor_server._merger_running = False
self.executor_server.merger_loop_wake_event.set()
self.executor_server.merger_thread.join()
self._startMerger()
merger_gear = self.executor_server.merger_gearworker.gearman
for f in list(merger_gear.functions.keys()):
f = f.decode('utf8')
if f.startswith('merger:'):
merger_gear.unRegisterFunction(f)
self.create_branch('org/project', 'stable')
self.fake_gerrit.addEvent(
@ -4087,13 +4088,9 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(len(self.builds), 2)
merge_count_project1 = 0
for job in self.gearman_server.jobs_history:
if job.name == b'merger:refstate':
args = job.arguments
if isinstance(args, bytes):
args = args.decode('utf-8')
args = json.loads(args)
if args["items"][0]["project"] == "org/project1":
for job in self.merge_job_history.values():
if job.job_type == zuul.model.MergeRequest.REF_STATE:
if job.payload["items"][0]["project"] == "org/project1":
merge_count_project1 += 1
self.assertEquals(merge_count_project1, 0,
"project1 shouldn't have any refstate call")
@ -6361,18 +6358,18 @@ For CI problems and help debugging, contact ci@example.org"""
def test_pending_merge_in_reconfig(self):
# Test that if we are waiting for an outstanding merge on
# reconfiguration that we continue to do so.
self.gearman_server.hold_merge_jobs_in_queue = True
self.hold_merge_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
A.setMerged()
self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
self.waitUntilSettled()
self.assertEqual(len(self.scheds.first.sched.merger.jobs), 1)
gearJob = next(iter(self.scheds.first.sched.merger.jobs))
self.assertEqual(gearJob.complete, False)
jobs = list(self.merger_api.all())
self.assertEqual(len(jobs), 1)
self.assertEqual(jobs[0].state, zuul.model.MergeRequest.HOLD)
# Reconfigure while we still have an outstanding merge job
self.gearman_server.hold_merge_jobs_in_queue = False
self.hold_merge_jobs_in_queue = False
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
(trusted, project1) = tenant.getProject('org/project1')
self.scheds.first.sched.reconfigureTenant(
@ -6382,16 +6379,17 @@ For CI problems and help debugging, contact ci@example.org"""
# Verify the merge job is still running and that the item is
# in the pipeline
self.assertEqual(gearJob.complete, False)
self.assertEqual(len(self.scheds.first.sched.merger.jobs), 1)
jobs = list(self.merger_api.all())
self.assertEqual(jobs[0].state, zuul.model.MergeRequest.HOLD)
self.assertEqual(len(jobs), 1)
pipeline = tenant.layout.pipelines['post']
self.assertEqual(len(pipeline.getAllItems()), 1)
self.gearman_server.release()
self.merger_api.release()
self.waitUntilSettled()
self.assertEqual(gearJob.complete, True)
self.assertEqual(len(self.scheds.first.sched.merger.jobs), 0)
jobs = list(self.merger_api.all())
self.assertEqual(len(jobs), 0)
@simple_layout('layouts/parent-matchers.yaml')
def test_parent_matchers(self):
@ -8767,7 +8765,7 @@ class TestSchedulerSmartReconfiguration(ZuulTestCase):
self.newTenantConfig('config/multi-tenant/main-reconfig.yaml')
self.gearman_server.jobs_history.clear()
del self.merge_job_history
self.scheds.execute(
lambda app: app.smartReconfigure(command_socket=command_socket))
@ -8790,8 +8788,8 @@ class TestSchedulerSmartReconfiguration(ZuulTestCase):
# We're only adding two new repos, so we should only need to
# issue 2 cat jobs.
cat_jobs = [job for job in self.gearman_server.jobs_history
if job.name == b"merger:cat"]
cat_jobs = [job for job in self.merge_job_history.values()
if job.job_type == zuul.model.MergeRequest.CAT]
self.assertEqual(len(cat_jobs), 2)
# Ensure that tenant-one has not been reconfigured

View File

@ -58,10 +58,10 @@ class TestScaleOutScheduler(ZuulTestCase):
for app in self.scheds.instances:
if app is self.scheds.first:
self.assertIsNotNone(
app.sched.merger.history.get("merger:cat"))
app.sched.merger.history.get("cat"))
else:
# Make sure the other schedulers did not issue any cat jobs
self.assertIsNone(app.sched.merger.history.get("merger:cat"))
self.assertIsNone(app.sched.merger.history.get("cat"))
self.waitUntilSettled()
self.assertEqual(self.scheds.first.sched.globals.max_hold_expiration,

View File

@ -29,6 +29,8 @@ import git
import paramiko
import zuul.configloader
from zuul.model import MergeRequest
from tests.base import (
AnsibleZuulTestCase,
ZuulTestCase,
@ -2833,7 +2835,7 @@ class TestNonLiveMerges(ZuulTestCase):
# We expect one merge call per live change, plus one call for
# each non-live change with a config update (which is all of them).
self.assertEqual(
len(self.scheds.first.sched.merger.history['merger:merge']), 6)
len(self.scheds.first.sched.merger.history[MergeRequest.MERGE]), 6)
def test_non_live_merges(self):
"""
@ -2857,7 +2859,7 @@ class TestNonLiveMerges(ZuulTestCase):
# We expect one merge call per live change.
self.assertEqual(
len(self.scheds.first.sched.merger.history['merger:merge']), 3)
len(self.scheds.first.sched.merger.history[MergeRequest.MERGE]), 3)
class TestJobContamination(AnsibleZuulTestCase):
@ -3866,7 +3868,7 @@ class TestBrokenMultiTenantConfig(ZuulTestCase):
# the tenant loads with an error, but proceeds.
# Don't run any merge jobs, so we can run them out of order.
self.gearman_server.hold_merge_jobs_in_queue = True
self.hold_merge_jobs_in_queue = True
# Create a first change which modifies the config (and
# therefore will require a merge job).
@ -3886,13 +3888,11 @@ class TestBrokenMultiTenantConfig(ZuulTestCase):
self.waitUntilSettled()
# There should be a merge job for each change.
self.assertEqual(len(self.scheds.first.sched.merger.jobs), 2)
self.assertEqual(len(list(self.merger_api.all())), 2)
jobs = [job for job in self.gearman_server.getQueue()
if job.name.startswith(b'merger:')]
jobs = list(self.merger_api.queued())
# Release the second merge job.
jobs[-1].waiting = False
self.gearman_server.wakeConnections()
self.merger_api.release(jobs[-1])
self.waitUntilSettled()
# At this point we should still be waiting on the first
@ -3900,8 +3900,8 @@ class TestBrokenMultiTenantConfig(ZuulTestCase):
self.assertHistory([])
# Proceed.
self.gearman_server.hold_merge_jobs_in_queue = False
self.gearman_server.release()
self.hold_merge_jobs_in_queue = False
self.merger_api.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='base', result='SUCCESS', changes='1,1 2,1'),

View File

@ -66,7 +66,8 @@ class GitConnection(BaseConnection):
def getChangeFilesUpdated(self, project_name, branch, tosha):
job = self.sched.merger.getFilesChanges(
self.connection_name, project_name, branch, tosha)
self.connection_name, project_name, branch, tosha,
needs_result=True)
self.log.debug("Waiting for fileschanges job %s" % job)
job.wait()
if not job.updated:

View File

@ -44,8 +44,6 @@ from zuul.lib import filecomments
from zuul.lib.keystorage import ZooKeeperKeyStorage
from zuul.lib.varnames import check_varnames
import gear
import zuul.lib.repl
import zuul.merger.merger
import zuul.ansible.logconfig
@ -3016,19 +3014,6 @@ class AnsibleJob(object):
return result, code
class ExecutorMergeWorker(gear.TextWorker):
def __init__(self, executor_server, *args, **kw):
self.zuul_executor_server = executor_server
super(ExecutorMergeWorker, self).__init__(*args, **kw)
def handleNoop(self, packet):
# Wait until the update queue is empty before responding
while self.zuul_executor_server.update_queue.qsize():
time.sleep(1)
super(ExecutorMergeWorker, self).handleNoop(packet)
class ExecutorServer(BaseMergeServer):
log = logging.getLogger("zuul.ExecutorServer")
_ansible_manager_class = AnsibleManager
@ -3242,10 +3227,6 @@ class ExecutorServer(BaseMergeServer):
def accepting_work(self, work):
self.component_info.accepting_work = work
def noop(self, job):
"""A noop gearman job so we can register for statistics."""
job.sendWorkComplete()
def start(self):
# Start merger worker only if we process merge jobs
if self.process_merge_jobs:
@ -3305,9 +3286,6 @@ class ExecutorServer(BaseMergeServer):
# it has stopped.
self.governor_stop_event.set()
self.governor_thread.join()
# Stop accepting new jobs
if self.merger_gearworker is not None:
self.merger_gearworker.gearman.setFunctions([])
# Tell the executor worker to abort any jobs it just accepted,
# and grab the list of currently running job workers.
with self.run_lock:
@ -3337,7 +3315,7 @@ class ExecutorServer(BaseMergeServer):
self.command_socket.stop()
# All job results should have been sent by now, shutdown the
# gearman workers.
# build and merger workers.
self.build_loop_wake_event.set()
self.build_worker.join()

View File

@ -12,193 +12,144 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import threading
from uuid import uuid4
import gear
import zuul.model
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
def getJobData(job):
if not len(job.data):
return {}
d = job.data[-1]
if not d:
return {}
return json.loads(d)
class MergeGearmanClient(gear.Client):
def __init__(self, merge_client):
super(MergeGearmanClient, self).__init__('Zuul Merge Client')
self.__merge_client = merge_client
def handleWorkComplete(self, packet):
job = super(MergeGearmanClient, self).handleWorkComplete(packet)
self.__merge_client.onBuildCompleted(job)
return job
def handleWorkFail(self, packet):
job = super(MergeGearmanClient, self).handleWorkFail(packet)
self.__merge_client.onBuildCompleted(job)
return job
def handleWorkException(self, packet):
job = super(MergeGearmanClient, self).handleWorkException(packet)
self.__merge_client.onBuildCompleted(job)
return job
def handleDisconnect(self, job):
job = super(MergeGearmanClient, self).handleDisconnect(job)
self.__merge_client.onBuildCompleted(job)
class MergeJob(gear.TextJob):
def __init__(self, *args, **kw):
super(MergeJob, self).__init__(*args, **kw)
self.__event = threading.Event()
def setComplete(self):
self.__event.set()
def wait(self, timeout=300):
return self.__event.wait(timeout)
from zuul.model import MergeRequest, PRECEDENCE_HIGH, PRECEDENCE_NORMAL
from zuul.zk.merger import MergerApi
from zuul.zk.exceptions import MergeRequestNotFound
from kazoo.exceptions import BadVersionError
class MergeClient(object):
log = logging.getLogger("zuul.MergeClient")
_merger_api_class = MergerApi
def __init__(self, config, sched):
self.config = config
self.sched = sched
server = self.config.get('gearman', 'server')
port = get_default(self.config, 'gearman', 'port', 4730)
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.log.debug("Connecting to gearman at %s:%s" % (server, port))
self.gearman = MergeGearmanClient(self)
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
keepalive=True, tcp_keepidle=60,
tcp_keepintvl=30, tcp_keepcnt=5)
self.git_timeout = get_default(
self.config, 'merger', 'git_timeout', 300)
self.log.debug("Waiting for gearman")
self.gearman.waitForServer()
self.jobs = set()
self.merger_api = self._merger_api_class(self.sched.zk_client)
def stop(self):
self.gearman.shutdown()
def submitJob(
self,
job_type,
data,
build_set,
precedence=PRECEDENCE_NORMAL,
needs_result=False,
event=None,
):
# We need the tenant, pipeline and queue names to put the merge result
# in the correct queue. The only source for those values in this
# context is the buildset. If no buildset is provided, we can't provide
# a result event. In those cases a user of this function can fall back
# to the return value which provides the result as a future stored in a
# ZooKeeper path.
build_set_uuid = None
tenant_name = None
pipeline_name = None
def areMergesOutstanding(self):
if self.jobs:
return True
return False
if build_set is not None:
build_set_uuid = build_set.uuid
tenant_name = build_set.item.pipeline.tenant.name
pipeline_name = build_set.item.pipeline.name
def submitJob(self, name, data, build_set,
precedence=zuul.model.PRECEDENCE_NORMAL, event=None):
log = get_annotated_logger(self.log, event)
uuid = str(uuid4().hex)
job = MergeJob(name,
json.dumps(data),
unique=uuid)
job.build_set = build_set
log.debug("Submitting job %s with data %s", job, data)
self.jobs.add(job)
self.gearman.submitJob(job, precedence=precedence,
timeout=300)
return job
log = get_annotated_logger(self.log, event)
log.debug("Submitting job %s with data %s", uuid, data)
return self.merger_api.submit(
uuid=uuid,
job_type=job_type,
build_set_uuid=build_set_uuid,
tenant_name=tenant_name,
pipeline_name=pipeline_name,
params=data,
event_id=event.zuul_event_id if event else None,
precedence=precedence,
needs_result=needs_result,
)
def mergeChanges(self, items, build_set, files=None, dirs=None,
repo_state=None, precedence=zuul.model.PRECEDENCE_NORMAL,
repo_state=None, precedence=PRECEDENCE_NORMAL,
branches=None, event=None):
if event is not None:
zuul_event_id = event.zuul_event_id
else:
zuul_event_id = None
data = dict(items=items,
files=files,
dirs=dirs,
repo_state=repo_state,
branches=branches,
zuul_event_id=zuul_event_id)
self.submitJob('merger:merge', data, build_set, precedence,
event=event)
branches=branches)
self.submitJob(
MergeRequest.MERGE, data, build_set, precedence, event=event)
def getRepoState(self, items, build_set,
precedence=zuul.model.PRECEDENCE_NORMAL,
def getRepoState(self, items, build_set, precedence=PRECEDENCE_NORMAL,
branches=None, event=None):
if event is not None:
zuul_event_id = event.zuul_event_id
else:
zuul_event_id = None
data = dict(items=items, branches=branches,
zuul_event_id=zuul_event_id)
self.submitJob('merger:refstate', data, build_set, precedence,
event=event)
data = dict(items=items, branches=branches)
self.submitJob(
MergeRequest.REF_STATE, data, build_set, precedence, event=event)
def getFiles(self, connection_name, project_name, branch, files, dirs=[],
precedence=zuul.model.PRECEDENCE_HIGH, event=None):
if event is not None:
zuul_event_id = event.zuul_event_id
else:
zuul_event_id = None
precedence=PRECEDENCE_HIGH, event=None):
data = dict(connection=connection_name,
project=project_name,
branch=branch,
files=files,
dirs=dirs,
zuul_event_id=zuul_event_id)
job = self.submitJob('merger:cat', data, None, precedence, event=event)
dirs=dirs)
job = self.submitJob(
MergeRequest.CAT,
data,
None,
precedence,
needs_result=True,
event=event,
)
return job
def getFilesChanges(self, connection_name, project_name, branch,
tosha=None, precedence=zuul.model.PRECEDENCE_HIGH,
build_set=None, event=None):
if event is not None:
zuul_event_id = event.zuul_event_id
else:
zuul_event_id = None
tosha=None, precedence=PRECEDENCE_HIGH,
build_set=None, needs_result=False, event=None):
data = dict(connection=connection_name,
project=project_name,
branch=branch,
tosha=tosha,
zuul_event_id=zuul_event_id)
job = self.submitJob('merger:fileschanges', data, build_set,
precedence, event=event)
tosha=tosha)
job = self.submitJob(
MergeRequest.FILES_CHANGES,
data,
build_set,
precedence,
needs_result=needs_result,
event=event,
)
return job
def onBuildCompleted(self, job):
data = getJobData(job)
zuul_event_id = data.get('zuul_event_id')
log = get_annotated_logger(self.log, zuul_event_id)
def cleanupLostMergeRequests(self):
for merge_request in self.merger_api.lostMergeRequests():
try:
self.cleanupLostMergeRequest(merge_request)
except Exception:
self.log.exception("Exception cleaning up lost merge request:")
merged = data.get('merged', False)
job.updated = data.get('updated', False)
commit = data.get('commit')
files = data.get('files', {})
repo_state = data.get('repo_state', {})
item_in_branches = data.get('item_in_branches', [])
job.files = files
log.info("Merge %s complete, merged: %s, updated: %s, "
"commit: %s, branches: %s", job, merged, job.updated, commit,
item_in_branches)
job.setComplete()
if job.build_set:
if job.name == 'merger:fileschanges':
self.sched.onFilesChangesCompleted(job.build_set, files)
else:
self.sched.onMergeCompleted(job.build_set,
merged, job.updated, commit, files,
repo_state, item_in_branches)
# The test suite expects the job to be removed from the
# internal accounting after the wake flag is set.
self.jobs.remove(job)
def cleanupLostMergeRequest(self, merge_request):
merge_request.state = MergeRequest.COMPLETED
try:
self.merger_api.update(merge_request)
# No need to unlock the merge request, as it is by definition unlocked.
# TODO (felix): If we want to optimize ZK requests, we could only
# call the remove() here.
self.merger_api.remove(merge_request)
except MergeRequestNotFound as e:
self.log.warning("Could not complete merge: %s", str(e))
return
except BadVersionError:
# There could be a race condition:
# The merge request is found by lostMergeRequests in
# state RUNNING but gets completed/unlocked before the
# is_locked() check. Since we use the znode version, the
# update will fail in this case and we can simply ignore
# the exception.
return

View File

@ -18,17 +18,22 @@ import os
import socket
import sys
import threading
import time
from abc import ABCMeta
from configparser import ConfigParser
from zuul.zk import ZooKeeperClient
from zuul.lib import commandsocket
from zuul.lib.config import get_default
from zuul.lib.gearworker import ZuulGearWorker
from zuul.lib.logutil import get_annotated_logger
from zuul.merger import merger
from zuul.merger.merger import nullcontext
from zuul.model import (
FilesChangesCompletedEvent, MergeCompletedEvent, MergeRequest
)
from zuul.zk import ZooKeeperClient
from zuul.zk.components import MergerComponent
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.merger import MergerApi
COMMANDS = ['stop', 'pause', 'unpause']
@ -62,6 +67,8 @@ class BaseMergeServer(metaclass=ABCMeta):
connections,
):
self.connections = connections
self._merger_running = False
self._merger_paused = False
self.merge_email = get_default(config, 'merger', 'git_user_email',
'zuul.merger.default@example.com')
self.merge_name = get_default(config, 'merger', 'git_user_name',
@ -80,6 +87,25 @@ class BaseMergeServer(metaclass=ABCMeta):
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.result_events = PipelineResultEventQueue.createRegistry(
self.zk_client
)
self.merger_thread = threading.Thread(
target=self.runMerger,
name="Merger",
)
self.merger_loop_wake_event = threading.Event()
self.merger_cleanup_election = self.zk_client.client.Election(
f"{MergerApi.MERGE_REQUEST_ROOT}/election"
)
self.merger_api = MergerApi(
self.zk_client,
merge_request_callback=self.merger_loop_wake_event.set,
)
# This merger and its git repos are used to maintain
# up-to-date copies of all the repos that are used by jobs, as
# well as to support the merger:cat function to supply
@ -90,19 +116,6 @@ class BaseMergeServer(metaclass=ABCMeta):
# Repo locking is needed on the executor
self.repo_locks = self._repo_locks_class()
self.merger_jobs = {
'merger:merge': self.merge,
'merger:cat': self.cat,
'merger:refstate': self.refstate,
'merger:fileschanges': self.fileschanges,
}
self.merger_gearworker = ZuulGearWorker(
'Zuul Merger',
'zuul.BaseMergeServer',
'merger-gearman-worker',
self.config,
self.merger_jobs)
def _getMerger(self, root, cache_root, logger=None,
execution_context=True, scheme=None,
cache_scheme=None):
@ -133,7 +146,7 @@ class BaseMergeServer(metaclass=ABCMeta):
zuul_event_id=zuul_event_id)
def start(self):
self.log.debug('Starting merger worker')
self.log.debug('Starting merger')
self.log.debug('Cleaning any stale git index.lock files')
for (dirpath, dirnames, filenames) in os.walk(self.merge_root):
if '.git' in dirnames:
@ -152,27 +165,87 @@ class BaseMergeServer(metaclass=ABCMeta):
self.log.exception(
'Unable to remove stale git lock: '
'%s this may result in failed merges' % fp)
self.merger_gearworker.start()
self._merger_running = True
self.merger_thread.start()
def stop(self):
self.log.debug('Stopping merger worker')
self.merger_gearworker.stop()
self.log.debug('Stopping merger')
self._merger_running = False
self.merger_loop_wake_event.set()
self.merger_cleanup_election.cancel()
self.zk_client.disconnect()
def join(self):
self.merger_gearworker.join()
self.merger_loop_wake_event.set()
self.merger_thread.join()
def pause(self):
self.log.debug('Pausing merger worker')
self.merger_gearworker.unregister()
self.log.debug('Pausing merger')
self._merger_paused = True
def unpause(self):
self.log.debug('Resuming merger worker')
self.merger_gearworker.register()
self.log.debug('Resuming merger')
self._merger_paused = False
self.merger_loop_wake_event.set()
def cat(self, job):
self.log.debug("Got cat job: %s" % job.unique)
args = json.loads(job.arguments)
def runMerger(self):
while self._merger_running:
self.merger_loop_wake_event.wait()
self.merger_loop_wake_event.clear()
if self._merger_paused:
continue
try:
for merge_request in self.merger_api.next():
if not self._merger_running:
break
try:
self._runMergeJob(merge_request)
except Exception:
log = get_annotated_logger(
self.log, merge_request.event_id
)
log.exception("Exception while performing merge")
self.completeMergeJob(merge_request, None)
except Exception:
self.log.exception("Error in merge thread:")
time.sleep(5)
def _runMergeJob(self, merge_request):
if not self.merger_api.lock(merge_request, blocking=False):
return
merge_request.state = MergeRequest.RUNNING
params = self.merger_api.getMergeParams(merge_request)
self.merger_api.clearMergeParams(merge_request)
# Directly update the merge request in ZooKeeper, so we don't loop over
# and try to lock it again and again.
self.merger_api.update(merge_request)
self.log.debug("Next executed merge job: %s", merge_request)
result = None
try:
result = self.executeMergeJob(merge_request, params)
except Exception:
self.log.exception("Error running merge job:")
finally:
try:
self.completeMergeJob(merge_request, result)
except Exception:
self.log.exception("Error completing merge job:")
def executeMergeJob(self, merge_request, params):
result = None
if merge_request.job_type == MergeRequest.MERGE:
result = self.merge(merge_request, params)
elif merge_request.job_type == MergeRequest.CAT:
result = self.cat(merge_request, params)
elif merge_request.job_type == MergeRequest.REF_STATE:
result = self.refstate(merge_request, params)
elif merge_request.job_type == MergeRequest.FILES_CHANGES:
result = self.fileschanges(merge_request, params)
return result
def cat(self, merge_request, args):
self.log.debug("Got cat job: %s", merge_request.uuid)
connection_name = args['connection']
project_name = args['project']
@ -189,15 +262,11 @@ class BaseMergeServer(metaclass=ABCMeta):
else:
result = dict(updated=True, files=files)
payload = json.dumps(result)
self.log.debug("Completed cat job %s: payload size: %s",
job.unique, sys.getsizeof(payload))
job.sendWorkComplete(payload)
return result
def merge(self, job):
self.log.debug("Got merge job: %s" % job.unique)
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
def merge(self, merge_request, args):
self.log.debug("Got merge job: %s", merge_request.uuid)
zuul_event_id = merge_request.event_id
for item in args['items']:
self._update(item['connection'], item['project'])
@ -216,15 +285,11 @@ class BaseMergeServer(metaclass=ABCMeta):
(result['commit'], result['files'], result['repo_state'],
recent, orig_commit) = ret
result['zuul_event_id'] = zuul_event_id
payload = json.dumps(result)
self.log.debug("Completed merge job %s: payload size: %s",
job.unique, sys.getsizeof(payload))
job.sendWorkComplete(payload)
return result
def refstate(self, job):
self.log.debug("Got refstate job: %s" % job.unique)
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
def refstate(self, merge_request, args):
self.log.debug("Got refstate job: %s", merge_request.uuid)
zuul_event_id = merge_request.event_id
success, repo_state, item_in_branches = \
self.merger.getRepoState(
args['items'], self.repo_locks, branches=args.get('branches'))
@ -232,15 +297,11 @@ class BaseMergeServer(metaclass=ABCMeta):
repo_state=repo_state,
item_in_branches=item_in_branches)
result['zuul_event_id'] = zuul_event_id
payload = json.dumps(result)
self.log.debug("Completed refstate job %s: payload size: %s",
job.unique, sys.getsizeof(payload))
job.sendWorkComplete(payload)
return result
def fileschanges(self, job):
self.log.debug("Got fileschanges job: %s" % job.unique)
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
def fileschanges(self, merge_request, args):
self.log.debug("Got fileschanges job: %s", merge_request.uuid)
zuul_event_id = merge_request.event_id
connection_name = args['connection']
project_name = args['project']
@ -260,10 +321,77 @@ class BaseMergeServer(metaclass=ABCMeta):
result = dict(updated=True, files=files)
result['zuul_event_id'] = zuul_event_id
payload = json.dumps(result)
self.log.debug("Completed fileschanges job %s: payload size: %s",
job.unique, sys.getsizeof(payload))
job.sendWorkComplete(payload)
return result
def completeMergeJob(self, merge_request, result):
log = get_annotated_logger(self.log, merge_request.event_id)
if result is not None:
payload = json.dumps(result)
self.log.debug("Completed %s job %s: payload size: %s",
merge_request.job_type, merge_request.uuid,
sys.getsizeof(payload))
merged = result.get("merged", False)
updated = result.get("updated", False)
commit = result.get("commit")
repo_state = result.get("repo_state", {})
item_in_branches = result.get("item_in_branches", [])
files = result.get("files", {})
log.info(
"Merge %s complete, merged: %s, updated: %s, commit: %s, "
"branches: %s",
merge_request,
merged,
updated,
commit,
item_in_branches,
)
# Provide a result either via a result future or a result event
if merge_request.result_path:
log.debug(
"Providing synchronous result via future for %s",
merge_request,
)
self.merger_api.reportResult(merge_request, result)
elif merge_request.build_set_uuid:
log.debug(
"Providing asynchronous result via result event for %s",
merge_request,
)
if merge_request.job_type == MergeRequest.FILES_CHANGES:
event = FilesChangesCompletedEvent(
merge_request.build_set_uuid, files
)
else:
event = MergeCompletedEvent(
merge_request.build_set_uuid,
merged,
updated,
commit,
files,
repo_state,
item_in_branches,
)
tenant_name = merge_request.tenant_name
pipeline_name = merge_request.pipeline_name
self.result_events[tenant_name][pipeline_name].put(event)
# Set the merge request to completed, unlock and delete it. Although
# the state update is mainly for consistency reasons, it might come in
# handy in case the deletion or unlocking fails. Thus, we know that
# the merge request was already processed and we have a result in the
# result queue.
merge_request.state = MergeRequest.COMPLETED
self.merger_api.update(merge_request)
self.merger_api.unlock(merge_request)
# TODO (felix): If we want to optimize ZK requests, we could only call
# the remove() here.
self.merger_api.remove(merge_request)
class MergeServer(BaseMergeServer):

View File

@ -75,6 +75,7 @@ from zuul.zk.cleanup import (
SemaphoreCleanupLock,
BuildRequestCleanupLock,
GeneralCleanupLock,
MergeRequestCleanupLock,
)
from zuul.zk.components import (
BaseComponent, ComponentRegistry, SchedulerComponent
@ -129,6 +130,7 @@ class Scheduler(threading.Thread):
_semaphore_cleanup_interval = IntervalTrigger(minutes=60, jitter=60)
_general_cleanup_interval = IntervalTrigger(minutes=60, jitter=60)
_build_request_cleanup_interval = IntervalTrigger(seconds=60, jitter=5)
_merge_request_cleanup_interval = IntervalTrigger(seconds=60, jitter=5)
_merger_client_class = MergeClient
_executor_client_class = ExecutorClient
@ -207,6 +209,8 @@ class Scheduler(threading.Thread):
self.semaphore_cleanup_lock = SemaphoreCleanupLock(self.zk_client)
self.build_request_cleanup_lock = BuildRequestCleanupLock(
self.zk_client)
self.merge_request_cleanup_lock = MergeRequestCleanupLock(
self.zk_client)
self.abide = Abide()
self.unparsed_abide = UnparsedAbideConfig()
@ -465,6 +469,8 @@ class Scheduler(threading.Thread):
trigger=self._semaphore_cleanup_interval)
self.apsched.add_job(self._runBuildRequestCleanup,
trigger=self._build_request_cleanup_interval)
self.apsched.add_job(self._runMergeRequestCleanup,
trigger=self._merge_request_cleanup_interval)
self.apsched.add_job(self._runGeneralCleanup,
trigger=self._general_cleanup_interval)
return
@ -488,6 +494,7 @@ class Scheduler(threading.Thread):
if self.general_cleanup_lock.acquire(blocking=False):
self._runConfigCacheCleanup()
self._runExecutorApiCleanup()
self._runMergerApiCleanup()
def _runConfigCacheCleanup(self):
with self.layout_lock:
@ -509,6 +516,12 @@ class Scheduler(threading.Thread):
except Exception:
self.log.exception("Error in executor API cleanup:")
def _runMergerApiCleanup(self):
try:
self.merger.merger_api.cleanup()
except Exception:
self.log.exception("Error in merger API cleanup:")
def _runBuildRequestCleanup(self):
# If someone else is running the cleanup, skip it.
if self.build_request_cleanup_lock.acquire(blocking=False):
@ -518,6 +531,15 @@ class Scheduler(threading.Thread):
finally:
self.build_request_cleanup_lock.release()
def _runMergeRequestCleanup(self):
# If someone else is running the cleanup, skip it.
if self.merge_request_cleanup_lock.acquire(blocking=False):
try:
self.log.debug("Starting merge request cleanup")
self.merger.cleanupLostMergeRequests()
finally:
self.merge_request_cleanup_lock.release()
def addTriggerEvent(self, driver_name, event):
event.arrived_at_scheduler_timestamp = time.time()
for tenant in self.abide.tenants.values():
@ -1412,9 +1434,6 @@ class Scheduler(threading.Thread):
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
if self.merger.areMergesOutstanding():
self.log.debug("Waiting on merger")
return False
waiting = False
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():

View File

@ -14,7 +14,7 @@ import time
from abc import ABCMeta
from configparser import ConfigParser
from threading import Thread
from typing import Optional, List, Callable
from typing import List, Callable
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
@ -33,12 +33,12 @@ class ZooKeeperClient(object):
def __init__(
self,
hosts: str,
read_only: bool = False,
timeout: float = 10.0,
tls_cert: Optional[str] = None,
tls_key: Optional[str] = None,
tls_ca: Optional[str] = None,
hosts,
read_only=False,
timeout=10.0,
tls_cert=None,
tls_key=None,
tls_ca=None,
):
"""
Initialize the ZooKeeper base client object.
@ -60,7 +60,7 @@ class ZooKeeperClient(object):
self.tls_ca = tls_ca
self.was_lost = False
self.client: Optional[KazooClient] = None
self.client = None
self._last_retry_log: int = 0
self.on_connect_listeners: List[Callable[[], None]] = []
self.on_disconnect_listeners: List[Callable[[], None]] = []

View File

@ -29,6 +29,13 @@ class BuildRequestCleanupLock(kazoo.recipe.lock.Lock):
super().__init__(client.client, self._path)
class MergeRequestCleanupLock(kazoo.recipe.lock.Lock):
_path = '/zuul/cleanup/merge_requests'
def __init__(self, client):
super().__init__(client.client, self._path)
class GeneralCleanupLock(kazoo.recipe.lock.Lock):
_path = '/zuul/cleanup/general'