Merge "Parallelize some pipeline refresh ops"

Zuul authored 2022-11-10 15:01:09 +00:00, committed by Gerrit Code Review
commit ed013d82cc
16 changed files with 418 additions and 278 deletions

View File

@ -5612,10 +5612,9 @@ class ZuulTestCase(BaseTestCase):
for tenant in sched.abide.tenants.values():
with tenant_read_lock(self.zk_client, tenant.name):
for pipeline in tenant.layout.pipelines.values():
with pipeline_lock(
self.zk_client, tenant.name, pipeline.name
) as lock:
ctx = self.createZKContext(lock)
with pipeline_lock(self.zk_client, tenant.name,
pipeline.name) as lock,\
self.createZKContext(lock) as ctx:
with pipeline.manager.currentContext(ctx):
pipeline.state.refresh(ctx)
# return the context in case the caller wants to examine iops

View File

@ -1053,9 +1053,9 @@ class TestGerritCircularDependencies(ZuulTestCase):
# We only want to have a merge failure for the first item in the queue
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
context = self.createZKContext()
items[0].current_build_set.updateAttributes(context,
unable_to_merge=True)
with self.createZKContext() as context:
items[0].current_build_set.updateAttributes(context,
unable_to_merge=True)
self.waitUntilSettled()

View File

@ -77,8 +77,9 @@ class TestJob(BaseTestCase):
self.pipeline.state = model.PipelineState()
self.pipeline.state._set(pipeline=self.pipeline)
self.layout.addPipeline(self.pipeline)
self.queue = model.ChangeQueue.new(
self.zk_context, pipeline=self.pipeline)
with self.zk_context as ctx:
self.queue = model.ChangeQueue.new(
ctx, pipeline=self.pipeline)
self.pcontext = configloader.ParseContext(
self.connections, None, self.tenant, AnsibleManager())
@ -229,9 +230,10 @@ class TestJob(BaseTestCase):
self.assertTrue(python27.changeMatchesBranch(change))
self.assertFalse(python27diablo.changeMatchesBranch(change))
item.freezeJobGraph(self.layout, self.zk_context,
skip_file_matcher=False,
redact_secrets_and_keys=False)
with self.zk_context as ctx:
item.freezeJobGraph(self.layout, ctx,
skip_file_matcher=False,
redact_secrets_and_keys=False)
self.assertEqual(len(item.getJobs()), 1)
job = item.getJobs()[0]
self.assertEqual(job.name, 'python27')
@ -244,9 +246,10 @@ class TestJob(BaseTestCase):
self.assertTrue(python27.changeMatchesBranch(change))
self.assertTrue(python27diablo.changeMatchesBranch(change))
item.freezeJobGraph(self.layout, self.zk_context,
skip_file_matcher=False,
redact_secrets_and_keys=False)
with self.zk_context as ctx:
item.freezeJobGraph(self.layout, ctx,
skip_file_matcher=False,
redact_secrets_and_keys=False)
self.assertEqual(len(item.getJobs()), 1)
job = item.getJobs()[0]
self.assertEqual(job.name, 'python27')
@ -293,9 +296,10 @@ class TestJob(BaseTestCase):
self.assertFalse(python27.changeMatchesFiles(change))
self.pipeline.manager.getFallbackLayout = mock.Mock(return_value=None)
item.freezeJobGraph(self.layout, self.zk_context,
skip_file_matcher=False,
redact_secrets_and_keys=False)
with self.zk_context as ctx:
item.freezeJobGraph(self.layout, ctx,
skip_file_matcher=False,
redact_secrets_and_keys=False)
self.assertEqual([], item.getJobs())
def test_job_source_project(self):
@ -365,9 +369,10 @@ class TestJob(BaseTestCase):
with testtools.ExpectedException(
Exception,
"Pre-review pipeline gate does not allow post-review job"):
item.freezeJobGraph(self.layout, self.zk_context,
skip_file_matcher=False,
redact_secrets_and_keys=False)
with self.zk_context as ctx:
item.freezeJobGraph(self.layout, ctx,
skip_file_matcher=False,
redact_secrets_and_keys=False)
class TestGraph(BaseTestCase):

View File

@ -54,6 +54,7 @@ from tests.base import (
from zuul.zk.change_cache import ChangeKey
from zuul.zk.layout import LayoutState
from zuul.zk.locks import management_queue_lock
from zuul.zk import zkobject
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
@ -3584,7 +3585,8 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
with gate.manager.currentContext(self.createZKContext()):
with self.createZKContext() as ctx,\
gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@ -3606,7 +3608,8 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
with gate.manager.currentContext(self.createZKContext()):
with self.createZKContext() as ctx,\
gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@ -3628,7 +3631,8 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
with gate.manager.currentContext(self.createZKContext()):
with self.createZKContext() as ctx,\
gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@ -3649,7 +3653,8 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
with gate.manager.currentContext(self.createZKContext()):
with self.createZKContext() as ctx,\
gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@ -3671,7 +3676,8 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
with gate.manager.currentContext(self.createZKContext()):
with self.createZKContext() as ctx,\
gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@ -6433,6 +6439,33 @@ For CI problems and help debugging, contact ci@example.org"""
dict(name='hold-job', result='SUCCESS', changes='1,1'),
], ordered=False)
def test_zkobject_parallel_refresh(self):
# Test that we don't deadlock when refreshing objects
zkobject.BaseZKContext._max_workers = 1
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
A.addApproval('Code-Review', 2)
B.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
self.waitUntilSettled()
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project-merge', result='SUCCESS', changes='1,1 2,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1 2,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1 2,1'),
], ordered=False)
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'MERGED')
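The test above pins zkobject.BaseZKContext._max_workers to 1 to check that nested deserialization cannot deadlock even with a single worker per pool. A minimal standalone sketch of the underlying idea (stdlib only, not Zuul code): a parent task that blocked on a child future submitted to the same single-threaded pool would hang forever, so each "class" gets its own pool.

    import collections
    from concurrent.futures import ThreadPoolExecutor

    # One single-threaded pool per key, mirroring the per-class
    # executor dictionary introduced by this change.
    executors = collections.defaultdict(
        lambda: ThreadPoolExecutor(max_workers=1))

    def child():
        return 1

    def parent():
        # Blocks on the child's future; safe only because the child
        # runs in a different pool than the fully occupied parent pool.
        return executors['child'].submit(child).result() + 1

    assert executors['parent'].submit(parent).result() == 2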
class TestChangeQueues(ZuulTestCase):
tenant_config_file = 'config/change-queues/main.yaml'

View File

@ -163,8 +163,8 @@ class TestScaleOutScheduler(ZuulTestCase):
pipeline = tenant.layout.pipelines['check']
summary = zuul.model.PipelineSummary()
summary._set(pipeline=pipeline)
context = self.createZKContext()
summary.refresh(context)
with self.createZKContext() as context:
summary.refresh(context)
self.assertEqual(summary.status['change_queues'], [])
def test_config_priming(self):
@ -322,7 +322,8 @@ class TestScaleOutScheduler(ZuulTestCase):
def new_summary():
summary = zuul.model.PipelineSummary()
summary._set(pipeline=pipeline)
summary.refresh(context)
with context:
summary.refresh(context)
return summary
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@ -345,7 +346,8 @@ class TestScaleOutScheduler(ZuulTestCase):
self.assertTrue(context.client.exists(summary2.getPath()))
# Our earlier summary object should use its cached data
summary1.refresh(context)
with context:
summary1.refresh(context)
self.assertNotEqual(summary1.status, {})
self.executor_server.hold_jobs_in_build = False
@ -354,7 +356,8 @@ class TestScaleOutScheduler(ZuulTestCase):
# The scheduler should have written a new summary that our
# second object can read now.
summary2.refresh(context)
with context:
summary2.refresh(context)
self.assertNotEqual(summary2.status, {})
@simple_layout('layouts/semaphore.yaml')

View File

@ -6194,27 +6194,28 @@ class TestSecrets(ZuulTestCase):
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
context = self.scheds.first.sched.createZKContext(None, self.log)
bs = BlobStore(context)
self.assertEqual(len(bs), 1)
with self.scheds.first.sched.createZKContext(None, self.log)\
as context:
bs = BlobStore(context)
self.assertEqual(len(bs), 1)
self.scheds.first.sched._runBlobStoreCleanup()
self.assertEqual(len(bs), 1)
self.scheds.first.sched._runBlobStoreCleanup()
self.assertEqual(len(bs), 1)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(A.reported, 1, "A should report success")
self.assertHistory([
dict(name='project1-secret', result='SUCCESS', changes='1,1'),
])
self.assertEqual(
[{'secret_name': self.secret}],
self._getSecrets('project1-secret', 'playbooks'))
self.assertEqual(A.reported, 1, "A should report success")
self.assertHistory([
dict(name='project1-secret', result='SUCCESS', changes='1,1'),
])
self.assertEqual(
[{'secret_name': self.secret}],
self._getSecrets('project1-secret', 'playbooks'))
self.scheds.first.sched._runBlobStoreCleanup()
self.assertEqual(len(bs), 0)
self.scheds.first.sched._runBlobStoreCleanup()
self.assertEqual(len(bs), 0)
class TestSecretInheritance(ZuulTestCase):

View File

@ -1686,8 +1686,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
return zstat.last_modified_transaction_id
# Update an object
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
with tenant_write_lock(self.zk_client, tenant_name) as lock,\
ZKContext(self.zk_client, lock, stop_event, self.log)\
as context:
ltime1 = get_ltime(pipeline1)
pipeline1.updateAttributes(context, foo='qux')
self.assertEqual(pipeline1.foo, 'qux')
@ -1700,8 +1701,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
self.assertEqual(ltime2, ltime3)
# Update an object using an active context
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
with tenant_write_lock(self.zk_client, tenant_name) as lock,\
ZKContext(self.zk_client, lock, stop_event, self.log)\
as context:
ltime1 = get_ltime(pipeline1)
with pipeline1.activeContext(context):
pipeline1.foo = 'baz'
@ -1721,14 +1723,16 @@ class TestZKObject(ZooKeeperBaseTestCase):
self.assertEqual(pipeline1.foo, 'baz')
# Refresh an existing object
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
with tenant_write_lock(self.zk_client, tenant_name) as lock,\
ZKContext(self.zk_client, lock, stop_event, self.log)\
as context:
pipeline2.refresh(context)
self.assertEqual(pipeline2.foo, 'baz')
# Delete an object
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
with tenant_write_lock(self.zk_client, tenant_name) as lock,\
ZKContext(self.zk_client, lock, stop_event, self.log)\
as context:
self.assertIsNotNone(self.zk_client.client.exists(
'/zuul/pipeline/fake_tenant'))
pipeline2.delete(context)
@ -1770,8 +1774,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
return self._real_client.set(*args, **kw)
# Fail an update
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
with tenant_write_lock(self.zk_client, tenant_name) as lock,\
ZKContext(self.zk_client, lock, stop_event, self.log)\
as context:
pipeline1 = zkobject_class.new(context,
name=tenant_name,
foo='one')
@ -1976,8 +1981,9 @@ class TestConfigurationErrorList(ZooKeeperBaseTestCase):
start_mark = model.ZuulMark(m1, m2, 'hello')
# Create a new object
with tenant_write_lock(self.zk_client, 'test') as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
with tenant_write_lock(self.zk_client, 'test') as lock,\
ZKContext(self.zk_client, lock, stop_event, self.log)\
as context:
pipeline = DummyZKObject.new(context, name="test", foo="bar")
e1 = model.ConfigurationError(
source_context, start_mark, "Test error1")
@ -2006,8 +2012,9 @@ class TestBlobStore(ZooKeeperBaseTestCase):
tenant_name = 'fake_tenant'
start_ltime = self.zk_client.getCurrentLtime()
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
with tenant_write_lock(self.zk_client, tenant_name) as lock,\
ZKContext(self.zk_client, lock, stop_event, self.log)\
as context:
bs = BlobStore(context)
with testtools.ExpectedException(KeyError):
bs.get('nope')

View File

@ -1035,9 +1035,9 @@ class Client(zuul.cmd.ZuulApp):
zk_client.client.delete(
f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}',
recursive=True)
context = ZKContext(zk_client, lock, None, self.log)
ps = PipelineState.new(context, _path=path,
layout_uuid=layout_uuid)
with ZKContext(zk_client, lock, None, self.log) as context:
ps = PipelineState.new(context, _path=path,
layout_uuid=layout_uuid)
# Force everyone to make a new layout for this tenant in
# order to rebuild the shared change queues.
layout_state = LayoutState(

View File

@ -971,8 +971,8 @@ class AnsibleJob(object):
def __init__(self, executor_server, build_request, arguments):
logger = logging.getLogger("zuul.AnsibleJob")
self.arguments = arguments
self.job = FrozenJob.fromZK(executor_server.zk_context,
arguments["job_ref"])
with executor_server.zk_context as ctx:
self.job = FrozenJob.fromZK(ctx, arguments["job_ref"])
self.arguments["zuul"].update(zuul_params_from_job(self.job))
self.zuul_event_id = self.arguments["zuul_event_id"]
@ -1246,11 +1246,12 @@ class AnsibleJob(object):
def loadRepoState(self):
merge_rs_path = self.arguments['merge_repo_state_ref']
merge_repo_state = merge_rs_path and MergeRepoState.fromZK(
self.executor_server.zk_context, merge_rs_path)
extra_rs_path = self.arguments['extra_repo_state_ref']
extra_repo_state = extra_rs_path and ExtraRepoState.fromZK(
self.executor_server.zk_context, extra_rs_path)
with self.executor_server.zk_context as ctx:
merge_repo_state = merge_rs_path and MergeRepoState.fromZK(
ctx, merge_rs_path)
extra_rs_path = self.arguments['extra_repo_state_ref']
extra_repo_state = extra_rs_path and ExtraRepoState.fromZK(
ctx, extra_rs_path)
d = {}
for rs in (merge_repo_state, extra_repo_state):
if not rs:
@ -2088,23 +2089,24 @@ class AnsibleJob(object):
"""
ret = {}
blobstore = BlobStore(self.executor_server.zk_context)
for secret_name, secret_index in secrets.items():
if isinstance(secret_index, dict):
key = secret_index['blob']
data = blobstore.get(key)
frozen_secret = json.loads(data.decode('utf-8'))
else:
frozen_secret = self.job.secrets[secret_index]
secret = zuul.model.Secret(secret_name, None)
secret.secret_data = yaml.encrypted_load(
frozen_secret['encrypted_data'])
private_secrets_key, public_secrets_key = \
self.executor_server.keystore.getProjectSecretsKeys(
frozen_secret['connection_name'],
frozen_secret['project_name'])
secret = secret.decrypt(private_secrets_key)
ret[secret_name] = secret.secret_data
with self.executor_server.zk_context as ctx:
blobstore = BlobStore(ctx)
for secret_name, secret_index in secrets.items():
if isinstance(secret_index, dict):
key = secret_index['blob']
data = blobstore.get(key)
frozen_secret = json.loads(data.decode('utf-8'))
else:
frozen_secret = self.job.secrets[secret_index]
secret = zuul.model.Secret(secret_name, None)
secret.secret_data = yaml.encrypted_load(
frozen_secret['encrypted_data'])
private_secrets_key, public_secrets_key = \
self.executor_server.keystore.getProjectSecretsKeys(
frozen_secret['connection_name'],
frozen_secret['project_name'])
secret = secret.decrypt(private_secrets_key)
ret[secret_name] = secret.secret_data
return ret
def checkoutTrustedProject(self, project, branch, args):

View File

@ -102,8 +102,8 @@ class PipelineManager(metaclass=ABCMeta):
# state may still be out of date after this because we skip
# the refresh.
self.buildChangeQueues(layout)
ctx = self.sched.createZKContext(None, self.log)
with self.currentContext(ctx):
with self.sched.createZKContext(None, self.log) as ctx,\
self.currentContext(ctx):
if layout.uuid == PipelineState.peekLayoutUUID(self.pipeline):
self.pipeline.state = PipelineState()
self.pipeline.state._set(pipeline=self.pipeline)
@ -112,25 +112,25 @@ class PipelineManager(metaclass=ABCMeta):
return
with pipeline_lock(
self.sched.zk_client, self.pipeline.tenant.name, self.pipeline.name
) as lock:
ctx = self.sched.createZKContext(lock, self.log)
with self.currentContext(ctx):
# Since the layout UUID is new, this will move queues
# to "old_queues". Note that it will *not* refresh
# the contents, in fact, we will get a new
# PipelineState python object with no queues, just as
# above. Our state is guaranteed to be out of date
# now, but we don't need to do anything with it, we
# will let the next actor to use it refresh it then.
self.pipeline.state = PipelineState.resetOrCreate(
self.pipeline, layout.uuid)
self.pipeline.change_list = PipelineChangeList.create(
self.pipeline)
event = PipelinePostConfigEvent()
self.sched.pipeline_management_events[
self.pipeline.tenant.name][self.pipeline.name].put(
event, needs_result=False)
self.sched.zk_client, self.pipeline.tenant.name,
self.pipeline.name) as lock,\
self.sched.createZKContext(lock, self.log) as ctx,\
self.currentContext(ctx):
# Since the layout UUID is new, this will move queues
# to "old_queues". Note that it will *not* refresh
# the contents, in fact, we will get a new
# PipelineState python object with no queues, just as
# above. Our state is guaranteed to be out of date
# now, but we don't need to do anything with it, we
# will let the next actor to use it refresh it then.
self.pipeline.state = PipelineState.resetOrCreate(
self.pipeline, layout.uuid)
self.pipeline.change_list = PipelineChangeList.create(
self.pipeline)
event = PipelinePostConfigEvent()
self.sched.pipeline_management_events[
self.pipeline.tenant.name][self.pipeline.name].put(
event, needs_result=False)
def buildChangeQueues(self, layout):
self.log.debug("Building relative_priority queues")

View File

@ -1050,14 +1050,26 @@ class ChangeQueue(zkobject.ZKObject):
})
items_by_path = OrderedDict()
# This is a tuple of (x, Future), where x is None if no action
# needs to be taken, or a string to indicate which kind of job
# it was. This structure allows us to execute async ZK reads
# and perform local data updates in order.
tpe_jobs = []
tpe = context.executor[ChangeQueue]
for item_path in data["queue"]:
item = existing_items.get(item_path)
items_by_path[item_path] = item
if item:
item.refresh(context)
tpe_jobs.append((None, tpe.submit(item.refresh, context)))
else:
item = QueueItem.fromZK(context, item_path,
pipeline=self.pipeline, queue=self)
items_by_path[item.getPath()] = item
tpe_jobs.append(('item', tpe.submit(
QueueItem.fromZK, context, item_path,
pipeline=self.pipeline, queue=self)))
for (kind, future) in tpe_jobs:
result = future.result()
if kind == 'item':
items_by_path[result.getPath()] = result
# Resolve ahead/behind references between queue items
for item in items_by_path.values():
@ -4193,6 +4205,13 @@ class BuildSet(zkobject.ZKObject):
existing_retry_builds = {b.getPath(): b
for bl in self.retry_builds.values()
for b in bl}
# This is a tuple of (kind, job_name, Future), where kind is
# None if no action needs to be taken, or a string to indicate
# which kind of job it was. This structure allows us to
# execute async ZK reads and perform local data updates in
# order.
tpe_jobs = []
tpe = context.executor[BuildSet]
# jobs (deserialize as separate objects)
if data['job_graph']:
for job_name in data['job_graph'].jobs:
@ -4207,39 +4226,62 @@ class BuildSet(zkobject.ZKObject):
if job_name in self.jobs:
job = self.jobs[job_name]
if not old_build_exists:
job.refresh(context)
tpe_jobs.append((None, job_name,
tpe.submit(job.refresh, context)))
else:
job_path = FrozenJob.jobPath(job_name, self.getPath())
job = FrozenJob.fromZK(context, job_path, buildset=self)
self.jobs[job_name] = job
tpe_jobs.append(('job', job_name, tpe.submit(
FrozenJob.fromZK, context, job_path, buildset=self)))
if build_path:
build = self.builds.get(job_name)
builds[job_name] = build
if build and build.getPath() == build_path:
if not build.result:
build.refresh(context)
tpe_jobs.append((
None, job_name, tpe.submit(
build.refresh, context)))
else:
if not self._isMyBuild(build_path):
build = BuildReference(build_path)
context.build_references = True
builds[job_name] = build
else:
build = Build.fromZK(
context, build_path, job=job, build_set=self)
builds[job_name] = build
tpe_jobs.append((
'build', job_name, tpe.submit(
Build.fromZK, context, build_path,
build_set=self)))
for retry_path in data["retry_builds"].get(job_name, []):
retry_build = existing_retry_builds.get(retry_path)
if retry_build and retry_build.getPath() == retry_path:
# Retry builds never change.
pass
retry_builds[job_name].append(retry_build)
else:
if not self._isMyBuild(retry_path):
retry_build = BuildReference(retry_path)
context.build_references = True
retry_builds[job_name].append(retry_build)
else:
retry_build = Build.fromZK(
context, retry_path, job=job, build_set=self)
retry_builds[job_name].append(retry_build)
tpe_jobs.append((
'retry', job_name, tpe.submit(
Build.fromZK, context, retry_path,
build_set=self)))
for (kind, job_name, future) in tpe_jobs:
result = future.result()
if kind == 'job':
self.jobs[job_name] = result
elif kind == 'build':
# We normally set the job on the constructor, but we
# may not have had it in time. At this point though,
# the job future is guaranteed to have completed, so
# we can look it up now.
result._set(job=self.jobs[job_name])
builds[job_name] = result
elif kind == 'retry':
result._set(job=self.jobs[job_name])
retry_builds[job_name].append(result)
data.update({
"builds": builds,

View File

@ -681,16 +681,16 @@ class Scheduler(threading.Thread):
with pipeline_lock(
self.zk_client, tenant.name, pipeline.name,
) as lock:
ctx = self.createZKContext(lock, self.log)
with pipeline.manager.currentContext(ctx):
pipeline.change_list.refresh(ctx)
pipeline.state.refresh(ctx)
# In case we're in the middle of a reconfig,
# include the old queue items.
for item in pipeline.getAllItems(include_old=True):
nrs = item.current_build_set.node_requests
for req_id in (nrs.values()):
outstanding_requests.add(req_id)
with self.createZKContext(lock, self.log) as ctx:
with pipeline.manager.currentContext(ctx):
pipeline.change_list.refresh(ctx)
pipeline.state.refresh(ctx)
# In case we're in the middle of a reconfig,
# include the old queue items.
for item in pipeline.getAllItems(include_old=True):
nrs = item.current_build_set.node_requests
for req_id in (nrs.values()):
outstanding_requests.add(req_id)
leaked_requests = zk_requests - outstanding_requests
for req_id in leaked_requests:
try:
@ -763,24 +763,24 @@ class Scheduler(threading.Thread):
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
with pipeline_lock(
self.zk_client, tenant.name, pipeline.name,
) as lock:
ctx = self.createZKContext(lock, self.log)
self.zk_client, tenant.name,
pipeline.name) as lock,\
self.createZKContext(lock, self.log) as ctx:
pipeline.state.refresh(ctx)
# add any blobstore references
for item in pipeline.getAllItems(include_old=True):
live_blobs.update(item.getBlobKeys())
ctx = self.createZKContext(None, self.log)
blobstore = BlobStore(ctx)
# get the set of blob keys unused since the start time
# (ie, we have already filtered any newly added keys)
unused_blobs = blobstore.getKeysLastUsedBefore(start_ltime)
# remove the current references
unused_blobs -= live_blobs
# delete what's left
for key in unused_blobs:
self.log.debug("Deleting unused blob: %s", key)
blobstore.delete(key, start_ltime)
with self.createZKContext(None, self.log) as ctx:
blobstore = BlobStore(ctx)
# get the set of blob keys unused since the start time
# (ie, we have already filtered any newly added keys)
unused_blobs = blobstore.getKeysLastUsedBefore(start_ltime)
# remove the current references
unused_blobs -= live_blobs
# delete what's left
for key in unused_blobs:
self.log.debug("Deleting unused blob: %s", key)
blobstore.delete(key, start_ltime)
self.log.debug("Finished blob store cleanup")
except Exception:
self.log.exception("Error in blob store cleanup:")
@ -1005,8 +1005,9 @@ class Scheduler(threading.Thread):
if layout_state is None:
# Reconfigure only tenants w/o an existing layout state
ctx = self.createZKContext(tlock, self.log)
self._reconfigureTenant(ctx, min_ltimes, -1, tenant)
with self.createZKContext(tlock, self.log) as ctx:
self._reconfigureTenant(
ctx, min_ltimes, -1, tenant)
self._reportInitialStats(tenant)
else:
self.local_layout_state[tenant_name] = layout_state
@ -1455,17 +1456,17 @@ class Scheduler(threading.Thread):
self.unparsed_abide, min_ltimes=min_ltimes,
branch_cache_min_ltimes=branch_cache_min_ltimes)
reconfigured_tenants.append(tenant_name)
ctx = self.createZKContext(lock, self.log)
if tenant is not None:
self._reconfigureTenant(ctx, min_ltimes,
-1,
tenant, old_tenant)
else:
self._reconfigureDeleteTenant(ctx, old_tenant)
with suppress(KeyError):
del self.tenant_layout_state[tenant_name]
with suppress(KeyError):
del self.local_layout_state[tenant_name]
with self.createZKContext(lock, self.log) as ctx:
if tenant is not None:
self._reconfigureTenant(ctx, min_ltimes,
-1,
tenant, old_tenant)
else:
self._reconfigureDeleteTenant(ctx, old_tenant)
with suppress(KeyError):
del self.tenant_layout_state[tenant_name]
with suppress(KeyError):
del self.local_layout_state[tenant_name]
duration = round(time.monotonic() - start, 3)
self.log.info("Reconfiguration complete (smart: %s, tenants: %s, "
@ -1521,10 +1522,10 @@ class Scheduler(threading.Thread):
self.unparsed_abide, min_ltimes=min_ltimes,
branch_cache_min_ltimes=branch_cache_min_ltimes)
tenant = self.abide.tenants[event.tenant_name]
ctx = self.createZKContext(lock, self.log)
self._reconfigureTenant(ctx, min_ltimes,
event.trigger_event_ltime,
tenant, old_tenant)
with self.createZKContext(lock, self.log) as ctx:
self._reconfigureTenant(ctx, min_ltimes,
event.trigger_event_ltime,
tenant, old_tenant)
duration = round(time.monotonic() - start, 3)
self.log.info("Tenant reconfiguration complete for %s (duration: %s "
"seconds)", event.tenant_name, duration)
@ -2125,9 +2126,9 @@ class Scheduler(threading.Thread):
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
try:
with pipeline_lock(
self.zk_client, tenant.name, pipeline.name, blocking=False
) as lock:
ctx = self.createZKContext(lock, self.log)
self.zk_client, tenant.name, pipeline.name,
blocking=False) as lock,\
self.createZKContext(lock, self.log) as ctx:
with pipeline.manager.currentContext(ctx):
with self.statsd_timer(f'{stats_key}.handling'):
refreshed = self._process_pipeline(
@ -2234,8 +2235,8 @@ class Scheduler(threading.Thread):
def _gatherConnectionCacheKeys(self):
relevant = set()
with self.layout_lock:
ctx = self.createZKContext(None, self.log)
with self.layout_lock,\
self.createZKContext(None, self.log) as ctx:
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
self.log.debug("Gather relevant cache items for: %s %s",

View File

@ -972,21 +972,23 @@ class ZuulWebAPI(object):
def _tenants(self):
result = []
for tenant_name, tenant in sorted(self.zuulweb.abide.tenants.items()):
queue_size = 0
for pipeline in tenant.layout.pipelines.values():
status = pipeline.summary.refresh(self.zuulweb.zk_context)
for queue in status.get("change_queues", []):
for head in queue["heads"]:
for item in head:
if item["live"]:
queue_size += 1
with self.zuulweb.zk_context as ctx:
for tenant_name, tenant in sorted(
self.zuulweb.abide.tenants.items()):
queue_size = 0
for pipeline in tenant.layout.pipelines.values():
status = pipeline.summary.refresh(ctx)
for queue in status.get("change_queues", []):
for head in queue["heads"]:
for item in head:
if item["live"]:
queue_size += 1
result.append({
'name': tenant_name,
'projects': len(tenant.untrusted_projects),
'queue': queue_size,
})
result.append({
'name': tenant_name,
'projects': len(tenant.untrusted_projects),
'queue': queue_size,
})
return result
@cherrypy.expose
@ -1096,13 +1098,16 @@ class ZuulWebAPI(object):
result_event_queues = self.zuulweb.pipeline_result_events[tenant.name]
management_event_queues = self.zuulweb.pipeline_management_events[
tenant.name]
for pipeline in tenant.layout.pipelines.values():
status = pipeline.summary.refresh(self.zuulweb.zk_context)
status['trigger_events'] = len(trigger_event_queues[pipeline.name])
status['result_events'] = len(result_event_queues[pipeline.name])
status['management_events'] = len(
management_event_queues[pipeline.name])
pipelines.append(status)
with self.zuulweb.zk_context as ctx:
for pipeline in tenant.layout.pipelines.values():
status = pipeline.summary.refresh(ctx)
status['trigger_events'] = len(
trigger_event_queues[pipeline.name])
status['result_events'] = len(
result_event_queues[pipeline.name])
status['management_events'] = len(
management_event_queues[pipeline.name])
pipelines.append(status)
return data, json.dumps(data).encode('utf-8')
def _getTenantOrRaise(self, tenant_name):
@ -1694,13 +1699,13 @@ class ZuulWebAPI(object):
change = Branch(project)
change.branch = branch_name or "master"
context = LocalZKContext(self.log)
queue = ChangeQueue.new(context, pipeline=pipeline)
item = QueueItem.new(context, queue=queue, change=change,
pipeline=queue.pipeline)
item.freezeJobGraph(tenant.layout, context,
skip_file_matcher=True,
redact_secrets_and_keys=True)
with LocalZKContext(self.log) as context:
queue = ChangeQueue.new(context, pipeline=pipeline)
item = QueueItem.new(context, queue=queue, change=change,
pipeline=queue.pipeline)
item.freezeJobGraph(tenant.layout, context,
skip_file_matcher=True,
redact_secrets_and_keys=True)
return item

View File

@ -134,15 +134,14 @@ class BlobStore:
return key
# make a new context based on the old one
locked_context = ZKContext(self.context.client, lock,
self.context.stop_event,
self.context.log)
self._retry(
locked_context,
self._retryableSave,
locked_context, path, flag, data)
self.context.updateStatsFromOtherContext(locked_context)
with ZKContext(self.context.client, lock,
self.context.stop_event,
self.context.log) as locked_context:
self._retry(
locked_context,
self._retryableSave,
locked_context, path, flag, data)
self.context.updateStatsFromOtherContext(locked_context)
return key
def delete(self, key, ltime):
@ -158,18 +157,18 @@ class BlobStore:
blocking=True
) as lock:
# make a new context based on the old one
locked_context = ZKContext(self.context.client, lock,
self.context.stop_event,
self.context.log)
with ZKContext(self.context.client, lock,
self.context.stop_event,
self.context.log) as locked_context:
# Double check that it hasn't been used since we
# decided to delete it
data, zstat = self._retry(locked_context,
self.context.client.get,
flag)
if zstat.last_modified_transaction_id < ltime:
self._retry(locked_context, self.context.client.delete,
path, recursive=True)
# Double check that it hasn't been used since we
# decided to delete it
data, zstat = self._retry(locked_context,
self.context.client.get,
flag)
if zstat.last_modified_transaction_id < ltime:
self._retry(locked_context, self.context.client.delete,
path, recursive=True)
except NoNodeError:
raise KeyError(key)

View File

@ -102,25 +102,27 @@ class BranchCache:
# to the context.
self.zk_context = ZKContext(zk_client, self.wlock, None, self.log)
with locked(self.wlock):
with self.zk_context as ctx,\
locked(self.wlock):
try:
self.cache = BranchCacheZKObject.fromZK(
self.zk_context, data_path, _path=data_path)
ctx, data_path, _path=data_path)
except NoNodeError:
self.cache = BranchCacheZKObject.new(
self.zk_context, _path=data_path)
ctx, _path=data_path)
def clear(self, projects=None):
"""Clear the cache"""
with locked(self.wlock):
with self.cache.activeContext(self.zk_context):
if projects is None:
self.cache.protected.clear()
self.cache.remainder.clear()
else:
for p in projects:
self.cache.protected.pop(p, None)
self.cache.remainder.pop(p, None)
with locked(self.wlock),\
self.zk_context as ctx,\
self.cache.activeContext(ctx):
if projects is None:
self.cache.protected.clear()
self.cache.remainder.clear()
else:
for p in projects:
self.cache.protected.pop(p, None)
self.cache.remainder.pop(p, None)
def getProjectBranches(self, project_name, exclude_unprotected,
min_ltime=-1, default=RAISE_EXCEPTION):
@ -154,8 +156,9 @@ class BranchCache:
an error when fetching the branches.
"""
if self.ltime < min_ltime:
with locked(self.rlock):
self.cache.refresh(self.zk_context)
with locked(self.rlock),\
self.zk_context as ctx:
self.cache.refresh(ctx)
protected_branches = None
try:
@ -197,23 +200,24 @@ class BranchCache:
The list of branches or None to indicate a fetch error.
"""
with locked(self.wlock):
with self.cache.activeContext(self.zk_context):
if exclude_unprotected:
self.cache.protected[project_name] = branches
remainder_branches = self.cache.remainder.get(project_name)
if remainder_branches and branches:
remainder = list(set(remainder_branches) -
set(branches))
self.cache.remainder[project_name] = remainder
else:
protected_branches = self.cache.protected.get(project_name)
if protected_branches and branches:
remainder = list(set(branches) -
set(protected_branches))
else:
remainder = branches
with locked(self.wlock),\
self.zk_context as ctx,\
self.cache.activeContext(ctx):
if exclude_unprotected:
self.cache.protected[project_name] = branches
remainder_branches = self.cache.remainder.get(project_name)
if remainder_branches and branches:
remainder = list(set(remainder_branches) -
set(branches))
self.cache.remainder[project_name] = remainder
else:
protected_branches = self.cache.protected.get(project_name)
if protected_branches and branches:
remainder = list(set(branches) -
set(protected_branches))
else:
remainder = branches
self.cache.remainder[project_name] = remainder
def setProtected(self, project_name, branch, protected):
"""Correct the protection state of a branch.
@ -222,33 +226,34 @@ class BranchCache:
receiving an explicit event.
"""
with locked(self.wlock):
with self.cache.activeContext(self.zk_context):
protected_branches = self.cache.protected.get(project_name)
remainder_branches = self.cache.remainder.get(project_name)
if protected:
if protected_branches is None:
# We've never run a protected query, so we
# should ignore this branch.
return
else:
# We have run a protected query; if we have
# also run an unprotected query, we need to
# move the branch from remainder to protected.
if remainder_branches and branch in remainder_branches:
remainder_branches.remove(branch)
if branch not in protected_branches:
protected_branches.append(branch)
with locked(self.wlock),\
self.zk_context as ctx,\
self.cache.activeContext(ctx):
protected_branches = self.cache.protected.get(project_name)
remainder_branches = self.cache.remainder.get(project_name)
if protected:
if protected_branches is None:
# We've never run a protected query, so we
# should ignore this branch.
return
else:
if protected_branches and branch in protected_branches:
protected_branches.remove(branch)
if remainder_branches is None:
# We've never run an unprotected query, so we
# should ignore this branch.
return
else:
if branch not in remainder_branches:
remainder_branches.append(branch)
# We have run a protected query; if we have
# also run an unprotected query, we need to
# move the branch from remainder to protected.
if remainder_branches and branch in remainder_branches:
remainder_branches.remove(branch)
if branch not in protected_branches:
protected_branches.append(branch)
else:
if protected_branches and branch in protected_branches:
protected_branches.remove(branch)
if remainder_branches is None:
# We've never run an unprotected query, so we
# should ignore this branch.
return
else:
if branch not in remainder_branches:
remainder_branches.append(branch)
@property
def ltime(self):

View File

@ -1,4 +1,4 @@
# Copyright 2021 Acme Gating, LLC
# Copyright 2021-2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -12,12 +12,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from concurrent.futures import ThreadPoolExecutor
import contextlib
import json
import logging
import sys
import time
import types
import zlib
import collections
from kazoo.exceptions import NodeExistsError, NoNodeError
from kazoo.retry import KazooRetry
@ -26,11 +29,45 @@ from zuul.zk import sharding
from zuul.zk import ZooKeeperClient
class ZKContext:
class BaseZKContext:
profile_logger = logging.getLogger('zuul.profile')
profile_default = False
# Only changed by unit tests.
# The default scales with number of procs.
_max_workers = None
def __init__(self):
# We create the executor dict in enter to make sure that this
# is used as a context manager and cleaned up properly.
self.executor = None
def __enter__(self):
if self.executor:
raise RuntimeError("ZKContext entered multiple times")
# This is a dictionary keyed by class. ZKObject subclasses
# can request a dedicated ThreadPoolExecutor for their class
# so that deserialize methods that use it can avoid deadlocks
# with child class deserialize methods.
self.executor = collections.defaultdict(
lambda: ThreadPoolExecutor(
max_workers=self._max_workers,
thread_name_prefix="ZKContext",
))
return self
def __exit__(self, etype, value, tb):
if self.executor:
for executor in self.executor.values():
if sys.version_info >= (3, 9):
executor.shutdown(wait=False, cancel_futures=True)
else:
executor.shutdown(wait=False)
self.executor = None
class ZKContext(BaseZKContext):
def __init__(self, zk_client, lock, stop_event, log):
super().__init__()
if isinstance(zk_client, ZooKeeperClient):
client = zk_client.client
else:
@ -80,10 +117,11 @@ class ZKContext:
self.cumulative_read_bytes, self.cumulative_write_bytes)
class LocalZKContext:
class LocalZKContext(BaseZKContext):
"""A Local ZKContext that means don't actually write anything to ZK"""
def __init__(self, log):
super().__init__()
self.client = None
self.lock = None
self.stop_event = None
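Because the executor dictionary is created in __enter__ and shut down in __exit__, every ZKContext (and LocalZKContext) must now be used as a context manager, which is what the call-site changes throughout this commit do. A minimal standalone sketch of that lifecycle (stdlib only, not Zuul code):

    import collections
    import sys
    from concurrent.futures import ThreadPoolExecutor

    class ExecutorContext:
        def __enter__(self):
            # One lazily created pool per requesting class.
            self.executor = collections.defaultdict(
                lambda: ThreadPoolExecutor(max_workers=2))
            return self

        def __exit__(self, etype, value, tb):
            for executor in self.executor.values():
                if sys.version_info >= (3, 9):
                    executor.shutdown(wait=False, cancel_futures=True)
                else:
                    executor.shutdown(wait=False)
            self.executor = None

    with ExecutorContext() as ctx:
        assert ctx.executor[str].submit(len, 'abc').result() == 3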