Merge "Fix the migration issue for the user doesn't have a password"

This commit is contained in:
Jenkins 2016-03-01 22:09:55 +00:00 committed by Gerrit Code Review
commit 1ee14754ea
2 changed files with 61 additions and 5 deletions

View File

@ -12,6 +12,7 @@
import migrate import migrate
import sqlalchemy as sql import sqlalchemy as sql
from sqlalchemy import func
def upgrade(migrate_engine): def upgrade(migrate_engine):
@ -25,9 +26,16 @@ def upgrade(migrate_engine):
# migrate data to local_user table # migrate data to local_user table
local_user_values = [] local_user_values = []
for row in user_table.select().execute(): for row in user_table.select().execute():
local_user_values.append({'user_id': row['id'], # skip the row that already exists in `local_user`, this could
'domain_id': row['domain_id'], # happen if run into a partially-migrated table due to the
'name': row['name']}) # bug #1549705.
filter_by = local_user_table.c.user_id == row['id']
user_count = sql.select([func.count()]).select_from(
local_user_table).where(filter_by).execute().fetchone()[0]
if user_count == 0:
local_user_values.append({'user_id': row['id'],
'domain_id': row['domain_id'],
'name': row['name']})
if local_user_values: if local_user_values:
local_user_table.insert().values(local_user_values).execute() local_user_table.insert().values(local_user_values).execute()
@ -40,8 +48,9 @@ def upgrade(migrate_engine):
user_rows = sel.execute() user_rows = sel.execute()
password_values = [] password_values = []
for row in user_rows: for row in user_rows:
password_values.append({'local_user_id': row['local_user_id'], if row['user_password']:
'password': row['user_password']}) password_values.append({'local_user_id': row['local_user_id'],
'password': row['user_password']})
if password_values: if password_values:
password_table.insert().values(password_values).execute() password_table.insert().values(password_values).execute()

View File

@ -805,6 +805,53 @@ class SqlUpgradeTests(SqlMigrateBase):
password_table) password_table)
self.assertListEqual(expected_users, actual_users) self.assertListEqual(expected_users, actual_users)
def test_migrate_user_with_null_password_to_password_tables(self):
USER_TABLE_NAME = 'user'
LOCAL_USER_TABLE_NAME = 'local_user'
PASSWORD_TABLE_NAME = 'password'
self.upgrade(90)
user_ref = unit.new_user_ref(uuid.uuid4().hex)
user_ref.pop('password')
# pop extra attribute which doesn't recognized by SQL expression
# layer.
user_ref.pop('email')
session = self.Session()
self.insert_dict(session, USER_TABLE_NAME, user_ref)
self.metadata.clear()
self.upgrade(91)
# migration should be successful.
self.assertTableCountsMatch(USER_TABLE_NAME, LOCAL_USER_TABLE_NAME)
# no new entry was added to the password table because the
# user doesn't have a password.
password_table = self.select_table(PASSWORD_TABLE_NAME)
rows = session.execute(password_table.count()).scalar()
self.assertEqual(0, rows)
def test_migrate_user_skip_user_already_exist_in_local_user(self):
USER_TABLE_NAME = 'user'
LOCAL_USER_TABLE_NAME = 'local_user'
self.upgrade(90)
user1_ref = unit.new_user_ref(uuid.uuid4().hex)
# pop extra attribute which doesn't recognized by SQL expression
# layer.
user1_ref.pop('email')
user2_ref = unit.new_user_ref(uuid.uuid4().hex)
user2_ref.pop('email')
session = self.Session()
self.insert_dict(session, USER_TABLE_NAME, user1_ref)
self.insert_dict(session, USER_TABLE_NAME, user2_ref)
user_id = user1_ref.pop('id')
user_name = user1_ref.pop('name')
domain_id = user1_ref.pop('domain_id')
local_user_ref = {'user_id': user_id, 'name': user_name,
'domain_id': domain_id}
self.insert_dict(session, LOCAL_USER_TABLE_NAME, local_user_ref)
self.metadata.clear()
self.upgrade(91)
# migration should be successful and user2_ref has been migrated to
# `local_user` table.
self.assertTableCountsMatch(USER_TABLE_NAME, LOCAL_USER_TABLE_NAME)
def test_implied_roles_fk_on_delete_cascade(self): def test_implied_roles_fk_on_delete_cascade(self):
if self.engine.name == 'sqlite': if self.engine.name == 'sqlite':
self.skipTest('sqlite backend does not support foreign keys') self.skipTest('sqlite backend does not support foreign keys')