Add serial pipeline manager

This is designed to handle the case where we want:

* The pipeline to be triggered by changes so results report back to
  the change.
* Triggered on change merged.
* Jobs with file matchers so that if a subsystem is changed, only the
  deployment job for that subsystem is run.
* Every change is processed in strict sequence.

This is designed to accomodate a deployment pipeline with the above
constraints.

The pipeline manager hierarchy is getting complicated; mark the base
class as abstract, and also move the shared-queue methods into an
intermediate abstract class.  These are shared by both serial and
dependent managers.

Change-Id: I3c5f3b2f6298292c5c25665923e3a10b07be5419
This commit is contained in:
James E. Blair 2020-04-24 11:03:48 -07:00
parent ccddc37fa0
commit 22640baef4
10 changed files with 377 additions and 73 deletions

View File

@ -99,6 +99,24 @@
gerrit: {}
sqlreporter:
- pipeline:
name: deploy
description: |
This pipeline runs jobs that operate after each change is merged
in order to deploy to production.
manager: serial
precedence: high
post-review: True
trigger:
gerrit:
- event: change-merged
success:
gerrit: {}
sqlreporter:
failure:
gerrit: {}
sqlreporter:
- pipeline:
name: release
description: |

View File

@ -63,7 +63,18 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of
.. attr:: manager
:required:
There are three schemes for managing pipelines:
There are several schemes for managing pipelines. The following
table summarizes their features; each is described in detail
below.
=========== ============ ===== ============= =========
Manager Dependencies Merge Shared Queues Window
=========== ============ ===== ============= =========
Independent No No No Unlimited
Dependent Yes Yes Yes Variable
Serial No No Yes 1
Supercedent No No Project-ref 1
=========== ============ ===== ============= =========
.. value:: independent
@ -107,6 +118,22 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of
For more detail on the theory and operation of Zuul's
dependent pipeline manager, see: :doc:`/discussion/gating`.
.. value:: serial
This pipeline manager supports shared queues (like depedent
pipelines) but only one item in each shared queue is
processed at a time.
This may be useful for post-merge pipelines which perform
partial production deployments (i.e., there are jobs with
file matchers which only deploy to affected parts of the
system). In such a case it is important for every change to
be processed, but they must still be processed one at a time
in order to ensure that the production system is not
inadvertently regressed. Support for shared queues ensures
that if multiple projects are involved deployment runs still
execute sequentially.
.. value:: supercedent
This is like an independent pipeline, in that every item is
@ -124,11 +151,12 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of
these cases, build resources can be conserved by avoiding
building intermediate versions.
.. note:: Since this pipeline filters intermediate buildsets using
it in combination with file filters on jobs is dangerous.
In this case jobs of in between buildsets can be
unexpectedly skipped entirely. If file filters are needed
the independent pipeline manager should be used.
.. note:: Since this pipeline filters intermediate buildsets
using it in combination with file filters on jobs
is dangerous. In this case jobs of in between
buildsets can be unexpectedly skipped entirely. If
file filters are needed the ``independent`` or
``serial`` pipeline managers should be used.
.. attr:: post-review
:default: false

View File

@ -0,0 +1,6 @@
features:
- |
The :value:`pipeline.manager.serial` pipeline manager has been
added. It is designed to handle serialized deployment pipelines
where supercedent is unsuitable in the case that not all jobs run
on every merge.

48
tests/fixtures/layouts/serial.yaml vendored Normal file
View File

@ -0,0 +1,48 @@
- pipeline:
name: deploy
manager: serial
trigger:
gerrit:
- event: change-merged
post-review: True
success:
gerrit: {}
failure:
gerrit: {}
- job:
name: base
parent: null
nodeset:
nodes:
- label: ubuntu-xenial
name: controller
- job:
name: job1
run: playbooks/job1.yaml
- job:
name: job2
run: playbooks/job2.yaml
- project:
name: org/project
deploy:
jobs:
- job1
- job2
- project:
name: org/project1
deploy:
queue: shared
jobs:
- job1
- project:
name: org/project2
deploy:
queue: shared
jobs:
- job1

130
tests/unit/test_serial.py Normal file
View File

@ -0,0 +1,130 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tests.base import (
ZuulTestCase,
simple_layout,
)
class TestSerial(ZuulTestCase):
tenant_config_file = 'config/single-tenant/main.yaml'
@simple_layout('layouts/serial.yaml')
def test_deploy_window(self):
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.setMerged()
self.fake_gerrit.addEvent(A.getChangeMergedEvent())
self.waitUntilSettled()
# The gerrit upstream repo simulation isn't perfect -- when
# change A is merged above, the master ref is updated to point
# to that change, it doesn't actually "merge" it. The same is
# true for B, so if it didn't have A in its git history, then
# A would not appear in the jobs run for B. We simulate the
# correct situation by setting A as the git parent of B.
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
parent='refs/changes/1/1/1')
B.setMerged()
self.fake_gerrit.addEvent(B.getChangeMergedEvent())
self.waitUntilSettled()
self.assertEqual(len(self.builds), 2)
self.assertTrue(self.builds[0].hasChanges(A))
self.assertTrue(self.builds[1].hasChanges(A))
self.assertFalse(self.builds[0].hasChanges(B))
self.assertFalse(self.builds[1].hasChanges(B))
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(len(self.builds), 2)
self.assertTrue(self.builds[0].hasChanges(A))
self.assertTrue(self.builds[1].hasChanges(A))
self.assertTrue(self.builds[0].hasChanges(B))
self.assertTrue(self.builds[1].hasChanges(B))
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 1)
self.assertHistory([
dict(name='job1', result='SUCCESS', changes='1,1'),
dict(name='job2', result='SUCCESS', changes='1,1'),
dict(name='job1', result='SUCCESS', changes='2,1'),
dict(name='job2', result='SUCCESS', changes='2,1'),
], ordered=False)
@simple_layout('layouts/serial.yaml')
def test_deploy_shared(self):
# Same as test_deploy_window but with two separate projects
# sharing a queue.
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
A.setMerged()
self.fake_gerrit.addEvent(A.getChangeMergedEvent())
self.waitUntilSettled()
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
B.setMerged()
self.fake_gerrit.addEvent(B.getChangeMergedEvent())
self.waitUntilSettled()
self.assertEqual(len(self.builds), 1)
self.assertTrue(self.builds[0].hasChanges(A))
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(len(self.builds), 1)
self.assertTrue(self.builds[0].hasChanges(B))
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 1)
self.assertHistory([
dict(name='job1', result='SUCCESS', changes='1,1'),
dict(name='job1', result='SUCCESS', changes='2,1'),
], ordered=False)
@simple_layout('layouts/serial.yaml')
def test_deploy_unshared(self):
# Test two projects which don't share a queue.
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.setMerged()
self.fake_gerrit.addEvent(A.getChangeMergedEvent())
self.waitUntilSettled()
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
B.setMerged()
self.fake_gerrit.addEvent(B.getChangeMergedEvent())
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertTrue(self.builds[0].hasChanges(A))
self.assertTrue(self.builds[1].hasChanges(A))
self.assertTrue(self.builds[2].hasChanges(B))
self.assertFalse(self.builds[2].hasChanges(A))
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 1)
self.assertHistory([
dict(name='job1', result='SUCCESS', changes='1,1'),
dict(name='job2', result='SUCCESS', changes='1,1'),
dict(name='job1', result='SUCCESS', changes='2,1'),
], ordered=False)

View File

@ -29,6 +29,7 @@ from zuul.lib import yamlutil as yaml
import zuul.manager.dependent
import zuul.manager.independent
import zuul.manager.supercedent
import zuul.manager.serial
from zuul.lib import encryption
from zuul.lib.keystorage import KeyStorage
from zuul.lib.logutil import get_annotated_logger
@ -1178,6 +1179,7 @@ class PipelineParser(object):
def getSchema(self):
manager = vs.Any('independent',
'dependent',
'serial',
'supercedent')
precedence = vs.Any('normal', 'low', 'high')
@ -1292,6 +1294,9 @@ class PipelineParser(object):
elif manager_name == 'independent':
manager = zuul.manager.independent.IndependentPipelineManager(
self.pcontext.scheduler, pipeline)
elif manager_name == 'serial':
manager = zuul.manager.serial.SerialPipelineManager(
self.pcontext.scheduler, pipeline)
elif manager_name == 'supercedent':
manager = zuul.manager.supercedent.SupercedentPipelineManager(
self.pcontext.scheduler, pipeline)

View File

@ -13,6 +13,7 @@
import logging
import textwrap
import urllib
from abc import ABCMeta
from zuul import exceptions
from zuul import model
@ -43,7 +44,7 @@ class StaticChangeQueueContextManager(object):
pass
class PipelineManager(object):
class PipelineManager(metaclass=ABCMeta):
"""Abstract Base Class for enqueing and processing Changes in a Pipeline"""
def __init__(self, sched, pipeline):

View File

@ -12,11 +12,10 @@
from zuul import model
from zuul.lib.logutil import get_annotated_logger
from zuul.manager import PipelineManager, StaticChangeQueueContextManager
from zuul.manager import DynamicChangeQueueContextManager
from zuul.manager.shared import SharedQueuePipelineManager
class DependentPipelineManager(PipelineManager):
class DependentPipelineManager(SharedQueuePipelineManager):
"""PipelineManager for handling interrelated Changes.
The DependentPipelineManager puts Changes that share a Pipeline
@ -29,69 +28,17 @@ class DependentPipelineManager(PipelineManager):
def __init__(self, *args, **kwargs):
super(DependentPipelineManager, self).__init__(*args, **kwargs)
def buildChangeQueues(self, layout):
self.log.debug("Building shared change queues")
change_queues = {}
tenant = self.pipeline.tenant
layout_project_configs = layout.project_configs
for project_name, project_configs in layout_project_configs.items():
(trusted, project) = tenant.getProject(project_name)
queue_name = None
project_in_pipeline = False
for project_config in layout.getAllProjectConfigs(project_name):
project_pipeline_config = project_config.pipelines.get(
self.pipeline.name)
if project_pipeline_config is None:
continue
project_in_pipeline = True
queue_name = project_pipeline_config.queue_name
if queue_name:
break
if not project_in_pipeline:
continue
if queue_name and queue_name in change_queues:
change_queue = change_queues[queue_name]
else:
p = self.pipeline
change_queue = model.ChangeQueue(
p,
window=p.window,
window_floor=p.window_floor,
window_increase_type=p.window_increase_type,
window_increase_factor=p.window_increase_factor,
window_decrease_type=p.window_decrease_type,
window_decrease_factor=p.window_decrease_factor,
name=queue_name)
if queue_name:
# If this is a named queue, keep track of it in
# case it is referenced again. Otherwise, it will
# have a name automatically generated from its
# constituent projects.
change_queues[queue_name] = change_queue
self.pipeline.addQueue(change_queue)
self.log.debug("Created queue: %s" % change_queue)
change_queue.addProject(project)
self.log.debug("Added project %s to queue: %s" %
(project, change_queue))
def getChangeQueue(self, change, event, existing=None):
log = get_annotated_logger(self.log, event)
# Ignore the existing queue, since we can always get the correct queue
# from the pipeline. This avoids enqueuing changes in a wrong queue
# e.g. during re-configuration.
queue = self.pipeline.getQueue(change.project)
if queue:
return StaticChangeQueueContextManager(queue)
else:
# There is no existing queue for this change. Create a
# dynamic one for this one change's use
change_queue = model.ChangeQueue(self.pipeline, dynamic=True)
change_queue.addProject(change.project)
self.pipeline.addQueue(change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)
def constructChangeQueue(self, queue_name):
p = self.pipeline
return model.ChangeQueue(
p,
window=p.window,
window_floor=p.window_floor,
window_increase_type=p.window_increase_type,
window_increase_factor=p.window_increase_factor,
window_decrease_type=p.window_decrease_type,
window_decrease_factor=p.window_decrease_factor,
name=queue_name)
def getNodePriority(self, item):
with self.getChangeQueue(item.change, item.event) as change_queue:

37
zuul/manager/serial.py Normal file
View File

@ -0,0 +1,37 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zuul import model
from zuul.manager.shared import SharedQueuePipelineManager
class SerialPipelineManager(SharedQueuePipelineManager):
"""PipelineManager with shared queues and a window of 1"""
changes_merge = False
def constructChangeQueue(self, queue_name):
return model.ChangeQueue(
self.pipeline,
window=1,
window_floor=1,
window_increase_type='none',
window_decrease_type='none',
name=queue_name)
def dequeueItem(self, item):
super(SerialPipelineManager, self).dequeueItem(item)
# If this was a dynamic queue from a speculative change,
# remove the queue (if empty)
if item.queue.dynamic:
if not item.queue.queue:
self.pipeline.removeQueue(item.queue)

84
zuul/manager/shared.py Normal file
View File

@ -0,0 +1,84 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from abc import ABCMeta
from zuul import model
from zuul.lib.logutil import get_annotated_logger
from zuul.manager import PipelineManager, StaticChangeQueueContextManager
from zuul.manager import DynamicChangeQueueContextManager
class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
"""Intermediate class that adds the shared-queue behavior.
This is not a full pipeline manager; it just adds the shared-queue
behavior to the base class and is used by the dependent and serial
managers.
"""
changes_merge = False
def buildChangeQueues(self, layout):
self.log.debug("Building shared change queues")
change_queues = {}
tenant = self.pipeline.tenant
layout_project_configs = layout.project_configs
for project_name, project_configs in layout_project_configs.items():
(trusted, project) = tenant.getProject(project_name)
queue_name = None
project_in_pipeline = False
for project_config in layout.getAllProjectConfigs(project_name):
project_pipeline_config = project_config.pipelines.get(
self.pipeline.name)
if project_pipeline_config is None:
continue
project_in_pipeline = True
queue_name = project_pipeline_config.queue_name
if queue_name:
break
if not project_in_pipeline:
continue
if queue_name and queue_name in change_queues:
change_queue = change_queues[queue_name]
else:
change_queue = self.constructChangeQueue(queue_name)
if queue_name:
# If this is a named queue, keep track of it in
# case it is referenced again. Otherwise, it will
# have a name automatically generated from its
# constituent projects.
change_queues[queue_name] = change_queue
self.pipeline.addQueue(change_queue)
self.log.debug("Created queue: %s" % change_queue)
change_queue.addProject(project)
self.log.debug("Added project %s to queue: %s" %
(project, change_queue))
def getChangeQueue(self, change, event, existing=None):
log = get_annotated_logger(self.log, event)
# Ignore the existing queue, since we can always get the correct queue
# from the pipeline. This avoids enqueuing changes in a wrong queue
# e.g. during re-configuration.
queue = self.pipeline.getQueue(change.project)
if queue:
return StaticChangeQueueContextManager(queue)
else:
# There is no existing queue for this change. Create a
# dynamic one for this one change's use
change_queue = model.ChangeQueue(self.pipeline, dynamic=True)
change_queue.addProject(change.project)
self.pipeline.addQueue(change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)