Move management and result events to model

As preparation for storing trigger, management and result events in
Zookeeper we need to move them to model.py to avoid circular imports.

Change-Id: I807b8cc2fdb499c25ec5cbf702ab2b14b094df62
This commit is contained in:
Simon Westphahl 2020-10-30 09:20:11 +01:00
parent 4bb45bf2a0
commit a2f84dace0
3 changed files with 237 additions and 218 deletions

View File

@ -47,7 +47,7 @@ from zuul.lib.logutil import get_annotated_logger
from zuul.model import Ref, Branch, Tag, Project
from zuul.exceptions import MergeFailure
from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent
from zuul.scheduler import DequeueEvent
from zuul.model import DequeueEvent
GITHUB_BASE_URL = 'https://api.github.com'
PREVIEW_JSON_ACCEPT = 'application/vnd.github.machine-man-preview+json'

View File

@ -23,6 +23,7 @@ from itertools import chain
import re2
import struct
import threading
import time
from uuid import uuid4
import urllib.parse
@ -3466,6 +3467,221 @@ class Change(Branch):
return d
class ManagementEvent:
"""An event that should be processed within the main queue run loop"""
def __init__(self):
self._wait_event = threading.Event()
self._exc_info = None
self.zuul_event_id = None
def exception(self, exc_info):
self._exc_info = exc_info
self._wait_event.set()
def done(self):
self._wait_event.set()
def wait(self, timeout=None):
self._wait_event.wait(timeout)
if self._exc_info:
# sys.exc_info returns (type, value, traceback)
type_, exception_instance, traceback = self._exc_info
raise exception_instance.with_traceback(traceback)
return self._wait_event.is_set()
class ReconfigureEvent(ManagementEvent):
"""Reconfigure the scheduler. The layout will be (re-)loaded from
the path specified in the configuration.
:arg ConfigParser config: the new configuration
"""
def __init__(self, config, validate_tenants=None):
super(ReconfigureEvent, self).__init__()
self.config = config
self.validate_tenants = validate_tenants
class SmartReconfigureEvent(ManagementEvent):
"""Reconfigure the scheduler. The layout will be (re-)loaded from
the path specified in the configuration.
:arg ConfigParser config: the new configuration
"""
def __init__(self, config, smart=False):
super().__init__()
self.config = config
class TenantReconfigureEvent(ManagementEvent):
"""Reconfigure the given tenant. The layout will be (re-)loaded from
the path specified in the configuration.
:arg Tenant tenant: the tenant to reconfigure
:arg Project project: if supplied, clear the cached configuration
from this project first
:arg Branch branch: if supplied along with project, only remove the
configuration of the specific branch from the cache
"""
def __init__(self, tenant, project, branch):
super(TenantReconfigureEvent, self).__init__()
self.tenant_name = tenant.name
self.project_branches = set([(project, branch)])
def __ne__(self, other):
return not self.__eq__(other)
def __eq__(self, other):
if not isinstance(other, TenantReconfigureEvent):
return False
# We don't check projects because they will get combined when
# merged.
return (self.tenant_name == other.tenant_name)
def merge(self, other):
if self.tenant_name != other.tenant_name:
raise Exception("Can not merge events from different tenants")
self.project_branches |= other.project_branches
class PromoteEvent(ManagementEvent):
"""Promote one or more changes to the head of the queue.
:arg str tenant_name: the name of the tenant
:arg str pipeline_name: the name of the pipeline
:arg list change_ids: a list of strings of change ids in the form
1234,1
"""
def __init__(self, tenant_name, pipeline_name, change_ids):
super(PromoteEvent, self).__init__()
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.change_ids = change_ids
class DequeueEvent(ManagementEvent):
"""Dequeue a change from a pipeline
:arg str tenant_name: the name of the tenant
:arg str pipeline_name: the name of the pipeline
:arg str project_name: the name of the project
:arg str change: optional, the change to dequeue
:arg str ref: optional, the ref to look for
"""
def __init__(self, tenant_name, pipeline_name, project_name, change, ref):
super(DequeueEvent, self).__init__()
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.project_name = project_name
self.change = change
if change is not None:
self.change_number, self.patch_number = change.split(',')
else:
self.change_number, self.patch_number = (None, None)
self.ref = ref
# set to mock values
self.oldrev = '0000000000000000000000000000000000000000'
self.newrev = '0000000000000000000000000000000000000000'
class EnqueueEvent(ManagementEvent):
"""Enqueue a change into a pipeline
:arg TriggerEvent trigger_event: a TriggerEvent describing the
trigger, pipeline, and change to enqueue
"""
def __init__(self, trigger_event):
super(EnqueueEvent, self).__init__()
self.trigger_event = trigger_event
class ResultEvent:
"""An event that needs to modify the pipeline state due to a
result from an external system."""
pass
class BuildStartedEvent(ResultEvent):
"""A build has started.
:arg Build build: The build which has started.
"""
def __init__(self, build):
self.build = build
class BuildPausedEvent(ResultEvent):
"""A build has been paused.
:arg Build build: The build which has been paused.
"""
def __init__(self, build):
self.build = build
class BuildCompletedEvent(ResultEvent):
"""A build has completed
:arg Build build: The build which has completed.
"""
def __init__(self, build, result):
self.build = build
self.result = result
class MergeCompletedEvent(ResultEvent):
"""A remote merge operation has completed
:arg BuildSet build_set: The build_set which is ready.
:arg bool merged: Whether the merge succeeded (changes with refs).
:arg bool updated: Whether the repo was updated (changes without refs).
:arg str commit: The SHA of the merged commit (changes with refs).
:arg dict repo_state: The starting repo state before the merge.
:arg list item_in_branches: A list of branches in which the final
commit in the merge list appears (changes without refs).
"""
def __init__(self, build_set, merged, updated, commit,
files, repo_state, item_in_branches):
self.build_set = build_set
self.merged = merged
self.updated = updated
self.commit = commit
self.files = files
self.repo_state = repo_state
self.item_in_branches = item_in_branches
class FilesChangesCompletedEvent(ResultEvent):
"""A remote fileschanges operation has completed
:arg BuildSet build_set: The build_set which is ready.
:arg list files: List of files changed.
"""
def __init__(self, build_set, files):
self.build_set = build_set
self.files = files
class NodesProvisionedEvent(ResultEvent):
"""Nodes have been provisioned for a build_set
:arg BuildSet build_set: The build_set which has nodes.
:arg list of Node objects nodes: The provisioned nodes
"""
def __init__(self, request):
self.request = request
self.request_id = request.id
class TriggerEvent(object):
"""Incoming event from an external system."""
def __init__(self):

View File

@ -44,7 +44,26 @@ import zuul.lib.repl
from zuul import nodepool
from zuul.executor.client import ExecutorClient
from zuul.merger.client import MergeClient
from zuul.model import Build, HoldRequest, Tenant, TriggerEvent
from zuul.model import (
Build,
BuildCompletedEvent,
BuildPausedEvent,
BuildStartedEvent,
DequeueEvent,
EnqueueEvent,
FilesChangesCompletedEvent,
HoldRequest,
ManagementEvent,
MergeCompletedEvent,
NodesProvisionedEvent,
PromoteEvent,
ReconfigureEvent,
ResultEvent,
SmartReconfigureEvent,
Tenant,
TenantReconfigureEvent,
TriggerEvent,
)
from zuul.zk import ZooKeeperClient
from zuul.zk.components import ZooKeeperComponentRegistry
from zuul.zk.nodepool import ZooKeeperNodepool
@ -52,222 +71,6 @@ from zuul.zk.nodepool import ZooKeeperNodepool
COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl']
class ManagementEvent(object):
"""An event that should be processed within the main queue run loop"""
def __init__(self):
self._wait_event = threading.Event()
self._exc_info = None
self.zuul_event_id = None
def exception(self, exc_info):
self._exc_info = exc_info
self._wait_event.set()
def done(self):
self._wait_event.set()
def wait(self, timeout=None):
self._wait_event.wait(timeout)
if self._exc_info:
# sys.exc_info returns (type, value, traceback)
type_, exception_instance, traceback = self._exc_info
raise exception_instance.with_traceback(traceback)
return self._wait_event.is_set()
class ReconfigureEvent(ManagementEvent):
"""Reconfigure the scheduler. The layout will be (re-)loaded from
the path specified in the configuration.
:arg ConfigParser config: the new configuration
"""
def __init__(self, config, validate_tenants=None):
super(ReconfigureEvent, self).__init__()
self.config = config
self.validate_tenants = validate_tenants
class SmartReconfigureEvent(ManagementEvent):
"""Reconfigure the scheduler. The layout will be (re-)loaded from
the path specified in the configuration.
:arg ConfigParser config: the new configuration
"""
def __init__(self, config, smart=False):
super().__init__()
self.config = config
class TenantReconfigureEvent(ManagementEvent):
"""Reconfigure the given tenant. The layout will be (re-)loaded from
the path specified in the configuration.
:arg Tenant tenant: the tenant to reconfigure
:arg Project project: if supplied, clear the cached configuration
from this project first
:arg Branch branch: if supplied along with project, only remove the
configuration of the specific branch from the cache
"""
def __init__(self, tenant, project, branch):
super(TenantReconfigureEvent, self).__init__()
self.tenant_name = tenant.name
self.project_branches = set([(project, branch)])
def __ne__(self, other):
return not self.__eq__(other)
def __eq__(self, other):
if not isinstance(other, TenantReconfigureEvent):
return False
# We don't check projects because they will get combined when
# merged.
return (self.tenant_name == other.tenant_name)
def merge(self, other):
if self.tenant_name != other.tenant_name:
raise Exception("Can not merge events from different tenants")
self.project_branches |= other.project_branches
class PromoteEvent(ManagementEvent):
"""Promote one or more changes to the head of the queue.
:arg str tenant_name: the name of the tenant
:arg str pipeline_name: the name of the pipeline
:arg list change_ids: a list of strings of change ids in the form
1234,1
"""
def __init__(self, tenant_name, pipeline_name, change_ids):
super(PromoteEvent, self).__init__()
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.change_ids = change_ids
class DequeueEvent(ManagementEvent):
"""Dequeue a change from a pipeline
:arg str tenant_name: the name of the tenant
:arg str pipeline_name: the name of the pipeline
:arg str project_name: the name of the project
:arg str change: optional, the change to dequeue
:arg str ref: optional, the ref to look for
"""
def __init__(self, tenant_name, pipeline_name, project_name, change, ref):
super(DequeueEvent, self).__init__()
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.project_name = project_name
self.change = change
if change is not None:
self.change_number, self.patch_number = change.split(',')
else:
self.change_number, self.patch_number = (None, None)
self.ref = ref
# set to mock values
self.oldrev = '0000000000000000000000000000000000000000'
self.newrev = '0000000000000000000000000000000000000000'
class EnqueueEvent(ManagementEvent):
"""Enqueue a change into a pipeline
:arg TriggerEvent trigger_event: a TriggerEvent describing the
trigger, pipeline, and change to enqueue
"""
def __init__(self, trigger_event):
super(EnqueueEvent, self).__init__()
self.trigger_event = trigger_event
class ResultEvent(object):
"""An event that needs to modify the pipeline state due to a
result from an external system."""
pass
class BuildStartedEvent(ResultEvent):
"""A build has started.
:arg Build build: The build which has started.
"""
def __init__(self, build):
self.build = build
class BuildPausedEvent(ResultEvent):
"""A build has been paused.
:arg Build build: The build which has been paused.
"""
def __init__(self, build):
self.build = build
class BuildCompletedEvent(ResultEvent):
"""A build has completed
:arg Build build: The build which has completed.
"""
def __init__(self, build, result):
self.build = build
self.result = result
class MergeCompletedEvent(ResultEvent):
"""A remote merge operation has completed
:arg BuildSet build_set: The build_set which is ready.
:arg bool merged: Whether the merge succeeded (changes with refs).
:arg bool updated: Whether the repo was updated (changes without refs).
:arg str commit: The SHA of the merged commit (changes with refs).
:arg dict repo_state: The starting repo state before the merge.
:arg list item_in_branches: A list of branches in which the final
commit in the merge list appears (changes without refs).
"""
def __init__(self, build_set, merged, updated, commit,
files, repo_state, item_in_branches):
self.build_set = build_set
self.merged = merged
self.updated = updated
self.commit = commit
self.files = files
self.repo_state = repo_state
self.item_in_branches = item_in_branches
class FilesChangesCompletedEvent(ResultEvent):
"""A remote fileschanges operation has completed
:arg BuildSet build_set: The build_set which is ready.
:arg list files: List of files changed.
"""
def __init__(self, build_set, files):
self.build_set = build_set
self.files = files
class NodesProvisionedEvent(ResultEvent):
"""Nodes have been provisioned for a build_set
:arg BuildSet build_set: The build_set which has nodes.
:arg list of Node objects nodes: The provisioned nodes
"""
def __init__(self, request):
self.request = request
self.request_id = request.id
class Scheduler(threading.Thread):
"""The engine of Zuul.