Optimize communication with Gerrit

Connection to Gerrit is established only once upon initialization
of the processor. The connection is used for a hard-coded number
of requests (20), once it is exceeded the connection is
re-established.

Closes bug 1381539

Change-Id: I5aa0129b56bd6c0217b338f771b4b68a7adf8db1
This commit is contained in:
Ilya Shakhat
2014-08-11 20:03:11 +04:00
parent 0d9f72e52f
commit 97e3af47c8
3 changed files with 66 additions and 60 deletions

View File

@@ -67,13 +67,14 @@ def _retrieve_project_list_from_gerrit(project_source):
LOG.info('Retrieving project list from Gerrit') LOG.info('Retrieving project list from Gerrit')
try: try:
uri = project_source.get('uri') or cfg.CONF.review_uri uri = project_source.get('uri') or cfg.CONF.review_uri
gerrit = rcs.Gerrit(None, uri) gerrit_inst = rcs.Gerrit(uri)
key_filename = (project_source.get('ssh_key_filename') or key_filename = (project_source.get('ssh_key_filename') or
cfg.CONF.ssh_key_filename) cfg.CONF.ssh_key_filename)
username = project_source.get('ssh_username') or cfg.CONF.ssh_username username = project_source.get('ssh_username') or cfg.CONF.ssh_username
gerrit.setup(key_filename=key_filename, username=username) gerrit_inst.setup(key_filename=key_filename, username=username)
project_list = gerrit.get_project_list() project_list = gerrit_inst.get_project_list()
gerrit_inst.close()
except Exception as e: except Exception as e:
LOG.exception(e) LOG.exception(e)
LOG.warn('Fail to retrieve list of projects. Keep it unmodified') LOG.warn('Fail to retrieve list of projects. Keep it unmodified')

View File

@@ -95,7 +95,7 @@ def _process_reviews(record_iterator, ci_map, module, branch):
def _process_repo(repo, runtime_storage_inst, record_processor_inst, def _process_repo(repo, runtime_storage_inst, record_processor_inst,
bug_modified_since): rcs_inst, bug_modified_since):
uri = repo['uri'] uri = repo['uri']
LOG.info('Processing repo uri: %s', uri) LOG.info('Processing repo uri: %s', uri)
@@ -118,10 +118,6 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
vcs_inst = vcs.get_vcs(repo, cfg.CONF.sources_root) vcs_inst = vcs.get_vcs(repo, cfg.CONF.sources_root)
vcs_inst.fetch() vcs_inst.fetch()
rcs_inst = rcs.get_rcs(repo, cfg.CONF.review_uri)
rcs_inst.setup(key_filename=cfg.CONF.ssh_key_filename,
username=cfg.CONF.ssh_username)
branches = set(['master']) branches = set(['master'])
for release in repo.get('releases'): for release in repo.get('releases'):
if 'branch' in release: if 'branch' in release:
@@ -148,7 +144,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
rcs_key = 'rcs:' + str(parse.quote_plus(uri) + ':' + branch) rcs_key = 'rcs:' + str(parse.quote_plus(uri) + ':' + branch)
last_id = runtime_storage_inst.get_by_key(rcs_key) last_id = runtime_storage_inst.get_by_key(rcs_key)
review_iterator = rcs_inst.log(branch, last_id, review_iterator = rcs_inst.log(repo, branch, last_id,
grab_comments=('ci' in repo)) grab_comments=('ci' in repo))
review_iterator_typed = _record_typer(review_iterator, 'review') review_iterator_typed = _record_typer(review_iterator, 'review')
@@ -161,7 +157,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
runtime_storage_inst.set_records(processed_review_iterator, runtime_storage_inst.set_records(processed_review_iterator,
utils.merge_records) utils.merge_records)
last_id = rcs_inst.get_last_id(branch) last_id = rcs_inst.get_last_id(repo, branch)
runtime_storage_inst.set_by_key(rcs_key, last_id) runtime_storage_inst.set_by_key(rcs_key, last_id)
@@ -206,9 +202,17 @@ def process(runtime_storage_inst, record_processor_inst):
current_date = utils.date_to_timestamp('now') current_date = utils.date_to_timestamp('now')
bug_modified_since = runtime_storage_inst.get_by_key('bug_modified_since') bug_modified_since = runtime_storage_inst.get_by_key('bug_modified_since')
rcs_inst = rcs.get_rcs(cfg.CONF.review_uri)
rcs_inst.setup(key_filename=cfg.CONF.ssh_key_filename,
username=cfg.CONF.ssh_username)
for repo in repos: for repo in repos:
_process_repo(repo, runtime_storage_inst, record_processor_inst, _process_repo(repo, runtime_storage_inst, record_processor_inst,
bug_modified_since) rcs_inst, bug_modified_since)
rcs_inst.close()
runtime_storage_inst.set_by_key('bug_modified_since', current_date) runtime_storage_inst.set_by_key('bug_modified_since', current_date)
LOG.info('Processing mail lists') LOG.info('Processing mail lists')
@@ -327,10 +331,6 @@ def main():
LOG.critical('Unable to load default data') LOG.critical('Unable to load default data')
return not 0 return not 0
gerrit = rcs.get_rcs(None, cfg.CONF.review_uri)
gerrit.setup(key_filename=cfg.CONF.ssh_key_filename,
username=cfg.CONF.ssh_username)
default_data_processor.process(runtime_storage_inst, default_data_processor.process(runtime_storage_inst,
default_data, default_data,
cfg.CONF.driverlog_data_uri) cfg.CONF.driverlog_data_uri)

View File

@@ -26,25 +26,32 @@ LOG = logging.getLogger(__name__)
DEFAULT_PORT = 29418 DEFAULT_PORT = 29418
GERRIT_URI_PREFIX = r'^gerrit:\/\/' GERRIT_URI_PREFIX = r'^gerrit:\/\/'
PAGE_LIMIT = 100 PAGE_LIMIT = 100
REQUEST_COUNT_LIMIT = 20
class Rcs(object): class Rcs(object):
def __init__(self, repo, uri): def __init__(self):
self.repo = repo
def setup(self, **kwargs):
pass pass
def log(self, branch, last_id): def setup(self, **kwargs):
return True
def get_project_list(self):
pass
def log(self, repo, branch, last_id):
return [] return []
def get_last_id(self, branch): def get_last_id(self, repo, branch):
return -1 return -1
def close(self):
pass
class Gerrit(Rcs): class Gerrit(Rcs):
def __init__(self, repo, uri): def __init__(self, uri):
super(Gerrit, self).__init__(repo, uri) super(Gerrit, self).__init__()
stripped = re.sub(GERRIT_URI_PREFIX, '', uri) stripped = re.sub(GERRIT_URI_PREFIX, '', uri)
if stripped: if stripped:
@@ -54,20 +61,20 @@ class Gerrit(Rcs):
else: else:
raise Exception('Invalid rcs uri %s' % uri) raise Exception('Invalid rcs uri %s' % uri)
self.key_filename = None
self.username = None
self.client = paramiko.SSHClient() self.client = paramiko.SSHClient()
self.client.load_system_host_keys() self.client.load_system_host_keys()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def setup(self, **kwargs): self.request_count = 0
if 'key_filename' in kwargs:
self.key_filename = kwargs['key_filename']
else:
self.key_filename = None
if 'username' in kwargs: def setup(self, **kwargs):
self.username = kwargs['username'] self.key_filename = kwargs.get('key_filename')
else: self.username = kwargs.get('username')
self.username = None
return self._connect()
def _connect(self): def _connect(self):
try: try:
@@ -99,12 +106,21 @@ class Gerrit(Rcs):
return cmd return cmd
def _exec_command(self, cmd): def _exec_command(self, cmd):
# check how many requests were sent over connection and reconnect
if self.request_count >= REQUEST_COUNT_LIMIT:
self.close()
self.request_count = 0
self._connect()
else:
self.request_count += 1
try: try:
return self.client.exec_command(cmd) return self.client.exec_command(cmd)
except Exception as e: except Exception as e:
LOG.error('Error %(error)s while execute command %(cmd)s', LOG.error('Error %(error)s while execute command %(cmd)s',
{'error': e, 'cmd': cmd}) {'error': e, 'cmd': cmd})
LOG.exception(e) LOG.exception(e)
self.request_count = REQUEST_COUNT_LIMIT
return False return False
def _poll_reviews(self, project_organization, module, branch, def _poll_reviews(self, project_organization, module, branch,
@@ -139,50 +155,38 @@ class Gerrit(Rcs):
break break
def get_project_list(self): def get_project_list(self):
if not self._connect():
return
exec_result = self._exec_command('gerrit ls-projects') exec_result = self._exec_command('gerrit ls-projects')
if not exec_result: if not exec_result:
raise Exception("Unable to retrieve list of projects from gerrit.") raise Exception("Unable to retrieve list of projects from gerrit.")
stdin, stdout, stderr = exec_result stdin, stdout, stderr = exec_result
result = [line.strip() for line in stdout] result = [line.strip() for line in stdout]
self.client.close()
return result return result
def log(self, branch, last_id, grab_comments=False): def log(self, repo, branch, last_id, grab_comments=False):
if not self._connect():
return
# poll new reviews from the top down to last_id # poll new reviews from the top down to last_id
LOG.debug('Poll new reviews for module: %s', self.repo['module']) LOG.debug('Poll new reviews for module: %s', repo['module'])
for review in self._poll_reviews(self.repo['organization'], for review in self._poll_reviews(repo['organization'],
self.repo['module'], branch, repo['module'], branch,
last_id=last_id, last_id=last_id,
grab_comments=grab_comments): grab_comments=grab_comments):
yield review yield review
# poll open reviews from last_id down to bottom # poll open reviews from last_id down to bottom
LOG.debug('Poll open reviews for module: %s', self.repo['module']) LOG.debug('Poll open reviews for module: %s', repo['module'])
start_id = None start_id = None
if last_id: if last_id:
start_id = last_id + 1 # include the last review into query start_id = last_id + 1 # include the last review into query
for review in self._poll_reviews(self.repo['organization'], for review in self._poll_reviews(repo['organization'],
self.repo['module'], branch, repo['module'], branch,
start_id=start_id, is_open=True, start_id=start_id, is_open=True,
grab_comments=grab_comments): grab_comments=grab_comments):
yield review yield review
self.client.close() def get_last_id(self, repo, branch):
LOG.debug('Get last id for module: %s', repo['module'])
def get_last_id(self, branch): cmd = self._get_cmd(repo['organization'], repo['module'],
if not self._connect():
return None
LOG.debug('Get last id for module: %s', self.repo['module'])
cmd = self._get_cmd(self.repo['organization'], self.repo['module'],
branch, limit=1) branch, limit=1)
LOG.debug('Executing command: %s', cmd) LOG.debug('Executing command: %s', cmd)
exec_result = self._exec_command(cmd) exec_result = self._exec_command(cmd)
@@ -197,18 +201,19 @@ class Gerrit(Rcs):
last_id = int(review['sortKey'], 16) last_id = int(review['sortKey'], 16)
break break
self.client.close()
LOG.debug('Module %(module)s last id is %(id)s', LOG.debug('Module %(module)s last id is %(id)s',
{'module': self.repo['module'], 'id': last_id}) {'module': repo['module'], 'id': last_id})
return last_id return last_id
def close(self):
self.client.close()
def get_rcs(repo, uri):
def get_rcs(uri):
LOG.debug('Review control system is requested for uri %s' % uri) LOG.debug('Review control system is requested for uri %s' % uri)
match = re.search(GERRIT_URI_PREFIX, uri) match = re.search(GERRIT_URI_PREFIX, uri)
if match: if match:
return Gerrit(repo, uri) return Gerrit(uri)
else: else:
LOG.warning('Unsupported review control system, fallback to dummy') LOG.warning('Unsupported review control system, fallback to dummy')
return Rcs(repo, uri) return Rcs()