Parallelize some pipeline refresh ops
We may be able to speed up pipeline refreshes in cases where there are
large numbers of items or jobs/builds by parallelizing ZK reads.

Quick refresher: the ZK protocol is async, and kazoo uses a queue to send
operations to a single thread which manages IO.  We typically call
synchronous kazoo client methods which wait for the async result before
returning.  Since this is all thread-safe, we can attempt to fill the
kazoo pipe by having multiple threads call the synchronous kazoo methods.
If kazoo is waiting on IO for an earlier call, it will be able to start a
later request simultaneously.

Quick aside: it would be difficult for us to use the async methods
directly since our overall code structure is still ordered and
effectively single threaded (we need to load a QueueItem before we can
load the BuildSet and the Builds, etc).  Thus it makes the most sense for
us to retain our ordering by using a ThreadPoolExecutor to run some
operations in parallel.

This change parallelizes loading QueueItems within a ChangeQueue, and
also Builds/Jobs within a BuildSet.  These are the points in a pipeline
refresh tree which potentially have the largest number of children and
could benefit the most from the change, especially if the ZK server has
some measurable latency.

Change-Id: I0871cc05a2d13e4ddc4ac284bd67e5e3003200ad
parent 8a8502f661
commit 3a981b89a8
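As background for the diff below, here is a minimal sketch (not part of the change) of the pattern the commit message describes: synchronous kazoo reads are submitted to a ThreadPoolExecutor so that kazoo's single IO thread can pipeline the requests, while the caller still consumes results in submission order. The client setup, ZK paths, and helper name are assumptions made for this example only.

    from concurrent.futures import ThreadPoolExecutor
    from kazoo.client import KazooClient

    def read_nodes_in_parallel(client, paths, max_workers=8):
        """Read several ZK nodes concurrently, returning data in input order.

        Each worker thread calls the synchronous client.get(); kazoo funnels
        the underlying requests through its single IO thread, so while one
        call waits on the network, another can already be written to the pipe.
        """
        with ThreadPoolExecutor(max_workers=max_workers) as tpe:
            # Submit all reads first so the kazoo pipe stays full...
            futures = [tpe.submit(client.get, path) for path in paths]
            # ...then consume the results in the original order, which keeps
            # the caller's logic effectively single threaded.
            return [future.result()[0] for future in futures]

    if __name__ == "__main__":
        zk = KazooClient(hosts="127.0.0.1:2181")  # assumed local ZooKeeper
        zk.start()
        try:
            children = zk.get_children("/zuul/example")  # hypothetical path
            data = read_nodes_in_parallel(
                zk, ["/zuul/example/%s" % c for c in children])
        finally:
            zk.stop()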
@@ -5611,10 +5611,9 @@ class ZuulTestCase(BaseTestCase):
for tenant in sched.abide.tenants.values():
with tenant_read_lock(self.zk_client, tenant.name):
for pipeline in tenant.layout.pipelines.values():
with pipeline_lock(
self.zk_client, tenant.name, pipeline.name
) as lock:
ctx = self.createZKContext(lock)
with pipeline_lock(self.zk_client, tenant.name,
pipeline.name) as lock,\
self.createZKContext(lock) as ctx:
with pipeline.manager.currentContext(ctx):
pipeline.state.refresh(ctx)
# return the context in case the caller wants to examine iops
@ -1053,9 +1053,9 @@ class TestGerritCircularDependencies(ZuulTestCase):
|
||||
# We only want to have a merge failure for the first item in the queue
|
||||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||
items = tenant.layout.pipelines['gate'].getAllItems()
|
||||
context = self.createZKContext()
|
||||
items[0].current_build_set.updateAttributes(context,
|
||||
unable_to_merge=True)
|
||||
with self.createZKContext() as context:
|
||||
items[0].current_build_set.updateAttributes(context,
|
||||
unable_to_merge=True)
|
||||
|
||||
self.waitUntilSettled()
|
||||
|
||||
|
@ -73,8 +73,9 @@ class TestJob(BaseTestCase):
|
||||
self.pipeline.state = model.PipelineState()
|
||||
self.pipeline.state._set(pipeline=self.pipeline)
|
||||
self.layout.addPipeline(self.pipeline)
|
||||
self.queue = model.ChangeQueue.new(
|
||||
self.zk_context, pipeline=self.pipeline)
|
||||
with self.zk_context as ctx:
|
||||
self.queue = model.ChangeQueue.new(
|
||||
ctx, pipeline=self.pipeline)
|
||||
self.pcontext = configloader.ParseContext(
|
||||
self.connections, None, self.tenant, AnsibleManager())
|
||||
|
||||
@ -221,9 +222,10 @@ class TestJob(BaseTestCase):
|
||||
self.assertTrue(python27.changeMatchesBranch(change))
|
||||
self.assertFalse(python27diablo.changeMatchesBranch(change))
|
||||
|
||||
item.freezeJobGraph(self.layout, self.zk_context,
|
||||
skip_file_matcher=False,
|
||||
redact_secrets_and_keys=False)
|
||||
with self.zk_context as ctx:
|
||||
item.freezeJobGraph(self.layout, ctx,
|
||||
skip_file_matcher=False,
|
||||
redact_secrets_and_keys=False)
|
||||
self.assertEqual(len(item.getJobs()), 1)
|
||||
job = item.getJobs()[0]
|
||||
self.assertEqual(job.name, 'python27')
|
||||
@ -236,9 +238,10 @@ class TestJob(BaseTestCase):
|
||||
self.assertTrue(python27.changeMatchesBranch(change))
|
||||
self.assertTrue(python27diablo.changeMatchesBranch(change))
|
||||
|
||||
item.freezeJobGraph(self.layout, self.zk_context,
|
||||
skip_file_matcher=False,
|
||||
redact_secrets_and_keys=False)
|
||||
with self.zk_context as ctx:
|
||||
item.freezeJobGraph(self.layout, ctx,
|
||||
skip_file_matcher=False,
|
||||
redact_secrets_and_keys=False)
|
||||
self.assertEqual(len(item.getJobs()), 1)
|
||||
job = item.getJobs()[0]
|
||||
self.assertEqual(job.name, 'python27')
|
||||
@ -285,9 +288,10 @@ class TestJob(BaseTestCase):
|
||||
self.assertFalse(python27.changeMatchesFiles(change))
|
||||
|
||||
self.pipeline.manager.getFallbackLayout = mock.Mock(return_value=None)
|
||||
item.freezeJobGraph(self.layout, self.zk_context,
|
||||
skip_file_matcher=False,
|
||||
redact_secrets_and_keys=False)
|
||||
with self.zk_context as ctx:
|
||||
item.freezeJobGraph(self.layout, ctx,
|
||||
skip_file_matcher=False,
|
||||
redact_secrets_and_keys=False)
|
||||
self.assertEqual([], item.getJobs())
|
||||
|
||||
def test_job_source_project(self):
|
||||
@ -357,9 +361,10 @@ class TestJob(BaseTestCase):
|
||||
with testtools.ExpectedException(
|
||||
Exception,
|
||||
"Pre-review pipeline gate does not allow post-review job"):
|
||||
item.freezeJobGraph(self.layout, self.zk_context,
|
||||
skip_file_matcher=False,
|
||||
redact_secrets_and_keys=False)
|
||||
with self.zk_context as ctx:
|
||||
item.freezeJobGraph(self.layout, ctx,
|
||||
skip_file_matcher=False,
|
||||
redact_secrets_and_keys=False)
|
||||
|
||||
|
||||
class TestGraph(BaseTestCase):
|
||||
|
@ -54,6 +54,7 @@ from tests.base import (
|
||||
from zuul.zk.change_cache import ChangeKey
|
||||
from zuul.zk.layout import LayoutState
|
||||
from zuul.zk.locks import management_queue_lock
|
||||
from zuul.zk import zkobject
|
||||
|
||||
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
|
||||
|
||||
@ -3584,7 +3585,8 @@ class TestScheduler(ZuulTestCase):
|
||||
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
|
||||
fake_a = FakeChange(project1, 'master')
|
||||
fake_b = FakeChange(project2, 'master')
|
||||
with gate.manager.currentContext(self.createZKContext()):
|
||||
with self.createZKContext() as ctx,\
|
||||
gate.manager.currentContext(ctx):
|
||||
gate.manager.getChangeQueue(fake_a, None)
|
||||
gate.manager.getChangeQueue(fake_b, None)
|
||||
q1 = gate.getQueue(project1.canonical_name, None)
|
||||
@ -3606,7 +3608,8 @@ class TestScheduler(ZuulTestCase):
|
||||
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
|
||||
fake_a = FakeChange(project1, 'master')
|
||||
fake_b = FakeChange(project2, 'master')
|
||||
with gate.manager.currentContext(self.createZKContext()):
|
||||
with self.createZKContext() as ctx,\
|
||||
gate.manager.currentContext(ctx):
|
||||
gate.manager.getChangeQueue(fake_a, None)
|
||||
gate.manager.getChangeQueue(fake_b, None)
|
||||
q1 = gate.getQueue(project1.canonical_name, None)
|
||||
@ -3628,7 +3631,8 @@ class TestScheduler(ZuulTestCase):
|
||||
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
|
||||
fake_a = FakeChange(project1, 'master')
|
||||
fake_b = FakeChange(project2, 'master')
|
||||
with gate.manager.currentContext(self.createZKContext()):
|
||||
with self.createZKContext() as ctx,\
|
||||
gate.manager.currentContext(ctx):
|
||||
gate.manager.getChangeQueue(fake_a, None)
|
||||
gate.manager.getChangeQueue(fake_b, None)
|
||||
q1 = gate.getQueue(project1.canonical_name, None)
|
||||
@ -3649,7 +3653,8 @@ class TestScheduler(ZuulTestCase):
|
||||
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
|
||||
fake_a = FakeChange(project1, 'master')
|
||||
fake_b = FakeChange(project2, 'master')
|
||||
with gate.manager.currentContext(self.createZKContext()):
|
||||
with self.createZKContext() as ctx,\
|
||||
gate.manager.currentContext(ctx):
|
||||
gate.manager.getChangeQueue(fake_a, None)
|
||||
gate.manager.getChangeQueue(fake_b, None)
|
||||
q1 = gate.getQueue(project1.canonical_name, None)
|
||||
@ -3671,7 +3676,8 @@ class TestScheduler(ZuulTestCase):
|
||||
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
|
||||
fake_a = FakeChange(project1, 'master')
|
||||
fake_b = FakeChange(project2, 'master')
|
||||
with gate.manager.currentContext(self.createZKContext()):
|
||||
with self.createZKContext() as ctx,\
|
||||
gate.manager.currentContext(ctx):
|
||||
gate.manager.getChangeQueue(fake_a, None)
|
||||
gate.manager.getChangeQueue(fake_b, None)
|
||||
q1 = gate.getQueue(project1.canonical_name, None)
|
||||
@ -6433,6 +6439,33 @@ For CI problems and help debugging, contact ci@example.org"""
|
||||
dict(name='hold-job', result='SUCCESS', changes='1,1'),
|
||||
], ordered=False)
|
||||
|
||||
def test_zkobject_parallel_refresh(self):
|
||||
# Test that we don't deadlock when refreshing objects
|
||||
zkobject.BaseZKContext._max_workers = 1
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||
A.addApproval('Code-Review', 2)
|
||||
B.addApproval('Code-Review', 2)
|
||||
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
|
||||
self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertHistory([
|
||||
dict(name='project-merge', result='SUCCESS', changes='1,1'),
|
||||
dict(name='project-test1', result='SUCCESS', changes='1,1'),
|
||||
dict(name='project-test2', result='SUCCESS', changes='1,1'),
|
||||
dict(name='project-merge', result='SUCCESS', changes='1,1 2,1'),
|
||||
dict(name='project-test1', result='SUCCESS', changes='1,1 2,1'),
|
||||
dict(name='project-test2', result='SUCCESS', changes='1,1 2,1'),
|
||||
], ordered=False)
|
||||
self.assertEqual(A.data['status'], 'MERGED')
|
||||
self.assertEqual(B.data['status'], 'MERGED')
|
||||
|
||||
|
||||
class TestChangeQueues(ZuulTestCase):
|
||||
tenant_config_file = 'config/change-queues/main.yaml'
|
||||
|
@ -163,8 +163,8 @@ class TestScaleOutScheduler(ZuulTestCase):
|
||||
pipeline = tenant.layout.pipelines['check']
|
||||
summary = zuul.model.PipelineSummary()
|
||||
summary._set(pipeline=pipeline)
|
||||
context = self.createZKContext()
|
||||
summary.refresh(context)
|
||||
with self.createZKContext() as context:
|
||||
summary.refresh(context)
|
||||
self.assertEqual(summary.status['change_queues'], [])
|
||||
|
||||
def test_config_priming(self):
|
||||
@ -322,7 +322,8 @@ class TestScaleOutScheduler(ZuulTestCase):
|
||||
def new_summary():
|
||||
summary = zuul.model.PipelineSummary()
|
||||
summary._set(pipeline=pipeline)
|
||||
summary.refresh(context)
|
||||
with context:
|
||||
summary.refresh(context)
|
||||
return summary
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
@ -345,7 +346,8 @@ class TestScaleOutScheduler(ZuulTestCase):
|
||||
self.assertTrue(context.client.exists(summary2.getPath()))
|
||||
|
||||
# Our earlier summary object should use its cached data
|
||||
summary1.refresh(context)
|
||||
with context:
|
||||
summary1.refresh(context)
|
||||
self.assertNotEqual(summary1.status, {})
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
@ -354,7 +356,8 @@ class TestScaleOutScheduler(ZuulTestCase):
|
||||
|
||||
# The scheduler should have written a new summary that our
|
||||
# second object can read now.
|
||||
summary2.refresh(context)
|
||||
with context:
|
||||
summary2.refresh(context)
|
||||
self.assertNotEqual(summary2.status, {})
|
||||
|
||||
@simple_layout('layouts/semaphore.yaml')
|
||||
|
@ -5802,27 +5802,28 @@ class TestSecrets(ZuulTestCase):
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
context = self.scheds.first.sched.createZKContext(None, self.log)
|
||||
bs = BlobStore(context)
|
||||
self.assertEqual(len(bs), 1)
|
||||
with self.scheds.first.sched.createZKContext(None, self.log)\
|
||||
as context:
|
||||
bs = BlobStore(context)
|
||||
self.assertEqual(len(bs), 1)
|
||||
|
||||
self.scheds.first.sched._runBlobStoreCleanup()
|
||||
self.assertEqual(len(bs), 1)
|
||||
self.scheds.first.sched._runBlobStoreCleanup()
|
||||
self.assertEqual(len(bs), 1)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(A.reported, 1, "A should report success")
|
||||
self.assertHistory([
|
||||
dict(name='project1-secret', result='SUCCESS', changes='1,1'),
|
||||
])
|
||||
self.assertEqual(
|
||||
[{'secret_name': self.secret}],
|
||||
self._getSecrets('project1-secret', 'playbooks'))
|
||||
self.assertEqual(A.reported, 1, "A should report success")
|
||||
self.assertHistory([
|
||||
dict(name='project1-secret', result='SUCCESS', changes='1,1'),
|
||||
])
|
||||
self.assertEqual(
|
||||
[{'secret_name': self.secret}],
|
||||
self._getSecrets('project1-secret', 'playbooks'))
|
||||
|
||||
self.scheds.first.sched._runBlobStoreCleanup()
|
||||
self.assertEqual(len(bs), 0)
|
||||
self.scheds.first.sched._runBlobStoreCleanup()
|
||||
self.assertEqual(len(bs), 0)
|
||||
|
||||
|
||||
class TestSecretInheritance(ZuulTestCase):
|
||||
|
@ -1686,8 +1686,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
|
||||
return zstat.last_modified_transaction_id
|
||||
|
||||
# Update an object
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock:
|
||||
context = ZKContext(self.zk_client, lock, stop_event, self.log)
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock,\
|
||||
ZKContext(self.zk_client, lock, stop_event, self.log)\
|
||||
as context:
|
||||
ltime1 = get_ltime(pipeline1)
|
||||
pipeline1.updateAttributes(context, foo='qux')
|
||||
self.assertEqual(pipeline1.foo, 'qux')
|
||||
@ -1700,8 +1701,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
|
||||
self.assertEqual(ltime2, ltime3)
|
||||
|
||||
# Update an object using an active context
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock:
|
||||
context = ZKContext(self.zk_client, lock, stop_event, self.log)
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock,\
|
||||
ZKContext(self.zk_client, lock, stop_event, self.log)\
|
||||
as context:
|
||||
ltime1 = get_ltime(pipeline1)
|
||||
with pipeline1.activeContext(context):
|
||||
pipeline1.foo = 'baz'
|
||||
@ -1721,14 +1723,16 @@ class TestZKObject(ZooKeeperBaseTestCase):
|
||||
self.assertEqual(pipeline1.foo, 'baz')
|
||||
|
||||
# Refresh an existing object
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock:
|
||||
context = ZKContext(self.zk_client, lock, stop_event, self.log)
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock,\
|
||||
ZKContext(self.zk_client, lock, stop_event, self.log)\
|
||||
as context:
|
||||
pipeline2.refresh(context)
|
||||
self.assertEqual(pipeline2.foo, 'baz')
|
||||
|
||||
# Delete an object
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock:
|
||||
context = ZKContext(self.zk_client, lock, stop_event, self.log)
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock,\
|
||||
ZKContext(self.zk_client, lock, stop_event, self.log)\
|
||||
as context:
|
||||
self.assertIsNotNone(self.zk_client.client.exists(
|
||||
'/zuul/pipeline/fake_tenant'))
|
||||
pipeline2.delete(context)
|
||||
@ -1770,8 +1774,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
|
||||
return self._real_client.set(*args, **kw)
|
||||
|
||||
# Fail an update
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock:
|
||||
context = ZKContext(self.zk_client, lock, stop_event, self.log)
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock,\
|
||||
ZKContext(self.zk_client, lock, stop_event, self.log)\
|
||||
as context:
|
||||
pipeline1 = zkobject_class.new(context,
|
||||
name=tenant_name,
|
||||
foo='one')
|
||||
@ -1976,8 +1981,9 @@ class TestConfigurationErrorList(ZooKeeperBaseTestCase):
|
||||
start_mark = model.ZuulMark(m1, m2, 'hello')
|
||||
|
||||
# Create a new object
|
||||
with tenant_write_lock(self.zk_client, 'test') as lock:
|
||||
context = ZKContext(self.zk_client, lock, stop_event, self.log)
|
||||
with tenant_write_lock(self.zk_client, 'test') as lock,\
|
||||
ZKContext(self.zk_client, lock, stop_event, self.log)\
|
||||
as context:
|
||||
pipeline = DummyZKObject.new(context, name="test", foo="bar")
|
||||
e1 = model.ConfigurationError(
|
||||
source_context, start_mark, "Test error1")
|
||||
@ -2006,8 +2012,9 @@ class TestBlobStore(ZooKeeperBaseTestCase):
|
||||
tenant_name = 'fake_tenant'
|
||||
|
||||
start_ltime = self.zk_client.getCurrentLtime()
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock:
|
||||
context = ZKContext(self.zk_client, lock, stop_event, self.log)
|
||||
with tenant_write_lock(self.zk_client, tenant_name) as lock,\
|
||||
ZKContext(self.zk_client, lock, stop_event, self.log)\
|
||||
as context:
|
||||
bs = BlobStore(context)
|
||||
with testtools.ExpectedException(KeyError):
|
||||
bs.get('nope')
|
||||
|
@ -1035,9 +1035,9 @@ class Client(zuul.cmd.ZuulApp):
|
||||
zk_client.client.delete(
|
||||
f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}',
|
||||
recursive=True)
|
||||
context = ZKContext(zk_client, lock, None, self.log)
|
||||
ps = PipelineState.new(context, _path=path,
|
||||
layout_uuid=layout_uuid)
|
||||
with ZKContext(zk_client, lock, None, self.log) as context:
|
||||
ps = PipelineState.new(context, _path=path,
|
||||
layout_uuid=layout_uuid)
|
||||
# Force everyone to make a new layout for this tenant in
|
||||
# order to rebuild the shared change queues.
|
||||
layout_state = LayoutState(
|
||||
|
@ -966,8 +966,8 @@ class AnsibleJob(object):
|
||||
def __init__(self, executor_server, build_request, arguments):
|
||||
logger = logging.getLogger("zuul.AnsibleJob")
|
||||
self.arguments = arguments
|
||||
self.job = FrozenJob.fromZK(executor_server.zk_context,
|
||||
arguments["job_ref"])
|
||||
with executor_server.zk_context as ctx:
|
||||
self.job = FrozenJob.fromZK(ctx, arguments["job_ref"])
|
||||
self.arguments["zuul"].update(zuul_params_from_job(self.job))
|
||||
|
||||
self.zuul_event_id = self.arguments["zuul_event_id"]
|
||||
@ -1240,11 +1240,12 @@ class AnsibleJob(object):
|
||||
|
||||
def loadRepoState(self):
|
||||
merge_rs_path = self.arguments['merge_repo_state_ref']
|
||||
merge_repo_state = merge_rs_path and MergeRepoState.fromZK(
|
||||
self.executor_server.zk_context, merge_rs_path)
|
||||
extra_rs_path = self.arguments['extra_repo_state_ref']
|
||||
extra_repo_state = extra_rs_path and ExtraRepoState.fromZK(
|
||||
self.executor_server.zk_context, extra_rs_path)
|
||||
with self.executor_server.zk_context as ctx:
|
||||
merge_repo_state = merge_rs_path and MergeRepoState.fromZK(
|
||||
ctx, merge_rs_path)
|
||||
extra_rs_path = self.arguments['extra_repo_state_ref']
|
||||
extra_repo_state = extra_rs_path and ExtraRepoState.fromZK(
|
||||
ctx, extra_rs_path)
|
||||
d = {}
|
||||
for rs in (merge_repo_state, extra_repo_state):
|
||||
if not rs:
|
||||
@ -2081,23 +2082,24 @@ class AnsibleJob(object):
|
||||
|
||||
"""
|
||||
ret = {}
|
||||
blobstore = BlobStore(self.executor_server.zk_context)
|
||||
for secret_name, secret_index in secrets.items():
|
||||
if isinstance(secret_index, dict):
|
||||
key = secret_index['blob']
|
||||
data = blobstore.get(key)
|
||||
frozen_secret = json.loads(data.decode('utf-8'))
|
||||
else:
|
||||
frozen_secret = self.job.secrets[secret_index]
|
||||
secret = zuul.model.Secret(secret_name, None)
|
||||
secret.secret_data = yaml.encrypted_load(
|
||||
frozen_secret['encrypted_data'])
|
||||
private_secrets_key, public_secrets_key = \
|
||||
self.executor_server.keystore.getProjectSecretsKeys(
|
||||
frozen_secret['connection_name'],
|
||||
frozen_secret['project_name'])
|
||||
secret = secret.decrypt(private_secrets_key)
|
||||
ret[secret_name] = secret.secret_data
|
||||
with self.executor_server.zk_context as ctx:
|
||||
blobstore = BlobStore(ctx)
|
||||
for secret_name, secret_index in secrets.items():
|
||||
if isinstance(secret_index, dict):
|
||||
key = secret_index['blob']
|
||||
data = blobstore.get(key)
|
||||
frozen_secret = json.loads(data.decode('utf-8'))
|
||||
else:
|
||||
frozen_secret = self.job.secrets[secret_index]
|
||||
secret = zuul.model.Secret(secret_name, None)
|
||||
secret.secret_data = yaml.encrypted_load(
|
||||
frozen_secret['encrypted_data'])
|
||||
private_secrets_key, public_secrets_key = \
|
||||
self.executor_server.keystore.getProjectSecretsKeys(
|
||||
frozen_secret['connection_name'],
|
||||
frozen_secret['project_name'])
|
||||
secret = secret.decrypt(private_secrets_key)
|
||||
ret[secret_name] = secret.secret_data
|
||||
return ret
|
||||
|
||||
def checkoutTrustedProject(self, project, branch, args):
|
||||
|
@ -102,8 +102,8 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
# state may still be out of date after this because we skip
|
||||
# the refresh.
|
||||
self.buildChangeQueues(layout)
|
||||
ctx = self.sched.createZKContext(None, self.log)
|
||||
with self.currentContext(ctx):
|
||||
with self.sched.createZKContext(None, self.log) as ctx,\
|
||||
self.currentContext(ctx):
|
||||
if layout.uuid == PipelineState.peekLayoutUUID(self.pipeline):
|
||||
self.pipeline.state = PipelineState()
|
||||
self.pipeline.state._set(pipeline=self.pipeline)
|
||||
@ -112,25 +112,25 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
return
|
||||
|
||||
with pipeline_lock(
|
||||
self.sched.zk_client, self.pipeline.tenant.name, self.pipeline.name
|
||||
) as lock:
|
||||
ctx = self.sched.createZKContext(lock, self.log)
|
||||
with self.currentContext(ctx):
|
||||
# Since the layout UUID is new, this will move queues
|
||||
# to "old_queues". Note that it will *not* refresh
|
||||
# the contents, in fact, we will get a new
|
||||
# PipelineState python object with no queues, just as
|
||||
# above. Our state is guaranteed to be out of date
|
||||
# now, but we don't need to do anything with it, we
|
||||
# will let the next actor to use it refresh it then.
|
||||
self.pipeline.state = PipelineState.resetOrCreate(
|
||||
self.pipeline, layout.uuid)
|
||||
self.pipeline.change_list = PipelineChangeList.create(
|
||||
self.pipeline)
|
||||
event = PipelinePostConfigEvent()
|
||||
self.sched.pipeline_management_events[
|
||||
self.pipeline.tenant.name][self.pipeline.name].put(
|
||||
event, needs_result=False)
|
||||
self.sched.zk_client, self.pipeline.tenant.name,
|
||||
self.pipeline.name) as lock,\
|
||||
self.sched.createZKContext(lock, self.log) as ctx,\
|
||||
self.currentContext(ctx):
|
||||
# Since the layout UUID is new, this will move queues
|
||||
# to "old_queues". Note that it will *not* refresh
|
||||
# the contents, in fact, we will get a new
|
||||
# PipelineState python object with no queues, just as
|
||||
# above. Our state is guaranteed to be out of date
|
||||
# now, but we don't need to do anything with it, we
|
||||
# will let the next actor to use it refresh it then.
|
||||
self.pipeline.state = PipelineState.resetOrCreate(
|
||||
self.pipeline, layout.uuid)
|
||||
self.pipeline.change_list = PipelineChangeList.create(
|
||||
self.pipeline)
|
||||
event = PipelinePostConfigEvent()
|
||||
self.sched.pipeline_management_events[
|
||||
self.pipeline.tenant.name][self.pipeline.name].put(
|
||||
event, needs_result=False)
|
||||
|
||||
def buildChangeQueues(self, layout):
|
||||
self.log.debug("Building relative_priority queues")
|
||||
|
@@ -1050,14 +1050,26 @@ class ChangeQueue(zkobject.ZKObject):
})

items_by_path = OrderedDict()
# This is a tuple of (x, Future), where x is None if no action
# needs to be taken, or a string to indicate which kind of job
# it was. This structure allows us to execute async ZK reads
# and perform local data updates in order.
tpe_jobs = []
tpe = context.executor[ChangeQueue]
for item_path in data["queue"]:
item = existing_items.get(item_path)
items_by_path[item_path] = item
if item:
item.refresh(context)
tpe_jobs.append((None, tpe.submit(item.refresh, context)))
else:
item = QueueItem.fromZK(context, item_path,
pipeline=self.pipeline, queue=self)
items_by_path[item.getPath()] = item
tpe_jobs.append(('item', tpe.submit(
QueueItem.fromZK, context, item_path,
pipeline=self.pipeline, queue=self)))

for (kind, future) in tpe_jobs:
result = future.result()
if kind == 'item':
items_by_path[result.getPath()] = result

# Resolve ahead/behind references between queue items
for item in items_by_path.values():
@@ -4157,6 +4169,13 @@ class BuildSet(zkobject.ZKObject):
existing_retry_builds = {b.getPath(): b
for bl in self.retry_builds.values()
for b in bl}
# This is a tuple of (kind, job_name, Future), where kind is
# None if no action needs to be taken, or a string to indicate
# which kind of job it was. This structure allows us to
# execute async ZK reads and perform local data updates in
# order.
tpe_jobs = []
tpe = context.executor[BuildSet]
# jobs (deserialize as separate objects)
if data['job_graph']:
for job_name in data['job_graph'].jobs:
@ -4171,39 +4190,62 @@ class BuildSet(zkobject.ZKObject):
|
||||
if job_name in self.jobs:
|
||||
job = self.jobs[job_name]
|
||||
if not old_build_exists:
|
||||
job.refresh(context)
|
||||
tpe_jobs.append((None, job_name,
|
||||
tpe.submit(job.refresh, context)))
|
||||
else:
|
||||
job_path = FrozenJob.jobPath(job_name, self.getPath())
|
||||
job = FrozenJob.fromZK(context, job_path, buildset=self)
|
||||
self.jobs[job_name] = job
|
||||
tpe_jobs.append(('job', job_name, tpe.submit(
|
||||
FrozenJob.fromZK, context, job_path, buildset=self)))
|
||||
|
||||
if build_path:
|
||||
build = self.builds.get(job_name)
|
||||
builds[job_name] = build
|
||||
if build and build.getPath() == build_path:
|
||||
if not build.result:
|
||||
build.refresh(context)
|
||||
tpe_jobs.append((
|
||||
None, job_name, tpe.submit(
|
||||
build.refresh, context)))
|
||||
else:
|
||||
if not self._isMyBuild(build_path):
|
||||
build = BuildReference(build_path)
|
||||
context.build_references = True
|
||||
builds[job_name] = build
|
||||
else:
|
||||
build = Build.fromZK(
|
||||
context, build_path, job=job, build_set=self)
|
||||
builds[job_name] = build
|
||||
tpe_jobs.append((
|
||||
'build', job_name, tpe.submit(
|
||||
Build.fromZK, context, build_path,
|
||||
build_set=self)))
|
||||
|
||||
for retry_path in data["retry_builds"].get(job_name, []):
|
||||
retry_build = existing_retry_builds.get(retry_path)
|
||||
if retry_build and retry_build.getPath() == retry_path:
|
||||
# Retry builds never change.
|
||||
pass
|
||||
retry_builds[job_name].append(retry_build)
|
||||
else:
|
||||
if not self._isMyBuild(retry_path):
|
||||
retry_build = BuildReference(retry_path)
|
||||
context.build_references = True
|
||||
retry_builds[job_name].append(retry_build)
|
||||
else:
|
||||
retry_build = Build.fromZK(
|
||||
context, retry_path, job=job, build_set=self)
|
||||
retry_builds[job_name].append(retry_build)
|
||||
tpe_jobs.append((
|
||||
'retry', job_name, tpe.submit(
|
||||
Build.fromZK, context, retry_path,
|
||||
build_set=self)))
|
||||
|
||||
for (kind, job_name, future) in tpe_jobs:
|
||||
result = future.result()
|
||||
if kind == 'job':
|
||||
self.jobs[job_name] = result
|
||||
elif kind == 'build':
|
||||
# We normally set the job on the constructor, but we
|
||||
# may not have had it in time. At this point though,
|
||||
# the job future is guaranteed to have completed, so
|
||||
# we can look it up now.
|
||||
result._set(job=self.jobs[job_name])
|
||||
builds[job_name] = result
|
||||
elif kind == 'retry':
|
||||
result._set(job=self.jobs[job_name])
|
||||
retry_builds[job_name].append(result)
|
||||
|
||||
data.update({
|
||||
"builds": builds,
|
||||
|
@ -680,16 +680,16 @@ class Scheduler(threading.Thread):
|
||||
with pipeline_lock(
|
||||
self.zk_client, tenant.name, pipeline.name,
|
||||
) as lock:
|
||||
ctx = self.createZKContext(lock, self.log)
|
||||
with pipeline.manager.currentContext(ctx):
|
||||
pipeline.change_list.refresh(ctx)
|
||||
pipeline.state.refresh(ctx)
|
||||
# In case we're in the middle of a reconfig,
|
||||
# include the old queue items.
|
||||
for item in pipeline.getAllItems(include_old=True):
|
||||
nrs = item.current_build_set.node_requests
|
||||
for req_id in (nrs.values()):
|
||||
outstanding_requests.add(req_id)
|
||||
with self.createZKContext(lock, self.log) as ctx:
|
||||
with pipeline.manager.currentContext(ctx):
|
||||
pipeline.change_list.refresh(ctx)
|
||||
pipeline.state.refresh(ctx)
|
||||
# In case we're in the middle of a reconfig,
|
||||
# include the old queue items.
|
||||
for item in pipeline.getAllItems(include_old=True):
|
||||
nrs = item.current_build_set.node_requests
|
||||
for req_id in (nrs.values()):
|
||||
outstanding_requests.add(req_id)
|
||||
leaked_requests = zk_requests - outstanding_requests
|
||||
for req_id in leaked_requests:
|
||||
try:
|
||||
@ -762,24 +762,24 @@ class Scheduler(threading.Thread):
|
||||
for tenant in self.abide.tenants.values():
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
with pipeline_lock(
|
||||
self.zk_client, tenant.name, pipeline.name,
|
||||
) as lock:
|
||||
ctx = self.createZKContext(lock, self.log)
|
||||
self.zk_client, tenant.name,
|
||||
pipeline.name) as lock,\
|
||||
self.createZKContext(lock, self.log) as ctx:
|
||||
pipeline.state.refresh(ctx)
|
||||
# add any blobstore references
|
||||
for item in pipeline.getAllItems(include_old=True):
|
||||
live_blobs.update(item.getBlobKeys())
|
||||
ctx = self.createZKContext(None, self.log)
|
||||
blobstore = BlobStore(ctx)
|
||||
# get the set of blob keys unused since the start time
|
||||
# (ie, we have already filtered any newly added keys)
|
||||
unused_blobs = blobstore.getKeysLastUsedBefore(start_ltime)
|
||||
# remove the current refences
|
||||
unused_blobs -= live_blobs
|
||||
# delete what's left
|
||||
for key in unused_blobs:
|
||||
self.log.debug("Deleting unused blob: %s", key)
|
||||
blobstore.delete(key, start_ltime)
|
||||
with self.createZKContext(None, self.log) as ctx:
|
||||
blobstore = BlobStore(ctx)
|
||||
# get the set of blob keys unused since the start time
|
||||
# (ie, we have already filtered any newly added keys)
|
||||
unused_blobs = blobstore.getKeysLastUsedBefore(start_ltime)
|
||||
# remove the current refences
|
||||
unused_blobs -= live_blobs
|
||||
# delete what's left
|
||||
for key in unused_blobs:
|
||||
self.log.debug("Deleting unused blob: %s", key)
|
||||
blobstore.delete(key, start_ltime)
|
||||
self.log.debug("Finished blob store cleanup")
|
||||
except Exception:
|
||||
self.log.exception("Error in blob store cleanup:")
|
||||
@ -1004,8 +1004,9 @@ class Scheduler(threading.Thread):
|
||||
|
||||
if layout_state is None:
|
||||
# Reconfigure only tenants w/o an existing layout state
|
||||
ctx = self.createZKContext(tlock, self.log)
|
||||
self._reconfigureTenant(ctx, min_ltimes, -1, tenant)
|
||||
with self.createZKContext(tlock, self.log) as ctx:
|
||||
self._reconfigureTenant(
|
||||
ctx, min_ltimes, -1, tenant)
|
||||
self._reportInitialStats(tenant)
|
||||
else:
|
||||
self.local_layout_state[tenant_name] = layout_state
|
||||
@ -1454,17 +1455,17 @@ class Scheduler(threading.Thread):
|
||||
self.unparsed_abide, min_ltimes=min_ltimes,
|
||||
branch_cache_min_ltimes=branch_cache_min_ltimes)
|
||||
reconfigured_tenants.append(tenant_name)
|
||||
ctx = self.createZKContext(lock, self.log)
|
||||
if tenant is not None:
|
||||
self._reconfigureTenant(ctx, min_ltimes,
|
||||
-1,
|
||||
tenant, old_tenant)
|
||||
else:
|
||||
self._reconfigureDeleteTenant(ctx, old_tenant)
|
||||
with suppress(KeyError):
|
||||
del self.tenant_layout_state[tenant_name]
|
||||
with suppress(KeyError):
|
||||
del self.local_layout_state[tenant_name]
|
||||
with self.createZKContext(lock, self.log) as ctx:
|
||||
if tenant is not None:
|
||||
self._reconfigureTenant(ctx, min_ltimes,
|
||||
-1,
|
||||
tenant, old_tenant)
|
||||
else:
|
||||
self._reconfigureDeleteTenant(ctx, old_tenant)
|
||||
with suppress(KeyError):
|
||||
del self.tenant_layout_state[tenant_name]
|
||||
with suppress(KeyError):
|
||||
del self.local_layout_state[tenant_name]
|
||||
|
||||
duration = round(time.monotonic() - start, 3)
|
||||
self.log.info("Reconfiguration complete (smart: %s, tenants: %s, "
|
||||
@ -1520,10 +1521,10 @@ class Scheduler(threading.Thread):
|
||||
self.unparsed_abide, min_ltimes=min_ltimes,
|
||||
branch_cache_min_ltimes=branch_cache_min_ltimes)
|
||||
tenant = self.abide.tenants[event.tenant_name]
|
||||
ctx = self.createZKContext(lock, self.log)
|
||||
self._reconfigureTenant(ctx, min_ltimes,
|
||||
event.trigger_event_ltime,
|
||||
tenant, old_tenant)
|
||||
with self.createZKContext(lock, self.log) as ctx:
|
||||
self._reconfigureTenant(ctx, min_ltimes,
|
||||
event.trigger_event_ltime,
|
||||
tenant, old_tenant)
|
||||
duration = round(time.monotonic() - start, 3)
|
||||
self.log.info("Tenant reconfiguration complete for %s (duration: %s "
|
||||
"seconds)", event.tenant_name, duration)
|
||||
@ -2103,9 +2104,9 @@ class Scheduler(threading.Thread):
|
||||
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
|
||||
try:
|
||||
with pipeline_lock(
|
||||
self.zk_client, tenant.name, pipeline.name, blocking=False
|
||||
) as lock:
|
||||
ctx = self.createZKContext(lock, self.log)
|
||||
self.zk_client, tenant.name, pipeline.name,
|
||||
blocking=False) as lock,\
|
||||
self.createZKContext(lock, self.log) as ctx:
|
||||
with pipeline.manager.currentContext(ctx):
|
||||
with self.statsd_timer(f'{stats_key}.handling'):
|
||||
refreshed = self._process_pipeline(
|
||||
@ -2212,8 +2213,8 @@ class Scheduler(threading.Thread):
|
||||
|
||||
def _gatherConnectionCacheKeys(self):
|
||||
relevant = set()
|
||||
with self.layout_lock:
|
||||
ctx = self.createZKContext(None, self.log)
|
||||
with self.layout_lock,\
|
||||
self.createZKContext(None, self.log) as ctx:
|
||||
for tenant in self.abide.tenants.values():
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
self.log.debug("Gather relevant cache items for: %s %s",
|
||||
|
@ -972,21 +972,23 @@ class ZuulWebAPI(object):
|
||||
|
||||
def _tenants(self):
|
||||
result = []
|
||||
for tenant_name, tenant in sorted(self.zuulweb.abide.tenants.items()):
|
||||
queue_size = 0
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
status = pipeline.summary.refresh(self.zuulweb.zk_context)
|
||||
for queue in status.get("change_queues", []):
|
||||
for head in queue["heads"]:
|
||||
for item in head:
|
||||
if item["live"]:
|
||||
queue_size += 1
|
||||
with self.zuulweb.zk_context as ctx:
|
||||
for tenant_name, tenant in sorted(
|
||||
self.zuulweb.abide.tenants.items()):
|
||||
queue_size = 0
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
status = pipeline.summary.refresh(ctx)
|
||||
for queue in status.get("change_queues", []):
|
||||
for head in queue["heads"]:
|
||||
for item in head:
|
||||
if item["live"]:
|
||||
queue_size += 1
|
||||
|
||||
result.append({
|
||||
'name': tenant_name,
|
||||
'projects': len(tenant.untrusted_projects),
|
||||
'queue': queue_size,
|
||||
})
|
||||
result.append({
|
||||
'name': tenant_name,
|
||||
'projects': len(tenant.untrusted_projects),
|
||||
'queue': queue_size,
|
||||
})
|
||||
return result
|
||||
|
||||
@cherrypy.expose
|
||||
@ -1096,13 +1098,16 @@ class ZuulWebAPI(object):
|
||||
result_event_queues = self.zuulweb.pipeline_result_events[tenant.name]
|
||||
management_event_queues = self.zuulweb.pipeline_management_events[
|
||||
tenant.name]
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
status = pipeline.summary.refresh(self.zuulweb.zk_context)
|
||||
status['trigger_events'] = len(trigger_event_queues[pipeline.name])
|
||||
status['result_events'] = len(result_event_queues[pipeline.name])
|
||||
status['management_events'] = len(
|
||||
management_event_queues[pipeline.name])
|
||||
pipelines.append(status)
|
||||
with self.zuulweb.zk_context as ctx:
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
status = pipeline.summary.refresh(ctx)
|
||||
status['trigger_events'] = len(
|
||||
trigger_event_queues[pipeline.name])
|
||||
status['result_events'] = len(
|
||||
result_event_queues[pipeline.name])
|
||||
status['management_events'] = len(
|
||||
management_event_queues[pipeline.name])
|
||||
pipelines.append(status)
|
||||
return data, json.dumps(data).encode('utf-8')
|
||||
|
||||
def _getTenantOrRaise(self, tenant_name):
|
||||
@ -1694,13 +1699,13 @@ class ZuulWebAPI(object):
|
||||
|
||||
change = Branch(project)
|
||||
change.branch = branch_name or "master"
|
||||
context = LocalZKContext(self.log)
|
||||
queue = ChangeQueue.new(context, pipeline=pipeline)
|
||||
item = QueueItem.new(context, queue=queue, change=change,
|
||||
pipeline=queue.pipeline)
|
||||
item.freezeJobGraph(tenant.layout, context,
|
||||
skip_file_matcher=True,
|
||||
redact_secrets_and_keys=True)
|
||||
with LocalZKContext(self.log) as context:
|
||||
queue = ChangeQueue.new(context, pipeline=pipeline)
|
||||
item = QueueItem.new(context, queue=queue, change=change,
|
||||
pipeline=queue.pipeline)
|
||||
item.freezeJobGraph(tenant.layout, context,
|
||||
skip_file_matcher=True,
|
||||
redact_secrets_and_keys=True)
|
||||
|
||||
return item
|
||||
|
||||
|
@ -134,15 +134,14 @@ class BlobStore:
|
||||
return key
|
||||
|
||||
# make a new context based on the old one
|
||||
locked_context = ZKContext(self.context.client, lock,
|
||||
self.context.stop_event,
|
||||
self.context.log)
|
||||
|
||||
self._retry(
|
||||
locked_context,
|
||||
self._retryableSave,
|
||||
locked_context, path, flag, data)
|
||||
self.context.updateStatsFromOtherContext(locked_context)
|
||||
with ZKContext(self.context.client, lock,
|
||||
self.context.stop_event,
|
||||
self.context.log) as locked_context:
|
||||
self._retry(
|
||||
locked_context,
|
||||
self._retryableSave,
|
||||
locked_context, path, flag, data)
|
||||
self.context.updateStatsFromOtherContext(locked_context)
|
||||
return key
|
||||
|
||||
def delete(self, key, ltime):
|
||||
@ -158,18 +157,18 @@ class BlobStore:
|
||||
blocking=True
|
||||
) as lock:
|
||||
# make a new context based on the old one
|
||||
locked_context = ZKContext(self.context.client, lock,
|
||||
self.context.stop_event,
|
||||
self.context.log)
|
||||
with ZKContext(self.context.client, lock,
|
||||
self.context.stop_event,
|
||||
self.context.log) as locked_context:
|
||||
|
||||
# Double check that it hasn't been used since we
|
||||
# decided to delete it
|
||||
data, zstat = self._retry(locked_context,
|
||||
self.context.client.get,
|
||||
flag)
|
||||
if zstat.last_modified_transaction_id < ltime:
|
||||
self._retry(locked_context, self.context.client.delete,
|
||||
path, recursive=True)
|
||||
# Double check that it hasn't been used since we
|
||||
# decided to delete it
|
||||
data, zstat = self._retry(locked_context,
|
||||
self.context.client.get,
|
||||
flag)
|
||||
if zstat.last_modified_transaction_id < ltime:
|
||||
self._retry(locked_context, self.context.client.delete,
|
||||
path, recursive=True)
|
||||
except NoNodeError:
|
||||
raise KeyError(key)
|
||||
|
||||
|
@ -102,25 +102,27 @@ class BranchCache:
|
||||
# to the context.
|
||||
self.zk_context = ZKContext(zk_client, self.wlock, None, self.log)
|
||||
|
||||
with locked(self.wlock):
|
||||
with self.zk_context as ctx,\
|
||||
locked(self.wlock):
|
||||
try:
|
||||
self.cache = BranchCacheZKObject.fromZK(
|
||||
self.zk_context, data_path, _path=data_path)
|
||||
ctx, data_path, _path=data_path)
|
||||
except NoNodeError:
|
||||
self.cache = BranchCacheZKObject.new(
|
||||
self.zk_context, _path=data_path)
|
||||
ctx, _path=data_path)
|
||||
|
||||
def clear(self, projects=None):
|
||||
"""Clear the cache"""
|
||||
with locked(self.wlock):
|
||||
with self.cache.activeContext(self.zk_context):
|
||||
if projects is None:
|
||||
self.cache.protected.clear()
|
||||
self.cache.remainder.clear()
|
||||
else:
|
||||
for p in projects:
|
||||
self.cache.protected.pop(p, None)
|
||||
self.cache.remainder.pop(p, None)
|
||||
with locked(self.wlock),\
|
||||
self.zk_context as ctx,\
|
||||
self.cache.activeContext(ctx):
|
||||
if projects is None:
|
||||
self.cache.protected.clear()
|
||||
self.cache.remainder.clear()
|
||||
else:
|
||||
for p in projects:
|
||||
self.cache.protected.pop(p, None)
|
||||
self.cache.remainder.pop(p, None)
|
||||
|
||||
def getProjectBranches(self, project_name, exclude_unprotected,
|
||||
min_ltime=-1, default=RAISE_EXCEPTION):
|
||||
@ -154,8 +156,9 @@ class BranchCache:
|
||||
an error when fetching the branches.
|
||||
"""
|
||||
if self.ltime < min_ltime:
|
||||
with locked(self.rlock):
|
||||
self.cache.refresh(self.zk_context)
|
||||
with locked(self.rlock),\
|
||||
self.zk_context as ctx:
|
||||
self.cache.refresh(ctx)
|
||||
|
||||
protected_branches = None
|
||||
try:
|
||||
@ -197,23 +200,24 @@ class BranchCache:
|
||||
The list of branches or None to indicate a fetch error.
|
||||
"""
|
||||
|
||||
with locked(self.wlock):
|
||||
with self.cache.activeContext(self.zk_context):
|
||||
if exclude_unprotected:
|
||||
self.cache.protected[project_name] = branches
|
||||
remainder_branches = self.cache.remainder.get(project_name)
|
||||
if remainder_branches and branches:
|
||||
remainder = list(set(remainder_branches) -
|
||||
set(branches))
|
||||
self.cache.remainder[project_name] = remainder
|
||||
else:
|
||||
protected_branches = self.cache.protected.get(project_name)
|
||||
if protected_branches and branches:
|
||||
remainder = list(set(branches) -
|
||||
set(protected_branches))
|
||||
else:
|
||||
remainder = branches
|
||||
with locked(self.wlock),\
|
||||
self.zk_context as ctx,\
|
||||
self.cache.activeContext(ctx):
|
||||
if exclude_unprotected:
|
||||
self.cache.protected[project_name] = branches
|
||||
remainder_branches = self.cache.remainder.get(project_name)
|
||||
if remainder_branches and branches:
|
||||
remainder = list(set(remainder_branches) -
|
||||
set(branches))
|
||||
self.cache.remainder[project_name] = remainder
|
||||
else:
|
||||
protected_branches = self.cache.protected.get(project_name)
|
||||
if protected_branches and branches:
|
||||
remainder = list(set(branches) -
|
||||
set(protected_branches))
|
||||
else:
|
||||
remainder = branches
|
||||
self.cache.remainder[project_name] = remainder
|
||||
|
||||
def setProtected(self, project_name, branch, protected):
|
||||
"""Correct the protection state of a branch.
|
||||
@ -222,33 +226,34 @@ class BranchCache:
|
||||
receiving an explicit event.
|
||||
"""
|
||||
|
||||
with locked(self.wlock):
|
||||
with self.cache.activeContext(self.zk_context):
|
||||
protected_branches = self.cache.protected.get(project_name)
|
||||
remainder_branches = self.cache.remainder.get(project_name)
|
||||
if protected:
|
||||
if protected_branches is None:
|
||||
# We've never run a protected query, so we
|
||||
# should ignore this branch.
|
||||
return
|
||||
else:
|
||||
# We have run a protected query; if we have
|
||||
# also run an unprotected query, we need to
|
||||
# move the branch from remainder to protected.
|
||||
if remainder_branches and branch in remainder_branches:
|
||||
remainder_branches.remove(branch)
|
||||
if branch not in protected_branches:
|
||||
protected_branches.append(branch)
|
||||
with locked(self.wlock),\
|
||||
self.zk_context as ctx,\
|
||||
self.cache.activeContext(ctx):
|
||||
protected_branches = self.cache.protected.get(project_name)
|
||||
remainder_branches = self.cache.remainder.get(project_name)
|
||||
if protected:
|
||||
if protected_branches is None:
|
||||
# We've never run a protected query, so we
|
||||
# should ignore this branch.
|
||||
return
|
||||
else:
|
||||
if protected_branches and branch in protected_branches:
|
||||
protected_branches.remove(branch)
|
||||
if remainder_branches is None:
|
||||
# We've never run an unprotected query, so we
|
||||
# should ignore this branch.
|
||||
return
|
||||
else:
|
||||
if branch not in remainder_branches:
|
||||
remainder_branches.append(branch)
|
||||
# We have run a protected query; if we have
|
||||
# also run an unprotected query, we need to
|
||||
# move the branch from remainder to protected.
|
||||
if remainder_branches and branch in remainder_branches:
|
||||
remainder_branches.remove(branch)
|
||||
if branch not in protected_branches:
|
||||
protected_branches.append(branch)
|
||||
else:
|
||||
if protected_branches and branch in protected_branches:
|
||||
protected_branches.remove(branch)
|
||||
if remainder_branches is None:
|
||||
# We've never run an unprotected query, so we
|
||||
# should ignore this branch.
|
||||
return
|
||||
else:
|
||||
if branch not in remainder_branches:
|
||||
remainder_branches.append(branch)
|
||||
|
||||
@property
|
||||
def ltime(self):
|
||||
|
@@ -1,4 +1,4 @@
# Copyright 2021 Acme Gating, LLC
# Copyright 2021-2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -12,12 +12,15 @@
# License for the specific language governing permissions and limitations
# under the License.

from concurrent.futures import ThreadPoolExecutor
import contextlib
import json
import logging
import sys
import time
import types
import zlib
import collections

from kazoo.exceptions import NodeExistsError, NoNodeError
from kazoo.retry import KazooRetry
@@ -26,11 +29,45 @@ from zuul.zk import sharding
from zuul.zk import ZooKeeperClient


class ZKContext:
class BaseZKContext:
profile_logger = logging.getLogger('zuul.profile')
profile_default = False
# Only changed by unit tests.
# The default scales with number of procs.
_max_workers = None

def __init__(self):
# We create the executor dict in enter to make sure that this
# is used as a context manager and cleaned up properly.
self.executor = None

def __enter__(self):
if self.executor:
raise RuntimeError("ZKContext entered multiple times")
# This is a dictionary keyed by class. ZKObject subclasses
# can request a dedicated ThreadPoolExecutor for their class
# so that deserialize methods that use it can avoid deadlocks
# with child class deserialize methods.
self.executor = collections.defaultdict(
lambda: ThreadPoolExecutor(
max_workers=self._max_workers,
thread_name_prefix="ZKContext",
))
return self

def __exit__(self, etype, value, tb):
if self.executor:
for executor in self.executor.values():
if sys.version_info >= (3, 9):
executor.shutdown(wait=False, cancel_futures=True)
else:
executor.shutdown(wait=False)
self.executor = None


class ZKContext(BaseZKContext):
def __init__(self, zk_client, lock, stop_event, log):
super().__init__()
if isinstance(zk_client, ZooKeeperClient):
client = zk_client.client
else:
@@ -80,10 +117,11 @@ class ZKContext:
self.cumulative_read_bytes, self.cumulative_write_bytes)


class LocalZKContext:
class LocalZKContext(BaseZKContext):
"""A Local ZKContext that means don't actually write anything to ZK"""

def __init__(self, log):
super().__init__()
self.client = None
self.lock = None
self.stop_event = None
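A note on the executor dictionary above (illustration only, not part of the change): keying the thread pools by class lets a parent object's deserialize, running in one class's pool, submit child reads to another class's pool and wait on them without tying up its own workers; this is the deadlock scenario the unit test probes by forcing BaseZKContext._max_workers to 1. A minimal, self-contained sketch of that pattern with hypothetical Parent/Child classes:

    import collections
    from concurrent.futures import ThreadPoolExecutor

    class Parent:
        pass

    class Child:
        pass

    # One single-worker pool per class, mirroring _max_workers = 1 in the test.
    executor = collections.defaultdict(
        lambda: ThreadPoolExecutor(max_workers=1))

    def load_child(name):
        return "child-%s" % name

    def load_parent(name):
        # This task occupies the Parent pool's only worker, but the child
        # reads run in the Child pool, so waiting on their results cannot
        # deadlock.  With a single shared one-worker pool it would hang.
        futures = [executor[Child].submit(load_child, c) for c in ("a", "b")]
        return {name: [f.result() for f in futures]}

    if __name__ == "__main__":
        print(executor[Parent].submit(load_parent, "p1").result())
        for pool in executor.values():
            pool.shutdown()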