From bbaffd59b522f86d9a47799f470429b76a1d73ae Mon Sep 17 00:00:00 2001 From: Simon Westphahl Date: Fri, 24 Sep 2021 16:11:13 +0200 Subject: [PATCH] Store change queues in Zookeeper The change queue state is stored in the following Zookeeper path: /zuul//pipeline//queue// Change-Id: I0a64bd9adc7b9f8f7a775280bb7a01ace22baac4 --- tests/base.py | 13 ++- tests/unit/test_model.py | 12 ++- tests/unit/test_scheduler.py | 60 ++++++++------ zuul/manager/dependent.py | 13 ++- zuul/manager/independent.py | 5 +- zuul/manager/serial.py | 5 +- zuul/manager/shared.py | 16 ++-- zuul/manager/supercedent.py | 5 +- zuul/model.py | 156 +++++++++++++++++++++-------------- zuul/rpclistener.py | 11 ++- zuul/scheduler.py | 1 + 11 files changed, 187 insertions(+), 110 deletions(-) diff --git a/tests/base.py b/tests/base.py index eb6783c3ec..a26e8a892f 100644 --- a/tests/base.py +++ b/tests/base.py @@ -94,9 +94,10 @@ from zuul.driver.github.githubconnection import GithubClientManager from zuul.driver.elasticsearch import ElasticsearchDriver from zuul.lib.collections import DefaultKeyDict from zuul.lib.connections import ConnectionRegistry -from zuul.zk import ZooKeeperClient +from zuul.zk import zkobject, ZooKeeperClient from zuul.zk.event_queues import ConnectionEventQueue from zuul.zk.executor import ExecutorApi +from zuul.zk.locks import SessionAwareLock from zuul.zk.merger import MergerApi from psutil import Popen @@ -4808,6 +4809,10 @@ class ZuulTestCase(BaseTestCase): self.additional_event_queues = [] self.zk_client = ZooKeeperClient.fromConfig(self.config) self.zk_client.connect() + + self._context_lock = SessionAwareLock( + self.zk_client.client, f"/test/{uuid.uuid4().hex}") + self.connection_event_queues = DefaultKeyDict( lambda cn: ConnectionEventQueue(self.zk_client, cn) ) @@ -4859,6 +4864,12 @@ class ZuulTestCase(BaseTestCase): self.poller_events, self.git_url_with_auth, self.fake_sql, self.addCleanup, self.validate_tenants) + def createZKContext(self): + # Just make sure the lock 
is acquired + self._context_lock.acquire(blocking=False) + return zkobject.ZKContext(self.zk_client, self._context_lock, + stop_event=None, log=self.log) + def __event_queues(self, matcher) -> List[Queue]: # TODO (swestphahl): Can be removed when we no longer use global # management events. diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py index aa12db80d0..39e441d88b 100644 --- a/tests/unit/test_model.py +++ b/tests/unit/test_model.py @@ -38,6 +38,12 @@ class Dummy(object): setattr(self, k, v) +class DummyChangeQueue(model.ChangeQueue): + + def _save(self, ctx, create=False): + pass + + class TestJob(BaseTestCase): def setUp(self): self._env_fixture = self.useFixture( @@ -66,7 +72,7 @@ class TestJob(BaseTestCase): self.pipeline.source_context = self.context self.pipeline.manager = mock.Mock() self.layout.addPipeline(self.pipeline) - self.queue = model.ChangeQueue(self.pipeline) + self.queue = DummyChangeQueue.new(None, pipeline=self.pipeline) self.pcontext = configloader.ParseContext( self.connections, None, self.tenant, AnsibleManager()) @@ -165,7 +171,7 @@ class TestJob(BaseTestCase): @mock.patch("zuul.model.zkobject.ZKObject._save") def test_job_inheritance_job_tree(self, save_mock): - queue = model.ChangeQueue(self.pipeline) + queue = DummyChangeQueue.new(None, pipeline=self.pipeline) base = self.pcontext.job_parser.fromYaml({ '_source_context': self.context, @@ -236,7 +242,7 @@ class TestJob(BaseTestCase): @mock.patch("zuul.model.zkobject.ZKObject._save") def test_inheritance_keeps_matchers(self, save_mock): - queue = model.ChangeQueue(self.pipeline) + queue = DummyChangeQueue.new(None, pipeline=self.pipeline) base = self.pcontext.job_parser.fromYaml({ '_source_context': self.context, diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 56b9b33b32..7df1415d2c 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -3528,10 +3528,11 @@ class TestScheduler(ZuulTestCase): FakeChange = 
namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - gate.manager.getChangeQueue(fake_a, None) - gate.manager.getChangeQueue(fake_b, None) - q1 = gate.getQueue(project1, None) - q2 = gate.getQueue(project2, None) + with gate.manager.currentContext(self.createZKContext()): + gate.manager.getChangeQueue(fake_a, None) + gate.manager.getChangeQueue(fake_b, None) + q1 = gate.getQueue(project1.canonical_name, None) + q2 = gate.getQueue(project2.canonical_name, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -3549,10 +3550,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - gate.manager.getChangeQueue(fake_a, None) - gate.manager.getChangeQueue(fake_b, None) - q1 = gate.getQueue(project1, None) - q2 = gate.getQueue(project2, None) + with gate.manager.currentContext(self.createZKContext()): + gate.manager.getChangeQueue(fake_a, None) + gate.manager.getChangeQueue(fake_b, None) + q1 = gate.getQueue(project1.canonical_name, None) + q2 = gate.getQueue(project2.canonical_name, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -3570,10 +3572,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - gate.manager.getChangeQueue(fake_a, None) - gate.manager.getChangeQueue(fake_b, None) - q1 = gate.getQueue(project1, None) - q2 = gate.getQueue(project2, None) + with gate.manager.currentContext(self.createZKContext()): + gate.manager.getChangeQueue(fake_a, None) + gate.manager.getChangeQueue(fake_b, None) + q1 = gate.getQueue(project1.canonical_name, None) + q2 = gate.getQueue(project2.canonical_name, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 
'integrated') @@ -3590,10 +3593,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - gate.manager.getChangeQueue(fake_a, None) - gate.manager.getChangeQueue(fake_b, None) - q1 = gate.getQueue(project1, None) - q2 = gate.getQueue(project2, None) + with gate.manager.currentContext(self.createZKContext()): + gate.manager.getChangeQueue(fake_a, None) + gate.manager.getChangeQueue(fake_b, None) + q1 = gate.getQueue(project1.canonical_name, None) + q2 = gate.getQueue(project2.canonical_name, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -3610,10 +3614,11 @@ class TestScheduler(ZuulTestCase): FakeChange = namedtuple('FakeChange', ['project', 'branch']) fake_a = FakeChange(project1, 'master') fake_b = FakeChange(project2, 'master') - gate.manager.getChangeQueue(fake_a, None) - gate.manager.getChangeQueue(fake_b, None) - q1 = gate.getQueue(project1, None) - q2 = gate.getQueue(project2, None) + with gate.manager.currentContext(self.createZKContext()): + gate.manager.getChangeQueue(fake_a, None) + gate.manager.getChangeQueue(fake_b, None) + q1 = gate.getQueue(project1.canonical_name, None) + q2 = gate.getQueue(project2.canonical_name, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -6715,8 +6720,10 @@ class TestChangeQueues(ZuulTestCase): ]) tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') _, p = tenant.getProject(project) - q1 = tenant.layout.pipelines['gate'].getQueue(p, 'master') - q2 = tenant.layout.pipelines['gate'].getQueue(p, 'stable') + q1 = tenant.layout.pipelines['gate'].getQueue( + p.canonical_name, 'master') + q2 = tenant.layout.pipelines['gate'].getQueue( + p.canonical_name, 'stable') self.assertEqual(q1.name, queue_name) self.assertEqual(q2.name, queue_name) @@ -6772,9 +6779,12 @@ class TestChangeQueues(ZuulTestCase): ]) tenant = 
self.scheds.first.sched.abide.tenants.get('tenant-one') _, p = tenant.getProject(project) - q1 = tenant.layout.pipelines['gate'].getQueue(p, 'master') - q2 = tenant.layout.pipelines['gate'].getQueue(p, 'stable') - q3 = tenant.layout.pipelines['gate'].getQueue(p, None) + q1 = tenant.layout.pipelines['gate'].getQueue( + p.canonical_name, 'master') + q2 = tenant.layout.pipelines['gate'].getQueue( + p.canonical_name, 'stable') + q3 = tenant.layout.pipelines['gate'].getQueue( + p.canonical_name, None) # There should be no branch specific queues anymore self.assertEqual(q1, None) diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py index 24b7306085..0d98114c4a 100644 --- a/zuul/manager/dependent.py +++ b/zuul/manager/dependent.py @@ -30,8 +30,9 @@ class DependentPipelineManager(SharedQueuePipelineManager): def constructChangeQueue(self, queue_name): p = self.pipeline - return model.ChangeQueue( - p, + return model.ChangeQueue.new( + p.manager.current_context, + pipeline=p, window=p.window, window_floor=p.window_floor, window_increase_type=p.window_increase_type, @@ -81,16 +82,14 @@ class DependentPipelineManager(SharedQueuePipelineManager): return # for project in change_queue, project.source get changes, then dedup. 
- sources = set() - for project, _ in change_queue.project_branches: - sources.add(project.source) + projects = [self.pipeline.tenant.getProject(pcn)[1] for pcn, _ in + change_queue.project_branches] + sources = {p.source for p in projects} needed_by_changes = self.resolveChangeKeys(change.needed_by_changes) seen = set(needed_by_changes) for source in sources: log.debug(" Checking source: %s", source) - projects = [project_branch[0] - for project_branch in change_queue.project_branches] for c in source.getChangesDependingOn(change, projects, self.pipeline.tenant): diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py index 2bc5bceb35..3336295303 100644 --- a/zuul/manager/independent.py +++ b/zuul/manager/independent.py @@ -30,7 +30,10 @@ class IndependentPipelineManager(PipelineManager): # instead create a new change queue for every change. if existing: return DynamicChangeQueueContextManager(existing) - change_queue = model.ChangeQueue(self.pipeline, dynamic=True) + change_queue = model.ChangeQueue.new( + self.pipeline.manager.current_context, + pipeline=self.pipeline, + dynamic=True) change_queue.addProject(change.project, None) self.pipeline.addQueue(change_queue) log.debug("Dynamically created queue %s", change_queue) diff --git a/zuul/manager/serial.py b/zuul/manager/serial.py index caaa147c6f..1caec68bb7 100644 --- a/zuul/manager/serial.py +++ b/zuul/manager/serial.py @@ -20,8 +20,9 @@ class SerialPipelineManager(SharedQueuePipelineManager): changes_merge = False def constructChangeQueue(self, queue_name): - return model.ChangeQueue( - self.pipeline, + return model.ChangeQueue.new( + self.pipeline.manager.current_context, + pipeline=self.pipeline, window=1, window_floor=1, window_increase_type='none', diff --git a/zuul/manager/shared.py b/zuul/manager/shared.py index c45cd7d690..db87352895 100644 --- a/zuul/manager/shared.py +++ b/zuul/manager/shared.py @@ -36,12 +36,12 @@ class ChangeQueueManager: if not change_queue: p = 
self.pipeline_manager.pipeline - change_queue = self.pipeline_manager.constructChangeQueue( - self.name) + name = self.name or project.name + change_queue = self.pipeline_manager.constructChangeQueue(name) p.addQueue(change_queue) self.created_for_branches[branch] = change_queue - if not change_queue.matches(project, branch): + if not change_queue.matches(project.canonical_name, branch): change_queue.addProject(project, branch) self.log.debug("Added project %s to queue: %s" % (project, change_queue)) @@ -119,7 +119,8 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta): # Ignore the existing queue, since we can always get the correct queue # from the pipeline. This avoids enqueuing changes in a wrong queue # e.g. during re-configuration. - queue = self.pipeline.getQueue(change.project, change.branch) + queue = self.pipeline.getQueue(change.project.canonical_name, + change.branch) if queue: return StaticChangeQueueContextManager(queue) else: @@ -140,13 +141,16 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta): ) # No specific per-branch queue matched so look again with no branch - queue = self.pipeline.getQueue(change.project, None) + queue = self.pipeline.getQueue(change.project.canonical_name, None) if queue: return StaticChangeQueueContextManager(queue) # There is no existing queue for this change. 
Create a # dynamic one for this one change's use - change_queue = model.ChangeQueue(self.pipeline, dynamic=True) + change_queue = model.ChangeQueue.new( + self.pipeline.manager.current_context, + pipeline=self.pipeline, + dynamic=True) change_queue.addProject(change.project, None) self.pipeline.addQueue(change_queue) log.debug("Dynamically created queue %s", change_queue) diff --git a/zuul/manager/supercedent.py b/zuul/manager/supercedent.py index d55533a50a..9c65169d3b 100644 --- a/zuul/manager/supercedent.py +++ b/zuul/manager/supercedent.py @@ -38,8 +38,9 @@ class SupercedentPipelineManager(PipelineManager): queue.queue[-1].change.ref == change.ref)): log.debug("Found existing queue %s", queue) return DynamicChangeQueueContextManager(queue) - change_queue = model.ChangeQueue( - self.pipeline, + change_queue = model.ChangeQueue.new( + self.pipeline.manager.current_context, + pipeline=self.pipeline, window=1, window_floor=1, window_increase_type='none', diff --git a/zuul/model.py b/zuul/model.py index 625e60899f..060db346e3 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -348,10 +348,10 @@ class Pipeline(object): def addQueue(self, queue): self.queues.append(queue) - def getQueue(self, project, branch): + def getQueue(self, project_cname, branch): # Queues might be branch specific so match with branch for queue in self.queues: - if queue.matches(project, branch): + if queue.matches(project_cname, branch): return queue return None @@ -363,7 +363,9 @@ class Pipeline(object): def removeQueue(self, queue): if queue in self.queues: - self.queues.remove(queue) + with self.state.activeContext(self.manager.current_context): + self.state.queues.remove(queue) + queue.delete(self.manager.current_context) def getChangesInQueue(self): changes = [] @@ -463,7 +465,7 @@ class PipelineState(zkobject.ZKObject): queue.refresh(context) -class ChangeQueue(object): +class ChangeQueue(zkobject.ZKObject): """A ChangeQueue contains Changes to be processed for related projects. 
A Pipeline with a DependentPipelineManager has multiple parallel @@ -483,46 +485,72 @@ class ChangeQueue(object): A ChangeQueue may be a dynamically created queue, which may be removed from a DependentPipelineManager once empty. """ - def __init__(self, pipeline, window=0, window_floor=1, - window_increase_type='linear', window_increase_factor=1, - window_decrease_type='exponential', window_decrease_factor=2, - name=None, dynamic=False): - self.pipeline = pipeline - if name: - self.name = name - else: - self.name = '' - self.project_branches = [] - self._jobs = set() - self.queue = [] - self.window = window - self.window_floor = window_floor - self.window_increase_type = window_increase_type - self.window_increase_factor = window_increase_factor - self.window_decrease_type = window_decrease_type - self.window_decrease_factor = window_decrease_factor - self.dynamic = dynamic + def __init__(self): + super().__init__() + self._set( + uuid=uuid4().hex, + pipeline=None, + name="", + project_branches=[], + _jobs=set(), + queue=[], + window=0, + window_floor=1, + window_increase_type="linear", + window_increase_factor=1, + window_decrease_type="exponential", + window_decrease_factor=2, + dynamic=False, + ) - def getPath(self): - queue_id = (self.uuid if self.dynamic - else urllib.parse.quote_plus(self.name)) - return f"{self.pipeline.getPath()}/queue/{queue_id}" + def serialize(self): + data = { + "uuid": self.uuid, + "name": self.name, + "project_branches": self.project_branches, + "_jobs": list(self._jobs), + "queue": [i.getPath() for i in self.queue], + "window": self.window, + "window_floor": self.window_floor, + "window_increase_type": self.window_increase_type, + "window_increase_factor": self.window_increase_factor, + "window_decrease_type": self.window_decrease_type, + "window_decrease_factor": self.window_decrease_factor, + "dynamic": self.dynamic, + } + return json.dumps(data).encode("utf8") - def refresh(self, context): - # FIXME: we currently don't have the 
queue data in ZK, so we just - # construct a similar list of item uuids. - existing_items = {i.uuid: i for i in self.queue} - items_in_queue = existing_items.keys() - for item_uuid in items_in_queue: - item = existing_items.get(item_uuid) + def deserialize(self, raw): + data = super().deserialize(raw) + + existing_items = {} + for item in self.queue: + existing_items[item.getPath()] = item + if item.bundle: + existing_items.update({ + i.getPath(): i for i in item.bundle.items + }) + + queue = [] + context = self.pipeline.manager.current_context + for item_path in data["queue"]: + item = existing_items.get(item_path) if item: item.refresh(context) else: - # FIXME: this code path is currently unused as we have all - # items in the queue. - item = QueueItem.fromZK( - context, - QueueItem.itemPath(self.pipeline.getPath(), item_uuid)) + item = QueueItem.fromZK(context, item_path) + queue.append(item) + + data.update({ + "_jobs": set(data["_jobs"]), + "queue": queue, + "project_branches": [tuple(pb) for pb in data["project_branches"]], + }) + return data + + def getPath(self): + pipeline_path = self.pipeline.state.getPath() + return f"{pipeline_path}/queue/{self.uuid}" @property def zk_context(self): @@ -542,15 +570,13 @@ class ChangeQueue(object): care about branches it can supply None (but must supply None as well when matching) """ - project_branch = (project, branch) + project_branch = (project.canonical_name, branch) if project_branch not in self.project_branches: - self.project_branches.append(project_branch) + with self.activeContext(self.zk_context): + self.project_branches.append(project_branch) - if not self.name: - self.name = project.name - - def matches(self, project, branch): - return (project, branch) in self.project_branches + def matches(self, project_cname, branch): + return (project_cname, branch) in self.project_branches def enqueueChange(self, change, event): item = QueueItem.new(self.zk_context, @@ -567,17 +593,20 @@ class ChangeQueue(object): 
item._set(pipeline=self.pipeline, queue=self) if self.queue: item.updateAttributes(self.zk_context, item_ahead=self.queue[-1]) - item.item_ahead.items_behind.append(item) - self.queue.append(item) + with item.item_ahead.activeContext(self.zk_context): + item.item_ahead.items_behind.append(item) + with self.activeContext(self.zk_context): + self.queue.append(item) def dequeueItem(self, item): if item in self.queue: - self.queue.remove(item) + with self.activeContext(self.zk_context): + self.queue.remove(item) if item.item_ahead: - item.item_ahead.items_behind.remove(item) + with item.item_ahead.activeContext(self.zk_context): + item.item_ahead.items_behind.remove(item) + item.item_ahead.items_behind.extend(item.items_behind) for item_behind in item.items_behind: - if item.item_ahead: - item.item_ahead.items_behind.append(item_behind) item_behind.updateAttributes(self.zk_context, item_ahead=item.item_ahead) @@ -602,20 +631,21 @@ class ChangeQueue(object): return False # Remove from current location if item.item_ahead: - item.item_ahead.items_behind.remove(item) + with item.item_ahead.activeContext(self.zk_context): + item.item_ahead.items_behind.remove(item) + item.item_ahead.items_behind.extend(item.items_behind) for item_behind in item.items_behind: - if item.item_ahead: - item.item_ahead.items_behind.append(item_behind) - item_behind.updateAttributes( - self.zk_context, - item_ahead=item.item_ahead) + item_behind.updateAttributes( + self.zk_context, + item_ahead=item.item_ahead) # Add to new location item.updateAttributes( self.zk_context, item_ahead=item_ahead, items_behind=[]) if item.item_ahead: - item.item_ahead.items_behind.append(item) + with item.item_ahead.activeContext(self.zk_context): + item.item_ahead.items_behind.append(item) return True def isActionable(self, item): @@ -630,14 +660,18 @@ class ChangeQueue(object): return item in self.queue[:window] def increaseWindowSize(self): - if self.window: + if not self.window: + return + with 
self.activeContext(self.zk_context): if self.window_increase_type == 'linear': self.window += self.window_increase_factor elif self.window_increase_type == 'exponential': self.window *= self.window_increase_factor def decreaseWindowSize(self): - if self.window: + if not self.window: + return + with self.activeContext(self.zk_context): if self.window_decrease_type == 'linear': self.window = max( self.window_floor, diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py index 6f4cad4e5b..427b8de00f 100644 --- a/zuul/rpclistener.py +++ b/zuul/rpclistener.py @@ -35,6 +35,13 @@ class LocalQueueItem(model.QueueItem): pass +class LocalChangeQueue(model.ChangeQueue): + """Local non-persistent change queue.""" + + def _save(self, ctx, create=False): + pass + + class RPCListenerBase(metaclass=ABCMeta): log = logging.getLogger("zuul.RPCListenerBase") thread_name = 'zuul-rpc-gearman-worker' @@ -447,7 +454,7 @@ class RPCListener(RPCListenerBase): change = model.Branch(project) change.branch = args.get("branch", "master") - queue = model.ChangeQueue(pipeline) + queue = LocalChangeQueue.new(None, pipeline=pipeline) item = LocalQueueItem.new(None, queue=queue, change=change, pipeline=queue.pipeline) item.freezeJobGraph(tenant.layout, skip_file_matcher=True) @@ -478,7 +485,7 @@ class RPCListener(RPCListenerBase): change = model.Branch(project) change.branch = args.get("branch", "master") - queue = model.ChangeQueue(pipeline) + queue = LocalChangeQueue.new(None, pipeline=pipeline) item = LocalQueueItem.new(None, queue=queue, change=change, pipeline=queue.pipeline) item.freezeJobGraph(tenant.layout, skip_file_matcher=True) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 738669f1b9..71dcd4a564 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1393,6 +1393,7 @@ class Scheduler(threading.Thread): "Canceling node request %s during reconfiguration", request) self.cancelJob(build_set, request_job, force=True) + shared_queue.delete(pipeline.manager.current_context) def 
_doPromoteEvent(self, event): tenant = self.abide.tenants.get(event.tenant_name)