Merge "Switch to Zookeeper backed trigger event queues"
This commit is contained in:
@@ -258,7 +258,9 @@ class GerritEventConnector(threading.Thread):
|
||||
|
||||
self._getChange(event)
|
||||
self.connection.logEvent(event)
|
||||
self.connection.sched.addEvent(event)
|
||||
self.connection.sched.addTriggerEvent(
|
||||
self.connection.driver_name, event
|
||||
)
|
||||
|
||||
def _getChange(self, event):
|
||||
# Grab the change if we are managing the project or if it exists in the
|
||||
|
||||
@@ -151,7 +151,7 @@ class GitConnection(BaseConnection):
|
||||
self.getChange(event)
|
||||
self.logEvent(event)
|
||||
# Pass the event to the scheduler
|
||||
self.sched.addEvent(event)
|
||||
self.sched.addTriggerEvent(self.driver_name, event)
|
||||
|
||||
def onLoad(self):
|
||||
self.log.debug("Starting Git Watcher")
|
||||
|
||||
@@ -695,13 +695,7 @@ class GithubEventProcessor(object):
|
||||
return event
|
||||
|
||||
def _get_sender(self, body):
|
||||
login = body.get('sender').get('login')
|
||||
if login:
|
||||
# TODO(tobiash): it might be better to plumb in the installation id
|
||||
project = body.get('repository', {}).get('full_name')
|
||||
user = self.connection.getUser(login, project)
|
||||
self.log.debug("Got user %s", user)
|
||||
return user
|
||||
return body.get('sender').get('login')
|
||||
|
||||
|
||||
class GithubEventConnector:
|
||||
@@ -757,9 +751,15 @@ class GithubEventConnector:
|
||||
if future is None:
|
||||
return
|
||||
event = future.result()
|
||||
if event:
|
||||
self.connection.logEvent(event)
|
||||
self.connection.sched.addEvent(event)
|
||||
if not event:
|
||||
return
|
||||
self.connection.logEvent(event)
|
||||
if isinstance(event, DequeueEvent):
|
||||
self.connection.sched.addManagementEvent(event)
|
||||
else:
|
||||
self.connection.sched.addTriggerEvent(
|
||||
self.connection.driver_name, event
|
||||
)
|
||||
except Exception:
|
||||
self.log.exception("Exception moving GitHub event:")
|
||||
finally:
|
||||
|
||||
@@ -146,7 +146,7 @@ class GitlabEventConnector(threading.Thread):
|
||||
label["title"] for
|
||||
label in body["changes"]["labels"]["current"]]
|
||||
new_labels = set(current_labels) - set(previous_labels)
|
||||
event.labels = new_labels
|
||||
event.labels = list(new_labels)
|
||||
elif attrs['action'] in ('approved', 'unapproved'):
|
||||
event.action = attrs['action']
|
||||
else:
|
||||
@@ -237,7 +237,9 @@ class GitlabEventConnector(threading.Thread):
|
||||
self.connection.checkBranchCache(event.project_name, event)
|
||||
|
||||
self.connection.logEvent(event)
|
||||
self.connection.sched.addEvent(event)
|
||||
self.connection.sched.addTriggerEvent(
|
||||
self.connection.driver_name, event
|
||||
)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
|
||||
@@ -88,7 +88,7 @@ class GitlabTriggerEvent(TriggerEvent):
|
||||
r = [super(GitlabTriggerEvent, self)._repr()]
|
||||
if self.action:
|
||||
r.append("action:%s" % self.action)
|
||||
r.append("project:%s" % self.canonical_project_name)
|
||||
r.append("project:%s" % self.project_name)
|
||||
if self.change_number:
|
||||
r.append("mr:%s" % self.change_number)
|
||||
if self.labels:
|
||||
|
||||
@@ -255,7 +255,9 @@ class PagureEventConnector(threading.Thread):
|
||||
event=event)
|
||||
event.project_hostname = self.connection.canonical_hostname
|
||||
self.connection.logEvent(event)
|
||||
self.connection.sched.addEvent(event)
|
||||
self.connection.sched.addTriggerEvent(
|
||||
self.connection.driver_name, event
|
||||
)
|
||||
|
||||
def _event_base(self, body, pull_data_field='pullrequest'):
|
||||
event = PagureTriggerEvent()
|
||||
|
||||
@@ -130,7 +130,7 @@ class TimerDriver(Driver, TriggerInterface):
|
||||
event.timestamp = time.time()
|
||||
log = get_annotated_logger(self.log, event)
|
||||
log.debug("Adding event")
|
||||
self.sched.addEvent(event)
|
||||
self.sched.addTriggerEvent(self.name, event)
|
||||
|
||||
def stop(self):
|
||||
if self.apsched:
|
||||
|
||||
@@ -96,7 +96,7 @@ class ZuulDriver(Driver, TriggerInterface):
|
||||
event.ref = change.ref
|
||||
event.zuul_event_id = str(uuid4().hex)
|
||||
event.timestamp = time.time()
|
||||
self.sched.addEvent(event)
|
||||
self.sched.addTriggerEvent(self.name, event)
|
||||
|
||||
def _createParentChangeEnqueuedEvents(self, change, pipeline, tenant,
|
||||
event):
|
||||
@@ -133,7 +133,7 @@ class ZuulDriver(Driver, TriggerInterface):
|
||||
event.patch_number = change.patchset
|
||||
event.ref = change.ref
|
||||
event.zuul_event_id = str(uuid4().hex)
|
||||
self.sched.addEvent(event)
|
||||
self.sched.addTriggerEvent(self.name, event)
|
||||
|
||||
def getTrigger(self, connection_name, config=None):
|
||||
return zuultrigger.ZuulTrigger(self, config)
|
||||
|
||||
Reference in New Issue
Block a user