diff --git a/etc/test_default_data.json b/etc/test_default_data.json index 5bde4f429..01d7f98f9 100644 --- a/etc/test_default_data.json +++ b/etc/test_default_data.json @@ -41,13 +41,7 @@ "branches": ["master"], "module": "stackalytics", "project_type": "stackforge", - "uri": "git://github.com/stackforge/stackalytics.git", - "releases": [ - { - "release_name": "Havana", - "tag_to": "HEAD" - } - ] + "uri": "git://github.com/stackforge/stackalytics.git" } ], diff --git a/stackalytics/processor/default_data_processor.py b/stackalytics/processor/default_data_processor.py new file mode 100644 index 000000000..a94000343 --- /dev/null +++ b/stackalytics/processor/default_data_processor.py @@ -0,0 +1,77 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from stackalytics.openstack.common import log as logging +from stackalytics.processor import utils + +LOG = logging.getLogger(__name__) + + +def normalize_user(user): + user['emails'] = [email.lower() for email in user['emails']] + if user['launchpad_id']: + user['launchpad_id'] = user['launchpad_id'].lower() + + for c in user['companies']: + end_date_numeric = 0 + if c['end_date']: + end_date_numeric = utils.date_to_timestamp(c['end_date']) + c['end_date'] = end_date_numeric + + # sort companies by end_date + def end_date_comparator(x, y): + if x["end_date"] == 0: + return 1 + elif y["end_date"] == 0: + return -1 + else: + return cmp(x["end_date"], y["end_date"]) + + user['companies'].sort(cmp=end_date_comparator) + + +def _process_users(users): + for user in users: + if ('launchpad_id' not in user) or ('emails' not in user): + LOG.warn('Skipping invalid user: %s', user) + continue + + normalize_user(user) + user['user_id'] = user['launchpad_id'] or user['emails'][0] + + +def _process_releases(releases): + for release in releases: + release['release_name'] = release['release_name'].lower() + release['end_date'] = utils.date_to_timestamp(release['end_date']) + releases.sort(key=lambda x: x['end_date']) + + +def _process_repos(repos): + for repo in repos: + if 'releases' not in repo: + repo['releases'] = [] # release will be assigned automatically + +PROCESSORS = { + 'users': _process_users, + 'releases': _process_releases, + 'repos': _process_repos, +} + + +def process(persistent_storage, default_data): + for key, processor in PROCESSORS.items(): + processor(default_data[key]) + persistent_storage.sync(default_data, force=True) diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index 0f38a5f02..38860a652 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -23,11 +23,11 @@ from psutil import _error from stackalytics.openstack.common import log as logging from stackalytics.processor import config +from stackalytics.processor import default_data_processor from stackalytics.processor import persistent_storage from stackalytics.processor import rcs from stackalytics.processor import record_processor from stackalytics.processor import runtime_storage -from stackalytics.processor import utils from stackalytics.processor import vcs @@ -140,35 +140,9 @@ def _read_default_persistent_storage(file_name): LOG.error('Error while reading config: %s' % e) -def process_users(users): - res = [] - for user in users: - if ('launchpad_id' not in user) or ('emails' not in user): - LOG.warn('Skipping invalid user: %s', user) - continue - - u = utils.normalize_user(user.copy()) - u['user_id'] = user['launchpad_id'] or user['emails'][0] - res.append(u) - return res - - -def process_releases(releases): - res = [] - for release in releases: - r = utils.normalize_release(release) - res.append(r) - res.sort(key=lambda x: x['end_date']) - return res - - def load_default_data(persistent_storage_inst, file_name, force): default_data = _read_default_persistent_storage(file_name) - - default_data['users'] = process_users(default_data['users']) - default_data['releases'] = process_releases(default_data['releases']) - - persistent_storage_inst.sync(default_data, force=force) + default_data_processor.process(persistent_storage_inst, default_data) def main(): diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index 8f37d4899..63b7302ec 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -12,13 +12,14 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -import bisect -import logging +import bisect import re from launchpadlib import launchpad from oslo.config import cfg +from stackalytics.openstack.common import log as logging +from stackalytics.processor import default_data_processor from stackalytics.processor import utils LOG = logging.getLogger(__name__) @@ -51,6 +52,13 @@ class CachedProcessor(RecordProcessor): for email in user['emails']: self.users_index[email] = user + self.releases = list(persistent_storage.get_releases()) + self.releases_dates = [r['end_date'] for r in self.releases] + + def _get_release(self, timestamp): + release_index = bisect.bisect(self.releases_dates, timestamp) + return self.releases[release_index]['release_name'] + def _find_company(self, companies, date): for r in companies: if date < r['end_date']: @@ -101,6 +109,7 @@ class CachedProcessor(RecordProcessor): 'end_date': 0, }], } + default_data_processor.normalize_user(user) self.persistent_storage.insert_user(user) return user @@ -133,6 +142,7 @@ class CachedProcessor(RecordProcessor): 'end_date': 0 }] } + default_data_processor.normalize_user(user) # add new user self.persistent_storage.insert_user(user) else: @@ -186,6 +196,9 @@ class CommitProcessor(CachedProcessor): record['week'] = utils.timestamp_to_week(record['date']) record['loc'] = record['lines_added'] + record['lines_deleted'] + if not record['release']: + record['release'] = self._get_release(record['date']) + yield record @@ -198,15 +211,8 @@ class ReviewProcessor(CachedProcessor): for user in users: self.launchpad_to_company_index[user['launchpad_id']] = user - self.releases = list(persistent_storage.get_releases()) - self.releases_dates = [r['end_date'] for r in self.releases] - LOG.debug('Review processor is instantiated') - def _get_release(self, timestamp): - release_index = bisect.bisect(self.releases_dates, timestamp) - return self.releases[release_index]['release_name'] - def _process_user(self, email, launchpad_id, user_name, date): if email in self.users_index: user = self.users_index[email] @@ -228,21 +234,25 @@ class ReviewProcessor(CachedProcessor): return # ignore review['record_type'] = 'review' - review['primary_key'] = record['id'] + review['primary_key'] = review['id'] review['launchpad_id'] = owner['username'] + review['author'] = owner['name'] review['author_email'] = owner['email'].lower() review['release'] = self._get_release(review['createdOn']) + review['week'] = utils.timestamp_to_week(review['createdOn']) company, user_id = self._process_user(review['author_email'], review['launchpad_id'], - owner['name'], - record['createdOn']) + review['author'], + review['createdOn']) review['company_name'] = company review['user_id'] = user_id yield review def _spawn_marks(self, record): review_id = record['id'] + module = record['module'] + for patch in record['patchSets']: if 'approvals' not in patch: continue # not reviewed by anyone @@ -260,17 +270,19 @@ class ReviewProcessor(CachedProcessor): str(mark['grantedOn']) + mark['type']) mark['launchpad_id'] = reviewer['username'] + mark['author'] = reviewer['name'] mark['author_email'] = reviewer['email'].lower() - mark['module'] = record['module'] + mark['module'] = module + mark['review_id'] = review_id + mark['release'] = self._get_release(mark['grantedOn']) + mark['week'] = utils.timestamp_to_week(mark['grantedOn']) company, user_id = self._process_user(mark['author_email'], mark['launchpad_id'], - reviewer['name'], + mark['author'], mark['grantedOn']) mark['company_name'] = company mark['user_id'] = user_id - mark['review_id'] = review_id - mark['release'] = self._get_release(mark['grantedOn']) yield mark diff --git a/stackalytics/processor/utils.py b/stackalytics/processor/utils.py index 7a0680cd0..3df47ff85 100644 --- a/stackalytics/processor/utils.py +++ b/stackalytics/processor/utils.py @@ -17,36 +17,6 @@ import datetime import time -def normalize_user(user): - user['emails'] = [email.lower() for email in user['emails']] - if user['launchpad_id']: - user['launchpad_id'] = user['launchpad_id'].lower() - - for c in user['companies']: - end_date_numeric = 0 - if c['end_date']: - end_date_numeric = date_to_timestamp(c['end_date']) - c['end_date'] = end_date_numeric - - # sort companies by end_date - def end_date_comparator(x, y): - if x["end_date"] == 0: - return 1 - elif y["end_date"] == 0: - return -1 - else: - return cmp(x["end_date"], y["end_date"]) - - user['companies'].sort(cmp=end_date_comparator) - return user - - -def normalize_release(release): - release['release_name'] = release['release_name'].lower() - release['end_date'] = date_to_timestamp(release['end_date']) - return release - - def date_to_timestamp(d): if d == 'now': return int(time.time()) diff --git a/stackalytics/processor/vcs.py b/stackalytics/processor/vcs.py index a1b93597c..859538d2e 100644 --- a/stackalytics/processor/vcs.py +++ b/stackalytics/processor/vcs.py @@ -78,24 +78,22 @@ class Git(Vcs): uri = self.repo['uri'] match = re.search(r'([^\/]+)\.git$', uri) if match: - self.module = match.group(1) + self.folder = os.path.normpath(self.sources_root + '/' + + match.group(1)) else: raise Exception('Unexpected uri %s for git' % uri) self.release_index = {} def _chdir(self): - folder = os.path.normpath(self.sources_root + '/' + self.module) - os.chdir(folder) + os.chdir(self.folder) def fetch(self): LOG.debug('Fetching repo uri %s' % self.repo['uri']) - folder = os.path.normpath(self.sources_root + '/' + self.module) - - if not os.path.exists(folder): + if not os.path.exists(self.folder): os.chdir(self.sources_root) sh.git('clone', '%s' % self.repo['uri']) - os.chdir(folder) + os.chdir(self.folder) else: self._chdir() sh.git('pull', 'origin') @@ -157,7 +155,7 @@ class Git(Vcs): commit[key] = None commit['date'] = int(commit['date']) - commit['module'] = self.module + commit['module'] = self.repo['module'] commit['branches'] = set([branch]) if commit['commit_id'] in self.release_index: commit['release'] = self.release_index[commit['commit_id']] diff --git a/tests/unit/test_commit_processor.py b/tests/unit/test_commit_processor.py index 2a0494201..58d4699d4 100644 --- a/tests/unit/test_commit_processor.py +++ b/tests/unit/test_commit_processor.py @@ -20,6 +20,7 @@ import testtools from stackalytics.processor import persistent_storage from stackalytics.processor import record_processor +from stackalytics.processor import utils class TestCommitProcessor(testtools.TestCase): @@ -57,6 +58,17 @@ class TestCommitProcessor(testtools.TestCase): self.user, ]) + p_storage.get_releases = mock.Mock(return_value=[ + { + 'release_name': 'prehistory', + 'end_date': utils.date_to_timestamp('2011-Apr-21') + }, + { + 'release_name': 'Diablo', + 'end_date': utils.date_to_timestamp('2011-Sep-08') + }, + ]) + self.persistent_storage = p_storage self.commit_processor = record_processor.CommitProcessor(p_storage) self.launchpad_patch = mock.patch('launchpadlib.launchpad.Launchpad') diff --git a/tests/unit/test_vcs.py b/tests/unit/test_vcs.py index c0435d825..cfc5cfec0 100644 --- a/tests/unit/test_vcs.py +++ b/tests/unit/test_vcs.py @@ -27,6 +27,7 @@ class TestVcsProcessor(testtools.TestCase): super(TestVcsProcessor, self).setUp() self.repo = { + 'module': 'dummy', 'uri': 'git://github.com/dummy.git', 'releases': [] }