- completely refactored ColumnDelta to extract differences between columns/parameters (also fixes issue #23)

- fixed some bugs (passing server_default) on column.alter
- updated tests, specially ColumnDelta and column.alter
- introduced alter_metadata which can preserve altering existing objects if False (defaults to True)
- updated documentation
This commit is contained in:
iElectric 2009-06-27 14:13:27 +00:00
parent a8c31eb25f
commit 9f7ab96881
11 changed files with 544 additions and 308 deletions

9
TODO
View File

@ -1,7 +1,3 @@
- better MySQL support
- fix unit tests for other databases than PostgreSQL (MySQL and SQLite
fail at test_changeset.test_fk(..))
- better SQL scripts support (testing, source viewing)
make_update_script_for_model:
@ -9,9 +5,4 @@ make_update_script_for_model:
- columns are not compared?
- even if two "models" are equal, it doesn't yield so
- refactor test_shell to test_api and use TestScript for cmd line testing
- controlledschema.drop() drops whole migrate table, maybe there are some other repositories bound to it!
- document sqlite hacks (unique index for pk constraint)
- document constraints usage, document all ways then can be used, document cascade,table,columns options

View File

@ -1,7 +1,10 @@
0.5.5
-----
- alter column constructs now accept `alter_metadata` parameter. If True, it will modify Column/Table objects according to changes. Otherwise, everything will be untouched.
- complete refactoring of :class:`~migrate.changeset.schema.ColumnDelta` (fixes issue 23)
- added support for :ref:`firebird <firebird-d>`
- fixed bug when column.alter(server_default='string') was not properly set
- server_defaults passed to column.create are now issued correctly
- constraints passed to column.create are correctly interpreted (ALTER TABLE ADD CONSTRAINT is issued after ADD COLUMN)
- column.create accepts `primary_key_name`, `unique_name` and `index_name` as string value which is used as contraint name when adding a column
@ -18,6 +21,7 @@
**Backward incompatible changes**:
- python upgrade/downgrade scripts do not import migrate_engine magically, but recieve engine as the only parameter to function
- alter column does not accept `current_name` anymore, it extracts name from the old column.
0.5.4
-----

View File

@ -59,8 +59,8 @@ Dialect support
| :ref:`ALTER TABLE DROP COLUMN <column-drop>` | yes | yes | yes | yes | yes | |
| | (workaround) [#1]_ | | | | | |
+---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+-------------------------------+-------+
| :ref:`ALTER TABLE ALTER COLUMN <column-alter>` | no | yes | yes | yes | yes [#4]_ | |
| | | | | (with limitations) [#3]_ | | |
| :ref:`ALTER TABLE ALTER COLUMN <column-alter>` | yes | yes | yes | yes | yes [#4]_ | |
| | (workaround) [#1]_ | | | (with limitations) [#3]_ | | |
+---------------------------------------------------------+--------------------------+------------------------------+------------------------+---------------------------+-------------------------------+-------+
| :ref:`ALTER TABLE ADD CONSTRAINT <constraint-tutorial>` | no | yes | yes | yes | yes | |
| | | | | | | |

View File

@ -12,3 +12,5 @@ from migrate.changeset.constraint import *
sqlalchemy.schema.Table.__bases__ += (ChangesetTable, )
sqlalchemy.schema.Column.__bases__ += (ChangesetColumn, )
sqlalchemy.schema.Index.__bases__ += (ChangesetIndex, )
sqlalchemy.schema.DefaultClause.__bases__ += (ChangesetDefaultClause, )

View File

@ -45,30 +45,6 @@ class AlterTableVisitor(SchemaIterator):
self.append('\nALTER TABLE %s ' % self.preparer.format_table(table))
return table
# DEPRECATED: use plain constraints instead
#def _pk_constraint(self, table, column, status):
# """Create a primary key constraint from a table, column.
# Status: true if the constraint is being added; false if being dropped
# """
# if isinstance(column, basestring):
# column = getattr(table.c, name)
# ret = constraint.PrimaryKeyConstraint(*table.primary_key)
# if status:
# # Created PK
# ret.c.append(column)
# else:
# # Dropped PK
# names = [c.name for c in cons.c]
# index = names.index(col.name)
# del ret.c[index]
# # Allow explicit PK name assignment
# if isinstance(pk, basestring):
# ret.name = pk
# return ret
class ANSIColumnGenerator(AlterTableVisitor, SchemaGenerator):
"""Extends ansisql generator for column creation (alter table add col)"""
@ -160,10 +136,9 @@ class ANSISchemaChanger(AlterTableVisitor, SchemaGenerator):
True), index.quote)))
self.execute()
def visit_column(self, column):
def visit_column(self, delta):
"""Rename/change a column."""
# ALTER COLUMN is implemented as several ALTER statements
delta = column.delta
keys = delta.keys()
if 'type' in keys:
self._run_subvisit(delta, self._visit_column_type)
@ -182,44 +157,37 @@ class ANSISchemaChanger(AlterTableVisitor, SchemaGenerator):
col_name = delta.current_name
if start_alter:
self.start_alter_column(table, col_name)
ret = func(table, col_name, delta)
ret = func(table, delta.result_column, delta)
self.execute()
def start_alter_column(self, table, col_name):
"""Starts ALTER COLUMN"""
self.start_alter_table(table)
# TODO: use preparer.format_column
self.append("ALTER COLUMN %s " % self.preparer.quote(col_name, table.quote))
def _visit_column_nullable(self, table, col_name, delta):
def _visit_column_nullable(self, table, column, delta):
nullable = delta['nullable']
if nullable:
self.append("DROP NOT NULL")
else:
self.append("SET NOT NULL")
def _visit_column_default(self, table, col_name, delta):
server_default = delta['server_default']
# Dummy column: get_col_default_string needs a column for some
# reason
dummy = sa.Column(None, None, server_default=server_default)
default_text = self.get_column_default_string(dummy)
def _visit_column_default(self, table, column, delta):
default_text = self.get_column_default_string(column)
if default_text is not None:
self.append("SET DEFAULT %s" % default_text)
else:
self.append("DROP DEFAULT")
def _visit_column_type(self, table, col_name, delta):
def _visit_column_type(self, table, column, delta):
type_ = delta['type']
if not isinstance(type_, sa.types.AbstractType):
# It's the class itself, not an instance... make an instance
type_ = type_()
type_text = type_.dialect_impl(self.dialect).get_col_spec()
self.append("TYPE %s" % type_text)
def _visit_column_name(self, table, col_name, delta):
new_name = delta['name']
def _visit_column_name(self, table, column, delta):
self.start_alter_table(table)
col_name = self.preparer.quote(delta.current_name, table.quote)
new_name = self.preparer.format_column(delta.result_column)
self.append('RENAME COLUMN %s TO %s' % (col_name, new_name))

View File

@ -30,12 +30,13 @@ class FBSchemaChanger(ansisql.ANSISchemaChanger):
raise exceptions.NotSupportedError(
"Firebird does not support renaming tables.")
def _visit_column_name(self, table, col_name, delta):
new_name = delta['name']
def _visit_column_name(self, table, column, delta):
self.start_alter_table(table)
self.append('ALTER COLUMN %s TO %s' % ((col_name), (new_name)))
col_name = self.preparer.quote(delta.current_name, table.quote)
new_name = self.preparer.format_column(delta.result_column)
self.append('ALTER COLUMN %s TO %s' % (col_name, new_name))
def _visit_column_nullable(self, table, col_name, delta):
def _visit_column_nullable(self, table, column, delta):
"""Changing NULL is not supported"""
# TODO: http://www.firebirdfaq.org/faq103/
raise exceptions.NotSupportedError(
@ -50,6 +51,7 @@ class FBConstraintDropper(ansisql.ANSIConstraintDropper):
"""Firebird constaint dropper implementation."""
def cascade_constraint(self, constraint):
"""Cascading constraints is not supported"""
raise exceptions.NotSupportedError(
"Firebird does not support cascading constraints")

View File

@ -20,19 +20,13 @@ class MySQLColumnDropper(ansisql.ANSIColumnDropper):
class MySQLSchemaChanger(MySQLSchemaGenerator, ansisql.ANSISchemaChanger):
def visit_column(self, column):
delta = column.delta
table = column.table
colspec = self.get_column_specification(column)
if not hasattr(delta, 'result_column'):
# Mysql needs the whole column definition, not just a lone name/type
raise exceptions.NotSupportedError(
"A column object must be present in table to alter it")
def visit_column(self, delta):
table = delta.table
colspec = self.get_column_specification(delta.result_column)
old_col_name = self.preparer.quote(delta.current_name, table.quote)
self.start_alter_table(table)
old_col_name = self.preparer.quote(delta.current_name, column.quote)
self.append("CHANGE COLUMN %s " % old_col_name)
self.append(colspec)
self.execute()

View File

@ -32,27 +32,20 @@ class OracleSchemaChanger(OracleSchemaGenerator, ansisql.ANSISchemaChanger):
column.nullable = orig
return ret
def visit_column(self, column):
delta = column.delta
def visit_column(self, delta):
keys = delta.keys()
if len(set(('type', 'nullable', 'server_default')).intersection(keys)):
self._run_subvisit(delta,
self._visit_column_change,
start_alter=False)
# change name as the last action to avoid conflicts
if 'name' in keys:
self._run_subvisit(delta,
self._visit_column_name,
start_alter=False)
def _visit_column_change(self, table, col_name, delta):
if not hasattr(delta, 'result_column'):
# Oracle needs the whole column definition, not just a lone name/type
raise exceptions.NotSupportedError(
"A column object must be present in table to alter it")
if len(set(('type', 'nullable', 'server_default')).intersection(keys)):
self._run_subvisit(delta,
self._visit_column_change,
start_alter=False)
column = delta.result_column
def _visit_column_change(self, table, column, delta):
# Oracle cannot drop a default once created, but it can set it
# to null. We'll do that if default=None
# http://forums.oracle.com/forums/message.jspa?messageID=1273234#1273234

View File

@ -3,6 +3,9 @@
.. _`SQLite`: http://www.sqlite.org/
"""
from UserDict import DictMixin
from copy import copy
from sqlalchemy.databases import sqlite as sa_base
from migrate.changeset import ansisql, exceptions
@ -19,18 +22,25 @@ class SQLiteCommon(object):
class SQLiteHelper(SQLiteCommon):
def visit_column(self, column):
table = self._to_table(column.table)
def visit_column(self, delta):
if isinstance(delta, DictMixin):
column = delta.result_column
table = self._to_table(delta.table)
else:
column = delta
table = self._to_table(column.table)
table_name = self.preparer.format_table(table)
# we remove all constraints, indexes so it doesnt recreate them
ixbackup = copy(table.indexes)
consbackup = copy(table.constraints)
table.indexes = set()
table.constraints = set()
self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name)
self.execute()
insertion_string = self._modify_table(table, column)
insertion_string = self._modify_table(table, column, delta)
table.create()
self.append(insertion_string % {'table_name': table_name})
@ -38,6 +48,10 @@ class SQLiteHelper(SQLiteCommon):
self.append('DROP TABLE migration_tmp')
self.execute()
# restore indexes, constraints
table.indexes = ixbackup
table.constraints = consbackup
class SQLiteColumnGenerator(SQLiteSchemaGenerator, SQLiteCommon,
ansisql.ANSIColumnGenerator):
@ -51,7 +65,7 @@ class SQLiteColumnGenerator(SQLiteSchemaGenerator, SQLiteCommon,
class SQLiteColumnDropper(SQLiteHelper, ansisql.ANSIColumnDropper):
"""SQLite ColumnDropper"""
def _modify_table(self, table, column):
def _modify_table(self, table, column, delta):
columns = ' ,'.join(map(self.preparer.format_column, table.columns))
return 'INSERT INTO %(table_name)s SELECT ' + columns + \
' from migration_tmp'
@ -60,11 +74,8 @@ class SQLiteColumnDropper(SQLiteHelper, ansisql.ANSIColumnDropper):
class SQLiteSchemaChanger(SQLiteHelper, ansisql.ANSISchemaChanger):
"""SQLite SchemaChanger"""
def _modify_table(self, table, column):
delta = column.delta
def _modify_table(self, table, column, delta):
column = table.columns[delta.current_name]
for k, v in delta.items():
setattr(column, k, v)
return 'INSERT INTO %(table_name)s SELECT * from migration_tmp'
def visit_index(self, index):
@ -94,6 +105,7 @@ class SQLiteConstraintDropper(ansisql.ANSIColumnDropper, ansisql.ANSIConstraintC
self.execute()
# TODO: add not_supported tags for constraint dropper/generator
# TODO: technically primary key is a NOT NULL + UNIQUE constraint, should add NOT NULL to index
class SQLiteDialect(ansisql.ANSIDialect):
columngenerator = SQLiteColumnGenerator

View File

@ -1,11 +1,12 @@
"""
Schema module providing common schema operations.
"""
from UserDict import DictMixin
import sqlalchemy
from migrate.changeset.exceptions import *
from migrate.changeset.databases.visitor import (get_engine_visitor,
run_single_visitor)
from migrate.changeset.exceptions import *
__all__ = [
@ -17,13 +18,17 @@ __all__ = [
'ChangesetTable',
'ChangesetColumn',
'ChangesetIndex',
'ChangesetDefaultClause',
'ColumnDelta',
]
DEFAULT_ALTER_METADATA = True
def create_column(column, table=None, *p, **k):
"""Create a column, given the table
API to :meth:`column.create`
API to :meth:`ChangesetColumn.create`
"""
if table is not None:
return table.create_column(column, *p, **k)
@ -33,7 +38,7 @@ def create_column(column, table=None, *p, **k):
def drop_column(column, table=None, *p, **k):
"""Drop a column, given the table
API to :meth:`column.drop`
API to :meth:`ChangesetColumn.drop`
"""
if table is not None:
return table.drop_column(column, *p, **k)
@ -45,7 +50,7 @@ def rename_table(table, name, engine=None):
If Table instance is given, engine is not used.
API to :meth:`table.rename`
API to :meth:`ChangesetTable.rename`
:param table: Table to be renamed
:param name: new name
@ -64,7 +69,7 @@ def rename_index(index, name, table=None, engine=None):
If Index and Table object instances are given,
table and engine are not used.
API to :meth:`index.rename`
API to :meth:`ChangesetIndex.rename`
:param index: Index to be renamed
:param name: new name
@ -82,50 +87,25 @@ def rename_index(index, name, table=None, engine=None):
def alter_column(*p, **k):
"""Alter a column.
Parameters: column name, table name, an engine, and the properties
of that column to change
Direct API to :class:`ColumnDelta`
API to :meth:`column.alter`
:param table: Table or table name (will issue reflection)
:param engine: Will be used for reflection
:param alter_metadata: Defaults to True. It will alter changes also to objects.
"""
if len(p) and isinstance(p[0], sqlalchemy.Column):
col = p[0]
else:
col = None
k.setdefault('alter_metadata', DEFAULT_ALTER_METADATA)
if 'table' not in k:
k['table'] = col.table
if 'table' not in k and isinstance(p[0], sqlalchemy.Column):
k['table'] = p[0].table
if 'engine' not in k:
k['engine'] = k['table'].bind
engine = k['engine']
delta = _ColumnDelta(*p, **k)
delta.result_column.delta = delta
delta.result_column.table = delta.table
delta = ColumnDelta(*p, **k)
visitorcallable = get_engine_visitor(engine, 'schemachanger')
engine._run_visitor(visitorcallable, delta.result_column)
# Update column
if col is not None:
# Special case: change column key on rename, if key not
# explicit
#
# Used by SA : table.c.[key]
#
# This fails if the key was explit AND equal to the column
# name. (It changes the key name when it shouldn't.)
#
# Not much we can do about it.
if 'name' in delta.keys():
if (col.name == col.key):
newname = delta['name']
del col.table.c[col.key]
setattr(col, 'key', newname)
col.table.c[col.key] = col
# Change all other attrs
for key, val in delta.iteritems():
setattr(col, key, val)
engine._run_visitor(visitorcallable, delta)
def _to_table(table, engine=None):
@ -152,122 +132,250 @@ def _to_index(index, table=None, engine=None):
return ret
class _ColumnDelta(dict):
"""Extracts the differences between two columns/column-parameters"""
def __init__(self, *p, **k):
"""Extract ALTER-able differences from two columns.
class ColumnDelta(DictMixin, sqlalchemy.schema.SchemaItem):
"""Extracts the differences between two columns/column-parameters
May receive parameters arranged in several different ways:
* old_column_object,new_column_object,*parameters Identifies
attributes that differ between the two columns.
Parameters specified outside of either column are always
executed and override column differences.
* column_object,[current_name,]*parameters Parameters
specified are changed; table name is extracted from column
object. Name is changed to column_object.name from
current_name, if current_name is specified. If not
specified, name is unchanged.
* current_name,table,*parameters 'table' may be either an
object or a name
"""
* **current_column, new_column, \*p, \*\*kw**
Additional parameters can be specified to override column
differences.
* **current_column, \*p, \*\*kw**
Additional parameters alter current_column. Table name is extracted
from current_column object.
Name is changed to current_column.name from current_name,
if current_name is specified.
* **current_col_name, \*p, \*\*kw**
Table kw must specified.
:param table: Table at which current Column should be bound to.\
If table name is given, reflection will be used.
:type table: string or Table instance
:param alter_metadata: If True, it will apply changes to metadata.
:type alter_metadata: bool
:param metadata: If `alter_metadata` is true, \
metadata is used to reflect table names into
:type metadata: :class:`MetaData` instance
:param engine: When reflecting tables, either engine or metadata must \
be specified to acquire engine object.
:type engine: :class:`Engine` instance
:returns: :class:`ColumnDelta` instance provides interface for altered attributes to \
`result_column` through :func:`dict` alike object.
* :class:`ColumnDelta`.result_column is altered column with new attributes
* :class:`ColumnDelta`.current_name is current name of column in db
"""
# Column attributes that can be altered
diff_keys = ('name', 'type', 'primary_key', 'nullable',
'server_onupdate', 'server_default')
diffs = dict()
__visit_name__ = 'column'
def __init__(self, *p, **kw):
self.alter_metadata = kw.pop("alter_metadata", False)
self.meta = kw.pop("metadata", None)
self.engine = kw.pop("engine", None)
# Things are initialized differently depending on how many column
# parameters are given. Figure out how many and call the appropriate
# method.
if len(p) >= 1 and isinstance(p[0], sqlalchemy.Column):
# At least one column specified
if len(p) >= 2 and isinstance(p[1], sqlalchemy.Column):
# Two columns specified
func = self._init_2col
diffs = self.compare_2_columns(*p, **kw)
else:
# Exactly one column specified
func = self._init_1col
diffs = self.compare_1_column(*p, **kw)
else:
# Zero columns specified
func = self._init_0col
diffs = func(*p, **k)
self._set_diffs(diffs)
if not len(p) or not isinstance(p[0], basestring):
raise ValueError("First argument must be column name")
diffs = self.compare_parameters(*p, **kw)
# Column attributes that can be altered
diff_keys = ('name',
'type',
'nullable',
'default',
'server_default',
'primary_key',
'foreign_key')
self.apply_diffs(diffs)
@property
def table(self):
if isinstance(self._table, sqlalchemy.Table):
return self._table
def __repr__(self):
return '<ColumnDelta altermetadata=%r, %s>' % (self.alter_metadata,
super(ColumnDelta, self).__repr__())
def _init_0col(self, current_name, *p, **k):
p, k = self._init_normalize_params(p, k)
table = k.pop('table')
self.current_name = current_name
self._table = table
self.result_column = table.c.get(current_name, None)
def __getitem__(self, key):
if key not in self.keys():
raise KeyError("No such diff key, available: %s" % self.diffs )
return getattr(self.result_column, key)
def __setitem__(self, key, value):
if key not in self.keys():
raise KeyError("No such diff key, available: %s" % self.diffs )
setattr(self.result_column, key, value)
def __delitem__(self, key):
raise NotImplementedError
def keys(self):
return self.diffs.keys()
def compare_parameters(self, current_name, *p, **k):
"""Compares Column objects with reflection"""
self.table = k.pop('table')
self.result_column = self._table.c.get(current_name)
if len(p):
k = self._extract_parameters(p, k, self.result_column)
return k
def _init_1col(self, col, *p, **k):
p, k = self._init_normalize_params(p, k)
self._table = k.pop('table', None) or col.table
self.result_column = col.copy()
if 'current_name' in k:
# Renamed
self.current_name = k.pop('current_name')
k.setdefault('name', col.name)
else:
self.current_name = col.name
def compare_1_column(self, col, *p, **k):
"""Compares one Column object"""
self.table = k.pop('table', None) or col.table
self.result_column = col
if len(p):
k = self._extract_parameters(p, k, self.result_column)
return k
def _init_2col(self, start_col, end_col, *p, **k):
p, k = self._init_normalize_params(p, k)
self.result_column = start_col.copy()
self._table = k.pop('table', None) or start_col.table \
or end_col.table
self.current_name = start_col.name
for key in ('name', 'nullable', 'default', 'server_default',
'primary_key', 'foreign_key'):
val = getattr(end_col, key, None)
if getattr(start_col, key, None) != val:
def compare_2_columns(self, old_col, new_col, *p, **k):
"""Compares two Column objects"""
self.process_column(new_col)
self.table = k.pop('table', None) or old_col.table or new_col.table
self.result_column = old_col
# set differences
# leave out some stuff for later comp
for key in (set(self.diff_keys) - set(('type',))):
val = getattr(new_col, key, None)
if getattr(self.result_column, key, None) != val:
k.setdefault(key, val)
if not self.column_types_eq(start_col.type, end_col.type):
k.setdefault('type', end_col.type)
# inspect types
if not self.are_column_types_eq(self.result_column.type, new_col.type):
k.setdefault('type', new_col.type)
if len(p):
k = self._extract_parameters(p, k, self.result_column)
return k
def _init_normalize_params(self, p, k):
p = list(p)
if len(p):
k.setdefault('name', p.pop(0))
if len(p):
k.setdefault('type', p.pop(0))
# TODO: sequences? FKs?
return p, k
def _set_diffs(self, diffs):
def apply_diffs(self, diffs):
"""Populate dict and column object with new values"""
self.diffs = diffs
for key in self.diff_keys:
if key in diffs:
self[key] = diffs[key]
if getattr(self, 'result_column', None) is not None:
setattr(self.result_column, key, diffs[key])
setattr(self.result_column, key, diffs[key])
self.process_column(self.result_column)
# create an instance of class type if not yet
if 'type' in diffs and callable(self.result_column.type):
self.result_column.type = self.result_column.type()
# add column to the table
if self.table and self.alter_metadata:
self.result_column.add_to_table(self.table)
def are_column_types_eq(self, old_type, new_type):
"""Compares two types to be equal"""
ret = old_type.__class__ == new_type.__class__
def column_types_eq(self, this, that):
ret = isinstance(this, that.__class__)
ret = ret or isinstance(that, this.__class__)
# String length is a special case
if ret and isinstance(that, sqlalchemy.types.String):
ret = (getattr(this, 'length', None) == \
getattr(that, 'length', None))
if ret and isinstance(new_type, sqlalchemy.types.String):
ret = (getattr(old_type, 'length', None) == \
getattr(new_type, 'length', None))
return ret
def _extract_parameters(self, p, k, column):
"""Extracts data from p and modifies diffs"""
p = list(p)
while len(p):
if isinstance(p[0], basestring):
k.setdefault('name', p.pop(0))
elif isinstance(p[0], sqlalchemy.types.AbstractType):
k.setdefault('type', p.pop(0))
elif callable(p[0]):
p[0] = p[0]()
else:
break
if len(p):
new_col = column.copy_fixed()
new_col._init_items(*p)
k = self.compare_2_columns(column, new_col, **k)
return k
def process_column(self, column):
"""Processes default values for column"""
# XXX: this is a snippet from SA processing of positional parameters
if column.args:
toinit = list(column.args)
else:
toinit = list()
if column.server_default is not None:
if isinstance(column.server_default, sqlalchemy.FetchedValue):
toinit.append(column.server_default)
else:
toinit.append(sqlalchemy.DefaultClause(column.server_default))
if column.server_onupdate is not None:
if isinstance(column.server_onupdate, FetchedValue):
toinit.append(column.server_default)
else:
toinit.append(sqlalchemy.DefaultClause(column.server_onupdate,
for_update=True))
if toinit:
column._init_items(*toinit)
column.args = []
def _get_table(self):
return getattr(self, '_table', None)
def _set_table(self, table):
if isinstance(table, basestring):
if self.alter_metadata:
if not self.meta:
raise ValueError("metadata must be specified for table"
" reflection when using alter_metadata")
meta = self.meta
if self.engine:
meta.bind = self.engine
else:
if not self.engine and not self.meta:
raise ValueError("engine or metadata must be specified"
" to reflect tables")
if not self.engine:
self.engine = self.meta.bind
meta = sqlalchemy.MetaData(bind=self.engine)
self._table = sqlalchemy.Table(table, meta, autoload=True)
elif isinstance(table, sqlalchemy.Table):
self._table = table
if not self.alter_metadata:
self._table.meta = sqlalchemy.MetaData(bind=self._table.bind)
def _get_result_column(self):
return getattr(self, '_result_column', None)
def _set_result_column(self, column):
"""Set Column to Table based on alter_metadata evaluation."""
self.process_column(column)
if not hasattr(self, 'current_name'):
self.current_name = column.name
if self.alter_metadata:
self._result_column = column
# remove column from table, nothing has changed yet
if self.table:
column.remove_from_table(self.table)
else:
self._result_column = column.copy_fixed()
table = property(_get_table, _set_table)
result_column = property(_get_result_column, _set_result_column)
class ChangesetTable(object):
"""Changeset extensions to SQLAlchemy tables."""
def create_column(self, column):
def create_column(self, column, **kw):
"""Creates a column.
The column parameter may be a column definition or the name of
@ -278,7 +386,7 @@ class ChangesetTable(object):
column = getattr(self.c, str(column))
column.create(table=self)
def drop_column(self, column):
def drop_column(self, column, **kw):
"""Drop a column, given its name or definition."""
if not isinstance(column, sqlalchemy.Column):
# It's a column name
@ -327,17 +435,16 @@ class ChangesetColumn(object):
May supply a new column object, or a list of properties to
change.
For example; the following are equivalent:
col.alter(Column('myint', Integer, nullable=False))
col.alter('myint', Integer, nullable=False)
col.alter(name='myint', type=Integer, nullable=False)
For example; the following are equivalent::
Column name, type, default, and nullable may be changed
here. Note that for column defaults, only PassiveDefaults are
managed by the database - changing others doesn't make sense.
col.alter(Column('myint', Integer, DefaultClause('foobar')))
col.alter('myint', Integer, server_default='foobar', nullable=False)
col.alter(DefaultClause('foobar'), name='myint', type=Integer, nullable=False)
:param table: Table to be altered
:param engine: Engine to be used
Column name, type, server_default, and nullable may be changed
here.
Direct API to :func:`alter_column`
"""
if 'table' not in k:
k['table'] = self.table
@ -371,8 +478,8 @@ class ChangesetColumn(object):
"""
if table is not None:
self.table = table
self.remove_from_table(self.table)
engine = self.table.bind
self.remove_from_table(self.table, unset_table=False)
visitorcallable = get_engine_visitor(engine, 'columndropper')
engine._run_visitor(visitorcallable, self, *args, **kwargs)
return self
@ -381,12 +488,31 @@ class ChangesetColumn(object):
if table and not self.table:
self._set_parent(table)
def remove_from_table(self, table):
def remove_from_table(self, table, unset_table=True):
# TODO: remove indexes, primary keys, constraints, etc
if unset_table:
self.table = None
if table.c.contains_column(self):
table.c.remove(self)
# TODO: this is fixed in 0.6
def copy_fixed(self, **kw):
"""Create a copy of this ``Column``, with all attributes."""
return sqlalchemy.Column(self.name, self.type, self.default,
key=self.key,
primary_key=self.primary_key,
nullable=self.nullable,
quote=self.quote,
index=self.index,
unique=self.unique,
onupdate=self.onupdate,
autoincrement=self.autoincrement,
server_default=self.server_default,
server_onupdate=self.server_onupdate,
*[c.copy(**kw) for c in self.constraints])
def _check_sanity_constraints(self, name):
obj = getattr(self, name)
if (getattr(self, name[:-5]) and not obj):
raise InvalidConstraintError("Column.create() accepts index_name,"
@ -412,3 +538,15 @@ class ChangesetIndex(object):
visitorcallable = get_engine_visitor(engine, 'schemachanger')
engine._run_visitor(visitorcallable, self, *args, **kwargs)
self.name = name
class ChangesetDefaultClause(object):
"""Implements comparison between :class:`DefaultClause` instances"""
def __eq__(self, other):
if isinstance(other, self.__class__):
if self.arg == other.arg:
return True
def __ne__(self, other):
return not self.__eq__(other)

View File

@ -5,13 +5,14 @@ from sqlalchemy import *
from migrate import changeset
from migrate.changeset import *
from migrate.changeset.schema import _ColumnDelta
from migrate.changeset.schema import ColumnDelta
from test import fixture
class TestAddDropColumn(fixture.DB):
"""Test add/drop column through all possible interfaces
also test for constraints"""
also test for constraints
"""
level = fixture.DB.CONNECT
table_name = 'tmp_adddropcol'
table_int = 0
@ -272,12 +273,10 @@ class TestAddDropColumn(fixture.DB):
self.assertEqual(u'foobar', row['data'])
col.drop()
# TODO: test sequence
# TODO: test that if column is appended on creation and removed on deletion
# TODO: test column.alter with all changes at one time
# TODO: test quoting
# TODO: test drop default
# TODO: test non-autoname constraints
class TestRename(fixture.DB):
@ -445,23 +444,6 @@ class TestColumnChange(fixture.DB):
self.table.c.data # Should not raise exception
self.assertEquals(num_rows(self.table.c.data,content), 1)
#@fixture.usedb()
#def test_fk(self):
# """Can add/drop foreign key constraints to/from a column
# Not supported
# """
# self.assert_(self.table.c.data.foreign_key is None)
# # add
# self.table.c.data.alter(foreign_key=ForeignKey(self.table.c.id))
# self.refresh_table(self.table.name)
# self.assert_(self.table.c.data.foreign_key is not None)
# # drop
# self.table.c.data.alter(foreign_key=None)
# self.refresh_table(self.table.name)
# self.assert_(self.table.c.data.foreign_key is None)
@fixture.usedb()
def test_type(self):
"""Can change a column's type"""
@ -508,6 +490,9 @@ class TestColumnChange(fixture.DB):
#self.assertEquals(self.table.c.data.server_default.arg,default)
# TextClause returned by autoload
self.assert_(default in str(self.table.c.data.server_default.arg))
self.engine.execute(self.table.insert(), id=12)
row = self.table.select(autocommit=True).execute().fetchone()
self.assertEqual(row['data'], default)
# Column object
default = 'your_default'
@ -515,13 +500,15 @@ class TestColumnChange(fixture.DB):
self.refresh_table(self.table.name)
self.assert_(default in str(self.table.c.data.server_default.arg))
# Remove default
# Drop/remove default
self.table.c.data.alter(server_default=None)
self.assertEqual(self.table.c.data.server_default, None)
self.refresh_table(self.table.name)
# server_default isn't necessarily None for Oracle
#self.assert_(self.table.c.data.server_default is None,self.table.c.data.server_default)
self.engine.execute(self.table.insert(), id=11)
row = self.table.select().execute().fetchone()
row = self.table.select(self.table.c.id == 11, autocommit=True).execute().fetchone()
self.assert_(row['data'] is None, row['data'])
@ -541,80 +528,225 @@ class TestColumnChange(fixture.DB):
self.refresh_table(self.table.name)
self.assertEquals(self.table.c.data.nullable, True)
#@fixture.usedb()
#def test_pk(self):
# """Can add/drop a column to/from its table's primary key
# Not supported
# """
# self.engine.echo = True
# self.assertEquals(len(self.table.primary_key), 1)
@fixture.usedb()
def test_alter_metadata(self):
"""Test if alter_metadata is respected"""
# # Entire column definition
# self.table.c.data.alter(Column('data', String, primary_key=True))
# self.refresh_table(self.table.name)
# self.assertEquals(len(self.table.primary_key), 2)
self.table.c.data.alter(Column('data', String(100)))
# # Just the new status
# self.table.c.data.alter(primary_key=False)
# self.refresh_table(self.table.name)
# self.assertEquals(len(self.table.primary_key), 1)
self.assert_(isinstance(self.table.c.data.type, String))
self.assertEqual(self.table.c.data.type.length, 100)
# nothing should change
self.table.c.data.alter(Column('data', String(200)), alter_metadata=False)
self.assert_(isinstance(self.table.c.data.type, String))
self.assertEqual(self.table.c.data.type.length, 100)
@fixture.usedb()
def test_alter_all(self):
"""Tests all alter changes at one time"""
# test for each db separately
# since currently some dont support everything
# test pre settings
self.assertEqual(self.table.c.data.nullable, True)
self.assertEqual(self.table.c.data.server_default.arg, 'tluafed')
self.assertEqual(self.table.c.data.name, 'data')
self.assertTrue(isinstance(self.table.c.data.type, String))
self.assertTrue(self.table.c.data.type.length, 40)
kw = dict(nullable=False,
server_default='foobar',
name='data_new',
type=String(50),
alter_metadata=True)
if self.engine.name == 'firebird':
del kw['nullable']
self.table.c.data.alter(**kw)
# test altered objects
self.assertEqual(self.table.c.data.server_default.arg, 'foobar')
if not self.engine.name == 'firebird':
self.assertEqual(self.table.c.data.nullable, False)
self.assertEqual(self.table.c.data.name, 'data_new')
self.assertEqual(self.table.c.data.type.length, 50)
self.refresh_table(self.table.name)
# test post settings
if not self.engine.name == 'firebird':
self.assertEqual(self.table.c.data_new.nullable, False)
self.assertEqual(self.table.c.data_new.name, 'data_new')
self.assertTrue(isinstance(self.table.c.data_new.type, String))
self.assertTrue(self.table.c.data_new.type.length, 50)
# insert data and assert default
self.table.insert(values={'id': 10}).execute()
row = self.table.select(autocommit=True).execute().fetchone()
self.assertEqual(u'foobar', row['data_new'])
class TestColumnDelta(fixture.Base):
def test_deltas(self):
def mkcol(name='id', type=String, *p, **k):
return Column(name, type, *p, **k)
class TestColumnDelta(fixture.DB):
"""Tests ColumnDelta class"""
def verify(expected, original, *p, **k):
delta = _ColumnDelta(original, *p, **k)
result = delta.keys()
result.sort()
self.assertEquals(expected, result)
return delta
level = fixture.DB.CONNECT
table_name = 'tmp_coldelta'
table_int = 0
col_orig = mkcol(primary_key=True)
def _setup(self, url):
super(TestColumnDelta, self)._setup(url)
self.meta = MetaData()
self.table = Table(self.table_name, self.meta,
Column('ids', String(10)),
)
self.meta.bind = self.engine
if self.engine.has_table(self.table.name):
self.table.drop()
self.table.create()
verify([], col_orig)
verify(['name'], col_orig, 'ids')
# Parameters are always executed, even if they're 'unchanged'
# (We can't assume given column is up-to-date)
verify(['name', 'primary_key', 'type'],
col_orig, 'id', Integer, primary_key=True)
verify(['name', 'primary_key', 'type'],
col_orig, name='id', type=Integer, primary_key=True)
def _teardown(self):
if self.engine.has_table(self.table.name):
self.table.drop()
self.meta.clear()
super(TestColumnDelta,self)._teardown()
# Can compare two columns and find differences
col_new = mkcol(name='ids', primary_key=True)
verify([], col_orig, col_orig)
verify(['name'], 'ids', table=Table('test', MetaData()), name='hey')
verify(['name'], col_orig, col_orig, 'ids')
verify(['name'], col_orig, col_orig, name='ids')
verify(['name'], col_orig, col_new)
verify(['name','type'], col_orig, col_new, type=String)
def mkcol(self, name='id', type=String, *p, **k):
return Column(name, type, *p, **k)
# Change name, given an up-to-date definition and the current name
delta = verify(['name'], col_new, current_name='id')
self.assertEquals(delta.get('name'), 'ids')
def verify(self, expected, original, *p, **k):
self.delta = ColumnDelta(original, *p, **k)
result = self.delta.keys()
result.sort()
self.assertEquals(expected, result)
return self.delta
# Change other params at the same time
verify(['name', 'type'], col_new, current_name='id', type=String)
def test_deltas_two_columns(self):
"""Testing ColumnDelta with two columns"""
col_orig = self.mkcol(primary_key=True)
col_new = self.mkcol(name='ids', primary_key=True)
self.verify([], col_orig, col_orig)
self.verify(['name'], col_orig, col_orig, 'ids')
self.verify(['name'], col_orig, col_orig, name='ids')
self.verify(['name'], col_orig, col_new)
self.verify(['name', 'type'], col_orig, col_new, type=String)
# Type comparisons
verify([], mkcol(type=String), mkcol(type=String))
verify(['type'], mkcol(type=String), mkcol(type=Integer))
verify(['type'], mkcol(type=String), mkcol(type=String(42)))
verify([], mkcol(type=String(42)), mkcol(type=String(42)))
verify(['type'], mkcol(type=String(24)), mkcol(type=String(42)))
self.verify([], self.mkcol(type=String), self.mkcol(type=String))
self.verify(['type'], self.mkcol(type=String), self.mkcol(type=Integer))
self.verify(['type'], self.mkcol(type=String), self.mkcol(type=String(42)))
self.verify([], self.mkcol(type=String(42)), self.mkcol(type=String(42)))
self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=String(42)))
self.verify(['type'], self.mkcol(type=String(24)), self.mkcol(type=Text(24)))
# Other comparisons
verify(['primary_key'], mkcol(nullable=False), mkcol(primary_key=True))
self.verify(['primary_key'], self.mkcol(nullable=False), self.mkcol(primary_key=True))
# PK implies nullable=False
verify(['nullable', 'primary_key'],
mkcol(nullable=True), mkcol(primary_key=True))
verify([], mkcol(primary_key=True), mkcol(primary_key=True))
verify(['nullable'], mkcol(nullable=True), mkcol(nullable=False))
verify([], mkcol(nullable=True), mkcol(nullable=True))
verify(['default'], mkcol(default=None), mkcol(default='42'))
verify([], mkcol(default=None), mkcol(default=None))
verify([], mkcol(default='42'), mkcol(default='42'))
self.verify(['nullable', 'primary_key'], self.mkcol(nullable=True), self.mkcol(primary_key=True))
self.verify([], self.mkcol(primary_key=True), self.mkcol(primary_key=True))
self.verify(['nullable'], self.mkcol(nullable=True), self.mkcol(nullable=False))
self.verify([], self.mkcol(nullable=True), self.mkcol(nullable=True))
self.verify([], self.mkcol(server_default=None), self.mkcol(server_default=None))
self.verify([], self.mkcol(server_default='42'), self.mkcol(server_default='42'))
# test server default
delta = self.verify(['server_default'], self.mkcol(), self.mkcol('id', String, DefaultClause('foobar')))
self.assertEqual(delta['server_default'].arg, 'foobar')
self.verify([], self.mkcol(server_default='foobar'), self.mkcol('id', String, DefaultClause('foobar')))
self.verify(['type'], self.mkcol(server_default='foobar'), self.mkcol('id', Text, DefaultClause('foobar')))
# test alter_metadata
col = self.mkcol(server_default='foobar')
self.verify(['type'], col, self.mkcol('id', Text, DefaultClause('foobar')), alter_metadata=True)
self.assert_(isinstance(col.type, Text))
col = self.mkcol()
self.verify(['name', 'server_default', 'type'], col, self.mkcol('beep', Text, DefaultClause('foobar')), alter_metadata=True)
self.assert_(isinstance(col.type, Text))
self.assertEqual(col.name, 'beep')
self.assertEqual(col.server_default.arg, 'foobar')
col = self.mkcol()
self.verify(['name', 'server_default', 'type'], col, self.mkcol('beep', Text, DefaultClause('foobar')), alter_metadata=False)
self.assertFalse(isinstance(col.type, Text))
self.assertNotEqual(col.name, 'beep')
self.assertFalse(col.server_default)
@fixture.usedb()
def test_deltas_zero_columns(self):
"""Testing ColumnDelta with zero columns"""
self.verify(['name'], 'ids', table=self.table, name='hey')
# test reflection
self.verify(['type'], 'ids', table=self.table.name, type=String(80), engine=self.engine)
self.verify(['type'], 'ids', table=self.table.name, type=String(80), metadata=self.meta)
# check if alter_metadata is respected
self.meta.clear()
delta = self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=True, metadata=self.meta)
self.assert_(self.table.name in self.meta)
self.assertEqual(delta.result_column.type.length, 80)
self.assertEqual(self.meta.tables.get(self.table.name).c.ids.type.length, 80)
self.meta.clear()
self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=False, engine=self.engine)
self.assert_(self.table.name not in self.meta)
self.meta.clear()
self.verify(['type'], 'ids', table=self.table.name, type=String(80), alter_metadata=False, metadata=self.meta)
self.assert_(self.table.name not in self.meta)
# test defaults
self.meta.clear()
self.verify(['server_default'], 'ids', table=self.table.name, server_default='foobar', alter_metadata=True, metadata=self.meta)
self.meta.tables.get(self.table.name).c.ids.server_default.arg == 'foobar'
# test missing parameters
self.assertRaises(ValueError, ColumnDelta, table=self.table.name)
self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=True)
self.assertRaises(ValueError, ColumnDelta, 'ids', table=self.table.name, alter_metadata=False)
def test_deltas_one_column(self):
"""Testing ColumnDelta with one column"""
col_orig = self.mkcol(primary_key=True)
self.verify([], col_orig)
self.verify(['name'], col_orig, 'ids')
# Parameters are always executed, even if they're 'unchanged'
# (We can't assume given column is up-to-date)
self.verify(['name', 'primary_key', 'type'], col_orig, 'id', Integer, primary_key=True)
self.verify(['name', 'primary_key', 'type'], col_orig, name='id', type=Integer, primary_key=True)
# Change name, given an up-to-date definition and the current name
delta = self.verify(['name'], col_orig, name='blah')
self.assertEquals(delta.get('name'), 'blah')
self.assertEquals(delta.current_name, 'id')
# check if alter_metadata is respected
col_orig = self.mkcol(primary_key=True)
self.verify(['name', 'type'], col_orig, name='id12', type=Text, alter_metadata=True)
self.assert_(isinstance(col_orig.type, Text))
self.assertEqual(col_orig.name, 'id12')
col_orig = self.mkcol(primary_key=True)
self.verify(['name', 'type'], col_orig, name='id12', type=Text, alter_metadata=False)
self.assert_(isinstance(col_orig.type, String))
self.assertEqual(col_orig.name, 'id')
# test server default
col_orig = self.mkcol(primary_key=True)
delta = self.verify(['server_default'], col_orig, DefaultClause('foobar'))
self.assertEqual(delta['server_default'].arg, 'foobar')
delta = self.verify(['server_default'], col_orig, server_default=DefaultClause('foobar'))
self.assertEqual(delta['server_default'].arg, 'foobar')
# no change
col_orig = self.mkcol(server_default=DefaultClause('foobar'))
delta = self.verify(['type'], col_orig, DefaultClause('foobar'), type=PickleType)
self.assert_(isinstance(delta.result_column.type, PickleType))
# TODO: test server on update
# TODO: test bind metadata