diff --git a/.gitignore b/.gitignore index 21a0a9fd3f..f516785644 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +*.sw? *.egg *.egg-info *.pyc diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst index 0a1e0e7743..c61cea8724 100644 --- a/doc/source/launchers.rst +++ b/doc/source/launchers.rst @@ -239,7 +239,7 @@ the Git plugin to prepare them, or you may chose to use a shell script instead. As an example, the OpenStack project uses the following script to prepare the workspace for its integration testing: - https://github.com/openstack-infra/devstack-gate/blob/master/devstack-vm-gate-wrap.sh + https://git.openstack.org/cgit/openstack-infra/devstack-gate/tree/devstack-vm-gate-wrap.sh Turbo Hipster Worker ~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst index 74ce3607e7..b5b8d7bf19 100644 --- a/doc/source/zuul.rst +++ b/doc/source/zuul.rst @@ -568,8 +568,8 @@ file. The first is called a *check* pipeline:: my_gerrit: verified: 1 failure: - gerrit: - my_gerrit: -1 + my_gerrit: + verified: -1 This will trigger jobs each time a new patchset (or change) is uploaded to Gerrit, and report +/-1 values to Gerrit in the @@ -704,6 +704,11 @@ each job as it builds a list from the project specification. would largely defeat the parallelization of dependent change testing that is the main feature of Zuul. Default: ``false``. +**mutex (optional)** + This is a string that names a mutex that should be observed by this + job. Only one build of any job that references the same named mutex + will be enqueued at a time. This applies across all pipelines. + **branch (optional)** This job should only be run on matching branches. This field is treated as a regular expression and multiple branches may be diff --git a/requirements.txt b/requirements.txt index f626f4c98a..5818c5f673 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,9 @@ -pbr>=0.5.21,<1.0 +pbr>=1.1.0 argparse PyYAML>=3.1.0 Paste -WebOb>=1.2.3,<1.3 +WebOb>=1.2.3 paramiko>=1.8.0 GitPython>=0.3.3 ordereddict @@ -12,7 +12,7 @@ extras statsd>=1.0.0,<3.0 voluptuous>=0.7 gear>=0.5.7,<1.0.0 -apscheduler>=2.1.1,<3.0 +apscheduler>=3.0 PrettyTable>=0.6,<0.8 babel>=1.0 six>=1.6.0 diff --git a/tests/fixtures/layout-mutex.yaml b/tests/fixtures/layout-mutex.yaml new file mode 100644 index 0000000000..fcd052973c --- /dev/null +++ b/tests/fixtures/layout-mutex.yaml @@ -0,0 +1,25 @@ +pipelines: + - name: check + manager: IndependentPipelineManager + trigger: + gerrit: + - event: patchset-created + success: + gerrit: + verified: 1 + failure: + gerrit: + verified: -1 + +jobs: + - name: mutex-one + mutex: test-mutex + - name: mutex-two + mutex: test-mutex + +projects: + - name: org/project + check: + - project-test1 + - mutex-one + - mutex-two diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml index e30147f7ce..757fffed8f 100644 --- a/tests/fixtures/layout.yaml +++ b/tests/fixtures/layout.yaml @@ -132,6 +132,10 @@ jobs: parameter-function: select_debian_node - name: project1-project2-integration queue-name: integration + - name: mutex-one + mutex: test-mutex + - name: mutex-two + mutex: test-mutex project-templates: - name: test-one-and-two diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 85ac6006ef..b56e227e24 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -2286,6 +2286,70 @@ class TestScheduler(ZuulTestCase): self.sched.reconfigure(self.config) self.assertEqual(len(self.sched.layout.pipelines['gate'].queues), 1) + def test_mutex(self): + "Test job mutexes" + self.config.set('zuul', 'layout_config', + 'tests/fixtures/layout-mutex.yaml') + self.sched.reconfigure(self.config) + + self.worker.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + self.assertFalse('test-mutex' in self.sched.mutex.mutexes) + + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + self.assertEqual(len(self.builds), 3) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'mutex-one') + self.assertEqual(self.builds[2].name, 'project-test1') + + self.worker.release('mutex-one') + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 3) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test1') + self.assertEqual(self.builds[2].name, 'mutex-two') + self.assertTrue('test-mutex' in self.sched.mutex.mutexes) + + self.worker.release('mutex-two') + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 3) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test1') + self.assertEqual(self.builds[2].name, 'mutex-one') + self.assertTrue('test-mutex' in self.sched.mutex.mutexes) + + self.worker.release('mutex-one') + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 3) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test1') + self.assertEqual(self.builds[2].name, 'mutex-two') + self.assertTrue('test-mutex' in self.sched.mutex.mutexes) + + self.worker.release('mutex-two') + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 2) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test1') + self.assertFalse('test-mutex' in self.sched.mutex.mutexes) + + self.worker.hold_jobs_in_build = False + self.worker.release() + + self.waitUntilSettled() + self.assertEqual(len(self.builds), 0) + + self.assertEqual(A.reported, 1) + self.assertEqual(B.reported, 1) + self.assertFalse('test-mutex' in self.sched.mutex.mutexes) + def test_node_label(self): "Test that a job runs on a specific node label" self.worker.registerFunction('build:node-project-test1:debian') @@ -2742,11 +2806,11 @@ class TestScheduler(ZuulTestCase): 'tests/fixtures/layout-idle.yaml') self.sched.reconfigure(self.config) self.registerJobs() + self.waitUntilSettled() # The pipeline triggers every second, so we should have seen # several by now. time.sleep(5) - self.waitUntilSettled() # Stop queuing timer triggered jobs so that the assertions # below don't race against more jobs being queued. @@ -2754,6 +2818,7 @@ class TestScheduler(ZuulTestCase): 'tests/fixtures/layout-no-timer.yaml') self.sched.reconfigure(self.config) self.registerJobs() + self.waitUntilSettled() self.assertEqual(len(self.builds), 2) self.worker.release('.*') @@ -3412,6 +3477,31 @@ For CI problems and help debugging, contact ci@example.org""" self.assertEqual('The merge failed! For more information...', self.smtp_messages[0]['body']) + def test_default_merge_failure_reports(self): + """Check that the default merge failure reports are correct.""" + + # A should report success, B should report merge failure. + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.addPatchset(['conflict']) + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + B.addPatchset(['conflict']) + A.addApproval('CRVW', 2) + B.addApproval('CRVW', 2) + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) + self.waitUntilSettled() + + self.assertEqual(3, len(self.history)) # A jobs + self.assertEqual(A.reported, 2) + self.assertEqual(B.reported, 2) + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(B.data['status'], 'NEW') + self.assertIn('Build succeeded', A.messages[1]) + self.assertIn('Merge Failed', B.messages[1]) + self.assertIn('automatically merged', B.messages[1]) + self.assertNotIn('logs.example.com', B.messages[1]) + self.assertNotIn('SKIPPED', B.messages[1]) + def test_swift_instructions(self): "Test that the correct swift instructions are sent to the workers" self.updateConfigLayout( diff --git a/tox.ini b/tox.ini index f9b0df1cb2..79ea939b11 100644 --- a/tox.ini +++ b/tox.ini @@ -17,9 +17,6 @@ deps = -r{toxinidir}/requirements.txt commands = python setup.py testr --slowest --testr-args='{posargs}' -[tox:jenkins] -downloadcache = ~/cache/pip - [testenv:pep8] commands = flake8 {posargs} diff --git a/zuul/configloader.py b/zuul/configloader.py index 396a015f9a..a9dbbc2910 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -60,6 +60,7 @@ class JobParser(object): 'failure-url': str, 'success-url': str, 'voting': bool, + 'mutex': str, 'branches': to_list(str), 'files': to_list(str), 'swift': to_list(swift), @@ -81,6 +82,7 @@ class JobParser(object): job.pre_run = as_list(conf.get('pre-run', job.pre_run)) job.post_run = as_list(conf.get('post-run', job.post_run)) job.voting = conf.get('voting', True) + job.mutex = conf.get('mutex', None) job.failure_message = conf.get('failure-message', job.failure_message) job.success_message = conf.get('success-message', job.success_message) diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py index f2ea47a790..402528f084 100644 --- a/zuul/connection/__init__.py +++ b/zuul/connection/__init__.py @@ -43,6 +43,14 @@ class BaseConnection(object): self.connection_name = connection_name self.connection_config = connection_config + # Keep track of the sources, triggers and reporters using this + # connection + self.attached_to = { + 'source': [], + 'trigger': [], + 'reporter': [], + } + def onLoad(self): pass @@ -51,3 +59,6 @@ class BaseConnection(object): def registerScheduler(self, sched): self.sched = sched + + def registerUse(self, what, instance): + self.attached_to[what].append(instance) diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py index a203c24820..08a6569612 100644 --- a/zuul/connection/gerrit.py +++ b/zuul/connection/gerrit.py @@ -47,7 +47,6 @@ class GerritEventConnector(threading.Thread): def _handleEvent(self): ts, data = self.connection.getEvent() if self._stopped: - self.connection.eventDone() return # Gerrit can produce inconsistent data immediately after an # event, So ensure that we do not deliver the event to Zuul @@ -99,16 +98,27 @@ class GerritEventConnector(threading.Thread): Can not get account information." % event.type) event.account = None - # TODOv3(jeblair,jhesketh): this is broken in the main branch and - # the fix needs to be merged here - # if (event.change_number and - # self.connection.sched.getProject(event.project_name)): if event.change_number: - # Mark the change as needing a refresh in the cache - event._needs_refresh = True + # TODO(jhesketh): Check if the project exists? + # and self.connection.sched.getProject(event.project_name): + # Call _getChange for the side effect of updating the + # cache. Note that this modifies Change objects outside + # the main thread. + # NOTE(jhesketh): Ideally we'd just remove the change from the + # cache to denote that it needs updating. However the change + # object is already used by Item's and hence BuildSet's etc. and + # we need to update those objects by reference so that they have + # the correct/new information and also avoid hitting gerrit + # multiple times. + if self.connection.attached_to['source']: + self.connection.attached_to['source'][0]._getChange( + event.change_number, event.patch_number, refresh=True) + # We only need to do this once since the connection maintains + # the cache (which is shared between all the sources) + # NOTE(jhesketh): We may couple sources and connections again + # at which point this becomes more sensible. self.connection.sched.addEvent(event) - self.connection.eventDone() def run(self): while True: @@ -118,6 +128,8 @@ class GerritEventConnector(threading.Thread): self._handleEvent() except: self.log.exception("Exception moving Gerrit event:") + finally: + self.connection.eventDone() class GerritWatcher(threading.Thread): diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py index 0adb78cdae..6c3b0795d3 100644 --- a/zuul/layoutvalidator.py +++ b/zuul/layoutvalidator.py @@ -127,6 +127,7 @@ class LayoutSchema(object): 'success-pattern': str, 'hold-following-changes': bool, 'voting': bool, + 'mutex': str, 'parameter-function': str, 'branch': toList(str), 'files': toList(str), diff --git a/zuul/lib/connections.py b/zuul/lib/connections.py index 5f42c3ac4f..64cc3a75a3 100644 --- a/zuul/lib/connections.py +++ b/zuul/lib/connections.py @@ -71,12 +71,12 @@ class ConnectionRegistry(object): if 'gerrit' in config.sections(): connections['gerrit'] = \ zuul.connection.gerrit.GerritConnection( - '_legacy_gerrit', dict(config.items('gerrit'))) + 'gerrit', dict(config.items('gerrit'))) if 'smtp' in config.sections(): connections['smtp'] = \ zuul.connection.smtp.SMTPConnection( - '_legacy_smtp', dict(config.items('smtp'))) + 'smtp', dict(config.items('smtp'))) self.connections = connections @@ -118,6 +118,9 @@ class ConnectionRegistry(object): driver_config, self.sched, connection ) + if connection: + connection.registerUse(dtype, driver_instance) + return driver_instance def getSource(self, connection_name): diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index e3e0997612..ce369f125d 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -77,14 +77,16 @@ class BasePipelineManager(object): efilters += str(tree.job.skip_if_matcher) if efilters: efilters = ' ' + efilters - hold = '' + tags = [] if tree.job.hold_following_changes: - hold = ' [hold]' - voting = '' + tags.append('[hold]') if not tree.job.voting: - voting = ' [nonvoting]' - self.log.info("%s%s%s%s%s" % (istr, repr(tree.job), - efilters, hold, voting)) + tags.append('[nonvoting]') + if tree.job.mutex: + tags.append('[mutex: %s]' % tree.job.mutex) + tags = ' '.join(tags) + self.log.info("%s%s%s %s" % (istr, repr(tree.job), + efilters, tags)) for x in tree.job_trees: log_jobs(x, indent + 2) @@ -348,7 +350,7 @@ class BasePipelineManager(object): def launchJobs(self, item): # TODO(jeblair): This should return a value indicating a job # was launched. Appears to be a longstanding bug. - jobs = self.pipeline.findJobsToRun(item) + jobs = self.pipeline.findJobsToRun(item, self.sched.mutex) if jobs: self._launchJobs(item, jobs) @@ -474,13 +476,23 @@ class BasePipelineManager(object): def updateBuildDescriptions(self, build_set): for build in build_set.getBuilds(): - desc = self.formatDescription(build) - self.sched.launcher.setBuildDescription(build, desc) + try: + desc = self.formatDescription(build) + self.sched.launcher.setBuildDescription(build, desc) + except: + # Log the failure and let loop continue + self.log.error("Failed to update description for build %s" % + (build)) if build_set.previous_build_set: for build in build_set.previous_build_set.getBuilds(): - desc = self.formatDescription(build) - self.sched.launcher.setBuildDescription(build, desc) + try: + desc = self.formatDescription(build) + self.sched.launcher.setBuildDescription(build, desc) + except: + # Log the failure and let loop continue + self.log.error("Failed to update description for " + "build %s in previous build set" % (build)) def onBuildStarted(self, build): self.log.debug("Build %s started" % build) @@ -491,6 +503,7 @@ class BasePipelineManager(object): item = build.build_set.item self.pipeline.setResult(item, build) + self.sched.mutex.release(item, build.job) self.log.debug("Item %s status is now:\n %s" % (item, item.formatStatus())) return True @@ -503,9 +516,9 @@ class BasePipelineManager(object): if event.merged: build_set.commit = event.commit elif event.updated: - if not isinstance(item, NullChange): + if not isinstance(item.change, NullChange): build_set.commit = item.change.newrev - if not build_set.commit: + if not build_set.commit and not isinstance(item.change, NullChange): self.log.info("Unable to merge change %s" % item.change) self.pipeline.setUnableToMerge(item) diff --git a/zuul/model.py b/zuul/model.py index 22d19b4f0a..2571dc0ab2 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -146,7 +146,7 @@ class Pipeline(object): return [] return item.change.filterJobs(tree.getJobs()) - def _findJobsToRun(self, job_trees, item): + def _findJobsToRun(self, job_trees, item, mutex): torun = [] for tree in job_trees: job = tree.job @@ -160,20 +160,23 @@ class Pipeline(object): else: # There is no build for the root of this job tree, # so we should run it. - torun.append(job) + if mutex.acquire(item, job): + # If this job needs a mutex, either acquire it or make + # sure that we have it before running the job. + torun.append(job) # If there is no job, this is a null job tree, and we should # run all of its jobs. if result == 'SUCCESS' or not job: - torun.extend(self._findJobsToRun(tree.job_trees, item)) + torun.extend(self._findJobsToRun(tree.job_trees, item, mutex)) return torun - def findJobsToRun(self, item): + def findJobsToRun(self, item, mutex): if not item.live: return [] tree = item.job_tree if not tree: return [] - return self._findJobsToRun(tree.job_trees, item) + return self._findJobsToRun(tree.job_trees, item, mutex) def haveAllJobsStarted(self, item): for job in self.getJobs(item): @@ -464,6 +467,7 @@ class Job(object): swift=None, # TODOv3(jeblair): move to auth parameter_function=None, # TODOv3(jeblair): remove success_pattern=None, # TODOv3(jeblair): remove + mutex=None, ) def __init__(self, name): @@ -1051,9 +1055,6 @@ class TriggerEvent(object): # an admin command, etc): self.forced_pipeline = None - # Internal mechanism to track if the change needs a refresh from cache - self._needs_refresh = False - def __repr__(self): ret = '