From 97e3af47c86ffc43967903c601bc03dd245cb0c2 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Mon, 11 Aug 2014 20:03:11 +0400 Subject: [PATCH] Optimize communication with Gerrit Connection to Gerrit is established only once upon initialization of the processor. The connection is used for a hard-coded number of requests (20); once that limit is exceeded, the connection is re-established. Closes bug 1381539 Change-Id: I5aa0129b56bd6c0217b338f771b4b68a7adf8db1 --- .../processor/default_data_processor.py | 7 +- stackalytics/processor/main.py | 24 ++--- stackalytics/processor/rcs.py | 95 ++++++++++--------- 3 files changed, 66 insertions(+), 60 deletions(-) diff --git a/stackalytics/processor/default_data_processor.py b/stackalytics/processor/default_data_processor.py index 320ff3596..b4ae25554 100644 --- a/stackalytics/processor/default_data_processor.py +++ b/stackalytics/processor/default_data_processor.py @@ -67,13 +67,14 @@ def _retrieve_project_list_from_gerrit(project_source): LOG.info('Retrieving project list from Gerrit') try: uri = project_source.get('uri') or cfg.CONF.review_uri - gerrit = rcs.Gerrit(None, uri) + gerrit_inst = rcs.Gerrit(uri) key_filename = (project_source.get('ssh_key_filename') or cfg.CONF.ssh_key_filename) username = project_source.get('ssh_username') or cfg.CONF.ssh_username - gerrit.setup(key_filename=key_filename, username=username) + gerrit_inst.setup(key_filename=key_filename, username=username) - project_list = gerrit.get_project_list() + project_list = gerrit_inst.get_project_list() + gerrit_inst.close() except Exception as e: LOG.exception(e) LOG.warn('Fail to retrieve list of projects. 
Keep it unmodified') diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index ca7045ab2..40c3e65d7 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -95,7 +95,7 @@ def _process_reviews(record_iterator, ci_map, module, branch): def _process_repo(repo, runtime_storage_inst, record_processor_inst, - bug_modified_since): + rcs_inst, bug_modified_since): uri = repo['uri'] LOG.info('Processing repo uri: %s', uri) @@ -118,10 +118,6 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst, vcs_inst = vcs.get_vcs(repo, cfg.CONF.sources_root) vcs_inst.fetch() - rcs_inst = rcs.get_rcs(repo, cfg.CONF.review_uri) - rcs_inst.setup(key_filename=cfg.CONF.ssh_key_filename, - username=cfg.CONF.ssh_username) - branches = set(['master']) for release in repo.get('releases'): if 'branch' in release: @@ -148,7 +144,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst, rcs_key = 'rcs:' + str(parse.quote_plus(uri) + ':' + branch) last_id = runtime_storage_inst.get_by_key(rcs_key) - review_iterator = rcs_inst.log(branch, last_id, + review_iterator = rcs_inst.log(repo, branch, last_id, grab_comments=('ci' in repo)) review_iterator_typed = _record_typer(review_iterator, 'review') @@ -161,7 +157,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst, runtime_storage_inst.set_records(processed_review_iterator, utils.merge_records) - last_id = rcs_inst.get_last_id(branch) + last_id = rcs_inst.get_last_id(repo, branch) runtime_storage_inst.set_by_key(rcs_key, last_id) @@ -206,9 +202,17 @@ def process(runtime_storage_inst, record_processor_inst): current_date = utils.date_to_timestamp('now') bug_modified_since = runtime_storage_inst.get_by_key('bug_modified_since') + + rcs_inst = rcs.get_rcs(cfg.CONF.review_uri) + rcs_inst.setup(key_filename=cfg.CONF.ssh_key_filename, + username=cfg.CONF.ssh_username) + for repo in repos: _process_repo(repo, runtime_storage_inst, record_processor_inst, - 
bug_modified_since) + rcs_inst, bug_modified_since) + + rcs_inst.close() + runtime_storage_inst.set_by_key('bug_modified_since', current_date) LOG.info('Processing mail lists') @@ -327,10 +331,6 @@ def main(): LOG.critical('Unable to load default data') return not 0 - gerrit = rcs.get_rcs(None, cfg.CONF.review_uri) - gerrit.setup(key_filename=cfg.CONF.ssh_key_filename, - username=cfg.CONF.ssh_username) - default_data_processor.process(runtime_storage_inst, default_data, cfg.CONF.driverlog_data_uri) diff --git a/stackalytics/processor/rcs.py b/stackalytics/processor/rcs.py index c2409da53..83bf9c5cf 100644 --- a/stackalytics/processor/rcs.py +++ b/stackalytics/processor/rcs.py @@ -26,25 +26,32 @@ LOG = logging.getLogger(__name__) DEFAULT_PORT = 29418 GERRIT_URI_PREFIX = r'^gerrit:\/\/' PAGE_LIMIT = 100 +REQUEST_COUNT_LIMIT = 20 class Rcs(object): - def __init__(self, repo, uri): - self.repo = repo - - def setup(self, **kwargs): + def __init__(self): pass - def log(self, branch, last_id): + def setup(self, **kwargs): + return True + + def get_project_list(self): + pass + + def log(self, repo, branch, last_id): return [] - def get_last_id(self, branch): + def get_last_id(self, repo, branch): return -1 + def close(self): + pass + class Gerrit(Rcs): - def __init__(self, repo, uri): - super(Gerrit, self).__init__(repo, uri) + def __init__(self, uri): + super(Gerrit, self).__init__() stripped = re.sub(GERRIT_URI_PREFIX, '', uri) if stripped: @@ -54,20 +61,20 @@ class Gerrit(Rcs): else: raise Exception('Invalid rcs uri %s' % uri) + self.key_filename = None + self.username = None + self.client = paramiko.SSHClient() self.client.load_system_host_keys() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - def setup(self, **kwargs): - if 'key_filename' in kwargs: - self.key_filename = kwargs['key_filename'] - else: - self.key_filename = None + self.request_count = 0 - if 'username' in kwargs: - self.username = kwargs['username'] - else: - self.username = None + 
def setup(self, **kwargs): + self.key_filename = kwargs.get('key_filename') + self.username = kwargs.get('username') + + return self._connect() def _connect(self): try: @@ -99,12 +106,21 @@ class Gerrit(Rcs): return cmd def _exec_command(self, cmd): + # check how many requests were sent over connection and reconnect + if self.request_count >= REQUEST_COUNT_LIMIT: + self.close() + self.request_count = 0 + self._connect() + else: + self.request_count += 1 + try: return self.client.exec_command(cmd) except Exception as e: LOG.error('Error %(error)s while execute command %(cmd)s', {'error': e, 'cmd': cmd}) LOG.exception(e) + self.request_count = REQUEST_COUNT_LIMIT return False def _poll_reviews(self, project_organization, module, branch, @@ -139,50 +155,38 @@ class Gerrit(Rcs): break def get_project_list(self): - if not self._connect(): - return - exec_result = self._exec_command('gerrit ls-projects') if not exec_result: raise Exception("Unable to retrieve list of projects from gerrit.") stdin, stdout, stderr = exec_result result = [line.strip() for line in stdout] - self.client.close() return result - def log(self, branch, last_id, grab_comments=False): - if not self._connect(): - return - + def log(self, repo, branch, last_id, grab_comments=False): # poll new reviews from the top down to last_id - LOG.debug('Poll new reviews for module: %s', self.repo['module']) - for review in self._poll_reviews(self.repo['organization'], - self.repo['module'], branch, + LOG.debug('Poll new reviews for module: %s', repo['module']) + for review in self._poll_reviews(repo['organization'], + repo['module'], branch, last_id=last_id, grab_comments=grab_comments): yield review # poll open reviews from last_id down to bottom - LOG.debug('Poll open reviews for module: %s', self.repo['module']) + LOG.debug('Poll open reviews for module: %s', repo['module']) start_id = None if last_id: start_id = last_id + 1 # include the last review into query - for review in 
self._poll_reviews(self.repo['organization'], - self.repo['module'], branch, + for review in self._poll_reviews(repo['organization'], + repo['module'], branch, start_id=start_id, is_open=True, grab_comments=grab_comments): yield review - self.client.close() + def get_last_id(self, repo, branch): + LOG.debug('Get last id for module: %s', repo['module']) - def get_last_id(self, branch): - if not self._connect(): - return None - - LOG.debug('Get last id for module: %s', self.repo['module']) - - cmd = self._get_cmd(self.repo['organization'], self.repo['module'], + cmd = self._get_cmd(repo['organization'], repo['module'], branch, limit=1) LOG.debug('Executing command: %s', cmd) exec_result = self._exec_command(cmd) @@ -197,18 +201,19 @@ class Gerrit(Rcs): last_id = int(review['sortKey'], 16) break - self.client.close() - LOG.debug('Module %(module)s last id is %(id)s', - {'module': self.repo['module'], 'id': last_id}) + {'module': repo['module'], 'id': last_id}) return last_id + def close(self): + self.client.close() -def get_rcs(repo, uri): + +def get_rcs(uri): LOG.debug('Review control system is requested for uri %s' % uri) match = re.search(GERRIT_URI_PREFIX, uri) if match: - return Gerrit(repo, uri) + return Gerrit(uri) else: LOG.warning('Unsupported review control system, fallback to dummy') - return Rcs(repo, uri) + return Rcs()