Users are stored individually, not in a single collection
Users were stored in collection 'users', that collection was updated from default data and after the whole update process. In case of failure the collection misses all changes. The issue is fixed by storing all users individually by their user_id, email and launchpad_id. Fixes bug 1242588 Change-Id: Ie3d57ec7f3f18c4259dbd34cc8f29be45b06e2cd
This commit is contained in:
@@ -29,8 +29,6 @@ class RecordProcessor(object):
|
|||||||
|
|
||||||
self.domains_index = runtime_storage_inst.get_by_key('companies')
|
self.domains_index = runtime_storage_inst.get_by_key('companies')
|
||||||
|
|
||||||
self.users_index = runtime_storage_inst.get_by_key('users')
|
|
||||||
|
|
||||||
self.releases = runtime_storage_inst.get_by_key('releases')
|
self.releases = runtime_storage_inst.get_by_key('releases')
|
||||||
self.releases_dates = [r['end_date'] for r in self.releases]
|
self.releases_dates = [r['end_date'] for r in self.releases]
|
||||||
|
|
||||||
@@ -142,18 +140,19 @@ class RecordProcessor(object):
|
|||||||
def update_user(self, record):
|
def update_user(self, record):
|
||||||
email = record.get('author_email')
|
email = record.get('author_email')
|
||||||
|
|
||||||
if email in self.users_index:
|
user = utils.load_user(self.runtime_storage_inst, email)
|
||||||
user = self.users_index[email]
|
if not user:
|
||||||
else:
|
|
||||||
if record.get('launchpad_id'):
|
if record.get('launchpad_id'):
|
||||||
launchpad_id = record.get('launchpad_id')
|
launchpad_id = record.get('launchpad_id')
|
||||||
user_name = record.get('author_name')
|
user_name = record.get('author_name')
|
||||||
else:
|
else:
|
||||||
launchpad_id, user_name = self._get_lp_info(email)
|
launchpad_id, user_name = self._get_lp_info(email)
|
||||||
|
|
||||||
if (launchpad_id) and (launchpad_id in self.users_index):
|
if launchpad_id:
|
||||||
|
user = utils.load_user(self.runtime_storage_inst, launchpad_id)
|
||||||
|
|
||||||
|
if user:
|
||||||
# merge emails
|
# merge emails
|
||||||
user = self.users_index[launchpad_id]
|
|
||||||
if email:
|
if email:
|
||||||
self._update_user_profile(user, email)
|
self._update_user_profile(user, email)
|
||||||
else:
|
else:
|
||||||
@@ -165,10 +164,6 @@ class RecordProcessor(object):
|
|||||||
user = self._create_user(launchpad_id, email, user_name)
|
user = self._create_user(launchpad_id, email, user_name)
|
||||||
|
|
||||||
utils.store_user(self.runtime_storage_inst, user)
|
utils.store_user(self.runtime_storage_inst, user)
|
||||||
if email:
|
|
||||||
self.users_index[email] = user
|
|
||||||
if user['launchpad_id']:
|
|
||||||
self.users_index[user['launchpad_id']] = user
|
|
||||||
|
|
||||||
return user
|
return user
|
||||||
|
|
||||||
@@ -198,7 +193,7 @@ class RecordProcessor(object):
|
|||||||
yield record
|
yield record
|
||||||
|
|
||||||
def _spawn_review(self, record):
|
def _spawn_review(self, record):
|
||||||
# copy everything except pathsets and flatten user data
|
# copy everything except patchsets and flatten user data
|
||||||
review = dict([(k, v) for k, v in record.iteritems()
|
review = dict([(k, v) for k, v in record.iteritems()
|
||||||
if k not in ['patchSets', 'owner', 'createdOn']])
|
if k not in ['patchSets', 'owner', 'createdOn']])
|
||||||
owner = record['owner']
|
owner = record['owner']
|
||||||
@@ -340,8 +335,6 @@ class RecordProcessor(object):
|
|||||||
|
|
||||||
yield r
|
yield r
|
||||||
|
|
||||||
self.runtime_storage_inst.set_by_key('users', self.users_index)
|
|
||||||
|
|
||||||
def update(self, record_iterator, release_index):
|
def update(self, record_iterator, release_index):
|
||||||
for record in record_iterator:
|
for record in record_iterator:
|
||||||
need_update = False
|
need_update = False
|
||||||
@@ -369,8 +362,6 @@ class RecordProcessor(object):
|
|||||||
if need_update:
|
if need_update:
|
||||||
yield record
|
yield record
|
||||||
|
|
||||||
self.runtime_storage_inst.set_by_key('users', self.users_index)
|
|
||||||
|
|
||||||
def _get_records_for_users_to_update(self):
|
def _get_records_for_users_to_update(self):
|
||||||
users_reviews = {}
|
users_reviews = {}
|
||||||
valid_blueprints = {}
|
valid_blueprints = {}
|
||||||
@@ -424,7 +415,7 @@ class RecordProcessor(object):
|
|||||||
|
|
||||||
user_id = record['user_id']
|
user_id = record['user_id']
|
||||||
if user_id in self.updated_users:
|
if user_id in self.updated_users:
|
||||||
user = self.users_index[user_id]
|
user = utils.load_user(self.runtime_storage_inst, user_id)
|
||||||
user_company_name = user['companies'][0]['company_name']
|
user_company_name = user['companies'][0]['company_name']
|
||||||
if record['company_name'] != user_company_name:
|
if record['company_name'] != user_company_name:
|
||||||
LOG.debug('Update record %s: company changed to: %s',
|
LOG.debug('Update record %s: company changed to: %s',
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ class RuntimeStorage(object):
|
|||||||
def get_by_key(self, key):
|
def get_by_key(self, key):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def set_by_key(self, key, head_commit_id):
|
def set_by_key(self, key, value):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def get_update(self, pid):
|
def get_update(self, pid):
|
||||||
|
|||||||
@@ -82,6 +82,10 @@ def make_range(start, stop, step):
|
|||||||
|
|
||||||
def store_user(runtime_storage_inst, user):
|
def store_user(runtime_storage_inst, user):
|
||||||
runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user)
|
runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user)
|
||||||
|
if user.get('launchpad_id'):
|
||||||
|
runtime_storage_inst.set_by_key('user:%s' % user['launchpad_id'], user)
|
||||||
|
for email in user.get('emails') or []:
|
||||||
|
runtime_storage_inst.set_by_key('user:%s' % email, user)
|
||||||
|
|
||||||
|
|
||||||
def load_user(runtime_storage_inst, user_id):
|
def load_user(runtime_storage_inst, user_id):
|
||||||
|
|||||||
@@ -202,8 +202,8 @@ class TestRecordProcessor(testtools.TestCase):
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.assertRecordsMatch(expected_commit, processed_commit)
|
self.assertRecordsMatch(expected_commit, processed_commit)
|
||||||
self.assertIn('johndoe@ibm.com',
|
self.assertIn('johndoe@ibm.com', utils.load_user(
|
||||||
record_processor_inst.users_index['john_doe']['emails'])
|
record_processor_inst.runtime_storage_inst, 'john_doe')['emails'])
|
||||||
|
|
||||||
def test_process_commit_existing_user_new_email_unknown_company(self):
|
def test_process_commit_existing_user_new_email_unknown_company(self):
|
||||||
# User is known to LP, but his email is new to us. Should match
|
# User is known to LP, but his email is new to us. Should match
|
||||||
@@ -232,8 +232,8 @@ class TestRecordProcessor(testtools.TestCase):
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.assertRecordsMatch(expected_commit, processed_commit)
|
self.assertRecordsMatch(expected_commit, processed_commit)
|
||||||
self.assertIn('johndoe@gmail.com',
|
self.assertIn('johndoe@gmail.com', utils.load_user(
|
||||||
record_processor_inst.users_index['john_doe']['emails'])
|
record_processor_inst.runtime_storage_inst, 'john_doe')['emails'])
|
||||||
|
|
||||||
def test_process_commit_existing_user_new_email_known_company_update(self):
|
def test_process_commit_existing_user_new_email_known_company_update(self):
|
||||||
record_processor_inst = self.make_record_processor(
|
record_processor_inst = self.make_record_processor(
|
||||||
@@ -261,7 +261,8 @@ class TestRecordProcessor(testtools.TestCase):
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.assertRecordsMatch(expected_commit, processed_commit)
|
self.assertRecordsMatch(expected_commit, processed_commit)
|
||||||
user = record_processor_inst.users_index['john_doe']
|
user = utils.load_user(
|
||||||
|
record_processor_inst.runtime_storage_inst, 'john_doe')
|
||||||
self.assertIn('johndoe@gmail.com', user['emails'])
|
self.assertIn('johndoe@gmail.com', user['emails'])
|
||||||
self.assertEquals('IBM', user['companies'][0]['company_name'],
|
self.assertEquals('IBM', user['companies'][0]['company_name'],
|
||||||
message='User affiliation should be updated')
|
message='User affiliation should be updated')
|
||||||
@@ -286,7 +287,8 @@ class TestRecordProcessor(testtools.TestCase):
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.assertRecordsMatch(expected_commit, processed_commit)
|
self.assertRecordsMatch(expected_commit, processed_commit)
|
||||||
user = record_processor_inst.users_index['john_doe']
|
user = utils.load_user(
|
||||||
|
record_processor_inst.runtime_storage_inst, 'john_doe')
|
||||||
self.assertIn('johndoe@ibm.com', user['emails'])
|
self.assertIn('johndoe@ibm.com', user['emails'])
|
||||||
self.assertEquals('IBM', user['companies'][0]['company_name'])
|
self.assertEquals('IBM', user['companies'][0]['company_name'])
|
||||||
|
|
||||||
@@ -308,8 +310,8 @@ class TestRecordProcessor(testtools.TestCase):
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.assertRecordsMatch(expected_commit, processed_commit)
|
self.assertRecordsMatch(expected_commit, processed_commit)
|
||||||
self.assertEquals(1, len(record_processor_inst.users_index))
|
user = utils.load_user(
|
||||||
user = record_processor_inst.users_index['johndoe@ibm.com']
|
record_processor_inst.runtime_storage_inst, 'johndoe@ibm.com')
|
||||||
self.assertIn('johndoe@ibm.com', user['emails'])
|
self.assertIn('johndoe@ibm.com', user['emails'])
|
||||||
self.assertEquals('IBM', user['companies'][0]['company_name'])
|
self.assertEquals('IBM', user['companies'][0]['company_name'])
|
||||||
self.assertEquals(None, user['launchpad_id'])
|
self.assertEquals(None, user['launchpad_id'])
|
||||||
@@ -338,8 +340,8 @@ class TestRecordProcessor(testtools.TestCase):
|
|||||||
'company_name': '*independent'},
|
'company_name': '*independent'},
|
||||||
processed_records[0])
|
processed_records[0])
|
||||||
|
|
||||||
self.assertEquals(1, len(record_processor_inst.users_index))
|
user = utils.load_user(
|
||||||
user = record_processor_inst.users_index['john_doe']
|
record_processor_inst.runtime_storage_inst, 'john_doe')
|
||||||
self.assertEquals({
|
self.assertEquals({
|
||||||
'user_id': 'john_doe',
|
'user_id': 'john_doe',
|
||||||
'launchpad_id': 'john_doe',
|
'launchpad_id': 'john_doe',
|
||||||
@@ -372,8 +374,8 @@ class TestRecordProcessor(testtools.TestCase):
|
|||||||
'company_name': '*independent'},
|
'company_name': '*independent'},
|
||||||
processed_records[0])
|
processed_records[0])
|
||||||
|
|
||||||
self.assertEquals(1, len(record_processor_inst.users_index))
|
user = utils.load_user(
|
||||||
user = record_processor_inst.users_index['john_doe']
|
record_processor_inst.runtime_storage_inst, 'john_doe')
|
||||||
self.assertEquals({
|
self.assertEquals({
|
||||||
'user_id': 'john_doe',
|
'user_id': 'john_doe',
|
||||||
'launchpad_id': 'john_doe',
|
'launchpad_id': 'john_doe',
|
||||||
@@ -423,8 +425,10 @@ class TestRecordProcessor(testtools.TestCase):
|
|||||||
'user_name': 'John Doe',
|
'user_name': 'John Doe',
|
||||||
'emails': ['john_doe@gmail.com'],
|
'emails': ['john_doe@gmail.com'],
|
||||||
'companies': [{'company_name': '*independent', 'end_date': 0}]}
|
'companies': [{'company_name': '*independent', 'end_date': 0}]}
|
||||||
self.assertEquals({'john_doe': user, 'john_doe@gmail.com': user},
|
self.assertEquals(user, utils.load_user(
|
||||||
record_processor_inst.users_index)
|
record_processor_inst.runtime_storage_inst, 'john_doe'))
|
||||||
|
self.assertEquals(user, utils.load_user(
|
||||||
|
record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com'))
|
||||||
|
|
||||||
def test_process_blueprint_then_commit(self):
|
def test_process_blueprint_then_commit(self):
|
||||||
record_processor_inst = self.make_record_processor(
|
record_processor_inst = self.make_record_processor(
|
||||||
@@ -469,8 +473,10 @@ class TestRecordProcessor(testtools.TestCase):
|
|||||||
'user_name': 'John Doe',
|
'user_name': 'John Doe',
|
||||||
'emails': ['john_doe@gmail.com'],
|
'emails': ['john_doe@gmail.com'],
|
||||||
'companies': [{'company_name': '*independent', 'end_date': 0}]}
|
'companies': [{'company_name': '*independent', 'end_date': 0}]}
|
||||||
self.assertEquals({'john_doe': user, 'john_doe@gmail.com': user},
|
self.assertEquals(user, utils.load_user(
|
||||||
record_processor_inst.users_index)
|
record_processor_inst.runtime_storage_inst, 'john_doe'))
|
||||||
|
self.assertEquals(user, utils.load_user(
|
||||||
|
record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com'))
|
||||||
|
|
||||||
def test_process_review_then_blueprint(self):
|
def test_process_review_then_blueprint(self):
|
||||||
record_processor_inst = self.make_record_processor(
|
record_processor_inst = self.make_record_processor(
|
||||||
@@ -513,8 +519,10 @@ class TestRecordProcessor(testtools.TestCase):
|
|||||||
'user_name': 'John Doe',
|
'user_name': 'John Doe',
|
||||||
'emails': ['john_doe@gmail.com'],
|
'emails': ['john_doe@gmail.com'],
|
||||||
'companies': [{'company_name': '*independent', 'end_date': 0}]}
|
'companies': [{'company_name': '*independent', 'end_date': 0}]}
|
||||||
self.assertEquals({'john_doe': user, 'john_doe@gmail.com': user},
|
self.assertEquals(user, utils.load_user(
|
||||||
record_processor_inst.users_index)
|
record_processor_inst.runtime_storage_inst, 'john_doe'))
|
||||||
|
self.assertEquals(user, utils.load_user(
|
||||||
|
record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com'))
|
||||||
|
|
||||||
# update records
|
# update records
|
||||||
|
|
||||||
@@ -788,22 +796,37 @@ def generate_emails(author_name='John Doe', author_email='johndoe@gmail.com',
|
|||||||
|
|
||||||
def make_runtime_storage(users=None, companies=None, releases=None,
|
def make_runtime_storage(users=None, companies=None, releases=None,
|
||||||
repos=None):
|
repos=None):
|
||||||
def get_by_key(collection):
|
runtime_storage_cache = {}
|
||||||
if collection == 'companies':
|
|
||||||
|
def get_by_key(key):
|
||||||
|
if key == 'companies':
|
||||||
return _make_companies(companies or [
|
return _make_companies(companies or [
|
||||||
{"company_name": "*independent", "domains": [""]},
|
{"company_name": "*independent", "domains": [""]},
|
||||||
])
|
])
|
||||||
elif collection == 'users':
|
elif key == 'users':
|
||||||
return _make_users(users or [])
|
return _make_users(users or [])
|
||||||
elif collection == 'releases':
|
elif key == 'releases':
|
||||||
return releases or RELEASES
|
return releases or RELEASES
|
||||||
elif collection == 'repos':
|
elif key == 'repos':
|
||||||
return repos or REPOS
|
return repos or REPOS
|
||||||
else:
|
else:
|
||||||
raise Exception('Wrong collection: %s' % collection)
|
return runtime_storage_cache.get(key)
|
||||||
|
|
||||||
|
def set_by_key(key, value):
|
||||||
|
runtime_storage_cache[key] = value
|
||||||
|
|
||||||
rs = mock.Mock(runtime_storage.RuntimeStorage)
|
rs = mock.Mock(runtime_storage.RuntimeStorage)
|
||||||
rs.get_by_key = mock.Mock(side_effect=get_by_key)
|
rs.get_by_key = mock.Mock(side_effect=get_by_key)
|
||||||
|
rs.set_by_key = mock.Mock(side_effect=set_by_key)
|
||||||
|
|
||||||
|
if users:
|
||||||
|
for user in users:
|
||||||
|
set_by_key('user:%s' % user['user_id'], user)
|
||||||
|
if user.get('launchpad_id'):
|
||||||
|
set_by_key('user:%s' % user['launchpad_id'], user)
|
||||||
|
for email in user.get('emails') or []:
|
||||||
|
set_by_key('user:%s' % email, user)
|
||||||
|
|
||||||
return rs
|
return rs
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user