Store change queues in Zookeeper

The change queue state is stored in the following Zookeeper path:

    /zuul/<tenant>/pipeline/<pipeline>/queue/<layout-uuid>/<queue-uuid>

Change-Id: I0a64bd9adc7b9f8f7a775280bb7a01ace22baac4
This commit is contained in:
Simon Westphahl
2021-09-24 16:11:13 +02:00
parent 36f558fdbe
commit bbaffd59b5
11 changed files with 187 additions and 110 deletions

View File

@@ -94,9 +94,10 @@ from zuul.driver.github.githubconnection import GithubClientManager
from zuul.driver.elasticsearch import ElasticsearchDriver
from zuul.lib.collections import DefaultKeyDict
from zuul.lib.connections import ConnectionRegistry
from zuul.zk import ZooKeeperClient
from zuul.zk import zkobject, ZooKeeperClient
from zuul.zk.event_queues import ConnectionEventQueue
from zuul.zk.executor import ExecutorApi
from zuul.zk.locks import SessionAwareLock
from zuul.zk.merger import MergerApi
from psutil import Popen
@@ -4808,6 +4809,10 @@ class ZuulTestCase(BaseTestCase):
self.additional_event_queues = []
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self._context_lock = SessionAwareLock(
self.zk_client.client, f"/test/{uuid.uuid4().hex}")
self.connection_event_queues = DefaultKeyDict(
lambda cn: ConnectionEventQueue(self.zk_client, cn)
)
@@ -4859,6 +4864,12 @@ class ZuulTestCase(BaseTestCase):
self.poller_events, self.git_url_with_auth,
self.fake_sql, self.addCleanup, self.validate_tenants)
def createZKContext(self):
    """Build a ``zkobject.ZKContext`` for use inside a test.

    The context wraps this test case's ZooKeeper client and its
    per-test ``_context_lock``; no stop event is supplied and the test
    case's logger is passed through.

    :returns: a new ``zkobject.ZKContext`` instance.
    """
    lock = self._context_lock
    # Non-blocking acquire; the return value is not checked, so this
    # only attempts to ensure the lock is held before handing it to
    # the context.
    lock.acquire(blocking=False)
    return zkobject.ZKContext(
        self.zk_client, lock, stop_event=None, log=self.log)
def __event_queues(self, matcher) -> List[Queue]:
# TODO (swestphahl): Can be removed when we no longer use global
# management events.

View File

@@ -38,6 +38,12 @@ class Dummy(object):
setattr(self, k, v)
class DummyChangeQueue(model.ChangeQueue):
    """Change queue for unit tests whose ``_save`` hook is a no-op."""

    def _save(self, ctx, create=False):
        # Deliberately empty: both arguments are ignored.
        # NOTE(review): presumably this keeps tests from writing the
        # queue to ZooKeeper -- confirm against ZKObject._save.
        return None
class TestJob(BaseTestCase):
def setUp(self):
self._env_fixture = self.useFixture(
@@ -66,7 +72,7 @@ class TestJob(BaseTestCase):
self.pipeline.source_context = self.context
self.pipeline.manager = mock.Mock()
self.layout.addPipeline(self.pipeline)
self.queue = model.ChangeQueue(self.pipeline)
self.queue = DummyChangeQueue.new(None, pipeline=self.pipeline)
self.pcontext = configloader.ParseContext(
self.connections, None, self.tenant, AnsibleManager())
@@ -165,7 +171,7 @@ class TestJob(BaseTestCase):
@mock.patch("zuul.model.zkobject.ZKObject._save")
def test_job_inheritance_job_tree(self, save_mock):
queue = model.ChangeQueue(self.pipeline)
queue = DummyChangeQueue.new(None, pipeline=self.pipeline)
base = self.pcontext.job_parser.fromYaml({
'_source_context': self.context,
@@ -236,7 +242,7 @@ class TestJob(BaseTestCase):
@mock.patch("zuul.model.zkobject.ZKObject._save")
def test_inheritance_keeps_matchers(self, save_mock):
queue = model.ChangeQueue(self.pipeline)
queue = DummyChangeQueue.new(None, pipeline=self.pipeline)
base = self.pcontext.job_parser.fromYaml({
'_source_context': self.context,

View File

@@ -3528,10 +3528,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1, None)
q2 = gate.getQueue(project2, None)
with gate.manager.currentContext(self.createZKContext()):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
q2 = gate.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@@ -3549,10 +3550,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1, None)
q2 = gate.getQueue(project2, None)
with gate.manager.currentContext(self.createZKContext()):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
q2 = gate.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@@ -3570,10 +3572,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1, None)
q2 = gate.getQueue(project2, None)
with gate.manager.currentContext(self.createZKContext()):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
q2 = gate.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@@ -3590,10 +3593,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1, None)
q2 = gate.getQueue(project2, None)
with gate.manager.currentContext(self.createZKContext()):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
q2 = gate.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@@ -3610,10 +3614,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1, None)
q2 = gate.getQueue(project2, None)
with gate.manager.currentContext(self.createZKContext()):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
q2 = gate.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@@ -6715,8 +6720,10 @@ class TestChangeQueues(ZuulTestCase):
])
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
_, p = tenant.getProject(project)
q1 = tenant.layout.pipelines['gate'].getQueue(p, 'master')
q2 = tenant.layout.pipelines['gate'].getQueue(p, 'stable')
q1 = tenant.layout.pipelines['gate'].getQueue(
p.canonical_name, 'master')
q2 = tenant.layout.pipelines['gate'].getQueue(
p.canonical_name, 'stable')
self.assertEqual(q1.name, queue_name)
self.assertEqual(q2.name, queue_name)
@@ -6772,9 +6779,12 @@ class TestChangeQueues(ZuulTestCase):
])
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
_, p = tenant.getProject(project)
q1 = tenant.layout.pipelines['gate'].getQueue(p, 'master')
q2 = tenant.layout.pipelines['gate'].getQueue(p, 'stable')
q3 = tenant.layout.pipelines['gate'].getQueue(p, None)
q1 = tenant.layout.pipelines['gate'].getQueue(
p.canonical_name, 'master')
q2 = tenant.layout.pipelines['gate'].getQueue(
p.canonical_name, 'stable')
q3 = tenant.layout.pipelines['gate'].getQueue(
p.canonical_name, None)
# There should be no branch specific queues anymore
self.assertEqual(q1, None)

View File

@@ -30,8 +30,9 @@ class DependentPipelineManager(SharedQueuePipelineManager):
def constructChangeQueue(self, queue_name):
p = self.pipeline
return model.ChangeQueue(
p,
return model.ChangeQueue.new(
p.manager.current_context,
pipeline=p,
window=p.window,
window_floor=p.window_floor,
window_increase_type=p.window_increase_type,
@@ -81,16 +82,14 @@ class DependentPipelineManager(SharedQueuePipelineManager):
return
# for project in change_queue, project.source get changes, then dedup.
sources = set()
for project, _ in change_queue.project_branches:
sources.add(project.source)
projects = [self.pipeline.tenant.getProject(pcn)[1] for pcn, _ in
change_queue.project_branches]
sources = {p.source for p in projects}
needed_by_changes = self.resolveChangeKeys(change.needed_by_changes)
seen = set(needed_by_changes)
for source in sources:
log.debug(" Checking source: %s", source)
projects = [project_branch[0]
for project_branch in change_queue.project_branches]
for c in source.getChangesDependingOn(change,
projects,
self.pipeline.tenant):

View File

@@ -30,7 +30,10 @@ class IndependentPipelineManager(PipelineManager):
# instead create a new change queue for every change.
if existing:
return DynamicChangeQueueContextManager(existing)
change_queue = model.ChangeQueue(self.pipeline, dynamic=True)
change_queue = model.ChangeQueue.new(
self.pipeline.manager.current_context,
pipeline=self.pipeline,
dynamic=True)
change_queue.addProject(change.project, None)
self.pipeline.addQueue(change_queue)
log.debug("Dynamically created queue %s", change_queue)

View File

@@ -20,8 +20,9 @@ class SerialPipelineManager(SharedQueuePipelineManager):
changes_merge = False
def constructChangeQueue(self, queue_name):
return model.ChangeQueue(
self.pipeline,
return model.ChangeQueue.new(
self.pipeline.manager.current_context,
pipeline=self.pipeline,
window=1,
window_floor=1,
window_increase_type='none',

View File

@@ -36,12 +36,12 @@ class ChangeQueueManager:
if not change_queue:
p = self.pipeline_manager.pipeline
change_queue = self.pipeline_manager.constructChangeQueue(
self.name)
name = self.name or project.name
change_queue = self.pipeline_manager.constructChangeQueue(name)
p.addQueue(change_queue)
self.created_for_branches[branch] = change_queue
if not change_queue.matches(project, branch):
if not change_queue.matches(project.canonical_name, branch):
change_queue.addProject(project, branch)
self.log.debug("Added project %s to queue: %s" %
(project, change_queue))
@@ -119,7 +119,8 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
# Ignore the existing queue, since we can always get the correct queue
# from the pipeline. This avoids enqueuing changes in a wrong queue
# e.g. during re-configuration.
queue = self.pipeline.getQueue(change.project, change.branch)
queue = self.pipeline.getQueue(change.project.canonical_name,
change.branch)
if queue:
return StaticChangeQueueContextManager(queue)
else:
@@ -140,13 +141,16 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
)
# No specific per-branch queue matched so look again with no branch
queue = self.pipeline.getQueue(change.project, None)
queue = self.pipeline.getQueue(change.project.canonical_name, None)
if queue:
return StaticChangeQueueContextManager(queue)
# There is no existing queue for this change. Create a
# dynamic one for this one change's use
change_queue = model.ChangeQueue(self.pipeline, dynamic=True)
change_queue = model.ChangeQueue.new(
self.pipeline.manager.current_context,
pipeline=self.pipeline,
dynamic=True)
change_queue.addProject(change.project, None)
self.pipeline.addQueue(change_queue)
log.debug("Dynamically created queue %s", change_queue)

View File

@@ -38,8 +38,9 @@ class SupercedentPipelineManager(PipelineManager):
queue.queue[-1].change.ref == change.ref)):
log.debug("Found existing queue %s", queue)
return DynamicChangeQueueContextManager(queue)
change_queue = model.ChangeQueue(
self.pipeline,
change_queue = model.ChangeQueue.new(
self.pipeline.manager.current_context,
pipeline=self.pipeline,
window=1,
window_floor=1,
window_increase_type='none',

View File

@@ -348,10 +348,10 @@ class Pipeline(object):
def addQueue(self, queue):
self.queues.append(queue)
def getQueue(self, project, branch):
def getQueue(self, project_cname, branch):
# Queues might be branch specific so match with branch
for queue in self.queues:
if queue.matches(project, branch):
if queue.matches(project_cname, branch):
return queue
return None
@@ -363,7 +363,9 @@ class Pipeline(object):
def removeQueue(self, queue):
if queue in self.queues:
self.queues.remove(queue)
with self.state.activeContext(self.manager.current_context):
self.state.queues.remove(queue)
queue.delete(self.manager.current_context)
def getChangesInQueue(self):
changes = []
@@ -463,7 +465,7 @@ class PipelineState(zkobject.ZKObject):
queue.refresh(context)
class ChangeQueue(object):
class ChangeQueue(zkobject.ZKObject):
"""A ChangeQueue contains Changes to be processed for related projects.
A Pipeline with a DependentPipelineManager has multiple parallel
@@ -483,46 +485,72 @@ class ChangeQueue(object):
A ChangeQueue may be a dynamically created queue, which may be removed
from a DependentPipelineManager once empty.
"""
def __init__(self, pipeline, window=0, window_floor=1,
window_increase_type='linear', window_increase_factor=1,
window_decrease_type='exponential', window_decrease_factor=2,
name=None, dynamic=False):
self.pipeline = pipeline
if name:
self.name = name
else:
self.name = ''
self.project_branches = []
self._jobs = set()
self.queue = []
self.window = window
self.window_floor = window_floor
self.window_increase_type = window_increase_type
self.window_increase_factor = window_increase_factor
self.window_decrease_type = window_decrease_type
self.window_decrease_factor = window_decrease_factor
self.dynamic = dynamic
def __init__(self):
    """Create a change queue populated with default attribute values.

    All attributes are assigned in one ``_set`` call after the base
    class has been initialised.  Each instance receives a freshly
    generated random ``uuid`` and its own empty ``project_branches``,
    ``_jobs`` and ``queue`` containers.
    """
    super().__init__()
    defaults = {
        "uuid": uuid4().hex,
        "pipeline": None,
        "name": "",
        "project_branches": [],
        "_jobs": set(),
        "queue": [],
        "window": 0,
        "window_floor": 1,
        "window_increase_type": "linear",
        "window_increase_factor": 1,
        "window_decrease_type": "exponential",
        "window_decrease_factor": 2,
        "dynamic": False,
    }
    self._set(**defaults)
def getPath(self):
queue_id = (self.uuid if self.dynamic
else urllib.parse.quote_plus(self.name))
return f"{self.pipeline.getPath()}/queue/{queue_id}"
def serialize(self):
    """Return the queue state as UTF-8 encoded JSON bytes.

    ``_jobs`` is emitted as a list and ``queue`` as the list of each
    item's ``getPath()`` result; every other field is written as-is.
    The ``pipeline`` attribute is not part of the output.

    :returns: ``bytes`` containing the JSON document.
    """
    # Fields that need conversion before they can be dumped as JSON.
    payload = {
        "uuid": self.uuid,
        "name": self.name,
        "project_branches": self.project_branches,
        "_jobs": list(self._jobs),
        "queue": [entry.getPath() for entry in self.queue],
    }
    # Remaining fields are copied verbatim, in a fixed key order.
    passthrough = (
        "window",
        "window_floor",
        "window_increase_type",
        "window_increase_factor",
        "window_decrease_type",
        "window_decrease_factor",
        "dynamic",
    )
    for attr in passthrough:
        payload[attr] = getattr(self, attr)
    encoded = json.dumps(payload)
    return encoded.encode("utf8")
def refresh(self, context):
# FIXME: we currently don't have the queue data in ZK, so we just
# construct a similar list of item uuids.
existing_items = {i.uuid: i for i in self.queue}
items_in_queue = existing_items.keys()
for item_uuid in items_in_queue:
item = existing_items.get(item_uuid)
def deserialize(self, raw):
data = super().deserialize(raw)
existing_items = {}
for item in self.queue:
existing_items[item.getPath()] = item
if item.bundle:
existing_items.update({
i.getPath(): i for i in item.bundle.items
})
queue = []
context = self.pipeline.manager.current_context
for item_path in data["queue"]:
item = existing_items.get(item_path)
if item:
item.refresh(context)
else:
# FIXME: this code path is currently unused as we have all
# items in the queue.
item = QueueItem.fromZK(
context,
QueueItem.itemPath(self.pipeline.getPath(), item_uuid))
item = QueueItem.fromZK(context, item_path)
queue.append(item)
data.update({
"_jobs": set(data["_jobs"]),
"queue": queue,
"project_branches": [tuple(pb) for pb in data["project_branches"]],
})
return data
def getPath(self):
pipeline_path = self.pipeline.state.getPath()
return f"{pipeline_path}/queue/{self.uuid}"
@property
def zk_context(self):
@@ -542,15 +570,13 @@ class ChangeQueue(object):
care about branches it can supply None (but must supply None as well
when matching)
"""
project_branch = (project, branch)
project_branch = (project.canonical_name, branch)
if project_branch not in self.project_branches:
self.project_branches.append(project_branch)
with self.activeContext(self.zk_context):
self.project_branches.append(project_branch)
if not self.name:
self.name = project.name
def matches(self, project, branch):
return (project, branch) in self.project_branches
def matches(self, project_cname, branch):
return (project_cname, branch) in self.project_branches
def enqueueChange(self, change, event):
item = QueueItem.new(self.zk_context,
@@ -567,17 +593,20 @@ class ChangeQueue(object):
item._set(pipeline=self.pipeline, queue=self)
if self.queue:
item.updateAttributes(self.zk_context, item_ahead=self.queue[-1])
item.item_ahead.items_behind.append(item)
self.queue.append(item)
with item.item_ahead.activeContext(self.zk_context):
item.item_ahead.items_behind.append(item)
with self.activeContext(self.zk_context):
self.queue.append(item)
def dequeueItem(self, item):
if item in self.queue:
self.queue.remove(item)
with self.activeContext(self.zk_context):
self.queue.remove(item)
if item.item_ahead:
item.item_ahead.items_behind.remove(item)
with item.item_ahead.activeContext(self.zk_context):
item.item_ahead.items_behind.remove(item)
item.item_ahead.items_behind.extend(item.items_behind)
for item_behind in item.items_behind:
if item.item_ahead:
item.item_ahead.items_behind.append(item_behind)
item_behind.updateAttributes(self.zk_context,
item_ahead=item.item_ahead)
@@ -602,20 +631,21 @@ class ChangeQueue(object):
return False
# Remove from current location
if item.item_ahead:
item.item_ahead.items_behind.remove(item)
with item.item_ahead.activeContext(self.zk_context):
item.item_ahead.items_behind.remove(item)
item.item_ahead.items_behind.extend(item.items_behind)
for item_behind in item.items_behind:
if item.item_ahead:
item.item_ahead.items_behind.append(item_behind)
item_behind.updateAttributes(
self.zk_context,
item_ahead=item.item_ahead)
item_behind.updateAttributes(
self.zk_context,
item_ahead=item.item_ahead)
# Add to new location
item.updateAttributes(
self.zk_context,
item_ahead=item_ahead,
items_behind=[])
if item.item_ahead:
item.item_ahead.items_behind.append(item)
with item.item_ahead.activeContext(self.zk_context):
item.item_ahead.items_behind.append(item)
return True
def isActionable(self, item):
@@ -630,14 +660,18 @@ class ChangeQueue(object):
return item in self.queue[:window]
def increaseWindowSize(self):
if self.window:
if not self.window:
return
with self.activeContext(self.zk_context):
if self.window_increase_type == 'linear':
self.window += self.window_increase_factor
elif self.window_increase_type == 'exponential':
self.window *= self.window_increase_factor
def decreaseWindowSize(self):
if self.window:
if not self.window:
return
with self.activeContext(self.zk_context):
if self.window_decrease_type == 'linear':
self.window = max(
self.window_floor,

View File

@@ -35,6 +35,13 @@ class LocalQueueItem(model.QueueItem):
pass
class LocalChangeQueue(model.ChangeQueue):
    """Change queue that lives only in local memory and is never persisted."""

    def _save(self, ctx, create=False):
        # No-op override: both arguments are ignored so nothing is
        # stored for this queue.
        return None
class RPCListenerBase(metaclass=ABCMeta):
log = logging.getLogger("zuul.RPCListenerBase")
thread_name = 'zuul-rpc-gearman-worker'
@@ -447,7 +454,7 @@ class RPCListener(RPCListenerBase):
change = model.Branch(project)
change.branch = args.get("branch", "master")
queue = model.ChangeQueue(pipeline)
queue = LocalChangeQueue.new(None, pipeline=pipeline)
item = LocalQueueItem.new(None, queue=queue, change=change,
pipeline=queue.pipeline)
item.freezeJobGraph(tenant.layout, skip_file_matcher=True)
@@ -478,7 +485,7 @@ class RPCListener(RPCListenerBase):
change = model.Branch(project)
change.branch = args.get("branch", "master")
queue = model.ChangeQueue(pipeline)
queue = LocalChangeQueue.new(None, pipeline=pipeline)
item = LocalQueueItem.new(None, queue=queue, change=change,
pipeline=queue.pipeline)
item.freezeJobGraph(tenant.layout, skip_file_matcher=True)

View File

@@ -1393,6 +1393,7 @@ class Scheduler(threading.Thread):
"Canceling node request %s during reconfiguration",
request)
self.cancelJob(build_set, request_job, force=True)
shared_queue.delete(pipeline.manager.current_context)
def _doPromoteEvent(self, event):
tenant = self.abide.tenants.get(event.tenant_name)