diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 577c323948..1f1d75bb78 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -4445,9 +4445,14 @@ class TestScheduler(ZuulTestCase): client = zuul.rpcclient.RPCClient('127.0.0.1', self.gearman_server.port) self.addCleanup(client.shutdown) - with testtools.ExpectedException( - zuul.rpcclient.RPCFailure, - 'Change 2,1 does not belong to project "org/project1"'): + matcher = AfterPreprocessing( + str, MatchesRegex( + r'.*Change does not belong to ' + r'project "org/project1"', + re.DOTALL + ) + ) + with testtools.ExpectedException(zuul.rpcclient.RPCFailure, matcher): r = client.enqueue(tenant='tenant-one', pipeline='gate', project='org/project1', @@ -4714,8 +4719,10 @@ class TestScheduler(ZuulTestCase): client = zuul.rpcclient.RPCClient('127.0.0.1', self.gearman_server.port) self.addCleanup(client.shutdown) - with testtools.ExpectedException(zuul.rpcclient.RPCFailure, - "Invalid tenant"): + matcher = AfterPreprocessing( + str, MatchesRegex(r'.*Unknown tenant', re.DOTALL) + ) + with testtools.ExpectedException(zuul.rpcclient.RPCFailure, matcher): r = client.enqueue(tenant='tenant-foo', pipeline='gate', project='org/project', @@ -4723,8 +4730,10 @@ class TestScheduler(ZuulTestCase): change='1,1') self.assertEqual(r, False) - with testtools.ExpectedException(zuul.rpcclient.RPCFailure, - "Invalid project"): + matcher = AfterPreprocessing( + str, MatchesRegex(r'.*Unknown project', re.DOTALL) + ) + with testtools.ExpectedException(zuul.rpcclient.RPCFailure, matcher): r = client.enqueue(tenant='tenant-one', pipeline='gate', project='project-does-not-exist', @@ -4732,8 +4741,10 @@ class TestScheduler(ZuulTestCase): change='1,1') self.assertEqual(r, False) - with testtools.ExpectedException(zuul.rpcclient.RPCFailure, - "Invalid pipeline"): + matcher = AfterPreprocessing( + str, MatchesRegex(r'.*Unknown pipeline', re.DOTALL) + ) + with testtools.ExpectedException(zuul.rpcclient.RPCFailure, matcher): r = client.enqueue(tenant='tenant-one', pipeline='pipeline-does-not-exist', project='org/project', @@ -4741,8 +4752,10 @@ class TestScheduler(ZuulTestCase): change='1,1') self.assertEqual(r, False) - with testtools.ExpectedException(zuul.rpcclient.RPCFailure, - "Invalid change"): + matcher = AfterPreprocessing( + str, MatchesRegex(r'.*Unknown change', re.DOTALL) + ) + with testtools.ExpectedException(zuul.rpcclient.RPCFailure, matcher): r = client.enqueue(tenant='tenant-one', pipeline='gate', project='org/project', diff --git a/zuul/model.py b/zuul/model.py index 684266c9e4..894ca11f0f 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -3617,18 +3617,21 @@ class PromoteEvent(ManagementEvent): ) -class DequeueEvent(ManagementEvent): - """Dequeue a change from a pipeline +class ChangeManagementEvent(ManagementEvent): + """Base class for events that dequeue/enqueue changes :arg str tenant_name: the name of the tenant :arg str pipeline_name: the name of the pipeline :arg str project_name: the name of the project - :arg str change: optional, the change to dequeue - :arg str ref: optional, the ref to look for + :arg str change: optional, the change + :arg str ref: optional, the ref + :arg str oldrev: optional, the old revision + :arg str newrev: optional, the new revision """ - def __init__(self, tenant_name, pipeline_name, project_name, change, ref): - super(DequeueEvent, self).__init__() + def __init__(self, tenant_name, pipeline_name, project_name, + change=None, ref=None, oldrev=None, newrev=None): + super().__init__() self.tenant_name = tenant_name self.pipeline_name = pipeline_name self.project_name = project_name @@ -3638,9 +3641,8 @@ class DequeueEvent(ManagementEvent): else: self.change_number, self.patch_number = (None, None) self.ref = ref - # set to mock values - self.oldrev = '0000000000000000000000000000000000000000' - self.newrev = '0000000000000000000000000000000000000000' + self.oldrev = oldrev or '0000000000000000000000000000000000000000' + self.newrev = newrev or '0000000000000000000000000000000000000000' def toDict(self): d = super().toDict() @@ -3653,11 +3655,6 @@ class DequeueEvent(ManagementEvent): d["newrev"] = self.newrev return d - def updateFromDict(self, d): - super().updateFromDict(d) - self.oldrev = d.get("oldrev") - self.newrev = d.get("newrev") - @classmethod def fromDict(cls, data): event = cls( @@ -3666,32 +3663,19 @@ class DequeueEvent(ManagementEvent): data.get("project_name"), data.get("change"), data.get("ref"), + data.get("oldrev"), + data.get("newrev"), ) event.updateFromDict(data) return event -class EnqueueEvent(ManagementEvent): - """Enqueue a change into a pipeline +class DequeueEvent(ChangeManagementEvent): + """Dequeue a change from a pipeline""" - :arg TriggerEvent trigger_event: a TriggerEvent describing the - trigger, pipeline, and change to enqueue - """ - def __init__(self, trigger_event): - super(EnqueueEvent, self).__init__() - self.trigger_event = trigger_event - - def toDict(self): - d = super().toDict() - d["trigger_event"] = self.trigger_event.toDict() - return d - - @classmethod - def fromDict(cls, data): - return cls( - TriggerEvent.fromDict(data["trigger_event"]), - ) +class EnqueueEvent(ChangeManagementEvent): + """Enqueue a change into a pipeline""" class ResultEvent: diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py index b5f03b4dba..55c9920ea6 100644 --- a/zuul/rpclistener.py +++ b/zuul/rpclistener.py @@ -84,84 +84,49 @@ class RPCListenerSlow(RPCListenerBase): return job.sendWorkComplete() - def _common_enqueue(self, job): - args = json.loads(job.arguments) - event = model.TriggerEvent() - event.timestamp = time.time() - errors = '' - tenant = None - project = None - pipeline = None + def _common_enqueue(self, job, args): + tenant_name = args['tenant'] + pipeline_name = args['pipeline'] + project_name = args['project'] + change = args.get('change') + ref = args.get('ref') + oldrev = args.get('oldrev') + newrev = args.get('newrev') + try: + self.sched.enqueue(tenant_name, pipeline_name, project_name, + change, ref, oldrev, newrev) + except Exception as e: + job.sendWorkException(str(e).encode('utf8')) + return - tenant = self.sched.abide.tenants.get(args['tenant']) - if tenant: - event.tenant_name = args['tenant'] - - (trusted, project) = tenant.getProject(args['project']) - if project: - event.project_hostname = project.canonical_hostname - event.project_name = project.name - else: - errors += 'Invalid project: %s\n' % (args['project'],) - - pipeline = tenant.layout.pipelines.get(args['pipeline']) - if pipeline: - event.forced_pipeline = args['pipeline'] - else: - errors += 'Invalid pipeline: %s\n' % (args['pipeline'],) - else: - errors += 'Invalid tenant: %s\n' % (args['tenant'],) - - return (args, event, errors, project) + job.sendWorkComplete() def handle_enqueue(self, job): - (args, event, errors, project) = self._common_enqueue(job) - - if not errors: - event.change_number, event.patch_number = args['change'].split(',') - try: - ch = project.source.getChange(event, refresh=True) - if ch.project.name != project.name: - errors += ('Change %s does not belong to project "%s", ' - % (args['change'], project.name)) - except Exception: - errors += 'Invalid change: %s\n' % (args['change'],) - - if errors: - job.sendWorkException(errors.encode('utf8')) - else: - self.sched.enqueue(event) - job.sendWorkComplete() + args = json.loads(job.arguments) + self._common_enqueue(job, args) def handle_enqueue_ref(self, job): - (args, event, errors, project) = self._common_enqueue(job) - - if not errors: - event.ref = args['ref'] - event.oldrev = args['oldrev'] - event.newrev = args['newrev'] - try: - int(event.oldrev, 16) - if len(event.oldrev) != 40: - errors += 'Old rev must be 40 character sha1: ' \ - '%s\n' % event.oldrev - except Exception: - errors += 'Old rev must be base16 hash: ' \ - '%s\n' % event.oldrev - try: - int(event.newrev, 16) - if len(event.newrev) != 40: - errors += 'New rev must be 40 character sha1: ' \ - '%s\n' % event.newrev - except Exception: - errors += 'New rev must be base16 hash: ' \ - '%s\n' % event.newrev + args = json.loads(job.arguments) + oldrev = args['oldrev'] + newrev = args['newrev'] + errors = '' + try: + int(oldrev, 16) + if len(oldrev) != 40: + errors += f'Old rev must be 40 character sha1: {oldrev}\n' + except Exception: + errors += f'Old rev must be base16 hash: {oldrev}\n' + try: + int(newrev, 16) + if len(newrev) != 40: + errors += f'New rev must be 40 character sha1: {newrev}\n' + except Exception: + errors += f'New rev must be base16 hash: {newrev}\n' if errors: job.sendWorkException(errors.encode('utf8')) else: - self.sched.enqueue(event) - job.sendWorkComplete() + self._common_enqueue(job, args) def handle_promote(self, job): args = json.loads(job.arguments) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 9efd47d787..34cf7e5d44 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -48,6 +48,7 @@ from zuul.model import ( BuildCompletedEvent, BuildPausedEvent, BuildStartedEvent, + ChangeManagementEvent, DequeueEvent, EnqueueEvent, FilesChangesCompletedEvent, @@ -573,8 +574,11 @@ class Scheduler(threading.Thread): result.wait() self.log.debug("Dequeue complete") - def enqueue(self, trigger_event): - event = EnqueueEvent(trigger_event) + def enqueue(self, tenant_name, pipeline_name, canonical_project_name, + change, ref, oldrev, newrev): + event = EnqueueEvent(tenant_name, pipeline_name, + canonical_project_name, change, ref, oldrev, + newrev) result = self.management_events.put(event) self.log.debug("Waiting for enqueue") result.wait() @@ -977,7 +981,7 @@ class Scheduler(threading.Thread): (trusted, project) = tenant.getProject(event.project_name) if project is None: raise ValueError('Unknown project %s' % event.project_name) - change = project.source.getChange(event, project) + change = project.source.getChange(event, refresh=True) if change.project.name != project.name: if event.change: item = 'Change %s' % event.change @@ -1001,13 +1005,24 @@ class Scheduler(threading.Thread): def _doEnqueueEvent(self, event): tenant = self.abide.tenants.get(event.tenant_name) - full_project_name = ('/'.join([event.project_hostname, - event.project_name])) - (trusted, project) = tenant.getProject(full_project_name) - pipeline = tenant.layout.pipelines[event.forced_pipeline] - change = project.source.getChange(event, project) + if tenant is None: + raise ValueError(f'Unknown tenant {event.tenant_name}') + pipeline = tenant.layout.pipelines.get(event.pipeline_name) + if pipeline is None: + raise ValueError(f'Unknown pipeline {event.pipeline_name}') + (trusted, project) = tenant.getProject(event.project_name) + if project is None: + raise ValueError(f'Unknown project {event.project_name}') + try: + change = project.source.getChange(event, refresh=True) + except Exception as exc: + raise ValueError('Unknown change') from exc + + if change.project.name != project.name: + raise Exception( + f'Change {change} does not belong to project "{project.name}"') self.log.debug("Event %s for change %s was directly assigned " - "to pipeline %s" % (event, change, self)) + "to pipeline %s", event, change, self) pipeline.manager.addChange(change, event, ignore_requirements=True) def _areAllBuildsComplete(self): @@ -1229,37 +1244,8 @@ class Scheduler(threading.Thread): self._doSmartReconfigureEvent(event) elif isinstance(event, TenantReconfigureEvent): self._doTenantReconfigureEvent(event) - elif isinstance(event, (PromoteEvent, DequeueEvent)): - try: - tenant = self.abide.tenants[event.tenant_name] - pipeline = tenant.layout.pipelines[event.pipeline_name] - self.pipeline_management_events[tenant.name][ - pipeline.name - ].put(event) - event_forwarded = True - except Exception: - event.exception( - "".join( - traceback.format_exception(*sys.exc_info()) - ) - ) - elif isinstance(event, EnqueueEvent): - try: - trigger_event = event.trigger_event - tenant = self.abide.tenants[trigger_event.tenant_name] - pipeline = tenant.layout.pipelines[ - trigger_event.forced_pipeline - ] - self.pipeline_management_events[tenant.name][ - pipeline.name - ].put(event) - event_forwarded = True - except Exception: - event.exception( - "".join( - traceback.format_exception(*sys.exc_info()) - ) - ) + elif isinstance(event, (PromoteEvent, ChangeManagementEvent)): + event_forwarded = self._forward_management_event(event) else: self.log.error("Unable to handle event %s", event) finally: @@ -1268,6 +1254,27 @@ class Scheduler(threading.Thread): else: self.management_events.ack(event) + def _forward_management_event(self, event): + event_forwarded = False + try: + tenant = self.abide.tenants.get(event.tenant_name) + if tenant is None: + raise ValueError(f'Unknown tenant {event.tenant_name}') + pipeline = tenant.layout.pipelines.get(event.pipeline_name) + if pipeline is None: + raise ValueError(f'Unknown pipeline {event.pipeline_name}') + self.pipeline_management_events[tenant.name][ + pipeline.name + ].put(event) + event_forwarded = True + except Exception: + event.exception( + "".join( + traceback.format_exception(*sys.exc_info()) + ) + ) + return event_forwarded + def process_management_queue(self): for tenant in self.abide.tenants.values(): for pipeline in tenant.layout.pipelines.values(): @@ -1294,7 +1301,7 @@ class Scheduler(threading.Thread): elif isinstance(event, DequeueEvent): self._doDequeueEvent(event) elif isinstance(event, EnqueueEvent): - self._doEnqueueEvent(event.trigger_event) + self._doEnqueueEvent(event) else: self.log.error("Unable to handle event %s" % event) except Exception: