Add online-data-migration DB commands
Placement uses alembic to manage the DB version for schema changes. However, changes with data manupulation should be separated from the schema changes since the table can be locked and in the worst case it breaks the service for backward incompatible changes. We could handle them as a task that is done in a service down time. However, to minimize the down time, it is better to have the concepts of online data migration which has been a traditional way to handle those data manipulation changes in nova. This patch adds online data migration command to placement to enable operators to manipulate DB data while the service is running: placement-manage db online_data_migrations [--max-count] where --max-count controls the maximum number of objects to migrate in a given call. If not specified, migration will occur in batches of 50 until fully complete. Change-Id: I9cef6829513d9a54d110426baf6bcc312554e3e7
This commit is contained in:
parent
326b5cf38c
commit
80fa50187a
@ -70,3 +70,41 @@ Placement Database
|
||||
Stamp the revision table with the given revision; don’t run any migrations.
|
||||
This can be used when the database already exists and you want to bring it
|
||||
under alembic control.
|
||||
|
||||
``placement-manage db online_data_migrations [--max-count]``
|
||||
Perform data migration to update all live data.
|
||||
|
||||
``--max-count`` controls the maximum number of objects to migrate in a given
|
||||
call. If not specified, migration will occur in batches of 50 until fully
|
||||
complete.
|
||||
|
||||
Returns exit code 0 if no (further) updates are possible, 1 if the
|
||||
``--max-count`` option was used and some updates were completed successfully
|
||||
(even if others generated errors), 2 if some updates generated errors and no
|
||||
other migrations were able to take effect in the last batch attempted, or
|
||||
127 if invalid input is provided (e.g. non-numeric max-count).
|
||||
|
||||
This command should be called after upgrading database schema and placement
|
||||
services on all controller nodes. If it exits with partial updates (exit
|
||||
status 1) it should be called again, even if some updates initially
|
||||
generated errors, because some updates may depend on others having
|
||||
completed. If it exits with status 2, intervention is required to resolve
|
||||
the issue causing remaining updates to fail. It should be considered
|
||||
successfully completed only when the exit status is 0.
|
||||
|
||||
For example::
|
||||
|
||||
$ placement-manage db online_data_migrations
|
||||
Running batches of 50 until complete
|
||||
2 rows matched query create_incomplete_consumers, 2 migrated
|
||||
+---------------------------------------------+-------------+-----------+
|
||||
| Migration | Total Found | Completed |
|
||||
+---------------------------------------------+-------------+-----------+
|
||||
| create_incomplete_consumers | 2 | 2 |
|
||||
+---------------------------------------------+-------------+-----------+
|
||||
|
||||
In the above example, the ``create_incomplete_consumers`` migration
|
||||
found two candidate records which required a data migration. Since
|
||||
``--max-count`` defaults to 50 and only two records were migrated with no
|
||||
more candidates remaining, the command completed successfully with exit
|
||||
code 0.
|
||||
|
@ -10,19 +10,41 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import functools
|
||||
import prettytable
|
||||
import six
|
||||
import sys
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import pbr.version
|
||||
|
||||
from placement import conf
|
||||
from placement import context
|
||||
from placement.db.sqlalchemy import migration
|
||||
from placement import db_api
|
||||
from placement.i18n import _
|
||||
|
||||
version_info = pbr.version.VersionInfo('openstack-placement')
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
online_migrations = (
|
||||
# These functions are called with a DB context and a count, which is the
|
||||
# maximum batch size requested by the user. They must be idempotent.
|
||||
# At most $count records should be migrated. The function must return a
|
||||
# tuple of (found, done). The found value indicates how many
|
||||
# unmigrated/candidate records existed in the database prior to the
|
||||
# migration (either total, or up to the $count limit provided), and a
|
||||
# nonzero found value may tell the user that there is still work to do.
|
||||
# The done value indicates whether or not any records were actually
|
||||
# migrated by the function. Thus if both (found, done) are nonzero, work
|
||||
# was done and some work remains. If found is nonzero and done is zero,
|
||||
# some records are not migratable, but all migrations that can complete
|
||||
# have finished.
|
||||
|
||||
# Added in Stein
|
||||
)
|
||||
|
||||
|
||||
class DbCommands(object):
|
||||
@ -42,6 +64,101 @@ class DbCommands(object):
|
||||
migration.stamp(self.config.command.version)
|
||||
return 0
|
||||
|
||||
def db_online_data_migrations(self):
|
||||
"""Processes online data migration.
|
||||
|
||||
:returns: 0 if no (further) updates are possible, 1 if the
|
||||
``--max-count`` option was used and some updates were
|
||||
completed successfully (even if others generated errors),
|
||||
2 if some updates generated errors and no other migrations
|
||||
were able to take effect in the last batch attempted, or
|
||||
127 if invalid input is provided.
|
||||
"""
|
||||
max_count = self.config.command.max_count
|
||||
if max_count is not None:
|
||||
try:
|
||||
max_count = int(max_count)
|
||||
except ValueError:
|
||||
max_count = -1
|
||||
if max_count < 1:
|
||||
print(_('Must supply a positive value for max_count'))
|
||||
return 127
|
||||
limited = True
|
||||
else:
|
||||
max_count = 50
|
||||
limited = False
|
||||
print(_('Running batches of %i until complete') % max_count)
|
||||
|
||||
ran = None
|
||||
migration_info = collections.OrderedDict()
|
||||
exceptions = False
|
||||
while ran is None or ran != 0:
|
||||
migrations, exceptions = self._run_online_migration(max_count)
|
||||
ran = 0
|
||||
# For each batch of migration method results, build the cumulative
|
||||
# set of results.
|
||||
for name in migrations:
|
||||
migration_info.setdefault(name, (0, 0))
|
||||
migration_info[name] = (
|
||||
migration_info[name][0] + migrations[name][0],
|
||||
migration_info[name][1] + migrations[name][1],
|
||||
)
|
||||
ran += migrations[name][1]
|
||||
if limited:
|
||||
break
|
||||
|
||||
t = prettytable.PrettyTable(
|
||||
[_('Migration'), _('Total Found'), _('Completed')])
|
||||
for name, info in migration_info.items():
|
||||
t.add_row([name, info[0], info[1]])
|
||||
print(t)
|
||||
|
||||
# NOTE(tetsuro): In "limited" case, if some update has been "ran",
|
||||
# exceptions are not considered fatal because work may still remain
|
||||
# to be done, and that work may resolve dependencies for the failing
|
||||
# migrations.
|
||||
if exceptions and not (limited and ran):
|
||||
print(_("Some migrations failed unexpectedly. Check log for "
|
||||
"details."))
|
||||
return 2
|
||||
|
||||
# TODO(mriedem): Potentially add another return code for
|
||||
# "there are more migrations, but not completable right now"
|
||||
return ran and 1 or 0
|
||||
|
||||
def _run_online_migration(self, max_count):
|
||||
ctxt = context.RequestContext(config=self.config)
|
||||
ran = 0
|
||||
exceptions = False
|
||||
migrations = collections.OrderedDict()
|
||||
for migration_meth in online_migrations:
|
||||
count = max_count - ran
|
||||
try:
|
||||
found, done = migration_meth(ctxt, count)
|
||||
except Exception:
|
||||
msg = (_("Error attempting to run %(method)s") % dict(
|
||||
method=migration_meth))
|
||||
print(msg)
|
||||
LOG.exception(msg)
|
||||
exceptions = True
|
||||
found = done = 0
|
||||
|
||||
name = migration_meth.__name__
|
||||
if found:
|
||||
print(_('%(total)i rows matched query %(meth)s, %(done)i '
|
||||
'migrated') % {'total': found,
|
||||
'meth': name,
|
||||
'done': done})
|
||||
# This is the per-migration method result for this batch, and
|
||||
# _run_online_migration will either continue on to the next
|
||||
# migration, or stop if up to this point we've processed max_count
|
||||
# of records across all migration methods.
|
||||
migrations[name] = found, done
|
||||
ran += done
|
||||
if ran >= max_count:
|
||||
break
|
||||
return migrations, exceptions
|
||||
|
||||
|
||||
def add_db_command_parsers(subparsers, config):
|
||||
command_object = DbCommands(config)
|
||||
@ -70,6 +187,15 @@ def add_db_command_parsers(subparsers, config):
|
||||
stamp_parser.add_argument('version', help=_('the version to stamp'))
|
||||
stamp_parser.set_defaults(func=command_object.db_stamp)
|
||||
|
||||
help = _('Run the online data migrations.')
|
||||
online_dm_parser = db_parser.add_parser(
|
||||
'online_data_migrations', help=help, description=help)
|
||||
online_dm_parser.add_argument(
|
||||
'--max-count', metavar='<number>',
|
||||
help='Maximum number of objects to consider')
|
||||
online_dm_parser.set_defaults(
|
||||
func=command_object.db_online_data_migrations)
|
||||
|
||||
|
||||
def setup_commands(config):
|
||||
# This is a separate method because it facilitates unit testing.
|
||||
|
@ -21,7 +21,7 @@ from placement import policy
|
||||
class RequestContext(context.RequestContext):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.config = None
|
||||
self.config = kwargs.pop('config', None)
|
||||
super(RequestContext, self).__init__(*args, **kwargs)
|
||||
|
||||
def can(self, action, target=None, fatal=True):
|
||||
|
@ -51,6 +51,8 @@ class TestCommandParsers(testtools.TestCase):
|
||||
('db_version', ['db', 'version']),
|
||||
('db_sync', ['db', 'sync']),
|
||||
('db_stamp', ['db', 'stamp', 'b4ed3a175331']),
|
||||
('db_online_data_migrations',
|
||||
['db', 'online_data_migrations']),
|
||||
]:
|
||||
with mock.patch('placement.cmd.manage.DbCommands.'
|
||||
+ command) as mock_command:
|
||||
@ -101,6 +103,125 @@ class TestCommandParsers(testtools.TestCase):
|
||||
self.output.stderr.seek(0)
|
||||
|
||||
if six.PY2:
|
||||
self.assertIn('{sync,version,stamp}', self.output.stderr.read())
|
||||
self.assertIn('{sync,version,stamp,online_data_migrations}',
|
||||
self.output.stderr.read())
|
||||
else:
|
||||
self.assertIn('{sync,version,stamp}', self.output.stdout.read())
|
||||
self.assertIn('{sync,version,stamp,online_data_migrations}',
|
||||
self.output.stdout.read())
|
||||
|
||||
|
||||
class TestDBCommands(testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestDBCommands, self).setUp()
|
||||
self.conf = cfg.ConfigOpts()
|
||||
conf_fixture = config_fixture.Config(self.conf)
|
||||
self.useFixture(conf_fixture)
|
||||
conf.register_opts(conf_fixture.conf)
|
||||
conf_fixture.config(group="placement_database", connection='sqlite://')
|
||||
command_opts = manage.setup_commands(conf_fixture)
|
||||
conf_fixture.register_cli_opts(command_opts)
|
||||
self.output = self.useFixture(
|
||||
output.CaptureOutput(do_stderr=True, do_stdout=True))
|
||||
|
||||
def _command_setup(self, max_count=None):
|
||||
command_list = ["db", "online_data_migrations"]
|
||||
if max_count is not None:
|
||||
command_list.extend(["--max-count", str(max_count)])
|
||||
self.conf(command_list,
|
||||
project='placement',
|
||||
default_config_files=None)
|
||||
return manage.DbCommands(self.conf)
|
||||
|
||||
def test_online_migrations(self):
|
||||
# Mock two online migrations
|
||||
mock_mig1 = mock.MagicMock(__name__="mock_mig_1")
|
||||
mock_mig2 = mock.MagicMock(__name__="mock_mig_2")
|
||||
mock_mig1.side_effect = [(10, 10), (0, 0)]
|
||||
mock_mig2.side_effect = [(15, 15), (0, 0)]
|
||||
mock_migrations = (mock_mig1, mock_mig2)
|
||||
|
||||
with mock.patch('placement.cmd.manage.online_migrations',
|
||||
new=mock_migrations):
|
||||
commands = self._command_setup()
|
||||
commands.db_online_data_migrations()
|
||||
expected = '''\
|
||||
Running batches of 50 until complete
|
||||
10 rows matched query mock_mig_1, 10 migrated
|
||||
15 rows matched query mock_mig_2, 15 migrated
|
||||
+------------+-------------+-----------+
|
||||
| Migration | Total Found | Completed |
|
||||
+------------+-------------+-----------+
|
||||
| mock_mig_1 | 10 | 10 |
|
||||
| mock_mig_2 | 15 | 15 |
|
||||
+------------+-------------+-----------+
|
||||
'''
|
||||
self.output.stdout.seek(0)
|
||||
self.assertEqual(expected, self.output.stdout.read())
|
||||
|
||||
def test_online_migrations_error(self):
|
||||
good_remaining = [50]
|
||||
|
||||
def good_migration(context, count):
|
||||
found = good_remaining[0]
|
||||
done = min(found, count)
|
||||
good_remaining[0] -= done
|
||||
return found, done
|
||||
|
||||
bad_migration = mock.MagicMock()
|
||||
bad_migration.side_effect = Exception("Mock Exception")
|
||||
bad_migration.__name__ = 'bad'
|
||||
|
||||
mock_migrations = (bad_migration, good_migration)
|
||||
|
||||
with mock.patch('placement.cmd.manage.online_migrations',
|
||||
new=mock_migrations):
|
||||
|
||||
# bad_migration raises an exception, but it could be because
|
||||
# good_migration had not completed yet. We should get 1 in this
|
||||
# case, because some work was done, and the command should be
|
||||
# reiterated.
|
||||
commands = self._command_setup(max_count=50)
|
||||
self.assertEqual(1, commands.db_online_data_migrations())
|
||||
|
||||
# When running this for the second time, there's no work left for
|
||||
# good_migration to do, but bad_migration still fails - should
|
||||
# get 2 this time.
|
||||
self.assertEqual(2, commands.db_online_data_migrations())
|
||||
|
||||
# When --max-count is not used, we should get 2 if all possible
|
||||
# migrations completed but some raise exceptions
|
||||
commands = self._command_setup()
|
||||
good_remaining = [125]
|
||||
self.assertEqual(2, commands.db_online_data_migrations())
|
||||
|
||||
def test_online_migrations_bad_max(self):
|
||||
commands = self._command_setup(max_count=-2)
|
||||
self.assertEqual(127, commands.db_online_data_migrations())
|
||||
|
||||
commands = self._command_setup(max_count="a")
|
||||
self.assertEqual(127, commands.db_online_data_migrations())
|
||||
|
||||
commands = self._command_setup(max_count=0)
|
||||
self.assertEqual(127, commands.db_online_data_migrations())
|
||||
|
||||
def test_online_migrations_no_max(self):
|
||||
with mock.patch('placement.cmd.manage.DbCommands.'
|
||||
'_run_online_migration') as rm:
|
||||
rm.return_value = {}, False
|
||||
commands = self._command_setup()
|
||||
self.assertEqual(0, commands.db_online_data_migrations())
|
||||
|
||||
def test_online_migrations_finished(self):
|
||||
with mock.patch('placement.cmd.manage.DbCommands.'
|
||||
'_run_online_migration') as rm:
|
||||
rm.return_value = {}, False
|
||||
commands = self._command_setup(max_count=5)
|
||||
self.assertEqual(0, commands.db_online_data_migrations())
|
||||
|
||||
def test_online_migrations_not_finished(self):
|
||||
with mock.patch('placement.cmd.manage.DbCommands.'
|
||||
'_run_online_migration') as rm:
|
||||
rm.return_value = {'mig': (10, 5)}, False
|
||||
commands = self._command_setup(max_count=5)
|
||||
self.assertEqual(1, commands.db_online_data_migrations())
|
||||
|
Loading…
x
Reference in New Issue
Block a user