From ca19abf311cd96e74c5ce08a824097f563725966 Mon Sep 17 00:00:00 2001 From: Endre Karlson Date: Fri, 19 Jun 2015 19:23:29 +0200 Subject: [PATCH] Add shard and domain_shard to tables This change adds shard to domains and domain_shard to records / recordsets as a prerequisite for the designate-zm service. Change-Id: Ieb6eee611bf7b87fef79e3a108294bc0769cd685 --- designate/objects/domain.py | 7 ++ designate/objects/record.py | 7 ++ designate/objects/recordset.py | 7 ++ .../versions/068_add_shard_column.py | 101 ++++++++++++++++++ designate/storage/impl_sqlalchemy/tables.py | 17 ++- 5 files changed, 136 insertions(+), 3 deletions(-) create mode 100644 designate/storage/impl_sqlalchemy/migrate_repo/versions/068_add_shard_column.py diff --git a/designate/objects/domain.py b/designate/objects/domain.py index 72a8130bc..e8bc4a34e 100644 --- a/designate/objects/domain.py +++ b/designate/objects/domain.py @@ -24,6 +24,13 @@ from designate.objects.domain_attribute import DomainAttributeList class Domain(base.DictObjectMixin, base.SoftDeleteObjectMixin, base.PersistentObjectMixin, base.DesignateObject): FIELDS = { + 'shard': { + 'schema': { + 'type': 'integer', + 'minimum': 0, + 'maximum': 4095 + } + }, 'tenant_id': { 'schema': { 'type': 'string', diff --git a/designate/objects/record.py b/designate/objects/record.py index c26e8008f..094a64812 100644 --- a/designate/objects/record.py +++ b/designate/objects/record.py @@ -20,6 +20,13 @@ class Record(base.DictObjectMixin, base.PersistentObjectMixin, # TODO(kiall): `hash` is an implementation detail of our SQLA driver, # so we should remove it. FIELDS = { + 'shard': { + 'schema': { + 'type': 'integer', + 'minimum': 0, + 'maximum': 4095 + } + }, 'data': {}, 'domain_id': { 'schema': { diff --git a/designate/objects/recordset.py b/designate/objects/recordset.py index 1b6bbc7d6..b38afda67 100644 --- a/designate/objects/recordset.py +++ b/designate/objects/recordset.py @@ -68,6 +68,13 @@ class RecordSet(base.DictObjectMixin, base.PersistentObjectMixin, return status FIELDS = { + 'shard': { + 'schema': { + 'type': 'integer', + 'minimum': 0, + 'maximum': 4095 + } + }, 'tenant_id': { 'schema': { 'type': 'string', diff --git a/designate/storage/impl_sqlalchemy/migrate_repo/versions/068_add_shard_column.py b/designate/storage/impl_sqlalchemy/migrate_repo/versions/068_add_shard_column.py new file mode 100644 index 000000000..0cef1b00e --- /dev/null +++ b/designate/storage/impl_sqlalchemy/migrate_repo/versions/068_add_shard_column.py @@ -0,0 +1,101 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Endre Karlson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_log import log as logging +from sqlalchemy import Column, MetaData, Table, SmallInteger +from sqlalchemy.sql import select, update + +from designate import i18n + +LOG = logging.getLogger(__name__) + +meta = MetaData() + + +def _add_shards(engine, table, dst_col, src_col): + dialect = engine.url.get_dialect().name + if dialect.startswith('mysql'): + sql = "UPDATE %s SET %s = CONV(SUBSTR(%s, 1, 3), 16, 10)" + engine.execute(sql % (table.name, dst_col.name, src_col.name)) + elif dialect.startswith('postgres'): + sql = "UPDATE %s SET %s = ('x'||lpad(substr(%s::text, 1, 3), 8, '0')"\ + ")::bit(32)::int" + engine.execute(sql % (table.name, dst_col.name, src_col.name)) + else: + rows = select(columns=[table.c.id]).execute().fetchall() + for r in rows: + shard = int(r.id[0:3], 16) + values = {dst_col.name: shard} + update(table).where(table.id == r.id).values(values) + + +def upgrade(migrate_engine): + meta.bind = migrate_engine + + domains_table = Table('domains', meta, autoload=True) + recordsets_table = Table('recordsets', meta, autoload=True) + records_table = Table('records', meta, autoload=True) + + domains_shard_col = Column('shard', SmallInteger(), nullable=True) + domains_shard_col.create(domains_table) + + recordset_domain_shard_col = Column('domain_shard', SmallInteger(), + nullable=True) + recordset_domain_shard_col.create(recordsets_table) + + records_domain_shard_col = Column('domain_shard', SmallInteger(), + nullable=True) + records_domain_shard_col.create(records_table) + + def _set_default(): + _add_shards( + migrate_engine, + domains_table, + domains_shard_col, + domains_table.c.id) + _add_shards( + migrate_engine, + recordsets_table, + recordset_domain_shard_col, + recordsets_table.c.domain_id) + _add_shards( + migrate_engine, + records_table, + records_domain_shard_col, + records_table.c.domain_id) + + def _set_nullable(): + domains_table.c.shard.alter(nullable=False) + recordsets_table.c.domain_shard.alter(nullable=False) + records_table.c.domain_shard.alter(nullable=False) + + for i in range(0, 5): + try: + _set_default() + _set_nullable() + except Exception as e: + # The population default & enforcement of nullable=False failed, + # try again + msg = i18n._LW( + "Updating migration for sharding failed, retrying.") + LOG.warn(msg) + if i >= 4: + # Raise if we've reached max attempts causing migration to + # fail + raise e + else: + continue + # It was successful, no exception so we break the loop. + break diff --git a/designate/storage/impl_sqlalchemy/tables.py b/designate/storage/impl_sqlalchemy/tables.py index e62605f86..48c1e549d 100644 --- a/designate/storage/impl_sqlalchemy/tables.py +++ b/designate/storage/impl_sqlalchemy/tables.py @@ -13,9 +13,9 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from sqlalchemy import (Table, MetaData, Column, String, Text, Integer, CHAR, - DateTime, Enum, Boolean, Unicode, UniqueConstraint, - ForeignKeyConstraint) +from sqlalchemy import (Table, MetaData, Column, String, Text, Integer, + SmallInteger, CHAR, DateTime, Enum, Boolean, Unicode, + UniqueConstraint, ForeignKeyConstraint) from oslo_config import cfg from oslo_utils import timeutils @@ -44,6 +44,11 @@ ZONE_TASK_TYPES = ['IMPORT'] metadata = MetaData() + +def default_shard(context, id_col): + return int(context.current_parameters[id_col][0:3], 16) + + quotas = Table('quotas', metadata, Column('id', UUID, default=utils.generate_uuid, primary_key=True), Column('version', Integer(), default=1, nullable=False), @@ -80,6 +85,8 @@ domains = Table('domains', metadata, Column('deleted', CHAR(32), nullable=False, default='0', server_default='0'), Column('deleted_at', DateTime, nullable=True, default=None), + Column('shard', SmallInteger(), nullable=False, + default=lambda ctxt: default_shard(ctxt, 'id')), Column('tenant_id', String(36), default=None, nullable=True), Column('name', String(255), nullable=False), @@ -134,6 +141,8 @@ recordsets = Table('recordsets', metadata, Column('version', Integer(), default=1, nullable=False), Column('created_at', DateTime, default=lambda: timeutils.utcnow()), Column('updated_at', DateTime, onupdate=lambda: timeutils.utcnow()), + Column('domain_shard', SmallInteger(), nullable=False, + default=lambda ctxt: default_shard(ctxt, 'domain_id')), Column('tenant_id', String(36), default=None, nullable=True), Column('domain_id', UUID, nullable=False), @@ -155,6 +164,8 @@ records = Table('records', metadata, Column('version', Integer(), default=1, nullable=False), Column('created_at', DateTime, default=lambda: timeutils.utcnow()), Column('updated_at', DateTime, onupdate=lambda: timeutils.utcnow()), + Column('domain_shard', SmallInteger(), nullable=False, + default=lambda ctxt: default_shard(ctxt, 'domain_id')), Column('tenant_id', String(36), default=None, nullable=True), Column('domain_id', UUID, nullable=False),