diff --git a/tests/base.py b/tests/base.py index 5edd75b813..adf08468a8 100644 --- a/tests/base.py +++ b/tests/base.py @@ -86,7 +86,6 @@ from zuul.driver.github.githubconnection import GithubClientManager from zuul.driver.elasticsearch import ElasticsearchDriver from zuul.lib.collections import DefaultKeyDict from zuul.lib.connections import ConnectionRegistry -from zuul.lib.queue import NamedQueue from zuul.zk import ZooKeeperClient from zuul.zk.event_queues import ConnectionEventQueue from psutil import Popen @@ -236,7 +235,6 @@ class GerritDriverMock(GerritDriver): if connection.web_server: self.add_cleanup(connection.web_server.stop) - self.additional_event_queues.append(connection.event_queue) setattr(self.registry, 'fake_' + name, connection) return connection @@ -1159,7 +1157,6 @@ class FakeGerritConnection(gerritconnection.GerritConnection): super(FakeGerritConnection, self).__init__(driver, connection_name, connection_config) - self.event_queue = NamedQueue('FakeGerritConnectionEventQueue') self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit') self.change_number = 0 self.changes = changes_db diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index acda8985cb..30b030b395 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -41,8 +41,8 @@ from zuul.driver.gerrit.gcloudauth import GCloudAuth from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent from zuul.driver.git.gitwatcher import GitWatcher from zuul.lib.logutil import get_annotated_logger -from zuul.lib.queue import NamedQueue from zuul.model import Ref, Tag, Branch, Project +from zuul.zk.event_queues import ConnectionEventQueue # HTTP timeout in seconds TIMEOUT = 30 @@ -131,16 +131,44 @@ class GerritEventConnector(threading.Thread): super(GerritEventConnector, self).__init__() self.daemon = True self.connection = connection + self.event_queue = connection.event_queue self._stopped = False + self._connector_wake_event = threading.Event() def stop(self): self._stopped = True - self.connection.addEvent(None) + self._connector_wake_event.set() + self.event_queue.election.cancel() + + def _onNewEvent(self): + self._connector_wake_event.set() + # Stop the data watch in case the connector was stopped + return not self._stopped + + def run(self): + self.event_queue.registerEventWatch(self._onNewEvent) + while not self._stopped: + try: + self.event_queue.election.run(self._run) + except Exception: + self.log.exception("Exception moving Gerrit event:") + + def _run(self): + while not self._stopped: + for event in self.event_queue: + try: + self._handleEvent(event) + finally: + self.event_queue.ack(event) + if self._stopped: + return + self._connector_wake_event.wait(10) + self._connector_wake_event.clear() + + def _handleEvent(self, connection_event): + timestamp = connection_event["timestamp"] + data = connection_event["payload"] - def _handleEvent(self): - ts, data = self.connection.getEvent() - if self._stopped: - return # Gerrit can produce inconsistent data immediately after an # event, So ensure that we do not deliver the event to Zuul # until at least a certain amount of time has passed. Note @@ -148,9 +176,9 @@ class GerritEventConnector(threading.Thread): # only need to delay for the first event. In essence, Zuul # should always be a constant number of seconds behind Gerrit. now = time.time() - time.sleep(max((ts + self.delay) - now, 0.0)) + time.sleep(max((timestamp + self.delay) - now, 0.0)) event = GerritTriggerEvent() - event.timestamp = ts + event.timestamp = timestamp # Gerrit events don't have an event id that could be used to globally # identify this event in the system so we have to generate one. @@ -294,17 +322,6 @@ class GerritEventConnector(threading.Thread): event.patch_number, refresh=True, event=event) - def run(self): - while True: - if self._stopped: - return - try: - self._handleEvent() - except Exception: - self.log.exception("Exception moving Gerrit event:") - finally: - self.connection.eventDone() - class GerritWatcher(threading.Thread): log = logging.getLogger("gerrit.GerritWatcher") @@ -527,7 +544,6 @@ class GerritConnection(BaseConnection): self.watcher_thread = None self.poller_thread = None self.ref_watcher_thread = None - self.event_queue = NamedQueue(f'GerritEventQueue<{connection_name}>') self.client = None self.watched_checkers = [] self.project_checker_map = {} @@ -1031,13 +1047,11 @@ class GerritConnection(BaseConnection): return heads def addEvent(self, data): - return self.event_queue.put((time.time(), data)) - - def getEvent(self): - return self.event_queue.get() - - def eventDone(self): - self.event_queue.task_done() + event = { + "timestamp": time.time(), + "payload": data + } + self.event_queue.put(event) def review(self, item, message, submit, labels, checks_api, file_comments, zuul_event_id=None): @@ -1468,6 +1482,10 @@ class GerritConnection(BaseConnection): except Exception: self.log.exception("Unable to determine remote Gerrit version") + self.log.info("Creating Zookeeper event queue") + self.event_queue = ConnectionEventQueue(self.sched.zk_client, + self.connection_name) + if self.enable_stream_events: self._start_watcher_thread() else: