Merge "Make review processing compatible with Gerrit 2.9+"
This commit is contained in:
@@ -17,6 +17,7 @@ from oslo_config import cfg
|
|||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
import psutil
|
import psutil
|
||||||
import six
|
import six
|
||||||
|
import time
|
||||||
|
|
||||||
from stackalytics.processor import bps
|
from stackalytics.processor import bps
|
||||||
from stackalytics.processor import config
|
from stackalytics.processor import config
|
||||||
@@ -93,6 +94,7 @@ def _process_reviews(record_iterator, ci_map, module, branch):
|
|||||||
def _process_repo(repo, runtime_storage_inst, record_processor_inst,
|
def _process_repo(repo, runtime_storage_inst, record_processor_inst,
|
||||||
rcs_inst):
|
rcs_inst):
|
||||||
uri = repo['uri']
|
uri = repo['uri']
|
||||||
|
quoted_uri = six.moves.urllib.parse.quote_plus(uri)
|
||||||
LOG.info('Processing repo uri: %s', uri)
|
LOG.info('Processing repo uri: %s', uri)
|
||||||
|
|
||||||
LOG.debug('Processing blueprints for repo uri: %s', uri)
|
LOG.debug('Processing blueprints for repo uri: %s', uri)
|
||||||
@@ -129,8 +131,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
|
|||||||
for branch in branches:
|
for branch in branches:
|
||||||
LOG.debug('Processing commits in repo: %s, branch: %s', uri, branch)
|
LOG.debug('Processing commits in repo: %s, branch: %s', uri, branch)
|
||||||
|
|
||||||
vcs_key = 'vcs:' + str(six.moves.urllib.parse.quote_plus(uri) +
|
vcs_key = 'vcs:%s:%s' % (quoted_uri, branch)
|
||||||
':' + branch)
|
|
||||||
last_id = runtime_storage_inst.get_by_key(vcs_key)
|
last_id = runtime_storage_inst.get_by_key(vcs_key)
|
||||||
|
|
||||||
commit_iterator = vcs_inst.log(branch, last_id)
|
commit_iterator = vcs_inst.log(branch, last_id)
|
||||||
@@ -145,11 +146,11 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
|
|||||||
|
|
||||||
LOG.debug('Processing reviews for repo: %s, branch: %s', uri, branch)
|
LOG.debug('Processing reviews for repo: %s, branch: %s', uri, branch)
|
||||||
|
|
||||||
rcs_key = 'rcs:' + str(six.moves.urllib.parse.quote_plus(uri) +
|
rcs_key = 'rcs:%s:%s' % (quoted_uri, branch)
|
||||||
':' + branch)
|
last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key)
|
||||||
last_id = runtime_storage_inst.get_by_key(rcs_key)
|
current_retrieval_time = int(time.time())
|
||||||
|
|
||||||
review_iterator = rcs_inst.log(repo, branch, last_id,
|
review_iterator = rcs_inst.log(repo, branch, last_retrieval_time,
|
||||||
grab_comments=('ci' in repo))
|
grab_comments=('ci' in repo))
|
||||||
review_iterator_typed = _record_typer(review_iterator, 'review')
|
review_iterator_typed = _record_typer(review_iterator, 'review')
|
||||||
|
|
||||||
@@ -162,8 +163,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
|
|||||||
runtime_storage_inst.set_records(processed_review_iterator,
|
runtime_storage_inst.set_records(processed_review_iterator,
|
||||||
utils.merge_records)
|
utils.merge_records)
|
||||||
|
|
||||||
last_id = rcs_inst.get_last_id(repo, branch)
|
runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time)
|
||||||
runtime_storage_inst.set_by_key(rcs_key, last_id)
|
|
||||||
|
|
||||||
|
|
||||||
def _process_mail_list(uri, runtime_storage_inst, record_processor_inst):
|
def _process_mail_list(uri, runtime_storage_inst, record_processor_inst):
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ import re
|
|||||||
|
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
import paramiko
|
import paramiko
|
||||||
|
import time
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -38,12 +38,10 @@ class Rcs(object):
|
|||||||
def get_project_list(self):
|
def get_project_list(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def log(self, repo, branch, last_id):
|
def log(self, repo, branch, last_retrieval_time, status=None,
|
||||||
|
grab_comments=False):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def get_last_id(self, repo, branch):
|
|
||||||
return -1
|
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -89,17 +87,15 @@ class Gerrit(Rcs):
|
|||||||
LOG.exception(e)
|
LOG.exception(e)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _get_cmd(self, project_organization, module, branch, sort_key=None,
|
def _get_cmd(self, project_organization, module, branch, age=0,
|
||||||
is_open=False, limit=PAGE_LIMIT, grab_comments=False):
|
status=None, limit=PAGE_LIMIT, grab_comments=False):
|
||||||
cmd = ('gerrit query --all-approvals --patch-sets --format JSON '
|
cmd = ('gerrit query --all-approvals --patch-sets --format JSON '
|
||||||
'project:\'%(ogn)s/%(module)s\' branch:%(branch)s '
|
'project:\'%(ogn)s/%(module)s\' branch:%(branch)s '
|
||||||
'limit:%(limit)s' %
|
'limit:%(limit)s age:%(age)ss' %
|
||||||
{'ogn': project_organization, 'module': module,
|
{'ogn': project_organization, 'module': module,
|
||||||
'branch': branch, 'limit': limit})
|
'branch': branch, 'limit': limit, 'age': age})
|
||||||
if is_open:
|
if status:
|
||||||
cmd += ' is:open'
|
cmd += ' status:%s' % status
|
||||||
if sort_key:
|
|
||||||
cmd += ' resume_sortkey:%016x' % sort_key
|
|
||||||
if grab_comments:
|
if grab_comments:
|
||||||
cmd += ' --comments'
|
cmd += ' --comments'
|
||||||
return cmd
|
return cmd
|
||||||
@@ -123,37 +119,47 @@ class Gerrit(Rcs):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def _poll_reviews(self, project_organization, module, branch,
|
def _poll_reviews(self, project_organization, module, branch,
|
||||||
start_id=0, last_id=0, is_open=False,
|
last_retrieval_time, status=None, grab_comments=False):
|
||||||
grab_comments=False):
|
age = 0
|
||||||
sort_key = start_id
|
proceed = True
|
||||||
last_id = last_id or 0
|
|
||||||
|
|
||||||
while True:
|
# the algorithm retrieves reviews by age; the next page is started
|
||||||
cmd = self._get_cmd(project_organization, module, branch, sort_key,
|
# with the time of the oldest; it is possible that the oldest
|
||||||
is_open, grab_comments=grab_comments)
|
# will be included in consequent result (as the age offsets to local
|
||||||
|
# machine timestamp, but evaluated remotely), so we need to track all
|
||||||
|
# ids and ignore those we've already seen
|
||||||
|
processed = set()
|
||||||
|
|
||||||
|
while proceed:
|
||||||
|
cmd = self._get_cmd(project_organization, module, branch,
|
||||||
|
age=age, status=status,
|
||||||
|
grab_comments=grab_comments)
|
||||||
LOG.debug('Executing command: %s', cmd)
|
LOG.debug('Executing command: %s', cmd)
|
||||||
exec_result = self._exec_command(cmd)
|
exec_result = self._exec_command(cmd)
|
||||||
if not exec_result:
|
if not exec_result:
|
||||||
break
|
break
|
||||||
stdin, stdout, stderr = exec_result
|
stdin, stdout, stderr = exec_result
|
||||||
|
|
||||||
proceed = False
|
proceed = False # assume there are no more reviews available
|
||||||
for line in stdout:
|
for line in stdout:
|
||||||
review = json.loads(line)
|
review = json.loads(line)
|
||||||
|
|
||||||
if 'sortKey' in review:
|
if 'number' in review: # choose reviews not summary
|
||||||
sort_key = int(review['sortKey'], 16)
|
|
||||||
if sort_key <= last_id:
|
if review['number'] in processed:
|
||||||
|
continue # already seen that
|
||||||
|
|
||||||
|
last_updated = int(review['lastUpdated'])
|
||||||
|
if last_updated < last_retrieval_time: # too old
|
||||||
proceed = False
|
proceed = False
|
||||||
break
|
break
|
||||||
|
|
||||||
proceed = True
|
proceed = True # have at least one review, can dig deeper
|
||||||
|
age = max(age, int(time.time()) - last_updated)
|
||||||
|
processed.add(review['number'])
|
||||||
review['module'] = module
|
review['module'] = module
|
||||||
yield review
|
yield review
|
||||||
|
|
||||||
if not proceed:
|
|
||||||
break
|
|
||||||
|
|
||||||
def get_project_list(self):
|
def get_project_list(self):
|
||||||
exec_result = self._exec_command('gerrit ls-projects')
|
exec_result = self._exec_command('gerrit ls-projects')
|
||||||
if not exec_result:
|
if not exec_result:
|
||||||
@@ -163,48 +169,16 @@ class Gerrit(Rcs):
|
|||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def log(self, repo, branch, last_id, grab_comments=False):
|
def log(self, repo, branch, last_retrieval_time, status=None,
|
||||||
# poll new reviews from the top down to last_id
|
grab_comments=False):
|
||||||
LOG.debug('Poll new reviews for module: %s', repo['module'])
|
# poll reviews down from top between last_r_t and current_r_t
|
||||||
for review in self._poll_reviews(repo['organization'],
|
LOG.debug('Poll reviews for module: %s', repo['module'])
|
||||||
repo['module'], branch,
|
for review in self._poll_reviews(
|
||||||
last_id=last_id,
|
repo['organization'], repo['module'], branch,
|
||||||
|
last_retrieval_time, status=status,
|
||||||
grab_comments=grab_comments):
|
grab_comments=grab_comments):
|
||||||
yield review
|
yield review
|
||||||
|
|
||||||
# poll open reviews from last_id down to bottom
|
|
||||||
LOG.debug('Poll open reviews for module: %s', repo['module'])
|
|
||||||
start_id = None
|
|
||||||
if last_id:
|
|
||||||
start_id = last_id + 1 # include the last review into query
|
|
||||||
for review in self._poll_reviews(repo['organization'],
|
|
||||||
repo['module'], branch,
|
|
||||||
start_id=start_id, is_open=True,
|
|
||||||
grab_comments=grab_comments):
|
|
||||||
yield review
|
|
||||||
|
|
||||||
def get_last_id(self, repo, branch):
|
|
||||||
LOG.debug('Get last id for module: %s', repo['module'])
|
|
||||||
|
|
||||||
cmd = self._get_cmd(repo['organization'], repo['module'],
|
|
||||||
branch, limit=1)
|
|
||||||
LOG.debug('Executing command: %s', cmd)
|
|
||||||
exec_result = self._exec_command(cmd)
|
|
||||||
if not exec_result:
|
|
||||||
return None
|
|
||||||
stdin, stdout, stderr = exec_result
|
|
||||||
|
|
||||||
last_id = None
|
|
||||||
for line in stdout:
|
|
||||||
review = json.loads(line)
|
|
||||||
if 'sortKey' in review:
|
|
||||||
last_id = int(review['sortKey'], 16)
|
|
||||||
break
|
|
||||||
|
|
||||||
LOG.debug('Module %(module)s last id is %(id)s',
|
|
||||||
{'module': repo['module'], 'id': last_id})
|
|
||||||
return last_id
|
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.client.close()
|
self.client.close()
|
||||||
|
|
||||||
|
|||||||
127
stackalytics/tests/unit/test_rcs.py
Normal file
127
stackalytics/tests/unit/test_rcs.py
Normal file
@@ -0,0 +1,127 @@
|
|||||||
|
# Copyright (c) 2015 Mirantis Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
import mock
|
||||||
|
import testtools
|
||||||
|
|
||||||
|
from stackalytics.processor import rcs
|
||||||
|
|
||||||
|
REVIEW_ONE = json.dumps(
|
||||||
|
{"project": "openstack/nova", "branch": "master", "topic": "bug/1494374",
|
||||||
|
"id": "Id741dfc769c02a5544691a7db49a7dbff6b11376", "number": "229382",
|
||||||
|
"subject": "method is nearly 400 LOC and should be broken up",
|
||||||
|
"createdOn": 1443613948, "lastUpdated": 1444222222,
|
||||||
|
"sortKey": "0038481b00038006", "open": True, "status": "NEW"})
|
||||||
|
REVIEW_END_LINE = json.dumps(
|
||||||
|
{"type": "stats", "rowCount": 2, "runTimeMilliseconds": 13})
|
||||||
|
|
||||||
|
|
||||||
|
class TestRcs(testtools.TestCase):
|
||||||
|
|
||||||
|
@mock.patch('paramiko.SSHClient')
|
||||||
|
def test_setup(self, mock_client_cons):
|
||||||
|
mock_client = mock.Mock()
|
||||||
|
mock_client_cons.return_value = mock_client
|
||||||
|
|
||||||
|
mock_connect = mock.Mock()
|
||||||
|
mock_client.connect = mock_connect
|
||||||
|
|
||||||
|
gerrit = rcs.Gerrit('gerrit://review.openstack.org')
|
||||||
|
setup_result = gerrit.setup(username='user', key_filename='key')
|
||||||
|
|
||||||
|
self.assertEqual(True, setup_result)
|
||||||
|
mock_connect.assert_called_once_with(
|
||||||
|
'review.openstack.org', port=rcs.DEFAULT_PORT, key_filename='key',
|
||||||
|
username='user')
|
||||||
|
|
||||||
|
@mock.patch('paramiko.SSHClient')
|
||||||
|
def test_setup_error(self, mock_client_cons):
|
||||||
|
mock_client = mock.Mock()
|
||||||
|
mock_client_cons.return_value = mock_client
|
||||||
|
|
||||||
|
mock_connect = mock.Mock()
|
||||||
|
mock_client.connect = mock_connect
|
||||||
|
mock_connect.side_effect = Exception
|
||||||
|
|
||||||
|
gerrit = rcs.Gerrit('gerrit://review.openstack.org')
|
||||||
|
setup_result = gerrit.setup(username='user', key_filename='key')
|
||||||
|
|
||||||
|
self.assertEqual(False, setup_result)
|
||||||
|
mock_connect.assert_called_once_with(
|
||||||
|
'review.openstack.org', port=rcs.DEFAULT_PORT, key_filename='key',
|
||||||
|
username='user')
|
||||||
|
|
||||||
|
@mock.patch('paramiko.SSHClient')
|
||||||
|
@mock.patch('time.time')
|
||||||
|
def test_log(self, mock_time, mock_client_cons):
|
||||||
|
mock_client = mock.Mock()
|
||||||
|
mock_client_cons.return_value = mock_client
|
||||||
|
|
||||||
|
mock_exec = mock.Mock()
|
||||||
|
mock_client.exec_command = mock_exec
|
||||||
|
mock_exec.side_effect = [
|
||||||
|
('', [REVIEW_ONE, REVIEW_END_LINE], ''), # one review and summary
|
||||||
|
('', [REVIEW_END_LINE], ''), # only summary = no more reviews
|
||||||
|
]
|
||||||
|
|
||||||
|
gerrit = rcs.Gerrit('uri')
|
||||||
|
|
||||||
|
repo = dict(organization='openstack', module='nova')
|
||||||
|
branch = 'master'
|
||||||
|
last_retrieval_time = 1444000000
|
||||||
|
mock_time.return_value = 1444333333
|
||||||
|
records = list(gerrit.log(repo, branch, last_retrieval_time))
|
||||||
|
|
||||||
|
self.assertEqual(1, len(records))
|
||||||
|
self.assertEqual('229382', records[0]['number'])
|
||||||
|
|
||||||
|
mock_client.exec_command.assert_has_calls([
|
||||||
|
mock.call('gerrit query --all-approvals --patch-sets '
|
||||||
|
'--format JSON project:\'openstack/nova\' branch:master '
|
||||||
|
'limit:100 age:0s'),
|
||||||
|
mock.call('gerrit query --all-approvals --patch-sets '
|
||||||
|
'--format JSON project:\'openstack/nova\' branch:master '
|
||||||
|
'limit:100 age:111111s'),
|
||||||
|
])
|
||||||
|
|
||||||
|
@mock.patch('paramiko.SSHClient')
|
||||||
|
def test_log_old_reviews(self, mock_client_cons):
|
||||||
|
mock_client = mock.Mock()
|
||||||
|
mock_client_cons.return_value = mock_client
|
||||||
|
|
||||||
|
mock_exec = mock.Mock()
|
||||||
|
mock_client.exec_command = mock_exec
|
||||||
|
mock_exec.side_effect = [
|
||||||
|
('', [REVIEW_ONE, REVIEW_END_LINE], ''), # one review and summary
|
||||||
|
('', [REVIEW_END_LINE], ''), # only summary = no more reviews
|
||||||
|
]
|
||||||
|
|
||||||
|
gerrit = rcs.Gerrit('uri')
|
||||||
|
|
||||||
|
repo = dict(organization='openstack', module='nova')
|
||||||
|
branch = 'master'
|
||||||
|
last_retrieval_time = 1445000000
|
||||||
|
records = list(gerrit.log(repo, branch, last_retrieval_time,
|
||||||
|
status='merged', grab_comments=True))
|
||||||
|
|
||||||
|
self.assertEqual(0, len(records))
|
||||||
|
|
||||||
|
mock_client.exec_command.assert_has_calls([
|
||||||
|
mock.call('gerrit query --all-approvals --patch-sets '
|
||||||
|
'--format JSON project:\'openstack/nova\' branch:master '
|
||||||
|
'limit:100 age:0s status:merged --comments'),
|
||||||
|
])
|
||||||
Reference in New Issue
Block a user