diff --git a/tests/base.py b/tests/base.py index 912c94a80a..5edd75b813 100644 --- a/tests/base.py +++ b/tests/base.py @@ -310,7 +310,6 @@ class GitlabDriverMock(GitlabDriver): self, name, config, self.rpcclient, changes_db=db, upstream_root=self.upstream_root) - self.additional_event_queues.append(connection.event_queue) setattr(self.registry, 'fake_' + name, connection) registerProjects(connection.source.name, connection.gl_client, self.config) @@ -1836,10 +1835,9 @@ class FakeGitlabConnection(gitlabconnection.GitlabConnection): % (self.zuul_web_port, self.connection_name), data=payload, headers=headers) else: - job = self.rpcclient.submitJob( - 'gitlab:%s:payload' % self.connection_name, - {'payload': payload}) - return json.loads(job.data[0]) + data = {'payload': payload} + self.event_queue.put(data) + return data def setZuulWebPort(self, port): self.zuul_web_port = port diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py index d8a654fbef..70692f890a 100644 --- a/zuul/driver/gitlab/gitlabconnection.py +++ b/zuul/driver/gitlab/gitlabconnection.py @@ -15,7 +15,6 @@ import logging import threading import json -import queue import cherrypy import voluptuous as v import time @@ -29,63 +28,10 @@ from typing import List, Optional from zuul.connection import CachedBranchConnection from zuul.web.handler import BaseWebController -from zuul.lib.gearworker import ZuulGearWorker from zuul.lib.logutil import get_annotated_logger from zuul.model import Branch, Project, Ref, Tag - from zuul.driver.gitlab.gitlabmodel import GitlabTriggerEvent, MergeRequest - - -class GitlabGearmanWorker(object): - """A thread that answers gearman requests""" - log = logging.getLogger("zuul.GitlabGearmanWorker") - - def __init__(self, connection): - self.config = connection.sched.config - self.connection = connection - handler = "gitlab:%s:payload" % self.connection.connection_name - self.jobs = { - handler: self.handle_payload, - } - self.gearworker = ZuulGearWorker( - 'Zuul Gitlab Worker', - 'zuul.GitlabGearmanWorker', - 'gitlab', - self.config, - self.jobs) - - def handle_payload(self, job): - args = json.loads(job.arguments) - payload = args["payload"] - - self.log.info( - "Gitlab Webhook Received event kind: %(object_kind)s" % payload) - - try: - self.__dispatch_event(payload) - output = {'return_code': 200} - except Exception: - output = {'return_code': 503} - self.log.exception("Exception handling Gitlab event:") - - job.sendWorkComplete(json.dumps(output)) - - def __dispatch_event(self, payload): - self.log.info(payload) - event = payload['object_kind'] - try: - self.log.info("Dispatching event %s" % event) - self.connection.addEvent(payload, event) - except Exception as err: - message = 'Exception dispatching event: %s' % str(err) - self.log.exception(message) - raise Exception(message) - - def start(self): - self.gearworker.start() - - def stop(self): - self.gearworker.stop() +from zuul.zk.event_queues import ConnectionEventQueue class GitlabEventConnector(threading.Thread): @@ -97,7 +43,9 @@ class GitlabEventConnector(threading.Thread): super(GitlabEventConnector, self).__init__() self.daemon = True self.connection = connection + self.event_queue = connection.event_queue self._stopped = False + self._process_event = threading.Event() self.event_handler_mapping = { 'merge_request': self._event_merge_request, 'note': self._event_note, @@ -107,7 +55,33 @@ class GitlabEventConnector(threading.Thread): def stop(self): self._stopped = True - self.connection.addEvent(None) + self._process_event.set() + self.event_queue.election.cancel() + + def _onNewEvent(self): + self._process_event.set() + # Stop the data watch in case the connector was stopped + return not self._stopped + + def run(self): + self.event_queue.registerEventWatch(self._onNewEvent) + while not self._stopped: + try: + self.event_queue.election.run(self._run) + except Exception: + self.log.exception("Exception handling Gitlab event:") + + def _run(self): + while not self._stopped: + for event in self.event_queue: + try: + self._handleEvent(event) + finally: + self.event_queue.ack(event) + if self._stopped: + return + self._process_event.wait(10) + self._process_event.clear() def _event_base(self, body): event = GitlabTriggerEvent() @@ -195,12 +169,16 @@ class GitlabEventConnector(threading.Thread): event.type = 'gl_push' return event - def _handleEvent(self): - ts, json_body, event_type = self.connection.getEvent() + def _handleEvent(self, connection_event): if self._stopped: return - self.log.info("Received event: %s" % str(event_type)) + timestamp = time.time() + json_body = connection_event["payload"] + self.log.info(json_body) + + event_type = json_body['object_kind'] + self.log.info("Received event: %s", event_type) if event_type not in self.event_handler_mapping: message = "Unhandled Gitlab event: %s" % event_type @@ -219,7 +197,7 @@ class GitlabEventConnector(threading.Thread): if event: event.zuul_event_id = str(uuid.uuid4()) - event.timestamp = ts + event.timestamp = timestamp event.project_hostname = self.connection.canonical_hostname if event.change_number: project = self.connection.source.getProject(event.project_name) @@ -241,17 +219,6 @@ class GitlabEventConnector(threading.Thread): self.connection.driver_name, event ) - def run(self): - while True: - if self._stopped: - return - try: - self._handleEvent() - except Exception: - self.log.exception("Exception moving Gitlab event:") - finally: - self.connection.eventDone() - class GitlabAPIClientException(Exception): pass @@ -416,7 +383,6 @@ class GitlabConnection(CachedBranchConnection): 'api_token', '') self.gl_client = GitlabAPIClient(self.baseurl, self.api_token) self.sched = None - self.event_queue = queue.Queue() self.source = driver.getSource(self) def _start_event_connector(self): @@ -429,27 +395,18 @@ class GitlabConnection(CachedBranchConnection): self.gitlab_event_connector.join() def onLoad(self): - self.log.info('Starting Gitlab connection: %s' % self.connection_name) - self.gearman_worker = GitlabGearmanWorker(self) + self.log.info('Starting Gitlab connection: %s', self.connection_name) + self.log.info('Creating Zookeeper event queue') + self.event_queue = ConnectionEventQueue( + self.sched.zk_client, self.connection_name + ) self.log.info('Starting event connector') self._start_event_connector() - self.log.info('Starting GearmanWorker') - self.gearman_worker.start() def onStop(self): - if hasattr(self, 'gearman_worker'): - self.gearman_worker.stop() + if hasattr(self, 'gitlab_event_connector'): self._stop_event_connector() - def addEvent(self, data, event=None): - return self.event_queue.put((time.time(), data, event)) - - def getEvent(self): - return self.event_queue.get() - - def eventDone(self): - self.event_queue.task_done() - def getWebController(self, zuul_web): return GitlabWebController(zuul_web, self) @@ -630,6 +587,10 @@ class GitlabWebController(BaseWebController): def __init__(self, zuul_web, connection): self.connection = connection self.zuul_web = zuul_web + self.event_queue = ConnectionEventQueue( + self.zuul_web.zk_client, + self.connection.connection_name + ) def _validate_token(self, headers): try: @@ -658,11 +619,9 @@ class GitlabWebController(BaseWebController): self._validate_token(headers) json_payload = json.loads(body.decode('utf-8')) - job = self.zuul_web.rpc.submitJob( - 'gitlab:%s:payload' % self.connection.connection_name, - {'payload': json_payload}) - - return json.loads(job.data[0]) + data = {'payload': json_payload} + self.event_queue.put(data) + return data def getSchema():