Rework CI votes processing

1. Take into account only votes for the merged change requests.
2. Make processing compatible with the latest DriverLog: process comments
   only and allow multiple CIs to share the same gerrit-id

NOTE: this patch introduces incompatible changes to the data stored
in runtime storage. A full reload is required.

Change-Id: Ic3e31d3cd3164c5ce786e0a19df2b4d0529ef40f
This commit is contained in:
Ilya Shakhat 2015-10-07 12:54:50 +03:00
parent 4912190783
commit 1b6a5fe764
12 changed files with 279 additions and 161 deletions

View File

@ -54,7 +54,7 @@ METRIC_TO_RECORD_TYPE = {
'resolved-bugs': ['bugr'],
'members': ['member'],
'person-day': ['mark', 'patch', 'email', 'bpd', 'bugf'],
'ci': ['ci_vote'],
'ci': ['ci'],
'patches': ['patch'],
}

View File

@ -230,7 +230,7 @@ def _get_activity_summary(record_ids):
memory_storage_inst = vault.get_memory_storage()
record_ids_by_type = memory_storage_inst.get_record_ids_by_types(
['mark', 'patch', 'email', 'bpd', 'bpc', 'ci_vote'])
['mark', 'patch', 'email', 'bpd', 'bpc', 'ci'])
record_ids &= record_ids_by_type
punch_card_data = _get_punch_card_data(

View File

@ -158,10 +158,11 @@ show_record_type=True, show_user_gravatar=True, gravatar_size=32, show_all=True)
<div class="header">Bug &ldquo;${title}&rdquo; (<a href="${web_link}" class="ext_link">${number}</a>)</div>
<div>Status: <span class="status${status_class}">${status}</span></div>
<div>Importance: <span class="importance${importance}">${importance}</span></div>
{%elif record_type == "ci_vote" %}
<div class="header">New CI vote in change request ${review_number}
{%if is_merged %}(<span style="color: green;">Merged</span>){%/if%}</div>
<div>Parsed result: {%if ci_result == true %}<span style="color: green">Success</span>{%else%}<span style="color: red">Failure</span>{%/if%}</div>
{%elif record_type == "ci" %}
<div class="header">CI vote in merged change request
<a href="https://review.openstack.org/#/c/${review_number}" target="_blank">${review_number}</a>
</div>
<div>Parsed result: {%if value == true %}<span style="color: green">Success</span>{%else%}<span style="color: red">Failure</span>{%/if%}</div>
<div>Message: ${message}</div>
<div>Change Id: <a href="https://review.openstack.org/#/c/${review_number}" target="_blank">${review_id}</a></div>
{%elif record_type == "member" %}

View File

@ -159,31 +159,26 @@ def _update_with_driverlog_data(default_data, driverlog_data_uri):
LOG.info('Reading DriverLog data from uri: %s', driverlog_data_uri)
driverlog_data = utils.read_json_from_uri(driverlog_data_uri)
module_ci_ids = {}
ci_ids = set()
module_cis = collections.defaultdict(list)
for driver in driverlog_data['drivers']:
if 'ci' in driver:
module = driver['project_id'].split('/')[1]
if 'ci' not in driver:
continue
if module not in module_ci_ids:
module_ci_ids[module] = {}
ci_id = driver['ci']['id']
module_ci_ids[module][ci_id] = driver
module = (driver.get('repo') or driver['project_id']).split('/')[1]
if ci_id not in ci_ids:
ci_ids.add(ci_id)
default_data['users'].append({
'user_id': user_processor.make_user_id(gerrit_id=ci_id),
'gerrit_id': ci_id,
'user_name': ci_id,
'static': True,
'companies': [
{'company_name': driver['vendor'], 'end_date': None}],
})
module_cis[module].append(driver)
default_data['users'].append({
'user_id': user_processor.make_user_id(ci_id=driver['name']),
'user_name': driver['name'],
'static': True,
'companies': [
{'company_name': driver['vendor'], 'end_date': None}],
})
for repo in default_data['repos']:
if repo['module'] in module_ci_ids:
repo['ci'] = module_ci_ids[repo['module']]
if repo['module'] in module_cis:
repo['drivers'] = module_cis[repo['module']]
def _store_users(runtime_storage_inst, users):

View File

@ -16,77 +16,88 @@
import re
from oslo_log import log as logging
from stackalytics.processor import user_processor
LOG = logging.getLogger(__name__)
def _find_vote(review, ci_id, patch_set_number):
"""Finds vote corresponding to ci_id."""
for patch_set in review['patchSets']:
if patch_set['number'] == patch_set_number:
for approval in (patch_set.get('approvals') or []):
if approval['type'] not in ['Verified', 'VRIF']:
continue
if approval['by'].get('username') == ci_id:
return approval['value'] in ['1', '2']
return None
def find_ci_result(review, ci_map):
def _find_ci_result(review, drivers):
"""For a given stream of reviews yields results produced by CIs."""
review_id = review['id']
review_number = review['number']
ci_already_seen = set()
ci_id_set = set(d['ci']['id'] for d in drivers)
candidate_drivers = [d for d in drivers]
last_patch_set_number = review['patchSets'][-1]['number']
for comment in reversed(review.get('comments') or []):
reviewer_id = comment['reviewer'].get('username')
if reviewer_id not in ci_map:
continue
comment_author = comment['reviewer'].get('username')
if comment_author not in ci_id_set:
continue # not any of registered CIs
message = comment['message']
m = re.match(r'Patch Set (?P<number>\d+):(?P<message>.*)',
message, flags=re.DOTALL)
if not m:
continue # do not understand comment
patch_set_number = m.groupdict()['number']
message = m.groupdict()['message'].strip()
prefix = 'Patch Set'
if comment['message'].find(prefix) != 0:
continue # look for special messages only
prefix = 'Patch Set %s:' % last_patch_set_number
if comment['message'].find(prefix) != 0:
break # all comments from the latest patch set already parsed
message = message[len(prefix):].strip()
result = None
ci = ci_map[reviewer_id]['ci']
matched_drivers = set()
# try to get result by parsing comment message
success_pattern = ci.get('success_pattern')
failure_pattern = ci.get('failure_pattern')
for driver in candidate_drivers:
ci = driver['ci']
if ci['id'] != comment_author:
continue
if success_pattern and re.search(success_pattern, message):
result = True
elif failure_pattern and re.search(failure_pattern, message):
result = False
# try to get result by parsing comment message
success_pattern = ci.get('success_pattern')
failure_pattern = ci.get('failure_pattern')
# try to get result from vote
if result is None:
result = _find_vote(review, ci['id'], patch_set_number)
message_lines = (l for l in message.split('\n') if l.strip())
if result is not None:
is_merged = (
review['status'] == 'MERGED' and
patch_set_number == review['patchSets'][-1]['number'] and
ci['id'] not in ci_already_seen)
line = ''
for line in message_lines:
if success_pattern and re.search(success_pattern, line):
result = True
break
elif failure_pattern and re.search(failure_pattern, line):
result = False
break
ci_already_seen.add(ci['id'])
if result is not None:
matched_drivers.add(driver['name'])
record = {
'user_id': user_processor.make_user_id(
ci_id=driver['name']),
'value': result,
'message': line,
'date': comment['timestamp'],
'branch': review['branch'],
'review_id': review_id,
'review_number': review_number,
'driver_name': driver['name'],
'driver_vendor': driver['vendor'],
'module': review['module']
}
if review['branch'].find('/') > 0:
record['release'] = review['branch'].split('/')[1]
yield {
'reviewer': comment['reviewer'],
'ci_result': result,
'is_merged': is_merged,
'message': message,
'date': comment['timestamp'],
'review_id': review_id,
'review_number': review_number,
'driver_name': ci_map[reviewer_id]['name'],
'driver_vendor': ci_map[reviewer_id]['vendor'],
}
yield record
candidate_drivers = [d for d in candidate_drivers
if d['name'] not in matched_drivers]
if not candidate_drivers:
break # found results from all drivers
def log(review_iterator, drivers):
    """Yield CI vote records for every review of the stream.

    :param review_iterator: iterable of review dicts
    :param drivers: driver descriptions, handed over unchanged to
        ``_find_ci_result``
    :returns: generator over everything ``_find_ci_result`` yields,
        review after review
    """
    for review in review_iterator:
        ci_results = _find_ci_result(review, drivers)
        for ci_result in ci_results:
            yield ci_result

View File

@ -21,7 +21,6 @@ import memcache
from oslo_config import cfg
from oslo_log import log as logging
import six
from six.moves.urllib import parse
from stackalytics.processor import config
from stackalytics.processor import utils
@ -84,14 +83,19 @@ def import_data(memcached_inst, fd):
def get_repo_keys(memcached_inst):
for repo in (memcached_inst.get('repos') or []):
uri = repo['uri']
quoted_uri = six.moves.urllib.parse.quote_plus(uri)
yield 'bug_modified_since-%s' % repo['module']
branches = {repo.get('default_branch', 'master')}
for release in repo.get('releases'):
if 'branch' in release:
branches.add(release['branch'])
for branch in branches:
yield 'vcs:' + str(parse.quote_plus(uri) + ':' + branch)
yield 'rcs:' + str(parse.quote_plus(uri) + ':' + branch)
yield 'vcs:%s:%s' % (quoted_uri, branch)
yield 'rcs:%s:%s' % (quoted_uri, branch)
yield 'ci:%s:%s' % (quoted_uri, branch)
def export_data(memcached_inst, fd):

View File

@ -75,22 +75,6 @@ def _record_typer(record_iterator, record_type):
yield record
def _process_reviews(record_iterator, ci_map, module, branch):
for record in record_iterator:
yield record
for driver_info in driverlog.find_ci_result(record, ci_map):
driver_info['record_type'] = 'ci_vote'
driver_info['module'] = module
driver_info['branch'] = branch
release = branch.lower()
if release.find('/') > 0:
driver_info['release'] = release.split('/')[1]
yield driver_info
def _process_repo(repo, runtime_storage_inst, record_processor_inst,
rcs_inst):
uri = repo['uri']
@ -154,10 +138,6 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
grab_comments=('ci' in repo))
review_iterator_typed = _record_typer(review_iterator, 'review')
if 'ci' in repo: # add external CI data
review_iterator_typed = _process_reviews(
review_iterator_typed, repo['ci'], repo['module'], branch)
processed_review_iterator = record_processor_inst.process(
review_iterator_typed)
runtime_storage_inst.set_records(processed_review_iterator,
@ -165,6 +145,26 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time)
if 'drivers' in repo:
LOG.debug('Processing CI votes for repo: %s, branch: %s',
uri, branch)
rcs_key = 'ci:%s:%s' % (quoted_uri, branch)
last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key)
current_retrieval_time = int(time.time())
review_iterator = rcs_inst.log(repo, branch, last_retrieval_time,
status='merged', grab_comments=True)
review_iterator = driverlog.log(review_iterator, repo['drivers'])
review_iterator_typed = _record_typer(review_iterator, 'ci')
processed_review_iterator = record_processor_inst.process(
review_iterator_typed)
runtime_storage_inst.set_records(processed_review_iterator,
utils.merge_records)
runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time)
def _process_mail_list(uri, runtime_storage_inst, record_processor_inst):
mail_iterator = mls.log(uri, runtime_storage_inst)
@ -265,6 +265,9 @@ def process_project_list(runtime_storage_inst, project_list_uri):
module = repo['module']
module_groups[module] = utils.make_module_group(module, tag='module')
if 'drivers' in repo:
module_groups[module]['has_drivers'] = True
# register module 'unknown' - used for emails not mapped to any module
module_groups['unknown'] = utils.make_module_group('unknown', tag='module')

View File

@ -45,8 +45,8 @@ def _normalize_user(user):
launchpad_id=user.get('launchpad_id'),
emails=user.get('emails'),
gerrit_id=user.get('gerrit_id'),
github_id=user.get('user_id'),
ldap_id=user.get('ldap_id'))
github_id=user.get('github_id'),
ldap_id=user.get('ldap_id')) or user.get('user_id')
def _normalize_users(users):

View File

@ -547,17 +547,12 @@ class RecordProcessor(object):
yield record
def _process_ci(self, record):
ci_vote = dict((k, v) for k, v in six.iteritems(record)
if k not in ['reviewer'])
ci_vote = dict((k, v) for k, v in six.iteritems(record))
reviewer = record['reviewer']
ci_vote['primary_key'] = ('%s:%s' % (reviewer['username'],
ci_vote['date']))
ci_vote['user_id'] = reviewer['username']
ci_vote['gerrit_id'] = reviewer['username']
ci_vote['author_name'] = reviewer.get('name') or reviewer['username']
ci_vote['author_email'] = (
reviewer.get('email') or reviewer['username']).lower()
ci_vote['primary_key'] = '%s:%s' % (record['review_id'],
record['driver_name'])
ci_vote['author_name'] = record['driver_name']
ci_vote['author_email'] = record['user_id']
self._update_record_and_user(ci_vote)
@ -576,7 +571,7 @@ class RecordProcessor(object):
'bp': self._process_blueprint,
'bug': self._process_bug,
'member': self._process_member,
'ci_vote': self._process_ci,
'ci': self._process_ci,
}
for record in record_iterator:

View File

@ -12,7 +12,9 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import re
from oslo_log import log as logging
@ -20,7 +22,7 @@ LOG = logging.getLogger(__name__)
def make_user_id(emails=None, launchpad_id=None, gerrit_id=None,
member_id=None, github_id=None, ldap_id=None):
member_id=None, github_id=None, ldap_id=None, ci_id=None):
if launchpad_id or emails:
return launchpad_id or emails[0]
if gerrit_id:
@ -31,6 +33,8 @@ def make_user_id(emails=None, launchpad_id=None, gerrit_id=None,
return 'github:%s' % github_id
if ldap_id:
return 'ldap:%s' % ldap_id
if ci_id:
return 'ci:%s' % re.sub(r'[^\w]', '_', ci_id.lower())
def store_user(runtime_storage_inst, user):

View File

@ -97,3 +97,61 @@ class TestDefaultDataProcessor(testtools.TestCase):
'module_group_name': 'stackforge',
'modules': ['tux'],
'tag': 'organization'}, dd['module_groups'])
@mock.patch('stackalytics.processor.utils.read_json_from_uri')
def test_update_with_driverlog(self, mock_read_from_json):
    # a driver with a CI section is expected to show up both as a static
    # user and in the 'drivers' list of the repo named by its project_id
    driver = {
        'project_id': 'openstack/cinder',
        'vendor': 'VMware',
        'name': 'VMware VMDK Driver',
        'ci': {
            'id': 'vmwareminesweeper',
            'success_pattern': 'Build successful',
            'failure_pattern': 'Build failed',
        },
    }
    mock_read_from_json.return_value = {'drivers': [driver]}
    default_data = {'repos': [{'module': 'cinder'}], 'users': []}

    default_data_processor._update_with_driverlog_data(default_data, 'uri')

    ci_user = {
        'user_id': 'ci:vmware_vmdk_driver',
        'user_name': 'VMware VMDK Driver',
        'static': True,
        'companies': [{'company_name': 'VMware', 'end_date': None}],
    }
    self.assertIn(ci_user, default_data['users'])
    self.assertIn(driver, default_data['repos'][0]['drivers'])
@mock.patch('stackalytics.processor.utils.read_json_from_uri')
def test_update_with_driverlog_specific_repo(self, mock_read_from_json):
    # when a driver names an explicit 'repo', the module is expected to
    # come from that repo rather than from 'project_id'
    driver = {
        'project_id': 'openstack/fuel',
        'repo': 'stackforge/fuel-plugin-mellanox',
        'vendor': 'Mellanox',
        'name': 'ConnectX-3 Pro Network Adapter Support plugin',
        'ci': {
            'id': 'mellanox',
            'success_pattern': 'SUCCESS',
            'failure_pattern': 'FAILURE',
        },
    }
    mock_read_from_json.return_value = {'drivers': [driver]}
    default_data = {
        'repos': [{'module': 'fuel-plugin-mellanox'}],
        'users': [],
    }

    default_data_processor._update_with_driverlog_data(default_data, 'uri')

    ci_user = {
        'user_id': 'ci:connectx_3_pro_network_adapter_support_plugin',
        'user_name': 'ConnectX-3 Pro Network Adapter Support plugin',
        'static': True,
        'companies': [{'company_name': 'Mellanox', 'end_date': None}],
    }
    self.assertIn(ci_user, default_data['users'])
    self.assertIn(driver, default_data['repos'][0]['drivers'])

View File

@ -12,68 +12,115 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import testtools
from stackalytics.processor import driverlog
# Comment of the registered CI that matches DRIVER's success pattern.
COMMENT_SUCCESS = {
    'message': 'Patch Set 2: build successful',
    'reviewer': {'username': 'virt-ci'},
    'timestamp': 1234567890,
}

# Comment of the same CI that matches DRIVER's failure pattern; its
# timestamp is 10 seconds lower than the one of COMMENT_SUCCESS.
COMMENT_FAILURE = {
    'message': 'Patch Set 2: build failed',
    'reviewer': {'username': 'virt-ci'},
    'timestamp': 1234567880,
}

# Merged review with two patch sets. Neither of its initial comments
# matches a pattern of DRIVER coming from the 'virt-ci' account.
REVIEW = {
    'record_type': 'review',
    'id': 'I1045730e47e9e6ad31fcdfbaefdad77e2f3b2c3e',
    'module': 'nova',
    'branch': 'master',
    'status': 'MERGED',
    'number': '97860',
    'patchSets': [
        {'number': '1'},
        {'number': '2'},
    ],
    'comments': [
        {
            'message': 'Patch Set 2: build successful',
            'reviewer': {'username': 'other-ci'},
        },
        {
            'message': 'Patch Set 2: job started',
            'reviewer': {'username': 'virt-ci'},
        },
    ],
}

# Driver whose CI account is the author of the COMMENT_* fixtures.
DRIVER = {
    'name': 'Virt Nova Driver',
    'vendor': 'Virt Inc',
    'ci': {
        'id': 'virt-ci',
        'success_pattern': 'successful',
        'failure_pattern': 'failed',
    },
}

# Driver whose CI account is not the author of any fixture comment.
DRIVER_NON_EXISTENT = {
    'name': 'No Virt Nova Driver',
    'vendor': 'No Virt Inc',
    'ci': {
        'id': 'no-virt-ci',
        'success_pattern': 'successful',
        'failure_pattern': 'failed',
    },
}
class TestDriverlog(testtools.TestCase):
def setUp(self):
    # no fixtures of its own; only the base class set-up is run
    super(TestDriverlog, self).setUp()
def test_find_ci_result_voting_ci(self):
review = {
'record_type': 'review',
'id': 'I1045730e47e9e6ad31fcdfbaefdad77e2f3b2c3e',
'module': 'nova',
'branch': 'master',
'status': 'NEW',
'number': '97860',
'patchSets': [
{'number': '1',
'approvals': [
{'type': 'Verified', 'description': 'Verified',
'value': '1', 'grantedOn': 1234567890 - 1,
'by': {
'name': 'Batman',
'email': 'batman@openstack.org',
'username': 'batman'}},
{'type': 'Verified', 'description': 'Verified',
'value': '-1', 'grantedOn': 1234567890,
'by': {
'name': 'Pikachu',
'email': 'pikachu@openstack.org',
'username': 'pikachu'}},
]}],
'comments': [
{'message': 'Patch Set 1: build successful',
'reviewer': {'username': 'batman'},
'timestamp': 1234567890}
]}
def test_find_ci_result_success(self):
drivers = [DRIVER]
review = copy.deepcopy(REVIEW)
review['comments'].append(COMMENT_SUCCESS)
ci_map = {
'batman': {
'name': 'Batman Driver',
'vendor': 'Gotham Inc',
'ci': {
'id': 'batman'
}
}
}
res = list(driverlog.find_ci_result(review, ci_map))
res = list(driverlog.log([review], drivers))
expected_result = {
'reviewer': {'username': 'batman'},
'ci_result': True,
'is_merged': False,
'user_id': 'ci:virt_nova_driver',
'value': True,
'message': 'build successful',
'date': 1234567890,
'branch': 'master',
'review_id': 'I1045730e47e9e6ad31fcdfbaefdad77e2f3b2c3e',
'review_number': '97860',
'driver_name': 'Batman Driver',
'driver_vendor': 'Gotham Inc',
'driver_name': 'Virt Nova Driver',
'driver_vendor': 'Virt Inc',
'module': 'nova',
}
self.assertEqual(1, len(res), 'One CI result is expected')
self.assertEqual(expected_result, res[0])
def test_find_ci_result_failure(self):
    # a CI comment matching the failure pattern gives a negative result
    review = copy.deepcopy(REVIEW)
    review['comments'].append(COMMENT_FAILURE)

    votes = list(driverlog.log([review], [DRIVER]))

    self.assertEqual(1, len(votes), 'One CI result is expected')
    self.assertEqual(False, votes[0]['value'])
def test_find_ci_result_non_existent(self):
    # the only registered driver belongs to the 'no-virt-ci' account,
    # while every comment in the review is written by somebody else,
    # so no CI result may be reported
    drivers = [DRIVER_NON_EXISTENT]
    review = copy.deepcopy(REVIEW)
    review['comments'].append(COMMENT_SUCCESS)

    # pass the prepared copy, not the shared REVIEW fixture: otherwise the
    # appended CI comment never reaches the code under test
    res = list(driverlog.log([review], drivers))

    self.assertEqual(0, len(res), 'No CI results expected')
def test_find_ci_result_last_vote_only(self):
    # one CI may comment several times on the same patch set;
    # only the comment that comes last in the list is reported
    review = copy.deepcopy(REVIEW)
    review['comments'].extend([COMMENT_FAILURE, COMMENT_SUCCESS])

    votes = list(driverlog.log([review], [DRIVER]))

    self.assertEqual(1, len(votes), 'One CI result is expected')
    self.assertEqual(True, votes[0]['value'])