Merge "Unify handling of dequeue and enqueue events"

This commit is contained in:
Zuul 2021-03-24 22:30:25 +00:00 committed by Gerrit Code Review
commit 6f989bddb5
4 changed files with 123 additions and 154 deletions

View File

@ -4445,9 +4445,14 @@ class TestScheduler(ZuulTestCase):
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
self.addCleanup(client.shutdown)
with testtools.ExpectedException(
zuul.rpcclient.RPCFailure,
'Change 2,1 does not belong to project "org/project1"'):
matcher = AfterPreprocessing(
str, MatchesRegex(
r'.*Change <Change \w+ org/project2 2,1> does not belong to '
r'project "org/project1"',
re.DOTALL
)
)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure, matcher):
r = client.enqueue(tenant='tenant-one',
pipeline='gate',
project='org/project1',
@ -4714,8 +4719,10 @@ class TestScheduler(ZuulTestCase):
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
self.addCleanup(client.shutdown)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid tenant"):
matcher = AfterPreprocessing(
str, MatchesRegex(r'.*Unknown tenant', re.DOTALL)
)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure, matcher):
r = client.enqueue(tenant='tenant-foo',
pipeline='gate',
project='org/project',
@ -4723,8 +4730,10 @@ class TestScheduler(ZuulTestCase):
change='1,1')
self.assertEqual(r, False)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid project"):
matcher = AfterPreprocessing(
str, MatchesRegex(r'.*Unknown project', re.DOTALL)
)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure, matcher):
r = client.enqueue(tenant='tenant-one',
pipeline='gate',
project='project-does-not-exist',
@ -4732,8 +4741,10 @@ class TestScheduler(ZuulTestCase):
change='1,1')
self.assertEqual(r, False)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid pipeline"):
matcher = AfterPreprocessing(
str, MatchesRegex(r'.*Unknown pipeline', re.DOTALL)
)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure, matcher):
r = client.enqueue(tenant='tenant-one',
pipeline='pipeline-does-not-exist',
project='org/project',
@ -4741,8 +4752,10 @@ class TestScheduler(ZuulTestCase):
change='1,1')
self.assertEqual(r, False)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid change"):
matcher = AfterPreprocessing(
str, MatchesRegex(r'.*Unknown change', re.DOTALL)
)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure, matcher):
r = client.enqueue(tenant='tenant-one',
pipeline='gate',
project='org/project',

View File

@ -3617,18 +3617,21 @@ class PromoteEvent(ManagementEvent):
)
class DequeueEvent(ManagementEvent):
"""Dequeue a change from a pipeline
class ChangeManagementEvent(ManagementEvent):
"""Base class for events that dequeue/enqueue changes
:arg str tenant_name: the name of the tenant
:arg str pipeline_name: the name of the pipeline
:arg str project_name: the name of the project
:arg str change: optional, the change to dequeue
:arg str ref: optional, the ref to look for
:arg str change: optional, the change
:arg str ref: optional, the ref
:arg str oldrev: optional, the old revision
:arg str newrev: optional, the new revision
"""
def __init__(self, tenant_name, pipeline_name, project_name, change, ref):
super(DequeueEvent, self).__init__()
def __init__(self, tenant_name, pipeline_name, project_name,
change=None, ref=None, oldrev=None, newrev=None):
super().__init__()
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.project_name = project_name
@ -3638,9 +3641,8 @@ class DequeueEvent(ManagementEvent):
else:
self.change_number, self.patch_number = (None, None)
self.ref = ref
# set to mock values
self.oldrev = '0000000000000000000000000000000000000000'
self.newrev = '0000000000000000000000000000000000000000'
self.oldrev = oldrev or '0000000000000000000000000000000000000000'
self.newrev = newrev or '0000000000000000000000000000000000000000'
def toDict(self):
d = super().toDict()
@ -3653,11 +3655,6 @@ class DequeueEvent(ManagementEvent):
d["newrev"] = self.newrev
return d
def updateFromDict(self, d):
super().updateFromDict(d)
self.oldrev = d.get("oldrev")
self.newrev = d.get("newrev")
@classmethod
def fromDict(cls, data):
event = cls(
@ -3666,32 +3663,19 @@ class DequeueEvent(ManagementEvent):
data.get("project_name"),
data.get("change"),
data.get("ref"),
data.get("oldrev"),
data.get("newrev"),
)
event.updateFromDict(data)
return event
class EnqueueEvent(ManagementEvent):
"""Enqueue a change into a pipeline
class DequeueEvent(ChangeManagementEvent):
"""Dequeue a change from a pipeline"""
:arg TriggerEvent trigger_event: a TriggerEvent describing the
trigger, pipeline, and change to enqueue
"""
def __init__(self, trigger_event):
super(EnqueueEvent, self).__init__()
self.trigger_event = trigger_event
def toDict(self):
d = super().toDict()
d["trigger_event"] = self.trigger_event.toDict()
return d
@classmethod
def fromDict(cls, data):
return cls(
TriggerEvent.fromDict(data["trigger_event"]),
)
class EnqueueEvent(ChangeManagementEvent):
"""Enqueue a change into a pipeline"""
class ResultEvent:

View File

@ -84,84 +84,49 @@ class RPCListenerSlow(RPCListenerBase):
return
job.sendWorkComplete()
def _common_enqueue(self, job):
args = json.loads(job.arguments)
event = model.TriggerEvent()
event.timestamp = time.time()
errors = ''
tenant = None
project = None
pipeline = None
def _common_enqueue(self, job, args):
tenant_name = args['tenant']
pipeline_name = args['pipeline']
project_name = args['project']
change = args.get('change')
ref = args.get('ref')
oldrev = args.get('oldrev')
newrev = args.get('newrev')
try:
self.sched.enqueue(tenant_name, pipeline_name, project_name,
change, ref, oldrev, newrev)
except Exception as e:
job.sendWorkException(str(e).encode('utf8'))
return
tenant = self.sched.abide.tenants.get(args['tenant'])
if tenant:
event.tenant_name = args['tenant']
(trusted, project) = tenant.getProject(args['project'])
if project:
event.project_hostname = project.canonical_hostname
event.project_name = project.name
else:
errors += 'Invalid project: %s\n' % (args['project'],)
pipeline = tenant.layout.pipelines.get(args['pipeline'])
if pipeline:
event.forced_pipeline = args['pipeline']
else:
errors += 'Invalid pipeline: %s\n' % (args['pipeline'],)
else:
errors += 'Invalid tenant: %s\n' % (args['tenant'],)
return (args, event, errors, project)
job.sendWorkComplete()
def handle_enqueue(self, job):
(args, event, errors, project) = self._common_enqueue(job)
if not errors:
event.change_number, event.patch_number = args['change'].split(',')
try:
ch = project.source.getChange(event, refresh=True)
if ch.project.name != project.name:
errors += ('Change %s does not belong to project "%s", '
% (args['change'], project.name))
except Exception:
errors += 'Invalid change: %s\n' % (args['change'],)
if errors:
job.sendWorkException(errors.encode('utf8'))
else:
self.sched.enqueue(event)
job.sendWorkComplete()
args = json.loads(job.arguments)
self._common_enqueue(job, args)
def handle_enqueue_ref(self, job):
(args, event, errors, project) = self._common_enqueue(job)
if not errors:
event.ref = args['ref']
event.oldrev = args['oldrev']
event.newrev = args['newrev']
try:
int(event.oldrev, 16)
if len(event.oldrev) != 40:
errors += 'Old rev must be 40 character sha1: ' \
'%s\n' % event.oldrev
except Exception:
errors += 'Old rev must be base16 hash: ' \
'%s\n' % event.oldrev
try:
int(event.newrev, 16)
if len(event.newrev) != 40:
errors += 'New rev must be 40 character sha1: ' \
'%s\n' % event.newrev
except Exception:
errors += 'New rev must be base16 hash: ' \
'%s\n' % event.newrev
args = json.loads(job.arguments)
oldrev = args['oldrev']
newrev = args['newrev']
errors = ''
try:
int(oldrev, 16)
if len(oldrev) != 40:
errors += f'Old rev must be 40 character sha1: {oldrev}\n'
except Exception:
errors += f'Old rev must be base16 hash: {oldrev}\n'
try:
int(newrev, 16)
if len(newrev) != 40:
errors += f'New rev must be 40 character sha1: {newrev}\n'
except Exception:
errors += f'New rev must be base16 hash: {newrev}\n'
if errors:
job.sendWorkException(errors.encode('utf8'))
else:
self.sched.enqueue(event)
job.sendWorkComplete()
self._common_enqueue(job, args)
def handle_promote(self, job):
args = json.loads(job.arguments)

View File

@ -48,6 +48,7 @@ from zuul.model import (
BuildCompletedEvent,
BuildPausedEvent,
BuildStartedEvent,
ChangeManagementEvent,
DequeueEvent,
EnqueueEvent,
FilesChangesCompletedEvent,
@ -573,8 +574,11 @@ class Scheduler(threading.Thread):
result.wait()
self.log.debug("Dequeue complete")
def enqueue(self, trigger_event):
event = EnqueueEvent(trigger_event)
def enqueue(self, tenant_name, pipeline_name, canonical_project_name,
change, ref, oldrev, newrev):
event = EnqueueEvent(tenant_name, pipeline_name,
canonical_project_name, change, ref, oldrev,
newrev)
result = self.management_events.put(event)
self.log.debug("Waiting for enqueue")
result.wait()
@ -977,7 +981,7 @@ class Scheduler(threading.Thread):
(trusted, project) = tenant.getProject(event.project_name)
if project is None:
raise ValueError('Unknown project %s' % event.project_name)
change = project.source.getChange(event, project)
change = project.source.getChange(event, refresh=True)
if change.project.name != project.name:
if event.change:
item = 'Change %s' % event.change
@ -1001,13 +1005,24 @@ class Scheduler(threading.Thread):
def _doEnqueueEvent(self, event):
tenant = self.abide.tenants.get(event.tenant_name)
full_project_name = ('/'.join([event.project_hostname,
event.project_name]))
(trusted, project) = tenant.getProject(full_project_name)
pipeline = tenant.layout.pipelines[event.forced_pipeline]
change = project.source.getChange(event, project)
if tenant is None:
raise ValueError(f'Unknown tenant {event.tenant_name}')
pipeline = tenant.layout.pipelines.get(event.pipeline_name)
if pipeline is None:
raise ValueError(f'Unknown pipeline {event.pipeline_name}')
(trusted, project) = tenant.getProject(event.project_name)
if project is None:
raise ValueError(f'Unknown project {event.project_name}')
try:
change = project.source.getChange(event, refresh=True)
except Exception as exc:
raise ValueError('Unknown change') from exc
if change.project.name != project.name:
raise Exception(
f'Change {change} does not belong to project "{project.name}"')
self.log.debug("Event %s for change %s was directly assigned "
"to pipeline %s" % (event, change, self))
"to pipeline %s", event, change, self)
pipeline.manager.addChange(change, event, ignore_requirements=True)
def _areAllBuildsComplete(self):
@ -1229,37 +1244,8 @@ class Scheduler(threading.Thread):
self._doSmartReconfigureEvent(event)
elif isinstance(event, TenantReconfigureEvent):
self._doTenantReconfigureEvent(event)
elif isinstance(event, (PromoteEvent, DequeueEvent)):
try:
tenant = self.abide.tenants[event.tenant_name]
pipeline = tenant.layout.pipelines[event.pipeline_name]
self.pipeline_management_events[tenant.name][
pipeline.name
].put(event)
event_forwarded = True
except Exception:
event.exception(
"".join(
traceback.format_exception(*sys.exc_info())
)
)
elif isinstance(event, EnqueueEvent):
try:
trigger_event = event.trigger_event
tenant = self.abide.tenants[trigger_event.tenant_name]
pipeline = tenant.layout.pipelines[
trigger_event.forced_pipeline
]
self.pipeline_management_events[tenant.name][
pipeline.name
].put(event)
event_forwarded = True
except Exception:
event.exception(
"".join(
traceback.format_exception(*sys.exc_info())
)
)
elif isinstance(event, (PromoteEvent, ChangeManagementEvent)):
event_forwarded = self._forward_management_event(event)
else:
self.log.error("Unable to handle event %s", event)
finally:
@ -1268,6 +1254,27 @@ class Scheduler(threading.Thread):
else:
self.management_events.ack(event)
def _forward_management_event(self, event):
event_forwarded = False
try:
tenant = self.abide.tenants.get(event.tenant_name)
if tenant is None:
raise ValueError(f'Unknown tenant {event.tenant_name}')
pipeline = tenant.layout.pipelines.get(event.pipeline_name)
if pipeline is None:
raise ValueError(f'Unknown pipeline {event.pipeline_name}')
self.pipeline_management_events[tenant.name][
pipeline.name
].put(event)
event_forwarded = True
except Exception:
event.exception(
"".join(
traceback.format_exception(*sys.exc_info())
)
)
return event_forwarded
def process_management_queue(self):
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
@ -1294,7 +1301,7 @@ class Scheduler(threading.Thread):
elif isinstance(event, DequeueEvent):
self._doDequeueEvent(event)
elif isinstance(event, EnqueueEvent):
self._doEnqueueEvent(event.trigger_event)
self._doEnqueueEvent(event)
else:
self.log.error("Unable to handle event %s" % event)
except Exception: