From 56e474d7259e5b496f36930caaeb9104bb2f5bb0 Mon Sep 17 00:00:00 2001 From: Mikhail S Medvedev Date: Mon, 23 Nov 2015 15:26:35 -0600 Subject: [PATCH] Use zuul gerrit event listener implementation Current event listener implementation does not handle network problems well. E.g. if ssh stream connection is lost, it would not reconnect or recover on its own. Instead of fixing the implementation, use well-tested gerrit listener used by zuul. Explicitly specify version of zuul to be 2.1.0 to avoid accidental breakages due to changes in zuul lib. Downside is that we need to install zuul and its dependencies just to use gerrit listener. parse_json_event function had to be changed, because zuul gerrit event listener provides object, not json string. We still need to create event from json in populate_db.py, so that part of the function has been moved there. Closes-Bug: #1516820 Change-Id: I8aa7a18460b58998f6c378e9d9c0d783032eca21 --- ciwatch/events.py | 68 +++++++++------------------------------------ ciwatch/populate.py | 16 ++++++++--- requirements.txt | 1 + 3 files changed, 26 insertions(+), 59 deletions(-) diff --git a/ciwatch/events.py b/ciwatch/events.py index a748cd1..26c270f 100644 --- a/ciwatch/events.py +++ b/ciwatch/events.py @@ -14,14 +14,13 @@ from datetime import datetime import json -import paramiko import re -import time from ciwatch.config import Config from ciwatch import db from ciwatch.log import logger from ciwatch import models +from zuul.lib.gerrit import Gerrit def _process_project_name(project_name): @@ -70,49 +69,7 @@ def _store_event(event, datadir): return event -class GerritEventStream(object): - def __init__(self, cfg): - - logger.debug('Connecting to %(host)s:%(port)d as ' - '%(user)s using %(key)s', - {'user': cfg.AccountInfo.gerrit_username, - 'key': cfg.AccountInfo.gerrit_ssh_key, - 'host': cfg.AccountInfo.gerrit_host, - 'port': int(cfg.AccountInfo.gerrit_port)}) - - self.ssh = paramiko.SSHClient() - self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - connected = False - while not connected: - try: - self.ssh.connect(cfg.AccountInfo.gerrit_host, - int(cfg.AccountInfo.gerrit_port), - cfg.AccountInfo.gerrit_username, - key_filename=cfg.AccountInfo.gerrit_ssh_key) - connected = True - except paramiko.SSHException as e: - logger.error('%s', e) - logger.warn('Gerrit may be down, will pause and retry...') - time.sleep(10) - - self.stdin, self.stdout, self.stderr =\ - self.ssh.exec_command("gerrit stream-events") - - def __iter__(self): - return self - - def next(self): - return self.stdout.readline() - - -def parse_json_event(event, projects): - try: - event = json.loads(event) - except Exception as ex: - logger.error('Failed json.loads on event: %s', event) - logger.exception(ex) - return None +def parse_event(event, projects): if _is_valid(event, projects): _process_event(event) logger.info('Parsed valid event: %s', event) @@ -162,17 +119,18 @@ def add_event_to_db(event, commit_=True): def main(): config = Config() db.create_projects() # This will make sure the database has projects in it + gerrit = Gerrit( + hostname=config.cfg.AccountInfo.gerrit_host, + username=config.cfg.AccountInfo.gerrit_username, + port=int(config.cfg.AccountInfo.gerrit_port), + keyfile=config.cfg.AccountInfo.gerrit_ssh_key + ) + gerrit.startWatching() while True: - try: - events = GerritEventStream(config.cfg) - except paramiko.SSHException as ex: - logger.exception('Error connecting to Gerrit: %s', ex) - time.sleep(60) - for event in events: - event = parse_json_event(event, config.get_projects()) - if event is not None: - _store_event(event, config.DATA_DIR) - + event = gerrit.getEvent()[1] + parsed_event = parse_event(event, config.get_projects()) + if parsed_event is not None: + _store_event(parsed_event, config.DATA_DIR) if __name__ == '__main__': main() diff --git a/ciwatch/populate.py b/ciwatch/populate.py index 3bf5a90..6e8cd54 100644 --- a/ciwatch/populate.py +++ b/ciwatch/populate.py @@ -12,21 +12,29 @@ # License for the specific language governing permissions and limitations # under the License. +import json import os from ciwatch.config import Config from ciwatch import db from ciwatch.events import add_event_to_db -from ciwatch.events import parse_json_event +from ciwatch.events import parse_event +from ciwatch.log import logger def get_data(datafile, projects): data = [] with open(datafile) as file_: for line in file_: - event = parse_json_event(line, projects) - if event is not None: - data.append(event) + try: + event = json.loads(line) + except Exception as ex: + logger.error('Failed json.loads on event: %s', event) + logger.exception(ex) + continue + parsed_event = parse_event(event, projects) + if parsed_event is not None: + data.append(parsed_event) return data diff --git a/requirements.txt b/requirements.txt index a37220b..839b96b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ flask>=0.10 sqlalchemy>=1.0 iniparse>=0.4 paramiko>=1.15 +zuul==2.1.0