Record updater refactoring

Consolidated the code that updates records after the default data changes
with the code that is called during data pulling. Removed unnecessary code.

Part of bug 1260696

Change-Id: Ic9b52f84fbf0237669e920541524c4e3e62f09d1
This commit is contained in:
Ilya Shakhat
2013-12-24 17:26:37 +04:00
parent 26fb1a1a26
commit 8eea5e3d86
4 changed files with 41 additions and 85 deletions

View File

@@ -120,7 +120,7 @@ def _store_default_data(runtime_storage_inst, default_data):
def _update_records(runtime_storage_inst, sources_root):
LOG.debug('Gather release index for all repos')
LOG.debug('Update existing records')
release_index = {}
for repo in utils.load_repos(runtime_storage_inst):
vcs_inst = vcs.get_vcs(repo, sources_root)
@@ -128,18 +128,7 @@ def _update_records(runtime_storage_inst, sources_root):
record_processor_inst = record_processor.RecordProcessor(
runtime_storage_inst)
# need to iterate over full view of records and generate valid
# users profiles
LOG.debug('Iterate all records to create valid users profiles')
for record in runtime_storage_inst.get_all_records():
record_processor_inst.update_user(record)
# update records according to generated users profiles
LOG.debug('Update all records according to users profiles')
updated_records = record_processor_inst.update(
runtime_storage_inst.get_all_records(), release_index)
runtime_storage_inst.set_records(updated_records)
record_processor_inst.update(release_index)
def process(runtime_storage_inst, default_data, sources_root, force_update):

View File

@@ -150,7 +150,7 @@ def update_records(runtime_storage_inst):
process_mail_list(mail_list, runtime_storage_inst,
record_processor_inst)
record_processor_inst.finalize()
record_processor_inst.update()
def apply_corrections(uri, runtime_storage_inst):

View File

@@ -34,8 +34,6 @@ class RecordProcessor(object):
self.modules = None
self.updated_users = set()
def _get_release(self, timestamp):
release_index = bisect.bisect(self.releases_dates, timestamp)
return self.releases[release_index]['release_name']
@@ -134,7 +132,6 @@ class RecordProcessor(object):
LOG.debug('Updating affiliation of user %s to %s',
user['user_id'], company_name)
uc[0]['company_name'] = company_name
self.updated_users.add(user['user_id'])
break
def _get_user_exact_company(self, user):
@@ -149,7 +146,6 @@ class RecordProcessor(object):
user[key] = user_a.get(key) or user_b.get(key) or user_c.get(key)
if user['launchpad_id'] and user['user_id'] != user['launchpad_id']:
self.updated_users.add(user['user_id'])
user['user_id'] = user['launchpad_id']
emails = set([])
@@ -161,11 +157,6 @@ class RecordProcessor(object):
user['core'] = list(core_in)
self._update_user_affiliation(user)
if (self._get_user_exact_company(user_b) and
(self._get_user_exact_company(user)
!= self._get_user_exact_company(user_b))):
# affiliation changed automatically
self.updated_users.add(user_b['user_id'])
if user_a.get('seq') and user_b.get('seq'):
LOG.debug('Delete user: %s', user_b)
@@ -397,10 +388,23 @@ class RecordProcessor(object):
yield r
def update(self, record_iterator, release_index):
for record in record_iterator:
need_update = False
def _update_records_with_releases(self, release_index):
LOG.debug('Update records with releases')
for record in self.runtime_storage_inst.get_all_records():
if record['primary_key'] in release_index:
release = release_index[record['primary_key']]
else:
release = self._get_release(record['date'])
if record['release'] != release:
record['release'] = release
yield record
def _update_records_with_user_info(self):
LOG.debug('Update user info in records')
for record in self.runtime_storage_inst.get_all_records():
company_name = record['company_name']
user_id = record['user_id']
author_name = record['author_name']
@@ -410,42 +414,10 @@ class RecordProcessor(object):
if ((record['company_name'] != company_name) or
(record['user_id'] != user_id) or
(record['author_name'] != author_name)):
need_update = True
if record['primary_key'] in release_index:
release = release_index[record['primary_key']]
else:
release = self._get_release(record['date'])
if record['release'] != release:
need_update = True
record['release'] = release
if need_update:
yield record
def _update_records_with_user_info(self):
LOG.debug('Update user info in records')
for record in self.runtime_storage_inst.get_all_records():
need_update = False
if record['user_id'] in self.updated_users:
user = utils.load_user(self.runtime_storage_inst,
record['user_id'])
user_company_name = user['companies'][0]['company_name']
if record['company_name'] != user_company_name:
LOG.debug('Update record %s: company changed to: %s',
record['primary_key'], user_company_name)
record['company_name'] = user_company_name
need_update = True
if record['user_id'] != user['user_id']:
LOG.debug('Update record %s, user id changed to: %s',
record['primary_key'], user['user_id'])
record['user_id'] = user['user_id']
need_update = True
if need_update:
LOG.debug('User info (%(id)s, %(name)s, %(company)s) has '
'changed in record %(record)s',
{'id': user_id, 'name': author_name,
'company': company_name, 'record': record})
yield record
def _update_commits_with_merge_date(self):
@@ -465,6 +437,9 @@ class RecordProcessor(object):
if old_date != change_id_to_date[change_id]:
record['date'] = change_id_to_date[change_id]
self._renew_record_date(record)
LOG.debug('Date %(date)s has changed in record '
'%(record)s', {'date': old_date,
'record': record})
yield record
def _update_blueprints_with_mention_info(self):
@@ -616,10 +591,14 @@ class RecordProcessor(object):
if old_disagreement != disagreement:
yield mark
def finalize(self):
def update(self, release_index=None):
self.runtime_storage_inst.set_records(
self._update_records_with_user_info())
if release_index:
self.runtime_storage_inst.set_records(
self._update_records_with_releases(release_index))
self.runtime_storage_inst.set_records(
self._update_reviews_with_sequence_number())

View File

@@ -12,6 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import mock
@@ -629,7 +630,7 @@ class TestRecordProcessor(testtools.TestCase):
'date_created': 1234567890},
{'record_type': 'email',
'message_id': '<message-id>',
'author_email': 'john_doe@ibm.com',
'author_email': 'john_doe@ibm.com', 'author_name': 'John Doe',
'subject': 'hello, world!',
'body': 'lorem ipsum',
'date': 1234567890},
@@ -643,7 +644,7 @@ class TestRecordProcessor(testtools.TestCase):
'module': 'nova', 'branch': 'master'}
]))
record_processor_inst.finalize()
record_processor_inst.update()
user = {'seq': 2,
'core': [],
@@ -713,7 +714,7 @@ class TestRecordProcessor(testtools.TestCase):
}]}
]))
record_processor_inst.finalize()
record_processor_inst.update()
user_1 = {'seq': 1, 'user_id': 'john_doe',
'launchpad_id': 'john_doe', 'user_name': 'John Doe',
@@ -751,20 +752,20 @@ class TestRecordProcessor(testtools.TestCase):
'date_created': 1234567890},
{'record_type': 'email',
'message_id': '<message-id>',
'author_email': 'john_doe@gmail.com',
'author_email': 'john_doe@gmail.com', 'author_name': 'John Doe',
'subject': 'hello, world!',
'body': 'lorem ipsum',
'date': 1234567890,
'blueprint_id': ['mod:blueprint']},
{'record_type': 'email',
'message_id': '<another-message-id>',
'author_email': 'john_doe@gmail.com',
'author_email': 'john_doe@gmail.com', 'author_name': 'John Doe',
'subject': 'hello, world!',
'body': 'lorem ipsum',
'date': 1234567895,
'blueprint_id': ['mod:blueprint', 'mod:invalid']},
]))
record_processor_inst.finalize()
record_processor_inst.update()
bp1 = runtime_storage_inst.get_by_primary_key('bpd:mod:blueprint')
self.assertEqual(2, bp1['mention_count'])
@@ -800,7 +801,7 @@ class TestRecordProcessor(testtools.TestCase):
'createdOn': 5,
'module': 'glance', 'branch': 'master'},
]))
record_processor_inst.finalize()
record_processor_inst.update()
review1 = runtime_storage_inst.get_by_primary_key('I111')
self.assertEqual(2, review1['review_number'])
@@ -855,7 +856,7 @@ class TestRecordProcessor(testtools.TestCase):
]
}]}
]))
record_processor_inst.finalize()
record_processor_inst.update()
marks = list([r for r in runtime_storage_inst.get_all_records()
if r['record_type'] == 'mark'])
@@ -888,7 +889,7 @@ class TestRecordProcessor(testtools.TestCase):
'status': 'MERGED',
'module': 'nova', 'branch': 'master'},
]))
record_processor_inst.finalize()
record_processor_inst.update()
commit = runtime_storage_inst.get_by_primary_key('de7e8f2')
self.assertEqual(1385490000, commit['date'])
@@ -921,19 +922,6 @@ class TestRecordProcessor(testtools.TestCase):
'change_id': u'I33f0f37b6460dc494abf2520dc109c9893ace9e6',
'release': u'havana'}
def test_update_record_no_changes(self):
commit_generator = self._generate_record_commit()
release_index = {'0afdc64bfd041b03943ceda7849c4443940b6053': 'havana'}
record_processor_inst = self.make_record_processor(
users=[],
companies=[{'company_name': 'SuperCompany',
'domains': ['super.no']}])
updated = list(record_processor_inst.update(commit_generator,
release_index))
self.assertEqual(0, len(updated))
# mail processing
def test_process_mail(self):