Allow (de-)serialization of management events
Since management events will be stored in Zookeeper we need a way to (de-)serialze them from/to dictionaries. In the current implementation not all of the event dictionariesa are JSON serializable. The necessary changes will be made in later patches. Change-Id: Id27e0d0ebeeedacf72cab8beb80e3ef3d7de80c5
This commit is contained in:
130
zuul/model.py
130
zuul/model.py
@@ -3467,7 +3467,25 @@ class Change(Branch):
|
||||
return d
|
||||
|
||||
|
||||
class ManagementEvent:
|
||||
class AbstractEvent(abc.ABC):
|
||||
"""Base class defining the interface for all events."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def toDict(self):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def updateFromDict(self, d) -> None:
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, data):
|
||||
event = cls()
|
||||
event.updateFromDict(data)
|
||||
return event
|
||||
|
||||
|
||||
class ManagementEvent(AbstractEvent):
|
||||
"""An event that should be processed within the main queue run loop"""
|
||||
def __init__(self):
|
||||
self._wait_event = threading.Event()
|
||||
@@ -3489,6 +3507,14 @@ class ManagementEvent:
|
||||
raise exception_instance.with_traceback(traceback)
|
||||
return self._wait_event.is_set()
|
||||
|
||||
def toDict(self):
|
||||
return {
|
||||
"zuul_event_id": self.zuul_event_id,
|
||||
}
|
||||
|
||||
def updateFromDict(self, d):
|
||||
self.zuul_event_id = d.get("zuul_event_id")
|
||||
|
||||
|
||||
class ReconfigureEvent(ManagementEvent):
|
||||
"""Reconfigure the scheduler. The layout will be (re-)loaded from
|
||||
@@ -3501,6 +3527,18 @@ class ReconfigureEvent(ManagementEvent):
|
||||
self.config = config
|
||||
self.validate_tenants = validate_tenants
|
||||
|
||||
def toDict(self):
|
||||
d = super().toDict()
|
||||
# Note: config is not JSON serializable and will be removed
|
||||
# before this is serialized into ZK.
|
||||
d["config"] = self.config
|
||||
d["validate_tenants"] = self.validate_tenants
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, data):
|
||||
return cls(data.get("config"), data.get("validate_tenants"))
|
||||
|
||||
|
||||
class SmartReconfigureEvent(ManagementEvent):
|
||||
"""Reconfigure the scheduler. The layout will be (re-)loaded from
|
||||
@@ -3512,20 +3550,31 @@ class SmartReconfigureEvent(ManagementEvent):
|
||||
super().__init__()
|
||||
self.config = config
|
||||
|
||||
def toDict(self):
|
||||
d = super().toDict()
|
||||
# Note: config is not JSON serializable and will be removed
|
||||
# before this is serialized into ZK.
|
||||
d["config"] = self.config
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, data):
|
||||
return cls(data.get("config"))
|
||||
|
||||
|
||||
class TenantReconfigureEvent(ManagementEvent):
|
||||
"""Reconfigure the given tenant. The layout will be (re-)loaded from
|
||||
the path specified in the configuration.
|
||||
|
||||
:arg Tenant tenant: the tenant to reconfigure
|
||||
:arg str tenant_name: the tenant to reconfigure
|
||||
:arg Project project: if supplied, clear the cached configuration
|
||||
from this project first
|
||||
:arg Branch branch: if supplied along with project, only remove the
|
||||
configuration of the specific branch from the cache
|
||||
"""
|
||||
def __init__(self, tenant, project, branch):
|
||||
def __init__(self, tenant_name, project, branch):
|
||||
super(TenantReconfigureEvent, self).__init__()
|
||||
self.tenant_name = tenant.name
|
||||
self.tenant_name = tenant_name
|
||||
self.project_branches = set([(project, branch)])
|
||||
|
||||
def __ne__(self, other):
|
||||
@@ -3543,6 +3592,25 @@ class TenantReconfigureEvent(ManagementEvent):
|
||||
raise Exception("Can not merge events from different tenants")
|
||||
self.project_branches |= other.project_branches
|
||||
|
||||
def toDict(self):
|
||||
d = super().toDict()
|
||||
d["tenant_name"] = self.tenant_name
|
||||
d["project_branches"] = list(self.project_branches)
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, data):
|
||||
project, branch = next(iter(data["project_branches"]))
|
||||
event = cls(data.get("tenant_name"), project, branch)
|
||||
# In case the dictionary was deserialized from JSON we get
|
||||
# [[project, branch]] instead of [(project, branch]).
|
||||
# Because of that we need to make sure we have a hashable
|
||||
# project-branch tuple.
|
||||
event.project_branches = set(
|
||||
tuple(pb) for pb in data["project_branches"]
|
||||
)
|
||||
return event
|
||||
|
||||
|
||||
class PromoteEvent(ManagementEvent):
|
||||
"""Promote one or more changes to the head of the queue.
|
||||
@@ -3559,6 +3627,21 @@ class PromoteEvent(ManagementEvent):
|
||||
self.pipeline_name = pipeline_name
|
||||
self.change_ids = change_ids
|
||||
|
||||
def toDict(self):
|
||||
d = super().toDict()
|
||||
d["tenant_name"] = self.tenant_name
|
||||
d["pipeline_name"] = self.pipeline_name
|
||||
d["change_ids"] = list(self.change_ids)
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, data):
|
||||
return cls(
|
||||
data.get("tenant_name"),
|
||||
data.get("pipeline_name"),
|
||||
list(data.get("change_ids", [])),
|
||||
)
|
||||
|
||||
|
||||
class DequeueEvent(ManagementEvent):
|
||||
"""Dequeue a change from a pipeline
|
||||
@@ -3585,6 +3668,34 @@ class DequeueEvent(ManagementEvent):
|
||||
self.oldrev = '0000000000000000000000000000000000000000'
|
||||
self.newrev = '0000000000000000000000000000000000000000'
|
||||
|
||||
def toDict(self):
|
||||
d = super().toDict()
|
||||
d["tenant_name"] = self.tenant_name
|
||||
d["pipeline_name"] = self.pipeline_name
|
||||
d["project_name"] = self.project_name
|
||||
d["change"] = self.change
|
||||
d["ref"] = self.ref
|
||||
d["oldrev"] = self.oldrev
|
||||
d["newrev"] = self.newrev
|
||||
return d
|
||||
|
||||
def updateFromDict(self, d):
|
||||
super().updateFromDict(d)
|
||||
self.oldrev = d.get("oldrev")
|
||||
self.newrev = d.get("newrev")
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, data):
|
||||
event = cls(
|
||||
data.get("tenant_name"),
|
||||
data.get("pipeline_name"),
|
||||
data.get("project_name"),
|
||||
data.get("change"),
|
||||
data.get("ref"),
|
||||
)
|
||||
event.updateFromDict(data)
|
||||
return event
|
||||
|
||||
|
||||
class EnqueueEvent(ManagementEvent):
|
||||
"""Enqueue a change into a pipeline
|
||||
@@ -3597,6 +3708,17 @@ class EnqueueEvent(ManagementEvent):
|
||||
super(EnqueueEvent, self).__init__()
|
||||
self.trigger_event = trigger_event
|
||||
|
||||
def toDict(self):
|
||||
d = super().toDict()
|
||||
d["trigger_event"] = self.trigger_event
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, data):
|
||||
return cls(
|
||||
data.get("trigger_event"),
|
||||
)
|
||||
|
||||
|
||||
class ResultEvent:
|
||||
"""An event that needs to modify the pipeline state due to a
|
||||
|
||||
@@ -417,7 +417,7 @@ class Scheduler(threading.Thread):
|
||||
"%s due to event %s in project %s",
|
||||
tenant.name, event, project)
|
||||
branch = event.branch if event is not None else None
|
||||
event = TenantReconfigureEvent(tenant, project, branch)
|
||||
event = TenantReconfigureEvent(tenant.name, project, branch)
|
||||
self.management_event_queue.put(event)
|
||||
self.wake_event.set()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user